msgq 0.1.5

Robust Redis Stream based message queue with auto-claim and retry handling / 基于 Redis Stream 的健壮消息队列,支持自动认领和重试处理
Documentation

English | 中文


msgq : Robust Redis Stream Message Queue

Robust Redis Stream based message queue with auto-claim and retry handling.

Table of Contents

Features

  • Redis Stream Based: Utilizes XREADGROUP for efficient message consumption.
  • Consumer Groups: Supports scalable message processing with consumer groups.
  • Auto Group Creation: Automatically creates consumer groups if they don't exist.
  • Reliability: Built-in BLOCK and CLAIM mechanisms for handling idle messages.
  • Concurrency: Processes messages concurrently using tokio::spawn.
  • Error Handling: Automatic retry mechanism (configurable limit) with error callbacks.

Usage

use std::sync::Arc;
use aok::{OK, Void};
use log::info;
use msgq::{Kv, ReadGroup};

async fn on_error(mail: Arc<Kv>, error: String) -> Void {
  info!("on_error {error} {:?}", mail);
  OK
}

async fn print_mail(mail: Arc<Kv>) -> Void {
  info!("print_mail {:?}", mail);
  OK
}

#[tokio::main]
async fn main() -> Void {
  // Initialize Redis connection (example)
  // xboot::init().await?;

  // Create a ReadGroup
  // stream: "smtp", group: "send", consumer: "test"
  // block: 1s, claim_idle: 3s, count: 2, max_retry: 3
  let read_group = ReadGroup::new("smtp", "send", "test", 1, 3, 2, 3);

  // Run the consumer loop
  read_group.run(print_mail, on_error).await?;
  OK
}

Design

The ReadGroup::run method executes a continuous loop that ensures robust message processing:

  1. Command Construction: Prepares an XREADGROUP command with BLOCK (for waiting on new messages) and CLAIM (for recovering idle messages from other consumers) options.
  2. Message Fetching: Executes the command against Redis to retrieve a batch of messages.
  3. Group Management: Checks if the consumer group exists; if not, auto_new is called to create it automatically.
  4. Concurrent Processing: Iterates through the fetched messages and spawns a tokio task for each message to execute the user-provided parse callback.
  5. Error Handling & Retry:
    • Monitors the execution of each task.
    • If a task fails, the error is logged.
    • The system tracks retries; if a message fails more than max_retry times, it is handed off to the on_error callback for manual intervention or logging.
  6. Cleanup: Successfully processed messages (or those handled by on_error) are acknowledged and deleted from the stream using rm_id_li (XACK + XDEL) to prevent reprocessing.

Tech Stack

  • Rust: Core language for performance and safety.
  • Tokio: Asynchronous runtime for handling concurrency.
  • Fred: High-performance Redis client.
  • xkv: Redis connection and command management wrapper.

Directory Structure

  • src/lib.rs: Library exports and type definitions.
  • src/read_group.rs: Core ReadGroup struct and consumer loop logic.
  • src/auto_new.rs: Logic for automatically creating consumer groups if missing.
  • src/rm_id_li.rs: Helper for acknowledging and deleting processed messages.
  • src/parse_stream.rs: Utilities for parsing Redis Stream responses.
  • tests/: Integration tests demonstrating usage patterns.

API Reference

ReadGroup

The main struct for configuring and running the consumer.

  • new(stream, group, consumer, block_sec, claim_idle_sec, count, max_retry): Creates a new ReadGroup instance.
    • stream: Redis Key for the stream.
    • group: Consumer group name.
    • consumer: Consumer name.
    • block_sec: Seconds to block waiting for new messages.
    • claim_idle_sec: Seconds before claiming idle messages from other consumers.
    • count: Maximum number of messages to fetch per batch.
    • max_retry: Maximum number of retries before passing to on_error.
  • run(parse, on_error): Starts the infinite processing loop.
    • parse: Async callback function to process each message.
    • on_error: Async callback function to handle messages that failed retries.

StreamItem

Represents a single message from the Redis Stream.

  • id: The unique ID of the message.
  • retry: The delivery count (number of times delivered).
  • idle_ms: Time in milliseconds the message has been idle (if claimed).
  • kv: The message payload as a vector of Key-Value byte pairs.

History

In 1983, Vivek Ranadive, a 26-year-old MIT graduate, observed that while hardware components communicated via a "bus", software lacked a similar standard mechanism. He envisioned a "software bus" where applications could publish and subscribe to information without direct, rigid connections. This idea led to the creation of The Information Bus (TIB), the first commercial message queue software. TIB revolutionised financial trading floors by replacing manual chalkboards with real-time digital streams, allowing different trading systems to communicate instantly. This innovation laid the groundwork for modern event-driven architectures and the message queue systems we rely on today, such as Redis Streams.


About

This project is an open-source component of js0.site ⋅ Refactoring the Internet Plan.

We are redefining the development paradigm of the Internet in a componentized way. Welcome to follow us:


msgq : 基于 Redis Stream 的健壮消息队列

基于 Redis Stream 的健壮消息队列,支持自动认领和重试处理。

目录

功能特性

  • 基于 Redis Stream: 利用 XREADGROUP 高效消费消息。
  • 消费组支持: 支持通过消费组进行扩展式消息处理。
  • 自动创建组: 若消费组不存在,自动创建。
  • 高可靠性: 内置 BLOCKCLAIM 机制处理闲置消息。
  • 并发处理: 使用 tokio::spawn 并发处理消息。
  • 错误处理: 自动重试机制(可配置重试次数),并提供错误回调。

使用演示

use std::sync::Arc;
use aok::{OK, Void};
use log::info;
use msgq::{Kv, ReadGroup};

async fn on_error(mail: Arc<Kv>, error: String) -> Void {
  info!("on_error {error} {:?}", mail);
  OK
}

async fn print_mail(mail: Arc<Kv>) -> Void {
  info!("print_mail {:?}", mail);
  OK
}

#[tokio::main]
async fn main() -> Void {
  // 初始化 Redis 连接 (示例)
  // xboot::init().await?;

  // 创建 ReadGroup
  // stream: "smtp", group: "send", consumer: "test"
  // block: 1s, claim_idle: 3s, count: 2, max_retry: 3
  let read_group = ReadGroup::new("smtp", "send", "test", 1, 3, 2, 3);

  // 运行消费循环
  read_group.run(print_mail, on_error).await?;
  OK
}

设计思路

ReadGroup::run 方法执行一个持续循环,确保消息处理的健壮性:

  1. 命令构建: 构建带有 BLOCK(等待新消息)和 CLAIM(认领其他消费者的闲置消息)选项的 XREADGROUP 命令。
  2. 消息获取: 向 Redis 执行命令以获取一批消息。
  3. 组管理: 检查消费组是否存在;若不存在,调用 auto_new 自动创建。
  4. 并发处理: 遍历获取的消息,为每条消息生成一个 tokio 异步任务,执行用户提供的 parse 回调。
  5. 错误处理与重试:
    • 监控每个任务的执行。
    • 若任务失败,记录错误日志。
    • 系统跟踪重试次数;若消息失败超过 max_retry 次,将其移交 on_error 回调进行人工干预或记录。
  6. 清理: 成功处理(或由 on_error 处理)的消息将通过 rm_id_liXACK + XDEL)进行确认和删除,防止重复处理。

技术堆栈

  • Rust: 核心语言,提供高性能和安全性。
  • Tokio: 异步运行时,处理并发任务。
  • Fred: 高性能 Redis 客户端。
  • xkv: Redis 连接与命令管理封装。

目录结构

  • src/lib.rs: 库导出与类型定义。
  • src/read_group.rs: 核心 ReadGroup 结构体与消费循环逻辑。
  • src/auto_new.rs: 消费组自动创建逻辑。
  • src/rm_id_li.rs: 消息确认与删除辅助工具。
  • src/parse_stream.rs: Redis Stream 响应解析工具。
  • tests/: 集成测试与使用示例。

API 参考

ReadGroup

配置和运行消费者的主要结构体。

  • new(stream, group, consumer, block_sec, claim_idle_sec, count, max_retry): 创建新的 ReadGroup 实例。
    • stream: Redis Stream 键名。
    • group: 消费组名称。
    • consumer: 消费者名称。
    • block_sec: 等待新消息的阻塞秒数。
    • claim_idle_sec: 认领闲置消息前的等待秒数。
    • count: 每批次获取的最大消息数。
    • max_retry: 移交 on_error 前的最大重试次数。
  • run(parse, on_error): 启动无限处理循环。
    • parse: 处理每条消息的异步回调函数。
    • on_error: 处理重试失败消息的异步回调函数。

StreamItem

表示来自 Redis Stream 的单条消息。

  • id: 消息唯一 ID。
  • retry: 投递次数(重试计数)。
  • idle_ms: 消息闲置时间(毫秒,若被认领)。
  • kv: 消息负载,为键值对字节向量。

历史背景

1983 年,26 岁的麻省理工学院毕业生 Vivek Ranadive 观察到,虽然硬件组件通过“总线”进行通信,但软件缺乏类似的标准机制。他设想了一种“软件总线”,应用程序可以在没有直接、僵化连接的情况下发布和订阅信息。这一想法促成了 The Information Bus (TIB) 的诞生,这是首款商业消息队列软件。TIB 取代了人工黑板,实现了实时数字流,彻底改变了金融交易大厅,允许不同的交易系统即时通信。这一创新为现代事件驱动架构以及我们今天所依赖的 Redis Stream 等消息队列系统奠定了基础。


关于

本项目为 js0.site ⋅ 重构互联网计划 的开源组件。

我们正在以组件化的方式重新定义互联网的开发范式,欢迎关注: