1use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use base64::{engine::general_purpose::STANDARD, Engine as _};
8use pingora_timeout::timeout;
9use sentinel_agent_protocol::{
10 AgentResponse, EventType, RequestBodyChunkEvent, RequestHeadersEvent, ResponseBodyChunkEvent,
11 ResponseHeadersEvent, WebSocketFrameEvent,
12};
13use sentinel_common::{
14 errors::{SentinelError, SentinelResult},
15 types::CircuitBreakerConfig,
16 CircuitBreaker,
17};
18use sentinel_config::{AgentConfig, FailureMode};
19use tokio::sync::{RwLock, Semaphore};
20use tracing::{debug, error, info, trace, warn};
21
22use super::agent::Agent;
23use super::context::AgentCallContext;
24use super::decision::AgentDecision;
25use super::metrics::AgentMetrics;
26use super::pool::AgentConnectionPool;
27
28pub struct AgentManager {
30 agents: Arc<RwLock<HashMap<String, Arc<Agent>>>>,
32 connection_pools: Arc<RwLock<HashMap<String, Arc<AgentConnectionPool>>>>,
34 circuit_breakers: Arc<RwLock<HashMap<String, Arc<CircuitBreaker>>>>,
36 metrics: Arc<AgentMetrics>,
38 agent_semaphores: Arc<RwLock<HashMap<String, Arc<Semaphore>>>>,
40}
41
42impl AgentManager {
43 pub async fn new(agents: Vec<AgentConfig>) -> SentinelResult<Self> {
49 info!(agent_count = agents.len(), "Creating agent manager");
50
51 let mut agent_map = HashMap::new();
52 let mut pools = HashMap::new();
53 let mut breakers = HashMap::new();
54 let mut semaphores = HashMap::new();
55
56 for config in agents {
57 debug!(
58 agent_id = %config.id,
59 transport = ?config.transport,
60 timeout_ms = config.timeout_ms,
61 failure_mode = ?config.failure_mode,
62 max_concurrent_calls = config.max_concurrent_calls,
63 "Configuring agent"
64 );
65
66 let pool = Arc::new(AgentConnectionPool::new(
67 10, 2, 5, Duration::from_secs(60),
71 ));
72
73 let circuit_breaker = Arc::new(CircuitBreaker::new(
74 config
75 .circuit_breaker
76 .clone()
77 .unwrap_or_else(CircuitBreakerConfig::default),
78 ));
79
80 let semaphore = Arc::new(Semaphore::new(config.max_concurrent_calls));
82
83 trace!(
84 agent_id = %config.id,
85 max_concurrent_calls = config.max_concurrent_calls,
86 "Creating agent instance with isolated queue"
87 );
88
89 let agent = Arc::new(Agent::new(
90 config.clone(),
91 Arc::clone(&pool),
92 Arc::clone(&circuit_breaker),
93 ));
94
95 agent_map.insert(config.id.clone(), agent);
96 pools.insert(config.id.clone(), pool);
97 breakers.insert(config.id.clone(), circuit_breaker);
98 semaphores.insert(config.id.clone(), semaphore);
99
100 debug!(
101 agent_id = %config.id,
102 "Agent configured successfully"
103 );
104 }
105
106 info!(
107 configured_agents = agent_map.len(),
108 "Agent manager created successfully with per-agent queue isolation"
109 );
110
111 Ok(Self {
112 agents: Arc::new(RwLock::new(agent_map)),
113 connection_pools: Arc::new(RwLock::new(pools)),
114 circuit_breakers: Arc::new(RwLock::new(breakers)),
115 metrics: Arc::new(AgentMetrics::default()),
116 agent_semaphores: Arc::new(RwLock::new(semaphores)),
117 })
118 }
119
120 pub async fn process_request_headers(
127 &self,
128 ctx: &AgentCallContext,
129 headers: &HashMap<String, Vec<String>>,
130 route_agents: &[(String, FailureMode)],
131 ) -> SentinelResult<AgentDecision> {
132 let event = RequestHeadersEvent {
133 metadata: ctx.metadata.clone(),
134 method: headers
135 .get(":method")
136 .and_then(|v| v.first())
137 .unwrap_or(&"GET".to_string())
138 .clone(),
139 uri: headers
140 .get(":path")
141 .and_then(|v| v.first())
142 .unwrap_or(&"/".to_string())
143 .clone(),
144 headers: headers.clone(),
145 };
146
147 self.process_event_with_failure_modes(EventType::RequestHeaders, &event, route_agents, ctx)
148 .await
149 }
150
151 pub async fn process_request_body(
153 &self,
154 ctx: &AgentCallContext,
155 data: &[u8],
156 is_last: bool,
157 route_agents: &[String],
158 ) -> SentinelResult<AgentDecision> {
159 let max_size = 1024 * 1024; if data.len() > max_size {
162 warn!(
163 correlation_id = %ctx.correlation_id,
164 size = data.len(),
165 "Request body exceeds agent inspection limit"
166 );
167 return Ok(AgentDecision::default_allow());
168 }
169
170 let event = RequestBodyChunkEvent {
171 correlation_id: ctx.correlation_id.to_string(),
172 data: STANDARD.encode(data),
173 is_last,
174 total_size: ctx.request_body.as_ref().map(|b| b.len()),
175 chunk_index: 0, bytes_received: data.len(),
177 };
178
179 self.process_event(EventType::RequestBodyChunk, &event, route_agents, ctx)
180 .await
181 }
182
183 pub async fn process_request_body_streaming(
188 &self,
189 ctx: &AgentCallContext,
190 data: &[u8],
191 is_last: bool,
192 chunk_index: u32,
193 bytes_received: usize,
194 total_size: Option<usize>,
195 route_agents: &[String],
196 ) -> SentinelResult<AgentDecision> {
197 trace!(
198 correlation_id = %ctx.correlation_id,
199 chunk_index = chunk_index,
200 chunk_size = data.len(),
201 bytes_received = bytes_received,
202 is_last = is_last,
203 "Processing streaming request body chunk"
204 );
205
206 let event = RequestBodyChunkEvent {
207 correlation_id: ctx.correlation_id.to_string(),
208 data: STANDARD.encode(data),
209 is_last,
210 total_size,
211 chunk_index,
212 bytes_received,
213 };
214
215 self.process_event(EventType::RequestBodyChunk, &event, route_agents, ctx)
216 .await
217 }
218
219 pub async fn process_response_body_streaming(
221 &self,
222 ctx: &AgentCallContext,
223 data: &[u8],
224 is_last: bool,
225 chunk_index: u32,
226 bytes_sent: usize,
227 total_size: Option<usize>,
228 route_agents: &[String],
229 ) -> SentinelResult<AgentDecision> {
230 trace!(
231 correlation_id = %ctx.correlation_id,
232 chunk_index = chunk_index,
233 chunk_size = data.len(),
234 bytes_sent = bytes_sent,
235 is_last = is_last,
236 "Processing streaming response body chunk"
237 );
238
239 let event = ResponseBodyChunkEvent {
240 correlation_id: ctx.correlation_id.to_string(),
241 data: STANDARD.encode(data),
242 is_last,
243 total_size,
244 chunk_index,
245 bytes_sent,
246 };
247
248 self.process_event(EventType::ResponseBodyChunk, &event, route_agents, ctx)
249 .await
250 }
251
252 pub async fn process_response_headers(
254 &self,
255 ctx: &AgentCallContext,
256 status: u16,
257 headers: &HashMap<String, Vec<String>>,
258 route_agents: &[String],
259 ) -> SentinelResult<AgentDecision> {
260 let event = ResponseHeadersEvent {
261 correlation_id: ctx.correlation_id.to_string(),
262 status,
263 headers: headers.clone(),
264 };
265
266 self.process_event(EventType::ResponseHeaders, &event, route_agents, ctx)
267 .await
268 }
269
270 pub async fn process_websocket_frame(
276 &self,
277 route_id: &str,
278 event: WebSocketFrameEvent,
279 ) -> SentinelResult<AgentResponse> {
280 trace!(
281 correlation_id = %event.correlation_id,
282 route_id = %route_id,
283 frame_index = event.frame_index,
284 opcode = %event.opcode,
285 "Processing WebSocket frame through agents"
286 );
287
288 let agents = self.agents.read().await;
290 let relevant_agents: Vec<_> = agents
291 .values()
292 .filter(|agent| agent.handles_event(EventType::WebSocketFrame))
293 .collect();
294
295 if relevant_agents.is_empty() {
296 trace!(
297 correlation_id = %event.correlation_id,
298 "No agents handle WebSocket frames, allowing"
299 );
300 return Ok(AgentResponse::websocket_allow());
301 }
302
303 debug!(
304 correlation_id = %event.correlation_id,
305 route_id = %route_id,
306 agent_count = relevant_agents.len(),
307 "Processing WebSocket frame through agents"
308 );
309
310 for agent in relevant_agents {
312 if !agent.circuit_breaker().is_closed().await {
314 warn!(
315 agent_id = %agent.id(),
316 correlation_id = %event.correlation_id,
317 failure_mode = ?agent.failure_mode(),
318 "Circuit breaker open, skipping agent for WebSocket frame"
319 );
320
321 if agent.failure_mode() == FailureMode::Closed {
322 debug!(
323 correlation_id = %event.correlation_id,
324 agent_id = %agent.id(),
325 "Closing WebSocket due to circuit breaker (fail-closed mode)"
326 );
327 return Ok(AgentResponse::websocket_close(
328 1011,
329 "Service unavailable".to_string(),
330 ));
331 }
332 continue;
333 }
334
335 let start = Instant::now();
337 let timeout_duration = Duration::from_millis(agent.timeout_ms());
338
339 match timeout(
340 timeout_duration,
341 agent.call_event(EventType::WebSocketFrame, &event),
342 )
343 .await
344 {
345 Ok(Ok(response)) => {
346 let duration = start.elapsed();
347 agent.record_success(duration).await;
348
349 trace!(
350 correlation_id = %event.correlation_id,
351 agent_id = %agent.id(),
352 duration_ms = duration.as_millis(),
353 "WebSocket frame agent call succeeded"
354 );
355
356 if let Some(ref ws_decision) = response.websocket_decision {
358 if !matches!(
359 ws_decision,
360 sentinel_agent_protocol::WebSocketDecision::Allow
361 ) {
362 debug!(
363 correlation_id = %event.correlation_id,
364 agent_id = %agent.id(),
365 decision = ?ws_decision,
366 "Agent returned non-allow WebSocket decision"
367 );
368 return Ok(response);
369 }
370 }
371 }
372 Ok(Err(e)) => {
373 agent.record_failure().await;
374 error!(
375 agent_id = %agent.id(),
376 correlation_id = %event.correlation_id,
377 error = %e,
378 duration_ms = start.elapsed().as_millis(),
379 failure_mode = ?agent.failure_mode(),
380 "WebSocket frame agent call failed"
381 );
382
383 if agent.failure_mode() == FailureMode::Closed {
384 return Ok(AgentResponse::websocket_close(
385 1011,
386 "Agent error".to_string(),
387 ));
388 }
389 }
390 Err(_) => {
391 agent.record_timeout().await;
392 warn!(
393 agent_id = %agent.id(),
394 correlation_id = %event.correlation_id,
395 timeout_ms = agent.timeout_ms(),
396 failure_mode = ?agent.failure_mode(),
397 "WebSocket frame agent call timed out"
398 );
399
400 if agent.failure_mode() == FailureMode::Closed {
401 return Ok(AgentResponse::websocket_close(
402 1011,
403 "Gateway timeout".to_string(),
404 ));
405 }
406 }
407 }
408 }
409
410 Ok(AgentResponse::websocket_allow())
412 }
413
414 async fn process_event<T: serde::Serialize>(
416 &self,
417 event_type: EventType,
418 event: &T,
419 route_agents: &[String],
420 ctx: &AgentCallContext,
421 ) -> SentinelResult<AgentDecision> {
422 trace!(
423 correlation_id = %ctx.correlation_id,
424 event_type = ?event_type,
425 route_agents = ?route_agents,
426 "Starting agent event processing"
427 );
428
429 let agents = self.agents.read().await;
431 let relevant_agents: Vec<_> = route_agents
432 .iter()
433 .filter_map(|id| agents.get(id))
434 .filter(|agent| agent.handles_event(event_type))
435 .collect();
436
437 if relevant_agents.is_empty() {
438 trace!(
439 correlation_id = %ctx.correlation_id,
440 event_type = ?event_type,
441 "No relevant agents for event, allowing request"
442 );
443 return Ok(AgentDecision::default_allow());
444 }
445
446 debug!(
447 correlation_id = %ctx.correlation_id,
448 event_type = ?event_type,
449 agent_count = relevant_agents.len(),
450 agent_ids = ?relevant_agents.iter().map(|a| a.id()).collect::<Vec<_>>(),
451 "Processing event through agents"
452 );
453
454 let mut combined_decision = AgentDecision::default_allow();
456
457 for (agent_index, agent) in relevant_agents.iter().enumerate() {
458 trace!(
459 correlation_id = %ctx.correlation_id,
460 agent_id = %agent.id(),
461 agent_index = agent_index,
462 event_type = ?event_type,
463 "Processing event through agent"
464 );
465
466 let semaphores = self.agent_semaphores.read().await;
468 let agent_semaphore = semaphores.get(agent.id()).cloned();
469 drop(semaphores); let _permit = match agent_semaphore {
472 Some(semaphore) => {
473 trace!(
474 correlation_id = %ctx.correlation_id,
475 agent_id = %agent.id(),
476 "Acquiring per-agent semaphore permit"
477 );
478 Some(semaphore.acquire_owned().await.map_err(|_| {
479 error!(
480 correlation_id = %ctx.correlation_id,
481 agent_id = %agent.id(),
482 "Failed to acquire agent call semaphore permit"
483 );
484 SentinelError::Internal {
485 message: "Failed to acquire agent call permit".to_string(),
486 correlation_id: Some(ctx.correlation_id.to_string()),
487 source: None,
488 }
489 })?)
490 }
491 None => {
492 warn!(
494 correlation_id = %ctx.correlation_id,
495 agent_id = %agent.id(),
496 "No semaphore found for agent, proceeding without queue isolation"
497 );
498 None
499 }
500 };
501
502 if !agent.circuit_breaker().is_closed().await {
504 warn!(
505 agent_id = %agent.id(),
506 correlation_id = %ctx.correlation_id,
507 failure_mode = ?agent.failure_mode(),
508 "Circuit breaker open, skipping agent"
509 );
510
511 if agent.failure_mode() == FailureMode::Closed {
513 debug!(
514 correlation_id = %ctx.correlation_id,
515 agent_id = %agent.id(),
516 "Blocking request due to circuit breaker (fail-closed mode)"
517 );
518 return Ok(AgentDecision::block(503, "Service unavailable"));
519 }
520 continue;
521 }
522
523 let start = Instant::now();
525 let timeout_duration = Duration::from_millis(agent.timeout_ms());
526
527 trace!(
528 correlation_id = %ctx.correlation_id,
529 agent_id = %agent.id(),
530 timeout_ms = agent.timeout_ms(),
531 "Calling agent"
532 );
533
534 match timeout(timeout_duration, agent.call_event(event_type, event)).await {
535 Ok(Ok(response)) => {
536 let duration = start.elapsed();
537 agent.record_success(duration).await;
538
539 trace!(
540 correlation_id = %ctx.correlation_id,
541 agent_id = %agent.id(),
542 duration_ms = duration.as_millis(),
543 decision = ?response,
544 "Agent call succeeded"
545 );
546
547 combined_decision.merge(response.into());
549
550 if !combined_decision.is_allow() {
552 debug!(
553 correlation_id = %ctx.correlation_id,
554 agent_id = %agent.id(),
555 decision = ?combined_decision,
556 "Agent returned blocking decision, stopping agent chain"
557 );
558 break;
559 }
560 }
561 Ok(Err(e)) => {
562 agent.record_failure().await;
563 error!(
564 agent_id = %agent.id(),
565 correlation_id = %ctx.correlation_id,
566 error = %e,
567 duration_ms = start.elapsed().as_millis(),
568 failure_mode = ?agent.failure_mode(),
569 "Agent call failed"
570 );
571
572 if agent.failure_mode() == FailureMode::Closed {
573 return Err(e);
574 }
575 }
576 Err(_) => {
577 agent.record_timeout().await;
578 warn!(
579 agent_id = %agent.id(),
580 correlation_id = %ctx.correlation_id,
581 timeout_ms = agent.timeout_ms(),
582 failure_mode = ?agent.failure_mode(),
583 "Agent call timed out"
584 );
585
586 if agent.failure_mode() == FailureMode::Closed {
587 debug!(
588 correlation_id = %ctx.correlation_id,
589 agent_id = %agent.id(),
590 "Blocking request due to timeout (fail-closed mode)"
591 );
592 return Ok(AgentDecision::block(504, "Gateway timeout"));
593 }
594 }
595 }
596 }
597
598 trace!(
599 correlation_id = %ctx.correlation_id,
600 decision = ?combined_decision,
601 agents_processed = relevant_agents.len(),
602 "Agent event processing completed"
603 );
604
605 Ok(combined_decision)
606 }
607
608 async fn process_event_with_failure_modes<T: serde::Serialize>(
613 &self,
614 event_type: EventType,
615 event: &T,
616 route_agents: &[(String, FailureMode)],
617 ctx: &AgentCallContext,
618 ) -> SentinelResult<AgentDecision> {
619 trace!(
620 correlation_id = %ctx.correlation_id,
621 event_type = ?event_type,
622 route_agents = ?route_agents.iter().map(|(id, _)| id).collect::<Vec<_>>(),
623 "Starting agent event processing with failure modes"
624 );
625
626 let agents = self.agents.read().await;
628 let relevant_agents: Vec<_> = route_agents
629 .iter()
630 .filter_map(|(id, failure_mode)| {
631 agents.get(id).map(|agent| (agent, *failure_mode))
632 })
633 .filter(|(agent, _)| agent.handles_event(event_type))
634 .collect();
635
636 if relevant_agents.is_empty() {
637 trace!(
638 correlation_id = %ctx.correlation_id,
639 event_type = ?event_type,
640 "No relevant agents for event, allowing request"
641 );
642 return Ok(AgentDecision::default_allow());
643 }
644
645 debug!(
646 correlation_id = %ctx.correlation_id,
647 event_type = ?event_type,
648 agent_count = relevant_agents.len(),
649 agent_ids = ?relevant_agents.iter().map(|(a, _)| a.id()).collect::<Vec<_>>(),
650 "Processing event through agents"
651 );
652
653 let mut combined_decision = AgentDecision::default_allow();
655
656 for (agent_index, (agent, filter_failure_mode)) in relevant_agents.iter().enumerate() {
657 trace!(
658 correlation_id = %ctx.correlation_id,
659 agent_id = %agent.id(),
660 agent_index = agent_index,
661 event_type = ?event_type,
662 filter_failure_mode = ?filter_failure_mode,
663 "Processing event through agent with filter failure mode"
664 );
665
666 let semaphores = self.agent_semaphores.read().await;
668 let agent_semaphore = semaphores.get(agent.id()).cloned();
669 drop(semaphores); let _permit = if let Some(semaphore) = agent_semaphore {
672 trace!(
673 correlation_id = %ctx.correlation_id,
674 agent_id = %agent.id(),
675 "Acquiring per-agent semaphore permit"
676 );
677 Some(semaphore.acquire_owned().await.map_err(|_| {
678 error!(
679 correlation_id = %ctx.correlation_id,
680 agent_id = %agent.id(),
681 "Failed to acquire agent call semaphore permit"
682 );
683 SentinelError::Internal {
684 message: "Failed to acquire agent call permit".to_string(),
685 correlation_id: Some(ctx.correlation_id.to_string()),
686 source: None,
687 }
688 })?)
689 } else {
690 warn!(
692 correlation_id = %ctx.correlation_id,
693 agent_id = %agent.id(),
694 "No semaphore found for agent, proceeding without queue isolation"
695 );
696 None
697 };
698
699 if !agent.circuit_breaker().is_closed().await {
701 warn!(
702 agent_id = %agent.id(),
703 correlation_id = %ctx.correlation_id,
704 filter_failure_mode = ?filter_failure_mode,
705 "Circuit breaker open, skipping agent"
706 );
707
708 if *filter_failure_mode == FailureMode::Closed {
710 debug!(
711 correlation_id = %ctx.correlation_id,
712 agent_id = %agent.id(),
713 "Blocking request due to circuit breaker (filter fail-closed mode)"
714 );
715 return Ok(AgentDecision::block(503, "Service unavailable"));
716 }
717 continue;
719 }
720
721 let start = Instant::now();
723 let timeout_duration = Duration::from_millis(agent.timeout_ms());
724
725 trace!(
726 correlation_id = %ctx.correlation_id,
727 agent_id = %agent.id(),
728 timeout_ms = agent.timeout_ms(),
729 "Calling agent"
730 );
731
732 match timeout(timeout_duration, agent.call_event(event_type, event)).await {
733 Ok(Ok(response)) => {
734 let duration = start.elapsed();
735 agent.record_success(duration).await;
736
737 trace!(
738 correlation_id = %ctx.correlation_id,
739 agent_id = %agent.id(),
740 duration_ms = duration.as_millis(),
741 decision = ?response,
742 "Agent call succeeded"
743 );
744
745 combined_decision.merge(response.into());
747
748 if !combined_decision.is_allow() {
750 debug!(
751 correlation_id = %ctx.correlation_id,
752 agent_id = %agent.id(),
753 decision = ?combined_decision,
754 "Agent returned blocking decision, stopping agent chain"
755 );
756 break;
757 }
758 }
759 Ok(Err(e)) => {
760 agent.record_failure().await;
761 error!(
762 agent_id = %agent.id(),
763 correlation_id = %ctx.correlation_id,
764 error = %e,
765 duration_ms = start.elapsed().as_millis(),
766 filter_failure_mode = ?filter_failure_mode,
767 "Agent call failed"
768 );
769
770 if *filter_failure_mode == FailureMode::Closed {
772 debug!(
773 correlation_id = %ctx.correlation_id,
774 agent_id = %agent.id(),
775 "Blocking request due to agent failure (filter fail-closed mode)"
776 );
777 return Ok(AgentDecision::block(503, "Agent unavailable"));
778 }
779 debug!(
781 correlation_id = %ctx.correlation_id,
782 agent_id = %agent.id(),
783 "Continuing despite agent failure (filter fail-open mode)"
784 );
785 }
786 Err(_) => {
787 agent.record_timeout().await;
788 warn!(
789 agent_id = %agent.id(),
790 correlation_id = %ctx.correlation_id,
791 timeout_ms = agent.timeout_ms(),
792 filter_failure_mode = ?filter_failure_mode,
793 "Agent call timed out"
794 );
795
796 if *filter_failure_mode == FailureMode::Closed {
798 debug!(
799 correlation_id = %ctx.correlation_id,
800 agent_id = %agent.id(),
801 "Blocking request due to timeout (filter fail-closed mode)"
802 );
803 return Ok(AgentDecision::block(504, "Gateway timeout"));
804 }
805 debug!(
807 correlation_id = %ctx.correlation_id,
808 agent_id = %agent.id(),
809 "Continuing despite timeout (filter fail-open mode)"
810 );
811 }
812 }
813 }
814
815 trace!(
816 correlation_id = %ctx.correlation_id,
817 decision = ?combined_decision,
818 agents_processed = relevant_agents.len(),
819 "Agent event processing with failure modes completed"
820 );
821
822 Ok(combined_decision)
823 }
824
825 pub async fn initialize(&self) -> SentinelResult<()> {
827 let agents = self.agents.read().await;
828
829 info!(agent_count = agents.len(), "Initializing agent connections");
830
831 let mut initialized_count = 0;
832 let mut failed_count = 0;
833
834 for (id, agent) in agents.iter() {
835 debug!(agent_id = %id, "Initializing agent connection");
836 if let Err(e) = agent.initialize().await {
837 error!(
838 agent_id = %id,
839 error = %e,
840 "Failed to initialize agent"
841 );
842 failed_count += 1;
843 } else {
845 trace!(agent_id = %id, "Agent initialized successfully");
846 initialized_count += 1;
847 }
848 }
849
850 info!(
851 initialized = initialized_count,
852 failed = failed_count,
853 total = agents.len(),
854 "Agent initialization complete"
855 );
856
857 Ok(())
858 }
859
860 pub async fn shutdown(&self) {
862 let agents = self.agents.read().await;
863
864 info!(agent_count = agents.len(), "Shutting down agent manager");
865
866 for (id, agent) in agents.iter() {
867 debug!(agent_id = %id, "Shutting down agent");
868 agent.shutdown().await;
869 trace!(agent_id = %id, "Agent shutdown complete");
870 }
871
872 info!("Agent manager shutdown complete");
873 }
874
875 pub fn metrics(&self) -> &AgentMetrics {
877 &self.metrics
878 }
879
880 pub fn get_agents_for_event(&self, event_type: EventType) -> Vec<String> {
885 if let Ok(agents) = self.agents.try_read() {
888 agents
889 .values()
890 .filter(|agent| agent.handles_event(event_type))
891 .map(|agent| agent.id().to_string())
892 .collect()
893 } else {
894 Vec::new()
895 }
896 }
897}