1use super::array_from_fn;
2use crate::buffer::{AtomicStats, SignalBuffer, BufferStats, CACHE_LINE_SIZE};
3use crate::math::Transcendental;
4use core::marker::PhantomData;
5use std::fmt;
6
7#[repr(align(64))]
14pub struct FanOutBuffer<T: Transcendental, const N: usize, const CONSUMERS: usize> {
15 storage: [T; N],
16 version: usize,
17 read_versions: [usize; CONSUMERS],
18 valid: bool,
19 stats: AtomicStats,
20 _phantom: PhantomData<T>,
21}
22
23impl<T: Transcendental, const N: usize, const CONSUMERS: usize> FanOutBuffer<T, N, CONSUMERS> {
24 pub fn new() -> Self {
29 assert!(CONSUMERS > 0, "FanOutBuffer must have at least one consumer");
30 Self {
31 storage: array_from_fn(|_| T::ZERO),
32 version: 0,
33 read_versions: [0; CONSUMERS],
34 valid: false,
35 stats: AtomicStats::new(),
36 _phantom: PhantomData,
37 }
38 }
39
40 #[inline(always)]
42 pub fn write(&mut self, data: &[T; N]) {
43 self.storage.copy_from_slice(data);
44 self.version += 1;
45 self.valid = true;
46 self.stats.record_write();
47 self.stats.update_peak(1);
48 }
49
50 #[inline(always)]
52 pub fn try_read(&mut self, consumer_id: usize) -> Option<[T; N]> {
53 if consumer_id >= CONSUMERS {
54 return None;
55 }
56 let current_version = self.version;
57 if self.read_versions[consumer_id] == current_version || !self.valid {
58 self.stats.record_underflow();
59 return None;
60 }
61 let mut result = [T::ZERO; N];
62 result.copy_from_slice(&self.storage);
63 self.read_versions[consumer_id] = current_version;
64 self.stats.record_read();
65 Some(result)
66 }
67
68 pub fn has_new_data(&self, consumer_id: usize) -> bool {
70 consumer_id < CONSUMERS && self.version != self.read_versions[consumer_id] && self.valid
71 }
72
73 pub const fn consumer_count(&self) -> usize { CONSUMERS }
75 pub fn current_version(&self) -> usize { self.version }
77 pub fn last_read_version(&self, consumer_id: usize) -> Option<usize> {
79 if consumer_id >= CONSUMERS { None } else { Some(self.read_versions[consumer_id]) }
80 }
81
82 pub fn reset(&mut self) {
84 self.valid = false;
85 self.read_versions.fill(0);
86 self.stats.reset();
87 }
88}
89
90impl<T: Transcendental, const N: usize, const CONSUMERS: usize> SignalBuffer<T>
91 for FanOutBuffer<T, N, CONSUMERS>
92{
93 fn capacity(&self) -> usize { N }
94 fn len(&self) -> usize { if self.valid { 1 } else { 0 } }
95 fn is_empty(&self) -> bool { !self.valid }
96 fn is_full(&self) -> bool { self.valid }
97 fn clear(&mut self) { self.reset(); }
98 fn stats(&self) -> BufferStats {
99 let mut stats = self.stats.snapshot();
100 stats.fill_level = if self.valid { 1.0 } else { 0.0 };
101 stats
102 }
103 fn reset_stats(&mut self) { self.stats.reset(); }
104}
105
106impl<T: Transcendental, const N: usize, const CONSUMERS: usize> Default
107 for FanOutBuffer<T, N, CONSUMERS>
108{
109 fn default() -> Self { Self::new() }
110}
111
112impl<T: Transcendental + fmt::Debug, const N: usize, const CONSUMERS: usize> fmt::Debug
113 for FanOutBuffer<T, N, CONSUMERS>
114{
115 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116 f.debug_struct("FanOutBuffer")
117 .field("capacity", &N)
118 .field("consumers", &CONSUMERS)
119 .field("has_data", &self.valid)
120 .field("version", &self.version)
121 .field("stats", &self.stats.snapshot())
122 .field("alignment", &CACHE_LINE_SIZE)
123 .finish()
124 }
125}
126
127#[repr(align(64))]
134pub struct FanInBuffer<T: Transcendental, const N: usize, const PRODUCERS: usize> {
135 storage: [[T; N]; PRODUCERS],
136 valid: [bool; PRODUCERS],
137 write_seq: [usize; PRODUCERS],
138 read_seq: usize,
139 stats: AtomicStats,
140 _phantom: PhantomData<T>,
141}
142
143impl<T: Transcendental, const N: usize, const PRODUCERS: usize> FanInBuffer<T, N, PRODUCERS> {
144 pub fn new() -> Self {
149 assert!(PRODUCERS > 0, "FanInBuffer must have at least one producer");
150 Self {
151 storage: array_from_fn(|_| [T::ZERO; N]),
152 valid: [false; PRODUCERS],
153 write_seq: [0; PRODUCERS],
154 read_seq: 0,
155 stats: AtomicStats::new(),
156 _phantom: PhantomData,
157 }
158 }
159
160 #[inline(always)]
162 pub fn write(&mut self, producer_id: usize, data: &[T; N]) {
163 if producer_id >= PRODUCERS { return; }
164 self.storage[producer_id].copy_from_slice(data);
165 self.valid[producer_id] = true;
166 self.write_seq[producer_id] += 1;
167 self.stats.record_write();
168 }
169
170 #[inline(always)]
172 pub fn try_read(&mut self) -> Option<[T; N]> {
173 let mut result = [T::ZERO; N];
174 let mut any_valid = false;
175 let mut active_producers = 0;
176 let current_seq = self.read_seq;
177 for producer in 0..PRODUCERS {
178 if self.valid[producer] && self.write_seq[producer] > current_seq {
179 any_valid = true;
180 active_producers += 1;
181 for i in 0..N {
182 result[i] += self.storage[producer][i];
183 }
184 }
185 }
186 if any_valid {
187 self.read_seq += 1;
188 self.stats.record_read();
189 self.stats.update_peak(active_producers);
190 Some(result)
191 } else {
192 self.stats.record_underflow();
193 None
194 }
195 }
196
197 pub const fn producer_count(&self) -> usize { PRODUCERS }
199
200 pub fn producer_has_data(&self, producer_id: usize) -> bool {
202 if producer_id >= PRODUCERS { return false; }
203 self.write_seq[producer_id] > self.read_seq && self.valid[producer_id]
204 }
205
206 pub fn read_seq(&self) -> usize { self.read_seq }
208 pub fn write_seq(&self, producer_id: usize) -> Option<usize> {
210 if producer_id >= PRODUCERS { None } else { Some(self.write_seq[producer_id]) }
211 }
212
213 pub fn reset(&mut self) {
215 self.valid.fill(false);
216 self.write_seq.fill(0);
217 self.read_seq = 0;
218 self.stats.reset();
219 }
220
221 pub fn clear_producer(&mut self, producer_id: usize) {
223 if producer_id < PRODUCERS {
224 self.valid[producer_id] = false;
225 self.write_seq[producer_id] = 0;
226 }
227 }
228}
229
230impl<T: Transcendental, const N: usize, const PRODUCERS: usize> SignalBuffer<T>
231 for FanInBuffer<T, N, PRODUCERS>
232{
233 fn capacity(&self) -> usize { N * PRODUCERS }
234 fn len(&self) -> usize {
235 let mut count = 0;
236 for producer in 0..PRODUCERS {
237 if self.write_seq[producer] > self.read_seq && self.valid[producer] {
238 count += 1;
239 }
240 }
241 count
242 }
243 fn is_empty(&self) -> bool { self.len() == 0 }
244 fn is_full(&self) -> bool { self.len() == PRODUCERS }
245 fn clear(&mut self) { self.reset(); }
246 fn stats(&self) -> BufferStats {
247 let mut stats = self.stats.snapshot();
248 stats.fill_level = self.len() as f32 / PRODUCERS as f32;
249 stats
250 }
251 fn reset_stats(&mut self) { self.stats.reset(); }
252}
253
254impl<T: Transcendental, const N: usize, const PRODUCERS: usize> Default for FanInBuffer<T, N, PRODUCERS> {
255 fn default() -> Self { Self::new() }
256}
257
258impl<T: Transcendental + fmt::Debug, const N: usize, const PRODUCERS: usize> fmt::Debug
259 for FanInBuffer<T, N, PRODUCERS>
260{
261 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
262 let active = self.valid.iter().filter(|v| **v).count();
263 f.debug_struct("FanInBuffer")
264 .field("capacity", &(N * PRODUCERS))
265 .field("producers", &PRODUCERS)
266 .field("active_producers", &active)
267 .field("len", &self.len())
268 .field("read_seq", &self.read_seq)
269 .field("stats", &self.stats.snapshot())
270 .field("alignment", &CACHE_LINE_SIZE)
271 .finish()
272 }
273}
274
275#[cfg(test)]
276mod tests {
277 use super::*;
278
279 #[test]
280 fn test_fan_out_buffer_basic() {
281 let mut buffer = FanOutBuffer::<f32, 64, 3>::new();
282 let data = [42.0; 64];
283 buffer.write(&data);
284 for i in 0..3 {
285 let read = buffer.try_read(i).unwrap();
286 assert_eq!(read[0], 42.0);
287 }
288 }
289
290 #[test]
291 fn test_fan_in_buffer_basic() {
292 let mut buffer = FanInBuffer::<f32, 64, 2>::new();
293 buffer.write(0, &[1.0; 64]);
294 buffer.write(1, &[2.0; 64]);
295 let mixed = buffer.try_read().unwrap();
296 assert_eq!(mixed[0], 3.0);
297 }
298}