Crate celers

Crate celers 

Source
Expand description

§CeleRS - Celery-Compatible Distributed Task Queue for Rust

CeleRS is a production-ready, Celery-compatible distributed task queue library providing binary-level protocol compatibility with Python Celery while delivering superior performance, type safety, and reliability.

§Quick Start

§1. Add CeleRS to your Cargo.toml

[dependencies]
celers = { version = "0.1", features = ["redis", "backend-redis", "json"] }
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }

§2. Define a Task

use celers::prelude::*;

#[derive(Clone, Serialize, Deserialize, Debug)]
struct AddArgs {
    x: i32,
    y: i32,
}

#[celers::task]
async fn add(args: AddArgs) -> Result<i32, Box<dyn std::error::Error>> {
    Ok(args.x + args.y)
}

§3. Create a Worker

use celers::prelude::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Create broker from environment variables
    let broker = create_broker_from_env().await?;

    // Configure and start worker
    let worker = WorkerConfigBuilder::new()
        .concurrency(4)
        .prefetch_count(10)
        .build(broker)?;

    worker.start().await?;
    Ok(())
}

§4. Send Tasks

use celers::prelude::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let broker = create_broker_from_env().await?;

    // Send a single task
    let task = add::new(AddArgs { x: 1, y: 2 });
    broker.enqueue(task).await?;

    // Send a workflow
    let workflow = Chain::new()
        .add(add::new(AddArgs { x: 1, y: 2 }))
        .add(add::new(AddArgs { x: 3, y: 4 }));
    workflow.apply_async(&broker).await?;

    Ok(())
}

§Features

  • Type-Safe: Compile-time guarantees for task signatures
  • Celery-Compatible: Binary protocol compatibility with Python Celery
  • High Performance: 10x throughput compared to Python Celery
  • Multiple Brokers: Redis, PostgreSQL, MySQL, RabbitMQ (AMQP), AWS SQS
  • Multiple Backends: Redis, PostgreSQL/MySQL (Database), gRPC
  • Workflow Primitives: Chain, Group, Chord, Map, Starmap
  • Observability: Prometheus metrics, distributed tracing
  • Production Features: Batch operations, persistent scheduler, progress tracking

§Architecture

CeleRS follows a layered architecture:

Application Layer (Your Tasks)
       ↓
Runtime Layer (celers-worker, celers-canvas)
       ↓
Messaging Layer (celers-kombu, celers-broker-*)
       ↓
Protocol Layer (celers-protocol)

§Feature Selection Guide

Choose features based on your infrastructure and requirements:

§Broker Selection

FeatureUse WhenPerformance
redisSimple setup, high throughput needed⭐⭐⭐⭐⭐
postgresPostgreSQL infrastructure exists⭐⭐⭐⭐
mysqlMySQL infrastructure exists⭐⭐⭐⭐
amqpEnterprise messaging, complex routing⭐⭐⭐⭐
sqsAWS cloud, serverless, high availability⭐⭐⭐

§Backend Selection

FeatureUse WhenLatency
backend-redisWith Redis broker (recommended)Low
backend-dbWith PostgreSQL/MySQL brokerMedium
backend-rpcDistributed systems, microservicesMedium

§Configuration Examples

use celers::prelude::*;

// Example 1: Simple Redis Setup
let broker = RedisBroker::new("redis://localhost:6379", "celery")?;

// Example 2: PostgreSQL with Connection Pool
let broker = PostgresBroker::with_queue(
    "postgres://user:pass@localhost/celery",
    "celery"
).await?;

// Example 3: Environment-based Configuration
// Set: CELERS_BROKER_TYPE=redis
//      CELERS_BROKER_URL=redis://localhost:6379
//      CELERS_BROKER_QUEUE=celery
let broker = create_broker_from_env().await?;

// Example 4: Worker with Validation
let result = validate_worker_config(Some(4), Some(10));
if let Err(errors) = result {
    for error in errors {
        eprintln!("Configuration error: {}", error);
    }
}

