Expand description
§Oxanus
Oxanus is job processing library written in Rust doesn’t suck (or at least sucks in a completely different way than other options).
Oxanus goes for simplicity and depth over breadth. It only aims to support a single backend with a simple flow.
§Key Features
- Isolated Queues: Separate job processing queues with independent configurations
- Retrying: Automatic retry of failed jobs with configurable backoff
- Scheduled Jobs: Schedule jobs to run at specific times or after delays
- Dynamic Queues: Create and manage queues at runtime
- Throttling: Control job processing rates with queue-based throttling
- Unique Jobs: Ensure only one instance of a job runs at a time
- Resilient Jobs: Jobs that can survive worker crashes and restarts
- Graceful Shutdown: Clean shutdown of workers with in-progress job handling
- Periodic Jobs: Run jobs on a schedule using cron-like expressions
- Resumable Jobs: Jobs that can be resumed from where they left off when they are retried
§Quick Start
use oxanus::{Context, Storage};
use serde::{Serialize, Deserialize};
// Define your component registry
#[derive(oxanus::Registry)]
struct ComponentRegistry(oxanus::ComponentRegistry<MyContext, MyError>);
// Define your error type
#[derive(Debug, thiserror::Error)]
enum MyError {}
// Define your context
#[derive(Debug, Clone)]
struct MyContext {}
// Define your worker using the derive macro
#[derive(Debug, Serialize, Deserialize, oxanus::Worker)]
struct MyWorker {
data: String,
}
impl MyWorker {
async fn process(&self, _ctx: &Context<MyContext>) -> Result<(), MyError> {
// Process your job here
println!("Processing: {}", self.data);
Ok(())
}
}
// Define your queue using the derive macro
#[derive(Serialize, oxanus::Queue)]
#[oxanus(key = "my_queue", concurrency = 2)]
struct MyQueue;
// Run your worker
#[tokio::main]
async fn main() -> Result<(), oxanus::OxanusError> {
let ctx = Context::value(MyContext {});
let storage = Storage::builder().build_from_env()?;
let config = ComponentRegistry::build_config(&storage)
.with_graceful_shutdown(tokio::signal::ctrl_c());
// Enqueue some jobs
storage.enqueue(MyQueue, MyWorker { data: "hello".into() }).await?;
// Run the worker
oxanus::run(config, ctx).await?;
Ok(())
}For more detailed usage examples, check out the examples directory.
§Web Dashboard
The oxanus-web crate provides a web UI dashboard for monitoring jobs, queues, and cron schedules. It integrates as a nested axum router.
use oxanus_web::OxanusWebState;
// Build your oxanus config as usual
let config = ComponentRegistry::build_config(&storage)
.with_graceful_shutdown(tokio::signal::ctrl_c());
// Create the oxanus-web router
let oxanus_router = oxanus_web::router(OxanusWebState::new(
config.storage.clone(),
config.catalog(),
"/oxanus".to_string(),
));
// Nest it into your existing axum app
let app = your_app_router().nest("/oxanus", oxanus_router);The dashboard exposes these pages:
/- Overview with job stats/busy- Currently processing jobs/queues- All queues with stats/queues/{queue_key}- Jobs in a specific queue/cron- Cron job schedules/scheduled- Scheduled jobs/retries- Jobs pending retry/dead- Dead letter queue
It also provides management actions for wiping queues and deleting individual jobs.
§Core Concepts
§Workers
Workers are the units of work in Oxanus. They can be defined using the #[derive(oxanus::Worker)] macro or by implementing the Worker trait manually. Workers define the processing logic for jobs.
Worker attributes:
#[oxanus(max_retries = 3)]- Set maximum retry attempts#[oxanus(retry_delay = 5)]- Set retry delay in seconds#[oxanus(unique_id = "worker_{id}")]- Define unique job identifiers#[oxanus(on_conflict = Skip)]- Handle job conflicts (Skip or Replace)#[oxanus(cron(schedule = "*/5 * * * * *", queue = MyQueue))]- Schedule periodic jobs
§Queues
Queues are the channels through which jobs flow. They can be defined using the #[derive(oxanus::Queue)] macro or by implementing the Queue trait manually.
Queues can be:
- Static: Defined at compile time with a fixed key
- Dynamic: Created at runtime with each instance being a separate queue (requires struct fields)
Queue attributes:
#[oxanus(key = "my_queue")]- Set static queue key#[oxanus(prefix = "dynamic")]- Set prefix for dynamic queues#[oxanus(concurrency = 2)]- Set concurrency limit#[oxanus(throttle(window_ms = 2000, limit = 5))]- Configure throttling
§Component Registry
The component registry automatically discovers and registers all workers and queues in your application. Use #[derive(oxanus::Registry)] to create a registry and ComponentRegistry::build_config() to build the configuration.
§Storage
The Storage trait provides the interface for job persistence. It handles:
- Job enqueueing
- Job scheduling
- Job state management
- Queue monitoring
Storage is built using Storage::builder().build_from_env() which reads the REDIS_URL environment variable.
§Context
The context provides shared state and utilities to workers. It can include:
- Database connections
- Configuration
- Shared resources
- Job state (for resumable jobs)
§Configuration
Configuration is done through the Config builder, which allows you to:
- Automatically register queues and workers via the component registry
- Set up graceful shutdown
- Configure exit conditions
§Error Handling
Oxanus uses a custom error type OxanusError that covers all possible error cases in the library.
Workers can define their own error type that implements std::error::Error.
§Prometheus Metrics
Enable the prometheus feature to expose metrics:
let metrics = storage.metrics().await?;
let output = metrics.encode_to_string()?;
// Serve `output` on your metrics endpointStructs§
- Catalog
- Catalog of all registered workers and queues.
- Config
- Context
- Cron
Worker Info - Information about a registered cron worker.
- Dynamic
Queue Stats - Statistics for a dynamic sub-queue.
- Job
- JobEnvelope
- JobMeta
- JobState
- Process
- Information about an Oxanus worker process.
- Queue
Config - Queue
Info - Information about a registered queue.
- Queue
List Opts - Options for listing jobs in a queue.
- Queue
Stats - Statistics for a specific queue.
- Queue
Throttle - Queue
Throttle Info - Throttle configuration for a queue.
- Stats
- Overall statistics for the Oxanus job queue system.
- Stats
Global - Global aggregate statistics.
- Stats
Processing - Information about a job currently being processed.
- Storage
- Storage provides the main interface for job management in Oxanus.
- Storage
Builder - Storage
Builder Timeouts - Worker
Config - Worker
Info - Information about a registered worker.
Enums§
Traits§
Functions§
- drain
- Drains a queue of jobs.
- job_
factory - run
- Runs the Oxanus worker system with the given configuration and context.