# Cano: Async Data & AI Workflows in Rust
Async workflow engine with built-in scheduling, retry logic, and state machine semantics.
Cano is an async workflow engine for Rust that manages complex processing through composable workflows. It can be used for data processing, AI inference workflows, and background jobs. Cano provides a simple, fast and type-safe API for defining workflows with retry strategies, scheduling capabilities, and shared state management.
The engine is built on three core concepts: Tasks and Nodes to encapsulate business logic, Workflows to manage state transitions, and Schedulers to run workflows on a schedule.
The Node API is inspired by the PocketFlow project, adapted for Rust's async ecosystem.
## Features
- Task & Node APIs: Single `Task` trait for simple processing logic, or `Node` trait for a structured three-phase lifecycle
- State Machines: Type-safe enum-driven state transitions with compile-time checking
- Retry Strategies: None, fixed delays, and exponential backoff with jitter (for both Tasks and Nodes)
- Flexible Storage: Built-in `MemoryStore` or custom struct types for data sharing
- Workflow Scheduling (optional `scheduler` feature): Built-in scheduler with intervals, cron schedules, and manual triggers
- Concurrent Execution: Execute multiple workflow instances in parallel with timeout strategies
- Observability (optional `tracing` feature): Comprehensive tracing and observability for workflow execution
- All Features (optional `all` feature): Convenience feature that enables both `scheduler` and `tracing`
- Performance: Minimal overhead with direct execution and zero-cost abstractions
## Getting Started
Add Cano to your `Cargo.toml`:

```toml
[dependencies]
cano = "0.6"
async-trait = "0.1"
tokio = { version = "1", features = ["macros", "sync", "time", "rt-multi-thread"] }
```
For scheduler support:

```toml
[dependencies]
cano = { version = "0.6", features = ["scheduler"] }
```
For observability and tracing:

```toml
[dependencies]
cano = { version = "0.6", features = ["tracing"] }
tracing = "0.1"
```
Or use the `all` feature for convenience:

```toml
[dependencies]
cano = { version = "0.6", features = ["all"] }
tracing = "0.1"
```
## Basic Example
```rust
use async_trait::async_trait;
use cano::prelude::*;

// Define your workflow states
#[derive(Clone, Debug, PartialEq)]
enum MyState {
    Start,
    Process,
    Complete,
}

// Simple Task implementation - single run method
struct MyTask;

// Structured Node implementation - three-phase lifecycle
struct MyNode;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Register MyTask and MyNode against states, then orchestrate
    // (see Core Concepts below for the full pattern)
    Ok(())
}
```
## Core Concepts
### 1. Tasks & Nodes - Processing Units
Cano provides two approaches for implementing processing logic:
#### Tasks - Simple & Flexible

A `Task` provides a simplified interface with a single `run` method. Use tasks when you want simplicity and direct control over the execution logic. Both `Task` and `Node` support retry strategies.
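The single-method shape can be sketched in plain std Rust. This is a synchronous analogy, not Cano's actual `Task` signature; the trait and type names here are illustrative:

```rust
use std::collections::HashMap;

// Illustrative stand-in for a Task: all logic lives in one run method
// that reads from and writes to a shared store.
trait SimpleTask {
    fn run(&self, store: &mut HashMap<String, String>) -> Result<String, String>;
}

struct Greeter;

impl SimpleTask for Greeter {
    fn run(&self, store: &mut HashMap<String, String>) -> Result<String, String> {
        // Read input from the store, compute, write the result back.
        let name = store.get("name").cloned().unwrap_or_else(|| "world".into());
        let greeting = format!("hello, {name}");
        store.insert("greeting".into(), greeting.clone());
        Ok(greeting)
    }
}
```

The real trait is async and uses Cano's store types, but the control flow is the same: one method, full control.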
#### Nodes - Structured & Resilient

A `Node` implements a structured three-phase lifecycle with built-in retry capabilities. Nodes are ideal for complex operations where separating data loading, execution, and result handling improves clarity and maintainability:

- Prep: Load data, validate inputs, set up resources
- Exec: Core processing logic (with automatic retry support)
- Post: Store results, clean up, determine the next action
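The three phases can be sketched as a plain, synchronous Rust trait (illustrative names only, not Cano's API), with a driver that wires the phases together:

```rust
// Illustrative three-phase lifecycle: prep loads input, exec does the
// work (and is the natural retry point), post stores the result and
// decides what happens next.
trait Lifecycle {
    fn prep(&self) -> Result<i32, String>;             // load/validate input
    fn exec(&self, input: i32) -> Result<i32, String>; // core logic (retryable)
    fn post(&self, output: i32) -> &'static str;       // pick the next action
}

fn drive<L: Lifecycle>(node: &L) -> Result<&'static str, String> {
    let input = node.prep()?;
    let output = node.exec(input)?;
    Ok(node.post(output))
}

struct Doubler;

impl Lifecycle for Doubler {
    fn prep(&self) -> Result<i32, String> {
        Ok(21)
    }
    fn exec(&self, input: i32) -> Result<i32, String> {
        Ok(input * 2)
    }
    fn post(&self, output: i32) -> &'static str {
        if output == 42 { "complete" } else { "retry" }
    }
}
```

Keeping exec free of I/O-style setup is what makes it safe to retry automatically.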
Compatibility & When to Use Which
- Every Node automatically implements Task - you can use any Node wherever Tasks are accepted.
- Use Task for: Simple processing, quick prototypes, or when you prefer a single method for all logic.
- Use Node for: Complex processing that benefits from a structured three-phase lifecycle (prep, exec, post).
#### Retry Strategies

Both Tasks and Nodes support retry strategies. Configure retry behavior using `TaskConfig` on either a Task or a Node.
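The exponential-backoff idea behind these strategies can be sketched as a self-contained helper (this is not Cano's `TaskConfig` API; names and the jitter-free delay formula are illustrative):

```rust
use std::time::Duration;

// Illustrative retry helper: up to max_attempts tries, with exponential
// backoff between failures (base * 2^(attempt-1)). A jittered variant
// would add a small random offset to each delay.
fn retry_with_backoff<T, E>(
    max_attempts: u32,
    base_delay: Duration,
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt = 0;
    loop {
        match op() {
            Ok(v) => return Ok(v),
            Err(e) => {
                attempt += 1;
                if attempt >= max_attempts {
                    return Err(e); // out of attempts: surface the error
                }
                let delay = base_delay * 2u32.pow(attempt - 1);
                std::thread::sleep(delay);
            }
        }
    }
}
```

In Cano this policy is declared on the task configuration rather than hand-rolled, and the runtime applies it around the retryable phase.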
### 2. Store - Data Sharing
Cano supports flexible data sharing between workflow nodes through stores.
#### MemoryStore (Key-Value Store)
The built-in MemoryStore provides a flexible key-value interface:
```rust
let store = MemoryStore::new();

// Store different types of data
store.put("user_id", 42)?;
store.put("name", "Alice".to_string())?;
store.put("scores", vec![85, 92, 78])?;

// Retrieve data with type safety
let user_id: i32 = store.get("user_id")?;
let name: String = store.get("name")?;

// Append items to existing collections
store.append("scores", 95)?; // scores is now [85, 92, 78, 95]

// Store operations
let count = store.len()?;
let is_empty = store.is_empty()?;
store.clear()?;
```
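The mechanism behind a type-safe key-value store can be sketched in plain std Rust with `Any` and downcasting. This is similar in spirit to `MemoryStore`, not its actual implementation:

```rust
use std::any::Any;
use std::collections::HashMap;

// Illustrative typed key-value store: values are boxed as Any and
// recovered by downcasting to the requested type.
#[derive(Default)]
struct KvStore {
    map: HashMap<String, Box<dyn Any>>,
}

impl KvStore {
    fn put<T: 'static>(&mut self, key: &str, value: T) {
        self.map.insert(key.to_string(), Box::new(value));
    }

    // Returns None if the key is missing or the stored type differs.
    fn get<T: 'static>(&self, key: &str) -> Option<&T> {
        self.map.get(key)?.downcast_ref::<T>()
    }
}
```

Asking for the wrong type fails cleanly instead of returning garbage, which is the property the typed `get` above relies on.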
#### Custom Store Types
For better performance and type safety, use custom struct types:
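A custom store is just a plain struct with typed fields, so lookups are field accesses instead of string keys and downcasts. The field names here are examples, not a Cano API:

```rust
// Illustrative custom store: every field has a concrete type, so the
// compiler catches type mismatches and there is no hashing overhead.
#[derive(Default, Clone, Debug)]
struct PipelineStore {
    user_id: Option<i32>,
    name: Option<String>,
    scores: Vec<i32>,
}

fn process(store: &mut PipelineStore) {
    // Tasks/nodes write directly to typed fields.
    store.user_id = Some(42);
    store.name = Some("Alice".to_string());
    store.scores.extend([85, 92, 78]);
}
```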
### 3. Workflows - State Management
Build workflows with state machine semantics. Workflows can register both Tasks and Nodes using the unified `register` method:
```rust
let mut workflow = Workflow::new(MyState::Start);
workflow
    .register(MyState::Start, my_task)   // Task
    .register(MyState::Process, my_node) // Node
    .add_exit_states(vec![MyState::Complete]);

let result = workflow.orchestrate(store).await?;
```
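The orchestration loop behind this is an enum-driven state machine: run the unit registered for the current state, follow the returned transition, stop at an exit state. A minimal self-contained sketch (not Cano's internals):

```rust
#[derive(Clone, Copy, PartialEq, Debug)]
enum State {
    Start,
    Process,
    Complete,
}

// Stand-in for "run the task registered for this state and return the
// next state it chose".
fn step(state: State) -> State {
    match state {
        State::Start => State::Process,
        State::Process => State::Complete,
        State::Complete => State::Complete, // exit state: no-op
    }
}

// Drive the machine until it reaches the exit state; also count hops.
fn run_to_exit(mut s: State) -> (State, u32) {
    let mut hops = 0;
    while s != State::Complete {
        s = step(s);
        hops += 1;
    }
    (s, hops)
}
```

Because the states are an enum, the compiler forces every state to have a defined transition, which is the compile-time checking the feature list refers to.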
## Complex Workflows
Build sophisticated state machine pipelines with conditional branching and error handling:
```mermaid
graph TD
    A[Start] --> B[LoadData]
    B --> C{Validate}
    C -->|Valid| D[Process]
    C -->|Invalid| E[Sanitize]
    C -->|Critical Error| F[Error]
    E --> D
    D --> G{QualityCheck}
    G -->|High Quality| H[Enrich]
    G -->|Low Quality| I[BasicProcess]
    G -->|Failed & Retries Left| J[Retry]
    G -->|Failed & No Retries| K[Failed]
    H --> L[Complete]
    I --> L
    J --> D
    F --> M[Cleanup]
    M --> K
```
```rust
// Validation node with multiple outcomes, quality check node with retry
// logic, and the other nodes are implemented as shown earlier.

// Build the complete workflow (node variable names are illustrative)
let mut workflow = Workflow::new(State::Start);
workflow
    .register(State::LoadData, load_data_node)
    .register(State::Validate, validation_node)
    .register(State::Process, process_node)
    .register(State::Sanitize, sanitize_node)
    .register(State::QualityCheck, quality_check_node)
    .register(State::Enrich, enrich_node)
    .register(State::BasicProcess, basic_process_node)
    .register(State::Error, error_node)
    .register(State::Cleanup, cleanup_node)
    .add_exit_states(vec![State::Complete, State::Failed]);

let result = workflow.orchestrate(store).await?;
```
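Conditional branching like the Validate step in the diagram comes down to the post phase mapping an outcome to a next state. A self-contained sketch of that mapping (enum and function names are illustrative):

```rust
#[derive(Clone, Copy, PartialEq, Debug)]
enum Next {
    Process,
    Sanitize,
    Error,
}

// Mirror of the Validate branch above: valid data proceeds, invalid
// data is routed to sanitization, and a critical error aborts.
fn next_after_validate(outcome: Result<bool, &str>) -> Next {
    match outcome {
        Ok(true) => Next::Process,  // Valid
        Ok(false) => Next::Sanitize, // Invalid
        Err(_) => Next::Error,       // Critical Error
    }
}
```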
## Concurrent Workflows
Execute multiple workflow instances in parallel with different timeout strategies:
```rust
use cano::prelude::*;

// Create a concurrent workflow with the same API as regular workflows
let mut concurrent_workflow = ConcurrentWorkflow::new(State::Start);
concurrent_workflow.register(State::Start, processor_node);
concurrent_workflow.add_exit_state(State::Complete);

// One store per workflow instance
let stores: Vec<MemoryStore> = (0..10).map(|_| MemoryStore::new()).collect();

// Wait for all workflows to complete
let results = concurrent_workflow
    .execute_concurrent(/* stores, wait-for-all strategy */)
    .await?;

// Wait for first 5 to complete, then cancel the rest
let results = concurrent_workflow
    .execute_concurrent(/* stores, first-N strategy */)
    .await?;

// Execute within time limit
let results = concurrent_workflow
    .execute_concurrent(/* stores, timeout strategy */)
    .await?;
```
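The core idea of running independent instances in parallel, each with its own store, can be sketched with plain threads (Cano does this with async tasks, not `std::thread`; the names here are illustrative):

```rust
use std::thread;

// Run n independent "workflow instances" in parallel and collect each
// instance's result in order. Each closure stands in for one workflow
// run against its own private store.
fn run_instances(n: usize) -> Vec<usize> {
    let handles: Vec<_> = (0..n)
        .map(|i| thread::spawn(move || i * 2)) // stand-in for one run
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

A quota or timeout strategy would stop joining once enough handles have finished or a deadline passes, rather than joining them all.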
## Scheduling Workflows
The Scheduler provides workflow scheduling capabilities for background jobs and automated workflows:
```rust
use cano::prelude::*;
use std::time::Duration;

// Build a Scheduler, register workflows with an interval or cron
// schedule, start it, and stop it gracefully when done.
```
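The interval-scheduling idea can be sketched as a self-contained loop (a blocking toy, not the Scheduler's API, which runs workflows on an async runtime and also supports cron expressions and manual triggers):

```rust
use std::time::Duration;

// Illustrative interval scheduler: run a job a fixed number of times,
// sleeping for `interval` between executions.
fn run_every(interval: Duration, runs: u32, mut job: impl FnMut()) {
    for i in 0..runs {
        job();
        if i + 1 < runs {
            std::thread::sleep(interval);
        }
    }
}
```

A real scheduler would loop until shutdown is requested and track run counts and durations per workflow, as the feature list below describes.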
### Features
- Flexible Scheduling: Intervals, cron expressions, and manual triggers
- Concurrent Workflows: Execute multiple workflow instances in parallel with configurable wait strategies
- Status Monitoring: Check workflow status, run counts, and execution times
- Graceful Shutdown: Stop with a timeout that allows running flows to complete
- Concurrent Execution: Multiple flows can run simultaneously
## Workflow Observability & Tracing
Cano provides comprehensive observability through the optional `tracing` feature, built on the tracing library.
### Enable Tracing
```toml
[dependencies]
cano = { version = "0.6", features = ["tracing"] }
tracing = "0.1"
tracing-subscriber = "0.3"
```
### What Gets Traced
- Workflow Level: Orchestration start/completion, state transitions, final states
- Task Level: Task execution with retry logic, attempts, delays, success/failure outcomes
- Node Level: Three-phase lifecycle (prep, exec, post), retry attempts with detailed context
- Scheduler Level: Workflow scheduling, concurrent execution, run counts, durations
- Concurrent Workflows: Individual instance tracking and aggregate statistics
### Basic Usage
```rust
use cano::prelude::*;
use tracing::info;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Initialize a subscriber so spans and events are emitted
    tracing_subscriber::fmt::init();

    // ... build and orchestrate your workflow as usual ...
    info!("workflow finished");
    Ok(())
}
```
### Advanced Tracing
```rust
// Custom spans for concurrent workflows
let concurrent_span = info_span!(/* span name and fields */);
let mut concurrent_workflow = ConcurrentWorkflow::new(/* initial state */)
    .with_tracing_span(concurrent_span);

// Custom tracing in nodes: emit events from prep/exec/post with the
// tracing macros (info!, warn!, ...)
```
### Tracing Output
With `RUST_LOG=info cargo run`, you'll see structured output like:

```text
INFO user_data_processing{user_id="12345" batch_id="batch_001"}: Starting workflow orchestration
INFO user_data_processing{user_id="12345" batch_id="batch_001"}:task_execution{state=Start}:run_with_retries{max_attempts=4}: Starting task execution with retry logic
INFO user_data_processing{user_id="12345" batch_id="batch_001"}:task_execution{state=Start}:run_with_retries{max_attempts=4}:task_attempt{attempt=1}: Starting data preparation node_id=processor_1
INFO user_data_processing{user_id="12345" batch_id="batch_001"}:task_execution{state=Start}:run_with_retries{max_attempts=4}:task_attempt{attempt=1}: Node execution completed success=true
INFO user_data_processing{user_id="12345" batch_id="batch_001"}:task_execution{state=Start}:run_with_retries{max_attempts=4}: Task execution successful attempt=1
INFO user_data_processing{user_id="12345" batch_id="batch_001"}: Workflow completed successfully final_state=Complete
```
### Performance
- Zero-cost when disabled: No overhead when the `tracing` feature is not enabled
- Minimal impact when enabled: Structured logging with efficient processing
- Conditional compilation: Tracing code only compiled when feature is enabled
Run the tracing demo:

```sh
RUST_LOG=info cargo run --features tracing --example <tracing_example>
```
## Examples and Testing
### Run Examples
```sh
# Examples directory contains various workflow implementations
cargo run --example <example_name>
```
### Run Tests and Benchmarks
```sh
# Run all tests
cargo test

# Run benchmarks from the benches directory
cargo bench
```

Benchmark results are saved in `target/criterion/`.
## Documentation
- API Documentation - Complete API reference
- Examples Directory - Hands-on code examples
- Benchmarks - Performance testing and optimization
## Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
## License
This project is licensed under the MIT License - see the LICENSE file for details.