Skip to main content

rill_core/buffer/
mod.rs

//! # Signal Buffers for single-threaded signal processing
//!
//! This module provides real-time safe buffers used by graph nodes inside
//! the signal thread. All buffers are **single-threaded** — they contain no
//! atomics or locks. Cross-thread communication goes through
//! [`rill_core::queues`](crate::queues).
//!
//! ## Buffer Types
//!
//! | Buffer | Description | Use Case |
//! |--------|-------------|----------|
//! | [`PipeBuffer`] | Single-producer, single-consumer | Point-to-point node connections |
//! | [`FanOutBuffer`] | One producer, multiple consumers | Broadcast signals to multiple nodes |
//! | [`FanInBuffer`] | Multiple producers, one consumer | Mix multiple signals |
//! | [`DelayLine`] | Circular buffer with delay | Effects like echo, reverb |
//! | [`RingBuffer`] | Multi-producer, multi-consumer | Generic queue for any scenario |
//! | [`TapeLoop`](crate::buffer::TapeLoop) | Heap-allocated circular buffer | Tape delay with large capacity |
//!
//! ## Features
//!
//! - **Real-time safe** - No allocations, no blocking, no system calls
//! - **Single-threaded** - No atomics, no locks, minimal overhead
//! - **Cache-line aligned** - Prevents false sharing
//! - **Statistically monitored** - Track performance metrics
//! - **Type-safe** - Generic over `Transcendental` (f32/f64)

use core::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::fmt;

use crate::math::Transcendental;

32// ============================================================================
33// Submodules
34// ============================================================================
35
36mod buffer_trait;
37mod delay;
38mod fan;
39mod pipe;
40mod registry;
41mod ring;
42mod storage;
43mod tape;
44
45// ============================================================================
46// Re-exports
47// ============================================================================
48
49pub use buffer_trait::{Buffer, FixedBuffer, HeapBuffer};
50pub use delay::DelayLine;
51pub use fan::{FanInBuffer, FanOutBuffer};
52pub use pipe::PipeBuffer;
53pub use registry::BufferRegistry;
54pub use ring::RingBuffer;
55pub use storage::{AtomicCell, AtomicCellError};
56pub use tape::TapeLoop;
57
// ============================================================================
// Constants
// ============================================================================

/// Cache line size for alignment (64 bytes on x86_64)
///
/// This is the typical size of a CPU cache line. Aligning buffers to this
/// boundary prevents false sharing between threads running on different cores.
///
/// `AtomicStats` spells the same value as a literal in `#[repr(align(64))]`
/// (the attribute needs a literal), so keep the two in sync.
pub const CACHE_LINE_SIZE: usize = 64;

/// Default buffer size for most use cases
pub const DEFAULT_BUFFER_SIZE: usize = 1024;

/// Maximum buffer size (2^16 = 65536 samples)
pub const MAX_BUFFER_SIZE: usize = 65536;

/// Minimum buffer size (must be at least 16 for most algorithms)
pub const MIN_BUFFER_SIZE: usize = 16;

// ============================================================================
// Atomic Statistics
// ============================================================================

/// Atomic statistics for safe concurrent access
///
/// Every field is an atomic integer, so a shared reference is enough to
/// record events and the counters can be shared between threads without
/// mutexes.
///
/// # Memory Layout
/// The structure is aligned to 64 bytes (one cache line on x86_64) to prevent
/// false sharing with neighbouring data.
///
/// # Thread Safety
/// All operations are atomic and use relaxed ordering: individual counters
/// are always valid, but no ordering between different counters is implied.
#[repr(align(64))]
pub struct AtomicStats {
    /// Total number of successful writes
    writes: AtomicU64,

    /// Total number of successful reads
    reads: AtomicU64,

    /// Number of underflow events (read when empty)
    underflows: AtomicU64,

    /// Number of overflow events (write when full)
    overflows: AtomicU64,

    /// Peak fill level (0-1000 representing 0.0-1.0)
    /// Stored as fixed-point for atomic operations
    peak_fill: AtomicUsize,
}

