[English](#en) | [中文](#zh)
---
<a id="en"></a>
# msgq : Robust Redis Stream Message Queue
Robust Redis Stream based message queue with auto-claim and retry handling.
## Table of Contents
- [Features](#features)
- [Usage](#usage)
- [Design](#design)
- [Tech Stack](#tech-stack)
- [Directory Structure](#directory-structure)
- [API Reference](#api-reference)
- [History](#history)
## Features
- **Redis Stream Based**: Utilizes `XREADGROUP` for efficient message consumption.
- **Consumer Groups**: Supports scalable message processing with consumer groups.
- **Auto Group Creation**: Automatically creates consumer groups if they don't exist.
- **Reliability**: Built-in `BLOCK` and `CLAIM` mechanisms for handling idle messages.
- **Concurrency**: Processes messages concurrently using `tokio::spawn`.
- **Error Handling**: Automatic retry mechanism (configurable limit) with error callbacks.
## Usage
```rust
use std::sync::Arc;
use aok::{OK, Void};
use log::info;
use msgq::{Kv, ReadGroup};
async fn on_error(mail: Arc<Kv>, error: String) -> Void {
info!("on_error {error} {:?}", mail);
OK
}
async fn print_mail(mail: Arc<Kv>) -> Void {
info!("print_mail {:?}", mail);
OK
}
#[tokio::main]
async fn main() -> Void {
// Initialize Redis connection (example)
// xboot::init().await?;
// Create a ReadGroup
// stream: "smtp", group: "send", consumer: "test"
// block: 1s, claim_idle: 3s, count: 2, max_retry: 3
let read_group = ReadGroup::new("smtp", "send", "test", 1, 3, 2, 3);
// Run the consumer loop
read_group.run(print_mail, on_error).await?;
OK
}
```
## Design
The `ReadGroup::run` method executes a continuous loop that ensures robust message processing:
1. **Command Construction**: Prepares an `XREADGROUP` command with `BLOCK` (for waiting on new messages) and `CLAIM` (for recovering idle messages from other consumers) options.
2. **Message Fetching**: Executes the command against Redis to retrieve a batch of messages.
3. **Group Management**: Checks if the consumer group exists; if not, `auto_new` is called to create it automatically.
4. **Concurrent Processing**: Iterates through the fetched messages and spawns a `tokio` task for each message to execute the user-provided `parse` callback.
5. **Error Handling & Retry**:
- Monitors the execution of each task.
- If a task fails, the error is logged.
- The system tracks retries; if a message fails more than `max_retry` times, it is handed off to the `on_error` callback for manual intervention or logging.
6. **Cleanup**: Successfully processed messages (or those handled by `on_error`) are acknowledged and deleted from the stream using `rm_id_li` (`XACK` + `XDEL`) to prevent reprocessing.
## Tech Stack
- **Rust**: Core language for performance and safety.
- **Tokio**: Asynchronous runtime for handling concurrency.
- **Fred**: High-performance Redis client.
- **xkv**: Redis connection and command management wrapper.
## Directory Structure
- `src/lib.rs`: Library exports and type definitions.
- `src/read_group.rs`: Core `ReadGroup` struct and consumer loop logic.
- `src/auto_new.rs`: Logic for automatically creating consumer groups if missing.
- `src/rm_id_li.rs`: Helper for acknowledging and deleting processed messages.
- `src/parse_stream.rs`: Utilities for parsing Redis Stream responses.
- `tests/`: Integration tests demonstrating usage patterns.
## API Reference
### `ReadGroup`
The main struct for configuring and running the consumer.
- `new(stream, group, consumer, block_sec, claim_idle_sec, count, max_retry)`: Creates a new `ReadGroup` instance.
- `stream`: Redis Key for the stream.
- `group`: Consumer group name.
- `consumer`: Consumer name.
- `block_sec`: Seconds to block waiting for new messages.
- `claim_idle_sec`: Seconds before claiming idle messages from other consumers.
- `count`: Maximum number of messages to fetch per batch.
- `max_retry`: Maximum number of retries before passing to `on_error`.
- `run(parse, on_error)`: Starts the infinite processing loop.
- `parse`: Async callback function to process each message.
- `on_error`: Async callback function to handle messages that failed retries.
### `StreamItem`
Represents a single message from the Redis Stream.
- `id`: The unique ID of the message.
- `retry`: The delivery count (number of times delivered).
- `idle_ms`: Time in milliseconds the message has been idle (if claimed).
- `kv`: The message payload as a vector of Key-Value byte pairs.
## History
In 1983, Vivek Ranadive, a 26-year-old MIT graduate, observed that while hardware components communicated via a "bus", software lacked a similar standard mechanism. He envisioned a "software bus" where applications could publish and subscribe to information without direct, rigid connections. This idea led to the creation of **The Information Bus (TIB)**, the first commercial message queue software. TIB revolutionised financial trading floors by replacing manual chalkboards with real-time digital streams, allowing different trading systems to communicate instantly. This innovation laid the groundwork for modern event-driven architectures and the message queue systems we rely on today, such as Redis Streams.
---
## About
This project is an open-source component of [js0.site ⋅ Refactoring the Internet Plan](https://js0.site).
We are redefining the development paradigm of the Internet in a componentized way. Welcome to follow us:
* [Google Group](https://groups.google.com/g/js0-site)
* [js0site.bsky.social](https://bsky.app/profile/js0site.bsky.social)
---
<a id="zh"></a>
# msgq : 基于 Redis Stream 的健壮消息队列
基于 Redis Stream 的健壮消息队列,支持自动认领和重试处理。
## 目录
- [功能特性](#功能特性)
- [使用演示](#使用演示)
- [设计思路](#设计思路)
- [技术堆栈](#技术堆栈)
- [目录结构](#目录结构)
- [API 参考](#api-参考)
- [历史背景](#历史背景)
## 功能特性
- **基于 Redis Stream**: 利用 `XREADGROUP` 高效消费消息。
- **消费组支持**: 支持通过消费组进行扩展式消息处理。
- **自动创建组**: 若消费组不存在,自动创建。
- **高可靠性**: 内置 `BLOCK` 和 `CLAIM` 机制处理闲置消息。
- **并发处理**: 使用 `tokio::spawn` 并发处理消息。
- **错误处理**: 自动重试机制(可配置重试次数),并提供错误回调。
## 使用演示
```rust
use std::sync::Arc;
use aok::{OK, Void};
use log::info;
use msgq::{Kv, ReadGroup};
async fn on_error(mail: Arc<Kv>, error: String) -> Void {
info!("on_error {error} {:?}", mail);
OK
}
async fn print_mail(mail: Arc<Kv>) -> Void {
info!("print_mail {:?}", mail);
OK
}
#[tokio::main]
async fn main() -> Void {
// 初始化 Redis 连接 (示例)
// xboot::init().await?;
// 创建 ReadGroup
// stream: "smtp", group: "send", consumer: "test"
// block: 1s, claim_idle: 3s, count: 2, max_retry: 3
let read_group = ReadGroup::new("smtp", "send", "test", 1, 3, 2, 3);
// 运行消费循环
read_group.run(print_mail, on_error).await?;
OK
}
```
## 设计思路
`ReadGroup::run` 方法执行一个持续循环,确保消息处理的健壮性:
1. **命令构建**: 构建带有 `BLOCK`(等待新消息)和 `CLAIM`(认领其他消费者的闲置消息)选项的 `XREADGROUP` 命令。
2. **消息获取**: 向 Redis 执行命令以获取一批消息。
3. **组管理**: 检查消费组是否存在;若不存在,调用 `auto_new` 自动创建。
4. **并发处理**: 遍历获取的消息,为每条消息生成一个 `tokio` 异步任务,执行用户提供的 `parse` 回调。
5. **错误处理与重试**:
- 监控每个任务的执行。
- 若任务失败,记录错误日志。
- 系统跟踪重试次数;若消息失败超过 `max_retry` 次,将其移交 `on_error` 回调进行人工干预或记录。
6. **清理**: 成功处理(或由 `on_error` 处理)的消息将通过 `rm_id_li`(`XACK` + `XDEL`)进行确认和删除,防止重复处理。
## 技术堆栈
- **Rust**: 核心语言,提供高性能和安全性。
- **Tokio**: 异步运行时,处理并发任务。
- **Fred**: 高性能 Redis 客户端。
- **xkv**: Redis 连接与命令管理封装。
## 目录结构
- `src/lib.rs`: 库导出与类型定义。
- `src/read_group.rs`: 核心 `ReadGroup` 结构体与消费循环逻辑。
- `src/auto_new.rs`: 消费组自动创建逻辑。
- `src/rm_id_li.rs`: 消息确认与删除辅助工具。
- `src/parse_stream.rs`: Redis Stream 响应解析工具。
- `tests/`: 集成测试与使用示例。
## API 参考
### `ReadGroup`
配置和运行消费者的主要结构体。
- `new(stream, group, consumer, block_sec, claim_idle_sec, count, max_retry)`: 创建新的 `ReadGroup` 实例。
- `stream`: Redis Stream 键名。
- `group`: 消费组名称。
- `consumer`: 消费者名称。
- `block_sec`: 等待新消息的阻塞秒数。
- `claim_idle_sec`: 认领闲置消息前的等待秒数。
- `count`: 每批次获取的最大消息数。
- `max_retry`: 移交 `on_error` 前的最大重试次数。
- `run(parse, on_error)`: 启动无限处理循环。
- `parse`: 处理每条消息的异步回调函数。
- `on_error`: 处理重试失败消息的异步回调函数。
### `StreamItem`
表示来自 Redis Stream 的单条消息。
- `id`: 消息唯一 ID。
- `retry`: 投递次数(重试计数)。
- `idle_ms`: 消息闲置时间(毫秒,若被认领)。
- `kv`: 消息负载,为键值对字节向量。
## 历史背景
1983 年,26 岁的麻省理工学院毕业生 Vivek Ranadive 观察到,虽然硬件组件通过“总线”进行通信,但软件缺乏类似的标准机制。他设想了一种“软件总线”,应用程序可以在没有直接、僵化连接的情况下发布和订阅信息。这一想法促成了 **The Information Bus (TIB)** 的诞生,这是首款商业消息队列软件。TIB 取代了人工黑板,实现了实时数字流,彻底改变了金融交易大厅,允许不同的交易系统即时通信。这一创新为现代事件驱动架构以及我们今天所依赖的 Redis Stream 等消息队列系统奠定了基础。
---
## 关于
本项目为 [js0.site ⋅ 重构互联网计划](https://js0.site) 的开源组件。
我们正在以组件化的方式重新定义互联网的开发范式,欢迎关注:
* [谷歌邮件列表](https://groups.google.com/g/js0-site)
* [js0site.bsky.social](https://bsky.app/profile/js0site.bsky.social)