use crate::{audio::AudioBuffer, error::Result, types::AudioFormat, VoirsError};
use futures::{stream::BoxStream, Stream, StreamExt};
use serde::{Deserialize, Serialize};
use std::{
collections::{BTreeMap, VecDeque},
pin::Pin,
task::{Context, Poll},
time::{Duration, Instant},
};
/// Serde adapter that (de)serializes a `Duration` as fractional seconds (`f64`).
///
/// Used via `#[serde(with = "duration_secs")]` on `StreamingConfig` fields.
mod duration_secs {
    use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer};
    use std::time::Duration;

    /// Serialize a `Duration` as `f64` seconds.
    pub fn serialize<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        duration.as_secs_f64().serialize(serializer)
    }

    /// Deserialize `f64` seconds into a `Duration`.
    ///
    /// Rejects negative values with the original message, and uses the
    /// fallible `Duration::try_from_secs_f64` for the conversion: the
    /// previous `Duration::from_secs_f64(secs)` call panics on NaN,
    /// infinity, or values that overflow `Duration`, and NaN slipped past
    /// the `secs < 0.0` check (NaN comparisons are always false).
    pub fn deserialize<'de, D>(deserializer: D) -> Result<Duration, D::Error>
    where
        D: Deserializer<'de>,
    {
        let secs = f64::deserialize(deserializer)?;
        if secs < 0.0 {
            return Err(D::Error::custom("Duration cannot be negative"));
        }
        Duration::try_from_secs_f64(secs)
            .map_err(|e| D::Error::custom(format!("invalid duration: {e}")))
    }
}
/// Tuning parameters for streaming synthesis: chunk sizing, concurrency,
/// latency budgets, and adaptive behavior.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct StreamingConfig {
    /// Maximum characters of text per synthesis chunk.
    pub max_chunk_chars: usize,
    /// Minimum characters per chunk; must stay strictly below `max_chunk_chars`.
    pub min_chunk_chars: usize,
    /// Number of chunks that may be synthesized concurrently (must be > 0).
    pub max_concurrent_chunks: usize,
    /// Frames of overlap kept between adjacent chunks to smooth joins.
    pub overlap_frames: usize,
    /// Target upper bound on per-chunk latency (serialized as f64 seconds).
    #[serde(with = "duration_secs")]
    pub max_latency: Duration,
    /// Capacity, in chunks, of the real-time output buffer.
    pub realtime_buffer_size: usize,
    /// Whether chunks must be delivered in `chunk_id` order.
    pub maintain_order: bool,
    /// Maximum number of chunks buffered while restoring order.
    pub max_buffer_size: usize,
    /// Deadline for urgent requests (serialized as f64 seconds).
    #[serde(with = "duration_secs")]
    pub urgent_timeout: Duration,
    /// Timeout applied to individual synthesis tasks (serialized as f64 seconds).
    #[serde(with = "duration_secs")]
    pub task_timeout: Duration,
    /// Trade-off dial in [0.0, 1.0]: 0.0 favors latency, 1.0 favors quality.
    pub quality_vs_latency: f32,
    /// Enables runtime adjustment of chunk sizes based on measured RTF.
    pub adaptive_chunking: bool,
    /// Desired real-time factor (processing time / audio duration).
    pub target_rtf: f32,
}

impl Default for StreamingConfig {
    /// Balanced defaults for interactive streaming synthesis.
    fn default() -> Self {
        Self {
            max_chunk_chars: 200,
            min_chunk_chars: 50,
            max_concurrent_chunks: 4,
            overlap_frames: 8,
            max_latency: Duration::from_millis(500),
            realtime_buffer_size: 32,
            maintain_order: true,
            max_buffer_size: 2000,
            urgent_timeout: Duration::from_millis(100),
            task_timeout: Duration::from_millis(1000),
            quality_vs_latency: 0.7,
            adaptive_chunking: true,
            target_rtf: 0.3,
        }
    }
}

impl StreamingConfig {
    /// Preset favoring responsiveness: smaller chunks and tighter deadlines.
    pub fn low_latency() -> Self {
        Self {
            max_chunk_chars: 100,
            min_chunk_chars: 25,
            max_latency: Duration::from_millis(200),
            urgent_timeout: Duration::from_millis(50),
            quality_vs_latency: 0.3,
            target_rtf: 0.2,
            ..Default::default()
        }
    }

    /// Preset favoring output quality: larger chunks and more overlap.
    pub fn high_quality() -> Self {
        Self {
            max_chunk_chars: 400,
            min_chunk_chars: 100,
            max_latency: Duration::from_millis(1000),
            quality_vs_latency: 1.0,
            target_rtf: 0.8,
            overlap_frames: 16,
            ..Default::default()
        }
    }

    /// Preset for offline batch jobs: large chunks and high parallelism.
    pub fn batch_processing() -> Self {
        Self {
            max_chunk_chars: 500,
            min_chunk_chars: 200,
            max_concurrent_chunks: 8,
            max_latency: Duration::from_millis(5000),
            quality_vs_latency: 0.9,
            target_rtf: 1.0,
            maintain_order: true,
            ..Default::default()
        }
    }

