Skip to main content

audio_engine_core/
pipeline.rs

1//! Streaming Audio Pipeline
2//!
3//! Asynchronous audio processing pipeline for streaming decode and resample.
4//! This eliminates the memory spike issue with 192kHz upsampling.
5
6use parking_lot::RwLock;
7use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
8use std::sync::Arc;
9use std::thread::{self, JoinHandle};
10// Channel imports unused in current implementation
11
12use crate::config::ResampleQuality;
13use crate::decoder::{DecoderError, StreamingDecoder};
14use crate::processor::StreamingResampler;
15
/// Error returned when constructing an [`AudioPipeline`].
///
/// Only [`AudioPipeline::new`] returns this type. Failures that happen later,
/// inside the background worker, are written to the log instead.
#[derive(Debug, thiserror::Error)]
pub enum PipelineError {
    /// The underlying decoder could not open the source.
    ///
    /// Built from a [`DecoderError`] through the `#[from]` conversion, so `?`
    /// on a decoder result produces this variant.
    #[error("failed to open decoder: {0}")]
    Decoder(#[from] DecoderError),
}
23
/// Ring buffer capacity in frames (one frame holds one sample per channel).
///
/// For stereo `f64` audio this is 131072 frames × 2 channels × 8 bytes = 2 MiB
/// of sample storage, i.e. roughly 0.68 seconds of audio at 192 kHz.
const RING_BUFFER_FRAMES: usize = 131072;
27
/// Status of the audio pipeline
///
/// A plain, field-less state tag. It is `Copy` and implements full equality
/// (`Eq`) and `Hash`, so it can be compared, matched on, and used as a map or
/// set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PipelineStatus {
    /// Pipeline is idle, waiting for data
    Idle,
    /// Pipeline is actively buffering/processing
    Buffering,
    /// Pipeline has finished processing all data
    Finished,
    /// Pipeline encountered an error
    Error,
}
40
/// Streaming audio pipeline that decodes and resamples in background
///
/// A worker thread (spawned by `start`) pushes processed frames into a shared
/// ring buffer; `read` pulls frames out of it at `current_read_pos`. All
/// shared state lives behind `Arc`s so the worker can hold its own handles.
pub struct AudioPipeline {
    // Ring buffer for processed audio data. Written by the worker thread,
    // read by `read`.
    ring_buffer: Arc<RwLock<RingBuffer>>,

    // Control flags
    // `is_running`: set by `start`, cleared by `stop` and by the worker when
    // its loop ends; the worker polls it once per decoded chunk.
    is_running: Arc<AtomicBool>,
    // `is_finished`: reset to false by `start`; the worker sets it to true
    // when its processing loop ends.
    is_finished: Arc<AtomicBool>,

    // Progress tracking
    // Cumulative number of output frames the worker has written so far.
    buffered_frames: Arc<AtomicU64>,
    // Estimated output length computed in `new`; replaced by the actual frame
    // count when the worker's loop ends.
    total_frames: Arc<AtomicU64>,
    // Absolute frame index of the next frame `read` will fetch. Also moved by
    // `set_read_position` and pushed forward by the worker on overflow.
    current_read_pos: Arc<AtomicU64>,

    // Worker thread handle (`Some` after `start`, taken and joined by `stop`)
    worker_handle: Option<JoinHandle<()>>,

