pub use celers_core::{
ActiveTaskInfo, AsyncResult, Broker, BrokerStats, CompositeEventEmitter, ControlCommand,
ControlResponse, DeliveryInfo, Event, EventEmitter, GlobPattern, InMemoryEventEmitter,
InspectCommand, InspectResponse, LogLevel, LoggingEventEmitter, NoOpEventEmitter,
PatternMatcher, PoolStats, QueueStats, RateLimitConfig, RateLimiter, RegexPattern, RequestInfo,
ReservedTaskInfo, ResultStore, RouteResult, RouteRule, Router, RouterBuilder, RoutingConfig,
ScheduledTaskInfo, SerializedTask, SlidingWindow, TaskEvent, TaskEventBuilder, TaskRateLimiter,
TaskResultValue, TaskState, TokenBucket, WorkerConf, WorkerEvent, WorkerEventBuilder,
WorkerRateLimiter, WorkerReport, WorkerStats,
};
pub use celers_protocol::{
ContentEncoding, ContentType, Message, MessageHeaders, MessageProperties, ProtocolVersion,
TaskArgs,
};
pub use celers_kombu::{
utils, BrokerError, Consumer, Envelope, Producer, QueueConfig, QueueMode, Result, Transport,
};
pub use celers_worker::{Worker, WorkerConfig};
pub use celers_canvas::{
Chain, Chord, Chunks, Group, Map, Signature, Starmap, TaskOptions, XMap, XStarmap,
};
pub use celers_macros::{task, Task};
#[cfg(feature = "redis")]
pub use celers_broker_redis::{
circuit_breaker, dedup, health, monitoring as redis_monitoring, utilities as redis_utilities,
RedisBroker,
};
#[cfg(feature = "postgres")]
pub use celers_broker_postgres::{
monitoring as postgres_monitoring, utilities as postgres_utilities, PostgresBroker,
};
#[cfg(feature = "mysql")]
pub use celers_broker_sql::{
monitoring as mysql_monitoring, utilities as mysql_utilities, MysqlBroker,
};
#[cfg(feature = "amqp")]
pub use celers_broker_amqp::{
monitoring as amqp_monitoring, utilities as amqp_utilities, AmqpBroker,
};
#[cfg(feature = "sqs")]
pub use celers_broker_sqs::{
monitoring as sqs_monitoring, optimization, utilities as sqs_utilities, SqsBroker,
};
#[cfg(feature = "backend-redis")]
pub use celers_backend_redis::{
batch_size,
event_transport::{RedisEventConfig, RedisEventEmitter, RedisEventReceiver},
ttl, ChordState, RedisResultBackend, ResultBackend, TaskMeta, TaskResult,
};
#[cfg(feature = "backend-db")]
pub use celers_backend_db::{MysqlResultBackend, PostgresResultBackend};
#[cfg(feature = "backend-rpc")]
pub use celers_backend_rpc::GrpcResultBackend;
#[cfg(feature = "beat")]
pub use celers_beat::{BeatScheduler, Schedule, ScheduledTask};
#[cfg(feature = "metrics")]
pub use celers_metrics::{gather_metrics, reset_metrics};
pub mod prelude {
pub use crate::AsyncResult;
pub use crate::Broker;
pub use crate::CompositeEventEmitter;
pub use crate::ControlCommand;
pub use crate::ControlResponse;
pub use crate::Event;
pub use crate::EventEmitter;
pub use crate::InMemoryEventEmitter;
pub use crate::InspectCommand;
pub use crate::InspectResponse;
pub use crate::LogLevel;
pub use crate::LoggingEventEmitter;
pub use crate::NoOpEventEmitter;
pub use crate::ResultStore;
pub use crate::SerializedTask;
pub use crate::TaskEvent;
pub use crate::TaskEventBuilder;
pub use crate::TaskResultValue;
pub use crate::TaskState;
pub use crate::WorkerEvent;
pub use crate::WorkerEventBuilder;
pub use crate::WorkerStats;
pub use crate::RateLimitConfig;
pub use crate::RateLimiter;
pub use crate::SlidingWindow;
pub use crate::TaskRateLimiter;
pub use crate::TokenBucket;
pub use crate::WorkerRateLimiter;
pub use crate::GlobPattern;
pub use crate::PatternMatcher;
pub use crate::RegexPattern;
pub use crate::RouteResult;
pub use crate::RouteRule;
pub use crate::Router;
pub use crate::RouterBuilder;
pub use crate::RoutingConfig;
pub use crate::Worker;
pub use crate::WorkerConfig;
pub use celers_worker::WorkerConfigBuilder;
pub use crate::broker_helper::{create_broker, create_broker_from_env, BrokerConfigError};
pub use crate::config_validation::{
check_feature_compatibility, feature_compatibility_matrix, validate_broker_url,
validate_worker_config, ConfigValidator, ValidationError,
};
#[cfg(feature = "redis")]
pub use crate::{circuit_breaker, dedup, health, rate_limit, RedisBroker};
#[cfg(feature = "postgres")]
pub use crate::PostgresBroker;
#[cfg(feature = "mysql")]
pub use crate::MysqlBroker;
#[cfg(feature = "amqp")]
pub use crate::AmqpBroker;
#[cfg(feature = "sqs")]
pub use crate::{optimization, SqsBroker};
#[cfg(feature = "backend-redis")]
pub use crate::{
batch_size, ttl, ChordState, RedisEventConfig, RedisEventEmitter, RedisEventReceiver,
RedisResultBackend, ResultBackend, TaskMeta,
};
#[cfg(feature = "backend-db")]
pub use crate::{MysqlResultBackend, PostgresResultBackend};
#[cfg(feature = "backend-rpc")]
pub use crate::GrpcResultBackend;
#[cfg(feature = "beat")]
pub use crate::{BeatScheduler, Schedule, ScheduledTask};
#[cfg(feature = "metrics")]
pub use crate::{gather_metrics, reset_metrics};
#[cfg(feature = "tracing")]
pub use crate::tracing::{
create_tracer_provider, extract_trace_context, init_tracing, inject_trace_context,
publish_span, task_span,
};
pub use celers_macros::{task, Task};
pub use crate::{
Chain, Chord, Chunks, Group, Map, Signature, Starmap, TaskOptions, XMap, XStarmap,
};
pub use crate::BrokerError;
pub use async_trait::async_trait;
pub use serde::{Deserialize, Serialize};
pub use serde_json;
pub use serde_json::json;
pub use tokio;
pub use uuid::Uuid;
#[cfg(any(test, feature = "dev-utils"))]
pub use crate::dev_utils::{
create_test_task, EventTracker, MockBroker, PerformanceProfiler, QueueInspector,
TaskBuilder, TaskDebugger,
};
pub type TaskResult<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
pub type AsyncTaskFn<T> =
fn(Vec<u8>) -> std::pin::Pin<Box<dyn std::future::Future<Output = TaskResult<T>> + Send>>;
pub use celers_kombu::Result as KombuResult;
pub use crate::utils;
pub use crate::convenience::{
batch, best_effort, chain, chain_from, chord, chunks, critical, delay, expire_in, fan_in,
fan_out, group, group_from, high_priority, low_priority, map, options, parallel, pipeline,
retry_with_backoff, starmap, task, task_with_options, transient, with_countdown,
with_expires, with_priority, with_retry, with_timeout,
};
#[cfg(feature = "beat-cron")]
pub use crate::convenience::recurring;
#[cfg(feature = "beat")]
pub use crate::convenience::recurring_interval;
pub use crate::workflow_templates::{
batch_processing, etl_pipeline, map_reduce_workflow, priority_workflow, scatter_gather,
sequential_pipeline,
};
pub use crate::task_composition::{
circuit_breaker_group, rate_limited_workflow, retry_wrapper, timeout_wrapper,
};
pub use crate::error_recovery::{
ignore_errors, with_dlq, with_exponential_backoff, with_fallback,
};
pub use crate::workflow_validation::{
check_performance_concerns_chain, check_performance_concerns_group, validate_chain,
validate_chord, validate_group, ValidationError as WorkflowValidationError,
};
pub use crate::result_helpers::{
create_result_collector, create_result_filter, create_result_reducer,
create_result_transformer,
};
pub use crate::advanced_patterns::{
create_conditional_workflow, create_dynamic_workflow, create_parallel_chains,
create_saga_workflow,
};
pub use crate::monitoring_helpers::TaskMonitor;
pub use crate::batch_helpers::{
create_adaptive_batches, create_dynamic_batches, create_prioritized_batches,
};
pub use crate::health_check::{
DependencyChecker, HealthCheckResult, HealthStatus, WorkerHealthChecker,
};
pub use crate::resource_management::{ResourceLimits, ResourcePool, ResourceTracker};
pub use crate::task_hooks::{
HookRegistry, HookResult, LoggingHook, PostExecutionHook, PreExecutionHook, ValidationHook,
};
pub use crate::metrics_aggregation::{DataPoint, Histogram, MetricsAggregator};
pub use crate::task_cancellation::{CancellationToken, ExecutionGuard, TimeoutManager};
pub use crate::retry_strategies::{
DefaultRetryPolicy, ErrorPatternRetryPolicy, RetryPolicy, RetryStrategy,
};
pub use crate::task_dependencies::DependencyGraph;
pub use crate::performance_profiling::{PerformanceProfile, ProfileSpan};
}
pub mod convenience {
/// Creates a `Signature` for the task called `name`.
pub fn task(name: impl Into<String>) -> crate::Signature {
    let task_name = name.into();
    crate::Signature::new(task_name)
}
/// Returns a new `Chain` as produced by `Chain::new()`.
pub fn chain() -> crate::Chain {
    crate::Chain::new()
}
/// Returns a new `Group` as produced by `Group::new()`.
pub fn group() -> crate::Group {
    crate::Group::new()
}
/// Builds a `Chord` from a `header` group and a `callback` signature.
pub fn chord(header: crate::Group, callback: crate::Signature) -> crate::Chord {
    crate::Chord::new(header, callback)
}
/// Builds a `Chunks` primitive for `task_name` over `items`.
///
/// Each item is converted to a `serde_json::Value`.
///
/// NOTE: items that fail to serialize are silently dropped, so the resulting
/// workflow may contain fewer items than were passed in.
pub fn chunks<T: serde::Serialize>(
    task_name: impl Into<String>,
    items: Vec<T>,
    chunk_size: usize,
) -> crate::Chunks {
    let signature = crate::Signature::new(task_name.into());
    let mut values: Vec<serde_json::Value> = Vec::new();
    for item in items {
        if let Ok(value) = serde_json::to_value(item) {
            values.push(value);
        }
    }
    crate::Chunks::new(signature, values, chunk_size)
}
/// Builds a `Map` primitive that applies `task_name` to each item.
///
/// Every item becomes a single-element argument list.
///
/// NOTE: items that fail to serialize are silently dropped.
pub fn map<T: serde::Serialize>(task_name: impl Into<String>, items: Vec<T>) -> crate::Map {
    let signature = crate::Signature::new(task_name.into());
    let mut arg_lists: Vec<Vec<serde_json::Value>> = Vec::new();
    for item in items {
        if let Ok(value) = serde_json::to_value(item) {
            arg_lists.push(vec![value]);
        }
    }
    crate::Map::new(signature, arg_lists)
}
/// Builds a `Starmap` primitive that calls `task_name` once per argument list.
///
/// NOTE: an argument that fails to serialize is silently dropped from its
/// list, which shifts the positions of the arguments after it.
pub fn starmap<T: serde::Serialize>(
    task_name: impl Into<String>,
    args: Vec<Vec<T>>,
) -> crate::Starmap {
    let signature = crate::Signature::new(task_name.into());
    let mut arg_lists: Vec<Vec<serde_json::Value>> = Vec::new();
    for arg_list in args {
        let mut values = Vec::new();
        for item in arg_list {
            if let Ok(value) = serde_json::to_value(item) {
                values.push(value);
            }
        }
        arg_lists.push(values);
    }
    crate::Starmap::new(signature, arg_lists)
}
pub fn options() -> crate::TaskOptions {
crate::TaskOptions::default()
}
pub fn with_retry(max_retries: u32, retry_delay_secs: u64) -> crate::TaskOptions {
crate::TaskOptions {
max_retries: Some(max_retries),
countdown: Some(retry_delay_secs),
..Default::default()
}
}
pub fn with_timeout(timeout_secs: u64) -> crate::TaskOptions {
crate::TaskOptions {
time_limit: Some(timeout_secs),
..Default::default()
}
}
pub fn with_priority(priority: u8) -> crate::TaskOptions {
crate::TaskOptions {
priority: Some(priority),
..Default::default()
}
}
pub fn with_countdown(countdown_secs: u64) -> crate::TaskOptions {
crate::TaskOptions {
countdown: Some(countdown_secs),
..Default::default()
}
}
pub fn with_expires(expires_secs: u64) -> crate::TaskOptions {
crate::TaskOptions {
expires: Some(expires_secs),
..Default::default()
}
}
/// Creates one `Signature` of `task_name` per entry of `args_list`.
///
/// NOTE: an argument that fails to serialize is silently dropped from its
/// list, which shifts the positions of the arguments after it.
pub fn batch<T: serde::Serialize>(
    task_name: impl Into<String>,
    args_list: Vec<Vec<T>>,
) -> Vec<crate::Signature> {
    let task_name = task_name.into();
    let mut signatures = Vec::new();
    for args in args_list {
        let signature = crate::Signature::new(task_name.clone());
        let mut values: Vec<serde_json::Value> = Vec::new();
        for arg in args {
            if let Ok(value) = serde_json::to_value(arg) {
                values.push(value);
            }
        }
        signatures.push(signature.with_args(values));
    }
    signatures
}
/// Builds a `Chain` by appending each `(task name, args)` pair in order.
///
/// NOTE: arguments that fail to serialize are silently dropped.
pub fn chain_from<T: serde::Serialize>(tasks: Vec<(&str, Vec<T>)>) -> crate::Chain {
    tasks
        .into_iter()
        .fold(crate::Chain::new(), |workflow, (task_name, args)| {
            let values: Vec<serde_json::Value> = args
                .into_iter()
                .filter_map(|arg| serde_json::to_value(arg).ok())
                .collect();
            workflow.then(task_name, values)
        })
}
/// Builds a `Group` by adding each `(task name, args)` pair in order.
///
/// NOTE: arguments that fail to serialize are silently dropped.
pub fn group_from<T: serde::Serialize>(tasks: Vec<(&str, Vec<T>)>) -> crate::Group {
    tasks
        .into_iter()
        .fold(crate::Group::new(), |members, (task_name, args)| {
            let values: Vec<serde_json::Value> = args
                .into_iter()
                .filter_map(|arg| serde_json::to_value(arg).ok())
                .collect();
            members.add(task_name, values)
        })
}
/// Creates a `Signature` with `args`, then sets `max_retries` and `priority`
/// on its options.
pub fn task_with_options(
    name: impl Into<String>,
    args: Vec<serde_json::Value>,
    max_retries: u32,
    priority: u8,
) -> crate::Signature {
    let base = crate::Signature::new(name.into());
    let mut configured = base.with_args(args);
    configured.options.max_retries = Some(max_retries);
    configured.options.priority = Some(priority);
    configured
}
/// Creates a `ScheduledTask` for `task_name` with a crontab schedule.
///
/// The five field strings are passed to `Schedule::crontab` unchanged, in the
/// order `minute, hour, day_of_week, day_of_month, month_of_year`.
#[cfg(all(feature = "beat", feature = "beat-cron"))]
pub fn recurring(
    task_name: impl Into<String>,
    minute: &str,
    hour: &str,
    day_of_week: &str,
    day_of_month: &str,
    month_of_year: &str,
) -> crate::ScheduledTask {
    let name = task_name.into();
    let schedule =
        crate::Schedule::crontab(minute, hour, day_of_week, day_of_month, month_of_year);
    crate::ScheduledTask::new(name, schedule)
}
/// Creates a `ScheduledTask` for `task_name` using `Schedule::interval(seconds)`.
#[cfg(feature = "beat")]
pub fn recurring_interval(task_name: impl Into<String>, seconds: u64) -> crate::ScheduledTask {
    let name = task_name.into();
    let schedule = crate::Schedule::interval(seconds);
    crate::ScheduledTask::new(name, schedule)
}
/// Creates a `Signature` with `args` whose `countdown` option is `delay_secs`.
pub fn delay(
    task_name: impl Into<String>,
    args: Vec<serde_json::Value>,
    delay_secs: u64,
) -> crate::Signature {
    let base = crate::Signature::new(task_name.into());
    let mut delayed = base.with_args(args);
    delayed.options.countdown = Some(delay_secs);
    delayed
}
/// Creates a `Signature` with `args` whose `expires` option is `expires_secs`.
pub fn expire_in(
    task_name: impl Into<String>,
    args: Vec<serde_json::Value>,
    expires_secs: u64,
) -> crate::Signature {
    let base = crate::Signature::new(task_name.into());
    let mut expiring = base.with_args(args);
    expiring.options.expires = Some(expires_secs);
    expiring
}
/// Creates a `Signature` with `args` and `priority` set to `9`.
pub fn high_priority(
    task_name: impl Into<String>,
    args: Vec<serde_json::Value>,
) -> crate::Signature {
    let base = crate::Signature::new(task_name.into());
    let mut prioritized = base.with_args(args);
    prioritized.options.priority = Some(9);
    prioritized
}
/// Creates a `Signature` with `args` and `priority` set to `1`.
pub fn low_priority(
    task_name: impl Into<String>,
    args: Vec<serde_json::Value>,
) -> crate::Signature {
    let base = crate::Signature::new(task_name.into());
    let mut deprioritized = base.with_args(args);
    deprioritized.options.priority = Some(1);
    deprioritized
}
/// Returns a new `Group`; same result as [`group`], under a different name.
pub fn parallel() -> crate::Group {
    crate::Group::new()
}
/// Creates a `Signature` with `args`, `priority` `9` and `max_retries` `5`.
pub fn critical(
    task_name: impl Into<String>,
    args: Vec<serde_json::Value>,
) -> crate::Signature {
    let base = crate::Signature::new(task_name.into());
    let mut signature = base.with_args(args);
    signature.options.priority = Some(9);
    signature.options.max_retries = Some(5);
    signature
}
/// Creates a `Signature` with `args`, `priority` `1` and `max_retries` `0`.
pub fn best_effort(
    task_name: impl Into<String>,
    args: Vec<serde_json::Value>,
) -> crate::Signature {
    let base = crate::Signature::new(task_name.into());
    let mut signature = base.with_args(args);
    signature.options.priority = Some(1);
    signature.options.max_retries = Some(0);
    signature
}
/// Creates a `Signature` with `args` whose `expires` option is `ttl_secs`.
///
/// Sets the same option as [`expire_in`].
pub fn transient(
    task_name: impl Into<String>,
    args: Vec<serde_json::Value>,
    ttl_secs: u64,
) -> crate::Signature {
    let base = crate::Signature::new(task_name.into());
    let mut short_lived = base.with_args(args);
    short_lived.options.expires = Some(ttl_secs);
    short_lived
}
pub fn retry_with_backoff(max_retries: u32, initial_delay_secs: u64) -> crate::TaskOptions {
crate::TaskOptions {
max_retries: Some(max_retries),
countdown: Some(initial_delay_secs),
..Default::default()
}
}
/// Returns a new `Chain`; same result as [`chain`], under a different name.
pub fn pipeline() -> crate::Chain {
    crate::Chain::new()
}
/// Alias for [`map`]: forwards `task_name` and `items` to it unchanged.
pub fn fan_out<T: serde::Serialize>(task_name: impl Into<String>, items: Vec<T>) -> crate::Map {
    map(task_name, items)
}
/// Builds a `Chord` from `tasks` and `callback`; same construction as [`chord`].
pub fn fan_in(tasks: crate::Group, callback: crate::Signature) -> crate::Chord {
    crate::Chord::new(tasks, callback)
}
}
pub mod quick_start {
/// Creates a `RedisBroker` for `queue`.
///
/// `url` may be a full connection URL or a bare `host[:port]`. If it already
/// carries a scheme (contains `://`), it is passed through unchanged;
/// otherwise `redis://` is prepended.
///
/// Previously only the literal `redis://` prefix was recognized, so URLs such
/// as `rediss://host` were rewritten to `redis://rediss://host`.
///
/// # Errors
///
/// Returns whatever error `RedisBroker::new` reports for the resulting URL.
#[cfg(feature = "redis")]
pub fn redis_broker(
    url: &str,
    queue: &str,
) -> std::result::Result<crate::RedisBroker, celers_core::error::CelersError> {
    // Same scheme test as `config_validation::validate_broker_url`.
    let full_url = if url.contains("://") {
        url.to_string()
    } else {
        format!("redis://{}", url)
    };
    crate::RedisBroker::new(&full_url, queue)
}
/// Creates a `PostgresBroker` for `queue` via `PostgresBroker::with_queue`.
///
/// # Errors
///
/// Propagates the error returned by `with_queue`.
#[cfg(feature = "postgres")]
pub async fn postgres_broker(
    url: &str,
    queue: &str,
) -> std::result::Result<crate::PostgresBroker, celers_core::error::CelersError> {
    let connecting = crate::PostgresBroker::with_queue(url, queue);
    connecting.await
}
/// Creates a `MysqlBroker` for `queue` via `MysqlBroker::with_queue`.
///
/// # Errors
///
/// Propagates the error returned by `with_queue`.
#[cfg(feature = "mysql")]
pub async fn mysql_broker(
    url: &str,
    queue: &str,
) -> std::result::Result<crate::MysqlBroker, celers_core::error::CelersError> {
    let connecting = crate::MysqlBroker::with_queue(url, queue);
    connecting.await
}
/// Creates an `AmqpBroker` for `queue`.
///
/// # Errors
///
/// Any error from `AmqpBroker::new` is converted to `CelersError::Broker`
/// carrying the error's string form.
#[cfg(feature = "amqp")]
pub async fn amqp_broker(
    url: &str,
    queue: &str,
) -> std::result::Result<crate::AmqpBroker, celers_core::error::CelersError> {
    match crate::AmqpBroker::new(url, queue).await {
        Ok(broker) => Ok(broker),
        Err(err) => Err(celers_core::error::CelersError::Broker(err.to_string())),
    }
}
/// Creates an `SqsBroker` for `queue`.
///
/// # Errors
///
/// Any error from `SqsBroker::new` is converted to `CelersError::Broker`
/// carrying the error's string form.
#[cfg(feature = "sqs")]
pub async fn sqs_broker(
    queue: &str,
) -> std::result::Result<crate::SqsBroker, celers_core::error::CelersError> {
    match crate::SqsBroker::new(queue).await {
        Ok(broker) => Ok(broker),
        Err(err) => Err(celers_core::error::CelersError::Broker(err.to_string())),
    }
}
/// Builds a `WorkerConfig` whose concurrency is `num_cpus::get()`.
///
/// # Errors
///
/// Returns the `String` error produced by the builder's `build()`.
pub fn default_worker_config() -> std::result::Result<crate::WorkerConfig, String> {
    use celers_worker::WorkerConfigBuilder;
    let built = WorkerConfigBuilder::new()
        .concurrency(num_cpus::get())
        .build();
    built
}
/// Builds a `WorkerConfig` with the given `concurrency`.
///
/// # Errors
///
/// Returns the `String` error produced by the builder's `build()`.
pub fn worker_config_with_concurrency(
    concurrency: usize,
) -> std::result::Result<crate::WorkerConfig, String> {
    use celers_worker::WorkerConfigBuilder;
    let built = WorkerConfigBuilder::new()
        .concurrency(concurrency)
        .build();
    built
}
}
/// Ready-made `WorkerConfig` presets.
///
/// Every preset returns the result of `WorkerConfigBuilder::build()`, i.e.
/// `Err(String)` if the builder rejects the values.
pub mod presets {
    /// Concurrency = CPU count, 1000 ms poll interval, graceful shutdown on.
    pub fn production_config() -> std::result::Result<crate::WorkerConfig, String> {
        use celers_worker::WorkerConfigBuilder;
        let workers = num_cpus::get();
        WorkerConfigBuilder::new()
            .concurrency(workers)
            .poll_interval_ms(1000)
            .graceful_shutdown(true)
            .build()
    }

    /// Concurrency = 4 x CPU count, 100 ms poll interval.
    pub fn high_throughput_config() -> std::result::Result<crate::WorkerConfig, String> {
        use celers_worker::WorkerConfigBuilder;
        let workers = num_cpus::get() * 4;
        WorkerConfigBuilder::new()
            .concurrency(workers)
            .poll_interval_ms(100)
            .build()
    }

    /// Concurrency = 2 x CPU count, 50 ms poll interval.
    pub fn low_latency_config() -> std::result::Result<crate::WorkerConfig, String> {
        use celers_worker::WorkerConfigBuilder;
        let workers = num_cpus::get() * 2;
        WorkerConfigBuilder::new()
            .concurrency(workers)
            .poll_interval_ms(50)
            .build()
    }

    /// Concurrency = CPU count, 2000 ms poll interval.
    pub fn memory_constrained_config() -> std::result::Result<crate::WorkerConfig, String> {
        use celers_worker::WorkerConfigBuilder;
        let workers = num_cpus::get();
        WorkerConfigBuilder::new()
            .concurrency(workers)
            .poll_interval_ms(2000)
            .build()
    }

    /// Concurrency = CPU count, 500 ms poll interval.
    pub fn cpu_bound_config() -> std::result::Result<crate::WorkerConfig, String> {
        use celers_worker::WorkerConfigBuilder;
        let workers = num_cpus::get();
        WorkerConfigBuilder::new()
            .concurrency(workers)
            .poll_interval_ms(500)
            .build()
    }

    /// Concurrency = 4 x CPU count, 200 ms poll interval.
    pub fn io_bound_config() -> std::result::Result<crate::WorkerConfig, String> {
        use celers_worker::WorkerConfigBuilder;
        let workers = num_cpus::get() * 4;
        WorkerConfigBuilder::new()
            .concurrency(workers)
            .poll_interval_ms(200)
            .build()
    }

    /// Concurrency = 2 x CPU count, 500 ms poll interval.
    pub fn balanced_config() -> std::result::Result<crate::WorkerConfig, String> {
        use celers_worker::WorkerConfigBuilder;
        let workers = num_cpus::get() * 2;
        WorkerConfigBuilder::new()
            .concurrency(workers)
            .poll_interval_ms(500)
            .build()
    }

    /// Fixed concurrency of 2, 1000 ms poll interval.
    pub fn development_config() -> std::result::Result<crate::WorkerConfig, String> {
        use celers_worker::WorkerConfigBuilder;
        let workers = 2;
        WorkerConfigBuilder::new()
            .concurrency(workers)
            .poll_interval_ms(1000)
            .build()
    }
}
/// Error types: re-exports `BrokerError` from `celers_kombu`.
pub mod error {
pub use celers_kombu::BrokerError;
}
/// Glob re-export of everything public in `celers_protocol`.
pub mod protocol {
pub use celers_protocol::*;
}
/// Glob re-export of everything public in `celers_canvas`.
pub mod canvas {
pub use celers_canvas::*;
}
/// Glob re-export of everything public in `celers_worker`.
pub mod worker {
pub use celers_worker::*;
}
/// Glob re-export of everything public in `celers_core::rate_limit`.
pub mod rate_limit {
pub use celers_core::rate_limit::*;
}
/// Glob re-export of everything public in `celers_core::router`.
pub mod router {
pub use celers_core::router::*;
}
#[cfg(feature = "tracing")]
pub mod tracing {
pub use opentelemetry;
pub use opentelemetry_sdk;
pub use tracing;
pub use tracing_opentelemetry;
pub use tracing_subscriber;
use opentelemetry::trace::SpanKind;
use opentelemetry::KeyValue;
use opentelemetry_sdk::trace::{RandomIdGenerator, Sampler, SdkTracerProvider};
use opentelemetry_sdk::Resource;
use tracing::Span;
use tracing_opentelemetry::OpenTelemetrySpanExt;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
/// Installs a global `tracing` subscriber made of an `EnvFilter` read from
/// the default environment and a `fmt` layer.
///
/// `_service_name` is currently unused.
///
/// # Errors
///
/// Returns the boxed error from `try_init()` when it fails.
pub fn init_tracing(_service_name: &str) -> Result<(), Box<dyn std::error::Error>> {
    let env_filter = tracing_subscriber::EnvFilter::from_default_env();
    let fmt_layer = tracing_subscriber::fmt::layer();
    tracing_subscriber::registry()
        .with(env_filter)
        .with(fmt_layer)
        .try_init()
        .map_err(|err| -> Box<dyn std::error::Error> { Box::new(err) })
}
/// Builds an `SdkTracerProvider` that samples every trace (`AlwaysOn`), uses
/// the default `RandomIdGenerator`, and carries a resource whose
/// `service.name` attribute is `service_name`.
pub fn create_tracer_provider(service_name: &str) -> SdkTracerProvider {
    let service_attr = KeyValue::new("service.name", service_name.to_string());
    let resource = Resource::builder()
        .with_attributes([service_attr])
        .build();
    let builder = SdkTracerProvider::builder()
        .with_sampler(Sampler::AlwaysOn)
        .with_id_generator(RandomIdGenerator::default())
        .with_resource(resource);
    builder.build()
}
pub fn task_span(task_name: &str, task_id: &str) -> Span {
tracing::info_span!(
"task.execute",
task.name = task_name,
task.id = task_id,
otel.kind = ?SpanKind::Consumer
)
}
pub fn publish_span(task_name: &str, task_id: &str) -> Span {
tracing::info_span!(
"task.publish",
task.name = task_name,
task.id = task_id,
otel.kind = ?SpanKind::Producer
)
}
/// Extracts a trace context from `headers` with a `TraceContextPropagator`
/// and sets it as the parent of the current `tracing` span.
///
/// The value returned by `set_parent` is discarded.
pub fn extract_trace_context(headers: &std::collections::HashMap<String, String>) {
    use opentelemetry::propagation::TextMapPropagator;
    use opentelemetry_sdk::propagation::TraceContextPropagator;
    let parent = TraceContextPropagator::new().extract(headers);
    let current = tracing::Span::current();
    let _ = current.set_parent(parent);
}
/// Writes the current span's trace context into `headers` using a
/// `TraceContextPropagator`.
pub fn inject_trace_context(headers: &mut std::collections::HashMap<String, String>) {
    use opentelemetry::propagation::TextMapPropagator;
    use opentelemetry_sdk::propagation::TraceContextPropagator;
    let propagator = TraceContextPropagator::new();
    let current_context = tracing::Span::current().context();
    propagator.inject_context(&current_context, headers);
}
}
#[cfg(any(test, feature = "dev-utils"))]
pub mod dev_utils {
use crate::{Broker, SerializedTask};
use async_trait::async_trait;
use celers_core::broker::BrokerMessage;
use celers_core::error::CelersError;
use celers_core::task::TaskId;
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
/// Module-local result alias whose error type is `CelersError`.
type Result<T> = std::result::Result<T, CelersError>;
/// In-memory `Broker` implementation for tests.
///
/// Holds a FIFO queue of pending tasks plus a log of every task passed to
/// `enqueue`. Clones share the same underlying state.
#[derive(Clone)]
pub struct MockBroker {
    queue: Arc<Mutex<VecDeque<SerializedTask>>>,
    published_tasks: Arc<Mutex<Vec<SerializedTask>>>,
}

impl MockBroker {
    /// Creates a broker with an empty queue and an empty publish log.
    pub fn new() -> Self {
        let queue = Arc::new(Mutex::new(VecDeque::new()));
        let published_tasks = Arc::new(Mutex::new(Vec::new()));
        Self {
            queue,
            published_tasks,
        }
    }

