runat 0.2.2

A distributed job scheduler for Rust
Documentation
# RunAt


A distributed job scheduler for Rust with PostgreSQL backend support.

## Features


- **Distributed Job Scheduling**: Run jobs across multiple workers with PostgreSQL-backed coordination
- **Application Context**: Pass database connections, config, and other state to job handlers
- **Cron Support**: Schedule recurring jobs using cron expressions
- **Failed Jobs Queue**: Failed jobs are moved to a separate queue
- **Retry Mechanisms**: Built-in exponential backoff and custom retry strategies
- **Type-Safe Jobs**: Leverage Rust's type system for job definitions
- **Async/Await**: Built on Tokio for efficient async job execution

## Installation


Add this to your `Cargo.toml`:

```toml
[dependencies]
runat = "0.2.2"
```

For PostgreSQL support (enabled by default):

```toml
[dependencies]
runat = { version = "0.2.2", features = ["postgres"] }
```

Optional features:

- `postgres` - PostgreSQL backend (enabled by default)
- `tracing` - Tracing support for observability

## Quick Start


### Define Your Application Context


The context allows you to pass shared state (database pools, config, HTTP clients, etc.) to your job handlers:

```rust
use sqlx::PgPool;

#[derive(Clone)]

pub struct AppContext {
    pub db: PgPool,
    pub api_key: String,
}
```

### Define a Job


```rust
use runat::{BackgroundJob, Executable, JobResult};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};

// Define your job struct
#[derive(Debug, Clone, Serialize, Deserialize, BackgroundJob)]

pub struct SendEmailJob {
    pub to: String,
    pub subject: String,
    pub body: String,
}

// Implement the job logic with your context type
#[async_trait]

impl Executable<AppContext> for SendEmailJob {
    async fn execute(&mut self, ctx: &AppContext) -> JobResult<()> {
        // Access your database, config, etc. through ctx
        println!("Sending email to {}: {}", self.to, self.subject);

        // Example: use ctx.db for database operations
        // sqlx::query("INSERT INTO sent_emails ...").execute(&ctx.db).await?;

        Ok(())
    }
}
```

### Create the Queue and Run Workers


```rust
use runat::{IntoJob, JobQueue, JobQueueConfig, PostgresDatastore, JobResult};
use sqlx::postgres::PgPoolOptions;
use std::sync::Arc;

#[tokio::main]

async fn main() -> JobResult<()> {
    // Connect to PostgreSQL
    let pool = PgPoolOptions::new()
        .max_connections(10)
        .connect("postgres://user:pass@localhost/db")
        .await?;

    // Initialize datastore and run migrations
    let datastore = PostgresDatastore::new(pool.clone()).await?;
    datastore.migrate().await?;

    // Create your application context
    let ctx = AppContext {
        db: pool,
        api_key: "secret".to_string(),
    };

    // Create job queue with context
    let queue = JobQueue::new(
        Arc::new(datastore),
        JobQueueConfig::default(),
        ctx,
    );

    // Register job handlers before starting workers
    queue.register::<SendEmailJob>()?;

    // Enqueue a job
    queue.enqueue(
        SendEmailJob {
            to: "user@example.com".to_string(),
            subject: "Welcome!".to_string(),
            body: "Thanks for signing up!".to_string(),
        }.job()?
    ).await?;

    // Start worker in background
    let queue_clone = queue.clone();
    tokio::spawn(async move {
        queue_clone.start_worker().await
    });

    Ok(())
}
```

### Scheduled Jobs with Cron


```rust
// Schedule a job to run every 10 seconds
queue.enqueue(
    SendEmailJob {
        to: "admin@example.com".to_string(),
        subject: "Daily Report".to_string(),
        body: "Here's your daily report".to_string(),
    }
    .job()?
    .cron("*/10 * * * * * *")?
).await?;
```

### Using Context in Jobs


Jobs receive a reference to the context during execution, giving access to shared resources:

```rust
use runat::{BackgroundJob, Executable, JobResult};
use serde::{Deserialize, Serialize};
use async_trait::async_trait;

#[derive(Debug, Clone, Serialize, Deserialize, BackgroundJob)]

pub struct ProcessPayment {
    pub user_id: String,
    pub amount: f64,
}

#[async_trait]

impl Executable<AppContext> for ProcessPayment {
    async fn execute(&mut self, ctx: &AppContext) -> JobResult<()> {
        // Use the database from context
        sqlx::query("INSERT INTO payments (user_id, amount) VALUES ($1, $2)")
            .bind(&self.user_id)
            .bind(self.amount)
            .execute(&ctx.db)
            .await?;

        // Use other context fields
        println!("Using API key: {}", ctx.api_key);

        Ok(())
    }

    // Optional: pre/post execution hooks also receive context
    async fn pre_execute(&mut self, ctx: &AppContext) {
        println!("About to process payment using db pool");
    }

    async fn post_execute(&mut self, ctx: &AppContext, result: JobResult<()>) -> JobResult<()> {
        if result.is_err() {
            // Log failure to database
        }
        result
    }
}
```

### Job Registration


**Important**: You must register job handlers with the queue before workers can process them.

```rust
// Register all job types your workers will process
queue.register::<SendEmailJob>()?;
queue.register::<ProcessPayment>()?;

// Then start your workers
queue.start_worker().await?;
```

If a worker encounters a job type that hasn't been registered, it will fail the job with:

```
No handler registered for job type: SendEmailJob. Call queue.register::<T>() before starting workers.
```

### Running Workers


**Option 1: Run worker directly from the queue (recommended)**

```rust
let queue_clone = queue.clone();
tokio::spawn(async move {
    queue_clone.start_worker().await
});
```

**Option 2: Create a worker instance**

```rust
let worker = queue.worker();
tokio::spawn(async move {
    worker.run().await
});
```

### Retry Strategies


```rust
use runat::Retry;
use chrono::Duration;

// Create a job with retry on failure
queue.enqueue(
    SendEmailJob {
        to: "user@example.com".to_string(),
        subject: "Welcome!".to_string(),
        body: "Thanks for signing up!".to_string(),
    }
    .job()?
    .set_max_attempts(3)
    .retry(Retry::fixed(Duration::seconds(30)))
).await?;
```

### Simple Jobs Without Context


If your jobs don't need shared state, use `()` as the context type:

```rust
#[derive(Debug, Clone, Serialize, Deserialize, BackgroundJob)]

pub struct SimpleJob {
    pub message: String,
}

#[async_trait]

impl Executable<()> for SimpleJob {
    async fn execute(&mut self, _ctx: &()) -> JobResult<()> {
        println!("{}", self.message);
        Ok(())
    }
}

// Create queue with unit context
let queue = JobQueue::with_datastore(Arc::new(datastore), ());
```

## Running Tests


```bash
cargo test
```