Mailbox

Trait Mailbox 

Source
pub trait Mailbox: Send + Sync {
    // Required methods
    fn enqueue<'life0, 'async_trait>(
        &'life0 self,
        from: Vec<u8>,
        payload: Vec<u8>,
        priority: MessagePriority,
    ) -> Pin<Box<dyn Future<Output = Result<Uuid, StorageError>> + Send + 'async_trait>>
       where 'life0: 'async_trait,
             Self: 'async_trait;
    fn dequeue<'life0, 'async_trait>(
        &'life0 self,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<MessageRecord>, StorageError>> + Send + 'async_trait>>
       where 'life0: 'async_trait,
             Self: 'async_trait;
    fn ack<'life0, 'async_trait>(
        &'life0 self,
        message_id: Uuid,
    ) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>>
       where 'life0: 'async_trait,
             Self: 'async_trait;
    fn status<'life0, 'async_trait>(
        &'life0 self,
    ) -> Pin<Box<dyn Future<Output = Result<MailboxStats, StorageError>> + Send + 'async_trait>>
       where 'life0: 'async_trait,
             Self: 'async_trait;
}
Expand description

邮箱接口 - 定义消息持久化的核心操作

§使用示例: dequeue -> process -> ack 循环

dequeue 方法会自动获取下一批消息。调用者无需关心批量大小,这个细节由实现内部处理。

use actr_mailbox::prelude::*;
use std::time::Duration;

async fn message_processor(mailbox: impl Mailbox) {
    loop {
        // 1. 从队列中获取下一批消息
        match mailbox.dequeue().await {
            Ok(messages) => {
                if messages.is_empty() {
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    continue;
                }

                // 2. 逐条处理消息
                for msg in messages {
                    println!("Processing message: {}", msg.id);
                    // ... 在这里执行你的业务逻辑 ...

                    // 3. 成功处理后,确认这一条消息
                    if let Err(e) = mailbox.ack(msg.id).await {
                        eprintln!("消息 {} 确认失败: {}", msg.id, e);
                    }
                }
            }
            Err(e) => {
                eprintln!("从队列拉取消息失败: {}", e);
                tokio::time::sleep(Duration::from_secs(5)).await; // 数据库错误,等待更长时间
            }
        }
    }
}

Required Methods§

Source

fn enqueue<'life0, 'async_trait>( &'life0 self, from: Vec<u8>, payload: Vec<u8>, priority: MessagePriority, ) -> Pin<Box<dyn Future<Output = Result<Uuid, StorageError>> + Send + 'async_trait>>
where 'life0: 'async_trait, Self: 'async_trait,

将消息加入队列。

§参数
  • from: 消息发送方 ActrId (Protobuf bytes,由 Gateway 直接提供,不解包)
  • payload: 消息内容(raw bytes,不解包)
  • priority: 消息优先级
Source

fn dequeue<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<Vec<MessageRecord>, StorageError>> + Send + 'async_trait>>
where 'life0: 'async_trait, Self: 'async_trait,

从队列中取出一批消息。

此方法将自动处理优先级:只要有高优先级消息,就会优先返回它们。 取出的消息会被原子性地标记为 Inflight (处理中),但不会被删除。 必须在处理完成后调用 ack() 来将其永久删除。

Source

fn ack<'life0, 'async_trait>( &'life0 self, message_id: Uuid, ) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>>
where 'life0: 'async_trait, Self: 'async_trait,

确认一条消息已成功处理,将其从队列中永久删除。

Source

fn status<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<MailboxStats, StorageError>> + Send + 'async_trait>>
where 'life0: 'async_trait, Self: 'async_trait,

获取当前邮箱的统计信息。

Implementors§