Skip to main content

rill_core/buffer/
mod.rs

1//! # Audio Buffers with Transcendental Support
2//!
3//! This module provides lock-free, real-time safe buffers for signal processing
4//! with full `Transcendental` support for both `f32` and `f64` sample types.
5//!
6//! ## Buffer Types
7//!
8//! | Buffer | Description | Use Case |
9//! |--------|-------------|----------|
10//! | [`PipeBuffer`] | Single-producer, single-consumer | Point-to-point node connections |
11//! | [`FanOutBuffer`] | One producer, multiple consumers | Broadcast signals to multiple nodes |
12//! | [`FanInBuffer`] | Multiple producers, one consumer | Mix multiple signals |
13//! | [`DelayLine`] | Circular buffer with delay | Effects like echo, reverb |
14//! | [`RingBuffer`] | Multi-producer, multi-consumer | Generic queue for any scenario |
15//!
16//! ## Features
17//!
18//! - **Lock-free** - All buffers use atomic operations, no mutexes
19//! - **Wait-free** - Bounded number of steps per operation
20//! - **Cache-line aligned** - Prevents false sharing between threads
21//! - **Real-time safe** - No allocations, no blocking, no system calls
22//! - **Statistically monitored** - Track performance metrics
23//! - **Type-safe** - Generic over `Transcendental` (f32/f64)
24//! - **Const generics** - Sizes checked at compile time
25
26use core::marker::PhantomData;
27use core::ops::{Deref, DerefMut};
28use core::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
29use std::fmt;
30
31use crate::math::Transcendental;
32
33// ============================================================================
34// Submodules
35// ============================================================================
36
37mod delay;
38mod fan;
39mod pipe;
40mod port_buffer;
41mod ring;
42mod storage;
43
44// ============================================================================
45// Re-exports
46// ============================================================================
47
48pub use delay::DelayLine;
49pub use fan::{FanInBuffer, FanOutBuffer};
50pub use pipe::PipeBuffer;
51pub use port_buffer::Buffer;
52pub use ring::RingBuffer;
53pub use storage::{AtomicCell, AtomicCellError};
54
55// ============================================================================
56// Constants
57// ============================================================================
58
/// Cache line size for alignment (64 bytes on x86_64)
///
/// This is the typical size of a CPU cache line. Aligning buffers to this
/// boundary prevents false sharing between threads running on different cores.
///
/// `AtomicStats` in this module spells the same value as a literal in its
/// `#[repr(align(64))]` attribute, so the two must be kept in sync by hand.
pub const CACHE_LINE_SIZE: usize = 64;

/// Default buffer size for most use cases
pub const DEFAULT_BUFFER_SIZE: usize = 1024;

/// Maximum buffer size (2^16 = 65536 samples)
///
/// NOTE(review): nothing in this file enforces this limit; presumably the
/// buffer submodules check it — confirm.
pub const MAX_BUFFER_SIZE: usize = 65536;

