sentinel_agent_protocol/
lib.rs

1//! Agent protocol for Sentinel proxy
2//!
3//! This crate defines the protocol for communication between the proxy dataplane
4//! and external processing agents (WAF, auth, rate limiting, custom logic).
5//!
6//! The protocol is inspired by SPOE (Stream Processing Offload Engine) and Envoy's ext_proc,
7//! designed for bounded, predictable behavior with strong failure isolation.
8//!
9//! # Architecture
10//!
11//! - [`AgentClient`]: Client for sending events to agents from the proxy
12//! - [`AgentServer`]: Server for implementing agent handlers
13//! - [`AgentHandler`]: Trait for implementing agent logic
14//! - [`AgentResponse`]: Response from agent with decision and mutations
15//!
16//! # Protocol
17//!
18//! Messages are length-prefixed JSON over Unix domain sockets:
19//! - 4-byte big-endian length prefix
20//! - JSON payload (max 10MB)
21//!
22//! # Example: Client Usage
23//!
24//! ```ignore
25//! use sentinel_agent_protocol::{AgentClient, EventType, RequestHeadersEvent};
26//!
27//! let mut client = AgentClient::unix_socket("my-agent", "/tmp/agent.sock", timeout).await?;
28//! let response = client.send_event(EventType::RequestHeaders, &event).await?;
29//! ```
30//!
31//! # Example: Server Implementation
32//!
33//! ```ignore
34//! use sentinel_agent_protocol::{AgentServer, AgentHandler, AgentResponse, RequestHeadersEvent};
35//!
36//! struct MyAgent;
37//!
38//! #[async_trait]
39//! impl AgentHandler for MyAgent {
40//!     async fn on_request_headers(&self, event: RequestHeadersEvent) -> AgentResponse {
41//!         // Implement your logic here
42//!         AgentResponse::default_allow()
43//!     }
44//! }
45//!
46//! let server = AgentServer::new("my-agent", "/tmp/agent.sock", Box::new(MyAgent));
47//! server.run().await?;
48//! ```
49
50#![allow(dead_code)]
51
52mod client;
53mod errors;
54mod protocol;
55mod server;
56
57// Re-export error types
58pub use errors::AgentProtocolError;
59
60// Re-export protocol types
61pub use protocol::{
62    AgentRequest, AgentResponse, AuditMetadata, Decision, EventType, HeaderOp,
63    RequestBodyChunkEvent, RequestCompleteEvent, RequestHeadersEvent, RequestMetadata,
64    ResponseBodyChunkEvent, ResponseHeadersEvent, MAX_MESSAGE_SIZE, PROTOCOL_VERSION,
65};
66
67// Re-export client
68pub use client::AgentClient;
69
70// Re-export server and handler
71pub use server::{AgentHandler, AgentServer, DenylistAgent, EchoAgent};
72
#[cfg(test)]
mod tests {
    //! End-to-end tests: each test starts an [`AgentServer`] on a Unix socket in a
    //! temporary directory, connects an [`AgentClient`] to it, sends one
    //! `RequestHeaders` event, and checks the decision that comes back.

    use super::*;
    use std::collections::HashMap;
    use std::path::Path;
    use std::time::{Duration, Instant};
    use tempfile::tempdir;

    /// Timeout handed to [`AgentClient::unix_socket`] for every test client.
    const CLIENT_TIMEOUT: Duration = Duration::from_secs(5);

    /// Upper bound on how long a test waits for the spawned server to accept connections.
    const SERVER_STARTUP_TIMEOUT: Duration = Duration::from_secs(5);

    /// Pause between connection attempts while the server is still starting up.
    const CONNECT_RETRY_INTERVAL: Duration = Duration::from_millis(10);

