1use crate::budget::BudgetManager;
10use crate::types::AgentId;
11use anyhow::Result;
12use chrono::{DateTime, Utc};
13use parking_lot::Mutex;
14use serde::{Deserialize, Serialize};
15use std::collections::{BinaryHeap, HashMap};
16use std::sync::Arc;
17use uuid::Uuid;
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Default)]
21pub enum Priority {
22 Low = 0,
24 #[default]
26 Normal = 1,
27 High = 2,
29 Critical = 3,
31}
32
33#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
35pub enum TaskStatus {
36 Queued,
38 Running,
40 Completed,
42 Failed,
44 Cancelled,
46}
47
48#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
50pub struct ScheduledTask {
51 pub id: Uuid,
53 pub agent_id: Option<AgentId>,
55 pub description: String,
57 pub priority: Priority,
59 pub created_at: DateTime<Utc>,
61 pub status: TaskStatus,
63 #[serde(skip_serializing_if = "Option::is_none")]
65 pub error: Option<String>,
66}
67
68impl PartialOrd for ScheduledTask {
69 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
70 Some(self.cmp(other))
71 }
72}
73
74impl Ord for ScheduledTask {
75 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
76 self.priority
79 .cmp(&other.priority)
80 .then_with(|| other.created_at.cmp(&self.created_at))
81 }
82}
83
84impl ScheduledTask {
85 pub fn new(description: String, priority: Priority) -> Self {
87 Self {
88 id: Uuid::new_v4(),
89 agent_id: None,
90 description,
91 priority,
92 created_at: Utc::now(),
93 status: TaskStatus::Queued,
94 error: None,
95 }
96 }
97
98 pub fn for_agent(agent_id: AgentId, description: String, priority: Priority) -> Self {
100 Self {
101 id: Uuid::new_v4(),
102 agent_id: Some(agent_id),
103 description,
104 priority,
105 created_at: Utc::now(),
106 status: TaskStatus::Queued,
107 error: None,
108 }
109 }
110}
111
112#[derive(Debug, Clone, Serialize, Deserialize)]
114pub struct SchedulerStats {
115 pub queued: usize,
117 pub running: usize,
119 pub completed: usize,
121 pub failed: usize,
123 pub max_concurrent: usize,
125 pub rate_limit_per_minute: u32,
127 pub rate_remaining: u32,
129}
130
131impl Default for SchedulerStats {
132 fn default() -> Self {
133 Self {
134 queued: 0,
135 running: 0,
136 completed: 0,
137 failed: 0,
138 max_concurrent: 5,
139 rate_limit_per_minute: 60,
140 rate_remaining: 60,
141 }
142 }
143}
144
145#[derive(Debug, Clone)]
147struct RateLimiter {
148 window: Vec<DateTime<Utc>>,
150 window_secs: u64,
152 max_requests: u32,
154}
155
156impl RateLimiter {
157 fn new(window_secs: u64, max_requests: u32) -> Self {
158 Self {
159 window: Vec::new(),
160 window_secs,
161 max_requests,
162 }
163 }
164
165 fn allow(&mut self) -> bool {
167 let now = Utc::now();
168 let cutoff = now - chrono::Duration::seconds(self.window_secs as i64);
169
170 self.window.retain(|t| *t > cutoff);
172
173 if self.window.len() >= self.max_requests as usize {
174 return false;
175 }
176
177 self.window.push(now);
178 true
179 }
180
181 fn remaining(&self) -> u32 {
183 let now = Utc::now();
184 let cutoff = now - chrono::Duration::seconds(self.window_secs as i64);
185 let active = self.window.iter().filter(|t| **t > cutoff).count();
186 self.max_requests.saturating_sub(active as u32)
187 }
188}
189
190pub struct AgentScheduler {
195 queue: Arc<Mutex<BinaryHeap<ScheduledTask>>>,
197 running: Arc<Mutex<HashMap<Uuid, ScheduledTask>>>,
199 max_concurrent: usize,
201 rate_limiter: Arc<Mutex<RateLimiter>>,
203 zombie_timeout_secs: u64,
205 task_start_times: Arc<Mutex<HashMap<Uuid, DateTime<Utc>>>>,
207 budget_manager: Option<Arc<BudgetManager>>,
209}
210
211impl AgentScheduler {
212 pub fn new(
219 max_concurrent: usize,
220 rate_limit_per_minute: u32,
221 zombie_timeout_secs: u64,
222 ) -> Self {
223 Self {
224 queue: Arc::new(Mutex::new(BinaryHeap::new())),
225 running: Arc::new(Mutex::new(HashMap::new())),
226 max_concurrent,
227 rate_limiter: Arc::new(Mutex::new(RateLimiter::new(60, rate_limit_per_minute))),
228 zombie_timeout_secs,
229 task_start_times: Arc::new(Mutex::new(HashMap::new())),
230 budget_manager: None,
231 }
232 }
233
234 pub fn set_budget_manager(&mut self, bm: Arc<BudgetManager>) {
242 self.budget_manager = Some(bm);
243 }
244
245 pub fn submit(&self, mut task: ScheduledTask) -> Result<Uuid> {
249 task.status = TaskStatus::Queued;
250 let id = task.id;
251
252 let mut queue = self.queue.lock();
253 queue.push(task); tracing::debug!(
256 task_id = %id,
257 queue_len = queue.len(),
258 "Task submitted to scheduler"
259 );
260
261 Ok(id)
262 }
263
264 pub fn next_task(&self) -> Option<ScheduledTask> {
271 {
273 let running = self.running.lock();
274 if running.len() >= self.max_concurrent {
275 tracing::debug!(
276 running = running.len(),
277 max = self.max_concurrent,
278 "Max concurrent limit reached"
279 );
280 return None;
281 }
282 }
283
284 {
286 let mut limiter = self.rate_limiter.lock();
287 if !limiter.allow() {
288 tracing::debug!(remaining = limiter.remaining(), "Rate limit exceeded");
289 return None;
290 }
291 }
292
293 let mut discarded: usize = 0;
295 let mut task = loop {
296 let task_opt = {
297 let mut queue = self.queue.lock();
298 queue.pop() };
300
301 match task_opt {
302 Some(t) => {
303 if let (Some(ref bm), Some(ref agent_id)) = (&self.budget_manager, &t.agent_id)
305 {
306 if !bm.can_schedule(agent_id) {
307 tracing::warn!(
308 agent_id = %agent_id,
309 "Agent budget exhausted, skipping task"
310 );
311 discarded += 1;
312 continue; }
314 }
315 break t;
316 }
317 None => {
318 if discarded > 0 {
319 tracing::info!(discarded, "All queued tasks had exhausted budgets");
320 }
321 return None;
322 }
323 }
324 };
325
326 if discarded > 0 {
327 tracing::info!(discarded, "Skipped tasks with exhausted budgets");
328 }
329
330 task.status = TaskStatus::Running;
331
332 {
334 let mut start_times = self.task_start_times.lock();
335 start_times.insert(task.id, Utc::now());
336 }
337
338 {
340 let mut running = self.running.lock();
341 running.insert(task.id, task.clone());
342 }
343
344 tracing::info!(
345 task_id = %task.id,
346 priority = ?task.priority,
347 running = self.running.lock().len(),
348 "Task started by scheduler"
349 );
350
351 if let (Some(ref bm), Some(ref agent_id)) = (&self.budget_manager, &task.agent_id) {
353 if let Err(e) = bm.track_call(agent_id) {
354 tracing::warn!(
355 agent_id = %agent_id,
356 error = %e,
357 "Budget exceeded during task track_call"
358 );
359 }
360 }
361
362 Some(task)
363 }
364
365 pub fn complete_task(&self, task_id: Uuid) -> Result<()> {
369 let task = {
370 let mut running = self.running.lock();
371 running.remove(&task_id)
372 };
373
374 match task {
375 Some(mut t) => {
376 t.status = TaskStatus::Completed;
377
378 {
380 let mut start_times = self.task_start_times.lock();
381 start_times.remove(&task_id);
382 }
383
384 tracing::info!(task_id = %task_id, "Task completed");
385 Ok(())
386 }
387 None => {
388 tracing::warn!(task_id = %task_id, "Attempted to complete unknown task");
389 Err(anyhow::anyhow!("task not found"))
390 }
391 }
392 }
393
394 pub fn fail_task(&self, task_id: Uuid, error: &str) -> Result<()> {
398 let task = {
399 let mut running = self.running.lock();
400 running.remove(&task_id)
401 };
402
403 match task {
404 Some(mut t) => {
405 t.status = TaskStatus::Failed;
406 t.error = Some(error.to_string());
407
408 {
410 let mut start_times = self.task_start_times.lock();
411 start_times.remove(&task_id);
412 }
413
414 tracing::warn!(task_id = %task_id, error = %error, "Task failed");
415 Ok(())
416 }
417 None => {
418 tracing::warn!(task_id = %task_id, "Attempted to fail unknown task");
419 Err(anyhow::anyhow!("task not found"))
420 }
421 }
422 }
423
424 pub fn reap_zombies(&self) -> Vec<Uuid> {
428 let now = Utc::now();
429 let timeout = chrono::Duration::seconds(self.zombie_timeout_secs as i64);
430 let mut start_times = self.task_start_times.lock();
431 let mut running = self.running.lock();
432 let mut reaped = Vec::new();
433
434 let zombie_ids: Vec<Uuid> = start_times
435 .iter()
436 .filter(|(_, start)| now - **start > timeout)
437 .map(|(id, _)| *id)
438 .collect();
439
440 for id in zombie_ids {
441 if let Some(mut task) = running.remove(&id) {
442 task.status = TaskStatus::Failed;
443 task.error = Some(format!(
444 "zombie: ran for >{} seconds",
445 self.zombie_timeout_secs
446 ));
447 reaped.push(id);
448 tracing::warn!(
449 task_id = %id,
450 duration_secs = self.zombie_timeout_secs,
451 "Zombie task reaped"
452 );
453 }
454 start_times.remove(&id);
456 }
457
458 reaped
459 }
460
461 pub fn start_task(&self, task_id: Uuid) -> Result<()> {
466 let task = {
467 let mut queue = self.queue.lock();
468 let all: Vec<ScheduledTask> = queue.drain().collect();
469 let mut found: Option<ScheduledTask> = None;
470 let remaining: Vec<ScheduledTask> = all
471 .into_iter()
472 .filter(|t| {
473 if t.id == task_id {
474 found = Some(t.clone());
475 false
476 } else {
477 true
478 }
479 })
480 .collect();
481 *queue = remaining.into_iter().collect();
482 found
483 };
484
485 match task {
486 Some(mut task) => {
487 task.status = TaskStatus::Running;
488 let mut start_times = self.task_start_times.lock();
489 start_times.insert(task.id, Utc::now());
490 let mut running = self.running.lock();
491 running.insert(task.id, task);
492 Ok(())
493 }
494 None => Err(anyhow::anyhow!("task {} not found in queue", task_id)),
495 }
496 }
497
498 pub fn cancel_task(&self, task_id: Uuid) -> Result<()> {
502 let mut queue = self.queue.lock();
503 let all: Vec<ScheduledTask> = queue.drain().collect();
504 let mut found = false;
505 let remaining: Vec<ScheduledTask> = all
506 .into_iter()
507 .filter(|t| {
508 if t.id == task_id && t.status == TaskStatus::Queued {
509 found = true;
510 false
511 } else {
512 true
513 }
514 })
515 .collect();
516 *queue = remaining.into_iter().collect();
517
518 if found {
519 tracing::info!(task_id = %task_id, "Task cancelled from queue");
520 Ok(())
521 } else {
522 tracing::warn!(task_id = %task_id, "Task not found in queue for cancellation");
523 Err(anyhow::anyhow!("task not found in queue"))
524 }
525 }
526
527 pub fn stats(&self) -> SchedulerStats {
529 let queue = self.queue.lock();
530 let running = self.running.lock();
531 let rate_limiter = self.rate_limiter.lock();
532
533 let _completed = 0usize;
534 let _failed = 0usize;
535
536 SchedulerStats {
537 queued: queue.len(),
538 running: running.len(),
539 completed: _completed,
540 failed: _failed,
541 max_concurrent: self.max_concurrent,
542 rate_limit_per_minute: rate_limiter.max_requests,
543 rate_remaining: rate_limiter.remaining(),
544 }
545 }
546
547 pub fn rate_limit_remaining(&self) -> u32 {
549 self.rate_limiter.lock().remaining()
550 }
551
552 pub fn queued_tasks(&self) -> Vec<ScheduledTask> {
554 let heap = self.queue.lock();
555 let mut tasks: Vec<ScheduledTask> = heap.iter().cloned().collect();
556 tasks.sort_by_key(|a| a.priority);
559 tasks
560 }
561
562 pub fn running_tasks(&self) -> Vec<ScheduledTask> {
564 self.running.lock().values().cloned().collect()
565 }
566}
567
568impl Default for AgentScheduler {
569 fn default() -> Self {
570 Self::new(5, 60, 300)
571 }
572}
573
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Duration;

    /// Submits an agent-less task and returns its id.
    fn enqueue(scheduler: &AgentScheduler, description: &str, priority: Priority) -> Uuid {
        scheduler
            .submit(ScheduledTask::new(description.to_string(), priority))
            .unwrap()
    }

    /// Submits a task owned by `agent_id` and returns its id.
    fn enqueue_for(
        scheduler: &AgentScheduler,
        agent_id: AgentId,
        description: &str,
        priority: Priority,
    ) -> Uuid {
        scheduler
            .submit(ScheduledTask::for_agent(
                agent_id,
                description.to_string(),
                priority,
            ))
            .unwrap()
    }

    #[test]
    fn test_task_creation() {
        let task = ScheduledTask::new("Test task".into(), Priority::Normal);
        assert_eq!(task.status, TaskStatus::Queued);
        assert!(task.agent_id.is_none());
        assert!(task.error.is_none());
    }

    #[test]
    fn test_task_creation_for_agent() {
        let agent_id = AgentId::new_v4();
        let task = ScheduledTask::for_agent(agent_id, "Agent task".into(), Priority::High);
        assert_eq!(task.agent_id, Some(agent_id));
        assert_eq!(task.priority, Priority::High);
    }

    #[test]
    fn test_priority_ordering() {
        // Every later entry must rank strictly above every earlier one.
        let ascending = [
            Priority::Low,
            Priority::Normal,
            Priority::High,
            Priority::Critical,
        ];
        for (idx, lower) in ascending.iter().enumerate() {
            for higher in &ascending[idx + 1..] {
                assert!(higher > lower);
            }
        }
    }

    #[test]
    fn test_priority_ordering_eq() {
        for priority in [
            Priority::Low,
            Priority::Normal,
            Priority::High,
            Priority::Critical,
        ] {
            assert_eq!(priority, priority);
        }
    }

    #[test]
    fn test_submit_and_next_high_priority_first() {
        let scheduler = AgentScheduler::new(10, 10_000, 60);

        enqueue(&scheduler, "Low priority", Priority::Low);
        enqueue(&scheduler, "High priority", Priority::High);
        enqueue(&scheduler, "Normal priority", Priority::Normal);

        for expected in [Priority::High, Priority::Normal, Priority::Low] {
            let next = scheduler.next_task().unwrap();
            assert_eq!(next.priority, expected);
        }
    }

    #[test]
    fn test_submit_and_next_critical_first() {
        let scheduler = AgentScheduler::new(10, 10_000, 60);

        enqueue(&scheduler, "Low", Priority::Low);
        enqueue(&scheduler, "Normal", Priority::Normal);
        enqueue(&scheduler, "High", Priority::High);
        enqueue(&scheduler, "Critical", Priority::Critical);

        for expected in [
            Priority::Critical,
            Priority::High,
            Priority::Normal,
            Priority::Low,
        ] {
            let next = scheduler.next_task().unwrap();
            assert_eq!(next.priority, expected);
        }
    }

    #[test]
    fn test_submit_multiple_same_priority() {
        let scheduler = AgentScheduler::new(10, 10_000, 60);

        for description in ["First", "Second", "Third"] {
            enqueue(&scheduler, description, Priority::Normal);
        }

        // Order among equal priorities is not asserted; only the set of tasks is.
        let mut descriptions = Vec::new();
        for _ in 0..3 {
            let next = scheduler.next_task().unwrap();
            assert_eq!(next.priority, Priority::Normal);
            descriptions.push(next.description);
        }
        descriptions.sort();
        assert_eq!(descriptions, vec!["First", "Second", "Third"]);
    }

    #[test]
    fn test_max_concurrent_blocks() {
        let scheduler = AgentScheduler::new(2, 10_000, 60);

        for description in ["Task 1", "Task 2", "Task 3"] {
            enqueue(&scheduler, description, Priority::Normal);
        }

        assert!(scheduler.next_task().is_some());
        assert!(scheduler.next_task().is_some());
        // Both slots are taken, so the third task must wait.
        assert!(scheduler.next_task().is_none());
    }

    #[test]
    fn test_max_concurrent_allows_when_slot_frees() {
        let scheduler = AgentScheduler::new(2, 10_000, 60);
        enqueue(&scheduler, "Task 1", Priority::Normal);
        enqueue(&scheduler, "Task 2", Priority::Normal);

        let t1 = scheduler.next_task().unwrap();
        let t2 = scheduler.next_task().unwrap();
        assert!(scheduler.next_task().is_none());

        scheduler.complete_task(t1.id).unwrap();
        scheduler.complete_task(t2.id).unwrap();

        enqueue(&scheduler, "Task 3", Priority::Normal);

        let task = scheduler.next_task().unwrap();
        assert_eq!(task.description, "Task 3");

        scheduler.complete_task(task.id).unwrap();
    }

    #[test]
    fn test_complete_task_removes_from_running() {
        let scheduler = AgentScheduler::new(2, 10_000, 60);
        let id = enqueue(&scheduler, "Test", Priority::Normal);

        let _ = scheduler.next_task();
        scheduler.complete_task(id).unwrap();

        assert_eq!(scheduler.stats().running, 0);
    }

    #[test]
    fn test_complete_unknown_task_returns_error() {
        let scheduler = AgentScheduler::new(2, 10_000, 60);
        assert!(scheduler.complete_task(Uuid::new_v4()).is_err());
    }

    #[test]
    fn test_fail_task_sets_error() {
        let scheduler = AgentScheduler::new(2, 10_000, 60);
        let id = enqueue(&scheduler, "Test", Priority::Normal);

        let _ = scheduler.next_task();
        scheduler.fail_task(id, "Something went wrong").unwrap();

        let running = scheduler.running.lock();
        assert!(!running.contains_key(&id));
    }

    #[test]
    fn test_cancel_queued_task() {
        let scheduler = AgentScheduler::new(2, 10_000, 60);
        let id = enqueue(&scheduler, "To cancel", Priority::Normal);

        scheduler.cancel_task(id).unwrap();

        assert!(scheduler.next_task().is_none());
    }

    #[test]
    fn test_cancel_running_task_fails() {
        let scheduler = AgentScheduler::new(2, 10_000, 60);
        let id = enqueue(&scheduler, "Running", Priority::Normal);

        // Once dispatched, the task is no longer in the queue.
        let _ = scheduler.next_task();
        assert!(scheduler.cancel_task(id).is_err());
    }

    #[test]
    fn test_cancel_unknown_task_fails() {
        let scheduler = AgentScheduler::new(2, 10_000, 60);
        assert!(scheduler.cancel_task(Uuid::new_v4()).is_err());
    }

    #[test]
    fn test_stats_tracking() {
        let scheduler = AgentScheduler::new(2, 60, 60);

        let id1 = enqueue(&scheduler, "Queued", Priority::Normal);
        enqueue(&scheduler, "Queued 2", Priority::Low);

        let started = scheduler.next_task().unwrap();
        assert_eq!(started.id, id1);

        let stats = scheduler.stats();
        assert_eq!(stats.queued, 1);
        assert_eq!(stats.running, 1);
        assert_eq!(stats.max_concurrent, 2);
        assert_eq!(stats.rate_limit_per_minute, 60);
    }

    #[test]
    fn test_reap_zombies() {
        // One-second zombie timeout.
        let scheduler = AgentScheduler::new(2, 10_000, 1);
        let id = enqueue(&scheduler, "Zombie", Priority::Normal);
        let _ = scheduler.next_task();

        thread::sleep(Duration::from_secs(2));

        let reaped = scheduler.reap_zombies();
        assert!(reaped.contains(&id));

        assert!(scheduler.running.lock().get(&id).is_none());
    }

    #[test]
    fn test_reap_zombies_no_zombies() {
        let scheduler = AgentScheduler::new(2, 10_000, 60);
        let id = enqueue(&scheduler, "Normal", Priority::Normal);
        let _ = scheduler.next_task();

        let reaped = scheduler.reap_zombies();
        assert!(reaped.is_empty());

        assert!(scheduler.running.lock().get(&id).is_some());
    }

    #[test]
    fn test_rate_limiter_basic() {
        let mut limiter = RateLimiter::new(60, 3);
        for _ in 0..3 {
            assert!(limiter.allow());
        }
        assert!(!limiter.allow());
    }

    #[test]
    fn test_rate_limiter_remaining() {
        let untouched = RateLimiter::new(60, 3);
        assert_eq!(untouched.remaining(), 3);

        let mut limiter = RateLimiter::new(60, 3);
        limiter.allow();
        limiter.allow();
        assert_eq!(limiter.remaining(), 1);
    }

    #[test]
    fn test_rate_limiter_tracks_per_scheduler() {
        // Rate limit of five dispatches.
        let scheduler = AgentScheduler::new(10, 5, 60);
        for i in 0..5 {
            enqueue(&scheduler, &format!("T{}", i), Priority::Normal);
            let _ = scheduler.next_task();
        }

        assert!(scheduler.next_task().is_none());
        assert_eq!(scheduler.rate_limit_remaining(), 0);
    }

    #[test]
    fn test_queued_tasks_inspection() {
        let scheduler = AgentScheduler::new(2, 10_000, 60);

        enqueue(&scheduler, "A", Priority::Low);
        enqueue(&scheduler, "B", Priority::High);
        enqueue(&scheduler, "C", Priority::Normal);

        let queued = scheduler.queued_tasks();
        assert_eq!(queued.len(), 3);
        // Ascending priority order puts the High task last.
        assert_eq!(queued.last().unwrap().description, "B");
    }

    #[test]
    fn test_running_tasks_inspection() {
        let scheduler = AgentScheduler::new(2, 10_000, 60);

        enqueue(&scheduler, "R1", Priority::Normal);
        enqueue(&scheduler, "R2", Priority::Normal);

        let _ = scheduler.next_task();
        let _ = scheduler.next_task();

        assert_eq!(scheduler.running_tasks().len(), 2);
    }

    #[test]
    fn test_default_scheduler() {
        let stats = AgentScheduler::default().stats();
        assert_eq!(stats.max_concurrent, 5);
        assert_eq!(stats.rate_limit_per_minute, 60);
    }

    #[test]
    fn test_budget_manager_integration_skips_exhausted_agent() {
        use crate::budget::{BudgetLimit, BudgetManager};

        let mut scheduler = AgentScheduler::new(2, 10_000, 60);
        let budget_manager = Arc::new(BudgetManager::new());

        let agent_id = AgentId::new_v4();
        // A single call is allowed for this agent.
        budget_manager.set_budget(BudgetLimit {
            agent_id,
            token_budget: 1000,
            calls_budget: 1,
            window_secs: 60,
        });

        scheduler.set_budget_manager(Arc::clone(&budget_manager));

        enqueue_for(&scheduler, agent_id, "Task 1", Priority::Normal);
        enqueue_for(&scheduler, agent_id, "Task 2", Priority::Normal);

        let task1 = scheduler.next_task();
        assert!(task1.is_some());
        scheduler.complete_task(task1.unwrap().id).unwrap();

        let task2 = scheduler.next_task();
        assert!(task2.is_none());
    }

    #[test]
    fn test_budget_manager_allows_different_agents() {
        use crate::budget::{BudgetLimit, BudgetManager};

        let mut scheduler = AgentScheduler::new(2, 10_000, 60);
        let budget_manager = Arc::new(BudgetManager::new());

        let agent1 = AgentId::new_v4();
        let agent2 = AgentId::new_v4();

        for agent_id in [agent1, agent2] {
            budget_manager.set_budget(BudgetLimit {
                agent_id,
                token_budget: 1000,
                calls_budget: 3,
                window_secs: 60,
            });
        }

        scheduler.set_budget_manager(Arc::clone(&budget_manager));

        enqueue_for(&scheduler, agent1, "A1", Priority::Normal);
        enqueue_for(&scheduler, agent2, "B1", Priority::Normal);

        let t1 = scheduler.next_task().unwrap();
        let t2 = scheduler.next_task().unwrap();
        assert_ne!(t1.description, t2.description);
    }

    #[test]
    fn test_budget_manager_task_without_agent_id() {
        use crate::budget::BudgetManager;

        let mut scheduler = AgentScheduler::new(2, 10_000, 60);
        let budget_manager = Arc::new(BudgetManager::new());

        scheduler.set_budget_manager(Arc::clone(&budget_manager));

        enqueue(&scheduler, "No agent", Priority::Normal);

        assert!(scheduler.next_task().is_some());
    }

    #[test]
    fn test_budget_manager_not_set_skips_check() {
        let scheduler = AgentScheduler::new(2, 10_000, 60);
        enqueue(&scheduler, "Any task", Priority::Normal);

        assert!(scheduler.next_task().is_some());
    }
}