Skip to main content

oxios_kernel/
scheduler.rs

1//! Agent Scheduler — priority-based task queue inspired by AIOS / AgentRM.
2//!
3//! Manages agent task scheduling with:
4//! - Priority queue (FIFO within same priority)
5//! - Rate-limit-aware admission control
6//! - Zombie task detection and reaping
7//! - Maximum concurrent task enforcement
8
9use crate::budget::BudgetManager;
10use crate::types::AgentId;
11use anyhow::Result;
12use chrono::{DateTime, Utc};
13use parking_lot::Mutex;
14use serde::{Deserialize, Serialize};
15use std::collections::{BinaryHeap, HashMap};
16use std::sync::Arc;
17use uuid::Uuid;
18
19/// Priority levels for scheduled tasks.
20#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Default)]
21pub enum Priority {
22    /// Low priority, good for background work.
23    Low = 0,
24    /// Normal priority, default for most tasks.
25    #[default]
26    Normal = 1,
27    /// High priority, important tasks.
28    High = 2,
29    /// Critical priority, must execute immediately.
30    Critical = 3,
31}
32
33/// Status of a scheduled task.
34#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
35pub enum TaskStatus {
36    /// Task is queued, waiting for execution.
37    Queued,
38    /// Task is currently running.
39    Running,
40    /// Task completed successfully.
41    Completed,
42    /// Task failed with an error.
43    Failed,
44    /// Task was cancelled before execution.
45    Cancelled,
46}
47
48/// A scheduled task for an agent.
49#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
50pub struct ScheduledTask {
51    /// Unique task identifier.
52    pub id: Uuid,
53    /// Associated agent ID, if any.
54    pub agent_id: Option<AgentId>,
55    /// Human-readable task description.
56    pub description: String,
57    /// Task priority level.
58    pub priority: Priority,
59    /// When the task was created.
60    pub created_at: DateTime<Utc>,
61    /// Current status of the task.
62    pub status: TaskStatus,
63    /// Error message if the task failed.
64    #[serde(skip_serializing_if = "Option::is_none")]
65    pub error: Option<String>,
66}
67
68impl PartialOrd for ScheduledTask {
69    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
70        Some(self.cmp(other))
71    }
72}
73
74impl Ord for ScheduledTask {
75    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
76        // Higher priority first; within same priority, newer tasks first (LIFO)
77        // so that BinaryHeap::pop() returns the highest-priority, newest task.
78        self.priority
79            .cmp(&other.priority)
80            .then_with(|| other.created_at.cmp(&self.created_at))
81    }
82}
83
84impl ScheduledTask {
85    /// Creates a new scheduled task.
86    pub fn new(description: String, priority: Priority) -> Self {
87        Self {
88            id: Uuid::new_v4(),
89            agent_id: None,
90            description,
91            priority,
92            created_at: Utc::now(),
93            status: TaskStatus::Queued,
94            error: None,
95        }
96    }
97
98    /// Creates a task associated with a specific agent.
99    pub fn for_agent(agent_id: AgentId, description: String, priority: Priority) -> Self {
100        Self {
101            id: Uuid::new_v4(),
102            agent_id: Some(agent_id),
103            description,
104            priority,
105            created_at: Utc::now(),
106            status: TaskStatus::Queued,
107            error: None,
108        }
109    }
110}
111
112/// Statistics about the scheduler state.
113#[derive(Debug, Clone, Serialize, Deserialize)]
114pub struct SchedulerStats {
115    /// Number of queued tasks.
116    pub queued: usize,
117    /// Number of currently running tasks.
118    pub running: usize,
119    /// Number of completed tasks.
120    pub completed: usize,
121    /// Number of failed tasks.
122    pub failed: usize,
123    /// Maximum allowed concurrent tasks.
124    pub max_concurrent: usize,
125    /// Rate limit (requests per minute).
126    pub rate_limit_per_minute: u32,
127    /// Remaining rate limit capacity.
128    pub rate_remaining: u32,
129}
130
131impl Default for SchedulerStats {
132    fn default() -> Self {
133        Self {
134            queued: 0,
135            running: 0,
136            completed: 0,
137            failed: 0,
138            max_concurrent: 5,
139            rate_limit_per_minute: 60,
140            rate_remaining: 60,
141        }
142    }
143}
144
/// Sliding-window rate limiter for tracking API call rates.
///
/// Admission times are taken from the monotonic [`std::time::Instant`] clock,
/// so wall-clock adjustments (NTP steps, manual changes) can neither stretch
/// nor shrink the window.
#[derive(Debug, Clone)]
struct RateLimiter {
    /// Admission times of requests still (possibly) inside the window, oldest first.
    window: std::collections::VecDeque<std::time::Instant>,
    /// Window duration in seconds.
    window_secs: u64,
    /// Maximum requests per window.
    max_requests: u32,
}

