1use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use base64::{engine::general_purpose::STANDARD, Engine as _};
8use futures::future::join_all;
9use pingora_timeout::timeout;
10use tokio::sync::{RwLock, Semaphore};
11use tracing::{debug, error, info, trace, warn};
12use grapsus_agent_protocol::{
13 v2::MetricsCollector, AgentResponse, EventType, GuardrailInspectEvent, RequestBodyChunkEvent,
14 RequestHeadersEvent, ResponseBodyChunkEvent, ResponseHeadersEvent, WebSocketFrameEvent,
15};
16use grapsus_common::{
17 errors::{GrapsusError, GrapsusResult},
18 types::CircuitBreakerConfig,
19 CircuitBreaker,
20};
21use grapsus_config::{AgentConfig, FailureMode};
22
23use super::agent_v2::AgentV2;
24use super::context::AgentCallContext;
25use super::decision::AgentDecision;
26use super::metrics::AgentMetrics;
27
/// Routes request, response, WebSocket and guardrail events to the configured
/// external agents.
///
/// Every map below is keyed by agent id (`AgentConfig::id`) and sits behind an
/// `Arc<RwLock<..>>` so the manager's state can be shared.
pub struct AgentManager {
    /// Agent instances, keyed by agent id.
    agents: Arc<RwLock<HashMap<String, Arc<AgentV2>>>>,
    /// Circuit breakers, keyed by agent id.
    ///
    /// NOTE(review): the call paths in this file consult the breaker through
    /// `AgentV2::circuit_breaker()`, not through this map; confirm whether anything
    /// else relies on this map being populated.
    circuit_breakers: Arc<RwLock<HashMap<String, Arc<CircuitBreaker>>>>,
    /// Aggregate agent metrics, exposed through `AgentManager::metrics`.
    metrics: Arc<AgentMetrics>,
    /// Per-agent semaphores, sized from `AgentConfig::max_concurrent_calls`, that
    /// bound how many calls to one agent may be in flight at once.
    agent_semaphores: Arc<RwLock<HashMap<String, Arc<Semaphore>>>>,
}
42
43impl AgentManager {
44 pub async fn new(agents: Vec<AgentConfig>) -> GrapsusResult<Self> {
50 info!(agent_count = agents.len(), "Creating agent manager");
51
52 let mut agent_map = HashMap::new();
53 let breakers = HashMap::new();
54 let mut semaphores = HashMap::new();
55
56 for config in agents {
57 debug!(
58 agent_id = %config.id,
59 transport = ?config.transport,
60 timeout_ms = config.timeout_ms,
61 failure_mode = ?config.failure_mode,
62 max_concurrent_calls = config.max_concurrent_calls,
63 "Configuring agent"
64 );
65
66 let semaphore = Arc::new(Semaphore::new(config.max_concurrent_calls));
68
69 let circuit_breaker = Arc::new(CircuitBreaker::new(
70 config
71 .circuit_breaker
72 .clone()
73 .unwrap_or_else(CircuitBreakerConfig::default),
74 ));
75
76 trace!(
77 agent_id = %config.id,
78 max_concurrent_calls = config.max_concurrent_calls,
79 pool_config = ?config.pool,
80 "Creating agent instance with internal pool"
81 );
82
83 let agent = Arc::new(AgentV2::new(config.clone(), circuit_breaker));
84
85 agent_map.insert(config.id.clone(), agent);
86 semaphores.insert(config.id.clone(), semaphore);
87
88 debug!(
89 agent_id = %config.id,
90 "Agent configured successfully"
91 );
92 }
93
94 info!(
95 configured_agents = agent_map.len(),
96 "Agent manager created successfully with per-agent queue isolation"
97 );
98
99 Ok(Self {
100 agents: Arc::new(RwLock::new(agent_map)),
101 circuit_breakers: Arc::new(RwLock::new(breakers)),
102 metrics: Arc::new(AgentMetrics::default()),
103 agent_semaphores: Arc::new(RwLock::new(semaphores)),
104 })
105 }
106
107 pub async fn any_agent_handles_event(
109 &self,
110 route_agents: &[String],
111 event_type: EventType,
112 ) -> bool {
113 let agents = self.agents.read().await;
114 route_agents
115 .iter()
116 .filter_map(|id| agents.get(id))
117 .any(|agent| agent.handles_event(event_type))
118 }
119
120 pub async fn process_request_headers(
127 &self,
128 ctx: &AgentCallContext,
129 mut headers: HashMap<String, Vec<String>>,
130 route_agents: &[(String, FailureMode)],
131 ) -> GrapsusResult<AgentDecision> {
132 let method = headers
133 .remove(":method")
134 .and_then(|mut v| {
135 if v.is_empty() {
136 None
137 } else {
138 Some(v.swap_remove(0))
139 }
140 })
141 .unwrap_or_else(|| "GET".to_string());
142 let uri = headers
143 .remove(":path")
144 .and_then(|mut v| {
145 if v.is_empty() {
146 None
147 } else {
148 Some(v.swap_remove(0))
149 }
150 })
151 .unwrap_or_else(|| "/".to_string());
152 let event = RequestHeadersEvent {
153 metadata: ctx.metadata.clone(),
154 method,
155 uri,
156 headers,
157 };
158
159 self.process_event_parallel(EventType::RequestHeaders, &event, route_agents, ctx)
161 .await
162 }
163
164 pub async fn process_request_body(
166 &self,
167 ctx: &AgentCallContext,
168 data: &[u8],
169 is_last: bool,
170 route_agents: &[String],
171 ) -> GrapsusResult<AgentDecision> {
172 let max_size = 1024 * 1024; if data.len() > max_size {
175 warn!(
176 correlation_id = %ctx.correlation_id,
177 size = data.len(),
178 "Request body exceeds agent inspection limit"
179 );
180 return Ok(AgentDecision::default_allow());
181 }
182
183 let event = RequestBodyChunkEvent {
184 correlation_id: ctx.correlation_id.to_string(),
185 data: STANDARD.encode(data),
186 is_last,
187 total_size: ctx.request_body.as_ref().map(|b| b.len()),
188 chunk_index: 0, bytes_received: data.len(),
190 };
191
192 self.process_event(EventType::RequestBodyChunk, &event, route_agents, ctx)
193 .await
194 }
195
196 pub async fn process_request_body_streaming(
201 &self,
202 ctx: &AgentCallContext,
203 data: &[u8],
204 is_last: bool,
205 chunk_index: u32,
206 bytes_received: usize,
207 total_size: Option<usize>,
208 route_agents: &[String],
209 ) -> GrapsusResult<AgentDecision> {
210 trace!(
211 correlation_id = %ctx.correlation_id,
212 chunk_index = chunk_index,
213 chunk_size = data.len(),
214 bytes_received = bytes_received,
215 is_last = is_last,
216 "Processing streaming request body chunk"
217 );
218
219 let event = RequestBodyChunkEvent {
220 correlation_id: ctx.correlation_id.to_string(),
221 data: STANDARD.encode(data),
222 is_last,
223 total_size,
224 chunk_index,
225 bytes_received,
226 };
227
228 self.process_event(EventType::RequestBodyChunk, &event, route_agents, ctx)
229 .await
230 }
231
232 pub async fn process_response_body_streaming(
234 &self,
235 ctx: &AgentCallContext,
236 data: &[u8],
237 is_last: bool,
238 chunk_index: u32,
239 bytes_sent: usize,
240 total_size: Option<usize>,
241 route_agents: &[String],
242 ) -> GrapsusResult<AgentDecision> {
243 trace!(
244 correlation_id = %ctx.correlation_id,
245 chunk_index = chunk_index,
246 chunk_size = data.len(),
247 bytes_sent = bytes_sent,
248 is_last = is_last,
249 "Processing streaming response body chunk"
250 );
251
252 let event = ResponseBodyChunkEvent {
253 correlation_id: ctx.correlation_id.to_string(),
254 data: STANDARD.encode(data),
255 is_last,
256 total_size,
257 chunk_index,
258 bytes_sent,
259 };
260
261 self.process_event(EventType::ResponseBodyChunk, &event, route_agents, ctx)
262 .await
263 }
264
265 pub async fn process_response_headers(
267 &self,
268 ctx: &AgentCallContext,
269 status: u16,
270 headers: &HashMap<String, Vec<String>>,
271 route_agents: &[String],
272 ) -> GrapsusResult<AgentDecision> {
273 let event = ResponseHeadersEvent {
274 correlation_id: ctx.correlation_id.to_string(),
275 status,
276 headers: headers.clone(),
277 };
278
279 self.process_event(EventType::ResponseHeaders, &event, route_agents, ctx)
280 .await
281 }
282
283 pub async fn process_websocket_frame(
289 &self,
290 route_id: &str,
291 event: WebSocketFrameEvent,
292 ) -> GrapsusResult<AgentResponse> {
293 trace!(
294 correlation_id = %event.correlation_id,
295 route_id = %route_id,
296 frame_index = event.frame_index,
297 opcode = %event.opcode,
298 "Processing WebSocket frame through agents"
299 );
300
301 let agents = self.agents.read().await;
303 let relevant_agents: Vec<_> = agents
304 .values()
305 .filter(|agent| agent.handles_event(EventType::WebSocketFrame))
306 .collect();
307
308 if relevant_agents.is_empty() {
309 trace!(
310 correlation_id = %event.correlation_id,
311 "No agents handle WebSocket frames, allowing"
312 );
313 return Ok(AgentResponse::websocket_allow());
314 }
315
316 debug!(
317 correlation_id = %event.correlation_id,
318 route_id = %route_id,
319 agent_count = relevant_agents.len(),
320 "Processing WebSocket frame through agents"
321 );
322
323 for agent in relevant_agents {
325 if !agent.circuit_breaker().is_closed() {
327 warn!(
328 agent_id = %agent.id(),
329 correlation_id = %event.correlation_id,
330 failure_mode = ?agent.failure_mode(),
331 "Circuit breaker open, skipping agent for WebSocket frame"
332 );
333
334 if agent.failure_mode() == FailureMode::Closed {
335 debug!(
336 correlation_id = %event.correlation_id,
337 agent_id = %agent.id(),
338 "Closing WebSocket due to circuit breaker (fail-closed mode)"
339 );
340 return Ok(AgentResponse::websocket_close(
341 1011,
342 "Service unavailable".to_string(),
343 ));
344 }
345 continue;
346 }
347
348 let start = Instant::now();
350 let timeout_duration = Duration::from_millis(agent.timeout_ms());
351
352 match timeout(
353 timeout_duration,
354 agent.call_event(EventType::WebSocketFrame, &event),
355 )
356 .await
357 {
358 Ok(Ok(response)) => {
359 let duration = start.elapsed();
360 agent.record_success(duration);
361
362 trace!(
363 correlation_id = %event.correlation_id,
364 agent_id = %agent.id(),
365 duration_ms = duration.as_millis(),
366 "WebSocket frame agent call succeeded"
367 );
368
369 if let Some(ref ws_decision) = response.websocket_decision {
371 if !matches!(
372 ws_decision,
373 grapsus_agent_protocol::WebSocketDecision::Allow
374 ) {
375 debug!(
376 correlation_id = %event.correlation_id,
377 agent_id = %agent.id(),
378 decision = ?ws_decision,
379 "Agent returned non-allow WebSocket decision"
380 );
381 return Ok(response);
382 }
383 }
384 }
385 Ok(Err(e)) => {
386 agent.record_failure();
387 error!(
388 agent_id = %agent.id(),
389 correlation_id = %event.correlation_id,
390 error = %e,
391 duration_ms = start.elapsed().as_millis(),
392 failure_mode = ?agent.failure_mode(),
393 "WebSocket frame agent call failed"
394 );
395
396 if agent.failure_mode() == FailureMode::Closed {
397 return Ok(AgentResponse::websocket_close(
398 1011,
399 "Agent error".to_string(),
400 ));
401 }
402 }
403 Err(_) => {
404 agent.record_timeout();
405 warn!(
406 agent_id = %agent.id(),
407 correlation_id = %event.correlation_id,
408 timeout_ms = agent.timeout_ms(),
409 failure_mode = ?agent.failure_mode(),
410 "WebSocket frame agent call timed out"
411 );
412
413 if agent.failure_mode() == FailureMode::Closed {
414 return Ok(AgentResponse::websocket_close(
415 1011,
416 "Gateway timeout".to_string(),
417 ));
418 }
419 }
420 }
421 }
422
423 Ok(AgentResponse::websocket_allow())
425 }
426
427 async fn process_event<T: serde::Serialize>(
429 &self,
430 event_type: EventType,
431 event: &T,
432 route_agents: &[String],
433 ctx: &AgentCallContext,
434 ) -> GrapsusResult<AgentDecision> {
435 trace!(
436 correlation_id = %ctx.correlation_id,
437 event_type = ?event_type,
438 route_agents = ?route_agents,
439 "Starting agent event processing"
440 );
441
442 let agents = self.agents.read().await;
444 let relevant_agents: Vec<_> = route_agents
445 .iter()
446 .filter_map(|id| agents.get(id))
447 .filter(|agent| agent.handles_event(event_type))
448 .collect();
449
450 if relevant_agents.is_empty() {
451 trace!(
452 correlation_id = %ctx.correlation_id,
453 event_type = ?event_type,
454 "No relevant agents for event, allowing request"
455 );
456 return Ok(AgentDecision::default_allow());
457 }
458
459 debug!(
460 correlation_id = %ctx.correlation_id,
461 event_type = ?event_type,
462 agent_count = relevant_agents.len(),
463 agent_ids = ?relevant_agents.iter().map(|a| a.id()).collect::<Vec<_>>(),
464 "Processing event through agents"
465 );
466
467 let mut combined_decision = AgentDecision::default_allow();
469
470 for (agent_index, agent) in relevant_agents.iter().enumerate() {
471 trace!(
472 correlation_id = %ctx.correlation_id,
473 agent_id = %agent.id(),
474 agent_index = agent_index,
475 event_type = ?event_type,
476 "Processing event through agent"
477 );
478
479 let semaphores = self.agent_semaphores.read().await;
481 let agent_semaphore = semaphores.get(agent.id()).cloned();
482 drop(semaphores); let _permit = match agent_semaphore {
485 Some(semaphore) => {
486 trace!(
487 correlation_id = %ctx.correlation_id,
488 agent_id = %agent.id(),
489 "Acquiring per-agent semaphore permit"
490 );
491 Some(semaphore.acquire_owned().await.map_err(|_| {
492 error!(
493 correlation_id = %ctx.correlation_id,
494 agent_id = %agent.id(),
495 "Failed to acquire agent call semaphore permit"
496 );
497 GrapsusError::Internal {
498 message: "Failed to acquire agent call permit".to_string(),
499 correlation_id: Some(ctx.correlation_id.to_string()),
500 source: None,
501 }
502 })?)
503 }
504 None => {
505 warn!(
507 correlation_id = %ctx.correlation_id,
508 agent_id = %agent.id(),
509 "No semaphore found for agent, proceeding without queue isolation"
510 );
511 None
512 }
513 };
514
515 if !agent.circuit_breaker().is_closed() {
517 warn!(
518 agent_id = %agent.id(),
519 correlation_id = %ctx.correlation_id,
520 failure_mode = ?agent.failure_mode(),
521 "Circuit breaker open, skipping agent"
522 );
523
524 if agent.failure_mode() == FailureMode::Closed {
526 debug!(
527 correlation_id = %ctx.correlation_id,
528 agent_id = %agent.id(),
529 "Blocking request due to circuit breaker (fail-closed mode)"
530 );
531 return Ok(AgentDecision::block(503, "Service unavailable"));
532 }
533 continue;
534 }
535
536 let start = Instant::now();
538 let timeout_duration = Duration::from_millis(agent.timeout_ms());
539
540 trace!(
541 correlation_id = %ctx.correlation_id,
542 agent_id = %agent.id(),
543 timeout_ms = agent.timeout_ms(),
544 "Calling agent"
545 );
546
547 match timeout(timeout_duration, agent.call_event(event_type, event)).await {
548 Ok(Ok(response)) => {
549 let duration = start.elapsed();
550 agent.record_success(duration);
551
552 trace!(
553 correlation_id = %ctx.correlation_id,
554 agent_id = %agent.id(),
555 duration_ms = duration.as_millis(),
556 decision = ?response,
557 "Agent call succeeded"
558 );
559
560 combined_decision.merge(response.into());
562
563 if !combined_decision.is_allow() {
565 debug!(
566 correlation_id = %ctx.correlation_id,
567 agent_id = %agent.id(),
568 decision = ?combined_decision,
569 "Agent returned blocking decision, stopping agent chain"
570 );
571 break;
572 }
573 }
574 Ok(Err(e)) => {
575 agent.record_failure();
576 error!(
577 agent_id = %agent.id(),
578 correlation_id = %ctx.correlation_id,
579 error = %e,
580 duration_ms = start.elapsed().as_millis(),
581 failure_mode = ?agent.failure_mode(),
582 "Agent call failed"
583 );
584
585 if agent.failure_mode() == FailureMode::Closed {
586 return Err(e);
587 }
588 }
589 Err(_) => {
590 agent.record_timeout();
591 warn!(
592 agent_id = %agent.id(),
593 correlation_id = %ctx.correlation_id,
594 timeout_ms = agent.timeout_ms(),
595 failure_mode = ?agent.failure_mode(),
596 "Agent call timed out"
597 );
598
599 if agent.failure_mode() == FailureMode::Closed {
600 debug!(
601 correlation_id = %ctx.correlation_id,
602 agent_id = %agent.id(),
603 "Blocking request due to timeout (fail-closed mode)"
604 );
605 return Ok(AgentDecision::block(504, "Gateway timeout"));
606 }
607 }
608 }
609 }
610
611 trace!(
612 correlation_id = %ctx.correlation_id,
613 decision = ?combined_decision,
614 agents_processed = relevant_agents.len(),
615 "Agent event processing completed"
616 );
617
618 Ok(combined_decision)
619 }
620
621 async fn process_event_with_failure_modes<T: serde::Serialize>(
626 &self,
627 event_type: EventType,
628 event: &T,
629 route_agents: &[(String, FailureMode)],
630 ctx: &AgentCallContext,
631 ) -> GrapsusResult<AgentDecision> {
632 trace!(
633 correlation_id = %ctx.correlation_id,
634 event_type = ?event_type,
635 route_agents = ?route_agents.iter().map(|(id, _)| id).collect::<Vec<_>>(),
636 "Starting agent event processing with failure modes"
637 );
638
639 let agents = self.agents.read().await;
641 let relevant_agents: Vec<_> = route_agents
642 .iter()
643 .filter_map(|(id, failure_mode)| agents.get(id).map(|agent| (agent, *failure_mode)))
644 .filter(|(agent, _)| agent.handles_event(event_type))
645 .collect();
646
647 if relevant_agents.is_empty() {
648 trace!(
649 correlation_id = %ctx.correlation_id,
650 event_type = ?event_type,
651 "No relevant agents for event, allowing request"
652 );
653 return Ok(AgentDecision::default_allow());
654 }
655
656 debug!(
657 correlation_id = %ctx.correlation_id,
658 event_type = ?event_type,
659 agent_count = relevant_agents.len(),
660 agent_ids = ?relevant_agents.iter().map(|(a, _)| a.id()).collect::<Vec<_>>(),
661 "Processing event through agents"
662 );
663
664 let mut combined_decision = AgentDecision::default_allow();
666
667 for (agent_index, (agent, filter_failure_mode)) in relevant_agents.iter().enumerate() {
668 trace!(
669 correlation_id = %ctx.correlation_id,
670 agent_id = %agent.id(),
671 agent_index = agent_index,
672 event_type = ?event_type,
673 filter_failure_mode = ?filter_failure_mode,
674 "Processing event through agent with filter failure mode"
675 );
676
677 let semaphores = self.agent_semaphores.read().await;
679 let agent_semaphore = semaphores.get(agent.id()).cloned();
680 drop(semaphores); let _permit = if let Some(semaphore) = agent_semaphore {
683 trace!(
684 correlation_id = %ctx.correlation_id,
685 agent_id = %agent.id(),
686 "Acquiring per-agent semaphore permit"
687 );
688 Some(semaphore.acquire_owned().await.map_err(|_| {
689 error!(
690 correlation_id = %ctx.correlation_id,
691 agent_id = %agent.id(),
692 "Failed to acquire agent call semaphore permit"
693 );
694 GrapsusError::Internal {
695 message: "Failed to acquire agent call permit".to_string(),
696 correlation_id: Some(ctx.correlation_id.to_string()),
697 source: None,
698 }
699 })?)
700 } else {
701 warn!(
703 correlation_id = %ctx.correlation_id,
704 agent_id = %agent.id(),
705 "No semaphore found for agent, proceeding without queue isolation"
706 );
707 None
708 };
709
710 if !agent.circuit_breaker().is_closed() {
712 warn!(
713 agent_id = %agent.id(),
714 correlation_id = %ctx.correlation_id,
715 filter_failure_mode = ?filter_failure_mode,
716 "Circuit breaker open, skipping agent"
717 );
718
719 if *filter_failure_mode == FailureMode::Closed {
721 debug!(
722 correlation_id = %ctx.correlation_id,
723 agent_id = %agent.id(),
724 "Blocking request due to circuit breaker (filter fail-closed mode)"
725 );
726 return Ok(AgentDecision::block(503, "Service unavailable"));
727 }
728 continue;
730 }
731
732 let start = Instant::now();
734 let timeout_duration = Duration::from_millis(agent.timeout_ms());
735
736 trace!(
737 correlation_id = %ctx.correlation_id,
738 agent_id = %agent.id(),
739 timeout_ms = agent.timeout_ms(),
740 "Calling agent"
741 );
742
743 match timeout(timeout_duration, agent.call_event(event_type, event)).await {
744 Ok(Ok(response)) => {
745 let duration = start.elapsed();
746 agent.record_success(duration);
747
748 trace!(
749 correlation_id = %ctx.correlation_id,
750 agent_id = %agent.id(),
751 duration_ms = duration.as_millis(),
752 decision = ?response,
753 "Agent call succeeded"
754 );
755
756 combined_decision.merge(response.into());
758
759 if !combined_decision.is_allow() {
761 debug!(
762 correlation_id = %ctx.correlation_id,
763 agent_id = %agent.id(),
764 decision = ?combined_decision,
765 "Agent returned blocking decision, stopping agent chain"
766 );
767 break;
768 }
769 }
770 Ok(Err(e)) => {
771 agent.record_failure();
772 error!(
773 agent_id = %agent.id(),
774 correlation_id = %ctx.correlation_id,
775 error = %e,
776 duration_ms = start.elapsed().as_millis(),
777 filter_failure_mode = ?filter_failure_mode,
778 "Agent call failed"
779 );
780
781 if *filter_failure_mode == FailureMode::Closed {
783 debug!(
784 correlation_id = %ctx.correlation_id,
785 agent_id = %agent.id(),
786 "Blocking request due to agent failure (filter fail-closed mode)"
787 );
788 return Ok(AgentDecision::block(503, "Agent unavailable"));
789 }
790 debug!(
792 correlation_id = %ctx.correlation_id,
793 agent_id = %agent.id(),
794 "Continuing despite agent failure (filter fail-open mode)"
795 );
796 }
797 Err(_) => {
798 agent.record_timeout();
799 warn!(
800 agent_id = %agent.id(),
801 correlation_id = %ctx.correlation_id,
802 timeout_ms = agent.timeout_ms(),
803 filter_failure_mode = ?filter_failure_mode,
804 "Agent call timed out"
805 );
806
807 if *filter_failure_mode == FailureMode::Closed {
809 debug!(
810 correlation_id = %ctx.correlation_id,
811 agent_id = %agent.id(),
812 "Blocking request due to timeout (filter fail-closed mode)"
813 );
814 return Ok(AgentDecision::block(504, "Gateway timeout"));
815 }
816 debug!(
818 correlation_id = %ctx.correlation_id,
819 agent_id = %agent.id(),
820 "Continuing despite timeout (filter fail-open mode)"
821 );
822 }
823 }
824 }
825
826 trace!(
827 correlation_id = %ctx.correlation_id,
828 decision = ?combined_decision,
829 agents_processed = relevant_agents.len(),
830 "Agent event processing with failure modes completed"
831 );
832
833 Ok(combined_decision)
834 }
835
836 async fn process_event_parallel<T: serde::Serialize + Sync>(
851 &self,
852 event_type: EventType,
853 event: &T,
854 route_agents: &[(String, FailureMode)],
855 ctx: &AgentCallContext,
856 ) -> GrapsusResult<AgentDecision> {
857 trace!(
858 correlation_id = %ctx.correlation_id,
859 event_type = ?event_type,
860 route_agents = ?route_agents.iter().map(|(id, _)| id).collect::<Vec<_>>(),
861 "Starting parallel agent event processing"
862 );
863
864 let agents = self.agents.read().await;
866 let semaphores = self.agent_semaphores.read().await;
867
868 let agent_info: Vec<_> = route_agents
870 .iter()
871 .filter_map(|(id, failure_mode)| {
872 let agent = agents.get(id)?;
873 if !agent.handles_event(event_type) {
874 return None;
875 }
876 let semaphore = semaphores.get(id).cloned();
877 Some((Arc::clone(agent), *failure_mode, semaphore))
878 })
879 .collect();
880
881 drop(agents);
883 drop(semaphores);
884
885 if agent_info.is_empty() {
886 trace!(
887 correlation_id = %ctx.correlation_id,
888 event_type = ?event_type,
889 "No relevant agents for event, allowing request"
890 );
891 return Ok(AgentDecision::default_allow());
892 }
893
894 debug!(
895 correlation_id = %ctx.correlation_id,
896 event_type = ?event_type,
897 agent_count = agent_info.len(),
898 agent_ids = ?agent_info.iter().map(|(a, _, _)| a.id()).collect::<Vec<_>>(),
899 "Processing event through agents in parallel"
900 );
901
902 let futures: Vec<_> = agent_info
904 .iter()
905 .map(|(agent, filter_failure_mode, semaphore)| {
906 let agent = Arc::clone(agent);
907 let filter_failure_mode = *filter_failure_mode;
908 let semaphore = semaphore.clone();
909 let correlation_id = ctx.correlation_id.clone();
910
911 async move {
912 let _permit = if let Some(sem) = semaphore {
914 match sem.acquire_owned().await {
915 Ok(permit) => Some(permit),
916 Err(_) => {
917 error!(
918 correlation_id = %correlation_id,
919 agent_id = %agent.id(),
920 "Failed to acquire agent semaphore permit"
921 );
922 return Err((
923 agent.id().to_string(),
924 filter_failure_mode,
925 "Failed to acquire permit".to_string(),
926 ));
927 }
928 }
929 } else {
930 None
931 };
932
933 if !agent.circuit_breaker().is_closed() {
935 warn!(
936 agent_id = %agent.id(),
937 correlation_id = %correlation_id,
938 filter_failure_mode = ?filter_failure_mode,
939 "Circuit breaker open, skipping agent"
940 );
941 return Err((
942 agent.id().to_string(),
943 filter_failure_mode,
944 "Circuit breaker open".to_string(),
945 ));
946 }
947
948 let start = Instant::now();
950 let timeout_duration = Duration::from_millis(agent.timeout_ms());
951
952 match timeout(timeout_duration, agent.call_event(event_type, event)).await {
953 Ok(Ok(response)) => {
954 let duration = start.elapsed();
955 agent.record_success(duration);
956 trace!(
957 correlation_id = %correlation_id,
958 agent_id = %agent.id(),
959 duration_ms = duration.as_millis(),
960 "Parallel agent call succeeded"
961 );
962 Ok((agent.id().to_string(), response))
963 }
964 Ok(Err(e)) => {
965 agent.record_failure();
966 error!(
967 agent_id = %agent.id(),
968 correlation_id = %correlation_id,
969 error = %e,
970 duration_ms = start.elapsed().as_millis(),
971 filter_failure_mode = ?filter_failure_mode,
972 "Parallel agent call failed"
973 );
974 Err((
975 agent.id().to_string(),
976 filter_failure_mode,
977 format!("Agent error: {}", e),
978 ))
979 }
980 Err(_) => {
981 agent.record_timeout();
982 warn!(
983 agent_id = %agent.id(),
984 correlation_id = %correlation_id,
985 timeout_ms = agent.timeout_ms(),
986 filter_failure_mode = ?filter_failure_mode,
987 "Parallel agent call timed out"
988 );
989 Err((
990 agent.id().to_string(),
991 filter_failure_mode,
992 "Timeout".to_string(),
993 ))
994 }
995 }
996 }
997 })
998 .collect();
999
1000 let results = join_all(futures).await;
1002
1003 let mut combined_decision = AgentDecision::default_allow();
1005 let mut blocking_error: Option<AgentDecision> = None;
1006
1007 for result in results {
1008 match result {
1009 Ok((agent_id, response)) => {
1010 let decision: AgentDecision = response.into();
1011
1012 if !decision.is_allow() {
1014 debug!(
1015 correlation_id = %ctx.correlation_id,
1016 agent_id = %agent_id,
1017 decision = ?decision,
1018 "Agent returned blocking decision"
1019 );
1020 return Ok(decision);
1022 }
1023
1024 combined_decision.merge(decision);
1025 }
1026 Err((agent_id, failure_mode, reason)) => {
1027 if failure_mode == FailureMode::Closed && blocking_error.is_none() {
1029 debug!(
1030 correlation_id = %ctx.correlation_id,
1031 agent_id = %agent_id,
1032 reason = %reason,
1033 "Agent failure in fail-closed mode"
1034 );
1035 let status = if reason.contains("Timeout") { 504 } else { 503 };
1038 let message = if reason.contains("Timeout") {
1039 "Gateway timeout"
1040 } else {
1041 "Service unavailable"
1042 };
1043 blocking_error = Some(AgentDecision::block(status, message));
1044 } else {
1045 debug!(
1047 correlation_id = %ctx.correlation_id,
1048 agent_id = %agent_id,
1049 reason = %reason,
1050 "Agent failure in fail-open mode, continuing"
1051 );
1052 }
1053 }
1054 }
1055 }
1056
1057 if let Some(error_decision) = blocking_error {
1059 return Ok(error_decision);
1060 }
1061
1062 trace!(
1063 correlation_id = %ctx.correlation_id,
1064 decision = ?combined_decision,
1065 agents_processed = agent_info.len(),
1066 "Parallel agent event processing completed"
1067 );
1068
1069 Ok(combined_decision)
1070 }
1071
1072 pub async fn call_guardrail_agent(
1077 &self,
1078 agent_name: &str,
1079 event: GuardrailInspectEvent,
1080 ) -> GrapsusResult<AgentResponse> {
1081 let agents = self.agents.read().await;
1082 let agent = agents.get(agent_name).ok_or_else(|| GrapsusError::Agent {
1083 agent: agent_name.to_string(),
1084 message: format!("Agent '{}' not found", agent_name),
1085 event: "guardrail_inspect".to_string(),
1086 source: None,
1087 })?;
1088
1089 let agent = Arc::clone(agent);
1090 drop(agents); let semaphores = self.agent_semaphores.read().await;
1094 let semaphore = semaphores.get(agent_name).cloned();
1095 drop(semaphores);
1096
1097 let _permit = if let Some(sem) = semaphore {
1098 Some(
1099 sem.acquire_owned()
1100 .await
1101 .map_err(|_| GrapsusError::Agent {
1102 agent: agent_name.to_string(),
1103 message: "Failed to acquire agent call permit".to_string(),
1104 event: "guardrail_inspect".to_string(),
1105 source: None,
1106 })?,
1107 )
1108 } else {
1109 None
1110 };
1111
1112 if !agent.circuit_breaker().is_closed() {
1114 return Err(GrapsusError::Agent {
1115 agent: agent_name.to_string(),
1116 message: "Circuit breaker open".to_string(),
1117 event: "guardrail_inspect".to_string(),
1118 source: None,
1119 });
1120 }
1121
1122 let start = Instant::now();
1123 let timeout_duration = Duration::from_millis(agent.timeout_ms());
1124
1125 match timeout(timeout_duration, agent.call_guardrail_inspect(&event)).await {
1126 Ok(Ok(response)) => {
1127 agent.record_success(start.elapsed());
1128 Ok(response)
1129 }
1130 Ok(Err(e)) => {
1131 agent.record_failure();
1132 Err(e)
1133 }
1134 Err(_) => {
1135 agent.record_timeout();
1136 Err(GrapsusError::Agent {
1137 agent: agent_name.to_string(),
1138 message: format!(
1139 "Guardrail agent call timed out after {}ms",
1140 timeout_duration.as_millis()
1141 ),
1142 event: "guardrail_inspect".to_string(),
1143 source: None,
1144 })
1145 }
1146 }
1147 }
1148
1149 pub async fn initialize(&self) -> GrapsusResult<()> {
1151 let agents = self.agents.read().await;
1152
1153 info!(agent_count = agents.len(), "Initializing agent connections");
1154
1155 let mut initialized_count = 0;
1156 let mut failed_count = 0;
1157
1158 for (id, agent) in agents.iter() {
1159 debug!(agent_id = %id, "Initializing agent connection");
1160 if let Err(e) = agent.initialize().await {
1161 error!(
1162 agent_id = %id,
1163 error = %e,
1164 "Failed to initialize agent"
1165 );
1166 failed_count += 1;
1167 } else {
1169 trace!(agent_id = %id, "Agent initialized successfully");
1170 initialized_count += 1;
1171 }
1172 }
1173
1174 info!(
1175 initialized = initialized_count,
1176 failed = failed_count,
1177 total = agents.len(),
1178 "Agent initialization complete"
1179 );
1180
1181 Ok(())
1182 }
1183
1184 pub async fn shutdown(&self) {
1186 let agents = self.agents.read().await;
1187
1188 info!(agent_count = agents.len(), "Shutting down agent manager");
1189
1190 for (id, agent) in agents.iter() {
1191 debug!(agent_id = %id, "Shutting down agent");
1192 agent.shutdown().await;
1193 trace!(agent_id = %id, "Agent shutdown complete");
1194 }
1195
1196 info!("Agent manager shutdown complete");
1197 }
1198
1199 pub fn metrics(&self) -> &AgentMetrics {
1201 &self.metrics
1202 }
1203
1204 pub fn get_agents_for_event(&self, event_type: EventType) -> Vec<String> {
1209 if let Ok(agents) = self.agents.try_read() {
1212 agents
1213 .values()
1214 .filter(|agent| agent.handles_event(event_type))
1215 .map(|agent| agent.id().to_string())
1216 .collect()
1217 } else {
1218 Vec::new()
1219 }
1220 }
1221
1222 pub async fn get_v2_pool_metrics(&self) -> Vec<(String, Arc<MetricsCollector>)> {
1228 let agents = self.agents.read().await;
1229 agents
1230 .iter()
1231 .map(|(id, agent)| (id.clone(), agent.pool_metrics_collector_arc()))
1232 .collect()
1233 }
1234
1235 pub async fn export_v2_pool_metrics(&self) -> String {
1239 let agents = self.agents.read().await;
1240 let mut output = String::new();
1241
1242 for (id, agent) in agents.iter() {
1243 let pool_metrics = agent.export_prometheus();
1244 if !pool_metrics.is_empty() {
1245 output.push_str(&format!("\n# Agent pool metrics: {}\n", id));
1246 output.push_str(&pool_metrics);
1247 }
1248 }
1249
1250 output
1251 }
1252
1253 pub async fn get_v2_metrics_collector(&self, agent_id: &str) -> Option<Arc<MetricsCollector>> {
1257 let agents = self.agents.read().await;
1258 agents
1259 .get(agent_id)
1260 .map(|agent| agent.pool_metrics_collector_arc())
1261 }
1262}