1use crate::budget::BudgetManager;
10use crate::types::AgentId;
11use anyhow::Result;
12use chrono::{DateTime, Utc};
13use parking_lot::Mutex;
14use serde::{Deserialize, Serialize};
15use std::collections::{BinaryHeap, HashMap};
16use std::sync::Arc;
17use uuid::Uuid;
18
/// Scheduling priority attached to every [`ScheduledTask`].
///
/// The derived `Ord` follows the explicit discriminants, so
/// `Low < Normal < High < Critical`. `Normal` is the `Default` value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Default)]
pub enum Priority {
    /// Lowest priority; compares below every other variant.
    Low = 0,
    /// Priority returned by `Priority::default()`.
    #[default]
    Normal = 1,
    /// Compares above `Normal` and below `Critical`.
    High = 2,
    /// Highest priority; compares above every other variant.
    Critical = 3,
}
32
/// Lifecycle state recorded on a [`ScheduledTask`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TaskStatus {
    /// Waiting in the scheduler queue; set by the constructors and by `submit`.
    Queued,
    /// Handed out by `next_task` or `start_task` and tracked in the running map.
    Running,
    /// Set by `complete_task` on the task it removes from the running map.
    Completed,
    /// Set by `fail_task` and `reap_zombies` on the task they remove.
    Failed,
    /// NOTE(review): not assigned anywhere in the visible code — confirm usage elsewhere.
    Cancelled,
}
47
/// A unit of work tracked by the scheduler.
///
/// Note that the derived `PartialEq`/`Eq` compare every field, whereas the
/// hand-written `Ord` below only looks at `priority` and `created_at`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ScheduledTask {
    /// Identifier of the task; the constructors generate a random v4 UUID.
    pub id: Uuid,
    /// Agent the task belongs to, if any. Budget checks in `next_task` only
    /// apply to tasks that carry an agent id.
    pub agent_id: Option<AgentId>,
    /// Free-form description of the work.
    pub description: String,
    /// Priority used for queue ordering.
    pub priority: Priority,
    /// Creation timestamp; used as the tie-breaker between equal priorities.
    pub created_at: DateTime<Utc>,
    /// Current lifecycle state.
    pub status: TaskStatus,
    /// Failure reason, if any; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}
