celers-core 0.1.0

Core traits and types for CeleRS distributed task queue
Documentation

celers-core

Core abstractions and traits for the CeleRS distributed task queue system.

Overview

This crate provides the fundamental building blocks for CeleRS:

  • Broker trait: Abstract interface for message brokers (Redis, Postgres, AMQP, SQS, etc.)
  • Task types: Serialized tasks, metadata, and state management
  • Executor trait: Task execution and registry
  • Error types: Comprehensive error handling
  • Batch operations: High-performance bulk enqueue/dequeue (10-100x faster)

Features

  • Type-safe task definitions
  • Broker abstraction for multiple backends
  • Task state machine (Pending → Running → Success/Failure/Retry)
  • Batch operation support (10-100x performance improvement)
  • Workflow support (Group, Chain, Chord)
  • Async/await native

Usage

use celers_core::{Broker, SerializedTask, TaskRegistry};

// Create a task
let task = SerializedTask::new(
    "my_task".to_string(),
    serde_json::to_vec(&args)?,
);

// Enqueue to broker
let task_id = broker.enqueue(task).await?;

// Batch enqueue for better performance
let task_ids = broker.enqueue_batch(tasks).await?;

// Dequeue and execute
if let Some(msg) = broker.dequeue().await? {
    let result = registry.execute(&msg.task).await?;
    broker.ack(&msg.task.metadata.id, msg.receipt_handle.as_deref()).await?;
}

Broker Trait

The core abstraction for message brokers:

#[async_trait]
pub trait Broker: Send + Sync {
    // Single operations
    async fn enqueue(&self, task: SerializedTask) -> Result<TaskId>;
    async fn dequeue(&self) -> Result<Option<BrokerMessage>>;
    async fn ack(&self, task_id: &TaskId, receipt_handle: Option<&str>) -> Result<()>;
    async fn reject(&self, task_id: &TaskId, receipt_handle: Option<&str>, requeue: bool) -> Result<()>;

    // Batch operations (10-100x faster)
    async fn enqueue_batch(&self, tasks: Vec<SerializedTask>) -> Result<Vec<TaskId>>;
    async fn dequeue_batch(&self, count: usize) -> Result<Vec<BrokerMessage>>;
    async fn ack_batch(&self, tasks: &[(TaskId, Option<String>)]) -> Result<()>;

    // Management
    async fn queue_size(&self) -> Result<usize>;
    async fn cancel(&self, task_id: &TaskId) -> Result<bool>;
}

Task Types

SerializedTask

pub struct SerializedTask {
    pub metadata: TaskMetadata,
    pub args: Vec<u8>,          // JSON-serialized arguments
}

impl SerializedTask {
    pub fn new(name: String, args: Vec<u8>) -> Self;
    pub fn with_priority(self, priority: i32) -> Self;
    pub fn with_max_retries(self, max_retries: u32) -> Self;
    pub fn with_timeout(self, timeout_secs: u64) -> Self;
}

TaskMetadata

pub struct TaskMetadata {
    pub id: Uuid,
    pub name: String,
    pub state: TaskState,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    pub max_retries: u32,
    pub timeout_secs: Option<u64>,
    pub priority: i32,

    // Workflow support
    pub group_id: Option<Uuid>,   // Group ID for parallel execution
    pub chord_id: Option<Uuid>,    // Chord ID for barrier synchronization
}

TaskState

pub enum TaskState {
    Pending,
    Running,
    Retrying(u32),  // retry count
    Success,
    Failure,
    Cancelled,
}

Task Registry

Execute registered tasks by name:

use celers_core::TaskRegistry;

let mut registry = TaskRegistry::new();

// Register a task executor
registry.register("my_task", |args| async move {
    // Task implementation
    Ok(result)
});

// Execute by name
let result = registry.execute(&task).await?;

Error Handling

pub enum CelersError {
    Broker(String),
    Serialization(String),
    Deserialization(String),
    TaskNotFound(String),
    TaskTimeout,
    Other(String),
}

pub type Result<T> = std::result::Result<T, CelersError>;

Performance

Batch Operations

Batch operations provide significant performance improvements:

Operation Individual Batch (10) Batch (100)
Throughput 1K/sec 10K/sec 50K/sec
Latency 1ms/task 0.1ms/task 0.01ms/task
Network RTT N 1 1

Best Practices

// Good: Batch enqueue for bulk operations
let tasks = create_many_tasks(100);
broker.enqueue_batch(tasks).await?;

// Less efficient: Individual enqueue
for task in tasks {
    broker.enqueue(task).await?;
}

Workflow Support

TaskMetadata includes fields for Canvas workflow primitives:

// Group tasks
task.metadata.group_id = Some(group_id);

// Chord tasks (map-reduce)
task.metadata.chord_id = Some(chord_id);

See celers-canvas for high-level workflow APIs.

Dependencies

[dependencies]
async-trait = "0.1"
uuid = { version = "1.0", features = ["v4", "serde"] }
serde = { version = "1.0", features = ["derive"] }
chrono = { version = "0.4", features = ["serde"] }
thiserror = "2.0"

See Also

  • celers-broker-redis: Redis broker with batch operations
  • celers-broker-postgres: PostgreSQL broker implementation
  • celers-worker: Worker runtime for executing tasks
  • celers-canvas: High-level workflow APIs (Chain, Group, Chord)

License

MIT OR Apache-2.0