// Example 5: Feature Compatibility Check
println!("{}", feature_compatibility_matrix());

§Testing

CeleRS provides development utilities for testing:

#[cfg(test)]
mod tests {
    use celers::dev_utils::{MockBroker, TaskBuilder};
    use celers::prelude::*;

    #[tokio::test]
    async fn test_task_execution() {
        let broker = MockBroker::new();

        // Create and enqueue a test task
        let task = TaskBuilder::new("my.task")
            .max_retries(3)
            .build();

        let task_id = broker.enqueue(task).await.unwrap();

        // Verify task was enqueued
        assert_eq!(broker.queue_len(), 1);

        // Dequeue and process
        let msg = broker.dequeue().await.unwrap().unwrap();
        assert_eq!(msg.task.metadata.name, "my.task");
    }
}

§Production Deployment Guide

§Infrastructure Setup

§1. Broker Selection and Configuration

Redis (Recommended for Most Use Cases)

# Install Redis
sudo apt-get install redis-server

# Configure for production
# /etc/redis/redis.conf
maxmemory 2gb
maxmemory-policy allkeys-lru
appendonly yes
appendfsync everysec

# Enable persistence
save 900 1
save 300 10
save 60 10000

PostgreSQL (For Database-Centric Deployments)

-- Create database and tables
CREATE DATABASE celers_broker;
CREATE TABLE celery_tasks (
    id SERIAL PRIMARY KEY,
    task_id UUID UNIQUE NOT NULL,
    task_name VARCHAR(255) NOT NULL,
    payload BYTEA NOT NULL,
    created_at TIMESTAMP DEFAULT NOW(),
    INDEX idx_created_at (created_at)
);
§2. Worker Configuration
use celers::prelude::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Initialize logging
    env_logger::init();

    // Create broker with connection pooling
    let broker = RedisBroker::new(
        &std::env::var("BROKER_URL")?,
        &std::env::var("QUEUE_NAME")?
    )?;

    // Configure worker for production
    let worker = WorkerConfigBuilder::new()
        .concurrency(num_cpus::get()) // Use all CPU cores
        .prefetch_count(10) // Balance throughput and memory
        .max_retries(3)
        .retry_delay_seconds(60)
        .build(broker)?;

    // Start worker with graceful shutdown
    worker.start().await?;
    Ok(())
}
§3. Systemd Service Configuration

Create /etc/systemd/system/celers-worker.service:

[Unit]
Description=CeleRS Worker
After=network.target redis.service

[Service]
Type=simple
User=celers
WorkingDirectory=/opt/celers
Environment="RUST_LOG=info"
Environment="BROKER_URL=redis://localhost:6379"
Environment="QUEUE_NAME=celery"
ExecStart=/opt/celers/bin/worker
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal

[Install]
WantedBy=multi-user.target

Enable and start:

sudo systemctl enable celers-worker
sudo systemctl start celers-worker
sudo systemctl status celers-worker

§Monitoring and Observability

§Metrics Integration
use celers::prelude::*;

// Enable metrics
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Start Prometheus metrics server
    tokio::spawn(async {
        let addr = ([0, 0, 0, 0], 9090).into();
        // Serve metrics at /metrics
    });

    // Your worker code
    Ok(())
}
§Distributed Tracing
use celers::tracing::*;

// Initialize tracing
init_tracing("celers-worker")?;

// Tasks automatically get traced
#[celers::task]
async fn my_task(args: Args) -> Result<(), Error> {
    // Span is automatically created and propagated
    Ok(())
}

§Performance Tuning

§Worker Concurrency
// CPU-bound tasks
.concurrency(num_cpus::get())

// I/O-bound tasks
.concurrency(num_cpus::get() * 4)

// Mixed workload
.concurrency(num_cpus::get() * 2)
§Prefetch Configuration
// Low memory systems
.prefetch_count(5)

// Standard configuration
.prefetch_count(10)

