🚀 Cano: Async Data & AI Workflows in Rust
Async workflow engine with built-in scheduling, retry logic, and state machine semantics.
Cano is an async workflow engine for Rust that manages complex processing through composable workflows. It can be used for data processing, AI inference workflows, and background jobs. Cano provides a simple, fast and type-safe API for defining workflows with retry strategies, scheduling capabilities, and shared state management.
The engine is built on three core concepts: Nodes to encapsulate business logic, Workflows to manage state transitions, and Schedulers to run workflows on a schedule.
Only Nodes are required to implement your processing logic; Workflows and Schedulers are optional and can be added as needed.
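The loop at the heart of this model can be sketched without Cano's API: each node does its work and returns the next state, and the workflow advances until it reaches an exit state. A minimal hand-rolled illustration (all names here are invented for the sketch, not Cano's types):

```rust
// Minimal sketch of enum-driven workflow orchestration; the state and
// function names are illustrative, not Cano's actual types.
#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Load,
    Process,
    Complete,
    Error,
}

// Each "node" does its work and returns the next state.
fn run_node(state: State, data: &mut Vec<i32>) -> State {
    match state {
        State::Load => {
            data.extend([1, 2, 3]); // load some input
            State::Process
        }
        State::Process => {
            if data.is_empty() {
                State::Error
            } else {
                data.iter_mut().for_each(|x| *x *= 2); // main logic
                State::Complete
            }
        }
        exit => exit, // Complete and Error are exit states
    }
}

// Keep running nodes until an exit state is reached.
fn orchestrate(mut state: State, data: &mut Vec<i32>) -> State {
    while state != State::Complete && state != State::Error {
        state = run_node(state, data);
    }
    state
}

fn main() {
    let mut data = Vec::new();
    let end = orchestrate(State::Load, &mut data);
    println!("{end:?} {data:?}"); // Complete [2, 4, 6]
}
```

Cano packages this pattern behind its `Node` and `Workflow` types, described below.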
The Node API is inspired by the PocketFlow project, adapted for Rust's async ecosystem.
Getting Started
Add Cano to your `Cargo.toml`:

```toml
[dependencies]
cano = "0.3"
async-trait = "0.1"
tokio = { version = "1.0", features = ["full"] }
```
Basic Example
```rust
use async_trait::async_trait;
use cano::prelude::*;

// Define your workflow states
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum WorkflowState {
    Start,
    Complete,
}

// Sketch of the entry point; `MyNode` stands in for your own Node
// implementation, and exact method signatures may differ from this
// illustration — see the API documentation.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let store = MemoryStore::new();

    let mut workflow = Workflow::new(WorkflowState::Start);
    workflow.register_node(WorkflowState::Start, MyNode);
    workflow.add_exit_states(vec![WorkflowState::Complete]);

    let result = workflow.orchestrate(&store).await?;
    println!("finished in state: {result:?}");
    Ok(())
}
```
This example demonstrates a basic workflow with a single processing node.
Features
- Node-based API: Single `Node` trait for implementing processing logic
- Fluent configuration: Builder pattern for setup
- Retry strategies: None, fixed delays, and exponential backoff with jitter
- Shared state: Thread-safe key-value store for data passing between nodes
- Scheduling: Built-in scheduler for intervals, cron schedules, and manual triggers
- State machines: Chain nodes together into complex workflows
- Type safety: Encourages enum-driven state transitions with compile-time checking
- Performance: Minimal overhead with direct execution
How It Works
Cano is built around three concepts:
1. Nodes - Processing Units
A `Node` implements the processing logic for your workflow. Each node can perform preparation, execution, and post-processing steps: preparation loads data from the shared store, execution performs the main logic, and post-processing updates the store and determines the next state.
```rust
// Node with specific processing logic. The trait shape below sketches
// the prep/exec/post lifecycle; consult the API documentation for the
// exact `Node` signature.
struct ProcessNode;

#[async_trait]
impl Node<WorkflowState> for ProcessNode {
    type PrepResult = i32;
    type ExecResult = i32;

    // Preparation: load data from the shared store
    async fn prep(&self, store: &MemoryStore) -> Result<i32, CanoError> {
        store.get("input")
    }

    // Execution: perform the main logic
    async fn exec(&self, input: i32) -> i32 {
        input * 2
    }

    // Post-processing: update the store and determine the next state
    async fn post(&self, store: &MemoryStore, result: i32) -> Result<WorkflowState, CanoError> {
        store.put("output", result)?;
        Ok(WorkflowState::Complete)
    }
}
```
Retry Logic
Each node can be configured with retry logic using different strategies:
- No Retries - Fail immediately
- Fixed Retries - Consistent delays
- Exponential Backoff - Increasing delays
- Custom Exponential Backoff - Fine-tuned retry behavior
When to Use Different Retry Modes:
- None: Database transactions, critical validations
- Fixed: Network calls, file operations
- Exponential: API calls, external services
- Custom Exponential: High-load scenarios requiring precise timing control
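The exponential strategies boil down to a simple delay formula. A standalone sketch of the delay calculation (this is not Cano's configuration API, just the underlying arithmetic; adding jitter would mean perturbing each delay by a small random amount):

```rust
use std::time::Duration;

// Illustrative sketch: compute the delay before retry `attempt` under
// exponential backoff. `base` is the initial delay, `factor` the
// multiplier per attempt, and `max` caps unbounded growth.
fn backoff_delay(attempt: u32, base: Duration, factor: f64, max: Duration) -> Duration {
    let millis = base.as_millis() as f64 * factor.powi(attempt as i32);
    Duration::from_millis(millis.min(max.as_millis() as f64) as u64)
}

fn main() {
    // attempt 0 -> 100ms, attempt 1 -> 200ms, attempt 2 -> 400ms, capped at 5s
    for attempt in 0..3 {
        let delay = backoff_delay(attempt, Duration::from_millis(100), 2.0, Duration::from_secs(5));
        println!("attempt {attempt}: {delay:?}");
    }
}
```

A fixed strategy is the special case `factor = 1.0`; the custom exponential mode corresponds to tuning `base`, `factor`, and `max` explicitly.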
2. Store - Data Sharing
Cano allows any type to be used as a store for sharing data between workflow nodes. While MemoryStore is provided as a convenient key-value store implementation, you can use custom struct types for type-safe, performance-optimized data sharing.
Using MemoryStore (Key-Value Store)
The built-in MemoryStore provides a flexible key-value interface:
```rust
// Key names below are illustrative
let store = MemoryStore::new();

// Store different types of data
store.put("user_id", 42)?;
store.put("name", "Alice".to_string())?;
store.put("scores", vec![85, 92, 78])?;
store.put("is_active", true)?;

// Retrieve data with type safety
let user_id: i32 = store.get("user_id")?;
let name: String = store.get("name")?;
let scores: Vec<i32> = store.get("scores")?;
let is_active: bool = store.get("is_active")?;

// Append items to existing collections
store.append("scores", 95)?; // scores is now [85, 92, 78, 95]
store.append("tags", "fresh".to_string())?; // Creates new Vec if key doesn't exist

// Remove individual items
store.remove("user_id")?;

// Check store status
let count = store.len()?; // Get number of items
let is_empty = store.is_empty()?; // Check if store is empty

// Iterate over all keys
let all_keys: Vec<String> = store.keys()?.collect();
for key in all_keys {
    println!("key: {key}");
}

// Clear all data
store.clear()?;

// Alternative removal method
store.delete("name")?; // Alias for remove()
```
Using Custom Store Types
For better performance and type safety, you can use any custom struct as a store. This enables direct field access, stack allocation, and compile-time type checking:
```rust
// Custom store with direct field access (field names are illustrative)
#[derive(Default)]
struct RequestStore {
    user_id: u64,
    payload: Vec<u8>,
    processed: bool,
}

// Nodes can access and modify the custom store directly
fn mark_processed(store: &mut RequestStore) {
    store.processed = true;
}
```
Custom stores provide:
- Performance: Direct field access, no hash map overhead
- Type Safety: Compile-time guarantees about data structure
- Memory Efficiency: Stack allocation, with no heap allocations if the fields are designed carefully
- API Clarity: Explicit data contracts between nodes
See the `workflow_stack_store.rs` example for a complete demonstration of custom store types with request processing pipelines.
3. Workflows - State Management
Build workflows with state machine semantics:
```rust
// State and variable names are illustrative; see the API documentation
// for exact signatures.
let mut workflow = Workflow::new(State::Load);

workflow.register_node(State::Load, load_node)
    .register_node(State::Process, process_node)
    .register_node(State::Save, save_node)
    .add_exit_states(vec![State::Complete, State::Error]);

let result = workflow.orchestrate(&store).await?;
```
Complex Workflows
Build state machine pipelines with conditional branching, error handling, and retry logic:
```mermaid
graph TD
    A[Start] --> B[LoadData]
    B --> C{Validate}
    C -->|Valid| D[Process]
    C -->|Invalid| E[Sanitize]
    C -->|Critical Error| F[Error]
    E --> D
    D --> G{QualityCheck}
    G -->|High Quality| H[Enrich]
    G -->|Low Quality| I[BasicProcess]
    G -->|Failed & Retries Left| J[Retry]
    G -->|Failed & No Retries| K[Failed]
    H --> L[Complete]
    I --> L
    J --> D
    F --> M[Cleanup]
    M --> K
```
```rust
// Validation node with multiple possible outcomes (sketch): its
// post-processing step returns Valid, Invalid, or a critical-error
// state depending on the loaded data.
struct ValidationNode;

// Quality check node with conditional processing paths (sketch): its
// post-processing step routes to Enrich, BasicProcess, Retry, or Failed.
struct QualityCheckNode;

// ... all other nodes ...
```
// Build the complex workflow
```rust
// Build the complex workflow (state and variable names are illustrative)
let mut workflow = Workflow::new(State::LoadData);

workflow
    .register_node(State::LoadData, load_node)
    .register_node(State::Validate, validation_node)
    .register_node(State::Sanitize, sanitize_node)
    .register_node(State::Process, process_node)
    .register_node(State::QualityCheck, quality_check_node)
    .register_node(State::Error, error_node)
    .register_node(State::Enrich, enrich_node)      // -> Complete
    .register_node(State::BasicProcess, basic_node) // -> Complete
    .register_node(State::Retry, retry_node)        // -> Process
    .register_node(State::Cleanup, cleanup_node)    // -> Failed
    .add_exit_states(vec![State::Complete, State::Failed]);

let result = workflow.orchestrate(&store).await?;
```
Scheduler - Workflow Scheduling
The Scheduler provides workflow scheduling capabilities for background jobs, periodic processing, and automated workflows.
Scheduler Usage
```rust
use cano::prelude::*;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Register workflows and start the scheduler here; see the
    // Advanced Scheduler Usage section below for the individual calls.
    Ok(())
}
```
Scheduler Features
- Flexible Scheduling: Support for intervals, cron expressions, and manual triggers
- Convenience Methods: `every_seconds()`, `every_minutes()`, `every_hours()` helpers
- Status Monitoring: Check workflow status, run counts, and execution times
- Manual Control: Trigger flows manually and monitor execution
- Graceful Shutdown: Stop with timeout for running flows to complete
- Concurrent Execution: Multiple flows can run simultaneously
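The "graceful shutdown with timeout" feature has simple semantics worth spelling out: signal the running flows to stop, then wait at most a bounded time for them to acknowledge. A standalone sketch of that pattern using threads and channels (this illustrates the concept only, not Cano's implementation):

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Conceptual "stop with timeout": request shutdown, then wait at most
// `timeout` for the worker's completion signal. Returns true if the
// worker shut down cleanly within the deadline.
fn stop_with_timeout(
    stop_tx: mpsc::Sender<()>,
    done_rx: mpsc::Receiver<()>,
    timeout: Duration,
) -> bool {
    let _ = stop_tx.send(()); // request shutdown
    done_rx.recv_timeout(timeout).is_ok()
}

fn main() {
    let (stop_tx, stop_rx) = mpsc::channel();
    let (done_tx, done_rx) = mpsc::channel();

    // Worker loop: do small units of work until asked to stop.
    thread::spawn(move || {
        while stop_rx.try_recv().is_err() {
            thread::sleep(Duration::from_millis(10)); // simulated work
        }
        let _ = done_tx.send(()); // acknowledge clean shutdown
    });

    let clean = stop_with_timeout(stop_tx, done_rx, Duration::from_secs(30));
    println!("clean shutdown: {clean}");
}
```

If the deadline passes without an acknowledgement, the caller knows some flows were still running and can decide whether to abort them or wait longer.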
Advanced Scheduler Usage
```rust
// Scheduled workflow examples; method arguments are illustrative —
// see the API documentation for exact signatures.
let mut scheduler = Scheduler::new();

// Data processing every hour
scheduler.every_hours(1, data_workflow)?;

// Health checks every 30 seconds
scheduler.every_seconds(30, health_workflow)?;

// Daily reports at 9 AM using cron
scheduler.cron("0 9 * * *", report_workflow)?;

// Manual cleanup task
scheduler.manual(cleanup_workflow)?;

// Start the scheduler
scheduler.start().await?;

// Monitor running flows
tokio::spawn(async move {
    // poll scheduler status here
});

// Graceful shutdown with 30 second timeout
scheduler.stop_with_timeout(Duration::from_secs(30)).await?;
```
Testing & Benchmarks
Run tests and view performance metrics:
```bash
# Run all tests
cargo test

# Run performance benchmarks
cargo bench

# Run the workflow, AI workflow, and scheduler examples
# (see the examples/ directory for available example names)
cargo run --example <example_name>
```
Documentation
- API Documentation - Complete API reference
- Examples Directory - Hands-on code examples
- Benchmarks - Performance testing and optimization
Contributing
Contributions are welcome in the following areas:
- Documentation - Improve guides and examples
- Features - Add new store backends or workflow capabilities
- Performance - Optimize performance and memory usage
- Testing - Add test cases and edge case coverage
- Bug Fixes - Report and fix issues
License
This project is licensed under the MIT License - see the LICENSE file for details.