use super::array_from_fn;
use crate::buffer::{AtomicCell, AtomicStats, SignalBuffer, BufferStats, CACHE_LINE_SIZE};
use crate::math::Transcendental;
use core::marker::PhantomData;
use core::sync::atomic::{AtomicBool, Ordering};
use std::fmt;
/// Single-slot frame buffer holding `N` values of type `T`, aligned to 64 bytes.
///
/// The methods in this file use it as a one-frame mailbox: `write` stores a
/// complete frame and raises `valid`; `try_read` copies the frame out and
/// lowers `valid` again, while `read` copies it without lowering the flag.
/// A `write` never checks `valid`, so a frame that has not been read yet is
/// simply replaced by the next one.
#[repr(align(64))]
pub struct PipeBuffer<T: Transcendental, const N: usize> {
/// One atomic cell per element of the stored frame.
storage: [AtomicCell<T>; N],
/// `true` while a written frame is available; cleared by `try_read`,
/// `reset` and `clear`.
valid: AtomicBool,
/// Incremented once per `write` call.
write_seq: AtomicCell<usize>,
/// Incremented once per successful `read` / `try_read` call.
read_seq: AtomicCell<usize>,
/// Write / read / underflow / peak accounting for this buffer.
stats: AtomicStats,
/// Zero-sized marker tying the type to `[T; N]`; carries no data.
_phantom: PhantomData<[T; N]>,
}
impl<T: Transcendental, const N: usize> PipeBuffer<T, N> {
pub fn new() -> Self {
let storage = array_from_fn(|_| AtomicCell::new(T::ZERO));
Self {
storage,
valid: AtomicBool::new(false),
write_seq: AtomicCell::new(0),
read_seq: AtomicCell::new(0),
stats: AtomicStats::new(),
_phantom: PhantomData,
}
}
#[inline(always)]
pub fn write(&self, data: &[T; N]) {
for i in 0..N {
self.storage[i].store(data[i]);
}
self.valid.store(true, Ordering::Release);
self.write_seq.store(self.write_seq.load() + 1);
self.stats.record_write();
self.stats.update_peak(1);
}
#[inline(always)]
pub fn read(&self) -> Option<[T; N]> {
if !self.valid.load(Ordering::Acquire) {
return None;
}
let mut result = [T::ZERO; N];
for i in 0..N {
result[i] = self.storage[i].load();
}
self.read_seq.store(self.read_seq.load() + 1);
self.stats.record_read();
Some(result)
}
#[inline(always)]
pub fn try_read(&self) -> Option<[T; N]> {
if !self.valid.load(Ordering::Acquire) {
self.stats.record_underflow();
return None;
}
let mut result = [T::ZERO; N];
for i in 0..N {
result[i] = self.storage[i].load();
}
self.valid.store(false, Ordering::Release);
self.read_seq.store(self.read_seq.load() + 1);
self.stats.record_read();
self.stats.update_peak(0);
Some(result)
}
pub fn read_blocking(&self) -> [T; N] {
loop {
if let Some(data) = self.try_read() {
return data;
}
core::hint::spin_loop();
}
}
#[inline(always)]
pub fn has_data(&self) -> bool {
self.valid.load(Ordering::Acquire)
}
pub fn write_seq(&self) -> usize {
self.write_seq.load()
}
pub fn read_seq(&self) -> usize {
self.read_seq.load()
}
pub fn is_caught_up(&self) -> bool {
self.write_seq() == self.read_seq()
}
pub fn overwrites(&self) -> usize {
self.write_seq().saturating_sub(self.read_seq() + 1)
}
pub fn reset(&self) {
self.valid.store(false, Ordering::Release);
self.stats.reset();
}
}
impl<T, const N: usize> SignalBuffer<T> for PipeBuffer<T, N>
where
    T: Transcendental,
{
    /// Number of values in one frame (`N`).
    ///
    /// Note that this counts values, whereas [`Self::len`] counts frames.
    fn capacity(&self) -> usize {
        N
    }

    /// `1` while a frame is available, `0` otherwise.
    fn len(&self) -> usize {
        usize::from(self.has_data())
    }

    /// `true` when no frame is available.
    fn is_empty(&self) -> bool {
        let occupied = self.has_data();
        !occupied
    }

    /// `true` when a frame is available; the single slot is then occupied.
    fn is_full(&self) -> bool {
        let occupied = self.has_data();
        occupied
    }

    /// Marks the buffer empty and resets the statistics. Sequence counters
    /// and cell contents are not modified.
    fn clear(&mut self) {
        let Self { valid, stats, .. } = self;
        valid.store(false, Ordering::Release);
        stats.reset();
    }

    /// Returns a statistics snapshot whose `fill_level` is `1.0` while a
    /// frame is available and `0.0` otherwise.
    fn stats(&self) -> BufferStats {
        let mut report = self.stats.snapshot();
        let occupied = self.has_data();
        report.fill_level = if occupied { 1.0 } else { 0.0 };
        report
    }

    /// Resets the statistics only; the availability flag is left as is.
    fn reset_stats(&mut self) {
        self.stats.reset();
    }
}
impl<T: Transcendental, const N: usize> Default for PipeBuffer<T, N> {
fn default() -> Self {
Self::new()
}
}
impl<T, const N: usize> fmt::Debug for PipeBuffer<T, N>
where
    T: Transcendental + fmt::Debug,
{
    /// Formats the buffer's bookkeeping state (frame size, availability,
    /// sequence counters, derived overwrite count, statistics snapshot and
    /// the `CACHE_LINE_SIZE` constant). The stored values are not printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("PipeBuffer");
        out.field("capacity", &N);
        out.field("has_data", &self.has_data());
        out.field("write_seq", &self.write_seq());
        out.field("read_seq", &self.read_seq());
        out.field("overwrites", &self.overwrites());
        out.field("stats", &self.stats.snapshot());
        out.field("alignment", &CACHE_LINE_SIZE);
        out.finish()
    }
}
impl<T: Transcendental + Copy, const N: usize> Clone for PipeBuffer<T, N> {
    /// Creates a new buffer holding a copy of the current frame, if any.
    ///
    /// The source buffer is only observed: its availability flag, sequence
    /// counters and statistics are not modified. When the source holds a
    /// frame, the clone receives it through a regular `write`, so the clone
    /// starts with one recorded write; otherwise the clone is empty. The
    /// source's sequence counters and statistics are not carried over.
    fn clone(&self) -> Self {
        let new = Self::new();
        if self.has_data() {
            // Copy the cells directly instead of going through `try_read`,
            // which would consume the frame from the buffer being cloned.
            let mut frame = [T::ZERO; N];
            for (slot, cell) in frame.iter_mut().zip(self.storage.iter()) {
                *slot = cell.load();
            }
            new.write(&frame);
        }
        new
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Round trip: one write followed by one consuming read leaves both
    /// sequence counters at 1 and the buffer caught up.
    #[test]
    fn test_pipe_buffer_basic() {
        let pipe: PipeBuffer<f32, 64> = PipeBuffer::new();
        let frame = [42.0; 64];

        pipe.write(&frame);
        assert!(pipe.has_data());
        assert_eq!(pipe.write_seq(), 1);

        let received = pipe.try_read().unwrap();
        assert_eq!(received[0], 42.0);
        assert_eq!(pipe.read_seq(), 1);
        assert!(pipe.is_caught_up());
    }
}