sentinel_agent_protocol/lib.rs
1// Allow large enum variants in generated protobuf code
2#![allow(clippy::large_enum_variant)]
3
4//! Agent protocol for Sentinel proxy
5//!
6//! This crate defines the protocol for communication between the proxy dataplane
7//! and external processing agents (WAF, auth, rate limiting, custom logic).
8//!
9//! The protocol is inspired by SPOE (Stream Processing Offload Engine) and Envoy's ext_proc,
10//! designed for bounded, predictable behavior with strong failure isolation.
11//!
12//! # Architecture
13//!
14//! - [`AgentClient`]: Client for sending events to agents from the proxy
15//! - [`AgentServer`]: Server for implementing agent handlers
16//! - [`AgentHandler`]: Trait for implementing agent logic
17//! - [`AgentResponse`]: Response from agent with decision and mutations
18//!
19//! # Transports
20//!
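//! Two transports are described in detail below. The client examples further down
//! also show an HTTP REST transport (`AgentClient::http` / `AgentClient::http_tls`).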
22//!
23//! ## Unix Domain Sockets (Default)
24//! Messages are length-prefixed JSON:
25//! - 4-byte big-endian length prefix
26//! - JSON payload (max 10MB)
27//!
28//! ## gRPC
29//! Binary protocol using Protocol Buffers over HTTP/2:
30//! - Better performance for high-throughput scenarios
31//! - Native support for TLS/mTLS
32//! - Language-agnostic (agents can be written in any language with gRPC support)
33//!
34//! # Example: Client Usage (Unix Socket)
35//!
36//! ```ignore
37//! use sentinel_agent_protocol::{AgentClient, EventType, RequestHeadersEvent};
38//!
39//! let mut client = AgentClient::unix_socket("my-agent", "/tmp/agent.sock", timeout).await?;
40//! let response = client.send_event(EventType::RequestHeaders, &event).await?;
41//! ```
42//!
43//! # Example: Client Usage (gRPC)
44//!
45//! ```ignore
46//! use sentinel_agent_protocol::{AgentClient, EventType, RequestHeadersEvent};
47//!
48//! let mut client = AgentClient::grpc("my-agent", "http://localhost:50051", timeout).await?;
49//! let response = client.send_event(EventType::RequestHeaders, &event).await?;
50//! ```
51//!
52//! # Example: Client Usage (gRPC with TLS)
53//!
//! ```ignore
//! use sentinel_agent_protocol::{AgentClient, GrpcTlsConfig, EventType, RequestHeadersEvent};
//!
//! // Simple TLS (server verification only)
//! let tls_config = GrpcTlsConfig::new()
//!     .with_ca_cert_file("/etc/sentinel/certs/ca.crt").await?;
//!
//! let mut client = AgentClient::grpc_tls(
//!     "my-agent",
//!     "https://agent.internal:50051",
//!     timeout,
//!     tls_config
//! ).await?;
//!
//! // mTLS (mutual authentication)
//! let tls_config = GrpcTlsConfig::new()
//!     .with_ca_cert_file("/etc/sentinel/certs/ca.crt").await?
//!     .with_client_cert_files(
//!         "/etc/sentinel/certs/client.crt",
//!         "/etc/sentinel/certs/client.key"
//!     ).await?;
//!
//! let mut client = AgentClient::grpc_tls("my-agent", "https://agent.internal:50051", timeout, tls_config).await?;
//! ```
78//!
79//! # Example: Client Usage (HTTP REST)
80//!
//! ```ignore
//! use sentinel_agent_protocol::{AgentClient, HttpTlsConfig, EventType, RequestHeadersEvent};
//!
//! // Plain HTTP
//! let mut client = AgentClient::http("my-agent", "http://localhost:8080/agent", timeout).await?;
//! let response = client.send_event(EventType::RequestHeaders, &event).await?;
//!
//! // HTTPS with TLS
//! let tls_config = HttpTlsConfig::new()
//!     .with_ca_cert_file("/etc/sentinel/certs/ca.crt").await?;
//!
//! let mut client = AgentClient::http_tls(
//!     "my-agent",
//!     "https://agent.internal:8443/agent",
//!     timeout,
//!     tls_config
//! ).await?;
//! ```
99//!
100//! # Example: Server Implementation
101//!
//! ```ignore
//! use async_trait::async_trait;
//! use sentinel_agent_protocol::{AgentHandler, AgentResponse, AgentServer, RequestHeadersEvent};
//!
//! struct MyAgent;
//!
//! #[async_trait]
//! impl AgentHandler for MyAgent {
//!     async fn on_request_headers(&self, event: RequestHeadersEvent) -> AgentResponse {
//!         // Implement your logic here
//!         AgentResponse::default_allow()
//!     }
//! }
//!
//! let server = AgentServer::new("my-agent", "/tmp/agent.sock", Box::new(MyAgent));
//! server.run().await?;
//! ```
118
119#![allow(dead_code)]
120
121pub mod binary;
122pub mod buffer_pool;
123mod client;
124mod errors;
125pub mod headers;
126#[cfg(feature = "mmap-buffers")]
127pub mod mmap_buffer;
128mod protocol;
129mod server;
130
131/// gRPC protocol definitions generated from proto/agent.proto
132pub mod grpc {
133 tonic::include_proto!("sentinel.agent.v1");
134}
135
136/// Protocol v2 types with bidirectional streaming, capabilities, and flow control
137pub mod v2;
138
139/// gRPC v2 protocol definitions generated from proto/agent_v2.proto
140pub mod grpc_v2 {
141 tonic::include_proto!("sentinel.agent.v2");
142}
143
144// Re-export error types
145pub use errors::AgentProtocolError;
146
147// Re-export protocol types
148pub use protocol::{
149 AgentRequest, AgentResponse, AuditMetadata, BinaryRequestBodyChunkEvent,
150 BinaryResponseBodyChunkEvent, BodyMutation, ConfigureEvent, Decision, DetectionSeverity,
151 EventType, GuardrailDetection, GuardrailInspectEvent, GuardrailInspectionType,
152 GuardrailResponse, HeaderOp, RequestBodyChunkEvent, RequestCompleteEvent, RequestHeadersEvent,
153 RequestMetadata, ResponseBodyChunkEvent, ResponseHeadersEvent, TextSpan, WebSocketDecision,
154 WebSocketFrameEvent, WebSocketOpcode, MAX_MESSAGE_SIZE, PROTOCOL_VERSION,
155};
156
157// Re-export client
158pub use client::{AgentClient, GrpcTlsConfig, HttpTlsConfig};
159
160// Re-export server and handler
161pub use server::{
162 AgentHandler, AgentServer, DenylistAgent, EchoAgent, GrpcAgentHandler, GrpcAgentServer,
163};
164
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use std::path::Path;
    use std::time::{Duration, Instant};
    use tempfile::tempdir;

    /// Upper bound on how long a test waits for the spawned agent server to
    /// accept its first connection.
    const SERVER_STARTUP_TIMEOUT: Duration = Duration::from_secs(5);

    /// Pause between connection attempts while the server is still starting.
    const CONNECT_RETRY_INTERVAL: Duration = Duration::from_millis(10);

    /// Builds the `GET` request-headers event shared by the socket tests.
    ///
    /// Only `uri` varies between tests; every other field is fixed sample data.
    fn sample_request_headers(uri: &str) -> RequestHeadersEvent {
        RequestHeadersEvent {
            metadata: RequestMetadata {
                correlation_id: "test-123".to_string(),
                request_id: "req-456".to_string(),
                client_ip: "127.0.0.1".to_string(),
                client_port: 12345,
                server_name: Some("example.com".to_string()),
                protocol: "HTTP/1.1".to_string(),
                tls_version: None,
                tls_cipher: None,
                route_id: Some("default".to_string()),
                upstream_id: Some("backend".to_string()),
                timestamp: chrono::Utc::now().to_rfc3339(),
                traceparent: None,
            },
            method: "GET".to_string(),
            uri: uri.to_string(),
            headers: HashMap::new(),
        }
    }

    /// Connects a test client to the agent at `socket_path`, retrying until the
    /// server is ready.
    ///
    /// The server runs in a separately spawned task, so the socket may not be
    /// accepting connections yet when the test reaches this point. Retrying
    /// replaces a fixed start-up sleep, which is both slower than necessary and
    /// racy on a loaded machine.
    ///
    /// # Panics
    ///
    /// Panics with the most recent connection error once
    /// `SERVER_STARTUP_TIMEOUT` has elapsed without a successful connection.
    async fn connect_when_ready(socket_path: &Path) -> AgentClient {
        let deadline = Instant::now() + SERVER_STARTUP_TIMEOUT;
        loop {
            match AgentClient::unix_socket("test-client", socket_path, Duration::from_secs(5))
                .await
            {
                Ok(client) => return client,
                Err(err) => {
                    assert!(
                        Instant::now() < deadline,
                        "agent server did not accept a connection within {:?}: {:?}",
                        SERVER_STARTUP_TIMEOUT,
                        err
                    );
                    tokio::time::sleep(CONNECT_RETRY_INTERVAL).await;
                }
            }
        }
    }

    /// Round-trips a request-headers event through an `EchoAgent` server over a
    /// Unix socket and checks the decision and the number of request-header
    /// entries in the response.
    #[tokio::test]
    async fn test_agent_protocol_echo() {
        let dir = tempdir().unwrap();
        let socket_path = dir.path().join("test.sock");

        // Start echo agent server in the background.
        let server = AgentServer::new("test-echo", socket_path.clone(), Box::new(EchoAgent));
        let server_handle = tokio::spawn(async move {
            server.run().await.unwrap();
        });

        let mut client = connect_when_ready(&socket_path).await;

        // Send request headers event.
        let event = sample_request_headers("/test");
        let response = client
            .send_event(EventType::RequestHeaders, &event)
            .await
            .unwrap();

        assert_eq!(response.decision, Decision::Allow);
        assert_eq!(response.request_headers.len(), 1);

        // Clean up: close the connection, then stop the background server task.
        client.close().await.unwrap();
        server_handle.abort();
    }

    /// Sends a request for `/admin/secret` to a `DenylistAgent` configured with
    /// `/admin` and expects a block decision carrying status 403.
    #[tokio::test]
    async fn test_agent_protocol_denylist() {
        let dir = tempdir().unwrap();
        let socket_path = dir.path().join("denylist.sock");

        // Start denylist agent server in the background.
        let agent = DenylistAgent::new(vec!["/admin".to_string()], vec!["10.0.0.1".to_string()]);
        let server = AgentServer::new("test-denylist", socket_path.clone(), Box::new(agent));
        let server_handle = tokio::spawn(async move {
            server.run().await.unwrap();
        });

        let mut client = connect_when_ready(&socket_path).await;

        // Test blocked path.
        let event = sample_request_headers("/admin/secret");
        let response = client
            .send_event(EventType::RequestHeaders, &event)
            .await
            .unwrap();

        // Report the decision actually received so a failure is diagnosable.
        match response.decision {
            Decision::Block { status, .. } => assert_eq!(status, 403),
            other => panic!("Expected block decision, got {:?}", other),
        }

        // Clean up: close the connection, then stop the background server task.
        client.close().await.unwrap();
        server_handle.abort();
    }

    /// Checks the predicates and fields of the three `BodyMutation` constructors.
    #[test]
    fn test_body_mutation_types() {
        // Pass-through mutation.
        let pass_through = BodyMutation::pass_through(0);
        assert!(pass_through.is_pass_through());
        assert!(!pass_through.is_drop());
        assert_eq!(pass_through.chunk_index, 0);

        // Drop mutation (named `dropped` to avoid shadowing `std::mem::drop`).
        let dropped = BodyMutation::drop_chunk(1);
        assert!(!dropped.is_pass_through());
        assert!(dropped.is_drop());
        assert_eq!(dropped.chunk_index, 1);

        // Replace mutation.
        let replace = BodyMutation::replace(2, "modified content".to_string());
        assert!(!replace.is_pass_through());
        assert!(!replace.is_drop());
        assert_eq!(replace.chunk_index, 2);
        assert_eq!(replace.data.as_deref(), Some("modified content"));
    }

    /// Checks the streaming-related `AgentResponse` builders: `needs_more_data`,
    /// `with_request_body_mutation` and `set_needs_more`.
    #[test]
    fn test_agent_response_streaming() {
        // needs_more_data response.
        let response = AgentResponse::needs_more_data();
        assert!(response.needs_more);
        assert_eq!(response.decision, Decision::Allow);

        // Response with body mutation; the mutation is moved in, no clone needed.
        let mutation = BodyMutation::replace(0, "new content".to_string());
        let response = AgentResponse::default_allow().with_request_body_mutation(mutation);
        assert!(!response.needs_more);
        let attached = response
            .request_body_mutation
            .expect("request body mutation should be attached");
        assert_eq!(attached.data.as_deref(), Some("new content"));

        // set_needs_more.
        let response = AgentResponse::default_allow().set_needs_more(true);
        assert!(response.needs_more);
    }
}