/// Minimum buffer size (must be at least 16 for most algorithms)
///
/// NOTE(review): nothing in this file enforces this limit; presumably the
/// buffer submodules check it — confirm.
pub const MIN_BUFFER_SIZE: usize = 16;
73
74// ============================================================================
75// Atomic Statistics
76// ============================================================================
77
78/// Atomic statistics for safe concurrent access
79///
80/// This structure provides lock-free atomic counters for buffer statistics.
81/// It can be safely shared between threads without mutexes.
82///
83/// # Memory Layout
84/// The structure is cache-line aligned to prevent false sharing.
85///
86/// # Thread Safety
87/// All operations are atomic and use relaxed ordering where appropriate.
88#[repr(align(64))]
89pub struct AtomicStats {
90    /// Total number of successful writes
91    writes: AtomicU64,
92
93    /// Total number of successful reads
94    reads: AtomicU64,
95
96    /// Number of underflow events (read when empty)
97    underflows: AtomicU64,
98
99    /// Number of overflow events (write when full)
100    overflows: AtomicU64,
101
102    /// Peak fill level (0-1000 representing 0.0-1.0)
103    /// Stored as fixed-point for atomic operations
104    peak_fill: AtomicUsize,
105}
106
107impl AtomicStats {
108    /// Create new atomic statistics with all counters set to zero
109    pub const fn new() -> Self {
110        Self {
111            writes: AtomicU64::new(0),
112            reads: AtomicU64::new(0),
113            underflows: AtomicU64::new(0),
114            overflows: AtomicU64::new(0),
115            peak_fill: AtomicUsize::new(0),
116        }
117    }
118
119    /// Record a successful write operation
120    #[inline(always)]
121    pub fn record_write(&self) {
122        self.writes.fetch_add(1, Ordering::Relaxed);
123    }
124
125    /// Record a successful read operation
126    #[inline(always)]
127    pub fn record_read(&self) {
128        self.reads.fetch_add(1, Ordering::Relaxed);
129    }
130
131    /// Record an underflow event (read when empty)
132    #[inline(always)]
133    pub fn record_underflow(&self) {
134        self.underflows.fetch_add(1, Ordering::Relaxed);
135    }
136
137    /// Record an overflow event (write when full)
138    #[inline(always)]
139    pub fn record_overflow(&self) {
140        self.overflows.fetch_add(1, Ordering::Relaxed);
141    }
142
143    /// Update peak fill level (0-1000 representing 0.0-1.0)
144    ///
145    /// # Arguments
146    /// * `current_fill` - Current fill level (0-1000)
147    ///
148    /// This uses a compare-exchange loop to atomically update the peak.
149    #[inline(always)]
150    pub fn update_peak(&self, current_fill: usize) {
151        let mut peak = self.peak_fill.load(Ordering::Relaxed);
152        while current_fill > peak {
153            match self.peak_fill.compare_exchange_weak(
154                peak,
155                current_fill,
156                Ordering::Relaxed,
157                Ordering::Relaxed,
158            ) {
159                Ok(_) => break,
160                Err(new_peak) => peak = new_peak,
161            }
162        }
163    }
164
165    /// Get a consistent snapshot of current statistics
166    ///
167    /// # Returns
168    /// A `BufferStats` struct with a snapshot of all counters.
169    /// Note that the snapshot may not be perfectly consistent due to
170    /// concurrent updates, but it's good enough for monitoring.
171    pub fn snapshot(&self) -> BufferStats {
172        BufferStats {
173            writes: self.writes.load(Ordering::Relaxed),
174            reads: self.reads.load(Ordering::Relaxed),
175            underflows: self.underflows.load(Ordering::Relaxed),
176            overflows: self.overflows.load(Ordering::Relaxed),
177            fill_level: 0.0, // To be filled by caller with current fill level
178            peak_fill: self.peak_fill.load(Ordering::Relaxed) as f32 / 1000.0,
179        }
180    }
181
182    /// Reset all statistics to zero
183    pub fn reset(&self) {
184        self.writes.store(0, Ordering::Relaxed);
185        self.reads.store(0, Ordering::Relaxed);
186        self.underflows.store(0, Ordering::Relaxed);
187        self.overflows.store(0, Ordering::Relaxed);
188        self.peak_fill.store(0, Ordering::Relaxed);
189    }
190}
191
192impl Default for AtomicStats {
193    fn default() -> Self {
194        Self::new()
195    }
196}
197
198impl fmt::Debug for AtomicStats {
199    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
200        f.debug_struct("AtomicStats")
201            .field("writes", &self.writes.load(Ordering::Relaxed))
202            .field("reads", &self.reads.load(Ordering::Relaxed))
203            .field("underflows", &self.underflows.load(Ordering::Relaxed))
204            .field("overflows", &self.overflows.load(Ordering::Relaxed))
205            .field(
206                "peak_fill",
207                &(self.peak_fill.load(Ordering::Relaxed) as f32 / 1000.0),
208            )
209            .finish()
210    }
211}
212
213// ============================================================================
214// Buffer Statistics
215// ============================================================================
216
/// Buffer statistics snapshot for monitoring and debugging
///
/// This struct provides a read-only snapshot of buffer performance metrics.
/// It's typically obtained via `SignalBuffer::stats()`. It is plain `Copy`
/// data and holds no reference back to the buffer it came from.
#[derive(Debug, Default, Clone, Copy)]
pub struct BufferStats {
    /// Total number of successful write operations
    pub writes: u64,

