1use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use base64::{engine::general_purpose::STANDARD, Engine as _};
8use pingora_timeout::timeout;
9use sentinel_agent_protocol::{
10 v2::MetricsCollector,
11 AgentResponse, EventType, RequestBodyChunkEvent, RequestHeadersEvent, ResponseBodyChunkEvent,
12 ResponseHeadersEvent, WebSocketFrameEvent,
13};
14use sentinel_common::{
15 errors::{SentinelError, SentinelResult},
16 types::CircuitBreakerConfig,
17 CircuitBreaker,
18};
19use futures::future::join_all;
20use sentinel_config::{AgentConfig, AgentProtocolVersion, FailureMode};
21use tokio::sync::{RwLock, Semaphore};
22use tracing::{debug, error, info, trace, warn};
23
24use super::agent::Agent;
25use super::agent_v2::AgentV2;
26use super::context::AgentCallContext;
27use super::decision::AgentDecision;
28use super::metrics::AgentMetrics;
29use super::pool::AgentConnectionPool;
30
/// A configured agent speaking either protocol version.
///
/// Each variant holds a shared handle to the version-specific agent
/// implementation; the methods on this type forward to whichever one is inside.
pub enum UnifiedAgent {
    /// Agent backed by the v1 implementation ([`Agent`]).
    V1(Arc<Agent>),
    /// Agent backed by the v2 implementation ([`AgentV2`]).
    V2(Arc<AgentV2>),
}
36
37impl UnifiedAgent {
38 pub fn id(&self) -> &str {
40 match self {
41 UnifiedAgent::V1(agent) => agent.id(),
42 UnifiedAgent::V2(agent) => agent.id(),
43 }
44 }
45
46 pub fn circuit_breaker(&self) -> &CircuitBreaker {
48 match self {
49 UnifiedAgent::V1(agent) => agent.circuit_breaker(),
50 UnifiedAgent::V2(agent) => agent.circuit_breaker(),
51 }
52 }
53
54 pub fn failure_mode(&self) -> FailureMode {
56 match self {
57 UnifiedAgent::V1(agent) => agent.failure_mode(),
58 UnifiedAgent::V2(agent) => agent.failure_mode(),
59 }
60 }
61
62 pub fn timeout_ms(&self) -> u64 {
64 match self {
65 UnifiedAgent::V1(agent) => agent.timeout_ms(),
66 UnifiedAgent::V2(agent) => agent.timeout_ms(),
67 }
68 }
69
70 pub fn handles_event(&self, event_type: EventType) -> bool {
72 match self {
73 UnifiedAgent::V1(agent) => agent.handles_event(event_type),
74 UnifiedAgent::V2(agent) => agent.handles_event(event_type),
75 }
76 }
77
78 pub async fn initialize(&self) -> SentinelResult<()> {
80 match self {
81 UnifiedAgent::V1(agent) => agent.initialize().await,
82 UnifiedAgent::V2(agent) => agent.initialize().await,
83 }
84 }
85
86 pub async fn call_event<T: serde::Serialize>(
92 &self,
93 event_type: EventType,
94 event: &T,
95 ) -> SentinelResult<AgentResponse> {
96 match self {
97 UnifiedAgent::V1(agent) => agent.call_event(event_type, event).await,
98 UnifiedAgent::V2(agent) => {
99 let json = serde_json::to_value(event).map_err(|e| SentinelError::Agent {
101 agent: agent.id().to_string(),
102 message: format!("Failed to serialize event: {}", e),
103 event: format!("{:?}", event_type),
104 source: None,
105 })?;
106
107 match event_type {
108 EventType::RequestHeaders => {
109 let typed_event: RequestHeadersEvent =
110 serde_json::from_value(json).map_err(|e| SentinelError::Agent {
111 agent: agent.id().to_string(),
112 message: format!("Failed to deserialize RequestHeadersEvent: {}", e),
113 event: format!("{:?}", event_type),
114 source: None,
115 })?;
116 agent.call_request_headers(&typed_event).await
117 }
118 EventType::RequestBodyChunk => {
119 let typed_event: RequestBodyChunkEvent =
120 serde_json::from_value(json).map_err(|e| SentinelError::Agent {
121 agent: agent.id().to_string(),
122 message: format!("Failed to deserialize RequestBodyChunkEvent: {}", e),
123 event: format!("{:?}", event_type),
124 source: None,
125 })?;
126 agent.call_request_body_chunk(&typed_event).await
127 }
128 EventType::ResponseHeaders => {
129 let typed_event: ResponseHeadersEvent =
130 serde_json::from_value(json).map_err(|e| SentinelError::Agent {
131 agent: agent.id().to_string(),
132 message: format!("Failed to deserialize ResponseHeadersEvent: {}", e),
133 event: format!("{:?}", event_type),
134 source: None,
135 })?;
136 agent.call_response_headers(&typed_event).await
137 }
138 EventType::ResponseBodyChunk => {
139 let typed_event: ResponseBodyChunkEvent =
140 serde_json::from_value(json).map_err(|e| SentinelError::Agent {
141 agent: agent.id().to_string(),
142 message: format!("Failed to deserialize ResponseBodyChunkEvent: {}", e),
143 event: format!("{:?}", event_type),
144 source: None,
145 })?;
146 agent.call_response_body_chunk(&typed_event).await
147 }
148 _ => {
149 Err(SentinelError::Agent {
151 agent: agent.id().to_string(),
152 message: format!("V2 does not support event type {:?}", event_type),
153 event: format!("{:?}", event_type),
154 source: None,
155 })
156 }
157 }
158 }
159 }
160 }
161
162 pub async fn call_request_headers(
164 &self,
165 event: &RequestHeadersEvent,
166 ) -> SentinelResult<AgentResponse> {
167 match self {
168 UnifiedAgent::V1(agent) => agent.call_event(EventType::RequestHeaders, event).await,
169 UnifiedAgent::V2(agent) => agent.call_request_headers(event).await,
170 }
171 }
172
173 pub async fn call_request_body_chunk(
175 &self,
176 event: &RequestBodyChunkEvent,
177 ) -> SentinelResult<AgentResponse> {
178 match self {
179 UnifiedAgent::V1(agent) => agent.call_event(EventType::RequestBodyChunk, event).await,
180 UnifiedAgent::V2(agent) => agent.call_request_body_chunk(event).await,
181 }
182 }
183
184 pub async fn call_response_headers(
186 &self,
187 event: &ResponseHeadersEvent,
188 ) -> SentinelResult<AgentResponse> {
189 match self {
190 UnifiedAgent::V1(agent) => agent.call_event(EventType::ResponseHeaders, event).await,
191 UnifiedAgent::V2(agent) => agent.call_response_headers(event).await,
192 }
193 }
194
195 pub async fn call_response_body_chunk(
197 &self,
198 event: &ResponseBodyChunkEvent,
199 ) -> SentinelResult<AgentResponse> {
200 match self {
201 UnifiedAgent::V1(agent) => agent.call_event(EventType::ResponseBodyChunk, event).await,
202 UnifiedAgent::V2(agent) => agent.call_response_body_chunk(event).await,
203 }
204 }
205
206 pub async fn record_success(&self, duration: Duration) {
208 match self {
209 UnifiedAgent::V1(agent) => agent.record_success(duration).await,
210 UnifiedAgent::V2(agent) => agent.record_success(duration),
211 }
212 }
213
214 pub async fn record_failure(&self) {
216 match self {
217 UnifiedAgent::V1(agent) => agent.record_failure().await,
218 UnifiedAgent::V2(agent) => agent.record_failure(),
219 }
220 }
221
222 pub async fn record_timeout(&self) {
224 match self {
225 UnifiedAgent::V1(agent) => agent.record_timeout().await,
226 UnifiedAgent::V2(agent) => agent.record_timeout(),
227 }
228 }
229
230 pub async fn shutdown(&self) {
232 match self {
233 UnifiedAgent::V1(agent) => agent.shutdown().await,
234 UnifiedAgent::V2(agent) => agent.shutdown().await,
235 }
236 }
237
238 pub fn is_v2(&self) -> bool {
240 matches!(self, UnifiedAgent::V2(_))
241 }
242}
243
/// Owns every configured agent together with the per-agent resources that
/// `AgentManager::new` creates for it.
pub struct AgentManager {
    /// Agents keyed by their configured id.
    agents: Arc<RwLock<HashMap<String, Arc<UnifiedAgent>>>>,
    /// Connection pools keyed by agent id; `new` only creates entries for V1 agents.
    connection_pools: Arc<RwLock<HashMap<String, Arc<AgentConnectionPool>>>>,
    /// Circuit breakers keyed by agent id; `new` only registers V1 agents' breakers here.
    circuit_breakers: Arc<RwLock<HashMap<String, Arc<CircuitBreaker>>>>,
    /// Shared metrics handle, exposed through `metrics()`.
    metrics: Arc<AgentMetrics>,
    /// One semaphore per agent id, sized from the agent's `max_concurrent_calls`.
    agent_semaphores: Arc<RwLock<HashMap<String, Arc<Semaphore>>>>,
}
261
262impl AgentManager {
263 pub async fn new(agents: Vec<AgentConfig>) -> SentinelResult<Self> {
271 info!(agent_count = agents.len(), "Creating agent manager");
272
273 let mut agent_map = HashMap::new();
274 let mut pools = HashMap::new();
275 let mut breakers = HashMap::new();
276 let mut semaphores = HashMap::new();
277
278 let mut v1_count = 0;
279 let mut v2_count = 0;
280
281 for config in agents {
282 debug!(
283 agent_id = %config.id,
284 transport = ?config.transport,
285 timeout_ms = config.timeout_ms,
286 failure_mode = ?config.failure_mode,
287 max_concurrent_calls = config.max_concurrent_calls,
288 protocol_version = ?config.protocol_version,
289 "Configuring agent"
290 );
291
292 let semaphore = Arc::new(Semaphore::new(config.max_concurrent_calls));
294
295 let unified_agent = match config.protocol_version {
296 AgentProtocolVersion::V1 => {
297 let pool = Arc::new(AgentConnectionPool::new(
299 10, 2, 5, Duration::from_secs(60),
303 ));
304
305 let circuit_breaker = Arc::new(CircuitBreaker::new(
306 config
307 .circuit_breaker
308 .clone()
309 .unwrap_or_else(CircuitBreakerConfig::default),
310 ));
311
312 trace!(
313 agent_id = %config.id,
314 max_concurrent_calls = config.max_concurrent_calls,
315 "Creating v1 agent instance with isolated queue"
316 );
317
318 let agent = Arc::new(Agent::new(
319 config.clone(),
320 Arc::clone(&pool),
321 Arc::clone(&circuit_breaker),
322 ));
323
324 pools.insert(config.id.clone(), pool);
325 breakers.insert(config.id.clone(), circuit_breaker);
326 v1_count += 1;
327
328 Arc::new(UnifiedAgent::V1(agent))
329 }
330 AgentProtocolVersion::V2 => {
331 let circuit_breaker = Arc::new(CircuitBreaker::new(
333 config
334 .circuit_breaker
335 .clone()
336 .unwrap_or_else(CircuitBreakerConfig::default),
337 ));
338
339 trace!(
340 agent_id = %config.id,
341 max_concurrent_calls = config.max_concurrent_calls,
342 pool_config = ?config.pool,
343 "Creating v2 agent instance with internal pool"
344 );
345
346 let agent = Arc::new(AgentV2::new(
347 config.clone(),
348 circuit_breaker,
349 ));
350
351 v2_count += 1;
352
353 Arc::new(UnifiedAgent::V2(agent))
354 }
355 };
356
357 agent_map.insert(config.id.clone(), unified_agent);
358 semaphores.insert(config.id.clone(), semaphore);
359
360 debug!(
361 agent_id = %config.id,
362 protocol_version = ?config.protocol_version,
363 "Agent configured successfully"
364 );
365 }
366
367 info!(
368 configured_agents = agent_map.len(),
369 v1_agents = v1_count,
370 v2_agents = v2_count,
371 "Agent manager created successfully with per-agent queue isolation"
372 );
373
374 Ok(Self {
375 agents: Arc::new(RwLock::new(agent_map)),
376 connection_pools: Arc::new(RwLock::new(pools)),
377 circuit_breakers: Arc::new(RwLock::new(breakers)),
378 metrics: Arc::new(AgentMetrics::default()),
379 agent_semaphores: Arc::new(RwLock::new(semaphores)),
380 })
381 }
382
383 pub async fn process_request_headers(
390 &self,
391 ctx: &AgentCallContext,
392 headers: &HashMap<String, Vec<String>>,
393 route_agents: &[(String, FailureMode)],
394 ) -> SentinelResult<AgentDecision> {
395 let event = RequestHeadersEvent {
396 metadata: ctx.metadata.clone(),
397 method: headers
398 .get(":method")
399 .and_then(|v| v.first())
400 .unwrap_or(&"GET".to_string())
401 .clone(),
402 uri: headers
403 .get(":path")
404 .and_then(|v| v.first())
405 .unwrap_or(&"/".to_string())
406 .clone(),
407 headers: headers.clone(),
408 };
409
410 self.process_event_parallel(EventType::RequestHeaders, &event, route_agents, ctx)
412 .await
413 }
414
415 pub async fn process_request_body(
417 &self,
418 ctx: &AgentCallContext,
419 data: &[u8],
420 is_last: bool,
421 route_agents: &[String],
422 ) -> SentinelResult<AgentDecision> {
423 let max_size = 1024 * 1024; if data.len() > max_size {
426 warn!(
427 correlation_id = %ctx.correlation_id,
428 size = data.len(),
429 "Request body exceeds agent inspection limit"
430 );
431 return Ok(AgentDecision::default_allow());
432 }
433
434 let event = RequestBodyChunkEvent {
435 correlation_id: ctx.correlation_id.to_string(),
436 data: STANDARD.encode(data),
437 is_last,
438 total_size: ctx.request_body.as_ref().map(|b| b.len()),
439 chunk_index: 0, bytes_received: data.len(),
441 };
442
443 self.process_event(EventType::RequestBodyChunk, &event, route_agents, ctx)
444 .await
445 }
446
447 pub async fn process_request_body_streaming(
452 &self,
453 ctx: &AgentCallContext,
454 data: &[u8],
455 is_last: bool,
456 chunk_index: u32,
457 bytes_received: usize,
458 total_size: Option<usize>,
459 route_agents: &[String],
460 ) -> SentinelResult<AgentDecision> {
461 trace!(
462 correlation_id = %ctx.correlation_id,
463 chunk_index = chunk_index,
464 chunk_size = data.len(),
465 bytes_received = bytes_received,
466 is_last = is_last,
467 "Processing streaming request body chunk"
468 );
469
470 let event = RequestBodyChunkEvent {
471 correlation_id: ctx.correlation_id.to_string(),
472 data: STANDARD.encode(data),
473 is_last,
474 total_size,
475 chunk_index,
476 bytes_received,
477 };
478
479 self.process_event(EventType::RequestBodyChunk, &event, route_agents, ctx)
480 .await
481 }
482
483 pub async fn process_response_body_streaming(
485 &self,
486 ctx: &AgentCallContext,
487 data: &[u8],
488 is_last: bool,
489 chunk_index: u32,
490 bytes_sent: usize,
491 total_size: Option<usize>,
492 route_agents: &[String],
493 ) -> SentinelResult<AgentDecision> {
494 trace!(
495 correlation_id = %ctx.correlation_id,
496 chunk_index = chunk_index,
497 chunk_size = data.len(),
498 bytes_sent = bytes_sent,
499 is_last = is_last,
500 "Processing streaming response body chunk"
501 );
502
503 let event = ResponseBodyChunkEvent {
504 correlation_id: ctx.correlation_id.to_string(),
505 data: STANDARD.encode(data),
506 is_last,
507 total_size,
508 chunk_index,
509 bytes_sent,
510 };
511
512 self.process_event(EventType::ResponseBodyChunk, &event, route_agents, ctx)
513 .await
514 }
515
516 pub async fn process_response_headers(
518 &self,
519 ctx: &AgentCallContext,
520 status: u16,
521 headers: &HashMap<String, Vec<String>>,
522 route_agents: &[String],
523 ) -> SentinelResult<AgentDecision> {
524 let event = ResponseHeadersEvent {
525 correlation_id: ctx.correlation_id.to_string(),
526 status,
527 headers: headers.clone(),
528 };
529
530 self.process_event(EventType::ResponseHeaders, &event, route_agents, ctx)
531 .await
532 }
533
534 pub async fn process_websocket_frame(
540 &self,
541 route_id: &str,
542 event: WebSocketFrameEvent,
543 ) -> SentinelResult<AgentResponse> {
544 trace!(
545 correlation_id = %event.correlation_id,
546 route_id = %route_id,
547 frame_index = event.frame_index,
548 opcode = %event.opcode,
549 "Processing WebSocket frame through agents"
550 );
551
552 let agents = self.agents.read().await;
554 let relevant_agents: Vec<_> = agents
555 .values()
556 .filter(|agent| agent.handles_event(EventType::WebSocketFrame))
557 .collect();
558
559 if relevant_agents.is_empty() {
560 trace!(
561 correlation_id = %event.correlation_id,
562 "No agents handle WebSocket frames, allowing"
563 );
564 return Ok(AgentResponse::websocket_allow());
565 }
566
567 debug!(
568 correlation_id = %event.correlation_id,
569 route_id = %route_id,
570 agent_count = relevant_agents.len(),
571 "Processing WebSocket frame through agents"
572 );
573
574 for agent in relevant_agents {
576 if !agent.circuit_breaker().is_closed() {
578 warn!(
579 agent_id = %agent.id(),
580 correlation_id = %event.correlation_id,
581 failure_mode = ?agent.failure_mode(),
582 "Circuit breaker open, skipping agent for WebSocket frame"
583 );
584
585 if agent.failure_mode() == FailureMode::Closed {
586 debug!(
587 correlation_id = %event.correlation_id,
588 agent_id = %agent.id(),
589 "Closing WebSocket due to circuit breaker (fail-closed mode)"
590 );
591 return Ok(AgentResponse::websocket_close(
592 1011,
593 "Service unavailable".to_string(),
594 ));
595 }
596 continue;
597 }
598
599 let start = Instant::now();
601 let timeout_duration = Duration::from_millis(agent.timeout_ms());
602
603 match timeout(
604 timeout_duration,
605 agent.call_event(EventType::WebSocketFrame, &event),
606 )
607 .await
608 {
609 Ok(Ok(response)) => {
610 let duration = start.elapsed();
611 agent.record_success(duration).await;
612
613 trace!(
614 correlation_id = %event.correlation_id,
615 agent_id = %agent.id(),
616 duration_ms = duration.as_millis(),
617 "WebSocket frame agent call succeeded"
618 );
619
620 if let Some(ref ws_decision) = response.websocket_decision {
622 if !matches!(
623 ws_decision,
624 sentinel_agent_protocol::WebSocketDecision::Allow
625 ) {
626 debug!(
627 correlation_id = %event.correlation_id,
628 agent_id = %agent.id(),
629 decision = ?ws_decision,
630 "Agent returned non-allow WebSocket decision"
631 );
632 return Ok(response);
633 }
634 }
635 }
636 Ok(Err(e)) => {
637 agent.record_failure().await;
638 error!(
639 agent_id = %agent.id(),
640 correlation_id = %event.correlation_id,
641 error = %e,
642 duration_ms = start.elapsed().as_millis(),
643 failure_mode = ?agent.failure_mode(),
644 "WebSocket frame agent call failed"
645 );
646
647 if agent.failure_mode() == FailureMode::Closed {
648 return Ok(AgentResponse::websocket_close(
649 1011,
650 "Agent error".to_string(),
651 ));
652 }
653 }
654 Err(_) => {
655 agent.record_timeout().await;
656 warn!(
657 agent_id = %agent.id(),
658 correlation_id = %event.correlation_id,
659 timeout_ms = agent.timeout_ms(),
660 failure_mode = ?agent.failure_mode(),
661 "WebSocket frame agent call timed out"
662 );
663
664 if agent.failure_mode() == FailureMode::Closed {
665 return Ok(AgentResponse::websocket_close(
666 1011,
667 "Gateway timeout".to_string(),
668 ));
669 }
670 }
671 }
672 }
673
674 Ok(AgentResponse::websocket_allow())
676 }
677
678 async fn process_event<T: serde::Serialize>(
680 &self,
681 event_type: EventType,
682 event: &T,
683 route_agents: &[String],
684 ctx: &AgentCallContext,
685 ) -> SentinelResult<AgentDecision> {
686 trace!(
687 correlation_id = %ctx.correlation_id,
688 event_type = ?event_type,
689 route_agents = ?route_agents,
690 "Starting agent event processing"
691 );
692
693 let agents = self.agents.read().await;
695 let relevant_agents: Vec<_> = route_agents
696 .iter()
697 .filter_map(|id| agents.get(id))
698 .filter(|agent| agent.handles_event(event_type))
699 .collect();
700
701 if relevant_agents.is_empty() {
702 trace!(
703 correlation_id = %ctx.correlation_id,
704 event_type = ?event_type,
705 "No relevant agents for event, allowing request"
706 );
707 return Ok(AgentDecision::default_allow());
708 }
709
710 debug!(
711 correlation_id = %ctx.correlation_id,
712 event_type = ?event_type,
713 agent_count = relevant_agents.len(),
714 agent_ids = ?relevant_agents.iter().map(|a| a.id()).collect::<Vec<_>>(),
715 "Processing event through agents"
716 );
717
718 let mut combined_decision = AgentDecision::default_allow();
720
721 for (agent_index, agent) in relevant_agents.iter().enumerate() {
722 trace!(
723 correlation_id = %ctx.correlation_id,
724 agent_id = %agent.id(),
725 agent_index = agent_index,
726 event_type = ?event_type,
727 "Processing event through agent"
728 );
729
730 let semaphores = self.agent_semaphores.read().await;
732 let agent_semaphore = semaphores.get(agent.id()).cloned();
733 drop(semaphores); let _permit = match agent_semaphore {
736 Some(semaphore) => {
737 trace!(
738 correlation_id = %ctx.correlation_id,
739 agent_id = %agent.id(),
740 "Acquiring per-agent semaphore permit"
741 );
742 Some(semaphore.acquire_owned().await.map_err(|_| {
743 error!(
744 correlation_id = %ctx.correlation_id,
745 agent_id = %agent.id(),
746 "Failed to acquire agent call semaphore permit"
747 );
748 SentinelError::Internal {
749 message: "Failed to acquire agent call permit".to_string(),
750 correlation_id: Some(ctx.correlation_id.to_string()),
751 source: None,
752 }
753 })?)
754 }
755 None => {
756 warn!(
758 correlation_id = %ctx.correlation_id,
759 agent_id = %agent.id(),
760 "No semaphore found for agent, proceeding without queue isolation"
761 );
762 None
763 }
764 };
765
766 if !agent.circuit_breaker().is_closed() {
768 warn!(
769 agent_id = %agent.id(),
770 correlation_id = %ctx.correlation_id,
771 failure_mode = ?agent.failure_mode(),
772 "Circuit breaker open, skipping agent"
773 );
774
775 if agent.failure_mode() == FailureMode::Closed {
777 debug!(
778 correlation_id = %ctx.correlation_id,
779 agent_id = %agent.id(),
780 "Blocking request due to circuit breaker (fail-closed mode)"
781 );
782 return Ok(AgentDecision::block(503, "Service unavailable"));
783 }
784 continue;
785 }
786
787 let start = Instant::now();
789 let timeout_duration = Duration::from_millis(agent.timeout_ms());
790
791 trace!(
792 correlation_id = %ctx.correlation_id,
793 agent_id = %agent.id(),
794 timeout_ms = agent.timeout_ms(),
795 "Calling agent"
796 );
797
798 match timeout(timeout_duration, agent.call_event(event_type, event)).await {
799 Ok(Ok(response)) => {
800 let duration = start.elapsed();
801 agent.record_success(duration).await;
802
803 trace!(
804 correlation_id = %ctx.correlation_id,
805 agent_id = %agent.id(),
806 duration_ms = duration.as_millis(),
807 decision = ?response,
808 "Agent call succeeded"
809 );
810
811 combined_decision.merge(response.into());
813
814 if !combined_decision.is_allow() {
816 debug!(
817 correlation_id = %ctx.correlation_id,
818 agent_id = %agent.id(),
819 decision = ?combined_decision,
820 "Agent returned blocking decision, stopping agent chain"
821 );
822 break;
823 }
824 }
825 Ok(Err(e)) => {
826 agent.record_failure().await;
827 error!(
828 agent_id = %agent.id(),
829 correlation_id = %ctx.correlation_id,
830 error = %e,
831 duration_ms = start.elapsed().as_millis(),
832 failure_mode = ?agent.failure_mode(),
833 "Agent call failed"
834 );
835
836 if agent.failure_mode() == FailureMode::Closed {
837 return Err(e);
838 }
839 }
840 Err(_) => {
841 agent.record_timeout().await;
842 warn!(
843 agent_id = %agent.id(),
844 correlation_id = %ctx.correlation_id,
845 timeout_ms = agent.timeout_ms(),
846 failure_mode = ?agent.failure_mode(),
847 "Agent call timed out"
848 );
849
850 if agent.failure_mode() == FailureMode::Closed {
851 debug!(
852 correlation_id = %ctx.correlation_id,
853 agent_id = %agent.id(),
854 "Blocking request due to timeout (fail-closed mode)"
855 );
856 return Ok(AgentDecision::block(504, "Gateway timeout"));
857 }
858 }
859 }
860 }
861
862 trace!(
863 correlation_id = %ctx.correlation_id,
864 decision = ?combined_decision,
865 agents_processed = relevant_agents.len(),
866 "Agent event processing completed"
867 );
868
869 Ok(combined_decision)
870 }
871
872 async fn process_event_with_failure_modes<T: serde::Serialize>(
877 &self,
878 event_type: EventType,
879 event: &T,
880 route_agents: &[(String, FailureMode)],
881 ctx: &AgentCallContext,
882 ) -> SentinelResult<AgentDecision> {
883 trace!(
884 correlation_id = %ctx.correlation_id,
885 event_type = ?event_type,
886 route_agents = ?route_agents.iter().map(|(id, _)| id).collect::<Vec<_>>(),
887 "Starting agent event processing with failure modes"
888 );
889
890 let agents = self.agents.read().await;
892 let relevant_agents: Vec<_> = route_agents
893 .iter()
894 .filter_map(|(id, failure_mode)| {
895 agents.get(id).map(|agent| (agent, *failure_mode))
896 })
897 .filter(|(agent, _)| agent.handles_event(event_type))
898 .collect();
899
900 if relevant_agents.is_empty() {
901 trace!(
902 correlation_id = %ctx.correlation_id,
903 event_type = ?event_type,
904 "No relevant agents for event, allowing request"
905 );
906 return Ok(AgentDecision::default_allow());
907 }
908
909 debug!(
910 correlation_id = %ctx.correlation_id,
911 event_type = ?event_type,
912 agent_count = relevant_agents.len(),
913 agent_ids = ?relevant_agents.iter().map(|(a, _)| a.id()).collect::<Vec<_>>(),
914 "Processing event through agents"
915 );
916
917 let mut combined_decision = AgentDecision::default_allow();
919
920 for (agent_index, (agent, filter_failure_mode)) in relevant_agents.iter().enumerate() {
921 trace!(
922 correlation_id = %ctx.correlation_id,
923 agent_id = %agent.id(),
924 agent_index = agent_index,
925 event_type = ?event_type,
926 filter_failure_mode = ?filter_failure_mode,
927 "Processing event through agent with filter failure mode"
928 );
929
930 let semaphores = self.agent_semaphores.read().await;
932 let agent_semaphore = semaphores.get(agent.id()).cloned();
933 drop(semaphores); let _permit = if let Some(semaphore) = agent_semaphore {
936 trace!(
937 correlation_id = %ctx.correlation_id,
938 agent_id = %agent.id(),
939 "Acquiring per-agent semaphore permit"
940 );
941 Some(semaphore.acquire_owned().await.map_err(|_| {
942 error!(
943 correlation_id = %ctx.correlation_id,
944 agent_id = %agent.id(),
945 "Failed to acquire agent call semaphore permit"
946 );
947 SentinelError::Internal {
948 message: "Failed to acquire agent call permit".to_string(),
949 correlation_id: Some(ctx.correlation_id.to_string()),
950 source: None,
951 }
952 })?)
953 } else {
954 warn!(
956 correlation_id = %ctx.correlation_id,
957 agent_id = %agent.id(),
958 "No semaphore found for agent, proceeding without queue isolation"
959 );
960 None
961 };
962
963 if !agent.circuit_breaker().is_closed() {
965 warn!(
966 agent_id = %agent.id(),
967 correlation_id = %ctx.correlation_id,
968 filter_failure_mode = ?filter_failure_mode,
969 "Circuit breaker open, skipping agent"
970 );
971
972 if *filter_failure_mode == FailureMode::Closed {
974 debug!(
975 correlation_id = %ctx.correlation_id,
976 agent_id = %agent.id(),
977 "Blocking request due to circuit breaker (filter fail-closed mode)"
978 );
979 return Ok(AgentDecision::block(503, "Service unavailable"));
980 }
981 continue;
983 }
984
985 let start = Instant::now();
987 let timeout_duration = Duration::from_millis(agent.timeout_ms());
988
989 trace!(
990 correlation_id = %ctx.correlation_id,
991 agent_id = %agent.id(),
992 timeout_ms = agent.timeout_ms(),
993 "Calling agent"
994 );
995
996 match timeout(timeout_duration, agent.call_event(event_type, event)).await {
997 Ok(Ok(response)) => {
998 let duration = start.elapsed();
999 agent.record_success(duration).await;
1000
1001 trace!(
1002 correlation_id = %ctx.correlation_id,
1003 agent_id = %agent.id(),
1004 duration_ms = duration.as_millis(),
1005 decision = ?response,
1006 "Agent call succeeded"
1007 );
1008
1009 combined_decision.merge(response.into());
1011
1012 if !combined_decision.is_allow() {
1014 debug!(
1015 correlation_id = %ctx.correlation_id,
1016 agent_id = %agent.id(),
1017 decision = ?combined_decision,
1018 "Agent returned blocking decision, stopping agent chain"
1019 );
1020 break;
1021 }
1022 }
1023 Ok(Err(e)) => {
1024 agent.record_failure().await;
1025 error!(
1026 agent_id = %agent.id(),
1027 correlation_id = %ctx.correlation_id,
1028 error = %e,
1029 duration_ms = start.elapsed().as_millis(),
1030 filter_failure_mode = ?filter_failure_mode,
1031 "Agent call failed"
1032 );
1033
1034 if *filter_failure_mode == FailureMode::Closed {
1036 debug!(
1037 correlation_id = %ctx.correlation_id,
1038 agent_id = %agent.id(),
1039 "Blocking request due to agent failure (filter fail-closed mode)"
1040 );
1041 return Ok(AgentDecision::block(503, "Agent unavailable"));
1042 }
1043 debug!(
1045 correlation_id = %ctx.correlation_id,
1046 agent_id = %agent.id(),
1047 "Continuing despite agent failure (filter fail-open mode)"
1048 );
1049 }
1050 Err(_) => {
1051 agent.record_timeout().await;
1052 warn!(
1053 agent_id = %agent.id(),
1054 correlation_id = %ctx.correlation_id,
1055 timeout_ms = agent.timeout_ms(),
1056 filter_failure_mode = ?filter_failure_mode,
1057 "Agent call timed out"
1058 );
1059
1060 if *filter_failure_mode == FailureMode::Closed {
1062 debug!(
1063 correlation_id = %ctx.correlation_id,
1064 agent_id = %agent.id(),
1065 "Blocking request due to timeout (filter fail-closed mode)"
1066 );
1067 return Ok(AgentDecision::block(504, "Gateway timeout"));
1068 }
1069 debug!(
1071 correlation_id = %ctx.correlation_id,
1072 agent_id = %agent.id(),
1073 "Continuing despite timeout (filter fail-open mode)"
1074 );
1075 }
1076 }
1077 }
1078
1079 trace!(
1080 correlation_id = %ctx.correlation_id,
1081 decision = ?combined_decision,
1082 agents_processed = relevant_agents.len(),
1083 "Agent event processing with failure modes completed"
1084 );
1085
1086 Ok(combined_decision)
1087 }
1088
1089 async fn process_event_parallel<T: serde::Serialize + Sync>(
1104 &self,
1105 event_type: EventType,
1106 event: &T,
1107 route_agents: &[(String, FailureMode)],
1108 ctx: &AgentCallContext,
1109 ) -> SentinelResult<AgentDecision> {
1110 trace!(
1111 correlation_id = %ctx.correlation_id,
1112 event_type = ?event_type,
1113 route_agents = ?route_agents.iter().map(|(id, _)| id).collect::<Vec<_>>(),
1114 "Starting parallel agent event processing"
1115 );
1116
1117 let agents = self.agents.read().await;
1119 let semaphores = self.agent_semaphores.read().await;
1120
1121 let agent_info: Vec<_> = route_agents
1123 .iter()
1124 .filter_map(|(id, failure_mode)| {
1125 let agent = agents.get(id)?;
1126 if !agent.handles_event(event_type) {
1127 return None;
1128 }
1129 let semaphore = semaphores.get(id).cloned();
1130 Some((Arc::clone(agent), *failure_mode, semaphore))
1131 })
1132 .collect();
1133
1134 drop(agents);
1136 drop(semaphores);
1137
1138 if agent_info.is_empty() {
1139 trace!(
1140 correlation_id = %ctx.correlation_id,
1141 event_type = ?event_type,
1142 "No relevant agents for event, allowing request"
1143 );
1144 return Ok(AgentDecision::default_allow());
1145 }
1146
1147 debug!(
1148 correlation_id = %ctx.correlation_id,
1149 event_type = ?event_type,
1150 agent_count = agent_info.len(),
1151 agent_ids = ?agent_info.iter().map(|(a, _, _)| a.id()).collect::<Vec<_>>(),
1152 "Processing event through agents in parallel"
1153 );
1154
1155 let futures: Vec<_> = agent_info
1157 .iter()
1158 .map(|(agent, filter_failure_mode, semaphore)| {
1159 let agent = Arc::clone(agent);
1160 let filter_failure_mode = *filter_failure_mode;
1161 let semaphore = semaphore.clone();
1162 let correlation_id = ctx.correlation_id.clone();
1163
1164 async move {
1165 let _permit = if let Some(sem) = semaphore {
1167 match sem.acquire_owned().await {
1168 Ok(permit) => Some(permit),
1169 Err(_) => {
1170 error!(
1171 correlation_id = %correlation_id,
1172 agent_id = %agent.id(),
1173 "Failed to acquire agent semaphore permit"
1174 );
1175 return Err((
1176 agent.id().to_string(),
1177 filter_failure_mode,
1178 "Failed to acquire permit".to_string(),
1179 ));
1180 }
1181 }
1182 } else {
1183 None
1184 };
1185
1186 if !agent.circuit_breaker().is_closed() {
1188 warn!(
1189 agent_id = %agent.id(),
1190 correlation_id = %correlation_id,
1191 filter_failure_mode = ?filter_failure_mode,
1192 "Circuit breaker open, skipping agent"
1193 );
1194 return Err((
1195 agent.id().to_string(),
1196 filter_failure_mode,
1197 "Circuit breaker open".to_string(),
1198 ));
1199 }
1200
1201 let start = Instant::now();
1203 let timeout_duration = Duration::from_millis(agent.timeout_ms());
1204
1205 match timeout(timeout_duration, agent.call_event(event_type, event)).await {
1206 Ok(Ok(response)) => {
1207 let duration = start.elapsed();
1208 agent.record_success(duration).await;
1209 trace!(
1210 correlation_id = %correlation_id,
1211 agent_id = %agent.id(),
1212 duration_ms = duration.as_millis(),
1213 "Parallel agent call succeeded"
1214 );
1215 Ok((agent.id().to_string(), response))
1216 }
1217 Ok(Err(e)) => {
1218 agent.record_failure().await;
1219 error!(
1220 agent_id = %agent.id(),
1221 correlation_id = %correlation_id,
1222 error = %e,
1223 duration_ms = start.elapsed().as_millis(),
1224 filter_failure_mode = ?filter_failure_mode,
1225 "Parallel agent call failed"
1226 );
1227 Err((
1228 agent.id().to_string(),
1229 filter_failure_mode,
1230 format!("Agent error: {}", e),
1231 ))
1232 }
1233 Err(_) => {
1234 agent.record_timeout().await;
1235 warn!(
1236 agent_id = %agent.id(),
1237 correlation_id = %correlation_id,
1238 timeout_ms = agent.timeout_ms(),
1239 filter_failure_mode = ?filter_failure_mode,
1240 "Parallel agent call timed out"
1241 );
1242 Err((
1243 agent.id().to_string(),
1244 filter_failure_mode,
1245 "Timeout".to_string(),
1246 ))
1247 }
1248 }
1249 }
1250 })
1251 .collect();
1252
1253 let results = join_all(futures).await;
1255
1256 let mut combined_decision = AgentDecision::default_allow();
1258 let mut blocking_error: Option<AgentDecision> = None;
1259
1260 for result in results {
1261 match result {
1262 Ok((agent_id, response)) => {
1263 let decision: AgentDecision = response.into();
1264
1265 if !decision.is_allow() {
1267 debug!(
1268 correlation_id = %ctx.correlation_id,
1269 agent_id = %agent_id,
1270 decision = ?decision,
1271 "Agent returned blocking decision"
1272 );
1273 return Ok(decision);
1275 }
1276
1277 combined_decision.merge(decision);
1278 }
1279 Err((agent_id, failure_mode, reason)) => {
1280 if failure_mode == FailureMode::Closed && blocking_error.is_none() {
1282 debug!(
1283 correlation_id = %ctx.correlation_id,
1284 agent_id = %agent_id,
1285 reason = %reason,
1286 "Agent failure in fail-closed mode"
1287 );
1288 let status = if reason.contains("Timeout") { 504 } else { 503 };
1291 let message = if reason.contains("Timeout") {
1292 "Gateway timeout"
1293 } else {
1294 "Service unavailable"
1295 };
1296 blocking_error = Some(AgentDecision::block(status, message));
1297 } else {
1298 debug!(
1300 correlation_id = %ctx.correlation_id,
1301 agent_id = %agent_id,
1302 reason = %reason,
1303 "Agent failure in fail-open mode, continuing"
1304 );
1305 }
1306 }
1307 }
1308 }
1309
1310 if let Some(error_decision) = blocking_error {
1312 return Ok(error_decision);
1313 }
1314
1315 trace!(
1316 correlation_id = %ctx.correlation_id,
1317 decision = ?combined_decision,
1318 agents_processed = agent_info.len(),
1319 "Parallel agent event processing completed"
1320 );
1321
1322 Ok(combined_decision)
1323 }
1324
1325 pub async fn initialize(&self) -> SentinelResult<()> {
1327 let agents = self.agents.read().await;
1328
1329 info!(agent_count = agents.len(), "Initializing agent connections");
1330
1331 let mut initialized_count = 0;
1332 let mut failed_count = 0;
1333
1334 for (id, agent) in agents.iter() {
1335 debug!(agent_id = %id, "Initializing agent connection");
1336 if let Err(e) = agent.initialize().await {
1337 error!(
1338 agent_id = %id,
1339 error = %e,
1340 "Failed to initialize agent"
1341 );
1342 failed_count += 1;
1343 } else {
1345 trace!(agent_id = %id, "Agent initialized successfully");
1346 initialized_count += 1;
1347 }
1348 }
1349
1350 info!(
1351 initialized = initialized_count,
1352 failed = failed_count,
1353 total = agents.len(),
1354 "Agent initialization complete"
1355 );
1356
1357 Ok(())
1358 }
1359
1360 pub async fn shutdown(&self) {
1362 let agents = self.agents.read().await;
1363
1364 info!(agent_count = agents.len(), "Shutting down agent manager");
1365
1366 for (id, agent) in agents.iter() {
1367 debug!(agent_id = %id, "Shutting down agent");
1368 agent.shutdown().await;
1369 trace!(agent_id = %id, "Agent shutdown complete");
1370 }
1371
1372 info!("Agent manager shutdown complete");
1373 }
1374
1375 pub fn metrics(&self) -> &AgentMetrics {
1377 &self.metrics
1378 }
1379
1380 pub fn get_agents_for_event(&self, event_type: EventType) -> Vec<String> {
1385 if let Ok(agents) = self.agents.try_read() {
1388 agents
1389 .values()
1390 .filter(|agent| agent.handles_event(event_type))
1391 .map(|agent| agent.id().to_string())
1392 .collect()
1393 } else {
1394 Vec::new()
1395 }
1396 }
1397
1398 pub async fn get_v2_pool_metrics(&self) -> Vec<(String, Arc<MetricsCollector>)> {
1404 let agents = self.agents.read().await;
1405 agents
1406 .iter()
1407 .filter_map(|(id, agent)| {
1408 if let UnifiedAgent::V2(v2_agent) = agent.as_ref() {
1409 Some((id.clone(), v2_agent.pool_metrics_collector_arc()))
1410 } else {
1411 None
1412 }
1413 })
1414 .collect()
1415 }
1416
1417 pub async fn export_v2_pool_metrics(&self) -> String {
1421 let agents = self.agents.read().await;
1422 let mut output = String::new();
1423
1424 for (id, agent) in agents.iter() {
1425 if let UnifiedAgent::V2(v2_agent) = agent.as_ref() {
1426 let pool_metrics = v2_agent.export_prometheus();
1427 if !pool_metrics.is_empty() {
1428 output.push_str(&format!("\n# Agent pool metrics: {}\n", id));
1429 output.push_str(&pool_metrics);
1430 }
1431 }
1432 }
1433
1434 output
1435 }
1436
1437 pub async fn get_v2_metrics_collector(&self, agent_id: &str) -> Option<Arc<MetricsCollector>> {
1441 let agents = self.agents.read().await;
1442 if let Some(agent) = agents.get(agent_id) {
1443 if let UnifiedAgent::V2(v2_agent) = agent.as_ref() {
1444 Some(v2_agent.pool_metrics_collector_arc())
1445 } else {
1446 None
1447 }
1448 } else {
1449 None
1450 }
1451 }
1452}