crossmist 1.1.2

Efficient and seamless cross-process communication, both synchronously and asynchronously
Documentation
//! Uni- and bidirectional channels between processes.
//!
//! # Channels
//!
//! Create and use a unidirectional channel:
//!
//! ```rust
//! # use crossmist::{channel, Receiver, Sender};
//! let (mut sender, mut receiver): (Sender<i32>, Receiver<i32>) = channel::<i32>()?;
//! sender.send(&57)?;
//! drop(sender);
//! assert_eq!(receiver.recv()?, Some(57));
//! assert_eq!(receiver.recv()?, None);
//! # std::io::Result::Ok(())
//! ```
//!
//! Create and use a bidirectional channel:
//!
//! ```rust
//! # use crossmist::{duplex, Duplex};
//! let (mut side1, mut side2) = duplex::<i32, (i32, i32)>()?;
//! side1.send(&57)?;
//! assert_eq!(side2.recv()?, Some(57));
//! side2.send(&(1, 2))?;
//! assert_eq!(side1.recv()?, Some((1, 2)));
//! drop(side1);
//! assert_eq!(side2.recv()?, None);
//! # std::io::Result::Ok(())
//! ```
//!
//! # Processes
//!
//! To start a child process, you use the `spawn` method generated by `#[func]`:
//!
//! ```ignore
//! #[func]
//! fn my_process() {
//!     ...
//! }
//!
//! let child = my_process.spawn()?;
//! ```
//!
//! You can then kill the child, get its PID, or join it (i.e. wait till it returns and obtain the
//! returned value).

use crate::{
    asynchronous,
    handles::{AsRawHandle, RawHandle},
    FnOnceObject, KillHandle, Object,
};
use std::future::Future;
use std::io::Result;
use std::pin::pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Run a future to completion by polling it exactly once.
///
/// The future is polled with a waker whose operations all do nothing, so it can never be woken
/// up again. It is therefore only suitable for futures that resolve on their first poll.
///
/// # Panics
///
/// Panics (via `unreachable!()`) if the first poll returns `Poll::Pending`.
fn block_on<F: Future>(f: F) -> F::Output {
    // Hand-rolled no-op waker, see https://github.com/rust-lang/rust/issues/98286
    const RAW: RawWaker = RawWaker::new(std::ptr::null(), &VTABLE);
    const VTABLE: RawWakerVTable = RawWakerVTable::new(|_| RAW, |_| {}, |_| {}, |_| {});

    // SAFETY: none of the vtable functions touches the (null) data pointer: `clone` hands back
    // the same constant `RawWaker`, while `wake`, `wake_by_ref` and `drop` do nothing at all.
    let noop_waker = unsafe { Waker::from_raw(RAW) };
    let mut context = Context::from_waker(&noop_waker);

    let pinned = pin!(f);
    let Poll::Ready(output) = pinned.poll(&mut context) else {
        unreachable!()
    };
    output
}

/// Synchronous implementation marker type.
///
/// Wraps an `asynchronous::SyncStream` and is plugged in as the stream type parameter of the
/// `asynchronous` channel and process types that this module's [`Sender`], [`Receiver`],
/// [`Duplex`] and [`Child`] are built on.
#[derive(Debug, Object)]
pub struct Blocking(asynchronous::SyncStream);

// Stream backend that performs every operation synchronously: none of the `async fn`s below
// contains an `.await`, so each of them finishes within a single poll.
// NOTE(review): the safety contract of `AsyncStream` is defined in the `asynchronous` module and
// is not visible from here -- confirm this impl upholds it.
unsafe impl asynchronous::AsyncStream for Blocking {
    // Wraps the stream as-is; this never returns an error.
    fn try_new(stream: asynchronous::SyncStream) -> Result<Self> {
        Ok(Self(stream))
    }

    // Exposes the raw handle of the wrapped stream.
    fn as_raw_handle(&self) -> RawHandle {
        self.0.as_raw_handle()
    }

    // Unix only: flags this backend as blocking.
    // NOTE(review): how the flag is consumed is decided by the `asynchronous` module -- confirm.
    #[cfg(unix)]
    const IS_BLOCKING: bool = true;