    /// Locks the pending queue; panics if the mutex is poisoned.
    fn lock_queue(&self) -> std::sync::MutexGuard<'_, VecDeque<SerializedTask>> {
        self.queue.lock().expect("lock should not be poisoned")
    }

    /// Locks the publish log; panics if the mutex is poisoned.
    fn lock_published(&self) -> std::sync::MutexGuard<'_, Vec<SerializedTask>> {
        self.published_tasks
            .lock()
            .expect("lock should not be poisoned")
    }

    /// Number of tasks currently waiting in the queue.
    pub fn queue_len(&self) -> usize {
        let pending = self.lock_queue();
        pending.len()
    }

    /// Snapshot (clone) of every task recorded by `enqueue`.
    pub fn published_tasks(&self) -> Vec<SerializedTask> {
        let log = self.lock_published();
        log.clone()
    }

    /// Empties the queue, then the publish log.
    pub fn clear(&self) {
        self.lock_queue().clear();
        self.lock_published().clear();
    }

    /// Appends `task` to the queue without recording it in the publish log.
    pub fn push_task(&self, task: SerializedTask) {
        self.lock_queue().push_back(task);
    }
}

impl Default for MockBroker {
    fn default() -> Self {
        Self::new()
    }
}

#[async_trait]
impl Broker for MockBroker {
    /// Records `task` in the publish log, queues it, and returns its id.
    async fn enqueue(&self, task: SerializedTask) -> Result<TaskId> {
        let task_id = task.metadata.id;
        self.lock_published().push(task.clone());
        self.lock_queue().push_back(task);
        Ok(task_id)
    }

    /// Pops the oldest queued task, wrapped in a `BrokerMessage`.
    async fn dequeue(&self) -> Result<Option<BrokerMessage>> {
        let next = self.lock_queue().pop_front();
        Ok(next.map(BrokerMessage::new))
    }

    /// No-op: always succeeds.
    async fn ack(&self, _task_id: &TaskId, _receipt_handle: Option<&str>) -> Result<()> {
        Ok(())
    }

    /// No-op: always succeeds; `_requeue` is ignored.
    async fn reject(
        &self,
        _task_id: &TaskId,
        _receipt_handle: Option<&str>,
        _requeue: bool,
    ) -> Result<()> {
        Ok(())
    }

    /// Number of tasks currently waiting in the queue.
    async fn queue_size(&self) -> Result<usize> {
        let size = self.lock_queue().len();
        Ok(size)
    }

    /// Removes every queued task whose id equals `task_id`; returns whether
    /// anything was removed.
    async fn cancel(&self, task_id: &TaskId) -> Result<bool> {
        let mut pending = self.lock_queue();
        let before = pending.len();
        pending.retain(|queued| queued.metadata.id != *task_id);
        Ok(before > pending.len())
    }
}
/// Builder for `SerializedTask` values used in tests.
pub struct TaskBuilder {
    name: String,
    id: Option<String>,
    max_retries: u32,
    payload: Vec<u8>,
}

impl TaskBuilder {
    /// Starts a builder for `task_name` with no explicit id, zero retries
    /// and an empty payload.
    pub fn new(task_name: &str) -> Self {
        TaskBuilder {
            name: String::from(task_name),
            id: None,
            max_retries: 0,
            payload: Vec::new(),
        }
    }

    /// Sets the task id, given as a UUID string (see [`TaskBuilder::build`]).
    pub fn id(self, id: String) -> Self {
        Self {
            id: Some(id),
            ..self
        }
    }

    /// Sets the maximum retry count stored in the task metadata.
    pub fn max_retries(self, max_retries: u32) -> Self {
        Self {
            max_retries,
            ..self
        }
    }

    /// Sets the raw payload bytes.
    pub fn payload(self, payload: Vec<u8>) -> Self {
        Self { payload, ..self }
    }

    /// Builds the task.
    ///
    /// If an id was supplied but is not a valid UUID string, a fresh random
    /// (v4) UUID is used instead. Without an explicit id, the id assigned by
    /// `SerializedTask::new` is kept.
    pub fn build(self) -> SerializedTask {
        use uuid::Uuid;
        let TaskBuilder {
            name,
            id,
            max_retries,
            payload,
        } = self;
        let mut built = SerializedTask::new(name, payload);
        if let Some(raw_id) = id {
            built.metadata.id = match Uuid::parse_str(&raw_id) {
                Ok(parsed) => parsed,
                Err(_) => Uuid::new_v4(),
            };
        }
        built.metadata.max_retries = max_retries;
        built
    }
}
/// Builds a task named `name` using `TaskBuilder`'s defaults.
pub fn create_test_task(name: &str) -> SerializedTask {
    let builder = TaskBuilder::new(name);
    builder.build()
}
/// Records a history of task observations for debugging.
pub struct TaskDebugger {
    task_history: Arc<Mutex<Vec<TaskDebugInfo>>>,
}

/// One recorded observation of a task.
#[derive(Debug, Clone)]
pub struct TaskDebugInfo {
    /// String form of the task's metadata id.
    pub task_id: String,
    /// Task name copied from the task's metadata.
    pub task_name: String,
    /// Caller-supplied state label.
    pub state: String,
    /// Wall-clock time at which the entry was recorded.
    pub timestamp: std::time::SystemTime,
    /// Extra key/value data; always empty for entries made by `record_task`.
    pub metadata: std::collections::HashMap<String, String>,
}

impl TaskDebugger {
    /// Creates a debugger with an empty history.
    pub fn new() -> Self {
        Self {
            task_history: Arc::new(Mutex::new(Vec::new())),
        }
    }

    /// Single lock site, so every method fails the same way on a poisoned lock.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned.
    fn lock_history(&self) -> std::sync::MutexGuard<'_, Vec<TaskDebugInfo>> {
        self.task_history
            .lock()
            .expect("lock should not be poisoned")
    }

    /// Appends an entry for `task` labelled with `state`.
    pub fn record_task(&self, task: &SerializedTask, state: &str) {
        self.lock_history().push(TaskDebugInfo {
            task_id: task.metadata.id.to_string(),
            task_name: task.metadata.name.clone(),
            state: state.to_string(),
            timestamp: std::time::SystemTime::now(),
            metadata: std::collections::HashMap::new(),
        });
    }

    /// Returns a copy of the full history in recording order.
    pub fn history(&self) -> Vec<TaskDebugInfo> {
        self.lock_history().clone()
    }

    /// Removes all recorded entries.
    pub fn clear(&self) {
        self.lock_history().clear();
    }

    /// Returns copies of the entries whose `state` equals `state` exactly.
    pub fn tasks_by_state(&self, state: &str) -> Vec<TaskDebugInfo> {
        self.lock_history()
            .iter()
            .filter(|info| info.state == state)
            .cloned()
            .collect()
    }

    /// Prints the history to stdout.
    pub fn print_history(&self) {
        let history = self.history();
        println!("\n╔══════════════════════════════════════════════════════════════════════════════╗");
        println!(
            "║ Task Execution History ║"
        );
        println!("╚══════════════════════════════════════════════════════════════════════════════╝\n");
        for (idx, info) in history.iter().enumerate() {
            println!("Task #{}", idx + 1);
            println!(" ID: {}", info.task_id);
            println!(" Name: {}", info.task_name);
            println!(" State: {}", info.state);
            println!(" Timestamp: {:?}", info.timestamp);
            if !info.metadata.is_empty() {
                println!(" Metadata:");
                for (key, value) in &info.metadata {
                    println!(" {}: {}", key, value);
                }
            }
            println!();
        }
    }
}

impl Default for TaskDebugger {
    fn default() -> Self {
        Self::new()
    }
}
/// In-memory log of events for tests and debugging.
pub struct EventTracker {
    events: Arc<Mutex<Vec<TrackedEvent>>>,
}

/// One tracked event.
#[derive(Debug, Clone)]
pub struct TrackedEvent {
    /// Caller-supplied event type label.
    pub event_type: String,
    /// Optional id of the task the event relates to.
    pub task_id: Option<String>,
    /// Free-form message.
    pub message: String,
    /// Wall-clock time at which the event was tracked.
    pub timestamp: std::time::SystemTime,
}

impl EventTracker {
    /// Creates a tracker with no events.
    pub fn new() -> Self {
        Self {
            events: Arc::new(Mutex::new(Vec::new())),
        }
    }

    /// Single lock site, so every method fails the same way on a poisoned lock.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned.
    fn lock_events(&self) -> std::sync::MutexGuard<'_, Vec<TrackedEvent>> {
        self.events.lock().expect("lock should not be poisoned")
    }

    /// Appends an event stamped with the current time.
    pub fn track(&self, event_type: &str, task_id: Option<String>, message: String) {
        self.lock_events().push(TrackedEvent {
            event_type: event_type.to_string(),
            task_id,
            message,
            timestamp: std::time::SystemTime::now(),
        });
    }

    /// Returns a copy of all events in tracking order.
    pub fn events(&self) -> Vec<TrackedEvent> {
        self.lock_events().clone()
    }

    /// Returns copies of the events whose `event_type` equals `event_type`
    /// exactly.
    pub fn events_by_type(&self, event_type: &str) -> Vec<TrackedEvent> {
        self.lock_events()
            .iter()
            .filter(|event| event.event_type == event_type)
            .cloned()
            .collect()
    }

    /// Removes all tracked events.
    pub fn clear(&self) {
        self.lock_events().clear();
    }

    /// Prints all events to stdout.
    pub fn print_events(&self) {
        let events = self.events();
        println!("\n╔══════════════════════════════════════════════════════════════════════════════╗");
        println!(
            "║ Event Log ║"
        );
        println!("╚══════════════════════════════════════════════════════════════════════════════╝\n");
        for (idx, event) in events.iter().enumerate() {
            println!("Event #{}", idx + 1);
            println!(" Type: {}", event.event_type);
            if let Some(ref task_id) = event.task_id {
                println!(" Task ID: {}", task_id);
            }
            println!(" Message: {}", event.message);
            println!(" Timestamp: {:?}", event.timestamp);
            println!();
        }
    }
}

impl Default for EventTracker {
    fn default() -> Self {
        Self::new()
    }
}
/// Collects named duration measurements for tests and debugging.
///
/// Clones share the same underlying measurement list.
#[derive(Clone)]
pub struct PerformanceProfiler {
    measurements: Arc<Mutex<Vec<PerformanceMeasurement>>>,
}

/// One recorded measurement.
#[derive(Debug, Clone)]
pub struct PerformanceMeasurement {
    /// Name given to `start_measurement`.
    pub name: String,
    /// Elapsed time in whole milliseconds.
    pub duration_ms: u128,
    /// Wall-clock time at which the measurement was recorded.
    pub timestamp: std::time::SystemTime,
    /// Extra key/value data; always empty for measurements made by the guard.
    pub metadata: std::collections::HashMap<String, String>,
}

impl PerformanceProfiler {
    /// Creates a profiler with no measurements.
    pub fn new() -> Self {
        Self {
            measurements: Arc::new(Mutex::new(Vec::new())),
        }
    }

    /// Single lock site, so every method fails the same way on a poisoned lock.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned.
    fn lock_measurements(&self) -> std::sync::MutexGuard<'_, Vec<PerformanceMeasurement>> {
        self.measurements
            .lock()
            .expect("lock should not be poisoned")
    }

    /// Starts timing `name`; the measurement is recorded when the returned
    /// guard is dropped, so bind it to a variable for the scope to be timed.
    pub fn start_measurement(&self, name: &str) -> MeasurementGuard {
        MeasurementGuard {
            name: name.to_string(),
            start: std::time::Instant::now(),
            profiler: self.clone(),
        }
    }

    /// Appends a measurement stamped with the current time.
    fn record(&self, name: String, duration_ms: u128) {
        self.lock_measurements().push(PerformanceMeasurement {
            name,
            duration_ms,
            timestamp: std::time::SystemTime::now(),
            metadata: std::collections::HashMap::new(),
        });
    }

    /// Returns a copy of all measurements in recording order.
    pub fn measurements(&self) -> Vec<PerformanceMeasurement> {
        self.lock_measurements().clone()
    }

    /// Removes all measurements.
    pub fn clear(&self) {
        self.lock_measurements().clear();
    }

    /// Average duration in milliseconds (integer division, rounds down) of
    /// the measurements named `name`, or `None` if there are none.
    pub fn average_duration(&self, name: &str) -> Option<u128> {
        // Sum and count in one pass; no intermediate Vec is needed.
        let (total, count) = self
            .lock_measurements()
            .iter()
            .filter(|m| m.name == name)
            .fold((0u128, 0u128), |(total, count), m| {
                (total + m.duration_ms, count + 1)
            });
        if count == 0 {
            None
        } else {
            Some(total / count)
        }
    }

    /// Prints per-name count/avg/min/max/total to stdout, sorted by name.
    pub fn print_summary(&self) {
        let measurements = self.measurements();
        println!("\n╔══════════════════════════════════════════════════════════════════════════════╗");
        println!(
            "║ Performance Summary ║"
        );
        println!("╚══════════════════════════════════════════════════════════════════════════════╝\n");
        // BTreeMap keeps the groups ordered by name so the output is stable
        // from run to run (HashMap iteration order is unspecified).
        let mut grouped: std::collections::BTreeMap<String, Vec<u128>> =
            std::collections::BTreeMap::new();
        for m in measurements {
            grouped.entry(m.name).or_default().push(m.duration_ms);
        }
        for (name, durations) in grouped {
            let count = durations.len();
            let total: u128 = durations.iter().sum();
            let avg = total / count as u128;
            // A group only exists once a duration was pushed into it.
            let min = *durations
                .iter()
                .min()
                .expect("collection validated to be non-empty");
            let max = *durations
                .iter()
                .max()
                .expect("collection validated to be non-empty");
            println!("{}", name);
            println!(" Count: {}", count);
            println!(" Avg: {} ms", avg);
            println!(" Min: {} ms", min);
            println!(" Max: {} ms", max);
            println!(" Total: {} ms", total);
            println!();
        }
    }
}

impl Default for PerformanceProfiler {
    fn default() -> Self {
        Self::new()
    }
}

/// Guard returned by [`PerformanceProfiler::start_measurement`]; records the
/// elapsed time into the profiler when dropped.
#[must_use = "the measurement is recorded when the guard is dropped; bind it to a variable"]
pub struct MeasurementGuard {
    name: String,
    start: std::time::Instant,
    profiler: PerformanceProfiler,
}

impl Drop for MeasurementGuard {
    fn drop(&mut self) {
        let duration_ms = self.start.elapsed().as_millis();
        // `drop` only has `&mut self`, so move the name out instead of cloning.
        self.profiler
            .record(std::mem::take(&mut self.name), duration_ms);
    }
}
/// Records a series of queue-size snapshots taken from a `MockBroker`.
pub struct QueueInspector {
    snapshots: Arc<Mutex<Vec<QueueSnapshot>>>,
}

/// Queue size observed at one point in time.
#[derive(Debug, Clone)]
pub struct QueueSnapshot {
    /// Number of queued tasks reported by the broker.
    pub queue_size: usize,
    /// Wall-clock time at which the snapshot was taken.
    pub timestamp: std::time::SystemTime,
    /// Extra key/value data; always empty for snapshots made by `snapshot`.
    pub metadata: std::collections::HashMap<String, String>,
}

impl QueueInspector {
    /// Creates an inspector with no snapshots.
    pub fn new() -> Self {
        let snapshots = Arc::new(Mutex::new(Vec::new()));
        Self { snapshots }
    }

    /// Reads `broker.queue_len()` and appends it as a new snapshot.
    pub async fn snapshot(&self, broker: &MockBroker) {
        let observed = QueueSnapshot {
            queue_size: broker.queue_len(),
            timestamp: std::time::SystemTime::now(),
            metadata: std::collections::HashMap::new(),
        };
        self.snapshots
            .lock()
            .expect("lock should not be poisoned")
            .push(observed);
    }

    /// Returns a copy of all snapshots in recording order.
    pub fn snapshots(&self) -> Vec<QueueSnapshot> {
        let recorded = self.snapshots.lock().expect("lock should not be poisoned");
        recorded.clone()
    }

    /// Removes all snapshots.
    pub fn clear(&self) {
        let mut recorded = self.snapshots.lock().expect("lock should not be poisoned");
        recorded.clear();
    }

    /// Prints all snapshots to stdout.
    pub fn print_history(&self) {
        println!("\n╔══════════════════════════════════════════════════════════════════════════════╗");
        println!(
            "║ Queue Size History ║"
        );
        println!("╚══════════════════════════════════════════════════════════════════════════════╝\n");
        let recorded = self.snapshots();
        let mut number = 0;
        for entry in &recorded {
            number += 1;
            println!("Snapshot #{}", number);
            println!(" Queue Size: {}", entry.queue_size);
            println!(" Timestamp: {:?}", entry.timestamp);
            println!();
        }
    }
}

impl Default for QueueInspector {
    fn default() -> Self {
        Self::new()
    }
}
}
pub mod config_validation {
/// Errors reported by the configuration validation helpers in this module.
#[derive(Debug, thiserror::Error)]
pub enum ValidationError {
/// General configuration problem described by the message.
#[error("Invalid configuration: {0}")]
InvalidConfig(String),
/// A required field (named by the payload) was absent or empty.
#[error("Missing required field: {0}")]
MissingField(String),
/// A specific field holds an unacceptable value.
#[error("Invalid value for {field}: {message}")]
InvalidValue {
/// Name of the offending field.
field: String,
/// Why the value was rejected.
message: String,
},
/// Settings that cannot be combined, described by the message.
#[error("Incompatible configuration: {0}")]
IncompatibleConfig(String),
}
/// Accumulates validation errors and warnings.
pub struct ConfigValidator {
    errors: Vec<ValidationError>,
    warnings: Vec<String>,
}

impl ConfigValidator {
    /// Creates a validator with no errors and no warnings.
    pub fn new() -> Self {
        ConfigValidator {
            errors: vec![],
            warnings: vec![],
        }
    }

    /// Records a `MissingField` error when `value` is `None` or the empty
    /// string. Whitespace-only values are accepted.
    pub fn require_field(&mut self, field_name: &str, value: Option<&str>) {
        match value {
            None | Some("") => {
                let missing = ValidationError::MissingField(field_name.to_string());
                self.errors.push(missing);
            }
            Some(_) => {}
        }
    }

    /// Records an error.
    pub fn add_error(&mut self, error: ValidationError) {
        self.errors.push(error);
    }

    /// Records a warning; warnings do not affect validity.
    pub fn add_warning(&mut self, message: String) {
        self.warnings.push(message);
    }

    /// `true` when no errors have been recorded.
    pub fn is_valid(&self) -> bool {
        self.errors.len() == 0
    }

    /// Errors recorded so far.
    pub fn errors(&self) -> &[ValidationError] {
        self.errors.as_slice()
    }

    /// Warnings recorded so far.
    pub fn warnings(&self) -> &[String] {
        self.warnings.as_slice()
    }

    /// Consumes the validator: `Ok(warnings)` when there are no errors,
    /// otherwise `Err(errors)`.
    pub fn validate(self) -> Result<Vec<String>, Vec<ValidationError>> {
        let ConfigValidator { errors, warnings } = self;
        if !errors.is_empty() {
            return Err(errors);
        }
        Ok(warnings)
    }
}

impl Default for ConfigValidator {
    fn default() -> Self {
        Self::new()
    }
}
/// Validates optional worker `concurrency` and `prefetch_count` settings.
///
/// A value of `0` is recorded as an `InvalidValue` error; a value above
/// `1000` is recorded as a warning. `None` is not checked.
///
/// # Errors
///
/// Returns all recorded errors if any setting is invalid; otherwise returns
/// the warnings (possibly empty).
pub fn validate_worker_config(
    concurrency: Option<usize>,
    prefetch_count: Option<usize>,
) -> Result<Vec<String>, Vec<ValidationError>> {
    let mut checker = ConfigValidator::new();

    match concurrency {
        Some(0) => checker.add_error(ValidationError::InvalidValue {
            field: "concurrency".to_string(),
            message: "must be greater than 0".to_string(),
        }),
        Some(level) if level > 1000 => checker.add_warning(format!(
            "High concurrency value ({}). Consider if this is intentional.",
            level
        )),
        _ => {}
    }

    match prefetch_count {
        Some(0) => checker.add_error(ValidationError::InvalidValue {
            field: "prefetch_count".to_string(),
            message: "must be greater than 0".to_string(),
        }),
        Some(amount) if amount > 1000 => checker.add_warning(format!(
            "High prefetch_count value ({}). This may consume significant memory.",
            amount
        )),
        _ => {}
    }

    checker.validate()
}
/// Checks that `url` is non-empty, contains a `://` separator, and uses a
/// supported broker scheme.
///
/// The scheme is compared case-insensitively (URI schemes are
/// case-insensitive), so `REDIS://host` is accepted just like `redis://host`.
/// Supported schemes: `redis`, `rediss`, `postgres`, `postgresql`, `mysql`,
/// `amqp`, `amqps`, `sqs`. Only the scheme is inspected; the rest of the URL
/// is not parsed.
///
/// On success returns `"Valid <scheme> URL"` with the scheme in lowercase.
///
/// # Errors
///
/// Returns `ValidationError::InvalidValue` (field `broker_url`) when the URL
/// is empty, has no `://`, or uses an unsupported scheme.
// NOTE(review): `https` queue URLs are rejected here although
// `ide_support::examples::SQS_URL_EXAMPLE` uses one — confirm whether intended.
pub fn validate_broker_url(url: &str) -> Result<String, ValidationError> {
    let invalid = |message: String| ValidationError::InvalidValue {
        field: "broker_url".to_string(),
        message,
    };

    if url.is_empty() {
        return Err(invalid("cannot be empty".to_string()));
    }

    // Everything before the first "://" is the scheme.
    let Some((scheme, _rest)) = url.split_once("://") else {
        return Err(invalid("invalid URL format (missing scheme)".to_string()));
    };

    let normalized = scheme.to_ascii_lowercase();
    match normalized.as_str() {
        "redis" | "rediss" | "postgres" | "postgresql" | "mysql" | "amqp" | "amqps" | "sqs" => {
            Ok(format!("Valid {} URL", normalized))
        }
        // Report the scheme exactly as the caller wrote it.
        _ => Err(invalid(format!("unsupported scheme: {}", scheme))),
    }
}
/// Produces warnings when more than one broker feature or more than one
/// backend feature appears in `features`.
///
/// Broker features are `redis`, `postgres`, `mysql`, `amqp`, `sqs`; backend
/// features are `backend-redis`, `backend-db`, `backend-rpc`. Other names are
/// ignored.
///
/// # Errors
///
/// The current implementation always returns `Ok`.
pub fn check_feature_compatibility(features: &[&str]) -> Result<Vec<String>, ValidationError> {
    const BROKER_FEATURES: [&str; 5] = ["redis", "postgres", "mysql", "amqp", "sqs"];
    const BACKEND_FEATURES: [&str; 3] = ["backend-redis", "backend-db", "backend-rpc"];

    // Keeps the entries of `features` that appear in `known`, preserving order.
    fn select<'a>(features: &[&'a str], known: &[&str]) -> Vec<&'a str> {
        features
            .iter()
            .copied()
            .filter(|name| known.contains(name))
            .collect()
    }

    let mut notes = Vec::new();

    let brokers = select(features, &BROKER_FEATURES);
    if brokers.len() > 1 {
        notes.push(format!(
            "Multiple broker features enabled: {:?}. Ensure you're using the correct broker.",
            brokers
        ));
    }

    let backends = select(features, &BACKEND_FEATURES);
    if backends.len() > 1 {
        notes.push(format!(
            "Multiple backend features enabled: {:?}. Ensure you're using the correct backend.",
            backends
        ));
    }

    Ok(notes)
}
/// Returns a human-readable overview of the crate's Cargo features, grouped
/// by category, with recommended combinations and notes.
///
/// The text is a fixed literal; it does not reflect which features are
/// actually enabled in the current build.
pub fn feature_compatibility_matrix() -> String {
    const MATRIX: &str = r#"
╔══════════════════════════════════════════════════════════════════════════════╗
║ CeleRS Feature Compatibility Matrix ║
╚══════════════════════════════════════════════════════════════════════════════╝
BROKER FEATURES (Choose ONE):
✓ redis - Redis broker (recommended for most use cases)
✓ postgres - PostgreSQL broker (good for existing PostgreSQL infrastructure)
✓ mysql - MySQL broker (good for existing MySQL infrastructure)
✓ amqp - RabbitMQ/AMQP broker (enterprise messaging)
✓ sqs - AWS SQS broker (cloud-native, serverless)
BACKEND FEATURES (Choose ONE):
✓ backend-redis - Redis result backend (recommended with redis broker)
✓ backend-db - PostgreSQL/MySQL backend (use with postgres/mysql broker)
✓ backend-rpc - gRPC result backend (distributed systems)
SERIALIZATION FEATURES (Can combine):
✓ json - JSON serialization (default, always available)
✓ msgpack - MessagePack serialization (compact binary format)
OBSERVABILITY FEATURES (Can combine):
✓ metrics - Prometheus metrics
✓ tracing - OpenTelemetry distributed tracing
OTHER FEATURES (Can combine):
✓ beat - Periodic task scheduler
✓ dev-utils - Development and testing utilities
RECOMMENDED COMBINATIONS:
1. Simple Setup:
features = ["redis", "backend-redis", "json"]
2. Production Ready:
features = ["redis", "backend-redis", "json", "metrics", "tracing"]
3. PostgreSQL Stack:
features = ["postgres", "backend-db", "json", "metrics"]
4. AWS Cloud:
features = ["sqs", "backend-rpc", "json", "msgpack", "metrics"]
5. Full Featured:
features = ["full"] # Enables all features
NOTES:
- Multiple brokers can be compiled but only one should be used at runtime
- Multiple backends can be compiled but only one should be used at runtime
- json + msgpack enables both serialization formats
- metrics + tracing provides comprehensive observability
"#;
    String::from(MATRIX)
}
}
/// Helpers that inspect which Cargo features this crate was compiled with.
pub mod compile_time_validation {
    /// Returns `true` when at least one broker feature (`redis`, `postgres`,
    /// `mysql`, `amqp`, `sqs`) is enabled.
    #[inline]
    pub const fn has_broker_feature() -> bool {
        count_broker_features() > 0
    }

    /// Returns `true` when `json` or `msgpack` is enabled.
    #[inline]
    pub const fn has_serialization_feature() -> bool {
        cfg!(feature = "json") || cfg!(feature = "msgpack")
    }

    /// Number of enabled broker features.
    #[inline]
    pub const fn count_broker_features() -> usize {
        // Each `cfg!` is a compile-time bool; `true as usize` is 1.
        cfg!(feature = "redis") as usize
            + cfg!(feature = "postgres") as usize
            + cfg!(feature = "mysql") as usize
            + cfg!(feature = "amqp") as usize
            + cfg!(feature = "sqs") as usize
    }

    /// Number of enabled result-backend features.
    #[inline]
    pub const fn count_backend_features() -> usize {
        cfg!(feature = "backend-redis") as usize
            + cfg!(feature = "backend-db") as usize
            + cfg!(feature = "backend-rpc") as usize
    }

    /// Checks that the feature set contains a broker and a serialization format.
    ///
    /// # Panics
    ///
    /// Panics when no broker feature or no serialization feature is enabled.
    /// When evaluated in a `const` context this becomes a compile error.
    #[inline]
    pub const fn validate_feature_config() {
        assert!(
            has_broker_feature(),
            "At least one broker feature must be enabled: redis, postgres, mysql, amqp, or sqs"
        );
        assert!(
            has_serialization_feature(),
            "At least one serialization feature must be enabled: json or msgpack"
        );
    }