    /// Total number of successful read operations
    pub reads: u64,

    /// Number of underflow events (read when empty)
    pub underflows: u64,

    /// Number of overflow events (write when full)
    pub overflows: u64,

    /// Current fill level (0.0 to 1.0)
    pub fill_level: f32,

    /// Peak fill level since last reset (0.0 to 1.0)
    pub peak_fill: f32,
}

impl BufferStats {
    /// Create a new zeroed statistics snapshot
    pub fn new() -> Self {
        Self::default()
    }

    /// Calculate the success rate (reads / writes)
    ///
    /// Returns 1.0 if no writes, otherwise reads/writes.
    pub fn success_rate(&self) -> f32 {
        if self.writes == 0 {
            1.0
        } else {
            self.reads as f32 / self.writes as f32
        }
    }

    /// Calculate the error rate (underflows + overflows) / operations
    ///
    /// `operations` is `writes + reads`. Returns 0.0 when no operation has
    /// been recorded.
    pub fn error_rate(&self) -> f32 {
        // Sum in u128: adding two u64 counters directly can overflow, which
        // panics in debug builds and wraps (possibly to a zero denominator)
        // in release builds.
        let operations = u128::from(self.writes) + u128::from(self.reads);
        if operations == 0 {
            return 0.0;
        }
        let errors = u128::from(self.underflows) + u128::from(self.overflows);
        errors as f32 / operations as f32
    }
}
269
270// ============================================================================
271// SignalBuffer Trait
272// ============================================================================
273
274/// Common trait for all signal buffers
275///
276/// This trait defines the standard interface that all buffer types implement.
277/// It provides methods for querying capacity, current length, and statistics.
278///
279/// # Type Parameters
280/// - `T`: The sample type (must implement `Transcendental`)
281pub trait SignalBuffer<T: Transcendental> {
282    /// Get the total capacity of the buffer in samples
283    ///
284    /// For block-based buffers, this is the number of samples per block.
285    /// For ring buffers, this is the total number of samples that can be stored.
286    fn capacity(&self) -> usize;
287
288    /// Get the current number of items in the buffer
289    ///
290    /// For `PipeBuffer` and `FanOutBuffer`, this is either 0 or 1.
291    /// For `RingBuffer`, this is the number of samples available.
292    /// For `DelayLine`, this is always the maximum delay.
293    fn len(&self) -> usize;
294
295    /// Check if the buffer is empty
296    fn is_empty(&self) -> bool {
297        self.len() == 0
298    }
299
300    /// Check if the buffer is full
301    fn is_full(&self) -> bool {
302        self.len() == self.capacity()
303    }
304
305    /// Clear all items from the buffer
306    ///
307    /// After calling this, the buffer should be empty.
308    /// Note that this may not actually zero the memory for performance reasons.
309    fn clear(&mut self);
310
311    /// Get a snapshot of current buffer statistics
312    fn stats(&self) -> BufferStats;
313
314    /// Reset all statistics to zero
315    ///
316    /// This does not clear the buffer contents, only the performance counters.
317    fn reset_stats(&mut self);
318}
319
320// ============================================================================
321// Aligned Storage
322// ============================================================================
323
// NOTE(review): the text below documents an aligned-storage type that is not
// defined in this file. It was written as `///` doc comments, which rustdoc
// attaches to the next item (`ReadGuard`), so it is kept here as a plain
// comment until the type it describes is located or restored.
//
// Cache-line aligned storage for lock-free buffers
//
// This type provides aligned storage that can be safely shared between threads.
// It is not `Copy` or `Clone` by design - use references or pointers.
//
// # Type Parameters
// - `T`: The sample type (must implement `Transcendental`)
// - `N`: The number of elements
//
// # Safety
// This type uses `UnsafeCell` for interior mutability and `MaybeUninit`
// for uninitialized data. Users must ensure proper initialization before reading.
336
337// ============================================================================
338// Buffer Read/Write Guards
339// ============================================================================
340
341/// Read guard for buffer access
342///
343/// Provides RAII-style access to buffer data for reading.
344/// The guard releases any locks when dropped.
345
346#[allow(dead_code)]
347pub struct ReadGuard<'a, T, B>
348where
349    T: Transcendental,
350    B: SignalBuffer<T>,
351{
352    buffer: &'a B,
353    data: &'a [T],
354    _marker: PhantomData<B>,
355}
356
357impl<'a, T, B> Deref for ReadGuard<'a, T, B>
358where
359    T: Transcendental,
360    B: SignalBuffer<T>,
361{
362    type Target = [T];
363
364    fn deref(&self) -> &Self::Target {
365        self.data
366    }
367}
368
369/// Write guard for buffer access
370///
371/// Provides RAII-style access to buffer data for writing.
372/// The guard releases any locks and commits changes when dropped.
373#[allow(dead_code)]
374pub struct WriteGuard<'a, T, B>
375where
376    T: Transcendental,
377    B: SignalBuffer<T>,
378{
379    buffer: &'a mut B,
380    data: &'a mut [T],
381    _marker: PhantomData<B>,
382}
383
384impl<'a, T, B> Deref for WriteGuard<'a, T, B>
385where
386    T: Transcendental,
387    B: SignalBuffer<T>,
388{
389    type Target = [T];
390
391    fn deref(&self) -> &Self::Target {
392        self.data
393    }
394}
395
396impl<'a, T, B> DerefMut for WriteGuard<'a, T, B>
397where
398    T: Transcendental,
399    B: SignalBuffer<T>,
400{
401    fn deref_mut(&mut self) -> &mut Self::Target {
402        self.data
403    }
404}
405
406// ============================================================================
407// Utility Functions
408// ============================================================================
409
410/// Utility functions for common buffer operations
411pub mod utils {
412    use super::*;
413
414    /// Copy data from one slice to another with bounds checking
415    ///
416    /// # Arguments
417    /// * `src` - Source slice
418    /// * `dst` - Destination slice
419    ///
420    /// # Returns
421    /// The number of elements copied
422    #[inline(always)]
423    pub fn copy_safe<T: Copy>(src: &[T], dst: &mut [T]) -> usize {
424        let len = src.len().min(dst.len());
425        dst[..len].copy_from_slice(&src[..len]);
426        len
427    }
428
429    /// Fill slice with zeroes
430    ///
431    /// # Arguments
432    /// * `slice` - The slice to fill
433    #[inline(always)]
434    pub fn zero_fill<T: Default + Copy>(slice: &mut [T]) {
435        for item in slice.iter_mut() {
436            *item = T::default();
437        }
438    }
439
440    /// Mix two slices with gain
441    ///
442    /// # Arguments
443    /// * `src` - Source slice to mix in
444    /// * `dst` - Destination slice (will be modified)
445    /// * `gain` - Gain to apply to source
446    #[inline(always)]
447    pub fn mix_with_gain<T>(src: &[T], dst: &mut [T], gain: T)
448    where
449        T: Transcendental + core::ops::Mul<Output = T> + core::ops::Add<Output = T>,
450    {
451        let len = src.len().min(dst.len());
452        for i in 0..len {
453            dst[i] = dst[i] + src[i] * gain;
454        }
455    }
456
457    /// Apply gain to slice
458    ///
459    /// # Arguments
460    /// * `slice` - The slice to modify
461    /// * `gain` - Gain to apply
462    #[inline(always)]
463    pub fn apply_gain<T>(slice: &mut [T], gain: T)
464    where
465        T: Transcendental + core::ops::Mul<Output = T>,
466    {
467        for item in slice.iter_mut() {
468            *item = *item * gain;
469        }
470    }
471
472    /// Calculate RMS of slice
473    ///
474    /// # Arguments
475    /// * `slice` - The slice to analyze
476    ///
477    /// # Returns
478    /// The RMS value
479    #[inline(always)]
480    pub fn calculate_rms<T>(slice: &[T]) -> f64
481    where
482        T: Transcendental + core::ops::Mul<Output = T> + core::iter::Sum,
483    {
484        let sum_squares: T = slice.iter().map(|&x| x * x).sum();
485        let sum_f64: f64 = sum_squares.to_f64();
486        (sum_f64 / slice.len() as f64).sqrt()
487    }
488
489    /// Calculate peak of slice
490    ///
491    /// # Arguments
492    /// * `slice` - The slice to analyze
493    ///
494    /// # Returns
495    /// The peak absolute value
496    #[inline(always)]
497    pub fn calculate_peak<T>(slice: &[T]) -> f64
498    where
499        T: Transcendental + PartialOrd,
500    {
501        slice.iter().map(|&x| x.to_f64().abs()).fold(0.0, f64::max)
502    }
503}
504
505// ============================================================================
506// Prelude
507// ============================================================================
508
/// Prelude for convenient imports
///
/// Import this module to get all the common buffer types and traits:
/// ```
/// use rill_core::buffer::prelude::*;
/// ```
///
/// `AtomicStats`, `Buffer`, the read/write guards and `array_from_fn` are
/// not part of the prelude and must be imported by path.
pub mod prelude {
    pub use super::{
        // Utility functions
        utils,

        // AtomicCell
        AtomicCell,
        AtomicCellError,

        // Core trait
        SignalBuffer,

        // Error types
        BufferError,
        BufferResult,

        // Statistics
        BufferStats,

        // Buffer types
        DelayLine,
        FanInBuffer,
        FanOutBuffer,
        PipeBuffer,
        RingBuffer,

        // Constants
        CACHE_LINE_SIZE,
        DEFAULT_BUFFER_SIZE,
        MAX_BUFFER_SIZE,
        MIN_BUFFER_SIZE,
    };
}
548
549// ============================================================================
550// Buffer Error Types
551// ============================================================================
552
/// Buffer error types
///
/// These errors can occur during buffer operations. The enum derives `Copy`
/// and `Eq`, so values can be returned and compared without allocation in
/// real-time contexts. `Display` and `std::error::Error` come from the
/// `thiserror` derive, using the `#[error("...")]` strings below.
///
/// NOTE(review): `Empty`/`Underflow` and `Full`/`Overflow` describe similar
/// conditions; which buffer returns which variant is not visible in this
/// file — confirm against the submodules.
#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
pub enum BufferError {
    /// Buffer is empty (tried to read when no data available)
    #[error("Buffer is empty")]
    Empty,

