Skip to main content

reifydb_runtime/actor/mailbox/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
5use std::cell::{Cell, RefCell};
6use std::fmt;
7#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
8use std::rc::Rc;
9#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
10use std::sync::Arc;
11#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
12use std::sync::atomic::AtomicBool;
13
14use cfg_if::cfg_if;
15
16#[cfg(not(reifydb_single_threaded))]
17pub(crate) mod native;
18
19#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
20pub(crate) mod wasm;
21
22#[cfg(reifydb_target = "dst")]
23pub(crate) mod dst;
24
25cfg_if! {
26	if #[cfg(reifydb_target = "dst")] {
27		type ActorRefInnerImpl<M> = dst::ActorRefInner<M>;
28	} else if #[cfg(not(reifydb_single_threaded))] {
29		type ActorRefInnerImpl<M> = native::ActorRefInner<M>;
30	} else {
31		type ActorRefInnerImpl<M> = wasm::ActorRefInner<M>;
32	}
33}
34
/// Error returned when sending a message fails.
///
/// Both variants carry the undelivered message, so the caller can recover it
/// with [`SendError::into_inner`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SendError<M> {
	/// The actor has stopped and the mailbox is closed.
	Closed(M),
	/// The mailbox is full (bounded mailbox only).
	Full(M),
}

impl<M> SendError<M> {
	/// Consume the error and return the message that failed to send.
	#[inline]
	pub fn into_inner(self) -> M {
		match self {
			SendError::Closed(msg) | SendError::Full(msg) => msg,
		}
	}
}

// The message itself is never printed, so `Display` places no bound on `M`.
impl<M> fmt::Display for SendError<M> {
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		f.write_str(match self {
			SendError::Closed(_) => "actor mailbox closed",
			SendError::Full(_) => "actor mailbox full",
		})
	}
}

// `Error` has `Debug` as a supertrait and the derived `Debug` impl needs
// `M: Debug`, so the bound stays here.
impl<M: fmt::Debug> error::Error for SendError<M> {}
65
/// Failure modes of an ask (request-response) exchange with an actor.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AskError {
	/// The request could not be sent.
	SendFailed,
	/// The response channel was closed (actor stopped or didn't respond).
	ResponseClosed,
}

impl fmt::Display for AskError {
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		let text = match self {
			Self::SendFailed => "failed to send ask request",
			Self::ResponseClosed => "response channel closed",
		};
		f.write_str(text)
	}
}

impl error::Error for AskError {}
85
/// Error when trying to receive without blocking.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryRecvError {
	/// No message available.
	Empty,
	/// Mailbox closed.
	Closed,
}

impl fmt::Display for TryRecvError {
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		f.write_str(match self {
			TryRecvError::Empty => "actor mailbox empty",
			TryRecvError::Closed => "actor mailbox closed",
		})
	}
}

// Lets the error be boxed as `dyn Error` and propagated with `?`.
impl error::Error for TryRecvError {}
94
/// Error returned by a blocking receive.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecvError {
	/// Mailbox closed.
	Closed,
}

impl fmt::Display for RecvError {
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		match self {
			RecvError::Closed => f.write_str("actor mailbox closed"),
		}
	}
}

// Lets the error be boxed as `dyn Error` and propagated with `?`.
impl error::Error for RecvError {}
101
/// Error when receiving with timeout.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecvTimeoutError {
	/// Timeout elapsed without receiving a message.
	Timeout,
	/// Mailbox closed.
	Closed,
}

impl fmt::Display for RecvTimeoutError {
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		f.write_str(match self {
			RecvTimeoutError::Timeout => "timed out waiting on actor mailbox",
			RecvTimeoutError::Closed => "actor mailbox closed",
		})
	}
}

// Lets the error be boxed as `dyn Error` and propagated with `?`.
impl error::Error for RecvTimeoutError {}
110
111/// Handle to send messages to an actor.
112///
113/// - **Native**: Uses `crossbeam-channel` for lock-free message passing
114/// - **WASM**: Messages are processed synchronously inline when sent
115///
116/// Cheap to clone, safe to share across threads (native) or within single thread (WASM).
117pub struct ActorRef<M> {
118	inner: ActorRefInnerImpl<M>,
119}
120
121impl<M> Clone for ActorRef<M> {
122	#[inline]
123	fn clone(&self) -> Self {
124		Self {
125			inner: self.inner.clone(),
126		}
127	}
128}
129
130impl<M> fmt::Debug for ActorRef<M> {
131	#[inline]
132	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
133		self.inner.fmt(f)
134	}
135}
136
// SAFETY: Single-threaded targets (WASM/WASI) don't have real concurrency, so a
// handle can never actually be moved to another thread.
// NOTE(review): the gate is `reifydb_single_threaded`, which may cover more than
// WASM/WASI — confirm that no build with this cfg set can spawn OS threads.
#[cfg(reifydb_single_threaded)]
unsafe impl<M> Send for ActorRef<M> {}

