Skip to main content

sentinel_proxy/agents/
mod.rs

1//! Agent integration module for Sentinel proxy.
2//!
3//! This module provides integration with external processing agents for WAF,
4//! auth, rate limiting, and custom logic. It implements the SPOE-inspired
5//! protocol with bounded behavior and failure isolation.
6//!
7//! # Architecture
8//!
9//! - [`AgentManager`]: Coordinates all agents, handles routing to appropriate agents
10//! - [`Agent`]: Protocol v1 agent with connection, circuit breaker, and metrics
11//! - [`AgentV2`]: Protocol v2 agent with bidirectional streaming and pooling
12//! - [`AgentConnectionPool`]: Connection pooling for efficient connection reuse (v1)
13//! - [`AgentDecision`]: Combined result from processing through agents
14//! - [`AgentCallContext`]: Request context passed to agents
15//!
16//! # Protocol Versions
17//!
18//! - **V1**: Simple request/response protocol (backwards compatible)
19//! - **V2**: Bidirectional streaming with capabilities, health reporting,
20//!           metrics export, and flow control
21//!
22//! # Queue Isolation
23//!
24//! Each agent has its own semaphore for queue isolation, preventing a slow agent
25//! from affecting other agents (noisy neighbor problem). Configure concurrency
26//! limits per-agent via `max_concurrent_calls` in the agent configuration.
27//!
28//! # Example
29//!
30//! ```ignore
31//! use sentinel_proxy::agents::{AgentManager, AgentCallContext};
32//!
33//! // Each agent manages its own concurrency limit (default: 100)
34//! let manager = AgentManager::new(agent_configs).await?;
35//! manager.initialize().await?;
36//!
37//! let decision = manager.process_request_headers(&ctx, headers, &["waf", "auth"]).await?;
38//! if !decision.is_allow() {
39//!     // Handle block/redirect/challenge
40//! }
41//! ```
42
43mod agent;
44mod agent_v2;
45mod context;
46mod decision;
47mod manager;
48mod metrics;
49mod pool;
50
51pub use agent::Agent;
52pub use agent_v2::AgentV2;
53pub use context::AgentCallContext;
54pub use decision::{AgentAction, AgentDecision};
55pub use manager::AgentManager;
56pub use metrics::AgentMetrics;
57pub use pool::AgentConnectionPool;
58
#[cfg(test)]
mod tests {
    // Unit tests for the types re-exported by this module and for the agent
    // configuration structs they are driven by.

    use super::*;
    use sentinel_agent_protocol::HeaderOp;
    use sentinel_common::types::CircuitBreakerConfig;
    use sentinel_common::CircuitBreaker;
    use std::time::Duration;

    /// Merging a blocking decision into an allowing one must produce a decision
    /// that no longer reports `is_allow()`.
    ///
    /// Plain synchronous test: nothing here awaits, so no async runtime is needed.
    #[test]
    fn test_agent_decision_merge() {
        let mut merged = AgentDecision::default_allow();
        merged.request_headers.push(HeaderOp::Set {
            name: "X-Test".to_string(),
            value: "1".to_string(),
        });

        merged.merge(AgentDecision::block(403, "Forbidden"));

        assert!(!merged.is_allow());
    }

    /// Drives a `CircuitBreaker` from closed to not-closed by recording
    /// `failure_threshold` failures, then checks that `is_closed()` reports
    /// `true` again once the configured timeout has passed.
    ///
    /// Stays async because it waits on `tokio::time::sleep`.
    #[tokio::test]
    async fn test_circuit_breaker() {
        // Single source of truth for both the config and the failure loop below,
        // so the two cannot drift apart.
        let failure_threshold = 3;

        let breaker = CircuitBreaker::new(CircuitBreakerConfig {
            failure_threshold,
            success_threshold: 2,
            timeout_seconds: 1,
            half_open_max_requests: 1,
        });
        assert!(breaker.is_closed());

        // Exactly `failure_threshold` failures should trip the breaker.
        for _ in 0..failure_threshold {
            breaker.record_failure();
        }
        assert!(!breaker.is_closed());

        // Sleep past `timeout_seconds` (1s) with a margin. The original test
        // describes the resulting state as half-open; `is_closed()` is expected
        // to return `true` for it.
        tokio::time::sleep(Duration::from_secs(2)).await;
        assert!(breaker.is_closed());
    }