    // Audio format info
    // Channel count reported by the decoder.
    channels: usize,
    // Output sample rate (the requested target, or the source rate if none).
    sample_rate: u32,
    // Sample rate reported by the decoder for the source.
    original_sample_rate: u32,
}
63
64/// Simple ring buffer for audio data
65/// Uses monotonic counters (frames_written, frames_consumed) for clean overflow handling.
66pub struct RingBuffer {
67    data: Vec<f64>,
68    capacity_frames: usize,
69    channels: usize,
70    /// Total frames written (monotonically increasing)
71    frames_written: u64,
72    /// Total frames consumed by readers (monotonically increasing)
73    frames_consumed: u64,
74    /// Number of overflow events
75    overflow_count: u64,
76}
77
78impl RingBuffer {
79    pub fn new(capacity_frames: usize, channels: usize) -> Self {
80        Self {
81            data: vec![0.0; capacity_frames * channels],
82            capacity_frames,
83            channels,
84            frames_written: 0,
85            frames_consumed: 0,
86            overflow_count: 0,
87        }
88    }
89
90    /// Write frames to the buffer, returns number of frames written
91    /// If buffer would overflow, drops the oldest data (ring buffer behavior)
92    /// Returns (frames_written, overflow_new_consumed) — overflow_new_consumed is
93    /// the updated frames_consumed value that external read positions must respect.
94    pub fn write(&mut self, samples: &[f64]) -> (usize, Option<u64>) {
95        let frames_to_write = samples.len() / self.channels;
96        let samples_to_write = frames_to_write * self.channels;
97
98        if frames_to_write == 0 {
99            return (0, None);
100        }
101
102        // Check for potential overflow
103        let frames_in_buffer = self.frames_written.saturating_sub(self.frames_consumed);
104        let available_space = self
105            .capacity_frames
106            .saturating_sub(frames_in_buffer as usize);
107
108        let overflow_consumed = if frames_to_write > available_space {
109            // Overflow detected - advance consumer position to make room
110            // This effectively drops the oldest frames
111            let overflow_frames = frames_to_write - available_space;
112            self.frames_consumed = self.frames_consumed.saturating_add(overflow_frames as u64);
113            self.overflow_count = self.overflow_count.saturating_add(1);
114            log::warn!(
115                "RingBuffer overflow: dropping {} frames (total overflows: {})",
116                overflow_frames,
117                self.overflow_count
118            );
119            // FIX for Defect 5: Return new consumed position so external read_pos can be updated
120            Some(self.frames_consumed)
121        } else {
122            None
123        };
124
125        // Write samples using at most two contiguous copies split at the wrap boundary.
126        let frames_to_copy = frames_to_write.min(self.capacity_frames);
127        let source_frame_offset = frames_to_write - frames_to_copy;
128        let source_sample_offset = source_frame_offset * self.channels;
129        let write_frame =
130            ((self.frames_written as usize) + source_frame_offset) % self.capacity_frames;
131        self.copy_frames_from_slice(
132            write_frame,
133            &samples[source_sample_offset..samples_to_write],
134            frames_to_copy,
135        );
136
137        self.frames_written += frames_to_write as u64;
138        (frames_to_write, overflow_consumed)
139    }
140
141    /// Read frames from the buffer at a given position
142    pub fn read(&self, start_frame: u64, output: &mut [f64]) -> usize {
143        let frames_to_read = output.len() / self.channels;
144        let available = self.frames_written.saturating_sub(start_frame) as usize;
145        let actual_frames = frames_to_read.min(available);
146
147        if actual_frames == 0 {
148            return 0;
149        }
150
151        let read_frame = (start_frame as usize) % self.capacity_frames;
152        self.copy_frames_to_slice(
153            read_frame,
154            &mut output[..actual_frames * self.channels],
155            actual_frames,
156        );
157
158        actual_frames
159    }
160
161    fn copy_frames_from_slice(&mut self, start_frame: usize, source: &[f64], frames: usize) {
162        let first_frames = frames.min(self.capacity_frames - start_frame);
163        let first_samples = first_frames * self.channels;
164        let start_sample = start_frame * self.channels;
165
166        self.data[start_sample..start_sample + first_samples]
167            .copy_from_slice(&source[..first_samples]);
168
169        let remaining_frames = frames - first_frames;
170        if remaining_frames > 0 {
171            let remaining_samples = remaining_frames * self.channels;
172            self.data[..remaining_samples]
173                .copy_from_slice(&source[first_samples..first_samples + remaining_samples]);
174        }
175    }
176
177    fn copy_frames_to_slice(&self, start_frame: usize, output: &mut [f64], frames: usize) {
178        let first_frames = frames.min(self.capacity_frames - start_frame);
179        let first_samples = first_frames * self.channels;
180        let start_sample = start_frame * self.channels;
181
182        output[..first_samples]
183            .copy_from_slice(&self.data[start_sample..start_sample + first_samples]);
184
185        let remaining_frames = frames - first_frames;
186        if remaining_frames > 0 {
187            let remaining_samples = remaining_frames * self.channels;
188            output[first_samples..first_samples + remaining_samples]
189                .copy_from_slice(&self.data[..remaining_samples]);
190        }
191    }
192
193    /// Update consumed position (call after reading)
194    pub fn advance_read_pos(&mut self, frames: u64) {
195        self.frames_consumed = self.frames_consumed.saturating_add(frames);
196    }
197
198    /// Get number of frames available for reading from a given position
199    pub fn available_frames(&self, read_pos: u64) -> u64 {
200        self.frames_written.saturating_sub(read_pos)
201    }
202
203    /// Get total frames written
204    pub fn total_written(&self) -> u64 {
205        self.frames_written
206    }
207
208    /// Get overflow count
209    pub fn overflow_count(&self) -> u64 {
210        self.overflow_count
211    }
212}
213
214impl AudioPipeline {
215    /// Create a new pipeline from a file path.
216    ///
217    /// This opens the decoder once to read the source format (sample rate,
218    /// channel count, and frame count) so callers can query the pipeline before
219    /// starting the background worker. Call [`AudioPipeline::start`] to begin
220    /// decoding and resampling.
221    pub fn new(
222        path: &str,
223        target_sample_rate: Option<u32>,
224        _resample_quality: ResampleQuality,
225    ) -> Result<Self, PipelineError> {
226        let decoder = StreamingDecoder::open(path)?;
227
228        let info = decoder.info.clone();
229        let original_sr = info.sample_rate;
230        let channels = info.channels;
231        let total_source_frames = info.total_frames.unwrap_or(0);
232
233        // Determine target sample rate
234        let target_sr = target_sample_rate.unwrap_or(original_sr);
235
236        // Calculate expected total frames after resampling
237        let total_frames = if target_sr != original_sr {
238            ((total_source_frames as f64) * (target_sr as f64) / (original_sr as f64)).ceil() as u64
239        } else {
240            total_source_frames
241        };
242
243        log::info!(
244            "Creating audio pipeline: {}→{} Hz, {} ch, ~{} frames",
245            original_sr,
246            target_sr,
247            channels,
248            total_frames
249        );
250
251        let ring_buffer = Arc::new(RwLock::new(RingBuffer::new(RING_BUFFER_FRAMES, channels)));
252        let is_running = Arc::new(AtomicBool::new(false));
253        let is_finished = Arc::new(AtomicBool::new(false));
254        let buffered_frames = Arc::new(AtomicU64::new(0));
255        let total_frames_arc = Arc::new(AtomicU64::new(total_frames));
256        let current_read_pos = Arc::new(AtomicU64::new(0));
257
258        let pipeline = Self {
259            ring_buffer: Arc::clone(&ring_buffer),
260            is_running: Arc::clone(&is_running),
261            is_finished: Arc::clone(&is_finished),
262            buffered_frames: Arc::clone(&buffered_frames),
263            total_frames: Arc::clone(&total_frames_arc),
264            current_read_pos: Arc::clone(&current_read_pos),
265            worker_handle: None,
266            channels,
267            sample_rate: target_sr,
268            original_sample_rate: original_sr,
269        };
270
271        Ok(pipeline)
272    }
273
274    /// Start the background processing thread
275    pub fn start(
276        &mut self,
277        path: String,
278        target_sample_rate: Option<u32>,
279        quality: ResampleQuality,
280    ) {
281        if self.is_running.load(Ordering::Relaxed) {
282            return;
283        }
284
285        self.is_running.store(true, Ordering::Relaxed);
286        self.is_finished.store(false, Ordering::Relaxed);
287
288        let ring_buffer = Arc::clone(&self.ring_buffer);
289        let is_running = Arc::clone(&self.is_running);
290        let is_finished = Arc::clone(&self.is_finished);
291        let buffered_frames = Arc::clone(&self.buffered_frames);
292        let total_frames = Arc::clone(&self.total_frames);
293        let current_read_pos = Arc::clone(&self.current_read_pos);
294        let channels = self.channels;
295        let original_sr = self.original_sample_rate;
296        let target_sr = target_sample_rate.unwrap_or(original_sr);
297
298        let handle = thread::spawn(move || {
299            Self::worker_loop(
300                path,
301                channels,
302                original_sr,
303                target_sr,
304                quality,
305                ring_buffer,
306                is_running,
307                is_finished,
308                buffered_frames,
309                total_frames,
310                current_read_pos,
311            );
312        });
313
314        self.worker_handle = Some(handle);
315    }
316
317    /// Background worker that decodes and resamples
318    #[allow(clippy::too_many_arguments)]
319    fn worker_loop(
320        path: String,
321        channels: usize,
322        original_sr: u32,
323        target_sr: u32,
324        quality: ResampleQuality,
325        ring_buffer: Arc<RwLock<RingBuffer>>,
326        is_running: Arc<AtomicBool>,
327        is_finished: Arc<AtomicBool>,
328        buffered_frames: Arc<AtomicU64>,
329        total_frames: Arc<AtomicU64>,
330        current_read_pos: Arc<AtomicU64>,
331    ) {
332        log::info!("Pipeline worker started for: {}", path);
333
334        // Open decoder
335        let mut decoder = match StreamingDecoder::open(&path) {
336            Ok(d) => d,
337            Err(e) => {
338                log::error!("Failed to open decoder in worker: {}", e);
339                is_finished.store(true, Ordering::Relaxed);
340                return;
341            }
342        };
343
344        // Create resampler if needed
345        let mut resampler = if target_sr != original_sr {
346            match StreamingResampler::with_quality(
347                channels,
348                original_sr,
349                target_sr,
350                crate::config::PhaseResponse::default(),
351                quality,
352            ) {
353                Ok(rs) => Some(rs),
354                Err(e) => {
355                    log::error!("Failed to create pipeline resampler: {}", e);
356                    return;
357                }
358            }
359        } else {
360            None
361        };
362
363        let mut total_output_frames: u64 = 0;
364
365        // Process loop
366        while is_running.load(Ordering::Relaxed) {
367            // Decode next chunk
368            let decoded = match decoder.decode_next() {
369                Ok(Some(samples)) => samples,
370                Ok(None) => {
371                    // EOF - flush resampler if present
372                    if let Some(ref mut rs) = resampler {
373                        let flushed = rs.flush_borrowed();
374                        if !flushed.samples.is_empty() {
375                            let (_, overflow) = ring_buffer.write().write(flushed.samples);
376                            // FIX for Defect 5: Sync external read position on overflow
377                            if let Some(min_pos) = overflow {
378                                current_read_pos.fetch_max(min_pos, Ordering::Relaxed);
379                            }
380                            total_output_frames += flushed.frames as u64;
381                            buffered_frames.store(total_output_frames, Ordering::Relaxed);
382                        }
383                    }
384                    break;
385                }
386                Err(e) => {
387                    log::error!("Decode error in pipeline: {}", e);
388                    break;
389                }
390            };
391
392            // Resample if needed. Borrow resampler-owned output directly so the
393            // pipeline worker does not allocate a temporary Vec per chunk.
394            let (output, frames) = if let Some(ref mut rs) = resampler {
395                let resampled = rs.process_chunk_borrowed(&decoded);
396                (resampled.samples, resampled.frames)
397            } else {
398                let frames = decoded.len() / channels;
399                (decoded.as_slice(), frames)
400            };
401
402            if !output.is_empty() {
403                let (_, overflow) = ring_buffer.write().write(output);
404                // FIX for Defect 5: Sync external read position on overflow
405                if let Some(min_pos) = overflow {
406                    current_read_pos.fetch_max(min_pos, Ordering::Relaxed);
407                }
408                total_output_frames += frames as u64;
409                buffered_frames.store(total_output_frames, Ordering::Relaxed);
410            }
411        }
412
413        // Update final total frames (may differ from estimate)
414        total_frames.store(total_output_frames, Ordering::Relaxed);
415        is_finished.store(true, Ordering::Relaxed);
416        is_running.store(false, Ordering::Relaxed);
417
418        log::info!(
419            "Pipeline worker finished. Total frames: {}",
420            total_output_frames
421        );
422    }
423
424    /// Stop the pipeline
425    pub fn stop(&mut self) {
426        self.is_running.store(false, Ordering::Relaxed);
427        if let Some(handle) = self.worker_handle.take() {
428            let _ = handle.join();
429        }
430    }
431
432    /// Read audio data from the pipeline
433    /// Returns number of frames actually read
434    pub fn read(&self, output: &mut [f64]) -> usize {
435        let read_pos = self.current_read_pos.load(Ordering::Relaxed);
436        let Some(buffer) = self.ring_buffer.try_read() else {
437            return 0;
438        };
439        let frames_read = buffer.read(read_pos, output);
440
441        if frames_read > 0 {
442            self.current_read_pos
443                .fetch_add(frames_read as u64, Ordering::Relaxed);
444        }
445
446        frames_read
447    }
448
    /// Get current read position in frames
    ///
    /// This is an absolute frame index counted from the start of the output
    /// stream: the position the next `read` call starts from.
    pub fn read_position(&self) -> u64 {
        self.current_read_pos.load(Ordering::Relaxed)
    }
453
    /// Set read position (for seeking)
    ///
    /// Stores `frame` as the new absolute read position without validating
    /// it against what has been buffered.
    ///
    /// NOTE(review): this only moves the read cursor; the worker is not
    /// repositioned. Presumably callers are expected to seek only within
    /// frames the ring buffer still holds — confirm against callers.
    pub fn set_read_position(&self, frame: u64) {
        self.current_read_pos.store(frame, Ordering::Relaxed);
    }
458
    /// Get total frames
    ///
    /// Initially this is the estimate computed in `new` from the source
    /// length and the sample-rate ratio (0 if the source length is unknown).
    /// The worker replaces it with the number of frames it actually produced
    /// when its processing loop ends.
    pub fn total_frames(&self) -> u64 {
        self.total_frames.load(Ordering::Relaxed)
    }
463
    /// Get buffered frames
    ///
    /// This is the cumulative number of output frames the worker has written
    /// since it started, not the number currently held in the ring buffer.
    pub fn buffered_frames(&self) -> u64 {
        self.buffered_frames.load(Ordering::Relaxed)
    }
468
    /// Check if pipeline has finished processing
    ///
    /// The flag is reset by `start` and set by the worker when its
    /// processing loop ends. Buffered frames may still be waiting to be read
    /// after this returns `true`.
    pub fn is_finished(&self) -> bool {
        self.is_finished.load(Ordering::Relaxed)
    }
473
    /// Check if pipeline is running
    ///
    /// The flag is set by `start` and cleared by `stop` or by the worker when
    /// its processing loop ends.
    pub fn is_running(&self) -> bool {
        self.is_running.load(Ordering::Relaxed)
    }
478
479    /// Get buffering ratio (0.0 - 1.0)
480    pub fn buffer_ratio(&self) -> f32 {
481        let total = self.total_frames.load(Ordering::Relaxed);
482        let buffered = self.buffered_frames.load(Ordering::Relaxed);
483        if total == 0 {
484            return 0.0;
485        }
486        (buffered as f32 / total as f32).min(1.0)
487    }
488
489    /// Get available frames from current read position
490    pub fn available_frames(&self) -> u64 {
491        let read_pos = self.current_read_pos.load(Ordering::Relaxed);
492        self.buffered_frames
493            .load(Ordering::Relaxed)
494            .saturating_sub(read_pos)
495    }
496
    /// Number of channels in the output stream.
    ///
    /// Taken from the decoder's format info when the pipeline was created;
    /// every frame in the ring buffer holds this many interleaved samples.
    pub fn channels(&self) -> usize {
        self.channels
    }
501
    /// Output (post-resample) sample rate in Hz.
    ///
    /// This is the target rate given to `new`, or the source rate when no
    /// target was given. Note that `start` takes its own target rate argument;
    /// this getter does not reflect a different value passed there.
    pub fn sample_rate(&self) -> u32 {
        self.sample_rate
    }
506
    /// Source (pre-resample) sample rate in Hz.
    ///
    /// As reported by the decoder when the pipeline was created.
    pub fn original_sample_rate(&self) -> u32 {
        self.original_sample_rate
    }
511}
512
// Dropping the pipeline shuts the worker down so the thread does not outlive
// the pipeline that owns its handle.
impl Drop for AudioPipeline {
    fn drop(&mut self) {
        // `stop` clears the running flag and joins the worker thread, so this
        // can block until the worker next checks the flag.
        self.stop();
    }
}
518
// Unit tests for `RingBuffer` and for `AudioPipeline::read`. The pipeline
// tests build the struct field-by-field so no decoder or worker is needed.
#[cfg(test)]
mod tests {
    use super::*;

