// symbi_runtime/scheduler/mod.rs

1//! Agent Runtime Scheduler
2//!
3//! The central orchestrator responsible for managing agent execution across the system.
4
5use async_trait::async_trait;
6use dashmap::DashMap;
7use parking_lot::RwLock;
8use std::sync::Arc;
9use std::time::{Duration, SystemTime};
10use tokio::sync::Notify;
11use tokio::time::interval;
12
13use crate::metrics::{
14    LoadBalancerMetrics, MetricsConfig, MetricsExporter, MetricsSnapshot, SchedulerMetrics,
15    SystemResourceMetrics, TaskManagerMetrics,
16};
17use crate::routing::{RouteDecision, RoutingContext, RoutingEngine, SecurityLevel, TaskType};
18use crate::types::*;
19
20pub mod load_balancer;
21pub mod priority_queue;
22pub mod task_manager;
23
24#[cfg(feature = "cron")]
25pub mod cron_scheduler;
26#[cfg(feature = "cron")]
27pub mod cron_types;
28#[cfg(feature = "cron")]
29pub mod delivery;
30#[cfg(feature = "cron")]
31pub mod heartbeat;
32#[cfg(feature = "cron")]
33pub mod job_store;
34#[cfg(feature = "cron")]
35pub mod policy_gate;
36
37use load_balancer::LoadBalancer;
38pub use load_balancer::LoadBalancingStats;
39use priority_queue::PriorityQueue;
40use task_manager::TaskManager;
41
/// Agent status information returned by the scheduler
#[derive(Debug, Clone)]
pub struct AgentStatus {
    /// Identifier of the agent this status describes.
    pub agent_id: AgentId,
    /// Current lifecycle state (running, waiting, completed, failed, ...).
    pub state: AgentState,
    /// Timestamp of the most recently observed activity (task health data
    /// or, for external agents, the last heartbeat).
    pub last_activity: SystemTime,
    /// Memory usage reported by the task manager — NOTE(review): unit
    /// (bytes?) is not visible from this module; confirm against TaskManager.
    pub memory_usage: u64,
    /// CPU usage reported by the task manager — NOTE(review): scale
    /// (fraction vs percent) not visible here; confirm against TaskManager.
    pub cpu_usage: f64,
    /// Number of tasks currently executing for this agent (0 or 1 in the
    /// default scheduler, which runs at most one task per agent).
    pub active_tasks: u32,
    /// When the agent's task was scheduled (enqueued) for execution.
    pub scheduled_at: SystemTime,
}
53
/// Agent scheduler trait
///
/// The public contract for scheduling, inspecting, and terminating agents.
/// All methods take `&self`, so implementations can be shared (e.g. behind
/// an `Arc`) and invoked concurrently.
#[async_trait]
pub trait AgentScheduler {
    /// Schedule a new agent for execution
    async fn schedule_agent(&self, config: AgentConfig) -> Result<AgentId, SchedulerError>;

    /// Reschedule an existing agent with new priority
    async fn reschedule_agent(
        &self,
        agent_id: AgentId,
        priority: Priority,
    ) -> Result<(), SchedulerError>;

    /// Terminate an agent
    ///
    /// Forcibly stops a running agent, or removes it from the queue if it
    /// has not started yet.
    async fn terminate_agent(&self, agent_id: AgentId) -> Result<(), SchedulerError>;

    /// Shutdown an agent gracefully
    async fn shutdown_agent(&self, agent_id: AgentId) -> Result<(), SchedulerError>;

    /// Get current system status
    async fn get_system_status(&self) -> SystemStatus;

    /// Get status of a specific agent
    async fn get_agent_status(&self, agent_id: AgentId) -> Result<AgentStatus, SchedulerError>;

    /// Shutdown the scheduler
    ///
    /// Expected to be idempotent: calling it again after completion should
    /// succeed without re-running the shutdown sequence.
    async fn shutdown(&self) -> Result<(), SchedulerError>;

    /// Check the health of the scheduler
    async fn check_health(&self) -> Result<ComponentHealth, SchedulerError>;

    /// List all agents known to the scheduler (both running and queued)
    async fn list_agents(&self) -> Vec<AgentId>;

    /// Update an existing agent's configuration
    #[cfg(feature = "http-api")]
    async fn update_agent(
        &self,
        agent_id: AgentId,
        request: crate::api::types::UpdateAgentRequest,
    ) -> Result<(), SchedulerError>;

    /// Check whether an agent is registered (regardless of run state)
    fn has_agent(&self, agent_id: AgentId) -> bool;

    /// Retrieve the stored config for a registered agent
    fn get_agent_config(&self, agent_id: AgentId) -> Option<AgentConfig>;

    /// Remove an agent from the registry entirely
    async fn delete_agent(&self, agent_id: AgentId) -> Result<(), SchedulerError>;

    /// Get a reference to the external agents map (for heartbeat/event handlers).
    #[cfg(feature = "http-api")]
    fn external_agents(&self) -> &Arc<DashMap<AgentId, crate::api::types::ExternalAgentState>>;

    /// Get the external agent state for a specific agent.
    #[cfg(feature = "http-api")]
    fn get_external_agent_state(
        &self,
        agent_id: AgentId,
    ) -> Option<crate::api::types::ExternalAgentState>;

