use std::sync::atomic::Ordering;
use std::vec;
use seize::Guard;
use syncpool::prelude::*;
use crate::policy::DefaultPolicy;
use crate::reclaim::{Atomic, Shared};
pub type RingConsumer = Box<dyn Fn(Vec<u64>) -> bool>;
#[derive(Clone)]
pub struct RingStripe<T> {
pub(crate) data: Atomic<Vec<u64>>,
pub capa: usize,
pub(crate) cons: Atomic<DefaultPolicy<T>>,
}
impl<T> RingStripe<T> {
fn new(capa: usize, p: Shared<DefaultPolicy<T>>) -> Self {
RingStripe {
data: Atomic::null(),
capa,
cons: Atomic::from(p),
}
}
fn push<'g>(&'g self, item: u64, guard: &'g Guard) {
let mut data = self.data.load(Ordering::SeqCst, guard);
if data.is_null() {
data = Shared::boxed(vec![0; self.capa], guard.collector().unwrap());
self.data.store(data, Ordering::SeqCst);
}
let data = unsafe { data.as_ptr() };
let data = unsafe { data.as_mut().unwrap() };
data.push(item);
if data.len() >= self.capa {
unsafe {
let p = self.cons.load(Ordering::SeqCst, guard);
let p = unsafe {p.as_ptr()};
let p = unsafe {p.as_mut().unwrap()};
let mut data = self.data.load(Ordering::SeqCst, guard);
if data.is_null() || !unsafe { data.deref() }.is_empty() {
data = Shared::boxed(Vec::with_capacity(self.capa), guard.collector().unwrap());
self.data.store(data, Ordering::SeqCst);
}
let data = data.as_ptr();
if p.push(data.as_mut().unwrap().clone(), guard) {
let empty = Shared::boxed(vec![0; self.capa], guard.collector().unwrap());
self.data.store(empty, Ordering::SeqCst);
} else {
let empty = Shared::boxed(vec![0; self.capa], guard.collector().unwrap());
self.data.store(empty, Ordering::SeqCst);
}
}
}
}
}
#[derive(Clone)]
pub struct RingBuffer<T> {
pool: RingStripe<T>,
}
impl<T> RingBuffer<T> {
pub(crate) fn new(f: Shared<DefaultPolicy<T>>, capa: usize) -> Self
{
RingBuffer {
pool: RingStripe::new(capa, f),
}
}
pub fn push<'g>(&'g self, item: u64, guard: &'g Guard) {
self.pool.push(item, guard);
}
}
#[cfg(test)]
mod tests {
#[test]
fn test_ring_drain() {
}
}