1use async_trait::async_trait;
6use dashmap::DashMap;
7use parking_lot::RwLock;
8use std::sync::Arc;
9use std::time::{Duration, SystemTime};
10use tokio::sync::Notify;
11use tokio::time::interval;
12
13use crate::metrics::{
14 LoadBalancerMetrics, MetricsConfig, MetricsExporter, MetricsSnapshot, SchedulerMetrics,
15 SystemResourceMetrics, TaskManagerMetrics,
16};
17use crate::routing::{RouteDecision, RoutingContext, RoutingEngine, SecurityLevel, TaskType};
18use crate::types::*;
19
20pub mod load_balancer;
21pub mod priority_queue;
22pub mod task_manager;
23
24#[cfg(feature = "cron")]
25pub mod cron_scheduler;
26#[cfg(feature = "cron")]
27pub mod cron_types;
28#[cfg(feature = "cron")]
29pub mod delivery;
30#[cfg(feature = "cron")]
31pub mod heartbeat;
32#[cfg(feature = "cron")]
33pub mod job_store;
34#[cfg(feature = "cron")]
35pub mod policy_gate;
36
37use load_balancer::LoadBalancer;
38pub use load_balancer::LoadBalancingStats;
39use priority_queue::PriorityQueue;
40use task_manager::TaskManager;
41
/// Point-in-time status snapshot of a single agent, as returned by
/// `AgentScheduler::get_agent_status`.
#[derive(Debug, Clone)]
pub struct AgentStatus {
    /// Agent this status describes.
    pub agent_id: AgentId,
    /// Lifecycle state as derived by the scheduler.
    pub state: AgentState,
    /// Most recent observed activity (or a fallback time when none is known).
    pub last_activity: SystemTime,
    /// Memory usage reported by the task manager; 0 when not available.
    /// NOTE(review): unit is whatever the task manager reports — confirm.
    pub memory_usage: u64,
    /// CPU usage reported by the task manager; 0.0 when not available.
    pub cpu_usage: f64,
    /// Number of active tasks; the default scheduler reports 1 only while running.
    pub active_tasks: u32,
    /// When the agent's task was scheduled.
    pub scheduled_at: SystemTime,
}
53
/// Interface for components that register agents, schedule them for
/// execution and manage their lifecycle.
#[async_trait]
pub trait AgentScheduler {
    /// Registers `config` and schedules the agent, returning its id.
    async fn schedule_agent(&self, config: AgentConfig) -> Result<AgentId, SchedulerError>;

    /// Changes the priority of an agent that is running or waiting to run.
    async fn reschedule_agent(
        &self,
        agent_id: AgentId,
        priority: Priority,
    ) -> Result<(), SchedulerError>;

    /// Stops the agent (if running) and removes it from scheduling.
    async fn terminate_agent(&self, agent_id: AgentId) -> Result<(), SchedulerError>;

    /// Gracefully stops the agent (if running) or removes it from the queue.
    async fn shutdown_agent(&self, agent_id: AgentId) -> Result<(), SchedulerError>;

    /// Returns aggregate scheduler status (counts, utilization, uptime).
    async fn get_system_status(&self) -> SystemStatus;

    /// Returns the current status of one agent.
    async fn get_agent_status(&self, agent_id: AgentId) -> Result<AgentStatus, SchedulerError>;

    /// Shuts the whole scheduler down.
    async fn shutdown(&self) -> Result<(), SchedulerError>;

    /// Reports the scheduler's health as a `ComponentHealth`.
    async fn check_health(&self) -> Result<ComponentHealth, SchedulerError>;

    /// Lists the ids of all registered agents.
    async fn list_agents(&self) -> Vec<AgentId>;

    /// Applies an update request (e.g. name / DSL) to an existing agent.
    #[cfg(feature = "http-api")]
    async fn update_agent(
        &self,
        agent_id: AgentId,
        request: crate::api::types::UpdateAgentRequest,
    ) -> Result<(), SchedulerError>;

    /// Returns `true` if the agent is registered.
    fn has_agent(&self, agent_id: AgentId) -> bool;

    /// Returns a copy of the agent's registered configuration, if any.
    fn get_agent_config(&self, agent_id: AgentId) -> Option<AgentConfig>;

    /// Removes the agent from the scheduler entirely.
    async fn delete_agent(&self, agent_id: AgentId) -> Result<(), SchedulerError>;

    /// Shared map of externally-executed agents' state.
    #[cfg(feature = "http-api")]
    fn external_agents(&self) -> &Arc<DashMap<AgentId, crate::api::types::ExternalAgentState>>;

    /// Returns a copy of an external agent's state, if it is known.
    #[cfg(feature = "http-api")]
    fn get_external_agent_state(
        &self,
        agent_id: AgentId,
    ) -> Option<crate::api::types::ExternalAgentState>;

