1use crate::budget::BudgetManager;
10use crate::types::AgentId;
11use anyhow::Result;
12use chrono::{DateTime, Utc};
13use parking_lot::Mutex;
14use serde::{Deserialize, Serialize};
15use std::collections::{BinaryHeap, HashMap};
16use std::sync::Arc;
17use uuid::Uuid;
18
/// Urgency of a scheduled task.
///
/// Variants are declared in ascending order of urgency, so the derived
/// `Ord` makes `Critical` the greatest value and `Low` the smallest.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Default)]
pub enum Priority {
    /// Least urgent.
    Low = 0,
    /// The value produced by `Priority::default()`.
    #[default]
    Normal = 1,
    /// More urgent than `Normal`.
    High = 2,
    /// Most urgent.
    Critical = 3,
}
32
/// Lifecycle state recorded on a [`ScheduledTask`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TaskStatus {
    /// Waiting in the scheduler queue; set by the task constructors and by `submit`.
    Queued,
    /// Handed out by the scheduler and tracked in its running set.
    Running,
    /// Finished successfully.
    Completed,
    /// Finished with an error, or reaped as a zombie.
    Failed,
    /// Cancelled before running.
    /// NOTE(review): no code visible in this file assigns this variant —
    /// `cancel_task` simply drops the task. Confirm whether other modules use it.
    Cancelled,
}
47
/// A unit of work tracked by the scheduler.
///
/// The derived `PartialEq`/`Eq` compare every field, whereas the manual
/// `Ord` implementation only looks at `priority` and `created_at`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ScheduledTask {
    /// Unique identifier of the task.
    pub id: Uuid,
    /// Agent the task belongs to, if any; used for budget checks when a
    /// budget manager is configured.
    pub agent_id: Option<AgentId>,
    /// Free-form description of the work.
    pub description: String,
    /// Urgency used for queue ordering.
    pub priority: Priority,
    /// Creation timestamp; breaks ties between tasks of equal priority.
    pub created_at: DateTime<Utc>,
    /// Current lifecycle state.
    pub status: TaskStatus,
    /// Error message, if one was recorded; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}
