1use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8
9use async_trait::async_trait;
10use serde::{Deserialize, Serialize};
11use tokio::sync::Mutex;
12use tokio::task::JoinHandle;
13
14use crate::config::{RunConfig, RunManifest};
15use crate::context_runtime::write_full_snapshot_value;
16use crate::engine::{RunPhase, RunResult, StartRun, Stores};
17use crate::errors::{ErrorCategory, ErrorInfo, IoError, RunError, StorageError};
18use crate::events::RunStatus;
19use crate::ids::{ArtifactId, ErrorCode, OpId, RunId};
20use crate::io::IoCall;
21use crate::live_io::{FactIndex, LiveIoEnv, LiveIoTransport, LiveIoTransportFactory};
22use crate::stores::ArtifactKind;
23
24use super::{
25 invalid_plan, next_attempt, read_manifest, read_run_history, run_states, storage_not_found,
26 topological_order, validate_execution_mode, validate_start_run_contract, EngineFailpoints,
27 EventWriter, PlanResolver, SharedEventWriter,
28};
29
/// I/O namespace whose calls are routed to `ChildRunLiveIoTransport::spawn_child`.
const NAMESPACE_CHILD_RUN_SPAWN: &str = "machine.child_run.spawn";
/// I/O namespace whose calls are routed to `ChildRunLiveIoTransport::await_child`.
const NAMESPACE_CHILD_RUN_AWAIT: &str = "machine.child_run.await";

/// Error code for a spawn/await payload that fails to parse, carries an unsupported
/// `kind`, or names an invalid `op_id`.
const CODE_CHILD_RUN_REQUEST_INVALID: &str = "child_run_request_invalid";
/// Error code for failures while preparing, storing, starting or resuming a child run.
const CODE_CHILD_RUN_ENGINE_FAILED: &str = "child_run_engine_failed";
/// Error code for a child task that failed, panicked or was cancelled, and for
/// response serialization failures.
const CODE_CHILD_RUN_TASK_FAILED: &str = "child_run_task_failed";

