rustzmq2 0.1.0

A native async Rust implementation of ZeroMQ
Documentation
use crate::error::JoinError;

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

// smol's `Task<T>` cancels its future when dropped. tokio's `JoinHandle`
// does the opposite: dropping the handle detaches the task. We want
// tokio-equivalent semantics so the rest of the codebase can spawn
// fire-and-forget without thinking about runtime differences. The wrapper
// detaches the inner `Task` on drop unless the handle was awaited to
// completion.
pub struct JoinHandle<T>(Option<smol::Task<T>>);

impl<T> Future for JoinHandle<T> {
    type Output = Result<T, JoinError>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let task = self.0.as_mut().expect("JoinHandle polled after completion");
        match Pin::new(task).poll(cx) {
            Poll::Ready(v) => {
                self.0 = None;
                Poll::Ready(Ok(v))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

impl<T> Drop for JoinHandle<T> {
    fn drop(&mut self) {
        if let Some(task) = self.0.take() {
            task.detach();
        }
    }
}

impl<T> From<smol::Task<T>> for JoinHandle<T> {
    fn from(h: smol::Task<T>) -> Self {
        Self(Some(h))
    }
}

#[track_caller]
pub fn spawn<T>(task: T) -> JoinHandle<T::Output>
where
    T: Future + Send + 'static,
    T::Output: Send + 'static,
{
    smol::spawn(task).into()
}

pub async fn sleep(duration: std::time::Duration) {
    async_io::Timer::after(duration).await;
}

pub async fn timeout<F, T>(
    duration: std::time::Duration,
    f: F,
) -> std::result::Result<T, Box<dyn std::error::Error>>
where
    F: Future<Output = T>,
{
    futures::select! {
        result = futures::FutureExt::fuse(f) => Ok(result),
        _ = futures::FutureExt::fuse(async_io::Timer::after(duration)) => {
            Err(Box::new(std::io::Error::new(
                std::io::ErrorKind::TimedOut,
                "operation timed out",
            )) as Box<dyn std::error::Error>)
        }
    }
}