1use crate::error::{RuntimeError, RuntimeResult};
6use chrono::{DateTime, Utc};
7use std::collections::{HashMap, VecDeque};
8use std::sync::Arc;
9use std::time::Duration;
10use tokio::sync::{Notify, OwnedSemaphorePermit, RwLock, Semaphore};
11use tokio_util::sync::CancellationToken;
12use uuid::Uuid;
13
14#[derive(Debug, Clone, PartialEq, Eq, Hash)]
16pub struct SubAgentId(String);
17
18impl SubAgentId {
19 #[must_use]
21 pub fn new() -> Self {
22 Self(Uuid::new_v4().to_string())
23 }
24}
25
26impl Default for SubAgentId {
27 fn default() -> Self {
28 Self::new()
29 }
30}
31
32impl std::fmt::Display for SubAgentId {
33 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34 write!(f, "{}", self.0)
35 }
36}
37
/// Lifecycle state of a sub-agent.
///
/// `Initializing` and `Running` are in-progress states; the remaining four
/// variants are terminal (see `is_terminal`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubAgentStatus {
    /// Handle has been created but not yet marked as running.
    Initializing,
    /// Agent is executing its task.
    Running,
    /// Agent finished and produced a result.
    Completed,
    /// Agent finished with an error.
    Failed,
    /// Agent was cancelled before finishing.
    Cancelled,
    /// Agent was stopped because it ran out of time.
    TimedOut,
}

impl SubAgentStatus {
    /// Returns `true` for the states that end an agent's lifecycle
    /// (`Completed`, `Failed`, `Cancelled`, `TimedOut`).
    fn is_terminal(self) -> bool {
        match self {
            Self::Initializing | Self::Running => false,
            Self::Completed | Self::Failed | Self::Cancelled | Self::TimedOut => true,
        }
    }
}

