1use std::sync::atomic::{AtomicUsize, Ordering};
49use std::sync::Arc;
50use std::time::Duration;
51
52use parking_lot::Mutex;
53
54use crate::chunker::{Chunker, ProcessedChunk};
55use crate::converter::{AudioFormatConverter, ConversionMetadata, StandardAudio};
56use crate::error::{Error, Result};
57use crate::format::AudioFormat;
58use crate::preprocessing::{DcHighPassFilter, NoiseReducer, PreprocessingConfig, VadContext};
59use crate::time::{validate_in_range, AudioDuration, AudioInstant, AudioTimestamp};
60use crate::vad::{SpeechChunk, VadDetector};
61
/// Summary of a single pass through the audio pipeline.
#[derive(Debug, Clone, Copy)]
pub struct ProcessingResult {
    /// Number of chunks produced by the chunker during this pass.
    pub chunks_processed: usize,

    /// Wall-clock latency for the entire pass.
    pub total_latency: Duration,

    /// Per-stage latency breakdown for this pass.
    pub stage_latencies: StageLatencies,

    /// Whether backpressure was detected; the standalone coordinator in this
    /// file always reports `false`.
    pub backpressure_active: bool,
}
80
/// Wall-clock latency measured for each stage of the pipeline.
#[derive(Debug, Clone, Copy, Default)]
pub struct StageLatencies {
    /// Time to convert raw input bytes into standard audio.
    pub format_conversion: Duration,

    /// Time spent in VAD speech detection.
    pub vad_detection: Duration,

    /// Time spent chunking the buffered audio.
    pub chunking: Duration,

    /// Average preprocessing time per chunk (zero when no chunks were made).
    pub preprocessing_avg: Duration,

    /// Average broadcast time per chunk; always zero here because no
    /// broadcaster is attached to the standalone coordinator.
    pub broadcasting_avg: Duration,
}
102
/// Orchestrates the streaming audio pipeline: format conversion, voice
/// activity detection, chunking, and per-chunk preprocessing.
#[derive(Debug)]
pub struct AudioPipelineCoordinator {
    vad_detector: Arc<VadDetector>,     // shared speech detector (also supplies config)
    chunker: Chunker,                   // splits buffered audio into chunks
    dc_filter: Mutex<DcHighPassFilter>, // stateful DC / high-pass filter
    noise_reducer: Mutex<NoiseReducer>, // stateful noise reduction
    stream_buffer: Mutex<StreamBuffer>, // accumulated, not-yet-trimmed samples
    processed_cursor: AtomicUsize,      // high-water mark of chunked samples (absolute stream index)
}
119
120#[derive(Debug, Default)]
121struct StreamBuffer {
122 base_sample_index: usize,
123 samples: Vec<f32>,
124}
125
126impl StreamBuffer {
127 fn append(&mut self, new_samples: &[f32]) {
128 self.samples.extend_from_slice(new_samples);
129 }
130
131 fn as_slice(&self) -> &[f32] {
132 &self.samples
133 }
134
135 fn base_sample_index(&self) -> usize {
136 self.base_sample_index
137 }
138
139 fn len(&self) -> usize {
140 self.samples.len()
141 }
142
143 fn start_time(&self, stream_start: AudioTimestamp, sample_rate: u32) -> Result<AudioTimestamp> {
144 let offset = samples_to_duration(self.base_sample_index, sample_rate)?;
145 Ok(stream_start.add_duration(offset))
146 }
147
148 fn drop_through(&mut self, sample_index: usize) {
149 if sample_index <= self.base_sample_index {
150 return;
151 }
152
153 let drop_count = sample_index
154 .saturating_sub(self.base_sample_index)
155 .min(self.samples.len());
156
157 if drop_count == 0 {
158 return;
159 }
160
161 if drop_count >= self.samples.len() {
162 self.samples.clear();
163 self.base_sample_index = sample_index;
164 } else {
165 self.samples.drain(..drop_count);
166 self.base_sample_index += drop_count;
167 }
168 }
169}
170
impl AudioPipelineCoordinator {
    /// Builds a coordinator from pre-configured pipeline stages.
    pub fn new(
        vad_detector: Arc<VadDetector>,
        chunker: Chunker,
        dc_filter: DcHighPassFilter,
        noise_reducer: NoiseReducer,
    ) -> Self {
        Self {
            vad_detector,
            chunker,
            dc_filter: Mutex::new(dc_filter),
            noise_reducer: Mutex::new(noise_reducer),
            stream_buffer: Mutex::new(StreamBuffer::default()),
            processed_cursor: AtomicUsize::new(0),
        }
    }

