1use crate::definition::{DependencyMode, FlowNodeSpec, FrameSpec};
7use crate::error::MobError;
8use crate::ids::{FlowNodeId, FrameId, LoopId, LoopInstanceId, RunId, StepId};
9use crate::run::FrameSnapshot;
10use crate::store::MobRunStore;
11use meerkat_machine_kernels::generated::flow_frame;
12use meerkat_machine_kernels::{KernelEffect, KernelInput, KernelValue};
13use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::sync::Arc;
15
mod sealed {
    //! Sealing support for `FlowFrameMutator`.
    //!
    //! This module is private, so `Sealed` can only be named (and therefore
    //! implemented) from the enclosing module. Making it a supertrait of
    //! `FlowFrameMutator` keeps code outside that module from implementing
    //! the mutator trait.

    /// Marker supertrait; it has no methods and exists only to seal.
    pub trait Sealed {}
}
19
20pub struct StepCompletionOpts<'a> {
24 pub node_id: &'a FlowNodeId,
26 pub step_id: &'a StepId,
28 pub output: serde_json::Value,
30 pub loop_context: Option<(&'a LoopId, u64)>,
34 pub max_retries: usize,
36}
37
38#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
45#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
46pub trait FlowFrameMutator: sealed::Sealed {
47 async fn start_frame(
51 &self,
52 run_id: &RunId,
53 frame_id: &FrameId,
54 spec: &FrameSpec,
55 ) -> Result<FrameSnapshot, MobError>;
56
57 async fn admit_next_ready_node(
60 &self,
61 run_id: &RunId,
62 frame_id: &FrameId,
63 ) -> Result<Option<Vec<KernelEffect>>, MobError>;
64
65 async fn admit_next_ready_node_with_retry(
70 &self,
71 run_id: &RunId,
72 frame_id: &FrameId,
73 max_retries: usize,
74 ) -> Result<Option<Vec<KernelEffect>>, MobError>;
75
76 async fn complete_step(
78 &self,
79 run_id: &RunId,
80 frame_id: &FrameId,
81 opts: StepCompletionOpts<'_>,
82 ) -> Result<(), MobError>;
83
84 async fn complete_node(
86 &self,
87 run_id: &RunId,
88 frame_id: &FrameId,
89 node_id: &FlowNodeId,
90 ) -> Result<bool, MobError>;
91
92 async fn fail_node(
94 &self,
95 run_id: &RunId,
96 frame_id: &FrameId,
97 node_id: &FlowNodeId,
98 ) -> Result<bool, MobError>;
99
100 async fn skip_node(
102 &self,
103 run_id: &RunId,
104 frame_id: &FrameId,
105 node_id: &FlowNodeId,
106 ) -> Result<bool, MobError>;
107
108 async fn cancel_node(
110 &self,
111 run_id: &RunId,
112 frame_id: &FrameId,
113 node_id: &FlowNodeId,
114 ) -> Result<bool, MobError>;
115
116 async fn terminalize_frame(&self, run_id: &RunId, frame_id: &FrameId)
118 -> Result<bool, MobError>;
119}
120
121pub struct FlowFrameKernel {
125 run_store: Arc<dyn MobRunStore>,
126}
127
128impl FlowFrameKernel {
129 pub fn new(run_store: Arc<dyn MobRunStore>) -> Self {
130 Self { run_store }
131 }
132
133 fn node_val(node_id: &FlowNodeId) -> KernelValue {
134 KernelValue::String(node_id.to_string())
135 }
136
137 async fn require_frame(
139 &self,
140 run_id: &RunId,
141 frame_id: &FrameId,
142 ) -> Result<FrameSnapshot, MobError> {
143 let run = self
144 .run_store
145 .get_run(run_id)
146 .await?
147 .ok_or_else(|| MobError::RunNotFound(run_id.clone()))?;
148 run.frames.get(frame_id).cloned().ok_or_else(|| {
149 MobError::Internal(format!("frame '{frame_id}' not found in run '{run_id}'"))
150 })
151 }
152
153 async fn transition_frame(
160 &self,
161 run_id: &RunId,
162 frame_id: &FrameId,
163 input: KernelInput,
164 max_retries: usize,
165 ) -> Result<Vec<KernelEffect>, MobError> {
166 for _ in 0..=max_retries {
167 let current = self.require_frame(run_id, frame_id).await?;
168 let outcome = flow_frame::transition(¤t.kernel_state, &input)
169 .map_err(|e| MobError::Internal(format!("flow_frame transition failed: {e:?}")))?;
170 let next = FrameSnapshot {
171 kernel_state: outcome.next_state,
172 };
173 let effects = outcome.effects.clone();
174 let won = self
175 .run_store
176 .cas_frame_state(run_id, frame_id, Some(¤t), next)
177 .await?;
178 if won {
179 return Ok(effects);
180 }
181 }
182 Err(MobError::Internal(format!(
183 "transition_frame: CAS exhausted {max_retries} retries for frame '{frame_id}'"
184 )))
185 }
186}
187
188impl sealed::Sealed for FlowFrameKernel {}
189
190#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
191#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
192impl FlowFrameMutator for FlowFrameKernel {
193 async fn start_frame(
194 &self,
195 run_id: &RunId,
196 frame_id: &FrameId,
197 spec: &FrameSpec,
198 ) -> Result<FrameSnapshot, MobError> {
199 let run = self
202 .run_store
203 .get_run(run_id)
204 .await?
205 .ok_or_else(|| MobError::RunNotFound(run_id.clone()))?;
206 if let Some(existing) = run.frames.get(frame_id) {
207 return Ok(existing.clone());
208 }
209
210 let initial = flow_frame::initial_state()
211 .map_err(|e| MobError::Internal(format!("flow_frame initial_state failed: {e:?}")))?;
212 let ordered = topological_order(spec)?;
213 let start_input = build_start_root_frame_input(frame_id, spec, &ordered);
214 let outcome = flow_frame::transition(&initial, &start_input)
215 .map_err(|e| MobError::Internal(format!("flow_frame StartRootFrame failed: {e:?}")))?;
216 let snapshot = FrameSnapshot {
217 kernel_state: outcome.next_state,
218 };
219 let inserted = self
221 .run_store
222 .cas_frame_state(run_id, frame_id, None, snapshot.clone())
223 .await?;
224 if !inserted {
225 let run2 = self
228 .run_store
229 .get_run(run_id)
230 .await?
231 .ok_or_else(|| MobError::RunNotFound(run_id.clone()))?;
232 return run2.frames.get(frame_id).cloned().ok_or_else(|| {
233 MobError::Internal(format!(
234 "frame '{frame_id}' missing after concurrent insert in run '{run_id}'"
235 ))
236 });
237 }
238 Ok(snapshot)
239 }
240
241 async fn admit_next_ready_node(
242 &self,
243 run_id: &RunId,
244 frame_id: &FrameId,
245 ) -> Result<Option<Vec<KernelEffect>>, MobError> {
246 let input = KernelInput {
247 variant: "AdmitNextReadyNode".into(),
248 fields: BTreeMap::new(),
249 };
250 self.transition_frame(run_id, frame_id, input, 5)
252 .await
253 .map(Some)
254 }
255
256 async fn admit_next_ready_node_with_retry(
257 &self,
258 run_id: &RunId,
259 frame_id: &FrameId,
260 max_retries: usize,
261 ) -> Result<Option<Vec<KernelEffect>>, MobError> {
262 for _ in 0..=max_retries {
263 let snap = self.require_frame(run_id, frame_id).await?;
264 let queue_empty = match snap.kernel_state.fields.get("ready_queue") {
265 Some(KernelValue::Seq(seq)) => seq.is_empty(),
266 _ => true,
267 };
268 if queue_empty {
269 return Ok(None); }
271
272 let admit_input = KernelInput {
273 variant: "AdmitNextReadyNode".into(),
274 fields: BTreeMap::new(),
275 };
276 let outcome = flow_frame::transition(&snap.kernel_state, &admit_input)
277 .map_err(|e| MobError::Internal(format!("AdmitNextReadyNode failed: {e:?}")))?;
278 let next_snap = FrameSnapshot {
279 kernel_state: outcome.next_state,
280 };
281
282 let won = self
288 .run_store
289 .cas_frame_state(run_id, frame_id, Some(&snap), next_snap)
290 .await?;
291 if won {
292 return Ok(Some(outcome.effects));
293 }
294 }
296
297 Err(MobError::Internal(format!(
301 "admit_next_ready_node: CAS exhausted {max_retries} retries for frame '{frame_id}' \
302 — queue was non-empty but every attempt lost the CAS"
303 )))
304 }
305
306 async fn complete_step(
307 &self,
308 run_id: &RunId,
309 frame_id: &FrameId,
310 opts: StepCompletionOpts<'_>,
311 ) -> Result<(), MobError> {
312 let StepCompletionOpts {
313 node_id,
314 step_id,
315 output,
316 loop_context,
317 max_retries,
318 } = opts;
319 for attempt in 0..=max_retries {
320 let snap = self.require_frame(run_id, frame_id).await?;
321 let complete_input = KernelInput {
322 variant: "CompleteNode".into(),
323 fields: BTreeMap::from([("node_id".into(), Self::node_val(node_id))]),
324 };
325 let next_outcome = flow_frame::transition(&snap.kernel_state, &complete_input)
326 .map_err(|e| MobError::Internal(format!("CompleteNode failed: {e:?}")))?;
327 let next_snap = FrameSnapshot {
328 kernel_state: next_outcome.next_state,
329 };
330 let won = self
331 .run_store
332 .cas_complete_step_and_record_output(
333 run_id,
334 frame_id,
335 &snap,
336 next_snap,
337 step_id.to_string(),
338 output.clone(),
339 loop_context,
340 )
341 .await?;
342 if won {
343 return Ok(());
344 }
345 if attempt == max_retries {
346 return Err(MobError::Internal(format!(
347 "CompleteNode CAS failed after {} attempts for node '{node_id}'",
348 max_retries + 1
349 )));
350 }
351 }
352 Err(MobError::Internal("CompleteNode CAS exhausted".into()))
354 }
355
356 async fn complete_node(
357 &self,
358 run_id: &RunId,
359 frame_id: &FrameId,
360 node_id: &FlowNodeId,
361 ) -> Result<bool, MobError> {
362 let input = KernelInput {
363 variant: "CompleteNode".into(),
364 fields: BTreeMap::from([("node_id".into(), Self::node_val(node_id))]),
365 };
366 self.transition_frame(run_id, frame_id, input, 5)
369 .await
370 .map(|_| true)
371 }
372
373 async fn fail_node(
374 &self,
375 run_id: &RunId,
376 frame_id: &FrameId,
377 node_id: &FlowNodeId,
378 ) -> Result<bool, MobError> {
379 let input = KernelInput {
380 variant: "FailNode".into(),
381 fields: BTreeMap::from([("node_id".into(), Self::node_val(node_id))]),
382 };
383 self.transition_frame(run_id, frame_id, input, 5)
384 .await
385 .map(|_| true)
386 }
387
388 async fn skip_node(
389 &self,
390 run_id: &RunId,
391 frame_id: &FrameId,
392 node_id: &FlowNodeId,
393 ) -> Result<bool, MobError> {
394 let input = KernelInput {
395 variant: "SkipNode".into(),
396 fields: BTreeMap::from([("node_id".into(), Self::node_val(node_id))]),
397 };
398 self.transition_frame(run_id, frame_id, input, 5)
399 .await
400 .map(|_| true)
401 }
402
403 async fn terminalize_frame(
404 &self,
405 run_id: &RunId,
406 frame_id: &FrameId,
407 ) -> Result<bool, MobError> {
408 let input = KernelInput {
409 variant: "SealFrame".into(),
410 fields: BTreeMap::new(),
411 };
412 self.transition_frame(run_id, frame_id, input, 5)
413 .await
414 .map(|_| true)
415 }
416
417 async fn cancel_node(
418 &self,
419 run_id: &RunId,
420 frame_id: &FrameId,
421 node_id: &FlowNodeId,
422 ) -> Result<bool, MobError> {
423 let input = KernelInput {
424 variant: "CancelNode".into(),
425 fields: BTreeMap::from([("node_id".into(), Self::node_val(node_id))]),
426 };
427 self.transition_frame(run_id, frame_id, input, 5)
428 .await
429 .map(|_| true)
430 }
431}
432
433fn build_frame_start_fields(
436 frame_id: &FrameId,
437 spec: &FrameSpec,
438 ordered: &[FlowNodeId],
439) -> BTreeMap<String, KernelValue> {
440 let ordered_kv: Vec<KernelValue> = ordered
441 .iter()
442 .map(|n| KernelValue::String(n.to_string()))
443 .collect();
444
445 let tracked: BTreeSet<KernelValue> = ordered
446 .iter()
447 .map(|n| KernelValue::String(n.to_string()))
448 .collect();
449
450 let mut node_kind: BTreeMap<KernelValue, KernelValue> = BTreeMap::new();
451 let mut node_deps: BTreeMap<KernelValue, KernelValue> = BTreeMap::new();
452 let mut node_dep_modes: BTreeMap<KernelValue, KernelValue> = BTreeMap::new();
453 let mut node_branches: BTreeMap<KernelValue, KernelValue> = BTreeMap::new();
454
455 for (node_id, node_spec) in &spec.nodes {
456 let k = KernelValue::String(node_id.to_string());
457 match node_spec {
458 FlowNodeSpec::Step(s) => {
459 node_kind.insert(
460 k.clone(),
461 KernelValue::NamedVariant {
462 enum_name: "FlowNodeKind".into(),
463 variant: "Step".into(),
464 },
465 );
466 node_deps.insert(
467 k.clone(),
468 KernelValue::Seq(
469 s.depends_on
470 .iter()
471 .map(|d| KernelValue::String(d.to_string()))
472 .collect(),
473 ),
474 );
475 node_dep_modes.insert(k.clone(), dep_mode_kv(&s.depends_on_mode));
476 node_branches.insert(
477 k.clone(),
478 s.branch
479 .as_ref()
480 .map_or(KernelValue::None, |b| KernelValue::String(b.to_string())),
481 );
482 }
483 FlowNodeSpec::RepeatUntil(l) => {
484 node_kind.insert(
485 k.clone(),
486 KernelValue::NamedVariant {
487 enum_name: "FlowNodeKind".into(),
488 variant: "Loop".into(),
489 },
490 );
491 node_deps.insert(
492 k.clone(),
493 KernelValue::Seq(
494 l.depends_on
495 .iter()
496 .map(|d| KernelValue::String(d.to_string()))
497 .collect(),
498 ),
499 );
500 node_dep_modes.insert(k.clone(), dep_mode_kv(&l.depends_on_mode));
501 node_branches.insert(k.clone(), KernelValue::None);
502 }
503 }
504 }
505
506 BTreeMap::from([
507 ("frame_id".into(), KernelValue::String(frame_id.to_string())),
508 ("tracked_nodes".into(), KernelValue::Set(tracked)),
509 ("ordered_nodes".into(), KernelValue::Seq(ordered_kv)),
510 ("node_kind".into(), KernelValue::Map(node_kind)),
511 ("node_dependencies".into(), KernelValue::Map(node_deps)),
512 (
513 "node_dependency_modes".into(),
514 KernelValue::Map(node_dep_modes),
515 ),
516 ("node_branches".into(), KernelValue::Map(node_branches)),
517 ])
518}
519
520pub(crate) fn build_start_root_frame_input(
522 frame_id: &FrameId,
523 spec: &FrameSpec,
524 ordered: &[FlowNodeId],
525) -> KernelInput {
526 KernelInput {
527 variant: "StartRootFrame".into(),
528 fields: build_frame_start_fields(frame_id, spec, ordered),
529 }
530}
531
532pub(crate) fn build_start_body_frame_input(
534 frame_id: &FrameId,
535 loop_instance_id: &LoopInstanceId,
536 iteration: u64,
537 spec: &FrameSpec,
538 ordered: &[FlowNodeId],
539) -> KernelInput {
540 let mut fields = build_frame_start_fields(frame_id, spec, ordered);
541 fields.insert(
542 "loop_instance_id".into(),
543 KernelValue::String(loop_instance_id.to_string()),
544 );
545 fields.insert("iteration".into(), KernelValue::U64(iteration));
546 KernelInput {
547 variant: "StartBodyFrame".into(),
548 fields,
549 }
550}
551
552fn dep_mode_kv(mode: &DependencyMode) -> KernelValue {
553 let variant = match mode {
554 DependencyMode::All => "All",
555 DependencyMode::Any => "Any",
556 };
557 KernelValue::NamedVariant {
558 enum_name: "DependencyMode".into(),
559 variant: variant.into(),
560 }
561}
562
563pub(crate) fn topological_order(spec: &FrameSpec) -> Result<Vec<FlowNodeId>, MobError> {
565 let mut in_degree: BTreeMap<FlowNodeId, usize> = BTreeMap::new();
566 let mut outgoing: BTreeMap<FlowNodeId, Vec<FlowNodeId>> = BTreeMap::new();
567
568 for node_id in spec.nodes.keys() {
569 in_degree.insert(node_id.clone(), 0);
570 outgoing.entry(node_id.clone()).or_default();
571 }
572
573 for (node_id, node_spec) in &spec.nodes {
574 let deps = match node_spec {
575 FlowNodeSpec::Step(s) => s.depends_on.clone(),
576 FlowNodeSpec::RepeatUntil(l) => l.depends_on.clone(),
577 };
578 for dep in deps {
579 if !in_degree.contains_key(&dep) {
580 return Err(MobError::Internal(format!(
581 "node '{node_id}' depends on unknown node '{dep}'"
582 )));
583 }
584 *in_degree.entry(node_id.clone()).or_insert(0) += 1;
585 outgoing
586 .entry(dep.clone())
587 .or_default()
588 .push(node_id.clone());
589 }
590 }
591
592 let mut queue = VecDeque::new();
593 for node_id in spec.nodes.keys() {
594 if in_degree.get(node_id) == Some(&0) {
595 queue.push_back(node_id.clone());
596 }
597 }
598
599 let mut ordered = Vec::with_capacity(spec.nodes.len());
600 while let Some(next) = queue.pop_front() {
601 ordered.push(next.clone());
602 if let Some(children) = outgoing.get(&next) {
603 for child in children {
604 if let Some(count) = in_degree.get_mut(child)
605 && *count > 0
606 {
607 *count -= 1;
608 if *count == 0 {
609 queue.push_back(child.clone());
610 }
611 }
612 }
613 }
614 }
615
616 if ordered.len() != spec.nodes.len() {
617 return Err(MobError::Internal(
618 "frame contains a cycle; cannot compute topological order".to_string(),
619 ));
620 }
621
622 Ok(ordered)
623}