Skip to main content

rill_core/buffer/
fan.rs

1//! # Fan-out and fan-in buffers for complex routing
2
3use super::array_from_fn;
4use crate::buffer::{AtomicCell, AtomicStats, SignalBuffer, BufferStats, CACHE_LINE_SIZE};
5use crate::math::Transcendental;
6use core::marker::PhantomData;
7use core::sync::atomic::{AtomicBool, Ordering};
8use std::fmt;
9
10// ============================================================================
11// FanOutBuffer
12// ============================================================================
13
14/// Buffer for broadcasting from one producer to multiple consumers
15#[repr(align(64))]
16pub struct FanOutBuffer<T: Transcendental, const N: usize, const CONSUMERS: usize> {
17    /// Shared data storage using AtomicCell for each sample
18    storage: [AtomicCell<T>; N],
19
20    /// Current version (incremented on each write)
21    version: AtomicCell<usize>,
22
23    /// Last read version for each consumer
24    read_versions: [AtomicCell<usize>; CONSUMERS],
25
26    /// Valid flag
27    valid: AtomicBool,
28
29    /// Atomic statistics
30    stats: AtomicStats,
31
32    /// Phantom data
33    _phantom: PhantomData<T>,
34}
35
36impl<T: Transcendental, const N: usize, const CONSUMERS: usize> FanOutBuffer<T, N, CONSUMERS> {
37    /// Create new fan-out buffer
38    pub fn new() -> Self {
39        assert!(
40            CONSUMERS > 0,
41            "FanOutBuffer must have at least one consumer"
42        );
43
44        // Create storage with default values
45        let storage = array_from_fn(|_| AtomicCell::new(T::ZERO));
46
47        Self {
48            storage,
49            version: AtomicCell::new(0),
50            read_versions: array_from_fn(|_| AtomicCell::new(0)),
51            valid: AtomicBool::new(false),
52            stats: AtomicStats::new(),
53            _phantom: PhantomData,
54        }
55    }
56
57    /// Write data to all consumers
58    #[inline(always)]
59    pub fn write(&self, data: &[T; N]) {
60        for i in 0..N {
61            self.storage[i].store(data[i]);
62        }
63
64        self.version.store(self.version.load() + 1);
65        self.valid.store(true, Ordering::Release);
66
67        self.stats.record_write();
68        self.stats.update_peak(1);
69    }
70
71    /// Try to read for a specific consumer
72    #[inline(always)]
73    pub fn try_read(&self, consumer_id: usize) -> Option<[T; N]> {
74        if consumer_id >= CONSUMERS {
75            return None;
76        }
77
78        let current_version = self.version.load();
79        let last_read = self.read_versions[consumer_id].load();
80
81        if last_read == current_version || !self.valid.load(Ordering::Acquire) {
82            self.stats.record_underflow();
83            return None;
84        }
85
86        let mut result = [T::ZERO; N];
87        for i in 0..N {
88            result[i] = self.storage[i].load();
89        }
90
91        self.read_versions[consumer_id].store(current_version);
92
93        self.stats.record_read();
94
95        Some(result)
96    }
97
98    /// Check if a specific consumer has new data
99    #[inline(always)]
100    pub fn has_new_data(&self, consumer_id: usize) -> bool {
101        if consumer_id >= CONSUMERS {
102            return false;
103        }
104
105        let current_version = self.version.load();
106        let last_read = self.read_versions[consumer_id].load();
107
108        current_version != last_read && self.valid.load(Ordering::Acquire)
109    }
110
111    /// Get the number of consumers
112    pub const fn consumer_count(&self) -> usize {
113        CONSUMERS
114    }
115
116    /// Get current version
117    pub fn current_version(&self) -> usize {
118        self.version.load()
119    }
120
121    /// Get last read version for consumer
122    pub fn last_read_version(&self, consumer_id: usize) -> Option<usize> {
123        if consumer_id >= CONSUMERS {
124            None
125        } else {
126            Some(self.read_versions[consumer_id].load())
127        }
128    }
129
130    /// Reset the buffer
131    pub fn reset(&self) {
132        self.valid.store(false, Ordering::Release);
133        for i in 0..CONSUMERS {
134            self.read_versions[i].store(0);
135        }
136        self.stats.reset();
137    }
138}
139
140impl<T: Transcendental, const N: usize, const CONSUMERS: usize> SignalBuffer<T>
141    for FanOutBuffer<T, N, CONSUMERS>
142{
143    fn capacity(&self) -> usize {
144        N
145    }
146
147    fn len(&self) -> usize {
148        if self.valid.load(Ordering::Relaxed) {
149            1
150        } else {
151            0
152        }
153    }
154
155    fn is_empty(&self) -> bool {
156        !self.valid.load(Ordering::Relaxed)
157    }
158
159    fn is_full(&self) -> bool {
160        self.valid.load(Ordering::Relaxed)
161    }
162
163    fn clear(&mut self) {
164        self.valid.store(false, Ordering::Release);
165        for i in 0..CONSUMERS {
166            self.read_versions[i].store(0);
167        }
168        self.stats.reset();
169    }
170
171    fn stats(&self) -> BufferStats {
172        let mut stats = self.stats.snapshot();
173        stats.fill_level = if self.valid.load(Ordering::Relaxed) {
174            1.0
175        } else {
176            0.0
177        };
178        stats
179    }
180
181    fn reset_stats(&mut self) {
182        self.stats.reset();
183    }
184}
185
186// ============================================================================
187// FanInBuffer
188// ============================================================================
189
190/// Buffer for mixing multiple producers to one consumer
191#[repr(align(64))]
192pub struct FanInBuffer<T: Transcendental, const N: usize, const PRODUCERS: usize> {
193    /// Storage for each producer, each using AtomicCell for samples
194    storage: [[AtomicCell<T>; N]; PRODUCERS],
195
196    /// Valid flags for each producer
197    valid: [AtomicBool; PRODUCERS],
198
199    /// Write sequence for each producer
200    write_seq: [AtomicCell<usize>; PRODUCERS],
201
202    /// Last read sequence
203    read_seq: AtomicCell<usize>,
204
205    /// Atomic statistics
206    stats: AtomicStats,
207
208    /// Phantom data
209    _phantom: PhantomData<T>,
210}
211
212impl<T: Transcendental, const N: usize, const PRODUCERS: usize> FanInBuffer<T, N, PRODUCERS> {
213    /// Create new fan-in buffer
214    pub fn new() -> Self {
215        assert!(PRODUCERS > 0, "FanInBuffer must have at least one producer");
216
217        // Create storage with default values
218        let storage = array_from_fn(|_| array_from_fn(|_| AtomicCell::new(T::ZERO)));
219
220        Self {
221            storage,
222            valid: array_from_fn(|_| AtomicBool::new(false)),
223            write_seq: array_from_fn(|_| AtomicCell::new(0)),
224            read_seq: AtomicCell::new(0),
225            stats: AtomicStats::new(),
226            _phantom: PhantomData,
227        }
228    }
229
230    /// Write data from a specific producer
231    #[inline(always)]
232    pub fn write(&self, producer_id: usize, data: &[T; N]) {
233        if producer_id >= PRODUCERS {
234            return;
235        }
236
237        for i in 0..N {
238            self.storage[producer_id][i].store(data[i]);
239        }
240
241        self.valid[producer_id].store(true, Ordering::Release);
242        self.write_seq[producer_id].store(self.write_seq[producer_id].load() + 1);
243
244        self.stats.record_write();
245    }
246
247    /// Try to read mixed data from all producers
248    #[inline(always)]
249    pub fn try_read(&self) -> Option<[T; N]> {
250        let mut result = [T::ZERO; N];
251        let mut any_valid = false;
252        let mut active_producers = 0;
253        let current_seq = self.read_seq.load();
254
255        for producer in 0..PRODUCERS {
256            if self.valid[producer].load(Ordering::Acquire) {
257                let write_seq = self.write_seq[producer].load();
258
259                if write_seq > current_seq {
260                    any_valid = true;
261                    active_producers += 1;
262                    for i in 0..N {
263                        result[i] = result[i] + self.storage[producer][i].load();
264                    }
265                }
266            }
267        }
268
269        if any_valid {
270            self.read_seq.store(self.read_seq.load() + 1);
271
272            self.stats.record_read();
273            self.stats.update_peak(active_producers);
274            Some(result)
275        } else {
276            self.stats.record_underflow();
277            None
278        }
279    }
280
281    /// Get number of producers
282    pub const fn producer_count(&self) -> usize {
283        PRODUCERS
284    }
285
286    /// Check if producer has new data
287    pub fn producer_has_data(&self, producer_id: usize) -> bool {
288        if producer_id >= PRODUCERS {
289            return false;
290        }
291
292        let write_seq = self.write_seq[producer_id].load();
293        let read_seq = self.read_seq.load();
294
295        write_seq > read_seq && self.valid[producer_id].load(Ordering::Acquire)
296    }
297
298    /// Get read sequence
299    pub fn read_seq(&self) -> usize {
300        self.read_seq.load()
301    }
302
303    /// Get write sequence for producer
304    pub fn write_seq(&self, producer_id: usize) -> Option<usize> {
305        if producer_id >= PRODUCERS {
306            None
307        } else {
308            Some(self.write_seq[producer_id].load())
309        }
310    }
311
312    /// Reset buffer
313    pub fn reset(&self) {
314        for producer in 0..PRODUCERS {
315            self.valid[producer].store(false, Ordering::Release);
316            self.write_seq[producer].store(0);
317        }
318        self.read_seq.store(0);
319        self.stats.reset();
320    }
321
322    /// Clear specific producer
323    pub fn clear_producer(&self, producer_id: usize) {
324        if producer_id < PRODUCERS {
325            self.valid[producer_id].store(false, Ordering::Release);
326            self.write_seq[producer_id].store(0);
327        }
328    }
329}
330
331impl<T: Transcendental, const N: usize, const PRODUCERS: usize> SignalBuffer<T>
332    for FanInBuffer<T, N, PRODUCERS>
333{
334    fn capacity(&self) -> usize {
335        N * PRODUCERS
336    }
337
338    fn len(&self) -> usize {
339        let read_seq = self.read_seq.load();
340        let mut count = 0;
341
342        for producer in 0..PRODUCERS {
343            let write_seq = self.write_seq[producer].load();
344            if write_seq > read_seq && self.valid[producer].load(Ordering::Acquire) {
345                count += 1;
346            }
347        }
348
349        count
350    }
351
352    fn is_empty(&self) -> bool {
353        self.len() == 0
354    }
355
356    fn is_full(&self) -> bool {
357        self.len() == PRODUCERS
358    }
359
360    fn clear(&mut self) {
361        for producer in 0..PRODUCERS {
362            self.valid[producer].store(false, Ordering::Release);
363            self.write_seq[producer].store(0);
364        }
365        self.read_seq.store(0);
366        self.stats.reset();
367    }
368
369    fn stats(&self) -> BufferStats {
370        let mut stats = self.stats.snapshot();
371        stats.fill_level = self.len() as f32 / PRODUCERS as f32;
372        stats
373    }
374
375    fn reset_stats(&mut self) {
376        self.stats.reset();
377    }
378}
379
380// ============================================================================
381// Default implementations
382// ============================================================================
383
384impl<T: Transcendental, const N: usize, const CONSUMERS: usize> Default
385    for FanOutBuffer<T, N, CONSUMERS>
386{
387    fn default() -> Self {
388        Self::new()
389    }
390}
391
392impl<T: Transcendental, const N: usize, const PRODUCERS: usize> Default for FanInBuffer<T, N, PRODUCERS> {
393    fn default() -> Self {
394        Self::new()
395    }
396}
397
398// ============================================================================
399// Debug implementations
400// ============================================================================
401
402impl<T: Transcendental + fmt::Debug, const N: usize, const CONSUMERS: usize> fmt::Debug
403    for FanOutBuffer<T, N, CONSUMERS>
404{
405    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
406        f.debug_struct("FanOutBuffer")
407            .field("capacity", &N)
408            .field("consumers", &CONSUMERS)
409            .field("has_data", &self.valid.load(Ordering::Relaxed))
410            .field("version", &self.version.load())
411            .field("stats", &self.stats.snapshot())
412            .field("alignment", &CACHE_LINE_SIZE)
413            .finish()
414    }
415}
416
417impl<T: Transcendental + fmt::Debug, const N: usize, const PRODUCERS: usize> fmt::Debug
418    for FanInBuffer<T, N, PRODUCERS>
419{
420    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
421        let mut active = 0;
422        for i in 0..PRODUCERS {
423            if self.valid[i].load(Ordering::Relaxed) {
424                active += 1;
425            }
426        }
427
428        f.debug_struct("FanInBuffer")
429            .field("capacity", &(N * PRODUCERS))
430            .field("producers", &PRODUCERS)
431            .field("active_producers", &active)
432            .field("len", &self.len())
433            .field("read_seq", &self.read_seq.load())
434            .field("stats", &self.stats.snapshot())
435            .field("alignment", &CACHE_LINE_SIZE)
436            .finish()
437    }
438}
439
440// ============================================================================
441// Clone implementations (where possible)
442// ============================================================================
443
444impl<T: Transcendental + Copy, const N: usize, const CONSUMERS: usize> Clone
445    for FanOutBuffer<T, N, CONSUMERS>
446{
447    fn clone(&self) -> Self {
448        let new = Self::new();
449
450        if self.valid.load(Ordering::Acquire) {
451            let mut data = [T::ZERO; N];
452            for i in 0..N {
453                data[i] = self.storage[i].load();
454            }
455            new.write(&data);
456        }
457
458        new
459    }
460}
461
462impl<T: Transcendental + Copy, const N: usize, const PRODUCERS: usize> Clone
463    for FanInBuffer<T, N, PRODUCERS>
464{
465    fn clone(&self) -> Self {
466        let new = Self::new();
467
468        for producer in 0..PRODUCERS {
469            if self.valid[producer].load(Ordering::Acquire) {
470                let mut data = [T::ZERO; N];
471                for i in 0..N {
472                    data[i] = self.storage[producer][i].load();
473                }
474                new.write(producer, &data);
475            }
476        }
477
478        new
479    }
480}
481
482// ============================================================================
483// Tests
484// ============================================================================
485
#[cfg(test)]
mod tests {
    use super::*;

    /// Every consumer of a fan-out buffer receives the written frame.
    #[test]
    fn test_fan_out_buffer_basic() {
        let fan_out = FanOutBuffer::<f32, 64, 3>::new();
        fan_out.write(&[42.0; 64]);

        for consumer in 0..3 {
            let frame = fan_out.try_read(consumer).unwrap();
            assert_eq!(frame[0], 42.0);
        }
    }

    /// A fan-in read returns the sample-wise sum of both producers' frames.
    #[test]
    fn test_fan_in_buffer_basic() {
        let fan_in = FanInBuffer::<f32, 64, 2>::new();

        let first = [1.0; 64];
        let second = [2.0; 64];
        fan_in.write(0, &first);
        fan_in.write(1, &second);

        let sum = fan_in.try_read().unwrap();
        assert_eq!(sum[0], 3.0);
    }
}
513}