Skip to main content

rill_core/buffer/
fan.rs

1use super::array_from_fn;
2use crate::buffer::{AtomicStats, BufferStats, SignalBuffer, CACHE_LINE_SIZE};
3use crate::math::Transcendental;
4use core::marker::PhantomData;
5use std::fmt;
6
7// ============================================================================
8// FanOutBuffer
9// ============================================================================
10
11/// Buffer for broadcasting from one producer to multiple consumers.
12/// Single-threaded — use [`rill_core::queues`](crate::queues) for cross-thread.
13#[repr(align(64))]
14pub struct FanOutBuffer<T: Transcendental, const N: usize, const CONSUMERS: usize> {
15    storage: [T; N],
16    version: usize,
17    read_versions: [usize; CONSUMERS],
18    valid: bool,
19    stats: AtomicStats,
20    _phantom: PhantomData<T>,
21}
22
23impl<T: Transcendental, const N: usize, const CONSUMERS: usize> FanOutBuffer<T, N, CONSUMERS> {
24    /// Create a new fan-out buffer.
25    ///
26    /// # Panics
27    /// Panics if `CONSUMERS` is 0.
28    pub fn new() -> Self {
29        assert!(
30            CONSUMERS > 0,
31            "FanOutBuffer must have at least one consumer"
32        );
33        Self {
34            storage: array_from_fn(|_| T::ZERO),
35            version: 0,
36            read_versions: [0; CONSUMERS],
37            valid: false,
38            stats: AtomicStats::new(),
39            _phantom: PhantomData,
40        }
41    }
42
43    /// Broadcast data to all consumers.
44    #[inline(always)]
45    pub fn write(&mut self, data: &[T; N]) {
46        self.storage.copy_from_slice(data);
47        self.version += 1;
48        self.valid = true;
49        self.stats.record_write();
50        self.stats.update_peak(1);
51    }
52
53    /// Read data for a specific consumer, returning `None` if already read or no data available.
54    #[inline(always)]
55    pub fn try_read(&mut self, consumer_id: usize) -> Option<[T; N]> {
56        if consumer_id >= CONSUMERS {
57            return None;
58        }
59        let current_version = self.version;
60        if self.read_versions[consumer_id] == current_version || !self.valid {
61            self.stats.record_underflow();
62            return None;
63        }
64        let mut result = [T::ZERO; N];
65        result.copy_from_slice(&self.storage);
66        self.read_versions[consumer_id] = current_version;
67        self.stats.record_read();
68        Some(result)
69    }
70
71    /// Whether unread data exists for the given consumer.
72    pub fn has_new_data(&self, consumer_id: usize) -> bool {
73        consumer_id < CONSUMERS && self.version != self.read_versions[consumer_id] && self.valid
74    }
75
76    /// Number of consumers (const generic parameter).
77    pub const fn consumer_count(&self) -> usize {
78        CONSUMERS
79    }
80    /// Current write version.
81    pub fn current_version(&self) -> usize {
82        self.version
83    }
84    /// Version last read by a consumer, or `None` if consumer ID is invalid.
85    pub fn last_read_version(&self, consumer_id: usize) -> Option<usize> {
86        if consumer_id >= CONSUMERS {
87            None
88        } else {
89            Some(self.read_versions[consumer_id])
90        }
91    }
92
93    /// Reset to initial state (invalid, all consumers at version 0).
94    pub fn reset(&mut self) {
95        self.valid = false;
96        self.read_versions.fill(0);
97        self.stats.reset();
98    }
99}
100
101impl<T: Transcendental, const N: usize, const CONSUMERS: usize> SignalBuffer<T>
102    for FanOutBuffer<T, N, CONSUMERS>
103{
104    fn capacity(&self) -> usize {
105        N
106    }
107    fn len(&self) -> usize {
108        if self.valid {
109            1
110        } else {
111            0
112        }
113    }
114    fn is_empty(&self) -> bool {
115        !self.valid
116    }
117    fn is_full(&self) -> bool {
118        self.valid
119    }
120    fn clear(&mut self) {
121        self.reset();
122    }
123    fn stats(&self) -> BufferStats {
124        let mut stats = self.stats.snapshot();
125        stats.fill_level = if self.valid { 1.0 } else { 0.0 };
126        stats
127    }
128    fn reset_stats(&mut self) {
129        self.stats.reset();
130    }
131}
132
133impl<T: Transcendental, const N: usize, const CONSUMERS: usize> Default
134    for FanOutBuffer<T, N, CONSUMERS>
135{
136    fn default() -> Self {
137        Self::new()
138    }
139}
140
141impl<T: Transcendental + fmt::Debug, const N: usize, const CONSUMERS: usize> fmt::Debug
142    for FanOutBuffer<T, N, CONSUMERS>
143{
144    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
145        f.debug_struct("FanOutBuffer")
146            .field("capacity", &N)
147            .field("consumers", &CONSUMERS)
148            .field("has_data", &self.valid)
149            .field("version", &self.version)
150            .field("stats", &self.stats.snapshot())
151            .field("alignment", &CACHE_LINE_SIZE)
152            .finish()
153    }
154}
155
156// ============================================================================
157// FanInBuffer
158// ============================================================================
159
160/// Buffer for mixing multiple producers to one consumer.
161/// Single-threaded — use [`rill_core::queues`](crate::queues) for cross-thread.
162#[repr(align(64))]
163pub struct FanInBuffer<T: Transcendental, const N: usize, const PRODUCERS: usize> {
164    storage: [[T; N]; PRODUCERS],
165    valid: [bool; PRODUCERS],
166    write_seq: [usize; PRODUCERS],
167    read_seq: usize,
168    stats: AtomicStats,
169    _phantom: PhantomData<T>,
170}
171
172impl<T: Transcendental, const N: usize, const PRODUCERS: usize> FanInBuffer<T, N, PRODUCERS> {
173    /// Create a new fan-in buffer.
174    ///
175    /// # Panics
176    /// Panics if `PRODUCERS` is 0.
177    pub fn new() -> Self {
178        assert!(PRODUCERS > 0, "FanInBuffer must have at least one producer");
179        Self {
180            storage: array_from_fn(|_| [T::ZERO; N]),
181            valid: [false; PRODUCERS],
182            write_seq: [0; PRODUCERS],
183            read_seq: 0,
184            stats: AtomicStats::new(),
185            _phantom: PhantomData,
186        }
187    }
188
189    /// Write a block of data from one producer.
190    #[inline(always)]
191    pub fn write(&mut self, producer_id: usize, data: &[T; N]) {
192        if producer_id >= PRODUCERS {
193            return;
194        }
195        self.storage[producer_id].copy_from_slice(data);
196        self.valid[producer_id] = true;
197        self.write_seq[producer_id] += 1;
198        self.stats.record_write();
199    }
200
201    /// Read and sum all producers' data that have new writes since last read.
202    #[inline(always)]
203    pub fn try_read(&mut self) -> Option<[T; N]> {
204        let mut result = [T::ZERO; N];
205        let mut any_valid = false;
206        let mut active_producers = 0;
207        let current_seq = self.read_seq;
208        for producer in 0..PRODUCERS {
209            if self.valid[producer] && self.write_seq[producer] > current_seq {
210                any_valid = true;
211                active_producers += 1;
212                for (res, &val) in result.iter_mut().zip(self.storage[producer].iter()) {
213                    *res += val;
214                }
215            }
216        }
217        if any_valid {
218            self.read_seq += 1;
219            self.stats.record_read();
220            self.stats.update_peak(active_producers);
221            Some(result)
222        } else {
223            self.stats.record_underflow();
224            None
225        }
226    }
227
228    /// Number of producers (const generic parameter).
229    pub const fn producer_count(&self) -> usize {
230        PRODUCERS
231    }
232
233    /// Whether a specific producer has unread data.
234    pub fn producer_has_data(&self, producer_id: usize) -> bool {
235        if producer_id >= PRODUCERS {
236            return false;
237        }
238        self.write_seq[producer_id] > self.read_seq && self.valid[producer_id]
239    }
240
241    /// Current read sequence counter.
242    pub fn read_seq(&self) -> usize {
243        self.read_seq
244    }
245    /// Write sequence counter for a specific producer, or `None` if ID is invalid.
246    pub fn write_seq(&self, producer_id: usize) -> Option<usize> {
247        if producer_id >= PRODUCERS {
248            None
249        } else {
250            Some(self.write_seq[producer_id])
251        }
252    }
253
254    /// Reset all producers and the read counter.
255    pub fn reset(&mut self) {
256        self.valid.fill(false);
257        self.write_seq.fill(0);
258        self.read_seq = 0;
259        self.stats.reset();
260    }
261
262    /// Clear a specific producer's data without affecting others.
263    pub fn clear_producer(&mut self, producer_id: usize) {
264        if producer_id < PRODUCERS {
265            self.valid[producer_id] = false;
266            self.write_seq[producer_id] = 0;
267        }
268    }
269}
270
271impl<T: Transcendental, const N: usize, const PRODUCERS: usize> SignalBuffer<T>
272    for FanInBuffer<T, N, PRODUCERS>
273{
274    fn capacity(&self) -> usize {
275        N * PRODUCERS
276    }
277    fn len(&self) -> usize {
278        let mut count = 0;
279        for producer in 0..PRODUCERS {
280            if self.write_seq[producer] > self.read_seq && self.valid[producer] {
281                count += 1;
282            }
283        }
284        count
285    }
286    fn is_empty(&self) -> bool {
287        self.len() == 0
288    }
289    fn is_full(&self) -> bool {
290        self.len() == PRODUCERS
291    }
292    fn clear(&mut self) {
293        self.reset();
294    }
295    fn stats(&self) -> BufferStats {
296        let mut stats = self.stats.snapshot();
297        stats.fill_level = self.len() as f32 / PRODUCERS as f32;
298        stats
299    }
300    fn reset_stats(&mut self) {
301        self.stats.reset();
302    }
303}
304
305impl<T: Transcendental, const N: usize, const PRODUCERS: usize> Default
306    for FanInBuffer<T, N, PRODUCERS>
307{
308    fn default() -> Self {
309        Self::new()
310    }
311}
312
313impl<T: Transcendental + fmt::Debug, const N: usize, const PRODUCERS: usize> fmt::Debug
314    for FanInBuffer<T, N, PRODUCERS>
315{
316    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
317        let active = self.valid.iter().filter(|v| **v).count();
318        f.debug_struct("FanInBuffer")
319            .field("capacity", &(N * PRODUCERS))
320            .field("producers", &PRODUCERS)
321            .field("active_producers", &active)
322            .field("len", &self.len())
323            .field("read_seq", &self.read_seq)
324            .field("stats", &self.stats.snapshot())
325            .field("alignment", &CACHE_LINE_SIZE)
326            .finish()
327    }
328}
329
330#[cfg(test)]
331mod tests {
332    use super::*;
333
334    #[test]
335    fn test_fan_out_buffer_basic() {
336        let mut buffer = FanOutBuffer::<f32, 64, 3>::new();
337        let data = [42.0; 64];
338        buffer.write(&data);
339        for i in 0..3 {
340            let read = buffer.try_read(i).unwrap();
341            assert_eq!(read[0], 42.0);
342        }
343    }
344
345    #[test]
346    fn test_fan_in_buffer_basic() {
347        let mut buffer = FanInBuffer::<f32, 64, 2>::new();
348        buffer.write(0, &[1.0; 64]);
349        buffer.write(1, &[2.0; 64]);
350        let mixed = buffer.try_read().unwrap();
351        assert_eq!(mixed[0], 3.0);
352    }
353}