Skip to main content

oxios_kernel/
scheduler.rs

1//! Agent Scheduler — priority-based task queue inspired by AIOS / AgentRM.
2//!
3//! Manages agent task scheduling with:
4//! - Priority queue (FIFO within same priority)
5//! - Rate-limit-aware admission control
6//! - Zombie task detection and reaping
7//! - Maximum concurrent task enforcement
8
9use crate::budget::BudgetManager;
10use crate::types::AgentId;
11use anyhow::Result;
12use chrono::{DateTime, Utc};
13use parking_lot::Mutex;
14use serde::{Deserialize, Serialize};
15use std::collections::{BinaryHeap, HashMap};
16use std::sync::Arc;
17use uuid::Uuid;
18
19/// Priority levels for scheduled tasks.
20#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Default)]
21pub enum Priority {
22    /// Low priority, good for background work.
23    Low = 0,
24    /// Normal priority, default for most tasks.
25    #[default]
26    Normal = 1,
27    /// High priority, important tasks.
28    High = 2,
29    /// Critical priority, must execute immediately.
30    Critical = 3,
31}
32
33/// Status of a scheduled task.
34#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
35pub enum TaskStatus {
36    /// Task is queued, waiting for execution.
37    Queued,
38    /// Task is currently running.
39    Running,
40    /// Task completed successfully.
41    Completed,
42    /// Task failed with an error.
43    Failed,
44    /// Task was cancelled before execution.
45    Cancelled,
46}
47
48/// A scheduled task for an agent.
49#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
50pub struct ScheduledTask {
51    /// Unique task identifier.
52    pub id: Uuid,
53    /// Associated agent ID, if any.
54    pub agent_id: Option<AgentId>,
55    /// Human-readable task description.
56    pub description: String,
57    /// Task priority level.
58    pub priority: Priority,
59    /// When the task was created.
60    pub created_at: DateTime<Utc>,
61    /// Current status of the task.
62    pub status: TaskStatus,
63    /// Error message if the task failed.
64    #[serde(skip_serializing_if = "Option::is_none")]
65    pub error: Option<String>,
66}
67
68impl PartialOrd for ScheduledTask {
69    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
70        Some(self.cmp(other))
71    }
72}
73
74impl Ord for ScheduledTask {
75    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
76        // Higher priority first; within same priority, newer tasks first (LIFO)
77        // so that BinaryHeap::pop() returns the highest-priority, newest task.
78        self.priority
79            .cmp(&other.priority)
80            .then_with(|| other.created_at.cmp(&self.created_at))
81    }
82}
83
84impl ScheduledTask {
85    /// Creates a new scheduled task.
86    pub fn new(description: String, priority: Priority) -> Self {
87        Self {
88            id: Uuid::new_v4(),
89            agent_id: None,
90            description,
91            priority,
92            created_at: Utc::now(),
93            status: TaskStatus::Queued,
94            error: None,
95        }
96    }
97
98    /// Creates a task associated with a specific agent.
99    pub fn for_agent(agent_id: AgentId, description: String, priority: Priority) -> Self {
100        Self {
101            id: Uuid::new_v4(),
102            agent_id: Some(agent_id),
103            description,
104            priority,
105            created_at: Utc::now(),
106            status: TaskStatus::Queued,
107            error: None,
108        }
109    }
110}
111
112/// Statistics about the scheduler state.
113#[derive(Debug, Clone, Serialize, Deserialize)]
114pub struct SchedulerStats {
115    /// Number of queued tasks.
116    pub queued: usize,
117    /// Number of currently running tasks.
118    pub running: usize,
119    /// Number of completed tasks.
120    pub completed: usize,
121    /// Number of failed tasks.
122    pub failed: usize,
123    /// Maximum allowed concurrent tasks.
124    pub max_concurrent: usize,
125    /// Rate limit (requests per minute).
126    pub rate_limit_per_minute: u32,
127    /// Remaining rate limit capacity.
128    pub rate_remaining: u32,
129}
130
131impl Default for SchedulerStats {
132    fn default() -> Self {
133        Self {
134            queued: 0,
135            running: 0,
136            completed: 0,
137            failed: 0,
138            max_concurrent: 5,
139            rate_limit_per_minute: 60,
140            rate_remaining: 60,
141        }
142    }
143}
144
/// Sliding-window rate limiter for tracking API call rates.
///
/// Request times are taken from the monotonic [`std::time::Instant`] clock. Wall-clock
/// adjustments (NTP steps, manual changes) therefore can neither stall the window nor
/// reset it early, and window lengths of any size are handled without overflow.
#[derive(Debug, Clone)]
struct RateLimiter {
    /// Times of requests admitted in the current window, oldest first.
    window: std::collections::VecDeque<std::time::Instant>,
    /// Window duration in seconds.
    window_secs: u64,
    /// Maximum requests per window.
    max_requests: u32,
}