    /// Check all external agents and mark those that have not sent a heartbeat
    /// within 3x their expected interval as Unreachable.
    #[cfg(feature = "http-api")]
    fn check_unreachable_agents(&self);
}
121
/// Scheduler configuration
#[derive(Debug, Clone)]
pub struct SchedulerConfig {
    /// Upper bound on simultaneously running agents; tasks beyond this
    /// wait in the priority queue.
    pub max_concurrent_agents: usize,
    /// Number of distinct priority levels supported by the queue.
    pub priority_levels: u8,
    /// System-wide resource limits — NOTE(review): not consumed in this
    /// module; confirm which component enforces them.
    pub resource_limits: ResourceLimits,
    /// Strategy used to pick the next queued task.
    pub scheduling_algorithm: SchedulingAlgorithm,
    /// Strategy handed to the `LoadBalancer` at construction time.
    pub load_balancing_strategy: LoadBalancingStrategy,
    /// Maximum wall-clock time a task may run; passed to the `TaskManager`.
    pub task_timeout: Duration,
    /// Interval between background health checks of running agents.
    pub health_check_interval: Duration,
    /// Metrics export configuration. When `Some` and `enabled`, the scheduler
    /// periodically collects and exports telemetry to the configured backends.
    pub metrics: Option<MetricsConfig>,
}
136
impl Default for SchedulerConfig {
    /// Conservative defaults: priority-based scheduling, round-robin load
    /// balancing, 1-hour task timeout, 30s health checks, metrics disabled.
    fn default() -> Self {
        Self {
            // Generous ceiling; queueing only kicks in beyond this.
            max_concurrent_agents: 1000,
            priority_levels: 4,
            resource_limits: ResourceLimits::default(),
            scheduling_algorithm: SchedulingAlgorithm::PriorityBased,
            load_balancing_strategy: LoadBalancingStrategy::RoundRobin,
            task_timeout: Duration::from_secs(3600), // 1 hour
            health_check_interval: Duration::from_secs(30),
            // Metrics are opt-in; disabled unless explicitly configured.
            metrics: None,
        }
    }
}
151
/// Scheduled task information
///
/// A queue entry pairing an agent's configuration with scheduling metadata
/// (priority, timestamps, retry count, routing outcome).
#[derive(Debug, Clone)]
pub struct ScheduledTask {
    /// Agent this task will run (copied from `config.id`).
    pub agent_id: AgentId,
    /// Full agent configuration captured at scheduling time.
    pub config: AgentConfig,
    /// Queue priority (higher pops first; see the `Ord` impl below).
    pub priority: Priority,
    /// When the task entered the queue.
    pub scheduled_at: SystemTime,
    /// Optional absolute deadline; feeds the routing context's max
    /// execution time.
    pub deadline: Option<SystemTime>,
    /// Retry counter — NOTE(review): nothing in this module increments it;
    /// confirm which component maintains it.
    pub retry_count: u32,
    /// Resources requested, parsed from config metadata in `new`.
    pub resource_requirements: ResourceRequirements,
    /// Decision recorded by the routing engine, when one is configured.
    pub route_decision: Option<RouteDecision>,
}
164
165impl ScheduledTask {
166    pub fn new(config: AgentConfig) -> Self {
167        let now = SystemTime::now();
168        Self {
169            agent_id: config.id,
170            priority: config.priority,
171            resource_requirements: config
172                .metadata
173                .get("resource_requirements")
174                .and_then(|s| serde_json::from_str(s).ok())
175                .unwrap_or_default(),
176            config,
177            scheduled_at: now,
178            deadline: None,
179            retry_count: 0,
180            route_decision: None,
181        }
182    }
183
184    /// Build a `RoutingContext` from this scheduled task for routing policy evaluation.
185    pub fn to_routing_context(&self) -> RoutingContext {
186        let security_level = match self.config.security_tier {
187            SecurityTier::None => SecurityLevel::Low,
188            SecurityTier::Tier1 => SecurityLevel::Medium,
189            SecurityTier::Tier2 => SecurityLevel::High,
190            SecurityTier::Tier3 => SecurityLevel::Critical,
191        };
192
193        let capabilities: Vec<String> = self
194            .config
195            .capabilities
196            .iter()
197            .map(|cap| match cap {
198                Capability::FileSystem => "FileSystem".to_string(),
199                Capability::Network => "Network".to_string(),
200                Capability::Database => "Database".to_string(),
201                Capability::Computation => "Computation".to_string(),
202                Capability::Communication => "Communication".to_string(),
203                Capability::Custom(s) => s.clone(),
204            })
205            .collect();
206
207        let task_type = self
208            .config
209            .metadata
210            .get("task_type")
211            .map(|tt| match tt.as_str() {
212                "intent" => TaskType::Intent,
213                "extract" => TaskType::Extract,
214                "template" => TaskType::Template,
215                "boilerplate_code" => TaskType::BoilerplateCode,
216                "code_generation" => TaskType::CodeGeneration,
217                "reasoning" => TaskType::Reasoning,
218                "analysis" => TaskType::Analysis,
219                "summarization" => TaskType::Summarization,
220                "translation" => TaskType::Translation,
221                "qa" => TaskType::QA,
222                other => TaskType::Custom(other.to_string()),
223            })
224            .unwrap_or_else(|| TaskType::Custom("general".to_string()));
225
226        let max_execution_time = self
227            .deadline
228            .and_then(|deadline| deadline.duration_since(SystemTime::now()).ok());
229
230        let mut ctx = RoutingContext::new(self.agent_id, task_type, self.config.dsl_source.clone());
231        ctx.agent_security_level = security_level;
232        ctx.agent_capabilities = capabilities;
233        ctx.max_execution_time = max_execution_time;
234        ctx
235    }
236}
237
238impl PartialEq for ScheduledTask {
239    fn eq(&self, other: &Self) -> bool {
240        self.agent_id == other.agent_id
241    }
242}
243
244impl Eq for ScheduledTask {}
245
246impl PartialOrd for ScheduledTask {
247    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
248        Some(self.cmp(other))
249    }
250}
251
252impl Ord for ScheduledTask {
253    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
254        // Higher priority tasks come first (BinaryHeap is a max-heap)
255        self.priority
256            .cmp(&other.priority)
257            .then_with(|| other.scheduled_at.cmp(&self.scheduled_at))
258    }
259}
260
/// Information about suspended agents
#[derive(Debug, Clone)]
pub struct AgentSuspensionInfo {
    /// Agent that was suspended.
    pub agent_id: AgentId,
    /// When the suspension happened.
    pub suspended_at: SystemTime,
    /// Human-readable reason, for logs and diagnostics.
    pub suspension_reason: String,
    /// Snapshot of the task at suspension time, kept so it can be resumed.
    pub original_task: ScheduledTask,
    /// Whether the agent is eligible to be resumed later.
    pub can_resume: bool,
}
270
/// Default implementation of the Agent Scheduler
///
/// Owns the priority queue, load balancer, and task manager, plus shared
/// maps tracking running/suspended/registered agents. Fields are
/// `Arc`-shared so the background loops spawned at construction hold their
/// own handles.
pub struct DefaultAgentScheduler {
    // Immutable configuration captured at construction time.
    config: SchedulerConfig,
    // Max-heap of tasks waiting to run (highest priority pops first).
    priority_queue: Arc<RwLock<PriorityQueue<ScheduledTask>>>,
    // Allocates/deallocates resources for tasks before they start.
    load_balancer: Arc<LoadBalancer>,
    // Starts, monitors, and terminates task execution.
    task_manager: Arc<TaskManager>,
    // Tasks currently executing, keyed by agent id.
    running_agents: Arc<DashMap<AgentId, ScheduledTask>>,
    // Agents suspended mid-flight, with the info needed to resume them.
    suspended_agents: Arc<DashMap<AgentId, AgentSuspensionInfo>>,
    /// Persistent registry of all agents that have been scheduled. Agents
    /// remain here after being dequeued so that status/execute/list continue
    /// to work even after completion.
    registered_agents: Arc<DashMap<AgentId, AgentConfig>>,
    /// State for externally-managed agents (heartbeat, events).
    #[cfg(feature = "http-api")]
    external_agents: Arc<DashMap<AgentId, crate::api::types::ExternalAgentState>>,
    // Counters for status reporting (total scheduled, uptime base).
    system_metrics: Arc<RwLock<SystemMetrics>>,
    // Wakes background loops so they exit promptly on shutdown.
    shutdown_notify: Arc<Notify>,
    // Cleared on shutdown; gates new scheduling work.
    is_running: Arc<RwLock<bool>>,
    // Optional policy engine consulted before each task is started.
    routing_engine: Option<Arc<dyn RoutingEngine>>,
    // Optional telemetry sink; None when metrics are disabled.
    metrics_exporter: Option<Arc<dyn MetricsExporter>>,
}
292
impl DefaultAgentScheduler {
    /// Create a new scheduler instance
    ///
    /// Convenience constructor; equivalent to [`Self::new_with_routing`]
    /// with no routing engine.
    pub async fn new(config: SchedulerConfig) -> Result<Self, SchedulerError> {
        Self::new_with_routing(config, None).await
    }