// High throughput
.prefetch_count(20)

§High Availability

§Multiple Workers

Deploy multiple worker instances for redundancy:

# Worker 1
WORKER_ID=worker-1 cargo run --release

# Worker 2
WORKER_ID=worker-2 cargo run --release

# Worker 3
WORKER_ID=worker-3 cargo run --release
§Redis Cluster
// Use Redis cluster for high availability
let broker = RedisBroker::with_cluster(&[
    "redis://node1:6379",
    "redis://node2:6379",
    "redis://node3:6379",
], "celery")?;

§Security Best Practices

§1. Secure Connections
# Use TLS for Redis
BROKER_URL=rediss://user:password@redis.example.com:6380

# Use SSL for PostgreSQL
BROKER_URL=postgresql://user:password@db.example.com:5432/celers?sslmode=require
§2. Authentication
// Use environment variables for credentials
let broker_url = std::env::var("BROKER_URL")?;
let broker = create_broker("redis", &broker_url, "celery").await?;
§3. Network Isolation
  • Run workers in private network
  • Use VPN or SSH tunnels for remote access
  • Implement firewall rules

§Scaling Strategies

§Horizontal Scaling
# Kubernetes deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: celers-worker
spec:
  replicas: 5  # Scale up/down as needed
  selector:
    matchLabels:
      app: celers-worker
  template:
    metadata:
      labels:
        app: celers-worker
    spec:
      containers:
      - name: worker
        image: myapp/celers-worker:latest
        env:
        - name: BROKER_URL
          valueFrom:
            secretKeyRef:
              name: celers-secrets
              key: broker-url
        resources:
          limits:
            memory: "1Gi"
            cpu: "1000m"
§Auto-scaling with Kubernetes HPA
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: celers-worker-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: celers-worker
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70

§Troubleshooting

§Common Issues

High Memory Usage

  • Reduce prefetch_count
  • Implement chunked processing
  • Use streaming for large datasets

Slow Task Processing

  • Increase worker concurrency
  • Profile task execution
  • Optimize database queries

Task Failures

  • Check logs: journalctl -u celers-worker
  • Increase retry limits
  • Implement proper error handling

Queue Backlog

  • Add more workers
  • Optimize task execution time
  • Implement rate limiting

§Migration Guide from Python Celery

§Feature Comparison

FeaturePython CeleryCeleRSNotes
Task Definition@task decorator#[celers::task] macroType-safe in Rust
BrokersRedis, RabbitMQ, SQSRedis, PostgreSQL, MySQL, AMQP, SQSSame protocol
Result BackendsRedis, Database, RPCRedis, Database, gRPCBinary compatible
Canvas Primitiveschain, group, chordChain, Group, Chord, Map, StarmapSame semantics
Periodic TasksCelery BeatCeleRS BeatCompatible schedules
Rate LimitingToken bucket & sliding window
Task RoutingGlob & regex patterns
RetriesExponential backoff
MonitoringFlowerPrometheus + GrafanaStandard metrics
PerformanceBaseline10x fasterNative async

§API Mapping

§Task Definition

Python Celery:

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379')

@app.task
def add(x, y):
    return x + y

CeleRS:

use celers::prelude::*;

#[derive(Serialize, Deserialize)]
struct AddArgs { x: i32, y: i32 }

#[celers::task]
async fn add(args: AddArgs) -> Result<i32, Box<dyn Error>> {
    Ok(args.x + args.y)
}
§Sending Tasks

Python Celery:

# Simple task
result = add.delay(4, 4)

# With options
result = add.apply_async(
    args=(4, 4),
    countdown=10,
    retry=True,
    retry_policy={'max_retries': 3}
)

CeleRS:

// Simple task
let task = SerializedTask::new("add", serde_json::to_vec(&AddArgs { x: 4, y: 4 })?);
broker.enqueue(task).await?;

// With options (using Signature)
let sig = Signature::new("add".to_string())
    .with_args(vec![json!(4), json!(4)])
    .with_countdown(10)
    .with_max_retries(3);
