1use parking_lot::RwLock;
7use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
8use std::sync::Arc;
9use std::thread::{self, JoinHandle};
10use crate::config::ResampleQuality;
13use crate::decoder::{DecoderError, StreamingDecoder};
14use crate::processor::StreamingResampler;
15
16#[derive(Debug, thiserror::Error)]
18pub enum PipelineError {
19 #[error("failed to open decoder: {0}")]
21 Decoder(#[from] DecoderError),
22}
23
/// Capacity, in frames, of the ring buffer allocated for each pipeline.
const RING_BUFFER_FRAMES: usize = 131_072;
27
/// Coarse lifecycle state of a pipeline.
///
/// NOTE(review): nothing in this part of the file constructs or matches on
/// this enum, so the per-variant descriptions below are inferred from the
/// variant names only — confirm against the callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PipelineStatus {
    /// Presumably: no worker has been started yet.
    Idle,
    /// Presumably: the worker is decoding and filling the buffer.
    Buffering,
    /// Presumably: the worker has produced all of its output.
    Finished,
    /// Presumably: the worker stopped because of a failure.
    Error,
}
40
41pub struct AudioPipeline {
43 ring_buffer: Arc<RwLock<RingBuffer>>,
45
46 is_running: Arc<AtomicBool>,
48 is_finished: Arc<AtomicBool>,
49
50 buffered_frames: Arc<AtomicU64>,
52 total_frames: Arc<AtomicU64>,
53 current_read_pos: Arc<AtomicU64>,
54
55 worker_handle: Option<JoinHandle<()>>,
57
58 channels: usize,
60 sample_rate: u32,
61 original_sample_rate: u32,
62}
63
/// Fixed-capacity circular store of interleaved `f64` frames.
///
/// Positions are absolute frame indices that only ever grow; the slot for a
/// frame is its index modulo `capacity_frames`.
pub struct RingBuffer {
    /// Backing storage, `capacity_frames * channels` samples long.
    data: Vec<f64>,
    /// Maximum number of frames retained at once.
    capacity_frames: usize,
    /// Samples per frame.
    channels: usize,
    /// Total frames ever written; the next write starts at this absolute index.
    frames_written: u64,
    /// Absolute index below which frames count as consumed or dropped.
    frames_consumed: u64,
    /// Number of writes that had to drop unconsumed frames.
    overflow_count: u64,
}
77
78impl RingBuffer {
79 pub fn new(capacity_frames: usize, channels: usize) -> Self {
80 Self {
81 data: vec![0.0; capacity_frames * channels],
82 capacity_frames,
83 channels,
84 frames_written: 0,
85 frames_consumed: 0,
86 overflow_count: 0,
87 }
88 }
89
90 pub fn write(&mut self, samples: &[f64]) -> (usize, Option<u64>) {
95 let frames_to_write = samples.len() / self.channels;
96 let samples_to_write = frames_to_write * self.channels;
97
98 if frames_to_write == 0 {
99 return (0, None);
100 }
101
102 let frames_in_buffer = self.frames_written.saturating_sub(self.frames_consumed);
104 let available_space = self
105 .capacity_frames
106 .saturating_sub(frames_in_buffer as usize);
107
108 let overflow_consumed = if frames_to_write > available_space {
109 let overflow_frames = frames_to_write - available_space;
112 self.frames_consumed = self.frames_consumed.saturating_add(overflow_frames as u64);
113 self.overflow_count = self.overflow_count.saturating_add(1);
114 log::warn!(
115 "RingBuffer overflow: dropping {} frames (total overflows: {})",
116 overflow_frames,
117 self.overflow_count
118 );
119 Some(self.frames_consumed)
121 } else {
122 None
123 };
124
125 let frames_to_copy = frames_to_write.min(self.capacity_frames);
127 let source_frame_offset = frames_to_write - frames_to_copy;
128 let source_sample_offset = source_frame_offset * self.channels;
129 let write_frame =
130 ((self.frames_written as usize) + source_frame_offset) % self.capacity_frames;
131 self.copy_frames_from_slice(
132 write_frame,
133 &samples[source_sample_offset..samples_to_write],
134 frames_to_copy,
135 );
136
137 self.frames_written += frames_to_write as u64;
138 (frames_to_write, overflow_consumed)
139 }
140
141 pub fn read(&self, start_frame: u64, output: &mut [f64]) -> usize {
143 let frames_to_read = output.len() / self.channels;
144 let available = self.frames_written.saturating_sub(start_frame) as usize;
145 let actual_frames = frames_to_read.min(available);
146
147 if actual_frames == 0 {
148 return 0;
149 }
150
151 let read_frame = (start_frame as usize) % self.capacity_frames;
152 self.copy_frames_to_slice(
153 read_frame,
154 &mut output[..actual_frames * self.channels],
155 actual_frames,
156 );
157
158 actual_frames
159 }
160
161 fn copy_frames_from_slice(&mut self, start_frame: usize, source: &[f64], frames: usize) {
162 let first_frames = frames.min(self.capacity_frames - start_frame);
163 let first_samples = first_frames * self.channels;
164 let start_sample = start_frame * self.channels;
165
166 self.data[start_sample..start_sample + first_samples]
167 .copy_from_slice(&source[..first_samples]);
168
169 let remaining_frames = frames - first_frames;
170 if remaining_frames > 0 {
171 let remaining_samples = remaining_frames * self.channels;
172 self.data[..remaining_samples]
173 .copy_from_slice(&source[first_samples..first_samples + remaining_samples]);
174 }
175 }
176
177 fn copy_frames_to_slice(&self, start_frame: usize, output: &mut [f64], frames: usize) {
178 let first_frames = frames.min(self.capacity_frames - start_frame);
179 let first_samples = first_frames * self.channels;
180 let start_sample = start_frame * self.channels;
181
182 output[..first_samples]
183 .copy_from_slice(&self.data[start_sample..start_sample + first_samples]);
184
185 let remaining_frames = frames - first_frames;
186 if remaining_frames > 0 {
187 let remaining_samples = remaining_frames * self.channels;
188 output[first_samples..first_samples + remaining_samples]
189 .copy_from_slice(&self.data[..remaining_samples]);
190 }
191 }
192
193 pub fn advance_read_pos(&mut self, frames: u64) {
195 self.frames_consumed = self.frames_consumed.saturating_add(frames);
196 }
197
198 pub fn available_frames(&self, read_pos: u64) -> u64 {
200 self.frames_written.saturating_sub(read_pos)
201 }
202
203 pub fn total_written(&self) -> u64 {
205 self.frames_written
206 }
207
208 pub fn overflow_count(&self) -> u64 {
210 self.overflow_count
211 }
212}
213
214impl AudioPipeline {
215 pub fn new(
222 path: &str,
223 target_sample_rate: Option<u32>,
224 _resample_quality: ResampleQuality,
225 ) -> Result<Self, PipelineError> {
226 let decoder = StreamingDecoder::open(path)?;
227
228 let info = decoder.info.clone();
229 let original_sr = info.sample_rate;
230 let channels = info.channels;
231 let total_source_frames = info.total_frames.unwrap_or(0);
232
233 let target_sr = target_sample_rate.unwrap_or(original_sr);
235
236 let total_frames = if target_sr != original_sr {
238 ((total_source_frames as f64) * (target_sr as f64) / (original_sr as f64)).ceil() as u64
239 } else {
240 total_source_frames
241 };
242
243 log::info!(
244 "Creating audio pipeline: {}→{} Hz, {} ch, ~{} frames",
245 original_sr,
246 target_sr,
247 channels,
248 total_frames
249 );
250
251 let ring_buffer = Arc::new(RwLock::new(RingBuffer::new(RING_BUFFER_FRAMES, channels)));
252 let is_running = Arc::new(AtomicBool::new(false));
253 let is_finished = Arc::new(AtomicBool::new(false));
254 let buffered_frames = Arc::new(AtomicU64::new(0));
255 let total_frames_arc = Arc::new(AtomicU64::new(total_frames));
256 let current_read_pos = Arc::new(AtomicU64::new(0));
257
258 let pipeline = Self {
259 ring_buffer: Arc::clone(&ring_buffer),
260 is_running: Arc::clone(&is_running),
261 is_finished: Arc::clone(&is_finished),
262 buffered_frames: Arc::clone(&buffered_frames),
263 total_frames: Arc::clone(&total_frames_arc),
264 current_read_pos: Arc::clone(¤t_read_pos),
265 worker_handle: None,
266 channels,
267 sample_rate: target_sr,
268 original_sample_rate: original_sr,
269 };
270
271 Ok(pipeline)
272 }
273
274 pub fn start(
276 &mut self,
277 path: String,
278 target_sample_rate: Option<u32>,
279 quality: ResampleQuality,
280 ) {
281 if self.is_running.load(Ordering::Relaxed) {
282 return;
283 }
284
285 self.is_running.store(true, Ordering::Relaxed);
286 self.is_finished.store(false, Ordering::Relaxed);
287
288 let ring_buffer = Arc::clone(&self.ring_buffer);
289 let is_running = Arc::clone(&self.is_running);
290 let is_finished = Arc::clone(&self.is_finished);
291 let buffered_frames = Arc::clone(&self.buffered_frames);
292 let total_frames = Arc::clone(&self.total_frames);
293 let current_read_pos = Arc::clone(&self.current_read_pos);
294 let channels = self.channels;
295 let original_sr = self.original_sample_rate;
296 let target_sr = target_sample_rate.unwrap_or(original_sr);
297
298 let handle = thread::spawn(move || {
299 Self::worker_loop(
300 path,
301 channels,
302 original_sr,
303 target_sr,
304 quality,
305 ring_buffer,
306 is_running,
307 is_finished,
308 buffered_frames,
309 total_frames,
310 current_read_pos,
311 );
312 });
313
314 self.worker_handle = Some(handle);
315 }
316
317 #[allow(clippy::too_many_arguments)]
319 fn worker_loop(
320 path: String,
321 channels: usize,
322 original_sr: u32,
323 target_sr: u32,
324 quality: ResampleQuality,
325 ring_buffer: Arc<RwLock<RingBuffer>>,
326 is_running: Arc<AtomicBool>,
327 is_finished: Arc<AtomicBool>,
328 buffered_frames: Arc<AtomicU64>,
329 total_frames: Arc<AtomicU64>,
330 current_read_pos: Arc<AtomicU64>,
331 ) {
332 log::info!("Pipeline worker started for: {}", path);
333
334 let mut decoder = match StreamingDecoder::open(&path) {
336 Ok(d) => d,
337 Err(e) => {
338 log::error!("Failed to open decoder in worker: {}", e);
339 is_finished.store(true, Ordering::Relaxed);
340 return;
341 }
342 };
343
344 let mut resampler = if target_sr != original_sr {
346 match StreamingResampler::with_quality(
347 channels,
348 original_sr,
349 target_sr,
350 crate::config::PhaseResponse::default(),
351 quality,
352 ) {
353 Ok(rs) => Some(rs),
354 Err(e) => {
355 log::error!("Failed to create pipeline resampler: {}", e);
356 return;
357 }
358 }
359 } else {
360 None
361 };
362
363 let mut total_output_frames: u64 = 0;
364
365 while is_running.load(Ordering::Relaxed) {
367 let decoded = match decoder.decode_next() {
369 Ok(Some(samples)) => samples,
370 Ok(None) => {
371 if let Some(ref mut rs) = resampler {
373 let flushed = rs.flush_borrowed();
374 if !flushed.samples.is_empty() {
375 let (_, overflow) = ring_buffer.write().write(flushed.samples);
376 if let Some(min_pos) = overflow {
378 current_read_pos.fetch_max(min_pos, Ordering::Relaxed);
379 }
380 total_output_frames += flushed.frames as u64;
381 buffered_frames.store(total_output_frames, Ordering::Relaxed);
382 }
383 }
384 break;
385 }
386 Err(e) => {
387 log::error!("Decode error in pipeline: {}", e);
388 break;
389 }
390 };
391
392 let (output, frames) = if let Some(ref mut rs) = resampler {
395 let resampled = rs.process_chunk_borrowed(&decoded);
396 (resampled.samples, resampled.frames)
397 } else {
398 let frames = decoded.len() / channels;
399 (decoded.as_slice(), frames)
400 };
401
402 if !output.is_empty() {
403 let (_, overflow) = ring_buffer.write().write(output);
404 if let Some(min_pos) = overflow {
406 current_read_pos.fetch_max(min_pos, Ordering::Relaxed);
407 }
408 total_output_frames += frames as u64;
409 buffered_frames.store(total_output_frames, Ordering::Relaxed);
410 }
411 }
412
413 total_frames.store(total_output_frames, Ordering::Relaxed);
415 is_finished.store(true, Ordering::Relaxed);
416 is_running.store(false, Ordering::Relaxed);
417
418 log::info!(
419 "Pipeline worker finished. Total frames: {}",
420 total_output_frames
421 );
422 }
423
424 pub fn stop(&mut self) {
426 self.is_running.store(false, Ordering::Relaxed);
427 if let Some(handle) = self.worker_handle.take() {
428 let _ = handle.join();
429 }
430 }
431
432 pub fn read(&self, output: &mut [f64]) -> usize {
435 let read_pos = self.current_read_pos.load(Ordering::Relaxed);
436 let Some(buffer) = self.ring_buffer.try_read() else {
437 return 0;
438 };
439 let frames_read = buffer.read(read_pos, output);
440
441 if frames_read > 0 {
442 self.current_read_pos
443 .fetch_add(frames_read as u64, Ordering::Relaxed);
444 }
445
446 frames_read
447 }
448
449 pub fn read_position(&self) -> u64 {
451 self.current_read_pos.load(Ordering::Relaxed)
452 }
453
454 pub fn set_read_position(&self, frame: u64) {
456 self.current_read_pos.store(frame, Ordering::Relaxed);
457 }
458
459 pub fn total_frames(&self) -> u64 {
461 self.total_frames.load(Ordering::Relaxed)
462 }
463
464 pub fn buffered_frames(&self) -> u64 {
466 self.buffered_frames.load(Ordering::Relaxed)
467 }
468
469 pub fn is_finished(&self) -> bool {
471 self.is_finished.load(Ordering::Relaxed)
472 }
473
474 pub fn is_running(&self) -> bool {
476 self.is_running.load(Ordering::Relaxed)
477 }
478
479 pub fn buffer_ratio(&self) -> f32 {
481 let total = self.total_frames.load(Ordering::Relaxed);
482 let buffered = self.buffered_frames.load(Ordering::Relaxed);
483 if total == 0 {
484 return 0.0;
485 }
486 (buffered as f32 / total as f32).min(1.0)
487 }
488
489 pub fn available_frames(&self) -> u64 {
491 let read_pos = self.current_read_pos.load(Ordering::Relaxed);
492 self.buffered_frames
493 .load(Ordering::Relaxed)
494 .saturating_sub(read_pos)
495 }
496
497 pub fn channels(&self) -> usize {
499 self.channels
500 }
501
502 pub fn sample_rate(&self) -> u32 {
504 self.sample_rate
505 }
506
507 pub fn original_sample_rate(&self) -> u32 {
509 self.original_sample_rate
510 }
511}
512
513impl Drop for AudioPipeline {
514 fn drop(&mut self) {
515 self.stop();
516 }
517}
518
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds `frames * channels` interleaved samples counting up from `start`.
    fn samples(frames: usize, channels: usize, start: f64) -> Vec<f64> {
        (0..frames * channels).map(|i| start + i as f64).collect()
    }

    /// Builds a stereo 48 kHz pipeline around `ring_buffer` with no worker
    /// attached; `frames` seeds both the buffered and the total counters.
    fn idle_pipeline(ring_buffer: Arc<RwLock<RingBuffer>>, frames: u64) -> AudioPipeline {
        AudioPipeline {
            ring_buffer,
            is_running: Arc::new(AtomicBool::new(false)),
            is_finished: Arc::new(AtomicBool::new(false)),
            buffered_frames: Arc::new(AtomicU64::new(frames)),
            total_frames: Arc::new(AtomicU64::new(frames)),
            current_read_pos: Arc::new(AtomicU64::new(0)),
            worker_handle: None,
            channels: 2,
            sample_rate: 48_000,
            original_sample_rate: 48_000,
        }
    }

    #[test]
    fn ring_buffer_reads_back_exact_capacity() {
        let mut buffer = RingBuffer::new(4, 2);
        let input = samples(4, 2, 1.0);
        let mut output = vec![0.0; input.len()];

        assert_eq!(buffer.write(&input), (4, None));
        assert_eq!(buffer.read(0, &mut output), 4);
        assert_eq!(output, input);
    }

    #[test]
    fn ring_buffer_write_and_read_wrap_preserve_order() {
        let mut buffer = RingBuffer::new(4, 2);
        let first = samples(3, 2, 1.0);
        let second = samples(3, 2, 101.0);

        assert_eq!(buffer.write(&first), (3, None));
        buffer.advance_read_pos(2);
        assert_eq!(buffer.write(&second), (3, None));

        let mut output = vec![0.0; 4 * 2];
        assert_eq!(buffer.read(2, &mut output), 4);

        let mut expected = first[2 * 2..].to_vec();
        expected.extend_from_slice(&second);
        assert_eq!(output, expected);
    }

    #[test]
    fn ring_buffer_overflow_keeps_newest_frames_and_reports_consumed_position() {
        let mut buffer = RingBuffer::new(4, 2);
        let input = samples(6, 2, 1.0);
        let mut output = vec![0.0; 4 * 2];

        assert_eq!(buffer.write(&input), (6, Some(2)));
        assert_eq!(buffer.overflow_count(), 1);
        assert_eq!(buffer.read(2, &mut output), 4);
        assert_eq!(output, input[2 * 2..].to_vec());
    }

    #[test]
    fn ring_buffer_empty_read_leaves_output_untouched() {
        let buffer = RingBuffer::new(4, 2);
        let mut output = vec![42.0; 4];

        assert_eq!(buffer.read(0, &mut output), 0);
        assert_eq!(output, vec![42.0; 4]);
    }

    #[test]
    fn ring_buffer_partial_read_only_copies_available_frames() {
        let mut buffer = RingBuffer::new(8, 2);
        let input = samples(2, 2, 1.0);
        let mut output = vec![42.0; 4 * 2];

        assert_eq!(buffer.write(&input), (2, None));
        assert_eq!(buffer.read(0, &mut output), 2);
        assert_eq!(&output[..4], &input[..]);
        assert_eq!(&output[4..], &[42.0; 4]);
    }

    #[test]
    fn ring_buffer_wrap_preserves_multichannel_interleaving() {
        let channels = 6;
        let mut buffer = RingBuffer::new(4, channels);
        let first = samples(3, channels, 1.0);
        let second = samples(3, channels, 101.0);

        assert_eq!(buffer.write(&first), (3, None));
        buffer.advance_read_pos(2);
        assert_eq!(buffer.write(&second), (3, None));

        let mut output = vec![0.0; 4 * channels];
        assert_eq!(buffer.read(2, &mut output), 4);

        let mut expected = first[2 * channels..].to_vec();
        expected.extend_from_slice(&second);
        assert_eq!(output, expected);
    }

    #[test]
    fn audio_pipeline_read_returns_zero_when_ring_buffer_locked() {
        let ring_buffer = Arc::new(RwLock::new(RingBuffer::new(4, 2)));
        let pipeline = idle_pipeline(Arc::clone(&ring_buffer), 0);

        // Hold the write lock so the pipeline's `try_read` must fail.
        let _write_guard = ring_buffer.write();
        let mut output = vec![42.0; 4];

        assert_eq!(pipeline.read(&mut output), 0);
        assert_eq!(pipeline.read_position(), 0);
        assert_eq!(output, vec![42.0; 4]);
    }

    #[test]
    fn audio_pipeline_read_advances_by_frames_actually_read() {
        let ring_buffer = Arc::new(RwLock::new(RingBuffer::new(8, 2)));
        ring_buffer.write().write(&samples(3, 2, 1.0));
        let pipeline = idle_pipeline(ring_buffer, 3);

        let mut first = vec![0.0; 2 * 2];
        assert_eq!(pipeline.read(&mut first), 2);
        assert_eq!(pipeline.read_position(), 2);

        let mut second = vec![0.0; 2 * 2];
        assert_eq!(pipeline.read(&mut second), 1);
        assert_eq!(pipeline.read_position(), 3);

        let mut empty = vec![42.0; 2 * 2];
        assert_eq!(pipeline.read(&mut empty), 0);
        assert_eq!(pipeline.read_position(), 3);
        assert_eq!(empty, vec![42.0; 4]);
    }
}