Skip to main content

orcs_runtime/channel/runner/
child_spawner.rs

1//! Child spawner for managing spawned children within a Runner.
2//!
3//! The ChildSpawner manages the lifecycle of children spawned by
4//! a Component or Child. It handles:
5//!
6//! - Spawning new children from configs
7//! - Signal propagation to all children
8//! - Tracking child status
9//! - Cleanup on abort
10//!
11//! # Architecture
12//!
13//! ```text
14//! ┌─────────────────────────────────────────────────────────────┐
15//! │                     ChildSpawner                             │
16//! │                                                              │
17//! │  spawn(config) ──► ManagedChild ──► RunnableChild           │
18//! │                         │                                    │
19//! │                         └─► child.status() (direct query)   │
20//! │                                                              │
21//! │  propagate_signal() ──► all children (via on_signal)        │
22//! │  abort_all() ──► stop all children                          │
23//! │  reap_finished() ──► remove completed/aborted children      │
24//! └─────────────────────────────────────────────────────────────┘
25//! ```
26
27use super::base::OutputSender;
28use orcs_component::{
29    async_trait, AsyncChildHandle, ChildConfig, ChildHandle, ChildResult, RunError, RunnableChild,
30    SpawnError, Status,
31};
32use orcs_event::Signal;
33use std::collections::HashMap;
34use std::fmt::Debug;
35use std::sync::{Arc, Mutex};
36
37/// Default maximum number of children per spawner.
38const DEFAULT_MAX_CHILDREN: usize = 64;
39
40/// A managed child instance.
41struct ManagedChild {
42    /// The child instance.
43    child: Arc<Mutex<Box<dyn RunnableChild>>>,
44}
45
46/// Handle to a spawned child.
47///
48/// Provides control over a child that was spawned via [`ChildSpawner::spawn()`].
49pub struct SpawnedChildHandle {
50    /// Child ID.
51    id: String,
52    /// The child instance (shared with ManagedChild).
53    child: Arc<Mutex<Box<dyn RunnableChild>>>,
54}
55
56impl std::fmt::Debug for SpawnedChildHandle {
57    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58        let mut s = f.debug_struct("SpawnedChildHandle");
59        s.field("id", &self.id);
60
61        // Use try_lock to avoid deadlock when Debug is called while holding lock
62        match self.child.try_lock() {
63            Ok(guard) => s.field("status", &guard.status()),
64            Err(_) => s.field("status", &"<locked>"),
65        };
66
67        s.finish()
68    }
69}
70
71impl ChildHandle for SpawnedChildHandle {
72    fn id(&self) -> &str {
73        &self.id
74    }
75
76    fn status(&self) -> Status {
77        self.child
78            .lock()
79            .map(|c| c.status())
80            .unwrap_or(Status::Error)
81    }
82
83    fn run_sync(&mut self, input: serde_json::Value) -> Result<ChildResult, RunError> {
84        let mut child = self
85            .child
86            .lock()
87            .map_err(|e| RunError::ExecutionFailed(format!("lock failed: {}", e)))?;
88
89        Ok(child.run(input))
90    }
91
92    fn abort(&mut self) {
93        if let Ok(mut child) = self.child.lock() {
94            child.abort();
95        }
96    }
97
98    fn is_finished(&self) -> bool {
99        self.child
100            .lock()
101            .map(|c| {
102                let status = c.status();
103                matches!(status, Status::Completed | Status::Error | Status::Aborted)
104            })
105            .unwrap_or(true) // If lock fails, treat as finished
106    }
107}
108
109#[async_trait]
110impl AsyncChildHandle for SpawnedChildHandle {
111    fn id(&self) -> &str {
112        &self.id
113    }
114
115    fn status(&self) -> Status {
116        self.child
117            .lock()
118            .map(|c| c.status())
119            .unwrap_or(Status::Error)
120    }
121
122    async fn run(&mut self, input: serde_json::Value) -> Result<ChildResult, RunError> {
123        // Clone Arc for move into spawn_blocking
124        let child = Arc::clone(&self.child);
125
126        // Run synchronous child in blocking task
127        tokio::task::spawn_blocking(move || {
128            let mut guard = child
129                .lock()
130                .map_err(|e| RunError::ExecutionFailed(format!("lock failed: {}", e)))?;
131            Ok(guard.run(input))
132        })
133        .await
134        .map_err(|e| RunError::ExecutionFailed(format!("task join failed: {}", e)))?
135    }
136
137    fn abort(&mut self) {
138        if let Ok(mut child) = self.child.lock() {
139            child.abort();
140        }
141    }
142
143    fn is_finished(&self) -> bool {
144        self.child
145            .lock()
146            .map(|c| {
147                let status = c.status();
148                matches!(status, Status::Completed | Status::Error | Status::Aborted)
149            })
150            .unwrap_or(true)
151    }
152}
153
154/// Spawner for managing children within a Runner.
155pub struct ChildSpawner {
156    /// Spawned children by ID.
157    children: HashMap<String, ManagedChild>,
158    /// Maximum allowed children.
159    max_children: usize,
160    /// Parent ID (Component or Child that owns this spawner).
161    parent_id: String,
162    /// Output sender (for child output to parent).
163    output_tx: OutputSender,
164}
165
166impl ChildSpawner {
167    /// Creates a new ChildSpawner.
168    ///
169    /// # Arguments
170    ///
171    /// * `parent_id` - ID of the parent Component/Child
172    /// * `output_tx` - Sender for output events
173    #[must_use]
174    pub fn new(parent_id: impl Into<String>, output_tx: OutputSender) -> Self {
175        Self {
176            children: HashMap::new(),
177            max_children: DEFAULT_MAX_CHILDREN,
178            parent_id: parent_id.into(),
179            output_tx,
180        }
181    }
182
183    /// Sets the maximum number of children.
184    #[must_use]
185    pub fn with_max_children(mut self, max: usize) -> Self {
186        self.max_children = max;
187        self
188    }
189
190    /// Returns the number of active children.
191    #[must_use]
192    pub fn child_count(&self) -> usize {
193        self.children.len()
194    }
195
196    /// Returns the maximum allowed children.
197    #[must_use]
198    pub fn max_children(&self) -> usize {
199        self.max_children
200    }
201
202    /// Returns the parent ID.
203    #[must_use]
204    pub fn parent_id(&self) -> &str {
205        &self.parent_id
206    }
207
208    /// Internal spawn logic shared by `spawn()` and `spawn_async()`.
209    fn spawn_inner(
210        &mut self,
211        config: ChildConfig,
212        child: Box<dyn RunnableChild>,
213    ) -> Result<SpawnedChildHandle, SpawnError> {
214        // Check limits
215        if self.children.len() >= self.max_children {
216            return Err(SpawnError::MaxChildrenReached(self.max_children));
217        }
218
219        // Check for duplicate ID
220        if self.children.contains_key(&config.id) {
221            return Err(SpawnError::AlreadyExists(config.id.clone()));
222        }
223
224        let id = config.id.clone();
225
226        // Wrap child in Arc<Mutex> for sharing
227        let child_arc = Arc::new(Mutex::new(child));
228
229        // Create managed child
230        let managed = ManagedChild {
231            child: Arc::clone(&child_arc),
232        };
233
234        // Store managed child
235        self.children.insert(id.clone(), managed);
236
237        tracing::debug!(
238            parent = %self.parent_id,
239            child_id = %id,
240            child_count = self.children.len(),
241            "child spawned"
242        );
243
244        Ok(SpawnedChildHandle {
245            id,
246            child: child_arc,
247        })
248    }
249
250    /// Spawns a child and returns a handle.
251    ///
252    /// # Arguments
253    ///
254    /// * `config` - Configuration for the child
255    /// * `child` - The RunnableChild instance
256    ///
257    /// # Errors
258    ///
259    /// Returns error if:
260    /// - Max children reached
261    /// - Child with same ID already exists
262    pub fn spawn(
263        &mut self,
264        config: ChildConfig,
265        child: Box<dyn RunnableChild>,
266    ) -> Result<Box<dyn ChildHandle>, SpawnError> {
267        Ok(Box::new(self.spawn_inner(config, child)?))
268    }
269
270    /// Spawns a child and returns an async handle.
271    ///
272    /// Unlike [`Self::spawn()`], this returns a handle that properly uses
273    /// `spawn_blocking` for async execution.
274    ///
275    /// # Arguments
276    ///
277    /// * `config` - Configuration for the child
278    /// * `child` - The RunnableChild instance
279    ///
280    /// # Errors
281    ///
282    /// Returns error if:
283    /// - Max children reached
284    /// - Child with same ID already exists
285    pub fn spawn_async(
286        &mut self,
287        config: ChildConfig,
288        child: Box<dyn RunnableChild>,
289    ) -> Result<Box<dyn AsyncChildHandle>, SpawnError> {
290        Ok(Box::new(self.spawn_inner(config, child)?))
291    }
292
293    /// Propagates a signal to all children.
294    pub fn propagate_signal(&mut self, signal: &Signal) {
295        for (id, managed) in &mut self.children {
296            if let Ok(mut child) = managed.child.lock() {
297                let response = child.on_signal(signal);
298                if matches!(response, orcs_event::SignalResponse::Abort) {
299                    tracing::debug!("Child {} aborted on signal", id);
300                }
301            }
302        }
303    }
304
305    /// Aborts all children.
306    pub fn abort_all(&mut self) {
307        for (id, managed) in &mut self.children {
308            if let Ok(mut child) = managed.child.lock() {
309                child.abort();
310                tracing::debug!("Aborted child {}", id);
311            }
312        }
313    }
314
315    /// Removes finished children and returns their results.
316    pub fn reap_finished(&mut self) -> Vec<(String, Status)> {
317        let mut finished = Vec::new();
318
319        self.children.retain(|id, managed| {
320            let status = managed
321                .child
322                .lock()
323                .map(|c| c.status())
324                .unwrap_or(Status::Error);
325
326            if matches!(status, Status::Completed | Status::Error | Status::Aborted) {
327                tracing::debug!(
328                    parent = %self.parent_id,
329                    child_id = %id,
330                    status = ?status,
331                    "child reaped"
332                );
333                finished.push((id.clone(), status));
334                false // Remove from map
335            } else {
336                true // Keep in map
337            }
338        });
339
340        finished
341    }
342
343    /// Returns the IDs of all active children.
344    #[must_use]
345    pub fn child_ids(&self) -> Vec<String> {
346        self.children.keys().cloned().collect()
347    }
348
349    /// Returns the output sender.
350    #[must_use]
351    pub fn output_tx(&self) -> &OutputSender {
352        &self.output_tx
353    }
354
355    /// Returns a clone of the child's Arc for out-of-lock execution.
356    ///
357    /// Used by `send_to_children_batch` to collect child references while
358    /// briefly holding the spawner lock, then release it before running.
359    #[must_use]
360    pub fn get_child_arc(&self, id: &str) -> Option<Arc<Mutex<Box<dyn RunnableChild>>>> {
361        self.children.get(id).map(|m| Arc::clone(&m.child))
362    }
363
364    /// Runs a child by ID with the given input.
365    ///
366    /// # Arguments
367    ///
368    /// * `id` - The child ID
369    /// * `input` - Input data to pass to the child
370    ///
371    /// # Returns
372    ///
373    /// The child's result, or an error if:
374    /// - Child not found
375    /// - Child lock failed
376    pub fn run_child(
377        &self,
378        id: &str,
379        input: serde_json::Value,
380    ) -> Result<ChildResult, orcs_component::RunError> {
381        let managed = self.children.get(id).ok_or_else(|| {
382            orcs_component::RunError::ExecutionFailed(format!("child not found: {}", id))
383        })?;
384
385        let mut child = managed.child.lock().map_err(|e| {
386            orcs_component::RunError::ExecutionFailed(format!("child lock failed: {}", e))
387        })?;
388
389        Ok(child.run(input))
390    }
391}
392
393impl Debug for ChildSpawner {
394    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
395        f.debug_struct("ChildSpawner")
396            .field("parent_id", &self.parent_id)
397            .field("child_count", &self.children.len())
398            .field("max_children", &self.max_children)
399            .finish()
400    }
401}
402
403#[cfg(test)]
404mod tests {
405    use super::*;
406    use orcs_component::{Child, Identifiable, SignalReceiver, Statusable};
407    use orcs_event::SignalResponse;
408    use serde_json::json;
409
410    /// Test implementation of RunnableChild.
411    struct TestWorker {
412        id: String,
413        status: Status,
414    }
415
416    impl TestWorker {
417        fn new(id: &str) -> Self {
418            Self {
419                id: id.into(),
420                status: Status::Idle,
421            }
422        }
423    }
424
425    impl Identifiable for TestWorker {
426        fn id(&self) -> &str {
427            &self.id
428        }
429    }
430
431    impl SignalReceiver for TestWorker {
432        fn on_signal(&mut self, signal: &Signal) -> SignalResponse {
433            if signal.is_veto() {
434                self.status = Status::Aborted;
435                SignalResponse::Abort
436            } else {
437                SignalResponse::Handled
438            }
439        }
440
441        fn abort(&mut self) {
442            self.status = Status::Aborted;
443        }
444    }
445
446    impl Statusable for TestWorker {
447        fn status(&self) -> Status {
448            self.status
449        }
450    }
451
452    impl Child for TestWorker {}
453
454    impl RunnableChild for TestWorker {
455        fn run(&mut self, input: serde_json::Value) -> ChildResult {
456            self.status = Status::Running;
457            // Simple echo
458            self.status = Status::Idle;
459            ChildResult::Ok(input)
460        }
461    }
462
463    fn setup() -> ChildSpawner {
464        let (output_tx, _) = OutputSender::channel(64);
465        ChildSpawner::new("test-parent", output_tx)
466    }
467
468    #[test]
469    fn spawn_child() {
470        let mut spawner = setup();
471        let config = ChildConfig::new("worker-1");
472        let worker = Box::new(TestWorker::new("worker-1"));
473
474        let result = spawner.spawn(config, worker);
475        assert!(result.is_ok());
476        assert_eq!(spawner.child_count(), 1);
477    }
478
479    #[test]
480    fn spawn_duplicate_fails() {
481        let mut spawner = setup();
482
483        let config1 = ChildConfig::new("worker-1");
484        let worker1 = Box::new(TestWorker::new("worker-1"));
485        spawner
486            .spawn(config1, worker1)
487            .expect("spawn first worker should succeed");
488
489        let config2 = ChildConfig::new("worker-1");
490        let worker2 = Box::new(TestWorker::new("worker-1"));
491        let result = spawner.spawn(config2, worker2);
492
493        assert!(matches!(result, Err(SpawnError::AlreadyExists(_))));
494    }
495
496    #[test]
497    fn spawn_max_children_limit() {
498        let mut spawner = setup().with_max_children(2);
499
500        for i in 0..2 {
501            let config = ChildConfig::new(format!("worker-{}", i));
502            let worker = Box::new(TestWorker::new(&format!("worker-{}", i)));
503            spawner
504                .spawn(config, worker)
505                .expect("spawn worker within limit should succeed");
506        }
507
508        let config = ChildConfig::new("worker-overflow");
509        let worker = Box::new(TestWorker::new("worker-overflow"));
510        let result = spawner.spawn(config, worker);
511
512        assert!(matches!(result, Err(SpawnError::MaxChildrenReached(2))));
513    }
514
515    #[test]
516    fn child_handle_run() {
517        let mut spawner = setup();
518        let config = ChildConfig::new("worker-1");
519        let worker = Box::new(TestWorker::new("worker-1"));
520
521        let mut handle = spawner
522            .spawn(config, worker)
523            .expect("spawn worker for run test");
524        let result = handle
525            .run_sync(json!({"test": true}))
526            .expect("run_sync should succeed");
527
528        assert!(result.is_ok());
529        if let ChildResult::Ok(data) = result {
530            assert_eq!(data["test"], true);
531        }
532    }
533
534    #[test]
535    fn child_handle_status() {
536        let mut spawner = setup();
537        let config = ChildConfig::new("worker-1");
538        let worker = Box::new(TestWorker::new("worker-1"));
539
540        let handle = spawner
541            .spawn(config, worker)
542            .expect("spawn worker for status check");
543        assert_eq!(handle.status(), Status::Idle);
544    }
545
546    #[test]
547    fn child_handle_abort() {
548        let mut spawner = setup();
549        let config = ChildConfig::new("worker-1");
550        let worker = Box::new(TestWorker::new("worker-1"));
551
552        let mut handle = spawner
553            .spawn(config, worker)
554            .expect("spawn worker for abort test");
555        handle.abort();
556
557        // Status should be aborted
558        assert!(handle.is_finished());
559    }
560
561    #[test]
562    fn abort_all() {
563        let mut spawner = setup();
564
565        for i in 0..3 {
566            let config = ChildConfig::new(format!("worker-{}", i));
567            let worker = Box::new(TestWorker::new(&format!("worker-{}", i)));
568            spawner
569                .spawn(config, worker)
570                .expect("spawn worker for abort_all test");
571        }
572
573        spawner.abort_all();
574
575        // All children should be aborted
576        let finished = spawner.reap_finished();
577        assert_eq!(finished.len(), 3);
578        for (_, status) in finished {
579            assert_eq!(status, Status::Aborted);
580        }
581    }
582
583    #[test]
584    fn child_ids() {
585        let mut spawner = setup();
586
587        for i in 0..3 {
588            let config = ChildConfig::new(format!("worker-{}", i));
589            let worker = Box::new(TestWorker::new(&format!("worker-{}", i)));
590            spawner
591                .spawn(config, worker)
592                .expect("spawn worker for child_ids test");
593        }
594
595        let ids = spawner.child_ids();
596        assert_eq!(ids.len(), 3);
597        assert!(ids.contains(&"worker-0".to_string()));
598        assert!(ids.contains(&"worker-1".to_string()));
599        assert!(ids.contains(&"worker-2".to_string()));
600    }
601
602    // --- AsyncChildHandle tests ---
603
604    /// Helper to create SpawnedChildHandle directly for async testing.
605    fn create_spawned_handle(id: &str) -> SpawnedChildHandle {
606        let worker = Box::new(TestWorker::new(id)) as Box<dyn RunnableChild>;
607        SpawnedChildHandle {
608            id: id.to_string(),
609            child: Arc::new(Mutex::new(worker)),
610        }
611    }
612
613    #[tokio::test]
614    async fn async_child_handle_run() {
615        let mut handle = create_spawned_handle("async-worker-1");
616
617        let result = AsyncChildHandle::run(&mut handle, json!({"async": true}))
618            .await
619            .expect("async child run should succeed");
620
621        assert!(result.is_ok());
622        if let ChildResult::Ok(data) = result {
623            assert_eq!(data["async"], true);
624        }
625    }
626
627    #[tokio::test]
628    async fn async_child_handle_status() {
629        let handle = create_spawned_handle("async-worker");
630
631        assert_eq!(AsyncChildHandle::status(&handle), Status::Idle);
632        assert_eq!(AsyncChildHandle::id(&handle), "async-worker");
633    }
634
635    #[tokio::test]
636    async fn async_child_handle_abort() {
637        let mut handle = create_spawned_handle("async-worker");
638
639        AsyncChildHandle::abort(&mut handle);
640
641        assert!(AsyncChildHandle::is_finished(&handle));
642    }
643
644    #[tokio::test]
645    async fn async_child_handle_object_safety() {
646        let handle = create_spawned_handle("async-boxed");
647
648        // Can be used as Box<dyn AsyncChildHandle>
649        let boxed: Box<dyn AsyncChildHandle> = Box::new(handle);
650
651        assert_eq!(boxed.id(), "async-boxed");
652        assert_eq!(boxed.status(), Status::Idle);
653    }
654
655    // --- get_child_arc tests ---
656
657    #[test]
658    fn get_child_arc_returns_some_for_existing_child() {
659        let mut spawner = setup();
660        let config = ChildConfig::new("worker-1");
661        let worker = Box::new(TestWorker::new("worker-1"));
662        spawner
663            .spawn(config, worker)
664            .expect("spawn worker for get_child_arc test");
665
666        let arc = spawner.get_child_arc("worker-1");
667        assert!(arc.is_some(), "should return Some for existing child");
668    }
669
670    #[test]
671    fn get_child_arc_returns_none_for_missing_child() {
672        let spawner = setup();
673        let arc = spawner.get_child_arc("nonexistent");
674        assert!(arc.is_none(), "should return None for missing child");
675    }
676
677    #[test]
678    fn get_child_arc_shares_state_with_managed_child() {
679        let mut spawner = setup();
680        let config = ChildConfig::new("worker-1");
681        let worker = Box::new(TestWorker::new("worker-1"));
682        spawner
683            .spawn(config, worker)
684            .expect("spawn worker for shared state test");
685
686        let arc = spawner.get_child_arc("worker-1").expect("get arc");
687        // Run via arc
688        {
689            let mut child = arc.lock().expect("lock child arc");
690            let result = child.run(json!({"via": "arc"}));
691            assert!(result.is_ok(), "should run successfully via arc");
692        }
693        // Run via spawner — same child, should still work
694        let result = spawner.run_child("worker-1", json!({"via": "spawner"}));
695        assert!(
696            result.is_ok(),
697            "should run successfully via spawner after arc use"
698        );
699    }
700}