kojin 0.3.0

Async distributed task queue for Rust — like Celery, BullMQ, Sidekiq, but for Rust
Documentation

kojin

Crates.io docs.rs License

Async distributed task queue for Rust — the equivalent of Celery / Dramatiq (Python), BullMQ (Node.js), Sidekiq (Ruby), and Machinery (Go).

Features

  • Async-first — built on Tokio, designed for async/await from the ground up
  • #[kojin::task] — proc-macro to define tasks from plain async functions
  • Pluggable broker — trait-based broker abstraction (Redis included, bring your own)
  • Workflows — chain, group, chord orchestration with chain![], group![] macros
  • Result backends — Memory, Redis, PostgreSQL for storing task results and coordinating workflows
  • Cron scheduling — periodic task execution with standard cron expressions
  • Middleware — composable pre/post-execution hooks (tracing, metrics, rate limiting, OpenTelemetry)
  • AMQP broker — RabbitMQ support with automatic topology, dead-letter queues, and delayed scheduling
  • Dashboard — built-in JSON API for monitoring queues, metrics, and task results
  • Graceful shutdown — cooperative cancellation via CancellationToken
  • Weighted queues — prioritize work across multiple queues
  • Configurable retries — per-task retry limits with backoff strategies

Quick Start

Add to your Cargo.toml:

[dependencies]
kojin = "0.3"
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
async-trait = "0.1"

Define a task, enqueue it, and run a worker:

use async_trait::async_trait;
use kojin::{Broker, KojinBuilder, MemoryBroker, Task, TaskContext, TaskMessage, TaskResult};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
struct SendEmail {
    to: String,
    subject: String,
}

#[async_trait]
impl Task for SendEmail {
    const NAME: &'static str = "send_email";
    const QUEUE: &'static str = "emails";
    const MAX_RETRIES: u32 = 3;

    type Output = String;

    async fn run(&self, _ctx: &TaskContext) -> TaskResult<Self::Output> {
        println!("Sending email to {}", self.to);
        Ok(format!("Email sent to {}", self.to))
    }
}

#[tokio::main]
async fn main() {
    let broker = MemoryBroker::new();

    // Enqueue a task
    let msg = TaskMessage::new(
        "send_email",
        "emails",
        serde_json::to_value(&SendEmail {
            to: "user@example.com".into(),
            subject: "Hello!".into(),
        }).unwrap(),
    );
    broker.enqueue(msg).await.unwrap();

    // Build and run worker
    let worker = KojinBuilder::new(broker)
        .register_task::<SendEmail>()
        .concurrency(4)
        .queues(vec!["emails".into()])
        .build();

    worker.run().await;
}

Workflows

Kojin supports Celery-style workflow primitives — chain!, group!, and chord — for composing tasks into DAGs. A result backend is required.

use kojin::{chain, group, chord, Signature, Canvas, MemoryResultBackend};

// Signatures describe a task invocation (name, queue, payload)
let fetch = Signature::new("fetch_url", "default", serde_json::json!({"url": "..."}));
let parse = Signature::new("parse_html", "default", serde_json::json!(null));
let store = Signature::new("store_result", "default", serde_json::json!(null));

// Chain — sequential: fetch → parse → store
let pipeline = chain![fetch.clone(), parse.clone(), store.clone()];

// Group — parallel: fetch three URLs concurrently
let batch = group![fetch.clone(), fetch.clone(), fetch.clone()];

// Chord — parallel + callback: fetch all, then aggregate
let aggregate = Signature::new("aggregate", "default", serde_json::json!(null));
let workflow = chord(vec![fetch.clone(), fetch.clone()], aggregate);

// Submit to the broker
let handle = pipeline.apply(&broker, &backend).await?;

See examples/workflow_demo.rs for a complete runnable example.

Cron Scheduling

With the cron feature flag, you can schedule periodic tasks using standard cron expressions:

[dependencies]
kojin = { version = "0.3", features = ["cron"] }
use kojin::{KojinBuilder, Signature, MemoryBroker};

let worker = KojinBuilder::new(MemoryBroker::new())
    .register_task::<CleanupTask>()
    .result_backend(backend)
    .cron(
        "nightly-cleanup",
        "0 3 * * *",  // every day at 03:00
        Signature::new("cleanup", "default", serde_json::json!(null)),
    )
    .build();

worker.run().await;

See examples/cron_demo.rs for a complete runnable example.

Middleware & Observability

Kojin ships with composable middleware for tracing, metrics, rate limiting, and OpenTelemetry:

use std::num::NonZeroU32;
use kojin::{KojinBuilder, MemoryBroker, MetricsMiddleware, TracingMiddleware, RateLimitMiddleware};

let metrics = MetricsMiddleware::new();

let worker = KojinBuilder::new(MemoryBroker::new())
    .register_task::<MyTask>()
    .middleware(TracingMiddleware)                                 // structured logs
    .middleware(metrics.clone())                                   // in-process counters
    .middleware(RateLimitMiddleware::per_second(NonZeroU32::new(100).unwrap())) // token-bucket
    .build();

// After processing, query counters:
println!("succeeded: {}", metrics.tasks_succeeded());

OtelMiddleware (behind the otel feature) emits kojin.task.started, kojin.task.succeeded, kojin.task.failed counters and a kojin.task.duration histogram to any configured OpenTelemetry MeterProvider.

See examples/observability.rs for a complete runnable example.

AMQP Broker (RabbitMQ)

With the amqp feature flag, you can use RabbitMQ as a production broker:

[dependencies]
kojin = { version = "0.3", features = ["amqp"] }
use kojin::{AmqpBroker, AmqpConfig, KojinBuilder};

let config = AmqpConfig::new("amqp://guest:guest@localhost:5672/%2f");
let broker = AmqpBroker::new(config, &["default".into()]).await?;

let worker = KojinBuilder::new(broker)
    .register_task::<MyTask>()
    .build();

AmqpBroker automatically declares the full topology: a direct exchange (kojin.direct), per-queue dead-letter queues (kojin.dlq.*), and a delayed-message exchange (kojin.delayed) for scheduled tasks.

See examples/amqp.rs for a complete runnable example.

Dashboard

With the dashboard feature flag, you get a built-in JSON API for monitoring:

[dependencies]
kojin = { version = "0.3", features = ["dashboard"] }
use std::sync::Arc;
use kojin::{DashboardState, MetricsMiddleware, MemoryBroker, spawn_dashboard};

let broker = MemoryBroker::new();
let metrics = MetricsMiddleware::new();

let state = DashboardState::new(Arc::new(broker.clone()))
    .with_metrics(metrics.clone());

let _handle = spawn_dashboard(state, 9090);
// GET /api/queues        — list all queues with lengths
// GET /api/queues/{name} — single queue detail
// GET /api/metrics       — tasks started/succeeded/failed
// GET /api/tasks/{id}    — task result (requires result backend)

See examples/dashboard.rs for a complete runnable example.

Crate Architecture

Crate Description
kojin Facade crate — re-exports everything, provides KojinBuilder
kojin-core Core traits (Task, Broker, Middleware), worker runtime, workflows, types
kojin-macros #[kojin::task] proc-macro
kojin-redis Redis broker + result backend via deadpool-redis
kojin-postgres PostgreSQL result backend via sqlx
kojin-amqp RabbitMQ broker via lapin — topology, DLQ, delayed messages
kojin-dashboard JSON API monitoring dashboard via axum

License

Licensed under either of Apache License, Version 2.0 or MIT License at your option.