

Trait Mailbox 

pub trait Mailbox: Send + Sync {
    // Required methods
    fn enqueue<'life0, 'async_trait>(
        &'life0 self,
        from: Vec<u8>,
        payload: Vec<u8>,
        priority: MessagePriority,
    ) -> Pin<Box<dyn Future<Output = Result<Uuid, StorageError>> + Send + 'async_trait>>
       where 'life0: 'async_trait,
             Self: 'async_trait;
    fn dequeue<'life0, 'async_trait>(
        &'life0 self,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<MessageRecord>, StorageError>> + Send + 'async_trait>>
       where 'life0: 'async_trait,
             Self: 'async_trait;
    fn ack<'life0, 'async_trait>(
        &'life0 self,
        message_id: Uuid,
    ) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>>
       where 'life0: 'async_trait,
             Self: 'async_trait;
    fn status<'life0, 'async_trait>(
        &'life0 self,
    ) -> Pin<Box<dyn Future<Output = Result<MailboxStats, StorageError>> + Send + 'async_trait>>
       where 'life0: 'async_trait,
             Self: 'async_trait;

    // Provided method
    fn set_depth_observer(
        &self,
        _observer: Arc<dyn MailboxDepthObserver>,
    ) -> bool { ... }
}

Mailbox interface: defines the core operations for message persistence (enqueue, dequeue, ack, and status).

§Usage example: dequeue -> process -> ack loop

The dequeue method automatically retrieves the next batch of messages. Callers need not worry about batch size; that detail is handled internally by the implementation.

use actr_runtime_mailbox::prelude::*;
use std::time::Duration;

async fn message_processor(mailbox: impl Mailbox) {
    loop {
        // 1. Retrieve the next batch of messages from the queue
        match mailbox.dequeue().await {
            Ok(messages) => {
                if messages.is_empty() {
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    continue;
                }

                // 2. Process messages one by one
                for msg in messages {
                    println!("Processing message: {}", msg.id);
                    // ... execute your business logic here ...

                    // 3. After successful processing, acknowledge this message
                    if let Err(e) = mailbox.ack(msg.id).await {
                        eprintln!("Failed to ack message {}: {}", msg.id, e);
                    }
                }
            }
            Err(e) => {
                eprintln!("Failed to dequeue messages: {}", e);
                tokio::time::sleep(Duration::from_secs(5)).await; // Database error, wait longer
            }
        }
    }
}

Required Methods§


fn enqueue<'life0, 'async_trait>( &'life0 self, from: Vec<u8>, payload: Vec<u8>, priority: MessagePriority, ) -> Pin<Box<dyn Future<Output = Result<Uuid, StorageError>> + Send + 'async_trait>>
where 'life0: 'async_trait, Self: 'async_trait,

Enqueue a message.

§Arguments
  • from: Sender’s ActrId (Protobuf bytes, provided directly by Gateway, not unpacked)
  • payload: Message content (raw bytes, not unpacked)
  • priority: Message priority

fn dequeue<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<Vec<MessageRecord>, StorageError>> + Send + 'async_trait>>
where 'life0: 'async_trait, Self: 'async_trait,

Dequeue a batch of messages from the queue.

This method automatically handles priority: as long as high-priority messages exist, they are returned first. Dequeued messages are atomically marked as Inflight but not deleted. You must call ack() after processing to permanently remove them.
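The dequeue and ack semantics described above can be illustrated with a small synchronous, in-memory model. This is only a sketch under stated assumptions, not the crate's implementation: `ToyMailbox`, `Priority`, `Record`, the `u64` ids, and the `batch_size` field are hypothetical stand-ins for the real `Mailbox`, `MessagePriority`, `MessageRecord`, `Uuid`, and whatever internal batching the backend uses.

```rust
use std::collections::VecDeque;

// Hypothetical stand-in for MessagePriority; the real variants are not shown here.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Priority {
    High,
    Normal,
}

// Hypothetical stand-in for MessageRecord, with a u64 id instead of a Uuid.
#[derive(Clone, Debug, PartialEq, Eq)]
struct Record {
    id: u64,
    priority: Priority,
    payload: Vec<u8>,
}

struct ToyMailbox {
    next_id: u64,
    batch_size: usize, // assumed internal detail; callers never pass it
    high: VecDeque<Record>,
    normal: VecDeque<Record>,
    inflight: Vec<Record>,
}

impl ToyMailbox {
    fn new(batch_size: usize) -> Self {
        ToyMailbox {
            next_id: 0,
            batch_size,
            high: VecDeque::new(),
            normal: VecDeque::new(),
            inflight: Vec::new(),
        }
    }

    // Store a message and return its id.
    fn enqueue(&mut self, payload: Vec<u8>, priority: Priority) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        let record = Record { id, priority, payload };
        match priority {
            Priority::High => self.high.push_back(record),
            Priority::Normal => self.normal.push_back(record),
        }
        id
    }

    // Return the next batch, draining high-priority messages first.
    // Returned messages are moved to `inflight`, not deleted.
    fn dequeue(&mut self) -> Vec<Record> {
        let mut batch = Vec::new();
        while batch.len() < self.batch_size {
            match self.high.pop_front().or_else(|| self.normal.pop_front()) {
                Some(record) => batch.push(record),
                None => break,
            }
        }
        self.inflight.extend(batch.iter().cloned());
        batch
    }

    // Permanently remove an inflight message; returns false if the id is unknown.
    fn ack(&mut self, id: u64) -> bool {
        let before = self.inflight.len();
        self.inflight.retain(|r| r.id != id);
        self.inflight.len() != before
    }

    // Number of messages still stored (queued or inflight).
    fn stored(&self) -> usize {
        self.high.len() + self.normal.len() + self.inflight.len()
    }
}

// Enqueue a normal then a high-priority message, dequeue, then ack everything.
fn demo() -> (Vec<u64>, usize, usize) {
    let mut mailbox = ToyMailbox::new(2);
    mailbox.enqueue(b"a".to_vec(), Priority::Normal);
    mailbox.enqueue(b"b".to_vec(), Priority::High);
    let batch = mailbox.dequeue();
    let order = batch.iter().map(|r| r.id).collect();
    let stored_before_ack = mailbox.stored();
    for record in &batch {
        mailbox.ack(record.id);
    }
    (order, stored_before_ack, mailbox.stored())
}
```

In this model the high-priority message comes out first even though it was enqueued second, and both messages remain stored until `ack` is called for each of them.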


fn ack<'life0, 'async_trait>( &'life0 self, message_id: Uuid, ) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>>
where 'life0: 'async_trait, Self: 'async_trait,

Acknowledge that a message has been successfully processed, permanently removing it from the queue.


fn status<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<MailboxStats, StorageError>> + Send + 'async_trait>>
where 'life0: 'async_trait, Self: 'async_trait,

Get current mailbox statistics.

Provided Methods§


fn set_depth_observer(&self, _observer: Arc<dyn MailboxDepthObserver>) -> bool

Install a MailboxDepthObserver that is notified after every enqueue with the resulting number of queued messages.

Returns true if push-based depth notification is supported by this backend and the observer has been installed. Returns false when the backend cannot cheaply compute depth (e.g. a remote store where COUNT(*) is expensive) and the caller must fall back to polling Mailbox::status; the observer is then left uninstalled.

The default implementation returns false, so existing mailbox backends compile without change.
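The push-or-poll contract can be sketched with simplified synchronous stand-ins. Everything named here is an assumption made for illustration: `DepthObserver` and its `on_depth` method stand in for `MailboxDepthObserver` (whose methods are not shown on this page), `DepthSource::queued` stands in for polling `Mailbox::status`, and `PollOnly`, `PushCapable`, `Recorder`, and `watch_depth` are hypothetical.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for MailboxDepthObserver.
trait DepthObserver: Send + Sync {
    fn on_depth(&self, depth: usize);
}

// Simplified synchronous stand-in for the relevant part of Mailbox.
trait DepthSource {
    // Mirrors the documented default: no push support, observer not installed.
    fn set_depth_observer(&self, _observer: Arc<dyn DepthObserver>) -> bool {
        false
    }
    // Stands in for reading the queue depth via Mailbox::status.
    fn queued(&self) -> usize;
}

// Backend that relies on the default and therefore must be polled.
struct PollOnly {
    depth: usize,
}

impl DepthSource for PollOnly {
    fn queued(&self) -> usize {
        self.depth
    }
}

// Backend that tracks depth cheaply and can notify an observer.
struct PushCapable {
    depth: AtomicUsize,
    observer: Mutex<Option<Arc<dyn DepthObserver>>>,
}

impl PushCapable {
    // Increment the depth and report the post-enqueue count to the observer.
    fn enqueue(&self) {
        let depth = self.depth.fetch_add(1, Ordering::SeqCst) + 1;
        if let Some(observer) = self.observer.lock().unwrap().as_ref() {
            observer.on_depth(depth);
        }
    }
}

impl DepthSource for PushCapable {
    fn set_depth_observer(&self, observer: Arc<dyn DepthObserver>) -> bool {
        *self.observer.lock().unwrap() = Some(observer);
        true
    }
    fn queued(&self) -> usize {
        self.depth.load(Ordering::SeqCst)
    }
}

// Observer that remembers the last depth it was told about.
struct Recorder {
    last: AtomicUsize,
}

impl DepthObserver for Recorder {
    fn on_depth(&self, depth: usize) {
        self.last.store(depth, Ordering::SeqCst);
    }
}

#[derive(Debug, PartialEq)]
enum DepthMode {
    Push,
    Poll(usize),
}

// Try push-based notification first; fall back to a poll when it is refused.
fn watch_depth(source: &dyn DepthSource, observer: Arc<dyn DepthObserver>) -> DepthMode {
    if source.set_depth_observer(observer) {
        DepthMode::Push
    } else {
        DepthMode::Poll(source.queued())
    }
}
```

A caller would typically check the return value once at startup and only schedule periodic polling when it is false.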

Dyn Compatibility§

This trait is dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety".

Implementors§