Crate celers_core

Source
Expand description

CeleRS Core - Type-safe distributed task queue library

This crate provides the core abstractions and types for building distributed task processing systems in Rust.

§Quick Start

§Basic Task Creation

use celers_core::{SerializedTask, TaskMetadata, TaskState};

// Create a simple task
let task = SerializedTask::new("process_data".to_string(), vec![1, 2, 3, 4, 5])
    .with_priority(5)
    .with_max_retries(3)
    .with_timeout(60);

assert_eq!(task.metadata.state, TaskState::Pending);
assert!(task.has_priority());

§Task Dependencies and Workflows

use celers_core::{TaskDag, TaskId, SerializedTask};
use uuid::Uuid;

// Create a DAG for task dependencies
let mut dag = TaskDag::new();

let load_task = Uuid::new_v4();
let transform_task = Uuid::new_v4();
let save_task = Uuid::new_v4();

dag.add_node(load_task, "load_data");
dag.add_node(transform_task, "transform_data");
dag.add_node(save_task, "save_results");

// Define dependencies: transform depends on load, save depends on transform
dag.add_dependency(transform_task, load_task).unwrap();
dag.add_dependency(save_task, transform_task).unwrap();

// Validate DAG (no cycles)
assert!(dag.validate().is_ok());

// Get execution order
let order = dag.topological_sort().unwrap();
assert_eq!(order, vec![load_task, transform_task, save_task]);

§Retry Strategies

use celers_core::RetryStrategy;

// Exponential backoff
let strategy = RetryStrategy::exponential(1000, 2.0); // 1s initial, 2x multiplier

assert_eq!(strategy.calculate_delay(0, None), 1000); // 1s
assert_eq!(strategy.calculate_delay(1, Some(1000)), 2000); // 2s
assert_eq!(strategy.calculate_delay(2, Some(2000)), 4000); // 4s

// Fixed delay retry strategy
let fixed = RetryStrategy::fixed(5000); // 5 seconds
assert_eq!(fixed.calculate_delay(0, None), 5000);
assert_eq!(fixed.calculate_delay(10, Some(5000)), 5000);

§State Tracking

use celers_core::{StateHistory, TaskState};

let mut history = StateHistory::with_initial(TaskState::Pending);

// Track state transitions
history.transition(TaskState::Running);
history.transition(TaskState::Succeeded(vec![1, 2, 3]));

assert_eq!(history.transition_count(), 2);
assert!(history.current_state().unwrap().is_terminal());

§Exception Handling

use celers_core::{TaskException, ExceptionCategory, ExceptionPolicy};

// Create an exception policy
let policy = ExceptionPolicy::new()
    .retry_on(&["Temporary*", "Network*"])
    .fail_on(&["Fatal*", "Critical*"])
    .ignore_on(&["Ignorable*"]);

// Build an exception
let exception = TaskException::new("TemporaryError", "Connection timeout")
    .with_category(ExceptionCategory::Retryable);

§Core Concepts

  • Tasks: Units of work that can be executed asynchronously
  • Brokers: Message queue backends for task distribution
  • States: Task lifecycle state machine (Pending → Running → Succeeded/Failed)
  • Dependencies: DAG-based task dependency management
  • Retries: Flexible retry strategies with backoff
  • Routing: Pattern-based task routing to queues
  • Rate Limiting: Token bucket and sliding window algorithms

Re-exports§

