subduction_websocket 0.9.0

WebSocket transport layer for the Subduction sync protocol
Documentation
//! Tokio implementations for [`WebSocket`][crate::websocket::WebSocket]s.

use core::time::Duration;

use future_form::Sendable;
use futures::{
    FutureExt,
    future::BoxFuture,
    stream::{AbortHandle, Abortable},
};
use subduction_core::connection::manager::Spawn;

use subduction_core::timeout::{TimedOut, Timeout};

// The `client` submodule is only compiled when the `tokio_client_any`
// Cargo feature is enabled.
#[cfg(feature = "tokio_client_any")]
pub mod client;

// The `server` submodule is only compiled when the `tokio_server_any`
// Cargo feature is enabled.
#[cfg(feature = "tokio_server_any")]
pub mod server;

// `unified` is gated on the same `tokio_server_any` feature as `server`.
// NOTE(review): presumably a combined client/server endpoint — confirm
// against the module's own documentation.
#[cfg(feature = "tokio_server_any")]
pub mod unified;

/// Task spawner backed by the Tokio runtime.
///
/// This is a field-less marker type: it carries no state, so it is free to
/// copy, clone, and construct via [`Default`]. All behaviour lives in its
/// `Spawn<Sendable>` implementation.
#[derive(Clone, Copy, Debug, Default)]
pub struct TokioSpawn;

impl Spawn<Sendable> for TokioSpawn {
    /// Runs `fut` as a Tokio task and returns a handle that can cancel it.
    ///
    /// The future is wrapped in an [`Abortable`] before being handed to
    /// `tokio::spawn`, so calling `abort()` on the returned [`AbortHandle`]
    /// stops the wrapped future. The task's `JoinHandle` is dropped
    /// immediately, i.e. the task is detached and its outcome (completed or
    /// aborted) is not observable by the caller.
    ///
    /// NOTE(review): per Tokio's documentation, `tokio::spawn` must be called
    /// from within a Tokio runtime context — confirm callers guarantee this.
    fn spawn(&self, fut: BoxFuture<'static, ()>) -> AbortHandle {
        let (abort_handle, registration) = AbortHandle::new_pair();
        let cancellable = Abortable::new(fut, registration);

        // `Abortable` resolves to `Result<(), Aborted>`; both outcomes are
        // deliberately discarded so the spawned task yields `()`.
        tokio::spawn(cancellable.map(drop));

        abort_handle
    }
}

/// Timeout provider backed by Tokio's timer.
///
/// This is a field-less marker type: it carries no state, so it is free to
/// copy, clone, and construct via [`Default`]. All behaviour lives in its
/// `Timeout<Sendable>` implementation.
#[derive(Clone, Copy, Debug, Default)]
pub struct TimeoutTokio;

impl Timeout<Sendable> for TimeoutTokio {
    /// Bounds `fut` by `dur` using `tokio::time::timeout`.
    ///
    /// The returned future resolves to `Ok(value)` when `fut` finishes in
    /// time, and to `Err(TimedOut)` when Tokio reports that the deadline
    /// elapsed first. Tokio's own elapsed-error value is discarded in favour
    /// of the crate-level [`TimedOut`] marker.
    ///
    /// The Tokio timer is created inside the `async` block, so it is set up
    /// when the returned future is first polled rather than when this method
    /// is called.
    ///
    /// NOTE(review): per Tokio's documentation, the timer needs a runtime
    /// with the time driver enabled — confirm callers guarantee this.
    fn timeout<'a, T: 'a>(
        &'a self,
        dur: Duration,
        fut: BoxFuture<'a, T>,
    ) -> BoxFuture<'a, Result<T, TimedOut>> {
        let bounded = async move {
            tokio::time::timeout(dur, fut)
                .await
                .map_err(|_elapsed| TimedOut)
        };
        bounded.boxed()
    }
}