1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
//! Uni- and bidirectional channels between processes.
//!
//! # Channels
//!
//! Create and use a unidirectional channel:
//!
//! ```rust
//! # use crossmist::{channel, Receiver, Sender};
//! let (mut sender, mut receiver): (Sender<i32>, Receiver<i32>) = channel::<i32>()?;
//! sender.send(&57)?;
//! drop(sender);
//! assert_eq!(receiver.recv()?, Some(57));
//! assert_eq!(receiver.recv()?, None);
//! # std::io::Result::Ok(())
//! ```
//!
//! Create and use a bidirectional channel:
//!
//! ```rust
//! # use crossmist::{duplex, Duplex};
//! let (mut side1, mut side2) = duplex::<i32, (i32, i32)>()?;
//! side1.send(&57)?;
//! assert_eq!(side2.recv()?, Some(57));
//! side2.send(&(1, 2))?;
//! assert_eq!(side1.recv()?, Some((1, 2)));
//! drop(side1);
//! assert_eq!(side2.recv()?, None);
//! # std::io::Result::Ok(())
//! ```
//!
//! # Processes
//!
//! To start a child process, you use the `spawn` method generated by `#[func]`:
//!
//! ```ignore
//! #[func]
//! fn my_process() {
//!     ...
//! }
//!
//! let child = my_process.spawn()?;
//! ```
//!
//! You can then kill the child, get its PID, or join it (i.e. wait till it returns and obtain the
//! returned value).

use crate::{
    asynchronous,
    handles::{AsRawHandle, RawHandle},
    FnOnceObject, KillHandle, Object,
};
use std::future::Future;
use std::io::Result;
use std::pin::pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Drive a future to completion with a single poll on the calling thread.
///
/// No executor is involved and the waker handed to the future does nothing. Within this module
/// the helper is only applied to futures built on top of [`Blocking`], whose I/O hooks complete
/// inline, so the very first poll is expected to be `Poll::Ready`.
///
/// # Panics
///
/// Panics if the future reports `Poll::Pending`: with a no-op waker nothing could ever resume
/// it, so this indicates a future that is not actually synchronous.
fn block_on<F: Future>(f: F) -> F::Output {
    // Hand-rolled no-op waker: `clone` returns the same constant raw waker, while `wake`,
    // `wake_by_ref` and `drop` do nothing.
    // https://github.com/rust-lang/rust/issues/98286
    const VTABLE: RawWakerVTable = RawWakerVTable::new(|_| RAW, |_| {}, |_| {}, |_| {});
    const RAW: RawWaker = RawWaker::new(std::ptr::null(), &VTABLE);
    // SAFETY: none of the vtable functions dereference the (null) data pointer or own any
    // resource, and all of them are stateless, so the `RawWaker` contract is upheld.
    let waker = unsafe { Waker::from_raw(RAW) };
    let mut cx = Context::from_waker(&waker);
    match pin!(f).poll(&mut cx) {
        Poll::Ready(value) => value,
        Poll::Pending => unreachable!(
            "future driven by `block_on` returned `Poll::Pending`, but blocking streams must \
             complete on the first poll"
        ),
    }
}

/// Synchronous implementation marker type.
///
/// Wraps an `asynchronous::SyncStream` and is plugged in as the stream parameter of the
/// `asynchronous` channel and process types; the blocking [`Sender`], [`Receiver`], [`Duplex`]
/// and [`Child`] wrappers in this module are all built that way.
#[derive(Object)]
pub struct Blocking(asynchronous::SyncStream);

unsafe impl asynchronous::AsyncStream for Blocking {
    fn try_new(stream: asynchronous::SyncStream) -> Result<Self> {
        Ok(Self(stream))
    }

    fn as_raw_handle(&self) -> RawHandle {
        self.0.as_raw_handle()
    }

    #[cfg(unix)]
    const IS_BLOCKING: bool = true;

    #[cfg(unix)]
    async fn blocking_write<T>(&self, mut f: impl FnMut() -> Result<T> + Send) -> Result<T> {
        f()
    }
    #[cfg(windows)]
    async fn write(&mut self, buf: &[u8]) -> Result<()> {
        use std::io::Write;
        self.0.write_all(buf)
    }

    #[cfg(unix)]
    async fn blocking_read<T>(&self, mut f: impl FnMut() -> Result<T> + Send) -> Result<T> {
        f()
    }
    #[cfg(windows)]
    async fn read(&mut self, buf: &mut [u8]) -> Result<()> {
        use std::io::Read;
        self.0.read_exact(buf)
    }
}

