#![warn(missing_docs)]
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Duration;
pub mod ids {
use super::*;
use std::fmt;
/// Error returned when a string is rejected by identifier validation.
///
/// It records a static label naming the identifier kind being validated
/// (this module uses `"op_id"` and `"state_id"`) together with the rejected
/// input, and renders as `invalid <kind>: <value>`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct IdValidationError {
    /// Label of the identifier kind that rejected the input.
    kind: &'static str,
    /// The rejected input, kept verbatim.
    value: String,
}

impl IdValidationError {
    /// Builds an error for `kind`, taking ownership of the offending `value`.
    fn new(kind: &'static str, value: impl Into<String>) -> Self {
        let value = value.into();
        Self { kind, value }
    }
}

impl fmt::Display for IdValidationError {
    /// Writes `invalid <kind>: <value>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { kind, value } = self;
        write!(f, "invalid {}: {}", kind, value)
    }
}

impl std::error::Error for IdValidationError {}
/// Validated op identifier.
///
/// The inner string is private, so values are only created through
/// `OpId::new` / `OpId::must_new` or the `TryFrom` impls, all of which require
/// a single valid identifier segment (`^[a-z][a-z0-9_]{0,62}$`).
/// Serde deserializes through `TryFrom<String>` (so invalid input is rejected)
/// and serializes as a plain string.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(try_from = "String", into = "String")]
pub struct OpId(String);
/// Op path stored as a plain `String`.
///
/// The field is public and the serde derives are unconstrained, so this type
/// performs no validation itself. `validate_op_path` in this module checks the
/// `<machine_id>.<step_id>` form but is not invoked by this type.
// NOTE(review): unlike `OpId` and `StateId`, an `OpPath` can hold any string — confirm this is intended.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct OpPath(pub String);
/// Validated state identifier of the form `<machine_id>.<step_id>.<state_local_id>`.
///
/// The inner string is private, so values are only created through
/// `StateId::new` / `StateId::must_new` or the `TryFrom` impls, all of which
/// run `validate_state_id`. Serde deserializes through `TryFrom<String>` and
/// serializes as a plain string.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(try_from = "String", into = "String")]
pub struct StateId(String);
/// Run identifier: a `Copy` newtype around a [`uuid::Uuid`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct RunId(pub uuid::Uuid);
/// Artifact identifier stored as a plain `String`; this type performs no validation.
///
/// NOTE(review): tests in this file build it from 64-character strings, so it is
/// presumably a hex content hash — confirm against the `hashing` module.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct ArtifactId(pub String);
/// Key naming a recorded fact; a plain `String` newtype with no validation.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct FactKey(pub String);
/// Key addressing a value in a context (see `context::DynContext`); a plain
/// `String` newtype with no validation.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct ContextKey(pub String);
/// Machine-readable error code carried by `errors::ErrorInfo`; a plain `String`
/// newtype with no validation.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct ErrorCode(pub String);
/// Returns `true` when `segment` is a well-formed identifier segment.
///
/// A segment is 1 to 63 bytes long, starts with an ASCII lowercase letter and
/// continues with ASCII lowercase letters, ASCII digits or `_`; in other words
/// it matches `^[a-z][a-z0-9_]{0,62}$`.
#[allow(dead_code)]
pub(crate) fn is_valid_id_segment(segment: &str) -> bool {
    // `len()` counts bytes, which is what the 63-byte limit is defined on.
    if segment.len() > 63 {
        return false;
    }
    let mut bytes = segment.bytes();
    // An empty segment yields `None` here and is therefore rejected.
    let starts_with_letter = matches!(bytes.next(), Some(b'a'..=b'z'));
    starts_with_letter && bytes.all(|byte| matches!(byte, b'a'..=b'z' | b'0'..=b'9' | b'_'))
}
impl OpId {
    /// Validates `value` and wraps it as an `OpId`.
    ///
    /// # Errors
    ///
    /// Returns an `IdValidationError` labelled `op_id` when `value` is not a
    /// single valid identifier segment (`^[a-z][a-z0-9_]{0,62}$`).
    pub fn new(value: impl Into<String>) -> Result<Self, IdValidationError> {
        let candidate: String = value.into();
        if is_valid_id_segment(&candidate) {
            Ok(Self(candidate))
        } else {
            Err(IdValidationError::new("op_id", candidate))
        }
    }

    /// Like [`OpId::new`], but panics instead of returning an error.
    ///
    /// # Panics
    ///
    /// Panics when `value` fails validation.
    pub fn must_new(value: impl Into<String>) -> Self {
        let validated = Self::new(value);
        validated.expect("op id must satisfy ^[a-z][a-z0-9_]{0,62}$")
    }