impl RateLimiter {
    /// Creates an empty limiter admitting at most `max_requests` requests per
    /// `window_secs` seconds.
    fn new(window_secs: u64, max_requests: u32) -> Self {
        Self {
            window: std::collections::VecDeque::new(),
            window_secs,
            max_requests,
        }
    }

    /// The window length as a `Duration` (no lossy integer casts involved).
    fn window_len(&self) -> std::time::Duration {
        std::time::Duration::from_secs(self.window_secs)
    }

    /// Checks whether a new request is allowed under the rate limit.
    ///
    /// An allowed request is recorded and counts against the window; a
    /// rejected request is not recorded.
    fn allow(&mut self) -> bool {
        let now = std::time::Instant::now();
        let window = self.window_len();

        // Entries are appended in clock order, so expired ones sit at the front.
        while let Some(&oldest) = self.window.front() {
            if now.saturating_duration_since(oldest) < window {
                break;
            }
            self.window.pop_front();
        }

        if self.window.len() >= self.max_requests as usize {
            return false;
        }

        self.window.push_back(now);
        true
    }

    /// Returns the remaining capacity in the current window without mutating state.
    fn remaining(&self) -> u32 {
        let now = std::time::Instant::now();
        let window = self.window_len();
        let active = self
            .window
            .iter()
            .filter(|&&at| now.saturating_duration_since(at) < window)
            .count();
        self.max_requests
            .saturating_sub(u32::try_from(active).unwrap_or(u32::MAX))
    }
}
189
190/// The agent scheduler.
191///
192/// Manages task queues, rate limiting, zombie detection, and priority scheduling.
193/// This is the central coordinator for all agent task execution.
194pub struct AgentScheduler {
195    /// The task queue (priority max-heap).
196    queue: Arc<Mutex<BinaryHeap<ScheduledTask>>>,
197    /// Currently running tasks.
198    running: Arc<Mutex<HashMap<Uuid, ScheduledTask>>>,
199    /// Maximum concurrent tasks allowed.
200    max_concurrent: std::sync::atomic::AtomicUsize,
201    /// Rate limiter for LLM API calls.
202    rate_limiter: Arc<Mutex<RateLimiter>>,
203    /// Timeout for zombie detection (seconds).
204    zombie_timeout_secs: std::sync::atomic::AtomicU64,
205    /// Track when each running task started (for zombie detection).
206    task_start_times: Arc<Mutex<HashMap<Uuid, DateTime<Utc>>>>,
207    /// Optional budget manager for scheduling checks.
208    budget_manager: Option<Arc<BudgetManager>>,
209}
210
211impl AgentScheduler {
212    /// Creates a new scheduler.
213    ///
214    /// # Arguments
215    /// * `max_concurrent` - Maximum number of tasks that can run simultaneously
216    /// * `rate_limit_per_minute` - Maximum LLM API calls per minute
217    /// * `zombie_timeout_secs` - How long before a running task is considered a zombie
218    pub fn new(
219        max_concurrent: usize,
220        rate_limit_per_minute: u32,
221        zombie_timeout_secs: u64,
222    ) -> Self {
223        Self {
224            queue: Arc::new(Mutex::new(BinaryHeap::new())),
225            running: Arc::new(Mutex::new(HashMap::new())),
226            max_concurrent: std::sync::atomic::AtomicUsize::new(max_concurrent),
227            rate_limiter: Arc::new(Mutex::new(RateLimiter::new(60, rate_limit_per_minute))),
228            zombie_timeout_secs: std::sync::atomic::AtomicU64::new(zombie_timeout_secs),
229            task_start_times: Arc::new(Mutex::new(HashMap::new())),
230            budget_manager: None,
231        }
232    }
233
234    /// Attaches a budget manager for scheduling checks.
235    ///
236    /// When a budget manager is set, the scheduler will:
237    /// - Check `can_schedule()` before starting a task (soft gate)
238    /// - Track calls via `track_call()` when a task begins
239    ///
240    /// If no budget manager is set, tasks proceed normally.
241    pub fn set_budget_manager(&mut self, bm: Arc<BudgetManager>) {
242        self.budget_manager = Some(bm);
243    }
244
245    /// Hot-reload scheduler config without restart.
246    ///
247    /// Updates concurrency limit, rate limit, and zombie timeout.
248    /// Takes effect on the next `next_task()` / `reap_zombies()` call.
249    pub fn update_config(
250        &self,
251        max_concurrent: usize,
252        rate_limit_per_minute: u32,
253        zombie_timeout_secs: u64,
254    ) {
255        {
256            let mut limiter = self.rate_limiter.lock();
257            *limiter = RateLimiter::new(60, rate_limit_per_minute);
258        }
259        self.max_concurrent
260            .store(max_concurrent, std::sync::atomic::Ordering::Relaxed);
261        self.zombie_timeout_secs
262            .store(zombie_timeout_secs, std::sync::atomic::Ordering::Relaxed);
263        tracing::info!(
264            max_concurrent,
265            rate_limit_per_minute,
266            zombie_timeout_secs,
267            "Scheduler config hot-reloaded"
268        );
269    }
270
271    /// Submits a task to the scheduler queue.
272    ///
273    /// Returns the task ID on success.
274    pub fn submit(&self, mut task: ScheduledTask) -> Result<Uuid> {
275        task.status = TaskStatus::Queued;
276        let id = task.id;
277
278        let mut queue = self.queue.lock();
279        queue.push(task); // O(log N) — BinaryHeap maintains heap property
280
281        tracing::debug!(
282            task_id = %id,
283            queue_len = queue.len(),
284            "Task submitted to scheduler"
285        );
286
287        Ok(id)
288    }
289
290    /// Gets the next task ready for execution.
291    ///
292    /// Returns `None` if:
293    /// - The queue is empty
294    /// - Max concurrent limit is reached
295    /// - Rate limit is exceeded
296    pub fn next_task(&self) -> Option<ScheduledTask> {
297        // Check if we can start a new task.
298        {
299            let running = self.running.lock();
300            if running.len()
301                >= self
302                    .max_concurrent
303                    .load(std::sync::atomic::Ordering::Relaxed)
304            {
305                tracing::debug!(
306                    running = running.len(),
307                    max = self
308                        .max_concurrent
309                        .load(std::sync::atomic::Ordering::Relaxed),
310                    "Max concurrent limit reached"
311                );
312                return None;
313            }
314        }
315
316        // Check rate limit.
317        {
318            let mut limiter = self.rate_limiter.lock();
319            if !limiter.allow() {
320                tracing::debug!(remaining = limiter.remaining(), "Rate limit exceeded");
321                return None;
322            }
323        }
324
325        // Pop tasks iteratively, skipping agents with exhausted budgets.
326        let mut discarded: usize = 0;
327        let mut task = loop {
328            let task_opt = {
329                let mut queue = self.queue.lock();
330                queue.pop() // O(log N) — BinaryHeap returns max-priority task
331            };
332
333            match task_opt {
334                Some(t) => {
335                    // Check budget before scheduling (soft gate).
336                    if let (Some(ref bm), Some(ref agent_id)) = (&self.budget_manager, &t.agent_id)
337                    {
338                        if !bm.can_schedule(agent_id) {
339                            tracing::warn!(
340                                agent_id = %agent_id,
341                                "Agent budget exhausted, skipping task"
342                            );
343                            discarded += 1;
344                            continue; // skip, try next task
345                        }
346                    }
347                    break t;
348                }
349                None => {
350                    if discarded > 0 {
351                        tracing::info!(discarded, "All queued tasks had exhausted budgets");
352                    }
353                    return None;
354                }
355            }
356        };
357
358        if discarded > 0 {
359            tracing::info!(discarded, "Skipped tasks with exhausted budgets");
360        }
361
362        task.status = TaskStatus::Running;
363
364        // Track start time for zombie detection.
365        {
366            let mut start_times = self.task_start_times.lock();
367            start_times.insert(task.id, Utc::now());
368        }
369
370        // Add to running map.
371        {
372            let mut running = self.running.lock();
373            running.insert(task.id, task.clone());
374        }
375
376        tracing::info!(
377            task_id = %task.id,
378            priority = ?task.priority,
379            running = self.running.lock().len(),
380            "Task started by scheduler"
381        );
382
383        // Track call for budget management.
384        if let (Some(ref bm), Some(ref agent_id)) = (&self.budget_manager, &task.agent_id) {
385            if let Err(e) = bm.track_call(agent_id) {
386                tracing::warn!(
387                    agent_id = %agent_id,
388                    error = %e,
389                    "Budget exceeded during task track_call"
390                );
391            }
392        }
393
394        Some(task)
395    }
396
397    /// Marks a task as completed.
398    ///
399    /// Removes the task from the running map.
400    pub fn complete_task(&self, task_id: Uuid) -> Result<()> {
401        let task = {
402            let mut running = self.running.lock();
403            running.remove(&task_id)
404        };
405
406        match task {
407            Some(mut t) => {
408                t.status = TaskStatus::Completed;
409
410                // Clean up start time tracking.
411                {
412                    let mut start_times = self.task_start_times.lock();
413                    start_times.remove(&task_id);
414                }
415
416                tracing::info!(task_id = %task_id, "Task completed");
417                Ok(())
418            }
419            None => {
420                tracing::warn!(task_id = %task_id, "Attempted to complete unknown task");
421                Err(anyhow::anyhow!("task not found"))
422            }
423        }
424    }
425
426    /// Marks a task as failed with an error message.
427    ///
428    /// Removes the task from the running map.
429    pub fn fail_task(&self, task_id: Uuid, error: &str) -> Result<()> {
430        let task = {
431            let mut running = self.running.lock();
432            running.remove(&task_id)
433        };
434
435        match task {
436            Some(mut t) => {
437                t.status = TaskStatus::Failed;
438                t.error = Some(error.to_string());
439
440                // Clean up start time tracking.
441                {
442                    let mut start_times = self.task_start_times.lock();
443                    start_times.remove(&task_id);
444                }
445
446                tracing::warn!(task_id = %task_id, error = %error, "Task failed");
447                Ok(())
448            }
449            None => {
450                tracing::warn!(task_id = %task_id, "Attempted to fail unknown task");
451                Err(anyhow::anyhow!("task not found"))
452            }
453        }
454    }
455
456    /// Detects and reaps zombie tasks (running longer than the configured timeout).
457    ///
458    /// Returns the IDs of tasks that were reaped.
459    pub fn reap_zombies(&self) -> Vec<Uuid> {
460        let now = Utc::now();
461        let timeout = chrono::Duration::seconds(
462            self.zombie_timeout_secs
463                .load(std::sync::atomic::Ordering::Relaxed) as i64,
464        );
465        let mut start_times = self.task_start_times.lock();
466        let mut running = self.running.lock();
467        let mut reaped = Vec::new();
468
469        let zombie_ids: Vec<Uuid> = start_times
470            .iter()
471            .filter(|(_, start)| now - **start > timeout)
472            .map(|(id, _)| *id)
473            .collect();
474
475        for id in zombie_ids {
476            if let Some(mut task) = running.remove(&id) {
477                task.status = TaskStatus::Failed;
478                task.error = Some(format!(
479                    "zombie: ran for >{} seconds",
480                    self.zombie_timeout_secs
481                        .load(std::sync::atomic::Ordering::Relaxed)
482                ));
483                reaped.push(id);
484                tracing::warn!(
485                    task_id = %id,
486                    duration_secs = self.zombie_timeout_secs.load(std::sync::atomic::Ordering::Relaxed),
487                    "Zombie task reaped"
488                );
489            }
490            // Remove from start times.
491            start_times.remove(&id);
492        }
493
494        reaped
495    }
496
497    /// Marks a task as running by task ID.
498    ///
499    /// The task must be in the queue (status Queued). This is used by the
500    /// orchestrator to atomically claim a submitted task before execution.
501    pub fn start_task(&self, task_id: Uuid) -> Result<()> {
502        let task = {
503            let mut queue = self.queue.lock();
504            let all: Vec<ScheduledTask> = queue.drain().collect();
505            let mut found: Option<ScheduledTask> = None;
506            let remaining: Vec<ScheduledTask> = all
507                .into_iter()
508                .filter(|t| {
509                    if t.id == task_id {
510                        found = Some(t.clone());
511                        false
512                    } else {
513                        true
514                    }
515                })
516                .collect();
517            *queue = remaining.into_iter().collect();
518            found
519        };
520
521        match task {
522            Some(mut task) => {
523                task.status = TaskStatus::Running;
524                let mut start_times = self.task_start_times.lock();
525                start_times.insert(task.id, Utc::now());
526                let mut running = self.running.lock();
527                running.insert(task.id, task);
528                Ok(())
529            }
530            None => Err(anyhow::anyhow!("task {task_id} not found in queue")),
531        }
532    }
533
534    /// Cancels a queued task by ID.
535    ///
536    /// Only works on tasks still in the queue (not yet running).
537    pub fn cancel_task(&self, task_id: Uuid) -> Result<()> {
538        let mut queue = self.queue.lock();
539        let all: Vec<ScheduledTask> = queue.drain().collect();
540        let mut found = false;
541        let remaining: Vec<ScheduledTask> = all
542            .into_iter()
543            .filter(|t| {
544                if t.id == task_id && t.status == TaskStatus::Queued {
545                    found = true;
546                    false
547                } else {
548                    true
549                }
550            })
551            .collect();
552        *queue = remaining.into_iter().collect();
553
554        if found {
555            tracing::info!(task_id = %task_id, "Task cancelled from queue");
556            Ok(())
557        } else {
558            tracing::warn!(task_id = %task_id, "Task not found in queue for cancellation");
559            Err(anyhow::anyhow!("task not found in queue"))
560        }
561    }
562
563    /// Returns the current scheduler statistics.
564    pub fn stats(&self) -> SchedulerStats {
565        let queue = self.queue.lock();
566        let running = self.running.lock();
567        let rate_limiter = self.rate_limiter.lock();
568
569        let _completed = 0usize;
570        let _failed = 0usize;
571
572        SchedulerStats {
573            queued: queue.len(),
574            running: running.len(),
575            completed: _completed,
576            failed: _failed,
577            max_concurrent: self
578                .max_concurrent
579                .load(std::sync::atomic::Ordering::Relaxed),
580            rate_limit_per_minute: rate_limiter.max_requests,
581            rate_remaining: rate_limiter.remaining(),
582        }
583    }
584
585    /// Returns remaining rate limit capacity.
586    pub fn rate_limit_remaining(&self) -> u32 {
587        self.rate_limiter.lock().remaining()
588    }
589
590    /// Returns all queued tasks (for debugging/monitoring).
591    pub fn queued_tasks(&self) -> Vec<ScheduledTask> {
592        let heap = self.queue.lock();
593        let mut tasks: Vec<ScheduledTask> = heap.iter().cloned().collect();
594        // Sort ascending by priority so highest priority is at the end
595        // (matches the original Vec-based behavior for test compatibility).
596        tasks.sort_by_key(|a| a.priority);
597        tasks
598    }
599
600    /// Returns all running tasks (for debugging/monitoring).
601    pub fn running_tasks(&self) -> Vec<ScheduledTask> {
602        self.running.lock().values().cloned().collect()
603    }
604}
605
606impl Default for AgentScheduler {
607    fn default() -> Self {
608        Self::new(5, 60, 300)
609    }
610}
611
612#[cfg(test)]
613mod tests {
614    use super::*;
615    use std::thread;
616    use std::time::Duration;
617
618    #[test]
619    fn test_task_creation() {
620        let task = ScheduledTask::new("Test task".into(), Priority::Normal);
621        assert_eq!(task.status, TaskStatus::Queued);
622        assert!(task.agent_id.is_none());
623        assert!(!task.error.is_some());
624    }
625
626    #[test]
627    fn test_task_creation_for_agent() {
628        let agent_id = AgentId::new_v4();
629        let task = ScheduledTask::for_agent(agent_id, "Agent task".into(), Priority::High);
630        assert_eq!(task.agent_id, Some(agent_id));
631        assert_eq!(task.priority, Priority::High);
632    }
633
634    #[test]
635    fn test_priority_ordering() {
636        assert!(Priority::Critical > Priority::High);
637        assert!(Priority::High > Priority::Normal);
638        assert!(Priority::Normal > Priority::Low);
639        // Check transitivity
640        assert!(Priority::Critical > Priority::Normal);
641        assert!(Priority::Critical > Priority::Low);
642        assert!(Priority::High > Priority::Low);
643    }
644
645    #[test]
646    fn test_priority_ordering_eq() {
647        assert_eq!(Priority::Low, Priority::Low);
648        assert_eq!(Priority::Normal, Priority::Normal);
649        assert_eq!(Priority::High, Priority::High);
650        assert_eq!(Priority::Critical, Priority::Critical);
651    }
652
653    #[test]
654    fn test_submit_and_next_high_priority_first() {
655        let scheduler = AgentScheduler::new(10, 10_000, 60);
656
657        scheduler
658            .submit(ScheduledTask::new("Low priority".into(), Priority::Low))
659            .unwrap();
660        scheduler
661            .submit(ScheduledTask::new("High priority".into(), Priority::High))
662            .unwrap();
663        scheduler
664            .submit(ScheduledTask::new(
665                "Normal priority".into(),
666                Priority::Normal,
667            ))
668            .unwrap();
669
670        // High priority should come first.
671        let next = scheduler.next_task().unwrap();
672        assert_eq!(next.priority, Priority::High);
673
674        // Normal next.
675        let next = scheduler.next_task().unwrap();
676        assert_eq!(next.priority, Priority::Normal);
677
678        // Low last.
679        let next = scheduler.next_task().unwrap();
680        assert_eq!(next.priority, Priority::Low);
681    }
682
683    #[test]
684    fn test_submit_and_next_critical_first() {
685        let scheduler = AgentScheduler::new(10, 10_000, 60);
686
687        scheduler
688            .submit(ScheduledTask::new("Low".into(), Priority::Low))
689            .unwrap();
690        scheduler
691            .submit(ScheduledTask::new("Normal".into(), Priority::Normal))
692            .unwrap();
693        scheduler
694            .submit(ScheduledTask::new("High".into(), Priority::High))
695            .unwrap();
696        scheduler
697            .submit(ScheduledTask::new("Critical".into(), Priority::Critical))
698            .unwrap();
699
700        // Critical should be first.
701        let next = scheduler.next_task().unwrap();
702        assert_eq!(next.priority, Priority::Critical);
703        // Then High.
704        let next = scheduler.next_task().unwrap();
705        assert_eq!(next.priority, Priority::High);
706        // Then Normal.
707        let next = scheduler.next_task().unwrap();
708        assert_eq!(next.priority, Priority::Normal);
709        // Then Low.
710        let next = scheduler.next_task().unwrap();
711        assert_eq!(next.priority, Priority::Low);
712    }
713
714    #[test]
715    fn test_submit_multiple_same_priority() {
716        let scheduler = AgentScheduler::new(10, 10_000, 60);
717
718        // Multiple tasks at same priority — BinaryHeap does not guarantee
719        // FIFO/LIFO within the same priority level.
720        scheduler
721            .submit(ScheduledTask::new("First".into(), Priority::Normal))
722            .unwrap();
723        scheduler
724            .submit(ScheduledTask::new("Second".into(), Priority::Normal))
725            .unwrap();
726        scheduler
727            .submit(ScheduledTask::new("Third".into(), Priority::Normal))
728            .unwrap();
729
730        // All three should be popped with Normal priority; exact order is unspecified.
731        let mut descriptions = Vec::new();
732        for _ in 0..3 {
733            let next = scheduler.next_task().unwrap();
734            assert_eq!(next.priority, Priority::Normal);
735            descriptions.push(next.description);
736        }
737        descriptions.sort();
738        assert_eq!(descriptions, vec!["First", "Second", "Third"]);
739    }
740
741    #[test]
742    fn test_max_concurrent_blocks() {
743        let scheduler = AgentScheduler::new(2, 10_000, 60);
744
745        scheduler
746            .submit(ScheduledTask::new("Task 1".into(), Priority::Normal))
747            .unwrap();
748        scheduler
749            .submit(ScheduledTask::new("Task 2".into(), Priority::Normal))
750            .unwrap();
751        scheduler
752            .submit(ScheduledTask::new("Task 3".into(), Priority::Normal))
753            .unwrap();
754
755        assert!(scheduler.next_task().is_some());
756        assert!(scheduler.next_task().is_some());
757        // Third should be None due to max concurrent.
758        assert!(scheduler.next_task().is_none());
759    }
760
761    #[test]
762    fn test_max_concurrent_allows_when_slot_frees() {
763        let scheduler = AgentScheduler::new(2, 10_000, 60); // 2 max concurrent.
764
765        let _ = scheduler
766            .submit(ScheduledTask::new("Task 1".into(), Priority::Normal))
767            .unwrap();
768        let _id2 = scheduler
769            .submit(ScheduledTask::new("Task 2".into(), Priority::Normal))
770            .unwrap();
771        // Queue (insert at 0 prepends): [Task 2, Task 1]
772
773        // Start 2 tasks (fills max_concurrent).
774        let t1 = scheduler.next_task().unwrap(); // Task 2 (first popped).
775        let t2 = scheduler.next_task().unwrap(); // Task 1 (second popped).
776                                                 // Running: [Task 2, Task 1], Queue: []
777        assert!(scheduler.next_task().is_none()); // Blocked.
778
779        // Complete both tasks.
780        scheduler.complete_task(t1.id).unwrap();
781        scheduler.complete_task(t2.id).unwrap();
782
783        // Submit a new task now that slots have freed.
784        let _id3 = scheduler
785            .submit(ScheduledTask::new("Task 3".into(), Priority::Normal))
786            .unwrap();
787
788        // Now next_task should work.
789        let task = scheduler.next_task().unwrap();
790        assert_eq!(task.description, "Task 3");
791
792        // Clean up.
793        scheduler.complete_task(task.id).unwrap();
794    }
795
796    #[test]
797    fn test_complete_task_removes_from_running() {
798        let scheduler = AgentScheduler::new(2, 10_000, 60);
799        let task = ScheduledTask::new("Test".into(), Priority::Normal);
800        let id = scheduler.submit(task).unwrap();
801
802        let _ = scheduler.next_task();
803        scheduler.complete_task(id).unwrap();
804
805        let stats = scheduler.stats();
806        assert_eq!(stats.running, 0);
807    }
808
809    #[test]
810    fn test_complete_unknown_task_returns_error() {
811        let scheduler = AgentScheduler::new(2, 10_000, 60);
812        let result = scheduler.complete_task(Uuid::new_v4());
813        assert!(result.is_err());
814    }
815
816    #[test]
817    fn test_fail_task_sets_error() {
818        let scheduler = AgentScheduler::new(2, 10_000, 60);
819        let task = ScheduledTask::new("Test".into(), Priority::Normal);
820        let id = scheduler.submit(task).unwrap();
821
822        let _ = scheduler.next_task();
823        scheduler.fail_task(id, "Something went wrong").unwrap();
824
825        let running = scheduler.running.lock();
826        assert!(!running.contains_key(&id));
827    }
828
829    #[test]
830    fn test_cancel_queued_task() {
831        let scheduler = AgentScheduler::new(2, 10_000, 60);
832        let id = scheduler
833            .submit(ScheduledTask::new("To cancel".into(), Priority::Normal))
834            .unwrap();
835
836        scheduler.cancel_task(id).unwrap();
837
838        // Queue should be empty now.
839        assert!(scheduler.next_task().is_none());
840    }
841
842    #[test]
843    fn test_cancel_running_task_fails() {
844        let scheduler = AgentScheduler::new(2, 10_000, 60);
845        let id = scheduler
846            .submit(ScheduledTask::new("Running".into(), Priority::Normal))
847            .unwrap();
848
849        let _ = scheduler.next_task(); // Task is now running.
850
851        // Can't cancel a running task.
852        let result = scheduler.cancel_task(id);
853        assert!(result.is_err());
854    }
855
856    #[test]
857    fn test_cancel_unknown_task_fails() {
858        let scheduler = AgentScheduler::new(2, 10_000, 60);
859        let result = scheduler.cancel_task(Uuid::new_v4());
860        assert!(result.is_err());
861    }
862
863    #[test]
864    fn test_stats_tracking() {
865        let scheduler = AgentScheduler::new(2, 60, 60);
866
867        let id1 = scheduler
868            .submit(ScheduledTask::new("Queued".into(), Priority::Normal))
869            .unwrap();
870        scheduler
871            .submit(ScheduledTask::new("Queued 2".into(), Priority::Low))
872            .unwrap();
873
874        // Start one task.
875        let started = scheduler.next_task().unwrap();
876        assert_eq!(started.id, id1);
877
878        let stats = scheduler.stats();
879        assert_eq!(stats.queued, 1); // One still in queue.
880        assert_eq!(stats.running, 1);
881        assert_eq!(stats.max_concurrent, 2);
882        assert_eq!(stats.rate_limit_per_minute, 60);
883    }
884
885    #[test]
886    fn test_reap_zombies() {
887        // Create a scheduler with a very short zombie timeout.
888        let scheduler = AgentScheduler::new(2, 10_000, 1); // 1 second timeout.
889
890        // Submit and start a task.
891        let id = scheduler
892            .submit(ScheduledTask::new("Zombie".into(), Priority::Normal))
893            .unwrap();
894        let _ = scheduler.next_task();
895
896        // Wait longer than the zombie timeout.
897        thread::sleep(Duration::from_millis(1_100));
898
899        // Reap zombies.
900        let reaped = scheduler.reap_zombies();
901        assert!(reaped.contains(&id));
902
903        // Task should no longer be running.
904        assert!(scheduler.running.lock().get(&id).is_none());
905    }
906
907    #[test]
908    fn test_reap_zombies_no_zombies() {
909        let scheduler = AgentScheduler::new(2, 10_000, 60); // Long timeout.
910
911        let id = scheduler
912            .submit(ScheduledTask::new("Normal".into(), Priority::Normal))
913            .unwrap();
914        let _ = scheduler.next_task();
915
916        // No sleep, so no zombies yet.
917        let reaped = scheduler.reap_zombies();
918        assert!(reaped.is_empty());
919
920        // Task still running.
921        assert!(scheduler.running.lock().get(&id).is_some());
922    }
923
924    #[test]
925    fn test_rate_limiter_basic() {
926        let mut limiter = RateLimiter::new(60, 3); // 3 requests per minute.
927
928        assert!(limiter.allow());
929        assert!(limiter.allow());
930        assert!(limiter.allow());
931        // 4th request should be blocked.
932        assert!(!limiter.allow());
933    }
934
935    #[test]
936    fn test_rate_limiter_remaining() {
937        let limiter = RateLimiter::new(60, 3);
938
939        assert_eq!(limiter.remaining(), 3);
940
941        let mut limiter = RateLimiter::new(60, 3);
942        limiter.allow();
943        limiter.allow();
944        assert_eq!(limiter.remaining(), 1);
945    }
946
947    #[test]
948    fn test_rate_limiter_tracks_per_scheduler() {
949        let scheduler = AgentScheduler::new(10, 5, 60); // Only 5 requests allowed, high concurrency.
950
951        // Consume all rate limit by calling next_task (not submit).
952        for i in 0..5 {
953            scheduler
954                .submit(ScheduledTask::new(format!("T{}", i), Priority::Normal))
955                .unwrap();
956            let _ = scheduler.next_task();
957        }
958
959        // Should be rate limited.
960        assert!(scheduler.next_task().is_none());
961        assert_eq!(scheduler.rate_limit_remaining(), 0);
962    }
963
964    #[test]
965    fn test_queued_tasks_inspection() {
966        let scheduler = AgentScheduler::new(2, 10_000, 60);
967
968        scheduler
969            .submit(ScheduledTask::new("A".into(), Priority::Low))
970            .unwrap();
971        scheduler
972            .submit(ScheduledTask::new("B".into(), Priority::High))
973            .unwrap();
974        scheduler
975            .submit(ScheduledTask::new("C".into(), Priority::Normal))
976            .unwrap();
977
978        let queued = scheduler.queued_tasks();
979        assert_eq!(queued.len(), 3);
980        // Order is by priority (highest at back for pop).
981        // High should be at the back.
982        assert_eq!(queued.last().unwrap().description, "B");
983    }
984
985    #[test]
986    fn test_running_tasks_inspection() {
987        let scheduler = AgentScheduler::new(2, 10_000, 60);
988
989        scheduler
990            .submit(ScheduledTask::new("R1".into(), Priority::Normal))
991            .unwrap();
992        scheduler
993            .submit(ScheduledTask::new("R2".into(), Priority::Normal))
994            .unwrap();
995
996        let _ = scheduler.next_task();
997        let _ = scheduler.next_task();
998
999        let running = scheduler.running_tasks();
1000        assert_eq!(running.len(), 2);
1001    }
1002
1003    #[test]
1004    fn test_default_scheduler() {
1005        let scheduler = AgentScheduler::default();
1006        let stats = scheduler.stats();
1007        assert_eq!(stats.max_concurrent, 5);
1008        assert_eq!(stats.rate_limit_per_minute, 60);
1009    }
1010
1011    #[test]
1012    fn test_budget_manager_integration_skips_exhausted_agent() {
1013        use crate::budget::{BudgetLimit, BudgetManager};
1014
1015        let scheduler = Arc::new(Mutex::new(AgentScheduler::new(2, 10_000, 60)));
1016        let budget_manager = Arc::new(BudgetManager::new());
1017
1018        // Set a very low budget (1 call).
1019        let agent_id = AgentId::new_v4();
1020        budget_manager.set_budget(BudgetLimit {
1021            agent_id,
1022            token_budget: 1000,
1023            calls_budget: 1,
1024            window_secs: 60,
1025        });
1026
1027        // Attach budget manager to scheduler.
1028        scheduler
1029            .lock()
1030            .set_budget_manager(Arc::clone(&budget_manager));
1031
1032        // Submit two tasks for the same agent.
1033        scheduler
1034            .lock()
1035            .submit(ScheduledTask::for_agent(
1036                agent_id,
1037                "Task 1".into(),
1038                Priority::Normal,
1039            ))
1040            .unwrap();
1041        scheduler
1042            .lock()
1043            .submit(ScheduledTask::for_agent(
1044                agent_id,
1045                "Task 2".into(),
1046                Priority::Normal,
1047            ))
1048            .unwrap();
1049
1050        // First task should run (track_call succeeds).
1051        let task1 = scheduler.lock().next_task();
1052        assert!(task1.is_some());
1053        scheduler.lock().complete_task(task1.unwrap().id).unwrap();
1054
1055        // Second task should be skipped (budget exhausted).
1056        let task2 = scheduler.lock().next_task();
1057        assert!(task2.is_none());
1058    }
1059
1060    #[test]
1061    fn test_budget_manager_allows_different_agents() {
1062        use crate::budget::{BudgetLimit, BudgetManager};
1063
1064        let scheduler = Arc::new(Mutex::new(AgentScheduler::new(2, 10_000, 60)));
1065        let budget_manager = Arc::new(BudgetManager::new());
1066
1067        let agent1 = AgentId::new_v4();
1068        let agent2 = AgentId::new_v4();
1069
1070        // Set budget for both agents (3 calls each).
1071        for agent_id in [&agent1, &agent2] {
1072            budget_manager.set_budget(BudgetLimit {
1073                agent_id: *agent_id,
1074                token_budget: 1000,
1075                calls_budget: 3,
1076                window_secs: 60,
1077            });
1078        }
1079
1080        scheduler
1081            .lock()
1082            .set_budget_manager(Arc::clone(&budget_manager));
1083
1084        // Submit tasks for both agents.
1085        scheduler
1086            .lock()
1087            .submit(ScheduledTask::for_agent(
1088                agent1,
1089                "A1".into(),
1090                Priority::Normal,
1091            ))
1092            .unwrap();
1093        scheduler
1094            .lock()
1095            .submit(ScheduledTask::for_agent(
1096                agent2,
1097                "B1".into(),
1098                Priority::Normal,
1099            ))
1100            .unwrap();
1101
1102        // Both should run.
1103        let t1 = scheduler.lock().next_task().unwrap();
1104        let t2 = scheduler.lock().next_task().unwrap();
1105        assert_ne!(t1.description, t2.description);
1106    }
1107
1108    #[test]
1109    fn test_budget_manager_task_without_agent_id() {
1110        use crate::budget::BudgetManager;
1111
1112        let scheduler = Arc::new(Mutex::new(AgentScheduler::new(2, 10_000, 60)));
1113        let budget_manager = Arc::new(BudgetManager::new());
1114
1115        scheduler
1116            .lock()
1117            .set_budget_manager(Arc::clone(&budget_manager));
1118
1119        // Submit a task without an agent ID.
1120        scheduler
1121            .lock()
1122            .submit(ScheduledTask::new("No agent".into(), Priority::Normal))
1123            .unwrap();
1124
1125        // Should still run (no budget check for tasks without agent).
1126        let task = scheduler.lock().next_task();
1127        assert!(task.is_some());
1128    }
1129
1130    #[test]
1131    fn test_budget_manager_not_set_skips_check() {
1132        let scheduler = Arc::new(Mutex::new(AgentScheduler::new(2, 10_000, 60)));
1133        // No budget manager attached.
1134
1135        scheduler
1136            .lock()
1137            .submit(ScheduledTask::new("Any task".into(), Priority::Normal))
1138            .unwrap();
1139
1140        // Should run normally without budget manager.
1141        let task = scheduler.lock().next_task();
1142        assert!(task.is_some());
1143    }
1144}