sentinel_agent_protocol/
lib.rs

1// Allow large enum variants in generated protobuf code
2#![allow(clippy::large_enum_variant)]
3
4//! Agent protocol for Sentinel proxy
5//!
6//! This crate defines the protocol for communication between the proxy dataplane
7//! and external processing agents (WAF, auth, rate limiting, custom logic).
8//!
9//! The protocol is inspired by SPOE (Stream Processing Offload Engine) and Envoy's ext_proc,
10//! designed for bounded, predictable behavior with strong failure isolation.
11//!
12//! # Architecture
13//!
14//! - [`AgentClient`]: Client for sending events to agents from the proxy
15//! - [`AgentServer`]: Server for implementing agent handlers
16//! - [`AgentHandler`]: Trait for implementing agent logic
17//! - [`AgentResponse`]: Response from agent with decision and mutations
18//!
19//! # Transports
20//!
21//! Two transport options are supported:
22//!
23//! ## Unix Domain Sockets (Default)
24//! Messages are length-prefixed JSON:
25//! - 4-byte big-endian length prefix
26//! - JSON payload (max 10MB)
27//!
28//! ## gRPC
29//! Binary protocol using Protocol Buffers over HTTP/2:
30//! - Better performance for high-throughput scenarios
31//! - Native support for TLS/mTLS
32//! - Language-agnostic (agents can be written in any language with gRPC support)
33//!
34//! # Example: Client Usage (Unix Socket)
35//!
36//! ```ignore
37//! use sentinel_agent_protocol::{AgentClient, EventType, RequestHeadersEvent};
38//!
39//! let mut client = AgentClient::unix_socket("my-agent", "/tmp/agent.sock", timeout).await?;
40//! let response = client.send_event(EventType::RequestHeaders, &event).await?;
41//! ```
42//!
43//! # Example: Client Usage (gRPC)
44//!
45//! ```ignore
46//! use sentinel_agent_protocol::{AgentClient, EventType, RequestHeadersEvent};
47//!
48//! let mut client = AgentClient::grpc("my-agent", "http://localhost:50051", timeout).await?;
49//! let response = client.send_event(EventType::RequestHeaders, &event).await?;
50//! ```
51//!
52//! # Example: Server Implementation
53//!
54//! ```ignore
//! use sentinel_agent_protocol::{AgentServer, AgentHandler, AgentResponse, RequestHeadersEvent};
56//!
57//! struct MyAgent;
58//!
59//! #[async_trait]
60//! impl AgentHandler for MyAgent {
61//!     async fn on_request_headers(&self, event: RequestHeadersEvent) -> AgentResponse {
62//!         // Implement your logic here
63//!         AgentResponse::default_allow()
64//!     }
65//! }
66//!
67//! let server = AgentServer::new("my-agent", "/tmp/agent.sock", Box::new(MyAgent));
68//! server.run().await?;
69//! ```
70
71#![allow(dead_code)]
72
73mod client;
74mod errors;
75mod protocol;
76mod server;
77
/// gRPC protocol definitions generated from proto/agent.proto
///
/// The contents of this module are not written by hand: they are pulled in by
/// `tonic::include_proto!` for the `sentinel.agent.v1` protobuf package, so the
/// available items mirror whatever that package declares.
pub mod grpc {
    // Includes the generated Rust source for the `sentinel.agent.v1` package.
    tonic::include_proto!("sentinel.agent.v1");
}
82
83// Re-export error types
84pub use errors::AgentProtocolError;
85
86// Re-export protocol types
87pub use protocol::{
88    AgentRequest, AgentResponse, AuditMetadata, BodyMutation, ConfigureEvent, Decision, EventType,
89    HeaderOp, RequestBodyChunkEvent, RequestCompleteEvent, RequestHeadersEvent, RequestMetadata,
90    ResponseBodyChunkEvent, ResponseHeadersEvent, WebSocketDecision, WebSocketFrameEvent,
91    WebSocketOpcode, MAX_MESSAGE_SIZE, PROTOCOL_VERSION,
92};
93
94// Re-export client
95pub use client::AgentClient;
96
97// Re-export server and handler
98pub use server::{
99    AgentHandler, AgentServer, DenylistAgent, EchoAgent, GrpcAgentHandler, GrpcAgentServer,
100};
101
#[cfg(test)]
mod tests {
    // Socket round-trip tests for the bundled example agents, plus unit tests
    // for the `BodyMutation` / `AgentResponse` builder helpers.

    use super::*;
    use std::collections::HashMap;
    use std::path::Path;
    use std::time::{Duration, Instant};
    use tempfile::tempdir;

    /// Timeout value handed to `AgentClient::unix_socket`.
    const CLIENT_TIMEOUT: Duration = Duration::from_secs(5);

    /// Upper bound on how long a test keeps retrying the initial connection.
    const CONNECT_DEADLINE: Duration = Duration::from_secs(5);

    /// Pause between connection attempts while the server task is starting.
    const CONNECT_RETRY_INTERVAL: Duration = Duration::from_millis(10);