    /// Check configuration invariants.
    ///
    /// # Errors
    /// Returns `VoirsError::invalid_config` when chunk bounds are inverted,
    /// `max_concurrent_chunks` is zero, or `quality_vs_latency` is outside
    /// [0.0, 1.0] (including NaN).
    pub fn validate(&self) -> Result<()> {
        if self.min_chunk_chars >= self.max_chunk_chars {
            return Err(VoirsError::invalid_config(
                "chunk_chars",
                format!("min={}, max={}", self.min_chunk_chars, self.max_chunk_chars),
                "min_chunk_chars must be less than max_chunk_chars",
            ));
        }
        if self.max_concurrent_chunks == 0 {
            return Err(VoirsError::invalid_config(
                "max_concurrent_chunks",
                "0",
                "must be greater than 0",
            ));
        }
        // `contains` also rejects NaN; the previous `< 0.0 || > 1.0`
        // comparison let NaN pass validation (NaN comparisons are false).
        if !(0.0..=1.0).contains(&self.quality_vs_latency) {
            return Err(VoirsError::invalid_config(
                "quality_vs_latency",
                self.quality_vs_latency.to_string(),
                "must be between 0.0 and 1.0",
            ));
        }
        Ok(())
    }

    /// Nudge chunk sizes toward `target_rtf`: shrink chunks when synthesis
    /// is running slow, grow them when there is comfortable headroom in
    /// both RTF and latency. No-op unless `adaptive_chunking` is enabled.
    pub fn adapt_for_performance(&mut self, rtf: f32, latency: Duration) {
        if !self.adaptive_chunking {
            return;
        }
        if rtf > self.target_rtf * 1.2 {
            // More than 20% over target: shrink chunks ~10%, with floors so
            // min/max cannot collapse into each other.
            self.max_chunk_chars = (self.max_chunk_chars * 9 / 10).max(self.min_chunk_chars + 10);
            self.min_chunk_chars = (self.min_chunk_chars * 9 / 10).max(20);
        } else if rtf < self.target_rtf * 0.8 && latency < self.max_latency / 2 {
            // Comfortably under target and latency budget: grow chunks ~10%.
            self.max_chunk_chars = (self.max_chunk_chars * 11 / 10).min(1000);
            self.min_chunk_chars = (self.min_chunk_chars * 11 / 10).min(self.max_chunk_chars / 2);
        }
    }
}
#[derive(Debug, Clone)]
pub struct StreamingState {
pub chunks_processed: usize,
pub total_duration: f32,
pub avg_processing_time: Duration,
pub quality_metrics: QualityMetrics,
pub processing_start: Option<Instant>,
pub total_chars_processed: usize,
pub throughput: ThroughputMetrics,
pub error_count: usize,
pub last_error: Option<String>,
}
impl Default for StreamingState {
fn default() -> Self {
Self {
chunks_processed: 0,
total_duration: 0.0,
avg_processing_time: Duration::ZERO,
quality_metrics: QualityMetrics::default(),
processing_start: None,
total_chars_processed: 0,
throughput: ThroughputMetrics::default(),
error_count: 0,
last_error: None,
}
}
}
impl StreamingState {
pub fn reset_for_new_synthesis(&mut self) {
*self = Self::default();
self.processing_start = Some(Instant::now());
}
pub fn update_with_chunk(&mut self, chunk: &AudioChunk) {
self.chunks_processed += 1;
self.total_duration += chunk.audio.duration();
self.total_chars_processed += chunk.text.len();
let total_time = self.avg_processing_time.as_nanos() as u64
* (self.chunks_processed - 1) as u64
+ chunk.processing_time.as_nanos() as u64;
self.avg_processing_time = Duration::from_nanos(total_time / self.chunks_processed as u64);
self.quality_metrics.update_with_chunk(chunk);
if let Some(start) = self.processing_start {
let elapsed = start.elapsed();
self.throughput.update(self.total_chars_processed, elapsed);
}
}
pub fn record_error(&mut self, error: &str) {
self.error_count += 1;
self.last_error = Some(error.to_string());
}
pub fn overall_rtf(&self) -> f32 {
if self.total_duration <= 0.0 {
return 0.0;
}
if let Some(start) = self.processing_start {
let processing_time = start.elapsed().as_secs_f32();
processing_time / self.total_duration
} else {
self.avg_processing_time.as_secs_f32()
/ (self.total_duration / self.chunks_processed as f32)
}
}
pub fn is_realtime(&self) -> bool {
if self.total_duration <= 0.0 || self.chunks_processed == 0 {
return false;
}
self.overall_rtf() <= 1.0
}
pub fn efficiency(&self) -> f32 {
let rtf = self.overall_rtf();
if rtf <= 1.0 {
1.0 - rtf * 0.5 } else {
1.0 / rtf }
}
}
/// Rolling quality indicators for the active stream.
#[derive(Debug, Clone)]
pub struct QualityMetrics {
    /// Exponential moving average of per-chunk RTF.
    pub real_time_factor: f32,
    /// Average chunk latency.
    pub avg_latency: Duration,
    /// Count of playback underruns.
    pub underruns: usize,
    /// Count of dropped chunks.
    pub dropped_chunks: usize,
    /// Worst (highest) per-chunk RTF observed.
    pub peak_rtf: f32,
    /// Sliding-window latency percentile tracker.
    pub latency_percentiles: LatencyPercentiles,
    /// Stability of RTF over time; 1.0 means perfectly steady.
    pub consistency_score: f32,
}

