Crate flowd

§flowd

A Redis Streams-backed task queue with automatic struct ↔ redis::Value mapping via a derive macro.

§Features

  • Job derive macro — generates bidirectional conversions between Rust structs and Vec<(String, redis::Value)> pairs for zero-copy Redis integration.
  • Consumer queue — reads from a Redis Stream consumer group with semaphore-based concurrency control and graceful shutdown.
  • Claimer — reclaims stuck messages via XAUTOCLAIM, tracks delivery counts, and routes exhausted messages to a dead-letter callback.
  • Runtime-agnostic — works with either tokio or smol (mutually exclusive feature flags).
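To make the struct ↔ pairs mapping concrete, here is a hand-written sketch of the kind of conversion a derive like Job could generate for a two-field struct. It is not flowd's generated code: the `Value` enum is a local stand-in for redis::Value so the example compiles without the redis crate, and `email_to_pairs` and `email_from_pairs` are hypothetical names chosen for illustration.

```rust
// Stand-in for redis::Value (assumption) so the sketch needs no external crate.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    BulkString(Vec<u8>),
}

#[derive(Debug, Clone, PartialEq)]
struct Email {
    to: String,
    subject: String,
}

// Hypothetical struct -> pairs direction: one (field name, value) pair per field.
fn email_to_pairs(email: &Email) -> Vec<(String, Value)> {
    vec![
        ("to".to_string(), Value::BulkString(email.to.clone().into_bytes())),
        (
            "subject".to_string(),
            Value::BulkString(email.subject.clone().into_bytes()),
        ),
    ]
}

// Hypothetical pairs -> struct direction: fails on a missing or non-UTF-8 field.
fn email_from_pairs(pairs: &[(String, Value)]) -> Result<Email, String> {
    // Look up a field by name and decode its bytes as UTF-8.
    let field = |name: &str| -> Result<String, String> {
        let value = pairs
            .iter()
            .find(|(key, _)| key.as_str() == name)
            .map(|(_, value)| value)
            .ok_or_else(|| format!("missing field `{name}`"))?;
        match value {
            Value::BulkString(bytes) => String::from_utf8(bytes.clone())
                .map_err(|e| format!("field `{name}` is not valid UTF-8: {e}")),
        }
    };
    Ok(Email {
        to: field("to")?,
        subject: field("subject")?,
    })
}
```

Calling `email_from_pairs(&email_to_pairs(&email))` on any `Email` should return `Ok` with an equal value, which is the property a round-trip test checks.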

§Quick start

use flowd::prelude::*;
use flowd::task::{Queue, QueueBuilder};

#[derive(Debug, Job)]
struct Email {
    to: String,
    subject: String,
}

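// Two connections: `read_conn` is passed separately, presumably so blocking
// stream reads do not stall other commands on the multiplexed connection.
let client    = redis::Client::open("redis://127.0.0.1:6379")?;
let conn      = client.get_multiplexed_async_connection().await?;
let read_conn = client.get_multiplexed_async_connection().await?;

let queue = Queue::new(
    QueueBuilder::new(
        // Presumably the stream key, consumer group, and consumer name.
        "emails", "senders", "sender-1",
        // Handler invoked for each message decoded into `Email`.
        |email: Email| async move {
            println!("sending to {}", email.to);
            Ok::<(), String>(())
        },
        conn,
        read_conn,
    )
    .block_timeout(5000) // likely milliseconds, as with XREADGROUP BLOCK
    .max_concurrent_tasks(10),
);

queue.init(None).await?;
let handle = queue.run();
// ... later ...
handle.shutdown().await; // graceful shutdown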

§Runtime selection

Enable exactly one of:

  • tokio — uses tokio::sync::Semaphore, tokio::task::JoinSet
  • smol — uses mea::semaphore::Semaphore, FuturesUnordered
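Mutually exclusive Cargo features are commonly enforced with a compile-time check. The sketch below shows that general pattern under the assumption that the features are named `tokio` and `smol` as listed above. It is not taken from flowd's source, and `selected_runtime` is a hypothetical helper added only to make the selection observable.

```rust
// Assumed pattern: reject builds that enable both runtime features at once.
#[cfg(all(feature = "tokio", feature = "smol"))]
compile_error!("features `tokio` and `smol` are mutually exclusive; enable exactly one");

/// Hypothetical helper: name of the runtime feature selected at compile time.
pub const fn selected_runtime() -> Option<&'static str> {
    if cfg!(feature = "tokio") {
        Some("tokio")
    } else if cfg!(feature = "smol") {
        Some("smol")
    } else {
        // Neither feature is enabled, e.g. when this sketch is compiled standalone.
        None
    }
}
```

A crate that requires exactly one runtime would typically add a second `compile_error!` guarded by `#[cfg(not(any(feature = "tokio", feature = "smol")))]`. It is left out here so the sketch still compiles on its own.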

Re-exports§

pub use crate::job::Job;

Modules§

job
prelude
Convenience re-exports for common usage.
task
Redis Streams consumer queue with concurrency control, graceful shutdown, and dead-letter handling.
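The dead-letter handling mentioned here can be pictured as a routing step over reclaimed messages: entries whose delivery count has reached a limit go to a callback, and the rest are retried. The sketch below is an illustration only. `Pending`, `route_reclaimed`, and the `>=` threshold rule are assumptions, not flowd's actual types or policy.

```rust
/// Hypothetical view of one reclaimed stream entry.
#[derive(Debug, Clone, PartialEq)]
struct Pending {
    /// Stream entry ID, e.g. "1700000000000-0".
    id: String,
    /// How many times the entry has been delivered so far.
    deliveries: u32,
}

/// Sends exhausted entries to `on_dead_letter` and returns the entries to retry.
/// Assumption: an entry is exhausted once `deliveries >= max_deliveries`.
fn route_reclaimed<F>(
    entries: Vec<Pending>,
    max_deliveries: u32,
    mut on_dead_letter: F,
) -> Vec<Pending>
where
    F: FnMut(Pending),
{
    let mut retry = Vec::new();
    for entry in entries {
        if entry.deliveries >= max_deliveries {
            on_dead_letter(entry);
        } else {
            retry.push(entry);
        }
    }
    retry
}
```

In a real consumer the delivery counts would come from Redis (XAUTOCLAIM together with the pending-entries list) rather than from an in-memory vector.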

Functions§

debug_map
Pretty-print every field of a Job value.
round_trip
Round-trip a value through pairs — useful for testing your mapper.