    // Unix only: invokes the write closure once, inline, and returns its result.
    #[cfg(unix)]
    async fn blocking_write<T>(&self, mut f: impl FnMut() -> Result<T> + Send) -> Result<T> {
        f()
    }
    // Windows only: writes the whole buffer to the wrapped stream before returning.
    #[cfg(windows)]
    async fn write(&mut self, buf: &[u8]) -> Result<()> {
        use std::io::Write;
        self.0.write_all(buf)
    }

    // Unix only: invokes the read closure once, inline, and returns its result.
    #[cfg(unix)]
    async fn blocking_read<T>(&self, mut f: impl FnMut() -> Result<T> + Send) -> Result<T> {
        f()
    }
    // Windows only: fills the whole buffer from the wrapped stream before returning.
    #[cfg(windows)]
    async fn read(&mut self, buf: &mut [u8]) -> Result<()> {
        use std::io::Read;
        self.0.read_exact(buf)
    }
}

/// The transmitting side of a unidirectional channel.
///
/// `T` is the type of the objects this side sends via the channel and the other side receives.
///
/// A `Sender` is obtained from [`channel`] or from [`Duplex::into_sender`]; values are
/// transmitted with [`Sender::send`].
#[derive(Debug, Object)]
pub struct Sender<T: Object>(pub(crate) asynchronous::Sender<Blocking, T>);

/// The receiving side of a unidirectional channel.
///
/// `T` is the type of the objects the other side sends via the channel and this side receives.
///
/// A `Receiver` is obtained from [`channel`] or from [`Duplex::into_receiver`]; values are
/// obtained with [`Receiver::recv`].
#[derive(Debug, Object)]
pub struct Receiver<T: Object>(pub(crate) asynchronous::Receiver<Blocking, T>);

/// A side of a bidirectional channel.
///
/// `S` is the type of the objects this side sends via the channel and the other side receives, `R`
/// is the type of the objects the other side sends via the channel and this side receives.
///
/// Both sides are created together by [`duplex`]; the two sides have their `S` and `R` type
/// parameters swapped.
#[derive(Debug, Object)]
pub struct Duplex<S: Object, R: Object>(pub(crate) asynchronous::Duplex<Blocking, S, R>);

/// Create a unidirectional channel.
///
/// On success, the sending half and the receiving half are returned, in that order. An error
/// reported while the underlying channel is being set up is passed on to the caller.
pub fn channel<T: Object>() -> Result<(Sender<T>, Receiver<T>)> {
    let halves = asynchronous::channel::<Blocking, T>()?;
    Ok((Sender(halves.0), Receiver(halves.1)))
}

/// Create a bidirectional channel.
///
/// On success, both sides are returned: the first one sends `A` and receives `B`, the second one
/// sends `B` and receives `A`. An error reported while the underlying channel is being set up is
/// passed on to the caller.
pub fn duplex<A: Object, B: Object>() -> Result<(Duplex<A, B>, Duplex<B, A>)> {
    let sides = asynchronous::duplex::<Blocking, A, B>()?;
    Ok((Duplex(sides.0), Duplex(sides.1)))
}

impl<T: Object> Sender<T> {
    /// Send a value to the other side.
    ///
    /// The call returns once the underlying send operation has finished; its result is returned
    /// as-is.
    pub fn send(&mut self, value: &T) -> Result<()> {
        let pending = self.0.send(value);
        block_on(pending)
    }
}

// Borrowing access to the OS-level handle behind a `Sender`; ownership stays with the sender.
#[cfg(unix)]
impl<T: Object> std::os::unix::io::AsRawFd for Sender<T> {
    fn as_raw_fd(&self) -> RawHandle {
        let inner = &self.0;
        inner.as_raw_handle()
    }
}
#[cfg(windows)]
impl<T: Object> std::os::windows::io::AsRawHandle for Sender<T> {
    fn as_raw_handle(&self) -> std::os::windows::io::RawHandle {
        // Fully qualified so that the standard library trait is the one being called.
        <asynchronous::Sender<Blocking, T> as std::os::windows::io::AsRawHandle>::as_raw_handle(
            &self.0,
        )
    }
}