    /// Builds a coordinator with default configurations for every stage and a
    /// no-op VAD metrics collector.
    ///
    /// # Errors
    /// Propagates construction failures from the VAD detector, DC filter, or
    /// noise reducer.
    pub fn new_with_defaults() -> Result<Self> {
        use crate::{NoopVadMetricsCollector, VadConfig, VadMetricsCollector};

        let metrics: Arc<dyn VadMetricsCollector> = Arc::new(NoopVadMetricsCollector);

        let vad_config = VadConfig::default();
        let vad_detector = Arc::new(VadDetector::new(vad_config, metrics)?);

        let chunker = Chunker::default();

        let dc_config = PreprocessingConfig::default();
        let dc_filter = DcHighPassFilter::new(dc_config)?;

        let noise_config = crate::preprocessing::NoiseReductionConfig::default();
        let noise_reducer = crate::preprocessing::NoiseReducer::new(noise_config)?;

        Ok(Self::new(vad_detector, chunker, dc_filter, noise_reducer))
    }

    /// Processes one encoded audio frame: converts it to standard audio, then
    /// runs VAD, chunking, and preprocessing on the accumulated stream.
    ///
    /// # Errors
    /// Propagates failures from format conversion or any downstream stage.
    pub fn process_frame(&self, audio_bytes: &[u8]) -> Result<ProcessingResult> {
        let pipeline_start = AudioInstant::now();
        let mut latencies = StageLatencies::default();

        let format_start = AudioInstant::now();
        let standard_audio = AudioFormatConverter::convert_to_standard(audio_bytes)?;
        latencies.format_conversion = format_start.elapsed();

        self.process_standard_audio(&standard_audio, pipeline_start, latencies)
    }

    /// Flushes pending audio by feeding (`hangover_frames` + 1) frames of
    /// silence through the pipeline so the VAD's hangover window closes any
    /// open speech segment.
    ///
    /// # Errors
    /// Propagates failures from the VAD configuration or downstream stages.
    pub fn flush(&self) -> Result<ProcessingResult> {
        let pipeline_start = AudioInstant::now();
        let latencies = StageLatencies::default();

        let config = *self.vad_detector.config();
        let frame_len = config.frame_length_samples()?;
        // Enough silence to outlast the hangover, even if it is configured as 0.
        let frames_to_flush = (config.hangover_frames.max(1)) + 1;
        let silence_samples = vec![0.0f32; frame_len * frames_to_flush];

        // Synthetic metadata describing the injected silence as 16-bit mono PCM.
        let metadata = ConversionMetadata {
            original_format: AudioFormat::WavPcm,
            original_sample_rate: config.sample_rate,
            original_channels: 1,
            original_bit_depth: Some(16),
            peak_before: 0.0,
            peak_after: 0.0,
            conversion_time_ms: 0.0,
            detection_time_ms: 0.0,
            decode_time_ms: 0.0,
            resample_time_ms: 0.0,
            mix_time_ms: 0.0,
        };

        let standard_audio = StandardAudio {
            samples: silence_samples,
            metadata,
        };
        self.process_standard_audio(&standard_audio, pipeline_start, latencies)
    }

