ruyi 0.1.6

An event-driven framework for non-blocking, asynchronous I/O in Rust
Documentation
use std::cell::Cell;
use std::fmt;
use std::mem;
use std::io;
use std::ptr;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use futures::stream::Stream;
use futures::{Poll, Async};

use super::err::{SendError, TrySendError, RecvError, TryRecvError};
use nio::{Awakener, Pollable, Poller, Ops, Token};
use stream::IntoStream;
use reactor::PollableIo;

struct Inner<T> {
    buf_ptr: *mut T,
    alloc_cap: usize,
    cap: usize,
    idx_mask: usize,
    _padding0: [usize; cache_line_pad!(4)],

    front: AtomicUsize,
    shadow_rear: Cell<usize>,
    _padding1: [usize; cache_line_pad!(1)],

    shadow_front: Cell<usize>,
    rear: AtomicUsize,
}

unsafe impl<T: Send> Send for Inner<T> {}
unsafe impl<T> Sync for Inner<T> {}

impl<T> Inner<T> {
    #[inline]
    fn with_capacity(capacity: usize) -> Self {
        let n = capacity.next_power_of_two();
        let mut buf = Vec::with_capacity(n);
        let buf_ptr = buf.as_mut_ptr();
        let alloc_cap = buf.capacity();
        mem::forget(buf);
        Inner {
            buf_ptr: buf_ptr,
            alloc_cap: alloc_cap,
            cap: capacity,
            idx_mask: n.wrapping_sub(1),
            _padding0: [0; cache_line_pad!(4)],

            front: AtomicUsize::new(0),
            shadow_rear: Cell::new(0),
            _padding1: [0; cache_line_pad!(1)],

            shadow_front: Cell::new(0),
            rear: AtomicUsize::new(0),
        }
    }

    #[inline]
    fn effective_index(&self, index: usize) -> usize {
        index & self.idx_mask
    }

    fn try_push(&self, t: T) -> Option<T> {
        let rear = self.rear.load(Ordering::Relaxed);
        if self.shadow_front.get() + self.cap <= rear {
            self.shadow_front.set(self.front.load(Ordering::Acquire));
            if self.shadow_front.get() + self.cap <= rear {
                return Some(t);
            }
        }
        let i = self.effective_index(rear) as isize;
        unsafe { ptr::write(self.buf_ptr.offset(i), t) };
        self.rear.store(rear.wrapping_add(1), Ordering::Release);
        None
    }

    fn try_pop(&self) -> Option<T> {
        let front = self.front.load(Ordering::Relaxed);
        if front == self.shadow_rear.get() {
            self.shadow_rear.set(self.rear.load(Ordering::Acquire));
            if front == self.shadow_rear.get() {
                return None;
            }
        }

        let i = self.effective_index(front) as isize;
        let v = unsafe { ptr::read(self.buf_ptr.offset(i)) };
        self.front.store(front.wrapping_add(1), Ordering::Release);
        Some(v)
    }
}

impl<T> fmt::Debug for Inner<T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "Channel {{ ")?;
        write!(
            f,
            "cap: {}, front: {:?}, shadow_rear: {}, shadow_front: {}, rear: {:?}",
            self.cap,
            self.front,
            self.shadow_rear.get(),
            self.shadow_front.get(),
            self.rear
        )?;
        write!(f, " }}")
    }
}

impl<T> Drop for Inner<T> {
    #[inline]
    fn drop(&mut self) {
        while let Some(t) = self.try_pop() {
            drop(t);
        }
        drop(unsafe {
            Vec::from_raw_parts(self.buf_ptr, 0, self.alloc_cap)
        });
    }
}

#[derive(Debug)]
struct SenderAwakener(Arc<Awakener>);

impl Drop for SenderAwakener {
    fn drop(&mut self) {
        if let Err(e) = (&self.0).wakeup() {
            if e.kind() != io::ErrorKind::WouldBlock {
                error!("Failed to wakeup, {:?}: {}", &self.0, e);
            }
        }
    }
}

#[derive(Debug)]
pub struct SyncSender<T> {
    inner: Arc<Inner<T>>,
    awakener: SenderAwakener,
}

