sentinel_agent_protocol/
lib.rs

//! Agent protocol for Sentinel proxy
//!
//! This crate defines the protocol for communication between the proxy dataplane
//! and external processing agents (WAF, auth, rate limiting, custom logic).
//!
//! The protocol is inspired by SPOE (Stream Processing Offload Engine) and Envoy's ext_proc,
//! designed for bounded, predictable behavior with strong failure isolation.
//!
//! # Architecture
//!
//! - [`AgentClient`]: Client for sending events to agents from the proxy
//! - [`AgentServer`]: Server for implementing agent handlers
//! - [`AgentHandler`]: Trait for implementing agent logic
//! - [`AgentResponse`]: Response from agent with decision and mutations
//!
//! # Transports
//!
//! Two transport options are supported:
//!
//! ## Unix Domain Sockets (Default)
//! Messages are length-prefixed JSON:
//! - 4-byte big-endian length prefix
//! - JSON payload (max 10MB)
//!
//! ## gRPC
//! Binary protocol using Protocol Buffers over HTTP/2:
//! - Better performance for high-throughput scenarios
//! - Native support for TLS/mTLS
//! - Language-agnostic (agents can be written in any language with gRPC support)
//!
//! # Example: Client Usage (Unix Socket)
//!
//! ```ignore
//! use sentinel_agent_protocol::{AgentClient, EventType, RequestHeadersEvent};
//!
//! let mut client = AgentClient::unix_socket("my-agent", "/tmp/agent.sock", timeout).await?;
//! let response = client.send_event(EventType::RequestHeaders, &event).await?;
//! ```
//!
//! # Example: Client Usage (gRPC)
//!
//! ```ignore
//! use sentinel_agent_protocol::{AgentClient, EventType, RequestHeadersEvent};
//!
//! let mut client = AgentClient::grpc("my-agent", "http://localhost:50051", timeout).await?;
//! let response = client.send_event(EventType::RequestHeaders, &event).await?;
//! ```
//!
//! # Example: Server Implementation
//!
//! ```ignore
//! use sentinel_agent_protocol::{AgentServer, AgentHandler, AgentResponse, RequestHeadersEvent};
//!
//! struct MyAgent;
//!
//! #[async_trait]
//! impl AgentHandler for MyAgent {
//!     async fn on_request_headers(&self, event: RequestHeadersEvent) -> AgentResponse {
//!         // Implement your logic here
//!         AgentResponse::default_allow()
//!     }
//! }
//!
//! let server = AgentServer::new("my-agent", "/tmp/agent.sock", Box::new(MyAgent));
//! server.run().await?;
//! ```

68#![allow(dead_code)]
69
70mod client;
71mod errors;
72mod protocol;
73mod server;
74
75/// gRPC protocol definitions generated from proto/agent.proto
76pub mod grpc {
77    tonic::include_proto!("sentinel.agent.v1");
78}
79
80// Re-export error types
81pub use errors::AgentProtocolError;
82
83// Re-export protocol types
84pub use protocol::{
85    AgentRequest, AgentResponse, AuditMetadata, Decision, EventType, HeaderOp,
86    RequestBodyChunkEvent, RequestCompleteEvent, RequestHeadersEvent, RequestMetadata,
87    ResponseBodyChunkEvent, ResponseHeadersEvent, MAX_MESSAGE_SIZE, PROTOCOL_VERSION,
88};
89
90// Re-export client
91pub use client::AgentClient;
92
93// Re-export server and handler
94pub use server::{AgentHandler, AgentServer, DenylistAgent, EchoAgent, GrpcAgentHandler, GrpcAgentServer};
95
#[cfg(test)]
mod tests {
    // End-to-end tests: each test starts an `AgentServer` on a Unix socket inside a
    // temporary directory, connects an `AgentClient` to it, sends one
    // `RequestHeaders` event and checks the returned decision.

    use super::*;
    use std::collections::HashMap;
    use std::path::Path;
    use std::time::{Duration, Instant};
    use tempfile::tempdir;

    /// Upper bound on how long a test waits for the server's socket path to appear.
    const SERVER_STARTUP_TIMEOUT: Duration = Duration::from_secs(5);