// Consuming conversion of a `Sender` into the OS-level handle of its underlying stream.
#[cfg(unix)]
impl<T: Object> std::os::unix::io::IntoRawFd for Sender<T> {
    fn into_raw_fd(self) -> RawHandle {
        // Move the synchronous stream out of the wrappers and release its descriptor.
        let stream = self.0.fd.0;
        stream.into_raw_fd()
    }
}
#[cfg(windows)]
impl<T: Object> std::os::windows::io::IntoRawHandle for Sender<T> {
    fn into_raw_handle(self) -> std::os::windows::io::RawHandle {
        // Move the synchronous stream out of the wrappers and release its handle.
        let stream = self.0.fd.0;
        stream.into_raw_handle()
    }
}

// Construction of a `Sender` from an OS-level handle.
//
// Safety: callers must uphold the contract of the standard `FromRawFd` / `FromRawHandle` traits.
// NOTE(review): presumably the handle must also refer to a stream usable by
// `asynchronous::Sender` -- confirm against the `asynchronous` module.
#[cfg(unix)]
impl<T: Object> std::os::unix::io::FromRawFd for Sender<T> {
    unsafe fn from_raw_fd(fd: RawHandle) -> Self {
        let stream = Blocking(asynchronous::SyncStream::from_raw_fd(fd));
        Self(asynchronous::Sender::from_stream(stream))
    }
}
#[cfg(windows)]
impl<T: Object> std::os::windows::io::FromRawHandle for Sender<T> {
    unsafe fn from_raw_handle(fd: std::os::windows::io::RawHandle) -> Self {
        let stream = Blocking(asynchronous::SyncStream::from_raw_handle(fd));
        Self(asynchronous::Sender::from_stream(stream))
    }
}

impl<T: Object> Receiver<T> {
    /// Receive a value from the other side.
    ///
    /// `Ok(None)` is returned if the other side has dropped the channel.
    pub fn recv(&mut self) -> Result<Option<T>> {
        let pending = self.0.recv();
        block_on(pending)
    }
}

// Borrowing access to the OS-level handle behind a `Receiver`; ownership stays with the receiver.
#[cfg(unix)]
impl<T: Object> std::os::unix::io::AsRawFd for Receiver<T> {
    fn as_raw_fd(&self) -> RawHandle {
        let inner = &self.0;
        inner.as_raw_handle()
    }
}
#[cfg(windows)]
impl<T: Object> std::os::windows::io::AsRawHandle for Receiver<T> {
    fn as_raw_handle(&self) -> std::os::windows::io::RawHandle {
        // Fully qualified so that the standard library trait is the one being called.
        <asynchronous::Receiver<Blocking, T> as std::os::windows::io::AsRawHandle>::as_raw_handle(
            &self.0,
        )
    }
}

// Consuming conversion of a `Receiver` into the OS-level handle of its underlying stream.
#[cfg(unix)]
impl<T: Object> std::os::unix::io::IntoRawFd for Receiver<T> {
    fn into_raw_fd(self) -> RawHandle {
        // Move the synchronous stream out of the wrappers and release its descriptor.
        let stream = self.0.fd.0;
        stream.into_raw_fd()
    }
}
#[cfg(windows)]
impl<T: Object> std::os::windows::io::IntoRawHandle for Receiver<T> {
    fn into_raw_handle(self) -> std::os::windows::io::RawHandle {
        // Move the synchronous stream out of the wrappers and release its handle.
        let stream = self.0.fd.0;
        stream.into_raw_handle()
    }
}

// Construction of a `Receiver` from an OS-level handle.
//
// Safety: callers must uphold the contract of the standard `FromRawFd` / `FromRawHandle` traits.
// NOTE(review): presumably the handle must also refer to a stream usable by
// `asynchronous::Receiver` -- confirm against the `asynchronous` module.
#[cfg(unix)]
impl<T: Object> std::os::unix::io::FromRawFd for Receiver<T> {
    unsafe fn from_raw_fd(fd: RawHandle) -> Self {
        let stream = Blocking(asynchronous::SyncStream::from_raw_fd(fd));
        Self(asynchronous::Receiver::from_stream(stream))
    }
}
#[cfg(windows)]
impl<T: Object> std::os::windows::io::FromRawHandle for Receiver<T> {
    unsafe fn from_raw_handle(fd: std::os::windows::io::RawHandle) -> Self {
        let stream = Blocking(asynchronous::SyncStream::from_raw_handle(fd));
        Self(asynchronous::Receiver::from_stream(stream))
    }
}

