// actr_runtime_mailbox/mailbox.rs

//! # Actor Mailbox
//!
//! This module defines the core interfaces and data structures for persistent message queues.
//!
//! ## Reliable Queue Workflow
//!
//! This interface is designed as a reliable queue to prevent message loss when a consumer
//! crashes during message processing. The workflow is as follows:
//!
//! 1.  **`dequeue()`**: The consumer retrieves a batch of messages from the queue. These messages
//!     are atomically marked as `Inflight` in the database, but are **not deleted**.
//! 2.  **Process messages**: The consumer processes these messages locally.
//! 3.  **`ack()`**: When a message has been successfully processed, the consumer calls
//!     `ack(message_id)`. This **permanently deletes** the message, marking the successful
//!     completion of the work unit.
//!
//! If the consumer crashes after `dequeue` but before `ack`, those `Inflight` messages remain
//! in the database. On the next consumer restart, a "cleanup" routine can be implemented to
//! reprocess these "stuck" messages.

use crate::error::StorageResult;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use uuid::Uuid;

28/// Message priority
29#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
30pub enum MessagePriority {
31    Normal,
32    High,
33}
34
35/// Message record retrieved from the queue
36#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct MessageRecord {
38    /// Message ID
39    pub id: Uuid,
40    /// Sender's ActrId (Protobuf bytes)
41    ///
42    /// # Design notes
43    /// - `from` stores raw Protobuf bytes without deserializing into ActrId struct
44    /// - Avoids the decode -> ActrId -> encode round-trip
45    /// - Only deserialize when actually needed
46    /// - Gateway passes bytes directly, zero overhead
47    /// - All messages entering the Mailbox originate from WebRTC and always have a sender
48    pub from: Vec<u8>,
49    /// Message content (raw bytes, not unpacked)
50    pub payload: Vec<u8>,
51    /// Priority
52    pub priority: MessagePriority,
53    /// Creation time
54    pub created_at: DateTime<Utc>,
55    /// Processing status
56    pub status: MessageStatus,
57}
58
59/// Message processing status
60#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
61pub enum MessageStatus {
62    Queued,
63    Inflight,
64}
65
66/// Mailbox statistics
67#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
68pub struct MailboxStats {
69    /// Total number of messages waiting in the queue
70    pub queued_messages: u64,
71    /// Total number of dequeued but not yet acknowledged messages
72    pub inflight_messages: u64,
73    /// Queued message count by priority
74    pub queued_by_priority: std::collections::HashMap<MessagePriority, u64>,
75}
76
/// Observer notified whenever a [`Mailbox`] implementation observes a
/// change in its queue depth.
///
/// Installed on a mailbox via [`Mailbox::set_depth_observer`]. Backends
/// that cannot cheaply report depth on every enqueue may return `false`
/// from that method, in which case consumers must fall back to periodic
/// polling via [`Mailbox::status`].
///
/// The observer is called synchronously from the enqueue code path;
/// implementations should therefore keep their work short (typically a
/// bounded `try_send` into a channel). Implementations must not block
/// the enqueue path and must tolerate missed notifications.
///
/// The `Send + Sync + 'static` bounds allow an observer to be shared as
/// `Arc<dyn MailboxDepthObserver>`, which is the form
/// [`Mailbox::set_depth_observer`] accepts.
pub trait MailboxDepthObserver: Send + Sync + 'static {
    /// Called after an enqueue completes, carrying the post-enqueue
    /// queued-message count.
    fn on_depth_change(&self, queued_messages: usize);
}

95/// Mailbox interface - defines core operations for message persistence
96///
97/// ## Usage example: `dequeue -> process -> ack` loop
98///
99/// The `dequeue` method automatically retrieves the next batch of messages. Callers need not
100/// worry about batch size; that detail is handled internally by the implementation.
101///
102/// ```rust,no_run
103/// use actr_runtime_mailbox::prelude::*;
104/// use std::time::Duration;
105///
106/// async fn message_processor(mailbox: impl Mailbox) {
107///     loop {
108///         // 1. Retrieve the next batch of messages from the queue
109///         match mailbox.dequeue().await {
110///             Ok(messages) => {
111///                 if messages.is_empty() {
112///                     tokio::time::sleep(Duration::from_secs(1)).await;
113///                     continue;
114///                 }
115///
116///                 // 2. Process messages one by one
117///                 for msg in messages {
118///                     println!("Processing message: {}", msg.id);
119///                     // ... execute your business logic here ...
120///
121///                     // 3. After successful processing, acknowledge this message
122///                     if let Err(e) = mailbox.ack(msg.id).await {
123///                         eprintln!("Failed to ack message {}: {}", msg.id, e);
124///                     }
125///                 }
126///             }
127///             Err(e) => {
128///                 eprintln!("Failed to dequeue messages: {}", e);
129///                 tokio::time::sleep(Duration::from_secs(5)).await; // Database error, wait longer
130///             }
131///         }
132///     }
133/// }
134/// ```
135#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
136#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
137pub trait Mailbox: Send + Sync {
138    /// Enqueue a message.
139    ///
140    /// # Arguments
141    /// - `from`: Sender's ActrId (Protobuf bytes, provided directly by Gateway, not unpacked)
142    /// - `payload`: Message content (raw bytes, not unpacked)
143    /// - `priority`: Message priority
144    async fn enqueue(
145        &self,
146        from: Vec<u8>,
147        payload: Vec<u8>,
148        priority: MessagePriority,
149    ) -> StorageResult<Uuid>;
150
151    /// Dequeue a batch of messages from the queue.
152    ///
153    /// This method automatically handles priority: as long as high-priority messages exist,
154    /// they are returned first. Dequeued messages are atomically marked as `Inflight` but
155    /// not deleted. You must call `ack()` after processing to permanently remove them.
156    async fn dequeue(&self) -> StorageResult<Vec<MessageRecord>>;
157
158    /// Acknowledge that a message has been successfully processed, permanently removing it from the queue.
159    async fn ack(&self, message_id: Uuid) -> StorageResult<()>;
160
161    /// Get current mailbox statistics.
162    async fn status(&self) -> StorageResult<MailboxStats>;
163
164    /// Install a [`MailboxDepthObserver`] that receives a
165    /// post-enqueue queued-message count on every enqueue.
166    ///
167    /// Returns `true` if push-based depth notification is supported by
168    /// this backend and the observer has been installed. Returns
169    /// `false` when the backend cannot cheaply compute depth (e.g. a
170    /// remote store where `COUNT(*)` is expensive) and the caller must
171    /// fall back to polling [`Mailbox::status`]; the observer is then
172    /// left uninstalled.
173    ///
174    /// The default implementation returns `false`, so existing mailbox
175    /// backends compile without change.
176    fn set_depth_observer(&self, _observer: Arc<dyn MailboxDepthObserver>) -> bool {
177        false
178    }
179}