    /// Core pipeline pass: runs VAD on the incoming samples, appends them to
    /// the stream buffer, chunks the not-yet-processed portion of the buffer,
    /// advances the processed cursor, trims the buffer, and preprocesses each
    /// resulting chunk while recording per-stage latencies.
    fn process_standard_audio(
        &self,
        standard_audio: &StandardAudio,
        pipeline_start: AudioInstant,
        mut latencies: StageLatencies,
    ) -> Result<ProcessingResult> {
        // VAD runs on the incoming frame only (not the accumulated buffer);
        // the resulting segments are re-windowed below by normalize_vad_segments.
        let vad_start = AudioInstant::now();
        let vad_segments = self.vad_detector.detect(&standard_audio.samples)?;
        latencies.vad_detection = vad_start.elapsed();

        let sample_rate = self.vad_detector.config().sample_rate;
        let stream_start_time = self.vad_detector.config().stream_start_time;

        let (chunks, chunk_duration) = {
            // Hold the buffer lock for the whole append → chunk → trim sequence
            // so concurrent callers cannot interleave partial updates.
            let mut buffer = self.stream_buffer.lock();
            buffer.append(&standard_audio.samples);

            if buffer.as_slice().is_empty() {
                Ok::<(Vec<ProcessedChunk>, Duration), Error>((Vec::new(), Duration::default()))
            } else {
                let buffer_base = buffer.base_sample_index();
                let buffer_len = buffer.len();
                let buffer_end_abs = buffer_base + buffer_len;
                // Absolute index of the first sample not yet covered by a chunk.
                let processed_before = self.processed_cursor.load(Ordering::Acquire);
                let slice_start_abs = processed_before.max(buffer_base);

                if slice_start_abs >= buffer_base + buffer_len {
                    // Everything buffered has already been chunked: trim the
                    // buffer (keeping ~200 ms of lookback context) and return
                    // an empty result early.
                    let lookback_samples = (sample_rate as usize) / 5;
                    let drop_target = slice_start_abs.saturating_sub(lookback_samples);
                    buffer.drop_through(drop_target);
                    drop(buffer);
                    return Ok(ProcessingResult {
                        chunks_processed: 0,
                        total_latency: pipeline_start.elapsed(),
                        stage_latencies: latencies,
                        backpressure_active: false,
                    });
                }

                // Timestamp of the first unprocessed sample.
                let base_time = buffer.start_time(stream_start_time, sample_rate)?;
                let offset_samples = slice_start_abs.saturating_sub(buffer_base);
                let offset_duration = samples_to_duration(offset_samples, sample_rate)?;
                let audio_start = base_time.add_duration(offset_duration);

                let audio_slice = buffer
                    .as_slice()
                    .get(offset_samples..)
                    .ok_or_else(|| Error::InvalidInput("invalid buffer window".into()))?;

                // Clamp VAD segments to the unprocessed window and re-express
                // their timestamps relative to the slice start.
                let normalized_segments = normalize_vad_segments(
                    &vad_segments,
                    stream_start_time,
                    audio_start,
                    slice_start_abs,
                    buffer_end_abs,
                    sample_rate,
                )?;

                let chunk_start = AudioInstant::now();
                let chunks = self.chunker.chunk_with_stream_start(
                    audio_slice,
                    sample_rate,
                    &normalized_segments,
                    audio_start,
                )?;
                let elapsed = chunk_start.elapsed();

                // Advance the cursor to the furthest chunk end (monotonic
                // high-water mark in absolute sample indices).
                let mut max_processed_sample = processed_before;
                for chunk in &chunks {
                    let end_sample =
                        time_to_sample_index(chunk.end_time, stream_start_time, sample_rate)?;
                    if end_sample > max_processed_sample {
                        max_processed_sample = end_sample;
                    }
                }
                self.processed_cursor
                    .store(max_processed_sample, Ordering::Release);

                // Trim consumed samples, keeping ~200 ms of lookback context.
                let lookback_samples = (sample_rate as usize) / 5;
                let drop_target = max_processed_sample.saturating_sub(lookback_samples);
                buffer.drop_through(drop_target);
                drop(buffer);

                Ok::<(Vec<ProcessedChunk>, Duration), Error>((chunks, elapsed))
            }
        }?;
        latencies.chunking = chunk_duration;

        let mut total_preprocess = Duration::default();
        // Carries each chunk's overlap tail forward into the next chunk's
        // `overlap_prev`.
        let mut prev_overlap_next: Option<Vec<f32>> = None;

        // NOTE(review): the preprocessed chunks are not stored or returned —
        // only their latency feeds the result. Presumably a broadcaster
        // consumes them in the full pipeline; confirm this is intentional for
        // the standalone coordinator.
        for chunk in &chunks {
            let preprocess_start = AudioInstant::now();
            let mut preprocessed = self.preprocess_chunk(chunk)?;

            if let Some(prev_overlap) = prev_overlap_next.take() {
                preprocessed.overlap_prev = Some(prev_overlap);
            } else {
                preprocessed.overlap_prev = None;
            }

            prev_overlap_next.clone_from(&preprocessed.overlap_next);

            total_preprocess += preprocess_start.elapsed();
        }

        // max(1) guards the division when no chunks were produced.
        let chunk_count = chunks.len().max(1);
        latencies.preprocessing_avg = total_preprocess / chunk_count as u32;
        latencies.broadcasting_avg = Duration::ZERO;

        let total_latency = pipeline_start.elapsed();

        if total_latency > Duration::from_millis(60) {
            tracing::warn!(
                latency_ms = total_latency.as_millis(),
                "Audio processing exceeded 60ms target"
            );
        }

        // The standalone coordinator has no downstream queue to back up.
        let backpressure_active = false;

        Ok(ProcessingResult {
            chunks_processed: chunks.len(),
            total_latency,
            stage_latencies: latencies,
            backpressure_active,
        })
    }

