Skip to main content

rill_core/buffer/
fan.rs

1use super::array_from_fn;
2use crate::buffer::{AtomicStats, SignalBuffer, BufferStats, CACHE_LINE_SIZE};
3use crate::math::Transcendental;
4use core::marker::PhantomData;
5use std::fmt;
6
7// ============================================================================
8// FanOutBuffer
9// ============================================================================
10
11/// Buffer for broadcasting from one producer to multiple consumers.
12/// Single-threaded — use [`rill_core::queues`](crate::queues) for cross-thread.
13#[repr(align(64))]
14pub struct FanOutBuffer<T: Transcendental, const N: usize, const CONSUMERS: usize> {
15    storage: [T; N],
16    version: usize,
17    read_versions: [usize; CONSUMERS],
18    valid: bool,
19    stats: AtomicStats,
20    _phantom: PhantomData<T>,
21}
22
23impl<T: Transcendental, const N: usize, const CONSUMERS: usize> FanOutBuffer<T, N, CONSUMERS> {
24    /// Create a new fan-out buffer.
25    ///
26    /// # Panics
27    /// Panics if `CONSUMERS` is 0.
28    pub fn new() -> Self {
29        assert!(CONSUMERS > 0, "FanOutBuffer must have at least one consumer");
30        Self {
31            storage: array_from_fn(|_| T::ZERO),
32            version: 0,
33            read_versions: [0; CONSUMERS],
34            valid: false,
35            stats: AtomicStats::new(),
36            _phantom: PhantomData,
37        }
38    }
39
40    /// Broadcast data to all consumers.
41    #[inline(always)]
42    pub fn write(&mut self, data: &[T; N]) {
43        self.storage.copy_from_slice(data);
44        self.version += 1;
45        self.valid = true;
46        self.stats.record_write();
47        self.stats.update_peak(1);
48    }
49
50    /// Read data for a specific consumer, returning `None` if already read or no data available.
51    #[inline(always)]
52    pub fn try_read(&mut self, consumer_id: usize) -> Option<[T; N]> {
53        if consumer_id >= CONSUMERS {
54            return None;
55        }
56        let current_version = self.version;
57        if self.read_versions[consumer_id] == current_version || !self.valid {
58            self.stats.record_underflow();
59            return None;
60        }
61        let mut result = [T::ZERO; N];
62        result.copy_from_slice(&self.storage);
63        self.read_versions[consumer_id] = current_version;
64        self.stats.record_read();
65        Some(result)
66    }
67
68    /// Whether unread data exists for the given consumer.
69    pub fn has_new_data(&self, consumer_id: usize) -> bool {
70        consumer_id < CONSUMERS && self.version != self.read_versions[consumer_id] && self.valid
71    }
72
73    /// Number of consumers (const generic parameter).
74    pub const fn consumer_count(&self) -> usize { CONSUMERS }
75    /// Current write version.
76    pub fn current_version(&self) -> usize { self.version }
77    /// Version last read by a consumer, or `None` if consumer ID is invalid.
78    pub fn last_read_version(&self, consumer_id: usize) -> Option<usize> {
79        if consumer_id >= CONSUMERS { None } else { Some(self.read_versions[consumer_id]) }
80    }
81
82    /// Reset to initial state (invalid, all consumers at version 0).
83    pub fn reset(&mut self) {
84        self.valid = false;
85        self.read_versions.fill(0);
86        self.stats.reset();
87    }
88}
89
90impl<T: Transcendental, const N: usize, const CONSUMERS: usize> SignalBuffer<T>
91    for FanOutBuffer<T, N, CONSUMERS>
92{
93    fn capacity(&self) -> usize { N }
94    fn len(&self) -> usize { if self.valid { 1 } else { 0 } }
95    fn is_empty(&self) -> bool { !self.valid }
96    fn is_full(&self) -> bool { self.valid }
97    fn clear(&mut self) { self.reset(); }
98    fn stats(&self) -> BufferStats {
99        let mut stats = self.stats.snapshot();
100        stats.fill_level = if self.valid { 1.0 } else { 0.0 };
101        stats
102    }
103    fn reset_stats(&mut self) { self.stats.reset(); }
104}
105
106impl<T: Transcendental, const N: usize, const CONSUMERS: usize> Default
107    for FanOutBuffer<T, N, CONSUMERS>
108{
109    fn default() -> Self { Self::new() }
110}
111
112impl<T: Transcendental + fmt::Debug, const N: usize, const CONSUMERS: usize> fmt::Debug
113    for FanOutBuffer<T, N, CONSUMERS>
114{
115    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116        f.debug_struct("FanOutBuffer")
117            .field("capacity", &N)
118            .field("consumers", &CONSUMERS)
119            .field("has_data", &self.valid)
120            .field("version", &self.version)
121            .field("stats", &self.stats.snapshot())
122            .field("alignment", &CACHE_LINE_SIZE)
123            .finish()
124    }
125}
126
127// ============================================================================
128// FanInBuffer
129// ============================================================================
130
131/// Buffer for mixing multiple producers to one consumer.
132/// Single-threaded — use [`rill_core::queues`](crate::queues) for cross-thread.
133#[repr(align(64))]
134pub struct FanInBuffer<T: Transcendental, const N: usize, const PRODUCERS: usize> {
135    storage: [[T; N]; PRODUCERS],
136    valid: [bool; PRODUCERS],
137    write_seq: [usize; PRODUCERS],
138    read_seq: usize,
139    stats: AtomicStats,
140    _phantom: PhantomData<T>,
141}
142
143impl<T: Transcendental, const N: usize, const PRODUCERS: usize> FanInBuffer<T, N, PRODUCERS> {
144    /// Create a new fan-in buffer.
145    ///
146    /// # Panics
147    /// Panics if `PRODUCERS` is 0.
148    pub fn new() -> Self {
149        assert!(PRODUCERS > 0, "FanInBuffer must have at least one producer");
150        Self {
151            storage: array_from_fn(|_| [T::ZERO; N]),
152            valid: [false; PRODUCERS],
153            write_seq: [0; PRODUCERS],
154            read_seq: 0,
155            stats: AtomicStats::new(),
156            _phantom: PhantomData,
157        }
158    }
159
160    /// Write a block of data from one producer.
161    #[inline(always)]
162    pub fn write(&mut self, producer_id: usize, data: &[T; N]) {
163        if producer_id >= PRODUCERS { return; }
164        self.storage[producer_id].copy_from_slice(data);
165        self.valid[producer_id] = true;
166        self.write_seq[producer_id] += 1;
167        self.stats.record_write();
168    }
169
170    /// Read and sum all producers' data that have new writes since last read.
171    #[inline(always)]
172    pub fn try_read(&mut self) -> Option<[T; N]> {
173        let mut result = [T::ZERO; N];
174        let mut any_valid = false;
175        let mut active_producers = 0;
176        let current_seq = self.read_seq;
177        for producer in 0..PRODUCERS {
178            if self.valid[producer] && self.write_seq[producer] > current_seq {
179                any_valid = true;
180                active_producers += 1;
181                for i in 0..N {
182                    result[i] += self.storage[producer][i];
183                }
184            }
185        }
186        if any_valid {
187            self.read_seq += 1;
188            self.stats.record_read();
189            self.stats.update_peak(active_producers);
190            Some(result)
191        } else {
192            self.stats.record_underflow();
193            None
194        }
195    }
196
197    /// Number of producers (const generic parameter).
198    pub const fn producer_count(&self) -> usize { PRODUCERS }
199
200    /// Whether a specific producer has unread data.
201    pub fn producer_has_data(&self, producer_id: usize) -> bool {
202        if producer_id >= PRODUCERS { return false; }
203        self.write_seq[producer_id] > self.read_seq && self.valid[producer_id]
204    }
205
206    /// Current read sequence counter.
207    pub fn read_seq(&self) -> usize { self.read_seq }
208    /// Write sequence counter for a specific producer, or `None` if ID is invalid.
209    pub fn write_seq(&self, producer_id: usize) -> Option<usize> {
210        if producer_id >= PRODUCERS { None } else { Some(self.write_seq[producer_id]) }
211    }
212
213    /// Reset all producers and the read counter.
214    pub fn reset(&mut self) {
215        self.valid.fill(false);
216        self.write_seq.fill(0);
217        self.read_seq = 0;
218        self.stats.reset();
219    }
220
221    /// Clear a specific producer's data without affecting others.
222    pub fn clear_producer(&mut self, producer_id: usize) {
223        if producer_id < PRODUCERS {
224            self.valid[producer_id] = false;
225            self.write_seq[producer_id] = 0;
226        }
227    }
228}
229
230impl<T: Transcendental, const N: usize, const PRODUCERS: usize> SignalBuffer<T>
231    for FanInBuffer<T, N, PRODUCERS>
232{
233    fn capacity(&self) -> usize { N * PRODUCERS }
234    fn len(&self) -> usize {
235        let mut count = 0;
236        for producer in 0..PRODUCERS {
237            if self.write_seq[producer] > self.read_seq && self.valid[producer] {
238                count += 1;
239            }
240        }
241        count
242    }
243    fn is_empty(&self) -> bool { self.len() == 0 }
244    fn is_full(&self) -> bool { self.len() == PRODUCERS }
245    fn clear(&mut self) { self.reset(); }
246    fn stats(&self) -> BufferStats {
247        let mut stats = self.stats.snapshot();
248        stats.fill_level = self.len() as f32 / PRODUCERS as f32;
249        stats
250    }
251    fn reset_stats(&mut self) { self.stats.reset(); }
252}
253
254impl<T: Transcendental, const N: usize, const PRODUCERS: usize> Default for FanInBuffer<T, N, PRODUCERS> {
255    fn default() -> Self { Self::new() }
256}
257
258impl<T: Transcendental + fmt::Debug, const N: usize, const PRODUCERS: usize> fmt::Debug
259    for FanInBuffer<T, N, PRODUCERS>
260{
261    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
262        let active = self.valid.iter().filter(|v| **v).count();
263        f.debug_struct("FanInBuffer")
264            .field("capacity", &(N * PRODUCERS))
265            .field("producers", &PRODUCERS)
266            .field("active_producers", &active)
267            .field("len", &self.len())
268            .field("read_seq", &self.read_seq)
269            .field("stats", &self.stats.snapshot())
270            .field("alignment", &CACHE_LINE_SIZE)
271            .finish()
272    }
273}
274
275#[cfg(test)]
276mod tests {
277    use super::*;
278
279    #[test]
280    fn test_fan_out_buffer_basic() {
281        let mut buffer = FanOutBuffer::<f32, 64, 3>::new();
282        let data = [42.0; 64];
283        buffer.write(&data);
284        for i in 0..3 {
285            let read = buffer.try_read(i).unwrap();
286            assert_eq!(read[0], 42.0);
287        }
288    }
289
290    #[test]
291    fn test_fan_in_buffer_basic() {
292        let mut buffer = FanInBuffer::<f32, 64, 2>::new();
293        buffer.write(0, &[1.0; 64]);
294        buffer.write(1, &[2.0; 64]);
295        let mixed = buffer.try_read().unwrap();
296        assert_eq!(mixed[0], 3.0);
297    }
298}