    // Names from `table` whose flag is set, in table order.
    fn enabled_names(table: &[(bool, &'static str)]) -> Vec<&'static str> {
        table
            .iter()
            .filter(|entry| entry.0)
            .map(|entry| entry.1)
            .collect()
    }

    // Comma-separated list, or "none" for an empty list.
    fn join_or_none(names: &[&str]) -> String {
        if names.is_empty() {
            "none".to_string()
        } else {
            names.join(", ")
        }
    }

    /// Builds a multi-line summary of the enabled brokers, backends,
    /// serialization formats and other features.
    ///
    /// Backends are listed by short label (`redis`, `database`, `grpc`)
    /// rather than by feature name.
    pub fn feature_summary() -> String {
        let brokers = enabled_names(&[
            (cfg!(feature = "redis"), "redis"),
            (cfg!(feature = "postgres"), "postgres"),
            (cfg!(feature = "mysql"), "mysql"),
            (cfg!(feature = "amqp"), "amqp"),
            (cfg!(feature = "sqs"), "sqs"),
        ]);
        let backends = enabled_names(&[
            (cfg!(feature = "backend-redis"), "redis"),
            (cfg!(feature = "backend-db"), "database"),
            (cfg!(feature = "backend-rpc"), "grpc"),
        ]);
        let formats = enabled_names(&[
            (cfg!(feature = "json"), "json"),
            (cfg!(feature = "msgpack"), "msgpack"),
        ]);
        let extras = enabled_names(&[
            (cfg!(feature = "beat"), "beat"),
            (cfg!(feature = "metrics"), "metrics"),
            (cfg!(feature = "tracing"), "tracing"),
            (cfg!(feature = "dev-utils"), "dev-utils"),
        ]);

        format!(
            "CeleRS Configuration:\nBrokers ({}): {}\nBackends ({}): {}\nFormats ({}): {}\nFeatures: {}",
            count_broker_features(),
            join_or_none(&brokers),
            count_backend_features(),
            join_or_none(&backends),
            formats.len(),
            join_or_none(&formats),
            join_or_none(&extras)
        )
    }
}
// Evaluating `validate_feature_config()` in a `const` initializer runs it at
// compile time, so a panic inside it surfaces as a build error. The constant
// itself is never read, hence the `dead_code` allowance.
#[allow(dead_code)]
const _FEATURE_VALIDATION: () = compile_time_validation::validate_feature_config();
pub mod broker_helper {
use std::env;
/// Errors returned by the broker factory helpers in this module.
///
/// The `Display` text of each variant (from its `#[error(..)]` attribute)
/// includes a suggestion for resolving the problem.
#[derive(Debug, thiserror::Error)]
pub enum BrokerConfigError {
/// A required environment variable was not readable; the payload is its name.
#[error("Missing environment variable: {0}\n\nSuggestion: Set the environment variable before running:\n export {0}=<value>")]
MissingEnvVar(String),
/// The requested broker type cannot be created by `create_broker`.
#[error("Unsupported broker type: {broker_type}\n\nSupported types: redis, postgres, mysql, amqp, sqs\nNote: {note}")]
UnsupportedBrokerType {
// Broker type exactly as supplied by the caller.
broker_type: String,
// Extra hint appended to the message.
note: String,
},
/// The broker type is known but its Cargo feature was not compiled in.
#[error("Feature not enabled: {feature}\n\nTo enable this feature, add it to your Cargo.toml:\n celers = {{ version = \"0.1\", features = [\"{feature}\"] }}\n\nAvailable features: redis, postgres, mysql, amqp, sqs, backend-redis, backend-db, backend-rpc")]
FeatureNotEnabled {
// Name of the missing Cargo feature.
feature: String,
},
/// The broker constructor itself returned an error.
#[error("Broker creation failed: {message}\n\nPossible causes:\n{suggestions}")]
CreationFailed {
// Stringified error from the broker constructor.
message: String,
// Bullet list of likely causes.
suggestions: String,
},
}
/// Creates a broker from environment variables.
///
/// Reads `CELERS_BROKER_TYPE` and `CELERS_BROKER_URL` (both required) and
/// `CELERS_BROKER_QUEUE` (falls back to `"celers"`), then delegates to
/// [`create_broker`].
///
/// # Errors
///
/// Returns `BrokerConfigError::MissingEnvVar` when a required variable cannot
/// be read, or any error produced by [`create_broker`].
pub async fn create_broker_from_env() -> Result<Box<dyn crate::Broker>, BrokerConfigError> {
    // Reads a mandatory variable, mapping any lookup failure to `MissingEnvVar`.
    fn required(name: &str) -> Result<String, BrokerConfigError> {
        match env::var(name) {
            Ok(value) => Ok(value),
            Err(_) => Err(BrokerConfigError::MissingEnvVar(name.to_string())),
        }
    }

    let broker_type = required("CELERS_BROKER_TYPE")?;
    let broker_url = required("CELERS_BROKER_URL")?;
    let queue_name = match env::var("CELERS_BROKER_QUEUE") {
        Ok(name) => name,
        Err(_) => "celers".to_string(),
    };

    create_broker(&broker_type, &broker_url, &queue_name).await
}
/// Creates a boxed broker for `broker_type`, connecting to `broker_url` and
/// using `queue_name`.
///
/// `broker_type` is matched case-insensitively. `redis`, `postgres` /
/// `postgresql` and `mysql` are constructed when the matching Cargo feature
/// is compiled in.
///
/// # Errors
///
/// * `CreationFailed` — the broker constructor returned an error.
/// * `FeatureNotEnabled` — the type is `redis`, `postgres`/`postgresql` or
///   `mysql` but its feature is not compiled in.
/// * `UnsupportedBrokerType` — the type is `amqp`/`rabbitmq` or `sqs` (these
///   are not created here), or is not recognised at all.
pub async fn create_broker(
    broker_type: &str,
    broker_url: &str,
    queue_name: &str,
) -> Result<Box<dyn crate::Broker>, BrokerConfigError> {
    let kind = broker_type.to_lowercase();

    match kind.as_str() {
        #[cfg(feature = "redis")]
        "redis" => match crate::RedisBroker::new(broker_url, queue_name) {
            Ok(broker) => Ok(Box::new(broker) as Box<dyn crate::Broker>),
            Err(err) => Err(BrokerConfigError::CreationFailed {
                message: err.to_string(),
                suggestions: "- Check that Redis server is running\n - Verify the connection URL format: redis://host:port\n - Ensure network connectivity to Redis server".to_string(),
            }),
        },
        #[cfg(not(feature = "redis"))]
        "redis" => Err(BrokerConfigError::FeatureNotEnabled {
            feature: "redis".to_string(),
        }),

        #[cfg(feature = "postgres")]
        "postgres" | "postgresql" => {
            match crate::PostgresBroker::with_queue(broker_url, queue_name).await {
                Ok(broker) => Ok(Box::new(broker) as Box<dyn crate::Broker>),
                Err(err) => Err(BrokerConfigError::CreationFailed {
                    message: err.to_string(),
                    suggestions: "- Check that PostgreSQL server is running\n - Verify the connection URL format: postgres://user:pass@host:port/db\n - Ensure database exists and user has permissions".to_string(),
                }),
            }
        }
        #[cfg(not(feature = "postgres"))]
        "postgres" | "postgresql" => Err(BrokerConfigError::FeatureNotEnabled {
            feature: "postgres".to_string(),
        }),

        #[cfg(feature = "mysql")]
        "mysql" => match crate::MysqlBroker::with_queue(broker_url, queue_name).await {
            Ok(broker) => Ok(Box::new(broker) as Box<dyn crate::Broker>),
            Err(err) => Err(BrokerConfigError::CreationFailed {
                message: err.to_string(),
                suggestions: "- Check that MySQL server is running\n - Verify the connection URL format: mysql://user:pass@host:port/db\n - Ensure database exists and user has permissions".to_string(),
            }),
        },
        #[cfg(not(feature = "mysql"))]
        "mysql" => Err(BrokerConfigError::FeatureNotEnabled {
            feature: "mysql".to_string(),
        }),

        // AMQP and SQS are rejected here regardless of enabled features.
        "amqp" | "rabbitmq" => Err(BrokerConfigError::UnsupportedBrokerType {
            broker_type: broker_type.to_string(),
            note: "AMQP brokers use the Transport trait. Import and use AmqpBroker directly from celers::AmqpBroker".to_string(),
        }),
        "sqs" => Err(BrokerConfigError::UnsupportedBrokerType {
            broker_type: broker_type.to_string(),
            note: "SQS brokers use the Transport trait. Import and use SqsBroker directly from celers::SqsBroker".to_string(),
        }),

        _ => Err(BrokerConfigError::UnsupportedBrokerType {
            broker_type: broker_type.to_string(),
            note: "Check the broker type name for typos".to_string(),
        }),
    }
}
}
pub mod startup_optimization {
use std::sync::OnceLock;
/// A boxed, sendable, one-shot closure that, when called, returns a pinned
/// boxed `Send` future resolving to `Result<T, E>`.
///
/// This is the task type accepted by `parallel_init`.
pub type AsyncInitTask<T, E> = Box<
dyn FnOnce() -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<T, E>> + Send>>
+ Send,
>;
/// A value that is initialised at most once, on first access.
///
/// Thin wrapper around [`OnceLock`].
pub struct LazyInit<T> {
    cell: OnceLock<T>,
}

impl<T> LazyInit<T> {
    /// Creates an empty, uninitialised cell. Usable in `const`/`static` items.
    pub const fn new() -> Self {
        LazyInit {
            cell: OnceLock::new(),
        }
    }

    /// Returns the stored value, running `f` to create it if the cell is
    /// still empty. Once a value is stored, `f` is not called.
    #[inline]
    pub fn get_or_init<F>(&self, f: F) -> &T
    where
        F: FnOnce() -> T,
    {
        let cell = &self.cell;
        cell.get_or_init(f)
    }

    /// Returns the stored value, or `None` if the cell has not been initialised.
    #[inline]
    pub fn get(&self) -> Option<&T> {
        OnceLock::get(&self.cell)
    }
}

impl<T> Default for LazyInit<T> {
    /// An empty cell, same as [`LazyInit::new`].
    fn default() -> Self {
        LazyInit {
            cell: OnceLock::default(),
        }
    }
}
// Empty placeholder function; it has no body and is not called in the
// visible code, hence the `dead_code` allowance.
#[allow(dead_code)]
fn _cached_pattern_example() {
}
/// Runs every initialisation task concurrently on the Tokio runtime and
/// returns their results in the same order as `tasks`.
///
/// All tasks are spawned with `tokio::spawn` before any of them is awaited.
///
/// # Panics
///
/// Panics with `"Task panicked: .."` if joining any spawned task fails.
pub async fn parallel_init<T, E>(tasks: Vec<AsyncInitTask<T, E>>) -> Vec<Result<T, E>>
where
    T: Send + 'static,
    E: Send + 'static,
{
    // Spawn everything first so the tasks make progress in parallel.
    let mut pending = Vec::with_capacity(tasks.len());
    for task in tasks {
        pending.push(tokio::spawn(async move { task().await }));
    }

    let mut outcomes = Vec::with_capacity(pending.len());
    for join_handle in pending {
        let outcome = join_handle
            .await
            .unwrap_or_else(|join_err| panic!("Task panicked: {:?}", join_err));
        outcomes.push(outcome);
    }
    outcomes
}
/// Startup timing figures, in milliseconds.
#[derive(Debug, Clone)]
pub struct StartupMetrics {
    /// Time spent initialising the broker.
    pub broker_init_ms: u64,
    /// Time spent loading configuration.
    pub config_load_ms: u64,
    /// Time spent initialising the result backend.
    pub backend_init_ms: u64,
    /// Total startup time.
    pub total_ms: u64,
}

impl StartupMetrics {
    /// Creates metrics with every field set to zero.
    pub fn new() -> Self {
        StartupMetrics {
            broker_init_ms: 0,
            config_load_ms: 0,
            backend_init_ms: 0,
            total_ms: 0,
        }
    }

    /// Formats the metrics as a five-line, human-readable report.
    pub fn report(&self) -> String {
        let Self {
            broker_init_ms,
            config_load_ms,
            backend_init_ms,
            total_ms,
        } = self;
        format!(
            "Startup Performance:\n- Broker Init: {}ms\n- Config Load: {}ms\n- Backend Init: {}ms\n- Total: {}ms",
            broker_init_ms, config_load_ms, backend_init_ms, total_ms
        )
    }
}

impl Default for StartupMetrics {
    /// All-zero metrics, same as [`StartupMetrics::new`].
    fn default() -> Self {
        StartupMetrics::new()
    }
}
/// Evaluates a block and measures how long it took.
///
/// Expands to an expression yielding `(result, duration)`, where `result` is
/// the block's value and `duration` is the `std::time::Duration` elapsed
/// between just before and just after the block ran.
#[macro_export]
macro_rules! time_init {
($block:block) => {{
let start = std::time::Instant::now();
let result = $block;
let duration = start.elapsed();
(result, duration)
}};
}
}
pub mod ide_support {
use std::future::Future;
use std::pin::Pin;
/// `Result` whose error is a boxed, thread-safe `std::error::Error` trait object.
pub type BoxedResult<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
/// Pinned, boxed, `Send + 'static` future resolving to a [`BoxedResult`].
pub type BoxedFuture<T> = Pin<Box<dyn Future<Output = BoxedResult<T>> + Send + 'static>>;
/// Function pointer taking `Args` and returning a pinned, boxed `Send` future
/// that resolves to `BoxedResult<Output>`.
pub type TaskFn<Args, Output> =
fn(Args) -> Pin<Box<dyn Future<Output = BoxedResult<Output>> + Send>>;
/// Boxed `crate::Broker` trait object.
pub type BoxedBroker = Box<dyn crate::Broker>;
/// Boxed `crate::ResultBackend` trait object; only defined with the
/// `backend-redis` feature.
#[cfg(feature = "backend-redis")]
pub type BoxedResultBackend = Box<dyn crate::ResultBackend>;
/// Alias for `celers_worker::WorkerConfigBuilder`.
pub type WorkerBuilder = celers_worker::WorkerConfigBuilder;
/// Alias for `crate::Signature`.
pub type TaskSignature = crate::Signature;
/// Alias for `crate::Chain`.
pub type ChainBuilder = crate::Chain;
/// Alias for `crate::Group`.
pub type GroupBuilder = crate::Group;
/// Alias for `crate::Chord`.
pub type ChordBuilder = crate::Chord;
/// Alias for `crate::SerializedTask`.
pub type QueueTask = crate::SerializedTask;
// Re-exports of crate-level types under this module's path.
pub use crate::TaskState;
pub use crate::BrokerError;
/// Alias for `uuid::Uuid`.
pub type TaskId = uuid::Uuid;
pub use crate::TaskResultValue;
/// Boxed `crate::EventEmitter` trait object.
pub type EventEmitter = Box<dyn crate::EventEmitter>;
pub use crate::AsyncResult;
pub use crate::WorkerStats;
pub use crate::TaskOptions;
pub use crate::RateLimitConfig;
pub use crate::Router;
// Self-describing names for primitive types; these are plain aliases and add
// no type safety.
/// Alias for `String`.
pub type QueueName = String;
/// Alias for `String`.
pub type BrokerUrl = String;
/// Alias for `u32`.
pub type RetryCount = u32;
/// Alias for `u8`.
pub type PriorityLevel = u8;
/// Alias for `u64`.
pub type TimeoutSeconds = u64;
/// Alias for `String`.
pub type TaskName = String;
/// Alias for `usize`.
pub type ConcurrencyLevel = usize;
/// Alias for `usize`.
pub type PrefetchCount = usize;
/// Marker trait bundling the bounds
/// `Serialize + DeserializeOwned + Send + Sync + 'static`.
///
/// It has no methods and is implemented automatically for every type that
/// meets those bounds.
pub trait TaskArgs:
serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static
{
}
// Blanket implementation: any type satisfying the bounds is a `TaskArgs`.
impl<T> TaskArgs for T where
T: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static
{
}
/// Marker trait bundling the bounds `Serialize + DeserializeOwned + Send + 'static`.
///
/// Unlike `TaskArgs`, `Sync` is not required. Implemented automatically for
/// every type that meets the bounds.
pub trait TaskResult: serde::Serialize + serde::de::DeserializeOwned + Send + 'static {}
// Blanket implementation: any type satisfying the bounds is a `TaskResult`.
impl<T> TaskResult for T where T: serde::Serialize + serde::de::DeserializeOwned + Send + 'static {}
/// Marker trait for `crate::Broker` implementations that are also `Send + Sync`.
///
/// No blanket implementation is provided in the visible code, so types must
/// implement it explicitly.
pub trait BrokerImpl: crate::Broker + Send + Sync {}
/// Named default values for common settings.
pub mod defaults {
/// Default worker concurrency.
pub const DEFAULT_CONCURRENCY: usize = 4;
/// Default prefetch count.
pub const DEFAULT_PREFETCH: usize = 10;
/// Default maximum number of retries.
pub const DEFAULT_MAX_RETRIES: u32 = 3;
/// Default retry delay, in seconds.
pub const DEFAULT_RETRY_DELAY_SECS: u64 = 60;
/// Default task timeout, in seconds (one hour).
pub const DEFAULT_TASK_TIMEOUT_SECS: u64 = 3600;
/// Default Redis port.
pub const DEFAULT_REDIS_PORT: u16 = 6379;
/// Default PostgreSQL port.
pub const DEFAULT_POSTGRES_PORT: u16 = 5432;
/// Default MySQL port.
pub const DEFAULT_MYSQL_PORT: u16 = 3306;
/// Default RabbitMQ (AMQP) port.
pub const DEFAULT_RABBITMQ_PORT: u16 = 5672;
/// Default queue name.
// NOTE(review): `broker_helper::create_broker_from_env` falls back to
// "celers", not "celery" — confirm which default is intended.
pub const DEFAULT_QUEUE_NAME: &str = "celery";
}
/// Example connection strings for each supported service.
pub mod examples {
/// Example Redis URL.
pub const REDIS_URL_EXAMPLE: &str = "redis://localhost:6379";
/// Example PostgreSQL URL.
pub const POSTGRES_URL_EXAMPLE: &str = "postgres://user:password@localhost:5432/celery";
/// Example MySQL URL.
pub const MYSQL_URL_EXAMPLE: &str = "mysql://user:password@localhost:3306/celery";
/// Example RabbitMQ (AMQP) URL.
pub const RABBITMQ_URL_EXAMPLE: &str = "amqp://guest:guest@localhost:5672/";
/// Example SQS queue URL (an `https` endpoint).
pub const SQS_URL_EXAMPLE: &str =
"https://sqs.us-east-1.amazonaws.com/123456789012/my-queue";
}
}
/// Namespace for quick-reference material.
///
/// All three submodules are currently empty.
pub mod quick_reference {
/// Empty module.
pub mod patterns {
}
/// Empty module.
pub mod config_examples {
}
/// Empty module.
pub mod troubleshooting {
}
}
#[cfg(feature = "dev-utils")]
pub mod assembly_inspection {
use std::process::Command;
/// Runs `cargo asm [--release] --intel <function_path>` and returns its
/// standard output as a string (invalid UTF-8 is replaced lossily).
///
/// # Errors
///
/// Returns an `io::Error` when the command cannot be started, or — with kind
/// `Other` and the captured stderr in the message — when it exits with a
/// non-success status. Previously a failed run was reported as `Ok` with
/// whatever stdout held, which made failures indistinguishable from
/// functions that produce no assembly.
pub fn generate_asm(function_path: &str, release: bool) -> Result<String, std::io::Error> {
    let mut cmd = Command::new("cargo");
    cmd.arg("asm");
    if release {
        cmd.arg("--release");
    }
    cmd.arg("--intel");
    cmd.arg(function_path);

    let output = cmd.output()?;
    if !output.status.success() {
        let stderr = String::from_utf8_lossy(&output.stderr);
        return Err(std::io::Error::new(
            std::io::ErrorKind::Other,
            format!("`cargo asm` failed ({}): {}", output.status, stderr.trim()),
        ));
    }

    Ok(String::from_utf8_lossy(&output.stdout).into_owned())
}
/// Heuristic inlining check: returns `true` when fewer than two lines of
/// `asm` contain the substring `"call"`.
pub fn verify_inlined(asm: &str) -> bool {
    // Two matching lines are enough to decide, so stop counting there.
    let call_lines = asm
        .lines()
        .filter(|line| line.contains("call"))
        .take(2)
        .count();
    call_lines < 2
}
/// Counts the lines of `asm` that look like instructions.
///
/// After trimming, a line is skipped when it is empty, starts with `;`, `#`
/// or `.`, or ends with `:`; every other line is counted.
pub fn count_instructions(asm: &str) -> usize {
    let mut total = 0;
    for raw in asm.lines() {
        let text = raw.trim();
        if text.is_empty() {
            continue;
        }
        let is_comment_or_directive = text.starts_with(|c: char| matches!(c, ';' | '#' | '.'));
        let is_label = text.ends_with(':');
        if is_comment_or_directive || is_label {
            continue;
        }
        total += 1;
    }
    total
}
/// Generates debug and release assembly for `function_path` and returns a
/// textual comparison of instruction counts and the inlining heuristic.
///
/// The reported ratio is `debug_count / max(release_count, 1)`.
///
/// # Errors
///
/// Propagates any error from `generate_asm`.
pub fn compare_debug_release(function_path: &str) -> Result<String, std::io::Error> {
    let debug_asm = generate_asm(function_path, false)?;
    let release_asm = generate_asm(function_path, true)?;

    let (debug_count, debug_inlined) =
        (count_instructions(&debug_asm), verify_inlined(&debug_asm));
    let (release_count, release_inlined) =
        (count_instructions(&release_asm), verify_inlined(&release_asm));

    // Guard the divisor so an empty release listing does not divide by zero.
    let ratio = debug_count as f64 / release_count.max(1) as f64;

    let report = format!(
        "Assembly Comparison for {}\n\nDebug Build:\n- Instructions: {}\n- Inlined: {}\n\nRelease Build:\n- Instructions: {}\n- Inlined: {}\n\nOptimization Ratio: {:.2}x fewer instructions in release\n",
        function_path, debug_count, debug_inlined, release_count, release_inlined, ratio
    );
    Ok(report)
}
}
pub mod workflow_templates {
use crate::canvas::{Chain, Chord, Group};
use crate::Signature;
use serde_json::Value;
/// Builds a three-step chain: extract (with `extract_args`), then transform,
/// then load. The transform and load steps are added with empty argument lists.
pub fn etl_pipeline(
    extract_task: &str,
    extract_args: Vec<Value>,
    transform_task: &str,
    load_task: &str,
) -> Chain {
    let extract_stage = Chain::new().then(extract_task, extract_args);
    let transform_stage = extract_stage.then(transform_task, Vec::new());
    transform_stage.then(load_task, Vec::new())
}
/// Builds a chord whose header runs `map_task` once per item (each item as
/// the single argument) and whose body is `reduce_task`.
pub fn map_reduce_workflow(map_task: &str, items: Vec<Value>, reduce_task: &str) -> Chord {
    let header = items
        .into_iter()
        .fold(Group::new(), |group, item| group.add(map_task, vec![item]));
    Chord {
        header,
        body: Signature::new(reduce_task.to_string()),
    }
}
/// Builds a chord whose header contains one entry per `(task, args)` pair
/// and whose body is `gather_task`.
pub fn scatter_gather(tasks: Vec<(&str, Vec<Value>)>, gather_task: &str) -> Chord {
    let header = tasks
        .into_iter()
        .fold(Group::new(), |group, (name, call_args)| {
            group.add(name, call_args)
        });
    Chord {
        header,
        body: Signature::new(gather_task.to_string()),
    }
}
/// Builds a chord that processes `items` in batches.
///
/// `items` is split into consecutive chunks of at most `batch_size`
/// elements; each chunk becomes one `process_task` call whose single
/// argument is a JSON array of the chunk. The chord body is `aggregate_task`
/// when given, otherwise the task named `"celers.noop"`.
///
/// A `batch_size` of `0` is treated as `1`; previously it panicked inside
/// `slice::chunks`.
pub fn batch_processing(
    process_task: &str,
    items: Vec<Value>,
    batch_size: usize,
    aggregate_task: Option<&str>,
) -> Chord {
    // `slice::chunks` panics on a zero chunk size.
    let chunk_len = batch_size.max(1);

    let header = items
        .chunks(chunk_len)
        .fold(Group::new(), |group, chunk| {
            group.add(process_task, vec![Value::Array(chunk.to_vec())])
        });

    let body = Signature::new(aggregate_task.unwrap_or("celers.noop").to_string());

    Chord { header, body }
}
/// Builds a chain from `(task, args, max_retries)` stages, in order, setting
/// each signature's `max_retries` option.
pub fn sequential_pipeline(stages: Vec<(&str, Vec<Value>, u32)>) -> Chain {
    let build_stage = |(name, stage_args, retry_limit): (&str, Vec<Value>, u32)| {
        let mut stage = Signature::new(name.to_string()).with_args(stage_args);
        stage.options.max_retries = Some(retry_limit);
        stage
    };

    let mut pipeline = Chain::new();
    stages
        .into_iter()
        .map(build_stage)
        .for_each(|stage| pipeline.tasks.push(stage));
    pipeline
}
/// Builds a group from `(task, args, priority)` entries, setting each
/// signature's `priority` option.
pub fn priority_workflow(tasks: Vec<(&str, Vec<Value>, u8)>) -> Group {
    let build_entry = |(name, call_args, level): (&str, Vec<Value>, u8)| {
        let mut entry = Signature::new(name.to_string()).with_args(call_args);
        entry.options.priority = Some(level);
        entry
    };

    let mut bundle = Group::new();
    tasks
        .into_iter()
        .map(build_entry)
        .for_each(|entry| bundle.tasks.push(entry));
    bundle
}
}
pub mod task_composition {
use crate::canvas::{Chain, Group};
use crate::Signature;
use serde_json::Value;
/// Builds a chain that runs `predicate_task` followed by every task in
/// `success_chain`, in order.
///
/// `_fallback_chain` is accepted but currently unused: no branching is
/// encoded in the returned chain.
#[allow(dead_code)]
pub fn conditional_chain(
    predicate_task: &str,
    predicate_args: Vec<Value>,
    success_chain: Vec<(&str, Vec<Value>)>,
    _fallback_chain: Option<(&str, Vec<Value>)>,
) -> Chain {
    let start = Chain::new().then(predicate_task, predicate_args);
    success_chain
        .into_iter()
        .fold(start, |chain, (task_name, args)| chain.then(task_name, args))
}
/// Creates a signature for `task_name` with `args`, setting `max_retries`
/// and using `initial_delay` as the `countdown` option.
pub fn retry_wrapper(
    task_name: &str,
    args: Vec<Value>,
    max_retries: u32,
    initial_delay: u64,
) -> Signature {
    let mut wrapped = Signature::new(String::from(task_name)).with_args(args);
    let options = &mut wrapped.options;
    options.max_retries = Some(max_retries);
    options.countdown = Some(initial_delay);
    wrapped
}
/// Creates a signature for `task_name` with `args` whose `time_limit` option
/// is `timeout_seconds`.
pub fn timeout_wrapper(task_name: &str, args: Vec<Value>, timeout_seconds: u64) -> Signature {
    let mut limited = Signature::new(String::from(task_name)).with_args(args);
    limited.options.time_limit = Some(timeout_seconds);
    limited
}
/// Builds a group from `(task, args)` entries, setting every signature's
/// `max_retries` option to `max_failures`.
///
/// No other circuit-breaking state is configured by this function.
pub fn circuit_breaker_group(tasks: Vec<(&str, Vec<Value>)>, max_failures: u32) -> Group {
    let mut guarded = Group::new();
    tasks
        .into_iter()
        .map(|(name, call_args)| {
            let mut entry = Signature::new(name.to_string()).with_args(call_args);
            entry.options.max_retries = Some(max_failures);
            entry
        })
        .for_each(|entry| guarded.tasks.push(entry));
    guarded
}
/// Builds a chain that runs `task_name` once per item, spacing the tasks out
/// with increasing countdowns.
///
/// The first task has no countdown; the task at index `i > 0` gets a
/// countdown of `delay_between_tasks * i`. The product saturates at
/// `u64::MAX` instead of overflowing (the plain multiplication panicked in
/// debug builds and wrapped in release builds).
pub fn rate_limited_workflow(
    task_name: &str,
    items: Vec<Value>,
    delay_between_tasks: u64,
) -> Chain {
    let mut chain = Chain::new();
    for (i, item) in items.into_iter().enumerate() {
        let mut sig = Signature::new(task_name.to_string()).with_args(vec![item]);
        if i > 0 {
            let delay = delay_between_tasks.saturating_mul(i as u64);
            sig.options.countdown = Some(delay);
        }
        chain.tasks.push(sig);
    }
    chain
}
}
pub mod error_recovery {
use crate::{Chain, Signature};
use serde_json::Value;
/// Builds a two-step chain: `primary_task` followed by `fallback_task`.
///
/// The fallback is appended as an ordinary chain step; this function adds no
/// condition on the primary task's outcome.
pub fn with_fallback(
    primary_task: &str,
    primary_args: Vec<Value>,
    fallback_task: &str,
    fallback_args: Vec<Value>,
) -> Chain {
    let primary_only = Chain::new().then(primary_task, primary_args);
    primary_only.then(fallback_task, fallback_args)
}
/// Creates a signature for `task_name` with `args` and `max_retries` set to
/// zero.
pub fn ignore_errors(task_name: &str, args: Vec<Value>) -> Signature {
    let mut no_retry = Signature::new(String::from(task_name)).with_args(args);
    no_retry.options.max_retries = Some(0);
    no_retry
}
/// Creates a signature for `task_name` with `args`, setting `max_retries`
/// and using `base_delay` as the `countdown` option.
///
/// This function only stores the two values; it performs no backoff
/// computation itself.
pub fn with_exponential_backoff(
    task_name: &str,
    args: Vec<Value>,
    max_retries: u32,
    base_delay: u64,
) -> Signature {
    let mut retried = Signature::new(String::from(task_name)).with_args(args);
    let options = &mut retried.options;
    options.max_retries = Some(max_retries);
    options.countdown = Some(base_delay);
    retried
}
/// Builds a two-step chain: `task_name` with `args`, followed by `dlq_task`
/// with an empty argument list.
pub fn with_dlq(task_name: &str, args: Vec<Value>, dlq_task: &str) -> Chain {
    let main_step = Chain::new().then(task_name, args);
    main_step.then(dlq_task, Vec::new())
}
}
pub mod workflow_validation {
use crate::{Chain, Chord, Group};
/// A problem found while validating a workflow.
///
/// Implements [`std::error::Error`], so it can be boxed as
/// `Box<dyn Error>` or propagated with `?` into such a return type.
#[derive(Debug, Clone)]
pub struct ValidationError {
    /// Human-readable description of the problem.
    pub message: String,
    /// Task (or workflow part) the problem relates to, when known.
    pub task_name: Option<String>,
}

impl ValidationError {
    /// Creates an error that is not tied to a particular task.
    pub fn new(message: impl Into<String>) -> Self {
        Self {
            message: message.into(),
            task_name: None,
        }
    }

    /// Creates an error tagged with `task_name`.
    pub fn with_task(message: impl Into<String>, task_name: impl Into<String>) -> Self {
        Self {
            message: message.into(),
            task_name: Some(task_name.into()),
        }
    }
}

impl std::fmt::Display for ValidationError {
    /// Writes `"[task] message"` when a task name is present, otherwise just
    /// the message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match &self.task_name {
            Some(task) => write!(f, "[{}] {}", task, self.message),
            None => write!(f, "{}", self.message),
        }
    }
}

// The type already has `Debug` and `Display`; implementing `Error` lets it
// interoperate with standard error handling.
impl std::error::Error for ValidationError {}
/// Validates a chain.
///
/// # Errors
///
/// Returns one error if the chain has no tasks, plus one error for every
/// task whose name is empty.
pub fn validate_chain(chain: &Chain) -> Result<(), Vec<ValidationError>> {
    let mut problems: Vec<ValidationError> = Vec::new();

    if chain.tasks.is_empty() {
        problems.push(ValidationError::new("Chain must contain at least one task"));
    }

    let unnamed = chain.tasks.iter().filter(|sig| sig.task.is_empty());
    problems.extend(unnamed.map(|_| ValidationError::new("Task name cannot be empty")));

    match problems.is_empty() {
        true => Ok(()),
        false => Err(problems),
    }
}
/// Validates a group.
///
/// # Errors
///
/// Returns one error if the group has no tasks, plus one error for every
/// task whose name is empty.
pub fn validate_group(group: &Group) -> Result<(), Vec<ValidationError>> {
    let mut problems: Vec<ValidationError> = Vec::new();

    if group.tasks.is_empty() {
        problems.push(ValidationError::new("Group must contain at least one task"));
    }

    let unnamed = group.tasks.iter().filter(|sig| sig.task.is_empty());
    problems.extend(unnamed.map(|_| ValidationError::new("Task name cannot be empty")));

    match problems.is_empty() {
        true => Ok(()),
        false => Err(problems),
    }
}
/// Validates a chord: its header group and its callback body.
///
/// # Errors
///
/// Returns every error from `validate_group` on the header, followed by an
/// error tagged `"body"` if the callback task name is empty.
pub fn validate_chord(chord: &Chord) -> Result<(), Vec<ValidationError>> {
    // Start from the header's errors (empty when the header is valid).
    let mut problems = validate_group(&chord.header).err().unwrap_or_default();

    if chord.body.task.is_empty() {
        problems.push(ValidationError::with_task(
            "Callback task name cannot be empty",
            "body",
        ));
    }

    match problems.is_empty() {
        true => Ok(()),
        false => Err(problems),
    }
}
/// Returns a warning when `group` holds more than 100 tasks, otherwise `None`.
pub fn check_performance_concerns_group(group: &Group) -> Option<Vec<String>> {
    let task_count = group.tasks.len();
    (task_count > 100).then(|| {
        vec![format!(
            "Group contains {} tasks, which may cause performance issues. Consider batching.",
            task_count
        )]
    })
}
/// Returns a warning when `chain` holds more than 50 tasks, otherwise `None`.
pub fn check_performance_concerns_chain(chain: &Chain) -> Option<Vec<String>> {
    let task_count = chain.tasks.len();
    (task_count > 50).then(|| {
        vec![format!(
            "Chain contains {} sequential tasks, which may cause long latency. Consider parallelizing.",
            task_count
        )]
    })
}
}
pub mod result_helpers {
use crate::Signature;
use serde_json::Value;
/// Creates a signature for `collector_task` whose single argument is
/// `{"expected_count": expected_count}` and whose `time_limit` option is 300.
pub fn create_result_collector(collector_task: &str, expected_count: usize) -> Signature {
    let payload = serde_json::json!({
        "expected_count": expected_count
    });
    let mut collector = Signature::new(collector_task.to_string()).with_args(vec![payload]);
    collector.options.time_limit = Some(300);
    collector
}
/// Creates a signature for `filter_task` with `criteria` as its single argument.
pub fn create_result_filter(filter_task: &str, criteria: Value) -> Signature {
    let call_args = vec![criteria];
    Signature::new(filter_task.to_string()).with_args(call_args)
}
/// Creates a signature for `transform_task` with `transformation_config` as
/// its single argument.
pub fn create_result_transformer(
    transform_task: &str,
    transformation_config: Value,
) -> Signature {
    let call_args = vec![transformation_config];
    Signature::new(transform_task.to_string()).with_args(call_args)
}
/// Creates a signature for `reduce_task` whose single argument is
/// `{"operation": operation}`.
pub fn create_result_reducer(reduce_task: &str, operation: &str) -> Signature {
    let payload = serde_json::json!({
        "operation": operation
    });
    Signature::new(reduce_task.to_string()).with_args(vec![payload])
}
}
#[cfg(test)]
mod tests {
use super::*;
// Compile-time check that `Broker` is re-exported from the crate root and
// can be named as a trait object; nothing is exercised at runtime.
#[test]
fn test_facade_exports() {
let _: Option<Box<dyn Broker>> = None;
}
// Covers the accept/reject paths of the `config_validation` helpers.
#[test]
fn test_config_validation() {
    use crate::config_validation::*;

    // Worker settings: zero values are rejected.
    assert!(validate_worker_config(Some(4), Some(10)).is_ok());
    assert!(validate_worker_config(Some(0), Some(10)).is_err());
    assert!(validate_worker_config(Some(4), Some(0)).is_err());

    // Broker URLs: need a supported scheme and a "://" separator.
    assert!(validate_broker_url("redis://localhost:6379").is_ok());
    assert!(validate_broker_url("invalid").is_err());
    assert!(validate_broker_url("").is_err());

    // Two broker features yield at least one warning.
    let compatibility = check_feature_compatibility(&["redis", "postgres"]);
    assert!(compatibility.is_ok());
    assert!(!compatibility.unwrap().is_empty());
}
// Compile-time check that `RedisBroker` is exported with the `redis` feature.
#[test]
#[cfg(feature = "redis")]
fn test_redis_broker_export() {
use crate::RedisBroker;
let _: Option<RedisBroker> = None;
}
// Compile-time check that `PostgresBroker` is exported with the `postgres` feature.
#[test]
#[cfg(feature = "postgres")]
fn test_postgres_broker_export() {
use crate::PostgresBroker;
let _: Option<PostgresBroker> = None;
}
// Compile-time check that `MysqlBroker` is exported with the `mysql` feature.
#[test]
#[cfg(feature = "mysql")]
fn test_mysql_broker_export() {
use crate::MysqlBroker;
let _: Option<MysqlBroker> = None;
}
// Compile-time check that `AmqpBroker` is exported with the `amqp` feature.
#[test]
#[cfg(feature = "amqp")]
fn test_amqp_broker_export() {
use crate::AmqpBroker;
let _: Option<AmqpBroker> = None;
}
// Compile-time check that `SqsBroker` is exported with the `sqs` feature.
#[test]
#[cfg(feature = "sqs")]
fn test_sqs_broker_export() {
use crate::SqsBroker;
let _: Option<SqsBroker> = None;
}
// Compile-time check that `RedisResultBackend` is exported with the
// `backend-redis` feature.
#[test]
#[cfg(feature = "backend-redis")]
fn test_redis_backend_export() {
use crate::RedisResultBackend;
let _: Option<RedisResultBackend> = None;
}
// Compile-time check that both database result backends are exported with
// the `backend-db` feature.
#[test]
#[cfg(feature = "backend-db")]
fn test_db_backend_export() {
use crate::{MysqlResultBackend, PostgresResultBackend};
let _: Option<PostgresResultBackend> = None;
let _: Option<MysqlResultBackend> = None;
}
// Compile-time check that `GrpcResultBackend` is exported with the
// `backend-rpc` feature.
#[test]
#[cfg(feature = "backend-rpc")]
fn test_rpc_backend_export() {
use crate::GrpcResultBackend;
let _: Option<GrpcResultBackend> = None;
}
// Compile-time check that `BeatScheduler` is exported with the `beat` feature.
#[test]
#[cfg(feature = "beat")]
fn test_beat_export() {
use crate::BeatScheduler;
let _: Option<BeatScheduler> = None;
}
// Compile-time check that the metrics functions are exported with the
// `metrics` feature. The imports are intentionally unused: resolving them is
// the whole test.
#[test]
#[cfg(feature = "metrics")]
#[allow(unused_imports)]
fn test_metrics_export() {
use crate::{gather_metrics, reset_metrics};
}
// Compile-time check that the tracing helpers are exported with the
// `tracing` feature. The imports are intentionally unused: resolving them is
// the whole test.
#[test]
#[cfg(feature = "tracing")]
#[allow(unused_imports)]
fn test_tracing_export() {
use crate::tracing::{init_tracing, task_span};
}
// Compile-time check that `Broker`, `SerializedTask` and `TaskState` are
// reachable through a glob import of `crate::prelude`.
#[test]
fn test_prelude_imports() {
use crate::prelude::*;
let _: Option<Box<dyn Broker>> = None;
let _: Option<SerializedTask> = None;
let _: Option<TaskState> = None;
}
// Walks `MockBroker` through enqueue -> dequeue -> clear and checks the
// queue length and published-task count at each step.
#[tokio::test]
async fn test_mock_broker() {
    use crate::dev_utils::{create_test_task, MockBroker};
    use crate::Broker;

    let mock = MockBroker::new();
    assert_eq!(mock.queue_len(), 0);

    // Enqueue returns the task's own id and grows the queue.
    let sample = create_test_task("test.task");
    let returned_id = mock.enqueue(sample.clone()).await.unwrap();
    assert_eq!(returned_id, sample.metadata.id);
    assert_eq!(mock.queue_len(), 1);
    assert_eq!(mock.published_tasks().len(), 1);

    // Dequeue hands the task back and empties the queue.
    let delivered = mock.dequeue().await.unwrap();
    assert!(delivered.is_some());
    let message = delivered.unwrap();
    assert_eq!(message.task.metadata.name, "test.task");
    assert_eq!(mock.queue_len(), 0);

    // Clear resets both the queue and the published-task record.
    mock.enqueue(sample.clone()).await.unwrap();
    mock.clear();
    assert_eq!(mock.queue_len(), 0);
    assert_eq!(mock.published_tasks().len(), 0);
}
// Checks that `TaskBuilder` carries name, id and max_retries into the task.
#[test]
fn test_task_builder() {
    use crate::dev_utils::TaskBuilder;

    const FIXED_ID: &str = "550e8400-e29b-41d4-a716-446655440000";

    let built = TaskBuilder::new("my.task")
        .id(FIXED_ID.to_string())
        .max_retries(3)
        .build();

    assert_eq!(built.metadata.name, "my.task");
    assert_eq!(built.metadata.id.to_string(), FIXED_ID);
    assert_eq!(built.metadata.max_retries, 3);
}
// Checks the feature-introspection helpers against the features the test
// build was compiled with, and that the summary has all its sections.
#[test]
fn test_compile_time_validation() {
    use crate::compile_time_validation::*;

    assert!(has_broker_feature());
    assert!(has_serialization_feature());
    assert!(count_broker_features() > 0);

    let summary = feature_summary();
    for section in ["CeleRS Configuration:", "Brokers", "Backends", "Formats"] {
        assert!(summary.contains(section));
    }
}
#[cfg(all(test, feature = "redis"))]
mod redis_integration {
    use super::*;

