Skip to main content

sentinel_proxy/agents/
manager.rs

1//! Agent manager for coordinating external processing agents.
2
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use base64::{engine::general_purpose::STANDARD, Engine as _};
8use futures::future::join_all;
9use pingora_timeout::timeout;
10use sentinel_agent_protocol::{
11    v2::MetricsCollector, AgentResponse, EventType, RequestBodyChunkEvent, RequestHeadersEvent,
12    ResponseBodyChunkEvent, ResponseHeadersEvent, WebSocketFrameEvent,
13};
14use sentinel_common::{
15    errors::{SentinelError, SentinelResult},
16    types::CircuitBreakerConfig,
17    CircuitBreaker,
18};
19use sentinel_config::{AgentConfig, AgentProtocolVersion, FailureMode};
20use tokio::sync::{RwLock, Semaphore};
21use tracing::{debug, error, info, trace, warn};
22
23use super::agent::Agent;
24use super::agent_v2::AgentV2;
25use super::context::AgentCallContext;
26use super::decision::AgentDecision;
27use super::metrics::AgentMetrics;
28use super::pool::AgentConnectionPool;
29
/// Unified agent wrapper supporting both v1 and v2 protocols.
///
/// Lets the manager hold agents of either protocol version behind a single
/// type; the methods on this enum forward to whichever variant is held.
pub enum UnifiedAgent {
    /// Agent speaking protocol v1, backed by a shared [`Agent`].
    V1(Arc<Agent>),
    /// Agent speaking protocol v2, backed by a shared [`AgentV2`].
    V2(Arc<AgentV2>),
}
35
36impl UnifiedAgent {
37    /// Get the agent ID.
38    pub fn id(&self) -> &str {
39        match self {
40            UnifiedAgent::V1(agent) => agent.id(),
41            UnifiedAgent::V2(agent) => agent.id(),
42        }
43    }
44
45    /// Get the agent's circuit breaker.
46    pub fn circuit_breaker(&self) -> &CircuitBreaker {
47        match self {
48            UnifiedAgent::V1(agent) => agent.circuit_breaker(),
49            UnifiedAgent::V2(agent) => agent.circuit_breaker(),
50        }
51    }
52
53    /// Get the agent's failure mode.
54    pub fn failure_mode(&self) -> FailureMode {
55        match self {
56            UnifiedAgent::V1(agent) => agent.failure_mode(),
57            UnifiedAgent::V2(agent) => agent.failure_mode(),
58        }
59    }
60
61    /// Get the agent's timeout in milliseconds.
62    pub fn timeout_ms(&self) -> u64 {
63        match self {
64            UnifiedAgent::V1(agent) => agent.timeout_ms(),
65            UnifiedAgent::V2(agent) => agent.timeout_ms(),
66        }
67    }
68
69    /// Check if agent handles a specific event type.
70    pub fn handles_event(&self, event_type: EventType) -> bool {
71        match self {
72            UnifiedAgent::V1(agent) => agent.handles_event(event_type),
73            UnifiedAgent::V2(agent) => agent.handles_event(event_type),
74        }
75    }
76
77    /// Initialize agent connection(s).
78    pub async fn initialize(&self) -> SentinelResult<()> {
79        match self {
80            UnifiedAgent::V1(agent) => agent.initialize().await,
81            UnifiedAgent::V2(agent) => agent.initialize().await,
82        }
83    }
84
85    /// Call agent with event.
86    ///
87    /// For V2 agents, this dispatches to the appropriate typed method based on
88    /// event type. The event is serialized and deserialized to convert between
89    /// the generic type and the specific event struct.
90    pub async fn call_event<T: serde::Serialize>(
91        &self,
92        event_type: EventType,
93        event: &T,
94    ) -> SentinelResult<AgentResponse> {
95        match self {
96            UnifiedAgent::V1(agent) => agent.call_event(event_type, event).await,
97            UnifiedAgent::V2(agent) => {
98                // V2 uses typed methods - convert to appropriate event type
99                let json = serde_json::to_value(event).map_err(|e| SentinelError::Agent {
100                    agent: agent.id().to_string(),
101                    message: format!("Failed to serialize event: {}", e),
102                    event: format!("{:?}", event_type),
103                    source: None,
104                })?;
105
106                match event_type {
107                    EventType::RequestHeaders => {
108                        let typed_event: RequestHeadersEvent = serde_json::from_value(json)
109                            .map_err(|e| SentinelError::Agent {
110                                agent: agent.id().to_string(),
111                                message: format!(
112                                    "Failed to deserialize RequestHeadersEvent: {}",
113                                    e
114                                ),
115                                event: format!("{:?}", event_type),
116                                source: None,
117                            })?;
118                        agent.call_request_headers(&typed_event).await
119                    }
120                    EventType::RequestBodyChunk => {
121                        let typed_event: RequestBodyChunkEvent = serde_json::from_value(json)
122                            .map_err(|e| SentinelError::Agent {
123                                agent: agent.id().to_string(),
124                                message: format!(
125                                    "Failed to deserialize RequestBodyChunkEvent: {}",
126                                    e
127                                ),
128                                event: format!("{:?}", event_type),
129                                source: None,
130                            })?;
131                        agent.call_request_body_chunk(&typed_event).await
132                    }
133                    EventType::ResponseHeaders => {
134                        let typed_event: ResponseHeadersEvent = serde_json::from_value(json)
135                            .map_err(|e| SentinelError::Agent {
136                                agent: agent.id().to_string(),
137                                message: format!(
138                                    "Failed to deserialize ResponseHeadersEvent: {}",
139                                    e
140                                ),
141                                event: format!("{:?}", event_type),
142                                source: None,
143                            })?;
144                        agent.call_response_headers(&typed_event).await
145                    }
146                    EventType::ResponseBodyChunk => {
147                        let typed_event: ResponseBodyChunkEvent = serde_json::from_value(json)
148                            .map_err(|e| SentinelError::Agent {
149                                agent: agent.id().to_string(),
150                                message: format!(
151                                    "Failed to deserialize ResponseBodyChunkEvent: {}",
152                                    e
153                                ),
154                                event: format!("{:?}", event_type),
155                                source: None,
156                            })?;
157                        agent.call_response_body_chunk(&typed_event).await
158                    }
159                    _ => {
160                        // For unsupported event types, return error
161                        Err(SentinelError::Agent {
162                            agent: agent.id().to_string(),
163                            message: format!("V2 does not support event type {:?}", event_type),
164                            event: format!("{:?}", event_type),
165                            source: None,
166                        })
167                    }
168                }
169            }
170        }
171    }
172
173    /// Call agent with request headers event (v2-optimized).
174    pub async fn call_request_headers(
175        &self,
176        event: &RequestHeadersEvent,
177    ) -> SentinelResult<AgentResponse> {
178        match self {
179            UnifiedAgent::V1(agent) => agent.call_event(EventType::RequestHeaders, event).await,
180            UnifiedAgent::V2(agent) => agent.call_request_headers(event).await,
181        }
182    }
183
184    /// Call agent with request body chunk event (v2-optimized).
185    pub async fn call_request_body_chunk(
186        &self,
187        event: &RequestBodyChunkEvent,
188    ) -> SentinelResult<AgentResponse> {
189        match self {
190            UnifiedAgent::V1(agent) => agent.call_event(EventType::RequestBodyChunk, event).await,
191            UnifiedAgent::V2(agent) => agent.call_request_body_chunk(event).await,
192        }
193    }
194
195    /// Call agent with response headers event (v2-optimized).
196    pub async fn call_response_headers(
197        &self,
198        event: &ResponseHeadersEvent,
199    ) -> SentinelResult<AgentResponse> {
200        match self {
201            UnifiedAgent::V1(agent) => agent.call_event(EventType::ResponseHeaders, event).await,
202            UnifiedAgent::V2(agent) => agent.call_response_headers(event).await,
203        }
204    }
205
206    /// Call agent with response body chunk event (v2-optimized).
207    pub async fn call_response_body_chunk(
208        &self,
209        event: &ResponseBodyChunkEvent,
210    ) -> SentinelResult<AgentResponse> {
211        match self {
212            UnifiedAgent::V1(agent) => agent.call_event(EventType::ResponseBodyChunk, event).await,
213            UnifiedAgent::V2(agent) => agent.call_response_body_chunk(event).await,
214        }
215    }
216
217    /// Record successful call.
218    pub async fn record_success(&self, duration: Duration) {
219        match self {
220            UnifiedAgent::V1(agent) => agent.record_success(duration).await,
221            UnifiedAgent::V2(agent) => agent.record_success(duration),
222        }
223    }
224
225    /// Record failed call.
226    pub async fn record_failure(&self) {
227        match self {
228            UnifiedAgent::V1(agent) => agent.record_failure().await,
229            UnifiedAgent::V2(agent) => agent.record_failure(),
230        }
231    }
232
233    /// Record timeout.
234    pub async fn record_timeout(&self) {
235        match self {
236            UnifiedAgent::V1(agent) => agent.record_timeout().await,
237            UnifiedAgent::V2(agent) => agent.record_timeout(),
238        }
239    }
240
241    /// Shutdown agent.
242    pub async fn shutdown(&self) {
243        match self {
244            UnifiedAgent::V1(agent) => agent.shutdown().await,
245            UnifiedAgent::V2(agent) => agent.shutdown().await,
246        }
247    }
248
249    /// Check if this is a v2 agent.
250    pub fn is_v2(&self) -> bool {
251        matches!(self, UnifiedAgent::V2(_))
252    }
253}
254
/// Agent manager handling all external agents.
///
/// Supports both v1 and v2 protocol agents. V1 agents use simple request/response,
/// while v2 agents support bidirectional streaming with capabilities, health
/// reporting, metrics export, and flow control.
///
/// All maps are keyed by agent ID and wrapped in `Arc<RwLock<..>>`.
pub struct AgentManager {
    /// Configured agents (unified wrapper for v1 and v2)
    agents: Arc<RwLock<HashMap<String, Arc<UnifiedAgent>>>>,
    /// Connection pools for v1 agents only; v2 agents get no entry here
    connection_pools: Arc<RwLock<HashMap<String, Arc<AgentConnectionPool>>>>,
    /// Circuit breakers per agent (v1 only - the v2 breaker is handed to the
    /// `AgentV2` instance and is not tracked in this map)
    circuit_breakers: Arc<RwLock<HashMap<String, Arc<CircuitBreaker>>>>,
    /// Global agent metrics
    metrics: Arc<AgentMetrics>,
    /// Per-agent semaphores for queue isolation (prevents noisy neighbor problem),
    /// one per configured agent regardless of protocol version
    agent_semaphores: Arc<RwLock<HashMap<String, Arc<Semaphore>>>>,
}
272
273impl AgentManager {
274    /// Create new agent manager.
275    ///
276    /// Each agent gets its own semaphore for queue isolation, preventing a slow
277    /// agent from affecting other agents (noisy neighbor problem). The concurrency
278    /// limit is configured per-agent via `max_concurrent_calls` in the agent config.
279    ///
280    /// Supports both v1 and v2 protocol agents based on the `protocol_version` field.
281    pub async fn new(agents: Vec<AgentConfig>) -> SentinelResult<Self> {
282        info!(agent_count = agents.len(), "Creating agent manager");
283
284        let mut agent_map = HashMap::new();
285        let mut pools = HashMap::new();
286        let mut breakers = HashMap::new();
287        let mut semaphores = HashMap::new();
288
289        let mut v1_count = 0;
290        let mut v2_count = 0;
291
292        for config in agents {
293            debug!(
294                agent_id = %config.id,
295                transport = ?config.transport,
296                timeout_ms = config.timeout_ms,
297                failure_mode = ?config.failure_mode,
298                max_concurrent_calls = config.max_concurrent_calls,
299                protocol_version = ?config.protocol_version,
300                "Configuring agent"
301            );
302
303            // Create per-agent semaphore for queue isolation
304            let semaphore = Arc::new(Semaphore::new(config.max_concurrent_calls));
305
306            let unified_agent = match config.protocol_version {
307                AgentProtocolVersion::V1 => {
308                    // V1: Create pool and circuit breaker externally
309                    let pool = Arc::new(AgentConnectionPool::new(
310                        10, // max connections
311                        2,  // min idle
312                        5,  // max idle
313                        Duration::from_secs(60),
314                    ));
315
316                    let circuit_breaker = Arc::new(CircuitBreaker::new(
317                        config
318                            .circuit_breaker
319                            .clone()
320                            .unwrap_or_else(CircuitBreakerConfig::default),
321                    ));
322
323                    trace!(
324                        agent_id = %config.id,
325                        max_concurrent_calls = config.max_concurrent_calls,
326                        "Creating v1 agent instance with isolated queue"
327                    );
328
329                    let agent = Arc::new(Agent::new(
330                        config.clone(),
331                        Arc::clone(&pool),
332                        Arc::clone(&circuit_breaker),
333                    ));
334
335                    pools.insert(config.id.clone(), pool);
336                    breakers.insert(config.id.clone(), circuit_breaker);
337                    v1_count += 1;
338
339                    Arc::new(UnifiedAgent::V1(agent))
340                }
341                AgentProtocolVersion::V2 => {
342                    // V2: Pool and circuit breaker are internal to AgentV2
343                    let circuit_breaker = Arc::new(CircuitBreaker::new(
344                        config
345                            .circuit_breaker
346                            .clone()
347                            .unwrap_or_else(CircuitBreakerConfig::default),
348                    ));
349
350                    trace!(
351                        agent_id = %config.id,
352                        max_concurrent_calls = config.max_concurrent_calls,
353                        pool_config = ?config.pool,
354                        "Creating v2 agent instance with internal pool"
355                    );
356
357                    let agent = Arc::new(AgentV2::new(config.clone(), circuit_breaker));
358
359                    v2_count += 1;
360
361                    Arc::new(UnifiedAgent::V2(agent))
362                }
363            };
364
365            agent_map.insert(config.id.clone(), unified_agent);
366            semaphores.insert(config.id.clone(), semaphore);
367
368            debug!(
369                agent_id = %config.id,
370                protocol_version = ?config.protocol_version,
371                "Agent configured successfully"
372            );
373        }
374
375        info!(
376            configured_agents = agent_map.len(),
377            v1_agents = v1_count,
378            v2_agents = v2_count,
379            "Agent manager created successfully with per-agent queue isolation"
380        );
381
382        Ok(Self {
383            agents: Arc::new(RwLock::new(agent_map)),
384            connection_pools: Arc::new(RwLock::new(pools)),
385            circuit_breakers: Arc::new(RwLock::new(breakers)),
386            metrics: Arc::new(AgentMetrics::default()),
387            agent_semaphores: Arc::new(RwLock::new(semaphores)),
388        })
389    }
390
391    /// Process request headers through agents.
392    ///
393    /// # Arguments
394    /// * `ctx` - Agent call context with correlation ID and metadata
395    /// * `headers` - Request headers to send to agents
396    /// * `route_agents` - List of (agent_id, failure_mode) tuples from filter chain
397    pub async fn process_request_headers(
398        &self,
399        ctx: &AgentCallContext,
400        mut headers: HashMap<String, Vec<String>>,
401        route_agents: &[(String, FailureMode)],
402    ) -> SentinelResult<AgentDecision> {
403        let method = headers
404            .remove(":method")
405            .and_then(|mut v| {
406                if v.is_empty() {
407                    None
408                } else {
409                    Some(v.swap_remove(0))
410                }
411            })
412            .unwrap_or_else(|| "GET".to_string());
413        let uri = headers
414            .remove(":path")
415            .and_then(|mut v| {
416                if v.is_empty() {
417                    None
418                } else {
419                    Some(v.swap_remove(0))
420                }
421            })
422            .unwrap_or_else(|| "/".to_string());
423        let event = RequestHeadersEvent {
424            metadata: ctx.metadata.clone(),
425            method,
426            uri,
427            headers,
428        };
429
430        // Use parallel processing for better latency with multiple agents
431        self.process_event_parallel(EventType::RequestHeaders, &event, route_agents, ctx)
432            .await
433    }
434
435    /// Process request body chunk through agents.
436    pub async fn process_request_body(
437        &self,
438        ctx: &AgentCallContext,
439        data: &[u8],
440        is_last: bool,
441        route_agents: &[String],
442    ) -> SentinelResult<AgentDecision> {
443        // Check body size limits
444        let max_size = 1024 * 1024; // 1MB default
445        if data.len() > max_size {
446            warn!(
447                correlation_id = %ctx.correlation_id,
448                size = data.len(),
449                "Request body exceeds agent inspection limit"
450            );
451            return Ok(AgentDecision::default_allow());
452        }
453
454        let event = RequestBodyChunkEvent {
455            correlation_id: ctx.correlation_id.to_string(),
456            data: STANDARD.encode(data),
457            is_last,
458            total_size: ctx.request_body.as_ref().map(|b| b.len()),
459            chunk_index: 0, // Buffer mode sends entire body as single chunk
460            bytes_received: data.len(),
461        };
462
463        self.process_event(EventType::RequestBodyChunk, &event, route_agents, ctx)
464            .await
465    }
466
467    /// Process a single request body chunk through agents (streaming mode).
468    ///
469    /// Unlike `process_request_body` which is used for buffered mode, this method
470    /// is designed for streaming where chunks are sent individually as they arrive.
471    pub async fn process_request_body_streaming(
472        &self,
473        ctx: &AgentCallContext,
474        data: &[u8],
475        is_last: bool,
476        chunk_index: u32,
477        bytes_received: usize,
478        total_size: Option<usize>,
479        route_agents: &[String],
480    ) -> SentinelResult<AgentDecision> {
481        trace!(
482            correlation_id = %ctx.correlation_id,
483            chunk_index = chunk_index,
484            chunk_size = data.len(),
485            bytes_received = bytes_received,
486            is_last = is_last,
487            "Processing streaming request body chunk"
488        );
489
490        let event = RequestBodyChunkEvent {
491            correlation_id: ctx.correlation_id.to_string(),
492            data: STANDARD.encode(data),
493            is_last,
494            total_size,
495            chunk_index,
496            bytes_received,
497        };
498
499        self.process_event(EventType::RequestBodyChunk, &event, route_agents, ctx)
500            .await
501    }
502
503    /// Process a single response body chunk through agents (streaming mode).
504    pub async fn process_response_body_streaming(
505        &self,
506        ctx: &AgentCallContext,
507        data: &[u8],
508        is_last: bool,
509        chunk_index: u32,
510        bytes_sent: usize,
511        total_size: Option<usize>,
512        route_agents: &[String],
513    ) -> SentinelResult<AgentDecision> {
514        trace!(
515            correlation_id = %ctx.correlation_id,
516            chunk_index = chunk_index,
517            chunk_size = data.len(),
518            bytes_sent = bytes_sent,
519            is_last = is_last,
520            "Processing streaming response body chunk"
521        );
522
523        let event = ResponseBodyChunkEvent {
524            correlation_id: ctx.correlation_id.to_string(),
525            data: STANDARD.encode(data),
526            is_last,
527            total_size,
528            chunk_index,
529            bytes_sent,
530        };
531
532        self.process_event(EventType::ResponseBodyChunk, &event, route_agents, ctx)
533            .await
534    }
535
536    /// Process response headers through agents.
537    pub async fn process_response_headers(
538        &self,
539        ctx: &AgentCallContext,
540        status: u16,
541        headers: &HashMap<String, Vec<String>>,
542        route_agents: &[String],
543    ) -> SentinelResult<AgentDecision> {
544        let event = ResponseHeadersEvent {
545            correlation_id: ctx.correlation_id.to_string(),
546            status,
547            headers: headers.clone(),
548        };
549
550        self.process_event(EventType::ResponseHeaders, &event, route_agents, ctx)
551            .await
552    }
553
554    /// Process a WebSocket frame through agents.
555    ///
556    /// This is used for WebSocket frame inspection after an upgrade.
557    /// Returns the agent response directly to allow the caller to access
558    /// the websocket_decision field.
559    pub async fn process_websocket_frame(
560        &self,
561        route_id: &str,
562        event: WebSocketFrameEvent,
563    ) -> SentinelResult<AgentResponse> {
564        trace!(
565            correlation_id = %event.correlation_id,
566            route_id = %route_id,
567            frame_index = event.frame_index,
568            opcode = %event.opcode,
569            "Processing WebSocket frame through agents"
570        );
571
572        // Get relevant agents for this route that handle WebSocket frames
573        let agents = self.agents.read().await;
574        let relevant_agents: Vec<_> = agents
575            .values()
576            .filter(|agent| agent.handles_event(EventType::WebSocketFrame))
577            .collect();
578
579        if relevant_agents.is_empty() {
580            trace!(
581                correlation_id = %event.correlation_id,
582                "No agents handle WebSocket frames, allowing"
583            );
584            return Ok(AgentResponse::websocket_allow());
585        }
586
587        debug!(
588            correlation_id = %event.correlation_id,
589            route_id = %route_id,
590            agent_count = relevant_agents.len(),
591            "Processing WebSocket frame through agents"
592        );
593
594        // Process through each agent sequentially
595        for agent in relevant_agents {
596            // Check circuit breaker
597            if !agent.circuit_breaker().is_closed() {
598                warn!(
599                    agent_id = %agent.id(),
600                    correlation_id = %event.correlation_id,
601                    failure_mode = ?agent.failure_mode(),
602                    "Circuit breaker open, skipping agent for WebSocket frame"
603                );
604
605                if agent.failure_mode() == FailureMode::Closed {
606                    debug!(
607                        correlation_id = %event.correlation_id,
608                        agent_id = %agent.id(),
609                        "Closing WebSocket due to circuit breaker (fail-closed mode)"
610                    );
611                    return Ok(AgentResponse::websocket_close(
612                        1011,
613                        "Service unavailable".to_string(),
614                    ));
615                }
616                continue;
617            }
618
619            // Call agent with timeout
620            let start = Instant::now();
621            let timeout_duration = Duration::from_millis(agent.timeout_ms());
622
623            match timeout(
624                timeout_duration,
625                agent.call_event(EventType::WebSocketFrame, &event),
626            )
627            .await
628            {
629                Ok(Ok(response)) => {
630                    let duration = start.elapsed();
631                    agent.record_success(duration).await;
632
633                    trace!(
634                        correlation_id = %event.correlation_id,
635                        agent_id = %agent.id(),
636                        duration_ms = duration.as_millis(),
637                        "WebSocket frame agent call succeeded"
638                    );
639
640                    // If agent returned a WebSocket decision that's not Allow, return immediately
641                    if let Some(ref ws_decision) = response.websocket_decision {
642                        if !matches!(
643                            ws_decision,
644                            sentinel_agent_protocol::WebSocketDecision::Allow
645                        ) {
646                            debug!(
647                                correlation_id = %event.correlation_id,
648                                agent_id = %agent.id(),
649                                decision = ?ws_decision,
650                                "Agent returned non-allow WebSocket decision"
651                            );
652                            return Ok(response);
653                        }
654                    }
655                }
656                Ok(Err(e)) => {
657                    agent.record_failure().await;
658                    error!(
659                        agent_id = %agent.id(),
660                        correlation_id = %event.correlation_id,
661                        error = %e,
662                        duration_ms = start.elapsed().as_millis(),
663                        failure_mode = ?agent.failure_mode(),
664                        "WebSocket frame agent call failed"
665                    );
666
667                    if agent.failure_mode() == FailureMode::Closed {
668                        return Ok(AgentResponse::websocket_close(
669                            1011,
670                            "Agent error".to_string(),
671                        ));
672                    }
673                }
674                Err(_) => {
675                    agent.record_timeout().await;
676                    warn!(
677                        agent_id = %agent.id(),
678                        correlation_id = %event.correlation_id,
679                        timeout_ms = agent.timeout_ms(),
680                        failure_mode = ?agent.failure_mode(),
681                        "WebSocket frame agent call timed out"
682                    );
683
684                    if agent.failure_mode() == FailureMode::Closed {
685                        return Ok(AgentResponse::websocket_close(
686                            1011,
687                            "Gateway timeout".to_string(),
688                        ));
689                    }
690                }
691            }
692        }
693
694        // All agents allowed the frame
695        Ok(AgentResponse::websocket_allow())
696    }
697
698    /// Process an event through relevant agents.
699    async fn process_event<T: serde::Serialize>(
700        &self,
701        event_type: EventType,
702        event: &T,
703        route_agents: &[String],
704        ctx: &AgentCallContext,
705    ) -> SentinelResult<AgentDecision> {
706        trace!(
707            correlation_id = %ctx.correlation_id,
708            event_type = ?event_type,
709            route_agents = ?route_agents,
710            "Starting agent event processing"
711        );
712
713        // Get relevant agents for this route and event type
714        let agents = self.agents.read().await;
715        let relevant_agents: Vec<_> = route_agents
716            .iter()
717            .filter_map(|id| agents.get(id))
718            .filter(|agent| agent.handles_event(event_type))
719            .collect();
720
721        if relevant_agents.is_empty() {
722            trace!(
723                correlation_id = %ctx.correlation_id,
724                event_type = ?event_type,
725                "No relevant agents for event, allowing request"
726            );
727            return Ok(AgentDecision::default_allow());
728        }
729
730        debug!(
731            correlation_id = %ctx.correlation_id,
732            event_type = ?event_type,
733            agent_count = relevant_agents.len(),
734            agent_ids = ?relevant_agents.iter().map(|a| a.id()).collect::<Vec<_>>(),
735            "Processing event through agents"
736        );
737
738        // Process through each agent sequentially
739        let mut combined_decision = AgentDecision::default_allow();
740
741        for (agent_index, agent) in relevant_agents.iter().enumerate() {
742            trace!(
743                correlation_id = %ctx.correlation_id,
744                agent_id = %agent.id(),
745                agent_index = agent_index,
746                event_type = ?event_type,
747                "Processing event through agent"
748            );
749
750            // Acquire per-agent semaphore permit (queue isolation)
751            let semaphores = self.agent_semaphores.read().await;
752            let agent_semaphore = semaphores.get(agent.id()).cloned();
753            drop(semaphores); // Release lock before awaiting
754
755            let _permit = match agent_semaphore {
756                Some(semaphore) => {
757                    trace!(
758                        correlation_id = %ctx.correlation_id,
759                        agent_id = %agent.id(),
760                        "Acquiring per-agent semaphore permit"
761                    );
762                    Some(semaphore.acquire_owned().await.map_err(|_| {
763                        error!(
764                            correlation_id = %ctx.correlation_id,
765                            agent_id = %agent.id(),
766                            "Failed to acquire agent call semaphore permit"
767                        );
768                        SentinelError::Internal {
769                            message: "Failed to acquire agent call permit".to_string(),
770                            correlation_id: Some(ctx.correlation_id.to_string()),
771                            source: None,
772                        }
773                    })?)
774                }
775                None => {
776                    // No semaphore found (shouldn't happen, but fail gracefully)
777                    warn!(
778                        correlation_id = %ctx.correlation_id,
779                        agent_id = %agent.id(),
780                        "No semaphore found for agent, proceeding without queue isolation"
781                    );
782                    None
783                }
784            };
785
786            // Check circuit breaker
787            if !agent.circuit_breaker().is_closed() {
788                warn!(
789                    agent_id = %agent.id(),
790                    correlation_id = %ctx.correlation_id,
791                    failure_mode = ?agent.failure_mode(),
792                    "Circuit breaker open, skipping agent"
793                );
794
795                // Handle based on failure mode
796                if agent.failure_mode() == FailureMode::Closed {
797                    debug!(
798                        correlation_id = %ctx.correlation_id,
799                        agent_id = %agent.id(),
800                        "Blocking request due to circuit breaker (fail-closed mode)"
801                    );
802                    return Ok(AgentDecision::block(503, "Service unavailable"));
803                }
804                continue;
805            }
806
807            // Call agent with timeout (using pingora-timeout for efficiency)
808            let start = Instant::now();
809            let timeout_duration = Duration::from_millis(agent.timeout_ms());
810
811            trace!(
812                correlation_id = %ctx.correlation_id,
813                agent_id = %agent.id(),
814                timeout_ms = agent.timeout_ms(),
815                "Calling agent"
816            );
817
818            match timeout(timeout_duration, agent.call_event(event_type, event)).await {
819                Ok(Ok(response)) => {
820                    let duration = start.elapsed();
821                    agent.record_success(duration).await;
822
823                    trace!(
824                        correlation_id = %ctx.correlation_id,
825                        agent_id = %agent.id(),
826                        duration_ms = duration.as_millis(),
827                        decision = ?response,
828                        "Agent call succeeded"
829                    );
830
831                    // Merge response into combined decision
832                    combined_decision.merge(response.into());
833
834                    // If decision is to block/redirect/challenge, stop processing
835                    if !combined_decision.is_allow() {
836                        debug!(
837                            correlation_id = %ctx.correlation_id,
838                            agent_id = %agent.id(),
839                            decision = ?combined_decision,
840                            "Agent returned blocking decision, stopping agent chain"
841                        );
842                        break;
843                    }
844                }
845                Ok(Err(e)) => {
846                    agent.record_failure().await;
847                    error!(
848                        agent_id = %agent.id(),
849                        correlation_id = %ctx.correlation_id,
850                        error = %e,
851                        duration_ms = start.elapsed().as_millis(),
852                        failure_mode = ?agent.failure_mode(),
853                        "Agent call failed"
854                    );
855
856                    if agent.failure_mode() == FailureMode::Closed {
857                        return Err(e);
858                    }
859                }
860                Err(_) => {
861                    agent.record_timeout().await;
862                    warn!(
863                        agent_id = %agent.id(),
864                        correlation_id = %ctx.correlation_id,
865                        timeout_ms = agent.timeout_ms(),
866                        failure_mode = ?agent.failure_mode(),
867                        "Agent call timed out"
868                    );
869
870                    if agent.failure_mode() == FailureMode::Closed {
871                        debug!(
872                            correlation_id = %ctx.correlation_id,
873                            agent_id = %agent.id(),
874                            "Blocking request due to timeout (fail-closed mode)"
875                        );
876                        return Ok(AgentDecision::block(504, "Gateway timeout"));
877                    }
878                }
879            }
880        }
881
882        trace!(
883            correlation_id = %ctx.correlation_id,
884            decision = ?combined_decision,
885            agents_processed = relevant_agents.len(),
886            "Agent event processing completed"
887        );
888
889        Ok(combined_decision)
890    }
891
892    /// Process an event through relevant agents with per-filter failure modes.
893    ///
894    /// This is the preferred method for processing events as it respects the
895    /// failure mode configured on each filter, not just the agent's default.
896    async fn process_event_with_failure_modes<T: serde::Serialize>(
897        &self,
898        event_type: EventType,
899        event: &T,
900        route_agents: &[(String, FailureMode)],
901        ctx: &AgentCallContext,
902    ) -> SentinelResult<AgentDecision> {
903        trace!(
904            correlation_id = %ctx.correlation_id,
905            event_type = ?event_type,
906            route_agents = ?route_agents.iter().map(|(id, _)| id).collect::<Vec<_>>(),
907            "Starting agent event processing with failure modes"
908        );
909
910        // Get relevant agents for this route and event type, preserving failure modes
911        let agents = self.agents.read().await;
912        let relevant_agents: Vec<_> = route_agents
913            .iter()
914            .filter_map(|(id, failure_mode)| agents.get(id).map(|agent| (agent, *failure_mode)))
915            .filter(|(agent, _)| agent.handles_event(event_type))
916            .collect();
917
918        if relevant_agents.is_empty() {
919            trace!(
920                correlation_id = %ctx.correlation_id,
921                event_type = ?event_type,
922                "No relevant agents for event, allowing request"
923            );
924            return Ok(AgentDecision::default_allow());
925        }
926
927        debug!(
928            correlation_id = %ctx.correlation_id,
929            event_type = ?event_type,
930            agent_count = relevant_agents.len(),
931            agent_ids = ?relevant_agents.iter().map(|(a, _)| a.id()).collect::<Vec<_>>(),
932            "Processing event through agents"
933        );
934
935        // Process through each agent sequentially
936        let mut combined_decision = AgentDecision::default_allow();
937
938        for (agent_index, (agent, filter_failure_mode)) in relevant_agents.iter().enumerate() {
939            trace!(
940                correlation_id = %ctx.correlation_id,
941                agent_id = %agent.id(),
942                agent_index = agent_index,
943                event_type = ?event_type,
944                filter_failure_mode = ?filter_failure_mode,
945                "Processing event through agent with filter failure mode"
946            );
947
948            // Acquire per-agent semaphore permit (queue isolation)
949            let semaphores = self.agent_semaphores.read().await;
950            let agent_semaphore = semaphores.get(agent.id()).cloned();
951            drop(semaphores); // Release lock before awaiting
952
953            let _permit = if let Some(semaphore) = agent_semaphore {
954                trace!(
955                    correlation_id = %ctx.correlation_id,
956                    agent_id = %agent.id(),
957                    "Acquiring per-agent semaphore permit"
958                );
959                Some(semaphore.acquire_owned().await.map_err(|_| {
960                    error!(
961                        correlation_id = %ctx.correlation_id,
962                        agent_id = %agent.id(),
963                        "Failed to acquire agent call semaphore permit"
964                    );
965                    SentinelError::Internal {
966                        message: "Failed to acquire agent call permit".to_string(),
967                        correlation_id: Some(ctx.correlation_id.to_string()),
968                        source: None,
969                    }
970                })?)
971            } else {
972                // No semaphore found (shouldn't happen, but fail gracefully)
973                warn!(
974                    correlation_id = %ctx.correlation_id,
975                    agent_id = %agent.id(),
976                    "No semaphore found for agent, proceeding without queue isolation"
977                );
978                None
979            };
980
981            // Check circuit breaker
982            if !agent.circuit_breaker().is_closed() {
983                warn!(
984                    agent_id = %agent.id(),
985                    correlation_id = %ctx.correlation_id,
986                    filter_failure_mode = ?filter_failure_mode,
987                    "Circuit breaker open, skipping agent"
988                );
989
990                // Handle based on filter's failure mode (not agent's default)
991                if *filter_failure_mode == FailureMode::Closed {
992                    debug!(
993                        correlation_id = %ctx.correlation_id,
994                        agent_id = %agent.id(),
995                        "Blocking request due to circuit breaker (filter fail-closed mode)"
996                    );
997                    return Ok(AgentDecision::block(503, "Service unavailable"));
998                }
999                // Fail-open: continue to next agent
1000                continue;
1001            }
1002
1003            // Call agent with timeout
1004            let start = Instant::now();
1005            let timeout_duration = Duration::from_millis(agent.timeout_ms());
1006
1007            trace!(
1008                correlation_id = %ctx.correlation_id,
1009                agent_id = %agent.id(),
1010                timeout_ms = agent.timeout_ms(),
1011                "Calling agent"
1012            );
1013
1014            match timeout(timeout_duration, agent.call_event(event_type, event)).await {
1015                Ok(Ok(response)) => {
1016                    let duration = start.elapsed();
1017                    agent.record_success(duration).await;
1018
1019                    trace!(
1020                        correlation_id = %ctx.correlation_id,
1021                        agent_id = %agent.id(),
1022                        duration_ms = duration.as_millis(),
1023                        decision = ?response,
1024                        "Agent call succeeded"
1025                    );
1026
1027                    // Merge response into combined decision
1028                    combined_decision.merge(response.into());
1029
1030                    // If decision is to block/redirect/challenge, stop processing
1031                    if !combined_decision.is_allow() {
1032                        debug!(
1033                            correlation_id = %ctx.correlation_id,
1034                            agent_id = %agent.id(),
1035                            decision = ?combined_decision,
1036                            "Agent returned blocking decision, stopping agent chain"
1037                        );
1038                        break;
1039                    }
1040                }
1041                Ok(Err(e)) => {
1042                    agent.record_failure().await;
1043                    error!(
1044                        agent_id = %agent.id(),
1045                        correlation_id = %ctx.correlation_id,
1046                        error = %e,
1047                        duration_ms = start.elapsed().as_millis(),
1048                        filter_failure_mode = ?filter_failure_mode,
1049                        "Agent call failed"
1050                    );
1051
1052                    // Use filter's failure mode, not agent's default
1053                    if *filter_failure_mode == FailureMode::Closed {
1054                        debug!(
1055                            correlation_id = %ctx.correlation_id,
1056                            agent_id = %agent.id(),
1057                            "Blocking request due to agent failure (filter fail-closed mode)"
1058                        );
1059                        return Ok(AgentDecision::block(503, "Agent unavailable"));
1060                    }
1061                    // Fail-open: continue to next agent (or proceed without this agent)
1062                    debug!(
1063                        correlation_id = %ctx.correlation_id,
1064                        agent_id = %agent.id(),
1065                        "Continuing despite agent failure (filter fail-open mode)"
1066                    );
1067                }
1068                Err(_) => {
1069                    agent.record_timeout().await;
1070                    warn!(
1071                        agent_id = %agent.id(),
1072                        correlation_id = %ctx.correlation_id,
1073                        timeout_ms = agent.timeout_ms(),
1074                        filter_failure_mode = ?filter_failure_mode,
1075                        "Agent call timed out"
1076                    );
1077
1078                    // Use filter's failure mode, not agent's default
1079                    if *filter_failure_mode == FailureMode::Closed {
1080                        debug!(
1081                            correlation_id = %ctx.correlation_id,
1082                            agent_id = %agent.id(),
1083                            "Blocking request due to timeout (filter fail-closed mode)"
1084                        );
1085                        return Ok(AgentDecision::block(504, "Gateway timeout"));
1086                    }
1087                    // Fail-open: continue to next agent
1088                    debug!(
1089                        correlation_id = %ctx.correlation_id,
1090                        agent_id = %agent.id(),
1091                        "Continuing despite timeout (filter fail-open mode)"
1092                    );
1093                }
1094            }
1095        }
1096
1097        trace!(
1098            correlation_id = %ctx.correlation_id,
1099            decision = ?combined_decision,
1100            agents_processed = relevant_agents.len(),
1101            "Agent event processing with failure modes completed"
1102        );
1103
1104        Ok(combined_decision)
1105    }
1106
1107    /// Process an event through relevant agents in parallel.
1108    ///
1109    /// This method executes all agent calls concurrently using `join_all`, which
1110    /// significantly improves latency when multiple agents are configured. The
1111    /// tradeoff is that if one agent blocks, other agents may still complete
1112    /// their work (slight resource waste in blocking scenarios).
1113    ///
1114    /// # Performance
1115    ///
1116    /// For N agents with latency L each:
1117    /// - Sequential: O(N * L)
1118    /// - Parallel: O(L) (assuming sufficient concurrency)
1119    ///
1120    /// This is the preferred method for most use cases.
1121    async fn process_event_parallel<T: serde::Serialize + Sync>(
1122        &self,
1123        event_type: EventType,
1124        event: &T,
1125        route_agents: &[(String, FailureMode)],
1126        ctx: &AgentCallContext,
1127    ) -> SentinelResult<AgentDecision> {
1128        trace!(
1129            correlation_id = %ctx.correlation_id,
1130            event_type = ?event_type,
1131            route_agents = ?route_agents.iter().map(|(id, _)| id).collect::<Vec<_>>(),
1132            "Starting parallel agent event processing"
1133        );
1134
1135        // Get relevant agents for this route and event type
1136        let agents = self.agents.read().await;
1137        let semaphores = self.agent_semaphores.read().await;
1138
1139        // Collect agent info upfront to minimize lock duration
1140        let agent_info: Vec<_> = route_agents
1141            .iter()
1142            .filter_map(|(id, failure_mode)| {
1143                let agent = agents.get(id)?;
1144                if !agent.handles_event(event_type) {
1145                    return None;
1146                }
1147                let semaphore = semaphores.get(id).cloned();
1148                Some((Arc::clone(agent), *failure_mode, semaphore))
1149            })
1150            .collect();
1151
1152        // Release locks early
1153        drop(agents);
1154        drop(semaphores);
1155
1156        if agent_info.is_empty() {
1157            trace!(
1158                correlation_id = %ctx.correlation_id,
1159                event_type = ?event_type,
1160                "No relevant agents for event, allowing request"
1161            );
1162            return Ok(AgentDecision::default_allow());
1163        }
1164
1165        debug!(
1166            correlation_id = %ctx.correlation_id,
1167            event_type = ?event_type,
1168            agent_count = agent_info.len(),
1169            agent_ids = ?agent_info.iter().map(|(a, _, _)| a.id()).collect::<Vec<_>>(),
1170            "Processing event through agents in parallel"
1171        );
1172
1173        // Spawn all agent calls concurrently
1174        let futures: Vec<_> = agent_info
1175            .iter()
1176            .map(|(agent, filter_failure_mode, semaphore)| {
1177                let agent = Arc::clone(agent);
1178                let filter_failure_mode = *filter_failure_mode;
1179                let semaphore = semaphore.clone();
1180                let correlation_id = ctx.correlation_id.clone();
1181
1182                async move {
1183                    // Acquire per-agent semaphore permit (queue isolation)
1184                    let _permit = if let Some(sem) = semaphore {
1185                        match sem.acquire_owned().await {
1186                            Ok(permit) => Some(permit),
1187                            Err(_) => {
1188                                error!(
1189                                    correlation_id = %correlation_id,
1190                                    agent_id = %agent.id(),
1191                                    "Failed to acquire agent semaphore permit"
1192                                );
1193                                return Err((
1194                                    agent.id().to_string(),
1195                                    filter_failure_mode,
1196                                    "Failed to acquire permit".to_string(),
1197                                ));
1198                            }
1199                        }
1200                    } else {
1201                        None
1202                    };
1203
1204                    // Check circuit breaker
1205                    if !agent.circuit_breaker().is_closed() {
1206                        warn!(
1207                            agent_id = %agent.id(),
1208                            correlation_id = %correlation_id,
1209                            filter_failure_mode = ?filter_failure_mode,
1210                            "Circuit breaker open, skipping agent"
1211                        );
1212                        return Err((
1213                            agent.id().to_string(),
1214                            filter_failure_mode,
1215                            "Circuit breaker open".to_string(),
1216                        ));
1217                    }
1218
1219                    // Call agent with timeout
1220                    let start = Instant::now();
1221                    let timeout_duration = Duration::from_millis(agent.timeout_ms());
1222
1223                    match timeout(timeout_duration, agent.call_event(event_type, event)).await {
1224                        Ok(Ok(response)) => {
1225                            let duration = start.elapsed();
1226                            agent.record_success(duration).await;
1227                            trace!(
1228                                correlation_id = %correlation_id,
1229                                agent_id = %agent.id(),
1230                                duration_ms = duration.as_millis(),
1231                                "Parallel agent call succeeded"
1232                            );
1233                            Ok((agent.id().to_string(), response))
1234                        }
1235                        Ok(Err(e)) => {
1236                            agent.record_failure().await;
1237                            error!(
1238                                agent_id = %agent.id(),
1239                                correlation_id = %correlation_id,
1240                                error = %e,
1241                                duration_ms = start.elapsed().as_millis(),
1242                                filter_failure_mode = ?filter_failure_mode,
1243                                "Parallel agent call failed"
1244                            );
1245                            Err((
1246                                agent.id().to_string(),
1247                                filter_failure_mode,
1248                                format!("Agent error: {}", e),
1249                            ))
1250                        }
1251                        Err(_) => {
1252                            agent.record_timeout().await;
1253                            warn!(
1254                                agent_id = %agent.id(),
1255                                correlation_id = %correlation_id,
1256                                timeout_ms = agent.timeout_ms(),
1257                                filter_failure_mode = ?filter_failure_mode,
1258                                "Parallel agent call timed out"
1259                            );
1260                            Err((
1261                                agent.id().to_string(),
1262                                filter_failure_mode,
1263                                "Timeout".to_string(),
1264                            ))
1265                        }
1266                    }
1267                }
1268            })
1269            .collect();
1270
1271        // Execute all agent calls in parallel
1272        let results = join_all(futures).await;
1273
1274        // Process results and merge decisions
1275        let mut combined_decision = AgentDecision::default_allow();
1276        let mut blocking_error: Option<AgentDecision> = None;
1277
1278        for result in results {
1279            match result {
1280                Ok((agent_id, response)) => {
1281                    let decision: AgentDecision = response.into();
1282
1283                    // Check for blocking decision
1284                    if !decision.is_allow() {
1285                        debug!(
1286                            correlation_id = %ctx.correlation_id,
1287                            agent_id = %agent_id,
1288                            decision = ?decision,
1289                            "Agent returned blocking decision"
1290                        );
1291                        // Return first blocking decision immediately
1292                        return Ok(decision);
1293                    }
1294
1295                    combined_decision.merge(decision);
1296                }
1297                Err((agent_id, failure_mode, reason)) => {
1298                    // Handle failure based on filter's failure mode
1299                    if failure_mode == FailureMode::Closed && blocking_error.is_none() {
1300                        debug!(
1301                            correlation_id = %ctx.correlation_id,
1302                            agent_id = %agent_id,
1303                            reason = %reason,
1304                            "Agent failure in fail-closed mode"
1305                        );
1306                        // Store blocking error but continue processing other results
1307                        // in case another agent returned a more specific block
1308                        let status = if reason.contains("Timeout") { 504 } else { 503 };
1309                        let message = if reason.contains("Timeout") {
1310                            "Gateway timeout"
1311                        } else {
1312                            "Service unavailable"
1313                        };
1314                        blocking_error = Some(AgentDecision::block(status, message));
1315                    } else {
1316                        // Fail-open: log and continue
1317                        debug!(
1318                            correlation_id = %ctx.correlation_id,
1319                            agent_id = %agent_id,
1320                            reason = %reason,
1321                            "Agent failure in fail-open mode, continuing"
1322                        );
1323                    }
1324                }
1325            }
1326        }
1327
1328        // If we have a fail-closed error and no explicit block, return the error
1329        if let Some(error_decision) = blocking_error {
1330            return Ok(error_decision);
1331        }
1332
1333        trace!(
1334            correlation_id = %ctx.correlation_id,
1335            decision = ?combined_decision,
1336            agents_processed = agent_info.len(),
1337            "Parallel agent event processing completed"
1338        );
1339
1340        Ok(combined_decision)
1341    }
1342
1343    /// Initialize agent connections.
1344    pub async fn initialize(&self) -> SentinelResult<()> {
1345        let agents = self.agents.read().await;
1346
1347        info!(agent_count = agents.len(), "Initializing agent connections");
1348
1349        let mut initialized_count = 0;
1350        let mut failed_count = 0;
1351
1352        for (id, agent) in agents.iter() {
1353            debug!(agent_id = %id, "Initializing agent connection");
1354            if let Err(e) = agent.initialize().await {
1355                error!(
1356                    agent_id = %id,
1357                    error = %e,
1358                    "Failed to initialize agent"
1359                );
1360                failed_count += 1;
1361                // Continue with other agents
1362            } else {
1363                trace!(agent_id = %id, "Agent initialized successfully");
1364                initialized_count += 1;
1365            }
1366        }
1367
1368        info!(
1369            initialized = initialized_count,
1370            failed = failed_count,
1371            total = agents.len(),
1372            "Agent initialization complete"
1373        );
1374
1375        Ok(())
1376    }
1377
1378    /// Shutdown all agents.
1379    pub async fn shutdown(&self) {
1380        let agents = self.agents.read().await;
1381
1382        info!(agent_count = agents.len(), "Shutting down agent manager");
1383
1384        for (id, agent) in agents.iter() {
1385            debug!(agent_id = %id, "Shutting down agent");
1386            agent.shutdown().await;
1387            trace!(agent_id = %id, "Agent shutdown complete");
1388        }
1389
1390        info!("Agent manager shutdown complete");
1391    }
1392
1393    /// Get agent metrics.
1394    pub fn metrics(&self) -> &AgentMetrics {
1395        &self.metrics
1396    }
1397
1398    /// Get agent IDs that handle a specific event type.
1399    ///
1400    /// This is useful for pre-filtering agents before making calls,
1401    /// e.g., to check if any agents handle WebSocket frames.
1402    pub fn get_agents_for_event(&self, event_type: EventType) -> Vec<String> {
1403        // Use try_read to avoid blocking - return empty if lock is held
1404        // This is acceptable since this is only used for informational purposes
1405        if let Ok(agents) = self.agents.try_read() {
1406            agents
1407                .values()
1408                .filter(|agent| agent.handles_event(event_type))
1409                .map(|agent| agent.id().to_string())
1410                .collect()
1411        } else {
1412            Vec::new()
1413        }
1414    }
1415
1416    /// Get pool metrics collectors from all v2 agents.
1417    ///
1418    /// Returns a vector of (agent_id, MetricsCollector) pairs for all v2 agents.
1419    /// These can be registered with the MetricsManager to include agent pool
1420    /// metrics in the /metrics endpoint output.
1421    pub async fn get_v2_pool_metrics(&self) -> Vec<(String, Arc<MetricsCollector>)> {
1422        let agents = self.agents.read().await;
1423        agents
1424            .iter()
1425            .filter_map(|(id, agent)| {
1426                if let UnifiedAgent::V2(v2_agent) = agent.as_ref() {
1427                    Some((id.clone(), v2_agent.pool_metrics_collector_arc()))
1428                } else {
1429                    None
1430                }
1431            })
1432            .collect()
1433    }
1434
1435    /// Export prometheus metrics from all v2 agent pools.
1436    ///
1437    /// Returns the combined prometheus-formatted metrics from all v2 agent pools.
1438    pub async fn export_v2_pool_metrics(&self) -> String {
1439        let agents = self.agents.read().await;
1440        let mut output = String::new();
1441
1442        for (id, agent) in agents.iter() {
1443            if let UnifiedAgent::V2(v2_agent) = agent.as_ref() {
1444                let pool_metrics = v2_agent.export_prometheus();
1445                if !pool_metrics.is_empty() {
1446                    output.push_str(&format!("\n# Agent pool metrics: {}\n", id));
1447                    output.push_str(&pool_metrics);
1448                }
1449            }
1450        }
1451
1452        output
1453    }
1454
1455    /// Get a v2 agent's metrics collector by ID.
1456    ///
1457    /// Returns None if the agent doesn't exist or is not a v2 agent.
1458    pub async fn get_v2_metrics_collector(&self, agent_id: &str) -> Option<Arc<MetricsCollector>> {
1459        let agents = self.agents.read().await;
1460        if let Some(agent) = agents.get(agent_id) {
1461            if let UnifiedAgent::V2(v2_agent) = agent.as_ref() {
1462                Some(v2_agent.pool_metrics_collector_arc())
1463            } else {
1464                None
1465            }
1466        } else {
1467            None
1468        }
1469    }
1470}