#[ cfg( feature = "streaming" ) ]
mod private
{
use futures_core::Stream;
use std::pin::Pin;
use std::task::{ Context, Poll };
use std::time::{ Duration, Instant };
/// Tuning knobs that decide when a `BufferedStream` emits its accumulated text.
///
/// Build one with [`BufferConfig::new`] (or `Default`) and adjust it with the
/// chainable `with_*` methods.
#[ derive( Debug, Clone ) ]
pub struct BufferConfig
{
  /// Buffer length, in bytes, at or above which the buffer is flushed.
  pub max_buffer_size : usize,
  /// Time since the previous flush at or above which the buffer is flushed.
  pub max_buffer_time : Duration,
  /// When `true`, the buffer is flushed as soon as it contains a `'\n'`.
  pub flush_on_newline : bool,
}

impl Default for BufferConfig
{
  /// Defaults: 64 bytes, 100 milliseconds, flush on newline enabled.
  fn default() -> Self
  {
    let max_buffer_size = 64;
    let max_buffer_time = Duration::from_millis( 100 );
    let flush_on_newline = true;
    Self { max_buffer_size, max_buffer_time, flush_on_newline }
  }
}

impl BufferConfig
{
  /// Creates a configuration holding the default values (same as `Default::default()`).
  #[ must_use ]
  pub fn new() -> Self
  {
    < Self as Default >::default()
  }

  /// Returns the configuration with `max_buffer_size` replaced by `size`.
  #[ must_use ]
  pub fn with_max_buffer_size( self, size : usize ) -> Self
  {
    Self { max_buffer_size : size, ..self }
  }

  /// Returns the configuration with `max_buffer_time` replaced by `duration`.
  #[ must_use ]
  pub fn with_max_buffer_time( self, duration : Duration ) -> Self
  {
    Self { max_buffer_time : duration, ..self }
  }

