celers-protocol
Celery protocol v2/v5 implementation for CeleRS. Ensures wire-level compatibility with Python Celery workers and brokers.
Overview
Production-ready protocol implementation with:
- ✅ Celery Protocol v2: Compatible with Celery 4.x+
- ✅ Celery Protocol v5: Compatible with Celery 5.x+
- ✅ JSON Serialization: Default, universally compatible
- ✅ MessagePack: Optional high-performance binary format
- ✅ AMQP Properties: Correlation ID, reply-to, delivery mode
- ✅ Workflow Headers: Parent ID, root ID, group ID
- ✅ Base64 Encoding: Binary-safe message bodies
- ✅ Full Metadata: ETA, expiration, retries, priority
Quick Start
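use celers_protocol::Message;
use uuid::Uuid;
// Create a simple task message
let task_id = Uuid::new_v4();
// `args` is the task's argument payload (see Task Arguments)
let body = serde_json::to_vec(&args).unwrap();
let message = Message::new("task".to_string(), task_id, body);
// Serialize to JSON for transport
let serialized = serde_json::to_string(&message).unwrap();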
Protocol Structure
Complete Message Format
JSON representation:
Message Headers
Message Properties
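The header and property groups named in the Overview (workflow IDs, ETA, expiration, retries, correlation ID, reply-to, delivery mode, priority) can be pictured as plain Rust types. This is only a rough sketch: `Headers`, `Properties`, `Envelope`, and `new_envelope` are hypothetical names chosen for illustration and are not taken from the celers-protocol API. The defaults assume Celery's usual conventions, where the correlation ID equals the task ID and delivery mode 2 means persistent.

```rust
/// Hypothetical header block: task identity, workflow links, scheduling metadata.
#[derive(Debug, Clone, PartialEq)]
pub struct Headers {
    pub task: String,
    pub id: String,
    pub parent_id: Option<String>,
    pub root_id: Option<String>,
    pub group: Option<String>,
    pub eta: Option<String>,     // ISO 8601 timestamp
    pub expires: Option<String>, // ISO 8601 timestamp
    pub retries: u32,
}

/// Hypothetical AMQP-style property block.
#[derive(Debug, Clone, PartialEq)]
pub struct Properties {
    pub correlation_id: String,
    pub reply_to: Option<String>,
    pub delivery_mode: u8, // 2 = persistent
    pub priority: Option<u8>,
}

/// Hypothetical envelope tying headers, properties, and body together.
#[derive(Debug, Clone, PartialEq)]
pub struct Envelope {
    pub headers: Headers,
    pub properties: Properties,
    pub content_type: String,
    pub body: Vec<u8>,
}

/// Builds an envelope with no workflow links and JSON as the content type.
pub fn new_envelope(task: &str, id: &str, body: Vec<u8>) -> Envelope {
    Envelope {
        headers: Headers {
            task: task.to_string(),
            id: id.to_string(),
            parent_id: None,
            root_id: None,
            group: None,
            eta: None,
            expires: None,
            retries: 0,
        },
        properties: Properties {
            correlation_id: id.to_string(), // assumed to mirror the task ID
            reply_to: None,
            delivery_mode: 2,
            priority: None,
        },
        content_type: "application/json".to_string(),
        body,
    }
}
```

The actual `Message` type in this crate may name or group these fields differently; the sketch is meant only to show which metadata travels in which part of the message.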
Creating Messages
Basic Task
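use celers_protocol::Message;
use uuid::Uuid;
let task_id = Uuid::new_v4();
// `args` is the task's argument payload (see Task Arguments)
let body = serde_json::to_vec(&args).unwrap();
let message = Message::new("task".to_string(), task_id, body);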
With Priority
let message = Message::new("task".to_string(), task_id, body)
.with_priority(9); // Highest priority
Priority levels:
- 0-3: Low priority
- 4-6: Normal priority
- 7-9: High priority
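The three bands above can be expressed as a small classifier. This is a minimal sketch; `PriorityBand` and `priority_band` are hypothetical helpers, not part of the crate, and values above 9 are treated as out of range.

```rust
/// Hypothetical grouping of the 0-9 priority scale into the three bands listed above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PriorityBand {
    Low,
    Normal,
    High,
}

/// Maps a raw priority to its band; returns `None` for values outside 0..=9.
pub fn priority_band(priority: u8) -> Option<PriorityBand> {
    match priority {
        0..=3 => Some(PriorityBand::Low),
        4..=6 => Some(PriorityBand::Normal),
        7..=9 => Some(PriorityBand::High),
        _ => None,
    }
}
```

A check like this could run before calling `with_priority`, so that out-of-range values are caught on the producer side.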
With Parent/Root ID (Workflows)
let parent_id = Uuid::new_v4();
let root_id = Uuid::new_v4();
let message = Message::new("task".to_string(), task_id, body)
.with_parent(parent_id)
.with_root(root_id);
Use cases:
- Chain workflows (parent → child)
- Workflow tracking (all tasks share root_id)
- Result aggregation by root_id
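To make the parent/root relationship concrete, the sketch below shows how IDs might propagate down a chain: each child records its immediate parent, while the root ID is inherited unchanged. `TaskLink`, `root_task`, and `child_of` are hypothetical names used only for illustration, and plain strings stand in for UUIDs.

```rust
/// Hypothetical record of a task's position in a workflow.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TaskLink {
    pub id: String,
    pub parent_id: Option<String>,
    pub root_id: Option<String>,
}

/// The first task in a workflow has no parent and is its own root.
pub fn root_task(id: &str) -> TaskLink {
    TaskLink {
        id: id.to_string(),
        parent_id: None,
        root_id: Some(id.to_string()),
    }
}

/// A child points at its direct parent and inherits the parent's root.
/// If the parent carries no root, the parent itself is treated as the root.
pub fn child_of(parent: &TaskLink, id: &str) -> TaskLink {
    TaskLink {
        id: id.to_string(),
        parent_id: Some(parent.id.clone()),
        root_id: parent
            .root_id
            .clone()
            .or_else(|| Some(parent.id.clone())),
    }
}
```

With this rule, every task in a chain `a -> b -> c` shares `root_id = "a"`, which is what makes aggregation by root ID possible.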
With Group ID
let group_id = Uuid::new_v4();
let message = Message::new("task".to_string(), task_id, body)
.with_group(group_id);
Use cases:
- Group/Chord workflows
- Parallel task tracking
- Bulk operations
With ETA (Delayed Execution)
use chrono::{Duration, Utc};
// Execute in 1 hour
let eta = Utc::now() + Duration::hours(1);
let message = Message::new("task".to_string(), task_id, body)
.with_eta(eta);
With Expiration
use chrono::{Duration, Utc};
// Task expires in 5 minutes
let expires = Utc::now() + Duration::minutes(5);
let message = Message::new("task".to_string(), task_id, body)
.with_expires(expires);
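ETA and expiration only take effect if the consumer checks them before running a task. The following sketch shows one way a worker might decide what to do with a message; it is an assumption about worker behaviour, not the celers-worker implementation. `Disposition` and `disposition` are hypothetical names, and `std::time::SystemTime` is used in place of a date-time library to keep the example dependency-free.

```rust
use std::time::SystemTime;

/// Hypothetical outcome of inspecting a message's ETA and expiration.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Disposition {
    Run,
    Wait,
    Discard,
}

/// Decides what to do with a task at time `now`.
/// Expiration is checked first: an expired task is discarded even if its ETA has passed.
pub fn disposition(
    now: SystemTime,
    eta: Option<SystemTime>,
    expires: Option<SystemTime>,
) -> Disposition {
    if let Some(deadline) = expires {
        if now >= deadline {
            return Disposition::Discard;
        }
    }
    if let Some(start) = eta {
        if now < start {
            return Disposition::Wait;
        }
    }
    Disposition::Run
}
```

A task with neither field set always runs immediately, which matches the default behaviour of a message created with no ETA or expiration.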
Task Arguments
Standard Format
use celers_protocol::TaskArgs;
let args = TaskArgs ;
let body = serde_json::to_vec(&args).unwrap();
let message = Message::new("task".to_string(), task_id, body);
JSON representation:
Complex Arguments
let args = TaskArgs ;
Protocol Versions
Version 2 (Default)
use celers_protocol::ProtocolVersion;
let version = ProtocolVersion::V2; // Celery 4.x+
Features:
- JSON/MessagePack serialization
- Basic workflow support
- AMQP-style properties
- Task metadata
Compatible with:
- Celery 4.0+
- Celery 5.0+ (backward compatible)
Version 5
let version = ProtocolVersion::V5; // Celery 5.x+
Additional features:
- Extended workflow metadata
- Improved error handling
- Enhanced tracing
Content Types
JSON (Default)
use celers_protocol::ContentType;
let content_type = ContentType::Json;
assert_eq!(content_type.as_str(), "application/json");
Pros:
- Human-readable
- Universally supported
- Easy debugging
Cons:
- Larger message size
- Slower serialization
MessagePack (Optional)
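[dependencies]
celers-protocol = { version = "0.1", features = ["msgpack"] }
use celers_protocol::ContentType;
let content_type = ContentType::MessagePack;
assert_eq!(content_type.as_str(), "application/x-msgpack");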
Pros:
- Compact binary format
- Fast serialization
- Smaller message size
Cons:
- Not human-readable
- Requires msgpack feature
Binary (Custom)
[dependencies]
celers-protocol = { version = "0.1", features = ["binary"] }
let content_type = ContentType::Binary;
assert_eq!;
Custom Content Type
let content_type = Custom;
Serialization
Message Serialization
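use celers_protocol::Message;
let message = Message::new("task".to_string(), task_id, body);
// To JSON
let json = serde_json::to_string(&message)?;
// To bytes (for broker)
let bytes = serde_json::to_vec(&message)?;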
Message Deserialization
// From JSON string
let message: Message = serde_json::from_str(&json)?;
// From bytes
let message: Message = serde_json::from_slice(&bytes)?;
Base64 Encoding
Message bodies are automatically base64-encoded when serializing to JSON:
let body = vec![0xFF, 0xFE, 0xFD]; // Binary data
let message = Message::new("task".to_string(), task_id, body);
let json = serde_json::to_string(&message)?;
// body field in JSON: "//79" (base64)
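For readers who want to check the `"//79"` value by hand, here is a minimal standard-alphabet base64 encoder using only the standard library. It is an illustrative sketch, not the encoder the crate uses; `base64_encode` is a hypothetical helper name.

```rust
/// Standard base64 alphabet (RFC 4648, with `+` and `/`).
const ALPHABET: &[u8; 64] =
    b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

/// Encodes bytes as padded base64 text.
pub fn base64_encode(input: &[u8]) -> String {
    let mut out = String::with_capacity((input.len() + 2) / 3 * 4);
    for chunk in input.chunks(3) {
        // Pack up to three bytes into a 24-bit group, zero-filling missing bytes.
        let b0 = chunk[0] as u32;
        let b1 = *chunk.get(1).unwrap_or(&0) as u32;
        let b2 = *chunk.get(2).unwrap_or(&0) as u32;
        let n = (b0 << 16) | (b1 << 8) | b2;

        // Emit four 6-bit symbols, replacing unused ones with '=' padding.
        out.push(ALPHABET[((n >> 18) & 63) as usize] as char);
        out.push(ALPHABET[((n >> 12) & 63) as usize] as char);
        if chunk.len() > 1 {
            out.push(ALPHABET[((n >> 6) & 63) as usize] as char);
        } else {
            out.push('=');
        }
        if chunk.len() > 2 {
            out.push(ALPHABET[(n & 63) as usize] as char);
        } else {
            out.push('=');
        }
    }
    out
}
```

Encoding the three bytes `0xFF, 0xFE, 0xFD` with this function yields `//79`, since the 24 bits split into the 6-bit values 63, 63, 59, and 61.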
Celery Compatibility
Python Celery Interoperability
Send from Rust, receive in Python:
// Rust: Send task
let message = Message::new("task".to_string(), task_id, body);
broker.enqueue(message).await?;
# Python: Receive and execute
=
return +
Send from Python, receive in Rust:
# Python: Send task
=
// Rust: Receive and execute
use celers_core::TaskRegistry;
let mut registry = TaskRegistry::new();
registry.register;
Wire Format Compatibility
CeleRS messages follow the Celery wire format for every component below; the only unsupported body serializer is Pickle:
| Component | CeleRS | Celery | Compatible? |
|---|---|---|---|
| Headers | ✅ | ✅ | ✅ Yes |
| Properties | ✅ | ✅ | ✅ Yes |
| Body format | JSON/MessagePack | JSON/MessagePack/Pickle | ✅ Yes* |
| UUIDs | ✅ | ✅ | ✅ Yes |
| Timestamps | ISO8601 | ISO8601 | ✅ Yes |
*Pickle not supported in CeleRS (security reasons)
Message Examples
Simple Task
Priority Task
Workflow Task (Chain)
Group Task
Delayed Task (ETA)
Best Practices
1. Always Set Task ID
// Good: Unique ID
let task_id = Uuid::new_v4();
let message = Message::new("task".to_string(), task_id, body);
// Bad: Reused ID (don't do this)
// let message = Message::new("task".to_string(), old_id, body);
2. Use Priority Sparingly
// Good: Reserve high priority for urgent tasks
let message = Message::new("urgent_task".to_string(), task_id, body)
.with_priority(9);
// Bad: Everything is high priority (defeats the purpose)
// let message = Message::new("regular_task".to_string(), task_id, body)
// .with_priority(9);
3. Set Expiration for Time-Sensitive Tasks
use chrono::{Duration, Utc};
// Task only relevant for 5 minutes
let expires = Utc::now() + Duration::minutes(5);
let message = Message::new("task".to_string(), task_id, body)
.with_expires(expires);
4. Use Workflows for Related Tasks
// Parent task
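let parent_id = Uuid::new_v4();
let root_id = parent_id; // Root is the first task
let parent_msg = Message::new("parent_task".to_string(), parent_id, parent_body)
.with_root(root_id);
// Child task
let child_id = Uuid::new_v4();
let child_msg = Message::new("child_task".to_string(), child_id, child_body)
.with_parent(parent_id)
.with_root(root_id);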
5. Choose Appropriate Content Type
// Small messages: JSON is fine
let json_msg = Message::new("task".to_string(), task_id, body);
// Large messages or high throughput: Use MessagePack
let msgpack_msg = ;
Troubleshooting
Messages not received by Python workers
Cause: Content type mismatch
Solution: Ensure content-type is "application/json" or "application/x-msgpack"
Binary data corruption
Cause: Missing base64 encoding
Solution: Body is automatically base64-encoded when serializing to JSON
Priority not working
Cause: Broker doesn't support priorities
Solution: Use Redis with sorted sets or RabbitMQ with priority queues
ETA tasks executing immediately
Cause: Worker doesn't check ETA
Solution: Use celers-worker or Celery worker with ETA support
Performance
Message Size
| Content Type | Overhead | Typical Size |
|---|---|---|
| JSON | ~30% | 200-500B + body |
| MessagePack | ~10% | 150-300B + body |
| Binary | Minimal | 100-200B + body |
Serialization Speed
| Format | Serialize | Deserialize |
|---|---|---|
| JSON | ~100K msg/sec | ~100K msg/sec |
| MessagePack | ~200K msg/sec | ~200K msg/sec |
Recommendation: Use MessagePack for high-throughput systems.
Security Considerations
No Pickle Support
Unlike Python Celery, CeleRS does not support Pickle serialization:
# Python (INSECURE - don't use)
= # ❌ Arbitrary code execution
# CeleRS (SECURE)
# Only JSON and MessagePack supported # ✅ Safe
Why: Pickle allows arbitrary code execution, making it a security risk.
Content-Type Validation
Always validate content-type before deserializing:
match message.content_type.as_str
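One way to perform this validation is an explicit allowlist that rejects everything else, including Celery's pickle type (`application/x-python-serialize`). The sketch below is an assumption about how such a check could look, not the crate's own code; `Accepted`, `Rejected`, and `check_content_type` are hypothetical names.

```rust
/// Hypothetical set of content types that are safe to deserialize.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Accepted {
    Json,
    MessagePack,
}

/// Carries the normalised content type that was refused.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Rejected(pub String);

/// Accepts only allowlisted content types; anything else is refused before
/// any deserializer touches the body.
pub fn check_content_type(raw: &str) -> Result<Accepted, Rejected> {
    // Drop parameters such as "; charset=utf-8" and normalise case.
    let mime = raw
        .split(';')
        .next()
        .unwrap_or("")
        .trim()
        .to_ascii_lowercase();

    match mime.as_str() {
        "application/json" => Ok(Accepted::Json),
        "application/x-msgpack" => Ok(Accepted::MessagePack),
        _ => Err(Rejected(mime)),
    }
}
```

Using an allowlist rather than a denylist means an unknown or misspelled content type fails closed instead of falling through to a default deserializer.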
Testing
See Also
- Core: celers-core - Task execution and registry
- Broker: celers-broker-redis - Redis broker implementation
- Worker: celers-worker - Worker runtime
License
MIT OR Apache-2.0