sentinel_proxy/agents/
agent.rs

1//! Individual agent implementation.
2
3use std::sync::atomic::{AtomicU32, Ordering};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use sentinel_agent_protocol::{AgentClient, AgentResponse, ConfigureEvent, Decision, EventType};
8use sentinel_common::{errors::SentinelError, errors::SentinelResult, CircuitBreaker};
9use sentinel_config::{AgentConfig, AgentEvent, AgentTransport};
10use tokio::sync::RwLock;
11use tracing::{debug, error, info, trace, warn};
12
13use super::metrics::AgentMetrics;
14use super::pool::AgentConnectionPool;
15
16/// Individual agent configuration and state.
17pub struct Agent {
18    /// Agent configuration
19    pub(super) config: AgentConfig,
20    /// Agent client
21    pub(super) client: Arc<RwLock<Option<AgentClient>>>,
22    /// Connection pool
23    pub(super) pool: Arc<AgentConnectionPool>,
24    /// Circuit breaker
25    pub(super) circuit_breaker: Arc<CircuitBreaker>,
26    /// Agent-specific metrics
27    pub(super) metrics: Arc<AgentMetrics>,
28    /// Last successful call
29    pub(super) last_success: Arc<RwLock<Option<Instant>>>,
30    /// Consecutive failures
31    pub(super) consecutive_failures: AtomicU32,
32}
33
34impl Agent {
35    /// Create a new agent.
36    pub fn new(
37        config: AgentConfig,
38        pool: Arc<AgentConnectionPool>,
39        circuit_breaker: Arc<CircuitBreaker>,
40    ) -> Self {
41        trace!(
42            agent_id = %config.id,
43            agent_type = ?config.agent_type,
44            timeout_ms = config.timeout_ms,
45            events = ?config.events,
46            "Creating agent instance"
47        );
48        Self {
49            config,
50            client: Arc::new(RwLock::new(None)),
51            pool,
52            circuit_breaker,
53            metrics: Arc::new(AgentMetrics::default()),
54            last_success: Arc::new(RwLock::new(None)),
55            consecutive_failures: AtomicU32::new(0),
56        }
57    }
58
59    /// Get the agent ID.
60    pub fn id(&self) -> &str {
61        &self.config.id
62    }
63
64    /// Get the agent's circuit breaker.
65    pub fn circuit_breaker(&self) -> &CircuitBreaker {
66        &self.circuit_breaker
67    }
68
69    /// Get the agent's failure mode.
70    pub fn failure_mode(&self) -> sentinel_config::FailureMode {
71        self.config.failure_mode
72    }
73
74    /// Get the agent's timeout in milliseconds.
75    pub fn timeout_ms(&self) -> u64 {
76        self.config.timeout_ms
77    }
78
79    /// Get the agent's metrics.
80    pub fn metrics(&self) -> &AgentMetrics {
81        &self.metrics
82    }
83
84    /// Check if agent handles a specific event type.
85    pub fn handles_event(&self, event_type: EventType) -> bool {
86        self.config.events.iter().any(|e| match (e, event_type) {
87            (AgentEvent::RequestHeaders, EventType::RequestHeaders) => true,
88            (AgentEvent::RequestBody, EventType::RequestBodyChunk) => true,
89            (AgentEvent::ResponseHeaders, EventType::ResponseHeaders) => true,
90            (AgentEvent::ResponseBody, EventType::ResponseBodyChunk) => true,
91            (AgentEvent::Log, EventType::RequestComplete) => true,
92            (AgentEvent::WebSocketFrame, EventType::WebSocketFrame) => true,
93            _ => false,
94        })
95    }
96
97    /// Initialize agent connection.
98    pub async fn initialize(&self) -> SentinelResult<()> {
99        let timeout = Duration::from_millis(self.config.timeout_ms);
100
101        debug!(
102            agent_id = %self.config.id,
103            transport = ?self.config.transport,
104            timeout_ms = self.config.timeout_ms,
105            "Initializing agent connection"
106        );
107
108        let start = Instant::now();
109
110        match &self.config.transport {
111            AgentTransport::UnixSocket { path } => {
112                trace!(
113                    agent_id = %self.config.id,
114                    socket_path = %path.display(),
115                    "Connecting to agent via Unix socket"
116                );
117
118                let client = AgentClient::unix_socket(&self.config.id, path, timeout)
119                    .await
120                    .map_err(|e| {
121                        error!(
122                            agent_id = %self.config.id,
123                            socket_path = %path.display(),
124                            error = %e,
125                            "Failed to connect to agent via Unix socket"
126                        );
127                        SentinelError::Agent {
128                            agent: self.config.id.clone(),
129                            message: format!("Failed to connect via Unix socket: {}", e),
130                            event: "initialize".to_string(),
131                            source: None,
132                        }
133                    })?;
134
135                *self.client.write().await = Some(client);
136
137                info!(
138                    agent_id = %self.config.id,
139                    socket_path = %path.display(),
140                    connect_time_ms = start.elapsed().as_millis(),
141                    "Agent connected via Unix socket"
142                );
143
144                // Send Configure event if config is present
145                self.send_configure_event().await?;
146
147                Ok(())
148            }
149            AgentTransport::Grpc { address, tls: _ } => {
150                trace!(
151                    agent_id = %self.config.id,
152                    address = %address,
153                    "Connecting to agent via gRPC"
154                );
155
156                // TODO: Add TLS support for gRPC connections
157                let client = AgentClient::grpc(&self.config.id, address, timeout)
158                    .await
159                    .map_err(|e| {
160                        error!(
161                            agent_id = %self.config.id,
162                            address = %address,
163                            error = %e,
164                            "Failed to connect to agent via gRPC"
165                        );
166                        SentinelError::Agent {
167                            agent: self.config.id.clone(),
168                            message: format!("Failed to connect via gRPC: {}", e),
169                            event: "initialize".to_string(),
170                            source: None,
171                        }
172                    })?;
173
174                *self.client.write().await = Some(client);
175
176                info!(
177                    agent_id = %self.config.id,
178                    address = %address,
179                    connect_time_ms = start.elapsed().as_millis(),
180                    "Agent connected via gRPC"
181                );
182
183                // Send Configure event if config is present
184                self.send_configure_event().await?;
185
186                Ok(())
187            }
188            AgentTransport::Http { url, tls: _ } => {
189                warn!(
190                    agent_id = %self.config.id,
191                    url = %url,
192                    "HTTP transport not yet implemented, agent will not be available"
193                );
194                Ok(())
195            }
196        }
197    }
198
199    /// Send Configure event to agent if config is present.
200    async fn send_configure_event(&self) -> SentinelResult<()> {
201        // Only send Configure if agent has config
202        let config = match &self.config.config {
203            Some(c) => c.clone(),
204            None => {
205                trace!(
206                    agent_id = %self.config.id,
207                    "No config for agent, skipping Configure event"
208                );
209                return Ok(());
210            }
211        };
212
213        let event = ConfigureEvent {
214            agent_id: self.config.id.clone(),
215            config,
216        };
217
218        debug!(
219            agent_id = %self.config.id,
220            "Sending Configure event to agent"
221        );
222
223        let mut client_guard = self.client.write().await;
224        let client = client_guard.as_mut().ok_or_else(|| SentinelError::Agent {
225            agent: self.config.id.clone(),
226            message: "No client connection for Configure event".to_string(),
227            event: "configure".to_string(),
228            source: None,
229        })?;
230
231        let response = client.send_event(EventType::Configure, &event).await.map_err(|e| {
232            error!(
233                agent_id = %self.config.id,
234                error = %e,
235                "Failed to send Configure event"
236            );
237            SentinelError::Agent {
238                agent: self.config.id.clone(),
239                message: format!("Configure event failed: {}", e),
240                event: "configure".to_string(),
241                source: None,
242            }
243        })?;
244
245        // Check if agent accepted the configuration
246        if !matches!(response.decision, Decision::Allow) {
247            error!(
248                agent_id = %self.config.id,
249                decision = ?response.decision,
250                "Agent rejected configuration"
251            );
252            return Err(SentinelError::Agent {
253                agent: self.config.id.clone(),
254                message: "Agent rejected configuration".to_string(),
255                event: "configure".to_string(),
256                source: None,
257            });
258        }
259
260        info!(
261            agent_id = %self.config.id,
262            "Agent accepted configuration"
263        );
264
265        Ok(())
266    }
267
268    /// Call agent with event.
269    pub async fn call_event<T: serde::Serialize>(
270        &self,
271        event_type: EventType,
272        event: &T,
273    ) -> SentinelResult<AgentResponse> {
274        trace!(
275            agent_id = %self.config.id,
276            event_type = ?event_type,
277            "Preparing to call agent"
278        );
279
280        // Get or create connection
281        let mut client_guard = self.client.write().await;
282
283        if client_guard.is_none() {
284            trace!(
285                agent_id = %self.config.id,
286                "No existing connection, initializing"
287            );
288            drop(client_guard);
289            self.initialize().await?;
290            client_guard = self.client.write().await;
291        }
292
293        let client = client_guard.as_mut().ok_or_else(|| {
294            error!(
295                agent_id = %self.config.id,
296                event_type = ?event_type,
297                "No client connection available after initialization"
298            );
299            SentinelError::Agent {
300                agent: self.config.id.clone(),
301                message: "No client connection".to_string(),
302                event: format!("{:?}", event_type),
303                source: None,
304            }
305        })?;
306
307        // Make the call
308        let call_num = self.metrics.calls_total.fetch_add(1, Ordering::Relaxed) + 1;
309
310        trace!(
311            agent_id = %self.config.id,
312            event_type = ?event_type,
313            call_num = call_num,
314            "Sending event to agent"
315        );
316
317        let result = client.send_event(event_type, event).await;
318
319        // Handle result - clear stale connection on connection errors
320        match result {
321            Ok(response) => Ok(response),
322            Err(e) => {
323                let error_str = e.to_string();
324                let is_connection_error = error_str.contains("Broken pipe")
325                    || error_str.contains("Connection reset")
326                    || error_str.contains("Connection refused")
327                    || error_str.contains("not connected")
328                    || error_str.contains("transport error");
329
330                error!(
331                    agent_id = %self.config.id,
332                    event_type = ?event_type,
333                    error = %e,
334                    is_connection_error = is_connection_error,
335                    "Agent call failed"
336                );
337
338                // Drop the client guard to release the lock
339                drop(client_guard);
340
341                // Clear cached client on connection errors to force reconnect on next call
342                if is_connection_error {
343                    warn!(
344                        agent_id = %self.config.id,
345                        "Clearing cached client due to connection error, next call will reconnect"
346                    );
347                    *self.client.write().await = None;
348                }
349
350                Err(SentinelError::Agent {
351                    agent: self.config.id.clone(),
352                    message: e.to_string(),
353                    event: format!("{:?}", event_type),
354                    source: None,
355                })
356            }
357        }
358    }
359
360    /// Record successful call.
361    pub async fn record_success(&self, duration: Duration) {
362        let success_count = self.metrics.calls_success.fetch_add(1, Ordering::Relaxed) + 1;
363        self.metrics
364            .duration_total_us
365            .fetch_add(duration.as_micros() as u64, Ordering::Relaxed);
366        self.consecutive_failures.store(0, Ordering::Relaxed);
367        *self.last_success.write().await = Some(Instant::now());
368
369        trace!(
370            agent_id = %self.config.id,
371            duration_ms = duration.as_millis(),
372            total_successes = success_count,
373            "Recorded agent call success"
374        );
375
376        self.circuit_breaker.record_success().await;
377    }
378
379    /// Record failed call.
380    pub async fn record_failure(&self) {
381        let fail_count = self.metrics.calls_failed.fetch_add(1, Ordering::Relaxed) + 1;
382        let consecutive = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
383
384        debug!(
385            agent_id = %self.config.id,
386            total_failures = fail_count,
387            consecutive_failures = consecutive,
388            "Recorded agent call failure"
389        );
390
391        self.circuit_breaker.record_failure().await;
392    }
393
394    /// Record timeout.
395    pub async fn record_timeout(&self) {
396        let timeout_count = self.metrics.calls_timeout.fetch_add(1, Ordering::Relaxed) + 1;
397        let consecutive = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
398
399        debug!(
400            agent_id = %self.config.id,
401            total_timeouts = timeout_count,
402            consecutive_failures = consecutive,
403            timeout_ms = self.config.timeout_ms,
404            "Recorded agent call timeout"
405        );
406
407        self.circuit_breaker.record_failure().await;
408    }
409
410    /// Shutdown agent.
411    pub async fn shutdown(&self) {
412        debug!(
413            agent_id = %self.config.id,
414            "Shutting down agent"
415        );
416
417        if let Some(client) = self.client.write().await.take() {
418            trace!(
419                agent_id = %self.config.id,
420                "Closing agent client connection"
421            );
422            let _ = client.close().await;
423        }
424
425        let stats = (
426            self.metrics.calls_total.load(Ordering::Relaxed),
427            self.metrics.calls_success.load(Ordering::Relaxed),
428            self.metrics.calls_failed.load(Ordering::Relaxed),
429            self.metrics.calls_timeout.load(Ordering::Relaxed),
430        );
431
432        info!(
433            agent_id = %self.config.id,
434            total_calls = stats.0,
435            successes = stats.1,
436            failures = stats.2,
437            timeouts = stats.3,
438            "Agent shutdown complete"
439        );
440    }
441}