Skip to main content

oxios_kernel/
scheduler.rs

1//! Agent Scheduler — priority-based task queue inspired by AIOS / AgentRM.
2//!
3//! Manages agent task scheduling with:
4//! - Priority queue (FIFO within same priority)
5//! - Rate-limit-aware admission control
6//! - Zombie task detection and reaping
7//! - Maximum concurrent task enforcement
8
9use crate::budget::BudgetManager;
10use crate::types::AgentId;
11use anyhow::Result;
12use chrono::{DateTime, Utc};
13use parking_lot::Mutex;
14use serde::{Deserialize, Serialize};
15use std::collections::{BinaryHeap, HashMap};
16use std::sync::Arc;
17use uuid::Uuid;
18
19/// Priority levels for scheduled tasks.
20#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Default)]
21pub enum Priority {
22    /// Low priority, good for background work.
23    Low = 0,
24    /// Normal priority, default for most tasks.
25    #[default]
26    Normal = 1,
27    /// High priority, important tasks.
28    High = 2,
29    /// Critical priority, must execute immediately.
30    Critical = 3,
31}
32
33/// Status of a scheduled task.
34#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
35pub enum TaskStatus {
36    /// Task is queued, waiting for execution.
37    Queued,
38    /// Task is currently running.
39    Running,
40    /// Task completed successfully.
41    Completed,
42    /// Task failed with an error.
43    Failed,
44    /// Task was cancelled before execution.
45    Cancelled,
46}
47
48/// A scheduled task for an agent.
49#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
50pub struct ScheduledTask {
51    /// Unique task identifier.
52    pub id: Uuid,
53    /// Associated agent ID, if any.
54    pub agent_id: Option<AgentId>,
55    /// Human-readable task description.
56    pub description: String,
57    /// Task priority level.
58    pub priority: Priority,
59    /// When the task was created.
60    pub created_at: DateTime<Utc>,
61    /// Current status of the task.
62    pub status: TaskStatus,
63    /// Error message if the task failed.
64    #[serde(skip_serializing_if = "Option::is_none")]
65    pub error: Option<String>,
66}
67
68impl PartialOrd for ScheduledTask {
69    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
70        Some(self.cmp(other))
71    }
72}
73
74impl Ord for ScheduledTask {
75    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
76        // Higher priority first; within same priority, newer tasks first (LIFO)
77        // so that BinaryHeap::pop() returns the highest-priority, newest task.
78        self.priority
79            .cmp(&other.priority)
80            .then_with(|| other.created_at.cmp(&self.created_at))
81    }
82}
83
84impl ScheduledTask {
85    /// Creates a new scheduled task.
86    pub fn new(description: String, priority: Priority) -> Self {
87        Self {
88            id: Uuid::new_v4(),
89            agent_id: None,
90            description,
91            priority,
92            created_at: Utc::now(),
93            status: TaskStatus::Queued,
94            error: None,
95        }
96    }
97
98    /// Creates a task associated with a specific agent.
99    pub fn for_agent(agent_id: AgentId, description: String, priority: Priority) -> Self {
100        Self {
101            id: Uuid::new_v4(),
102            agent_id: Some(agent_id),
103            description,
104            priority,
105            created_at: Utc::now(),
106            status: TaskStatus::Queued,
107            error: None,
108        }
109    }
110}
111
112/// Statistics about the scheduler state.
113#[derive(Debug, Clone, Serialize, Deserialize)]
114pub struct SchedulerStats {
115    /// Number of queued tasks.
116    pub queued: usize,
117    /// Number of currently running tasks.
118    pub running: usize,
119    /// Number of completed tasks.
120    pub completed: usize,
121    /// Number of failed tasks.
122    pub failed: usize,
123    /// Maximum allowed concurrent tasks.
124    pub max_concurrent: usize,
125    /// Rate limit (requests per minute).
126    pub rate_limit_per_minute: u32,
127}
128
129impl Default for SchedulerStats {
130    fn default() -> Self {
131        Self {
132            queued: 0,
133            running: 0,
134            completed: 0,
135            failed: 0,
136            max_concurrent: 5,
137            rate_limit_per_minute: 60,
138        }
139    }
140}
141
142/// Rate limiter state for tracking API call rates.
143#[derive(Debug, Clone)]
144struct RateLimiter {
145    /// Timestamps of recent requests.
146    window: Vec<DateTime<Utc>>,
147    /// Window duration in seconds.
148    window_secs: u64,
149    /// Maximum requests per window.
150    max_requests: u32,
151}
152
153impl RateLimiter {
154    fn new(window_secs: u64, max_requests: u32) -> Self {
155        Self {
156            window: Vec::new(),
157            window_secs,
158            max_requests,
159        }
160    }
161
162    /// Check if a new request is allowed under rate limits.
163    fn allow(&mut self) -> bool {
164        let now = Utc::now();
165        let cutoff = now - chrono::Duration::seconds(self.window_secs as i64);
166
167        // Prune old entries.
168        self.window.retain(|t| *t > cutoff);
169
170        if self.window.len() >= self.max_requests as usize {
171            return false;
172        }
173
174        self.window.push(now);
175        true
176    }
177
178    /// Get remaining capacity in the current window.
179    fn remaining(&self) -> u32 {
180        let now = Utc::now();
181        let cutoff = now - chrono::Duration::seconds(self.window_secs as i64);
182        let active = self.window.iter().filter(|t| **t > cutoff).count();
183        self.max_requests.saturating_sub(active as u32)
184    }
185}
186
187/// The agent scheduler.
188///
189/// Manages task queues, rate limiting, zombie detection, and priority scheduling.
190/// This is the central coordinator for all agent task execution.
191pub struct AgentScheduler {
192    /// The task queue (priority max-heap).
193    queue: Arc<Mutex<BinaryHeap<ScheduledTask>>>,
194    /// Currently running tasks.
195    running: Arc<Mutex<HashMap<Uuid, ScheduledTask>>>,
196    /// Maximum concurrent tasks allowed.
197    max_concurrent: usize,
198    /// Rate limiter for LLM API calls.
199    rate_limiter: Arc<Mutex<RateLimiter>>,
200    /// Timeout for zombie detection (seconds).
201    zombie_timeout_secs: u64,
202    /// Track when each running task started (for zombie detection).
203    task_start_times: Arc<Mutex<HashMap<Uuid, DateTime<Utc>>>>,
204    /// Optional budget manager for scheduling checks.
205    budget_manager: Option<Arc<BudgetManager>>,
206}
207
208impl AgentScheduler {
209    /// Creates a new scheduler.
210    ///
211    /// # Arguments
212    /// * `max_concurrent` - Maximum number of tasks that can run simultaneously
213    /// * `rate_limit_per_minute` - Maximum LLM API calls per minute
214    /// * `zombie_timeout_secs` - How long before a running task is considered a zombie
215    pub fn new(
216        max_concurrent: usize,
217        rate_limit_per_minute: u32,
218        zombie_timeout_secs: u64,
219    ) -> Self {
220        Self {
221            queue: Arc::new(Mutex::new(BinaryHeap::new())),
222            running: Arc::new(Mutex::new(HashMap::new())),
223            max_concurrent,
224            rate_limiter: Arc::new(Mutex::new(RateLimiter::new(60, rate_limit_per_minute))),
225            zombie_timeout_secs,
226            task_start_times: Arc::new(Mutex::new(HashMap::new())),
227            budget_manager: None,
228        }
229    }
230
231    /// Attaches a budget manager for scheduling checks.
232    ///
233    /// When a budget manager is set, the scheduler will:
234    /// - Check `can_schedule()` before starting a task (soft gate)
235    /// - Track calls via `track_call()` when a task begins
236    ///
237    /// If no budget manager is set, tasks proceed normally.
238    pub fn set_budget_manager(&mut self, bm: Arc<BudgetManager>) {
239        self.budget_manager = Some(bm);
240    }
241
242    /// Submits a task to the scheduler queue.
243    ///
244    /// Returns the task ID on success.
245    pub fn submit(&self, mut task: ScheduledTask) -> Result<Uuid> {
246        task.status = TaskStatus::Queued;
247        let id = task.id;
248
249        let mut queue = self.queue.lock();
250        queue.push(task); // O(log N) — BinaryHeap maintains heap property
251
252        tracing::debug!(
253            task_id = %id,
254            queue_len = queue.len(),
255            "Task submitted to scheduler"
256        );
257
258        Ok(id)
259    }
260
261    /// Gets the next task ready for execution.
262    ///
263    /// Returns `None` if:
264    /// - The queue is empty
265    /// - Max concurrent limit is reached
266    /// - Rate limit is exceeded
267    pub fn next_task(&self) -> Option<ScheduledTask> {
268        // Check if we can start a new task.
269        {
270            let running = self.running.lock();
271            if running.len() >= self.max_concurrent {
272                tracing::debug!(
273                    running = running.len(),
274                    max = self.max_concurrent,
275                    "Max concurrent limit reached"
276                );
277                return None;
278            }
279        }
280
281        // Check rate limit.
282        {
283            let mut limiter = self.rate_limiter.lock();
284            if !limiter.allow() {
285                tracing::debug!(remaining = limiter.remaining(), "Rate limit exceeded");
286                return None;
287            }
288        }
289
290        // Pop tasks iteratively, skipping agents with exhausted budgets.
291        let mut discarded: usize = 0;
292        let mut task = loop {
293            let task_opt = {
294                let mut queue = self.queue.lock();
295                queue.pop() // O(log N) — BinaryHeap returns max-priority task
296            };
297
298            match task_opt {
299                Some(t) => {
300                    // Check budget before scheduling (soft gate).
301                    if let (Some(ref bm), Some(ref agent_id)) = (&self.budget_manager, &t.agent_id)
302                    {
303                        if !bm.can_schedule(agent_id) {
304                            tracing::warn!(
305                                agent_id = %agent_id,
306                                "Agent budget exhausted, skipping task"
307                            );
308                            discarded += 1;
309                            continue; // skip, try next task
310                        }
311                    }
312                    break t;
313                }
314                None => {
315                    if discarded > 0 {
316                        tracing::info!(discarded, "All queued tasks had exhausted budgets");
317                    }
318                    return None;
319                }
320            }
321        };
322
323        if discarded > 0 {
324            tracing::info!(discarded, "Skipped tasks with exhausted budgets");
325        }
326
327        task.status = TaskStatus::Running;
328
329        // Track start time for zombie detection.
330        {
331            let mut start_times = self.task_start_times.lock();
332            start_times.insert(task.id, Utc::now());
333        }
334
335        // Add to running map.
336        {
337            let mut running = self.running.lock();
338            running.insert(task.id, task.clone());
339        }
340
341        tracing::info!(
342            task_id = %task.id,
343            priority = ?task.priority,
344            running = self.running.lock().len(),
345            "Task started by scheduler"
346        );
347
348        // Track call for budget management.
349        if let (Some(ref bm), Some(ref agent_id)) = (&self.budget_manager, &task.agent_id) {
350            if let Err(e) = bm.track_call(agent_id) {
351                tracing::warn!(
352                    agent_id = %agent_id,
353                    error = %e,
354                    "Budget exceeded during task track_call"
355                );
356            }
357        }
358
359        Some(task)
360    }
361
362    /// Marks a task as completed.
363    ///
364    /// Removes the task from the running map.
365    pub fn complete_task(&self, task_id: Uuid) -> Result<()> {
366        let task = {
367            let mut running = self.running.lock();
368            running.remove(&task_id)
369        };
370
371        match task {
372            Some(mut t) => {
373                t.status = TaskStatus::Completed;
374
375                // Clean up start time tracking.
376                {
377                    let mut start_times = self.task_start_times.lock();
378                    start_times.remove(&task_id);
379                }
380
381                tracing::info!(task_id = %task_id, "Task completed");
382                Ok(())
383            }
384            None => {
385                tracing::warn!(task_id = %task_id, "Attempted to complete unknown task");
386                Err(anyhow::anyhow!("task not found"))
387            }
388        }
389    }
390
391    /// Marks a task as failed with an error message.
392    ///
393    /// Removes the task from the running map.
394    pub fn fail_task(&self, task_id: Uuid, error: &str) -> Result<()> {
395        let task = {
396            let mut running = self.running.lock();
397            running.remove(&task_id)
398        };
399
400        match task {
401            Some(mut t) => {
402                t.status = TaskStatus::Failed;
403                t.error = Some(error.to_string());
404
405                // Clean up start time tracking.
406                {
407                    let mut start_times = self.task_start_times.lock();
408                    start_times.remove(&task_id);
409                }
410
411                tracing::warn!(task_id = %task_id, error = %error, "Task failed");
412                Ok(())
413            }
414            None => {
415                tracing::warn!(task_id = %task_id, "Attempted to fail unknown task");
416                Err(anyhow::anyhow!("task not found"))
417            }
418        }
419    }
420
421    /// Detects and reaps zombie tasks (running longer than the configured timeout).
422    ///
423    /// Returns the IDs of tasks that were reaped.
424    pub fn reap_zombies(&self) -> Vec<Uuid> {
425        let now = Utc::now();
426        let timeout = chrono::Duration::seconds(self.zombie_timeout_secs as i64);
427        let mut start_times = self.task_start_times.lock();
428        let mut running = self.running.lock();
429        let mut reaped = Vec::new();
430
431        let zombie_ids: Vec<Uuid> = start_times
432            .iter()
433            .filter(|(_, start)| now - **start > timeout)
434            .map(|(id, _)| *id)
435            .collect();
436
437        for id in zombie_ids {
438            if let Some(mut task) = running.remove(&id) {
439                task.status = TaskStatus::Failed;
440                task.error = Some(format!(
441                    "zombie: ran for >{} seconds",
442                    self.zombie_timeout_secs
443                ));
444                reaped.push(id);
445                tracing::warn!(
446                    task_id = %id,
447                    duration_secs = self.zombie_timeout_secs,
448                    "Zombie task reaped"
449                );
450            }
451            // Remove from start times.
452            start_times.remove(&id);
453        }
454
455        reaped
456    }
457
458    /// Marks a task as running by task ID.
459    ///
460    /// The task must be in the queue (status Queued). This is used by the
461    /// orchestrator to atomically claim a submitted task before execution.
462    pub fn start_task(&self, task_id: Uuid) -> Result<()> {
463        let task = {
464            let mut queue = self.queue.lock();
465            let all: Vec<ScheduledTask> = queue.drain().collect();
466            let mut found: Option<ScheduledTask> = None;
467            let remaining: Vec<ScheduledTask> = all
468                .into_iter()
469                .filter(|t| {
470                    if t.id == task_id {
471                        found = Some(t.clone());
472                        false
473                    } else {
474                        true
475                    }
476                })
477                .collect();
478            *queue = remaining.into_iter().collect();
479            found
480        };
481
482        match task {
483            Some(mut task) => {
484                task.status = TaskStatus::Running;
485                let mut start_times = self.task_start_times.lock();
486                start_times.insert(task.id, Utc::now());
487                let mut running = self.running.lock();
488                running.insert(task.id, task);
489                Ok(())
490            }
491            None => Err(anyhow::anyhow!("task {} not found in queue", task_id)),
492        }
493    }
494
495    /// Cancels a queued task by ID.
496    ///
497    /// Only works on tasks still in the queue (not yet running).
498    pub fn cancel_task(&self, task_id: Uuid) -> Result<()> {
499        let mut queue = self.queue.lock();
500        let all: Vec<ScheduledTask> = queue.drain().collect();
501        let mut found = false;
502        let remaining: Vec<ScheduledTask> = all
503            .into_iter()
504            .filter(|t| {
505                if t.id == task_id && t.status == TaskStatus::Queued {
506                    found = true;
507                    false
508                } else {
509                    true
510                }
511            })
512            .collect();
513        *queue = remaining.into_iter().collect();
514
515        if found {
516            tracing::info!(task_id = %task_id, "Task cancelled from queue");
517            Ok(())
518        } else {
519            tracing::warn!(task_id = %task_id, "Task not found in queue for cancellation");
520            Err(anyhow::anyhow!("task not found in queue"))
521        }
522    }
523
524    /// Returns the current scheduler statistics.
525    pub fn stats(&self) -> SchedulerStats {
526        let queue = self.queue.lock();
527        let running = self.running.lock();
528        let rate_limiter = self.rate_limiter.lock();
529
530        let _completed = 0usize;
531        let _failed = 0usize;
532
533        SchedulerStats {
534            queued: queue.len(),
535            running: running.len(),
536            completed: _completed,
537            failed: _failed,
538            max_concurrent: self.max_concurrent,
539            rate_limit_per_minute: rate_limiter.max_requests,
540        }
541    }
542
543    /// Returns remaining rate limit capacity.
544    pub fn rate_limit_remaining(&self) -> u32 {
545        self.rate_limiter.lock().remaining()
546    }
547
548    /// Returns all queued tasks (for debugging/monitoring).
549    pub fn queued_tasks(&self) -> Vec<ScheduledTask> {
550        let heap = self.queue.lock();
551        let mut tasks: Vec<ScheduledTask> = heap.iter().cloned().collect();
552        // Sort ascending by priority so highest priority is at the end
553        // (matches the original Vec-based behavior for test compatibility).
554        tasks.sort_by_key(|a| a.priority);
555        tasks
556    }
557
558    /// Returns all running tasks (for debugging/monitoring).
559    pub fn running_tasks(&self) -> Vec<ScheduledTask> {
560        self.running.lock().values().cloned().collect()
561    }
562}
563
564impl Default for AgentScheduler {
565    fn default() -> Self {
566        Self::new(5, 60, 300)
567    }
568}
569
570#[cfg(test)]
571mod tests {
572    use super::*;
573    use std::thread;
574    use std::time::Duration;
575
576    #[test]
577    fn test_task_creation() {
578        let task = ScheduledTask::new("Test task".into(), Priority::Normal);
579        assert_eq!(task.status, TaskStatus::Queued);
580        assert!(task.agent_id.is_none());
581        assert!(!task.error.is_some());
582    }
583
584    #[test]
585    fn test_task_creation_for_agent() {
586        let agent_id = AgentId::new_v4();
587        let task = ScheduledTask::for_agent(agent_id, "Agent task".into(), Priority::High);
588        assert_eq!(task.agent_id, Some(agent_id));
589        assert_eq!(task.priority, Priority::High);
590    }
591
592    #[test]
593    fn test_priority_ordering() {
594        assert!(Priority::Critical > Priority::High);
595        assert!(Priority::High > Priority::Normal);
596        assert!(Priority::Normal > Priority::Low);
597        // Check transitivity
598        assert!(Priority::Critical > Priority::Normal);
599        assert!(Priority::Critical > Priority::Low);
600        assert!(Priority::High > Priority::Low);
601    }
602
603    #[test]
604    fn test_priority_ordering_eq() {
605        assert_eq!(Priority::Low, Priority::Low);
606        assert_eq!(Priority::Normal, Priority::Normal);
607        assert_eq!(Priority::High, Priority::High);
608        assert_eq!(Priority::Critical, Priority::Critical);
609    }
610
611    #[test]
612    fn test_submit_and_next_high_priority_first() {
613        let scheduler = AgentScheduler::new(10, 10_000, 60);
614
615        scheduler
616            .submit(ScheduledTask::new("Low priority".into(), Priority::Low))
617            .unwrap();
618        scheduler
619            .submit(ScheduledTask::new("High priority".into(), Priority::High))
620            .unwrap();
621        scheduler
622            .submit(ScheduledTask::new(
623                "Normal priority".into(),
624                Priority::Normal,
625            ))
626            .unwrap();
627
628        // High priority should come first.
629        let next = scheduler.next_task().unwrap();
630        assert_eq!(next.priority, Priority::High);
631
632        // Normal next.
633        let next = scheduler.next_task().unwrap();
634        assert_eq!(next.priority, Priority::Normal);
635
636        // Low last.
637        let next = scheduler.next_task().unwrap();
638        assert_eq!(next.priority, Priority::Low);
639    }
640
641    #[test]
642    fn test_submit_and_next_critical_first() {
643        let scheduler = AgentScheduler::new(10, 10_000, 60);
644
645        scheduler
646            .submit(ScheduledTask::new("Low".into(), Priority::Low))
647            .unwrap();
648        scheduler
649            .submit(ScheduledTask::new("Normal".into(), Priority::Normal))
650            .unwrap();
651        scheduler
652            .submit(ScheduledTask::new("High".into(), Priority::High))
653            .unwrap();
654        scheduler
655            .submit(ScheduledTask::new("Critical".into(), Priority::Critical))
656            .unwrap();
657
658        // Critical should be first.
659        let next = scheduler.next_task().unwrap();
660        assert_eq!(next.priority, Priority::Critical);
661        // Then High.
662        let next = scheduler.next_task().unwrap();
663        assert_eq!(next.priority, Priority::High);
664        // Then Normal.
665        let next = scheduler.next_task().unwrap();
666        assert_eq!(next.priority, Priority::Normal);
667        // Then Low.
668        let next = scheduler.next_task().unwrap();
669        assert_eq!(next.priority, Priority::Low);
670    }
671
672    #[test]
673    fn test_submit_multiple_same_priority() {
674        let scheduler = AgentScheduler::new(10, 10_000, 60);
675
676        // Multiple tasks at same priority — BinaryHeap does not guarantee
677        // FIFO/LIFO within the same priority level.
678        scheduler
679            .submit(ScheduledTask::new("First".into(), Priority::Normal))
680            .unwrap();
681        scheduler
682            .submit(ScheduledTask::new("Second".into(), Priority::Normal))
683            .unwrap();
684        scheduler
685            .submit(ScheduledTask::new("Third".into(), Priority::Normal))
686            .unwrap();
687
688        // All three should be popped with Normal priority; exact order is unspecified.
689        let mut descriptions = Vec::new();
690        for _ in 0..3 {
691            let next = scheduler.next_task().unwrap();
692            assert_eq!(next.priority, Priority::Normal);
693            descriptions.push(next.description);
694        }
695        descriptions.sort();
696        assert_eq!(descriptions, vec!["First", "Second", "Third"]);
697    }
698
699    #[test]
700    fn test_max_concurrent_blocks() {
701        let scheduler = AgentScheduler::new(2, 10_000, 60);
702
703        scheduler
704            .submit(ScheduledTask::new("Task 1".into(), Priority::Normal))
705            .unwrap();
706        scheduler
707            .submit(ScheduledTask::new("Task 2".into(), Priority::Normal))
708            .unwrap();
709        scheduler
710            .submit(ScheduledTask::new("Task 3".into(), Priority::Normal))
711            .unwrap();
712
713        assert!(scheduler.next_task().is_some());
714        assert!(scheduler.next_task().is_some());
715        // Third should be None due to max concurrent.
716        assert!(scheduler.next_task().is_none());
717    }
718
719    #[test]
720    fn test_max_concurrent_allows_when_slot_frees() {
721        let scheduler = AgentScheduler::new(2, 10_000, 60); // 2 max concurrent.
722
723        let _ = scheduler
724            .submit(ScheduledTask::new("Task 1".into(), Priority::Normal))
725            .unwrap();
726        let _id2 = scheduler
727            .submit(ScheduledTask::new("Task 2".into(), Priority::Normal))
728            .unwrap();
729        // Queue (insert at 0 prepends): [Task 2, Task 1]
730
731        // Start 2 tasks (fills max_concurrent).
732        let t1 = scheduler.next_task().unwrap(); // Task 2 (first popped).
733        let t2 = scheduler.next_task().unwrap(); // Task 1 (second popped).
734                                                 // Running: [Task 2, Task 1], Queue: []
735        assert!(scheduler.next_task().is_none()); // Blocked.
736
737        // Complete both tasks.
738        scheduler.complete_task(t1.id).unwrap();
739        scheduler.complete_task(t2.id).unwrap();
740
741        // Submit a new task now that slots have freed.
742        let _id3 = scheduler
743            .submit(ScheduledTask::new("Task 3".into(), Priority::Normal))
744            .unwrap();
745
746        // Now next_task should work.
747        let task = scheduler.next_task().unwrap();
748        assert_eq!(task.description, "Task 3");
749
750        // Clean up.
751        scheduler.complete_task(task.id).unwrap();
752    }
753
754    #[test]
755    fn test_complete_task_removes_from_running() {
756        let scheduler = AgentScheduler::new(2, 10_000, 60);
757        let task = ScheduledTask::new("Test".into(), Priority::Normal);
758        let id = scheduler.submit(task).unwrap();
759
760        let _ = scheduler.next_task();
761        scheduler.complete_task(id).unwrap();
762
763        let stats = scheduler.stats();
764        assert_eq!(stats.running, 0);
765    }
766
767    #[test]
768    fn test_complete_unknown_task_returns_error() {
769        let scheduler = AgentScheduler::new(2, 10_000, 60);
770        let result = scheduler.complete_task(Uuid::new_v4());
771        assert!(result.is_err());
772    }
773
774    #[test]
775    fn test_fail_task_sets_error() {
776        let scheduler = AgentScheduler::new(2, 10_000, 60);
777        let task = ScheduledTask::new("Test".into(), Priority::Normal);
778        let id = scheduler.submit(task).unwrap();
779
780        let _ = scheduler.next_task();
781        scheduler.fail_task(id, "Something went wrong").unwrap();
782
783        let running = scheduler.running.lock();
784        assert!(!running.contains_key(&id));
785    }
786
787    #[test]
788    fn test_cancel_queued_task() {
789        let scheduler = AgentScheduler::new(2, 10_000, 60);
790        let id = scheduler
791            .submit(ScheduledTask::new("To cancel".into(), Priority::Normal))
792            .unwrap();
793
794        scheduler.cancel_task(id).unwrap();
795
796        // Queue should be empty now.
797        assert!(scheduler.next_task().is_none());
798    }
799
800    #[test]
801    fn test_cancel_running_task_fails() {
802        let scheduler = AgentScheduler::new(2, 10_000, 60);
803        let id = scheduler
804            .submit(ScheduledTask::new("Running".into(), Priority::Normal))
805            .unwrap();
806
807        let _ = scheduler.next_task(); // Task is now running.
808
809        // Can't cancel a running task.
810        let result = scheduler.cancel_task(id);
811        assert!(result.is_err());
812    }
813
814    #[test]
815    fn test_cancel_unknown_task_fails() {
816        let scheduler = AgentScheduler::new(2, 10_000, 60);
817        let result = scheduler.cancel_task(Uuid::new_v4());
818        assert!(result.is_err());
819    }
820
821    #[test]
822    fn test_stats_tracking() {
823        let scheduler = AgentScheduler::new(2, 60, 60);
824
825        let id1 = scheduler
826            .submit(ScheduledTask::new("Queued".into(), Priority::Normal))
827            .unwrap();
828        scheduler
829            .submit(ScheduledTask::new("Queued 2".into(), Priority::Low))
830            .unwrap();
831
832        // Start one task.
833        let started = scheduler.next_task().unwrap();
834        assert_eq!(started.id, id1);
835
836        let stats = scheduler.stats();
837        assert_eq!(stats.queued, 1); // One still in queue.
838        assert_eq!(stats.running, 1);
839        assert_eq!(stats.max_concurrent, 2);
840        assert_eq!(stats.rate_limit_per_minute, 60);
841    }
842
843    #[test]
844    fn test_reap_zombies() {
845        // Create a scheduler with a very short zombie timeout.
846        let scheduler = AgentScheduler::new(2, 10_000, 1); // 1 second timeout.
847
848        // Submit and start a task.
849        let id = scheduler
850            .submit(ScheduledTask::new("Zombie".into(), Priority::Normal))
851            .unwrap();
852        let _ = scheduler.next_task();
853
854        // Wait longer than the zombie timeout.
855        thread::sleep(Duration::from_secs(2));
856
857        // Reap zombies.
858        let reaped = scheduler.reap_zombies();
859        assert!(reaped.contains(&id));
860
861        // Task should no longer be running.
862        assert!(scheduler.running.lock().get(&id).is_none());
863    }
864
865    #[test]
866    fn test_reap_zombies_no_zombies() {
867        let scheduler = AgentScheduler::new(2, 10_000, 60); // Long timeout.
868
869        let id = scheduler
870            .submit(ScheduledTask::new("Normal".into(), Priority::Normal))
871            .unwrap();
872        let _ = scheduler.next_task();
873
874        // No sleep, so no zombies yet.
875        let reaped = scheduler.reap_zombies();
876        assert!(reaped.is_empty());
877
878        // Task still running.
879        assert!(scheduler.running.lock().get(&id).is_some());
880    }
881
882    #[test]
883    fn test_rate_limiter_basic() {
884        let mut limiter = RateLimiter::new(60, 3); // 3 requests per minute.
885
886        assert!(limiter.allow());
887        assert!(limiter.allow());
888        assert!(limiter.allow());
889        // 4th request should be blocked.
890        assert!(!limiter.allow());
891    }
892
893    #[test]
894    fn test_rate_limiter_remaining() {
895        let limiter = RateLimiter::new(60, 3);
896
897        assert_eq!(limiter.remaining(), 3);
898
899        let mut limiter = RateLimiter::new(60, 3);
900        limiter.allow();
901        limiter.allow();
902        assert_eq!(limiter.remaining(), 1);
903    }
904
905    #[test]
906    fn test_rate_limiter_tracks_per_scheduler() {
907        let scheduler = AgentScheduler::new(10, 5, 60); // Only 5 requests allowed, high concurrency.
908
909        // Consume all rate limit by calling next_task (not submit).
910        for i in 0..5 {
911            scheduler
912                .submit(ScheduledTask::new(format!("T{}", i), Priority::Normal))
913                .unwrap();
914            let _ = scheduler.next_task();
915        }
916
917        // Should be rate limited.
918        assert!(scheduler.next_task().is_none());
919        assert_eq!(scheduler.rate_limit_remaining(), 0);
920    }
921
922    #[test]
923    fn test_queued_tasks_inspection() {
924        let scheduler = AgentScheduler::new(2, 10_000, 60);
925
926        scheduler
927            .submit(ScheduledTask::new("A".into(), Priority::Low))
928            .unwrap();
929        scheduler
930            .submit(ScheduledTask::new("B".into(), Priority::High))
931            .unwrap();
932        scheduler
933            .submit(ScheduledTask::new("C".into(), Priority::Normal))
934            .unwrap();
935
936        let queued = scheduler.queued_tasks();
937        assert_eq!(queued.len(), 3);
938        // Order is by priority (highest at back for pop).
939        // High should be at the back.
940        assert_eq!(queued.last().unwrap().description, "B");
941    }
942
943    #[test]
944    fn test_running_tasks_inspection() {
945        let scheduler = AgentScheduler::new(2, 10_000, 60);
946
947        scheduler
948            .submit(ScheduledTask::new("R1".into(), Priority::Normal))
949            .unwrap();
950        scheduler
951            .submit(ScheduledTask::new("R2".into(), Priority::Normal))
952            .unwrap();
953
954        let _ = scheduler.next_task();
955        let _ = scheduler.next_task();
956
957        let running = scheduler.running_tasks();
958        assert_eq!(running.len(), 2);
959    }
960
961    #[test]
962    fn test_default_scheduler() {
963        let scheduler = AgentScheduler::default();
964        let stats = scheduler.stats();
965        assert_eq!(stats.max_concurrent, 5);
966        assert_eq!(stats.rate_limit_per_minute, 60);
967    }
968
969    #[test]
970    fn test_budget_manager_integration_skips_exhausted_agent() {
971        use crate::budget::{BudgetLimit, BudgetManager};
972
973        let scheduler = Arc::new(Mutex::new(AgentScheduler::new(2, 10_000, 60)));
974        let budget_manager = Arc::new(BudgetManager::new());
975
976        // Set a very low budget (1 call).
977        let agent_id = AgentId::new_v4();
978        budget_manager.set_budget(BudgetLimit {
979            agent_id,
980            token_budget: 1000,
981            calls_budget: 1,
982            window_secs: 60,
983        });
984
985        // Attach budget manager to scheduler.
986        scheduler
987            .lock()
988            .set_budget_manager(Arc::clone(&budget_manager));
989
990        // Submit two tasks for the same agent.
991        scheduler
992            .lock()
993            .submit(ScheduledTask::for_agent(
994                agent_id,
995                "Task 1".into(),
996                Priority::Normal,
997            ))
998            .unwrap();
999        scheduler
1000            .lock()
1001            .submit(ScheduledTask::for_agent(
1002                agent_id,
1003                "Task 2".into(),
1004                Priority::Normal,
1005            ))
1006            .unwrap();
1007
1008        // First task should run (track_call succeeds).
1009        let task1 = scheduler.lock().next_task();
1010        assert!(task1.is_some());
1011        scheduler.lock().complete_task(task1.unwrap().id).unwrap();
1012
1013        // Second task should be skipped (budget exhausted).
1014        let task2 = scheduler.lock().next_task();
1015        assert!(task2.is_none());
1016    }
1017
1018    #[test]
1019    fn test_budget_manager_allows_different_agents() {
1020        use crate::budget::{BudgetLimit, BudgetManager};
1021
1022        let scheduler = Arc::new(Mutex::new(AgentScheduler::new(2, 10_000, 60)));
1023        let budget_manager = Arc::new(BudgetManager::new());
1024
1025        let agent1 = AgentId::new_v4();
1026        let agent2 = AgentId::new_v4();
1027
1028        // Set budget for both agents (3 calls each).
1029        for agent_id in [&agent1, &agent2] {
1030            budget_manager.set_budget(BudgetLimit {
1031                agent_id: *agent_id,
1032                token_budget: 1000,
1033                calls_budget: 3,
1034                window_secs: 60,
1035            });
1036        }
1037
1038        scheduler
1039            .lock()
1040            .set_budget_manager(Arc::clone(&budget_manager));
1041
1042        // Submit tasks for both agents.
1043        scheduler
1044            .lock()
1045            .submit(ScheduledTask::for_agent(
1046                agent1,
1047                "A1".into(),
1048                Priority::Normal,
1049            ))
1050            .unwrap();
1051        scheduler
1052            .lock()
1053            .submit(ScheduledTask::for_agent(
1054                agent2,
1055                "B1".into(),
1056                Priority::Normal,
1057            ))
1058            .unwrap();
1059
1060        // Both should run.
1061        let t1 = scheduler.lock().next_task().unwrap();
1062        let t2 = scheduler.lock().next_task().unwrap();
1063        assert_ne!(t1.description, t2.description);
1064    }
1065
1066    #[test]
1067    fn test_budget_manager_task_without_agent_id() {
1068        use crate::budget::{BudgetLimit, BudgetManager};
1069
1070        let scheduler = Arc::new(Mutex::new(AgentScheduler::new(2, 10_000, 60)));
1071        let budget_manager = Arc::new(BudgetManager::new());
1072
1073        scheduler
1074            .lock()
1075            .set_budget_manager(Arc::clone(&budget_manager));
1076
1077        // Submit a task without an agent ID.
1078        scheduler
1079            .lock()
1080            .submit(ScheduledTask::new("No agent".into(), Priority::Normal))
1081            .unwrap();
1082
1083        // Should still run (no budget check for tasks without agent).
1084        let task = scheduler.lock().next_task();
1085        assert!(task.is_some());
1086    }
1087
1088    #[test]
1089    fn test_budget_manager_not_set_skips_check() {
1090        let scheduler = Arc::new(Mutex::new(AgentScheduler::new(2, 10_000, 60)));
1091        // No budget manager attached.
1092
1093        scheduler
1094            .lock()
1095            .submit(ScheduledTask::new("Any task".into(), Priority::Normal))
1096            .unwrap();
1097
1098        // Should run normally without budget manager.
1099        let task = scheduler.lock().next_task();
1100        assert!(task.is_some());
1101    }
1102}