sentinel_agent_protocol/
lib.rs

1// Allow large enum variants in generated protobuf code
2#![allow(clippy::large_enum_variant)]
3
4//! Agent protocol for Sentinel proxy
5//!
6//! This crate defines the protocol for communication between the proxy dataplane
7//! and external processing agents (WAF, auth, rate limiting, custom logic).
8//!
9//! The protocol is inspired by SPOE (Stream Processing Offload Engine) and Envoy's ext_proc,
10//! designed for bounded, predictable behavior with strong failure isolation.
11//!
12//! # Architecture
13//!
14//! - [`AgentClient`]: Client for sending events to agents from the proxy
15//! - [`AgentServer`]: Server for implementing agent handlers
16//! - [`AgentHandler`]: Trait for implementing agent logic
17//! - [`AgentResponse`]: Response from agent with decision and mutations
18//!
19//! # Transports
20//!
21//! Two transport options are supported:
22//!
23//! ## Unix Domain Sockets (Default)
24//! Messages are length-prefixed JSON:
25//! - 4-byte big-endian length prefix
26//! - JSON payload (max 10MB)
27//!
28//! ## gRPC
29//! Binary protocol using Protocol Buffers over HTTP/2:
30//! - Better performance for high-throughput scenarios
31//! - Native support for TLS/mTLS
32//! - Language-agnostic (agents can be written in any language with gRPC support)
33//!
34//! # Example: Client Usage (Unix Socket)
35//!
36//! ```ignore
37//! use sentinel_agent_protocol::{AgentClient, EventType, RequestHeadersEvent};
38//!
39//! let mut client = AgentClient::unix_socket("my-agent", "/tmp/agent.sock", timeout).await?;
40//! let response = client.send_event(EventType::RequestHeaders, &event).await?;
41//! ```
42//!
43//! # Example: Client Usage (gRPC)
44//!
45//! ```ignore
46//! use sentinel_agent_protocol::{AgentClient, EventType, RequestHeadersEvent};
47//!
48//! let mut client = AgentClient::grpc("my-agent", "http://localhost:50051", timeout).await?;
49//! let response = client.send_event(EventType::RequestHeaders, &event).await?;
50//! ```
51//!
52//! # Example: Client Usage (gRPC with TLS)
53//!
54//! ```ignore
55//! use sentinel_agent_protocol::{AgentClient, GrpcTlsConfig, EventType, RequestHeadersEvent};
56//!
57//! // Simple TLS (server verification only)
58//! let tls_config = GrpcTlsConfig::new()
59//!     .with_ca_cert_file("/etc/sentinel/certs/ca.crt").await?;
60//!
61//! let mut client = AgentClient::grpc_tls(
62//!     "my-agent",
63//!     "https://agent.internal:50051",
64//!     timeout,
65//!     tls_config
66//! ).await?;
67//!
68//! // mTLS (mutual authentication)
69//! let tls_config = GrpcTlsConfig::new()
70//!     .with_ca_cert_file("/etc/sentinel/certs/ca.crt").await?
71//!     .with_client_cert_files(
72//!         "/etc/sentinel/certs/client.crt",
73//!         "/etc/sentinel/certs/client.key"
74//!     ).await?;
75//!
76//! let mut client = AgentClient::grpc_tls("my-agent", "https://agent.internal:50051", timeout, tls_config).await?;
77//! ```
78//!
79//! # Example: Client Usage (HTTP REST)
80//!
81//! ```ignore
82//! use sentinel_agent_protocol::{AgentClient, HttpTlsConfig, EventType, RequestHeadersEvent};
83//!
84//! // Plain HTTP
85//! let mut client = AgentClient::http("my-agent", "http://localhost:8080/agent", timeout).await?;
86//! let response = client.send_event(EventType::RequestHeaders, &event).await?;
87//!
88//! // HTTPS with TLS
89//! let tls_config = HttpTlsConfig::new()
90//!     .with_ca_cert_file("/etc/sentinel/certs/ca.crt").await?;
91//!
92//! let mut client = AgentClient::http_tls(
93//!     "my-agent",
94//!     "https://agent.internal:8443/agent",
95//!     timeout,
96//!     tls_config
97//! ).await?;
98//! ```
99//!
100//! # Example: Server Implementation
101//!
102//! ```ignore
103//! use sentinel_agent_protocol::{AgentServer, AgentHandler, AgentResponse};
104//!
105//! struct MyAgent;
106//!
107//! #[async_trait]
108//! impl AgentHandler for MyAgent {
109//!     async fn on_request_headers(&self, event: RequestHeadersEvent) -> AgentResponse {
110//!         // Implement your logic here
111//!         AgentResponse::default_allow()
112//!     }
113//! }
114//!
115//! let server = AgentServer::new("my-agent", "/tmp/agent.sock", Box::new(MyAgent));
116//! server.run().await?;
117//! ```
118
119#![allow(dead_code)]
120
121pub mod binary;
122pub mod buffer_pool;
123mod client;
124mod errors;
125pub mod headers;
126mod protocol;
127mod server;
128
129/// gRPC protocol definitions generated from proto/agent.proto
130pub mod grpc {
131    tonic::include_proto!("sentinel.agent.v1");
132}
133
134/// Protocol v2 types with bidirectional streaming, capabilities, and flow control
135pub mod v2;
136
137/// gRPC v2 protocol definitions generated from proto/agent_v2.proto
138pub mod grpc_v2 {
139    tonic::include_proto!("sentinel.agent.v2");
140}
141
142// Re-export error types
143pub use errors::AgentProtocolError;
144
145// Re-export protocol types
146pub use protocol::{
147    AgentRequest, AgentResponse, AuditMetadata, BodyMutation, ConfigureEvent, Decision,
148    DetectionSeverity, EventType, GuardrailDetection, GuardrailInspectEvent,
149    GuardrailInspectionType, GuardrailResponse, HeaderOp, RequestBodyChunkEvent,
150    RequestCompleteEvent, RequestHeadersEvent, RequestMetadata, ResponseBodyChunkEvent,
151    ResponseHeadersEvent, TextSpan, WebSocketDecision, WebSocketFrameEvent, WebSocketOpcode,
152    MAX_MESSAGE_SIZE, PROTOCOL_VERSION,
153};
154
155// Re-export client
156pub use client::{AgentClient, GrpcTlsConfig, HttpTlsConfig};
157
158// Re-export server and handler
159pub use server::{
160    AgentHandler, AgentServer, DenylistAgent, EchoAgent, GrpcAgentHandler, GrpcAgentServer,
161};
162
163#[cfg(test)]
164mod tests {
165    use super::*;
166    use std::collections::HashMap;
167    use std::time::Duration;
168    use tempfile::tempdir;
169
170    #[tokio::test]
171    async fn test_agent_protocol_echo() {
172        let dir = tempdir().unwrap();
173        let socket_path = dir.path().join("test.sock");
174
175        // Start echo agent server
176        let server = AgentServer::new("test-echo", socket_path.clone(), Box::new(EchoAgent));
177
178        let server_handle = tokio::spawn(async move {
179            server.run().await.unwrap();
180        });
181
182        // Give server time to start
183        tokio::time::sleep(Duration::from_millis(100)).await;
184
185        // Connect client
186        let mut client =
187            AgentClient::unix_socket("test-client", &socket_path, Duration::from_secs(5))
188                .await
189                .unwrap();
190
191        // Send request headers event
192        let event = RequestHeadersEvent {
193            metadata: RequestMetadata {
194                correlation_id: "test-123".to_string(),
195                request_id: "req-456".to_string(),
196                client_ip: "127.0.0.1".to_string(),
197                client_port: 12345,
198                server_name: Some("example.com".to_string()),
199                protocol: "HTTP/1.1".to_string(),
200                tls_version: None,
201                tls_cipher: None,
202                route_id: Some("default".to_string()),
203                upstream_id: Some("backend".to_string()),
204                timestamp: chrono::Utc::now().to_rfc3339(),
205                traceparent: None,
206            },
207            method: "GET".to_string(),
208            uri: "/test".to_string(),
209            headers: HashMap::new(),
210        };
211
212        let response = client
213            .send_event(EventType::RequestHeaders, &event)
214            .await
215            .unwrap();
216
217        // Check response
218        assert_eq!(response.decision, Decision::Allow);
219        assert_eq!(response.request_headers.len(), 1);
220
221        // Clean up
222        client.close().await.unwrap();
223        server_handle.abort();
224    }
225
226    #[tokio::test]
227    async fn test_agent_protocol_denylist() {
228        let dir = tempdir().unwrap();
229        let socket_path = dir.path().join("denylist.sock");
230
231        // Start denylist agent server
232        let agent = DenylistAgent::new(vec!["/admin".to_string()], vec!["10.0.0.1".to_string()]);
233        let server = AgentServer::new("test-denylist", socket_path.clone(), Box::new(agent));
234
235        let server_handle = tokio::spawn(async move {
236            server.run().await.unwrap();
237        });
238
239        // Give server time to start
240        tokio::time::sleep(Duration::from_millis(100)).await;
241
242        // Connect client
243        let mut client =
244            AgentClient::unix_socket("test-client", &socket_path, Duration::from_secs(5))
245                .await
246                .unwrap();
247
248        // Test blocked path
249        let event = RequestHeadersEvent {
250            metadata: RequestMetadata {
251                correlation_id: "test-123".to_string(),
252                request_id: "req-456".to_string(),
253                client_ip: "127.0.0.1".to_string(),
254                client_port: 12345,
255                server_name: Some("example.com".to_string()),
256                protocol: "HTTP/1.1".to_string(),
257                tls_version: None,
258                tls_cipher: None,
259                route_id: Some("default".to_string()),
260                upstream_id: Some("backend".to_string()),
261                timestamp: chrono::Utc::now().to_rfc3339(),
262                traceparent: None,
263            },
264            method: "GET".to_string(),
265            uri: "/admin/secret".to_string(),
266            headers: HashMap::new(),
267        };
268
269        let response = client
270            .send_event(EventType::RequestHeaders, &event)
271            .await
272            .unwrap();
273
274        // Check response is blocked
275        match response.decision {
276            Decision::Block { status, .. } => assert_eq!(status, 403),
277            _ => panic!("Expected block decision"),
278        }
279
280        // Clean up
281        client.close().await.unwrap();
282        server_handle.abort();
283    }
284
285    #[test]
286    fn test_body_mutation_types() {
287        // Test pass-through mutation
288        let pass_through = BodyMutation::pass_through(0);
289        assert!(pass_through.is_pass_through());
290        assert!(!pass_through.is_drop());
291        assert_eq!(pass_through.chunk_index, 0);
292
293        // Test drop mutation
294        let drop = BodyMutation::drop_chunk(1);
295        assert!(!drop.is_pass_through());
296        assert!(drop.is_drop());
297        assert_eq!(drop.chunk_index, 1);
298
299        // Test replace mutation
300        let replace = BodyMutation::replace(2, "modified content".to_string());
301        assert!(!replace.is_pass_through());
302        assert!(!replace.is_drop());
303        assert_eq!(replace.chunk_index, 2);
304        assert_eq!(replace.data, Some("modified content".to_string()));
305    }
306
307    #[test]
308    fn test_agent_response_streaming() {
309        // Test needs_more_data response
310        let response = AgentResponse::needs_more_data();
311        assert!(response.needs_more);
312        assert_eq!(response.decision, Decision::Allow);
313
314        // Test response with body mutation
315        let mutation = BodyMutation::replace(0, "new content".to_string());
316        let response = AgentResponse::default_allow().with_request_body_mutation(mutation.clone());
317        assert!(!response.needs_more);
318        assert!(response.request_body_mutation.is_some());
319        assert_eq!(
320            response.request_body_mutation.unwrap().data,
321            Some("new content".to_string())
322        );
323
324        // Test set_needs_more
325        let response = AgentResponse::default_allow().set_needs_more(true);
326        assert!(response.needs_more);
327    }
328}