    /// Create a new scheduler instance with optional routing engine
    ///
    /// Builds all shared state, optionally wires up a metrics exporter
    /// (exporter failures degrade gracefully to "no metrics" rather than
    /// failing construction), then spawns the background scheduler,
    /// health-check, and metrics-export loops before returning.
    pub async fn new_with_routing(
        config: SchedulerConfig,
        routing_engine: Option<Arc<dyn RoutingEngine>>,
    ) -> Result<Self, SchedulerError> {
        let priority_queue = Arc::new(RwLock::new(PriorityQueue::new()));
        let load_balancer = Arc::new(LoadBalancer::new(config.load_balancing_strategy.clone()));
        let task_manager = Arc::new(TaskManager::new(config.task_timeout));
        let running_agents = Arc::new(DashMap::new());
        let suspended_agents = Arc::new(DashMap::new());
        let registered_agents = Arc::new(DashMap::new());
        #[cfg(feature = "http-api")]
        let external_agents = Arc::new(DashMap::new());
        let system_metrics = Arc::new(RwLock::new(SystemMetrics::new()));
        let shutdown_notify = Arc::new(Notify::new());
        let is_running = Arc::new(RwLock::new(true));

        // Create metrics exporter if configured and enabled.
        // A broken exporter config is logged and ignored instead of
        // aborting scheduler construction.
        let metrics_exporter = match config.metrics {
            Some(ref metrics_config) if metrics_config.enabled => {
                match crate::metrics::create_exporter(metrics_config) {
                    Ok(exporter) => {
                        tracing::info!("Metrics exporter initialized");
                        Some(exporter)
                    }
                    Err(e) => {
                        tracing::warn!(
                            "Failed to create metrics exporter, continuing without metrics: {}",
                            e
                        );
                        None
                    }
                }
            }
            _ => None,
        };

        let scheduler = Self {
            config,
            priority_queue,
            load_balancer,
            task_manager,
            running_agents,
            suspended_agents,
            registered_agents,
            #[cfg(feature = "http-api")]
            external_agents,
            system_metrics,
            shutdown_notify,
            is_running,
            routing_engine,
            metrics_exporter,
        };

        // Start background tasks
        scheduler.start_scheduler_loop().await;
        scheduler.start_health_check_loop().await;
        scheduler.start_metrics_export_loop().await;

        Ok(scheduler)
    }

