sentinel_agent_protocol/
lib.rs

// Allow large enum variants in generated protobuf code.
// NOTE(review): as an inner (`#!`) attribute at the crate root this applies to the whole crate,
// not only to the generated modules — consider scoping it to the modules that include
// generated code.
#![allow(clippy::large_enum_variant)]
3
4//! Agent protocol for Sentinel proxy
5//!
6//! This crate defines the protocol for communication between the proxy dataplane
7//! and external processing agents (WAF, auth, rate limiting, custom logic).
8//!
9//! The protocol is inspired by SPOE (Stream Processing Offload Engine) and Envoy's ext_proc,
10//! designed for bounded, predictable behavior with strong failure isolation.
11//!
12//! # Architecture
13//!
14//! - [`AgentClient`]: Client for sending events to agents from the proxy
15//! - [`AgentServer`]: Server for implementing agent handlers
16//! - [`AgentHandler`]: Trait for implementing agent logic
17//! - [`AgentResponse`]: Response from agent with decision and mutations
18//!
19//! # Transports
20//!
//! Three transport options are supported:
//!
//! ## Unix Domain Sockets (Default)
//! Messages are length-prefixed JSON:
//! - 4-byte big-endian length prefix
//! - JSON payload (max 10MB)
//!
//! ## gRPC
//! Binary protocol using Protocol Buffers over HTTP/2:
//! - Better performance for high-throughput scenarios
//! - Native support for TLS/mTLS
//! - Language-agnostic (agents can be written in any language with gRPC support)
//!
//! ## HTTP REST
//! Events are sent to an HTTP(S) endpoint (see the HTTP REST client example below):
//! - Plain HTTP via `AgentClient::http`
//! - HTTPS via `AgentClient::http_tls` with an `HttpTlsConfig`
//!
34//! # Example: Client Usage (Unix Socket)
35//!
36//! ```ignore
37//! use sentinel_agent_protocol::{AgentClient, EventType, RequestHeadersEvent};
38//!
39//! let mut client = AgentClient::unix_socket("my-agent", "/tmp/agent.sock", timeout).await?;
40//! let response = client.send_event(EventType::RequestHeaders, &event).await?;
41//! ```
42//!
43//! # Example: Client Usage (gRPC)
44//!
45//! ```ignore
46//! use sentinel_agent_protocol::{AgentClient, EventType, RequestHeadersEvent};
47//!
48//! let mut client = AgentClient::grpc("my-agent", "http://localhost:50051", timeout).await?;
49//! let response = client.send_event(EventType::RequestHeaders, &event).await?;
50//! ```
51//!
52//! # Example: Client Usage (gRPC with TLS)
53//!
54//! ```ignore
55//! use sentinel_agent_protocol::{AgentClient, GrpcTlsConfig, EventType, RequestHeadersEvent};
56//!
57//! // Simple TLS (server verification only)
58//! let tls_config = GrpcTlsConfig::new()
59//!     .with_ca_cert_file("/etc/sentinel/certs/ca.crt").await?;
60//!
61//! let mut client = AgentClient::grpc_tls(
62//!     "my-agent",
63//!     "https://agent.internal:50051",
64//!     timeout,
65//!     tls_config
66//! ).await?;
67//!
68//! // mTLS (mutual authentication)
69//! let tls_config = GrpcTlsConfig::new()
70//!     .with_ca_cert_file("/etc/sentinel/certs/ca.crt").await?
71//!     .with_client_cert_files(
72//!         "/etc/sentinel/certs/client.crt",
73//!         "/etc/sentinel/certs/client.key"
74//!     ).await?;
75//!
76//! let mut client = AgentClient::grpc_tls("my-agent", "https://agent.internal:50051", timeout, tls_config).await?;
77//! ```
78//!
79//! # Example: Client Usage (HTTP REST)
80//!
81//! ```ignore
82//! use sentinel_agent_protocol::{AgentClient, HttpTlsConfig, EventType, RequestHeadersEvent};
83//!
84//! // Plain HTTP
85//! let mut client = AgentClient::http("my-agent", "http://localhost:8080/agent", timeout).await?;
86//! let response = client.send_event(EventType::RequestHeaders, &event).await?;
87//!
88//! // HTTPS with TLS
89//! let tls_config = HttpTlsConfig::new()
90//!     .with_ca_cert_file("/etc/sentinel/certs/ca.crt").await?;
91//!
92//! let mut client = AgentClient::http_tls(
93//!     "my-agent",
94//!     "https://agent.internal:8443/agent",
95//!     timeout,
96//!     tls_config
97//! ).await?;
98//! ```
99//!
100//! # Example: Server Implementation
101//!
102//! ```ignore
//! use sentinel_agent_protocol::{AgentServer, AgentHandler, AgentResponse, RequestHeadersEvent};
104//!
105//! struct MyAgent;
106//!
107//! #[async_trait]
108//! impl AgentHandler for MyAgent {
109//!     async fn on_request_headers(&self, event: RequestHeadersEvent) -> AgentResponse {
110//!         // Implement your logic here
111//!         AgentResponse::default_allow()
112//!     }
113//! }
114//!
115//! let server = AgentServer::new("my-agent", "/tmp/agent.sock", Box::new(MyAgent));
116//! server.run().await?;
117//! ```
118
// Suppresses `dead_code` warnings for the whole crate.
// NOTE(review): the reason for this blanket allow is not documented here — confirm it is still
// needed, or narrow it to the specific items that require it.
#![allow(dead_code)]
120
// Module layout: `binary`, `buffer_pool`, `headers` (and `mmap_buffer`, when enabled) are public.
// `client`, `errors`, `protocol` and `server` are private; selected items from them are
// re-exported at the crate root further down in this file.
pub mod binary;
pub mod buffer_pool;
mod client;
mod errors;
pub mod headers;
// Only compiled when the `mmap-buffers` Cargo feature is enabled.
#[cfg(feature = "mmap-buffers")]
pub mod mmap_buffer;
mod protocol;
mod server;
130
/// gRPC protocol definitions generated from proto/agent.proto
///
/// The module body is supplied entirely by `tonic::include_proto!` for the
/// `sentinel.agent.v1` protobuf package; nothing in this module is hand-written.
pub mod grpc {
    // Pulls in the code generated for the `sentinel.agent.v1` package.
    tonic::include_proto!("sentinel.agent.v1");
}
135
136/// Protocol v2 types with bidirectional streaming, capabilities, and flow control
137pub mod v2;
138
/// gRPC v2 protocol definitions generated from proto/agent_v2.proto
///
/// The module body is supplied entirely by `tonic::include_proto!` for the
/// `sentinel.agent.v2` protobuf package; nothing in this module is hand-written.
pub mod grpc_v2 {
    // Pulls in the code generated for the `sentinel.agent.v2` package.
    tonic::include_proto!("sentinel.agent.v2");
}
143
144// Re-export error types
145pub use errors::AgentProtocolError;
146
147// Re-export protocol types
148pub use protocol::{
149    AgentRequest, AgentResponse, AuditMetadata, BinaryRequestBodyChunkEvent,
150    BinaryResponseBodyChunkEvent, BodyMutation, ConfigureEvent, Decision, DetectionSeverity,
151    EventType, GuardrailDetection, GuardrailInspectEvent, GuardrailInspectionType,
152    GuardrailResponse, HeaderOp, RequestBodyChunkEvent, RequestCompleteEvent, RequestHeadersEvent,
153    RequestMetadata, ResponseBodyChunkEvent, ResponseHeadersEvent, TextSpan, WebSocketDecision,
154    WebSocketFrameEvent, WebSocketOpcode, MAX_MESSAGE_SIZE, PROTOCOL_VERSION,
155};
156
157// Re-export client
158pub use client::{AgentClient, GrpcTlsConfig, HttpTlsConfig};
159
160// Re-export server and handler
161pub use server::{
162    AgentHandler, AgentServer, DenylistAgent, EchoAgent, GrpcAgentHandler, GrpcAgentServer,
163};
164
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use std::path::Path;
    use std::time::{Duration, Instant};
    use tempfile::tempdir;

    /// Upper bound on how long a test waits for the spawned agent server to accept connections.
    const SERVER_STARTUP_TIMEOUT: Duration = Duration::from_secs(5);

    /// Pause between readiness checks while waiting for the server to come up.
    const SERVER_POLL_INTERVAL: Duration = Duration::from_millis(10);

    /// Builds a `GET` request-headers event for `uri` with fixed test metadata and no headers.
    ///
    /// Shared by the socket round-trip tests so that they differ only in the URI under test.
    fn request_headers_event(uri: &str) -> RequestHeadersEvent {
        RequestHeadersEvent {
            metadata: RequestMetadata {
                correlation_id: "test-123".to_string(),
                request_id: "req-456".to_string(),
                client_ip: "127.0.0.1".to_string(),
                client_port: 12345,
                server_name: Some("example.com".to_string()),
                protocol: "HTTP/1.1".to_string(),
                tls_version: None,
                tls_cipher: None,
                route_id: Some("default".to_string()),
                upstream_id: Some("backend".to_string()),
                timestamp: chrono::Utc::now().to_rfc3339(),
                traceparent: None,
            },
            method: "GET".to_string(),
            uri: uri.to_string(),
            headers: HashMap::new(),
        }
    }

    /// Connects a test client to the agent server listening on `socket_path`.
    ///
    /// The server runs in a background task, so the socket may not exist yet when this is
    /// called. Instead of sleeping for a fixed interval (which is both slow and racy on a loaded
    /// machine), this polls until the socket file appears and a connection succeeds.
    ///
    /// # Panics
    ///
    /// Panics if no connection could be established within [`SERVER_STARTUP_TIMEOUT`].
    async fn connect_client(socket_path: &Path) -> AgentClient {
        let deadline = Instant::now() + SERVER_STARTUP_TIMEOUT;
        let mut last_error = None;

        loop {
            // Only attempt to connect once the server has created its socket file.
            if socket_path.exists() {
                match AgentClient::unix_socket("test-client", socket_path, Duration::from_secs(5))
                    .await
                {
                    Ok(client) => return client,
                    Err(err) => last_error = Some(err),
                }
            }

            assert!(
                Instant::now() < deadline,
                "agent server at {} not ready within {:?} (last connect error: {:?})",
                socket_path.display(),
                SERVER_STARTUP_TIMEOUT,
                last_error
            );
            tokio::time::sleep(SERVER_POLL_INTERVAL).await;
        }
    }

    /// Round-trips a request-headers event through `EchoAgent` over a Unix socket.
    #[tokio::test]
    async fn test_agent_protocol_echo() {
        let dir = tempdir().unwrap();
        let socket_path = dir.path().join("test.sock");

        // Start echo agent server
        let server = AgentServer::new("test-echo", socket_path.clone(), Box::new(EchoAgent));
        let server_handle = tokio::spawn(async move {
            server.run().await.unwrap();
        });

        // Connect client (waits for the server to be ready)
        let mut client = connect_client(&socket_path).await;

        // Send request headers event
        let event = request_headers_event("/test");
        let response = client
            .send_event(EventType::RequestHeaders, &event)
            .await
            .unwrap();

        // Check response
        assert_eq!(response.decision, Decision::Allow);
        assert_eq!(response.request_headers.len(), 1);

        // Clean up
        client.close().await.unwrap();
        server_handle.abort();
    }

    /// Verifies that `DenylistAgent` blocks a request whose path matches a denied prefix.
    #[tokio::test]
    async fn test_agent_protocol_denylist() {
        let dir = tempdir().unwrap();
        let socket_path = dir.path().join("denylist.sock");

        // Start denylist agent server
        let agent = DenylistAgent::new(vec!["/admin".to_string()], vec!["10.0.0.1".to_string()]);
        let server = AgentServer::new("test-denylist", socket_path.clone(), Box::new(agent));
        let server_handle = tokio::spawn(async move {
            server.run().await.unwrap();
        });

        // Connect client (waits for the server to be ready)
        let mut client = connect_client(&socket_path).await;

        // Test blocked path
        let event = request_headers_event("/admin/secret");
        let response = client
            .send_event(EventType::RequestHeaders, &event)
            .await
            .unwrap();

        // Check response is blocked
        match response.decision {
            Decision::Block { status, .. } => assert_eq!(status, 403),
            other => panic!("Expected block decision, got {:?}", other),
        }

        // Clean up
        client.close().await.unwrap();
        server_handle.abort();
    }

    /// Exercises the three `BodyMutation` constructors and their predicate helpers.
    #[test]
    fn test_body_mutation_types() {
        // Test pass-through mutation
        let pass_through = BodyMutation::pass_through(0);
        assert!(pass_through.is_pass_through());
        assert!(!pass_through.is_drop());
        assert_eq!(pass_through.chunk_index, 0);

        // Test drop mutation (named `dropped` to avoid shadowing the prelude's `drop`)
        let dropped = BodyMutation::drop_chunk(1);
        assert!(!dropped.is_pass_through());
        assert!(dropped.is_drop());
        assert_eq!(dropped.chunk_index, 1);

        // Test replace mutation
        let replace = BodyMutation::replace(2, "modified content".to_string());
        assert!(!replace.is_pass_through());
        assert!(!replace.is_drop());
        assert_eq!(replace.chunk_index, 2);
        assert_eq!(replace.data.as_deref(), Some("modified content"));
    }

    /// Checks the streaming-related builders on `AgentResponse`.
    #[test]
    fn test_agent_response_streaming() {
        // Test needs_more_data response
        let response = AgentResponse::needs_more_data();
        assert!(response.needs_more);
        assert_eq!(response.decision, Decision::Allow);

        // Test response with body mutation (the mutation is moved; it is not needed afterwards)
        let mutation = BodyMutation::replace(0, "new content".to_string());
        let response = AgentResponse::default_allow().with_request_body_mutation(mutation);
        assert!(!response.needs_more);
        let applied = response
            .request_body_mutation
            .expect("request body mutation should be set by with_request_body_mutation");
        assert_eq!(applied.data.as_deref(), Some("new content"));

        // Test set_needs_more
        let response = AgentResponse::default_allow().set_needs_more(true);
        assert!(response.needs_more);
    }
}