sentinel_agent_protocol/lib.rs
1// Allow large enum variants in generated protobuf code
2#![allow(clippy::large_enum_variant)]
3
4//! Agent protocol for Sentinel proxy
5//!
6//! This crate defines the protocol for communication between the proxy dataplane
7//! and external processing agents (WAF, auth, rate limiting, custom logic).
8//!
9//! The protocol is inspired by SPOE (Stream Processing Offload Engine) and Envoy's ext_proc,
10//! designed for bounded, predictable behavior with strong failure isolation.
11//!
12//! # Architecture
13//!
14//! - [`AgentClient`]: Client for sending events to agents from the proxy
15//! - [`AgentServer`]: Server for implementing agent handlers
16//! - [`AgentHandler`]: Trait for implementing agent logic
17//! - [`AgentResponse`]: Response from agent with decision and mutations
18//!
19//! # Transports
20//!
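//! Two transports are described in detail below; an HTTP REST transport is also
//! available (see the HTTP REST client example further down).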
22//!
23//! ## Unix Domain Sockets (Default)
24//! Messages are length-prefixed JSON:
25//! - 4-byte big-endian length prefix
26//! - JSON payload (max 10MB)
27//!
28//! ## gRPC
29//! Binary protocol using Protocol Buffers over HTTP/2:
30//! - Better performance for high-throughput scenarios
31//! - Native support for TLS/mTLS
32//! - Language-agnostic (agents can be written in any language with gRPC support)
33//!
34//! # Example: Client Usage (Unix Socket)
35//!
36//! ```ignore
37//! use sentinel_agent_protocol::{AgentClient, EventType, RequestHeadersEvent};
38//!
39//! let mut client = AgentClient::unix_socket("my-agent", "/tmp/agent.sock", timeout).await?;
40//! let response = client.send_event(EventType::RequestHeaders, &event).await?;
41//! ```
42//!
43//! # Example: Client Usage (gRPC)
44//!
45//! ```ignore
46//! use sentinel_agent_protocol::{AgentClient, EventType, RequestHeadersEvent};
47//!
48//! let mut client = AgentClient::grpc("my-agent", "http://localhost:50051", timeout).await?;
49//! let response = client.send_event(EventType::RequestHeaders, &event).await?;
50//! ```
51//!
52//! # Example: Client Usage (gRPC with TLS)
53//!
54//! ```ignore
55//! use sentinel_agent_protocol::{AgentClient, GrpcTlsConfig, EventType, RequestHeadersEvent};
56//!
//! // Simple TLS (server verification only)
//! let tls_config = GrpcTlsConfig::new()
//!     .with_ca_cert_file("/etc/sentinel/certs/ca.crt").await?;
//!
//! let mut client = AgentClient::grpc_tls(
//!     "my-agent",
//!     "https://agent.internal:50051",
//!     timeout,
//!     tls_config
//! ).await?;
//!
//! // mTLS (mutual authentication)
//! let tls_config = GrpcTlsConfig::new()
//!     .with_ca_cert_file("/etc/sentinel/certs/ca.crt").await?
//!     .with_client_cert_files(
//!         "/etc/sentinel/certs/client.crt",
//!         "/etc/sentinel/certs/client.key"
//!     ).await?;
//!
//! let mut client = AgentClient::grpc_tls("my-agent", "https://agent.internal:50051", timeout, tls_config).await?;
77//! ```
78//!
79//! # Example: Client Usage (HTTP REST)
80//!
81//! ```ignore
82//! use sentinel_agent_protocol::{AgentClient, HttpTlsConfig, EventType, RequestHeadersEvent};
83//!
84//! // Plain HTTP
85//! let mut client = AgentClient::http("my-agent", "http://localhost:8080/agent", timeout).await?;
86//! let response = client.send_event(EventType::RequestHeaders, &event).await?;
87//!
//! // HTTPS with TLS
//! let tls_config = HttpTlsConfig::new()
//!     .with_ca_cert_file("/etc/sentinel/certs/ca.crt").await?;
//!
//! let mut client = AgentClient::http_tls(
//!     "my-agent",
//!     "https://agent.internal:8443/agent",
//!     timeout,
//!     tls_config
//! ).await?;
98//! ```
99//!
100//! # Example: Server Implementation
101//!
102//! ```ignore
//! use sentinel_agent_protocol::{AgentServer, AgentHandler, AgentResponse, RequestHeadersEvent};
//!
//! struct MyAgent;
//!
//! #[async_trait]
//! impl AgentHandler for MyAgent {
//!     async fn on_request_headers(&self, event: RequestHeadersEvent) -> AgentResponse {
//!         // Implement your logic here
//!         AgentResponse::default_allow()
//!     }
//! }
//!
//! let server = AgentServer::new("my-agent", "/tmp/agent.sock", Box::new(MyAgent));
//! server.run().await?;
117//! ```
118
119#![allow(dead_code)]
120
121mod client;
122mod errors;
123mod protocol;
124mod server;
125
126/// gRPC protocol definitions generated from proto/agent.proto
127pub mod grpc {
128 tonic::include_proto!("sentinel.agent.v1");
129}
130
131// Re-export error types
132pub use errors::AgentProtocolError;
133
134// Re-export protocol types
135pub use protocol::{
136 AgentRequest, AgentResponse, AuditMetadata, BodyMutation, ConfigureEvent, Decision,
137 DetectionSeverity, EventType, GuardrailDetection, GuardrailInspectEvent,
138 GuardrailInspectionType, GuardrailResponse, HeaderOp, RequestBodyChunkEvent,
139 RequestCompleteEvent, RequestHeadersEvent, RequestMetadata, ResponseBodyChunkEvent,
140 ResponseHeadersEvent, TextSpan, WebSocketDecision, WebSocketFrameEvent, WebSocketOpcode,
141 MAX_MESSAGE_SIZE, PROTOCOL_VERSION,
142};
143
144// Re-export client
145pub use client::{AgentClient, GrpcTlsConfig, HttpTlsConfig};
146
147// Re-export server and handler
148pub use server::{
149 AgentHandler, AgentServer, DenylistAgent, EchoAgent, GrpcAgentHandler, GrpcAgentServer,
150};
151
#[cfg(test)]
mod tests {
    //! End-to-end tests for the Unix-socket transport plus unit tests for the
    //! response/mutation builder helpers.

    use super::*;
    use std::collections::HashMap;
    use std::time::Duration;
    use tempfile::tempdir;

    /// Grace period given to a freshly spawned server task before the client
    /// tries to connect to its socket.
    const SERVER_STARTUP_DELAY: Duration = Duration::from_millis(100);

    /// Timeout passed to the client when it is created.
    const CLIENT_TIMEOUT: Duration = Duration::from_secs(5);

    /// Builds the request-headers event shared by the socket tests.
    ///
    /// Every field is fixed except `uri`, which is what the individual tests
    /// vary (e.g. to hit a denylisted path).
    fn request_headers_event(uri: &str) -> RequestHeadersEvent {
        RequestHeadersEvent {
            metadata: RequestMetadata {
                correlation_id: "test-123".to_string(),
                request_id: "req-456".to_string(),
                client_ip: "127.0.0.1".to_string(),
                client_port: 12345,
                server_name: Some("example.com".to_string()),
                protocol: "HTTP/1.1".to_string(),
                tls_version: None,
                tls_cipher: None,
                route_id: Some("default".to_string()),
                upstream_id: Some("backend".to_string()),
                timestamp: chrono::Utc::now().to_rfc3339(),
                traceparent: None,
            },
            method: "GET".to_string(),
            uri: uri.to_string(),
            headers: HashMap::new(),
        }
    }

    /// Round-trips a request-headers event through an `EchoAgent` server and
    /// checks that it is allowed with exactly one request header operation.
    #[tokio::test]
    async fn test_agent_protocol_echo() {
        let dir = tempdir().unwrap();
        let socket_path = dir.path().join("test.sock");

        // Start echo agent server in the background.
        let server = AgentServer::new("test-echo", socket_path.clone(), Box::new(EchoAgent));
        let server_handle = tokio::spawn(async move {
            server.run().await.unwrap();
        });

        // Give server time to start before connecting.
        tokio::time::sleep(SERVER_STARTUP_DELAY).await;

        let mut client = AgentClient::unix_socket("test-client", &socket_path, CLIENT_TIMEOUT)
            .await
            .unwrap();

        let event = request_headers_event("/test");
        let response = client
            .send_event(EventType::RequestHeaders, &event)
            .await
            .unwrap();

        assert_eq!(response.decision, Decision::Allow);
        assert_eq!(response.request_headers.len(), 1);

        // Clean up: close the connection, then stop the server task.
        client.close().await.unwrap();
        server_handle.abort();
    }

    /// Sends a request for a denylisted path to a `DenylistAgent` server and
    /// checks that it is blocked with status 403.
    #[tokio::test]
    async fn test_agent_protocol_denylist() {
        let dir = tempdir().unwrap();
        let socket_path = dir.path().join("denylist.sock");

        // Start denylist agent server in the background.
        let agent = DenylistAgent::new(vec!["/admin".to_string()], vec!["10.0.0.1".to_string()]);
        let server = AgentServer::new("test-denylist", socket_path.clone(), Box::new(agent));
        let server_handle = tokio::spawn(async move {
            server.run().await.unwrap();
        });

        // Give server time to start before connecting.
        tokio::time::sleep(SERVER_STARTUP_DELAY).await;

        let mut client = AgentClient::unix_socket("test-client", &socket_path, CLIENT_TIMEOUT)
            .await
            .unwrap();

        // The URI starts with the denylisted "/admin" entry.
        let event = request_headers_event("/admin/secret");
        let response = client
            .send_event(EventType::RequestHeaders, &event)
            .await
            .unwrap();

        // Report the decision actually received so a failure is diagnosable.
        match response.decision {
            Decision::Block { status, .. } => assert_eq!(status, 403),
            other => panic!("Expected block decision, got {:?}", other),
        }

        // Clean up: close the connection, then stop the server task.
        client.close().await.unwrap();
        server_handle.abort();
    }

    /// Checks the three `BodyMutation` constructors and their predicates.
    #[test]
    fn test_body_mutation_types() {
        // Pass-through mutation
        let pass_through = BodyMutation::pass_through(0);
        assert!(pass_through.is_pass_through());
        assert!(!pass_through.is_drop());
        assert_eq!(pass_through.chunk_index, 0);

        // Drop mutation (named `dropped` to avoid shadowing the prelude `drop`)
        let dropped = BodyMutation::drop_chunk(1);
        assert!(!dropped.is_pass_through());
        assert!(dropped.is_drop());
        assert_eq!(dropped.chunk_index, 1);

        // Replace mutation
        let replaced = BodyMutation::replace(2, "modified content".to_string());
        assert!(!replaced.is_pass_through());
        assert!(!replaced.is_drop());
        assert_eq!(replaced.chunk_index, 2);
        assert_eq!(replaced.data, Some("modified content".to_string()));
    }

    /// Checks the streaming-related `AgentResponse` builders.
    #[test]
    fn test_agent_response_streaming() {
        // needs_more_data response
        let response = AgentResponse::needs_more_data();
        assert!(response.needs_more);
        assert_eq!(response.decision, Decision::Allow);

        // Response with body mutation; the mutation is moved in, no clone needed.
        let mutation = BodyMutation::replace(0, "new content".to_string());
        let response = AgentResponse::default_allow().with_request_body_mutation(mutation);
        assert!(!response.needs_more);
        assert!(response.request_body_mutation.is_some());
        assert_eq!(
            response.request_body_mutation.unwrap().data,
            Some("new content".to_string())
        );

        // set_needs_more
        let response = AgentResponse::default_allow().set_needs_more(true);
        assert!(response.needs_more);
    }
}
317}