rill_core/buffer/mod.rs
1//! # Signal Buffers for single-threaded signal processing
2//!
3//! This module provides real-time safe buffers used by graph nodes inside
4//! the signal thread. All buffers are **single-threaded** — they contain no
5//! atomics or locks. Cross-thread communication goes through
6//! [`rill_core::queues`](crate::queues).
7//!
8//! ## Buffer Types
9//!
10//! | Buffer | Description | Use Case |
11//! |--------|-------------|----------|
12//! | [`PipeBuffer`] | Single-producer, single-consumer | Point-to-point node connections |
13//! | [`FanOutBuffer`] | One producer, multiple consumers | Broadcast signals to multiple nodes |
14//! | [`FanInBuffer`] | Multiple producers, one consumer | Mix multiple signals |
15//! | [`DelayLine`] | Circular buffer with delay | Effects like echo, reverb |
16//! | [`RingBuffer`] | Multi-producer, multi-consumer | Generic queue for any scenario |
17//! | [`TapeLoop`](crate::buffer::TapeLoop) | Heap-allocated circular buffer | Tape delay with large capacity |
18//!
19//! ## Features
20//!
21//! - **Real-time safe** - No allocations, no blocking, no system calls
22//! - **Single-threaded** - No atomics, no locks, minimal overhead
23//! - **Cache-line aligned** - Prevents false sharing
24//! - **Statistically monitored** - Track performance metrics
25//! - **Type-safe** - Generic over [`Scalar`](crate::math::Scalar) (f32, f64, integers)
26
27use core::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
28use std::fmt;
29
30use crate::math::Transcendental;
31
32// ============================================================================
33// Submodules
34// ============================================================================
35
36mod buffer_trait;
37mod delay;
38mod fan;
39mod pipe;
40mod registry;
41mod ring;
42mod storage;
43mod tape;
44
45// ============================================================================
46// Re-exports
47// ============================================================================
48
49pub use buffer_trait::{Buffer, FixedBuffer, HeapBuffer};
50pub use delay::DelayLine;
51pub use fan::{FanInBuffer, FanOutBuffer};
52pub use pipe::PipeBuffer;
53pub use registry::BufferRegistry;
54pub use ring::RingBuffer;
55pub use storage::{AtomicCell, AtomicCellError};
56pub use tape::TapeLoop;
57
58// ============================================================================
59// Constants
60// ============================================================================
61
/// Alignment boundary, in bytes, assumed for one CPU cache line.
///
/// 64 bytes is the typical cache-line size on x86_64. Data aligned to this
/// boundary does not share a cache line with its neighbours, which avoids
/// false sharing between threads running on different cores.
pub const CACHE_LINE_SIZE: usize = 64;

/// Buffer size, in samples, used when nothing more specific is requested
/// (2^10 = 1024).
pub const DEFAULT_BUFFER_SIZE: usize = 1 << 10;

/// Upper bound on buffer size (2^16 = 65536 samples).
pub const MAX_BUFFER_SIZE: usize = 1 << 16;

/// Lower bound on buffer size (2^4 = 16 samples; most algorithms need at
/// least this many).
pub const MIN_BUFFER_SIZE: usize = 1 << 4;
76
77// ============================================================================
78// Atomic Statistics
79// ============================================================================
80
/// Lock-free counters that back a buffer's statistics.
///
/// Every field is an atomic integer, so all updates go through `&self` and
/// the struct can be shared between threads without a mutex.
///
/// # Memory Layout
/// `#[repr(align(64))]` aligns the struct to a 64-byte boundary (one cache
/// line) so that it does not share a line with neighbouring data.
///
/// # Thread Safety
/// Each counter is read and written atomically with relaxed ordering; the
/// counters are independent of one another.
#[repr(align(64))]
pub struct AtomicStats {
    /// How many writes have succeeded.
    writes: AtomicU64,

    /// How many reads have succeeded.
    reads: AtomicU64,

    /// How many reads were attempted on an empty buffer.
    underflows: AtomicU64,

    /// How many writes were attempted on a full buffer.
    overflows: AtomicU64,

