use core::ffi::c_int;
use core::ptr::null;
use core::sync::atomic::{AtomicBool, AtomicI32, AtomicU32, Ordering};
extern crate alloc;
use alloc::sync::Arc;
use alloc::vec::Vec;
use crate::libc;
pub struct Queue<T: Clone + Default, const N: usize> {
data: [T; N],
insert_pos: AtomicU32,
read_end: AtomicU32,
wait: AtomicI32,
is_closed: AtomicBool,
}
pub struct Consumer<T: Clone + Default, const N: usize> {
queue: Arc<Queue<T, N>>,
current: usize,
}
impl<T: Clone + Default, const N: usize> Consumer<T, N> {
pub fn get_next(&mut self) -> Option<T> {
let item = self.queue.get(self.current);
self.current += 1;
item
}
}
impl<T: Clone + Default, const N: usize> Queue<T, N> {
pub fn new() -> Arc<Self> {
let data: [T; N] = unsafe {
(0..N)
.map(|_| T::default())
.collect::<Vec<_>>()
.try_into()
.unwrap_unchecked()
};
Arc::new(Queue {
data,
insert_pos: AtomicU32::new(0),
read_end: AtomicU32::new(0),
wait: AtomicI32::new(0),
is_closed: AtomicBool::new(false),
})
}
pub fn last(&self) -> usize {
self.read_end.load(Ordering::Relaxed) as usize
}
pub fn consumer(self: &Arc<Self>) -> Consumer<T, N> {
Consumer {
queue: Arc::clone(self),
current: self.read_end.load(Ordering::Relaxed) as usize,
}
}
pub fn add(&self, item: T) {
let insert_at = self.insert_pos.fetch_add(1, Ordering::Relaxed);
unsafe {
let ptr = self.data.as_ptr().add(insert_at as usize % N) as *mut T;
*ptr = item;
}
loop {
let new_commit_pos = self.read_end.compare_exchange(
insert_at,
insert_at + 1,
Ordering::Relaxed,
Ordering::Relaxed,
);
if new_commit_pos == Ok(insert_at) {
break;
}
core::hint::spin_loop();
}
self.wake_threads();
}
pub fn get(&self, idx: usize) -> Option<T> {
while idx == self.last() {
if self.is_closed.load(Ordering::Relaxed) {
return None;
}
self.park_thread();
}
Some(self.data[idx % N].clone())
}
pub fn close(&self) {
self.is_closed.store(true, Ordering::Relaxed);
}
fn park_thread(&self) {
unsafe {
libc::syscall(
libc::SYS_FUTEX,
self.wait.as_ptr() as *const i32,
libc::FUTEX_WAIT,
0,
null::<c_int>(),
null::<c_int>(),
0,
);
}
}
fn wake_threads(&self) {
unsafe {
libc::syscall(
libc::SYS_FUTEX,
self.wait.as_ptr() as *const i32,
libc::FUTEX_WAKE,
999, null::<c_int>(),
null::<c_int>(),
0,
);
}
}
}
impl<T: Clone + Default, const N: usize> Drop for Queue<T, N> {
fn drop(&mut self) {
self.close();
}
}
#[cfg(test)]
mod tests {
use super::*;
const NUM_ITEMS: usize = 40;
#[derive(Default, Clone)]
pub struct Item {
pub val: usize,
#[allow(dead_code)]
pub s: &'static str,
}
impl Item {
pub fn new(val: usize, s: &'static str) -> Self {
Item { val, s }
}
}
#[test]
fn test_queue() {
let q = Queue::<_, NUM_ITEMS>::new();
let mut c1 = q.consumer();
let mut c2 = q.consumer();
for i in 0..NUM_ITEMS {
let i1 = Item::new(i, "x");
q.add(i1);
}
q.close();
for i in 0..10 {
let got_c1 = c1.get_next().unwrap();
assert_eq!(i, got_c1.val);
}
for i in 0..15 {
let got_c2 = c2.get_next().unwrap();
assert_eq!(i, got_c2.val);
}
}
}