    /// Build `frames * channels` interleaved samples counting up from `start`,
    /// so every sample value is unique and ordering mistakes are visible.
    fn samples(frames: usize, channels: usize, start: f64) -> Vec<f64> {
        (0..frames * channels).map(|i| start + i as f64).collect()
    }

    // Filling the buffer exactly to capacity is not an overflow.
    #[test]
    fn ring_buffer_reads_back_exact_capacity() {
        let mut buffer = RingBuffer::new(4, 2);
        let input = samples(4, 2, 1.0);
        let mut output = vec![0.0; input.len()];

        assert_eq!(buffer.write(&input), (4, None));
        assert_eq!(buffer.read(0, &mut output), 4);
        assert_eq!(output, input);
    }

    // A write and a read that both straddle the end of the storage must keep
    // frames in stream order.
    #[test]
    fn ring_buffer_write_and_read_wrap_preserve_order() {
        let mut buffer = RingBuffer::new(4, 2);
        let first = samples(3, 2, 1.0);
        let second = samples(3, 2, 101.0);

        assert_eq!(buffer.write(&first), (3, None));
        // Consume two frames so the second write fits without overflow.
        buffer.advance_read_pos(2);
        assert_eq!(buffer.write(&second), (3, None));

        let mut output = vec![0.0; 4 * 2];
        assert_eq!(buffer.read(2, &mut output), 4);

        // Expect the last frame of `first` followed by all of `second`.
        let mut expected = first[2 * 2..].to_vec();
        expected.extend_from_slice(&second);
        assert_eq!(output, expected);
    }