/// The transmitting side of a unidirectional channel.
///
/// `T` is the type of the objects this side sends via the channel and the other side receives.
///
/// A pair of connected ends is created by [`channel`]; values are transmitted with
/// [`Sender::send`]. Internally this is the asynchronous sender specialised to [`Blocking`].
#[derive(Object)]
pub struct Sender<T: Object>(pub(crate) asynchronous::Sender<Blocking, T>);

/// The receiving side of a unidirectional channel.
///
/// `T` is the type of the objects the other side sends via the channel and this side receives.
///
/// A pair of connected ends is created by [`channel`]; values are obtained with
/// [`Receiver::recv`]. Internally this is the asynchronous receiver specialised to [`Blocking`].
#[derive(Object)]
pub struct Receiver<T: Object>(pub(crate) asynchronous::Receiver<Blocking, T>);

/// A side of a bidirectional channel.
///
/// `S` is the type of the objects this side sends via the channel and the other side receives, `R`
/// is the type of the objects the other side sends via the channel and this side receives.
///
/// Both sides are created together by [`duplex`], with the two type parameters swapped between
/// them. Internally this is the asynchronous duplex specialised to [`Blocking`].
#[derive(Object)]
pub struct Duplex<S: Object, R: Object>(pub(crate) asynchronous::Duplex<Blocking, S, R>);

/// Create a unidirectional channel.
///
/// Returns the connected `(Sender, Receiver)` pair, or the I/O error reported while the
/// underlying asynchronous channel was being set up.
pub fn channel<T: Object>() -> Result<(Sender<T>, Receiver<T>)> {
    asynchronous::channel::<Blocking, T>()
        .map(|(sending_end, receiving_end)| (Sender(sending_end), Receiver(receiving_end)))
}

/// Create a bidirectional channel.
///
/// The first returned side sends `A` and receives `B`; the second one sends `B` and receives
/// `A`. An I/O error from setting up the underlying asynchronous duplex is passed through.
pub fn duplex<A: Object, B: Object>() -> Result<(Duplex<A, B>, Duplex<B, A>)> {
    asynchronous::duplex::<Blocking, A, B>()
        .map(|(first_side, second_side)| (Duplex(first_side), Duplex(second_side)))
}

impl<T: Object> Sender<T> {
    /// Transmit `value` to the receiving side.
    ///
    /// The value is only borrowed. The result of the inner asynchronous `send` is returned
    /// unchanged.
    pub fn send(&mut self, value: &T) -> Result<()> {
        let pending = self.0.send(value);
        block_on(pending)
    }
}

// Unix: expose the descriptor behind a sender without giving up ownership of it.
#[cfg(unix)]
impl<T: Object> std::os::unix::io::AsRawFd for Sender<T> {
    /// Return the raw file descriptor reported by the inner asynchronous sender.
    ///
    /// `self` is only borrowed, so the sender remains usable afterwards.
    fn as_raw_fd(&self) -> RawHandle {
        self.0.as_raw_handle()
    }
}
// Windows: expose the handle behind a sender without giving up ownership of it.
#[cfg(windows)]
impl<T: Object> std::os::windows::io::AsRawHandle for Sender<T> {
    /// Return the raw Windows handle reported by the inner asynchronous sender.
    fn as_raw_handle(&self) -> std::os::windows::io::RawHandle {
        // Fully qualified call on the std trait — presumably to avoid ambiguity with the
        // crate's own `AsRawHandle` trait imported at the top of this file.
        std::os::windows::io::AsRawHandle::as_raw_handle(&self.0)
    }
}

// Unix: turn a sender into the bare descriptor it wraps.
#[cfg(unix)]
impl<T: Object> std::os::unix::io::IntoRawFd for Sender<T> {
    /// Consume the sender and return the descriptor stored in its inner `fd` field.
    ///
    /// NOTE(review): presumably the caller becomes responsible for closing the descriptor, as
    /// is conventional for `IntoRawFd` — confirm against the handle type's `into_raw_fd`.
    fn into_raw_fd(self) -> RawHandle {
        self.0.fd.0.into_raw_fd()
    }
}
// Windows: turn a sender into the bare handle it wraps.
#[cfg(windows)]
impl<T: Object> std::os::windows::io::IntoRawHandle for Sender<T> {
    /// Consume the sender and return the handle stored in its inner `fd` field.
    ///
    /// NOTE(review): presumably the caller becomes responsible for closing the handle, as is
    /// conventional for `IntoRawHandle` — confirm against the handle type's `into_raw_handle`.
    fn into_raw_handle(self) -> std::os::windows::io::RawHandle {
        self.0.fd.0.into_raw_handle()
    }
}

