Skip to main content

reifydb_runtime/actor/
reply.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Request-response primitive for actor messaging.
5//!
6//! Provides a cfg-gated `Reply<T>` / `ReplyReceiver<T>` pair:
7//! - **Native**: Wraps `tokio::sync::oneshot` — the handler `.await`s the receiver
8//! - **DST**: Wraps `Rc<RefCell<Option<T>>>` — the client reads after `run_until_idle()`
9
10#[cfg(reifydb_single_threaded)]
11use std::cell::RefCell;
12#[cfg(reifydb_single_threaded)]
13use std::rc::Rc;
14
15use cfg_if::cfg_if;
16#[cfg(not(reifydb_single_threaded))]
17use tokio::sync::oneshot;
18
19#[cfg(not(reifydb_single_threaded))]
20use super::mailbox::AskError;
21
22cfg_if! {
23	if #[cfg(reifydb_single_threaded)] {
24
25		/// Sender half of a reply channel (DST/single-threaded).
26		pub struct Reply<T>(Rc<RefCell<Option<T>>>);
27
28		/// Receiver half of a reply channel (DST/single-threaded).
29		pub struct ReplyReceiver<T>(Rc<RefCell<Option<T>>>);
30
31		// SAFETY: DST and WASM are single-threaded. These types never cross thread
32		// boundaries, but the actor trait requires `Send` on messages.
33		unsafe impl<T> Send for Reply<T> {}
34		unsafe impl<T> Sync for Reply<T> {}
35		unsafe impl<T> Send for ReplyReceiver<T> {}
36		unsafe impl<T> Sync for ReplyReceiver<T> {}
37
38		/// Create a linked reply channel pair.
39		pub fn reply_channel<T>() -> (Reply<T>, ReplyReceiver<T>) {
40			let slot = Rc::new(RefCell::new(None));
41			(Reply(Rc::clone(&slot)), ReplyReceiver(slot))
42		}
43
44		impl<T> Reply<T> {
45			/// Send a reply value. Consumes the sender.
46			pub fn send(self, value: T) {
47				*self.0.borrow_mut() = Some(value);
48			}
49		}
50
51		impl<T> ReplyReceiver<T> {
52			/// Try to take the reply value. Returns `None` if not yet sent.
53			pub fn try_recv(&self) -> Option<T> {
54				self.0.borrow_mut().take()
55			}
56		}
57	} else {
58		/// Sender half of a reply channel (native).
59		pub struct Reply<T>(oneshot::Sender<T>);
60
61		/// Receiver half of a reply channel (native).
62		pub struct ReplyReceiver<T>(oneshot::Receiver<T>);
63
64		/// Create a linked reply channel pair.
65		pub fn reply_channel<T>() -> (Reply<T>, ReplyReceiver<T>) {
66			let (tx, rx) = oneshot::channel();
67			(Reply(tx), ReplyReceiver(rx))
68		}
69
70		impl<T> Reply<T> {
71			/// Send a reply value. Consumes the sender.
72			pub fn send(self, value: T) {
73				let _ = self.0.send(value);
74			}
75		}
76
77		impl<T> ReplyReceiver<T> {
78			/// Await the reply value.
79			pub async fn recv(self) -> Result<T, AskError> {
80				self.0.await.map_err(|_| AskError::ResponseClosed)
81			}
82		}
83	}
84}