67
68impl PartialOrd for ScheduledTask {
69 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
70 Some(self.cmp(other))
71 }
72}
73
74impl Ord for ScheduledTask {
75 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
76 self.priority
79 .cmp(&other.priority)
80 .then_with(|| other.created_at.cmp(&self.created_at))
81 }
82}
83
84impl ScheduledTask {
85 pub fn new(description: String, priority: Priority) -> Self {
87 Self {
88 id: Uuid::new_v4(),
89 agent_id: None,
90 description,
91 priority,
92 created_at: Utc::now(),
93 status: TaskStatus::Queued,
94 error: None,
95 }
96 }
97
98 pub fn for_agent(agent_id: AgentId, description: String, priority: Priority) -> Self {
100 Self {
101 id: Uuid::new_v4(),
102 agent_id: Some(agent_id),
103 description,
104 priority,
105 created_at: Utc::now(),
106 status: TaskStatus::Queued,
107 error: None,
108 }
109 }
110}
111
/// Point-in-time snapshot of scheduler counters and configured limits.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchedulerStats {
    /// Number of tasks waiting in the queue.
    pub queued: usize,
    /// Number of tasks currently running.
    pub running: usize,
    /// Number of completed tasks.
    pub completed: usize,
    /// Number of failed tasks.
    pub failed: usize,
    /// Configured cap on simultaneously running tasks.
    pub max_concurrent: usize,
    /// Configured rate limit per minute.
    pub rate_limit_per_minute: u32,
}
128
129impl Default for SchedulerStats {
130 fn default() -> Self {
131 Self {
132 queued: 0,
133 running: 0,
134 completed: 0,
135 failed: 0,
136 max_concurrent: 5,
137 rate_limit_per_minute: 60,
138 }
139 }
140}
141
142#[derive(Debug, Clone)]
144struct RateLimiter {
145 window: Vec<DateTime<Utc>>,
147 window_secs: u64,
149 max_requests: u32,
151}
152
153impl RateLimiter {
154 fn new(window_secs: u64, max_requests: u32) -> Self {
155 Self {
156 window: Vec::new(),
157 window_secs,
158 max_requests,
159 }
160 }
161
162 fn allow(&mut self) -> bool {
164 let now = Utc::now();
165 let cutoff = now - chrono::Duration::seconds(self.window_secs as i64);
166
167 self.window.retain(|t| *t > cutoff);
169
170 if self.window.len() >= self.max_requests as usize {
171 return false;
172 }
173
174 self.window.push(now);
175 true
176 }
177
178 fn remaining(&self) -> u32 {
180 let now = Utc::now();
181 let cutoff = now - chrono::Duration::seconds(self.window_secs as i64);
182 let active = self.window.iter().filter(|t| **t > cutoff).count();
183 self.max_requests.saturating_sub(active as u32)
184 }
185}
186
187pub struct AgentScheduler {
192 queue: Arc<Mutex<BinaryHeap<ScheduledTask>>>,
194 running: Arc<Mutex<HashMap<Uuid, ScheduledTask>>>,
196 max_concurrent: usize,
198 rate_limiter: Arc<Mutex<RateLimiter>>,
200 zombie_timeout_secs: u64,
202 task_start_times: Arc<Mutex<HashMap<Uuid, DateTime<Utc>>>>,
204 budget_manager: Option<Arc<BudgetManager>>,
206}
207
208impl AgentScheduler {
209 pub fn new(
216 max_concurrent: usize,
217 rate_limit_per_minute: u32,
218 zombie_timeout_secs: u64,
219 ) -> Self {
220 Self {
221 queue: Arc::new(Mutex::new(BinaryHeap::new())),
222 running: Arc::new(Mutex::new(HashMap::new())),
223 max_concurrent,
224 rate_limiter: Arc::new(Mutex::new(RateLimiter::new(60, rate_limit_per_minute))),
225 zombie_timeout_secs,
226 task_start_times: Arc::new(Mutex::new(HashMap::new())),
227 budget_manager: None,
228 }
229 }
230
231 pub fn set_budget_manager(&mut self, bm: Arc<BudgetManager>) {
239 self.budget_manager = Some(bm);
240 }
241
242 pub fn submit(&self, mut task: ScheduledTask) -> Result<Uuid> {
246 task.status = TaskStatus::Queued;
247 let id = task.id;
248
249 let mut queue = self.queue.lock();
250 queue.push(task); tracing::debug!(
253 task_id = %id,
254 queue_len = queue.len(),
255 "Task submitted to scheduler"
256 );
257
258 Ok(id)
259 }
260
261 pub fn next_task(&self) -> Option<ScheduledTask> {
268 {
270 let running = self.running.lock();
271 if running.len() >= self.max_concurrent {
272 tracing::debug!(
273 running = running.len(),
274 max = self.max_concurrent,
275 "Max concurrent limit reached"
276 );
277 return None;
278 }
279 }
280
281 {
283 let mut limiter = self.rate_limiter.lock();
284 if !limiter.allow() {
285 tracing::debug!(remaining = limiter.remaining(), "Rate limit exceeded");
286 return None;
287 }
288 }
289
290 let mut discarded: usize = 0;
292 let mut task = loop {
293 let task_opt = {
294 let mut queue = self.queue.lock();
295 queue.pop() };
297
298 match task_opt {
299 Some(t) => {
300 if let (Some(ref bm), Some(ref agent_id)) = (&self.budget_manager, &t.agent_id)
302 {
303 if !bm.can_schedule(agent_id) {
304 tracing::warn!(
305 agent_id = %agent_id,
306 "Agent budget exhausted, skipping task"
307 );
308 discarded += 1;
309 continue; }
311 }
312 break t;
313 }
314 None => {
315 if discarded > 0 {
316 tracing::info!(discarded, "All queued tasks had exhausted budgets");
317 }
318 return None;
319 }
320 }
321 };
322
323 if discarded > 0 {
324 tracing::info!(discarded, "Skipped tasks with exhausted budgets");
325 }
326
327 task.status = TaskStatus::Running;
328
329 {
331 let mut start_times = self.task_start_times.lock();
332 start_times.insert(task.id, Utc::now());
333 }
334
335 {
337 let mut running = self.running.lock();
338 running.insert(task.id, task.clone());
339 }
340
341 tracing::info!(
342 task_id = %task.id,
343 priority = ?task.priority,
344 running = self.running.lock().len(),
345 "Task started by scheduler"
346 );
347
348 if let (Some(ref bm), Some(ref agent_id)) = (&self.budget_manager, &task.agent_id) {
350 if let Err(e) = bm.track_call(agent_id) {
351 tracing::warn!(
352 agent_id = %agent_id,
353 error = %e,
354 "Budget exceeded during task track_call"
355 );
356 }
357 }
358
359 Some(task)
360 }
361
362 pub fn complete_task(&self, task_id: Uuid) -> Result<()> {
366 let task = {
367 let mut running = self.running.lock();
368 running.remove(&task_id)
369 };
370
371 match task {
372 Some(mut t) => {
373 t.status = TaskStatus::Completed;
374
375 {
377 let mut start_times = self.task_start_times.lock();
378 start_times.remove(&task_id);
379 }
380
381 tracing::info!(task_id = %task_id, "Task completed");
382 Ok(())
383 }
384 None => {
385 tracing::warn!(task_id = %task_id, "Attempted to complete unknown task");
386 Err(anyhow::anyhow!("task not found"))
387 }
388 }
389 }
390
391 pub fn fail_task(&self, task_id: Uuid, error: &str) -> Result<()> {
395 let task = {
396 let mut running = self.running.lock();
397 running.remove(&task_id)
398 };
399
400 match task {
401 Some(mut t) => {
402 t.status = TaskStatus::Failed;
403 t.error = Some(error.to_string());
404
405 {
407 let mut start_times = self.task_start_times.lock();
408 start_times.remove(&task_id);
409 }
410
411 tracing::warn!(task_id = %task_id, error = %error, "Task failed");
412 Ok(())
413 }
414 None => {
415 tracing::warn!(task_id = %task_id, "Attempted to fail unknown task");
416 Err(anyhow::anyhow!("task not found"))
417 }
418 }
419 }
420
421 pub fn reap_zombies(&self) -> Vec<Uuid> {
425 let now = Utc::now();
426 let timeout = chrono::Duration::seconds(self.zombie_timeout_secs as i64);
427 let mut start_times = self.task_start_times.lock();
428 let mut running = self.running.lock();
429 let mut reaped = Vec::new();
430
431 let zombie_ids: Vec<Uuid> = start_times
432 .iter()
433 .filter(|(_, start)| now - **start > timeout)
434 .map(|(id, _)| *id)
435 .collect();
436
437 for id in zombie_ids {
438 if let Some(mut task) = running.remove(&id) {
439 task.status = TaskStatus::Failed;
440 task.error = Some(format!(
441 "zombie: ran for >{} seconds",
442 self.zombie_timeout_secs
443 ));
444 reaped.push(id);
445 tracing::warn!(
446 task_id = %id,
447 duration_secs = self.zombie_timeout_secs,
448 "Zombie task reaped"
449 );
450 }
451 start_times.remove(&id);
453 }
454
455 reaped
456 }
457
458 pub fn start_task(&self, task_id: Uuid) -> Result<()> {
463 let task = {
464 let mut queue = self.queue.lock();
465 let all: Vec<ScheduledTask> = queue.drain().collect();
466 let mut found: Option<ScheduledTask> = None;
467 let remaining: Vec<ScheduledTask> = all
468 .into_iter()
469 .filter(|t| {
470 if t.id == task_id {
471 found = Some(t.clone());
472 false
473 } else {
474 true
475 }
476 })
477 .collect();
478 *queue = remaining.into_iter().collect();
479 found
480 };
481
482 match task {
483 Some(mut task) => {
484 task.status = TaskStatus::Running;
485 let mut start_times = self.task_start_times.lock();
486 start_times.insert(task.id, Utc::now());
487 let mut running = self.running.lock();
488 running.insert(task.id, task);
489 Ok(())
490 }
491 None => Err(anyhow::anyhow!("task {} not found in queue", task_id)),
492 }
493 }
494
495 pub fn cancel_task(&self, task_id: Uuid) -> Result<()> {
499 let mut queue = self.queue.lock();
500 let all: Vec<ScheduledTask> = queue.drain().collect();
501 let mut found = false;
502 let remaining: Vec<ScheduledTask> = all
503 .into_iter()
504 .filter(|t| {
505 if t.id == task_id && t.status == TaskStatus::Queued {
506 found = true;
507 false
508 } else {
509 true
510 }
511 })
512 .collect();
513 *queue = remaining.into_iter().collect();
514
515 if found {
516 tracing::info!(task_id = %task_id, "Task cancelled from queue");
517 Ok(())
518 } else {
519 tracing::warn!(task_id = %task_id, "Task not found in queue for cancellation");
520 Err(anyhow::anyhow!("task not found in queue"))
521 }
522 }
523
524 pub fn stats(&self) -> SchedulerStats {
526 let queue = self.queue.lock();
527 let running = self.running.lock();
528 let rate_limiter = self.rate_limiter.lock();
529
530 let _completed = 0usize;
531 let _failed = 0usize;
532
533 SchedulerStats {
534 queued: queue.len(),
535 running: running.len(),
536 completed: _completed,
537 failed: _failed,
538 max_concurrent: self.max_concurrent,
539 rate_limit_per_minute: rate_limiter.max_requests,
540 }
541 }
542
543 pub fn rate_limit_remaining(&self) -> u32 {
545 self.rate_limiter.lock().remaining()
546 }
547
548 pub fn queued_tasks(&self) -> Vec<ScheduledTask> {
550 let heap = self.queue.lock();
551 let mut tasks: Vec<ScheduledTask> = heap.iter().cloned().collect();
552 tasks.sort_by_key(|a| a.priority);
555 tasks
556 }
557
558 pub fn running_tasks(&self) -> Vec<ScheduledTask> {
560 self.running.lock().values().cloned().collect()
561 }
562}
563
564impl Default for AgentScheduler {
565 fn default() -> Self {
566 Self::new(5, 60, 300)
567 }
568}
569
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Duration;

    /// Rate limit high enough that it never interferes with a test.
    const UNTHROTTLED: u32 = 10_000;

    /// Scheduler whose rate limit is effectively disabled.
    fn unthrottled(max_concurrent: usize, zombie_timeout_secs: u64) -> AgentScheduler {
        AgentScheduler::new(max_concurrent, UNTHROTTLED, zombie_timeout_secs)
    }

    /// Submits an agent-less task and returns its id.
    fn enqueue(scheduler: &AgentScheduler, description: &str, priority: Priority) -> Uuid {
        scheduler
            .submit(ScheduledTask::new(description.into(), priority))
            .unwrap()
    }

    /// Submits a task bound to `agent_id` and returns its id.
    fn enqueue_for(
        scheduler: &AgentScheduler,
        agent_id: AgentId,
        description: &str,
        priority: Priority,
    ) -> Uuid {
        scheduler
            .submit(ScheduledTask::for_agent(
                agent_id,
                description.into(),
                priority,
            ))
            .unwrap()
    }

    #[test]
    fn test_task_creation() {
        let task = ScheduledTask::new("Test task".into(), Priority::Normal);
        assert_eq!(task.status, TaskStatus::Queued);
        assert!(task.agent_id.is_none());
        assert!(task.error.is_none());
    }

    #[test]
    fn test_task_creation_for_agent() {
        let agent_id = AgentId::new_v4();
        let task = ScheduledTask::for_agent(agent_id, "Agent task".into(), Priority::High);
        assert_eq!(task.agent_id, Some(agent_id));
        assert_eq!(task.priority, Priority::High);
    }

    #[test]
    fn test_priority_ordering() {
        // Adjacent pairs.
        assert!(Priority::Critical > Priority::High);
        assert!(Priority::High > Priority::Normal);
        assert!(Priority::Normal > Priority::Low);
        // Non-adjacent pairs.
        assert!(Priority::Critical > Priority::Normal);
        assert!(Priority::Critical > Priority::Low);
        assert!(Priority::High > Priority::Low);
    }

    #[test]
    fn test_priority_ordering_eq() {
        assert_eq!(Priority::Low, Priority::Low);
        assert_eq!(Priority::Normal, Priority::Normal);
        assert_eq!(Priority::High, Priority::High);
        assert_eq!(Priority::Critical, Priority::Critical);
    }

    #[test]
    fn test_submit_and_next_high_priority_first() {
        let scheduler = unthrottled(10, 60);

        enqueue(&scheduler, "Low priority", Priority::Low);
        enqueue(&scheduler, "High priority", Priority::High);
        enqueue(&scheduler, "Normal priority", Priority::Normal);

        for expected in [Priority::High, Priority::Normal, Priority::Low] {
            let next = scheduler.next_task().unwrap();
            assert_eq!(next.priority, expected);
        }
    }

    #[test]
    fn test_submit_and_next_critical_first() {
        let scheduler = unthrottled(10, 60);

        enqueue(&scheduler, "Low", Priority::Low);
        enqueue(&scheduler, "Normal", Priority::Normal);
        enqueue(&scheduler, "High", Priority::High);
        enqueue(&scheduler, "Critical", Priority::Critical);

        let expected_order = [
            Priority::Critical,
            Priority::High,
            Priority::Normal,
            Priority::Low,
        ];
        for expected in expected_order {
            let next = scheduler.next_task().unwrap();
            assert_eq!(next.priority, expected);
        }
    }

    #[test]
    fn test_submit_multiple_same_priority() {
        let scheduler = unthrottled(10, 60);

        for description in ["First", "Second", "Third"] {
            enqueue(&scheduler, description, Priority::Normal);
        }

        // Only the set of dispatched tasks is checked, not their order.
        let mut descriptions = Vec::new();
        for _ in 0..3 {
            let next = scheduler.next_task().unwrap();
            assert_eq!(next.priority, Priority::Normal);
            descriptions.push(next.description);
        }
        descriptions.sort();
        assert_eq!(descriptions, vec!["First", "Second", "Third"]);
    }

    #[test]
    fn test_max_concurrent_blocks() {
        let scheduler = unthrottled(2, 60);

        for description in ["Task 1", "Task 2", "Task 3"] {
            enqueue(&scheduler, description, Priority::Normal);
        }

        assert!(scheduler.next_task().is_some());
        assert!(scheduler.next_task().is_some());
        // Third dispatch is refused: both slots are taken.
        assert!(scheduler.next_task().is_none());
    }

    #[test]
    fn test_max_concurrent_allows_when_slot_frees() {
        let scheduler = unthrottled(2, 60);

        enqueue(&scheduler, "Task 1", Priority::Normal);
        enqueue(&scheduler, "Task 2", Priority::Normal);

        let t1 = scheduler.next_task().unwrap();
        let t2 = scheduler.next_task().unwrap();
        assert!(scheduler.next_task().is_none());

        scheduler.complete_task(t1.id).unwrap();
        scheduler.complete_task(t2.id).unwrap();

        enqueue(&scheduler, "Task 3", Priority::Normal);

        let task = scheduler.next_task().unwrap();
        assert_eq!(task.description, "Task 3");

        scheduler.complete_task(task.id).unwrap();
    }

    #[test]
    fn test_complete_task_removes_from_running() {
        let scheduler = unthrottled(2, 60);
        let id = enqueue(&scheduler, "Test", Priority::Normal);

        let _ = scheduler.next_task();
        scheduler.complete_task(id).unwrap();

        assert_eq!(scheduler.stats().running, 0);
    }

    #[test]
    fn test_complete_unknown_task_returns_error() {
        let scheduler = unthrottled(2, 60);
        assert!(scheduler.complete_task(Uuid::new_v4()).is_err());
    }

    #[test]
    fn test_fail_task_sets_error() {
        let scheduler = unthrottled(2, 60);
        let id = enqueue(&scheduler, "Test", Priority::Normal);

        let _ = scheduler.next_task();
        scheduler.fail_task(id, "Something went wrong").unwrap();

        assert!(!scheduler.running.lock().contains_key(&id));
    }

    #[test]
    fn test_cancel_queued_task() {
        let scheduler = unthrottled(2, 60);
        let id = enqueue(&scheduler, "To cancel", Priority::Normal);

        scheduler.cancel_task(id).unwrap();

        assert!(scheduler.next_task().is_none());
    }

    #[test]
    fn test_cancel_running_task_fails() {
        let scheduler = unthrottled(2, 60);
        let id = enqueue(&scheduler, "Running", Priority::Normal);

        // Once dispatched the task is no longer in the queue.
        let _ = scheduler.next_task();

        assert!(scheduler.cancel_task(id).is_err());
    }

    #[test]
    fn test_cancel_unknown_task_fails() {
        let scheduler = unthrottled(2, 60);
        assert!(scheduler.cancel_task(Uuid::new_v4()).is_err());
    }

    #[test]
    fn test_stats_tracking() {
        let scheduler = AgentScheduler::new(2, 60, 60);

        let id1 = enqueue(&scheduler, "Queued", Priority::Normal);
        enqueue(&scheduler, "Queued 2", Priority::Low);

        let started = scheduler.next_task().unwrap();
        assert_eq!(started.id, id1);

        let stats = scheduler.stats();
        assert_eq!(stats.queued, 1);
        assert_eq!(stats.running, 1);
        assert_eq!(stats.max_concurrent, 2);
        assert_eq!(stats.rate_limit_per_minute, 60);
    }

    #[test]
    fn test_reap_zombies() {
        // One-second timeout so the task expires during the sleep below.
        let scheduler = unthrottled(2, 1);
        let id = enqueue(&scheduler, "Zombie", Priority::Normal);
        let _ = scheduler.next_task();

        thread::sleep(Duration::from_secs(2));

        let reaped = scheduler.reap_zombies();
        assert!(reaped.contains(&id));

        assert!(scheduler.running.lock().get(&id).is_none());
    }

    #[test]
    fn test_reap_zombies_no_zombies() {
        let scheduler = unthrottled(2, 60);
        let id = enqueue(&scheduler, "Normal", Priority::Normal);
        let _ = scheduler.next_task();

        let reaped = scheduler.reap_zombies();
        assert!(reaped.is_empty());

        assert!(scheduler.running.lock().get(&id).is_some());
    }

    #[test]
    fn test_rate_limiter_basic() {
        let mut limiter = RateLimiter::new(60, 3);

        for _ in 0..3 {
            assert!(limiter.allow());
        }
        assert!(!limiter.allow());
    }

    #[test]
    fn test_rate_limiter_remaining() {
        let untouched = RateLimiter::new(60, 3);
        assert_eq!(untouched.remaining(), 3);

        let mut used = RateLimiter::new(60, 3);
        used.allow();
        used.allow();
        assert_eq!(used.remaining(), 1);
    }

    #[test]
    fn test_rate_limiter_tracks_per_scheduler() {
        let scheduler = AgentScheduler::new(10, 5, 60);

        for i in 0..5 {
            scheduler
                .submit(ScheduledTask::new(format!("T{}", i), Priority::Normal))
                .unwrap();
            let _ = scheduler.next_task();
        }

        assert!(scheduler.next_task().is_none());
        assert_eq!(scheduler.rate_limit_remaining(), 0);
    }

    #[test]
    fn test_queued_tasks_inspection() {
        let scheduler = unthrottled(2, 60);

        enqueue(&scheduler, "A", Priority::Low);
        enqueue(&scheduler, "B", Priority::High);
        enqueue(&scheduler, "C", Priority::Normal);

        let queued = scheduler.queued_tasks();
        assert_eq!(queued.len(), 3);
        // Sorted ascending by priority, so the highest comes last.
        assert_eq!(queued.last().unwrap().description, "B");
    }

    #[test]
    fn test_running_tasks_inspection() {
        let scheduler = unthrottled(2, 60);

        enqueue(&scheduler, "R1", Priority::Normal);
        enqueue(&scheduler, "R2", Priority::Normal);

        let _ = scheduler.next_task();
        let _ = scheduler.next_task();

        assert_eq!(scheduler.running_tasks().len(), 2);
    }

    #[test]
    fn test_default_scheduler() {
        let stats = AgentScheduler::default().stats();
        assert_eq!(stats.max_concurrent, 5);
        assert_eq!(stats.rate_limit_per_minute, 60);
    }

    #[test]
    fn test_budget_manager_integration_skips_exhausted_agent() {
        use crate::budget::{BudgetLimit, BudgetManager};

        let mut scheduler = unthrottled(2, 60);
        let budget_manager = Arc::new(BudgetManager::new());

        // A single allowed call: the second task must be refused.
        let agent_id = AgentId::new_v4();
        budget_manager.set_budget(BudgetLimit {
            agent_id,
            token_budget: 1000,
            calls_budget: 1,
            window_secs: 60,
        });

        scheduler.set_budget_manager(Arc::clone(&budget_manager));

        enqueue_for(&scheduler, agent_id, "Task 1", Priority::Normal);
        enqueue_for(&scheduler, agent_id, "Task 2", Priority::Normal);

        let task1 = scheduler.next_task();
        assert!(task1.is_some());
        scheduler.complete_task(task1.unwrap().id).unwrap();

        let task2 = scheduler.next_task();
        assert!(task2.is_none());
    }

    #[test]
    fn test_budget_manager_allows_different_agents() {
        use crate::budget::{BudgetLimit, BudgetManager};

        let mut scheduler = unthrottled(2, 60);
        let budget_manager = Arc::new(BudgetManager::new());

        let agent1 = AgentId::new_v4();
        let agent2 = AgentId::new_v4();

        for agent_id in [agent1, agent2] {
            budget_manager.set_budget(BudgetLimit {
                agent_id,
                token_budget: 1000,
                calls_budget: 3,
                window_secs: 60,
            });
        }

        scheduler.set_budget_manager(Arc::clone(&budget_manager));

        enqueue_for(&scheduler, agent1, "A1", Priority::Normal);
        enqueue_for(&scheduler, agent2, "B1", Priority::Normal);

        let t1 = scheduler.next_task().unwrap();
        let t2 = scheduler.next_task().unwrap();
        assert_ne!(t1.description, t2.description);
    }

    #[test]
    fn test_budget_manager_task_without_agent_id() {
        use crate::budget::BudgetManager;

        let mut scheduler = unthrottled(2, 60);
        let budget_manager = Arc::new(BudgetManager::new());

        scheduler.set_budget_manager(Arc::clone(&budget_manager));

        enqueue(&scheduler, "No agent", Priority::Normal);

        assert!(scheduler.next_task().is_some());
    }

    #[test]
    fn test_budget_manager_not_set_skips_check() {
        let scheduler = unthrottled(2, 60);

        enqueue(&scheduler, "Any task", Priority::Normal);

        assert!(scheduler.next_task().is_some());
    }
}