    /// Start the main scheduler loop
    ///
    /// Spawns a background Tokio task that ticks every 100ms. Each tick,
    /// while below `max_concurrent_agents`, it pops the best queued task,
    /// consults the routing engine (if any), allocates resources, starts
    /// the task, and refreshes system metrics. The loop exits when
    /// `is_running` clears or `shutdown_notify` fires.
    async fn start_scheduler_loop(&self) {
        // Clone Arc handles so the spawned task owns its own state.
        let priority_queue = self.priority_queue.clone();
        let load_balancer = self.load_balancer.clone();
        let task_manager = self.task_manager.clone();
        let running_agents = self.running_agents.clone();
        let system_metrics = self.system_metrics.clone();
        let shutdown_notify = self.shutdown_notify.clone();
        let is_running = self.is_running.clone();
        let routing_engine = self.routing_engine.clone();
        let max_concurrent = self.config.max_concurrent_agents;

        tokio::spawn(async move {
            let mut interval = interval(Duration::from_millis(100));

            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        if !*is_running.read() {
                            break;
                        }

                        // Check if we can schedule more agents
                        if running_agents.len() < max_concurrent {
                            // Scope the queue write lock so the guard is
                            // released before any await point below.
                            let task_opt = {
                                let mut queue = priority_queue.write();
                                queue.pop()
                            };

                            if let Some(mut task) = task_opt {
                                // Evaluate routing policy if a routing engine is configured
                                if let Some(ref engine) = routing_engine {
                                    let ctx = task.to_routing_context();
                                    match engine.route_request(&ctx).await {
                                        Ok(RouteDecision::Deny { ref reason, ref policy_violated }) => {
                                            tracing::warn!(
                                                "Routing policy denied task for agent {}: policy={}, reason={}",
                                                task.agent_id, policy_violated, reason
                                            );
                                            // NOTE(review): a denied task is dropped here —
                                            // neither re-queued nor removed from the agent
                                            // registry; confirm this is intended.
                                            continue;
                                        }
                                        Ok(decision) => {
                                            task.route_decision = Some(decision);
                                        }
                                        Err(e) => {
                                            // Routing engine failure is non-fatal: the task
                                            // proceeds without a recorded decision.
                                            tracing::warn!(
                                                "Routing engine error for agent {}, proceeding without decision: {}",
                                                task.agent_id, e
                                            );
                                        }
                                    }
                                }

                                // Try to schedule the task
                                if let Ok(resource_allocation) = load_balancer.allocate_resources(&task.resource_requirements).await {
                                    running_agents.insert(task.agent_id, task.clone());

                                    if let Err(e) = task_manager.start_task(task.clone()).await {
                                        // Roll back on start failure: drop from the
                                        // running set and return the allocation.
                                        tracing::error!("Failed to start task for agent {}: {}", task.agent_id, e);
                                        running_agents.remove(&task.agent_id);
                                        load_balancer.deallocate_resources(resource_allocation).await;
                                    }
                                } else {
                                    // Put the task back in the queue if resources aren't available
                                    let mut queue = priority_queue.write();
                                    queue.push(task);
                                }
                            }
                        }

                        // Update system metrics
                        let (running_count, queue_len) = {
                            let queue = priority_queue.read();
                            (running_agents.len(), queue.len())
                        };
                        system_metrics.write().update(running_count, queue_len);
                    }
                    _ = shutdown_notify.notified() => {
                        break;
                    }
                }
            }
        });
    }

    /// Start the health check loop
    ///
    /// Spawns a background Tokio task that, every `health_check_interval`,
    /// health-checks each running agent via the task manager and removes +
    /// terminates any that fail. Exits on shutdown, like the scheduler loop.
    async fn start_health_check_loop(&self) {
        let task_manager = self.task_manager.clone();
        let running_agents = self.running_agents.clone();
        let shutdown_notify = self.shutdown_notify.clone();
        let is_running = self.is_running.clone();
        let health_check_interval = self.config.health_check_interval;

        tokio::spawn(async move {
            let mut interval = interval(health_check_interval);

            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        if !*is_running.read() {
                            break;
                        }

                        // Check health of running agents.
                        // Collect ids first, then mutate after iteration, to
                        // avoid removing entries from the map mid-iteration.
                        let mut failed_agents = Vec::new();
                        for entry in running_agents.iter() {
                            let agent_id = *entry.key();
                            if (task_manager.check_task_health(agent_id).await).is_err() {
                                failed_agents.push(agent_id);
                            }
                        }

                        // Remove failed agents
                        for agent_id in failed_agents {
                            running_agents.remove(&agent_id);
                            if let Err(e) = task_manager.terminate_task(agent_id).await {
                                tracing::error!("Failed to terminate failed agent {}: {}", agent_id, e);
                            }
                        }
                    }
                    _ = shutdown_notify.notified() => {
                        break;
                    }
                }
            }
        });
    }
}
489
490#[async_trait]
491impl AgentScheduler for DefaultAgentScheduler {
    /// Register an agent and, unless it runs externally, enqueue it.
    ///
    /// Returns `SchedulerError::ShuttingDown` once shutdown has begun.
    async fn schedule_agent(&self, config: AgentConfig) -> Result<AgentId, SchedulerError> {
        // Reject new work once shutdown has begun.
        if !*self.is_running.read() {
            return Err(SchedulerError::ShuttingDown);
        }

        // External agents are registered but never enqueued
        // (their state is reported via heartbeats, not run by this process).
        #[cfg(feature = "http-api")]
        if matches!(
            config.execution_mode,
            crate::types::agent::ExecutionMode::External { .. }
        ) {
            let agent_id = config.id;
            self.registered_agents.insert(agent_id, config);
            self.external_agents
                .insert(agent_id, crate::api::types::ExternalAgentState::new());
            tracing::info!("Registered external agent {} (not queued)", agent_id);
            return Ok(agent_id);
        }

        let task = ScheduledTask::new(config.clone());
        let agent_id = task.agent_id;

        // Persist in the registry so the agent survives dequeue
        self.registered_agents.insert(agent_id, config);

        // Add to priority queue
        self.priority_queue.write().push(task);

        tracing::info!("Scheduled agent {} for execution", agent_id);
        Ok(agent_id)
    }
523
524    async fn reschedule_agent(
525        &self,
526        agent_id: AgentId,
527        priority: Priority,
528    ) -> Result<(), SchedulerError> {
529        if !*self.is_running.read() {
530            return Err(SchedulerError::ShuttingDown);
531        }
532
533        // Check if agent is currently running
534        if let Some(mut entry) = self.running_agents.get_mut(&agent_id) {
535            entry.priority = priority;
536            return Ok(());
537        }
538
539        // Check if agent is in the queue
540        let mut queue = self.priority_queue.write();
541        if let Some(mut task) = queue.remove(&agent_id) {
542            task.priority = priority;
543            queue.push(task);
544            return Ok(());
545        }
546
547        Err(SchedulerError::AgentNotFound { agent_id })
548    }
549
    /// Forcibly terminate a running agent, or remove a queued one.
    ///
    /// In both cases the agent is also dropped from the persistent registry.
    /// NOTE(review): resources allocated by the load balancer at start are
    /// not explicitly deallocated here — confirm the task manager or load
    /// balancer reclaims them on termination.
    async fn terminate_agent(&self, agent_id: AgentId) -> Result<(), SchedulerError> {
        // Remove from running agents
        if let Some((_, _task)) = self.running_agents.remove(&agent_id) {
            self.task_manager
                .terminate_task(agent_id)
                .await
                .map_err(|e| SchedulerError::SchedulingFailed {
                    agent_id,
                    reason: format!("Failed to terminate task: {}", e).into(),
                })?;

            self.registered_agents.remove(&agent_id);
            tracing::info!("Terminated agent {}", agent_id);
            return Ok(());
        }

        // Remove from queue
        let mut queue = self.priority_queue.write();
        if queue.remove(&agent_id).is_some() {
            // Release the queue lock before touching the registry.
            drop(queue);
            self.registered_agents.remove(&agent_id);
            tracing::info!("Removed agent {} from queue", agent_id);
            return Ok(());
        }

        Err(SchedulerError::AgentNotFound { agent_id })
    }
577
    /// Gracefully shut down a running agent, or remove a queued one.
    ///
    /// NOTE(review): unlike `terminate_agent`, this does NOT remove the
    /// agent from `registered_agents` — presumably so status queries keep
    /// working after a graceful shutdown; confirm the asymmetry is intended.
    async fn shutdown_agent(&self, agent_id: AgentId) -> Result<(), SchedulerError> {
        // Check if agent is currently running
        if let Some((_, _task)) = self.running_agents.remove(&agent_id) {
            // For graceful shutdown, we use the same task manager termination
            // but could potentially add graceful shutdown signals in the future
            self.task_manager
                .terminate_task(agent_id)
                .await
                .map_err(|e| SchedulerError::SchedulingFailed {
                    agent_id,
                    reason: format!("Failed to shutdown task: {}", e).into(),
                })?;

            tracing::info!("Gracefully shutdown agent {}", agent_id);
            return Ok(());
        }

        // Remove from queue if not running
        let mut queue = self.priority_queue.write();
        if queue.remove(&agent_id).is_some() {
            tracing::info!("Removed agent {} from queue during shutdown", agent_id);
            return Ok(());
        }

        Err(SchedulerError::AgentNotFound { agent_id })
    }
604
605    async fn get_system_status(&self) -> SystemStatus {
606        let (total_scheduled, uptime) = {
607            let metrics = self.system_metrics.read();
608            let now = SystemTime::now();
609            (metrics.total_scheduled, metrics.uptime_since(now))
610        };
611        let resource_utilization = self.load_balancer.get_resource_utilization().await;
612
613        SystemStatus {
614            total_agents: total_scheduled,
615            running_agents: self.running_agents.len(),
616            suspended_agents: self.suspended_agents.len(),
617            resource_utilization,
618            uptime,
619            last_updated: SystemTime::now(),
620        }
621    }
622
    /// Resolve an agent's status, checking (in order): external agents,
    /// running agents, the wait queue, then the persistent registry.
    async fn get_agent_status(&self, agent_id: AgentId) -> Result<AgentStatus, SchedulerError> {
        // Check external agents first
        #[cfg(feature = "http-api")]
        if let Some(ext) = self.external_agents.get(&agent_id) {
            // Convert the heartbeat timestamp (presumably a chrono DateTime)
            // to SystemTime at second precision; with no heartbeat yet, fall
            // back to "now". NOTE(review): a pre-epoch timestamp would wrap
            // in the `as u64` cast — confirm heartbeats are always post-epoch.
            let last_activity = ext
                .last_heartbeat
                .map(|dt| {
                    let secs = dt.timestamp() as u64;
                    std::time::UNIX_EPOCH + std::time::Duration::from_secs(secs)
                })
                .unwrap_or(SystemTime::now());
            return Ok(AgentStatus {
                agent_id,
                state: ext.reported_state.clone(),
                last_activity,
                // Resource figures are not tracked for external agents.
                memory_usage: 0,
                cpu_usage: 0.0,
                active_tasks: 0,
                scheduled_at: SystemTime::now(),
            });
        }

        // Check if agent is currently running
        if let Some(entry) = self.running_agents.get(&agent_id) {
            let scheduled_task = entry.value();

            // Get detailed health information from task manager
            match self.task_manager.check_task_health(agent_id).await {
                Ok(task_health) => {
                    // Map TaskStatus to AgentState
                    let state = match task_health.status {
                        task_manager::TaskStatus::Pending => AgentState::Ready,
                        task_manager::TaskStatus::Running => AgentState::Running,
                        task_manager::TaskStatus::Completed => AgentState::Completed,
                        task_manager::TaskStatus::Failed => AgentState::Failed,
                        // A timeout is surfaced as a plain failure.
                        task_manager::TaskStatus::TimedOut => AgentState::Failed,
                        task_manager::TaskStatus::Terminated => AgentState::Terminated,
                    };

                    // At most one task runs per agent in this scheduler.
                    let active_tasks = if matches!(state, AgentState::Running) {
                        1
                    } else {
                        0
                    };

                    Ok(AgentStatus {
                        agent_id,
                        state,
                        last_activity: task_health.last_activity,
                        memory_usage: task_health.memory_usage as u64,
                        cpu_usage: task_health.cpu_usage as f64,
                        active_tasks,
                        scheduled_at: scheduled_task.scheduled_at,
                    })
                }
                Err(_) => {
                    // Agent exists but health check failed - might be in error state
                    Ok(AgentStatus {
                        agent_id,
                        state: AgentState::Failed,
                        last_activity: scheduled_task.scheduled_at,
                        memory_usage: 0,
                        cpu_usage: 0.0,
                        active_tasks: 0,
                        scheduled_at: scheduled_task.scheduled_at,
                    })
                }
            }
        } else {
            // Check if agent is in the queue
            let queue = self.priority_queue.read();
            if let Some(task) = queue.find(&agent_id) {
                // Agent is queued but not yet running
                Ok(AgentStatus {
                    agent_id,
                    state: AgentState::Waiting,
                    last_activity: task.scheduled_at,
                    memory_usage: 0,
                    cpu_usage: 0.0,
                    active_tasks: 0,
                    scheduled_at: task.scheduled_at,
                })
            } else if self.registered_agents.contains_key(&agent_id) {
                // Agent was registered but already ran and was dequeued.
                // NOTE(review): the original scheduled_at is no longer
                // available here, so "now" is reported for both timestamps.
                Ok(AgentStatus {
                    agent_id,
                    state: AgentState::Completed,
                    last_activity: SystemTime::now(),
                    memory_usage: 0,
                    cpu_usage: 0.0,
                    active_tasks: 0,
                    scheduled_at: SystemTime::now(),
                })
            } else {
                // Agent not found anywhere
                Err(SchedulerError::AgentNotFound { agent_id })
            }
        }
    }
722
723    async fn shutdown(&self) -> Result<(), SchedulerError> {
724        // Check if already shutting down (idempotent)
725        {
726            let is_running = self.is_running.read();
727            if !*is_running {
728                tracing::debug!("Scheduler already shutdown");
729                return Ok(());
730            }
731        }
732
733        tracing::info!("Initiating graceful scheduler shutdown");
734
735        // Set shutdown flag and notify background tasks
736        *self.is_running.write() = false;
737        self.shutdown_notify.notify_waiters();
738
739        // Step 1: Stop accepting new agents (already done by setting is_running=false)
740
741        // Step 2: Gracefully shutdown all running agents with timeout
742        let running_agent_ids: Vec<AgentId> = self
743            .running_agents
744            .iter()
745            .map(|entry| *entry.key())
746            .collect();
747
748        tracing::info!(
749            "Shutting down {} running agents gracefully",
750            running_agent_ids.len()
751        );
752
753        // First pass: attempt graceful shutdown
754        let graceful_timeout = Duration::from_secs(30);
755        let graceful_start = std::time::Instant::now();
756
757        for agent_id in &running_agent_ids {
758            if graceful_start.elapsed() >= graceful_timeout {
759                tracing::warn!(
760                    "Graceful shutdown timeout reached, switching to forced termination"
761                );
762                break;
763            }
764
765            // Use graceful shutdown method first
766            if let Err(e) = self.shutdown_agent(*agent_id).await {
767                tracing::warn!(
768                    "Failed to gracefully shutdown agent {}: {}, will force terminate",
769                    agent_id,
770                    e
771                );
772            }
773        }
774
775        // Wait a bit for agents to terminate gracefully
776        tokio::time::sleep(Duration::from_secs(5)).await;
777
778        // Step 3: Force terminate any remaining agents
779        let remaining_agent_ids: Vec<AgentId> = self
780            .running_agents
781            .iter()
782            .map(|entry| *entry.key())
783            .collect();
784
785        if !remaining_agent_ids.is_empty() {
786            tracing::warn!(
787                "Force terminating {} remaining agents",
788                remaining_agent_ids.len()
789            );
790
791            for agent_id in remaining_agent_ids {
792                if let Err(e) = self.terminate_agent(agent_id).await {
793                    tracing::error!(
794                        "Failed to force terminate agent {} during shutdown: {}",
795                        agent_id,
796                        e
797                    );
798                }
799            }
800        }
801
802        // Step 4: Flush metrics to persistent storage
803        self.flush_metrics().await?;
804
805        // Step 5: Release all allocated resources
806        self.cleanup_resources().await?;
807
808        // Step 6: Final cleanup of queued agents
809        {
810            let mut queue = self.priority_queue.write();
811            let queued_count = queue.len();
812            if queued_count > 0 {
813                tracing::info!("Clearing {} queued agents", queued_count);
814                queue.clear();
815            }
816        }
817
818        tracing::info!("Scheduler shutdown completed successfully");
819        Ok(())
820    }
821
822    async fn check_health(&self) -> Result<ComponentHealth, SchedulerError> {
823        let is_running = *self.is_running.read();
824        if !is_running {
825            return Ok(ComponentHealth::unhealthy(
826                "Scheduler is shut down".to_string(),
827            ));
828        }
829
830        let (total_scheduled, uptime) = {
831            let metrics = self.system_metrics.read();
832            let now = SystemTime::now();
833            (metrics.total_scheduled, metrics.uptime_since(now))
834        };
835
836        let running_count = self.running_agents.len();
837        let queue_len = self.priority_queue.read().len();
838        let load_factor = running_count as f64 / self.config.max_concurrent_agents as f64;
839
840        let status = if load_factor > 0.9 {
841            ComponentHealth::degraded(format!(
842                "High load: {:.1}% capacity used ({}/{})",
843                load_factor * 100.0,
844                running_count,
845                self.config.max_concurrent_agents
846            ))
847        } else if queue_len > 1000 {
848            ComponentHealth::degraded(format!("Large queue: {} agents waiting", queue_len))
849        } else {
850            ComponentHealth::healthy(Some(format!(
851                "Running normally: {} active agents, {} queued",
852                running_count, queue_len
853            )))
854        };
855
856        Ok(status
857            .with_uptime(uptime)
858            .with_metric("running_agents".to_string(), running_count.to_string())
859            .with_metric("queued_agents".to_string(), queue_len.to_string())
860            .with_metric("total_scheduled".to_string(), total_scheduled.to_string())
861            .with_metric(
862                "max_capacity".to_string(),
863                self.config.max_concurrent_agents.to_string(),
864            )
865            .with_metric("load_factor".to_string(), format!("{:.2}", load_factor)))
866    }
867
868    async fn list_agents(&self) -> Vec<AgentId> {
869        // Return all registered agents (running, queued, and completed)
870        self.registered_agents
871            .iter()
872            .map(|entry| *entry.key())
873            .collect()
874    }
875
876    #[cfg(feature = "http-api")]
877    async fn update_agent(
878        &self,
879        agent_id: AgentId,
880        request: crate::api::types::UpdateAgentRequest,
881    ) -> Result<(), SchedulerError> {
882        if !*self.is_running.read() {
883            return Err(SchedulerError::ShuttingDown);
884        }
885
886        // Check if agent is currently running
887        if let Some(mut entry) = self.running_agents.get_mut(&agent_id) {
888            let task = entry.value_mut();
889
890            // Update the agent configuration
891            if let Some(name) = request.name {
892                task.config.name = name;
893            }
894
895            if let Some(dsl) = request.dsl {
896                task.config.dsl_source = dsl;
897            }
898
899            tracing::info!("Updated running agent {}", agent_id);
900            return Ok(());
901        }
902
903        // Check if agent is in the queue
904        let mut queue = self.priority_queue.write();
905        if let Some(mut task) = queue.remove(&agent_id) {
906            // Update the agent configuration
907            if let Some(name) = request.name {
908                task.config.name = name;
909            }
910
911            if let Some(dsl) = request.dsl {
912                task.config.dsl_source = dsl;
913            }
914
915            // Put it back in the queue
916            queue.push(task);
917            tracing::info!("Updated queued agent {}", agent_id);
918            return Ok(());
919        }
920
921        Err(SchedulerError::AgentNotFound { agent_id })
922    }
923
924    fn has_agent(&self, agent_id: AgentId) -> bool {
925        self.registered_agents.contains_key(&agent_id)
926    }
927
928    fn get_agent_config(&self, agent_id: AgentId) -> Option<AgentConfig> {
929        self.registered_agents.get(&agent_id).map(|r| r.clone())
930    }
931
932    async fn delete_agent(&self, agent_id: AgentId) -> Result<(), SchedulerError> {
933        // Remove from running agents if present
934        if let Some((_, _)) = self.running_agents.remove(&agent_id) {
935            let _ = self.task_manager.terminate_task(agent_id).await;
936        }
937
938        // Remove from queue if present
939        {
940            let mut queue = self.priority_queue.write();
941            queue.remove(&agent_id);
942        }
943
944        // Remove external agent state if present
945        #[cfg(feature = "http-api")]
946        self.external_agents.remove(&agent_id);
947
948        // Remove from registry
949        if self.registered_agents.remove(&agent_id).is_some() {
950            tracing::info!("Deleted agent {} from registry", agent_id);
951            Ok(())
952        } else {
953            Err(SchedulerError::AgentNotFound { agent_id })
954        }
955    }
956
    /// Direct accessor for the shared external-agent state map, used by the
    /// HTTP API layer to read and update heartbeat/state information.
    #[cfg(feature = "http-api")]
    fn external_agents(&self) -> &Arc<DashMap<AgentId, crate::api::types::ExternalAgentState>> {
        &self.external_agents
    }
961
962    #[cfg(feature = "http-api")]
963    fn get_external_agent_state(
964        &self,
965        agent_id: AgentId,
966    ) -> Option<crate::api::types::ExternalAgentState> {
967        self.external_agents.get(&agent_id).map(|r| r.clone())
968    }
969
    /// Scan all external agents and mark as `Unreachable` any whose last
    /// heartbeat is older than three heartbeat intervals. Agents that have
    /// never sent a heartbeat are left alone.
    #[cfg(feature = "http-api")]
    fn check_unreachable_agents(&self) {
        let now = chrono::Utc::now();

        for mut entry in self.external_agents.iter_mut() {
            let agent_id = *entry.key();
            let ext_state = entry.value_mut();

            // Skip agents already marked unreachable (no point re-flagging).
            if ext_state.reported_state == crate::types::AgentState::Unreachable {
                continue;
            }

            // Heartbeat interval comes from the agent's External execution
            // mode config; fall back to 60s when it is unavailable.
            // NOTE(review): this reads `registered_agents` while holding a
            // mutable shard guard on `external_agents` — safe across two
            // distinct maps, but do not extend it to re-enter the same map.
            let interval_secs = self
                .registered_agents
                .get(&agent_id)
                .and_then(|config| match &config.execution_mode {
                    crate::types::agent::ExecutionMode::External {
                        heartbeat_interval_secs,
                        ..
                    } => Some(*heartbeat_interval_secs),
                    _ => None,
                })
                .unwrap_or(60);

            // Unreachable threshold = three missed heartbeats.
            // NOTE(review): `interval_secs * 3` is unchecked u64 math; only an
            // absurd configured interval could overflow — confirm bounds.
            let threshold = chrono::Duration::seconds((interval_secs * 3) as i64);

            if let Some(last_hb) = ext_state.last_heartbeat {
                if now - last_hb > threshold {
                    tracing::warn!(
                        "External agent {} is unreachable (no heartbeat for {}s)",
                        agent_id,
                        (now - last_hb).num_seconds()
                    );
                    ext_state.reported_state = crate::types::AgentState::Unreachable;
                }
            }
            // If last_heartbeat is None: agent just registered, don't mark unreachable yet.
        }
    }
1011}
1012
impl DefaultAgentScheduler {
    /// Start the periodic metrics export loop.
    ///
    /// No-op when no metrics exporter is configured. Otherwise spawns a
    /// detached background task that builds and exports a snapshot every
    /// `export_interval_seconds` (default 60s) until either the shutdown
    /// notification fires or `is_running` is cleared.
    async fn start_metrics_export_loop(&self) {
        let exporter = match self.metrics_exporter.clone() {
            Some(e) => e,
            None => return,
        };

        // Clone the Arc-wrapped components up front so the spawned task owns
        // its own handles and never borrows `self`.
        let priority_queue = self.priority_queue.clone();
        let running_agents = self.running_agents.clone();
        let suspended_agents = self.suspended_agents.clone();
        let system_metrics = self.system_metrics.clone();
        let task_manager = self.task_manager.clone();
        let load_balancer = self.load_balancer.clone();
        let shutdown_notify = self.shutdown_notify.clone();
        let is_running = self.is_running.clone();
        let max_concurrent = self.config.max_concurrent_agents;
        let interval_secs = self
            .config
            .metrics
            .as_ref()
            .map(|m| m.export_interval_seconds)
            .unwrap_or(60);

        // The JoinHandle is intentionally dropped: the task is detached and
        // exits via the select! arms below.
        tokio::spawn(async move {
            let mut tick = interval(Duration::from_secs(interval_secs));

            loop {
                tokio::select! {
                    _ = tick.tick() => {
                        // Secondary exit path in case the notification was
                        // missed between ticks.
                        if !*is_running.read() {
                            break;
                        }

                        let snapshot = Self::build_metrics_snapshot(
                            &priority_queue,
                            &running_agents,
                            &suspended_agents,
                            &system_metrics,
                            &task_manager,
                            &load_balancer,
                            max_concurrent,
                        )
                        .await;

                        // Export failures are logged, never fatal to the loop.
                        if let Err(e) = exporter.export(&snapshot).await {
                            tracing::warn!("Periodic metrics export failed: {}", e);
                        }
                    }
                    _ = shutdown_notify.notified() => {
                        break;
                    }
                }
            }
        });
    }

