mod private
{
use core::time::Duration;
use std::time::Instant;
use core::pin::Pin;
use core::task::{ Context, Poll };
use futures_core::Stream;
use tokio::time::Sleep;
#[ derive( Debug, Clone ) ]
pub struct BufferConfig
{
pub max_buffer_size : usize,
pub max_buffer_time : Duration,
pub flush_on_newline : bool,
}
impl Default for BufferConfig
{
#[ inline ]
fn default() -> Self
{
Self
{
max_buffer_size : 100,
max_buffer_time : Duration::from_millis( 50 ),
flush_on_newline : true,
}
}
}
impl BufferConfig
{
#[ must_use ]
#[ inline ]
pub fn new() -> Self
{
Self::default()
}
#[ must_use ]
#[ inline ]
pub fn with_max_size( mut self, size : usize ) -> Self
{
self.max_buffer_size = size;
self
}
#[ must_use ]
#[ inline ]
pub fn with_max_time( mut self, time : Duration ) -> Self
{
self.max_buffer_time = time;
self
}
#[ must_use ]
#[ inline ]
pub fn with_flush_on_newline( mut self, enabled : bool ) -> Self
{
self.flush_on_newline = enabled;
self
}
}
#[ derive( Debug ) ]
pub struct BufferedStream< S >
{
inner : S,
buffer : String,
config : BufferConfig,
last_flush : Instant,
_flush_timer : Option< Pin< Box< Sleep > > >,
}
impl< S > BufferedStream< S >
where
S : Stream< Item = String > + Unpin,
{
#[ inline ]
pub fn new( stream : S, config : BufferConfig ) -> Self
{
Self
{
inner : stream,
buffer : String::new(),
config,
last_flush : Instant::now(),
_flush_timer : None,
}
}
#[ inline ]
fn should_flush( &self ) -> bool
{
if self.buffer.is_empty()
{
return false;
}
if self.buffer.len() >= self.config.max_buffer_size
{
return true;
}
if self.last_flush.elapsed() >= self.config.max_buffer_time
{
return true;
}
if self.config.flush_on_newline && self.buffer.contains( '\n' )
{
return true;
}
false
}
#[ inline ]
fn flush( &mut self ) -> Option< String >
{
if self.buffer.is_empty()
{
return None;
}
let result = self.buffer.clone();
self.buffer.clear();
self.last_flush = Instant::now();
Some( result )
}
}
impl< S > Stream for BufferedStream< S >
where
S : Stream< Item = String > + Unpin,
{
type Item = String;
#[ inline ]
fn poll_next( mut self : Pin< &mut Self >, cx : &mut Context< '_ > ) -> Poll< Option< Self::Item > >
{
loop
{
match Pin::new( &mut self.inner ).poll_next( cx )
{
Poll::Ready( Some( item ) ) =>
{
self.buffer.push_str( &item );
if self.should_flush()
{
if let Some( flushed ) = self.flush()
{
return Poll::Ready( Some( flushed ) );
}
}
},
Poll::Ready( None ) =>
{
if let Some( flushed ) = self.flush()
{
return Poll::Ready( Some( flushed ) );
}
return Poll::Ready( None );
},
Poll::Pending =>
{
if self.should_flush()
{
if let Some( flushed ) = self.flush()
{
return Poll::Ready( Some( flushed ) );
}
}
return Poll::Pending;
},
}
}
}
}
pub trait StreamBufferExt : Stream< Item = String > + Sized + Unpin
{
#[ inline ]
fn with_buffer( self, config : BufferConfig ) -> BufferedStream< Self >
{
BufferedStream::new( self, config )
}
#[ inline ]
fn with_buffer_default( self ) -> BufferedStream< Self >
{
BufferedStream::new( self, BufferConfig::default() )
}
}
impl< S > StreamBufferExt for S where S : Stream< Item = String > + Unpin {}
#[ cfg( test ) ]
mod tests
{
use super::*;
#[ test ]
fn test_buffer_config_creation()
{
let config = BufferConfig::new();
assert_eq!( config.max_buffer_size, 100 );
assert_eq!( config.max_buffer_time, Duration::from_millis( 50 ) );
assert!( config.flush_on_newline );
}
#[ test ]
fn test_buffer_config_builder()
{
let config = BufferConfig::new()
.with_max_size( 200 )
.with_max_time( Duration::from_millis( 100 ) )
.with_flush_on_newline( false );
assert_eq!( config.max_buffer_size, 200 );
assert_eq!( config.max_buffer_time, Duration::from_millis( 100 ) );
assert!( !config.flush_on_newline );
}
}
}
crate::mod_interface!
{
exposed use
{
BufferConfig,
BufferedStream,
StreamBufferExt,
};
}