impl<S: Object, R: Object> Duplex<S, R> {
    /// Send a value to the other side.
    pub fn send(&mut self, value: &S) -> Result<()> {
        let pending = self.0.send(value);
        block_on(pending)
    }

    /// Receive a value from the other side.
    ///
    /// `Ok(None)` is returned if the other side has dropped the channel.
    pub fn recv(&mut self) -> Result<Option<R>> {
        let pending = self.0.recv();
        block_on(pending)
    }

    /// Send a value to the other side and wait for a response immediately.
    ///
    /// If the other side closes the channel before responding, an error is returned.
    pub fn request(&mut self, value: &S) -> Result<R> {
        let exchange = self.0.request(value);
        block_on(exchange)
    }

    /// Convert this side of the channel into a [`Sender`], consuming it.
    pub fn into_sender(self) -> Sender<S> {
        let sending_half = self.0.into_sender();
        Sender(sending_half)
    }

    /// Convert this side of the channel into a [`Receiver`], consuming it.
    pub fn into_receiver(self) -> Receiver<R> {
        let receiving_half = self.0.into_receiver();
        Receiver(receiving_half)
    }
}

// Borrowing access to the file descriptor behind a `Duplex`; ownership stays with the duplex.
#[cfg(unix)]
impl<S: Object, R: Object> std::os::unix::io::AsRawFd for Duplex<S, R> {
    fn as_raw_fd(&self) -> RawHandle {
        let inner = &self.0;
        inner.as_raw_handle()
    }
}

// Consuming conversion of a `Duplex` into the file descriptor of its underlying stream.
#[cfg(unix)]
impl<S: Object, R: Object> std::os::unix::io::IntoRawFd for Duplex<S, R> {
    fn into_raw_fd(self) -> RawHandle {
        // Move the synchronous stream out of the wrappers and release its descriptor.
        let stream = self.0.fd.0;
        stream.into_raw_fd()
    }
}

// Construction of a `Duplex` from a file descriptor.
//
// Safety: callers must uphold the contract of the standard `FromRawFd` trait.
// NOTE(review): presumably the descriptor must also refer to a stream usable by
// `asynchronous::Duplex` -- confirm against the `asynchronous` module.
#[cfg(unix)]
impl<S: Object, R: Object> std::os::unix::io::FromRawFd for Duplex<S, R> {
    unsafe fn from_raw_fd(fd: RawHandle) -> Self {
        let stream = Blocking(asynchronous::SyncStream::from_raw_fd(fd));
        Self(asynchronous::Duplex::from_stream(stream))
    }
}

/// The subprocess object created by calling `spawn` on a function annotated with `#[func]`.
///
/// `T` is the type of the value obtained from [`Child::join`]. The process can also be
/// identified with [`Child::id`] and terminated through the handle returned by
/// [`Child::get_kill_handle`].
#[derive(Debug)]
pub struct Child<T: Object>(asynchronous::Child<Blocking, T>);

impl<T: Object> Child<T> {
    /// Get a handle for process termination.
    pub fn get_kill_handle(&self) -> KillHandle {
        let process = &self.0;
        process.get_kill_handle()
    }

    /// Get ID of the process.
    pub fn id(&self) -> asynchronous::ProcID {
        let process = &self.0;
        process.id()
    }

    /// Wait for the process to finish and obtain the value it returns.
    ///
    /// An error is returned if the process panics or is terminated. An error is also returned if
    /// the process exits via [`std::process::exit`] or alike instead of returning a value, unless
    /// the return type is `()`, in which case `Ok(())` is returned.
    pub fn join(self) -> Result<T> {
        let completion = self.0.join();
        block_on(completion)
    }
}

#[doc(hidden)]
pub unsafe fn spawn<T: Object>(
    entry: Box<dyn FnOnceObject<(RawHandle,), Output = i32>>,
) -> Result<Child<T>> {
    block_on(asynchronous::spawn::<Blocking, T>(entry)).map(Child)
}