§Canvas Primitives

Python Celery:

from celery import chain, group, chord

# Chain
workflow = chain(add.s(2, 2), add.s(4), add.s(8))
workflow.apply_async()

# Group
job = group(add.s(i, i) for i in range(10))
result = job.apply_async()

# Chord
callback = tsum.s()
header = [add.s(i, i) for i in range(10)]
result = chord(header)(callback)

CeleRS:

use celers::prelude::*;

// Chain
let workflow = Chain::new()
    .add("add", vec![json!(2), json!(2)])
    .add("add", vec![json!(4)])
    .add("add", vec![json!(8)]);
workflow.apply(&broker).await?;

// Group
let mut group = Group::new();
for i in 0..10 {
    group = group.add("add", vec![json!(i), json!(i)]);
}
group.apply(&broker).await?;

// Chord
let mut header = Group::new();
for i in 0..10 {
    header = header.add("add", vec![json!(i), json!(i)]);
}
let callback = Signature::new("tsum".to_string());
let chord = Chord::new(header, callback);
§Worker Configuration

Python Celery:

celery -A tasks worker \
    --concurrency=4 \
    --loglevel=info \
    --max-tasks-per-child=1000

CeleRS:

let worker = WorkerConfigBuilder::new()
    .concurrency(4)
    .prefetch_count(10)
    .build(broker)?;

worker.start().await?;

§Code Conversion Examples

§Example 1: Simple Task with Retry

Python:

@app.task(bind=True, max_retries=3)
def send_email(self, email, subject, body):
    try:
        mail_client.send(email, subject, body)
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)

Rust:

#[derive(Serialize, Deserialize)]
struct EmailArgs {
    email: String,
    subject: String,
    body: String,
}

#[celers::task]
async fn send_email(args: EmailArgs) -> Result<(), Box<dyn Error>> {
    mail_client.send(&args.email, &args.subject, &args.body).await?;
    Ok(())
}

// Configure retry in task signature
let sig = Signature::new("send_email".to_string())
    .with_max_retries(3)
    .with_retry_delay(60);
§Example 2: Periodic Tasks

Python:

from celery.schedules import crontab

app.conf.beat_schedule = {
    'cleanup-every-midnight': {
        'task': 'tasks.cleanup',
        'schedule': crontab(hour=0, minute=0),
    },
}

Rust:

use celers::prelude::*;

let scheduler = BeatScheduler::new();
scheduler.add_task(ScheduledTask {
    name: "cleanup".to_string(),
    schedule: Schedule::Crontab {
        minute: "0".to_string(),
        hour: "0".to_string(),
        day: "*".to_string(),
        month: "*".to_string(),
        weekday: "*".to_string(),
    },
});

§Performance Differences

§Throughput Comparison
MetricPython CeleryCeleRSImprovement
Tasks/sec (simple)~1,000~10,00010x
Tasks/sec (I/O)~5,000~50,00010x
Memory per worker~50 MB~5 MB10x less
Startup time~2 sec~50 ms40x faster
Message latency~10 ms~1 ms10x faster
§Why CeleRS is Faster
  1. Native Async: Tokio’s async runtime vs Python’s asyncio
  2. Zero-copy Serialization: Direct memory access without Python object overhead
  3. Compiled Code: No runtime interpretation
  4. Efficient Memory: Stack allocation and no GC pauses
  5. Type Safety: Compile-time optimization opportunities

§Migration Checklist

Phase 1: Setup

  • Install Rust toolchain
  • Create new Rust project with CeleRS
  • Configure broker connection
  • Set up development environment

Phase 2: Task Migration

  • Identify all Celery tasks
  • Define Rust task argument structs
  • Convert task logic to async Rust
  • Add error handling
  • Configure retry policies

Phase 3: Worker Deployment

  • Build CeleRS worker binary
  • Configure worker settings (concurrency, prefetch)
  • Set up monitoring (Prometheus)
  • Deploy alongside Python workers
  • Gradual traffic migration

