Skip to main content

Crate cano

Crate cano 

Source
Expand description

§Cano: Type-Safe Async Workflow Engine

Cano is an async workflow orchestration engine for Rust built on Finite State Machines (FSM). States are user-defined enums; the engine guarantees type-safe transitions between them.

Well-suited for:

  • Data pipelines: ETL jobs with parallel processing (Split/Join) and aggregation
  • AI agents: Multi-step inference chains with shared context
  • Background systems: Scheduled maintenance, periodic reporting, cron jobs

§Quick Start

use cano::prelude::*;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step { Fetch, Process, Done }

// A typed configuration struct shared with tasks via Resources.
struct FetchConfig { batch_size: usize }
#[resource]
impl Resource for FetchConfig {}

struct FetchTask;
struct ProcessTask;

#[task]
impl Task<Step> for FetchTask {
    async fn run(&self, res: &Resources) -> Result<TaskResult<Step>, CanoError> {
        let config = res.get::<FetchConfig, _>("config")?;
        let store = res.get::<MemoryStore, _>("store")?;
        // Produce `batch_size` items and store them for the next task.
        let data: Vec<u32> = (1..=(config.batch_size as u32)).collect();
        store.put("data", data)?;
        Ok(TaskResult::Single(Step::Process))
    }
}

#[task]
impl Task<Step> for ProcessTask {
    async fn run(&self, res: &Resources) -> Result<TaskResult<Step>, CanoError> {
        let store = res.get::<MemoryStore, _>("store")?;
        let data: Vec<u32> = store.get("data")?;
        store.put("sum", data.iter().sum::<u32>())?;
        Ok(TaskResult::Single(Step::Done))
    }
}

let store = MemoryStore::new();
let resources = Resources::new()
    .insert("config", FetchConfig { batch_size: 3 })
    .insert("store", store.clone());
let workflow = Workflow::new(resources)
    .register(Step::Fetch, FetchTask)
    .register(Step::Process, ProcessTask)
    .add_exit_state(Step::Done);

let final_state = workflow.orchestrate(Step::Fetch).await?;
assert_eq!(final_state, Step::Done);

// The sum of 1..=3 is 6.
assert_eq!(store.get::<u32>("sum")?, 6);

§Resource-Free Workflows

When tasks do not need shared resources, use Workflow::bare and implement run_bare instead of run:

use cano::prelude::*;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step { Compute, Done }

struct ComputeTask;

#[task]
impl Task<Step> for ComputeTask {
    async fn run_bare(&self) -> Result<TaskResult<Step>, CanoError> {
        // Pure computation — no resource lookup needed.
        let result: u32 = (1..=100).sum();
        assert_eq!(result, 5050);
        Ok(TaskResult::Single(Step::Done))
    }
}

let workflow = Workflow::bare()
    .register(Step::Compute, ComputeTask)
    .add_exit_state(Step::Done);

let final_state = workflow.orchestrate(Step::Compute).await?;
assert_eq!(final_state, Step::Done);

§Core Concepts

§Finite State Machines (FSM)

Workflows in Cano are state machines. You define your states as an enum, register handlers (Task or Node) for each state, and the engine manages transitions.

§Tasks and Nodes

Two interfaces for processing logic:

  • Task trait: single run() method — straightforward operations and prototyping
  • Node trait: three-phase lifecycle (prepexecpost) with built-in retry

Every Node automatically implements Task via a blanket impl, so both can be registered with the same Workflow::register method.

§Parallel Execution (Split/Join)

Run tasks concurrently with Workflow::register_split and join results using strategies: All, Any, Quorum(n), Percentage(f64), PartialResults(min), or PartialTimeout.

§Store

MemoryStore provides a thread-safe Arc<RwLock<HashMap>> for sharing typed data between states. For custom backends, register a concrete storage resource in Resources.

§Processing Lifecycle

Task: Single run() method — full control over execution flow.

Node: Three-phase lifecycle (retried as a unit on prep or post failure):

  1. prep — load data, validate inputs, allocate resources
  2. exec — core logic (infallible — signature returns result directly)
  3. post — write results, determine next state

§Module Overview

§Getting Started

  1. Run an example: cargo run --example workflow_simple
  2. Read the module docs — each module has detailed documentation and examples
  3. Run benchmarks: cargo bench --bench node_performance

Re-exports§

pub use circuit::CircuitBreaker;
pub use circuit::CircuitPolicy;
pub use circuit::CircuitState;
pub use circuit::Permit as CircuitPermit;
pub use error::CanoError;
pub use error::CanoResult;
pub use node::DefaultNodeResult;
pub use node::DynNode;
pub use node::Node;
pub use node::NodeObject;
pub use resource::Resource;
pub use resource::Resources;
pub use store::MemoryStore;
pub use task::DynTask;
pub use task::RetryMode;
pub use task::Task;
pub use task::TaskConfig;
pub use task::TaskObject;
pub use task::TaskResult;
pub use workflow::JoinConfig;
pub use workflow::JoinStrategy;
pub use workflow::SplitResult;
pub use workflow::SplitTaskResult;
pub use workflow::StateEntry;
pub use workflow::Workflow;

Modules§

circuit
Circuit Breaker — Fail-Fast Protection for Flaky Dependencies
error
Error Handling - Clear, Actionable Error Messages
node
Node API - Structured Workflow Processing
prelude
Simplified imports for common usage patterns
resource
Resources — Unified Lifecycle-Managed Dependencies
store
Key-Value Store Helpers for Processing Pipelines
task
Task API - Simplified Workflow Interface
workflow
Workflow API - Build Workflows with Split/Join Support

Attribute Macros§

node
Attribute macro applied to the Node trait definition and impl Node blocks.
resource
Attribute macro applied to the Resource trait definition and impl Resource blocks.
task
Attribute macro applied to Task trait definitions and impl Task blocks to rewrite async fn methods into ones returning Pin<Box<dyn Future<Output = ...> + Send + 'async_trait>>.

Derive Macros§

FromResources
Derive macro that generates a from_resources associated function for a struct.
Resource
Derive macro that generates an empty Resource impl for a struct.