impl std::fmt::Display for SubAgentStatus {
    /// Writes the lowercase, snake_case name of the status (e.g. `timed_out`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Initializing => "initializing",
            Self::Running => "running",
            Self::Completed => "completed",
            Self::Failed => "failed",
            Self::Cancelled => "cancelled",
            Self::TimedOut => "timed_out",
        };
        f.write_str(label)
    }
}
77
78#[derive(Debug)]
80pub struct SubAgentHandle {
81 pub id: SubAgentId,
83
84 pub parent_id: Option<SubAgentId>,
86
87 pub task: String,
89
90 pub depth: usize,
92
93 status: Arc<RwLock<SubAgentStatus>>,
95
96 final_status: Arc<std::sync::Mutex<Option<SubAgentStatus>>>,
98
99 pub started_at: DateTime<Utc>,
101
102 completed_at: Arc<RwLock<Option<DateTime<Utc>>>>,
104
105 result: Arc<RwLock<Option<String>>>,
107
108 error: Arc<RwLock<Option<String>>>,
110
111 permit: std::sync::Mutex<Option<OwnedSemaphorePermit>>,
113}
114
115impl SubAgentHandle {
116 #[must_use]
118 pub fn new(
119 task: impl Into<String>,
120 parent_id: Option<SubAgentId>,
121 depth: usize,
122 permit: Option<OwnedSemaphorePermit>,
123 ) -> Self {
124 Self {
125 id: SubAgentId::new(),
126 parent_id,
127 task: task.into(),
128 depth,
129 status: Arc::new(RwLock::new(SubAgentStatus::Initializing)),
130 final_status: Arc::new(std::sync::Mutex::new(None)),
131 started_at: Utc::now(),
132 completed_at: Arc::new(RwLock::new(None)),
133 result: Arc::new(RwLock::new(None)),
134 error: Arc::new(RwLock::new(None)),
135 permit: std::sync::Mutex::new(permit),
136 }
137 }
138
139 pub async fn status(&self) -> SubAgentStatus {
141 *self.status.read().await
142 }
143
144 pub fn final_status(&self) -> Option<SubAgentStatus> {
150 *self
151 .final_status
152 .lock()
153 .expect("final_status mutex poisoned")
154 }
155
156 pub async fn set_status(&self, status: SubAgentStatus) {
162 *self.status.write().await = status;
163 if status.is_terminal() {
164 *self.completed_at.write().await = Some(Utc::now());
165 *self
166 .final_status
167 .lock()
168 .expect("final_status mutex poisoned") = Some(status);
169 }
170 }
171
172 pub async fn mark_running(&self) {
174 self.set_status(SubAgentStatus::Running).await;
175 }
176
177 pub async fn complete(&self, result: impl Into<String>) {
179 *self.result.write().await = Some(result.into());
180 self.set_status(SubAgentStatus::Completed).await;
181 }
182
183 pub async fn fail(&self, error: impl Into<String>) {
185 *self.error.write().await = Some(error.into());
186 self.set_status(SubAgentStatus::Failed).await;
187 }
188
189 pub async fn cancel(&self) {
191 self.set_status(SubAgentStatus::Cancelled).await;
192 }
193
194 pub async fn timeout(&self) {
196 self.set_status(SubAgentStatus::TimedOut).await;
197 }
198
199 pub async fn result(&self) -> Option<String> {
201 self.result.read().await.clone()
202 }
203
204 pub async fn error(&self) -> Option<String> {
206 self.error.read().await.clone()
207 }
208
209 pub async fn completed_at(&self) -> Option<DateTime<Utc>> {
211 *self.completed_at.read().await
212 }
213
214 #[allow(clippy::arithmetic_side_effects)] pub async fn duration(&self) -> Option<chrono::Duration> {
217 self.completed_at()
218 .await
219 .map(|completed| completed - self.started_at)
220 }
221
222 pub async fn is_done(&self) -> bool {
224 self.status().await.is_terminal()
225 }
226
227 fn release_permit(&self) {
229 let _ = self.permit.lock().expect("permit mutex poisoned").take();
230 }
231}
232
/// Number of finished handles a pool retains when it is built with
/// `SubAgentPool::new`, i.e. without an explicit history limit.
const DEFAULT_MAX_HISTORY: usize = 1_000;
235
236#[derive(Debug)]
238pub struct SubAgentPool {
239 max_concurrent: usize,
241
242 max_depth: usize,
244
245 max_history: usize,
247
248 semaphore: Arc<Semaphore>,
250
251 active: Arc<RwLock<HashMap<SubAgentId, Arc<SubAgentHandle>>>>,
253
254 completed: Arc<RwLock<Vec<Arc<SubAgentHandle>>>>,
256
257 completion_notify: Arc<Notify>,
259
260 cancellation_token: CancellationToken,
262}
263
264impl SubAgentPool {
265 #[must_use]
267 pub fn new(max_concurrent: usize, max_depth: usize) -> Self {
268 Self::with_max_history(max_concurrent, max_depth, DEFAULT_MAX_HISTORY)
269 }
270
271 #[must_use]
273 pub fn with_max_history(max_concurrent: usize, max_depth: usize, max_history: usize) -> Self {
274 Self {
275 max_concurrent,
276 max_depth,
277 max_history,
278 semaphore: Arc::new(Semaphore::new(max_concurrent)),
279 active: Arc::new(RwLock::new(HashMap::new())),
280 completed: Arc::new(RwLock::new(Vec::new())),
281 completion_notify: Arc::new(Notify::new()),
282 cancellation_token: CancellationToken::new(),
283 }
284 }
285
286 pub async fn spawn(
292 &self,
293 task: impl Into<String>,
294 parent_id: Option<SubAgentId>,
295 ) -> RuntimeResult<Arc<SubAgentHandle>> {
296 let depth = if parent_id.is_some() {
297 let active = self.active.read().await;
299 if let Some(parent) = parent_id.as_ref().and_then(|id| active.get(id)) {
300 parent.depth.checked_add(1).ok_or_else(|| {
301 RuntimeError::SubAgentError("subagent depth overflow".to_string())
302 })?
303 } else {
304 1
305 }
306 } else {
307 0
308 };
309
310 if depth >= self.max_depth {
311 return Err(RuntimeError::SubAgentError(format!(
312 "maximum subagent depth ({}) exceeded",
313 self.max_depth
314 )));
315 }
316
317 let permit = self.semaphore.clone().try_acquire_owned().map_err(|_| {
319 RuntimeError::SubAgentError("maximum concurrent subagents reached".into())
320 })?;
321
322 let handle = Arc::new(SubAgentHandle::new(task, parent_id, depth, Some(permit)));
323
324 self.active
326 .write()
327 .await
328 .insert(handle.id.clone(), handle.clone());
329
330 Ok(handle)
331 }
332
333 pub async fn release(&self, id: &SubAgentId) {
339 let mut active = self.active.write().await;
340 if let Some(handle) = active.remove(id) {
341 handle.release_permit();
342 self.push_to_history(handle).await;
343 if active.is_empty() {
344 self.completion_notify.notify_waiters();
345 }
346 }
347 }
348
349 pub async fn stop(&self, id: &SubAgentId) -> Option<Arc<SubAgentHandle>> {
351 let mut active = self.active.write().await;
352 if let Some(handle) = active.remove(id) {
353 handle.cancel().await;
354 handle.release_permit();
355 self.push_to_history(handle.clone()).await;
356 if active.is_empty() {
357 self.completion_notify.notify_waiters();
358 }
359 Some(handle)
360 } else {
361 None
362 }
363 }
364
365 async fn push_to_history(&self, handle: Arc<SubAgentHandle>) {
367 let mut completed = self.completed.write().await;
368 if completed.len() >= self.max_history {
369 completed.remove(0);
370 }
371 completed.push(handle);
372 }
373
374 pub async fn get(&self, id: &SubAgentId) -> Option<Arc<SubAgentHandle>> {
376 self.active.read().await.get(id).cloned()
377 }
378
379 pub async fn list_active(&self) -> Vec<Arc<SubAgentHandle>> {
381 self.active.read().await.values().cloned().collect()
382 }
383
384 pub async fn active_count(&self) -> usize {
386 self.active.read().await.len()
387 }
388
389 #[must_use]
391 pub fn available_permits(&self) -> usize {
392 self.semaphore.available_permits()
393 }
394
395 pub async fn can_spawn_child(&self, parent_id: &SubAgentId) -> bool {
399 let active = self.active.read().await;
400 let parent_depth = if let Some(parent) = active.get(parent_id) {
401 parent.depth
402 } else {
403 return false; };
405
406 let Some(child_depth) = parent_depth.checked_add(1) else {
407 return false;
408 };
409 if child_depth >= self.max_depth {
410 return false;
411 }
412
413 self.semaphore.available_permits() > 0
414 }
415
416 #[must_use]
421 pub fn cancellation_token(&self) -> CancellationToken {
422 self.cancellation_token.clone()
423 }
424
425 pub async fn cancel_all(&self) {
430 self.cancellation_token.cancel();
432
433 let mut active = self.active.write().await;
434 let handles: Vec<Arc<SubAgentHandle>> = active.drain().map(|(_, h)| h).collect();
435 drop(active);
436
437 for handle in handles {
438 handle.cancel().await;
439 handle.release_permit();
440 self.push_to_history(handle).await;
441 }
442
443 self.completion_notify.notify_waiters();
444 }
445
446 pub async fn wait_for_completion(&self) {
448 loop {
449 if self.active.read().await.is_empty() {
450 return;
451 }
452 self.completion_notify.notified().await;
453 }
454 }
455
456 pub async fn wait_for_completion_timeout(&self, timeout: Duration) -> bool {
460 tokio::select! {
461 () = self.wait_for_completion() => true,
462 () = tokio::time::sleep(timeout) => {
463 self.active.read().await.is_empty()
464 }
465 }
466 }
467
468 pub async fn get_children(&self, parent_id: &SubAgentId) -> Vec<Arc<SubAgentHandle>> {
470 let mut children = Vec::new();
471
472 let active = self.active.read().await;
473 for handle in active.values() {
474 if handle.parent_id.as_ref() == Some(parent_id) {
475 children.push(handle.clone());
476 }
477 }
478 drop(active);
479
480 let completed = self.completed.read().await;
481 for handle in &*completed {
482 if handle.parent_id.as_ref() == Some(parent_id) {
483 children.push(handle.clone());
484 }
485 }
486
487 children
488 }
489
490 pub async fn get_subtree(&self, parent_id: &SubAgentId) -> Vec<Arc<SubAgentHandle>> {
492 let active = self.active.read().await;
494 let completed = self.completed.read().await;
495
496 let all_handles: Vec<Arc<SubAgentHandle>> = active
497 .values()
498 .cloned()
499 .chain(completed.iter().cloned())
500 .collect();
501 drop(active);
502 drop(completed);
503
504 let mut result = Vec::new();
505 let mut queue = VecDeque::new();
506 queue.push_back(parent_id.clone());
507
508 while let Some(current_id) = queue.pop_front() {
509 for handle in &all_handles {
510 if handle.parent_id.as_ref() == Some(¤t_id) {
511 result.push(handle.clone());
512 queue.push_back(handle.id.clone());
513 }
514 }
515 }
516
517 result
518 }
519
520 pub async fn cancel_subtree(&self, parent_id: &SubAgentId) -> usize {
524 let active = self.active.read().await;
526 let mut to_cancel = Vec::new();
527 let mut queue = VecDeque::new();
528 queue.push_back(parent_id.clone());
529
530 while let Some(current_id) = queue.pop_front() {
531 for (id, handle) in active.iter() {
532 if handle.parent_id.as_ref() == Some(¤t_id) {
533 to_cancel.push(id.clone());
534 queue.push_back(id.clone());
535 }
536 }
537 }
538 drop(active);
539
540 let mut cancelled = 0usize;
541 for id in &to_cancel {
542 if self.stop(id).await.is_some() {
543 cancelled = cancelled.saturating_add(1);
544 }
545 }
546 cancelled
547 }
548
549 pub async fn history(&self) -> Vec<Arc<SubAgentHandle>> {
551 self.completed.read().await.clone()
552 }
553
554 pub async fn clear_history(&self) {
556 self.completed.write().await.clear();
557 }
558
559 pub async fn stats(&self) -> SubAgentPoolStats {
561 let active = self.active.read().await;
562 let completed = self.completed.read().await;
563
564 let (succeeded, failed, cancelled, timed_out) =
565 completed
566 .iter()
567 .fold(
568 (0usize, 0usize, 0usize, 0usize),
569 |(s, f, c, t), h| match h.final_status() {
570 Some(SubAgentStatus::Completed) => (s.saturating_add(1), f, c, t),
571 Some(SubAgentStatus::Failed) => (s, f.saturating_add(1), c, t),
572 Some(SubAgentStatus::Cancelled) => (s, f, c.saturating_add(1), t),
573 Some(SubAgentStatus::TimedOut) => (s, f, c, t.saturating_add(1)),
574 _ => (s, f, c, t),
575 },
576 );
577
578 SubAgentPoolStats {
579 max_concurrent: self.max_concurrent,
580 max_depth: self.max_depth,
581 active: active.len(),
582 available: self.semaphore.available_permits(),
583 total_completed: completed.len(),
584 succeeded,
585 failed,
586 cancelled,
587 timed_out,
588 }
589 }
590}
591
/// Point-in-time counters describing a sub-agent pool.
#[derive(Debug, Clone)]
pub struct SubAgentPoolStats {
    /// Configured limit on simultaneously active agents.
    pub max_concurrent: usize,
    /// Configured depth limit.
    pub max_depth: usize,
    /// Number of agents active when the snapshot was taken.
    pub active: usize,
    /// Number of concurrency permits that were free.
    pub available: usize,
    /// Number of handles held in the history.
    pub total_completed: usize,
    /// History entries whose final status is `Completed`.
    pub succeeded: usize,
    /// History entries whose final status is `Failed`.
    pub failed: usize,
    /// History entries whose final status is `Cancelled`.
    pub cancelled: usize,
    /// History entries whose final status is `TimedOut`.
    pub timed_out: usize,
}
614
#[cfg(test)]
mod tests {
    use super::*;

