use super::flow_system_step_id;
use crate::error::MobError;
use crate::event::{MobEventKind, NewMobEvent};
use crate::ids::{FlowId, MobId, RunId};
use crate::run::{FailureLedgerEntry, MobRunStatus};
use crate::store::{MobEventStore, MobRunStore};
use chrono::Utc;
use std::sync::Arc;
#[derive(Debug, Clone)]
pub(super) enum TerminalizationTarget {
Completed,
Failed { reason: String },
Canceled,
}
impl TerminalizationTarget {
pub(super) fn status(&self) -> MobRunStatus {
match self {
Self::Completed => MobRunStatus::Completed,
Self::Failed { .. } => MobRunStatus::Failed,
Self::Canceled => MobRunStatus::Canceled,
}
}
fn event_name(&self) -> &'static str {
match self {
Self::Completed => "FlowCompleted",
Self::Failed { .. } => "FlowFailed",
Self::Canceled => "FlowCanceled",
}
}
fn into_event_kind(self, run_id: RunId, flow_id: FlowId) -> MobEventKind {
match self {
Self::Completed => MobEventKind::FlowCompleted { run_id, flow_id },
Self::Failed { reason } => MobEventKind::FlowFailed {
run_id,
flow_id,
reason,
},
Self::Canceled => MobEventKind::FlowCanceled { run_id, flow_id },
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TerminalizationOutcome {
Transitioned,
Noop,
}
#[derive(Clone)]
pub(super) struct FlowTerminalizationAuthority {
run_store: Arc<dyn MobRunStore>,
events: Arc<dyn MobEventStore>,
mob_id: MobId,
}
impl FlowTerminalizationAuthority {
pub(super) fn new(
run_store: Arc<dyn MobRunStore>,
events: Arc<dyn MobEventStore>,
mob_id: MobId,
) -> Self {
Self {
run_store,
events,
mob_id,
}
}
pub(super) async fn record_persisted_terminalization(
&self,
run_id: RunId,
flow_id: FlowId,
target: TerminalizationTarget,
) -> Result<TerminalizationOutcome, MobError> {
let event_name = target.event_name();
let event_kind = target.into_event_kind(run_id.clone(), flow_id);
if let Err(append_error) = self
.events
.append(NewMobEvent {
mob_id: self.mob_id.clone(),
timestamp: None,
kind: event_kind,
})
.await
{
return self
.record_terminal_event_append_failure(
&run_id,
event_name,
MobError::from(append_error),
)
.await;
}
Ok(TerminalizationOutcome::Transitioned)
}
async fn record_terminal_event_append_failure(
&self,
run_id: &RunId,
event_name: &'static str,
append_error: MobError,
) -> Result<TerminalizationOutcome, MobError> {
let reason =
format!("terminal run status persisted but {event_name} append failed: {append_error}");
if let Err(failure_ledger_error) = self
.run_store
.append_failure_entry(
run_id,
FailureLedgerEntry {
step_id: flow_system_step_id(),
reason: reason.clone(),
timestamp: Utc::now(),
},
)
.await
{
tracing::error!(
run_id = %run_id,
event_name,
append_error = %append_error,
ledger_error = %failure_ledger_error,
"terminal event append divergence could not be recorded in failure ledger"
);
return Err(MobError::Internal(format!(
"terminal run status persisted but {event_name} append failed and failure ledger append also failed: {failure_ledger_error}"
)));
}
tracing::error!(
run_id = %run_id,
event_name,
append_error = %append_error,
"terminal event append divergence recorded in failure ledger"
);
Err(MobError::Internal(reason))
}
}
#[cfg(test)]
mod tests {
use super::{FlowTerminalizationAuthority, TerminalizationOutcome, TerminalizationTarget};
use crate::event::{MobEvent, MobEventKind, NewMobEvent};
use crate::ids::{FlowId, MobId, RunId, StepId};
use crate::run::{FailureLedgerEntry, MobRun, MobRunStatus, StepLedgerEntry};
use crate::store::{InMemoryMobRunStore, MobEventStore, MobRunStore, MobStoreError};
use async_trait::async_trait;
use chrono::Utc;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::RwLock;
struct RecordingRunStore {
inner: InMemoryMobRunStore,
}
impl RecordingRunStore {
fn new() -> Self {
Self {
inner: InMemoryMobRunStore::new(),
}
}
}
#[async_trait]
impl MobRunStore for RecordingRunStore {
async fn create_run(&self, run: MobRun) -> Result<(), MobStoreError> {
self.inner.create_run(run).await
}
async fn get_run(&self, run_id: &RunId) -> Result<Option<MobRun>, MobStoreError> {
self.inner.get_run(run_id).await
}
async fn list_runs(
&self,
mob_id: &MobId,
flow_id: Option<&FlowId>,
) -> Result<Vec<MobRun>, MobStoreError> {
self.inner.list_runs(mob_id, flow_id).await
}
async fn cas_run_status(
&self,
run_id: &RunId,
expected: MobRunStatus,
next: MobRunStatus,
) -> Result<bool, MobStoreError> {
self.inner.cas_run_status(run_id, expected, next).await
}
async fn cas_flow_state(
&self,
run_id: &RunId,
expected: &meerkat_machine_kernels::KernelState,
next: &meerkat_machine_kernels::KernelState,
) -> Result<bool, MobStoreError> {
self.inner.cas_flow_state(run_id, expected, next).await
}
async fn cas_run_snapshot(
&self,
run_id: &RunId,
expected_status: MobRunStatus,
expected_flow_state: &meerkat_machine_kernels::KernelState,
next_status: MobRunStatus,
next_flow_state: &meerkat_machine_kernels::KernelState,
) -> Result<bool, MobStoreError> {
self.inner
.cas_run_snapshot(
run_id,
expected_status,
expected_flow_state,
next_status,
next_flow_state,
)
.await
}
async fn append_step_entry(
&self,
run_id: &RunId,
entry: StepLedgerEntry,
) -> Result<(), MobStoreError> {
self.inner.append_step_entry(run_id, entry).await
}
async fn append_step_entry_if_absent(
&self,
run_id: &RunId,
entry: StepLedgerEntry,
) -> Result<bool, MobStoreError> {
self.inner.append_step_entry_if_absent(run_id, entry).await
}
async fn put_step_output(
&self,
run_id: &RunId,
step_id: &StepId,
output: serde_json::Value,
) -> Result<(), MobStoreError> {
self.inner.put_step_output(run_id, step_id, output).await
}
async fn append_failure_entry(
&self,
run_id: &RunId,
entry: FailureLedgerEntry,
) -> Result<(), MobStoreError> {
self.inner.append_failure_entry(run_id, entry).await
}
async fn upsert_loop_snapshot(
&self,
run_id: &RunId,
loop_instance_id: &crate::ids::LoopInstanceId,
snapshot: crate::run::LoopSnapshot,
ledger_entry: Option<crate::run::LoopIterationLedgerEntry>,
) -> Result<(), MobStoreError> {
self.inner
.upsert_loop_snapshot(run_id, loop_instance_id, snapshot, ledger_entry)
.await
}
async fn cas_frame_state(
&self,
run_id: &RunId,
frame_id: &crate::ids::FrameId,
expected: Option<&crate::run::FrameSnapshot>,
next: crate::run::FrameSnapshot,
) -> Result<bool, MobStoreError> {
self.inner
.cas_frame_state(run_id, frame_id, expected, next)
.await
}
async fn cas_grant_node_slot(
&self,
run_id: &RunId,
expected_run_state: &meerkat_machine_kernels::KernelState,
next_run_state: meerkat_machine_kernels::KernelState,
frame_id: &crate::ids::FrameId,
expected_frame: &crate::run::FrameSnapshot,
next_frame: crate::run::FrameSnapshot,
) -> Result<bool, MobStoreError> {
self.inner
.cas_grant_node_slot(
run_id,
expected_run_state,
next_run_state,
frame_id,
expected_frame,
next_frame,
)
.await
}
async fn cas_complete_step_and_record_output(
&self,
run_id: &RunId,
frame_id: &crate::ids::FrameId,
expected_frame: &crate::run::FrameSnapshot,
next_frame: crate::run::FrameSnapshot,
step_output_key: String,
step_output: serde_json::Value,
loop_context: Option<(&crate::ids::LoopId, u64)>,
) -> Result<bool, MobStoreError> {
self.inner
.cas_complete_step_and_record_output(
run_id,
frame_id,
expected_frame,
next_frame,
step_output_key,
step_output,
loop_context,
)
.await
}
async fn cas_start_loop(
&self,
run_id: &RunId,
loop_instance_id: &crate::ids::LoopInstanceId,
expected_run_state: &meerkat_machine_kernels::KernelState,
next_run_state: meerkat_machine_kernels::KernelState,
frame_id: &crate::ids::FrameId,
expected_frame: &crate::run::FrameSnapshot,
next_frame: crate::run::FrameSnapshot,
initial_loop: crate::run::LoopSnapshot,
) -> Result<bool, MobStoreError> {
self.inner
.cas_start_loop(
run_id,
loop_instance_id,
expected_run_state,
next_run_state,
frame_id,
expected_frame,
next_frame,
initial_loop,
)
.await
}
async fn cas_loop_request_body_frame(
&self,
run_id: &RunId,
loop_instance_id: &crate::ids::LoopInstanceId,
expected_loop: &crate::run::LoopSnapshot,
next_loop: crate::run::LoopSnapshot,
expected_run_state: &meerkat_machine_kernels::KernelState,
next_run_state: meerkat_machine_kernels::KernelState,
) -> Result<bool, MobStoreError> {
self.inner
.cas_loop_request_body_frame(
run_id,
loop_instance_id,
expected_loop,
next_loop,
expected_run_state,
next_run_state,
)
.await
}
async fn cas_grant_body_frame_start(
&self,
run_id: &RunId,
loop_instance_id: &crate::ids::LoopInstanceId,
expected_loop: &crate::run::LoopSnapshot,
next_loop: crate::run::LoopSnapshot,
frame_id: &crate::ids::FrameId,
initial_frame: crate::run::FrameSnapshot,
ledger_entry: crate::run::LoopIterationLedgerEntry,
expected_run_state: &meerkat_machine_kernels::KernelState,
next_run_state: meerkat_machine_kernels::KernelState,
) -> Result<bool, MobStoreError> {
self.inner
.cas_grant_body_frame_start(
run_id,
loop_instance_id,
expected_loop,
next_loop,
frame_id,
initial_frame,
ledger_entry,
expected_run_state,
next_run_state,
)
.await
}
async fn cas_complete_body_frame(
&self,
run_id: &RunId,
loop_instance_id: &crate::ids::LoopInstanceId,
expected_loop: &crate::run::LoopSnapshot,
next_loop: crate::run::LoopSnapshot,
frame_id: &crate::ids::FrameId,
expected_frame: &crate::run::FrameSnapshot,
next_frame: crate::run::FrameSnapshot,
expected_run_state: &meerkat_machine_kernels::KernelState,
next_run_state: meerkat_machine_kernels::KernelState,
) -> Result<bool, MobStoreError> {
self.inner
.cas_complete_body_frame(
run_id,
loop_instance_id,
expected_loop,
next_loop,
frame_id,
expected_frame,
next_frame,
expected_run_state,
next_run_state,
)
.await
}
async fn cas_complete_loop(
&self,
run_id: &RunId,
loop_instance_id: &crate::ids::LoopInstanceId,
expected_loop: &crate::run::LoopSnapshot,
next_loop: crate::run::LoopSnapshot,
frame_id: &crate::ids::FrameId,
expected_frame: &crate::run::FrameSnapshot,
next_frame: crate::run::FrameSnapshot,
expected_run_state: &meerkat_machine_kernels::KernelState,
next_run_state: meerkat_machine_kernels::KernelState,
) -> Result<bool, MobStoreError> {
self.inner
.cas_complete_loop(
run_id,
loop_instance_id,
expected_loop,
next_loop,
frame_id,
expected_frame,
next_frame,
expected_run_state,
next_run_state,
)
.await
}
}
#[derive(Default)]
struct FaultInjectedEventStore {
events: RwLock<Vec<MobEvent>>,
fail_on_kind: RwLock<HashSet<&'static str>>,
}
impl FaultInjectedEventStore {
async fn fail_appends_for(&self, kind: &'static str) {
self.fail_on_kind.write().await.insert(kind);
}
fn kind_label(kind: &MobEventKind) -> &'static str {
match kind {
MobEventKind::FlowCompleted { .. } => "FlowCompleted",
MobEventKind::FlowFailed { .. } => "FlowFailed",
MobEventKind::FlowCanceled { .. } => "FlowCanceled",
_ => "Other",
}
}
}
#[async_trait]
impl MobEventStore for FaultInjectedEventStore {
async fn append(&self, event: NewMobEvent) -> Result<MobEvent, MobStoreError> {
let kind = Self::kind_label(&event.kind);
if self.fail_on_kind.read().await.contains(kind) {
return Err(MobStoreError::Internal(format!(
"fault-injected append failure for {kind}"
)));
}
let mut events = self.events.write().await;
let stored = MobEvent {
cursor: events.len() as u64 + 1,
timestamp: Utc::now(),
mob_id: event.mob_id,
kind: event.kind,
};
events.push(stored.clone());
Ok(stored)
}
async fn poll(
&self,
after_cursor: u64,
limit: usize,
) -> Result<Vec<MobEvent>, MobStoreError> {
let events = self.events.read().await;
Ok(events
.iter()
.filter(|event| event.cursor > after_cursor)
.take(limit)
.cloned()
.collect())
}
async fn replay_all(&self) -> Result<Vec<MobEvent>, MobStoreError> {
Ok(self.events.read().await.clone())
}
async fn append_batch(
&self,
events: Vec<NewMobEvent>,
) -> Result<Vec<MobEvent>, MobStoreError> {
let mut results = Vec::with_capacity(events.len());
for event in events {
results.push(self.append(event).await?);
}
Ok(results)
}
async fn clear(&self) -> Result<(), MobStoreError> {
self.events.write().await.clear();
Ok(())
}
}
fn sample_run(run_id: RunId, status: MobRunStatus) -> MobRun {
MobRun {
run_id,
mob_id: MobId::from("mob"),
flow_id: FlowId::from("flow"),
status,
flow_state: MobRun::flow_state_for_steps([crate::ids::StepId::from("step-1")]).unwrap(),
activation_params: serde_json::json!({}),
created_at: Utc::now(),
completed_at: None,
step_ledger: Vec::new(),
failure_ledger: Vec::new(),
frames: std::collections::BTreeMap::new(),
loops: std::collections::BTreeMap::new(),
loop_iteration_ledger: Vec::new(),
schema_version: 4,
root_step_outputs: indexmap::IndexMap::new(),
loop_iteration_outputs: std::collections::BTreeMap::new(),
}
}
#[tokio::test]
async fn test_terminalization_records_persisted_terminal_event() {
let run_store = Arc::new(RecordingRunStore::new());
let events = Arc::new(FaultInjectedEventStore::default());
let authority =
FlowTerminalizationAuthority::new(run_store.clone(), events, MobId::from("mob"));
let run_id = RunId::new();
run_store
.create_run(sample_run(run_id.clone(), MobRunStatus::Pending))
.await
.expect("create run");
let outcome = authority
.record_persisted_terminalization(
run_id.clone(),
FlowId::from("flow"),
TerminalizationTarget::Canceled,
)
.await
.expect("terminalize");
assert_eq!(outcome, TerminalizationOutcome::Transitioned);
}
#[tokio::test]
async fn test_terminalization_records_coherence_failure_ledger_on_append_error() {
let run_store = Arc::new(InMemoryMobRunStore::new());
let events = Arc::new(FaultInjectedEventStore::default());
events.fail_appends_for("FlowFailed").await;
let authority =
FlowTerminalizationAuthority::new(run_store.clone(), events, MobId::from("mob"));
let run_id = RunId::new();
run_store
.create_run(sample_run(run_id.clone(), MobRunStatus::Failed))
.await
.expect("create run");
let error = authority
.record_persisted_terminalization(
run_id.clone(),
FlowId::from("flow"),
TerminalizationTarget::Failed {
reason: "boom".to_string(),
},
)
.await
.expect_err("append failure should surface as terminalization error");
assert!(
error
.to_string()
.contains("terminal run status persisted but FlowFailed append failed"),
"error should mention terminal append divergence"
);
let run = run_store
.get_run(&run_id)
.await
.expect("get run")
.expect("run exists");
assert_eq!(run.status, MobRunStatus::Failed);
assert!(
run.failure_ledger.iter().any(|entry| {
entry.step_id == crate::runtime::flow_system_step_id()
&& entry.reason.contains("FlowFailed append failed")
}),
"failed terminal append should always be mirrored into failure ledger"
);
}
}