impl Default for QualityMetrics {
    fn default() -> Self {
        QualityMetrics {
            real_time_factor: 0.0,
            avg_latency: Duration::ZERO,
            underruns: 0,
            dropped_chunks: 0,
            peak_rtf: 0.0,
            latency_percentiles: LatencyPercentiles::default(),
            // Assume perfect consistency until evidence says otherwise.
            consistency_score: 1.0,
        }
    }
}

impl QualityMetrics {
    /// Blend one chunk's metrics into the rolling aggregates.
    pub fn update_with_chunk(&mut self, chunk: &AudioChunk) {
        let chunk_rtf = chunk.real_time_factor();
        // Seed the EMA with the first observation, then blend 90/10.
        if self.real_time_factor == 0.0 {
            self.real_time_factor = chunk_rtf;
        } else {
            self.real_time_factor = self.real_time_factor * 0.9 + chunk_rtf * 0.1;
        }
        self.peak_rtf = self.peak_rtf.max(chunk_rtf);
        self.latency_percentiles.add_sample(chunk.processing_time);
        // Penalize consistency by how far this chunk strays from the EMA.
        let rtf_variance = (chunk_rtf - self.real_time_factor).abs();
        self.consistency_score =
            self.consistency_score * 0.95 + (1.0 - rtf_variance.min(1.0)) * 0.05;
    }

    /// Heuristic flag: synthesis has fallen behind real time, become
    /// unstable, or started dropping chunks.
    pub fn is_quality_degrading(&self) -> bool {
        let behind_realtime = self.real_time_factor > 1.0 || self.peak_rtf > 1.0;
        behind_realtime || self.consistency_score < 0.7 || self.dropped_chunks > 0
    }
}
/// Sliding window of latency samples supporting percentile queries.
#[derive(Debug, Clone)]
pub struct LatencyPercentiles {
    /// Most recent samples, oldest first.
    samples: VecDeque<Duration>,
    /// Window capacity; older samples are evicted beyond this.
    max_samples: usize,
}

impl Default for LatencyPercentiles {
    fn default() -> Self {
        Self {
            samples: VecDeque::new(),
            max_samples: 100,
        }
    }
}

impl LatencyPercentiles {
    /// Append a sample, evicting the oldest once the window is full.
    fn add_sample(&mut self, latency: Duration) {
        self.samples.push_back(latency);
        if self.samples.len() > self.max_samples {
            self.samples.pop_front();
        }
    }

    /// Nearest-rank percentile of the current window.
    ///
    /// `p` is in percent (e.g. 95.0) and is clamped to [0.0, 100.0]:
    /// previously a `p` above 100 computed an index past the end of the
    /// sorted samples and panicked. Returns `Duration::ZERO` when the
    /// window is empty.
    pub fn percentile(&self, p: f32) -> Duration {
        if self.samples.is_empty() {
            return Duration::ZERO;
        }
        let mut sorted: Vec<Duration> = self.samples.iter().copied().collect();
        sorted.sort();
        let fraction = (p / 100.0).clamp(0.0, 1.0);
        let index = ((sorted.len() - 1) as f32 * fraction) as usize;
        sorted[index]
    }

    /// Median latency.
    pub fn p50(&self) -> Duration {
        self.percentile(50.0)
    }

    /// 95th percentile latency.
    pub fn p95(&self) -> Duration {
        self.percentile(95.0)
    }

    /// 99th percentile latency.
    pub fn p99(&self) -> Duration {
        self.percentile(99.0)
    }
}
/// Aggregate throughput counters for a synthesis session.
#[derive(Debug, Clone, Default)]
pub struct ThroughputMetrics {
    /// Input characters processed per wall-clock second.
    pub chars_per_second: f32,
    /// Seconds of audio produced per wall-clock second.
    pub audio_per_second: f32,
    /// Highest `chars_per_second` observed so far.
    pub peak_chars_per_second: f32,
}

impl ThroughputMetrics {
    /// Recompute character throughput from session totals. Calls with a
    /// zero elapsed time are ignored to avoid dividing by zero.
    fn update(&mut self, total_chars: usize, elapsed: Duration) {
        let elapsed_secs = elapsed.as_secs_f32();
        if elapsed_secs <= 0.0 {
            return;
        }
        self.chars_per_second = total_chars as f32 / elapsed_secs;
        self.peak_chars_per_second = self.peak_chars_per_second.max(self.chars_per_second);
    }
}
/// Running latency statistics over the full session, plus a bounded window
/// of recent samples used for percentile estimation.
///
/// `Default` yields an empty tracker (all counters zero, empty window).
#[derive(Debug, Clone, Default)]
pub struct LatencyStats {
    /// Total samples recorded.
    pub sample_count: usize,
    /// Running mean latency over all samples.
    pub average_latency: Duration,
    /// 95th-percentile latency over the recent window.
    pub p95_latency: Duration,
    /// Largest latency ever observed.
    pub max_latency: Duration,
    /// Samples recorded via `update_urgent`.
    pub urgent_count: usize,
    /// Bounded window of the most recent samples.
    pub recent_samples: VecDeque<Duration>,
}