    /// Marks external agents whose heartbeats have lapsed as unreachable.
    #[cfg(feature = "http-api")]
    fn check_unreachable_agents(&self);
}
121
/// Configuration for `DefaultAgentScheduler`.
#[derive(Debug, Clone)]
pub struct SchedulerConfig {
    /// Maximum number of agents in the running set at the same time.
    pub max_concurrent_agents: usize,
    /// Number of priority levels.
    /// NOTE(review): not read in the visible part of this module — confirm usage.
    pub priority_levels: u8,
    /// Resource limits.
    /// NOTE(review): not read in the visible part of this module — confirm usage.
    pub resource_limits: ResourceLimits,
    /// Scheduling algorithm.
    /// NOTE(review): not read in the visible part of this module — confirm usage.
    pub scheduling_algorithm: SchedulingAlgorithm,
    /// Strategy handed to the load balancer at construction.
    pub load_balancing_strategy: LoadBalancingStrategy,
    /// Timeout handed to the task manager at construction.
    pub task_timeout: Duration,
    /// Period of the background health-check loop.
    pub health_check_interval: Duration,
    /// Metrics export settings; `None` (or `enabled == false`) disables export.
    pub metrics: Option<MetricsConfig>,
}
136
137impl Default for SchedulerConfig {
138 fn default() -> Self {
139 Self {
140 max_concurrent_agents: 1000,
141 priority_levels: 4,
142 resource_limits: ResourceLimits::default(),
143 scheduling_algorithm: SchedulingAlgorithm::PriorityBased,
144 load_balancing_strategy: LoadBalancingStrategy::RoundRobin,
145 task_timeout: Duration::from_secs(3600), health_check_interval: Duration::from_secs(30),
147 metrics: None,
148 }
149 }
150}
151
/// An agent configuration together with the metadata the scheduler needs to
/// queue, order, route and dispatch it.
#[derive(Debug, Clone)]
pub struct ScheduledTask {
    /// Agent this task runs (copied from `config.id`).
    pub agent_id: AgentId,
    /// Full agent configuration.
    pub config: AgentConfig,
    /// Scheduling priority; primary key of the task ordering.
    pub priority: Priority,
    /// When the task was (re)queued; among equal priorities, earlier compares greater.
    pub scheduled_at: SystemTime,
    /// Optional deadline, used to derive the routing context's max execution time.
    pub deadline: Option<SystemTime>,
    /// Retry counter, starting at 0.
    /// NOTE(review): not incremented in the visible code — confirm usage.
    pub retry_count: u32,
    /// Resources requested from the load balancer when the task is dispatched.
    pub resource_requirements: ResourceRequirements,
    /// Routing decision recorded at dispatch time when a routing engine is configured.
    pub route_decision: Option<RouteDecision>,
}
164
165impl ScheduledTask {
166 pub fn new(config: AgentConfig) -> Self {
167 let now = SystemTime::now();
168 Self {
169 agent_id: config.id,
170 priority: config.priority,
171 resource_requirements: config
172 .metadata
173 .get("resource_requirements")
174 .and_then(|s| serde_json::from_str(s).ok())
175 .unwrap_or_default(),
176 config,
177 scheduled_at: now,
178 deadline: None,
179 retry_count: 0,
180 route_decision: None,
181 }
182 }
183
184 pub fn to_routing_context(&self) -> RoutingContext {
186 let security_level = match self.config.security_tier {
187 SecurityTier::None => SecurityLevel::Low,
188 SecurityTier::Tier1 => SecurityLevel::Medium,
189 SecurityTier::Tier2 => SecurityLevel::High,
190 SecurityTier::Tier3 => SecurityLevel::Critical,
191 };
192
193 let capabilities: Vec<String> = self
194 .config
195 .capabilities
196 .iter()
197 .map(|cap| match cap {
198 Capability::FileSystem => "FileSystem".to_string(),
199 Capability::Network => "Network".to_string(),
200 Capability::Database => "Database".to_string(),
201 Capability::Computation => "Computation".to_string(),
202 Capability::Communication => "Communication".to_string(),
203 Capability::Custom(s) => s.clone(),
204 })
205 .collect();
206
207 let task_type = self
208 .config
209 .metadata
210 .get("task_type")
211 .map(|tt| match tt.as_str() {
212 "intent" => TaskType::Intent,
213 "extract" => TaskType::Extract,
214 "template" => TaskType::Template,
215 "boilerplate_code" => TaskType::BoilerplateCode,
216 "code_generation" => TaskType::CodeGeneration,
217 "reasoning" => TaskType::Reasoning,
218 "analysis" => TaskType::Analysis,
219 "summarization" => TaskType::Summarization,
220 "translation" => TaskType::Translation,
221 "qa" => TaskType::QA,
222 other => TaskType::Custom(other.to_string()),
223 })
224 .unwrap_or_else(|| TaskType::Custom("general".to_string()));
225
226 let max_execution_time = self
227 .deadline
228 .and_then(|deadline| deadline.duration_since(SystemTime::now()).ok());
229
230 let mut ctx = RoutingContext::new(self.agent_id, task_type, self.config.dsl_source.clone());
231 ctx.agent_security_level = security_level;
232 ctx.agent_capabilities = capabilities;
233 ctx.max_execution_time = max_execution_time;
234 ctx
235 }
236}
237
238impl PartialEq for ScheduledTask {
239 fn eq(&self, other: &Self) -> bool {
240 self.agent_id == other.agent_id
241 }
242}
243
// NOTE(review): equality is by `agent_id` only, whereas `Ord` compares
// `priority` and `scheduled_at`. The two disagree (distinct agents with equal
// priority and timestamp compare `Equal` yet are not `==`), which breaks the
// `Ord`/`Eq` consistency contract. Confirm the priority queue relies only on
// `Ord` for ordering and on `agent_id` for lookups.
impl Eq for ScheduledTask {}
245
246impl PartialOrd for ScheduledTask {
247 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
248 Some(self.cmp(other))
249 }
250}
251
252impl Ord for ScheduledTask {
253 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
254 self.priority
256 .cmp(&other.priority)
257 .then_with(|| other.scheduled_at.cmp(&self.scheduled_at))
258 }
259}
260
/// Record kept for an agent taken out of the running set by `suspend_agent`.
#[derive(Debug, Clone)]
pub struct AgentSuspensionInfo {
    /// Suspended agent.
    pub agent_id: AgentId,
    /// When the agent was suspended.
    pub suspended_at: SystemTime,
    /// Caller-supplied reason for the suspension.
    pub suspension_reason: String,
    /// The task as it was running; re-queued with a fresh `scheduled_at` on resume.
    pub original_task: ScheduledTask,
    /// Whether `resume_agent` is allowed to re-queue this agent.
    pub can_resume: bool,
}
270
/// Default `AgentScheduler` implementation.
///
/// Queued tasks wait in a priority queue and are dispatched by a background
/// loop, bounded by `max_concurrent_agents` and by load-balancer allocation.
/// Further background loops run periodic health checks and (optionally)
/// metrics export.
pub struct DefaultAgentScheduler {
    /// Configuration supplied at construction time.
    config: SchedulerConfig,
    /// Tasks waiting to be dispatched.
    priority_queue: Arc<RwLock<PriorityQueue<ScheduledTask>>>,
    /// Allocates / releases resources for dispatched tasks.
    load_balancer: Arc<LoadBalancer>,
    /// Starts, health-checks and terminates agent tasks.
    task_manager: Arc<TaskManager>,
    /// Tasks currently dispatched, keyed by agent id.
    running_agents: Arc<DashMap<AgentId, ScheduledTask>>,
    /// Agents stopped via `suspend_agent`, with what is needed to resume them.
    suspended_agents: Arc<DashMap<AgentId, AgentSuspensionInfo>>,
    /// Configurations of registered agents; entries outlive task completion
    /// and are removed only by terminate/delete.
    registered_agents: Arc<DashMap<AgentId, AgentConfig>>,
    /// Heartbeat / reported state of externally-executed agents.
    #[cfg(feature = "http-api")]
    external_agents: Arc<DashMap<AgentId, crate::api::types::ExternalAgentState>>,
    /// Aggregate counters, refreshed by the dispatch loop.
    system_metrics: Arc<RwLock<SystemMetrics>>,
    /// Wakes the background loops when shutting down.
    shutdown_notify: Arc<Notify>,
    /// Cleared by `shutdown`; checked by the public API and background loops.
    is_running: Arc<RwLock<bool>>,
    /// Optional engine consulted before each dispatch; may deny a task.
    routing_engine: Option<Arc<dyn RoutingEngine>>,
    /// Metrics exporter; present only when metrics are configured and enabled.
    metrics_exporter: Option<Arc<dyn MetricsExporter>>,
}
292
293impl DefaultAgentScheduler {
294 pub async fn new(config: SchedulerConfig) -> Result<Self, SchedulerError> {
296 Self::new_with_routing(config, None).await
297 }
298
299 pub async fn new_with_routing(
301 config: SchedulerConfig,
302 routing_engine: Option<Arc<dyn RoutingEngine>>,
303 ) -> Result<Self, SchedulerError> {
304 let priority_queue = Arc::new(RwLock::new(PriorityQueue::new()));
305 let load_balancer = Arc::new(LoadBalancer::new(config.load_balancing_strategy.clone()));
306 let task_manager = Arc::new(TaskManager::new(config.task_timeout));
307 let running_agents = Arc::new(DashMap::new());
308 let suspended_agents = Arc::new(DashMap::new());
309 let registered_agents = Arc::new(DashMap::new());
310 #[cfg(feature = "http-api")]
311 let external_agents = Arc::new(DashMap::new());
312 let system_metrics = Arc::new(RwLock::new(SystemMetrics::new()));
313 let shutdown_notify = Arc::new(Notify::new());
314 let is_running = Arc::new(RwLock::new(true));
315
316 let metrics_exporter = match config.metrics {
318 Some(ref metrics_config) if metrics_config.enabled => {
319 match crate::metrics::create_exporter(metrics_config) {
320 Ok(exporter) => {
321 tracing::info!("Metrics exporter initialized");
322 Some(exporter)
323 }
324 Err(e) => {
325 tracing::warn!(
326 "Failed to create metrics exporter, continuing without metrics: {}",
327 e
328 );
329 None
330 }
331 }
332 }
333 _ => None,
334 };
335
336 let scheduler = Self {
337 config,
338 priority_queue,
339 load_balancer,
340 task_manager,
341 running_agents,
342 suspended_agents,
343 registered_agents,
344 #[cfg(feature = "http-api")]
345 external_agents,
346 system_metrics,
347 shutdown_notify,
348 is_running,
349 routing_engine,
350 metrics_exporter,
351 };
352
353 scheduler.start_scheduler_loop().await;
355 scheduler.start_health_check_loop().await;
356 scheduler.start_metrics_export_loop().await;
357
358 Ok(scheduler)
359 }
360
361 async fn start_scheduler_loop(&self) {
363 let priority_queue = self.priority_queue.clone();
364 let load_balancer = self.load_balancer.clone();
365 let task_manager = self.task_manager.clone();
366 let running_agents = self.running_agents.clone();
367 let system_metrics = self.system_metrics.clone();
368 let shutdown_notify = self.shutdown_notify.clone();
369 let is_running = self.is_running.clone();
370 let routing_engine = self.routing_engine.clone();
371 let max_concurrent = self.config.max_concurrent_agents;
372
373 tokio::spawn(async move {
374 let mut interval = interval(Duration::from_millis(100));
375
376 loop {
377 tokio::select! {
378 _ = interval.tick() => {
379 if !*is_running.read() {
380 break;
381 }
382
383 if running_agents.len() < max_concurrent {
385 let task_opt = {
386 let mut queue = priority_queue.write();
387 queue.pop()
388 };
389
390 if let Some(mut task) = task_opt {
391 if let Some(ref engine) = routing_engine {
393 let ctx = task.to_routing_context();
394 match engine.route_request(&ctx).await {
395 Ok(RouteDecision::Deny { ref reason, ref policy_violated }) => {
396 tracing::warn!(
397 "Routing policy denied task for agent {}: policy={}, reason={}",
398 task.agent_id, policy_violated, reason
399 );
400 continue;
401 }
402 Ok(decision) => {
403 task.route_decision = Some(decision);
404 }
405 Err(e) => {
406 tracing::warn!(
407 "Routing engine error for agent {}, proceeding without decision: {}",
408 task.agent_id, e
409 );
410 }
411 }
412 }
413
414 if let Ok(resource_allocation) = load_balancer.allocate_resources(&task.resource_requirements).await {
416 running_agents.insert(task.agent_id, task.clone());
417
418 if let Err(e) = task_manager.start_task(task.clone()).await {
419 tracing::error!("Failed to start task for agent {}: {}", task.agent_id, e);
420 running_agents.remove(&task.agent_id);
421 load_balancer.deallocate_resources(resource_allocation).await;
422 }
423 } else {
424 let mut queue = priority_queue.write();
426 queue.push(task);
427 }
428 }
429 }
430
431 let (running_count, queue_len) = {
433 let queue = priority_queue.read();
434 (running_agents.len(), queue.len())
435 };
436 system_metrics.write().update(running_count, queue_len);
437 }
438 _ = shutdown_notify.notified() => {
439 break;
440 }
441 }
442 }
443 });
444 }
445
446 async fn start_health_check_loop(&self) {
448 let task_manager = self.task_manager.clone();
449 let running_agents = self.running_agents.clone();
450 let shutdown_notify = self.shutdown_notify.clone();
451 let is_running = self.is_running.clone();
452 let health_check_interval = self.config.health_check_interval;
453
454 tokio::spawn(async move {
455 let mut interval = interval(health_check_interval);
456
457 loop {
458 tokio::select! {
459 _ = interval.tick() => {
460 if !*is_running.read() {
461 break;
462 }
463
464 let mut failed_agents = Vec::new();
466 for entry in running_agents.iter() {
467 let agent_id = *entry.key();
468 if (task_manager.check_task_health(agent_id).await).is_err() {
469 failed_agents.push(agent_id);
470 }
471 }
472
473 for agent_id in failed_agents {
475 running_agents.remove(&agent_id);
476 if let Err(e) = task_manager.terminate_task(agent_id).await {
477 tracing::error!("Failed to terminate failed agent {}: {}", agent_id, e);
478 }
479 }
480 }
481 _ = shutdown_notify.notified() => {
482 break;
483 }
484 }
485 }
486 });
487 }
488}
489
490#[async_trait]
491impl AgentScheduler for DefaultAgentScheduler {
492 async fn schedule_agent(&self, config: AgentConfig) -> Result<AgentId, SchedulerError> {
493 if !*self.is_running.read() {
494 return Err(SchedulerError::ShuttingDown);
495 }
496
497 #[cfg(feature = "http-api")]
499 if matches!(
500 config.execution_mode,
501 crate::types::agent::ExecutionMode::External { .. }
502 ) {
503 let agent_id = config.id;
504 self.registered_agents.insert(agent_id, config);
505 self.external_agents
506 .insert(agent_id, crate::api::types::ExternalAgentState::new());
507 tracing::info!("Registered external agent {} (not queued)", agent_id);
508 return Ok(agent_id);
509 }
510
511 let task = ScheduledTask::new(config.clone());
512 let agent_id = task.agent_id;
513
514 self.registered_agents.insert(agent_id, config);
516
517 self.priority_queue.write().push(task);
519
520 tracing::info!("Scheduled agent {} for execution", agent_id);
521 Ok(agent_id)
522 }
523
524 async fn reschedule_agent(
525 &self,
526 agent_id: AgentId,
527 priority: Priority,
528 ) -> Result<(), SchedulerError> {
529 if !*self.is_running.read() {
530 return Err(SchedulerError::ShuttingDown);
531 }
532
533 if let Some(mut entry) = self.running_agents.get_mut(&agent_id) {
535 entry.priority = priority;
536 return Ok(());
537 }
538
539 let mut queue = self.priority_queue.write();
541 if let Some(mut task) = queue.remove(&agent_id) {
542 task.priority = priority;
543 queue.push(task);
544 return Ok(());
545 }
546
547 Err(SchedulerError::AgentNotFound { agent_id })
548 }
549
550 async fn terminate_agent(&self, agent_id: AgentId) -> Result<(), SchedulerError> {
551 if let Some((_, _task)) = self.running_agents.remove(&agent_id) {
553 self.task_manager
554 .terminate_task(agent_id)
555 .await
556 .map_err(|e| SchedulerError::SchedulingFailed {
557 agent_id,
558 reason: format!("Failed to terminate task: {}", e).into(),
559 })?;
560
561 self.registered_agents.remove(&agent_id);
562 tracing::info!("Terminated agent {}", agent_id);
563 return Ok(());
564 }
565
566 let mut queue = self.priority_queue.write();
568 if queue.remove(&agent_id).is_some() {
569 drop(queue);
570 self.registered_agents.remove(&agent_id);
571 tracing::info!("Removed agent {} from queue", agent_id);
572 return Ok(());
573 }
574
575 Err(SchedulerError::AgentNotFound { agent_id })
576 }
577
578 async fn shutdown_agent(&self, agent_id: AgentId) -> Result<(), SchedulerError> {
579 if let Some((_, _task)) = self.running_agents.remove(&agent_id) {
581 self.task_manager
584 .terminate_task(agent_id)
585 .await
586 .map_err(|e| SchedulerError::SchedulingFailed {
587 agent_id,
588 reason: format!("Failed to shutdown task: {}", e).into(),
589 })?;
590
591 tracing::info!("Gracefully shutdown agent {}", agent_id);
592 return Ok(());
593 }
594
595 let mut queue = self.priority_queue.write();
597 if queue.remove(&agent_id).is_some() {
598 tracing::info!("Removed agent {} from queue during shutdown", agent_id);
599 return Ok(());
600 }
601
602 Err(SchedulerError::AgentNotFound { agent_id })
603 }
604
605 async fn get_system_status(&self) -> SystemStatus {
606 let (total_scheduled, uptime) = {
607 let metrics = self.system_metrics.read();
608 let now = SystemTime::now();
609 (metrics.total_scheduled, metrics.uptime_since(now))
610 };
611 let resource_utilization = self.load_balancer.get_resource_utilization().await;
612
613 SystemStatus {
614 total_agents: total_scheduled,
615 running_agents: self.running_agents.len(),
616 suspended_agents: self.suspended_agents.len(),
617 resource_utilization,
618 uptime,
619 last_updated: SystemTime::now(),
620 }
621 }
622
623 async fn get_agent_status(&self, agent_id: AgentId) -> Result<AgentStatus, SchedulerError> {
624 #[cfg(feature = "http-api")]
626 if let Some(ext) = self.external_agents.get(&agent_id) {
627 let last_activity = ext
628 .last_heartbeat
629 .map(|dt| {
630 let secs = dt.timestamp() as u64;
631 std::time::UNIX_EPOCH + std::time::Duration::from_secs(secs)
632 })
633 .unwrap_or(SystemTime::now());
634 return Ok(AgentStatus {
635 agent_id,
636 state: ext.reported_state.clone(),
637 last_activity,
638 memory_usage: 0,
639 cpu_usage: 0.0,
640 active_tasks: 0,
641 scheduled_at: SystemTime::now(),
642 });
643 }
644
645 if let Some(entry) = self.running_agents.get(&agent_id) {
647 let scheduled_task = entry.value();
648
649 match self.task_manager.check_task_health(agent_id).await {
651 Ok(task_health) => {
652 let state = match task_health.status {
654 task_manager::TaskStatus::Pending => AgentState::Ready,
655 task_manager::TaskStatus::Running => AgentState::Running,
656 task_manager::TaskStatus::Completed => AgentState::Completed,
657 task_manager::TaskStatus::Failed => AgentState::Failed,
658 task_manager::TaskStatus::TimedOut => AgentState::Failed,
659 task_manager::TaskStatus::Terminated => AgentState::Terminated,
660 };
661
662 let active_tasks = if matches!(state, AgentState::Running) {
663 1
664 } else {
665 0
666 };
667
668 Ok(AgentStatus {
669 agent_id,
670 state,
671 last_activity: task_health.last_activity,
672 memory_usage: task_health.memory_usage as u64,
673 cpu_usage: task_health.cpu_usage as f64,
674 active_tasks,
675 scheduled_at: scheduled_task.scheduled_at,
676 })
677 }
678 Err(_) => {
679 Ok(AgentStatus {
681 agent_id,
682 state: AgentState::Failed,
683 last_activity: scheduled_task.scheduled_at,
684 memory_usage: 0,
685 cpu_usage: 0.0,
686 active_tasks: 0,
687 scheduled_at: scheduled_task.scheduled_at,
688 })
689 }
690 }
691 } else {
692 let queue = self.priority_queue.read();
694 if let Some(task) = queue.find(&agent_id) {
695 Ok(AgentStatus {
697 agent_id,
698 state: AgentState::Waiting,
699 last_activity: task.scheduled_at,
700 memory_usage: 0,
701 cpu_usage: 0.0,
702 active_tasks: 0,
703 scheduled_at: task.scheduled_at,
704 })
705 } else if self.registered_agents.contains_key(&agent_id) {
706 Ok(AgentStatus {
708 agent_id,
709 state: AgentState::Completed,
710 last_activity: SystemTime::now(),
711 memory_usage: 0,
712 cpu_usage: 0.0,
713 active_tasks: 0,
714 scheduled_at: SystemTime::now(),
715 })
716 } else {
717 Err(SchedulerError::AgentNotFound { agent_id })
719 }
720 }
721 }
722
723 async fn shutdown(&self) -> Result<(), SchedulerError> {
724 {
726 let is_running = self.is_running.read();
727 if !*is_running {
728 tracing::debug!("Scheduler already shutdown");
729 return Ok(());
730 }
731 }
732
733 tracing::info!("Initiating graceful scheduler shutdown");
734
735 *self.is_running.write() = false;
737 self.shutdown_notify.notify_waiters();
738
739 let running_agent_ids: Vec<AgentId> = self
743 .running_agents
744 .iter()
745 .map(|entry| *entry.key())
746 .collect();
747
748 tracing::info!(
749 "Shutting down {} running agents gracefully",
750 running_agent_ids.len()
751 );
752
753 let graceful_timeout = Duration::from_secs(30);
755 let graceful_start = std::time::Instant::now();
756
757 for agent_id in &running_agent_ids {
758 if graceful_start.elapsed() >= graceful_timeout {
759 tracing::warn!(
760 "Graceful shutdown timeout reached, switching to forced termination"
761 );
762 break;
763 }
764
765 if let Err(e) = self.shutdown_agent(*agent_id).await {
767 tracing::warn!(
768 "Failed to gracefully shutdown agent {}: {}, will force terminate",
769 agent_id,
770 e
771 );
772 }
773 }
774
775 tokio::time::sleep(Duration::from_secs(5)).await;
777
778 let remaining_agent_ids: Vec<AgentId> = self
780 .running_agents
781 .iter()
782 .map(|entry| *entry.key())
783 .collect();
784
785 if !remaining_agent_ids.is_empty() {
786 tracing::warn!(
787 "Force terminating {} remaining agents",
788 remaining_agent_ids.len()
789 );
790
791 for agent_id in remaining_agent_ids {
792 if let Err(e) = self.terminate_agent(agent_id).await {
793 tracing::error!(
794 "Failed to force terminate agent {} during shutdown: {}",
795 agent_id,
796 e
797 );
798 }
799 }
800 }
801
802 self.flush_metrics().await?;
804
805 self.cleanup_resources().await?;
807
808 {
810 let mut queue = self.priority_queue.write();
811 let queued_count = queue.len();
812 if queued_count > 0 {
813 tracing::info!("Clearing {} queued agents", queued_count);
814 queue.clear();
815 }
816 }
817
818 tracing::info!("Scheduler shutdown completed successfully");
819 Ok(())
820 }
821
822 async fn check_health(&self) -> Result<ComponentHealth, SchedulerError> {
823 let is_running = *self.is_running.read();
824 if !is_running {
825 return Ok(ComponentHealth::unhealthy(
826 "Scheduler is shut down".to_string(),
827 ));
828 }
829
830 let (total_scheduled, uptime) = {
831 let metrics = self.system_metrics.read();
832 let now = SystemTime::now();
833 (metrics.total_scheduled, metrics.uptime_since(now))
834 };
835
836 let running_count = self.running_agents.len();
837 let queue_len = self.priority_queue.read().len();
838 let load_factor = running_count as f64 / self.config.max_concurrent_agents as f64;
839
840 let status = if load_factor > 0.9 {
841 ComponentHealth::degraded(format!(
842 "High load: {:.1}% capacity used ({}/{})",
843 load_factor * 100.0,
844 running_count,
845 self.config.max_concurrent_agents
846 ))
847 } else if queue_len > 1000 {
848 ComponentHealth::degraded(format!("Large queue: {} agents waiting", queue_len))
849 } else {
850 ComponentHealth::healthy(Some(format!(
851 "Running normally: {} active agents, {} queued",
852 running_count, queue_len
853 )))
854 };
855
856 Ok(status
857 .with_uptime(uptime)
858 .with_metric("running_agents".to_string(), running_count.to_string())
859 .with_metric("queued_agents".to_string(), queue_len.to_string())
860 .with_metric("total_scheduled".to_string(), total_scheduled.to_string())
861 .with_metric(
862 "max_capacity".to_string(),
863 self.config.max_concurrent_agents.to_string(),
864 )
865 .with_metric("load_factor".to_string(), format!("{:.2}", load_factor)))
866 }
867
868 async fn list_agents(&self) -> Vec<AgentId> {
869 self.registered_agents
871 .iter()
872 .map(|entry| *entry.key())
873 .collect()
874 }
875
876 #[cfg(feature = "http-api")]
877 async fn update_agent(
878 &self,
879 agent_id: AgentId,
880 request: crate::api::types::UpdateAgentRequest,
881 ) -> Result<(), SchedulerError> {
882 if !*self.is_running.read() {
883 return Err(SchedulerError::ShuttingDown);
884 }
885
886 if let Some(mut entry) = self.running_agents.get_mut(&agent_id) {
888 let task = entry.value_mut();
889
890 if let Some(name) = request.name {
892 task.config.name = name;
893 }
894
895 if let Some(dsl) = request.dsl {
896 task.config.dsl_source = dsl;
897 }
898
899 tracing::info!("Updated running agent {}", agent_id);
900 return Ok(());
901 }
902
903 let mut queue = self.priority_queue.write();
905 if let Some(mut task) = queue.remove(&agent_id) {
906 if let Some(name) = request.name {
908 task.config.name = name;
909 }
910
911 if let Some(dsl) = request.dsl {
912 task.config.dsl_source = dsl;
913 }
914
915 queue.push(task);
917 tracing::info!("Updated queued agent {}", agent_id);
918 return Ok(());
919 }
920
921 Err(SchedulerError::AgentNotFound { agent_id })
922 }
923
924 fn has_agent(&self, agent_id: AgentId) -> bool {
925 self.registered_agents.contains_key(&agent_id)
926 }
927
928 fn get_agent_config(&self, agent_id: AgentId) -> Option<AgentConfig> {
929 self.registered_agents.get(&agent_id).map(|r| r.clone())
930 }
931
932 async fn delete_agent(&self, agent_id: AgentId) -> Result<(), SchedulerError> {
933 if let Some((_, _)) = self.running_agents.remove(&agent_id) {
935 let _ = self.task_manager.terminate_task(agent_id).await;
936 }
937
938 {
940 let mut queue = self.priority_queue.write();
941 queue.remove(&agent_id);
942 }
943
944 #[cfg(feature = "http-api")]
946 self.external_agents.remove(&agent_id);
947
948 if self.registered_agents.remove(&agent_id).is_some() {
950 tracing::info!("Deleted agent {} from registry", agent_id);
951 Ok(())
952 } else {
953 Err(SchedulerError::AgentNotFound { agent_id })
954 }
955 }
956
957 #[cfg(feature = "http-api")]
958 fn external_agents(&self) -> &Arc<DashMap<AgentId, crate::api::types::ExternalAgentState>> {
959 &self.external_agents
960 }
961
962 #[cfg(feature = "http-api")]
963 fn get_external_agent_state(
964 &self,
965 agent_id: AgentId,
966 ) -> Option<crate::api::types::ExternalAgentState> {
967 self.external_agents.get(&agent_id).map(|r| r.clone())
968 }
969
970 #[cfg(feature = "http-api")]
971 fn check_unreachable_agents(&self) {
972 let now = chrono::Utc::now();
973
974 for mut entry in self.external_agents.iter_mut() {
975 let agent_id = *entry.key();
976 let ext_state = entry.value_mut();
977
978 if ext_state.reported_state == crate::types::AgentState::Unreachable {
980 continue;
981 }
982
983 let interval_secs = self
985 .registered_agents
986 .get(&agent_id)
987 .and_then(|config| match &config.execution_mode {
988 crate::types::agent::ExecutionMode::External {
989 heartbeat_interval_secs,
990 ..
991 } => Some(*heartbeat_interval_secs),
992 _ => None,
993 })
994 .unwrap_or(60);
995
996 let threshold = chrono::Duration::seconds((interval_secs * 3) as i64);
997
998 if let Some(last_hb) = ext_state.last_heartbeat {
999 if now - last_hb > threshold {
1000 tracing::warn!(
1001 "External agent {} is unreachable (no heartbeat for {}s)",
1002 agent_id,
1003 (now - last_hb).num_seconds()
1004 );
1005 ext_state.reported_state = crate::types::AgentState::Unreachable;
1006 }
1007 }
1008 }
1010 }
1011}
1012
1013impl DefaultAgentScheduler {
1014 async fn start_metrics_export_loop(&self) {
1016 let exporter = match self.metrics_exporter.clone() {
1017 Some(e) => e,
1018 None => return,
1019 };
1020
1021 let priority_queue = self.priority_queue.clone();
1022 let running_agents = self.running_agents.clone();
1023 let suspended_agents = self.suspended_agents.clone();
1024 let system_metrics = self.system_metrics.clone();
1025 let task_manager = self.task_manager.clone();
1026 let load_balancer = self.load_balancer.clone();
1027 let shutdown_notify = self.shutdown_notify.clone();
1028 let is_running = self.is_running.clone();
1029 let max_concurrent = self.config.max_concurrent_agents;
1030 let interval_secs = self
1031 .config
1032 .metrics
1033 .as_ref()
1034 .map(|m| m.export_interval_seconds)
1035 .unwrap_or(60);
1036
1037 tokio::spawn(async move {
1038 let mut tick = interval(Duration::from_secs(interval_secs));
1039
1040 loop {
1041 tokio::select! {
1042 _ = tick.tick() => {
1043 if !*is_running.read() {
1044 break;
1045 }
1046
1047 let snapshot = Self::build_metrics_snapshot(
1048 &priority_queue,
1049 &running_agents,
1050 &suspended_agents,
1051 &system_metrics,
1052 &task_manager,
1053 &load_balancer,
1054 max_concurrent,
1055 )
1056 .await;
1057
1058 if let Err(e) = exporter.export(&snapshot).await {
1059 tracing::warn!("Periodic metrics export failed: {}", e);
1060 }
1061 }
1062 _ = shutdown_notify.notified() => {
1063 break;
1064 }
1065 }
1066 }
1067 });
1068 }
1069
1070 async fn build_metrics_snapshot(
1072 priority_queue: &Arc<RwLock<PriorityQueue<ScheduledTask>>>,
1073 running_agents: &Arc<DashMap<AgentId, ScheduledTask>>,
1074 suspended_agents: &Arc<DashMap<AgentId, AgentSuspensionInfo>>,
1075 system_metrics: &Arc<RwLock<SystemMetrics>>,
1076 task_manager: &Arc<TaskManager>,
1077 load_balancer: &Arc<LoadBalancer>,
1078 max_concurrent: usize,
1079 ) -> MetricsSnapshot {
1080 let (total_scheduled, uptime) = {
1081 let metrics = system_metrics.read();
1082 let now = SystemTime::now();
1083 (metrics.total_scheduled, metrics.uptime_since(now))
1084 };
1085 let running_count = running_agents.len();
1086 let queued_count = priority_queue.read().len();
1087 let suspended_count = suspended_agents.len();
1088 let load_factor = running_count as f64 / max_concurrent as f64;
1089
1090 let task_stats = task_manager.get_task_statistics().await;
1091 let lb_stats = load_balancer.get_statistics().await;
1092 let resource_usage = load_balancer.get_resource_utilization().await;
1093
1094 MetricsSnapshot {
1095 timestamp: SystemTime::now()
1096 .duration_since(SystemTime::UNIX_EPOCH)
1097 .unwrap_or_default()
1098 .as_secs(),
1099 scheduler: SchedulerMetrics {
1100 total_scheduled,
1101 uptime_seconds: uptime.as_secs(),
1102 running_agents: running_count,
1103 queued_agents: queued_count,
1104 suspended_agents: suspended_count,
1105 max_capacity: max_concurrent,
1106 load_factor,
1107 },
1108 task_manager: TaskManagerMetrics {
1109 total_tasks: task_stats.total_tasks,
1110 healthy_tasks: task_stats.healthy_tasks,
1111 average_uptime_seconds: task_stats.average_uptime.as_secs_f64(),
1112 total_memory_usage: task_stats.total_memory_usage,
1113 },
1114 load_balancer: LoadBalancerMetrics {
1115 total_allocations: lb_stats.total_allocations,
1116 active_allocations: lb_stats.active_allocations,
1117 memory_utilization: lb_stats.memory_utilization as f64,
1118 cpu_utilization: lb_stats.cpu_utilization as f64,
1119 allocation_failures: lb_stats.allocation_failures,
1120 average_allocation_time_ms: lb_stats.average_allocation_time.as_secs_f64() * 1000.0,
1121 },
1122 system: SystemResourceMetrics {
1123 memory_usage_mb: resource_usage.memory_used as f64 / (1024.0 * 1024.0),
1124 cpu_usage_percent: resource_usage.cpu_utilization as f64,
1125 },
1126 compaction: None,
1127 }
1128 }
1129
1130 async fn flush_metrics(&self) -> Result<(), SchedulerError> {
1132 tracing::debug!("Flushing scheduler metrics");
1133
1134 if let Some(ref exporter) = self.metrics_exporter {
1135 let snapshot = Self::build_metrics_snapshot(
1136 &self.priority_queue,
1137 &self.running_agents,
1138 &self.suspended_agents,
1139 &self.system_metrics,
1140 &self.task_manager,
1141 &self.load_balancer,
1142 self.config.max_concurrent_agents,
1143 )
1144 .await;
1145
1146 if let Err(e) = exporter.export(&snapshot).await {
1147 tracing::warn!("Final metrics export failed: {}", e);
1148 }
1149
1150 if let Err(e) = exporter.shutdown().await {
1151 tracing::warn!("Metrics exporter shutdown failed: {}", e);
1152 }
1153 }
1154
1155 let (total_scheduled, uptime) = {
1157 let metrics = self.system_metrics.read();
1158 let now = SystemTime::now();
1159 (metrics.total_scheduled, metrics.uptime_since(now))
1160 };
1161 tracing::info!(
1162 "Scheduler shutdown metrics - total_scheduled={}, uptime={:?}, \
1163 running={}, queued={}, suspended={}",
1164 total_scheduled,
1165 uptime,
1166 self.running_agents.len(),
1167 self.priority_queue.read().len(),
1168 self.suspended_agents.len(),
1169 );
1170
1171 Ok(())
1172 }
1173
1174 async fn cleanup_resources(&self) -> Result<(), SchedulerError> {
1176 tracing::debug!("Cleaning up allocated resources");
1177
1178 let allocated_agents: Vec<AgentId> = self
1180 .running_agents
1181 .iter()
1182 .map(|entry| *entry.key())
1183 .collect();
1184
1185 for agent_id in allocated_agents {
1187 let allocation = ResourceAllocation {
1190 agent_id,
1191 allocated_memory: 0, allocated_cpu_cores: 0.0,
1193 allocated_disk_io: 0,
1194 allocated_network_io: 0,
1195 allocation_time: SystemTime::now(),
1196 };
1197
1198 self.load_balancer.deallocate_resources(allocation).await;
1199 }
1200
1201 tracing::debug!("Resource cleanup completed");
1205 Ok(())
1206 }
1207
1208 pub async fn suspend_agent(
1210 &self,
1211 agent_id: AgentId,
1212 reason: String,
1213 ) -> Result<(), SchedulerError> {
1214 if let Some((_, task)) = self.running_agents.remove(&agent_id) {
1215 if let Err(e) = self.task_manager.terminate_task(agent_id).await {
1217 tracing::error!("Failed to terminate task during suspension: {}", e);
1218 self.running_agents.insert(agent_id, task);
1220 return Err(SchedulerError::SchedulingFailed {
1221 agent_id,
1222 reason: format!("Failed to suspend agent: {}", e).into(),
1223 });
1224 }
1225
1226 let suspension_info = AgentSuspensionInfo {
1228 agent_id,
1229 suspended_at: SystemTime::now(),
1230 suspension_reason: reason.clone(),
1231 original_task: task,
1232 can_resume: true,
1233 };
1234
1235 self.suspended_agents.insert(agent_id, suspension_info);
1237
1238 tracing::info!("Suspended agent {} with reason: {}", agent_id, reason);
1239 Ok(())
1240 } else {
1241 Err(SchedulerError::AgentNotFound { agent_id })
1242 }
1243 }
1244
1245 pub async fn resume_agent(&self, agent_id: AgentId) -> Result<(), SchedulerError> {
1247 if let Some((_, suspension_info)) = self.suspended_agents.remove(&agent_id) {
1248 if !suspension_info.can_resume {
1249 return Err(SchedulerError::SchedulingFailed {
1250 agent_id,
1251 reason: "Agent cannot be resumed".into(),
1252 });
1253 }
1254
1255 let mut task = suspension_info.original_task;
1257 task.scheduled_at = SystemTime::now(); self.priority_queue.write().push(task);
1260
1261 tracing::info!("Resumed agent {} from suspension", agent_id);
1262 Ok(())
1263 } else {
1264 Err(SchedulerError::AgentNotFound { agent_id })
1265 }
1266 }
1267
1268 pub async fn list_suspended_agents(&self) -> Vec<AgentSuspensionInfo> {
1270 self.suspended_agents
1271 .iter()
1272 .map(|entry| entry.value().clone())
1273 .collect()
1274 }
1275}
1276
/// Coarse scheduler-wide counters.
#[derive(Debug, Clone)]
struct SystemMetrics {
    /// `running + queued` as passed to the most recent `update` call.
    ///
    /// This is a point-in-time count that each `update` overwrites. It is not
    /// a cumulative total.
    total_scheduled: usize,
    /// Wall-clock time at which this value was created via `new`.
    start_time: SystemTime,
}