    // Writing more than the capacity keeps only the newest frames and reports
    // the position readers must skip to.
    #[test]
    fn ring_buffer_overflow_keeps_newest_frames_and_reports_consumed_position() {
        let mut buffer = RingBuffer::new(4, 2);
        let input = samples(6, 2, 1.0);
        let mut output = vec![0.0; 4 * 2];

        assert_eq!(buffer.write(&input), (6, Some(2)));
        assert_eq!(buffer.overflow_count(), 1);
        assert_eq!(buffer.read(2, &mut output), 4);
        assert_eq!(output, input[2 * 2..].to_vec());
    }

    // Reading from an empty buffer returns 0 and must not write to `output`.
    #[test]
    fn ring_buffer_empty_read_leaves_output_untouched() {
        let buffer = RingBuffer::new(4, 2);
        let mut output = vec![42.0; 4];

        assert_eq!(buffer.read(0, &mut output), 0);
        assert_eq!(output, vec![42.0; 4]);
    }

    // When fewer frames are available than requested, only those are copied
    // and the tail of `output` keeps its previous contents.
    #[test]
    fn ring_buffer_partial_read_only_copies_available_frames() {
        let mut buffer = RingBuffer::new(8, 2);
        let input = samples(2, 2, 1.0);
        let mut output = vec![42.0; 4 * 2];

        assert_eq!(buffer.write(&input), (2, None));
        assert_eq!(buffer.read(0, &mut output), 2);
        assert_eq!(&output[..4], &input[..]);
        assert_eq!(&output[4..], &[42.0; 4]);
    }

