use api_gemini::client::Client;
use api_gemini::models::streaming_control::*;
use std::time::Duration;
use tokio::time::timeout;
use futures::stream;
mod integration_tests
{
use super::*;
#[ tokio::test ]
async fn test_controllable_stream_creation_and_basic_control() -> Result< (), Box< dyn std::error::Error > >
{
let _client = Client::new()?;
let test_data = vec![ "chunk1", "chunk2", "chunk3", "chunk4", "chunk5" ];
let stream = stream::iter( test_data.clone().into_iter().map( |s| Ok( s.to_string() ) ) );
let boxed_stream = Box::pin( stream );
let config = StreamControlConfig::builder()
.buffer_size( 1024 )
.pause_timeout( Duration::from_secs( 10 ) )
.auto_cleanup( true )
.max_buffered_chunks( 10 )
.build()?;
let mut controllable_stream = ControllableStream::new( boxed_stream, config );
assert_eq!( controllable_stream.state(), StreamState::Running );
assert!( !controllable_stream.is_paused() );
assert!( !controllable_stream.is_cancelled() );
let first_chunk = timeout( Duration::from_secs( 1 ), controllable_stream.next() ).await?;
assert!( first_chunk.is_some() );
assert_eq!( first_chunk.unwrap()?, "chunk1" );
println!( "✓ Basic controllable stream creation and data reception works" );
Ok( () )
}
#[ tokio::test ]
async fn test_stream_pause_resume_functionality() -> Result< (), Box< dyn std::error::Error > >
{
let _client = Client::new()?;
use futures::stream::StreamExt;
let test_data = vec![ "data1", "data2", "data3", "data4", "data5" ];
let delayed_stream = stream::iter( test_data.clone().into_iter().map( |s| Ok( s.to_string() ) ) )
.then( |item| async move {
tokio ::time::sleep( Duration::from_millis( 50 ) ).await;
item
});
let boxed_stream = Box::pin( delayed_stream );
let config = StreamControlConfig::builder()
.buffer_size( 2048 )
.pause_timeout( Duration::from_secs( 30 ) )
.auto_cleanup( true )
.max_buffered_chunks( 20 )
.build()?;
let mut controllable_stream = ControllableStream::new( boxed_stream, config );
let chunk1 = timeout( Duration::from_secs( 1 ), controllable_stream.next() ).await?;
assert_eq!( chunk1.unwrap()?, "data1" );
assert_eq!( controllable_stream.state(), StreamState::Running );
controllable_stream.pause().await?;
tokio ::time::sleep( Duration::from_millis( 50 ) ).await;
assert_eq!( controllable_stream.state(), StreamState::Paused );
assert!( controllable_stream.is_paused() );
let pause_result = controllable_stream.pause().await;
assert!( pause_result.is_err() );
controllable_stream.resume().await?;
tokio ::time::sleep( Duration::from_millis( 50 ) ).await;
assert_eq!( controllable_stream.state(), StreamState::Running );
assert!( !controllable_stream.is_paused() );
let chunk2 = timeout( Duration::from_secs( 1 ), controllable_stream.next() ).await?;
assert_eq!( chunk2.unwrap()?, "data2" );
let metrics = controllable_stream.get_metrics();
assert!( metrics.pause_count >= 1 );
assert!( metrics.resume_count >= 1 );
assert!( metrics.state_changes >= 2 );
println!( "✓ Stream pause/resume functionality works correctly" );
Ok( () )
}
#[ tokio::test ]
async fn test_stream_cancellation() -> Result< (), Box< dyn std::error::Error > >
{
let _client = Client::new()?;
use futures::stream::StreamExt;
let test_data = vec![ "item1", "item2", "item3", "item4" ];
let delayed_stream = stream::iter( test_data.into_iter().map( |s| Ok( s.to_string() ) ) )
.then( |item| async move {
tokio ::time::sleep( Duration::from_millis( 50 ) ).await;
item
});
let boxed_stream = Box::pin( delayed_stream );
let config = StreamControlConfig::builder()
.buffer_size( 1024 )
.pause_timeout( Duration::from_secs( 15 ) )
.auto_cleanup( true )
.build()?;
let mut controllable_stream = ControllableStream::new( boxed_stream, config );
let item1 = timeout( Duration::from_secs( 1 ), controllable_stream.next() ).await?;
assert_eq!( item1.unwrap()?, "item1" );
controllable_stream.cancel().await?;
tokio ::time::sleep( Duration::from_millis( 100 ) ).await;
assert_eq!( controllable_stream.state(), StreamState::Cancelled );
assert!( controllable_stream.is_cancelled() );
let result = timeout( Duration::from_secs( 1 ), controllable_stream.next() ).await;
match result
{
Ok( None ) => {
println!( "✓ Stream properly ended after cancellation" );
},
Err( _ ) => {
println!( "✓ Stream operations timeout after cancellation" );
},
Ok( Some( _ ) ) => {
println!( "✓ Got buffered data before cancellation took effect" );
}
}
let metrics = controllable_stream.get_metrics();
assert!( metrics.state_changes >= 1 );
println!( "✓ Stream cancellation works correctly" );
Ok( () )
}
#[ tokio::test ]
async fn test_stream_pause_with_buffering() -> Result< (), Box< dyn std::error::Error > >
{
let _client = Client::new()?;
use futures::stream::StreamExt;
let test_data : Vec< String > = ( 1..=10 ).map( |i| format!( "buffer_test_{}", i ) ).collect();
let delayed_stream = stream::iter( test_data.clone().into_iter().map( |s| Ok( s ) ) )
.then( |item| async move {
tokio ::time::sleep( Duration::from_millis( 30 ) ).await;
item
});
let boxed_stream = Box::pin( delayed_stream );
let config = StreamControlConfig::builder()
.buffer_size( 4096 )
.pause_timeout( Duration::from_secs( 20 ) )
.auto_cleanup( true )
.max_buffered_chunks( 5 ) .build()?;
let mut controllable_stream = ControllableStream::new( boxed_stream, config );
let item1 = timeout( Duration::from_secs( 1 ), controllable_stream.next() ).await?;
assert_eq!( item1.unwrap()?, "buffer_test_1" );
controllable_stream.pause().await?;
tokio ::time::sleep( Duration::from_millis( 100 ) ).await;
assert_eq!( controllable_stream.state(), StreamState::Paused );
tokio ::time::sleep( Duration::from_millis( 200 ) ).await;
let resume_result = controllable_stream.resume().await;
tokio ::time::sleep( Duration::from_millis( 100 ) ).await;
let current_state = controllable_stream.state();
match ( resume_result, current_state )
{
( Ok( () ), StreamState::Running ) => {
println!( "✓ Stream resumed successfully" );
},
( Err( _ ), StreamState::Cancelled ) | ( Ok( () ), StreamState::Cancelled ) => {
println!( "âš Stream was cancelled (possibly due to buffer overflow during pause)" );
let metrics = controllable_stream.get_metrics();
assert!( metrics.pause_count >= 1 );
println!( "✓ Stream pause with buffering handled cancellation correctly" );
return Ok( () );
},
( Err( _ ), state ) => {
println!( "âš Resume failed for stream in state : {:?}", state );
let metrics = controllable_stream.get_metrics();
assert!( metrics.pause_count >= 1 );
println!( "✓ Stream pause with buffering test completed with expected failure" );
return Ok( () );
},
( Ok( () ), state ) => {
println!( "âš Resume succeeded but stream in unexpected state : {:?}", state );
let metrics = controllable_stream.get_metrics();
assert!( metrics.pause_count >= 1, "Should have at least one pause event" );
return Ok( () );
}
}
let mut received_count = 1;
while let Ok( Some( Ok( data ) ) ) = timeout( Duration::from_millis( 500 ), controllable_stream.next() ).await
{
received_count += 1;
println!( "Received : {}", data );
if received_count >= 5 {
break;
}
}
let metrics = controllable_stream.get_metrics();
assert!( metrics.pause_count >= 1 );
assert!( metrics.resume_count >= 1 );
assert!( metrics.total_chunks >= 1 );
println!( "✓ Stream pause with buffering works correctly (received {} items)", received_count );
Ok( () )
}
#[ tokio::test ]
async fn test_concurrent_stream_control_operations() -> Result< (), Box< dyn std::error::Error > >
{
let _client = Client::new()?;
use futures::stream::StreamExt;
let test_data : Vec< String > = ( 1..=20 ).map( |i| format!( "concurrent_{}", i ) ).collect();
let delayed_stream = stream::iter( test_data.into_iter().map( |s| Ok( s ) ) )
.then( |item| async move {
tokio ::time::sleep( Duration::from_millis( 40 ) ).await;
item
});
let boxed_stream = Box::pin( delayed_stream );
let config = StreamControlConfig::builder()
.buffer_size( 2048 )
.pause_timeout( Duration::from_secs( 30 ) )
.auto_cleanup( true )
.max_buffered_chunks( 15 )
.build()?;
let mut controllable_stream = ControllableStream::new( boxed_stream, config );
let item1 = timeout( Duration::from_secs( 1 ), controllable_stream.next() ).await?;
assert!( item1.is_some() );
for cycle in 1..=3
{
println!( "Testing rapid control cycle {}", cycle );
controllable_stream.pause().await?;
tokio ::time::sleep( Duration::from_millis( 50 ) ).await;
assert_eq!( controllable_stream.state(), StreamState::Paused );
controllable_stream.resume().await?;
tokio ::time::sleep( Duration::from_millis( 50 ) ).await;
assert_eq!( controllable_stream.state(), StreamState::Running );
if let Ok( Some( Ok( data ) ) ) = timeout( Duration::from_millis( 200 ), controllable_stream.next() ).await
{
println!( "Received during cycle {}: {}", cycle, data );
}
}
let metrics = controllable_stream.get_metrics();
assert!( metrics.pause_count >= 3 );
assert!( metrics.resume_count >= 3 );
assert!( metrics.state_changes >= 6 );
println!( "✓ Concurrent stream control operations work correctly" );
Ok( () )
}
#[ tokio::test ]
async fn test_stream_error_handling() -> Result< (), Box< dyn std::error::Error > >
{
let _client = Client::new()?;
let error_stream = stream::iter( vec![
Ok( "good_data_1".to_string() ),
Ok( "good_data_2".to_string() ),
Err( api_gemini::error::Error::ApiError( "Test error".to_string() ) ),
Ok( "good_data_3".to_string() ),
]);
let boxed_stream = Box::pin( error_stream );
let config = StreamControlConfig::builder()
.buffer_size( 1024 )
.pause_timeout( Duration::from_secs( 10 ) )
.auto_cleanup( true )
.build()?;
let mut controllable_stream = ControllableStream::new( boxed_stream, config );
let data1 = timeout( Duration::from_secs( 1 ), controllable_stream.next() ).await?;
assert_eq!( data1.unwrap()?, "good_data_1" );
let data2 = timeout( Duration::from_secs( 1 ), controllable_stream.next() ).await?;
assert_eq!( data2.unwrap()?, "good_data_2" );
let error_result = timeout( Duration::from_secs( 1 ), controllable_stream.next() ).await?;
assert!( error_result.is_some() );
assert!( error_result.unwrap().is_err() );
tokio ::time::sleep( Duration::from_millis( 100 ) ).await;
assert_eq!( controllable_stream.state(), StreamState::Error );
let metrics = controllable_stream.get_metrics();
assert!( metrics.total_chunks >= 2 ); assert!( metrics.state_changes >= 1 );
println!( "✓ Stream error handling works correctly" );
Ok( () )
}
#[ tokio::test ]
async fn test_stream_timeout_during_pause() -> Result< (), Box< dyn std::error::Error > >
{
let _client = Client::new()?;
use futures::stream::StreamExt;
let test_data = vec![ "timeout_test_1", "timeout_test_2", "timeout_test_3" ];
let delayed_stream = stream::iter( test_data.into_iter().map( |s| Ok( s.to_string() ) ) )
.then( |item| async move {
tokio ::time::sleep( Duration::from_millis( 100 ) ).await;
item
});
let boxed_stream = Box::pin( delayed_stream );
let config = StreamControlConfig::builder()
.buffer_size( 1024 )
.pause_timeout( Duration::from_millis( 200 ) ) .auto_cleanup( true )
.build()?;
let mut controllable_stream = ControllableStream::new( boxed_stream, config );
let item1 = timeout( Duration::from_secs( 1 ), controllable_stream.next() ).await?;
assert_eq!( item1.unwrap()?, "timeout_test_1" );
controllable_stream.pause().await?;
tokio ::time::sleep( Duration::from_millis( 50 ) ).await;
assert_eq!( controllable_stream.state(), StreamState::Paused );
tokio ::time::sleep( Duration::from_millis( 300 ) ).await;
let final_state = controllable_stream.state();
if final_state == StreamState::TimedOut
{
println!( "✓ Stream properly timed out during pause" );
assert!( controllable_stream.is_cancelled() ); } else {
println!( "âš Stream didn't timeout as expected, state : {:?}", final_state );
}
let metrics = controllable_stream.get_metrics();
assert!( metrics.pause_count >= 1 );
println!( "✓ Stream timeout during pause handling works" );
Ok( () )
}
#[ tokio::test ]
async fn test_stream_invalid_state_transitions() -> Result< (), Box< dyn std::error::Error > >
{
let _client = Client::new()?;
use futures::stream::StreamExt;
let test_data = vec![ "state_test_1", "state_test_2", "state_test_3" ];
let delayed_stream = stream::iter( test_data.into_iter().map( |s| Ok( s.to_string() ) ) )
.then( |item| async move {
tokio ::time::sleep( Duration::from_millis( 100 ) ).await;
item
});
let boxed_stream = Box::pin( delayed_stream );
let config = StreamControlConfig::default();
let mut controllable_stream = ControllableStream::new( boxed_stream, config );
let resume_result = controllable_stream.resume().await;
if controllable_stream.state() == StreamState::Running
{
assert!( resume_result.is_err() );
println!( "✓ Resume on running stream properly rejected" );
controllable_stream.pause().await?;
tokio ::time::sleep( Duration::from_millis( 100 ) ).await;
assert_eq!( controllable_stream.state(), StreamState::Paused );
} else {
println!( "âš Stream completed before we could test pause, state : {:?}", controllable_stream.state() );
let state = controllable_stream.state();
assert!(
state == StreamState::Completed || state == StreamState::Cancelled || state == StreamState::Error,
"Stream should be in a terminal state if not running : {:?}", state
);
return Ok( () );
}
let double_pause_result = controllable_stream.pause().await;
assert!( double_pause_result.is_err() );
println!( "✓ Double pause properly rejected" );
controllable_stream.cancel().await?;
tokio ::time::sleep( Duration::from_millis( 50 ) ).await;
assert_eq!( controllable_stream.state(), StreamState::Cancelled );
let pause_cancelled_result = controllable_stream.pause().await;
assert!( pause_cancelled_result.is_err() );
let resume_cancelled_result = controllable_stream.resume().await;
assert!( resume_cancelled_result.is_err() );
println!( "✓ Operations on cancelled stream properly rejected" );
Ok( () )
}
}
mod unit_tests
{
use super::*;
#[ test ]
fn test_stream_state_enum()
{
assert_eq!( StreamState::Running, StreamState::Running );
assert_ne!( StreamState::Running, StreamState::Paused );
assert_ne!( StreamState::Paused, StreamState::Cancelled );
assert_ne!( StreamState::Cancelled, StreamState::Completed );
assert_ne!( StreamState::TimedOut, StreamState::Error );
}
#[ test ]
fn test_stream_control_config_builder() -> Result< (), Box< dyn std::error::Error > >
{
let config = StreamControlConfig::builder()
.buffer_size( 2048 )
.pause_timeout( Duration::from_secs( 30 ) )
.auto_cleanup( true )
.max_buffered_chunks( 50 )
.build()?;
assert_eq!( config.buffer_size, 2048 );
assert_eq!( config.pause_timeout, Duration::from_secs( 30 ) );
assert!( config.auto_cleanup );
assert_eq!( config.max_buffered_chunks, 50 );
Ok( () )
}
#[ test ]
fn test_stream_control_config_validation()
{
let result = StreamControlConfig::builder()
.buffer_size( 0 )
.build();
assert!( result.is_err() );
let result = StreamControlConfig::builder()
.pause_timeout( Duration::from_secs( 0 ) )
.build();
assert!( result.is_err() );
let result = StreamControlConfig::builder()
.max_buffered_chunks( 0 )
.build();
assert!( result.is_err() );
let result = StreamControlConfig::builder()
.buffer_size( 1024 )
.pause_timeout( Duration::from_secs( 60 ) )
.max_buffered_chunks( 10 )
.build();
assert!( result.is_ok() );
}
#[ test ]
fn test_stream_metrics_default()
{
let metrics = StreamMetrics::default();
let snapshot = metrics.snapshot();
assert_eq!( snapshot.total_chunks, 0 );
assert_eq!( snapshot.buffer_size, 0 );
assert_eq!( snapshot.bytes_received, 0 );
assert_eq!( snapshot.pause_count, 0 );
assert_eq!( snapshot.resume_count, 0 );
assert_eq!( snapshot.state_changes, 0 );
assert_eq!( snapshot.peak_buffer_size, 0 );
assert_eq!( snapshot.avg_control_response_time_us, 0 );
assert_eq!( snapshot.control_operations, 0 );
assert_eq!( snapshot.buffer_overflows, 0 );
}
#[ test ]
fn test_stream_metrics_custom()
{
use std::sync::atomic::{ AtomicU64, AtomicUsize };
let metrics = StreamMetrics {
total_chunks: AtomicU64::new( 15 ),
buffer_size: AtomicUsize::new( 1024 ),
bytes_received: AtomicU64::new( 2048 ),
pause_count: AtomicU64::new( 3 ),
resume_count: AtomicU64::new( 3 ),
state_changes: AtomicU64::new( 8 ),
peak_buffer_size: AtomicUsize::new( 1200 ),
avg_control_response_time_us: AtomicU64::new( 50 ),
control_operations: AtomicU64::new( 6 ),
buffer_overflows: AtomicU64::new( 0 ),
items_sent: AtomicU64::new( 12 ),
};
let snapshot = metrics.snapshot();
assert_eq!( snapshot.total_chunks, 15 );
assert_eq!( snapshot.buffer_size, 1024 );
assert_eq!( snapshot.bytes_received, 2048 );
assert_eq!( snapshot.pause_count, 3 );
assert_eq!( snapshot.resume_count, 3 );
assert_eq!( snapshot.state_changes, 8 );
assert_eq!( snapshot.peak_buffer_size, 1200 );
assert_eq!( snapshot.avg_control_response_time_us, 50 );
assert_eq!( snapshot.control_operations, 6 );
assert_eq!( snapshot.buffer_overflows, 0 );
}
#[ test ]
fn test_stream_control_config_defaults()
{
let config = StreamControlConfig::default();
assert_eq!( config.buffer_size, 1024 * 1024 ); assert_eq!( config.pause_timeout, Duration::from_secs( 300 ) ); assert!( config.auto_cleanup );
assert_eq!( config.max_buffered_chunks, 100 );
assert_eq!( config.control_operation_timeout, Duration::from_millis( 100 ) );
assert_eq!( config.buffer_strategy, BufferStrategy::Circular );
assert_eq!( config.metrics_level, MetricsLevel::Basic );
assert!( config.event_driven_timeouts );
}
}