reifydb_runtime/actor/mailbox/
mod.rs1#[cfg(reifydb_target = "wasm")]
16use std::cell::{Cell, RefCell};
17use std::fmt;
18#[cfg(reifydb_target = "wasm")]
19use std::rc::Rc;
20#[cfg(reifydb_target = "wasm")]
21use std::sync::Arc;
22#[cfg(reifydb_target = "wasm")]
23use std::sync::atomic::AtomicBool;
24
25use cfg_if::cfg_if;
26
27#[cfg(reifydb_target = "native")]
28pub(crate) mod native;
29
30#[cfg(reifydb_target = "wasm")]
31pub(crate) mod wasm;
32
33cfg_if! {
34 if #[cfg(reifydb_target = "native")] {
35 type ActorRefInnerImpl<M> = native::ActorRefInner<M>;
36 } else {
37 type ActorRefInnerImpl<M> = wasm::ActorRefInner<M>;
38 }
39}
40
41#[derive(Debug, Clone, PartialEq, Eq)]
43pub enum SendError<M> {
44 Closed(M),
46 Full(M),
48}
49
50impl<M> SendError<M> {
51 #[inline]
53 pub fn into_inner(self) -> M {
54 match self {
55 SendError::Closed(m) => m,
56 SendError::Full(m) => m,
57 }
58 }
59}
60
61impl<M: fmt::Debug> fmt::Display for SendError<M> {
62 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63 match self {
64 SendError::Closed(_) => write!(f, "actor mailbox closed"),
65 SendError::Full(_) => write!(f, "actor mailbox full"),
66 }
67 }
68}
69
70impl<M: fmt::Debug> error::Error for SendError<M> {}
71
72#[derive(Debug, Clone, PartialEq, Eq)]
74pub enum AskError {
75 SendFailed,
77 ResponseClosed,
79}
80
81impl fmt::Display for AskError {
82 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
83 match self {
84 AskError::SendFailed => write!(f, "failed to send ask request"),
85 AskError::ResponseClosed => write!(f, "response channel closed"),
86 }
87 }
88}
89
90impl error::Error for AskError {}
91
92#[derive(Debug, Clone, Copy, PartialEq, Eq)]
94pub enum TryRecvError {
95 Empty,
97 Closed,
99}
100
101#[derive(Debug, Clone, Copy, PartialEq, Eq)]
103pub enum RecvError {
104 Closed,
106}
107
108#[derive(Debug, Clone, Copy, PartialEq, Eq)]
110pub enum RecvTimeoutError {
111 Timeout,
113 Closed,
115}
116
117pub struct ActorRef<M> {
124 inner: ActorRefInnerImpl<M>,
125}
126
127impl<M> Clone for ActorRef<M> {
128 #[inline]
129 fn clone(&self) -> Self {
130 Self {
131 inner: self.inner.clone(),
132 }
133 }
134}
135
136impl<M> fmt::Debug for ActorRef<M> {
137 #[inline]
138 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
139 self.inner.fmt(f)
140 }
141}
142
143#[cfg(reifydb_target = "wasm")]
145unsafe impl<M> Send for ActorRef<M> {}
146
147#[cfg(reifydb_target = "wasm")]
148unsafe impl<M> Sync for ActorRef<M> {}
149
150impl<M> ActorRef<M> {
151 #[inline]
153 pub(crate) fn from_inner(inner: ActorRefInnerImpl<M>) -> Self {
154 Self {
155 inner,
156 }
157 }
158}
159
160#[cfg(reifydb_target = "native")]
162impl<M: Send> ActorRef<M> {
163 #[inline]
165 pub(crate) fn new(tx: Sender<M>) -> Self {
166 Self {
167 inner: native::ActorRefInner::new(tx),
168 }
169 }
170
171 #[inline]
173 pub(crate) fn set_notify(&self, f: sync::Arc<dyn Fn() + Send + Sync>) {
174 self.inner.set_notify(f)
175 }
176
177 #[inline]
183 pub fn send(&self, msg: M) -> Result<(), SendError<M>> {
184 self.inner.send(msg)
185 }
186
187 #[inline]
191 pub fn send_blocking(&self, msg: M) -> Result<(), SendError<M>> {
192 self.inner.send_blocking(msg)
193 }
194
195 #[inline]
199 pub fn is_alive(&self) -> bool {
200 self.inner.is_alive()
201 }
202}
203
204#[cfg(reifydb_target = "wasm")]
206impl<M> ActorRef<M> {
207 #[inline]
209 pub(crate) fn new(
210 processor: Rc<RefCell<Option<Box<dyn FnMut(M)>>>>,
211 alive: Arc<AtomicBool>,
212 queue: Rc<RefCell<Vec<M>>>,
213 processing: Rc<Cell<bool>>,
214 ) -> Self {
215 Self {
216 inner: wasm::ActorRefInner::new(processor, alive, queue, processing),
217 }
218 }
219
220 #[inline]
222 pub(crate) fn from_wasm_inner(
223 processor: Rc<RefCell<Option<Box<dyn FnMut(M)>>>>,
224 alive: Arc<AtomicBool>,
225 queue: Rc<RefCell<Vec<M>>>,
226 processing: Rc<Cell<bool>>,
227 ) -> Self {
228 Self {
229 inner: wasm::ActorRefInner::new(processor, alive, queue, processing),
230 }
231 }
232
233 #[inline]
238 pub fn send(&self, msg: M) -> Result<(), SendError<M>> {
239 self.inner.send(msg)
240 }
241
242 #[inline]
246 pub fn send_blocking(&self, msg: M) -> Result<(), SendError<M>> {
247 self.inner.send_blocking(msg)
248 }
249
250 #[inline]
254 pub fn is_alive(&self) -> bool {
255 self.inner.is_alive()
256 }
257
258 #[inline]
260 pub(crate) fn mark_stopped(&self) {
261 self.inner.mark_stopped()
262 }
263
264 #[inline]
266 pub(crate) fn processor(&self) -> &Rc<RefCell<Option<Box<dyn FnMut(M)>>>> {
267 &self.inner.processor
268 }
269}
270
271use std::error;
272#[cfg(reifydb_target = "native")]
273use std::sync;
274
275#[cfg(reifydb_target = "native")]
276use crossbeam_channel::Sender;
277#[cfg(reifydb_target = "native")]
278pub(crate) use native::create_mailbox;
279#[cfg(reifydb_target = "wasm")]
280pub(crate) use wasm::create_actor_ref;