Crate apalis_sqlite

Crate apalis_sqlite 

Source
Expand description

§apalis-sqlite

Background task processing for rust using apalis and sqlite.

§Features

  • Reliable job queue using SQLite as the backend.
  • Multiple storage types: standard polling and event-driven (hooked) storage.
  • Custom codecs for serializing/deserializing job arguments.
  • Heartbeat and orphaned job re-enqueueing for robust job processing.
  • Integration with apalis workers and middleware.

§Storage Types

The naming is designed to clearly indicate the storage mechanism and its capabilities, but under the hood its the result is the SqliteStorage struct with different configurations.

§Examples

§Basic Worker Example

#[tokio::main]
async fn main() {
    let pool = SqlitePool::connect(":memory:").await.unwrap();
    SqliteStorage::setup(&pool).await.unwrap();
    let mut backend = SqliteStorage::new(&pool);

    let mut start = 0usize;
    let mut items = stream::repeat_with(move || {
        start += 1;
        start
    })
    .take(10);
    backend.push_stream(&mut items).await.unwrap();

    async fn send_reminder(item: usize, wrk: WorkerContext) -> Result<(), BoxDynError> {
        if item == 10 {
            wrk.stop().unwrap();
        }
        Ok(())
    }

    let worker = WorkerBuilder::new("worker-1")
        .backend(backend)
        .build(send_reminder);
    worker.run().await.unwrap();
}

§Hooked Worker Example (Event-driven)


#[tokio::main]
async fn main() {
    let pool = SqlitePool::connect(":memory:").await.unwrap();
    SqliteStorage::setup(&pool).await.unwrap();

    let lazy_strategy = StrategyBuilder::new()
        .apply(IntervalStrategy::new(Duration::from_secs(5)))
        .build();
    let config = Config::new("queue")
        .with_poll_interval(lazy_strategy)
        .set_buffer_size(5);
    let backend = SqliteStorage::new_with_callback(&pool, &config);

    tokio::spawn({
        let pool = pool.clone();
        let config = config.clone();
        async move {
            tokio::time::sleep(Duration::from_secs(2)).await;
            let mut start = 0;
            let items = stream::repeat_with(move || {
                start += 1;
                Task::builder(serde_json::to_string(&start).unwrap())
                    .run_after(Duration::from_secs(1))
                    .with_ctx(SqlContext::new().with_priority(start))
                    .build()
            })
            .take(20)
            .collect::<Vec<_>>()
            .await;
            // push encoded tasks
            apalis_sqlite::sink::push_tasks(pool, config, items).await.unwrap();
        }
    });

    async fn send_reminder(item: usize, wrk: WorkerContext) -> Result<(), BoxDynError> {
        if item == 1 {
            apalis_core::timer::sleep(Duration::from_secs(1)).await;
            wrk.stop().unwrap();
        }
        Ok(())
    }

    let worker = WorkerBuilder::new("worker-2")
        .backend(backend)
        .build(send_reminder);
    worker.run().await.unwrap();
}

§Workflow Example

#[tokio::main]
async fn main() {
    let workflow = WorkFlow::new("odd-numbers-workflow")
        .then(|a: usize| async move {
            Ok::<_, WorkflowError>((0..=a).collect::<Vec<_>>())
        })
        .filter_map(|x| async move {
            if x % 2 != 0 { Some(x) } else { None }
        })
        .filter_map(|x| async move {
            if x % 3 != 0 { Some(x) } else { None }
        })
        .filter_map(|x| async move {
            if x % 5 != 0 { Some(x) } else { None }
        })
        .delay_for(Duration::from_millis(1000))
        .then(|a: Vec<usize>| async move {
            println!("Sum: {}", a.iter().sum::<usize>());
            Ok::<(), WorkflowError>(())
        });

    let pool = SqlitePool::connect(":memory:").await.unwrap();
    SqliteStorage::setup(&pool).await.unwrap();
    let mut sqlite = SqliteStorage::new_in_queue(&pool, "test-workflow");

    sqlite.push(100usize).await.unwrap();

    let worker = WorkerBuilder::new("rango-tango")
        .backend(sqlite)
        .on_event(|ctx, ev| {
            println!("On Event = {:?}", ev);
            if matches!(ev, Event::Error(_)) {
                ctx.stop().unwrap();
            }
        })
        .build(workflow);

    worker.run().await.unwrap();
}

§Migrations

If the migrate feature is enabled, you can run built-in migrations with:

let pool = SqlitePool::connect(":memory:").await.unwrap();
SqliteStorage::setup(&pool).await.unwrap();

§License

Licensed under either of Apache License, Version 2.0 or MIT license at your option.

Modules§

fetcher
queries
sink

Structs§

CallbackListener
Config
DbEvent
SharedSqliteStorage
SqliteStorage

Enums§

SharedPostgresError

Type Aliases§

CompactType
SqlitePool
An alias for Pool, specialized for SQLite.
SqliteTask