#[cfg(unix)]
impl<T: Object> std::os::unix::io::FromRawFd for Sender<T> {
    /// Build a sender on top of an existing raw file descriptor.
    ///
    /// # Safety
    ///
    /// `fd` is handed directly to `SyncStream::from_raw_fd`, so it must satisfy whatever that
    /// function requires of a descriptor.
    unsafe fn from_raw_fd(fd: RawHandle) -> Self {
        let stream = asynchronous::SyncStream::from_raw_fd(fd);
        Self(asynchronous::Sender::from_stream(Blocking(stream)))
    }
}
#[cfg(windows)]
impl<T: Object> std::os::windows::io::FromRawHandle for Sender<T> {
    /// Build a sender on top of an existing raw Windows handle.
    ///
    /// # Safety
    ///
    /// `fd` is handed directly to `SyncStream::from_raw_handle`, so it must satisfy whatever
    /// that function requires of a handle.
    unsafe fn from_raw_handle(fd: std::os::windows::io::RawHandle) -> Self {
        let stream = asynchronous::SyncStream::from_raw_handle(fd);
        Self(asynchronous::Sender::from_stream(Blocking(stream)))
    }
}

impl<T: Object> Receiver<T> {
    /// Wait for the next value sent by the other side.
    ///
    /// Yields `Ok(None)` once the other side has dropped the channel.
    pub fn recv(&mut self) -> Result<Option<T>> {
        let pending = self.0.recv();
        block_on(pending)
    }
}

// Unix: expose the descriptor behind a receiver without giving up ownership of it.
#[cfg(unix)]
impl<T: Object> std::os::unix::io::AsRawFd for Receiver<T> {
    /// Return the raw file descriptor reported by the inner asynchronous receiver.
    ///
    /// `self` is only borrowed, so the receiver remains usable afterwards.
    fn as_raw_fd(&self) -> RawHandle {
        self.0.as_raw_handle()
    }
}
// Windows: expose the handle behind a receiver without giving up ownership of it.
#[cfg(windows)]
impl<T: Object> std::os::windows::io::AsRawHandle for Receiver<T> {
    /// Return the raw Windows handle reported by the inner asynchronous receiver.
    fn as_raw_handle(&self) -> std::os::windows::io::RawHandle {
        // Fully qualified call on the std trait — presumably to avoid ambiguity with the
        // crate's own `AsRawHandle` trait imported at the top of this file.
        std::os::windows::io::AsRawHandle::as_raw_handle(&self.0)
    }
}

// Unix: turn a receiver into the bare descriptor it wraps.
#[cfg(unix)]
impl<T: Object> std::os::unix::io::IntoRawFd for Receiver<T> {
    /// Consume the receiver and return the descriptor stored in its inner `fd` field.
    ///
    /// NOTE(review): presumably the caller becomes responsible for closing the descriptor, as
    /// is conventional for `IntoRawFd` — confirm against the handle type's `into_raw_fd`.
    fn into_raw_fd(self) -> RawHandle {
        self.0.fd.0.into_raw_fd()
    }
}
// Windows: turn a receiver into the bare handle it wraps.
#[cfg(windows)]
impl<T: Object> std::os::windows::io::IntoRawHandle for Receiver<T> {
    /// Consume the receiver and return the handle stored in its inner `fd` field.
    ///
    /// NOTE(review): presumably the caller becomes responsible for closing the handle, as is
    /// conventional for `IntoRawHandle` — confirm against the handle type's `into_raw_handle`.
    fn into_raw_handle(self) -> std::os::windows::io::RawHandle {
        self.0.fd.0.into_raw_handle()
    }
}

#[cfg(unix)]
impl<T: Object> std::os::unix::io::FromRawFd for Receiver<T> {
    /// Build a receiver on top of an existing raw file descriptor.
    ///
    /// # Safety
    ///
    /// `fd` is handed directly to `SyncStream::from_raw_fd`, so it must satisfy whatever that
    /// function requires of a descriptor.
    unsafe fn from_raw_fd(fd: RawHandle) -> Self {
        let stream = asynchronous::SyncStream::from_raw_fd(fd);
        Self(asynchronous::Receiver::from_stream(Blocking(stream)))
    }
}
#[cfg(windows)]
impl<T: Object> std::os::windows::io::FromRawHandle for Receiver<T> {
    /// Build a receiver on top of an existing raw Windows handle.
    ///
    /// # Safety
    ///
    /// `fd` is handed directly to `SyncStream::from_raw_handle`, so it must satisfy whatever
    /// that function requires of a handle.
    unsafe fn from_raw_handle(fd: std::os::windows::io::RawHandle) -> Self {
        let stream = asynchronous::SyncStream::from_raw_handle(fd);
        Self(asynchronous::Receiver::from_stream(Blocking(stream)))
    }
}

