ruchei 0.1.3-a.0

Utilities for working with many streams
Documentation
use std::{
    collections::{HashMap, VecDeque},
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};

use futures_util::{Future, TryStream, ready};
use pin_project::pin_project;
use route_sink::ReadyRoute;
use ruchei_collections::{
    as_linked_slab::{AsLinkedSlab, SlabKey},
    linked_slab::LinkedSlab,
};
use ruchei_connection::{ConnectionWaker, Ready};

use crate::{merge::pair_item::PairItem, route::Key};

const OP_WAKE_SEND: usize = 0;
const OP_IS_READYING: usize = 1;
const OP_IS_FLUSHING: usize = 2;
const OP_COUNT: usize = 3;

#[derive(Debug)]
struct Connection<K, T> {
    key: K,
    send: Arc<ConnectionWaker>,
    msgs: VecDeque<T>,
}

#[derive(Debug)]
#[pin_project]
#[must_use = "futures must be awaited"]
pub struct Echo<
    S,
    K = <<S as TryStream>::Ok as PairItem>::K,
    T = <<S as TryStream>::Ok as PairItem>::V,
> {
    #[pin]
    router: S,
    connections: LinkedSlab<Connection<K, T>, OP_COUNT>,
    map: HashMap<K, SlabKey>,
    #[pin]
    send: Ready,
}

impl<S: Default, K, T> Default for Echo<S, K, T> {
    fn default() -> Self {
        S::default().into()
    }
}

impl<K: Key, T, S> Echo<S, K, T> {
    fn remove(self: Pin<&mut Self>, ix: SlabKey) {
        let this = self.project();
        let key = this.connections.remove(ix).key;
        assert_eq!(this.map.remove(&key).expect("unknown key"), ix);
    }

    fn push(self: Pin<&mut Self>, key: K, msg: T) {
        let this = self.project();
        if let Some(&ix) = this.map.get(&key) {
            this.connections[ix].msgs.push_back(msg);
            if this.connections.link_pop_at::<OP_IS_FLUSHING>(ix) {
                assert!(this.connections.link_push_back::<OP_IS_READYING>(ix));
                this.send.downgrade().insert(ix);
            }
        } else {
            let ix = this.connections.vacant_key();
            let send = this.send.downgrade();
            let connection = Connection {
                key: key.clone(),
                send: ConnectionWaker::new(ix, send),
                msgs: [msg].into(),
            };
            this.connections.insert_at(ix, connection);
            this.map.insert(key, ix);
            assert!(this.connections.link_push_back::<OP_IS_READYING>(ix));
            this.send.downgrade().insert(ix);
        }
    }

    fn poll_connection<E>(
        mut self: Pin<&mut Self>,
        ix: SlabKey,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), E>>
    where
        S: ReadyRoute<K, T, Error = E>,
    {
        let mut this = self.as_mut().project();
        if let Some(connection) = this.connections.get_mut(ix) {
            if connection.msgs.is_empty() {
                ready!(connection.send.poll(cx, |cx| {
                    this.router.as_mut().poll_flush_route(&connection.key, cx)
                }))?;
                assert!(this.connections.link_pop_at::<OP_IS_FLUSHING>(ix));
                self.as_mut().remove(ix);
                this = self.as_mut().project();
            } else {
                ready!(connection.send.poll(cx, |cx| {
                    this.router.as_mut().poll_ready_route(&connection.key, cx)
                }))?;
                this.router.as_mut().start_send((
                    connection.key.clone(),
                    connection
                        .msgs
                        .pop_front()
                        .expect("no first item but not empty?"),
                ))?;
                let empty = connection.msgs.is_empty();
                assert!(this.connections.link_pop_at::<OP_IS_READYING>(ix));
                if empty {
                    assert!(this.connections.link_push_back::<OP_IS_FLUSHING>(ix));
                } else {
                    assert!(this.connections.link_push_back::<OP_IS_READYING>(ix));
                }
            }
        }
        if this.connections.contains(ix) {
            this.connections.link_push_back::<OP_WAKE_SEND>(ix);
            Poll::Pending
        } else {
            Poll::Ready(Ok(()))
        }
    }
}

impl<K: Key, T, E, S: TryStream<Ok = (K, T), Error = E> + ReadyRoute<K, T, Error = E>> Future
    for Echo<S, K, T>
{
    type Output = Result<(), E>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.as_mut().project();
        macro_rules! check {
            () => {
                assert_eq!(
                    this.connections.link_len::<OP_IS_FLUSHING>()
                        + this.connections.link_len::<OP_IS_READYING>(),
                    this.connections.len(),
                );
            };
        }
        check!();
        while let Poll::Ready(o) = this.router.as_mut().try_poll_next(cx)? {
            if let Some((key, msg)) = o {
                self.as_mut().push(key, msg);
                this = self.as_mut().project();
            } else {
                return Poll::Ready(Ok(()));
            }
        }
        check!();
        while let Some(ix) = this.send.as_mut().next::<OP_WAKE_SEND>(this.connections) {
            let _: Poll<()> = self.as_mut().poll_connection(ix, cx)?;
            this = self.as_mut().project();
        }
        check!();
        while !this.connections.link_empty::<OP_IS_READYING>() {
            ready!(this.router.as_mut().poll_ready(cx))?;
            let ix = this
                .connections
                .link_pop_front::<OP_IS_READYING>()
                .expect("must be non-empty");
            let connection = &mut this.connections[ix];
            this.router.as_mut().start_send((
                connection.key.clone(),
                connection
                    .msgs
                    .pop_front()
                    .expect("no first item but not empty?"),
            ))?;
            let empty = connection.msgs.is_empty();
            if empty {
                assert!(this.connections.link_push_back::<OP_IS_FLUSHING>(ix));
            } else {
                assert!(this.connections.link_push_back::<OP_IS_READYING>(ix));
            }
        }
        check!();
        if !this.connections.is_empty() {
            assert!(this.connections.link_empty::<OP_IS_READYING>());
            ready!(this.router.poll_flush(cx))?;
            this.connections.clear();
        }
        check!();
        Poll::Pending
    }
}

impl<S, K, T> From<S> for Echo<S, K, T> {
    fn from(router: S) -> Self {
        Echo {
            router,
            connections: Default::default(),
            map: Default::default(),
            send: Default::default(),
        }
    }
}

pub trait EchoRoute:
    Sized
    + TryStream<Ok = (Self::K, Self::T), Error = Self::E>
    + ReadyRoute<Self::K, Self::T, Error = Self::E>
{
    type K: Key;
    type T;
    type E;

    fn echo_route(self) -> Echo<Self> {
        self.into()
    }
}

impl<K: Key, T, E, S: TryStream<Ok = (K, T), Error = E> + ReadyRoute<K, T, Error = E>> EchoRoute
    for S
{
    type K = K;
    type T = T;
    type E = E;
}