msgq 0.1.1

Robust Redis Stream based message queue with auto-claim and retry handling / 基于 Redis Stream 的健壮消息队列,支持自动认领和重试处理
Documentation

English | 中文


msgq : Robust Redis Stream Message Queue

中文

Robust Redis Stream based message queue with auto-claim and retry handling.

Features

  • Redis Stream Based: Utilizes XREADGROUP for efficient message consumption.
  • Consumer Groups: Supports scalable message processing with consumer groups.
  • Auto Group Creation: Automatically creates consumer groups if they don't exist.
  • Reliability: Built-in BLOCK and CLAIM mechanisms for handling idle messages.
  • Concurrency: Processes messages concurrently using tokio::spawn.
  • Error Handling: Automatic retry mechanism (up to 3 times) with error callbacks.

Usage

use std::sync::Arc;
use aok::{OK, Void};
use log::info;
use msgq::{Kv, ReadGroup};

async fn on_error(mail: Arc<Kv>, error: String) -> Void {
  info!("on_error {error} {:?}", mail);
  OK
}

async fn print_mail(mail: Arc<Kv>) -> Void {
  info!("print_mail {:?}", mail);
  OK
}

#[tokio::main]
async fn main() -> Void {
  // Initialize Redis connection (example)
  // xboot::init().await?;

  // Create a ReadGroup
  // stream: "smtp", group: "send", consumer: "test"
  // block: 1s, claim_idle: 3s, count: 2
  let read_group = ReadGroup::new("smtp", "send", "test", 1, 3, 2);

  // Run the consumer loop
  read_group.run(print_mail, on_error).await?;
  OK
}

Design

The ReadGroup::run method executes a continuous loop that:

  1. Constructs an XREADGROUP command with BLOCK and CLAIM options.
  2. Fetches messages from the specified Redis Stream.
  3. If the consumer group is missing, it calls auto_new to create it.
  4. Spawns asynchronous tasks to process each message using the provided callback.
  5. Monitors task execution:
    • If a task fails, it logs the error.
    • If retries exceed 3, the message is moved to the error handler.
    • Failed messages are acknowledged and deleted (XACK + XDEL) via rm_err_id_li.

Tech Stack

  • Rust: Core language.
  • Tokio: Asynchronous runtime.
  • Fred: Redis client.
  • xkv: Redis connection management.

Directory Structure

  • src/lib.rs: Library exports.
  • src/read_group.rs: Core consumer logic.
  • src/auto_new.rs: Consumer group management.
  • src/rm_err_id_li.rs: Error cleanup logic.
  • tests/: Integration tests and examples.

API Reference

ReadGroup

Main struct for configuring and running the consumer.

  • new(stream, group, consumer, block_sec, claim_idle_sec, count): Creates a new instance.
  • run(parse, on_error): Starts the processing loop.

StreamItem

Represents a message from the stream.

  • id: Message ID.
  • kv: Key-Value pairs (payload).

Trivia

Redis Streams, introduced in Redis 5.0, brought a log data structure to Redis, inspired by Kafka. It allows for consumer groups, enabling distributed message processing where multiple consumers can read from the same stream without processing the same message twice. XREADGROUP is the primary command for this pattern, facilitating robust and scalable message queue architectures directly within Redis.


About

This project is an open-source component of js0.site ⋅ Refactoring the Internet Plan.

We are redefining the development paradigm of the Internet in a componentized way. Welcome to follow us:


msgq : 基于 Redis Stream 的健壮消息队列

English

基于 Redis Stream 的健壮消息队列,支持自动认领和重试处理。

功能特性

  • 基于 Redis Stream: 利用 XREADGROUP 高效消费消息。
  • 消费组支持: 支持通过消费组进行扩展式消息处理。
  • 自动创建组: 如果消费组不存在,自动创建。
  • 高可靠性: 内置 BLOCKCLAIM 机制处理闲置消息。
  • 并发处理: 使用 tokio::spawn 并发处理消息。
  • 错误处理: 自动重试机制(最多 3 次),并提供错误回调。

使用演示

use std::sync::Arc;
use aok::{OK, Void};
use log::info;
use msgq::{Kv, ReadGroup};

async fn on_error(mail: Arc<Kv>, error: String) -> Void {
  info!("on_error {error} {:?}", mail);
  OK
}

async fn print_mail(mail: Arc<Kv>) -> Void {
  info!("print_mail {:?}", mail);
  OK
}

#[tokio::main]
async fn main() -> Void {
  // 初始化 Redis 连接 (示例)
  // xboot::init().await?;

  // 创建 ReadGroup
  // stream: "smtp", group: "send", consumer: "test"
  // block: 1s, claim_idle: 3s, count: 2
  let read_group = ReadGroup::new("smtp", "send", "test", 1, 3, 2);

  // 运行消费循环
  read_group.run(print_mail, on_error).await?;
  OK
}

设计思路

ReadGroup::run 方法执行一个持续循环:

  1. 构建带有 BLOCKCLAIM 选项的 XREADGROUP 命令。
  2. 从指定的 Redis Stream 获取消息。
  3. 如果消费组缺失,调用 auto_new 自动创建。
  4. 使用提供的回调函数生成异步任务处理每条消息。
  5. 监控任务执行:
    • 如果任务失败,记录错误。
    • 如果重试超过 3 次,将消息移交错误处理程序。
    • 通过 rm_err_id_li 确认并删除(XACK + XDEL)失败消息。

技术堆栈

  • Rust: 核心语言。
  • Tokio: 异步运行时。
  • Fred: Redis 客户端。
  • xkv: Redis 连接管理。

目录结构

  • src/lib.rs: 库导出。
  • src/read_group.rs: 核心消费逻辑。
  • src/auto_new.rs: 消费组管理。
  • src/rm_err_id_li.rs: 错误清理逻辑。
  • tests/: 集成测试和示例。

API 参考

ReadGroup

配置和运行消费者的主要结构体。

  • new(stream, group, consumer, block_sec, claim_idle_sec, count): 创建新实例。
  • run(parse, on_error): 启动处理循环。

StreamItem

表示来自流的消息。

  • id: 消息 ID。
  • kv: 键值对(负载)。

历史背景

Redis Streams 在 Redis 5.0 中引入,为 Redis 带来了日志数据结构,灵感来源于 Kafka。它支持消费组,允许分布式消息处理,多个消费者可以从同一个流中读取,而不会重复处理同一条消息。XREADGROUP 是此模式的主要命令,直接在 Redis 内部促进了健壮且可扩展的消息队列架构。


关于

本项目为 js0.site ⋅ 重构互联网计划 的开源组件。

我们正在以组件化的方式重新定义互联网的开发范式,欢迎关注: