1use super::array_from_fn;
2use crate::buffer::{AtomicStats, SignalBuffer, BufferStats, CACHE_LINE_SIZE};
3use crate::math::Transcendental;
4use core::marker::PhantomData;
5use std::fmt;
6
7#[repr(align(64))]
13pub struct PipeBuffer<T: Transcendental, const N: usize> {
14 storage: [T; N],
15 valid: bool,
16 write_seq: usize,
17 read_seq: usize,
18 stats: AtomicStats,
19 _phantom: PhantomData<[T; N]>,
20}
21
22impl<T: Transcendental, const N: usize> PipeBuffer<T, N> {
23 pub fn new() -> Self {
25 let storage = array_from_fn(|_| T::ZERO);
26 Self {
27 storage,
28 valid: false,
29 write_seq: 0,
30 read_seq: 0,
31 stats: AtomicStats::new(),
32 _phantom: PhantomData,
33 }
34 }
35
36 #[inline(always)]
38 pub fn write(&mut self, data: &[T; N]) {
39 self.storage.copy_from_slice(data);
40 self.valid = true;
41 self.write_seq += 1;
42 self.stats.record_write();
43 self.stats.update_peak(1);
44 }
45
46 #[inline(always)]
48 pub fn read(&mut self) -> Option<[T; N]> {
49 if !self.valid {
50 return None;
51 }
52 let mut result = [T::ZERO; N];
53 result.copy_from_slice(&self.storage);
54 self.read_seq += 1;
55 self.stats.record_read();
56 Some(result)
57 }
58
59 #[inline(always)]
61 pub fn try_read(&mut self) -> Option<[T; N]> {
62 if !self.valid {
63 self.stats.record_underflow();
64 return None;
65 }
66 let mut result = [T::ZERO; N];
67 result.copy_from_slice(&self.storage);
68 self.valid = false;
69 self.read_seq += 1;
70 self.stats.record_read();
71 self.stats.update_peak(0);
72 Some(result)
73 }
74
75 pub fn read_blocking(&mut self) -> [T; N] {
77 loop {
78 if let Some(data) = self.try_read() {
79 return data;
80 }
81 core::hint::spin_loop();
82 }
83 }
84
85 pub fn has_data(&self) -> bool { self.valid }
87 pub fn write_seq(&self) -> usize { self.write_seq }
89 pub fn read_seq(&self) -> usize { self.read_seq }
91 pub fn is_caught_up(&self) -> bool { self.write_seq == self.read_seq }
93 pub fn overwrites(&self) -> usize { self.write_seq.saturating_sub(self.read_seq + 1) }
95
96 pub fn reset(&mut self) {
98 self.valid = false;
99 self.stats.reset();
100 }
101}
102
103impl<T: Transcendental, const N: usize> SignalBuffer<T> for PipeBuffer<T, N> {
104 fn capacity(&self) -> usize { N }
105 fn len(&self) -> usize { if self.valid { 1 } else { 0 } }
106 fn is_empty(&self) -> bool { !self.valid }
107 fn is_full(&self) -> bool { self.valid }
108 fn clear(&mut self) { self.valid = false; self.stats.reset(); }
109 fn stats(&self) -> BufferStats {
110 let mut stats = self.stats.snapshot();
111 stats.fill_level = if self.valid { 1.0 } else { 0.0 };
112 stats
113 }
114 fn reset_stats(&mut self) { self.stats.reset(); }
115}
116
117impl<T: Transcendental, const N: usize> Default for PipeBuffer<T, N> {
118 fn default() -> Self { Self::new() }
119}
120
121impl<T: Transcendental + fmt::Debug, const N: usize> fmt::Debug for PipeBuffer<T, N> {
122 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
123 f.debug_struct("PipeBuffer")
124 .field("capacity", &N)
125 .field("has_data", &self.valid)
126 .field("write_seq", &self.write_seq)
127 .field("read_seq", &self.read_seq)
128 .field("overwrites", &self.overwrites())
129 .field("stats", &self.stats.snapshot())
130 .field("alignment", &CACHE_LINE_SIZE)
131 .finish()
132 }
133}
134
135impl<T: Transcendental + Copy, const N: usize> Clone for PipeBuffer<T, N> {
136 fn clone(&self) -> Self {
137 let mut new = Self::new();
138 if self.valid {
139 new.write(&self.storage);
140 }
141 new
142 }
143}
144
145#[cfg(test)]
146mod tests {
147 use super::*;
148
149 #[test]
150 fn test_pipe_buffer_basic() {
151 let mut buffer = PipeBuffer::<f32, 64>::new();
152 let write_data = [42.0; 64];
153 buffer.write(&write_data);
154 assert!(buffer.has_data());
155 assert_eq!(buffer.write_seq(), 1);
156 let read_data = buffer.try_read().unwrap();
157 assert_eq!(read_data[0], 42.0);
158 assert_eq!(buffer.read_seq(), 1);
159 assert!(buffer.is_caught_up());
160 }
161}