// astrid_runtime/subagent.rs
1//! Subagent pool management.
2//!
3//! Manages lifecycle (spawn, cancel, depth/concurrency enforcement) for sub-agents.
4
5use crate::error::{RuntimeError, RuntimeResult};
6use chrono::{DateTime, Utc};
7use std::collections::{HashMap, VecDeque};
8use std::sync::Arc;
9use std::time::Duration;
10use tokio::sync::{Notify, OwnedSemaphorePermit, RwLock, Semaphore};
11use tokio_util::sync::CancellationToken;
12use uuid::Uuid;
13
/// Unique identifier for a subagent instance.
///
/// Wraps a randomly generated UUID v4 rendered as a string (see
/// [`SubAgentId::new`]); derives `Eq`/`Hash` so it can key the pool's maps.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SubAgentId(String);
17
18impl SubAgentId {
19    /// Create a new random subagent ID.
20    #[must_use]
21    pub fn new() -> Self {
22        Self(Uuid::new_v4().to_string())
23    }
24}
25
26impl Default for SubAgentId {
27    fn default() -> Self {
28        Self::new()
29    }
30}
31
32impl std::fmt::Display for SubAgentId {
33    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34        write!(f, "{}", self.0)
35    }
36}
37
/// Status of a subagent instance.
///
/// `Initializing` and `Running` are in-flight states; the remaining four are
/// terminal (see `is_terminal`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubAgentStatus {
    /// Subagent is initializing.
    Initializing,
    /// Subagent is running.
    Running,
    /// Subagent completed successfully.
    Completed,
    /// Subagent failed.
    Failed,
    /// Subagent was cancelled.
    Cancelled,
    /// Subagent timed out.
    TimedOut,
}
54
55impl SubAgentStatus {
56    /// Whether this status is terminal (done).
57    fn is_terminal(self) -> bool {
58        matches!(
59            self,
60            Self::Completed | Self::Failed | Self::Cancelled | Self::TimedOut
61        )
62    }
63}
64
65impl std::fmt::Display for SubAgentStatus {
66    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67        match self {
68            Self::Initializing => write!(f, "initializing"),
69            Self::Running => write!(f, "running"),
70            Self::Completed => write!(f, "completed"),
71            Self::Failed => write!(f, "failed"),
72            Self::Cancelled => write!(f, "cancelled"),
73            Self::TimedOut => write!(f, "timed_out"),
74        }
75    }
76}
77
/// Handle to a running subagent.
///
/// Cheap to share: all mutable state lives behind `Arc`-wrapped locks, so the
/// pool and callers can hold clones of the same `Arc<SubAgentHandle>`.
#[derive(Debug)]
pub struct SubAgentHandle {
    /// Subagent ID.
    pub id: SubAgentId,

    /// Parent agent ID (if nested).
    pub parent_id: Option<SubAgentId>,

    /// Task description.
    pub task: String,

    /// Current depth (0 for first-level subagents).
    pub depth: usize,

    /// Current status (async access).
    status: Arc<RwLock<SubAgentStatus>>,

    /// Final status snapshot (sync access, set when entering terminal state).
    /// Lets non-async code (e.g. `stats`) read the outcome without awaiting.
    final_status: Arc<std::sync::Mutex<Option<SubAgentStatus>>>,

    /// When the subagent started.
    pub started_at: DateTime<Utc>,

    /// When the subagent completed (if done).
    completed_at: Arc<RwLock<Option<DateTime<Utc>>>>,

    /// Result (if completed).
    result: Arc<RwLock<Option<String>>>,

    /// Error message (if failed).
    error: Arc<RwLock<Option<String>>>,