impl SystemMetrics {
    /// Creates metrics with a zero count, stamped with the current time.
    fn new() -> Self {
        let start_time = SystemTime::now();
        SystemMetrics {
            total_scheduled: 0,
            start_time,
        }
    }

    /// Overwrites `total_scheduled` with `running + queued`.
    fn update(&mut self, running: usize, queued: usize) {
        let combined = running + queued;
        self.total_scheduled = combined;
    }

    /// Returns the time elapsed from `start_time` to `now`.
    ///
    /// Returns a zero duration when `now` is earlier than `start_time`, for
    /// example after the wall clock has been moved backwards.
    fn uptime_since(&self, now: SystemTime) -> Duration {
        match now.duration_since(self.start_time) {
            Ok(elapsed) => elapsed,
            Err(_) => Duration::default(),
        }
    }
}
1300
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Baseline config shared by every test: an ephemeral Tier-2 agent with
    /// file-system, network and computation capabilities.
    fn make_test_config() -> AgentConfig {
        let capabilities = vec![
            Capability::FileSystem,
            Capability::Network,
            Capability::Computation,
        ];

        AgentConfig {
            id: AgentId::new(),
            name: String::from("test-agent"),
            dsl_source: String::from("do something useful"),
            execution_mode: ExecutionMode::Ephemeral,
            security_tier: SecurityTier::Tier2,
            resource_limits: ResourceLimits::default(),
            capabilities,
            policies: Vec::new(),
            metadata: HashMap::new(),
            priority: Priority::default(),
        }
    }

    #[test]
    fn test_routing_context_from_scheduled_task() {
        let mut scheduled = ScheduledTask::new(make_test_config());
        scheduled.deadline = Some(SystemTime::now() + Duration::from_secs(300));

        let ctx = scheduled.to_routing_context();

        assert_eq!(ctx.agent_id, scheduled.agent_id);
        assert_eq!(ctx.agent_security_level, SecurityLevel::High);
        assert_eq!(ctx.prompt, "do something useful");
        assert_eq!(
            ctx.agent_capabilities,
            vec!["FileSystem", "Network", "Computation"]
        );
        // A deadline on the task must surface as an execution-time limit.
        assert!(ctx.max_execution_time.is_some());
        assert!(matches!(ctx.task_type, TaskType::Custom(ref s) if s == "general"));
    }

    #[test]
    fn test_routing_context_custom_task_type() {
        let mut cfg = make_test_config();
        cfg.metadata
            .insert(String::from("task_type"), String::from("analysis"));

        let scheduled = ScheduledTask::new(cfg);
        let ctx = scheduled.to_routing_context();

        assert!(matches!(ctx.task_type, TaskType::Analysis));
    }

    #[test]
    fn test_routing_context_default_task_type() {
        // With no "task_type" metadata, the context falls back to Custom("general").
        let scheduled = ScheduledTask::new(make_test_config());
        let ctx = scheduled.to_routing_context();

        assert!(matches!(ctx.task_type, TaskType::Custom(ref s) if s == "general"));
    }

    #[test]
    fn test_scheduled_task_route_decision_default_none() {
        let scheduled = ScheduledTask::new(make_test_config());

        assert!(scheduled.route_decision.is_none());
    }

    #[cfg(feature = "http-api")]
    #[tokio::test]
    async fn test_external_agent_not_queued() {
        let scheduler = DefaultAgentScheduler::new(SchedulerConfig::default())
            .await
            .unwrap();

        let mut cfg = make_test_config();
        cfg.execution_mode = ExecutionMode::External {
            endpoint: None,
            agentpin_domain: None,
            heartbeat_interval_secs: 60,
        };
        let agent_id = cfg.id;

        let outcome = scheduler.schedule_agent(cfg).await;
        assert!(outcome.is_ok());

        // The external agent is known to the scheduler but is not in the local queue.
        assert!(scheduler.has_agent(agent_id));
        assert_eq!(scheduler.priority_queue.read().len(), 0);

        let status = scheduler.get_agent_status(agent_id).await.unwrap();
        assert_eq!(status.agent_id, agent_id);
    }

    #[cfg(feature = "http-api")]
    #[tokio::test]
    async fn test_unreachable_detection() {
        let scheduler = DefaultAgentScheduler::new(SchedulerConfig::default())
            .await
            .unwrap();

        let mut cfg = make_test_config();
        cfg.execution_mode = ExecutionMode::External {
            endpoint: None,
            agentpin_domain: None,
            heartbeat_interval_secs: 1,
        };
        let agent_id = cfg.id;

        scheduler.schedule_agent(cfg).await.unwrap();

        // Backdate the last heartbeat to well past the 1s interval. The
        // mutable map guard is released before the check runs.
        let mut record = scheduler.external_agents.get_mut(&agent_id).unwrap();
        record.last_heartbeat = Some(chrono::Utc::now() - chrono::Duration::seconds(10));
        record.reported_state = crate::types::AgentState::Running;
        drop(record);

        scheduler.check_unreachable_agents();

        let record = scheduler.external_agents.get(&agent_id).unwrap();
        assert_eq!(record.reported_state, crate::types::AgentState::Unreachable);
    }
}