Expand description
§apalis-redis
Background task processing for rust using apalis and redis
§Features
- Reliable task queue using any
rediscompatible service as the backend. - Multiple storage types: standard polling and
pubsubbased approaches. - Customizable codecs for serializing/deserializing task arguments such as
json,msgpackandbincode. - Heartbeat and orphaned tasks re-enqueueing for consistent task processing.
- Integration with
apalisworkers and middleware such asretry,long_runningandparallelize - Shared storage for multiple task types using the same
redisconnection. - Observability: Monitor and manage tasks using apalis-board.
§Usage
Add the latest versions from crates.io:
apalis = { version = "1", features = ["retry"] }
apalis-redis = { version = "1" }§Example
use apalis::prelude::*;
use apalis_redis::{RedisStorage, RedisConfig as Config};
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize)]
struct Email {
to: String,
}
async fn send_email(task: Email) -> Result<(), BoxDynError> {
Ok(())
}
#[tokio::main]
async fn main() {
let redis_url = env::var("REDIS_URL").expect("REDIS_URL must be set");
let conn = apalis_redis::connect(redis_url).await.expect("Could not connect");
let storage = RedisStorage::new(conn);
let worker = WorkerBuilder::new("tasty-pear")
.backend(storage)
.build(send_email);
worker.run().await;
}§Shared Example
This shows an example of multiple backends using the same connection. This can improve performance if you have many task types.
use apalis::prelude::*;
use apalis_redis::{RedisStorage, RedisConfig as Config, shared::SharedRedisStorage, Client};
use std::collections::HashMap;
use futures::stream;
use std::time::Duration;
use std::env;
#[tokio::main]
async fn main() {
let client = Client::open(env::var("REDIS_URL").unwrap()).unwrap();
let mut store = SharedRedisStorage::new(client).await.expect("Could not create shared");
let mut map_store = store.make_shared().unwrap();
let mut int_store = store.make_shared().unwrap();
map_store
.push_stream(&mut stream::iter(vec![HashMap::<String, String>::new()]))
.await
.unwrap();
int_store.push(99).await.unwrap();
async fn send_reminder<T, I>(
_: T,
task_id: TaskId<I>,
wrk: WorkerContext,
) -> Result<(), BoxDynError> {
tokio::time::sleep(Duration::from_secs(2)).await;
wrk.stop().unwrap();
Ok(())
}
let int_worker = WorkerBuilder::new("rango-tango-2")
.backend(int_store)
.build(send_reminder);
let map_worker = WorkerBuilder::new("rango-tango-1")
.backend(map_store)
.build(send_reminder);
tokio::try_join!(int_worker.run(), map_worker.run()).unwrap();
}§Workflow Example
use apalis::prelude::*;
use apalis_redis::{RedisStorage, RedisConfig as Config};
use apalis_workflow::WorkFlow;
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize)]
struct Data {
value: u32,
}
async fn task1(task: u32) -> Result<Data, BoxDynError> {
Ok(Data { value: task + 1 })
}
async fn task2(task: Data) -> Result<Data, BoxDynError> {
Ok(Data { value: task.value * 2 })
}
async fn task3(task: Data) -> Result<(), BoxDynError> {
println!("Final value: {}", task.value);
Ok(())
}
#[tokio::main]
async fn main() {
let redis_url = env::var("REDIS_URL").expect("REDIS_URL must be set");
let conn = apalis_redis::connect(redis_url).await.expect("Could not connect");
let storage = RedisStorage::new(conn);
let work_flow = WorkFlow::new("sample-workflow")
.then(task1)
.then(task2)
.then(task3);
let worker = WorkerBuilder::new("tasty-carrot")
.backend(storage)
.build(work_flow);
worker.run().await;
}§Observability
You can track your tasks using apalis-board.

Re-exports§
pub use crate::sink::RedisSink;
Modules§
Structs§
- Client
- The client type.
- Compact
Task - A task structure that includes metadata.
- Connection
Manager - A
ConnectionManageris a proxy that wraps a multiplexed connection and automatically reconnects to the server when necessary. - Redis
Ack - A Redis acknowledgment Layer
- Redis
Config - Config for a
RedisStorage - Redis
Context - The context for a redis storage job
- Redis
Error - Represents a redis error.
- Redis
Storage - Represents a Backend that uses Redis for storage.
Traits§
- Into
Connection Info - Converts an object into a connection info struct. This allows the constructor of the client to accept connection information in a range of different formats.
Functions§
- connect
- Shorthand to create a client and connect
- deserialize_
with_ meta - Deserializes task data and metadata from Redis values.
- parse_
u32 - Parses a u32 from a redis::Value
- str_
from_ val - Extracts a &str from a redis::Value, returning an error if the value is not a bulk string.
Type Aliases§
- Redis
Task - A Redis task type alias