Skip to main content

reifydb_runtime/actor/mailbox/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
5use std::cell::{Cell, RefCell};
6use std::fmt;
7#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
8use std::rc::Rc;
9#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
10use std::sync::Arc;
11#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
12use std::sync::atomic::AtomicBool;
13
14use cfg_if::cfg_if;
15
16#[cfg(not(reifydb_single_threaded))]
17pub(crate) mod native;
18
19#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
20pub(crate) mod wasm;
21
22#[cfg(reifydb_target = "dst")]
23pub(crate) mod dst;
24
25cfg_if! {
26	if #[cfg(reifydb_target = "dst")] {
27		type ActorRefInnerImpl<M> = dst::ActorRefInner<M>;
28	} else if #[cfg(not(reifydb_single_threaded))] {
29		type ActorRefInnerImpl<M> = native::ActorRefInner<M>;
30	} else {
31		type ActorRefInnerImpl<M> = wasm::ActorRefInner<M>;
32	}
33}
34
/// Error produced when sending to an actor mailbox fails.
///
/// Every variant carries a value of the message type `M` (presumably the
/// message that could not be delivered); it can be recovered with
/// [`SendError::into_inner`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SendError<M> {
	/// Displayed as "actor mailbox closed".
	Closed(M),

	/// Displayed as "actor mailbox full".
	Full(M),
}

impl<M> SendError<M> {
	/// Consumes the error and returns the `M` value it carries,
	/// regardless of which variant it is.
	#[inline]
	pub fn into_inner(self) -> M {
		match self {
			SendError::Closed(msg) | SendError::Full(msg) => msg,
		}
	}
}

// The payload is never formatted here, so `Display` is offered for every
// `M` rather than only for `M: Debug`.
impl<M> fmt::Display for SendError<M> {
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		match self {
			SendError::Closed(_) => f.write_str("actor mailbox closed"),
			SendError::Full(_) => f.write_str("actor mailbox full"),
		}
	}
}

// `Error` requires `Debug`, and the derived `Debug` above only exists when
// `M: Debug`, so this bound has to stay.
impl<M: fmt::Debug> error::Error for SendError<M> {}
62
/// Failure modes of an ask-style (request then response) exchange.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AskError {
	/// Displayed as "failed to send ask request".
	SendFailed,

	/// Displayed as "response channel closed".
	ResponseClosed,
}

impl fmt::Display for AskError {
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		// Choose the fixed message first, then emit it in one place.
		let text = match self {
			AskError::SendFailed => "failed to send ask request",
			AskError::ResponseClosed => "response channel closed",
		};
		f.write_str(text)
	}
}

impl error::Error for AskError {}
80
/// Error type for a non-blocking receive attempt.
///
/// NOTE(review): the variant meanings are read off their names; the code
/// that constructs them is not visible in this file — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryRecvError {
	/// Displayed as "actor mailbox empty".
	Empty,

	/// Displayed as "actor mailbox closed".
	Closed,
}

// Implemented so this error can be printed and boxed like `SendError` and
// `AskError` in this module.
impl fmt::Display for TryRecvError {
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		match self {
			TryRecvError::Empty => f.write_str("actor mailbox empty"),
			TryRecvError::Closed => f.write_str("actor mailbox closed"),
		}
	}
}

impl error::Error for TryRecvError {}
87
/// Error type for a receive operation.
///
/// NOTE(review): the variant meaning is read off its name; the code that
/// constructs it is not visible in this file — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecvError {
	/// Displayed as "actor mailbox closed".
	Closed,
}

// Implemented so this error can be printed and boxed like `SendError` and
// `AskError` in this module.
impl fmt::Display for RecvError {
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		match self {
			RecvError::Closed => f.write_str("actor mailbox closed"),
		}
	}
}

impl error::Error for RecvError {}
92
/// Error type for a receive operation that has a time limit.
///
/// NOTE(review): the variant meanings are read off their names; the code
/// that constructs them is not visible in this file — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecvTimeoutError {
	/// Displayed as "timed out waiting on actor mailbox".
	Timeout,

	/// Displayed as "actor mailbox closed".
	Closed,
}

// Implemented so this error can be printed and boxed like `SendError` and
// `AskError` in this module.
impl fmt::Display for RecvTimeoutError {
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		match self {
			RecvTimeoutError::Timeout => f.write_str("timed out waiting on actor mailbox"),
			RecvTimeoutError::Closed => f.write_str("actor mailbox closed"),
		}
	}
}

impl error::Error for RecvTimeoutError {}
99
100pub struct ActorRef<M> {
101	inner: ActorRefInnerImpl<M>,
102}
103
104impl<M> Clone for ActorRef<M> {
105	#[inline]
106	fn clone(&self) -> Self {
107		Self {
108			inner: self.inner.clone(),
109		}
110	}
111}
112
113impl<M> fmt::Debug for ActorRef<M> {
114	#[inline]
115	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116		self.inner.fmt(f)
117	}
118}
119
// SAFETY: Single-threaded targets (WASM/WASI) don't have real concurrency,
// so a value marked `Send` here can never actually reach another thread.
// NOTE(review): the impl is unconditional in `M` and is sound only while
// `reifydb_single_threaded` is set exclusively for such targets — confirm
// that every configuration enabling this cfg really is single-threaded.
#[cfg(reifydb_single_threaded)]
unsafe impl<M> Send for ActorRef<M> {}

