pub trait Mailbox: Send + Sync {
// Required methods
fn enqueue<'life0, 'async_trait>(
&'life0 self,
from: Vec<u8>,
payload: Vec<u8>,
priority: MessagePriority,
) -> Pin<Box<dyn Future<Output = Result<Uuid, StorageError>> + Send + 'async_trait>>
where 'life0: 'async_trait,
Self: 'async_trait;
fn dequeue<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<Vec<MessageRecord>, StorageError>> + Send + 'async_trait>>
where 'life0: 'async_trait,
Self: 'async_trait;
fn ack<'life0, 'async_trait>(
&'life0 self,
message_id: Uuid,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>>
where 'life0: 'async_trait,
Self: 'async_trait;
fn status<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<MailboxStats, StorageError>> + Send + 'async_trait>>
where 'life0: 'async_trait,
Self: 'async_trait;
// Provided method
fn set_depth_observer(
&self,
_observer: Arc<dyn MailboxDepthObserver>,
) -> bool { ... }
}Expand description
Mailbox interface - defines core operations for message persistence
§Usage example: dequeue -> process -> ack loop
The dequeue method automatically retrieves the next batch of messages. Callers need not
worry about batch size; that detail is handled internally by the implementation.
use actr_runtime_mailbox::prelude::*;
use std::time::Duration;
async fn message_processor(mailbox: impl Mailbox) {
loop {
// 1. Retrieve the next batch of messages from the queue
match mailbox.dequeue().await {
Ok(messages) => {
if messages.is_empty() {
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
// 2. Process messages one by one
for msg in messages {
println!("Processing message: {}", msg.id);
// ... execute your business logic here ...
// 3. After successful processing, acknowledge this message
if let Err(e) = mailbox.ack(msg.id).await {
eprintln!("Failed to ack message {}: {}", msg.id, e);
}
}
}
Err(e) => {
eprintln!("Failed to dequeue messages: {}", e);
tokio::time::sleep(Duration::from_secs(5)).await; // Database error, wait longer
}
}
}
}Required Methods§
Sourcefn enqueue<'life0, 'async_trait>(
&'life0 self,
from: Vec<u8>,
payload: Vec<u8>,
priority: MessagePriority,
) -> Pin<Box<dyn Future<Output = Result<Uuid, StorageError>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn enqueue<'life0, 'async_trait>(
&'life0 self,
from: Vec<u8>,
payload: Vec<u8>,
priority: MessagePriority,
) -> Pin<Box<dyn Future<Output = Result<Uuid, StorageError>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
Enqueue a message.
§Arguments
from: Sender’s ActrId (Protobuf bytes, provided directly by Gateway, not unpacked)payload: Message content (raw bytes, not unpacked)priority: Message priority
Sourcefn dequeue<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<Vec<MessageRecord>, StorageError>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn dequeue<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<Vec<MessageRecord>, StorageError>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
Dequeue a batch of messages from the queue.
This method automatically handles priority: as long as high-priority messages exist,
they are returned first. Dequeued messages are atomically marked as Inflight but
not deleted. You must call ack() after processing to permanently remove them.
Sourcefn ack<'life0, 'async_trait>(
&'life0 self,
message_id: Uuid,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn ack<'life0, 'async_trait>(
&'life0 self,
message_id: Uuid,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
Acknowledge that a message has been successfully processed, permanently removing it from the queue.
Sourcefn status<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<MailboxStats, StorageError>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn status<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<MailboxStats, StorageError>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
Get current mailbox statistics.
Provided Methods§
Sourcefn set_depth_observer(&self, _observer: Arc<dyn MailboxDepthObserver>) -> bool
fn set_depth_observer(&self, _observer: Arc<dyn MailboxDepthObserver>) -> bool
Install a MailboxDepthObserver that receives a
post-enqueue queued-message count on every enqueue.
Returns true if push-based depth notification is supported by
this backend and the observer has been installed. Returns
false when the backend cannot cheaply compute depth (e.g. a
remote store where COUNT(*) is expensive) and the caller must
fall back to polling Mailbox::status; the observer is then
left uninstalled.
The default implementation returns false, so existing mailbox
backends compile without change.
Dyn Compatibility§
This trait is dyn compatible.
In older versions of Rust, dyn compatibility was called "object safety".