actr_runtime_mailbox/mailbox.rs
//! # Actor Mailbox
//!
//! This module defines the core interfaces and data structures for persistent message queues.
//!
//! ## Reliable Queue Workflow
//!
//! The interface is designed as a reliable queue: a message is not lost when a consumer
//! crashes while processing it. The workflow is as follows:
//!
//! 1. **`dequeue()`**: The consumer retrieves a batch of messages from the queue. These messages
//!    are atomically marked as `Inflight` in the database, but are **not deleted**.
//! 2. **Process messages**: The consumer processes these messages locally.
//! 3. **`ack()`**: When a message has been successfully processed, the consumer calls
//!    `ack(message_id)`. This **permanently deletes** the message, marking the successful
//!    completion of the work unit.
//!
//! If the consumer crashes after `dequeue` but before `ack`, the affected `Inflight` messages
//! remain in the database. The `Mailbox` trait in this module does not itself define a recovery
//! operation; a "cleanup" routine that runs when the consumer restarts can be implemented to
//! find these "stuck" messages and reprocess them. Because a crash can also happen after a
//! message was processed but before it was acknowledged, the same message may be processed
//! more than once, so processing should tolerate repeats.
20
21use crate::error::StorageResult;
22use async_trait::async_trait;
23use chrono::{DateTime, Utc};
24use serde::{Deserialize, Serialize};
25use std::sync::Arc;
26use uuid::Uuid;
27
28/// Message priority
29#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
30pub enum MessagePriority {
31 Normal,
32 High,
33}
34
35/// Message record retrieved from the queue
36#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct MessageRecord {
38 /// Message ID
39 pub id: Uuid,
40 /// Sender's ActrId (Protobuf bytes)
41 ///
42 /// # Design notes
43 /// - `from` stores raw Protobuf bytes without deserializing into ActrId struct
44 /// - Avoids the decode -> ActrId -> encode round-trip
45 /// - Only deserialize when actually needed
46 /// - Gateway passes bytes directly, zero overhead
47 /// - All messages entering the Mailbox originate from WebRTC and always have a sender
48 pub from: Vec<u8>,
49 /// Message content (raw bytes, not unpacked)
50 pub payload: Vec<u8>,
51 /// Priority
52 pub priority: MessagePriority,
53 /// Creation time
54 pub created_at: DateTime<Utc>,
55 /// Processing status
56 pub status: MessageStatus,
57}
58
59/// Message processing status
60#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
61pub enum MessageStatus {
62 Queued,
63 Inflight,
64}
65
66/// Mailbox statistics
67#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
68pub struct MailboxStats {
69 /// Total number of messages waiting in the queue
70 pub queued_messages: u64,
71 /// Total number of dequeued but not yet acknowledged messages
72 pub inflight_messages: u64,
73 /// Queued message count by priority
74 pub queued_by_priority: std::collections::HashMap<MessagePriority, u64>,
75}
76
/// Observer that a [`Mailbox`] implementation notifies with its queue depth
/// after an enqueue.
///
/// Installed on a mailbox via [`Mailbox::set_depth_observer`]. Backends
/// that cannot cheaply report depth on every enqueue may return `false`
/// from that method, in which case consumers must fall back to periodic
/// polling via [`Mailbox::status`].
///
/// Notifications are described for the enqueue path only; this contract does
/// not promise a callback when `dequeue` or `ack` change the queue.
///
/// The observer is called synchronously from the enqueue code path;
/// implementations should therefore keep their work short (typically a
/// bounded `try_send` into a channel). Implementations must not block
/// the enqueue path and must tolerate missed notifications.
///
/// The `Send + Sync + 'static` bounds allow an observer to be shared as
/// `Arc<dyn MailboxDepthObserver>`, which is the form
/// [`Mailbox::set_depth_observer`] accepts.
pub trait MailboxDepthObserver: Send + Sync + 'static {
    /// Called after an enqueue completes.
    ///
    /// # Arguments
    /// - `queued_messages`: the post-enqueue queued-message count.
    ///
    /// NOTE(review): this count is a `usize`, while
    /// `MailboxStats::queued_messages` is a `u64`; callers comparing the two
    /// need an explicit conversion.
    fn on_depth_change(&self, queued_messages: usize);
}
94
95/// Mailbox interface - defines core operations for message persistence
96///
97/// ## Usage example: `dequeue -> process -> ack` loop
98///
99/// The `dequeue` method automatically retrieves the next batch of messages. Callers need not
100/// worry about batch size; that detail is handled internally by the implementation.
101///
102/// ```rust,no_run
103/// use actr_runtime_mailbox::prelude::*;
104/// use std::time::Duration;
105///
106/// async fn message_processor(mailbox: impl Mailbox) {
107/// loop {
108/// // 1. Retrieve the next batch of messages from the queue
109/// match mailbox.dequeue().await {
110/// Ok(messages) => {
111/// if messages.is_empty() {
112/// tokio::time::sleep(Duration::from_secs(1)).await;
113/// continue;
114/// }
115///
116/// // 2. Process messages one by one
117/// for msg in messages {
118/// println!("Processing message: {}", msg.id);
119/// // ... execute your business logic here ...
120///
121/// // 3. After successful processing, acknowledge this message
122/// if let Err(e) = mailbox.ack(msg.id).await {
123/// eprintln!("Failed to ack message {}: {}", msg.id, e);
124/// }
125/// }
126/// }
127/// Err(e) => {
128/// eprintln!("Failed to dequeue messages: {}", e);
129/// tokio::time::sleep(Duration::from_secs(5)).await; // Database error, wait longer
130/// }
131/// }
132/// }
133/// }
134/// ```
135#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
136#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
137pub trait Mailbox: Send + Sync {
138 /// Enqueue a message.
139 ///
140 /// # Arguments
141 /// - `from`: Sender's ActrId (Protobuf bytes, provided directly by Gateway, not unpacked)
142 /// - `payload`: Message content (raw bytes, not unpacked)
143 /// - `priority`: Message priority
144 async fn enqueue(
145 &self,
146 from: Vec<u8>,
147 payload: Vec<u8>,
148 priority: MessagePriority,
149 ) -> StorageResult<Uuid>;
150
151 /// Dequeue a batch of messages from the queue.
152 ///
153 /// This method automatically handles priority: as long as high-priority messages exist,
154 /// they are returned first. Dequeued messages are atomically marked as `Inflight` but
155 /// not deleted. You must call `ack()` after processing to permanently remove them.
156 async fn dequeue(&self) -> StorageResult<Vec<MessageRecord>>;
157
158 /// Acknowledge that a message has been successfully processed, permanently removing it from the queue.
159 async fn ack(&self, message_id: Uuid) -> StorageResult<()>;
160
161 /// Get current mailbox statistics.
162 async fn status(&self) -> StorageResult<MailboxStats>;
163
164 /// Install a [`MailboxDepthObserver`] that receives a
165 /// post-enqueue queued-message count on every enqueue.
166 ///
167 /// Returns `true` if push-based depth notification is supported by
168 /// this backend and the observer has been installed. Returns
169 /// `false` when the backend cannot cheaply compute depth (e.g. a
170 /// remote store where `COUNT(*)` is expensive) and the caller must
171 /// fall back to polling [`Mailbox::status`]; the observer is then
172 /// left uninstalled.
173 ///
174 /// The default implementation returns `false`, so existing mailbox
175 /// backends compile without change.
176 fn set_depth_observer(&self, _observer: Arc<dyn MailboxDepthObserver>) -> bool {
177 false
178 }
179}