sentinel_proxy/agents/
manager.rs

1//! Agent manager for coordinating external processing agents.
2
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use base64::{engine::general_purpose::STANDARD, Engine as _};
8use pingora_timeout::timeout;
9use sentinel_agent_protocol::{
10    AgentResponse, EventType, RequestBodyChunkEvent, RequestHeadersEvent, ResponseBodyChunkEvent,
11    ResponseHeadersEvent, WebSocketFrameEvent,
12};
13use sentinel_common::{
14    errors::{SentinelError, SentinelResult},
15    types::CircuitBreakerConfig,
16    CircuitBreaker,
17};
18use sentinel_config::{AgentConfig, FailureMode};
19use tokio::sync::{RwLock, Semaphore};
20use tracing::{debug, error, info, trace, warn};
21
22use super::agent::Agent;
23use super::context::AgentCallContext;
24use super::decision::AgentDecision;
25use super::metrics::AgentMetrics;
26use super::pool::AgentConnectionPool;
27
28/// Agent manager handling all external agents.
29pub struct AgentManager {
30    /// Configured agents
31    agents: Arc<RwLock<HashMap<String, Arc<Agent>>>>,
32    /// Connection pools for agents
33    connection_pools: Arc<RwLock<HashMap<String, Arc<AgentConnectionPool>>>>,
34    /// Circuit breakers per agent
35    circuit_breakers: Arc<RwLock<HashMap<String, Arc<CircuitBreaker>>>>,
36    /// Global agent metrics
37    metrics: Arc<AgentMetrics>,
38    /// Per-agent semaphores for queue isolation (prevents noisy neighbor problem)
39    agent_semaphores: Arc<RwLock<HashMap<String, Arc<Semaphore>>>>,
40}
41
42impl AgentManager {
43    /// Create new agent manager.
44    ///
45    /// Each agent gets its own semaphore for queue isolation, preventing a slow
46    /// agent from affecting other agents (noisy neighbor problem). The concurrency
47    /// limit is configured per-agent via `max_concurrent_calls` in the agent config.
48    pub async fn new(agents: Vec<AgentConfig>) -> SentinelResult<Self> {
49        info!(agent_count = agents.len(), "Creating agent manager");
50
51        let mut agent_map = HashMap::new();
52        let mut pools = HashMap::new();
53        let mut breakers = HashMap::new();
54        let mut semaphores = HashMap::new();
55
56        for config in agents {
57            debug!(
58                agent_id = %config.id,
59                transport = ?config.transport,
60                timeout_ms = config.timeout_ms,
61                failure_mode = ?config.failure_mode,
62                max_concurrent_calls = config.max_concurrent_calls,
63                "Configuring agent"
64            );
65
66            let pool = Arc::new(AgentConnectionPool::new(
67                10, // max connections
68                2,  // min idle
69                5,  // max idle
70                Duration::from_secs(60),
71            ));
72
73            let circuit_breaker = Arc::new(CircuitBreaker::new(
74                config
75                    .circuit_breaker
76                    .clone()
77                    .unwrap_or_else(CircuitBreakerConfig::default),
78            ));
79
80            // Create per-agent semaphore for queue isolation
81            let semaphore = Arc::new(Semaphore::new(config.max_concurrent_calls));
82
83            trace!(
84                agent_id = %config.id,
85                max_concurrent_calls = config.max_concurrent_calls,
86                "Creating agent instance with isolated queue"
87            );
88
89            let agent = Arc::new(Agent::new(
90                config.clone(),
91                Arc::clone(&pool),
92                Arc::clone(&circuit_breaker),
93            ));
94
95            agent_map.insert(config.id.clone(), agent);
96            pools.insert(config.id.clone(), pool);
97            breakers.insert(config.id.clone(), circuit_breaker);
98            semaphores.insert(config.id.clone(), semaphore);
99
100            debug!(
101                agent_id = %config.id,
102                "Agent configured successfully"
103            );
104        }
105
106        info!(
107            configured_agents = agent_map.len(),
108            "Agent manager created successfully with per-agent queue isolation"
109        );
110
111        Ok(Self {
112            agents: Arc::new(RwLock::new(agent_map)),
113            connection_pools: Arc::new(RwLock::new(pools)),
114            circuit_breakers: Arc::new(RwLock::new(breakers)),
115            metrics: Arc::new(AgentMetrics::default()),
116            agent_semaphores: Arc::new(RwLock::new(semaphores)),
117        })
118    }
119
120    /// Process request headers through agents.
121    ///
122    /// # Arguments
123    /// * `ctx` - Agent call context with correlation ID and metadata
124    /// * `headers` - Request headers to send to agents
125    /// * `route_agents` - List of (agent_id, failure_mode) tuples from filter chain
126    pub async fn process_request_headers(
127        &self,
128        ctx: &AgentCallContext,
129        headers: &HashMap<String, Vec<String>>,
130        route_agents: &[(String, FailureMode)],
131    ) -> SentinelResult<AgentDecision> {
132        let event = RequestHeadersEvent {
133            metadata: ctx.metadata.clone(),
134            method: headers
135                .get(":method")
136                .and_then(|v| v.first())
137                .unwrap_or(&"GET".to_string())
138                .clone(),
139            uri: headers
140                .get(":path")
141                .and_then(|v| v.first())
142                .unwrap_or(&"/".to_string())
143                .clone(),
144            headers: headers.clone(),
145        };
146
147        self.process_event_with_failure_modes(EventType::RequestHeaders, &event, route_agents, ctx)
148            .await
149    }
150
151    /// Process request body chunk through agents.
152    pub async fn process_request_body(
153        &self,
154        ctx: &AgentCallContext,
155        data: &[u8],
156        is_last: bool,
157        route_agents: &[String],
158    ) -> SentinelResult<AgentDecision> {
159        // Check body size limits
160        let max_size = 1024 * 1024; // 1MB default
161        if data.len() > max_size {
162            warn!(
163                correlation_id = %ctx.correlation_id,
164                size = data.len(),
165                "Request body exceeds agent inspection limit"
166            );
167            return Ok(AgentDecision::default_allow());
168        }
169
170        let event = RequestBodyChunkEvent {
171            correlation_id: ctx.correlation_id.to_string(),
172            data: STANDARD.encode(data),
173            is_last,
174            total_size: ctx.request_body.as_ref().map(|b| b.len()),
175            chunk_index: 0, // Buffer mode sends entire body as single chunk
176            bytes_received: data.len(),
177        };
178
179        self.process_event(EventType::RequestBodyChunk, &event, route_agents, ctx)
180            .await
181    }
182
183    /// Process a single request body chunk through agents (streaming mode).
184    ///
185    /// Unlike `process_request_body` which is used for buffered mode, this method
186    /// is designed for streaming where chunks are sent individually as they arrive.
187    pub async fn process_request_body_streaming(
188        &self,
189        ctx: &AgentCallContext,
190        data: &[u8],
191        is_last: bool,
192        chunk_index: u32,
193        bytes_received: usize,
194        total_size: Option<usize>,
195        route_agents: &[String],
196    ) -> SentinelResult<AgentDecision> {
197        trace!(
198            correlation_id = %ctx.correlation_id,
199            chunk_index = chunk_index,
200            chunk_size = data.len(),
201            bytes_received = bytes_received,
202            is_last = is_last,
203            "Processing streaming request body chunk"
204        );
205
206        let event = RequestBodyChunkEvent {
207            correlation_id: ctx.correlation_id.to_string(),
208            data: STANDARD.encode(data),
209            is_last,
210            total_size,
211            chunk_index,
212            bytes_received,
213        };
214
215        self.process_event(EventType::RequestBodyChunk, &event, route_agents, ctx)
216            .await
217    }
218
219    /// Process a single response body chunk through agents (streaming mode).
220    pub async fn process_response_body_streaming(
221        &self,
222        ctx: &AgentCallContext,
223        data: &[u8],
224        is_last: bool,
225        chunk_index: u32,
226        bytes_sent: usize,
227        total_size: Option<usize>,
228        route_agents: &[String],
229    ) -> SentinelResult<AgentDecision> {
230        trace!(
231            correlation_id = %ctx.correlation_id,
232            chunk_index = chunk_index,
233            chunk_size = data.len(),
234            bytes_sent = bytes_sent,
235            is_last = is_last,
236            "Processing streaming response body chunk"
237        );
238
239        let event = ResponseBodyChunkEvent {
240            correlation_id: ctx.correlation_id.to_string(),
241            data: STANDARD.encode(data),
242            is_last,
243            total_size,
244            chunk_index,
245            bytes_sent,
246        };
247
248        self.process_event(EventType::ResponseBodyChunk, &event, route_agents, ctx)
249            .await
250    }
251
252    /// Process response headers through agents.
253    pub async fn process_response_headers(
254        &self,
255        ctx: &AgentCallContext,
256        status: u16,
257        headers: &HashMap<String, Vec<String>>,
258        route_agents: &[String],
259    ) -> SentinelResult<AgentDecision> {
260        let event = ResponseHeadersEvent {
261            correlation_id: ctx.correlation_id.to_string(),
262            status,
263            headers: headers.clone(),
264        };
265
266        self.process_event(EventType::ResponseHeaders, &event, route_agents, ctx)
267            .await
268    }
269
270    /// Process a WebSocket frame through agents.
271    ///
272    /// This is used for WebSocket frame inspection after an upgrade.
273    /// Returns the agent response directly to allow the caller to access
274    /// the websocket_decision field.
275    pub async fn process_websocket_frame(
276        &self,
277        route_id: &str,
278        event: WebSocketFrameEvent,
279    ) -> SentinelResult<AgentResponse> {
280        trace!(
281            correlation_id = %event.correlation_id,
282            route_id = %route_id,
283            frame_index = event.frame_index,
284            opcode = %event.opcode,
285            "Processing WebSocket frame through agents"
286        );
287
288        // Get relevant agents for this route that handle WebSocket frames
289        let agents = self.agents.read().await;
290        let relevant_agents: Vec<_> = agents
291            .values()
292            .filter(|agent| agent.handles_event(EventType::WebSocketFrame))
293            .collect();
294
295        if relevant_agents.is_empty() {
296            trace!(
297                correlation_id = %event.correlation_id,
298                "No agents handle WebSocket frames, allowing"
299            );
300            return Ok(AgentResponse::websocket_allow());
301        }
302
303        debug!(
304            correlation_id = %event.correlation_id,
305            route_id = %route_id,
306            agent_count = relevant_agents.len(),
307            "Processing WebSocket frame through agents"
308        );
309
310        // Process through each agent sequentially
311        for agent in relevant_agents {
312            // Check circuit breaker
313            if !agent.circuit_breaker().is_closed().await {
314                warn!(
315                    agent_id = %agent.id(),
316                    correlation_id = %event.correlation_id,
317                    failure_mode = ?agent.failure_mode(),
318                    "Circuit breaker open, skipping agent for WebSocket frame"
319                );
320
321                if agent.failure_mode() == FailureMode::Closed {
322                    debug!(
323                        correlation_id = %event.correlation_id,
324                        agent_id = %agent.id(),
325                        "Closing WebSocket due to circuit breaker (fail-closed mode)"
326                    );
327                    return Ok(AgentResponse::websocket_close(
328                        1011,
329                        "Service unavailable".to_string(),
330                    ));
331                }
332                continue;
333            }
334
335            // Call agent with timeout
336            let start = Instant::now();
337            let timeout_duration = Duration::from_millis(agent.timeout_ms());
338
339            match timeout(
340                timeout_duration,
341                agent.call_event(EventType::WebSocketFrame, &event),
342            )
343            .await
344            {
345                Ok(Ok(response)) => {
346                    let duration = start.elapsed();
347                    agent.record_success(duration).await;
348
349                    trace!(
350                        correlation_id = %event.correlation_id,
351                        agent_id = %agent.id(),
352                        duration_ms = duration.as_millis(),
353                        "WebSocket frame agent call succeeded"
354                    );
355
356                    // If agent returned a WebSocket decision that's not Allow, return immediately
357                    if let Some(ref ws_decision) = response.websocket_decision {
358                        if !matches!(
359                            ws_decision,
360                            sentinel_agent_protocol::WebSocketDecision::Allow
361                        ) {
362                            debug!(
363                                correlation_id = %event.correlation_id,
364                                agent_id = %agent.id(),
365                                decision = ?ws_decision,
366                                "Agent returned non-allow WebSocket decision"
367                            );
368                            return Ok(response);
369                        }
370                    }
371                }
372                Ok(Err(e)) => {
373                    agent.record_failure().await;
374                    error!(
375                        agent_id = %agent.id(),
376                        correlation_id = %event.correlation_id,
377                        error = %e,
378                        duration_ms = start.elapsed().as_millis(),
379                        failure_mode = ?agent.failure_mode(),
380                        "WebSocket frame agent call failed"
381                    );
382
383                    if agent.failure_mode() == FailureMode::Closed {
384                        return Ok(AgentResponse::websocket_close(
385                            1011,
386                            "Agent error".to_string(),
387                        ));
388                    }
389                }
390                Err(_) => {
391                    agent.record_timeout().await;
392                    warn!(
393                        agent_id = %agent.id(),
394                        correlation_id = %event.correlation_id,
395                        timeout_ms = agent.timeout_ms(),
396                        failure_mode = ?agent.failure_mode(),
397                        "WebSocket frame agent call timed out"
398                    );
399
400                    if agent.failure_mode() == FailureMode::Closed {
401                        return Ok(AgentResponse::websocket_close(
402                            1011,
403                            "Gateway timeout".to_string(),
404                        ));
405                    }
406                }
407            }
408        }
409
410        // All agents allowed the frame
411        Ok(AgentResponse::websocket_allow())
412    }
413
414    /// Process an event through relevant agents.
415    async fn process_event<T: serde::Serialize>(
416        &self,
417        event_type: EventType,
418        event: &T,
419        route_agents: &[String],
420        ctx: &AgentCallContext,
421    ) -> SentinelResult<AgentDecision> {
422        trace!(
423            correlation_id = %ctx.correlation_id,
424            event_type = ?event_type,
425            route_agents = ?route_agents,
426            "Starting agent event processing"
427        );
428
429        // Get relevant agents for this route and event type
430        let agents = self.agents.read().await;
431        let relevant_agents: Vec<_> = route_agents
432            .iter()
433            .filter_map(|id| agents.get(id))
434            .filter(|agent| agent.handles_event(event_type))
435            .collect();
436
437        if relevant_agents.is_empty() {
438            trace!(
439                correlation_id = %ctx.correlation_id,
440                event_type = ?event_type,
441                "No relevant agents for event, allowing request"
442            );
443            return Ok(AgentDecision::default_allow());
444        }
445
446        debug!(
447            correlation_id = %ctx.correlation_id,
448            event_type = ?event_type,
449            agent_count = relevant_agents.len(),
450            agent_ids = ?relevant_agents.iter().map(|a| a.id()).collect::<Vec<_>>(),
451            "Processing event through agents"
452        );
453
454        // Process through each agent sequentially
455        let mut combined_decision = AgentDecision::default_allow();
456
457        for (agent_index, agent) in relevant_agents.iter().enumerate() {
458            trace!(
459                correlation_id = %ctx.correlation_id,
460                agent_id = %agent.id(),
461                agent_index = agent_index,
462                event_type = ?event_type,
463                "Processing event through agent"
464            );
465
466            // Acquire per-agent semaphore permit (queue isolation)
467            let semaphores = self.agent_semaphores.read().await;
468            let agent_semaphore = semaphores.get(agent.id()).cloned();
469            drop(semaphores); // Release lock before awaiting
470
471            let _permit = match agent_semaphore {
472                Some(semaphore) => {
473                    trace!(
474                        correlation_id = %ctx.correlation_id,
475                        agent_id = %agent.id(),
476                        "Acquiring per-agent semaphore permit"
477                    );
478                    Some(semaphore.acquire_owned().await.map_err(|_| {
479                        error!(
480                            correlation_id = %ctx.correlation_id,
481                            agent_id = %agent.id(),
482                            "Failed to acquire agent call semaphore permit"
483                        );
484                        SentinelError::Internal {
485                            message: "Failed to acquire agent call permit".to_string(),
486                            correlation_id: Some(ctx.correlation_id.to_string()),
487                            source: None,
488                        }
489                    })?)
490                }
491                None => {
492                    // No semaphore found (shouldn't happen, but fail gracefully)
493                    warn!(
494                        correlation_id = %ctx.correlation_id,
495                        agent_id = %agent.id(),
496                        "No semaphore found for agent, proceeding without queue isolation"
497                    );
498                    None
499                }
500            };
501
502            // Check circuit breaker
503            if !agent.circuit_breaker().is_closed().await {
504                warn!(
505                    agent_id = %agent.id(),
506                    correlation_id = %ctx.correlation_id,
507                    failure_mode = ?agent.failure_mode(),
508                    "Circuit breaker open, skipping agent"
509                );
510
511                // Handle based on failure mode
512                if agent.failure_mode() == FailureMode::Closed {
513                    debug!(
514                        correlation_id = %ctx.correlation_id,
515                        agent_id = %agent.id(),
516                        "Blocking request due to circuit breaker (fail-closed mode)"
517                    );
518                    return Ok(AgentDecision::block(503, "Service unavailable"));
519                }
520                continue;
521            }
522
523            // Call agent with timeout (using pingora-timeout for efficiency)
524            let start = Instant::now();
525            let timeout_duration = Duration::from_millis(agent.timeout_ms());
526
527            trace!(
528                correlation_id = %ctx.correlation_id,
529                agent_id = %agent.id(),
530                timeout_ms = agent.timeout_ms(),
531                "Calling agent"
532            );
533
534            match timeout(timeout_duration, agent.call_event(event_type, event)).await {
535                Ok(Ok(response)) => {
536                    let duration = start.elapsed();
537                    agent.record_success(duration).await;
538
539                    trace!(
540                        correlation_id = %ctx.correlation_id,
541                        agent_id = %agent.id(),
542                        duration_ms = duration.as_millis(),
543                        decision = ?response,
544                        "Agent call succeeded"
545                    );
546
547                    // Merge response into combined decision
548                    combined_decision.merge(response.into());
549
550                    // If decision is to block/redirect/challenge, stop processing
551                    if !combined_decision.is_allow() {
552                        debug!(
553                            correlation_id = %ctx.correlation_id,
554                            agent_id = %agent.id(),
555                            decision = ?combined_decision,
556                            "Agent returned blocking decision, stopping agent chain"
557                        );
558                        break;
559                    }
560                }
561                Ok(Err(e)) => {
562                    agent.record_failure().await;
563                    error!(
564                        agent_id = %agent.id(),
565                        correlation_id = %ctx.correlation_id,
566                        error = %e,
567                        duration_ms = start.elapsed().as_millis(),
568                        failure_mode = ?agent.failure_mode(),
569                        "Agent call failed"
570                    );
571
572                    if agent.failure_mode() == FailureMode::Closed {
573                        return Err(e);
574                    }
575                }
576                Err(_) => {
577                    agent.record_timeout().await;
578                    warn!(
579                        agent_id = %agent.id(),
580                        correlation_id = %ctx.correlation_id,
581                        timeout_ms = agent.timeout_ms(),
582                        failure_mode = ?agent.failure_mode(),
583                        "Agent call timed out"
584                    );
585
586                    if agent.failure_mode() == FailureMode::Closed {
587                        debug!(
588                            correlation_id = %ctx.correlation_id,
589                            agent_id = %agent.id(),
590                            "Blocking request due to timeout (fail-closed mode)"
591                        );
592                        return Ok(AgentDecision::block(504, "Gateway timeout"));
593                    }
594                }
595            }
596        }
597
598        trace!(
599            correlation_id = %ctx.correlation_id,
600            decision = ?combined_decision,
601            agents_processed = relevant_agents.len(),
602            "Agent event processing completed"
603        );
604
605        Ok(combined_decision)
606    }
607
608    /// Process an event through relevant agents with per-filter failure modes.
609    ///
610    /// This is the preferred method for processing events as it respects the
611    /// failure mode configured on each filter, not just the agent's default.
612    async fn process_event_with_failure_modes<T: serde::Serialize>(
613        &self,
614        event_type: EventType,
615        event: &T,
616        route_agents: &[(String, FailureMode)],
617        ctx: &AgentCallContext,
618    ) -> SentinelResult<AgentDecision> {
619        trace!(
620            correlation_id = %ctx.correlation_id,
621            event_type = ?event_type,
622            route_agents = ?route_agents.iter().map(|(id, _)| id).collect::<Vec<_>>(),
623            "Starting agent event processing with failure modes"
624        );
625
626        // Get relevant agents for this route and event type, preserving failure modes
627        let agents = self.agents.read().await;
628        let relevant_agents: Vec<_> = route_agents
629            .iter()
630            .filter_map(|(id, failure_mode)| {
631                agents.get(id).map(|agent| (agent, *failure_mode))
632            })
633            .filter(|(agent, _)| agent.handles_event(event_type))
634            .collect();
635
636        if relevant_agents.is_empty() {
637            trace!(
638                correlation_id = %ctx.correlation_id,
639                event_type = ?event_type,
640                "No relevant agents for event, allowing request"
641            );
642            return Ok(AgentDecision::default_allow());
643        }
644
645        debug!(
646            correlation_id = %ctx.correlation_id,
647            event_type = ?event_type,
648            agent_count = relevant_agents.len(),
649            agent_ids = ?relevant_agents.iter().map(|(a, _)| a.id()).collect::<Vec<_>>(),
650            "Processing event through agents"
651        );
652
653        // Process through each agent sequentially
654        let mut combined_decision = AgentDecision::default_allow();
655
656        for (agent_index, (agent, filter_failure_mode)) in relevant_agents.iter().enumerate() {
657            trace!(
658                correlation_id = %ctx.correlation_id,
659                agent_id = %agent.id(),
660                agent_index = agent_index,
661                event_type = ?event_type,
662                filter_failure_mode = ?filter_failure_mode,
663                "Processing event through agent with filter failure mode"
664            );
665
666            // Acquire per-agent semaphore permit (queue isolation)
667            let semaphores = self.agent_semaphores.read().await;
668            let agent_semaphore = semaphores.get(agent.id()).cloned();
669            drop(semaphores); // Release lock before awaiting
670
671            let _permit = if let Some(semaphore) = agent_semaphore {
672                trace!(
673                    correlation_id = %ctx.correlation_id,
674                    agent_id = %agent.id(),
675                    "Acquiring per-agent semaphore permit"
676                );
677                Some(semaphore.acquire_owned().await.map_err(|_| {
678                    error!(
679                        correlation_id = %ctx.correlation_id,
680                        agent_id = %agent.id(),
681                        "Failed to acquire agent call semaphore permit"
682                    );
683                    SentinelError::Internal {
684                        message: "Failed to acquire agent call permit".to_string(),
685                        correlation_id: Some(ctx.correlation_id.to_string()),
686                        source: None,
687                    }
688                })?)
689            } else {
690                // No semaphore found (shouldn't happen, but fail gracefully)
691                warn!(
692                    correlation_id = %ctx.correlation_id,
693                    agent_id = %agent.id(),
694                    "No semaphore found for agent, proceeding without queue isolation"
695                );
696                None
697            };
698
699            // Check circuit breaker
700            if !agent.circuit_breaker().is_closed().await {
701                warn!(
702                    agent_id = %agent.id(),
703                    correlation_id = %ctx.correlation_id,
704                    filter_failure_mode = ?filter_failure_mode,
705                    "Circuit breaker open, skipping agent"
706                );
707
708                // Handle based on filter's failure mode (not agent's default)
709                if *filter_failure_mode == FailureMode::Closed {
710                    debug!(
711                        correlation_id = %ctx.correlation_id,
712                        agent_id = %agent.id(),
713                        "Blocking request due to circuit breaker (filter fail-closed mode)"
714                    );
715                    return Ok(AgentDecision::block(503, "Service unavailable"));
716                }
717                // Fail-open: continue to next agent
718                continue;
719            }
720
721            // Call agent with timeout
722            let start = Instant::now();
723            let timeout_duration = Duration::from_millis(agent.timeout_ms());
724
725            trace!(
726                correlation_id = %ctx.correlation_id,
727                agent_id = %agent.id(),
728                timeout_ms = agent.timeout_ms(),
729                "Calling agent"
730            );
731
732            match timeout(timeout_duration, agent.call_event(event_type, event)).await {
733                Ok(Ok(response)) => {
734                    let duration = start.elapsed();
735                    agent.record_success(duration).await;
736
737                    trace!(
738                        correlation_id = %ctx.correlation_id,
739                        agent_id = %agent.id(),
740                        duration_ms = duration.as_millis(),
741                        decision = ?response,
742                        "Agent call succeeded"
743                    );
744
745                    // Merge response into combined decision
746                    combined_decision.merge(response.into());
747
748                    // If decision is to block/redirect/challenge, stop processing
749                    if !combined_decision.is_allow() {
750                        debug!(
751                            correlation_id = %ctx.correlation_id,
752                            agent_id = %agent.id(),
753                            decision = ?combined_decision,
754                            "Agent returned blocking decision, stopping agent chain"
755                        );
756                        break;
757                    }
758                }
759                Ok(Err(e)) => {
760                    agent.record_failure().await;
761                    error!(
762                        agent_id = %agent.id(),
763                        correlation_id = %ctx.correlation_id,
764                        error = %e,
765                        duration_ms = start.elapsed().as_millis(),
766                        filter_failure_mode = ?filter_failure_mode,
767                        "Agent call failed"
768                    );
769
770                    // Use filter's failure mode, not agent's default
771                    if *filter_failure_mode == FailureMode::Closed {
772                        debug!(
773                            correlation_id = %ctx.correlation_id,
774                            agent_id = %agent.id(),
775                            "Blocking request due to agent failure (filter fail-closed mode)"
776                        );
777                        return Ok(AgentDecision::block(503, "Agent unavailable"));
778                    }
779                    // Fail-open: continue to next agent (or proceed without this agent)
780                    debug!(
781                        correlation_id = %ctx.correlation_id,
782                        agent_id = %agent.id(),
783                        "Continuing despite agent failure (filter fail-open mode)"
784                    );
785                }
786                Err(_) => {
787                    agent.record_timeout().await;
788                    warn!(
789                        agent_id = %agent.id(),
790                        correlation_id = %ctx.correlation_id,
791                        timeout_ms = agent.timeout_ms(),
792                        filter_failure_mode = ?filter_failure_mode,
793                        "Agent call timed out"
794                    );
795
796                    // Use filter's failure mode, not agent's default
797                    if *filter_failure_mode == FailureMode::Closed {
798                        debug!(
799                            correlation_id = %ctx.correlation_id,
800                            agent_id = %agent.id(),
801                            "Blocking request due to timeout (filter fail-closed mode)"
802                        );
803                        return Ok(AgentDecision::block(504, "Gateway timeout"));
804                    }
805                    // Fail-open: continue to next agent
806                    debug!(
807                        correlation_id = %ctx.correlation_id,
808                        agent_id = %agent.id(),
809                        "Continuing despite timeout (filter fail-open mode)"
810                    );
811                }
812            }
813        }
814
815        trace!(
816            correlation_id = %ctx.correlation_id,
817            decision = ?combined_decision,
818            agents_processed = relevant_agents.len(),
819            "Agent event processing with failure modes completed"
820        );
821
822        Ok(combined_decision)
823    }
824
825    /// Initialize agent connections.
826    pub async fn initialize(&self) -> SentinelResult<()> {
827        let agents = self.agents.read().await;
828
829        info!(agent_count = agents.len(), "Initializing agent connections");
830
831        let mut initialized_count = 0;
832        let mut failed_count = 0;
833
834        for (id, agent) in agents.iter() {
835            debug!(agent_id = %id, "Initializing agent connection");
836            if let Err(e) = agent.initialize().await {
837                error!(
838                    agent_id = %id,
839                    error = %e,
840                    "Failed to initialize agent"
841                );
842                failed_count += 1;
843                // Continue with other agents
844            } else {
845                trace!(agent_id = %id, "Agent initialized successfully");
846                initialized_count += 1;
847            }
848        }
849
850        info!(
851            initialized = initialized_count,
852            failed = failed_count,
853            total = agents.len(),
854            "Agent initialization complete"
855        );
856
857        Ok(())
858    }
859
860    /// Shutdown all agents.
861    pub async fn shutdown(&self) {
862        let agents = self.agents.read().await;
863
864        info!(agent_count = agents.len(), "Shutting down agent manager");
865
866        for (id, agent) in agents.iter() {
867            debug!(agent_id = %id, "Shutting down agent");
868            agent.shutdown().await;
869            trace!(agent_id = %id, "Agent shutdown complete");
870        }
871
872        info!("Agent manager shutdown complete");
873    }
874
875    /// Get agent metrics.
876    pub fn metrics(&self) -> &AgentMetrics {
877        &self.metrics
878    }
879
880    /// Get agent IDs that handle a specific event type.
881    ///
882    /// This is useful for pre-filtering agents before making calls,
883    /// e.g., to check if any agents handle WebSocket frames.
884    pub fn get_agents_for_event(&self, event_type: EventType) -> Vec<String> {
885        // Use try_read to avoid blocking - return empty if lock is held
886        // This is acceptable since this is only used for informational purposes
887        if let Ok(agents) = self.agents.try_read() {
888            agents
889                .values()
890                .filter(|agent| agent.handles_event(event_type))
891                .map(|agent| agent.id().to_string())
892                .collect()
893        } else {
894            Vec::new()
895        }
896    }
897}