sentinel_proxy/agents/
manager.rs

1//! Agent manager for coordinating external processing agents.
2
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use base64::{engine::general_purpose::STANDARD, Engine as _};
8use pingora_timeout::timeout;
9use sentinel_agent_protocol::{
10    v2::MetricsCollector,
11    AgentResponse, EventType, RequestBodyChunkEvent, RequestHeadersEvent, ResponseBodyChunkEvent,
12    ResponseHeadersEvent, WebSocketFrameEvent,
13};
14use sentinel_common::{
15    errors::{SentinelError, SentinelResult},
16    types::CircuitBreakerConfig,
17    CircuitBreaker,
18};
19use futures::future::join_all;
20use sentinel_config::{AgentConfig, AgentProtocolVersion, FailureMode};
21use tokio::sync::{RwLock, Semaphore};
22use tracing::{debug, error, info, trace, warn};
23
24use super::agent::Agent;
25use super::agent_v2::AgentV2;
26use super::context::AgentCallContext;
27use super::decision::AgentDecision;
28use super::metrics::AgentMetrics;
29use super::pool::AgentConnectionPool;
30
/// Unified agent wrapper supporting both v1 and v2 protocols.
///
/// Each variant holds a shared (`Arc`) handle to the protocol-specific agent
/// type, so callers can work with a single type regardless of which protocol
/// an agent speaks.
pub enum UnifiedAgent {
    /// Agent using the v1 protocol.
    V1(Arc<Agent>),
    /// Agent using the v2 protocol.
    V2(Arc<AgentV2>),
}
36
37impl UnifiedAgent {
38    /// Get the agent ID.
39    pub fn id(&self) -> &str {
40        match self {
41            UnifiedAgent::V1(agent) => agent.id(),
42            UnifiedAgent::V2(agent) => agent.id(),
43        }
44    }
45
46    /// Get the agent's circuit breaker.
47    pub fn circuit_breaker(&self) -> &CircuitBreaker {
48        match self {
49            UnifiedAgent::V1(agent) => agent.circuit_breaker(),
50            UnifiedAgent::V2(agent) => agent.circuit_breaker(),
51        }
52    }
53
54    /// Get the agent's failure mode.
55    pub fn failure_mode(&self) -> FailureMode {
56        match self {
57            UnifiedAgent::V1(agent) => agent.failure_mode(),
58            UnifiedAgent::V2(agent) => agent.failure_mode(),
59        }
60    }
61
62    /// Get the agent's timeout in milliseconds.
63    pub fn timeout_ms(&self) -> u64 {
64        match self {
65            UnifiedAgent::V1(agent) => agent.timeout_ms(),
66            UnifiedAgent::V2(agent) => agent.timeout_ms(),
67        }
68    }
69
70    /// Check if agent handles a specific event type.
71    pub fn handles_event(&self, event_type: EventType) -> bool {
72        match self {
73            UnifiedAgent::V1(agent) => agent.handles_event(event_type),
74            UnifiedAgent::V2(agent) => agent.handles_event(event_type),
75        }
76    }
77
78    /// Initialize agent connection(s).
79    pub async fn initialize(&self) -> SentinelResult<()> {
80        match self {
81            UnifiedAgent::V1(agent) => agent.initialize().await,
82            UnifiedAgent::V2(agent) => agent.initialize().await,
83        }
84    }
85
86    /// Call agent with event.
87    ///
88    /// For V2 agents, this dispatches to the appropriate typed method based on
89    /// event type. The event is serialized and deserialized to convert between
90    /// the generic type and the specific event struct.
91    pub async fn call_event<T: serde::Serialize>(
92        &self,
93        event_type: EventType,
94        event: &T,
95    ) -> SentinelResult<AgentResponse> {
96        match self {
97            UnifiedAgent::V1(agent) => agent.call_event(event_type, event).await,
98            UnifiedAgent::V2(agent) => {
99                // V2 uses typed methods - convert to appropriate event type
100                let json = serde_json::to_value(event).map_err(|e| SentinelError::Agent {
101                    agent: agent.id().to_string(),
102                    message: format!("Failed to serialize event: {}", e),
103                    event: format!("{:?}", event_type),
104                    source: None,
105                })?;
106
107                match event_type {
108                    EventType::RequestHeaders => {
109                        let typed_event: RequestHeadersEvent =
110                            serde_json::from_value(json).map_err(|e| SentinelError::Agent {
111                                agent: agent.id().to_string(),
112                                message: format!("Failed to deserialize RequestHeadersEvent: {}", e),
113                                event: format!("{:?}", event_type),
114                                source: None,
115                            })?;
116                        agent.call_request_headers(&typed_event).await
117                    }
118                    EventType::RequestBodyChunk => {
119                        let typed_event: RequestBodyChunkEvent =
120                            serde_json::from_value(json).map_err(|e| SentinelError::Agent {
121                                agent: agent.id().to_string(),
122                                message: format!("Failed to deserialize RequestBodyChunkEvent: {}", e),
123                                event: format!("{:?}", event_type),
124                                source: None,
125                            })?;
126                        agent.call_request_body_chunk(&typed_event).await
127                    }
128                    EventType::ResponseHeaders => {
129                        let typed_event: ResponseHeadersEvent =
130                            serde_json::from_value(json).map_err(|e| SentinelError::Agent {
131                                agent: agent.id().to_string(),
132                                message: format!("Failed to deserialize ResponseHeadersEvent: {}", e),
133                                event: format!("{:?}", event_type),
134                                source: None,
135                            })?;
136                        agent.call_response_headers(&typed_event).await
137                    }
138                    EventType::ResponseBodyChunk => {
139                        let typed_event: ResponseBodyChunkEvent =
140                            serde_json::from_value(json).map_err(|e| SentinelError::Agent {
141                                agent: agent.id().to_string(),
142                                message: format!("Failed to deserialize ResponseBodyChunkEvent: {}", e),
143                                event: format!("{:?}", event_type),
144                                source: None,
145                            })?;
146                        agent.call_response_body_chunk(&typed_event).await
147                    }
148                    _ => {
149                        // For unsupported event types, return error
150                        Err(SentinelError::Agent {
151                            agent: agent.id().to_string(),
152                            message: format!("V2 does not support event type {:?}", event_type),
153                            event: format!("{:?}", event_type),
154                            source: None,
155                        })
156                    }
157                }
158            }
159        }
160    }
161
162    /// Call agent with request headers event (v2-optimized).
163    pub async fn call_request_headers(
164        &self,
165        event: &RequestHeadersEvent,
166    ) -> SentinelResult<AgentResponse> {
167        match self {
168            UnifiedAgent::V1(agent) => agent.call_event(EventType::RequestHeaders, event).await,
169            UnifiedAgent::V2(agent) => agent.call_request_headers(event).await,
170        }
171    }
172
173    /// Call agent with request body chunk event (v2-optimized).
174    pub async fn call_request_body_chunk(
175        &self,
176        event: &RequestBodyChunkEvent,
177    ) -> SentinelResult<AgentResponse> {
178        match self {
179            UnifiedAgent::V1(agent) => agent.call_event(EventType::RequestBodyChunk, event).await,
180            UnifiedAgent::V2(agent) => agent.call_request_body_chunk(event).await,
181        }
182    }
183
184    /// Call agent with response headers event (v2-optimized).
185    pub async fn call_response_headers(
186        &self,
187        event: &ResponseHeadersEvent,
188    ) -> SentinelResult<AgentResponse> {
189        match self {
190            UnifiedAgent::V1(agent) => agent.call_event(EventType::ResponseHeaders, event).await,
191            UnifiedAgent::V2(agent) => agent.call_response_headers(event).await,
192        }
193    }
194
195    /// Call agent with response body chunk event (v2-optimized).
196    pub async fn call_response_body_chunk(
197        &self,
198        event: &ResponseBodyChunkEvent,
199    ) -> SentinelResult<AgentResponse> {
200        match self {
201            UnifiedAgent::V1(agent) => agent.call_event(EventType::ResponseBodyChunk, event).await,
202            UnifiedAgent::V2(agent) => agent.call_response_body_chunk(event).await,
203        }
204    }
205
206    /// Record successful call.
207    pub async fn record_success(&self, duration: Duration) {
208        match self {
209            UnifiedAgent::V1(agent) => agent.record_success(duration).await,
210            UnifiedAgent::V2(agent) => agent.record_success(duration),
211        }
212    }
213
214    /// Record failed call.
215    pub async fn record_failure(&self) {
216        match self {
217            UnifiedAgent::V1(agent) => agent.record_failure().await,
218            UnifiedAgent::V2(agent) => agent.record_failure(),
219        }
220    }
221
222    /// Record timeout.
223    pub async fn record_timeout(&self) {
224        match self {
225            UnifiedAgent::V1(agent) => agent.record_timeout().await,
226            UnifiedAgent::V2(agent) => agent.record_timeout(),
227        }
228    }
229
230    /// Shutdown agent.
231    pub async fn shutdown(&self) {
232        match self {
233            UnifiedAgent::V1(agent) => agent.shutdown().await,
234            UnifiedAgent::V2(agent) => agent.shutdown().await,
235        }
236    }
237
238    /// Check if this is a v2 agent.
239    pub fn is_v2(&self) -> bool {
240        matches!(self, UnifiedAgent::V2(_))
241    }
242}
243
/// Agent manager handling all external agents.
///
/// Supports both v1 and v2 protocol agents. V1 agents use simple request/response,
/// while v2 agents support bidirectional streaming with capabilities, health
/// reporting, metrics export, and flow control.
///
/// All maps are keyed by agent ID.
pub struct AgentManager {
    /// Configured agents (unified wrapper for v1 and v2)
    agents: Arc<RwLock<HashMap<String, Arc<UnifiedAgent>>>>,
    /// Connection pools for v1 agents only
    connection_pools: Arc<RwLock<HashMap<String, Arc<AgentConnectionPool>>>>,
    /// Circuit breakers per agent (v1 only - a v2 agent's circuit breaker is
    /// handed to the `AgentV2` instance at construction and is not stored here)
    circuit_breakers: Arc<RwLock<HashMap<String, Arc<CircuitBreaker>>>>,
    /// Global agent metrics
    metrics: Arc<AgentMetrics>,
    /// Per-agent semaphores for queue isolation (prevents noisy neighbor problem);
    /// each is sized from the agent's `max_concurrent_calls`
    agent_semaphores: Arc<RwLock<HashMap<String, Arc<Semaphore>>>>,
}
261
262impl AgentManager {
263    /// Create new agent manager.
264    ///
265    /// Each agent gets its own semaphore for queue isolation, preventing a slow
266    /// agent from affecting other agents (noisy neighbor problem). The concurrency
267    /// limit is configured per-agent via `max_concurrent_calls` in the agent config.
268    ///
269    /// Supports both v1 and v2 protocol agents based on the `protocol_version` field.
270    pub async fn new(agents: Vec<AgentConfig>) -> SentinelResult<Self> {
271        info!(agent_count = agents.len(), "Creating agent manager");
272
273        let mut agent_map = HashMap::new();
274        let mut pools = HashMap::new();
275        let mut breakers = HashMap::new();
276        let mut semaphores = HashMap::new();
277
278        let mut v1_count = 0;
279        let mut v2_count = 0;
280
281        for config in agents {
282            debug!(
283                agent_id = %config.id,
284                transport = ?config.transport,
285                timeout_ms = config.timeout_ms,
286                failure_mode = ?config.failure_mode,
287                max_concurrent_calls = config.max_concurrent_calls,
288                protocol_version = ?config.protocol_version,
289                "Configuring agent"
290            );
291
292            // Create per-agent semaphore for queue isolation
293            let semaphore = Arc::new(Semaphore::new(config.max_concurrent_calls));
294
295            let unified_agent = match config.protocol_version {
296                AgentProtocolVersion::V1 => {
297                    // V1: Create pool and circuit breaker externally
298                    let pool = Arc::new(AgentConnectionPool::new(
299                        10, // max connections
300                        2,  // min idle
301                        5,  // max idle
302                        Duration::from_secs(60),
303                    ));
304
305                    let circuit_breaker = Arc::new(CircuitBreaker::new(
306                        config
307                            .circuit_breaker
308                            .clone()
309                            .unwrap_or_else(CircuitBreakerConfig::default),
310                    ));
311
312                    trace!(
313                        agent_id = %config.id,
314                        max_concurrent_calls = config.max_concurrent_calls,
315                        "Creating v1 agent instance with isolated queue"
316                    );
317
318                    let agent = Arc::new(Agent::new(
319                        config.clone(),
320                        Arc::clone(&pool),
321                        Arc::clone(&circuit_breaker),
322                    ));
323
324                    pools.insert(config.id.clone(), pool);
325                    breakers.insert(config.id.clone(), circuit_breaker);
326                    v1_count += 1;
327
328                    Arc::new(UnifiedAgent::V1(agent))
329                }
330                AgentProtocolVersion::V2 => {
331                    // V2: Pool and circuit breaker are internal to AgentV2
332                    let circuit_breaker = Arc::new(CircuitBreaker::new(
333                        config
334                            .circuit_breaker
335                            .clone()
336                            .unwrap_or_else(CircuitBreakerConfig::default),
337                    ));
338
339                    trace!(
340                        agent_id = %config.id,
341                        max_concurrent_calls = config.max_concurrent_calls,
342                        pool_config = ?config.pool,
343                        "Creating v2 agent instance with internal pool"
344                    );
345
346                    let agent = Arc::new(AgentV2::new(
347                        config.clone(),
348                        circuit_breaker,
349                    ));
350
351                    v2_count += 1;
352
353                    Arc::new(UnifiedAgent::V2(agent))
354                }
355            };
356
357            agent_map.insert(config.id.clone(), unified_agent);
358            semaphores.insert(config.id.clone(), semaphore);
359
360            debug!(
361                agent_id = %config.id,
362                protocol_version = ?config.protocol_version,
363                "Agent configured successfully"
364            );
365        }
366
367        info!(
368            configured_agents = agent_map.len(),
369            v1_agents = v1_count,
370            v2_agents = v2_count,
371            "Agent manager created successfully with per-agent queue isolation"
372        );
373
374        Ok(Self {
375            agents: Arc::new(RwLock::new(agent_map)),
376            connection_pools: Arc::new(RwLock::new(pools)),
377            circuit_breakers: Arc::new(RwLock::new(breakers)),
378            metrics: Arc::new(AgentMetrics::default()),
379            agent_semaphores: Arc::new(RwLock::new(semaphores)),
380        })
381    }
382
383    /// Process request headers through agents.
384    ///
385    /// # Arguments
386    /// * `ctx` - Agent call context with correlation ID and metadata
387    /// * `headers` - Request headers to send to agents
388    /// * `route_agents` - List of (agent_id, failure_mode) tuples from filter chain
389    pub async fn process_request_headers(
390        &self,
391        ctx: &AgentCallContext,
392        headers: &HashMap<String, Vec<String>>,
393        route_agents: &[(String, FailureMode)],
394    ) -> SentinelResult<AgentDecision> {
395        let event = RequestHeadersEvent {
396            metadata: ctx.metadata.clone(),
397            method: headers
398                .get(":method")
399                .and_then(|v| v.first())
400                .unwrap_or(&"GET".to_string())
401                .clone(),
402            uri: headers
403                .get(":path")
404                .and_then(|v| v.first())
405                .unwrap_or(&"/".to_string())
406                .clone(),
407            headers: headers.clone(),
408        };
409
410        // Use parallel processing for better latency with multiple agents
411        self.process_event_parallel(EventType::RequestHeaders, &event, route_agents, ctx)
412            .await
413    }
414
415    /// Process request body chunk through agents.
416    pub async fn process_request_body(
417        &self,
418        ctx: &AgentCallContext,
419        data: &[u8],
420        is_last: bool,
421        route_agents: &[String],
422    ) -> SentinelResult<AgentDecision> {
423        // Check body size limits
424        let max_size = 1024 * 1024; // 1MB default
425        if data.len() > max_size {
426            warn!(
427                correlation_id = %ctx.correlation_id,
428                size = data.len(),
429                "Request body exceeds agent inspection limit"
430            );
431            return Ok(AgentDecision::default_allow());
432        }
433
434        let event = RequestBodyChunkEvent {
435            correlation_id: ctx.correlation_id.to_string(),
436            data: STANDARD.encode(data),
437            is_last,
438            total_size: ctx.request_body.as_ref().map(|b| b.len()),
439            chunk_index: 0, // Buffer mode sends entire body as single chunk
440            bytes_received: data.len(),
441        };
442
443        self.process_event(EventType::RequestBodyChunk, &event, route_agents, ctx)
444            .await
445    }
446
447    /// Process a single request body chunk through agents (streaming mode).
448    ///
449    /// Unlike `process_request_body` which is used for buffered mode, this method
450    /// is designed for streaming where chunks are sent individually as they arrive.
451    pub async fn process_request_body_streaming(
452        &self,
453        ctx: &AgentCallContext,
454        data: &[u8],
455        is_last: bool,
456        chunk_index: u32,
457        bytes_received: usize,
458        total_size: Option<usize>,
459        route_agents: &[String],
460    ) -> SentinelResult<AgentDecision> {
461        trace!(
462            correlation_id = %ctx.correlation_id,
463            chunk_index = chunk_index,
464            chunk_size = data.len(),
465            bytes_received = bytes_received,
466            is_last = is_last,
467            "Processing streaming request body chunk"
468        );
469
470        let event = RequestBodyChunkEvent {
471            correlation_id: ctx.correlation_id.to_string(),
472            data: STANDARD.encode(data),
473            is_last,
474            total_size,
475            chunk_index,
476            bytes_received,
477        };
478
479        self.process_event(EventType::RequestBodyChunk, &event, route_agents, ctx)
480            .await
481    }
482
483    /// Process a single response body chunk through agents (streaming mode).
484    pub async fn process_response_body_streaming(
485        &self,
486        ctx: &AgentCallContext,
487        data: &[u8],
488        is_last: bool,
489        chunk_index: u32,
490        bytes_sent: usize,
491        total_size: Option<usize>,
492        route_agents: &[String],
493    ) -> SentinelResult<AgentDecision> {
494        trace!(
495            correlation_id = %ctx.correlation_id,
496            chunk_index = chunk_index,
497            chunk_size = data.len(),
498            bytes_sent = bytes_sent,
499            is_last = is_last,
500            "Processing streaming response body chunk"
501        );
502
503        let event = ResponseBodyChunkEvent {
504            correlation_id: ctx.correlation_id.to_string(),
505            data: STANDARD.encode(data),
506            is_last,
507            total_size,
508            chunk_index,
509            bytes_sent,
510        };
511
512        self.process_event(EventType::ResponseBodyChunk, &event, route_agents, ctx)
513            .await
514    }
515
516    /// Process response headers through agents.
517    pub async fn process_response_headers(
518        &self,
519        ctx: &AgentCallContext,
520        status: u16,
521        headers: &HashMap<String, Vec<String>>,
522        route_agents: &[String],
523    ) -> SentinelResult<AgentDecision> {
524        let event = ResponseHeadersEvent {
525            correlation_id: ctx.correlation_id.to_string(),
526            status,
527            headers: headers.clone(),
528        };
529
530        self.process_event(EventType::ResponseHeaders, &event, route_agents, ctx)
531            .await
532    }
533
534    /// Process a WebSocket frame through agents.
535    ///
536    /// This is used for WebSocket frame inspection after an upgrade.
537    /// Returns the agent response directly to allow the caller to access
538    /// the websocket_decision field.
539    pub async fn process_websocket_frame(
540        &self,
541        route_id: &str,
542        event: WebSocketFrameEvent,
543    ) -> SentinelResult<AgentResponse> {
544        trace!(
545            correlation_id = %event.correlation_id,
546            route_id = %route_id,
547            frame_index = event.frame_index,
548            opcode = %event.opcode,
549            "Processing WebSocket frame through agents"
550        );
551
552        // Get relevant agents for this route that handle WebSocket frames
553        let agents = self.agents.read().await;
554        let relevant_agents: Vec<_> = agents
555            .values()
556            .filter(|agent| agent.handles_event(EventType::WebSocketFrame))
557            .collect();
558
559        if relevant_agents.is_empty() {
560            trace!(
561                correlation_id = %event.correlation_id,
562                "No agents handle WebSocket frames, allowing"
563            );
564            return Ok(AgentResponse::websocket_allow());
565        }
566
567        debug!(
568            correlation_id = %event.correlation_id,
569            route_id = %route_id,
570            agent_count = relevant_agents.len(),
571            "Processing WebSocket frame through agents"
572        );
573
574        // Process through each agent sequentially
575        for agent in relevant_agents {
576            // Check circuit breaker
577            if !agent.circuit_breaker().is_closed() {
578                warn!(
579                    agent_id = %agent.id(),
580                    correlation_id = %event.correlation_id,
581                    failure_mode = ?agent.failure_mode(),
582                    "Circuit breaker open, skipping agent for WebSocket frame"
583                );
584
585                if agent.failure_mode() == FailureMode::Closed {
586                    debug!(
587                        correlation_id = %event.correlation_id,
588                        agent_id = %agent.id(),
589                        "Closing WebSocket due to circuit breaker (fail-closed mode)"
590                    );
591                    return Ok(AgentResponse::websocket_close(
592                        1011,
593                        "Service unavailable".to_string(),
594                    ));
595                }
596                continue;
597            }
598
599            // Call agent with timeout
600            let start = Instant::now();
601            let timeout_duration = Duration::from_millis(agent.timeout_ms());
602
603            match timeout(
604                timeout_duration,
605                agent.call_event(EventType::WebSocketFrame, &event),
606            )
607            .await
608            {
609                Ok(Ok(response)) => {
610                    let duration = start.elapsed();
611                    agent.record_success(duration).await;
612
613                    trace!(
614                        correlation_id = %event.correlation_id,
615                        agent_id = %agent.id(),
616                        duration_ms = duration.as_millis(),
617                        "WebSocket frame agent call succeeded"
618                    );
619
620                    // If agent returned a WebSocket decision that's not Allow, return immediately
621                    if let Some(ref ws_decision) = response.websocket_decision {
622                        if !matches!(
623                            ws_decision,
624                            sentinel_agent_protocol::WebSocketDecision::Allow
625                        ) {
626                            debug!(
627                                correlation_id = %event.correlation_id,
628                                agent_id = %agent.id(),
629                                decision = ?ws_decision,
630                                "Agent returned non-allow WebSocket decision"
631                            );
632                            return Ok(response);
633                        }
634                    }
635                }
636                Ok(Err(e)) => {
637                    agent.record_failure().await;
638                    error!(
639                        agent_id = %agent.id(),
640                        correlation_id = %event.correlation_id,
641                        error = %e,
642                        duration_ms = start.elapsed().as_millis(),
643                        failure_mode = ?agent.failure_mode(),
644                        "WebSocket frame agent call failed"
645                    );
646
647                    if agent.failure_mode() == FailureMode::Closed {
648                        return Ok(AgentResponse::websocket_close(
649                            1011,
650                            "Agent error".to_string(),
651                        ));
652                    }
653                }
654                Err(_) => {
655                    agent.record_timeout().await;
656                    warn!(
657                        agent_id = %agent.id(),
658                        correlation_id = %event.correlation_id,
659                        timeout_ms = agent.timeout_ms(),
660                        failure_mode = ?agent.failure_mode(),
661                        "WebSocket frame agent call timed out"
662                    );
663
664                    if agent.failure_mode() == FailureMode::Closed {
665                        return Ok(AgentResponse::websocket_close(
666                            1011,
667                            "Gateway timeout".to_string(),
668                        ));
669                    }
670                }
671            }
672        }
673
674        // All agents allowed the frame
675        Ok(AgentResponse::websocket_allow())
676    }
677
678    /// Process an event through relevant agents.
679    async fn process_event<T: serde::Serialize>(
680        &self,
681        event_type: EventType,
682        event: &T,
683        route_agents: &[String],
684        ctx: &AgentCallContext,
685    ) -> SentinelResult<AgentDecision> {
686        trace!(
687            correlation_id = %ctx.correlation_id,
688            event_type = ?event_type,
689            route_agents = ?route_agents,
690            "Starting agent event processing"
691        );
692
693        // Get relevant agents for this route and event type
694        let agents = self.agents.read().await;
695        let relevant_agents: Vec<_> = route_agents
696            .iter()
697            .filter_map(|id| agents.get(id))
698            .filter(|agent| agent.handles_event(event_type))
699            .collect();
700
701        if relevant_agents.is_empty() {
702            trace!(
703                correlation_id = %ctx.correlation_id,
704                event_type = ?event_type,
705                "No relevant agents for event, allowing request"
706            );
707            return Ok(AgentDecision::default_allow());
708        }
709
710        debug!(
711            correlation_id = %ctx.correlation_id,
712            event_type = ?event_type,
713            agent_count = relevant_agents.len(),
714            agent_ids = ?relevant_agents.iter().map(|a| a.id()).collect::<Vec<_>>(),
715            "Processing event through agents"
716        );
717
718        // Process through each agent sequentially
719        let mut combined_decision = AgentDecision::default_allow();
720
721        for (agent_index, agent) in relevant_agents.iter().enumerate() {
722            trace!(
723                correlation_id = %ctx.correlation_id,
724                agent_id = %agent.id(),
725                agent_index = agent_index,
726                event_type = ?event_type,
727                "Processing event through agent"
728            );
729
730            // Acquire per-agent semaphore permit (queue isolation)
731            let semaphores = self.agent_semaphores.read().await;
732            let agent_semaphore = semaphores.get(agent.id()).cloned();
733            drop(semaphores); // Release lock before awaiting
734
735            let _permit = match agent_semaphore {
736                Some(semaphore) => {
737                    trace!(
738                        correlation_id = %ctx.correlation_id,
739                        agent_id = %agent.id(),
740                        "Acquiring per-agent semaphore permit"
741                    );
742                    Some(semaphore.acquire_owned().await.map_err(|_| {
743                        error!(
744                            correlation_id = %ctx.correlation_id,
745                            agent_id = %agent.id(),
746                            "Failed to acquire agent call semaphore permit"
747                        );
748                        SentinelError::Internal {
749                            message: "Failed to acquire agent call permit".to_string(),
750                            correlation_id: Some(ctx.correlation_id.to_string()),
751                            source: None,
752                        }
753                    })?)
754                }
755                None => {
756                    // No semaphore found (shouldn't happen, but fail gracefully)
757                    warn!(
758                        correlation_id = %ctx.correlation_id,
759                        agent_id = %agent.id(),
760                        "No semaphore found for agent, proceeding without queue isolation"
761                    );
762                    None
763                }
764            };
765
766            // Check circuit breaker
767            if !agent.circuit_breaker().is_closed() {
768                warn!(
769                    agent_id = %agent.id(),
770                    correlation_id = %ctx.correlation_id,
771                    failure_mode = ?agent.failure_mode(),
772                    "Circuit breaker open, skipping agent"
773                );
774
775                // Handle based on failure mode
776                if agent.failure_mode() == FailureMode::Closed {
777                    debug!(
778                        correlation_id = %ctx.correlation_id,
779                        agent_id = %agent.id(),
780                        "Blocking request due to circuit breaker (fail-closed mode)"
781                    );
782                    return Ok(AgentDecision::block(503, "Service unavailable"));
783                }
784                continue;
785            }
786
787            // Call agent with timeout (using pingora-timeout for efficiency)
788            let start = Instant::now();
789            let timeout_duration = Duration::from_millis(agent.timeout_ms());
790
791            trace!(
792                correlation_id = %ctx.correlation_id,
793                agent_id = %agent.id(),
794                timeout_ms = agent.timeout_ms(),
795                "Calling agent"
796            );
797
798            match timeout(timeout_duration, agent.call_event(event_type, event)).await {
799                Ok(Ok(response)) => {
800                    let duration = start.elapsed();
801                    agent.record_success(duration).await;
802
803                    trace!(
804                        correlation_id = %ctx.correlation_id,
805                        agent_id = %agent.id(),
806                        duration_ms = duration.as_millis(),
807                        decision = ?response,
808                        "Agent call succeeded"
809                    );
810
811                    // Merge response into combined decision
812                    combined_decision.merge(response.into());
813
814                    // If decision is to block/redirect/challenge, stop processing
815                    if !combined_decision.is_allow() {
816                        debug!(
817                            correlation_id = %ctx.correlation_id,
818                            agent_id = %agent.id(),
819                            decision = ?combined_decision,
820                            "Agent returned blocking decision, stopping agent chain"
821                        );
822                        break;
823                    }
824                }
825                Ok(Err(e)) => {
826                    agent.record_failure().await;
827                    error!(
828                        agent_id = %agent.id(),
829                        correlation_id = %ctx.correlation_id,
830                        error = %e,
831                        duration_ms = start.elapsed().as_millis(),
832                        failure_mode = ?agent.failure_mode(),
833                        "Agent call failed"
834                    );
835
836                    if agent.failure_mode() == FailureMode::Closed {
837                        return Err(e);
838                    }
839                }
840                Err(_) => {
841                    agent.record_timeout().await;
842                    warn!(
843                        agent_id = %agent.id(),
844                        correlation_id = %ctx.correlation_id,
845                        timeout_ms = agent.timeout_ms(),
846                        failure_mode = ?agent.failure_mode(),
847                        "Agent call timed out"
848                    );
849
850                    if agent.failure_mode() == FailureMode::Closed {
851                        debug!(
852                            correlation_id = %ctx.correlation_id,
853                            agent_id = %agent.id(),
854                            "Blocking request due to timeout (fail-closed mode)"
855                        );
856                        return Ok(AgentDecision::block(504, "Gateway timeout"));
857                    }
858                }
859            }
860        }
861
862        trace!(
863            correlation_id = %ctx.correlation_id,
864            decision = ?combined_decision,
865            agents_processed = relevant_agents.len(),
866            "Agent event processing completed"
867        );
868
869        Ok(combined_decision)
870    }
871
872    /// Process an event through relevant agents with per-filter failure modes.
873    ///
874    /// This is the preferred method for processing events as it respects the
875    /// failure mode configured on each filter, not just the agent's default.
876    async fn process_event_with_failure_modes<T: serde::Serialize>(
877        &self,
878        event_type: EventType,
879        event: &T,
880        route_agents: &[(String, FailureMode)],
881        ctx: &AgentCallContext,
882    ) -> SentinelResult<AgentDecision> {
883        trace!(
884            correlation_id = %ctx.correlation_id,
885            event_type = ?event_type,
886            route_agents = ?route_agents.iter().map(|(id, _)| id).collect::<Vec<_>>(),
887            "Starting agent event processing with failure modes"
888        );
889
890        // Get relevant agents for this route and event type, preserving failure modes
891        let agents = self.agents.read().await;
892        let relevant_agents: Vec<_> = route_agents
893            .iter()
894            .filter_map(|(id, failure_mode)| {
895                agents.get(id).map(|agent| (agent, *failure_mode))
896            })
897            .filter(|(agent, _)| agent.handles_event(event_type))
898            .collect();
899
900        if relevant_agents.is_empty() {
901            trace!(
902                correlation_id = %ctx.correlation_id,
903                event_type = ?event_type,
904                "No relevant agents for event, allowing request"
905            );
906            return Ok(AgentDecision::default_allow());
907        }
908
909        debug!(
910            correlation_id = %ctx.correlation_id,
911            event_type = ?event_type,
912            agent_count = relevant_agents.len(),
913            agent_ids = ?relevant_agents.iter().map(|(a, _)| a.id()).collect::<Vec<_>>(),
914            "Processing event through agents"
915        );
916
917        // Process through each agent sequentially
918        let mut combined_decision = AgentDecision::default_allow();
919
920        for (agent_index, (agent, filter_failure_mode)) in relevant_agents.iter().enumerate() {
921            trace!(
922                correlation_id = %ctx.correlation_id,
923                agent_id = %agent.id(),
924                agent_index = agent_index,
925                event_type = ?event_type,
926                filter_failure_mode = ?filter_failure_mode,
927                "Processing event through agent with filter failure mode"
928            );
929
930            // Acquire per-agent semaphore permit (queue isolation)
931            let semaphores = self.agent_semaphores.read().await;
932            let agent_semaphore = semaphores.get(agent.id()).cloned();
933            drop(semaphores); // Release lock before awaiting
934
935            let _permit = if let Some(semaphore) = agent_semaphore {
936                trace!(
937                    correlation_id = %ctx.correlation_id,
938                    agent_id = %agent.id(),
939                    "Acquiring per-agent semaphore permit"
940                );
941                Some(semaphore.acquire_owned().await.map_err(|_| {
942                    error!(
943                        correlation_id = %ctx.correlation_id,
944                        agent_id = %agent.id(),
945                        "Failed to acquire agent call semaphore permit"
946                    );
947                    SentinelError::Internal {
948                        message: "Failed to acquire agent call permit".to_string(),
949                        correlation_id: Some(ctx.correlation_id.to_string()),
950                        source: None,
951                    }
952                })?)
953            } else {
954                // No semaphore found (shouldn't happen, but fail gracefully)
955                warn!(
956                    correlation_id = %ctx.correlation_id,
957                    agent_id = %agent.id(),
958                    "No semaphore found for agent, proceeding without queue isolation"
959                );
960                None
961            };
962
963            // Check circuit breaker
964            if !agent.circuit_breaker().is_closed() {
965                warn!(
966                    agent_id = %agent.id(),
967                    correlation_id = %ctx.correlation_id,
968                    filter_failure_mode = ?filter_failure_mode,
969                    "Circuit breaker open, skipping agent"
970                );
971
972                // Handle based on filter's failure mode (not agent's default)
973                if *filter_failure_mode == FailureMode::Closed {
974                    debug!(
975                        correlation_id = %ctx.correlation_id,
976                        agent_id = %agent.id(),
977                        "Blocking request due to circuit breaker (filter fail-closed mode)"
978                    );
979                    return Ok(AgentDecision::block(503, "Service unavailable"));
980                }
981                // Fail-open: continue to next agent
982                continue;
983            }
984
985            // Call agent with timeout
986            let start = Instant::now();
987            let timeout_duration = Duration::from_millis(agent.timeout_ms());
988
989            trace!(
990                correlation_id = %ctx.correlation_id,
991                agent_id = %agent.id(),
992                timeout_ms = agent.timeout_ms(),
993                "Calling agent"
994            );
995
996            match timeout(timeout_duration, agent.call_event(event_type, event)).await {
997                Ok(Ok(response)) => {
998                    let duration = start.elapsed();
999                    agent.record_success(duration).await;
1000
1001                    trace!(
1002                        correlation_id = %ctx.correlation_id,
1003                        agent_id = %agent.id(),
1004                        duration_ms = duration.as_millis(),
1005                        decision = ?response,
1006                        "Agent call succeeded"
1007                    );
1008
1009                    // Merge response into combined decision
1010                    combined_decision.merge(response.into());
1011
1012                    // If decision is to block/redirect/challenge, stop processing
1013                    if !combined_decision.is_allow() {
1014                        debug!(
1015                            correlation_id = %ctx.correlation_id,
1016                            agent_id = %agent.id(),
1017                            decision = ?combined_decision,
1018                            "Agent returned blocking decision, stopping agent chain"
1019                        );
1020                        break;
1021                    }
1022                }
1023                Ok(Err(e)) => {
1024                    agent.record_failure().await;
1025                    error!(
1026                        agent_id = %agent.id(),
1027                        correlation_id = %ctx.correlation_id,
1028                        error = %e,
1029                        duration_ms = start.elapsed().as_millis(),
1030                        filter_failure_mode = ?filter_failure_mode,
1031                        "Agent call failed"
1032                    );
1033
1034                    // Use filter's failure mode, not agent's default
1035                    if *filter_failure_mode == FailureMode::Closed {
1036                        debug!(
1037                            correlation_id = %ctx.correlation_id,
1038                            agent_id = %agent.id(),
1039                            "Blocking request due to agent failure (filter fail-closed mode)"
1040                        );
1041                        return Ok(AgentDecision::block(503, "Agent unavailable"));
1042                    }
1043                    // Fail-open: continue to next agent (or proceed without this agent)
1044                    debug!(
1045                        correlation_id = %ctx.correlation_id,
1046                        agent_id = %agent.id(),
1047                        "Continuing despite agent failure (filter fail-open mode)"
1048                    );
1049                }
1050                Err(_) => {
1051                    agent.record_timeout().await;
1052                    warn!(
1053                        agent_id = %agent.id(),
1054                        correlation_id = %ctx.correlation_id,
1055                        timeout_ms = agent.timeout_ms(),
1056                        filter_failure_mode = ?filter_failure_mode,
1057                        "Agent call timed out"
1058                    );
1059
1060                    // Use filter's failure mode, not agent's default
1061                    if *filter_failure_mode == FailureMode::Closed {
1062                        debug!(
1063                            correlation_id = %ctx.correlation_id,
1064                            agent_id = %agent.id(),
1065                            "Blocking request due to timeout (filter fail-closed mode)"
1066                        );
1067                        return Ok(AgentDecision::block(504, "Gateway timeout"));
1068                    }
1069                    // Fail-open: continue to next agent
1070                    debug!(
1071                        correlation_id = %ctx.correlation_id,
1072                        agent_id = %agent.id(),
1073                        "Continuing despite timeout (filter fail-open mode)"
1074                    );
1075                }
1076            }
1077        }
1078
1079        trace!(
1080            correlation_id = %ctx.correlation_id,
1081            decision = ?combined_decision,
1082            agents_processed = relevant_agents.len(),
1083            "Agent event processing with failure modes completed"
1084        );
1085
1086        Ok(combined_decision)
1087    }
1088
1089    /// Process an event through relevant agents in parallel.
1090    ///
1091    /// This method executes all agent calls concurrently using `join_all`, which
1092    /// significantly improves latency when multiple agents are configured. The
1093    /// tradeoff is that if one agent blocks, other agents may still complete
1094    /// their work (slight resource waste in blocking scenarios).
1095    ///
1096    /// # Performance
1097    ///
1098    /// For N agents with latency L each:
1099    /// - Sequential: O(N * L)
1100    /// - Parallel: O(L) (assuming sufficient concurrency)
1101    ///
1102    /// This is the preferred method for most use cases.
1103    async fn process_event_parallel<T: serde::Serialize + Sync>(
1104        &self,
1105        event_type: EventType,
1106        event: &T,
1107        route_agents: &[(String, FailureMode)],
1108        ctx: &AgentCallContext,
1109    ) -> SentinelResult<AgentDecision> {
1110        trace!(
1111            correlation_id = %ctx.correlation_id,
1112            event_type = ?event_type,
1113            route_agents = ?route_agents.iter().map(|(id, _)| id).collect::<Vec<_>>(),
1114            "Starting parallel agent event processing"
1115        );
1116
1117        // Get relevant agents for this route and event type
1118        let agents = self.agents.read().await;
1119        let semaphores = self.agent_semaphores.read().await;
1120
1121        // Collect agent info upfront to minimize lock duration
1122        let agent_info: Vec<_> = route_agents
1123            .iter()
1124            .filter_map(|(id, failure_mode)| {
1125                let agent = agents.get(id)?;
1126                if !agent.handles_event(event_type) {
1127                    return None;
1128                }
1129                let semaphore = semaphores.get(id).cloned();
1130                Some((Arc::clone(agent), *failure_mode, semaphore))
1131            })
1132            .collect();
1133
1134        // Release locks early
1135        drop(agents);
1136        drop(semaphores);
1137
1138        if agent_info.is_empty() {
1139            trace!(
1140                correlation_id = %ctx.correlation_id,
1141                event_type = ?event_type,
1142                "No relevant agents for event, allowing request"
1143            );
1144            return Ok(AgentDecision::default_allow());
1145        }
1146
1147        debug!(
1148            correlation_id = %ctx.correlation_id,
1149            event_type = ?event_type,
1150            agent_count = agent_info.len(),
1151            agent_ids = ?agent_info.iter().map(|(a, _, _)| a.id()).collect::<Vec<_>>(),
1152            "Processing event through agents in parallel"
1153        );
1154
1155        // Spawn all agent calls concurrently
1156        let futures: Vec<_> = agent_info
1157            .iter()
1158            .map(|(agent, filter_failure_mode, semaphore)| {
1159                let agent = Arc::clone(agent);
1160                let filter_failure_mode = *filter_failure_mode;
1161                let semaphore = semaphore.clone();
1162                let correlation_id = ctx.correlation_id.clone();
1163
1164                async move {
1165                    // Acquire per-agent semaphore permit (queue isolation)
1166                    let _permit = if let Some(sem) = semaphore {
1167                        match sem.acquire_owned().await {
1168                            Ok(permit) => Some(permit),
1169                            Err(_) => {
1170                                error!(
1171                                    correlation_id = %correlation_id,
1172                                    agent_id = %agent.id(),
1173                                    "Failed to acquire agent semaphore permit"
1174                                );
1175                                return Err((
1176                                    agent.id().to_string(),
1177                                    filter_failure_mode,
1178                                    "Failed to acquire permit".to_string(),
1179                                ));
1180                            }
1181                        }
1182                    } else {
1183                        None
1184                    };
1185
1186                    // Check circuit breaker
1187                    if !agent.circuit_breaker().is_closed() {
1188                        warn!(
1189                            agent_id = %agent.id(),
1190                            correlation_id = %correlation_id,
1191                            filter_failure_mode = ?filter_failure_mode,
1192                            "Circuit breaker open, skipping agent"
1193                        );
1194                        return Err((
1195                            agent.id().to_string(),
1196                            filter_failure_mode,
1197                            "Circuit breaker open".to_string(),
1198                        ));
1199                    }
1200
1201                    // Call agent with timeout
1202                    let start = Instant::now();
1203                    let timeout_duration = Duration::from_millis(agent.timeout_ms());
1204
1205                    match timeout(timeout_duration, agent.call_event(event_type, event)).await {
1206                        Ok(Ok(response)) => {
1207                            let duration = start.elapsed();
1208                            agent.record_success(duration).await;
1209                            trace!(
1210                                correlation_id = %correlation_id,
1211                                agent_id = %agent.id(),
1212                                duration_ms = duration.as_millis(),
1213                                "Parallel agent call succeeded"
1214                            );
1215                            Ok((agent.id().to_string(), response))
1216                        }
1217                        Ok(Err(e)) => {
1218                            agent.record_failure().await;
1219                            error!(
1220                                agent_id = %agent.id(),
1221                                correlation_id = %correlation_id,
1222                                error = %e,
1223                                duration_ms = start.elapsed().as_millis(),
1224                                filter_failure_mode = ?filter_failure_mode,
1225                                "Parallel agent call failed"
1226                            );
1227                            Err((
1228                                agent.id().to_string(),
1229                                filter_failure_mode,
1230                                format!("Agent error: {}", e),
1231                            ))
1232                        }
1233                        Err(_) => {
1234                            agent.record_timeout().await;
1235                            warn!(
1236                                agent_id = %agent.id(),
1237                                correlation_id = %correlation_id,
1238                                timeout_ms = agent.timeout_ms(),
1239                                filter_failure_mode = ?filter_failure_mode,
1240                                "Parallel agent call timed out"
1241                            );
1242                            Err((
1243                                agent.id().to_string(),
1244                                filter_failure_mode,
1245                                "Timeout".to_string(),
1246                            ))
1247                        }
1248                    }
1249                }
1250            })
1251            .collect();
1252
1253        // Execute all agent calls in parallel
1254        let results = join_all(futures).await;
1255
1256        // Process results and merge decisions
1257        let mut combined_decision = AgentDecision::default_allow();
1258        let mut blocking_error: Option<AgentDecision> = None;
1259
1260        for result in results {
1261            match result {
1262                Ok((agent_id, response)) => {
1263                    let decision: AgentDecision = response.into();
1264
1265                    // Check for blocking decision
1266                    if !decision.is_allow() {
1267                        debug!(
1268                            correlation_id = %ctx.correlation_id,
1269                            agent_id = %agent_id,
1270                            decision = ?decision,
1271                            "Agent returned blocking decision"
1272                        );
1273                        // Return first blocking decision immediately
1274                        return Ok(decision);
1275                    }
1276
1277                    combined_decision.merge(decision);
1278                }
1279                Err((agent_id, failure_mode, reason)) => {
1280                    // Handle failure based on filter's failure mode
1281                    if failure_mode == FailureMode::Closed && blocking_error.is_none() {
1282                        debug!(
1283                            correlation_id = %ctx.correlation_id,
1284                            agent_id = %agent_id,
1285                            reason = %reason,
1286                            "Agent failure in fail-closed mode"
1287                        );
1288                        // Store blocking error but continue processing other results
1289                        // in case another agent returned a more specific block
1290                        let status = if reason.contains("Timeout") { 504 } else { 503 };
1291                        let message = if reason.contains("Timeout") {
1292                            "Gateway timeout"
1293                        } else {
1294                            "Service unavailable"
1295                        };
1296                        blocking_error = Some(AgentDecision::block(status, message));
1297                    } else {
1298                        // Fail-open: log and continue
1299                        debug!(
1300                            correlation_id = %ctx.correlation_id,
1301                            agent_id = %agent_id,
1302                            reason = %reason,
1303                            "Agent failure in fail-open mode, continuing"
1304                        );
1305                    }
1306                }
1307            }
1308        }
1309
1310        // If we have a fail-closed error and no explicit block, return the error
1311        if let Some(error_decision) = blocking_error {
1312            return Ok(error_decision);
1313        }
1314
1315        trace!(
1316            correlation_id = %ctx.correlation_id,
1317            decision = ?combined_decision,
1318            agents_processed = agent_info.len(),
1319            "Parallel agent event processing completed"
1320        );
1321
1322        Ok(combined_decision)
1323    }
1324
1325    /// Initialize agent connections.
1326    pub async fn initialize(&self) -> SentinelResult<()> {
1327        let agents = self.agents.read().await;
1328
1329        info!(agent_count = agents.len(), "Initializing agent connections");
1330
1331        let mut initialized_count = 0;
1332        let mut failed_count = 0;
1333
1334        for (id, agent) in agents.iter() {
1335            debug!(agent_id = %id, "Initializing agent connection");
1336            if let Err(e) = agent.initialize().await {
1337                error!(
1338                    agent_id = %id,
1339                    error = %e,
1340                    "Failed to initialize agent"
1341                );
1342                failed_count += 1;
1343                // Continue with other agents
1344            } else {
1345                trace!(agent_id = %id, "Agent initialized successfully");
1346                initialized_count += 1;
1347            }
1348        }
1349
1350        info!(
1351            initialized = initialized_count,
1352            failed = failed_count,
1353            total = agents.len(),
1354            "Agent initialization complete"
1355        );
1356
1357        Ok(())
1358    }
1359
1360    /// Shutdown all agents.
1361    pub async fn shutdown(&self) {
1362        let agents = self.agents.read().await;
1363
1364        info!(agent_count = agents.len(), "Shutting down agent manager");
1365
1366        for (id, agent) in agents.iter() {
1367            debug!(agent_id = %id, "Shutting down agent");
1368            agent.shutdown().await;
1369            trace!(agent_id = %id, "Agent shutdown complete");
1370        }
1371
1372        info!("Agent manager shutdown complete");
1373    }
1374
1375    /// Get agent metrics.
1376    pub fn metrics(&self) -> &AgentMetrics {
1377        &self.metrics
1378    }
1379
1380    /// Get agent IDs that handle a specific event type.
1381    ///
1382    /// This is useful for pre-filtering agents before making calls,
1383    /// e.g., to check if any agents handle WebSocket frames.
1384    pub fn get_agents_for_event(&self, event_type: EventType) -> Vec<String> {
1385        // Use try_read to avoid blocking - return empty if lock is held
1386        // This is acceptable since this is only used for informational purposes
1387        if let Ok(agents) = self.agents.try_read() {
1388            agents
1389                .values()
1390                .filter(|agent| agent.handles_event(event_type))
1391                .map(|agent| agent.id().to_string())
1392                .collect()
1393        } else {
1394            Vec::new()
1395        }
1396    }
1397
1398    /// Get pool metrics collectors from all v2 agents.
1399    ///
1400    /// Returns a vector of (agent_id, MetricsCollector) pairs for all v2 agents.
1401    /// These can be registered with the MetricsManager to include agent pool
1402    /// metrics in the /metrics endpoint output.
1403    pub async fn get_v2_pool_metrics(&self) -> Vec<(String, Arc<MetricsCollector>)> {
1404        let agents = self.agents.read().await;
1405        agents
1406            .iter()
1407            .filter_map(|(id, agent)| {
1408                if let UnifiedAgent::V2(v2_agent) = agent.as_ref() {
1409                    Some((id.clone(), v2_agent.pool_metrics_collector_arc()))
1410                } else {
1411                    None
1412                }
1413            })
1414            .collect()
1415    }
1416
1417    /// Export prometheus metrics from all v2 agent pools.
1418    ///
1419    /// Returns the combined prometheus-formatted metrics from all v2 agent pools.
1420    pub async fn export_v2_pool_metrics(&self) -> String {
1421        let agents = self.agents.read().await;
1422        let mut output = String::new();
1423
1424        for (id, agent) in agents.iter() {
1425            if let UnifiedAgent::V2(v2_agent) = agent.as_ref() {
1426                let pool_metrics = v2_agent.export_prometheus();
1427                if !pool_metrics.is_empty() {
1428                    output.push_str(&format!("\n# Agent pool metrics: {}\n", id));
1429                    output.push_str(&pool_metrics);
1430                }
1431            }
1432        }
1433
1434        output
1435    }
1436
1437    /// Get a v2 agent's metrics collector by ID.
1438    ///
1439    /// Returns None if the agent doesn't exist or is not a v2 agent.
1440    pub async fn get_v2_metrics_collector(&self, agent_id: &str) -> Option<Arc<MetricsCollector>> {
1441        let agents = self.agents.read().await;
1442        if let Some(agent) = agents.get(agent_id) {
1443            if let UnifiedAgent::V2(v2_agent) = agent.as_ref() {
1444                Some(v2_agent.pool_metrics_collector_arc())
1445            } else {
1446                None
1447            }
1448        } else {
1449            None
1450        }
1451    }
1452}