use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};
use std::{mem, vec};
/// A queue of task buckets protected by a mutex.
///
/// Tasks are grouped into [`Bucket`]s of at most `BUCKET_CAPACITY` items and
/// the buckets are kept in a `Vec` behind a `Mutex`.
#[derive(Debug)]
pub(crate) struct Injector<T, const BUCKET_CAPACITY: usize> {
/// The buckets currently held by the queue.
inner: Mutex<Vec<Bucket<T, BUCKET_CAPACITY>>>,
/// Flag tracking whether `inner` holds no bucket; it can be read without
/// taking the lock.
is_empty: AtomicBool,
}
impl<T, const BUCKET_CAPACITY: usize> Injector<T, BUCKET_CAPACITY> {
    /// Creates an injector that holds no bucket.
    ///
    /// # Panics
    ///
    /// Panics if `BUCKET_CAPACITY` is 0.
    pub(crate) const fn new() -> Self {
        assert!(BUCKET_CAPACITY >= 1);

        Self {
            inner: Mutex::new(Vec::new()),
            is_empty: AtomicBool::new(true),
        }
    }

    /// Inserts a single task into the queue.
    ///
    /// The task goes into the bucket stored in the first slot. When that
    /// bucket is full, it is moved to the back of the vector and a new bucket
    /// containing only `task` takes over the first slot. When there is no
    /// bucket at all, a new one is created and the emptiness flag is cleared.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned.
    pub(crate) fn insert_task(&self, task: T) {
        let mut buckets = self.inner.lock().unwrap();

        match buckets.first_mut() {
            Some(head) => {
                if let Err(rejected) = head.push(task) {
                    // The head bucket is full: swap in a fresh bucket holding
                    // the rejected task and append the full one at the back.
                    let mut fresh = Bucket::new();
                    let _ = fresh.push(rejected);
                    let full = mem::replace(head, fresh);
                    buckets.push(full);
                }
            }
            None => {
                let mut fresh = Bucket::new();
                let _ = fresh.push(task);
                buckets.push(fresh);

                // The flag is updated while the lock is still held.
                self.is_empty.store(false, Ordering::Relaxed);
            }
        }
    }

    /// Appends a whole bucket at the back of the queue.
    ///
    /// The emptiness flag is cleared if the queue held no bucket before the
    /// call.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned.
    pub(crate) fn push_bucket(&self, bucket: Bucket<T, BUCKET_CAPACITY>) {
        let mut buckets = self.inner.lock().unwrap();

        buckets.push(bucket);

        // Exactly one bucket after the push means the queue was empty before.
        if buckets.len() == 1 {
            self.is_empty.store(false, Ordering::Relaxed);
        }
    }

    /// Removes and returns the bucket at the back of the queue.
    ///
    /// Returns `None` without locking when the emptiness flag is set. The
    /// flag is set again once the last bucket has been removed.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned.
    pub(crate) fn pop_bucket(&self) -> Option<Bucket<T, BUCKET_CAPACITY>> {
        // Fast path: consult the flag before paying for the lock.
        if self.is_empty() {
            return None;
        }

        let mut buckets = self.inner.lock().unwrap();
        let popped = buckets.pop();

        if buckets.is_empty() {
            self.is_empty.store(true, Ordering::Relaxed);
        }

        popped
    }

    /// Returns the current value of the emptiness flag.
    ///
    /// The flag is read with `Relaxed` ordering and without acquiring the
    /// lock.
    pub(crate) fn is_empty(&self) -> bool {
        self.is_empty.load(Ordering::Relaxed)
    }
}
/// A collection of tasks, backed by a `Vec`, that accepts at most `CAPACITY`
/// items through [`Bucket::push`].
#[derive(Debug)]
pub(crate) struct Bucket<T, const CAPACITY: usize>(Vec<T>);

impl<T, const CAPACITY: usize> Bucket<T, CAPACITY> {
    /// Creates an empty bucket with room for `CAPACITY` items reserved up
    /// front.
    pub(crate) fn new() -> Self {
        let storage = Vec::with_capacity(CAPACITY);
        Self(storage)
    }

    /// Returns the maximum number of items a bucket can hold.
    pub(crate) const fn capacity() -> usize {
        CAPACITY
    }

    /// Appends `task` to the bucket.
    ///
    /// # Errors
    ///
    /// Hands the task back as `Err(task)` when the bucket already holds
    /// `CAPACITY` items.
    pub(crate) fn push(&mut self, task: T) -> Result<(), T> {
        if self.0.len() >= CAPACITY {
            return Err(task);
        }

        self.0.push(task);
        Ok(())
    }
}
impl<T, const CAPACITY: usize> IntoIterator for Bucket<T, CAPACITY> {
type Item = T;
type IntoIter = vec::IntoIter<T>;
fn into_iter(self) -> Self::IntoIter {
self.0.into_iter()
}
}
impl<T, const CAPACITY: usize> FromIterator<T> for Bucket<T, CAPACITY> {
    /// Builds a bucket from the first `CAPACITY` items produced by `iter`;
    /// items beyond that count are not taken from the iterator.
    fn from_iter<U: IntoIterator<Item = T>>(iter: U) -> Self {
        let items: Vec<T> = iter.into_iter().take(CAPACITY).collect();
        Self(items)
    }
}