110impl AtomicStats {
111    /// Create new atomic statistics with all counters set to zero
112    pub const fn new() -> Self {
113        Self {
114            writes: AtomicU64::new(0),
115            reads: AtomicU64::new(0),
116            underflows: AtomicU64::new(0),
117            overflows: AtomicU64::new(0),
118            peak_fill: AtomicUsize::new(0),
119        }
120    }
121
122    /// Record a successful write operation
123    #[inline(always)]
124    pub fn record_write(&self) {
125        self.writes.fetch_add(1, Ordering::Relaxed);
126    }
127
128    /// Record a successful read operation
129    #[inline(always)]
130    pub fn record_read(&self) {
131        self.reads.fetch_add(1, Ordering::Relaxed);
132    }
133
134    /// Record an underflow event (read when empty)
135    #[inline(always)]
136    pub fn record_underflow(&self) {
137        self.underflows.fetch_add(1, Ordering::Relaxed);
138    }
139
140    /// Record an overflow event (write when full)
141    #[inline(always)]
142    pub fn record_overflow(&self) {
143        self.overflows.fetch_add(1, Ordering::Relaxed);
144    }
145
146    /// Update peak fill level (0-1000 representing 0.0-1.0)
147    ///
148    /// # Arguments
149    /// * `current_fill` - Current fill level (0-1000)
150    ///
151    /// This uses a compare-exchange loop to atomically update the peak.
152    #[inline(always)]
153    pub fn update_peak(&self, current_fill: usize) {
154        let mut peak = self.peak_fill.load(Ordering::Relaxed);
155        while current_fill > peak {
156            match self.peak_fill.compare_exchange_weak(
157                peak,
158                current_fill,
159                Ordering::Relaxed,
160                Ordering::Relaxed,
161            ) {
162                Ok(_) => break,
163                Err(new_peak) => peak = new_peak,
164            }
165        }
166    }
167
168    /// Get a consistent snapshot of current statistics
169    ///
170    /// # Returns
171    /// A `BufferStats` struct with a snapshot of all counters.
172    /// Note that the snapshot may not be perfectly consistent due to
173    /// concurrent updates, but it's good enough for monitoring.
174    pub fn snapshot(&self) -> BufferStats {
175        BufferStats {
176            writes: self.writes.load(Ordering::Relaxed),
177            reads: self.reads.load(Ordering::Relaxed),
178            underflows: self.underflows.load(Ordering::Relaxed),
179            overflows: self.overflows.load(Ordering::Relaxed),
180            fill_level: 0.0, // To be filled by caller with current fill level
181            peak_fill: self.peak_fill.load(Ordering::Relaxed) as f32 / 1000.0,
182        }
183    }
184
185    /// Reset all statistics to zero
186    pub fn reset(&self) {
187        self.writes.store(0, Ordering::Relaxed);
188        self.reads.store(0, Ordering::Relaxed);
189        self.underflows.store(0, Ordering::Relaxed);
190        self.overflows.store(0, Ordering::Relaxed);
191        self.peak_fill.store(0, Ordering::Relaxed);
192    }
193}
194
195impl Default for AtomicStats {
196    fn default() -> Self {
197        Self::new()
198    }
199}
200
201impl fmt::Debug for AtomicStats {
202    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
203        f.debug_struct("AtomicStats")
204            .field("writes", &self.writes.load(Ordering::Relaxed))
205            .field("reads", &self.reads.load(Ordering::Relaxed))
206            .field("underflows", &self.underflows.load(Ordering::Relaxed))
207            .field("overflows", &self.overflows.load(Ordering::Relaxed))
208            .field(
209                "peak_fill",
210                &(self.peak_fill.load(Ordering::Relaxed) as f32 / 1000.0),
211            )
212            .finish()
213    }
214}
215
// ============================================================================
// Buffer Statistics
// ============================================================================

/// Buffer statistics snapshot for monitoring and debugging
///
/// This struct is a plain, copyable snapshot of buffer performance metrics.
/// It's typically obtained via `SignalBuffer::stats()`.
#[derive(Debug, Default, Clone, Copy)]
pub struct BufferStats {
    /// Total number of successful write operations
    pub writes: u64,

    /// Total number of successful read operations
    pub reads: u64,

    /// Number of underflow events (read when empty)
    pub underflows: u64,

    /// Number of overflow events (write when full)
    pub overflows: u64,

    /// Current fill level (0.0 to 1.0)
    pub fill_level: f32,

    /// Peak fill level since last reset (0.0 to 1.0)
    pub peak_fill: f32,
}