    /// A handle moves Initializing -> Running -> Completed and keeps its result.
    #[tokio::test]
    async fn test_subagent_lifecycle() {
        let agent = SubAgentHandle::new("test task", None, 0, None);

        assert_eq!(agent.status().await, SubAgentStatus::Initializing);
        assert!(!agent.is_done().await);

        agent.mark_running().await;
        assert_eq!(agent.status().await, SubAgentStatus::Running);

        agent.complete("success").await;
        assert_eq!(agent.status().await, SubAgentStatus::Completed);
        assert!(agent.is_done().await);
        assert_eq!(agent.result().await, Some("success".into()));
    }

    /// `fail` records the message and ends the agent in the Failed state.
    #[tokio::test]
    async fn test_subagent_failure() {
        let agent = SubAgentHandle::new("test task", None, 0, None);

        agent.mark_running().await;
        agent.fail("something went wrong").await;

        assert_eq!(agent.status().await, SubAgentStatus::Failed);
        assert!(agent.is_done().await);
        assert_eq!(agent.error().await, Some("something went wrong".into()));
    }

    /// The synchronous final status stays `None` until a terminal state is set.
    #[tokio::test]
    async fn test_subagent_final_status() {
        let agent = SubAgentHandle::new("test task", None, 0, None);
        assert_eq!(agent.final_status(), None);

        agent.mark_running().await;
        assert_eq!(agent.final_status(), None);

        agent.complete("done").await;
        assert_eq!(agent.final_status(), Some(SubAgentStatus::Completed));
    }

