sentinel_proxy/agents/
agent.rs

1//! Individual agent implementation.
2
3use std::sync::atomic::{AtomicU32, Ordering};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use sentinel_agent_protocol::{AgentClient, AgentResponse, ConfigureEvent, Decision, EventType, GrpcTlsConfig, HttpTlsConfig};
8use sentinel_common::{errors::SentinelError, errors::SentinelResult, CircuitBreaker};
9use sentinel_config::{AgentConfig, AgentEvent, AgentTransport};
10use tokio::sync::RwLock;
11use tracing::{debug, error, info, trace, warn};
12
13use super::metrics::AgentMetrics;
14use super::pool::AgentConnectionPool;
15
/// Individual agent configuration and state.
///
/// Bundles an agent's static configuration with its runtime state: the
/// lazily created protocol client, its circuit breaker, call metrics and
/// failure tracking.
pub struct Agent {
    /// Agent configuration (id, transport, timeout, subscribed events,
    /// failure mode and the optional payload sent in the Configure event).
    pub(super) config: AgentConfig,
    /// Agent client. `None` until `initialize` connects; reset to `None`
    /// after a connection-level call error and on shutdown.
    pub(super) client: Arc<RwLock<Option<AgentClient>>>,
    /// Connection pool supplied at construction. Not referenced by the
    /// methods in this file.
    pub(super) pool: Arc<AgentConnectionPool>,
    /// Circuit breaker, notified of every recorded success, failure and
    /// timeout.
    pub(super) circuit_breaker: Arc<CircuitBreaker>,
    /// Agent-specific metrics (total / successful / failed / timed-out call
    /// counters and accumulated call duration in microseconds).
    pub(super) metrics: Arc<AgentMetrics>,
    /// Time of the last successful call; `None` until the first success is
    /// recorded.
    pub(super) last_success: Arc<RwLock<Option<Instant>>>,
    /// Consecutive failures: incremented on each recorded failure or
    /// timeout, reset to zero on success.
    pub(super) consecutive_failures: AtomicU32,
}
33
34impl Agent {
35    /// Create a new agent.
36    pub fn new(
37        config: AgentConfig,
38        pool: Arc<AgentConnectionPool>,
39        circuit_breaker: Arc<CircuitBreaker>,
40    ) -> Self {
41        trace!(
42            agent_id = %config.id,
43            agent_type = ?config.agent_type,
44            timeout_ms = config.timeout_ms,
45            events = ?config.events,
46            "Creating agent instance"
47        );
48        Self {
49            config,
50            client: Arc::new(RwLock::new(None)),
51            pool,
52            circuit_breaker,
53            metrics: Arc::new(AgentMetrics::default()),
54            last_success: Arc::new(RwLock::new(None)),
55            consecutive_failures: AtomicU32::new(0),
56        }
57    }
58
59    /// Get the agent ID.
60    pub fn id(&self) -> &str {
61        &self.config.id
62    }
63
64    /// Get the agent's circuit breaker.
65    pub fn circuit_breaker(&self) -> &CircuitBreaker {
66        &self.circuit_breaker
67    }
68
69    /// Get the agent's failure mode.
70    pub fn failure_mode(&self) -> sentinel_config::FailureMode {
71        self.config.failure_mode
72    }
73
74    /// Get the agent's timeout in milliseconds.
75    pub fn timeout_ms(&self) -> u64 {
76        self.config.timeout_ms
77    }
78
79    /// Get the agent's metrics.
80    pub fn metrics(&self) -> &AgentMetrics {
81        &self.metrics
82    }
83
84    /// Check if agent handles a specific event type.
85    pub fn handles_event(&self, event_type: EventType) -> bool {
86        self.config.events.iter().any(|e| match (e, event_type) {
87            (AgentEvent::RequestHeaders, EventType::RequestHeaders) => true,
88            (AgentEvent::RequestBody, EventType::RequestBodyChunk) => true,
89            (AgentEvent::ResponseHeaders, EventType::ResponseHeaders) => true,
90            (AgentEvent::ResponseBody, EventType::ResponseBodyChunk) => true,
91            (AgentEvent::Log, EventType::RequestComplete) => true,
92            (AgentEvent::WebSocketFrame, EventType::WebSocketFrame) => true,
93            _ => false,
94        })
95    }
96
97    /// Initialize agent connection.
98    pub async fn initialize(&self) -> SentinelResult<()> {
99        let timeout = Duration::from_millis(self.config.timeout_ms);
100
101        debug!(
102            agent_id = %self.config.id,
103            transport = ?self.config.transport,
104            timeout_ms = self.config.timeout_ms,
105            "Initializing agent connection"
106        );
107
108        let start = Instant::now();
109
110        match &self.config.transport {
111            AgentTransport::UnixSocket { path } => {
112                trace!(
113                    agent_id = %self.config.id,
114                    socket_path = %path.display(),
115                    "Connecting to agent via Unix socket"
116                );
117
118                let client = AgentClient::unix_socket(&self.config.id, path, timeout)
119                    .await
120                    .map_err(|e| {
121                        error!(
122                            agent_id = %self.config.id,
123                            socket_path = %path.display(),
124                            error = %e,
125                            "Failed to connect to agent via Unix socket"
126                        );
127                        SentinelError::Agent {
128                            agent: self.config.id.clone(),
129                            message: format!("Failed to connect via Unix socket: {}", e),
130                            event: "initialize".to_string(),
131                            source: None,
132                        }
133                    })?;
134
135                *self.client.write().await = Some(client);
136
137                info!(
138                    agent_id = %self.config.id,
139                    socket_path = %path.display(),
140                    connect_time_ms = start.elapsed().as_millis(),
141                    "Agent connected via Unix socket"
142                );
143
144                // Send Configure event if config is present
145                self.send_configure_event().await?;
146
147                Ok(())
148            }
149            AgentTransport::Grpc { address, tls } => {
150                trace!(
151                    agent_id = %self.config.id,
152                    address = %address,
153                    tls_enabled = tls.is_some(),
154                    "Connecting to agent via gRPC"
155                );
156
157                let client = match tls {
158                    Some(tls_config) => {
159                        // Build TLS configuration
160                        let mut grpc_tls = GrpcTlsConfig::new();
161
162                        // Load CA certificate if provided
163                        if let Some(ca_path) = &tls_config.ca_cert {
164                            grpc_tls = grpc_tls.with_ca_cert_file(ca_path).await.map_err(|e| {
165                                error!(
166                                    agent_id = %self.config.id,
167                                    ca_path = %ca_path.display(),
168                                    error = %e,
169                                    "Failed to load CA certificate for gRPC TLS"
170                                );
171                                SentinelError::Agent {
172                                    agent: self.config.id.clone(),
173                                    message: format!("Failed to load CA certificate: {}", e),
174                                    event: "initialize".to_string(),
175                                    source: None,
176                                }
177                            })?;
178                        }
179
180                        // Load client certificate and key for mTLS if provided
181                        if let (Some(cert_path), Some(key_path)) = (&tls_config.client_cert, &tls_config.client_key) {
182                            grpc_tls = grpc_tls.with_client_cert_files(cert_path, key_path).await.map_err(|e| {
183                                error!(
184                                    agent_id = %self.config.id,
185                                    cert_path = %cert_path.display(),
186                                    key_path = %key_path.display(),
187                                    error = %e,
188                                    "Failed to load client certificate for gRPC mTLS"
189                                );
190                                SentinelError::Agent {
191                                    agent: self.config.id.clone(),
192                                    message: format!("Failed to load client certificate: {}", e),
193                                    event: "initialize".to_string(),
194                                    source: None,
195                                }
196                            })?;
197                        }
198
199                        // Handle insecure skip verify
200                        if tls_config.insecure_skip_verify {
201                            warn!(
202                                agent_id = %self.config.id,
203                                address = %address,
204                                "SECURITY WARNING: TLS certificate verification disabled for agent"
205                            );
206                            grpc_tls = grpc_tls.with_insecure_skip_verify();
207                        }
208
209                        debug!(
210                            agent_id = %self.config.id,
211                            address = %address,
212                            has_ca_cert = tls_config.ca_cert.is_some(),
213                            has_client_cert = tls_config.client_cert.is_some(),
214                            "Connecting to agent via gRPC with TLS"
215                        );
216
217                        AgentClient::grpc_tls(&self.config.id, address, timeout, grpc_tls)
218                            .await
219                            .map_err(|e| {
220                                error!(
221                                    agent_id = %self.config.id,
222                                    address = %address,
223                                    error = %e,
224                                    "Failed to connect to agent via gRPC with TLS"
225                                );
226                                SentinelError::Agent {
227                                    agent: self.config.id.clone(),
228                                    message: format!("Failed to connect via gRPC TLS: {}", e),
229                                    event: "initialize".to_string(),
230                                    source: None,
231                                }
232                            })?
233                    }
234                    None => {
235                        // Plain gRPC without TLS
236                        AgentClient::grpc(&self.config.id, address, timeout)
237                            .await
238                            .map_err(|e| {
239                                error!(
240                                    agent_id = %self.config.id,
241                                    address = %address,
242                                    error = %e,
243                                    "Failed to connect to agent via gRPC"
244                                );
245                                SentinelError::Agent {
246                                    agent: self.config.id.clone(),
247                                    message: format!("Failed to connect via gRPC: {}", e),
248                                    event: "initialize".to_string(),
249                                    source: None,
250                                }
251                            })?
252                    }
253                };
254
255                *self.client.write().await = Some(client);
256
257                info!(
258                    agent_id = %self.config.id,
259                    address = %address,
260                    tls_enabled = tls.is_some(),
261                    connect_time_ms = start.elapsed().as_millis(),
262                    "Agent connected via gRPC"
263                );
264
265                // Send Configure event if config is present
266                self.send_configure_event().await?;
267
268                Ok(())
269            }
270            AgentTransport::Http { url, tls } => {
271                trace!(
272                    agent_id = %self.config.id,
273                    url = %url,
274                    tls_enabled = tls.is_some(),
275                    "Connecting to agent via HTTP"
276                );
277
278                let client = match tls {
279                    Some(tls_config) => {
280                        // Build TLS configuration
281                        let mut http_tls = HttpTlsConfig::new();
282
283                        // Load CA certificate if provided
284                        if let Some(ca_path) = &tls_config.ca_cert {
285                            http_tls = http_tls.with_ca_cert_file(ca_path).await.map_err(|e| {
286                                error!(
287                                    agent_id = %self.config.id,
288                                    ca_path = %ca_path.display(),
289                                    error = %e,
290                                    "Failed to load CA certificate for HTTP TLS"
291                                );
292                                SentinelError::Agent {
293                                    agent: self.config.id.clone(),
294                                    message: format!("Failed to load CA certificate: {}", e),
295                                    event: "initialize".to_string(),
296                                    source: None,
297                                }
298                            })?;
299                        }
300
301                        // Load client certificate and key for mTLS if provided
302                        if let (Some(cert_path), Some(key_path)) = (&tls_config.client_cert, &tls_config.client_key) {
303                            http_tls = http_tls.with_client_cert_files(cert_path, key_path).await.map_err(|e| {
304                                error!(
305                                    agent_id = %self.config.id,
306                                    cert_path = %cert_path.display(),
307                                    key_path = %key_path.display(),
308                                    error = %e,
309                                    "Failed to load client certificate for HTTP mTLS"
310                                );
311                                SentinelError::Agent {
312                                    agent: self.config.id.clone(),
313                                    message: format!("Failed to load client certificate: {}", e),
314                                    event: "initialize".to_string(),
315                                    source: None,
316                                }
317                            })?;
318                        }
319
320                        // Handle insecure skip verify
321                        if tls_config.insecure_skip_verify {
322                            warn!(
323                                agent_id = %self.config.id,
324                                url = %url,
325                                "SECURITY WARNING: TLS certificate verification disabled for HTTP agent"
326                            );
327                            http_tls = http_tls.with_insecure_skip_verify();
328                        }
329
330                        debug!(
331                            agent_id = %self.config.id,
332                            url = %url,
333                            has_ca_cert = tls_config.ca_cert.is_some(),
334                            has_client_cert = tls_config.client_cert.is_some(),
335                            "Connecting to agent via HTTP with TLS"
336                        );
337
338                        AgentClient::http_tls(&self.config.id, url, timeout, http_tls)
339                            .await
340                            .map_err(|e| {
341                                error!(
342                                    agent_id = %self.config.id,
343                                    url = %url,
344                                    error = %e,
345                                    "Failed to create HTTP TLS agent client"
346                                );
347                                SentinelError::Agent {
348                                    agent: self.config.id.clone(),
349                                    message: format!("Failed to create HTTP TLS client: {}", e),
350                                    event: "initialize".to_string(),
351                                    source: None,
352                                }
353                            })?
354                    }
355                    None => {
356                        // Plain HTTP without TLS
357                        AgentClient::http(&self.config.id, url, timeout)
358                            .await
359                            .map_err(|e| {
360                                error!(
361                                    agent_id = %self.config.id,
362                                    url = %url,
363                                    error = %e,
364                                    "Failed to create HTTP agent client"
365                                );
366                                SentinelError::Agent {
367                                    agent: self.config.id.clone(),
368                                    message: format!("Failed to create HTTP client: {}", e),
369                                    event: "initialize".to_string(),
370                                    source: None,
371                                }
372                            })?
373                    }
374                };
375
376                *self.client.write().await = Some(client);
377
378                info!(
379                    agent_id = %self.config.id,
380                    url = %url,
381                    tls_enabled = tls.is_some(),
382                    connect_time_ms = start.elapsed().as_millis(),
383                    "Agent connected via HTTP"
384                );
385
386                // Send Configure event if config is present
387                self.send_configure_event().await?;
388
389                Ok(())
390            }
391        }
392    }
393
394    /// Send Configure event to agent if config is present.
395    async fn send_configure_event(&self) -> SentinelResult<()> {
396        // Only send Configure if agent has config
397        let config = match &self.config.config {
398            Some(c) => c.clone(),
399            None => {
400                trace!(
401                    agent_id = %self.config.id,
402                    "No config for agent, skipping Configure event"
403                );
404                return Ok(());
405            }
406        };
407
408        let event = ConfigureEvent {
409            agent_id: self.config.id.clone(),
410            config,
411        };
412
413        debug!(
414            agent_id = %self.config.id,
415            "Sending Configure event to agent"
416        );
417
418        let mut client_guard = self.client.write().await;
419        let client = client_guard.as_mut().ok_or_else(|| SentinelError::Agent {
420            agent: self.config.id.clone(),
421            message: "No client connection for Configure event".to_string(),
422            event: "configure".to_string(),
423            source: None,
424        })?;
425
426        let response = client.send_event(EventType::Configure, &event).await.map_err(|e| {
427            error!(
428                agent_id = %self.config.id,
429                error = %e,
430                "Failed to send Configure event"
431            );
432            SentinelError::Agent {
433                agent: self.config.id.clone(),
434                message: format!("Configure event failed: {}", e),
435                event: "configure".to_string(),
436                source: None,
437            }
438        })?;
439
440        // Check if agent accepted the configuration
441        if !matches!(response.decision, Decision::Allow) {
442            error!(
443                agent_id = %self.config.id,
444                decision = ?response.decision,
445                "Agent rejected configuration"
446            );
447            return Err(SentinelError::Agent {
448                agent: self.config.id.clone(),
449                message: "Agent rejected configuration".to_string(),
450                event: "configure".to_string(),
451                source: None,
452            });
453        }
454
455        info!(
456            agent_id = %self.config.id,
457            "Agent accepted configuration"
458        );
459
460        Ok(())
461    }
462
463    /// Call agent with event.
464    pub async fn call_event<T: serde::Serialize>(
465        &self,
466        event_type: EventType,
467        event: &T,
468    ) -> SentinelResult<AgentResponse> {
469        trace!(
470            agent_id = %self.config.id,
471            event_type = ?event_type,
472            "Preparing to call agent"
473        );
474
475        // Get or create connection
476        let mut client_guard = self.client.write().await;
477
478        if client_guard.is_none() {
479            trace!(
480                agent_id = %self.config.id,
481                "No existing connection, initializing"
482            );
483            drop(client_guard);
484            self.initialize().await?;
485            client_guard = self.client.write().await;
486        }
487
488        let client = client_guard.as_mut().ok_or_else(|| {
489            error!(
490                agent_id = %self.config.id,
491                event_type = ?event_type,
492                "No client connection available after initialization"
493            );
494            SentinelError::Agent {
495                agent: self.config.id.clone(),
496                message: "No client connection".to_string(),
497                event: format!("{:?}", event_type),
498                source: None,
499            }
500        })?;
501
502        // Make the call
503        let call_num = self.metrics.calls_total.fetch_add(1, Ordering::Relaxed) + 1;
504
505        trace!(
506            agent_id = %self.config.id,
507            event_type = ?event_type,
508            call_num = call_num,
509            "Sending event to agent"
510        );
511
512        let result = client.send_event(event_type, event).await;
513
514        // Handle result - clear stale connection on connection errors
515        match result {
516            Ok(response) => Ok(response),
517            Err(e) => {
518                let error_str = e.to_string();
519                let is_connection_error = error_str.contains("Broken pipe")
520                    || error_str.contains("Connection reset")
521                    || error_str.contains("Connection refused")
522                    || error_str.contains("not connected")
523                    || error_str.contains("transport error");
524
525                error!(
526                    agent_id = %self.config.id,
527                    event_type = ?event_type,
528                    error = %e,
529                    is_connection_error = is_connection_error,
530                    "Agent call failed"
531                );
532
533                // Drop the client guard to release the lock
534                drop(client_guard);
535
536                // Clear cached client on connection errors to force reconnect on next call
537                if is_connection_error {
538                    warn!(
539                        agent_id = %self.config.id,
540                        "Clearing cached client due to connection error, next call will reconnect"
541                    );
542                    *self.client.write().await = None;
543                }
544
545                Err(SentinelError::Agent {
546                    agent: self.config.id.clone(),
547                    message: e.to_string(),
548                    event: format!("{:?}", event_type),
549                    source: None,
550                })
551            }
552        }
553    }
554
555    /// Record successful call.
556    pub async fn record_success(&self, duration: Duration) {
557        let success_count = self.metrics.calls_success.fetch_add(1, Ordering::Relaxed) + 1;
558        self.metrics
559            .duration_total_us
560            .fetch_add(duration.as_micros() as u64, Ordering::Relaxed);
561        self.consecutive_failures.store(0, Ordering::Relaxed);
562        *self.last_success.write().await = Some(Instant::now());
563
564        trace!(
565            agent_id = %self.config.id,
566            duration_ms = duration.as_millis(),
567            total_successes = success_count,
568            "Recorded agent call success"
569        );
570
571        self.circuit_breaker.record_success().await;
572    }
573
574    /// Record failed call.
575    pub async fn record_failure(&self) {
576        let fail_count = self.metrics.calls_failed.fetch_add(1, Ordering::Relaxed) + 1;
577        let consecutive = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
578
579        debug!(
580            agent_id = %self.config.id,
581            total_failures = fail_count,
582            consecutive_failures = consecutive,
583            "Recorded agent call failure"
584        );
585
586        self.circuit_breaker.record_failure().await;
587    }
588
589    /// Record timeout.
590    pub async fn record_timeout(&self) {
591        let timeout_count = self.metrics.calls_timeout.fetch_add(1, Ordering::Relaxed) + 1;
592        let consecutive = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
593
594        debug!(
595            agent_id = %self.config.id,
596            total_timeouts = timeout_count,
597            consecutive_failures = consecutive,
598            timeout_ms = self.config.timeout_ms,
599            "Recorded agent call timeout"
600        );
601
602        self.circuit_breaker.record_failure().await;
603    }
604
605    /// Shutdown agent.
606    pub async fn shutdown(&self) {
607        debug!(
608            agent_id = %self.config.id,
609            "Shutting down agent"
610        );
611
612        if let Some(client) = self.client.write().await.take() {
613            trace!(
614                agent_id = %self.config.id,
615                "Closing agent client connection"
616            );
617            let _ = client.close().await;
618        }
619
620        let stats = (
621            self.metrics.calls_total.load(Ordering::Relaxed),
622            self.metrics.calls_success.load(Ordering::Relaxed),
623            self.metrics.calls_failed.load(Ordering::Relaxed),
624            self.metrics.calls_timeout.load(Ordering::Relaxed),
625        );
626
627        info!(
628            agent_id = %self.config.id,
629            total_calls = stats.0,
630            successes = stats.1,
631            failures = stats.2,
632            timeouts = stats.3,
633            "Agent shutdown complete"
634        );
635    }
636}