Skip to main content

rill_core/buffer/
fan.rs

1use super::array_from_fn;
2use crate::buffer::{AtomicStats, Buffer, BufferStats, CACHE_LINE_SIZE};
3use crate::math::Transcendental;
4use core::marker::PhantomData;
5use std::fmt;
6
7// ============================================================================
8// FanOutBuffer
9// ============================================================================
10
11/// Buffer for broadcasting from one producer to multiple consumers.
12/// Single-threaded — use [`rill_core::queues`](crate::queues) for cross-thread.
13#[repr(align(64))]
14pub struct FanOutBuffer<T: Transcendental, const N: usize, const CONSUMERS: usize> {
15    storage: [T; N],
16    version: usize,
17    read_versions: [usize; CONSUMERS],
18    valid: bool,
19    stats: AtomicStats,
20    _phantom: PhantomData<T>,
21}
22
23impl<T: Transcendental, const N: usize, const CONSUMERS: usize> FanOutBuffer<T, N, CONSUMERS> {
24    /// Create a new fan-out buffer.
25    ///
26    /// # Panics
27    /// Panics if `CONSUMERS` is 0.
28    pub fn new() -> Self {
29        assert!(
30            CONSUMERS > 0,
31            "FanOutBuffer must have at least one consumer"
32        );
33        Self {
34            storage: array_from_fn(|_| T::ZERO),
35            version: 0,
36            read_versions: [0; CONSUMERS],
37            valid: false,
38            stats: AtomicStats::new(),
39            _phantom: PhantomData,
40        }
41    }
42
43    /// Broadcast data to all consumers.
44    #[inline(always)]
45    pub fn write(&mut self, data: &[T; N]) {
46        self.storage.copy_from_slice(data);
47        self.version += 1;
48        self.valid = true;
49        self.stats.record_write();
50        self.stats.update_peak(1);
51    }
52
53    /// Read data for a specific consumer, returning `None` if already read or no data available.
54    #[inline(always)]
55    pub fn try_read(&mut self, consumer_id: usize) -> Option<[T; N]> {
56        if consumer_id >= CONSUMERS {
57            return None;
58        }
59        let current_version = self.version;
60        if self.read_versions[consumer_id] == current_version || !self.valid {
61            self.stats.record_underflow();
62            return None;
63        }
64        let mut result = [T::ZERO; N];
65        result.copy_from_slice(&self.storage);
66        self.read_versions[consumer_id] = current_version;
67        self.stats.record_read();
68        Some(result)
69    }
70
71    /// Whether unread data exists for the given consumer.
72    pub fn has_new_data(&self, consumer_id: usize) -> bool {
73        consumer_id < CONSUMERS && self.version != self.read_versions[consumer_id] && self.valid
74    }
75
76    /// Number of consumers (const generic parameter).
77    pub const fn consumer_count(&self) -> usize {
78        CONSUMERS
79    }
80    /// Current write version.
81    pub fn current_version(&self) -> usize {
82        self.version
83    }
84    /// Version last read by a consumer, or `None` if consumer ID is invalid.
85    pub fn last_read_version(&self, consumer_id: usize) -> Option<usize> {
86        if consumer_id >= CONSUMERS {
87            None
88        } else {
89            Some(self.read_versions[consumer_id])
90        }
91    }
92
93    /// Reset to initial state (invalid, all consumers at version 0).
94    pub fn reset(&mut self) {
95        self.valid = false;
96        self.read_versions.fill(0);
97        self.stats.reset();
98    }
99}
100
101impl<T: Transcendental, const N: usize, const CONSUMERS: usize> Buffer<T>
102    for FanOutBuffer<T, N, CONSUMERS>
103{
104    fn capacity(&self) -> usize {
105        N
106    }
107    fn len(&self) -> usize {
108        if self.valid {
109            1
110        } else {
111            0
112        }
113    }
114    fn is_empty(&self) -> bool {
115        !self.valid
116    }
117    fn is_full(&self) -> bool {
118        self.valid
119    }
120    fn as_slice(&self) -> &[T] {
121        &self.storage
122    }
123    fn as_mut_slice(&mut self) -> &mut [T] {
124        &mut self.storage
125    }
126    fn fill(&mut self, value: T) {
127        self.storage.fill(value);
128    }
129    fn copy_from(&mut self, src: &[T]) {
130        let len = src.len().min(N);
131        self.storage[..len].copy_from_slice(&src[..len]);
132    }
133    fn clear(&mut self) {
134        self.reset();
135    }
136    fn stats(&self) -> BufferStats {
137        let mut stats = self.stats.snapshot();
138        stats.fill_level = if self.valid { 1.0 } else { 0.0 };
139        stats
140    }
141    fn reset_stats(&mut self) {
142        self.stats.reset();
143    }
144}
145
146impl<T: Transcendental, const N: usize, const CONSUMERS: usize> Default
147    for FanOutBuffer<T, N, CONSUMERS>
148{
149    fn default() -> Self {
150        Self::new()
151    }
152}
153
154impl<T: Transcendental + fmt::Debug, const N: usize, const CONSUMERS: usize> fmt::Debug
155    for FanOutBuffer<T, N, CONSUMERS>
156{
157    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
158        f.debug_struct("FanOutBuffer")
159            .field("capacity", &N)
160            .field("consumers", &CONSUMERS)
161            .field("has_data", &self.valid)
162            .field("version", &self.version)
163            .field("stats", &self.stats.snapshot())
164            .field("alignment", &CACHE_LINE_SIZE)
165            .finish()
166    }
167}
168
169// ============================================================================
170// FanInBuffer
171// ============================================================================
172
173/// Buffer for mixing multiple producers to one consumer.
174/// Single-threaded — use [`rill_core::queues`](crate::queues) for cross-thread.
175#[repr(align(64))]
176pub struct FanInBuffer<T: Transcendental, const N: usize, const PRODUCERS: usize> {
177    storage: [[T; N]; PRODUCERS],
178    valid: [bool; PRODUCERS],
179    write_seq: [usize; PRODUCERS],
180    read_seq: usize,
181    stats: AtomicStats,
182    _phantom: PhantomData<T>,
183}
184
185impl<T: Transcendental, const N: usize, const PRODUCERS: usize> FanInBuffer<T, N, PRODUCERS> {
186    /// Create a new fan-in buffer.
187    ///
188    /// # Panics
189    /// Panics if `PRODUCERS` is 0.
190    pub fn new() -> Self {
191        assert!(PRODUCERS > 0, "FanInBuffer must have at least one producer");
192        Self {
193            storage: array_from_fn(|_| [T::ZERO; N]),
194            valid: [false; PRODUCERS],
195            write_seq: [0; PRODUCERS],
196            read_seq: 0,
197            stats: AtomicStats::new(),
198            _phantom: PhantomData,
199        }
200    }
201
202    /// Write a block of data from one producer.
203    #[inline(always)]
204    pub fn write(&mut self, producer_id: usize, data: &[T; N]) {
205        if producer_id >= PRODUCERS {
206            return;
207        }
208        self.storage[producer_id].copy_from_slice(data);
209        self.valid[producer_id] = true;
210        self.write_seq[producer_id] += 1;
211        self.stats.record_write();
212    }
213
214    /// Read and sum all producers' data that have new writes since last read.
215    #[inline(always)]
216    pub fn try_read(&mut self) -> Option<[T; N]> {
217        let mut result = [T::ZERO; N];
218        let mut any_valid = false;
219        let mut active_producers = 0;
220        let current_seq = self.read_seq;
221        for producer in 0..PRODUCERS {
222            if self.valid[producer] && self.write_seq[producer] > current_seq {
223                any_valid = true;
224                active_producers += 1;
225                for (res, &val) in result.iter_mut().zip(self.storage[producer].iter()) {
226                    *res += val;
227                }
228            }
229        }
230        if any_valid {
231            self.read_seq += 1;
232            self.stats.record_read();
233            self.stats.update_peak(active_producers);
234            Some(result)
235        } else {
236            self.stats.record_underflow();
237            None
238        }
239    }
240
241    /// Number of producers (const generic parameter).
242    pub const fn producer_count(&self) -> usize {
243        PRODUCERS
244    }
245
246    /// Whether a specific producer has unread data.
247    pub fn producer_has_data(&self, producer_id: usize) -> bool {
248        if producer_id >= PRODUCERS {
249            return false;
250        }
251        self.write_seq[producer_id] > self.read_seq && self.valid[producer_id]
252    }
253
254    /// Current read sequence counter.
255    pub fn read_seq(&self) -> usize {
256        self.read_seq
257    }
258    /// Write sequence counter for a specific producer, or `None` if ID is invalid.
259    pub fn write_seq(&self, producer_id: usize) -> Option<usize> {
260        if producer_id >= PRODUCERS {
261            None
262        } else {
263            Some(self.write_seq[producer_id])
264        }
265    }
266
267    /// Reset all producers and the read counter.
268    pub fn reset(&mut self) {
269        self.valid.fill(false);
270        self.write_seq.fill(0);
271        self.read_seq = 0;
272        self.stats.reset();
273    }
274
275    /// Clear a specific producer's data without affecting others.
276    pub fn clear_producer(&mut self, producer_id: usize) {
277        if producer_id < PRODUCERS {
278            self.valid[producer_id] = false;
279            self.write_seq[producer_id] = 0;
280        }
281    }
282}
283
284impl<T: Transcendental, const N: usize, const PRODUCERS: usize> Buffer<T>
285    for FanInBuffer<T, N, PRODUCERS>
286{
287    fn capacity(&self) -> usize {
288        N * PRODUCERS
289    }
290    fn len(&self) -> usize {
291        let mut count = 0;
292        for producer in 0..PRODUCERS {
293            if self.write_seq[producer] > self.read_seq && self.valid[producer] {
294                count += 1;
295            }
296        }
297        count
298    }
299    fn is_empty(&self) -> bool {
300        self.len() == 0
301    }
302    fn is_full(&self) -> bool {
303        self.len() == PRODUCERS
304    }
305    fn as_slice(&self) -> &[T] {
306        &self.storage[0]
307    }
308    fn as_mut_slice(&mut self) -> &mut [T] {
309        &mut self.storage[0]
310    }
311    fn fill(&mut self, value: T) {
312        for p in 0..PRODUCERS {
313            self.storage[p].fill(value);
314        }
315    }
316    fn copy_from(&mut self, src: &[T]) {
317        if PRODUCERS > 0 {
318            let len = src.len().min(N);
319            self.storage[0][..len].copy_from_slice(&src[..len]);
320        }
321    }
322    fn clear(&mut self) {
323        self.reset();
324    }
325    fn stats(&self) -> BufferStats {
326        let mut stats = self.stats.snapshot();
327        stats.fill_level = self.len() as f32 / PRODUCERS as f32;
328        stats
329    }
330    fn reset_stats(&mut self) {
331        self.stats.reset();
332    }
333}
334
335impl<T: Transcendental, const N: usize, const PRODUCERS: usize> Default
336    for FanInBuffer<T, N, PRODUCERS>
337{
338    fn default() -> Self {
339        Self::new()
340    }
341}
342
343impl<T: Transcendental + fmt::Debug, const N: usize, const PRODUCERS: usize> fmt::Debug
344    for FanInBuffer<T, N, PRODUCERS>
345{
346    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
347        let active = self.valid.iter().filter(|v| **v).count();
348        f.debug_struct("FanInBuffer")
349            .field("capacity", &(N * PRODUCERS))
350            .field("producers", &PRODUCERS)
351            .field("active_producers", &active)
352            .field("len", &self.len())
353            .field("read_seq", &self.read_seq)
354            .field("stats", &self.stats.snapshot())
355            .field("alignment", &CACHE_LINE_SIZE)
356            .finish()
357    }
358}
359
#[cfg(test)]
mod tests {
    use super::*;

    /// A single broadcast block is delivered to each of the three consumers.
    #[test]
    fn test_fan_out_buffer_basic() {
        let mut fan_out = FanOutBuffer::<f32, 64, 3>::new();
        fan_out.write(&[42.0; 64]);
        for consumer in 0..3 {
            let block = fan_out.try_read(consumer).unwrap();
            assert_eq!(block[0], 42.0);
        }
    }

    /// Blocks from two producers are summed element-wise on read.
    #[test]
    fn test_fan_in_buffer_basic() {
        let mut fan_in = FanInBuffer::<f32, 64, 2>::new();
        let inputs = [[1.0; 64], [2.0; 64]];
        for (producer, block) in inputs.iter().enumerate() {
            fan_in.write(producer, block);
        }
        let mixed = fan_in.try_read().unwrap();
        assert_eq!(mixed[0], 3.0);
    }
}