    /// Delay between two checks of the socket path while waiting for the server.
    const SOCKET_POLL_INTERVAL: Duration = Duration::from_millis(10);

    /// Waits until `socket_path` exists, yielding to the runtime between checks.
    ///
    /// Replaces a fixed start-up sleep: the test proceeds as soon as the path shows
    /// up and fails with a clear message if it never does.
    ///
    /// NOTE(review): assumes `AgentServer::run` creates the socket file at the path
    /// it was constructed with — confirm against the server implementation.
    ///
    /// # Panics
    ///
    /// Panics if the path does not exist within [`SERVER_STARTUP_TIMEOUT`].
    async fn wait_for_socket(socket_path: &Path) {
        let deadline = Instant::now() + SERVER_STARTUP_TIMEOUT;
        while !socket_path.exists() {
            assert!(
                Instant::now() < deadline,
                "agent server did not create {} within {:?}",
                socket_path.display(),
                SERVER_STARTUP_TIMEOUT
            );
            // Awaiting here lets the spawned server task make progress.
            tokio::time::sleep(SOCKET_POLL_INTERVAL).await;
        }
    }

    /// Builds the `GET` request-headers event shared by the tests.
    ///
    /// Only `uri` varies between tests; the metadata is a fixed fixture and the
    /// header map is empty.
    fn request_headers_event(uri: &str) -> RequestHeadersEvent {
        RequestHeadersEvent {
            metadata: RequestMetadata {
                correlation_id: "test-123".to_string(),
                request_id: "req-456".to_string(),
                client_ip: "127.0.0.1".to_string(),
                client_port: 12345,
                server_name: Some("example.com".to_string()),
                protocol: "HTTP/1.1".to_string(),
                tls_version: None,
                tls_cipher: None,
                route_id: Some("default".to_string()),
                upstream_id: Some("backend".to_string()),
                timestamp: chrono::Utc::now().to_rfc3339(),
            },
            method: "GET".to_string(),
            uri: uri.to_string(),
            headers: HashMap::new(),
        }
    }

    #[tokio::test]
    async fn test_agent_protocol_echo() {
        let dir = tempdir().unwrap();
        let socket_path = dir.path().join("test.sock");

        // Start echo agent server
        let server = AgentServer::new("test-echo", socket_path.clone(), Box::new(EchoAgent));

        let server_handle = tokio::spawn(async move {
            server.run().await.unwrap();
        });

        // Wait for the server to come up instead of sleeping for a fixed time
        wait_for_socket(&socket_path).await;

        // Connect client
        let mut client =
            AgentClient::unix_socket("test-client", &socket_path, Duration::from_secs(5))
                .await
                .unwrap();

        // Send request headers event
        let event = request_headers_event("/test");

        let response = client
            .send_event(EventType::RequestHeaders, &event)
            .await
            .unwrap();

        // Check response
        assert_eq!(response.decision, Decision::Allow);
        assert_eq!(response.request_headers.len(), 1);

        // Clean up
        client.close().await.unwrap();
        server_handle.abort();
    }

    #[tokio::test]
    async fn test_agent_protocol_denylist() {
        let dir = tempdir().unwrap();
        let socket_path = dir.path().join("denylist.sock");

        // Start denylist agent server
        let agent = DenylistAgent::new(vec!["/admin".to_string()], vec!["10.0.0.1".to_string()]);
        let server = AgentServer::new("test-denylist", socket_path.clone(), Box::new(agent));

        let server_handle = tokio::spawn(async move {
            server.run().await.unwrap();
        });

        // Wait for the server to come up instead of sleeping for a fixed time
        wait_for_socket(&socket_path).await;

        // Connect client
        let mut client =
            AgentClient::unix_socket("test-client", &socket_path, Duration::from_secs(5))
                .await
                .unwrap();

        // Test blocked path: the URI starts with the denylisted "/admin" entry
        let event = request_headers_event("/admin/secret");

        let response = client
            .send_event(EventType::RequestHeaders, &event)
            .await
            .unwrap();

        // Check response is blocked; on failure report what was actually returned
        match response.decision {
            Decision::Block { status, .. } => assert_eq!(status, 403),
            other => panic!("Expected block decision, got {:?}", other),
        }

        // Clean up
        client.close().await.unwrap();
        server_handle.abort();
    }
}