Skip to main content

reifydb_runtime/actor/mailbox/
mod.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4//! Actor mailbox and message sending types.
5//!
6//! This module provides:
7//! - [`ActorRef`]: A handle for sending messages to an actor
8//! - [`SendError`]: Error type for failed sends
9//!
10//! # Platform Differences
11//!
12//! - **Native**: Uses `crossbeam-channel` for lock-free message passing between threads
13//! - **WASM**: Uses `Rc<RefCell>` processor for inline (synchronous) message handling
14
15#[cfg(reifydb_target = "wasm")]
16use std::cell::{Cell, RefCell};
17use std::fmt;
18#[cfg(reifydb_target = "wasm")]
19use std::rc::Rc;
20#[cfg(reifydb_target = "wasm")]
21use std::sync::Arc;
22#[cfg(reifydb_target = "wasm")]
23use std::sync::atomic::AtomicBool;
24
25use cfg_if::cfg_if;
26
27#[cfg(reifydb_target = "native")]
28pub(crate) mod native;
29
30#[cfg(reifydb_target = "wasm")]
31pub(crate) mod wasm;
32
33cfg_if! {
34	if #[cfg(reifydb_target = "native")] {
35		type ActorRefInnerImpl<M> = native::ActorRefInner<M>;
36	} else {
37		type ActorRefInnerImpl<M> = wasm::ActorRefInner<M>;
38	}
39}
40
41// =============================================================================
42// Shared error types
43// =============================================================================
44
45/// Error returned when sending a message fails.
46#[derive(Debug, Clone, PartialEq, Eq)]
47pub enum SendError<M> {
48	/// The actor has stopped and the mailbox is closed.
49	Closed(M),
50	/// The mailbox is full (bounded mailbox only).
51	Full(M),
52}
53
54impl<M> SendError<M> {
55	/// Get the message that failed to send.
56	#[inline]
57	pub fn into_inner(self) -> M {
58		match self {
59			SendError::Closed(m) => m,
60			SendError::Full(m) => m,
61		}
62	}
63}
64
65impl<M: fmt::Debug> fmt::Display for SendError<M> {
66	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
67		match self {
68			SendError::Closed(_) => write!(f, "actor mailbox closed"),
69			SendError::Full(_) => write!(f, "actor mailbox full"),
70		}
71	}
72}
73
74impl<M: fmt::Debug> error::Error for SendError<M> {}
75
76/// Error returned when an ask (request-response) fails.
77#[derive(Debug, Clone, PartialEq, Eq)]
78pub enum AskError {
79	/// Failed to send the request.
80	SendFailed,
81	/// The response channel was closed (actor stopped or didn't respond).
82	ResponseClosed,
83}
84
85impl fmt::Display for AskError {
86	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
87		match self {
88			AskError::SendFailed => write!(f, "failed to send ask request"),
89			AskError::ResponseClosed => write!(f, "response channel closed"),
90		}
91	}
92}
93
94impl error::Error for AskError {}
95
96/// Error when trying to receive without blocking.
97#[derive(Debug, Clone, Copy, PartialEq, Eq)]
98pub enum TryRecvError {
99	/// No message available.
100	Empty,
101	/// Mailbox closed.
102	Closed,
103}
104
105/// Error when receiving blocks.
106#[derive(Debug, Clone, Copy, PartialEq, Eq)]
107pub enum RecvError {
108	/// Mailbox closed.
109	Closed,
110}
111
112/// Error when receiving with timeout.
113#[derive(Debug, Clone, Copy, PartialEq, Eq)]
114pub enum RecvTimeoutError {
115	/// Timeout elapsed without receiving a message.
116	Timeout,
117	/// Mailbox closed.
118	Closed,
119}
120
121/// Handle to send messages to an actor.
122///
123/// - **Native**: Uses `crossbeam-channel` for lock-free message passing
124/// - **WASM**: Messages are processed synchronously inline when sent
125///
126/// Cheap to clone, safe to share across threads (native) or within single thread (WASM).
127pub struct ActorRef<M> {
128	inner: ActorRefInnerImpl<M>,
129}
130
131impl<M> Clone for ActorRef<M> {
132	#[inline]
133	fn clone(&self) -> Self {
134		Self {
135			inner: self.inner.clone(),
136		}
137	}
138}
139
140impl<M> fmt::Debug for ActorRef<M> {
141	#[inline]
142	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
143		self.inner.fmt(f)
144	}
145}
146
147// SAFETY: WASM is single-threaded, so Send and Sync are safe
148#[cfg(reifydb_target = "wasm")]
149unsafe impl<M> Send for ActorRef<M> {}
150
151#[cfg(reifydb_target = "wasm")]
152unsafe impl<M> Sync for ActorRef<M> {}
153
154impl<M> ActorRef<M> {
155	/// Create a new ActorRef from an inner implementation.
156	#[inline]
157	pub(crate) fn from_inner(inner: ActorRefInnerImpl<M>) -> Self {
158		Self {
159			inner,
160		}
161	}
162}
163
164// Native-specific methods (require M: Send)
165#[cfg(reifydb_target = "native")]
166impl<M: Send> ActorRef<M> {
167	/// Create a new ActorRef from a crossbeam sender (native only).
168	#[inline]
169	pub(crate) fn new(tx: Sender<M>) -> Self {
170		Self {
171			inner: native::ActorRefInner::new(tx),
172		}
173	}
174
175	/// Set the notify callback, called on successful send to wake the actor.
176	#[inline]
177	pub(crate) fn set_notify(&self, f: sync::Arc<dyn Fn() + Send + Sync>) {
178		self.inner.set_notify(f)
179	}
180
181	/// Send a message (non-blocking, may fail if mailbox full).
182	///
183	/// Returns `Ok(())` if the message was queued/processed successfully.
184	/// Returns `Err(SendError::Closed)` if the actor has stopped.
185	/// Returns `Err(SendError::Full)` if the mailbox is full (bounded only).
186	#[inline]
187	pub fn send(&self, msg: M) -> Result<(), SendError<M>> {
188		self.inner.send(msg)
189	}
190
191	/// Send a message, blocking if the mailbox is full.
192	///
193	/// This provides backpressure - sender blocks until there's room.
194	#[inline]
195	pub fn send_blocking(&self, msg: M) -> Result<(), SendError<M>> {
196		self.inner.send_blocking(msg)
197	}
198
199	/// Check if the actor is still alive.
200	///
201	/// Returns `false` if the actor has stopped and the mailbox is closed.
202	#[inline]
203	pub fn is_alive(&self) -> bool {
204		self.inner.is_alive()
205	}
206}
207
208// WASM-specific methods (no Send bound needed)
209#[cfg(reifydb_target = "wasm")]
210impl<M> ActorRef<M> {
211	/// Create a new ActorRef with WASM components (WASM only).
212	#[inline]
213	pub(crate) fn new(
214		processor: Rc<RefCell<Option<Box<dyn FnMut(M)>>>>,
215		alive: Arc<AtomicBool>,
216		queue: Rc<RefCell<Vec<M>>>,
217		processing: Rc<Cell<bool>>,
218	) -> Self {
219		Self {
220			inner: wasm::ActorRefInner::new(processor, alive, queue, processing),
221		}
222	}
223
224	/// Create a new ActorRef from WASM inner components (used by system/wasm).
225	#[inline]
226	pub(crate) fn from_wasm_inner(
227		processor: Rc<RefCell<Option<Box<dyn FnMut(M)>>>>,
228		alive: Arc<AtomicBool>,
229		queue: Rc<RefCell<Vec<M>>>,
230		processing: Rc<Cell<bool>>,
231	) -> Self {
232		Self {
233			inner: wasm::ActorRefInner::new(processor, alive, queue, processing),
234		}
235	}
236
237	/// Send a message (processes synchronously inline in WASM).
238	///
239	/// Returns `Ok(())` if the message was processed/queued successfully.
240	/// Returns `Err(SendError::Closed)` if the actor has stopped.
241	#[inline]
242	pub fn send(&self, msg: M) -> Result<(), SendError<M>> {
243		self.inner.send(msg)
244	}
245
246	/// Send a message, blocking if the mailbox is full.
247	///
248	/// In WASM, this is identical to `send()` since processing is inline.
249	#[inline]
250	pub fn send_blocking(&self, msg: M) -> Result<(), SendError<M>> {
251		self.inner.send_blocking(msg)
252	}
253
254	/// Check if the actor is still alive.
255	///
256	/// Returns `false` if the actor has stopped.
257	#[inline]
258	pub fn is_alive(&self) -> bool {
259		self.inner.is_alive()
260	}
261
262	/// Mark the actor as stopped (WASM only).
263	#[inline]
264	pub(crate) fn mark_stopped(&self) {
265		self.inner.mark_stopped()
266	}
267
268	/// Get access to the processor for setting it up (WASM only).
269	#[inline]
270	pub(crate) fn processor(&self) -> &Rc<RefCell<Option<Box<dyn FnMut(M)>>>> {
271		&self.inner.processor
272	}
273}
274
275use std::error;
276#[cfg(reifydb_target = "native")]
277use std::sync;
278
279#[cfg(reifydb_target = "native")]
280use crossbeam_channel::Sender;
281#[cfg(reifydb_target = "native")]
282pub(crate) use native::create_mailbox;
283#[cfg(reifydb_target = "wasm")]
284pub(crate) use wasm::create_actor_ref;