Skip to main content

rill_core/buffer/
pipe.rs

1//! # Point-to-point buffer for single producer, single consumer connections
2
3use super::array_from_fn;
4use crate::buffer::{AtomicCell, AtomicStats, SignalBuffer, BufferStats, CACHE_LINE_SIZE};
5use crate::math::Transcendental;
6use core::marker::PhantomData;
7use core::sync::atomic::{AtomicBool, Ordering};
8use std::fmt;
9
10// ============================================================================
11// PipeBuffer
12// ============================================================================
13
14/// Single-producer, single-consumer buffer for node connections
15///
16/// This buffer provides wait-free operations and minimal overhead.
17/// It is ideal for point-to-point connections between audio nodes.
18///
19/// # Type Parameters
20/// - `T`: Audio sample type (f32 or f64) implementing `Transcendental`
21/// - `N`: Buffer size (number of samples per block)
22#[repr(align(64))]
23pub struct PipeBuffer<T: Transcendental, const N: usize> {
24    /// Storage for the buffer using AtomicCell for each sample
25    /// This provides safe concurrent access without unsafe code
26    storage: [AtomicCell<T>; N],
27
28    /// Flag indicating if buffer contains valid data
29    /// - `true`: data available for reading
30    /// - `false`: buffer empty
31    valid: AtomicBool,
32
33    /// Write sequence number (monotonically increasing)
34    /// Used for debugging and detecting overwrites
35    write_seq: AtomicCell<usize>,
36
37    /// Read sequence number (monotonically increasing)
38    /// Used for debugging and detecting underruns
39    read_seq: AtomicCell<usize>,
40
41    /// Atomic statistics for performance monitoring
42    stats: AtomicStats,
43
44    /// Phantom data to satisfy const generic
45    _phantom: PhantomData<[T; N]>,
46}
47
48impl<T: Transcendental, const N: usize> PipeBuffer<T, N> {
49    /// Create a new pipe buffer
50    ///
51    /// The buffer starts empty with no data available.
52    pub fn new() -> Self {
53        // Create storage with default values (T::ZERO)
54        let storage = array_from_fn(|_| AtomicCell::new(T::ZERO));
55
56        Self {
57            storage,
58            valid: AtomicBool::new(false),
59            write_seq: AtomicCell::new(0),
60            read_seq: AtomicCell::new(0),
61            stats: AtomicStats::new(),
62            _phantom: PhantomData,
63        }
64    }
65
66    /// Write a block of data to the buffer
67    ///
68    /// This operation is wait-free and will overwrite any existing data.
69    /// The buffer holds at most one block at a time - new writes always
70    /// overwrite the previous block, regardless of whether it was read.
71    ///
72    /// # Arguments
73    /// * `data` - Array of samples to write (must be exactly `N` samples)
74    #[inline(always)]
75    pub fn write(&self, data: &[T; N]) {
76        // Copy data to storage using AtomicCell's store
77        // This is safe and doesn't require unsafe code
78        for i in 0..N {
79            self.storage[i].store(data[i]);
80        }
81
82        // Mark as valid (release ordering ensures data is visible)
83        self.valid.store(true, Ordering::Release);
84        self.write_seq.store(self.write_seq.load() + 1);
85
86        // Update statistics
87        self.stats.record_write();
88        self.stats.update_peak(1);
89    }
90
91    /// Read a block without consuming (multiple consumers can read the same data)
92    ///
93    /// Unlike `try_read`, this does not mark the buffer as empty.
94    /// Multiple callers can read the same data before the next write.
95    #[inline(always)]
96    pub fn read(&self) -> Option<[T; N]> {
97        if !self.valid.load(Ordering::Acquire) {
98            return None;
99        }
100        let mut result = [T::ZERO; N];
101        for i in 0..N {
102            result[i] = self.storage[i].load();
103        }
104        self.read_seq.store(self.read_seq.load() + 1);
105        self.stats.record_read();
106        Some(result)
107    }
108
109    /// Try to read a block of data from the buffer
110    ///
111    /// Returns `Some(data)` if data is available, `None` otherwise.
112    /// This operation is wait-free and non-blocking. This call consumes
113    /// the data — subsequent readers will get `None` until the next write.
114    #[inline(always)]
115    pub fn try_read(&self) -> Option<[T; N]> {
116        if !self.valid.load(Ordering::Acquire) {
117            self.stats.record_underflow();
118            return None;
119        }
120
121        let mut result = [T::ZERO; N];
122        for i in 0..N {
123            // AtomicCell's load is safe and doesn't require unsafe code
124            result[i] = self.storage[i].load();
125        }
126
127        self.valid.store(false, Ordering::Release);
128        self.read_seq.store(self.read_seq.load() + 1);
129
130        self.stats.record_read();
131        self.stats.update_peak(0);
132
133        Some(result)
134    }
135
136    /// Read data, blocking until available (for non-real-time use)
137    ///
138    /// This is a convenience method for non-real-time contexts like
139    /// testing or offline processing. It spins until data is available.
140    pub fn read_blocking(&self) -> [T; N] {
141        loop {
142            if let Some(data) = self.try_read() {
143                return data;
144            }
145            core::hint::spin_loop();
146        }
147    }
148
149    /// Check if buffer has valid data available
150    #[inline(always)]
151    pub fn has_data(&self) -> bool {
152        self.valid.load(Ordering::Acquire)
153    }
154
155    /// Get write sequence number (for debugging)
156    pub fn write_seq(&self) -> usize {
157        self.write_seq.load()
158    }
159
160    /// Get read sequence number (for debugging)
161    pub fn read_seq(&self) -> usize {
162        self.read_seq.load()
163    }
164
165    /// Check if reader is caught up with writer
166    pub fn is_caught_up(&self) -> bool {
167        self.write_seq() == self.read_seq()
168    }
169
170    /// Get the number of overwritten blocks (for debugging)
171    pub fn overwrites(&self) -> usize {
172        self.write_seq().saturating_sub(self.read_seq() + 1)
173    }
174
175    /// Reset the buffer to empty state
176    ///
177    /// Clears the valid flag and resets statistics.
178    /// Does not actually zero the memory (not needed for correctness).
179    pub fn reset(&self) {
180        self.valid.store(false, Ordering::Release);
181        self.stats.reset();
182    }
183}
184
185// ============================================================================
186// SignalBuffer Implementation
187// ============================================================================
188
189impl<T: Transcendental, const N: usize> SignalBuffer<T> for PipeBuffer<T, N> {
190    fn capacity(&self) -> usize {
191        N
192    }
193
194    fn len(&self) -> usize {
195        if self.has_data() {
196            1
197        } else {
198            0
199        }
200    }
201
202    fn is_empty(&self) -> bool {
203        !self.has_data()
204    }
205
206    fn is_full(&self) -> bool {
207        self.has_data()
208    }
209
210    fn clear(&mut self) {
211        self.valid.store(false, Ordering::Release);
212        self.stats.reset();
213    }
214
215    fn stats(&self) -> BufferStats {
216        let mut stats = self.stats.snapshot();
217        stats.fill_level = if self.has_data() { 1.0 } else { 0.0 };
218        stats
219    }
220
221    fn reset_stats(&mut self) {
222        self.stats.reset();
223    }
224}
225
226// ============================================================================
227// Default Implementation
228// ============================================================================
229
230impl<T: Transcendental, const N: usize> Default for PipeBuffer<T, N> {
231    fn default() -> Self {
232        Self::new()
233    }
234}
235
236// ============================================================================
237// Debug Implementation
238// ============================================================================
239
240impl<T: Transcendental + fmt::Debug, const N: usize> fmt::Debug for PipeBuffer<T, N> {
241    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
242        f.debug_struct("PipeBuffer")
243            .field("capacity", &N)
244            .field("has_data", &self.has_data())
245            .field("write_seq", &self.write_seq())
246            .field("read_seq", &self.read_seq())
247            .field("overwrites", &self.overwrites())
248            .field("stats", &self.stats.snapshot())
249            .field("alignment", &CACHE_LINE_SIZE)
250            .finish()
251    }
252}
253
254// ============================================================================
255// Clone Implementation
256// ============================================================================
257
258impl<T: Transcendental + Copy, const N: usize> Clone for PipeBuffer<T, N> {
259    fn clone(&self) -> Self {
260        let new = Self::new();
261
262        // If this buffer has data, copy it
263        if let Some(data) = self.try_read() {
264            new.write(&data);
265        }
266
267        new
268    }
269}
270
271// ============================================================================
272// Tests
273// ============================================================================
274
275#[cfg(test)]
276mod tests {
277    use super::*;
278
279    #[test]
280    fn test_pipe_buffer_basic() {
281        let buffer = PipeBuffer::<f32, 64>::new();
282
283        let write_data = [42.0; 64];
284        buffer.write(&write_data);
285
286        assert!(buffer.has_data());
287        assert_eq!(buffer.write_seq(), 1);
288
289        let read_data = buffer.try_read().unwrap();
290        assert_eq!(read_data[0], 42.0);
291        assert_eq!(buffer.read_seq(), 1);
292        assert!(buffer.is_caught_up());
293    }
294}