1use crate::budget::BudgetManager;
10use crate::types::AgentId;
11use anyhow::Result;
12use chrono::{DateTime, Utc};
13use parking_lot::Mutex;
14use serde::{Deserialize, Serialize};
15use std::collections::{BinaryHeap, HashMap};
16use std::sync::Arc;
17use uuid::Uuid;
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Default)]
21pub enum Priority {
22 Low = 0,
24 #[default]
26 Normal = 1,
27 High = 2,
29 Critical = 3,
31}
32
33#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
35pub enum TaskStatus {
36 Queued,
38 Running,
40 Completed,
42 Failed,
44 Cancelled,
46}
47
48#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
50pub struct ScheduledTask {
51 pub id: Uuid,
53 pub agent_id: Option<AgentId>,
55 pub description: String,
57 pub priority: Priority,
59 pub created_at: DateTime<Utc>,
61 pub status: TaskStatus,
63 #[serde(skip_serializing_if = "Option::is_none")]
65 pub error: Option<String>,
66}
67
68impl PartialOrd for ScheduledTask {
69 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
70 Some(self.cmp(other))
71 }
72}
73
74impl Ord for ScheduledTask {
75 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
76 self.priority
79 .cmp(&other.priority)
80 .then_with(|| other.created_at.cmp(&self.created_at))
81 }
82}
83
84impl ScheduledTask {
85 pub fn new(description: String, priority: Priority) -> Self {
87 Self {
88 id: Uuid::new_v4(),
89 agent_id: None,
90 description,
91 priority,
92 created_at: Utc::now(),
93 status: TaskStatus::Queued,
94 error: None,
95 }
96 }
97
98 pub fn for_agent(agent_id: AgentId, description: String, priority: Priority) -> Self {
100 Self {
101 id: Uuid::new_v4(),
102 agent_id: Some(agent_id),
103 description,
104 priority,
105 created_at: Utc::now(),
106 status: TaskStatus::Queued,
107 error: None,
108 }
109 }
110}
111
112#[derive(Debug, Clone, Serialize, Deserialize)]
114pub struct SchedulerStats {
115 pub queued: usize,
117 pub running: usize,
119 pub completed: usize,
121 pub failed: usize,
123 pub max_concurrent: usize,
125 pub rate_limit_per_minute: u32,
127 pub rate_remaining: u32,
129}
130
131impl Default for SchedulerStats {
132 fn default() -> Self {
133 Self {
134 queued: 0,
135 running: 0,
136 completed: 0,
137 failed: 0,
138 max_concurrent: 5,
139 rate_limit_per_minute: 60,
140 rate_remaining: 60,
141 }
142 }
143}
144
145#[derive(Debug, Clone)]
147struct RateLimiter {
148 window: Vec<DateTime<Utc>>,
150 window_secs: u64,
152 max_requests: u32,
154}
155
156impl RateLimiter {
157 fn new(window_secs: u64, max_requests: u32) -> Self {
158 Self {
159 window: Vec::new(),
160 window_secs,
161 max_requests,
162 }
163 }
164
165 fn allow(&mut self) -> bool {
167 let now = Utc::now();
168 let cutoff = now - chrono::Duration::seconds(self.window_secs as i64);
169
170 self.window.retain(|t| *t > cutoff);
172
173 if self.window.len() >= self.max_requests as usize {
174 return false;
175 }
176
177 self.window.push(now);
178 true
179 }
180
181 fn remaining(&self) -> u32 {
183 let now = Utc::now();
184 let cutoff = now - chrono::Duration::seconds(self.window_secs as i64);
185 let active = self.window.iter().filter(|t| **t > cutoff).count();
186 self.max_requests.saturating_sub(active as u32)
187 }
188}
189
190pub struct AgentScheduler {
195 queue: Arc<Mutex<BinaryHeap<ScheduledTask>>>,
197 running: Arc<Mutex<HashMap<Uuid, ScheduledTask>>>,
199 max_concurrent: std::sync::atomic::AtomicUsize,
201 rate_limiter: Arc<Mutex<RateLimiter>>,
203 zombie_timeout_secs: std::sync::atomic::AtomicU64,
205 task_start_times: Arc<Mutex<HashMap<Uuid, DateTime<Utc>>>>,
207 budget_manager: Option<Arc<BudgetManager>>,
209}
210
211impl AgentScheduler {
212 pub fn new(
219 max_concurrent: usize,
220 rate_limit_per_minute: u32,
221 zombie_timeout_secs: u64,
222 ) -> Self {
223 Self {
224 queue: Arc::new(Mutex::new(BinaryHeap::new())),
225 running: Arc::new(Mutex::new(HashMap::new())),
226 max_concurrent: std::sync::atomic::AtomicUsize::new(max_concurrent),
227 rate_limiter: Arc::new(Mutex::new(RateLimiter::new(60, rate_limit_per_minute))),
228 zombie_timeout_secs: std::sync::atomic::AtomicU64::new(zombie_timeout_secs),
229 task_start_times: Arc::new(Mutex::new(HashMap::new())),
230 budget_manager: None,
231 }
232 }
233
234 pub fn set_budget_manager(&mut self, bm: Arc<BudgetManager>) {
242 self.budget_manager = Some(bm);
243 }
244
245 pub fn update_config(
250 &self,
251 max_concurrent: usize,
252 rate_limit_per_minute: u32,
253 zombie_timeout_secs: u64,
254 ) {
255 {
256 let mut limiter = self.rate_limiter.lock();
257 *limiter = RateLimiter::new(60, rate_limit_per_minute);
258 }
259 self.max_concurrent
260 .store(max_concurrent, std::sync::atomic::Ordering::Relaxed);
261 self.zombie_timeout_secs
262 .store(zombie_timeout_secs, std::sync::atomic::Ordering::Relaxed);
263 tracing::info!(
264 max_concurrent,
265 rate_limit_per_minute,
266 zombie_timeout_secs,
267 "Scheduler config hot-reloaded"
268 );
269 }
270
271 pub fn submit(&self, mut task: ScheduledTask) -> Result<Uuid> {
275 task.status = TaskStatus::Queued;
276 let id = task.id;
277
278 let mut queue = self.queue.lock();
279 queue.push(task); tracing::debug!(
282 task_id = %id,
283 queue_len = queue.len(),
284 "Task submitted to scheduler"
285 );
286
287 Ok(id)
288 }
289
290 pub fn next_task(&self) -> Option<ScheduledTask> {
297 {
299 let running = self.running.lock();
300 if running.len()
301 >= self
302 .max_concurrent
303 .load(std::sync::atomic::Ordering::Relaxed)
304 {
305 tracing::debug!(
306 running = running.len(),
307 max = self
308 .max_concurrent
309 .load(std::sync::atomic::Ordering::Relaxed),
310 "Max concurrent limit reached"
311 );
312 return None;
313 }
314 }
315
316 {
318 let mut limiter = self.rate_limiter.lock();
319 if !limiter.allow() {
320 tracing::debug!(remaining = limiter.remaining(), "Rate limit exceeded");
321 return None;
322 }
323 }
324
325 let mut discarded: usize = 0;
327 let mut task = loop {
328 let task_opt = {
329 let mut queue = self.queue.lock();
330 queue.pop() };
332
333 match task_opt {
334 Some(t) => {
335 if let (Some(ref bm), Some(ref agent_id)) = (&self.budget_manager, &t.agent_id)
337 {
338 if !bm.can_schedule(agent_id) {
339 tracing::warn!(
340 agent_id = %agent_id,
341 "Agent budget exhausted, skipping task"
342 );
343 discarded += 1;
344 continue; }
346 }
347 break t;
348 }
349 None => {
350 if discarded > 0 {
351 tracing::info!(discarded, "All queued tasks had exhausted budgets");
352 }
353 return None;
354 }
355 }
356 };
357
358 if discarded > 0 {
359 tracing::info!(discarded, "Skipped tasks with exhausted budgets");
360 }
361
362 task.status = TaskStatus::Running;
363
364 {
366 let mut start_times = self.task_start_times.lock();
367 start_times.insert(task.id, Utc::now());
368 }
369
370 {
372 let mut running = self.running.lock();
373 running.insert(task.id, task.clone());
374 }
375
376 tracing::info!(
377 task_id = %task.id,
378 priority = ?task.priority,
379 running = self.running.lock().len(),
380 "Task started by scheduler"
381 );
382
383 if let (Some(ref bm), Some(ref agent_id)) = (&self.budget_manager, &task.agent_id) {
385 if let Err(e) = bm.track_call(agent_id) {
386 tracing::warn!(
387 agent_id = %agent_id,
388 error = %e,
389 "Budget exceeded during task track_call"
390 );
391 }
392 }
393
394 Some(task)
395 }
396
397 pub fn complete_task(&self, task_id: Uuid) -> Result<()> {
401 let task = {
402 let mut running = self.running.lock();
403 running.remove(&task_id)
404 };
405
406 match task {
407 Some(mut t) => {
408 t.status = TaskStatus::Completed;
409
410 {
412 let mut start_times = self.task_start_times.lock();
413 start_times.remove(&task_id);
414 }
415
416 tracing::info!(task_id = %task_id, "Task completed");
417 Ok(())
418 }
419 None => {
420 tracing::warn!(task_id = %task_id, "Attempted to complete unknown task");
421 Err(anyhow::anyhow!("task not found"))
422 }
423 }
424 }
425
426 pub fn fail_task(&self, task_id: Uuid, error: &str) -> Result<()> {
430 let task = {
431 let mut running = self.running.lock();
432 running.remove(&task_id)
433 };
434
435 match task {
436 Some(mut t) => {
437 t.status = TaskStatus::Failed;
438 t.error = Some(error.to_string());
439
440 {
442 let mut start_times = self.task_start_times.lock();
443 start_times.remove(&task_id);
444 }
445
446 tracing::warn!(task_id = %task_id, error = %error, "Task failed");
447 Ok(())
448 }
449 None => {
450 tracing::warn!(task_id = %task_id, "Attempted to fail unknown task");
451 Err(anyhow::anyhow!("task not found"))
452 }
453 }
454 }
455
456 pub fn reap_zombies(&self) -> Vec<Uuid> {
460 let now = Utc::now();
461 let timeout = chrono::Duration::seconds(
462 self.zombie_timeout_secs
463 .load(std::sync::atomic::Ordering::Relaxed) as i64,
464 );
465 let mut start_times = self.task_start_times.lock();
466 let mut running = self.running.lock();
467 let mut reaped = Vec::new();
468
469 let zombie_ids: Vec<Uuid> = start_times
470 .iter()
471 .filter(|(_, start)| now - **start > timeout)
472 .map(|(id, _)| *id)
473 .collect();
474
475 for id in zombie_ids {
476 if let Some(mut task) = running.remove(&id) {
477 task.status = TaskStatus::Failed;
478 task.error = Some(format!(
479 "zombie: ran for >{} seconds",
480 self.zombie_timeout_secs
481 .load(std::sync::atomic::Ordering::Relaxed)
482 ));
483 reaped.push(id);
484 tracing::warn!(
485 task_id = %id,
486 duration_secs = self.zombie_timeout_secs.load(std::sync::atomic::Ordering::Relaxed),
487 "Zombie task reaped"
488 );
489 }
490 start_times.remove(&id);
492 }
493
494 reaped
495 }
496
497 pub fn start_task(&self, task_id: Uuid) -> Result<()> {
502 let task = {
503 let mut queue = self.queue.lock();
504 let all: Vec<ScheduledTask> = queue.drain().collect();
505 let mut found: Option<ScheduledTask> = None;
506 let remaining: Vec<ScheduledTask> = all
507 .into_iter()
508 .filter(|t| {
509 if t.id == task_id {
510 found = Some(t.clone());
511 false
512 } else {
513 true
514 }
515 })
516 .collect();
517 *queue = remaining.into_iter().collect();
518 found
519 };
520
521 match task {
522 Some(mut task) => {
523 task.status = TaskStatus::Running;
524 let mut start_times = self.task_start_times.lock();
525 start_times.insert(task.id, Utc::now());
526 let mut running = self.running.lock();
527 running.insert(task.id, task);
528 Ok(())
529 }
530 None => Err(anyhow::anyhow!("task {task_id} not found in queue")),
531 }
532 }
533
534 pub fn cancel_task(&self, task_id: Uuid) -> Result<()> {
538 let mut queue = self.queue.lock();
539 let all: Vec<ScheduledTask> = queue.drain().collect();
540 let mut found = false;
541 let remaining: Vec<ScheduledTask> = all
542 .into_iter()
543 .filter(|t| {
544 if t.id == task_id && t.status == TaskStatus::Queued {
545 found = true;
546 false
547 } else {
548 true
549 }
550 })
551 .collect();
552 *queue = remaining.into_iter().collect();
553
554 if found {
555 tracing::info!(task_id = %task_id, "Task cancelled from queue");
556 Ok(())
557 } else {
558 tracing::warn!(task_id = %task_id, "Task not found in queue for cancellation");
559 Err(anyhow::anyhow!("task not found in queue"))
560 }
561 }
562
563 pub fn stats(&self) -> SchedulerStats {
565 let queue = self.queue.lock();
566 let running = self.running.lock();
567 let rate_limiter = self.rate_limiter.lock();
568
569 let _completed = 0usize;
570 let _failed = 0usize;
571
572 SchedulerStats {
573 queued: queue.len(),
574 running: running.len(),
575 completed: _completed,
576 failed: _failed,
577 max_concurrent: self
578 .max_concurrent
579 .load(std::sync::atomic::Ordering::Relaxed),
580 rate_limit_per_minute: rate_limiter.max_requests,
581 rate_remaining: rate_limiter.remaining(),
582 }
583 }
584
585 pub fn rate_limit_remaining(&self) -> u32 {
587 self.rate_limiter.lock().remaining()
588 }
589
590 pub fn queued_tasks(&self) -> Vec<ScheduledTask> {
592 let heap = self.queue.lock();
593 let mut tasks: Vec<ScheduledTask> = heap.iter().cloned().collect();
594 tasks.sort_by_key(|a| a.priority);
597 tasks
598 }
599
600 pub fn running_tasks(&self) -> Vec<ScheduledTask> {
602 self.running.lock().values().cloned().collect()
603 }
604}
605
606impl Default for AgentScheduler {
607 fn default() -> Self {
608 Self::new(5, 60, 300)
609 }
610}
611
612#[cfg(test)]
613mod tests {
614 use super::*;
615 use std::thread;
616 use std::time::Duration;
617
618 #[test]
619 fn test_task_creation() {
620 let task = ScheduledTask::new("Test task".into(), Priority::Normal);
621 assert_eq!(task.status, TaskStatus::Queued);
622 assert!(task.agent_id.is_none());
623 assert!(!task.error.is_some());
624 }
625
626 #[test]
627 fn test_task_creation_for_agent() {
628 let agent_id = AgentId::new_v4();
629 let task = ScheduledTask::for_agent(agent_id, "Agent task".into(), Priority::High);
630 assert_eq!(task.agent_id, Some(agent_id));
631 assert_eq!(task.priority, Priority::High);
632 }
633
634 #[test]
635 fn test_priority_ordering() {
636 assert!(Priority::Critical > Priority::High);
637 assert!(Priority::High > Priority::Normal);
638 assert!(Priority::Normal > Priority::Low);
639 assert!(Priority::Critical > Priority::Normal);
641 assert!(Priority::Critical > Priority::Low);
642 assert!(Priority::High > Priority::Low);
643 }
644
645 #[test]
646 fn test_priority_ordering_eq() {
647 assert_eq!(Priority::Low, Priority::Low);
648 assert_eq!(Priority::Normal, Priority::Normal);
649 assert_eq!(Priority::High, Priority::High);
650 assert_eq!(Priority::Critical, Priority::Critical);
651 }
652
653 #[test]
654 fn test_submit_and_next_high_priority_first() {
655 let scheduler = AgentScheduler::new(10, 10_000, 60);
656
657 scheduler
658 .submit(ScheduledTask::new("Low priority".into(), Priority::Low))
659 .unwrap();
660 scheduler
661 .submit(ScheduledTask::new("High priority".into(), Priority::High))
662 .unwrap();
663 scheduler
664 .submit(ScheduledTask::new(
665 "Normal priority".into(),
666 Priority::Normal,
667 ))
668 .unwrap();
669
670 let next = scheduler.next_task().unwrap();
672 assert_eq!(next.priority, Priority::High);
673
674 let next = scheduler.next_task().unwrap();
676 assert_eq!(next.priority, Priority::Normal);
677
678 let next = scheduler.next_task().unwrap();
680 assert_eq!(next.priority, Priority::Low);
681 }
682
683 #[test]
684 fn test_submit_and_next_critical_first() {
685 let scheduler = AgentScheduler::new(10, 10_000, 60);
686
687 scheduler
688 .submit(ScheduledTask::new("Low".into(), Priority::Low))
689 .unwrap();
690 scheduler
691 .submit(ScheduledTask::new("Normal".into(), Priority::Normal))
692 .unwrap();
693 scheduler
694 .submit(ScheduledTask::new("High".into(), Priority::High))
695 .unwrap();
696 scheduler
697 .submit(ScheduledTask::new("Critical".into(), Priority::Critical))
698 .unwrap();
699
700 let next = scheduler.next_task().unwrap();
702 assert_eq!(next.priority, Priority::Critical);
703 let next = scheduler.next_task().unwrap();
705 assert_eq!(next.priority, Priority::High);
706 let next = scheduler.next_task().unwrap();
708 assert_eq!(next.priority, Priority::Normal);
709 let next = scheduler.next_task().unwrap();
711 assert_eq!(next.priority, Priority::Low);
712 }
713
714 #[test]
715 fn test_submit_multiple_same_priority() {
716 let scheduler = AgentScheduler::new(10, 10_000, 60);
717
718 scheduler
721 .submit(ScheduledTask::new("First".into(), Priority::Normal))
722 .unwrap();
723 scheduler
724 .submit(ScheduledTask::new("Second".into(), Priority::Normal))
725 .unwrap();
726 scheduler
727 .submit(ScheduledTask::new("Third".into(), Priority::Normal))
728 .unwrap();
729
730 let mut descriptions = Vec::new();
732 for _ in 0..3 {
733 let next = scheduler.next_task().unwrap();
734 assert_eq!(next.priority, Priority::Normal);
735 descriptions.push(next.description);
736 }
737 descriptions.sort();
738 assert_eq!(descriptions, vec!["First", "Second", "Third"]);
739 }
740
741 #[test]
742 fn test_max_concurrent_blocks() {
743 let scheduler = AgentScheduler::new(2, 10_000, 60);
744
745 scheduler
746 .submit(ScheduledTask::new("Task 1".into(), Priority::Normal))
747 .unwrap();
748 scheduler
749 .submit(ScheduledTask::new("Task 2".into(), Priority::Normal))
750 .unwrap();
751 scheduler
752 .submit(ScheduledTask::new("Task 3".into(), Priority::Normal))
753 .unwrap();
754
755 assert!(scheduler.next_task().is_some());
756 assert!(scheduler.next_task().is_some());
757 assert!(scheduler.next_task().is_none());
759 }
760
761 #[test]
762 fn test_max_concurrent_allows_when_slot_frees() {
763 let scheduler = AgentScheduler::new(2, 10_000, 60); let _ = scheduler
766 .submit(ScheduledTask::new("Task 1".into(), Priority::Normal))
767 .unwrap();
768 let _id2 = scheduler
769 .submit(ScheduledTask::new("Task 2".into(), Priority::Normal))
770 .unwrap();
771 let t1 = scheduler.next_task().unwrap(); let t2 = scheduler.next_task().unwrap(); assert!(scheduler.next_task().is_none()); scheduler.complete_task(t1.id).unwrap();
781 scheduler.complete_task(t2.id).unwrap();
782
783 let _id3 = scheduler
785 .submit(ScheduledTask::new("Task 3".into(), Priority::Normal))
786 .unwrap();
787
788 let task = scheduler.next_task().unwrap();
790 assert_eq!(task.description, "Task 3");
791
792 scheduler.complete_task(task.id).unwrap();
794 }
795
796 #[test]
797 fn test_complete_task_removes_from_running() {
798 let scheduler = AgentScheduler::new(2, 10_000, 60);
799 let task = ScheduledTask::new("Test".into(), Priority::Normal);
800 let id = scheduler.submit(task).unwrap();
801
802 let _ = scheduler.next_task();
803 scheduler.complete_task(id).unwrap();
804
805 let stats = scheduler.stats();
806 assert_eq!(stats.running, 0);
807 }
808
809 #[test]
810 fn test_complete_unknown_task_returns_error() {
811 let scheduler = AgentScheduler::new(2, 10_000, 60);
812 let result = scheduler.complete_task(Uuid::new_v4());
813 assert!(result.is_err());
814 }
815
816 #[test]
817 fn test_fail_task_sets_error() {
818 let scheduler = AgentScheduler::new(2, 10_000, 60);
819 let task = ScheduledTask::new("Test".into(), Priority::Normal);
820 let id = scheduler.submit(task).unwrap();
821
822 let _ = scheduler.next_task();
823 scheduler.fail_task(id, "Something went wrong").unwrap();
824
825 let running = scheduler.running.lock();
826 assert!(!running.contains_key(&id));
827 }
828
829 #[test]
830 fn test_cancel_queued_task() {
831 let scheduler = AgentScheduler::new(2, 10_000, 60);
832 let id = scheduler
833 .submit(ScheduledTask::new("To cancel".into(), Priority::Normal))
834 .unwrap();
835
836 scheduler.cancel_task(id).unwrap();
837
838 assert!(scheduler.next_task().is_none());
840 }
841
842 #[test]
843 fn test_cancel_running_task_fails() {
844 let scheduler = AgentScheduler::new(2, 10_000, 60);
845 let id = scheduler
846 .submit(ScheduledTask::new("Running".into(), Priority::Normal))
847 .unwrap();
848
849 let _ = scheduler.next_task(); let result = scheduler.cancel_task(id);
853 assert!(result.is_err());
854 }
855
856 #[test]
857 fn test_cancel_unknown_task_fails() {
858 let scheduler = AgentScheduler::new(2, 10_000, 60);
859 let result = scheduler.cancel_task(Uuid::new_v4());
860 assert!(result.is_err());
861 }
862
863 #[test]
864 fn test_stats_tracking() {
865 let scheduler = AgentScheduler::new(2, 60, 60);
866
867 let id1 = scheduler
868 .submit(ScheduledTask::new("Queued".into(), Priority::Normal))
869 .unwrap();
870 scheduler
871 .submit(ScheduledTask::new("Queued 2".into(), Priority::Low))
872 .unwrap();
873
874 let started = scheduler.next_task().unwrap();
876 assert_eq!(started.id, id1);
877
878 let stats = scheduler.stats();
879 assert_eq!(stats.queued, 1); assert_eq!(stats.running, 1);
881 assert_eq!(stats.max_concurrent, 2);
882 assert_eq!(stats.rate_limit_per_minute, 60);
883 }
884
885 #[test]
886 fn test_reap_zombies() {
887 let scheduler = AgentScheduler::new(2, 10_000, 1); let id = scheduler
892 .submit(ScheduledTask::new("Zombie".into(), Priority::Normal))
893 .unwrap();
894 let _ = scheduler.next_task();
895
896 thread::sleep(Duration::from_millis(1_100));
898
899 let reaped = scheduler.reap_zombies();
901 assert!(reaped.contains(&id));
902
903 assert!(scheduler.running.lock().get(&id).is_none());
905 }
906
907 #[test]
908 fn test_reap_zombies_no_zombies() {
909 let scheduler = AgentScheduler::new(2, 10_000, 60); let id = scheduler
912 .submit(ScheduledTask::new("Normal".into(), Priority::Normal))
913 .unwrap();
914 let _ = scheduler.next_task();
915
916 let reaped = scheduler.reap_zombies();
918 assert!(reaped.is_empty());
919
920 assert!(scheduler.running.lock().get(&id).is_some());
922 }
923
924 #[test]
925 fn test_rate_limiter_basic() {
926 let mut limiter = RateLimiter::new(60, 3); assert!(limiter.allow());
929 assert!(limiter.allow());
930 assert!(limiter.allow());
931 assert!(!limiter.allow());
933 }
934
935 #[test]
936 fn test_rate_limiter_remaining() {
937 let limiter = RateLimiter::new(60, 3);
938
939 assert_eq!(limiter.remaining(), 3);
940
941 let mut limiter = RateLimiter::new(60, 3);
942 limiter.allow();
943 limiter.allow();
944 assert_eq!(limiter.remaining(), 1);
945 }
946
947 #[test]
948 fn test_rate_limiter_tracks_per_scheduler() {
949 let scheduler = AgentScheduler::new(10, 5, 60); for i in 0..5 {
953 scheduler
954 .submit(ScheduledTask::new(format!("T{}", i), Priority::Normal))
955 .unwrap();
956 let _ = scheduler.next_task();
957 }
958
959 assert!(scheduler.next_task().is_none());
961 assert_eq!(scheduler.rate_limit_remaining(), 0);
962 }
963
964 #[test]
965 fn test_queued_tasks_inspection() {
966 let scheduler = AgentScheduler::new(2, 10_000, 60);
967
968 scheduler
969 .submit(ScheduledTask::new("A".into(), Priority::Low))
970 .unwrap();
971 scheduler
972 .submit(ScheduledTask::new("B".into(), Priority::High))
973 .unwrap();
974 scheduler
975 .submit(ScheduledTask::new("C".into(), Priority::Normal))
976 .unwrap();
977
978 let queued = scheduler.queued_tasks();
979 assert_eq!(queued.len(), 3);
980 assert_eq!(queued.last().unwrap().description, "B");
983 }
984
985 #[test]
986 fn test_running_tasks_inspection() {
987 let scheduler = AgentScheduler::new(2, 10_000, 60);
988
989 scheduler
990 .submit(ScheduledTask::new("R1".into(), Priority::Normal))
991 .unwrap();
992 scheduler
993 .submit(ScheduledTask::new("R2".into(), Priority::Normal))
994 .unwrap();
995
996 let _ = scheduler.next_task();
997 let _ = scheduler.next_task();
998
999 let running = scheduler.running_tasks();
1000 assert_eq!(running.len(), 2);
1001 }
1002
1003 #[test]
1004 fn test_default_scheduler() {
1005 let scheduler = AgentScheduler::default();
1006 let stats = scheduler.stats();
1007 assert_eq!(stats.max_concurrent, 5);
1008 assert_eq!(stats.rate_limit_per_minute, 60);
1009 }
1010
1011 #[test]
1012 fn test_budget_manager_integration_skips_exhausted_agent() {
1013 use crate::budget::{BudgetLimit, BudgetManager};
1014
1015 let scheduler = Arc::new(Mutex::new(AgentScheduler::new(2, 10_000, 60)));
1016 let budget_manager = Arc::new(BudgetManager::new());
1017
1018 let agent_id = AgentId::new_v4();
1020 budget_manager.set_budget(BudgetLimit {
1021 agent_id,
1022 token_budget: 1000,
1023 calls_budget: 1,
1024 window_secs: 60,
1025 });
1026
1027 scheduler
1029 .lock()
1030 .set_budget_manager(Arc::clone(&budget_manager));
1031
1032 scheduler
1034 .lock()
1035 .submit(ScheduledTask::for_agent(
1036 agent_id,
1037 "Task 1".into(),
1038 Priority::Normal,
1039 ))
1040 .unwrap();
1041 scheduler
1042 .lock()
1043 .submit(ScheduledTask::for_agent(
1044 agent_id,
1045 "Task 2".into(),
1046 Priority::Normal,
1047 ))
1048 .unwrap();
1049
1050 let task1 = scheduler.lock().next_task();
1052 assert!(task1.is_some());
1053 scheduler.lock().complete_task(task1.unwrap().id).unwrap();
1054
1055 let task2 = scheduler.lock().next_task();
1057 assert!(task2.is_none());
1058 }
1059
1060 #[test]
1061 fn test_budget_manager_allows_different_agents() {
1062 use crate::budget::{BudgetLimit, BudgetManager};
1063
1064 let scheduler = Arc::new(Mutex::new(AgentScheduler::new(2, 10_000, 60)));
1065 let budget_manager = Arc::new(BudgetManager::new());
1066
1067 let agent1 = AgentId::new_v4();
1068 let agent2 = AgentId::new_v4();
1069
1070 for agent_id in [&agent1, &agent2] {
1072 budget_manager.set_budget(BudgetLimit {
1073 agent_id: *agent_id,
1074 token_budget: 1000,
1075 calls_budget: 3,
1076 window_secs: 60,
1077 });
1078 }
1079
1080 scheduler
1081 .lock()
1082 .set_budget_manager(Arc::clone(&budget_manager));
1083
1084 scheduler
1086 .lock()
1087 .submit(ScheduledTask::for_agent(
1088 agent1,
1089 "A1".into(),
1090 Priority::Normal,
1091 ))
1092 .unwrap();
1093 scheduler
1094 .lock()
1095 .submit(ScheduledTask::for_agent(
1096 agent2,
1097 "B1".into(),
1098 Priority::Normal,
1099 ))
1100 .unwrap();
1101
1102 let t1 = scheduler.lock().next_task().unwrap();
1104 let t2 = scheduler.lock().next_task().unwrap();
1105 assert_ne!(t1.description, t2.description);
1106 }
1107
1108 #[test]
1109 fn test_budget_manager_task_without_agent_id() {
1110 use crate::budget::BudgetManager;
1111
1112 let scheduler = Arc::new(Mutex::new(AgentScheduler::new(2, 10_000, 60)));
1113 let budget_manager = Arc::new(BudgetManager::new());
1114
1115 scheduler
1116 .lock()
1117 .set_budget_manager(Arc::clone(&budget_manager));
1118
1119 scheduler
1121 .lock()
1122 .submit(ScheduledTask::new("No agent".into(), Priority::Normal))
1123 .unwrap();
1124
1125 let task = scheduler.lock().next_task();
1127 assert!(task.is_some());
1128 }
1129
1130 #[test]
1131 fn test_budget_manager_not_set_skips_check() {
1132 let scheduler = Arc::new(Mutex::new(AgentScheduler::new(2, 10_000, 60)));
1133 scheduler
1136 .lock()
1137 .submit(ScheduledTask::new("Any task".into(), Priority::Normal))
1138 .unwrap();
1139
1140 let task = scheduler.lock().next_task();
1142 assert!(task.is_some());
1143 }
1144}