    /// Highest fill level seen so far, as fixed-point thousandths
    /// (0-1000 stands for 0.0-1.0) so it fits in an atomic integer.
    peak_fill: AtomicUsize,
}
109
110impl AtomicStats {
111 /// Create new atomic statistics with all counters set to zero
112 pub const fn new() -> Self {
113 Self {
114 writes: AtomicU64::new(0),
115 reads: AtomicU64::new(0),
116 underflows: AtomicU64::new(0),
117 overflows: AtomicU64::new(0),
118 peak_fill: AtomicUsize::new(0),
119 }
120 }
121
122 /// Record a successful write operation
123 #[inline(always)]
124 pub fn record_write(&self) {
125 self.writes.fetch_add(1, Ordering::Relaxed);
126 }
127
128 /// Record a successful read operation
129 #[inline(always)]
130 pub fn record_read(&self) {
131 self.reads.fetch_add(1, Ordering::Relaxed);
132 }
133
134 /// Record an underflow event (read when empty)
135 #[inline(always)]
136 pub fn record_underflow(&self) {
137 self.underflows.fetch_add(1, Ordering::Relaxed);
138 }
139
140 /// Record an overflow event (write when full)
141 #[inline(always)]
142 pub fn record_overflow(&self) {
143 self.overflows.fetch_add(1, Ordering::Relaxed);
144 }
145
146 /// Update peak fill level (0-1000 representing 0.0-1.0)
147 ///
148 /// # Arguments
149 /// * `current_fill` - Current fill level (0-1000)
150 ///
151 /// This uses a compare-exchange loop to atomically update the peak.
152 #[inline(always)]
153 pub fn update_peak(&self, current_fill: usize) {
154 let mut peak = self.peak_fill.load(Ordering::Relaxed);
155 while current_fill > peak {
156 match self.peak_fill.compare_exchange_weak(
157 peak,
158 current_fill,
159 Ordering::Relaxed,
160 Ordering::Relaxed,
161 ) {
162 Ok(_) => break,
163 Err(new_peak) => peak = new_peak,
164 }
165 }
166 }
167
168 /// Get a consistent snapshot of current statistics
169 ///
170 /// # Returns
171 /// A `BufferStats` struct with a snapshot of all counters.
172 /// Note that the snapshot may not be perfectly consistent due to
173 /// concurrent updates, but it's good enough for monitoring.
174 pub fn snapshot(&self) -> BufferStats {
175 BufferStats {
176 writes: self.writes.load(Ordering::Relaxed),
177 reads: self.reads.load(Ordering::Relaxed),
178 underflows: self.underflows.load(Ordering::Relaxed),
179 overflows: self.overflows.load(Ordering::Relaxed),
180 fill_level: 0.0, // To be filled by caller with current fill level
181 peak_fill: self.peak_fill.load(Ordering::Relaxed) as f32 / 1000.0,
182 }
183 }
184
185 /// Reset all statistics to zero
186 pub fn reset(&self) {
187 self.writes.store(0, Ordering::Relaxed);
188 self.reads.store(0, Ordering::Relaxed);
189 self.underflows.store(0, Ordering::Relaxed);
190 self.overflows.store(0, Ordering::Relaxed);
191 self.peak_fill.store(0, Ordering::Relaxed);
192 }
193}
194
195impl Default for AtomicStats {
196 fn default() -> Self {
197 Self::new()
198 }
199}
200
201impl fmt::Debug for AtomicStats {
202 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
203 f.debug_struct("AtomicStats")
204 .field("writes", &self.writes.load(Ordering::Relaxed))
205 .field("reads", &self.reads.load(Ordering::Relaxed))
206 .field("underflows", &self.underflows.load(Ordering::Relaxed))
207 .field("overflows", &self.overflows.load(Ordering::Relaxed))
208 .field(
209 "peak_fill",
210 &(self.peak_fill.load(Ordering::Relaxed) as f32 / 1000.0),
211 )
212 .finish()
213 }
214}
215
216// ============================================================================
217// Buffer Statistics
218// ============================================================================
219
/// Plain-value snapshot of a buffer's performance counters.
///
/// Unlike the atomic counters it is taken from, this struct is an ordinary
/// `Copy` value intended for monitoring and debugging. It is typically
/// obtained via `Buffer::stats()`.
#[derive(Clone, Copy, Debug, Default)]
pub struct BufferStats {
    /// How many write operations succeeded.
    pub writes: u64,

