Skip to main content

reifydb_runtime/actor/mailbox/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Actor mailbox and message sending types.
5//!
6//! This module provides:
7//! - [`ActorRef`]: A handle for sending messages to an actor
8//! - [`SendError`]: Error type for failed sends
9//!
10//! # Platform Differences
11//!
12//! - **Native**: Uses `crossbeam-channel` for lock-free message passing between threads
13//! - **WASM**: Uses `Rc<RefCell>` processor for inline (synchronous) message handling
14
15#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
16use std::cell::{Cell, RefCell};
17use std::fmt;
18#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
19use std::rc::Rc;
20#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
21use std::sync::Arc;
22#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
23use std::sync::atomic::AtomicBool;
24
25use cfg_if::cfg_if;
26
27#[cfg(not(reifydb_single_threaded))]
28pub(crate) mod native;
29
30#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
31pub(crate) mod wasm;
32
33#[cfg(reifydb_target = "dst")]
34pub(crate) mod dst;
35
36cfg_if! {
37	if #[cfg(reifydb_target = "dst")] {
38		type ActorRefInnerImpl<M> = dst::ActorRefInner<M>;
39	} else if #[cfg(not(reifydb_single_threaded))] {
40		type ActorRefInnerImpl<M> = native::ActorRefInner<M>;
41	} else {
42		type ActorRefInnerImpl<M> = wasm::ActorRefInner<M>;
43	}
44}
45
46/// Error returned when sending a message fails.
47#[derive(Debug, Clone, PartialEq, Eq)]
48pub enum SendError<M> {
49	/// The actor has stopped and the mailbox is closed.
50	Closed(M),
51	/// The mailbox is full (bounded mailbox only).
52	Full(M),
53}
54
55impl<M> SendError<M> {
56	/// Get the message that failed to send.
57	#[inline]
58	pub fn into_inner(self) -> M {
59		match self {
60			SendError::Closed(m) => m,
61			SendError::Full(m) => m,
62		}
63	}
64}
65
66impl<M: fmt::Debug> fmt::Display for SendError<M> {
67	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
68		match self {
69			SendError::Closed(_) => write!(f, "actor mailbox closed"),
70			SendError::Full(_) => write!(f, "actor mailbox full"),
71		}
72	}
73}
74
75impl<M: fmt::Debug> error::Error for SendError<M> {}
76
77/// Error returned when an ask (request-response) fails.
78#[derive(Debug, Clone, PartialEq, Eq)]
79pub enum AskError {
80	/// Failed to send the request.
81	SendFailed,
82	/// The response channel was closed (actor stopped or didn't respond).
83	ResponseClosed,
84}
85
86impl fmt::Display for AskError {
87	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88		match self {
89			AskError::SendFailed => write!(f, "failed to send ask request"),
90			AskError::ResponseClosed => write!(f, "response channel closed"),
91		}
92	}
93}
94
95impl error::Error for AskError {}
96
97/// Error when trying to receive without blocking.
98#[derive(Debug, Clone, Copy, PartialEq, Eq)]
99pub enum TryRecvError {
100	/// No message available.
101	Empty,
102	/// Mailbox closed.
103	Closed,
104}
105
106/// Error when receiving blocks.
107#[derive(Debug, Clone, Copy, PartialEq, Eq)]
108pub enum RecvError {
109	/// Mailbox closed.
110	Closed,
111}
112
113/// Error when receiving with timeout.
114#[derive(Debug, Clone, Copy, PartialEq, Eq)]
115pub enum RecvTimeoutError {
116	/// Timeout elapsed without receiving a message.
117	Timeout,
118	/// Mailbox closed.
119	Closed,
120}
121
122/// Handle to send messages to an actor.
123///
124/// - **Native**: Uses `crossbeam-channel` for lock-free message passing
125/// - **WASM**: Messages are processed synchronously inline when sent
126///
127/// Cheap to clone, safe to share across threads (native) or within single thread (WASM).
128pub struct ActorRef<M> {
129	inner: ActorRefInnerImpl<M>,
130}
131
132impl<M> Clone for ActorRef<M> {
133	#[inline]
134	fn clone(&self) -> Self {
135		Self {
136			inner: self.inner.clone(),
137		}
138	}
139}
140
141impl<M> fmt::Debug for ActorRef<M> {
142	#[inline]
143	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
144		self.inner.fmt(f)
145	}
146}
147
148// SAFETY: Single-threaded targets (WASM/WASI) don't have real concurrency
149#[cfg(reifydb_single_threaded)]
150unsafe impl<M> Send for ActorRef<M> {}
151
152#[cfg(reifydb_single_threaded)]
153unsafe impl<M> Sync for ActorRef<M> {}
154
155impl<M> ActorRef<M> {
156	/// Create a new ActorRef from an inner implementation.
157	#[inline]
158	pub(crate) fn from_inner(inner: ActorRefInnerImpl<M>) -> Self {
159		Self {
160			inner,
161		}
162	}
163}
164
165// Native-specific methods (require M: Send)
166#[cfg(not(reifydb_single_threaded))]
167impl<M: Send> ActorRef<M> {
168	/// Create a new ActorRef from a crossbeam sender (native only).
169	#[inline]
170	pub(crate) fn new(tx: Sender<M>) -> Self {
171		Self {
172			inner: native::ActorRefInner::new(tx),
173		}
174	}
175
176	/// Set the notify callback, called on successful send to wake the actor.
177	#[inline]
178	pub(crate) fn set_notify(&self, f: sync::Arc<dyn Fn() + Send + Sync>) {
179		self.inner.set_notify(f)
180	}
181
182	/// Send a message (non-blocking, may fail if mailbox full).
183	///
184	/// Returns `Ok(())` if the message was queued/processed successfully.
185	/// Returns `Err(SendError::Closed)` if the actor has stopped.
186	/// Returns `Err(SendError::Full)` if the mailbox is full (bounded only).
187	#[inline]
188	pub fn send(&self, msg: M) -> Result<(), SendError<M>> {
189		self.inner.send(msg)
190	}
191
192	/// Send a message, blocking if the mailbox is full.
193	///
194	/// This provides backpressure - sender blocks until there's room.
195	#[inline]
196	pub fn send_blocking(&self, msg: M) -> Result<(), SendError<M>> {
197		self.inner.send_blocking(msg)
198	}
199
200	/// Check if the actor is still alive.
201	///
202	/// Returns `false` if the actor has stopped and the mailbox is closed.
203	#[inline]
204	pub fn is_alive(&self) -> bool {
205		self.inner.is_alive()
206	}
207}
208
209// DST-specific methods (no Send bound needed)
210#[cfg(reifydb_target = "dst")]
211impl<M> ActorRef<M> {
212	/// Send a message (enqueue-only in DST).
213	#[inline]
214	pub fn send(&self, msg: M) -> Result<(), SendError<M>> {
215		self.inner.send(msg)
216	}
217
218	/// Send a message, blocking if the mailbox is full.
219	///
220	/// In DST, this is identical to `send()`.
221	#[inline]
222	pub fn send_blocking(&self, msg: M) -> Result<(), SendError<M>> {
223		self.inner.send_blocking(msg)
224	}
225
226	/// Check if the actor is still alive.
227	#[inline]
228	pub fn is_alive(&self) -> bool {
229		self.inner.is_alive()
230	}
231
232	/// Mark the actor as stopped (DST only).
233	#[inline]
234	pub(crate) fn mark_stopped(&self) {
235		self.inner.mark_stopped()
236	}
237
238	/// Install the notify callback (DST only).
239	#[inline]
240	pub(crate) fn set_notify(&self, f: Box<dyn Fn()>) {
241		self.inner.set_notify(f)
242	}
243}
244
245// Single-threaded methods (no Send bound needed)
246#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
247impl<M> ActorRef<M> {
248	/// Create a new ActorRef with WASM components (WASM only).
249	#[inline]
250	pub(crate) fn new(
251		processor: Rc<RefCell<Option<Box<dyn FnMut(M)>>>>,
252		alive: Arc<AtomicBool>,
253		queue: Rc<RefCell<Vec<M>>>,
254		processing: Rc<Cell<bool>>,
255	) -> Self {
256		Self {
257			inner: wasm::ActorRefInner::new(processor, alive, queue, processing),
258		}
259	}
260
261	/// Create a new ActorRef from WASM inner components (used by system/wasm).
262	#[inline]
263	pub(crate) fn from_wasm_inner(
264		processor: Rc<RefCell<Option<Box<dyn FnMut(M)>>>>,
265		alive: Arc<AtomicBool>,
266		queue: Rc<RefCell<Vec<M>>>,
267		processing: Rc<Cell<bool>>,
268	) -> Self {
269		Self {
270			inner: wasm::ActorRefInner::new(processor, alive, queue, processing),
271		}
272	}
273
274	/// Send a message (processes synchronously inline in WASM).
275	///
276	/// Returns `Ok(())` if the message was processed/queued successfully.
277	/// Returns `Err(SendError::Closed)` if the actor has stopped.
278	#[inline]
279	pub fn send(&self, msg: M) -> Result<(), SendError<M>> {
280		self.inner.send(msg)
281	}
282
283	/// Send a message, blocking if the mailbox is full.
284	///
285	/// In WASM, this is identical to `send()` since processing is inline.
286	#[inline]
287	pub fn send_blocking(&self, msg: M) -> Result<(), SendError<M>> {
288		self.inner.send_blocking(msg)
289	}
290
291	/// Check if the actor is still alive.
292	///
293	/// Returns `false` if the actor has stopped.
294	#[inline]
295	pub fn is_alive(&self) -> bool {
296		self.inner.is_alive()
297	}
298
299	/// Mark the actor as stopped (WASM only).
300	#[inline]
301	pub(crate) fn mark_stopped(&self) {
302		self.inner.mark_stopped()
303	}
304
305	/// Get access to the processor for setting it up (WASM only).
306	#[inline]
307	pub(crate) fn processor(&self) -> &Rc<RefCell<Option<Box<dyn FnMut(M)>>>> {
308		&self.inner.processor
309	}
310}
311
312use std::error;
313#[cfg(not(reifydb_single_threaded))]
314use std::sync;
315
316#[cfg(not(reifydb_single_threaded))]
317use crossbeam_channel::Sender;
318#[cfg(reifydb_target = "dst")]
319pub(crate) use dst::create_mailbox as create_dst_mailbox;
320#[cfg(not(reifydb_single_threaded))]
321pub(crate) use native::create_mailbox;
322#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
323pub(crate) use wasm::create_actor_ref;