Skip to main content

grapsus_proxy/agents/
manager.rs

1//! Agent manager for coordinating external processing agents.
2
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use base64::{engine::general_purpose::STANDARD, Engine as _};
8use futures::future::join_all;
9use pingora_timeout::timeout;
10use tokio::sync::{RwLock, Semaphore};
11use tracing::{debug, error, info, trace, warn};
12use grapsus_agent_protocol::{
13    v2::MetricsCollector, AgentResponse, EventType, GuardrailInspectEvent, RequestBodyChunkEvent,
14    RequestHeadersEvent, ResponseBodyChunkEvent, ResponseHeadersEvent, WebSocketFrameEvent,
15};
16use grapsus_common::{
17    errors::{GrapsusError, GrapsusResult},
18    types::CircuitBreakerConfig,
19    CircuitBreaker,
20};
21use grapsus_config::{AgentConfig, FailureMode};
22
23use super::agent_v2::AgentV2;
24use super::context::AgentCallContext;
25use super::decision::AgentDecision;
26use super::metrics::AgentMetrics;
27
/// Agent manager handling all external agents.
///
/// All agents use the v2 protocol with bidirectional streaming, capabilities,
/// health reporting, metrics export, and flow control.
///
/// The per-agent maps are wrapped in `Arc<RwLock<..>>`; the methods on this
/// type take the read side when looking agents up.
pub struct AgentManager {
    /// Configured agents, keyed by agent id (`AgentConfig::id`).
    agents: Arc<RwLock<HashMap<String, Arc<AgentV2>>>>,
    /// Circuit breakers per agent
    circuit_breakers: Arc<RwLock<HashMap<String, Arc<CircuitBreaker>>>>,
    /// Global agent metrics
    metrics: Arc<AgentMetrics>,
    /// Per-agent semaphores for queue isolation (prevents noisy neighbor problem).
    ///
    /// Keyed by agent id; each semaphore is created with the agent's
    /// `max_concurrent_calls` permits.
    agent_semaphores: Arc<RwLock<HashMap<String, Arc<Semaphore>>>>,
}
42
43impl AgentManager {
44    /// Create new agent manager.
45    ///
46    /// Each agent gets its own semaphore for queue isolation, preventing a slow
47    /// agent from affecting other agents (noisy neighbor problem). The concurrency
48    /// limit is configured per-agent via `max_concurrent_calls` in the agent config.
49    pub async fn new(agents: Vec<AgentConfig>) -> GrapsusResult<Self> {
50        info!(agent_count = agents.len(), "Creating agent manager");
51
52        let mut agent_map = HashMap::new();
53        let breakers = HashMap::new();
54        let mut semaphores = HashMap::new();
55
56        for config in agents {
57            debug!(
58                agent_id = %config.id,
59                transport = ?config.transport,
60                timeout_ms = config.timeout_ms,
61                failure_mode = ?config.failure_mode,
62                max_concurrent_calls = config.max_concurrent_calls,
63                "Configuring agent"
64            );
65
66            // Create per-agent semaphore for queue isolation
67            let semaphore = Arc::new(Semaphore::new(config.max_concurrent_calls));
68
69            let circuit_breaker = Arc::new(CircuitBreaker::new(
70                config
71                    .circuit_breaker
72                    .clone()
73                    .unwrap_or_else(CircuitBreakerConfig::default),
74            ));
75
76            trace!(
77                agent_id = %config.id,
78                max_concurrent_calls = config.max_concurrent_calls,
79                pool_config = ?config.pool,
80                "Creating agent instance with internal pool"
81            );
82
83            let agent = Arc::new(AgentV2::new(config.clone(), circuit_breaker));
84
85            agent_map.insert(config.id.clone(), agent);
86            semaphores.insert(config.id.clone(), semaphore);
87
88            debug!(
89                agent_id = %config.id,
90                "Agent configured successfully"
91            );
92        }
93
94        info!(
95            configured_agents = agent_map.len(),
96            "Agent manager created successfully with per-agent queue isolation"
97        );
98
99        Ok(Self {
100            agents: Arc::new(RwLock::new(agent_map)),
101            circuit_breakers: Arc::new(RwLock::new(breakers)),
102            metrics: Arc::new(AgentMetrics::default()),
103            agent_semaphores: Arc::new(RwLock::new(semaphores)),
104        })
105    }
106
107    /// Check if any of the given route agents handle a specific event type.
108    pub async fn any_agent_handles_event(
109        &self,
110        route_agents: &[String],
111        event_type: EventType,
112    ) -> bool {
113        let agents = self.agents.read().await;
114        route_agents
115            .iter()
116            .filter_map(|id| agents.get(id))
117            .any(|agent| agent.handles_event(event_type))
118    }
119
120    /// Process request headers through agents.
121    ///
122    /// # Arguments
123    /// * `ctx` - Agent call context with correlation ID and metadata
124    /// * `headers` - Request headers to send to agents
125    /// * `route_agents` - List of (agent_id, failure_mode) tuples from filter chain
126    pub async fn process_request_headers(
127        &self,
128        ctx: &AgentCallContext,
129        mut headers: HashMap<String, Vec<String>>,
130        route_agents: &[(String, FailureMode)],
131    ) -> GrapsusResult<AgentDecision> {
132        let method = headers
133            .remove(":method")
134            .and_then(|mut v| {
135                if v.is_empty() {
136                    None
137                } else {
138                    Some(v.swap_remove(0))
139                }
140            })
141            .unwrap_or_else(|| "GET".to_string());
142        let uri = headers
143            .remove(":path")
144            .and_then(|mut v| {
145                if v.is_empty() {
146                    None
147                } else {
148                    Some(v.swap_remove(0))
149                }
150            })
151            .unwrap_or_else(|| "/".to_string());
152        let event = RequestHeadersEvent {
153            metadata: ctx.metadata.clone(),
154            method,
155            uri,
156            headers,
157        };
158
159        // Use parallel processing for better latency with multiple agents
160        self.process_event_parallel(EventType::RequestHeaders, &event, route_agents, ctx)
161            .await
162    }
163
164    /// Process request body chunk through agents.
165    pub async fn process_request_body(
166        &self,
167        ctx: &AgentCallContext,
168        data: &[u8],
169        is_last: bool,
170        route_agents: &[String],
171    ) -> GrapsusResult<AgentDecision> {
172        // Check body size limits
173        let max_size = 1024 * 1024; // 1MB default
174        if data.len() > max_size {
175            warn!(
176                correlation_id = %ctx.correlation_id,
177                size = data.len(),
178                "Request body exceeds agent inspection limit"
179            );
180            return Ok(AgentDecision::default_allow());
181        }
182
183        let event = RequestBodyChunkEvent {
184            correlation_id: ctx.correlation_id.to_string(),
185            data: STANDARD.encode(data),
186            is_last,
187            total_size: ctx.request_body.as_ref().map(|b| b.len()),
188            chunk_index: 0, // Buffer mode sends entire body as single chunk
189            bytes_received: data.len(),
190        };
191
192        self.process_event(EventType::RequestBodyChunk, &event, route_agents, ctx)
193            .await
194    }
195
196    /// Process a single request body chunk through agents (streaming mode).
197    ///
198    /// Unlike `process_request_body` which is used for buffered mode, this method
199    /// is designed for streaming where chunks are sent individually as they arrive.
200    pub async fn process_request_body_streaming(
201        &self,
202        ctx: &AgentCallContext,
203        data: &[u8],
204        is_last: bool,
205        chunk_index: u32,
206        bytes_received: usize,
207        total_size: Option<usize>,
208        route_agents: &[String],
209    ) -> GrapsusResult<AgentDecision> {
210        trace!(
211            correlation_id = %ctx.correlation_id,
212            chunk_index = chunk_index,
213            chunk_size = data.len(),
214            bytes_received = bytes_received,
215            is_last = is_last,
216            "Processing streaming request body chunk"
217        );
218
219        let event = RequestBodyChunkEvent {
220            correlation_id: ctx.correlation_id.to_string(),
221            data: STANDARD.encode(data),
222            is_last,
223            total_size,
224            chunk_index,
225            bytes_received,
226        };
227
228        self.process_event(EventType::RequestBodyChunk, &event, route_agents, ctx)
229            .await
230    }
231
232    /// Process a single response body chunk through agents (streaming mode).
233    pub async fn process_response_body_streaming(
234        &self,
235        ctx: &AgentCallContext,
236        data: &[u8],
237        is_last: bool,
238        chunk_index: u32,
239        bytes_sent: usize,
240        total_size: Option<usize>,
241        route_agents: &[String],
242    ) -> GrapsusResult<AgentDecision> {
243        trace!(
244            correlation_id = %ctx.correlation_id,
245            chunk_index = chunk_index,
246            chunk_size = data.len(),
247            bytes_sent = bytes_sent,
248            is_last = is_last,
249            "Processing streaming response body chunk"
250        );
251
252        let event = ResponseBodyChunkEvent {
253            correlation_id: ctx.correlation_id.to_string(),
254            data: STANDARD.encode(data),
255            is_last,
256            total_size,
257            chunk_index,
258            bytes_sent,
259        };
260
261        self.process_event(EventType::ResponseBodyChunk, &event, route_agents, ctx)
262            .await
263    }
264
265    /// Process response headers through agents.
266    pub async fn process_response_headers(
267        &self,
268        ctx: &AgentCallContext,
269        status: u16,
270        headers: &HashMap<String, Vec<String>>,
271        route_agents: &[String],
272    ) -> GrapsusResult<AgentDecision> {
273        let event = ResponseHeadersEvent {
274            correlation_id: ctx.correlation_id.to_string(),
275            status,
276            headers: headers.clone(),
277        };
278
279        self.process_event(EventType::ResponseHeaders, &event, route_agents, ctx)
280            .await
281    }
282
283    /// Process a WebSocket frame through agents.
284    ///
285    /// This is used for WebSocket frame inspection after an upgrade.
286    /// Returns the agent response directly to allow the caller to access
287    /// the websocket_decision field.
288    pub async fn process_websocket_frame(
289        &self,
290        route_id: &str,
291        event: WebSocketFrameEvent,
292    ) -> GrapsusResult<AgentResponse> {
293        trace!(
294            correlation_id = %event.correlation_id,
295            route_id = %route_id,
296            frame_index = event.frame_index,
297            opcode = %event.opcode,
298            "Processing WebSocket frame through agents"
299        );
300
301        // Get relevant agents for this route that handle WebSocket frames
302        let agents = self.agents.read().await;
303        let relevant_agents: Vec<_> = agents
304            .values()
305            .filter(|agent| agent.handles_event(EventType::WebSocketFrame))
306            .collect();
307
308        if relevant_agents.is_empty() {
309            trace!(
310                correlation_id = %event.correlation_id,
311                "No agents handle WebSocket frames, allowing"
312            );
313            return Ok(AgentResponse::websocket_allow());
314        }
315
316        debug!(
317            correlation_id = %event.correlation_id,
318            route_id = %route_id,
319            agent_count = relevant_agents.len(),
320            "Processing WebSocket frame through agents"
321        );
322
323        // Process through each agent sequentially
324        for agent in relevant_agents {
325            // Check circuit breaker
326            if !agent.circuit_breaker().is_closed() {
327                warn!(
328                    agent_id = %agent.id(),
329                    correlation_id = %event.correlation_id,
330                    failure_mode = ?agent.failure_mode(),
331                    "Circuit breaker open, skipping agent for WebSocket frame"
332                );
333
334                if agent.failure_mode() == FailureMode::Closed {
335                    debug!(
336                        correlation_id = %event.correlation_id,
337                        agent_id = %agent.id(),
338                        "Closing WebSocket due to circuit breaker (fail-closed mode)"
339                    );
340                    return Ok(AgentResponse::websocket_close(
341                        1011,
342                        "Service unavailable".to_string(),
343                    ));
344                }
345                continue;
346            }
347
348            // Call agent with timeout
349            let start = Instant::now();
350            let timeout_duration = Duration::from_millis(agent.timeout_ms());
351
352            match timeout(
353                timeout_duration,
354                agent.call_event(EventType::WebSocketFrame, &event),
355            )
356            .await
357            {
358                Ok(Ok(response)) => {
359                    let duration = start.elapsed();
360                    agent.record_success(duration);
361
362                    trace!(
363                        correlation_id = %event.correlation_id,
364                        agent_id = %agent.id(),
365                        duration_ms = duration.as_millis(),
366                        "WebSocket frame agent call succeeded"
367                    );
368
369                    // If agent returned a WebSocket decision that's not Allow, return immediately
370                    if let Some(ref ws_decision) = response.websocket_decision {
371                        if !matches!(
372                            ws_decision,
373                            grapsus_agent_protocol::WebSocketDecision::Allow
374                        ) {
375                            debug!(
376                                correlation_id = %event.correlation_id,
377                                agent_id = %agent.id(),
378                                decision = ?ws_decision,
379                                "Agent returned non-allow WebSocket decision"
380                            );
381                            return Ok(response);
382                        }
383                    }
384                }
385                Ok(Err(e)) => {
386                    agent.record_failure();
387                    error!(
388                        agent_id = %agent.id(),
389                        correlation_id = %event.correlation_id,
390                        error = %e,
391                        duration_ms = start.elapsed().as_millis(),
392                        failure_mode = ?agent.failure_mode(),
393                        "WebSocket frame agent call failed"
394                    );
395
396                    if agent.failure_mode() == FailureMode::Closed {
397                        return Ok(AgentResponse::websocket_close(
398                            1011,
399                            "Agent error".to_string(),
400                        ));
401                    }
402                }
403                Err(_) => {
404                    agent.record_timeout();
405                    warn!(
406                        agent_id = %agent.id(),
407                        correlation_id = %event.correlation_id,
408                        timeout_ms = agent.timeout_ms(),
409                        failure_mode = ?agent.failure_mode(),
410                        "WebSocket frame agent call timed out"
411                    );
412
413                    if agent.failure_mode() == FailureMode::Closed {
414                        return Ok(AgentResponse::websocket_close(
415                            1011,
416                            "Gateway timeout".to_string(),
417                        ));
418                    }
419                }
420            }
421        }
422
423        // All agents allowed the frame
424        Ok(AgentResponse::websocket_allow())
425    }
426
427    /// Process an event through relevant agents.
428    async fn process_event<T: serde::Serialize>(
429        &self,
430        event_type: EventType,
431        event: &T,
432        route_agents: &[String],
433        ctx: &AgentCallContext,
434    ) -> GrapsusResult<AgentDecision> {
435        trace!(
436            correlation_id = %ctx.correlation_id,
437            event_type = ?event_type,
438            route_agents = ?route_agents,
439            "Starting agent event processing"
440        );
441
442        // Get relevant agents for this route and event type
443        let agents = self.agents.read().await;
444        let relevant_agents: Vec<_> = route_agents
445            .iter()
446            .filter_map(|id| agents.get(id))
447            .filter(|agent| agent.handles_event(event_type))
448            .collect();
449
450        if relevant_agents.is_empty() {
451            trace!(
452                correlation_id = %ctx.correlation_id,
453                event_type = ?event_type,
454                "No relevant agents for event, allowing request"
455            );
456            return Ok(AgentDecision::default_allow());
457        }
458
459        debug!(
460            correlation_id = %ctx.correlation_id,
461            event_type = ?event_type,
462            agent_count = relevant_agents.len(),
463            agent_ids = ?relevant_agents.iter().map(|a| a.id()).collect::<Vec<_>>(),
464            "Processing event through agents"
465        );
466
467        // Process through each agent sequentially
468        let mut combined_decision = AgentDecision::default_allow();
469
470        for (agent_index, agent) in relevant_agents.iter().enumerate() {
471            trace!(
472                correlation_id = %ctx.correlation_id,
473                agent_id = %agent.id(),
474                agent_index = agent_index,
475                event_type = ?event_type,
476                "Processing event through agent"
477            );
478
479            // Acquire per-agent semaphore permit (queue isolation)
480            let semaphores = self.agent_semaphores.read().await;
481            let agent_semaphore = semaphores.get(agent.id()).cloned();
482            drop(semaphores); // Release lock before awaiting
483
484            let _permit = match agent_semaphore {
485                Some(semaphore) => {
486                    trace!(
487                        correlation_id = %ctx.correlation_id,
488                        agent_id = %agent.id(),
489                        "Acquiring per-agent semaphore permit"
490                    );
491                    Some(semaphore.acquire_owned().await.map_err(|_| {
492                        error!(
493                            correlation_id = %ctx.correlation_id,
494                            agent_id = %agent.id(),
495                            "Failed to acquire agent call semaphore permit"
496                        );
497                        GrapsusError::Internal {
498                            message: "Failed to acquire agent call permit".to_string(),
499                            correlation_id: Some(ctx.correlation_id.to_string()),
500                            source: None,
501                        }
502                    })?)
503                }
504                None => {
505                    // No semaphore found (shouldn't happen, but fail gracefully)
506                    warn!(
507                        correlation_id = %ctx.correlation_id,
508                        agent_id = %agent.id(),
509                        "No semaphore found for agent, proceeding without queue isolation"
510                    );
511                    None
512                }
513            };
514
515            // Check circuit breaker
516            if !agent.circuit_breaker().is_closed() {
517                warn!(
518                    agent_id = %agent.id(),
519                    correlation_id = %ctx.correlation_id,
520                    failure_mode = ?agent.failure_mode(),
521                    "Circuit breaker open, skipping agent"
522                );
523
524                // Handle based on failure mode
525                if agent.failure_mode() == FailureMode::Closed {
526                    debug!(
527                        correlation_id = %ctx.correlation_id,
528                        agent_id = %agent.id(),
529                        "Blocking request due to circuit breaker (fail-closed mode)"
530                    );
531                    return Ok(AgentDecision::block(503, "Service unavailable"));
532                }
533                continue;
534            }
535
536            // Call agent with timeout (using pingora-timeout for efficiency)
537            let start = Instant::now();
538            let timeout_duration = Duration::from_millis(agent.timeout_ms());
539
540            trace!(
541                correlation_id = %ctx.correlation_id,
542                agent_id = %agent.id(),
543                timeout_ms = agent.timeout_ms(),
544                "Calling agent"
545            );
546
547            match timeout(timeout_duration, agent.call_event(event_type, event)).await {
548                Ok(Ok(response)) => {
549                    let duration = start.elapsed();
550                    agent.record_success(duration);
551
552                    trace!(
553                        correlation_id = %ctx.correlation_id,
554                        agent_id = %agent.id(),
555                        duration_ms = duration.as_millis(),
556                        decision = ?response,
557                        "Agent call succeeded"
558                    );
559
560                    // Merge response into combined decision
561                    combined_decision.merge(response.into());
562
563                    // If decision is to block/redirect/challenge, stop processing
564                    if !combined_decision.is_allow() {
565                        debug!(
566                            correlation_id = %ctx.correlation_id,
567                            agent_id = %agent.id(),
568                            decision = ?combined_decision,
569                            "Agent returned blocking decision, stopping agent chain"
570                        );
571                        break;
572                    }
573                }
574                Ok(Err(e)) => {
575                    agent.record_failure();
576                    error!(
577                        agent_id = %agent.id(),
578                        correlation_id = %ctx.correlation_id,
579                        error = %e,
580                        duration_ms = start.elapsed().as_millis(),
581                        failure_mode = ?agent.failure_mode(),
582                        "Agent call failed"
583                    );
584
585                    if agent.failure_mode() == FailureMode::Closed {
586                        return Err(e);
587                    }
588                }
589                Err(_) => {
590                    agent.record_timeout();
591                    warn!(
592                        agent_id = %agent.id(),
593                        correlation_id = %ctx.correlation_id,
594                        timeout_ms = agent.timeout_ms(),
595                        failure_mode = ?agent.failure_mode(),
596                        "Agent call timed out"
597                    );
598
599                    if agent.failure_mode() == FailureMode::Closed {
600                        debug!(
601                            correlation_id = %ctx.correlation_id,
602                            agent_id = %agent.id(),
603                            "Blocking request due to timeout (fail-closed mode)"
604                        );
605                        return Ok(AgentDecision::block(504, "Gateway timeout"));
606                    }
607                }
608            }
609        }
610
611        trace!(
612            correlation_id = %ctx.correlation_id,
613            decision = ?combined_decision,
614            agents_processed = relevant_agents.len(),
615            "Agent event processing completed"
616        );
617
618        Ok(combined_decision)
619    }
620
621    /// Process an event through relevant agents with per-filter failure modes.
622    ///
623    /// This is the preferred method for processing events as it respects the
624    /// failure mode configured on each filter, not just the agent's default.
625    async fn process_event_with_failure_modes<T: serde::Serialize>(
626        &self,
627        event_type: EventType,
628        event: &T,
629        route_agents: &[(String, FailureMode)],
630        ctx: &AgentCallContext,
631    ) -> GrapsusResult<AgentDecision> {
632        trace!(
633            correlation_id = %ctx.correlation_id,
634            event_type = ?event_type,
635            route_agents = ?route_agents.iter().map(|(id, _)| id).collect::<Vec<_>>(),
636            "Starting agent event processing with failure modes"
637        );
638
639        // Get relevant agents for this route and event type, preserving failure modes
640        let agents = self.agents.read().await;
641        let relevant_agents: Vec<_> = route_agents
642            .iter()
643            .filter_map(|(id, failure_mode)| agents.get(id).map(|agent| (agent, *failure_mode)))
644            .filter(|(agent, _)| agent.handles_event(event_type))
645            .collect();
646
647        if relevant_agents.is_empty() {
648            trace!(
649                correlation_id = %ctx.correlation_id,
650                event_type = ?event_type,
651                "No relevant agents for event, allowing request"
652            );
653            return Ok(AgentDecision::default_allow());
654        }
655
656        debug!(
657            correlation_id = %ctx.correlation_id,
658            event_type = ?event_type,
659            agent_count = relevant_agents.len(),
660            agent_ids = ?relevant_agents.iter().map(|(a, _)| a.id()).collect::<Vec<_>>(),
661            "Processing event through agents"
662        );
663
664        // Process through each agent sequentially
665        let mut combined_decision = AgentDecision::default_allow();
666
667        for (agent_index, (agent, filter_failure_mode)) in relevant_agents.iter().enumerate() {
668            trace!(
669                correlation_id = %ctx.correlation_id,
670                agent_id = %agent.id(),
671                agent_index = agent_index,
672                event_type = ?event_type,
673                filter_failure_mode = ?filter_failure_mode,
674                "Processing event through agent with filter failure mode"
675            );
676
677            // Acquire per-agent semaphore permit (queue isolation)
678            let semaphores = self.agent_semaphores.read().await;
679            let agent_semaphore = semaphores.get(agent.id()).cloned();
680            drop(semaphores); // Release lock before awaiting
681
682            let _permit = if let Some(semaphore) = agent_semaphore {
683                trace!(
684                    correlation_id = %ctx.correlation_id,
685                    agent_id = %agent.id(),
686                    "Acquiring per-agent semaphore permit"
687                );
688                Some(semaphore.acquire_owned().await.map_err(|_| {
689                    error!(
690                        correlation_id = %ctx.correlation_id,
691                        agent_id = %agent.id(),
692                        "Failed to acquire agent call semaphore permit"
693                    );
694                    GrapsusError::Internal {
695                        message: "Failed to acquire agent call permit".to_string(),
696                        correlation_id: Some(ctx.correlation_id.to_string()),
697                        source: None,
698                    }
699                })?)
700            } else {
701                // No semaphore found (shouldn't happen, but fail gracefully)
702                warn!(
703                    correlation_id = %ctx.correlation_id,
704                    agent_id = %agent.id(),
705                    "No semaphore found for agent, proceeding without queue isolation"
706                );
707                None
708            };
709
710            // Check circuit breaker
711            if !agent.circuit_breaker().is_closed() {
712                warn!(
713                    agent_id = %agent.id(),
714                    correlation_id = %ctx.correlation_id,
715                    filter_failure_mode = ?filter_failure_mode,
716                    "Circuit breaker open, skipping agent"
717                );
718
719                // Handle based on filter's failure mode (not agent's default)
720                if *filter_failure_mode == FailureMode::Closed {
721                    debug!(
722                        correlation_id = %ctx.correlation_id,
723                        agent_id = %agent.id(),
724                        "Blocking request due to circuit breaker (filter fail-closed mode)"
725                    );
726                    return Ok(AgentDecision::block(503, "Service unavailable"));
727                }
728                // Fail-open: continue to next agent
729                continue;
730            }
731
732            // Call agent with timeout
733            let start = Instant::now();
734            let timeout_duration = Duration::from_millis(agent.timeout_ms());
735
736            trace!(
737                correlation_id = %ctx.correlation_id,
738                agent_id = %agent.id(),
739                timeout_ms = agent.timeout_ms(),
740                "Calling agent"
741            );
742
743            match timeout(timeout_duration, agent.call_event(event_type, event)).await {
744                Ok(Ok(response)) => {
745                    let duration = start.elapsed();
746                    agent.record_success(duration);
747
748                    trace!(
749                        correlation_id = %ctx.correlation_id,
750                        agent_id = %agent.id(),
751                        duration_ms = duration.as_millis(),
752                        decision = ?response,
753                        "Agent call succeeded"
754                    );
755
756                    // Merge response into combined decision
757                    combined_decision.merge(response.into());
758
759                    // If decision is to block/redirect/challenge, stop processing
760                    if !combined_decision.is_allow() {
761                        debug!(
762                            correlation_id = %ctx.correlation_id,
763                            agent_id = %agent.id(),
764                            decision = ?combined_decision,
765                            "Agent returned blocking decision, stopping agent chain"
766                        );
767                        break;
768                    }
769                }
770                Ok(Err(e)) => {
771                    agent.record_failure();
772                    error!(
773                        agent_id = %agent.id(),
774                        correlation_id = %ctx.correlation_id,
775                        error = %e,
776                        duration_ms = start.elapsed().as_millis(),
777                        filter_failure_mode = ?filter_failure_mode,
778                        "Agent call failed"
779                    );
780
781                    // Use filter's failure mode, not agent's default
782                    if *filter_failure_mode == FailureMode::Closed {
783                        debug!(
784                            correlation_id = %ctx.correlation_id,
785                            agent_id = %agent.id(),
786                            "Blocking request due to agent failure (filter fail-closed mode)"
787                        );
788                        return Ok(AgentDecision::block(503, "Agent unavailable"));
789                    }
790                    // Fail-open: continue to next agent (or proceed without this agent)
791                    debug!(
792                        correlation_id = %ctx.correlation_id,
793                        agent_id = %agent.id(),
794                        "Continuing despite agent failure (filter fail-open mode)"
795                    );
796                }
797                Err(_) => {
798                    agent.record_timeout();
799                    warn!(
800                        agent_id = %agent.id(),
801                        correlation_id = %ctx.correlation_id,
802                        timeout_ms = agent.timeout_ms(),
803                        filter_failure_mode = ?filter_failure_mode,
804                        "Agent call timed out"
805                    );
806
807                    // Use filter's failure mode, not agent's default
808                    if *filter_failure_mode == FailureMode::Closed {
809                        debug!(
810                            correlation_id = %ctx.correlation_id,
811                            agent_id = %agent.id(),
812                            "Blocking request due to timeout (filter fail-closed mode)"
813                        );
814                        return Ok(AgentDecision::block(504, "Gateway timeout"));
815                    }
816                    // Fail-open: continue to next agent
817                    debug!(
818                        correlation_id = %ctx.correlation_id,
819                        agent_id = %agent.id(),
820                        "Continuing despite timeout (filter fail-open mode)"
821                    );
822                }
823            }
824        }
825
826        trace!(
827            correlation_id = %ctx.correlation_id,
828            decision = ?combined_decision,
829            agents_processed = relevant_agents.len(),
830            "Agent event processing with failure modes completed"
831        );
832
833        Ok(combined_decision)
834    }
835
836    /// Process an event through relevant agents in parallel.
837    ///
838    /// This method executes all agent calls concurrently using `join_all`, which
839    /// significantly improves latency when multiple agents are configured. The
840    /// tradeoff is that if one agent blocks, other agents may still complete
841    /// their work (slight resource waste in blocking scenarios).
842    ///
843    /// # Performance
844    ///
845    /// For N agents with latency L each:
846    /// - Sequential: O(N * L)
847    /// - Parallel: O(L) (assuming sufficient concurrency)
848    ///
849    /// This is the preferred method for most use cases.
850    async fn process_event_parallel<T: serde::Serialize + Sync>(
851        &self,
852        event_type: EventType,
853        event: &T,
854        route_agents: &[(String, FailureMode)],
855        ctx: &AgentCallContext,
856    ) -> GrapsusResult<AgentDecision> {
857        trace!(
858            correlation_id = %ctx.correlation_id,
859            event_type = ?event_type,
860            route_agents = ?route_agents.iter().map(|(id, _)| id).collect::<Vec<_>>(),
861            "Starting parallel agent event processing"
862        );
863
864        // Get relevant agents for this route and event type
865        let agents = self.agents.read().await;
866        let semaphores = self.agent_semaphores.read().await;
867
868        // Collect agent info upfront to minimize lock duration
869        let agent_info: Vec<_> = route_agents
870            .iter()
871            .filter_map(|(id, failure_mode)| {
872                let agent = agents.get(id)?;
873                if !agent.handles_event(event_type) {
874                    return None;
875                }
876                let semaphore = semaphores.get(id).cloned();
877                Some((Arc::clone(agent), *failure_mode, semaphore))
878            })
879            .collect();
880
881        // Release locks early
882        drop(agents);
883        drop(semaphores);
884
885        if agent_info.is_empty() {
886            trace!(
887                correlation_id = %ctx.correlation_id,
888                event_type = ?event_type,
889                "No relevant agents for event, allowing request"
890            );
891            return Ok(AgentDecision::default_allow());
892        }
893
894        debug!(
895            correlation_id = %ctx.correlation_id,
896            event_type = ?event_type,
897            agent_count = agent_info.len(),
898            agent_ids = ?agent_info.iter().map(|(a, _, _)| a.id()).collect::<Vec<_>>(),
899            "Processing event through agents in parallel"
900        );
901
902        // Spawn all agent calls concurrently
903        let futures: Vec<_> = agent_info
904            .iter()
905            .map(|(agent, filter_failure_mode, semaphore)| {
906                let agent = Arc::clone(agent);
907                let filter_failure_mode = *filter_failure_mode;
908                let semaphore = semaphore.clone();
909                let correlation_id = ctx.correlation_id.clone();
910
911                async move {
912                    // Acquire per-agent semaphore permit (queue isolation)
913                    let _permit = if let Some(sem) = semaphore {
914                        match sem.acquire_owned().await {
915                            Ok(permit) => Some(permit),
916                            Err(_) => {
917                                error!(
918                                    correlation_id = %correlation_id,
919                                    agent_id = %agent.id(),
920                                    "Failed to acquire agent semaphore permit"
921                                );
922                                return Err((
923                                    agent.id().to_string(),
924                                    filter_failure_mode,
925                                    "Failed to acquire permit".to_string(),
926                                ));
927                            }
928                        }
929                    } else {
930                        None
931                    };
932
933                    // Check circuit breaker
934                    if !agent.circuit_breaker().is_closed() {
935                        warn!(
936                            agent_id = %agent.id(),
937                            correlation_id = %correlation_id,
938                            filter_failure_mode = ?filter_failure_mode,
939                            "Circuit breaker open, skipping agent"
940                        );
941                        return Err((
942                            agent.id().to_string(),
943                            filter_failure_mode,
944                            "Circuit breaker open".to_string(),
945                        ));
946                    }
947
948                    // Call agent with timeout
949                    let start = Instant::now();
950                    let timeout_duration = Duration::from_millis(agent.timeout_ms());
951
952                    match timeout(timeout_duration, agent.call_event(event_type, event)).await {
953                        Ok(Ok(response)) => {
954                            let duration = start.elapsed();
955                            agent.record_success(duration);
956                            trace!(
957                                correlation_id = %correlation_id,
958                                agent_id = %agent.id(),
959                                duration_ms = duration.as_millis(),
960                                "Parallel agent call succeeded"
961                            );
962                            Ok((agent.id().to_string(), response))
963                        }
964                        Ok(Err(e)) => {
965                            agent.record_failure();
966                            error!(
967                                agent_id = %agent.id(),
968                                correlation_id = %correlation_id,
969                                error = %e,
970                                duration_ms = start.elapsed().as_millis(),
971                                filter_failure_mode = ?filter_failure_mode,
972                                "Parallel agent call failed"
973                            );
974                            Err((
975                                agent.id().to_string(),
976                                filter_failure_mode,
977                                format!("Agent error: {}", e),
978                            ))
979                        }
980                        Err(_) => {
981                            agent.record_timeout();
982                            warn!(
983                                agent_id = %agent.id(),
984                                correlation_id = %correlation_id,
985                                timeout_ms = agent.timeout_ms(),
986                                filter_failure_mode = ?filter_failure_mode,
987                                "Parallel agent call timed out"
988                            );
989                            Err((
990                                agent.id().to_string(),
991                                filter_failure_mode,
992                                "Timeout".to_string(),
993                            ))
994                        }
995                    }
996                }
997            })
998            .collect();
999
1000        // Execute all agent calls in parallel
1001        let results = join_all(futures).await;
1002
1003        // Process results and merge decisions
1004        let mut combined_decision = AgentDecision::default_allow();
1005        let mut blocking_error: Option<AgentDecision> = None;
1006
1007        for result in results {
1008            match result {
1009                Ok((agent_id, response)) => {
1010                    let decision: AgentDecision = response.into();
1011
1012                    // Check for blocking decision
1013                    if !decision.is_allow() {
1014                        debug!(
1015                            correlation_id = %ctx.correlation_id,
1016                            agent_id = %agent_id,
1017                            decision = ?decision,
1018                            "Agent returned blocking decision"
1019                        );
1020                        // Return first blocking decision immediately
1021                        return Ok(decision);
1022                    }
1023
1024                    combined_decision.merge(decision);
1025                }
1026                Err((agent_id, failure_mode, reason)) => {
1027                    // Handle failure based on filter's failure mode
1028                    if failure_mode == FailureMode::Closed && blocking_error.is_none() {
1029                        debug!(
1030                            correlation_id = %ctx.correlation_id,
1031                            agent_id = %agent_id,
1032                            reason = %reason,
1033                            "Agent failure in fail-closed mode"
1034                        );
1035                        // Store blocking error but continue processing other results
1036                        // in case another agent returned a more specific block
1037                        let status = if reason.contains("Timeout") { 504 } else { 503 };
1038                        let message = if reason.contains("Timeout") {
1039                            "Gateway timeout"
1040                        } else {
1041                            "Service unavailable"
1042                        };
1043                        blocking_error = Some(AgentDecision::block(status, message));
1044                    } else {
1045                        // Fail-open: log and continue
1046                        debug!(
1047                            correlation_id = %ctx.correlation_id,
1048                            agent_id = %agent_id,
1049                            reason = %reason,
1050                            "Agent failure in fail-open mode, continuing"
1051                        );
1052                    }
1053                }
1054            }
1055        }
1056
1057        // If we have a fail-closed error and no explicit block, return the error
1058        if let Some(error_decision) = blocking_error {
1059            return Ok(error_decision);
1060        }
1061
1062        trace!(
1063            correlation_id = %ctx.correlation_id,
1064            decision = ?combined_decision,
1065            agents_processed = agent_info.len(),
1066            "Parallel agent event processing completed"
1067        );
1068
1069        Ok(combined_decision)
1070    }
1071
1072    /// Call a named agent with a guardrail inspect event.
1073    ///
1074    /// Looks up the agent by name, checks circuit breaker and timeout,
1075    /// then sends the event and returns the response.
1076    pub async fn call_guardrail_agent(
1077        &self,
1078        agent_name: &str,
1079        event: GuardrailInspectEvent,
1080    ) -> GrapsusResult<AgentResponse> {
1081        let agents = self.agents.read().await;
1082        let agent = agents.get(agent_name).ok_or_else(|| GrapsusError::Agent {
1083            agent: agent_name.to_string(),
1084            message: format!("Agent '{}' not found", agent_name),
1085            event: "guardrail_inspect".to_string(),
1086            source: None,
1087        })?;
1088
1089        let agent = Arc::clone(agent);
1090        drop(agents); // Release lock before calling
1091
1092        // Acquire per-agent semaphore permit
1093        let semaphores = self.agent_semaphores.read().await;
1094        let semaphore = semaphores.get(agent_name).cloned();
1095        drop(semaphores);
1096
1097        let _permit = if let Some(sem) = semaphore {
1098            Some(
1099                sem.acquire_owned()
1100                    .await
1101                    .map_err(|_| GrapsusError::Agent {
1102                        agent: agent_name.to_string(),
1103                        message: "Failed to acquire agent call permit".to_string(),
1104                        event: "guardrail_inspect".to_string(),
1105                        source: None,
1106                    })?,
1107            )
1108        } else {
1109            None
1110        };
1111
1112        // Check circuit breaker
1113        if !agent.circuit_breaker().is_closed() {
1114            return Err(GrapsusError::Agent {
1115                agent: agent_name.to_string(),
1116                message: "Circuit breaker open".to_string(),
1117                event: "guardrail_inspect".to_string(),
1118                source: None,
1119            });
1120        }
1121
1122        let start = Instant::now();
1123        let timeout_duration = Duration::from_millis(agent.timeout_ms());
1124
1125        match timeout(timeout_duration, agent.call_guardrail_inspect(&event)).await {
1126            Ok(Ok(response)) => {
1127                agent.record_success(start.elapsed());
1128                Ok(response)
1129            }
1130            Ok(Err(e)) => {
1131                agent.record_failure();
1132                Err(e)
1133            }
1134            Err(_) => {
1135                agent.record_timeout();
1136                Err(GrapsusError::Agent {
1137                    agent: agent_name.to_string(),
1138                    message: format!(
1139                        "Guardrail agent call timed out after {}ms",
1140                        timeout_duration.as_millis()
1141                    ),
1142                    event: "guardrail_inspect".to_string(),
1143                    source: None,
1144                })
1145            }
1146        }
1147    }
1148
1149    /// Initialize agent connections.
1150    pub async fn initialize(&self) -> GrapsusResult<()> {
1151        let agents = self.agents.read().await;
1152
1153        info!(agent_count = agents.len(), "Initializing agent connections");
1154
1155        let mut initialized_count = 0;
1156        let mut failed_count = 0;
1157
1158        for (id, agent) in agents.iter() {
1159            debug!(agent_id = %id, "Initializing agent connection");
1160            if let Err(e) = agent.initialize().await {
1161                error!(
1162                    agent_id = %id,
1163                    error = %e,
1164                    "Failed to initialize agent"
1165                );
1166                failed_count += 1;
1167                // Continue with other agents
1168            } else {
1169                trace!(agent_id = %id, "Agent initialized successfully");
1170                initialized_count += 1;
1171            }
1172        }
1173
1174        info!(
1175            initialized = initialized_count,
1176            failed = failed_count,
1177            total = agents.len(),
1178            "Agent initialization complete"
1179        );
1180
1181        Ok(())
1182    }
1183
1184    /// Shutdown all agents.
1185    pub async fn shutdown(&self) {
1186        let agents = self.agents.read().await;
1187
1188        info!(agent_count = agents.len(), "Shutting down agent manager");
1189
1190        for (id, agent) in agents.iter() {
1191            debug!(agent_id = %id, "Shutting down agent");
1192            agent.shutdown().await;
1193            trace!(agent_id = %id, "Agent shutdown complete");
1194        }
1195
1196        info!("Agent manager shutdown complete");
1197    }
1198
1199    /// Get agent metrics.
1200    pub fn metrics(&self) -> &AgentMetrics {
1201        &self.metrics
1202    }
1203
1204    /// Get agent IDs that handle a specific event type.
1205    ///
1206    /// This is useful for pre-filtering agents before making calls,
1207    /// e.g., to check if any agents handle WebSocket frames.
1208    pub fn get_agents_for_event(&self, event_type: EventType) -> Vec<String> {
1209        // Use try_read to avoid blocking - return empty if lock is held
1210        // This is acceptable since this is only used for informational purposes
1211        if let Ok(agents) = self.agents.try_read() {
1212            agents
1213                .values()
1214                .filter(|agent| agent.handles_event(event_type))
1215                .map(|agent| agent.id().to_string())
1216                .collect()
1217        } else {
1218            Vec::new()
1219        }
1220    }
1221
1222    /// Get pool metrics collectors from all agents.
1223    ///
1224    /// Returns a vector of (agent_id, MetricsCollector) pairs.
1225    /// These can be registered with the MetricsManager to include agent pool
1226    /// metrics in the /metrics endpoint output.
1227    pub async fn get_v2_pool_metrics(&self) -> Vec<(String, Arc<MetricsCollector>)> {
1228        let agents = self.agents.read().await;
1229        agents
1230            .iter()
1231            .map(|(id, agent)| (id.clone(), agent.pool_metrics_collector_arc()))
1232            .collect()
1233    }
1234
1235    /// Export prometheus metrics from all agent pools.
1236    ///
1237    /// Returns the combined prometheus-formatted metrics from all agent pools.
1238    pub async fn export_v2_pool_metrics(&self) -> String {
1239        let agents = self.agents.read().await;
1240        let mut output = String::new();
1241
1242        for (id, agent) in agents.iter() {
1243            let pool_metrics = agent.export_prometheus();
1244            if !pool_metrics.is_empty() {
1245                output.push_str(&format!("\n# Agent pool metrics: {}\n", id));
1246                output.push_str(&pool_metrics);
1247            }
1248        }
1249
1250        output
1251    }
1252
1253    /// Get an agent's metrics collector by ID.
1254    ///
1255    /// Returns None if the agent doesn't exist.
1256    pub async fn get_v2_metrics_collector(&self, agent_id: &str) -> Option<Arc<MetricsCollector>> {
1257        let agents = self.agents.read().await;
1258        agents
1259            .get(agent_id)
1260            .map(|agent| agent.pool_metrics_collector_arc())
1261    }
1262}