    /// Buffer is full (tried to write when no space available)
    #[error("Buffer is full")]
    Full,

    /// Invalid index access; the payload is the offending index
    #[error("Invalid index: {0}")]
    InvalidIndex(usize),

    /// Buffer is disconnected (other end is gone)
    #[error("Buffer is disconnected")]
    Disconnected,

    /// Operation would block (for non-blocking operations)
    #[error("Operation would block")]
    WouldBlock,

    /// Buffer overflow (data was lost)
    #[error("Buffer overflow")]
    Overflow,

    /// Buffer underflow (no data available)
    #[error("Buffer underflow")]
    Underflow,

    /// Invalid buffer size; the payload is the rejected size
    #[error("Invalid buffer size: {0}")]
    InvalidSize(usize),
}
591
592// ============================================================================
593// Helper Functions
594// ============================================================================
595
/// Helper function to create arrays without requiring `Copy`
///
/// Calls `f(0)`, `f(1)`, …, `f(N - 1)` in that order and returns the results
/// as `[T; N]`. `f` is not called when `N` is zero.
///
/// This is a thin wrapper around [`core::array::from_fn`] (Rust 1.63+). It
/// needs no `unsafe`, and if `f` panics the elements built so far are
/// dropped instead of leaked.
pub fn array_from_fn<T, const N: usize>(f: impl FnMut(usize) -> T) -> [T; N] {
    core::array::from_fn(f)
}
609
/// Result type for buffer operations
///
/// Shorthand for `Result<T, BufferError>`.
pub type BufferResult<T> = Result<T, BufferError>;
612
613// ============================================================================
614// Tests
615// ============================================================================
616
#[cfg(test)]
mod tests {
    use super::*;

