celers-canvas
Distributed workflow primitives for CeleRS task orchestration. Build complex task dependencies with Chain, Group, Chord, Map, and Starmap patterns.
Overview
Production-ready workflow patterns inspired by Celery's Canvas:
- ✅ Chain: Sequential task execution with result passing
- ✅ Group: Parallel task execution
- ✅ Chord: Map-reduce pattern (parallel tasks + callback)
- ✅ Map: Apply task to multiple argument sets
- ✅ Starmap: Like Map but unpacks arguments
- ✅ Signature: Reusable task definitions with arguments
- ✅ Priority Support: Task prioritization in workflows
- ✅ Immutability: Prevent argument replacement in chains
Quick Start
use ;
use RedisBroker;
// Create broker
let broker = new?;
// Chain: Sequential execution
let workflow = new
.then
.then
.then;
let task_id = workflow.apply.await?;
println!;
Workflow Primitives
Chain - Sequential Execution
Execute tasks one after another, passing results as arguments:
use Chain;
// task1(args1) -> task2(result1) -> task3(result2)
let chain = new
.then
.then
.then;
// Start the chain
let first_task_id = chain.apply.await?;
How it works:
- First task executes with provided arguments
- Result passed to next task
- Continues until all tasks complete
- Final task returns the result
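The result-passing steps above can be modelled in plain Rust. This is only an in-process sketch, not the celers-canvas API: `Step`, `run_chain`, and the three step functions are hypothetical names, and a real chain passes results between workers through the broker rather than through a local fold.

```rust
// Hypothetical in-process model of a chain; not the celers-canvas API.
type Step = fn(i64) -> i64;

fn add_one(x: i64) -> i64 {
    x + 1
}

fn times_ten(x: i64) -> i64 {
    x * 10
}

fn minus_three(x: i64) -> i64 {
    x - 3
}

/// Runs the steps in order, feeding each result into the next step.
/// The value returned by the last step is the result of the whole chain.
fn run_chain(steps: &[Step], initial: i64) -> i64 {
    steps.iter().fold(initial, |acc, step| step(acc))
}
```

Calling `run_chain(&[add_one, times_ten, minus_three], 4)` evaluates `minus_three(times_ten(add_one(4)))`.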
Use cases:
- Data pipelines (ETL workflows)
- Multi-stage processing
- Sequential transformations
- Dependent operations
Group - Parallel Execution
Execute multiple tasks in parallel:
use Group;
// (task1 | task2 | task3)
let group = new
.add
.add
.add;
// Start all tasks in parallel
let group_id = group.apply.await?;
Features:
- Tasks execute independently
- No result aggregation
- Group ID for tracking
- Individual task priorities
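As a rough illustration of the features above, the sketch below fans a group out into independent messages that share one group ID but keep their own priorities. `Message` and `fan_out` are hypothetical names invented for this example; the actual message format used by CeleRS is not shown in this document.

```rust
// Hypothetical message shape; not the real CeleRS wire format.
#[derive(Debug, Clone, PartialEq)]
struct Message {
    group_id: u64,
    task: String,
    priority: u8,
}

/// Builds one independent message per task. Every message carries the same
/// group ID for tracking, and nothing here aggregates results.
fn fan_out(group_id: u64, tasks: &[(&str, u8)]) -> Vec<Message> {
    tasks
        .iter()
        .map(|(task, priority)| Message {
            group_id,
            task: (*task).to_string(),
            priority: *priority,
        })
        .collect()
}
```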
Use cases:
- Parallel data processing
- Bulk operations
- Independent computations
- Fan-out patterns
Chord - Map-Reduce Pattern
Execute tasks in parallel, then run callback with aggregated results:
use ;
use RedisResultBackend;
// (task1 | task2 | task3) -> callback([result1, result2, result3])
let header = new
.add
.add
.add;
let callback = new;
let chord = new;
// Requires result backend for coordination
let mut backend = new?;
let chord_id = chord.apply.await?;
How it works:
- Header tasks execute in parallel
- Each completion increments atomic counter (Redis INCR)
- When all tasks complete, callback enqueued
- Callback receives array of all results
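The four steps above amount to a small state machine. The sketch below models it in a single process, assuming integer results; `ChordState` and `complete` are hypothetical names, and a plain `usize` field stands in for the Redis counter.

```rust
// Hypothetical single-process model of chord state; the real state lives in Redis.
struct ChordState {
    total: usize,
    completed: usize,
    results: Vec<i64>,
}

impl ChordState {
    fn new(total: usize) -> Self {
        ChordState { total, completed: 0, results: Vec::with_capacity(total) }
    }

    /// Records one header result. Returns the full result array when the
    /// last header task reports in, which is the moment the callback
    /// would be enqueued.
    fn complete(&mut self, result: i64) -> Option<Vec<i64>> {
        self.results.push(result);
        self.completed += 1; // stands in for the atomic INCR
        if self.completed == self.total {
            Some(self.results.clone())
        } else {
            None
        }
    }
}
```

Only the final call to `complete` yields `Some(..)`, so a callback driven by that return value receives every header result at once.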
Requirements:
- backend-redis feature enabled
- Redis result backend for coordination
- Atomic counter for barrier synchronization
Use cases:
- Map-reduce operations
- Aggregating parallel results
- Distributed computations
- Parallel data collection + summarization
Map - Batch Processing
Apply the same task to multiple argument sets:
use ;
// map(task, [args1, args2, args3])
let task = new;
let argsets = vec!;
let map = new;
let group_id = map.apply.await?;
Equivalent to:
new
.add
.add
.add
Use cases:
- Batch image/video processing
- Bulk data transformations
- Same operation on many inputs
- Parallel file operations
Starmap - Unpacked Arguments
Like Map but unpacks argument tuples:
use ;
// starmap(task, [(a1, b1), (a2, b2)])
let task = new;
let argsets = vec!;
let starmap = new;
let group_id = starmap.apply.await?;
Difference from Map:
- Map: Each argset is a single argument
- Starmap: Each argset is unpacked as multiple arguments
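To make the difference concrete, the sketch below expands the same argsets both ways. `Arg`, `map_calls`, and `starmap_calls` are hypothetical names for this illustration, and integers stand in for whatever serialized values the real primitives accept.

```rust
// Hypothetical argument model; not the celers-canvas types.
#[derive(Debug, Clone, PartialEq)]
enum Arg {
    Int(i64),
    List(Vec<i64>),
}

/// Map: each argset becomes ONE argument of the call.
fn map_calls(argsets: &[Vec<i64>]) -> Vec<Vec<Arg>> {
    argsets
        .iter()
        .map(|set| vec![Arg::List(set.clone())])
        .collect()
}

/// Starmap: each argset is unpacked into SEPARATE arguments of the call.
fn starmap_calls(argsets: &[Vec<i64>]) -> Vec<Vec<Arg>> {
    argsets
        .iter()
        .map(|set| set.iter().copied().map(Arg::Int).collect::<Vec<Arg>>())
        .collect()
}
```

For the argset `[1, 2]`, the Map form produces a call with one list argument, while the Starmap form produces a call with two integer arguments.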
Use cases:
- Functions with multiple parameters
- Coordinate processing (x, y pairs)
- Key-value operations
- Tuple-based data processing
Signature - Reusable Task Definitions
use Signature;
use HashMap;
// Basic signature
let sig = new;
// With positional arguments
let sig = new
.with_args;
// With keyword arguments
let mut kwargs = new;
kwargs.insert;
kwargs.insert;
let sig = new
.with_kwargs;
// With priority
let sig = new
.with_priority; // Higher = more urgent
// Immutable (args cannot be replaced in chain)
let sig = new
.with_args
.immutable;
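The effect of the immutable flag is easiest to see in a small model. The sketch below assumes Celery-style chain semantics, where the parent's result is prepended to a signature's own arguments unless the signature is immutable; whether celers-canvas prepends or replaces is not stated here. `Sig` and `resolve_args` are hypothetical names, not the crate's types.

```rust
// Hypothetical signature model, assuming Celery-style argument passing.
#[derive(Debug, Clone, PartialEq)]
struct Sig {
    task: String,
    args: Vec<i64>,
    immutable: bool,
}

impl Sig {
    fn new(task: &str) -> Self {
        Sig { task: task.to_string(), args: Vec::new(), immutable: false }
    }

    fn with_args(mut self, args: Vec<i64>) -> Self {
        self.args = args;
        self
    }

    fn immutable(mut self) -> Self {
        self.immutable = true;
        self
    }

    /// Arguments the task would actually run with inside a chain.
    fn resolve_args(&self, parent_result: Option<i64>) -> Vec<i64> {
        match parent_result {
            // Mutable signature: the parent's result goes in front.
            Some(result) if !self.immutable => {
                let mut resolved = vec![result];
                resolved.extend_from_slice(&self.args);
                resolved
            }
            // Immutable signature, or first task in the chain: args unchanged.
            _ => self.args.clone(),
        }
    }
}
```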
Advanced Patterns
Nested Workflows
Combine workflows for complex patterns:
// Process groups in sequence
let group1 = new
.add
.add;
let group2 = new
.add
.add;
// Execute groups sequentially (not directly supported, use manual coordination)
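One way to picture the manual coordination is to wait for every task of one group before starting the next. The sketch below does this with threads in a single process; `run_group` and `run_groups_in_sequence` are hypothetical helpers, and in a real deployment the waiting step would more likely be a result-backend check or a Chord callback.

```rust
use std::thread;

/// Runs one group: every task gets its own thread, then all are joined.
/// The "task" here simply adds 1 to its input.
fn run_group(inputs: Vec<i64>) -> Vec<i64> {
    let handles: Vec<thread::JoinHandle<i64>> = inputs
        .into_iter()
        .map(|x| thread::spawn(move || x + 1))
        .collect();
    handles
        .into_iter()
        .map(|handle| handle.join().expect("task panicked"))
        .collect()
}

/// Runs groups one after another. A group starts only after the previous
/// group has been fully joined, because `run_group` blocks until then.
fn run_groups_in_sequence(groups: Vec<Vec<i64>>) -> Vec<Vec<i64>> {
    groups.into_iter().map(run_group).collect()
}
```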
Priority Workflows
Assign priorities to workflow tasks:
let high_priority_chain = new
.then_signature
.then_signature;
Partial Chord Results
Handle failed header tasks inside the callback:
// In callback task, check which tasks completed successfully
// Results array may contain errors for failed tasks
async
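A callback along these lines could separate successful results from failures. The sketch below assumes each entry arrives as a `Result<i64, String>`; that shape and the name `summarize` are assumptions for illustration, since the real result encoding is not shown here.

```rust
/// Hypothetical callback body: sums the successful results and collects
/// the error messages of the header tasks that failed.
fn summarize(results: &[Result<i64, String>]) -> (i64, Vec<String>) {
    let mut total = 0;
    let mut failures = Vec::new();
    for result in results {
        match result {
            Ok(value) => total += value,
            Err(message) => failures.push(message.clone()),
        }
    }
    (total, failures)
}
```

The caller can then decide whether a non-empty failure list should fail the whole workflow or only be logged.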
Workflow Optimization
Optimize workflows before execution using the WorkflowCompiler:
use ;
// Create a workflow with redundant tasks
let chain = new
.then_signature
.then_signature
.then_signature; // Duplicate
// Optimize the workflow
let compiler = new.aggressive;
let optimized = compiler.optimize_chain;
// The optimized chain has duplicates removed
assert_eq!; // Was 3, now 2
Available Optimization Passes:
- Common Subexpression Elimination (CSE): Removes duplicate task signatures
  let compiler = new.aggressive; // Deduplicates identical tasks (same name, args, kwargs)
- Dead Code Elimination (DCE): Removes unreachable or invalid tasks
  let compiler = new; // Removes tasks with empty names or no effect
- Task Fusion: Combines sequential tasks with the same name
  let compiler = new.aggressive; // Combines immutable tasks with same name and priority
- Parallel Scheduling: Optimizes task execution order
  let compiler = new .add_pass; // Sorts group tasks by priority (highest first)
- Resource Optimization: Improves resource utilization
  let compiler = new .add_pass; // Groups tasks by queue for better locality
Example: Combined Optimizations
use ;
let group = new
.add_signature
.add_signature // Dead code
.add_signature
.add_signature; // Duplicate
let compiler = new
.aggressive
.add_pass;
let optimized = compiler.optimize_group;
// Result: 2 tasks (dead code removed, duplicate removed, sorted by priority)
// Task order: task2 (priority 9), task1 (priority 1)
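The combined effect described in the comments can be reproduced with a few lines of plain Rust. This is a sketch of the idea only, not the WorkflowCompiler implementation: `TaskSig` and `optimize` are hypothetical names, kwargs are omitted, and "dead code" is reduced to the empty-name case.

```rust
// Hypothetical signature model for illustrating the passes.
#[derive(Debug, Clone, PartialEq)]
struct TaskSig {
    name: String,
    args: Vec<i64>,
    priority: u8,
}

/// Applies three simplified passes: dead code elimination, duplicate
/// removal, and a priority sort (highest first).
fn optimize(tasks: Vec<TaskSig>) -> Vec<TaskSig> {
    let mut kept: Vec<TaskSig> = Vec::new();
    for task in tasks {
        // DCE: a task with an empty name cannot be dispatched.
        if task.name.is_empty() {
            continue;
        }
        // CSE: skip a task whose name and args were already kept.
        if kept.iter().any(|k| k.name == task.name && k.args == task.args) {
            continue;
        }
        kept.push(task);
    }
    // Parallel scheduling: stable sort, highest priority first.
    kept.sort_by(|a, b| b.priority.cmp(&a.priority));
    kept
}
```

Given `task1` (priority 1), an empty-name task, `task2` (priority 9), and a second copy of `task1`, the function returns `task2` followed by `task1`.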
Performance Benefits:
- Reduced task count (faster workflow setup)
- Better queue locality (improved throughput)
- Optimized scheduling (higher priority tasks first)
- See examples/workflow_optimization.rs for detailed examples
Chord Barrier Synchronization
The Chord primitive uses Redis atomic operations for barrier synchronization:
Implementation
// 1. Initialize chord state
chord_init;
// 2. Each worker completion increments counter
let count = chord_complete_task.await?; // Atomic INCR
// 3. When count == total, enqueue callback
if count >= state.total
Thread Safety
- Atomic Counter: Redis INCR operation (atomic)
- Race Condition Free: Multiple workers can complete simultaneously
- Exactly Once: Callback enqueued exactly once
- No Lost Updates: Atomic operations prevent race conditions
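The exactly-once property follows from the counter being atomic: every increment returns a distinct value, so only one worker can observe the value equal to the total. The sketch below demonstrates this with `AtomicUsize::fetch_add` standing in for Redis INCR; `callbacks_fired` is a hypothetical helper written for this illustration.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Simulates `workers` header tasks finishing concurrently and returns how
/// many of them decided to enqueue the callback.
fn callbacks_fired(workers: usize) -> usize {
    let counter = Arc::new(AtomicUsize::new(0));
    let fired = Arc::new(AtomicUsize::new(0));

    let handles: Vec<thread::JoinHandle<()>> = (0..workers)
        .map(|_| {
            let counter = Arc::clone(&counter);
            let fired = Arc::clone(&fired);
            thread::spawn(move || {
                // fetch_add returns the previous value; +1 gives the new
                // count, which is what INCR returns.
                let count = counter.fetch_add(1, Ordering::SeqCst) + 1;
                if count == workers {
                    fired.fetch_add(1, Ordering::SeqCst);
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().expect("worker panicked");
    }
    fired.load(Ordering::SeqCst)
}
```

Note that the sketch compares with `==`. With a `>=` comparison, any completion reported after the total is reached (for example from a retried task) would pass the check again, so an implementation using `>=` would need an additional guard to stay exactly-once.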
Feature Flags
[dependencies]
celers-canvas = { version = "0.1", features = ["backend-redis"] }
Available features:
- backend-redis: Enable Chord support with Redis backend (required for barrier synchronization)
Without backend-redis:
- Chain, Group, Map, Starmap work normally
- Chord falls back to Group (no callback coordination)
Error Handling
use ;
match new.then.apply.await
Error Types:
- Invalid: Empty workflow, missing callback, etc.
- Broker: Enqueue failures, connection errors
- Serialization: JSON encoding errors
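The three error categories map naturally onto an enum. The sketch below is an assumption about shape only: `CanvasError`, its `String` payloads, `validate_chain`, and `is_retryable` are hypothetical names, and the crate's real error type may differ.

```rust
use std::fmt;

// Hypothetical error type mirroring the three categories listed above.
#[derive(Debug)]
enum CanvasError {
    Invalid(String),
    Broker(String),
    Serialization(String),
}

impl fmt::Display for CanvasError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            CanvasError::Invalid(msg) => write!(f, "invalid workflow: {}", msg),
            CanvasError::Broker(msg) => write!(f, "broker error: {}", msg),
            CanvasError::Serialization(msg) => write!(f, "serialization error: {}", msg),
        }
    }
}

impl std::error::Error for CanvasError {}

/// Rejects an empty workflow before anything is enqueued.
fn validate_chain(task_names: &[&str]) -> Result<(), CanvasError> {
    if task_names.is_empty() {
        return Err(CanvasError::Invalid("empty workflow".to_string()));
    }
    Ok(())
}

/// Only broker failures (connection or enqueue problems) are worth retrying;
/// invalid workflows and encoding errors fail the same way every time.
fn is_retryable(err: &CanvasError) -> bool {
    matches!(err, CanvasError::Broker(_))
}
```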
Comparison with Celery
| Feature | CeleRS Canvas | Celery Canvas |
|---|---|---|
| Chain | ✅ | ✅ |
| Group | ✅ | ✅ |
| Chord | ✅ | ✅ |
| Map | ✅ | ✅ |
| Starmap | ✅ | ✅ |
| Immutability | ✅ | ✅ |
| Priority | ✅ | ✅ |
| Nested Workflows | ⚠️ Manual | ✅ Automatic |
| Result Backend Required | Chord only | All (optional) |
Compatibility:
- API design matches Celery Canvas patterns
- Task format compatible with Celery workers
- Can interoperate with Python Celery deployments
Performance Characteristics
| Workflow | Time Complexity | Space Complexity | Notes |
|---|---|---|---|
| Chain | O(n) sequential | O(1) per task | Executes serially |
| Group | O(1) enqueue | O(n) tasks | Parallel execution |
| Chord | O(1) + callback | O(n) + state | Atomic counter overhead |
| Map | O(1) enqueue | O(n) tasks | Same as Group |
| Starmap | O(1) enqueue | O(n) tasks | Same as Group |
Throughput:
- Chain: Limited by sequential execution
- Group/Map/Starmap: Limited by broker throughput (50K+ tasks/sec with batch)
- Chord: Same as Group + Redis INCR overhead (<1ms)
Examples
Data Pipeline
// ETL workflow
let pipeline = new
.then
.then
.then;
pipeline.apply.await?;
Image Processing
// Batch resize images
let task = new;
let images = vec!;
let workflow = new;
workflow.apply.await?;
Map-Reduce Analytics
// Process log files in parallel, then aggregate
let header = new
.add
.add
.add;
let callback = new;
let workflow = new;
let mut backend = new?;
workflow.apply.await?;
Priority Processing
// High-priority urgent tasks
let urgent = new
.add_signature
.add_signature;
urgent.apply.await?;
Requirements
- Rust: 1.70+ (async/await, trait bounds)
- Broker: Any CeleRS broker (Redis, PostgreSQL, etc.)
- Backend: Redis backend required for Chord (feature: backend-redis)
- Serialization: serde_json for argument encoding
Architecture
┌─────────────────────────────────────────────────────────────┐
│ Canvas Workflow │
└─────────────────────────────────────────────────────────────┘
│
├─ Chain ──> Sequential execution
├─ Group ──> Parallel execution
├─ Chord ──> Map-reduce (Group + callback)
├─ Map ───> Batch same task
└─ Starmap > Batch with unpacking
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Broker (Queue) │
│ (Redis, PostgreSQL) │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Workers (Executors) │
│ (Process tasks + update state) │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Result Backend (Chord coordination) │
│ (Redis atomic INCR) │
└─────────────────────────────────────────────────────────────┘
Testing
See Also
- Examples: examples/canvas_workflows.rs - Comprehensive workflow examples
- Core: celers-core - Task registry and execution
- Worker: celers-worker - Worker runtime with workflow support
- Backend: celers-backend-redis - Result backend for Chord
License
MIT OR Apache-2.0