Skip to main content

reifydb_runtime/actor/mailbox/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Actor mailbox and message sending types.
5//!
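//! This module provides:
//! - [`ActorRef`]: A handle for sending messages to an actor
//! - [`SendError`]: Error type for failed sends
//! - [`AskError`]: Error type for failed ask (request-response) calls
//! - [`TryRecvError`], [`RecvError`], [`RecvTimeoutError`]: Error types for receiving from a mailbox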
9//!
10//! # Platform Differences
11//!
12//! - **Native**: Uses `crossbeam-channel` for lock-free message passing between threads
13//! - **WASM**: Uses `Rc<RefCell>` processor for inline (synchronous) message handling
14
15#[cfg(reifydb_target = "wasm")]
16use std::cell::{Cell, RefCell};
17use std::fmt;
18#[cfg(reifydb_target = "wasm")]
19use std::rc::Rc;
20#[cfg(reifydb_target = "wasm")]
21use std::sync::Arc;
22#[cfg(reifydb_target = "wasm")]
23use std::sync::atomic::AtomicBool;
24
25use cfg_if::cfg_if;
26
27#[cfg(reifydb_target = "native")]
28pub(crate) mod native;
29
30#[cfg(reifydb_target = "wasm")]
31pub(crate) mod wasm;
32
33cfg_if! {
34	if #[cfg(reifydb_target = "native")] {
35		type ActorRefInnerImpl<M> = native::ActorRefInner<M>;
36	} else {
37		type ActorRefInnerImpl<M> = wasm::ActorRefInner<M>;
38	}
39}
40
/// Failure reported when a message could not be delivered to an actor.
///
/// Every variant carries the undelivered message so the caller can take it back.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SendError<M> {
	/// The mailbox is closed because the actor has stopped.
	Closed(M),
	/// The mailbox has no free capacity (bounded mailbox only).
	Full(M),
}
49
50impl<M> SendError<M> {
51	/// Get the message that failed to send.
52	#[inline]
53	pub fn into_inner(self) -> M {
54		match self {
55			SendError::Closed(m) => m,
56			SendError::Full(m) => m,
57		}
58	}
59}
60
61impl<M: fmt::Debug> fmt::Display for SendError<M> {
62	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63		match self {
64			SendError::Closed(_) => write!(f, "actor mailbox closed"),
65			SendError::Full(_) => write!(f, "actor mailbox full"),
66		}
67	}
68}
69
70impl<M: fmt::Debug> error::Error for SendError<M> {}
71
/// Failure of an ask (request-response) exchange with an actor.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AskError {
	/// The request could not be sent.
	SendFailed,
	/// The response channel closed (the actor stopped or never replied).
	ResponseClosed,
}
80
81impl fmt::Display for AskError {
82	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
83		match self {
84			AskError::SendFailed => write!(f, "failed to send ask request"),
85			AskError::ResponseClosed => write!(f, "response channel closed"),
86		}
87	}
88}
89
90impl error::Error for AskError {}
91
/// Error returned by a non-blocking receive.
///
/// Implements [`fmt::Display`] and [`error::Error`] like the other mailbox
/// errors, so it can be propagated with `?` into boxed or wrapped errors.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryRecvError {
	/// No message available.
	Empty,
	/// Mailbox closed.
	Closed,
}

impl fmt::Display for TryRecvError {
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		let description = match self {
			TryRecvError::Empty => "actor mailbox empty",
			TryRecvError::Closed => "actor mailbox closed",
		};
		f.write_str(description)
	}
}

impl error::Error for TryRecvError {}
100
/// Error returned by a blocking receive.
///
/// Implements [`fmt::Display`] and [`error::Error`] like the other mailbox
/// errors, so it can be propagated with `?` into boxed or wrapped errors.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecvError {
	/// Mailbox closed.
	Closed,
}

impl fmt::Display for RecvError {
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		match self {
			RecvError::Closed => f.write_str("actor mailbox closed"),
		}
	}
}

impl error::Error for RecvError {}
107
/// Error returned by a receive with a timeout.
///
/// Implements [`fmt::Display`] and [`error::Error`] like the other mailbox
/// errors, so it can be propagated with `?` into boxed or wrapped errors.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecvTimeoutError {
	/// Timeout elapsed without receiving a message.
	Timeout,
	/// Mailbox closed.
	Closed,
}

impl fmt::Display for RecvTimeoutError {
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		let description = match self {
			RecvTimeoutError::Timeout => "timed out waiting for actor message",
			RecvTimeoutError::Closed => "actor mailbox closed",
		};
		f.write_str(description)
	}
}

impl error::Error for RecvTimeoutError {}
116
117/// Handle to send messages to an actor.
118///
119/// - **Native**: Uses `crossbeam-channel` for lock-free message passing
120/// - **WASM**: Messages are processed synchronously inline when sent
121///
122/// Cheap to clone, safe to share across threads (native) or within single thread (WASM).
123pub struct ActorRef<M> {
124	inner: ActorRefInnerImpl<M>,
125}
126
127impl<M> Clone for ActorRef<M> {
128	#[inline]
129	fn clone(&self) -> Self {
130		Self {
131			inner: self.inner.clone(),
132		}
133	}
134}
135
136impl<M> fmt::Debug for ActorRef<M> {
137	#[inline]
138	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
139		self.inner.fmt(f)
140	}
141}
142
// SAFETY: WASM is single-threaded, so a handle is never actually moved to another thread;
// `Send` is asserted only so the handle satisfies `Send` bounds.
// NOTE(review): this relies on the wasm target being built without threads — confirm.
#[cfg(reifydb_target = "wasm")]
unsafe impl<M> Send for ActorRef<M> {}

// SAFETY: WASM is single-threaded, so a handle is never accessed from two threads at once.
#[cfg(reifydb_target = "wasm")]
unsafe impl<M> Sync for ActorRef<M> {}
149
150impl<M> ActorRef<M> {
151	/// Create a new ActorRef from an inner implementation.
152	#[inline]
153	pub(crate) fn from_inner(inner: ActorRefInnerImpl<M>) -> Self {
154		Self {
155			inner,
156		}
157	}
158}
159
// Native-only API. These methods require `M: Send`.
#[cfg(reifydb_target = "native")]
impl<M: Send> ActorRef<M> {
	/// Build a handle around a crossbeam `Sender` (native only).
	#[inline]
	pub(crate) fn new(tx: Sender<M>) -> Self {
		Self::from_inner(native::ActorRefInner::new(tx))
	}

	/// Install the notify callback, which is called on a successful send to wake the actor.
	#[inline]
	pub(crate) fn set_notify(&self, f: sync::Arc<dyn Fn() + Send + Sync>) {
		self.inner.set_notify(f)
	}

	/// Send a message without blocking.
	///
	/// # Errors
	///
	/// - [`SendError::Closed`] if the actor has stopped.
	/// - [`SendError::Full`] if the mailbox is full (bounded mailbox only).
	///
	/// Returns `Ok(())` once the message has been queued/processed successfully.
	#[inline]
	pub fn send(&self, msg: M) -> Result<(), SendError<M>> {
		self.inner.send(msg)
	}

	/// Send a message, blocking while the mailbox is full.
	///
	/// This gives backpressure: the sender waits until there is room.
	#[inline]
	pub fn send_blocking(&self, msg: M) -> Result<(), SendError<M>> {
		self.inner.send_blocking(msg)
	}

	/// Report whether the actor is still alive.
	///
	/// Returns `false` once the actor has stopped and its mailbox is closed.
	#[inline]
	pub fn is_alive(&self) -> bool {
		self.inner.is_alive()
	}
}
203
// WASM-only API. No `Send` bound is needed on `M` here.
#[cfg(reifydb_target = "wasm")]
impl<M> ActorRef<M> {
	/// Build a handle from the WASM components (WASM only).
	#[inline]
	pub(crate) fn new(
		processor: Rc<RefCell<Option<Box<dyn FnMut(M)>>>>,
		alive: Arc<AtomicBool>,
		queue: Rc<RefCell<Vec<M>>>,
		processing: Rc<Cell<bool>>,
	) -> Self {
		Self::from_inner(wasm::ActorRefInner::new(processor, alive, queue, processing))
	}

	/// Build a handle from the WASM inner components (used by system/wasm).
	///
	/// Takes the same components as [`ActorRef::new`] and builds the same handle.
	#[inline]
	pub(crate) fn from_wasm_inner(
		processor: Rc<RefCell<Option<Box<dyn FnMut(M)>>>>,
		alive: Arc<AtomicBool>,
		queue: Rc<RefCell<Vec<M>>>,
		processing: Rc<Cell<bool>>,
	) -> Self {
		Self::new(processor, alive, queue, processing)
	}

	/// Send a message; in WASM it is processed synchronously inline.
	///
	/// # Errors
	///
	/// - [`SendError::Closed`] if the actor has stopped.
	///
	/// Returns `Ok(())` once the message has been processed/queued successfully.
	#[inline]
	pub fn send(&self, msg: M) -> Result<(), SendError<M>> {
		self.inner.send(msg)
	}

	/// Send a message, blocking if the mailbox is full.
	///
	/// In WASM this is identical to `send()`, because processing happens inline.
	#[inline]
	pub fn send_blocking(&self, msg: M) -> Result<(), SendError<M>> {
		self.inner.send_blocking(msg)
	}

	/// Report whether the actor is still alive.
	///
	/// Returns `false` once the actor has stopped.
	#[inline]
	pub fn is_alive(&self) -> bool {
		self.inner.is_alive()
	}

	/// Mark the actor as stopped (WASM only).
	#[inline]
	pub(crate) fn mark_stopped(&self) {
		self.inner.mark_stopped()
	}

	/// Borrow the shared processor slot so it can be set up (WASM only).
	#[inline]
	pub(crate) fn processor(&self) -> &Rc<RefCell<Option<Box<dyn FnMut(M)>>>> {
		&self.inner.processor
	}
}
270
271use std::error;
272#[cfg(reifydb_target = "native")]
273use std::sync;
274
275#[cfg(reifydb_target = "native")]
276use crossbeam_channel::Sender;
277#[cfg(reifydb_target = "native")]
278pub(crate) use native::create_mailbox;
279#[cfg(reifydb_target = "wasm")]
280pub(crate) use wasm::create_actor_ref;