celers-backend-redis
Version: 0.2.0 | Status: [Stable] | Updated: 2026-03-27
Redis-based result backend for CeleRS task result storage and workflow state management. Provides atomic operations for Chord barrier synchronization.
Overview
Production-ready result backend with:
- ✅ Task Result Storage: Store and retrieve task results
- ✅ Chord State Management: Atomic barrier synchronization for map-reduce
- ✅ Result Expiration: TTL support for automatic cleanup
- ✅ Atomic Operations: Redis INCR for thread-safe counters
- ✅ Task Metadata: Complete task lifecycle tracking
- ✅ Multiple States: Pending, Started, Success, Failure, Retry, Revoked
- ✅ Multiplexed Connections: A single async connection shared across tasks, with no separate pool to manage
Quick Start
use ;
use uuid::Uuid;
use std::time::Duration;
async
Task Result States
Available States
use TaskResult;
// Task is pending execution
let pending = TaskResult::Pending;
// Task is currently running
let started = TaskResult::Started;
// Task completed successfully
let success = TaskResult::Success(/* result value */);
// Task failed with error
let failure = TaskResult::Failure(/* error message */);
// Task was cancelled/revoked
let revoked = TaskResult::Revoked;
// Task retry scheduled (retry count = 3)
let retry = TaskResult::Retry(3);
State Transitions
Pending ──> Started ──> Success ✓
└──> Failure ✗
└──> Retry ↻ ──> Started (again)
└──> Revoked ✗
✓ = Final state (success)
✗ = Final state (error)
↻ = Retry loop
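The diagram can also be read as a small validity check. The sketch below is illustrative only: `State`, `can_transition`, and `is_final` are hypothetical names rather than part of the crate's API, payloads are omitted, and it assumes that every branch in the diagram leaves `Started`.

```rust
/// Hypothetical, payload-free mirror of the task states listed above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    Pending,
    Started,
    Success,
    Failure,
    Retry,
    Revoked,
}

/// Returns true if the diagram allows moving from `from` to `to`.
/// Assumption: all four branches in the diagram originate at `Started`.
fn can_transition(from: State, to: State) -> bool {
    matches!(
        (from, to),
        (State::Pending, State::Started)
            | (State::Started, State::Success)
            | (State::Started, State::Failure)
            | (State::Started, State::Retry)
            | (State::Started, State::Revoked)
            | (State::Retry, State::Started)
    )
}

/// Final states have no outgoing transitions.
fn is_final(state: State) -> bool {
    matches!(state, State::Success | State::Failure | State::Revoked)
}

fn main() {
    assert!(can_transition(State::Pending, State::Started));
    assert!(can_transition(State::Retry, State::Started));
    assert!(!can_transition(State::Success, State::Started));
    assert!(is_final(State::Revoked));
    println!("transition checks passed");
}
```

A check like this could guard metadata updates so that a late worker message cannot move a finished task back to a running state.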
Task Metadata
Structure
use TaskMeta;
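use chrono::Utc;
let task_id = Uuid::new_v4();
let mut meta = TaskMeta::new(/* ... */);
// Update metadata throughout lifecycle
meta.started_at = Some(Utc::now());
meta.worker = Some(/* worker name */);
meta.result = TaskResult::Started;
// On completion
meta.completed_at = Some(Utc::now());
meta.result = TaskResult::Success(/* result value */);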
Fields
| Field | Type | Description |
|---|---|---|
| task_id | Uuid | Unique task identifier |
| task_name | String | Task name (e.g., "process_image") |
| result | TaskResult | Current task state/result |
| created_at | DateTime | When task was created |
| started_at | Option<DateTime> | When task started executing |
| completed_at | Option<DateTime> | When task completed |
| worker | Option | Worker that executed the task |
Basic Operations
Store Result
use ;
let mut backend = new?;
let task_id = Uuid::new_v4();
let mut meta = new;
meta.result = Success;
backend.store_result.await?;
Redis key: celery-task-meta-{task_id}
Get Result
match backend.get_result.await?
Delete Result
backend.delete_result.await?;
Set Expiration (TTL)
use std::time::Duration;
// Result expires after 1 hour
backend.set_expiration.await?;
// Result expires after 24 hours
backend.set_expiration.await?;
Redis command: EXPIRE celery-task-meta-{task_id} {seconds}
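To make the mapping from a TTL to the Redis command concrete, here is a dependency-free sketch. `expire_command` is a hypothetical helper, not a crate function, and the task ID is passed as a plain string to avoid pulling in the `uuid` crate.

```rust
use std::time::Duration;

/// Builds the EXPIRE command text for a task result key.
/// Hypothetical helper: the real backend sends the command through its Redis client.
fn expire_command(task_id: &str, ttl: Duration) -> String {
    // EXPIRE takes whole seconds; as_secs() drops any sub-second part.
    format!("EXPIRE celery-task-meta-{} {}", task_id, ttl.as_secs())
}

fn main() {
    let one_hour = Duration::from_secs(60 * 60);
    let one_day = Duration::from_secs(24 * 60 * 60);
    println!("{}", expire_command("<task_id>", one_hour));
    println!("{}", expire_command("<task_id>", one_day));
}
```

One hour corresponds to 3600 seconds and 24 hours to 86400 seconds.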
Chord Barrier Synchronization
The backend provides atomic operations for Chord (map-reduce) patterns.
How Chord Works
Header Tasks (parallel):
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Task 1 │ │ Task 2 │ │ Task 3 │
└────┬────┘ └────┬────┘ └────┬────┘
│ │ │
│ ┌───────▼────────┐ │
│ │ Redis Counter │ │ (Atomic INCR)
│ │ 0 → 1 → 2 → 3 │ │
│ └───────┬────────┘ │
│ │ │
└────────────┼────────────┘
│
▼ (When count == 3)
┌──────────────┐
│ Callback Task│ (Aggregate results)
└──────────────┘
Initialize Chord
use ;
use uuid::Uuid;
let mut backend = new?;
let chord_id = Uuid::new_v4();
let state = ChordState ;
backend.chord_init.await?;
Redis keys created:
- celery-chord-{chord_id}: Chord state (JSON)
- celery-chord-counter-{chord_id}: Atomic counter (integer, initialized to 0)
Complete Task (Increment Counter)
// Called by worker when task completes
let count = backend.chord_complete_task.await?;
println!;
// When count == state.total, enqueue callback
if count >= state.total
Redis command: INCR celery-chord-counter-{chord_id} (atomic)
Thread-safety: Multiple workers can complete tasks simultaneously without race conditions.
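The barrier property can be demonstrated without Redis. In the sketch below an `AtomicUsize` stands in for the Redis counter (an assumption made purely for illustration), and `incr` and `run_chord` are hypothetical names. Like INCR, `incr` returns the value after the increment, so exactly one completing task observes the final count.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// In-memory stand-in for the Redis counter; the real backend issues INCR.
/// Like INCR, it returns the value *after* incrementing.
fn incr(counter: &AtomicUsize) -> usize {
    counter.fetch_add(1, Ordering::SeqCst) + 1
}

/// Runs `total` simulated header tasks in parallel and returns how many of
/// them observed `count == total`, i.e. how many would enqueue the callback.
fn run_chord(total: usize) -> usize {
    let counter = Arc::new(AtomicUsize::new(0));
    let callbacks = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..total)
        .map(|_| {
            let counter = Arc::clone(&counter);
            let callbacks = Arc::clone(&callbacks);
            thread::spawn(move || {
                // Each task increments the counter once when it completes.
                let count = incr(&counter);
                // Only the task that brings the counter to `total` fires.
                if count == total {
                    callbacks.fetch_add(1, Ordering::SeqCst);
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().expect("worker thread panicked");
    }
    callbacks.load(Ordering::SeqCst)
}

fn main() {
    let fired = run_chord(3);
    println!("callbacks enqueued: {}", fired);
}
```

Because each increment returns a distinct value, no two tasks can both see the final count, regardless of how the threads interleave.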
Get Chord State
if let Some = backend.chord_get_state.await?
Chord State Structure
Configuration
Custom Key Prefix
let backend = new?
.with_prefix;
// Results stored at: myapp-task-{task_id}
Redis URL Formats
// Basic
let backend = new?;
// With password
let backend = new?;
// TLS
let backend = new?;
// Specific database
let backend = new?;
// Unix socket
let backend = new?;
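As a rough guide to how the URL forms differ, the sketch below classifies a URL by its scheme. It assumes the schemes commonly accepted by the `redis` crate (`redis://`, `rediss://`, `unix://`, `redis+unix://`); `Transport` and `transport_for` are hypothetical names, and the example hosts, password, and socket path are placeholders.

```rust
/// Connection style implied by a Redis URL scheme.
#[derive(Debug, PartialEq, Eq)]
enum Transport {
    Tcp,
    Tls,
    UnixSocket,
}

/// Classifies a URL by scheme only; the rest of the URL is not validated.
fn transport_for(url: &str) -> Option<Transport> {
    let (scheme, _rest) = url.split_once("://")?;
    match scheme {
        "redis" => Some(Transport::Tcp),
        "rediss" => Some(Transport::Tls),
        "unix" | "redis+unix" => Some(Transport::UnixSocket),
        _ => None,
    }
}

fn main() {
    // Placeholder URLs for illustration only.
    let urls = [
        "redis://localhost:6379",
        "redis://:secret@localhost:6379",
        "rediss://redis.example.com:6380",
        "redis://localhost:6379/2",
        "unix:///var/run/redis.sock",
    ];
    for url in urls {
        println!("{} -> {:?}", url, transport_for(url));
    }
}
```

The password and database number do not change the transport; they are carried in the authority and path parts of a `redis://` URL.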
Use Cases
1. Task Result Polling
use ;
async
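The polling pattern itself is independent of Redis. The following is a minimal synchronous sketch, assuming a lookup closure in place of `get_result`; `Polled` and `poll_until_done` are hypothetical names. In async code the blocking sleep would be replaced by the runtime's timer.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical, simplified view of a task's state for polling purposes.
#[derive(Debug, Clone, PartialEq)]
enum Polled {
    NotReady,
    Done(String),
    Failed(String),
}

/// Calls `fetch` every `interval` until it reports a final state or `timeout`
/// elapses. `fetch` stands in for a backend lookup such as `get_result`.
fn poll_until_done<F>(mut fetch: F, interval: Duration, timeout: Duration) -> Option<Polled>
where
    F: FnMut() -> Polled,
{
    let deadline = Instant::now() + timeout;
    loop {
        match fetch() {
            Polled::NotReady => {
                if Instant::now() >= deadline {
                    return None; // gave up waiting
                }
                thread::sleep(interval);
            }
            final_state => return Some(final_state),
        }
    }
}

fn main() {
    // Fake backend: the result becomes available on the third lookup.
    let mut lookups = 0;
    let outcome = poll_until_done(
        || {
            lookups += 1;
            if lookups < 3 {
                Polled::NotReady
            } else {
                Polled::Done("42".to_string())
            }
        },
        Duration::from_millis(10),
        Duration::from_secs(1),
    );
    println!("{:?}", outcome);
}
```

A timeout keeps a caller from waiting forever on a result that has already expired or was never stored.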
2. Task Lifecycle Tracking
async
3. Chord Map-Reduce
use ;
use Broker;
async
4. Result Expiration Strategy
use std::time::Duration;
async
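One possible expiration strategy is to pick the TTL from the task outcome. The sketch below is an assumption, not the crate's policy: `Outcome` and `ttl_for` are hypothetical names, and the durations reuse the 1 hour and 24 hour values shown earlier, keeping non-successful results longer so they can be inspected.

```rust
use std::time::Duration;

/// Hypothetical outcome categories used only to choose a TTL.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Outcome {
    Success,
    Failure,
    Revoked,
}

/// Illustrative policy: successful results expire after 1 hour,
/// failed or revoked results are kept for 24 hours for debugging.
fn ttl_for(outcome: Outcome) -> Duration {
    const HOUR_SECS: u64 = 60 * 60;
    match outcome {
        Outcome::Success => Duration::from_secs(HOUR_SECS),
        Outcome::Failure | Outcome::Revoked => Duration::from_secs(24 * HOUR_SECS),
    }
}

fn main() {
    for outcome in [Outcome::Success, Outcome::Failure, Outcome::Revoked] {
        println!("{:?}: expire after {}s", outcome, ttl_for(outcome).as_secs());
    }
}
```

The value returned by such a function would then be passed to `set_expiration` right after the result is stored.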
Error Handling
use ;
match backend.store_result.await
Error Types:
- Redis: Underlying Redis client errors
- Serialization: JSON encoding/decoding errors
- NotFound: Task result doesn't exist
- Connection: Failed to connect to Redis
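To show how callers might branch on these categories, here is a self-contained sketch. `BackendError` is a hypothetical mirror of the four error types listed above, with `String` payloads assumed for simplicity, and treating only connection failures as transient is likewise an assumption.

```rust
use std::fmt;

/// Hypothetical mirror of the four error categories listed above.
/// The String payloads are an assumption to keep the sketch self-contained.
#[derive(Debug)]
enum BackendError {
    Redis(String),
    Serialization(String),
    NotFound(String),
    Connection(String),
}

impl fmt::Display for BackendError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            BackendError::Redis(msg) => write!(f, "redis error: {}", msg),
            BackendError::Serialization(msg) => write!(f, "serialization error: {}", msg),
            BackendError::NotFound(key) => write!(f, "not found: {}", key),
            BackendError::Connection(msg) => write!(f, "connection error: {}", msg),
        }
    }
}

impl std::error::Error for BackendError {}

/// Whether retrying the same call could plausibly succeed.
/// Assumption: only connection failures are treated as transient here.
fn is_transient(err: &BackendError) -> bool {
    matches!(err, BackendError::Connection(_))
}

fn main() {
    let err = BackendError::NotFound("celery-task-meta-<task_id>".to_string());
    println!("{} (transient: {})", err, is_transient(&err));
}
```

Separating transient from permanent failures lets a caller retry only where a retry can help, and surface serialization or not-found errors immediately.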
Performance
Connection Pooling
- Uses multiplexed async connections via redis::Client::get_multiplexed_async_connection()
- Connections automatically reused
- No manual pool management required
Atomic Operations
- Chord counter: INCR command (O(1), atomic)
- Store/Get: SET/GET commands (O(1))
- Delete: DEL command (O(1))
- Expiration: EXPIRE command (O(1))
Throughput
| Operation | Latency | Notes |
|---|---|---|
| Store result | <1ms | Depends on network RTT |
| Get result | <1ms | Depends on network RTT |
| Chord increment | <1ms | Atomic operation |
| Set expiration | <1ms | Single EXPIRE command; comparable to SET |
Optimization tips:
- Use pipelining for batch operations
- Set appropriate TTLs to prevent memory bloat
- Co-locate Redis and workers for low latency
Redis Memory Usage
Per Task
| Component | Size | Notes |
|---|---|---|
| Task metadata | ~500B - 2KB | Depends on result size |
| Chord state | ~200B | Per chord, not per task |
| Chord counter | ~16B | Integer value |
Example Calculation
1 million tasks:
- Average task metadata: 1KB
- Total: ~1GB
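- With a 24h TTL and 1 million tasks per day: ~42K tasks/hour, so about 42MB/hour is written and the same amount expires, keeping steady-state usage near 1GB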
1000 active chords:
- Chord state: 200B × 1000 = 200KB
- Chord counters: 16B × 1000 = 16KB
- Total: ~220KB
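The arithmetic above can be reproduced with a few lines of code. This is a back-of-the-envelope sketch using the approximate sizes from the tables (1KB taken as 1000 bytes); `steady_state_bytes` and `chord_overhead_bytes` are hypothetical helper names.

```rust
/// Bytes held in Redis at steady state when `tasks_per_day` results of
/// `avg_bytes` each are stored with a TTL of `ttl_hours`.
fn steady_state_bytes(tasks_per_day: u64, avg_bytes: u64, ttl_hours: u64) -> u64 {
    // Bytes written per day, scaled by the fraction of a day each result lives.
    tasks_per_day * avg_bytes * ttl_hours / 24
}

/// Approximate memory for chord bookkeeping.
fn chord_overhead_bytes(active_chords: u64) -> u64 {
    // ~200B of state plus ~16B of counter per chord (figures from the table above).
    active_chords * (200 + 16)
}

fn main() {
    let daily = steady_state_bytes(1_000_000, 1_000, 24);
    let hourly = steady_state_bytes(1_000_000, 1_000, 1);
    println!("24h TTL: {} bytes resident", daily);
    println!("written per hour: {} bytes", hourly);
    println!("1000 chords: {} bytes", chord_overhead_bytes(1_000));
}
```

Shortening the TTL shrinks resident memory proportionally: with a 1 hour TTL the same workload holds roughly 42MB instead of 1GB.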
Best Practices
1. Always Set TTL
// Store result
backend.store_result.await?;
// Set expiration (required!)
backend.set_expiration.await?;
Why: Prevents unbounded memory growth in Redis.
2. Handle Missing Results
match backend.get_result.await?
3. Chord Cleanup
// After chord completes, clean up state
backend.delete_result.await?;
// Or set TTL on chord state
backend.set_expiration.await?;
4. Error Recovery
// Retry on transient errors
let mut retries = 3;
loop
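A retry loop of this kind can be written generically. The sketch below is one possible shape, not the crate's implementation: `retry_with_backoff` is a hypothetical helper, it retries on every error rather than only transient ones, and it sleeps synchronously (async code would use the runtime's timer instead).

```rust
use std::thread;
use std::time::Duration;

/// Runs `op` until it succeeds or `max_attempts` calls have been made,
/// sleeping between failures and doubling the delay each time.
/// `op` is always called at least once. Returns the first success or the last error.
fn retry_with_backoff<T, E, F>(
    max_attempts: u32,
    initial_delay: Duration,
    mut op: F,
) -> Result<T, E>
where
    F: FnMut() -> Result<T, E>,
{
    let mut delay = initial_delay;
    let mut attempt = 1;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            // Out of attempts: hand the last error back to the caller.
            Err(err) if attempt >= max_attempts => return Err(err),
            Err(_) => {
                thread::sleep(delay);
                delay *= 2;
                attempt += 1;
            }
        }
    }
}

fn main() {
    // Simulated operation that fails twice, then succeeds.
    let mut calls = 0;
    let result: Result<&str, &str> = retry_with_backoff(3, Duration::from_millis(10), || {
        calls += 1;
        if calls < 3 {
            Err("connection reset")
        } else {
            Ok("stored")
        }
    });
    println!("{:?} after {} calls", result, calls);
}
```

Doubling the delay avoids hammering a Redis instance that is restarting, while the attempt cap keeps a permanent failure from blocking the worker indefinitely.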
Celery Compatibility
Key Naming
Compatible with Celery's default key naming:
- Task results: celery-task-meta-{task_id}
- Chord state: celery-chord-{chord_id}
- Chord counter: celery-chord-counter-{chord_id}
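The three key patterns can be expressed as small formatting helpers. These functions are hypothetical and only illustrate the naming scheme with the default prefixes; IDs are passed as strings so the sketch needs no external crates, and the UUID in `main` is a placeholder.

```rust
/// Key under which a task's metadata is stored (default naming).
fn task_key(task_id: &str) -> String {
    format!("celery-task-meta-{}", task_id)
}

/// Key holding the chord state JSON.
fn chord_state_key(chord_id: &str) -> String {
    format!("celery-chord-{}", chord_id)
}

/// Key holding the chord's atomic completion counter.
fn chord_counter_key(chord_id: &str) -> String {
    format!("celery-chord-counter-{}", chord_id)
}

fn main() {
    // Placeholder ID for illustration.
    let id = "123e4567-e89b-12d3-a456-426614174000";
    println!("{}", task_key(id));
    println!("{}", chord_state_key(id));
    println!("{}", chord_counter_key(id));
}
```

Knowing the exact key makes it easy to inspect a result by hand, for example with GET or TTL in redis-cli.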
Result Format
Task metadata format matches Celery's backend structure:
Testing
208 tests passing (unit + doc tests + integration tests)
Troubleshooting
Results disappearing
Cause: TTL expired
Solution: Increase TTL or retrieve results faster
Chord callback not triggered
Cause: Counter state lost (Redis restart)
Solution: Enable Redis persistence (AOF or RDB)
Slow result retrieval
Cause: Network latency
Solution: Co-locate Redis with workers, use connection pooling
Memory usage growing
Cause: Missing TTL on results
Solution: Always call set_expiration() after storing results
See Also
- Canvas: celers-canvas - Chord workflow primitives
- Worker: celers-worker - Worker integration with result backend
- Core: celers-core - Task types and traits
Requirements
- Redis: 6.0+ (6.2+ recommended)
- Features: INCR, SET, GET, DEL, EXPIRE commands
- Persistence: Recommended (AOF or RDB) for chord state durability
License
Apache-2.0