pub use broker::Broker;
pub use broker::BrokerMessage;
pub use config::BackendTransport;
pub use config::BeatSchedule;
pub use config::BrokerTransport;
pub use config::CeleryConfig;
pub use config::ScheduleDefinition;
pub use config::TaskConfig;
pub use config::TaskRoute;
pub use control::ActiveTaskInfo;
pub use control::BrokerStats;
pub use control::ControlCommand;
pub use control::ControlResponse;
pub use control::DeliveryInfo;
pub use control::InspectCommand;
pub use control::InspectResponse;
pub use control::PoolStats;
pub use control::QueueCommand;
pub use control::QueueResponse;
pub use control::QueueStats;
pub use control::RequestInfo;
pub use control::ReservedTaskInfo;
pub use control::ScheduledTaskInfo;
pub use control::WorkerConf;
pub use control::WorkerReport;
pub use control::WorkerStats;
pub use dag::DagNode;
pub use dag::TaskDag;
pub use error::CelersError;
pub use error::Result;
pub use event::Alert;
pub use event::AlertCondition;
pub use event::AlertContext;
pub use event::AlertHandler;
pub use event::AlertManager;
pub use event::AlertSeverity;
pub use event::CompositeEventEmitter;
pub use event::Event;
pub use event::EventDispatcher;
pub use event::EventEmitter;
pub use event::EventFilter;
pub use event::EventMonitor;
pub use event::EventReceiver;
pub use event::EventStats;
pub use event::EventStorage;
pub use event::EventStream;
pub use event::FileEventStorage;
pub use event::InMemoryEventEmitter;
pub use event::InMemoryEventStorage;
pub use event::LogLevel;
pub use event::LoggingAlertHandler;
pub use event::LoggingEventEmitter;
pub use event::NoOpEventEmitter;
pub use event::TaskEvent;
pub use event::TaskEventBuilder;
pub use event::WorkerEvent;
pub use event::WorkerEventBuilder;
pub use exception::ExceptionAction;
pub use exception::ExceptionCategory;
pub use exception::ExceptionHandler;
pub use exception::ExceptionHandlerChain;
pub use exception::ExceptionPolicy;
pub use exception::LoggingExceptionHandler;
pub use exception::PolicyExceptionHandler;
pub use exception::TaskException;
pub use exception::TracebackFrame;
pub use executor::TaskRegistry;
pub use rate_limit::create_rate_limiter;
pub use rate_limit::DistributedRateLimiter;
pub use rate_limit::DistributedRateLimiterCoordinator;
pub use rate_limit::DistributedRateLimiterState;
pub use rate_limit::DistributedSlidingWindowSpec;
pub use rate_limit::DistributedTokenBucketSpec;
pub use rate_limit::RateLimitConfig;
pub use rate_limit::RateLimiter;
pub use rate_limit::SlidingWindow;
pub use rate_limit::TaskRateLimiter;
pub use rate_limit::TokenBucket;
pub use rate_limit::WorkerRateLimiter;
pub use result::AsyncResult;
pub use result::ExtendedResultStore;
pub use result::ResultChunk;
pub use result::ResultChunker;
pub use result::ResultCompressor;
pub use result::ResultMetadata;
pub use result::ResultStore;
pub use result::ResultTombstone;
pub use result::TaskResultValue;
pub use retry::RetryPolicy;
pub use retry::RetryStrategy;
pub use revocation::PatternRevocation;
pub use revocation::RevocationManager;
pub use revocation::RevocationMode;
pub use revocation::RevocationRequest;
pub use revocation::RevocationResult;
pub use revocation::RevocationState;
pub use revocation::WorkerRevocationManager;
pub use router::ArgumentCondition;
pub use router::GlobPattern;
pub use router::PatternMatcher;
pub use router::RegexPattern;
pub use router::RouteResult;
pub use router::RouteRule;
pub use router::Router;
pub use router::RouterBuilder;
pub use router::RoutingConfig;
pub use state::StateHistory;
pub use state::StateTransition;
pub use state::TaskState;
pub use task::SerializedTask;
pub use task::Task;
pub use task::TaskId;
pub use task::TaskMetadata;
pub use time_limit::TaskTimeLimits;
pub use time_limit::TimeLimit;
pub use time_limit::TimeLimitConfig;
pub use time_limit::TimeLimitExceeded;
pub use time_limit::TimeLimitSettings;
pub use time_limit::TimeLimitStatus;
pub use time_limit::WorkerTimeLimits;

Modules§

broker
config
Celery-compatible configuration for CeleRS
control
Worker Control Commands
dag
Directed Acyclic Graph (DAG) support for task dependencies
error
event
Real-time event types for task and worker lifecycle
exception
Exception Handling for Task Execution
executor
rate_limit
Rate Limiting for Task Execution
result
AsyncResult API for querying task results
retry
Retry strategies for task execution
revocation
Task Revocation
router
Task Routing
state
task
time_limit
Time Limits for Task Execution