    /// Semaphore permit — explicitly released when the handle leaves the active pool.
    permit: std::sync::Mutex<Option<OwnedSemaphorePermit>>,
}
114
115impl SubAgentHandle {
116    /// Create a new subagent handle.
117    #[must_use]
118    pub fn new(
119        task: impl Into<String>,
120        parent_id: Option<SubAgentId>,
121        depth: usize,
122        permit: Option<OwnedSemaphorePermit>,
123    ) -> Self {
124        Self {
125            id: SubAgentId::new(),
126            parent_id,
127            task: task.into(),
128            depth,
129            status: Arc::new(RwLock::new(SubAgentStatus::Initializing)),
130            final_status: Arc::new(std::sync::Mutex::new(None)),
131            started_at: Utc::now(),
132            completed_at: Arc::new(RwLock::new(None)),
133            result: Arc::new(RwLock::new(None)),
134            error: Arc::new(RwLock::new(None)),
135            permit: std::sync::Mutex::new(permit),
136        }
137    }
138
139    /// Get current status.
140    pub async fn status(&self) -> SubAgentStatus {
141        *self.status.read().await
142    }
143
144    /// Get final status synchronously (only set when terminal).
145    ///
146    /// # Panics
147    ///
148    /// Panics if the internal mutex is poisoned.
149    pub fn final_status(&self) -> Option<SubAgentStatus> {
150        *self
151            .final_status
152            .lock()
153            .expect("final_status mutex poisoned")
154    }
155
156    /// Set status.
157    ///
158    /// # Panics
159    ///
160    /// Panics if the internal mutex is poisoned.
161    pub async fn set_status(&self, status: SubAgentStatus) {
162        *self.status.write().await = status;
163        if status.is_terminal() {
164            *self.completed_at.write().await = Some(Utc::now());
165            *self
166                .final_status
167                .lock()
168                .expect("final_status mutex poisoned") = Some(status);
169        }
170    }
171
172    /// Mark as running.
173    pub async fn mark_running(&self) {
174        self.set_status(SubAgentStatus::Running).await;
175    }
176
177    /// Mark as completed with result.
178    pub async fn complete(&self, result: impl Into<String>) {
179        *self.result.write().await = Some(result.into());
180        self.set_status(SubAgentStatus::Completed).await;
181    }
182
183    /// Mark as failed with error.
184    pub async fn fail(&self, error: impl Into<String>) {
185        *self.error.write().await = Some(error.into());
186        self.set_status(SubAgentStatus::Failed).await;
187    }
188
189    /// Mark as cancelled.
190    pub async fn cancel(&self) {
191        self.set_status(SubAgentStatus::Cancelled).await;
192    }
193
194    /// Mark as timed out.
195    pub async fn timeout(&self) {
196        self.set_status(SubAgentStatus::TimedOut).await;
197    }
198
199    /// Get result (if completed).
200    pub async fn result(&self) -> Option<String> {
201        self.result.read().await.clone()
202    }
203
204    /// Get error (if failed).
205    pub async fn error(&self) -> Option<String> {
206        self.error.read().await.clone()
207    }
208
209    /// Get completion time (if done).
210    pub async fn completed_at(&self) -> Option<DateTime<Utc>> {
211        *self.completed_at.read().await
212    }
213
214    /// Get duration (if completed).
215    #[allow(clippy::arithmetic_side_effects)] // completed_at is always >= started_at
216    pub async fn duration(&self) -> Option<chrono::Duration> {
217        self.completed_at()
218            .await
219            .map(|completed| completed - self.started_at)
220    }
221
222    /// Check if done (completed, failed, cancelled, or timed out).
223    pub async fn is_done(&self) -> bool {
224        self.status().await.is_terminal()
225    }
226
227    /// Release the semaphore permit (called when moving out of the active pool).
228    fn release_permit(&self) {
229        let _ = self.permit.lock().expect("permit mutex poisoned").take();
230    }
231}
232
/// Default maximum history size before FIFO eviction (see `push_to_history`).
const DEFAULT_MAX_HISTORY: usize = 1000;
235
/// Pool for managing subagent instances.
///
/// Enforces a concurrency cap (via a semaphore permit per active subagent),
/// a nesting-depth cap, and a bounded FIFO history of finished subagents.
#[derive(Debug)]
pub struct SubAgentPool {
    /// Maximum concurrent subagents.
    max_concurrent: usize,

    /// Maximum nesting depth.
    max_depth: usize,

    /// Maximum completed history entries before FIFO eviction.
    max_history: usize,

    /// Concurrency semaphore; initialized with `max_concurrent` permits.
    semaphore: Arc<Semaphore>,

    /// Active subagents, keyed by ID.
    active: Arc<RwLock<HashMap<SubAgentId, Arc<SubAgentHandle>>>>,

    /// Completed subagents (for history), oldest first.
    completed: Arc<RwLock<Vec<Arc<SubAgentHandle>>>>,

