Crate apalis_mysql

Crate apalis_mysql 

Source
Expand description

§apalis-mysql

Background task processing in rust using apalis and mysql

§Features

  • Reliable job queue using MySql as the backend.
  • Multiple storage types: standard polling and trigger based storages.
  • Custom codecs for serializing/deserializing job arguments as bytes.
  • Heartbeat and orphaned job re-enqueueing for robust task processing.
  • Integration with apalis workers and middleware.
  • Observability: Monitor and manage tasks using apalis-board.

§Storage Types

The naming is designed to clearly indicate the storage mechanism and its capabilities, but under the hood the result is the MySqlStorage struct with different configurations.

§Setting up

You need a working MySql server setup. Here is a quick command using docker.

docker run -d --name test-mysql -e MYSQL_ROOT_PASSWORD=strong_password -p 3306:3306 mysql

§Examples

§Basic Worker Example

use apalis_mysql::*;
use futures::stream::{self, StreamExt};
use apalis::prelude::*;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let pool = MySqlPool::connect(env!("DATABASE_URL")).await.unwrap();
    MySqlStorage::setup(&pool).await.unwrap();
    let mut backend = MySqlStorage::new(&pool);

    let mut start = 0;
    let mut items = stream::repeat_with(move || {
        start += 1;
        let task = Task::builder(start)
            .run_after(Duration::from_secs(1))
            .priority(1)
            .build();
        task
    })
    .take(10);
    backend.push_all(&mut items).await.unwrap();

    async fn send_reminder(item: usize, wrk: WorkerContext) -> Result<(), BoxDynError> {
        Ok(())
    }

    let worker = WorkerBuilder::new("worker-1")
        .backend(backend)
        .build(send_reminder);
    worker.run().await.unwrap();
}

§Workflow Example

use apalis_mysql::*;
use apalis::prelude::*;
use std::time::Duration;
use apalis_workflow::*;

#[tokio::main]
async fn main() {
    let workflow = Workflow::new("odd-numbers-workflow")
        .and_then(|a: usize| async move {
            Ok::<_, BoxDynError>((0..=a).collect::<Vec<_>>())
        })
        .filter_map(|x| async move {
            if x % 2 != 0 { Some(x) } else { None }
        })
        .filter_map(|x| async move {
            if x % 3 != 0 { Some(x) } else { None }
        })
        .filter_map(|x| async move {
            if x % 5 != 0 { Some(x) } else { None }
        })
        .delay_for(Duration::from_millis(1000))
        .and_then(|a: Vec<usize>| async move {
            println!("Sum: {}", a.iter().sum::<usize>());
            Ok::<(), BoxDynError>(())
        });

    let pool = MySqlPool::connect(env!("DATABASE_URL")).await.unwrap();
    MySqlStorage::setup(&pool).await.unwrap();
    let mut backend = MySqlStorage::new_in_queue(&pool, "test-workflow");

    backend.push_start(100usize).await.unwrap();

    let worker = WorkerBuilder::new("rango-tango")
        .backend(backend)
        .on_event(|ctx, ev| {
            println!("On Event = {:?}", ev);
            if matches!(ev, Event::Error(_)) {
                ctx.stop().unwrap();
            }
        })
        .build(workflow);

    worker.run().await.unwrap();
}

§Shared Example

This shows an example of multiple backends using the same connection. This can improve performance if you have many types of jobs.

use apalis_mysql::*;
use futures::stream;
use apalis::prelude::*;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let pool = MySqlPool::connect(env!("DATABASE_URL"))
        .await
        .unwrap();
    let mut store = SharedMySqlStorage::new(pool);

    let mut map_store = store.make_shared().unwrap();

    let mut int_store = store.make_shared().unwrap();

    map_store
        .push_stream(&mut stream::iter(vec![HashMap::<String, String>::new()]))
        .await
        .unwrap();
    int_store.push(99).await.unwrap();

    async fn send_reminder<T, I>(
        _: T,
        task_id: TaskId<I>,
        wrk: WorkerContext,
    ) -> Result<(), BoxDynError> {
        tokio::time::sleep(Duration::from_secs(2)).await;
        wrk.stop().unwrap();
        Ok(())
    }

    let int_worker = WorkerBuilder::new("rango-tango-2")
        .backend(int_store)
        .build(send_reminder);
    let map_worker = WorkerBuilder::new("rango-tango-1")
        .backend(map_store)
        .build(send_reminder);
    tokio::try_join!(int_worker.run(), map_worker.run()).unwrap();
}

§Observability

You can track your jobs using apalis-board. Task

§License

Licensed under either of Apache License, Version 2.0 or MIT license at your option.

Modules§

fetcher
Fetcher module for retrieving tasks from mysql backend
queries
Queries module for mysql backend Module for various query implementations for the SQLite backend.
sink
Sink module for pushing tasks to mysql backend

Structs§

Config
Configuration for the SQL backend
MySql
MySQL database driver.
MySqlConnectOptions
Options and flags which can be used to configure a MySQL connection.
MySqlConnection
A connection to a MySQL database.
MySqlStorage
MySqlStorage is a storage backend for apalis using mysql as the database.
Pool
An asynchronous pool of SQLx database connections.
PoolConnection
A connection managed by a Pool.
PoolOptions
Configuration options for Pool.
SharedMySqlStorage
Shared MySql storage backend that can be used across multiple workers

Enums§

SharedMySqlError
Errors that can occur when creating a shared MySql storage backend
SqlxError
Represents all the ways a method can fail within SQLx.

Traits§

Connection
Represents a single database connection.
TaskBuilderExt
Extension traits for TaskBuilder

Type Aliases§

CompactType
CompactType is the type used for compact serialization in mysql backend
MySqlContext
MySqlPool
An alias for Pool, specialized for MySQL.
MySqlTask
Type alias for a task stored in mysql backend
MySqlTaskId