    /// Build a point-in-time metrics snapshot from all scheduler components.
    ///
    /// Takes each component as an `Arc` parameter (rather than `&self`) so it
    /// can be called both from scheduler methods and from the detached export
    /// loop spawned above.
    async fn build_metrics_snapshot(
        priority_queue: &Arc<RwLock<PriorityQueue<ScheduledTask>>>,
        running_agents: &Arc<DashMap<AgentId, ScheduledTask>>,
        suspended_agents: &Arc<DashMap<AgentId, AgentSuspensionInfo>>,
        system_metrics: &Arc<RwLock<SystemMetrics>>,
        task_manager: &Arc<TaskManager>,
        load_balancer: &Arc<LoadBalancer>,
        max_concurrent: usize,
    ) -> MetricsSnapshot {
        // Keep the read-lock scope minimal; only copy out the two values.
        let (total_scheduled, uptime) = {
            let metrics = system_metrics.read();
            let now = SystemTime::now();
            (metrics.total_scheduled, metrics.uptime_since(now))
        };
        let running_count = running_agents.len();
        let queued_count = priority_queue.read().len();
        let suspended_count = suspended_agents.len();
        let load_factor = running_count as f64 / max_concurrent as f64;

        let task_stats = task_manager.get_task_statistics().await;
        let lb_stats = load_balancer.get_statistics().await;
        let resource_usage = load_balancer.get_resource_utilization().await;

        MetricsSnapshot {
            // Unix-epoch seconds; clock-before-epoch degrades to 0.
            timestamp: SystemTime::now()
                .duration_since(SystemTime::UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs(),
            scheduler: SchedulerMetrics {
                total_scheduled,
                uptime_seconds: uptime.as_secs(),
                running_agents: running_count,
                queued_agents: queued_count,
                suspended_agents: suspended_count,
                max_capacity: max_concurrent,
                load_factor,
            },
            task_manager: TaskManagerMetrics {
                total_tasks: task_stats.total_tasks,
                healthy_tasks: task_stats.healthy_tasks,
                average_uptime_seconds: task_stats.average_uptime.as_secs_f64(),
                total_memory_usage: task_stats.total_memory_usage,
            },
            load_balancer: LoadBalancerMetrics {
                total_allocations: lb_stats.total_allocations,
                active_allocations: lb_stats.active_allocations,
                memory_utilization: lb_stats.memory_utilization as f64,
                cpu_utilization: lb_stats.cpu_utilization as f64,
                allocation_failures: lb_stats.allocation_failures,
                average_allocation_time_ms: lb_stats.average_allocation_time.as_secs_f64() * 1000.0,
            },
            system: SystemResourceMetrics {
                memory_usage_mb: resource_usage.memory_used as f64 / (1024.0 * 1024.0),
                cpu_usage_percent: resource_usage.cpu_utilization as f64,
            },
            compaction: None,
        }
    }

    /// Collect and export a final metrics snapshot, then shut down the exporter.
    ///
    /// Currently always returns `Ok(())`: export and exporter-shutdown
    /// failures are logged as warnings rather than propagated, so they cannot
    /// block scheduler shutdown.
    async fn flush_metrics(&self) -> Result<(), SchedulerError> {
        tracing::debug!("Flushing scheduler metrics");

        if let Some(ref exporter) = self.metrics_exporter {
            let snapshot = Self::build_metrics_snapshot(
                &self.priority_queue,
                &self.running_agents,
                &self.suspended_agents,
                &self.system_metrics,
                &self.task_manager,
                &self.load_balancer,
                self.config.max_concurrent_agents,
            )
            .await;

            if let Err(e) = exporter.export(&snapshot).await {
                tracing::warn!("Final metrics export failed: {}", e);
            }

            if let Err(e) = exporter.shutdown().await {
                tracing::warn!("Metrics exporter shutdown failed: {}", e);
            }
        }

        // Log summary regardless of exporter presence.
        let (total_scheduled, uptime) = {
            let metrics = self.system_metrics.read();
            let now = SystemTime::now();
            (metrics.total_scheduled, metrics.uptime_since(now))
        };
        tracing::info!(
            "Scheduler shutdown metrics - total_scheduled={}, uptime={:?}, \
             running={}, queued={}, suspended={}",
            total_scheduled,
            uptime,
            self.running_agents.len(),
            self.priority_queue.read().len(),
            self.suspended_agents.len(),
        );

        Ok(())
    }

    /// Clean up all allocated resources
    ///
    /// Called during shutdown to return load-balancer allocations for every
    /// still-running agent. Currently always returns `Ok(())`.
    async fn cleanup_resources(&self) -> Result<(), SchedulerError> {
        tracing::debug!("Cleaning up allocated resources");

        // Get all allocated agents and their resource allocations
        let allocated_agents: Vec<AgentId> = self
            .running_agents
            .iter()
            .map(|entry| *entry.key())
            .collect();

        // For each agent, ensure resources are properly deallocated
        for agent_id in allocated_agents {
            // Create a dummy allocation for cleanup
            // In a real implementation, we'd track actual allocations
            // NOTE(review): zeroed amounts mean the load balancer may not
            // actually credit back real usage — confirm against
            // `deallocate_resources` semantics.
            let allocation = ResourceAllocation {
                agent_id,
                allocated_memory: 0, // Would be tracked from actual allocation
                allocated_cpu_cores: 0.0,
                allocated_disk_io: 0,
                allocated_network_io: 0,
                allocation_time: SystemTime::now(),
            };

            self.load_balancer.deallocate_resources(allocation).await;
        }

        // Additional cleanup for task manager resources
        // The task manager will handle process cleanup in its own termination methods

        tracing::debug!("Resource cleanup completed");
        Ok(())
    }

    /// Suspend an agent (moves from running to suspended state)
    ///
    /// Terminates the agent's task, then records an `AgentSuspensionInfo`
    /// carrying the original task so it can later be resumed. If termination
    /// fails the agent is restored to the running map and an error returned.
    /// Returns `AgentNotFound` when the agent is not currently running.
    pub async fn suspend_agent(
        &self,
        agent_id: AgentId,
        reason: String,
    ) -> Result<(), SchedulerError> {
        if let Some((_, task)) = self.running_agents.remove(&agent_id) {
            // Stop the task
            if let Err(e) = self.task_manager.terminate_task(agent_id).await {
                tracing::error!("Failed to terminate task during suspension: {}", e);
                // Put the agent back in running state if we can't stop it
                self.running_agents.insert(agent_id, task);
                return Err(SchedulerError::SchedulingFailed {
                    agent_id,
                    reason: format!("Failed to suspend agent: {}", e).into(),
                });
            }

            // Create suspension info
            let suspension_info = AgentSuspensionInfo {
                agent_id,
                suspended_at: SystemTime::now(),
                suspension_reason: reason.clone(),
                original_task: task,
                can_resume: true,
            };

            // Store in suspended agents
            self.suspended_agents.insert(agent_id, suspension_info);

            tracing::info!("Suspended agent {} with reason: {}", agent_id, reason);
            Ok(())
        } else {
            Err(SchedulerError::AgentNotFound { agent_id })
        }
    }

    /// Resume a suspended agent
    ///
    /// Removes the suspension record and pushes the preserved task back onto
    /// the priority queue with a refreshed schedule time; the normal
    /// scheduling loop will pick it up from there. Errors when the agent is
    /// not suspended or is marked non-resumable.
    pub async fn resume_agent(&self, agent_id: AgentId) -> Result<(), SchedulerError> {
        if let Some((_, suspension_info)) = self.suspended_agents.remove(&agent_id) {
            if !suspension_info.can_resume {
                // NOTE(review): the suspension record was already removed at
                // this point, so a non-resumable agent is dropped entirely —
                // confirm that is intended.
                return Err(SchedulerError::SchedulingFailed {
                    agent_id,
                    reason: "Agent cannot be resumed".into(),
                });
            }

            // Add back to priority queue for scheduling
            let mut task = suspension_info.original_task;
            task.scheduled_at = SystemTime::now(); // Update schedule time

            self.priority_queue.write().push(task);

            tracing::info!("Resumed agent {} from suspension", agent_id);
            Ok(())
        } else {
            Err(SchedulerError::AgentNotFound { agent_id })
        }
    }

    /// Get list of suspended agents
    ///
    /// Returns cloned suspension records so callers hold no map guards.
    pub async fn list_suspended_agents(&self) -> Vec<AgentSuspensionInfo> {
        self.suspended_agents
            .iter()
            .map(|entry| entry.value().clone())
            .collect()
    }
}
1276
/// System metrics for monitoring
///
/// Internal counters snapshot-read by health checks and metrics export.
#[derive(Debug, Clone)]
struct SystemMetrics {
    // NOTE(review): `update` overwrites this with running + queued, so despite
    // the name it is a point-in-time count, not a cumulative total — confirm.
    total_scheduled: usize,
    // When the scheduler was constructed; basis for uptime reporting.
    start_time: SystemTime,
}
1283
1284impl SystemMetrics {
1285    fn new() -> Self {
1286        Self {
1287            total_scheduled: 0,
1288            start_time: SystemTime::now(),
1289        }
1290    }
1291
1292    fn update(&mut self, running: usize, queued: usize) {
1293        self.total_scheduled = running + queued;
1294    }
1295
1296    fn uptime_since(&self, now: SystemTime) -> Duration {
1297        now.duration_since(self.start_time).unwrap_or_default()
1298    }
1299}
1300
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Baseline ephemeral agent config shared by the tests below.
    fn make_test_config() -> AgentConfig {
        AgentConfig {
            id: AgentId::new(),
            name: "test-agent".to_string(),
            dsl_source: "do something useful".to_string(),
            execution_mode: ExecutionMode::Ephemeral,
            security_tier: SecurityTier::Tier2,
            resource_limits: ResourceLimits::default(),
            capabilities: vec![
                Capability::FileSystem,
                Capability::Network,
                Capability::Computation,
            ],
            policies: vec![],
            metadata: HashMap::new(),
            priority: Priority::default(),
        }
    }

