reifydb_runtime/actor/mailbox/
mod.rs1#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
5use std::cell::{Cell, RefCell};
6use std::fmt;
7#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
8use std::rc::Rc;
9#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
10use std::sync::Arc;
11#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
12use std::sync::atomic::AtomicBool;
13
14use cfg_if::cfg_if;
15
16#[cfg(not(reifydb_single_threaded))]
17pub(crate) mod native;
18
19#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
20pub(crate) mod wasm;
21
22#[cfg(reifydb_target = "dst")]
23pub(crate) mod dst;
24
25cfg_if! {
26 if #[cfg(reifydb_target = "dst")] {
27 type ActorRefInnerImpl<M> = dst::ActorRefInner<M>;
28 } else if #[cfg(not(reifydb_single_threaded))] {
29 type ActorRefInnerImpl<M> = native::ActorRefInner<M>;
30 } else {
31 type ActorRefInnerImpl<M> = wasm::ActorRefInner<M>;
32 }
33}
34
/// Error returned when a message could not be handed to an actor's mailbox.
///
/// Every variant carries the undelivered message so the caller can get it
/// back (see `into_inner`).
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum SendError<M> {
    /// The actor mailbox is closed.
    Closed(M),
    /// The actor mailbox is full.
    Full(M),
}
43
44impl<M> SendError<M> {
45 #[inline]
47 pub fn into_inner(self) -> M {
48 match self {
49 SendError::Closed(m) => m,
50 SendError::Full(m) => m,
51 }
52 }
53}
54
55impl<M: fmt::Debug> fmt::Display for SendError<M> {
56 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
57 match self {
58 SendError::Closed(_) => write!(f, "actor mailbox closed"),
59 SendError::Full(_) => write!(f, "actor mailbox full"),
60 }
61 }
62}
63
64impl<M: fmt::Debug> error::Error for SendError<M> {}
65
/// Error produced by an ask (request/response) interaction with an actor.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum AskError {
    /// The ask request could not be sent.
    SendFailed,
    /// The response channel was closed.
    ResponseClosed,
}
74
75impl fmt::Display for AskError {
76 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
77 match self {
78 AskError::SendFailed => write!(f, "failed to send ask request"),
79 AskError::ResponseClosed => write!(f, "response channel closed"),
80 }
81 }
82}
83
84impl error::Error for AskError {}
85
/// Error kinds for a `try_recv`-style receive.
///
/// NOTE(review): the receive side is not defined in this file; the variant
/// descriptions below are inferred from their names — confirm against the
/// mailbox backends.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TryRecvError {
    /// Presumably: nothing was available to receive.
    Empty,
    /// Presumably: the mailbox has been closed.
    Closed,
}
94
/// Error kind for a `recv`-style receive.
///
/// NOTE(review): the receive side is not defined in this file; the variant
/// description is inferred from its name — confirm against the backends.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RecvError {
    /// Presumably: the mailbox has been closed.
    Closed,
}
101
/// Error kinds for a receive with a timeout.
///
/// NOTE(review): the receive side is not defined in this file; the variant
/// descriptions are inferred from their names — confirm against the backends.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RecvTimeoutError {
    /// Presumably: the wait elapsed before anything arrived.
    Timeout,
    /// Presumably: the mailbox has been closed.
    Closed,
}
110
111pub struct ActorRef<M> {
118 inner: ActorRefInnerImpl<M>,
119}
120
121impl<M> Clone for ActorRef<M> {
122 #[inline]
123 fn clone(&self) -> Self {
124 Self {
125 inner: self.inner.clone(),
126 }
127 }
128}
129
130impl<M> fmt::Debug for ActorRef<M> {
131 #[inline]
132 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
133 self.inner.fmt(f)
134 }
135}
136
// SAFETY (assumed — TODO confirm): these impls are only compiled when
// `reifydb_single_threaded` is set. The single-threaded backend visible in
// this file is built from `Rc`/`RefCell`/`Cell`, which are not thread-safe,
// so soundness depends on handles never actually crossing threads in such
// builds. Nothing in this file enforces that.
#[cfg(reifydb_single_threaded)]
unsafe impl<M> Send for ActorRef<M> {}