// SAFETY: Same reasoning as for `Send`: with no real concurrency on these
// targets, a handle is never accessed from two threads at once.
#[cfg(reifydb_single_threaded)]
unsafe impl<M> Sync for ActorRef<M> {}
143
144impl<M> ActorRef<M> {
145	/// Create a new ActorRef from an inner implementation.
146	#[inline]
147	pub(crate) fn from_inner(inner: ActorRefInnerImpl<M>) -> Self {
148		Self {
149			inner,
150		}
151	}
152}
153
154// Native-specific methods (require M: Send)
155#[cfg(not(reifydb_single_threaded))]
156impl<M: Send> ActorRef<M> {
157	/// Create a new ActorRef from a crossbeam sender (native only).
158	#[inline]
159	pub(crate) fn new(tx: Sender<M>) -> Self {
160		Self {
161			inner: native::ActorRefInner::new(tx),
162		}
163	}
164
165	/// Set the notify callback, called on successful send to wake the actor.
166	#[inline]
167	pub(crate) fn set_notify(&self, f: sync::Arc<dyn Fn() + Send + Sync>) {
168		self.inner.set_notify(f)
169	}
170
171	/// Send a message (non-blocking, may fail if mailbox full).
172	///
173	/// Returns `Ok(())` if the message was queued/processed successfully.
174	/// Returns `Err(SendError::Closed)` if the actor has stopped.
175	/// Returns `Err(SendError::Full)` if the mailbox is full (bounded only).
176	#[inline]
177	pub fn send(&self, msg: M) -> Result<(), SendError<M>> {
178		self.inner.send(msg)
179	}
180
181	/// Send a message, blocking if the mailbox is full.
182	///
183	/// This provides backpressure - sender blocks until there's room.
184	#[inline]
185	pub fn send_blocking(&self, msg: M) -> Result<(), SendError<M>> {
186		self.inner.send_blocking(msg)
187	}
188
189	/// Check if the actor is still alive.
190	///
191	/// Returns `false` if the actor has stopped and the mailbox is closed.
192	#[inline]
193	pub fn is_alive(&self) -> bool {
194		self.inner.is_alive()
195	}
196}
197
// DST-specific methods (no Send bound needed)
#[cfg(reifydb_target = "dst")]
impl<M> ActorRef<M> {
	/// Send a message; in DST this only enqueues it.
	#[inline]
	pub fn send(&self, msg: M) -> Result<(), SendError<M>> {
		self.inner.send(msg)
	}

	/// Send a message, blocking if the mailbox is full.
	///
	/// In DST, this is identical to `send()`.
	#[inline]
	pub fn send_blocking(&self, msg: M) -> Result<(), SendError<M>> {
		self.inner.send_blocking(msg)
	}

	/// Report whether the actor is still alive.
	#[inline]
	pub fn is_alive(&self) -> bool {
		self.inner.is_alive()
	}

	/// Flag the actor as stopped (DST only).
	#[inline]
	pub(crate) fn mark_stopped(&self) {
		self.inner.mark_stopped()
	}

	/// Register the notify callback on the backend handle (DST only).
	#[inline]
	pub(crate) fn set_notify(&self, f: Box<dyn Fn()>) {
		self.inner.set_notify(f)
	}
}
233
// Single-threaded methods (no Send bound needed)
#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
impl<M> ActorRef<M> {
	/// Create a new ActorRef with WASM components (WASM only).
	///
	/// All four handles are passed straight to `wasm::ActorRefInner::new`.
	#[inline]
	pub(crate) fn new(
		processor: Rc<RefCell<Option<Box<dyn FnMut(M)>>>>,
		alive: Arc<AtomicBool>,
		queue: Rc<RefCell<Vec<M>>>,
		processing: Rc<Cell<bool>>,
	) -> Self {
		Self {
			inner: wasm::ActorRefInner::new(processor, alive, queue, processing),
		}
	}

	/// Create a new ActorRef from WASM inner components (used by system/wasm).
	///
	/// Takes the same arguments as [`ActorRef::new`] and delegates to it, so the
	/// two constructors cannot drift apart.
	#[inline]
	pub(crate) fn from_wasm_inner(
		processor: Rc<RefCell<Option<Box<dyn FnMut(M)>>>>,
		alive: Arc<AtomicBool>,
		queue: Rc<RefCell<Vec<M>>>,
		processing: Rc<Cell<bool>>,
	) -> Self {
		Self::new(processor, alive, queue, processing)
	}

	/// Send a message (processes synchronously inline in WASM).
	///
	/// Returns `Ok(())` if the message was processed/queued successfully.
	/// Returns `Err(SendError::Closed)` if the actor has stopped.
	#[inline]
	pub fn send(&self, msg: M) -> Result<(), SendError<M>> {
		self.inner.send(msg)
	}

	/// Send a message, blocking if the mailbox is full.
	///
	/// In WASM, this is identical to `send()` since processing is inline.
	#[inline]
	pub fn send_blocking(&self, msg: M) -> Result<(), SendError<M>> {
		self.inner.send_blocking(msg)
	}

	/// Check if the actor is still alive.
	///
	/// Returns `false` if the actor has stopped.
	#[inline]
	pub fn is_alive(&self) -> bool {
		self.inner.is_alive()
	}

	/// Mark the actor as stopped (WASM only).
	#[inline]
	pub(crate) fn mark_stopped(&self) {
		self.inner.mark_stopped()
	}

	/// Borrow the shared processor slot so the caller can set it up (WASM only).
	#[inline]
	pub(crate) fn processor(&self) -> &Rc<RefCell<Option<Box<dyn FnMut(M)>>>> {
		&self.inner.processor
	}
}
300
301use std::error;
302#[cfg(not(reifydb_single_threaded))]
303use std::sync;
304
305#[cfg(not(reifydb_single_threaded))]
306use crossbeam_channel::Sender;
307#[cfg(reifydb_target = "dst")]
308pub(crate) use dst::create_mailbox as create_dst_mailbox;
309#[cfg(not(reifydb_single_threaded))]
310pub(crate) use native::create_mailbox;
311#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
312pub(crate) use wasm::create_actor_ref;