
Crate cano

§Cano: Type-Safe Async Workflow Engine

Cano is an async workflow orchestration engine for Rust built on Finite State Machines (FSM). States are user-defined enums; the engine guarantees type-safe transitions between them.

Well-suited for:

  • Data pipelines: ETL jobs with parallel processing (Split/Join) and aggregation
  • AI agents: Multi-step inference chains with shared context
  • Background systems: Scheduled maintenance, periodic reporting, cron jobs

§Quick Start

use cano::prelude::*;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step { Fetch, Process, Done }

// A typed configuration struct shared with tasks via Resources.
struct FetchConfig { batch_size: usize }
#[resource]
impl Resource for FetchConfig {}

struct FetchTask;
struct ProcessTask;

#[task]
impl Task<Step> for FetchTask {
    async fn run(&self, res: &Resources) -> Result<TaskResult<Step>, CanoError> {
        let config = res.get::<FetchConfig, _>("config")?;
        let store = res.get::<MemoryStore, _>("store")?;
        // Produce `batch_size` items and store them for the next task.
        let data: Vec<u32> = (1..=(config.batch_size as u32)).collect();
        store.put("data", data)?;
        Ok(TaskResult::Single(Step::Process))
    }
}

#[task]
impl Task<Step> for ProcessTask {
    async fn run(&self, res: &Resources) -> Result<TaskResult<Step>, CanoError> {
        let store = res.get::<MemoryStore, _>("store")?;
        let data: Vec<u32> = store.get("data")?;
        store.put("sum", data.iter().sum::<u32>())?;
        Ok(TaskResult::Single(Step::Done))
    }
}

// Entry point so the example runs outside a doc test. Assumes the tokio runtime
// (with its `macros` and `rt-multi-thread` features) as the async executor.
#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let store = MemoryStore::new();
    let resources = Resources::new()
        .insert("config", FetchConfig { batch_size: 3 })
        .insert("store", store.clone());
    let workflow = Workflow::new(resources)
        .register(Step::Fetch, FetchTask)
        .register(Step::Process, ProcessTask)
        .add_exit_state(Step::Done);

    let final_state = workflow.orchestrate(Step::Fetch).await?;
    assert_eq!(final_state, Step::Done);

    // The sum of 1..=3 is 6.
    assert_eq!(store.get::<u32>("sum")?, 6);
    Ok(())
}

§Resource-Free Workflows

When tasks do not need shared resources, use Workflow::bare and implement run_bare instead of run:

use cano::prelude::*;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step { Compute, Done }

struct ComputeTask;

#[task]
impl Task<Step> for ComputeTask {
    async fn run_bare(&self) -> Result<TaskResult<Step>, CanoError> {
        // Pure computation — no resource lookup needed.
        let result: u32 = (1..=100).sum();
        assert_eq!(result, 5050);
        Ok(TaskResult::Single(Step::Done))
    }
}

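// Entry point so the example runs outside a doc test. Assumes the tokio runtime
// (with its `macros` and `rt-multi-thread` features) as the async executor.
#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let workflow = Workflow::bare()
        .register(Step::Compute, ComputeTask)
        .add_exit_state(Step::Done);

    let final_state = workflow.orchestrate(Step::Compute).await?;
    assert_eq!(final_state, Step::Done);
    Ok(())
}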

§Core Concepts

§Finite State Machines (FSM)

Workflows in Cano are state machines. You define your states as an enum, register a Task handler for each state, and the engine manages transitions.
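The loop at the heart of this model can be sketched in plain Rust. The following is a minimal, synchronous, std-only sketch, not Cano's implementation. `MiniWorkflow`, its closure-based handlers, and its `String` errors are hypothetical stand-ins that only mirror the register / add_exit_state / orchestrate shape from the Quick Start.

```rust
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::hash::Hash;

// Hypothetical minimal FSM engine: one handler per state, loop until an exit state.
struct MiniWorkflow<S> {
    handlers: HashMap<S, Box<dyn Fn() -> Result<S, String>>>,
    exits: HashSet<S>,
}

impl<S: Debug + Eq + Hash> MiniWorkflow<S> {
    fn new() -> Self {
        Self { handlers: HashMap::new(), exits: HashSet::new() }
    }

    // Associate a handler (returning the next state) with a state.
    fn register(mut self, state: S, handler: impl Fn() -> Result<S, String> + 'static) -> Self {
        self.handlers.insert(state, Box::new(handler));
        self
    }

    fn add_exit_state(mut self, state: S) -> Self {
        self.exits.insert(state);
        self
    }

    // Drive the machine from `start` and return the exit state reached.
    fn orchestrate(&self, start: S) -> Result<S, String> {
        let mut current = start;
        while !self.exits.contains(&current) {
            let handler = self
                .handlers
                .get(&current)
                .ok_or_else(|| format!("no handler registered for {:?}", current))?;
            current = handler()?;
        }
        Ok(current)
    }
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step {
    Fetch,
    Process,
    Done,
}
```

A state with no registered handler that is also not an exit state ends the run with an error rather than looping forever.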

§Tasks

Task is the single interface for processing logic: implement one of run or run_bare. Several specialized task models build on it: RouterTask, PollTask, BatchTask, and SteppedTask (each described under Module Overview).

Every RouterTask, PollTask, BatchTask, and SteppedTask automatically implements Task via companion impls emitted by the #[task::router], #[task::poll], #[task::batch], and #[task::stepped] macros respectively.
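A hand-written example shows how a specialized model can reduce to the general one. In this std-only sketch, `Task`, `RouterTask`, `SizeRouter`, and `State` are simplified, hypothetical, synchronous stand-ins (Cano's traits are async and return TaskResult). The last impl is the kind of per-type companion impl such a macro could emit.

```rust
// Hypothetical general interface: every task yields the next state.
trait Task<S> {
    fn run(&self) -> Result<S, String>;
}

// Hypothetical specialized model: side-effect-free routing.
trait RouterTask<S> {
    fn route(&self) -> S;
}

#[derive(Debug, Clone, PartialEq)]
enum State {
    Small,
    Large,
}

struct SizeRouter {
    size: usize,
}

impl RouterTask<State> for SizeRouter {
    fn route(&self) -> State {
        if self.size > 100 { State::Large } else { State::Small }
    }
}

// Companion impl for this one type, so an engine only ever needs to drive `Task`.
impl Task<State> for SizeRouter {
    fn run(&self) -> Result<State, String> {
        Ok(self.route())
    }
}
```

A single blanket impl such as `impl<S, T: RouterTask<S>> Task<S> for T` would work for one model. Rust's coherence rules, however, reject a second overlapping blanket impl for PollTask, BatchTask, or SteppedTask, which may be one reason the impls are generated per type.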

§Parallel Execution (Split/Join)

Run tasks concurrently with Workflow::register_split and join results using strategies: All, Any, Quorum(n), Percentage(f64), PartialResults(min), or PartialTimeout.
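One way the count-based strategies could decide whether a split succeeded is sketched below. This illustration rests on assumptions and is not Cano's JoinStrategy. The `Join` enum and `join_satisfied` are hypothetical, and Percentage is assumed to take a fraction in 0.0..=1.0. PartialTimeout is left out because it depends on elapsed time rather than on counts alone.

```rust
// Hypothetical mirror of the count-based join strategies (PartialTimeout omitted).
#[derive(Debug, Clone, Copy)]
enum Join {
    All,
    Any,
    Quorum(usize),
    Percentage(f64),
    PartialResults(usize),
}

// Decide whether a split succeeded given how many of `total` branches succeeded.
fn join_satisfied(strategy: Join, successes: usize, total: usize) -> bool {
    match strategy {
        Join::All => successes == total,
        Join::Any => successes >= 1,
        Join::Quorum(n) => successes >= n,
        // Assumes the percentage is expressed as a fraction in 0.0..=1.0.
        Join::Percentage(p) => total > 0 && (successes as f64 / total as f64) >= p,
        Join::PartialResults(min) => successes >= min,
    }
}
```

In this sketch Quorum and PartialResults count successes the same way. It does not model what happens to branches still running or how partial results are collected, which is where such strategies would differ.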

§Store

MemoryStore is a thread-safe, typed key-value store backed by Arc<RwLock<HashMap>>, used for sharing data between states. For a custom backend, register your own concrete storage type as a resource in Resources.
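The pattern behind a typed store over a shared map can be sketched with the standard library alone. Values are type-erased as `dyn Any` and downcast on read. `TypedStore` and its `put` / `get` signatures are hypothetical and are not MemoryStore's actual API; errors are plain strings.

```rust
use std::any::Any;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Hypothetical typed store: cloning the handle shares the same underlying map.
#[derive(Clone, Default)]
struct TypedStore {
    inner: Arc<RwLock<HashMap<String, Box<dyn Any + Send + Sync>>>>,
}

impl TypedStore {
    // Insert (or overwrite) a value of any sendable, 'static type.
    fn put<T: Any + Send + Sync>(&self, key: &str, value: T) {
        self.inner.write().unwrap().insert(key.to_string(), Box::new(value));
    }

    // Return a clone of the stored value, or an error if the key is missing
    // or holds a value of a different type.
    fn get<T: Any + Clone>(&self, key: &str) -> Result<T, String> {
        let map = self.inner.read().unwrap();
        let value = map.get(key).ok_or_else(|| format!("missing key: {key}"))?;
        value
            .downcast_ref::<T>()
            .cloned()
            .ok_or_else(|| format!("type mismatch for key: {key}"))
    }
}
```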

§Observability

Beyond the optional tracing integration, implement WorkflowObserver and attach it with Workflow::with_observer to receive synchronous lifecycle and failure callbacks (state entry, task start/success/failure, retry, circuit-open, checkpoint, resume). With the tracing feature enabled, TracingObserver is a ready-made observer that re-emits those callbacks as tracing events. Resources can also report their own HealthStatus via Resource::health, aggregated by Resources::check_all_health.
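Synchronous observer hooks can be sketched under the assumption of a simplified trait. `Observer`, `RecordingObserver`, and `run_state` are hypothetical, and only two of the callbacks listed above are modeled. Default no-op methods let an implementor override only the events it cares about.

```rust
use std::sync::Mutex;

// Hypothetical observer trait with default no-op hooks.
trait Observer {
    fn on_state_enter(&self, _state: &str) {}
    fn on_task_failure(&self, _state: &str, _error: &str) {}
}

// Records events in order; the Mutex makes it usable through a shared reference.
#[derive(Default)]
struct RecordingObserver {
    events: Mutex<Vec<String>>,
}

impl Observer for RecordingObserver {
    fn on_state_enter(&self, state: &str) {
        self.events.lock().unwrap().push(format!("enter:{state}"));
    }

    fn on_task_failure(&self, state: &str, error: &str) {
        self.events.lock().unwrap().push(format!("fail:{state}:{error}"));
    }
}

// Hypothetical engine step: hooks fire synchronously, inline with the task.
fn run_state(
    observer: &dyn Observer,
    state: &str,
    task: impl Fn() -> Result<(), String>,
) -> Result<(), String> {
    observer.on_state_enter(state);
    task().map_err(|e| {
        observer.on_task_failure(state, &e);
        e
    })
}
```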

§Crash Recovery

Attach a CheckpointStore with Workflow::with_checkpoint_store (plus Workflow::with_workflow_id) and the FSM records one CheckpointRow per state entered — before that state’s task runs. After a crash, Workflow::resume_from reloads the run, re-enters the FSM at the last checkpointed state, and continues forward; the resumed state’s task re-runs, so tasks at and after the resume point must be idempotent. The trait is backend-agnostic; with the recovery feature, RedbCheckpointStore is a ready-made embedded, ACID implementation. Workflows without a checkpoint store pay nothing. See the workflow_recovery example.
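An in-memory log can show the ordering guarantee (checkpoint first, then run) and why resumed tasks must be idempotent. `Row`, `MemLog`, and `run_chain` are hypothetical stand-ins for CheckpointRow and a CheckpointStore, and the workflow is simplified to a linear chain of named states.

```rust
// Hypothetical checkpoint row: one per state entered, appended before that state's task runs.
#[derive(Debug, Clone, PartialEq)]
struct Row {
    workflow_id: String,
    sequence: u64, // position in the log
    state: String,
}

// In-memory stand-in for a checkpoint store; Cano's CheckpointStore has its own API.
#[derive(Default)]
struct MemLog {
    rows: Vec<Row>,
}

impl MemLog {
    fn append(&mut self, row: Row) {
        self.rows.push(row);
    }

    // The resume point is the most recently checkpointed state for this run.
    fn last_state(&self, workflow_id: &str) -> Option<String> {
        self.rows
            .iter()
            .rev()
            .find(|r| r.workflow_id == workflow_id)
            .map(|r| r.state.clone())
    }
}

// Walks `chain` from `start`, checkpointing each state before its task "runs".
// `crash_at` simulates a crash after that state's checkpoint is written but before
// its task completes. Returns the states whose tasks completed.
fn run_chain(log: &mut MemLog, id: &str, chain: &[&str], start: &str, crash_at: Option<&str>) -> Vec<String> {
    let start_idx = chain.iter().position(|s| *s == start).expect("unknown start state");
    let mut completed = Vec::new();
    for state in &chain[start_idx..] {
        let sequence = log.rows.len() as u64;
        log.append(Row { workflow_id: id.to_string(), sequence, state: state.to_string() });
        if crash_at == Some(*state) {
            return completed; // simulated crash
        }
        completed.push(state.to_string());
    }
    completed
}
```

Because the row for Process is written before the simulated crash, resuming re-enters Process and runs its task a second time.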

Workflow::with_workflow_version stamps a u32 version onto every appended CheckpointRow (default 0); Workflow::resume_from then refuses to replay a checkpoint whose stored version differs, returning CanoError::WorkflowVersionMismatch. Bump it when the FSM shape changes between deploys (added/removed states, cursor schema, compensation output layout) to avoid silently resuming onto an incompatible workflow definition.
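The version guard amounts to an equality check on resume. `check_resume_version` and `ResumeError` are hypothetical; in Cano the mismatch surfaces as CanoError::WorkflowVersionMismatch.

```rust
// Hypothetical error mirroring the stored-vs-current version mismatch.
#[derive(Debug, PartialEq)]
enum ResumeError {
    VersionMismatch { stored: u32, current: u32 },
}

// Refuse to replay a checkpoint written by a different workflow version.
fn check_resume_version(stored: u32, current: u32) -> Result<(), ResumeError> {
    if stored == current {
        Ok(())
    } else {
        Err(ResumeError::VersionMismatch { stored, current })
    }
}
```

With the default version of 0 on both sides, unversioned runs keep resuming as before.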

§Sagas & Compensation

For steps that mutate external systems, implement CompensatableTask (its run returns the next state and an Output; its compensate undoes the step given that Output) and register it with Workflow::register_with_compensation. The engine keeps a per-run compensation stack; if a later state fails, it drains the stack in reverse and runs each compensate. A clean rollback returns the original error (and clears the checkpoint log if one is attached); a failed compensate produces CanoError::CompensationFailed with the original error plus every compensation error. With a checkpoint store, outputs are persisted, so a resumed run can still compensate work done before the crash. See the saga_payment example.
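The compensation stack described above can be sketched synchronously with function pointers. `SagaStep`, `SagaError`, and `run_saga` are hypothetical simplifications of CompensatableTask and the engine's rollback. Outputs are fixed to u32, and checkpoint clearing and output persistence are not modeled.

```rust
// Hypothetical saga step: a forward action producing an output, and a
// compensation that undoes the step given that output.
struct SagaStep {
    name: &'static str,
    forward: fn() -> Result<u32, String>,
    compensate: fn(u32) -> Result<(), String>,
}

#[derive(Debug, PartialEq)]
enum SagaError {
    // Every compensation succeeded: surface the original error.
    Failed(String),
    // At least one compensation failed: keep the original error and all compensation errors.
    CompensationFailed { original: String, errors: Vec<String> },
}

fn run_saga(steps: &[SagaStep], log: &mut Vec<String>) -> Result<(), SagaError> {
    // Compensation stack of (completed step, its output).
    let mut stack: Vec<(&SagaStep, u32)> = Vec::new();
    for step in steps {
        match (step.forward)() {
            Ok(output) => {
                log.push(format!("run {}", step.name));
                stack.push((step, output));
            }
            Err(original) => {
                // Drain in reverse (LIFO) and keep going even if a compensation fails.
                let mut errors = Vec::new();
                while let Some((done, output)) = stack.pop() {
                    log.push(format!("undo {}", done.name));
                    if let Err(e) = (done.compensate)(output) {
                        errors.push(e);
                    }
                }
                return Err(if errors.is_empty() {
                    SagaError::Failed(original)
                } else {
                    SagaError::CompensationFailed { original, errors }
                });
            }
        }
    }
    Ok(())
}
```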

§Processing Lifecycle

Task: Single run() / run_bare() method — full control over execution flow.

RouterTask: Single route() method — side-effect-free; reads resources, returns the next state. Dispatched like a normal state but leaves no checkpoint row.

PollTask: Single poll() method; returns PollOutcome::Ready(TaskResult) or PollOutcome::Pending { delay_ms }, and the engine loops until Ready. Defaults to TaskConfig::minimal() (no retries). Use config().with_attempt_timeout(dur) to cap total wall-clock time. A PollTask is registered with Workflow::register like any other task.
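The PollTask control flow can be illustrated with a blocking loop. This sketch rests on assumptions and is not Cano's engine, which is async. `Poll` mirrors PollOutcome only in shape, and `poll_until_ready` and its `cap` parameter are hypothetical, with `cap` standing in for an attempt timeout.

```rust
use std::thread;
use std::time::{Duration, Instant};

// Hypothetical mirror of PollOutcome: done, or "ask again after delay_ms".
enum Poll<T> {
    Ready(T),
    Pending { delay_ms: u64 },
}

// Blocking wait-until loop with an overall wall-clock cap.
fn poll_until_ready<T>(mut poll: impl FnMut() -> Poll<T>, cap: Duration) -> Result<T, String> {
    let deadline = Instant::now() + cap;
    loop {
        match poll() {
            Poll::Ready(value) => return Ok(value),
            Poll::Pending { delay_ms } => {
                let delay = Duration::from_millis(delay_ms);
                // Give up instead of sleeping past the deadline.
                if Instant::now() + delay > deadline {
                    return Err("poll timed out".to_string());
                }
                thread::sleep(delay);
            }
        }
    }
}
```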

§Module Overview

  • task: The Task trait — single run() / run_bare() method
  • task::router: The RouterTask trait — side-effect-free branching via route(); registered with Workflow::register_router
  • task::poll: The PollTask trait — wait-until loop via poll(); registered with Workflow::register
  • task::batch: The BatchTask trait — fan-out over data items via load/process_item/finish; registered with Workflow::register
  • task::stepped: The SteppedTask trait — resumable iterative work via step() with a serializable cursor; registered with Workflow::register_stepped (persists the cursor when a checkpoint store is attached)
  • workflow: Workflow — FSM orchestration with Split/Join support
  • scheduler (requires scheduler feature): Scheduler (builder) and RunningScheduler (live handle) — cron and interval scheduling
  • resource: Resource trait, Resources dictionary, and HealthStatus — lifecycle-aware resource management and health probes
  • observer: WorkflowObserver — synchronous lifecycle/failure event hooks (and TracingObserver, behind the tracing feature)
  • metrics (requires metrics feature): a MetricsObserver plus low-cardinality counters / histograms / gauges for workflow, task, retry, split/join, circuit-breaker, scheduler, processing-loop, recovery and saga internals — call cano::metrics::describe() for the full list with help text and units
  • recovery: CheckpointStore / CheckpointRow — append-only checkpoint log for crash recovery (and RedbCheckpointStore, behind the recovery feature)
  • saga: CompensatableTask — pair a forward step with a compensating action; failures roll back via Workflow::register_with_compensation
  • store: MemoryStore — a typed in-memory store that implements Resource
  • error: CanoError variants and the CanoResult alias
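The resumable-cursor idea behind SteppedTask can be sketched with a cursor that is plain data. `Outcome`, `step`, `run_stepped`, and the `(next, sum)` cursor are hypothetical and do not reflect Cano's StepOutcome variants. Saving the cursor into an `Option` stands in for persisting it to a checkpoint store.

```rust
// Hypothetical step result: more work with a new cursor, or finished.
enum Outcome<C, T> {
    More(C),
    Done(T),
}

// Sums 1..=limit in chunks of `chunk` numbers (chunk must be at least 1).
// The cursor is (next number, running sum): plain data, easy to serialize.
fn step(cursor: (u64, u64), limit: u64, chunk: u64) -> Outcome<(u64, u64), u64> {
    let (next, mut sum) = cursor;
    let end = (next + chunk - 1).min(limit);
    for n in next..=end {
        sum += n;
    }
    if end >= limit { Outcome::Done(sum) } else { Outcome::More((end + 1, sum)) }
}

// Runs steps, saving the cursor after each one; `max_steps` simulates an interruption.
// Returns Some(total) when finished, or None if interrupted (resume from `saved`).
fn run_stepped(saved: &mut Option<(u64, u64)>, limit: u64, chunk: u64, max_steps: usize) -> Option<u64> {
    let mut cursor = saved.unwrap_or((1, 0));
    for _ in 0..max_steps {
        match step(cursor, limit, chunk) {
            Outcome::More(next) => {
                cursor = next;
                *saved = Some(next);
            }
            Outcome::Done(total) => {
                *saved = None;
                return Some(total);
            }
        }
    }
    None
}
```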

§Getting Started

  1. Run an example: cargo run --example workflow_simple
  2. Read the module docs — each module has detailed documentation and examples
  3. Run benchmarks: cargo bench --bench workflow_performance

§Re-exports

pub use circuit::CircuitBreaker;
pub use circuit::CircuitPolicy;
pub use circuit::CircuitState;
pub use circuit::Permit as CircuitPermit;
pub use error::CanoError;
pub use error::CanoResult;
pub use observer::WorkflowObserver;
pub use recovery::CheckpointRow;
pub use recovery::CheckpointStore;
pub use recovery::RowKind;
pub use resource::HealthStatus;
pub use resource::Resource;
pub use resource::Resources;
pub use saga::CompensatableTask;
pub use saga::ErasedCompensatable;
pub use task::batch::BatchTask;
pub use task::batch::BatchTaskObject;
pub use task::batch::DefaultBatchItem;
pub use task::batch::DefaultBatchItemOutput;
pub use task::batch::DynBatchTask;
pub use task::batch::run_batch;
pub use task::poll::DynPollTask;
pub use task::poll::PollErrorPolicy;
pub use task::poll::PollOutcome;
pub use task::poll::PollTask;
pub use task::poll::PollTaskObject;
pub use task::poll::run_poll_loop;
pub use task::router::DynRouterTask;
pub use task::router::RouterTask;
pub use task::router::RouterTaskObject;
pub use task::stepped::DefaultStepCursor;
pub use task::stepped::DynSteppedTask;
pub use task::stepped::StepOutcome;
pub use task::stepped::SteppedTask;
pub use task::stepped::SteppedTaskObject;
pub use task::stepped::run_stepped;
pub use store::MemoryStore;
pub use task::DynTask;
pub use task::RetryMode;
pub use task::Task;
pub use task::TaskConfig;
pub use task::TaskObject;
pub use task::TaskResult;
pub use workflow::JoinConfig;
pub use workflow::JoinStrategy;
pub use workflow::SplitResult;
pub use workflow::SplitTaskResult;
pub use workflow::StateEntry;
pub use workflow::Workflow;

§Modules

circuit
Circuit Breaker — Fail-Fast Protection for Flaky Dependencies
error
Error Handling - Clear, Actionable Error Messages
observer
Workflow Observer — lifecycle and failure event hooks
prelude
Simplified imports for common usage patterns
recovery
Recovery — Append-Only Checkpoint Storage
resource
Resources — Unified Lifecycle-Managed Dependencies
saga
Saga / Compensation — undo successful work when a later step fails
store
Key-Value Store Helpers for Processing Pipelines
task
Task API - Workflow Processing Interface
workflow
Workflow API - Build Workflows with Split/Join Support

§Attribute Macros

checkpoint_store
Attribute macro applied to the CheckpointStore trait definition and impl CheckpointStore blocks.
resource
Attribute macro applied to the Resource trait definition and impl Resource blocks.
task
Attribute macro applied to Task trait definitions and impl Task blocks to rewrite async fn methods into ones returning Pin<Box<dyn Future<Output = ...> + Send + 'async_trait>>.
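The shape that rewrite produces can be written by hand with the standard library. This sketch is not the macro's literal output. `Compute` and `SumTo` are hypothetical, and `block_on` is a minimal thread-parking executor included only so the example runs without an async runtime.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Roughly the shape an `async fn` in a trait is rewritten into:
// a pinned, boxed, Send future that may borrow `self`.
trait Compute {
    fn run<'a>(&'a self) -> Pin<Box<dyn Future<Output = u32> + Send + 'a>>;
}

struct SumTo(u32);

impl Compute for SumTo {
    fn run<'a>(&'a self) -> Pin<Box<dyn Future<Output = u32> + Send + 'a>> {
        Box::pin(async move { (1..=self.0).sum::<u32>() })
    }
}

// Waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Minimal single-threaded executor: poll, and park until woken if pending.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => thread::park(),
        }
    }
}
```

Boxing the future makes the method usable through `dyn Compute`, which a trait method returning an opaque `impl Future` does not allow.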

§Derive Macros

FromResources
Derive macro that generates a from_resources associated function for a struct.
Resource
Derive macro that generates an empty Resource impl for a struct.