// SAFETY (assumed — TODO confirm): same reasoning as the `Send` impl above;
// valid only if single-threaded builds never share a handle between threads.
#[cfg(reifydb_single_threaded)]
unsafe impl<M> Sync for ActorRef<M> {}
143
144impl<M> ActorRef<M> {
145 #[inline]
147 pub(crate) fn from_inner(inner: ActorRefInnerImpl<M>) -> Self {
148 Self {
149 inner,
150 }
151 }
152}
153
154#[cfg(not(reifydb_single_threaded))]
156impl<M: Send> ActorRef<M> {
157 #[inline]
159 pub(crate) fn new(tx: Sender<M>) -> Self {
160 Self {
161 inner: native::ActorRefInner::new(tx),
162 }
163 }
164
165 #[inline]
167 pub(crate) fn set_notify(&self, f: sync::Arc<dyn Fn() + Send + Sync>) {
168 self.inner.set_notify(f)
169 }
170
171 #[inline]
177 pub fn send(&self, msg: M) -> Result<(), SendError<M>> {
178 self.inner.send(msg)
179 }
180
181 #[inline]
185 pub fn send_blocking(&self, msg: M) -> Result<(), SendError<M>> {
186 self.inner.send_blocking(msg)
187 }
188
189 #[inline]
193 pub fn is_alive(&self) -> bool {
194 self.inner.is_alive()
195 }
196}
197
/// API compiled for `reifydb_target = "dst"`. Every method forwards to
/// `dst::ActorRefInner`.
#[cfg(reifydb_target = "dst")]
impl<M> ActorRef<M> {
    /// Sends `msg` through the backend's `send`.
    ///
    /// # Errors
    /// Returns a `SendError` carrying the undelivered message on failure.
    #[inline]
    pub fn send(&self, msg: M) -> Result<(), SendError<M>> {
        let outcome = self.inner.send(msg);
        outcome
    }

    /// Sends `msg` through the backend's `send_blocking`.
    ///
    /// # Errors
    /// Returns a `SendError` carrying the undelivered message on failure.
    #[inline]
    pub fn send_blocking(&self, msg: M) -> Result<(), SendError<M>> {
        let outcome = self.inner.send_blocking(msg);
        outcome
    }

    /// Returns the backend's `is_alive` flag for this actor.
    #[inline]
    pub fn is_alive(&self) -> bool {
        let alive = self.inner.is_alive();
        alive
    }

    /// Tells the backend that the actor has stopped (`mark_stopped`).
    #[inline]
    pub(crate) fn mark_stopped(&self) {
        self.inner.mark_stopped();
    }

    /// Registers the callback `f` with the backend via its `set_notify`.
    #[inline]
    pub(crate) fn set_notify(&self, f: Box<dyn Fn()>) {
        self.inner.set_notify(f);
    }
}
233
/// API compiled for single-threaded, non-dst builds (wasm backend). Every
/// method forwards to `wasm::ActorRefInner`.
#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
impl<M> ActorRef<M> {
    /// Builds a handle from the shared pieces of the wasm backend: the
    /// message `processor` slot, the `alive` flag, the pending-message
    /// `queue`, and the `processing` flag.
    #[inline]
    pub(crate) fn new(
        processor: Rc<RefCell<Option<Box<dyn FnMut(M)>>>>,
        alive: Arc<AtomicBool>,
        queue: Rc<RefCell<Vec<M>>>,
        processing: Rc<Cell<bool>>,
    ) -> Self {
        let inner = wasm::ActorRefInner::new(processor, alive, queue, processing);
        Self { inner }
    }

    /// Same construction as [`ActorRef::new`], kept under a second name;
    /// takes identical arguments and builds an identical handle.
    #[inline]
    pub(crate) fn from_wasm_inner(
        processor: Rc<RefCell<Option<Box<dyn FnMut(M)>>>>,
        alive: Arc<AtomicBool>,
        queue: Rc<RefCell<Vec<M>>>,
        processing: Rc<Cell<bool>>,
    ) -> Self {
        Self::new(processor, alive, queue, processing)
    }

    /// Sends `msg` through the backend's `send`.
    ///
    /// # Errors
    /// Returns a `SendError` carrying the undelivered message on failure.
    #[inline]
    pub fn send(&self, msg: M) -> Result<(), SendError<M>> {
        let outcome = self.inner.send(msg);
        outcome
    }

    /// Sends `msg` through the backend's `send_blocking`.
    ///
    /// # Errors
    /// Returns a `SendError` carrying the undelivered message on failure.
    #[inline]
    pub fn send_blocking(&self, msg: M) -> Result<(), SendError<M>> {
        let outcome = self.inner.send_blocking(msg);
        outcome
    }

    /// Returns the backend's `is_alive` flag for this actor.
    #[inline]
    pub fn is_alive(&self) -> bool {
        let alive = self.inner.is_alive();
        alive
    }

    /// Tells the backend that the actor has stopped (`mark_stopped`).
    #[inline]
    pub(crate) fn mark_stopped(&self) {
        self.inner.mark_stopped();
    }

    /// Borrows the backend's shared `processor` slot.
    #[inline]
    pub(crate) fn processor(&self) -> &Rc<RefCell<Option<Box<dyn FnMut(M)>>>> {
        let backend = &self.inner;
        &backend.processor
    }
}
300
301use std::error;
302#[cfg(not(reifydb_single_threaded))]
303use std::sync;
304
305#[cfg(not(reifydb_single_threaded))]
306use crossbeam_channel::Sender;
307#[cfg(reifydb_target = "dst")]
308pub(crate) use dst::create_mailbox as create_dst_mailbox;
309#[cfg(not(reifydb_single_threaded))]
310pub(crate) use native::create_mailbox;
311#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
312pub(crate) use wasm::create_actor_ref;