    /// A scheduled task with a deadline maps onto a routing context carrying
    /// the agent's id, security level, prompt, capabilities, and time budget.
    #[test]
    fn test_routing_context_from_scheduled_task() {
        let mut scheduled = ScheduledTask::new(make_test_config());
        scheduled.deadline = Some(SystemTime::now() + Duration::from_secs(300));

        let routing_ctx = scheduled.to_routing_context();

        assert_eq!(routing_ctx.agent_id, scheduled.agent_id);
        assert_eq!(routing_ctx.agent_security_level, SecurityLevel::High);
        assert_eq!(routing_ctx.prompt, "do something useful");
        assert_eq!(
            routing_ctx.agent_capabilities,
            vec!["FileSystem", "Network", "Computation"]
        );
        assert!(routing_ctx.max_execution_time.is_some());
        assert!(matches!(routing_ctx.task_type, TaskType::Custom(ref s) if s == "general"));
    }

    /// A `task_type` metadata entry overrides the default task type.
    #[test]
    fn test_routing_context_custom_task_type() {
        let mut cfg = make_test_config();
        cfg.metadata
            .insert("task_type".to_string(), "analysis".to_string());

        let routing_ctx = ScheduledTask::new(cfg).to_routing_context();

        assert!(matches!(routing_ctx.task_type, TaskType::Analysis));
    }

