reifydb_runtime/actor/
reply.rs1#[cfg(reifydb_single_threaded)]
11use std::cell::RefCell;
12#[cfg(reifydb_single_threaded)]
13use std::rc::Rc;
14
15use cfg_if::cfg_if;
16#[cfg(not(reifydb_single_threaded))]
17use tokio::sync::oneshot;
18
19#[cfg(not(reifydb_single_threaded))]
20use super::mailbox::AskError;
21
22cfg_if! {
23 if #[cfg(reifydb_single_threaded)] {
24
25 pub struct Reply<T>(Rc<RefCell<Option<T>>>);
27
28 pub struct ReplyReceiver<T>(Rc<RefCell<Option<T>>>);
30
31 unsafe impl<T> Send for Reply<T> {}
34 unsafe impl<T> Sync for Reply<T> {}
35 unsafe impl<T> Send for ReplyReceiver<T> {}
36 unsafe impl<T> Sync for ReplyReceiver<T> {}
37
38 pub fn reply_channel<T>() -> (Reply<T>, ReplyReceiver<T>) {
40 let slot = Rc::new(RefCell::new(None));
41 (Reply(Rc::clone(&slot)), ReplyReceiver(slot))
42 }
43
44 impl<T> Reply<T> {
45 pub fn send(self, value: T) {
47 *self.0.borrow_mut() = Some(value);
48 }
49 }
50
51 impl<T> ReplyReceiver<T> {
52 pub fn try_recv(&self) -> Option<T> {
54 self.0.borrow_mut().take()
55 }
56 }
57 } else {
58 pub struct Reply<T>(oneshot::Sender<T>);
60
61 pub struct ReplyReceiver<T>(oneshot::Receiver<T>);
63
64 pub fn reply_channel<T>() -> (Reply<T>, ReplyReceiver<T>) {
66 let (tx, rx) = oneshot::channel();
67 (Reply(tx), ReplyReceiver(rx))
68 }
69
70 impl<T> Reply<T> {
71 pub fn send(self, value: T) {
73 let _ = self.0.send(value);
74 }
75 }
76
77 impl<T> ReplyReceiver<T> {
78 pub async fn recv(self) -> Result<T, AskError> {
80 self.0.await.map_err(|_| AskError::ResponseClosed)
81 }
82 }
83 }
84}