rill_core/buffer/pipe.rs
1//! # Point-to-point buffer for single producer, single consumer connections
2
3use super::array_from_fn;
4use crate::buffer::{AtomicCell, AtomicStats, SignalBuffer, BufferStats, CACHE_LINE_SIZE};
5use crate::math::Transcendental;
6use core::marker::PhantomData;
7use core::sync::atomic::{AtomicBool, Ordering};
8use std::fmt;
9
10// ============================================================================
11// PipeBuffer
12// ============================================================================
13
14/// Single-producer, single-consumer buffer for node connections
15///
16/// This buffer provides wait-free operations and minimal overhead.
17/// It is ideal for point-to-point connections between audio nodes.
18///
19/// # Type Parameters
20/// - `T`: Audio sample type (f32 or f64) implementing `Transcendental`
21/// - `N`: Buffer size (number of samples per block)
22#[repr(align(64))]
23pub struct PipeBuffer<T: Transcendental, const N: usize> {
24 /// Storage for the buffer using AtomicCell for each sample
25 /// This provides safe concurrent access without unsafe code
26 storage: [AtomicCell<T>; N],
27
28 /// Flag indicating if buffer contains valid data
29 /// - `true`: data available for reading
30 /// - `false`: buffer empty
31 valid: AtomicBool,
32
33 /// Write sequence number (monotonically increasing)
34 /// Used for debugging and detecting overwrites
35 write_seq: AtomicCell<usize>,
36
37 /// Read sequence number (monotonically increasing)
38 /// Used for debugging and detecting underruns
39 read_seq: AtomicCell<usize>,
40
41 /// Atomic statistics for performance monitoring
42 stats: AtomicStats,
43
44 /// Phantom data to satisfy const generic
45 _phantom: PhantomData<[T; N]>,
46}
47
48impl<T: Transcendental, const N: usize> PipeBuffer<T, N> {
49 /// Create a new pipe buffer
50 ///
51 /// The buffer starts empty with no data available.
52 pub fn new() -> Self {
53 // Create storage with default values (T::ZERO)
54 let storage = array_from_fn(|_| AtomicCell::new(T::ZERO));
55
56 Self {
57 storage,
58 valid: AtomicBool::new(false),
59 write_seq: AtomicCell::new(0),
60 read_seq: AtomicCell::new(0),
61 stats: AtomicStats::new(),
62 _phantom: PhantomData,
63 }
64 }
65
66 /// Write a block of data to the buffer
67 ///
68 /// This operation is wait-free and will overwrite any existing data.
69 /// The buffer holds at most one block at a time - new writes always
70 /// overwrite the previous block, regardless of whether it was read.
71 ///
72 /// # Arguments
73 /// * `data` - Array of samples to write (must be exactly `N` samples)
74 #[inline(always)]
75 pub fn write(&self, data: &[T; N]) {
76 // Copy data to storage using AtomicCell's store
77 // This is safe and doesn't require unsafe code
78 for i in 0..N {
79 self.storage[i].store(data[i]);
80 }
81
82 // Mark as valid (release ordering ensures data is visible)
83 self.valid.store(true, Ordering::Release);
84 self.write_seq.store(self.write_seq.load() + 1);
85
86 // Update statistics
87 self.stats.record_write();
88 self.stats.update_peak(1);
89 }
90
91 /// Read a block without consuming (multiple consumers can read the same data)
92 ///
93 /// Unlike `try_read`, this does not mark the buffer as empty.
94 /// Multiple callers can read the same data before the next write.
95 #[inline(always)]
96 pub fn read(&self) -> Option<[T; N]> {
97 if !self.valid.load(Ordering::Acquire) {
98 return None;
99 }
100 let mut result = [T::ZERO; N];
101 for i in 0..N {
102 result[i] = self.storage[i].load();
103 }
104 self.read_seq.store(self.read_seq.load() + 1);
105 self.stats.record_read();
106 Some(result)
107 }
108
109 /// Try to read a block of data from the buffer
110 ///
111 /// Returns `Some(data)` if data is available, `None` otherwise.
112 /// This operation is wait-free and non-blocking. This call consumes
113 /// the data — subsequent readers will get `None` until the next write.
114 #[inline(always)]
115 pub fn try_read(&self) -> Option<[T; N]> {
116 if !self.valid.load(Ordering::Acquire) {
117 self.stats.record_underflow();
118 return None;
119 }
120
121 let mut result = [T::ZERO; N];
122 for i in 0..N {
123 // AtomicCell's load is safe and doesn't require unsafe code
124 result[i] = self.storage[i].load();
125 }
126
127 self.valid.store(false, Ordering::Release);
128 self.read_seq.store(self.read_seq.load() + 1);
129
130 self.stats.record_read();
131 self.stats.update_peak(0);
132
133 Some(result)
134 }
135
136 /// Read data, blocking until available (for non-real-time use)
137 ///
138 /// This is a convenience method for non-real-time contexts like
139 /// testing or offline processing. It spins until data is available.
140 pub fn read_blocking(&self) -> [T; N] {
141 loop {
142 if let Some(data) = self.try_read() {
143 return data;
144 }
145 core::hint::spin_loop();
146 }
147 }
148
149 /// Check if buffer has valid data available
150 #[inline(always)]
151 pub fn has_data(&self) -> bool {
152 self.valid.load(Ordering::Acquire)
153 }
154
155 /// Get write sequence number (for debugging)
156 pub fn write_seq(&self) -> usize {
157 self.write_seq.load()
158 }
159
160 /// Get read sequence number (for debugging)
161 pub fn read_seq(&self) -> usize {
162 self.read_seq.load()
163 }
164
165 /// Check if reader is caught up with writer
166 pub fn is_caught_up(&self) -> bool {
167 self.write_seq() == self.read_seq()
168 }
169
170 /// Get the number of overwritten blocks (for debugging)
171 pub fn overwrites(&self) -> usize {
172 self.write_seq().saturating_sub(self.read_seq() + 1)
173 }
174
175 /// Reset the buffer to empty state
176 ///
177 /// Clears the valid flag and resets statistics.
178 /// Does not actually zero the memory (not needed for correctness).
179 pub fn reset(&self) {
180 self.valid.store(false, Ordering::Release);
181 self.stats.reset();
182 }
183}
184
185// ============================================================================
186// SignalBuffer Implementation
187// ============================================================================
188
189impl<T: Transcendental, const N: usize> SignalBuffer<T> for PipeBuffer<T, N> {
190 fn capacity(&self) -> usize {
191 N
192 }
193
194 fn len(&self) -> usize {
195 if self.has_data() {
196 1
197 } else {
198 0
199 }
200 }
201
202 fn is_empty(&self) -> bool {
203 !self.has_data()
204 }
205
206 fn is_full(&self) -> bool {
207 self.has_data()
208 }
209
210 fn clear(&mut self) {
211 self.valid.store(false, Ordering::Release);
212 self.stats.reset();
213 }
214
215 fn stats(&self) -> BufferStats {
216 let mut stats = self.stats.snapshot();
217 stats.fill_level = if self.has_data() { 1.0 } else { 0.0 };
218 stats
219 }
220
221 fn reset_stats(&mut self) {
222 self.stats.reset();
223 }
224}
225
226// ============================================================================
227// Default Implementation
228// ============================================================================
229
230impl<T: Transcendental, const N: usize> Default for PipeBuffer<T, N> {
231 fn default() -> Self {
232 Self::new()
233 }
234}
235
236// ============================================================================
237// Debug Implementation
238// ============================================================================
239
240impl<T: Transcendental + fmt::Debug, const N: usize> fmt::Debug for PipeBuffer<T, N> {
241 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
242 f.debug_struct("PipeBuffer")
243 .field("capacity", &N)
244 .field("has_data", &self.has_data())
245 .field("write_seq", &self.write_seq())
246 .field("read_seq", &self.read_seq())
247 .field("overwrites", &self.overwrites())
248 .field("stats", &self.stats.snapshot())
249 .field("alignment", &CACHE_LINE_SIZE)
250 .finish()
251 }
252}
253
254// ============================================================================
255// Clone Implementation
256// ============================================================================
257
258impl<T: Transcendental + Copy, const N: usize> Clone for PipeBuffer<T, N> {
259 fn clone(&self) -> Self {
260 let new = Self::new();
261
262 // If this buffer has data, copy it
263 if let Some(data) = self.try_read() {
264 new.write(&data);
265 }
266
267 new
268 }
269}
270
271// ============================================================================
272// Tests
273// ============================================================================
274
#[cfg(test)]
mod tests {
    use super::*;

    /// One write followed by one consuming read: the data round-trips and
    /// both sequence counters advance to 1.
    #[test]
    fn test_pipe_buffer_basic() {
        let pipe: PipeBuffer<f32, 64> = PipeBuffer::new();

        // Producer side: publish a single block.
        let block = [42.0; 64];
        pipe.write(&block);
        assert!(pipe.has_data());
        assert_eq!(pipe.write_seq(), 1);

        // Consumer side: take the block back out.
        let received = pipe.try_read().unwrap();
        assert_eq!(received[0], 42.0);
        assert_eq!(pipe.read_seq(), 1);
        assert!(pipe.is_caught_up());
    }
}
294}