actr_mailbox/
mailbox.rs

1//! # Actor Mailbox
2//!
3//! 本模块定义了持久化消息队列的核心接口和数据结构。
4//!
5//! ## 可靠队列工作流
6//!
7//! 本接口被设计为一个可靠队列,以防止消费者在处理消息时因崩溃而导致消息丢失。
8//! 工作流如下:
9//!
10//! 1.  **`dequeue()`**: 消费者从队列中获取一批消息。这些消息在数据库中被原子性地标记为 `Inflight` (处理中),但**不会被删除**。
11//! 2.  **处理消息**: 消费者在本地处理这些消息。
12//! 3.  **`ack()`**: 当一条消息被成功处理后,消费者调用 `ack(message_id)`。这会**永久删除**该消息,标志着工作单元的成功完成。
13//!
14//! 如果消费者在 `dequeue` 之后、`ack` 之前崩溃,那些处于 `Inflight` 状态的消息会保留在数据库中。
15//! 下次消费者重启时,可以实现一个“清理”逻辑来重新处理这些“卡住”的消息。
16
17use crate::error::StorageResult;
18use async_trait::async_trait;
19use chrono::{DateTime, Utc};
20use serde::{Deserialize, Serialize};
21use uuid::Uuid;
22
23/// 消息优先级
24#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
25pub enum MessagePriority {
26    Normal,
27    High,
28}
29
30/// 从队列中取出的消息记录
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct MessageRecord {
33    /// 消息 ID
34    pub id: Uuid,
35    /// 消息发送方的 ActrId (Protobuf bytes)
36    ///
37    /// # 设计说明
38    /// - from 存储原始 Protobuf bytes,不反序列化为 ActrId 结构
39    /// - 避免 decode → ActrId → encode 的循环
40    /// - 只在真正需要使用时才反序列化一次
41    /// - Gateway 直接传递 bytes,零开销
42    /// - 所有进入 Mailbox 的消息都来自 WebRTC,必然有 sender
43    pub from: Vec<u8>,
44    /// 消息内容(raw bytes,不解包)
45    pub payload: Vec<u8>,
46    /// 优先级
47    pub priority: MessagePriority,
48    /// 创建时间
49    pub created_at: DateTime<Utc>,
50    /// 处理状态
51    pub status: MessageStatus,
52}
53
54/// 消息处理状态
55#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
56pub enum MessageStatus {
57    Queued,
58    Inflight,
59}
60
61/// 邮箱的统计信息
62#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
63pub struct MailboxStats {
64    /// 在队列中等待处理的消息总数
65    pub queued_messages: u64,
66    /// 已出队但尚未被确认(ack)的消息总数
67    pub inflight_messages: u64,
68    /// 按优先级的排队消息数
69    pub queued_by_priority: std::collections::HashMap<MessagePriority, u64>,
70}
71
72/// 邮箱接口 - 定义消息持久化的核心操作
73///
74/// ## 使用示例: `dequeue -> process -> ack` 循环
75///
76/// `dequeue` 方法会自动获取下一批消息。调用者无需关心批量大小,这个细节由实现内部处理。
77///
78/// ```rust,no_run
79/// use actr_mailbox::prelude::*;
80/// use std::time::Duration;
81///
82/// async fn message_processor(mailbox: impl Mailbox) {
83///     loop {
84///         // 1. 从队列中获取下一批消息
85///         match mailbox.dequeue().await {
86///             Ok(messages) => {
87///                 if messages.is_empty() {
88///                     tokio::time::sleep(Duration::from_secs(1)).await;
89///                     continue;
90///                 }
91///
92///                 // 2. 逐条处理消息
93///                 for msg in messages {
94///                     println!("Processing message: {}", msg.id);
95///                     // ... 在这里执行你的业务逻辑 ...
96///
97///                     // 3. 成功处理后,确认这一条消息
98///                     if let Err(e) = mailbox.ack(msg.id).await {
99///                         eprintln!("消息 {} 确认失败: {}", msg.id, e);
100///                     }
101///                 }
102///             }
103///             Err(e) => {
104///                 eprintln!("从队列拉取消息失败: {}", e);
105///                 tokio::time::sleep(Duration::from_secs(5)).await; // 数据库错误,等待更长时间
106///             }
107///         }
108///     }
109/// }
110/// ```
111#[async_trait]
112pub trait Mailbox: Send + Sync {
113    /// 将消息加入队列。
114    ///
115    /// # 参数
116    /// - `from`: 消息发送方 ActrId (Protobuf bytes,由 Gateway 直接提供,不解包)
117    /// - `payload`: 消息内容(raw bytes,不解包)
118    /// - `priority`: 消息优先级
119    async fn enqueue(
120        &self,
121        from: Vec<u8>,
122        payload: Vec<u8>,
123        priority: MessagePriority,
124    ) -> StorageResult<Uuid>;
125
126    /// 从队列中取出一批消息。
127    ///
128    /// 此方法将自动处理优先级:只要有高优先级消息,就会优先返回它们。
129    /// 取出的消息会被原子性地标记为 `Inflight` (处理中),但不会被删除。
130    /// 必须在处理完成后调用 `ack()` 来将其永久删除。
131    async fn dequeue(&self) -> StorageResult<Vec<MessageRecord>>;
132
133    /// 确认一条消息已成功处理,将其从队列中永久删除。
134    async fn ack(&self, message_id: Uuid) -> StorageResult<()>;
135
136    /// 获取当前邮箱的统计信息。
137    async fn status(&self) -> StorageResult<MailboxStats>;
138}