// speech_prep/pipeline.rs

//! End-to-end audio processing coordinator.
//!
//! Integrates format conversion, VAD, chunking, and preprocessing into a
//! synchronous pipeline for one input stream at a time.
//!
//! # Architecture
//!
//! The coordinator follows a simple synchronous design optimized for CPU-bound
//! audio processing:
//!
//! ```text
//! Raw Audio Bytes
//!     ↓
//! Format Conversion → StandardAudio (16kHz mono PCM)
//!     ↓
//! VAD Detection → Speech Segments
//!     ↓
//! Chunking → ProcessedChunks (500ms aligned)
//!     ↓
//! Preprocessing → Clean Audio
//!     ↓
//! Processed Output
//! ```
//!
//! # Performance Contract
//!
//! - **Audio processing latency**: <60ms P95 (all 4 stages)
//! - **Per-stage tracking**: Individual latency metrics exported
//! - **Stage reporting**: Per-stage latency metrics are returned
//!
//! # Example
//!
//! ```rust,no_run
//! use speech_prep::pipeline::AudioPipelineCoordinator;
//!
//! # fn main() -> speech_prep::error::Result<()> {
//! let coordinator = AudioPipelineCoordinator::new_with_defaults()?;
//!
//! let audio_bytes = std::fs::read("sample.wav")?;
//! let result = coordinator.process_frame(&audio_bytes)?;
//!
//! assert!(result.total_latency < std::time::Duration::from_secs(1));
//! assert!(result.chunks_processed <= 16);
//! # Ok(())
//! # }
//! ```

48use std::sync::atomic::{AtomicUsize, Ordering};
49use std::sync::Arc;
50use std::time::Duration;
51
52use parking_lot::Mutex;
53
54use crate::chunker::{Chunker, ProcessedChunk};
55use crate::converter::{AudioFormatConverter, ConversionMetadata, StandardAudio};
56use crate::error::{Error, Result};
57use crate::format::AudioFormat;
58use crate::preprocessing::{DcHighPassFilter, NoiseReducer, PreprocessingConfig, VadContext};
59use crate::time::{validate_in_range, AudioDuration, AudioInstant, AudioTimestamp};
60use crate::vad::{SpeechChunk, VadDetector};
61
62/// Result of processing audio through the complete pipeline.
63#[derive(Debug, Clone, Copy)]
64pub struct ProcessingResult {
65    /// Total number of chunks generated.
66    pub chunks_processed: usize,
67
68    /// Total latency for complete pipeline execution.
69    pub total_latency: Duration,
70
71    /// Per-stage latency breakdown.
72    pub stage_latencies: StageLatencies,
73
74    /// Reserved compatibility flag from the earlier fan-out pipeline surface.
75    ///
76    /// The standalone coordinator never applies backpressure, so this is
77    /// always `false`.
78    pub backpressure_active: bool,
79}
80
/// Latency measurements for each pipeline stage.
#[derive(Debug, Clone, Copy, Default)]
pub struct StageLatencies {
    /// Time spent decoding/converting the input bytes.
    pub format_conversion: Duration,

    /// Time spent running voice-activity detection.
    pub vad_detection: Duration,

    /// Time spent aligning audio into chunks.
    pub chunking: Duration,

    /// Average preprocessing time per chunk.
    pub preprocessing_avg: Duration,

