tokio_interval_buffer/
lib.rs

1//! Provides an `IntervalBuffer` that can be used to buffer a stream and emit the values at a regular interval.
2//!
3//! This is useful for when you receive streaming data but want to parse it in bulk.
4//!
5//! ```rust,no_run
6//! use futures::prelude::*;
7//! use irc::client::prelude::*;
8//! use tokio_interval_buffer::IntervalBuffer;
9//!
10//! #[tokio::main]
11//! async fn main() {
12//!     let mut client = Client::from_config(Config {
13//!         nickname: Some(String::from("...")),
14//!         server: Some(String::from("...")),
15//!         channels: vec![String::from("...")],
16//!         ..Default::default()
17//!     })
18//!     .await
19//!     .expect("Could not create an irc client");
20//!
21//!     // Take the IRC stream and process all the messages every 10 seconds
22//!     let mut buffered_receiver = IntervalBuffer::<_, _, failure::Error>::new(
23//!         client.stream().unwrap().map_err(|e| e.into()),
24//!         std::time::Duration::from_secs(10),
25//!     );
26//!
27//!     while let Some(item) = buffered_receiver.next().await {
28//!         println!("Buffer: {:?}", item);
29//!     }
30//! }
31//! ```
32
33use std::{
34    pin::Pin,
35    task::{Context, Poll},
36    time::Duration,
37};
38use tokio::time::Interval;
39
40/// This buffer takes a stream and an interval, and will emit a Vec<Stream::Item> every interval.
41///
42/// If no messages are send in that interval, the stream will not send anything.
43/// If the initial stream ends, this stream ends as well.
44///
45/// If either the stream or the interval timer emits an error, this stream will emit the same error.
46/// For the timer, `From<tokio::timer::Error>` has to be emitted for the error.
47/// In the future I want to be able to configure a `.map_err` function for this.
48pub struct IntervalBuffer<Stream, Item, Error, Container: Insertable<Item> = Vec<Item>>
49where
50    Stream: futures::Stream<Item = Result<Item, Error>>,
51    Error: From<tokio::time::error::Error>,
52{
53    stream: Stream,
54    timer: Interval,
55    buffer: Container,
56}
57
58/// A generic component used for the container for the IntervalBuffer.
59/// This can be implemented for any type, and is only implemented for Vec<T> by default.
60///
61/// For performance-specific implementations this should be replaced for whatever works for your situation.
62pub trait Insertable<T>: Sized {
63    /// Insert an item in the buffer
64    fn insert(&mut self, t: T);
65
66    /// Return the current content of the buffer, and clear itself.
67    ///
68    /// If the container is empty, you can return None.
69    ///
70    /// For Vec<T> this is implemented as `std::mem::replace(self, Vec::new())`
71    fn return_content_and_clear(&mut self) -> Option<Self>;
72}
73
74impl<T> Insertable<T> for Vec<T> {
75    fn insert(&mut self, t: T) {
76        self.push(t);
77    }
78
79    fn return_content_and_clear(&mut self) -> Option<Vec<T>> {
80        if self.is_empty() {
81            None
82        } else {
83            // Make sure to preserve the capacity so we have a decent estimate of how big the buffer should be.
84            // TODO: Maybe keep a history of the last 5 capacities and take the average of this? Currently this will stay the largest size the application has seen.
85            let new_vec = Vec::with_capacity(self.capacity());
86            Some(std::mem::replace(self, new_vec))
87        }
88    }
89}
90
91impl<Stream, Item, Error, Container> IntervalBuffer<Stream, Item, Error, Container>
92where
93    Stream: futures::Stream<Item = Result<Item, Error>>,
94    Error: From<tokio::time::error::Error>,
95    Container: Insertable<Item>,
96{
97    /// Create a new IntervalBuffer with a default container. This will simply call `new_with_container(.., .., Container::default())`. See that function for more informaiton.
98    pub fn new(stream: Stream, interval: Duration) -> Self
99    where
100        Container: Default,
101    {
102        Self::new_with_container(stream, interval, Container::default())
103    }
104
105    /// Create a new IntervalBuffer with a given stream, interval and container.
106    ///
107    /// The first time this stream will be able to send data is after `interval: Duration`. This will not emit immediately.
108    ///
109    /// If either the stream or the internal timer emits an error, this stream will emit an error.
110    ///
111    /// If the stream ends (by returning `Ok(Poll::Ready(None))`), this stream will immediately return. The internal timer will not be polled.
112    pub fn new_with_container(stream: Stream, interval: Duration, container: Container) -> Self {
113        let timer = tokio::time::interval(interval);
114        IntervalBuffer {
115            stream,
116            timer,
117            buffer: container,
118        }
119    }
120
121    /// Get a reference to the internal buffer
122    pub fn buffer(&self) -> &Container {
123        &self.buffer
124    }
125
126    /// Get a mutable reference to the internal buffer.
127    ///
128    /// This can be used to e.g. set the capacity for a Vec.
129    pub fn buffer_mut(&mut self) -> &mut Container {
130        &mut self.buffer
131    }
132}
133
134impl<Stream, Item, Error, Container> futures::Stream
135    for IntervalBuffer<Stream, Item, Error, Container>
136where
137    Stream: futures::Stream<Item = Result<Item, Error>>,
138    Error: From<tokio::time::error::Error>,
139    Container: Insertable<Item>,
140{
141    type Item = Result<Container, Error>;
142
143    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
144        let inner = unsafe { Pin::get_unchecked_mut(self) };
145        let mut stream = unsafe { Pin::new_unchecked(&mut inner.stream) };
146
147        loop {
148            match stream.as_mut().poll_next(cx) {
149                Poll::Pending => break,
150                Poll::Ready(Some(Ok(v))) => inner.buffer.insert(v),
151                Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
152                Poll::Ready(None) => return Poll::Ready(None),
153            }
154        }
155
156        let mut timer = unsafe { Pin::new_unchecked(&mut inner.timer) };
157        match timer.poll_tick(cx) {
158            Poll::Pending => Poll::Pending,
159            Poll::Ready(_) => {
160                let result = inner.buffer.return_content_and_clear();
161                if let Some(container) = result {
162                    Poll::Ready(Some(Ok(container)))
163                } else {
164                    Poll::Pending
165                }
166            }
167        }
168    }
169}