impl LatencyStats {
    /// Capacity of `recent_samples`.
    const MAX_RECENT_SAMPLES: usize = 100;

    /// Fold one latency observation into the statistics.
    pub fn update(&mut self, latency: Duration) {
        self.sample_count += 1;
        // Running mean computed in integer nanoseconds.
        let prior = (self.sample_count - 1) as u64;
        let summed = self.average_latency.as_nanos() as u64 * prior + latency.as_nanos() as u64;
        self.average_latency = Duration::from_nanos(summed / self.sample_count as u64);
        self.max_latency = self.max_latency.max(latency);
        self.recent_samples.push_back(latency);
        while self.recent_samples.len() > Self::MAX_RECENT_SAMPLES {
            self.recent_samples.pop_front();
        }
        self.update_percentiles();
    }

    /// Record a latency sample for an urgent-priority request.
    pub fn update_urgent(&mut self, latency: Duration) {
        self.urgent_count += 1;
        self.update(latency);
    }

    /// Recompute `p95_latency` from the recent window.
    fn update_percentiles(&mut self) {
        if self.recent_samples.is_empty() {
            return;
        }
        let mut sorted: Vec<Duration> = self.recent_samples.iter().copied().collect();
        sorted.sort();
        let p95_index = (sorted.len() as f32 * 0.95) as usize;
        if let Some(&value) = sorted.get(p95_index) {
            self.p95_latency = value;
        }
    }
}
/// A single synthesized audio segment paired with its source text and
/// timing/quality metadata.
#[derive(Debug, Clone)]
pub struct AudioChunk {
    /// Position of this chunk within the overall stream.
    pub chunk_id: usize,
    /// The synthesized audio.
    pub audio: AudioBuffer,
    /// Source text this chunk was synthesized from.
    pub text: String,
    /// Wall-clock time spent producing `audio`.
    pub processing_time: Duration,
    /// Derived diagnostic metadata.
    pub metadata: ChunkMetadata,
}

impl AudioChunk {
    /// Build a chunk and derive its metadata from the synthesis results.
    pub fn new(
        chunk_id: usize,
        audio: AudioBuffer,
        text: String,
        processing_time: Duration,
        phoneme_count: usize,
        mel_frames: usize,
    ) -> Self {
        let audio_secs = audio.duration();
        let metadata = ChunkMetadata {
            phoneme_count,
            mel_frames,
            is_sentence_boundary: text.trim_end().ends_with(['.', '!', '?']),
            is_paragraph_boundary: text.trim_end().ends_with('\n'),
            // RTF is undefined for empty audio. Previously this divided by
            // zero unconditionally and stored +inf, which does not survive
            // a JSON round-trip; store `None` instead.
            real_time_factor: (audio_secs > 0.0)
                .then(|| processing_time.as_secs_f32() / audio_secs),
            confidence_score: Self::calculate_confidence_score(
                processing_time,
                audio_secs,
                phoneme_count,
                mel_frames,
            ),
        };
        Self {
            chunk_id,
            audio,
            text,
            processing_time,
            metadata,
        }
    }

    /// Processing time divided by audio duration; values <= 1.0 mean the
    /// chunk was produced at least as fast as real time.
    ///
    /// Returns `f32::INFINITY` for chunks with no (non-positive) audio.
    pub fn real_time_factor(&self) -> f32 {
        // Compare the f32 duration directly: the previous round-trip
        // through `Duration::from_secs_f32` panicked if the buffer ever
        // reported a negative or NaN duration.
        let audio_secs = self.audio.duration();
        if audio_secs > 0.0 {
            self.processing_time.as_secs_f32() / audio_secs
        } else {
            f32::INFINITY
        }
    }

    /// Whether this chunk was synthesized in real time or faster.
    pub fn is_realtime(&self) -> bool {
        self.real_time_factor() <= 1.0
    }

    /// Heuristic score in (0.0, 1.0]: near 1.0 for fast chunks, decaying
    /// quadratically once processing is slower than real time.
    pub fn efficiency_score(&self) -> f32 {
        let rtf = self.real_time_factor();
        if rtf <= 1.0 {
            (1.0 / (rtf + 0.1)).min(1.0)
        } else {
            1.0 / rtf.powf(2.0)
        }
    }

    /// Serialize `metadata` to pretty-printed JSON.
    ///
    /// # Errors
    /// Returns a serialization error when JSON encoding fails.
    pub fn export_metadata(&self) -> Result<String> {
        serde_json::to_string_pretty(&self.metadata).map_err(|e| {
            VoirsError::serialization("JSON", format!("Failed to serialize chunk metadata: {e}"))
        })
    }

