reifydb_runtime/actor/mailbox/
mod.rs1#[cfg(reifydb_target = "wasm")]
16use std::cell::{Cell, RefCell};
17use std::fmt;
18#[cfg(reifydb_target = "wasm")]
19use std::rc::Rc;
20#[cfg(reifydb_target = "wasm")]
21use std::sync::Arc;
22#[cfg(reifydb_target = "wasm")]
23use std::sync::atomic::AtomicBool;
24
25use cfg_if::cfg_if;
26
27#[cfg(reifydb_target = "native")]
28pub(crate) mod native;
29
30#[cfg(reifydb_target = "wasm")]
31pub(crate) mod wasm;
32
33cfg_if! {
34 if #[cfg(reifydb_target = "native")] {
35 type ActorRefInnerImpl<M> = native::ActorRefInner<M>;
36 } else {
37 type ActorRefInnerImpl<M> = wasm::ActorRefInner<M>;
38 }
39}
40
41#[derive(Debug, Clone, PartialEq, Eq)]
47pub enum SendError<M> {
48 Closed(M),
50 Full(M),
52}
53
54impl<M> SendError<M> {
55 #[inline]
57 pub fn into_inner(self) -> M {
58 match self {
59 SendError::Closed(m) => m,
60 SendError::Full(m) => m,
61 }
62 }
63}
64
65impl<M: fmt::Debug> fmt::Display for SendError<M> {
66 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
67 match self {
68 SendError::Closed(_) => write!(f, "actor mailbox closed"),
69 SendError::Full(_) => write!(f, "actor mailbox full"),
70 }
71 }
72}
73
74impl<M: fmt::Debug> std::error::Error for SendError<M> {}
75
76#[derive(Debug, Clone, PartialEq, Eq)]
78pub enum AskError {
79 SendFailed,
81 ResponseClosed,
83}
84
85impl fmt::Display for AskError {
86 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
87 match self {
88 AskError::SendFailed => write!(f, "failed to send ask request"),
89 AskError::ResponseClosed => write!(f, "response channel closed"),
90 }
91 }
92}
93
94impl std::error::Error for AskError {}
95
96#[derive(Debug, Clone, Copy, PartialEq, Eq)]
98pub enum TryRecvError {
99 Empty,
101 Closed,
103}
104
105#[derive(Debug, Clone, Copy, PartialEq, Eq)]
107pub enum RecvError {
108 Closed,
110}
111
112#[derive(Debug, Clone, Copy, PartialEq, Eq)]
114pub enum RecvTimeoutError {
115 Timeout,
117 Closed,
119}
120
121pub struct ActorRef<M> {
128 inner: ActorRefInnerImpl<M>,
129}
130
131impl<M> Clone for ActorRef<M> {
132 #[inline]
133 fn clone(&self) -> Self {
134 Self {
135 inner: self.inner.clone(),
136 }
137 }
138}
139
140impl<M> fmt::Debug for ActorRef<M> {
141 #[inline]
142 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
143 self.inner.fmt(f)
144 }
145}
146
147#[cfg(reifydb_target = "wasm")]
149unsafe impl<M> Send for ActorRef<M> {}
150
151#[cfg(reifydb_target = "wasm")]
152unsafe impl<M> Sync for ActorRef<M> {}
153
154impl<M> ActorRef<M> {
155 #[inline]
157 pub(crate) fn from_inner(inner: ActorRefInnerImpl<M>) -> Self {
158 Self {
159 inner,
160 }
161 }
162}
163
164#[cfg(reifydb_target = "native")]
166impl<M: Send> ActorRef<M> {
167 #[inline]
169 pub(crate) fn new(tx: crossbeam_channel::Sender<M>) -> Self {
170 Self {
171 inner: native::ActorRefInner::new(tx),
172 }
173 }
174
175 #[inline]
177 pub(crate) fn set_notify(&self, f: std::sync::Arc<dyn Fn() + Send + Sync>) {
178 self.inner.set_notify(f)
179 }
180
181 #[inline]
187 pub fn send(&self, msg: M) -> Result<(), SendError<M>> {
188 self.inner.send(msg)
189 }
190
191 #[inline]
195 pub fn send_blocking(&self, msg: M) -> Result<(), SendError<M>> {
196 self.inner.send_blocking(msg)
197 }
198
199 #[inline]
203 pub fn is_alive(&self) -> bool {
204 self.inner.is_alive()
205 }
206}
207
208#[cfg(reifydb_target = "wasm")]
210impl<M> ActorRef<M> {
211 #[inline]
213 pub(crate) fn new(
214 processor: Rc<RefCell<Option<Box<dyn FnMut(M)>>>>,
215 alive: Arc<AtomicBool>,
216 queue: Rc<RefCell<Vec<M>>>,
217 processing: Rc<Cell<bool>>,
218 ) -> Self {
219 Self {
220 inner: wasm::ActorRefInner::new(processor, alive, queue, processing),
221 }
222 }
223
224 #[inline]
226 pub(crate) fn from_wasm_inner(
227 processor: Rc<RefCell<Option<Box<dyn FnMut(M)>>>>,
228 alive: Arc<AtomicBool>,
229 queue: Rc<RefCell<Vec<M>>>,
230 processing: Rc<Cell<bool>>,
231 ) -> Self {
232 Self {
233 inner: wasm::ActorRefInner::new(processor, alive, queue, processing),
234 }
235 }
236
237 #[inline]
242 pub fn send(&self, msg: M) -> Result<(), SendError<M>> {
243 self.inner.send(msg)
244 }
245
246 #[inline]
250 pub fn send_blocking(&self, msg: M) -> Result<(), SendError<M>> {
251 self.inner.send_blocking(msg)
252 }
253
254 #[inline]
258 pub fn is_alive(&self) -> bool {
259 self.inner.is_alive()
260 }
261
262 #[inline]
264 pub(crate) fn mark_stopped(&self) {
265 self.inner.mark_stopped()
266 }
267
268 #[inline]
270 pub(crate) fn processor(&self) -> &Rc<RefCell<Option<Box<dyn FnMut(M)>>>> {
271 &self.inner.processor
272 }
273}
274
275#[cfg(reifydb_target = "native")]
276pub(crate) use native::create_mailbox;
277#[cfg(reifydb_target = "wasm")]
278pub(crate) use wasm::create_actor_ref;