orcs_runtime/channel/runner/
child_spawner.rs1use super::base::OutputSender;
28use orcs_component::{
29 async_trait, AsyncChildHandle, ChildConfig, ChildHandle, ChildResult, RunError, RunnableChild,
30 SpawnError, Status,
31};
32use orcs_event::Signal;
33use std::collections::HashMap;
34use std::fmt::Debug;
35use std::sync::{Arc, Mutex};
36
/// Child limit that `ChildSpawner::new` installs when no explicit limit is
/// configured through `with_max_children`.
const DEFAULT_MAX_CHILDREN: usize = 64;
39
40struct ManagedChild {
42 child: Arc<Mutex<Box<dyn RunnableChild>>>,
44}
45
46pub struct SpawnedChildHandle {
50 id: String,
52 child: Arc<Mutex<Box<dyn RunnableChild>>>,
54}
55
56impl std::fmt::Debug for SpawnedChildHandle {
57 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58 let mut s = f.debug_struct("SpawnedChildHandle");
59 s.field("id", &self.id);
60
61 match self.child.try_lock() {
63 Ok(guard) => s.field("status", &guard.status()),
64 Err(_) => s.field("status", &"<locked>"),
65 };
66
67 s.finish()
68 }
69}
70
71impl ChildHandle for SpawnedChildHandle {
72 fn id(&self) -> &str {
73 &self.id
74 }
75
76 fn status(&self) -> Status {
77 self.child
78 .lock()
79 .map(|c| c.status())
80 .unwrap_or(Status::Error)
81 }
82
83 fn run_sync(&mut self, input: serde_json::Value) -> Result<ChildResult, RunError> {
84 let mut child = self
85 .child
86 .lock()
87 .map_err(|e| RunError::ExecutionFailed(format!("lock failed: {}", e)))?;
88
89 Ok(child.run(input))
90 }
91
92 fn abort(&mut self) {
93 if let Ok(mut child) = self.child.lock() {
94 child.abort();
95 }
96 }
97
98 fn is_finished(&self) -> bool {
99 self.child
100 .lock()
101 .map(|c| {
102 let status = c.status();
103 matches!(status, Status::Completed | Status::Error | Status::Aborted)
104 })
105 .unwrap_or(true) }
107}
108
109#[async_trait]
110impl AsyncChildHandle for SpawnedChildHandle {
111 fn id(&self) -> &str {
112 &self.id
113 }
114
115 fn status(&self) -> Status {
116 self.child
117 .lock()
118 .map(|c| c.status())
119 .unwrap_or(Status::Error)
120 }
121
122 async fn run(&mut self, input: serde_json::Value) -> Result<ChildResult, RunError> {
123 let child = Arc::clone(&self.child);
125
126 tokio::task::spawn_blocking(move || {
128 let mut guard = child
129 .lock()
130 .map_err(|e| RunError::ExecutionFailed(format!("lock failed: {}", e)))?;
131 Ok(guard.run(input))
132 })
133 .await
134 .map_err(|e| RunError::ExecutionFailed(format!("task join failed: {}", e)))?
135 }
136
137 fn abort(&mut self) {
138 if let Ok(mut child) = self.child.lock() {
139 child.abort();
140 }
141 }
142
143 fn is_finished(&self) -> bool {
144 self.child
145 .lock()
146 .map(|c| {
147 let status = c.status();
148 matches!(status, Status::Completed | Status::Error | Status::Aborted)
149 })
150 .unwrap_or(true)
151 }
152}
153
154pub struct ChildSpawner {
156 children: HashMap<String, ManagedChild>,
158 max_children: usize,
160 parent_id: String,
162 output_tx: OutputSender,
164}
165
166impl ChildSpawner {
167 #[must_use]
174 pub fn new(parent_id: impl Into<String>, output_tx: OutputSender) -> Self {
175 Self {
176 children: HashMap::new(),
177 max_children: DEFAULT_MAX_CHILDREN,
178 parent_id: parent_id.into(),
179 output_tx,
180 }
181 }
182
183 #[must_use]
185 pub fn with_max_children(mut self, max: usize) -> Self {
186 self.max_children = max;
187 self
188 }
189
190 #[must_use]
192 pub fn child_count(&self) -> usize {
193 self.children.len()
194 }
195
196 #[must_use]
198 pub fn max_children(&self) -> usize {
199 self.max_children
200 }
201
202 #[must_use]
204 pub fn parent_id(&self) -> &str {
205 &self.parent_id
206 }
207
208 fn spawn_inner(
210 &mut self,
211 config: ChildConfig,
212 child: Box<dyn RunnableChild>,
213 ) -> Result<SpawnedChildHandle, SpawnError> {
214 if self.children.len() >= self.max_children {
216 return Err(SpawnError::MaxChildrenReached(self.max_children));
217 }
218
219 if self.children.contains_key(&config.id) {
221 return Err(SpawnError::AlreadyExists(config.id.clone()));
222 }
223
224 let id = config.id.clone();
225
226 let child_arc = Arc::new(Mutex::new(child));
228
229 let managed = ManagedChild {
231 child: Arc::clone(&child_arc),
232 };
233
234 self.children.insert(id.clone(), managed);
236
237 tracing::debug!(
238 parent = %self.parent_id,
239 child_id = %id,
240 child_count = self.children.len(),
241 "child spawned"
242 );
243
244 Ok(SpawnedChildHandle {
245 id,
246 child: child_arc,
247 })
248 }
249
250 pub fn spawn(
263 &mut self,
264 config: ChildConfig,
265 child: Box<dyn RunnableChild>,
266 ) -> Result<Box<dyn ChildHandle>, SpawnError> {
267 Ok(Box::new(self.spawn_inner(config, child)?))
268 }
269
270 pub fn spawn_async(
286 &mut self,
287 config: ChildConfig,
288 child: Box<dyn RunnableChild>,
289 ) -> Result<Box<dyn AsyncChildHandle>, SpawnError> {
290 Ok(Box::new(self.spawn_inner(config, child)?))
291 }
292
293 pub fn propagate_signal(&mut self, signal: &Signal) {
295 for (id, managed) in &mut self.children {
296 if let Ok(mut child) = managed.child.lock() {
297 let response = child.on_signal(signal);
298 if matches!(response, orcs_event::SignalResponse::Abort) {
299 tracing::debug!("Child {} aborted on signal", id);
300 }
301 }
302 }
303 }
304
305 pub fn abort_all(&mut self) {
307 for (id, managed) in &mut self.children {
308 if let Ok(mut child) = managed.child.lock() {
309 child.abort();
310 tracing::debug!("Aborted child {}", id);
311 }
312 }
313 }
314
315 pub fn reap_finished(&mut self) -> Vec<(String, Status)> {
317 let mut finished = Vec::new();
318
319 self.children.retain(|id, managed| {
320 let status = managed
321 .child
322 .lock()
323 .map(|c| c.status())
324 .unwrap_or(Status::Error);
325
326 if matches!(status, Status::Completed | Status::Error | Status::Aborted) {
327 tracing::debug!(
328 parent = %self.parent_id,
329 child_id = %id,
330 status = ?status,
331 "child reaped"
332 );
333 finished.push((id.clone(), status));
334 false } else {
336 true }
338 });
339
340 finished
341 }
342
343 #[must_use]
345 pub fn child_ids(&self) -> Vec<String> {
346 self.children.keys().cloned().collect()
347 }
348
349 #[must_use]
351 pub fn output_tx(&self) -> &OutputSender {
352 &self.output_tx
353 }
354
355 #[must_use]
360 pub fn get_child_arc(&self, id: &str) -> Option<Arc<Mutex<Box<dyn RunnableChild>>>> {
361 self.children.get(id).map(|m| Arc::clone(&m.child))
362 }
363
364 pub fn run_child(
377 &self,
378 id: &str,
379 input: serde_json::Value,
380 ) -> Result<ChildResult, orcs_component::RunError> {
381 let managed = self.children.get(id).ok_or_else(|| {
382 orcs_component::RunError::ExecutionFailed(format!("child not found: {}", id))
383 })?;
384
385 let mut child = managed.child.lock().map_err(|e| {
386 orcs_component::RunError::ExecutionFailed(format!("child lock failed: {}", e))
387 })?;
388
389 Ok(child.run(input))
390 }
391}
392
393impl Debug for ChildSpawner {
394 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
395 f.debug_struct("ChildSpawner")
396 .field("parent_id", &self.parent_id)
397 .field("child_count", &self.children.len())
398 .field("max_children", &self.max_children)
399 .finish()
400 }
401}
402
#[cfg(test)]
mod tests {
    use super::*;
    use orcs_component::{Child, Identifiable, SignalReceiver, Statusable};
    use orcs_event::SignalResponse;
    use serde_json::json;

