Skip to main content

rill_core/buffer/
pipe.rs

1use super::array_from_fn;
2use crate::buffer::{AtomicStats, Buffer, BufferStats, CACHE_LINE_SIZE};
3use crate::math::Transcendental;
4use core::marker::PhantomData;
5use std::fmt;
6
7/// Single-producer, single-consumer buffer for intra-graph node connections.
8///
9/// Unlike its name suggests, this is **not** thread-safe — it is used
10/// exclusively within the single-threaded signal graph. For cross-thread
11/// communication use [`rill_core::queues`](crate::queues).
12#[repr(align(64))]
13pub struct PipeBuffer<T: Transcendental, const N: usize> {
14    storage: [T; N],
15    valid: bool,
16    write_seq: usize,
17    read_seq: usize,
18    stats: AtomicStats,
19    _phantom: PhantomData<[T; N]>,
20}
21
22impl<T: Transcendental, const N: usize> PipeBuffer<T, N> {
23    /// Create a new empty pipe buffer.
24    pub fn new() -> Self {
25        let storage = array_from_fn(|_| T::ZERO);
26        Self {
27            storage,
28            valid: false,
29            write_seq: 0,
30            read_seq: 0,
31            stats: AtomicStats::new(),
32            _phantom: PhantomData,
33        }
34    }
35
36    /// Write a block of data. Subsequent reads will return this data.
37    #[inline(always)]
38    pub fn write(&mut self, data: &[T; N]) {
39        self.storage.copy_from_slice(data);
40        self.valid = true;
41        self.write_seq += 1;
42        self.stats.record_write();
43        self.stats.update_peak(1);
44    }
45
46    /// Read the latest written data (non-destructive).
47    #[inline(always)]
48    pub fn read(&mut self) -> Option<[T; N]> {
49        if !self.valid {
50            return None;
51        }
52        let mut result = [T::ZERO; N];
53        result.copy_from_slice(&self.storage);
54        self.read_seq += 1;
55        self.stats.record_read();
56        Some(result)
57    }
58
59    /// Read the latest written data (destructive — clears the valid flag).
60    #[inline(always)]
61    pub fn try_read(&mut self) -> Option<[T; N]> {
62        if !self.valid {
63            self.stats.record_underflow();
64            return None;
65        }
66        let mut result = [T::ZERO; N];
67        result.copy_from_slice(&self.storage);
68        self.valid = false;
69        self.read_seq += 1;
70        self.stats.record_read();
71        self.stats.update_peak(0);
72        Some(result)
73    }
74
75    /// Busy-wait until data is available, then read destructively.
76    pub fn read_blocking(&mut self) -> [T; N] {
77        loop {
78            if let Some(data) = self.try_read() {
79                return data;
80            }
81            core::hint::spin_loop();
82        }
83    }
84
85    /// Whether data is available for reading.
86    pub fn has_data(&self) -> bool {
87        self.valid
88    }
89    /// Number of writes performed.
90    pub fn write_seq(&self) -> usize {
91        self.write_seq
92    }
93    /// Number of reads performed.
94    pub fn read_seq(&self) -> usize {
95        self.read_seq
96    }
97    /// Whether all writes have been consumed by reads.
98    pub fn is_caught_up(&self) -> bool {
99        self.write_seq == self.read_seq
100    }
101    /// Number of times the buffer was overwritten before being read.
102    pub fn overwrites(&self) -> usize {
103        self.write_seq.saturating_sub(self.read_seq + 1)
104    }
105
106    /// Reset to initial empty state.
107    pub fn reset(&mut self) {
108        self.valid = false;
109        self.stats.reset();
110    }
111}
112
113impl<T: Transcendental, const N: usize> Buffer<T> for PipeBuffer<T, N> {
114    fn capacity(&self) -> usize {
115        N
116    }
117    fn len(&self) -> usize {
118        if self.valid {
119            1
120        } else {
121            0
122        }
123    }
124    fn is_empty(&self) -> bool {
125        !self.valid
126    }
127    fn is_full(&self) -> bool {
128        self.valid
129    }
130    fn as_slice(&self) -> &[T] {
131        &self.storage
132    }
133    fn as_mut_slice(&mut self) -> &mut [T] {
134        &mut self.storage
135    }
136    fn fill(&mut self, value: T) {
137        self.storage.fill(value);
138    }
139    fn copy_from(&mut self, src: &[T]) {
140        let len = src.len().min(N);
141        self.storage[..len].copy_from_slice(&src[..len]);
142    }
143    fn clear(&mut self) {
144        self.valid = false;
145        self.stats.reset();
146    }
147    fn stats(&self) -> BufferStats {
148        let mut stats = self.stats.snapshot();
149        stats.fill_level = if self.valid { 1.0 } else { 0.0 };
150        stats
151    }
152    fn reset_stats(&mut self) {
153        self.stats.reset();
154    }
155}
156
157impl<T: Transcendental, const N: usize> Default for PipeBuffer<T, N> {
158    fn default() -> Self {
159        Self::new()
160    }
161}
162
163impl<T: Transcendental + fmt::Debug, const N: usize> fmt::Debug for PipeBuffer<T, N> {
164    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
165        f.debug_struct("PipeBuffer")
166            .field("capacity", &N)
167            .field("has_data", &self.valid)
168            .field("write_seq", &self.write_seq)
169            .field("read_seq", &self.read_seq)
170            .field("overwrites", &self.overwrites())
171            .field("stats", &self.stats.snapshot())
172            .field("alignment", &CACHE_LINE_SIZE)
173            .finish()
174    }
175}
176
177impl<T: Transcendental + Copy, const N: usize> Clone for PipeBuffer<T, N> {
178    fn clone(&self) -> Self {
179        let mut new = Self::new();
180        if self.valid {
181            new.write(&self.storage);
182        }
183        new
184    }
185}
186
187#[cfg(test)]
188mod tests {
189    use super::*;
190
191    #[test]
192    fn test_pipe_buffer_basic() {
193        let mut buffer = PipeBuffer::<f32, 64>::new();
194        let write_data = [42.0; 64];
195        buffer.write(&write_data);
196        assert!(buffer.has_data());
197        assert_eq!(buffer.write_seq(), 1);
198        let read_data = buffer.try_read().unwrap();
199        assert_eq!(read_data[0], 42.0);
200        assert_eq!(buffer.read_seq(), 1);
201        assert!(buffer.is_caught_up());
202    }
203}