Skip to main content

rill_core/buffer/
pipe.rs

1use super::array_from_fn;
2use crate::buffer::{AtomicStats, BufferStats, SignalBuffer, CACHE_LINE_SIZE};
3use crate::math::Transcendental;
4use core::marker::PhantomData;
5use std::fmt;
6
7/// Single-producer, single-consumer buffer for intra-graph node connections.
8///
9/// Unlike its name suggests, this is **not** thread-safe — it is used
10/// exclusively within the single-threaded signal graph. For cross-thread
11/// communication use [`rill_core::queues`](crate::queues).
12#[repr(align(64))]
13pub struct PipeBuffer<T: Transcendental, const N: usize> {
14    storage: [T; N],
15    valid: bool,
16    write_seq: usize,
17    read_seq: usize,
18    stats: AtomicStats,
19    _phantom: PhantomData<[T; N]>,
20}
21
22impl<T: Transcendental, const N: usize> PipeBuffer<T, N> {
23    /// Create a new empty pipe buffer.
24    pub fn new() -> Self {
25        let storage = array_from_fn(|_| T::ZERO);
26        Self {
27            storage,
28            valid: false,
29            write_seq: 0,
30            read_seq: 0,
31            stats: AtomicStats::new(),
32            _phantom: PhantomData,
33        }
34    }
35
36    /// Write a block of data. Subsequent reads will return this data.
37    #[inline(always)]
38    pub fn write(&mut self, data: &[T; N]) {
39        self.storage.copy_from_slice(data);
40        self.valid = true;
41        self.write_seq += 1;
42        self.stats.record_write();
43        self.stats.update_peak(1);
44    }
45
46    /// Read the latest written data (non-destructive).
47    #[inline(always)]
48    pub fn read(&mut self) -> Option<[T; N]> {
49        if !self.valid {
50            return None;
51        }
52        let mut result = [T::ZERO; N];
53        result.copy_from_slice(&self.storage);
54        self.read_seq += 1;
55        self.stats.record_read();
56        Some(result)
57    }
58
59    /// Read the latest written data (destructive — clears the valid flag).
60    #[inline(always)]
61    pub fn try_read(&mut self) -> Option<[T; N]> {
62        if !self.valid {
63            self.stats.record_underflow();
64            return None;
65        }
66        let mut result = [T::ZERO; N];
67        result.copy_from_slice(&self.storage);
68        self.valid = false;
69        self.read_seq += 1;
70        self.stats.record_read();
71        self.stats.update_peak(0);
72        Some(result)
73    }
74
75    /// Busy-wait until data is available, then read destructively.
76    pub fn read_blocking(&mut self) -> [T; N] {
77        loop {
78            if let Some(data) = self.try_read() {
79                return data;
80            }
81            core::hint::spin_loop();
82        }
83    }
84
85    /// Whether data is available for reading.
86    pub fn has_data(&self) -> bool {
87        self.valid
88    }
89    /// Number of writes performed.
90    pub fn write_seq(&self) -> usize {
91        self.write_seq
92    }
93    /// Number of reads performed.
94    pub fn read_seq(&self) -> usize {
95        self.read_seq
96    }
97    /// Whether all writes have been consumed by reads.
98    pub fn is_caught_up(&self) -> bool {
99        self.write_seq == self.read_seq
100    }
101    /// Number of times the buffer was overwritten before being read.
102    pub fn overwrites(&self) -> usize {
103        self.write_seq.saturating_sub(self.read_seq + 1)
104    }
105
106    /// Reset to initial empty state.
107    pub fn reset(&mut self) {
108        self.valid = false;
109        self.stats.reset();
110    }
111}
112
113impl<T: Transcendental, const N: usize> SignalBuffer<T> for PipeBuffer<T, N> {
114    fn capacity(&self) -> usize {
115        N
116    }
117    fn len(&self) -> usize {
118        if self.valid {
119            1
120        } else {
121            0
122        }
123    }
124    fn is_empty(&self) -> bool {
125        !self.valid
126    }
127    fn is_full(&self) -> bool {
128        self.valid
129    }
130    fn clear(&mut self) {
131        self.valid = false;
132        self.stats.reset();
133    }
134    fn stats(&self) -> BufferStats {
135        let mut stats = self.stats.snapshot();
136        stats.fill_level = if self.valid { 1.0 } else { 0.0 };
137        stats
138    }
139    fn reset_stats(&mut self) {
140        self.stats.reset();
141    }
142}
143
144impl<T: Transcendental, const N: usize> Default for PipeBuffer<T, N> {
145    fn default() -> Self {
146        Self::new()
147    }
148}
149
150impl<T: Transcendental + fmt::Debug, const N: usize> fmt::Debug for PipeBuffer<T, N> {
151    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
152        f.debug_struct("PipeBuffer")
153            .field("capacity", &N)
154            .field("has_data", &self.valid)
155            .field("write_seq", &self.write_seq)
156            .field("read_seq", &self.read_seq)
157            .field("overwrites", &self.overwrites())
158            .field("stats", &self.stats.snapshot())
159            .field("alignment", &CACHE_LINE_SIZE)
160            .finish()
161    }
162}
163
164impl<T: Transcendental + Copy, const N: usize> Clone for PipeBuffer<T, N> {
165    fn clone(&self) -> Self {
166        let mut new = Self::new();
167        if self.valid {
168            new.write(&self.storage);
169        }
170        new
171    }
172}
173
174#[cfg(test)]
175mod tests {
176    use super::*;
177
178    #[test]
179    fn test_pipe_buffer_basic() {
180        let mut buffer = PipeBuffer::<f32, 64>::new();
181        let write_data = [42.0; 64];
182        buffer.write(&write_data);
183        assert!(buffer.has_data());
184        assert_eq!(buffer.write_seq(), 1);
185        let read_data = buffer.try_read().unwrap();
186        assert_eq!(read_data[0], 42.0);
187        assert_eq!(buffer.read_seq(), 1);
188        assert!(buffer.is_caught_up());
189    }
190}