Phase 4: Validation

  • Monitor task success rates
  • Compare performance metrics
  • Verify result backend compatibility
  • Test retry behavior
  • Validate error handling

Phase 5: Optimization

  • Tune worker concurrency
  • Optimize database queries
  • Implement rate limiting
  • Set up distributed tracing

§Compatibility Notes

Binary Protocol Compatibility:

  • CeleRS uses the same message format as Python Celery
  • Tasks can be sent from Python and consumed by Rust (and vice versa)
  • Result backends are fully compatible

Limitations:

  • Pickle serialization not supported (use JSON or MessagePack)
  • Some advanced Python-specific features unavailable
  • Canvas primitives have same semantics but different API

Best Practices:

  • Start with stateless, CPU-bound tasks
  • Use JSON for serialization (most compatible)
  • Keep Python workers for Python-specific tasks
  • Gradually migrate high-throughput tasks to Rust

§Architecture Documentation

§System Design Overview

CeleRS follows a layered architecture for modularity and maintainability:

┌─────────────────────────────────────────────────────────────┐
│                     Application Layer                        │
│              (Your Tasks and Workflows)                      │
└─────────────────────────────────────────────────────────────┘
                           ↓
┌─────────────────────────────────────────────────────────────┐
│                    Canvas Layer                              │
│        (Chain, Group, Chord, Map, Starmap)                  │
│         celers-canvas                                        │
└─────────────────────────────────────────────────────────────┘
                           ↓
┌─────────────────────────────────────────────────────────────┐
│                     Worker Layer                             │
│    (Task Execution, Concurrency, Retry Logic)               │
│         celers-worker                                        │
└─────────────────────────────────────────────────────────────┘
                           ↓
┌─────────────────────────────────────────────────────────────┐
│                   Messaging Layer                            │
│         (Producer, Consumer, Transport)                      │
│              celers-kombu                                    │
└─────────────────────────────────────────────────────────────┘
                           ↓
┌─────────────────────────────────────────────────────────────┐
│                   Broker Layer                               │
│    (Redis, PostgreSQL, MySQL, AMQP, SQS)                    │
│  celers-broker-{redis,postgres,sql,amqp,sqs}                │
└─────────────────────────────────────────────────────────────┘
                           ↓
┌─────────────────────────────────────────────────────────────┐
│                   Protocol Layer                             │
│         (Message Format, Serialization)                      │
│            celers-protocol                                   │
└─────────────────────────────────────────────────────────────┘
                           ↓
┌─────────────────────────────────────────────────────────────┐
│                     Core Layer                               │
│      (Task Metadata, State, Common Types)                   │
│              celers-core                                     │
└─────────────────────────────────────────────────────────────┘

§Component Interactions

§Task Submission Flow
Application
    │
    │ 1. Create task with args
    ▼
SerializedTask
    │
    │ 2. Serialize to protocol format
    ▼
Protocol Layer (Message)
    │
    │ 3. Enqueue to broker
    ▼
Broker (Redis/PostgreSQL/etc)
    │
    │ 4. Store in queue
    ▼
Queue
§Task Execution Flow
Worker
    │
    │ 1. Dequeue task
    ▼
Broker
    │
    │ 2. Return BrokerMessage
    ▼
Worker
    │
    │ 3. Deserialize & validate
    ▼
Task Handler
    │
    │ 4. Execute async task
    ▼
Result
    │
    │ 5. Store in result backend
    ▼
Result Backend (Redis/DB)
    │
    │ 6. ACK/NACK to broker
    ▼
Broker
§Workflow Execution (Chord Example)
Chord { header: Group, callback: Task }
    │
    │ 1. Execute Group tasks in parallel
    ▼
┌─────────┬─────────┬─────────┐
│ Task 1  │ Task 2  │ Task 3  │
└─────────┴─────────┴─────────┘
    │         │         │
    │ 2. Collect results
    ▼         ▼         ▼
