//! Async-first finite state machine (FSM) library inspired by Akka / Apache Pekko (Classic) FSM and
//! [`edfsm`](https://docs.rs/edfsm).
//!
//! `obzenflow-fsm` implements a small **Mealy-machine** core:
//! `State(S) × Event(E) → Actions(A), State(S')`.
//!
//! The engine is intentionally minimal:
//! - Transition handlers are async and return a [`Transition`].
//! - Actions are executed explicitly by the host via [`FsmAction::execute`].
//! - The [`StateMachine`] stores only the current state; the mutable context lives outside.
//!
//! This split makes it easy to build deterministic state evolution while keeping side effects
//! explicit and auditable (e.g. write-to-journal, publish-to-bus, spawn tasks).
//!
//! ## Why This Crate Exists (ObzenFlow Background)
//!
//! ObzenFlow is an async dataflow runtime. Pipeline/stage supervisors coordinate lifecycles,
//! subscriptions, backpressure, and observability while executing **user-defined code** (sources,
//! transforms, stateful operators, sinks) potentially thousands of times per second.
//!
//! That runtime context drives a few non-negotiable constraints:
//! - A "step" often needs to `await` (journals, channels, locks, timers, user code).
//! - Failures must be handled explicitly (emit an error event, drain/cleanup) rather than panic.
//! - The host needs control over effect execution (ordering, retries, batching, instrumentation).
//!
//! ## Philosophy
//!
//! ObzenFlow’s runtime makes *at-least-once* delivery the default. An event may be retried or
//! replayed and therefore observed more than once.
//!
//! When a stage has multiple upstreams, each upstream may be sequential on its own, but the
//! *interleaving across upstreams* is nondeterministic. In practice this means the combined stream
//! can be “reordered”, even if no single upstream violates its own ordering.
//!
//! `obzenflow-fsm` is designed so those realities stay visible. Transition handlers compute the
//! next state and return actions; the host executes (and can retry) actions explicitly.
//!
//! When you want outcomes that stay stable under duplicates, interleavings, and reshaping
//! (batching/sharding), the tests are essentially probing the "unholy trinity" of
//! distributed-systems failures: fuzzy or broken idempotence, commutativity, and associativity
//! guarantees. The three properties below are sufficient conditions for many dataflow operators,
//! not universal requirements (some domains are intentionally order-dependent).
//!
//! - **Idempotence**: applying the same logical input more than once has the same effect as
//!   applying it once.
//!   In practice, naively doing `credit += amount` is not idempotent under retries; it only
//!   becomes idempotent if you deduplicate by a stable key (event ID, business key, or a durable
//!   cursor). Prefer bounded/durable tracking when possible (sequence numbers, journal offsets,
//!   windowed keys). In ObzenFlow’s runtime-services, stateful stage supervisors already keep
//!   bounded per-upstream progress/lineage metadata (e.g. last seen event ID *and* vector-clock
//!   snapshots) alongside the handler’s accumulator state.
//!
//! - **Commutativity**: swapping the order of two inputs does not change the outcome.
//!   If your update is order-dependent (e.g. appending into a `Vec`), that can be correct, but
//!   then you must model ordering explicitly (total ordering / deterministic tie-breaks) instead of
//!   assuming reordering is harmless. In ObzenFlow, vector clocks capture happened-before vs
//!   concurrency, but they do **not** impose a total order on concurrent events, so they can’t be
//!   used as an ordering key by themselves.
//!
//! - **Associativity**: when you represent updates as deltas/partials with a “combine” operator,
//!   regrouping combinations does not change the result: `(a ⊕ b) ⊕ c == a ⊕ (b ⊕ c)`.
//!   This is what makes batching and parallel folding safe: combine partial aggregates in any
//!   grouping and get the same combined delta. If you can only define a sequential update (or use a
//!   non-associative operator like subtraction), then “batch then reduce” is not equivalent to
//!   “apply one-by-one”.
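//!
//! The properties above can be sketched with the standard library alone. This is an
//! illustration, not part of `obzenflow-fsm`: `Account`, `Delta`, and their methods are
//! hypothetical names, and the unbounded `HashSet` stands in for the bounded/durable tracking
//! recommended above.
//!
//! ```rust
//! use std::collections::HashSet;
//!
//! /// Hypothetical account state that deduplicates credits by event ID.
//! #[derive(Debug, Default)]
//! pub struct Account {
//!     pub balance: u64,
//!     // Assumption: unbounded for brevity; a real operator would bound this.
//!     seen: HashSet<u64>,
//! }
//!
//! impl Account {
//!     /// Applies a credit at most once per `event_id`; returns `true` if it was applied.
//!     pub fn credit(&mut self, event_id: u64, amount: u64) -> bool {
//!         // `insert` returns `false` when the ID is already present (a retry or replay).
//!         if !self.seen.insert(event_id) {
//!             return false;
//!         }
//!         self.balance += amount;
//!         true
//!     }
//! }
//!
//! /// Hypothetical partial aggregate whose `combine` is associative and commutative.
//! #[derive(Clone, Copy, Debug, Default, PartialEq)]
//! pub struct Delta {
//!     pub count: u64,
//!     pub sum: u64,
//! }
//!
//! impl Delta {
//!     /// Wraps a single observation as a delta.
//!     pub fn of(amount: u64) -> Self {
//!         Delta { count: 1, sum: amount }
//!     }
//!
//!     /// Field-wise addition, so grouping and order of combination do not matter.
//!     pub fn combine(self, other: Delta) -> Delta {
//!         Delta {
//!             count: self.count + other.count,
//!             sum: self.sum + other.sum,
//!         }
//!     }
//! }
//! ```
//!
//! Calling `credit` twice with the same `event_id` changes the balance once, and
//! `a.combine(b).combine(c)` equals `a.combine(b.combine(c))` for any three deltas.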
//!
//! When a property does *not* hold, model that explicitly: carry ordering metadata, detect and
//! surface duplicates, or transition into a domain `Corrupted`/`Failed` state instead of silently
//! producing inconsistent results.
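//!
//! One way to make this explicit is a per-upstream sequence check. The sketch below is
//! hypothetical (the names `Progress`, `Outcome`, and `observe` are not part of this crate) and
//! assumes each upstream stamps its events with a gapless sequence number.
//!
//! ```rust
//! /// Hypothetical per-upstream progress state.
//! #[derive(Clone, Copy, Debug, PartialEq)]
//! pub enum Progress {
//!     Healthy { next_seq: u64 },
//!     Corrupted { expected: u64, got: u64 },
//! }
//!
//! /// What happened to the observed event.
//! #[derive(Clone, Copy, Debug, PartialEq)]
//! pub enum Outcome {
//!     Applied,
//!     Duplicate,
//!     Rejected,
//! }
//!
//! /// Pure transition: duplicates are surfaced, gaps move to `Corrupted`, and
//! /// `Corrupted` is sticky.
//! pub fn observe(state: Progress, seq: u64) -> (Progress, Outcome) {
//!     match state {
//!         Progress::Healthy { next_seq } if seq == next_seq => {
//!             (Progress::Healthy { next_seq: next_seq + 1 }, Outcome::Applied)
//!         }
//!         // Already seen: report it instead of applying it again.
//!         Progress::Healthy { next_seq } if seq < next_seq => (state, Outcome::Duplicate),
//!         // A gap means ordering assumptions were violated; fail loudly.
//!         Progress::Healthy { next_seq } => (
//!             Progress::Corrupted { expected: next_seq, got: seq },
//!             Outcome::Rejected,
//!         ),
//!         Progress::Corrupted { .. } => (state, Outcome::Rejected),
//!     }
//! }
//! ```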
//!
//! Tip: if you care about replay/event-sourcing, keep state evolution deterministic and model
//! external effects only as actions. In replay mode you can ignore actions; in live mode you
//! execute them.
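//!
//! The replay/live split can be sketched as follows. This is a synchronous simplification with
//! hypothetical names (`step`, `replay`, `live`); it does not use this crate's async API, and
//! "executing" an action is modelled as pushing it onto a `Vec`.
//!
//! ```rust
//! #[derive(Clone, Copy, Debug, PartialEq)]
//! pub enum State {
//!     Idle,
//!     Running,
//!     Done,
//! }
//!
//! #[derive(Clone, Copy, Debug)]
//! pub enum Event {
//!     Start,
//!     Finish,
//! }
//!
//! #[derive(Clone, Debug, PartialEq)]
//! pub enum Action {
//!     PublishRunning,
//!     WriteCompletion,
//! }
//!
//! /// Deterministic Mealy step: the next state and actions depend only on the inputs.
//! pub fn step(state: State, event: Event) -> (State, Vec<Action>) {
//!     match (state, event) {
//!         (State::Idle, Event::Start) => (State::Running, vec![Action::PublishRunning]),
//!         (State::Running, Event::Finish) => (State::Done, vec![Action::WriteCompletion]),
//!         // Unhandled pairs leave the state unchanged in this sketch.
//!         (s, _) => (s, Vec::new()),
//!     }
//! }
//!
//! /// Replay mode: rebuild state from a journal and drop the actions.
//! pub fn replay(initial: State, journal: &[Event]) -> State {
//!     journal.iter().fold(initial, |s, e| step(s, *e).0)
//! }
//!
//! /// Live mode: same state evolution, but actions are handed to the host.
//! pub fn live(initial: State, events: &[Event], effects: &mut Vec<Action>) -> State {
//!     let mut state = initial;
//!     for event in events {
//!         let (next, actions) = step(state, *event);
//!         effects.extend(actions);
//!         state = next;
//!     }
//!     state
//! }
//! ```
//!
//! Because `step` has no side effects, `replay` and `live` reach the same state for the same
//! events; only `live` produces effects.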
//!
//! Also apply idempotence to control/lifecycle signals where possible (e.g. receiving an `Error`
//! while already in a terminal `Failed` state should be a no-op) so retries don’t amplify failures.
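//!
//! A minimal sketch of that rule, using hypothetical `Lifecycle`, `Signal`, and `Effect` types
//! rather than this crate's traits: the first `Error` moves to `Failed` and emits one effect, and
//! any repeat is absorbed.
//!
//! ```rust
//! #[derive(Clone, Debug, PartialEq)]
//! pub enum Lifecycle {
//!     Running,
//!     Failed { reason: String },
//! }
//!
//! #[derive(Clone, Debug)]
//! pub enum Signal {
//!     Error(String),
//! }
//!
//! #[derive(Clone, Debug, PartialEq)]
//! pub enum Effect {
//!     PublishFailure(String),
//! }
//!
//! /// Handles a lifecycle signal; `Failed` is terminal and absorbs repeated errors.
//! pub fn on_signal(state: Lifecycle, signal: Signal) -> (Lifecycle, Vec<Effect>) {
//!     match (state, signal) {
//!         (Lifecycle::Running, Signal::Error(reason)) => (
//!             Lifecycle::Failed { reason: reason.clone() },
//!             vec![Effect::PublishFailure(reason)],
//!         ),
//!         // Already failed: keep the original reason and emit nothing.
//!         (failed @ Lifecycle::Failed { .. }, Signal::Error(_)) => (failed, Vec::new()),
//!     }
//! }
//! ```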
//!
//! ## Quick start
//!
//! Note on handler syntax: `fsm!` stores handlers behind trait objects, so each handler closure
//! returns a boxed pinned future. That’s why examples use `Box::pin(async move { ... })`.
//!
//! A tiny "door" FSM with explicit actions:
//!
//! ```rust
//! use obzenflow_fsm::{fsm, types::FsmResult, FsmAction, FsmContext, Transition};
//!
//! #[derive(Clone, Debug, PartialEq, obzenflow_fsm::StateVariant)]
//! enum DoorState {
//!     Closed,
//!     Open,
//! }
//!
//! #[derive(Clone, Debug, obzenflow_fsm::EventVariant)]
//! enum DoorEvent {
//!     Open,
//!     Close,
//! }
//!
//! #[derive(Clone, Debug, PartialEq)]
//! enum DoorAction {
//!     Ring,
//!     Log(String),
//! }
//!
//! #[derive(Default)]
//! struct DoorContext {
//!     log: Vec<String>,
//! }
//!
//! impl FsmContext for DoorContext {}
//!
//! #[async_trait::async_trait]
//! impl FsmAction for DoorAction {
//!     type Context = DoorContext;
//!
//!     async fn execute(&self, ctx: &mut Self::Context) -> FsmResult<()> {
//!         match self {
//!             DoorAction::Ring => ctx.log.push("Ring!".to_string()),
//!             DoorAction::Log(msg) => ctx.log.push(msg.clone()),
//!         }
//!         Ok(())
//!     }
//! }
//!
//! #[tokio::main(flavor = "current_thread")]
//! async fn main() -> FsmResult<()> {
//!     let mut door = fsm! {
//!         state:   DoorState;
//!         event:   DoorEvent;
//!         context: DoorContext;
//!         action:  DoorAction;
//!         initial: DoorState::Closed;
//!
//!         state DoorState::Closed {
//!             on DoorEvent::Open => |
//!                 _s: &DoorState,
//!                 _e: &DoorEvent,
//!                 _ctx: &mut DoorContext,
//!             | {
//!                 // `fsm!` expects a boxed pinned future from each handler.
//!                 Box::pin(async move {
//!                     Ok(Transition {
//!                         next_state: DoorState::Open,
//!                         actions: vec![
//!                             DoorAction::Ring,
//!                             DoorAction::Log("Door opened".into()),
//!                         ],
//!                     })
//!                 })
//!             };
//!         }
//!
//!         state DoorState::Open {
//!             on DoorEvent::Close => |
//!                 _s: &DoorState,
//!                 _e: &DoorEvent,
//!                 _ctx: &mut DoorContext,
//!             | {
//!                 Box::pin(async move {
//!                     Ok(Transition {
//!                         next_state: DoorState::Closed,
//!                         actions: vec![DoorAction::Log("Door closed".into())],
//!                     })
//!                 })
//!             };
//!         }
//!     };
//!
//!     let mut ctx = DoorContext::default();
//!
//!     let actions = door.handle(DoorEvent::Open, &mut ctx).await?;
//!     door.execute_actions(actions, &mut ctx).await?;
//!     assert_eq!(door.state(), &DoorState::Open);
//!
//!     let actions = door.handle(DoorEvent::Close, &mut ctx).await?;
//!     door.execute_actions(actions, &mut ctx).await?;
//!     assert_eq!(door.state(), &DoorState::Closed);
//!
//!     assert_eq!(
//!         ctx.log,
//!         vec![
//!             "Ring!".to_string(),
//!             "Door opened".to_string(),
//!             "Door closed".to_string()
//!         ]
//!     );
//!     Ok(())
//! }
//! ```
//!
//! The rest of this page focuses on the patterns used to run FSMs inside an async runtime.
//!
//! ## Supervisor / host loop pattern
//!
//! The FSM is usually embedded in a "host loop" (an actor/supervisor task) that:
//! 1) Receives or derives events from the outside world.
//! 2) Feeds events into the FSM via [`StateMachine::handle`].
//! 3) Executes returned actions (often sequentially).
//! 4) Decides how to handle action failures (retry, escalate, or feed an error event back in).
//!
//! In ObzenFlow's runtime-services, a supervisor typically owns the `StateMachine` plus a context
//! that holds runtime capabilities (journals, message bus handles, metrics emitters). The
//! supervision loop drives the FSM forward and treats actions as explicit, auditable effects.
//! The "hot path" where user-defined handlers run (process one input, emit zero or more outputs)
//! usually lives in the dispatch layer; the FSM primarily models lifecycle coordination and
//! produces actions like "publish running", "forward EOF", "write completion", "cleanup", etc.
//!
//! Concretely, supervisors often split into two layers:
//! - A "dispatch" loop (sometimes named `dispatch_state`) that performs state-specific I/O and
//!   decides what should happen next (e.g. `Continue`, `Transition(event)`, `Terminate`).
//! - A single-threaded FSM step that calls `handle(event, &mut context)` and then executes the
//!   returned actions, mapping action failures into explicit error events when needed.
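//!
//! The two layers above can be sketched without the crate's types. Everything here is
//! hypothetical and synchronous for brevity: `Directive`, `Stage`, `StageEvent`, `step`, and
//! `run` are illustrative names, and a slice of strings stands in for real I/O.
//!
//! ```rust
//! /// What the dispatch layer asks the supervisor to do next.
//! #[derive(Debug, PartialEq)]
//! pub enum Directive<E> {
//!     Continue,
//!     Transition(E),
//!     Terminate,
//! }
//!
//! #[derive(Clone, Copy, Debug, PartialEq)]
//! pub enum Stage {
//!     Running,
//!     Draining,
//!     Stopped,
//! }
//!
//! #[derive(Clone, Copy, Debug, PartialEq)]
//! pub enum StageEvent {
//!     UpstreamEof,
//!     Drained,
//! }
//!
//! /// Dispatch layer: state-specific work that decides what should happen next.
//! pub fn dispatch_state(state: Stage, input: Option<&str>) -> Directive<StageEvent> {
//!     match (state, input) {
//!         // Hot path: a record was processed, stay in the same state.
//!         (Stage::Running, Some(_record)) => Directive::Continue,
//!         (Stage::Running, None) => Directive::Transition(StageEvent::UpstreamEof),
//!         (Stage::Draining, _) => Directive::Transition(StageEvent::Drained),
//!         (Stage::Stopped, _) => Directive::Terminate,
//!     }
//! }
//!
//! /// FSM layer: lifecycle transitions only.
//! pub fn step(state: Stage, event: StageEvent) -> Stage {
//!     match (state, event) {
//!         (Stage::Running, StageEvent::UpstreamEof) => Stage::Draining,
//!         (Stage::Draining, StageEvent::Drained) => Stage::Stopped,
//!         (s, _) => s,
//!     }
//! }
//!
//! /// Supervisor loop: returns the final state and the number of records processed.
//! pub fn run(inputs: &[&str]) -> (Stage, usize) {
//!     let mut state = Stage::Running;
//!     let mut processed = 0;
//!     let mut inputs = inputs.iter();
//!     loop {
//!         // Only the `Running` state reads input in this sketch.
//!         let next_input = if state == Stage::Running {
//!             inputs.next().copied()
//!         } else {
//!             None
//!         };
//!         match dispatch_state(state, next_input) {
//!             Directive::Continue => processed += 1,
//!             Directive::Transition(event) => state = step(state, event),
//!             Directive::Terminate => return (state, processed),
//!         }
//!     }
//! }
//! ```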
//!
//! ```rust,no_run
//! # use obzenflow_fsm::{FsmAction, StateMachine};
//! # async fn example<S, E, C, A>(
//! #   mut machine: StateMachine<S, E, C, A>,
//! #   mut context: C,
//! # ) -> Result<(), obzenflow_fsm::FsmError>
//! # where
//! #   S: obzenflow_fsm::StateVariant,
//! #   E: obzenflow_fsm::EventVariant,
//! #   C: obzenflow_fsm::FsmContext,
//! #   A: obzenflow_fsm::FsmAction<Context = C>,
//! # {
//! loop {
//!     // Give the current state a chance to time out.
//!     let timeout_actions = machine.check_timeout(&mut context).await?;
//!     for action in timeout_actions {
//!         action.execute(&mut context).await?;
//!     }
//!
//!     // Receive an external event (channel, journal subscription, socket, ...).
//!     let event: E = todo!("receive or derive an event");
//!
//!     let actions = machine.handle(event, &mut context).await?;
//!     for action in actions {
//!         if let Err(e) = action.execute(&mut context).await {
//!             // Many supervisors map action failures into a domain event and feed it back in
//!             // so the FSM can transition into an explicit Failed/Drained state.
//!             let failure_event: E = todo!("map {e} into an error event");
//!             let failure_actions = machine.handle(failure_event, &mut context).await?;
//!             for action in failure_actions {
//!                 action.execute(&mut context).await?;
//!             }
//!         }
//!     }
//! }
//! # }
//! ```
//!
//! ## Timeouts
//!
//! Timeouts are **per-state** and **cooperative**:
//! - A timeout is configured inside a `state` block via `timeout <duration> => handler;`.
//! - The engine does not spawn timers; the host decides when to poll for timeouts by calling
//!   [`StateMachine::check_timeout`].
//! - Timeout handlers return a [`Transition`] just like normal `on Event => ...` handlers.
//!
//! The timeout countdown starts when a state is entered. Any transition into a state (including a
//! self-transition) schedules that state's timeout; transitioning into a state without a timeout
//! clears the timer.
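//!
//! The scheduling rules above can be modelled with a single optional deadline. The sketch below
//! is not the engine's implementation: `TimeoutTracker` and its methods are hypothetical, time is
//! passed in explicitly so the behaviour is easy to test, and treating `now == deadline` as
//! expired is an assumption.
//!
//! ```rust
//! use std::time::{Duration, Instant};
//!
//! /// Hypothetical cooperative timeout bookkeeping: no timers are spawned.
//! #[derive(Debug, Default)]
//! pub struct TimeoutTracker {
//!     deadline: Option<Instant>,
//! }
//!
//! impl TimeoutTracker {
//!     /// Call on every transition into a state, including self-transitions.
//!     /// A state without a timeout (`None`) clears any pending deadline.
//!     pub fn enter_state(&mut self, now: Instant, timeout: Option<Duration>) {
//!         self.deadline = timeout.map(|d| now + d);
//!     }
//!
//!     /// Polled by the host; reports whether the current state's timeout has elapsed.
//!     pub fn is_expired(&self, now: Instant) -> bool {
//!         matches!(self.deadline, Some(deadline) if now >= deadline)
//!     }
//! }
//! ```
//!
//! Re-entering the same state calls `enter_state` again, which pushes the deadline forward; that
//! is the self-transition case described above.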
//!
//! ```rust
//! use obzenflow_fsm::{fsm, FsmAction, FsmContext, Transition};
//! use std::time::Duration;
//!
//! #[derive(Clone, Debug, PartialEq, obzenflow_fsm::StateVariant)]
//! enum DoorState {
//!     Closed,
//!     Open,
//! }
//!
//! #[derive(Clone, Debug, obzenflow_fsm::EventVariant)]
//! enum DoorEvent {
//!     Open,
//! }
//!
//! #[derive(Clone, Debug, PartialEq)]
//! enum DoorAction {
//!     Log(String),
//! }
//!
//! #[derive(Default)]
//! struct DoorContext {
//!     log: Vec<String>,
//! }
//!
//! impl FsmContext for DoorContext {}
//!
//! #[async_trait::async_trait]
//! impl FsmAction for DoorAction {
//!     type Context = DoorContext;
//!
//!     async fn execute(&self, ctx: &mut Self::Context) -> obzenflow_fsm::types::FsmResult<()> {
//!         match self {
//!             DoorAction::Log(msg) => ctx.log.push(msg.clone()),
//!         }
//!         Ok(())
//!     }
//! }
//!
//! #[tokio::main(flavor = "current_thread")]
//! async fn main() -> Result<(), obzenflow_fsm::FsmError> {
//!     let mut door = fsm! {
//!         state:   DoorState;
//!         event:   DoorEvent;
//!         context: DoorContext;
//!         action:  DoorAction;
//!         initial: DoorState::Closed;
//!
//!         state DoorState::Closed {
//!             on DoorEvent::Open => |_s: &DoorState, _e: &DoorEvent, _ctx: &mut DoorContext| {
//!                 Box::pin(async move {
//!                     Ok(Transition {
//!                         next_state: DoorState::Open,
//!                         actions: vec![DoorAction::Log("Opened".into())],
//!                     })
//!                 })
//!             };
//!         }
//!
//!         state DoorState::Open {
//!             timeout Duration::from_millis(10) => |_s: &DoorState, _ctx: &mut DoorContext| {
//!                 // Timeout handlers are the same idea: return a boxed pinned future.
//!                 Box::pin(async move {
//!                     Ok(Transition {
//!                         next_state: DoorState::Closed,
//!                         actions: vec![DoorAction::Log("Auto-closed".into())],
//!                     })
//!                 })
//!             };
//!         }
//!     };
//!
//!     let mut ctx = DoorContext::default();
//!
//!     // Closed -> Open
//!     let actions = door.handle(DoorEvent::Open, &mut ctx).await?;
//!     door.execute_actions(actions, &mut ctx).await?;
//!     assert_eq!(door.state(), &DoorState::Open);
//!
//!     tokio::time::sleep(Duration::from_millis(20)).await;
//!
//!     // Open -> Closed (timeout)
//!     let actions = door.check_timeout(&mut ctx).await?;
//!     door.execute_actions(actions, &mut ctx).await?;
//!     assert_eq!(door.state(), &DoorState::Closed);
//!
//!     assert_eq!(
//!         ctx.log,
//!         vec!["Opened".to_string(), "Auto-closed".to_string()]
//!     );
//!     Ok(())
//! }
//! ```
//!
//! ## Variant names and dispatch
//!
//! Transitions are looked up by `(state.variant_name(), event.variant_name())`. For enums, the
//! recommended approach is to derive these traits:
//! `#[derive(obzenflow_fsm::StateVariant, obzenflow_fsm::EventVariant)]`.
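//!
//! One consequence of name-based lookup is that a variant's payload does not affect which
//! handler runs. The sketch below illustrates the idea with a plain `HashMap`; it is not the
//! crate's internal table. `Handler`, `build_table`, and `dispatch` are hypothetical, and
//! returning `None` for an unregistered pair is an assumption of this sketch.
//!
//! ```rust
//! use std::collections::HashMap;
//!
//! #[derive(Clone, Debug, PartialEq)]
//! pub enum State {
//!     Idle,
//!     Active { count: u32 },
//! }
//!
//! #[derive(Clone, Debug)]
//! pub enum Event {
//!     Start,
//!     Increment,
//! }
//!
//! impl State {
//!     /// Name of the variant, ignoring any payload.
//!     pub fn variant_name(&self) -> &'static str {
//!         match self {
//!             State::Idle => "Idle",
//!             State::Active { .. } => "Active",
//!         }
//!     }
//! }
//!
//! impl Event {
//!     pub fn variant_name(&self) -> &'static str {
//!         match self {
//!             Event::Start => "Start",
//!             Event::Increment => "Increment",
//!         }
//!     }
//! }
//!
//! pub type Handler = fn(&State, &Event) -> State;
//!
//! fn on_start(_state: &State, _event: &Event) -> State {
//!     State::Active { count: 0 }
//! }
//!
//! fn on_increment(state: &State, _event: &Event) -> State {
//!     match state {
//!         State::Active { count } => State::Active { count: count + 1 },
//!         other => other.clone(),
//!     }
//! }
//!
//! /// Registers one handler per `(state name, event name)` pair.
//! pub fn build_table() -> HashMap<(&'static str, &'static str), Handler> {
//!     let mut table: HashMap<(&'static str, &'static str), Handler> = HashMap::new();
//!     table.insert(("Idle", "Start"), on_start);
//!     table.insert(("Active", "Increment"), on_increment);
//!     table
//! }
//!
//! /// Looks up the handler by variant names and applies it, if one is registered.
//! pub fn dispatch(
//!     table: &HashMap<(&'static str, &'static str), Handler>,
//!     state: &State,
//!     event: &Event,
//! ) -> Option<State> {
//!     table
//!         .get(&(state.variant_name(), event.variant_name()))
//!         .map(|handler| handler(state, event))
//! }
//! ```
//!
//! Here `Active { count: 0 }` and `Active { count: 5 }` both resolve to the `"Active"` key, so
//! they share the same `Increment` handler.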

// Allow this crate to refer to itself via `obzenflow_fsm` so that
// proc-macro expansions using `::obzenflow_fsm::...` also work in
// the crate's own tests.
extern crate self as obzenflow_fsm;

// Module declarations
pub mod builder;
pub mod error;
pub mod handlers;
pub mod machine;
pub mod types;

// Re-export main types for convenience
pub use error::FsmError;
pub use machine::StateMachine;
pub use types::{EventVariant, FsmAction, FsmContext, StateVariant, Transition};

// Re-export handler types for advanced usage
pub use handlers::{StateHandler, TimeoutHandler, TransitionHandler};

/// Internal types used by the `fsm!` macro expansion.
///
/// This module is **not** intended for direct use; it exists so that
/// proc-macro expansions can refer to a stable, public path while
/// signalling that the underlying types are implementation details.
#[doc(hidden)]
pub mod internal {
    #[allow(deprecated)]
    pub use crate::builder::FsmBuilder;
}

// Re-export proc-macro helpers (derives + DSL).
// This allows users to write:
//   #[derive(obzenflow_fsm::StateVariant, obzenflow_fsm::EventVariant)]
//   let fsm = fsm! { ... };
pub use obzenflow_fsm_macros::{fsm, EventVariant, StateVariant};
pub use obzenflow_fsm_macros::{
    EventVariant as EventVariantDerive, StateVariant as StateVariantDerive,
};

#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;

    #[allow(dead_code)]
    #[derive(Clone, Debug, PartialEq)]
    enum TestState {
        Idle,
        Active { count: u32 },
        Done,
    }

    impl StateVariant for TestState {
        fn variant_name(&self) -> &str {
            match self {
                TestState::Idle => "Idle",
                TestState::Active { .. } => "Active",
                TestState::Done => "Done",
            }
        }
    }

    #[allow(dead_code)]
    #[derive(Clone, Debug)]
    enum TestEvent {
        Start,
        Increment,
        Finish,
    }

    impl EventVariant for TestEvent {
        fn variant_name(&self) -> &str {
            match self {
                TestEvent::Start => "Start",
                TestEvent::Increment => "Increment",
                TestEvent::Finish => "Finish",
            }
        }
    }

    #[allow(dead_code)]
    #[derive(Clone, Debug, PartialEq)]
    enum TestAction {
        Log(String),
        Notify,
    }

    struct TestContext {
        total: u32,
    }

    impl FsmContext for TestContext {
        fn describe(&self) -> String {
            format!("Test context with total: {}", self.total)
        }
    }

    #[async_trait]
    impl FsmAction for TestAction {
        type Context = TestContext;

        async fn execute(&self, _ctx: &mut Self::Context) -> crate::types::FsmResult<()> {
            match self {
                TestAction::Log(msg) => {
                    println!("Log: {msg}");
                    Ok(())
                }
                TestAction::Notify => {
                    println!("Notify");
                    Ok(())
                }
            }
        }
    }

    #[tokio::test]
    async fn test_basic_fsm() {
        let mut state_machine = crate::fsm! {
            state:   TestState;
            event:   TestEvent;
            context: TestContext;
            action:  TestAction;
            initial: TestState::Idle;

            state TestState::Idle {
                on TestEvent::Start => |_state: &TestState, _event: &TestEvent, _ctx: &mut TestContext| {
                    Box::pin(async move {
                        Ok(Transition {
                            next_state: TestState::Active { count: 0 },
                            actions: vec![TestAction::Log("Starting".into())],
                        })
                    })
                };
            }
        };
        let mut ctx = TestContext { total: 0 };

        // Test transition
        let actions = state_machine
            .handle(TestEvent::Start, &mut ctx)
            .await
            .unwrap();
        assert_eq!(actions.len(), 1);
        assert!(matches!(
            state_machine.state(),
            TestState::Active { count: 0 }
        ));
    }
}
512        ));
513    }
514}