// reifydb_runtime/actor/reply.rs
#[cfg(reifydb_single_threaded)]
5use std::cell::RefCell;
6#[cfg(reifydb_single_threaded)]
7use std::rc::Rc;
8
9use cfg_if::cfg_if;
10#[cfg(not(reifydb_single_threaded))]
11use tokio::sync::oneshot;
12
13#[cfg(not(reifydb_single_threaded))]
14use super::mailbox::AskError;
15
16cfg_if! {
17 if #[cfg(reifydb_single_threaded)] {
18
19
/// Sending half of a single-use reply slot (single-threaded build).
///
/// Holds a reference-counted `RefCell<Option<T>>` that is shared with the
/// matching [`ReplyReceiver`]. [`Reply::send`] stores the reply in that slot.
pub struct Reply<T>(Rc<RefCell<Option<T>>>);

/// Receiving half of a single-use reply slot (single-threaded build).
///
/// Shares the same `RefCell<Option<T>>` as the [`Reply`] it was created with
/// and is polled with [`ReplyReceiver::try_recv`].
pub struct ReplyReceiver<T>(Rc<RefCell<Option<T>>>);

// SAFETY: `Rc<RefCell<_>>` is neither `Send` nor `Sync` by itself, so the
// impls below are sound only while both handles stay on one thread for their
// whole lifetime. They are compiled solely under
// `cfg(reifydb_single_threaded)`.
// NOTE(review): presumably that configuration guarantees no second OS thread
// ever exists — confirm against the build setup; if threads can be spawned in
// such a build, these impls allow data races on the reference count and slot.
unsafe impl<T> Send for Reply<T> {}
unsafe impl<T> Sync for Reply<T> {}
unsafe impl<T> Send for ReplyReceiver<T> {}
unsafe impl<T> Sync for ReplyReceiver<T> {}

/// Creates a connected [`Reply`] / [`ReplyReceiver`] pair.
///
/// Both halves point at one freshly allocated slot that starts out empty.
#[must_use]
pub fn reply_channel<T>() -> (Reply<T>, ReplyReceiver<T>) {
    let slot = Rc::new(RefCell::new(None));
    (Reply(Rc::clone(&slot)), ReplyReceiver(slot))
}

impl<T> Reply<T> {
    /// Stores `value` in the shared slot.
    ///
    /// The sender is consumed, so each `Reply` delivers at most one value.
    /// The value stays in the slot until the receiver takes it; if the
    /// receiver is already gone the value is dropped together with the slot.
    pub fn send(self, value: T) {
        *self.0.borrow_mut() = Some(value);
    }
}

impl<T> ReplyReceiver<T> {
    /// Takes the reply out of the slot without waiting.
    ///
    /// Returns `Some(value)` if a value is currently stored, leaving the slot
    /// empty afterwards. Returns `None` when nothing has been sent yet or the
    /// value has already been taken by an earlier call.
    #[must_use]
    pub fn try_recv(&self) -> Option<T> {
        self.0.borrow_mut().take()
    }
}
51 } else {
52
53 pub struct Reply<T>(oneshot::Sender<T>);
54
55
56 pub struct ReplyReceiver<T>(oneshot::Receiver<T>);
57
58
59 pub fn reply_channel<T>() -> (Reply<T>, ReplyReceiver<T>) {
60 let (tx, rx) = oneshot::channel();
61 (Reply(tx), ReplyReceiver(rx))
62 }
63
64 impl<T> Reply<T> {
65
66 pub fn send(self, value: T) {
67 let _ = self.0.send(value);
68 }
69 }
70
71 impl<T> ReplyReceiver<T> {
72
73 pub async fn recv(self) -> Result<T, AskError> {
74 self.0.await.map_err(|_| AskError::ResponseClosed)
75 }
76 }
77 }
78}