    /// Checks that `AgentConfig::max_concurrent_calls` holds whatever per-agent
    /// limit it is constructed with.
    ///
    /// Plain synchronous test: nothing here awaits, so no async runtime is needed.
    #[test]
    fn test_per_agent_queue_isolation_config() {
        use sentinel_config::{
            AgentConfig, AgentEvent, AgentProtocolVersion, AgentTransport, AgentType,
        };
        use std::path::PathBuf;

        // Builds a V1 unix-socket agent config; only the id, socket path and
        // concurrency limit vary between the cases below.
        let build_config = |id: &str, socket_path: &str, max_concurrent_calls| AgentConfig {
            id: id.to_string(),
            agent_type: AgentType::Custom("test".to_string()),
            transport: AgentTransport::UnixSocket {
                path: PathBuf::from(socket_path),
            },
            events: vec![AgentEvent::RequestHeaders],
            protocol_version: AgentProtocolVersion::V1,
            pool: None,
            timeout_ms: 1000,
            failure_mode: Default::default(),
            circuit_breaker: None,
            max_request_body_bytes: None,
            max_response_body_bytes: None,
            request_body_mode: Default::default(),
            response_body_mode: Default::default(),
            chunk_timeout_ms: 5000,
            config: None,
            max_concurrent_calls,
        };

        // Custom limit.
        let config = build_config("test-agent", "/tmp/test.sock", 50);
        assert_eq!(config.max_concurrent_calls, 50);

        // NOTE(review): 100 is set explicitly here, so this only confirms the
        // field stores it. It does not exercise whatever mechanism supplies the
        // default when the field is omitted from configuration.
        let default_config = build_config("default-agent", "/tmp/default.sock", 100);
        assert_eq!(default_config.max_concurrent_calls, 100);
    }

    /// Checks that a protocol-V2 gRPC agent config keeps its protocol version
    /// and its connection-pool settings.
    ///
    /// Plain synchronous test: nothing here awaits, so no async runtime is needed.
    #[test]
    fn test_v2_agent_config() {
        use sentinel_config::{
            AgentConfig, AgentEvent, AgentPoolConfig, AgentProtocolVersion, AgentTransport,
            AgentType, LoadBalanceStrategy,
        };

        let config = AgentConfig {
            id: "v2-agent".to_string(),
            agent_type: AgentType::Waf,
            transport: AgentTransport::Grpc {
                address: "localhost:50051".to_string(),
                tls: None,
            },
            events: vec![AgentEvent::RequestHeaders, AgentEvent::RequestBody],
            protocol_version: AgentProtocolVersion::V2,
            pool: Some(AgentPoolConfig {
                connections_per_agent: 8,
                load_balance_strategy: LoadBalanceStrategy::LeastConnections,
                connect_timeout_ms: 3000,
                reconnect_interval_ms: 5000,
                max_reconnect_attempts: 5,
                drain_timeout_ms: 60000,
                max_concurrent_per_connection: 200,
                health_check_interval_ms: 5000,
            }),
            timeout_ms: 2000,
            failure_mode: Default::default(),
            circuit_breaker: None,
            max_request_body_bytes: Some(1024 * 1024),
            max_response_body_bytes: None,
            request_body_mode: Default::default(),
            response_body_mode: Default::default(),
            chunk_timeout_ms: 5000,
            config: None,
            max_concurrent_calls: 100,
        };

        assert_eq!(config.protocol_version, AgentProtocolVersion::V2);

        // `expect` replaces the previous `is_some()` assert + `unwrap()` pair and
        // gives a descriptive failure message if the pool is missing.
        let pool = config
            .pool
            .expect("v2 agent config should carry a pool configuration");
        assert_eq!(pool.connections_per_agent, 8);
        assert_eq!(pool.load_balance_strategy, LoadBalanceStrategy::LeastConnections);
    }
}