    // Enqueues one task on a local Redis. Ignored by default; if the broker
    // cannot be constructed the test returns without asserting anything.
    #[tokio::test]
    #[ignore = "requires Redis server"]
    async fn test_redis_broker_integration() {
        use crate::RedisBroker;

        let Ok(broker) = RedisBroker::new("redis://localhost:6379", "test_queue") else {
            return;
        };
        let task = crate::dev_utils::create_test_task("redis.test");
        assert!(broker.enqueue(task).await.is_ok());
    }
}
#[cfg(all(test, feature = "postgres"))]
mod postgres_integration {
    use super::*;

    // Enqueues one task on a local PostgreSQL. Ignored by default; if the
    // broker cannot be constructed the test returns without asserting anything.
    #[tokio::test]
    #[ignore = "requires PostgreSQL server"]
    async fn test_postgres_broker_integration() {
        use crate::PostgresBroker;

        let Ok(broker) =
            PostgresBroker::with_queue("postgres://localhost/test", "test_queue").await
        else {
            return;
        };
        let task = crate::dev_utils::create_test_task("postgres.test");
        assert!(broker.enqueue(task).await.is_ok());
    }
}
#[cfg(all(test, feature = "mysql"))]
mod mysql_integration {
    use super::*;

    // Enqueues one task on a local MySQL. Ignored by default; if the broker
    // cannot be constructed the test returns without asserting anything.
    #[tokio::test]
    #[ignore = "requires MySQL server"]
    async fn test_mysql_broker_integration() {
        use crate::MysqlBroker;

        let Ok(broker) = MysqlBroker::with_queue("mysql://localhost/test", "test_queue").await
        else {
            return;
        };
        let task = crate::dev_utils::create_test_task("mysql.test");
        assert!(broker.enqueue(task).await.is_ok());
    }
}
// Integration test for the AMQP broker; compiled only with the `amqp` feature.
#[cfg(all(test, feature = "amqp"))]
mod amqp_integration {
#[allow(unused_imports)]
use super::*;
// Ignored by default. Unlike the Redis/Postgres/MySQL tests, this one fails
// when the broker cannot be constructed.
#[tokio::test]
#[ignore = "requires RabbitMQ server"]
async fn test_amqp_broker_integration() {
use crate::AmqpBroker;
let broker_result = AmqpBroker::new("amqp://localhost:5672", "test_queue").await;
assert!(broker_result.is_ok());
}
}
// Integration test for the SQS broker; compiled only with the `sqs` feature.
#[cfg(all(test, feature = "sqs"))]
mod sqs_integration {
#[allow(unused_imports)]
use super::*;
// Ignored by default. Fails when the broker cannot be constructed.
#[tokio::test]
#[ignore = "requires AWS SQS"]
async fn test_sqs_broker_integration() {
use crate::SqsBroker;
let broker_result = SqsBroker::new("test-queue").await;
assert!(broker_result.is_ok());
}
}
#[cfg(all(test, feature = "backend-redis"))]
mod backend_redis_integration {
    #[allow(unused_imports)]
    use super::*;

    // Stores one successful result in a local Redis backend. Ignored by
    // default; if the backend cannot be constructed the test returns without
    // asserting anything.
    #[tokio::test]
    #[ignore = "requires Redis server"]
    async fn test_redis_backend_integration() {
        use crate::RedisResultBackend;
        use crate::ResultStore;
        use celers_core::TaskResultValue;
        use uuid::Uuid;

        let Ok(backend) = RedisResultBackend::new("redis://localhost:6379") else {
            return;
        };
        let task_id = Uuid::new_v4();
        let payload = TaskResultValue::Success(serde_json::json!({"result": "success"}));
        let stored = backend.store_result(task_id, payload).await;
        assert!(stored.is_ok());
    }
}
#[cfg(all(test, feature = "backend-db"))]
mod backend_db_integration {
    #[allow(unused_imports)]
    use super::*;

    /// Stores a successful task result in a local PostgreSQL result backend.
    ///
    /// Ignored by default because it needs a running PostgreSQL server. When
    /// it is run explicitly, a failed connection now fails the test instead of
    /// letting it pass without exercising `store_result`.
    #[tokio::test]
    #[ignore = "requires PostgreSQL server"]
    async fn test_postgres_backend_integration() {
        use crate::PostgresResultBackend;
        use crate::ResultStore;
        use celers_core::TaskResultValue;
        use uuid::Uuid;

        // A `match` is used (rather than `expect`) so the error type does not
        // need to implement `Debug`.
        let backend = match PostgresResultBackend::new("postgres://localhost/test").await {
            Ok(backend) => backend,
            Err(_) => panic!("could not connect to PostgreSQL at postgres://localhost/test"),
        };

        let task_id = Uuid::new_v4();
        let result = backend
            .store_result(
                task_id,
                TaskResultValue::Success(serde_json::json!({"result": "success"})),
            )
            .await;
        assert!(result.is_ok());
    }

    /// Stores a successful task result in a local MySQL result backend.
    ///
    /// Ignored by default because it needs a running MySQL server. When it is
    /// run explicitly, a failed connection now fails the test instead of
    /// letting it pass without exercising `store_result`.
    #[tokio::test]
    #[ignore = "requires MySQL server"]
    async fn test_mysql_backend_integration() {
        use crate::MysqlResultBackend;
        use crate::ResultStore;
        use celers_core::TaskResultValue;
        use uuid::Uuid;

        // A `match` is used (rather than `expect`) so the error type does not
        // need to implement `Debug`.
        let backend = match MysqlResultBackend::new("mysql://localhost/test").await {
            Ok(backend) => backend,
            Err(_) => panic!("could not connect to MySQL at mysql://localhost/test"),
        };

        let task_id = Uuid::new_v4();
        let result = backend
            .store_result(
                task_id,
                TaskResultValue::Success(serde_json::json!({"result": "success"})),
            )
            .await;
        assert!(result.is_ok());
    }
}
#[cfg(all(test, feature = "beat"))]
mod beat_integration {
    #[allow(unused_imports)]
    use super::*;

