use crate::{
asynchronous,
handles::{AsRawHandle, RawHandle},
FnOnceObject, KillHandle, Object,
};
use std::future::Future;
use std::io::Result;
use std::pin::pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
fn block_on<F: Future>(f: F) -> F::Output {
const VTABLE: RawWakerVTable = RawWakerVTable::new(|_| RAW, |_| {}, |_| {}, |_| {});
const RAW: RawWaker = RawWaker::new(std::ptr::null(), &VTABLE);
let waker = unsafe { Waker::from_raw(RAW) };
let mut cx = Context::from_waker(&waker);
match pin!(f).poll(&mut cx) {
Poll::Ready(value) => value,
Poll::Pending => unreachable!(),
}
}
#[derive(Object)]
pub struct Blocking(asynchronous::SyncStream);
unsafe impl asynchronous::AsyncStream for Blocking {
fn try_new(stream: asynchronous::SyncStream) -> Result<Self> {
Ok(Self(stream))
}
fn as_raw_handle(&self) -> RawHandle {
self.0.as_raw_handle()
}
#[cfg(unix)]
const IS_BLOCKING: bool = true;
#[cfg(unix)]
async fn blocking_write<T>(&self, mut f: impl FnMut() -> Result<T> + Send) -> Result<T> {
f()
}
#[cfg(windows)]
async fn write(&mut self, buf: &[u8]) -> Result<()> {
use std::io::Write;
self.0.write_all(buf)
}
#[cfg(unix)]
async fn blocking_read<T>(&self, mut f: impl FnMut() -> Result<T> + Send) -> Result<T> {
f()
}
#[cfg(windows)]
async fn read(&mut self, buf: &mut [u8]) -> Result<()> {
use std::io::Read;
self.0.read_exact(buf)
}
}
#[derive(Object)]
pub struct Sender<T: Object>(pub(crate) asynchronous::Sender<Blocking, T>);
#[derive(Object)]
pub struct Receiver<T: Object>(pub(crate) asynchronous::Receiver<Blocking, T>);
#[derive(Object)]
pub struct Duplex<S: Object, R: Object>(pub(crate) asynchronous::Duplex<Blocking, S, R>);
pub fn channel<T: Object>() -> Result<(Sender<T>, Receiver<T>)> {
let (tx, rx) = asynchronous::channel::<Blocking, T>()?;
Ok((Sender(tx), Receiver(rx)))
}
pub fn duplex<A: Object, B: Object>() -> Result<(Duplex<A, B>, Duplex<B, A>)> {
let (tx, rx) = asynchronous::duplex::<Blocking, A, B>()?;
Ok((Duplex(tx), Duplex(rx)))
}
impl<T: Object> Sender<T> {
pub fn send(&mut self, value: &T) -> Result<()> {
block_on(self.0.send(value))
}
}
#[cfg(unix)]
impl<T: Object> std::os::unix::io::AsRawFd for Sender<T> {
fn as_raw_fd(&self) -> RawHandle {
self.0.as_raw_handle()
}
}
#[cfg(windows)]
impl<T: Object> std::os::windows::io::AsRawHandle for Sender<T> {
fn as_raw_handle(&self) -> std::os::windows::io::RawHandle {
std::os::windows::io::AsRawHandle::as_raw_handle(&self.0)
}
}
#[cfg(unix)]
impl<T: Object> std::os::unix::io::IntoRawFd for Sender<T> {
fn into_raw_fd(self) -> RawHandle {
self.0.fd.0.into_raw_fd()
}
}
#[cfg(windows)]
impl<T: Object> std::os::windows::io::IntoRawHandle for Sender<T> {
fn into_raw_handle(self) -> std::os::windows::io::RawHandle {
self.0.fd.0.into_raw_handle()
}
}
#[cfg(unix)]
impl<T: Object> std::os::unix::io::FromRawFd for Sender<T> {
unsafe fn from_raw_fd(fd: RawHandle) -> Self {
Self(asynchronous::Sender::from_stream(Blocking(
asynchronous::SyncStream::from_raw_fd(fd),
)))
}
}
#[cfg(windows)]
impl<T: Object> std::os::windows::io::FromRawHandle for Sender<T> {
unsafe fn from_raw_handle(fd: std::os::windows::io::RawHandle) -> Self {
Self(asynchronous::Sender::from_stream(Blocking(
asynchronous::SyncStream::from_raw_handle(fd),
)))
}
}
impl<T: Object> Receiver<T> {
pub fn recv(&mut self) -> Result<Option<T>> {
block_on(self.0.recv())
}
}
#[cfg(unix)]
impl<T: Object> std::os::unix::io::AsRawFd for Receiver<T> {
fn as_raw_fd(&self) -> RawHandle {
self.0.as_raw_handle()
}
}
#[cfg(windows)]
impl<T: Object> std::os::windows::io::AsRawHandle for Receiver<T> {
fn as_raw_handle(&self) -> std::os::windows::io::RawHandle {
std::os::windows::io::AsRawHandle::as_raw_handle(&self.0)
}
}
#[cfg(unix)]
impl<T: Object> std::os::unix::io::IntoRawFd for Receiver<T> {
fn into_raw_fd(self) -> RawHandle {
self.0.fd.0.into_raw_fd()
}
}
#[cfg(windows)]
impl<T: Object> std::os::windows::io::IntoRawHandle for Receiver<T> {
fn into_raw_handle(self) -> std::os::windows::io::RawHandle {
self.0.fd.0.into_raw_handle()
}
}
#[cfg(unix)]
impl<T: Object> std::os::unix::io::FromRawFd for Receiver<T> {
unsafe fn from_raw_fd(fd: RawHandle) -> Self {
Self(asynchronous::Receiver::from_stream(Blocking(
asynchronous::SyncStream::from_raw_fd(fd),
)))
}
}
#[cfg(windows)]
impl<T: Object> std::os::windows::io::FromRawHandle for Receiver<T> {
unsafe fn from_raw_handle(fd: std::os::windows::io::RawHandle) -> Self {
Self(asynchronous::Receiver::from_stream(Blocking(
asynchronous::SyncStream::from_raw_handle(fd),
)))
}
}
impl<S: Object, R: Object> Duplex<S, R> {
pub fn send(&mut self, value: &S) -> Result<()> {
block_on(self.0.send(value))
}
pub fn recv(&mut self) -> Result<Option<R>> {
block_on(self.0.recv())
}
pub fn request(&mut self, value: &S) -> Result<R> {
block_on(self.0.request(value))
}
pub fn into_sender(self) -> Sender<S> {
Sender(self.0.into_sender())
}
pub fn into_receiver(self) -> Receiver<R> {
Receiver(self.0.into_receiver())
}
}
#[cfg(unix)]
impl<S: Object, R: Object> std::os::unix::io::AsRawFd for Duplex<S, R> {
fn as_raw_fd(&self) -> RawHandle {
self.0.as_raw_handle()
}
}
#[cfg(unix)]
impl<S: Object, R: Object> std::os::unix::io::IntoRawFd for Duplex<S, R> {
fn into_raw_fd(self) -> RawHandle {
self.0.fd.0.into_raw_fd()
}
}
#[cfg(unix)]
impl<S: Object, R: Object> std::os::unix::io::FromRawFd for Duplex<S, R> {
unsafe fn from_raw_fd(fd: RawHandle) -> Self {
Self(asynchronous::Duplex::from_stream(Blocking(
asynchronous::SyncStream::from_raw_fd(fd),
)))
}
}
pub struct Child<T: Object>(asynchronous::Child<Blocking, T>);
impl<T: Object> Child<T> {
pub fn get_kill_handle(&self) -> KillHandle {
self.0.get_kill_handle()
}
pub fn id(&self) -> asynchronous::ProcID {
self.0.id()
}
pub fn join(self) -> Result<T> {
block_on(self.0.join())
}
}
#[doc(hidden)]
pub unsafe fn spawn<T: Object>(
entry: Box<dyn FnOnceObject<(RawHandle,), Output = i32>>,
) -> Result<Child<T>> {
block_on(asynchronous::spawn::<Blocking, T>(entry)).map(Child)
}