245impl BufferStats {
246    /// Create a new zeroed statistics snapshot
247    pub fn new() -> Self {
248        Self::default()
249    }
250
251    /// Calculate the success rate (reads / writes)
252    ///
253    /// Returns 1.0 if no writes, otherwise reads/writes.
254    pub fn success_rate(&self) -> f32 {
255        if self.writes == 0 {
256            1.0
257        } else {
258            self.reads as f32 / self.writes as f32
259        }
260    }
261
262    /// Calculate the error rate (underflows + overflows) / operations
263    pub fn error_rate(&self) -> f32 {
264        let total = self.writes + self.reads;
265        if total == 0 {
266            0.0
267        } else {
268            (self.underflows + self.overflows) as f32 / total as f32
269        }
270    }
271}
272
273// ============================================================================
274// SignalBuffer Trait
275// ============================================================================
276
277/// Common trait for all signal buffers
278///
279/// This trait defines the standard interface that all buffer types implement.
280/// It provides methods for querying capacity, current length, and statistics.
281///
282/// # Type Parameters
283/// - `T`: The sample type (must implement `Transcendental`)
284pub trait SignalBuffer<T: Transcendental> {
285    /// Get the total capacity of the buffer in samples
286    ///
287    /// For block-based buffers, this is the number of samples per block.
288    /// For ring buffers, this is the total number of samples that can be stored.
289    fn capacity(&self) -> usize;
290
291    /// Get the current number of items in the buffer
292    ///
293    /// For `PipeBuffer` and `FanOutBuffer`, this is either 0 or 1.
294    /// For `RingBuffer`, this is the number of samples available.
295    /// For `DelayLine`, this is always the maximum delay.
296    fn len(&self) -> usize;
297
298    /// Check if the buffer is empty
299    fn is_empty(&self) -> bool {
300        self.len() == 0
301    }
302
303    /// Check if the buffer is full
304    fn is_full(&self) -> bool {
305        self.len() == self.capacity()
306    }
307
308    /// Clear all items from the buffer
309    ///
310    /// After calling this, the buffer should be empty.
311    /// Note that this may not actually zero the memory for performance reasons.
312    fn clear(&mut self);
313
314    /// Get a snapshot of current buffer statistics
315    fn stats(&self) -> BufferStats;
316
317    /// Reset all statistics to zero
318    ///
319    /// This does not clear the buffer contents, only the performance counters.
320    fn reset_stats(&mut self);
321}
322
// ============================================================================
// Aligned Storage
// ============================================================================

// Cache-line aligned storage for lock-free buffers
//
// This type provides aligned storage that can be safely shared between threads.
// It is not `Copy` or `Clone` by design - use references or pointers.
//
// # Type Parameters
// - `T`: The sample type (must implement `Transcendental`)
// - `N`: The number of elements
//
// # Safety
// This type uses `UnsafeCell` for interior mutability and `MaybeUninit`
// for uninitialized data. Users must ensure proper initialization before reading.