    /// Borrows the identifier as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}
impl TryFrom<String> for OpId {
type Error = IdValidationError;
fn try_from(value: String) -> Result<Self, Self::Error> {
Self::new(value)
}
}
impl TryFrom<&str> for OpId {
type Error = IdValidationError;
fn try_from(value: &str) -> Result<Self, Self::Error> {
Self::new(value)
}
}
impl From<OpId> for String {
fn from(value: OpId) -> Self {
value.0
}
}
/// Displays the identifier as its raw string.
impl fmt::Display for OpId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.0)
    }
}
/// Returns `true` when `value` has the form `<machine_id>.<step_id>`.
///
/// The string must split on `.` into exactly two parts, and each part must
/// pass `is_valid_id_segment`.
#[allow(dead_code)]
pub(crate) fn validate_op_path(value: &str) -> bool {
    let mut parts = value.split('.');
    // The third `next()` must be `None`: any extra segment makes the path invalid.
    match (parts.next(), parts.next(), parts.next()) {
        (Some(machine_id), Some(step_id), None) => {
            is_valid_id_segment(machine_id) && is_valid_id_segment(step_id)
        }
        _ => false,
    }
}
/// Returns `true` when `value` has the form
/// `<machine_id>.<step_id>.<state_local_id>`.
///
/// The string must split on `.` into exactly three parts, and each part must
/// pass `is_valid_id_segment`.
#[allow(dead_code)]
pub(crate) fn validate_state_id(value: &str) -> bool {
    let mut parts = value.split('.');
    // The fourth `next()` must be `None`: any extra segment makes the id invalid.
    match (parts.next(), parts.next(), parts.next(), parts.next()) {
        (Some(machine_id), Some(step_id), Some(state_local_id), None) => {
            is_valid_id_segment(machine_id)
                && is_valid_id_segment(step_id)
                && is_valid_id_segment(state_local_id)
        }
        _ => false,
    }
}
impl StateId {
    /// Validates `value` and wraps it as a `StateId`.
    ///
    /// # Errors
    ///
    /// Returns an `IdValidationError` labelled `state_id` when `value` is not
    /// of the form `<machine_id>.<step_id>.<state_local_id>` with three valid
    /// identifier segments.
    pub fn new(value: impl Into<String>) -> Result<Self, IdValidationError> {
        let candidate: String = value.into();
        if validate_state_id(&candidate) {
            Ok(Self(candidate))
        } else {
            Err(IdValidationError::new("state_id", candidate))
        }
    }

    /// Like [`StateId::new`], but panics instead of returning an error.
    ///
    /// # Panics
    ///
    /// Panics when `value` fails validation.
    pub fn must_new(value: impl Into<String>) -> Self {
        let validated = Self::new(value);
        validated.expect("state id must satisfy <machine_id>.<step_id>.<state_local_id>")
    }

