Skip to main content

mfm_machine/runtime/
child_runs.rs

//! Engine-managed child runs.
//!
//! This module wraps a live IO transport factory with two extra namespaces,
//! `machine.child_run.spawn` and `machine.child_run.await`, that let a parent run spawn child
//! runs and wait for their results under the same runtime invariants as the parent.

6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8
9use async_trait::async_trait;
10use serde::{Deserialize, Serialize};
11use tokio::sync::Mutex;
12use tokio::task::JoinHandle;
13
14use crate::config::{RunConfig, RunManifest};
15use crate::context_runtime::write_full_snapshot_value;
16use crate::engine::{RunPhase, RunResult, StartRun, Stores};
17use crate::errors::{ErrorCategory, ErrorInfo, IoError, RunError, StorageError};
18use crate::events::RunStatus;
19use crate::ids::{ArtifactId, ErrorCode, OpId, RunId};
20use crate::io::IoCall;
21use crate::live_io::{FactIndex, LiveIoEnv, LiveIoTransport, LiveIoTransportFactory};
22use crate::stores::ArtifactKind;
23
24use super::{
25    invalid_plan, next_attempt, read_manifest, read_run_history, run_states, storage_not_found,
26    topological_order, validate_execution_mode, validate_start_run_contract, EngineFailpoints,
27    EventWriter, PlanResolver, SharedEventWriter,
28};
29
/// IO namespace a parent step calls to spawn (or re-attach to) a child run.
const NAMESPACE_CHILD_RUN_SPAWN: &str = "machine.child_run.spawn";
/// IO namespace a parent step calls to wait for a child run's outcome.
const NAMESPACE_CHILD_RUN_AWAIT: &str = "machine.child_run.await";

