Skip to main content

sentinel_proxy/agents/
agent.rs

1//! Individual agent implementation.
2
3use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use sentinel_agent_protocol::{
8    AgentClient, AgentResponse, ConfigureEvent, Decision, EventType, GrpcTlsConfig, HttpTlsConfig,
9};
10use sentinel_common::{errors::SentinelError, errors::SentinelResult, CircuitBreaker};
11use sentinel_config::{AgentConfig, AgentEvent, AgentTransport};
12use tokio::sync::RwLock;
13use tracing::{debug, error, info, trace, warn};
14
15use super::metrics::AgentMetrics;
16use super::pool::AgentConnectionPool;
17
/// Marker value for [`Agent::last_success_ns`] meaning "no successful call has
/// been recorded yet" — an atomic-friendly stand-in for `Option::None`.
const NO_TIMESTAMP: u64 = u64::MIN;
20
/// A single external agent: its static configuration plus the runtime state
/// used to talk to it.
///
/// Calls normally go through `pool`. `client` holds one additional shared
/// connection that `initialize` fills in and that `call_event` falls back to
/// when the pool cannot hand out or create a connection.
pub struct Agent {
    /// Static agent configuration (id, transport, timeout, subscribed events,
    /// failure mode and optional agent-specific config payload).
    pub(super) config: AgentConfig,
    /// Primary/fallback connection; `None` until `initialize` connects.
    /// Senders take the write lock, so calls over this client are serialised.
    pub(super) client: Arc<RwLock<Option<AgentClient>>>,
    /// Pool of additional connections used by `call_event` for concurrent calls.
    pub(super) pool: Arc<AgentConnectionPool>,
    /// Circuit breaker for this agent, exposed via `circuit_breaker()`.
    pub(super) circuit_breaker: Arc<CircuitBreaker>,
    /// Per-agent call metrics (e.g. `calls_total`).
    pub(super) metrics: Arc<AgentMetrics>,
    /// Reference point for the nanosecond timestamps in `last_success_ns`;
    /// captured when the agent is constructed.
    pub(super) base_instant: Instant,
    /// Last successful call, as nanoseconds since `base_instant`;
    /// `NO_TIMESTAMP` (0) means no success has been recorded. Atomic so it can
    /// be updated through `&self`.
    pub(super) last_success_ns: AtomicU64,
    /// Number of consecutive failed calls.
    pub(super) consecutive_failures: AtomicU32,
}
40
41impl Agent {
42    /// Create a new agent.
43    pub fn new(
44        config: AgentConfig,
45        pool: Arc<AgentConnectionPool>,
46        circuit_breaker: Arc<CircuitBreaker>,
47    ) -> Self {
48        trace!(
49            agent_id = %config.id,
50            agent_type = ?config.agent_type,
51            timeout_ms = config.timeout_ms,
52            events = ?config.events,
53            "Creating agent instance"
54        );
55        Self {
56            config,
57            client: Arc::new(RwLock::new(None)),
58            pool,
59            circuit_breaker,
60            metrics: Arc::new(AgentMetrics::default()),
61            base_instant: Instant::now(),
62            last_success_ns: AtomicU64::new(NO_TIMESTAMP),
63            consecutive_failures: AtomicU32::new(0),
64        }
65    }
66
67    /// Get the agent ID.
68    pub fn id(&self) -> &str {
69        &self.config.id
70    }
71
72    /// Get the agent's circuit breaker.
73    pub fn circuit_breaker(&self) -> &CircuitBreaker {
74        &self.circuit_breaker
75    }
76
77    /// Get the agent's failure mode.
78    pub fn failure_mode(&self) -> sentinel_config::FailureMode {
79        self.config.failure_mode
80    }
81
82    /// Get the agent's timeout in milliseconds.
83    pub fn timeout_ms(&self) -> u64 {
84        self.config.timeout_ms
85    }
86
87    /// Get the agent's metrics.
88    pub fn metrics(&self) -> &AgentMetrics {
89        &self.metrics
90    }
91
92    /// Check if agent handles a specific event type.
93    pub fn handles_event(&self, event_type: EventType) -> bool {
94        self.config.events.iter().any(|e| match (e, event_type) {
95            (AgentEvent::RequestHeaders, EventType::RequestHeaders) => true,
96            (AgentEvent::RequestBody, EventType::RequestBodyChunk) => true,
97            (AgentEvent::ResponseHeaders, EventType::ResponseHeaders) => true,
98            (AgentEvent::ResponseBody, EventType::ResponseBodyChunk) => true,
99            (AgentEvent::Log, EventType::RequestComplete) => true,
100            (AgentEvent::WebSocketFrame, EventType::WebSocketFrame) => true,
101            _ => false,
102        })
103    }
104
105    /// Initialize agent connection.
106    pub async fn initialize(&self) -> SentinelResult<()> {
107        let timeout = Duration::from_millis(self.config.timeout_ms);
108
109        debug!(
110            agent_id = %self.config.id,
111            transport = ?self.config.transport,
112            timeout_ms = self.config.timeout_ms,
113            "Initializing agent connection"
114        );
115
116        let start = Instant::now();
117
118        match &self.config.transport {
119            AgentTransport::UnixSocket { path } => {
120                trace!(
121                    agent_id = %self.config.id,
122                    socket_path = %path.display(),
123                    "Connecting to agent via Unix socket"
124                );
125
126                let client = AgentClient::unix_socket(&self.config.id, path, timeout)
127                    .await
128                    .map_err(|e| {
129                        error!(
130                            agent_id = %self.config.id,
131                            socket_path = %path.display(),
132                            error = %e,
133                            "Failed to connect to agent via Unix socket"
134                        );
135                        SentinelError::Agent {
136                            agent: self.config.id.clone(),
137                            message: format!("Failed to connect via Unix socket: {}", e),
138                            event: "initialize".to_string(),
139                            source: None,
140                        }
141                    })?;
142
143                *self.client.write().await = Some(client);
144
145                info!(
146                    agent_id = %self.config.id,
147                    socket_path = %path.display(),
148                    connect_time_ms = start.elapsed().as_millis(),
149                    "Agent connected via Unix socket"
150                );
151
152                // Send Configure event if config is present
153                self.send_configure_event().await?;
154
155                Ok(())
156            }
157            AgentTransport::Grpc { address, tls } => {
158                trace!(
159                    agent_id = %self.config.id,
160                    address = %address,
161                    tls_enabled = tls.is_some(),
162                    "Connecting to agent via gRPC"
163                );
164
165                let client = match tls {
166                    Some(tls_config) => {
167                        // Build TLS configuration
168                        let mut grpc_tls = GrpcTlsConfig::new();
169
170                        // Load CA certificate if provided
171                        if let Some(ca_path) = &tls_config.ca_cert {
172                            grpc_tls = grpc_tls.with_ca_cert_file(ca_path).await.map_err(|e| {
173                                error!(
174                                    agent_id = %self.config.id,
175                                    ca_path = %ca_path.display(),
176                                    error = %e,
177                                    "Failed to load CA certificate for gRPC TLS"
178                                );
179                                SentinelError::Agent {
180                                    agent: self.config.id.clone(),
181                                    message: format!("Failed to load CA certificate: {}", e),
182                                    event: "initialize".to_string(),
183                                    source: None,
184                                }
185                            })?;
186                        }
187
188                        // Load client certificate and key for mTLS if provided
189                        if let (Some(cert_path), Some(key_path)) =
190                            (&tls_config.client_cert, &tls_config.client_key)
191                        {
192                            grpc_tls = grpc_tls
193                                .with_client_cert_files(cert_path, key_path)
194                                .await
195                                .map_err(|e| {
196                                error!(
197                                    agent_id = %self.config.id,
198                                    cert_path = %cert_path.display(),
199                                    key_path = %key_path.display(),
200                                    error = %e,
201                                    "Failed to load client certificate for gRPC mTLS"
202                                );
203                                SentinelError::Agent {
204                                    agent: self.config.id.clone(),
205                                    message: format!("Failed to load client certificate: {}", e),
206                                    event: "initialize".to_string(),
207                                    source: None,
208                                }
209                            })?;
210                        }
211
212                        // Handle insecure skip verify
213                        if tls_config.insecure_skip_verify {
214                            warn!(
215                                agent_id = %self.config.id,
216                                address = %address,
217                                "SECURITY WARNING: TLS certificate verification disabled for agent"
218                            );
219                            grpc_tls = grpc_tls.with_insecure_skip_verify();
220                        }
221
222                        debug!(
223                            agent_id = %self.config.id,
224                            address = %address,
225                            has_ca_cert = tls_config.ca_cert.is_some(),
226                            has_client_cert = tls_config.client_cert.is_some(),
227                            "Connecting to agent via gRPC with TLS"
228                        );
229
230                        AgentClient::grpc_tls(&self.config.id, address, timeout, grpc_tls)
231                            .await
232                            .map_err(|e| {
233                                error!(
234                                    agent_id = %self.config.id,
235                                    address = %address,
236                                    error = %e,
237                                    "Failed to connect to agent via gRPC with TLS"
238                                );
239                                SentinelError::Agent {
240                                    agent: self.config.id.clone(),
241                                    message: format!("Failed to connect via gRPC TLS: {}", e),
242                                    event: "initialize".to_string(),
243                                    source: None,
244                                }
245                            })?
246                    }
247                    None => {
248                        // Plain gRPC without TLS
249                        AgentClient::grpc(&self.config.id, address, timeout)
250                            .await
251                            .map_err(|e| {
252                                error!(
253                                    agent_id = %self.config.id,
254                                    address = %address,
255                                    error = %e,
256                                    "Failed to connect to agent via gRPC"
257                                );
258                                SentinelError::Agent {
259                                    agent: self.config.id.clone(),
260                                    message: format!("Failed to connect via gRPC: {}", e),
261                                    event: "initialize".to_string(),
262                                    source: None,
263                                }
264                            })?
265                    }
266                };
267
268                *self.client.write().await = Some(client);
269
270                info!(
271                    agent_id = %self.config.id,
272                    address = %address,
273                    tls_enabled = tls.is_some(),
274                    connect_time_ms = start.elapsed().as_millis(),
275                    "Agent connected via gRPC"
276                );
277
278                // Send Configure event if config is present
279                self.send_configure_event().await?;
280
281                Ok(())
282            }
283            AgentTransport::Http { url, tls } => {
284                trace!(
285                    agent_id = %self.config.id,
286                    url = %url,
287                    tls_enabled = tls.is_some(),
288                    "Connecting to agent via HTTP"
289                );
290
291                let client = match tls {
292                    Some(tls_config) => {
293                        // Build TLS configuration
294                        let mut http_tls = HttpTlsConfig::new();
295
296                        // Load CA certificate if provided
297                        if let Some(ca_path) = &tls_config.ca_cert {
298                            http_tls = http_tls.with_ca_cert_file(ca_path).await.map_err(|e| {
299                                error!(
300                                    agent_id = %self.config.id,
301                                    ca_path = %ca_path.display(),
302                                    error = %e,
303                                    "Failed to load CA certificate for HTTP TLS"
304                                );
305                                SentinelError::Agent {
306                                    agent: self.config.id.clone(),
307                                    message: format!("Failed to load CA certificate: {}", e),
308                                    event: "initialize".to_string(),
309                                    source: None,
310                                }
311                            })?;
312                        }
313
314                        // Load client certificate and key for mTLS if provided
315                        if let (Some(cert_path), Some(key_path)) =
316                            (&tls_config.client_cert, &tls_config.client_key)
317                        {
318                            http_tls = http_tls
319                                .with_client_cert_files(cert_path, key_path)
320                                .await
321                                .map_err(|e| {
322                                error!(
323                                    agent_id = %self.config.id,
324                                    cert_path = %cert_path.display(),
325                                    key_path = %key_path.display(),
326                                    error = %e,
327                                    "Failed to load client certificate for HTTP mTLS"
328                                );
329                                SentinelError::Agent {
330                                    agent: self.config.id.clone(),
331                                    message: format!("Failed to load client certificate: {}", e),
332                                    event: "initialize".to_string(),
333                                    source: None,
334                                }
335                            })?;
336                        }
337
338                        // Handle insecure skip verify
339                        if tls_config.insecure_skip_verify {
340                            warn!(
341                                agent_id = %self.config.id,
342                                url = %url,
343                                "SECURITY WARNING: TLS certificate verification disabled for HTTP agent"
344                            );
345                            http_tls = http_tls.with_insecure_skip_verify();
346                        }
347
348                        debug!(
349                            agent_id = %self.config.id,
350                            url = %url,
351                            has_ca_cert = tls_config.ca_cert.is_some(),
352                            has_client_cert = tls_config.client_cert.is_some(),
353                            "Connecting to agent via HTTP with TLS"
354                        );
355
356                        AgentClient::http_tls(&self.config.id, url, timeout, http_tls)
357                            .await
358                            .map_err(|e| {
359                                error!(
360                                    agent_id = %self.config.id,
361                                    url = %url,
362                                    error = %e,
363                                    "Failed to create HTTP TLS agent client"
364                                );
365                                SentinelError::Agent {
366                                    agent: self.config.id.clone(),
367                                    message: format!("Failed to create HTTP TLS client: {}", e),
368                                    event: "initialize".to_string(),
369                                    source: None,
370                                }
371                            })?
372                    }
373                    None => {
374                        // Plain HTTP without TLS
375                        AgentClient::http(&self.config.id, url, timeout)
376                            .await
377                            .map_err(|e| {
378                                error!(
379                                    agent_id = %self.config.id,
380                                    url = %url,
381                                    error = %e,
382                                    "Failed to create HTTP agent client"
383                                );
384                                SentinelError::Agent {
385                                    agent: self.config.id.clone(),
386                                    message: format!("Failed to create HTTP client: {}", e),
387                                    event: "initialize".to_string(),
388                                    source: None,
389                                }
390                            })?
391                    }
392                };
393
394                *self.client.write().await = Some(client);
395
396                info!(
397                    agent_id = %self.config.id,
398                    url = %url,
399                    tls_enabled = tls.is_some(),
400                    connect_time_ms = start.elapsed().as_millis(),
401                    "Agent connected via HTTP"
402                );
403
404                // Send Configure event if config is present
405                self.send_configure_event().await?;
406
407                Ok(())
408            }
409        }
410    }
411
412    /// Create a new client connection (for pooling).
413    ///
414    /// This creates a new AgentClient without storing it in `self.client`.
415    /// Use this when you need a new connection for the pool.
416    async fn create_client(&self) -> SentinelResult<AgentClient> {
417        let timeout = Duration::from_millis(self.config.timeout_ms);
418
419        trace!(
420            agent_id = %self.config.id,
421            transport = ?self.config.transport,
422            "Creating new client connection for pool"
423        );
424
425        match &self.config.transport {
426            AgentTransport::UnixSocket { path } => {
427                AgentClient::unix_socket(&self.config.id, path, timeout)
428                    .await
429                    .map_err(|e| {
430                        error!(
431                            agent_id = %self.config.id,
432                            socket_path = %path.display(),
433                            error = %e,
434                            "Failed to create Unix socket client"
435                        );
436                        SentinelError::Agent {
437                            agent: self.config.id.clone(),
438                            message: format!("Failed to connect via Unix socket: {}", e),
439                            event: "create_client".to_string(),
440                            source: None,
441                        }
442                    })
443            }
444            AgentTransport::Grpc { address, tls } => match tls {
445                Some(tls_config) => {
446                    let mut grpc_tls = GrpcTlsConfig::new();
447
448                    if let Some(ca_path) = &tls_config.ca_cert {
449                        grpc_tls = grpc_tls.with_ca_cert_file(ca_path).await.map_err(|e| {
450                            SentinelError::Agent {
451                                agent: self.config.id.clone(),
452                                message: format!("Failed to load CA certificate: {}", e),
453                                event: "create_client".to_string(),
454                                source: None,
455                            }
456                        })?;
457                    }
458
459                    if let (Some(cert_path), Some(key_path)) =
460                        (&tls_config.client_cert, &tls_config.client_key)
461                    {
462                        grpc_tls = grpc_tls
463                            .with_client_cert_files(cert_path, key_path)
464                            .await
465                            .map_err(|e| SentinelError::Agent {
466                                agent: self.config.id.clone(),
467                                message: format!("Failed to load client certificate: {}", e),
468                                event: "create_client".to_string(),
469                                source: None,
470                            })?;
471                    }
472
473                    if tls_config.insecure_skip_verify {
474                        grpc_tls = grpc_tls.with_insecure_skip_verify();
475                    }
476
477                    AgentClient::grpc_tls(&self.config.id, address, timeout, grpc_tls)
478                        .await
479                        .map_err(|e| SentinelError::Agent {
480                            agent: self.config.id.clone(),
481                            message: format!("Failed to connect via gRPC TLS: {}", e),
482                            event: "create_client".to_string(),
483                            source: None,
484                        })
485                }
486                None => AgentClient::grpc(&self.config.id, address, timeout)
487                    .await
488                    .map_err(|e| SentinelError::Agent {
489                        agent: self.config.id.clone(),
490                        message: format!("Failed to connect via gRPC: {}", e),
491                        event: "create_client".to_string(),
492                        source: None,
493                    }),
494            },
495            AgentTransport::Http { url, tls } => match tls {
496                Some(tls_config) => {
497                    let mut http_tls = HttpTlsConfig::new();
498
499                    if let Some(ca_path) = &tls_config.ca_cert {
500                        http_tls = http_tls.with_ca_cert_file(ca_path).await.map_err(|e| {
501                            SentinelError::Agent {
502                                agent: self.config.id.clone(),
503                                message: format!("Failed to load CA certificate: {}", e),
504                                event: "create_client".to_string(),
505                                source: None,
506                            }
507                        })?;
508                    }
509
510                    if let (Some(cert_path), Some(key_path)) =
511                        (&tls_config.client_cert, &tls_config.client_key)
512                    {
513                        http_tls = http_tls
514                            .with_client_cert_files(cert_path, key_path)
515                            .await
516                            .map_err(|e| SentinelError::Agent {
517                                agent: self.config.id.clone(),
518                                message: format!("Failed to load client certificate: {}", e),
519                                event: "create_client".to_string(),
520                                source: None,
521                            })?;
522                    }
523
524                    if tls_config.insecure_skip_verify {
525                        http_tls = http_tls.with_insecure_skip_verify();
526                    }
527
528                    AgentClient::http_tls(&self.config.id, url, timeout, http_tls)
529                        .await
530                        .map_err(|e| SentinelError::Agent {
531                            agent: self.config.id.clone(),
532                            message: format!("Failed to create HTTP TLS client: {}", e),
533                            event: "create_client".to_string(),
534                            source: None,
535                        })
536                }
537                None => AgentClient::http(&self.config.id, url, timeout)
538                    .await
539                    .map_err(|e| SentinelError::Agent {
540                        agent: self.config.id.clone(),
541                        message: format!("Failed to create HTTP client: {}", e),
542                        event: "create_client".to_string(),
543                        source: None,
544                    }),
545            },
546        }
547    }
548
549    /// Send Configure event to agent if config is present.
550    async fn send_configure_event(&self) -> SentinelResult<()> {
551        // Only send Configure if agent has config
552        let config = match &self.config.config {
553            Some(c) => c.clone(),
554            None => {
555                trace!(
556                    agent_id = %self.config.id,
557                    "No config for agent, skipping Configure event"
558                );
559                return Ok(());
560            }
561        };
562
563        let event = ConfigureEvent {
564            agent_id: self.config.id.clone(),
565            config,
566        };
567
568        debug!(
569            agent_id = %self.config.id,
570            "Sending Configure event to agent"
571        );
572
573        let mut client_guard = self.client.write().await;
574        let client = client_guard.as_mut().ok_or_else(|| SentinelError::Agent {
575            agent: self.config.id.clone(),
576            message: "No client connection for Configure event".to_string(),
577            event: "configure".to_string(),
578            source: None,
579        })?;
580
581        let response = client
582            .send_event(EventType::Configure, &event)
583            .await
584            .map_err(|e| {
585                error!(
586                    agent_id = %self.config.id,
587                    error = %e,
588                    "Failed to send Configure event"
589                );
590                SentinelError::Agent {
591                    agent: self.config.id.clone(),
592                    message: format!("Configure event failed: {}", e),
593                    event: "configure".to_string(),
594                    source: None,
595                }
596            })?;
597
598        // Check if agent accepted the configuration
599        if !matches!(response.decision, Decision::Allow) {
600            error!(
601                agent_id = %self.config.id,
602                decision = ?response.decision,
603                "Agent rejected configuration"
604            );
605            return Err(SentinelError::Agent {
606                agent: self.config.id.clone(),
607                message: "Agent rejected configuration".to_string(),
608                event: "configure".to_string(),
609                source: None,
610            });
611        }
612
613        info!(
614            agent_id = %self.config.id,
615            "Agent accepted configuration"
616        );
617
618        Ok(())
619    }
620
621    /// Call agent with event.
622    ///
623    /// Uses connection pooling for concurrent request handling. Multiple requests
624    /// can execute simultaneously using different connections from the pool.
625    pub async fn call_event<T: serde::Serialize>(
626        &self,
627        event_type: EventType,
628        event: &T,
629    ) -> SentinelResult<AgentResponse> {
630        trace!(
631            agent_id = %self.config.id,
632            event_type = ?event_type,
633            "Preparing to call agent"
634        );
635
636        // Try to get a connection from the pool (fast path)
637        let mut pooled_conn = self.pool.try_get();
638
639        // If no pooled connection, try to create a new one
640        if pooled_conn.is_none() {
641            if self.pool.can_create() {
642                trace!(
643                    agent_id = %self.config.id,
644                    "No pooled connection available, creating new connection"
645                );
646                match self.create_client().await {
647                    Ok(client) => {
648                        self.pool.register_created();
649                        pooled_conn = Some(super::pool::PooledConnection::new(client));
650                    }
651                    Err(e) => {
652                        error!(
653                            agent_id = %self.config.id,
654                            error = %e,
655                            "Failed to create new connection"
656                        );
657                        return Err(e);
658                    }
659                }
660            } else {
661                // Pool is at capacity, fall back to single client with lock
662                // This ensures we don't create unbounded connections
663                trace!(
664                    agent_id = %self.config.id,
665                    "Pool at capacity, using fallback client"
666                );
667                return self.call_event_fallback(event_type, event).await;
668            }
669        }
670
671        let mut conn = pooled_conn.expect("Connection should be available");
672
673        // Make the call
674        let call_num = self.metrics.calls_total.fetch_add(1, Ordering::Relaxed) + 1;
675
676        trace!(
677            agent_id = %self.config.id,
678            event_type = ?event_type,
679            call_num = call_num,
680            pool_active = self.pool.active_count(),
681            "Sending event to agent via pooled connection"
682        );
683
684        let result = conn.client.send_event(event_type, event).await;
685
686        // Handle result
687        match result {
688            Ok(response) => {
689                // Return connection to pool on success
690                self.pool.return_connection(conn);
691                Ok(response)
692            }
693            Err(e) => {
694                let error_str = e.to_string();
695                let is_connection_error = error_str.contains("Broken pipe")
696                    || error_str.contains("Connection reset")
697                    || error_str.contains("Connection refused")
698                    || error_str.contains("not connected")
699                    || error_str.contains("transport error");
700
701                error!(
702                    agent_id = %self.config.id,
703                    event_type = ?event_type,
704                    error = %e,
705                    is_connection_error = is_connection_error,
706                    "Agent call failed"
707                );
708
709                // Don't return connection to pool on error - mark it as failed
710                self.pool.mark_failed();
711                // Connection will be dropped here
712
713                Err(SentinelError::Agent {
714                    agent: self.config.id.clone(),
715                    message: e.to_string(),
716                    event: format!("{:?}", event_type),
717                    source: None,
718                })
719            }
720        }
721    }
722
723    /// Fallback call method using the single cached client (for when pool is exhausted).
724    async fn call_event_fallback<T: serde::Serialize>(
725        &self,
726        event_type: EventType,
727        event: &T,
728    ) -> SentinelResult<AgentResponse> {
729        // Get or create connection using the fallback single client
730        let mut client_guard = self.client.write().await;
731
732        if client_guard.is_none() {
733            trace!(
734                agent_id = %self.config.id,
735                "No existing fallback connection, initializing"
736            );
737            drop(client_guard);
738            self.initialize().await?;
739            client_guard = self.client.write().await;
740        }
741
742        let client = client_guard.as_mut().ok_or_else(|| {
743            error!(
744                agent_id = %self.config.id,
745                event_type = ?event_type,
746                "No client connection available after initialization"
747            );
748            SentinelError::Agent {
749                agent: self.config.id.clone(),
750                message: "No client connection".to_string(),
751                event: format!("{:?}", event_type),
752                source: None,
753            }
754        })?;
755
756        // Make the call
757        let call_num = self.metrics.calls_total.fetch_add(1, Ordering::Relaxed) + 1;
758
759        trace!(
760            agent_id = %self.config.id,
761            event_type = ?event_type,
762            call_num = call_num,
763            "Sending event to agent via fallback client"
764        );
765
766        let result = client.send_event(event_type, event).await;
767
768        match result {
769            Ok(response) => Ok(response),
770            Err(e) => {
771                let error_str = e.to_string();
772                let is_connection_error = error_str.contains("Broken pipe")
773                    || error_str.contains("Connection reset")
774                    || error_str.contains("Connection refused")
775                    || error_str.contains("not connected")
776                    || error_str.contains("transport error");
777
778                error!(
779                    agent_id = %self.config.id,
780                    event_type = ?event_type,
781                    error = %e,
782                    is_connection_error = is_connection_error,
783                    "Agent call failed (fallback)"
784                );
785
786                drop(client_guard);
787
788                if is_connection_error {
789                    warn!(
790                        agent_id = %self.config.id,
791                        "Clearing cached fallback client due to connection error"
792                    );
793                    *self.client.write().await = None;
794                }
795
796                Err(SentinelError::Agent {
797                    agent: self.config.id.clone(),
798                    message: e.to_string(),
799                    event: format!("{:?}", event_type),
800                    source: None,
801                })
802            }
803        }
804    }
805
806    /// Record successful call (lock-free).
807    pub async fn record_success(&self, duration: Duration) {
808        let success_count = self.metrics.calls_success.fetch_add(1, Ordering::Relaxed) + 1;
809        self.metrics
810            .duration_total_us
811            .fetch_add(duration.as_micros() as u64, Ordering::Relaxed);
812        self.consecutive_failures.store(0, Ordering::Relaxed);
813        // Store timestamp as nanoseconds since base_instant (lock-free)
814        self.last_success_ns.store(
815            self.base_instant.elapsed().as_nanos() as u64,
816            Ordering::Relaxed,
817        );
818
819        trace!(
820            agent_id = %self.config.id,
821            duration_ms = duration.as_millis(),
822            total_successes = success_count,
823            "Recorded agent call success"
824        );
825
826        self.circuit_breaker.record_success(); // Lock-free
827    }
828
829    /// Get the time since last successful call (for monitoring).
830    /// Returns None if no successful call has been recorded.
831    #[inline]
832    pub fn time_since_last_success(&self) -> Option<Duration> {
833        let last_ns = self.last_success_ns.load(Ordering::Relaxed);
834        if last_ns == NO_TIMESTAMP {
835            return None;
836        }
837        let current_ns = self.base_instant.elapsed().as_nanos() as u64;
838        Some(Duration::from_nanos(current_ns.saturating_sub(last_ns)))
839    }
840
841    /// Record failed call.
842    pub async fn record_failure(&self) {
843        let fail_count = self.metrics.calls_failed.fetch_add(1, Ordering::Relaxed) + 1;
844        let consecutive = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
845
846        debug!(
847            agent_id = %self.config.id,
848            total_failures = fail_count,
849            consecutive_failures = consecutive,
850            "Recorded agent call failure"
851        );
852
853        self.circuit_breaker.record_failure(); // Lock-free
854    }
855
856    /// Record timeout.
857    pub async fn record_timeout(&self) {
858        let timeout_count = self.metrics.calls_timeout.fetch_add(1, Ordering::Relaxed) + 1;
859        let consecutive = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
860
861        debug!(
862            agent_id = %self.config.id,
863            total_timeouts = timeout_count,
864            consecutive_failures = consecutive,
865            timeout_ms = self.config.timeout_ms,
866            "Recorded agent call timeout"
867        );
868
869        self.circuit_breaker.record_failure(); // Lock-free
870    }
871
872    /// Shutdown agent.
873    pub async fn shutdown(&self) {
874        debug!(
875            agent_id = %self.config.id,
876            "Shutting down agent"
877        );
878
879        if let Some(client) = self.client.write().await.take() {
880            trace!(
881                agent_id = %self.config.id,
882                "Closing agent client connection"
883            );
884            let _ = client.close().await;
885        }
886
887        let stats = (
888            self.metrics.calls_total.load(Ordering::Relaxed),
889            self.metrics.calls_success.load(Ordering::Relaxed),
890            self.metrics.calls_failed.load(Ordering::Relaxed),
891            self.metrics.calls_timeout.load(Ordering::Relaxed),
892        );
893
894        info!(
895            agent_id = %self.config.id,
896            total_calls = stats.0,
897            successes = stats.1,
898            failures = stats.2,
899            timeouts = stats.3,
900            "Agent shutdown complete"
901        );
902    }
903}