impl<T> SyncSender<T> {
    pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
        if Arc::strong_count(&self.inner) == 1 {
            return Err(TrySendError::Disconnected(t));
        }
        if let Some(t) = (&self.inner).try_push(t) {
            return Err(TrySendError::Full(t));
        }
        (&self.awakener.0).wakeup()?;
        Ok(())
    }

    pub fn send(&self, t: T) -> Result<(), SendError<T>> {
        let mut msg = t;
        loop {
            if Arc::strong_count(&self.inner) == 1 {
                return Err(SendError::Disconnected(msg));
            }
            match (&self.inner).try_push(msg) {
                Some(t) => msg = t,
                None => break,
            }
        }
        (&self.awakener.0).wakeup()?;
        Ok(())
    }
}

struct ReceiverAwakener(Arc<Awakener>);

impl Drop for ReceiverAwakener {
    fn drop(&mut self) {
        if let Err(e) = (&self.0).reset() {
            if e.kind() != io::ErrorKind::WouldBlock {
                error!("Failed to reset, {:?}: {}", &self.0, e);
            }
        }
    }
}

pub struct Receiver<T> {
    inner: Arc<Inner<T>>,
    awakener: ReceiverAwakener,
}

pub struct Receiving<T> {
    io: PollableIo<Receiver<T>>,
    need_reset: bool,
}

impl<T> Receiver<T> {
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
        self.reset()?;
        if let Some(t) = self.try_pop() {
            Ok(t)
        } else if Arc::strong_count(&self.inner) == 1 {
            Err(TryRecvError::Disconnected)
        } else {
            Err(TryRecvError::Empty)
        }
    }

    pub fn recv(&self) -> Result<T, RecvError> {
        loop {
            if let Some(t) = self.try_pop() {
                (&self.awakener.0).reset()?;
                return Ok(t);
            }
            if Arc::strong_count(&self.inner) == 1 {
                return Err(RecvError::Disconnected);
            }
        }
    }

    #[inline]
    fn reset(&self) -> io::Result<()> {
        (&self.awakener.0).reset()
    }

    #[inline]
    fn try_pop(&self) -> Option<T> {
        (&self.inner).try_pop()
    }
}

impl<T> Pollable for Receiver<T> {
    #[inline]
    fn register(&self, poller: &Poller, interested_ops: Ops, token: Token) -> io::Result<()> {
        (&self.awakener.0).register(poller, interested_ops, token)
    }

    #[inline]
    fn reregister(&self, poller: &Poller, interested_ops: Ops, token: Token) -> io::Result<()> {
        (&self.awakener.0).reregister(poller, interested_ops, token)
    }

    #[inline]
    fn deregister(&self, poller: &Poller) -> io::Result<()> {
        (&self.awakener.0).deregister(poller)
    }
}

impl<T> IntoStream for Receiver<T> {
    type Stream = Receiving<T>;

    #[inline]
    fn into_stream(self) -> Self::Stream {
        Receiving {
            io: PollableIo::new(self),
            need_reset: true,
        }
    }
}

impl<T> fmt::Debug for Receiver<T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(
            f,
            "Receiver {{ inner: {:?}, awakener: {:?} }}",
            &self.inner,
            &self.awakener.0
        )
    }
}

impl<T> Stream for Receiving<T> {
    type Item = T;
    type Error = io::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        if self.need_reset {
            if let Err(e) = self.io.get_ref().reset() {
                if e.kind() == io::ErrorKind::WouldBlock {
                    self.io.need_read()?;
                    return Ok(Async::NotReady);
                }
                return Err(e);
            }
            self.need_reset = false;
        }
        if let Some(t) = self.io.get_ref().try_pop() {
            Ok(Async::Ready(Some(t)))
        } else if Arc::strong_count(&self.io.get_ref().inner) == 1 {
            self.need_reset = true;
            Ok(Async::Ready(None))
        } else {
            self.need_reset = true;
            self.io.need_read()?;
            Ok(Async::NotReady)
        }
    }
}

pub fn sync_channel<T>(capacity: usize) -> io::Result<(SyncSender<T>, Receiver<T>)> {
    let inner = Arc::new(Inner::with_capacity(capacity));
    let awakener = Arc::new(Awakener::new()?);
    let tx = SyncSender {
        inner: inner.clone(),
        awakener: SenderAwakener(awakener.clone()),
    };
    let rx = Receiver {
        inner: inner,
        awakener: ReceiverAwakener(awakener),
    };
    Ok((tx, rx))
}