    /// Spawning registers the agent; releasing moves it into the history.
    #[tokio::test]
    async fn test_pool_spawn() {
        let pool = SubAgentPool::new(5, 3);

        let agent = pool.spawn("task 1", None).await.unwrap();
        assert_eq!(pool.active_count().await, 1);

        pool.release(&agent.id).await;
        assert_eq!(pool.active_count().await, 0);
        assert_eq!(pool.history().await.len(), 1);
    }

    /// With `max_depth == 2`, depths 0 and 1 are accepted and depth 2 is not.
    #[tokio::test]
    async fn test_pool_max_depth() {
        let pool = SubAgentPool::new(5, 2);

        // Root agent.
        let root = pool.spawn("task 1", None).await.unwrap();
        assert_eq!(root.depth, 0);

        // Child of the root.
        let child = pool.spawn("task 2", Some(root.id.clone())).await.unwrap();
        assert_eq!(child.depth, 1);

        // A grandchild would be at depth 2, which hits the limit.
        let grandchild = pool.spawn("task 3", Some(child.id.clone())).await;
        assert!(grandchild.is_err());
    }

    /// `cancel_all` cancels every active agent and moves all of them to history.
    #[tokio::test]
    async fn test_pool_cancel_all() {
        let pool = SubAgentPool::new(5, 3);

        let first = pool.spawn("task 1", None).await.unwrap();
        let second = pool.spawn("task 2", None).await.unwrap();

        pool.cancel_all().await;

        assert_eq!(first.status().await, SubAgentStatus::Cancelled);
        assert_eq!(second.status().await, SubAgentStatus::Cancelled);
        assert_eq!(pool.active_count().await, 0);
        assert_eq!(pool.history().await.len(), 2);
    }

