kojin
Async distributed task queue for Rust — the equivalent of Celery / Dramatiq (Python), BullMQ (Node.js), Sidekiq (Ruby), and Machinery (Go).
Features
- Async-first — built on Tokio, designed for async/await from the ground up
- #[kojin::task] — proc-macro to define tasks from plain async functions
- Pluggable broker — trait-based broker abstraction (Redis included, bring your own)
- Workflows — chain, group, chord orchestration with chain![], group![] macros
- Result backends — Memory, Redis, PostgreSQL for storing task results and coordinating workflows
- Cron scheduling — periodic task execution with standard cron expressions
- Middleware — composable pre/post-execution hooks (tracing, metrics, rate limiting, OpenTelemetry)
- AMQP broker — RabbitMQ support with automatic topology, dead-letter queues, and delayed scheduling
- SQS broker — Amazon SQS support with standard and FIFO queues, long polling, and delayed scheduling
- Deduplication — content-based or key-based dedup middleware with configurable TTL
- Priority queues — per-message priority (0–9) via AMQP broker
- Dashboard — built-in JSON API for monitoring queues, metrics, and task results
- Graceful shutdown — cooperative cancellation via CancellationToken
- Weighted queues — prioritize work across multiple queues
- Configurable retries — per-task retry limits with backoff strategies
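As a rough illustration of what "per-task retry limits with backoff strategies" can mean, here is a minimal sketch of capped exponential backoff using only the standard library. `RetryPolicy`, its fields, and `delay_for` are hypothetical names chosen for this example; they are not taken from kojin's API.

```rust
use std::time::Duration;

/// Hypothetical retry policy; names are illustrative, not kojin's API.
#[derive(Debug, Clone, Copy)]
pub struct RetryPolicy {
    pub max_retries: u32,
    pub base_delay: Duration,
    pub max_delay: Duration,
}

impl RetryPolicy {
    /// Delay before retry number `attempt` (0-based), or `None` once the
    /// retry limit is exhausted. The delay doubles per attempt and is capped
    /// at `max_delay`.
    pub fn delay_for(&self, attempt: u32) -> Option<Duration> {
        if attempt >= self.max_retries {
            return None;
        }
        // 2^attempt, saturating instead of overflowing for large attempts.
        let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
        let delay = self
            .base_delay
            .checked_mul(factor)
            .unwrap_or(self.max_delay);
        Some(delay.min(self.max_delay))
    }
}
```

With a 1 second base delay, a 3 second cap, and three retries, the delays would be 1s, 2s, and 3s, after which `delay_for` returns `None` and the task would be treated as failed.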
Quick Start
Add to your Cargo.toml:
[dependencies]
kojin = "0.4"
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
async-trait = "0.1"
Define a task, enqueue it, and run a worker:
use async_trait;
use ;
use ;
async
Workflows
Kojin supports Celery-style workflow primitives — chain!, group!, and chord — for composing tasks into DAGs. A result backend is required.
use ;
// Signatures describe a task invocation (name, queue, payload)
let fetch = new;
let parse = new;
let store = new;
// Chain — sequential: fetch → parse → store
let pipeline = chain!;
// Group — parallel: fetch three URLs concurrently
let batch = group!;
// Chord — parallel + callback: fetch all, then aggregate
let aggregate = new;
let workflow = chord;
// Submit to the broker
let handle = pipeline.apply.await?;
See kojin/examples/workflows.rs for a complete runnable example.
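To make the semantics of the three primitives concrete, the following sketch models them with plain functions and threads. It is a conceptual model only: `Step`, `run_chain`, `run_group`, and `run_chord` are hypothetical names, payloads are reduced to integers, and nothing here goes through a broker or result backend as kojin's workflows do.

```rust
use std::thread;

/// A step maps one integer payload to another. Real tasks carry serialized
/// payloads; this alias exists only for illustration.
type Step = fn(i64) -> i64;

/// Sample steps used to exercise the primitives.
pub fn add_one(x: i64) -> i64 {
    x + 1
}
pub fn double(x: i64) -> i64 {
    x * 2
}
pub fn sum(xs: &[i64]) -> i64 {
    xs.iter().sum()
}

/// Chain: run steps sequentially, feeding each output into the next step.
pub fn run_chain(input: i64, steps: &[Step]) -> i64 {
    steps.iter().fold(input, |acc, step| step(acc))
}

/// Group: run one step over independent inputs on parallel threads and
/// return the results in input order.
pub fn run_group(inputs: &[i64], step: Step) -> Vec<i64> {
    let handles: Vec<_> = inputs
        .iter()
        .map(|&x| thread::spawn(move || step(x)))
        .collect();
    handles
        .into_iter()
        .map(|h| h.join().expect("group member panicked"))
        .collect()
}

/// Chord: run a group, then hand all results to a single callback.
pub fn run_chord(inputs: &[i64], step: Step, callback: fn(&[i64]) -> i64) -> i64 {
    let results = run_group(inputs, step);
    callback(&results)
}
```

The chord is the reason a result backend is needed in a distributed setting: the callback can only run once every group member's result has been stored somewhere the coordinator can read it.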
Cron Scheduling
With the cron feature flag, you can schedule periodic tasks using standard cron expressions:
[dependencies]
kojin = { version = "0.4", features = ["cron"] }
use ;
let worker = new
.
.result_backend
.cron
.build;
worker.run.await;
See kojin/examples/cron.rs for a complete runnable example.
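For readers unfamiliar with cron syntax, the sketch below shows how a single field of a cron expression can be matched against a time component. It is a simplified, standard-library-only illustration that handles `*`, single values, ranges, steps, and comma lists; `field_matches` and `matches_minute_hour` are hypothetical helpers, and kojin's actual parser may accept more (or different) syntax.

```rust
/// Returns true if `value` matches one cron field such as `*`, `*/15`, `5`,
/// `1-5`, `1-10/2`, or a comma-separated list of those. `min` and `max` are
/// the bounds of the field (for example 0 and 59 for minutes).
pub fn field_matches(field: &str, value: u32, min: u32, max: u32) -> bool {
    field
        .split(',')
        .any(|part| part_matches(part, value, min, max))
}

fn part_matches(part: &str, value: u32, min: u32, max: u32) -> bool {
    // Split off an optional "/step" suffix.
    let (range, step) = match part.split_once('/') {
        Some((r, s)) => match s.parse::<u32>() {
            Ok(n) if n > 0 => (r, n),
            _ => return false, // a malformed or zero step never matches
        },
        None => (part, 1),
    };
    // Resolve the range to inclusive bounds.
    let (lo, hi) = if range == "*" {
        (min, max)
    } else if let Some((a, b)) = range.split_once('-') {
        match (a.parse::<u32>(), b.parse::<u32>()) {
            (Ok(a), Ok(b)) => (a, b),
            _ => return false,
        }
    } else {
        match range.parse::<u32>() {
            // "N/step" means "from N up to the field maximum"; a bare "N" is just N.
            Ok(n) if part.contains('/') => (n, max),
            Ok(n) => (n, n),
            Err(_) => return false,
        }
    };
    value >= lo && value <= hi && (value - lo) % step == 0
}

/// Checks only the minute and hour fields of a five-field expression.
pub fn matches_minute_hour(expr: &str, minute: u32, hour: u32) -> bool {
    let fields: Vec<&str> = expr.split_whitespace().collect();
    fields.len() == 5
        && field_matches(fields[0], minute, 0, 59)
        && field_matches(fields[1], hour, 0, 23)
}
```

For instance, `*/5 9-17 * * *` matches 12:10 but not 12:11 or 18:10.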
Middleware & Observability
Kojin ships with composable middleware for tracing, metrics, rate limiting, and OpenTelemetry:
use NonZeroU32;
use ;
let metrics = new;
let worker = new
.
.middleware // structured logs
.middleware // in-process counters
.middleware // token-bucket
.build;
// After processing, query counters:
println!;
OtelMiddleware (behind the otel feature) emits kojin.task.started, kojin.task.succeeded, kojin.task.failed counters and a kojin.task.duration histogram to any configured OpenTelemetry MeterProvider.
See kojin/examples/observability.rs for a complete runnable example.
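The "token-bucket" comment above refers to a common rate-limiting technique. The sketch below shows the idea using only the standard library; `TokenBucket` and its methods are hypothetical and are not kojin's rate-limit middleware. The current time is passed in explicitly so the behaviour is deterministic.

```rust
use std::time::Instant;

/// Minimal token bucket: holds up to `capacity` tokens and refills at
/// `refill_per_sec` tokens per second. Names are illustrative only.
pub struct TokenBucket {
    capacity: f64,
    tokens: f64,
    refill_per_sec: f64,
    last_refill: Instant,
}

impl TokenBucket {
    /// Creates a full bucket whose clock starts at `now`.
    pub fn new(capacity: u32, refill_per_sec: f64, now: Instant) -> Self {
        Self {
            capacity: capacity as f64,
            tokens: capacity as f64,
            refill_per_sec,
            last_refill: now,
        }
    }

    /// Tries to take one token at time `now`. Returns false when the bucket
    /// is empty, meaning the caller should delay or reject the task.
    pub fn try_acquire(&mut self, now: Instant) -> bool {
        // Credit tokens for the time elapsed since the last refill, up to capacity.
        let elapsed = now.saturating_duration_since(self.last_refill);
        let refilled = self.tokens + elapsed.as_secs_f64() * self.refill_per_sec;
        self.tokens = refilled.min(self.capacity);
        self.last_refill = self.last_refill.max(now);

        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }
}
```

A bucket with capacity 2 and a refill rate of one token per second allows a burst of two tasks, rejects a third at the same instant, and admits one more a second later.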
Deduplication
With the dedup feature flag, you can prevent duplicate task execution using content-based or key-based deduplication:
[dependencies]
kojin = { version = "0.4", features = ["dedup"] }
use Duration;
use ;
// Add dedup middleware with a 5-minute TTL
let dedup = new;
let worker = new
.
.middleware
.build;
// Key-based dedup — tasks with the same key within TTL are rejected
let msg = new
.with_dedup_key;
// Content-based dedup — auto-generates key from task name + payload hash
let msg = new
.with_content_dedup;
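The two dedup modes can be pictured with a small in-memory model. This is a sketch under assumptions, not kojin's implementation: `content_key`, `DedupStore`, and `check_and_insert` are hypothetical names, the store lives in process memory, and `DefaultHasher` stands in for whatever hash the middleware really uses.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::time::{Duration, Instant};

/// Content-based key: task name plus a hash of the payload bytes.
/// `DefaultHasher` is used for brevity; its output is not guaranteed to be
/// stable across Rust releases, so a shared store would need a fixed hash.
pub fn content_key(task_name: &str, payload: &[u8]) -> String {
    let mut hasher = DefaultHasher::new();
    payload.hash(&mut hasher);
    format!("{}:{:016x}", task_name, hasher.finish())
}

/// In-memory dedup store: remembers each key until its TTL expires.
pub struct DedupStore {
    ttl: Duration,
    seen: HashMap<String, Instant>, // key -> expiry time
}

impl DedupStore {
    pub fn new(ttl: Duration) -> Self {
        Self {
            ttl,
            seen: HashMap::new(),
        }
    }

    /// Returns true if the key is new (the task should run) and records it.
    /// Returns false if the same key was accepted within the TTL window.
    pub fn check_and_insert(&mut self, key: &str, now: Instant) -> bool {
        // Drop expired entries so the map does not grow without bound.
        self.seen.retain(|_, expiry| *expiry > now);
        if self.seen.contains_key(key) {
            return false;
        }
        self.seen.insert(key.to_string(), now + self.ttl);
        true
    }
}
```

Key-based dedup would pass a caller-chosen string to `check_and_insert`, while content-based dedup would pass the output of `content_key`. With a 5-minute TTL, a repeat one minute later is rejected and a repeat after the window has passed is accepted again.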
AMQP Broker (RabbitMQ)
With the amqp feature flag, you can use RabbitMQ as a production broker:
[dependencies]
kojin = { version = "0.4", features = ["amqp"] }
use ;
let config = new;
let broker = new.await?;
let worker = new
.
.build;
AmqpBroker automatically declares the full topology: a direct exchange (kojin.direct), per-queue dead-letter queues (kojin.dlq.*), and a delayed-message exchange (kojin.delayed) for scheduled tasks.
Priority Queues
Enable per-message priority (0–9, higher = more urgent) by setting max_priority on the config:
let config = new
.with_max_priority;
let broker = new.await?;
// Enqueue a high-priority task
let msg = new
.with_priority;
broker.enqueue.await?;
Note: Changing max_priority on an existing queue requires deleting and recreating the queue in RabbitMQ, as x-max-priority is an immutable queue argument.
See kojin/examples/amqp.rs for a complete runnable example.
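The ordering that per-message priority produces can be modelled locally with a binary heap. The sketch below is an in-process illustration only; `PriorityQueue`, `Entry`, and `MAX_PRIORITY` are hypothetical names, and the clamping and first-in-first-out tie-breaking are choices made for this example rather than statements about kojin or RabbitMQ internals.

```rust
use std::cmp::Ordering;
use std::collections::BinaryHeap;

/// Highest priority accepted in this sketch, mirroring the 0 to 9 range above.
pub const MAX_PRIORITY: u8 = 9;

#[derive(Debug, PartialEq, Eq)]
struct Entry {
    priority: u8,
    seq: u64, // insertion counter, keeps FIFO order within one priority
    task: String,
}

impl Ord for Entry {
    fn cmp(&self, other: &Self) -> Ordering {
        // Higher priority wins; for equal priority the older entry wins.
        self.priority
            .cmp(&other.priority)
            .then_with(|| other.seq.cmp(&self.seq))
    }
}

impl PartialOrd for Entry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

#[derive(Default)]
pub struct PriorityQueue {
    heap: BinaryHeap<Entry>,
    next_seq: u64,
}

impl PriorityQueue {
    /// Enqueues a task, clamping the priority to `MAX_PRIORITY`.
    pub fn push(&mut self, task: &str, priority: u8) {
        let entry = Entry {
            priority: priority.min(MAX_PRIORITY),
            seq: self.next_seq,
            task: task.to_string(),
        };
        self.next_seq += 1;
        self.heap.push(entry);
    }

    /// Pops the most urgent task, if any.
    pub fn pop(&mut self) -> Option<String> {
        self.heap.pop().map(|e| e.task)
    }
}
```

Pushing tasks with priorities 0, 9, 5, and 9 yields the two priority-9 tasks first (in insertion order), then the priority-5 task, then the priority-0 task.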
SQS Broker (Amazon SQS)
With the sqs feature flag, you can use Amazon SQS as a broker:
[dependencies]
kojin = { version = "0.4", features = ["sqs"] }
use ;
let sdk_config = load_defaults.await;
let config = new;
let broker = new;
let worker = new
.
.build;
Feature notes:
- FIFO queues — automatically detected from .fifo suffix; uses MessageGroupId and MessageDeduplicationId
- Delayed scheduling — uses SQS DelaySeconds (max 15 minutes); for longer delays, re-enqueues periodically
- Long polling — configurable wait_time_seconds (default 20s) for efficient message retrieval
- No priority support — SQS does not support message priority; use AMQP if you need priority queues
See kojin/examples/sqs.rs for a complete runnable example.
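Two of the notes above, FIFO detection and delays longer than 15 minutes, lend themselves to a short sketch. The helpers below are hypothetical and do not call the AWS SDK; they only show the suffix check and one plausible way to split a long delay into hops that each fit within the `DelaySeconds` limit. How kojin actually schedules its re-enqueues is not specified in this document.

```rust
use std::time::Duration;

/// SQS caps `DelaySeconds` at 900 seconds (15 minutes).
pub const MAX_SQS_DELAY: Duration = Duration::from_secs(900);

/// FIFO queue names, and therefore their URLs, end in `.fifo`.
pub fn is_fifo_queue(queue_url: &str) -> bool {
    queue_url.ends_with(".fifo")
}

/// Splits the remaining delay into the delay for the next hop and whatever
/// is still left after that hop.
pub fn next_hop(remaining: Duration) -> (Duration, Duration) {
    let hop = remaining.min(MAX_SQS_DELAY);
    (hop, remaining - hop)
}

/// Full sequence of per-message delays needed to cover `total`, assuming the
/// message is re-enqueued each time it becomes visible before it is due.
pub fn delay_hops(total: Duration) -> Vec<Duration> {
    let mut hops = Vec::new();
    let mut remaining = total;
    while remaining > Duration::ZERO {
        let (hop, rest) = next_hop(remaining);
        hops.push(hop);
        remaining = rest;
    }
    hops
}
```

Under these assumptions a 40-minute delay becomes three hops of 900, 900, and 600 seconds, while a delay of 15 minutes or less needs a single send.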
Dashboard
With the dashboard feature flag, you get a built-in JSON API for monitoring:
[dependencies]
kojin = { version = "0.4", features = ["dashboard"] }
use Arc;
use ;
let broker = new;
let metrics = new;
let state = new
.with_metrics;
let _handle = spawn_dashboard;
// GET /api/queues — list all queues with lengths
// GET /api/queues/{name} — single queue detail
// GET /api/metrics — tasks started/succeeded/failed
// GET /api/tasks/{id} — task result (requires result backend)
See kojin/examples/dashboard.rs for a complete runnable example.
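The started/succeeded/failed figures served by a metrics endpoint are typically backed by shared atomic counters. The sketch below shows that pattern with the standard library only; `TaskCounters`, `simulate`, and the JSON shape are assumptions made for illustration and do not describe the actual response format of kojin's dashboard.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

/// Lock-free counters that workers update and a monitoring endpoint reads.
#[derive(Default)]
pub struct TaskCounters {
    started: AtomicU64,
    succeeded: AtomicU64,
    failed: AtomicU64,
}

impl TaskCounters {
    pub fn record_started(&self) {
        self.started.fetch_add(1, Ordering::Relaxed);
    }

    pub fn record_finished(&self, ok: bool) {
        let counter = if ok { &self.succeeded } else { &self.failed };
        counter.fetch_add(1, Ordering::Relaxed);
    }

    /// Renders a snapshot as a small JSON object (hypothetical shape).
    pub fn to_json(&self) -> String {
        format!(
            "{{\"started\":{},\"succeeded\":{},\"failed\":{}}}",
            self.started.load(Ordering::Relaxed),
            self.succeeded.load(Ordering::Relaxed),
            self.failed.load(Ordering::Relaxed),
        )
    }
}

/// Simulates `workers` threads that each run `tasks_per_worker` tasks, with
/// every fourth task (index 0, 4, 8, ...) failing.
pub fn simulate(workers: usize, tasks_per_worker: u64) -> Arc<TaskCounters> {
    let counters = Arc::new(TaskCounters::default());
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let counters = Arc::clone(&counters);
            thread::spawn(move || {
                for i in 0..tasks_per_worker {
                    counters.record_started();
                    counters.record_finished(i % 4 != 0);
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().expect("worker thread panicked");
    }
    counters
}
```

Sharing the counters through an `Arc` lets the worker side and the HTTP side hold the same instance without a mutex.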
Crate Architecture
| Crate | Description |
|---|---|
| kojin | Facade crate — re-exports everything, provides KojinBuilder |
| kojin-core | Core traits (Task, Broker, Middleware), worker runtime, workflows, types |
| kojin-macros | #[kojin::task] proc-macro |
| kojin-redis | Redis broker + result backend via deadpool-redis |
| kojin-postgres | PostgreSQL result backend via sqlx |
| kojin-amqp | RabbitMQ broker via lapin — topology, DLQ, delayed messages |
| kojin-sqs | Amazon SQS broker — standard & FIFO queues, long polling |
| kojin-dashboard | JSON API monitoring dashboard via axum |
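
The split between core traits and backend crates follows a familiar pattern: worker code depends on a trait, and each backend crate supplies an implementation. The sketch below illustrates that pattern with a deliberately simplified, synchronous trait. `SimpleBroker`, `InMemoryBroker`, and `drain` are hypothetical and are not the signatures of kojin-core's `Broker` trait, which is async.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical, synchronous stand-in for a broker abstraction.
pub trait SimpleBroker {
    fn enqueue(&mut self, queue: &str, payload: String);
    fn dequeue(&mut self, queue: &str) -> Option<String>;
    fn len(&self, queue: &str) -> usize;
}

/// One possible backend: named FIFO queues held in process memory.
#[derive(Default)]
pub struct InMemoryBroker {
    queues: HashMap<String, VecDeque<String>>,
}

impl SimpleBroker for InMemoryBroker {
    fn enqueue(&mut self, queue: &str, payload: String) {
        self.queues
            .entry(queue.to_string())
            .or_default()
            .push_back(payload);
    }

    fn dequeue(&mut self, queue: &str) -> Option<String> {
        self.queues.get_mut(queue)?.pop_front()
    }

    fn len(&self, queue: &str) -> usize {
        self.queues.get(queue).map_or(0, |q| q.len())
    }
}

/// Worker-side code depends only on the trait, so any backend can be swapped in.
pub fn drain(broker: &mut dyn SimpleBroker, queue: &str) -> Vec<String> {
    let mut out = Vec::new();
    while let Some(payload) = broker.dequeue(queue) {
        out.push(payload);
    }
    out
}
```

A Redis, AMQP, or SQS backend would implement the same trait against a remote service, leaving `drain` and any other trait-based code unchanged.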
License
Licensed under either of Apache License, Version 2.0 or MIT License at your option.