sentinel_proxy/agents/
agent.rs

1//! Individual agent implementation.
2
3use std::sync::atomic::{AtomicU32, Ordering};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use sentinel_agent_protocol::{AgentClient, AgentResponse, EventType};
8use sentinel_common::{errors::SentinelError, errors::SentinelResult, CircuitBreaker};
9use sentinel_config::{AgentConfig, AgentEvent, AgentTransport};
10use tokio::sync::RwLock;
11use tracing::{debug, error, info, trace, warn};
12
13use super::metrics::AgentMetrics;
14use super::pool::AgentConnectionPool;
15
16/// Individual agent configuration and state.
17pub struct Agent {
18    /// Agent configuration
19    pub(super) config: AgentConfig,
20    /// Agent client
21    pub(super) client: Arc<RwLock<Option<AgentClient>>>,
22    /// Connection pool
23    pub(super) pool: Arc<AgentConnectionPool>,
24    /// Circuit breaker
25    pub(super) circuit_breaker: Arc<CircuitBreaker>,
26    /// Agent-specific metrics
27    pub(super) metrics: Arc<AgentMetrics>,
28    /// Last successful call
29    pub(super) last_success: Arc<RwLock<Option<Instant>>>,
30    /// Consecutive failures
31    pub(super) consecutive_failures: AtomicU32,
32}
33
34impl Agent {
35    /// Create a new agent.
36    pub fn new(
37        config: AgentConfig,
38        pool: Arc<AgentConnectionPool>,
39        circuit_breaker: Arc<CircuitBreaker>,
40    ) -> Self {
41        trace!(
42            agent_id = %config.id,
43            agent_type = ?config.agent_type,
44            timeout_ms = config.timeout_ms,
45            events = ?config.events,
46            "Creating agent instance"
47        );
48        Self {
49            config,
50            client: Arc::new(RwLock::new(None)),
51            pool,
52            circuit_breaker,
53            metrics: Arc::new(AgentMetrics::default()),
54            last_success: Arc::new(RwLock::new(None)),
55            consecutive_failures: AtomicU32::new(0),
56        }
57    }
58
59    /// Get the agent ID.
60    pub fn id(&self) -> &str {
61        &self.config.id
62    }
63
64    /// Get the agent's circuit breaker.
65    pub fn circuit_breaker(&self) -> &CircuitBreaker {
66        &self.circuit_breaker
67    }
68
69    /// Get the agent's failure mode.
70    pub fn failure_mode(&self) -> sentinel_config::FailureMode {
71        self.config.failure_mode.clone()
72    }
73
74    /// Get the agent's timeout in milliseconds.
75    pub fn timeout_ms(&self) -> u64 {
76        self.config.timeout_ms
77    }
78
79    /// Get the agent's metrics.
80    pub fn metrics(&self) -> &AgentMetrics {
81        &self.metrics
82    }
83
84    /// Check if agent handles a specific event type.
85    pub fn handles_event(&self, event_type: EventType) -> bool {
86        self.config.events.iter().any(|e| match (e, event_type) {
87            (AgentEvent::RequestHeaders, EventType::RequestHeaders) => true,
88            (AgentEvent::RequestBody, EventType::RequestBodyChunk) => true,
89            (AgentEvent::ResponseHeaders, EventType::ResponseHeaders) => true,
90            (AgentEvent::ResponseBody, EventType::ResponseBodyChunk) => true,
91            (AgentEvent::Log, EventType::RequestComplete) => true,
92            _ => false,
93        })
94    }
95
96    /// Initialize agent connection.
97    pub async fn initialize(&self) -> SentinelResult<()> {
98        let timeout = Duration::from_millis(self.config.timeout_ms);
99
100        debug!(
101            agent_id = %self.config.id,
102            transport = ?self.config.transport,
103            timeout_ms = self.config.timeout_ms,
104            "Initializing agent connection"
105        );
106
107        let start = Instant::now();
108
109        match &self.config.transport {
110            AgentTransport::UnixSocket { path } => {
111                trace!(
112                    agent_id = %self.config.id,
113                    socket_path = %path.display(),
114                    "Connecting to agent via Unix socket"
115                );
116
117                let client = AgentClient::unix_socket(&self.config.id, path, timeout)
118                    .await
119                    .map_err(|e| {
120                        error!(
121                            agent_id = %self.config.id,
122                            socket_path = %path.display(),
123                            error = %e,
124                            "Failed to connect to agent via Unix socket"
125                        );
126                        SentinelError::Agent {
127                            agent: self.config.id.clone(),
128                            message: format!("Failed to connect via Unix socket: {}", e),
129                            event: "initialize".to_string(),
130                            source: None,
131                        }
132                    })?;
133
134                *self.client.write().await = Some(client);
135
136                info!(
137                    agent_id = %self.config.id,
138                    socket_path = %path.display(),
139                    connect_time_ms = start.elapsed().as_millis(),
140                    "Agent connected via Unix socket"
141                );
142                Ok(())
143            }
144            AgentTransport::Grpc { address, tls: _ } => {
145                trace!(
146                    agent_id = %self.config.id,
147                    address = %address,
148                    "Connecting to agent via gRPC"
149                );
150
151                // TODO: Add TLS support for gRPC connections
152                let client = AgentClient::grpc(&self.config.id, address, timeout)
153                    .await
154                    .map_err(|e| {
155                        error!(
156                            agent_id = %self.config.id,
157                            address = %address,
158                            error = %e,
159                            "Failed to connect to agent via gRPC"
160                        );
161                        SentinelError::Agent {
162                            agent: self.config.id.clone(),
163                            message: format!("Failed to connect via gRPC: {}", e),
164                            event: "initialize".to_string(),
165                            source: None,
166                        }
167                    })?;
168
169                *self.client.write().await = Some(client);
170
171                info!(
172                    agent_id = %self.config.id,
173                    address = %address,
174                    connect_time_ms = start.elapsed().as_millis(),
175                    "Agent connected via gRPC"
176                );
177                Ok(())
178            }
179            AgentTransport::Http { url, tls: _ } => {
180                warn!(
181                    agent_id = %self.config.id,
182                    url = %url,
183                    "HTTP transport not yet implemented, agent will not be available"
184                );
185                Ok(())
186            }
187        }
188    }
189
190    /// Call agent with event.
191    pub async fn call_event<T: serde::Serialize>(
192        &self,
193        event_type: EventType,
194        event: &T,
195    ) -> SentinelResult<AgentResponse> {
196        trace!(
197            agent_id = %self.config.id,
198            event_type = ?event_type,
199            "Preparing to call agent"
200        );
201
202        // Get or create connection
203        let mut client_guard = self.client.write().await;
204
205        if client_guard.is_none() {
206            trace!(
207                agent_id = %self.config.id,
208                "No existing connection, initializing"
209            );
210            drop(client_guard);
211            self.initialize().await?;
212            client_guard = self.client.write().await;
213        }
214
215        let client = client_guard.as_mut().ok_or_else(|| {
216            error!(
217                agent_id = %self.config.id,
218                event_type = ?event_type,
219                "No client connection available after initialization"
220            );
221            SentinelError::Agent {
222                agent: self.config.id.clone(),
223                message: "No client connection".to_string(),
224                event: format!("{:?}", event_type),
225                source: None,
226            }
227        })?;
228
229        // Make the call
230        let call_num = self.metrics.calls_total.fetch_add(1, Ordering::Relaxed) + 1;
231
232        trace!(
233            agent_id = %self.config.id,
234            event_type = ?event_type,
235            call_num = call_num,
236            "Sending event to agent"
237        );
238
239        client.send_event(event_type, event).await.map_err(|e| {
240            error!(
241                agent_id = %self.config.id,
242                event_type = ?event_type,
243                error = %e,
244                "Agent call failed"
245            );
246            SentinelError::Agent {
247                agent: self.config.id.clone(),
248                message: e.to_string(),
249                event: format!("{:?}", event_type),
250                source: None,
251            }
252        })
253    }
254
255    /// Record successful call.
256    pub async fn record_success(&self, duration: Duration) {
257        let success_count = self.metrics.calls_success.fetch_add(1, Ordering::Relaxed) + 1;
258        self.metrics
259            .duration_total_us
260            .fetch_add(duration.as_micros() as u64, Ordering::Relaxed);
261        self.consecutive_failures.store(0, Ordering::Relaxed);
262        *self.last_success.write().await = Some(Instant::now());
263
264        trace!(
265            agent_id = %self.config.id,
266            duration_ms = duration.as_millis(),
267            total_successes = success_count,
268            "Recorded agent call success"
269        );
270
271        self.circuit_breaker.record_success().await;
272    }
273
274    /// Record failed call.
275    pub async fn record_failure(&self) {
276        let fail_count = self.metrics.calls_failed.fetch_add(1, Ordering::Relaxed) + 1;
277        let consecutive = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
278
279        debug!(
280            agent_id = %self.config.id,
281            total_failures = fail_count,
282            consecutive_failures = consecutive,
283            "Recorded agent call failure"
284        );
285
286        self.circuit_breaker.record_failure().await;
287    }
288
289    /// Record timeout.
290    pub async fn record_timeout(&self) {
291        let timeout_count = self.metrics.calls_timeout.fetch_add(1, Ordering::Relaxed) + 1;
292        let consecutive = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
293
294        debug!(
295            agent_id = %self.config.id,
296            total_timeouts = timeout_count,
297            consecutive_failures = consecutive,
298            timeout_ms = self.config.timeout_ms,
299            "Recorded agent call timeout"
300        );
301
302        self.circuit_breaker.record_failure().await;
303    }
304
305    /// Shutdown agent.
306    pub async fn shutdown(&self) {
307        debug!(
308            agent_id = %self.config.id,
309            "Shutting down agent"
310        );
311
312        if let Some(client) = self.client.write().await.take() {
313            trace!(
314                agent_id = %self.config.id,
315                "Closing agent client connection"
316            );
317            let _ = client.close().await;
318        }
319
320        let stats = (
321            self.metrics.calls_total.load(Ordering::Relaxed),
322            self.metrics.calls_success.load(Ordering::Relaxed),
323            self.metrics.calls_failed.load(Ordering::Relaxed),
324            self.metrics.calls_timeout.load(Ordering::Relaxed),
325        );
326
327        info!(
328            agent_id = %self.config.id,
329            total_calls = stats.0,
330            successes = stats.1,
331            failures = stats.2,
332            timeouts = stats.3,
333            "Agent shutdown complete"
334        );
335    }
336}