mod buffer;
mod operations;
use serde::{ Deserialize, Serialize };
use core::time::Duration;
use core::sync::atomic::{ AtomicU64, AtomicUsize, Ordering };
pub use buffer::BufferStrategy;
pub use operations::
{
ControllableStream,
ControllableStreamBuilder,
StreamingControlApi,
StreamControlStreamBuilder,
};
#[ cfg( all( feature = "websocket_streaming", feature = "streaming_control" ) ) ]
pub use operations::ControllableWebSocketStream;
/// State of a stream managed by the streaming-control module.
///
/// `to_u8` / `from_u8` map the variants to the codes 0 through 5 in declaration order.
#[ derive( Debug, Clone, PartialEq, Eq, Serialize, Deserialize ) ]
pub enum StreamState
{
/// The stream is running (code 0).
Running,
/// The stream is paused (code 1).
Paused,
/// The stream was cancelled (code 2).
Cancelled,
/// The stream completed (code 3).
Completed,
/// The stream timed out (code 4).
/// NOTE(review): which timeout leads to this state is not visible here — confirm.
TimedOut,
/// The stream failed (code 5); `from_u8` also returns this for any unknown code.
Error,
}
impl StreamState
{
  /// Returns the numeric code of this state.
  ///
  /// Codes follow declaration order: `Running` = 0, `Paused` = 1, `Cancelled` = 2,
  /// `Completed` = 3, `TimedOut` = 4, `Error` = 5.
  #[ inline ]
  pub( crate ) fn to_u8( &self ) -> u8
  {
    match *self
    {
      Self::Running => 0,
      Self::Paused => 1,
      Self::Cancelled => 2,
      Self::Completed => 3,
      Self::TimedOut => 4,
      Self::Error => 5,
    }
  }

