#[ cfg( feature = "streaming_control" ) ]
mod private
{
use core::time::Duration;
use std::sync::Arc;
#[ cfg( feature = "streaming_control" ) ]
#[ derive( Debug, Clone, Copy, PartialEq, Eq ) ]
pub enum StreamState
{
Ready,
Streaming,
Paused,
Cancelled,
}
#[ cfg( feature = "streaming_control" ) ]
impl core::fmt::Display for StreamState
{
fn fmt( &self, f : &mut core::fmt::Formatter< '_ > ) -> core::fmt::Result
{
match self
{
StreamState::Ready => write!( f, "Ready" ),
StreamState::Streaming => write!( f, "Streaming" ),
StreamState::Paused => write!( f, "Paused" ),
StreamState::Cancelled => write!( f, "Cancelled" ),
}
}
}
#[ cfg( feature = "streaming_control" ) ]
#[ derive( Debug, Clone ) ]
pub enum StreamControlError
{
InvalidStateTransition {
from : StreamState,
to : StreamState
},
TimeoutError,
BufferOverflow {
limit : usize
},
StreamCancelled,
GeneralError( String ),
}
#[ cfg( feature = "streaming_control" ) ]
impl core::fmt::Display for StreamControlError
{
fn fmt( &self, f : &mut core::fmt::Formatter< '_ > ) -> core::fmt::Result
{
match self
{
StreamControlError::InvalidStateTransition { from, to } =>
{
write!( f, "Invalid state transition from {from} to {to}" )
},
StreamControlError::TimeoutError => write!( f, "Stream control operation timed out" ),
StreamControlError::BufferOverflow { limit } =>
{
write!( f, "Buffer overflow : exceeded limit of {limit} bytes" )
},
StreamControlError::StreamCancelled => write!( f, "Stream was cancelled" ),
StreamControlError::GeneralError( msg ) => write!( f, "Stream control error : {msg}" ),
}
}
}
#[ cfg( feature = "streaming_control" ) ]
impl std::error::Error for StreamControlError
{}
#[ cfg( feature = "streaming_control" ) ]
#[ derive( Debug, Clone ) ]
pub struct StreamMetrics
{
pub pause_count : u64,
pub resume_count : u64,
pub total_pause_duration : Duration,
pub last_pause_start : Option< std::time::Instant >,
pub total_buffered_bytes : u64,
}
#[ cfg( feature = "streaming_control" ) ]
impl StreamMetrics
{
#[ inline ]
#[ must_use ]
pub fn new() -> Self
{
Self
{
pause_count : 0,
resume_count : 0,
total_pause_duration : Duration::from_secs( 0 ),
last_pause_start : None,
total_buffered_bytes : 0,
}
}
#[ inline ]
pub fn record_pause( &mut self )
{
self.pause_count += 1;
self.last_pause_start = Some( std::time::Instant::now() );
}
#[ inline ]
pub fn record_resume( &mut self )
{
self.resume_count += 1;
if let Some( pause_start ) = self.last_pause_start.take()
{
self.total_pause_duration += pause_start.elapsed();
}
}
#[ inline ]
pub fn record_buffered_bytes( &mut self, bytes : u64 )
{
self.total_buffered_bytes += bytes;
}
}
#[ cfg( feature = "streaming_control" ) ]
impl Default for StreamMetrics
{
fn default() -> Self
{
Self::new()
}
}
#[ cfg( feature = "streaming_control" ) ]
#[ derive( Debug ) ]
pub struct StreamBuffer
{
buffer : Arc< tokio::sync::Mutex< Vec< u8 > > >,
capacity : usize,
}
#[ cfg( feature = "streaming_control" ) ]
impl StreamBuffer
{
#[ inline ]
#[ must_use ]
pub fn new( capacity : usize ) -> Self
{
Self
{
buffer : Arc::new( tokio::sync::Mutex::new( Vec::with_capacity( capacity ) ) ),
capacity,
}
}
#[ inline ]
pub async fn write( &self, data : Vec< u8 > ) -> Result< (), StreamControlError >
{
let mut buffer = self.buffer.lock().await;
if buffer.len() + data.len() > self.capacity
{
return Err( StreamControlError::BufferOverflow { limit : self.capacity } );
}
buffer.extend( data );
Ok( () )
}
#[ inline ]
pub async fn read( &self, size : usize ) -> Result< Vec< u8 >, StreamControlError >
{
let mut buffer = self.buffer.lock().await;
if buffer.len() < size
{
return Ok( buffer.drain( .. ).collect() );
}
Ok( buffer.drain( ..size ).collect() )
}
#[ inline ]
pub async fn len( &self ) -> usize
{
self.buffer.lock().await.len()
}
#[ inline ]
pub async fn is_empty( &self ) -> bool
{
self.buffer.lock().await.is_empty()
}
#[ inline ]
pub fn capacity( &self ) -> usize
{
self.capacity
}
#[ inline ]
pub async fn clear( &self )
{
self.buffer.lock().await.clear();
}
}
#[ cfg( feature = "streaming_control" ) ]
pub struct StreamControl
{
state : Arc< tokio::sync::RwLock< StreamState > >,
metrics : Arc< tokio::sync::Mutex< StreamMetrics > >,
timeout : Option< Duration >,
cancellation_token : Arc< tokio::sync::Mutex< Option< tokio_util::sync::CancellationToken > > >,
state_callbacks : Arc< tokio::sync::Mutex< Vec< Box< dyn Fn( StreamState, StreamState ) + Send + Sync > > > >,
}
#[ cfg( feature = "streaming_control" ) ]
impl StreamControl
{
#[ inline ]
#[ must_use ]
pub fn new() -> Self
{
Self
{
state : Arc::new( tokio::sync::RwLock::new( StreamState::Ready ) ),
metrics : Arc::new( tokio::sync::Mutex::new( StreamMetrics::new() ) ),
timeout : None,
cancellation_token : Arc::new( tokio::sync::Mutex::new( None ) ),
state_callbacks : Arc::new( tokio::sync::Mutex::new( Vec::new() ) ),
}
}
#[ inline ]
#[ must_use ]
pub fn with_timeout( timeout : Duration ) -> Self
{
Self
{
state : Arc::new( tokio::sync::RwLock::new( StreamState::Ready ) ),
metrics : Arc::new( tokio::sync::Mutex::new( StreamMetrics::new() ) ),
timeout : Some( timeout ),
cancellation_token : Arc::new( tokio::sync::Mutex::new( None ) ),
state_callbacks : Arc::new( tokio::sync::Mutex::new( Vec::new() ) ),
}
}
#[ inline ]
pub async fn state( &self ) -> StreamState
{
*self.state.read().await
}
#[ inline ]
pub async fn is_paused( &self ) -> bool
{
*self.state.read().await == StreamState::Paused
}
#[ inline ]
pub async fn is_cancelled( &self ) -> bool
{
*self.state.read().await == StreamState::Cancelled
}
#[ inline ]
pub async fn start( &self ) -> Result< (), StreamControlError >
{
let mut state = self.state.write().await;
if *state != StreamState::Ready
{
return Err( StreamControlError::InvalidStateTransition {
from : *state,
to : StreamState::Streaming,
} );
}
let old_state = *state;
*state = StreamState::Streaming;
{
let mut token = self.cancellation_token.lock().await;
*token = Some( tokio_util::sync::CancellationToken::new() );
}
drop( state );
self.notify_state_change( old_state, StreamState::Streaming ).await;
Ok( () )
}
#[ inline ]
pub async fn pause( &self ) -> Result< (), StreamControlError >
{
let mut state = self.state.write().await;
if *state != StreamState::Streaming
{
return Err( StreamControlError::InvalidStateTransition {
from : *state,
to : StreamState::Paused,
} );
}
let old_state = *state;
*state = StreamState::Paused;
{
let mut metrics = self.metrics.lock().await;
metrics.record_pause();
}
if let Some( timeout ) = self.timeout
{
let control_clone = self.clone();
tokio ::spawn( async move
{
tokio ::time::sleep( timeout ).await;
let current_state = control_clone.state().await;
if current_state == StreamState::Paused
{
let _ = control_clone.cancel().await;
}
} );
}
drop( state );
self.notify_state_change( old_state, StreamState::Paused ).await;
Ok( () )
}
#[ inline ]
pub async fn resume( &self ) -> Result< (), StreamControlError >
{
let mut state = self.state.write().await;
match *state
{
StreamState::Paused =>
{
let old_state = *state;
*state = StreamState::Streaming;
{
let mut metrics = self.metrics.lock().await;
metrics.record_resume();
}
drop( state );
self.notify_state_change( old_state, StreamState::Streaming ).await;
Ok( () )
},
StreamState::Cancelled =>
{
Err( StreamControlError::StreamCancelled )
},
_ =>
{
Err( StreamControlError::InvalidStateTransition {
from : *state,
to : StreamState::Streaming,
} )
}
}
}
#[ inline ]
pub async fn cancel( &self ) -> Result< (), StreamControlError >
{
let mut state = self.state.write().await;
let old_state = *state;
if *state == StreamState::Cancelled
{
return Ok( () );
}
*state = StreamState::Cancelled;
{
let token_guard = self.cancellation_token.lock().await;
if let Some( token ) = token_guard.as_ref()
{
token.cancel();
}
}
drop( state );
self.notify_state_change( old_state, StreamState::Cancelled ).await;
Ok( () )
}
#[ inline ]
pub async fn cleanup_on_cancel( &self, buffer : &StreamBuffer ) -> Result< (), StreamControlError >
{
if self.is_cancelled().await
{
buffer.clear().await;
}
Ok( () )
}
#[ inline ]
pub async fn get_metrics( &self ) -> StreamMetrics
{
self.metrics.lock().await.clone()
}
#[ inline ]
pub async fn on_state_change< F >( &self, callback : F )
where
F: Fn( StreamState, StreamState ) + Send + Sync + 'static,
{
let mut callbacks = self.state_callbacks.lock().await;
callbacks.push( Box::new( callback ) );
}
async fn notify_state_change( &self, old_state : StreamState, new_state : StreamState )
{
let callbacks = self.state_callbacks.lock().await;
for callback in callbacks.iter()
{
callback( old_state, new_state );
}
}
#[ inline ]
pub async fn cancellation_token( &self ) -> Option< tokio_util::sync::CancellationToken >
{
self.cancellation_token.lock().await.clone()
}
}
#[ cfg( feature = "streaming_control" ) ]
impl core::fmt::Debug for StreamControl
{
#[ inline ]
fn fmt( &self, f : &mut core::fmt::Formatter< '_ > ) -> core::fmt::Result
{
f.debug_struct( "StreamControl" )
.field( "timeout", &self.timeout )
.finish()
}
}
#[ cfg( feature = "streaming_control" ) ]
impl Clone for StreamControl
{
#[ inline ]
fn clone( &self ) -> Self
{
Self
{
state : self.state.clone(),
metrics : self.metrics.clone(),
timeout : self.timeout,
cancellation_token : self.cancellation_token.clone(),
state_callbacks : self.state_callbacks.clone(),
}
}
}
#[ cfg( feature = "streaming_control" ) ]
impl Default for StreamControl
{
#[ inline ]
fn default() -> Self
{
Self::new()
}
}
#[ cfg( all( feature = "streaming", feature = "streaming_control" ) ) ]
pub struct ControlledStream< T >
{
#[ allow( dead_code ) ]
stream : std::pin::Pin< Box< dyn futures_core::Stream< Item = T > + Send > >,
control : StreamControl,
#[ allow( dead_code ) ]
buffer : StreamBuffer,
}
#[ cfg( all( feature = "streaming", feature = "streaming_control" ) ) ]
impl< T > core::fmt::Debug for ControlledStream< T >
{
#[ inline ]
fn fmt( &self, f : &mut core::fmt::Formatter< '_ > ) -> core::fmt::Result
{
f.debug_struct( "ControlledStream" )
.field( "control", &self.control )
.field( "buffer", &self.buffer )
.finish()
}
}
#[ cfg( all( feature = "streaming", feature = "streaming_control" ) ) ]
impl< T > ControlledStream< T >
{
#[ inline ]
#[ must_use ]
pub fn new(
stream : std::pin::Pin< Box< dyn futures_core::Stream< Item = T > + Send > >,
control : StreamControl
) -> Self
{
Self
{
stream,
control,
buffer : StreamBuffer::new( 1024 * 1024 ), }
}
#[ inline ]
pub fn control( &self ) -> &StreamControl
{
&self.control
}
#[ inline ]
pub fn is_paused_sync( &self ) -> bool
{
false
}
#[ inline ]
pub fn is_cancelled_sync( &self ) -> bool
{
false
}
}
}
#[ cfg( feature = "streaming_control" ) ]
crate ::mod_interface!
{
exposed use
{
StreamState,
StreamControlError,
StreamMetrics,
StreamBuffer,
StreamControl,
ControlledStream,
};
}