actr_mailbox/mailbox.rs
1//! # Actor Mailbox
2//!
3//! 本模块定义了持久化消息队列的核心接口和数据结构。
4//!
5//! ## 可靠队列工作流
6//!
7//! 本接口被设计为一个可靠队列,以防止消费者在处理消息时因崩溃而导致消息丢失。
8//! 工作流如下:
9//!
10//! 1. **`dequeue()`**: 消费者从队列中获取一批消息。这些消息在数据库中被原子性地标记为 `Inflight` (处理中),但**不会被删除**。
11//! 2. **处理消息**: 消费者在本地处理这些消息。
12//! 3. **`ack()`**: 当一条消息被成功处理后,消费者调用 `ack(message_id)`。这会**永久删除**该消息,标志着工作单元的成功完成。
13//!
14//! 如果消费者在 `dequeue` 之后、`ack` 之前崩溃,那些处于 `Inflight` 状态的消息会保留在数据库中。
15//! 下次消费者重启时,可以实现一个“清理”逻辑来重新处理这些“卡住”的消息。
16
17use crate::error::StorageResult;
18use async_trait::async_trait;
19use chrono::{DateTime, Utc};
20use serde::{Deserialize, Serialize};
21use uuid::Uuid;
22
23/// 消息优先级
24#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
25pub enum MessagePriority {
26 Normal,
27 High,
28}
29
30/// 从队列中取出的消息记录
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct MessageRecord {
33 /// 消息 ID
34 pub id: Uuid,
35 /// 消息发送方的 ActrId (Protobuf bytes)
36 ///
37 /// # 设计说明
38 /// - from 存储原始 Protobuf bytes,不反序列化为 ActrId 结构
39 /// - 避免 decode → ActrId → encode 的循环
40 /// - 只在真正需要使用时才反序列化一次
41 /// - Gateway 直接传递 bytes,零开销
42 /// - 所有进入 Mailbox 的消息都来自 WebRTC,必然有 sender
43 pub from: Vec<u8>,
44 /// 消息内容(raw bytes,不解包)
45 pub payload: Vec<u8>,
46 /// 优先级
47 pub priority: MessagePriority,
48 /// 创建时间
49 pub created_at: DateTime<Utc>,
50 /// 处理状态
51 pub status: MessageStatus,
52}
53
54/// 消息处理状态
55#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
56pub enum MessageStatus {
57 Queued,
58 Inflight,
59}
60
61/// 邮箱的统计信息
62#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
63pub struct MailboxStats {
64 /// 在队列中等待处理的消息总数
65 pub queued_messages: u64,
66 /// 已出队但尚未被确认(ack)的消息总数
67 pub inflight_messages: u64,
68 /// 按优先级的排队消息数
69 pub queued_by_priority: std::collections::HashMap<MessagePriority, u64>,
70}
71
72/// 邮箱接口 - 定义消息持久化的核心操作
73///
74/// ## 使用示例: `dequeue -> process -> ack` 循环
75///
76/// `dequeue` 方法会自动获取下一批消息。调用者无需关心批量大小,这个细节由实现内部处理。
77///
78/// ```rust,no_run
79/// use actr_mailbox::prelude::*;
80/// use std::time::Duration;
81///
82/// async fn message_processor(mailbox: impl Mailbox) {
83/// loop {
84/// // 1. 从队列中获取下一批消息
85/// match mailbox.dequeue().await {
86/// Ok(messages) => {
87/// if messages.is_empty() {
88/// tokio::time::sleep(Duration::from_secs(1)).await;
89/// continue;
90/// }
91///
92/// // 2. 逐条处理消息
93/// for msg in messages {
94/// println!("Processing message: {}", msg.id);
95/// // ... 在这里执行你的业务逻辑 ...
96///
97/// // 3. 成功处理后,确认这一条消息
98/// if let Err(e) = mailbox.ack(msg.id).await {
99/// eprintln!("消息 {} 确认失败: {}", msg.id, e);
100/// }
101/// }
102/// }
103/// Err(e) => {
104/// eprintln!("从队列拉取消息失败: {}", e);
105/// tokio::time::sleep(Duration::from_secs(5)).await; // 数据库错误,等待更长时间
106/// }
107/// }
108/// }
109/// }
110/// ```
111#[async_trait]
112pub trait Mailbox: Send + Sync {
113 /// 将消息加入队列。
114 ///
115 /// # 参数
116 /// - `from`: 消息发送方 ActrId (Protobuf bytes,由 Gateway 直接提供,不解包)
117 /// - `payload`: 消息内容(raw bytes,不解包)
118 /// - `priority`: 消息优先级
119 async fn enqueue(
120 &self,
121 from: Vec<u8>,
122 payload: Vec<u8>,
123 priority: MessagePriority,
124 ) -> StorageResult<Uuid>;
125
126 /// 从队列中取出一批消息。
127 ///
128 /// 此方法将自动处理优先级:只要有高优先级消息,就会优先返回它们。
129 /// 取出的消息会被原子性地标记为 `Inflight` (处理中),但不会被删除。
130 /// 必须在处理完成后调用 `ack()` 来将其永久删除。
131 async fn dequeue(&self) -> StorageResult<Vec<MessageRecord>>;
132
133 /// 确认一条消息已成功处理,将其从队列中永久删除。
134 async fn ack(&self, message_id: Uuid) -> StorageResult<()>;
135
136 /// 获取当前邮箱的统计信息。
137 async fn status(&self) -> StorageResult<MailboxStats>;
138}