// SAFETY: same argument as for `Send` above — with no second thread there
// can be no concurrent shared access through `&ActorRef<M>`.
#[cfg(reifydb_single_threaded)]
unsafe impl<M> Sync for ActorRef<M> {}
126
127impl<M> ActorRef<M> {
128	#[inline]
129	pub(crate) fn from_inner(inner: ActorRefInnerImpl<M>) -> Self {
130		Self {
131			inner,
132		}
133	}
134}
135
136#[cfg(not(reifydb_single_threaded))]
137impl<M: Send> ActorRef<M> {
138	#[inline]
139	pub(crate) fn new(tx: Sender<M>) -> Self {
140		Self {
141			inner: native::ActorRefInner::new(tx),
142		}
143	}
144
145	#[inline]
146	pub(crate) fn set_notify(&self, f: sync::Arc<dyn Fn() + Send + Sync>) {
147		self.inner.set_notify(f)
148	}
149
150	#[inline]
151	pub fn send(&self, msg: M) -> Result<(), SendError<M>> {
152		self.inner.send(msg)
153	}
154
155	#[inline]
156	pub fn send_blocking(&self, msg: M) -> Result<(), SendError<M>> {
157		self.inner.send_blocking(msg)
158	}
159
160	#[inline]
161	pub fn is_alive(&self) -> bool {
162		self.inner.is_alive()
163	}
164}
165
// API for the `dst` target; every method forwards to `dst::ActorRefInner`.
#[cfg(reifydb_target = "dst")]
impl<M> ActorRef<M> {
	/// Forwards `msg` to the inner handle's `send` and returns its result
	/// unchanged.
	#[inline]
	pub fn send(&self, msg: M) -> Result<(), SendError<M>> {
		let handle = &self.inner;
		handle.send(msg)
	}

	/// Forwards `msg` to the inner handle's `send_blocking` and returns its
	/// result unchanged.
	#[inline]
	pub fn send_blocking(&self, msg: M) -> Result<(), SendError<M>> {
		let handle = &self.inner;
		handle.send_blocking(msg)
	}

	/// Returns whatever the inner handle's `is_alive` reports.
	#[inline]
	pub fn is_alive(&self) -> bool {
		let handle = &self.inner;
		handle.is_alive()
	}

	/// Calls `mark_stopped` on the inner handle.
	#[inline]
	pub(crate) fn mark_stopped(&self) {
		let handle = &self.inner;
		handle.mark_stopped()
	}

	/// Hands the callback `f` to the inner handle's `set_notify`.
	#[inline]
	pub(crate) fn set_notify(&self, f: Box<dyn Fn()>) {
		let handle = &self.inner;
		handle.set_notify(f)
	}
}
193
// API for single-threaded builds other than `dst`; every method forwards to
// `wasm::ActorRefInner`.
#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
impl<M> ActorRef<M> {
	/// Builds an `ActorRef` by passing all four pieces of shared state
	/// straight to `wasm::ActorRefInner::new`.
	#[inline]
	pub(crate) fn new(
		processor: Rc<RefCell<Option<Box<dyn FnMut(M)>>>>,
		alive: Arc<AtomicBool>,
		queue: Rc<RefCell<Vec<M>>>,
		processing: Rc<Cell<bool>>,
	) -> Self {
		let inner = wasm::ActorRefInner::new(processor, alive, queue, processing);
		Self { inner }
	}

	/// Same construction as [`ActorRef::new`], under a second name.
	#[inline]
	pub(crate) fn from_wasm_inner(
		processor: Rc<RefCell<Option<Box<dyn FnMut(M)>>>>,
		alive: Arc<AtomicBool>,
		queue: Rc<RefCell<Vec<M>>>,
		processing: Rc<Cell<bool>>,
	) -> Self {
		Self::new(processor, alive, queue, processing)
	}

	/// Forwards `msg` to the inner handle's `send` and returns its result
	/// unchanged.
	#[inline]
	pub fn send(&self, msg: M) -> Result<(), SendError<M>> {
		let handle = &self.inner;
		handle.send(msg)
	}

	/// Forwards `msg` to the inner handle's `send_blocking` and returns its
	/// result unchanged.
	#[inline]
	pub fn send_blocking(&self, msg: M) -> Result<(), SendError<M>> {
		let handle = &self.inner;
		handle.send_blocking(msg)
	}

	/// Returns whatever the inner handle's `is_alive` reports.
	#[inline]
	pub fn is_alive(&self) -> bool {
		let handle = &self.inner;
		handle.is_alive()
	}

	/// Calls `mark_stopped` on the inner handle.
	#[inline]
	pub(crate) fn mark_stopped(&self) {
		let handle = &self.inner;
		handle.mark_stopped()
	}

	/// Borrows the inner handle's `processor` field.
	#[inline]
	pub(crate) fn processor(&self) -> &Rc<RefCell<Option<Box<dyn FnMut(M)>>>> {
		let handle = &self.inner;
		&handle.processor
	}
}
245
246use std::error;
247#[cfg(not(reifydb_single_threaded))]
248use std::sync;
249
250#[cfg(not(reifydb_single_threaded))]
251use crossbeam_channel::Sender;
252#[cfg(reifydb_target = "dst")]
253pub(crate) use dst::create_mailbox as create_dst_mailbox;
254#[cfg(not(reifydb_single_threaded))]
255pub(crate) use native::create_mailbox;
256#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
257pub(crate) use wasm::create_actor_ref;