Skip to main content

sentinel_proxy/agents/
mod.rs

1//! Agent integration module for Sentinel proxy.
2//!
3//! This module provides integration with external processing agents for WAF,
4//! auth, rate limiting, and custom logic. It implements the SPOE-inspired
5//! protocol with bounded behavior and failure isolation.
6//!
7//! # Architecture
8//!
9//! - [`AgentManager`]: Coordinates all agents, handles routing to appropriate agents
10//! - [`Agent`]: Protocol v1 agent with connection, circuit breaker, and metrics
11//! - [`AgentV2`]: Protocol v2 agent with bidirectional streaming and pooling
12//! - [`AgentConnectionPool`]: Connection pooling for efficient connection reuse (v1)
13//! - [`AgentDecision`]: Combined result from processing through agents
14//! - [`AgentCallContext`]: Request context passed to agents
15//!
16//! # Protocol Versions
17//!
18//! - **V1**: Simple request/response protocol (backwards compatible)
19//! - **V2**: Bidirectional streaming with capabilities, health reporting,
20//!   metrics export, and flow control
21//!
22//! # Queue Isolation
23//!
24//! Each agent has its own semaphore for queue isolation, preventing a slow agent
25//! from affecting other agents (noisy neighbor problem). Configure concurrency
26//! limits per-agent via `max_concurrent_calls` in the agent configuration.
27//!
28//! # Example
29//!
30//! ```ignore
31//! use sentinel_proxy::agents::{AgentManager, AgentCallContext};
32//!
33//! // Each agent manages its own concurrency limit (default: 100)
34//! let manager = AgentManager::new(agent_configs).await?;
35//! manager.initialize().await?;
36//!
37//! let decision = manager.process_request_headers(&ctx, headers, &["waf", "auth"]).await?;
38//! if !decision.is_allow() {
39//!     // Handle block/redirect/challenge
40//! }
41//! ```
42
43mod agent;
44mod agent_v2;
45mod context;
46mod decision;
47mod manager;
48mod metrics;
49mod pool;
50
51pub use agent::Agent;
52pub use agent_v2::AgentV2;
53pub use context::AgentCallContext;
54pub use decision::{AgentAction, AgentDecision};
55pub use manager::AgentManager;
56pub use metrics::AgentMetrics;
57pub use pool::AgentConnectionPool;
58
// Smoke tests for the types re-exported by this module and for the agent
// configuration shapes they are built from.
#[cfg(test)]
mod tests {
    use super::*;
    use sentinel_agent_protocol::HeaderOp;
    use sentinel_common::types::CircuitBreakerConfig;
    use sentinel_common::CircuitBreaker;
    use std::time::Duration;

    /// Merging a blocking decision into an allowing one must no longer report "allow".
    ///
    /// Purely synchronous, so it runs as a plain `#[test]` without an async runtime.
    #[test]
    fn test_agent_decision_merge() {
        let mut decision1 = AgentDecision::default_allow();
        decision1.request_headers.push(HeaderOp::Set {
            name: "X-Test".to_string(),
            value: "1".to_string(),
        });

        let decision2 = AgentDecision::block(403, "Forbidden");

        decision1.merge(decision2);
        assert!(!decision1.is_allow());
    }

    /// Drives a circuit breaker through closed -> open -> (post-timeout) states.
    ///
    /// Stays a `#[tokio::test]` because it awaits `tokio::time::sleep` to let the
    /// one-second open timeout elapse.
    #[tokio::test]
    async fn test_circuit_breaker() {
        let config = CircuitBreakerConfig {
            failure_threshold: 3,
            success_threshold: 2,
            timeout_seconds: 1,
            half_open_max_requests: 1,
        };

        let breaker = CircuitBreaker::new(config);
        assert!(breaker.is_closed());

        // Record exactly `failure_threshold` failures to trip the breaker open.
        for _ in 0..3 {
            breaker.record_failure();
        }
        assert!(!breaker.is_closed());

        // Sleep past `timeout_seconds` (1s) so the breaker may leave the open state.
        tokio::time::sleep(Duration::from_secs(2)).await;

        // NOTE(review): the original comment says the breaker should be half-open
        // here, yet the assertion is on `is_closed()`. Presumably `is_closed()`
        // reports `true` for any state that admits traffic (including half-open);
        // confirm against `CircuitBreaker`.
        assert!(breaker.is_closed());
    }