/// Error code for a spawn/await payload that failed to decode or validate.
const CODE_CHILD_RUN_REQUEST_INVALID: &str = "child_run_request_invalid";
/// Error code for failures while preparing, storing, starting, or resuming a child run.
const CODE_CHILD_RUN_ENGINE_FAILED: &str = "child_run_engine_failed";
/// Error code for a failed/panicked child task or an unencodable response.
const CODE_CHILD_RUN_TASK_FAILED: &str = "child_run_task_failed";
36
37type ChildRunTask = JoinHandle<Result<RunResult, RunError>>;
38type ChildRunTaskMap = HashMap<RunId, ChildRunTask>;
39
40#[derive(Clone, Default)]
41struct ChildRunSupervisor {
42    inner: Arc<Mutex<ChildRunTaskMap>>,
43}
44
45impl ChildRunSupervisor {
46    async fn spawn_if_absent<F>(&self, run_id: RunId, f: F) -> bool
47    where
48        F: FnOnce() -> ChildRunTask,
49    {
50        let mut inner = self.inner.lock().await;
51        if inner.contains_key(&run_id) {
52            return false;
53        }
54        inner.insert(run_id, f());
55        true
56    }
57
58    async fn take(&self, run_id: RunId) -> Option<ChildRunTask> {
59        self.inner.lock().await.remove(&run_id)
60    }
61}
62
63#[derive(Clone)]
64struct ChildRunEngine {
65    resolver: Arc<dyn PlanResolver>,
66    live_transport_factory: Arc<dyn LiveIoTransportFactory>,
67    failpoints: Option<EngineFailpoints>,
68}
69
70impl ChildRunEngine {
71    async fn start_with_id(
72        &self,
73        stores: Stores,
74        run: StartRun,
75        run_id: RunId,
76    ) -> Result<RunResult, RunError> {
77        validate_execution_mode(&run.run_config)?;
78        validate_start_run_contract(&run)?;
79
80        let exists = stores
81            .artifacts
82            .exists(&run.manifest_id)
83            .await
84            .map_err(RunError::Storage)?;
85        if !exists {
86            return Err(RunError::Storage(storage_not_found(
87                "manifest_not_found",
88                "manifest artifact was not found",
89            )));
90        }
91
92        let head = stores
93            .events
94            .head_seq(run_id)
95            .await
96            .map_err(RunError::Storage)?;
97        if head != 0 {
98            return Err(RunError::Storage(StorageError::Concurrency(super::info(
99                "run_already_exists",
100                ErrorCategory::Storage,
101                "run already exists",
102            ))));
103        }
104
105        let initial_snapshot = run.initial_context.dump().map_err(RunError::Context)?;
106        let initial_snapshot_id =
107            write_full_snapshot_value(stores.artifacts.as_ref(), initial_snapshot).await?;
108
109        let writer: SharedEventWriter = Arc::new(Mutex::new(
110            EventWriter::new(Arc::clone(&stores.events), run_id)
111                .await
112                .map_err(RunError::Storage)?,
113        ));
114
115        writer
116            .lock()
117            .await
118            .append_kernel(crate::events::KernelEvent::RunStarted {
119                op_id: run.plan.op_id.clone(),
120                manifest_id: run.manifest_id.clone(),
121                initial_snapshot_id: initial_snapshot_id.clone(),
122            })
123            .await
124            .map_err(RunError::Storage)?;
125
126        run_states(
127            &stores,
128            &run.plan,
129            &run.run_config,
130            run_id,
131            writer,
132            initial_snapshot_id,
133            &HashSet::new(),
134            None,
135            FactIndex::default(),
136            Arc::clone(&self.live_transport_factory),
137            self.failpoints.clone(),
138        )
139        .await
140    }
141
142    async fn resume(&self, stores: Stores, run_id: RunId) -> Result<RunResult, RunError> {
143        let head = stores
144            .events
145            .head_seq(run_id)
146            .await
147            .map_err(RunError::Storage)?;
148        if head == 0 {
149            return Err(RunError::Storage(storage_not_found(
150                "run_not_found",
151                "run event stream was not found",
152            )));
153        }
154
155        let stream = stores
156            .events
157            .read_range(run_id, 1, None)
158            .await
159            .map_err(RunError::Storage)?;
160
161        let facts = FactIndex::from_event_stream(&stream);
162        let history = read_run_history(run_id, &stream)?;
163
164        if let Some((status, final_snapshot_id)) = &history.run_completed {
165            return Ok(RunResult {
166                run_id,
167                phase: match status {
168                    RunStatus::Completed => RunPhase::Completed,
169                    RunStatus::Failed => RunPhase::Failed,
170                    RunStatus::Cancelled => RunPhase::Cancelled,
171                },
172                final_snapshot_id: final_snapshot_id.clone(),
173            });
174        }
175
176        let manifest =
177            read_manifest(stores.artifacts.as_ref(), &history.started.manifest_id).await?;
178        validate_execution_mode(&manifest.run_config)?;
179
180        if history.started.op_id != manifest.op_id {
181            return Err(invalid_plan(
182                "run_started_op_id_mismatch",
183                "RunStarted.op_id did not match manifest.op_id",
184            ));
185        }
186
187        let plan = self.resolver.resolve(&manifest)?;
188        if plan.op_id != manifest.op_id {
189            return Err(invalid_plan(
190                "plan_op_id_mismatch",
191                "resolved plan.op_id did not match manifest.op_id",
192            ));
193        }
194
195        let writer: SharedEventWriter = Arc::new(Mutex::new(
196            EventWriter::new(Arc::clone(&stores.events), run_id)
197                .await
198                .map_err(RunError::Storage)?,
199        ));
200
201        if let Some(orphan) = &history.orphan_attempt {
202            let start = (
203                orphan.state_id.clone(),
204                orphan.attempt + 1,
205                orphan.base_snapshot_id.clone(),
206            );
207            return run_states(
208                &stores,
209                &plan,
210                &manifest.run_config,
211                run_id,
212                writer,
213                history.last_checkpoint.clone(),
214                &history.completed_states,
215                Some(start),
216                facts.clone(),
217                Arc::clone(&self.live_transport_factory),
218                self.failpoints.clone(),
219            )
220            .await;
221        }
222
223        let ordered = topological_order(&plan)
224            .map_err(|_| invalid_plan("invalid_plan", "execution plan failed validation"))?;
225        let next_state = ordered
226            .iter()
227            .find(|n| !history.completed_states.contains(&n.id))
228            .map(|n| n.id.clone());
229
230        let Some(next_state_id) = next_state else {
231            writer
232                .lock()
233                .await
234                .append_kernel(crate::events::KernelEvent::RunCompleted {
235                    status: RunStatus::Completed,
236                    final_snapshot_id: Some(history.last_checkpoint.clone()),
237                })
238                .await
239                .map_err(RunError::Storage)?;
240            return Ok(RunResult {
241                run_id,
242                phase: RunPhase::Completed,
243                final_snapshot_id: Some(history.last_checkpoint.clone()),
244            });
245        };
246
247        if let Some((attempt, base_snapshot, retryable)) =
248            history.last_failure_by_state.get(&next_state_id)
249        {
250            let next = attempt + 1;
251            if !*retryable || next >= manifest.run_config.retry_policy.max_attempts {
252                writer
253                    .lock()
254                    .await
255                    .append_kernel(crate::events::KernelEvent::RunCompleted {
256                        status: RunStatus::Failed,
257                        final_snapshot_id: Some(history.last_checkpoint.clone()),
258                    })
259                    .await
260                    .map_err(RunError::Storage)?;
261                return Ok(RunResult {
262                    run_id,
263                    phase: RunPhase::Failed,
264                    final_snapshot_id: Some(history.last_checkpoint.clone()),
265                });
266            }
267
268            let start = (next_state_id.clone(), next, base_snapshot.clone());
269            return run_states(
270                &stores,
271                &plan,
272                &manifest.run_config,
273                run_id,
274                writer,
275                history.last_checkpoint.clone(),
276                &history.completed_states,
277                Some(start),
278                facts.clone(),
279                Arc::clone(&self.live_transport_factory),
280                self.failpoints.clone(),
281            )
282            .await;
283        }
284
285        let start = (
286            next_state_id.clone(),
287            next_attempt(&history.last_attempt_by_state, &next_state_id),
288            history.last_checkpoint.clone(),
289        );
290        run_states(
291            &stores,
292            &plan,
293            &manifest.run_config,
294            run_id,
295            writer,
296            history.last_checkpoint.clone(),
297            &history.completed_states,
298            Some(start),
299            facts,
300            Arc::clone(&self.live_transport_factory),
301            self.failpoints.clone(),
302        )
303        .await
304    }
305}
306
307/// Live IO transport factory that adds child-run spawn and await namespaces.
308///
309/// It forwards all unrelated namespaces to the wrapped factory.
310#[derive(Clone)]
311pub struct ChildRunLiveIoTransportFactory {
312    inner: Arc<dyn LiveIoTransportFactory>,
313    child_engine: ChildRunEngine,
314    supervisor: ChildRunSupervisor,
315}
316
317impl ChildRunLiveIoTransportFactory {
318    /// Wraps `inner` with child-run spawn/await handling backed by `resolver`.
319    ///
320    /// Calls outside the reserved child-run namespaces are forwarded to `inner`.
321    pub fn new(resolver: Arc<dyn PlanResolver>, inner: Arc<dyn LiveIoTransportFactory>) -> Self {
322        Self {
323            inner: Arc::clone(&inner),
324            child_engine: ChildRunEngine {
325                resolver,
326                live_transport_factory: inner,
327                failpoints: None,
328            },
329            supervisor: ChildRunSupervisor::default(),
330        }
331    }
332}
333
334impl LiveIoTransportFactory for ChildRunLiveIoTransportFactory {
335    fn namespace_group(&self) -> &str {
336        "child_run_wrapper"
337    }
338
339    fn make(&self, env: LiveIoEnv) -> Box<dyn LiveIoTransport> {
340        Box::new(ChildRunLiveIoTransport {
341            env: env.clone(),
342            inner: self.inner.make(env),
343            child_engine: self.child_engine.clone(),
344            supervisor: self.supervisor.clone(),
345        })
346    }
347}
348
349struct ChildRunLiveIoTransport {
350    env: LiveIoEnv,
351    inner: Box<dyn LiveIoTransport>,
352    child_engine: ChildRunEngine,
353    supervisor: ChildRunSupervisor,
354}
355
356#[derive(Clone, Debug, Deserialize)]
357struct ChildRunSpawnRequestV1 {
358    kind: String,
359    op_id: String,
360    op_version: String,
361    #[serde(default)]
362    op_config: serde_json::Value,
363    #[serde(default)]
364    input: serde_json::Value,
365    run_config: RunConfig,
366    #[serde(default)]
367    initial_context: serde_json::Value,
368}
369
370#[derive(Clone, Debug, Serialize, Deserialize)]
371struct ChildRunSpawnResponseV1 {
372    parent_run_id: RunId,
373    child_run_id: RunId,
374    child_manifest_id: ArtifactId,
375}
376
377#[derive(Clone, Debug, Deserialize)]
378struct ChildRunAwaitRequestV1 {
379    kind: String,
380    child_run_id: RunId,
381    child_manifest_id: ArtifactId,
382}
383
384#[derive(Clone, Debug, Serialize, Deserialize)]
385struct ChildRunAwaitResponseV1 {
386    child_run_id: RunId,
387    status: RunStatus,
388    final_snapshot_id: Option<ArtifactId>,
389    #[serde(default)]
390    final_snapshot: serde_json::Value,
391}
392
393fn child_io_error(code: &'static str, category: ErrorCategory, message: &'static str) -> IoError {
394    IoError::Other(ErrorInfo {
395        code: ErrorCode(code.to_string()),
396        category,
397        retryable: false,
398        message: message.to_string(),
399        details: None,
400    })
401}
402
403impl ChildRunLiveIoTransport {
404    fn missing_fact_key() -> IoError {
405        IoError::MissingFactKey(ErrorInfo {
406            code: ErrorCode(crate::errors::CODE_MISSING_FACT_KEY.to_string()),
407            category: ErrorCategory::ParsingInput,
408            retryable: false,
409            message: "missing fact key for child-run call".to_string(),
410            details: None,
411        })
412    }
413
414    fn child_run_id(&self, fact_key: &crate::ids::FactKey) -> RunId {
415        // Deterministic, per-parent-run ids avoid cross-run collisions while remaining stable for
416        // resume/replay within the same parent run.
417        RunId(uuid::Uuid::new_v5(
418            &self.env.run_id.0,
419            fact_key.0.as_bytes(),
420        ))
421    }
422
423    fn default_build() -> crate::config::BuildProvenance {
424        crate::config::BuildProvenance {
425            git_commit: None,
426            cargo_lock_hash: None,
427            flake_lock_hash: None,
428            rustc_version: None,
429            target_triple: None,
430            env_allowlist: Vec::new(),
431        }
432    }
433
434    fn single_op_manifest_input(
435        op_id: &str,
436        op_version: &str,
437        op_config: serde_json::Value,
438        input: serde_json::Value,
439    ) -> serde_json::Value {
440        serde_json::json!({
441            "pipeline": {
442                "machine_id": op_id,
443                "pipeline_version": op_version,
444                "steps": [{
445                    "step_id": "main",
446                    "op_id": op_id,
447                    "op_version": op_version,
448                    "op_config": op_config,
449                }]
450            },
451            "input": input,
452        })
453    }
454
455    fn parse_spawn_request(call: &IoCall) -> Result<ChildRunSpawnRequestV1, IoError> {
456        let req = serde_json::from_value::<ChildRunSpawnRequestV1>(call.request.clone()).map_err(
457            |_| {
458                child_io_error(
459                    CODE_CHILD_RUN_REQUEST_INVALID,
460                    ErrorCategory::ParsingInput,
461                    "invalid child run spawn request",
462                )
463            },
464        )?;
465        if req.kind != "child_run_spawn_v1" {
466            return Err(child_io_error(
467                CODE_CHILD_RUN_REQUEST_INVALID,
468                ErrorCategory::ParsingInput,
469                "unsupported child run spawn request kind",
470            ));
471        }
472        Ok(req)
473    }
474
475    fn parse_await_request(call: &IoCall) -> Result<ChildRunAwaitRequestV1, IoError> {
476        let req = serde_json::from_value::<ChildRunAwaitRequestV1>(call.request.clone()).map_err(
477            |_| {
478                child_io_error(
479                    CODE_CHILD_RUN_REQUEST_INVALID,
480                    ErrorCategory::ParsingInput,
481                    "invalid child run await request",
482                )
483            },
484        )?;
485        if req.kind != "child_run_await_v1" {
486            return Err(child_io_error(
487                CODE_CHILD_RUN_REQUEST_INVALID,
488                ErrorCategory::ParsingInput,
489                "unsupported child run await request kind",
490            ));
491        }
492        Ok(req)
493    }
494
495    async fn spawn_child(&mut self, call: IoCall) -> Result<serde_json::Value, IoError> {
496        let Some(fact_key) = &call.fact_key else {
497            return Err(Self::missing_fact_key());
498        };
499        if crate::secrets::string_contains_secrets(&fact_key.0) {
500            return Err(child_io_error(
501                "secrets_detected",
502                ErrorCategory::Unknown,
503                "fact key contained secrets (policy forbids persisting secrets)",
504            ));
505        }
506
507        let req = Self::parse_spawn_request(&call)?;
508        let child_run_id = self.child_run_id(fact_key);
509
510        let build = Self::default_build();
511        let input_params = Self::single_op_manifest_input(
512            &req.op_id,
513            &req.op_version,
514            req.op_config.clone(),
515            req.input.clone(),
516        );
517        let op_id = OpId::new(req.op_id.clone()).map_err(|_| {
518            child_io_error(
519                CODE_CHILD_RUN_REQUEST_INVALID,
520                ErrorCategory::ParsingInput,
521                "invalid child run op_id",
522            )
523        })?;
524
525        let manifest = RunManifest {
526            op_id,
527            op_version: req.op_version.clone(),
528            input_params,
529            run_config: req.run_config.clone(),
530            build,
531        };
532
533        let value = serde_json::to_value(&manifest).map_err(|_| {
534            child_io_error(
535                CODE_CHILD_RUN_ENGINE_FAILED,
536                ErrorCategory::ParsingInput,
537                "failed to serialize child run manifest",
538            )
539        })?;
540        let bytes = crate::hashing::canonical_json_bytes(&value).map_err(|_| {
541            child_io_error(
542                CODE_CHILD_RUN_ENGINE_FAILED,
543                ErrorCategory::ParsingInput,
544                "child run manifest is not canonical-json-hashable",
545            )
546        })?;
547        let computed_id = crate::hashing::artifact_id_for_bytes(&bytes);
548
549        let stored_id = self
550            .env
551            .stores
552            .artifacts
553            .put(ArtifactKind::Manifest, bytes)
554            .await
555            .map_err(|_| {
556                child_io_error(
557                    CODE_CHILD_RUN_ENGINE_FAILED,
558                    ErrorCategory::Storage,
559                    "failed to store child run manifest",
560                )
561            })?;
562        if stored_id != computed_id {
563            return Err(child_io_error(
564                "child_run_manifest_id_mismatch",
565                ErrorCategory::Storage,
566                "artifact store returned unexpected manifest id",
567            ));
568        }
569
570        let head = self
571            .env
572            .stores
573            .events
574            .head_seq(child_run_id)
575            .await
576            .map_err(|_| {
577                child_io_error(
578                    CODE_CHILD_RUN_ENGINE_FAILED,
579                    ErrorCategory::Storage,
580                    "failed to query child run status",
581                )
582            })?;
583
584        if head == 0 {
585            let plan = self.child_engine.resolver.resolve(&manifest).map_err(|_| {
586                child_io_error(
587                    CODE_CHILD_RUN_ENGINE_FAILED,
588                    ErrorCategory::Unknown,
589                    "failed to resolve child run plan",
590                )
591            })?;
592
593            let initial = if req.initial_context.is_null() {
594                serde_json::json!({})
595            } else {
596                req.initial_context.clone()
597            };
598            let ctx =
599                crate::context_runtime::JsonContext::from_snapshot(initial).map_err(|_| {
600                    child_io_error(
601                        CODE_CHILD_RUN_ENGINE_FAILED,
602                        ErrorCategory::Context,
603                        "invalid child initial_context snapshot",
604                    )
605                })?;
606
607            let start = StartRun {
608                manifest,
609                manifest_id: stored_id.clone(),
610                plan,
611                run_config: req.run_config,
612                initial_context: Box::new(ctx),
613            };
614
615            let engine = self.child_engine.clone();
616            let stores = self.env.stores.clone();
617            let _ = self
618                .supervisor
619                .spawn_if_absent(child_run_id, move || {
620                    tokio::spawn(
621                        async move { engine.start_with_id(stores, start, child_run_id).await },
622                    )
623                })
624                .await;
625        } else {
626            let stream = self
627                .env
628                .stores
629                .events
630                .read_range(child_run_id, 1, None)
631                .await
632                .map_err(|_| {
633                    child_io_error(
634                        CODE_CHILD_RUN_ENGINE_FAILED,
635                        ErrorCategory::Storage,
636                        "failed to read child run event stream",
637                    )
638                })?;
639            let history = read_run_history(child_run_id, &stream).map_err(|_| {
640                child_io_error(
641                    CODE_CHILD_RUN_ENGINE_FAILED,
642                    ErrorCategory::Storage,
643                    "invalid child run event stream",
644                )
645            })?;
646            if history.started.manifest_id != stored_id {
647                return Err(child_io_error(
648                    "child_run_conflict",
649                    ErrorCategory::Unknown,
650                    "child run id already exists with a different manifest id",
651                ));
652            }
653
654            if history.run_completed.is_none() {
655                let engine = self.child_engine.clone();
656                let stores = self.env.stores.clone();
657                let _ = self
658                    .supervisor
659                    .spawn_if_absent(child_run_id, move || {
660                        tokio::spawn(async move { engine.resume(stores, child_run_id).await })
661                    })
662                    .await;
663            }
664        }
665
666        let resp = ChildRunSpawnResponseV1 {
667            parent_run_id: self.env.run_id,
668            child_run_id,
669            child_manifest_id: stored_id,
670        };
671        serde_json::to_value(resp).map_err(|_| {
672            child_io_error(
673                CODE_CHILD_RUN_TASK_FAILED,
674                ErrorCategory::Unknown,
675                "failed to serialize child run spawn response",
676            )
677        })
678    }
679
680    async fn await_child(&mut self, call: IoCall) -> Result<serde_json::Value, IoError> {
681        if call.fact_key.is_none() {
682            return Err(Self::missing_fact_key());
683        }
684        let req = Self::parse_await_request(&call)?;
685
686        let stores = self.env.stores.clone();
687
688        let rr = if let Some(handle) = self.supervisor.take(req.child_run_id).await {
689            match handle.await {
690                Ok(res) => res.map_err(|_| {
691                    child_io_error(
692                        CODE_CHILD_RUN_TASK_FAILED,
693                        ErrorCategory::Unknown,
694                        "child run task failed",
695                    )
696                })?,
697                Err(_) => {
698                    return Err(child_io_error(
699                        CODE_CHILD_RUN_TASK_FAILED,
700                        ErrorCategory::Unknown,
701                        "child run task panicked or was cancelled",
702                    ));
703                }
704            }
705        } else {
706            match self
707                .child_engine
708                .resume(stores.clone(), req.child_run_id)
709                .await
710            {
711                Ok(rr) => rr,
712                Err(RunError::Storage(StorageError::NotFound(info)))
713                    if info.code.0 == "run_not_found" =>
714                {
715                    let manifest = read_manifest(stores.artifacts.as_ref(), &req.child_manifest_id)
716                        .await
717                        .map_err(|_| {
718                            child_io_error(
719                                CODE_CHILD_RUN_ENGINE_FAILED,
720                                ErrorCategory::Storage,
721                                "failed to read child run manifest",
722                            )
723                        })?;
724                    let plan = self.child_engine.resolver.resolve(&manifest).map_err(|_| {
725                        child_io_error(
726                            CODE_CHILD_RUN_ENGINE_FAILED,
727                            ErrorCategory::Unknown,
728                            "failed to resolve child run plan",
729                        )
730                    })?;
731
732                    let run_config = manifest.run_config.clone();
733                    let start = StartRun {
734                        manifest,
735                        manifest_id: req.child_manifest_id.clone(),
736                        plan,
737                        run_config,
738                        initial_context: Box::new(crate::context_runtime::JsonContext::new()),
739                    };
740                    self.child_engine
741                        .start_with_id(stores.clone(), start, req.child_run_id)
742                        .await
743                        .map_err(|_| {
744                            child_io_error(
745                                CODE_CHILD_RUN_ENGINE_FAILED,
746                                ErrorCategory::Unknown,
747                                "failed to start missing child run",
748                            )
749                        })?
750                }
751                Err(_) => {
752                    return Err(child_io_error(
753                        CODE_CHILD_RUN_ENGINE_FAILED,
754                        ErrorCategory::Unknown,
755                        "failed to resume child run",
756                    ));
757                }
758            }
759        };
760
761        let status = match rr.phase {
762            RunPhase::Completed => RunStatus::Completed,
763            RunPhase::Failed => RunStatus::Failed,
764            RunPhase::Cancelled => RunStatus::Cancelled,
765            RunPhase::Running => RunStatus::Failed,
766        };
767
768        let final_snapshot = if let Some(id) = &rr.final_snapshot_id {
769            let bytes = stores.artifacts.get(id).await.map_err(|_| {
770                child_io_error(
771                    CODE_CHILD_RUN_ENGINE_FAILED,
772                    ErrorCategory::Storage,
773                    "failed to read child final snapshot",
774                )
775            })?;
776            serde_json::from_slice::<serde_json::Value>(&bytes).map_err(|_| {
777                child_io_error(
778                    CODE_CHILD_RUN_ENGINE_FAILED,
779                    ErrorCategory::ParsingInput,
780                    "child final snapshot was invalid JSON",
781                )
782            })?
783        } else {
784            serde_json::Value::Null
785        };
786
787        let resp = ChildRunAwaitResponseV1 {
788            child_run_id: rr.run_id,
789            status,
790            final_snapshot_id: rr.final_snapshot_id,
791            final_snapshot,
792        };
793        serde_json::to_value(resp).map_err(|_| {
794            child_io_error(
795                CODE_CHILD_RUN_TASK_FAILED,
796                ErrorCategory::Unknown,
797                "failed to serialize child run await response",
798            )
799        })
800    }
801}
802
803#[async_trait]
804impl LiveIoTransport for ChildRunLiveIoTransport {
805    async fn call(&mut self, call: IoCall) -> Result<serde_json::Value, IoError> {
806        match call.namespace.as_str() {
807            NAMESPACE_CHILD_RUN_SPAWN => self.spawn_child(call).await,
808            NAMESPACE_CHILD_RUN_AWAIT => self.await_child(call).await,
809            _ => self.inner.call(call).await,
810        }
811    }
812}