1use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use base64::{engine::general_purpose::STANDARD, Engine as _};
8use futures::future::join_all;
9use pingora_timeout::timeout;
10use sentinel_agent_protocol::{
11 v2::MetricsCollector, AgentResponse, EventType, RequestBodyChunkEvent, RequestHeadersEvent,
12 ResponseBodyChunkEvent, ResponseHeadersEvent, WebSocketFrameEvent,
13};
14use sentinel_common::{
15 errors::{SentinelError, SentinelResult},
16 types::CircuitBreakerConfig,
17 CircuitBreaker,
18};
19use sentinel_config::{AgentConfig, AgentProtocolVersion, FailureMode};
20use tokio::sync::{RwLock, Semaphore};
21use tracing::{debug, error, info, trace, warn};
22
23use super::agent::Agent;
24use super::agent_v2::AgentV2;
25use super::context::AgentCallContext;
26use super::decision::AgentDecision;
27use super::metrics::AgentMetrics;
28use super::pool::AgentConnectionPool;
29
30pub enum UnifiedAgent {
32 V1(Arc<Agent>),
33 V2(Arc<AgentV2>),
34}
35
36impl UnifiedAgent {
37 pub fn id(&self) -> &str {
39 match self {
40 UnifiedAgent::V1(agent) => agent.id(),
41 UnifiedAgent::V2(agent) => agent.id(),
42 }
43 }
44
45 pub fn circuit_breaker(&self) -> &CircuitBreaker {
47 match self {
48 UnifiedAgent::V1(agent) => agent.circuit_breaker(),
49 UnifiedAgent::V2(agent) => agent.circuit_breaker(),
50 }
51 }
52
53 pub fn failure_mode(&self) -> FailureMode {
55 match self {
56 UnifiedAgent::V1(agent) => agent.failure_mode(),
57 UnifiedAgent::V2(agent) => agent.failure_mode(),
58 }
59 }
60
61 pub fn timeout_ms(&self) -> u64 {
63 match self {
64 UnifiedAgent::V1(agent) => agent.timeout_ms(),
65 UnifiedAgent::V2(agent) => agent.timeout_ms(),
66 }
67 }
68
69 pub fn handles_event(&self, event_type: EventType) -> bool {
71 match self {
72 UnifiedAgent::V1(agent) => agent.handles_event(event_type),
73 UnifiedAgent::V2(agent) => agent.handles_event(event_type),
74 }
75 }
76
77 pub async fn initialize(&self) -> SentinelResult<()> {
79 match self {
80 UnifiedAgent::V1(agent) => agent.initialize().await,
81 UnifiedAgent::V2(agent) => agent.initialize().await,
82 }
83 }
84
85 pub async fn call_event<T: serde::Serialize>(
91 &self,
92 event_type: EventType,
93 event: &T,
94 ) -> SentinelResult<AgentResponse> {
95 match self {
96 UnifiedAgent::V1(agent) => agent.call_event(event_type, event).await,
97 UnifiedAgent::V2(agent) => {
98 let json = serde_json::to_value(event).map_err(|e| SentinelError::Agent {
100 agent: agent.id().to_string(),
101 message: format!("Failed to serialize event: {}", e),
102 event: format!("{:?}", event_type),
103 source: None,
104 })?;
105
106 match event_type {
107 EventType::RequestHeaders => {
108 let typed_event: RequestHeadersEvent = serde_json::from_value(json)
109 .map_err(|e| SentinelError::Agent {
110 agent: agent.id().to_string(),
111 message: format!(
112 "Failed to deserialize RequestHeadersEvent: {}",
113 e
114 ),
115 event: format!("{:?}", event_type),
116 source: None,
117 })?;
118 agent.call_request_headers(&typed_event).await
119 }
120 EventType::RequestBodyChunk => {
121 let typed_event: RequestBodyChunkEvent = serde_json::from_value(json)
122 .map_err(|e| SentinelError::Agent {
123 agent: agent.id().to_string(),
124 message: format!(
125 "Failed to deserialize RequestBodyChunkEvent: {}",
126 e
127 ),
128 event: format!("{:?}", event_type),
129 source: None,
130 })?;
131 agent.call_request_body_chunk(&typed_event).await
132 }
133 EventType::ResponseHeaders => {
134 let typed_event: ResponseHeadersEvent = serde_json::from_value(json)
135 .map_err(|e| SentinelError::Agent {
136 agent: agent.id().to_string(),
137 message: format!(
138 "Failed to deserialize ResponseHeadersEvent: {}",
139 e
140 ),
141 event: format!("{:?}", event_type),
142 source: None,
143 })?;
144 agent.call_response_headers(&typed_event).await
145 }
146 EventType::ResponseBodyChunk => {
147 let typed_event: ResponseBodyChunkEvent = serde_json::from_value(json)
148 .map_err(|e| SentinelError::Agent {
149 agent: agent.id().to_string(),
150 message: format!(
151 "Failed to deserialize ResponseBodyChunkEvent: {}",
152 e
153 ),
154 event: format!("{:?}", event_type),
155 source: None,
156 })?;
157 agent.call_response_body_chunk(&typed_event).await
158 }
159 _ => {
160 Err(SentinelError::Agent {
162 agent: agent.id().to_string(),
163 message: format!("V2 does not support event type {:?}", event_type),
164 event: format!("{:?}", event_type),
165 source: None,
166 })
167 }
168 }
169 }
170 }
171 }
172
173 pub async fn call_request_headers(
175 &self,
176 event: &RequestHeadersEvent,
177 ) -> SentinelResult<AgentResponse> {
178 match self {
179 UnifiedAgent::V1(agent) => agent.call_event(EventType::RequestHeaders, event).await,
180 UnifiedAgent::V2(agent) => agent.call_request_headers(event).await,
181 }
182 }
183
184 pub async fn call_request_body_chunk(
186 &self,
187 event: &RequestBodyChunkEvent,
188 ) -> SentinelResult<AgentResponse> {
189 match self {
190 UnifiedAgent::V1(agent) => agent.call_event(EventType::RequestBodyChunk, event).await,
191 UnifiedAgent::V2(agent) => agent.call_request_body_chunk(event).await,
192 }
193 }
194
195 pub async fn call_response_headers(
197 &self,
198 event: &ResponseHeadersEvent,
199 ) -> SentinelResult<AgentResponse> {
200 match self {
201 UnifiedAgent::V1(agent) => agent.call_event(EventType::ResponseHeaders, event).await,
202 UnifiedAgent::V2(agent) => agent.call_response_headers(event).await,
203 }
204 }
205
206 pub async fn call_response_body_chunk(
208 &self,
209 event: &ResponseBodyChunkEvent,
210 ) -> SentinelResult<AgentResponse> {
211 match self {
212 UnifiedAgent::V1(agent) => agent.call_event(EventType::ResponseBodyChunk, event).await,
213 UnifiedAgent::V2(agent) => agent.call_response_body_chunk(event).await,
214 }
215 }
216
217 pub async fn record_success(&self, duration: Duration) {
219 match self {
220 UnifiedAgent::V1(agent) => agent.record_success(duration).await,
221 UnifiedAgent::V2(agent) => agent.record_success(duration),
222 }
223 }
224
225 pub async fn record_failure(&self) {
227 match self {
228 UnifiedAgent::V1(agent) => agent.record_failure().await,
229 UnifiedAgent::V2(agent) => agent.record_failure(),
230 }
231 }
232
233 pub async fn record_timeout(&self) {
235 match self {
236 UnifiedAgent::V1(agent) => agent.record_timeout().await,
237 UnifiedAgent::V2(agent) => agent.record_timeout(),
238 }
239 }
240
241 pub async fn shutdown(&self) {
243 match self {
244 UnifiedAgent::V1(agent) => agent.shutdown().await,
245 UnifiedAgent::V2(agent) => agent.shutdown().await,
246 }
247 }
248
249 pub fn is_v2(&self) -> bool {
251 matches!(self, UnifiedAgent::V2(_))
252 }
253}
254
255pub struct AgentManager {
261 agents: Arc<RwLock<HashMap<String, Arc<UnifiedAgent>>>>,
263 connection_pools: Arc<RwLock<HashMap<String, Arc<AgentConnectionPool>>>>,
265 circuit_breakers: Arc<RwLock<HashMap<String, Arc<CircuitBreaker>>>>,
267 metrics: Arc<AgentMetrics>,
269 agent_semaphores: Arc<RwLock<HashMap<String, Arc<Semaphore>>>>,
271}
272
273impl AgentManager {
274 pub async fn new(agents: Vec<AgentConfig>) -> SentinelResult<Self> {
282 info!(agent_count = agents.len(), "Creating agent manager");
283
284 let mut agent_map = HashMap::new();
285 let mut pools = HashMap::new();
286 let mut breakers = HashMap::new();
287 let mut semaphores = HashMap::new();
288
289 let mut v1_count = 0;
290 let mut v2_count = 0;
291
292 for config in agents {
293 debug!(
294 agent_id = %config.id,
295 transport = ?config.transport,
296 timeout_ms = config.timeout_ms,
297 failure_mode = ?config.failure_mode,
298 max_concurrent_calls = config.max_concurrent_calls,
299 protocol_version = ?config.protocol_version,
300 "Configuring agent"
301 );
302
303 let semaphore = Arc::new(Semaphore::new(config.max_concurrent_calls));
305
306 let unified_agent = match config.protocol_version {
307 AgentProtocolVersion::V1 => {
308 let pool = Arc::new(AgentConnectionPool::new(
310 10, 2, 5, Duration::from_secs(60),
314 ));
315
316 let circuit_breaker = Arc::new(CircuitBreaker::new(
317 config
318 .circuit_breaker
319 .clone()
320 .unwrap_or_else(CircuitBreakerConfig::default),
321 ));
322
323 trace!(
324 agent_id = %config.id,
325 max_concurrent_calls = config.max_concurrent_calls,
326 "Creating v1 agent instance with isolated queue"
327 );
328
329 let agent = Arc::new(Agent::new(
330 config.clone(),
331 Arc::clone(&pool),
332 Arc::clone(&circuit_breaker),
333 ));
334
335 pools.insert(config.id.clone(), pool);
336 breakers.insert(config.id.clone(), circuit_breaker);
337 v1_count += 1;
338
339 Arc::new(UnifiedAgent::V1(agent))
340 }
341 AgentProtocolVersion::V2 => {
342 let circuit_breaker = Arc::new(CircuitBreaker::new(
344 config
345 .circuit_breaker
346 .clone()
347 .unwrap_or_else(CircuitBreakerConfig::default),
348 ));
349
350 trace!(
351 agent_id = %config.id,
352 max_concurrent_calls = config.max_concurrent_calls,
353 pool_config = ?config.pool,
354 "Creating v2 agent instance with internal pool"
355 );
356
357 let agent = Arc::new(AgentV2::new(config.clone(), circuit_breaker));
358
359 v2_count += 1;
360
361 Arc::new(UnifiedAgent::V2(agent))
362 }
363 };
364
365 agent_map.insert(config.id.clone(), unified_agent);
366 semaphores.insert(config.id.clone(), semaphore);
367
368 debug!(
369 agent_id = %config.id,
370 protocol_version = ?config.protocol_version,
371 "Agent configured successfully"
372 );
373 }
374
375 info!(
376 configured_agents = agent_map.len(),
377 v1_agents = v1_count,
378 v2_agents = v2_count,
379 "Agent manager created successfully with per-agent queue isolation"
380 );
381
382 Ok(Self {
383 agents: Arc::new(RwLock::new(agent_map)),
384 connection_pools: Arc::new(RwLock::new(pools)),
385 circuit_breakers: Arc::new(RwLock::new(breakers)),
386 metrics: Arc::new(AgentMetrics::default()),
387 agent_semaphores: Arc::new(RwLock::new(semaphores)),
388 })
389 }
390
391 pub async fn process_request_headers(
398 &self,
399 ctx: &AgentCallContext,
400 mut headers: HashMap<String, Vec<String>>,
401 route_agents: &[(String, FailureMode)],
402 ) -> SentinelResult<AgentDecision> {
403 let method = headers
404 .remove(":method")
405 .and_then(|mut v| {
406 if v.is_empty() {
407 None
408 } else {
409 Some(v.swap_remove(0))
410 }
411 })
412 .unwrap_or_else(|| "GET".to_string());
413 let uri = headers
414 .remove(":path")
415 .and_then(|mut v| {
416 if v.is_empty() {
417 None
418 } else {
419 Some(v.swap_remove(0))
420 }
421 })
422 .unwrap_or_else(|| "/".to_string());
423 let event = RequestHeadersEvent {
424 metadata: ctx.metadata.clone(),
425 method,
426 uri,
427 headers,
428 };
429
430 self.process_event_parallel(EventType::RequestHeaders, &event, route_agents, ctx)
432 .await
433 }
434
435 pub async fn process_request_body(
437 &self,
438 ctx: &AgentCallContext,
439 data: &[u8],
440 is_last: bool,
441 route_agents: &[String],
442 ) -> SentinelResult<AgentDecision> {
443 let max_size = 1024 * 1024; if data.len() > max_size {
446 warn!(
447 correlation_id = %ctx.correlation_id,
448 size = data.len(),
449 "Request body exceeds agent inspection limit"
450 );
451 return Ok(AgentDecision::default_allow());
452 }
453
454 let event = RequestBodyChunkEvent {
455 correlation_id: ctx.correlation_id.to_string(),
456 data: STANDARD.encode(data),
457 is_last,
458 total_size: ctx.request_body.as_ref().map(|b| b.len()),
459 chunk_index: 0, bytes_received: data.len(),
461 };
462
463 self.process_event(EventType::RequestBodyChunk, &event, route_agents, ctx)
464 .await
465 }
466
467 pub async fn process_request_body_streaming(
472 &self,
473 ctx: &AgentCallContext,
474 data: &[u8],
475 is_last: bool,
476 chunk_index: u32,
477 bytes_received: usize,
478 total_size: Option<usize>,
479 route_agents: &[String],
480 ) -> SentinelResult<AgentDecision> {
481 trace!(
482 correlation_id = %ctx.correlation_id,
483 chunk_index = chunk_index,
484 chunk_size = data.len(),
485 bytes_received = bytes_received,
486 is_last = is_last,
487 "Processing streaming request body chunk"
488 );
489
490 let event = RequestBodyChunkEvent {
491 correlation_id: ctx.correlation_id.to_string(),
492 data: STANDARD.encode(data),
493 is_last,
494 total_size,
495 chunk_index,
496 bytes_received,
497 };
498
499 self.process_event(EventType::RequestBodyChunk, &event, route_agents, ctx)
500 .await
501 }
502
503 pub async fn process_response_body_streaming(
505 &self,
506 ctx: &AgentCallContext,
507 data: &[u8],
508 is_last: bool,
509 chunk_index: u32,
510 bytes_sent: usize,
511 total_size: Option<usize>,
512 route_agents: &[String],
513 ) -> SentinelResult<AgentDecision> {
514 trace!(
515 correlation_id = %ctx.correlation_id,
516 chunk_index = chunk_index,
517 chunk_size = data.len(),
518 bytes_sent = bytes_sent,
519 is_last = is_last,
520 "Processing streaming response body chunk"
521 );
522
523 let event = ResponseBodyChunkEvent {
524 correlation_id: ctx.correlation_id.to_string(),
525 data: STANDARD.encode(data),
526 is_last,
527 total_size,
528 chunk_index,
529 bytes_sent,
530 };
531
532 self.process_event(EventType::ResponseBodyChunk, &event, route_agents, ctx)
533 .await
534 }
535
536 pub async fn process_response_headers(
538 &self,
539 ctx: &AgentCallContext,
540 status: u16,
541 headers: &HashMap<String, Vec<String>>,
542 route_agents: &[String],
543 ) -> SentinelResult<AgentDecision> {
544 let event = ResponseHeadersEvent {
545 correlation_id: ctx.correlation_id.to_string(),
546 status,
547 headers: headers.clone(),
548 };
549
550 self.process_event(EventType::ResponseHeaders, &event, route_agents, ctx)
551 .await
552 }
553
554 pub async fn process_websocket_frame(
560 &self,
561 route_id: &str,
562 event: WebSocketFrameEvent,
563 ) -> SentinelResult<AgentResponse> {
564 trace!(
565 correlation_id = %event.correlation_id,
566 route_id = %route_id,
567 frame_index = event.frame_index,
568 opcode = %event.opcode,
569 "Processing WebSocket frame through agents"
570 );
571
572 let agents = self.agents.read().await;
574 let relevant_agents: Vec<_> = agents
575 .values()
576 .filter(|agent| agent.handles_event(EventType::WebSocketFrame))
577 .collect();
578
579 if relevant_agents.is_empty() {
580 trace!(
581 correlation_id = %event.correlation_id,
582 "No agents handle WebSocket frames, allowing"
583 );
584 return Ok(AgentResponse::websocket_allow());
585 }
586
587 debug!(
588 correlation_id = %event.correlation_id,
589 route_id = %route_id,
590 agent_count = relevant_agents.len(),
591 "Processing WebSocket frame through agents"
592 );
593
594 for agent in relevant_agents {
596 if !agent.circuit_breaker().is_closed() {
598 warn!(
599 agent_id = %agent.id(),
600 correlation_id = %event.correlation_id,
601 failure_mode = ?agent.failure_mode(),
602 "Circuit breaker open, skipping agent for WebSocket frame"
603 );
604
605 if agent.failure_mode() == FailureMode::Closed {
606 debug!(
607 correlation_id = %event.correlation_id,
608 agent_id = %agent.id(),
609 "Closing WebSocket due to circuit breaker (fail-closed mode)"
610 );
611 return Ok(AgentResponse::websocket_close(
612 1011,
613 "Service unavailable".to_string(),
614 ));
615 }
616 continue;
617 }
618
619 let start = Instant::now();
621 let timeout_duration = Duration::from_millis(agent.timeout_ms());
622
623 match timeout(
624 timeout_duration,
625 agent.call_event(EventType::WebSocketFrame, &event),
626 )
627 .await
628 {
629 Ok(Ok(response)) => {
630 let duration = start.elapsed();
631 agent.record_success(duration).await;
632
633 trace!(
634 correlation_id = %event.correlation_id,
635 agent_id = %agent.id(),
636 duration_ms = duration.as_millis(),
637 "WebSocket frame agent call succeeded"
638 );
639
640 if let Some(ref ws_decision) = response.websocket_decision {
642 if !matches!(
643 ws_decision,
644 sentinel_agent_protocol::WebSocketDecision::Allow
645 ) {
646 debug!(
647 correlation_id = %event.correlation_id,
648 agent_id = %agent.id(),
649 decision = ?ws_decision,
650 "Agent returned non-allow WebSocket decision"
651 );
652 return Ok(response);
653 }
654 }
655 }
656 Ok(Err(e)) => {
657 agent.record_failure().await;
658 error!(
659 agent_id = %agent.id(),
660 correlation_id = %event.correlation_id,
661 error = %e,
662 duration_ms = start.elapsed().as_millis(),
663 failure_mode = ?agent.failure_mode(),
664 "WebSocket frame agent call failed"
665 );
666
667 if agent.failure_mode() == FailureMode::Closed {
668 return Ok(AgentResponse::websocket_close(
669 1011,
670 "Agent error".to_string(),
671 ));
672 }
673 }
674 Err(_) => {
675 agent.record_timeout().await;
676 warn!(
677 agent_id = %agent.id(),
678 correlation_id = %event.correlation_id,
679 timeout_ms = agent.timeout_ms(),
680 failure_mode = ?agent.failure_mode(),
681 "WebSocket frame agent call timed out"
682 );
683
684 if agent.failure_mode() == FailureMode::Closed {
685 return Ok(AgentResponse::websocket_close(
686 1011,
687 "Gateway timeout".to_string(),
688 ));
689 }
690 }
691 }
692 }
693
694 Ok(AgentResponse::websocket_allow())
696 }
697
698 async fn process_event<T: serde::Serialize>(
700 &self,
701 event_type: EventType,
702 event: &T,
703 route_agents: &[String],
704 ctx: &AgentCallContext,
705 ) -> SentinelResult<AgentDecision> {
706 trace!(
707 correlation_id = %ctx.correlation_id,
708 event_type = ?event_type,
709 route_agents = ?route_agents,
710 "Starting agent event processing"
711 );
712
713 let agents = self.agents.read().await;
715 let relevant_agents: Vec<_> = route_agents
716 .iter()
717 .filter_map(|id| agents.get(id))
718 .filter(|agent| agent.handles_event(event_type))
719 .collect();
720
721 if relevant_agents.is_empty() {
722 trace!(
723 correlation_id = %ctx.correlation_id,
724 event_type = ?event_type,
725 "No relevant agents for event, allowing request"
726 );
727 return Ok(AgentDecision::default_allow());
728 }
729
730 debug!(
731 correlation_id = %ctx.correlation_id,
732 event_type = ?event_type,
733 agent_count = relevant_agents.len(),
734 agent_ids = ?relevant_agents.iter().map(|a| a.id()).collect::<Vec<_>>(),
735 "Processing event through agents"
736 );
737
738 let mut combined_decision = AgentDecision::default_allow();
740
741 for (agent_index, agent) in relevant_agents.iter().enumerate() {
742 trace!(
743 correlation_id = %ctx.correlation_id,
744 agent_id = %agent.id(),
745 agent_index = agent_index,
746 event_type = ?event_type,
747 "Processing event through agent"
748 );
749
750 let semaphores = self.agent_semaphores.read().await;
752 let agent_semaphore = semaphores.get(agent.id()).cloned();
753 drop(semaphores); let _permit = match agent_semaphore {
756 Some(semaphore) => {
757 trace!(
758 correlation_id = %ctx.correlation_id,
759 agent_id = %agent.id(),
760 "Acquiring per-agent semaphore permit"
761 );
762 Some(semaphore.acquire_owned().await.map_err(|_| {
763 error!(
764 correlation_id = %ctx.correlation_id,
765 agent_id = %agent.id(),
766 "Failed to acquire agent call semaphore permit"
767 );
768 SentinelError::Internal {
769 message: "Failed to acquire agent call permit".to_string(),
770 correlation_id: Some(ctx.correlation_id.to_string()),
771 source: None,
772 }
773 })?)
774 }
775 None => {
776 warn!(
778 correlation_id = %ctx.correlation_id,
779 agent_id = %agent.id(),
780 "No semaphore found for agent, proceeding without queue isolation"
781 );
782 None
783 }
784 };
785
786 if !agent.circuit_breaker().is_closed() {
788 warn!(
789 agent_id = %agent.id(),
790 correlation_id = %ctx.correlation_id,
791 failure_mode = ?agent.failure_mode(),
792 "Circuit breaker open, skipping agent"
793 );
794
795 if agent.failure_mode() == FailureMode::Closed {
797 debug!(
798 correlation_id = %ctx.correlation_id,
799 agent_id = %agent.id(),
800 "Blocking request due to circuit breaker (fail-closed mode)"
801 );
802 return Ok(AgentDecision::block(503, "Service unavailable"));
803 }
804 continue;
805 }
806
807 let start = Instant::now();
809 let timeout_duration = Duration::from_millis(agent.timeout_ms());
810
811 trace!(
812 correlation_id = %ctx.correlation_id,
813 agent_id = %agent.id(),
814 timeout_ms = agent.timeout_ms(),
815 "Calling agent"
816 );
817
818 match timeout(timeout_duration, agent.call_event(event_type, event)).await {
819 Ok(Ok(response)) => {
820 let duration = start.elapsed();
821 agent.record_success(duration).await;
822
823 trace!(
824 correlation_id = %ctx.correlation_id,
825 agent_id = %agent.id(),
826 duration_ms = duration.as_millis(),
827 decision = ?response,
828 "Agent call succeeded"
829 );
830
831 combined_decision.merge(response.into());
833
834 if !combined_decision.is_allow() {
836 debug!(
837 correlation_id = %ctx.correlation_id,
838 agent_id = %agent.id(),
839 decision = ?combined_decision,
840 "Agent returned blocking decision, stopping agent chain"
841 );
842 break;
843 }
844 }
845 Ok(Err(e)) => {
846 agent.record_failure().await;
847 error!(
848 agent_id = %agent.id(),
849 correlation_id = %ctx.correlation_id,
850 error = %e,
851 duration_ms = start.elapsed().as_millis(),
852 failure_mode = ?agent.failure_mode(),
853 "Agent call failed"
854 );
855
856 if agent.failure_mode() == FailureMode::Closed {
857 return Err(e);
858 }
859 }
860 Err(_) => {
861 agent.record_timeout().await;
862 warn!(
863 agent_id = %agent.id(),
864 correlation_id = %ctx.correlation_id,
865 timeout_ms = agent.timeout_ms(),
866 failure_mode = ?agent.failure_mode(),
867 "Agent call timed out"
868 );
869
870 if agent.failure_mode() == FailureMode::Closed {
871 debug!(
872 correlation_id = %ctx.correlation_id,
873 agent_id = %agent.id(),
874 "Blocking request due to timeout (fail-closed mode)"
875 );
876 return Ok(AgentDecision::block(504, "Gateway timeout"));
877 }
878 }
879 }
880 }
881
882 trace!(
883 correlation_id = %ctx.correlation_id,
884 decision = ?combined_decision,
885 agents_processed = relevant_agents.len(),
886 "Agent event processing completed"
887 );
888
889 Ok(combined_decision)
890 }
891
892 async fn process_event_with_failure_modes<T: serde::Serialize>(
897 &self,
898 event_type: EventType,
899 event: &T,
900 route_agents: &[(String, FailureMode)],
901 ctx: &AgentCallContext,
902 ) -> SentinelResult<AgentDecision> {
903 trace!(
904 correlation_id = %ctx.correlation_id,
905 event_type = ?event_type,
906 route_agents = ?route_agents.iter().map(|(id, _)| id).collect::<Vec<_>>(),
907 "Starting agent event processing with failure modes"
908 );
909
910 let agents = self.agents.read().await;
912 let relevant_agents: Vec<_> = route_agents
913 .iter()
914 .filter_map(|(id, failure_mode)| agents.get(id).map(|agent| (agent, *failure_mode)))
915 .filter(|(agent, _)| agent.handles_event(event_type))
916 .collect();
917
918 if relevant_agents.is_empty() {
919 trace!(
920 correlation_id = %ctx.correlation_id,
921 event_type = ?event_type,
922 "No relevant agents for event, allowing request"
923 );
924 return Ok(AgentDecision::default_allow());
925 }
926
927 debug!(
928 correlation_id = %ctx.correlation_id,
929 event_type = ?event_type,
930 agent_count = relevant_agents.len(),
931 agent_ids = ?relevant_agents.iter().map(|(a, _)| a.id()).collect::<Vec<_>>(),
932 "Processing event through agents"
933 );
934
935 let mut combined_decision = AgentDecision::default_allow();
937
938 for (agent_index, (agent, filter_failure_mode)) in relevant_agents.iter().enumerate() {
939 trace!(
940 correlation_id = %ctx.correlation_id,
941 agent_id = %agent.id(),
942 agent_index = agent_index,
943 event_type = ?event_type,
944 filter_failure_mode = ?filter_failure_mode,
945 "Processing event through agent with filter failure mode"
946 );
947
948 let semaphores = self.agent_semaphores.read().await;
950 let agent_semaphore = semaphores.get(agent.id()).cloned();
951 drop(semaphores); let _permit = if let Some(semaphore) = agent_semaphore {
954 trace!(
955 correlation_id = %ctx.correlation_id,
956 agent_id = %agent.id(),
957 "Acquiring per-agent semaphore permit"
958 );
959 Some(semaphore.acquire_owned().await.map_err(|_| {
960 error!(
961 correlation_id = %ctx.correlation_id,
962 agent_id = %agent.id(),
963 "Failed to acquire agent call semaphore permit"
964 );
965 SentinelError::Internal {
966 message: "Failed to acquire agent call permit".to_string(),
967 correlation_id: Some(ctx.correlation_id.to_string()),
968 source: None,
969 }
970 })?)
971 } else {
972 warn!(
974 correlation_id = %ctx.correlation_id,
975 agent_id = %agent.id(),
976 "No semaphore found for agent, proceeding without queue isolation"
977 );
978 None
979 };
980
981 if !agent.circuit_breaker().is_closed() {
983 warn!(
984 agent_id = %agent.id(),
985 correlation_id = %ctx.correlation_id,
986 filter_failure_mode = ?filter_failure_mode,
987 "Circuit breaker open, skipping agent"
988 );
989
990 if *filter_failure_mode == FailureMode::Closed {
992 debug!(
993 correlation_id = %ctx.correlation_id,
994 agent_id = %agent.id(),
995 "Blocking request due to circuit breaker (filter fail-closed mode)"
996 );
997 return Ok(AgentDecision::block(503, "Service unavailable"));
998 }
999 continue;
1001 }
1002
1003 let start = Instant::now();
1005 let timeout_duration = Duration::from_millis(agent.timeout_ms());
1006
1007 trace!(
1008 correlation_id = %ctx.correlation_id,
1009 agent_id = %agent.id(),
1010 timeout_ms = agent.timeout_ms(),
1011 "Calling agent"
1012 );
1013
1014 match timeout(timeout_duration, agent.call_event(event_type, event)).await {
1015 Ok(Ok(response)) => {
1016 let duration = start.elapsed();
1017 agent.record_success(duration).await;
1018
1019 trace!(
1020 correlation_id = %ctx.correlation_id,
1021 agent_id = %agent.id(),
1022 duration_ms = duration.as_millis(),
1023 decision = ?response,
1024 "Agent call succeeded"
1025 );
1026
1027 combined_decision.merge(response.into());
1029
1030 if !combined_decision.is_allow() {
1032 debug!(
1033 correlation_id = %ctx.correlation_id,
1034 agent_id = %agent.id(),
1035 decision = ?combined_decision,
1036 "Agent returned blocking decision, stopping agent chain"
1037 );
1038 break;
1039 }
1040 }
1041 Ok(Err(e)) => {
1042 agent.record_failure().await;
1043 error!(
1044 agent_id = %agent.id(),
1045 correlation_id = %ctx.correlation_id,
1046 error = %e,
1047 duration_ms = start.elapsed().as_millis(),
1048 filter_failure_mode = ?filter_failure_mode,
1049 "Agent call failed"
1050 );
1051
1052 if *filter_failure_mode == FailureMode::Closed {
1054 debug!(
1055 correlation_id = %ctx.correlation_id,
1056 agent_id = %agent.id(),
1057 "Blocking request due to agent failure (filter fail-closed mode)"
1058 );
1059 return Ok(AgentDecision::block(503, "Agent unavailable"));
1060 }
1061 debug!(
1063 correlation_id = %ctx.correlation_id,
1064 agent_id = %agent.id(),
1065 "Continuing despite agent failure (filter fail-open mode)"
1066 );
1067 }
1068 Err(_) => {
1069 agent.record_timeout().await;
1070 warn!(
1071 agent_id = %agent.id(),
1072 correlation_id = %ctx.correlation_id,
1073 timeout_ms = agent.timeout_ms(),
1074 filter_failure_mode = ?filter_failure_mode,
1075 "Agent call timed out"
1076 );
1077
1078 if *filter_failure_mode == FailureMode::Closed {
1080 debug!(
1081 correlation_id = %ctx.correlation_id,
1082 agent_id = %agent.id(),
1083 "Blocking request due to timeout (filter fail-closed mode)"
1084 );
1085 return Ok(AgentDecision::block(504, "Gateway timeout"));
1086 }
1087 debug!(
1089 correlation_id = %ctx.correlation_id,
1090 agent_id = %agent.id(),
1091 "Continuing despite timeout (filter fail-open mode)"
1092 );
1093 }
1094 }
1095 }
1096
1097 trace!(
1098 correlation_id = %ctx.correlation_id,
1099 decision = ?combined_decision,
1100 agents_processed = relevant_agents.len(),
1101 "Agent event processing with failure modes completed"
1102 );
1103
1104 Ok(combined_decision)
1105 }
1106
1107 async fn process_event_parallel<T: serde::Serialize + Sync>(
1122 &self,
1123 event_type: EventType,
1124 event: &T,
1125 route_agents: &[(String, FailureMode)],
1126 ctx: &AgentCallContext,
1127 ) -> SentinelResult<AgentDecision> {
1128 trace!(
1129 correlation_id = %ctx.correlation_id,
1130 event_type = ?event_type,
1131 route_agents = ?route_agents.iter().map(|(id, _)| id).collect::<Vec<_>>(),
1132 "Starting parallel agent event processing"
1133 );
1134
1135 let agents = self.agents.read().await;
1137 let semaphores = self.agent_semaphores.read().await;
1138
1139 let agent_info: Vec<_> = route_agents
1141 .iter()
1142 .filter_map(|(id, failure_mode)| {
1143 let agent = agents.get(id)?;
1144 if !agent.handles_event(event_type) {
1145 return None;
1146 }
1147 let semaphore = semaphores.get(id).cloned();
1148 Some((Arc::clone(agent), *failure_mode, semaphore))
1149 })
1150 .collect();
1151
1152 drop(agents);
1154 drop(semaphores);
1155
1156 if agent_info.is_empty() {
1157 trace!(
1158 correlation_id = %ctx.correlation_id,
1159 event_type = ?event_type,
1160 "No relevant agents for event, allowing request"
1161 );
1162 return Ok(AgentDecision::default_allow());
1163 }
1164
1165 debug!(
1166 correlation_id = %ctx.correlation_id,
1167 event_type = ?event_type,
1168 agent_count = agent_info.len(),
1169 agent_ids = ?agent_info.iter().map(|(a, _, _)| a.id()).collect::<Vec<_>>(),
1170 "Processing event through agents in parallel"
1171 );
1172
1173 let futures: Vec<_> = agent_info
1175 .iter()
1176 .map(|(agent, filter_failure_mode, semaphore)| {
1177 let agent = Arc::clone(agent);
1178 let filter_failure_mode = *filter_failure_mode;
1179 let semaphore = semaphore.clone();
1180 let correlation_id = ctx.correlation_id.clone();
1181
1182 async move {
1183 let _permit = if let Some(sem) = semaphore {
1185 match sem.acquire_owned().await {
1186 Ok(permit) => Some(permit),
1187 Err(_) => {
1188 error!(
1189 correlation_id = %correlation_id,
1190 agent_id = %agent.id(),
1191 "Failed to acquire agent semaphore permit"
1192 );
1193 return Err((
1194 agent.id().to_string(),
1195 filter_failure_mode,
1196 "Failed to acquire permit".to_string(),
1197 ));
1198 }
1199 }
1200 } else {
1201 None
1202 };
1203
1204 if !agent.circuit_breaker().is_closed() {
1206 warn!(
1207 agent_id = %agent.id(),
1208 correlation_id = %correlation_id,
1209 filter_failure_mode = ?filter_failure_mode,
1210 "Circuit breaker open, skipping agent"
1211 );
1212 return Err((
1213 agent.id().to_string(),
1214 filter_failure_mode,
1215 "Circuit breaker open".to_string(),
1216 ));
1217 }
1218
1219 let start = Instant::now();
1221 let timeout_duration = Duration::from_millis(agent.timeout_ms());
1222
1223 match timeout(timeout_duration, agent.call_event(event_type, event)).await {
1224 Ok(Ok(response)) => {
1225 let duration = start.elapsed();
1226 agent.record_success(duration).await;
1227 trace!(
1228 correlation_id = %correlation_id,
1229 agent_id = %agent.id(),
1230 duration_ms = duration.as_millis(),
1231 "Parallel agent call succeeded"
1232 );
1233 Ok((agent.id().to_string(), response))
1234 }
1235 Ok(Err(e)) => {
1236 agent.record_failure().await;
1237 error!(
1238 agent_id = %agent.id(),
1239 correlation_id = %correlation_id,
1240 error = %e,
1241 duration_ms = start.elapsed().as_millis(),
1242 filter_failure_mode = ?filter_failure_mode,
1243 "Parallel agent call failed"
1244 );
1245 Err((
1246 agent.id().to_string(),
1247 filter_failure_mode,
1248 format!("Agent error: {}", e),
1249 ))
1250 }
1251 Err(_) => {
1252 agent.record_timeout().await;
1253 warn!(
1254 agent_id = %agent.id(),
1255 correlation_id = %correlation_id,
1256 timeout_ms = agent.timeout_ms(),
1257 filter_failure_mode = ?filter_failure_mode,
1258 "Parallel agent call timed out"
1259 );
1260 Err((
1261 agent.id().to_string(),
1262 filter_failure_mode,
1263 "Timeout".to_string(),
1264 ))
1265 }
1266 }
1267 }
1268 })
1269 .collect();
1270
1271 let results = join_all(futures).await;
1273
1274 let mut combined_decision = AgentDecision::default_allow();
1276 let mut blocking_error: Option<AgentDecision> = None;
1277
1278 for result in results {
1279 match result {
1280 Ok((agent_id, response)) => {
1281 let decision: AgentDecision = response.into();
1282
1283 if !decision.is_allow() {
1285 debug!(
1286 correlation_id = %ctx.correlation_id,
1287 agent_id = %agent_id,
1288 decision = ?decision,
1289 "Agent returned blocking decision"
1290 );
1291 return Ok(decision);
1293 }
1294
1295 combined_decision.merge(decision);
1296 }
1297 Err((agent_id, failure_mode, reason)) => {
1298 if failure_mode == FailureMode::Closed && blocking_error.is_none() {
1300 debug!(
1301 correlation_id = %ctx.correlation_id,
1302 agent_id = %agent_id,
1303 reason = %reason,
1304 "Agent failure in fail-closed mode"
1305 );
1306 let status = if reason.contains("Timeout") { 504 } else { 503 };
1309 let message = if reason.contains("Timeout") {
1310 "Gateway timeout"
1311 } else {
1312 "Service unavailable"
1313 };
1314 blocking_error = Some(AgentDecision::block(status, message));
1315 } else {
1316 debug!(
1318 correlation_id = %ctx.correlation_id,
1319 agent_id = %agent_id,
1320 reason = %reason,
1321 "Agent failure in fail-open mode, continuing"
1322 );
1323 }
1324 }
1325 }
1326 }
1327
1328 if let Some(error_decision) = blocking_error {
1330 return Ok(error_decision);
1331 }
1332
1333 trace!(
1334 correlation_id = %ctx.correlation_id,
1335 decision = ?combined_decision,
1336 agents_processed = agent_info.len(),
1337 "Parallel agent event processing completed"
1338 );
1339
1340 Ok(combined_decision)
1341 }
1342
1343 pub async fn initialize(&self) -> SentinelResult<()> {
1345 let agents = self.agents.read().await;
1346
1347 info!(agent_count = agents.len(), "Initializing agent connections");
1348
1349 let mut initialized_count = 0;
1350 let mut failed_count = 0;
1351
1352 for (id, agent) in agents.iter() {
1353 debug!(agent_id = %id, "Initializing agent connection");
1354 if let Err(e) = agent.initialize().await {
1355 error!(
1356 agent_id = %id,
1357 error = %e,
1358 "Failed to initialize agent"
1359 );
1360 failed_count += 1;
1361 } else {
1363 trace!(agent_id = %id, "Agent initialized successfully");
1364 initialized_count += 1;
1365 }
1366 }
1367
1368 info!(
1369 initialized = initialized_count,
1370 failed = failed_count,
1371 total = agents.len(),
1372 "Agent initialization complete"
1373 );
1374
1375 Ok(())
1376 }
1377
1378 pub async fn shutdown(&self) {
1380 let agents = self.agents.read().await;
1381
1382 info!(agent_count = agents.len(), "Shutting down agent manager");
1383
1384 for (id, agent) in agents.iter() {
1385 debug!(agent_id = %id, "Shutting down agent");
1386 agent.shutdown().await;
1387 trace!(agent_id = %id, "Agent shutdown complete");
1388 }
1389
1390 info!("Agent manager shutdown complete");
1391 }
1392
1393 pub fn metrics(&self) -> &AgentMetrics {
1395 &self.metrics
1396 }
1397
1398 pub fn get_agents_for_event(&self, event_type: EventType) -> Vec<String> {
1403 if let Ok(agents) = self.agents.try_read() {
1406 agents
1407 .values()
1408 .filter(|agent| agent.handles_event(event_type))
1409 .map(|agent| agent.id().to_string())
1410 .collect()
1411 } else {
1412 Vec::new()
1413 }
1414 }
1415
1416 pub async fn get_v2_pool_metrics(&self) -> Vec<(String, Arc<MetricsCollector>)> {
1422 let agents = self.agents.read().await;
1423 agents
1424 .iter()
1425 .filter_map(|(id, agent)| {
1426 if let UnifiedAgent::V2(v2_agent) = agent.as_ref() {
1427 Some((id.clone(), v2_agent.pool_metrics_collector_arc()))
1428 } else {
1429 None
1430 }
1431 })
1432 .collect()
1433 }
1434
1435 pub async fn export_v2_pool_metrics(&self) -> String {
1439 let agents = self.agents.read().await;
1440 let mut output = String::new();
1441
1442 for (id, agent) in agents.iter() {
1443 if let UnifiedAgent::V2(v2_agent) = agent.as_ref() {
1444 let pool_metrics = v2_agent.export_prometheus();
1445 if !pool_metrics.is_empty() {
1446 output.push_str(&format!("\n# Agent pool metrics: {}\n", id));
1447 output.push_str(&pool_metrics);
1448 }
1449 }
1450 }
1451
1452 output
1453 }
1454
1455 pub async fn get_v2_metrics_collector(&self, agent_id: &str) -> Option<Arc<MetricsCollector>> {
1459 let agents = self.agents.read().await;
1460 if let Some(agent) = agents.get(agent_id) {
1461 if let UnifiedAgent::V2(v2_agent) = agent.as_ref() {
1462 Some(v2_agent.pool_metrics_collector_arc())
1463 } else {
1464 None
1465 }
1466 } else {
1467 None
1468 }
1469 }
1470}