    /// Checks that `max_concurrent_calls` can be set per agent in `AgentConfig`.
    ///
    /// Only struct construction and field reads happen here, so no async runtime
    /// is needed.
    #[test]
    fn test_per_agent_queue_isolation_config() {
        use sentinel_config::{
            AgentConfig, AgentEvent, AgentProtocolVersion, AgentTransport, AgentType,
        };
        use std::path::PathBuf;

        // An agent with a custom (non-default) concurrency limit.
        let config = AgentConfig {
            id: "test-agent".to_string(),
            agent_type: AgentType::Custom("test".to_string()),
            transport: AgentTransport::UnixSocket {
                path: PathBuf::from("/tmp/test.sock"),
            },
            events: vec![AgentEvent::RequestHeaders],
            protocol_version: AgentProtocolVersion::V1,
            pool: None,
            timeout_ms: 1000,
            failure_mode: Default::default(),
            circuit_breaker: None,
            max_request_body_bytes: None,
            max_response_body_bytes: None,
            request_body_mode: Default::default(),
            response_body_mode: Default::default(),
            chunk_timeout_ms: 5000,
            config: None,
            max_concurrent_calls: 50, // Custom limit
        };

        assert_eq!(config.max_concurrent_calls, 50);

        // A second agent carrying the documented default limit (100). The value is
        // written out explicitly: this checks that the field round-trips, it does
        // NOT exercise any `Default` impl or deserialization default.
        let default_config = AgentConfig {
            id: "default-agent".to_string(),
            agent_type: AgentType::Custom("test".to_string()),
            transport: AgentTransport::UnixSocket {
                path: PathBuf::from("/tmp/default.sock"),
            },
            events: vec![AgentEvent::RequestHeaders],
            protocol_version: AgentProtocolVersion::V1,
            pool: None,
            timeout_ms: 1000,
            failure_mode: Default::default(),
            circuit_breaker: None,
            max_request_body_bytes: None,
            max_response_body_bytes: None,
            request_body_mode: Default::default(),
            response_body_mode: Default::default(),
            chunk_timeout_ms: 5000,
            config: None,
            max_concurrent_calls: 100, // Documented default, set explicitly
        };

        assert_eq!(default_config.max_concurrent_calls, 100);
    }

    /// Builds a protocol-v2 gRPC agent configuration with an explicit connection
    /// pool and checks the pool settings are carried through.
    ///
    /// Only struct construction and field reads happen here, so no async runtime
    /// is needed.
    #[test]
    fn test_v2_agent_config() {
        use sentinel_config::{
            AgentConfig, AgentEvent, AgentPoolConfig, AgentProtocolVersion, AgentTransport,
            AgentType, LoadBalanceStrategy,
        };

        let config = AgentConfig {
            id: "v2-agent".to_string(),
            agent_type: AgentType::Waf,
            transport: AgentTransport::Grpc {
                address: "localhost:50051".to_string(),
                tls: None,
            },
            events: vec![AgentEvent::RequestHeaders, AgentEvent::RequestBody],
            protocol_version: AgentProtocolVersion::V2,
            pool: Some(AgentPoolConfig {
                connections_per_agent: 8,
                load_balance_strategy: LoadBalanceStrategy::LeastConnections,
                connect_timeout_ms: 3000,
                reconnect_interval_ms: 5000,
                max_reconnect_attempts: 5,
                drain_timeout_ms: 60000,
                max_concurrent_per_connection: 200,
                health_check_interval_ms: 5000,
            }),
            timeout_ms: 2000,
            failure_mode: Default::default(),
            circuit_breaker: None,
            max_request_body_bytes: Some(1024 * 1024),
            max_response_body_bytes: None,
            request_body_mode: Default::default(),
            response_body_mode: Default::default(),
            chunk_timeout_ms: 5000,
            config: None,
            max_concurrent_calls: 100,
        };

        assert_eq!(config.protocol_version, AgentProtocolVersion::V2);

        // `expect` both asserts presence and yields the value, replacing the
        // separate `is_some()` check followed by `unwrap()`.
        let pool = config
            .pool
            .expect("v2 agent config was constructed with a pool");
        assert_eq!(pool.connections_per_agent, 8);
        assert_eq!(
            pool.load_balance_strategy,
            LoadBalanceStrategy::LeastConnections
        );
    }
}