Skip to main content

meerkat_mob/runtime/
flow_frame_kernel.rs

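//! FlowFrameKernel: sealed mutator for FlowFrameMachine state.
//!
//! Every frame state mutation computes the next state with the generated
//! `flow_frame::transition` and persists it with a compare-and-swap against the
//! snapshot it was computed from (`cas_frame_state`, or
//! `cas_complete_step_and_record_output` when a step output is recorded in the
//! same write). Because `FlowFrameMutator` is sealed, `FlowFrameKernel` is its
//! only implementation, which is how the machine authority rule is enforced at
//! compile time.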
5
6use crate::definition::{DependencyMode, FlowNodeSpec, FrameSpec};
7use crate::error::MobError;
8use crate::ids::{FlowNodeId, FrameId, LoopId, LoopInstanceId, RunId, StepId};
9use crate::run::FrameSnapshot;
10use crate::store::MobRunStore;
11use meerkat_machine_kernels::generated::flow_frame;
12use meerkat_machine_kernels::{KernelEffect, KernelInput, KernelValue};
13use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::sync::Arc;
15
/// Private home of the supertrait that seals `FlowFrameMutator`.
///
/// `Sealed` is declared `pub` so it may appear as a supertrait of the public
/// `FlowFrameMutator`. The module itself is private to this file, however, so
/// code elsewhere cannot name `sealed::Sealed`. It therefore cannot implement
/// `FlowFrameMutator` for its own types.
mod sealed {
    /// Marker supertrait; this file implements it for `FlowFrameKernel`.
    pub trait Sealed {}
}
19
20// ─── StepCompletionOpts ──────────────────────────────────────────────────────
21
22/// Options for completing a step node and recording its output.
23pub struct StepCompletionOpts<'a> {
24    /// The frame node that was admitted as a step.
25    pub node_id: &'a FlowNodeId,
26    /// The step ID used to store the output.
27    pub step_id: &'a StepId,
28    /// The output value produced by the step executor.
29    pub output: serde_json::Value,
30    /// `None` for root frame steps (stored in `root_step_outputs`).
31    /// `Some((loop_id, iteration))` for loop body steps (stored in
32    /// `loop_iteration_outputs[loop_id][iteration]`).
33    pub loop_context: Option<(&'a LoopId, u64)>,
34    /// Maximum number of CAS retries before returning an error.
35    pub max_retries: usize,
36}
37
38// ─── FlowFrameMutator ────────────────────────────────────────────────────────
39
40/// Sealed mutator trait for FlowFrame state transitions.
41///
42/// Only `FlowFrameKernel` implements this. All frame state mutations flow
43/// through `flow_frame::transition()` + `cas_frame_state`.
44#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
45#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
46pub trait FlowFrameMutator: sealed::Sealed {
47    /// Start a frame from a `FrameSpec` (arbitrary DAG).
48    ///
49    /// Returns the initial `FrameSnapshot` in Running state.
50    async fn start_frame(
51        &self,
52        run_id: &RunId,
53        frame_id: &FrameId,
54        spec: &FrameSpec,
55    ) -> Result<FrameSnapshot, MobError>;
56
57    /// Admit the next ready node in the frame. Returns the effects emitted (e.g.
58    /// `AdmitStepWork` or `StartLoopNode`), or `None` if the queue was empty.
59    async fn admit_next_ready_node(
60        &self,
61        run_id: &RunId,
62        frame_id: &FrameId,
63    ) -> Result<Option<Vec<KernelEffect>>, MobError>;
64
65    /// Admit the next ready node with up to `max_retries` CAS retries.
66    ///
67    /// Returns `Ok(Some(effects))` on success, `Ok(None)` if the queue is
68    /// genuinely empty, or `Err` if every attempt lost the CAS (contention).
69    async fn admit_next_ready_node_with_retry(
70        &self,
71        run_id: &RunId,
72        frame_id: &FrameId,
73        max_retries: usize,
74    ) -> Result<Option<Vec<KernelEffect>>, MobError>;
75
76    /// Complete a step node and record its output, with CAS retry.
77    async fn complete_step(
78        &self,
79        run_id: &RunId,
80        frame_id: &FrameId,
81        opts: StepCompletionOpts<'_>,
82    ) -> Result<(), MobError>;
83
84    /// Mark a node as completed. Returns `true` if the CAS succeeded.
85    async fn complete_node(
86        &self,
87        run_id: &RunId,
88        frame_id: &FrameId,
89        node_id: &FlowNodeId,
90    ) -> Result<bool, MobError>;
91
92    /// Mark a node as failed. Returns `true` if the CAS succeeded.
93    async fn fail_node(
94        &self,
95        run_id: &RunId,
96        frame_id: &FrameId,
97        node_id: &FlowNodeId,
98    ) -> Result<bool, MobError>;
99
100    /// Mark a node as skipped. Returns `true` if the CAS succeeded.
101    async fn skip_node(
102        &self,
103        run_id: &RunId,
104        frame_id: &FrameId,
105        node_id: &FlowNodeId,
106    ) -> Result<bool, MobError>;
107
108    /// Mark a node as canceled. Returns `true` if the CAS succeeded.
109    async fn cancel_node(
110        &self,
111        run_id: &RunId,
112        frame_id: &FrameId,
113        node_id: &FlowNodeId,
114    ) -> Result<bool, MobError>;
115
116    /// Terminalize the frame as completed. Returns `true` if the CAS succeeded.
117    async fn terminalize_frame(&self, run_id: &RunId, frame_id: &FrameId)
118    -> Result<bool, MobError>;
119}
120
121// ─── FlowFrameKernel ─────────────────────────────────────────────────────────
122
/// Concrete (and, because the trait is sealed, the only) implementation of
/// `FlowFrameMutator`.
///
/// Holds no frame state of its own. Every operation reads the current
/// `FrameSnapshot` from `run_store` and writes the next one back with a
/// compare-and-swap.
pub struct FlowFrameKernel {
    /// Store that holds runs and their per-frame snapshots.
    run_store: Arc<dyn MobRunStore>,
}
127
128impl FlowFrameKernel {
129    pub fn new(run_store: Arc<dyn MobRunStore>) -> Self {
130        Self { run_store }
131    }
132
133    fn node_val(node_id: &FlowNodeId) -> KernelValue {
134        KernelValue::String(node_id.to_string())
135    }
136
137    /// Read the current `FrameSnapshot` for a frame, returning an error if not found.
138    async fn require_frame(
139        &self,
140        run_id: &RunId,
141        frame_id: &FrameId,
142    ) -> Result<FrameSnapshot, MobError> {
143        let run = self
144            .run_store
145            .get_run(run_id)
146            .await?
147            .ok_or_else(|| MobError::RunNotFound(run_id.clone()))?;
148        run.frames.get(frame_id).cloned().ok_or_else(|| {
149            MobError::Internal(format!("frame '{frame_id}' not found in run '{run_id}'"))
150        })
151    }
152
153    /// Apply a transition to the current frame state via CAS.
154    ///
155    /// Reads the current snapshot, applies `input` to produce `next_snapshot`,
156    /// then CAS-updates the store. Returns the effects from the transition on
157    /// success. Returns `Err` on both transition failure and CAS exhaustion —
158    /// callers must treat `Ok(Some(effects))` as the only success path.
159    async fn transition_frame(
160        &self,
161        run_id: &RunId,
162        frame_id: &FrameId,
163        input: KernelInput,
164        max_retries: usize,
165    ) -> Result<Vec<KernelEffect>, MobError> {
166        for _ in 0..=max_retries {
167            let current = self.require_frame(run_id, frame_id).await?;
168            let outcome = flow_frame::transition(&current.kernel_state, &input)
169                .map_err(|e| MobError::Internal(format!("flow_frame transition failed: {e:?}")))?;
170            let next = FrameSnapshot {
171                kernel_state: outcome.next_state,
172            };
173            let effects = outcome.effects.clone();
174            let won = self
175                .run_store
176                .cas_frame_state(run_id, frame_id, Some(&current), next)
177                .await?;
178            if won {
179                return Ok(effects);
180            }
181        }
182        Err(MobError::Internal(format!(
183            "transition_frame: CAS exhausted {max_retries} retries for frame '{frame_id}'"
184        )))
185    }
186}
187
// Opt `FlowFrameKernel` into the private `sealed::Sealed` supertrait that
// `FlowFrameMutator` requires.
impl sealed::Sealed for FlowFrameKernel {}
189
190#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
191#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
192impl FlowFrameMutator for FlowFrameKernel {
193    async fn start_frame(
194        &self,
195        run_id: &RunId,
196        frame_id: &FrameId,
197        spec: &FrameSpec,
198    ) -> Result<FrameSnapshot, MobError> {
199        // Resume guard: if the frame was already started (e.g. crash-recovery),
200        // return the existing snapshot rather than re-initializing it.
201        let run = self
202            .run_store
203            .get_run(run_id)
204            .await?
205            .ok_or_else(|| MobError::RunNotFound(run_id.clone()))?;
206        if let Some(existing) = run.frames.get(frame_id) {
207            return Ok(existing.clone());
208        }
209
210        let initial = flow_frame::initial_state()
211            .map_err(|e| MobError::Internal(format!("flow_frame initial_state failed: {e:?}")))?;
212        let ordered = topological_order(spec)?;
213        let start_input = build_start_root_frame_input(frame_id, spec, &ordered);
214        let outcome = flow_frame::transition(&initial, &start_input)
215            .map_err(|e| MobError::Internal(format!("flow_frame StartRootFrame failed: {e:?}")))?;
216        let snapshot = FrameSnapshot {
217            kernel_state: outcome.next_state,
218        };
219        // CAS-insert the new frame (expected = None means "must not yet exist").
220        let inserted = self
221            .run_store
222            .cas_frame_state(run_id, frame_id, None, snapshot.clone())
223            .await?;
224        if !inserted {
225            // A concurrent writer started the frame between our read and insert.
226            // Read the winner's snapshot and return it.
227            let run2 = self
228                .run_store
229                .get_run(run_id)
230                .await?
231                .ok_or_else(|| MobError::RunNotFound(run_id.clone()))?;
232            return run2.frames.get(frame_id).cloned().ok_or_else(|| {
233                MobError::Internal(format!(
234                    "frame '{frame_id}' missing after concurrent insert in run '{run_id}'"
235                ))
236            });
237        }
238        Ok(snapshot)
239    }
240
241    async fn admit_next_ready_node(
242        &self,
243        run_id: &RunId,
244        frame_id: &FrameId,
245    ) -> Result<Option<Vec<KernelEffect>>, MobError> {
246        let input = KernelInput {
247            variant: "AdmitNextReadyNode".into(),
248            fields: BTreeMap::new(),
249        };
250        // Map Ok(effects) → Ok(Some(effects)); errors propagate as-is.
251        self.transition_frame(run_id, frame_id, input, 5)
252            .await
253            .map(Some)
254    }
255
256    async fn admit_next_ready_node_with_retry(
257        &self,
258        run_id: &RunId,
259        frame_id: &FrameId,
260        max_retries: usize,
261    ) -> Result<Option<Vec<KernelEffect>>, MobError> {
262        for _ in 0..=max_retries {
263            let snap = self.require_frame(run_id, frame_id).await?;
264            let queue_empty = match snap.kernel_state.fields.get("ready_queue") {
265                Some(KernelValue::Seq(seq)) => seq.is_empty(),
266                _ => true,
267            };
268            if queue_empty {
269                return Ok(None); // genuinely nothing to admit
270            }
271
272            let admit_input = KernelInput {
273                variant: "AdmitNextReadyNode".into(),
274                fields: BTreeMap::new(),
275            };
276            let outcome = flow_frame::transition(&snap.kernel_state, &admit_input)
277                .map_err(|e| MobError::Internal(format!("AdmitNextReadyNode failed: {e:?}")))?;
278            let next_snap = FrameSnapshot {
279                kernel_state: outcome.next_state,
280            };
281
282            // The sequential FlowFrameEngine drives nodes one at a time and does not
283            // participate in FlowRunMachine's slot scheduler (ready_frames /
284            // max_active_nodes). Concurrency limits will be enforced by a future
285            // orchestrated multi-frame executor that registers frames before
286            // admission. For now: update frame state only via CAS.
287            let won = self
288                .run_store
289                .cas_frame_state(run_id, frame_id, Some(&snap), next_snap)
290                .await?;
291            if won {
292                return Ok(Some(outcome.effects));
293            }
294            // CAS lost — retry with a fresh snapshot read
295        }
296
297        // All retries exhausted due to CAS contention (queue was non-empty each
298        // time but another writer kept winning). This is distinct from "queue
299        // empty" and indicates a liveness issue rather than normal termination.
300        Err(MobError::Internal(format!(
301            "admit_next_ready_node: CAS exhausted {max_retries} retries for frame '{frame_id}' \
302             — queue was non-empty but every attempt lost the CAS"
303        )))
304    }
305
306    async fn complete_step(
307        &self,
308        run_id: &RunId,
309        frame_id: &FrameId,
310        opts: StepCompletionOpts<'_>,
311    ) -> Result<(), MobError> {
312        let StepCompletionOpts {
313            node_id,
314            step_id,
315            output,
316            loop_context,
317            max_retries,
318        } = opts;
319        for attempt in 0..=max_retries {
320            let snap = self.require_frame(run_id, frame_id).await?;
321            let complete_input = KernelInput {
322                variant: "CompleteNode".into(),
323                fields: BTreeMap::from([("node_id".into(), Self::node_val(node_id))]),
324            };
325            let next_outcome = flow_frame::transition(&snap.kernel_state, &complete_input)
326                .map_err(|e| MobError::Internal(format!("CompleteNode failed: {e:?}")))?;
327            let next_snap = FrameSnapshot {
328                kernel_state: next_outcome.next_state,
329            };
330            let won = self
331                .run_store
332                .cas_complete_step_and_record_output(
333                    run_id,
334                    frame_id,
335                    &snap,
336                    next_snap,
337                    step_id.to_string(),
338                    output.clone(),
339                    loop_context,
340                )
341                .await?;
342            if won {
343                return Ok(());
344            }
345            if attempt == max_retries {
346                return Err(MobError::Internal(format!(
347                    "CompleteNode CAS failed after {} attempts for node '{node_id}'",
348                    max_retries + 1
349                )));
350            }
351        }
352        // Unreachable — the loop always returns on the last attempt.
353        Err(MobError::Internal("CompleteNode CAS exhausted".into()))
354    }
355
356    async fn complete_node(
357        &self,
358        run_id: &RunId,
359        frame_id: &FrameId,
360        node_id: &FlowNodeId,
361    ) -> Result<bool, MobError> {
362        let input = KernelInput {
363            variant: "CompleteNode".into(),
364            fields: BTreeMap::from([("node_id".into(), Self::node_val(node_id))]),
365        };
366        // transition_frame now returns Err on CAS exhaustion rather than Ok(None),
367        // so Ok always means the transition fired successfully.
368        self.transition_frame(run_id, frame_id, input, 5)
369            .await
370            .map(|_| true)
371    }
372
373    async fn fail_node(
374        &self,
375        run_id: &RunId,
376        frame_id: &FrameId,
377        node_id: &FlowNodeId,
378    ) -> Result<bool, MobError> {
379        let input = KernelInput {
380            variant: "FailNode".into(),
381            fields: BTreeMap::from([("node_id".into(), Self::node_val(node_id))]),
382        };
383        self.transition_frame(run_id, frame_id, input, 5)
384            .await
385            .map(|_| true)
386    }
387
388    async fn skip_node(
389        &self,
390        run_id: &RunId,
391        frame_id: &FrameId,
392        node_id: &FlowNodeId,
393    ) -> Result<bool, MobError> {
394        let input = KernelInput {
395            variant: "SkipNode".into(),
396            fields: BTreeMap::from([("node_id".into(), Self::node_val(node_id))]),
397        };
398        self.transition_frame(run_id, frame_id, input, 5)
399            .await
400            .map(|_| true)
401    }
402
403    async fn terminalize_frame(
404        &self,
405        run_id: &RunId,
406        frame_id: &FrameId,
407    ) -> Result<bool, MobError> {
408        let input = KernelInput {
409            variant: "SealFrame".into(),
410            fields: BTreeMap::new(),
411        };
412        self.transition_frame(run_id, frame_id, input, 5)
413            .await
414            .map(|_| true)
415    }
416
417    async fn cancel_node(
418        &self,
419        run_id: &RunId,
420        frame_id: &FrameId,
421        node_id: &FlowNodeId,
422    ) -> Result<bool, MobError> {
423        let input = KernelInput {
424            variant: "CancelNode".into(),
425            fields: BTreeMap::from([("node_id".into(), Self::node_val(node_id))]),
426        };
427        self.transition_frame(run_id, frame_id, input, 5)
428            .await
429            .map(|_| true)
430    }
431}
432
433// ─── Helpers (moved from flow_frame_engine.rs) ─────────────────────────────
434
435fn build_frame_start_fields(
436    frame_id: &FrameId,
437    spec: &FrameSpec,
438    ordered: &[FlowNodeId],
439) -> BTreeMap<String, KernelValue> {
440    let ordered_kv: Vec<KernelValue> = ordered
441        .iter()
442        .map(|n| KernelValue::String(n.to_string()))
443        .collect();
444
445    let tracked: BTreeSet<KernelValue> = ordered
446        .iter()
447        .map(|n| KernelValue::String(n.to_string()))
448        .collect();
449
450    let mut node_kind: BTreeMap<KernelValue, KernelValue> = BTreeMap::new();
451    let mut node_deps: BTreeMap<KernelValue, KernelValue> = BTreeMap::new();
452    let mut node_dep_modes: BTreeMap<KernelValue, KernelValue> = BTreeMap::new();
453    let mut node_branches: BTreeMap<KernelValue, KernelValue> = BTreeMap::new();
454
455    for (node_id, node_spec) in &spec.nodes {
456        let k = KernelValue::String(node_id.to_string());
457        match node_spec {
458            FlowNodeSpec::Step(s) => {
459                node_kind.insert(
460                    k.clone(),
461                    KernelValue::NamedVariant {
462                        enum_name: "FlowNodeKind".into(),
463                        variant: "Step".into(),
464                    },
465                );
466                node_deps.insert(
467                    k.clone(),
468                    KernelValue::Seq(
469                        s.depends_on
470                            .iter()
471                            .map(|d| KernelValue::String(d.to_string()))
472                            .collect(),
473                    ),
474                );
475                node_dep_modes.insert(k.clone(), dep_mode_kv(&s.depends_on_mode));
476                node_branches.insert(
477                    k.clone(),
478                    s.branch
479                        .as_ref()
480                        .map_or(KernelValue::None, |b| KernelValue::String(b.to_string())),
481                );
482            }
483            FlowNodeSpec::RepeatUntil(l) => {
484                node_kind.insert(
485                    k.clone(),
486                    KernelValue::NamedVariant {
487                        enum_name: "FlowNodeKind".into(),
488                        variant: "Loop".into(),
489                    },
490                );
491                node_deps.insert(
492                    k.clone(),
493                    KernelValue::Seq(
494                        l.depends_on
495                            .iter()
496                            .map(|d| KernelValue::String(d.to_string()))
497                            .collect(),
498                    ),
499                );
500                node_dep_modes.insert(k.clone(), dep_mode_kv(&l.depends_on_mode));
501                node_branches.insert(k.clone(), KernelValue::None);
502            }
503        }
504    }
505
506    BTreeMap::from([
507        ("frame_id".into(), KernelValue::String(frame_id.to_string())),
508        ("tracked_nodes".into(), KernelValue::Set(tracked)),
509        ("ordered_nodes".into(), KernelValue::Seq(ordered_kv)),
510        ("node_kind".into(), KernelValue::Map(node_kind)),
511        ("node_dependencies".into(), KernelValue::Map(node_deps)),
512        (
513            "node_dependency_modes".into(),
514            KernelValue::Map(node_dep_modes),
515        ),
516        ("node_branches".into(), KernelValue::Map(node_branches)),
517    ])
518}
519
520/// Build the `StartRootFrame` KernelInput from a `FrameSpec` and its topological order.
521pub(crate) fn build_start_root_frame_input(
522    frame_id: &FrameId,
523    spec: &FrameSpec,
524    ordered: &[FlowNodeId],
525) -> KernelInput {
526    KernelInput {
527        variant: "StartRootFrame".into(),
528        fields: build_frame_start_fields(frame_id, spec, ordered),
529    }
530}
531
532/// Build the `StartBodyFrame` KernelInput from a `FrameSpec` and its topological order.
533pub(crate) fn build_start_body_frame_input(
534    frame_id: &FrameId,
535    loop_instance_id: &LoopInstanceId,
536    iteration: u64,
537    spec: &FrameSpec,
538    ordered: &[FlowNodeId],
539) -> KernelInput {
540    let mut fields = build_frame_start_fields(frame_id, spec, ordered);
541    fields.insert(
542        "loop_instance_id".into(),
543        KernelValue::String(loop_instance_id.to_string()),
544    );
545    fields.insert("iteration".into(), KernelValue::U64(iteration));
546    KernelInput {
547        variant: "StartBodyFrame".into(),
548        fields,
549    }
550}
551
552fn dep_mode_kv(mode: &DependencyMode) -> KernelValue {
553    let variant = match mode {
554        DependencyMode::All => "All",
555        DependencyMode::Any => "Any",
556    };
557    KernelValue::NamedVariant {
558        enum_name: "DependencyMode".into(),
559        variant: variant.into(),
560    }
561}
562
563/// Topological sort of a `FrameSpec` (Kahn's algorithm).
564pub(crate) fn topological_order(spec: &FrameSpec) -> Result<Vec<FlowNodeId>, MobError> {
565    let mut in_degree: BTreeMap<FlowNodeId, usize> = BTreeMap::new();
566    let mut outgoing: BTreeMap<FlowNodeId, Vec<FlowNodeId>> = BTreeMap::new();
567
568    for node_id in spec.nodes.keys() {
569        in_degree.insert(node_id.clone(), 0);
570        outgoing.entry(node_id.clone()).or_default();
571    }
572
573    for (node_id, node_spec) in &spec.nodes {
574        let deps = match node_spec {
575            FlowNodeSpec::Step(s) => s.depends_on.clone(),
576            FlowNodeSpec::RepeatUntil(l) => l.depends_on.clone(),
577        };
578        for dep in deps {
579            if !in_degree.contains_key(&dep) {
580                return Err(MobError::Internal(format!(
581                    "node '{node_id}' depends on unknown node '{dep}'"
582                )));
583            }
584            *in_degree.entry(node_id.clone()).or_insert(0) += 1;
585            outgoing
586                .entry(dep.clone())
587                .or_default()
588                .push(node_id.clone());
589        }
590    }
591
592    let mut queue = VecDeque::new();
593    for node_id in spec.nodes.keys() {
594        if in_degree.get(node_id) == Some(&0) {
595            queue.push_back(node_id.clone());
596        }
597    }
598
599    let mut ordered = Vec::with_capacity(spec.nodes.len());
600    while let Some(next) = queue.pop_front() {
601        ordered.push(next.clone());
602        if let Some(children) = outgoing.get(&next) {
603            for child in children {
604                if let Some(count) = in_degree.get_mut(child)
605                    && *count > 0
606                {
607                    *count -= 1;
608                    if *count == 0 {
609                        queue.push_back(child.clone());
610                    }
611                }
612            }
613        }
614    }
615
616    if ordered.len() != spec.nodes.len() {
617        return Err(MobError::Internal(
618            "frame contains a cycle; cannot compute topological order".to_string(),
619        ));
620    }
621
622    Ok(ordered)
623}