    /// Each `record_*` call bumps exactly one counter, and the peak is
    /// reported scaled from 0-1000 to 0.0-1.0.
    #[test]
    fn test_atomic_stats() {
        let stats = AtomicStats::new();

        stats.record_write();
        stats.record_read();
        stats.record_underflow();
        stats.record_overflow();
        stats.update_peak(500);

        let BufferStats {
            writes,
            reads,
            underflows,
            overflows,
            peak_fill,
            ..
        } = stats.snapshot();

        assert_eq!((writes, reads, underflows, overflows), (1, 1, 1, 1));
        assert!((peak_fill - 0.5).abs() < 0.001);
    }

    /// Derived rates follow the documented formulas.
    #[test]
    fn test_buffer_stats() {
        let stats = BufferStats {
            writes: 100,
            reads: 95,
            underflows: 3,
            overflows: 2,
            fill_level: 0.5,
            peak_fill: 0.8,
        };

        // reads / writes = 95 / 100
        let success = stats.success_rate();
        assert!((success - 0.95).abs() < 0.001);

        // (underflows + overflows) / (writes + reads) = 5 / 195 ≈ 0.02564
        let errors = stats.error_rate();
        assert!((errors - 0.02564).abs() < 0.001);
    }

    /// Smoke test for the slice helpers in `utils`.
    #[test]
    fn test_utils() {
        let mut dst = [0.0; 4];
        let src = [1.0, 2.0, 3.0];

        // Only the three source elements are copied.
        assert_eq!(utils::copy_safe(&src, &mut dst), 3);
        assert_eq!(dst[..3], src);

        utils::zero_fill(&mut dst[..3]);
        assert!(dst[..3].iter().all(|&value| value == 0.0));

        // 1.0 + 2.0 * 0.5 == 2.0
        let mut mix_dst = [1.0, 1.0, 1.0];
        utils::mix_with_gain(&[2.0, 2.0, 2.0], &mut mix_dst, 0.5);
        assert_eq!(mix_dst[0], 2.0);

        let rms = utils::calculate_rms(&[1.0, -1.0, 1.0, -1.0]);
        assert!((rms - 1.0).abs() < 1e-6);

        let peak = utils::calculate_peak(&[0.5, -0.8, 0.3, -0.9]);
        assert!((peak - 0.9).abs() < 1e-6);
    }

