§CeleRS - Celery-Compatible Distributed Task Queue for Rust
CeleRS is a production-ready, Celery-compatible distributed task queue library providing binary-level protocol compatibility with Python Celery while delivering superior performance, type safety, and reliability.
§Quick Start
§1. Add CeleRS to your Cargo.toml
[dependencies]
celers = { version = "0.2", features = ["redis", "backend-redis", "json"] }
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
§2. Define a Task
use celers::prelude::*;
#[derive(Clone, Serialize, Deserialize, Debug)]
struct AddArgs {
x: i32,
y: i32,
}
#[celers::task]
async fn add(args: AddArgs) -> Result<i32, Box<dyn std::error::Error>> {
Ok(args.x + args.y)
}
§3. Create a Worker
use celers::prelude::*;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Create broker from environment variables
let broker = create_broker_from_env().await?;
// Configure and start worker
let worker = WorkerConfigBuilder::new()
.concurrency(4)
.prefetch_count(10)
.build(broker)?;
worker.start().await?;
Ok(())
}
§4. Send Tasks
use celers::prelude::*;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let broker = create_broker_from_env().await?;
// Send a single task
let task = add::new(AddArgs { x: 1, y: 2 });
broker.enqueue(task).await?;
// Send a workflow
let workflow = Chain::new()
.add(add::new(AddArgs { x: 1, y: 2 }))
.add(add::new(AddArgs { x: 3, y: 4 }));
workflow.apply_async(&broker).await?;
Ok(())
}
§Features
- Type-Safe: Compile-time guarantees for task signatures
- Celery-Compatible: Binary protocol compatibility with Python Celery
- High Performance: 10x throughput compared to Python Celery
- Multiple Brokers: Redis, PostgreSQL, MySQL, RabbitMQ (AMQP), AWS SQS
- Multiple Backends: Redis, PostgreSQL/MySQL (Database), gRPC
- Workflow Primitives: Chain, Group, Chord, Map, Starmap
- Observability: Prometheus metrics, distributed tracing
- Production Features: Batch operations, persistent scheduler, progress tracking
§Architecture
CeleRS follows a layered architecture:
Application Layer (Your Tasks)
↓
Runtime Layer (celers-worker, celers-canvas)
↓
Messaging Layer (celers-kombu, celers-broker-*)
↓
Protocol Layer (celers-protocol)
§Feature Selection Guide
Choose features based on your infrastructure and requirements:
§Broker Selection
| Feature | Use When | Performance |
|---|---|---|
| redis | Simple setup, high throughput needed | ⭐⭐⭐⭐⭐ |
| postgres | PostgreSQL infrastructure exists | ⭐⭐⭐⭐ |
| mysql | MySQL infrastructure exists | ⭐⭐⭐⭐ |
| amqp | Enterprise messaging, complex routing | ⭐⭐⭐⭐ |
| sqs | AWS cloud, serverless, high availability | ⭐⭐⭐ |
§Backend Selection
| Feature | Use When | Latency |
|---|---|---|
| backend-redis | With Redis broker (recommended) | Low |
| backend-db | With PostgreSQL/MySQL broker | Medium |
| backend-rpc | Distributed systems, microservices | Medium |
§Configuration Examples
use celers::prelude::*;
// Example 1: Simple Redis Setup
let broker = RedisBroker::new("redis://localhost:6379", "celery")?;
// Example 2: PostgreSQL with Connection Pool
let broker = PostgresBroker::with_queue(
"postgres://user:pass@localhost/celery",
"celery"
).await?;
// Example 3: Environment-based Configuration
// Set: CELERS_BROKER_TYPE=redis
// CELERS_BROKER_URL=redis://localhost:6379
// CELERS_BROKER_QUEUE=celery
let broker = create_broker_from_env().await?;
// Example 4: Worker with Validation
let result = validate_worker_config(Some(4), Some(10));
if let Err(errors) = result {
for error in errors {
eprintln!("Configuration error: {}", error);
}
}
// Example 5: Feature Compatibility Check
println!("{}", feature_compatibility_matrix());
§Testing
CeleRS provides development utilities for testing:
#[cfg(test)]
mod tests {
use celers::dev_utils::{MockBroker, TaskBuilder};
use celers::prelude::*;
#[tokio::test]
async fn test_task_execution() {
let broker = MockBroker::new();
// Create and enqueue a test task
let task = TaskBuilder::new("my.task")
.max_retries(3)
.build();
let task_id = broker.enqueue(task).await.unwrap();
// Verify task was enqueued
assert_eq!(broker.queue_len(), 1);
// Dequeue and process
let msg = broker.dequeue().await.unwrap().unwrap();
assert_eq!(msg.task.metadata.name, "my.task");
}
}
§Production Deployment Guide
§Infrastructure Setup
§1. Broker Selection and Configuration
Redis (Recommended for Most Use Cases)
# Install Redis
sudo apt-get install redis-server
# Configure for production
# /etc/redis/redis.conf
maxmemory 2gb
maxmemory-policy allkeys-lru
appendonly yes
appendfsync everysec
# Enable persistence
save 900 1
save 300 10
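save 60 10000
PostgreSQL (For Database-Centric Deployments)
-- Create database and tables
CREATE DATABASE celers_broker;
-- Connect to the new database before creating tables (psql meta-command)
\c celers_broker
CREATE TABLE celery_tasks (
    id SERIAL PRIMARY KEY,
    task_id UUID UNIQUE NOT NULL,
    task_name VARCHAR(255) NOT NULL,
    payload BYTEA NOT NULL,
    created_at TIMESTAMP DEFAULT NOW()
);
-- PostgreSQL does not accept an inline INDEX clause; create the index separately
CREATE INDEX idx_created_at ON celery_tasks (created_at);
§2. Worker Configuration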
use celers::prelude::*;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Initialize logging
env_logger::init();
// Create broker with connection pooling
let broker = RedisBroker::new(
&std::env::var("BROKER_URL")?,
&std::env::var("QUEUE_NAME")?
)?;
// Configure worker for production
let worker = WorkerConfigBuilder::new()
.concurrency(num_cpus::get()) // Use all CPU cores
.prefetch_count(10) // Balance throughput and memory
.max_retries(3)
.retry_delay_seconds(60)
.build(broker)?;
// Start worker with graceful shutdown
worker.start().await?;
Ok(())
}
§3. Systemd Service Configuration
Create /etc/systemd/system/celers-worker.service:
[Unit]
Description=CeleRS Worker
After=network.target redis.service
[Service]
Type=simple
User=celers
WorkingDirectory=/opt/celers
Environment="RUST_LOG=info"
Environment="BROKER_URL=redis://localhost:6379"
Environment="QUEUE_NAME=celery"
ExecStart=/opt/celers/bin/worker
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
Enable and start:
sudo systemctl enable celers-worker
sudo systemctl start celers-worker
sudo systemctl status celers-worker
§Monitoring and Observability
§Metrics Integration
use celers::prelude::*;
// Enable metrics
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Start Prometheus metrics server
tokio::spawn(async {
// The target type must be named, otherwise `.into()` cannot be inferred
let addr: std::net::SocketAddr = ([0, 0, 0, 0], 9090).into();
// Serve metrics at /metrics
});
// Your worker code
Ok(())
}
§Distributed Tracing
use celers::tracing::*;
// Initialize tracing
init_tracing("celers-worker")?;
// Tasks automatically get traced
#[celers::task]
async fn my_task(args: Args) -> Result<(), Error> {
// Span is automatically created and propagated
Ok(())
}
§Performance Tuning
§Worker Concurrency
// CPU-bound tasks
.concurrency(num_cpus::get())
// I/O-bound tasks
.concurrency(num_cpus::get() * 4)
// Mixed workload
.concurrency(num_cpus::get() * 2)
§Prefetch Configuration
// Low memory systems
.prefetch_count(5)
// Standard configuration
.prefetch_count(10)
// High throughput
.prefetch_count(20)
§High Availability
§Multiple Workers
Deploy multiple worker instances for redundancy:
# Worker 1
WORKER_ID=worker-1 cargo run --release
# Worker 2
WORKER_ID=worker-2 cargo run --release
# Worker 3
WORKER_ID=worker-3 cargo run --release
§Redis Cluster
// Use Redis cluster for high availability
let broker = RedisBroker::with_cluster(&[
"redis://node1:6379",
"redis://node2:6379",
"redis://node3:6379",
], "celery")?;
§Security Best Practices
§1. Secure Connections
# Use TLS for Redis
BROKER_URL=rediss://user:password@redis.example.com:6380
# Use SSL for PostgreSQL
BROKER_URL=postgresql://user:password@db.example.com:5432/celers?sslmode=require
§2. Authentication
// Use environment variables for credentials
let broker_url = std::env::var("BROKER_URL")?;
let broker = create_broker("redis", &broker_url, "celery").await?;
§3. Network Isolation
- Run workers in private network
- Use VPN or SSH tunnels for remote access
- Implement firewall rules
§Scaling Strategies
§Horizontal Scaling
# Kubernetes deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: celers-worker
spec:
  replicas: 5  # Scale up/down as needed
  selector:
    matchLabels:
      app: celers-worker
  template:
    metadata:
      labels:
        app: celers-worker
    spec:
      containers:
        - name: worker
          image: myapp/celers-worker:latest
          env:
            - name: BROKER_URL
              valueFrom:
                secretKeyRef:
                  name: celers-secrets
                  key: broker-url
          resources:
            limits:
              memory: "1Gi"
              cpu: "1000m"
§Auto-scaling with Kubernetes HPA
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: celers-worker-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: celers-worker
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
§Troubleshooting
§Common Issues
High Memory Usage
- Reduce prefetch_count
- Implement chunked processing
- Use streaming for large datasets
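A minimal sketch of the chunked-processing idea from the list above: split a large input into fixed-size batches so that each task carries only one batch. It uses only the standard library; `chunk_args` is a hypothetical helper, not a CeleRS function (the crate does list a `Chunks` type, whose API is not shown here).

```rust
/// Split `items` into batches of at most `size` elements.
/// Hypothetical helper for illustration; not part of the CeleRS API.
fn chunk_args<T: Clone>(items: &[T], size: usize) -> Vec<Vec<T>> {
    assert!(size > 0, "chunk size must be non-zero");
    // `chunks` yields consecutive slices; the last one may be shorter.
    items.chunks(size).map(|batch| batch.to_vec()).collect()
}
```

Each returned batch could then be serialized as the arguments of one task, which bounds the memory a single task holds.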
Slow Task Processing
- Increase worker concurrency
- Profile task execution
- Optimize database queries
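When increasing concurrency, the multipliers from the Worker Concurrency section under Performance Tuning (1x cores for CPU-bound, 4x for I/O-bound, 2x for mixed workloads) can be captured in a small helper. This is only a sketch: `Workload`, `concurrency_for`, and `detected_cores` are illustrative names, and `std::thread::available_parallelism` stands in for `num_cpus::get()`.

```rust
use std::thread;

/// Workload categories named in the Performance Tuning section.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Workload {
    CpuBound,
    IoBound,
    Mixed,
}

/// Suggested concurrency for `cores` CPU cores, using the 1x / 4x / 2x multipliers.
/// Hypothetical helper; not part of the CeleRS API.
fn concurrency_for(workload: Workload, cores: usize) -> usize {
    let multiplier = match workload {
        Workload::CpuBound => 1,
        Workload::IoBound => 4,
        Workload::Mixed => 2,
    };
    // Never return zero, even if the core count is reported as 0.
    cores.max(1) * multiplier
}

/// Core count from the standard library; falls back to 1 if it cannot be determined.
fn detected_cores() -> usize {
    thread::available_parallelism().map(|n| n.get()).unwrap_or(1)
}
```

The result would be passed to `.concurrency(...)` on the worker builder, for example `concurrency_for(Workload::IoBound, detected_cores())`.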
Task Failures
- Check logs: journalctl -u celers-worker
- Increase retry limits
- Implement proper error handling
Queue Backlog
- Add more workers
- Optimize task execution time
- Implement rate limiting
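Rate limiting, the last item above, is commonly implemented with a token bucket. The following is a minimal std-only sketch of that technique, assuming the caller supplies the current time in seconds so the logic stays deterministic. `Bucket` is an illustrative type and is not the crate's `TokenBucket`.

```rust
/// Minimal token bucket. Illustrative only; not the CeleRS `TokenBucket` type.
struct Bucket {
    capacity: f64,       // maximum burst size
    refill_per_sec: f64, // sustained rate in tokens per second
    tokens: f64,         // tokens currently available
    last_secs: f64,      // time of the last refill
}

impl Bucket {
    /// Start with a full bucket at time 0.
    fn new(capacity: f64, refill_per_sec: f64) -> Self {
        Bucket { capacity, refill_per_sec, tokens: capacity, last_secs: 0.0 }
    }

    /// Try to take one token at time `now_secs`; returns true if the task may run.
    fn try_acquire(&mut self, now_secs: f64) -> bool {
        if now_secs > self.last_secs {
            // Refill in proportion to elapsed time, never above capacity.
            let elapsed = now_secs - self.last_secs;
            self.tokens = (self.tokens + elapsed * self.refill_per_sec).min(self.capacity);
            self.last_secs = now_secs;
        }
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }
}
```

A bucket with capacity 2 and a refill rate of 1 token per second admits a burst of two tasks, rejects a third at the same instant, and admits one more a second later.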
§Migration Guide from Python Celery
§Feature Comparison
| Feature | Python Celery | CeleRS | Notes |
|---|---|---|---|
| Task Definition | @app.task decorator | #[celers::task] macro | Type-safe in Rust |
| Brokers | Redis, RabbitMQ, SQS | Redis, PostgreSQL, MySQL, AMQP, SQS | Same protocol |
| Result Backends | Redis, Database, RPC | Redis, Database, gRPC | Binary compatible |
| Canvas Primitives | chain, group, chord | Chain, Group, Chord, Map, Starmap | Same semantics |
| Periodic Tasks | Celery Beat | CeleRS Beat | Compatible schedules |
| Rate Limiting | ✅ | ✅ | Token bucket & sliding window |
| Task Routing | ✅ | ✅ | Glob & regex patterns |
| Retries | ✅ | ✅ | Exponential backoff |
| Monitoring | Flower | Prometheus + Grafana | Standard metrics |
| Performance | Baseline | 10x faster | Native async |
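The Retries row mentions exponential backoff. As a rough illustration of the idea (not CeleRS's actual retry implementation), the delay can double with each attempt up to a cap. `backoff_delay` and its parameters are hypothetical names.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (0-based): `base * 2^attempt`, capped at `max`.
/// Hypothetical helper; CeleRS's own retry configuration may differ.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    // checked_pow / checked_mul guard against overflow for large attempt counts.
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    base.checked_mul(factor).unwrap_or(max).min(max)
}
```

With a 1 s base and a 60 s cap, attempts 0 through 3 wait 1 s, 2 s, 4 s, and 8 s, and from attempt 6 onward the delay stays at 60 s.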
§API Mapping
§Task Definition
Python Celery:
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379')
@app.task
def add(x, y):
    return x + y
CeleRS:
use celers::prelude::*;
#[derive(Serialize, Deserialize)]
struct AddArgs { x: i32, y: i32 }
#[celers::task]
async fn add(args: AddArgs) -> Result<i32, Box<dyn Error>> {
Ok(args.x + args.y)
}
§Sending Tasks
Python Celery:
# Simple task
result = add.delay(4, 4)
# With options
result = add.apply_async(
    args=(4, 4),
    countdown=10,
    retry=True,
    retry_policy={'max_retries': 3}
)
CeleRS:
// Simple task
let task = SerializedTask::new("add", serde_json::to_vec(&AddArgs { x: 4, y: 4 })?);
broker.enqueue(task).await?;
// With options (using Signature)
let sig = Signature::new("add".to_string())
.with_args(vec![json!(4), json!(4)])
.with_countdown(10)
.with_max_retries(3);
§Canvas Primitives
Python Celery:
from celery import chain, group, chord
# Chain
workflow = chain(add.s(2, 2), add.s(4), add.s(8))
workflow.apply_async()
# Group
job = group(add.s(i, i) for i in range(10))
result = job.apply_async()
# Chord
callback = tsum.s()
header = [add.s(i, i) for i in range(10)]
result = chord(header)(callback)
CeleRS:
use celers::prelude::*;
// Chain
let workflow = Chain::new()
.add("add", vec![json!(2), json!(2)])
.add("add", vec![json!(4)])
.add("add", vec![json!(8)]);
workflow.apply(&broker).await?;
// Group
let mut group = Group::new();
for i in 0..10 {
group = group.add("add", vec![json!(i), json!(i)]);
}
group.apply(&broker).await?;
// Chord
let mut header = Group::new();
for i in 0..10 {
header = header.add("add", vec![json!(i), json!(i)]);
}
let callback = Signature::new("tsum".to_string());
let chord = Chord::new(header, callback);
§Worker Configuration
Python Celery:
celery -A tasks worker \
--concurrency=4 \
--loglevel=info \
--max-tasks-per-child=1000
CeleRS:
let worker = WorkerConfigBuilder::new()
.concurrency(4)
.prefetch_count(10)
.build(broker)?;
worker.start().await?;
§Code Conversion Examples
§Example 1: Simple Task with Retry
Python:
@app.task(bind=True, max_retries=3)
def send_email(self, email, subject, body):
    try:
        mail_client.send(email, subject, body)
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)
Rust:
#[derive(Serialize, Deserialize)]
struct EmailArgs {
email: String,
subject: String,
body: String,
}
#[celers::task]
async fn send_email(args: EmailArgs) -> Result<(), Box<dyn Error>> {
mail_client.send(&args.email, &args.subject, &args.body).await?;
Ok(())
}
// Configure retry in task signature
let sig = Signature::new("send_email".to_string())
.with_max_retries(3)
.with_retry_delay(60);
§Example 2: Periodic Tasks
Python:
from celery.schedules import crontab
app.conf.beat_schedule = {
    'cleanup-every-midnight': {
        'task': 'tasks.cleanup',
        'schedule': crontab(hour=0, minute=0),
    },
}
Rust:
use celers::prelude::*;
let scheduler = BeatScheduler::new();
scheduler.add_task(ScheduledTask {
name: "cleanup".to_string(),
schedule: Schedule::Crontab {
minute: "0".to_string(),
hour: "0".to_string(),
day: "*".to_string(),
month: "*".to_string(),
weekday: "*".to_string(),
},
});
§Performance Differences
§Throughput Comparison
| Metric | Python Celery | CeleRS | Improvement |
|---|---|---|---|
| Tasks/sec (simple) | ~1,000 | ~10,000 | 10x |
| Tasks/sec (I/O) | ~5,000 | ~50,000 | 10x |
| Memory per worker | ~50 MB | ~5 MB | 10x less |
| Startup time | ~2 sec | ~50 ms | 40x faster |
| Message latency | ~10 ms | ~1 ms | 10x faster |
§Why CeleRS is Faster
- Native Async: Tokio’s async runtime vs Python Celery’s prefork, eventlet, and gevent worker pools
- Zero-copy Serialization: Direct memory access without Python object overhead
- Compiled Code: No runtime interpretation
- Efficient Memory: Stack allocation and no GC pauses
- Type Safety: Compile-time optimization opportunities
§Migration Checklist
✅ Phase 1: Setup
- Install Rust toolchain
- Create new Rust project with CeleRS
- Configure broker connection
- Set up development environment
✅ Phase 2: Task Migration
- Identify all Celery tasks
- Define Rust task argument structs
- Convert task logic to async Rust
- Add error handling
- Configure retry policies
✅ Phase 3: Worker Deployment
- Build CeleRS worker binary
- Configure worker settings (concurrency, prefetch)
- Set up monitoring (Prometheus)
- Deploy alongside Python workers
- Gradual traffic migration
✅ Phase 4: Validation
- Monitor task success rates
- Compare performance metrics
- Verify result backend compatibility
- Test retry behavior
- Validate error handling
✅ Phase 5: Optimization
- Tune worker concurrency
- Optimize database queries
- Implement rate limiting
- Set up distributed tracing
§Compatibility Notes
Binary Protocol Compatibility:
- CeleRS uses the same message format as Python Celery
- Tasks can be sent from Python and consumed by Rust (and vice versa)
- Result backends are fully compatible
Limitations:
- Pickle serialization not supported (use JSON or MessagePack)
- Some advanced Python-specific features unavailable
- Canvas primitives have same semantics but different API
Best Practices:
- Start with stateless, CPU-bound tasks
- Use JSON for serialization (most compatible)
- Keep Python workers for Python-specific tasks
- Gradually migrate high-throughput tasks to Rust
§Architecture Documentation
§System Design Overview
CeleRS follows a layered architecture for modularity and maintainability:
┌─────────────────────────────────────────────────────────────┐
│ Application Layer │
│ (Your Tasks and Workflows) │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ Canvas Layer │
│ (Chain, Group, Chord, Map, Starmap) │
│ celers-canvas │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ Worker Layer │
│ (Task Execution, Concurrency, Retry Logic) │
│ celers-worker │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ Messaging Layer │
│ (Producer, Consumer, Transport) │
│ celers-kombu │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ Broker Layer │
│ (Redis, PostgreSQL, MySQL, AMQP, SQS) │
│ celers-broker-{redis,postgres,sql,amqp,sqs} │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ Protocol Layer │
│ (Message Format, Serialization) │
│ celers-protocol │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ Core Layer │
│ (Task Metadata, State, Common Types) │
│ celers-core │
└─────────────────────────────────────────────────────────────┘
§Component Interactions
§Task Submission Flow
Application
│
│ 1. Create task with args
▼
SerializedTask
│
│ 2. Serialize to protocol format
▼
Protocol Layer (Message)
│
│ 3. Enqueue to broker
▼
Broker (Redis/PostgreSQL/etc)
│
│ 4. Store in queue
▼
Queue
§Task Execution Flow
Worker
│
│ 1. Dequeue task
▼
Broker
│
│ 2. Return BrokerMessage
▼
Worker
│
│ 3. Deserialize & validate
▼
Task Handler
│
│ 4. Execute async task
▼
Result
│
│ 5. Store in result backend
▼
Result Backend (Redis/DB)
│
│ 6. ACK/NACK to broker
▼
Broker
§Workflow Execution (Chord Example)
Chord { header: Group, callback: Task }
│
│ 1. Execute Group tasks in parallel
▼
┌─────────┬─────────┬─────────┐
│ Task 1 │ Task 2 │ Task 3 │
└─────────┴─────────┴─────────┘
│ │ │
│ 2. Collect results
▼ ▼ ▼
Result Aggregator
│
│ 3. Trigger callback when all complete
▼
Callback Task
§Data Flow Diagrams
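As a concrete view of how results flow through the chord shown in the Workflow Execution diagram, the three steps (run the group in parallel, collect the results, trigger the callback) can be modelled with standard-library threads. This is a single-process sketch of the semantics only; `run_chord` is a hypothetical function and says nothing about how CeleRS coordinates chords across workers and a result backend.

```rust
use std::thread;

/// Run every header task in parallel, then pass the collected results to `callback`.
/// Std-only illustration of chord semantics; not a CeleRS API.
fn run_chord<T, R>(
    header: Vec<Box<dyn FnOnce() -> T + Send>>,
    callback: impl FnOnce(Vec<T>) -> R,
) -> R
where
    T: Send + 'static,
{
    // 1. Execute the group in parallel (one OS thread per task in this sketch).
    let handles: Vec<_> = header.into_iter().map(|task| thread::spawn(task)).collect();

    // 2. Collect results in submission order; `join` blocks until each task finishes.
    let results: Vec<T> = handles
        .into_iter()
        .map(|h| h.join().expect("header task panicked"))
        .collect();

    // 3. Trigger the callback only once every result is available.
    callback(results)
}
```

Mirroring the Python chord example, a header of ten `i + i` tasks for `i` in `0..10` with a summing callback yields 90.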
§Message Format (Protocol Layer)
Message {
headers: {
id: UUID,
task: String,
origin: String,
...
},
properties: {
correlation_id: String,
reply_to: String,
content_type: "application/json",
content_encoding: "utf-8",
},
body: Vec<u8> // Serialized task args
}
§Worker Pool Architecture
┌─────────────────────────────────────────────┐
│ Worker Manager │
│ ┌──────────────────────────────────────┐ │
│ │ Prefetch Queue (Bounded) │ │
│ │ [Task] [Task] [Task] ... │ │
│ └──────────────────────────────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────┬─────────┬─────────┬─────────┐ │
│ │Worker 1 │Worker 2 │Worker 3 │Worker 4 │ │
│ │(Tokio │(Tokio │(Tokio │(Tokio │ │
│ │ Task) │ Task) │ Task) │ Task) │ │
│ └─────────┴─────────┴─────────┴─────────┘ │
└─────────────────────────────────────────────┘
§Scalability Patterns
§Horizontal Scaling
Load Balancer
│
├─────────────┬─────────────┬─────────────┐
▼ ▼ ▼ ▼
Worker 1 Worker 2 Worker 3 Worker 4
│ │ │ │
└─────────────┴─────────────┴─────────────┘
│
▼
Shared Broker Queue
§Vertical Scaling
- Increase worker concurrency (more Tokio tasks per worker)
- Increase prefetch count (more tasks buffered)
- Optimize task execution (reduce CPU/memory usage)
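The first two knobs above, concurrency and prefetch, can be seen in a small std-only model of the worker pool: a bounded queue of size `prefetch` feeds `concurrency` workers. OS threads stand in for Tokio tasks and squaring a number stands in for task execution. `run_pool` is an illustrative name, not a CeleRS API.

```rust
use std::sync::mpsc::sync_channel;
use std::sync::{Arc, Mutex};
use std::thread;

/// Process `tasks` with `concurrency` workers fed from a bounded queue of size `prefetch`.
/// Returns the sum of all task results. Illustrative model only.
fn run_pool(tasks: Vec<u64>, concurrency: usize, prefetch: usize) -> u64 {
    assert!(concurrency > 0, "at least one worker is required");
    // Bounded queue: `send` blocks once `prefetch` tasks are buffered.
    let (tx, rx) = sync_channel::<u64>(prefetch);
    let rx = Arc::new(Mutex::new(rx));

    let workers: Vec<_> = (0..concurrency)
        .map(|_| {
            let rx = Arc::clone(&rx);
            thread::spawn(move || {
                let mut subtotal = 0u64;
                loop {
                    // The lock is held only while taking the next task.
                    let next = rx.lock().unwrap().recv();
                    match next {
                        Ok(n) => subtotal += n * n, // stand-in for real task execution
                        Err(_) => break,            // queue closed: no more tasks
                    }
                }
                subtotal
            })
        })
        .collect();

    for task in tasks {
        tx.send(task).unwrap();
    }
    drop(tx); // close the queue so the workers exit their loops

    workers.into_iter().map(|w| w.join().unwrap()).sum()
}
```

Raising `concurrency` lets more tasks run at once, while raising `prefetch` lets the producer buffer more tasks ahead of the workers at the cost of memory.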
§Queue-Based Load Balancing
┌──────────────────────────────────────┐
│ Task Router │
│ (Based on task name or priority) │
└──────────────────────────────────────┘
│ │ │ │
▼ ▼ ▼ ▼
[Queue 1] [Queue 2] [Queue 3] [Queue 4]
│ │ │ │
▼ ▼ ▼ ▼
Worker Worker Worker Worker
Pool Pool Pool Pool
Modules§
- advanced_patterns: Advanced workflow patterns for complex use cases
- batch_helpers: Batch processing helpers
- broker_helper: Broker selection helpers
- canvas: Canvas workflow types
- circuit_breaker: Circuit Breaker for Redis Operations
- compile_time_validation: Compile-time feature validation and conflict detection
- config_validation: Configuration validation helpers
- convenience: Convenience functions module
- dedup: Task deduplication implementation
- error: Error types re-exported from celers-kombu
- error_recovery: Error recovery patterns
- health: Redis health check implementation
- health_check: Health check utilities
- ide_support: IDE support and documentation helpers
- metrics_aggregation: Metrics aggregation utilities
- monitoring_helpers: Monitoring and observability helpers
- performance_profiling: Performance profiling utilities
- prelude: Prelude module for common imports
- presets: Production-ready configuration presets
- protocol: Protocol types for advanced usage
- quick_reference: Quick reference documentation
- quick_start: Quick start helpers for common use cases
- rate_limit: Rate limiting types
- redis_monitoring: Redis broker monitoring utilities
- redis_utilities: Redis broker utility functions
- resource_management: Resource management utilities
- result_helpers: Result aggregation helpers
- retry_strategies: Advanced retry strategies
- router: Task routing types
- startup_optimization: Startup time optimization utilities
- task_cancellation: Task cancellation utilities
- task_composition: Task composition utilities
- task_dependencies: Task dependency management
- task_hooks: Task lifecycle hooks
- utils: Utility functions for broker operations and analysis.
- worker: Worker runtime types
- workflow_templates: Workflow templates for common patterns
- workflow_validation: Workflow validation utilities
Macros§
- time_init: Helper macro for timing initialization steps
Structs§
- ActiveTaskInfo: Information about an actively executing task
- AsyncResult: AsyncResult handle for querying task results (Celery-compatible API)
- BrokerStats: Broker connection statistics
- Chain: Sequential execution
- Chord: Parallel execution with callback
- Chunks: Split iterable into chunks for parallel processing
- CompositeEventEmitter: Composite event emitter that sends to multiple emitters
- DeliveryInfo: Delivery information for a task
- Envelope: Message envelope (message + metadata)
- GlobPattern: Glob pattern for task name matching
- Group: Parallel execution
- InMemoryEventEmitter: In-memory event emitter using broadcast channels
- LoggingEventEmitter: Logging event emitter that logs events using tracing
- Map: Apply task to multiple arguments
- Message: Complete Celery message
- MessageHeaders: Message headers (Celery protocol)
- MessageProperties: Message properties (AMQP-like)
- NoOpEventEmitter: No-op event emitter that discards all events
- PoolStats: Worker pool statistics
- QueueConfig: Queue configuration
- QueueStats: Queue statistics
- RateLimitConfig: Configuration for rate limiting
- RedisBroker: Redis-based broker implementation
- RegexPattern: Regular expression pattern for task name matching
- RequestInfo: Request information for a task
- ReservedTaskInfo: Information about a reserved (prefetched) task
- RouteResult: Routing result containing queue and optional AMQP settings
- RouteRule: A routing rule that maps task names to queues
- Router: Task router for directing tasks to appropriate queues
- RouterBuilder: Builder for creating routers with fluent API
- RoutingConfig: Serializable routing configuration
- ScheduledTaskInfo: Information about a scheduled task (with ETA or countdown)
- SerializedTask: Serialized task ready for queue
- Signature: Signature (a task definition with arguments)
- SlidingWindow: Sliding window rate limiter
- Starmap: Like map but unpacks arguments
- TaskArgs: Task arguments (args, kwargs)
- TaskEventBuilder: Builder for creating task events with common fields
- TaskOptions: Task options
- TaskRateLimiter: Per-task rate limiter manager
- TokenBucket: Token bucket rate limiter
- Worker: Worker runtime for consuming and executing tasks
- WorkerConf: Worker configuration
- WorkerConfig: Worker configuration
- WorkerEventBuilder: Builder for creating worker events
- WorkerRateLimiter: Thread-safe per-worker rate limiter
- WorkerReport: Worker report (comprehensive status)
- WorkerStats: Worker statistics
- XMap: Map with exception handling
- XStarmap: Starmap with exception handling
Enums§
- BrokerError: Broker errors
- ContentEncoding: Content encoding
- ContentType: Content type for serialization
- ControlCommand: Control commands that can be sent to workers
- ControlResponse: Response to a control command
- Event: Combined event type for all CeleRS events
- InspectCommand: Inspection sub-commands
- InspectResponse: Response data from inspect commands
- LogLevel: Log level for event logging
- PatternMatcher: Pattern matching strategy for task routing
- ProtocolVersion: Protocol version
- QueueMode: Queue mode
- TaskEvent: Task lifecycle events (Celery-compatible)
- TaskResultValue: Task result value stored in backend
- TaskState: Task state enumeration with strict state machine transitions
- WorkerEvent: Worker lifecycle events
Traits§
- Broker: Core trait for task queue brokers
- Consumer: Consumer trait (message consuming)
- EventEmitter: Trait for emitting events to various transports
- Producer: Producer trait (message publishing)
- RateLimiter: Trait for rate limiter implementations
- ResultStore: Result store trait for AsyncResult API
- Transport: Transport trait (low-level broker connection)
Type Aliases§
Attribute Macros§
- task: Attribute macro for marking async functions as tasks
Derive Macros§
- Task: Derive macro for Task trait