    /// Without metadata, the task type falls back to Custom("general").
    #[test]
    fn test_routing_context_default_task_type() {
        let routing_ctx = ScheduledTask::new(make_test_config()).to_routing_context();

        assert!(matches!(routing_ctx.task_type, TaskType::Custom(ref s) if s == "general"));
    }

    /// Newly created tasks start with no routing decision attached.
    #[test]
    fn test_scheduled_task_route_decision_default_none() {
        let fresh_task = ScheduledTask::new(make_test_config());

        assert!(fresh_task.route_decision.is_none());
    }

    /// External agents are registered but never enter the priority queue.
    #[cfg(feature = "http-api")]
    #[tokio::test]
    async fn test_external_agent_not_queued() {
        let scheduler = DefaultAgentScheduler::new(SchedulerConfig::default())
            .await
            .unwrap();

        let mut cfg = make_test_config();
        cfg.execution_mode = ExecutionMode::External {
            endpoint: None,
            agentpin_domain: None,
            heartbeat_interval_secs: 60,
        };
        let agent_id = cfg.id;

        assert!(scheduler.schedule_agent(cfg).await.is_ok());

        // Agent should be registered
        assert!(scheduler.has_agent(agent_id));

        // Agent should NOT be in the priority queue
        assert_eq!(scheduler.priority_queue.read().len(), 0);

        // Status should return the external state
        let status = scheduler.get_agent_status(agent_id).await.unwrap();
        assert_eq!(status.agent_id, agent_id);
    }

    /// An external agent whose last heartbeat exceeds 3x its interval is
    /// flagged Unreachable by the sweep.
    #[cfg(feature = "http-api")]
    #[tokio::test]
    async fn test_unreachable_detection() {
        let scheduler = DefaultAgentScheduler::new(SchedulerConfig::default())
            .await
            .unwrap();

        let mut cfg = make_test_config();
        cfg.execution_mode = ExecutionMode::External {
            endpoint: None,
            agentpin_domain: None,
            heartbeat_interval_secs: 1, // 1 second interval -> 3s threshold
        };
        let agent_id = cfg.id;

        scheduler.schedule_agent(cfg).await.unwrap();

        // Set a heartbeat in the past (> 3 seconds ago)
        {
            let mut entry = scheduler.external_agents.get_mut(&agent_id).unwrap();
            entry.last_heartbeat = Some(chrono::Utc::now() - chrono::Duration::seconds(10));
            entry.reported_state = crate::types::AgentState::Running;
        }

        scheduler.check_unreachable_agents();

        let observed = scheduler.external_agents.get(&agent_id).unwrap();
        assert_eq!(
            observed.reported_state,
            crate::types::AgentState::Unreachable
        );
    }
}