Result Aggregator
    │
    │ 3. Trigger callback when all complete
    ▼
Callback Task

§Data Flow Diagrams

§Message Format (Protocol Layer)
Message {
    headers: {
        id: UUID,
        task: String,
        origin: String,
        ...
    },
    properties: {
        correlation_id: String,
        reply_to: String,
        content_type: "application/json",
        content_encoding: "utf-8",
    },
    body: Vec<u8>  // Serialized task args
}
§Worker Pool Architecture
┌─────────────────────────────────────────────┐
│              Worker Manager                  │
│  ┌──────────────────────────────────────┐  │
│  │      Prefetch Queue (Bounded)         │  │
│  │    [Task] [Task] [Task] ...          │  │
│  └──────────────────────────────────────┘  │
│              │         │         │          │
│              ▼         ▼         ▼          │
│  ┌─────────┬─────────┬─────────┬─────────┐ │
│  │Worker 1 │Worker 2 │Worker 3 │Worker 4 │ │
│  │(Tokio  │(Tokio  │(Tokio  │(Tokio  │ │
│  │ Task)   │ Task)   │ Task)   │ Task)   │ │
│  └─────────┴─────────┴─────────┴─────────┘ │
└─────────────────────────────────────────────┘

§Scalability Patterns

§Horizontal Scaling
Load Balancer
    │
    ├─────────────┬─────────────┬─────────────┐
    ▼             ▼             ▼             ▼
Worker 1      Worker 2      Worker 3      Worker 4
    │             │             │             │
    └─────────────┴─────────────┴─────────────┘
                     │
                     ▼
             Shared Broker Queue
§Vertical Scaling
  • Increase worker concurrency (more Tokio tasks per worker)
  • Increase prefetch count (more tasks buffered)
  • Optimize task execution (reduce CPU/memory usage)
§Queue-Based Load Balancing
┌──────────────────────────────────────┐
│         Task Router                   │
│  (Based on task name or priority)    │
└──────────────────────────────────────┘
    │         │         │         │
    ▼         ▼         ▼         ▼
[Queue 1] [Queue 2] [Queue 3] [Queue 4]
    │         │         │         │
    ▼         ▼         ▼         ▼
Worker    Worker    Worker    Worker
 Pool      Pool      Pool      Pool

Modules§

advanced_patterns
Advanced workflow patterns for complex use cases
batch_helpers
Batch processing helpers for efficient data processing
broker_helper
Broker selection helpers
canvas
Canvas workflow types
circuit_breaker
Circuit Breaker for Redis Operations
compile_time_validation
Compile-time feature validation and conflict detection
config_validation
Configuration validation helpers
convenience
Convenience functions module
dedup
Task deduplication implementation
error
Error types re-exported from celers-kombu
error_recovery
Error recovery patterns for resilient task execution
health
Redis health check implementation
health_check
Health check utilities for monitoring worker and task health
ide_support
IDE support and type hints module
metrics_aggregation
Metrics aggregation utilities for collecting and analyzing task metrics
monitoring_helpers
Monitoring and observability helpers
performance_profiling
Performance profiling utilities for task execution analysis
prelude
Prelude module for common imports
presets
Production-ready configuration presets
protocol
Protocol types for advanced usage
quick_reference
Quick reference documentation module
quick_start
Quick start helpers for common use cases
rate_limit
Rate limiting types
redis_monitoring
Redis broker monitoring utilities
redis_utilities
Redis broker utility functions
resource_management
Resource management utilities for controlling task resource usage
result_helpers
Result aggregation helpers for working with task results
retry_strategies
Advanced retry strategies for fault-tolerant task execution
router
Task routing types
startup_optimization
Startup time optimization utilities
task_cancellation
Task cancellation utilities for managing task lifecycle
task_composition
Advanced task composition utilities
task_dependencies
Task dependency management for workflow orchestration
task_hooks
Task lifecycle hooks for pre/post execution events
utils
Utility functions for broker operations and analysis.
worker
Worker runtime types
workflow_templates
Workflow templates for common distributed computing patterns
workflow_validation
Workflow validation utilities