    /// How many read operations succeeded.
    pub reads: u64,

    /// How many reads were attempted while the buffer was empty.
    pub underflows: u64,

    /// How many writes were attempted while the buffer was full.
    pub overflows: u64,

    /// Fill level at the time of the snapshot, from 0.0 to 1.0.
    pub fill_level: f32,

    /// Highest fill level since the last reset, from 0.0 to 1.0.
    pub peak_fill: f32,
}
244
245impl BufferStats {
246 /// Create a new zeroed statistics snapshot
247 pub fn new() -> Self {
248 Self::default()
249 }
250
251 /// Calculate the success rate (reads / writes)
252 ///
253 /// Returns 1.0 if no writes, otherwise reads/writes.
254 pub fn success_rate(&self) -> f32 {
255 if self.writes == 0 {
256 1.0
257 } else {
258 self.reads as f32 / self.writes as f32
259 }
260 }
261
262 /// Calculate the error rate (underflows + overflows) / operations
263 pub fn error_rate(&self) -> f32 {
264 let total = self.writes + self.reads;
265 if total == 0 {
266 0.0
267 } else {
268 (self.underflows + self.overflows) as f32 / total as f32
269 }
270 }
271}
272
273// The unified Buffer trait (defined in buffer_trait) replaces both the
274// original Buffer and the former SignalBuffer trait.
275
276// ============================================================================
277// Aligned Storage
278// ============================================================================
279
280// Cache-line aligned storage for lock-free buffers
281//
282// This type provides aligned storage that can be safely shared between threads.
283// It is not `Copy` or `Clone` by design - use references or pointers.
284//
285// # Type Parameters
286// - `T`: The sample type (must implement `Transcendental`)
287// - `N`: The number of elements
288// # Safety
289// This type uses `UnsafeCell` for interior mutability and `MaybeUninit`
290// for uninitialized data. Users must ensure proper initialization before reading.
291
292// ============================================================================
293// Utility Functions
294// ============================================================================
295
296/// Utility functions for common buffer operations
297pub mod utils {
298 use super::*;
299
300 /// Copy data from one slice to another with bounds checking
301 ///
302 /// # Arguments
303 /// * `src` - Source slice
304 /// * `dst` - Destination slice
305 ///
306 /// # Returns
307 /// The number of elements copied
308 #[inline(always)]
309 pub fn copy_safe<T: Copy>(src: &[T], dst: &mut [T]) -> usize {
310 let len = src.len().min(dst.len());
311 dst[..len].copy_from_slice(&src[..len]);
312 len
313 }
314
315 /// Fill slice with zeroes
316 ///
317 /// # Arguments
318 /// * `slice` - The slice to fill
319 #[inline(always)]
320 pub fn zero_fill<T: Default + Copy>(slice: &mut [T]) {
321 for item in slice.iter_mut() {
322 *item = T::default();
323 }
324 }
325
326 /// Mix two slices with gain
327 ///
328 /// # Arguments
329 /// * `src` - Source slice to mix in
330 /// * `dst` - Destination slice (will be modified)
331 /// * `gain` - Gain to apply to source
332 #[inline(always)]
333 pub fn mix_with_gain<T>(src: &[T], dst: &mut [T], gain: T)
334 where
335 T: Transcendental + core::ops::Mul<Output = T> + core::ops::Add<Output = T>,
336 {
337 let len = src.len().min(dst.len());
338 for i in 0..len {
339 dst[i] += src[i] * gain;
340 }
341 }
342
343 /// Apply gain to slice
344 ///
345 /// # Arguments
346 /// * `slice` - The slice to modify
347 /// * `gain` - Gain to apply
348 #[inline(always)]
349 pub fn apply_gain<T>(slice: &mut [T], gain: T)
350 where
351 T: Transcendental + core::ops::Mul<Output = T>,
352 {
353 for item in slice.iter_mut() {
354 *item *= gain;
355 }
356 }
357
358 /// Calculate RMS of slice
359 ///
360 /// # Arguments
361 /// * `slice` - The slice to analyze
362 ///
363 /// # Returns
364 /// The RMS value
365 #[inline(always)]
366 pub fn calculate_rms<T>(slice: &[T]) -> f64
367 where
368 T: Transcendental + core::ops::Mul<Output = T> + core::iter::Sum,
369 {
370 let sum_squares: T = slice.iter().map(|&x| x * x).sum();
371 let sum_f64: f64 = sum_squares.to_f64();
372 (sum_f64 / slice.len() as f64).sqrt()
373 }
374
375 /// Calculate peak of slice
376 ///
377 /// # Arguments
378 /// * `slice` - The slice to analyze
379 ///
380 /// # Returns
381 /// The peak absolute value
382 #[inline(always)]
383 pub fn calculate_peak<T>(slice: &[T]) -> f64
384 where
385 T: Transcendental + PartialOrd,
386 {
387 slice.iter().map(|&x| x.to_f64().abs()).fold(0.0, f64::max)
388 }
389}
390
391// ============================================================================
392// Prelude
393// ============================================================================
394
395/// Prelude for convenient imports
396///
397/// Import this module to get all the common buffer types and traits:
398/// ```
399/// use rill_core::buffer::prelude::*;
400/// ```
401pub mod prelude {
402 pub use super::{
403 // Utility functions
404 utils,
405
406 // AtomicCell
407 AtomicCell,
408 AtomicCellError,
409
410 // Core trait (unified Buffer replaces SignalBuffer + Buffer)
411 Buffer,
412
413 // Error types
414 BufferError,
415 BufferResult,
416
417 // Statistics
418 BufferStats,
419
420 DelayLine,
421 FanInBuffer,
422 FanOutBuffer,
423 // Buffer types
424 PipeBuffer,
425 RingBuffer,
426
427 // Constants
428 CACHE_LINE_SIZE,
429 DEFAULT_BUFFER_SIZE,
430 MAX_BUFFER_SIZE,
431 MIN_BUFFER_SIZE,
432 };
433}
434
435// ============================================================================
436// Buffer Error Types
437// ============================================================================
438
439/// Buffer error types
440///
441/// These errors can occur during buffer operations. They are designed to be
442/// `Copy` and `Eq` for efficient handling in real-time contexts.
443#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
444pub enum BufferError {
445 /// Buffer is empty (tried to read when no data available)
446 #[error("Buffer is empty")]
447 Empty,
448
449 /// Buffer is full (tried to write when no space available)
450 #[error("Buffer is full")]
451 Full,
452
453 /// Invalid index access
454 #[error("Invalid index: {0}")]
455 InvalidIndex(usize),
456
457 /// Buffer is disconnected (other end is gone)
458 #[error("Buffer is disconnected")]
459 Disconnected,
460
461 /// Operation would block (for non-blocking operations)
462 #[error("Operation would block")]
463 WouldBlock,
464
465 /// Buffer overflow (data was lost)
466 #[error("Buffer overflow")]
467 Overflow,
468
469 /// Buffer underflow (no data available)
470 #[error("Buffer underflow")]
471 Underflow,
472
473 /// Invalid buffer size
474 #[error("Invalid buffer size: {0}")]
475 InvalidSize(usize),
476}
477
478// ============================================================================
479// Helper Functions
480// ============================================================================
481
/// Build a `[T; N]` by calling `f(index)` once for each index in `0..N`,
/// walking forward through the array.
///
/// Unlike the `[value; N]` array expression, this does not require
/// `T: Copy`.
///
/// # Arguments
/// * `f` - Produces the element for the index it is given
///
/// # Returns
/// The array whose element `i` is `f(i)`. For `N == 0`, `f` is never called.
///
/// # Panics
/// Propagates any panic raised by `f`. Elements built before the panic are
/// dropped, not leaked.
pub fn array_from_fn<T, const N: usize>(f: impl FnMut(usize) -> T) -> [T; N] {
    // The standard library implements this contract safely, including the
    // cleanup of partially built arrays, so no `unsafe` code is needed here.
    core::array::from_fn(f)
}
495
496/// Result type for buffer operations
497pub type BufferResult<T> = Result<T, BufferError>;
498
499// ============================================================================
500// Tests
501// ============================================================================
502
#[cfg(test)]
mod tests {
    use super::*;

