Skip to main content

oxios_kernel/
scheduler.rs

1//! Agent Scheduler — priority-based task queue inspired by AIOS / AgentRM.
2//!
3//! Manages agent task scheduling with:
4//! - Priority queue (FIFO within same priority)
5//! - Rate-limit-aware admission control
6//! - Zombie task detection and reaping
7//! - Maximum concurrent task enforcement
8
9use crate::budget::BudgetManager;
10use crate::types::AgentId;
11use anyhow::Result;
12use chrono::{DateTime, Utc};
13use parking_lot::Mutex;
14use serde::{Deserialize, Serialize};
15use std::collections::{BinaryHeap, HashMap};
16use std::sync::Arc;
17use uuid::Uuid;
18
19/// Priority levels for scheduled tasks.
20#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Default)]
21pub enum Priority {
22    /// Low priority, good for background work.
23    Low = 0,
24    /// Normal priority, default for most tasks.
25    #[default]
26    Normal = 1,
27    /// High priority, important tasks.
28    High = 2,
29    /// Critical priority, must execute immediately.
30    Critical = 3,
31}
32
33/// Status of a scheduled task.
34#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
35pub enum TaskStatus {
36    /// Task is queued, waiting for execution.
37    Queued,
38    /// Task is currently running.
39    Running,
40    /// Task completed successfully.
41    Completed,
42    /// Task failed with an error.
43    Failed,
44    /// Task was cancelled before execution.
45    Cancelled,
46}
47
48/// A scheduled task for an agent.
49#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
50pub struct ScheduledTask {
51    /// Unique task identifier.
52    pub id: Uuid,
53    /// Associated agent ID, if any.
54    pub agent_id: Option<AgentId>,
55    /// Human-readable task description.
56    pub description: String,
57    /// Task priority level.
58    pub priority: Priority,
59    /// When the task was created.
60    pub created_at: DateTime<Utc>,
61    /// Current status of the task.
62    pub status: TaskStatus,
63    /// Error message if the task failed.
64    #[serde(skip_serializing_if = "Option::is_none")]
65    pub error: Option<String>,
66}
67
68impl PartialOrd for ScheduledTask {
69    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
70        Some(self.cmp(other))
71    }
72}
73
74impl Ord for ScheduledTask {
75    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
76        // Higher priority first; within same priority, newer tasks first (LIFO)
77        // so that BinaryHeap::pop() returns the highest-priority, newest task.
78        self.priority
79            .cmp(&other.priority)
80            .then_with(|| other.created_at.cmp(&self.created_at))
81    }
82}
83
84impl ScheduledTask {
85    /// Creates a new scheduled task.
86    pub fn new(description: String, priority: Priority) -> Self {
87        Self {
88            id: Uuid::new_v4(),
89            agent_id: None,
90            description,
91            priority,
92            created_at: Utc::now(),
93            status: TaskStatus::Queued,
94            error: None,
95        }
96    }
97
98    /// Creates a task associated with a specific agent.
99    pub fn for_agent(agent_id: AgentId, description: String, priority: Priority) -> Self {
100        Self {
101            id: Uuid::new_v4(),
102            agent_id: Some(agent_id),
103            description,
104            priority,
105            created_at: Utc::now(),
106            status: TaskStatus::Queued,
107            error: None,
108        }
109    }
110}
111
112/// Statistics about the scheduler state.
113#[derive(Debug, Clone, Serialize, Deserialize)]
114pub struct SchedulerStats {
115    /// Number of queued tasks.
116    pub queued: usize,
117    /// Number of currently running tasks.
118    pub running: usize,
119    /// Number of completed tasks.
120    pub completed: usize,
121    /// Number of failed tasks.
122    pub failed: usize,
123    /// Maximum allowed concurrent tasks.
124    pub max_concurrent: usize,
125    /// Rate limit (requests per minute).
126    pub rate_limit_per_minute: u32,
127    /// Remaining rate limit capacity.
128    pub rate_remaining: u32,
129}
130
131impl Default for SchedulerStats {
132    fn default() -> Self {
133        Self {
134            queued: 0,
135            running: 0,
136            completed: 0,
137            failed: 0,
138            max_concurrent: 5,
139            rate_limit_per_minute: 60,
140            rate_remaining: 60,
141        }
142    }
143}
144
145/// Rate limiter state for tracking API call rates.
146#[derive(Debug, Clone)]
147struct RateLimiter {
148    /// Timestamps of recent requests.
149    window: Vec<DateTime<Utc>>,
150    /// Window duration in seconds.
151    window_secs: u64,
152    /// Maximum requests per window.
153    max_requests: u32,
154}
155
156impl RateLimiter {
157    fn new(window_secs: u64, max_requests: u32) -> Self {
158        Self {
159            window: Vec::new(),
160            window_secs,
161            max_requests,
162        }
163    }
164
165    /// Check if a new request is allowed under rate limits.
166    fn allow(&mut self) -> bool {
167        let now = Utc::now();
168        let cutoff = now - chrono::Duration::seconds(self.window_secs as i64);
169
170        // Prune old entries.
171        self.window.retain(|t| *t > cutoff);
172
173        if self.window.len() >= self.max_requests as usize {
174            return false;
175        }
176
177        self.window.push(now);
178        true
179    }
180
181    /// Get remaining capacity in the current window.
182    fn remaining(&self) -> u32 {
183        let now = Utc::now();
184        let cutoff = now - chrono::Duration::seconds(self.window_secs as i64);
185        let active = self.window.iter().filter(|t| **t > cutoff).count();
186        self.max_requests.saturating_sub(active as u32)
187    }
188}
189
190/// The agent scheduler.
191///
192/// Manages task queues, rate limiting, zombie detection, and priority scheduling.
193/// This is the central coordinator for all agent task execution.
194pub struct AgentScheduler {
195    /// The task queue (priority max-heap).
196    queue: Arc<Mutex<BinaryHeap<ScheduledTask>>>,
197    /// Currently running tasks.
198    running: Arc<Mutex<HashMap<Uuid, ScheduledTask>>>,
199    /// Maximum concurrent tasks allowed.
200    max_concurrent: usize,
201    /// Rate limiter for LLM API calls.
202    rate_limiter: Arc<Mutex<RateLimiter>>,
203    /// Timeout for zombie detection (seconds).
204    zombie_timeout_secs: u64,
205    /// Track when each running task started (for zombie detection).
206    task_start_times: Arc<Mutex<HashMap<Uuid, DateTime<Utc>>>>,
207    /// Optional budget manager for scheduling checks.
208    budget_manager: Option<Arc<BudgetManager>>,
209}
210
211impl AgentScheduler {
212    /// Creates a new scheduler.
213    ///
214    /// # Arguments
215    /// * `max_concurrent` - Maximum number of tasks that can run simultaneously
216    /// * `rate_limit_per_minute` - Maximum LLM API calls per minute
217    /// * `zombie_timeout_secs` - How long before a running task is considered a zombie
218    pub fn new(
219        max_concurrent: usize,
220        rate_limit_per_minute: u32,
221        zombie_timeout_secs: u64,
222    ) -> Self {
223        Self {
224            queue: Arc::new(Mutex::new(BinaryHeap::new())),
225            running: Arc::new(Mutex::new(HashMap::new())),
226            max_concurrent,
227            rate_limiter: Arc::new(Mutex::new(RateLimiter::new(60, rate_limit_per_minute))),
228            zombie_timeout_secs,
229            task_start_times: Arc::new(Mutex::new(HashMap::new())),
230            budget_manager: None,
231        }
232    }
233
234    /// Attaches a budget manager for scheduling checks.
235    ///
236    /// When a budget manager is set, the scheduler will:
237    /// - Check `can_schedule()` before starting a task (soft gate)
238    /// - Track calls via `track_call()` when a task begins
239    ///
240    /// If no budget manager is set, tasks proceed normally.
241    pub fn set_budget_manager(&mut self, bm: Arc<BudgetManager>) {
242        self.budget_manager = Some(bm);
243    }
244
245    /// Submits a task to the scheduler queue.
246    ///
247    /// Returns the task ID on success.
248    pub fn submit(&self, mut task: ScheduledTask) -> Result<Uuid> {
249        task.status = TaskStatus::Queued;
250        let id = task.id;
251
252        let mut queue = self.queue.lock();
253        queue.push(task); // O(log N) — BinaryHeap maintains heap property
254
255        tracing::debug!(
256            task_id = %id,
257            queue_len = queue.len(),
258            "Task submitted to scheduler"
259        );
260
261        Ok(id)
262    }
263
264    /// Gets the next task ready for execution.
265    ///
266    /// Returns `None` if:
267    /// - The queue is empty
268    /// - Max concurrent limit is reached
269    /// - Rate limit is exceeded
270    pub fn next_task(&self) -> Option<ScheduledTask> {
271        // Check if we can start a new task.
272        {
273            let running = self.running.lock();
274            if running.len() >= self.max_concurrent {
275                tracing::debug!(
276                    running = running.len(),
277                    max = self.max_concurrent,
278                    "Max concurrent limit reached"
279                );
280                return None;
281            }
282        }
283
284        // Check rate limit.
285        {
286            let mut limiter = self.rate_limiter.lock();
287            if !limiter.allow() {
288                tracing::debug!(remaining = limiter.remaining(), "Rate limit exceeded");
289                return None;
290            }
291        }
292
293        // Pop tasks iteratively, skipping agents with exhausted budgets.
294        let mut discarded: usize = 0;
295        let mut task = loop {
296            let task_opt = {
297                let mut queue = self.queue.lock();
298                queue.pop() // O(log N) — BinaryHeap returns max-priority task
299            };
300
301            match task_opt {
302                Some(t) => {
303                    // Check budget before scheduling (soft gate).
304                    if let (Some(ref bm), Some(ref agent_id)) = (&self.budget_manager, &t.agent_id)
305                    {
306                        if !bm.can_schedule(agent_id) {
307                            tracing::warn!(
308                                agent_id = %agent_id,
309                                "Agent budget exhausted, skipping task"
310                            );
311                            discarded += 1;
312                            continue; // skip, try next task
313                        }
314                    }
315                    break t;
316                }
317                None => {
318                    if discarded > 0 {
319                        tracing::info!(discarded, "All queued tasks had exhausted budgets");
320                    }
321                    return None;
322                }
323            }
324        };
325
326        if discarded > 0 {
327            tracing::info!(discarded, "Skipped tasks with exhausted budgets");
328        }
329
330        task.status = TaskStatus::Running;
331
332        // Track start time for zombie detection.
333        {
334            let mut start_times = self.task_start_times.lock();
335            start_times.insert(task.id, Utc::now());
336        }
337
338        // Add to running map.
339        {
340            let mut running = self.running.lock();
341            running.insert(task.id, task.clone());
342        }
343
344        tracing::info!(
345            task_id = %task.id,
346            priority = ?task.priority,
347            running = self.running.lock().len(),
348            "Task started by scheduler"
349        );
350
351        // Track call for budget management.
352        if let (Some(ref bm), Some(ref agent_id)) = (&self.budget_manager, &task.agent_id) {
353            if let Err(e) = bm.track_call(agent_id) {
354                tracing::warn!(
355                    agent_id = %agent_id,
356                    error = %e,
357                    "Budget exceeded during task track_call"
358                );
359            }
360        }
361
362        Some(task)
363    }
364
365    /// Marks a task as completed.
366    ///
367    /// Removes the task from the running map.
368    pub fn complete_task(&self, task_id: Uuid) -> Result<()> {
369        let task = {
370            let mut running = self.running.lock();
371            running.remove(&task_id)
372        };
373
374        match task {
375            Some(mut t) => {
376                t.status = TaskStatus::Completed;
377
378                // Clean up start time tracking.
379                {
380                    let mut start_times = self.task_start_times.lock();
381                    start_times.remove(&task_id);
382                }
383
384                tracing::info!(task_id = %task_id, "Task completed");
385                Ok(())
386            }
387            None => {
388                tracing::warn!(task_id = %task_id, "Attempted to complete unknown task");
389                Err(anyhow::anyhow!("task not found"))
390            }
391        }
392    }
393
394    /// Marks a task as failed with an error message.
395    ///
396    /// Removes the task from the running map.
397    pub fn fail_task(&self, task_id: Uuid, error: &str) -> Result<()> {
398        let task = {
399            let mut running = self.running.lock();
400            running.remove(&task_id)
401        };
402
403        match task {
404            Some(mut t) => {
405                t.status = TaskStatus::Failed;
406                t.error = Some(error.to_string());
407
408                // Clean up start time tracking.
409                {
410                    let mut start_times = self.task_start_times.lock();
411                    start_times.remove(&task_id);
412                }
413
414                tracing::warn!(task_id = %task_id, error = %error, "Task failed");
415                Ok(())
416            }
417            None => {
418                tracing::warn!(task_id = %task_id, "Attempted to fail unknown task");
419                Err(anyhow::anyhow!("task not found"))
420            }
421        }
422    }
423
424    /// Detects and reaps zombie tasks (running longer than the configured timeout).
425    ///
426    /// Returns the IDs of tasks that were reaped.
427    pub fn reap_zombies(&self) -> Vec<Uuid> {
428        let now = Utc::now();
429        let timeout = chrono::Duration::seconds(self.zombie_timeout_secs as i64);
430        let mut start_times = self.task_start_times.lock();
431        let mut running = self.running.lock();
432        let mut reaped = Vec::new();
433
434        let zombie_ids: Vec<Uuid> = start_times
435            .iter()
436            .filter(|(_, start)| now - **start > timeout)
437            .map(|(id, _)| *id)
438            .collect();
439
440        for id in zombie_ids {
441            if let Some(mut task) = running.remove(&id) {
442                task.status = TaskStatus::Failed;
443                task.error = Some(format!(
444                    "zombie: ran for >{} seconds",
445                    self.zombie_timeout_secs
446                ));
447                reaped.push(id);
448                tracing::warn!(
449                    task_id = %id,
450                    duration_secs = self.zombie_timeout_secs,
451                    "Zombie task reaped"
452                );
453            }
454            // Remove from start times.
455            start_times.remove(&id);
456        }
457
458        reaped
459    }
460
461    /// Marks a task as running by task ID.
462    ///
463    /// The task must be in the queue (status Queued). This is used by the
464    /// orchestrator to atomically claim a submitted task before execution.
465    pub fn start_task(&self, task_id: Uuid) -> Result<()> {
466        let task = {
467            let mut queue = self.queue.lock();
468            let all: Vec<ScheduledTask> = queue.drain().collect();
469            let mut found: Option<ScheduledTask> = None;
470            let remaining: Vec<ScheduledTask> = all
471                .into_iter()
472                .filter(|t| {
473                    if t.id == task_id {
474                        found = Some(t.clone());
475                        false
476                    } else {
477                        true
478                    }
479                })
480                .collect();
481            *queue = remaining.into_iter().collect();
482            found
483        };
484
485        match task {
486            Some(mut task) => {
487                task.status = TaskStatus::Running;
488                let mut start_times = self.task_start_times.lock();
489                start_times.insert(task.id, Utc::now());
490                let mut running = self.running.lock();
491                running.insert(task.id, task);
492                Ok(())
493            }
494            None => Err(anyhow::anyhow!("task {} not found in queue", task_id)),
495        }
496    }
497
498    /// Cancels a queued task by ID.
499    ///
500    /// Only works on tasks still in the queue (not yet running).
501    pub fn cancel_task(&self, task_id: Uuid) -> Result<()> {
502        let mut queue = self.queue.lock();
503        let all: Vec<ScheduledTask> = queue.drain().collect();
504        let mut found = false;
505        let remaining: Vec<ScheduledTask> = all
506            .into_iter()
507            .filter(|t| {
508                if t.id == task_id && t.status == TaskStatus::Queued {
509                    found = true;
510                    false
511                } else {
512                    true
513                }
514            })
515            .collect();
516        *queue = remaining.into_iter().collect();
517
518        if found {
519            tracing::info!(task_id = %task_id, "Task cancelled from queue");
520            Ok(())
521        } else {
522            tracing::warn!(task_id = %task_id, "Task not found in queue for cancellation");
523            Err(anyhow::anyhow!("task not found in queue"))
524        }
525    }
526
527    /// Returns the current scheduler statistics.
528    pub fn stats(&self) -> SchedulerStats {
529        let queue = self.queue.lock();
530        let running = self.running.lock();
531        let rate_limiter = self.rate_limiter.lock();
532
533        let _completed = 0usize;
534        let _failed = 0usize;
535
536        SchedulerStats {
537            queued: queue.len(),
538            running: running.len(),
539            completed: _completed,
540            failed: _failed,
541            max_concurrent: self.max_concurrent,
542            rate_limit_per_minute: rate_limiter.max_requests,
543            rate_remaining: rate_limiter.remaining(),
544        }
545    }
546
547    /// Returns remaining rate limit capacity.
548    pub fn rate_limit_remaining(&self) -> u32 {
549        self.rate_limiter.lock().remaining()
550    }
551
552    /// Returns all queued tasks (for debugging/monitoring).
553    pub fn queued_tasks(&self) -> Vec<ScheduledTask> {
554        let heap = self.queue.lock();
555        let mut tasks: Vec<ScheduledTask> = heap.iter().cloned().collect();
556        // Sort ascending by priority so highest priority is at the end
557        // (matches the original Vec-based behavior for test compatibility).
558        tasks.sort_by_key(|a| a.priority);
559        tasks
560    }
561
562    /// Returns all running tasks (for debugging/monitoring).
563    pub fn running_tasks(&self) -> Vec<ScheduledTask> {
564        self.running.lock().values().cloned().collect()
565    }
566}
567
568impl Default for AgentScheduler {
569    fn default() -> Self {
570        Self::new(5, 60, 300)
571    }
572}
573
574#[cfg(test)]
575mod tests {
576    use super::*;
577    use std::thread;
578    use std::time::Duration;
579
580    #[test]
581    fn test_task_creation() {
582        let task = ScheduledTask::new("Test task".into(), Priority::Normal);
583        assert_eq!(task.status, TaskStatus::Queued);
584        assert!(task.agent_id.is_none());
585        assert!(!task.error.is_some());
586    }
587
588    #[test]
589    fn test_task_creation_for_agent() {
590        let agent_id = AgentId::new_v4();
591        let task = ScheduledTask::for_agent(agent_id, "Agent task".into(), Priority::High);
592        assert_eq!(task.agent_id, Some(agent_id));
593        assert_eq!(task.priority, Priority::High);
594    }
595
596    #[test]
597    fn test_priority_ordering() {
598        assert!(Priority::Critical > Priority::High);
599        assert!(Priority::High > Priority::Normal);
600        assert!(Priority::Normal > Priority::Low);
601        // Check transitivity
602        assert!(Priority::Critical > Priority::Normal);
603        assert!(Priority::Critical > Priority::Low);
604        assert!(Priority::High > Priority::Low);
605    }
606
607    #[test]
608    fn test_priority_ordering_eq() {
609        assert_eq!(Priority::Low, Priority::Low);
610        assert_eq!(Priority::Normal, Priority::Normal);
611        assert_eq!(Priority::High, Priority::High);
612        assert_eq!(Priority::Critical, Priority::Critical);
613    }
614
615    #[test]
616    fn test_submit_and_next_high_priority_first() {
617        let scheduler = AgentScheduler::new(10, 10_000, 60);
618
619        scheduler
620            .submit(ScheduledTask::new("Low priority".into(), Priority::Low))
621            .unwrap();
622        scheduler
623            .submit(ScheduledTask::new("High priority".into(), Priority::High))
624            .unwrap();
625        scheduler
626            .submit(ScheduledTask::new(
627                "Normal priority".into(),
628                Priority::Normal,
629            ))
630            .unwrap();
631
632        // High priority should come first.
633        let next = scheduler.next_task().unwrap();
634        assert_eq!(next.priority, Priority::High);
635
636        // Normal next.
637        let next = scheduler.next_task().unwrap();
638        assert_eq!(next.priority, Priority::Normal);
639
640        // Low last.
641        let next = scheduler.next_task().unwrap();
642        assert_eq!(next.priority, Priority::Low);
643    }
644
645    #[test]
646    fn test_submit_and_next_critical_first() {
647        let scheduler = AgentScheduler::new(10, 10_000, 60);
648
649        scheduler
650            .submit(ScheduledTask::new("Low".into(), Priority::Low))
651            .unwrap();
652        scheduler
653            .submit(ScheduledTask::new("Normal".into(), Priority::Normal))
654            .unwrap();
655        scheduler
656            .submit(ScheduledTask::new("High".into(), Priority::High))
657            .unwrap();
658        scheduler
659            .submit(ScheduledTask::new("Critical".into(), Priority::Critical))
660            .unwrap();
661
662        // Critical should be first.
663        let next = scheduler.next_task().unwrap();
664        assert_eq!(next.priority, Priority::Critical);
665        // Then High.
666        let next = scheduler.next_task().unwrap();
667        assert_eq!(next.priority, Priority::High);
668        // Then Normal.
669        let next = scheduler.next_task().unwrap();
670        assert_eq!(next.priority, Priority::Normal);
671        // Then Low.
672        let next = scheduler.next_task().unwrap();
673        assert_eq!(next.priority, Priority::Low);
674    }
675
676    #[test]
677    fn test_submit_multiple_same_priority() {
678        let scheduler = AgentScheduler::new(10, 10_000, 60);
679
680        // Multiple tasks at same priority — BinaryHeap does not guarantee
681        // FIFO/LIFO within the same priority level.
682        scheduler
683            .submit(ScheduledTask::new("First".into(), Priority::Normal))
684            .unwrap();
685        scheduler
686            .submit(ScheduledTask::new("Second".into(), Priority::Normal))
687            .unwrap();
688        scheduler
689            .submit(ScheduledTask::new("Third".into(), Priority::Normal))
690            .unwrap();
691
692        // All three should be popped with Normal priority; exact order is unspecified.
693        let mut descriptions = Vec::new();
694        for _ in 0..3 {
695            let next = scheduler.next_task().unwrap();
696            assert_eq!(next.priority, Priority::Normal);
697            descriptions.push(next.description);
698        }
699        descriptions.sort();
700        assert_eq!(descriptions, vec!["First", "Second", "Third"]);
701    }
702
703    #[test]
704    fn test_max_concurrent_blocks() {
705        let scheduler = AgentScheduler::new(2, 10_000, 60);
706
707        scheduler
708            .submit(ScheduledTask::new("Task 1".into(), Priority::Normal))
709            .unwrap();
710        scheduler
711            .submit(ScheduledTask::new("Task 2".into(), Priority::Normal))
712            .unwrap();
713        scheduler
714            .submit(ScheduledTask::new("Task 3".into(), Priority::Normal))
715            .unwrap();
716
717        assert!(scheduler.next_task().is_some());
718        assert!(scheduler.next_task().is_some());
719        // Third should be None due to max concurrent.
720        assert!(scheduler.next_task().is_none());
721    }
722
723    #[test]
724    fn test_max_concurrent_allows_when_slot_frees() {
725        let scheduler = AgentScheduler::new(2, 10_000, 60); // 2 max concurrent.
726
727        let _ = scheduler
728            .submit(ScheduledTask::new("Task 1".into(), Priority::Normal))
729            .unwrap();
730        let _id2 = scheduler
731            .submit(ScheduledTask::new("Task 2".into(), Priority::Normal))
732            .unwrap();
733        // Queue (insert at 0 prepends): [Task 2, Task 1]
734
735        // Start 2 tasks (fills max_concurrent).
736        let t1 = scheduler.next_task().unwrap(); // Task 2 (first popped).
737        let t2 = scheduler.next_task().unwrap(); // Task 1 (second popped).
738                                                 // Running: [Task 2, Task 1], Queue: []
739        assert!(scheduler.next_task().is_none()); // Blocked.
740
741        // Complete both tasks.
742        scheduler.complete_task(t1.id).unwrap();
743        scheduler.complete_task(t2.id).unwrap();
744
745        // Submit a new task now that slots have freed.
746        let _id3 = scheduler
747            .submit(ScheduledTask::new("Task 3".into(), Priority::Normal))
748            .unwrap();
749
750        // Now next_task should work.
751        let task = scheduler.next_task().unwrap();
752        assert_eq!(task.description, "Task 3");
753
754        // Clean up.
755        scheduler.complete_task(task.id).unwrap();
756    }
757
758    #[test]
759    fn test_complete_task_removes_from_running() {
760        let scheduler = AgentScheduler::new(2, 10_000, 60);
761        let task = ScheduledTask::new("Test".into(), Priority::Normal);
762        let id = scheduler.submit(task).unwrap();
763
764        let _ = scheduler.next_task();
765        scheduler.complete_task(id).unwrap();
766
767        let stats = scheduler.stats();
768        assert_eq!(stats.running, 0);
769    }
770
771    #[test]
772    fn test_complete_unknown_task_returns_error() {
773        let scheduler = AgentScheduler::new(2, 10_000, 60);
774        let result = scheduler.complete_task(Uuid::new_v4());
775        assert!(result.is_err());
776    }
777
778    #[test]
779    fn test_fail_task_sets_error() {
780        let scheduler = AgentScheduler::new(2, 10_000, 60);
781        let task = ScheduledTask::new("Test".into(), Priority::Normal);
782        let id = scheduler.submit(task).unwrap();
783
784        let _ = scheduler.next_task();
785        scheduler.fail_task(id, "Something went wrong").unwrap();
786
787        let running = scheduler.running.lock();
788        assert!(!running.contains_key(&id));
789    }
790
791    #[test]
792    fn test_cancel_queued_task() {
793        let scheduler = AgentScheduler::new(2, 10_000, 60);
794        let id = scheduler
795            .submit(ScheduledTask::new("To cancel".into(), Priority::Normal))
796            .unwrap();
797
798        scheduler.cancel_task(id).unwrap();
799
800        // Queue should be empty now.
801        assert!(scheduler.next_task().is_none());
802    }
803
804    #[test]
805    fn test_cancel_running_task_fails() {
806        let scheduler = AgentScheduler::new(2, 10_000, 60);
807        let id = scheduler
808            .submit(ScheduledTask::new("Running".into(), Priority::Normal))
809            .unwrap();
810
811        let _ = scheduler.next_task(); // Task is now running.
812
813        // Can't cancel a running task.
814        let result = scheduler.cancel_task(id);
815        assert!(result.is_err());
816    }
817
818    #[test]
819    fn test_cancel_unknown_task_fails() {
820        let scheduler = AgentScheduler::new(2, 10_000, 60);
821        let result = scheduler.cancel_task(Uuid::new_v4());
822        assert!(result.is_err());
823    }
824
825    #[test]
826    fn test_stats_tracking() {
827        let scheduler = AgentScheduler::new(2, 60, 60);
828
829        let id1 = scheduler
830            .submit(ScheduledTask::new("Queued".into(), Priority::Normal))
831            .unwrap();
832        scheduler
833            .submit(ScheduledTask::new("Queued 2".into(), Priority::Low))
834            .unwrap();
835
836        // Start one task.
837        let started = scheduler.next_task().unwrap();
838        assert_eq!(started.id, id1);
839
840        let stats = scheduler.stats();
841        assert_eq!(stats.queued, 1); // One still in queue.
842        assert_eq!(stats.running, 1);
843        assert_eq!(stats.max_concurrent, 2);
844        assert_eq!(stats.rate_limit_per_minute, 60);
845    }
846
847    #[test]
848    fn test_reap_zombies() {
849        // Create a scheduler with a very short zombie timeout.
850        let scheduler = AgentScheduler::new(2, 10_000, 1); // 1 second timeout.
851
852        // Submit and start a task.
853        let id = scheduler
854            .submit(ScheduledTask::new("Zombie".into(), Priority::Normal))
855            .unwrap();
856        let _ = scheduler.next_task();
857
858        // Wait longer than the zombie timeout.
859        thread::sleep(Duration::from_secs(2));
860
861        // Reap zombies.
862        let reaped = scheduler.reap_zombies();
863        assert!(reaped.contains(&id));
864
865        // Task should no longer be running.
866        assert!(scheduler.running.lock().get(&id).is_none());
867    }
868
869    #[test]
870    fn test_reap_zombies_no_zombies() {
871        let scheduler = AgentScheduler::new(2, 10_000, 60); // Long timeout.
872
873        let id = scheduler
874            .submit(ScheduledTask::new("Normal".into(), Priority::Normal))
875            .unwrap();
876        let _ = scheduler.next_task();
877
878        // No sleep, so no zombies yet.
879        let reaped = scheduler.reap_zombies();
880        assert!(reaped.is_empty());
881
882        // Task still running.
883        assert!(scheduler.running.lock().get(&id).is_some());
884    }
885
886    #[test]
887    fn test_rate_limiter_basic() {
888        let mut limiter = RateLimiter::new(60, 3); // 3 requests per minute.
889
890        assert!(limiter.allow());
891        assert!(limiter.allow());
892        assert!(limiter.allow());
893        // 4th request should be blocked.
894        assert!(!limiter.allow());
895    }
896
/// `remaining()` reports the full allowance on a fresh limiter and drops
/// by one for every admitted request.
#[test]
fn test_rate_limiter_remaining() {
    // An untouched limiter still has its whole allowance.
    let fresh = RateLimiter::new(60, 3);
    assert_eq!(fresh.remaining(), 3);

    // After two admitted requests, a single slot is left.
    let mut used = RateLimiter::new(60, 3);
    for _ in 0..2 {
        used.allow();
    }
    assert_eq!(used.remaining(), 1);
}
908
/// Starting tasks spends the scheduler's rate limit; once it is used up,
/// `next_task` hands out nothing and no allowance remains.
#[test]
fn test_rate_limiter_tracks_per_scheduler() {
    // Concurrency cap of 10 is generous; the rate limit of 5 is what binds.
    let scheduler = AgentScheduler::new(10, 5, 60);

    // Spend the whole allowance: it is `next_task`, not `submit`, that
    // counts against the rate limit.
    (0..5).for_each(|idx| {
        let task = ScheduledTask::new(format!("T{}", idx), Priority::Normal);
        scheduler.submit(task).unwrap();
        let _ = scheduler.next_task();
    });

    // A further request must be rate limited.
    let extra = scheduler.next_task();
    assert!(extra.is_none());
    assert_eq!(scheduler.rate_limit_remaining(), 0);
}
925
/// `queued_tasks()` exposes every queued task, with the highest-priority
/// task at the back of the returned sequence.
#[test]
fn test_queued_tasks_inspection() {
    let scheduler = AgentScheduler::new(2, 10_000, 60);

    // Submission order deliberately differs from priority order.
    let submissions = [
        ("A", Priority::Low),
        ("B", Priority::High),
        ("C", Priority::Normal),
    ];
    for (label, priority) in submissions {
        scheduler
            .submit(ScheduledTask::new(label.into(), priority))
            .unwrap();
    }

    let queued = scheduler.queued_tasks();
    assert_eq!(queued.len(), 3);

    // Ordering is by priority with the highest at the back (ready to pop),
    // so the High task "B" must come last.
    let back = queued.last().unwrap();
    assert_eq!(back.description, "B");
}
946
/// `running_tasks()` lists every task that has been started.
#[test]
fn test_running_tasks_inspection() {
    let scheduler = AgentScheduler::new(2, 10_000, 60);

    // Queue two tasks of equal priority.
    for label in ["R1", "R2"] {
        scheduler
            .submit(ScheduledTask::new(label.into(), Priority::Normal))
            .unwrap();
    }

    // Start both of them; the concurrency cap of 2 permits it.
    for _ in 0..2 {
        let _ = scheduler.next_task();
    }

    assert_eq!(scheduler.running_tasks().len(), 2);
}
964
/// The default scheduler reports a concurrency cap of 5 and a rate limit
/// of 60 requests per minute.
#[test]
fn test_default_scheduler() {
    let default_scheduler = AgentScheduler::default();
    let snapshot = default_scheduler.stats();

    // Check both defaults together: (max concurrent, rate limit per minute).
    assert_eq!(
        (snapshot.max_concurrent, snapshot.rate_limit_per_minute),
        (5, 60)
    );
}
972
/// Once an agent's call budget is used up, its remaining queued task is
/// not handed out by `next_task`.
#[test]
fn test_budget_manager_integration_skips_exhausted_agent() {
    use crate::budget::{BudgetLimit, BudgetManager};

    let scheduler = Arc::new(Mutex::new(AgentScheduler::new(2, 10_000, 60)));
    let budget_manager = Arc::new(BudgetManager::new());

    // Give one agent a budget of exactly one call.
    let agent_id = AgentId::new_v4();
    let one_call_limit = BudgetLimit {
        agent_id,
        token_budget: 1000,
        calls_budget: 1,
        window_secs: 60,
    };
    budget_manager.set_budget(one_call_limit);

    // Wire the budget manager into the scheduler.
    scheduler
        .lock()
        .set_budget_manager(Arc::clone(&budget_manager));

    // Queue two tasks that both belong to that agent.
    for label in ["Task 1", "Task 2"] {
        let task = ScheduledTask::for_agent(agent_id, label.into(), Priority::Normal);
        scheduler.lock().submit(task).unwrap();
    }

    // The first task fits the budget: it starts and is then completed.
    let first = scheduler.lock().next_task();
    assert!(first.is_some());
    let first_id = first.unwrap().id;
    scheduler.lock().complete_task(first_id).unwrap();

    // The budget is now exhausted, so the second task is skipped.
    let second = scheduler.lock().next_task();
    assert!(second.is_none());
}
1021
/// Two agents with their own budgets can each have a task started.
#[test]
fn test_budget_manager_allows_different_agents() {
    use crate::budget::{BudgetLimit, BudgetManager};

    let scheduler = Arc::new(Mutex::new(AgentScheduler::new(2, 10_000, 60)));
    let budget_manager = Arc::new(BudgetManager::new());

    let agent1 = AgentId::new_v4();
    let agent2 = AgentId::new_v4();

    // Each agent gets an identical three-call budget.
    [agent1, agent2].iter().for_each(|&agent_id| {
        budget_manager.set_budget(BudgetLimit {
            agent_id,
            token_budget: 1000,
            calls_budget: 3,
            window_secs: 60,
        });
    });

    scheduler
        .lock()
        .set_budget_manager(Arc::clone(&budget_manager));

    // Queue one task per agent.
    for (agent_id, label) in [(agent1, "A1"), (agent2, "B1")] {
        let task = ScheduledTask::for_agent(agent_id, label.into(), Priority::Normal);
        scheduler.lock().submit(task).unwrap();
    }

    // Both tasks must be handed out, and they must be distinct tasks.
    let first = scheduler.lock().next_task().unwrap();
    let second = scheduler.lock().next_task().unwrap();
    assert_ne!(first.description, second.description);
}
1069
/// A task that carries no agent ID is still started when a budget manager
/// is attached to the scheduler.
#[test]
fn test_budget_manager_task_without_agent_id() {
    // Only `BudgetManager` is needed: this test configures no per-agent
    // limit, so importing `BudgetLimit` would trip `unused_imports`.
    use crate::budget::BudgetManager;

    let scheduler = Arc::new(Mutex::new(AgentScheduler::new(2, 10_000, 60)));
    let budget_manager = Arc::new(BudgetManager::new());

    scheduler
        .lock()
        .set_budget_manager(Arc::clone(&budget_manager));

    // Submit a task without an agent ID.
    scheduler
        .lock()
        .submit(ScheduledTask::new("No agent".into(), Priority::Normal))
        .unwrap();

    // Should still run (no budget check for tasks without agent).
    let task = scheduler.lock().next_task();
    assert!(task.is_some());
}
1091
/// With no budget manager attached, a submitted task is handed out
/// normally.
#[test]
fn test_budget_manager_not_set_skips_check() {
    // Deliberately no `set_budget_manager` call on this scheduler.
    let scheduler = Arc::new(Mutex::new(AgentScheduler::new(2, 10_000, 60)));

    let task = ScheduledTask::new("Any task".into(), Priority::Normal);
    scheduler.lock().submit(task).unwrap();

    // The task must start even though no budget manager exists.
    let started = scheduler.lock().next_task();
    assert!(started.is_some());
}
1106}