netidx 0.31.9

Secure, fast, pub/sub messaging
Documentation
use futures::channel::oneshot;
use parking_lot::Mutex;
use poolshark::global::{GPooled, Pool};
use std::sync::Arc;
use std::{clone::Clone, mem, ops::Drop, result};

#[derive(Debug)]
struct BatchChannelInner<T: Send + Sync + 'static> {
    send_closed: bool,
    recv_closed: bool,
    notify: Option<oneshot::Sender<()>>,
    queue: GPooled<Vec<T>>,
    pool: Pool<Vec<T>>,
}

#[derive(Debug)]
struct BatchSenderInner<T: Send + Sync + 'static>(Arc<Mutex<BatchChannelInner<T>>>);

impl<T: Send + Sync + 'static> Drop for BatchSenderInner<T> {
    fn drop(&mut self) {
        self.0.lock().send_closed = true;
    }
}

#[derive(Debug)]
pub(crate) struct BatchSender<T: Send + Sync + 'static>(Arc<BatchSenderInner<T>>);

impl<T: Send + Sync + 'static> Clone for BatchSender<T> {
    fn clone(&self) -> Self {
        BatchSender(Arc::clone(&self.0))
    }
}

impl<T: Send + Sync + 'static> BatchSender<T> {
    pub(crate) fn send(&self, m: T) -> bool {
        let mut inner = self.0 .0.lock();
        if inner.recv_closed {
            false
        } else {
            inner.queue.push(m);
            if let Some(sender) = inner.notify.take() {
                let _: result::Result<_, _> = sender.send(());
            }
            true
        }
    }
}

#[derive(Debug)]
pub(crate) struct BatchReceiver<T: Send + Sync + 'static>(
    Arc<Mutex<BatchChannelInner<T>>>,
);

impl<T: Send + Sync + 'static> Drop for BatchReceiver<T> {
    fn drop(&mut self) {
        self.close()
    }
}

impl<T: Send + Sync + 'static> BatchReceiver<T> {
    pub(crate) fn close(&self) {
        let mut inner = self.0.lock();
        inner.recv_closed = true;
        inner.queue.clear();
        inner.notify = None;
    }

    pub(crate) fn len(&self) -> usize {
        self.0.lock().queue.len()
    }

    pub(crate) async fn recv(&self) -> Option<GPooled<Vec<T>>> {
        loop {
            let receiver = {
                let mut inner = self.0.lock();
                if inner.queue.len() > 0 {
                    let v = inner.pool.take();
                    return Some(mem::replace(&mut inner.queue, v));
                } else if inner.send_closed {
                    return None;
                } else {
                    let (tx, rx) = oneshot::channel();
                    inner.notify = Some(tx);
                    rx
                }
            };
            let _: result::Result<_, _> = receiver.await;
        }
    }
}

pub(crate) fn channel<T: Send + Sync + 'static>() -> (BatchSender<T>, BatchReceiver<T>) {
    let pool = Pool::new(1, 100_000);
    let inner = Arc::new(Mutex::new(BatchChannelInner {
        send_closed: false,
        recv_closed: false,
        notify: None,
        queue: pool.take(),
        pool,
    }));
    let sender = BatchSender(Arc::new(BatchSenderInner(inner.clone())));
    let receiver = BatchReceiver(inner);
    (sender, receiver)
}