    /// Each recorder bumps its own counter, and the peak is reported as a
    /// fraction of 1000.
    #[test]
    fn test_atomic_stats() {
        let stats = AtomicStats::new();

        stats.record_write();
        stats.record_read();
        stats.record_underflow();
        stats.record_overflow();
        stats.update_peak(500);

        let BufferStats {
            writes,
            reads,
            underflows,
            overflows,
            peak_fill,
            ..
        } = stats.snapshot();

        assert_eq!(writes, 1);
        assert_eq!(reads, 1);
        assert_eq!(underflows, 1);
        assert_eq!(overflows, 1);
        assert!((peak_fill - 0.5).abs() < 0.001);
    }

    /// The derived rates follow the documented formulas.
    #[test]
    fn test_buffer_stats() {
        let stats = BufferStats {
            writes: 100,
            reads: 95,
            underflows: 3,
            overflows: 2,
            fill_level: 0.5,
            peak_fill: 0.8,
        };

        // reads / writes = 95 / 100
        let success = stats.success_rate();
        assert!((success - 0.95).abs() < 0.001);

        // (underflows + overflows) / (writes + reads) = 5 / 195 ≈ 0.02564
        let errors = stats.error_rate();
        assert!((errors - 0.02564).abs() < 0.001);
    }

    /// Exercises the slice helpers in `utils` one after another.
    #[test]
    fn test_utils() {
        let mut dst = [0.0; 4];
        let src = [1.0, 2.0, 3.0];

        // The shorter slice bounds the copy.
        let copied = utils::copy_safe(&src, &mut dst);
        assert_eq!(copied, 3);
        assert_eq!(dst[..3], src[..]);

        utils::zero_fill(&mut dst[..3]);
        assert!(dst[..3].iter().all(|&value| value == 0.0));

        // 1.0 + 2.0 * 0.5 == 2.0
        let mut mix_dst = [1.0, 1.0, 1.0];
        utils::mix_with_gain(&[2.0, 2.0, 2.0], &mut mix_dst, 0.5);
        assert_eq!(mix_dst[0], 2.0);

        let rms = utils::calculate_rms(&[1.0, -1.0, 1.0, -1.0]);
        assert!((rms - 1.0).abs() < 1e-6);

        let peak = utils::calculate_peak(&[0.5, -0.8, 0.3, -0.9]);
        assert!((peak - 0.9).abs() < 1e-6);
    }

