Crate kiomq

A task queue & orchestration library for Rust
Built for Tokio · Scale up (many workers per machine) · Scale out (Redis + workers across machines)



KioMQ provides the core building blocks to run background work inside your Tokio services:

  • A Queue to enqueue tasks/jobs.
  • One or more Workers to process jobs concurrently.
  • Pluggable Stores: InMemoryStore (ephemeral), RedisStore (durable, distributed), RocksDbStore (under construction).
  • Scheduling – delays, cron expressions, repeat policies.
  • Reliability – retries, backoff strategies, stalled-job detection.
  • Observability – events, progress updates, per-worker metrics.

Inspired by BullMQ’s ergonomics, implemented as an embeddable Rust library.


Contents: Key features · Tokio runtime · Installation · Quick-start · Panics & errors · Configuration · Events & observability · Progress updates · Backends · Benchmarks · Testing · License


§Key features

  • Async & sync processors – async for I/O-bound work, sync spawn_blocking for CPU-bound.
  • Configurable concurrency – defaults to CPU count.
  • Event-driven idle workers – near-zero CPU when empty, using lock-free atomics and Notify.
  • Bulk enqueue – Queue::bulk_add / Queue::bulk_add_only.
  • Priority & delayed jobs – by score or after N ms / cron schedule.
  • Repeat policies – cron, backoff-driven, fixed interval, immediate.
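The fixed-interval policy above is conceptually the simplest of these: each run is scheduled at the previous run time plus the interval. A minimal, illustrative sketch using only std::time — the helper name and signature are hypothetical, not KioMQ's API:

```rust
use std::time::Duration;

// Hypothetical helper: next fire time for a fixed-interval repeat,
// expressed as milliseconds since the first run.
fn next_run_ms(last_run_ms: u64, interval: Duration) -> u64 {
    last_run_ms + interval.as_millis() as u64
}

fn main() {
    let interval = Duration::from_secs(60);
    let mut run = 0u64;
    for _ in 0..3 {
        run = next_run_ms(run, interval);
    }
    // Three one-minute intervals after the first run.
    assert_eq!(run, 180_000);
    println!("{run}");
}
```

Cron and backoff-driven repeats follow the same "compute the next fire time" shape, just with a more involved schedule function.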

§Tokio runtime requirements

A multi-thread runtime is recommended:

tokio = { version = "1", features = ["rt-multi-thread", "macros"] }

For tests:

#[tokio::test(flavor = "multi_thread")]
async fn my_test() { /* ... */ }

§Installation

[dependencies]
kiomq = "0.1.2"

Cargo features: redis-store (default), rocksdb-store, tracing.
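For example, to opt out of the default Redis backend while keeping tracing support (assuming standard Cargo feature semantics):

```toml
[dependencies]
kiomq = { version = "0.1.2", default-features = false, features = ["tracing"] }
```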


§Quick-start

§Async worker
use std::sync::Arc;
use kiomq::{InMemoryStore, Job, KioError, Queue, Worker, WorkerOpts};

#[tokio::main]
async fn main() -> kiomq::KioResult<()> {
    let store: InMemoryStore<u64, u64, ()> = InMemoryStore::new(None, "demo");
    let queue = Queue::new(store, None).await?;

    let processor = |_store: Arc<_>, job: Job<u64, u64, ()>| async move {
        Ok::<u64, KioError>(job.data.unwrap_or_default() * 2)
    };

    let worker = Worker::new_async(&queue, processor, Some(WorkerOpts::default()))?;
    worker.run()?;

    queue.bulk_add_only((0..10u64).map(|i| (format!("job-{i}"), None, i))).await?;

    let updating_metrics = queue.current_metrics.clone();
    // wait for all jobs to complete
    while !updating_metrics.all_jobs_completed() {
        tokio::task::yield_now().await;
    }
    worker.close();
    Ok(())
}
§Sync worker

Sync processors run on a blocking thread via tokio::task::spawn_blocking — suitable for heavy computation, hashing, blocking FFI, etc.

use std::sync::Arc;
use kiomq::{InMemoryStore, Job, KioError, Queue, Worker, WorkerOpts};

#[tokio::main]
async fn main() -> kiomq::KioResult<()> {
    let store: InMemoryStore<u64, u64, ()> = InMemoryStore::new(None, "demo-sync");
    let queue = Queue::new(store, None).await?;

    let processor = |_store: Arc<_>, job: Job<u64, u64, ()>| {
        Ok::<u64, KioError>(job.data.unwrap_or_default() * 2)
    };

    let worker = Worker::new_sync(&queue, processor, Some(WorkerOpts::default()))?;
    worker.run()?;

    queue.add_job("compute", 42u64, None).await?;

    let updating_metrics = queue.current_metrics.clone();
    // wait for all jobs to complete
    while !updating_metrics.all_jobs_completed() {
        tokio::task::yield_now().await;
    }
    worker.close();
    Ok(())
}

§Panics & errors in the processor

A processor signals a job failure by returning Err. The worker catches the error, marks the job as failed, and — depending on the attempts configuration — retries it with the configured backoff.

Panics inside a processor are also caught by the worker and treated as failures, so a rogue job cannot bring down the whole process.
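The panic-to-failure idea can be sketched with std's catch_unwind. This is illustrative only — KioMQ's worker does this for async tasks internally; the helper below is a hypothetical sync analogue:

```rust
use std::panic::{self, AssertUnwindSafe};

// Sketch: turn a processor panic into an ordinary job failure instead of
// letting it crash the process. Not KioMQ's actual worker internals.
fn run_job<T>(processor: impl FnOnce() -> Result<T, String>) -> Result<T, String> {
    panic::catch_unwind(AssertUnwindSafe(processor))
        .unwrap_or_else(|_| Err("processor panicked".to_owned()))
}

fn main() {
    // A panicking job is reported as a failure...
    let failed = run_job::<u64>(|| panic!("boom"));
    assert!(failed.is_err());

    // ...while a well-behaved job completes normally.
    let ok = run_job(|| Ok::<u64, String>(42));
    assert_eq!(ok, Ok(42));
    println!("process still alive");
}
```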

§Async backtrace with #[framed]

Annotate your processor with #[framed] (re-exported from async_backtrace as kiomq::framed) for richer async stack traces:

use std::sync::Arc;
use kiomq::{framed, InMemoryStore, Job, KioError, Queue, Store, Worker, WorkerOpts};

#[framed]
async fn my_processor<S: Store<u64, u64, ()>>(
    _store: Arc<S>,
    job: Job<u64, u64, ()>,
) -> Result<u64, KioError> {
    let data = job.data.unwrap_or_default();
    if data == 0 {
        // Returning Err marks the job as failed and triggers a retry
        // (up to `attempts` times, as set in QueueOpts / JobOptions).
        return Err(std::io::Error::new(std::io::ErrorKind::Other, "zero input").into());
    }
    Ok(data * 2)
}

#[tokio::main]
async fn main() -> kiomq::KioResult<()> {
    let store: InMemoryStore<u64, u64, ()> = InMemoryStore::new(None, "framed-demo");
    let queue = Queue::new(store, None).await?;

    let worker = Worker::new_async(&queue, |s, j| my_processor(s, j), Some(WorkerOpts::default()))?;
    worker.run()?;

    queue.add_job("job-1", 42u64, None).await?;

    let updating_metrics = queue.current_metrics.clone();
    // wait for all jobs to complete
    while !updating_metrics.all_jobs_completed() {
        tokio::task::yield_now().await;
    }
    worker.close();
    Ok(())
}

§Configuration

§Queue options (QueueOpts)
use kiomq::{BackOffJobOptions, BackOffOptions, KeepJobs, QueueEventMode, QueueOpts,
            RemoveOnCompletionOrFailure};

let queue_opts = QueueOpts {
    attempts: 2,
    default_backoff: Some(BackOffJobOptions::Opts(BackOffOptions {
        type_: Some("exponential".to_owned()),
        delay: Some(200),
    })),
    remove_on_fail: Some(RemoveOnCompletionOrFailure::Opts(KeepJobs {
        age: Some(3600), // keep for 1 hour
        count: None,
    })),
    event_mode: Some(QueueEventMode::PubSub),
    ..Default::default()
};
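With the options above ("exponential" backoff, 200 ms base delay), retry delays grow with each attempt. A common exponential formula is delay(n) = base * 2^(n - 1) for the n-th retry; KioMQ's exact formula is not specified here, so this sketch only illustrates the shape of such a schedule:

```rust
// Illustrative exponential backoff: delay(n) = base * 2^(n - 1) for the
// n-th retry. KioMQ's exact formula may differ.
fn exponential_delay_ms(base_ms: u64, attempt: u32) -> u64 {
    base_ms.saturating_mul(1u64 << (attempt.saturating_sub(1).min(63)))
}

fn main() {
    let delays: Vec<u64> = (1..=4).map(|n| exponential_delay_ms(200, n)).collect();
    assert_eq!(delays, vec![200, 400, 800, 1600]);
    println!("{delays:?}");
}
```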
§Per-job options (JobOptions)
use kiomq::JobOptions;

let opts = JobOptions { attempts: 5, ..Default::default() };
§Worker options (WorkerOpts)
use kiomq::WorkerOpts;

let opts = WorkerOpts { concurrency: 8, ..Default::default() };

§Events & observability

Subscribe to job-state events on the queue:

use kiomq::{EventParameters, InMemoryStore, JobState, Queue};

// Subscribe to a specific state.
let _listener_id = queue.on(JobState::Completed, |evt| async move { let _ = evt; });

// Subscribe to all events.
let _listener_id2 = queue.on_all_events(|evt: EventParameters<u64, ()>| async move { let _ = evt; });

// Remove a listener when no longer needed.
queue.remove_event_listener(_listener_id);

§Progress updates

Report progress from inside your processor using Job.update_progress:

use std::sync::Arc;
use kiomq::{Job, KioError, Store};

async fn processor<S: Store<u64, u64, u8>>(
    store: Arc<S>,
    mut job: Job<u64, u64, u8>,
) -> Result<u64, KioError> {
    // update_progress persists to the store and emits a progress event.
    job.update_progress(50u8, store.as_ref())?; // 50% done
    Ok(job.data.unwrap_or_default() * 2)
}

§Backends

§In-memory

InMemoryStore – ideal for tests, dev, and short-lived tasks. No external dependencies.

§Redis (default feature)

Durable, distributed workloads. Requires a running Redis instance:

docker run --rm -p 6379:6379 redis:latest
use kiomq::{Config, KioResult, Queue, RedisStore};

#[tokio::main]
async fn main() -> KioResult<()> {
    // `Config` can be imported from `kiomq` or from `deadpool_redis`
    // (if you already use it in your app).
    let config = Config::default();

    let store = RedisStore::new(None, "my-queue", &config).await?;
    let queue: Queue<(), (), (), _> = Queue::new(store, None).await?;
    // ... worker logic below here
    Ok(())
}
§RocksDB (under construction)

Embedded persistence – work in progress.

[dependencies]
kiomq = { version = "0.1", default-features = false, features = ["rocksdb-store"] }
use kiomq::{temporary_rocks_db, KioResult, Queue, RocksDbStore};
use std::sync::Arc;
#[tokio::main]
async fn main() -> KioResult<()> {
    // Replace `temporary_rocks_db()` with a real database instance (see the rocksdb crate).
    let db = Arc::new(temporary_rocks_db());

    let store = RocksDbStore::new(None, "test", db.clone())?;
    let queue = Queue::new(store, None).await?;
    // ... worker logic below here
    Ok(())
}

§Benchmarks

cargo bench

§Testing

cargo test

§License

MIT — see LICENSE

Modules§

macros
Re-exports of test helpers for verifying custom Store implementations.

Macros§

frame
Include the annotated async expression in backtraces and taskdumps.
queue_store_suite
Macro that generates the queue integration test suite for any Store implementation.
worker_store_suite
Macro that generates the worker integration test suite for any Store implementation.

Structs§

BackOff
Registry of backoff strategies used to schedule job retries.
BackOffOptions
Detailed backoff configuration.
BacktraceCatcher
Installs a panic hook and drives a future to completion, catching both panics and errors.
CaughtPanicInfo
Information captured about a panic.
Config (redis-store)
Configuration object.
FailedDetails
Details recorded when a job permanently fails.
InMemoryStore
An in-memory Store implementation.
Job
A unit of work managed by a crate::Queue.
JobMetrics
Timing and attempt statistics for a completed job.
JobOptions
Per-job configuration options.
JobToken
An opaque lock token that identifies a worker’s ownership of a job.
KeepJobs
Fine-grained retention policy for completed/failed jobs.
Queue
A task queue that holds and manages jobs.
QueueMetrics
A live snapshot of queue state counts.
QueueOpts
Queue-level configuration.
RedisStore (redis-store)
A Store implementation backed by Redis.
TimedMap
A concurrent map that can automatically evict entries after a configurable TTL.
Timer
A repeating async timer that fires a callback at a fixed interval.
Trace
A single stack-trace entry captured when a job fails.
Worker
A job processor that consumes jobs from a Queue.
WorkerMetrics
Aggregated metrics for a single worker instance.
WorkerOpts
Configuration options for a Worker.

Enums§

BackOffJobOptions
Specifies the backoff policy for job retries.
CaughtError
Represents an error caught during async job processing, including panics, errors, and join failures.
CollectionSuffix
Identifies a named collection (list, set, sorted-set, hash, or key) in the backing store.
EventParameters
The payload delivered to event listeners registered on a Queue.
JobError
Errors arising from individual job operations.
JobField
A typed field update applied to a job record in the store.
JobState
The lifecycle state of a job within the queue.
KioError
The top-level error type returned by most KioMQ operations.
MoveToActiveResult
The outcome of a single Queue::move_to_active call.
ProcessedResult
The outcome of a single processor invocation.
QueueError
Errors arising from queue-level operations.
QueueEventMode
Controls how events are published and consumed within a queue.
RemoveOnCompletionOrFailure
Controls whether—and how many—completed or failed job records are kept.
Repeat
Repeat / scheduling policy for a job.
RetryOptions
Specifies how a job should be retried after a failure or completion.
WorkerError
Errors specific to Worker lifecycle operations.

Traits§

Store
Backend storage interface implemented by all KioMQ store backends.

Functions§

fetch_redis_pass (redis-store)
Reads the Redis password from the REDIS_PASSWORD environment variable.
get_queue_metrics (redis-store)
Reads all queue-state counters from Redis in a single atomic pipeline call.

Type Aliases§

Counter
A shared atomic counter used to track job IDs and other queue counters.
Dt
Alias for DateTime<Utc>.
KioResult
Convenience alias for Result<T, KioError>.
StoredFn
A per-attempt delay function: receives the attempt count and returns the delay in milliseconds.

Attribute Macros§

framed
Attribute macro that wraps an async fn so it appears in async_backtrace stack traces.