crossmist/
blocking.rs

1//! Uni- and bidirectional channels between processes.
2//!
3//! # Channels
4//!
5//! Create and use a unidirectional channel:
6//!
7//! ```rust
8//! # use crossmist::{channel, Receiver, Sender};
9//! let (mut sender, mut receiver): (Sender<i32>, Receiver<i32>) = channel::<i32>()?;
10//! sender.send(&57)?;
11//! drop(sender);
12//! assert_eq!(receiver.recv()?, Some(57));
13//! assert_eq!(receiver.recv()?, None);
14//! # std::io::Result::Ok(())
15//! ```
16//!
17//! Create and use a bidirectional channel:
18//!
19//! ```rust
20//! # use crossmist::{duplex, Duplex};
21//! let (mut side1, mut side2) = duplex::<i32, (i32, i32)>()?;
22//! side1.send(&57)?;
23//! assert_eq!(side2.recv()?, Some(57));
24//! side2.send(&(1, 2))?;
25//! assert_eq!(side1.recv()?, Some((1, 2)));
26//! drop(side1);
27//! assert_eq!(side2.recv()?, None);
28//! # std::io::Result::Ok(())
29//! ```
30//!
31//! # Processes
32//!
33//! To start a child process, you use the `spawn` method generated by `#[func]`:
34//!
35//! ```ignore
36//! #[func]
37//! fn my_process() {
38//!     ...
39//! }
40//!
41//! let child = my_process.spawn()?;
42//! ```
43//!
44//! You can then kill the child, get its PID, or join it (i.e. wait till it returns and obtain the
45//! returned value).
46
47use crate::{
48    asynchronous,
49    handles::{AsRawHandle, RawHandle},
50    FnOnceObject, KillHandle, Object,
51};
52use std::future::Future;
53use std::io::Result;
54use std::pin::pin;
55use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
56
57fn block_on<F: Future>(f: F) -> F::Output {
58    // https://github.com/rust-lang/rust/issues/98286
59    const VTABLE: RawWakerVTable = RawWakerVTable::new(|_| RAW, |_| {}, |_| {}, |_| {});
60    const RAW: RawWaker = RawWaker::new(std::ptr::null(), &VTABLE);
61    let waker = unsafe { Waker::from_raw(RAW) };
62    let mut cx = Context::from_waker(&waker);
63    match pin!(f).poll(&mut cx) {
64        Poll::Ready(value) => value,
65        Poll::Pending => unreachable!(),
66    }
67}
68
69/// Synchronous implementation marker type.
70#[derive(Debug, Object)]
71pub struct Blocking(asynchronous::SyncStream);
72
73unsafe impl asynchronous::AsyncStream for Blocking {
74    fn try_new(stream: asynchronous::SyncStream) -> Result<Self> {
75        Ok(Self(stream))
76    }
77
78    fn as_raw_handle(&self) -> RawHandle {
79        self.0.as_raw_handle()
80    }
81
82    #[cfg(unix)]
83    const IS_BLOCKING: bool = true;
84
85    #[cfg(unix)]
86    async fn blocking_write<T>(&self, mut f: impl FnMut() -> Result<T> + Send) -> Result<T> {
87        f()
88    }
89    #[cfg(windows)]
90    async fn write(&mut self, buf: &[u8]) -> Result<()> {
91        use std::io::Write;
92        self.0.write_all(buf)
93    }
94
95    #[cfg(unix)]
96    async fn blocking_read<T>(&self, mut f: impl FnMut() -> Result<T> + Send) -> Result<T> {
97        f()
98    }
99    #[cfg(windows)]
100    async fn read(&mut self, buf: &mut [u8]) -> Result<()> {
101        use std::io::Read;
102        self.0.read_exact(buf)
103    }
104}
105
106/// The transmitting side of a unidirectional channel.
107///
108/// `T` is the type of the objects this side sends via the channel and the other side receives.
109#[derive(Debug, Object)]
110pub struct Sender<T: Object>(pub(crate) asynchronous::Sender<Blocking, T>);
111
112/// The receiving side of a unidirectional channel.
113///
114/// `T` is the type of the objects the other side sends via the channel and this side receives.
115#[derive(Debug, Object)]
116pub struct Receiver<T: Object>(pub(crate) asynchronous::Receiver<Blocking, T>);
117
118/// A side of a bidirectional channel.
119///
120/// `S` is the type of the objects this side sends via the channel and the other side receives, `R`
121/// is the type of the objects the other side sends via the channel and this side receives.
122#[derive(Debug, Object)]
123pub struct Duplex<S: Object, R: Object>(pub(crate) asynchronous::Duplex<Blocking, S, R>);
124
125/// Create a unidirectional channel.
126pub fn channel<T: Object>() -> Result<(Sender<T>, Receiver<T>)> {
127    let (tx, rx) = asynchronous::channel::<Blocking, T>()?;
128    Ok((Sender(tx), Receiver(rx)))
129}
130
131/// Create a bidirectional channel.
132pub fn duplex<A: Object, B: Object>() -> Result<(Duplex<A, B>, Duplex<B, A>)> {
133    let (tx, rx) = asynchronous::duplex::<Blocking, A, B>()?;
134    Ok((Duplex(tx), Duplex(rx)))
135}
136
137impl<T: Object> Sender<T> {
138    /// Send a value to the other side.
139    pub fn send(&mut self, value: &T) -> Result<()> {
140        block_on(self.0.send(value))
141    }
142}
143
144#[cfg(unix)]
145impl<T: Object> std::os::unix::io::AsRawFd for Sender<T> {
146    fn as_raw_fd(&self) -> RawHandle {
147        self.0.as_raw_handle()
148    }
149}
150#[cfg(windows)]
151impl<T: Object> std::os::windows::io::AsRawHandle for Sender<T> {
152    fn as_raw_handle(&self) -> std::os::windows::io::RawHandle {
153        std::os::windows::io::AsRawHandle::as_raw_handle(&self.0)
154    }
155}
156
157#[cfg(unix)]
158impl<T: Object> std::os::unix::io::IntoRawFd for Sender<T> {
159    fn into_raw_fd(self) -> RawHandle {
160        self.0.fd.0.into_raw_fd()
161    }
162}
163#[cfg(windows)]
164impl<T: Object> std::os::windows::io::IntoRawHandle for Sender<T> {
165    fn into_raw_handle(self) -> std::os::windows::io::RawHandle {
166        self.0.fd.0.into_raw_handle()
167    }
168}
169
170#[cfg(unix)]
171impl<T: Object> std::os::unix::io::FromRawFd for Sender<T> {
172    unsafe fn from_raw_fd(fd: RawHandle) -> Self {
173        Self(asynchronous::Sender::from_stream(Blocking(
174            asynchronous::SyncStream::from_raw_fd(fd),
175        )))
176    }
177}
178#[cfg(windows)]
179impl<T: Object> std::os::windows::io::FromRawHandle for Sender<T> {
180    unsafe fn from_raw_handle(fd: std::os::windows::io::RawHandle) -> Self {
181        Self(asynchronous::Sender::from_stream(Blocking(
182            asynchronous::SyncStream::from_raw_handle(fd),
183        )))
184    }
185}
186
187impl<T: Object> Receiver<T> {
188    /// Receive a value from the other side.
189    ///
190    /// Returns `Ok(None)` if the other side has dropped the channel.
191    pub fn recv(&mut self) -> Result<Option<T>> {
192        block_on(self.0.recv())
193    }
194}
195
196#[cfg(unix)]
197impl<T: Object> std::os::unix::io::AsRawFd for Receiver<T> {
198    fn as_raw_fd(&self) -> RawHandle {
199        self.0.as_raw_handle()
200    }
201}
202#[cfg(windows)]
203impl<T: Object> std::os::windows::io::AsRawHandle for Receiver<T> {
204    fn as_raw_handle(&self) -> std::os::windows::io::RawHandle {
205        std::os::windows::io::AsRawHandle::as_raw_handle(&self.0)
206    }
207}
208
209#[cfg(unix)]
210impl<T: Object> std::os::unix::io::IntoRawFd for Receiver<T> {
211    fn into_raw_fd(self) -> RawHandle {
212        self.0.fd.0.into_raw_fd()
213    }
214}
215#[cfg(windows)]
216impl<T: Object> std::os::windows::io::IntoRawHandle for Receiver<T> {
217    fn into_raw_handle(self) -> std::os::windows::io::RawHandle {
218        self.0.fd.0.into_raw_handle()
219    }
220}
221
222#[cfg(unix)]
223impl<T: Object> std::os::unix::io::FromRawFd for Receiver<T> {
224    unsafe fn from_raw_fd(fd: RawHandle) -> Self {
225        Self(asynchronous::Receiver::from_stream(Blocking(
226            asynchronous::SyncStream::from_raw_fd(fd),
227        )))
228    }
229}
230#[cfg(windows)]
231impl<T: Object> std::os::windows::io::FromRawHandle for Receiver<T> {
232    unsafe fn from_raw_handle(fd: std::os::windows::io::RawHandle) -> Self {
233        Self(asynchronous::Receiver::from_stream(Blocking(
234            asynchronous::SyncStream::from_raw_handle(fd),
235        )))
236    }
237}
238
239impl<S: Object, R: Object> Duplex<S, R> {
240    /// Send a value to the other side.
241    pub fn send(&mut self, value: &S) -> Result<()> {
242        block_on(self.0.send(value))
243    }
244
245    /// Receive a value from the other side.
246    ///
247    /// Returns `Ok(None)` if the other side has dropped the channel.
248    pub fn recv(&mut self) -> Result<Option<R>> {
249        block_on(self.0.recv())
250    }
251
252    /// Send a value from the other side and wait for a response immediately.
253    ///
254    /// If the other side closes the channel before responding, an error is returned.
255    pub fn request(&mut self, value: &S) -> Result<R> {
256        block_on(self.0.request(value))
257    }
258
259    pub fn into_sender(self) -> Sender<S> {
260        Sender(self.0.into_sender())
261    }
262
263    pub fn into_receiver(self) -> Receiver<R> {
264        Receiver(self.0.into_receiver())
265    }
266}
267
268#[cfg(unix)]
269impl<S: Object, R: Object> std::os::unix::io::AsRawFd for Duplex<S, R> {
270    fn as_raw_fd(&self) -> RawHandle {
271        self.0.as_raw_handle()
272    }
273}
274
275#[cfg(unix)]
276impl<S: Object, R: Object> std::os::unix::io::IntoRawFd for Duplex<S, R> {
277    fn into_raw_fd(self) -> RawHandle {
278        self.0.fd.0.into_raw_fd()
279    }
280}
281
282#[cfg(unix)]
283impl<S: Object, R: Object> std::os::unix::io::FromRawFd for Duplex<S, R> {
284    unsafe fn from_raw_fd(fd: RawHandle) -> Self {
285        Self(asynchronous::Duplex::from_stream(Blocking(
286            asynchronous::SyncStream::from_raw_fd(fd),
287        )))
288    }
289}
290
291/// The subprocess object created by calling `spawn` on a function annottated with `#[func]`.
292#[derive(Debug)]
293pub struct Child<T: Object>(asynchronous::Child<Blocking, T>);
294
295impl<T: Object> Child<T> {
296    /// Get a handle for process termination.
297    pub fn get_kill_handle(&self) -> KillHandle {
298        self.0.get_kill_handle()
299    }
300
301    /// Get ID of the process.
302    pub fn id(&self) -> asynchronous::ProcID {
303        self.0.id()
304    }
305
306    /// Wait for the process to finish and obtain the value it returns.
307    ///
308    /// An error is returned if the process panics or is terminated. An error is also delivered if
309    /// it exits via [`std::process::exit`] or alike instead of returning a value, unless the return
310    /// type is `()`. In that case, `Ok(())` is returned.
311    pub fn join(self) -> Result<T> {
312        block_on(self.0.join())
313    }
314}
315
316#[doc(hidden)]
317pub unsafe fn spawn<T: Object>(
318    entry: Box<dyn FnOnceObject<(RawHandle,), Output = i32>>,
319) -> Result<Child<T>> {
320    block_on(asynchronous::spawn::<Blocking, T>(entry)).map(Child)
321}