    /// Borrows the identifier as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}
impl TryFrom<String> for StateId {
type Error = IdValidationError;
fn try_from(value: String) -> Result<Self, Self::Error> {
Self::new(value)
}
}
impl TryFrom<&str> for StateId {
type Error = IdValidationError;
fn try_from(value: &str) -> Result<Self, Self::Error> {
Self::new(value)
}
}
impl From<StateId> for String {
fn from(value: StateId) -> Self {
value.0
}
}
/// Displays the identifier as its raw string.
impl fmt::Display for StateId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.0)
    }
}
// Unit tests for this module: the contents of `tests/ids_tests.rs` are
// textually included here and compiled only under `cfg(test)`.
#[cfg(test)]
mod ids_tests {
include!("tests/ids_tests.rs");
}
}
/// Canonical-JSON policy abstraction.
pub mod canonical {
/// Marker trait for a canonical-JSON policy.
///
/// It declares no methods in this file; implementors only have to be
/// `Send + Sync`.
pub trait CanonicalJsonPolicy: Send + Sync {}
}
pub mod config {
use super::*;
use crate::ids::OpId;
use crate::meta::Tag;
/// I/O mode selected for a run (the `io_mode` field of `RunConfig`).
///
/// NOTE(review): variant semantics below are inferred from the names and the
/// sibling `live_io` / `replay_io` modules — confirm against the runtime.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum IoMode {
/// Presumably performs real I/O.
Live,
/// Presumably serves I/O from previously recorded data.
Replay,
}
/// Named event profile (the `event_profile` field of `RunConfig`).
///
/// NOTE(review): presumably controls how much event detail is emitted; the
/// interpretation lives outside this file (see the `event_profile` module).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum EventProfile {
/// The `Minimal` profile.
Minimal,
/// The `Normal` profile.
Normal,
/// The `Verbose` profile.
Verbose,
/// A profile identified by a free-form string; its meaning is not defined in this file.
Custom(String),
}
/// Backoff schedule used by `RetryPolicy`.
///
/// NOTE(review): how the delays are applied (and the growth factor of the
/// exponential variant) is decided by the runtime, not by this type.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum BackoffPolicy {
/// A single constant delay.
Fixed {
/// The delay to use.
delay: Duration,
},
/// A delay described by a base and an upper bound.
Exponential {
/// Starting delay.
base_delay: Duration,
/// Upper bound for the delay (presumably a cap — confirm).
max_delay: Duration,
},
}
/// Retry settings: an attempt limit plus a backoff schedule.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct RetryPolicy {
/// Maximum number of attempts.
/// NOTE(review): whether this counts the first try is not visible here — confirm.
pub max_attempts: u32,
/// Backoff schedule applied between attempts.
pub backoff: BackoffPolicy,
}
/// Execution mode for a run (the `execution_mode` field of `RunConfig`).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ExecutionMode {
/// Sequential execution (presumably one state at a time — confirm).
Sequential,
/// Fan-out/join execution with a concurrency limit.
FanOutJoin {
/// Concurrency limit for this mode.
max_concurrency: u32,
},
}
/// Context checkpointing policy (the `context_checkpointing` field of `RunConfig`).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ContextCheckpointing {
/// Checkpoint after every state, as the name says.
AfterEveryState,
/// A policy identified by a free-form string; its meaning is not defined in this file.
Custom(String),
}
/// Configuration for a single run.
///
/// All fields are required when deserializing except `nix_flake_allowlist`,
/// which falls back to `default_nix_flake_allowlist()` when absent.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct RunConfig {
/// I/O mode for the run.
pub io_mode: IoMode,
/// Retry settings.
pub retry_policy: RetryPolicy,
/// Event profile.
pub event_profile: EventProfile,
/// Execution mode.
pub execution_mode: ExecutionMode,
/// Context checkpointing policy.
pub context_checkpointing: ContextCheckpointing,
/// NOTE(review): presumably whether a missing fact during replay is reported
/// as retryable — confirm against the replay I/O implementation.
pub replay_missing_fact_retryable: bool,
/// Tags to skip (presumably states carrying any of these tags are skipped — confirm).
pub skip_tags: Vec<Tag>,
/// Allowlist of Nix flake references; defaults via `default_nix_flake_allowlist`
/// when the field is missing from the serialized form.
#[serde(default = "default_nix_flake_allowlist")]
pub nix_flake_allowlist: Vec<String>,
}
/// Serde default for `RunConfig::nix_flake_allowlist`.
///
/// Returns a one-element list containing `github:willyrgf/mfm`.
pub fn default_nix_flake_allowlist() -> Vec<String> {
    const DEFAULT_FLAKE: &str = "github:willyrgf/mfm";
    vec![String::from(DEFAULT_FLAKE)]
}
/// Manifest describing a run: which op, with which inputs and configuration,
/// built from which sources.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct RunManifest {
/// Identifier of the op being run.
pub op_id: OpId,
/// Version string of the op (format not constrained by this type).
pub op_version: String,
/// Input parameters as arbitrary JSON.
pub input_params: serde_json::Value,
/// Run configuration.
pub run_config: RunConfig,
/// Build provenance information.
pub build: BuildProvenance,
}
/// Build provenance recorded in a `RunManifest`.
///
/// Every field except `env_allowlist` is optional. Field meanings below are
/// taken from the field names; the producer is outside this file.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct BuildProvenance {
/// Git commit, if known.
pub git_commit: Option<String>,
/// Hash of `Cargo.lock`, if known.
pub cargo_lock_hash: Option<String>,
/// Hash of `flake.lock`, if known.
pub flake_lock_hash: Option<String>,
/// `rustc` version string, if known.
pub rustc_version: Option<String>,
/// Target triple, if known.
pub target_triple: Option<String>,
/// NOTE(review): presumably names of environment variables that were allowlisted — confirm.
pub env_allowlist: Vec<String>,
}
}
pub mod meta {
use super::*;
/// Tag attached to states; a plain `String` newtype with no validation.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Tag(pub String);
/// String constants for a set of standard tag names.
///
/// NOTE(review): presumably meant to be wrapped in `Tag`; nothing in this file
/// uses them — confirm at call sites.
pub mod standard_tags {
/// Tag name `config`.
pub const CONFIG: &str = "config";
/// Tag name `fetch_data`.
pub const FETCH_DATA: &str = "fetch_data";
/// Tag name `compute`.
pub const COMPUTE: &str = "compute";
/// Tag name `execute`.
pub const EXECUTE: &str = "execute";
/// Tag name `report`.
pub const REPORT: &str = "report";
/// Tag name `apply_side_effect`.
pub const APPLY_SIDE_EFFECT: &str = "apply_side_effect";
/// Tag name `impure`.
pub const IMPURE: &str = "impure";
}
/// Side-effect classification declared by a state in `StateMeta`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum SideEffectKind {
/// Declared as pure (presumably no I/O at all — confirm).
Pure,
/// Declared as performing read-only I/O.
ReadOnlyIo,
/// Declared as applying a side effect.
ApplySideEffect,
}
/// Idempotency declaration carried in `StateMeta`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum Idempotency {
/// No idempotency declaration.
None,
/// An idempotency key; how it is used is not defined in this file.
Key(String),
}
/// Strategy named in `StateMeta::depends_on_strategy`.
///
/// NOTE(review): presumably selects which matching state a tag dependency
/// resolves to; the resolution logic is outside this file — confirm.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum DependencyStrategy {
/// The `Latest` strategy.
Latest,
/// The `Earliest` strategy.
Earliest,
/// The `LatestSuccessful` strategy.
LatestSuccessful,
}
/// Metadata a state reports about itself through `State::meta`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StateMeta {
/// Tags attached to the state.
pub tags: Vec<Tag>,
/// Tags this state depends on.
pub depends_on: Vec<Tag>,
/// Strategy associated with `depends_on`.
pub depends_on_strategy: DependencyStrategy,
/// Declared side-effect classification.
pub side_effects: SideEffectKind,
/// Declared idempotency.
pub idempotency: Idempotency,
}
}
pub mod errors {
use super::*;
use crate::ids::{ErrorCode, StateId};
/// Error-code string `missing_fact_key`. The unit test in this module pins the
/// exact value so it stays stable.
#[allow(dead_code)]
pub(crate) const CODE_MISSING_FACT_KEY: &str = "missing_fact_key";
/// Coarse category attached to every `ErrorInfo`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ErrorCategory {
/// Input parsing.
ParsingInput,
/// On-chain.
OnChain,
/// Off-chain.
OffChain,
/// RPC.
Rpc,
/// Storage.
Storage,
/// Context; used by the typed context helpers in `context` for
/// (de)serialization failures.
Context,
/// Unknown / uncategorized.
Unknown,
}
/// Structured, serializable error payload shared by all error types in this module.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ErrorInfo {
/// Machine-readable error code.
pub code: ErrorCode,
/// Coarse error category.
pub category: ErrorCategory,
/// Whether the error is flagged as retryable.
pub retryable: bool,
/// Human-readable message.
pub message: String,
/// Optional extra details as arbitrary JSON.
pub details: Option<serde_json::Value>,
}
/// Error returned by `State::handle` and carried in `KernelEvent::StateFailed`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StateError {
/// State the error is attributed to, when known.
pub state_id: Option<StateId>,
/// Error payload.
pub info: ErrorInfo,
}
/// Error type returned by `io::IoProvider` methods.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum IoError {
/// A fact key was missing.
/// NOTE(review): presumably raised when an `IoCall` carries no `fact_key`
/// where one is required — confirm against the providers.
MissingFactKey(ErrorInfo),
/// No fact was found for `key`.
MissingFact {
/// The fact key that was looked up.
key: crate::ids::FactKey,
/// Error payload.
info: ErrorInfo,
},
/// Transport failure.
Transport(ErrorInfo),
/// Rate limiting.
RateLimited(ErrorInfo),
/// Any other I/O error.
Other(ErrorInfo),
}
/// Error type returned by context operations (`context::DynContext` and the
/// typed helpers built on it).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ContextError {
/// A key was missing.
MissingKey {
/// The context key concerned.
key: crate::ids::ContextKey,
/// Error payload.
info: ErrorInfo,
},
/// A value could not be serialized or deserialized; produced by
/// `TypedContextExt::read_typed` / `write_typed`.
Serialization(ErrorInfo),
/// Any other context error.
Other(ErrorInfo),
}
/// Error type returned by the `stores::EventStore` and `stores::ArtifactStore` traits.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum StorageError {
/// Concurrency conflict (presumably an `expected_seq` mismatch on append — confirm).
Concurrency(ErrorInfo),
/// The requested item was not found.
NotFound(ErrorInfo),
/// Stored data is corrupt.
Corruption(ErrorInfo),
/// Any other storage error.
Other(ErrorInfo),
}
/// Top-level error for running ops; wraps the more specific error types of this module.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum RunError {
/// The execution plan was invalid.
InvalidPlan(ErrorInfo),
/// A storage error.
Storage(StorageError),
/// A context error.
Context(ContextError),
/// An I/O error.
Io(IoError),
/// A state error.
State(StateError),
/// Any other run error.
Other(ErrorInfo),
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the `missing_fact_key` code string and checks that it survives a
    /// round trip through `IoError::MissingFactKey`.
    #[test]
    fn missing_fact_key_code_is_stable() {
        assert_eq!(CODE_MISSING_FACT_KEY, "missing_fact_key");

        let err = IoError::MissingFactKey(ErrorInfo {
            code: ErrorCode(CODE_MISSING_FACT_KEY.to_string()),
            category: ErrorCategory::Rpc,
            retryable: false,
            message: "missing fact key".to_string(),
            details: None,
        });

        let IoError::MissingFactKey(info) = err else {
            unreachable!("wrong error variant")
        };
        assert_eq!(info.code.0, "missing_fact_key");
    }
}
}
pub mod context {
use super::*;
use crate::errors::ContextError;
use crate::ids::ContextKey;
/// Object-safe key/value context whose values are `serde_json::Value`s.
///
/// Typed access is layered on top by `TypedContextExt`.
pub trait DynContext: Send {
/// Reads the value stored under `key`.
///
/// `Ok(None)` is treated by `TypedContextExt::read_typed` as "no value".
/// NOTE(review): whether implementations report an absent key as `Ok(None)` or
/// as `ContextError::MissingKey` is not fixed by this trait — confirm.
fn read(&self, key: &ContextKey) -> Result<Option<serde_json::Value>, ContextError>;
/// Stores `value` under `key`.
fn write(&mut self, key: ContextKey, value: serde_json::Value) -> Result<(), ContextError>;
/// Deletes the entry stored under `key`.
fn delete(&mut self, key: &ContextKey) -> Result<(), ContextError>;
/// Dumps the context as a single JSON value (shape is implementation-defined).
fn dump(&self) -> Result<serde_json::Value, ContextError>;
}
/// Typed convenience layer over a JSON-valued context.
///
/// A blanket implementation in this module provides it for every `DynContext`.
pub trait TypedContextExt {
/// Reads the value under `key` and deserializes it into `T`.
///
/// Returns `Ok(None)` when no value is stored.
fn read_typed<T: serde::de::DeserializeOwned>(
&self,
key: &ContextKey,
) -> Result<Option<T>, ContextError>;
/// Serializes `value` to JSON and stores it under `key`.
fn write_typed<T: Serialize>(
&mut self,
key: ContextKey,
value: &T,
) -> Result<(), ContextError>;
}
/// Blanket implementation: every `DynContext` (including `dyn DynContext`)
/// gains the typed helpers by converting through `serde_json::Value`.
impl<C: DynContext + ?Sized> TypedContextExt for C {
    /// Reads `key` and deserializes the stored JSON into `T`.
    ///
    /// Returns `Ok(None)` when the underlying `read` yields no value. A value
    /// that cannot be deserialized becomes `ContextError::Serialization` with
    /// code `context_deserialize_failed`; the underlying serde error is
    /// discarded.
    fn read_typed<T: serde::de::DeserializeOwned>(
        &self,
        key: &ContextKey,
    ) -> Result<Option<T>, ContextError> {
        match self.read(key)? {
            None => Ok(None),
            Some(raw) => match serde_json::from_value::<T>(raw) {
                Ok(typed) => Ok(Some(typed)),
                Err(_) => Err(serialization_failure(
                    "context_deserialize_failed",
                    "context value deserialization failed",
                )),
            },
        }
    }

