Skip to main content

mfm_sdk/
lib.rs

1#![warn(missing_docs)]
//! Orchestration SDK for planning and launching MFM runs.
//!
//! `mfm-sdk` sits above `mfm-machine` and below transport layers such as the CLI and REST API.
//! It provides stable identifiers, operation and pipeline traits, and launcher contracts that
//! turn deterministic operation descriptions into executable `mfm-machine` plans.
//!
//! Source of truth: `docs/redesign.md` (v4), Appendix C.2.
9//!
10//! # Examples
11//!
12//! ```rust
13//! use async_trait::async_trait;
14//! use std::sync::Arc;
15//!
16//! use mfm_machine::config::RunConfig;
17//! use mfm_machine::context::DynContext;
18//! use mfm_machine::errors::StateError;
19//! use mfm_machine::ids::{OpId, OpPath, StateId};
20//! use mfm_machine::io::IoProvider;
21//! use mfm_machine::meta::{DependencyStrategy, Idempotency, SideEffectKind, StateMeta};
22//! use mfm_machine::plan::{StateGraph, StateNode};
23//! use mfm_machine::recorder::EventRecorder;
24//! use mfm_machine::state::{SnapshotPolicy, State, StateOutcome};
25//! use mfm_sdk::errors::SdkError;
26//! use mfm_sdk::op::{OpIo, Operation};
27//! use mfm_sdk::unstable::HashMapOperationRegistry;
28//!
29//! struct ExampleState;
30//!
31//! #[async_trait]
32//! impl State for ExampleState {
33//!     fn meta(&self) -> StateMeta {
34//!         StateMeta {
35//!             tags: Vec::new(),
36//!             depends_on: Vec::new(),
37//!             depends_on_strategy: DependencyStrategy::Latest,
38//!             side_effects: SideEffectKind::Pure,
39//!             idempotency: Idempotency::None,
40//!         }
41//!     }
42//!
43//!     async fn handle(
44//!         &self,
45//!         _ctx: &mut dyn DynContext,
46//!         _io: &mut dyn IoProvider,
47//!         _rec: &mut dyn EventRecorder,
48//!     ) -> Result<StateOutcome, StateError> {
49//!         Ok(StateOutcome {
50//!             snapshot: SnapshotPolicy::OnSuccess,
51//!         })
52//!     }
53//! }
54//!
55//! struct ExampleOp;
56//!
57//! impl Operation for ExampleOp {
58//!     fn op_id(&self) -> OpId {
59//!         OpId::must_new("example")
60//!     }
61//!
62//!     fn op_version(&self) -> String {
63//!         "v1".to_string()
64//!     }
65//!
66//!     fn io(&self, _op_config: &serde_json::Value) -> Result<OpIo, SdkError> {
67//!         Ok(OpIo {
68//!             imports: Vec::new(),
69//!             exports: Vec::new(),
70//!         })
71//!     }
72//!
73//!     fn expand(
74//!         &self,
75//!         _op_path: OpPath,
76//!         _op_config: &serde_json::Value,
77//!         _run_config: &RunConfig,
78//!     ) -> Result<StateGraph, SdkError> {
79//!         Ok(StateGraph {
80//!             states: vec![StateNode {
81//!                 id: StateId::must_new("example.main.report"),
82//!                 state: Arc::new(ExampleState),
83//!             }],
84//!             edges: Vec::new(),
85//!         })
86//!     }
87//! }
88//!
89//! let mut registry = HashMapOperationRegistry::default();
90//! registry.register(Arc::new(ExampleOp));
91//! ```
92//!
//! Anything not listed in Appendix C.2 is internal or unstable, even if it is currently public.
94
95use async_trait::async_trait;
96use serde::{Deserialize, Serialize};
97use std::sync::Arc;
98
99use mfm_machine::config::{BuildProvenance, RunConfig};
100use mfm_machine::context::DynContext;
101use mfm_machine::engine::{ExecutionEngine, RunResult, Stores};
102use mfm_machine::errors::{ErrorInfo, RunError};
103use mfm_machine::ids::{OpId, OpPath, RunId};
104use mfm_machine::plan::{ExecutionPlan, StateGraph};
105
106/// Stable identifiers used by pipeline planners and operation definitions.
107pub mod ids {
108    use super::*;
109
110    /// Enforced: `^[a-z][a-z0-9_]{0,62}$`
111    #[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
112    pub struct MachineId(pub String);
113
114    /// Enforced: `^[a-z][a-z0-9_]{0,62}$`
115    #[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
116    pub struct StepId(pub String);
117
118    /// Local state id within an operation.
119    ///
120    /// Enforced: `^[a-z][a-z0-9_]{0,62}$`
121    #[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
122    pub struct StateLocalId(pub String);
123
124    /// Logical import/export key for cross-op wiring (not a `ContextKey`).
125    ///
126    /// Recommended: dot-separated segments like `prices.latest_eth`.
127    #[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
128    pub struct PortKey(pub String);
129}
130
131/// Typed SDK errors returned by planners and launch helpers.
132pub mod errors {
133    use super::*;
134
135    /// SDK planning/launch errors (no secrets).
136    #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
137    pub struct SdkError {
138        /// Structured error payload safe to serialize and persist.
139        pub info: ErrorInfo,
140    }
141}
142
143/// Traits and types for deterministic operation definitions.
144pub mod op {
145    use super::*;
146    use crate::errors::SdkError;
147    use crate::ids::PortKey;
148
149    /// Declared op IO surface for pipeline validation.
150    #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
151    pub struct OpIo {
152        /// Ports this operation expects earlier pipeline steps to provide.
153        pub imports: Vec<PortKey>,
154        /// Ports this operation makes available to later pipeline steps.
155        pub exports: Vec<PortKey>,
156    }
157
158    /// A reusable operation definition.
159    ///
160    /// Contract:
161    /// - `op_id` + `op_version` MUST be stable across environments.
162    /// - `expand()` MUST be deterministic and MUST NOT perform IO.
163    /// - `expand()` MUST assign StateIds of the form: "<op_path>.<state_local_id>" (3 segments).
164    pub trait Operation: Send + Sync {
165        /// Returns the stable operation identifier used in manifests and registries.
166        fn op_id(&self) -> OpId;
167        /// Returns the stable operation version string.
168        fn op_version(&self) -> String;
169
170        /// Declares the import/export surface for the provided config.
171        fn io(&self, op_config: &serde_json::Value) -> Result<OpIo, SdkError>;
172
173        /// Expands this operation into an executable state graph.
174        fn expand(
175            &self,
176            op_path: OpPath,
177            op_config: &serde_json::Value,
178            run_config: &RunConfig,
179        ) -> Result<StateGraph, SdkError>;
180    }
181
182    /// Shared trait-object form for storing heterogeneous operations in registries.
183    pub type DynOperation = Arc<dyn Operation>;
184
185    /// Registry used to resolve operations (by id + version) at plan/launch/resume time.
186    ///
187    /// For the default in-memory implementation used by most binaries and tests, see
188    /// [`crate::unstable::HashMapOperationRegistry`].
189    pub trait OperationRegistry: Send + Sync {
190        /// Resolves an operation implementation for the requested id and version.
191        fn resolve(&self, op_id: &OpId, op_version: &str) -> Result<DynOperation, SdkError>;
192    }
193}
194
195/// Pipeline composition types used to flatten multi-op workflows into execution plans.
196pub mod pipeline {
197    use super::*;
198    use crate::errors::SdkError;
199    use crate::ids::{MachineId, StepId};
200    use crate::op::OperationRegistry;
201
202    /// One pipeline step.
203    ///
204    /// Contract:
205    /// - `step_id` MUST be unique within the pipeline.
206    /// - `op_config` MUST be canonical-JSON hashable and MUST NOT contain secrets.
207    #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
208    pub struct PipelineStep {
209        /// Unique step identifier within the pipeline.
210        pub step_id: StepId,
211        /// Operation identifier executed by this step.
212        pub op_id: OpId,
213        /// Operation version executed by this step.
214        pub op_version: String,
215        /// Canonical JSON configuration passed to the operation.
216        pub op_config: serde_json::Value,
217    }
218
219    /// A flattened machine definition (ordered steps).
220    ///
221    /// Contract:
222    /// - `machine_id` + `pipeline_version` map to `RunManifest.{op_id, op_version}`.
223    /// - Step `OpPath` is "<machine_id>.<step_id>".
224    /// - Single-op convention: wrap a single op as a 1-step pipeline with:
225    ///   - `machine_id = <op_id>`
226    ///   - `steps[0].step_id = "main"`
227    #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
228    pub struct Pipeline {
229        /// Stable machine identifier used as the outer `OpId`.
230        pub machine_id: MachineId,
231        /// Version string for the overall pipeline template.
232        pub pipeline_version: String,
233        /// Ordered steps that make up the pipeline.
234        pub steps: Vec<PipelineStep>,
235    }
236
237    /// Recommended `RunManifest.input_params` shape for pipeline runs (canonical JSON; no secrets).
238    #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
239    pub struct PipelineManifestInput {
240        /// Pipeline definition embedded in the run manifest.
241        pub pipeline: Pipeline,
242        /// Additional pipeline input payload.
243        pub input: serde_json::Value,
244    }
245
246    /// Pipeline planning contract (flattened composition).
247    ///
248    /// For the standard implementation that validates pipeline wiring and produces the flattened
249    /// execution graph used by `mfm-machine`, see [`crate::unstable::DefaultPipelinePlanner`].
250    pub trait PipelinePlanner: Send + Sync {
251        /// Implementations MUST:
252        /// - resolve ops via `OperationRegistry`
253        /// - ensure all `StateId`s are unique and match "<machine_id>.<step_id>.<state_local_id>"
254        /// - enforce step order by adding dependency edges between step graphs (flattened composition)
255        ///
256        /// Returns the executable plan for the supplied pipeline and run config.
257        fn build_execution_plan(
258            &self,
259            registry: Arc<dyn OperationRegistry>,
260            pipeline: &Pipeline,
261            run_config: &RunConfig,
262        ) -> Result<ExecutionPlan, SdkError>;
263    }
264}
265
266/// Contracts for launching and resuming pipeline runs.
267pub mod launcher {
268    use super::*;
269    use crate::op::OperationRegistry;
270    use crate::pipeline::{Pipeline, PipelinePlanner};
271
272    /// Start inputs for launching a pipeline run.
273    ///
274    /// This request bundles everything a launcher needs to compute the plan, persist the
275    /// manifest, and hand the run to an execution engine.
276    pub struct LaunchPipeline {
277        /// Pipeline template to execute.
278        pub pipeline: Pipeline,
279        /// Canonical JSON input passed through to the manifest.
280        pub input: serde_json::Value,
281        /// Effective run configuration for this launch.
282        pub run_config: RunConfig,
283        /// Build provenance stored in the manifest.
284        pub build: BuildProvenance,
285        /// Initial context snapshot used as the run starting point.
286        pub initial_context: Box<dyn DynContext>,
287    }
288
289    /// Run launcher contract:
290    /// - plan the pipeline via `PipelinePlanner`
291    /// - compute + store the `RunManifest` artifact (content-addressed)
292    ///   - `RunManifest.input_params` SHOULD embed `PipelineManifestInput { pipeline, input }` (no secrets)
293    /// - call `ExecutionEngine::{start,resume}`
294    ///
295    /// For the standard implementation that follows the repository manifest layout and resume
296    /// contract, see [`crate::unstable::DefaultRunLauncher`].
297    #[async_trait]
298    pub trait RunLauncher: Send + Sync {
299        /// Plans the supplied pipeline, persists its manifest, and starts a new run.
300        async fn start_pipeline(
301            &self,
302            engine: Arc<dyn ExecutionEngine>,
303            stores: Stores,
304            registry: Arc<dyn OperationRegistry>,
305            planner: Arc<dyn PipelinePlanner>,
306            req: LaunchPipeline,
307        ) -> Result<RunResult, RunError>;
308
309        /// Rebuilds the execution plan and resumes a previously started run.
310        async fn resume(
311            &self,
312            engine: Arc<dyn ExecutionEngine>,
313            stores: Stores,
314            registry: Arc<dyn OperationRegistry>,
315            planner: Arc<dyn PipelinePlanner>,
316            run_id: RunId,
317        ) -> Result<RunResult, RunError>;
318    }
319}
320
321/// Unstable helper implementations for planning and launching.
322///
323/// Not part of the stable API contract (Appendix C.2).
324pub mod unstable;