sentinel_proxy/agents/
agent.rs

1//! Individual agent implementation.
2
3use std::sync::atomic::{AtomicU32, Ordering};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use sentinel_agent_protocol::{AgentClient, AgentResponse, EventType};
8use sentinel_common::{errors::SentinelError, errors::SentinelResult, CircuitBreaker};
9use sentinel_config::{AgentConfig, AgentEvent, AgentTransport};
10use tokio::sync::RwLock;
11use tracing::warn;
12
13use super::metrics::AgentMetrics;
14use super::pool::AgentConnectionPool;
15
16/// Individual agent configuration and state.
17pub struct Agent {
18    /// Agent configuration
19    pub(super) config: AgentConfig,
20    /// Agent client
21    pub(super) client: Arc<RwLock<Option<AgentClient>>>,
22    /// Connection pool
23    pub(super) pool: Arc<AgentConnectionPool>,
24    /// Circuit breaker
25    pub(super) circuit_breaker: Arc<CircuitBreaker>,
26    /// Agent-specific metrics
27    pub(super) metrics: Arc<AgentMetrics>,
28    /// Last successful call
29    pub(super) last_success: Arc<RwLock<Option<Instant>>>,
30    /// Consecutive failures
31    pub(super) consecutive_failures: AtomicU32,
32}
33
34impl Agent {
35    /// Create a new agent.
36    pub fn new(
37        config: AgentConfig,
38        pool: Arc<AgentConnectionPool>,
39        circuit_breaker: Arc<CircuitBreaker>,
40    ) -> Self {
41        Self {
42            config,
43            client: Arc::new(RwLock::new(None)),
44            pool,
45            circuit_breaker,
46            metrics: Arc::new(AgentMetrics::default()),
47            last_success: Arc::new(RwLock::new(None)),
48            consecutive_failures: AtomicU32::new(0),
49        }
50    }
51
52    /// Get the agent ID.
53    pub fn id(&self) -> &str {
54        &self.config.id
55    }
56
57    /// Get the agent's circuit breaker.
58    pub fn circuit_breaker(&self) -> &CircuitBreaker {
59        &self.circuit_breaker
60    }
61
62    /// Get the agent's failure mode.
63    pub fn failure_mode(&self) -> sentinel_config::FailureMode {
64        self.config.failure_mode.clone()
65    }
66
67    /// Get the agent's timeout in milliseconds.
68    pub fn timeout_ms(&self) -> u64 {
69        self.config.timeout_ms
70    }
71
72    /// Get the agent's metrics.
73    pub fn metrics(&self) -> &AgentMetrics {
74        &self.metrics
75    }
76
77    /// Check if agent handles a specific event type.
78    pub fn handles_event(&self, event_type: EventType) -> bool {
79        self.config.events.iter().any(|e| match (e, event_type) {
80            (AgentEvent::RequestHeaders, EventType::RequestHeaders) => true,
81            (AgentEvent::RequestBody, EventType::RequestBodyChunk) => true,
82            (AgentEvent::ResponseHeaders, EventType::ResponseHeaders) => true,
83            (AgentEvent::ResponseBody, EventType::ResponseBodyChunk) => true,
84            (AgentEvent::Log, EventType::RequestComplete) => true,
85            _ => false,
86        })
87    }
88
89    /// Initialize agent connection.
90    pub async fn initialize(&self) -> SentinelResult<()> {
91        let timeout = Duration::from_millis(self.config.timeout_ms);
92
93        match &self.config.transport {
94            AgentTransport::UnixSocket { path } => {
95                let client = AgentClient::unix_socket(&self.config.id, path, timeout)
96                    .await
97                    .map_err(|e| SentinelError::Agent {
98                        agent: self.config.id.clone(),
99                        message: format!("Failed to connect via Unix socket: {}", e),
100                        event: "initialize".to_string(),
101                        source: None,
102                    })?;
103
104                *self.client.write().await = Some(client);
105                Ok(())
106            }
107            AgentTransport::Grpc { address, tls: _ } => {
108                // TODO: Add TLS support for gRPC connections
109                let client = AgentClient::grpc(&self.config.id, address, timeout)
110                    .await
111                    .map_err(|e| SentinelError::Agent {
112                        agent: self.config.id.clone(),
113                        message: format!("Failed to connect via gRPC: {}", e),
114                        event: "initialize".to_string(),
115                        source: None,
116                    })?;
117
118                *self.client.write().await = Some(client);
119                Ok(())
120            }
121            AgentTransport::Http { url, tls: _ } => {
122                warn!(
123                    agent = %self.config.id,
124                    url = %url,
125                    "HTTP transport not yet implemented, agent will not be available"
126                );
127                Ok(())
128            }
129        }
130    }
131
132    /// Call agent with event.
133    pub async fn call_event<T: serde::Serialize>(
134        &self,
135        event_type: EventType,
136        event: &T,
137    ) -> SentinelResult<AgentResponse> {
138        // Get or create connection
139        let mut client_guard = self.client.write().await;
140
141        if client_guard.is_none() {
142            drop(client_guard);
143            self.initialize().await?;
144            client_guard = self.client.write().await;
145        }
146
147        let client = client_guard.as_mut().ok_or_else(|| SentinelError::Agent {
148            agent: self.config.id.clone(),
149            message: "No client connection".to_string(),
150            event: format!("{:?}", event_type),
151            source: None,
152        })?;
153
154        // Make the call
155        self.metrics.calls_total.fetch_add(1, Ordering::Relaxed);
156
157        client.send_event(event_type, event).await.map_err(|e| {
158            SentinelError::Agent {
159                agent: self.config.id.clone(),
160                message: e.to_string(),
161                event: format!("{:?}", event_type),
162                source: None,
163            }
164        })
165    }
166
167    /// Record successful call.
168    pub async fn record_success(&self, duration: Duration) {
169        self.metrics.calls_success.fetch_add(1, Ordering::Relaxed);
170        self.metrics
171            .duration_total_us
172            .fetch_add(duration.as_micros() as u64, Ordering::Relaxed);
173        self.consecutive_failures.store(0, Ordering::Relaxed);
174        *self.last_success.write().await = Some(Instant::now());
175
176        self.circuit_breaker.record_success().await;
177    }
178
179    /// Record failed call.
180    pub async fn record_failure(&self) {
181        self.metrics.calls_failed.fetch_add(1, Ordering::Relaxed);
182        self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
183
184        self.circuit_breaker.record_failure().await;
185    }
186
187    /// Record timeout.
188    pub async fn record_timeout(&self) {
189        self.metrics.calls_timeout.fetch_add(1, Ordering::Relaxed);
190        self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
191
192        self.circuit_breaker.record_failure().await;
193    }
194
195    /// Shutdown agent.
196    pub async fn shutdown(&self) {
197        if let Some(client) = self.client.write().await.take() {
198            let _ = client.close().await;
199        }
200    }
201}