reifydb_runtime/actor/mailbox/
mod.rs1#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
5use std::cell::{Cell, RefCell};
6use std::fmt;
7#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
8use std::rc::Rc;
9#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
10use std::sync::Arc;
11#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
12use std::sync::atomic::AtomicBool;
13
14use cfg_if::cfg_if;
15
16#[cfg(not(reifydb_single_threaded))]
17pub(crate) mod native;
18
19#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
20pub(crate) mod wasm;
21
22#[cfg(reifydb_target = "dst")]
23pub(crate) mod dst;
24
25cfg_if! {
26 if #[cfg(reifydb_target = "dst")] {
27 type ActorRefInnerImpl<M> = dst::ActorRefInner<M>;
28 } else if #[cfg(not(reifydb_single_threaded))] {
29 type ActorRefInnerImpl<M> = native::ActorRefInner<M>;
30 } else {
31 type ActorRefInnerImpl<M> = wasm::ActorRefInner<M>;
32 }
33}
34
35#[derive(Debug, Clone, PartialEq, Eq)]
36pub enum SendError<M> {
37 Closed(M),
38
39 Full(M),
40}
41
42impl<M> SendError<M> {
43 #[inline]
44 pub fn into_inner(self) -> M {
45 match self {
46 SendError::Closed(m) => m,
47 SendError::Full(m) => m,
48 }
49 }
50}
51
52impl<M: fmt::Debug> fmt::Display for SendError<M> {
53 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54 match self {
55 SendError::Closed(_) => write!(f, "actor mailbox closed"),
56 SendError::Full(_) => write!(f, "actor mailbox full"),
57 }
58 }
59}
60
61impl<M: fmt::Debug> error::Error for SendError<M> {}
62
63#[derive(Debug, Clone, PartialEq, Eq)]
64pub enum AskError {
65 SendFailed,
66
67 ResponseClosed,
68}
69
70impl fmt::Display for AskError {
71 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
72 match self {
73 AskError::SendFailed => write!(f, "failed to send ask request"),
74 AskError::ResponseClosed => write!(f, "response channel closed"),
75 }
76 }
77}
78
79impl error::Error for AskError {}
80
81#[derive(Debug, Clone, Copy, PartialEq, Eq)]
82pub enum TryRecvError {
83 Empty,
84
85 Closed,
86}
87
88#[derive(Debug, Clone, Copy, PartialEq, Eq)]
89pub enum RecvError {
90 Closed,
91}
92
93#[derive(Debug, Clone, Copy, PartialEq, Eq)]
94pub enum RecvTimeoutError {
95 Timeout,
96
97 Closed,
98}
99
100pub struct ActorRef<M> {
101 inner: ActorRefInnerImpl<M>,
102}
103
104impl<M> Clone for ActorRef<M> {
105 #[inline]
106 fn clone(&self) -> Self {
107 Self {
108 inner: self.inner.clone(),
109 }
110 }
111}
112
113impl<M> fmt::Debug for ActorRef<M> {
114 #[inline]
115 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116 self.inner.fmt(f)
117 }
118}
119
120#[cfg(reifydb_single_threaded)]
122unsafe impl<M> Send for ActorRef<M> {}
123
124#[cfg(reifydb_single_threaded)]
125unsafe impl<M> Sync for ActorRef<M> {}
126
127impl<M> ActorRef<M> {
128 #[inline]
129 pub(crate) fn from_inner(inner: ActorRefInnerImpl<M>) -> Self {
130 Self {
131 inner,
132 }
133 }
134}
135
136#[cfg(not(reifydb_single_threaded))]
137impl<M: Send> ActorRef<M> {
138 #[inline]
139 pub(crate) fn new(tx: Sender<M>) -> Self {
140 Self {
141 inner: native::ActorRefInner::new(tx),
142 }
143 }
144
145 #[inline]
146 pub(crate) fn set_notify(&self, f: sync::Arc<dyn Fn() + Send + Sync>) {
147 self.inner.set_notify(f)
148 }
149
150 #[inline]
151 pub fn send(&self, msg: M) -> Result<(), SendError<M>> {
152 self.inner.send(msg)
153 }
154
155 #[inline]
156 pub fn send_blocking(&self, msg: M) -> Result<(), SendError<M>> {
157 self.inner.send_blocking(msg)
158 }
159
160 #[inline]
161 pub fn is_alive(&self) -> bool {
162 self.inner.is_alive()
163 }
164}
165
166#[cfg(reifydb_target = "dst")]
167impl<M> ActorRef<M> {
168 #[inline]
169 pub fn send(&self, msg: M) -> Result<(), SendError<M>> {
170 self.inner.send(msg)
171 }
172
173 #[inline]
174 pub fn send_blocking(&self, msg: M) -> Result<(), SendError<M>> {
175 self.inner.send_blocking(msg)
176 }
177
178 #[inline]
179 pub fn is_alive(&self) -> bool {
180 self.inner.is_alive()
181 }
182
183 #[inline]
184 pub(crate) fn mark_stopped(&self) {
185 self.inner.mark_stopped()
186 }
187
188 #[inline]
189 pub(crate) fn set_notify(&self, f: Box<dyn Fn()>) {
190 self.inner.set_notify(f)
191 }
192}
193
194#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
195impl<M> ActorRef<M> {
196 #[inline]
197 pub(crate) fn new(
198 processor: Rc<RefCell<Option<Box<dyn FnMut(M)>>>>,
199 alive: Arc<AtomicBool>,
200 queue: Rc<RefCell<Vec<M>>>,
201 processing: Rc<Cell<bool>>,
202 ) -> Self {
203 Self {
204 inner: wasm::ActorRefInner::new(processor, alive, queue, processing),
205 }
206 }
207
208 #[inline]
209 pub(crate) fn from_wasm_inner(
210 processor: Rc<RefCell<Option<Box<dyn FnMut(M)>>>>,
211 alive: Arc<AtomicBool>,
212 queue: Rc<RefCell<Vec<M>>>,
213 processing: Rc<Cell<bool>>,
214 ) -> Self {
215 Self {
216 inner: wasm::ActorRefInner::new(processor, alive, queue, processing),
217 }
218 }
219
220 #[inline]
221 pub fn send(&self, msg: M) -> Result<(), SendError<M>> {
222 self.inner.send(msg)
223 }
224
225 #[inline]
226 pub fn send_blocking(&self, msg: M) -> Result<(), SendError<M>> {
227 self.inner.send_blocking(msg)
228 }
229
230 #[inline]
231 pub fn is_alive(&self) -> bool {
232 self.inner.is_alive()
233 }
234
235 #[inline]
236 pub(crate) fn mark_stopped(&self) {
237 self.inner.mark_stopped()
238 }
239
240 #[inline]
241 pub(crate) fn processor(&self) -> &Rc<RefCell<Option<Box<dyn FnMut(M)>>>> {
242 &self.inner.processor
243 }
244}
245
246use std::error;
247#[cfg(not(reifydb_single_threaded))]
248use std::sync;
249
250#[cfg(not(reifydb_single_threaded))]
251use crossbeam_channel::Sender;
252#[cfg(reifydb_target = "dst")]
253pub(crate) use dst::create_mailbox as create_dst_mailbox;
254#[cfg(not(reifydb_single_threaded))]
255pub(crate) use native::create_mailbox;
256#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
257pub(crate) use wasm::create_actor_ref;