use std::sync::{Condvar, Mutex, MutexGuard};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{mem, sync};
// SAFETY: `Queue` is a handle to an `Arc<InnerQueue>`; elements of type `T`
// and the producer state `S` cross thread boundaries through it (pushed on
// one thread, popped or mutated under the back lock on another), so both
// must themselves be `Send`. The original unconditional impls were unsound:
// they allowed e.g. `Queue<Rc<_>, _>` to be shared across threads. All raw
// pointer access to the ring is serialized by `front_lock`/`back_lock`.
unsafe impl<T: Send, S: Send> Send for Queue<T, S> {}
unsafe impl<T: Send, S: Send> Sync for Queue<T, S> {}
/// Shared state of a bounded ring-buffer queue.
///
/// The buffer is a leaked `Vec<Option<T>>` (see `with_capacity` / `Drop`);
/// producers and consumers each take their own mutex, and `size` is the
/// atomically maintained count of occupied slots that coordinates the two
/// ends.
struct InnerQueue<T, S> {
    /// Total number of slots in `data`; fixed at construction.
    capacity: usize,
    /// Heap buffer of `capacity` `Option<T>` slots, all initialized.
    data: *mut Option<T>,
    /// Number of occupied slots; written by both producers and consumers.
    size: AtomicUsize,
    /// Serializes producers; carries the write offset plus user state `S`.
    back_lock: Mutex<BackGuardInner<S>>,
    /// Serializes consumers; carries the read offset.
    front_lock: Mutex<FrontGuardInner>,
    /// Signaled (by callers, via `notify_not_empty`) when elements arrive.
    not_empty: Condvar,
}
impl<T, S> Drop for InnerQueue<T, S> {
    /// Rebuild the leaked backing `Vec` so that both the allocation and any
    /// elements still stored in the ring are released.
    fn drop(&mut self) {
        // SAFETY: `data` came from a `Vec<Option<T>>` created with
        // `Vec::with_capacity(self.capacity)` whose `capacity` slots were all
        // pushed (initialized) before the Vec was `mem::forget`-ten in
        // `with_capacity`. Reconstructing with len == capacity is therefore
        // valid and lets Vec's drop glue run the destructor of every slot.
        //
        // The previous code passed `self.size` as the length, which only
        // dropped the first `size` slots — live elements at ring indices
        // >= size (and the `None`s there) were never dropped, leaking any
        // remaining `T`s.
        unsafe {
            let data = Vec::from_raw_parts(self.data, self.capacity, self.capacity);
            drop(data);
        }
    }
}
/// Errors surfaced by queue operations.
#[derive(Debug, Clone, Copy)]
pub enum Error<T> {
    /// The queue was at capacity; the rejected element is handed back to the
    /// caller inside the variant so it is not lost.
    Full(T),
}
/// State protected by the consumer-side (`front`) mutex.
#[derive(Debug, Clone, Copy)]
pub struct FrontGuardInner {
    /// Ring index of the next slot to pop; wraps modulo capacity.
    offset: isize,
}
/// State protected by the producer-side (`back`) mutex.
#[derive(Debug)]
pub struct BackGuardInner<S> {
    /// Ring index of the next slot to fill; wraps modulo capacity.
    offset: isize,
    /// Caller-supplied state that rides along under the back lock.
    pub inner: S,
}
impl<T, S> InnerQueue<T, S>
where
    S: ::std::default::Default,
{
    /// Create a queue that can hold exactly `capacity` elements.
    ///
    /// The backing store is a `Vec<Option<T>>` with every slot initialized
    /// to `None`; it is leaked here and reconstructed in `Drop` to free it.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is zero.
    pub fn with_capacity(capacity: usize) -> InnerQueue<T, S> {
        assert!(capacity > 0);
        let mut data: Vec<Option<T>> = Vec::with_capacity(capacity);
        for _ in 0..capacity {
            data.push(None);
        }
        // `(&mut data).as_mut_ptr()` was a needless reborrow; call directly.
        let raw_data = data.as_mut_ptr();
        // Leak the Vec: `Drop for InnerQueue` rebuilds it to release memory.
        mem::forget(data);
        InnerQueue {
            capacity,
            data: raw_data,
            size: AtomicUsize::new(0),
            back_lock: Mutex::new(BackGuardInner {
                offset: 0,
                inner: S::default(),
            }),
            front_lock: Mutex::new(FrontGuardInner { offset: 0 }),
            not_empty: Condvar::new(),
        }
    }

    /// Maximum number of elements the queue can hold.
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Current number of stored elements. A racy snapshot: it may be stale
    /// by the time the caller inspects it.
    pub fn size(&self) -> usize {
        self.size.load(Ordering::Relaxed)
    }

    /// Acquire the producer-side lock.
    ///
    /// # Panics
    ///
    /// Panics if the back mutex is poisoned.
    pub fn lock_back(&self) -> MutexGuard<BackGuardInner<S>> {
        self.back_lock.lock().expect("back lock poisoned")
    }

    /// Acquire the consumer-side lock.
    ///
    /// # Panics
    ///
    /// Panics if the front mutex is poisoned.
    pub fn lock_front(&self) -> MutexGuard<FrontGuardInner> {
        self.front_lock.lock().expect("front lock poisoned")
    }

    /// Store `elem` in the back slot, or return `Error::Full(elem)` if the
    /// queue is at capacity.
    ///
    /// Returns `Ok(true)` when the queue was empty before this push — in
    /// that case the caller is responsible for waking blocked dequeuers
    /// (see `Queue::notify_not_empty`).
    ///
    /// # Safety
    ///
    /// `guard` must be the guard obtained from *this* queue's back lock; it
    /// is what makes the raw write to the back slot exclusive.
    pub unsafe fn push_back(
        &self,
        elem: T,
        guard: &mut MutexGuard<BackGuardInner<S>>,
    ) -> Result<bool, Error<T>> {
        if self.size.load(Ordering::Acquire) == self.capacity {
            return Err(Error::Full(elem));
        }
        // Invariant: the back slot must have been vacated by a consumer.
        assert!((*self.data.offset((*guard).offset)).is_none());
        *self.data.offset((*guard).offset) = Some(elem);
        (*guard).offset += 1;
        (*guard).offset %= self.capacity as isize;
        // fetch_add returns the PREVIOUS size; 0 means we just made the
        // queue non-empty and dequeuers may be parked on the condvar.
        let must_wake_dequeuers = self.size.fetch_add(1, Ordering::Release) == 0;
        Ok(must_wake_dequeuers)
    }

    /// Block until an element is available, then remove and return the
    /// front element.
    ///
    /// Takes the front lock internally and waits on `not_empty` while the
    /// queue is empty.
    ///
    /// # Safety
    ///
    /// The queue's slot invariants must hold (every producer goes through
    /// `push_back` under the back lock); the front-slot read here is only
    /// exclusive because the front lock is held.
    ///
    /// # Panics
    ///
    /// Panics if the front mutex is poisoned or the condvar wait fails.
    pub unsafe fn pop_front(&self) -> T {
        let mut guard = self.front_lock.lock().expect("front lock poisoned");
        while self.size.load(Ordering::Acquire) == 0 {
            guard = self.not_empty
                .wait(guard)
                .expect("oops could not wait pop_front");
        }
        // `mem::replace` both takes the element and leaves `None` in the
        // slot for the next producer — the original code then wrote `None`
        // a second time, which was redundant and has been removed.
        let elem: Option<T> = mem::replace(&mut *self.data.offset((*guard).offset), None);
        (*guard).offset += 1;
        (*guard).offset %= self.capacity as isize;
        self.size.fetch_sub(1, Ordering::Release);
        elem.expect("queue slot empty despite non-zero size")
    }
}
/// Cloneable handle to a shared bounded queue.
///
/// All clones point at the same `InnerQueue` via the `Arc`.
pub struct Queue<T, S> {
    inner: sync::Arc<InnerQueue<T, S>>,
}
impl<T, S> ::std::fmt::Debug for Queue<T, S> {
fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
write!(f, "sry")
}
}
impl<T, S> Clone for Queue<T, S> {
    /// Produce another handle to the same shared ring buffer.
    ///
    /// Only the `Arc` reference count is bumped; no elements are copied.
    fn clone(&self) -> Queue<T, S> {
        let shared = sync::Arc::clone(&self.inner);
        Queue { inner: shared }
    }
}
impl<T, S> Queue<T, S>
where
    S: ::std::default::Default,
{
    /// Create a queue holding at most `capacity` elements.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is zero (see `InnerQueue::with_capacity`).
    pub fn with_capacity(capacity: usize) -> Queue<T, S> {
        Queue {
            inner: sync::Arc::new(InnerQueue::with_capacity(capacity)),
        }
    }

    /// Maximum number of elements the queue can hold.
    pub fn capacity(&self) -> usize {
        self.inner.capacity()
    }

    /// Current number of stored elements (racy snapshot).
    pub fn size(&self) -> usize {
        self.inner.size()
    }

    /// Acquire the producer-side lock; required by `push_back`.
    pub fn lock_back(&self) -> MutexGuard<BackGuardInner<S>> {
        self.inner.lock_back()
    }

    /// Acquire the consumer-side lock; required by `notify_not_empty`.
    pub fn lock_front(&self) -> MutexGuard<FrontGuardInner> {
        self.inner.lock_front()
    }

    /// Try to enqueue `elem`; on success, `Ok(true)` means the queue was
    /// previously empty and the caller should call `notify_not_empty`.
    /// Returns `Err(Error::Full(elem))` when at capacity.
    ///
    /// The original signature took `mut guard` and passed `&mut guard`,
    /// creating a `&mut &mut MutexGuard` that only worked via deref
    /// coercion; the guard is now forwarded directly.
    pub fn push_back(
        &self,
        elem: T,
        guard: &mut MutexGuard<BackGuardInner<S>>,
    ) -> Result<bool, Error<T>> {
        // SAFETY: `guard` proves the caller holds this queue's back lock,
        // which serializes writes to the back slot.
        unsafe { self.inner.push_back(elem, guard) }
    }

    /// Wake all threads blocked in `pop_front`. The `_guard` parameter
    /// forces the caller to hold the front lock while notifying.
    pub fn notify_not_empty(&self, _guard: &MutexGuard<FrontGuardInner>) {
        self.inner.not_empty.notify_all()
    }

    /// Block until an element is available, then remove and return it.
    pub fn pop_front(&mut self) -> T {
        // SAFETY: `InnerQueue::pop_front` takes the front lock internally,
        // serializing the front-slot read.
        unsafe { self.inner.pop_front() }
    }
}