# RunAt


A distributed job scheduler for Rust with PostgreSQL backend support.

## Features


- **Distributed Job Scheduling**: Run jobs across multiple workers with PostgreSQL-backed coordination
- **Cron Support**: Schedule recurring jobs using cron expressions
- **Failed Jobs Queue**: Failed jobs are moved to a separate queue
- **Retry Mechanisms**: Built-in exponential backoff and custom retry strategies
- **Type-Safe Jobs**: Leverage Rust's type system for job definitions
- **Async/Await**: Built on Tokio for efficient async job execution

## Installation


Add this to your `Cargo.toml`:

```toml
[dependencies]
runat = "0.2.1"
```

PostgreSQL support is enabled by default. To request the feature explicitly:

```toml
[dependencies]
runat = { version = "0.2.1", features = ["postgres"] }
```

Optional features:

- `postgres` - PostgreSQL backend (enabled by default)
- `tracing` - Tracing support for observability

## Quick Start


### Basic Job


```rust
use runat::{BackgroundJob, Executable, JobQueue, PostgresDatastore, JobResult};
use sqlx::postgres::PgPoolOptions;
use std::sync::Arc;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};

// Define your job struct
#[derive(Debug, Clone, Serialize, Deserialize, BackgroundJob)]
pub struct SendEmailJob {
    pub to: String,
    pub subject: String,
    pub body: String,
}

// Implement the job logic
#[async_trait]
impl Executable for SendEmailJob {
    async fn execute(&mut self) -> JobResult<()> {
        println!("Sending email to {}: {}", self.to, self.subject);
        // Your email sending logic here
        Ok(())
    }
}

#[tokio::main]
async fn main() -> JobResult<()> {
    // Connect to PostgreSQL
    let database_url = std::env::var("DATABASE_URL")
        .unwrap_or_else(|_| "postgres://user:pass@localhost/db".to_string());

    let pool = PgPoolOptions::new()
        .max_connections(10)
        .connect(&database_url)
        .await?;

    // Initialize datastore and run migrations
    let datastore = PostgresDatastore::new(pool).await?;
    datastore.migrate().await?;

    // Create the job queue. The datastore is shared via Arc; JobQueue itself is Clone.
    let queue = JobQueue::with_datastore(Arc::new(datastore));

    // Register job handlers before starting workers
    queue.register::<SendEmailJob>()?;

    // Enqueue a job
    queue.enqueue(
        SendEmailJob {
            to: "user@example.com".to_string(),
            subject: "Welcome!".to_string(),
            body: "Thanks for signing up!".to_string(),
        }.job()?
    ).await?;

    Ok(())
}
```

### Scheduled Jobs with Cron


```rust
// Register the job handler first
queue.register::<SendEmailJob>()?;

// Schedule a job to run every 10 seconds
queue.enqueue(
    SendEmailJob {
        to: "admin@example.com".to_string(),
        subject: "Daily Report".to_string(),
        body: "Here's your daily report".to_string(),
    }
    .job()?
    .cron("*/10 * * * * * *")?
).await?;
```

### Custom Job Implementation


```rust
use runat::{BackgroundJob, Executable, JobResult};
use serde::{Deserialize, Serialize};
use async_trait::async_trait;

#[derive(Debug, Clone, Serialize, Deserialize, BackgroundJob)]
pub struct ProcessPayment {
    pub user_id: String,
    pub amount: f64,
}

#[async_trait]
impl Executable for ProcessPayment {
    async fn execute(&mut self) -> JobResult<String> {
        println!("Processing ${} payment for user {}", self.amount, self.user_id);
        // Your payment processing logic here
        Ok(format!("Payment processed for {}", self.user_id))
    }
}

// Don't forget to register before using
// queue.register::<ProcessPayment>()?;
```

### Job Registration


**Important**: You must register job handlers with the queue before workers can process them. Each queue instance has its own registry.

```rust
// Register all job types your workers will process
queue.register::<SendEmailJob>()?;
queue.register::<ProcessPayment>()?;
queue.register::<AnotherJobType>()?;

// Then start your workers
queue.start_worker().await?;
```

If a worker encounters a job type that hasn't been registered, it will fail the job with a helpful error message:

```
No handler registered for job type: SendEmailJob. Call queue.register::<T>() before starting workers.
```
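
To see why registration has to come first, it may help to picture a registry keyed by job type name. The following is a minimal std-only sketch of that idea, not runat's actual implementation: `Registry`, `Handler`, and `dispatch` are hypothetical names, and the payload is a plain string instead of a deserialized job.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a job handler: receives a serialized payload.
type Handler = fn(&str) -> Result<(), String>;

/// Hypothetical registry keyed by job type name (not runat's actual type).
#[derive(Default)]
struct Registry {
    handlers: HashMap<&'static str, Handler>,
}

impl Registry {
    /// Associate a job type name with the function that runs it.
    fn register(&mut self, job_type: &'static str, handler: Handler) {
        self.handlers.insert(job_type, handler);
    }

    /// Look up the handler for `job_type` and run it; an unknown type is an error.
    fn dispatch(&self, job_type: &str, payload: &str) -> Result<(), String> {
        let handler = self
            .handlers
            .get(job_type)
            .ok_or_else(|| format!("No handler registered for job type: {job_type}"))?;
        handler(payload)
    }
}

fn send_email(payload: &str) -> Result<(), String> {
    println!("sending email with payload {payload}");
    Ok(())
}

fn main() {
    let mut registry = Registry::default();
    registry.register("SendEmailJob", send_email);

    // A registered type is dispatched; an unregistered one yields an error.
    println!("{:?}", registry.dispatch("SendEmailJob", "{}"));
    println!("{:?}", registry.dispatch("ProcessPayment", "{}"));
}
```

Because each registry is just a per-instance map in this sketch, a job type registered on one instance is invisible to another, which mirrors the note above that each queue instance has its own registry.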

### Running Workers


You can run workers in two ways:

**Option 1: Run worker directly from the queue (recommended)**

```rust
// JobQueue is Clone, so you can clone it before moving into the task
let queue_clone = queue.clone();
tokio::spawn(async move {
    queue_clone.start_worker().await
});
```

**Option 2: Create a worker instance**

```rust
use runat::JobWorker;

// Create a worker from the queue
let worker = queue.worker();

// Run the worker
tokio::spawn(async move {
    worker.run().await
});
```

### Retry Strategies


```rust
use runat::Retry;
use chrono::Duration;

// Create a recurring job that retries up to 3 times with a fixed 3-second delay
queue.enqueue(SendEmailJob {
    to: "user@example.com".to_string(),
    subject: "Welcome!".to_string(),
    body: "Thanks for signing up!".to_string(),
}.job()?
.set_max_attempts(3)
.retry(Retry::fixed(Duration::seconds(3)))
.cron("*/10 * * * * * *")?)
.await?;
```
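
The feature list also mentions exponential backoff, while the example above uses `Retry::fixed`. For intuition about how an exponential schedule grows, here is a std-only sketch. `backoff_delay` is a hypothetical helper, not part of runat's API, and the doubling factor and cap are assumptions chosen for illustration.

```rust
use std::time::Duration;

/// Hypothetical helper (not part of runat): delay before retry number
/// `attempt` (1-based), doubling from `base` and capped at `max`.
fn backoff_delay(base: Duration, attempt: u32, max: Duration) -> Duration {
    // 2^(attempt - 1), saturating instead of overflowing on large attempts.
    let factor = 2u32.saturating_pow(attempt.saturating_sub(1));
    base.saturating_mul(factor).min(max)
}

fn main() {
    let base = Duration::from_secs(3);
    let max = Duration::from_secs(60);
    for attempt in 1..=6 {
        println!("attempt {attempt}: wait {:?}", backoff_delay(base, attempt, max));
    }
}
```

With a 3-second base, the delays double on each attempt (3 s, 6 s, 12 s, and so on) until they reach the 60-second cap. A cap like this keeps long-failing jobs from being pushed arbitrarily far into the future.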

### Running Tests


```bash
cargo test
```