ruchei-connection 0.0.0-a.3

Readiness queues used in the implementation of ruchei for managing multiple connections
Documentation
use std::{
    marker::PhantomPinned,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll, Wake, Waker},
};

use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender, unbounded};
use futures_util::{Stream, task::AtomicWaker};
use pin_project::pin_project;
use ruchei_collections::as_linked_slab::{AsLinkedSlab, SlabKey};

#[derive(Debug, Default)]
#[must_use]
struct SlabWaker {
    waker: AtomicWaker,
}

impl Wake for SlabWaker {
    fn wake(self: Arc<Self>) {
        self.wake_by_ref();
    }

    fn wake_by_ref(self: &Arc<Self>) {
        self.waker.wake();
    }
}

#[must_use]
#[pin_project]
#[derive(Debug)]
pub struct Ready {
    send: UnboundedSender<SlabKey>,
    #[pin]
    recv: UnboundedReceiver<SlabKey>,
    inner: Arc<SlabWaker>,
    waker: Waker,
    _pinned: PhantomPinned,
}

impl Default for Ready {
    fn default() -> Self {
        let (send, recv) = unbounded();
        let inner = Arc::<SlabWaker>::default();
        let waker = inner.clone().into();
        Self {
            send,
            recv,
            inner,
            waker,
            _pinned: PhantomPinned,
        }
    }
}

#[must_use]
#[derive(Debug, Default)]
pub struct ReadyWeak(Option<UnboundedSender<SlabKey>>);

impl Ready {
    pub fn downgrade(&self) -> ReadyWeak {
        ReadyWeak(Some(self.send.clone()))
    }

    pub fn wake(&self) {
        self.inner.wake_by_ref();
    }

    pub fn compact<const M: usize>(self: Pin<&mut Self>, slab: &mut impl AsLinkedSlab) {
        let mut this = self.project();
        while let Poll::Ready(Some(key)) = this
            .recv
            .as_mut()
            .poll_next(&mut Context::from_waker(this.waker))
        {
            slab.link_push_back::<M>(key);
        }
    }

    #[must_use]
    pub fn next<const M: usize>(
        self: Pin<&mut Self>,
        slab: &mut impl AsLinkedSlab,
    ) -> Option<SlabKey> {
        self.compact::<M>(slab);
        slab.link_pop_front::<M>()
    }

    pub fn register(&self, cx: &mut Context<'_>) {
        self.inner.waker.register(cx.waker());
    }
}

impl ReadyWeak {
    pub fn insert(&self, key: SlabKey) {
        if let Some(sender) = self.0.as_ref() {
            let _ = sender.unbounded_send(key);
        }
    }
}

impl Extend<SlabKey> for ReadyWeak {
    fn extend<T: IntoIterator<Item = SlabKey>>(&mut self, iter: T) {
        for key in iter {
            self.insert(key);
        }
    }
}

#[must_use]
#[derive(Debug)]
pub struct ConnectionWaker {
    waker: AtomicWaker,
    ready: ReadyWeak,
    key: SlabKey,
}

impl ConnectionWaker {
    #[must_use]
    pub fn new(key: SlabKey, ready: ReadyWeak) -> Arc<Self> {
        Arc::new(Self {
            waker: Default::default(),
            ready,
            key,
        })
    }

    pub fn wake(&self) {
        self.ready.insert(self.key);
        self.waker.wake();
    }
}

impl Wake for ConnectionWaker {
    fn wake(self: Arc<Self>) {
        (*self).wake();
    }

    fn wake_by_ref(self: &Arc<Self>) {
        (**self).wake();
    }
}

impl ConnectionWaker {
    pub fn poll<T>(
        self: &Arc<Self>,
        cx: &mut Context<'_>,
        f: impl FnOnce(&mut Context<'_>) -> T,
    ) -> T {
        self.waker.register(cx.waker());
        self.poll_detached(f)
    }

    pub fn poll_detached<T>(self: &Arc<Self>, f: impl FnOnce(&mut Context<'_>) -> T) -> T {
        f(&mut Context::from_waker(&Waker::from(self.clone())))
    }
}

#[must_use]
#[derive(Debug)]
pub struct ConnectionWaker2 {
    waker0: AtomicWaker,
    waker1: AtomicWaker,
    ready: ReadyWeak,
    key: SlabKey,
}

impl ConnectionWaker2 {
    #[must_use]
    pub fn new(key: SlabKey, ready: ReadyWeak) -> Arc<Self> {
        Arc::new(Self {
            waker0: Default::default(),
            waker1: Default::default(),
            ready,
            key,
        })
    }

    pub fn wake(&self) {
        self.ready.insert(self.key);
        self.waker0.wake();
        self.waker1.wake();
    }
}

impl Wake for ConnectionWaker2 {
    fn wake(self: Arc<Self>) {
        (*self).wake();
    }

    fn wake_by_ref(self: &Arc<Self>) {
        (**self).wake();
    }
}

impl ConnectionWaker2 {
    pub fn poll0<T>(
        self: &Arc<Self>,
        cx: &mut Context<'_>,
        f: impl FnOnce(&mut Context<'_>) -> T,
    ) -> T {
        self.waker0.register(cx.waker());
        self.poll_detached(f)
    }

    pub fn poll1<T>(
        self: &Arc<Self>,
        cx: &mut Context<'_>,
        f: impl FnOnce(&mut Context<'_>) -> T,
    ) -> T {
        self.waker1.register(cx.waker());
        self.poll_detached(f)
    }

    pub fn poll_detached<T>(self: &Arc<Self>, f: impl FnOnce(&mut Context<'_>) -> T) -> T {
        f(&mut Context::from_waker(&Waker::from(self.clone())))
    }
}

#[must_use]
#[derive(Debug)]
pub struct Connection<S> {
    pub stream: S,
    pub next: Arc<ConnectionWaker>,
    pub ready: Arc<ConnectionWaker>,
    pub flush: Arc<ConnectionWaker>,
    pub close: Arc<ConnectionWaker>,
}

#[must_use]
#[derive(Debug)]
pub struct Connection2<S> {
    pub stream: S,
    pub next: Arc<ConnectionWaker>,
    pub ready: Arc<ConnectionWaker2>,
    pub flush: Arc<ConnectionWaker2>,
    pub close: Arc<ConnectionWaker>,
}