    /// A third spawn fails once both permits of a two-slot pool are taken.
    #[tokio::test]
    async fn test_semaphore_limits_concurrency() {
        let pool = SubAgentPool::new(2, 5);

        let _first = pool.spawn("task 1", None).await.unwrap();
        let _second = pool.spawn("task 2", None).await.unwrap();

        let overflow = pool.spawn("task 3", None).await;
        assert!(overflow.is_err());
        assert_eq!(pool.available_permits(), 0);
    }

    /// Releasing an agent frees its permit so another agent can be spawned.
    #[tokio::test]
    async fn test_permit_released_on_complete() {
        let pool = SubAgentPool::new(1, 5);

        let first = pool.spawn("task 1", None).await.unwrap();
        assert_eq!(pool.available_permits(), 0);

        pool.release(&first.id).await;
        assert_eq!(pool.available_permits(), 1);

        // The freed slot can be used again.
        let _second = pool.spawn("task 2", None).await.unwrap();
        assert_eq!(pool.available_permits(), 0);
    }

    /// `stop` returns the handle, marks it cancelled and archives it.
    #[tokio::test]
    async fn test_stop_cancels_and_moves_to_history() {
        let pool = SubAgentPool::new(5, 3);

        let agent = pool.spawn("task 1", None).await.unwrap();
        let agent_id = agent.id.clone();

        let outcome = pool.stop(&agent_id).await;
        assert!(outcome.is_some());

        let stopped = outcome.unwrap();
        assert_eq!(stopped.status().await, SubAgentStatus::Cancelled);
        assert_eq!(pool.active_count().await, 0);
        assert_eq!(pool.history().await.len(), 1);
    }

    /// Stopping an id the pool has never seen yields `None`.
    #[tokio::test]
    async fn test_stop_nonexistent_returns_none() {
        let pool = SubAgentPool::new(5, 3);
        let unknown = SubAgentId::new();
        assert!(pool.stop(&unknown).await.is_none());
    }