    /// Reserved compatibility field from the earlier fan-out pipeline surface.
    ///
    /// The standalone coordinator has no broadcast stage, so this is always
    /// `Duration::ZERO`.
    pub broadcasting_avg: Duration,
}
102
103/// Audio pipeline coordinator.
104///
105/// Orchestrates format conversion, VAD detection, chunking, and preprocessing
106/// in a synchronous pipeline optimized for CPU-bound audio processing.
107///
108/// **Thread Safety**: Preprocessing filters wrapped in `Mutex` for interior
109/// mutability, allowing concurrent frame processing from multiple callers.
110#[derive(Debug)]
111pub struct AudioPipelineCoordinator {
112    vad_detector: Arc<VadDetector>,
113    chunker: Chunker,
114    dc_filter: Mutex<DcHighPassFilter>,
115    noise_reducer: Mutex<NoiseReducer>,
116    stream_buffer: Mutex<StreamBuffer>,
117    processed_cursor: AtomicUsize,
118}
119
/// Rolling buffer of converted samples awaiting chunking.
///
/// `base_sample_index` is the absolute stream index of `samples[0]`.
#[derive(Debug, Default)]
struct StreamBuffer {
    base_sample_index: usize,
    samples: Vec<f32>,
}
125
126impl StreamBuffer {
127    fn append(&mut self, new_samples: &[f32]) {
128        self.samples.extend_from_slice(new_samples);
129    }
130
131    fn as_slice(&self) -> &[f32] {
132        &self.samples
133    }
134
135    fn base_sample_index(&self) -> usize {
136        self.base_sample_index
137    }
138
139    fn len(&self) -> usize {
140        self.samples.len()
141    }
142
143    fn start_time(&self, stream_start: AudioTimestamp, sample_rate: u32) -> Result<AudioTimestamp> {
144        let offset = samples_to_duration(self.base_sample_index, sample_rate)?;
145        Ok(stream_start.add_duration(offset))
146    }
147
148    fn drop_through(&mut self, sample_index: usize) {
149        if sample_index <= self.base_sample_index {
150            return;
151        }
152
153        let drop_count = sample_index
154            .saturating_sub(self.base_sample_index)
155            .min(self.samples.len());
156
157        if drop_count == 0 {
158            return;
159        }
160
161        if drop_count >= self.samples.len() {
162            self.samples.clear();
163            self.base_sample_index = sample_index;
164        } else {
165            self.samples.drain(..drop_count);
166            self.base_sample_index += drop_count;
167        }
168    }
169}
170
171impl AudioPipelineCoordinator {
172    /// Create a new coordinator with provided components.
173    ///
174    /// # Arguments
175    ///
176    /// * `vad_detector` - Voice activity detector
177    /// * `chunker` - Audio chunker
178    /// * `dc_filter` - DC offset removal and high-pass filter
179    /// * `noise_reducer` - Spectral noise reduction
180    pub fn new(
181        vad_detector: Arc<VadDetector>,
182        chunker: Chunker,
183        dc_filter: DcHighPassFilter,
184        noise_reducer: NoiseReducer,
185    ) -> Self {
186        Self {
187            vad_detector,
188            chunker,
189            dc_filter: Mutex::new(dc_filter),
190            noise_reducer: Mutex::new(noise_reducer),
191            stream_buffer: Mutex::new(StreamBuffer::default()),
192            processed_cursor: AtomicUsize::new(0),
193        }
194    }
195
196    /// Create coordinator with default configuration.
197    ///
198    /// Suitable for testing and standard audio processing scenarios.
199    ///
200    /// # Errors
201    ///
202    /// Returns error if component initialization fails.
203    ///
204    /// # Example
205    ///
206    /// ```rust,no_run
207    /// use speech_prep::pipeline::AudioPipelineCoordinator;
208    ///
209    /// # fn main() -> speech_prep::error::Result<()> {
210    /// let coordinator = AudioPipelineCoordinator::new_with_defaults()?;
211    /// # Ok(())
212    /// # }
213    /// ```
214    pub fn new_with_defaults() -> Result<Self> {
215        use crate::{NoopVadMetricsCollector, VadConfig, VadMetricsCollector};
216
217        let metrics: Arc<dyn VadMetricsCollector> = Arc::new(NoopVadMetricsCollector);
218
219        let vad_config = VadConfig::default();
220        let vad_detector = Arc::new(VadDetector::new(vad_config, metrics)?);
221
222        let chunker = Chunker::default();
223
224        let dc_config = PreprocessingConfig::default();
225        let dc_filter = DcHighPassFilter::new(dc_config)?;
226
227        let noise_config = crate::preprocessing::NoiseReductionConfig::default();
228        let noise_reducer = crate::preprocessing::NoiseReducer::new(noise_config)?;
229
230        Ok(Self::new(vad_detector, chunker, dc_filter, noise_reducer))
231    }
232
233    /// Process raw audio bytes through complete pipeline.
234    ///
235    /// # Performance Contract
236    ///
237    /// Total audio processing latency must be <60ms P95 for standard inputs
238    /// (500ms audio chunks at 16kHz).
239    ///
240    /// # Arguments
241    ///
242    /// * `audio_bytes` - Raw audio data (WAV, PCM, or other supported formats)
243    ///
244    /// # Returns
245    ///
246    /// Processing result with latency metrics and chunk count.
247    ///
248    /// # Errors
249    ///
250    /// Returns error if:
251    /// - Format detection/conversion fails
252    /// - VAD detection fails
253    /// - Chunking fails
254    /// - Preprocessing fails
255    ///
256    /// # Example
257    ///
258    /// ```rust,no_run
259    /// # use speech_prep::pipeline::AudioPipelineCoordinator;
260    /// # fn main() -> speech_prep::error::Result<()> {
261    /// let coordinator = AudioPipelineCoordinator::new_with_defaults()?;
262    /// let audio = std::fs::read("test.wav")?;
263    ///
264    /// let result = coordinator.process_frame(&audio)?;
265    /// assert!(result.total_latency < std::time::Duration::from_millis(60));
266    /// # Ok(())
267    /// # }
268    /// ```
269    pub fn process_frame(&self, audio_bytes: &[u8]) -> Result<ProcessingResult> {
270        let pipeline_start = AudioInstant::now();
271        let mut latencies = StageLatencies::default();
272
273        let format_start = AudioInstant::now();
274        let standard_audio = AudioFormatConverter::convert_to_standard(audio_bytes)?;
275        latencies.format_conversion = format_start.elapsed();
276
277        self.process_standard_audio(&standard_audio, pipeline_start, latencies)
278    }
279
280    /// Flush pending audio by injecting trailing silence to finalize speech
281    /// segments.
282    ///
283    /// This should be invoked when a streaming session ends to ensure any
284    /// active speech region is emitted before scoring.
285    pub fn flush(&self) -> Result<ProcessingResult> {
286        let pipeline_start = AudioInstant::now();
287        let latencies = StageLatencies::default();
288
289        let config = *self.vad_detector.config();
290        let frame_len = config.frame_length_samples()?;
291        let frames_to_flush = (config.hangover_frames.max(1)) + 1;
292        let silence_samples = vec![0.0f32; frame_len * frames_to_flush];
293
294        let metadata = ConversionMetadata {
295            original_format: AudioFormat::WavPcm,
296            original_sample_rate: config.sample_rate,
297            original_channels: 1,
298            original_bit_depth: Some(16),
299            peak_before: 0.0,
300            peak_after: 0.0,
301            conversion_time_ms: 0.0,
302            detection_time_ms: 0.0,
303            decode_time_ms: 0.0,
304            resample_time_ms: 0.0,
305            mix_time_ms: 0.0,
306        };
307
308        let standard_audio = StandardAudio {
309            samples: silence_samples,
310            metadata,
311        };
312        self.process_standard_audio(&standard_audio, pipeline_start, latencies)
313    }
314
    /// Run VAD, chunking, and preprocessing over already-converted audio.
    ///
    /// `latencies` arrives with the format-conversion stage already filled in
    /// (zeroed when called from `flush`); the remaining stage timings are
    /// filled in here.
    ///
    /// NOTE(review): the preprocessed chunks are dropped after their metrics
    /// are folded into the result — `ProcessingResult` carries only counts and
    /// latencies. Confirm downstream consumers obtain the cleaned audio
    /// through another path.
    fn process_standard_audio(
        &self,
        standard_audio: &StandardAudio,
        pipeline_start: AudioInstant,
        mut latencies: StageLatencies,
    ) -> Result<ProcessingResult> {
        // Stage 2: VAD runs on the new frame only, before buffering.
        let vad_start = AudioInstant::now();
        let vad_segments = self.vad_detector.detect(&standard_audio.samples)?;
        latencies.vad_detection = vad_start.elapsed();

        let sample_rate = self.vad_detector.config().sample_rate;
        let stream_start_time = self.vad_detector.config().stream_start_time;

        // Stage 3: chunk the unprocessed window of the stream buffer. The
        // buffer lock is held for the whole chunking pass so the cursor and
        // buffer contents stay consistent.
        let (chunks, chunk_duration) = {
            let mut buffer = self.stream_buffer.lock();
            buffer.append(&standard_audio.samples);

            if buffer.as_slice().is_empty() {
                Ok::<(Vec<ProcessedChunk>, Duration), Error>((Vec::new(), Duration::default()))
            } else {
                let buffer_base = buffer.base_sample_index();
                let buffer_len = buffer.len();
                let buffer_end_abs = buffer_base + buffer_len;
                // Absolute index of the first sample not yet chunked.
                let processed_before = self.processed_cursor.load(Ordering::Acquire);
                let slice_start_abs = processed_before.max(buffer_base);

                // Cursor already at/past the buffered data: nothing new to
                // chunk. Trim history and return an empty result early.
                if slice_start_abs >= buffer_base + buffer_len {
                    let lookback_samples = (sample_rate as usize) / 5;
                    let drop_target = slice_start_abs.saturating_sub(lookback_samples);
                    buffer.drop_through(drop_target);
                    drop(buffer);
                    return Ok(ProcessingResult {
                        chunks_processed: 0,
                        total_latency: pipeline_start.elapsed(),
                        stage_latencies: latencies,
                        backpressure_active: false,
                    });
                }

                // Map the slice start to an absolute stream timestamp.
                let base_time = buffer.start_time(stream_start_time, sample_rate)?;
                let offset_samples = slice_start_abs.saturating_sub(buffer_base);
                let offset_duration = samples_to_duration(offset_samples, sample_rate)?;
                let audio_start = base_time.add_duration(offset_duration);

                let audio_slice = buffer
                    .as_slice()
                    .get(offset_samples..)
                    .ok_or_else(|| Error::InvalidInput("invalid buffer window".into()))?;

                // Clamp and rebase VAD segments onto the unprocessed slice.
                let normalized_segments = normalize_vad_segments(
                    &vad_segments,
                    stream_start_time,
                    audio_start,
                    slice_start_abs,
                    buffer_end_abs,
                    sample_rate,
                )?;

                let chunk_start = AudioInstant::now();
                let chunks = self.chunker.chunk_with_stream_start(
                    audio_slice,
                    sample_rate,
                    &normalized_segments,
                    audio_start,
                )?;
                let elapsed = chunk_start.elapsed();

                // Advance the cursor to the furthest chunk end so the next
                // call skips everything already emitted.
                let mut max_processed_sample = processed_before;
                for chunk in &chunks {
                    let end_sample =
                        time_to_sample_index(chunk.end_time, stream_start_time, sample_rate)?;
                    if end_sample > max_processed_sample {
                        max_processed_sample = end_sample;
                    }
                }
                self.processed_cursor
                    .store(max_processed_sample, Ordering::Release);

                let lookback_samples = (sample_rate as usize) / 5; // Retain ~200ms history
                let drop_target = max_processed_sample.saturating_sub(lookback_samples);
                buffer.drop_through(drop_target);
                drop(buffer);

                Ok::<(Vec<ProcessedChunk>, Duration), Error>((chunks, elapsed))
            }
        }?;
        latencies.chunking = chunk_duration;

        // Stage 4: preprocess each chunk, threading overlap regions between
        // consecutive chunks; only timing metrics survive this loop.
        let mut total_preprocess = Duration::default();
        let mut prev_overlap_next: Option<Vec<f32>> = None;

        for chunk in &chunks {
            let preprocess_start = AudioInstant::now();
            let mut preprocessed = self.preprocess_chunk(chunk)?;

            // Attach the previous chunk's trailing overlap as this chunk's
            // leading overlap (None for the first chunk).
            if let Some(prev_overlap) = prev_overlap_next.take() {
                preprocessed.overlap_prev = Some(prev_overlap);
            } else {
                preprocessed.overlap_prev = None;
            }

            prev_overlap_next.clone_from(&preprocessed.overlap_next);

            total_preprocess += preprocess_start.elapsed();
        }

        // `.max(1)` guards the division when no chunks were produced.
        let chunk_count = chunks.len().max(1);
        latencies.preprocessing_avg = total_preprocess / chunk_count as u32;
        latencies.broadcasting_avg = Duration::ZERO;

        let total_latency = pipeline_start.elapsed();

        // Soft contract check: log, but do not fail, when over budget.
        if total_latency > Duration::from_millis(60) {
            tracing::warn!(
                latency_ms = total_latency.as_millis(),
                "Audio processing exceeded 60ms target"
            );
        }

        let backpressure_active = false;

        Ok(ProcessingResult {
            chunks_processed: chunks.len(),
            total_latency,
            stage_latencies: latencies,
            backpressure_active,
        })
    }
443
444    fn preprocess_chunk(&self, chunk: &ProcessedChunk) -> Result<ProcessedChunk> {
445        let vad_ctx = VadContext {
446            is_silence: chunk.is_silence(),
447        };
448        let dc_clean = {
449            let mut filter = self.dc_filter.lock();
450            filter.process(&chunk.samples, Some(&vad_ctx))?
451        };
452
453        let denoised = {
454            let mut reducer = self.noise_reducer.lock();
455            reducer.reduce(&dc_clean, Some(vad_ctx))?
456        };
457
458        let (energy, has_clipping) = Self::compute_energy_and_clipping(&denoised);
459        let snr_db = Self::recalculate_snr(chunk.snr_db, chunk.energy, energy);
460
461        let overlap_next = chunk.overlap_next.as_ref().and_then(|existing| {
462            let retain = existing.len().min(denoised.len());
463            denoised.get(denoised.len() - retain..).map(<[f32]>::to_vec)
464        });
465
466        let mut processed = chunk.clone();
467        processed.samples = denoised;
468        processed.energy = energy;
469        processed.snr_db = snr_db;
470        processed.has_clipping = has_clipping;
471        processed.overlap_next = overlap_next;
472        processed.overlap_prev = None;
473
474        Ok(processed)
475    }
476
477    fn compute_energy_and_clipping(samples: &[f32]) -> (f32, bool) {
478        const CLIPPING_THRESHOLD: f32 = 0.999;
479
480        if samples.is_empty() {
481            return (0.0, false);
482        }
483
484        let mut sum_squares = 0.0f32;
485        let mut has_clipping = false;
486        for &sample in samples {
487            let abs = sample.abs();
488            if abs >= CLIPPING_THRESHOLD {
489                has_clipping = true;
490            }
491            sum_squares = sample.mul_add(sample, sum_squares);
492        }
493        let mean_square = sum_squares / samples.len() as f32;
494        (mean_square.sqrt(), has_clipping)
495    }
496
497    fn recalculate_snr(
498        previous_snr: Option<f32>,
499        previous_energy: f32,
500        new_energy: f32,
501    ) -> Option<f32> {
502        const EPSILON: f32 = 1e-10;
503        let snr_db = previous_snr?;
504
505        if previous_energy <= EPSILON {
506            return Some(snr_db);
507        }
508
509        let noise_rms = previous_energy / 10_f32.powf(snr_db / 20.0);
510        if noise_rms <= EPSILON || new_energy <= EPSILON {
511            return Some(snr_db);
512        }
513
514        let ratio = new_energy / noise_rms;
515        if ratio <= EPSILON {
516            return Some(snr_db);
517        }
518
519        Some(20.0 * ratio.log10())
520    }
521}
522
523fn samples_to_duration(samples: usize, sample_rate: u32) -> Result<AudioDuration> {
524    validate_in_range(sample_rate, 1_u32, u32::MAX, "sample_rate")?;
525
526    let sample_rate_u128 = u128::from(sample_rate);
527    let sample_count = samples as u128;
528    let nanos = (sample_count * 1_000_000_000u128 + (sample_rate_u128 / 2)) / sample_rate_u128;
529    Ok(AudioDuration::from_nanos(nanos as u64))
530}
531
532fn time_to_sample_index(
533    time: AudioTimestamp,
534    stream_start: AudioTimestamp,
535    sample_rate: u32,
536) -> Result<usize> {
537    validate_in_range(sample_rate, 1_u32, u32::MAX, "sample_rate")?;
538
539    let duration = time
540        .duration_since(stream_start)
541        .ok_or_else(|| Error::TemporalOperation("time precedes stream start".into()))?;
542    let samples = (duration.as_secs_f64() * f64::from(sample_rate)).round() as usize;
543    Ok(samples)
544}
545
546fn normalize_vad_segments(
547    segments: &[SpeechChunk],
548    stream_start: AudioTimestamp,
549    slice_start_time: AudioTimestamp,
550    slice_start_sample: usize,
551    buffer_end_sample: usize,
552    sample_rate: u32,
553) -> Result<Vec<SpeechChunk>> {
554    validate_in_range(sample_rate, 1_u32, u32::MAX, "sample_rate")?;
555
556    let mut normalized = Vec::with_capacity(segments.len());
557
558    for segment in segments {
559        let start_sample_abs = time_to_sample_index(segment.start_time, stream_start, sample_rate)?;
560        let end_sample_abs = time_to_sample_index(segment.end_time, stream_start, sample_rate)?;
561
562        if end_sample_abs <= slice_start_sample {
563            // Entire segment already processed; skip.
564            continue;
565        }
566
567        let clamped_start_abs = start_sample_abs.max(slice_start_sample);
568        let clamped_end_abs = end_sample_abs.min(buffer_end_sample);
569
570        if clamped_end_abs <= clamped_start_abs {
571            continue;
572        }
573
574        let rel_start_samples = clamped_start_abs - slice_start_sample;
575        let rel_end_samples = clamped_end_abs - slice_start_sample;
576
577        let start_time =
578            slice_start_time.add_duration(samples_to_duration(rel_start_samples, sample_rate)?);
579        let end_time =
580            slice_start_time.add_duration(samples_to_duration(rel_end_samples, sample_rate)?);
581
582        let mut adjusted = *segment;
583        adjusted.start_time = start_time;
584        adjusted.end_time = end_time;
585        normalized.push(adjusted);
586    }
587
588    Ok(normalized)
589}
590
591#[cfg(test)]
592mod tests {
593    use super::*;
594    use crate::fixtures::AudioFixtures;
595
596    /// Helper to convert f32 samples to WAV bytes for testing
597    /// Creates a minimal 16-bit PCM WAV file
598    fn samples_to_wav_bytes(samples: &[f32], sample_rate: u32) -> Vec<u8> {
599        let mut wav_data = Vec::new();
600
601        // WAV header
602        let num_samples = samples.len() as u32;
603        let num_channels = 1u16;
604        let bits_per_sample = 16u16;
605        let byte_rate = sample_rate * u32::from(num_channels) * u32::from(bits_per_sample) / 8;
606        let block_align = num_channels * bits_per_sample / 8;
607        let data_size = num_samples * u32::from(block_align);
608
609        // RIFF header
610        wav_data.extend_from_slice(b"RIFF");
611        wav_data.extend_from_slice(&(36 + data_size).to_le_bytes());
612        wav_data.extend_from_slice(b"WAVE");
613
614        // fmt chunk
615        wav_data.extend_from_slice(b"fmt ");
616        wav_data.extend_from_slice(&16u32.to_le_bytes()); // fmt chunk size
617        wav_data.extend_from_slice(&1u16.to_le_bytes()); // PCM format
618        wav_data.extend_from_slice(&num_channels.to_le_bytes());
619        wav_data.extend_from_slice(&sample_rate.to_le_bytes());
620        wav_data.extend_from_slice(&byte_rate.to_le_bytes());
621        wav_data.extend_from_slice(&block_align.to_le_bytes());
622        wav_data.extend_from_slice(&bits_per_sample.to_le_bytes());
623
624        // data chunk
625        wav_data.extend_from_slice(b"data");
626        wav_data.extend_from_slice(&data_size.to_le_bytes());
627
628        // Convert f32 samples to i16 PCM
629        for &sample in samples {
630            let i16_sample = (sample.clamp(-1.0, 1.0) * 32767.0) as i16;
631            wav_data.extend_from_slice(&i16_sample.to_le_bytes());
632        }
633
634        wav_data
635    }
636
637    /// Test coordinator creation with default configuration.
638    #[test]
639    fn test_coordinator_creation_with_defaults() {
640        let coordinator = AudioPipelineCoordinator::new_with_defaults();
641        assert!(
642            coordinator.is_ok(),
643            "Failed to create coordinator with defaults"
644        );
645    }
646
647    /// Test basic frame processing with real audio data.
648    #[test]
649    fn test_process_frame_with_real_audio() {
650        let coordinator =
651            AudioPipelineCoordinator::new_with_defaults().expect("Failed to create coordinator");
652
653        // Load real test audio and convert to WAV bytes
654        let fixtures = AudioFixtures::new();
655        let audio_sample = fixtures
656            .load_sample("french_short")
657            .expect("Failed to load test audio");
658        let test_audio = samples_to_wav_bytes(&audio_sample.audio_data, audio_sample.sample_rate);
659
660        // Process audio through complete pipeline
661        let result = coordinator.process_frame(&test_audio);
662        assert!(
663            result.is_ok(),
664            "Failed to process audio frame: {:?}",
665            result.err()
666        );
667
668        let processing_result = result.unwrap();
669
670        // Verify results
671        assert!(
672            processing_result.chunks_processed > 0,
673            "No chunks were generated from audio"
674        );
675
676        // Verify latency tracking
677        assert!(
678            processing_result.total_latency < Duration::from_millis(100),
679            "Processing took too long: {:?}",
680            processing_result.total_latency
681        );
682    }
683
684    /// Test that all stage latencies are tracked.
685    #[test]
686    fn test_stage_latencies_tracked() {
687        let coordinator =
688            AudioPipelineCoordinator::new_with_defaults().expect("Failed to create coordinator");
689
690        let fixtures = AudioFixtures::new();
691        let audio_sample = fixtures
692            .load_sample("french_short")
693            .expect("Failed to load test audio");
694        let test_audio = samples_to_wav_bytes(&audio_sample.audio_data, audio_sample.sample_rate);
695
696        let result = coordinator
697            .process_frame(&test_audio)
698            .expect("Failed to process audio");
699
700        let latencies = result.stage_latencies;
701
702        // Verify all stages have latency measurements
703        assert!(
704            latencies.format_conversion > Duration::ZERO,
705            "Format conversion latency not tracked"
706        );
707        assert!(
708            latencies.vad_detection > Duration::ZERO,
709            "VAD detection latency not tracked"
710        );
711        assert!(
712            latencies.chunking > Duration::ZERO,
713            "Chunking latency not tracked"
714        );
715
716        let _ = latencies.preprocessing_avg;
717        assert_eq!(
718            latencies.broadcasting_avg,
719            Duration::ZERO,
720            "Standalone pipeline should not report broadcast latency"
721        );
722    }
723
724    /// Test <60ms latency performance contract for audio processing.
725    #[test]
726    fn test_latency_performance_contract() {
727        let coordinator =
728            AudioPipelineCoordinator::new_with_defaults().expect("Failed to create coordinator");
729
730        let fixtures = AudioFixtures::new();
731        let audio_sample = fixtures
732            .load_sample("french_short")
733            .expect("Failed to load test audio");
734        let test_audio = samples_to_wav_bytes(&audio_sample.audio_data, audio_sample.sample_rate);
735
736        coordinator
737            .process_frame(&test_audio)
738            .expect("Failed to process warm-up audio");
739
740        let mut latencies = Vec::new();
741        for _ in 0..5 {
742            let result = coordinator
743                .process_frame(&test_audio)
744                .expect("Failed to process audio");
745            latencies.push(result.total_latency);
746        }
747
748        latencies.sort();
749        let p95_index = (latencies.len() as f64 * 0.95).ceil() as usize - 1;
750        let p95_latency = latencies[p95_index];
751
752        assert!(
753            p95_latency < Duration::from_millis(150),
754            "P95 latency exceeds 150ms (CI-tolerant): {:?}",
755            p95_latency
756        );
757    }
758
759    /// Test the compatibility flag remains disabled.
760    #[test]
761    fn test_backpressure_detection() {
762        let coordinator =
763            AudioPipelineCoordinator::new_with_defaults().expect("Failed to create coordinator");
764
765        let fixtures = AudioFixtures::new();
766        let audio_sample = fixtures
767            .load_sample("french_short")
768            .expect("Failed to load test audio");
769        let test_audio = samples_to_wav_bytes(&audio_sample.audio_data, audio_sample.sample_rate);
770
771        let result = coordinator
772            .process_frame(&test_audio)
773            .expect("Failed to process audio");
774
775        assert!(
776            !result.backpressure_active,
777            "Standalone coordinator should not report backpressure"
778        );
779
780        for _ in 0..3 {
781            let result = coordinator
782                .process_frame(&test_audio)
783                .expect("Failed to process audio");
784            assert!(
785                !result.backpressure_active,
786                "Standalone coordinator should not report backpressure"
787            );
788        }
789    }
790
791    /// Test processing empty audio handles gracefully.
792    #[test]
793    fn test_process_empty_audio() {
794        let coordinator =
795            AudioPipelineCoordinator::new_with_defaults().expect("Failed to create coordinator");
796
797        let empty_audio = &[];
798        let result = coordinator.process_frame(empty_audio);
799
800        assert!(result.is_err(), "Empty audio should return error");
801    }
802}