Skip to main content

rill_core/buffer/
pipe.rs

1use super::array_from_fn;
2use crate::buffer::{AtomicStats, SignalBuffer, BufferStats, CACHE_LINE_SIZE};
3use crate::math::Transcendental;
4use core::marker::PhantomData;
5use std::fmt;
6
/// Single-producer, single-consumer buffer for intra-graph node connections.
///
/// Holds exactly one block of `N` samples: every write replaces the stored
/// block, and reads hand back a copy of it.
///
/// Despite what its name may suggest, this is **not** thread-safe — it is used
/// exclusively within the single-threaded signal graph. For cross-thread
/// communication use [`rill_core::queues`](crate::queues).
// 64-byte alignment — presumably chosen to match `CACHE_LINE_SIZE`; TODO confirm.
#[repr(align(64))]
pub struct PipeBuffer<T: Transcendental, const N: usize> {
    /// The most recently written block.
    storage: [T; N],
    /// `true` from a write until the block is consumed by a destructive read
    /// or discarded by `reset`/`clear`.
    valid: bool,
    /// Incremented once per write.
    write_seq: usize,
    /// Incremented once per successful read (destructive or not).
    read_seq: usize,
    /// Counters updated on writes, reads and underflows.
    stats: AtomicStats,
    /// Zero-sized marker over `[T; N]`.
    _phantom: PhantomData<[T; N]>,
}
21
22impl<T: Transcendental, const N: usize> PipeBuffer<T, N> {
23    /// Create a new empty pipe buffer.
24    pub fn new() -> Self {
25        let storage = array_from_fn(|_| T::ZERO);
26        Self {
27            storage,
28            valid: false,
29            write_seq: 0,
30            read_seq: 0,
31            stats: AtomicStats::new(),
32            _phantom: PhantomData,
33        }
34    }
35
36    /// Write a block of data. Subsequent reads will return this data.
37    #[inline(always)]
38    pub fn write(&mut self, data: &[T; N]) {
39        self.storage.copy_from_slice(data);
40        self.valid = true;
41        self.write_seq += 1;
42        self.stats.record_write();
43        self.stats.update_peak(1);
44    }
45
46    /// Read the latest written data (non-destructive).
47    #[inline(always)]
48    pub fn read(&mut self) -> Option<[T; N]> {
49        if !self.valid {
50            return None;
51        }
52        let mut result = [T::ZERO; N];
53        result.copy_from_slice(&self.storage);
54        self.read_seq += 1;
55        self.stats.record_read();
56        Some(result)
57    }
58
59    /// Read the latest written data (destructive — clears the valid flag).
60    #[inline(always)]
61    pub fn try_read(&mut self) -> Option<[T; N]> {
62        if !self.valid {
63            self.stats.record_underflow();
64            return None;
65        }
66        let mut result = [T::ZERO; N];
67        result.copy_from_slice(&self.storage);
68        self.valid = false;
69        self.read_seq += 1;
70        self.stats.record_read();
71        self.stats.update_peak(0);
72        Some(result)
73    }
74
75    /// Busy-wait until data is available, then read destructively.
76    pub fn read_blocking(&mut self) -> [T; N] {
77        loop {
78            if let Some(data) = self.try_read() {
79                return data;
80            }
81            core::hint::spin_loop();
82        }
83    }
84
85    /// Whether data is available for reading.
86    pub fn has_data(&self) -> bool { self.valid }
87    /// Number of writes performed.
88    pub fn write_seq(&self) -> usize { self.write_seq }
89    /// Number of reads performed.
90    pub fn read_seq(&self) -> usize { self.read_seq }
91    /// Whether all writes have been consumed by reads.
92    pub fn is_caught_up(&self) -> bool { self.write_seq == self.read_seq }
93    /// Number of times the buffer was overwritten before being read.
94    pub fn overwrites(&self) -> usize { self.write_seq.saturating_sub(self.read_seq + 1) }
95
96    /// Reset to initial empty state.
97    pub fn reset(&mut self) {
98        self.valid = false;
99        self.stats.reset();
100    }
101}
102
103impl<T: Transcendental, const N: usize> SignalBuffer<T> for PipeBuffer<T, N> {
104    fn capacity(&self) -> usize { N }
105    fn len(&self) -> usize { if self.valid { 1 } else { 0 } }
106    fn is_empty(&self) -> bool { !self.valid }
107    fn is_full(&self) -> bool { self.valid }
108    fn clear(&mut self) { self.valid = false; self.stats.reset(); }
109    fn stats(&self) -> BufferStats {
110        let mut stats = self.stats.snapshot();
111        stats.fill_level = if self.valid { 1.0 } else { 0.0 };
112        stats
113    }
114    fn reset_stats(&mut self) { self.stats.reset(); }
115}
116
117impl<T: Transcendental, const N: usize> Default for PipeBuffer<T, N> {
118    fn default() -> Self { Self::new() }
119}
120
121impl<T: Transcendental + fmt::Debug, const N: usize> fmt::Debug for PipeBuffer<T, N> {
122    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
123        f.debug_struct("PipeBuffer")
124            .field("capacity", &N)
125            .field("has_data", &self.valid)
126            .field("write_seq", &self.write_seq)
127            .field("read_seq", &self.read_seq)
128            .field("overwrites", &self.overwrites())
129            .field("stats", &self.stats.snapshot())
130            .field("alignment", &CACHE_LINE_SIZE)
131            .finish()
132    }
133}
134
135impl<T: Transcendental + Copy, const N: usize> Clone for PipeBuffer<T, N> {
136    fn clone(&self) -> Self {
137        let mut new = Self::new();
138        if self.valid {
139            new.write(&self.storage);
140        }
141        new
142    }
143}
144
#[cfg(test)]
mod tests {
    use super::*;

    /// Round-trips one block through the buffer and checks the bookkeeping.
    #[test]
    fn test_pipe_buffer_basic() {
        let mut pipe = PipeBuffer::<f32, 64>::new();
        let block = [42.0; 64];

        // Writing makes data available and advances the write counter.
        pipe.write(&block);
        assert!(pipe.has_data());
        assert_eq!(pipe.write_seq(), 1);

        // A destructive read returns the block and catches the reader up.
        let received = pipe.try_read().unwrap();
        assert_eq!(received[0], 42.0);
        assert_eq!(pipe.read_seq(), 1);
        assert!(pipe.is_caught_up());
    }
}
161}