    // Same wrap scenario as above with six channels, to catch frame/sample
    // index mix-ups that a stereo test could hide.
    #[test]
    fn ring_buffer_wrap_preserves_multichannel_interleaving() {
        let channels = 6;
        let mut buffer = RingBuffer::new(4, channels);
        let first = samples(3, channels, 1.0);
        let second = samples(3, channels, 101.0);

        assert_eq!(buffer.write(&first), (3, None));
        buffer.advance_read_pos(2);
        assert_eq!(buffer.write(&second), (3, None));

        let mut output = vec![0.0; 4 * channels];
        assert_eq!(buffer.read(2, &mut output), 4);

        let mut expected = first[2 * channels..].to_vec();
        expected.extend_from_slice(&second);
        assert_eq!(output, expected);
    }

    // `read` must not block: while a writer holds the lock it returns 0 and
    // changes neither the read position nor the output buffer.
    #[test]
    fn audio_pipeline_read_returns_zero_when_ring_buffer_locked() {
        let ring_buffer = Arc::new(RwLock::new(RingBuffer::new(4, 2)));
        let pipeline = AudioPipeline {
            ring_buffer: Arc::clone(&ring_buffer),
            is_running: Arc::new(AtomicBool::new(false)),
            is_finished: Arc::new(AtomicBool::new(false)),
            buffered_frames: Arc::new(AtomicU64::new(0)),
            total_frames: Arc::new(AtomicU64::new(0)),
            current_read_pos: Arc::new(AtomicU64::new(0)),
            worker_handle: None,
            channels: 2,
            sample_rate: 48_000,
            original_sample_rate: 48_000,
        };

        // Hold the write lock for the rest of the test.
        let _write_guard = ring_buffer.write();
        let mut output = vec![42.0; 4];

        assert_eq!(pipeline.read(&mut output), 0);
        assert_eq!(pipeline.read_position(), 0);
        assert_eq!(output, vec![42.0; 4]);
    }

