underway 0.0.5

⏳ Durable step functions via Postgres
Documentation
<h1 align="center">
    underway
</h1>

<p align="center">
    ⏳ Durable step functions via Postgres.
</p>

<div align="center">
    <a href="https://crates.io/crates/underway">
        <img src="https://img.shields.io/crates/v/underway.svg" />
    </a>
    <a href="https://docs.rs/underway">
        <img src="https://docs.rs/underway/badge.svg" />
    </a>
    <a href="https://github.com/maxcountryman/underway/actions/workflows/rust.yml">
        <img src="https://github.com/maxcountryman/underway/actions/workflows/rust.yml/badge.svg" />
    </a>
</div>

## 🎨 Overview

**Underway** is a framework for building robust, asynchronous background
jobs in Rust, leveraging PostgreSQL as its queuing backend. It provides a
streamlined interface for defining jobs as a series of "steps," where each
step's output becomes the input for the next. This design enables the
construction of complex, durable, and resilient workflows with ease.

Key Features:

- **PostgreSQL-Backed** Leverages PostgreSQL with `FOR UPDATE SKIP LOCKED` for
  reliable task storage and coordination, ensuring efficient, safe
- **Atomic Task Management** Enqueue tasks within your transactions and use
  the worker's transaction within your tasks for atomic database
  queries--ensuring consisteny.
- **Automatic Retries** Configurable retry strategies ensure tasks are
  reliably completed, even after transient failures.
- **Cron-Like Scheduling** Schedule recurring tasks with cron-like
  expressions for automated, time-based job execution.
- **Scalable and Flexible** Easily scales from a single worker to many,
  enabling seamless background job processing with minimal setup.

## 🤸 Usage

Underway is suitable for many different use cases, ranging from simple
single-step jobs to more sophisticated multi-step jobs, where dependencies
are built up between steps.

## Welcome emails

A common use case is deferring work that can be processed later. For
instance, during user registration, we might want to send a welcome email to
new users. Rather than handling this within the registration process (e.g.,
form validation, database insertion), we can offload it to run "out-of-band"
using Underway. By defining a job for sending the welcome email, Underway
ensures it gets processed in the background, without slowing down the user
registration flow.

```rust
use std::env;

use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use underway::{Job, To};

// This is the input we'll provide to the job when we enqueue it.
#[derive(Deserialize, Serialize)]
struct WelcomeEmail {
    user_id: i32,
    email: String,
    name: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Set up the database connection pool.
    let database_url = &env::var("DATABASE_URL").expect("DATABASE_URL should be set");
    let pool = PgPool::connect(database_url).await?;

    // Run migrations.
    underway::MIGRATOR.run(&pool).await?;

    // Build the job.
    let job = Job::builder()
        .step(
            |_cx,
             WelcomeEmail {
                 user_id,
                 email,
                 name,
             }| async move {
                // Simulate sending an email.
                println!("Sending welcome email to {name} <{email}> (user_id: {user_id})");
                // Returning this indicates this is the final step.
                To::done()
            },
        )
        .name("welcome-email")
        .pool(pool)
        .build()
        .await?;

    // Here we enqueue a new job to be processed later.
    job.enqueue(WelcomeEmail {
        user_id: 42,
        email: "ferris@example.com".to_string(),
        name: "Ferris".to_string(),
    })
    .await?;

    // Start processing enqueued tasks.
    job.start().await??;

    Ok(())
}
```

## Order receipts

Another common use case is defining dependencies between discrete steps of a
job. For instance, we might generate PDF receipts for orders and then email
these to customers. With Underway, each step is handled separately, making
it easy to create a job that first generates the PDF and, once
completed, proceeds to send the email.

This separation provides significant value: if the email sending service
is temporarily unavailable, we can retry the email step without having to
regenerate the PDF, avoiding unnecessary repeated work.

```rust
use std::env;

use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use underway::{Job, To};

#[derive(Deserialize, Serialize)]
struct GenerateReceipt {
    // An order we want to generate a receipt for.
    order_id: i32,
}

#[derive(Deserialize, Serialize)]
struct EmailReceipt {
    // An object store key to our receipt PDF.
    receipt_key: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Set up the database connection pool.
    let database_url = &env::var("DATABASE_URL").expect("DATABASE_URL should be set");
    let pool = PgPool::connect(database_url).await?;

    // Run migrations.
    underway::MIGRATOR.run(&pool).await?;

    // Build the job.
    let job = Job::builder()
        .step(|_cx, GenerateReceipt { order_id }| async move {
            // Use the order ID to build a receipt PDF...
            let receipt_key = format!("receipts_bucket/{order_id}-receipt.pdf");
            // ...store the PDF in an object store.

            // We proceed to the next step with the receipt_key as its input.
            To::next(EmailReceipt { receipt_key })
        })
        .step(|_cx, EmailReceipt { receipt_key }| async move {
            // Retrieve the PDF from the object store, and send the email.
            println!("Emailing receipt for {receipt_key}");
            To::done()
        })
        .name("order-receipt")
        .pool(pool)
        .build()
        .await?;

    // Enqueue the job for the given order.
    job.enqueue(GenerateReceipt { order_id: 42 }).await?;

    // Start processing enqueued jobs.
    job.start().await??;

    Ok(())
}
```

With this setup, if the email service is down, the `EmailReceipt` step can
be retried without redoing the PDF generation, saving time and resources by
not repeating the expensive step of generating the PDF.

## Daily reports

Jobs may also be run on a schedule. This makes them useful for situations
where we want to do things on a regular cadence, such as creating a daily
business report.

```rust
use std::env;

use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use underway::{Job, To};

#[derive(Deserialize, Serialize)]
struct DailyReport;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Set up the database connection pool.
    let database_url = &env::var("DATABASE_URL").expect("DATABASE_URL should be set");
    let pool = PgPool::connect(database_url).await?;

    // Run migrations.
    underway::MIGRATOR.run(&pool).await?;

    // Build the job.
    let job = Job::builder()
        .step(|_cx, _| async move {
            // Here we would generate and store the report.
            To::done()
        })
        .name("daily-report")
        .pool(pool)
        .build()
        .await?;

    // Set a daily schedule with the given input.
    let daily = "@daily[America/Los_Angeles]".parse()?;
    job.schedule(daily, DailyReport).await?;

    // Start processing enqueued jobs.
    job.start().await??;

    Ok(())
}
```

## 🛟 Getting Help

We've put together a number of [examples][examples] to help get you started. You're also welcome to [open a discussion](https://github.com/maxcountryman/underway/discussions/new?category=q-a) and ask additional questions you might have.

## 👯 Contributing

We appreciate all kinds of contributions, thank you!

[examples]: https://github.com/maxcountryman/underway/tree/main/examples
[docs]: https://docs.rs/underway