    /// The size constants are mutually consistent.
    #[test]
    fn test_constants() {
        assert_eq!(CACHE_LINE_SIZE, 64);
        assert!(MAX_BUFFER_SIZE > MIN_BUFFER_SIZE);
        assert!(DEFAULT_BUFFER_SIZE >= MIN_BUFFER_SIZE);
        assert!(DEFAULT_BUFFER_SIZE <= MAX_BUFFER_SIZE);
    }

    /// `Display` output matches the `#[error]` messages.
    #[test]
    fn test_buffer_error_display() {
        assert_eq!(BufferError::Empty.to_string(), "Buffer is empty");
        assert_eq!(BufferError::Full.to_string(), "Buffer is full");
        assert_eq!(BufferError::InvalidIndex(5).to_string(), "Invalid index: 5");
    }

    /// A stored value is what the next load returns.
    #[test]
    fn test_atomic_cell_basic() {
        let cell = AtomicCell::new(42);
        assert_eq!(cell.load(), 42);

        cell.store(100);
        assert_eq!(cell.load(), 100);
    }

    /// The fallible constructor succeeds for an `i32` payload.
    #[test]
    fn test_atomic_cell_try_new() {
        let cell = AtomicCell::try_new(42).unwrap();
        assert_eq!(cell.load(), 42);
    }

    /// The default cell holds the payload type's default value.
    #[test]
    fn test_atomic_cell_default() {
        let cell = AtomicCell::<i32>::default();
        assert_eq!(cell.load(), 0);
    }
}