    /// Heuristic confidence in [0.0, 1.0] derived from RTF plausibility,
    /// phoneme rate, and mel-frame count consistency.
    fn calculate_confidence_score(
        processing_time: Duration,
        audio_duration: f32,
        phoneme_count: usize,
        mel_frames: usize,
    ) -> f32 {
        let mut confidence = 1.0f32;
        let rtf = processing_time.as_secs_f32() / audio_duration.max(0.001);
        if rtf > 1.0 {
            // Slower than real time: penalize proportionally.
            confidence *= (1.0 / rtf).powf(0.5);
        } else if rtf < 0.1 {
            // Suspiciously fast synthesis is also penalized.
            confidence *= (rtf / 0.1).powf(0.2);
        }
        // ~10 phonemes/sec assumed typical speech rate — TODO confirm for
        // the languages this crate targets.
        let phonemes_per_second = phoneme_count as f32 / audio_duration.max(0.001);
        let expected_phonemes_per_second = 10.0;
        let phoneme_ratio = (phonemes_per_second / expected_phonemes_per_second).min(2.0);
        if !(0.5..=1.5).contains(&phoneme_ratio) {
            confidence *= 0.9;
        }
        // 86.13 frames/sec — presumably 22050 Hz / 256-sample hop; verify
        // against the vocoder configuration.
        let expected_mel_frames = (audio_duration * 86.13).round() as usize;
        let frame_ratio = mel_frames as f32 / expected_mel_frames.max(1) as f32;
        if !(0.9..=1.1).contains(&frame_ratio) {
            confidence *= 0.95;
        }
        if audio_duration < 0.1 {
            // Very short clips give unreliable statistics.
            confidence *= 0.9;
        }
        confidence.clamp(0.0, 1.0)
    }
}
/// Per-chunk diagnostic metadata; serialized to JSON by
/// `AudioChunk::export_metadata`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkMetadata {
    /// Number of phonemes synthesized in this chunk.
    pub phoneme_count: usize,
    /// Number of mel-spectrogram frames generated.
    pub mel_frames: usize,
    /// True when the chunk text ends with '.', '!' or '?'.
    pub is_sentence_boundary: bool,
    /// True when the chunk text ends with a newline.
    pub is_paragraph_boundary: bool,
    /// Processing-time / audio-duration ratio, when computed.
    pub real_time_factor: Option<f32>,
    /// Heuristic confidence in [0.0, 1.0] (see `calculate_confidence_score`).
    pub confidence_score: f32,
}
/// Adapter that re-orders an out-of-order chunk stream by `chunk_id`,
/// parking early chunks in a bounded buffer until their turn.
pub struct OrderedChunkStream {
    /// Underlying (possibly out-of-order) chunk source.
    chunks: BoxStream<'static, Result<AudioChunk>>,
    /// Next chunk id owed to the consumer.
    next_expected_id: usize,
    /// Early chunks parked until their turn, keyed by id.
    buffer: BTreeMap<usize, AudioChunk>,
    /// Capacity limit for `buffer`.
    max_buffer_size: usize,
    /// Delivery/buffering counters.
    stats: StreamStats,
}

impl OrderedChunkStream {
    /// Wrap `chunks` so they are delivered strictly in `chunk_id` order,
    /// buffering at most `max_buffer_size` out-of-order chunks.
    pub fn new(chunks: BoxStream<'static, Result<AudioChunk>>, max_buffer_size: usize) -> Self {
        OrderedChunkStream {
            chunks,
            max_buffer_size,
            next_expected_id: 0,
            buffer: BTreeMap::new(),
            stats: StreamStats::default(),
        }
    }

    /// Read-only view of the delivery statistics.
    pub fn stats(&self) -> &StreamStats {
        &self.stats
    }

    /// Reset all delivery statistics to zero.
    pub fn reset_stats(&mut self) {
        self.stats = StreamStats::default();
    }
}
impl Stream for OrderedChunkStream {
    type Item = Result<AudioChunk>;

