1use super::array_from_fn;
4use crate::buffer::{AtomicCell, AtomicStats, SignalBuffer, BufferStats, CACHE_LINE_SIZE};
5use crate::math::Transcendental;
6use core::marker::PhantomData;
7use core::sync::atomic::{AtomicBool, Ordering};
8use std::fmt;
9
10#[repr(align(64))]
16pub struct FanOutBuffer<T: Transcendental, const N: usize, const CONSUMERS: usize> {
17 storage: [AtomicCell<T>; N],
19
20 version: AtomicCell<usize>,
22
23 read_versions: [AtomicCell<usize>; CONSUMERS],
25
26 valid: AtomicBool,
28
29 stats: AtomicStats,
31
32 _phantom: PhantomData<T>,
34}
35
36impl<T: Transcendental, const N: usize, const CONSUMERS: usize> FanOutBuffer<T, N, CONSUMERS> {
37 pub fn new() -> Self {
39 assert!(
40 CONSUMERS > 0,
41 "FanOutBuffer must have at least one consumer"
42 );
43
44 let storage = array_from_fn(|_| AtomicCell::new(T::ZERO));
46
47 Self {
48 storage,
49 version: AtomicCell::new(0),
50 read_versions: array_from_fn(|_| AtomicCell::new(0)),
51 valid: AtomicBool::new(false),
52 stats: AtomicStats::new(),
53 _phantom: PhantomData,
54 }
55 }
56
57 #[inline(always)]
59 pub fn write(&self, data: &[T; N]) {
60 for i in 0..N {
61 self.storage[i].store(data[i]);
62 }
63
64 self.version.store(self.version.load() + 1);
65 self.valid.store(true, Ordering::Release);
66
67 self.stats.record_write();
68 self.stats.update_peak(1);
69 }
70
71 #[inline(always)]
73 pub fn try_read(&self, consumer_id: usize) -> Option<[T; N]> {
74 if consumer_id >= CONSUMERS {
75 return None;
76 }
77
78 let current_version = self.version.load();
79 let last_read = self.read_versions[consumer_id].load();
80
81 if last_read == current_version || !self.valid.load(Ordering::Acquire) {
82 self.stats.record_underflow();
83 return None;
84 }
85
86 let mut result = [T::ZERO; N];
87 for i in 0..N {
88 result[i] = self.storage[i].load();
89 }
90
91 self.read_versions[consumer_id].store(current_version);
92
93 self.stats.record_read();
94
95 Some(result)
96 }
97
98 #[inline(always)]
100 pub fn has_new_data(&self, consumer_id: usize) -> bool {
101 if consumer_id >= CONSUMERS {
102 return false;
103 }
104
105 let current_version = self.version.load();
106 let last_read = self.read_versions[consumer_id].load();
107
108 current_version != last_read && self.valid.load(Ordering::Acquire)
109 }
110
111 pub const fn consumer_count(&self) -> usize {
113 CONSUMERS
114 }
115
116 pub fn current_version(&self) -> usize {
118 self.version.load()
119 }
120
121 pub fn last_read_version(&self, consumer_id: usize) -> Option<usize> {
123 if consumer_id >= CONSUMERS {
124 None
125 } else {
126 Some(self.read_versions[consumer_id].load())
127 }
128 }
129
130 pub fn reset(&self) {
132 self.valid.store(false, Ordering::Release);
133 for i in 0..CONSUMERS {
134 self.read_versions[i].store(0);
135 }
136 self.stats.reset();
137 }
138}
139
140impl<T: Transcendental, const N: usize, const CONSUMERS: usize> SignalBuffer<T>
141 for FanOutBuffer<T, N, CONSUMERS>
142{
143 fn capacity(&self) -> usize {
144 N
145 }
146
147 fn len(&self) -> usize {
148 if self.valid.load(Ordering::Relaxed) {
149 1
150 } else {
151 0
152 }
153 }
154
155 fn is_empty(&self) -> bool {
156 !self.valid.load(Ordering::Relaxed)
157 }
158
159 fn is_full(&self) -> bool {
160 self.valid.load(Ordering::Relaxed)
161 }
162
163 fn clear(&mut self) {
164 self.valid.store(false, Ordering::Release);
165 for i in 0..CONSUMERS {
166 self.read_versions[i].store(0);
167 }
168 self.stats.reset();
169 }
170
171 fn stats(&self) -> BufferStats {
172 let mut stats = self.stats.snapshot();
173 stats.fill_level = if self.valid.load(Ordering::Relaxed) {
174 1.0
175 } else {
176 0.0
177 };
178 stats
179 }
180
181 fn reset_stats(&mut self) {
182 self.stats.reset();
183 }
184}
185
186#[repr(align(64))]
192pub struct FanInBuffer<T: Transcendental, const N: usize, const PRODUCERS: usize> {
193 storage: [[AtomicCell<T>; N]; PRODUCERS],
195
196 valid: [AtomicBool; PRODUCERS],
198
199 write_seq: [AtomicCell<usize>; PRODUCERS],
201
202 read_seq: AtomicCell<usize>,
204
205 stats: AtomicStats,
207
208 _phantom: PhantomData<T>,
210}
211
212impl<T: Transcendental, const N: usize, const PRODUCERS: usize> FanInBuffer<T, N, PRODUCERS> {
213 pub fn new() -> Self {
215 assert!(PRODUCERS > 0, "FanInBuffer must have at least one producer");
216
217 let storage = array_from_fn(|_| array_from_fn(|_| AtomicCell::new(T::ZERO)));
219
220 Self {
221 storage,
222 valid: array_from_fn(|_| AtomicBool::new(false)),
223 write_seq: array_from_fn(|_| AtomicCell::new(0)),
224 read_seq: AtomicCell::new(0),
225 stats: AtomicStats::new(),
226 _phantom: PhantomData,
227 }
228 }
229
230 #[inline(always)]
232 pub fn write(&self, producer_id: usize, data: &[T; N]) {
233 if producer_id >= PRODUCERS {
234 return;
235 }
236
237 for i in 0..N {
238 self.storage[producer_id][i].store(data[i]);
239 }
240
241 self.valid[producer_id].store(true, Ordering::Release);
242 self.write_seq[producer_id].store(self.write_seq[producer_id].load() + 1);
243
244 self.stats.record_write();
245 }
246
247 #[inline(always)]
249 pub fn try_read(&self) -> Option<[T; N]> {
250 let mut result = [T::ZERO; N];
251 let mut any_valid = false;
252 let mut active_producers = 0;
253 let current_seq = self.read_seq.load();
254
255 for producer in 0..PRODUCERS {
256 if self.valid[producer].load(Ordering::Acquire) {
257 let write_seq = self.write_seq[producer].load();
258
259 if write_seq > current_seq {
260 any_valid = true;
261 active_producers += 1;
262 for i in 0..N {
263 result[i] = result[i] + self.storage[producer][i].load();
264 }
265 }
266 }
267 }
268
269 if any_valid {
270 self.read_seq.store(self.read_seq.load() + 1);
271
272 self.stats.record_read();
273 self.stats.update_peak(active_producers);
274 Some(result)
275 } else {
276 self.stats.record_underflow();
277 None
278 }
279 }
280
281 pub const fn producer_count(&self) -> usize {
283 PRODUCERS
284 }
285
286 pub fn producer_has_data(&self, producer_id: usize) -> bool {
288 if producer_id >= PRODUCERS {
289 return false;
290 }
291
292 let write_seq = self.write_seq[producer_id].load();
293 let read_seq = self.read_seq.load();
294
295 write_seq > read_seq && self.valid[producer_id].load(Ordering::Acquire)
296 }
297
298 pub fn read_seq(&self) -> usize {
300 self.read_seq.load()
301 }
302
303 pub fn write_seq(&self, producer_id: usize) -> Option<usize> {
305 if producer_id >= PRODUCERS {
306 None
307 } else {
308 Some(self.write_seq[producer_id].load())
309 }
310 }
311
312 pub fn reset(&self) {
314 for producer in 0..PRODUCERS {
315 self.valid[producer].store(false, Ordering::Release);
316 self.write_seq[producer].store(0);
317 }
318 self.read_seq.store(0);
319 self.stats.reset();
320 }
321
322 pub fn clear_producer(&self, producer_id: usize) {
324 if producer_id < PRODUCERS {
325 self.valid[producer_id].store(false, Ordering::Release);
326 self.write_seq[producer_id].store(0);
327 }
328 }
329}
330
331impl<T: Transcendental, const N: usize, const PRODUCERS: usize> SignalBuffer<T>
332 for FanInBuffer<T, N, PRODUCERS>
333{
334 fn capacity(&self) -> usize {
335 N * PRODUCERS
336 }
337
338 fn len(&self) -> usize {
339 let read_seq = self.read_seq.load();
340 let mut count = 0;
341
342 for producer in 0..PRODUCERS {
343 let write_seq = self.write_seq[producer].load();
344 if write_seq > read_seq && self.valid[producer].load(Ordering::Acquire) {
345 count += 1;
346 }
347 }
348
349 count
350 }
351
352 fn is_empty(&self) -> bool {
353 self.len() == 0
354 }
355
356 fn is_full(&self) -> bool {
357 self.len() == PRODUCERS
358 }
359
360 fn clear(&mut self) {
361 for producer in 0..PRODUCERS {
362 self.valid[producer].store(false, Ordering::Release);
363 self.write_seq[producer].store(0);
364 }
365 self.read_seq.store(0);
366 self.stats.reset();
367 }
368
369 fn stats(&self) -> BufferStats {
370 let mut stats = self.stats.snapshot();
371 stats.fill_level = self.len() as f32 / PRODUCERS as f32;
372 stats
373 }
374
375 fn reset_stats(&mut self) {
376 self.stats.reset();
377 }
378}
379
380impl<T: Transcendental, const N: usize, const CONSUMERS: usize> Default
385 for FanOutBuffer<T, N, CONSUMERS>
386{
387 fn default() -> Self {
388 Self::new()
389 }
390}
391
392impl<T: Transcendental, const N: usize, const PRODUCERS: usize> Default for FanInBuffer<T, N, PRODUCERS> {
393 fn default() -> Self {
394 Self::new()
395 }
396}
397
398impl<T: Transcendental + fmt::Debug, const N: usize, const CONSUMERS: usize> fmt::Debug
403 for FanOutBuffer<T, N, CONSUMERS>
404{
405 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
406 f.debug_struct("FanOutBuffer")
407 .field("capacity", &N)
408 .field("consumers", &CONSUMERS)
409 .field("has_data", &self.valid.load(Ordering::Relaxed))
410 .field("version", &self.version.load())
411 .field("stats", &self.stats.snapshot())
412 .field("alignment", &CACHE_LINE_SIZE)
413 .finish()
414 }
415}
416
417impl<T: Transcendental + fmt::Debug, const N: usize, const PRODUCERS: usize> fmt::Debug
418 for FanInBuffer<T, N, PRODUCERS>
419{
420 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
421 let mut active = 0;
422 for i in 0..PRODUCERS {
423 if self.valid[i].load(Ordering::Relaxed) {
424 active += 1;
425 }
426 }
427
428 f.debug_struct("FanInBuffer")
429 .field("capacity", &(N * PRODUCERS))
430 .field("producers", &PRODUCERS)
431 .field("active_producers", &active)
432 .field("len", &self.len())
433 .field("read_seq", &self.read_seq.load())
434 .field("stats", &self.stats.snapshot())
435 .field("alignment", &CACHE_LINE_SIZE)
436 .finish()
437 }
438}
439
440impl<T: Transcendental + Copy, const N: usize, const CONSUMERS: usize> Clone
445 for FanOutBuffer<T, N, CONSUMERS>
446{
447 fn clone(&self) -> Self {
448 let new = Self::new();
449
450 if self.valid.load(Ordering::Acquire) {
451 let mut data = [T::ZERO; N];
452 for i in 0..N {
453 data[i] = self.storage[i].load();
454 }
455 new.write(&data);
456 }
457
458 new
459 }
460}
461
462impl<T: Transcendental + Copy, const N: usize, const PRODUCERS: usize> Clone
463 for FanInBuffer<T, N, PRODUCERS>
464{
465 fn clone(&self) -> Self {
466 let new = Self::new();
467
468 for producer in 0..PRODUCERS {
469 if self.valid[producer].load(Ordering::Acquire) {
470 let mut data = [T::ZERO; N];
471 for i in 0..N {
472 data[i] = self.storage[producer][i].load();
473 }
474 new.write(producer, &data);
475 }
476 }
477
478 new
479 }
480}
481
#[cfg(test)]
mod tests {
    use super::*;

    /// A single written frame must be readable by each of the three consumers.
    #[test]
    fn test_fan_out_buffer_basic() {
        let fan_out = FanOutBuffer::<f32, 64, 3>::new();

        let frame = [42.0; 64];
        fan_out.write(&frame);

        (0..3).for_each(|consumer_id| {
            let received = fan_out.try_read(consumer_id).unwrap();
            assert_eq!(received[0], 42.0);
        });
    }

    /// Frames from two producers must be summed sample-wise by one read.
    #[test]
    fn test_fan_in_buffer_basic() {
        let fan_in = FanInBuffer::<f32, 64, 2>::new();

        let first = [1.0; 64];
        let second = [2.0; 64];
        fan_in.write(0, &first);
        fan_in.write(1, &second);

        let sum = fan_in.try_read().unwrap();
        assert_eq!(sum[0], 3.0);
    }
}