    /// `can_spawn_child` honours the depth limit and rejects unknown parents.
    #[tokio::test]
    async fn test_can_spawn_child_checks_depth_and_concurrency() {
        let pool = SubAgentPool::new(3, 2);

        // A child of the root would sit at depth 1, below the limit of 2.
        let root = pool.spawn("task 1", None).await.unwrap();
        assert!(pool.can_spawn_child(&root.id).await);

        // A child of the depth-1 agent would sit at depth 2: rejected.
        let child = pool.spawn("task 2", Some(root.id.clone())).await.unwrap();
        assert!(!pool.can_spawn_child(&child.id).await);

        // A parent the pool does not know is rejected as well.
        assert!(!pool.can_spawn_child(&SubAgentId::new()).await);
    }

    /// `can_spawn_child` is false when no permit is free.
    #[tokio::test]
    async fn test_can_spawn_child_checks_concurrency() {
        let pool = SubAgentPool::new(2, 5);

        let first = pool.spawn("task 1", None).await.unwrap();
        let _second = pool.spawn("task 2", None).await.unwrap();

        // Both permits are taken.
        assert!(!pool.can_spawn_child(&first.id).await);
    }

    /// The wait returns at once on an empty pool and otherwise only after the
    /// last active agent has been released.
    #[tokio::test]
    async fn test_wait_for_completion_returns_when_empty() {
        let pool = Arc::new(SubAgentPool::new(5, 3));

        // Nothing is active yet, so this must not block.
        pool.wait_for_completion().await;

        let agent = pool.spawn("task 1", None).await.unwrap();
        let agent_id = agent.id.clone();
        let waiting_pool = pool.clone();

        let waiter = tokio::spawn(async move {
            waiting_pool.wait_for_completion().await;
        });

        // Give the waiter a moment to start; it must still be pending.
        tokio::time::sleep(Duration::from_millis(10)).await;
        assert!(!waiter.is_finished());

        pool.release(&agent_id).await;

        tokio::time::timeout(Duration::from_secs(1), waiter)
            .await
            .expect("waiter should complete")
            .expect("waiter should not panic");
    }

    /// The timed wait reports `false` when an agent is still active at expiry.
    #[tokio::test]
    async fn test_wait_for_completion_timeout_returns_false_on_expiry() {
        let pool = SubAgentPool::new(5, 3);

        let _agent = pool.spawn("task 1", None).await.unwrap();

        let window = Duration::from_millis(50);
        let drained = pool.wait_for_completion_timeout(window).await;
        assert!(!drained);
    }