    /// Runs DC filtering and noise reduction on one chunk, then recomputes the
    /// chunk's derived metrics (energy, clipping, SNR) and its `overlap_next`
    /// tail from the denoised samples. `overlap_prev` is reset to `None`; the
    /// caller re-links it from the previous chunk.
    fn preprocess_chunk(&self, chunk: &ProcessedChunk) -> Result<ProcessedChunk> {
        let vad_ctx = VadContext {
            is_silence: chunk.is_silence(),
        };
        // Each stage locks its own stateful processor for the minimum scope.
        let dc_clean = {
            let mut filter = self.dc_filter.lock();
            filter.process(&chunk.samples, Some(&vad_ctx))?
        };

        let denoised = {
            let mut reducer = self.noise_reducer.lock();
            reducer.reduce(&dc_clean, Some(vad_ctx))?
        };

        let (energy, has_clipping) = Self::compute_energy_and_clipping(&denoised);
        let snr_db = Self::recalculate_snr(chunk.snr_db, chunk.energy, energy);

        // Rebuild the overlap tail from the denoised samples, keeping the same
        // length as the original overlap (clamped to the denoised length).
        let overlap_next = chunk.overlap_next.as_ref().and_then(|existing| {
            let retain = existing.len().min(denoised.len());
            denoised.get(denoised.len() - retain..).map(<[f32]>::to_vec)
        });

        let mut processed = chunk.clone();
        processed.samples = denoised;
        processed.energy = energy;
        processed.snr_db = snr_db;
        processed.has_clipping = has_clipping;
        processed.overlap_next = overlap_next;
        processed.overlap_prev = None;

        Ok(processed)
    }

    /// Returns the RMS energy of `samples` and whether any sample's magnitude
    /// reaches the clipping threshold (0.999). Empty input yields `(0.0, false)`.
    fn compute_energy_and_clipping(samples: &[f32]) -> (f32, bool) {
        const CLIPPING_THRESHOLD: f32 = 0.999;

        if samples.is_empty() {
            return (0.0, false);
        }

        let mut sum_squares = 0.0f32;
        let mut has_clipping = false;
        for &sample in samples {
            let abs = sample.abs();
            if abs >= CLIPPING_THRESHOLD {
                has_clipping = true;
            }
            // mul_add fuses the multiply-accumulate for precision.
            sum_squares = sample.mul_add(sample, sum_squares);
        }
        let mean_square = sum_squares / samples.len() as f32;
        (mean_square.sqrt(), has_clipping)
    }

