1use super::array_from_fn;
2use crate::buffer::{AtomicStats, Buffer, BufferStats, CACHE_LINE_SIZE};
3use crate::math::Transcendental;
4use core::marker::PhantomData;
5use std::fmt;
6
7#[repr(align(64))]
13pub struct PipeBuffer<T: Transcendental, const N: usize> {
14 storage: [T; N],
15 valid: bool,
16 write_seq: usize,
17 read_seq: usize,
18 stats: AtomicStats,
19 _phantom: PhantomData<[T; N]>,
20}
21
22impl<T: Transcendental, const N: usize> PipeBuffer<T, N> {
23 pub fn new() -> Self {
25 let storage = array_from_fn(|_| T::ZERO);
26 Self {
27 storage,
28 valid: false,
29 write_seq: 0,
30 read_seq: 0,
31 stats: AtomicStats::new(),
32 _phantom: PhantomData,
33 }
34 }
35
36 #[inline(always)]
38 pub fn write(&mut self, data: &[T; N]) {
39 self.storage.copy_from_slice(data);
40 self.valid = true;
41 self.write_seq += 1;
42 self.stats.record_write();
43 self.stats.update_peak(1);
44 }
45
46 #[inline(always)]
48 pub fn read(&mut self) -> Option<[T; N]> {
49 if !self.valid {
50 return None;
51 }
52 let mut result = [T::ZERO; N];
53 result.copy_from_slice(&self.storage);
54 self.read_seq += 1;
55 self.stats.record_read();
56 Some(result)
57 }
58
59 #[inline(always)]
61 pub fn try_read(&mut self) -> Option<[T; N]> {
62 if !self.valid {
63 self.stats.record_underflow();
64 return None;
65 }
66 let mut result = [T::ZERO; N];
67 result.copy_from_slice(&self.storage);
68 self.valid = false;
69 self.read_seq += 1;
70 self.stats.record_read();
71 self.stats.update_peak(0);
72 Some(result)
73 }
74
75 pub fn read_blocking(&mut self) -> [T; N] {
77 loop {
78 if let Some(data) = self.try_read() {
79 return data;
80 }
81 core::hint::spin_loop();
82 }
83 }
84
85 pub fn has_data(&self) -> bool {
87 self.valid
88 }
89 pub fn write_seq(&self) -> usize {
91 self.write_seq
92 }
93 pub fn read_seq(&self) -> usize {
95 self.read_seq
96 }
97 pub fn is_caught_up(&self) -> bool {
99 self.write_seq == self.read_seq
100 }
101 pub fn overwrites(&self) -> usize {
103 self.write_seq.saturating_sub(self.read_seq + 1)
104 }
105
106 pub fn reset(&mut self) {
108 self.valid = false;
109 self.stats.reset();
110 }
111}
112
113impl<T: Transcendental, const N: usize> Buffer<T> for PipeBuffer<T, N> {
114 fn capacity(&self) -> usize {
115 N
116 }
117 fn len(&self) -> usize {
118 if self.valid {
119 1
120 } else {
121 0
122 }
123 }
124 fn is_empty(&self) -> bool {
125 !self.valid
126 }
127 fn is_full(&self) -> bool {
128 self.valid
129 }
130 fn as_slice(&self) -> &[T] {
131 &self.storage
132 }
133 fn as_mut_slice(&mut self) -> &mut [T] {
134 &mut self.storage
135 }
136 fn fill(&mut self, value: T) {
137 self.storage.fill(value);
138 }
139 fn copy_from(&mut self, src: &[T]) {
140 let len = src.len().min(N);
141 self.storage[..len].copy_from_slice(&src[..len]);
142 }
143 fn clear(&mut self) {
144 self.valid = false;
145 self.stats.reset();
146 }
147 fn stats(&self) -> BufferStats {
148 let mut stats = self.stats.snapshot();
149 stats.fill_level = if self.valid { 1.0 } else { 0.0 };
150 stats
151 }
152 fn reset_stats(&mut self) {
153 self.stats.reset();
154 }
155}
156
157impl<T: Transcendental, const N: usize> Default for PipeBuffer<T, N> {
158 fn default() -> Self {
159 Self::new()
160 }
161}
162
163impl<T: Transcendental + fmt::Debug, const N: usize> fmt::Debug for PipeBuffer<T, N> {
164 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
165 f.debug_struct("PipeBuffer")
166 .field("capacity", &N)
167 .field("has_data", &self.valid)
168 .field("write_seq", &self.write_seq)
169 .field("read_seq", &self.read_seq)
170 .field("overwrites", &self.overwrites())
171 .field("stats", &self.stats.snapshot())
172 .field("alignment", &CACHE_LINE_SIZE)
173 .finish()
174 }
175}
176
177impl<T: Transcendental + Copy, const N: usize> Clone for PipeBuffer<T, N> {
178 fn clone(&self) -> Self {
179 let mut new = Self::new();
180 if self.valid {
181 new.write(&self.storage);
182 }
183 new
184 }
185}
186
187#[cfg(test)]
188mod tests {
189 use super::*;
190
191 #[test]
192 fn test_pipe_buffer_basic() {
193 let mut buffer = PipeBuffer::<f32, 64>::new();
194 let write_data = [42.0; 64];
195 buffer.write(&write_data);
196 assert!(buffer.has_data());
197 assert_eq!(buffer.write_seq(), 1);
198 let read_data = buffer.try_read().unwrap();
199 assert_eq!(read_data[0], 42.0);
200 assert_eq!(buffer.read_seq(), 1);
201 assert!(buffer.is_caught_up());
202 }
203}