    // The read position advances by the number of frames actually returned,
    // including a short final read and a read that returns nothing.
    #[test]
    fn audio_pipeline_read_advances_by_frames_actually_read() {
        let ring_buffer = Arc::new(RwLock::new(RingBuffer::new(8, 2)));
        ring_buffer.write().write(&samples(3, 2, 1.0));
        let pipeline = AudioPipeline {
            ring_buffer,
            is_running: Arc::new(AtomicBool::new(false)),
            is_finished: Arc::new(AtomicBool::new(false)),
            buffered_frames: Arc::new(AtomicU64::new(3)),
            total_frames: Arc::new(AtomicU64::new(3)),
            current_read_pos: Arc::new(AtomicU64::new(0)),
            worker_handle: None,
            channels: 2,
            sample_rate: 48_000,
            original_sample_rate: 48_000,
        };

        let mut first = vec![0.0; 2 * 2];
        assert_eq!(pipeline.read(&mut first), 2);
        assert_eq!(pipeline.read_position(), 2);

        // Only one frame is left, so a two-frame request is cut short.
        let mut second = vec![0.0; 2 * 2];
        assert_eq!(pipeline.read(&mut second), 1);
        assert_eq!(pipeline.read_position(), 3);

        let mut empty = vec![42.0; 2 * 2];
        assert_eq!(pipeline.read(&mut empty), 0);
        assert_eq!(pipeline.read_position(), 3);
        assert_eq!(empty, vec![42.0; 4]);
    }
}