339// ============================================================================
340// Utility Functions
341// ============================================================================
342
343/// Utility functions for common buffer operations
344pub mod utils {
345    use super::*;
346
347    /// Copy data from one slice to another with bounds checking
348    ///
349    /// # Arguments
350    /// * `src` - Source slice
351    /// * `dst` - Destination slice
352    ///
353    /// # Returns
354    /// The number of elements copied
355    #[inline(always)]
356    pub fn copy_safe<T: Copy>(src: &[T], dst: &mut [T]) -> usize {
357        let len = src.len().min(dst.len());
358        dst[..len].copy_from_slice(&src[..len]);
359        len
360    }
361
362    /// Fill slice with zeroes
363    ///
364    /// # Arguments
365    /// * `slice` - The slice to fill
366    #[inline(always)]
367    pub fn zero_fill<T: Default + Copy>(slice: &mut [T]) {
368        for item in slice.iter_mut() {
369            *item = T::default();
370        }
371    }
372
373    /// Mix two slices with gain
374    ///
375    /// # Arguments
376    /// * `src` - Source slice to mix in
377    /// * `dst` - Destination slice (will be modified)
378    /// * `gain` - Gain to apply to source
379    #[inline(always)]
380    pub fn mix_with_gain<T>(src: &[T], dst: &mut [T], gain: T)
381    where
382        T: Transcendental + core::ops::Mul<Output = T> + core::ops::Add<Output = T>,
383    {
384        let len = src.len().min(dst.len());
385        for i in 0..len {
386            dst[i] += src[i] * gain;
387        }
388    }
389
390    /// Apply gain to slice
391    ///
392    /// # Arguments
393    /// * `slice` - The slice to modify
394    /// * `gain` - Gain to apply
395    #[inline(always)]
396    pub fn apply_gain<T>(slice: &mut [T], gain: T)
397    where
398        T: Transcendental + core::ops::Mul<Output = T>,
399    {
400        for item in slice.iter_mut() {
401            *item *= gain;
402        }
403    }
404
405    /// Calculate RMS of slice
406    ///
407    /// # Arguments
408    /// * `slice` - The slice to analyze
409    ///
410    /// # Returns
411    /// The RMS value
412    #[inline(always)]
413    pub fn calculate_rms<T>(slice: &[T]) -> f64
414    where
415        T: Transcendental + core::ops::Mul<Output = T> + core::iter::Sum,
416    {
417        let sum_squares: T = slice.iter().map(|&x| x * x).sum();
418        let sum_f64: f64 = sum_squares.to_f64();
419        (sum_f64 / slice.len() as f64).sqrt()
420    }
421
422    /// Calculate peak of slice
423    ///
424    /// # Arguments
425    /// * `slice` - The slice to analyze
426    ///
427    /// # Returns
428    /// The peak absolute value
429    #[inline(always)]
430    pub fn calculate_peak<T>(slice: &[T]) -> f64
431    where
432        T: Transcendental + PartialOrd,
433    {
434        slice.iter().map(|&x| x.to_f64().abs()).fold(0.0, f64::max)
435    }
436}
437
438// ============================================================================
439// Prelude
440// ============================================================================
441
442/// Prelude for convenient imports
443///
444/// Import this module to get all the common buffer types and traits:
445/// ```
446/// use rill_core::buffer::prelude::*;
447/// ```
448pub mod prelude {
449    pub use super::{
450        // Utility functions
451        utils,
452
453        // AtomicCell
454        AtomicCell,
455        AtomicCellError,
456
457        // Error types
458        BufferError,
459        BufferResult,
460
461        // Statistics
462        BufferStats,
463
464        DelayLine,
465        FanInBuffer,
466        FanOutBuffer,
467        // Buffer types
468        PipeBuffer,
469        RingBuffer,
470
471        // Core trait
472        SignalBuffer,
473
474        // Constants
475        CACHE_LINE_SIZE,
476        DEFAULT_BUFFER_SIZE,
477        MAX_BUFFER_SIZE,
478        MIN_BUFFER_SIZE,
479    };
480}
481
482// ============================================================================
483// Buffer Error Types
484// ============================================================================
485
486/// Buffer error types
487///
488/// These errors can occur during buffer operations. They are designed to be
489/// `Copy` and `Eq` for efficient handling in real-time contexts.
490#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
491pub enum BufferError {
492    /// Buffer is empty (tried to read when no data available)
493    #[error("Buffer is empty")]
494    Empty,
495
496    /// Buffer is full (tried to write when no space available)
497    #[error("Buffer is full")]
498    Full,
499
500    /// Invalid index access
501    #[error("Invalid index: {0}")]
502    InvalidIndex(usize),
503
504    /// Buffer is disconnected (other end is gone)
505    #[error("Buffer is disconnected")]
506    Disconnected,
507
508    /// Operation would block (for non-blocking operations)
509    #[error("Operation would block")]
510    WouldBlock,
511
512    /// Buffer overflow (data was lost)
513    #[error("Buffer overflow")]
514    Overflow,
515
516    /// Buffer underflow (no data available)
517    #[error("Buffer underflow")]
518    Underflow,
519
520    /// Invalid buffer size
521    #[error("Invalid buffer size: {0}")]
522    InvalidSize(usize),
523}
524
// ============================================================================
// Helper Functions
// ============================================================================

/// Helper function to create arrays without requiring `Copy`
///
/// Calls `f` once for every index `0..N`, in ascending order, and collects
/// the results into a `[T; N]`.
///
/// This is a thin wrapper over [`core::array::from_fn`], kept so existing
/// callers continue to work. Delegating to the standard library removes the
/// hand-written `unsafe` initialization and means that, if `f` panics,
/// elements produced so far are dropped instead of leaked.
///
/// # Arguments
/// * `f` - Produces the element for a given index
///
/// # Returns
/// The fully initialized array
pub fn array_from_fn<T, const N: usize>(f: impl FnMut(usize) -> T) -> [T; N] {
    core::array::from_fn(f)
}

