msgq 0.1.7

Robust Redis Stream based message queue with auto-claim and retry handling / 基于 Redis Stream 的健壮消息队列,支持自动认领和重试处理
Documentation
[English](#en) | [中文](#zh)

---

<a id="en"></a>

# msgq : Robust Redis Stream Message Queue

Robust Redis Stream based message queue with auto-claim and retry handling.

## Table of Contents

- [Features](#features)
- [Usage](#usage)
- [Design](#design)
- [Tech Stack](#tech-stack)
- [Directory Structure](#directory-structure)
- [API Reference](#api-reference)
- [History](#history)

## Features

- **Redis Stream Based**: Utilizes `XREADGROUP` for efficient message consumption.
- **Consumer Groups**: Supports scalable message processing with consumer groups.
- **Auto Group Creation**: Automatically creates consumer groups if they don't exist.
- **Reliability**: Built-in `BLOCK` and `CLAIM` mechanisms for handling idle messages.
- **Concurrency**: Processes messages concurrently using `tokio::spawn`.
- **Error Handling**: Automatic retry mechanism (configurable limit) with error callbacks.

## Usage

```rust
use std::sync::Arc;
use aok::{OK, Void};
use log::info;
use msgq::{Kv, ReadGroup};

async fn on_error(mail: Arc<Kv>, error: String) -> Void {
  info!("on_error {error} {:?}", mail);
  OK
}

async fn print_mail(mail: Arc<Kv>) -> Void {
  info!("print_mail {:?}", mail);
  OK
}

#[tokio::main]
async fn main() -> Void {
  // Initialize Redis connection (example)
  // xboot::init().await?;

  // Create a ReadGroup
  // stream: "smtp", group: "send", consumer: "test"
  // block: 1s, claim_idle: 3s, count: 2, max_retry: 3
  let read_group = ReadGroup::new("smtp", "send", "test", 1, 3, 2, 3);

  // Run the consumer loop
  read_group.run(print_mail, on_error).await?;
  OK
}
```

## Design

The `ReadGroup::run` method executes a continuous loop that ensures robust message processing:

1.  **Command Construction**: Prepares an `XREADGROUP` command with `BLOCK` (for waiting on new messages) and `CLAIM` (for recovering idle messages from other consumers) options.
2.  **Message Fetching**: Executes the command against Redis to retrieve a batch of messages.
3.  **Group Management**: Checks if the consumer group exists; if not, `auto_new` is called to create it automatically.
4.  **Concurrent Processing**: Iterates through the fetched messages and spawns a `tokio` task for each message to execute the user-provided `parse` callback.
5.  **Error Handling & Retry**:
    -   Monitors the execution of each task.
    -   If a task fails, the error is logged.
    -   The system tracks retries; if a message fails more than `max_retry` times, it is handed off to the `on_error` callback for manual intervention or logging.
6.  **Cleanup**: Successfully processed messages (or those handled by `on_error`) are acknowledged and deleted from the stream using `rm_id_li` (`XACK` + `XDEL`) to prevent reprocessing.

## Tech Stack

-   **Rust**: Core language for performance and safety.
-   **Tokio**: Asynchronous runtime for handling concurrency.
-   **Fred**: High-performance Redis client.
-   **xkv**: Redis connection and command management wrapper.

## Directory Structure

-   `src/lib.rs`: Library exports and type definitions.
-   `src/read_group.rs`: Core `ReadGroup` struct and consumer loop logic.
-   `src/auto_new.rs`: Logic for automatically creating consumer groups if missing.
-   `src/rm_id_li.rs`: Helper for acknowledging and deleting processed messages.
-   `src/parse_stream.rs`: Utilities for parsing Redis Stream responses.
-   `tests/`: Integration tests demonstrating usage patterns.

## API Reference

### `ReadGroup`

The main struct for configuring and running the consumer.

-   `new(stream, group, consumer, block_sec, claim_idle_sec, count, max_retry)`: Creates a new `ReadGroup` instance.
    -   `stream`: Redis Key for the stream.
    -   `group`: Consumer group name.
    -   `consumer`: Consumer name.
    -   `block_sec`: Seconds to block waiting for new messages.
    -   `claim_idle_sec`: Seconds before claiming idle messages from other consumers.
    -   `count`: Maximum number of messages to fetch per batch.
    -   `max_retry`: Maximum number of retries before passing to `on_error`.
-   `run(parse, on_error)`: Starts the infinite processing loop.
    -   `parse`: Async callback function to process each message.
    -   `on_error`: Async callback function to handle messages that failed retries.

### `StreamItem`

Represents a single message from the Redis Stream.

-   `id`: The unique ID of the message.
-   `retry`: The delivery count (number of times delivered).
-   `idle_ms`: Time in milliseconds the message has been idle (if claimed).
-   `kv`: The message payload as a vector of Key-Value byte pairs.

## History

In 1983, Vivek Ranadive, a 26-year-old MIT graduate, observed that while hardware components communicated via a "bus", software lacked a similar standard mechanism. He envisioned a "software bus" where applications could publish and subscribe to information without direct, rigid connections. This idea led to the creation of **The Information Bus (TIB)**, the first commercial message queue software. TIB revolutionised financial trading floors by replacing manual chalkboards with real-time digital streams, allowing different trading systems to communicate instantly. This innovation laid the groundwork for modern event-driven architectures and the message queue systems we rely on today, such as Redis Streams.

---

## About

This project is an open-source component of [js0.site ⋅ Refactoring the Internet Plan](https://js0.site).

We are redefining the development paradigm of the Internet in a componentized way. Welcome to follow us:

* [Google Group](https://groups.google.com/g/js0-site)
* [js0site.bsky.social](https://bsky.app/profile/js0site.bsky.social)

---

<a id="zh"></a>

# msgq : 基于 Redis Stream 的健壮消息队列

基于 Redis Stream 的健壮消息队列,支持自动认领和重试处理。

## 目录

- [功能特性](#功能特性)
- [使用演示](#使用演示)
- [设计思路](#设计思路)
- [技术堆栈](#技术堆栈)
- [目录结构](#目录结构)
- [API 参考](#api-参考)
- [历史背景](#历史背景)

## 功能特性

- **基于 Redis Stream**: 利用 `XREADGROUP` 高效消费消息。
- **消费组支持**: 支持通过消费组进行扩展式消息处理。
- **自动创建组**: 若消费组不存在,自动创建。
- **高可靠性**: 内置 `BLOCK` 和 `CLAIM` 机制处理闲置消息。
- **并发处理**: 使用 `tokio::spawn` 并发处理消息。
- **错误处理**: 自动重试机制(可配置重试次数),并提供错误回调。

## 使用演示

```rust
use std::sync::Arc;
use aok::{OK, Void};
use log::info;
use msgq::{Kv, ReadGroup};

async fn on_error(mail: Arc<Kv>, error: String) -> Void {
  info!("on_error {error} {:?}", mail);
  OK
}

async fn print_mail(mail: Arc<Kv>) -> Void {
  info!("print_mail {:?}", mail);
  OK
}

#[tokio::main]
async fn main() -> Void {
  // 初始化 Redis 连接 (示例)
  // xboot::init().await?;

  // 创建 ReadGroup
  // stream: "smtp", group: "send", consumer: "test"
  // block: 1s, claim_idle: 3s, count: 2, max_retry: 3
  let read_group = ReadGroup::new("smtp", "send", "test", 1, 3, 2, 3);

  // 运行消费循环
  read_group.run(print_mail, on_error).await?;
  OK
}
```

## 设计思路

`ReadGroup::run` 方法执行一个持续循环,确保消息处理的健壮性:

1.  **命令构建**: 构建带有 `BLOCK`(等待新消息)和 `CLAIM`(认领其他消费者的闲置消息)选项的 `XREADGROUP` 命令。
2.  **消息获取**: 向 Redis 执行命令以获取一批消息。
3.  **组管理**: 检查消费组是否存在;若不存在,调用 `auto_new` 自动创建。
4.  **并发处理**: 遍历获取的消息,为每条消息生成一个 `tokio` 异步任务,执行用户提供的 `parse` 回调。
5.  **错误处理与重试**:
    -   监控每个任务的执行。
    -   若任务失败,记录错误日志。
    -   系统跟踪重试次数;若消息失败超过 `max_retry` 次,将其移交 `on_error` 回调进行人工干预或记录。
6.  **清理**: 成功处理(或由 `on_error` 处理)的消息将通过 `rm_id_li`(`XACK` + `XDEL`)进行确认和删除,防止重复处理。

## 技术堆栈

-   **Rust**: 核心语言,提供高性能和安全性。
-   **Tokio**: 异步运行时,处理并发任务。
-   **Fred**: 高性能 Redis 客户端。
-   **xkv**: Redis 连接与命令管理封装。

## 目录结构

-   `src/lib.rs`: 库导出与类型定义。
-   `src/read_group.rs`: 核心 `ReadGroup` 结构体与消费循环逻辑。
-   `src/auto_new.rs`: 消费组自动创建逻辑。
-   `src/rm_id_li.rs`: 消息确认与删除辅助工具。
-   `src/parse_stream.rs`: Redis Stream 响应解析工具。
-   `tests/`: 集成测试与使用示例。

## API 参考

### `ReadGroup`

配置和运行消费者的主要结构体。

-   `new(stream, group, consumer, block_sec, claim_idle_sec, count, max_retry)`: 创建新的 `ReadGroup` 实例。
    -   `stream`: Redis Stream 键名。
    -   `group`: 消费组名称。
    -   `consumer`: 消费者名称。
    -   `block_sec`: 等待新消息的阻塞秒数。
    -   `claim_idle_sec`: 认领闲置消息前的等待秒数。
    -   `count`: 每批次获取的最大消息数。
    -   `max_retry`: 移交 `on_error` 前的最大重试次数。
-   `run(parse, on_error)`: 启动无限处理循环。
    -   `parse`: 处理每条消息的异步回调函数。
    -   `on_error`: 处理重试失败消息的异步回调函数。

### `StreamItem`

表示来自 Redis Stream 的单条消息。

-   `id`: 消息唯一 ID。
-   `retry`: 投递次数(重试计数)。
-   `idle_ms`: 消息闲置时间(毫秒,若被认领)。
-   `kv`: 消息负载,为键值对字节向量。

## 历史背景

1983 年,26 岁的麻省理工学院毕业生 Vivek Ranadive 观察到,虽然硬件组件通过“总线”进行通信,但软件缺乏类似的标准机制。他设想了一种“软件总线”,应用程序可以在没有直接、僵化连接的情况下发布和订阅信息。这一想法促成了 **The Information Bus (TIB)** 的诞生,这是首款商业消息队列软件。TIB 取代了人工黑板,实现了实时数字流,彻底改变了金融交易大厅,允许不同的交易系统即时通信。这一创新为现代事件驱动架构以及我们今天所依赖的 Redis Stream 等消息队列系统奠定了基础。

---

## 关于

本项目为 [js0.site ⋅ 重构互联网计划](https://js0.site) 的开源组件。

我们正在以组件化的方式重新定义互联网的开发范式,欢迎关注:

* [谷歌邮件列表](https://groups.google.com/g/js0-site)
* [js0site.bsky.social](https://bsky.app/profile/js0site.bsky.social)