use crate::buffer::RPBuffer;
use crate::circular_buffer::CircularBuffer;
use crate::queue::RPQueue;
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;
pub struct BlockingQueue<'a, T> {
buffer: *mut CircularBuffer<'a, T>,
buffer_mtx: Mutex<*mut CircularBuffer<'a, T>>,
buffer_cvar: Condvar,
}
impl<'a, T> BlockingQueue<'a, T> {
pub fn new(size: usize, f: impl FnMut() -> T) -> Self {
let buf = BlockingQueue::<T>::allocate(size, f);
let buf_ptr = Box::into_raw(Box::new(buf));
return BlockingQueue {
buffer: buf_ptr,
buffer_mtx: Mutex::new(buf_ptr),
buffer_cvar: Condvar::new(),
};
}
pub fn new_arc(size: usize, f: impl FnMut() -> T) -> Arc<Self> {
let q = BlockingQueue::new(size, f);
return Arc::new(q);
}
fn allocate(size: usize, mut f: impl FnMut() -> T) -> CircularBuffer<'a, T> {
let mut buf: CircularBuffer<T> = CircularBuffer::new(size);
for _ in 0..size {
buf.add(f());
}
return buf;
}
}
impl<'a, T> RPQueue<'a, T> for BlockingQueue<'a, T> {
#[inline]
fn capacity(&self) -> usize {
unsafe {
return (*self.buffer).capacity();
}
}
#[inline]
fn available(&self) -> usize {
unsafe {
return (*self.buffer).available();
}
}
#[inline]
fn take(&self) -> Option<&'a mut T> {
if self.available() <= 0 {
return None;
}
let guard = self.buffer_mtx.lock().unwrap();
unsafe {
return (**guard).take();
}
}
#[inline]
fn take_wait(&self, timeout: Duration) -> Option<&'a mut T> {
let mut guard = self.buffer_mtx.lock().unwrap();
unsafe {
let wait_result = self.buffer_cvar.wait_timeout_while(guard, timeout, |current| (**current).available() == 0).unwrap();
if wait_result.1.timed_out() {
return None;
}
guard = wait_result.0;
};
unsafe {
return (**guard).take();
}
}
#[inline]
fn offer(&self, item: &'a T) -> usize {
let guard = self.buffer_mtx.lock().unwrap();
unsafe {
return (**guard).offer(item);
}
}
}
impl<'a, T> Drop for BlockingQueue<'a, T> {
fn drop(&mut self) {
unsafe {
Box::from_raw(self.buffer);
}
}
}
unsafe impl<'a, T> Sync for BlockingQueue<'a, T> {}
unsafe impl<'a, T> Send for BlockingQueue<'a, T> {}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;
use std::thread;
use std::time::{Duration, Instant};
#[derive(Debug, PartialEq)]
struct TestObject {
name: String,
}
impl TestObject {
fn new(name: String) -> Self {
return TestObject { name: name };
}
}
impl Default for TestObject {
fn default() -> Self {
return TestObject::new(String::from("default"));
}
}
fn create_test_queue<'a>(size: usize) -> Arc<BlockingQueue<'a, TestObject>> {
let mut counter = 0;
let obj_maker = || {
counter = counter + 1;
let name = format!("object {}", counter);
return TestObject::new(name);
};
return BlockingQueue::new_arc(size, obj_maker);
}
#[test]
fn test_bq_take() {
let size = 3;
let q = create_test_queue(size);
let obj1 = q.take();
let obj2 = q.take();
let obj3 = q.take();
assert_eq!("object 1", obj1.unwrap().name);
assert_eq!("object 2", obj2.unwrap().name);
assert_eq!("object 3", obj3.unwrap().name);
assert_eq!(size, q.capacity());
}
#[test]
fn test_bq_take_and_offer() {
let size = 2;
let q = create_test_queue(size);
q.take();
let obj2 = q.take().unwrap();
assert_eq!("object 2", obj2.name);
q.offer(obj2);
let obj2 = q.take().unwrap();
assert_eq!("object 2", obj2.name);
}
#[test]
fn test_bq_take_threads() {
let size = 2;
let q = create_test_queue(size);
let tq = q.clone();
let t0 = thread::spawn(move || {
return tq.take().unwrap().name.as_str();
});
let tq = q.clone();
let t1 = thread::spawn(move || {
return tq.take().unwrap().name.as_str();
});
let name0 = t0.join().unwrap();
let name1 = t1.join().unwrap();
let expected = vec!["object 1", "object 2"];
let mut actual = vec![name0, name1];
actual.sort();
assert_eq!(expected, actual);
assert_eq!(size, q.capacity());
}
#[test]
fn test_bq_over_limit() {
let size = 2;
let q = BlockingQueue::<TestObject>::new(size, || TestObject::default());
for _ in 0..size {
q.take();
}
let obj = q.take();
assert_eq!(None, obj);
}
#[test]
fn test_bq_over_limit_block() {
let size = 2;
let wait_ms = Duration::from_millis(10);
let q = BlockingQueue::<TestObject>::new(size, || TestObject::default());
for _ in 0..size {
q.take();
}
let now = Instant::now();
let obj = q.take_wait(wait_ms);
let elapsed = now.elapsed();
assert_eq!(None, obj);
assert!(elapsed >= wait_ms, "didn't block for sufficient time. Expected {} actual {}", wait_ms.as_millis(), elapsed.as_millis());
}
#[test]
fn test_bq_mt_take_offer() {
let thread_count = 7;
let size = thread_count;
let iterations = 1000;
let mut queue_items = HashMap::new();
let mut counter = 0;
let item_maker = || {
counter = counter + 1;
let item = format!("object {}", counter);
queue_items.insert(item.clone(), true);
return item;
};
let arc_queue = BlockingQueue::new_arc(size, item_maker);
let mut jhv = Vec::new();
for _ in 0..thread_count {
let c_buf = arc_queue.clone();
let jh = thread::spawn(move || {
let mut taken = 0;
for _ in 0..iterations {
let obj = c_buf.take();
if obj != None {
c_buf.offer(obj.unwrap());
taken = taken + 1;
}
}
return taken;
});
jhv.push(jh);
}
jhv.into_iter().for_each(|jh| {
let taken = jh.join();
assert!(taken.unwrap() > size);
});
assert_eq!(size, arc_queue.available());
let mut item = arc_queue.take();
while item != None {
let object = item.unwrap();
queue_items.remove(object);
item = arc_queue.take();
}
assert_eq!(0, queue_items.len());
}
}