Crate apalis_postgres

Crate apalis_postgres 

Source
Expand description

§apalis-postgres

Background task processing in rust using apalis and postgres

§Features

  • Reliable job queue using Postgres as the backend.
  • Multiple storage types: standard polling and trigger based storages.
  • Custom codecs for serializing/deserializing job arguments as bytes.
  • Heartbeat and orphaned job re-enqueueing for robust task processing.
  • Integration with apalis workers and middleware.

§Storage Types

The naming is designed to clearly indicate the storage mechanism and its capabilities, but under the hood the result is the PostgresStorage struct with different configurations.

§Examples

§Basic Worker Example


#[tokio::main]
async fn main() {
    let pool = PgPool::connect(env!("DATABASE_URL")).await.unwrap();
    PostgresStorage::setup(&pool).await.unwrap();
    let mut backend = PostgresStorage::new_with_config(&pool, &Config::new("int-queue"));

    let mut start = 0;
    let mut items = stream::repeat_with(move || {
        start += 1;
        let task = Task::builder(serde_json::to_vec(&start).unwrap())
            .run_after(Duration::from_secs(1))
            .with_ctx(SqlContext::new().with_priority(1))
            .build();
        Ok(task)
    })
    .take(10);
    backend.send_all(&mut items).await.unwrap();

    async fn send_reminder(item: usize, wrk: WorkerContext) -> Result<(), BoxDynError> {
        Ok(())
    }

    let worker = WorkerBuilder::new("worker-1")
        .backend(backend)
        .build(send_reminder);
    worker.run().await.unwrap();
}

§NOTIFY listener example


#[tokio::main]
async fn main() {
    let pool = PgPool::connect(env!("DATABASE_URL")).await.unwrap();
    PostgresStorage::setup(&pool).await.unwrap();

    let lazy_strategy = StrategyBuilder::new()
        .apply(IntervalStrategy::new(Duration::from_secs(5)))
        .build();
    let config = Config::new("queue")
        .with_poll_interval(lazy_strategy)
        .set_buffer_size(5);
    let backend = PostgresStorage::new_with_notify(&pool, &config);

    tokio::spawn({
        let pool = pool.clone();
        let config = config.clone();
        async move {
            tokio::time::sleep(Duration::from_secs(2)).await;
            let mut start = 0;
            let items = stream::repeat_with(move || {
                start += 1;
                Task::builder(serde_json::to_vec(&start).unwrap())
                    .run_after(Duration::from_secs(1))
                    .with_ctx(SqlContext::new().with_priority(start))
                    .build()
            })
            .take(20)
            .collect::<Vec<_>>()
            .await;
            apalis_postgres::sink::push_tasks(pool, config, items).await.unwrap();
        }
    });

    async fn send_reminder(task: usize) -> Result<(), BoxDynError> {
        Ok(())
    }

    let worker = WorkerBuilder::new("worker-2")
        .backend(backend)
        .build(send_reminder);
    worker.run().await.unwrap();
}

§Workflow Example


#[tokio::main]
async fn main() {
    let workflow = WorkFlow::new("odd-numbers-workflow")
        .then(|a: usize| async move {
            Ok::<_, WorkflowError>((0..=a).collect::<Vec<_>>())
        })
        .filter_map(|x| async move {
            if x % 2 != 0 { Some(x) } else { None }
        })
        .filter_map(|x| async move {
            if x % 3 != 0 { Some(x) } else { None }
        })
        .filter_map(|x| async move {
            if x % 5 != 0 { Some(x) } else { None }
        })
        .delay_for(Duration::from_millis(1000))
        .then(|a: Vec<usize>| async move {
            println!("Sum: {}", a.iter().sum::<usize>());
            Ok::<(), WorkflowError>(())
        });

    let pool = PgPool::connect(env!("DATABASE_URL")).await.unwrap();
    PostgresStorage::setup(&pool).await.unwrap();
    let mut backend = PostgresStorage::new_with_config(&pool, &Config::new("workflow-queue"));

    backend.push(100usize).await.unwrap();

    let worker = WorkerBuilder::new("rango-tango")
        .backend(backend)
        .on_event(|ctx, ev| {
            println!("On Event = {:?}", ev);
            if matches!(ev, Event::Error(_)) {
                ctx.stop().unwrap();
            }
        })
        .build(workflow);

    worker.run().await.unwrap();
}

§Observability

You can track your jobs using apalis-board. Task

§License

Licensed under either of Apache License, Version 2.0 or MIT license at your option.

Modules§

config
context
shared
sink

Structs§

InsertEvent
PgConnectOptions
Options and flags which can be used to configure a PostgreSQL connection.
PgListener
A stream of asynchronous notifications from Postgres.
PgNotify
Postgres
PostgreSQL database driver.
PostgresStorage

Type Aliases§

CompactType
PgPool
An alias for Pool, specialized for Postgres.
PgTask