Skip to main content

meerkat_mob/
run.rs

1//! Flow run data model.
2
3use crate::definition::{FlowSpec, LimitsSpec, SupervisorSpec, TopologySpec};
4use crate::error::MobError;
5use crate::ids::{
6    FlowId, FrameId, LoopId, LoopInstanceId, MeerkatId, MobId, ProfileName, RunId, StepId,
7};
8use chrono::{DateTime, Utc};
9use indexmap::IndexMap;
10use meerkat_machine_kernels::generated::flow_run;
11use meerkat_machine_kernels::{KernelInput, KernelState, KernelValue};
12use serde::{Deserialize, Serialize};
13use std::collections::{BTreeMap, VecDeque};
14
15/// Snapshot of a FlowFrameMachine kernel state stored per-frame in MobRun.
16#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
17pub struct FrameSnapshot {
18    pub kernel_state: KernelState,
19}
20
21/// Snapshot of a LoopIterationMachine kernel state stored per-loop in MobRun.
22#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
23pub struct LoopSnapshot {
24    pub kernel_state: KernelState,
25}
26
27/// Ledger entry recording the mapping of a loop iteration to its body frame.
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct LoopIterationLedgerEntry {
30    pub loop_instance_id: LoopInstanceId,
31    pub iteration: u64,
32    pub frame_id: FrameId,
33}
34
35/// Persisted flow run aggregate.
36#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct MobRun {
38    pub run_id: RunId,
39    pub mob_id: MobId,
40    pub flow_id: FlowId,
41    pub status: MobRunStatus,
42    pub flow_state: KernelState,
43    pub activation_params: serde_json::Value,
44    pub created_at: DateTime<Utc>,
45    pub completed_at: Option<DateTime<Utc>>,
46    pub step_ledger: Vec<StepLedgerEntry>,
47    pub failure_ledger: Vec<FailureLedgerEntry>,
48    /// Per-frame kernel snapshots indexed by FrameId.
49    #[serde(default)]
50    pub frames: BTreeMap<FrameId, FrameSnapshot>,
51    /// Per-loop kernel snapshots indexed by LoopInstanceId.
52    #[serde(default)]
53    pub loops: BTreeMap<LoopInstanceId, LoopSnapshot>,
54    /// Ordered ledger of loop iteration → body frame mappings.
55    #[serde(default)]
56    pub loop_iteration_ledger: Vec<LoopIterationLedgerEntry>,
57    /// Schema version: 0 for legacy runs, 4 for self-describing frame-aware runs.
58    #[serde(default)]
59    pub schema_version: u32,
60    /// Root frame step outputs keyed by step_id string.
61    #[serde(default)]
62    pub root_step_outputs: IndexMap<StepId, serde_json::Value>,
63    /// Loop iteration outputs: key=loop_id, value=per-iteration step outputs.
64    ///
65    /// Uses `BTreeMap` (not `IndexMap`) so that recovery reconciliation can iterate
66    /// in stable key order without depending on insertion order.
67    #[serde(default)]
68    pub loop_iteration_outputs: BTreeMap<LoopId, Vec<IndexMap<StepId, serde_json::Value>>>,
69}
70
71impl MobRun {
72    /// Read-only access to the run's current status.
73    pub fn status(&self) -> &MobRunStatus {
74        &self.status
75    }
76
77    /// Read-only access to the run's current flow state.
78    pub fn flow_state(&self) -> &KernelState {
79        &self.flow_state
80    }
81}
82
83impl MobRun {
84    pub fn pending(
85        mob_id: MobId,
86        flow_id: FlowId,
87        flow_state: KernelState,
88        activation_params: serde_json::Value,
89    ) -> Self {
90        Self {
91            run_id: RunId::new(),
92            mob_id,
93            flow_id,
94            status: MobRunStatus::Pending,
95            flow_state,
96            activation_params,
97            created_at: Utc::now(),
98            completed_at: None,
99            step_ledger: Vec::new(),
100            failure_ledger: Vec::new(),
101            frames: BTreeMap::new(),
102            loops: BTreeMap::new(),
103            loop_iteration_ledger: Vec::new(),
104            schema_version: 4,
105            root_step_outputs: IndexMap::new(),
106            loop_iteration_outputs: BTreeMap::new(),
107        }
108    }
109
110    pub fn flow_state_for_config(config: &FlowRunConfig) -> Result<KernelState, MobError> {
111        let initial = flow_run::initial_state().map_err(|error| {
112            MobError::Internal(format!("flow_run initial_state failed: {error}"))
113        })?;
114        let ordered_steps = topological_steps(&config.flow_spec)?;
115        let step_ids = config
116            .flow_spec
117            .steps
118            .keys()
119            .map(|step_id| KernelValue::String(step_id.to_string()))
120            .collect();
121        let ordered_steps = ordered_steps
122            .into_iter()
123            .map(|step_id| KernelValue::String(step_id.to_string()))
124            .collect();
125        let step_dependencies = config
126            .flow_spec
127            .steps
128            .iter()
129            .map(|(step_id, step)| {
130                (
131                    KernelValue::String(step_id.to_string()),
132                    KernelValue::Seq(
133                        step.depends_on
134                            .iter()
135                            .map(|dependency| KernelValue::String(dependency.to_string()))
136                            .collect(),
137                    ),
138                )
139            })
140            .collect();
141        let step_dependency_modes = config
142            .flow_spec
143            .steps
144            .iter()
145            .map(|(step_id, step)| {
146                (
147                    KernelValue::String(step_id.to_string()),
148                    dependency_mode_value(step.depends_on_mode.clone()),
149                )
150            })
151            .collect();
152        let step_has_conditions = config
153            .flow_spec
154            .steps
155            .iter()
156            .map(|(step_id, step)| {
157                (
158                    KernelValue::String(step_id.to_string()),
159                    KernelValue::Bool(step.condition.is_some()),
160                )
161            })
162            .collect();
163        let step_branches = config
164            .flow_spec
165            .steps
166            .iter()
167            .map(|(step_id, step)| {
168                (
169                    KernelValue::String(step_id.to_string()),
170                    step.branch.as_ref().map_or(KernelValue::None, |branch| {
171                        KernelValue::String(branch.to_string())
172                    }),
173                )
174            })
175            .collect();
176        let step_collection_policies = config
177            .flow_spec
178            .steps
179            .iter()
180            .map(|(step_id, step)| {
181                (
182                    KernelValue::String(step_id.to_string()),
183                    collection_policy_kind_value(&step.collection_policy),
184                )
185            })
186            .collect();
187        let step_quorum_thresholds = config
188            .flow_spec
189            .steps
190            .iter()
191            .map(|(step_id, step)| {
192                let threshold = match step.collection_policy {
193                    crate::definition::CollectionPolicy::Quorum { n } => u64::from(n),
194                    _ => 0,
195                };
196                (
197                    KernelValue::String(step_id.to_string()),
198                    KernelValue::U64(threshold),
199                )
200            })
201            .collect();
202        let escalation_threshold = config
203            .supervisor
204            .as_ref()
205            .map_or(0, |supervisor| u64::from(supervisor.escalation_threshold));
206        let max_step_retries = config
207            .limits
208            .as_ref()
209            .and_then(|limits| limits.max_step_retries)
210            .map_or(0, u64::from);
211        let input = KernelInput {
212            variant: "CreateRun".to_string(),
213            fields: BTreeMap::from([
214                ("step_ids".to_string(), KernelValue::Seq(step_ids)),
215                ("ordered_steps".to_string(), KernelValue::Seq(ordered_steps)),
216                (
217                    "step_dependencies".to_string(),
218                    KernelValue::Map(step_dependencies),
219                ),
220                (
221                    "step_dependency_modes".to_string(),
222                    KernelValue::Map(step_dependency_modes),
223                ),
224                (
225                    "step_has_conditions".to_string(),
226                    KernelValue::Map(step_has_conditions),
227                ),
228                ("step_branches".to_string(), KernelValue::Map(step_branches)),
229                (
230                    "step_collection_policies".to_string(),
231                    KernelValue::Map(step_collection_policies),
232                ),
233                (
234                    "step_quorum_thresholds".to_string(),
235                    KernelValue::Map(step_quorum_thresholds),
236                ),
237                (
238                    "escalation_threshold".to_string(),
239                    KernelValue::U64(escalation_threshold),
240                ),
241                (
242                    "max_step_retries".to_string(),
243                    KernelValue::U64(max_step_retries),
244                ),
245                // v2 scheduler limits — read from config, default to 0 (unlimited/disabled)
246                (
247                    "max_active_nodes".to_string(),
248                    KernelValue::U64(
249                        config
250                            .limits
251                            .as_ref()
252                            .and_then(|l| l.max_active_nodes)
253                            .unwrap_or(0),
254                    ),
255                ),
256                (
257                    "max_active_frames".to_string(),
258                    KernelValue::U64(
259                        config
260                            .limits
261                            .as_ref()
262                            .and_then(|l| l.max_active_frames)
263                            .unwrap_or(0),
264                    ),
265                ),
266                (
267                    "max_frame_depth".to_string(),
268                    KernelValue::U64(
269                        config
270                            .limits
271                            .as_ref()
272                            .and_then(|l| l.max_frame_depth)
273                            .unwrap_or(0),
274                    ),
275                ),
276            ]),
277        };
278        let outcome = flow_run::transition(&initial, &input)
279            .map_err(|error| MobError::Internal(format!("flow_run CreateRun failed: {error}")))?;
280        Ok(outcome.next_state)
281    }
282
283    pub fn flow_state_for_steps<I>(step_ids: I) -> Result<KernelState, MobError>
284    where
285        I: IntoIterator<Item = StepId>,
286    {
287        let mut steps = IndexMap::new();
288        for step_id in step_ids {
289            steps.insert(
290                step_id,
291                crate::definition::FlowStepSpec {
292                    role: ProfileName::from("worker"),
293                    message: meerkat_core::types::ContentInput::from("placeholder"),
294                    depends_on: Vec::new(),
295                    dispatch_mode: crate::definition::DispatchMode::FanOut,
296                    collection_policy: crate::definition::CollectionPolicy::All,
297                    condition: None,
298                    timeout_ms: None,
299                    expected_schema_ref: None,
300                    branch: None,
301                    depends_on_mode: crate::definition::DependencyMode::All,
302                    allowed_tools: None,
303                    blocked_tools: None,
304                    output_format: crate::definition::StepOutputFormat::Json,
305                },
306            );
307        }
308        Self::flow_state_for_config(&FlowRunConfig {
309            flow_id: FlowId::from("placeholder"),
310            flow_spec: FlowSpec {
311                description: None,
312                steps,
313                root: None,
314            },
315            topology: None,
316            supervisor: None,
317            limits: None,
318            orchestrator_role: None,
319        })
320    }
321}
322
323fn dependency_mode_value(mode: crate::definition::DependencyMode) -> KernelValue {
324    let variant = match mode {
325        crate::definition::DependencyMode::All => "All",
326        crate::definition::DependencyMode::Any => "Any",
327    };
328    KernelValue::NamedVariant {
329        enum_name: "DependencyMode".to_string(),
330        variant: variant.to_string(),
331    }
332}
333
334fn collection_policy_kind_value(policy: &crate::definition::CollectionPolicy) -> KernelValue {
335    let variant = match policy {
336        crate::definition::CollectionPolicy::All => "All",
337        crate::definition::CollectionPolicy::Any => "Any",
338        crate::definition::CollectionPolicy::Quorum { .. } => "Quorum",
339    };
340    KernelValue::NamedVariant {
341        enum_name: "CollectionPolicyKind".to_string(),
342        variant: variant.to_string(),
343    }
344}
345
346fn topological_steps(flow_spec: &FlowSpec) -> Result<Vec<StepId>, MobError> {
347    let mut in_degree: BTreeMap<StepId, usize> = BTreeMap::new();
348    let mut outgoing: BTreeMap<StepId, Vec<StepId>> = BTreeMap::new();
349
350    for step_id in flow_spec.steps.keys() {
351        in_degree.insert(step_id.clone(), 0);
352        outgoing.entry(step_id.clone()).or_default();
353    }
354
355    for (step_id, step) in &flow_spec.steps {
356        for dependency in &step.depends_on {
357            if !in_degree.contains_key(dependency) {
358                return Err(MobError::Internal(format!(
359                    "step '{step_id}' depends on unknown step '{dependency}'"
360                )));
361            }
362            *in_degree.entry(step_id.clone()).or_insert(0) += 1;
363            outgoing
364                .entry(dependency.clone())
365                .or_default()
366                .push(step_id.clone());
367        }
368    }
369
370    let mut queue = VecDeque::new();
371    for step_id in flow_spec.steps.keys() {
372        if in_degree.get(step_id) == Some(&0) {
373            queue.push_back(step_id.clone());
374        }
375    }
376
377    let mut ordered = Vec::with_capacity(flow_spec.steps.len());
378    while let Some(next) = queue.pop_front() {
379        ordered.push(next.clone());
380        if let Some(children) = outgoing.get(&next) {
381            for child in children {
382                if let Some(count) = in_degree.get_mut(child)
383                    && *count > 0
384                {
385                    *count -= 1;
386                    if *count == 0 {
387                        queue.push_back(child.clone());
388                    }
389                }
390            }
391        }
392    }
393
394    if ordered.len() != flow_spec.steps.len() {
395        return Err(MobError::Internal(
396            "flow contains a cycle; cannot compute topological order".to_string(),
397        ));
398    }
399
400    Ok(ordered)
401}
402
403/// Run lifecycle states.
404#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
405#[serde(rename_all = "snake_case")]
406pub enum MobRunStatus {
407    Pending,
408    Running,
409    Completed,
410    Failed,
411    Canceled,
412}
413
414impl MobRunStatus {
415    pub fn is_terminal(&self) -> bool {
416        matches!(self, Self::Completed | Self::Failed | Self::Canceled)
417    }
418}
419
420/// Per-target step execution ledger entry.
421#[derive(Debug, Clone, Serialize, Deserialize)]
422pub struct StepLedgerEntry {
423    pub step_id: StepId,
424    pub meerkat_id: MeerkatId,
425    pub status: StepRunStatus,
426    pub output: Option<serde_json::Value>,
427    pub timestamp: DateTime<Utc>,
428}
429
430/// Step execution state.
431#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
432#[serde(rename_all = "snake_case")]
433pub enum StepRunStatus {
434    Dispatched,
435    Completed,
436    Failed,
437    Skipped,
438    Canceled,
439}
440
441/// Flow-level failure log entry.
442#[derive(Debug, Clone, Serialize, Deserialize)]
443pub struct FailureLedgerEntry {
444    pub step_id: StepId,
445    pub reason: String,
446    pub timestamp: DateTime<Utc>,
447}
448
449/// Immutable per-run flow snapshot.
450#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
451pub struct FlowRunConfig {
452    pub flow_id: FlowId,
453    pub flow_spec: FlowSpec,
454    pub topology: Option<TopologySpec>,
455    pub supervisor: Option<SupervisorSpec>,
456    pub limits: Option<LimitsSpec>,
457    pub orchestrator_role: Option<ProfileName>,
458}
459
460impl FlowRunConfig {
461    pub fn from_definition(
462        flow_id: FlowId,
463        definition: &crate::definition::MobDefinition,
464    ) -> Result<Self, MobError> {
465        let flow_spec = definition
466            .flows
467            .get(&flow_id)
468            .cloned()
469            .ok_or_else(|| MobError::FlowNotFound(flow_id.clone()))?;
470        let topology = definition.topology.clone();
471        let orchestrator_role = definition
472            .orchestrator
473            .as_ref()
474            .map(|orchestrator| orchestrator.profile.clone());
475        if topology.is_some() && orchestrator_role.is_none() {
476            return Err(MobError::Internal(
477                "topology requires an orchestrator profile".to_string(),
478            ));
479        }
480        Ok(Self {
481            flow_id,
482            flow_spec,
483            topology,
484            supervisor: definition.supervisor.clone(),
485            limits: definition.limits.clone(),
486            orchestrator_role,
487        })
488    }
489}
490
491/// Per-loop iteration output history, ordered by iteration index.
492#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
493pub struct LoopContextHistory {
494    /// One entry per completed iteration, ordered by iteration index.
495    pub iterations: Vec<IndexMap<StepId, serde_json::Value>>,
496}
497
498/// Runtime context available to condition evaluators.
499#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
500pub struct FlowContext {
501    pub run_id: RunId,
502    pub activation_params: serde_json::Value,
503    /// Root frame step outputs keyed by step_id.
504    pub step_outputs: IndexMap<StepId, serde_json::Value>,
505    /// Per-loop iteration history keyed by loop_id.
506    #[serde(default)]
507    pub loop_outputs: IndexMap<LoopId, LoopContextHistory>,
508}
509
510impl FlowContext {
511    /// Rebuild a `FlowContext` from a persisted `MobRun` aggregate.
512    pub fn from_run_aggregate(
513        run: &MobRun,
514        run_id: RunId,
515        activation_params: serde_json::Value,
516    ) -> Self {
517        let loop_outputs = run
518            .loop_iteration_outputs
519            .iter()
520            .map(|(loop_id, iterations)| {
521                let history = LoopContextHistory {
522                    iterations: iterations.clone(),
523                };
524                (loop_id.clone(), history)
525            })
526            .collect();
527
528        // Seed step_outputs from root outputs, then project last-iteration
529        // outputs from each completed loop into step_outputs (dogma Rule 13:
530        // the projection must match what execute_frame_inner does at runtime —
531        // the last iteration's body step outputs are merged into step_outputs
532        // so that downstream steps/templates see them at steps.<id>).
533        let mut step_outputs = run.root_step_outputs.clone();
534        for iterations in run.loop_iteration_outputs.values() {
535            if let Some(last_iter) = iterations.last() {
536                for (sid, out) in last_iter {
537                    step_outputs.insert(sid.clone(), out.clone());
538                }
539            }
540        }
541
542        FlowContext {
543            run_id,
544            activation_params,
545            step_outputs,
546            loop_outputs,
547        }
548    }
549}
550
551#[cfg(test)]
552mod tests {
553    use super::*;
554    use crate::definition::{
555        BackendConfig, ConditionExpr, DispatchMode, FlowStepSpec, MobDefinition,
556        OrchestratorConfig, WiringRules,
557    };
558    use crate::ids::{BranchId, ProfileName};
559    use crate::profile::{Profile, ProfileBinding, ToolConfig};
560    use meerkat_core::types::ContentInput;
561    use std::collections::BTreeMap;
562
563    fn sample_definition() -> MobDefinition {
564        let mut steps = IndexMap::new();
565        steps.insert(
566            StepId::from("s1"),
567            FlowStepSpec {
568                role: ProfileName::from("worker"),
569                message: ContentInput::from("do it"),
570                depends_on: Vec::new(),
571                dispatch_mode: DispatchMode::FanOut,
572                collection_policy: crate::definition::CollectionPolicy::All,
573                condition: Some(ConditionExpr::Eq {
574                    path: "params.ok".to_string(),
575                    value: serde_json::json!(true),
576                }),
577                timeout_ms: Some(2000),
578                expected_schema_ref: Some("schema.json".to_string()),
579                branch: Some(BranchId::from("branch-a")),
580                depends_on_mode: crate::definition::DependencyMode::All,
581                allowed_tools: None,
582                blocked_tools: None,
583                output_format: crate::definition::StepOutputFormat::Json,
584            },
585        );
586
587        let mut flows = BTreeMap::new();
588        flows.insert(
589            FlowId::from("flow-a"),
590            FlowSpec {
591                description: Some("demo flow".to_string()),
592                steps,
593                root: None,
594            },
595        );
596
597        let mut profiles = BTreeMap::new();
598        profiles.insert(
599            ProfileName::from("lead"),
600            ProfileBinding::Inline(Profile {
601                model: "model".to_string(),
602                skills: Vec::new(),
603                tools: ToolConfig::default(),
604                peer_description: "lead".to_string(),
605                external_addressable: true,
606                backend: None,
607                runtime_mode: crate::MobRuntimeMode::AutonomousHost,
608                max_inline_peer_notifications: None,
609                output_schema: None,
610                provider_params: None,
611            }),
612        );
613        profiles.insert(
614            ProfileName::from("worker"),
615            ProfileBinding::Inline(Profile {
616                model: "model".to_string(),
617                skills: Vec::new(),
618                tools: ToolConfig::default(),
619                peer_description: "worker".to_string(),
620                external_addressable: false,
621                backend: None,
622                runtime_mode: crate::MobRuntimeMode::AutonomousHost,
623                max_inline_peer_notifications: None,
624                output_schema: None,
625                provider_params: None,
626            }),
627        );
628
629        MobDefinition {
630            id: MobId::from("mob"),
631            orchestrator: Some(OrchestratorConfig {
632                profile: ProfileName::from("lead"),
633            }),
634            profiles,
635            mcp_servers: BTreeMap::new(),
636            wiring: WiringRules::default(),
637            skills: BTreeMap::new(),
638            backend: BackendConfig::default(),
639            flows,
640            topology: Some(TopologySpec {
641                mode: crate::definition::PolicyMode::Advisory,
642                rules: vec![crate::definition::TopologyRule {
643                    from_role: ProfileName::from("lead"),
644                    to_role: ProfileName::from("worker"),
645                    allowed: true,
646                }],
647            }),
648            supervisor: Some(SupervisorSpec {
649                role: ProfileName::from("lead"),
650                escalation_threshold: 3,
651            }),
652            limits: Some(LimitsSpec {
653                max_flow_duration_ms: Some(60_000),
654                max_step_retries: Some(1),
655                max_orphaned_turns: Some(8),
656                cancel_grace_timeout_ms: None,
657                ..Default::default()
658            }),
659            spawn_policy: None,
660            event_router: None,
661            owner_session_id: None,
662            session_cleanup_policy: crate::definition::SessionCleanupPolicy::Manual,
663            is_implicit: false,
664        }
665    }
666
667    #[test]
668    fn test_run_status_terminal() {
669        assert!(MobRunStatus::Completed.is_terminal());
670        assert!(MobRunStatus::Failed.is_terminal());
671        assert!(MobRunStatus::Canceled.is_terminal());
672        assert!(!MobRunStatus::Pending.is_terminal());
673        assert!(!MobRunStatus::Running.is_terminal());
674    }
675
676    #[test]
677    fn test_flow_run_config_from_definition() {
678        let def = sample_definition();
679        let config = FlowRunConfig::from_definition(FlowId::from("flow-a"), &def).unwrap();
680        assert_eq!(config.flow_id, FlowId::from("flow-a"));
681        assert_eq!(config.flow_spec.steps.len(), 1);
682        assert_eq!(
683            config.orchestrator_role.as_ref(),
684            Some(&ProfileName::from("lead"))
685        );
686    }
687
688    #[test]
689    fn test_flow_run_config_from_definition_missing_flow() {
690        let def = sample_definition();
691        let error = FlowRunConfig::from_definition(FlowId::from("missing"), &def).unwrap_err();
692        assert!(matches!(error, MobError::FlowNotFound(name) if name == FlowId::from("missing")));
693    }
694
695    #[test]
696    fn test_flow_run_config_rejects_topology_without_orchestrator() {
697        let mut def = sample_definition();
698        def.orchestrator = None;
699        let error = FlowRunConfig::from_definition(FlowId::from("flow-a"), &def).unwrap_err();
700        assert!(
701            matches!(error, MobError::Internal(message) if message.contains("topology requires")),
702            "expected explicit topology/orchestrator configuration error"
703        );
704    }
705
706    #[test]
707    fn test_mob_run_roundtrip_json() {
708        let now = Utc::now();
709        let run = MobRun {
710            run_id: RunId::new(),
711            mob_id: MobId::from("mob"),
712            flow_id: FlowId::from("flow-a"),
713            status: MobRunStatus::Running,
714            flow_state: MobRun::flow_state_for_steps([StepId::from("step-1")]).unwrap(),
715            activation_params: serde_json::json!({"k":"v"}),
716            created_at: now,
717            completed_at: None,
718            step_ledger: vec![StepLedgerEntry {
719                step_id: StepId::from("step-1"),
720                meerkat_id: MeerkatId::from("agent-1"),
721                status: StepRunStatus::Completed,
722                output: Some(serde_json::json!({"ok":true})),
723                timestamp: now,
724            }],
725            failure_ledger: vec![FailureLedgerEntry {
726                step_id: StepId::from("step-2"),
727                reason: "boom".to_string(),
728                timestamp: now,
729            }],
730            frames: BTreeMap::new(),
731            loops: BTreeMap::new(),
732            loop_iteration_ledger: Vec::new(),
733            schema_version: 4,
734            root_step_outputs: IndexMap::new(),
735            loop_iteration_outputs: BTreeMap::new(),
736        };
737
738        let encoded = serde_json::to_string(&run).unwrap();
739        let decoded: MobRun = serde_json::from_str(&encoded).unwrap();
740        assert_eq!(decoded.flow_id, run.flow_id);
741        assert_eq!(decoded.step_ledger.len(), 1);
742        assert_eq!(decoded.failure_ledger.len(), 1);
743    }
744
745    #[test]
746    fn test_flow_context_roundtrip_json() {
747        let mut outputs = IndexMap::new();
748        outputs.insert(StepId::from("step-1"), serde_json::json!({"a":1}));
749        let context = FlowContext {
750            run_id: RunId::new(),
751            activation_params: serde_json::json!({"input":"x"}),
752            step_outputs: outputs,
753            loop_outputs: IndexMap::new(),
754        };
755
756        let encoded = serde_json::to_string(&context).unwrap();
757        let decoded: FlowContext = serde_json::from_str(&encoded).unwrap();
758        assert_eq!(decoded.step_outputs.len(), 1);
759        assert_eq!(decoded.activation_params["input"], "x");
760    }
761}