extern crate core;
use self::core::ptr;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use super::buffer::{Buffer, value_ptr};
use super::concurrent_queue::ConcurrentQueue;
pub struct SpscConcurrentQueue<T> {
buffer: Arc<Buffer<T>>
}
impl<T> SpscConcurrentQueue<T> {
pub fn with_capacity(initial_capacity: usize) -> SpscConcurrentQueue<T> {
SpscConcurrentQueue { buffer: Buffer::with_capacity(initial_capacity) }
}
}
impl<T> Clone for SpscConcurrentQueue<T> {
fn clone(&self) -> Self {
SpscConcurrentQueue { buffer: self.buffer.clone() }
}
}
impl<T> ConcurrentQueue<T> for SpscConcurrentQueue<T> {
fn offer(&self, val: T) -> Option<T> {
let buffer = &self.buffer;
let index = buffer.tail.load(Ordering::Relaxed);
unsafe {
let item = buffer.item(index);
if item.is_defined.load(Ordering::Acquire) {
return Some(val)
}
buffer.tail.store(index + 1, Ordering::Relaxed);
ptr::write(value_ptr(item), val);
item.is_defined.store(true, Ordering::Release);
None
}
}
fn poll(&self) -> Option<T> {
let buffer = &self.buffer;
let index = buffer.head.load(Ordering::Relaxed);
unsafe {
let item = buffer.item(index);
if !item.is_defined.load(Ordering::Acquire) {
return None;
}
buffer.head.store(index + 1, Ordering::Relaxed);
let res = ptr::read(value_ptr(item));
item.is_defined.store(false, Ordering::Release);
Some(res)
}
}
fn peek(&self) -> Option<T> {
let buffer = &self.buffer;
let index = buffer.head.load(Ordering::Relaxed);
unsafe {
let item = buffer.item(index);
if item.is_defined.load(Ordering::Acquire) {
Some(ptr::read(value_ptr(item)))
} else {
None
}
}
}
fn capacity(&self) -> usize {
self.buffer.capacity
}
fn size(&self) -> usize {
self.buffer.size()
}
}
#[cfg(test)]
mod test {
use super::core::mem;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Barrier};
use std::thread;
use super::SpscConcurrentQueue;
use super::super::concurrent_queue::ConcurrentQueue;
#[test]
fn capacity_is_next_power_of_two() {
assert_eq!(16, SpscConcurrentQueue::<i32>::with_capacity(10).capacity());
}
#[test]
fn adds_and_removes_a_value() {
let q = SpscConcurrentQueue::<i32>::with_capacity(2);
assert_eq!(None, q.offer(34));
assert_eq!(Some(34), q.poll());
}
#[test]
fn gets_full() {
let q = SpscConcurrentQueue::<i32>::with_capacity(2);
assert_eq!(None, q.offer(1));
assert_eq!(None, q.offer(2));
assert_eq!(Some(3), q.offer(3));
assert!(q.is_full());
}
#[test]
fn gets_empty() {
let q = SpscConcurrentQueue::<i32>::with_capacity(2);
assert_eq!(None, q.poll());
assert!(q.is_empty());
}
#[test]
fn peeks_a_value() {
let q = SpscConcurrentQueue::<i32>::with_capacity(2);
assert_eq!(None, q.offer(34));
assert_eq!(Some(34), q.peek());
assert_eq!(Some(34), q.poll());
assert_eq!(None, q.poll());
}
#[derive(Debug)]
struct Payload {
value: u64,
dropped: Arc<AtomicBool>
}
impl Drop for Payload {
fn drop(&mut self) {
self.dropped.store(true, Ordering::Relaxed);
}
}
impl PartialEq<Payload> for Payload {
fn eq(&self, other: &Payload) -> bool {
self.value == other.value
}
}
#[test]
fn items_are_moved() {
let q = SpscConcurrentQueue::<Payload>::with_capacity(2);
let dropped = Arc::new(AtomicBool::new(false));
let p1 = Payload { value: 67, dropped: dropped.clone() };
assert!(q.is_empty());
assert_eq!(None, q.offer(p1));
let p2 = q.poll().unwrap();
assert_eq!(67, p2.value);
assert!(!dropped.load(Ordering::Relaxed));
mem::drop(p2);
assert!(dropped.load(Ordering::Relaxed));
}
#[test]
fn lost_items_are_dropped() {
let q = SpscConcurrentQueue::<Payload>::with_capacity(2);
let dropped = Arc::new(AtomicBool::new(false));
let p = Payload { value: 67, dropped: dropped.clone() };
assert_eq!(None, q.offer(p));
assert_eq!(1, q.size());
assert!(!dropped.load(Ordering::Relaxed));
mem::drop(q);
assert!(dropped.load(Ordering::Relaxed));
}
#[test]
fn two_threads_can_add_and_remove() {
const REPETITIONS: u64 = 10 * 1000 * 1000;
let q = SpscConcurrentQueue::<u64>::with_capacity(1024);
let barrier = Arc::new(Barrier::new(2));
let cb = barrier.clone();
let cq = q.clone();
let c = thread::spawn(move|| {
cb.wait();
for i in 0..REPETITIONS {
let mut opt: Option<u64>;
while {
opt = cq.poll();
opt.is_none()
} {
thread::yield_now();
}
assert_eq!(i, opt.unwrap());
}
});
let pc = barrier.clone();
let pq = q.clone();
let p = thread::spawn(move|| {
pc.wait();
for i in 0..REPETITIONS {
while pq.offer(i).is_some() {
thread::yield_now();
}
}
});
c.join().unwrap();
p.join().unwrap();
}
}