Skip to main content

grapsus_proxy/agents/
mod.rs

1//! Agent integration module for Grapsus proxy.
2//!
3//! This module provides integration with external processing agents for WAF,
4//! auth, rate limiting, and custom logic. Agents communicate using the v2
5//! binary protocol with bidirectional streaming, capabilities, health
6//! reporting, metrics export, and flow control.
7//!
8//! # Architecture
9//!
10//! - [`AgentManager`]: Coordinates all agents, handles routing to appropriate agents
11//! - [`AgentV2`]: Agent with bidirectional streaming and connection pooling
12//! - [`AgentDecision`]: Combined result from processing through agents
13//! - [`AgentCallContext`]: Request context passed to agents
14//!
15//! # Queue Isolation
16//!
17//! Each agent has its own semaphore for queue isolation, preventing a slow agent
18//! from affecting other agents (noisy neighbor problem). Configure concurrency
19//! limits per-agent via `max_concurrent_calls` in the agent configuration.
20//!
21//! # Example
22//!
23//! ```ignore
24//! use grapsus_proxy::agents::{AgentManager, AgentCallContext};
25//!
26//! // Each agent manages its own concurrency limit (default: 100)
27//! let manager = AgentManager::new(agent_configs).await?;
28//! manager.initialize().await?;
29//!
30//! let decision = manager.process_request_headers(&ctx, headers, &["waf", "auth"]).await?;
31//! if !decision.is_allow() {
32//!     // Handle block/redirect/challenge
33//! }
34//! ```
35
36mod agent_v2;
37mod context;
38mod decision;
39mod manager;
40mod metrics;
41
42pub use agent_v2::AgentV2;
43pub use context::AgentCallContext;
44pub use decision::{AgentAction, AgentDecision};
45pub use manager::AgentManager;
46pub use metrics::AgentMetrics;
47
48#[cfg(test)]
49mod tests {
50    use super::*;
51    use std::time::Duration;
52    use grapsus_agent_protocol::HeaderOp;
53    use grapsus_common::types::CircuitBreakerConfig;
54    use grapsus_common::CircuitBreaker;
55
56    #[tokio::test]
57    async fn test_agent_decision_merge() {
58        let mut decision1 = AgentDecision::default_allow();
59        decision1.request_headers.push(HeaderOp::Set {
60            name: "X-Test".to_string(),
61            value: "1".to_string(),
62        });
63
64        let decision2 = AgentDecision::block(403, "Forbidden");
65
66        decision1.merge(decision2);
67        assert!(!decision1.is_allow());
68    }
69
70    #[tokio::test]
71    async fn test_circuit_breaker() {
72        let config = CircuitBreakerConfig {
73            failure_threshold: 3,
74            success_threshold: 2,
75            timeout_seconds: 1,
76            half_open_max_requests: 1,
77        };
78
79        let breaker = CircuitBreaker::new(config);
80        assert!(breaker.is_closed()); // Lock-free
81
82        // Record failures to open
83        for _ in 0..3 {
84            breaker.record_failure(); // Lock-free
85        }
86        assert!(!breaker.is_closed()); // Lock-free
87
88        // Wait for timeout
89        tokio::time::sleep(Duration::from_secs(2)).await;
90        assert!(breaker.is_closed()); // Should be half-open now (lock-free)
91    }
92
93    #[tokio::test]
94    async fn test_per_agent_queue_isolation_config() {
95        use std::path::PathBuf;
96        use grapsus_config::{AgentConfig, AgentEvent, AgentTransport, AgentType};
97
98        // Verify the max_concurrent_calls field works in AgentConfig
99        let config = AgentConfig {
100            id: "test-agent".to_string(),
101            agent_type: AgentType::Custom("test".to_string()),
102            transport: AgentTransport::UnixSocket {
103                path: PathBuf::from("/tmp/test.sock"),
104            },
105            events: vec![AgentEvent::RequestHeaders],
106            pool: None,
107            timeout_ms: 1000,
108            failure_mode: Default::default(),
109            circuit_breaker: None,
110            max_request_body_bytes: None,
111            max_response_body_bytes: None,
112            request_body_mode: Default::default(),
113            response_body_mode: Default::default(),
114            chunk_timeout_ms: 5000,
115            config: None,
116            max_concurrent_calls: 50, // Custom limit
117        };
118
119        assert_eq!(config.max_concurrent_calls, 50);
120
121        // Verify default value
122        let default_config = AgentConfig {
123            id: "default-agent".to_string(),
124            agent_type: AgentType::Custom("test".to_string()),
125            transport: AgentTransport::UnixSocket {
126                path: PathBuf::from("/tmp/default.sock"),
127            },
128            events: vec![AgentEvent::RequestHeaders],
129            pool: None,
130            timeout_ms: 1000,
131            failure_mode: Default::default(),
132            circuit_breaker: None,
133            max_request_body_bytes: None,
134            max_response_body_bytes: None,
135            request_body_mode: Default::default(),
136            response_body_mode: Default::default(),
137            chunk_timeout_ms: 5000,
138            config: None,
139            max_concurrent_calls: 100, // Default value
140        };
141
142        assert_eq!(default_config.max_concurrent_calls, 100);
143    }
144
145    #[tokio::test]
146    async fn test_agent_pool_config() {
147        use grapsus_config::{
148            AgentConfig, AgentEvent, AgentPoolConfig, AgentTransport, AgentType,
149            LoadBalanceStrategy,
150        };
151
152        let config = AgentConfig {
153            id: "pooled-agent".to_string(),
154            agent_type: AgentType::Waf,
155            transport: AgentTransport::Grpc {
156                address: "localhost:50051".to_string(),
157                tls: None,
158            },
159            events: vec![AgentEvent::RequestHeaders, AgentEvent::RequestBody],
160            pool: Some(AgentPoolConfig {
161                connections_per_agent: 8,
162                load_balance_strategy: LoadBalanceStrategy::LeastConnections,
163                connect_timeout_ms: 3000,
164                reconnect_interval_ms: 5000,
165                max_reconnect_attempts: 5,
166                drain_timeout_ms: 60000,
167                max_concurrent_per_connection: 200,
168                health_check_interval_ms: 5000,
169            }),
170            timeout_ms: 2000,
171            failure_mode: Default::default(),
172            circuit_breaker: None,
173            max_request_body_bytes: Some(1024 * 1024),
174            max_response_body_bytes: None,
175            request_body_mode: Default::default(),
176            response_body_mode: Default::default(),
177            chunk_timeout_ms: 5000,
178            config: None,
179            max_concurrent_calls: 100,
180        };
181
182        assert!(config.pool.is_some());
183        let pool = config.pool.unwrap();
184        assert_eq!(pool.connections_per_agent, 8);
185        assert_eq!(
186            pool.load_balance_strategy,
187            LoadBalanceStrategy::LeastConnections
188        );
189    }
190}