    /// Notified when the active pool becomes empty.
    completion_notify: Arc<Notify>,

    /// Cooperative cancellation token for all sub-agents in this pool.
    cancellation_token: CancellationToken,
}
263
264impl SubAgentPool {
265    /// Create a new subagent pool with default history limit (1000 entries).
266    #[must_use]
267    pub fn new(max_concurrent: usize, max_depth: usize) -> Self {
268        Self::with_max_history(max_concurrent, max_depth, DEFAULT_MAX_HISTORY)
269    }
270
271    /// Create a new subagent pool with explicit history limit.
272    #[must_use]
273    pub fn with_max_history(max_concurrent: usize, max_depth: usize, max_history: usize) -> Self {
274        Self {
275            max_concurrent,
276            max_depth,
277            max_history,
278            semaphore: Arc::new(Semaphore::new(max_concurrent)),
279            active: Arc::new(RwLock::new(HashMap::new())),
280            completed: Arc::new(RwLock::new(Vec::new())),
281            completion_notify: Arc::new(Notify::new()),
282            cancellation_token: CancellationToken::new(),
283        }
284    }
285
286    /// Spawn a new subagent.
287    ///
288    /// # Errors
289    ///
290    /// Returns an error if the maximum subagent depth is exceeded or concurrency limit is reached.
291    pub async fn spawn(
292        &self,
293        task: impl Into<String>,
294        parent_id: Option<SubAgentId>,
295    ) -> RuntimeResult<Arc<SubAgentHandle>> {
296        let depth = if parent_id.is_some() {
297            // Find parent depth
298            let active = self.active.read().await;
299            if let Some(parent) = parent_id.as_ref().and_then(|id| active.get(id)) {
300                parent.depth.checked_add(1).ok_or_else(|| {
301                    RuntimeError::SubAgentError("subagent depth overflow".to_string())
302                })?
303            } else {
304                1
305            }
306        } else {
307            0
308        };
309
310        if depth >= self.max_depth {
311            return Err(RuntimeError::SubAgentError(format!(
312                "maximum subagent depth ({}) exceeded",
313                self.max_depth
314            )));
315        }
316
317        // Try to acquire semaphore permit
318        let permit = self.semaphore.clone().try_acquire_owned().map_err(|_| {
319            RuntimeError::SubAgentError("maximum concurrent subagents reached".into())
320        })?;
321
322        let handle = Arc::new(SubAgentHandle::new(task, parent_id, depth, Some(permit)));
323
324        // Store in active map
325        self.active
326            .write()
327            .await
328            .insert(handle.id.clone(), handle.clone());
329
330        Ok(handle)
331    }
332
333    /// Release a subagent from the active pool and move to history.
334    ///
335    /// Releases the semaphore permit so another sub-agent can be spawned.
336    /// The handle's status should already be set (completed/failed/timed out)
337    /// before calling this.
338    pub async fn release(&self, id: &SubAgentId) {
339        let mut active = self.active.write().await;
340        if let Some(handle) = active.remove(id) {
341            handle.release_permit();
342            self.push_to_history(handle).await;
343            if active.is_empty() {
344                self.completion_notify.notify_waiters();
345            }
346        }
347    }
348
349    /// Stop a specific subagent: cancel it and move to history.
350    pub async fn stop(&self, id: &SubAgentId) -> Option<Arc<SubAgentHandle>> {
351        let mut active = self.active.write().await;
352        if let Some(handle) = active.remove(id) {
353            handle.cancel().await;
354            handle.release_permit();
355            self.push_to_history(handle.clone()).await;
356            if active.is_empty() {
357                self.completion_notify.notify_waiters();
358            }
359            Some(handle)
360        } else {
361            None
362        }
363    }
364
365    /// Push a handle to history with FIFO eviction at capacity.
366    async fn push_to_history(&self, handle: Arc<SubAgentHandle>) {
367        let mut completed = self.completed.write().await;
368        if completed.len() >= self.max_history {
369            completed.remove(0);
370        }
371        completed.push(handle);
372    }
373
374    /// Get active subagent by ID.
375    pub async fn get(&self, id: &SubAgentId) -> Option<Arc<SubAgentHandle>> {
376        self.active.read().await.get(id).cloned()
377    }
378
379    /// List active subagents.
380    pub async fn list_active(&self) -> Vec<Arc<SubAgentHandle>> {
381        self.active.read().await.values().cloned().collect()
382    }
383
384    /// Get count of active subagents.
385    pub async fn active_count(&self) -> usize {
386        self.active.read().await.len()
387    }
388
389    /// Get available capacity.
390    #[must_use]
391    pub fn available_permits(&self) -> usize {
392        self.semaphore.available_permits()
393    }
394
395    /// Check if a child can be spawned for the given parent without actually spawning.
396    ///
397    /// Returns `true` if both depth limit and concurrency permit are available.
398    pub async fn can_spawn_child(&self, parent_id: &SubAgentId) -> bool {
399        let active = self.active.read().await;
400        let parent_depth = if let Some(parent) = active.get(parent_id) {
401            parent.depth
402        } else {
403            return false; // parent not found
404        };
405
406        let Some(child_depth) = parent_depth.checked_add(1) else {
407            return false;
408        };
409        if child_depth >= self.max_depth {
410            return false;
411        }
412
413        self.semaphore.available_permits() > 0
414    }
415
416    /// Get the cancellation token for cooperative cancellation.
417    ///
418    /// Sub-agent executors should select on this token in their run loop.
419    /// Cancelling this token signals all sub-agents to stop cooperatively.
420    #[must_use]
421    pub fn cancellation_token(&self) -> CancellationToken {
422        self.cancellation_token.clone()
423    }
424
425    /// Cancel all active subagents and move them to history.
426    ///
427    /// Also cancels the cooperative cancellation token so in-flight sub-agents
428    /// can observe the cancellation and stop gracefully.
429    pub async fn cancel_all(&self) {
430        // Signal cooperative cancellation to all sub-agents.
431        self.cancellation_token.cancel();
432
433        let mut active = self.active.write().await;
434        let handles: Vec<Arc<SubAgentHandle>> = active.drain().map(|(_, h)| h).collect();
435        drop(active);
436
437        for handle in handles {
438            handle.cancel().await;
439            handle.release_permit();
440            self.push_to_history(handle).await;
441        }
442
443        self.completion_notify.notify_waiters();
444    }
445
446    /// Wait until the active pool is empty.
447    pub async fn wait_for_completion(&self) {
448        loop {
449            if self.active.read().await.is_empty() {
450                return;
451            }
452            self.completion_notify.notified().await;
453        }
454    }
455
456    /// Wait until the active pool is empty, or the timeout expires.
457    ///
458    /// Returns `true` if the pool drained before the timeout, `false` otherwise.
459    pub async fn wait_for_completion_timeout(&self, timeout: Duration) -> bool {
460        tokio::select! {
461            () = self.wait_for_completion() => true,
462            () = tokio::time::sleep(timeout) => {
463                self.active.read().await.is_empty()
464            }
465        }
466    }
467
468    /// Get direct children of a parent subagent (from both active and completed).
469    pub async fn get_children(&self, parent_id: &SubAgentId) -> Vec<Arc<SubAgentHandle>> {
470        let mut children = Vec::new();
471
472        let active = self.active.read().await;
473        for handle in active.values() {
474            if handle.parent_id.as_ref() == Some(parent_id) {
475                children.push(handle.clone());
476            }
477        }
478        drop(active);
479
480        let completed = self.completed.read().await;
481        for handle in &*completed {
482            if handle.parent_id.as_ref() == Some(parent_id) {
483                children.push(handle.clone());
484            }
485        }
486
487        children
488    }
489
490    /// Get all descendants of a parent subagent (BFS, from both active and completed).
491    pub async fn get_subtree(&self, parent_id: &SubAgentId) -> Vec<Arc<SubAgentHandle>> {
492        // Collect all handles into a single list for BFS
493        let active = self.active.read().await;
494        let completed = self.completed.read().await;
495
496        let all_handles: Vec<Arc<SubAgentHandle>> = active
497            .values()
498            .cloned()
499            .chain(completed.iter().cloned())
500            .collect();
501        drop(active);
502        drop(completed);
503
504        let mut result = Vec::new();
505        let mut queue = VecDeque::new();
506        queue.push_back(parent_id.clone());
507
508        while let Some(current_id) = queue.pop_front() {
509            for handle in &all_handles {
510                if handle.parent_id.as_ref() == Some(&current_id) {
511                    result.push(handle.clone());
512                    queue.push_back(handle.id.clone());
513                }
514            }
515        }
516
517        result
518    }
519
520    /// Cancel all active descendants of a parent subagent and move them to history.
521    ///
522    /// Returns the number of subagents cancelled.
523    pub async fn cancel_subtree(&self, parent_id: &SubAgentId) -> usize {
524        // First find all descendant IDs via BFS over the active pool
525        let active = self.active.read().await;
526        let mut to_cancel = Vec::new();
527        let mut queue = VecDeque::new();
528        queue.push_back(parent_id.clone());
529
530        while let Some(current_id) = queue.pop_front() {
531            for (id, handle) in active.iter() {
532                if handle.parent_id.as_ref() == Some(&current_id) {
533                    to_cancel.push(id.clone());
534                    queue.push_back(id.clone());
535                }
536            }
537        }
538        drop(active);
539
540        let mut cancelled = 0usize;
541        for id in &to_cancel {
542            if self.stop(id).await.is_some() {
543                cancelled = cancelled.saturating_add(1);
544            }
545        }
546        cancelled
547    }
548
549    /// Get completed subagents history.
550    pub async fn history(&self) -> Vec<Arc<SubAgentHandle>> {
551        self.completed.read().await.clone()
552    }
553
554    /// Clear completed history.
555    pub async fn clear_history(&self) {
556        self.completed.write().await.clear();
557    }
558
559    /// Get pool statistics.
560    pub async fn stats(&self) -> SubAgentPoolStats {
561        let active = self.active.read().await;
562        let completed = self.completed.read().await;
563
564        let (succeeded, failed, cancelled, timed_out) =
565            completed
566                .iter()
567                .fold(
568                    (0usize, 0usize, 0usize, 0usize),
569                    |(s, f, c, t), h| match h.final_status() {
570                        Some(SubAgentStatus::Completed) => (s.saturating_add(1), f, c, t),
571                        Some(SubAgentStatus::Failed) => (s, f.saturating_add(1), c, t),
572                        Some(SubAgentStatus::Cancelled) => (s, f, c.saturating_add(1), t),
573                        Some(SubAgentStatus::TimedOut) => (s, f, c, t.saturating_add(1)),
574                        _ => (s, f, c, t),
575                    },
576                );
577
578        SubAgentPoolStats {
579            max_concurrent: self.max_concurrent,
580            max_depth: self.max_depth,
581            active: active.len(),
582            available: self.semaphore.available_permits(),
583            total_completed: completed.len(),
584            succeeded,
585            failed,
586            cancelled,
587            timed_out,
588        }
589    }
590}
591
/// Statistics for a subagent pool.
///
/// A point-in-time snapshot produced by [`SubAgentPool::stats`]; outcome
/// counters cover only entries currently retained in (bounded) history.
#[derive(Debug, Clone)]
pub struct SubAgentPoolStats {
    /// Maximum concurrent subagents.
    pub max_concurrent: usize,
    /// Maximum nesting depth.
    pub max_depth: usize,
    /// Currently active subagents.
    pub active: usize,
    /// Available permits.
    pub available: usize,
    /// Total completed subagents.
    pub total_completed: usize,
    /// Successfully completed.
    pub succeeded: usize,
    /// Failed subagents.
    pub failed: usize,
    /// Cancelled subagents.
    pub cancelled: usize,
    /// Timed out subagents.
    pub timed_out: usize,
}
614
// Unit tests: handle lifecycle, pool limits (depth/concurrency), permit
// accounting, subtree queries/cancellation, and history eviction.
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_subagent_lifecycle() {
        let handle = SubAgentHandle::new("test task", None, 0, None);

        assert_eq!(handle.status().await, SubAgentStatus::Initializing);
        assert!(!handle.is_done().await);

        handle.mark_running().await;
        assert_eq!(handle.status().await, SubAgentStatus::Running);

        handle.complete("success").await;
        assert_eq!(handle.status().await, SubAgentStatus::Completed);
        assert!(handle.is_done().await);
        assert_eq!(handle.result().await, Some("success".into()));
    }

    #[tokio::test]
    async fn test_subagent_failure() {
        let handle = SubAgentHandle::new("test task", None, 0, None);

        handle.mark_running().await;
        handle.fail("something went wrong").await;

        assert_eq!(handle.status().await, SubAgentStatus::Failed);
        assert!(handle.is_done().await);
        assert_eq!(handle.error().await, Some("something went wrong".into()));
    }

    // final_status is the sync snapshot: None until a terminal transition.
    #[tokio::test]
    async fn test_subagent_final_status() {
        let handle = SubAgentHandle::new("test task", None, 0, None);
        assert_eq!(handle.final_status(), None);

        handle.mark_running().await;
        assert_eq!(handle.final_status(), None);

        handle.complete("done").await;
        assert_eq!(handle.final_status(), Some(SubAgentStatus::Completed));
    }

    #[tokio::test]
    async fn test_pool_spawn() {
        let pool = SubAgentPool::new(5, 3);

        let handle = pool.spawn("task 1", None).await.unwrap();
        assert_eq!(pool.active_count().await, 1);

        pool.release(&handle.id).await;
        assert_eq!(pool.active_count().await, 0);
        assert_eq!(pool.history().await.len(), 1);
    }

    #[tokio::test]
    async fn test_pool_max_depth() {
        let pool = SubAgentPool::new(5, 2);

        // Depth 0
        let h1 = pool.spawn("task 1", None).await.unwrap();
        assert_eq!(h1.depth, 0);

        // Depth 1
        let h2 = pool.spawn("task 2", Some(h1.id.clone())).await.unwrap();
        assert_eq!(h2.depth, 1);

        // Depth 2 should fail (max_depth = 2 means 0 and 1 only)
        let result = pool.spawn("task 3", Some(h2.id.clone())).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_pool_cancel_all() {
        let pool = SubAgentPool::new(5, 3);

        let h1 = pool.spawn("task 1", None).await.unwrap();
        let h2 = pool.spawn("task 2", None).await.unwrap();

        pool.cancel_all().await;

        assert_eq!(h1.status().await, SubAgentStatus::Cancelled);
        assert_eq!(h2.status().await, SubAgentStatus::Cancelled);
        // Handles should be moved to history
        assert_eq!(pool.active_count().await, 0);
        assert_eq!(pool.history().await.len(), 2);
    }

    #[tokio::test]
    async fn test_semaphore_limits_concurrency() {
        let pool = SubAgentPool::new(2, 5);

        let _h1 = pool.spawn("task 1", None).await.unwrap();
        let _h2 = pool.spawn("task 2", None).await.unwrap();

        // Third spawn should fail — semaphore exhausted
        let result = pool.spawn("task 3", None).await;
        assert!(result.is_err());
        assert_eq!(pool.available_permits(), 0);
    }

    #[tokio::test]
    async fn test_permit_released_on_complete() {
        let pool = SubAgentPool::new(1, 5);

        let h1 = pool.spawn("task 1", None).await.unwrap();
        assert_eq!(pool.available_permits(), 0);

        // Completing moves handle to history, dropping the permit
        pool.release(&h1.id).await;
        assert_eq!(pool.available_permits(), 1);

        // Should be able to spawn again
        let _h2 = pool.spawn("task 2", None).await.unwrap();
        assert_eq!(pool.available_permits(), 0);
    }

    #[tokio::test]
    async fn test_stop_cancels_and_moves_to_history() {
        let pool = SubAgentPool::new(5, 3);

        let h = pool.spawn("task 1", None).await.unwrap();
        let id = h.id.clone();

        let stopped = pool.stop(&id).await;
        assert!(stopped.is_some());

        let handle = stopped.unwrap();
        assert_eq!(handle.status().await, SubAgentStatus::Cancelled);
        assert_eq!(pool.active_count().await, 0);
        assert_eq!(pool.history().await.len(), 1);
    }

    #[tokio::test]
    async fn test_stop_nonexistent_returns_none() {
        let pool = SubAgentPool::new(5, 3);
        let result = pool.stop(&SubAgentId::new()).await;
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_can_spawn_child_checks_depth_and_concurrency() {
        let pool = SubAgentPool::new(3, 2);

        // Depth 0
        let h1 = pool.spawn("task 1", None).await.unwrap();
        assert!(pool.can_spawn_child(&h1.id).await);

        // Depth 1 (max_depth=2, so depth 1 is allowed but children of depth 1 are not)
        let h2 = pool.spawn("task 2", Some(h1.id.clone())).await.unwrap();
        assert!(!pool.can_spawn_child(&h2.id).await); // depth 2 >= max_depth

        // Non-existent parent
        assert!(!pool.can_spawn_child(&SubAgentId::new()).await);
    }

    #[tokio::test]
    async fn test_can_spawn_child_checks_concurrency() {
        let pool = SubAgentPool::new(2, 5);

        let h1 = pool.spawn("task 1", None).await.unwrap();
        let _h2 = pool.spawn("task 2", None).await.unwrap();

        // No permits left
        assert!(!pool.can_spawn_child(&h1.id).await);
    }

    // NOTE(review): the 10ms sleep below is a best-effort way to let the
    // waiter register before asserting it hasn't finished; timing-sensitive
    // but generous enough in practice.
    #[tokio::test]
    async fn test_wait_for_completion_returns_when_empty() {
        let pool = Arc::new(SubAgentPool::new(5, 3));

        // Empty pool — should return immediately
        pool.wait_for_completion().await;

        let h = pool.spawn("task 1", None).await.unwrap();
        let h_id = h.id.clone();
        let pool_clone = pool.clone();

        let waiter = tokio::spawn(async move {
            pool_clone.wait_for_completion().await;
        });

        // Give the waiter a moment to register
        tokio::time::sleep(Duration::from_millis(10)).await;
        assert!(!waiter.is_finished());

        pool.release(&h_id).await;

        // Waiter should finish now
        tokio::time::timeout(Duration::from_secs(1), waiter)
            .await
            .expect("waiter should complete")
            .expect("waiter should not panic");
    }

    #[tokio::test]
    async fn test_wait_for_completion_timeout_returns_false_on_expiry() {
        let pool = SubAgentPool::new(5, 3);

        let _h = pool.spawn("task 1", None).await.unwrap();

        let drained = pool
            .wait_for_completion_timeout(Duration::from_millis(50))
            .await;
        assert!(!drained);
    }

    #[tokio::test]
    async fn test_wait_for_completion_timeout_returns_true_on_drain() {
        let pool = Arc::new(SubAgentPool::new(5, 3));

        let h = pool.spawn("task 1", None).await.unwrap();
        let h_id = h.id.clone();
        let pool_clone = pool.clone();

        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(20)).await;
            pool_clone.release(&h_id).await;
        });

        let drained = pool
            .wait_for_completion_timeout(Duration::from_secs(2))
            .await;
        assert!(drained);
    }

    #[tokio::test]
    async fn test_get_children_returns_direct_children_only() {
        let pool = SubAgentPool::new(10, 5);

        let root = pool.spawn("root", None).await.unwrap();
        let child1 = pool.spawn("child1", Some(root.id.clone())).await.unwrap();
        let child2 = pool.spawn("child2", Some(root.id.clone())).await.unwrap();
        let _grandchild = pool
            .spawn("grandchild", Some(child1.id.clone()))
            .await
            .unwrap();

        let children = pool.get_children(&root.id).await;
        assert_eq!(children.len(), 2);

        let child_ids: Vec<_> = children.iter().map(|c| c.id.clone()).collect();
        assert!(child_ids.contains(&child1.id));
        assert!(child_ids.contains(&child2.id));
    }

    #[tokio::test]
    async fn test_get_children_includes_completed() {
        let pool = SubAgentPool::new(10, 5);

        let root = pool.spawn("root", None).await.unwrap();
        let child = pool.spawn("child", Some(root.id.clone())).await.unwrap();
        let child_id = child.id.clone();

        // Move child to completed
        pool.release(&child_id).await;

        let children = pool.get_children(&root.id).await;
        assert_eq!(children.len(), 1);
        assert_eq!(children[0].id, child_id);
    }

    #[tokio::test]
    async fn test_get_subtree_returns_all_descendants() {
        let pool = SubAgentPool::new(10, 5);

        let root = pool.spawn("root", None).await.unwrap();
        let child1 = pool.spawn("child1", Some(root.id.clone())).await.unwrap();
        let child2 = pool.spawn("child2", Some(root.id.clone())).await.unwrap();
        let grandchild1 = pool
            .spawn("grandchild1", Some(child1.id.clone()))
            .await
            .unwrap();
        let grandchild2 = pool
            .spawn("grandchild2", Some(child2.id.clone()))
            .await
            .unwrap();

        let subtree = pool.get_subtree(&root.id).await;
        assert_eq!(subtree.len(), 4);

        let ids: Vec<_> = subtree.iter().map(|h| h.id.clone()).collect();
        assert!(ids.contains(&child1.id));
        assert!(ids.contains(&child2.id));
        assert!(ids.contains(&grandchild1.id));
        assert!(ids.contains(&grandchild2.id));
        // Root itself should NOT be in the subtree
        assert!(!ids.contains(&root.id));
    }

    #[tokio::test]
    async fn test_cancel_subtree_cancels_only_descendants() {
        let pool = SubAgentPool::new(10, 5);

        let root = pool.spawn("root", None).await.unwrap();
        let child = pool.spawn("child", Some(root.id.clone())).await.unwrap();
        let grandchild = pool
            .spawn("grandchild", Some(child.id.clone()))
            .await
            .unwrap();
        let sibling = pool.spawn("sibling", None).await.unwrap();

        let cancelled = pool.cancel_subtree(&root.id).await;
        assert_eq!(cancelled, 2); // child + grandchild

        // Root should still be active
        assert!(pool.get(&root.id).await.is_some());
        // Child and grandchild should be in history
        assert!(pool.get(&child.id).await.is_none());
        assert!(pool.get(&grandchild.id).await.is_none());
        // Sibling should still be active
        assert!(pool.get(&sibling.id).await.is_some());
    }

    // Exercises every terminal outcome once and checks the stats tallies.
    #[tokio::test]
    async fn test_stats_counts_are_accurate() {
        let pool = SubAgentPool::new(10, 5);

        let h1 = pool.spawn("task 1", None).await.unwrap();
        let h2 = pool.spawn("task 2", None).await.unwrap();
        let h3 = pool.spawn("task 3", None).await.unwrap();
        let h4 = pool.spawn("task 4", None).await.unwrap();

        // Complete one successfully
        h1.complete("result").await;
        pool.release(&h1.id).await;

        // Fail one
        h2.fail("error").await;
        pool.release(&h2.id).await;

        // Cancel one
        pool.stop(&h3.id).await;

        // Timeout one
        h4.timeout().await;
        pool.release(&h4.id).await;

        let stats = pool.stats().await;
        assert_eq!(stats.max_concurrent, 10);
        assert_eq!(stats.max_depth, 5);
        assert_eq!(stats.active, 0);
        assert_eq!(stats.total_completed, 4);
        assert_eq!(stats.succeeded, 1);
        assert_eq!(stats.failed, 1);
        assert_eq!(stats.cancelled, 1);
        assert_eq!(stats.timed_out, 1);
    }

    #[tokio::test]
    async fn test_stats_with_active() {
        let pool = SubAgentPool::new(5, 3);

        let _h1 = pool.spawn("task 1", None).await.unwrap();
        let h2 = pool.spawn("task 2", None).await.unwrap();
        h2.complete("done").await;
        pool.release(&h2.id).await;

        let stats = pool.stats().await;
        assert_eq!(stats.active, 1);
        assert_eq!(stats.total_completed, 1);
        assert_eq!(stats.succeeded, 1);
        assert_eq!(stats.available, 4);
    }

    #[tokio::test]
    async fn test_history_eviction_at_capacity() {
        let pool = SubAgentPool::with_max_history(10, 5, 3);

        // Spawn and release 5 subagents into a pool with max_history=3
        for i in 0..5 {
            let h = pool.spawn(format!("task {i}"), None).await.unwrap();
            h.complete(format!("result {i}")).await;
            pool.release(&h.id).await;
        }

        // Only the 3 most recent should be in history
        let history = pool.history().await;
        assert_eq!(history.len(), 3);
        // Oldest entry should be task 2 (tasks 0 and 1 were evicted)
        assert_eq!(history[0].task, "task 2");
        assert_eq!(history[1].task, "task 3");
        assert_eq!(history[2].task, "task 4");
    }
}
1000}