    /// Minimal `RunnableChild` used to exercise the spawner and its handles.
    struct TestWorker {
        id: String,
        status: Status,
    }

    impl TestWorker {
        /// Creates an idle worker with the given id.
        fn new(id: &str) -> Self {
            Self {
                id: id.into(),
                status: Status::Idle,
            }
        }
    }

    impl Identifiable for TestWorker {
        fn id(&self) -> &str {
            &self.id
        }
    }

    impl SignalReceiver for TestWorker {
        /// Aborts on a veto signal and reports every other signal as handled.
        fn on_signal(&mut self, signal: &Signal) -> SignalResponse {
            if signal.is_veto() {
                self.status = Status::Aborted;
                SignalResponse::Abort
            } else {
                SignalResponse::Handled
            }
        }

        fn abort(&mut self) {
            self.status = Status::Aborted;
        }
    }

    impl Statusable for TestWorker {
        fn status(&self) -> Status {
            self.status
        }
    }

    impl Child for TestWorker {}

    impl RunnableChild for TestWorker {
        /// Echoes `input` back; the worker ends in `Idle`, never `Completed`.
        fn run(&mut self, input: serde_json::Value) -> ChildResult {
            self.status = Status::Running;
            self.status = Status::Idle;
            ChildResult::Ok(input)
        }
    }

