pub trait Mailbox: Send + Sync {
// Required methods
fn enqueue<'life0, 'async_trait>(
&'life0 self,
from: Vec<u8>,
payload: Vec<u8>,
priority: MessagePriority,
) -> Pin<Box<dyn Future<Output = Result<Uuid, StorageError>> + Send + 'async_trait>>
where 'life0: 'async_trait,
Self: 'async_trait;
fn dequeue<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<Vec<MessageRecord>, StorageError>> + Send + 'async_trait>>
where 'life0: 'async_trait,
Self: 'async_trait;
fn ack<'life0, 'async_trait>(
&'life0 self,
message_id: Uuid,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>>
where 'life0: 'async_trait,
Self: 'async_trait;
fn status<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<MailboxStats, StorageError>> + Send + 'async_trait>>
where 'life0: 'async_trait,
Self: 'async_trait;
}Expand description
邮箱接口 - 定义消息持久化的核心操作
§使用示例: dequeue -> process -> ack 循环
dequeue 方法会自动获取下一批消息。调用者无需关心批量大小,这个细节由实现内部处理。
use actr_mailbox::prelude::*;
use std::time::Duration;
async fn message_processor(mailbox: impl Mailbox) {
loop {
// 1. 从队列中获取下一批消息
match mailbox.dequeue().await {
Ok(messages) => {
if messages.is_empty() {
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
// 2. 逐条处理消息
for msg in messages {
println!("Processing message: {}", msg.id);
// ... 在这里执行你的业务逻辑 ...
// 3. 成功处理后,确认这一条消息
if let Err(e) = mailbox.ack(msg.id).await {
eprintln!("消息 {} 确认失败: {}", msg.id, e);
}
}
}
Err(e) => {
eprintln!("从队列拉取消息失败: {}", e);
tokio::time::sleep(Duration::from_secs(5)).await; // 数据库错误,等待更长时间
}
}
}
}Required Methods§
Sourcefn enqueue<'life0, 'async_trait>(
&'life0 self,
from: Vec<u8>,
payload: Vec<u8>,
priority: MessagePriority,
) -> Pin<Box<dyn Future<Output = Result<Uuid, StorageError>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn enqueue<'life0, 'async_trait>(
&'life0 self,
from: Vec<u8>,
payload: Vec<u8>,
priority: MessagePriority,
) -> Pin<Box<dyn Future<Output = Result<Uuid, StorageError>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
将消息加入队列。
§参数
from: 消息发送方 ActrId (Protobuf bytes,由 Gateway 直接提供,不解包)payload: 消息内容(raw bytes,不解包)priority: 消息优先级
Sourcefn dequeue<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<Vec<MessageRecord>, StorageError>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn dequeue<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<Vec<MessageRecord>, StorageError>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
从队列中取出一批消息。
此方法将自动处理优先级:只要有高优先级消息,就会优先返回它们。
取出的消息会被原子性地标记为 Inflight (处理中),但不会被删除。
必须在处理完成后调用 ack() 来将其永久删除。
Sourcefn ack<'life0, 'async_trait>(
&'life0 self,
message_id: Uuid,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn ack<'life0, 'async_trait>(
&'life0 self,
message_id: Uuid,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
确认一条消息已成功处理,将其从队列中永久删除。
Sourcefn status<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<MailboxStats, StorageError>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn status<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<MailboxStats, StorageError>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
获取当前邮箱的统计信息。