    /// Recomputes SNR after preprocessing changed the signal energy.
    ///
    /// Derives the noise floor implied by the previous SNR and energy
    /// (`noise_rms = prev_energy / 10^(snr/20)`), then re-expresses the new
    /// energy against that floor. Returns the previous SNR unchanged whenever
    /// any quantity is too small to divide safely, and `None` if no previous
    /// SNR was available.
    fn recalculate_snr(
        previous_snr: Option<f32>,
        previous_energy: f32,
        new_energy: f32,
    ) -> Option<f32> {
        const EPSILON: f32 = 1e-10;
        let snr_db = previous_snr?;

        if previous_energy <= EPSILON {
            return Some(snr_db);
        }

        let noise_rms = previous_energy / 10_f32.powf(snr_db / 20.0);
        if noise_rms <= EPSILON || new_energy <= EPSILON {
            return Some(snr_db);
        }

        let ratio = new_energy / noise_rms;
        if ratio <= EPSILON {
            return Some(snr_db);
        }

        Some(20.0 * ratio.log10())
    }
}
522
523fn samples_to_duration(samples: usize, sample_rate: u32) -> Result<AudioDuration> {
524 validate_in_range(sample_rate, 1_u32, u32::MAX, "sample_rate")?;
525
526 let sample_rate_u128 = u128::from(sample_rate);
527 let sample_count = samples as u128;
528 let nanos = (sample_count * 1_000_000_000u128 + (sample_rate_u128 / 2)) / sample_rate_u128;
529 Ok(AudioDuration::from_nanos(nanos as u64))
530}
531
532fn time_to_sample_index(
533 time: AudioTimestamp,
534 stream_start: AudioTimestamp,
535 sample_rate: u32,
536) -> Result<usize> {
537 validate_in_range(sample_rate, 1_u32, u32::MAX, "sample_rate")?;
538
539 let duration = time
540 .duration_since(stream_start)
541 .ok_or_else(|| Error::TemporalOperation("time precedes stream start".into()))?;
542 let samples = (duration.as_secs_f64() * f64::from(sample_rate)).round() as usize;
543 Ok(samples)
544}
545
546fn normalize_vad_segments(
547 segments: &[SpeechChunk],
548 stream_start: AudioTimestamp,
549 slice_start_time: AudioTimestamp,
550 slice_start_sample: usize,
551 buffer_end_sample: usize,
552 sample_rate: u32,
553) -> Result<Vec<SpeechChunk>> {
554 validate_in_range(sample_rate, 1_u32, u32::MAX, "sample_rate")?;
555
556 let mut normalized = Vec::with_capacity(segments.len());
557
558 for segment in segments {
559 let start_sample_abs = time_to_sample_index(segment.start_time, stream_start, sample_rate)?;
560 let end_sample_abs = time_to_sample_index(segment.end_time, stream_start, sample_rate)?;
561
562 if end_sample_abs <= slice_start_sample {
563 continue;
565 }
566
567 let clamped_start_abs = start_sample_abs.max(slice_start_sample);
568 let clamped_end_abs = end_sample_abs.min(buffer_end_sample);
569
570 if clamped_end_abs <= clamped_start_abs {
571 continue;
572 }
573
574 let rel_start_samples = clamped_start_abs - slice_start_sample;
575 let rel_end_samples = clamped_end_abs - slice_start_sample;
576
577 let start_time =
578 slice_start_time.add_duration(samples_to_duration(rel_start_samples, sample_rate)?);
579 let end_time =
580 slice_start_time.add_duration(samples_to_duration(rel_end_samples, sample_rate)?);
581
582 let mut adjusted = *segment;
583 adjusted.start_time = start_time;
584 adjusted.end_time = end_time;
585 normalized.push(adjusted);
586 }
587
588 Ok(normalized)
589}
590
#[cfg(test)]
mod tests {
    use super::*;
    use crate::fixtures::AudioFixtures;

    /// Encodes f32 samples as a minimal 16-bit mono PCM WAV byte stream
    /// (RIFF header + "fmt " chunk + "data" chunk).
    fn samples_to_wav_bytes(samples: &[f32], sample_rate: u32) -> Vec<u8> {
        let mut wav_data = Vec::new();

        let num_samples = samples.len() as u32;
        let num_channels = 1u16;
        let bits_per_sample = 16u16;
        let byte_rate = sample_rate * u32::from(num_channels) * u32::from(bits_per_sample) / 8;
        let block_align = num_channels * bits_per_sample / 8;
        let data_size = num_samples * u32::from(block_align);

        // RIFF header: total size = 36 header bytes after this field + data.
        wav_data.extend_from_slice(b"RIFF");
        wav_data.extend_from_slice(&(36 + data_size).to_le_bytes());
        wav_data.extend_from_slice(b"WAVE");

        // "fmt " chunk describing uncompressed PCM.
        wav_data.extend_from_slice(b"fmt ");
        wav_data.extend_from_slice(&16u32.to_le_bytes()); // fmt chunk size
        wav_data.extend_from_slice(&1u16.to_le_bytes()); // audio format: PCM
        wav_data.extend_from_slice(&num_channels.to_le_bytes());
        wav_data.extend_from_slice(&sample_rate.to_le_bytes());
        wav_data.extend_from_slice(&byte_rate.to_le_bytes());
        wav_data.extend_from_slice(&block_align.to_le_bytes());
        wav_data.extend_from_slice(&bits_per_sample.to_le_bytes());

        // "data" chunk with the samples themselves.
        wav_data.extend_from_slice(b"data");
        wav_data.extend_from_slice(&data_size.to_le_bytes());

        for &sample in samples {
            // Clamp to [-1, 1] and scale to i16 full range.
            let i16_sample = (sample.clamp(-1.0, 1.0) * 32767.0) as i16;
            wav_data.extend_from_slice(&i16_sample.to_le_bytes());
        }

        wav_data
    }

    #[test]
    fn test_coordinator_creation_with_defaults() {
        let coordinator = AudioPipelineCoordinator::new_with_defaults();
        assert!(
            coordinator.is_ok(),
            "Failed to create coordinator with defaults"
        );
    }

    #[test]
    fn test_process_frame_with_real_audio() {
        let coordinator =
            AudioPipelineCoordinator::new_with_defaults().expect("Failed to create coordinator");

        let fixtures = AudioFixtures::new();
        let audio_sample = fixtures
            .load_sample("french_short")
            .expect("Failed to load test audio");
        let test_audio = samples_to_wav_bytes(&audio_sample.audio_data, audio_sample.sample_rate);

        let result = coordinator.process_frame(&test_audio);
        assert!(
            result.is_ok(),
            "Failed to process audio frame: {:?}",
            result.err()
        );

        let processing_result = result.unwrap();

        // Real speech audio should produce at least one chunk.
        assert!(
            processing_result.chunks_processed > 0,
            "No chunks were generated from audio"
        );

        assert!(
            processing_result.total_latency < Duration::from_millis(100),
            "Processing took too long: {:?}",
            processing_result.total_latency
        );
    }

    #[test]
    fn test_stage_latencies_tracked() {
        let coordinator =
            AudioPipelineCoordinator::new_with_defaults().expect("Failed to create coordinator");

        let fixtures = AudioFixtures::new();
        let audio_sample = fixtures
            .load_sample("french_short")
            .expect("Failed to load test audio");
        let test_audio = samples_to_wav_bytes(&audio_sample.audio_data, audio_sample.sample_rate);

        let result = coordinator
            .process_frame(&test_audio)
            .expect("Failed to process audio");

        let latencies = result.stage_latencies;

        // Every stage that actually ran should report a non-zero latency.
        assert!(
            latencies.format_conversion > Duration::ZERO,
            "Format conversion latency not tracked"
        );
        assert!(
            latencies.vad_detection > Duration::ZERO,
            "VAD detection latency not tracked"
        );
        assert!(
            latencies.chunking > Duration::ZERO,
            "Chunking latency not tracked"
        );

        // Preprocessing may legitimately be zero (no chunks); just touch it.
        let _ = latencies.preprocessing_avg;
        assert_eq!(
            latencies.broadcasting_avg,
            Duration::ZERO,
            "Standalone pipeline should not report broadcast latency"
        );
    }

    #[test]
    fn test_latency_performance_contract() {
        let coordinator =
            AudioPipelineCoordinator::new_with_defaults().expect("Failed to create coordinator");

        let fixtures = AudioFixtures::new();
        let audio_sample = fixtures
            .load_sample("french_short")
            .expect("Failed to load test audio");
        let test_audio = samples_to_wav_bytes(&audio_sample.audio_data, audio_sample.sample_rate);

        // Warm-up run so one-time initialization costs don't skew the samples.
        coordinator
            .process_frame(&test_audio)
            .expect("Failed to process warm-up audio");

        let mut latencies = Vec::new();
        for _ in 0..5 {
            let result = coordinator
                .process_frame(&test_audio)
                .expect("Failed to process audio");
            latencies.push(result.total_latency);
        }

        latencies.sort();
        let p95_index = (latencies.len() as f64 * 0.95).ceil() as usize - 1;
        let p95_latency = latencies[p95_index];

        assert!(
            p95_latency < Duration::from_millis(150),
            "P95 latency exceeds 150ms (CI-tolerant): {:?}",
            p95_latency
        );
    }

    #[test]
    fn test_backpressure_detection() {
        let coordinator =
            AudioPipelineCoordinator::new_with_defaults().expect("Failed to create coordinator");

        let fixtures = AudioFixtures::new();
        let audio_sample = fixtures
            .load_sample("french_short")
            .expect("Failed to load test audio");
        let test_audio = samples_to_wav_bytes(&audio_sample.audio_data, audio_sample.sample_rate);

        let result = coordinator
            .process_frame(&test_audio)
            .expect("Failed to process audio");

        assert!(
            !result.backpressure_active,
            "Standalone coordinator should not report backpressure"
        );

        // Repeated frames still must not flip the flag.
        for _ in 0..3 {
            let result = coordinator
                .process_frame(&test_audio)
                .expect("Failed to process audio");
            assert!(
                !result.backpressure_active,
                "Standalone coordinator should not report backpressure"
            );
        }
    }

    #[test]
    fn test_process_empty_audio() {
        let coordinator =
            AudioPipelineCoordinator::new_with_defaults().expect("Failed to create coordinator");

        // Zero bytes cannot be format-detected, so conversion must fail.
        let empty_audio = &[];
        let result = coordinator.process_frame(empty_audio);

        assert!(result.is_err(), "Empty audio should return error");
    }
}