reifydb_runtime/actor/mailbox/
mod.rs1#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
16use std::cell::{Cell, RefCell};
17use std::fmt;
18#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
19use std::rc::Rc;
20#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
21use std::sync::Arc;
22#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
23use std::sync::atomic::AtomicBool;
24
25use cfg_if::cfg_if;
26
27#[cfg(not(reifydb_single_threaded))]
28pub(crate) mod native;
29
30#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
31pub(crate) mod wasm;
32
33#[cfg(reifydb_target = "dst")]
34pub(crate) mod dst;
35
36cfg_if! {
37 if #[cfg(reifydb_target = "dst")] {
38 type ActorRefInnerImpl<M> = dst::ActorRefInner<M>;
39 } else if #[cfg(not(reifydb_single_threaded))] {
40 type ActorRefInnerImpl<M> = native::ActorRefInner<M>;
41 } else {
42 type ActorRefInnerImpl<M> = wasm::ActorRefInner<M>;
43 }
44}
45
46#[derive(Debug, Clone, PartialEq, Eq)]
48pub enum SendError<M> {
49 Closed(M),
51 Full(M),
53}
54
55impl<M> SendError<M> {
56 #[inline]
58 pub fn into_inner(self) -> M {
59 match self {
60 SendError::Closed(m) => m,
61 SendError::Full(m) => m,
62 }
63 }
64}
65
66impl<M: fmt::Debug> fmt::Display for SendError<M> {
67 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
68 match self {
69 SendError::Closed(_) => write!(f, "actor mailbox closed"),
70 SendError::Full(_) => write!(f, "actor mailbox full"),
71 }
72 }
73}
74
75impl<M: fmt::Debug> error::Error for SendError<M> {}
76
77#[derive(Debug, Clone, PartialEq, Eq)]
79pub enum AskError {
80 SendFailed,
82 ResponseClosed,
84}
85
86impl fmt::Display for AskError {
87 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88 match self {
89 AskError::SendFailed => write!(f, "failed to send ask request"),
90 AskError::ResponseClosed => write!(f, "response channel closed"),
91 }
92 }
93}
94
95impl error::Error for AskError {}
96
97#[derive(Debug, Clone, Copy, PartialEq, Eq)]
99pub enum TryRecvError {
100 Empty,
102 Closed,
104}
105
106#[derive(Debug, Clone, Copy, PartialEq, Eq)]
108pub enum RecvError {
109 Closed,
111}
112
113#[derive(Debug, Clone, Copy, PartialEq, Eq)]
115pub enum RecvTimeoutError {
116 Timeout,
118 Closed,
120}
121
122pub struct ActorRef<M> {
129 inner: ActorRefInnerImpl<M>,
130}
131
132impl<M> Clone for ActorRef<M> {
133 #[inline]
134 fn clone(&self) -> Self {
135 Self {
136 inner: self.inner.clone(),
137 }
138 }
139}
140
141impl<M> fmt::Debug for ActorRef<M> {
142 #[inline]
143 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
144 self.inner.fmt(f)
145 }
146}
147
148#[cfg(reifydb_single_threaded)]
150unsafe impl<M> Send for ActorRef<M> {}
151
152#[cfg(reifydb_single_threaded)]
153unsafe impl<M> Sync for ActorRef<M> {}
154
155impl<M> ActorRef<M> {
156 #[inline]
158 pub(crate) fn from_inner(inner: ActorRefInnerImpl<M>) -> Self {
159 Self {
160 inner,
161 }
162 }
163}
164
165#[cfg(not(reifydb_single_threaded))]
167impl<M: Send> ActorRef<M> {
168 #[inline]
170 pub(crate) fn new(tx: Sender<M>) -> Self {
171 Self {
172 inner: native::ActorRefInner::new(tx),
173 }
174 }
175
176 #[inline]
178 pub(crate) fn set_notify(&self, f: sync::Arc<dyn Fn() + Send + Sync>) {
179 self.inner.set_notify(f)
180 }
181
182 #[inline]
188 pub fn send(&self, msg: M) -> Result<(), SendError<M>> {
189 self.inner.send(msg)
190 }
191
192 #[inline]
196 pub fn send_blocking(&self, msg: M) -> Result<(), SendError<M>> {
197 self.inner.send_blocking(msg)
198 }
199
200 #[inline]
204 pub fn is_alive(&self) -> bool {
205 self.inner.is_alive()
206 }
207}
208
209#[cfg(reifydb_target = "dst")]
211impl<M> ActorRef<M> {
212 #[inline]
214 pub fn send(&self, msg: M) -> Result<(), SendError<M>> {
215 self.inner.send(msg)
216 }
217
218 #[inline]
222 pub fn send_blocking(&self, msg: M) -> Result<(), SendError<M>> {
223 self.inner.send_blocking(msg)
224 }
225
226 #[inline]
228 pub fn is_alive(&self) -> bool {
229 self.inner.is_alive()
230 }
231
232 #[inline]
234 pub(crate) fn mark_stopped(&self) {
235 self.inner.mark_stopped()
236 }
237
238 #[inline]
240 pub(crate) fn set_notify(&self, f: Box<dyn Fn()>) {
241 self.inner.set_notify(f)
242 }
243}
244
245#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
247impl<M> ActorRef<M> {
248 #[inline]
250 pub(crate) fn new(
251 processor: Rc<RefCell<Option<Box<dyn FnMut(M)>>>>,
252 alive: Arc<AtomicBool>,
253 queue: Rc<RefCell<Vec<M>>>,
254 processing: Rc<Cell<bool>>,
255 ) -> Self {
256 Self {
257 inner: wasm::ActorRefInner::new(processor, alive, queue, processing),
258 }
259 }
260
261 #[inline]
263 pub(crate) fn from_wasm_inner(
264 processor: Rc<RefCell<Option<Box<dyn FnMut(M)>>>>,
265 alive: Arc<AtomicBool>,
266 queue: Rc<RefCell<Vec<M>>>,
267 processing: Rc<Cell<bool>>,
268 ) -> Self {
269 Self {
270 inner: wasm::ActorRefInner::new(processor, alive, queue, processing),
271 }
272 }
273
274 #[inline]
279 pub fn send(&self, msg: M) -> Result<(), SendError<M>> {
280 self.inner.send(msg)
281 }
282
283 #[inline]
287 pub fn send_blocking(&self, msg: M) -> Result<(), SendError<M>> {
288 self.inner.send_blocking(msg)
289 }
290
291 #[inline]
295 pub fn is_alive(&self) -> bool {
296 self.inner.is_alive()
297 }
298
299 #[inline]
301 pub(crate) fn mark_stopped(&self) {
302 self.inner.mark_stopped()
303 }
304
305 #[inline]
307 pub(crate) fn processor(&self) -> &Rc<RefCell<Option<Box<dyn FnMut(M)>>>> {
308 &self.inner.processor
309 }
310}
311
312use std::error;
313#[cfg(not(reifydb_single_threaded))]
314use std::sync;
315
316#[cfg(not(reifydb_single_threaded))]
317use crossbeam_channel::Sender;
318#[cfg(reifydb_target = "dst")]
319pub(crate) use dst::create_mailbox as create_dst_mailbox;
320#[cfg(not(reifydb_single_threaded))]
321pub(crate) use native::create_mailbox;
322#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
323pub(crate) use wasm::create_actor_ref;