Skip to main content

reifydb_runtime/actor/
reply.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4#[cfg(reifydb_single_threaded)]
5use std::cell::RefCell;
6#[cfg(reifydb_single_threaded)]
7use std::rc::Rc;
8
9use cfg_if::cfg_if;
10#[cfg(not(reifydb_single_threaded))]
11use tokio::sync::oneshot;
12
13#[cfg(not(reifydb_single_threaded))]
14use super::mailbox::AskError;
15
16cfg_if! {
17	if #[cfg(reifydb_single_threaded)] {
18
19
		/// Sending half of the reply pair created by `reply_channel` when the
		/// `reifydb_single_threaded` cfg is active.
		///
		/// Wraps an `Rc` handle to a `RefCell<Option<T>>` slot; `reply_channel`
		/// hands the paired `ReplyReceiver` a handle to the same slot.
		pub struct Reply<T>(Rc<RefCell<Option<T>>>);
21
22
		/// Receiving half of the reply pair created by `reply_channel` when the
		/// `reifydb_single_threaded` cfg is active.
		///
		/// Wraps an `Rc` handle to the same `RefCell<Option<T>>` slot that the
		/// paired `Reply` writes into.
		pub struct ReplyReceiver<T>(Rc<RefCell<Option<T>>>);
24
		// SAFETY: DST and WASM are single-threaded. These types never cross thread
		// boundaries.
		//
		// Both wrappers hold an `Rc<RefCell<_>>`, which is neither `Send` nor `Sync`
		// on its own, so these impls are only justified because this branch is
		// compiled solely under `cfg(reifydb_single_threaded)`. Note that `T` is
		// left unbounded here.
		unsafe impl<T> Send for Reply<T> {}
		unsafe impl<T> Sync for Reply<T> {}
		unsafe impl<T> Send for ReplyReceiver<T> {}
		unsafe impl<T> Sync for ReplyReceiver<T> {}
31
32
33		pub fn reply_channel<T>() -> (Reply<T>, ReplyReceiver<T>) {
34			let slot = Rc::new(RefCell::new(None));
35			(Reply(Rc::clone(&slot)), ReplyReceiver(slot))
36		}
37
38		impl<T> Reply<T> {
39
40			pub fn send(self, value: T) {
41				*self.0.borrow_mut() = Some(value);
42			}
43		}
44
45		impl<T> ReplyReceiver<T> {
46
47			pub fn try_recv(&self) -> Option<T> {
48				self.0.borrow_mut().take()
49			}
50		}
51	} else {
52
		/// Sending half of the reply pair created by `reply_channel` when the
		/// `reifydb_single_threaded` cfg is not active.
		///
		/// Wraps the sender side of a `tokio::sync::oneshot` channel.
		pub struct Reply<T>(oneshot::Sender<T>);
54
55
		/// Receiving half of the reply pair created by `reply_channel` when the
		/// `reifydb_single_threaded` cfg is not active.
		///
		/// Wraps the receiver side of a `tokio::sync::oneshot` channel.
		pub struct ReplyReceiver<T>(oneshot::Receiver<T>);
57
58
59		pub fn reply_channel<T>() -> (Reply<T>, ReplyReceiver<T>) {
60			let (tx, rx) = oneshot::channel();
61			(Reply(tx), ReplyReceiver(rx))
62		}
63
64		impl<T> Reply<T> {
65
66			pub fn send(self, value: T) {
67				let _ = self.0.send(value);
68			}
69		}
70
71		impl<T> ReplyReceiver<T> {
72
73			pub async fn recv(self) -> Result<T, AskError> {
74				self.0.await.map_err(|_| AskError::ResponseClosed)
75			}
76		}
77	}
78}