# RunAt
A distributed job scheduler for Rust with PostgreSQL backend support.
## Features
- **Distributed Job Scheduling**: Run jobs across multiple workers with PostgreSQL-backed coordination
- **Cron Support**: Schedule recurring jobs using cron expressions
- **Failed Jobs Queue**: Failed jobs are moved to a separate queue
- **Retry Mechanisms**: Built-in exponential backoff and custom retry strategies
- **Type-Safe Jobs**: Leverage Rust's type system for job definitions
- **Async/Await**: Built on Tokio for efficient async job execution
## Installation
Add this to your `Cargo.toml`:
```toml
[dependencies]
runat = "0.2.1"
```
PostgreSQL support is enabled by default. To request the feature explicitly:
```toml
[dependencies]
runat = { version = "0.2.1", features = ["postgres"] }
```
Optional features:
- `postgres` - PostgreSQL backend (enabled by default)
- `tracing` - Tracing support for observability
## Quick Start
### Basic Job
```rust
use runat::{BackgroundJob, Executable, JobQueue, PostgresDatastore, JobResult};
use sqlx::postgres::PgPoolOptions;
use std::sync::Arc;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};

// Define your job struct
#[derive(Debug, Clone, Serialize, Deserialize, BackgroundJob)]
pub struct SendEmailJob {
    pub to: String,
    pub subject: String,
    pub body: String,
}

// Implement the job logic
#[async_trait]
impl Executable for SendEmailJob {
    async fn execute(&mut self) -> JobResult<()> {
        println!("Sending email to {}: {}", self.to, self.subject);
        // Your email sending logic here
        Ok(())
    }
}

#[tokio::main]
async fn main() -> JobResult<()> {
    // Connect to PostgreSQL
    let database_url = std::env::var("DATABASE_URL")
        .unwrap_or_else(|_| "postgres://user:pass@localhost/db".to_string());
    let pool = PgPoolOptions::new()
        .max_connections(10)
        .connect(&database_url)
        .await?;

    // Initialize datastore and run migrations
    let datastore = PostgresDatastore::new(pool).await?;
    datastore.migrate().await?;

    // Create the job queue. The datastore is shared through an Arc;
    // JobQueue itself is Clone, so the queue needs no Arc wrapper.
    let queue = JobQueue::with_datastore(Arc::new(datastore));

    // Register job handlers before starting workers
    queue.register::<SendEmailJob>()?;

    // Enqueue a job
    queue.enqueue(
        SendEmailJob {
            to: "user@example.com".to_string(),
            subject: "Welcome!".to_string(),
            body: "Thanks for signing up!".to_string(),
        }.job()?
    ).await?;

    Ok(())
}
```
### Scheduled Jobs with Cron
```rust
// Register the job handler first
queue.register::<SendEmailJob>()?;

// Schedule a job to run every 10 seconds
queue.enqueue(
    SendEmailJob {
        to: "admin@example.com".to_string(),
        subject: "Daily Report".to_string(),
        body: "Here's your daily report".to_string(),
    }
    .job()?
    .cron("*/10 * * * * * *")?
).await?;
```
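
The expression above has seven fields rather than the five of classic Unix cron. The sketch below labels each field, assuming the seven-field layout used by the Rust `cron` crate (seconds first, year last). That layout is an assumption, since this README does not specify the parser, and `label_cron_fields` is a hypothetical helper, not part of RunAt.

```rust
/// Field names for a seven-field cron expression, in order.
/// ASSUMPTION: seconds-first, year-last layout; not confirmed by RunAt's docs.
const FIELD_NAMES: [&str; 7] = [
    "second",
    "minute",
    "hour",
    "day of month",
    "month",
    "day of week",
    "year",
];

/// Hypothetical helper: pairs each field of a seven-field expression with its name.
/// Returns an error if the expression does not have exactly seven fields.
fn label_cron_fields(expr: &str) -> Result<Vec<(&'static str, String)>, String> {
    let fields: Vec<&str> = expr.split_whitespace().collect();
    if fields.len() != FIELD_NAMES.len() {
        return Err(format!("expected 7 fields, found {}", fields.len()));
    }
    Ok(FIELD_NAMES
        .iter()
        .zip(fields)
        .map(|(name, value)| (*name, value.to_string()))
        .collect())
}
```

Under that assumption, `label_cron_fields("*/10 * * * * * *")` pairs `*/10` with the seconds field, which matches the "every 10 seconds" comment in the example.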
### Custom Job Implementation
```rust
use runat::{BackgroundJob, Executable, JobResult};
use serde::{Deserialize, Serialize};
use async_trait::async_trait;

#[derive(Debug, Clone, Serialize, Deserialize, BackgroundJob)]
pub struct ProcessPayment {
    pub user_id: String,
    pub amount: f64,
}

#[async_trait]
impl Executable for ProcessPayment {
    async fn execute(&mut self) -> JobResult<String> {
        println!("Processing ${} payment for user {}", self.amount, self.user_id);
        // Your payment processing logic here
        Ok(format!("Payment processed for {}", self.user_id))
    }
}

// Don't forget to register before using
// queue.register::<ProcessPayment>()?;
```
### Job Registration
**Important**: You must register job handlers with the queue before workers can process them. Each queue instance has its own registry.
```rust
// Register all job types your workers will process
queue.register::<SendEmailJob>()?;
queue.register::<ProcessPayment>()?;
queue.register::<AnotherJobType>()?;
// Then start your workers
queue.start_worker().await?;
```
If a worker encounters a job type that hasn't been registered, it will fail the job with a helpful error message:
```
No handler registered for job type: SendEmailJob. Call queue.register::<T>() before starting workers.
```
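
To illustrate why registration has to happen first, here is a minimal sketch of a per-queue registry keyed by job type name. It is not RunAt's implementation: `Registry`, `Handler`, `dispatch`, and `send_email` are hypothetical names, and handlers are simplified to plain functions over a string payload. Only the error text is taken from the message shown above.

```rust
use std::collections::HashMap;

/// Hypothetical handler signature: receives the serialized payload.
type Handler = fn(&str) -> Result<(), String>;

/// Hypothetical per-queue registry mapping job type names to handlers.
#[derive(Default)]
struct Registry {
    handlers: HashMap<String, Handler>,
}

impl Registry {
    /// Records the handler for a job type, replacing any earlier entry.
    fn register(&mut self, job_type: &str, handler: Handler) {
        self.handlers.insert(job_type.to_string(), handler);
    }

    /// Runs the handler for `job_type`, or fails if none was registered.
    fn dispatch(&self, job_type: &str, payload: &str) -> Result<(), String> {
        match self.handlers.get(job_type) {
            Some(handler) => handler(payload),
            None => Err(format!(
                "No handler registered for job type: {job_type}. \
                 Call queue.register::<T>() before starting workers."
            )),
        }
    }
}

/// Example handler standing in for a real job's `execute`.
fn send_email(payload: &str) -> Result<(), String> {
    println!("sending email with payload {payload}");
    Ok(())
}
```

In this sketch a worker that picks up a `ProcessPayment` job from a registry holding only `SendEmailJob` gets the error instead of silently dropping the job, which is the behavior described above.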
### Running Workers
You can run workers in two ways:
**Option 1: Run worker directly from the queue (recommended)**
```rust
// JobQueue is Clone, so you can clone it before moving into the task
let queue_clone = queue.clone();
tokio::spawn(async move {
    queue_clone.start_worker().await
});
```
**Option 2: Create a worker instance**
```rust
use runat::JobWorker;
// Create a worker from the queue
let worker = queue.worker();
// Run the worker
tokio::spawn(async move {
    worker.run().await
});
```
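
Option 1 relies on `JobQueue` being `Clone`. A common way to make such a handle cheap to clone is to keep the shared state behind an `Arc`, so every clone points at the same data. The sketch below shows that pattern with an in-memory queue and a plain thread. `QueueHandle` and its methods are hypothetical and say nothing definitive about how RunAt stores jobs.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a cloneable queue handle.
/// Cloning copies the Arc pointer, not the underlying job list.
#[derive(Clone, Default)]
struct QueueHandle {
    jobs: Arc<Mutex<VecDeque<String>>>,
}

impl QueueHandle {
    /// Appends a job name to the shared list.
    fn enqueue(&self, job: &str) {
        self.jobs.lock().unwrap().push_back(job.to_string());
    }

    /// Removes and returns the oldest job, if any.
    fn take_next(&self) -> Option<String> {
        self.jobs.lock().unwrap().pop_front()
    }
}
```

A job enqueued through the original handle is visible to a clone that was moved into another thread or task, which is why the queue can be cloned before `tokio::spawn` without extra wrapping.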
### Retry Strategies
```rust
use runat::Retry;
use chrono::Duration;

// Create a recurring job that retries up to 3 times with a fixed 3-second delay
queue.enqueue(
    SendEmailJob {
        to: "user@example.com".to_string(),
        subject: "Welcome!".to_string(),
        body: "Thanks for signing up!".to_string(),
    }
    .job()?
    .set_max_attempts(3)
    .retry(Retry::fixed(Duration::seconds(3)))
    .cron("*/10 * * * * * *")?
).await?;
```
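
The example above uses a fixed delay, while the feature list also mentions exponential backoff. As a rough illustration of how the two differ, the sketch below computes the delay before each retry. It uses `std::time::Duration` to stay dependency-free, and `exponential_backoff` and `fixed_delay` are hypothetical helpers; RunAt's own backoff parameters and formula may differ.

```rust
use std::time::Duration;

/// Hypothetical helper: delay before retry number `attempt` (1-based).
/// The delay starts at `base`, doubles on each attempt, and is capped at `max`.
fn exponential_backoff(base: Duration, attempt: u32, max: Duration) -> Duration {
    // 2^(attempt - 1), saturating so large attempt counts cannot overflow.
    let factor = 2u32.saturating_pow(attempt.saturating_sub(1));
    // If the multiplication overflows, fall back to the cap.
    base.checked_mul(factor).map_or(max, |delay| delay.min(max))
}

/// Fixed strategy for comparison: the same delay on every attempt.
fn fixed_delay(delay: Duration, _attempt: u32) -> Duration {
    delay
}
```

With a 3-second base and a 10-second cap, the exponential helper yields 3 s, 6 s, and then 10 s for the first three attempts, whereas the fixed helper returns 3 s every time.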
### Running Tests
```bash
cargo test
```