    /// Builds a spawner with the default limit; the receiving end of the
    /// output channel is dropped because these tests never emit output.
    fn setup() -> ChildSpawner {
        let (output_tx, _) = OutputSender::channel(64);
        ChildSpawner::new("test-parent", output_tx)
    }

    #[test]
    fn spawn_child() {
        let mut spawner = setup();
        let config = ChildConfig::new("worker-1");
        let worker = Box::new(TestWorker::new("worker-1"));

        let result = spawner.spawn(config, worker);
        assert!(result.is_ok());
        assert_eq!(spawner.child_count(), 1);
    }

    #[test]
    fn spawn_duplicate_fails() {
        let mut spawner = setup();

        let config1 = ChildConfig::new("worker-1");
        let worker1 = Box::new(TestWorker::new("worker-1"));
        spawner
            .spawn(config1, worker1)
            .expect("spawn first worker should succeed");

        let config2 = ChildConfig::new("worker-1");
        let worker2 = Box::new(TestWorker::new("worker-1"));
        let result = spawner.spawn(config2, worker2);

        assert!(matches!(result, Err(SpawnError::AlreadyExists(_))));
    }

    #[test]
    fn spawn_max_children_limit() {
        let mut spawner = setup().with_max_children(2);

        for i in 0..2 {
            let config = ChildConfig::new(format!("worker-{}", i));
            let worker = Box::new(TestWorker::new(&format!("worker-{}", i)));
            spawner
                .spawn(config, worker)
                .expect("spawn worker within limit should succeed");
        }

        let config = ChildConfig::new("worker-overflow");
        let worker = Box::new(TestWorker::new("worker-overflow"));
        let result = spawner.spawn(config, worker);

        assert!(matches!(result, Err(SpawnError::MaxChildrenReached(2))));
    }

    #[test]
    fn child_handle_run() {
        let mut spawner = setup();
        let config = ChildConfig::new("worker-1");
        let worker = Box::new(TestWorker::new("worker-1"));

        let mut handle = spawner
            .spawn(config, worker)
            .expect("spawn worker for run test");
        let result = handle
            .run_sync(json!({"test": true}))
            .expect("run_sync should succeed");

        assert!(result.is_ok());
        if let ChildResult::Ok(data) = result {
            assert_eq!(data["test"], true);
        }
    }

    #[test]
    fn child_handle_status() {
        let mut spawner = setup();
        let config = ChildConfig::new("worker-1");
        let worker = Box::new(TestWorker::new("worker-1"));

        let handle = spawner
            .spawn(config, worker)
            .expect("spawn worker for status check");
        assert_eq!(handle.status(), Status::Idle);
    }

    #[test]
    fn child_handle_abort() {
        let mut spawner = setup();
        let config = ChildConfig::new("worker-1");
        let worker = Box::new(TestWorker::new("worker-1"));

        let mut handle = spawner
            .spawn(config, worker)
            .expect("spawn worker for abort test");
        handle.abort();

        assert!(handle.is_finished());
    }