    /// Checks that a freshly created `BeatScheduler` lists no tasks.
    #[tokio::test]
    #[ignore = "requires broker server"]
    async fn test_beat_scheduler_integration() {
        use crate::BeatScheduler;

        let beat = BeatScheduler::new();
        let registered = beat.list_tasks();
        assert!(registered.is_empty());
    }
}
/// Builds a `Chain` from three `then` calls and checks that it holds tasks.
#[test]
fn test_workflow_chain() {
    use crate::canvas::Chain;

    let pipeline = Chain::new()
        .then("task1", vec![])
        .then("task2", vec![])
        .then("task3", vec![]);
    let has_no_tasks = pipeline.tasks.is_empty();
    assert!(!has_no_tasks);
}
/// Builds a `Group` from three `add` calls and checks that it holds tasks.
#[test]
fn test_workflow_group() {
    use crate::canvas::Group;

    let fan = Group::new()
        .add("task1", vec![])
        .add("task2", vec![])
        .add("task3", vec![]);
    let has_no_tasks = fan.tasks.is_empty();
    assert!(!has_no_tasks);
}
/// Builds a `Chord` from a two-task header group and a callback signature,
/// then checks that the header still holds tasks.
#[test]
fn test_workflow_chord() {
    use crate::canvas::{Chord, Group, Signature};

    let parallel_part = Group::new().add("task1", vec![]).add("task2", vec![]);
    let on_complete = Signature::new("callback".to_string());
    let workflow = Chord::new(parallel_part, on_complete);
    assert!(!workflow.header.tasks.is_empty());
}
/// Builds 1000 uniquely named tasks and requires the whole loop to finish in
/// under 100ms.
///
/// The assertion reports the measured time on failure, matching the other
/// timing tests in this file.
#[test]
fn test_task_creation_performance() {
    use crate::dev_utils::TaskBuilder;
    use std::time::Instant;

    let start = Instant::now();
    for i in 0..1000 {
        let _task = TaskBuilder::new(&format!("task.{}", i)).build();
    }
    let elapsed_ms = start.elapsed().as_millis();
    assert!(
        elapsed_ms < 100,
        "Task creation too slow: {}ms for 1000 tasks",
        elapsed_ms
    );
}
/// Checks that each `BrokerConfigError` variant's display text mentions the
/// value it was constructed with.
#[test]
fn test_broker_helper_functions() {
    use crate::broker_helper::BrokerConfigError;

    let missing_var = BrokerConfigError::MissingEnvVar("TEST".to_string());
    assert!(missing_var.to_string().contains("TEST"));

    let unsupported = BrokerConfigError::UnsupportedBrokerType {
        broker_type: "foo".to_string(),
        note: "bar".to_string(),
    };
    assert!(unsupported.to_string().contains("foo"));

    let disabled = BrokerConfigError::FeatureNotEnabled {
        feature: "redis".to_string(),
    };
    assert!(disabled.to_string().contains("redis"));
}
/// Smoke test: calls four preset constructors to confirm they exist and run.
#[test]
fn test_presets_exist() {
    use crate::presets::*;

    let _production = production_config();
    let _throughput = high_throughput_config();
    let _latency = low_latency_config();
    let _constrained = memory_constrained_config();
}
/// Builds the same task 10000 times and requires the loop to finish in under
/// 50ms.
#[test]
fn test_zero_cost_task_creation() {
    use crate::dev_utils::TaskBuilder;
    use std::time::Instant;

    let timer = Instant::now();
    for _ in 0..10000 {
        let _built = TaskBuilder::new("test.task").build();
    }
    let elapsed = timer.elapsed();

    assert!(
        elapsed.as_millis() < 50,
        "Task creation overhead too high: {}ms",
        elapsed.as_millis()
    );
}
/// Builds a three-task chain and a three-task group 1000 times and requires
/// the loop to finish in under 50ms.
#[test]
fn test_zero_cost_workflow_construction() {
    use crate::canvas::{Chain, Group};
    use std::time::Instant;

    let timer = Instant::now();
    for _ in 0..1000 {
        let _sequence = Chain::new()
            .then("task1", vec![])
            .then("task2", vec![])
            .then("task3", vec![]);
        let _fan = Group::new()
            .add("task1", vec![])
            .add("task2", vec![])
            .add("task3", vec![]);
    }
    let elapsed = timer.elapsed();

    assert!(
        elapsed.as_millis() < 50,
        "Workflow construction overhead too high: {}ms",
        elapsed.as_millis()
    );
}
/// Calls the four feature-validation helpers 100000 times each and requires
/// the loop to finish in under 10ms.
#[test]
fn test_feature_validation_overhead() {
    use crate::compile_time_validation::*;
    use std::time::Instant;

    let timer = Instant::now();
    for _ in 0..100000 {
        let _ = has_broker_feature();
        let _ = has_serialization_feature();
        let _ = count_broker_features();
        let _ = count_backend_features();
    }
    let elapsed = timer.elapsed();

    assert!(
        elapsed.as_millis() < 10,
        "Feature validation overhead too high: {}ms",
        elapsed.as_millis()
    );
}
/// Builds 1000 tasks and checks that the in-place size of one task value
/// (as reported by `size_of_val`) stays below 1024 bytes.
#[test]
fn test_memory_efficiency() {
    use crate::dev_utils::TaskBuilder;

    let built: Vec<_> = (0..1000)
        .map(|i| TaskBuilder::new(&format!("task{}", i)).build())
        .collect();
    assert_eq!(built.len(), 1000);

    let first = &built[0];
    let estimated_size_per_task = std::mem::size_of_val(first);
    assert!(
        estimated_size_per_task < 1024,
        "Task size too large: {} bytes",
        estimated_size_per_task
    );
}
/// Smoke test: confirms the feature-detection helpers can be called and
/// return `bool`.
///
/// The previous `x || !x` assertions were tautologies that could never fail;
/// the typed bindings below check the same thing (callable, boolean result)
/// without pretending to assert on the value.
#[test]
fn test_inline_optimization_candidates() {
    use crate::compile_time_validation::*;

    let _: bool = has_broker_feature();
    let _: bool = has_serialization_feature();
}
/// Regression guard: building `ITERATIONS` uniquely named tasks must take less
/// than 1.5x `BASELINE_MS`.
#[test]
fn test_performance_regression_task_creation() {
    use crate::dev_utils::TaskBuilder;
    use std::time::Instant;

    const BASELINE_MS: u128 = 100;
    const ITERATIONS: usize = 10000;

    let timer = Instant::now();
    for index in 0..ITERATIONS {
        let _built = TaskBuilder::new(&format!("task.{}", index)).build();
    }
    let elapsed = timer.elapsed();

    // Allow 50% headroom over the baseline before flagging a regression.
    assert!(
        elapsed.as_millis() < BASELINE_MS * 3 / 2,
        "Performance regression detected: {}ms (baseline: {}ms) for {} tasks",
        elapsed.as_millis(),
        BASELINE_MS,
        ITERATIONS
    );
}
/// Regression guard: building a chain and a group `ITERATIONS` times must take
/// less than 1.5x `BASELINE_MS`.
#[test]
fn test_performance_regression_workflow_construction() {
    use crate::canvas::{Chain, Group};
    use std::time::Instant;

    const BASELINE_MS: u128 = 50;
    const ITERATIONS: usize = 1000;

    let timer = Instant::now();
    for _ in 0..ITERATIONS {
        let _sequence = Chain::new()
            .then("task1", vec![])
            .then("task2", vec![])
            .then("task3", vec![]);
        let _fan = Group::new()
            .add("task1", vec![])
            .add("task2", vec![])
            .add("task3", vec![]);
    }
    let elapsed = timer.elapsed();

    // Allow 50% headroom over the baseline before flagging a regression.
    assert!(
        elapsed.as_millis() < BASELINE_MS * 3 / 2,
        "Performance regression detected: {}ms (baseline: {}ms) for {} workflows",
        elapsed.as_millis(),
        BASELINE_MS,
        ITERATIONS
    );
}
/// Regression guard: JSON-serializing `ITERATIONS` pre-built tasks must take
/// less than 1.5x `BASELINE_MS`. Task construction is excluded from the timing.
#[test]
fn test_performance_regression_serialization() {
    use crate::dev_utils::TaskBuilder;
    use std::time::Instant;

    const BASELINE_MS: u128 = 50;
    const ITERATIONS: usize = 1000;

    let prepared: Vec<_> = (0..ITERATIONS)
        .map(|index| TaskBuilder::new(&format!("task{}", index)).build())
        .collect();

    let timer = Instant::now();
    for item in &prepared {
        let _json = serde_json::to_string(item).unwrap();
    }
    let elapsed = timer.elapsed();

    // Allow 50% headroom over the baseline before flagging a regression.
    assert!(
        elapsed.as_millis() < BASELINE_MS * 3 / 2,
        "Performance regression detected: {}ms (baseline: {}ms) for {} serializations",
        elapsed.as_millis(),
        BASELINE_MS,
        ITERATIONS
    );
}
/// Regression guard: running worker-config and broker-URL validation
/// `ITERATIONS` times must take less than 1.5x `BASELINE_MS`.
#[test]
fn test_performance_regression_config_validation() {
    use crate::config_validation::*;
    use std::time::Instant;

    const BASELINE_MS: u128 = 100;
    const ITERATIONS: usize = 10000;

    let timer = Instant::now();
    for _ in 0..ITERATIONS {
        let _ = validate_worker_config(Some(4), Some(10));
        let _ = validate_broker_url("redis://localhost:6379");
    }
    let elapsed = timer.elapsed();

    // Allow 50% headroom over the baseline before flagging a regression.
    assert!(
        elapsed.as_millis() < BASELINE_MS * 3 / 2,
        "Performance regression detected: {}ms (baseline: {}ms) for {} validations",
        elapsed.as_millis(),
        BASELINE_MS,
        ITERATIONS
    );
}
/// Checks that `LazyInit::get_or_init` keeps the value from the first call:
/// a second call with a different initializer still yields 42.
#[test]
fn test_lazy_init() {
    use crate::startup_optimization::LazyInit;

    static COUNTER: LazyInit<usize> = LazyInit::new();

    let first = COUNTER.get_or_init(|| 42);
    assert_eq!(*first, 42);

    // The second initializer must not replace the stored value.
    let second = COUNTER.get_or_init(|| 100);
    assert_eq!(*second, 42);
}
/// Fills in every `StartupMetrics` field and checks that `report()` mentions
/// each value with an `ms` suffix.
#[test]
fn test_startup_metrics() {
    use crate::startup_optimization::StartupMetrics;

    let mut timings = StartupMetrics::new();
    timings.broker_init_ms = 100;
    timings.config_load_ms = 50;
    timings.backend_init_ms = 75;
    timings.total_ms = 225;

    let summary = timings.report();
    for expected in &["100ms", "50ms", "75ms", "225ms"] {
        assert!(summary.contains(*expected));
    }
}
/// Checks that `StartupMetrics::default()` zeroes every timing field.
#[test]
fn test_startup_metrics_default() {
    use crate::startup_optimization::StartupMetrics;

    let zeroed = StartupMetrics::default();
    assert_eq!(zeroed.broker_init_ms, 0);
    assert_eq!(zeroed.config_load_ms, 0);
    assert_eq!(zeroed.backend_init_ms, 0);
    assert_eq!(zeroed.total_ms, 0);
}
/// Runs three counter-incrementing init tasks through `parallel_init` and
/// checks that three results come back and the shared counter reaches 3.
#[tokio::test]
async fn test_parallel_init() {
    use crate::startup_optimization::{parallel_init, AsyncInitTask};
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    type TestResult = std::result::Result<(), String>;
    type TestFuture = std::pin::Pin<Box<dyn std::future::Future<Output = TestResult> + Send>>;

    /// Builds one init task that bumps `hits` by one and reports success.
    fn bump_task(hits: Arc<AtomicUsize>) -> AsyncInitTask<(), String> {
        Box::new(move || {
            Box::pin(async move {
                hits.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }) as TestFuture
        })
    }

    let hits = Arc::new(AtomicUsize::new(0));
    let tasks: Vec<AsyncInitTask<(), String>> = vec![
        bump_task(hits.clone()),
        bump_task(hits.clone()),
        bump_task(hits.clone()),
    ];

    let outcomes = parallel_init(tasks).await;
    assert_eq!(outcomes.len(), 3);
    assert_eq!(hits.load(Ordering::SeqCst), 3);
}
/// Compile-time check that the `BoxedResult`, `BoxedBroker` and `QueueTask`
/// aliases from `ide_support` can be named in type annotations.
#[test]
fn test_ide_support_type_aliases() {
    use crate::ide_support::*;

    let _outcome: BoxedResult<i32> = Ok(42);
    let _no_broker: Option<BoxedBroker> = None;
    let _no_task: Option<QueueTask> = None;
}
/// Pins the values of the default constants exposed by `ide_support::defaults`.
#[test]
fn test_ide_support_defaults() {
    use crate::ide_support::defaults::*;

    // Worker and task defaults.
    assert_eq!(DEFAULT_CONCURRENCY, 4);
    assert_eq!(DEFAULT_PREFETCH, 10);
    assert_eq!(DEFAULT_MAX_RETRIES, 3);
    assert_eq!(DEFAULT_RETRY_DELAY_SECS, 60);
    assert_eq!(DEFAULT_TASK_TIMEOUT_SECS, 3600);

    // Service ports.
    assert_eq!(DEFAULT_REDIS_PORT, 6379);
    assert_eq!(DEFAULT_POSTGRES_PORT, 5432);
    assert_eq!(DEFAULT_MYSQL_PORT, 3306);
    assert_eq!(DEFAULT_RABBITMQ_PORT, 5672);

    // Queue naming.
    assert_eq!(DEFAULT_QUEUE_NAME, "celery");
}
/// Checks that each example URL constant starts with the scheme expected for
/// its backend.
#[test]
fn test_ide_support_examples() {
    use crate::ide_support::examples::*;

    let redis_ok = REDIS_URL_EXAMPLE.starts_with("redis://");
    let postgres_ok = POSTGRES_URL_EXAMPLE.starts_with("postgres://");
    let mysql_ok = MYSQL_URL_EXAMPLE.starts_with("mysql://");
    let rabbitmq_ok = RABBITMQ_URL_EXAMPLE.starts_with("amqp://");
    let sqs_ok = SQS_URL_EXAMPLE.starts_with("https://sqs");

    assert!(redis_ok);
    assert!(postgres_ok);
    assert!(mysql_ok);
    assert!(rabbitmq_ok);
    assert!(sqs_ok);
}
/// Compile-time check that `String`, `i32` and `Vec<u8>` satisfy both the
/// `TaskArgs` and `TaskResult` bounds from `ide_support`.
#[test]
fn test_ide_support_trait_bounds() {
    use crate::ide_support::{TaskArgs, TaskResult};

    fn requires_args<A: TaskArgs>() {}
    fn requires_result<R: TaskResult>() {}

    requires_args::<String>();
    requires_args::<i32>();
    requires_args::<Vec<u8>>();

    requires_result::<String>();
    requires_result::<i32>();
    requires_result::<Vec<u8>>();
}
/// Awaits a `BoxedFuture<String>` built from an async block and checks that
/// it resolves to `Ok("test")`.
#[tokio::test]
async fn test_ide_support_boxed_future() {
    use crate::ide_support::{BoxedFuture, BoxedResult};

    fn ready_text() -> BoxedFuture<String> {
        Box::pin(async { Ok("test".to_string()) })
    }

    let outcome: BoxedResult<String> = ready_text().await;
    assert!(outcome.is_ok());
    assert_eq!(outcome.unwrap(), "test");
}
/// Feeds `count_instructions` a listing with six instruction lines plus
/// `;` comment lines and expects a count of 6.
#[test]
#[cfg(feature = "dev-utils")]
fn test_assembly_inspection_count_instructions() {
    use crate::assembly_inspection::count_instructions;

    let listing = r#"
; Function prologue
push rbp
mov rbp, rsp
; Actual code
add rdi, rsi
mov rax, rdi
; Function epilogue
pop rbp
ret
"#;
    let instruction_total = count_instructions(listing);
    assert_eq!(instruction_total, 6);
}
/// Checks `verify_inlined` on two listings: one without any `call`
/// instruction (expected `true`) and one containing two `call`s (expected
/// `false`).
#[test]
#[cfg(feature = "dev-utils")]
fn test_assembly_inspection_verify_inlined() {
    use crate::assembly_inspection::verify_inlined;

    let without_calls = r#"
add rdi, rsi
mov rax, rdi
ret
"#;
    assert!(verify_inlined(without_calls));

    let with_calls = r#"
push rbp
call some_function
call another_function
pop rbp
ret
"#;
    assert!(!verify_inlined(with_calls));
}
/// Checks that `chunks` records the task name, all five items and the chunk
/// size it was given.
#[test]
fn test_convenience_chunks() {
    use crate::convenience::chunks;
    use serde_json::json;

    let inputs = vec![json!(1), json!(2), json!(3), json!(4), json!(5)];
    let chunked = chunks("process_item", inputs, 2);

    assert_eq!(chunked.task.task, "process_item");
    assert_eq!(chunked.items.len(), 5);
    assert_eq!(chunked.chunk_size, 2);
}
/// Checks that `map` records the task name and one argset per input item.
#[test]
fn test_convenience_map() {
    use crate::convenience::map;
    use serde_json::json;

    let inputs = vec![json!(1), json!(2), json!(3)];
    let mapped = map("square", inputs);

    assert_eq!(mapped.task.task, "square");
    assert_eq!(mapped.argsets.len(), 3);
}
#[test]
fn test_convenience_starmap() {
use crate::convenience::starmap;
use serde_json::json;
let args = vec![vec![json!(1), json!(2)], vec![json!(3), json!(4)]];
let workflow = starmap("add", args);
assert_eq!(workflow.task.task, "add");
assert_eq!(workflow.argsets.len(), 2);
assert_eq!(workflow.argsets[0].len(), 2);
}
/// Checks that `options()` leaves `max_retries` unset.
#[test]
fn test_convenience_options() {
    use crate::convenience::options;

    let defaults = options();
    assert!(defaults.max_retries.is_none());
}
/// Checks that `with_retry(5, 60)` sets `max_retries` to 5 and `countdown`
/// to 60.
#[test]
fn test_convenience_with_retry() {
    use crate::convenience::with_retry;

    let retrying = with_retry(5, 60);
    assert_eq!(retrying.max_retries, Some(5));
    assert_eq!(retrying.countdown, Some(60));
}
/// Checks that `with_timeout(300)` sets `time_limit` to 300.
#[test]
fn test_convenience_with_timeout() {
    use crate::convenience::with_timeout;

    let limited = with_timeout(300);
    assert_eq!(limited.time_limit, Some(300));
}
/// Checks that `with_priority(9)` sets `priority` to 9.
#[test]
fn test_convenience_with_priority() {
    use crate::convenience::with_priority;

    let prioritized = with_priority(9);
    assert_eq!(prioritized.priority, Some(9));
}
/// Checks that `with_countdown(60)` sets `countdown` to 60.
#[test]
fn test_convenience_with_countdown() {
    use crate::convenience::with_countdown;

    let delayed = with_countdown(60);
    assert_eq!(delayed.countdown, Some(60));
}
/// Checks that `with_expires(7200)` sets `expires` to 7200.
#[test]
fn test_convenience_with_expires() {
    use crate::convenience::with_expires;

    let expiring = with_expires(7200);
    assert_eq!(expiring.expires, Some(7200));
}
/// Checks that `batch` yields one signature per argument list, each named
/// after the requested task.
#[test]
fn test_convenience_batch() {
    use crate::convenience::batch;
    use serde_json::json;

    let argument_sets = vec![
        vec![json!(1), json!(2)],
        vec![json!(3), json!(4)],
        vec![json!(5), json!(6)],
    ];
    let signatures = batch("add", argument_sets);

    assert_eq!(signatures.len(), 3);
    assert_eq!(signatures[0].task, "add");
    assert_eq!(signatures[1].task, "add");
    assert_eq!(signatures[2].task, "add");
}
/// Smoke test: `quick_start::redis_broker` can be called with a host and a
/// queue name without panicking.
///
/// Either outcome (`Ok` or `Err`) is acceptable here, so the result is bound
/// and discarded. The previous `is_ok() || is_err()` assertion was a
/// tautology that could never fail.
#[test]
#[cfg(feature = "redis")]
fn test_quick_start_redis_broker() {
    use crate::quick_start::redis_broker;

    let _result = redis_broker("localhost:6379", "test_queue");
}
/// Compile-time check that `quick_start::mysql_broker` exists; the function
/// is referenced but never called.
#[test]
#[cfg(feature = "mysql")]
fn test_quick_start_mysql_broker_function_exists() {
    use crate::quick_start::mysql_broker;

    // Naming the item is enough to fail the build if it disappears.
    let _ = mysql_broker;
}
/// Compile-time check that `quick_start::amqp_broker` exists; the function
/// is referenced but never called.
#[test]
#[cfg(feature = "amqp")]
fn test_quick_start_amqp_broker_function_exists() {
    use crate::quick_start::amqp_broker;

    // Naming the item is enough to fail the build if it disappears.
    let _ = amqp_broker;
}
/// Compile-time check that `quick_start::sqs_broker` exists; the function is
/// referenced but never called.
#[test]
#[cfg(feature = "sqs")]
fn test_quick_start_sqs_broker_function_exists() {
    use crate::quick_start::sqs_broker;

    // Naming the item is enough to fail the build if it disappears.
    let _ = sqs_broker;
}
/// Compile-time check that `ide_support::TaskId` accepts a `Uuid` and that
/// `ide_support::WorkerStats` can be built field by field and passed by
/// reference.
#[test]
fn test_ide_support_additional_type_aliases() {
    use crate::ide_support::{TaskId, WorkerStats};

    fn takes_stats(_snapshot: &WorkerStats) {}

    let _id: TaskId = uuid::Uuid::new_v4();

    let snapshot = WorkerStats {
        total_tasks: 0,
        active_tasks: 0,
        succeeded: 0,
        failed: 0,
        retried: 0,
        uptime: 0.0,
        loadavg: None,
        memory_usage: None,
        pool: None,
        broker: None,
        clock: None,
    };
    takes_stats(&snapshot);
}
/// Checks that `delay` keeps the task name and single argument and sets
/// `countdown` to the given 60.
#[test]
fn test_convenience_delay() {
    use crate::convenience::delay;
    use serde_json::json;

    let delayed = delay("send_email", vec![json!("user@example.com")], 60);

    assert_eq!(delayed.task, "send_email");
    assert_eq!(delayed.options.countdown, Some(60));
    assert_eq!(delayed.args.len(), 1);
}
/// Checks that `expire_in` keeps the task name and single argument and sets
/// `expires` to the given 7200.
#[test]
fn test_convenience_expire_in() {
    use crate::convenience::expire_in;
    use serde_json::json;

    let expiring = expire_in("process_data", vec![json!({"id": 123})], 7200);

    assert_eq!(expiring.task, "process_data");
    assert_eq!(expiring.options.expires, Some(7200));
    assert_eq!(expiring.args.len(), 1);
}
/// Checks that `high_priority` keeps the task name and sets priority 9.
#[test]
fn test_convenience_high_priority() {
    use crate::convenience::high_priority;
    use serde_json::json;

    let urgent = high_priority("urgent_task", vec![json!({"alert": true})]);

    assert_eq!(urgent.task, "urgent_task");
    assert_eq!(urgent.options.priority, Some(9));
}
/// Checks that `low_priority` keeps the task name and sets priority 1.
#[test]
fn test_convenience_low_priority() {
    use crate::convenience::low_priority;
    use serde_json::json;

    let background = low_priority("background_cleanup", vec![json!({})]);

    assert_eq!(background.task, "background_cleanup");
    assert_eq!(background.options.priority, Some(1));
}
/// Checks that three `add` calls on `parallel()` produce three tasks.
#[test]
fn test_convenience_parallel() {
    use crate::convenience::parallel;
    use serde_json::json;

    let fan = parallel()
        .add("task1", vec![json!(1)])
        .add("task2", vec![json!(2)])
        .add("task3", vec![json!(3)]);
    let task_total = fan.tasks.len();

    assert_eq!(task_total, 3);
}
/// Checks that `cpu_bound_config` succeeds and sets concurrency to the CPU
/// count reported by `num_cpus::get()`.
#[test]
fn test_presets_cpu_bound_config() {
    use crate::presets::cpu_bound_config;

    let outcome = cpu_bound_config();
    assert!(outcome.is_ok());

    let preset = outcome.unwrap();
    assert_eq!(preset.concurrency, num_cpus::get());
}
/// Checks that `io_bound_config` succeeds and sets concurrency to four times
/// the CPU count reported by `num_cpus::get()`.
#[test]
fn test_presets_io_bound_config() {
    use crate::presets::io_bound_config;

    let outcome = io_bound_config();
    assert!(outcome.is_ok());

    let preset = outcome.unwrap();
    assert_eq!(preset.concurrency, num_cpus::get() * 4);
}
/// Checks that `balanced_config` succeeds and sets concurrency to twice the
/// CPU count reported by `num_cpus::get()`.
#[test]
fn test_presets_balanced_config() {
    use crate::presets::balanced_config;

    let outcome = balanced_config();
    assert!(outcome.is_ok());

    let preset = outcome.unwrap();
    assert_eq!(preset.concurrency, num_cpus::get() * 2);
}
/// Checks that `development_config` succeeds and sets concurrency to 2.
#[test]
fn test_presets_development_config() {
    use crate::presets::development_config;

    let outcome = development_config();
    assert!(outcome.is_ok());

    let preset = outcome.unwrap();
    assert_eq!(preset.concurrency, 2);
}
/// Compile-time check that each listed `ide_support` alias accepts a value of
/// the kind used below (owned strings and small integer literals).
#[test]
fn test_ide_support_new_type_aliases() {
    use crate::ide_support::{
        BrokerUrl, ConcurrencyLevel, PrefetchCount, PriorityLevel, QueueName, RetryCount,
        TaskName, TimeoutSeconds,
    };

    // String-backed aliases.
    let _queue_name: QueueName = "celery".to_string();
    let _broker_url: BrokerUrl = "redis://localhost:6379".to_string();
    let _name: TaskName = "my_task".to_string();

    // Numeric aliases.
    let _retry_count: RetryCount = 3;
    let _priority_level: PriorityLevel = 9;
    let _timeout_secs: TimeoutSeconds = 300;
    let _workers: ConcurrencyLevel = 4;
    let _prefetch_count: PrefetchCount = 10;
}
/// Checks that `critical` keeps the task name and sets priority 9 with
/// 5 max retries.
#[test]
fn test_convenience_critical() {
    use crate::convenience::critical;
    use serde_json::json;

    let payment = critical("process_payment", vec![json!({"amount": 100})]);

    assert_eq!(payment.task, "process_payment");
    assert_eq!(payment.options.priority, Some(9));
    assert_eq!(payment.options.max_retries, Some(5));
}
/// Checks that `best_effort` keeps the task name and sets priority 1 with
/// 0 max retries.
#[test]
fn test_convenience_best_effort() {
    use crate::convenience::best_effort;
    use serde_json::json;

    let cache_update = best_effort("update_cache", vec![json!({"key": "value"})]);

    assert_eq!(cache_update.task, "update_cache");
    assert_eq!(cache_update.options.priority, Some(1));
    assert_eq!(cache_update.options.max_retries, Some(0));
}
/// Checks that `transient` keeps the task name and sets `expires` to the
/// given 300.
#[test]
fn test_convenience_transient() {
    use crate::convenience::transient;
    use serde_json::json;

    let notification = transient("temp_notification", vec![json!({"msg": "hi"})], 300);

    assert_eq!(notification.task, "temp_notification");
    assert_eq!(notification.options.expires, Some(300));
}
/// Checks that `retry_with_backoff(5, 60)` sets `max_retries` to 5 and
/// `countdown` to 60.
#[test]
fn test_convenience_retry_with_backoff() {
    use crate::convenience::retry_with_backoff;

    let backoff = retry_with_backoff(5, 60);
    assert_eq!(backoff.max_retries, Some(5));
    assert_eq!(backoff.countdown, Some(60));
}
/// Checks that three `then` calls on `pipeline()` produce three tasks.
#[test]
fn test_convenience_pipeline() {
    use crate::convenience::pipeline;
    use serde_json::json;

    let stages = pipeline()
        .then("fetch_data", vec![json!(1)])
        .then("process_data", vec![json!(2)])
        .then("save_results", vec![json!(3)]);
    let stage_total = stages.tasks.len();

    assert_eq!(stage_total, 3);
}
/// Checks that `fan_out` records the task name and one argset per item.
#[test]
fn test_convenience_fan_out() {
    use crate::convenience::fan_out;
    use serde_json::json;

    let inputs = vec![json!(1), json!(2), json!(3)];
    let spread = fan_out("process_item", inputs);

    assert_eq!(spread.task.task, "process_item");
    assert_eq!(spread.argsets.len(), 3);
}
/// Checks that `fan_in` keeps both header tasks and uses the callback as the
/// body.
#[test]
fn test_convenience_fan_in() {
    use crate::convenience::{fan_in, parallel, task};
    use serde_json::json;

    let header = parallel()
        .add("task1", vec![json!(1)])
        .add("task2", vec![json!(2)]);
    let aggregator = task("aggregate_results");

    let joined = fan_in(header, aggregator);

    assert_eq!(joined.header.tasks.len(), 2);
    assert_eq!(joined.body.task, "aggregate_results");
}
/// Checks that `etl_pipeline` yields three tasks in extract, transform, load
/// order.
#[test]
fn test_workflow_template_etl_pipeline() {
    use crate::workflow_templates::etl_pipeline;
    use serde_json::json;

    let extract_args = vec![json!({"source": "db"})];
    let etl = etl_pipeline("extract", extract_args, "transform", "load");

    assert_eq!(etl.tasks.len(), 3);
    assert_eq!(etl.tasks[0].task, "extract");
    assert_eq!(etl.tasks[1].task, "transform");
    assert_eq!(etl.tasks[2].task, "load");
}
/// Checks that `map_reduce_workflow` creates one header task per input and
/// uses the reducer as the body.
#[test]
fn test_workflow_template_map_reduce() {
    use crate::workflow_templates::map_reduce_workflow;
    use serde_json::json;

    let inputs = vec![json!(1), json!(2), json!(3)];
    let map_reduce = map_reduce_workflow("process", inputs, "aggregate");

    assert_eq!(map_reduce.header.tasks.len(), 3);
    assert_eq!(map_reduce.body.task, "aggregate");
}
/// Checks that `scatter_gather` keeps all three scattered tasks in the header
/// and uses the gather task as the body.
#[test]
fn test_workflow_template_scatter_gather() {
    use crate::workflow_templates::scatter_gather;
    use serde_json::json;

    let branches = vec![
        ("task1", vec![json!(1)]),
        ("task2", vec![json!(2)]),
        ("task3", vec![json!(3)]),
    ];
    let scattered = scatter_gather(branches, "gather");

    assert_eq!(scattered.header.tasks.len(), 3);
    assert_eq!(scattered.body.task, "gather");
}
/// Checks that `batch_processing` turns 25 items with batch size 10 into
/// three header tasks and uses the given aggregate task as the body.
#[test]
fn test_workflow_template_batch_processing() {
    use crate::workflow_templates::batch_processing;
    use serde_json::json;

    let inputs: Vec<_> = (1..=25).map(|n| json!(n)).collect();
    let batched = batch_processing("process_batch", inputs, 10, Some("aggregate"));

    assert_eq!(batched.header.tasks.len(), 3);
    assert_eq!(batched.body.task, "aggregate");
}
/// Checks that `sequential_pipeline` keeps all three stages and carries each
/// stage's third tuple element over as `max_retries`.
#[test]
fn test_workflow_template_sequential_pipeline() {
    use crate::workflow_templates::sequential_pipeline;
    use serde_json::json;

    let stage_specs = vec![
        ("stage1", vec![json!(1)], 3),
        ("stage2", vec![json!(2)], 5),
        ("stage3", vec![json!(3)], 2),
    ];
    let sequence = sequential_pipeline(stage_specs);

    assert_eq!(sequence.tasks.len(), 3);
    assert_eq!(sequence.tasks[0].options.max_retries, Some(3));
    assert_eq!(sequence.tasks[1].options.max_retries, Some(5));
    assert_eq!(sequence.tasks[2].options.max_retries, Some(2));
}
/// Checks that `priority_workflow` keeps all three tasks and carries each
/// task's third tuple element over as `priority`.
#[test]
fn test_workflow_template_priority_workflow() {
    use crate::workflow_templates::priority_workflow;
    use serde_json::json;

    let task_specs = vec![
        ("critical", vec![json!(1)], 9),
        ("normal", vec![json!(2)], 5),
        ("low", vec![json!(3)], 1),
    ];
    let ranked = priority_workflow(task_specs);

    assert_eq!(ranked.tasks.len(), 3);
    assert_eq!(ranked.tasks[0].options.priority, Some(9));
    assert_eq!(ranked.tasks[1].options.priority, Some(5));
    assert_eq!(ranked.tasks[2].options.priority, Some(1));
}
/// Checks that `retry_wrapper` keeps the task name and sets `max_retries` to
/// 5 and `countdown` to 10.
#[test]
fn test_task_composition_retry_wrapper() {
    use crate::task_composition::retry_wrapper;
    use serde_json::json;

    let wrapped = retry_wrapper("my_task", vec![json!(1)], 5, 10);

    assert_eq!(wrapped.task, "my_task");
    assert_eq!(wrapped.options.max_retries, Some(5));
    assert_eq!(wrapped.options.countdown, Some(10));
}
/// Checks that `timeout_wrapper` keeps the task name and sets `time_limit`
/// to 300.
#[test]
fn test_task_composition_timeout_wrapper() {
    use crate::task_composition::timeout_wrapper;
    use serde_json::json;

    let wrapped = timeout_wrapper("long_task", vec![json!(1)], 300);

    assert_eq!(wrapped.task, "long_task");
    assert_eq!(wrapped.options.time_limit, Some(300));
}
/// Checks that `circuit_breaker_group` keeps both tasks and applies the given
/// limit of 3 as each task's `max_retries`.
#[test]
fn test_task_composition_circuit_breaker() {
    use crate::task_composition::circuit_breaker_group;
    use serde_json::json;

    let services = vec![("service_a", vec![json!(1)]), ("service_b", vec![json!(2)])];
    let guarded = circuit_breaker_group(services, 3);

    assert_eq!(guarded.tasks.len(), 2);
    assert_eq!(guarded.tasks[0].options.max_retries, Some(3));
    assert_eq!(guarded.tasks[1].options.max_retries, Some(3));
}
/// Checks that `rate_limited_workflow` with a delay of 5 leaves the first
/// task without a countdown and gives the next two countdowns of 5 and 10.
#[test]
fn test_task_composition_rate_limited() {
    use crate::task_composition::rate_limited_workflow;
    use serde_json::json;

    let inputs = vec![json!(1), json!(2), json!(3)];
    let throttled = rate_limited_workflow("api_call", inputs, 5);

    assert_eq!(throttled.tasks.len(), 3);
    assert_eq!(throttled.tasks[0].options.countdown, None);
    assert_eq!(throttled.tasks[1].options.countdown, Some(5));
    assert_eq!(throttled.tasks[2].options.countdown, Some(10));
}
/// Smoke test: every workflow-template helper is reachable through
/// `crate::prelude::*` and can be called.
#[test]
fn test_prelude_workflow_templates() {
    use crate::prelude::*;
    use serde_json::json;

    let _etl = etl_pipeline("extract", vec![json!(1)], "transform", "load");
    let _reduced = map_reduce_workflow("map", vec![json!(1)], "reduce");
    let _gathered = scatter_gather(vec![("t1", vec![json!(1)])], "gather");
    let _batched = batch_processing("process", vec![json!(1)], 5, None);
    let _sequence = sequential_pipeline(vec![("s1", vec![json!(1)], 3)]);
    let _ranked = priority_workflow(vec![("t1", vec![json!(1)], 9)]);
}
/// Smoke test: every task-composition helper is reachable through
/// `crate::prelude::*` and can be called.
#[test]
fn test_prelude_task_composition() {
    use crate::prelude::*;
    use serde_json::json;

    let _retrying = retry_wrapper("task", vec![json!(1)], 5, 10);
    let _limited = timeout_wrapper("task", vec![json!(1)], 300);
    let _guarded = circuit_breaker_group(vec![("t1", vec![json!(1)])], 3);
    let _throttled = rate_limited_workflow("task", vec![json!(1)], 5);
}
/// Checks that `with_fallback` yields a two-task chain: the primary task
/// followed by the fallback task.
#[test]
fn test_error_recovery_with_fallback() {
    use crate::error_recovery::with_fallback;
    use serde_json::json;

    let primary_args = vec![json!(1)];
    let fallback_args = vec![json!(2)];
    let recovery = with_fallback("primary_task", primary_args, "fallback_task", fallback_args);

    assert_eq!(recovery.tasks.len(), 2);
    assert_eq!(recovery.tasks[0].task, "primary_task");
    assert_eq!(recovery.tasks[1].task, "fallback_task");
}
/// Checks that `ignore_errors` keeps the task name and sets `max_retries`
/// to 0.
#[test]
fn test_error_recovery_ignore_errors() {
    use crate::error_recovery::ignore_errors;
    use serde_json::json;

    let tolerant = ignore_errors("non_critical_task", vec![json!(1)]);

    assert_eq!(tolerant.task, "non_critical_task");
    assert_eq!(tolerant.options.max_retries, Some(0));
}
/// Checks that `with_exponential_backoff` keeps the task name and sets
/// `max_retries` to 5 and `countdown` to 2.
#[test]
fn test_error_recovery_exponential_backoff() {
    use crate::error_recovery::with_exponential_backoff;
    use serde_json::json;

    let flaky = with_exponential_backoff("flaky_task", vec![json!(1)], 5, 2);

    assert_eq!(flaky.task, "flaky_task");
    assert_eq!(flaky.options.max_retries, Some(5));
    assert_eq!(flaky.options.countdown, Some(2));
}
/// Checks that `with_dlq` yields a two-task chain: the main task followed by
/// the dead-letter handler.
#[test]
fn test_error_recovery_with_dlq() {
    use crate::error_recovery::with_dlq;
    use serde_json::json;

    let guarded = with_dlq("risky_task", vec![json!(1)], "dlq_handler");

    assert_eq!(guarded.tasks.len(), 2);
    assert_eq!(guarded.tasks[0].task, "risky_task");
    assert_eq!(guarded.tasks[1].task, "dlq_handler");
}
/// Checks that `validate_chain` accepts a chain with two tasks.
#[test]
fn test_workflow_validation_chain_valid() {
    use crate::workflow_validation::validate_chain;
    use crate::Chain;

    let two_steps = Chain::new().then("task1", vec![]).then("task2", vec![]);
    let verdict = validate_chain(&two_steps);
    assert!(verdict.is_ok());
}
/// Checks that `validate_chain` rejects a chain with no tasks.
#[test]
fn test_workflow_validation_chain_empty() {
    use crate::workflow_validation::validate_chain;
    use crate::Chain;

    let empty = Chain::new();
    let verdict = validate_chain(&empty);
    assert!(verdict.is_err());
}
/// Checks that `validate_group` accepts a group with two tasks.
#[test]
fn test_workflow_validation_group_valid() {
    use crate::workflow_validation::validate_group;
    use crate::Group;

    let two_members = Group::new().add("task1", vec![]).add("task2", vec![]);
    let verdict = validate_group(&two_members);
    assert!(verdict.is_ok());
}
/// Checks that `validate_group` rejects a group with no tasks.
#[test]
fn test_workflow_validation_group_empty() {
    use crate::workflow_validation::validate_group;
    use crate::Group;

    let empty = Group::new();
    let verdict = validate_group(&empty);
    assert!(verdict.is_err());
}
/// Checks that `validate_chord` accepts a chord with a one-task header and a
/// callback body.
#[test]
fn test_workflow_validation_chord_valid() {
    use crate::workflow_validation::validate_chord;
    use crate::{Chord, Group, Signature};

    let header = Group::new().add("task1", vec![]);
    let body = Signature::new("callback".to_string());
    let candidate = Chord { header, body };

    assert!(validate_chord(&candidate).is_ok());
}
/// Checks that `check_performance_concerns_group` reports something for a
/// group of 150 tasks.
#[test]
fn test_workflow_validation_performance_concerns() {
    use crate::workflow_validation::check_performance_concerns_group;
    use crate::Group;

    // Grow the group one task at a time, threading it through the builder.
    let oversized = (0..150).fold(Group::new(), |group, index| {
        group.add(&format!("task_{}", index), vec![])
    });

    let concerns = check_performance_concerns_group(&oversized);
    assert!(concerns.is_some());
}
/// Checks that `create_result_collector("aggregate", 100)` keeps the task
/// name and ends up with a `time_limit` of 300.
#[test]
fn test_result_helpers_collector() {
    use crate::result_helpers::create_result_collector;

    let aggregate = create_result_collector("aggregate", 100);

    assert_eq!(aggregate.task, "aggregate");
    assert_eq!(aggregate.options.time_limit, Some(300));
}
/// Checks that `create_result_filter` keeps the task name and stores exactly
/// one argument.
#[test]
fn test_result_helpers_filter() {
    use crate::result_helpers::create_result_filter;
    use serde_json::json;

    let criteria = json!({"status": "success"});
    let filtering = create_result_filter("filter_task", criteria);

    assert_eq!(filtering.task, "filter_task");
    assert_eq!(filtering.args.len(), 1);
}
/// Checks that `create_result_transformer` keeps the task name and stores
/// exactly one argument.
#[test]
fn test_result_helpers_transformer() {
    use crate::result_helpers::create_result_transformer;
    use serde_json::json;

    let settings = json!({"format": "csv"});
    let transforming = create_result_transformer("transform", settings);

    assert_eq!(transforming.task, "transform");
    assert_eq!(transforming.args.len(), 1);
}
/// Checks that `create_result_reducer` keeps the task name and stores exactly
/// one argument.
#[test]
fn test_result_helpers_reducer() {
    use crate::result_helpers::create_result_reducer;

    let summing = create_result_reducer("reduce_task", "sum");

    assert_eq!(summing.task, "reduce_task");
    assert_eq!(summing.args.len(), 1);
}
/// Smoke test: every error-recovery helper is reachable through
/// `crate::prelude::*` and can be called.
#[test]
fn test_prelude_error_recovery() {
    use crate::prelude::*;
    use serde_json::json;

    let _with_fallback = with_fallback("t1", vec![json!(1)], "t2", vec![json!(2)]);
    let _tolerant = ignore_errors("t", vec![json!(1)]);
    let _with_backoff = with_exponential_backoff("t", vec![json!(1)], 5, 2);
    let _with_dlq = with_dlq("t", vec![json!(1)], "dlq");
}
/// Smoke test: the workflow-validation helpers and `WorkflowValidationError`
/// are reachable through `crate::prelude::*`; their results are discarded.
#[test]
fn test_prelude_workflow_validation() {
    use crate::prelude::*;

    let one_step = Chain::new().then("task", vec![]);
    let one_member = Group::new().add("task", vec![]);

    let _ = validate_chain(&one_step);
    let _ = validate_group(&one_member);
    let _ = check_performance_concerns_chain(&one_step);
    let _ = check_performance_concerns_group(&one_member);

    let _validation_error = WorkflowValidationError::new("test error");
}
/// Smoke test: every result helper is reachable through `crate::prelude::*`
/// and can be called.
#[test]
fn test_prelude_result_helpers() {
    use crate::prelude::*;
    use serde_json::json;

    let _collecting = create_result_collector("collect", 10);
    let _filtering = create_result_filter("filter", json!({}));
    let _transforming = create_result_transformer("transform", json!({}));
    let _reducing = create_result_reducer("reduce", "sum");
}
/// Checks that `create_conditional_workflow` is reachable through
/// `advanced_patterns::*` and returns a workflow that holds tasks.
#[test]
fn test_advanced_patterns_module_available() {
    use crate::advanced_patterns::*;

    let conditional =
        create_conditional_workflow("check", vec![], "success", vec![], "failure", vec![]);
    let has_no_tasks = conditional.tasks.is_empty();
    assert!(!has_no_tasks);
}
/// Checks that `TaskMonitor` starts at zero and that recording one success
/// and one failure updates the total, success and failure counters.
#[test]
fn test_monitoring_helpers_available() {
    use crate::monitoring_helpers::*;

    let tracker = TaskMonitor::new();
    assert_eq!(tracker.total_tasks(), 0);

    tracker.record_success(100);
    assert_eq!(tracker.total_tasks(), 1);
    assert_eq!(tracker.successful_tasks(), 1);

    tracker.record_failure(200);
    assert_eq!(tracker.total_tasks(), 2);
    assert_eq!(tracker.failed_tasks(), 1);
}
/// Checks that `create_dynamic_batches` turns five items with batch size 2
/// into three header tasks.
#[test]
fn test_batch_helpers_available() {
    use crate::batch_helpers::*;
    use serde_json::json;

    let inputs = vec![json!(1), json!(2), json!(3), json!(4), json!(5)];
    let batched = create_dynamic_batches("process", inputs, 2);
    let batch_total = batched.header.tasks.len();

    assert_eq!(batch_total, 3);
}
/// Checks that `create_dynamic_workflow` yields a workflow with two tasks.
#[test]
fn test_advanced_patterns_dynamic_workflow() {
    use crate::advanced_patterns::create_dynamic_workflow;
    use serde_json::json;

    let generator_args = vec![json!({"config": "test"})];
    let dynamic = create_dynamic_workflow("generator", generator_args, "executor");

    assert_eq!(dynamic.tasks.len(), 2);
}
/// Checks that `create_saga_workflow` yields two tasks for two
/// step/compensation pairs.
#[test]
fn test_advanced_patterns_saga_workflow() {
    use crate::advanced_patterns::create_saga_workflow;
    use serde_json::json;

    let saga_steps = vec![
        ("step1", vec![json!(1)], "compensate1", vec![json!(1)]),
        ("step2", vec![json!(2)], "compensate2", vec![json!(2)]),
    ];
    let saga = create_saga_workflow(saga_steps);

    assert_eq!(saga.tasks.len(), 2);
}
/// Checks that three successes of 100, 200 and 300 give an average execution
/// time of 200.
#[test]
fn test_monitoring_average_time() {
    use crate::monitoring_helpers::TaskMonitor;

    let tracker = TaskMonitor::new();
    tracker.record_success(100);
    tracker.record_success(200);
    tracker.record_success(300);

    let average_ms = tracker.average_execution_time_ms();
    assert_eq!(average_ms, 200);
}
/// Checks that two successes and one failure give a success rate within 0.1
/// of 66.67.
#[test]
fn test_monitoring_success_rate() {
    use crate::monitoring_helpers::TaskMonitor;

    let tracker = TaskMonitor::new();
    tracker.record_success(100);
    tracker.record_success(100);
    tracker.record_failure(100);

    let deviation = (tracker.success_rate() - 66.67).abs();
    assert!(deviation < 0.1);
}
/// Checks that `create_adaptive_batches` produces at least one header task
/// for 100 items.
#[test]
fn test_batch_adaptive_batches() {
    use crate::batch_helpers::create_adaptive_batches;
    use serde_json::json;

    let inputs: Vec<_> = (1..=100).map(|n| json!(n)).collect();
    let adaptive = create_adaptive_batches("process", inputs, 5, 20);

    assert!(!adaptive.header.tasks.is_empty());
}
/// Checks that `create_prioritized_batches` with batch size 1 yields six
/// tasks, with priorities 9, 5 and 1 at indices 0, 2 and 4.
#[test]
fn test_batch_prioritized_batches() {
    use crate::batch_helpers::create_prioritized_batches;
    use serde_json::json;

    let high_items = vec![json!(1), json!(2)];
    let medium_items = vec![json!(3), json!(4)];
    let low_items = vec![json!(5), json!(6)];
    let tiers = (high_items, medium_items, low_items);

    let prioritized = create_prioritized_batches("process", tiers, 1);

    assert_eq!(prioritized.tasks.len(), 6);
    assert_eq!(prioritized.tasks[0].options.priority, Some(9));
    assert_eq!(prioritized.tasks[2].options.priority, Some(5));
    assert_eq!(prioritized.tasks[4].options.priority, Some(1));
}
/// A default checker is healthy, ready and alive, and stays healthy after a
/// heartbeat and a processed task are recorded.
#[test]
fn test_health_check_worker_health_checker() {
    use crate::health_check::{HealthStatus, WorkerHealthChecker};

    let checker = WorkerHealthChecker::default();
    let initial = checker.check_health();
    assert_eq!(initial.status, HealthStatus::Healthy);
    assert!(checker.is_ready());
    assert!(checker.is_alive());

    checker.heartbeat();
    checker.task_processed();
    let after_activity = checker.check_health();
    assert_eq!(after_activity.status, HealthStatus::Healthy);
}
/// A dependency checker exposes its name and forwards the closure's result.
#[test]
fn test_health_check_dependency_checker() {
    use crate::health_check::{DependencyChecker, HealthCheckResult, HealthStatus};

    let probe = || HealthCheckResult::healthy("Database is operational");
    let checker = DependencyChecker::new("database", probe);

    assert_eq!(checker.name(), "database");
    let outcome = checker.check();
    assert_eq!(outcome.status, HealthStatus::Healthy);
}
/// Builder-style metadata calls accumulate on the result.
#[test]
fn test_health_check_result_builder() {
    use crate::health_check::{HealthCheckResult, HealthStatus};

    let report = HealthCheckResult::healthy("All systems operational")
        .with_metadata("uptime", "3600")
        .with_metadata("requests", "1000");

    assert_eq!(report.status, HealthStatus::Healthy);
    assert_eq!(report.message, "All systems operational");
    assert_eq!(report.metadata.len(), 2);
}
/// Each preset constructor sets the limit it is named after.
#[test]
fn test_resource_management_limits() {
    use crate::resource_management::ResourceLimits;

    let unlimited = ResourceLimits::unlimited();
    assert!(unlimited.max_memory_bytes.is_none());
    assert!(unlimited.max_cpu_seconds.is_none());

    let memory_bound = ResourceLimits::memory_constrained(512);
    assert_eq!(memory_bound.max_memory_bytes, Some(512 * 1024 * 1024));

    let cpu_bound = ResourceLimits::cpu_intensive(60);
    assert_eq!(cpu_bound.max_cpu_seconds, Some(60));

    let io_bound = ResourceLimits::io_intensive(300);
    assert_eq!(io_bound.max_wall_time_seconds, Some(300));
}
/// The tracker remembers the peak memory sample and stays within a 100 MiB cap.
#[test]
fn test_resource_management_tracker() {
    use crate::resource_management::{ResourceLimits, ResourceTracker};
    use std::thread;
    use std::time::Duration;

    const MIB: usize = 1024 * 1024;

    let tracker = ResourceTracker::new(ResourceLimits::memory_constrained(100));
    tracker.start();
    thread::sleep(Duration::from_millis(10));

    tracker.record_memory_usage(50 * MIB);
    assert_eq!(tracker.peak_memory_bytes(), 50 * MIB);
    assert!(tracker.check_limits().is_ok());

    tracker.record_memory_usage(75 * MIB);
    assert_eq!(tracker.peak_memory_bytes(), 75 * MIB);
}
/// Released resources become available and acquiring drains the pool again.
#[test]
fn test_resource_management_pool() {
    use crate::resource_management::ResourcePool;

    let pool: ResourcePool<String> = ResourcePool::new(3);
    assert!(pool.is_empty());
    assert_eq!(pool.max_size(), 3);

    for name in ["resource1", "resource2"] {
        pool.release(name.to_string()).unwrap();
    }
    assert_eq!(pool.available(), 2);

    // Each acquire hands out one resource and shrinks the available count.
    for remaining in [1, 0] {
        let acquired = pool.acquire();
        assert!(acquired.is_some());
        assert_eq!(pool.available(), remaining);
    }
    assert!(pool.is_empty());
}
/// Registering one pre-hook and one post-hook bumps each counter from 0 to 1.
#[test]
fn test_task_hooks_hook_registry() {
    use crate::task_hooks::{HookRegistry, LoggingHook};

    let mut hooks = HookRegistry::new();
    assert_eq!(hooks.pre_hook_count(), 0);
    assert_eq!(hooks.post_hook_count(), 0);

    hooks.register_pre_hook(LoggingHook::new(false, false));
    hooks.register_post_hook(LoggingHook::new(false, false));

    assert_eq!(hooks.pre_hook_count(), 1);
    assert_eq!(hooks.post_hook_count(), 1);
}
/// The logging hook never fails, whether the task succeeded or errored.
#[test]
fn test_task_hooks_logging_hook() {
    use crate::task_hooks::{LoggingHook, PostExecutionHook, PreExecutionHook};
    use serde_json::json;

    let hook = LoggingHook::new(false, false);

    let mut args = vec![json!({"x": 1})];
    assert!(hook.before_execute("test_task", "task-123", &mut args).is_ok());

    let succeeded: std::result::Result<serde_json::Value, String> = Ok(json!({"result": 42}));
    assert!(hook
        .after_execute("test_task", "task-123", &succeeded, 100)
        .is_ok());

    let failed: std::result::Result<serde_json::Value, String> = Err("Task failed".to_string());
    assert!(hook
        .after_execute("test_task", "task-123", &failed, 100)
        .is_ok());
}
/// The validation hook rejects empty argument lists and the "forbidden" task,
/// and lets everything else through.
#[test]
fn test_task_hooks_validation_hook() {
    use crate::task_hooks::{PreExecutionHook, ValidationHook};
    use serde_json::json;

    let hook = ValidationHook::new(|task_name: &str, args: &Vec<serde_json::Value>| {
        if args.is_empty() {
            Err("Arguments cannot be empty".into())
        } else if task_name == "forbidden" {
            Err("Task is forbidden".into())
        } else {
            Ok(())
        }
    });

    let mut args = vec![json!({"x": 1})];
    let allowed = hook.before_execute("allowed_task", "task-123", &mut args);
    assert!(allowed.is_ok());

    let forbidden = hook.before_execute("forbidden", "task-123", &mut args);
    assert!(forbidden.is_err());

    let mut no_args = Vec::new();
    let empty = hook.before_execute("allowed_task", "task-123", &mut no_args);
    assert!(empty.is_err());
}
/// Running registered pre- and post-hooks through the registry succeeds.
#[test]
fn test_task_hooks_run_hooks() {
    use crate::task_hooks::{HookRegistry, LoggingHook};
    use serde_json::json;

    let mut hooks = HookRegistry::new();
    hooks.register_pre_hook(LoggingHook::new(false, false));
    hooks.register_post_hook(LoggingHook::new(false, false));

    let mut args = vec![json!({"x": 1})];
    let pre_outcome = hooks.run_pre_hooks("test_task", "task-123", &mut args);
    assert!(pre_outcome.is_ok());

    let outcome: std::result::Result<serde_json::Value, String> = Ok(json!({"result": 42}));
    let post_outcome = hooks.run_post_hooks("test_task", "task-123", &outcome, 100);
    assert!(post_outcome.is_ok());
}
/// An empty histogram reports zeroes; after three samples the count, sum and
/// mean reflect them and the median is positive.
#[test]
fn test_metrics_aggregation_histogram() {
    use crate::metrics_aggregation::Histogram;

    let mut histogram = Histogram::new();
    assert_eq!(histogram.count(), 0);
    assert_eq!(histogram.mean(), 0.0);

    for sample in [100.0, 200.0, 150.0] {
        histogram.record(sample);
    }

    assert_eq!(histogram.count(), 3);
    assert_eq!(histogram.sum(), 450.0);
    assert_eq!(histogram.mean(), 150.0);

    let median = histogram.percentile(50.0);
    assert!(median > 0.0);
}
/// Durations are tracked per task name; unknown tasks report a count of zero.
#[test]
fn test_metrics_aggregation_aggregator() {
    use crate::metrics_aggregation::MetricsAggregator;
    use std::time::Duration;

    let aggregator = MetricsAggregator::new();
    for (task, millis) in [("task1", 100), ("task1", 200), ("task2", 50)] {
        aggregator.record_duration(task, Duration::from_millis(millis));
    }

    assert_eq!(aggregator.task_count("task1"), 2);
    assert_eq!(aggregator.task_count("task2"), 1);
    assert_eq!(aggregator.task_count("task3"), 0);

    // Mean of 100 and 200 lies strictly between the two samples.
    let mean = aggregator.mean_duration("task1");
    assert!(mean > 100.0 && mean < 200.0);

    let median = aggregator.percentile_duration("task1", 50.0);
    assert!(median > 0.0);

    let throughput = aggregator.throughput("task1");
    assert!(throughput > 0.0);
}
/// One error against two recorded durations yields a 50 % success rate;
/// a task with no errors sits at 100 %.
#[test]
fn test_metrics_aggregation_error_tracking() {
    use crate::metrics_aggregation::MetricsAggregator;
    use std::time::Duration;

    let aggregator = MetricsAggregator::new();
    aggregator.record_duration("task1", Duration::from_millis(100));
    aggregator.record_duration("task1", Duration::from_millis(150));
    aggregator.record_error("task1");

    assert_eq!(aggregator.task_count("task1"), 2);
    assert_eq!(aggregator.error_count("task1"), 1);
    let flaky_rate = aggregator.success_rate("task1");
    assert!((flaky_rate - 50.0).abs() < 0.1);

    aggregator.record_duration("task2", Duration::from_millis(100));
    let clean_rate = aggregator.success_rate("task2");
    assert_eq!(clean_rate, 100.0);
}
/// The textual summary mentions the task name and the key metric headings.
#[test]
fn test_metrics_aggregation_summary() {
    use crate::metrics_aggregation::MetricsAggregator;
    use std::time::Duration;

    let aggregator = MetricsAggregator::new();
    aggregator.record_duration("task1", Duration::from_millis(100));
    aggregator.record_duration("task1", Duration::from_millis(200));
    aggregator.record_error("task1");

    let summary = aggregator.summary("task1");
    for needle in ["task1", "Total Executions", "Mean Duration", "Throughput"] {
        assert!(summary.contains(needle));
    }
}
/// Every task that recorded a duration shows up in `task_names`.
#[test]
fn test_metrics_aggregation_task_names() {
    use crate::metrics_aggregation::MetricsAggregator;
    use std::time::Duration;

    let aggregator = MetricsAggregator::new();
    for (task, millis) in [("task1", 100), ("task2", 200), ("task3", 300)] {
        aggregator.record_duration(task, Duration::from_millis(millis));
    }

    let names = aggregator.task_names();
    assert_eq!(names.len(), 3);
    for expected in ["task1", "task2", "task3"] {
        assert!(names.contains(&expected.to_string()));
    }
}
/// Compile-time check that the prelude re-exports the health-check, resource,
/// hook and metrics types. Nothing is asserted at runtime.
#[test]
fn test_prelude_exports_new_modules() {
    use crate::prelude::*;

    // Naming the type as a generic argument is enough to prove it resolves.
    fn exported<T>() {}

    exported::<WorkerHealthChecker>();
    exported::<DependencyChecker>();
    exported::<HealthCheckResult>();
    exported::<ResourceLimits>();
    exported::<ResourceTracker>();
    exported::<ResourcePool<String>>();
    exported::<HookRegistry>();
    exported::<LoggingHook>();
    exported::<MetricsAggregator>();
    exported::<Histogram>();
}
/// A token starts un-cancelled; after `cancel` it reports the state, fails the
/// check and returns the supplied reason.
#[test]
fn test_task_cancellation_token() {
    use crate::task_cancellation::CancellationToken;

    let token = CancellationToken::new();
    assert!(!token.is_cancelled());
    assert!(token.check_cancelled().is_ok());

    let reason = "User requested cancellation".to_string();
    token.cancel(Some(reason.clone()));

    assert!(token.is_cancelled());
    assert!(token.check_cancelled().is_err());
    assert_eq!(token.cancellation_reason(), Some(reason));
}
/// A 100 ms timeout has not fired immediately but has fired after 150 ms.
#[test]
fn test_task_cancellation_timeout_manager() {
    use crate::task_cancellation::TimeoutManager;
    use std::thread;
    use std::time::Duration;

    let deadline = TimeoutManager::new(Duration::from_millis(100));
    assert!(!deadline.is_timed_out());
    assert!(deadline.check_timeout().is_ok());

    // Sleep past the deadline before re-checking.
    thread::sleep(Duration::from_millis(150));

    assert!(deadline.is_timed_out());
    assert!(deadline.check_timeout().is_err());
}
/// The guard allows execution until its token is cancelled.
#[test]
fn test_task_cancellation_execution_guard() {
    use crate::task_cancellation::{CancellationToken, ExecutionGuard};
    use std::time::Duration;

    let token = CancellationToken::new();
    let guard = ExecutionGuard::new(token.clone(), Some(Duration::from_secs(10)));
    assert!(guard.should_continue().is_ok());

    // Cancelling through the original token must be visible to the guard.
    token.cancel(None);
    assert!(guard.should_continue().is_err());
}
/// Attempt 0 has no delay, attempt 1 lands within 750..=1250 ms and
/// attempt 2 is at least 1500 ms.
#[test]
fn test_retry_strategies_exponential_backoff() {
    use crate::retry_strategies::RetryStrategy;
    use std::time::Duration;

    let strategy = RetryStrategy::exponential_backoff(3, Duration::from_secs(1));
    assert_eq!(strategy.max_retries, 3);

    let initial = strategy.calculate_delay(0);
    assert_eq!(initial, Duration::from_secs(0));

    let first_retry_ms = strategy.calculate_delay(1).as_millis();
    assert!((750..=1250).contains(&first_retry_ms));

    let second_retry_ms = strategy.calculate_delay(2).as_millis();
    assert!(second_retry_ms >= 1500);
}
/// A fixed-delay strategy returns the same delay for every attempt checked.
#[test]
fn test_retry_strategies_fixed_delay() {
    use crate::retry_strategies::RetryStrategy;
    use std::time::Duration;

    let pause = Duration::from_millis(500);
    let strategy = RetryStrategy::fixed_delay(5, pause);
    assert_eq!(strategy.max_retries, 5);

    for attempt in [1, 3] {
        assert_eq!(strategy.calculate_delay(attempt), Duration::from_millis(500));
    }
}
/// The default policy retries while the attempt number is below the maximum.
#[test]
fn test_retry_strategies_default_policy() {
    use crate::retry_strategies::{DefaultRetryPolicy, RetryPolicy};

    let policy = DefaultRetryPolicy::new(3);

    for attempt in [0, 2] {
        assert!(policy.should_retry("any error", attempt));
    }
    assert!(!policy.should_retry("any error", 3));
}
/// Only errors matching a configured pattern are retried, and only while the
/// attempt number is below the maximum.
#[test]
fn test_retry_strategies_error_pattern_policy() {
    use crate::retry_strategies::{ErrorPatternRetryPolicy, RetryPolicy};

    let retryable = vec!["timeout".to_string(), "connection".to_string()];
    let policy = ErrorPatternRetryPolicy::new(3, retryable);

    // Matching errors within the retry budget.
    assert!(policy.should_retry("connection error", 0));
    assert!(policy.should_retry("timeout occurred", 1));
    // Non-matching error.
    assert!(!policy.should_retry("invalid input", 0));
    // Matching error, but the attempt number has reached the maximum.
    assert!(!policy.should_retry("timeout", 3));
}
/// In the linear graph task1 <- task2 <- task3, dependencies and dependents
/// are reported in both directions.
#[test]
fn test_task_dependencies_graph() {
    use crate::task_dependencies::DependencyGraph;

    let mut graph = DependencyGraph::new();
    for task in ["task1", "task2", "task3"] {
        graph.add_task(task);
    }
    graph.add_dependency("task2", "task1");
    graph.add_dependency("task3", "task2");

    assert_eq!(graph.get_dependencies("task1"), Vec::<String>::new());
    assert_eq!(graph.get_dependencies("task2"), vec!["task1"]);
    assert_eq!(graph.get_dependencies("task3"), vec!["task2"]);

    assert_eq!(graph.get_dependents("task1"), vec!["task2"]);
    assert_eq!(graph.get_dependents("task2"), vec!["task3"]);
}
/// A three-node cycle (task1 -> task3 -> task2 -> task1) must be detected.
#[test]
fn test_task_dependencies_circular_detection() {
    use crate::task_dependencies::DependencyGraph;

    let mut graph = DependencyGraph::new();
    for task in ["task1", "task2", "task3"] {
        graph.add_task(task);
    }
    for (task, depends_on) in [("task2", "task1"), ("task3", "task2"), ("task1", "task3")] {
        graph.add_dependency(task, depends_on);
    }

    assert!(graph.has_circular_dependencies());
}
/// A linear dependency chain sorts into dependency order.
#[test]
fn test_task_dependencies_topological_sort() {
    use crate::task_dependencies::DependencyGraph;

    let mut graph = DependencyGraph::new();
    for task in ["task1", "task2", "task3"] {
        graph.add_task(task);
    }
    graph.add_dependency("task2", "task1");
    graph.add_dependency("task3", "task2");

    let order = graph.topological_sort().unwrap();
    assert_eq!(order, vec!["task1", "task2", "task3"]);
}
/// Only tasks whose dependencies are all completed are reported as ready.
#[test]
fn test_task_dependencies_ready_tasks() {
    use crate::task_dependencies::DependencyGraph;
    use std::collections::HashSet;

    let mut graph = DependencyGraph::new();
    for task in ["task1", "task2", "task3"] {
        graph.add_task(task);
    }
    graph.add_dependency("task2", "task1");
    graph.add_dependency("task3", "task2");

    // Nothing finished yet: only the root task can run.
    let nothing_done: HashSet<String> = HashSet::new();
    let ready = graph.get_ready_tasks(&nothing_done);
    assert_eq!(ready, vec!["task1"]);

    // Once task1 is done, task2 becomes ready.
    let mut task1_done = HashSet::new();
    task1_done.insert("task1".to_string());
    let ready = graph.get_ready_tasks(&task1_done);
    assert_eq!(ready, vec!["task2"]);
}
/// Two spans of 10 ms and 20 ms are each recorded once with at least the
/// slept duration.
#[test]
fn test_performance_profiling_profiler() {
    use crate::performance_profiling::PerformanceProfiler;
    use std::thread;
    use std::time::Duration;

    let profiler = PerformanceProfiler::new();
    for (span, millis) in [("operation1", 10), ("operation2", 20)] {
        profiler.start_span(span);
        thread::sleep(Duration::from_millis(millis));
        profiler.end_span();
    }

    let first = profiler.get_profile("operation1").unwrap();
    assert_eq!(first.name, "operation1");
    assert_eq!(first.invocation_count, 1);
    assert!(first.total_duration.as_millis() >= 10);

    let second = profiler.get_profile("operation2").unwrap();
    assert_eq!(second.invocation_count, 1);
    assert!(second.total_duration.as_millis() >= 20);
}
/// Repeating a span three times accumulates the count and the total duration.
#[test]
fn test_performance_profiling_multiple_invocations() {
    use crate::performance_profiling::PerformanceProfiler;
    use std::thread;
    use std::time::Duration;

    let profiler = PerformanceProfiler::new();
    (0..3).for_each(|_| {
        profiler.start_span("repeated_op");
        thread::sleep(Duration::from_millis(5));
        profiler.end_span();
    });

    let accumulated = profiler.get_profile("repeated_op").unwrap();
    assert_eq!(accumulated.invocation_count, 3);
    assert!(accumulated.total_duration.as_millis() >= 15);
}
/// Asking for the two slowest operations returns them slowest-first.
#[test]
fn test_performance_profiling_slowest_operations() {
    use crate::performance_profiling::PerformanceProfiler;
    use std::thread;
    use std::time::Duration;

    let profiler = PerformanceProfiler::new();
    // Durations are far enough apart that scheduling jitter cannot reorder them.
    for (span, millis) in [("fast_op", 10), ("slow_op", 150), ("medium_op", 50)] {
        profiler.start_span(span);
        thread::sleep(Duration::from_millis(millis));
        profiler.end_span();
    }

    let ranked = profiler.get_slowest_operations(2);
    assert_eq!(ranked.len(), 2);
    assert_eq!(ranked[0].name, "slow_op");
    assert_eq!(ranked[1].name, "medium_op");
}
/// The generated report contains its title, the span name and the column labels.
#[test]
fn test_performance_profiling_report_generation() {
    use crate::performance_profiling::PerformanceProfiler;
    use std::thread;
    use std::time::Duration;

    let profiler = PerformanceProfiler::new();
    profiler.start_span("test_operation");
    thread::sleep(Duration::from_millis(10));
    profiler.end_span();

    let report = profiler.generate_report();
    for needle in ["Performance Profile Report", "test_operation", "Count", "Total"] {
        assert!(report.contains(needle));
    }
}
/// Compile-time check that the prelude re-exports the cancellation, retry,
/// dependency and profiling types. Nothing is asserted at runtime.
#[test]
fn test_prelude_exports_additional_modules() {
    use crate::prelude::*;

    // Naming the type as a generic argument is enough to prove it resolves.
    fn exported<T>() {}

    exported::<CancellationToken>();
    exported::<TimeoutManager>();
    exported::<ExecutionGuard>();
    exported::<RetryStrategy>();
    exported::<DefaultRetryPolicy>();
    exported::<ErrorPatternRetryPolicy>();
    exported::<DependencyGraph>();
    exported::<PerformanceProfile>();
    exported::<ProfileSpan<'_>>();
    // Referenced by full path rather than through the prelude glob.
    exported::<crate::performance_profiling::PerformanceProfiler>();
}
}
pub mod advanced_patterns {
    //! Builders that assemble multi-step workflows from `Chain`, `Group` and
    //! `Signature`.
    use crate::{Chain, Group, Signature};
    use serde_json::Value;