  /// Returns the configuration with `flush_on_newline` replaced by `enabled`.
  #[ must_use ]
  pub fn with_flush_on_newline( self, enabled : bool ) -> Self
  {
    Self { flush_on_newline : enabled, ..self }
  }
}
/// Stream adapter that coalesces the `String` items of an inner stream into larger chunks.
///
/// Every item produced by the inner stream is appended to an internal buffer. The buffer is
/// emitted as one item when, right after an append or when the inner stream is pending, any
/// of these holds for a non-empty buffer:
///
/// * its length in bytes is at least `config.max_buffer_size`;
/// * at least `config.max_buffer_time` has passed since the previous flush (or since
///   construction, if nothing has been flushed yet);
/// * `config.flush_on_newline` is set and the buffer contains `'\n'`.
///
/// Whatever is still buffered when the inner stream ends is emitted before the final `None`.
///
/// No timer is armed: the time limit is only evaluated while the adapter is being polled, so
/// buffered text can wait until the inner stream wakes the task again.
#[ derive( Debug ) ]
pub struct BufferedStream< S >
{
  /// Wrapped source of text fragments.
  inner : S,
  /// Text accumulated since the last flush.
  buffer : String,
  /// Flush thresholds.
  config : BufferConfig,
  /// Moment of the last flush (initially the moment of construction).
  last_flush : Instant,
  /// Set once `inner` has yielded `None`, so that it is never polled again.
  inner_done : bool,
}

impl< S > BufferedStream< S >
where
  S : Stream< Item = String > + Unpin,
{
  /// Wraps `stream`, buffering its items according to `config`.
  #[ must_use ]
  pub fn new( stream : S, config : BufferConfig ) -> Self
  {
    Self
    {
      inner : stream,
      buffer : String::new(),
      config,
      last_flush : Instant::now(),
      inner_done : false,
    }
  }

  /// Tells whether the buffer holds text that should be emitted now.
  ///
  /// An empty buffer never qualifies: flushing it would yield `None`, which a consumer
  /// would read as the end of the stream.
  fn should_flush( &self ) -> bool
  {
    if self.buffer.is_empty()
    {
      return false;
    }
    self.buffer.len() >= self.config.max_buffer_size
    || self.last_flush.elapsed() >= self.config.max_buffer_time
    || ( self.config.flush_on_newline && self.buffer.contains( '\n' ) )
  }

  /// Takes the buffered text out, leaving the buffer empty and restarting the flush timer.
  ///
  /// Returns `None` (and leaves the timer untouched) when nothing is buffered.
  fn flush( &mut self ) -> Option< String >
  {
    if self.buffer.is_empty()
    {
      return None;
    }
    self.last_flush = Instant::now();
    // Move the text out instead of cloning it and clearing the original.
    Some( std::mem::take( &mut self.buffer ) )
  }
}

impl< S > Stream for BufferedStream< S >
where
  S : Stream< Item = String > + Unpin,
{
  type Item = String;

  /// Drains the inner stream until a flush condition is met, it is pending, or it ends.
  ///
  /// * `Ready( Some( chunk ) )` - a flush condition was met, or the inner stream ended with
  ///   text still buffered.
  /// * `Ready( None )` - the inner stream has ended and nothing is left to emit.
  /// * `Pending` - the inner stream is pending and no flush condition holds.
  fn poll_next( self : Pin< &mut Self >, cx : &mut Context< '_ > ) -> Poll< Option< Self::Item > >
  {
    // `S : Unpin` makes the whole adapter `Unpin`, so plain mutable access is sound.
    let this = self.get_mut();

    // A finished stream must not be polled again; only the (already drained) buffer remains.
    if this.inner_done
    {
      return Poll::Ready( this.flush() );
    }

    // Keep pulling items that are ready right now instead of waking ourselves and
    // bouncing through the executor once per buffered item.
    loop
    {
      match Pin::new( &mut this.inner ).poll_next( cx )
      {
        Poll::Ready( Some( item ) ) =>
        {
          this.buffer.push_str( &item );
          if this.should_flush()
          {
            return Poll::Ready( this.flush() );
          }
        },
        Poll::Ready( None ) =>
        {
          this.inner_done = true;
          // Emits the remainder if there is one, otherwise reports the end right away.
          return Poll::Ready( this.flush() );
        },
        Poll::Pending =>
        {
          // The inner stream has registered the waker; the time limit may still be due.
          return if this.should_flush()
          {
            Poll::Ready( this.flush() )
          }
          else
          {
            Poll::Pending
          };
        },
      }
    }
  }
}
/// Extension methods that wrap a `String` stream in a [`BufferedStream`].
///
/// Implemented for every `Stream< Item = String > + Unpin` by the blanket impl below,
/// so the methods are available on any such stream once the trait is in scope.
pub trait StreamBufferExt : Stream< Item = String > + Sized + Unpin
{
  /// Consumes the stream and returns it wrapped in a [`BufferedStream`] driven by `config`.
  fn with_buffer( self, config : BufferConfig ) -> BufferedStream< Self >
  {
    BufferedStream::new( self, config )
  }

  /// Same as [`StreamBufferExt::with_buffer`], using `BufferConfig::default()`.
  fn with_buffer_default( self ) -> BufferedStream< Self >
  {
    let config = BufferConfig::default();
    self.with_buffer( config )
  }
}

// Blanket implementation: the provided methods above are all that is needed.
impl< T > StreamBufferExt for T
where
  T : Stream< Item = String > + Unpin,
{}
}
// Public surface of this module, compiled only when the `streaming` feature is enabled.
// NOTE(review): `mod_interface!` is defined elsewhere in the crate; presumably `exposed use`
// re-exports the listed items from `private` - confirm against the macro's documentation.
#[ cfg( feature = "streaming" ) ]
crate::mod_interface!
{
exposed use
{
BufferConfig,
BufferedStream,
StreamBufferExt,
};
}