sentinel_agent_protocol/
lib.rs

1// Allow large enum variants in generated protobuf code
2#![allow(clippy::large_enum_variant)]
3
4//! Agent protocol for Sentinel proxy
5//!
6//! This crate defines the protocol for communication between the proxy dataplane
7//! and external processing agents (WAF, auth, rate limiting, custom logic).
8//!
9//! The protocol is inspired by SPOE (Stream Processing Offload Engine) and Envoy's ext_proc,
10//! designed for bounded, predictable behavior with strong failure isolation.
11//!
12//! # Architecture
13//!
14//! - [`AgentClient`]: Client for sending events to agents from the proxy
15//! - [`AgentServer`]: Server for implementing agent handlers
16//! - [`AgentHandler`]: Trait for implementing agent logic
17//! - [`AgentResponse`]: Response from agent with decision and mutations
18//!
19//! # Transports
20//!
//! Three transport options are supported:
//!
//! ## Unix Domain Sockets (Default)
//! Messages are length-prefixed JSON:
//! - 4-byte big-endian length prefix
//! - JSON payload (max 10MB)
//!
//! ## gRPC
//! Binary protocol using Protocol Buffers over HTTP/2:
//! - Better performance for high-throughput scenarios
//! - Native support for TLS/mTLS
//! - Language-agnostic (agents can be written in any language with gRPC support)
//!
//! ## HTTP REST
//! Events are sent to an agent's HTTP or HTTPS endpoint; see the HTTP REST
//! client example below. TLS is configured with [`HttpTlsConfig`].
33//!
34//! # Example: Client Usage (Unix Socket)
35//!
36//! ```ignore
37//! use sentinel_agent_protocol::{AgentClient, EventType, RequestHeadersEvent};
38//!
39//! let mut client = AgentClient::unix_socket("my-agent", "/tmp/agent.sock", timeout).await?;
40//! let response = client.send_event(EventType::RequestHeaders, &event).await?;
41//! ```
42//!
43//! # Example: Client Usage (gRPC)
44//!
45//! ```ignore
46//! use sentinel_agent_protocol::{AgentClient, EventType, RequestHeadersEvent};
47//!
48//! let mut client = AgentClient::grpc("my-agent", "http://localhost:50051", timeout).await?;
49//! let response = client.send_event(EventType::RequestHeaders, &event).await?;
50//! ```
51//!
52//! # Example: Client Usage (gRPC with TLS)
53//!
54//! ```ignore
55//! use sentinel_agent_protocol::{AgentClient, GrpcTlsConfig, EventType, RequestHeadersEvent};
56//!
57//! // Simple TLS (server verification only)
58//! let tls_config = GrpcTlsConfig::new()
59//!     .with_ca_cert_file("/etc/sentinel/certs/ca.crt").await?;
60//!
61//! let mut client = AgentClient::grpc_tls(
62//!     "my-agent",
63//!     "https://agent.internal:50051",
64//!     timeout,
65//!     tls_config
66//! ).await?;
67//!
68//! // mTLS (mutual authentication)
69//! let tls_config = GrpcTlsConfig::new()
70//!     .with_ca_cert_file("/etc/sentinel/certs/ca.crt").await?
71//!     .with_client_cert_files(
72//!         "/etc/sentinel/certs/client.crt",
73//!         "/etc/sentinel/certs/client.key"
74//!     ).await?;
75//!
76//! let mut client = AgentClient::grpc_tls("my-agent", "https://agent.internal:50051", timeout, tls_config).await?;
77//! ```
78//!
79//! # Example: Client Usage (HTTP REST)
80//!
81//! ```ignore
82//! use sentinel_agent_protocol::{AgentClient, HttpTlsConfig, EventType, RequestHeadersEvent};
83//!
84//! // Plain HTTP
85//! let mut client = AgentClient::http("my-agent", "http://localhost:8080/agent", timeout).await?;
86//! let response = client.send_event(EventType::RequestHeaders, &event).await?;
87//!
88//! // HTTPS with TLS
89//! let tls_config = HttpTlsConfig::new()
90//!     .with_ca_cert_file("/etc/sentinel/certs/ca.crt").await?;
91//!
92//! let mut client = AgentClient::http_tls(
93//!     "my-agent",
94//!     "https://agent.internal:8443/agent",
95//!     timeout,
96//!     tls_config
97//! ).await?;
98//! ```
99//!
100//! # Example: Server Implementation
101//!
102//! ```ignore
//! use async_trait::async_trait;
//! use sentinel_agent_protocol::{AgentHandler, AgentResponse, AgentServer, RequestHeadersEvent};
104//!
105//! struct MyAgent;
106//!
107//! #[async_trait]
108//! impl AgentHandler for MyAgent {
109//!     async fn on_request_headers(&self, event: RequestHeadersEvent) -> AgentResponse {
110//!         // Implement your logic here
111//!         AgentResponse::default_allow()
112//!     }
113//! }
114//!
115//! let server = AgentServer::new("my-agent", "/tmp/agent.sock", Box::new(MyAgent));
116//! server.run().await?;
117//! ```
118
// NOTE(review): this suppresses `dead_code` warnings for the whole crate and
// the reason is not recorded here. Confirm it is still required, or narrow it
// to the specific items that need it so genuinely unused code is reported.
#![allow(dead_code)]
120
121mod client;
122mod errors;
123mod protocol;
124mod server;
125
/// gRPC protocol definitions generated from proto/agent.proto
///
/// Nothing in this module is written by hand: its entire contents are the
/// code that `tonic::include_proto!` pulls in for the `sentinel.agent.v1`
/// protobuf package.
pub mod grpc {
    // Expands to the generated Rust code for the `sentinel.agent.v1` package.
    tonic::include_proto!("sentinel.agent.v1");
}
130
131// Re-export error types
132pub use errors::AgentProtocolError;
133
134// Re-export protocol types
135pub use protocol::{
136    AgentRequest, AgentResponse, AuditMetadata, BodyMutation, ConfigureEvent, Decision,
137    DetectionSeverity, EventType, GuardrailDetection, GuardrailInspectEvent,
138    GuardrailInspectionType, GuardrailResponse, HeaderOp, RequestBodyChunkEvent,
139    RequestCompleteEvent, RequestHeadersEvent, RequestMetadata, ResponseBodyChunkEvent,
140    ResponseHeadersEvent, TextSpan, WebSocketDecision, WebSocketFrameEvent, WebSocketOpcode,
141    MAX_MESSAGE_SIZE, PROTOCOL_VERSION,
142};
143
144// Re-export client
145pub use client::{AgentClient, GrpcTlsConfig, HttpTlsConfig};
146
147// Re-export server and handler
148pub use server::{
149    AgentHandler, AgentServer, DenylistAgent, EchoAgent, GrpcAgentHandler, GrpcAgentServer,
150};
151
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use std::path::Path;
    use std::time::{Duration, Instant};
    use tempfile::tempdir;

    /// Upper bound on how long a test waits for its agent server to accept connections.
    const SERVER_READY_TIMEOUT: Duration = Duration::from_secs(5);

    /// Pause between connection attempts while the server is still starting.
    const CONNECT_RETRY_INTERVAL: Duration = Duration::from_millis(10);

    /// Timeout handed to the test client when it is created.
    const CLIENT_TIMEOUT: Duration = Duration::from_secs(5);

    /// Builds a `GET` request-headers event for `uri` with fixed sample metadata.
    ///
    /// Both socket round-trip tests send the same event and differ only in the
    /// URI, so the shared fields live here instead of being repeated per test.
    fn request_headers_event(uri: &str) -> RequestHeadersEvent {
        RequestHeadersEvent {
            metadata: RequestMetadata {
                correlation_id: "test-123".to_string(),
                request_id: "req-456".to_string(),
                client_ip: "127.0.0.1".to_string(),
                client_port: 12345,
                server_name: Some("example.com".to_string()),
                protocol: "HTTP/1.1".to_string(),
                tls_version: None,
                tls_cipher: None,
                route_id: Some("default".to_string()),
                upstream_id: Some("backend".to_string()),
                timestamp: chrono::Utc::now().to_rfc3339(),
                traceparent: None,
            },
            method: "GET".to_string(),
            uri: uri.to_string(),
            headers: HashMap::new(),
        }
    }

    /// Connects a test client to the agent listening on `socket_path`.
    ///
    /// The server runs on a spawned task, so the socket may not be accepting
    /// connections yet when this is called. Rather than sleeping for a fixed
    /// interval and hoping the server is up, the connection is retried until it
    /// succeeds.
    ///
    /// # Panics
    ///
    /// Panics with the last connection error if the server is still unreachable
    /// after [`SERVER_READY_TIMEOUT`].
    async fn connect_client(socket_path: &Path) -> AgentClient {
        let deadline = Instant::now() + SERVER_READY_TIMEOUT;
        loop {
            match AgentClient::unix_socket("test-client", socket_path, CLIENT_TIMEOUT).await {
                Ok(client) => return client,
                Err(err) => {
                    assert!(
                        Instant::now() < deadline,
                        "agent server at {} never became reachable: {:?}",
                        socket_path.display(),
                        err
                    );
                    tokio::time::sleep(CONNECT_RETRY_INTERVAL).await;
                }
            }
        }
    }

    /// Round-trips a request-headers event through an `EchoAgent` over a Unix socket.
    #[tokio::test]
    async fn test_agent_protocol_echo() {
        let dir = tempdir().unwrap();
        let socket_path = dir.path().join("test.sock");

        // Start echo agent server
        let server = AgentServer::new("test-echo", socket_path.clone(), Box::new(EchoAgent));
        let server_handle = tokio::spawn(async move {
            server.run().await.unwrap();
        });

        // Connect client (waits for the server to come up)
        let mut client = connect_client(&socket_path).await;

        // Send request headers event
        let event = request_headers_event("/test");
        let response = client
            .send_event(EventType::RequestHeaders, &event)
            .await
            .unwrap();

        // The request must be allowed and carry exactly one request-header operation.
        assert_eq!(response.decision, Decision::Allow);
        assert_eq!(response.request_headers.len(), 1);

        // Clean up
        client.close().await.unwrap();
        server_handle.abort();
    }

    /// Checks that a `DenylistAgent` blocks a URI under a denylisted path with a 403.
    #[tokio::test]
    async fn test_agent_protocol_denylist() {
        let dir = tempdir().unwrap();
        let socket_path = dir.path().join("denylist.sock");

        // Start denylist agent server
        let agent = DenylistAgent::new(vec!["/admin".to_string()], vec!["10.0.0.1".to_string()]);
        let server = AgentServer::new("test-denylist", socket_path.clone(), Box::new(agent));
        let server_handle = tokio::spawn(async move {
            server.run().await.unwrap();
        });

        // Connect client (waits for the server to come up)
        let mut client = connect_client(&socket_path).await;

        // Test blocked path
        let event = request_headers_event("/admin/secret");
        let response = client
            .send_event(EventType::RequestHeaders, &event)
            .await
            .unwrap();

        // Check response is blocked; report the actual decision when it is not.
        match response.decision {
            Decision::Block { status, .. } => assert_eq!(status, 403),
            other => panic!("Expected block decision, got {:?}", other),
        }

        // Clean up
        client.close().await.unwrap();
        server_handle.abort();
    }

    /// Exercises the three `BodyMutation` constructors and their predicates.
    #[test]
    fn test_body_mutation_types() {
        // Test pass-through mutation
        let pass_through = BodyMutation::pass_through(0);
        assert!(pass_through.is_pass_through());
        assert!(!pass_through.is_drop());
        assert_eq!(pass_through.chunk_index, 0);

        // Test drop mutation (named `dropped` so the prelude's `drop` is not shadowed)
        let dropped = BodyMutation::drop_chunk(1);
        assert!(!dropped.is_pass_through());
        assert!(dropped.is_drop());
        assert_eq!(dropped.chunk_index, 1);

        // Test replace mutation
        let replace = BodyMutation::replace(2, "modified content".to_string());
        assert!(!replace.is_pass_through());
        assert!(!replace.is_drop());
        assert_eq!(replace.chunk_index, 2);
        assert_eq!(replace.data, Some("modified content".to_string()));
    }

    /// Covers the streaming-related builders on `AgentResponse`.
    #[test]
    fn test_agent_response_streaming() {
        // Test needs_more_data response
        let response = AgentResponse::needs_more_data();
        assert!(response.needs_more);
        assert_eq!(response.decision, Decision::Allow);

        // Test response with body mutation; the mutation is moved in because it
        // is not needed afterwards.
        let mutation = BodyMutation::replace(0, "new content".to_string());
        let response = AgentResponse::default_allow().with_request_body_mutation(mutation);
        assert!(!response.needs_more);
        let applied = response
            .request_body_mutation
            .expect("request body mutation should be set");
        assert_eq!(applied.data, Some("new content".to_string()));

        // Test set_needs_more
        let response = AgentResponse::default_allow().set_needs_more(true);
        assert!(response.needs_more);
    }
}