    /// The size constants are mutually consistent.
    #[test]
    fn test_constants() {
        assert_eq!(CACHE_LINE_SIZE, 64);
        assert!(MIN_BUFFER_SIZE < MAX_BUFFER_SIZE);
        assert!(MIN_BUFFER_SIZE <= DEFAULT_BUFFER_SIZE);
        assert!(MAX_BUFFER_SIZE >= DEFAULT_BUFFER_SIZE);
    }

    /// `Display` output matches the `#[error]` strings.
    #[test]
    fn test_buffer_error_display() {
        assert_eq!(BufferError::Empty.to_string(), "Buffer is empty");
        assert_eq!(BufferError::Full.to_string(), "Buffer is full");
        assert_eq!(BufferError::InvalidIndex(5).to_string(), "Invalid index: 5");
    }

    /// A stored value is visible to the next load.
    #[test]
    fn test_atomic_cell_basic() {
        let cell = AtomicCell::new(42);
        assert_eq!(cell.load(), 42);

        cell.store(100);
        assert_eq!(cell.load(), 100);
    }

    /// The fallible constructor succeeds for a plain integer.
    #[test]
    fn test_atomic_cell_try_new() {
        let cell = AtomicCell::try_new(42).unwrap();
        assert_eq!(cell.load(), 42);
    }

    /// The default cell holds the default value of its payload type.
    #[test]
    fn test_atomic_cell_default() {
        let cell = AtomicCell::<i32>::default();
        assert_eq!(cell.load(), 0);
    }
}