#[ cfg( all( feature = "enabled", feature = "streaming" ) ) ]
mod private
{
use futures_core::Stream;
use std::pin::Pin;
use std::task::{ Context, Poll };
use std::time::{ Duration, Instant };
use tokio::time::Sleep;
#[ derive( Debug, Clone ) ]
pub struct BufferConfig
{
pub max_buffer_size : usize,
pub max_buffer_time : Duration,
pub flush_on_newline : bool,
}
impl BufferConfig
{
#[ must_use ]
pub fn new() -> Self
{
Self::default()
}
#[ must_use ]
pub fn with_max_buffer_size( mut self, size : usize ) -> Self
{
self.max_buffer_size = size;
self
}
#[ must_use ]
pub fn with_max_buffer_time( mut self, duration : Duration ) -> Self
{
self.max_buffer_time = duration;
self
}
#[ must_use ]
pub fn with_flush_on_newline( mut self, enabled : bool ) -> Self
{
self.flush_on_newline = enabled;
self
}
}
impl Default for BufferConfig
{
fn default() -> Self
{
Self
{
max_buffer_size : 64,
max_buffer_time : Duration::from_millis( 100 ),
flush_on_newline : true,
}
}
}
#[ derive( Debug ) ]
pub struct BufferedStream< S >
{
inner : S,
buffer : String,
config : BufferConfig,
last_flush : Instant,
_flush_timer : Option< Pin< Box< Sleep > > >,
}
impl< S > BufferedStream< S >
where
S : Stream< Item = String > + Unpin,
{
#[ must_use ]
pub fn new( stream : S, config : BufferConfig ) -> Self
{
Self
{
inner : stream,
buffer : String::new(),
config,
last_flush : Instant::now(),
_flush_timer : None,
}
}
fn should_flush( &self ) -> bool
{
if self.buffer.len() >= self.config.max_buffer_size
{
return true;
}
if self.last_flush.elapsed() >= self.config.max_buffer_time
{
return true;
}
if self.config.flush_on_newline && self.buffer.contains( '\n' )
{
return true;
}
false
}
fn flush( &mut self ) -> Option< String >
{
if self.buffer.is_empty()
{
None
}
else
{
let content = self.buffer.clone();
self.buffer.clear();
self.last_flush = Instant::now();
Some( content )
}
}
}
impl< S > Stream for BufferedStream< S >
where
S : Stream< Item = String > + Unpin,
{
type Item = String;
fn poll_next( mut self : Pin< &mut Self >, cx : &mut Context< '_ > ) -> Poll< Option< Self::Item > >
{
match Pin::new( &mut self.inner ).poll_next( cx )
{
Poll::Ready( Some( item ) ) =>
{
self.buffer.push_str( &item );
if self.should_flush()
{
Poll::Ready( self.flush() )
}
else
{
cx.waker().wake_by_ref();
Poll::Pending
}
},
Poll::Ready( None ) =>
{
Poll::Ready( self.flush() )
},
Poll::Pending =>
{
if !self.buffer.is_empty() && self.should_flush()
{
Poll::Ready( self.flush() )
}
else
{
Poll::Pending
}
}
}
}
}
pub trait StreamBufferExt : Stream< Item = String > + Sized + Unpin
{
fn with_buffer( self, config : BufferConfig ) -> BufferedStream< Self >
{
BufferedStream::new( self, config )
}
fn with_buffer_default( self ) -> BufferedStream< Self >
{
BufferedStream::new( self, BufferConfig::default() )
}
}
impl< T > StreamBufferExt for T
where
T : Stream< Item = String > + Unpin,
{
}
#[ cfg( test ) ]
mod tests
{
use super::*;
use futures_util::{ stream, StreamExt };
#[ tokio::test ]
async fn test_buffer_config_creation()
{
let config = BufferConfig::new();
assert_eq!( config.max_buffer_size, 64 );
assert_eq!( config.max_buffer_time, Duration::from_millis( 100 ) );
assert!( config.flush_on_newline );
}
#[ tokio::test ]
async fn test_buffer_config_builder()
{
let config = BufferConfig::new()
.with_max_buffer_size( 128 )
.with_max_buffer_time( Duration::from_millis( 200 ) )
.with_flush_on_newline( false );
assert_eq!( config.max_buffer_size, 128 );
assert_eq!( config.max_buffer_time, Duration::from_millis( 200 ) );
assert!( !config.flush_on_newline );
}
#[ tokio::test ]
async fn test_buffered_stream_basic()
{
let items = vec![ "a".to_string(), "b".to_string(), "c".to_string() ];
let stream = stream::iter( items );
let config = BufferConfig::new()
.with_max_buffer_size( 10 )
.with_flush_on_newline( false );
let mut buffered = stream.with_buffer( config );
let result = buffered.next().await;
assert!( result.is_some() );
}
#[ tokio::test ]
async fn test_buffered_stream_size_threshold()
{
let items = vec![ "x".to_string(); 100 ]; let stream = stream::iter( items );
let config = BufferConfig::new()
.with_max_buffer_size( 10 )
.with_flush_on_newline( false );
let mut buffered = stream.with_buffer( config );
let mut chunks = Vec::new();
while let Some( chunk ) = buffered.next().await
{
chunks.push( chunk );
}
assert!( chunks.len() > 1 );
}
#[ tokio::test ]
async fn test_buffered_stream_flush_on_newline()
{
let items = vec![ "hello\n".to_string(), "world".to_string() ];
let stream = stream::iter( items );
let config = BufferConfig::new()
.with_max_buffer_size( 100 )
.with_flush_on_newline( true );
let mut buffered = stream.with_buffer( config );
let first = buffered.next().await;
assert!( first.is_some() );
assert!( first.unwrap().contains( '\n' ) );
}
}
}
#[ cfg( all( feature = "enabled", feature = "streaming" ) ) ]
crate::mod_interface!
{
exposed use
{
BufferConfig,
BufferedStream,
StreamBufferExt,
};
}