    /// Builds a chain that runs `condition_task` and then `success_task`.
    ///
    /// `failure_task` and `failure_args` are accepted but not used: no failure
    /// branch is attached to the returned chain.
    pub fn create_conditional_workflow(
        condition_task: &str,
        condition_args: Vec<Value>,
        success_task: &str,
        success_args: Vec<Value>,
        failure_task: &str,
        failure_args: Vec<Value>,
    ) -> Chain {
        // Deliberately discarded; see the doc comment above.
        let _ = (failure_task, failure_args);

        Chain::new()
            .then(condition_task, condition_args)
            .then(success_task, success_args)
    }

    /// Builds a two-step chain: `generator_task` with `generator_args`,
    /// followed by `executor_task` with an empty argument list.
    pub fn create_dynamic_workflow(
        generator_task: &str,
        generator_args: Vec<Value>,
        executor_task: &str,
    ) -> Chain {
        let generator_only = Chain::new().then(generator_task, generator_args);
        generator_only.then(executor_task, Vec::new())
    }

    /// Builds a group holding one signature per non-empty chain description.
    ///
    /// Only the first `(task, args)` pair of each chain is turned into a
    /// signature; the chain name, the remaining steps and `aggregate_task` are
    /// not used. Chains with no steps contribute nothing.
    #[allow(clippy::type_complexity)]
    pub fn create_parallel_chains(
        chains: Vec<(&str, Vec<(&str, Vec<Value>)>)>,
        aggregate_task: Option<&str>,
    ) -> Group {
        let _ = aggregate_task;

        let mut group = Group::new();
        let first_steps = chains.iter().filter_map(|(_, steps)| steps.first());
        group.tasks.extend(
            first_steps.map(|(task, args)| Signature::new(task.to_string()).with_args(args.clone())),
        );
        group
    }

    /// Builds a chain of the forward tasks of each saga step, in order.
    ///
    /// Each step is `(forward_task, forward_args, compensate_task,
    /// compensate_args)`; the compensation half is ignored.
    pub fn create_saga_workflow(steps: Vec<(&str, Vec<Value>, &str, Vec<Value>)>) -> Chain {
        steps.into_iter().fold(
            Chain::new(),
            |chain, (forward_task, forward_args, _compensate_task, _compensate_args)| {
                chain.then(forward_task, forward_args)
            },
        )
    }
}
pub mod monitoring_helpers {
    //! In-process counters for task outcomes and execution time.
    use std::sync::{Arc, Mutex, MutexGuard};
    use std::time::Instant;

    /// Shared task-outcome counter.
    ///
    /// Cloning yields another handle to the same underlying metrics, because
    /// the state lives behind an `Arc<Mutex<_>>`.
    #[derive(Clone)]
    pub struct TaskMonitor {
        metrics: Arc<Mutex<MonitorMetrics>>,
    }

    /// Raw counters guarded by the monitor's mutex.
    #[derive(Debug, Clone)]
    struct MonitorMetrics {
        total_tasks: usize,
        successful_tasks: usize,
        failed_tasks: usize,
        total_execution_time_ms: u128,
        /// Monotonic timestamp of creation or of the last `reset`.
        ///
        /// A monotonic clock is used so that uptime cannot underflow or panic
        /// when the system wall clock is adjusted.
        started_at: Instant,
    }

    impl MonitorMetrics {
        /// Zeroed counters with the uptime clock starting now.
        fn new() -> Self {
            Self {
                total_tasks: 0,
                successful_tasks: 0,
                failed_tasks: 0,
                total_execution_time_ms: 0,
                started_at: Instant::now(),
            }
        }