    /// Connects a test client to `socket_path`, retrying until the server is reachable.
    ///
    /// The server runs in a separately spawned task, so there is no ordering guarantee
    /// between it starting to listen and the first connection attempt. Retrying with a
    /// deadline replaces a fixed start-up sleep: it returns as soon as the server is
    /// ready and tolerates slow machines.
    ///
    /// # Panics
    ///
    /// Panics with the last connection error if the server is still unreachable after
    /// [`SERVER_STARTUP_TIMEOUT`].
    async fn connect_when_ready(socket_path: &Path) -> AgentClient {
        let deadline = Instant::now() + SERVER_STARTUP_TIMEOUT;
        loop {
            match AgentClient::unix_socket("test-client", socket_path, CLIENT_TIMEOUT).await {
                Ok(client) => return client,
                Err(err) => {
                    if Instant::now() >= deadline {
                        panic!(
                            "agent server at {:?} did not become ready within {:?}: {:?}",
                            socket_path, SERVER_STARTUP_TIMEOUT, err
                        );
                    }
                    tokio::time::sleep(CONNECT_RETRY_INTERVAL).await;
                }
            }
        }
    }

    /// Builds the `GET` request-headers event shared by the tests; only `uri` varies.
    fn request_headers_event(uri: &str) -> RequestHeadersEvent {
        RequestHeadersEvent {
            metadata: RequestMetadata {
                correlation_id: "test-123".to_string(),
                request_id: "req-456".to_string(),
                client_ip: "127.0.0.1".to_string(),
                client_port: 12345,
                server_name: Some("example.com".to_string()),
                protocol: "HTTP/1.1".to_string(),
                tls_version: None,
                tls_cipher: None,
                route_id: Some("default".to_string()),
                upstream_id: Some("backend".to_string()),
                timestamp: chrono::Utc::now().to_rfc3339(),
            },
            method: "GET".to_string(),
            uri: uri.to_string(),
            headers: HashMap::new(),
        }
    }

    #[tokio::test]
    async fn test_agent_protocol_echo() {
        let dir = tempdir().unwrap();
        let socket_path = dir.path().join("test.sock");

        // Start echo agent server
        let server = AgentServer::new("test-echo", socket_path.clone(), Box::new(EchoAgent));

        let server_handle = tokio::spawn(async move {
            server.run().await.unwrap();
        });

        // Connect client once the server is accepting connections
        let mut client = connect_when_ready(&socket_path).await;

        // Send request headers event
        let event = request_headers_event("/test");

        let response = client
            .send_event(EventType::RequestHeaders, &event)
            .await
            .unwrap();

        // The echo agent is expected to allow the request and report exactly one
        // request-header entry in its response.
        assert_eq!(response.decision, Decision::Allow);
        assert_eq!(response.request_headers.len(), 1);

        // Clean up
        client.close().await.unwrap();
        server_handle.abort();
    }

    #[tokio::test]
    async fn test_agent_protocol_denylist() {
        let dir = tempdir().unwrap();
        let socket_path = dir.path().join("denylist.sock");

        // Start denylist agent server.
        // NOTE(review): the two lists are presumably blocked paths and blocked client
        // IPs, in that order — confirm against `DenylistAgent::new`.
        let agent = DenylistAgent::new(vec!["/admin".to_string()], vec!["10.0.0.1".to_string()]);
        let server = AgentServer::new("test-denylist", socket_path.clone(), Box::new(agent));

        let server_handle = tokio::spawn(async move {
            server.run().await.unwrap();
        });

        // Connect client once the server is accepting connections
        let mut client = connect_when_ready(&socket_path).await;

        // Test blocked path
        let event = request_headers_event("/admin/secret");

        let response = client
            .send_event(EventType::RequestHeaders, &event)
            .await
            .unwrap();

        // Check response is blocked; on failure, report the decision actually received.
        match &response.decision {
            Decision::Block { status, .. } => assert_eq!(*status, 403),
            other => panic!("Expected block decision, got {:?}", other),
        }

        // Clean up
        client.close().await.unwrap();
        server_handle.abort();
    }
}