msgq 0.1.13


---

<a id="en"></a>

# msgq : Robust Redis Stream Message Queue

Robust Redis Stream based message queue with auto-claim and retry handling.

## Table of Contents

- [Features](#features)
- [Usage](#usage)
- [Design](#design)
- [Tech Stack](#tech-stack)
- [Directory Structure](#directory-structure)
- [API Reference](#api-reference)
- [History](#history)

## Features

- **Redis Stream Based**: Utilizes `XREADGROUP` for efficient, scalable message consumption.
- **Consumer Groups**: Supports multiple consumers within a group for parallel processing.
- **Automatic Group Creation**: Automatically creates the Redis Stream group and consumer if they don't exist.
- **Reliable Delivery**: Implements auto-claiming of idle pending messages to prevent message loss.
- **Concurrent Processing**: Spawns one Tokio task per message through `async-scoped`, so each batch is processed concurrently.
- **Zero-Copy**: Because `async-scoped` tasks may borrow from their scope, message payloads are read directly from the stream response instead of being cloned.
- **Message Chaining**: The `Parse::run` method can return `Ok(Some(kv))` to add a new message to the queue, enabling workflow chaining.
- **Configurable Retries**: Failed messages are retried automatically up to a configurable limit (`max_retry`), after which they are passed to `on_error`.
- **Centralized Configuration**: A simple `Conf` struct holds the stream, group, and consumer names plus the blocking, claiming, batching, and retry settings.
- **Trait-Based Callbacks**: Uses a `Parse` trait for clear and reusable message processing and error handling logic.

## Usage

Define a struct and implement the `Parse` trait to handle your message processing and error logic.

```rust
use msgq::{Conf, Kv, Parse, ReadGroup};
use aok::{OK, Void};
use log::info;

// 1. Define your message processor

struct MyParser;

impl Parse for MyParser {
  // 2. Implement the message processing logic
  async fn run(&self, kv: &Kv, retry: u64) -> aok::Result<Option<Kv>> {
    info!("run: {:?}, retry: {}", kv, retry);
    // Return Ok(None) for successful processing
    // Return Ok(Some(new_kv)) to add a new message to the queue
    // Return Err(...) to retry the message
    Ok(None)
  }

  // 3. Implement the error handling logic for messages that fail all retries
  async fn on_error(&self, kv: Kv, error: String) -> Void {
    info!("on_error: {:?}, error: {}", kv, error);
    OK
  }
}

#[tokio::main]
async fn main() -> Void {
  // 4. Initialize the environment (e.g. using xboot to set up global Redis client)
  // xboot::init().await?; 

  // 5. Configure the consumer
  let conf = Conf::new(
    "s1", // stream key
    "g1", // group name
    "c1", // consumer name
    5,    // block_sec: wait up to 5s for new messages
    60,   // claim_idle_sec: claim messages idle for 60s
    10,   // count: batch size
    3,    // max_retry: retry 3 times before on_error
  );

  // 6. Create a ReadGroup and run it
  ReadGroup::new(MyParser, conf).run().await?;
  OK
}
```
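Message chaining is easiest to see with a two-step workflow. The sketch below is not msgq code: it uses only the standard library, approximates `Kv` as `Vec<(Vec<u8>, Vec<u8>)>` instead of `Vec<(Bytes, Bytes)>`, and the `step` field with its `fetch`/`store` values is hypothetical. `run_step` follows the same contract as `Parse::run`, so its body could be adapted into a real implementation.

```rust
// Std-only stand-in for msgq's `Kv` (documented as `Vec<(Bytes, Bytes)>`).
type Kv = Vec<(Vec<u8>, Vec<u8>)>;

/// Returns the value of the first field named `name`, if present.
fn field<'a>(kv: &'a Kv, name: &[u8]) -> Option<&'a [u8]> {
    kv.iter()
        .find(|(k, _)| k.as_slice() == name)
        .map(|(_, v)| v.as_slice())
}

/// Hypothetical two-step workflow mirroring `Parse::run`:
/// `Ok(Some(next))` chains a follow-up message, `Ok(None)` finishes,
/// and `Err(_)` leaves the message to be retried.
fn run_step(kv: &Kv) -> Result<Option<Kv>, String> {
    match field(kv, b"step") {
        Some(s) if s == b"fetch" => {
            // Copy every field except `step`, then set the next step name.
            let mut next: Kv = kv
                .iter()
                .filter(|(k, _)| k.as_slice() != b"step")
                .cloned()
                .collect();
            next.push((b"step".to_vec(), b"store".to_vec()));
            Ok(Some(next))
        }
        Some(s) if s == b"store" => Ok(None),
        other => Err(format!("unknown step: {:?}", other)),
    }
}
```

Because a chained message is only added after `run` succeeds, a failing step is retried on its own without re-running the steps before it.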

## Design

The `ReadGroup::run` method executes a continuous loop that ensures robust message processing:

1.  **Claim Idle Messages**: It first calls `XPENDING` to find messages that have been idle for longer than `claim_idle_ms` and claims them using `XAUTOCLAIM`. This ensures that messages from crashed or slow consumers are re-processed.
2.  **Fetch New Messages**: It then executes `XREADGROUP` with a `BLOCK` timeout to efficiently wait for and receive a new batch of messages.
3.  **Group Management**: If the command fails with a `NOGROUP` error, the `auto_new` function is called to automatically create the consumer group, making setup seamless.
4.  **Parse and Process**: All claimed and new messages are parsed by `parse_stream` into a list of `StreamItem`s.
5.  **Concurrent Execution with Zero-Copy**:
    -   The system uses `async_scoped` to spawn a `tokio` task for each `StreamItem`.
    -   Crucially, this allows the tasks to borrow data (like the message body) directly from the `StreamItem` without needing to clone it (`'static` lifetime is not required).
    -   This "zero-copy" approach significantly reduces memory overhead and improves performance, especially for large messages.
6.  **Error Handling & Retry**:
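    -   If the `run` method returns an error, the message is not acknowledged. It stays in the consumer group's pending entries list and is claimed again in step 1 once it has been idle for longer than `claim_idle_ms`.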
    -   The `retry` count (delivery count) for each message is tracked and passed to the `run` method. If a message's retry count exceeds `max_retry`, it is passed to the `on_error` callback of the `Parse` trait for final handling (e.g., moving to a dead-letter queue).
7.  **Message Chaining**: If the `run` method returns `Ok(Some(new_kv))`, the new message is added to the queue using `XADD`, enabling workflow chaining.
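8.  **Cleanup**: Successfully processed messages (and those handled by `on_error`) are acknowledged and deleted from the stream by `rm_id_li` (`XACK` and `XDEL`) so they are not processed again. Because `XDEL` removes the entry from the stream itself rather than from one group, any other consumer group on the same stream that has not yet read the message will never see it.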
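
Step 1 relies on how Redis tracks delivered but unacknowledged messages. The in-memory model below (a hypothetical `Pending` struct and `auto_claim` function, not Redis or msgq code) mirrors the parts of `XAUTOCLAIM` that matter here: entries idle for at least the threshold move to the claiming consumer, their idle time resets, and their delivery count, which the design above passes to `run` as `retry`, goes up by one. It ignores the command's cursor and deleted-entry handling.

```rust
/// Simplified stand-in for one pending-entries-list (PEL) entry:
/// a message delivered to `consumer` but not yet acknowledged.
#[derive(Debug, Clone, PartialEq)]
struct Pending {
    id: String,
    consumer: String,
    idle_ms: u64,
    delivery_count: u64,
}

/// Models `XAUTOCLAIM`: up to `count` entries idle for at least
/// `min_idle_ms` are transferred to `consumer`, their idle time is reset,
/// and their delivery count is incremented. Returns the claimed IDs.
fn auto_claim(pel: &mut [Pending], consumer: &str, min_idle_ms: u64, count: usize) -> Vec<String> {
    pel.iter_mut()
        .filter(|p| p.idle_ms >= min_idle_ms)
        .take(count)
        .map(|p| {
            p.consumer = consumer.to_string();
            p.idle_ms = 0;
            p.delivery_count += 1;
            p.id.clone()
        })
        .collect()
}
```

With a 60 s threshold, an entry left idle for 75 s by a crashed consumer `c1` moves to `c2`, while an entry idle for only 2 s stays with `c1`: a consumer that is slow but still working keeps its messages until the threshold is crossed.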

## Tech Stack

-   **Rust**: Core language for performance and safety.
-   **Tokio**: Asynchronous runtime for handling concurrency.
-   **Async Scoped**: Enables spawning non-`'static` futures, allowing for efficient zero-copy processing of borrowed data.
-   **Fred**: A high-performance, low-level Redis client for Rust.
-   **ThisError**: A derive macro that generates `std::error::Error` implementations without the usual boilerplate.
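
The borrowing that `async-scoped` allows can be illustrated with the standard library's `std::thread::scope`, which applies the same rule to OS threads: every task spawned inside the scope is joined before the scope returns, so tasks may hold plain references to the batch instead of owned copies. This is an analogy using threads rather than Tokio tasks, and `total_payload_len` is a hypothetical function, not part of msgq.

```rust
use std::thread;

/// Hypothetical helper: inspects every message body on its own scoped
/// thread. Each closure holds a `&Vec<u8>` borrowed from `batch`; nothing
/// is cloned, which is allowed because `thread::scope` joins all threads
/// before returning (the guarantee `async-scoped` provides for Tokio tasks).
fn total_payload_len(batch: &[(String, Vec<u8>)]) -> usize {
    thread::scope(|s| {
        let handles: Vec<_> = batch
            .iter()
            .map(|(_id, body)| s.spawn(move || body.len()))
            .collect();
        handles
            .into_iter()
            .map(|h| h.join().expect("worker panicked"))
            .sum::<usize>()
    })
}
```

After the call, `batch` is still owned and usable by the caller, which is exactly what a non-`'static` spawn buys over a plain `tokio::spawn`.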

## Directory Structure

-   `src/lib.rs`: The library's main entry point. It exports the public API, including the `Parse` trait and key structs like `Conf`, `ReadGroup`, and `StreamItem`.
-   `src/conf.rs`: Defines the `Conf` struct, which centralizes all configuration parameters.
-   `src/read_group.rs`: Contains the core consumer logic within the `ReadGroup` struct and its `run` method.
-   `src/auto_new.rs`: Provides the `auto_new` function to automatically create a stream consumer group.
-   `src/parse_stream.rs`: Includes utilities for parsing responses from `XREADGROUP` and `XAUTOCLAIM`.
-   `src/rm_id_li.rs`: A helper function to `XACK` (acknowledge) and `XDEL` (delete) processed messages.
-   `src/error.rs`: Defines the crate's custom error types.
-   `tests/`: Integration tests demonstrating usage patterns.

## API Reference

### `Conf`

A struct to hold all configuration for the `ReadGroup` consumer.

-   `stream`: The Redis key for the stream.
-   `group`: The consumer group name.
-   `consumer`: A unique name for this consumer.
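-   `block_ms`: How long, in milliseconds, `XREADGROUP` blocks waiting for new messages. In the usage example, `Conf::new` receives this value in seconds (`block_sec`).
-   `claim_idle_ms`: The idle time in milliseconds after which a pending message is considered abandoned and can be claimed by another consumer. In the usage example, `Conf::new` receives this value in seconds (`claim_idle_sec`).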
-   `count`: The maximum number of messages to fetch in a single batch.
-   `max_retry`: The maximum number of times a message will be retried before being passed to the `on_error` handler.
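
A minimal sketch of how a constructor might turn the seconds-based arguments of `Conf::new` into these millisecond fields. It assumes the conversion happens in `new` and guesses the field types (`u64`, `usize`); check `src/conf.rs` for the crate's real definition.

```rust
/// Hypothetical mirror of `Conf`: field names follow the API reference,
/// argument order follows the usage example.
#[derive(Debug, Clone, PartialEq)]
pub struct Conf {
    pub stream: String,
    pub group: String,
    pub consumer: String,
    pub block_ms: u64,
    pub claim_idle_ms: u64,
    pub count: usize,
    pub max_retry: u64,
}

impl Conf {
    pub fn new(
        stream: impl Into<String>,
        group: impl Into<String>,
        consumer: impl Into<String>,
        block_sec: u64,
        claim_idle_sec: u64,
        count: usize,
        max_retry: u64,
    ) -> Self {
        Self {
            stream: stream.into(),
            group: group.into(),
            consumer: consumer.into(),
            // Seconds -> milliseconds; saturate rather than overflow.
            block_ms: block_sec.saturating_mul(1000),
            claim_idle_ms: claim_idle_sec.saturating_mul(1000),
            count,
            max_retry,
        }
    }
}
```

When choosing `claim_idle_ms`, keep it well above the longest time `run` can take for one message; otherwise another consumer may claim a message that is still being processed and handle it a second time.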

### `Parse` Trait

A trait that defines the application logic for message handling. You must implement this trait.

-   `run(&self, kv: &Kv, retry: u64) -> impl Future<Output = aok::Result<Option<Kv>>> + Send`:
    -   The asynchronous method called to process a single message.
    -   `kv`: The message data, as a `Vec<(Bytes, Bytes)>`.
    -   `retry`: The current retry count (delivery count) for this message.
    -   Return `Ok(None)` on successful processing.
    -   Return `Ok(Some(new_kv))` to add a new message to the queue (for workflow chaining).
    -   Return `Err(...)` to retry the message later.
-   `on_error(&self, kv: Kv, error: String) -> impl Future<Output = aok::Void> + Send`:
    -   The asynchronous method called when a message has failed more than `max_retry` times.
    -   `kv`: The message data that failed.
    -   `error`: The last error message that caused the failure.
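
The return value of `run` and the `max_retry` limit combine into four outcomes. The sketch below maps them to a hypothetical `Action` enum using only the standard library (`Kv` approximated as `Vec<(Vec<u8>, Vec<u8>)>`, errors as `String`). It assumes the limit is checked after a failed `run` and uses `retry > max_retry`, following the word "exceeds" in the design notes; the crate's exact boundary may differ.

```rust
// Std-only stand-in for msgq's `Kv` (documented as `Vec<(Bytes, Bytes)>`).
type Kv = Vec<(Vec<u8>, Vec<u8>)>;

/// What the consumer loop does with one message after `run` returns.
#[derive(Debug, PartialEq)]
enum Action {
    /// Acknowledge and delete the message (`XACK` + `XDEL`).
    Ack,
    /// `XADD` the follow-up message, then clean up the original.
    Chain(Kv),
    /// Leave it unacknowledged so a later claim retries it.
    Retry,
    /// Call `on_error` with the payload and this error, then clean up.
    OnError(String),
}

fn action_for(result: Result<Option<Kv>, String>, retry: u64, max_retry: u64) -> Action {
    match result {
        Ok(None) => Action::Ack,
        Ok(Some(next)) => Action::Chain(next),
        // Assumed boundary: give up once the retry count exceeds the limit.
        Err(e) if retry > max_retry => Action::OnError(e),
        Err(_) => Action::Retry,
    }
}
```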

### `ReadGroup`

The main consumer struct.

-   `ReadGroup::new(parse: P, conf: Conf)`: Creates a new `ReadGroup` instance, where `P` is your type implementing `Parse`.
-   `run(&self)`: Starts the infinite processing loop.

### `StreamItem`

Represents a single message from the Redis Stream.

-   `id`: The unique ID of the message.
-   `retry`: The delivery count (number of times delivered).
-   `idle_ms`: Time in milliseconds the message has been idle (if claimed).
-   `kv`: The message payload (`Kv`), a vector of key-value byte pairs.
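
Redis generates entry IDs of the form `<millisecondsTime>-<sequenceNumber>`, so when messages are added with an auto-generated ID (`XADD key * ...`) the `id` also records when a message entered the stream. A small helper for parsing an ID and computing its age; `parse_stream_id` and `age_ms` are hypothetical and not part of msgq.

```rust
/// Splits a Redis Stream entry ID `<millisecondsTime>-<sequenceNumber>`
/// into its two numeric parts. Returns `None` for malformed IDs.
fn parse_stream_id(id: &str) -> Option<(u64, u64)> {
    let (ms, seq) = id.split_once('-')?;
    Some((ms.parse::<u64>().ok()?, seq.parse::<u64>().ok()?))
}

/// Age of an entry relative to `now_ms` (Unix time in milliseconds),
/// assuming the ID was generated by Redis from its clock.
fn age_ms(id: &str, now_ms: u64) -> Option<u64> {
    let (created_ms, _seq) = parse_stream_id(id)?;
    Some(now_ms.saturating_sub(created_ms))
}
```

This age differs from `idle_ms`, which Redis measures from the last time the message was delivered to a consumer, not from when it was added.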

## History

In 1983, Vivek Ranadive, a 26-year-old MIT graduate, observed that while hardware components communicated via a "bus", software lacked a similar standard mechanism. He envisioned a "software bus" where applications could publish and subscribe to information without direct, rigid connections. This idea led to the creation of **The Information Bus (TIB)**, the first commercial message queue software. TIB revolutionized financial trading floors by replacing manual chalkboards with real-time digital streams, allowing different trading systems to communicate instantly. This innovation laid the groundwork for modern event-driven architectures and the message queue systems we rely on today, such as Redis Streams.

---

## About

This project is an open-source component of [js0.site ⋅ Refactoring the Internet Plan](https://js0.site).

We are redefining the development paradigm of the Internet through reusable components. Follow us on:

* [Google Group](https://groups.google.com/g/js0-site)
* [js0site.bsky.social](https://bsky.app/profile/js0site.bsky.social)
