Skip to main content

speech_prep/
pipeline.rs

1//! Audio pipeline coordinator for orchestrating end-to-end processing.
2//!
3//! This module implements the Streaming Pipeline Coordinator, which integrates
4//! VAD, Format Conversion, Chunking, and Preprocessing into a unified
5//! synchronous pipeline with multi-consumer broadcasting.
6//!
7//! # Architecture
8//!
9//! The coordinator follows a simple synchronous design optimized for CPU-bound
10//! audio processing:
11//!
12//! ```text
13//! Raw Audio Bytes
14//!     ↓
15//! Format Conversion → StandardAudio (16kHz mono PCM)
16//!     ↓
17//! VAD Detection → Speech Segments
18//!     ↓
19//! Chunking → ProcessedChunks (500ms aligned)
20//!     ↓
21//! Preprocessing → Clean Audio
22//!     ↓
23//! Processed Output
24//! ```
25//!
26//! # Performance Contract
27//!
28//! - **Audio processing latency**: <60ms P95 (all 4 stages)
29//! - **Per-stage tracking**: Individual latency metrics exported
30//! - **Backpressure awareness**: Query broadcaster for throttling
31//!
32//! # Example
33//!
34//! ```rust,no_run
35//! use speech_prep::pipeline::AudioPipelineCoordinator;
36//!
37//! # fn main() -> speech_prep::error::Result<()> {
38//! let coordinator = AudioPipelineCoordinator::new_with_defaults()?;
39//!
40//! let audio_bytes = std::fs::read("sample.wav")?;
41//! let result = coordinator.process_frame(&audio_bytes)?;
42//!
43//! println!("Processing latency: {:?}", result.total_latency);
44//! println!("Chunks processed: {}", result.chunks_processed);
45//! # Ok(())
46//! # }
47//! ```
48
49use std::sync::atomic::{AtomicUsize, Ordering};
50/// Result of processing audio through the complete pipeline.
51use std::sync::Arc;
52use std::time::Duration;
53
54use parking_lot::Mutex;
55
56use crate::chunker::{Chunker, ProcessedChunk};
57use crate::converter::{AudioFormatConverter, ConversionMetadata, StandardAudio};
58use crate::error::{Error, Result};
59use crate::format::AudioFormat;
60use crate::preprocessing::{DcHighPassFilter, NoiseReducer, PreprocessingConfig, VadContext};
61use crate::time::{validate_in_range, AudioDuration, AudioInstant, AudioTimestamp};
62use crate::vad::{SpeechChunk, VadDetector};
63
64#[derive(Debug, Clone, Copy)]
65pub struct ProcessingResult {
66    /// Total number of chunks generated and broadcast.
67    pub chunks_processed: usize,
68
69    /// Total latency for complete pipeline execution.
70    pub total_latency: Duration,
71
72    /// Per-stage latency breakdown.
73    pub stage_latencies: StageLatencies,
74
75    /// Whether backpressure is currently active.
76    pub backpressure_active: bool,
77}
78
79/// Latency measurements for each pipeline stage.
80#[derive(Debug, Clone, Copy, Default)]
81pub struct StageLatencies {
82    /// Format conversion latency.
83    pub format_conversion: Duration,
84
85    /// VAD detection latency.
86    pub vad_detection: Duration,
87
88    /// Chunking latency.
89    pub chunking: Duration,
90
91    /// Preprocessing latency (per chunk average).
92    pub preprocessing_avg: Duration,
93
94    /// Broadcasting latency (per chunk average).
95    pub broadcasting_avg: Duration,
96}
97
98/// Audio pipeline coordinator.
99///
100/// Orchestrates format conversion, VAD detection, chunking, preprocessing, and
101/// multi-consumer broadcasting in a synchronous pipeline optimized for
102/// CPU-bound audio processing.
103///
104/// **Thread Safety**: Preprocessing filters wrapped in `Mutex` for interior
105/// mutability, allowing concurrent frame processing from multiple WebSocket
106/// connections.
107#[derive(Debug)]
108pub struct AudioPipelineCoordinator {
109    vad_detector: Arc<VadDetector>,
110    chunker: Chunker,
111    dc_filter: Mutex<DcHighPassFilter>,
112    noise_reducer: Mutex<NoiseReducer>,
113    stream_buffer: Mutex<StreamBuffer>,
114    processed_cursor: AtomicUsize,
115}
116
117#[derive(Debug, Default)]
118struct StreamBuffer {
119    base_sample_index: usize,
120    samples: Vec<f32>,
121}
122
123impl StreamBuffer {
124    fn append(&mut self, new_samples: &[f32]) {
125        self.samples.extend_from_slice(new_samples);
126    }
127
128    fn as_slice(&self) -> &[f32] {
129        &self.samples
130    }
131
132    fn base_sample_index(&self) -> usize {
133        self.base_sample_index
134    }
135
136    fn len(&self) -> usize {
137        self.samples.len()
138    }
139
140    fn start_time(&self, stream_start: AudioTimestamp, sample_rate: u32) -> Result<AudioTimestamp> {
141        let offset = samples_to_duration(self.base_sample_index, sample_rate)?;
142        Ok(stream_start.add_duration(offset))
143    }
144
145    fn drop_through(&mut self, sample_index: usize) {
146        if sample_index <= self.base_sample_index {
147            return;
148        }
149
150        let drop_count = sample_index
151            .saturating_sub(self.base_sample_index)
152            .min(self.samples.len());
153
154        if drop_count == 0 {
155            return;
156        }
157
158        if drop_count >= self.samples.len() {
159            self.samples.clear();
160            self.base_sample_index = sample_index;
161        } else {
162            self.samples.drain(..drop_count);
163            self.base_sample_index += drop_count;
164        }
165    }
166}
167
168impl AudioPipelineCoordinator {
169    /// Create a new coordinator with provided components.
170    ///
171    /// # Arguments
172    ///
173    /// * `vad_detector` - Voice activity detector
174    /// * `chunker` - Audio chunker
175    /// * `dc_filter` - DC offset removal and high-pass filter
176    /// * `noise_reducer` - Spectral noise reduction
177    /// * `broadcaster` - Multi-consumer broadcaster
178    pub fn new(
179        vad_detector: Arc<VadDetector>,
180        chunker: Chunker,
181        dc_filter: DcHighPassFilter,
182        noise_reducer: NoiseReducer,
183    ) -> Self {
184        Self {
185            vad_detector,
186            chunker,
187            dc_filter: Mutex::new(dc_filter),
188            noise_reducer: Mutex::new(noise_reducer),
189            stream_buffer: Mutex::new(StreamBuffer::default()),
190            processed_cursor: AtomicUsize::new(0),
191        }
192    }
193
194    /// Create coordinator with default configuration.
195    ///
196    /// Suitable for testing and standard audio processing scenarios.
197    ///
198    /// # Errors
199    ///
200    /// Returns error if component initialization fails.
201    ///
202    /// # Example
203    ///
204    /// ```rust,no_run
205    /// use speech_prep::pipeline::AudioPipelineCoordinator;
206    ///
207    /// # fn main() -> speech_prep::error::Result<()> {
208    /// let coordinator = AudioPipelineCoordinator::new_with_defaults()?;
209    /// # Ok(())
210    /// # }
211    /// ```
212    pub fn new_with_defaults() -> Result<Self> {
213        use crate::{NoopVadMetricsCollector, VadConfig, VadMetricsCollector};
214
215        let metrics: Arc<dyn VadMetricsCollector> = Arc::new(NoopVadMetricsCollector);
216
217        let vad_config = VadConfig::default();
218        let vad_detector = Arc::new(VadDetector::new(vad_config, metrics)?);
219
220        let chunker = Chunker::default();
221
222        let dc_config = PreprocessingConfig::default();
223        let dc_filter = DcHighPassFilter::new(dc_config)?;
224
225        let noise_config = crate::preprocessing::NoiseReductionConfig::default();
226        let noise_reducer = crate::preprocessing::NoiseReducer::new(noise_config)?;
227
228        Ok(Self::new(vad_detector, chunker, dc_filter, noise_reducer))
229    }
230
231    /// Process raw audio bytes through complete pipeline.
232    ///
233    /// # Performance Contract
234    ///
235    /// Total audio processing latency must be <60ms P95 for standard inputs
236    /// (500ms audio chunks at 16kHz).
237    ///
238    /// # Arguments
239    ///
240    /// * `audio_bytes` - Raw audio data (WAV, PCM, or other supported formats)
241    ///
242    /// # Returns
243    ///
244    /// Processing result with latency metrics and chunk count.
245    ///
246    /// # Errors
247    ///
248    /// Returns error if:
249    /// - Format detection/conversion fails
250    /// - VAD detection fails
251    /// - Chunking fails
252    /// - Preprocessing fails
253    /// - Broadcasting fails (backpressure or buffer full)
254    ///
255    /// # Example
256    ///
257    /// ```rust,no_run
258    /// # use speech_prep::pipeline::AudioPipelineCoordinator;
259    /// # fn main() -> speech_prep::error::Result<()> {
260    /// let coordinator = AudioPipelineCoordinator::new_with_defaults()?;
261    /// let audio = std::fs::read("test.wav")?;
262    ///
263    /// let result = coordinator.process_frame(&audio)?;
264    /// assert!(result.total_latency < std::time::Duration::from_millis(60));
265    /// # Ok(())
266    /// # }
267    /// ```
268    pub fn process_frame(&self, audio_bytes: &[u8]) -> Result<ProcessingResult> {
269        let pipeline_start = AudioInstant::now();
270        let mut latencies = StageLatencies::default();
271
272        let format_start = AudioInstant::now();
273        let standard_audio = AudioFormatConverter::convert_to_standard(audio_bytes)?;
274        latencies.format_conversion = format_start.elapsed();
275
276        self.process_standard_audio(&standard_audio, pipeline_start, latencies)
277    }
278
279    /// Flush pending audio by injecting trailing silence to finalize speech
280    /// segments.
281    ///
282    /// This should be invoked when a streaming session ends to ensure any
283    /// active speech region is emitted before scoring.
284    pub fn flush(&self) -> Result<ProcessingResult> {
285        let pipeline_start = AudioInstant::now();
286        let latencies = StageLatencies::default();
287
288        let config = *self.vad_detector.config();
289        let frame_len = config.frame_length_samples()?;
290        let frames_to_flush = (config.hangover_frames.max(1)) + 1;
291        let silence_samples = vec![0.0f32; frame_len * frames_to_flush];
292
293        let metadata = ConversionMetadata {
294            original_format: AudioFormat::WavPcm,
295            original_sample_rate: config.sample_rate,
296            original_channels: 1,
297            original_bit_depth: Some(16),
298            peak_before: 0.0,
299            peak_after: 0.0,
300            conversion_time_ms: 0.0,
301            detection_time_ms: 0.0,
302            decode_time_ms: 0.0,
303            resample_time_ms: 0.0,
304            mix_time_ms: 0.0,
305        };
306
307        let standard_audio = StandardAudio {
308            samples: silence_samples,
309            metadata,
310        };
311        self.process_standard_audio(&standard_audio, pipeline_start, latencies)
312    }
313
314    fn process_standard_audio(
315        &self,
316        standard_audio: &StandardAudio,
317        pipeline_start: AudioInstant,
318        mut latencies: StageLatencies,
319    ) -> Result<ProcessingResult> {
320        let vad_start = AudioInstant::now();
321        let vad_segments = self.vad_detector.detect(&standard_audio.samples)?;
322        latencies.vad_detection = vad_start.elapsed();
323
324        let sample_rate = self.vad_detector.config().sample_rate;
325        let stream_start_time = self.vad_detector.config().stream_start_time;
326
327        let (chunks, chunk_duration) = {
328            let mut buffer = self.stream_buffer.lock();
329            buffer.append(&standard_audio.samples);
330
331            if buffer.as_slice().is_empty() {
332                Ok::<(Vec<ProcessedChunk>, Duration), Error>((Vec::new(), Duration::default()))
333            } else {
334                let buffer_base = buffer.base_sample_index();
335                let buffer_len = buffer.len();
336                let buffer_end_abs = buffer_base + buffer_len;
337                let processed_before = self.processed_cursor.load(Ordering::Acquire);
338                let slice_start_abs = processed_before.max(buffer_base);
339
340                if slice_start_abs >= buffer_base + buffer_len {
341                    let lookback_samples = (sample_rate as usize) / 5;
342                    let drop_target = slice_start_abs.saturating_sub(lookback_samples);
343                    buffer.drop_through(drop_target);
344                    drop(buffer);
345                    return Ok(ProcessingResult {
346                        chunks_processed: 0,
347                        total_latency: pipeline_start.elapsed(),
348                        stage_latencies: latencies,
349                        backpressure_active: false,
350                    });
351                }
352
353                let base_time = buffer.start_time(stream_start_time, sample_rate)?;
354                let offset_samples = slice_start_abs.saturating_sub(buffer_base);
355                let offset_duration = samples_to_duration(offset_samples, sample_rate)?;
356                let audio_start = base_time.add_duration(offset_duration);
357
358                let audio_slice = buffer
359                    .as_slice()
360                    .get(offset_samples..)
361                    .ok_or_else(|| Error::InvalidInput("invalid buffer window".into()))?;
362
363                let normalized_segments = normalize_vad_segments(
364                    &vad_segments,
365                    stream_start_time,
366                    audio_start,
367                    slice_start_abs,
368                    buffer_end_abs,
369                    sample_rate,
370                )?;
371
372                let chunk_start = AudioInstant::now();
373                let chunks = self.chunker.chunk_with_stream_start(
374                    audio_slice,
375                    sample_rate,
376                    &normalized_segments,
377                    audio_start,
378                )?;
379                let elapsed = chunk_start.elapsed();
380
381                let mut max_processed_sample = processed_before;
382                for chunk in &chunks {
383                    let end_sample =
384                        time_to_sample_index(chunk.end_time, stream_start_time, sample_rate)?;
385                    if end_sample > max_processed_sample {
386                        max_processed_sample = end_sample;
387                    }
388                }
389                self.processed_cursor
390                    .store(max_processed_sample, Ordering::Release);
391
392                let lookback_samples = (sample_rate as usize) / 5; // Retain ~200ms history
393                let drop_target = max_processed_sample.saturating_sub(lookback_samples);
394                buffer.drop_through(drop_target);
395                drop(buffer);
396
397                Ok::<(Vec<ProcessedChunk>, Duration), Error>((chunks, elapsed))
398            }
399        }?;
400        latencies.chunking = chunk_duration;
401
402        let mut total_preprocess = Duration::default();
403        let mut total_broadcast = Duration::default();
404
405        let mut prev_overlap_next: Option<Vec<f32>> = None;
406
407        for chunk in &chunks {
408            let preprocess_start = AudioInstant::now();
409            let mut preprocessed = self.preprocess_chunk(chunk)?;
410
411            if let Some(prev_overlap) = prev_overlap_next.take() {
412                preprocessed.overlap_prev = Some(prev_overlap);
413            } else {
414                preprocessed.overlap_prev = None;
415            }
416
417            prev_overlap_next.clone_from(&preprocessed.overlap_next);
418
419            total_preprocess += preprocess_start.elapsed();
420
421            let broadcast_start = AudioInstant::now();
422            total_broadcast += broadcast_start.elapsed();
423        }
424
425        let chunk_count = chunks.len().max(1);
426        latencies.preprocessing_avg = total_preprocess / chunk_count as u32;
427        latencies.broadcasting_avg = total_broadcast / chunk_count as u32;
428
429        let total_latency = pipeline_start.elapsed();
430
431        if total_latency > Duration::from_millis(60) {
432            tracing::warn!(
433                latency_ms = total_latency.as_millis(),
434                "Audio processing exceeded 60ms target"
435            );
436        }
437
438        let backpressure_active = false;
439
440        Ok(ProcessingResult {
441            chunks_processed: chunks.len(),
442            total_latency,
443            stage_latencies: latencies,
444            backpressure_active,
445        })
446    }
447
448    fn preprocess_chunk(&self, chunk: &ProcessedChunk) -> Result<ProcessedChunk> {
449        let vad_ctx = VadContext {
450            is_silence: chunk.is_silence(),
451        };
452        let dc_clean = {
453            let mut filter = self.dc_filter.lock();
454            filter.process(&chunk.samples, Some(&vad_ctx))?
455        };
456
457        let denoised = {
458            let mut reducer = self.noise_reducer.lock();
459            reducer.reduce(&dc_clean, Some(vad_ctx))?
460        };
461
462        let (energy, has_clipping) = Self::compute_energy_and_clipping(&denoised);
463        let snr_db = Self::recalculate_snr(chunk.snr_db, chunk.energy, energy);
464
465        let overlap_next = chunk.overlap_next.as_ref().and_then(|existing| {
466            let retain = existing.len().min(denoised.len());
467            denoised.get(denoised.len() - retain..).map(<[f32]>::to_vec)
468        });
469
470        let mut processed = chunk.clone();
471        processed.samples = denoised;
472        processed.energy = energy;
473        processed.snr_db = snr_db;
474        processed.has_clipping = has_clipping;
475        processed.overlap_next = overlap_next;
476        processed.overlap_prev = None;
477
478        Ok(processed)
479    }
480
481    fn compute_energy_and_clipping(samples: &[f32]) -> (f32, bool) {
482        const CLIPPING_THRESHOLD: f32 = 0.999;
483
484        if samples.is_empty() {
485            return (0.0, false);
486        }
487
488        let mut sum_squares = 0.0f32;
489        let mut has_clipping = false;
490        for &sample in samples {
491            let abs = sample.abs();
492            if abs >= CLIPPING_THRESHOLD {
493                has_clipping = true;
494            }
495            sum_squares = sample.mul_add(sample, sum_squares);
496        }
497        let mean_square = sum_squares / samples.len() as f32;
498        (mean_square.sqrt(), has_clipping)
499    }
500
501    fn recalculate_snr(
502        previous_snr: Option<f32>,
503        previous_energy: f32,
504        new_energy: f32,
505    ) -> Option<f32> {
506        const EPSILON: f32 = 1e-10;
507        let snr_db = previous_snr?;
508
509        if previous_energy <= EPSILON {
510            return Some(snr_db);
511        }
512
513        let noise_rms = previous_energy / 10_f32.powf(snr_db / 20.0);
514        if noise_rms <= EPSILON || new_energy <= EPSILON {
515            return Some(snr_db);
516        }
517
518        let ratio = new_energy / noise_rms;
519        if ratio <= EPSILON {
520            return Some(snr_db);
521        }
522
523        Some(20.0 * ratio.log10())
524    }
525}
526
527fn samples_to_duration(samples: usize, sample_rate: u32) -> Result<AudioDuration> {
528    validate_in_range(sample_rate, 1_u32, u32::MAX, "sample_rate")?;
529
530    let sample_rate_u128 = u128::from(sample_rate);
531    let sample_count = samples as u128;
532    let nanos = (sample_count * 1_000_000_000u128 + (sample_rate_u128 / 2)) / sample_rate_u128;
533    Ok(AudioDuration::from_nanos(nanos as u64))
534}
535
536fn time_to_sample_index(
537    time: AudioTimestamp,
538    stream_start: AudioTimestamp,
539    sample_rate: u32,
540) -> Result<usize> {
541    validate_in_range(sample_rate, 1_u32, u32::MAX, "sample_rate")?;
542
543    let duration = time
544        .duration_since(stream_start)
545        .ok_or_else(|| Error::TemporalOperation("time precedes stream start".into()))?;
546    let samples = (duration.as_secs_f64() * f64::from(sample_rate)).round() as usize;
547    Ok(samples)
548}
549
550fn normalize_vad_segments(
551    segments: &[SpeechChunk],
552    stream_start: AudioTimestamp,
553    slice_start_time: AudioTimestamp,
554    slice_start_sample: usize,
555    buffer_end_sample: usize,
556    sample_rate: u32,
557) -> Result<Vec<SpeechChunk>> {
558    validate_in_range(sample_rate, 1_u32, u32::MAX, "sample_rate")?;
559
560    let mut normalized = Vec::with_capacity(segments.len());
561
562    for segment in segments {
563        let start_sample_abs = time_to_sample_index(segment.start_time, stream_start, sample_rate)?;
564        let end_sample_abs = time_to_sample_index(segment.end_time, stream_start, sample_rate)?;
565
566        if end_sample_abs <= slice_start_sample {
567            // Entire segment already processed; skip.
568            continue;
569        }
570
571        let clamped_start_abs = start_sample_abs.max(slice_start_sample);
572        let clamped_end_abs = end_sample_abs.min(buffer_end_sample);
573
574        if clamped_end_abs <= clamped_start_abs {
575            continue;
576        }
577
578        let rel_start_samples = clamped_start_abs - slice_start_sample;
579        let rel_end_samples = clamped_end_abs - slice_start_sample;
580
581        let start_time =
582            slice_start_time.add_duration(samples_to_duration(rel_start_samples, sample_rate)?);
583        let end_time =
584            slice_start_time.add_duration(samples_to_duration(rel_end_samples, sample_rate)?);
585
586        let mut adjusted = *segment;
587        adjusted.start_time = start_time;
588        adjusted.end_time = end_time;
589        normalized.push(adjusted);
590    }
591
592    Ok(normalized)
593}
594
595#[cfg(test)]
596mod tests {
597    use super::*;
598    use crate::fixtures::AudioFixtures;
599
600    /// Helper to convert f32 samples to WAV bytes for testing
601    /// Creates a minimal 16-bit PCM WAV file
602    fn samples_to_wav_bytes(samples: &[f32], sample_rate: u32) -> Vec<u8> {
603        let mut wav_data = Vec::new();
604
605        // WAV header
606        let num_samples = samples.len() as u32;
607        let num_channels = 1u16;
608        let bits_per_sample = 16u16;
609        let byte_rate = sample_rate * u32::from(num_channels) * u32::from(bits_per_sample) / 8;
610        let block_align = num_channels * bits_per_sample / 8;
611        let data_size = num_samples * u32::from(block_align);
612
613        // RIFF header
614        wav_data.extend_from_slice(b"RIFF");
615        wav_data.extend_from_slice(&(36 + data_size).to_le_bytes());
616        wav_data.extend_from_slice(b"WAVE");
617
618        // fmt chunk
619        wav_data.extend_from_slice(b"fmt ");
620        wav_data.extend_from_slice(&16u32.to_le_bytes()); // fmt chunk size
621        wav_data.extend_from_slice(&1u16.to_le_bytes()); // PCM format
622        wav_data.extend_from_slice(&num_channels.to_le_bytes());
623        wav_data.extend_from_slice(&sample_rate.to_le_bytes());
624        wav_data.extend_from_slice(&byte_rate.to_le_bytes());
625        wav_data.extend_from_slice(&block_align.to_le_bytes());
626        wav_data.extend_from_slice(&bits_per_sample.to_le_bytes());
627
628        // data chunk
629        wav_data.extend_from_slice(b"data");
630        wav_data.extend_from_slice(&data_size.to_le_bytes());
631
632        // Convert f32 samples to i16 PCM
633        for &sample in samples {
634            let i16_sample = (sample.clamp(-1.0, 1.0) * 32767.0) as i16;
635            wav_data.extend_from_slice(&i16_sample.to_le_bytes());
636        }
637
638        wav_data
639    }
640
641    /// Test coordinator creation with default configuration.
642    #[test]
643    fn test_coordinator_creation_with_defaults() {
644        let coordinator = AudioPipelineCoordinator::new_with_defaults();
645        assert!(
646            coordinator.is_ok(),
647            "Failed to create coordinator with defaults"
648        );
649    }
650
651    /// Test basic frame processing with real audio data.
652    #[test]
653    fn test_process_frame_with_real_audio() {
654        let coordinator =
655            AudioPipelineCoordinator::new_with_defaults().expect("Failed to create coordinator");
656
657        // Load real test audio and convert to WAV bytes
658        let fixtures = AudioFixtures::new();
659        let audio_sample = fixtures
660            .load_sample("french_short")
661            .expect("Failed to load test audio");
662        let test_audio = samples_to_wav_bytes(&audio_sample.audio_data, audio_sample.sample_rate);
663
664        // Process audio through complete pipeline
665        let result = coordinator.process_frame(&test_audio);
666        assert!(
667            result.is_ok(),
668            "Failed to process audio frame: {:?}",
669            result.err()
670        );
671
672        let processing_result = result.unwrap();
673
674        // Verify results
675        assert!(
676            processing_result.chunks_processed > 0,
677            "No chunks were generated from audio"
678        );
679
680        // Verify latency tracking
681        assert!(
682            processing_result.total_latency < Duration::from_millis(100),
683            "Processing took too long: {:?}",
684            processing_result.total_latency
685        );
686    }
687
688    /// Test that all stage latencies are tracked.
689    #[test]
690    fn test_stage_latencies_tracked() {
691        let coordinator =
692            AudioPipelineCoordinator::new_with_defaults().expect("Failed to create coordinator");
693
694        let fixtures = AudioFixtures::new();
695        let audio_sample = fixtures
696            .load_sample("french_short")
697            .expect("Failed to load test audio");
698        let test_audio = samples_to_wav_bytes(&audio_sample.audio_data, audio_sample.sample_rate);
699
700        let result = coordinator
701            .process_frame(&test_audio)
702            .expect("Failed to process audio");
703
704        let latencies = result.stage_latencies;
705
706        // Verify all stages have latency measurements
707        assert!(
708            latencies.format_conversion > Duration::ZERO,
709            "Format conversion latency not tracked"
710        );
711        assert!(
712            latencies.vad_detection > Duration::ZERO,
713            "VAD detection latency not tracked"
714        );
715        assert!(
716            latencies.chunking > Duration::ZERO,
717            "Chunking latency not tracked"
718        );
719
720        // Preprocessing and broadcasting might be zero if no chunks generated
721        let _ = latencies.preprocessing_avg;
722        let _ = latencies.broadcasting_avg;
723    }
724
725    /// Test <60ms latency performance contract for audio processing.
726    #[test]
727    fn test_latency_performance_contract() {
728        let coordinator =
729            AudioPipelineCoordinator::new_with_defaults().expect("Failed to create coordinator");
730
731        let fixtures = AudioFixtures::new();
732        let audio_sample = fixtures
733            .load_sample("french_short")
734            .expect("Failed to load test audio");
735        let test_audio = samples_to_wav_bytes(&audio_sample.audio_data, audio_sample.sample_rate);
736
737        coordinator
738            .process_frame(&test_audio)
739            .expect("Failed to process warm-up audio");
740
741        let mut latencies = Vec::new();
742        for _ in 0..5 {
743            let result = coordinator
744                .process_frame(&test_audio)
745                .expect("Failed to process audio");
746            latencies.push(result.total_latency);
747        }
748
749        latencies.sort();
750        let p95_index = (latencies.len() as f64 * 0.95).ceil() as usize - 1;
751        let p95_latency = latencies[p95_index];
752
753        assert!(
754            p95_latency < Duration::from_millis(150),
755            "P95 latency exceeds 150ms (CI-tolerant): {:?}",
756            p95_latency
757        );
758    }
759
760    /// Test backpressure detection returns correct status.
761    #[test]
762    fn test_backpressure_detection() {
763        let coordinator =
764            AudioPipelineCoordinator::new_with_defaults().expect("Failed to create coordinator");
765
766        assert!(true, "Coordinator should not throttle initially");
767
768        let fixtures = AudioFixtures::new();
769        let audio_sample = fixtures
770            .load_sample("french_short")
771            .expect("Failed to load test audio");
772        let test_audio = samples_to_wav_bytes(&audio_sample.audio_data, audio_sample.sample_rate);
773
774        let result = coordinator
775            .process_frame(&test_audio)
776            .expect("Failed to process audio");
777
778        let _ = result.backpressure_active;
779
780        for _ in 0..3 {
781            coordinator
782                .process_frame(&test_audio)
783                .expect("Failed to process audio");
784        }
785        assert!(
786            true,
787            "Coordinator should not throttle without registered consumers"
788        );
789    }
790
791    /// Test processing empty audio handles gracefully.
792    #[test]
793    fn test_process_empty_audio() {
794        let coordinator =
795            AudioPipelineCoordinator::new_with_defaults().expect("Failed to create coordinator");
796
797        let empty_audio = &[];
798        let result = coordinator.process_frame(empty_audio);
799
800        assert!(result.is_err(), "Empty audio should return error");
801    }
802}