        /// `count` as a percentage of all recorded tasks; `0.0` when empty.
        fn percentage_of_total(&self, count: usize) -> f64 {
            if self.total_tasks == 0 {
                0.0
            } else {
                (count as f64 / self.total_tasks as f64) * 100.0
            }
        }

        /// Integer mean execution time in milliseconds; `0` when empty.
        fn average_execution_time_ms(&self) -> u128 {
            if self.total_tasks == 0 {
                0
            } else {
                self.total_execution_time_ms / self.total_tasks as u128
            }
        }
    }

    impl TaskMonitor {
        /// Creates a monitor with all counters at zero.
        pub fn new() -> Self {
            Self {
                metrics: Arc::new(Mutex::new(MonitorMetrics::new())),
            }
        }

        /// Locks the shared metrics.
        ///
        /// # Panics
        /// Panics if the mutex was poisoned by a panic in another holder.
        fn lock(&self) -> MutexGuard<'_, MonitorMetrics> {
            self.metrics.lock().expect("lock should not be poisoned")
        }

        /// Records one successful task that took `execution_time_ms`.
        pub fn record_success(&self, execution_time_ms: u128) {
            let mut metrics = self.lock();
            metrics.total_tasks += 1;
            metrics.successful_tasks += 1;
            metrics.total_execution_time_ms += execution_time_ms;
        }

        /// Records one failed task that took `execution_time_ms`.
        pub fn record_failure(&self, execution_time_ms: u128) {
            let mut metrics = self.lock();
            metrics.total_tasks += 1;
            metrics.failed_tasks += 1;
            metrics.total_execution_time_ms += execution_time_ms;
        }

        /// Number of tasks recorded (successes plus failures).
        pub fn total_tasks(&self) -> usize {
            self.lock().total_tasks
        }

        /// Number of tasks recorded as successful.
        pub fn successful_tasks(&self) -> usize {
            self.lock().successful_tasks
        }

        /// Number of tasks recorded as failed.
        pub fn failed_tasks(&self) -> usize {
            self.lock().failed_tasks
        }

        /// Mean execution time in whole milliseconds (integer division), or
        /// `0` when nothing has been recorded.
        pub fn average_execution_time_ms(&self) -> u128 {
            self.lock().average_execution_time_ms()
        }

        /// Percentage (0–100) of recorded tasks that succeeded, or `0.0` when
        /// nothing has been recorded.
        pub fn success_rate(&self) -> f64 {
            let metrics = self.lock();
            metrics.percentage_of_total(metrics.successful_tasks)
        }

        /// Clears every counter and restarts the uptime clock.
        pub fn reset(&self) {
            *self.lock() = MonitorMetrics::new();
        }

        /// Renders a multi-line, human-readable summary of the counters.
        pub fn summary(&self) -> String {
            let metrics = self.lock();
            format!(
                "Task Monitor Summary:\n\
                 - Total Tasks: {}\n\
                 - Successful: {} ({:.2}%)\n\
                 - Failed: {} ({:.2}%)\n\
                 - Avg Execution Time: {}ms\n\
                 - Uptime: {}s",
                metrics.total_tasks,
                metrics.successful_tasks,
                metrics.percentage_of_total(metrics.successful_tasks),
                metrics.failed_tasks,
                metrics.percentage_of_total(metrics.failed_tasks),
                metrics.average_execution_time_ms(),
                metrics.started_at.elapsed().as_secs()
            )
        }
    }

    impl Default for TaskMonitor {
        fn default() -> Self {
            Self::new()
        }
    }
}
pub mod batch_helpers {
    //! Helpers that split a list of items into batched task invocations.
    use crate::{Chord, Group, Signature};
    use serde_json::Value;

    /// Splits `items` into chunks of `batch_size` (at least 1), adds one
    /// `task_name` invocation per chunk with the chunk as a single JSON array
    /// argument, and wraps the group in a chord whose body is the
    /// `"batch_complete"` signature.
    fn chord_of_batches(task_name: &str, items: &[Value], batch_size: usize) -> Chord {
        // `chunks` panics on a chunk size of zero, hence the floor of 1.
        let header = items
            .chunks(batch_size.max(1))
            .fold(Group::new(), |group, chunk| {
                group.add(task_name, vec![Value::Array(chunk.to_vec())])
            });
        Chord {
            header,
            body: Signature::new("batch_complete".to_string()),
        }
    }

    /// Batches `items` into chunks of `target_batch_size`.
    ///
    /// A `target_batch_size` of `0` is treated as `1`. The last batch may be
    /// smaller than the others. Returns a chord whose header holds one task
    /// per batch and whose body is the `"batch_complete"` signature.
    pub fn create_dynamic_batches(
        task_name: &str,
        items: Vec<Value>,
        target_batch_size: usize,
    ) -> Chord {
        chord_of_batches(task_name, &items, target_batch_size)
    }

    /// Batches `items` with a size derived from how many items there are.
    ///
    /// * fewer items than `min_batch_size`: everything goes into one batch;
    /// * more than ten times `max_batch_size` items: `max_batch_size` is used;
    /// * otherwise: the integer square root of the item count, bounded to
    ///   `min_batch_size..=max_batch_size`.
    ///
    /// If `min_batch_size` exceeds `max_batch_size`, the upper bound wins; the
    /// function does not panic. The effective batch size is never below 1.
    pub fn create_adaptive_batches(
        task_name: &str,
        items: Vec<Value>,
        min_batch_size: usize,
        max_batch_size: usize,
    ) -> Chord {
        let item_count = items.len();
        let batch_size = if item_count < min_batch_size {
            item_count
        } else if item_count > max_batch_size.saturating_mul(10) {
            // `saturating_mul` keeps a huge `max_batch_size` from overflowing.
            max_batch_size
        } else {
            let calculated = (item_count as f64).sqrt() as usize;
            // `max` then `min` equals `clamp` for ordered bounds, but unlike
            // `clamp` it does not panic when `min_batch_size > max_batch_size`.
            calculated.max(min_batch_size).min(max_batch_size)
        };
        chord_of_batches(task_name, &items, batch_size)
    }

    /// Batches three priority tiers into a single group.
    ///
    /// `priority_items` is `(high, medium, low)`. Each tier is chunked by
    /// `batch_size` (at least 1) and its tasks get priority 9, 5 and 1
    /// respectively. Tasks are appended tier by tier: high, medium, low.
    pub fn create_prioritized_batches(
        task_name: &str,
        priority_items: (Vec<Value>, Vec<Value>, Vec<Value>),
        batch_size: usize,
    ) -> Group {
        let (high_priority, medium_priority, low_priority) = priority_items;
        let chunk_len = batch_size.max(1);
        let mut group = Group::new();
        for (tier_items, priority) in [(high_priority, 9), (medium_priority, 5), (low_priority, 1)]
        {
            for chunk in tier_items.chunks(chunk_len) {
                let batch = Value::Array(chunk.to_vec());
                let mut sig = Signature::new(task_name.to_string()).with_args(vec![batch]);
                sig.options.priority = Some(priority);
                group.tasks.push(sig);
            }
        }
        group
    }
}
pub mod health_check {
    //! Liveness and readiness checks for a worker and its dependencies.
    use std::sync::{Arc, Mutex};
    use std::time::{Duration, Instant};

    /// Coarse health classification produced by a check.
    #[derive(Debug, Clone, PartialEq, Eq)]
    pub enum HealthStatus {
        Healthy,
        Degraded,
        Unhealthy,
    }

    /// Outcome of a single health check.
    #[derive(Debug, Clone)]
    pub struct HealthCheckResult {
        /// Classification of the checked component.
        pub status: HealthStatus,
        /// Human-readable explanation.
        pub message: String,
        /// When this result was created.
        pub timestamp: Instant,
        /// Key/value details, in the order they were added.
        pub metadata: Vec<(String, String)>,
    }

    impl HealthCheckResult {
        /// Shared constructor: timestamps the result now, with no metadata.
        fn with_status(status: HealthStatus, message: impl Into<String>) -> Self {
            Self {
                status,
                message: message.into(),
                timestamp: Instant::now(),
                metadata: Vec::new(),
            }
        }

        /// Creates a `Healthy` result with the given message.
        pub fn healthy(message: impl Into<String>) -> Self {
            Self::with_status(HealthStatus::Healthy, message)
        }

        /// Creates a `Degraded` result with the given message.
        pub fn degraded(message: impl Into<String>) -> Self {
            Self::with_status(HealthStatus::Degraded, message)
        }

        /// Creates an `Unhealthy` result with the given message.
        pub fn unhealthy(message: impl Into<String>) -> Self {
            Self::with_status(HealthStatus::Unhealthy, message)
        }

        /// Appends a metadata entry and returns the result for chaining.
        pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
            self.metadata.push((key.into(), value.into()));
            self
        }
    }

    /// Tracks worker heartbeats and task activity to derive a health status.
    ///
    /// Clones share the heartbeat and last-task timestamps, which live behind
    /// `Arc<Mutex<_>>`.
    #[derive(Clone)]
    pub struct WorkerHealthChecker {
        /// When this checker was created; the basis for `uptime_seconds`.
        started_at: Instant,
        last_heartbeat: Arc<Mutex<Instant>>,
        last_task_processed: Arc<Mutex<Option<Instant>>>,
        heartbeat_timeout: Duration,
        task_timeout: Duration,
    }

    impl WorkerHealthChecker {
        /// Creates a checker whose first heartbeat is "now" and which has not
        /// processed any task yet.
        pub fn new(heartbeat_timeout: Duration, task_timeout: Duration) -> Self {
            let now = Instant::now();
            Self {
                started_at: now,
                last_heartbeat: Arc::new(Mutex::new(now)),
                last_task_processed: Arc::new(Mutex::new(None)),
                heartbeat_timeout,
                task_timeout,
            }
        }

        /// Records a heartbeat at the current instant.
        pub fn heartbeat(&self) {
            *self
                .last_heartbeat
                .lock()
                .expect("lock should not be poisoned") = Instant::now();
        }

        /// Records that a task finished at the current instant.
        pub fn task_processed(&self) {
            *self
                .last_task_processed
                .lock()
                .expect("lock should not be poisoned") = Some(Instant::now());
        }

        /// Time elapsed since the last recorded heartbeat.
        fn since_last_heartbeat(&self) -> Duration {
            let last_heartbeat = *self
                .last_heartbeat
                .lock()
                .expect("lock should not be poisoned");
            // The clock is read after the stored instant, and the saturating
            // form is used, so a heartbeat written concurrently by another
            // thread can never make the subtraction go negative.
            Instant::now().saturating_duration_since(last_heartbeat)
        }

        /// Evaluates the worker's health.
        ///
        /// * `Unhealthy` if the last heartbeat is older than the heartbeat
        ///   timeout (metadata: `last_heartbeat_seconds_ago`);
        /// * `Degraded` if a task has been processed before but the last one
        ///   is older than the task timeout (metadata: `last_task_seconds_ago`);
        /// * `Healthy` otherwise (metadata: `uptime_seconds`, measured from the
        ///   creation of this checker).
        pub fn check_health(&self) -> HealthCheckResult {
            let since_heartbeat = self.since_last_heartbeat();
            if since_heartbeat > self.heartbeat_timeout {
                return HealthCheckResult::unhealthy("Worker heartbeat timeout").with_metadata(
                    "last_heartbeat_seconds_ago",
                    since_heartbeat.as_secs().to_string(),
                );
            }

            let last_task = *self
                .last_task_processed
                .lock()
                .expect("lock should not be poisoned");
            if let Some(last_task_time) = last_task {
                let since_task = Instant::now().saturating_duration_since(last_task_time);
                if since_task > self.task_timeout {
                    return HealthCheckResult::degraded("No tasks processed recently")
                        .with_metadata("last_task_seconds_ago", since_task.as_secs().to_string());
                }
            }

            HealthCheckResult::healthy("Worker is operational").with_metadata(
                "uptime_seconds",
                self.started_at.elapsed().as_secs().to_string(),
            )
        }

        /// `true` when the worker is `Healthy` or `Degraded`.
        pub fn is_ready(&self) -> bool {
            matches!(
                self.check_health().status,
                HealthStatus::Healthy | HealthStatus::Degraded
            )
        }

        /// `true` while the last heartbeat is within the heartbeat timeout.
        pub fn is_alive(&self) -> bool {
            self.since_last_heartbeat() <= self.heartbeat_timeout
        }
    }

    impl Default for WorkerHealthChecker {
        /// 30-second heartbeat timeout and 300-second task timeout.
        fn default() -> Self {
            Self::new(Duration::from_secs(30), Duration::from_secs(300))
        }
    }

    /// Named health probe backed by a user-supplied closure.
    pub struct DependencyChecker {
        name: String,
        check_fn: Box<dyn Fn() -> HealthCheckResult + Send + Sync>,
    }

    impl DependencyChecker {
        /// Wraps `check_fn` as the probe for the dependency called `name`.
        pub fn new<F>(name: impl Into<String>, check_fn: F) -> Self
        where
            F: Fn() -> HealthCheckResult + Send + Sync + 'static,
        {
            Self {
                name: name.into(),
                check_fn: Box::new(check_fn),
            }
        }

        /// Runs the probe and returns its result.
        pub fn check(&self) -> HealthCheckResult {
            (self.check_fn)()
        }

        /// Name given to this dependency at construction.
        pub fn name(&self) -> &str {
            &self.name
        }
    }
}
pub mod resource_management {
    //! Resource limits, usage tracking and a simple bounded resource pool.
    use std::sync::{Arc, Mutex};
    use std::time::{Duration, Instant};

    /// Number of bytes in one mebibyte.
    const BYTES_PER_MIB: usize = 1024 * 1024;

    /// Converts mebibytes to bytes, saturating at `usize::MAX` instead of
    /// overflowing (relevant for huge values and for 32-bit targets).
    fn mib_to_bytes(mb: usize) -> usize {
        mb.saturating_mul(BYTES_PER_MIB)
    }

    /// Optional caps on the resources a task may use; `None` means no cap.
    #[derive(Debug, Clone)]
    pub struct ResourceLimits {
        pub max_memory_bytes: Option<usize>,
        pub max_cpu_seconds: Option<u64>,
        pub max_wall_time_seconds: Option<u64>,
        pub max_file_descriptors: Option<usize>,
    }

    impl ResourceLimits {
        /// No caps at all.
        pub fn unlimited() -> Self {
            Self {
                max_memory_bytes: None,
                max_cpu_seconds: None,
                max_wall_time_seconds: None,
                max_file_descriptors: None,
            }
        }

        /// Preset with a memory cap of `max_memory_mb` MiB, a 300-second
        /// wall-time cap and a 100 file-descriptor cap.
        pub fn memory_constrained(max_memory_mb: usize) -> Self {
            Self {
                max_memory_bytes: Some(mib_to_bytes(max_memory_mb)),
                max_cpu_seconds: None,
                max_wall_time_seconds: Some(300),
                max_file_descriptors: Some(100),
            }
        }

        /// Preset with a CPU cap of `max_cpu_seconds`, a wall-time cap 60
        /// seconds above that (saturating) and a 50 file-descriptor cap.
        pub fn cpu_intensive(max_cpu_seconds: u64) -> Self {
            Self {
                max_memory_bytes: None,
                max_cpu_seconds: Some(max_cpu_seconds),
                max_wall_time_seconds: Some(max_cpu_seconds.saturating_add(60)),
                max_file_descriptors: Some(50),
            }
        }

        /// Preset with a wall-time cap of `max_wall_time_seconds`, a 512 MiB
        /// memory cap and a 1000 file-descriptor cap.
        pub fn io_intensive(max_wall_time_seconds: u64) -> Self {
            Self {
                max_memory_bytes: Some(512 * BYTES_PER_MIB),
                max_cpu_seconds: None,
                max_wall_time_seconds: Some(max_wall_time_seconds),
                max_file_descriptors: Some(1000),
            }
        }

        /// Sets the memory cap to `mb` MiB (saturating at `usize::MAX` bytes).
        pub fn with_max_memory_mb(mut self, mb: usize) -> Self {
            self.max_memory_bytes = Some(mib_to_bytes(mb));
            self
        }

        /// Sets the CPU-time cap in seconds.
        pub fn with_max_cpu_seconds(mut self, seconds: u64) -> Self {
            self.max_cpu_seconds = Some(seconds);
            self
        }

        /// Sets the wall-clock cap in seconds.
        pub fn with_max_wall_time_seconds(mut self, seconds: u64) -> Self {
            self.max_wall_time_seconds = Some(seconds);
            self
        }
    }

    /// Tracks elapsed wall time and peak reported memory against limits.
    ///
    /// Clones share the start time and the peak value, which live behind
    /// `Arc<Mutex<_>>`.
    #[derive(Clone)]
    pub struct ResourceTracker {
        start_time: Arc<Mutex<Instant>>,
        peak_memory_bytes: Arc<Mutex<usize>>,
        limits: ResourceLimits,
    }

    impl ResourceTracker {
        /// Creates a tracker whose clock starts now and whose peak is zero.
        pub fn new(limits: ResourceLimits) -> Self {
            Self {
                start_time: Arc::new(Mutex::new(Instant::now())),
                peak_memory_bytes: Arc::new(Mutex::new(0)),
                limits,
            }
        }

        /// Restarts the wall-clock measurement from now.
        pub fn start(&self) {
            *self.start_time.lock().expect("lock should not be poisoned") = Instant::now();
        }

        /// Reports a memory sample; only a new maximum is retained.
        pub fn record_memory_usage(&self, bytes: usize) {
            let mut peak = self
                .peak_memory_bytes
                .lock()
                .expect("lock should not be poisoned");
            *peak = (*peak).max(bytes);
        }

        /// Checks the wall-time and memory limits, in that order.
        ///
        /// The CPU-time and file-descriptor limits are not evaluated here.
        ///
        /// # Errors
        /// Returns a descriptive message for the first limit that is exceeded.
        pub fn check_limits(&self) -> Result<(), String> {
            if let Some(max_wall_time) = self.limits.max_wall_time_seconds {
                let elapsed = self.elapsed();
                if elapsed > Duration::from_secs(max_wall_time) {
                    return Err(format!(
                        "Wall-clock time limit exceeded: {}s > {}s",
                        elapsed.as_secs(),
                        max_wall_time
                    ));
                }
            }
            if let Some(max_memory) = self.limits.max_memory_bytes {
                let peak_memory = self.peak_memory_bytes();
                if peak_memory > max_memory {
                    return Err(format!(
                        "Memory limit exceeded: {} bytes > {} bytes",
                        peak_memory, max_memory
                    ));
                }
            }
            Ok(())
        }

        /// Wall time since construction or the last `start` call.
        pub fn elapsed(&self) -> Duration {
            self.start_time
                .lock()
                .expect("lock should not be poisoned")
                .elapsed()
        }

        /// Largest memory sample reported so far, in bytes.
        pub fn peak_memory_bytes(&self) -> usize {
            *self
                .peak_memory_bytes
                .lock()
                .expect("lock should not be poisoned")
        }

        /// The limits this tracker was created with.
        pub fn limits(&self) -> &ResourceLimits {
            &self.limits
        }
    }

    /// Bounded LIFO store of idle resources.
    ///
    /// The pool starts empty; callers put resources in with `release` and take
    /// them out with `acquire`. Clones share the same storage.
    pub struct ResourcePool<T> {
        resources: Arc<Mutex<Vec<T>>>,
        max_size: usize,
    }

    impl<T> ResourcePool<T> {
        /// Creates an empty pool that holds at most `max_size` idle resources.
        pub fn new(max_size: usize) -> Self {
            Self {
                resources: Arc::new(Mutex::new(Vec::with_capacity(max_size))),
                max_size,
            }
        }

        /// Takes the most recently released resource, if any.
        pub fn acquire(&self) -> Option<T> {
            self.resources
                .lock()
                .expect("lock should not be poisoned")
                .pop()
        }

        /// Returns a resource to the pool.
        ///
        /// # Errors
        /// Fails with `"Resource pool is full"` when the pool already holds
        /// `max_size` idle resources; the resource is dropped in that case.
        pub fn release(&self, resource: T) -> Result<(), String> {
            let mut resources = self.resources.lock().expect("lock should not be poisoned");
            if resources.len() >= self.max_size {
                return Err("Resource pool is full".to_string());
            }
            resources.push(resource);
            Ok(())
        }

        /// Number of idle resources currently held.
        pub fn available(&self) -> usize {
            self.resources
                .lock()
                .expect("lock should not be poisoned")
                .len()
        }

        /// Maximum number of idle resources the pool will hold.
        pub fn max_size(&self) -> usize {
            self.max_size
        }

        /// `true` when no idle resource is held.
        pub fn is_empty(&self) -> bool {
            self.resources
                .lock()
                .expect("lock should not be poisoned")
                .is_empty()
        }
    }

    // Implemented by hand so that cloning does not require `T: Clone`.
    impl<T> Clone for ResourcePool<T> {
        fn clone(&self) -> Self {
            Self {
                resources: Arc::clone(&self.resources),
                max_size: self.max_size,
            }
        }
    }
}
pub mod task_hooks {
use serde_json::Value;
use std::sync::Arc;
pub type HookResult = Result<(), Box<dyn std::error::Error + Send + Sync>>;
pub trait PreExecutionHook: Send + Sync {
fn before_execute(
&self,
task_name: &str,
task_id: &str,
args: &mut Vec<Value>,
) -> HookResult;
}
pub trait PostExecutionHook: Send + Sync {
fn after_execute(
&self,
task_name: &str,
task_id: &str,
result: &Result<Value, String>,
duration_ms: u128,
) -> HookResult;
}
/// Ordered collection of pre- and post-execution hooks.
///
/// Hooks run in the order they were registered; the first hook that returns
/// an error aborts the remaining hooks of that phase.
#[derive(Default)]
pub struct HookRegistry {
    pre_hooks: Vec<Arc<dyn PreExecutionHook>>,
    post_hooks: Vec<Arc<dyn PostExecutionHook>>,
}

impl HookRegistry {
    /// Creates a registry with no hooks.
    pub fn new() -> Self {
        Self::default()
    }

    /// Appends `hook` to the list of pre-execution hooks.
    pub fn register_pre_hook<H>(&mut self, hook: H)
    where
        H: PreExecutionHook + 'static,
    {
        let shared: Arc<dyn PreExecutionHook> = Arc::new(hook);
        self.pre_hooks.push(shared);
    }

    /// Appends `hook` to the list of post-execution hooks.
    pub fn register_post_hook<H>(&mut self, hook: H)
    where
        H: PostExecutionHook + 'static,
    {
        let shared: Arc<dyn PostExecutionHook> = Arc::new(hook);
        self.post_hooks.push(shared);
    }

    /// Runs every pre-execution hook in registration order.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by a hook; later hooks are skipped.
    pub fn run_pre_hooks(
        &self,
        task_name: &str,
        task_id: &str,
        args: &mut Vec<Value>,
    ) -> HookResult {
        self.pre_hooks
            .iter()
            .try_for_each(|hook| hook.before_execute(task_name, task_id, &mut *args))
    }

    /// Runs every post-execution hook in registration order.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by a hook; later hooks are skipped.
    pub fn run_post_hooks(
        &self,
        task_name: &str,
        task_id: &str,
        result: &Result<Value, String>,
        duration_ms: u128,
    ) -> HookResult {
        self.post_hooks
            .iter()
            .try_for_each(|hook| hook.after_execute(task_name, task_id, result, duration_ms))
    }

    /// Number of registered pre-execution hooks.
    pub fn pre_hook_count(&self) -> usize {
        self.pre_hooks.len()
    }

    /// Number of registered post-execution hooks.
    pub fn post_hook_count(&self) -> usize {
        self.post_hooks.len()
    }
}
/// Hook that prints task start / completion / failure lines to stdout.
pub struct LoggingHook {
    /// When `true`, the start line also prints the task arguments.
    log_args: bool,
    /// When `true`, the completion line also prints the returned value.
    log_results: bool,
}

impl LoggingHook {
    /// Creates a logging hook with the given verbosity switches.
    pub fn new(log_args: bool, log_results: bool) -> Self {
        LoggingHook {
            log_args,
            log_results,
        }
    }
}

impl PreExecutionHook for LoggingHook {
    /// Prints a "Starting" line; never fails.
    fn before_execute(
        &self,
        task_name: &str,
        task_id: &str,
        args: &mut Vec<Value>,
    ) -> HookResult {
        if !self.log_args {
            println!("[TASK] Starting {} ({})", task_name, task_id);
            return Ok(());
        }
        println!("[TASK] Starting {} ({}): {:?}", task_name, task_id, args);
        Ok(())
    }
}

impl PostExecutionHook for LoggingHook {
    /// Prints a "Completed" or "Failed" line; never fails.
    fn after_execute(
        &self,
        task_name: &str,
        task_id: &str,
        result: &Result<Value, String>,
        duration_ms: u128,
    ) -> HookResult {
        match (result, self.log_results) {
            (Ok(value), true) => println!(
                "[TASK] Completed {} ({}) in {}ms: {:?}",
                task_name, task_id, duration_ms, value
            ),
            (Ok(_), false) => println!(
                "[TASK] Completed {} ({}) in {}ms",
                task_name, task_id, duration_ms
            ),
            // Failures are always printed with their message, regardless of `log_results`.
            (Err(e), _) => println!(
                "[TASK] Failed {} ({}) in {}ms: {}",
                task_name, task_id, duration_ms, e
            ),
        }
        Ok(())
    }
}
/// Pre-execution hook that delegates to a user-supplied validation closure.
///
/// The closure receives the task name and a shared view of the arguments;
/// whatever it returns becomes the hook's result.
pub struct ValidationHook<F>
where
    F: Fn(&str, &Vec<Value>) -> HookResult + Send + Sync,
{
    validator: F,
}

impl<F> ValidationHook<F>
where
    F: Fn(&str, &Vec<Value>) -> HookResult + Send + Sync,
{
    /// Wraps `validator` so it can be registered as a pre-execution hook.
    pub fn new(validator: F) -> Self {
        ValidationHook { validator }
    }
}

impl<F> PreExecutionHook for ValidationHook<F>
where
    F: Fn(&str, &Vec<Value>) -> HookResult + Send + Sync,
{
    /// Calls the validator with the task name and arguments; the task id is ignored.
    fn before_execute(
        &self,
        task_name: &str,
        _task_id: &str,
        args: &mut Vec<Value>,
    ) -> HookResult {
        let validate = &self.validator;
        // The validator only gets read access, so downgrade the mutable borrow.
        validate(task_name, &*args)
    }
}
}
pub mod metrics_aggregation {
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
/// A single timestamped sample.
#[derive(Debug, Clone)]
pub struct DataPoint {
/// Moment at which the sample was recorded.
pub timestamp: Instant,
/// Sample value; `MetricsAggregator::record_duration` stores a duration in
/// milliseconds here.
pub value: f64,
}
/// Fixed-bucket histogram of `f64` observations.
///
/// Every bucket is identified by its inclusive upper bound. An observation is
/// counted in the first bucket whose bound is `>=` the value; observations
/// larger than the greatest bound are not counted in any bucket but still
/// contribute to [`count`](Histogram::count) and [`sum`](Histogram::sum).
pub struct Histogram {
    /// `(upper_bound, count)` pairs, kept in ascending bound order.
    buckets: Vec<(f64, usize)>,
    total_count: usize,
    sum: f64,
}

impl Histogram {
    /// Creates a histogram with the default bounds
    /// `10, 50, 100, 250, 500, 1000, 2500, 5000, 10000`.
    pub fn new() -> Self {
        Self::with_buckets(vec![
            10.0, 50.0, 100.0, 250.0, 500.0, 1000.0, 2500.0, 5000.0, 10000.0,
        ])
    }

    /// Creates a histogram with caller-supplied upper bounds.
    ///
    /// The bounds are sorted ascending so that "first matching bucket" is
    /// always the tightest one, even if the caller passed them out of order.
    pub fn with_buckets(mut bucket_bounds: Vec<f64>) -> Self {
        bucket_bounds.sort_by(|a, b| a.total_cmp(b));
        let buckets = bucket_bounds.into_iter().map(|b| (b, 0)).collect();
        Self {
            buckets,
            total_count: 0,
            sum: 0.0,
        }
    }

    /// Records one observation.
    pub fn record(&mut self, value: f64) {
        self.total_count += 1;
        self.sum += value;
        for (bound, count) in &mut self.buckets {
            if value <= *bound {
                *count += 1;
                break;
            }
        }
    }

    /// Total number of recorded observations.
    pub fn count(&self) -> usize {
        self.total_count
    }

    /// Sum of all recorded observations.
    pub fn sum(&self) -> f64 {
        self.sum
    }

    /// Arithmetic mean of the observations, or `0.0` when nothing was recorded.
    pub fn mean(&self) -> f64 {
        if self.total_count == 0 {
            0.0
        } else {
            self.sum / self.total_count as f64
        }
    }

    /// Estimates the `p`-th percentile (`p` in `0..=100`) as the upper bound
    /// of the bucket containing the observation of that rank.
    ///
    /// Returns `0.0` for an empty histogram. If the requested rank falls among
    /// observations above the largest bound, the largest bound is returned.
    pub fn percentile(&self, p: f64) -> f64 {
        if self.total_count == 0 {
            return 0.0;
        }
        // Nearest-rank method: round the rank *up* and never let it drop to
        // zero, otherwise an empty leading bucket would satisfy
        // `cumulative >= 0` and be reported as the percentile.
        let fraction = p.clamp(0.0, 100.0) / 100.0;
        let rank = ((self.total_count as f64 * fraction).ceil() as usize).max(1);
        let mut cumulative: usize = 0;
        for (bound, count) in &self.buckets {
            cumulative += *count;
            if cumulative >= rank {
                return *bound;
            }
        }
        self.buckets.last().map(|(b, _)| *b).unwrap_or(0.0)
    }

