§flowd
A Redis Streams-backed task queue with automatic struct ↔ redis::Value
mapping via a derive macro.
§Features
- Job derive macro: generates bidirectional conversions between Rust structs and Vec<(String, redis::Value)> pairs for zero-copy Redis integration.
- Consumer queue: reads from a Redis Stream consumer group with semaphore-based concurrency control and graceful shutdown.
- Claimer: reclaims stuck messages via XAUTOCLAIM, tracks delivery counts, and routes exhausted messages to a dead-letter callback.
- Runtime-agnostic: works with either tokio or smol (mutually exclusive feature flags).
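To make the struct ↔ pairs mapping concrete, here is a hand-written sketch of the kind of conversion a derive like Job could generate. It is not flowd's generated code: the `Value` enum is a two-variant stand-in for `redis::Value` so the example compiles without the `redis` crate, and the method names `to_pairs` and `from_pairs` are hypothetical. The sketch also clones field data for simplicity, so it says nothing about the zero-copy behaviour mentioned above.

```rust
use std::collections::HashMap;

/// Stand-in for `redis::Value` (assumption: reduced to two variants so the
/// sketch builds with std only). Not the real type.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    BulkString(Vec<u8>),
    Int(i64),
}

#[derive(Debug, Clone, PartialEq)]
struct Email {
    to: String,
    subject: String,
}

impl Email {
    /// Struct -> (field name, value) pairs, in declaration order.
    /// Hypothetical method name; the real macro may expose a different API.
    fn to_pairs(&self) -> Vec<(String, Value)> {
        vec![
            ("to".to_string(), Value::BulkString(self.to.clone().into_bytes())),
            ("subject".to_string(), Value::BulkString(self.subject.clone().into_bytes())),
        ]
    }

    /// Pairs -> struct. Fails on a missing field, a non-string value,
    /// or bytes that are not valid UTF-8.
    fn from_pairs(pairs: Vec<(String, Value)>) -> Result<Self, String> {
        let mut fields: HashMap<String, Value> = pairs.into_iter().collect();
        let mut take_string = |key: &str| -> Result<String, String> {
            match fields.remove(key) {
                Some(Value::BulkString(bytes)) => {
                    String::from_utf8(bytes).map_err(|e| format!("field `{key}`: {e}"))
                }
                Some(other) => Err(format!("field `{key}`: expected a string, got {other:?}")),
                None => Err(format!("missing field `{key}`")),
            }
        };
        Ok(Email {
            to: take_string("to")?,
            subject: take_string("subject")?,
        })
    }
}
```

Calling `Email::from_pairs(email.to_pairs())` should give back a value equal to `email`, which is the property a round-trip test checks.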
§Quick start
use flowd::prelude::*;
use flowd::task::{Queue, QueueBuilder};

#[derive(Debug, Job)]
struct Email {
    to: String,
    subject: String,
}

let client = redis::Client::open("redis://127.0.0.1:6379")?;
let conn = client.get_multiplexed_async_connection().await?;
let read_conn = client.get_multiplexed_async_connection().await?;

let queue = Queue::new(
    QueueBuilder::new(
        "emails", "senders", "sender-1",
        |email: Email| async move {
            println!("sending to {}", email.to);
            Ok::<(), String>(())
        },
        conn,
        read_conn,
    )
    .block_timeout(5000)
    .max_concurrent_tasks(10),
);

queue.init(None).await?;
let handle = queue.run();

// ... later ...
handle.shutdown().await;

§Runtime selection
Enable exactly one of:
- tokio: uses tokio::sync::Semaphore, tokio::task::JoinSet
- smol: uses mea::semaphore::Semaphore, FuturesUnordered
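Under either runtime the semaphore serves the same purpose: it caps how many handlers run at once, and shutdown waits for the in-flight ones to finish. The sketch below shows that idea with the standard library only, using OS threads and a small Mutex + Condvar semaphore in place of the async primitives listed above. The `Semaphore` type and the `run_bounded` function are hypothetical illustrations, not flowd's implementation.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Minimal counting semaphore (assumption: a std-only stand-in for the
/// async semaphores the crate uses).
struct Semaphore {
    permits: Mutex<usize>,
    available: Condvar,
}

impl Semaphore {
    fn new(permits: usize) -> Self {
        Self { permits: Mutex::new(permits), available: Condvar::new() }
    }

    /// Block until a permit is free, then take it.
    fn acquire(&self) {
        let mut permits = self.permits.lock().unwrap();
        while *permits == 0 {
            permits = self.available.wait(permits).unwrap();
        }
        *permits -= 1;
    }

    /// Return a permit and wake one waiter.
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.available.notify_one();
    }
}

/// Run `jobs` handlers with at most `max_concurrent` in flight and return
/// the peak number of handlers observed running at the same time.
fn run_bounded(jobs: usize, max_concurrent: usize) -> usize {
    let sem = Arc::new(Semaphore::new(max_concurrent));
    let running = Arc::new(Mutex::new((0usize, 0usize))); // (current, peak)
    let mut handles = Vec::with_capacity(jobs);

    for _ in 0..jobs {
        sem.acquire(); // wait for a free slot before starting a handler
        let sem = Arc::clone(&sem);
        let running = Arc::clone(&running);
        handles.push(thread::spawn(move || {
            {
                let mut r = running.lock().unwrap();
                r.0 += 1;
                r.1 = r.1.max(r.0);
            }
            thread::sleep(Duration::from_millis(5)); // stand-in for handler work
            running.lock().unwrap().0 -= 1;
            sem.release(); // free the slot only after the work is done
        }));
    }

    // Graceful shutdown: no new work is accepted, in-flight handlers drain.
    for handle in handles {
        handle.join().unwrap();
    }
    let peak = running.lock().unwrap().1;
    peak
}
```

Because a permit is taken before a handler starts and returned only after it finishes, `run_bounded(8, 3)` never reports a peak above 3.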
§Re-exports
pub use crate::job::Job;
§Modules
- job
- prelude: Convenience re-exports for common usage.
- task: Redis Streams consumer queue with concurrency control, graceful shutdown, and dead-letter handling.
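The dead-letter handling mentioned for the task module comes down to one decision per reclaimed message: retry it, or give up and hand it to a callback. The following sketch illustrates that decision under stated assumptions. The names `Disposition`, `classify`, and `route`, and the rule "dead-letter once the delivery count exceeds a maximum", are hypothetical and may not match how flowd's claimer is configured.

```rust
/// What to do with a pending message after it has been reclaimed.
#[derive(Debug, PartialEq)]
enum Disposition {
    Retry,
    DeadLetter,
}

/// Assumed policy: a message is exhausted once it has been delivered
/// more than `max_deliveries` times.
fn classify(delivery_count: u64, max_deliveries: u64) -> Disposition {
    if delivery_count > max_deliveries {
        Disposition::DeadLetter
    } else {
        Disposition::Retry
    }
}

/// Walk reclaimed `(message id, delivery count)` entries. Exhausted ids are
/// passed to `on_dead`; the ids that should be retried are returned.
fn route<F: FnMut(&str)>(
    reclaimed: &[(&str, u64)],
    max_deliveries: u64,
    mut on_dead: F,
) -> Vec<String> {
    let mut retry = Vec::new();
    for &(id, count) in reclaimed {
        match classify(count, max_deliveries) {
            Disposition::Retry => retry.push(id.to_string()),
            Disposition::DeadLetter => on_dead(id),
        }
    }
    retry
}
```

In a real claimer the entries would come from Redis (XAUTOCLAIM returns the reclaimed messages, and the pending-entries list tracks how often each was delivered); here they are passed in as a plain slice so the routing logic can be tested in isolation.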
§Functions
- debug_map: Pretty-print every field of a Job value.
- round_trip: Round-trip a value through pairs; useful for testing your mapper.