sentinel_proxy/agents/
agent.rs

1//! Individual agent implementation.
2
3use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use sentinel_agent_protocol::{AgentClient, AgentResponse, ConfigureEvent, Decision, EventType, GrpcTlsConfig, HttpTlsConfig};
8use sentinel_common::{errors::SentinelError, errors::SentinelResult, CircuitBreaker};
9use sentinel_config::{AgentConfig, AgentEvent, AgentTransport};
10use tokio::sync::RwLock;
11use tracing::{debug, error, info, trace, warn};
12
13use super::metrics::AgentMetrics;
14use super::pool::AgentConnectionPool;
15
/// Marker stored in place of a timestamp when none has been recorded yet
/// (stands in for `Option::None` in the atomic timestamp fields).
const NO_TIMESTAMP: u64 = 0;
18
19/// Individual agent configuration and state.
20pub struct Agent {
21    /// Agent configuration
22    pub(super) config: AgentConfig,
23    /// Agent client
24    pub(super) client: Arc<RwLock<Option<AgentClient>>>,
25    /// Connection pool
26    pub(super) pool: Arc<AgentConnectionPool>,
27    /// Circuit breaker
28    pub(super) circuit_breaker: Arc<CircuitBreaker>,
29    /// Agent-specific metrics
30    pub(super) metrics: Arc<AgentMetrics>,
31    /// Base instant for timestamp calculations
32    pub(super) base_instant: Instant,
33    /// Last successful call (nanoseconds since base_instant, 0 = never)
34    pub(super) last_success_ns: AtomicU64,
35    /// Consecutive failures
36    pub(super) consecutive_failures: AtomicU32,
37}
38
39impl Agent {
40    /// Create a new agent.
41    pub fn new(
42        config: AgentConfig,
43        pool: Arc<AgentConnectionPool>,
44        circuit_breaker: Arc<CircuitBreaker>,
45    ) -> Self {
46        trace!(
47            agent_id = %config.id,
48            agent_type = ?config.agent_type,
49            timeout_ms = config.timeout_ms,
50            events = ?config.events,
51            "Creating agent instance"
52        );
53        Self {
54            config,
55            client: Arc::new(RwLock::new(None)),
56            pool,
57            circuit_breaker,
58            metrics: Arc::new(AgentMetrics::default()),
59            base_instant: Instant::now(),
60            last_success_ns: AtomicU64::new(NO_TIMESTAMP),
61            consecutive_failures: AtomicU32::new(0),
62        }
63    }
64
65    /// Get the agent ID.
66    pub fn id(&self) -> &str {
67        &self.config.id
68    }
69
70    /// Get the agent's circuit breaker.
71    pub fn circuit_breaker(&self) -> &CircuitBreaker {
72        &self.circuit_breaker
73    }
74
75    /// Get the agent's failure mode.
76    pub fn failure_mode(&self) -> sentinel_config::FailureMode {
77        self.config.failure_mode
78    }
79
80    /// Get the agent's timeout in milliseconds.
81    pub fn timeout_ms(&self) -> u64 {
82        self.config.timeout_ms
83    }
84
85    /// Get the agent's metrics.
86    pub fn metrics(&self) -> &AgentMetrics {
87        &self.metrics
88    }
89
90    /// Check if agent handles a specific event type.
91    pub fn handles_event(&self, event_type: EventType) -> bool {
92        self.config.events.iter().any(|e| match (e, event_type) {
93            (AgentEvent::RequestHeaders, EventType::RequestHeaders) => true,
94            (AgentEvent::RequestBody, EventType::RequestBodyChunk) => true,
95            (AgentEvent::ResponseHeaders, EventType::ResponseHeaders) => true,
96            (AgentEvent::ResponseBody, EventType::ResponseBodyChunk) => true,
97            (AgentEvent::Log, EventType::RequestComplete) => true,
98            (AgentEvent::WebSocketFrame, EventType::WebSocketFrame) => true,
99            _ => false,
100        })
101    }
102
103    /// Initialize agent connection.
104    pub async fn initialize(&self) -> SentinelResult<()> {
105        let timeout = Duration::from_millis(self.config.timeout_ms);
106
107        debug!(
108            agent_id = %self.config.id,
109            transport = ?self.config.transport,
110            timeout_ms = self.config.timeout_ms,
111            "Initializing agent connection"
112        );
113
114        let start = Instant::now();
115
116        match &self.config.transport {
117            AgentTransport::UnixSocket { path } => {
118                trace!(
119                    agent_id = %self.config.id,
120                    socket_path = %path.display(),
121                    "Connecting to agent via Unix socket"
122                );
123
124                let client = AgentClient::unix_socket(&self.config.id, path, timeout)
125                    .await
126                    .map_err(|e| {
127                        error!(
128                            agent_id = %self.config.id,
129                            socket_path = %path.display(),
130                            error = %e,
131                            "Failed to connect to agent via Unix socket"
132                        );
133                        SentinelError::Agent {
134                            agent: self.config.id.clone(),
135                            message: format!("Failed to connect via Unix socket: {}", e),
136                            event: "initialize".to_string(),
137                            source: None,
138                        }
139                    })?;
140
141                *self.client.write().await = Some(client);
142
143                info!(
144                    agent_id = %self.config.id,
145                    socket_path = %path.display(),
146                    connect_time_ms = start.elapsed().as_millis(),
147                    "Agent connected via Unix socket"
148                );
149
150                // Send Configure event if config is present
151                self.send_configure_event().await?;
152
153                Ok(())
154            }
155            AgentTransport::Grpc { address, tls } => {
156                trace!(
157                    agent_id = %self.config.id,
158                    address = %address,
159                    tls_enabled = tls.is_some(),
160                    "Connecting to agent via gRPC"
161                );
162
163                let client = match tls {
164                    Some(tls_config) => {
165                        // Build TLS configuration
166                        let mut grpc_tls = GrpcTlsConfig::new();
167
168                        // Load CA certificate if provided
169                        if let Some(ca_path) = &tls_config.ca_cert {
170                            grpc_tls = grpc_tls.with_ca_cert_file(ca_path).await.map_err(|e| {
171                                error!(
172                                    agent_id = %self.config.id,
173                                    ca_path = %ca_path.display(),
174                                    error = %e,
175                                    "Failed to load CA certificate for gRPC TLS"
176                                );
177                                SentinelError::Agent {
178                                    agent: self.config.id.clone(),
179                                    message: format!("Failed to load CA certificate: {}", e),
180                                    event: "initialize".to_string(),
181                                    source: None,
182                                }
183                            })?;
184                        }
185
186                        // Load client certificate and key for mTLS if provided
187                        if let (Some(cert_path), Some(key_path)) = (&tls_config.client_cert, &tls_config.client_key) {
188                            grpc_tls = grpc_tls.with_client_cert_files(cert_path, key_path).await.map_err(|e| {
189                                error!(
190                                    agent_id = %self.config.id,
191                                    cert_path = %cert_path.display(),
192                                    key_path = %key_path.display(),
193                                    error = %e,
194                                    "Failed to load client certificate for gRPC mTLS"
195                                );
196                                SentinelError::Agent {
197                                    agent: self.config.id.clone(),
198                                    message: format!("Failed to load client certificate: {}", e),
199                                    event: "initialize".to_string(),
200                                    source: None,
201                                }
202                            })?;
203                        }
204
205                        // Handle insecure skip verify
206                        if tls_config.insecure_skip_verify {
207                            warn!(
208                                agent_id = %self.config.id,
209                                address = %address,
210                                "SECURITY WARNING: TLS certificate verification disabled for agent"
211                            );
212                            grpc_tls = grpc_tls.with_insecure_skip_verify();
213                        }
214
215                        debug!(
216                            agent_id = %self.config.id,
217                            address = %address,
218                            has_ca_cert = tls_config.ca_cert.is_some(),
219                            has_client_cert = tls_config.client_cert.is_some(),
220                            "Connecting to agent via gRPC with TLS"
221                        );
222
223                        AgentClient::grpc_tls(&self.config.id, address, timeout, grpc_tls)
224                            .await
225                            .map_err(|e| {
226                                error!(
227                                    agent_id = %self.config.id,
228                                    address = %address,
229                                    error = %e,
230                                    "Failed to connect to agent via gRPC with TLS"
231                                );
232                                SentinelError::Agent {
233                                    agent: self.config.id.clone(),
234                                    message: format!("Failed to connect via gRPC TLS: {}", e),
235                                    event: "initialize".to_string(),
236                                    source: None,
237                                }
238                            })?
239                    }
240                    None => {
241                        // Plain gRPC without TLS
242                        AgentClient::grpc(&self.config.id, address, timeout)
243                            .await
244                            .map_err(|e| {
245                                error!(
246                                    agent_id = %self.config.id,
247                                    address = %address,
248                                    error = %e,
249                                    "Failed to connect to agent via gRPC"
250                                );
251                                SentinelError::Agent {
252                                    agent: self.config.id.clone(),
253                                    message: format!("Failed to connect via gRPC: {}", e),
254                                    event: "initialize".to_string(),
255                                    source: None,
256                                }
257                            })?
258                    }
259                };
260
261                *self.client.write().await = Some(client);
262
263                info!(
264                    agent_id = %self.config.id,
265                    address = %address,
266                    tls_enabled = tls.is_some(),
267                    connect_time_ms = start.elapsed().as_millis(),
268                    "Agent connected via gRPC"
269                );
270
271                // Send Configure event if config is present
272                self.send_configure_event().await?;
273
274                Ok(())
275            }
276            AgentTransport::Http { url, tls } => {
277                trace!(
278                    agent_id = %self.config.id,
279                    url = %url,
280                    tls_enabled = tls.is_some(),
281                    "Connecting to agent via HTTP"
282                );
283
284                let client = match tls {
285                    Some(tls_config) => {
286                        // Build TLS configuration
287                        let mut http_tls = HttpTlsConfig::new();
288
289                        // Load CA certificate if provided
290                        if let Some(ca_path) = &tls_config.ca_cert {
291                            http_tls = http_tls.with_ca_cert_file(ca_path).await.map_err(|e| {
292                                error!(
293                                    agent_id = %self.config.id,
294                                    ca_path = %ca_path.display(),
295                                    error = %e,
296                                    "Failed to load CA certificate for HTTP TLS"
297                                );
298                                SentinelError::Agent {
299                                    agent: self.config.id.clone(),
300                                    message: format!("Failed to load CA certificate: {}", e),
301                                    event: "initialize".to_string(),
302                                    source: None,
303                                }
304                            })?;
305                        }
306
307                        // Load client certificate and key for mTLS if provided
308                        if let (Some(cert_path), Some(key_path)) = (&tls_config.client_cert, &tls_config.client_key) {
309                            http_tls = http_tls.with_client_cert_files(cert_path, key_path).await.map_err(|e| {
310                                error!(
311                                    agent_id = %self.config.id,
312                                    cert_path = %cert_path.display(),
313                                    key_path = %key_path.display(),
314                                    error = %e,
315                                    "Failed to load client certificate for HTTP mTLS"
316                                );
317                                SentinelError::Agent {
318                                    agent: self.config.id.clone(),
319                                    message: format!("Failed to load client certificate: {}", e),
320                                    event: "initialize".to_string(),
321                                    source: None,
322                                }
323                            })?;
324                        }
325
326                        // Handle insecure skip verify
327                        if tls_config.insecure_skip_verify {
328                            warn!(
329                                agent_id = %self.config.id,
330                                url = %url,
331                                "SECURITY WARNING: TLS certificate verification disabled for HTTP agent"
332                            );
333                            http_tls = http_tls.with_insecure_skip_verify();
334                        }
335
336                        debug!(
337                            agent_id = %self.config.id,
338                            url = %url,
339                            has_ca_cert = tls_config.ca_cert.is_some(),
340                            has_client_cert = tls_config.client_cert.is_some(),
341                            "Connecting to agent via HTTP with TLS"
342                        );
343
344                        AgentClient::http_tls(&self.config.id, url, timeout, http_tls)
345                            .await
346                            .map_err(|e| {
347                                error!(
348                                    agent_id = %self.config.id,
349                                    url = %url,
350                                    error = %e,
351                                    "Failed to create HTTP TLS agent client"
352                                );
353                                SentinelError::Agent {
354                                    agent: self.config.id.clone(),
355                                    message: format!("Failed to create HTTP TLS client: {}", e),
356                                    event: "initialize".to_string(),
357                                    source: None,
358                                }
359                            })?
360                    }
361                    None => {
362                        // Plain HTTP without TLS
363                        AgentClient::http(&self.config.id, url, timeout)
364                            .await
365                            .map_err(|e| {
366                                error!(
367                                    agent_id = %self.config.id,
368                                    url = %url,
369                                    error = %e,
370                                    "Failed to create HTTP agent client"
371                                );
372                                SentinelError::Agent {
373                                    agent: self.config.id.clone(),
374                                    message: format!("Failed to create HTTP client: {}", e),
375                                    event: "initialize".to_string(),
376                                    source: None,
377                                }
378                            })?
379                    }
380                };
381
382                *self.client.write().await = Some(client);
383
384                info!(
385                    agent_id = %self.config.id,
386                    url = %url,
387                    tls_enabled = tls.is_some(),
388                    connect_time_ms = start.elapsed().as_millis(),
389                    "Agent connected via HTTP"
390                );
391
392                // Send Configure event if config is present
393                self.send_configure_event().await?;
394
395                Ok(())
396            }
397        }
398    }
399
400    /// Create a new client connection (for pooling).
401    ///
402    /// This creates a new AgentClient without storing it in `self.client`.
403    /// Use this when you need a new connection for the pool.
404    async fn create_client(&self) -> SentinelResult<AgentClient> {
405        let timeout = Duration::from_millis(self.config.timeout_ms);
406
407        trace!(
408            agent_id = %self.config.id,
409            transport = ?self.config.transport,
410            "Creating new client connection for pool"
411        );
412
413        match &self.config.transport {
414            AgentTransport::UnixSocket { path } => {
415                AgentClient::unix_socket(&self.config.id, path, timeout)
416                    .await
417                    .map_err(|e| {
418                        error!(
419                            agent_id = %self.config.id,
420                            socket_path = %path.display(),
421                            error = %e,
422                            "Failed to create Unix socket client"
423                        );
424                        SentinelError::Agent {
425                            agent: self.config.id.clone(),
426                            message: format!("Failed to connect via Unix socket: {}", e),
427                            event: "create_client".to_string(),
428                            source: None,
429                        }
430                    })
431            }
432            AgentTransport::Grpc { address, tls } => {
433                match tls {
434                    Some(tls_config) => {
435                        let mut grpc_tls = GrpcTlsConfig::new();
436
437                        if let Some(ca_path) = &tls_config.ca_cert {
438                            grpc_tls = grpc_tls.with_ca_cert_file(ca_path).await.map_err(|e| {
439                                SentinelError::Agent {
440                                    agent: self.config.id.clone(),
441                                    message: format!("Failed to load CA certificate: {}", e),
442                                    event: "create_client".to_string(),
443                                    source: None,
444                                }
445                            })?;
446                        }
447
448                        if let (Some(cert_path), Some(key_path)) = (&tls_config.client_cert, &tls_config.client_key) {
449                            grpc_tls = grpc_tls.with_client_cert_files(cert_path, key_path).await.map_err(|e| {
450                                SentinelError::Agent {
451                                    agent: self.config.id.clone(),
452                                    message: format!("Failed to load client certificate: {}", e),
453                                    event: "create_client".to_string(),
454                                    source: None,
455                                }
456                            })?;
457                        }
458
459                        if tls_config.insecure_skip_verify {
460                            grpc_tls = grpc_tls.with_insecure_skip_verify();
461                        }
462
463                        AgentClient::grpc_tls(&self.config.id, address, timeout, grpc_tls)
464                            .await
465                            .map_err(|e| {
466                                SentinelError::Agent {
467                                    agent: self.config.id.clone(),
468                                    message: format!("Failed to connect via gRPC TLS: {}", e),
469                                    event: "create_client".to_string(),
470                                    source: None,
471                                }
472                            })
473                    }
474                    None => {
475                        AgentClient::grpc(&self.config.id, address, timeout)
476                            .await
477                            .map_err(|e| {
478                                SentinelError::Agent {
479                                    agent: self.config.id.clone(),
480                                    message: format!("Failed to connect via gRPC: {}", e),
481                                    event: "create_client".to_string(),
482                                    source: None,
483                                }
484                            })
485                    }
486                }
487            }
488            AgentTransport::Http { url, tls } => {
489                match tls {
490                    Some(tls_config) => {
491                        let mut http_tls = HttpTlsConfig::new();
492
493                        if let Some(ca_path) = &tls_config.ca_cert {
494                            http_tls = http_tls.with_ca_cert_file(ca_path).await.map_err(|e| {
495                                SentinelError::Agent {
496                                    agent: self.config.id.clone(),
497                                    message: format!("Failed to load CA certificate: {}", e),
498                                    event: "create_client".to_string(),
499                                    source: None,
500                                }
501                            })?;
502                        }
503
504                        if let (Some(cert_path), Some(key_path)) = (&tls_config.client_cert, &tls_config.client_key) {
505                            http_tls = http_tls.with_client_cert_files(cert_path, key_path).await.map_err(|e| {
506                                SentinelError::Agent {
507                                    agent: self.config.id.clone(),
508                                    message: format!("Failed to load client certificate: {}", e),
509                                    event: "create_client".to_string(),
510                                    source: None,
511                                }
512                            })?;
513                        }
514
515                        if tls_config.insecure_skip_verify {
516                            http_tls = http_tls.with_insecure_skip_verify();
517                        }
518
519                        AgentClient::http_tls(&self.config.id, url, timeout, http_tls)
520                            .await
521                            .map_err(|e| {
522                                SentinelError::Agent {
523                                    agent: self.config.id.clone(),
524                                    message: format!("Failed to create HTTP TLS client: {}", e),
525                                    event: "create_client".to_string(),
526                                    source: None,
527                                }
528                            })
529                    }
530                    None => {
531                        AgentClient::http(&self.config.id, url, timeout)
532                            .await
533                            .map_err(|e| {
534                                SentinelError::Agent {
535                                    agent: self.config.id.clone(),
536                                    message: format!("Failed to create HTTP client: {}", e),
537                                    event: "create_client".to_string(),
538                                    source: None,
539                                }
540                            })
541                    }
542                }
543            }
544        }
545    }
546
547    /// Send Configure event to agent if config is present.
548    async fn send_configure_event(&self) -> SentinelResult<()> {
549        // Only send Configure if agent has config
550        let config = match &self.config.config {
551            Some(c) => c.clone(),
552            None => {
553                trace!(
554                    agent_id = %self.config.id,
555                    "No config for agent, skipping Configure event"
556                );
557                return Ok(());
558            }
559        };
560
561        let event = ConfigureEvent {
562            agent_id: self.config.id.clone(),
563            config,
564        };
565
566        debug!(
567            agent_id = %self.config.id,
568            "Sending Configure event to agent"
569        );
570
571        let mut client_guard = self.client.write().await;
572        let client = client_guard.as_mut().ok_or_else(|| SentinelError::Agent {
573            agent: self.config.id.clone(),
574            message: "No client connection for Configure event".to_string(),
575            event: "configure".to_string(),
576            source: None,
577        })?;
578
579        let response = client.send_event(EventType::Configure, &event).await.map_err(|e| {
580            error!(
581                agent_id = %self.config.id,
582                error = %e,
583                "Failed to send Configure event"
584            );
585            SentinelError::Agent {
586                agent: self.config.id.clone(),
587                message: format!("Configure event failed: {}", e),
588                event: "configure".to_string(),
589                source: None,
590            }
591        })?;
592
593        // Check if agent accepted the configuration
594        if !matches!(response.decision, Decision::Allow) {
595            error!(
596                agent_id = %self.config.id,
597                decision = ?response.decision,
598                "Agent rejected configuration"
599            );
600            return Err(SentinelError::Agent {
601                agent: self.config.id.clone(),
602                message: "Agent rejected configuration".to_string(),
603                event: "configure".to_string(),
604                source: None,
605            });
606        }
607
608        info!(
609            agent_id = %self.config.id,
610            "Agent accepted configuration"
611        );
612
613        Ok(())
614    }
615
616    /// Call agent with event.
617    ///
618    /// Uses connection pooling for concurrent request handling. Multiple requests
619    /// can execute simultaneously using different connections from the pool.
620    pub async fn call_event<T: serde::Serialize>(
621        &self,
622        event_type: EventType,
623        event: &T,
624    ) -> SentinelResult<AgentResponse> {
625        trace!(
626            agent_id = %self.config.id,
627            event_type = ?event_type,
628            "Preparing to call agent"
629        );
630
631        // Try to get a connection from the pool (fast path)
632        let mut pooled_conn = self.pool.try_get();
633
634        // If no pooled connection, try to create a new one
635        if pooled_conn.is_none() {
636            if self.pool.can_create() {
637                trace!(
638                    agent_id = %self.config.id,
639                    "No pooled connection available, creating new connection"
640                );
641                match self.create_client().await {
642                    Ok(client) => {
643                        self.pool.register_created();
644                        pooled_conn = Some(super::pool::PooledConnection::new(client));
645                    }
646                    Err(e) => {
647                        error!(
648                            agent_id = %self.config.id,
649                            error = %e,
650                            "Failed to create new connection"
651                        );
652                        return Err(e);
653                    }
654                }
655            } else {
656                // Pool is at capacity, fall back to single client with lock
657                // This ensures we don't create unbounded connections
658                trace!(
659                    agent_id = %self.config.id,
660                    "Pool at capacity, using fallback client"
661                );
662                return self.call_event_fallback(event_type, event).await;
663            }
664        }
665
666        let mut conn = pooled_conn.expect("Connection should be available");
667
668        // Make the call
669        let call_num = self.metrics.calls_total.fetch_add(1, Ordering::Relaxed) + 1;
670
671        trace!(
672            agent_id = %self.config.id,
673            event_type = ?event_type,
674            call_num = call_num,
675            pool_active = self.pool.active_count(),
676            "Sending event to agent via pooled connection"
677        );
678
679        let result = conn.client.send_event(event_type, event).await;
680
681        // Handle result
682        match result {
683            Ok(response) => {
684                // Return connection to pool on success
685                self.pool.return_connection(conn);
686                Ok(response)
687            }
688            Err(e) => {
689                let error_str = e.to_string();
690                let is_connection_error = error_str.contains("Broken pipe")
691                    || error_str.contains("Connection reset")
692                    || error_str.contains("Connection refused")
693                    || error_str.contains("not connected")
694                    || error_str.contains("transport error");
695
696                error!(
697                    agent_id = %self.config.id,
698                    event_type = ?event_type,
699                    error = %e,
700                    is_connection_error = is_connection_error,
701                    "Agent call failed"
702                );
703
704                // Don't return connection to pool on error - mark it as failed
705                self.pool.mark_failed();
706                // Connection will be dropped here
707
708                Err(SentinelError::Agent {
709                    agent: self.config.id.clone(),
710                    message: e.to_string(),
711                    event: format!("{:?}", event_type),
712                    source: None,
713                })
714            }
715        }
716    }
717
718    /// Fallback call method using the single cached client (for when pool is exhausted).
719    async fn call_event_fallback<T: serde::Serialize>(
720        &self,
721        event_type: EventType,
722        event: &T,
723    ) -> SentinelResult<AgentResponse> {
724        // Get or create connection using the fallback single client
725        let mut client_guard = self.client.write().await;
726
727        if client_guard.is_none() {
728            trace!(
729                agent_id = %self.config.id,
730                "No existing fallback connection, initializing"
731            );
732            drop(client_guard);
733            self.initialize().await?;
734            client_guard = self.client.write().await;
735        }
736
737        let client = client_guard.as_mut().ok_or_else(|| {
738            error!(
739                agent_id = %self.config.id,
740                event_type = ?event_type,
741                "No client connection available after initialization"
742            );
743            SentinelError::Agent {
744                agent: self.config.id.clone(),
745                message: "No client connection".to_string(),
746                event: format!("{:?}", event_type),
747                source: None,
748            }
749        })?;
750
751        // Make the call
752        let call_num = self.metrics.calls_total.fetch_add(1, Ordering::Relaxed) + 1;
753
754        trace!(
755            agent_id = %self.config.id,
756            event_type = ?event_type,
757            call_num = call_num,
758            "Sending event to agent via fallback client"
759        );
760
761        let result = client.send_event(event_type, event).await;
762
763        match result {
764            Ok(response) => Ok(response),
765            Err(e) => {
766                let error_str = e.to_string();
767                let is_connection_error = error_str.contains("Broken pipe")
768                    || error_str.contains("Connection reset")
769                    || error_str.contains("Connection refused")
770                    || error_str.contains("not connected")
771                    || error_str.contains("transport error");
772
773                error!(
774                    agent_id = %self.config.id,
775                    event_type = ?event_type,
776                    error = %e,
777                    is_connection_error = is_connection_error,
778                    "Agent call failed (fallback)"
779                );
780
781                drop(client_guard);
782
783                if is_connection_error {
784                    warn!(
785                        agent_id = %self.config.id,
786                        "Clearing cached fallback client due to connection error"
787                    );
788                    *self.client.write().await = None;
789                }
790
791                Err(SentinelError::Agent {
792                    agent: self.config.id.clone(),
793                    message: e.to_string(),
794                    event: format!("{:?}", event_type),
795                    source: None,
796                })
797            }
798        }
799    }
800
801    /// Record successful call (lock-free).
802    pub async fn record_success(&self, duration: Duration) {
803        let success_count = self.metrics.calls_success.fetch_add(1, Ordering::Relaxed) + 1;
804        self.metrics
805            .duration_total_us
806            .fetch_add(duration.as_micros() as u64, Ordering::Relaxed);
807        self.consecutive_failures.store(0, Ordering::Relaxed);
808        // Store timestamp as nanoseconds since base_instant (lock-free)
809        self.last_success_ns.store(
810            self.base_instant.elapsed().as_nanos() as u64,
811            Ordering::Relaxed,
812        );
813
814        trace!(
815            agent_id = %self.config.id,
816            duration_ms = duration.as_millis(),
817            total_successes = success_count,
818            "Recorded agent call success"
819        );
820
821        self.circuit_breaker.record_success(); // Lock-free
822    }
823
824    /// Get the time since last successful call (for monitoring).
825    /// Returns None if no successful call has been recorded.
826    #[inline]
827    pub fn time_since_last_success(&self) -> Option<Duration> {
828        let last_ns = self.last_success_ns.load(Ordering::Relaxed);
829        if last_ns == NO_TIMESTAMP {
830            return None;
831        }
832        let current_ns = self.base_instant.elapsed().as_nanos() as u64;
833        Some(Duration::from_nanos(current_ns.saturating_sub(last_ns)))
834    }
835
836    /// Record failed call.
837    pub async fn record_failure(&self) {
838        let fail_count = self.metrics.calls_failed.fetch_add(1, Ordering::Relaxed) + 1;
839        let consecutive = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
840
841        debug!(
842            agent_id = %self.config.id,
843            total_failures = fail_count,
844            consecutive_failures = consecutive,
845            "Recorded agent call failure"
846        );
847
848        self.circuit_breaker.record_failure(); // Lock-free
849    }
850
851    /// Record timeout.
852    pub async fn record_timeout(&self) {
853        let timeout_count = self.metrics.calls_timeout.fetch_add(1, Ordering::Relaxed) + 1;
854        let consecutive = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
855
856        debug!(
857            agent_id = %self.config.id,
858            total_timeouts = timeout_count,
859            consecutive_failures = consecutive,
860            timeout_ms = self.config.timeout_ms,
861            "Recorded agent call timeout"
862        );
863
864        self.circuit_breaker.record_failure(); // Lock-free
865    }
866
867    /// Shutdown agent.
868    pub async fn shutdown(&self) {
869        debug!(
870            agent_id = %self.config.id,
871            "Shutting down agent"
872        );
873
874        if let Some(client) = self.client.write().await.take() {
875            trace!(
876                agent_id = %self.config.id,
877                "Closing agent client connection"
878            );
879            let _ = client.close().await;
880        }
881
882        let stats = (
883            self.metrics.calls_total.load(Ordering::Relaxed),
884            self.metrics.calls_success.load(Ordering::Relaxed),
885            self.metrics.calls_failed.load(Ordering::Relaxed),
886            self.metrics.calls_timeout.load(Ordering::Relaxed),
887        );
888
889        info!(
890            agent_id = %self.config.id,
891            total_calls = stats.0,
892            successes = stats.1,
893            failures = stats.2,
894            timeouts = stats.3,
895            "Agent shutdown complete"
896        );
897    }
898}