/// Handle to a spawned tokio task that drives one child run to a result.
type ChildRunTask = JoinHandle<Result<RunResult, RunError>>;
/// In-flight child-run tasks keyed by the child's run id.
type ChildRunTaskMap = HashMap<RunId, ChildRunTask>;
39
40#[derive(Clone, Default)]
41struct ChildRunSupervisor {
42 inner: Arc<Mutex<ChildRunTaskMap>>,
43}
44
45impl ChildRunSupervisor {
46 async fn spawn_if_absent<F>(&self, run_id: RunId, f: F) -> bool
47 where
48 F: FnOnce() -> ChildRunTask,
49 {
50 let mut inner = self.inner.lock().await;
51 if inner.contains_key(&run_id) {
52 return false;
53 }
54 inner.insert(run_id, f());
55 true
56 }
57
58 async fn take(&self, run_id: RunId) -> Option<ChildRunTask> {
59 self.inner.lock().await.remove(&run_id)
60 }
61}
62
/// Minimal engine used to start and resume child runs.
#[derive(Clone)]
struct ChildRunEngine {
    /// Resolves a run manifest into an executable plan.
    resolver: Arc<dyn PlanResolver>,
    /// Transport factory handed to `run_states` for the child run's own live I/O.
    live_transport_factory: Arc<dyn LiveIoTransportFactory>,
    /// Optional failpoints forwarded to `run_states`.
    failpoints: Option<EngineFailpoints>,
}
69
70impl ChildRunEngine {
71 async fn start_with_id(
72 &self,
73 stores: Stores,
74 run: StartRun,
75 run_id: RunId,
76 ) -> Result<RunResult, RunError> {
77 validate_execution_mode(&run.run_config)?;
78 validate_start_run_contract(&run)?;
79
80 let exists = stores
81 .artifacts
82 .exists(&run.manifest_id)
83 .await
84 .map_err(RunError::Storage)?;
85 if !exists {
86 return Err(RunError::Storage(storage_not_found(
87 "manifest_not_found",
88 "manifest artifact was not found",
89 )));
90 }
91
92 let head = stores
93 .events
94 .head_seq(run_id)
95 .await
96 .map_err(RunError::Storage)?;
97 if head != 0 {
98 return Err(RunError::Storage(StorageError::Concurrency(super::info(
99 "run_already_exists",
100 ErrorCategory::Storage,
101 "run already exists",
102 ))));
103 }
104
105 let initial_snapshot = run.initial_context.dump().map_err(RunError::Context)?;
106 let initial_snapshot_id =
107 write_full_snapshot_value(stores.artifacts.as_ref(), initial_snapshot).await?;
108
109 let writer: SharedEventWriter = Arc::new(Mutex::new(
110 EventWriter::new(Arc::clone(&stores.events), run_id)
111 .await
112 .map_err(RunError::Storage)?,
113 ));
114
115 writer
116 .lock()
117 .await
118 .append_kernel(crate::events::KernelEvent::RunStarted {
119 op_id: run.plan.op_id.clone(),
120 manifest_id: run.manifest_id.clone(),
121 initial_snapshot_id: initial_snapshot_id.clone(),
122 })
123 .await
124 .map_err(RunError::Storage)?;
125
126 run_states(
127 &stores,
128 &run.plan,
129 &run.run_config,
130 run_id,
131 writer,
132 initial_snapshot_id,
133 &HashSet::new(),
134 None,
135 FactIndex::default(),
136 Arc::clone(&self.live_transport_factory),
137 self.failpoints.clone(),
138 )
139 .await
140 }
141
142 async fn resume(&self, stores: Stores, run_id: RunId) -> Result<RunResult, RunError> {
143 let head = stores
144 .events
145 .head_seq(run_id)
146 .await
147 .map_err(RunError::Storage)?;
148 if head == 0 {
149 return Err(RunError::Storage(storage_not_found(
150 "run_not_found",
151 "run event stream was not found",
152 )));
153 }
154
155 let stream = stores
156 .events
157 .read_range(run_id, 1, None)
158 .await
159 .map_err(RunError::Storage)?;
160
161 let facts = FactIndex::from_event_stream(&stream);
162 let history = read_run_history(run_id, &stream)?;
163
164 if let Some((status, final_snapshot_id)) = &history.run_completed {
165 return Ok(RunResult {
166 run_id,
167 phase: match status {
168 RunStatus::Completed => RunPhase::Completed,
169 RunStatus::Failed => RunPhase::Failed,
170 RunStatus::Cancelled => RunPhase::Cancelled,
171 },
172 final_snapshot_id: final_snapshot_id.clone(),
173 });
174 }
175
176 let manifest =
177 read_manifest(stores.artifacts.as_ref(), &history.started.manifest_id).await?;
178 validate_execution_mode(&manifest.run_config)?;
179
180 if history.started.op_id != manifest.op_id {
181 return Err(invalid_plan(
182 "run_started_op_id_mismatch",
183 "RunStarted.op_id did not match manifest.op_id",
184 ));
185 }
186
187 let plan = self.resolver.resolve(&manifest)?;
188 if plan.op_id != manifest.op_id {
189 return Err(invalid_plan(
190 "plan_op_id_mismatch",
191 "resolved plan.op_id did not match manifest.op_id",
192 ));
193 }
194
195 let writer: SharedEventWriter = Arc::new(Mutex::new(
196 EventWriter::new(Arc::clone(&stores.events), run_id)
197 .await
198 .map_err(RunError::Storage)?,
199 ));
200
201 if let Some(orphan) = &history.orphan_attempt {
202 let start = (
203 orphan.state_id.clone(),
204 orphan.attempt + 1,
205 orphan.base_snapshot_id.clone(),
206 );
207 return run_states(
208 &stores,
209 &plan,
210 &manifest.run_config,
211 run_id,
212 writer,
213 history.last_checkpoint.clone(),
214 &history.completed_states,
215 Some(start),
216 facts.clone(),
217 Arc::clone(&self.live_transport_factory),
218 self.failpoints.clone(),
219 )
220 .await;
221 }
222
223 let ordered = topological_order(&plan)
224 .map_err(|_| invalid_plan("invalid_plan", "execution plan failed validation"))?;
225 let next_state = ordered
226 .iter()
227 .find(|n| !history.completed_states.contains(&n.id))
228 .map(|n| n.id.clone());
229
230 let Some(next_state_id) = next_state else {
231 writer
232 .lock()
233 .await
234 .append_kernel(crate::events::KernelEvent::RunCompleted {
235 status: RunStatus::Completed,
236 final_snapshot_id: Some(history.last_checkpoint.clone()),
237 })
238 .await
239 .map_err(RunError::Storage)?;
240 return Ok(RunResult {
241 run_id,
242 phase: RunPhase::Completed,
243 final_snapshot_id: Some(history.last_checkpoint.clone()),
244 });
245 };
246
247 if let Some((attempt, base_snapshot, retryable)) =
248 history.last_failure_by_state.get(&next_state_id)
249 {
250 let next = attempt + 1;
251 if !*retryable || next >= manifest.run_config.retry_policy.max_attempts {
252 writer
253 .lock()
254 .await
255 .append_kernel(crate::events::KernelEvent::RunCompleted {
256 status: RunStatus::Failed,
257 final_snapshot_id: Some(history.last_checkpoint.clone()),
258 })
259 .await
260 .map_err(RunError::Storage)?;
261 return Ok(RunResult {
262 run_id,
263 phase: RunPhase::Failed,
264 final_snapshot_id: Some(history.last_checkpoint.clone()),
265 });
266 }
267
268 let start = (next_state_id.clone(), next, base_snapshot.clone());
269 return run_states(
270 &stores,
271 &plan,
272 &manifest.run_config,
273 run_id,
274 writer,
275 history.last_checkpoint.clone(),
276 &history.completed_states,
277 Some(start),
278 facts.clone(),
279 Arc::clone(&self.live_transport_factory),
280 self.failpoints.clone(),
281 )
282 .await;
283 }
284
285 let start = (
286 next_state_id.clone(),
287 next_attempt(&history.last_attempt_by_state, &next_state_id),
288 history.last_checkpoint.clone(),
289 );
290 run_states(
291 &stores,
292 &plan,
293 &manifest.run_config,
294 run_id,
295 writer,
296 history.last_checkpoint.clone(),
297 &history.completed_states,
298 Some(start),
299 facts,
300 Arc::clone(&self.live_transport_factory),
301 self.failpoints.clone(),
302 )
303 .await
304 }
305}
306
307#[derive(Clone)]
311pub struct ChildRunLiveIoTransportFactory {
312 inner: Arc<dyn LiveIoTransportFactory>,
313 child_engine: ChildRunEngine,
314 supervisor: ChildRunSupervisor,
315}
316
317impl ChildRunLiveIoTransportFactory {
318 pub fn new(resolver: Arc<dyn PlanResolver>, inner: Arc<dyn LiveIoTransportFactory>) -> Self {
322 Self {
323 inner: Arc::clone(&inner),
324 child_engine: ChildRunEngine {
325 resolver,
326 live_transport_factory: inner,
327 failpoints: None,
328 },
329 supervisor: ChildRunSupervisor::default(),
330 }
331 }
332}
333
334impl LiveIoTransportFactory for ChildRunLiveIoTransportFactory {
335 fn namespace_group(&self) -> &str {
336 "child_run_wrapper"
337 }
338
339 fn make(&self, env: LiveIoEnv) -> Box<dyn LiveIoTransport> {
340 Box::new(ChildRunLiveIoTransport {
341 env: env.clone(),
342 inner: self.inner.make(env),
343 child_engine: self.child_engine.clone(),
344 supervisor: self.supervisor.clone(),
345 })
346 }
347}
348
/// Transport that serves the child-run spawn/await namespaces itself and forwards
/// every other call to `inner`.
struct ChildRunLiveIoTransport {
    /// Environment of the parent run (its run id and stores are used for child runs).
    env: LiveIoEnv,
    /// Wrapped transport that receives all non-child-run calls.
    inner: Box<dyn LiveIoTransport>,
    /// Engine used to start and resume child runs.
    child_engine: ChildRunEngine,
    /// Registry of in-flight child-run tasks, shared with the factory's other transports.
    supervisor: ChildRunSupervisor,
}
355
/// Payload of a child-run spawn call.
#[derive(Clone, Debug, Deserialize)]
struct ChildRunSpawnRequestV1 {
    /// Request discriminator; must equal `"child_run_spawn_v1"`.
    kind: String,
    /// Operation id of the child; also used as the generated pipeline's `machine_id`.
    op_id: String,
    /// Operation version; also used as the generated pipeline's `pipeline_version`.
    op_version: String,
    /// Operation configuration placed on the single generated step (null when omitted).
    #[serde(default)]
    op_config: serde_json::Value,
    /// Input placed in the child manifest's `input_params` (null when omitted).
    #[serde(default)]
    input: serde_json::Value,
    /// Run configuration for the child run.
    run_config: RunConfig,
    /// Initial context snapshot for the child; null is treated as an empty object.
    #[serde(default)]
    initial_context: serde_json::Value,
}
369
/// Result of a child-run spawn call.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct ChildRunSpawnResponseV1 {
    /// Run id of the run that issued the spawn call.
    parent_run_id: RunId,
    /// Run id assigned to the child.
    child_run_id: RunId,
    /// Artifact id of the stored child manifest.
    child_manifest_id: ArtifactId,
}
376
/// Payload of a child-run await call.
#[derive(Clone, Debug, Deserialize)]
struct ChildRunAwaitRequestV1 {
    /// Request discriminator; must equal `"child_run_await_v1"`.
    kind: String,
    /// Run id of the child to wait for.
    child_run_id: RunId,
    /// Manifest used to start the child if no event stream exists for it yet.
    child_manifest_id: ArtifactId,
}
383
/// Result of a child-run await call.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct ChildRunAwaitResponseV1 {
    /// Run id of the child that was awaited.
    child_run_id: RunId,
    /// Terminal status reported for the child.
    status: RunStatus,
    /// Artifact id of the child's final snapshot, when it has one.
    final_snapshot_id: Option<ArtifactId>,
    /// Parsed JSON of the final snapshot; null when there is no final snapshot.
    #[serde(default)]
    final_snapshot: serde_json::Value,
}
392
393fn child_io_error(code: &'static str, category: ErrorCategory, message: &'static str) -> IoError {
394 IoError::Other(ErrorInfo {
395 code: ErrorCode(code.to_string()),
396 category,
397 retryable: false,
398 message: message.to_string(),
399 details: None,
400 })
401}
402
403impl ChildRunLiveIoTransport {
404 fn missing_fact_key() -> IoError {
405 IoError::MissingFactKey(ErrorInfo {
406 code: ErrorCode(crate::errors::CODE_MISSING_FACT_KEY.to_string()),
407 category: ErrorCategory::ParsingInput,
408 retryable: false,
409 message: "missing fact key for child-run call".to_string(),
410 details: None,
411 })
412 }
413
414 fn child_run_id(&self, fact_key: &crate::ids::FactKey) -> RunId {
415 RunId(uuid::Uuid::new_v5(
418 &self.env.run_id.0,
419 fact_key.0.as_bytes(),
420 ))
421 }
422
423 fn default_build() -> crate::config::BuildProvenance {
424 crate::config::BuildProvenance {
425 git_commit: None,
426 cargo_lock_hash: None,
427 flake_lock_hash: None,
428 rustc_version: None,
429 target_triple: None,
430 env_allowlist: Vec::new(),
431 }
432 }
433
434 fn single_op_manifest_input(
435 op_id: &str,
436 op_version: &str,
437 op_config: serde_json::Value,
438 input: serde_json::Value,
439 ) -> serde_json::Value {
440 serde_json::json!({
441 "pipeline": {
442 "machine_id": op_id,
443 "pipeline_version": op_version,
444 "steps": [{
445 "step_id": "main",
446 "op_id": op_id,
447 "op_version": op_version,
448 "op_config": op_config,
449 }]
450 },
451 "input": input,
452 })
453 }
454
455 fn parse_spawn_request(call: &IoCall) -> Result<ChildRunSpawnRequestV1, IoError> {
456 let req = serde_json::from_value::<ChildRunSpawnRequestV1>(call.request.clone()).map_err(
457 |_| {
458 child_io_error(
459 CODE_CHILD_RUN_REQUEST_INVALID,
460 ErrorCategory::ParsingInput,
461 "invalid child run spawn request",
462 )
463 },
464 )?;
465 if req.kind != "child_run_spawn_v1" {
466 return Err(child_io_error(
467 CODE_CHILD_RUN_REQUEST_INVALID,
468 ErrorCategory::ParsingInput,
469 "unsupported child run spawn request kind",
470 ));
471 }
472 Ok(req)
473 }
474
475 fn parse_await_request(call: &IoCall) -> Result<ChildRunAwaitRequestV1, IoError> {
476 let req = serde_json::from_value::<ChildRunAwaitRequestV1>(call.request.clone()).map_err(
477 |_| {
478 child_io_error(
479 CODE_CHILD_RUN_REQUEST_INVALID,
480 ErrorCategory::ParsingInput,
481 "invalid child run await request",
482 )
483 },
484 )?;
485 if req.kind != "child_run_await_v1" {
486 return Err(child_io_error(
487 CODE_CHILD_RUN_REQUEST_INVALID,
488 ErrorCategory::ParsingInput,
489 "unsupported child run await request kind",
490 ));
491 }
492 Ok(req)
493 }
494
495 async fn spawn_child(&mut self, call: IoCall) -> Result<serde_json::Value, IoError> {
496 let Some(fact_key) = &call.fact_key else {
497 return Err(Self::missing_fact_key());
498 };
499 if crate::secrets::string_contains_secrets(&fact_key.0) {
500 return Err(child_io_error(
501 "secrets_detected",
502 ErrorCategory::Unknown,
503 "fact key contained secrets (policy forbids persisting secrets)",
504 ));
505 }
506
507 let req = Self::parse_spawn_request(&call)?;
508 let child_run_id = self.child_run_id(fact_key);
509
510 let build = Self::default_build();
511 let input_params = Self::single_op_manifest_input(
512 &req.op_id,
513 &req.op_version,
514 req.op_config.clone(),
515 req.input.clone(),
516 );
517 let op_id = OpId::new(req.op_id.clone()).map_err(|_| {
518 child_io_error(
519 CODE_CHILD_RUN_REQUEST_INVALID,
520 ErrorCategory::ParsingInput,
521 "invalid child run op_id",
522 )
523 })?;
524
525 let manifest = RunManifest {
526 op_id,
527 op_version: req.op_version.clone(),
528 input_params,
529 run_config: req.run_config.clone(),
530 build,
531 };
532
533 let value = serde_json::to_value(&manifest).map_err(|_| {
534 child_io_error(
535 CODE_CHILD_RUN_ENGINE_FAILED,
536 ErrorCategory::ParsingInput,
537 "failed to serialize child run manifest",
538 )
539 })?;
540 let bytes = crate::hashing::canonical_json_bytes(&value).map_err(|_| {
541 child_io_error(
542 CODE_CHILD_RUN_ENGINE_FAILED,
543 ErrorCategory::ParsingInput,
544 "child run manifest is not canonical-json-hashable",
545 )
546 })?;
547 let computed_id = crate::hashing::artifact_id_for_bytes(&bytes);
548
549 let stored_id = self
550 .env
551 .stores
552 .artifacts
553 .put(ArtifactKind::Manifest, bytes)
554 .await
555 .map_err(|_| {
556 child_io_error(
557 CODE_CHILD_RUN_ENGINE_FAILED,
558 ErrorCategory::Storage,
559 "failed to store child run manifest",
560 )
561 })?;
562 if stored_id != computed_id {
563 return Err(child_io_error(
564 "child_run_manifest_id_mismatch",
565 ErrorCategory::Storage,
566 "artifact store returned unexpected manifest id",
567 ));
568 }
569
570 let head = self
571 .env
572 .stores
573 .events
574 .head_seq(child_run_id)
575 .await
576 .map_err(|_| {
577 child_io_error(
578 CODE_CHILD_RUN_ENGINE_FAILED,
579 ErrorCategory::Storage,
580 "failed to query child run status",
581 )
582 })?;
583
584 if head == 0 {
585 let plan = self.child_engine.resolver.resolve(&manifest).map_err(|_| {
586 child_io_error(
587 CODE_CHILD_RUN_ENGINE_FAILED,
588 ErrorCategory::Unknown,
589 "failed to resolve child run plan",
590 )
591 })?;
592
593 let initial = if req.initial_context.is_null() {
594 serde_json::json!({})
595 } else {
596 req.initial_context.clone()
597 };
598 let ctx =
599 crate::context_runtime::JsonContext::from_snapshot(initial).map_err(|_| {
600 child_io_error(
601 CODE_CHILD_RUN_ENGINE_FAILED,
602 ErrorCategory::Context,
603 "invalid child initial_context snapshot",
604 )
605 })?;
606
607 let start = StartRun {
608 manifest,
609 manifest_id: stored_id.clone(),
610 plan,
611 run_config: req.run_config,
612 initial_context: Box::new(ctx),
613 };
614
615 let engine = self.child_engine.clone();
616 let stores = self.env.stores.clone();
617 let _ = self
618 .supervisor
619 .spawn_if_absent(child_run_id, move || {
620 tokio::spawn(
621 async move { engine.start_with_id(stores, start, child_run_id).await },
622 )
623 })
624 .await;
625 } else {
626 let stream = self
627 .env
628 .stores
629 .events
630 .read_range(child_run_id, 1, None)
631 .await
632 .map_err(|_| {
633 child_io_error(
634 CODE_CHILD_RUN_ENGINE_FAILED,
635 ErrorCategory::Storage,
636 "failed to read child run event stream",
637 )
638 })?;
639 let history = read_run_history(child_run_id, &stream).map_err(|_| {
640 child_io_error(
641 CODE_CHILD_RUN_ENGINE_FAILED,
642 ErrorCategory::Storage,
643 "invalid child run event stream",
644 )
645 })?;
646 if history.started.manifest_id != stored_id {
647 return Err(child_io_error(
648 "child_run_conflict",
649 ErrorCategory::Unknown,
650 "child run id already exists with a different manifest id",
651 ));
652 }
653
654 if history.run_completed.is_none() {
655 let engine = self.child_engine.clone();
656 let stores = self.env.stores.clone();
657 let _ = self
658 .supervisor
659 .spawn_if_absent(child_run_id, move || {
660 tokio::spawn(async move { engine.resume(stores, child_run_id).await })
661 })
662 .await;
663 }
664 }
665
666 let resp = ChildRunSpawnResponseV1 {
667 parent_run_id: self.env.run_id,
668 child_run_id,
669 child_manifest_id: stored_id,
670 };
671 serde_json::to_value(resp).map_err(|_| {
672 child_io_error(
673 CODE_CHILD_RUN_TASK_FAILED,
674 ErrorCategory::Unknown,
675 "failed to serialize child run spawn response",
676 )
677 })
678 }
679
680 async fn await_child(&mut self, call: IoCall) -> Result<serde_json::Value, IoError> {
681 if call.fact_key.is_none() {
682 return Err(Self::missing_fact_key());
683 }
684 let req = Self::parse_await_request(&call)?;
685
686 let stores = self.env.stores.clone();
687
688 let rr = if let Some(handle) = self.supervisor.take(req.child_run_id).await {
689 match handle.await {
690 Ok(res) => res.map_err(|_| {
691 child_io_error(
692 CODE_CHILD_RUN_TASK_FAILED,
693 ErrorCategory::Unknown,
694 "child run task failed",
695 )
696 })?,
697 Err(_) => {
698 return Err(child_io_error(
699 CODE_CHILD_RUN_TASK_FAILED,
700 ErrorCategory::Unknown,
701 "child run task panicked or was cancelled",
702 ));
703 }
704 }
705 } else {
706 match self
707 .child_engine
708 .resume(stores.clone(), req.child_run_id)
709 .await
710 {
711 Ok(rr) => rr,
712 Err(RunError::Storage(StorageError::NotFound(info)))
713 if info.code.0 == "run_not_found" =>
714 {
715 let manifest = read_manifest(stores.artifacts.as_ref(), &req.child_manifest_id)
716 .await
717 .map_err(|_| {
718 child_io_error(
719 CODE_CHILD_RUN_ENGINE_FAILED,
720 ErrorCategory::Storage,
721 "failed to read child run manifest",
722 )
723 })?;
724 let plan = self.child_engine.resolver.resolve(&manifest).map_err(|_| {
725 child_io_error(
726 CODE_CHILD_RUN_ENGINE_FAILED,
727 ErrorCategory::Unknown,
728 "failed to resolve child run plan",
729 )
730 })?;
731
732 let run_config = manifest.run_config.clone();
733 let start = StartRun {
734 manifest,
735 manifest_id: req.child_manifest_id.clone(),
736 plan,
737 run_config,
738 initial_context: Box::new(crate::context_runtime::JsonContext::new()),
739 };
740 self.child_engine
741 .start_with_id(stores.clone(), start, req.child_run_id)
742 .await
743 .map_err(|_| {
744 child_io_error(
745 CODE_CHILD_RUN_ENGINE_FAILED,
746 ErrorCategory::Unknown,
747 "failed to start missing child run",
748 )
749 })?
750 }
751 Err(_) => {
752 return Err(child_io_error(
753 CODE_CHILD_RUN_ENGINE_FAILED,
754 ErrorCategory::Unknown,
755 "failed to resume child run",
756 ));
757 }
758 }
759 };
760
761 let status = match rr.phase {
762 RunPhase::Completed => RunStatus::Completed,
763 RunPhase::Failed => RunStatus::Failed,
764 RunPhase::Cancelled => RunStatus::Cancelled,
765 RunPhase::Running => RunStatus::Failed,
766 };
767
768 let final_snapshot = if let Some(id) = &rr.final_snapshot_id {
769 let bytes = stores.artifacts.get(id).await.map_err(|_| {
770 child_io_error(
771 CODE_CHILD_RUN_ENGINE_FAILED,
772 ErrorCategory::Storage,
773 "failed to read child final snapshot",
774 )
775 })?;
776 serde_json::from_slice::<serde_json::Value>(&bytes).map_err(|_| {
777 child_io_error(
778 CODE_CHILD_RUN_ENGINE_FAILED,
779 ErrorCategory::ParsingInput,
780 "child final snapshot was invalid JSON",
781 )
782 })?
783 } else {
784 serde_json::Value::Null
785 };
786
787 let resp = ChildRunAwaitResponseV1 {
788 child_run_id: rr.run_id,
789 status,
790 final_snapshot_id: rr.final_snapshot_id,
791 final_snapshot,
792 };
793 serde_json::to_value(resp).map_err(|_| {
794 child_io_error(
795 CODE_CHILD_RUN_TASK_FAILED,
796 ErrorCategory::Unknown,
797 "failed to serialize child run await response",
798 )
799 })
800 }
801}
802
803#[async_trait]
804impl LiveIoTransport for ChildRunLiveIoTransport {
805 async fn call(&mut self, call: IoCall) -> Result<serde_json::Value, IoError> {
806 match call.namespace.as_str() {
807 NAMESPACE_CHILD_RUN_SPAWN => self.spawn_child(call).await,
808 NAMESPACE_CHILD_RUN_AWAIT => self.await_child(call).await,
809 _ => self.inner.call(call).await,
810 }
811 }
812}