§Cano: Type-Safe Async Workflow Engine
Cano is an async workflow orchestration engine for Rust built on Finite State Machines (FSM). States are user-defined enums; the engine guarantees type-safe transitions between them.
Well-suited for:
- Data pipelines: ETL jobs with parallel processing (Split/Join) and aggregation
- AI agents: Multi-step inference chains with shared context
- Background systems: Scheduled maintenance, periodic reporting, cron jobs
§Quick Start
```rust
use cano::prelude::*;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step { Fetch, Process, Done }

// A typed configuration struct shared with tasks via Resources.
struct FetchConfig { batch_size: usize }

#[resource]
impl Resource for FetchConfig {}

struct FetchTask;
struct ProcessTask;

#[task]
impl Task<Step> for FetchTask {
    async fn run(&self, res: &Resources) -> Result<TaskResult<Step>, CanoError> {
        let config = res.get::<FetchConfig, _>("config")?;
        let store = res.get::<MemoryStore, _>("store")?;
        // Produce `batch_size` items and store them for the next task.
        let data: Vec<u32> = (1..=(config.batch_size as u32)).collect();
        store.put("data", data)?;
        Ok(TaskResult::Single(Step::Process))
    }
}

#[task]
impl Task<Step> for ProcessTask {
    async fn run(&self, res: &Resources) -> Result<TaskResult<Step>, CanoError> {
        let store = res.get::<MemoryStore, _>("store")?;
        let data: Vec<u32> = store.get("data")?;
        store.put("sum", data.iter().sum::<u32>())?;
        Ok(TaskResult::Single(Step::Done))
    }
}

// The statements below use `.await` and `?`, so they must run inside an
// async fn that returns a Result (for example `Result<(), CanoError>`).
let store = MemoryStore::new();
let resources = Resources::new()
    .insert("config", FetchConfig { batch_size: 3 })
    .insert("store", store.clone());

let workflow = Workflow::new(resources)
    .register(Step::Fetch, FetchTask)
    .register(Step::Process, ProcessTask)
    .add_exit_state(Step::Done);

let final_state = workflow.orchestrate(Step::Fetch).await?;
assert_eq!(final_state, Step::Done);
// The sum of 1..=3 is 6.
assert_eq!(store.get::<u32>("sum")?, 6);
```
§Resource-Free Workflows
When tasks do not need shared resources, use Workflow::bare and implement
run_bare instead of run:
```rust
use cano::prelude::*;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step { Compute, Done }

struct ComputeTask;

#[task]
impl Task<Step> for ComputeTask {
    async fn run_bare(&self) -> Result<TaskResult<Step>, CanoError> {
        // Pure computation; no resource lookup needed.
        let result: u32 = (1..=100).sum();
        assert_eq!(result, 5050);
        Ok(TaskResult::Single(Step::Done))
    }
}

// Uses `.await` and `?`: run inside an async fn that returns a Result.
let workflow = Workflow::bare()
    .register(Step::Compute, ComputeTask)
    .add_exit_state(Step::Done);

let final_state = workflow.orchestrate(Step::Compute).await?;
assert_eq!(final_state, Step::Done);
```
§Core Concepts
§Finite State Machines (FSM)
Workflows in Cano are state machines. You define your states as an enum, register
a Task handler for each state, and the engine manages transitions.
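The transition loop described above can be sketched synchronously with only the standard library. `MiniFsm`, its closure-based handlers, and the `String` error type are hypothetical stand-ins chosen to mirror `register` / `add_exit_state` / `orchestrate`; Cano's real engine is async and dispatches `Task` objects, so treat this as an illustration of the control flow, not the crate's implementation.

```rust
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::hash::Hash;

// Hypothetical, synchronous stand-in for a task: it returns the next state.
type Handler<S> = Box<dyn Fn() -> Result<S, String>>;

// Minimal FSM driver (not Cano's Workflow): run the handler for the current
// state and follow the returned transition until an exit state is reached.
struct MiniFsm<S> {
    handlers: HashMap<S, Handler<S>>,
    exits: HashSet<S>,
}

impl<S: Clone + Eq + Hash + Debug> MiniFsm<S> {
    fn new() -> Self {
        MiniFsm { handlers: HashMap::new(), exits: HashSet::new() }
    }

    fn register(mut self, state: S, handler: impl Fn() -> Result<S, String> + 'static) -> Self {
        self.handlers.insert(state, Box::new(handler));
        self
    }

    fn add_exit_state(mut self, state: S) -> Self {
        self.exits.insert(state);
        self
    }

    fn orchestrate(&self, start: S) -> Result<S, String> {
        let mut current = start;
        loop {
            // Stop as soon as an exit state is entered.
            if self.exits.contains(&current) {
                return Ok(current);
            }
            // A state without a handler is reported instead of looping forever.
            let handler = self
                .handlers
                .get(&current)
                .ok_or_else(|| format!("no handler registered for {:?}", current))?;
            current = handler()?;
        }
    }
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step { Fetch, Process, Done }
```

For example, registering `Step::Fetch -> Step::Process -> Step::Done` and starting at `Step::Fetch` ends at `Step::Done`; omitting the `Process` handler yields an error.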
§Tasks
Task is the single interface for processing logic — implement one of run /
run_bare. Several specialized task models build on it:
- RouterTask trait: side-effect-free branching; reads resources and returns the next state without writing anything. Registered with Workflow::register_router; the engine dispatches it like a normal state but writes no CheckpointRow.
- PollTask trait: “wait-until” loop; poll() returns PollOutcome::Ready or PollOutcome::Pending { delay_ms } on each call. Defaults to TaskConfig::minimal() (no retries). Registered with Workflow::register; the engine applies attempt_timeout if set via config().
- BatchTask trait: fan-out over data items via load / process_item / finish.
- SteppedTask trait: resumable iterative work via step() with a serializable cursor.
Every RouterTask, PollTask, BatchTask, and SteppedTask automatically
implements Task via companion impls emitted by the #[task::router],
#[task::poll], #[task::batch], and #[task::stepped] macros respectively.
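As an illustration of the load / process_item / finish shape, the sketch below drives a batch sequentially. `MiniBatch`, `drive_batch`, and `SquareEvens` are hypothetical names; the real `BatchTask` trait is async, its signatures may differ, and "fan-out" suggests the engine can process items concurrently rather than in a simple loop.

```rust
// Hypothetical three-phase batch trait mirroring load / process_item / finish.
trait MiniBatch {
    type Item;
    type Output;
    fn load(&self) -> Vec<Self::Item>;
    fn process_item(&self, item: &Self::Item) -> Result<Self::Output, String>;
    fn finish(&self, results: Vec<Result<Self::Output, String>>) -> String;
}

// Driver: load once, process each item, then hand every per-item result
// (successes and failures) to finish. Sequential for simplicity.
fn drive_batch<B: MiniBatch>(task: &B) -> String {
    let items = task.load();
    let results: Vec<_> = items.iter().map(|item| task.process_item(item)).collect();
    task.finish(results)
}

// Example batch: square even numbers, reject odd ones.
struct SquareEvens;

impl MiniBatch for SquareEvens {
    type Item = u32;
    type Output = u32;

    fn load(&self) -> Vec<u32> {
        vec![2, 3, 4]
    }

    fn process_item(&self, item: &u32) -> Result<u32, String> {
        if item % 2 == 0 { Ok(item * item) } else { Err(format!("{item} is odd")) }
    }

    fn finish(&self, results: Vec<Result<u32, String>>) -> String {
        // Aggregate successes and count failures.
        let ok: Vec<u32> = results.iter().filter_map(|r| r.as_ref().ok().copied()).collect();
        let failed = results.len() - ok.len();
        format!("sum={} failed={}", ok.iter().sum::<u32>(), failed)
    }
}
```

Passing per-item results to `finish`, rather than aborting on the first error, lets the aggregation step decide how to treat partial failure.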
§Parallel Execution (Split/Join)
Run tasks concurrently with Workflow::register_split and join results using
strategies: All, Any, Quorum(n), Percentage(f64), PartialResults(min),
or PartialTimeout.
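To make the strategy names more concrete, here is a hedged sketch of how a join decision could be evaluated once the parallel tasks finish. The `Join` enum and `join_satisfied` are hypothetical; the semantics are inferred from the strategy names, `Percentage` is assumed to take a fraction in [0, 1], and `PartialResults` / `PartialTimeout` are left out because their exact behavior is not described here.

```rust
// Hypothetical mirror of a subset of the join strategies (not Cano's
// JoinStrategy). Percentage is ASSUMED to be a fraction in [0, 1].
#[derive(Debug, Clone, Copy)]
enum Join {
    All,
    Any,
    Quorum(usize),
    Percentage(f64),
}

// Decide whether a split satisfies the join, given how many of the
// `total` parallel tasks succeeded.
fn join_satisfied(strategy: Join, succeeded: usize, total: usize) -> bool {
    match strategy {
        Join::All => succeeded == total,
        Join::Any => succeeded >= 1,
        Join::Quorum(n) => succeeded >= n,
        Join::Percentage(p) => total > 0 && (succeeded as f64 / total as f64) >= p,
    }
}
```

With 2 of 4 tasks succeeding, `Quorum(2)` and `Percentage(0.5)` are satisfied while `All` is not.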
§Store
MemoryStore is a thread-safe, typed key-value store backed by Arc<RwLock<HashMap>>, used to share data between states. For a custom backend, register your own concrete storage type as a resource in Resources.
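The idea behind a typed store on top of `Arc<RwLock<HashMap>>` can be sketched with the standard library: values are kept as `Box<dyn Any + Send + Sync>` and downcast on read, and cloning the handle shares the same map (as the Quick Start does with `store.clone()`). `TinyStore` is hypothetical; `MemoryStore`'s actual key type, error type, and method signatures may differ.

```rust
use std::any::Any;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Hypothetical typed store (not MemoryStore). Clones share one map via Arc.
#[derive(Clone, Default)]
struct TinyStore {
    inner: Arc<RwLock<HashMap<String, Box<dyn Any + Send + Sync>>>>,
}

impl TinyStore {
    // Insert or overwrite a value of any thread-safe 'static type.
    fn put<T: Any + Send + Sync>(&self, key: &str, value: T) {
        self.inner.write().unwrap().insert(key.to_string(), Box::new(value));
    }

    // Return a clone of the stored value if the key exists and holds a T;
    // a missing key or a wrong type is reported as an error.
    fn get<T: Any + Clone>(&self, key: &str) -> Result<T, String> {
        let map = self.inner.read().unwrap();
        let value = map.get(key).ok_or_else(|| format!("missing key: {key}"))?;
        value
            .downcast_ref::<T>()
            .cloned()
            .ok_or_else(|| format!("wrong type for key: {key}"))
    }
}
```

Returning clones keeps the read lock short; a real store might instead hand out `Arc`s for large values.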
§Observability
Beyond the optional tracing integration, implement WorkflowObserver and attach it
with Workflow::with_observer to receive synchronous lifecycle and failure callbacks
(state entry, task start/success/failure, retry, circuit-open, checkpoint, resume). With
the tracing feature enabled, TracingObserver is a ready-made observer that re-emits
those callbacks as tracing events. Resources can also report their own HealthStatus
via Resource::health, aggregated by Resources::check_all_health.
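The synchronous callback model can be pictured as a trait whose hooks default to no-ops and are called inline by the engine. `MiniObserver`, `CountingObserver`, and `run_states` are hypothetical and cover only two events; they do not reproduce `WorkflowObserver`'s real method names or arguments.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical observer trait with no-op defaults (not WorkflowObserver).
trait MiniObserver {
    fn on_state_enter(&self, _state: &str) {}
    fn on_task_failure(&self, _state: &str, _error: &str) {}
}

// Counts callbacks; atomics allow updates through a shared reference.
#[derive(Default)]
struct CountingObserver {
    entered: AtomicUsize,
    failures: AtomicUsize,
}

impl MiniObserver for CountingObserver {
    fn on_state_enter(&self, _state: &str) {
        self.entered.fetch_add(1, Ordering::Relaxed);
    }
    fn on_task_failure(&self, _state: &str, _error: &str) {
        self.failures.fetch_add(1, Ordering::Relaxed);
    }
}

// Stand-in for the engine side: each hook is called inline, in order.
fn run_states(observer: &dyn MiniObserver, states: &[(&str, Result<(), &str>)]) {
    for (state, outcome) in states {
        observer.on_state_enter(state);
        if let Err(e) = outcome {
            observer.on_task_failure(state, e);
        }
    }
}
```

Because callbacks run synchronously on the workflow's path, an observer should keep its hooks cheap, as this counter does.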
§Crash Recovery
Attach a CheckpointStore with Workflow::with_checkpoint_store (plus
Workflow::with_workflow_id) and the FSM records one CheckpointRow per state
entered — before that state’s task runs. After a crash, Workflow::resume_from
reloads the run, re-enters the FSM at the last checkpointed state, and continues
forward; the resumed state’s task re-runs, so tasks at and after the resume point must
be idempotent. The trait is backend-agnostic; with the recovery feature,
RedbCheckpointStore is a ready-made embedded, ACID implementation. Workflows
without a checkpoint store pay nothing. See the workflow_recovery example.
Workflow::with_workflow_version stamps a u32 version onto every appended
CheckpointRow (default 0); Workflow::resume_from then refuses to replay a
checkpoint whose stored version differs, returning
CanoError::WorkflowVersionMismatch. Bump it when the FSM shape changes between
deploys (added/removed states, cursor schema, compensation output layout) to avoid
silently resuming onto an incompatible workflow definition.
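The recovery rules above (one row per entered state, resume at the last row, refuse a version mismatch) can be sketched with an in-memory log. `Log`, `Row`, and `resume_point` are hypothetical; they are not the `CheckpointStore` trait or `CheckpointRow` type, and a real store persists rows durably.

```rust
// Hypothetical in-memory, append-only checkpoint log (not the crate's
// CheckpointStore trait or CheckpointRow type).
struct Row {
    state: &'static str,
    version: u32,
}

#[derive(Default)]
struct Log {
    rows: Vec<Row>,
}

impl Log {
    // Append one row per state entered, before that state's task runs.
    fn append(&mut self, state: &'static str, version: u32) {
        self.rows.push(Row { state, version });
    }

    // Resume at the last checkpointed state, refusing a version mismatch.
    // Ok(None) means there is nothing to resume.
    fn resume_point(&self, current_version: u32) -> Result<Option<&'static str>, String> {
        match self.rows.last() {
            None => Ok(None),
            Some(row) if row.version != current_version => Err(format!(
                "version mismatch: checkpoint v{}, workflow v{}",
                row.version, current_version
            )),
            Some(row) => Ok(Some(row.state)),
        }
    }
}
```

If a crash happens while the task for the last appended state is running, that state is returned and its task runs a second time, which is why tasks at and after the resume point must be idempotent.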
§Sagas & Compensation
For steps that mutate external systems, implement CompensatableTask (its run
returns the next state and an Output; its compensate undoes the step given that
Output) and register it with Workflow::register_with_compensation. The engine
keeps a per-run compensation stack; if a later state fails, it drains the stack in
reverse and runs each compensate. A clean rollback returns the original error (and
clears the checkpoint log if one is attached); a failed compensate produces
CanoError::CompensationFailed with the original error plus every compensation
error. With a checkpoint store, outputs are persisted, so a resumed run can still
compensate work done before the crash. See the saga_payment example.
§Processing Lifecycle
Task: Single run() / run_bare() method — full control over execution flow.
RouterTask: Single route() method — side-effect-free; reads resources, returns
the next state. Dispatched like a normal state but leaves no checkpoint row.
PollTask: Single poll() method — returns PollOutcome::Ready(TaskResult)
or PollOutcome::Pending { delay_ms }; the engine loops until Ready. Defaults
to TaskConfig::minimal() (no retries). Use config().with_attempt_timeout(dur) to cap
total wall-clock time; registered with Workflow::register like any other task.
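The poll lifecycle can be sketched as a loop that calls `poll` until it is ready. `Outcome` and `poll_until_ready` are hypothetical stand-ins for `PollOutcome` and the engine's loop; the sketch blocks with `std::thread::sleep` where the real engine is async, and it checks the timeout only between polls, which is an assumption about how `attempt_timeout` behaves.

```rust
use std::thread;
use std::time::{Duration, Instant};

// Hypothetical mirror of a poll result (not Cano's PollOutcome).
enum Outcome<T> {
    Ready(T),
    Pending { delay_ms: u64 },
}

// Call `poll` until it returns Ready, sleeping `delay_ms` between attempts
// and giving up once `timeout` of wall-clock time has elapsed.
fn poll_until_ready<T>(
    mut poll: impl FnMut() -> Outcome<T>,
    timeout: Duration,
) -> Result<T, String> {
    let started = Instant::now();
    loop {
        match poll() {
            Outcome::Ready(value) => return Ok(value),
            Outcome::Pending { delay_ms } => {
                if started.elapsed() >= timeout {
                    return Err(format!("timed out after {:?}", timeout));
                }
                thread::sleep(Duration::from_millis(delay_ms));
            }
        }
    }
}
```

Letting each `Pending` carry its own delay allows the polled code to back off on its own schedule, while the overall timeout bounds total wall-clock time.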
§Module Overview
- task: the Task trait, with a single run() / run_bare() method
- task::router: the RouterTask trait, side-effect-free branching via route(); registered with Workflow::register_router
- task::poll: the PollTask trait, a wait-until loop via poll(); registered with Workflow::register
- task::batch: the BatchTask trait, fan-out over data items via load / process_item / finish; registered with Workflow::register
- task::stepped: the SteppedTask trait, resumable iterative work via step() with a serializable cursor; registered with Workflow::register_stepped (persists the cursor when a checkpoint store is attached)
- workflow: Workflow, FSM orchestration with Split/Join support
- scheduler (requires the scheduler feature): Scheduler (builder) and RunningScheduler (live handle) for cron and interval scheduling
- resource: the Resource trait, the Resources dictionary, and HealthStatus, for lifecycle-aware resource management and health probes
- observer: WorkflowObserver, synchronous lifecycle/failure event hooks (and TracingObserver, behind the tracing feature)
- metrics (requires the metrics feature): a MetricsObserver plus low-cardinality counters, histograms, and gauges for workflow, task, retry, split/join, circuit-breaker, scheduler, processing-loop, recovery, and saga internals; call cano::metrics::describe() for the full list with help text and units
- recovery: CheckpointStore / CheckpointRow, an append-only checkpoint log for crash recovery (and RedbCheckpointStore, behind the recovery feature)
- saga: CompensatableTask, which pairs a forward step with a compensating action; failures roll back via Workflow::register_with_compensation
- store: MemoryStore, a typed in-memory store that implements Resource
- error: CanoError variants and the CanoResult alias
§Getting Started
- Run an example: cargo run --example workflow_simple
- Read the module docs; each module has detailed documentation and examples
- Run benchmarks: cargo bench --bench workflow_performance
§Re-exports
pub use circuit::CircuitBreaker;
pub use circuit::CircuitPolicy;
pub use circuit::CircuitState;
pub use circuit::Permit as CircuitPermit;
pub use error::CanoError;
pub use error::CanoResult;
pub use observer::WorkflowObserver;
pub use recovery::CheckpointRow;
pub use recovery::CheckpointStore;
pub use recovery::RowKind;
pub use resource::HealthStatus;
pub use resource::Resource;
pub use resource::Resources;
pub use saga::CompensatableTask;
pub use saga::ErasedCompensatable;
pub use task::batch::BatchTask;
pub use task::batch::BatchTaskObject;
pub use task::batch::DefaultBatchItem;
pub use task::batch::DefaultBatchItemOutput;
pub use task::batch::DynBatchTask;
pub use task::batch::run_batch;
pub use task::poll::DynPollTask;
pub use task::poll::PollErrorPolicy;
pub use task::poll::PollOutcome;
pub use task::poll::PollTask;
pub use task::poll::PollTaskObject;
pub use task::poll::run_poll_loop;
pub use task::router::DynRouterTask;
pub use task::router::RouterTask;
pub use task::router::RouterTaskObject;
pub use task::stepped::DefaultStepCursor;
pub use task::stepped::DynSteppedTask;
pub use task::stepped::StepOutcome;
pub use task::stepped::SteppedTask;
pub use task::stepped::SteppedTaskObject;
pub use task::stepped::run_stepped;
pub use store::MemoryStore;
pub use task::DynTask;
pub use task::RetryMode;
pub use task::Task;
pub use task::TaskConfig;
pub use task::TaskObject;
pub use task::TaskResult;
pub use workflow::JoinConfig;
pub use workflow::JoinStrategy;
pub use workflow::SplitResult;
pub use workflow::SplitTaskResult;
pub use workflow::StateEntry;
pub use workflow::Workflow;
§Modules
- circuit: Circuit Breaker - Fail-Fast Protection for Flaky Dependencies
- error: Error Handling - Clear, Actionable Error Messages
- observer: Workflow Observer - lifecycle and failure event hooks
- prelude: Simplified imports for common usage patterns
- recovery: Recovery - Append-Only Checkpoint Storage
- resource: Resources - Unified Lifecycle-Managed Dependencies
- saga: Saga / Compensation - undo successful work when a later step fails
- store: Key-Value Store Helpers for Processing Pipelines
- task: Task API - Workflow Processing Interface
- workflow: Workflow API - Build Workflows with Split/Join Support
§Attribute Macros
- checkpoint_store: Attribute macro applied to the CheckpointStore trait definition and impl CheckpointStore blocks.
- resource: Attribute macro applied to the Resource trait definition and impl Resource blocks.
- task: Attribute macro applied to Task trait definitions and impl Task blocks to rewrite async fn methods into ones returning Pin<Box<dyn Future<Output = ...> + Send + 'async_trait>>.
§Derive Macros
- FromResources: Derive macro that generates a from_resources associated function for a struct.
- Resource: Derive macro that generates an empty Resource impl for a struct.