1use std::sync::atomic::{AtomicU32, Ordering};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use sentinel_agent_protocol::{AgentClient, AgentResponse, ConfigureEvent, Decision, EventType, GrpcTlsConfig, HttpTlsConfig};
8use sentinel_common::{errors::SentinelError, errors::SentinelResult, CircuitBreaker};
9use sentinel_config::{AgentConfig, AgentEvent, AgentTransport};
10use tokio::sync::RwLock;
11use tracing::{debug, error, info, trace, warn};
12
13use super::metrics::AgentMetrics;
14use super::pool::AgentConnectionPool;
15
/// A single external agent: its configuration, cached client connection and
/// per-agent health bookkeeping.
pub struct Agent {
    /// Configuration handed to `Agent::new` (id, transport, timeout, subscribed
    /// events, failure mode, optional agent-specific config payload).
    pub(super) config: AgentConfig,
    /// Cached client connection. `None` until `initialize` stores a client;
    /// `call_event` resets it to `None` after a connection-level error and
    /// `shutdown` takes it out to close it.
    pub(super) client: Arc<RwLock<Option<AgentClient>>>,
    /// Connection pool handle passed to `Agent::new`.
    /// NOTE(review): no method in this impl block reads it — confirm where it is used.
    pub(super) pool: Arc<AgentConnectionPool>,
    /// Circuit breaker notified by `record_success`, `record_failure` and
    /// `record_timeout`.
    pub(super) circuit_breaker: Arc<CircuitBreaker>,
    /// Call counters (total / success / failed / timeout) and accumulated call
    /// duration in microseconds.
    pub(super) metrics: Arc<AgentMetrics>,
    /// Instant of the most recent `record_success` call, `None` before the first one.
    pub(super) last_success: Arc<RwLock<Option<Instant>>>,
    /// Failures and timeouts recorded since the last success; reset to 0 by
    /// `record_success`.
    pub(super) consecutive_failures: AtomicU32,
}
33
34impl Agent {
35 pub fn new(
37 config: AgentConfig,
38 pool: Arc<AgentConnectionPool>,
39 circuit_breaker: Arc<CircuitBreaker>,
40 ) -> Self {
41 trace!(
42 agent_id = %config.id,
43 agent_type = ?config.agent_type,
44 timeout_ms = config.timeout_ms,
45 events = ?config.events,
46 "Creating agent instance"
47 );
48 Self {
49 config,
50 client: Arc::new(RwLock::new(None)),
51 pool,
52 circuit_breaker,
53 metrics: Arc::new(AgentMetrics::default()),
54 last_success: Arc::new(RwLock::new(None)),
55 consecutive_failures: AtomicU32::new(0),
56 }
57 }
58
59 pub fn id(&self) -> &str {
61 &self.config.id
62 }
63
64 pub fn circuit_breaker(&self) -> &CircuitBreaker {
66 &self.circuit_breaker
67 }
68
69 pub fn failure_mode(&self) -> sentinel_config::FailureMode {
71 self.config.failure_mode
72 }
73
74 pub fn timeout_ms(&self) -> u64 {
76 self.config.timeout_ms
77 }
78
79 pub fn metrics(&self) -> &AgentMetrics {
81 &self.metrics
82 }
83
84 pub fn handles_event(&self, event_type: EventType) -> bool {
86 self.config.events.iter().any(|e| match (e, event_type) {
87 (AgentEvent::RequestHeaders, EventType::RequestHeaders) => true,
88 (AgentEvent::RequestBody, EventType::RequestBodyChunk) => true,
89 (AgentEvent::ResponseHeaders, EventType::ResponseHeaders) => true,
90 (AgentEvent::ResponseBody, EventType::ResponseBodyChunk) => true,
91 (AgentEvent::Log, EventType::RequestComplete) => true,
92 (AgentEvent::WebSocketFrame, EventType::WebSocketFrame) => true,
93 _ => false,
94 })
95 }
96
97 pub async fn initialize(&self) -> SentinelResult<()> {
99 let timeout = Duration::from_millis(self.config.timeout_ms);
100
101 debug!(
102 agent_id = %self.config.id,
103 transport = ?self.config.transport,
104 timeout_ms = self.config.timeout_ms,
105 "Initializing agent connection"
106 );
107
108 let start = Instant::now();
109
110 match &self.config.transport {
111 AgentTransport::UnixSocket { path } => {
112 trace!(
113 agent_id = %self.config.id,
114 socket_path = %path.display(),
115 "Connecting to agent via Unix socket"
116 );
117
118 let client = AgentClient::unix_socket(&self.config.id, path, timeout)
119 .await
120 .map_err(|e| {
121 error!(
122 agent_id = %self.config.id,
123 socket_path = %path.display(),
124 error = %e,
125 "Failed to connect to agent via Unix socket"
126 );
127 SentinelError::Agent {
128 agent: self.config.id.clone(),
129 message: format!("Failed to connect via Unix socket: {}", e),
130 event: "initialize".to_string(),
131 source: None,
132 }
133 })?;
134
135 *self.client.write().await = Some(client);
136
137 info!(
138 agent_id = %self.config.id,
139 socket_path = %path.display(),
140 connect_time_ms = start.elapsed().as_millis(),
141 "Agent connected via Unix socket"
142 );
143
144 self.send_configure_event().await?;
146
147 Ok(())
148 }
149 AgentTransport::Grpc { address, tls } => {
150 trace!(
151 agent_id = %self.config.id,
152 address = %address,
153 tls_enabled = tls.is_some(),
154 "Connecting to agent via gRPC"
155 );
156
157 let client = match tls {
158 Some(tls_config) => {
159 let mut grpc_tls = GrpcTlsConfig::new();
161
162 if let Some(ca_path) = &tls_config.ca_cert {
164 grpc_tls = grpc_tls.with_ca_cert_file(ca_path).await.map_err(|e| {
165 error!(
166 agent_id = %self.config.id,
167 ca_path = %ca_path.display(),
168 error = %e,
169 "Failed to load CA certificate for gRPC TLS"
170 );
171 SentinelError::Agent {
172 agent: self.config.id.clone(),
173 message: format!("Failed to load CA certificate: {}", e),
174 event: "initialize".to_string(),
175 source: None,
176 }
177 })?;
178 }
179
180 if let (Some(cert_path), Some(key_path)) = (&tls_config.client_cert, &tls_config.client_key) {
182 grpc_tls = grpc_tls.with_client_cert_files(cert_path, key_path).await.map_err(|e| {
183 error!(
184 agent_id = %self.config.id,
185 cert_path = %cert_path.display(),
186 key_path = %key_path.display(),
187 error = %e,
188 "Failed to load client certificate for gRPC mTLS"
189 );
190 SentinelError::Agent {
191 agent: self.config.id.clone(),
192 message: format!("Failed to load client certificate: {}", e),
193 event: "initialize".to_string(),
194 source: None,
195 }
196 })?;
197 }
198
199 if tls_config.insecure_skip_verify {
201 warn!(
202 agent_id = %self.config.id,
203 address = %address,
204 "SECURITY WARNING: TLS certificate verification disabled for agent"
205 );
206 grpc_tls = grpc_tls.with_insecure_skip_verify();
207 }
208
209 debug!(
210 agent_id = %self.config.id,
211 address = %address,
212 has_ca_cert = tls_config.ca_cert.is_some(),
213 has_client_cert = tls_config.client_cert.is_some(),
214 "Connecting to agent via gRPC with TLS"
215 );
216
217 AgentClient::grpc_tls(&self.config.id, address, timeout, grpc_tls)
218 .await
219 .map_err(|e| {
220 error!(
221 agent_id = %self.config.id,
222 address = %address,
223 error = %e,
224 "Failed to connect to agent via gRPC with TLS"
225 );
226 SentinelError::Agent {
227 agent: self.config.id.clone(),
228 message: format!("Failed to connect via gRPC TLS: {}", e),
229 event: "initialize".to_string(),
230 source: None,
231 }
232 })?
233 }
234 None => {
235 AgentClient::grpc(&self.config.id, address, timeout)
237 .await
238 .map_err(|e| {
239 error!(
240 agent_id = %self.config.id,
241 address = %address,
242 error = %e,
243 "Failed to connect to agent via gRPC"
244 );
245 SentinelError::Agent {
246 agent: self.config.id.clone(),
247 message: format!("Failed to connect via gRPC: {}", e),
248 event: "initialize".to_string(),
249 source: None,
250 }
251 })?
252 }
253 };
254
255 *self.client.write().await = Some(client);
256
257 info!(
258 agent_id = %self.config.id,
259 address = %address,
260 tls_enabled = tls.is_some(),
261 connect_time_ms = start.elapsed().as_millis(),
262 "Agent connected via gRPC"
263 );
264
265 self.send_configure_event().await?;
267
268 Ok(())
269 }
270 AgentTransport::Http { url, tls } => {
271 trace!(
272 agent_id = %self.config.id,
273 url = %url,
274 tls_enabled = tls.is_some(),
275 "Connecting to agent via HTTP"
276 );
277
278 let client = match tls {
279 Some(tls_config) => {
280 let mut http_tls = HttpTlsConfig::new();
282
283 if let Some(ca_path) = &tls_config.ca_cert {
285 http_tls = http_tls.with_ca_cert_file(ca_path).await.map_err(|e| {
286 error!(
287 agent_id = %self.config.id,
288 ca_path = %ca_path.display(),
289 error = %e,
290 "Failed to load CA certificate for HTTP TLS"
291 );
292 SentinelError::Agent {
293 agent: self.config.id.clone(),
294 message: format!("Failed to load CA certificate: {}", e),
295 event: "initialize".to_string(),
296 source: None,
297 }
298 })?;
299 }
300
301 if let (Some(cert_path), Some(key_path)) = (&tls_config.client_cert, &tls_config.client_key) {
303 http_tls = http_tls.with_client_cert_files(cert_path, key_path).await.map_err(|e| {
304 error!(
305 agent_id = %self.config.id,
306 cert_path = %cert_path.display(),
307 key_path = %key_path.display(),
308 error = %e,
309 "Failed to load client certificate for HTTP mTLS"
310 );
311 SentinelError::Agent {
312 agent: self.config.id.clone(),
313 message: format!("Failed to load client certificate: {}", e),
314 event: "initialize".to_string(),
315 source: None,
316 }
317 })?;
318 }
319
320 if tls_config.insecure_skip_verify {
322 warn!(
323 agent_id = %self.config.id,
324 url = %url,
325 "SECURITY WARNING: TLS certificate verification disabled for HTTP agent"
326 );
327 http_tls = http_tls.with_insecure_skip_verify();
328 }
329
330 debug!(
331 agent_id = %self.config.id,
332 url = %url,
333 has_ca_cert = tls_config.ca_cert.is_some(),
334 has_client_cert = tls_config.client_cert.is_some(),
335 "Connecting to agent via HTTP with TLS"
336 );
337
338 AgentClient::http_tls(&self.config.id, url, timeout, http_tls)
339 .await
340 .map_err(|e| {
341 error!(
342 agent_id = %self.config.id,
343 url = %url,
344 error = %e,
345 "Failed to create HTTP TLS agent client"
346 );
347 SentinelError::Agent {
348 agent: self.config.id.clone(),
349 message: format!("Failed to create HTTP TLS client: {}", e),
350 event: "initialize".to_string(),
351 source: None,
352 }
353 })?
354 }
355 None => {
356 AgentClient::http(&self.config.id, url, timeout)
358 .await
359 .map_err(|e| {
360 error!(
361 agent_id = %self.config.id,
362 url = %url,
363 error = %e,
364 "Failed to create HTTP agent client"
365 );
366 SentinelError::Agent {
367 agent: self.config.id.clone(),
368 message: format!("Failed to create HTTP client: {}", e),
369 event: "initialize".to_string(),
370 source: None,
371 }
372 })?
373 }
374 };
375
376 *self.client.write().await = Some(client);
377
378 info!(
379 agent_id = %self.config.id,
380 url = %url,
381 tls_enabled = tls.is_some(),
382 connect_time_ms = start.elapsed().as_millis(),
383 "Agent connected via HTTP"
384 );
385
386 self.send_configure_event().await?;
388
389 Ok(())
390 }
391 }
392 }
393
394 async fn send_configure_event(&self) -> SentinelResult<()> {
396 let config = match &self.config.config {
398 Some(c) => c.clone(),
399 None => {
400 trace!(
401 agent_id = %self.config.id,
402 "No config for agent, skipping Configure event"
403 );
404 return Ok(());
405 }
406 };
407
408 let event = ConfigureEvent {
409 agent_id: self.config.id.clone(),
410 config,
411 };
412
413 debug!(
414 agent_id = %self.config.id,
415 "Sending Configure event to agent"
416 );
417
418 let mut client_guard = self.client.write().await;
419 let client = client_guard.as_mut().ok_or_else(|| SentinelError::Agent {
420 agent: self.config.id.clone(),
421 message: "No client connection for Configure event".to_string(),
422 event: "configure".to_string(),
423 source: None,
424 })?;
425
426 let response = client.send_event(EventType::Configure, &event).await.map_err(|e| {
427 error!(
428 agent_id = %self.config.id,
429 error = %e,
430 "Failed to send Configure event"
431 );
432 SentinelError::Agent {
433 agent: self.config.id.clone(),
434 message: format!("Configure event failed: {}", e),
435 event: "configure".to_string(),
436 source: None,
437 }
438 })?;
439
440 if !matches!(response.decision, Decision::Allow) {
442 error!(
443 agent_id = %self.config.id,
444 decision = ?response.decision,
445 "Agent rejected configuration"
446 );
447 return Err(SentinelError::Agent {
448 agent: self.config.id.clone(),
449 message: "Agent rejected configuration".to_string(),
450 event: "configure".to_string(),
451 source: None,
452 });
453 }
454
455 info!(
456 agent_id = %self.config.id,
457 "Agent accepted configuration"
458 );
459
460 Ok(())
461 }
462
463 pub async fn call_event<T: serde::Serialize>(
465 &self,
466 event_type: EventType,
467 event: &T,
468 ) -> SentinelResult<AgentResponse> {
469 trace!(
470 agent_id = %self.config.id,
471 event_type = ?event_type,
472 "Preparing to call agent"
473 );
474
475 let mut client_guard = self.client.write().await;
477
478 if client_guard.is_none() {
479 trace!(
480 agent_id = %self.config.id,
481 "No existing connection, initializing"
482 );
483 drop(client_guard);
484 self.initialize().await?;
485 client_guard = self.client.write().await;
486 }
487
488 let client = client_guard.as_mut().ok_or_else(|| {
489 error!(
490 agent_id = %self.config.id,
491 event_type = ?event_type,
492 "No client connection available after initialization"
493 );
494 SentinelError::Agent {
495 agent: self.config.id.clone(),
496 message: "No client connection".to_string(),
497 event: format!("{:?}", event_type),
498 source: None,
499 }
500 })?;
501
502 let call_num = self.metrics.calls_total.fetch_add(1, Ordering::Relaxed) + 1;
504
505 trace!(
506 agent_id = %self.config.id,
507 event_type = ?event_type,
508 call_num = call_num,
509 "Sending event to agent"
510 );
511
512 let result = client.send_event(event_type, event).await;
513
514 match result {
516 Ok(response) => Ok(response),
517 Err(e) => {
518 let error_str = e.to_string();
519 let is_connection_error = error_str.contains("Broken pipe")
520 || error_str.contains("Connection reset")
521 || error_str.contains("Connection refused")
522 || error_str.contains("not connected")
523 || error_str.contains("transport error");
524
525 error!(
526 agent_id = %self.config.id,
527 event_type = ?event_type,
528 error = %e,
529 is_connection_error = is_connection_error,
530 "Agent call failed"
531 );
532
533 drop(client_guard);
535
536 if is_connection_error {
538 warn!(
539 agent_id = %self.config.id,
540 "Clearing cached client due to connection error, next call will reconnect"
541 );
542 *self.client.write().await = None;
543 }
544
545 Err(SentinelError::Agent {
546 agent: self.config.id.clone(),
547 message: e.to_string(),
548 event: format!("{:?}", event_type),
549 source: None,
550 })
551 }
552 }
553 }
554
555 pub async fn record_success(&self, duration: Duration) {
557 let success_count = self.metrics.calls_success.fetch_add(1, Ordering::Relaxed) + 1;
558 self.metrics
559 .duration_total_us
560 .fetch_add(duration.as_micros() as u64, Ordering::Relaxed);
561 self.consecutive_failures.store(0, Ordering::Relaxed);
562 *self.last_success.write().await = Some(Instant::now());
563
564 trace!(
565 agent_id = %self.config.id,
566 duration_ms = duration.as_millis(),
567 total_successes = success_count,
568 "Recorded agent call success"
569 );
570
571 self.circuit_breaker.record_success().await;
572 }
573
574 pub async fn record_failure(&self) {
576 let fail_count = self.metrics.calls_failed.fetch_add(1, Ordering::Relaxed) + 1;
577 let consecutive = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
578
579 debug!(
580 agent_id = %self.config.id,
581 total_failures = fail_count,
582 consecutive_failures = consecutive,
583 "Recorded agent call failure"
584 );
585
586 self.circuit_breaker.record_failure().await;
587 }
588
589 pub async fn record_timeout(&self) {
591 let timeout_count = self.metrics.calls_timeout.fetch_add(1, Ordering::Relaxed) + 1;
592 let consecutive = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
593
594 debug!(
595 agent_id = %self.config.id,
596 total_timeouts = timeout_count,
597 consecutive_failures = consecutive,
598 timeout_ms = self.config.timeout_ms,
599 "Recorded agent call timeout"
600 );
601
602 self.circuit_breaker.record_failure().await;
603 }
604
605 pub async fn shutdown(&self) {
607 debug!(
608 agent_id = %self.config.id,
609 "Shutting down agent"
610 );
611
612 if let Some(client) = self.client.write().await.take() {
613 trace!(
614 agent_id = %self.config.id,
615 "Closing agent client connection"
616 );
617 let _ = client.close().await;
618 }
619
620 let stats = (
621 self.metrics.calls_total.load(Ordering::Relaxed),
622 self.metrics.calls_success.load(Ordering::Relaxed),
623 self.metrics.calls_failed.load(Ordering::Relaxed),
624 self.metrics.calls_timeout.load(Ordering::Relaxed),
625 );
626
627 info!(
628 agent_id = %self.config.id,
629 total_calls = stats.0,
630 successes = stats.1,
631 failures = stats.2,
632 timeouts = stats.3,
633 "Agent shutdown complete"
634 );
635 }
636}