  /// Decodes a numeric code as produced by `to_u8`.
  ///
  /// This conversion never fails: any value without an assigned state
  /// (6 and above) decodes to `Error`.
  #[ inline ]
  pub( crate ) fn from_u8( value : u8 ) -> Self
  {
    match value
    {
      0 => Self::Running,
      1 => Self::Paused,
      2 => Self::Cancelled,
      3 => Self::Completed,
      4 => Self::TimedOut,
      // 5 is the code of `Error` itself; every unassigned code falls back to `Error` too.
      _ => Self::Error,
    }
  }
}
/// Metrics level selectable through `StreamControlConfig::metrics_level`.
///
/// NOTE(review): what each level actually records is decided by code that is not
/// visible next to this declaration — confirm there before relying on the notes below.
#[ derive( Debug, Clone, PartialEq, Eq ) ]
pub enum MetricsLevel
{
/// Presumably disables metrics collection — TODO confirm.
None,
/// Level selected by `StreamControlConfig::default()`.
Basic,
/// Presumably the most verbose level — TODO confirm.
Detailed,
}
/// Configuration for stream control.
///
/// Obtain a validated value through `StreamControlConfig::builder()` or use
/// `StreamControlConfig::default()`. All fields are public, so a value assembled
/// by hand bypasses the checks performed by `StreamControlConfigBuilder::build`.
#[ derive( Debug, Clone ) ]
pub struct StreamControlConfig
{
/// Total buffer size; defaults to `1024 * 1024`. The builder rejects 0 and rejects a
/// `BufferStrategy::Chunked` whose `chunk_size` is larger than this value.
/// Unit is presumably bytes — TODO confirm.
pub buffer_size : usize,
/// Pause timeout; defaults to 300 seconds. The builder rejects a zero duration.
/// Presumably the longest a stream may stay paused — confirm against the stream code.
pub pause_timeout : Duration,
/// Defaults to `true`. NOTE(review): what gets cleaned up, and when, is not visible here.
pub auto_cleanup : bool,
/// Maximum number of buffered chunks; defaults to 100. The builder rejects 0.
pub max_buffered_chunks : usize,
/// Timeout for a control operation; defaults to 100 milliseconds. The builder rejects a zero duration.
pub control_operation_timeout : Duration,
/// Buffering strategy; defaults to `BufferStrategy::Circular`.
pub buffer_strategy : BufferStrategy,
/// Metrics level; defaults to `MetricsLevel::Basic`.
pub metrics_level : MetricsLevel,
/// Defaults to `true`. Presumably switches timeout handling to an event-driven
/// mechanism instead of polling — TODO confirm.
pub event_driven_timeouts : bool,
}
impl Default for StreamControlConfig
{
  /// Returns the baseline configuration:
  ///
  /// - `buffer_size` : `1024 * 1024`
  /// - `pause_timeout` : 300 seconds
  /// - `auto_cleanup` : `true`
  /// - `max_buffered_chunks` : 100
  /// - `control_operation_timeout` : 100 milliseconds
  /// - `buffer_strategy` : `BufferStrategy::Circular`
  /// - `metrics_level` : `MetricsLevel::Basic`
  /// - `event_driven_timeouts` : `true`
  #[ inline ]
  fn default() -> Self
  {
    // 1 << 20 == 1024 * 1024.
    const BUFFER_SIZE : usize = 1 << 20;
    const PAUSE_TIMEOUT : Duration = Duration::from_secs( 5 * 60 );
    const CONTROL_OPERATION_TIMEOUT : Duration = Duration::from_millis( 100 );

    Self
    {
      buffer_size : BUFFER_SIZE,
      pause_timeout : PAUSE_TIMEOUT,
      auto_cleanup : true,
      max_buffered_chunks : 100,
      control_operation_timeout : CONTROL_OPERATION_TIMEOUT,
      buffer_strategy : BufferStrategy::Circular,
      metrics_level : MetricsLevel::Basic,
      event_driven_timeouts : true,
    }
  }
}
/// Builder for [`StreamControlConfig`].
///
/// `new` starts from `StreamControlConfig::default()`, each setter overrides one
/// field, and `build` validates the result before handing it out.
#[ derive( Debug, Clone ) ]
pub struct StreamControlConfigBuilder
{
// Configuration being assembled; returned by `build` once validation passes.
config : StreamControlConfig,
}
impl StreamControlConfigBuilder
{
#[ inline ]
#[ must_use ]
pub fn new() -> Self
{
Self {
config : StreamControlConfig::default(),
}
}
#[ inline ]
#[ must_use ]
pub fn buffer_size( mut self, size : usize ) -> Self
{
self.config.buffer_size = size;
self
}
#[ inline ]
#[ must_use ]
pub fn pause_timeout( mut self, timeout : Duration ) -> Self
{
self.config.pause_timeout = timeout;
self
}
#[ inline ]
#[ must_use ]
pub fn auto_cleanup( mut self, enable : bool ) -> Self
{
self.config.auto_cleanup = enable;
self
}
#[ inline ]
#[ must_use ]
pub fn max_buffered_chunks( mut self, count : usize ) -> Self
{
self.config.max_buffered_chunks = count;
self
}
#[ inline ]
#[ must_use ]
pub fn control_operation_timeout( mut self, timeout : Duration ) -> Self
{
self.config.control_operation_timeout = timeout;
self
}
#[ inline ]
#[ must_use ]
pub fn buffer_strategy( mut self, strategy : BufferStrategy ) -> Self
{
self.config.buffer_strategy = strategy;
self
}
#[ inline ]
#[ must_use ]
pub fn metrics_level( mut self, level : MetricsLevel ) -> Self
{
self.config.metrics_level = level;
self
}
#[ inline ]
#[ must_use ]
pub fn event_driven_timeouts( mut self, enable : bool ) -> Self
{
self.config.event_driven_timeouts = enable;
self
}
#[ inline ]
pub fn build( self ) -> Result< StreamControlConfig, crate::error::Error >
{
if self.config.buffer_size == 0
{
return Err( crate::error::Error::ConfigurationError(
"Buffer size must be greater than 0".to_string()
) );
}
if self.config.pause_timeout.is_zero()
{
return Err( crate::error::Error::ConfigurationError(
"Pause timeout must be greater than 0".to_string()
) );
}
if self.config.max_buffered_chunks == 0
{
return Err( crate::error::Error::ConfigurationError(
"Max buffered chunks must be greater than 0".to_string()
) );
}
if self.config.control_operation_timeout.is_zero()
{
return Err( crate::error::Error::ConfigurationError(
"Control operation timeout must be greater than 0".to_string()
) );
}
if let BufferStrategy::Chunked { chunk_size } = self.config.buffer_strategy
{
if chunk_size == 0
{
return Err( crate::error::Error::ConfigurationError(
"Chunked buffer strategy chunk size must be greater than 0".to_string()
) );
}
if chunk_size > self.config.buffer_size
{
return Err( crate::error::Error::ConfigurationError(
"Chunked buffer strategy chunk size cannot exceed total buffer size".to_string()
) );
}
}
Ok( self.config )
}
}
impl StreamControlConfig
{
/// Returns a [`StreamControlConfigBuilder`] created with `StreamControlConfigBuilder::new()`,
/// i.e. one that starts from the default configuration.
#[ inline ]
#[ must_use ]
pub fn builder() -> StreamControlConfigBuilder
{
StreamControlConfigBuilder::new()
}
}
/// Stream metrics stored in atomics, so they can be updated through a shared reference.
///
/// Every field starts at 0 (`new` / `Default`), and `snapshot` copies the current
/// values into a plain [`StreamMetricsSnapshot`]. The per-field notes below are
/// inferred from the field names; the code that updates them is not visible here.
#[ derive( Debug ) ]
pub struct StreamMetrics
{
/// Chunk counter (presumably chunks received so far — TODO confirm at the update site).
pub total_chunks : AtomicU64,
/// Presumably the current buffer occupancy — TODO confirm unit and update site.
pub buffer_size : AtomicUsize,
/// Presumably the number of bytes received so far — TODO confirm.
pub bytes_received : AtomicU64,
/// Presumably the number of pause operations — TODO confirm.
pub pause_count : AtomicU64,
/// Presumably the number of resume operations — TODO confirm.
pub resume_count : AtomicU64,
/// Presumably the number of state transitions — TODO confirm.
pub state_changes : AtomicU64,
/// Presumably the largest value `buffer_size` has reached — TODO confirm.
pub peak_buffer_size : AtomicUsize,
/// Presumably the average control-operation response time in microseconds
/// (per the `_us` suffix) — TODO confirm how the average is maintained.
pub avg_control_response_time_us : AtomicU64,
/// Presumably the number of control operations performed — TODO confirm.
pub control_operations : AtomicU64,
/// Presumably the number of buffer overflow events — TODO confirm.
pub buffer_overflows : AtomicU64,
/// Presumably the number of items sent downstream — TODO confirm.
pub items_sent : AtomicU64,
}
impl StreamMetrics
{
  /// Creates a metrics set with every field initialised to 0.
  ///
  /// Marked `#[ inline ]` and `#[ must_use ]` like the other constructors in
  /// this module; a freshly built value that is immediately dropped is
  /// almost certainly a mistake.
  #[ inline ]
  #[ must_use ]
  pub fn new() -> Self
  {
    Self
    {
      total_chunks : AtomicU64::new( 0 ),
      buffer_size : AtomicUsize::new( 0 ),
      bytes_received : AtomicU64::new( 0 ),
      pause_count : AtomicU64::new( 0 ),
      resume_count : AtomicU64::new( 0 ),
      state_changes : AtomicU64::new( 0 ),
      peak_buffer_size : AtomicUsize::new( 0 ),
      avg_control_response_time_us : AtomicU64::new( 0 ),
      control_operations : AtomicU64::new( 0 ),
      buffer_overflows : AtomicU64::new( 0 ),
      items_sent : AtomicU64::new( 0 ),
    }
  }
}
/// Plain-value copy of [`StreamMetrics`], as produced by `StreamMetrics::snapshot`.
///
/// Each field holds the value loaded from the atomic of the same name in
/// `StreamMetrics`; see that type for the (hedged) meaning of each field.
#[ derive( Debug, Clone ) ]
pub struct StreamMetricsSnapshot
{
/// Value of `StreamMetrics::total_chunks` when the snapshot was taken.
pub total_chunks : u64,
/// Value of `StreamMetrics::buffer_size` when the snapshot was taken.
pub buffer_size : usize,
/// Value of `StreamMetrics::bytes_received` when the snapshot was taken.
pub bytes_received : u64,
/// Value of `StreamMetrics::pause_count` when the snapshot was taken.
pub pause_count : u64,
/// Value of `StreamMetrics::resume_count` when the snapshot was taken.
pub resume_count : u64,
/// Value of `StreamMetrics::state_changes` when the snapshot was taken.
pub state_changes : u64,
/// Value of `StreamMetrics::peak_buffer_size` when the snapshot was taken.
pub peak_buffer_size : usize,
/// Value of `StreamMetrics::avg_control_response_time_us` when the snapshot was taken.
pub avg_control_response_time_us : u64,
/// Value of `StreamMetrics::control_operations` when the snapshot was taken.
pub control_operations : u64,
/// Value of `StreamMetrics::buffer_overflows` when the snapshot was taken.
pub buffer_overflows : u64,
/// Value of `StreamMetrics::items_sent` when the snapshot was taken.
pub items_sent : u64,
}
impl Default for StreamMetrics
{
  /// Equivalent to [`StreamMetrics::new`]: every field starts at 0.
  ///
  /// Delegating keeps a single initializer for the field list, so `new()`
  /// and `default()` cannot drift apart when a field is added.
  #[ inline ]
  fn default() -> Self
  {
    Self::new()
  }
}
impl StreamMetrics
{
  /// Copies the current value of every metric into a [`StreamMetricsSnapshot`].
  ///
  /// Each field is read with its own `Ordering::Relaxed` load, so the result
  /// is not an atomic view across fields: if other threads update the metrics
  /// while the snapshot is taken, individual values may come from slightly
  /// different moments.
  ///
  /// Marked `#[ must_use ]` because the call has no effect other than
  /// producing the snapshot.
  #[ inline ]
  #[ must_use ]
  pub fn snapshot( &self ) -> StreamMetricsSnapshot
  {
    // One ordering for all loads; Relaxed because no cross-field ordering is established here.
    const ORDER : Ordering = Ordering::Relaxed;

    StreamMetricsSnapshot
    {
      total_chunks : self.total_chunks.load( ORDER ),
      buffer_size : self.buffer_size.load( ORDER ),
      bytes_received : self.bytes_received.load( ORDER ),
      pause_count : self.pause_count.load( ORDER ),
      resume_count : self.resume_count.load( ORDER ),
      state_changes : self.state_changes.load( ORDER ),
      peak_buffer_size : self.peak_buffer_size.load( ORDER ),
      avg_control_response_time_us : self.avg_control_response_time_us.load( ORDER ),
      control_operations : self.control_operations.load( ORDER ),
      buffer_overflows : self.buffer_overflows.load( ORDER ),
      items_sent : self.items_sent.load( ORDER ),
    }
  }
}