impl RateLimiter {
    fn new(window_secs: u64, max_requests: u32) -> Self {
        Self {
            window: std::collections::VecDeque::new(),
            window_secs,
            max_requests,
        }
    }

    /// Start of the window ending at `now`. Requests at or before this instant have expired.
    ///
    /// Returns `None` when the window reaches back further than the monotonic clock can
    /// represent, in which case nothing has expired yet.
    fn cutoff(&self, now: std::time::Instant) -> Option<std::time::Instant> {
        now.checked_sub(std::time::Duration::from_secs(self.window_secs))
    }

    /// Checks whether a new request is allowed under the rate limit and, if so, records it.
    fn allow(&mut self) -> bool {
        let now = std::time::Instant::now();

        if let Some(cutoff) = self.cutoff(now) {
            // `Instant::now()` never goes backwards, so entries are pushed in
            // non-decreasing order and every expired entry sits at the front.
            while self.window.front().is_some_and(|&t| t <= cutoff) {
                self.window.pop_front();
            }
        }

        if self.window.len() >= self.max_requests as usize {
            return false;
        }

        self.window.push_back(now);
        true
    }

    /// Returns the remaining capacity in the current window without recording anything.
    fn remaining(&self) -> u32 {
        let now = std::time::Instant::now();
        let active = match self.cutoff(now) {
            Some(cutoff) => self.window.iter().filter(|&&t| t > cutoff).count(),
            None => self.window.len(),
        };
        self.max_requests
            .saturating_sub(u32::try_from(active).unwrap_or(u32::MAX))
    }
}
189
190/// The agent scheduler.
191///
192/// Manages task queues, rate limiting, zombie detection, and priority scheduling.
193/// This is the central coordinator for all agent task execution.
194pub struct AgentScheduler {
195    /// The task queue (priority max-heap).
196    queue: Arc<Mutex<BinaryHeap<ScheduledTask>>>,
197    /// Currently running tasks.
198    running: Arc<Mutex<HashMap<Uuid, ScheduledTask>>>,
199    /// Maximum concurrent tasks allowed.
200    max_concurrent: std::sync::atomic::AtomicUsize,
201    /// Rate limiter for LLM API calls.
202    rate_limiter: Arc<Mutex<RateLimiter>>,
203    /// Timeout for zombie detection (seconds).
204    zombie_timeout_secs: std::sync::atomic::AtomicU64,
205    /// Track when each running task started (for zombie detection).
206    task_start_times: Arc<Mutex<HashMap<Uuid, DateTime<Utc>>>>,
207    /// Optional budget manager for scheduling checks.
208    budget_manager: Option<Arc<BudgetManager>>,
209}
210
211impl AgentScheduler {
212    /// Creates a new scheduler.
213    ///
214    /// # Arguments
215    /// * `max_concurrent` - Maximum number of tasks that can run simultaneously
216    /// * `rate_limit_per_minute` - Maximum LLM API calls per minute
217    /// * `zombie_timeout_secs` - How long before a running task is considered a zombie
218    pub fn new(
219        max_concurrent: usize,
220        rate_limit_per_minute: u32,
221        zombie_timeout_secs: u64,
222    ) -> Self {
223        Self {
224            queue: Arc::new(Mutex::new(BinaryHeap::new())),
225            running: Arc::new(Mutex::new(HashMap::new())),
226            max_concurrent: std::sync::atomic::AtomicUsize::new(max_concurrent),
227            rate_limiter: Arc::new(Mutex::new(RateLimiter::new(60, rate_limit_per_minute))),
228            zombie_timeout_secs: std::sync::atomic::AtomicU64::new(zombie_timeout_secs),
229            task_start_times: Arc::new(Mutex::new(HashMap::new())),
230            budget_manager: None,
231        }
232    }
233
234    /// Attaches a budget manager for scheduling checks.
235    ///
236    /// When a budget manager is set, the scheduler will:
237    /// - Check `can_schedule()` before starting a task (soft gate)
238    /// - Track calls via `track_call()` when a task begins
239    ///
240    /// If no budget manager is set, tasks proceed normally.
241    pub fn set_budget_manager(&mut self, bm: Arc<BudgetManager>) {
242        self.budget_manager = Some(bm);
243    }
244
245    /// Hot-reload scheduler config without restart.
246    ///
247    /// Updates concurrency limit, rate limit, and zombie timeout.
248    /// Takes effect on the next `next_task()` / `reap_zombies()` call.
249    pub fn update_config(
250        &self,
251        max_concurrent: usize,
252        rate_limit_per_minute: u32,
253        zombie_timeout_secs: u64,
254    ) {
255        {
256            let mut limiter = self.rate_limiter.lock();
257            *limiter = RateLimiter::new(60, rate_limit_per_minute);
258        }
259        self.max_concurrent
260            .store(max_concurrent, std::sync::atomic::Ordering::Relaxed);
261        self.zombie_timeout_secs
262            .store(zombie_timeout_secs, std::sync::atomic::Ordering::Relaxed);
263        tracing::info!(
264            max_concurrent,
265            rate_limit_per_minute,
266            zombie_timeout_secs,
267            "Scheduler config hot-reloaded"
268        );
269    }
270
271    /// Submits a task to the scheduler queue.
272    ///
273    /// Returns the task ID on success.
274    pub fn submit(&self, mut task: ScheduledTask) -> Result<Uuid> {
275        task.status = TaskStatus::Queued;
276        let id = task.id;
277
278        let mut queue = self.queue.lock();
279        queue.push(task); // O(log N) — BinaryHeap maintains heap property
280
281        tracing::debug!(
282            task_id = %id,
283            queue_len = queue.len(),
284            "Task submitted to scheduler"
285        );
286
287        Ok(id)
288    }
289
290    /// Gets the next task ready for execution.
291    ///
292    /// Returns `None` if:
293    /// - The queue is empty
294    /// - Max concurrent limit is reached
295    /// - Rate limit is exceeded
296    pub fn next_task(&self) -> Option<ScheduledTask> {
297        // Check if we can start a new task.
298        {
299            let running = self.running.lock();
300            if running.len()
301                >= self
302                    .max_concurrent
303                    .load(std::sync::atomic::Ordering::Relaxed)
304            {
305                tracing::debug!(
306                    running = running.len(),
307                    max = self
308                        .max_concurrent
309                        .load(std::sync::atomic::Ordering::Relaxed),
310                    "Max concurrent limit reached"
311                );
312                return None;
313            }
314        }
315
316        // Check rate limit.
317        {
318            let mut limiter = self.rate_limiter.lock();
319            if !limiter.allow() {
320                tracing::debug!(remaining = limiter.remaining(), "Rate limit exceeded");
321                return None;
322            }
323        }
324
325        // Pop tasks iteratively, skipping agents with exhausted budgets.
326        let mut discarded: usize = 0;
327        let mut task = loop {
328            let task_opt = {
329                let mut queue = self.queue.lock();
330                queue.pop() // O(log N) — BinaryHeap returns max-priority task
331            };
332
333            match task_opt {
334                Some(t) => {
335                    // Check budget before scheduling (soft gate).
336                    if let (Some(bm), Some(agent_id)) = (&self.budget_manager, &t.agent_id)
337                        && !bm.can_schedule(agent_id)
338                    {
339                        tracing::warn!(
340                            agent_id = %agent_id,
341                            "Agent budget exhausted, skipping task"
342                        );
343                        discarded += 1;
344                        continue; // skip, try next task
345                    }
346                    break t;
347                }
348                None => {
349                    if discarded > 0 {
350                        tracing::info!(discarded, "All queued tasks had exhausted budgets");
351                    }
352                    return None;
353                }
354            }
355        };
356
357        if discarded > 0 {
358            tracing::info!(discarded, "Skipped tasks with exhausted budgets");
359        }
360
361        task.status = TaskStatus::Running;
362
363        // Track start time for zombie detection.
364        {
365            let mut start_times = self.task_start_times.lock();
366            start_times.insert(task.id, Utc::now());
367        }
368
369        // Add to running map.
370        {
371            let mut running = self.running.lock();
372            running.insert(task.id, task.clone());
373        }
374
375        tracing::info!(
376            task_id = %task.id,
377            priority = ?task.priority,
378            running = self.running.lock().len(),
379            "Task started by scheduler"
380        );
381
382        // Track call for budget management.
383        if let (Some(bm), Some(agent_id)) = (&self.budget_manager, &task.agent_id)
384            && let Err(e) = bm.track_call(agent_id)
385        {
386            tracing::warn!(
387                agent_id = %agent_id,
388                error = %e,
389                "Budget exceeded during task track_call"
390            );
391        }
392
393        Some(task)
394    }
395
396    /// Marks a task as completed.
397    ///
398    /// Removes the task from the running map.
399    pub fn complete_task(&self, task_id: Uuid) -> Result<()> {
400        let task = {
401            let mut running = self.running.lock();
402            running.remove(&task_id)
403        };
404
405        match task {
406            Some(mut t) => {
407                t.status = TaskStatus::Completed;
408
409                // Clean up start time tracking.
410                {
411                    let mut start_times = self.task_start_times.lock();
412                    start_times.remove(&task_id);
413                }
414
415                tracing::info!(task_id = %task_id, "Task completed");
416                Ok(())
417            }
418            None => {
419                tracing::warn!(task_id = %task_id, "Attempted to complete unknown task");
420                Err(anyhow::anyhow!("task not found"))
421            }
422        }
423    }
424
425    /// Marks a task as failed with an error message.
426    ///
427    /// Removes the task from the running map.
428    pub fn fail_task(&self, task_id: Uuid, error: &str) -> Result<()> {
429        let task = {
430            let mut running = self.running.lock();
431            running.remove(&task_id)
432        };
433
434        match task {
435            Some(mut t) => {
436                t.status = TaskStatus::Failed;
437                t.error = Some(error.to_string());
438
439                // Clean up start time tracking.
440                {
441                    let mut start_times = self.task_start_times.lock();
442                    start_times.remove(&task_id);
443                }
444
445                tracing::warn!(task_id = %task_id, error = %error, "Task failed");
446                Ok(())
447            }
448            None => {
449                tracing::warn!(task_id = %task_id, "Attempted to fail unknown task");
450                Err(anyhow::anyhow!("task not found"))
451            }
452        }
453    }
454
455    /// Detects and reaps zombie tasks (running longer than the configured timeout).
456    ///
457    /// Returns the IDs of tasks that were reaped.
458    pub fn reap_zombies(&self) -> Vec<Uuid> {
459        let now = Utc::now();
460        let timeout = chrono::Duration::seconds(
461            self.zombie_timeout_secs
462                .load(std::sync::atomic::Ordering::Relaxed) as i64,
463        );
464        let mut start_times = self.task_start_times.lock();
465        let mut running = self.running.lock();
466        let mut reaped = Vec::new();
467
468        let zombie_ids: Vec<Uuid> = start_times
469            .iter()
470            .filter(|(_, start)| now - **start > timeout)
471            .map(|(id, _)| *id)
472            .collect();
473
474        for id in zombie_ids {
475            if let Some(mut task) = running.remove(&id) {
476                task.status = TaskStatus::Failed;
477                task.error = Some(format!(
478                    "zombie: ran for >{} seconds",
479                    self.zombie_timeout_secs
480                        .load(std::sync::atomic::Ordering::Relaxed)
481                ));
482                reaped.push(id);
483                tracing::warn!(
484                    task_id = %id,
485                    duration_secs = self.zombie_timeout_secs.load(std::sync::atomic::Ordering::Relaxed),
486                    "Zombie task reaped"
487                );
488            }
489            // Remove from start times.
490            start_times.remove(&id);
491        }
492
493        reaped
494    }
495
496    /// Marks a task as running by task ID.
497    ///
498    /// The task must be in the queue (status Queued). This is used by the
499    /// orchestrator to atomically claim a submitted task before execution.
500    pub fn start_task(&self, task_id: Uuid) -> Result<()> {
501        let task = {
502            let mut queue = self.queue.lock();
503            let all: Vec<ScheduledTask> = queue.drain().collect();
504            let mut found: Option<ScheduledTask> = None;
505            let remaining: Vec<ScheduledTask> = all
506                .into_iter()
507                .filter(|t| {
508                    if t.id == task_id {
509                        found = Some(t.clone());
510                        false
511                    } else {
512                        true
513                    }
514                })
515                .collect();
516            *queue = remaining.into_iter().collect();
517            found
518        };
519
520        match task {
521            Some(mut task) => {
522                task.status = TaskStatus::Running;
523                let mut start_times = self.task_start_times.lock();
524                start_times.insert(task.id, Utc::now());
525                let mut running = self.running.lock();
526                running.insert(task.id, task);
527                Ok(())
528            }
529            None => Err(anyhow::anyhow!("task {task_id} not found in queue")),
530        }
531    }
532
533    /// Cancels a queued task by ID.
534    ///
535    /// Only works on tasks still in the queue (not yet running).
536    pub fn cancel_task(&self, task_id: Uuid) -> Result<()> {
537        let mut queue = self.queue.lock();
538        let all: Vec<ScheduledTask> = queue.drain().collect();
539        let mut found = false;
540        let remaining: Vec<ScheduledTask> = all
541            .into_iter()
542            .filter(|t| {
543                if t.id == task_id && t.status == TaskStatus::Queued {
544                    found = true;
545                    false
546                } else {
547                    true
548                }
549            })
550            .collect();
551        *queue = remaining.into_iter().collect();
552
553        if found {
554            tracing::info!(task_id = %task_id, "Task cancelled from queue");
555            Ok(())
556        } else {
557            tracing::warn!(task_id = %task_id, "Task not found in queue for cancellation");
558            Err(anyhow::anyhow!("task not found in queue"))
559        }
560    }
561
562    /// Returns the current scheduler statistics.
563    pub fn stats(&self) -> SchedulerStats {
564        let queue = self.queue.lock();
565        let running = self.running.lock();
566        let rate_limiter = self.rate_limiter.lock();
567
568        let _completed = 0usize;
569        let _failed = 0usize;
570
571        SchedulerStats {
572            queued: queue.len(),
573            running: running.len(),
574            completed: _completed,
575            failed: _failed,
576            max_concurrent: self
577                .max_concurrent
578                .load(std::sync::atomic::Ordering::Relaxed),
579            rate_limit_per_minute: rate_limiter.max_requests,
580            rate_remaining: rate_limiter.remaining(),
581        }
582    }
583
584    /// Returns remaining rate limit capacity.
585    pub fn rate_limit_remaining(&self) -> u32 {
586        self.rate_limiter.lock().remaining()
587    }
588
589    /// Returns all queued tasks (for debugging/monitoring).
590    pub fn queued_tasks(&self) -> Vec<ScheduledTask> {
591        let heap = self.queue.lock();
592        let mut tasks: Vec<ScheduledTask> = heap.iter().cloned().collect();
593        // Sort ascending by priority so highest priority is at the end
594        // (matches the original Vec-based behavior for test compatibility).
595        tasks.sort_by_key(|a| a.priority);
596        tasks
597    }
598
599    /// Returns all running tasks (for debugging/monitoring).
600    pub fn running_tasks(&self) -> Vec<ScheduledTask> {
601        self.running.lock().values().cloned().collect()
602    }
603}
604
605impl Default for AgentScheduler {
606    fn default() -> Self {
607        Self::new(5, 60, 300)
608    }
609}
610
611#[cfg(test)]
612mod tests {
613    use super::*;
614    use std::thread;
615    use std::time::Duration;
616
617    #[test]
618    fn test_task_creation() {
619        let task = ScheduledTask::new("Test task".into(), Priority::Normal);
620        assert_eq!(task.status, TaskStatus::Queued);
621        assert!(task.agent_id.is_none());
622        assert!(!task.error.is_some());
623    }
624
625    #[test]
626    fn test_task_creation_for_agent() {
627        let agent_id = AgentId::new_v4();
628        let task = ScheduledTask::for_agent(agent_id, "Agent task".into(), Priority::High);
629        assert_eq!(task.agent_id, Some(agent_id));
630        assert_eq!(task.priority, Priority::High);
631    }
632
633    #[test]
634    fn test_priority_ordering() {
635        assert!(Priority::Critical > Priority::High);
636        assert!(Priority::High > Priority::Normal);
637        assert!(Priority::Normal > Priority::Low);
638        // Check transitivity
639        assert!(Priority::Critical > Priority::Normal);
640        assert!(Priority::Critical > Priority::Low);
641        assert!(Priority::High > Priority::Low);
642    }
643
644    #[test]
645    fn test_priority_ordering_eq() {
646        assert_eq!(Priority::Low, Priority::Low);
647        assert_eq!(Priority::Normal, Priority::Normal);
648        assert_eq!(Priority::High, Priority::High);
649        assert_eq!(Priority::Critical, Priority::Critical);
650    }
651
652    #[test]
653    fn test_submit_and_next_high_priority_first() {
654        let scheduler = AgentScheduler::new(10, 10_000, 60);
655
656        scheduler
657            .submit(ScheduledTask::new("Low priority".into(), Priority::Low))
658            .unwrap();
659        scheduler
660            .submit(ScheduledTask::new("High priority".into(), Priority::High))
661            .unwrap();
662        scheduler
663            .submit(ScheduledTask::new(
664                "Normal priority".into(),
665                Priority::Normal,
666            ))
667            .unwrap();
668
669        // High priority should come first.
670        let next = scheduler.next_task().unwrap();
671        assert_eq!(next.priority, Priority::High);
672
673        // Normal next.
674        let next = scheduler.next_task().unwrap();
675        assert_eq!(next.priority, Priority::Normal);
676
677        // Low last.
678        let next = scheduler.next_task().unwrap();
679        assert_eq!(next.priority, Priority::Low);
680    }
681
682    #[test]
683    fn test_submit_and_next_critical_first() {
684        let scheduler = AgentScheduler::new(10, 10_000, 60);
685
686        scheduler
687            .submit(ScheduledTask::new("Low".into(), Priority::Low))
688            .unwrap();
689        scheduler
690            .submit(ScheduledTask::new("Normal".into(), Priority::Normal))
691            .unwrap();
692        scheduler
693            .submit(ScheduledTask::new("High".into(), Priority::High))
694            .unwrap();
695        scheduler
696            .submit(ScheduledTask::new("Critical".into(), Priority::Critical))
697            .unwrap();
698
699        // Critical should be first.
700        let next = scheduler.next_task().unwrap();
701        assert_eq!(next.priority, Priority::Critical);
702        // Then High.
703        let next = scheduler.next_task().unwrap();
704        assert_eq!(next.priority, Priority::High);
705        // Then Normal.
706        let next = scheduler.next_task().unwrap();
707        assert_eq!(next.priority, Priority::Normal);
708        // Then Low.
709        let next = scheduler.next_task().unwrap();
710        assert_eq!(next.priority, Priority::Low);
711    }
712
713    #[test]
714    fn test_submit_multiple_same_priority() {
715        let scheduler = AgentScheduler::new(10, 10_000, 60);
716
717        // Multiple tasks at same priority — BinaryHeap does not guarantee
718        // FIFO/LIFO within the same priority level.
719        scheduler
720            .submit(ScheduledTask::new("First".into(), Priority::Normal))
721            .unwrap();
722        scheduler
723            .submit(ScheduledTask::new("Second".into(), Priority::Normal))
724            .unwrap();
725        scheduler
726            .submit(ScheduledTask::new("Third".into(), Priority::Normal))
727            .unwrap();
728
729        // All three should be popped with Normal priority; exact order is unspecified.
730        let mut descriptions = Vec::new();
731        for _ in 0..3 {
732            let next = scheduler.next_task().unwrap();
733            assert_eq!(next.priority, Priority::Normal);
734            descriptions.push(next.description);
735        }
736        descriptions.sort();
737        assert_eq!(descriptions, vec!["First", "Second", "Third"]);
738    }
739
740    #[test]
741    fn test_max_concurrent_blocks() {
742        let scheduler = AgentScheduler::new(2, 10_000, 60);
743
744        scheduler
745            .submit(ScheduledTask::new("Task 1".into(), Priority::Normal))
746            .unwrap();
747        scheduler
748            .submit(ScheduledTask::new("Task 2".into(), Priority::Normal))
749            .unwrap();
750        scheduler
751            .submit(ScheduledTask::new("Task 3".into(), Priority::Normal))
752            .unwrap();
753
754        assert!(scheduler.next_task().is_some());
755        assert!(scheduler.next_task().is_some());
756        // Third should be None due to max concurrent.
757        assert!(scheduler.next_task().is_none());
758    }
759
760    #[test]
761    fn test_max_concurrent_allows_when_slot_frees() {
762        let scheduler = AgentScheduler::new(2, 10_000, 60); // 2 max concurrent.
763
764        let _ = scheduler
765            .submit(ScheduledTask::new("Task 1".into(), Priority::Normal))
766            .unwrap();
767        let _id2 = scheduler
768            .submit(ScheduledTask::new("Task 2".into(), Priority::Normal))
769            .unwrap();
770        // Queue (insert at 0 prepends): [Task 2, Task 1]
771
772        // Start 2 tasks (fills max_concurrent).
773        let t1 = scheduler.next_task().unwrap(); // Task 2 (first popped).
774        let t2 = scheduler.next_task().unwrap(); // Task 1 (second popped).
775        // Running: [Task 2, Task 1], Queue: []
776        assert!(scheduler.next_task().is_none()); // Blocked.
777
778        // Complete both tasks.
779        scheduler.complete_task(t1.id).unwrap();
780        scheduler.complete_task(t2.id).unwrap();
781
782        // Submit a new task now that slots have freed.
783        let _id3 = scheduler
784            .submit(ScheduledTask::new("Task 3".into(), Priority::Normal))
785            .unwrap();
786
787        // Now next_task should work.
788        let task = scheduler.next_task().unwrap();
789        assert_eq!(task.description, "Task 3");
790
791        // Clean up.
792        scheduler.complete_task(task.id).unwrap();
793    }
794
795    #[test]
796    fn test_complete_task_removes_from_running() {
797        let scheduler = AgentScheduler::new(2, 10_000, 60);
798        let task = ScheduledTask::new("Test".into(), Priority::Normal);
799        let id = scheduler.submit(task).unwrap();
800
801        let _ = scheduler.next_task();
802        scheduler.complete_task(id).unwrap();
803
804        let stats = scheduler.stats();
805        assert_eq!(stats.running, 0);
806    }
807
808    #[test]
809    fn test_complete_unknown_task_returns_error() {
810        let scheduler = AgentScheduler::new(2, 10_000, 60);
811        let result = scheduler.complete_task(Uuid::new_v4());
812        assert!(result.is_err());
813    }
814
815    #[test]
816    fn test_fail_task_sets_error() {
817        let scheduler = AgentScheduler::new(2, 10_000, 60);
818        let task = ScheduledTask::new("Test".into(), Priority::Normal);
819        let id = scheduler.submit(task).unwrap();
820
821        let _ = scheduler.next_task();
822        scheduler.fail_task(id, "Something went wrong").unwrap();
823
824        let running = scheduler.running.lock();
825        assert!(!running.contains_key(&id));
826    }
827
828    #[test]
829    fn test_cancel_queued_task() {
830        let scheduler = AgentScheduler::new(2, 10_000, 60);
831        let id = scheduler
832            .submit(ScheduledTask::new("To cancel".into(), Priority::Normal))
833            .unwrap();
834
835        scheduler.cancel_task(id).unwrap();
836
837        // Queue should be empty now.
838        assert!(scheduler.next_task().is_none());
839    }
840
841    #[test]
842    fn test_cancel_running_task_fails() {
843        let scheduler = AgentScheduler::new(2, 10_000, 60);
844        let id = scheduler
845            .submit(ScheduledTask::new("Running".into(), Priority::Normal))
846            .unwrap();
847
848        let _ = scheduler.next_task(); // Task is now running.
849
850        // Can't cancel a running task.
851        let result = scheduler.cancel_task(id);
852        assert!(result.is_err());
853    }
854
855    #[test]
856    fn test_cancel_unknown_task_fails() {
857        let scheduler = AgentScheduler::new(2, 10_000, 60);
858        let result = scheduler.cancel_task(Uuid::new_v4());
859        assert!(result.is_err());
860    }
861
862    #[test]
863    fn test_stats_tracking() {
864        let scheduler = AgentScheduler::new(2, 60, 60);
865
866        let id1 = scheduler
867            .submit(ScheduledTask::new("Queued".into(), Priority::Normal))
868            .unwrap();
869        scheduler
870            .submit(ScheduledTask::new("Queued 2".into(), Priority::Low))
871            .unwrap();
872
873        // Start one task.
874        let started = scheduler.next_task().unwrap();
875        assert_eq!(started.id, id1);
876
877        let stats = scheduler.stats();
878        assert_eq!(stats.queued, 1); // One still in queue.
879        assert_eq!(stats.running, 1);
880        assert_eq!(stats.max_concurrent, 2);
881        assert_eq!(stats.rate_limit_per_minute, 60);
882    }
883
884    #[test]
885    fn test_reap_zombies() {
886        // Create a scheduler with a very short zombie timeout.
887        let scheduler = AgentScheduler::new(2, 10_000, 1); // 1 second timeout.
888
889        // Submit and start a task.
890        let id = scheduler
891            .submit(ScheduledTask::new("Zombie".into(), Priority::Normal))
892            .unwrap();
893        let _ = scheduler.next_task();
894
895        // Wait longer than the zombie timeout.
896        thread::sleep(Duration::from_millis(1_100));
897
898        // Reap zombies.
899        let reaped = scheduler.reap_zombies();
900        assert!(reaped.contains(&id));
901
902        // Task should no longer be running.
903        assert!(scheduler.running.lock().get(&id).is_none());
904    }
905
906    #[test]
907    fn test_reap_zombies_no_zombies() {
908        let scheduler = AgentScheduler::new(2, 10_000, 60); // Long timeout.
909
910        let id = scheduler
911            .submit(ScheduledTask::new("Normal".into(), Priority::Normal))
912            .unwrap();
913        let _ = scheduler.next_task();
914
915        // No sleep, so no zombies yet.
916        let reaped = scheduler.reap_zombies();
917        assert!(reaped.is_empty());
918
919        // Task still running.
920        assert!(scheduler.running.lock().get(&id).is_some());
921    }
922
923    #[test]
924    fn test_rate_limiter_basic() {
925        let mut limiter = RateLimiter::new(60, 3); // 3 requests per minute.
926
927        assert!(limiter.allow());
928        assert!(limiter.allow());
929        assert!(limiter.allow());
930        // 4th request should be blocked.
931        assert!(!limiter.allow());
932    }
933
934    #[test]
935    fn test_rate_limiter_remaining() {
936        let limiter = RateLimiter::new(60, 3);
937
938        assert_eq!(limiter.remaining(), 3);
939
940        let mut limiter = RateLimiter::new(60, 3);
941        limiter.allow();
942        limiter.allow();
943        assert_eq!(limiter.remaining(), 1);
944    }
945
946    #[test]
947    fn test_rate_limiter_tracks_per_scheduler() {
948        let scheduler = AgentScheduler::new(10, 5, 60); // Only 5 requests allowed, high concurrency.
949
950        // Consume all rate limit by calling next_task (not submit).
951        for i in 0..5 {
952            scheduler
953                .submit(ScheduledTask::new(format!("T{}", i), Priority::Normal))
954                .unwrap();
955            let _ = scheduler.next_task();
956        }
957
958        // Should be rate limited.
959        assert!(scheduler.next_task().is_none());
960        assert_eq!(scheduler.rate_limit_remaining(), 0);
961    }
962
963    #[test]
964    fn test_queued_tasks_inspection() {
965        let scheduler = AgentScheduler::new(2, 10_000, 60);
966
967        scheduler
968            .submit(ScheduledTask::new("A".into(), Priority::Low))
969            .unwrap();
970        scheduler
971            .submit(ScheduledTask::new("B".into(), Priority::High))
972            .unwrap();
973        scheduler
974            .submit(ScheduledTask::new("C".into(), Priority::Normal))
975            .unwrap();
976
977        let queued = scheduler.queued_tasks();
978        assert_eq!(queued.len(), 3);
979        // Order is by priority (highest at back for pop).
980        // High should be at the back.
981        assert_eq!(queued.last().unwrap().description, "B");
982    }
983
984    #[test]
985    fn test_running_tasks_inspection() {
986        let scheduler = AgentScheduler::new(2, 10_000, 60);
987
988        scheduler
989            .submit(ScheduledTask::new("R1".into(), Priority::Normal))
990            .unwrap();
991        scheduler
992            .submit(ScheduledTask::new("R2".into(), Priority::Normal))
993            .unwrap();
994
995        let _ = scheduler.next_task();
996        let _ = scheduler.next_task();
997
998        let running = scheduler.running_tasks();
999        assert_eq!(running.len(), 2);
1000    }
1001
1002    #[test]
1003    fn test_default_scheduler() {
1004        let scheduler = AgentScheduler::default();
1005        let stats = scheduler.stats();
1006        assert_eq!(stats.max_concurrent, 5);
1007        assert_eq!(stats.rate_limit_per_minute, 60);
1008    }
1009
1010    #[test]
1011    fn test_budget_manager_integration_skips_exhausted_agent() {
1012        use crate::budget::{BudgetLimit, BudgetManager};
1013
1014        let scheduler = Arc::new(Mutex::new(AgentScheduler::new(2, 10_000, 60)));
1015        let budget_manager = Arc::new(BudgetManager::new());
1016
1017        // Set a very low budget (1 call).
1018        let agent_id = AgentId::new_v4();
1019        budget_manager.set_budget(BudgetLimit {
1020            agent_id,
1021            token_budget: 1000,
1022            calls_budget: 1,
1023            window_secs: 60,
1024        });
1025
1026        // Attach budget manager to scheduler.
1027        scheduler
1028            .lock()
1029            .set_budget_manager(Arc::clone(&budget_manager));
1030
1031        // Submit two tasks for the same agent.
1032        scheduler
1033            .lock()
1034            .submit(ScheduledTask::for_agent(
1035                agent_id,
1036                "Task 1".into(),
1037                Priority::Normal,
1038            ))
1039            .unwrap();
1040        scheduler
1041            .lock()
1042            .submit(ScheduledTask::for_agent(
1043                agent_id,
1044                "Task 2".into(),
1045                Priority::Normal,
1046            ))
1047            .unwrap();
1048
1049        // First task should run (track_call succeeds).
1050        let task1 = scheduler.lock().next_task();
1051        assert!(task1.is_some());
1052        scheduler.lock().complete_task(task1.unwrap().id).unwrap();
1053
1054        // Second task should be skipped (budget exhausted).
1055        let task2 = scheduler.lock().next_task();
1056        assert!(task2.is_none());
1057    }
1058
1059    #[test]
1060    fn test_budget_manager_allows_different_agents() {
1061        use crate::budget::{BudgetLimit, BudgetManager};
1062
1063        let scheduler = Arc::new(Mutex::new(AgentScheduler::new(2, 10_000, 60)));
1064        let budget_manager = Arc::new(BudgetManager::new());
1065
1066        let agent1 = AgentId::new_v4();
1067        let agent2 = AgentId::new_v4();
1068
1069        // Set budget for both agents (3 calls each).
1070        for agent_id in [&agent1, &agent2] {
1071            budget_manager.set_budget(BudgetLimit {
1072                agent_id: *agent_id,
1073                token_budget: 1000,
1074                calls_budget: 3,
1075                window_secs: 60,
1076            });
1077        }
1078
1079        scheduler
1080            .lock()
1081            .set_budget_manager(Arc::clone(&budget_manager));
1082
1083        // Submit tasks for both agents.
1084        scheduler
1085            .lock()
1086            .submit(ScheduledTask::for_agent(
1087                agent1,
1088                "A1".into(),
1089                Priority::Normal,
1090            ))
1091            .unwrap();
1092        scheduler
1093            .lock()
1094            .submit(ScheduledTask::for_agent(
1095                agent2,
1096                "B1".into(),
1097                Priority::Normal,
1098            ))
1099            .unwrap();
1100
1101        // Both should run.
1102        let t1 = scheduler.lock().next_task().unwrap();
1103        let t2 = scheduler.lock().next_task().unwrap();
1104        assert_ne!(t1.description, t2.description);
1105    }
1106
1107    #[test]
1108    fn test_budget_manager_task_without_agent_id() {
1109        use crate::budget::BudgetManager;
1110
1111        let scheduler = Arc::new(Mutex::new(AgentScheduler::new(2, 10_000, 60)));
1112        let budget_manager = Arc::new(BudgetManager::new());
1113
1114        scheduler
1115            .lock()
1116            .set_budget_manager(Arc::clone(&budget_manager));
1117
1118        // Submit a task without an agent ID.
1119        scheduler
1120            .lock()
1121            .submit(ScheduledTask::new("No agent".into(), Priority::Normal))
1122            .unwrap();
1123
1124        // Should still run (no budget check for tasks without agent).
1125        let task = scheduler.lock().next_task();
1126        assert!(task.is_some());
1127    }
1128
1129    #[test]
1130    fn test_budget_manager_not_set_skips_check() {
1131        let scheduler = Arc::new(Mutex::new(AgentScheduler::new(2, 10_000, 60)));
1132        // No budget manager attached.
1133
1134        scheduler
1135            .lock()
1136            .submit(ScheduledTask::new("Any task".into(), Priority::Normal))
1137            .unwrap();
1138
1139        // Should run normally without budget manager.
1140        let task = scheduler.lock().next_task();
1141        assert!(task.is_some());
1142    }
1143}