1use std::sync::atomic::{AtomicU32, Ordering};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use sentinel_agent_protocol::{AgentClient, AgentResponse, ConfigureEvent, Decision, EventType};
8use sentinel_common::{errors::SentinelError, errors::SentinelResult, CircuitBreaker};
9use sentinel_config::{AgentConfig, AgentEvent, AgentTransport};
10use tokio::sync::RwLock;
11use tracing::{debug, error, info, trace, warn};
12
13use super::metrics::AgentMetrics;
14use super::pool::AgentConnectionPool;
15
/// A single external agent that the proxy sends events to.
///
/// Bundles the agent's configuration with a lazily created client connection and
/// the bookkeeping (metrics, circuit breaker, failure counters) used on the call path.
pub struct Agent {
    /// Configuration for this agent (id, transport, timeout, subscribed events, ...).
    pub(super) config: AgentConfig,
    /// Cached client connection. `None` until `initialize` stores a client; reset to
    /// `None` when a call fails with a connection-type error, and taken by `shutdown`.
    pub(super) client: Arc<RwLock<Option<AgentClient>>>,
    /// Shared connection pool handle.
    /// NOTE(review): not used by the methods in this file — confirm where it is consumed.
    pub(super) pool: Arc<AgentConnectionPool>,
    /// Circuit breaker fed by `record_success`, `record_failure` and `record_timeout`.
    pub(super) circuit_breaker: Arc<CircuitBreaker>,
    /// Per-agent call counters (total, success, failed, timeout, cumulative duration).
    pub(super) metrics: Arc<AgentMetrics>,
    /// Time of the most recent `record_success`, or `None` if no success yet.
    pub(super) last_success: Arc<RwLock<Option<Instant>>>,
    /// Failures/timeouts recorded since the last recorded success.
    pub(super) consecutive_failures: AtomicU32,
}
33
34impl Agent {
35 pub fn new(
37 config: AgentConfig,
38 pool: Arc<AgentConnectionPool>,
39 circuit_breaker: Arc<CircuitBreaker>,
40 ) -> Self {
41 trace!(
42 agent_id = %config.id,
43 agent_type = ?config.agent_type,
44 timeout_ms = config.timeout_ms,
45 events = ?config.events,
46 "Creating agent instance"
47 );
48 Self {
49 config,
50 client: Arc::new(RwLock::new(None)),
51 pool,
52 circuit_breaker,
53 metrics: Arc::new(AgentMetrics::default()),
54 last_success: Arc::new(RwLock::new(None)),
55 consecutive_failures: AtomicU32::new(0),
56 }
57 }
58
59 pub fn id(&self) -> &str {
61 &self.config.id
62 }
63
64 pub fn circuit_breaker(&self) -> &CircuitBreaker {
66 &self.circuit_breaker
67 }
68
69 pub fn failure_mode(&self) -> sentinel_config::FailureMode {
71 self.config.failure_mode
72 }
73
74 pub fn timeout_ms(&self) -> u64 {
76 self.config.timeout_ms
77 }
78
79 pub fn metrics(&self) -> &AgentMetrics {
81 &self.metrics
82 }
83
84 pub fn handles_event(&self, event_type: EventType) -> bool {
86 self.config.events.iter().any(|e| match (e, event_type) {
87 (AgentEvent::RequestHeaders, EventType::RequestHeaders) => true,
88 (AgentEvent::RequestBody, EventType::RequestBodyChunk) => true,
89 (AgentEvent::ResponseHeaders, EventType::ResponseHeaders) => true,
90 (AgentEvent::ResponseBody, EventType::ResponseBodyChunk) => true,
91 (AgentEvent::Log, EventType::RequestComplete) => true,
92 (AgentEvent::WebSocketFrame, EventType::WebSocketFrame) => true,
93 _ => false,
94 })
95 }
96
97 pub async fn initialize(&self) -> SentinelResult<()> {
99 let timeout = Duration::from_millis(self.config.timeout_ms);
100
101 debug!(
102 agent_id = %self.config.id,
103 transport = ?self.config.transport,
104 timeout_ms = self.config.timeout_ms,
105 "Initializing agent connection"
106 );
107
108 let start = Instant::now();
109
110 match &self.config.transport {
111 AgentTransport::UnixSocket { path } => {
112 trace!(
113 agent_id = %self.config.id,
114 socket_path = %path.display(),
115 "Connecting to agent via Unix socket"
116 );
117
118 let client = AgentClient::unix_socket(&self.config.id, path, timeout)
119 .await
120 .map_err(|e| {
121 error!(
122 agent_id = %self.config.id,
123 socket_path = %path.display(),
124 error = %e,
125 "Failed to connect to agent via Unix socket"
126 );
127 SentinelError::Agent {
128 agent: self.config.id.clone(),
129 message: format!("Failed to connect via Unix socket: {}", e),
130 event: "initialize".to_string(),
131 source: None,
132 }
133 })?;
134
135 *self.client.write().await = Some(client);
136
137 info!(
138 agent_id = %self.config.id,
139 socket_path = %path.display(),
140 connect_time_ms = start.elapsed().as_millis(),
141 "Agent connected via Unix socket"
142 );
143
144 self.send_configure_event().await?;
146
147 Ok(())
148 }
149 AgentTransport::Grpc { address, tls: _ } => {
150 trace!(
151 agent_id = %self.config.id,
152 address = %address,
153 "Connecting to agent via gRPC"
154 );
155
156 let client = AgentClient::grpc(&self.config.id, address, timeout)
158 .await
159 .map_err(|e| {
160 error!(
161 agent_id = %self.config.id,
162 address = %address,
163 error = %e,
164 "Failed to connect to agent via gRPC"
165 );
166 SentinelError::Agent {
167 agent: self.config.id.clone(),
168 message: format!("Failed to connect via gRPC: {}", e),
169 event: "initialize".to_string(),
170 source: None,
171 }
172 })?;
173
174 *self.client.write().await = Some(client);
175
176 info!(
177 agent_id = %self.config.id,
178 address = %address,
179 connect_time_ms = start.elapsed().as_millis(),
180 "Agent connected via gRPC"
181 );
182
183 self.send_configure_event().await?;
185
186 Ok(())
187 }
188 AgentTransport::Http { url, tls: _ } => {
189 warn!(
190 agent_id = %self.config.id,
191 url = %url,
192 "HTTP transport not yet implemented, agent will not be available"
193 );
194 Ok(())
195 }
196 }
197 }
198
199 async fn send_configure_event(&self) -> SentinelResult<()> {
201 let config = match &self.config.config {
203 Some(c) => c.clone(),
204 None => {
205 trace!(
206 agent_id = %self.config.id,
207 "No config for agent, skipping Configure event"
208 );
209 return Ok(());
210 }
211 };
212
213 let event = ConfigureEvent {
214 agent_id: self.config.id.clone(),
215 config,
216 };
217
218 debug!(
219 agent_id = %self.config.id,
220 "Sending Configure event to agent"
221 );
222
223 let mut client_guard = self.client.write().await;
224 let client = client_guard.as_mut().ok_or_else(|| SentinelError::Agent {
225 agent: self.config.id.clone(),
226 message: "No client connection for Configure event".to_string(),
227 event: "configure".to_string(),
228 source: None,
229 })?;
230
231 let response = client.send_event(EventType::Configure, &event).await.map_err(|e| {
232 error!(
233 agent_id = %self.config.id,
234 error = %e,
235 "Failed to send Configure event"
236 );
237 SentinelError::Agent {
238 agent: self.config.id.clone(),
239 message: format!("Configure event failed: {}", e),
240 event: "configure".to_string(),
241 source: None,
242 }
243 })?;
244
245 if !matches!(response.decision, Decision::Allow) {
247 error!(
248 agent_id = %self.config.id,
249 decision = ?response.decision,
250 "Agent rejected configuration"
251 );
252 return Err(SentinelError::Agent {
253 agent: self.config.id.clone(),
254 message: "Agent rejected configuration".to_string(),
255 event: "configure".to_string(),
256 source: None,
257 });
258 }
259
260 info!(
261 agent_id = %self.config.id,
262 "Agent accepted configuration"
263 );
264
265 Ok(())
266 }
267
268 pub async fn call_event<T: serde::Serialize>(
270 &self,
271 event_type: EventType,
272 event: &T,
273 ) -> SentinelResult<AgentResponse> {
274 trace!(
275 agent_id = %self.config.id,
276 event_type = ?event_type,
277 "Preparing to call agent"
278 );
279
280 let mut client_guard = self.client.write().await;
282
283 if client_guard.is_none() {
284 trace!(
285 agent_id = %self.config.id,
286 "No existing connection, initializing"
287 );
288 drop(client_guard);
289 self.initialize().await?;
290 client_guard = self.client.write().await;
291 }
292
293 let client = client_guard.as_mut().ok_or_else(|| {
294 error!(
295 agent_id = %self.config.id,
296 event_type = ?event_type,
297 "No client connection available after initialization"
298 );
299 SentinelError::Agent {
300 agent: self.config.id.clone(),
301 message: "No client connection".to_string(),
302 event: format!("{:?}", event_type),
303 source: None,
304 }
305 })?;
306
307 let call_num = self.metrics.calls_total.fetch_add(1, Ordering::Relaxed) + 1;
309
310 trace!(
311 agent_id = %self.config.id,
312 event_type = ?event_type,
313 call_num = call_num,
314 "Sending event to agent"
315 );
316
317 let result = client.send_event(event_type, event).await;
318
319 match result {
321 Ok(response) => Ok(response),
322 Err(e) => {
323 let error_str = e.to_string();
324 let is_connection_error = error_str.contains("Broken pipe")
325 || error_str.contains("Connection reset")
326 || error_str.contains("Connection refused")
327 || error_str.contains("not connected")
328 || error_str.contains("transport error");
329
330 error!(
331 agent_id = %self.config.id,
332 event_type = ?event_type,
333 error = %e,
334 is_connection_error = is_connection_error,
335 "Agent call failed"
336 );
337
338 drop(client_guard);
340
341 if is_connection_error {
343 warn!(
344 agent_id = %self.config.id,
345 "Clearing cached client due to connection error, next call will reconnect"
346 );
347 *self.client.write().await = None;
348 }
349
350 Err(SentinelError::Agent {
351 agent: self.config.id.clone(),
352 message: e.to_string(),
353 event: format!("{:?}", event_type),
354 source: None,
355 })
356 }
357 }
358 }
359
360 pub async fn record_success(&self, duration: Duration) {
362 let success_count = self.metrics.calls_success.fetch_add(1, Ordering::Relaxed) + 1;
363 self.metrics
364 .duration_total_us
365 .fetch_add(duration.as_micros() as u64, Ordering::Relaxed);
366 self.consecutive_failures.store(0, Ordering::Relaxed);
367 *self.last_success.write().await = Some(Instant::now());
368
369 trace!(
370 agent_id = %self.config.id,
371 duration_ms = duration.as_millis(),
372 total_successes = success_count,
373 "Recorded agent call success"
374 );
375
376 self.circuit_breaker.record_success().await;
377 }
378
379 pub async fn record_failure(&self) {
381 let fail_count = self.metrics.calls_failed.fetch_add(1, Ordering::Relaxed) + 1;
382 let consecutive = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
383
384 debug!(
385 agent_id = %self.config.id,
386 total_failures = fail_count,
387 consecutive_failures = consecutive,
388 "Recorded agent call failure"
389 );
390
391 self.circuit_breaker.record_failure().await;
392 }
393
394 pub async fn record_timeout(&self) {
396 let timeout_count = self.metrics.calls_timeout.fetch_add(1, Ordering::Relaxed) + 1;
397 let consecutive = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
398
399 debug!(
400 agent_id = %self.config.id,
401 total_timeouts = timeout_count,
402 consecutive_failures = consecutive,
403 timeout_ms = self.config.timeout_ms,
404 "Recorded agent call timeout"
405 );
406
407 self.circuit_breaker.record_failure().await;
408 }
409
410 pub async fn shutdown(&self) {
412 debug!(
413 agent_id = %self.config.id,
414 "Shutting down agent"
415 );
416
417 if let Some(client) = self.client.write().await.take() {
418 trace!(
419 agent_id = %self.config.id,
420 "Closing agent client connection"
421 );
422 let _ = client.close().await;
423 }
424
425 let stats = (
426 self.metrics.calls_total.load(Ordering::Relaxed),
427 self.metrics.calls_success.load(Ordering::Relaxed),
428 self.metrics.calls_failed.load(Ordering::Relaxed),
429 self.metrics.calls_timeout.load(Ordering::Relaxed),
430 );
431
432 info!(
433 agent_id = %self.config.id,
434 total_calls = stats.0,
435 successes = stats.1,
436 failures = stats.2,
437 timeouts = stats.3,
438 "Agent shutdown complete"
439 );
440 }
441}