    /// Delivers chunks strictly in ascending `chunk_id` order: the expected
    /// chunk is returned immediately, chunks arriving early are parked in
    /// `buffer` (error on overflow), and chunks older than the expected id
    /// are dropped with a warning.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Fast path: the chunk we owe the consumer is already buffered.
        let expected_id = self.next_expected_id;
        if let Some(chunk) = self.buffer.remove(&expected_id) {
            self.next_expected_id += 1;
            self.stats.chunks_delivered += 1;
            return Poll::Ready(Some(Ok(chunk)));
        }
        loop {
            match self.chunks.poll_next_unpin(cx) {
                Poll::Ready(Some(Ok(chunk))) => {
                    self.stats.chunks_received += 1;
                    if chunk.chunk_id == self.next_expected_id {
                        // Exactly the chunk we were waiting for: deliver it.
                        self.next_expected_id += 1;
                        self.stats.chunks_delivered += 1;
                        return Poll::Ready(Some(Ok(chunk)));
                    } else if chunk.chunk_id > self.next_expected_id {
                        // Arrived early: park it until its turn, unless the
                        // re-ordering buffer is already at capacity.
                        if self.buffer.len() < self.max_buffer_size {
                            self.buffer.insert(chunk.chunk_id, chunk);
                            self.stats.chunks_buffered += 1;
                        } else {
                            self.stats.buffer_overflows += 1;
                            return Poll::Ready(Some(Err(VoirsError::internal(
                                "streaming",
                                "Chunk ordering buffer overflow",
                            ))));
                        }
                    } else {
                        // Arrived late (its id was already delivered): drop it.
                        self.stats.chunks_dropped += 1;
                        tracing::warn!(
                            "Dropping old chunk {} (expected {})",
                            chunk.chunk_id,
                            self.next_expected_id
                        );
                    }
                }
                Poll::Ready(Some(Err(e))) => {
                    self.stats.errors += 1;
                    return Poll::Ready(Some(Err(e)));
                }
                Poll::Ready(None) => {
                    // Source exhausted: flush any parked chunks in id order,
                    // one per poll, even if their ids are not contiguous.
                    if let Some((&id, _)) = self.buffer.iter().next() {
                        if let Some(chunk) = self.buffer.remove(&id) {
                            self.stats.chunks_delivered += 1;
                            return Poll::Ready(Some(Ok(chunk)));
                        }
                    }
                    return Poll::Ready(None);
                }
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}
/// Counters describing how chunks flowed through an `OrderedChunkStream`.
#[derive(Debug, Clone, Default)]
pub struct StreamStats {
    /// Chunks pulled from the underlying source.
    pub chunks_received: usize,
    /// Chunks handed to the consumer.
    pub chunks_delivered: usize,
    /// Chunks parked while waiting for their turn.
    pub chunks_buffered: usize,
    /// Late chunks discarded.
    pub chunks_dropped: usize,
    /// Times the re-ordering buffer hit capacity.
    pub buffer_overflows: usize,
    /// Errors propagated from the source stream.
    pub errors: usize,
}

impl StreamStats {
    /// Fraction of received chunks that were delivered; 1.0 when nothing
    /// has been received yet.
    pub fn ordering_efficiency(&self) -> f32 {
        match self.chunks_received {
            0 => 1.0,
            received => self.chunks_delivered as f32 / received as f32,
        }
    }

    /// Fraction of received chunks discarded as stale.
    pub fn drop_rate(&self) -> f32 {
        match self.chunks_received {
            0 => 0.0,
            received => self.chunks_dropped as f32 / received as f32,
        }
    }

    /// Heuristic health check: no overflows, under 1% drops, and at least
    /// 95% of received chunks delivered.
    pub fn is_healthy(&self) -> bool {
        self.buffer_overflows == 0 && self.drop_rate() < 0.01 && self.ordering_efficiency() > 0.95
    }
}
/// Merges several chunk streams into a single stream according to a
/// `CombinationStrategy`.
pub struct StreamCombiner {
    /// The input streams to combine.
    streams: Vec<BoxStream<'static, Result<AudioChunk>>>,
    /// Output format; accepted by the constructor but not read anywhere in
    /// this impl (hence the dead_code allowance).
    #[allow(dead_code)]
    output_format: AudioFormat,
    /// How the input streams are merged.
    combination_strategy: CombinationStrategy,
}
impl StreamCombiner {
    /// Create a combiner over `streams` using the given `strategy`.
    pub fn new(
        streams: Vec<BoxStream<'static, Result<AudioChunk>>>,
        output_format: AudioFormat,
        strategy: CombinationStrategy,
    ) -> Self {
        Self {
            streams,
            output_format,
            combination_strategy: strategy,
        }
    }
    /// Consume the combiner and produce the merged stream.
    pub async fn combine(self) -> Result<impl Stream<Item = Result<AudioChunk>>> {
        match self.combination_strategy {
            CombinationStrategy::Concatenate => {
                // Drain each stream fully, in order, re-emitting its chunks
                // before moving on to the next stream.
                let combined = futures::stream::iter(self.streams)
                    .then(|stream| async move { stream.collect::<Vec<_>>().await })
                    .map(futures::stream::iter)
                    .flatten();
                Ok(Box::pin(combined) as BoxStream<'static, Result<AudioChunk>>)
            }
            CombinationStrategy::Interleave => {
                let stream = self.interleave_streams().await?;
                Ok(Box::pin(stream) as BoxStream<'static, Result<AudioChunk>>)
            }
            CombinationStrategy::Mix => {
                let stream = self.mix_streams().await?;
                Ok(Box::pin(stream) as BoxStream<'static, Result<AudioChunk>>)
            }
        }
    }
    /// Poll all streams concurrently, remapping chunk ids into disjoint
    /// per-stream ranges so ids stay unique in the merged output.
    async fn interleave_streams(self) -> Result<impl Stream<Item = Result<AudioChunk>>> {
        use futures::stream::{select_all, StreamExt};
        let indexed_streams: Vec<_> = self
            .streams
            .into_iter()
            .enumerate()
            .map(|(stream_idx, stream)| {
                stream.map(move |chunk_result| {
                    chunk_result.map(|mut chunk| {
                        // NOTE(review): assumes fewer than 10000 chunks per
                        // stream; beyond that, remapped ids could collide.
                        chunk.chunk_id += stream_idx * 10000;
                        chunk
                    })
                })
            })
            .collect();
        let combined = select_all(indexed_streams);
        Ok(Box::pin(combined) as BoxStream<'static, Result<AudioChunk>>)
    }
    /// Collect every stream fully, group chunks by id, and average each
    /// group sample-by-sample into a single mixed stream. Fails fast on
    /// the first chunk error encountered.
    async fn mix_streams(self) -> Result<impl Stream<Item = Result<AudioChunk>>> {
        // BTreeMap keeps the mixed output ordered by chunk_id.
        let mut chunk_groups: std::collections::BTreeMap<usize, Vec<AudioChunk>> =
            std::collections::BTreeMap::new();
        for stream in self.streams {
            let chunks: Vec<Result<AudioChunk>> = stream.collect().await;
            for chunk_result in chunks {
                match chunk_result {
                    Ok(chunk) => {
                        chunk_groups.entry(chunk.chunk_id).or_default().push(chunk);
                    }
                    Err(e) => return Err(e),
                }
            }
        }
        let mixed_chunks: Vec<Result<AudioChunk>> = chunk_groups
            .into_values()
            .map(|chunks| {
                if chunks.len() == 1 {
                    // Only one stream produced this id: pass it through as-is.
                    Ok(chunks
                        .into_iter()
                        .next()
                        .expect("iterator should have next element"))
                } else {
                    Self::mix_audio_chunks_static(chunks)
                }
            })
            .collect();
        Ok(futures::stream::iter(mixed_chunks))
    }
    /// Average several same-id chunks into one: samples are mixed at equal
    /// weight, texts concatenated, and phoneme/frame counters summed.
    fn mix_audio_chunks_static(chunks: Vec<AudioChunk>) -> Result<AudioChunk> {
        if chunks.is_empty() {
            return Err(VoirsError::internal("streaming", "No chunks to mix"));
        }
        if chunks.len() == 1 {
            return Ok(chunks
                .into_iter()
                .next()
                .expect("iterator should have next element"));
        }
        // The first chunk supplies the id and sample rate for the result.
        let template = &chunks[0];
        let chunk_id = template.chunk_id;
        let mut mixed_text = String::new();
        let mut total_processing_time = Duration::default();
        let mut total_phonemes = 0;
        let mut total_mel_frames = 0;
        // Size the mix buffer to the longest input; shorter inputs simply
        // contribute silence past their end.
        let max_samples = chunks
            .iter()
            .map(|c| c.audio.samples().len())
            .max()
            .unwrap_or(0);
        let mut mixed_samples = vec![0.0f32; max_samples];
        let chunk_count = chunks.len() as f32;
        for chunk in &chunks {
            mixed_text.push_str(&chunk.text);
            mixed_text.push(' ');
            total_processing_time += chunk.processing_time;
            total_phonemes += chunk.metadata.phoneme_count;
            total_mel_frames += chunk.metadata.mel_frames;
            let samples = chunk.audio.samples();
            for (i, &sample) in samples.iter().enumerate() {
                if i < mixed_samples.len() {
                    // Equal-weight average keeps the mix within [-1, 1]
                    // when the inputs are.
                    mixed_samples[i] += sample / chunk_count;
                }
            }
        }
        let mixed_audio = AudioBuffer::mono(mixed_samples, template.audio.sample_rate());
        let mixed_chunk = AudioChunk::new(
            chunk_id,
            mixed_audio,
            mixed_text.trim().to_string(),
            // Report the average processing time across the mixed inputs.
            total_processing_time / chunks.len() as u32,
            total_phonemes,
            total_mel_frames,
        );
        Ok(mixed_chunk)
    }
}
/// How `StreamCombiner` merges its input streams.
#[derive(Debug, Clone, Copy)]
pub enum CombinationStrategy {
    /// Emit each stream's chunks in full, one stream after another.
    Concatenate,
    /// Poll all streams concurrently, remapping ids to keep them unique.
    Interleave,
    /// Average same-id chunks from all streams into single chunks.
    Mix,
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::audio::AudioBuffer;

    /// Default config validates; inverted chunk bounds and out-of-range
    /// quality_vs_latency are rejected.
    #[test]
    fn test_streaming_config_validation() {
        let mut config = StreamingConfig::default();
        assert!(config.validate().is_ok());
        // min == max violates the strict `min < max` invariant.
        config.min_chunk_chars = 200;
        config.max_chunk_chars = 200;
        assert!(config.validate().is_err());
        config = StreamingConfig::default();
        config.quality_vs_latency = 1.5;
        assert!(config.validate().is_err());
    }

    /// Presets differ from the defaults in the advertised direction.
    #[test]
    fn test_config_presets() {
        let low_latency = StreamingConfig::low_latency();
        assert!(low_latency.max_latency < StreamingConfig::default().max_latency);
        assert!(low_latency.quality_vs_latency < 0.5);
        let high_quality = StreamingConfig::high_quality();
        assert!(high_quality.quality_vs_latency > 0.9);
        assert!(high_quality.overlap_frames > StreamingConfig::default().overlap_frames);
    }

    /// Adaptation shrinks chunks when slow and grows them back when fast.
    #[test]
    fn test_adaptive_config() {
        let mut config = StreamingConfig {
            adaptive_chunking: true,
            ..Default::default()
        };
        let original_max = config.max_chunk_chars;
        // RTF well above target: chunks should shrink.
        config.adapt_for_performance(1.5, Duration::from_millis(800));
        assert!(config.max_chunk_chars < original_max);
        // RTF well below target with latency headroom: chunks grow back.
        config.adapt_for_performance(0.1, Duration::from_millis(50));
        assert!(config.max_chunk_chars >= original_max * 9 / 10);
    }

    /// Session counters update when a chunk is folded in.
    #[test]
    fn test_streaming_state() {
        let mut state = StreamingState::default();
        assert_eq!(state.chunks_processed, 0);
        assert!(!state.is_realtime());
        // 1 second of audio produced in 100ms -> comfortably real-time.
        let audio = AudioBuffer::sine_wave(440.0, 1.0, 22050, 0.5);
        let chunk = AudioChunk::new(
            0,
            audio,
            "Test text".to_string(),
            Duration::from_millis(100),
            10,
            100,
        );
        state.update_with_chunk(&chunk);
        assert_eq!(state.chunks_processed, 1);
        assert!(state.is_realtime());
    }

    /// Per-chunk RTF and efficiency behave sensibly for a fast chunk.
    #[test]
    fn test_audio_chunk_metrics() {
        let audio = AudioBuffer::sine_wave(440.0, 1.0, 22050, 0.5);
        let chunk = AudioChunk::new(
            0,
            audio,
            "Test".to_string(),
            Duration::from_millis(100),
            4,
            100,
        );
        let rtf = chunk.real_time_factor();
        assert!(rtf > 0.0);
        assert!(rtf < 1.0);
        assert!(chunk.is_realtime());
        assert!(chunk.efficiency_score() > 0.5);
    }

    /// Nearest-rank percentiles over a 1..=100ms sample ramp.
    #[test]
    fn test_latency_percentiles() {
        let mut percentiles = LatencyPercentiles::default();
        for i in 1..=100 {
            percentiles.add_sample(Duration::from_millis(i));
        }
        assert_eq!(percentiles.p50(), Duration::from_millis(50));
        assert_eq!(percentiles.p95(), Duration::from_millis(95));
        assert_eq!(percentiles.p99(), Duration::from_millis(99));
    }

    /// Running mean and max track two samples correctly.
    #[test]
    fn test_latency_stats() {
        let mut stats = LatencyStats::default();
        stats.update(Duration::from_millis(100));
        stats.update(Duration::from_millis(200));
        assert_eq!(stats.sample_count, 2);
        assert_eq!(stats.average_latency, Duration::from_millis(150));
        assert_eq!(stats.max_latency, Duration::from_millis(200));
    }

    /// A chunk observation seeds the EMA and peak RTF.
    #[test]
    fn test_quality_metrics() {
        let mut metrics = QualityMetrics::default();
        let audio = AudioBuffer::sine_wave(440.0, 1.0, 22050, 0.5);
        let chunk = AudioChunk::new(
            0,
            audio,
            "Test".to_string(),
            Duration::from_millis(500),
            10,
            100,
        );
        metrics.update_with_chunk(&chunk);
        assert!(metrics.real_time_factor > 0.0);
        assert!(metrics.peak_rtf > 0.0);
        assert!(metrics.consistency_score <= 1.0);
    }

    /// A 5% drop rate counts as unhealthy.
    #[test]
    fn test_stream_stats() {
        let stats = StreamStats {
            chunks_received: 100,
            chunks_delivered: 95,
            chunks_dropped: 5,
            ..Default::default()
        };
        assert_eq!(stats.ordering_efficiency(), 0.95);
        assert_eq!(stats.drop_rate(), 0.05);
        assert!(!stats.is_healthy());
    }

    /// Chunk metadata survives a JSON round-trip.
    #[test]
    fn test_chunk_metadata_serialization() {
        let audio = AudioBuffer::sine_wave(440.0, 1.0, 22050, 0.5);
        let chunk = AudioChunk::new(
            0,
            audio,
            "Test".to_string(),
            Duration::from_millis(100),
            10,
            100,
        );
        let json = chunk.export_metadata().unwrap();
        assert!(json.contains("phoneme_count"));
        assert!(json.contains("mel_frames"));
        let metadata: ChunkMetadata = serde_json::from_str(&json).unwrap();
        assert_eq!(metadata.phoneme_count, 10);
        assert_eq!(metadata.mel_frames, 100);
    }

    /// Chunks arriving as 2, 0, 1 are delivered as 0, 1, 2.
    #[tokio::test]
    async fn test_ordered_chunk_stream() {
        let chunks = vec![
            Ok(AudioChunk::new(
                2,
                AudioBuffer::sine_wave(440.0, 0.1, 22050, 0.5),
                "Third".to_string(),
                Duration::from_millis(10),
                5,
                20,
            )),
            Ok(AudioChunk::new(
                0,
                AudioBuffer::sine_wave(440.0, 0.1, 22050, 0.5),
                "First".to_string(),
                Duration::from_millis(10),
                5,
                20,
            )),
            Ok(AudioChunk::new(
                1,
                AudioBuffer::sine_wave(440.0, 0.1, 22050, 0.5),
                "Second".to_string(),
                Duration::from_millis(10),
                5,
                20,
            )),
        ];
        let stream = futures::stream::iter(chunks);
        let mut ordered_stream = OrderedChunkStream::new(Box::pin(stream), 10);
        let chunk0 = ordered_stream.next().await.unwrap().unwrap();
        assert_eq!(chunk0.chunk_id, 0);
        assert_eq!(chunk0.text, "First");
        let chunk1 = ordered_stream.next().await.unwrap().unwrap();
        assert_eq!(chunk1.chunk_id, 1);
        assert_eq!(chunk1.text, "Second");
        let chunk2 = ordered_stream.next().await.unwrap().unwrap();
        assert_eq!(chunk2.chunk_id, 2);
        assert_eq!(chunk2.text, "Third");
        let stats = ordered_stream.stats();
        assert_eq!(stats.chunks_received, 3);
        assert_eq!(stats.chunks_delivered, 3);
        assert!(stats.is_healthy());
    }
}