impl<S: Object, R: Object> Duplex<S, R> {
    /// Transmit `value` to the other side.
    pub fn send(&mut self, value: &S) -> Result<()> {
        let pending = self.0.send(value);
        block_on(pending)
    }

    /// Wait for the next value sent by the other side.
    ///
    /// Yields `Ok(None)` once the other side has dropped the channel.
    pub fn recv(&mut self) -> Result<Option<R>> {
        let pending = self.0.recv();
        block_on(pending)
    }

    /// Send a value to the other side and immediately wait for its response.
    ///
    /// If the other side closes the channel before responding, an error is returned.
    pub fn request(&mut self, value: &S) -> Result<R> {
        let pending = self.0.request(value);
        block_on(pending)
    }

    /// Consume this side and keep only its sending direction, as a [`Sender`].
    pub fn into_sender(self) -> Sender<S> {
        let Self(inner) = self;
        Sender(inner.into_sender())
    }

    /// Consume this side and keep only its receiving direction, as a [`Receiver`].
    pub fn into_receiver(self) -> Receiver<R> {
        let Self(inner) = self;
        Receiver(inner.into_receiver())
    }
}

// Unix: expose the descriptor behind a duplex side without giving up ownership of it.
#[cfg(unix)]
impl<S: Object, R: Object> std::os::unix::io::AsRawFd for Duplex<S, R> {
    /// Return the raw file descriptor reported by the inner asynchronous duplex.
    ///
    /// `self` is only borrowed, so this side remains usable afterwards.
    fn as_raw_fd(&self) -> RawHandle {
        self.0.as_raw_handle()
    }
}

// Unix: turn a duplex side into the bare descriptor it wraps.
#[cfg(unix)]
impl<S: Object, R: Object> std::os::unix::io::IntoRawFd for Duplex<S, R> {
    /// Consume this side and return the descriptor stored in its inner `fd` field.
    ///
    /// NOTE(review): presumably the caller becomes responsible for closing the descriptor, as
    /// is conventional for `IntoRawFd` — confirm against the handle type's `into_raw_fd`.
    fn into_raw_fd(self) -> RawHandle {
        self.0.fd.0.into_raw_fd()
    }
}

#[cfg(unix)]
impl<S: Object, R: Object> std::os::unix::io::FromRawFd for Duplex<S, R> {
    /// Build a duplex side on top of an existing raw file descriptor.
    ///
    /// # Safety
    ///
    /// `fd` is handed directly to `SyncStream::from_raw_fd`, so it must satisfy whatever that
    /// function requires of a descriptor.
    unsafe fn from_raw_fd(fd: RawHandle) -> Self {
        let stream = asynchronous::SyncStream::from_raw_fd(fd);
        Self(asynchronous::Duplex::from_stream(Blocking(stream)))
    }
}

/// The subprocess object created by calling `spawn` on a function annotated with `#[func]`.
///
/// `T` is the type of the value obtained from [`Child::join`]. Internally this wraps the
/// asynchronous child specialised to [`Blocking`].
pub struct Child<T: Object>(asynchronous::Child<Blocking, T>);

impl<T: Object> Child<T> {
    /// Obtain a handle that can be used to terminate the process.
    pub fn get_kill_handle(&self) -> KillHandle {
        let Self(inner) = self;
        inner.get_kill_handle()
    }

    /// Identifier of the process.
    pub fn id(&self) -> asynchronous::ProcID {
        let Self(inner) = self;
        inner.id()
    }

    /// Block until the process finishes and return the value it produced.
    ///
    /// An error is returned if the process panics or is terminated. An error is also returned
    /// if it exits via [`std::process::exit`] or alike instead of returning a value, unless the
    /// return type is `()`, in which case `Ok(())` is returned.
    pub fn join(self) -> Result<T> {
        let Self(inner) = self;
        block_on(inner.join())
    }
}

#[doc(hidden)]
pub unsafe fn spawn<T: Object>(
    entry: Box<dyn FnOnceObject<(RawHandle,), Output = i32>>,
) -> Result<Child<T>> {
    block_on(asynchronous::spawn::<Blocking, T>(entry)).map(Child)
}