obzenflow_fsm/lib.rs
//! Async-first finite state machine (FSM) library inspired by Akka / Apache Pekko (Classic) FSM and
//! [`edfsm`](https://docs.rs/edfsm).
//!
//! `obzenflow-fsm` implements a small **Mealy-machine** core:
//! `State(S) × Event(E) → Actions(A), State(S')`.
//!
//! The engine is intentionally minimal:
//! - Transition handlers are async and return a [`Transition`].
//! - Actions are executed explicitly by the host via [`FsmAction::execute`].
//! - The [`StateMachine`] stores only the current state; the mutable context lives outside.
//!
//! This split makes it easy to build deterministic state evolution while keeping side effects
//! explicit and auditable (e.g. write-to-journal, publish-to-bus, spawn tasks).
//!
//! ## Why This Crate Exists (ObzenFlow Background)
//!
//! ObzenFlow is an async dataflow runtime. Pipeline/stage supervisors coordinate lifecycles,
//! subscriptions, backpressure, and observability while executing **user-defined code** (sources,
//! transforms, stateful operators, sinks) potentially thousands of times per second.
//!
//! That runtime context drives a few non-negotiable constraints:
//! - A "step" often needs to `await` (journals, channels, locks, timers, user code).
//! - Failures must be handled explicitly (emit an error event, drain/cleanup) rather than panic.
//! - The host needs control over effect execution (ordering, retries, batching, instrumentation).
//!
//! ## Philosophy
//!
//! ObzenFlow’s runtime makes *at-least-once* delivery the default. An event may be retried or
//! replayed and therefore observed more than once.
//!
//! When a stage has multiple upstreams, each upstream may be sequential on its own, but the
//! *interleaving across upstreams* is nondeterministic. In practice this means the combined stream
//! can be “reordered”, even if no single upstream violates its own ordering.
//!
//! `obzenflow-fsm` is designed so those realities stay visible. Transition handlers compute the
//! next state and return actions; the host executes (and can retry) actions explicitly.
//!
//! Outcomes that stay stable under duplicates, interleavings, and reshaping (batching/sharding)
//! depend on three properties: idempotence, commutativity, and associativity. Fuzzy or broken
//! guarantees around them are the "unholy trinity" of distributed-systems failures that the tests
//! essentially point at. The three properties are sufficient conditions for many dataflow
//! operators, not universal requirements (some domains are intentionally order-dependent).
//!
//! - **Idempotence**: applying the same logical input more than once has the same effect as
//!   applying it once.
//!   In practice, naively doing `credit += amount` is not idempotent under retries; it only
//!   becomes idempotent if you deduplicate by a stable key (event ID, business key, or a durable
//!   cursor). Prefer bounded/durable tracking when possible (sequence numbers, journal offsets,
//!   windowed keys). In ObzenFlow’s runtime-services, stateful stage supervisors already keep
//!   bounded per-upstream progress/lineage metadata (e.g. last seen event ID *and* vector-clock
//!   snapshots) alongside the handler’s accumulator state.
//!
//! - **Commutativity**: swapping the order of two inputs does not change the outcome.
//!   If your update is order-dependent (e.g. appending into a `Vec`), that can be correct, but
//!   then you must model ordering explicitly (total ordering / deterministic tie-breaks) instead
//!   of assuming reordering is harmless. In ObzenFlow, vector clocks capture happened-before vs
//!   concurrency, but they do **not** impose a total order on concurrent events, so they can’t be
//!   used as an ordering key by themselves.
//!
//! - **Associativity**: when you represent updates as deltas/partials with a “combine” operator,
//!   regrouping combinations does not change the result: `(a ⊕ b) ⊕ c == a ⊕ (b ⊕ c)`.
//!   This is what makes batching and parallel folding safe: combine partial aggregates in any
//!   grouping and get the same combined delta. If you can only define a sequential update (or use
//!   a non-associative operator like subtraction), then “batch then reduce” is not equivalent to
//!   “apply one-by-one”.
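//!
//! As a rough illustration of the three properties (not part of this crate), the sketch below
//! uses a hypothetical `CreditLedger` that keys each credit by event ID. It assumes a given event
//! ID always carries the same amount, and it keeps every ID forever for brevity; a real operator
//! would prefer the bounded tracking mentioned under idempotence.
//!
//! ```rust
//! use std::collections::BTreeMap;
//!
//! /// Hypothetical accumulator: one entry per event ID, so duplicates cannot double-count.
//! #[derive(Clone, Debug, Default, PartialEq)]
//! struct CreditLedger {
//!     by_event: BTreeMap<u64, i64>,
//! }
//!
//! impl CreditLedger {
//!     /// Idempotent update: a retried event ID leaves the ledger unchanged.
//!     fn apply(mut self, event_id: u64, amount: i64) -> Self {
//!         self.by_event.entry(event_id).or_insert(amount);
//!         self
//!     }
//!
//!     /// Combine operator (the `⊕` above): a keyed union of two partial ledgers.
//!     fn combine(mut self, other: &CreditLedger) -> Self {
//!         for (id, amount) in &other.by_event {
//!             self.by_event.entry(*id).or_insert(*amount);
//!         }
//!         self
//!     }
//!
//!     /// Sum of all distinct credited events.
//!     fn total(&self) -> i64 {
//!         self.by_event.values().sum()
//!     }
//! }
//!
//! /// Checks each property on a few sample events.
//! fn check_properties() {
//!     let a = CreditLedger::default().apply(1, 10);
//!     let b = CreditLedger::default().apply(2, 20);
//!     let c = CreditLedger::default().apply(3, 30);
//!
//!     // Idempotence: replaying event 1 does not change the ledger.
//!     assert_eq!(a.clone().apply(1, 10), a);
//!
//!     // Commutativity: the order of the two partials does not matter.
//!     assert_eq!(a.clone().combine(&b), b.clone().combine(&a));
//!
//!     // Associativity: regrouping the partials gives the same result.
//!     let left = a.clone().combine(&b).combine(&c);
//!     let right = a.clone().combine(&b.clone().combine(&c));
//!     assert_eq!(left, right);
//!     assert_eq!(left.total(), 60);
//! }
//! ```
//!
//! A plain `credit += amount` counter would fail the first assertion, which is the retry hazard
//! described in the idempotence bullet.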
//!
//! When a property does *not* hold, model that explicitly: carry ordering metadata, detect and
//! surface duplicates, or transition into a domain `Corrupted`/`Failed` state instead of silently
//! producing inconsistent results.
//!
//! Tip: if you care about replay/event-sourcing, keep state evolution deterministic and model
//! external effects only as actions. In replay mode you can ignore actions; in live mode you
//! execute them.
//!
//! Also apply idempotence to control/lifecycle signals where possible (e.g. receiving an `Error`
//! while already in a terminal `Failed` state should be a no-op) so retries don’t amplify failures.
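//!
//! A minimal, crate-independent sketch of both tips: a pure `step` function stands in for a
//! transition handler, replay folds over a log while discarding the returned effects, and a
//! repeated `Error` in `Failed` is a no-op. Every name here (`Stage`, `Signal`, `Effect`, `step`,
//! `replay`) is hypothetical and not part of `obzenflow-fsm`.
//!
//! ```rust
//! #[derive(Clone, Copy, Debug, PartialEq)]
//! enum Stage {
//!     Running,
//!     Failed,
//! }
//!
//! #[derive(Clone, Copy, Debug)]
//! enum Signal {
//!     Tick,
//!     Error,
//! }
//!
//! #[derive(Debug, PartialEq)]
//! enum Effect {
//!     PublishFailure,
//! }
//!
//! /// Deterministic state evolution: effects are only described here, never performed.
//! fn step(state: Stage, signal: Signal) -> (Stage, Vec<Effect>) {
//!     match (state, signal) {
//!         (Stage::Running, Signal::Tick) => (Stage::Running, vec![]),
//!         (Stage::Running, Signal::Error) => (Stage::Failed, vec![Effect::PublishFailure]),
//!         // Already terminal: any further signal, including a retried error, is a no-op.
//!         (Stage::Failed, _) => (Stage::Failed, vec![]),
//!     }
//! }
//!
//! /// Replay mode: rebuild the state from a log and ignore the returned effects.
//! fn replay(log: &[Signal]) -> Stage {
//!     log.iter()
//!         .fold(Stage::Running, |state, signal| step(state, *signal).0)
//! }
//! ```
//!
//! In live mode a host would run the same `step` and then execute the effects it returns, so both
//! modes share one definition of state evolution.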
//!
//! ## Quick start
//!
//! A tiny "door" FSM with explicit actions:
//!
//! Note on handler syntax: `fsm!` stores handlers behind trait objects, so each handler closure
//! returns a boxed pinned future. That’s why examples use `Box::pin(async move { ... })`.
//!
//! ```rust
//! use obzenflow_fsm::{fsm, types::FsmResult, FsmAction, FsmContext, Transition};
//!
//! #[derive(Clone, Debug, PartialEq, obzenflow_fsm::StateVariant)]
//! enum DoorState {
//!     Closed,
//!     Open,
//! }
//!
//! #[derive(Clone, Debug, obzenflow_fsm::EventVariant)]
//! enum DoorEvent {
//!     Open,
//!     Close,
//! }
//!
//! #[derive(Clone, Debug, PartialEq)]
//! enum DoorAction {
//!     Ring,
//!     Log(String),
//! }
//!
//! #[derive(Default)]
//! struct DoorContext {
//!     log: Vec<String>,
//! }
//!
//! impl FsmContext for DoorContext {}
//!
//! #[async_trait::async_trait]
//! impl FsmAction for DoorAction {
//!     type Context = DoorContext;
//!
//!     async fn execute(&self, ctx: &mut Self::Context) -> FsmResult<()> {
//!         match self {
//!             DoorAction::Ring => ctx.log.push("Ring!".to_string()),
//!             DoorAction::Log(msg) => ctx.log.push(msg.clone()),
//!         }
//!         Ok(())
//!     }
//! }
//!
//! #[tokio::main(flavor = "current_thread")]
//! async fn main() -> FsmResult<()> {
//!     let mut door = fsm! {
//!         state: DoorState;
//!         event: DoorEvent;
//!         context: DoorContext;
//!         action: DoorAction;
//!         initial: DoorState::Closed;
//!
//!         state DoorState::Closed {
//!             on DoorEvent::Open => |
//!                 _s: &DoorState,
//!                 _e: &DoorEvent,
//!                 _ctx: &mut DoorContext,
//!             | {
//!                 // `fsm!` expects a boxed pinned future from each handler.
//!                 Box::pin(async move {
//!                     Ok(Transition {
//!                         next_state: DoorState::Open,
//!                         actions: vec![
//!                             DoorAction::Ring,
//!                             DoorAction::Log("Door opened".into()),
//!                         ],
//!                     })
//!                 })
//!             };
//!         }
//!
//!         state DoorState::Open {
//!             on DoorEvent::Close => |
//!                 _s: &DoorState,
//!                 _e: &DoorEvent,
//!                 _ctx: &mut DoorContext,
//!             | {
//!                 Box::pin(async move {
//!                     Ok(Transition {
//!                         next_state: DoorState::Closed,
//!                         actions: vec![DoorAction::Log("Door closed".into())],
//!                     })
//!                 })
//!             };
//!         }
//!     };
//!
//!     let mut ctx = DoorContext::default();
//!
//!     let actions = door.handle(DoorEvent::Open, &mut ctx).await?;
//!     door.execute_actions(actions, &mut ctx).await?;
//!     assert_eq!(door.state(), &DoorState::Open);
//!
//!     let actions = door.handle(DoorEvent::Close, &mut ctx).await?;
//!     door.execute_actions(actions, &mut ctx).await?;
//!     assert_eq!(door.state(), &DoorState::Closed);
//!
//!     assert_eq!(
//!         ctx.log,
//!         vec![
//!             "Ring!".to_string(),
//!             "Door opened".to_string(),
//!             "Door closed".to_string()
//!         ]
//!     );
//!     Ok(())
//! }
//! ```
//!
//! The rest of this page focuses on the patterns used to run FSMs inside an async runtime.
//!
//! ## Supervisor / host loop pattern
//!
//! The FSM is usually embedded in a "host loop" (an actor/supervisor task) that:
//! 1) Receives or derives events from the outside world.
//! 2) Feeds events into the FSM via [`StateMachine::handle`].
//! 3) Executes returned actions (often sequentially).
//! 4) Decides how to handle action failures (retry, escalate, or feed an error event back in).
//!
//! In ObzenFlow's runtime-services, a supervisor typically owns the `StateMachine` plus a context
//! that holds runtime capabilities (journals, message bus handles, metrics emitters). The
//! supervision loop drives the FSM forward and treats actions as explicit, auditable effects.
//! The "hot path" where user-defined handlers run (process one input, emit zero or more outputs)
//! usually lives in the dispatch layer; the FSM primarily models lifecycle coordination and
//! produces actions like "publish running", "forward EOF", "write completion", "cleanup", etc.
//!
//! Concretely, supervisors often split into two layers:
//! - A "dispatch" loop (sometimes named `dispatch_state`) that performs state-specific I/O and
//!   decides what should happen next (e.g. `Continue`, `Transition(event)`, `Terminate`).
//! - A single-threaded FSM step that calls `handle(event, &mut context)` and then executes the
//!   returned actions, mapping action failures into explicit error events when needed.
//!
//! ```rust,no_run
//! # use obzenflow_fsm::{FsmAction, StateMachine};
//! # async fn example<S, E, C, A>(
//! #     mut machine: StateMachine<S, E, C, A>,
//! #     mut context: C,
//! # ) -> Result<(), obzenflow_fsm::FsmError>
//! # where
//! #     S: obzenflow_fsm::StateVariant,
//! #     E: obzenflow_fsm::EventVariant,
//! #     C: obzenflow_fsm::FsmContext,
//! #     A: obzenflow_fsm::FsmAction<Context = C>,
//! # {
//! loop {
//!     // Give the current state a chance to time out.
//!     let timeout_actions = machine.check_timeout(&mut context).await?;
//!     for action in timeout_actions {
//!         action.execute(&mut context).await?;
//!     }
//!
//!     // Receive an external event (channel, journal subscription, socket, ...).
//!     let event: E = todo!("receive or derive an event");
//!
//!     let actions = machine.handle(event, &mut context).await?;
//!     for action in actions {
//!         if let Err(e) = action.execute(&mut context).await {
//!             // Many supervisors map action failures into a domain event and feed it back in
//!             // so the FSM can transition into an explicit Failed/Drained state.
//!             let failure_event: E = todo!("map {e} into an error event");
//!             let failure_actions = machine.handle(failure_event, &mut context).await?;
//!             for action in failure_actions {
//!                 action.execute(&mut context).await?;
//!             }
//!         }
//!     }
//! }
//! # }
//! ```
//!
//! ## Timeouts
//!
//! Timeouts are **per-state** and **cooperative**:
//! - A timeout is configured inside a `state` block via `timeout <duration> => handler;`.
//! - The engine does not spawn timers; the host decides when to poll for timeouts by calling
//!   [`StateMachine::check_timeout`].
//! - Timeout handlers return a [`Transition`] just like normal `on Event => ...` handlers.
//!
//! The timeout countdown starts when a state is entered. Any transition into a state (including a
//! self-transition) schedules that state's timeout; transitioning into a state without a timeout
//! clears the timer.
//!
//! ```rust
//! use obzenflow_fsm::{fsm, FsmAction, FsmContext, Transition};
//! use std::time::Duration;
//!
//! #[derive(Clone, Debug, PartialEq, obzenflow_fsm::StateVariant)]
//! enum DoorState {
//!     Closed,
//!     Open,
//! }
//!
//! #[derive(Clone, Debug, obzenflow_fsm::EventVariant)]
//! enum DoorEvent {
//!     Open,
//! }
//!
//! #[derive(Clone, Debug, PartialEq)]
//! enum DoorAction {
//!     Log(String),
//! }
//!
//! #[derive(Default)]
//! struct DoorContext {
//!     log: Vec<String>,
//! }
//!
//! impl FsmContext for DoorContext {}
//!
//! #[async_trait::async_trait]
//! impl FsmAction for DoorAction {
//!     type Context = DoorContext;
//!
//!     async fn execute(&self, ctx: &mut Self::Context) -> obzenflow_fsm::types::FsmResult<()> {
//!         match self {
//!             DoorAction::Log(msg) => ctx.log.push(msg.clone()),
//!         }
//!         Ok(())
//!     }
//! }
//!
//! #[tokio::main(flavor = "current_thread")]
//! async fn main() -> Result<(), obzenflow_fsm::FsmError> {
//!     let mut door = fsm! {
//!         state: DoorState;
//!         event: DoorEvent;
//!         context: DoorContext;
//!         action: DoorAction;
//!         initial: DoorState::Closed;
//!
//!         state DoorState::Closed {
//!             on DoorEvent::Open => |_s: &DoorState, _e: &DoorEvent, _ctx: &mut DoorContext| {
//!                 Box::pin(async move {
//!                     Ok(Transition {
//!                         next_state: DoorState::Open,
//!                         actions: vec![DoorAction::Log("Opened".into())],
//!                     })
//!                 })
//!             };
//!         }
//!
//!         state DoorState::Open {
//!             timeout Duration::from_millis(10) => |_s: &DoorState, _ctx: &mut DoorContext| {
//!                 // Timeout handlers are the same idea: return a boxed pinned future.
//!                 Box::pin(async move {
//!                     Ok(Transition {
//!                         next_state: DoorState::Closed,
//!                         actions: vec![DoorAction::Log("Auto-closed".into())],
//!                     })
//!                 })
//!             };
//!         }
//!     };
//!
//!     let mut ctx = DoorContext::default();
//!
//!     // Closed -> Open
//!     let actions = door.handle(DoorEvent::Open, &mut ctx).await?;
//!     door.execute_actions(actions, &mut ctx).await?;
//!     assert_eq!(door.state(), &DoorState::Open);
//!
//!     tokio::time::sleep(Duration::from_millis(20)).await;
//!
//!     // Open -> Closed (timeout)
//!     let actions = door.check_timeout(&mut ctx).await?;
//!     door.execute_actions(actions, &mut ctx).await?;
//!     assert_eq!(door.state(), &DoorState::Closed);
//!
//!     assert_eq!(
//!         ctx.log,
//!         vec!["Opened".to_string(), "Auto-closed".to_string()]
//!     );
//!     Ok(())
//! }
//! ```
//!
//! ## Variant names and dispatch
//!
//! Transitions are looked up by `(state.variant_name(), event.variant_name())`. For enums, the
//! recommended approach is to derive these traits:
//! `#[derive(obzenflow_fsm::StateVariant, obzenflow_fsm::EventVariant)]`.
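//!
//! To make the lookup concrete, here is a crate-independent sketch. The `state_name` and
//! `event_name` functions stand in for `variant_name()`, and the `HashMap` table is a hypothetical
//! illustration; the engine's actual storage may differ. The point it shows is that a
//! payload-carrying variant such as `Active { count }` maps to a single name, so every `count`
//! value reaches the same handler.
//!
//! ```rust
//! use std::collections::HashMap;
//!
//! #[derive(Debug)]
//! enum JobState {
//!     Idle,
//!     Active { count: u32 },
//! }
//!
//! #[derive(Debug)]
//! enum JobEvent {
//!     Start,
//!     Increment,
//! }
//!
//! /// Stand-in for a state's `variant_name()`: the payload is ignored.
//! fn state_name(state: &JobState) -> &'static str {
//!     match state {
//!         JobState::Idle => "Idle",
//!         JobState::Active { .. } => "Active",
//!     }
//! }
//!
//! /// Stand-in for an event's `variant_name()`.
//! fn event_name(event: &JobEvent) -> &'static str {
//!     match event {
//!         JobEvent::Start => "Start",
//!         JobEvent::Increment => "Increment",
//!     }
//! }
//!
//! /// Hypothetical dispatch table keyed by `(state name, event name)`.
//! /// The values are labels here; a real table would hold handlers.
//! fn lookup(state: &JobState, event: &JobEvent) -> Option<&'static str> {
//!     let mut table: HashMap<(&'static str, &'static str), &'static str> = HashMap::new();
//!     table.insert(("Idle", "Start"), "start handler");
//!     table.insert(("Active", "Increment"), "increment handler");
//!     table.get(&(state_name(state), event_name(event))).copied()
//! }
//! ```
//!
//! Under this model, any branching on the payload itself belongs inside the handler, since the
//! lookup key cannot distinguish `Active { count: 1 }` from `Active { count: 99 }`.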

// Allow this crate to refer to itself via `obzenflow_fsm` so that
// proc-macro expansions using `::obzenflow_fsm::...` also work in
// the crate's own tests.
extern crate self as obzenflow_fsm;

// Module declarations
pub mod builder;
pub mod error;
pub mod handlers;
pub mod machine;
pub mod types;

// Re-export main types for convenience
pub use error::FsmError;
pub use machine::StateMachine;
pub use types::{EventVariant, FsmAction, FsmContext, StateVariant, Transition};

// Re-export handler types for advanced usage
pub use handlers::{StateHandler, TimeoutHandler, TransitionHandler};

/// Internal types used by the `fsm!` macro expansion.
///
/// This module is **not** intended for direct use; it exists so that
/// proc-macro expansions can refer to a stable, public path while
/// signalling that the underlying types are implementation details.
#[doc(hidden)]
pub mod internal {
    #[allow(deprecated)]
    pub use crate::builder::FsmBuilder;
}

// Re-export proc-macro helpers (derives + DSL).
// This allows users to write:
//   #[derive(obzenflow_fsm::StateVariant, obzenflow_fsm::EventVariant)]
//   let fsm = fsm! { ... };
pub use obzenflow_fsm_macros::{fsm, EventVariant, StateVariant};
pub use obzenflow_fsm_macros::{
    EventVariant as EventVariantDerive, StateVariant as StateVariantDerive,
};

#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;

    #[allow(dead_code)]
    #[derive(Clone, Debug, PartialEq)]
    enum TestState {
        Idle,
        Active { count: u32 },
        Done,
    }

    impl StateVariant for TestState {
        fn variant_name(&self) -> &str {
            match self {
                TestState::Idle => "Idle",
                TestState::Active { .. } => "Active",
                TestState::Done => "Done",
            }
        }
    }

    #[allow(dead_code)]
    #[derive(Clone, Debug)]
    enum TestEvent {
        Start,
        Increment,
        Finish,
    }

    impl EventVariant for TestEvent {
        fn variant_name(&self) -> &str {
            match self {
                TestEvent::Start => "Start",
                TestEvent::Increment => "Increment",
                TestEvent::Finish => "Finish",
            }
        }
    }

    #[allow(dead_code)]
    #[derive(Clone, Debug, PartialEq)]
    enum TestAction {
        Log(String),
        Notify,
    }

    struct TestContext {
        total: u32,
    }

    impl FsmContext for TestContext {
        fn describe(&self) -> String {
            format!("Test context with total: {}", self.total)
        }
    }

    #[async_trait]
    impl FsmAction for TestAction {
        type Context = TestContext;

        async fn execute(&self, _ctx: &mut Self::Context) -> crate::types::FsmResult<()> {
            match self {
                TestAction::Log(msg) => {
                    println!("Log: {msg}");
                    Ok(())
                }
                TestAction::Notify => {
                    println!("Notify");
                    Ok(())
                }
            }
        }
    }

    #[tokio::test]
    async fn test_basic_fsm() {
        let mut state_machine = crate::fsm! {
            state: TestState;
            event: TestEvent;
            context: TestContext;
            action: TestAction;
            initial: TestState::Idle;

            state TestState::Idle {
                on TestEvent::Start => |_state: &TestState, _event: &TestEvent, _ctx: &mut TestContext| {
                    Box::pin(async move {
                        Ok(Transition {
                            next_state: TestState::Active { count: 0 },
                            actions: vec![TestAction::Log("Starting".into())],
                        })
                    })
                };
            }
        };
        let mut ctx = TestContext { total: 0 };

        // Test transition
        let actions = state_machine
            .handle(TestEvent::Start, &mut ctx)
            .await
            .unwrap();
        assert_eq!(actions.len(), 1);
        assert!(matches!(
            state_machine.state(),
            TestState::Active { count: 0 }
        ));
    }
}