    /// Clears all counts while keeping the bucket layout.
    pub fn reset(&mut self) {
        for (_, count) in &mut self.buckets {
            *count = 0;
        }
        self.total_count = 0;
        self.sum = 0.0;
    }
}

impl Default for Histogram {
    fn default() -> Self {
        Self::new()
    }
}
/// Thread-safe collector of per-task execution metrics.
///
/// All state lives behind `Arc<Mutex<..>>`, so clones share the same data.
/// Each map is keyed by task name.
pub struct MetricsAggregator {
    /// Duration histogram (milliseconds) per task.
    task_durations: Arc<Mutex<HashMap<String, Histogram>>>,
    /// Number of `record_duration` calls per task.
    task_counts: Arc<Mutex<HashMap<String, usize>>>,
    /// Number of `record_error` calls per task.
    task_errors: Arc<Mutex<HashMap<String, usize>>>,
    /// Raw duration samples per task; appended on every `record_duration`
    /// and only cleared by `reset`, so it grows with the number of samples.
    time_series: Arc<Mutex<HashMap<String, Vec<DataPoint>>>>,
    /// Creation time; the basis for `throughput`.
    start_time: Instant,
}

impl MetricsAggregator {
    /// Creates an empty aggregator whose throughput clock starts now.
    pub fn new() -> Self {
        Self {
            task_durations: Arc::new(Mutex::new(HashMap::new())),
            task_counts: Arc::new(Mutex::new(HashMap::new())),
            task_errors: Arc::new(Mutex::new(HashMap::new())),
            time_series: Arc::new(Mutex::new(HashMap::new())),
            start_time: Instant::now(),
        }
    }

    /// Records one execution of `task_name` that took `duration`.
    ///
    /// Updates the histogram, the execution counter and the raw time series.
    /// Each lock is released before the next one is taken.
    pub fn record_duration(&self, task_name: &str, duration: Duration) {
        let duration_ms = duration.as_secs_f64() * 1000.0;
        self.task_durations
            .lock()
            .expect("lock should not be poisoned")
            .entry(task_name.to_string())
            .or_default()
            .record(duration_ms);
        *self
            .task_counts
            .lock()
            .expect("lock should not be poisoned")
            .entry(task_name.to_string())
            .or_insert(0) += 1;
        self.time_series
            .lock()
            .expect("lock should not be poisoned")
            .entry(task_name.to_string())
            .or_default()
            .push(DataPoint {
                timestamp: Instant::now(),
                value: duration_ms,
            });
    }

    /// Records one failed execution of `task_name`.
    ///
    /// This does not touch the execution counter; only `record_duration` does.
    pub fn record_error(&self, task_name: &str) {
        let mut errors = self
            .task_errors
            .lock()
            .expect("lock should not be poisoned");
        *errors.entry(task_name.to_string()).or_insert(0) += 1;
    }

    /// Number of executions recorded for `task_name` (0 if unknown).
    pub fn task_count(&self, task_name: &str) -> usize {
        self.task_counts
            .lock()
            .expect("lock should not be poisoned")
            .get(task_name)
            .copied()
            .unwrap_or(0)
    }

    /// Number of errors recorded for `task_name` (0 if unknown).
    pub fn error_count(&self, task_name: &str) -> usize {
        self.task_errors
            .lock()
            .expect("lock should not be poisoned")
            .get(task_name)
            .copied()
            .unwrap_or(0)
    }

    /// Percentage of recorded executions that did not fail.
    ///
    /// Returns `100.0` when no execution was recorded. Errors and executions
    /// are counted independently, so errors may outnumber executions; the
    /// subtraction saturates at zero (0% success) instead of underflowing.
    pub fn success_rate(&self, task_name: &str) -> f64 {
        let total = self.task_count(task_name);
        if total == 0 {
            return 100.0;
        }
        let errors = self.error_count(task_name);
        let successes = total.saturating_sub(errors);
        (successes as f64 / total as f64) * 100.0
    }

    /// Mean recorded duration in milliseconds (0.0 if unknown).
    pub fn mean_duration(&self, task_name: &str) -> f64 {
        self.task_durations
            .lock()
            .expect("lock should not be poisoned")
            .get(task_name)
            .map(|h| h.mean())
            .unwrap_or(0.0)
    }

    /// Histogram-based percentile of the recorded durations in milliseconds
    /// (0.0 if unknown).
    pub fn percentile_duration(&self, task_name: &str, percentile: f64) -> f64 {
        self.task_durations
            .lock()
            .expect("lock should not be poisoned")
            .get(task_name)
            .map(|h| h.percentile(percentile))
            .unwrap_or(0.0)
    }

    /// Executions per second since the aggregator was created.
    pub fn throughput(&self, task_name: &str) -> f64 {
        let elapsed = self.start_time.elapsed().as_secs_f64();
        if elapsed == 0.0 {
            return 0.0;
        }
        self.task_count(task_name) as f64 / elapsed
    }

    /// Names of all tasks with at least one recorded execution
    /// (in unspecified order).
    pub fn task_names(&self) -> Vec<String> {
        self.task_counts
            .lock()
            .expect("lock should not be poisoned")
            .keys()
            .cloned()
            .collect()
    }

    /// Renders a multi-line, human-readable summary for `task_name`.
    pub fn summary(&self, task_name: &str) -> String {
        let count = self.task_count(task_name);
        let errors = self.error_count(task_name);
        let success_rate = self.success_rate(task_name);
        let mean = self.mean_duration(task_name);
        let p50 = self.percentile_duration(task_name, 50.0);
        let p95 = self.percentile_duration(task_name, 95.0);
        let p99 = self.percentile_duration(task_name, 99.0);
        let throughput = self.throughput(task_name);
        format!(
            "Task Metrics: {}\n\
             - Total Executions: {}\n\
             - Errors: {} ({:.2}% success rate)\n\
             - Mean Duration: {:.2}ms\n\
             - P50 Duration: {:.2}ms\n\
             - P95 Duration: {:.2}ms\n\
             - P99 Duration: {:.2}ms\n\
             - Throughput: {:.2} tasks/sec",
            task_name, count, errors, success_rate, mean, p50, p95, p99, throughput
        )
    }

    /// Clears every recorded metric. The throughput clock is not restarted.
    pub fn reset(&self) {
        self.task_durations
            .lock()
            .expect("lock should not be poisoned")
            .clear();
        self.task_counts
            .lock()
            .expect("lock should not be poisoned")
            .clear();
        self.task_errors
            .lock()
            .expect("lock should not be poisoned")
            .clear();
        self.time_series
            .lock()
            .expect("lock should not be poisoned")
            .clear();
    }
}

impl Default for MetricsAggregator {
    fn default() -> Self {
        Self::new()
    }
}

/// Clones share the same underlying metric maps and start time.
impl Clone for MetricsAggregator {
    fn clone(&self) -> Self {
        Self {
            task_durations: Arc::clone(&self.task_durations),
            task_counts: Arc::clone(&self.task_counts),
            task_errors: Arc::clone(&self.task_errors),
            time_series: Arc::clone(&self.time_series),
            start_time: self.start_time,
        }
    }
}
}
pub mod task_cancellation {
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
/// Shareable cancellation flag with an optional reason.
///
/// Clones refer to the same flag, so cancelling through any clone is visible
/// through all of them.
#[derive(Clone)]
pub struct CancellationToken {
    cancelled: Arc<Mutex<bool>>,
    cancellation_reason: Arc<Mutex<Option<String>>>,
}

impl CancellationToken {
    /// Creates a token that is not cancelled.
    pub fn new() -> Self {
        Self {
            cancelled: Arc::new(Mutex::new(false)),
            cancellation_reason: Arc::new(Mutex::new(None)),
        }
    }

    /// Marks the token as cancelled and stores `reason`.
    ///
    /// A later call overwrites the previously stored reason.
    pub fn cancel(&self, reason: Option<String>) {
        // Publish the reason *before* raising the flag: an observer that
        // sees `is_cancelled() == true` must also be able to read the reason
        // that belongs to this cancellation.
        *self
            .cancellation_reason
            .lock()
            .expect("lock should not be poisoned") = reason;
        *self.cancelled.lock().expect("lock should not be poisoned") = true;
    }

    /// Returns `true` once `cancel` has been called on this token or a clone.
    pub fn is_cancelled(&self) -> bool {
        *self.cancelled.lock().expect("lock should not be poisoned")
    }

    /// Returns a copy of the stored cancellation reason, if any.
    pub fn cancellation_reason(&self) -> Option<String> {
        self.cancellation_reason
            .lock()
            .expect("lock should not be poisoned")
            .clone()
    }

    /// Convenience check for use with `?`.
    ///
    /// # Errors
    ///
    /// Returns the stored reason (or `"Task was cancelled"` when none was
    /// given) if the token has been cancelled.
    pub fn check_cancelled(&self) -> Result<(), String> {
        if self.is_cancelled() {
            Err(self
                .cancellation_reason()
                .unwrap_or_else(|| "Task was cancelled".to_string()))
        } else {
            Ok(())
        }
    }
}

impl Default for CancellationToken {
    fn default() -> Self {
        Self::new()
    }
}
/// Tracks a deadline relative to the moment the manager was created.
pub struct TimeoutManager {
    timeout: Duration,
    start_time: Instant,
}

impl TimeoutManager {
    /// Starts the clock now with the given time budget.
    pub fn new(timeout: Duration) -> Self {
        Self {
            timeout,
            start_time: Instant::now(),
        }
    }

    /// Returns `true` once strictly more than `timeout` has elapsed.
    pub fn is_timed_out(&self) -> bool {
        self.start_time.elapsed() > self.timeout
    }

    /// Time left before the deadline, or zero if it has passed.
    pub fn remaining(&self) -> Duration {
        self.timeout.saturating_sub(self.start_time.elapsed())
    }

    /// Time elapsed since the manager was created.
    pub fn elapsed(&self) -> Duration {
        self.start_time.elapsed()
    }

    /// Convenience check for use with `?`.
    ///
    /// # Errors
    ///
    /// Returns `"Task timeout exceeded: <timeout>s"` once the deadline has
    /// passed. The timeout is printed in fractional seconds so that
    /// sub-second budgets are not shown as `0s`; whole-second budgets still
    /// print without a decimal point (e.g. `5s`).
    pub fn check_timeout(&self) -> Result<(), String> {
        if self.is_timed_out() {
            Err(format!(
                "Task timeout exceeded: {}s",
                self.timeout.as_secs_f64()
            ))
        } else {
            Ok(())
        }
    }
}
/// Bundles a cancellation token with an optional timeout so a task can poll
/// both conditions through one call.
pub struct ExecutionGuard {
    cancellation_token: CancellationToken,
    timeout_manager: Option<TimeoutManager>,
}

impl ExecutionGuard {
    /// Builds a guard; when `timeout` is `Some`, its clock starts immediately.
    pub fn new(cancellation_token: CancellationToken, timeout: Option<Duration>) -> Self {
        let timeout_manager = match timeout {
            Some(limit) => Some(TimeoutManager::new(limit)),
            None => None,
        };
        ExecutionGuard {
            cancellation_token,
            timeout_manager,
        }
    }

    /// Checks cancellation first, then the timeout (if one was configured).
    ///
    /// # Errors
    ///
    /// Returns the cancellation reason or the timeout message of whichever
    /// condition triggered first in that order.
    pub fn should_continue(&self) -> Result<(), String> {
        self.cancellation_token.check_cancelled()?;
        match &self.timeout_manager {
            Some(manager) => manager.check_timeout(),
            None => Ok(()),
        }
    }

    /// Borrows the guard's cancellation token.
    pub fn cancellation_token(&self) -> &CancellationToken {
        &self.cancellation_token
    }

    /// Remaining time budget, or `None` when no timeout was configured.
    pub fn remaining_timeout(&self) -> Option<Duration> {
        let manager = self.timeout_manager.as_ref()?;
        Some(manager.remaining())
    }
}
}
pub mod retry_strategies {
use std::time::Duration;
/// Parameters describing how long to wait between retry attempts.
#[derive(Debug, Clone)]
pub struct RetryStrategy {
    /// Maximum number of retries. Not consulted by `calculate_delay`.
    pub max_retries: u32,
    /// Delay used for the first retry (attempt 1).
    pub initial_delay: Duration,
    /// Upper bound for any delay returned by `calculate_delay`.
    pub max_delay: Duration,
    /// Factor applied once per additional attempt.
    pub backoff_multiplier: f64,
    /// When `true`, each delay is randomly scaled by a factor in `0.75..1.25`.
    pub use_jitter: bool,
}

impl RetryStrategy {
    /// Doubling backoff with jitter, capped at 300 seconds.
    pub fn exponential_backoff(max_retries: u32, initial_delay: Duration) -> Self {
        Self {
            max_retries,
            initial_delay,
            max_delay: Duration::from_secs(300),
            backoff_multiplier: 2.0,
            use_jitter: true,
        }
    }

    /// Constant-delay strategy without jitter.
    ///
    /// NOTE(review): despite the name, this is configured exactly like
    /// [`fixed_delay`](Self::fixed_delay) (multiplier `1.0`), so the delay
    /// does not grow with the attempt number.
    pub fn linear_backoff(max_retries: u32, delay: Duration) -> Self {
        Self {
            max_retries,
            initial_delay: delay,
            max_delay: delay,
            backoff_multiplier: 1.0,
            use_jitter: false,
        }
    }

    /// Same delay for every attempt, without jitter.
    pub fn fixed_delay(max_retries: u32, delay: Duration) -> Self {
        Self {
            max_retries,
            initial_delay: delay,
            max_delay: delay,
            backoff_multiplier: 1.0,
            use_jitter: false,
        }
    }

    /// Backoff growing by the golden ratio (1.618, the limit of the ratio of
    /// consecutive Fibonacci numbers) with jitter, capped at 600 seconds.
    pub fn fibonacci_backoff(max_retries: u32, base_delay: Duration) -> Self {
        Self {
            max_retries,
            initial_delay: base_delay,
            max_delay: Duration::from_secs(600),
            backoff_multiplier: 1.618,
            use_jitter: true,
        }
    }

    /// Returns the delay to wait before retry number `attempt` (1-based).
    ///
    /// Attempt `0` yields a zero delay. Otherwise the delay is
    /// `initial_delay * backoff_multiplier^(attempt - 1)`, truncated to whole
    /// milliseconds and capped at `max_delay`. With jitter enabled the capped
    /// value is scaled by a random factor in `0.75..1.25` and capped again,
    /// so the result never exceeds `max_delay`.
    pub fn calculate_delay(&self, attempt: u32) -> Duration {
        if attempt == 0 {
            return Duration::from_secs(0);
        }
        let base_delay = if self.backoff_multiplier == 1.0 {
            self.initial_delay.min(self.max_delay)
        } else {
            // Clamp instead of casting: `attempt as i32` would wrap to a
            // negative exponent for very large attempt numbers.
            let exponent = (attempt - 1).min(i32::MAX as u32) as i32;
            let multiplier = self.backoff_multiplier.powi(exponent);
            let delay_ms = self.initial_delay.as_millis() as f64 * multiplier;
            Duration::from_millis(delay_ms.min(self.max_delay.as_millis() as f64) as u64)
        };
        if !self.use_jitter {
            return base_delay;
        }
        let jitter_factor = 0.75 + (rand::random::<f64>() * 0.5);
        let jittered_ms = (base_delay.as_millis() as f64 * jitter_factor) as u64;
        // Jitter may scale the value up by as much as 25%; keep the cap hard.
        Duration::from_millis(jittered_ms).min(self.max_delay)
    }

    /// Builder-style setter for `max_delay`.
    pub fn with_max_delay(mut self, max_delay: Duration) -> Self {
        self.max_delay = max_delay;
        self
    }

    /// Builder-style setter for `use_jitter`.
    pub fn with_jitter(mut self, use_jitter: bool) -> Self {
        self.use_jitter = use_jitter;
        self
    }

    /// Builder-style setter for `backoff_multiplier`.
    pub fn with_multiplier(mut self, multiplier: f64) -> Self {
        self.backoff_multiplier = multiplier;
        self
    }
}

/// Defaults to exponential backoff: 3 retries starting at 1 second.
impl Default for RetryStrategy {
    fn default() -> Self {
        Self::exponential_backoff(3, Duration::from_secs(1))
    }
}
/// Decides whether a failed task should be retried.
pub trait RetryPolicy: Send + Sync {
    /// Returns `true` if a task that failed with `error` on attempt number
    /// `attempt` should be retried.
    fn should_retry(&self, error: &str, attempt: u32) -> bool;
}

/// Policy that retries any error until the attempt budget is used up.
pub struct DefaultRetryPolicy {
    max_retries: u32,
}

impl DefaultRetryPolicy {
    /// Creates a policy allowing attempts `0..max_retries`.
    pub fn new(max_retries: u32) -> Self {
        DefaultRetryPolicy { max_retries }
    }
}

impl RetryPolicy for DefaultRetryPolicy {
    fn should_retry(&self, _error: &str, attempt: u32) -> bool {
        self.max_retries > attempt
    }
}

/// Policy that retries only errors containing one of the configured
/// substrings, and only while the attempt budget lasts.
pub struct ErrorPatternRetryPolicy {
    max_retries: u32,
    retryable_patterns: Vec<String>,
}

impl ErrorPatternRetryPolicy {
    /// Creates a policy from an attempt budget and a list of substrings that
    /// mark an error message as retryable.
    pub fn new(max_retries: u32, retryable_patterns: Vec<String>) -> Self {
        ErrorPatternRetryPolicy {
            max_retries,
            retryable_patterns,
        }
    }
}

impl RetryPolicy for ErrorPatternRetryPolicy {
    fn should_retry(&self, error: &str, attempt: u32) -> bool {
        if attempt >= self.max_retries {
            return false;
        }
        for needle in &self.retryable_patterns {
            if error.contains(needle.as_str()) {
                return true;
            }
        }
        false
    }
}
}
pub mod task_dependencies {
use std::collections::{HashMap, HashSet};
/// Directed graph of task dependencies keyed by task id.
///
/// Every task that is mentioned — via `add_task` or on either side of
/// `add_dependency` — is registered as a node in both maps.
pub struct DependencyGraph {
    /// task id -> ids of the tasks it depends on.
    dependencies: HashMap<String, HashSet<String>>,
    /// task id -> ids of the tasks that depend on it.
    dependents: HashMap<String, HashSet<String>>,
}

impl DependencyGraph {
    /// Creates an empty graph.
    pub fn new() -> Self {
        Self {
            dependencies: HashMap::new(),
            dependents: HashMap::new(),
        }
    }

    /// Registers `task_id` as a node without adding any edges.
    pub fn add_task(&mut self, task_id: impl Into<String>) {
        let task_id = task_id.into();
        self.dependencies.entry(task_id.clone()).or_default();
        self.dependents.entry(task_id).or_default();
    }

    /// Declares that `task_id` depends on `dependency_id`.
    ///
    /// Both ids are registered as nodes. Without that, a dependency that was
    /// never passed to `add_task` would be invisible to `get_ready_tasks`,
    /// and everything depending on it could never become ready.
    pub fn add_dependency(
        &mut self,
        task_id: impl Into<String>,
        dependency_id: impl Into<String>,
    ) {
        let task_id = task_id.into();
        let dependency_id = dependency_id.into();
        self.dependencies.entry(dependency_id.clone()).or_default();
        self.dependents.entry(task_id.clone()).or_default();
        self.dependencies
            .entry(task_id.clone())
            .or_default()
            .insert(dependency_id.clone());
        self.dependents
            .entry(dependency_id)
            .or_default()
            .insert(task_id);
    }

    /// Ids that `task_id` depends on (empty for unknown tasks; unspecified order).
    pub fn get_dependencies(&self, task_id: &str) -> Vec<String> {
        self.dependencies
            .get(task_id)
            .map(|deps| deps.iter().cloned().collect())
            .unwrap_or_default()
    }

    /// Ids that depend on `task_id` (empty for unknown tasks; unspecified order).
    pub fn get_dependents(&self, task_id: &str) -> Vec<String> {
        self.dependents
            .get(task_id)
            .map(|deps| deps.iter().cloned().collect())
            .unwrap_or_default()
    }

    /// Returns `true` if the graph contains at least one dependency cycle.
    pub fn has_circular_dependencies(&self) -> bool {
        let mut visited: HashSet<&str> = HashSet::new();
        let mut rec_stack: HashSet<&str> = HashSet::new();
        for task_id in self.dependencies.keys() {
            if self.has_cycle(task_id, &mut visited, &mut rec_stack) {
                return true;
            }
        }
        false
    }

    /// Depth-first search helper: `rec_stack` holds the ids on the current
    /// DFS path, `visited` every id ever entered. Ids are borrowed from the
    /// graph, so no strings are cloned.
    fn has_cycle<'a>(
        &'a self,
        task_id: &'a str,
        visited: &mut HashSet<&'a str>,
        rec_stack: &mut HashSet<&'a str>,
    ) -> bool {
        if rec_stack.contains(task_id) {
            return true;
        }
        if !visited.insert(task_id) {
            // Already fully explored from another starting point.
            return false;
        }
        rec_stack.insert(task_id);
        if let Some(deps) = self.dependencies.get(task_id) {
            for dep in deps {
                if self.has_cycle(dep, visited, rec_stack) {
                    return true;
                }
            }
        }
        rec_stack.remove(task_id);
        false
    }

    /// Returns all task ids ordered so that every task appears after all of
    /// its dependencies.
    ///
    /// # Errors
    ///
    /// Returns an error message if the graph contains a cycle.
    pub fn topological_sort(&self) -> Result<Vec<String>, String> {
        if self.has_circular_dependencies() {
            return Err("Circular dependencies detected".to_string());
        }
        let mut result = Vec::with_capacity(self.dependencies.len());
        let mut visited: HashSet<&str> = HashSet::new();
        let mut temp_mark: HashSet<&str> = HashSet::new();
        for task_id in self.dependencies.keys() {
            if !visited.contains(task_id.as_str()) {
                self.visit(task_id, &mut visited, &mut temp_mark, &mut result)?;
            }
        }
        Ok(result)
    }

    /// Post-order DFS helper for `topological_sort`: a task is appended to
    /// `result` only after all of its dependencies have been appended.
    fn visit<'a>(
        &'a self,
        task_id: &'a str,
        visited: &mut HashSet<&'a str>,
        temp_mark: &mut HashSet<&'a str>,
        result: &mut Vec<String>,
    ) -> Result<(), String> {
        if temp_mark.contains(task_id) {
            return Err("Circular dependency detected".to_string());
        }
        if visited.contains(task_id) {
            return Ok(());
        }
        temp_mark.insert(task_id);
        if let Some(deps) = self.dependencies.get(task_id) {
            for dep in deps {
                self.visit(dep, visited, temp_mark, result)?;
            }
        }
        temp_mark.remove(task_id);
        visited.insert(task_id);
        result.push(task_id.to_string());
        Ok(())
    }

    /// Returns the tasks that are not yet completed and whose dependencies
    /// are all contained in `completed_tasks` (unspecified order).
    pub fn get_ready_tasks(&self, completed_tasks: &HashSet<String>) -> Vec<String> {
        self.dependencies
            .iter()
            .filter(|(task_id, deps)| {
                !completed_tasks.contains(*task_id)
                    && deps.iter().all(|dep| completed_tasks.contains(dep))
            })
            .map(|(task_id, _)| task_id.clone())
            .collect()
    }
}

impl Default for DependencyGraph {
    fn default() -> Self {
        Self::new()
    }
}
}
pub mod performance_profiling {
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
/// Aggregated timing data for one named operation.
#[derive(Debug, Clone)]
pub struct PerformanceProfile {
    /// Operation name.
    pub name: String,
    /// Sum of the durations of all invocations.
    pub total_duration: Duration,
    /// Portion of `total_duration` attributed to nested operations.
    pub children_duration: Duration,
    /// Portion of `total_duration` not attributed to nested operations.
    pub self_duration: Duration,
    /// Number of invocations that contributed to the totals.
    pub invocation_count: usize,
}

impl PerformanceProfile {
    /// Average duration per invocation, or zero when there were none.
    pub fn avg_duration(&self) -> Duration {
        if self.invocation_count == 0 {
            return Duration::from_secs(0);
        }
        if self.invocation_count <= u32::MAX as usize {
            self.total_duration / self.invocation_count as u32
        } else {
            // A plain `as u32` cast would truncate here and could even wrap
            // to zero, turning the division into a panic.
            self.total_duration.div_f64(self.invocation_count as f64)
        }
    }

    /// Share of `total_duration` spent in the operation itself, in percent.
    ///
    /// Computed at nanosecond resolution so that operations shorter than a
    /// millisecond are not reported as 0%. Returns `0.0` when the total is zero.
    pub fn self_time_percentage(&self) -> f64 {
        let total_ns = self.total_duration.as_nanos();
        if total_ns == 0 {
            0.0
        } else {
            (self.self_duration.as_nanos() as f64 / total_ns as f64) * 100.0
        }
    }
}
/// A span that has been started but not yet ended.
struct ActiveSpan {
    name: String,
    started: Instant,
    /// Total duration of the spans that ended while nested directly inside this one.
    children: Duration,
}

/// Collects timing profiles from nested start/end span pairs.
///
/// Spans are kept on a single stack: `end_span` always closes the most
/// recently started span, and its duration is attributed to the span directly
/// below it as child time. Clones share the same stack and profiles, so
/// spans started from different threads are paired in stack order, not per
/// thread.
pub struct PerformanceProfiler {
    profiles: Arc<Mutex<HashMap<String, PerformanceProfile>>>,
    active_spans: Arc<Mutex<Vec<ActiveSpan>>>,
}

impl PerformanceProfiler {
    /// Creates a profiler with no recorded data.
    pub fn new() -> Self {
        Self {
            profiles: Arc::new(Mutex::new(HashMap::new())),
            active_spans: Arc::new(Mutex::new(Vec::new())),
        }
    }

    /// Starts timing a span called `name` and pushes it onto the span stack.
    pub fn start_span(&self, name: impl Into<String>) {
        let span = ActiveSpan {
            name: name.into(),
            started: Instant::now(),
            children: Duration::from_secs(0),
        };
        self.active_spans
            .lock()
            .expect("lock should not be poisoned")
            .push(span);
    }

    /// Ends the most recently started span and folds it into its profile.
    ///
    /// Does nothing when no span is active.
    pub fn end_span(&self) {
        let (name, duration, children) = {
            let mut spans = self
                .active_spans
                .lock()
                .expect("lock should not be poisoned");
            let span = match spans.pop() {
                Some(span) => span,
                None => return,
            };
            let duration = span.started.elapsed();
            // Attribute this span's time to its enclosing span so the parent
            // can separate self time from child time when it ends.
            if let Some(parent) = spans.last_mut() {
                parent.children += duration;
            }
            (span.name, duration, span.children)
        };
        let mut profiles = self.profiles.lock().expect("lock should not be poisoned");
        let profile = profiles
            .entry(name.clone())
            .or_insert_with(|| PerformanceProfile {
                name,
                total_duration: Duration::from_secs(0),
                children_duration: Duration::from_secs(0),
                self_duration: Duration::from_secs(0),
                invocation_count: 0,
            });
        profile.total_duration += duration;
        profile.children_duration += children;
        profile.invocation_count += 1;
        // Saturate: interleaved use from several threads can attribute more
        // child time than the span itself lasted.
        profile.self_duration = profile
            .total_duration
            .saturating_sub(profile.children_duration);
    }

    /// Returns a copy of the profile recorded under `name`, if any.
    pub fn get_profile(&self, name: &str) -> Option<PerformanceProfile> {
        self.profiles
            .lock()
            .expect("lock should not be poisoned")
            .get(name)
            .cloned()
    }

    /// Returns copies of all recorded profiles (unspecified order).
    pub fn get_all_profiles(&self) -> Vec<PerformanceProfile> {
        self.profiles
            .lock()
            .expect("lock should not be poisoned")
            .values()
            .cloned()
            .collect()
    }

    /// Returns up to `limit` profiles, largest total duration first.
    pub fn get_slowest_operations(&self, limit: usize) -> Vec<PerformanceProfile> {
        let mut profiles = self.get_all_profiles();
        profiles.sort_by(|a, b| b.total_duration.cmp(&a.total_duration));
        profiles.truncate(limit);
        profiles
    }

    /// Renders a plain-text table with one line per profile.
    ///
    /// Lines are ordered by total duration (largest first, ties by name) so
    /// the report does not depend on hash-map iteration order.
    pub fn generate_report(&self) -> String {
        let mut profiles = self.get_all_profiles();
        if profiles.is_empty() {
            return "No profiling data available".to_string();
        }
        profiles.sort_by(|a, b| {
            b.total_duration
                .cmp(&a.total_duration)
                .then_with(|| a.name.cmp(&b.name))
        });
        let mut report = String::from("Performance Profile Report\n");
        report.push_str(&format!("{}\n", "=".repeat(80)));
        for profile in profiles {
            report.push_str(&format!(
                "{:<30} | Count: {:>6} | Total: {:>8.2}ms | Avg: {:>8.2}ms | Self: {:>6.1}%\n",
                profile.name,
                profile.invocation_count,
                profile.total_duration.as_secs_f64() * 1000.0,
                profile.avg_duration().as_secs_f64() * 1000.0,
                profile.self_time_percentage()
            ));
        }
        report
    }

    /// Discards all profiles and all active spans.
    pub fn reset(&self) {
        self.profiles
            .lock()
            .expect("lock should not be poisoned")
            .clear();
        self.active_spans
            .lock()
            .expect("lock should not be poisoned")
            .clear();
    }
}

impl Default for PerformanceProfiler {
    fn default() -> Self {
        Self::new()
    }
}

/// Clones share the same profiles and the same span stack.
impl Clone for PerformanceProfiler {
    fn clone(&self) -> Self {
        Self {
            profiles: Arc::clone(&self.profiles),
            active_spans: Arc::clone(&self.active_spans),
        }
    }
}
/// Scope guard for a profiler span: the span starts in `new` and ends when
/// the guard is dropped.
pub struct ProfileSpan<'a> {
    profiler: &'a PerformanceProfiler,
}

impl<'a> ProfileSpan<'a> {
    /// Starts a span called `name` on `profiler` and returns its guard.
    pub fn new(profiler: &'a PerformanceProfiler, name: impl Into<String>) -> Self {
        // Start the span before building the guard, so a panic while
        // starting never triggers the guard's `Drop`.
        profiler.start_span(name);
        ProfileSpan { profiler }
    }
}

impl Drop for ProfileSpan<'_> {
    /// Ends the most recently started span of the associated profiler.
    fn drop(&mut self) {
        self.profiler.end_span();
    }
}
}