use api_gemini::models::streaming_control::*;
use futures::stream;
use std::time::Duration;
use tokio::time::timeout;
#[ tokio::test ]
async fn test_optimized_streaming_control_basic() -> Result< (), Box< dyn std::error::Error > >
{
let test_data = vec![ "chunk1", "chunk2", "chunk3", "chunk4", "chunk5" ];
let stream = futures::stream::unfold( test_data.into_iter(), |mut iter| async move {
if let Some( item ) = iter.next()
{
tokio ::time::sleep( Duration::from_millis( 50 ) ).await;
Some( ( Ok( item.to_string() ), iter ) )
} else {
None
}
} );
let boxed_stream = Box::pin( stream );
let config = StreamControlConfig::builder()
.buffer_size( 1024 )
.pause_timeout( Duration::from_secs( 10 ) )
.auto_cleanup( true )
.max_buffered_chunks( 10 )
.build()?;
let mut controllable_stream = ControllableStream::new( boxed_stream, config );
assert_eq!( controllable_stream.state(), StreamState::Running );
assert!( !controllable_stream.is_paused() );
assert!( !controllable_stream.is_cancelled() );
let first_chunk = timeout( Duration::from_secs( 1 ), controllable_stream.next() ).await?;
assert!( first_chunk.is_some() );
assert_eq!( first_chunk.unwrap()?, "chunk1" );
controllable_stream.pause().await?;
assert_eq!( controllable_stream.state(), StreamState::Paused );
assert!( controllable_stream.is_paused() );
controllable_stream.resume().await?;
assert_eq!( controllable_stream.state(), StreamState::Running );
assert!( !controllable_stream.is_paused() );
let metrics = controllable_stream.get_metrics();
assert!( metrics.pause_count >= 1 );
assert!( metrics.resume_count >= 1 );
assert!( metrics.state_changes >= 2 );
println!( "✓ Optimized streaming control works correctly with new features" );
Ok( () )
}
#[ tokio::test ]
async fn test_buffer_strategies() -> Result< (), Box< dyn std::error::Error > >
{
let strategies = vec![
BufferStrategy::Vector,
BufferStrategy::Circular,
BufferStrategy::Chunked { chunk_size : 10 },
];
for strategy in strategies
{
let test_data = vec![ "data1", "data2", "data3" ];
let stream = stream::iter( test_data.into_iter().map( |s| Ok( s.to_string() ) ) );
let boxed_stream = Box::pin( stream );
let config = StreamControlConfig::builder()
.buffer_strategy( strategy.clone() )
.build()?;
let mut controllable_stream = ControllableStream::new( boxed_stream, config );
let first_chunk = timeout( Duration::from_millis( 100 ), controllable_stream.next() ).await?;
assert!( first_chunk.is_some() );
println!( "✓ Buffer strategy {:?} works correctly", strategy );
}
Ok( () )
}
#[ tokio::test ]
async fn test_metrics_levels() -> Result< (), Box< dyn std::error::Error > >
{
let levels = vec![
MetricsLevel::None,
MetricsLevel::Basic,
MetricsLevel::Detailed,
];
for level in levels
{
let test_data = vec![ "item1", "item2" ];
let stream = stream::iter( test_data.into_iter().map( |s| Ok( s.to_string() ) ) );
let boxed_stream = Box::pin( stream );
let config = StreamControlConfig::builder()
.metrics_level( level.clone() )
.build()?;
let mut controllable_stream = ControllableStream::new( boxed_stream, config );
let _ = timeout( Duration::from_millis( 100 ), controllable_stream.next() ).await?;
let metrics = controllable_stream.get_metrics();
match level
{
MetricsLevel::None => {
println!( "✓ MetricsLevel::None - minimal metrics collection" );
},
MetricsLevel::Basic => {
println!( "✓ MetricsLevel::Basic - basic metrics collected (total_chunks : {})", metrics.total_chunks );
assert!( metrics.total_chunks > 0, "Basic metrics should count chunks" );
},
MetricsLevel::Detailed => {
println!( "✓ MetricsLevel::Detailed - detailed metrics collected (total_chunks : {})", metrics.total_chunks );
assert!( metrics.total_chunks > 0, "Detailed metrics should count chunks" );
},
}
}
Ok( () )
}
#[ tokio::test ]
async fn test_configuration_validation() -> Result< (), Box< dyn std::error::Error > >
{
let valid_config = StreamControlConfig::builder()
.buffer_size( 1024 )
.buffer_strategy( BufferStrategy::Chunked { chunk_size : 100 } )
.build();
assert!( valid_config.is_ok() );
let invalid_config = StreamControlConfig::builder()
.buffer_size( 100 )
.buffer_strategy( BufferStrategy::Chunked { chunk_size : 200 } )
.build();
assert!( invalid_config.is_err() );
let invalid_timeout = StreamControlConfig::builder()
.control_operation_timeout( Duration::ZERO )
.build();
assert!( invalid_timeout.is_err() );
println!( "✓ Configuration validation works correctly" );
Ok( () )
}