runat 0.2.0

RunAt

A distributed job scheduler for Rust with PostgreSQL backend support.

Features

  • Distributed Job Scheduling: Run jobs across multiple workers with PostgreSQL-backed coordination
  • Cron Support: Schedule recurring jobs using cron expressions
  • Retry Mechanisms: Built-in exponential backoff and custom retry strategies
  • Type-Safe Jobs: Leverage Rust's type system for job definitions
  • Async/Await: Built on Tokio for efficient async job execution

Installation

Add this to your Cargo.toml:

[dependencies]
runat = "0.2.0"

To request the PostgreSQL feature explicitly (it is already enabled by default):

[dependencies]
runat = { version = "0.2.0", features = ["postgres"] }

Optional features:

  • postgres - PostgreSQL backend (enabled by default)
  • tracing - Tracing support for observability

Quick Start

Basic Job

use runat::{BackgroundJob, Executable, JobQueue, PostgresDatastore, JobResult};
use sqlx::postgres::PgPoolOptions;
use std::sync::Arc;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};

// Define your job struct
#[derive(Debug, Clone, Serialize, Deserialize, BackgroundJob)]
pub struct SendEmailJob {
    pub to: String,
    pub subject: String,
    pub body: String,
}

// Implement the job logic
#[async_trait]
impl Executable for SendEmailJob {
    type Output = JobResult<()>;

    async fn execute(&mut self) -> Self::Output {
        println!("Sending email to {}: {}", self.to, self.subject);
        // Your email sending logic here
        Ok(())
    }
}

#[tokio::main]
async fn main() -> JobResult<()> {
    // Connect to PostgreSQL
    let database_url = std::env::var("DATABASE_URL")
        .unwrap_or_else(|_| "postgres://user:pass@localhost/db".to_string());

    let pool = PgPoolOptions::new()
        .max_connections(10)
        .connect(&database_url)
        .await?;

    // Initialize datastore and run migrations
    let datastore = PostgresDatastore::new(pool).await?;
    datastore.migrate().await?;

    // Create the job queue. The datastore is shared through an Arc;
    // JobQueue itself is Clone, so the queue needs no Arc of its own.
    let queue = JobQueue::with_datastore(Arc::new(datastore));

    // Register job handlers before starting workers
    queue.register::<SendEmailJob>()?;

    // Enqueue a job
    queue.enqueue(
        SendEmailJob {
            to: "user@example.com".to_string(),
            subject: "Welcome!".to_string(),
            body: "Thanks for signing up!".to_string(),
        }.job()?
    ).await?;

    Ok(())
}

Scheduled Jobs with Cron

// Register the job handler first
queue.register::<SendEmailJob>()?;

// Schedule a job to run every 10 seconds
queue.enqueue(
    SendEmailJob {
        to: "admin@example.com".to_string(),
        subject: "Daily Report".to_string(),
        body: "Here's your daily report".to_string(),
    }
    .job()?
    .cron("*/10 * * * * * *")?
).await?;
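
The expression above has seven fields, and the comment implies the first one is seconds. As a rough illustration of how a `*/N` step in that field could be evaluated, here is a minimal standard-library sketch. The helper names `step_field_matches` and `fires_at_second` are hypothetical and are not part of runat; the parser runat actually uses may support far more syntax (ranges, lists, names).

```rust
/// Hypothetical helper: evaluates a single cron field that is either `*`
/// or `*/N`. Returns None for any syntax this sketch does not handle.
fn step_field_matches(field: &str, value: u32) -> Option<bool> {
    if field == "*" {
        return Some(true);
    }
    // Only the `*/N` step form is supported here.
    let step: u32 = field.strip_prefix("*/")?.parse().ok()?;
    if step == 0 {
        // A step of zero is meaningless, so treat it as invalid.
        return None;
    }
    Some(value % step == 0)
}

/// Hypothetical helper: checks the first field of an expression against a
/// second-of-minute value, assuming the first field is seconds.
fn fires_at_second(expr: &str, second: u32) -> Option<bool> {
    let seconds_field = expr.split_whitespace().next()?;
    step_field_matches(seconds_field, second)
}
```

Under these assumptions, `fires_at_second("*/10 * * * * * *", 30)` yields `Some(true)` and `fires_at_second("*/10 * * * * * *", 35)` yields `Some(false)`.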

Custom Job Implementation

use runat::{BackgroundJob, Executable, JobResult};
use serde::{Deserialize, Serialize};
use async_trait::async_trait;

#[derive(Debug, Clone, Serialize, Deserialize, BackgroundJob)]
pub struct ProcessPayment {
    pub user_id: String,
    pub amount: f64,
}

#[async_trait]
impl Executable for ProcessPayment {
    type Output = JobResult<String>;

    async fn execute(&mut self) -> Self::Output {
        println!("Processing ${} payment for user {}", self.amount, self.user_id);
        // Your payment processing logic here
        Ok(format!("Payment processed for {}", self.user_id))
    }
}

// Don't forget to register before using
// queue.register::<ProcessPayment>()?;

Job Registration

Important: You must register job handlers with the queue before workers can process them. Each queue instance has its own registry.

// Register all job types your workers will process
queue.register::<SendEmailJob>()?;
queue.register::<ProcessPayment>()?;
queue.register::<AnotherJobType>()?;

// Then start your workers
queue.start_worker().await?;

If a worker encounters a job type that hasn't been registered, it will fail the job with a helpful error message:

No handler registered for job type: SendEmailJob. Call queue.register::<T>() before starting workers.
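
To make the lookup-then-fail behavior concrete, the following is a simplified sketch of a per-queue handler registry keyed by job type name. It is not runat's implementation: the `Registry` type, the string payloads, and the `dispatch` method are all assumptions made for illustration, and only the error text is taken from the message above.

```rust
use std::collections::HashMap;

// Hypothetical handler signature: takes a serialized payload, returns a result.
type Handler = Box<dyn Fn(&str) -> Result<String, String>>;

/// Hypothetical registry; each instance owns its own set of handlers.
#[derive(Default)]
struct Registry {
    handlers: HashMap<String, Handler>,
}

impl Registry {
    /// Stores a handler under the given job type name.
    fn register<F>(&mut self, job_type: &str, handler: F)
    where
        F: Fn(&str) -> Result<String, String> + 'static,
    {
        self.handlers.insert(job_type.to_string(), Box::new(handler));
    }

    /// Runs the handler for `job_type`, or fails if none was registered.
    fn dispatch(&self, job_type: &str, payload: &str) -> Result<String, String> {
        match self.handlers.get(job_type) {
            Some(handler) => handler(payload),
            None => Err(format!(
                "No handler registered for job type: {job_type}. \
                 Call queue.register::<T>() before starting workers."
            )),
        }
    }
}
```

Because the registry lives inside each instance, a job type registered on one instance is unknown to another, which mirrors the note that each queue has its own registry.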

Running Workers

You can run workers in two ways:

Option 1: Run a worker directly from the queue (recommended)

// JobQueue is Clone, so you can clone it before moving into the task
let queue_clone = queue.clone();
tokio::spawn(async move {
    queue_clone.start_worker().await
});

Option 2: Create a worker instance

use runat::JobWorker;

// Create a worker from the queue
let worker = queue.worker();

// Run the worker
tokio::spawn(async move {
    worker.run().await
});
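
When several workers run at once, each job should be claimed by exactly one of them. runat coordinates this through PostgreSQL; the sketch below only illustrates the general claim-then-process pattern using an in-memory queue and OS threads from the standard library. The function `run_workers` and the integer job IDs are hypothetical stand-ins.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical demo: spawns `worker_count` threads that drain a shared
/// queue and returns (worker id, job id) pairs for every processed job.
fn run_workers(jobs: Vec<u32>, worker_count: usize) -> Vec<(usize, u32)> {
    let queue = Arc::new(Mutex::new(VecDeque::from(jobs)));

    let handles: Vec<_> = (0..worker_count)
        .map(|id| {
            let queue = Arc::clone(&queue);
            thread::spawn(move || {
                let mut done = Vec::new();
                loop {
                    // Hold the lock only while claiming the next job, so
                    // no two workers can take the same one.
                    let next = queue.lock().unwrap().pop_front();
                    match next {
                        Some(job) => done.push((id, job)),
                        None => break, // queue drained, worker exits
                    }
                }
                done
            })
        })
        .collect();

    // Gather the results from every worker thread.
    handles
        .into_iter()
        .flat_map(|handle| handle.join().unwrap())
        .collect()
}
```

Which worker handles which job varies between runs, but every job ID appears exactly once in the returned list.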

Retry Strategies

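// Job is assumed to be exported from the crate root alongside Retry
use runat::{Job, Retry};
use chrono::Duration;

// Create a job with exponential backoff retry.
// `email_job` is a previously built job value, e.g. a SendEmailJob.
let mut job = Job::new(email_job)?;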
job.max_attempts = 5;
job = job.retry(Retry::exponential(Duration::seconds(5)));

queue.enqueue(job).await?;
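
For intuition about what an exponential strategy with a 5-second base could produce, here is a minimal sketch of one common backoff formula: the delay doubles with each attempt and is clamped to a ceiling. The function `backoff_delay`, the doubling factor, and the cap are assumptions for illustration; runat's `Retry::exponential` may use a different multiplier, add jitter, or have no cap. The sketch uses `std::time::Duration` rather than `chrono::Duration` to stay dependency-free.

```rust
use std::time::Duration;

/// Hypothetical helper: delay before retry number `attempt` (1-based).
/// The delay is `base * 2^(attempt - 1)`, never exceeding `max`.
fn backoff_delay(base: Duration, attempt: u32, max: Duration) -> Duration {
    // Clamp the exponent so the shift below cannot overflow a u32.
    let exponent = attempt.saturating_sub(1).min(31);
    let factor = 1u32 << exponent;
    // If the multiplication overflows, fall back to the cap.
    base.checked_mul(factor).map_or(max, |delay| delay.min(max))
}
```

With a 5-second base and a 300-second cap, attempts 1 through 5 would wait 5, 10, 20, 40, and 80 seconds under this formula, and later attempts would level off at 300 seconds.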

Running Tests

cargo test