sentinel_proxy/agents/
agent.rs

//! Implementation of an individual agent: connection setup, event dispatch, and call bookkeeping.
2
3use std::sync::atomic::{AtomicU32, Ordering};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use sentinel_agent_protocol::{AgentClient, AgentResponse, EventType};
8use sentinel_common::{errors::SentinelError, errors::SentinelResult, CircuitBreaker};
9use sentinel_config::{AgentConfig, AgentEvent, AgentTransport};
10use tokio::sync::RwLock;
11use tracing::{debug, error, info, trace, warn};
12
13use super::metrics::AgentMetrics;
14use super::pool::AgentConnectionPool;
15
16/// Individual agent configuration and state.
17pub struct Agent {
18    /// Agent configuration
19    pub(super) config: AgentConfig,
20    /// Agent client
21    pub(super) client: Arc<RwLock<Option<AgentClient>>>,
22    /// Connection pool
23    pub(super) pool: Arc<AgentConnectionPool>,
24    /// Circuit breaker
25    pub(super) circuit_breaker: Arc<CircuitBreaker>,
26    /// Agent-specific metrics
27    pub(super) metrics: Arc<AgentMetrics>,
28    /// Last successful call
29    pub(super) last_success: Arc<RwLock<Option<Instant>>>,
30    /// Consecutive failures
31    pub(super) consecutive_failures: AtomicU32,
32}
33
34impl Agent {
35    /// Create a new agent.
36    pub fn new(
37        config: AgentConfig,
38        pool: Arc<AgentConnectionPool>,
39        circuit_breaker: Arc<CircuitBreaker>,
40    ) -> Self {
41        trace!(
42            agent_id = %config.id,
43            agent_type = ?config.agent_type,
44            timeout_ms = config.timeout_ms,
45            events = ?config.events,
46            "Creating agent instance"
47        );
48        Self {
49            config,
50            client: Arc::new(RwLock::new(None)),
51            pool,
52            circuit_breaker,
53            metrics: Arc::new(AgentMetrics::default()),
54            last_success: Arc::new(RwLock::new(None)),
55            consecutive_failures: AtomicU32::new(0),
56        }
57    }
58
59    /// Get the agent ID.
60    pub fn id(&self) -> &str {
61        &self.config.id
62    }
63
64    /// Get the agent's circuit breaker.
65    pub fn circuit_breaker(&self) -> &CircuitBreaker {
66        &self.circuit_breaker
67    }
68
69    /// Get the agent's failure mode.
70    pub fn failure_mode(&self) -> sentinel_config::FailureMode {
71        self.config.failure_mode
72    }
73
74    /// Get the agent's timeout in milliseconds.
75    pub fn timeout_ms(&self) -> u64 {
76        self.config.timeout_ms
77    }
78
79    /// Get the agent's metrics.
80    pub fn metrics(&self) -> &AgentMetrics {
81        &self.metrics
82    }
83
84    /// Check if agent handles a specific event type.
85    pub fn handles_event(&self, event_type: EventType) -> bool {
86        self.config.events.iter().any(|e| match (e, event_type) {
87            (AgentEvent::RequestHeaders, EventType::RequestHeaders) => true,
88            (AgentEvent::RequestBody, EventType::RequestBodyChunk) => true,
89            (AgentEvent::ResponseHeaders, EventType::ResponseHeaders) => true,
90            (AgentEvent::ResponseBody, EventType::ResponseBodyChunk) => true,
91            (AgentEvent::Log, EventType::RequestComplete) => true,
92            (AgentEvent::WebSocketFrame, EventType::WebSocketFrame) => true,
93            _ => false,
94        })
95    }
96
97    /// Initialize agent connection.
98    pub async fn initialize(&self) -> SentinelResult<()> {
99        let timeout = Duration::from_millis(self.config.timeout_ms);
100
101        debug!(
102            agent_id = %self.config.id,
103            transport = ?self.config.transport,
104            timeout_ms = self.config.timeout_ms,
105            "Initializing agent connection"
106        );
107
108        let start = Instant::now();
109
110        match &self.config.transport {
111            AgentTransport::UnixSocket { path } => {
112                trace!(
113                    agent_id = %self.config.id,
114                    socket_path = %path.display(),
115                    "Connecting to agent via Unix socket"
116                );
117
118                let client = AgentClient::unix_socket(&self.config.id, path, timeout)
119                    .await
120                    .map_err(|e| {
121                        error!(
122                            agent_id = %self.config.id,
123                            socket_path = %path.display(),
124                            error = %e,
125                            "Failed to connect to agent via Unix socket"
126                        );
127                        SentinelError::Agent {
128                            agent: self.config.id.clone(),
129                            message: format!("Failed to connect via Unix socket: {}", e),
130                            event: "initialize".to_string(),
131                            source: None,
132                        }
133                    })?;
134
135                *self.client.write().await = Some(client);
136
137                info!(
138                    agent_id = %self.config.id,
139                    socket_path = %path.display(),
140                    connect_time_ms = start.elapsed().as_millis(),
141                    "Agent connected via Unix socket"
142                );
143                Ok(())
144            }
145            AgentTransport::Grpc { address, tls: _ } => {
146                trace!(
147                    agent_id = %self.config.id,
148                    address = %address,
149                    "Connecting to agent via gRPC"
150                );
151
152                // TODO: Add TLS support for gRPC connections
153                let client = AgentClient::grpc(&self.config.id, address, timeout)
154                    .await
155                    .map_err(|e| {
156                        error!(
157                            agent_id = %self.config.id,
158                            address = %address,
159                            error = %e,
160                            "Failed to connect to agent via gRPC"
161                        );
162                        SentinelError::Agent {
163                            agent: self.config.id.clone(),
164                            message: format!("Failed to connect via gRPC: {}", e),
165                            event: "initialize".to_string(),
166                            source: None,
167                        }
168                    })?;
169
170                *self.client.write().await = Some(client);
171
172                info!(
173                    agent_id = %self.config.id,
174                    address = %address,
175                    connect_time_ms = start.elapsed().as_millis(),
176                    "Agent connected via gRPC"
177                );
178                Ok(())
179            }
180            AgentTransport::Http { url, tls: _ } => {
181                warn!(
182                    agent_id = %self.config.id,
183                    url = %url,
184                    "HTTP transport not yet implemented, agent will not be available"
185                );
186                Ok(())
187            }
188        }
189    }
190
191    /// Call agent with event.
192    pub async fn call_event<T: serde::Serialize>(
193        &self,
194        event_type: EventType,
195        event: &T,
196    ) -> SentinelResult<AgentResponse> {
197        trace!(
198            agent_id = %self.config.id,
199            event_type = ?event_type,
200            "Preparing to call agent"
201        );
202
203        // Get or create connection
204        let mut client_guard = self.client.write().await;
205
206        if client_guard.is_none() {
207            trace!(
208                agent_id = %self.config.id,
209                "No existing connection, initializing"
210            );
211            drop(client_guard);
212            self.initialize().await?;
213            client_guard = self.client.write().await;
214        }
215
216        let client = client_guard.as_mut().ok_or_else(|| {
217            error!(
218                agent_id = %self.config.id,
219                event_type = ?event_type,
220                "No client connection available after initialization"
221            );
222            SentinelError::Agent {
223                agent: self.config.id.clone(),
224                message: "No client connection".to_string(),
225                event: format!("{:?}", event_type),
226                source: None,
227            }
228        })?;
229
230        // Make the call
231        let call_num = self.metrics.calls_total.fetch_add(1, Ordering::Relaxed) + 1;
232
233        trace!(
234            agent_id = %self.config.id,
235            event_type = ?event_type,
236            call_num = call_num,
237            "Sending event to agent"
238        );
239
240        client.send_event(event_type, event).await.map_err(|e| {
241            error!(
242                agent_id = %self.config.id,
243                event_type = ?event_type,
244                error = %e,
245                "Agent call failed"
246            );
247            SentinelError::Agent {
248                agent: self.config.id.clone(),
249                message: e.to_string(),
250                event: format!("{:?}", event_type),
251                source: None,
252            }
253        })
254    }
255
256    /// Record successful call.
257    pub async fn record_success(&self, duration: Duration) {
258        let success_count = self.metrics.calls_success.fetch_add(1, Ordering::Relaxed) + 1;
259        self.metrics
260            .duration_total_us
261            .fetch_add(duration.as_micros() as u64, Ordering::Relaxed);
262        self.consecutive_failures.store(0, Ordering::Relaxed);
263        *self.last_success.write().await = Some(Instant::now());
264
265        trace!(
266            agent_id = %self.config.id,
267            duration_ms = duration.as_millis(),
268            total_successes = success_count,
269            "Recorded agent call success"
270        );
271
272        self.circuit_breaker.record_success().await;
273    }
274
275    /// Record failed call.
276    pub async fn record_failure(&self) {
277        let fail_count = self.metrics.calls_failed.fetch_add(1, Ordering::Relaxed) + 1;
278        let consecutive = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
279
280        debug!(
281            agent_id = %self.config.id,
282            total_failures = fail_count,
283            consecutive_failures = consecutive,
284            "Recorded agent call failure"
285        );
286
287        self.circuit_breaker.record_failure().await;
288    }
289
290    /// Record timeout.
291    pub async fn record_timeout(&self) {
292        let timeout_count = self.metrics.calls_timeout.fetch_add(1, Ordering::Relaxed) + 1;
293        let consecutive = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
294
295        debug!(
296            agent_id = %self.config.id,
297            total_timeouts = timeout_count,
298            consecutive_failures = consecutive,
299            timeout_ms = self.config.timeout_ms,
300            "Recorded agent call timeout"
301        );
302
303        self.circuit_breaker.record_failure().await;
304    }
305
306    /// Shutdown agent.
307    pub async fn shutdown(&self) {
308        debug!(
309            agent_id = %self.config.id,
310            "Shutting down agent"
311        );
312
313        if let Some(client) = self.client.write().await.take() {
314            trace!(
315                agent_id = %self.config.id,
316                "Closing agent client connection"
317            );
318            let _ = client.close().await;
319        }
320
321        let stats = (
322            self.metrics.calls_total.load(Ordering::Relaxed),
323            self.metrics.calls_success.load(Ordering::Relaxed),
324            self.metrics.calls_failed.load(Ordering::Relaxed),
325            self.metrics.calls_timeout.load(Ordering::Relaxed),
326        );
327
328        info!(
329            agent_id = %self.config.id,
330            total_calls = stats.0,
331            successes = stats.1,
332            failures = stats.2,
333            timeouts = stats.3,
334            "Agent shutdown complete"
335        );
336    }
337}