msgq : Robust Redis Stream Message Queue
Robust Redis Stream based message queue with auto-claim and retry handling.
Table of Contents
Features
- Redis Stream Based: Utilizes
XREADGROUPfor efficient message consumption. - Consumer Groups: Supports scalable message processing with consumer groups.
- Auto Group Creation: Automatically creates consumer groups if they don't exist.
- Reliability: Built-in
BLOCKandCLAIMmechanisms for handling idle messages. - Concurrency: Processes messages concurrently using
tokio::spawn. - Error Handling: Automatic retry mechanism (configurable limit) with error callbacks.
Usage
use Arc;
use ;
use info;
use ;
async
async
async
Design
The ReadGroup::run method executes a continuous loop that ensures robust message processing:
- Command Construction: Prepares an
XREADGROUPcommand withBLOCK(for waiting on new messages) andCLAIM(for recovering idle messages from other consumers) options. - Message Fetching: Executes the command against Redis to retrieve a batch of messages.
- Group Management: Checks if the consumer group exists; if not,
auto_newis called to create it automatically. - Concurrent Processing: Iterates through the fetched messages and spawns a
tokiotask for each message to execute the user-providedparsecallback. - Error Handling & Retry:
- Monitors the execution of each task.
- If a task fails, the error is logged.
- The system tracks retries; if a message fails more than
max_retrytimes, it is handed off to theon_errorcallback for manual intervention or logging.
- Cleanup: Successfully processed messages (or those handled by
on_error) are acknowledged and deleted from the stream usingrm_id_li(XACK+XDEL) to prevent reprocessing.
Tech Stack
- Rust: Core language for performance and safety.
- Tokio: Asynchronous runtime for handling concurrency.
- Fred: High-performance Redis client.
- xkv: Redis connection and command management wrapper.
Directory Structure
src/lib.rs: Library exports and type definitions.src/read_group.rs: CoreReadGroupstruct and consumer loop logic.src/auto_new.rs: Logic for automatically creating consumer groups if missing.src/rm_id_li.rs: Helper for acknowledging and deleting processed messages.src/parse_stream.rs: Utilities for parsing Redis Stream responses.tests/: Integration tests demonstrating usage patterns.
API Reference
ReadGroup
The main struct for configuring and running the consumer.
new(stream, group, consumer, block_sec, claim_idle_sec, count, max_retry): Creates a newReadGroupinstance.stream: Redis Key for the stream.group: Consumer group name.consumer: Consumer name.block_sec: Seconds to block waiting for new messages.claim_idle_sec: Seconds before claiming idle messages from other consumers.count: Maximum number of messages to fetch per batch.max_retry: Maximum number of retries before passing toon_error.
run(parse, on_error): Starts the infinite processing loop.parse: Async callback function to process each message.on_error: Async callback function to handle messages that failed retries.
StreamItem
Represents a single message from the Redis Stream.
id: The unique ID of the message.retry: The delivery count (number of times delivered).idle_ms: Time in milliseconds the message has been idle (if claimed).kv: The message payload as a vector of Key-Value byte pairs.
History
In 1983, Vivek Ranadive, a 26-year-old MIT graduate, observed that while hardware components communicated via a "bus", software lacked a similar standard mechanism. He envisioned a "software bus" where applications could publish and subscribe to information without direct, rigid connections. This idea led to the creation of The Information Bus (TIB), the first commercial message queue software. TIB revolutionised financial trading floors by replacing manual chalkboards with real-time digital streams, allowing different trading systems to communicate instantly. This innovation laid the groundwork for modern event-driven architectures and the message queue systems we rely on today, such as Redis Streams.
About
This project is an open-source component of js0.site ⋅ Refactoring the Internet Plan.
We are redefining the development paradigm of the Internet in a componentized way. Welcome to follow us:
msgq : 基于 Redis Stream 的健壮消息队列
基于 Redis Stream 的健壮消息队列,支持自动认领和重试处理。
目录
功能特性
- 基于 Redis Stream: 利用
XREADGROUP高效消费消息。 - 消费组支持: 支持通过消费组进行扩展式消息处理。
- 自动创建组: 若消费组不存在,自动创建。
- 高可靠性: 内置
BLOCK和CLAIM机制处理闲置消息。 - 并发处理: 使用
tokio::spawn并发处理消息。 - 错误处理: 自动重试机制(可配置重试次数),并提供错误回调。
使用演示
use Arc;
use ;
use info;
use ;
async
async
async
设计思路
ReadGroup::run 方法执行一个持续循环,确保消息处理的健壮性:
- 命令构建: 构建带有
BLOCK(等待新消息)和CLAIM(认领其他消费者的闲置消息)选项的XREADGROUP命令。 - 消息获取: 向 Redis 执行命令以获取一批消息。
- 组管理: 检查消费组是否存在;若不存在,调用
auto_new自动创建。 - 并发处理: 遍历获取的消息,为每条消息生成一个
tokio异步任务,执行用户提供的parse回调。 - 错误处理与重试:
- 监控每个任务的执行。
- 若任务失败,记录错误日志。
- 系统跟踪重试次数;若消息失败超过
max_retry次,将其移交on_error回调进行人工干预或记录。
- 清理: 成功处理(或由
on_error处理)的消息将通过rm_id_li(XACK+XDEL)进行确认和删除,防止重复处理。
技术堆栈
- Rust: 核心语言,提供高性能和安全性。
- Tokio: 异步运行时,处理并发任务。
- Fred: 高性能 Redis 客户端。
- xkv: Redis 连接与命令管理封装。
目录结构
src/lib.rs: 库导出与类型定义。src/read_group.rs: 核心ReadGroup结构体与消费循环逻辑。src/auto_new.rs: 消费组自动创建逻辑。src/rm_id_li.rs: 消息确认与删除辅助工具。src/parse_stream.rs: Redis Stream 响应解析工具。tests/: 集成测试与使用示例。
API 参考
ReadGroup
配置和运行消费者的主要结构体。
new(stream, group, consumer, block_sec, claim_idle_sec, count, max_retry): 创建新的ReadGroup实例。stream: Redis Stream 键名。group: 消费组名称。consumer: 消费者名称。block_sec: 等待新消息的阻塞秒数。claim_idle_sec: 认领闲置消息前的等待秒数。count: 每批次获取的最大消息数。max_retry: 移交on_error前的最大重试次数。
run(parse, on_error): 启动无限处理循环。parse: 处理每条消息的异步回调函数。on_error: 处理重试失败消息的异步回调函数。
StreamItem
表示来自 Redis Stream 的单条消息。
id: 消息唯一 ID。retry: 投递次数(重试计数)。idle_ms: 消息闲置时间(毫秒,若被认领)。kv: 消息负载,为键值对字节向量。
历史背景
1983 年,26 岁的麻省理工学院毕业生 Vivek Ranadive 观察到,虽然硬件组件通过“总线”进行通信,但软件缺乏类似的标准机制。他设想了一种“软件总线”,应用程序可以在没有直接、僵化连接的情况下发布和订阅信息。这一想法促成了 The Information Bus (TIB) 的诞生,这是首款商业消息队列软件。TIB 取代了人工黑板,实现了实时数字流,彻底改变了金融交易大厅,允许不同的交易系统即时通信。这一创新为现代事件驱动架构以及我们今天所依赖的 Redis Stream 等消息队列系统奠定了基础。
关于
本项目为 js0.site ⋅ 重构互联网计划 的开源组件。
我们正在以组件化的方式重新定义互联网的开发范式,欢迎关注: