1use std::sync::atomic::{AtomicUsize, Ordering};
50use std::sync::Arc;
52use std::time::Duration;
53
54use parking_lot::Mutex;
55
56use crate::chunker::{Chunker, ProcessedChunk};
57use crate::converter::{AudioFormatConverter, ConversionMetadata, StandardAudio};
58use crate::error::{Error, Result};
59use crate::format::AudioFormat;
60use crate::preprocessing::{DcHighPassFilter, NoiseReducer, PreprocessingConfig, VadContext};
61use crate::time::{validate_in_range, AudioDuration, AudioInstant, AudioTimestamp};
62use crate::vad::{SpeechChunk, VadDetector};
63
/// Summary of a single pipeline pass: how much work was done and how
/// long it took, broken down by stage.
#[derive(Debug, Clone, Copy)]
pub struct ProcessingResult {
    /// Number of chunks the chunker emitted during this pass.
    pub chunks_processed: usize,

    /// Wall-clock time for the whole pass, measured from pipeline entry.
    pub total_latency: Duration,

    /// Per-stage latency breakdown for this pass.
    pub stage_latencies: StageLatencies,

    /// Whether downstream consumers forced the pipeline to throttle.
    /// NOTE(review): always `false` in the visible code — no consumer
    /// registration exists in this file; confirm intended semantics.
    pub backpressure_active: bool,
}
78
/// Per-stage latency breakdown of one pipeline pass.
#[derive(Debug, Clone, Copy, Default)]
pub struct StageLatencies {
    /// Time spent converting the raw input bytes to standard f32 audio
    /// (zero for passes that start from already-converted audio, e.g. flush).
    pub format_conversion: Duration,

    /// Time spent running voice-activity detection on the new samples.
    pub vad_detection: Duration,

    /// Time spent chunking the buffered stream.
    pub chunking: Duration,

    /// Average per-chunk preprocessing time (DC filter + noise reduction).
    pub preprocessing_avg: Duration,

    /// Average per-chunk broadcast time.
    /// NOTE(review): the broadcast timer currently brackets no work, so
    /// this is effectively zero; confirm whether broadcasting is pending.
    pub broadcasting_avg: Duration,
}
97
/// Orchestrates the audio pipeline: format conversion, VAD, stream
/// buffering, chunking, and per-chunk preprocessing, with latency
/// accounting for each stage.
#[derive(Debug)]
pub struct AudioPipelineCoordinator {
    /// Voice-activity detector; shared ownership so it can be reused
    /// elsewhere. Its config also supplies sample rate and stream start.
    vad_detector: Arc<VadDetector>,
    /// Splits the unprocessed window of the stream into chunks.
    chunker: Chunker,
    /// Stateful DC/high-pass filter; mutex-guarded so `&self` methods can
    /// mutate it.
    dc_filter: Mutex<DcHighPassFilter>,
    /// Stateful noise reducer; mutex-guarded for the same reason.
    noise_reducer: Mutex<NoiseReducer>,
    /// Rolling buffer of samples not yet fully chunked.
    stream_buffer: Mutex<StreamBuffer>,
    /// Absolute stream index of the furthest sample already emitted in a
    /// chunk; prevents re-emitting audio across passes.
    processed_cursor: AtomicUsize,
}
116
/// Rolling sample buffer addressed by absolute stream position:
/// `samples[0]` corresponds to absolute sample index `base_sample_index`.
#[derive(Debug, Default)]
struct StreamBuffer {
    // Absolute stream index of the first element of `samples`.
    base_sample_index: usize,
    samples: Vec<f32>,
}
122
123impl StreamBuffer {
124 fn append(&mut self, new_samples: &[f32]) {
125 self.samples.extend_from_slice(new_samples);
126 }
127
128 fn as_slice(&self) -> &[f32] {
129 &self.samples
130 }
131
132 fn base_sample_index(&self) -> usize {
133 self.base_sample_index
134 }
135
136 fn len(&self) -> usize {
137 self.samples.len()
138 }
139
140 fn start_time(&self, stream_start: AudioTimestamp, sample_rate: u32) -> Result<AudioTimestamp> {
141 let offset = samples_to_duration(self.base_sample_index, sample_rate)?;
142 Ok(stream_start.add_duration(offset))
143 }
144
145 fn drop_through(&mut self, sample_index: usize) {
146 if sample_index <= self.base_sample_index {
147 return;
148 }
149
150 let drop_count = sample_index
151 .saturating_sub(self.base_sample_index)
152 .min(self.samples.len());
153
154 if drop_count == 0 {
155 return;
156 }
157
158 if drop_count >= self.samples.len() {
159 self.samples.clear();
160 self.base_sample_index = sample_index;
161 } else {
162 self.samples.drain(..drop_count);
163 self.base_sample_index += drop_count;
164 }
165 }
166}
167
168impl AudioPipelineCoordinator {
169 pub fn new(
179 vad_detector: Arc<VadDetector>,
180 chunker: Chunker,
181 dc_filter: DcHighPassFilter,
182 noise_reducer: NoiseReducer,
183 ) -> Self {
184 Self {
185 vad_detector,
186 chunker,
187 dc_filter: Mutex::new(dc_filter),
188 noise_reducer: Mutex::new(noise_reducer),
189 stream_buffer: Mutex::new(StreamBuffer::default()),
190 processed_cursor: AtomicUsize::new(0),
191 }
192 }
193
194 pub fn new_with_defaults() -> Result<Self> {
213 use crate::{NoopVadMetricsCollector, VadConfig, VadMetricsCollector};
214
215 let metrics: Arc<dyn VadMetricsCollector> = Arc::new(NoopVadMetricsCollector);
216
217 let vad_config = VadConfig::default();
218 let vad_detector = Arc::new(VadDetector::new(vad_config, metrics)?);
219
220 let chunker = Chunker::default();
221
222 let dc_config = PreprocessingConfig::default();
223 let dc_filter = DcHighPassFilter::new(dc_config)?;
224
225 let noise_config = crate::preprocessing::NoiseReductionConfig::default();
226 let noise_reducer = crate::preprocessing::NoiseReducer::new(noise_config)?;
227
228 Ok(Self::new(vad_detector, chunker, dc_filter, noise_reducer))
229 }
230
231 pub fn process_frame(&self, audio_bytes: &[u8]) -> Result<ProcessingResult> {
269 let pipeline_start = AudioInstant::now();
270 let mut latencies = StageLatencies::default();
271
272 let format_start = AudioInstant::now();
273 let standard_audio = AudioFormatConverter::convert_to_standard(audio_bytes)?;
274 latencies.format_conversion = format_start.elapsed();
275
276 self.process_standard_audio(&standard_audio, pipeline_start, latencies)
277 }
278
279 pub fn flush(&self) -> Result<ProcessingResult> {
285 let pipeline_start = AudioInstant::now();
286 let latencies = StageLatencies::default();
287
288 let config = *self.vad_detector.config();
289 let frame_len = config.frame_length_samples()?;
290 let frames_to_flush = (config.hangover_frames.max(1)) + 1;
291 let silence_samples = vec![0.0f32; frame_len * frames_to_flush];
292
293 let metadata = ConversionMetadata {
294 original_format: AudioFormat::WavPcm,
295 original_sample_rate: config.sample_rate,
296 original_channels: 1,
297 original_bit_depth: Some(16),
298 peak_before: 0.0,
299 peak_after: 0.0,
300 conversion_time_ms: 0.0,
301 detection_time_ms: 0.0,
302 decode_time_ms: 0.0,
303 resample_time_ms: 0.0,
304 mix_time_ms: 0.0,
305 };
306
307 let standard_audio = StandardAudio {
308 samples: silence_samples,
309 metadata,
310 };
311 self.process_standard_audio(&standard_audio, pipeline_start, latencies)
312 }
313
314 fn process_standard_audio(
315 &self,
316 standard_audio: &StandardAudio,
317 pipeline_start: AudioInstant,
318 mut latencies: StageLatencies,
319 ) -> Result<ProcessingResult> {
320 let vad_start = AudioInstant::now();
321 let vad_segments = self.vad_detector.detect(&standard_audio.samples)?;
322 latencies.vad_detection = vad_start.elapsed();
323
324 let sample_rate = self.vad_detector.config().sample_rate;
325 let stream_start_time = self.vad_detector.config().stream_start_time;
326
327 let (chunks, chunk_duration) = {
328 let mut buffer = self.stream_buffer.lock();
329 buffer.append(&standard_audio.samples);
330
331 if buffer.as_slice().is_empty() {
332 Ok::<(Vec<ProcessedChunk>, Duration), Error>((Vec::new(), Duration::default()))
333 } else {
334 let buffer_base = buffer.base_sample_index();
335 let buffer_len = buffer.len();
336 let buffer_end_abs = buffer_base + buffer_len;
337 let processed_before = self.processed_cursor.load(Ordering::Acquire);
338 let slice_start_abs = processed_before.max(buffer_base);
339
340 if slice_start_abs >= buffer_base + buffer_len {
341 let lookback_samples = (sample_rate as usize) / 5;
342 let drop_target = slice_start_abs.saturating_sub(lookback_samples);
343 buffer.drop_through(drop_target);
344 drop(buffer);
345 return Ok(ProcessingResult {
346 chunks_processed: 0,
347 total_latency: pipeline_start.elapsed(),
348 stage_latencies: latencies,
349 backpressure_active: false,
350 });
351 }
352
353 let base_time = buffer.start_time(stream_start_time, sample_rate)?;
354 let offset_samples = slice_start_abs.saturating_sub(buffer_base);
355 let offset_duration = samples_to_duration(offset_samples, sample_rate)?;
356 let audio_start = base_time.add_duration(offset_duration);
357
358 let audio_slice = buffer
359 .as_slice()
360 .get(offset_samples..)
361 .ok_or_else(|| Error::InvalidInput("invalid buffer window".into()))?;
362
363 let normalized_segments = normalize_vad_segments(
364 &vad_segments,
365 stream_start_time,
366 audio_start,
367 slice_start_abs,
368 buffer_end_abs,
369 sample_rate,
370 )?;
371
372 let chunk_start = AudioInstant::now();
373 let chunks = self.chunker.chunk_with_stream_start(
374 audio_slice,
375 sample_rate,
376 &normalized_segments,
377 audio_start,
378 )?;
379 let elapsed = chunk_start.elapsed();
380
381 let mut max_processed_sample = processed_before;
382 for chunk in &chunks {
383 let end_sample =
384 time_to_sample_index(chunk.end_time, stream_start_time, sample_rate)?;
385 if end_sample > max_processed_sample {
386 max_processed_sample = end_sample;
387 }
388 }
389 self.processed_cursor
390 .store(max_processed_sample, Ordering::Release);
391
392 let lookback_samples = (sample_rate as usize) / 5; let drop_target = max_processed_sample.saturating_sub(lookback_samples);
394 buffer.drop_through(drop_target);
395 drop(buffer);
396
397 Ok::<(Vec<ProcessedChunk>, Duration), Error>((chunks, elapsed))
398 }
399 }?;
400 latencies.chunking = chunk_duration;
401
402 let mut total_preprocess = Duration::default();
403 let mut total_broadcast = Duration::default();
404
405 let mut prev_overlap_next: Option<Vec<f32>> = None;
406
407 for chunk in &chunks {
408 let preprocess_start = AudioInstant::now();
409 let mut preprocessed = self.preprocess_chunk(chunk)?;
410
411 if let Some(prev_overlap) = prev_overlap_next.take() {
412 preprocessed.overlap_prev = Some(prev_overlap);
413 } else {
414 preprocessed.overlap_prev = None;
415 }
416
417 prev_overlap_next.clone_from(&preprocessed.overlap_next);
418
419 total_preprocess += preprocess_start.elapsed();
420
421 let broadcast_start = AudioInstant::now();
422 total_broadcast += broadcast_start.elapsed();
423 }
424
425 let chunk_count = chunks.len().max(1);
426 latencies.preprocessing_avg = total_preprocess / chunk_count as u32;
427 latencies.broadcasting_avg = total_broadcast / chunk_count as u32;
428
429 let total_latency = pipeline_start.elapsed();
430
431 if total_latency > Duration::from_millis(60) {
432 tracing::warn!(
433 latency_ms = total_latency.as_millis(),
434 "Audio processing exceeded 60ms target"
435 );
436 }
437
438 let backpressure_active = false;
439
440 Ok(ProcessingResult {
441 chunks_processed: chunks.len(),
442 total_latency,
443 stage_latencies: latencies,
444 backpressure_active,
445 })
446 }
447
448 fn preprocess_chunk(&self, chunk: &ProcessedChunk) -> Result<ProcessedChunk> {
449 let vad_ctx = VadContext {
450 is_silence: chunk.is_silence(),
451 };
452 let dc_clean = {
453 let mut filter = self.dc_filter.lock();
454 filter.process(&chunk.samples, Some(&vad_ctx))?
455 };
456
457 let denoised = {
458 let mut reducer = self.noise_reducer.lock();
459 reducer.reduce(&dc_clean, Some(vad_ctx))?
460 };
461
462 let (energy, has_clipping) = Self::compute_energy_and_clipping(&denoised);
463 let snr_db = Self::recalculate_snr(chunk.snr_db, chunk.energy, energy);
464
465 let overlap_next = chunk.overlap_next.as_ref().and_then(|existing| {
466 let retain = existing.len().min(denoised.len());
467 denoised.get(denoised.len() - retain..).map(<[f32]>::to_vec)
468 });
469
470 let mut processed = chunk.clone();
471 processed.samples = denoised;
472 processed.energy = energy;
473 processed.snr_db = snr_db;
474 processed.has_clipping = has_clipping;
475 processed.overlap_next = overlap_next;
476 processed.overlap_prev = None;
477
478 Ok(processed)
479 }
480
481 fn compute_energy_and_clipping(samples: &[f32]) -> (f32, bool) {
482 const CLIPPING_THRESHOLD: f32 = 0.999;
483
484 if samples.is_empty() {
485 return (0.0, false);
486 }
487
488 let mut sum_squares = 0.0f32;
489 let mut has_clipping = false;
490 for &sample in samples {
491 let abs = sample.abs();
492 if abs >= CLIPPING_THRESHOLD {
493 has_clipping = true;
494 }
495 sum_squares = sample.mul_add(sample, sum_squares);
496 }
497 let mean_square = sum_squares / samples.len() as f32;
498 (mean_square.sqrt(), has_clipping)
499 }
500
501 fn recalculate_snr(
502 previous_snr: Option<f32>,
503 previous_energy: f32,
504 new_energy: f32,
505 ) -> Option<f32> {
506 const EPSILON: f32 = 1e-10;
507 let snr_db = previous_snr?;
508
509 if previous_energy <= EPSILON {
510 return Some(snr_db);
511 }
512
513 let noise_rms = previous_energy / 10_f32.powf(snr_db / 20.0);
514 if noise_rms <= EPSILON || new_energy <= EPSILON {
515 return Some(snr_db);
516 }
517
518 let ratio = new_energy / noise_rms;
519 if ratio <= EPSILON {
520 return Some(snr_db);
521 }
522
523 Some(20.0 * ratio.log10())
524 }
525}
526
527fn samples_to_duration(samples: usize, sample_rate: u32) -> Result<AudioDuration> {
528 validate_in_range(sample_rate, 1_u32, u32::MAX, "sample_rate")?;
529
530 let sample_rate_u128 = u128::from(sample_rate);
531 let sample_count = samples as u128;
532 let nanos = (sample_count * 1_000_000_000u128 + (sample_rate_u128 / 2)) / sample_rate_u128;
533 Ok(AudioDuration::from_nanos(nanos as u64))
534}
535
536fn time_to_sample_index(
537 time: AudioTimestamp,
538 stream_start: AudioTimestamp,
539 sample_rate: u32,
540) -> Result<usize> {
541 validate_in_range(sample_rate, 1_u32, u32::MAX, "sample_rate")?;
542
543 let duration = time
544 .duration_since(stream_start)
545 .ok_or_else(|| Error::TemporalOperation("time precedes stream start".into()))?;
546 let samples = (duration.as_secs_f64() * f64::from(sample_rate)).round() as usize;
547 Ok(samples)
548}
549
550fn normalize_vad_segments(
551 segments: &[SpeechChunk],
552 stream_start: AudioTimestamp,
553 slice_start_time: AudioTimestamp,
554 slice_start_sample: usize,
555 buffer_end_sample: usize,
556 sample_rate: u32,
557) -> Result<Vec<SpeechChunk>> {
558 validate_in_range(sample_rate, 1_u32, u32::MAX, "sample_rate")?;
559
560 let mut normalized = Vec::with_capacity(segments.len());
561
562 for segment in segments {
563 let start_sample_abs = time_to_sample_index(segment.start_time, stream_start, sample_rate)?;
564 let end_sample_abs = time_to_sample_index(segment.end_time, stream_start, sample_rate)?;
565
566 if end_sample_abs <= slice_start_sample {
567 continue;
569 }
570
571 let clamped_start_abs = start_sample_abs.max(slice_start_sample);
572 let clamped_end_abs = end_sample_abs.min(buffer_end_sample);
573
574 if clamped_end_abs <= clamped_start_abs {
575 continue;
576 }
577
578 let rel_start_samples = clamped_start_abs - slice_start_sample;
579 let rel_end_samples = clamped_end_abs - slice_start_sample;
580
581 let start_time =
582 slice_start_time.add_duration(samples_to_duration(rel_start_samples, sample_rate)?);
583 let end_time =
584 slice_start_time.add_duration(samples_to_duration(rel_end_samples, sample_rate)?);
585
586 let mut adjusted = *segment;
587 adjusted.start_time = start_time;
588 adjusted.end_time = end_time;
589 normalized.push(adjusted);
590 }
591
592 Ok(normalized)
593}
594
#[cfg(test)]
mod tests {
    use super::*;
    use crate::fixtures::AudioFixtures;

    /// Encodes f32 samples as a minimal mono 16-bit PCM WAV byte stream
    /// (canonical 44-byte header followed by little-endian samples).
    fn samples_to_wav_bytes(samples: &[f32], sample_rate: u32) -> Vec<u8> {
        let mut wav_data = Vec::new();

        let num_samples = samples.len() as u32;
        let num_channels = 1u16;
        let bits_per_sample = 16u16;
        let byte_rate = sample_rate * u32::from(num_channels) * u32::from(bits_per_sample) / 8;
        let block_align = num_channels * bits_per_sample / 8;
        let data_size = num_samples * u32::from(block_align);

        // RIFF header.
        wav_data.extend_from_slice(b"RIFF");
        wav_data.extend_from_slice(&(36 + data_size).to_le_bytes());
        wav_data.extend_from_slice(b"WAVE");

        // "fmt " subchunk.
        wav_data.extend_from_slice(b"fmt ");
        wav_data.extend_from_slice(&16u32.to_le_bytes()); // subchunk size
        wav_data.extend_from_slice(&1u16.to_le_bytes()); // audio format: PCM
        wav_data.extend_from_slice(&num_channels.to_le_bytes());
        wav_data.extend_from_slice(&sample_rate.to_le_bytes());
        wav_data.extend_from_slice(&byte_rate.to_le_bytes());
        wav_data.extend_from_slice(&block_align.to_le_bytes());
        wav_data.extend_from_slice(&bits_per_sample.to_le_bytes());

        // "data" subchunk.
        wav_data.extend_from_slice(b"data");
        wav_data.extend_from_slice(&data_size.to_le_bytes());

        for &sample in samples {
            let i16_sample = (sample.clamp(-1.0, 1.0) * 32767.0) as i16;
            wav_data.extend_from_slice(&i16_sample.to_le_bytes());
        }

        wav_data
    }

    #[test]
    fn test_coordinator_creation_with_defaults() {
        let coordinator = AudioPipelineCoordinator::new_with_defaults();
        assert!(
            coordinator.is_ok(),
            "Failed to create coordinator with defaults"
        );
    }

    #[test]
    fn test_process_frame_with_real_audio() {
        let coordinator =
            AudioPipelineCoordinator::new_with_defaults().expect("Failed to create coordinator");

        let fixtures = AudioFixtures::new();
        let audio_sample = fixtures
            .load_sample("french_short")
            .expect("Failed to load test audio");
        let test_audio = samples_to_wav_bytes(&audio_sample.audio_data, audio_sample.sample_rate);

        let result = coordinator.process_frame(&test_audio);
        assert!(
            result.is_ok(),
            "Failed to process audio frame: {:?}",
            result.err()
        );

        let processing_result = result.unwrap();

        assert!(
            processing_result.chunks_processed > 0,
            "No chunks were generated from audio"
        );

        assert!(
            processing_result.total_latency < Duration::from_millis(100),
            "Processing took too long: {:?}",
            processing_result.total_latency
        );
    }

    #[test]
    fn test_stage_latencies_tracked() {
        let coordinator =
            AudioPipelineCoordinator::new_with_defaults().expect("Failed to create coordinator");

        let fixtures = AudioFixtures::new();
        let audio_sample = fixtures
            .load_sample("french_short")
            .expect("Failed to load test audio");
        let test_audio = samples_to_wav_bytes(&audio_sample.audio_data, audio_sample.sample_rate);

        let result = coordinator
            .process_frame(&test_audio)
            .expect("Failed to process audio");

        let latencies = result.stage_latencies;

        assert!(
            latencies.format_conversion > Duration::ZERO,
            "Format conversion latency not tracked"
        );
        assert!(
            latencies.vad_detection > Duration::ZERO,
            "VAD detection latency not tracked"
        );
        assert!(
            latencies.chunking > Duration::ZERO,
            "Chunking latency not tracked"
        );

        // Averages may legitimately be zero (e.g. no broadcast work), so
        // just confirm the fields exist and are populated.
        let _ = latencies.preprocessing_avg;
        let _ = latencies.broadcasting_avg;
    }

    #[test]
    fn test_latency_performance_contract() {
        let coordinator =
            AudioPipelineCoordinator::new_with_defaults().expect("Failed to create coordinator");

        let fixtures = AudioFixtures::new();
        let audio_sample = fixtures
            .load_sample("french_short")
            .expect("Failed to load test audio");
        let test_audio = samples_to_wav_bytes(&audio_sample.audio_data, audio_sample.sample_rate);

        // Warm-up pass so caches and lazy initialization don't skew P95.
        coordinator
            .process_frame(&test_audio)
            .expect("Failed to process warm-up audio");

        let mut latencies = Vec::new();
        for _ in 0..5 {
            let result = coordinator
                .process_frame(&test_audio)
                .expect("Failed to process audio");
            latencies.push(result.total_latency);
        }

        latencies.sort();
        let p95_index = (latencies.len() as f64 * 0.95).ceil() as usize - 1;
        let p95_latency = latencies[p95_index];

        assert!(
            p95_latency < Duration::from_millis(150),
            "P95 latency exceeds 150ms (CI-tolerant): {:?}",
            p95_latency
        );
    }

    #[test]
    fn test_backpressure_detection() {
        let coordinator =
            AudioPipelineCoordinator::new_with_defaults().expect("Failed to create coordinator");

        let fixtures = AudioFixtures::new();
        let audio_sample = fixtures
            .load_sample("french_short")
            .expect("Failed to load test audio");
        let test_audio = samples_to_wav_bytes(&audio_sample.audio_data, audio_sample.sample_rate);

        let result = coordinator
            .process_frame(&test_audio)
            .expect("Failed to process audio");

        // No consumers are registered, so backpressure must stay inactive.
        assert!(
            !result.backpressure_active,
            "Coordinator should not throttle without registered consumers"
        );

        // Repeated frames must still process without throttling.
        for _ in 0..3 {
            let repeat = coordinator
                .process_frame(&test_audio)
                .expect("Failed to process audio");
            assert!(
                !repeat.backpressure_active,
                "Coordinator should not throttle without registered consumers"
            );
        }
    }

    #[test]
    fn test_process_empty_audio() {
        let coordinator =
            AudioPipelineCoordinator::new_with_defaults().expect("Failed to create coordinator");

        let empty_audio = &[];
        let result = coordinator.process_frame(empty_audio);

        assert!(result.is_err(), "Empty audio should return error");
    }
}