Macros§

time_init
Helper macro for timing initialization steps

Structs§

ActiveTaskInfo
Information about an actively executing task
AsyncResult
AsyncResult handle for querying task results (Celery-compatible API)
BrokerStats
Broker connection statistics
Chain
Chain: Sequential execution
Chord
Chord: Parallel execution with callback
Chunks
Chunks: Split iterable into chunks for parallel processing
CompositeEventEmitter
Composite event emitter that sends to multiple emitters
DeliveryInfo
Delivery information for a task
Envelope
Message envelope (message + metadata)
GlobPattern
Glob pattern for task name matching
Group
Group: Parallel execution
InMemoryEventEmitter
In-memory event emitter using broadcast channels
LoggingEventEmitter
Logging event emitter that logs events using tracing
Map
Map: Apply task to multiple arguments
Message
Complete Celery message
MessageHeaders
Message headers (Celery protocol)
MessageProperties
Message properties (AMQP-like)
NoOpEventEmitter
No-op event emitter that discards all events
PoolStats
Worker pool statistics
QueueConfig
Queue configuration
QueueStats
Queue statistics
RateLimitConfig
Configuration for rate limiting
RedisBroker
Redis-based broker implementation
RegexPattern
Regular expression pattern for task name matching
RequestInfo
Request information for a task
ReservedTaskInfo
Information about a reserved (prefetched) task
RouteResult
Routing result containing queue and optional AMQP settings
RouteRule
A routing rule that maps task names to queues
Router
Task router for directing tasks to appropriate queues
RouterBuilder
Builder for creating routers with fluent API
RoutingConfig
Serializable routing configuration
ScheduledTaskInfo
Information about a scheduled task (with ETA or countdown)
SerializedTask
Serialized task ready for queue
Signature
Signature (a task definition with arguments)
SlidingWindow
Sliding window rate limiter
Starmap
Starmap: Like map but unpacks arguments
TaskArgs
Task arguments (args, kwargs)
TaskEventBuilder
Builder for creating task events with common fields
TaskOptions
Task options
TaskRateLimiter
Per-task rate limiter manager
TokenBucket
Token bucket rate limiter
Worker
Worker runtime for consuming and executing tasks
WorkerConf
Worker configuration
WorkerConfig
Worker configuration
WorkerEventBuilder
Builder for creating worker events
WorkerRateLimiter
Thread-safe per-worker rate limiter
WorkerReport
Worker report (comprehensive status)
WorkerStats
Worker statistics
XMap
XMap: Map with exception handling
XStarmap
XStarmap: Starmap with exception handling

Enums§

BrokerError
Broker errors
ContentEncoding
Content encoding
ContentType
Content type for serialization
ControlCommand
Control commands that can be sent to workers
ControlResponse
Response to a control command
Event
Combined event type for all CeleRS events
InspectCommand
Inspection sub-commands
InspectResponse
Response data from inspect commands
LogLevel
Log level for event logging
PatternMatcher
Pattern matching strategy for task routing
ProtocolVersion
Protocol version
QueueMode
Queue mode
TaskEvent
Task lifecycle events (Celery-compatible)
TaskResultValue
Task result value stored in backend
TaskState
Task state enumeration with strict state machine transitions
WorkerEvent
Worker lifecycle events

Traits§

Broker
Core trait for task queue brokers
Consumer
Consumer trait (message consuming)
EventEmitter
Trait for emitting events to various transports
Producer
Producer trait (message publishing)
RateLimiter
Trait for rate limiter implementations
ResultStore
Result store trait for AsyncResult API
Transport
Transport trait (low-level broker connection)

Type Aliases§

Result

Attribute Macros§

task
Attribute macro for marking async functions as tasks

Derive Macros§

Task
Derive macro for Task trait