    /// The timed wait reports `true` when the pool drains before the deadline.
    #[tokio::test]
    async fn test_wait_for_completion_timeout_returns_true_on_drain() {
        let pool = Arc::new(SubAgentPool::new(5, 3));

        let agent = pool.spawn("task 1", None).await.unwrap();
        let agent_id = agent.id.clone();
        let releasing_pool = pool.clone();

        // Release the only agent shortly after the wait has started.
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(20)).await;
            releasing_pool.release(&agent_id).await;
        });

        let window = Duration::from_secs(2);
        let drained = pool.wait_for_completion_timeout(window).await;
        assert!(drained);
    }

    /// `get_children` lists direct children and leaves out grandchildren.
    #[tokio::test]
    async fn test_get_children_returns_direct_children_only() {
        let pool = SubAgentPool::new(10, 5);

        let root = pool.spawn("root", None).await.unwrap();
        let child1 = pool.spawn("child1", Some(root.id.clone())).await.unwrap();
        let child2 = pool.spawn("child2", Some(root.id.clone())).await.unwrap();
        let _grandchild = pool
            .spawn("grandchild", Some(child1.id.clone()))
            .await
            .unwrap();

        let children = pool.get_children(&root.id).await;
        assert_eq!(children.len(), 2);

        assert!(children.iter().any(|c| c.id == child1.id));
        assert!(children.iter().any(|c| c.id == child2.id));
    }

    /// A released child is still reported by `get_children`.
    #[tokio::test]
    async fn test_get_children_includes_completed() {
        let pool = SubAgentPool::new(10, 5);

        let root = pool.spawn("root", None).await.unwrap();
        let child = pool.spawn("child", Some(root.id.clone())).await.unwrap();
        let child_id = child.id.clone();

        // Move the child from the active set into the history.
        pool.release(&child_id).await;

        let children = pool.get_children(&root.id).await;
        assert_eq!(children.len(), 1);
        assert_eq!(children[0].id, child_id);
    }

    /// `get_subtree` returns children and grandchildren but not the root.
    #[tokio::test]
    async fn test_get_subtree_returns_all_descendants() {
        let pool = SubAgentPool::new(10, 5);

        let root = pool.spawn("root", None).await.unwrap();
        let child1 = pool.spawn("child1", Some(root.id.clone())).await.unwrap();
        let child2 = pool.spawn("child2", Some(root.id.clone())).await.unwrap();
        let grandchild1 = pool
            .spawn("grandchild1", Some(child1.id.clone()))
            .await
            .unwrap();
        let grandchild2 = pool
            .spawn("grandchild2", Some(child2.id.clone()))
            .await
            .unwrap();

        let subtree = pool.get_subtree(&root.id).await;
        assert_eq!(subtree.len(), 4);

        let found = |wanted: &SubAgentId| subtree.iter().any(|h| &h.id == wanted);
        assert!(found(&child1.id));
        assert!(found(&child2.id));
        assert!(found(&grandchild1.id));
        assert!(found(&grandchild2.id));
        // The starting agent itself is not part of its subtree.
        assert!(!found(&root.id));
    }

    /// `cancel_subtree` stops descendants only; the root and unrelated agents
    /// stay active.
    #[tokio::test]
    async fn test_cancel_subtree_cancels_only_descendants() {
        let pool = SubAgentPool::new(10, 5);

        let root = pool.spawn("root", None).await.unwrap();
        let child = pool.spawn("child", Some(root.id.clone())).await.unwrap();
        let grandchild = pool
            .spawn("grandchild", Some(child.id.clone()))
            .await
            .unwrap();
        let sibling = pool.spawn("sibling", None).await.unwrap();

        // Child and grandchild are stopped.
        let cancelled = pool.cancel_subtree(&root.id).await;
        assert_eq!(cancelled, 2);

        assert!(pool.get(&root.id).await.is_some());
        assert!(pool.get(&child.id).await.is_none());
        assert!(pool.get(&grandchild.id).await.is_none());
        assert!(pool.get(&sibling.id).await.is_some());
    }

    /// Each terminal outcome is counted in its own stats bucket.
    #[tokio::test]
    async fn test_stats_counts_are_accurate() {
        let pool = SubAgentPool::new(10, 5);

        let done = pool.spawn("task 1", None).await.unwrap();
        let broken = pool.spawn("task 2", None).await.unwrap();
        let stopped = pool.spawn("task 3", None).await.unwrap();
        let late = pool.spawn("task 4", None).await.unwrap();

        // Completed.
        done.complete("result").await;
        pool.release(&done.id).await;

        // Failed.
        broken.fail("error").await;
        pool.release(&broken.id).await;

        // Cancelled via `stop`.
        pool.stop(&stopped.id).await;

        // Timed out.
        late.timeout().await;
        pool.release(&late.id).await;

        let stats = pool.stats().await;
        assert_eq!(stats.max_concurrent, 10);
        assert_eq!(stats.max_depth, 5);
        assert_eq!(stats.active, 0);
        assert_eq!(stats.total_completed, 4);
        assert_eq!(stats.succeeded, 1);
        assert_eq!(stats.failed, 1);
        assert_eq!(stats.cancelled, 1);
        assert_eq!(stats.timed_out, 1);
    }

    /// Stats report active agents and free permits alongside the history.
    #[tokio::test]
    async fn test_stats_with_active() {
        let pool = SubAgentPool::new(5, 3);

        let _still_running = pool.spawn("task 1", None).await.unwrap();
        let finished = pool.spawn("task 2", None).await.unwrap();
        finished.complete("done").await;
        pool.release(&finished.id).await;

        let stats = pool.stats().await;
        assert_eq!(stats.active, 1);
        assert_eq!(stats.total_completed, 1);
        assert_eq!(stats.succeeded, 1);
        assert_eq!(stats.available, 4);
    }

    /// With a history capacity of 3, only the three newest entries survive.
    #[tokio::test]
    async fn test_history_eviction_at_capacity() {
        let pool = SubAgentPool::with_max_history(10, 5, 3);

        // Run five agents to completion, one after another.
        for index in 0..5 {
            let agent = pool.spawn(format!("task {index}"), None).await.unwrap();
            agent.complete(format!("result {index}")).await;
            pool.release(&agent.id).await;
        }

        // Tasks 0 and 1 were evicted; 2, 3 and 4 remain in order.
        let history = pool.history().await;
        assert_eq!(history.len(), 3);
        assert_eq!(history[0].task, "task 2");
        assert_eq!(history[1].task, "task 3");
        assert_eq!(history[2].task, "task 4");
    }
}