1use crate::definition::{FlowSpec, LimitsSpec, SupervisorSpec, TopologySpec};
4use crate::error::MobError;
5use crate::ids::{
6 FlowId, FrameId, LoopId, LoopInstanceId, MeerkatId, MobId, ProfileName, RunId, StepId,
7};
8use chrono::{DateTime, Utc};
9use indexmap::IndexMap;
10use meerkat_machine_kernels::generated::flow_run;
11use meerkat_machine_kernels::{KernelInput, KernelState, KernelValue};
12use serde::{Deserialize, Serialize};
13use std::collections::{BTreeMap, VecDeque};
14
15#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
17pub struct FrameSnapshot {
18 pub kernel_state: KernelState,
19}
20
21#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
23pub struct LoopSnapshot {
24 pub kernel_state: KernelState,
25}
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct LoopIterationLedgerEntry {
30 pub loop_instance_id: LoopInstanceId,
31 pub iteration: u64,
32 pub frame_id: FrameId,
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct MobRun {
38 pub run_id: RunId,
39 pub mob_id: MobId,
40 pub flow_id: FlowId,
41 pub status: MobRunStatus,
42 pub flow_state: KernelState,
43 pub activation_params: serde_json::Value,
44 pub created_at: DateTime<Utc>,
45 pub completed_at: Option<DateTime<Utc>>,
46 pub step_ledger: Vec<StepLedgerEntry>,
47 pub failure_ledger: Vec<FailureLedgerEntry>,
48 #[serde(default)]
50 pub frames: BTreeMap<FrameId, FrameSnapshot>,
51 #[serde(default)]
53 pub loops: BTreeMap<LoopInstanceId, LoopSnapshot>,
54 #[serde(default)]
56 pub loop_iteration_ledger: Vec<LoopIterationLedgerEntry>,
57 #[serde(default)]
59 pub schema_version: u32,
60 #[serde(default)]
62 pub root_step_outputs: IndexMap<StepId, serde_json::Value>,
63 #[serde(default)]
68 pub loop_iteration_outputs: BTreeMap<LoopId, Vec<IndexMap<StepId, serde_json::Value>>>,
69}
70
71impl MobRun {
72 pub fn status(&self) -> &MobRunStatus {
74 &self.status
75 }
76
77 pub fn flow_state(&self) -> &KernelState {
79 &self.flow_state
80 }
81}
82
83impl MobRun {
84 pub fn pending(
85 mob_id: MobId,
86 flow_id: FlowId,
87 flow_state: KernelState,
88 activation_params: serde_json::Value,
89 ) -> Self {
90 Self {
91 run_id: RunId::new(),
92 mob_id,
93 flow_id,
94 status: MobRunStatus::Pending,
95 flow_state,
96 activation_params,
97 created_at: Utc::now(),
98 completed_at: None,
99 step_ledger: Vec::new(),
100 failure_ledger: Vec::new(),
101 frames: BTreeMap::new(),
102 loops: BTreeMap::new(),
103 loop_iteration_ledger: Vec::new(),
104 schema_version: 4,
105 root_step_outputs: IndexMap::new(),
106 loop_iteration_outputs: BTreeMap::new(),
107 }
108 }
109
110 pub fn flow_state_for_config(config: &FlowRunConfig) -> Result<KernelState, MobError> {
111 let initial = flow_run::initial_state().map_err(|error| {
112 MobError::Internal(format!("flow_run initial_state failed: {error}"))
113 })?;
114 let ordered_steps = topological_steps(&config.flow_spec)?;
115 let step_ids = config
116 .flow_spec
117 .steps
118 .keys()
119 .map(|step_id| KernelValue::String(step_id.to_string()))
120 .collect();
121 let ordered_steps = ordered_steps
122 .into_iter()
123 .map(|step_id| KernelValue::String(step_id.to_string()))
124 .collect();
125 let step_dependencies = config
126 .flow_spec
127 .steps
128 .iter()
129 .map(|(step_id, step)| {
130 (
131 KernelValue::String(step_id.to_string()),
132 KernelValue::Seq(
133 step.depends_on
134 .iter()
135 .map(|dependency| KernelValue::String(dependency.to_string()))
136 .collect(),
137 ),
138 )
139 })
140 .collect();
141 let step_dependency_modes = config
142 .flow_spec
143 .steps
144 .iter()
145 .map(|(step_id, step)| {
146 (
147 KernelValue::String(step_id.to_string()),
148 dependency_mode_value(step.depends_on_mode.clone()),
149 )
150 })
151 .collect();
152 let step_has_conditions = config
153 .flow_spec
154 .steps
155 .iter()
156 .map(|(step_id, step)| {
157 (
158 KernelValue::String(step_id.to_string()),
159 KernelValue::Bool(step.condition.is_some()),
160 )
161 })
162 .collect();
163 let step_branches = config
164 .flow_spec
165 .steps
166 .iter()
167 .map(|(step_id, step)| {
168 (
169 KernelValue::String(step_id.to_string()),
170 step.branch.as_ref().map_or(KernelValue::None, |branch| {
171 KernelValue::String(branch.to_string())
172 }),
173 )
174 })
175 .collect();
176 let step_collection_policies = config
177 .flow_spec
178 .steps
179 .iter()
180 .map(|(step_id, step)| {
181 (
182 KernelValue::String(step_id.to_string()),
183 collection_policy_kind_value(&step.collection_policy),
184 )
185 })
186 .collect();
187 let step_quorum_thresholds = config
188 .flow_spec
189 .steps
190 .iter()
191 .map(|(step_id, step)| {
192 let threshold = match step.collection_policy {
193 crate::definition::CollectionPolicy::Quorum { n } => u64::from(n),
194 _ => 0,
195 };
196 (
197 KernelValue::String(step_id.to_string()),
198 KernelValue::U64(threshold),
199 )
200 })
201 .collect();
202 let escalation_threshold = config
203 .supervisor
204 .as_ref()
205 .map_or(0, |supervisor| u64::from(supervisor.escalation_threshold));
206 let max_step_retries = config
207 .limits
208 .as_ref()
209 .and_then(|limits| limits.max_step_retries)
210 .map_or(0, u64::from);
211 let input = KernelInput {
212 variant: "CreateRun".to_string(),
213 fields: BTreeMap::from([
214 ("step_ids".to_string(), KernelValue::Seq(step_ids)),
215 ("ordered_steps".to_string(), KernelValue::Seq(ordered_steps)),
216 (
217 "step_dependencies".to_string(),
218 KernelValue::Map(step_dependencies),
219 ),
220 (
221 "step_dependency_modes".to_string(),
222 KernelValue::Map(step_dependency_modes),
223 ),
224 (
225 "step_has_conditions".to_string(),
226 KernelValue::Map(step_has_conditions),
227 ),
228 ("step_branches".to_string(), KernelValue::Map(step_branches)),
229 (
230 "step_collection_policies".to_string(),
231 KernelValue::Map(step_collection_policies),
232 ),
233 (
234 "step_quorum_thresholds".to_string(),
235 KernelValue::Map(step_quorum_thresholds),
236 ),
237 (
238 "escalation_threshold".to_string(),
239 KernelValue::U64(escalation_threshold),
240 ),
241 (
242 "max_step_retries".to_string(),
243 KernelValue::U64(max_step_retries),
244 ),
245 (
247 "max_active_nodes".to_string(),
248 KernelValue::U64(
249 config
250 .limits
251 .as_ref()
252 .and_then(|l| l.max_active_nodes)
253 .unwrap_or(0),
254 ),
255 ),
256 (
257 "max_active_frames".to_string(),
258 KernelValue::U64(
259 config
260 .limits
261 .as_ref()
262 .and_then(|l| l.max_active_frames)
263 .unwrap_or(0),
264 ),
265 ),
266 (
267 "max_frame_depth".to_string(),
268 KernelValue::U64(
269 config
270 .limits
271 .as_ref()
272 .and_then(|l| l.max_frame_depth)
273 .unwrap_or(0),
274 ),
275 ),
276 ]),
277 };
278 let outcome = flow_run::transition(&initial, &input)
279 .map_err(|error| MobError::Internal(format!("flow_run CreateRun failed: {error}")))?;
280 Ok(outcome.next_state)
281 }
282
283 pub fn flow_state_for_steps<I>(step_ids: I) -> Result<KernelState, MobError>
284 where
285 I: IntoIterator<Item = StepId>,
286 {
287 let mut steps = IndexMap::new();
288 for step_id in step_ids {
289 steps.insert(
290 step_id,
291 crate::definition::FlowStepSpec {
292 role: ProfileName::from("worker"),
293 message: meerkat_core::types::ContentInput::from("placeholder"),
294 depends_on: Vec::new(),
295 dispatch_mode: crate::definition::DispatchMode::FanOut,
296 collection_policy: crate::definition::CollectionPolicy::All,
297 condition: None,
298 timeout_ms: None,
299 expected_schema_ref: None,
300 branch: None,
301 depends_on_mode: crate::definition::DependencyMode::All,
302 allowed_tools: None,
303 blocked_tools: None,
304 output_format: crate::definition::StepOutputFormat::Json,
305 },
306 );
307 }
308 Self::flow_state_for_config(&FlowRunConfig {
309 flow_id: FlowId::from("placeholder"),
310 flow_spec: FlowSpec {
311 description: None,
312 steps,
313 root: None,
314 },
315 topology: None,
316 supervisor: None,
317 limits: None,
318 orchestrator_role: None,
319 })
320 }
321}
322
323fn dependency_mode_value(mode: crate::definition::DependencyMode) -> KernelValue {
324 let variant = match mode {
325 crate::definition::DependencyMode::All => "All",
326 crate::definition::DependencyMode::Any => "Any",
327 };
328 KernelValue::NamedVariant {
329 enum_name: "DependencyMode".to_string(),
330 variant: variant.to_string(),
331 }
332}
333
334fn collection_policy_kind_value(policy: &crate::definition::CollectionPolicy) -> KernelValue {
335 let variant = match policy {
336 crate::definition::CollectionPolicy::All => "All",
337 crate::definition::CollectionPolicy::Any => "Any",
338 crate::definition::CollectionPolicy::Quorum { .. } => "Quorum",
339 };
340 KernelValue::NamedVariant {
341 enum_name: "CollectionPolicyKind".to_string(),
342 variant: variant.to_string(),
343 }
344}
345
346fn topological_steps(flow_spec: &FlowSpec) -> Result<Vec<StepId>, MobError> {
347 let mut in_degree: BTreeMap<StepId, usize> = BTreeMap::new();
348 let mut outgoing: BTreeMap<StepId, Vec<StepId>> = BTreeMap::new();
349
350 for step_id in flow_spec.steps.keys() {
351 in_degree.insert(step_id.clone(), 0);
352 outgoing.entry(step_id.clone()).or_default();
353 }
354
355 for (step_id, step) in &flow_spec.steps {
356 for dependency in &step.depends_on {
357 if !in_degree.contains_key(dependency) {
358 return Err(MobError::Internal(format!(
359 "step '{step_id}' depends on unknown step '{dependency}'"
360 )));
361 }
362 *in_degree.entry(step_id.clone()).or_insert(0) += 1;
363 outgoing
364 .entry(dependency.clone())
365 .or_default()
366 .push(step_id.clone());
367 }
368 }
369
370 let mut queue = VecDeque::new();
371 for step_id in flow_spec.steps.keys() {
372 if in_degree.get(step_id) == Some(&0) {
373 queue.push_back(step_id.clone());
374 }
375 }
376
377 let mut ordered = Vec::with_capacity(flow_spec.steps.len());
378 while let Some(next) = queue.pop_front() {
379 ordered.push(next.clone());
380 if let Some(children) = outgoing.get(&next) {
381 for child in children {
382 if let Some(count) = in_degree.get_mut(child)
383 && *count > 0
384 {
385 *count -= 1;
386 if *count == 0 {
387 queue.push_back(child.clone());
388 }
389 }
390 }
391 }
392 }
393
394 if ordered.len() != flow_spec.steps.len() {
395 return Err(MobError::Internal(
396 "flow contains a cycle; cannot compute topological order".to_string(),
397 ));
398 }
399
400 Ok(ordered)
401}
402
403#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
405#[serde(rename_all = "snake_case")]
406pub enum MobRunStatus {
407 Pending,
408 Running,
409 Completed,
410 Failed,
411 Canceled,
412}
413
414impl MobRunStatus {
415 pub fn is_terminal(&self) -> bool {
416 matches!(self, Self::Completed | Self::Failed | Self::Canceled)
417 }
418}
419
420#[derive(Debug, Clone, Serialize, Deserialize)]
422pub struct StepLedgerEntry {
423 pub step_id: StepId,
424 pub meerkat_id: MeerkatId,
425 pub status: StepRunStatus,
426 pub output: Option<serde_json::Value>,
427 pub timestamp: DateTime<Utc>,
428}
429
430#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
432#[serde(rename_all = "snake_case")]
433pub enum StepRunStatus {
434 Dispatched,
435 Completed,
436 Failed,
437 Skipped,
438 Canceled,
439}
440
441#[derive(Debug, Clone, Serialize, Deserialize)]
443pub struct FailureLedgerEntry {
444 pub step_id: StepId,
445 pub reason: String,
446 pub timestamp: DateTime<Utc>,
447}
448
449#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
451pub struct FlowRunConfig {
452 pub flow_id: FlowId,
453 pub flow_spec: FlowSpec,
454 pub topology: Option<TopologySpec>,
455 pub supervisor: Option<SupervisorSpec>,
456 pub limits: Option<LimitsSpec>,
457 pub orchestrator_role: Option<ProfileName>,
458}
459
460impl FlowRunConfig {
461 pub fn from_definition(
462 flow_id: FlowId,
463 definition: &crate::definition::MobDefinition,
464 ) -> Result<Self, MobError> {
465 let flow_spec = definition
466 .flows
467 .get(&flow_id)
468 .cloned()
469 .ok_or_else(|| MobError::FlowNotFound(flow_id.clone()))?;
470 let topology = definition.topology.clone();
471 let orchestrator_role = definition
472 .orchestrator
473 .as_ref()
474 .map(|orchestrator| orchestrator.profile.clone());
475 if topology.is_some() && orchestrator_role.is_none() {
476 return Err(MobError::Internal(
477 "topology requires an orchestrator profile".to_string(),
478 ));
479 }
480 Ok(Self {
481 flow_id,
482 flow_spec,
483 topology,
484 supervisor: definition.supervisor.clone(),
485 limits: definition.limits.clone(),
486 orchestrator_role,
487 })
488 }
489}
490
491#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
493pub struct LoopContextHistory {
494 pub iterations: Vec<IndexMap<StepId, serde_json::Value>>,
496}
497
498#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
500pub struct FlowContext {
501 pub run_id: RunId,
502 pub activation_params: serde_json::Value,
503 pub step_outputs: IndexMap<StepId, serde_json::Value>,
505 #[serde(default)]
507 pub loop_outputs: IndexMap<LoopId, LoopContextHistory>,
508}
509
510impl FlowContext {
511 pub fn from_run_aggregate(
513 run: &MobRun,
514 run_id: RunId,
515 activation_params: serde_json::Value,
516 ) -> Self {
517 let loop_outputs = run
518 .loop_iteration_outputs
519 .iter()
520 .map(|(loop_id, iterations)| {
521 let history = LoopContextHistory {
522 iterations: iterations.clone(),
523 };
524 (loop_id.clone(), history)
525 })
526 .collect();
527
528 let mut step_outputs = run.root_step_outputs.clone();
534 for iterations in run.loop_iteration_outputs.values() {
535 if let Some(last_iter) = iterations.last() {
536 for (sid, out) in last_iter {
537 step_outputs.insert(sid.clone(), out.clone());
538 }
539 }
540 }
541
542 FlowContext {
543 run_id,
544 activation_params,
545 step_outputs,
546 loop_outputs,
547 }
548 }
549}
550
551#[cfg(test)]
552mod tests {
553 use super::*;
554 use crate::definition::{
555 BackendConfig, ConditionExpr, DispatchMode, FlowStepSpec, MobDefinition,
556 OrchestratorConfig, WiringRules,
557 };
558 use crate::ids::{BranchId, ProfileName};
559 use crate::profile::{Profile, ProfileBinding, ToolConfig};
560 use meerkat_core::types::ContentInput;
561 use std::collections::BTreeMap;
562
563 fn sample_definition() -> MobDefinition {
564 let mut steps = IndexMap::new();
565 steps.insert(
566 StepId::from("s1"),
567 FlowStepSpec {
568 role: ProfileName::from("worker"),
569 message: ContentInput::from("do it"),
570 depends_on: Vec::new(),
571 dispatch_mode: DispatchMode::FanOut,
572 collection_policy: crate::definition::CollectionPolicy::All,
573 condition: Some(ConditionExpr::Eq {
574 path: "params.ok".to_string(),
575 value: serde_json::json!(true),
576 }),
577 timeout_ms: Some(2000),
578 expected_schema_ref: Some("schema.json".to_string()),
579 branch: Some(BranchId::from("branch-a")),
580 depends_on_mode: crate::definition::DependencyMode::All,
581 allowed_tools: None,
582 blocked_tools: None,
583 output_format: crate::definition::StepOutputFormat::Json,
584 },
585 );
586
587 let mut flows = BTreeMap::new();
588 flows.insert(
589 FlowId::from("flow-a"),
590 FlowSpec {
591 description: Some("demo flow".to_string()),
592 steps,
593 root: None,
594 },
595 );
596
597 let mut profiles = BTreeMap::new();
598 profiles.insert(
599 ProfileName::from("lead"),
600 ProfileBinding::Inline(Profile {
601 model: "model".to_string(),
602 skills: Vec::new(),
603 tools: ToolConfig::default(),
604 peer_description: "lead".to_string(),
605 external_addressable: true,
606 backend: None,
607 runtime_mode: crate::MobRuntimeMode::AutonomousHost,
608 max_inline_peer_notifications: None,
609 output_schema: None,
610 provider_params: None,
611 }),
612 );
613 profiles.insert(
614 ProfileName::from("worker"),
615 ProfileBinding::Inline(Profile {
616 model: "model".to_string(),
617 skills: Vec::new(),
618 tools: ToolConfig::default(),
619 peer_description: "worker".to_string(),
620 external_addressable: false,
621 backend: None,
622 runtime_mode: crate::MobRuntimeMode::AutonomousHost,
623 max_inline_peer_notifications: None,
624 output_schema: None,
625 provider_params: None,
626 }),
627 );
628
629 MobDefinition {
630 id: MobId::from("mob"),
631 orchestrator: Some(OrchestratorConfig {
632 profile: ProfileName::from("lead"),
633 }),
634 profiles,
635 mcp_servers: BTreeMap::new(),
636 wiring: WiringRules::default(),
637 skills: BTreeMap::new(),
638 backend: BackendConfig::default(),
639 flows,
640 topology: Some(TopologySpec {
641 mode: crate::definition::PolicyMode::Advisory,
642 rules: vec![crate::definition::TopologyRule {
643 from_role: ProfileName::from("lead"),
644 to_role: ProfileName::from("worker"),
645 allowed: true,
646 }],
647 }),
648 supervisor: Some(SupervisorSpec {
649 role: ProfileName::from("lead"),
650 escalation_threshold: 3,
651 }),
652 limits: Some(LimitsSpec {
653 max_flow_duration_ms: Some(60_000),
654 max_step_retries: Some(1),
655 max_orphaned_turns: Some(8),
656 cancel_grace_timeout_ms: None,
657 ..Default::default()
658 }),
659 spawn_policy: None,
660 event_router: None,
661 owner_session_id: None,
662 session_cleanup_policy: crate::definition::SessionCleanupPolicy::Manual,
663 is_implicit: false,
664 }
665 }
666
667 #[test]
668 fn test_run_status_terminal() {
669 assert!(MobRunStatus::Completed.is_terminal());
670 assert!(MobRunStatus::Failed.is_terminal());
671 assert!(MobRunStatus::Canceled.is_terminal());
672 assert!(!MobRunStatus::Pending.is_terminal());
673 assert!(!MobRunStatus::Running.is_terminal());
674 }
675
676 #[test]
677 fn test_flow_run_config_from_definition() {
678 let def = sample_definition();
679 let config = FlowRunConfig::from_definition(FlowId::from("flow-a"), &def).unwrap();
680 assert_eq!(config.flow_id, FlowId::from("flow-a"));
681 assert_eq!(config.flow_spec.steps.len(), 1);
682 assert_eq!(
683 config.orchestrator_role.as_ref(),
684 Some(&ProfileName::from("lead"))
685 );
686 }
687
688 #[test]
689 fn test_flow_run_config_from_definition_missing_flow() {
690 let def = sample_definition();
691 let error = FlowRunConfig::from_definition(FlowId::from("missing"), &def).unwrap_err();
692 assert!(matches!(error, MobError::FlowNotFound(name) if name == FlowId::from("missing")));
693 }
694
695 #[test]
696 fn test_flow_run_config_rejects_topology_without_orchestrator() {
697 let mut def = sample_definition();
698 def.orchestrator = None;
699 let error = FlowRunConfig::from_definition(FlowId::from("flow-a"), &def).unwrap_err();
700 assert!(
701 matches!(error, MobError::Internal(message) if message.contains("topology requires")),
702 "expected explicit topology/orchestrator configuration error"
703 );
704 }
705
706 #[test]
707 fn test_mob_run_roundtrip_json() {
708 let now = Utc::now();
709 let run = MobRun {
710 run_id: RunId::new(),
711 mob_id: MobId::from("mob"),
712 flow_id: FlowId::from("flow-a"),
713 status: MobRunStatus::Running,
714 flow_state: MobRun::flow_state_for_steps([StepId::from("step-1")]).unwrap(),
715 activation_params: serde_json::json!({"k":"v"}),
716 created_at: now,
717 completed_at: None,
718 step_ledger: vec![StepLedgerEntry {
719 step_id: StepId::from("step-1"),
720 meerkat_id: MeerkatId::from("agent-1"),
721 status: StepRunStatus::Completed,
722 output: Some(serde_json::json!({"ok":true})),
723 timestamp: now,
724 }],
725 failure_ledger: vec![FailureLedgerEntry {
726 step_id: StepId::from("step-2"),
727 reason: "boom".to_string(),
728 timestamp: now,
729 }],
730 frames: BTreeMap::new(),
731 loops: BTreeMap::new(),
732 loop_iteration_ledger: Vec::new(),
733 schema_version: 4,
734 root_step_outputs: IndexMap::new(),
735 loop_iteration_outputs: BTreeMap::new(),
736 };
737
738 let encoded = serde_json::to_string(&run).unwrap();
739 let decoded: MobRun = serde_json::from_str(&encoded).unwrap();
740 assert_eq!(decoded.flow_id, run.flow_id);
741 assert_eq!(decoded.step_ledger.len(), 1);
742 assert_eq!(decoded.failure_ledger.len(), 1);
743 }
744
745 #[test]
746 fn test_flow_context_roundtrip_json() {
747 let mut outputs = IndexMap::new();
748 outputs.insert(StepId::from("step-1"), serde_json::json!({"a":1}));
749 let context = FlowContext {
750 run_id: RunId::new(),
751 activation_params: serde_json::json!({"input":"x"}),
752 step_outputs: outputs,
753 loop_outputs: IndexMap::new(),
754 };
755
756 let encoded = serde_json::to_string(&context).unwrap();
757 let decoded: FlowContext = serde_json::from_str(&encoded).unwrap();
758 assert_eq!(decoded.step_outputs.len(), 1);
759 assert_eq!(decoded.activation_params["input"], "x");
760 }
761}