67
68impl PartialOrd for ScheduledTask {
69 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
70 Some(self.cmp(other))
71 }
72}
73
74impl Ord for ScheduledTask {
75 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
76 self.priority
79 .cmp(&other.priority)
80 .then_with(|| other.created_at.cmp(&self.created_at))
81 }
82}
83
84impl ScheduledTask {
85 pub fn new(description: String, priority: Priority) -> Self {
87 Self {
88 id: Uuid::new_v4(),
89 agent_id: None,
90 description,
91 priority,
92 created_at: Utc::now(),
93 status: TaskStatus::Queued,
94 error: None,
95 }
96 }
97
98 pub fn for_agent(agent_id: AgentId, description: String, priority: Priority) -> Self {
100 Self {
101 id: Uuid::new_v4(),
102 agent_id: Some(agent_id),
103 description,
104 priority,
105 created_at: Utc::now(),
106 status: TaskStatus::Queued,
107 error: None,
108 }
109 }
110}
111
/// Point-in-time snapshot of scheduler state, as produced by `AgentScheduler::stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchedulerStats {
    /// Number of tasks waiting in the queue.
    pub queued: usize,
    /// Number of tasks currently in the running map.
    pub running: usize,
    /// Completed-task count. `AgentScheduler::stats` in this file always reports 0.
    pub completed: usize,
    /// Failed-task count. `AgentScheduler::stats` in this file always reports 0.
    pub failed: usize,
    /// Configured cap on simultaneously running tasks.
    pub max_concurrent: usize,
    /// Configured number of dispatches allowed per rate-limit window.
    pub rate_limit_per_minute: u32,
    /// Dispatches still available in the current rate-limit window.
    pub rate_remaining: u32,
}
130
131impl Default for SchedulerStats {
132 fn default() -> Self {
133 Self {
134 queued: 0,
135 running: 0,
136 completed: 0,
137 failed: 0,
138 max_concurrent: 5,
139 rate_limit_per_minute: 60,
140 rate_remaining: 60,
141 }
142 }
143}
144
/// Sliding-window rate limiter based on wall-clock (`Utc`) timestamps.
#[derive(Debug, Clone)]
struct RateLimiter {
    /// Timestamps of the requests admitted so far; entries older than the
    /// window are pruned on each call to `allow`.
    window: Vec<DateTime<Utc>>,
    /// Length of the sliding window, in seconds.
    window_secs: u64,
    /// Maximum number of requests admitted within one window.
    max_requests: u32,
}
155
156impl RateLimiter {
157 fn new(window_secs: u64, max_requests: u32) -> Self {
158 Self {
159 window: Vec::new(),
160 window_secs,
161 max_requests,
162 }
163 }
164
165 fn allow(&mut self) -> bool {
167 let now = Utc::now();
168 let cutoff = now - chrono::Duration::seconds(self.window_secs as i64);
169
170 self.window.retain(|t| *t > cutoff);
172
173 if self.window.len() >= self.max_requests as usize {
174 return false;
175 }
176
177 self.window.push(now);
178 true
179 }
180
181 fn remaining(&self) -> u32 {
183 let now = Utc::now();
184 let cutoff = now - chrono::Duration::seconds(self.window_secs as i64);
185 let active = self.window.iter().filter(|t| **t > cutoff).count();
186 self.max_requests.saturating_sub(active as u32)
187 }
188}
189
/// Priority-based task scheduler with a concurrency cap, a sliding-window
/// rate limit, zombie reaping and optional per-agent budget checks.
pub struct AgentScheduler {
    /// Pending tasks; a max-heap, so the greatest task per `Ord` is popped first.
    queue: Arc<Mutex<BinaryHeap<ScheduledTask>>>,
    /// Tasks handed out by `next_task`/`start_task` and not yet finished, keyed by id.
    running: Arc<Mutex<HashMap<Uuid, ScheduledTask>>>,
    /// Cap on the size of `running`, checked by `next_task`.
    max_concurrent: std::sync::atomic::AtomicUsize,
    /// Limits how many tasks `next_task` dispatches per window.
    rate_limiter: Arc<Mutex<RateLimiter>>,
    /// Age, in seconds, after which `reap_zombies` removes a running task.
    zombie_timeout_secs: std::sync::atomic::AtomicU64,
    /// Time at which each running task was started, keyed by task id.
    task_start_times: Arc<Mutex<HashMap<Uuid, DateTime<Utc>>>>,
    /// Optional budget manager consulted by `next_task` for tasks with an agent id.
    budget_manager: Option<Arc<BudgetManager>>,
}
210
211impl AgentScheduler {
212 pub fn new(
219 max_concurrent: usize,
220 rate_limit_per_minute: u32,
221 zombie_timeout_secs: u64,
222 ) -> Self {
223 Self {
224 queue: Arc::new(Mutex::new(BinaryHeap::new())),
225 running: Arc::new(Mutex::new(HashMap::new())),
226 max_concurrent: std::sync::atomic::AtomicUsize::new(max_concurrent),
227 rate_limiter: Arc::new(Mutex::new(RateLimiter::new(60, rate_limit_per_minute))),
228 zombie_timeout_secs: std::sync::atomic::AtomicU64::new(zombie_timeout_secs),
229 task_start_times: Arc::new(Mutex::new(HashMap::new())),
230 budget_manager: None,
231 }
232 }
233
    /// Attaches a budget manager, replacing any previously attached one.
    ///
    /// Once set, `next_task` consults it for every task that carries an agent id.
    pub fn set_budget_manager(&mut self, bm: Arc<BudgetManager>) {
        self.budget_manager = Some(bm);
    }
244
245 pub fn update_config(
250 &self,
251 max_concurrent: usize,
252 rate_limit_per_minute: u32,
253 zombie_timeout_secs: u64,
254 ) {
255 {
256 let mut limiter = self.rate_limiter.lock();
257 *limiter = RateLimiter::new(60, rate_limit_per_minute);
258 }
259 self.max_concurrent
260 .store(max_concurrent, std::sync::atomic::Ordering::Relaxed);
261 self.zombie_timeout_secs
262 .store(zombie_timeout_secs, std::sync::atomic::Ordering::Relaxed);
263 tracing::info!(
264 max_concurrent,
265 rate_limit_per_minute,
266 zombie_timeout_secs,
267 "Scheduler config hot-reloaded"
268 );
269 }
270
271 pub fn submit(&self, mut task: ScheduledTask) -> Result<Uuid> {
275 task.status = TaskStatus::Queued;
276 let id = task.id;
277
278 let mut queue = self.queue.lock();
279 queue.push(task); tracing::debug!(
282 task_id = %id,
283 queue_len = queue.len(),
284 "Task submitted to scheduler"
285 );
286
287 Ok(id)
288 }
289
290 pub fn next_task(&self) -> Option<ScheduledTask> {
297 {
299 let running = self.running.lock();
300 if running.len()
301 >= self
302 .max_concurrent
303 .load(std::sync::atomic::Ordering::Relaxed)
304 {
305 tracing::debug!(
306 running = running.len(),
307 max = self
308 .max_concurrent
309 .load(std::sync::atomic::Ordering::Relaxed),
310 "Max concurrent limit reached"
311 );
312 return None;
313 }
314 }
315
316 {
318 let mut limiter = self.rate_limiter.lock();
319 if !limiter.allow() {
320 tracing::debug!(remaining = limiter.remaining(), "Rate limit exceeded");
321 return None;
322 }
323 }
324
325 let mut discarded: usize = 0;
327 let mut task = loop {
328 let task_opt = {
329 let mut queue = self.queue.lock();
330 queue.pop() };
332
333 match task_opt {
334 Some(t) => {
335 if let (Some(bm), Some(agent_id)) = (&self.budget_manager, &t.agent_id)
337 && !bm.can_schedule(agent_id)
338 {
339 tracing::warn!(
340 agent_id = %agent_id,
341 "Agent budget exhausted, skipping task"
342 );
343 discarded += 1;
344 continue; }
346 break t;
347 }
348 None => {
349 if discarded > 0 {
350 tracing::info!(discarded, "All queued tasks had exhausted budgets");
351 }
352 return None;
353 }
354 }
355 };
356
357 if discarded > 0 {
358 tracing::info!(discarded, "Skipped tasks with exhausted budgets");
359 }
360
361 task.status = TaskStatus::Running;
362
363 {
365 let mut start_times = self.task_start_times.lock();
366 start_times.insert(task.id, Utc::now());
367 }
368
369 {
371 let mut running = self.running.lock();
372 running.insert(task.id, task.clone());
373 }
374
375 tracing::info!(
376 task_id = %task.id,
377 priority = ?task.priority,
378 running = self.running.lock().len(),
379 "Task started by scheduler"
380 );
381
382 if let (Some(bm), Some(agent_id)) = (&self.budget_manager, &task.agent_id)
384 && let Err(e) = bm.track_call(agent_id)
385 {
386 tracing::warn!(
387 agent_id = %agent_id,
388 error = %e,
389 "Budget exceeded during task track_call"
390 );
391 }
392
393 Some(task)
394 }
395
396 pub fn complete_task(&self, task_id: Uuid) -> Result<()> {
400 let task = {
401 let mut running = self.running.lock();
402 running.remove(&task_id)
403 };
404
405 match task {
406 Some(mut t) => {
407 t.status = TaskStatus::Completed;
408
409 {
411 let mut start_times = self.task_start_times.lock();
412 start_times.remove(&task_id);
413 }
414
415 tracing::info!(task_id = %task_id, "Task completed");
416 Ok(())
417 }
418 None => {
419 tracing::warn!(task_id = %task_id, "Attempted to complete unknown task");
420 Err(anyhow::anyhow!("task not found"))
421 }
422 }
423 }
424
425 pub fn fail_task(&self, task_id: Uuid, error: &str) -> Result<()> {
429 let task = {
430 let mut running = self.running.lock();
431 running.remove(&task_id)
432 };
433
434 match task {
435 Some(mut t) => {
436 t.status = TaskStatus::Failed;
437 t.error = Some(error.to_string());
438
439 {
441 let mut start_times = self.task_start_times.lock();
442 start_times.remove(&task_id);
443 }
444
445 tracing::warn!(task_id = %task_id, error = %error, "Task failed");
446 Ok(())
447 }
448 None => {
449 tracing::warn!(task_id = %task_id, "Attempted to fail unknown task");
450 Err(anyhow::anyhow!("task not found"))
451 }
452 }
453 }
454
455 pub fn reap_zombies(&self) -> Vec<Uuid> {
459 let now = Utc::now();
460 let timeout = chrono::Duration::seconds(
461 self.zombie_timeout_secs
462 .load(std::sync::atomic::Ordering::Relaxed) as i64,
463 );
464 let mut start_times = self.task_start_times.lock();
465 let mut running = self.running.lock();
466 let mut reaped = Vec::new();
467
468 let zombie_ids: Vec<Uuid> = start_times
469 .iter()
470 .filter(|(_, start)| now - **start > timeout)
471 .map(|(id, _)| *id)
472 .collect();
473
474 for id in zombie_ids {
475 if let Some(mut task) = running.remove(&id) {
476 task.status = TaskStatus::Failed;
477 task.error = Some(format!(
478 "zombie: ran for >{} seconds",
479 self.zombie_timeout_secs
480 .load(std::sync::atomic::Ordering::Relaxed)
481 ));
482 reaped.push(id);
483 tracing::warn!(
484 task_id = %id,
485 duration_secs = self.zombie_timeout_secs.load(std::sync::atomic::Ordering::Relaxed),
486 "Zombie task reaped"
487 );
488 }
489 start_times.remove(&id);
491 }
492
493 reaped
494 }
495
496 pub fn start_task(&self, task_id: Uuid) -> Result<()> {
501 let task = {
502 let mut queue = self.queue.lock();
503 let all: Vec<ScheduledTask> = queue.drain().collect();
504 let mut found: Option<ScheduledTask> = None;
505 let remaining: Vec<ScheduledTask> = all
506 .into_iter()
507 .filter(|t| {
508 if t.id == task_id {
509 found = Some(t.clone());
510 false
511 } else {
512 true
513 }
514 })
515 .collect();
516 *queue = remaining.into_iter().collect();
517 found
518 };
519
520 match task {
521 Some(mut task) => {
522 task.status = TaskStatus::Running;
523 let mut start_times = self.task_start_times.lock();
524 start_times.insert(task.id, Utc::now());
525 let mut running = self.running.lock();
526 running.insert(task.id, task);
527 Ok(())
528 }
529 None => Err(anyhow::anyhow!("task {task_id} not found in queue")),
530 }
531 }
532
533 pub fn cancel_task(&self, task_id: Uuid) -> Result<()> {
537 let mut queue = self.queue.lock();
538 let all: Vec<ScheduledTask> = queue.drain().collect();
539 let mut found = false;
540 let remaining: Vec<ScheduledTask> = all
541 .into_iter()
542 .filter(|t| {
543 if t.id == task_id && t.status == TaskStatus::Queued {
544 found = true;
545 false
546 } else {
547 true
548 }
549 })
550 .collect();
551 *queue = remaining.into_iter().collect();
552
553 if found {
554 tracing::info!(task_id = %task_id, "Task cancelled from queue");
555 Ok(())
556 } else {
557 tracing::warn!(task_id = %task_id, "Task not found in queue for cancellation");
558 Err(anyhow::anyhow!("task not found in queue"))
559 }
560 }
561
562 pub fn stats(&self) -> SchedulerStats {
564 let queue = self.queue.lock();
565 let running = self.running.lock();
566 let rate_limiter = self.rate_limiter.lock();
567
568 let _completed = 0usize;
569 let _failed = 0usize;
570
571 SchedulerStats {
572 queued: queue.len(),
573 running: running.len(),
574 completed: _completed,
575 failed: _failed,
576 max_concurrent: self
577 .max_concurrent
578 .load(std::sync::atomic::Ordering::Relaxed),
579 rate_limit_per_minute: rate_limiter.max_requests,
580 rate_remaining: rate_limiter.remaining(),
581 }
582 }
583
584 pub fn rate_limit_remaining(&self) -> u32 {
586 self.rate_limiter.lock().remaining()
587 }
588
589 pub fn queued_tasks(&self) -> Vec<ScheduledTask> {
591 let heap = self.queue.lock();
592 let mut tasks: Vec<ScheduledTask> = heap.iter().cloned().collect();
593 tasks.sort_by_key(|a| a.priority);
596 tasks
597 }
598
599 pub fn running_tasks(&self) -> Vec<ScheduledTask> {
601 self.running.lock().values().cloned().collect()
602 }
603}
604
605impl Default for AgentScheduler {
606 fn default() -> Self {
607 Self::new(5, 60, 300)
608 }
609}
610
611#[cfg(test)]
612mod tests {
613 use super::*;
614 use std::thread;
615 use std::time::Duration;
616
617 #[test]
618 fn test_task_creation() {
619 let task = ScheduledTask::new("Test task".into(), Priority::Normal);
620 assert_eq!(task.status, TaskStatus::Queued);
621 assert!(task.agent_id.is_none());
622 assert!(!task.error.is_some());
623 }
624
625 #[test]
626 fn test_task_creation_for_agent() {
627 let agent_id = AgentId::new_v4();
628 let task = ScheduledTask::for_agent(agent_id, "Agent task".into(), Priority::High);
629 assert_eq!(task.agent_id, Some(agent_id));
630 assert_eq!(task.priority, Priority::High);
631 }
632
633 #[test]
634 fn test_priority_ordering() {
635 assert!(Priority::Critical > Priority::High);
636 assert!(Priority::High > Priority::Normal);
637 assert!(Priority::Normal > Priority::Low);
638 assert!(Priority::Critical > Priority::Normal);
640 assert!(Priority::Critical > Priority::Low);
641 assert!(Priority::High > Priority::Low);
642 }
643
644 #[test]
645 fn test_priority_ordering_eq() {
646 assert_eq!(Priority::Low, Priority::Low);
647 assert_eq!(Priority::Normal, Priority::Normal);
648 assert_eq!(Priority::High, Priority::High);
649 assert_eq!(Priority::Critical, Priority::Critical);
650 }
651
652 #[test]
653 fn test_submit_and_next_high_priority_first() {
654 let scheduler = AgentScheduler::new(10, 10_000, 60);
655
656 scheduler
657 .submit(ScheduledTask::new("Low priority".into(), Priority::Low))
658 .unwrap();
659 scheduler
660 .submit(ScheduledTask::new("High priority".into(), Priority::High))
661 .unwrap();
662 scheduler
663 .submit(ScheduledTask::new(
664 "Normal priority".into(),
665 Priority::Normal,
666 ))
667 .unwrap();
668
669 let next = scheduler.next_task().unwrap();
671 assert_eq!(next.priority, Priority::High);
672
673 let next = scheduler.next_task().unwrap();
675 assert_eq!(next.priority, Priority::Normal);
676
677 let next = scheduler.next_task().unwrap();
679 assert_eq!(next.priority, Priority::Low);
680 }
681
682 #[test]
683 fn test_submit_and_next_critical_first() {
684 let scheduler = AgentScheduler::new(10, 10_000, 60);
685
686 scheduler
687 .submit(ScheduledTask::new("Low".into(), Priority::Low))
688 .unwrap();
689 scheduler
690 .submit(ScheduledTask::new("Normal".into(), Priority::Normal))
691 .unwrap();
692 scheduler
693 .submit(ScheduledTask::new("High".into(), Priority::High))
694 .unwrap();
695 scheduler
696 .submit(ScheduledTask::new("Critical".into(), Priority::Critical))
697 .unwrap();
698
699 let next = scheduler.next_task().unwrap();
701 assert_eq!(next.priority, Priority::Critical);
702 let next = scheduler.next_task().unwrap();
704 assert_eq!(next.priority, Priority::High);
705 let next = scheduler.next_task().unwrap();
707 assert_eq!(next.priority, Priority::Normal);
708 let next = scheduler.next_task().unwrap();
710 assert_eq!(next.priority, Priority::Low);
711 }
712
713 #[test]
714 fn test_submit_multiple_same_priority() {
715 let scheduler = AgentScheduler::new(10, 10_000, 60);
716
717 scheduler
720 .submit(ScheduledTask::new("First".into(), Priority::Normal))
721 .unwrap();
722 scheduler
723 .submit(ScheduledTask::new("Second".into(), Priority::Normal))
724 .unwrap();
725 scheduler
726 .submit(ScheduledTask::new("Third".into(), Priority::Normal))
727 .unwrap();
728
729 let mut descriptions = Vec::new();
731 for _ in 0..3 {
732 let next = scheduler.next_task().unwrap();
733 assert_eq!(next.priority, Priority::Normal);
734 descriptions.push(next.description);
735 }
736 descriptions.sort();
737 assert_eq!(descriptions, vec!["First", "Second", "Third"]);
738 }
739
740 #[test]
741 fn test_max_concurrent_blocks() {
742 let scheduler = AgentScheduler::new(2, 10_000, 60);
743
744 scheduler
745 .submit(ScheduledTask::new("Task 1".into(), Priority::Normal))
746 .unwrap();
747 scheduler
748 .submit(ScheduledTask::new("Task 2".into(), Priority::Normal))
749 .unwrap();
750 scheduler
751 .submit(ScheduledTask::new("Task 3".into(), Priority::Normal))
752 .unwrap();
753
754 assert!(scheduler.next_task().is_some());
755 assert!(scheduler.next_task().is_some());
756 assert!(scheduler.next_task().is_none());
758 }
759
760 #[test]
761 fn test_max_concurrent_allows_when_slot_frees() {
762 let scheduler = AgentScheduler::new(2, 10_000, 60); let _ = scheduler
765 .submit(ScheduledTask::new("Task 1".into(), Priority::Normal))
766 .unwrap();
767 let _id2 = scheduler
768 .submit(ScheduledTask::new("Task 2".into(), Priority::Normal))
769 .unwrap();
770 let t1 = scheduler.next_task().unwrap(); let t2 = scheduler.next_task().unwrap(); assert!(scheduler.next_task().is_none()); scheduler.complete_task(t1.id).unwrap();
780 scheduler.complete_task(t2.id).unwrap();
781
782 let _id3 = scheduler
784 .submit(ScheduledTask::new("Task 3".into(), Priority::Normal))
785 .unwrap();
786
787 let task = scheduler.next_task().unwrap();
789 assert_eq!(task.description, "Task 3");
790
791 scheduler.complete_task(task.id).unwrap();
793 }
794
795 #[test]
796 fn test_complete_task_removes_from_running() {
797 let scheduler = AgentScheduler::new(2, 10_000, 60);
798 let task = ScheduledTask::new("Test".into(), Priority::Normal);
799 let id = scheduler.submit(task).unwrap();
800
801 let _ = scheduler.next_task();
802 scheduler.complete_task(id).unwrap();
803
804 let stats = scheduler.stats();
805 assert_eq!(stats.running, 0);
806 }
807
808 #[test]
809 fn test_complete_unknown_task_returns_error() {
810 let scheduler = AgentScheduler::new(2, 10_000, 60);
811 let result = scheduler.complete_task(Uuid::new_v4());
812 assert!(result.is_err());
813 }
814
815 #[test]
816 fn test_fail_task_sets_error() {
817 let scheduler = AgentScheduler::new(2, 10_000, 60);
818 let task = ScheduledTask::new("Test".into(), Priority::Normal);
819 let id = scheduler.submit(task).unwrap();
820
821 let _ = scheduler.next_task();
822 scheduler.fail_task(id, "Something went wrong").unwrap();
823
824 let running = scheduler.running.lock();
825 assert!(!running.contains_key(&id));
826 }
827
828 #[test]
829 fn test_cancel_queued_task() {
830 let scheduler = AgentScheduler::new(2, 10_000, 60);
831 let id = scheduler
832 .submit(ScheduledTask::new("To cancel".into(), Priority::Normal))
833 .unwrap();
834
835 scheduler.cancel_task(id).unwrap();
836
837 assert!(scheduler.next_task().is_none());
839 }
840
841 #[test]
842 fn test_cancel_running_task_fails() {
843 let scheduler = AgentScheduler::new(2, 10_000, 60);
844 let id = scheduler
845 .submit(ScheduledTask::new("Running".into(), Priority::Normal))
846 .unwrap();
847
848 let _ = scheduler.next_task(); let result = scheduler.cancel_task(id);
852 assert!(result.is_err());
853 }
854
855 #[test]
856 fn test_cancel_unknown_task_fails() {
857 let scheduler = AgentScheduler::new(2, 10_000, 60);
858 let result = scheduler.cancel_task(Uuid::new_v4());
859 assert!(result.is_err());
860 }
861
862 #[test]
863 fn test_stats_tracking() {
864 let scheduler = AgentScheduler::new(2, 60, 60);
865
866 let id1 = scheduler
867 .submit(ScheduledTask::new("Queued".into(), Priority::Normal))
868 .unwrap();
869 scheduler
870 .submit(ScheduledTask::new("Queued 2".into(), Priority::Low))
871 .unwrap();
872
873 let started = scheduler.next_task().unwrap();
875 assert_eq!(started.id, id1);
876
877 let stats = scheduler.stats();
878 assert_eq!(stats.queued, 1); assert_eq!(stats.running, 1);
880 assert_eq!(stats.max_concurrent, 2);
881 assert_eq!(stats.rate_limit_per_minute, 60);
882 }
883
884 #[test]
885 fn test_reap_zombies() {
886 let scheduler = AgentScheduler::new(2, 10_000, 1); let id = scheduler
891 .submit(ScheduledTask::new("Zombie".into(), Priority::Normal))
892 .unwrap();
893 let _ = scheduler.next_task();
894
895 thread::sleep(Duration::from_millis(1_100));
897
898 let reaped = scheduler.reap_zombies();
900 assert!(reaped.contains(&id));
901
902 assert!(scheduler.running.lock().get(&id).is_none());
904 }
905
906 #[test]
907 fn test_reap_zombies_no_zombies() {
908 let scheduler = AgentScheduler::new(2, 10_000, 60); let id = scheduler
911 .submit(ScheduledTask::new("Normal".into(), Priority::Normal))
912 .unwrap();
913 let _ = scheduler.next_task();
914
915 let reaped = scheduler.reap_zombies();
917 assert!(reaped.is_empty());
918
919 assert!(scheduler.running.lock().get(&id).is_some());
921 }
922
923 #[test]
924 fn test_rate_limiter_basic() {
925 let mut limiter = RateLimiter::new(60, 3); assert!(limiter.allow());
928 assert!(limiter.allow());
929 assert!(limiter.allow());
930 assert!(!limiter.allow());
932 }
933
934 #[test]
935 fn test_rate_limiter_remaining() {
936 let limiter = RateLimiter::new(60, 3);
937
938 assert_eq!(limiter.remaining(), 3);
939
940 let mut limiter = RateLimiter::new(60, 3);
941 limiter.allow();
942 limiter.allow();
943 assert_eq!(limiter.remaining(), 1);
944 }
945
946 #[test]
947 fn test_rate_limiter_tracks_per_scheduler() {
948 let scheduler = AgentScheduler::new(10, 5, 60); for i in 0..5 {
952 scheduler
953 .submit(ScheduledTask::new(format!("T{}", i), Priority::Normal))
954 .unwrap();
955 let _ = scheduler.next_task();
956 }
957
958 assert!(scheduler.next_task().is_none());
960 assert_eq!(scheduler.rate_limit_remaining(), 0);
961 }
962
963 #[test]
964 fn test_queued_tasks_inspection() {
965 let scheduler = AgentScheduler::new(2, 10_000, 60);
966
967 scheduler
968 .submit(ScheduledTask::new("A".into(), Priority::Low))
969 .unwrap();
970 scheduler
971 .submit(ScheduledTask::new("B".into(), Priority::High))
972 .unwrap();
973 scheduler
974 .submit(ScheduledTask::new("C".into(), Priority::Normal))
975 .unwrap();
976
977 let queued = scheduler.queued_tasks();
978 assert_eq!(queued.len(), 3);
979 assert_eq!(queued.last().unwrap().description, "B");
982 }
983
984 #[test]
985 fn test_running_tasks_inspection() {
986 let scheduler = AgentScheduler::new(2, 10_000, 60);
987
988 scheduler
989 .submit(ScheduledTask::new("R1".into(), Priority::Normal))
990 .unwrap();
991 scheduler
992 .submit(ScheduledTask::new("R2".into(), Priority::Normal))
993 .unwrap();
994
995 let _ = scheduler.next_task();
996 let _ = scheduler.next_task();
997
998 let running = scheduler.running_tasks();
999 assert_eq!(running.len(), 2);
1000 }
1001
1002 #[test]
1003 fn test_default_scheduler() {
1004 let scheduler = AgentScheduler::default();
1005 let stats = scheduler.stats();
1006 assert_eq!(stats.max_concurrent, 5);
1007 assert_eq!(stats.rate_limit_per_minute, 60);
1008 }
1009
1010 #[test]
1011 fn test_budget_manager_integration_skips_exhausted_agent() {
1012 use crate::budget::{BudgetLimit, BudgetManager};
1013
1014 let scheduler = Arc::new(Mutex::new(AgentScheduler::new(2, 10_000, 60)));
1015 let budget_manager = Arc::new(BudgetManager::new());
1016
1017 let agent_id = AgentId::new_v4();
1019 budget_manager.set_budget(BudgetLimit {
1020 agent_id,
1021 token_budget: 1000,
1022 calls_budget: 1,
1023 window_secs: 60,
1024 });
1025
1026 scheduler
1028 .lock()
1029 .set_budget_manager(Arc::clone(&budget_manager));
1030
1031 scheduler
1033 .lock()
1034 .submit(ScheduledTask::for_agent(
1035 agent_id,
1036 "Task 1".into(),
1037 Priority::Normal,
1038 ))
1039 .unwrap();
1040 scheduler
1041 .lock()
1042 .submit(ScheduledTask::for_agent(
1043 agent_id,
1044 "Task 2".into(),
1045 Priority::Normal,
1046 ))
1047 .unwrap();
1048
1049 let task1 = scheduler.lock().next_task();
1051 assert!(task1.is_some());
1052 scheduler.lock().complete_task(task1.unwrap().id).unwrap();
1053
1054 let task2 = scheduler.lock().next_task();
1056 assert!(task2.is_none());
1057 }
1058
1059 #[test]
1060 fn test_budget_manager_allows_different_agents() {
1061 use crate::budget::{BudgetLimit, BudgetManager};
1062
1063 let scheduler = Arc::new(Mutex::new(AgentScheduler::new(2, 10_000, 60)));
1064 let budget_manager = Arc::new(BudgetManager::new());
1065
1066 let agent1 = AgentId::new_v4();
1067 let agent2 = AgentId::new_v4();
1068
1069 for agent_id in [&agent1, &agent2] {
1071 budget_manager.set_budget(BudgetLimit {
1072 agent_id: *agent_id,
1073 token_budget: 1000,
1074 calls_budget: 3,
1075 window_secs: 60,
1076 });
1077 }
1078
1079 scheduler
1080 .lock()
1081 .set_budget_manager(Arc::clone(&budget_manager));
1082
1083 scheduler
1085 .lock()
1086 .submit(ScheduledTask::for_agent(
1087 agent1,
1088 "A1".into(),
1089 Priority::Normal,
1090 ))
1091 .unwrap();
1092 scheduler
1093 .lock()
1094 .submit(ScheduledTask::for_agent(
1095 agent2,
1096 "B1".into(),
1097 Priority::Normal,
1098 ))
1099 .unwrap();
1100
1101 let t1 = scheduler.lock().next_task().unwrap();
1103 let t2 = scheduler.lock().next_task().unwrap();
1104 assert_ne!(t1.description, t2.description);
1105 }
1106
1107 #[test]
1108 fn test_budget_manager_task_without_agent_id() {
1109 use crate::budget::BudgetManager;
1110
1111 let scheduler = Arc::new(Mutex::new(AgentScheduler::new(2, 10_000, 60)));
1112 let budget_manager = Arc::new(BudgetManager::new());
1113
1114 scheduler
1115 .lock()
1116 .set_budget_manager(Arc::clone(&budget_manager));
1117
1118 scheduler
1120 .lock()
1121 .submit(ScheduledTask::new("No agent".into(), Priority::Normal))
1122 .unwrap();
1123
1124 let task = scheduler.lock().next_task();
1126 assert!(task.is_some());
1127 }
1128
1129 #[test]
1130 fn test_budget_manager_not_set_skips_check() {
1131 let scheduler = Arc::new(Mutex::new(AgentScheduler::new(2, 10_000, 60)));
1132 scheduler
1135 .lock()
1136 .submit(ScheduledTask::new("Any task".into(), Priority::Normal))
1137 .unwrap();
1138
1139 let task = scheduler.lock().next_task();
1141 assert!(task.is_some());
1142 }
1143}