sentinel_proxy/agents/
agent.rs

1//! Individual agent implementation.
2
3use std::sync::atomic::{AtomicU32, Ordering};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use sentinel_agent_protocol::{AgentClient, AgentResponse, EventType};
8use sentinel_common::{errors::SentinelError, errors::SentinelResult, CircuitBreaker};
9use sentinel_config::{AgentConfig, AgentEvent, AgentTransport};
10use tokio::sync::RwLock;
11use tracing::warn;
12
13use super::metrics::AgentMetrics;
14use super::pool::AgentConnectionPool;
15
16/// Individual agent configuration and state.
17pub struct Agent {
18    /// Agent configuration
19    pub(super) config: AgentConfig,
20    /// Agent client
21    pub(super) client: Arc<RwLock<Option<AgentClient>>>,
22    /// Connection pool
23    pub(super) pool: Arc<AgentConnectionPool>,
24    /// Circuit breaker
25    pub(super) circuit_breaker: Arc<CircuitBreaker>,
26    /// Agent-specific metrics
27    pub(super) metrics: Arc<AgentMetrics>,
28    /// Last successful call
29    pub(super) last_success: Arc<RwLock<Option<Instant>>>,
30    /// Consecutive failures
31    pub(super) consecutive_failures: AtomicU32,
32}
33
34impl Agent {
35    /// Create a new agent.
36    pub fn new(
37        config: AgentConfig,
38        pool: Arc<AgentConnectionPool>,
39        circuit_breaker: Arc<CircuitBreaker>,
40    ) -> Self {
41        Self {
42            config,
43            client: Arc::new(RwLock::new(None)),
44            pool,
45            circuit_breaker,
46            metrics: Arc::new(AgentMetrics::default()),
47            last_success: Arc::new(RwLock::new(None)),
48            consecutive_failures: AtomicU32::new(0),
49        }
50    }
51
52    /// Get the agent ID.
53    pub fn id(&self) -> &str {
54        &self.config.id
55    }
56
57    /// Get the agent's circuit breaker.
58    pub fn circuit_breaker(&self) -> &CircuitBreaker {
59        &self.circuit_breaker
60    }
61
62    /// Get the agent's failure mode.
63    pub fn failure_mode(&self) -> sentinel_config::FailureMode {
64        self.config.failure_mode.clone()
65    }
66
67    /// Get the agent's timeout in milliseconds.
68    pub fn timeout_ms(&self) -> u64 {
69        self.config.timeout_ms
70    }
71
72    /// Get the agent's metrics.
73    pub fn metrics(&self) -> &AgentMetrics {
74        &self.metrics
75    }
76
77    /// Check if agent handles a specific event type.
78    pub fn handles_event(&self, event_type: EventType) -> bool {
79        self.config.events.iter().any(|e| match (e, event_type) {
80            (AgentEvent::RequestHeaders, EventType::RequestHeaders) => true,
81            (AgentEvent::RequestBody, EventType::RequestBodyChunk) => true,
82            (AgentEvent::ResponseHeaders, EventType::ResponseHeaders) => true,
83            (AgentEvent::ResponseBody, EventType::ResponseBodyChunk) => true,
84            (AgentEvent::Log, EventType::RequestComplete) => true,
85            _ => false,
86        })
87    }
88
89    /// Initialize agent connection.
90    pub async fn initialize(&self) -> SentinelResult<()> {
91        match &self.config.transport {
92            AgentTransport::UnixSocket { path } => {
93                let client = AgentClient::unix_socket(
94                    &self.config.id,
95                    path,
96                    Duration::from_millis(self.config.timeout_ms),
97                )
98                .await
99                .map_err(|e| SentinelError::Agent {
100                    agent: self.config.id.clone(),
101                    message: format!("Failed to connect: {}", e),
102                    event: "initialize".to_string(),
103                    source: None,
104                })?;
105
106                *self.client.write().await = Some(client);
107                Ok(())
108            }
109            _ => {
110                warn!(
111                    "Unsupported agent transport: {:?}",
112                    self.config.transport
113                );
114                Ok(())
115            }
116        }
117    }
118
119    /// Call agent with event.
120    pub async fn call_event<T: serde::Serialize>(
121        &self,
122        event_type: EventType,
123        event: &T,
124    ) -> SentinelResult<AgentResponse> {
125        // Get or create connection
126        let mut client_guard = self.client.write().await;
127
128        if client_guard.is_none() {
129            drop(client_guard);
130            self.initialize().await?;
131            client_guard = self.client.write().await;
132        }
133
134        let client = client_guard.as_mut().ok_or_else(|| SentinelError::Agent {
135            agent: self.config.id.clone(),
136            message: "No client connection".to_string(),
137            event: format!("{:?}", event_type),
138            source: None,
139        })?;
140
141        // Make the call
142        self.metrics.calls_total.fetch_add(1, Ordering::Relaxed);
143
144        client.send_event(event_type, event).await.map_err(|e| {
145            SentinelError::Agent {
146                agent: self.config.id.clone(),
147                message: e.to_string(),
148                event: format!("{:?}", event_type),
149                source: None,
150            }
151        })
152    }
153
154    /// Record successful call.
155    pub async fn record_success(&self, duration: Duration) {
156        self.metrics.calls_success.fetch_add(1, Ordering::Relaxed);
157        self.metrics
158            .duration_total_us
159            .fetch_add(duration.as_micros() as u64, Ordering::Relaxed);
160        self.consecutive_failures.store(0, Ordering::Relaxed);
161        *self.last_success.write().await = Some(Instant::now());
162
163        self.circuit_breaker.record_success().await;
164    }
165
166    /// Record failed call.
167    pub async fn record_failure(&self) {
168        self.metrics.calls_failed.fetch_add(1, Ordering::Relaxed);
169        self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
170
171        self.circuit_breaker.record_failure().await;
172    }
173
174    /// Record timeout.
175    pub async fn record_timeout(&self) {
176        self.metrics.calls_timeout.fetch_add(1, Ordering::Relaxed);
177        self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
178
179        self.circuit_breaker.record_failure().await;
180    }
181
182    /// Shutdown agent.
183    pub async fn shutdown(&self) {
184        if let Some(client) = self.client.write().await.take() {
185            let _ = client.close().await;
186        }
187    }
188}