    /// Runs `server` on a background task.
    ///
    /// The task panics if `run` returns an error; callers abort the returned
    /// handle once the test is finished.
    fn spawn_server(server: AgentServer) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            server.run().await.unwrap();
        })
    }

    /// Connects a client to the agent listening on `socket_path`.
    ///
    /// The server is started on a separate task, so the socket may not be ready
    /// yet when this is called. Instead of sleeping for a fixed amount of time
    /// (too short on a loaded machine, wasted time on a fast one), the
    /// connection is retried until it succeeds or `CONNECT_DEADLINE` elapses.
    ///
    /// # Panics
    ///
    /// Panics with the last connection error once the deadline has passed.
    async fn connect_client(socket_path: &Path) -> AgentClient {
        let give_up_at = Instant::now() + CONNECT_DEADLINE;
        loop {
            let err =
                match AgentClient::unix_socket("test-client", socket_path, CLIENT_TIMEOUT).await {
                    Ok(client) => return client,
                    Err(err) => err,
                };
            assert!(
                Instant::now() < give_up_at,
                "could not connect to agent at {}: {err:?}",
                socket_path.display()
            );
            // Release the error before yielding so nothing is held across the await.
            drop(err);
            tokio::time::sleep(CONNECT_RETRY_INTERVAL).await;
        }
    }

    /// Builds the `GET` request-headers event shared by the socket tests.
    ///
    /// Only the request URI differs between tests; every other field is a fixed
    /// fixture value.
    fn request_headers_event(uri: &str) -> RequestHeadersEvent {
        RequestHeadersEvent {
            metadata: RequestMetadata {
                correlation_id: "test-123".to_string(),
                request_id: "req-456".to_string(),
                client_ip: "127.0.0.1".to_string(),
                client_port: 12345,
                server_name: Some("example.com".to_string()),
                protocol: "HTTP/1.1".to_string(),
                tls_version: None,
                tls_cipher: None,
                route_id: Some("default".to_string()),
                upstream_id: Some("backend".to_string()),
                timestamp: chrono::Utc::now().to_rfc3339(),
                traceparent: None,
            },
            method: "GET".to_string(),
            uri: uri.to_string(),
            headers: HashMap::new(),
        }
    }

    #[tokio::test]
    async fn test_agent_protocol_echo() {
        let dir = tempdir().unwrap();
        let socket_path = dir.path().join("test.sock");

        // Start echo agent server
        let server = AgentServer::new("test-echo", socket_path.clone(), Box::new(EchoAgent));
        let server_handle = spawn_server(server);

        // Connect client (retries until the server is accepting connections)
        let mut client = connect_client(&socket_path).await;

        // Send request headers event
        let event = request_headers_event("/test");
        let response = client
            .send_event(EventType::RequestHeaders, &event)
            .await
            .unwrap();

        // Check response
        assert_eq!(response.decision, Decision::Allow);
        assert_eq!(response.request_headers.len(), 1);

        // Clean up
        client.close().await.unwrap();
        server_handle.abort();
    }

    #[tokio::test]
    async fn test_agent_protocol_denylist() {
        let dir = tempdir().unwrap();
        let socket_path = dir.path().join("denylist.sock");

        // Start denylist agent server
        let agent = DenylistAgent::new(vec!["/admin".to_string()], vec!["10.0.0.1".to_string()]);
        let server = AgentServer::new("test-denylist", socket_path.clone(), Box::new(agent));
        let server_handle = spawn_server(server);

        // Connect client (retries until the server is accepting connections)
        let mut client = connect_client(&socket_path).await;

        // Test blocked path
        let event = request_headers_event("/admin/secret");
        let response = client
            .send_event(EventType::RequestHeaders, &event)
            .await
            .unwrap();

        // Check response is blocked; report the actual decision on mismatch
        match response.decision {
            Decision::Block { status, .. } => assert_eq!(status, 403),
            other => panic!("Expected block decision, got {other:?}"),
        }

        // Clean up
        client.close().await.unwrap();
        server_handle.abort();
    }

    #[test]
    fn test_body_mutation_types() {
        // Test pass-through mutation
        let pass_through = BodyMutation::pass_through(0);
        assert!(pass_through.is_pass_through());
        assert!(!pass_through.is_drop());
        assert_eq!(pass_through.chunk_index, 0);

        // Test drop mutation
        let drop = BodyMutation::drop_chunk(1);
        assert!(!drop.is_pass_through());
        assert!(drop.is_drop());
        assert_eq!(drop.chunk_index, 1);

        // Test replace mutation
        let replace = BodyMutation::replace(2, "modified content".to_string());
        assert!(!replace.is_pass_through());
        assert!(!replace.is_drop());
        assert_eq!(replace.chunk_index, 2);
        assert_eq!(replace.data, Some("modified content".to_string()));
    }

    #[test]
    fn test_agent_response_streaming() {
        // Test needs_more_data response
        let response = AgentResponse::needs_more_data();
        assert!(response.needs_more);
        assert_eq!(response.decision, Decision::Allow);

        // Test response with body mutation (the mutation is moved in; it is not reused)
        let mutation = BodyMutation::replace(0, "new content".to_string());
        let response = AgentResponse::default_allow().with_request_body_mutation(mutation);
        assert!(!response.needs_more);
        assert!(response.request_body_mutation.is_some());
        assert_eq!(
            response.request_body_mutation.unwrap().data,
            Some("new content".to_string())
        );

        // Test set_needs_more
        let response = AgentResponse::default_allow().set_needs_more(true);
        assert!(response.needs_more);
    }
}