543/// Result type for buffer operations
544pub type BufferResult<T> = Result<T, BufferError>;
545
// ============================================================================
// Tests
// ============================================================================

#[cfg(test)]
mod tests {
    use super::*;

    /// Each `record_*` call bumps its counter once and the peak is reported
    /// as a ratio of the 0-1000 fixed-point value.
    #[test]
    fn test_atomic_stats() {
        let stats = AtomicStats::new();

        stats.record_write();
        stats.record_read();
        stats.record_underflow();
        stats.record_overflow();
        stats.update_peak(500);

        let snapshot = stats.snapshot();
        assert_eq!(snapshot.writes, 1);
        assert_eq!(snapshot.reads, 1);
        assert_eq!(snapshot.underflows, 1);
        assert_eq!(snapshot.overflows, 1);
        assert!((snapshot.peak_fill - 0.5).abs() < 0.001);
    }

    /// Derived rates follow the formulas documented on `BufferStats`.
    #[test]
    fn test_buffer_stats() {
        let stats = BufferStats {
            writes: 100,
            reads: 95,
            underflows: 3,
            overflows: 2,
            fill_level: 0.5,
            peak_fill: 0.8,
        };

        // success_rate = reads/writes = 95/100 = 0.95
        assert!((stats.success_rate() - 0.95).abs() < 0.001);

        // error_rate = (underflows + overflows) / (writes + reads) = 5/195 ≈ 0.02564
        assert!((stats.error_rate() - 0.02564).abs() < 0.001);
    }

    /// Smoke test for every helper in `utils`.
    #[test]
    fn test_utils() {
        let mut dst = [0.0; 4];
        let src = [1.0, 2.0, 3.0];

        let copied = utils::copy_safe(&src, &mut dst);
        assert_eq!(copied, 3);
        assert_eq!(dst[0], 1.0);
        assert_eq!(dst[1], 2.0);
        assert_eq!(dst[2], 3.0);

        utils::zero_fill(&mut dst[..3]);
        assert_eq!(dst[0], 0.0);
        assert_eq!(dst[1], 0.0);
        assert_eq!(dst[2], 0.0);

        // 1.0 + 2.0 * 0.5 = 2.0
        let mut mix_dst = [1.0, 1.0, 1.0];
        utils::mix_with_gain(&[2.0, 2.0, 2.0], &mut mix_dst, 0.5);
        assert_eq!(mix_dst[0], 2.0);

        let rms = utils::calculate_rms(&[1.0, -1.0, 1.0, -1.0]);
        assert!((rms - 1.0).abs() < 1e-6);

        let peak = utils::calculate_peak(&[0.5, -0.8, 0.3, -0.9]);
        assert!((peak - 0.9).abs() < 1e-6);
    }

    /// The size constants must stay mutually consistent.
    #[test]
    fn test_constants() {
        assert_eq!(CACHE_LINE_SIZE, 64);
        assert!(MAX_BUFFER_SIZE > MIN_BUFFER_SIZE);
        assert!(DEFAULT_BUFFER_SIZE >= MIN_BUFFER_SIZE);
        assert!(DEFAULT_BUFFER_SIZE <= MAX_BUFFER_SIZE);
    }

    /// `Display` text comes from the `#[error(...)]` attributes.
    #[test]
    fn test_buffer_error_display() {
        assert_eq!(BufferError::Empty.to_string(), "Buffer is empty");
        assert_eq!(BufferError::Full.to_string(), "Buffer is full");
        assert_eq!(BufferError::InvalidIndex(5).to_string(), "Invalid index: 5");
    }

    #[test]
    fn test_atomic_cell_basic() {
        let cell = AtomicCell::new(42);
        assert_eq!(cell.load(), 42);

        cell.store(100);
        assert_eq!(cell.load(), 100);
    }

    #[test]
    fn test_atomic_cell_try_new() {
        let cell = AtomicCell::try_new(42).unwrap();
        assert_eq!(cell.load(), 42);
    }

    #[test]
    fn test_atomic_cell_default() {
        let cell = AtomicCell::<i32>::default();
        assert_eq!(cell.load(), 0);
    }
}