    /// Serializes `value` to JSON and stores it under `key`.
    ///
    /// A value that cannot be serialized becomes `ContextError::Serialization`
    /// with code `context_serialize_failed`; the underlying serde error is
    /// discarded. Errors from the underlying `write` are returned unchanged.
    fn write_typed<T: Serialize>(
        &mut self,
        key: ContextKey,
        value: &T,
    ) -> Result<(), ContextError> {
        match serde_json::to_value(value) {
            Ok(json) => self.write(key, json),
            Err(_) => Err(serialization_failure(
                "context_serialize_failed",
                "context value serialization failed",
            )),
        }
    }
}

/// Builds the non-retryable, `Context`-category `ContextError::Serialization`
/// used by the typed helpers above.
fn serialization_failure(code: &str, message: &str) -> ContextError {
    ContextError::Serialization(crate::errors::ErrorInfo {
        code: crate::ids::ErrorCode(code.to_string()),
        category: crate::errors::ErrorCategory::Context,
        retryable: false,
        message: message.to_string(),
        details: None,
    })
}
}
pub mod events {
use super::*;
use crate::errors::StateError;
use crate::ids::{ArtifactId, OpId, OpPath, RunId, StateId};
// NOTE(review): each constant below is presumably the `DomainEvent::name` used
// with the like-named payload struct in this module — confirm at emit sites.
/// Domain event name `fact_recorded`.
pub const DOMAIN_EVENT_FACT_RECORDED: &str = "fact_recorded";
/// Domain event name `artifact_written`.
pub const DOMAIN_EVENT_ARTIFACT_WRITTEN: &str = "artifact_written";
/// Domain event name `op_boundary`.
pub const DOMAIN_EVENT_OP_BOUNDARY: &str = "op_boundary";
/// Domain event name `child_run_spawned`.
pub const DOMAIN_EVENT_CHILD_RUN_SPAWNED: &str = "child_run_spawned";
/// Domain event name `child_run_completed`.
pub const DOMAIN_EVENT_CHILD_RUN_COMPLETED: &str = "child_run_completed";
/// Terminal status of a run, as reported in `KernelEvent::RunCompleted` and
/// `ChildRunCompleted`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum RunStatus {
/// The run completed.
Completed,
/// The run failed.
Failed,
/// The run was cancelled.
Cancelled,
}
/// Kernel-level lifecycle events of a run and its states.
///
/// Snapshot and manifest fields are artifact ids; what is stored behind them is
/// decided by the runtime, outside this file.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum KernelEvent {
/// A run started.
RunStarted {
/// Op being run.
op_id: OpId,
/// Artifact id of the run manifest.
manifest_id: ArtifactId,
/// Artifact id of the initial snapshot.
initial_snapshot_id: ArtifactId,
},
/// A state was entered.
StateEntered {
/// State being entered.
state_id: StateId,
/// Attempt number (NOTE(review): base of the counter not visible here — confirm).
attempt: u32,
/// Artifact id of the base snapshot.
base_snapshot_id: ArtifactId,
},
/// A state completed.
StateCompleted {
/// State that completed.
state_id: StateId,
/// Artifact id of the context snapshot.
context_snapshot_id: ArtifactId,
},
/// A state failed.
StateFailed {
/// State that failed.
state_id: StateId,
/// The error reported for the state.
error: StateError,
/// Artifact id of a failure snapshot, if any.
failure_snapshot_id: Option<ArtifactId>,
},
/// The run reached a terminal status.
RunCompleted {
/// Terminal status.
status: RunStatus,
/// Artifact id of the final snapshot, if any.
final_snapshot_id: Option<ArtifactId>,
},
}
/// Domain-level event: a name plus a JSON payload, emitted through
/// `recorder::EventRecorder`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct DomainEvent {
/// Event name (see the `DOMAIN_EVENT_*` constants in this module).
pub name: String,
/// Inline JSON payload.
pub payload: serde_json::Value,
/// Optional artifact reference for the payload.
/// NOTE(review): relationship to `payload` (replacement vs. supplement) is not
/// visible here — confirm.
pub payload_ref: Option<ArtifactId>,
}
/// Any event that can appear in an `EventEnvelope`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum Event {
/// A kernel lifecycle event.
Kernel(KernelEvent),
/// A domain event.
Domain(DomainEvent),
}
/// An event together with its run, sequence number and optional timestamp; the
/// unit stored and returned by `stores::EventStore`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct EventEnvelope {
/// Run the event belongs to.
pub run_id: RunId,
/// Sequence number of the event within the run.
pub seq: u64,
/// Optional timestamp in milliseconds (epoch not specified in this file).
pub ts_millis: Option<u64>,
/// The event itself.
pub event: Event,
}
/// Payload describing a recorded fact.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct FactRecorded {
/// Key of the fact.
pub key: crate::ids::FactKey,
/// Artifact id of the fact's payload.
pub payload_id: ArtifactId,
/// Additional metadata as arbitrary JSON.
pub meta: serde_json::Value,
}
/// Payload describing a written artifact.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ArtifactWritten {
/// Id of the artifact.
pub artifact_id: ArtifactId,
/// Kind of the artifact.
pub kind: crate::stores::ArtifactKind,
/// Additional metadata as arbitrary JSON.
pub meta: serde_json::Value,
}
/// Payload marking an op boundary.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct OpBoundary {
/// Path of the op concerned.
pub op_path: OpPath,
/// Phase label as a free-form string; the set of valid values is not defined in this file.
pub phase: String, }
/// Payload describing a child run spawned by a parent run.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ChildRunSpawned {
/// Run that spawned the child.
pub parent_run_id: RunId,
/// The spawned child run.
pub child_run_id: RunId,
/// Artifact id of the child's manifest.
pub child_manifest_id: ArtifactId,
}
/// Payload describing a child run that reached a terminal status.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ChildRunCompleted {
/// The child run.
pub child_run_id: RunId,
/// Terminal status of the child run.
pub status: RunStatus,
/// Artifact id of the child's final snapshot, if any.
pub final_snapshot_id: Option<ArtifactId>,
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::hashing::artifact_id_for_json;

    /// Asserts that `payload` can be hashed via `artifact_id_for_json` and that
    /// the secrets scanner reports nothing in it.
    fn assert_hashable_and_secret_free(payload: &serde_json::Value) {
        artifact_id_for_json(payload).expect("canonical-json-hashable");
        assert!(!crate::secrets::json_contains_secrets(payload));
    }

    /// Both child-run payloads must serialize to JSON that is hashable and
    /// free of secrets.
    #[test]
    fn child_run_payloads_are_canonical_and_non_secret() {
        let spawned = ChildRunSpawned {
            parent_run_id: RunId(uuid::Uuid::new_v4()),
            child_run_id: RunId(uuid::Uuid::new_v4()),
            child_manifest_id: ArtifactId("0".repeat(64)),
        };
        let spawned_json = serde_json::to_value(spawned).expect("serialize");
        assert_hashable_and_secret_free(&spawned_json);

        let completed = ChildRunCompleted {
            child_run_id: RunId(uuid::Uuid::new_v4()),
            status: RunStatus::Completed,
            final_snapshot_id: Some(ArtifactId("1".repeat(64))),
        };
        let completed_json = serde_json::to_value(completed).expect("serialize");
        assert_hashable_and_secret_free(&completed_json);
    }
}
}
pub mod io {
use super::*;
use crate::errors::IoError;
use crate::ids::{ArtifactId, FactKey};
/// Request passed to `IoProvider::call`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct IoCall {
/// Namespace of the call (presumably used to route it to a handler — confirm).
pub namespace: String,
/// Request body as arbitrary JSON.
pub request: serde_json::Value,
/// Optional fact key associated with the call.
pub fact_key: Option<FactKey>,
}
/// Successful result of `IoProvider::call`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct IoResult {
/// Response body as arbitrary JSON.
pub response: serde_json::Value,
/// Artifact id of the recorded payload, if one was recorded.
pub recorded_payload_id: Option<ArtifactId>,
}
/// Asynchronous I/O provider handed to states (see `state::State::handle`).
///
/// Besides generic calls it also supplies time and randomness.
/// NOTE(review): presumably so that those can be recorded and replayed; the
/// implementations are outside this file — confirm.
#[async_trait]
pub trait IoProvider: Send {
/// Performs the I/O call described by `call`.
async fn call(&mut self, call: IoCall) -> Result<IoResult, IoError>;
/// Records `value` under `key` and returns the id of the resulting artifact.
async fn record_value(
&mut self,
key: FactKey,
value: serde_json::Value,
) -> Result<ArtifactId, IoError>;
/// Looks up the artifact id recorded for `key`, if any.
async fn get_recorded_fact(&mut self, key: &FactKey)
-> Result<Option<ArtifactId>, IoError>;
/// Returns a timestamp in milliseconds (epoch not specified in this file).
async fn now_millis(&mut self) -> Result<u64, IoError>;
/// Returns random bytes; `n` is the requested count.
async fn random_bytes(&mut self, n: usize) -> Result<Vec<u8>, IoError>;
}
}
pub mod recorder {
use super::*;
use crate::errors::RunError;
use crate::events::DomainEvent;
/// Sink through which states emit domain events (see `state::State::handle`).
#[async_trait]
pub trait EventRecorder: Send {
/// Emits a single domain event.
async fn emit(&mut self, event: DomainEvent) -> Result<(), RunError>;
/// Emits several domain events in one call.
/// NOTE(review): atomicity of the batch is implementation-defined — confirm.
async fn emit_many(&mut self, events: Vec<DomainEvent>) -> Result<(), RunError>;
}
}
pub mod state {
use super::*;
use crate::context::DynContext;
use crate::errors::StateError;
use crate::io::IoProvider;
use crate::meta::StateMeta;
use crate::recorder::EventRecorder;
/// Snapshot policy a state requests through its `StateOutcome`.
///
/// NOTE(review): how each variant is honoured is decided by the runtime — confirm.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum SnapshotPolicy {
/// Never snapshot.
Never,
/// Snapshot on success.
OnSuccess,
/// Always snapshot.
Always,
}
/// Successful result of `State::handle`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StateOutcome {
/// Snapshot policy requested by the state.
pub snapshot: SnapshotPolicy,
}
/// A unit of work in an execution plan.
///
/// Implementations are shared as `Arc<dyn State>` (see `DynState`), hence the
/// `Send + Sync` bound and the `&self` receiver on `handle`.
#[async_trait]
pub trait State: Send + Sync {
/// Returns the state's metadata (tags, dependencies, side-effect class, idempotency).
fn meta(&self) -> StateMeta;
/// Runs the state.
///
/// `ctx` is the mutable context, `io` the I/O provider and `rec` the domain
/// event recorder. Returns a `StateOutcome` on success and a `StateError` on
/// failure.
async fn handle(
&self,
ctx: &mut dyn DynContext,
io: &mut dyn IoProvider,
rec: &mut dyn EventRecorder,
) -> Result<StateOutcome, StateError>;
}
/// Shared, dynamically dispatched handle to a [`State`].
pub type DynState = Arc<dyn State>;
}
pub mod plan {
use super::*;
use crate::ids::{OpId, StateId};
use crate::state::DynState;
/// Directed edge between two states of a `StateGraph`.
///
/// NOTE(review): which end depends on which is not established in this file — confirm.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct DependencyEdge {
/// Source end of the edge.
pub from: StateId,
/// Target end of the edge.
pub to: StateId,
}
/// A state together with its identifier inside a `StateGraph`.
///
/// Cloning clones the `Arc` handle, not the state itself.
#[derive(Clone)]
pub struct StateNode {
/// Identifier of the state.
pub id: StateId,
/// Shared handle to the state implementation.
pub state: DynState,
}
/// Graph of states and the dependency edges between them.
#[derive(Clone)]
pub struct StateGraph {
/// The states (nodes).
pub states: Vec<StateNode>,
/// The dependency edges.
pub edges: Vec<DependencyEdge>,
}
/// Execution plan for an op: the op id plus its state graph.
#[derive(Clone)]
pub struct ExecutionPlan {
/// Op the plan belongs to.
pub op_id: OpId,
/// States and dependency edges to execute.
pub graph: StateGraph,
}
/// Reasons a `PlanValidator` can reject an `ExecutionPlan`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum PlanValidationError {
/// The plan is empty.
EmptyPlan,
/// The same state id appears more than once.
DuplicateStateId {
/// The duplicated state id.
state_id: StateId,
},
/// An edge refers to a state that is not in the plan.
MissingStateForEdge {
/// The state id that could not be found.
missing: StateId,
},
/// The dependencies form a cycle.
CircularDependency {
/// States making up the cycle.
cycle: Vec<StateId>,
},
/// A state depends on a tag that could not be resolved.
/// NOTE(review): presumably no state in the plan carries `missing_tag` — confirm.
DanglingDependencyTag {
/// The state declaring the dependency.
state_id: StateId,
/// The tag that could not be resolved.
missing_tag: crate::meta::Tag,
},
}
/// Validates execution plans.
pub trait PlanValidator: Send + Sync {
/// Checks `plan`, returning `Ok(())` when it is acceptable and a
/// `PlanValidationError` describing the problem otherwise.
fn validate(&self, plan: &ExecutionPlan) -> Result<(), PlanValidationError>;
}
}
pub mod stores {
use super::*;
use crate::errors::StorageError;
use crate::events::EventEnvelope;
use crate::ids::{ArtifactId, RunId};
/// Kind of an artifact stored in an `ArtifactStore`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ArtifactKind {
/// A run manifest.
Manifest,
/// A context snapshot.
ContextSnapshot,
/// A fact payload.
FactPayload,
/// A secret payload.
SecretPayload,
/// An output.
Output,
/// Any other kind, identified by a free-form string.
Other(String),
}
/// Asynchronous, per-run event log.
///
/// NOTE(review): the exact sequence-number conventions (what `head_seq` returns
/// for an empty run, inclusivity of `read_range` bounds, what `append` returns)
/// are defined by the implementations — confirm before relying on them.
#[async_trait]
pub trait EventStore: Send + Sync {
/// Returns the head sequence number for `run_id`.
async fn head_seq(&self, run_id: RunId) -> Result<u64, StorageError>;
/// Appends `events` to the log of `run_id`.
///
/// `expected_seq` is the sequence number the caller expects; presumably a
/// mismatch yields `StorageError::Concurrency` — confirm.
async fn append(
&self,
run_id: RunId,
expected_seq: u64,
events: Vec<EventEnvelope>,
) -> Result<u64, StorageError>;
/// Reads events of `run_id` starting at `from_seq`, optionally bounded by `to_seq`.
async fn read_range(
&self,
run_id: RunId,
from_seq: u64,
to_seq: Option<u64>,
) -> Result<Vec<EventEnvelope>, StorageError>;
}
/// Asynchronous store for binary artifacts addressed by `ArtifactId`.
#[async_trait]
pub trait ArtifactStore: Send + Sync {
/// Stores `bytes` as an artifact of the given `kind` and returns its id.
async fn put(&self, kind: ArtifactKind, bytes: Vec<u8>)
-> Result<ArtifactId, StorageError>;
/// Returns the bytes of the artifact `id`.
async fn get(&self, id: &ArtifactId) -> Result<Vec<u8>, StorageError>;
/// Reports whether an artifact with `id` exists.
async fn exists(&self, id: &ArtifactId) -> Result<bool, StorageError>;
}
}
pub mod engine {
use super::*;
use crate::config::{RunConfig, RunManifest};
use crate::context::DynContext;
use crate::errors::RunError;
use crate::ids::{ArtifactId, RunId};
use crate::plan::ExecutionPlan;
use crate::stores::{ArtifactStore, EventStore};
/// Phase of a run as reported in a `RunResult`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum RunPhase {
/// The run is still running.
Running,
/// The run completed.
Completed,
/// The run failed.
Failed,
/// The run was cancelled.
Cancelled,
}
/// Result returned by `ExecutionEngine::start` and `ExecutionEngine::resume`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct RunResult {
/// Id of the run.
pub run_id: RunId,
/// Phase the run is in.
pub phase: RunPhase,
/// Artifact id of the final snapshot, if any.
pub final_snapshot_id: Option<ArtifactId>,
}
/// Everything needed to start a new run via `ExecutionEngine::start`.
pub struct StartRun {
/// Manifest describing the run.
pub manifest: RunManifest,
/// Artifact id of the manifest.
pub manifest_id: ArtifactId,
/// Plan to execute.
pub plan: ExecutionPlan,
/// Run configuration.
/// NOTE(review): `manifest` also carries a `run_config`; how the two relate is
/// not visible here — confirm.
pub run_config: RunConfig,
/// Context the run starts with.
pub initial_context: Box<dyn DynContext>,
}
/// Bundle of the shared store handles passed to an engine; cloning clones the
/// `Arc` handles only.
#[derive(Clone)]
pub struct Stores {
/// Event store.
pub events: Arc<dyn EventStore>,
/// Artifact store.
pub artifacts: Arc<dyn ArtifactStore>,
}
/// Engine that executes runs against a set of stores.
#[async_trait]
pub trait ExecutionEngine: Send + Sync {
/// Starts a new run described by `run`, using `stores` for persistence.
async fn start(&self, stores: Stores, run: StartRun) -> Result<RunResult, RunError>;
/// Resumes the existing run `run_id`, using `stores` for persistence.
async fn resume(&self, stores: Stores, run_id: RunId) -> Result<RunResult, RunError>;
}
}
pub mod hashing;
pub mod runtime;
pub mod live_io;
pub mod exec_transport;
pub mod process_exec;
pub mod live_io_router;
pub mod live_io_registry;
pub mod replay_io;
pub(crate) mod attempt_envelope;
pub(crate) mod context_runtime;
pub(crate) mod event_profile;
pub(crate) mod secrets;