sentinel_proxy/agents/
mod.rs

1//! Agent integration module for Sentinel proxy.
2//!
3//! This module provides integration with external processing agents for WAF,
4//! auth, rate limiting, and custom logic. It implements the SPOE-inspired
5//! protocol with bounded behavior and failure isolation.
6//!
7//! # Architecture
8//!
9//! - [`AgentManager`]: Coordinates all agents, handles routing to appropriate agents
10//! - [`Agent`]: Individual agent with connection, circuit breaker, and metrics
11//! - [`AgentConnectionPool`]: Connection pooling for efficient connection reuse
12//! - [`AgentDecision`]: Combined result from processing through agents
13//! - [`AgentCallContext`]: Request context passed to agents
14//!
15//! # Queue Isolation
16//!
17//! Each agent has its own semaphore for queue isolation, preventing a slow agent
18//! from affecting other agents (noisy neighbor problem). Configure concurrency
19//! limits per-agent via `max_concurrent_calls` in the agent configuration.
20//!
21//! # Example
22//!
23//! ```ignore
24//! use sentinel_proxy::agents::{AgentManager, AgentCallContext};
25//!
26//! // Each agent manages its own concurrency limit (default: 100)
27//! let manager = AgentManager::new(agent_configs).await?;
28//! manager.initialize().await?;
29//!
30//! let decision = manager.process_request_headers(&ctx, &headers, &["waf", "auth"]).await?;
31//! if !decision.is_allow() {
32//!     // Handle block/redirect/challenge
33//! }
34//! ```
35
36mod agent;
37mod context;
38mod decision;
39mod manager;
40mod metrics;
41mod pool;
42
43pub use agent::Agent;
44pub use context::AgentCallContext;
45pub use decision::{AgentAction, AgentDecision};
46pub use manager::AgentManager;
47pub use metrics::AgentMetrics;
48pub use pool::AgentConnectionPool;
49
50#[cfg(test)]
51mod tests {
52    use super::*;
53    use sentinel_agent_protocol::HeaderOp;
54    use sentinel_common::types::CircuitBreakerConfig;
55    use sentinel_common::CircuitBreaker;
56    use std::time::Duration;
57
58    #[tokio::test]
59    async fn test_agent_decision_merge() {
60        let mut decision1 = AgentDecision::default_allow();
61        decision1.request_headers.push(HeaderOp::Set {
62            name: "X-Test".to_string(),
63            value: "1".to_string(),
64        });
65
66        let decision2 = AgentDecision::block(403, "Forbidden");
67
68        decision1.merge(decision2);
69        assert!(!decision1.is_allow());
70    }
71
72    #[tokio::test]
73    async fn test_circuit_breaker() {
74        let config = CircuitBreakerConfig {
75            failure_threshold: 3,
76            success_threshold: 2,
77            timeout_seconds: 1,
78            half_open_max_requests: 1,
79        };
80
81        let breaker = CircuitBreaker::new(config);
82        assert!(breaker.is_closed().await);
83
84        // Record failures to open
85        for _ in 0..3 {
86            breaker.record_failure().await;
87        }
88        assert!(!breaker.is_closed().await);
89
90        // Wait for timeout
91        tokio::time::sleep(Duration::from_secs(2)).await;
92        assert!(breaker.is_closed().await); // Should be half-open now
93    }
94
95    #[tokio::test]
96    async fn test_per_agent_queue_isolation_config() {
97        use sentinel_config::{AgentConfig, AgentTransport, AgentType, AgentEvent};
98        use std::path::PathBuf;
99
100        // Verify the max_concurrent_calls field works in AgentConfig
101        let config = AgentConfig {
102            id: "test-agent".to_string(),
103            agent_type: AgentType::Custom("test".to_string()),
104            transport: AgentTransport::UnixSocket {
105                path: PathBuf::from("/tmp/test.sock"),
106            },
107            events: vec![AgentEvent::RequestHeaders],
108            timeout_ms: 1000,
109            failure_mode: Default::default(),
110            circuit_breaker: None,
111            max_request_body_bytes: None,
112            max_response_body_bytes: None,
113            request_body_mode: Default::default(),
114            response_body_mode: Default::default(),
115            chunk_timeout_ms: 5000,
116            config: None,
117            max_concurrent_calls: 50, // Custom limit
118        };
119
120        assert_eq!(config.max_concurrent_calls, 50);
121
122        // Verify default value
123        let default_config = AgentConfig {
124            id: "default-agent".to_string(),
125            agent_type: AgentType::Custom("test".to_string()),
126            transport: AgentTransport::UnixSocket {
127                path: PathBuf::from("/tmp/default.sock"),
128            },
129            events: vec![AgentEvent::RequestHeaders],
130            timeout_ms: 1000,
131            failure_mode: Default::default(),
132            circuit_breaker: None,
133            max_request_body_bytes: None,
134            max_response_body_bytes: None,
135            request_body_mode: Default::default(),
136            response_body_mode: Default::default(),
137            chunk_timeout_ms: 5000,
138            config: None,
139            max_concurrent_calls: 100, // Default value
140        };
141
142        assert_eq!(default_config.max_concurrent_calls, 100);
143    }
144}