tokio_interval_buffer/lib.rs
1//! Provides an `IntervalBuffer` that can be used to buffer a stream and emit the values at a regular interval.
2//!
3//! This is useful for when you receive streaming data but want to parse it in bulk.
4//!
5//! ```rust,no_run
6//! use futures::prelude::*;
7//! use irc::client::prelude::*;
8//! use tokio_interval_buffer::IntervalBuffer;
9//!
10//! #[tokio::main]
11//! async fn main() {
12//! let mut client = Client::from_config(Config {
13//! nickname: Some(String::from("...")),
14//! server: Some(String::from("...")),
15//! channels: vec![String::from("...")],
16//! ..Default::default()
17//! })
18//! .await
19//! .expect("Could not create an irc client");
20//!
21//! // Take the IRC stream and process all the messages every 10 seconds
22//! let mut buffered_receiver = IntervalBuffer::<_, _, failure::Error>::new(
23//! client.stream().unwrap().map_err(|e| e.into()),
24//! std::time::Duration::from_secs(10),
25//! );
26//!
27//! while let Some(item) = buffered_receiver.next().await {
28//! println!("Buffer: {:?}", item);
29//! }
30//! }
31//! ```
32
33use std::{
34 pin::Pin,
35 task::{Context, Poll},
36 time::Duration,
37};
38use tokio::time::Interval;
39
40/// This buffer takes a stream and an interval, and will emit a Vec<Stream::Item> every interval.
41///
42/// If no messages are send in that interval, the stream will not send anything.
43/// If the initial stream ends, this stream ends as well.
44///
45/// If either the stream or the interval timer emits an error, this stream will emit the same error.
46/// For the timer, `From<tokio::timer::Error>` has to be emitted for the error.
47/// In the future I want to be able to configure a `.map_err` function for this.
48pub struct IntervalBuffer<Stream, Item, Error, Container: Insertable<Item> = Vec<Item>>
49where
50 Stream: futures::Stream<Item = Result<Item, Error>>,
51 Error: From<tokio::time::error::Error>,
52{
53 stream: Stream,
54 timer: Interval,
55 buffer: Container,
56}
57
58/// A generic component used for the container for the IntervalBuffer.
59/// This can be implemented for any type, and is only implemented for Vec<T> by default.
60///
61/// For performance-specific implementations this should be replaced for whatever works for your situation.
62pub trait Insertable<T>: Sized {
63 /// Insert an item in the buffer
64 fn insert(&mut self, t: T);
65
66 /// Return the current content of the buffer, and clear itself.
67 ///
68 /// If the container is empty, you can return None.
69 ///
70 /// For Vec<T> this is implemented as `std::mem::replace(self, Vec::new())`
71 fn return_content_and_clear(&mut self) -> Option<Self>;
72}
73
74impl<T> Insertable<T> for Vec<T> {
75 fn insert(&mut self, t: T) {
76 self.push(t);
77 }
78
79 fn return_content_and_clear(&mut self) -> Option<Vec<T>> {
80 if self.is_empty() {
81 None
82 } else {
83 // Make sure to preserve the capacity so we have a decent estimate of how big the buffer should be.
84 // TODO: Maybe keep a history of the last 5 capacities and take the average of this? Currently this will stay the largest size the application has seen.
85 let new_vec = Vec::with_capacity(self.capacity());
86 Some(std::mem::replace(self, new_vec))
87 }
88 }
89}
90
91impl<Stream, Item, Error, Container> IntervalBuffer<Stream, Item, Error, Container>
92where
93 Stream: futures::Stream<Item = Result<Item, Error>>,
94 Error: From<tokio::time::error::Error>,
95 Container: Insertable<Item>,
96{
97 /// Create a new IntervalBuffer with a default container. This will simply call `new_with_container(.., .., Container::default())`. See that function for more informaiton.
98 pub fn new(stream: Stream, interval: Duration) -> Self
99 where
100 Container: Default,
101 {
102 Self::new_with_container(stream, interval, Container::default())
103 }
104
105 /// Create a new IntervalBuffer with a given stream, interval and container.
106 ///
107 /// The first time this stream will be able to send data is after `interval: Duration`. This will not emit immediately.
108 ///
109 /// If either the stream or the internal timer emits an error, this stream will emit an error.
110 ///
111 /// If the stream ends (by returning `Ok(Poll::Ready(None))`), this stream will immediately return. The internal timer will not be polled.
112 pub fn new_with_container(stream: Stream, interval: Duration, container: Container) -> Self {
113 let timer = tokio::time::interval(interval);
114 IntervalBuffer {
115 stream,
116 timer,
117 buffer: container,
118 }
119 }
120
121 /// Get a reference to the internal buffer
122 pub fn buffer(&self) -> &Container {
123 &self.buffer
124 }
125
126 /// Get a mutable reference to the internal buffer.
127 ///
128 /// This can be used to e.g. set the capacity for a Vec.
129 pub fn buffer_mut(&mut self) -> &mut Container {
130 &mut self.buffer
131 }
132}
133
134impl<Stream, Item, Error, Container> futures::Stream
135 for IntervalBuffer<Stream, Item, Error, Container>
136where
137 Stream: futures::Stream<Item = Result<Item, Error>>,
138 Error: From<tokio::time::error::Error>,
139 Container: Insertable<Item>,
140{
141 type Item = Result<Container, Error>;
142
143 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
144 let inner = unsafe { Pin::get_unchecked_mut(self) };
145 let mut stream = unsafe { Pin::new_unchecked(&mut inner.stream) };
146
147 loop {
148 match stream.as_mut().poll_next(cx) {
149 Poll::Pending => break,
150 Poll::Ready(Some(Ok(v))) => inner.buffer.insert(v),
151 Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
152 Poll::Ready(None) => return Poll::Ready(None),
153 }
154 }
155
156 let mut timer = unsafe { Pin::new_unchecked(&mut inner.timer) };
157 match timer.poll_tick(cx) {
158 Poll::Pending => Poll::Pending,
159 Poll::Ready(_) => {
160 let result = inner.buffer.return_content_and_clear();
161 if let Some(container) = result {
162 Poll::Ready(Some(Ok(container)))
163 } else {
164 Poll::Pending
165 }
166 }
167 }
168 }
169}