    #[test]
    fn abort_all() {
        let mut spawner = setup();

        for i in 0..3 {
            let config = ChildConfig::new(format!("worker-{}", i));
            let worker = Box::new(TestWorker::new(&format!("worker-{}", i)));
            spawner
                .spawn(config, worker)
                .expect("spawn worker for abort_all test");
        }

        spawner.abort_all();

        let finished = spawner.reap_finished();
        assert_eq!(finished.len(), 3);
        for (_, status) in finished {
            assert_eq!(status, Status::Aborted);
        }
    }

    #[test]
    fn child_ids() {
        let mut spawner = setup();

        for i in 0..3 {
            let config = ChildConfig::new(format!("worker-{}", i));
            let worker = Box::new(TestWorker::new(&format!("worker-{}", i)));
            spawner
                .spawn(config, worker)
                .expect("spawn worker for child_ids test");
        }

        let ids = spawner.child_ids();
        assert_eq!(ids.len(), 3);
        assert!(ids.contains(&"worker-0".to_string()));
        assert!(ids.contains(&"worker-1".to_string()));
        assert!(ids.contains(&"worker-2".to_string()));
    }

    /// Builds a handle directly, without going through a spawner, so the
    /// async trait methods can be called on the concrete type.
    fn create_spawned_handle(id: &str) -> SpawnedChildHandle {
        let worker = Box::new(TestWorker::new(id)) as Box<dyn RunnableChild>;
        SpawnedChildHandle {
            id: id.to_string(),
            child: Arc::new(Mutex::new(worker)),
        }
    }

    #[tokio::test]
    async fn async_child_handle_run() {
        let mut handle = create_spawned_handle("async-worker-1");

        let result = AsyncChildHandle::run(&mut handle, json!({"async": true}))
            .await
            .expect("async child run should succeed");

        assert!(result.is_ok());
        if let ChildResult::Ok(data) = result {
            assert_eq!(data["async"], true);
        }
    }

    #[tokio::test]
    async fn async_child_handle_status() {
        let handle = create_spawned_handle("async-worker");

        assert_eq!(AsyncChildHandle::status(&handle), Status::Idle);
        assert_eq!(AsyncChildHandle::id(&handle), "async-worker");
    }

    #[tokio::test]
    async fn async_child_handle_abort() {
        let mut handle = create_spawned_handle("async-worker");

        AsyncChildHandle::abort(&mut handle);

        assert!(AsyncChildHandle::is_finished(&handle));
    }

    #[tokio::test]
    async fn async_child_handle_object_safety() {
        let handle = create_spawned_handle("async-boxed");

        let boxed: Box<dyn AsyncChildHandle> = Box::new(handle);

        assert_eq!(boxed.id(), "async-boxed");
        assert_eq!(boxed.status(), Status::Idle);
    }

    #[test]
    fn get_child_arc_returns_some_for_existing_child() {
        let mut spawner = setup();
        let config = ChildConfig::new("worker-1");
        let worker = Box::new(TestWorker::new("worker-1"));
        spawner
            .spawn(config, worker)
            .expect("spawn worker for get_child_arc test");

        let arc = spawner.get_child_arc("worker-1");
        assert!(arc.is_some(), "should return Some for existing child");
    }

    #[test]
    fn get_child_arc_returns_none_for_missing_child() {
        let spawner = setup();
        let arc = spawner.get_child_arc("nonexistent");
        assert!(arc.is_none(), "should return None for missing child");
    }

    #[test]
    fn get_child_arc_shares_state_with_managed_child() {
        let mut spawner = setup();
        let config = ChildConfig::new("worker-1");
        let worker = Box::new(TestWorker::new("worker-1"));
        spawner
            .spawn(config, worker)
            .expect("spawn worker for shared state test");

        let arc = spawner.get_child_arc("worker-1").expect("get arc");
        {
            let mut child = arc.lock().expect("lock child arc");
            let result = child.run(json!({"via": "arc"}));
            assert!(result.is_ok(), "should run successfully via arc");
        }
        let result = spawner.run_child("worker-1", json!({"via": "spawner"}));
        assert!(
            result.is_ok(),
            "should run successfully via spawner after arc use"
        );

        // Mutate the child through the arc only; the spawner must observe the
        // change, which proves both point at the same instance.
        arc.lock().expect("lock child arc for abort").abort();

        let reaped = spawner.reap_finished();
        assert_eq!(
            reaped,
            vec![("worker-1".to_string(), Status::Aborted)],
            "abort issued via arc should be visible to the spawner"
        );
        assert_eq!(spawner.child_count(), 0);
    }
}