1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
//! Provides an `IntervalBuffer` that can be used to buffer a stream and emit the values at a regular interval.
//!
//! This is useful for when you receive streaming data but want to parse it in bulk.
//!
//! ```rust,no_run
//! use futures::prelude::*;
//! use irc::client::prelude::*;
//! use tokio_interval_buffer::IntervalBuffer;
//!
//! #[tokio::main]
//! async fn main() {
//!     let mut client = Client::from_config(Config {
//!         nickname: Some(String::from("...")),
//!         server: Some(String::from("...")),
//!         channels: vec![String::from("...")],
//!         ..Default::default()
//!     })
//!     .await
//!     .expect("Could not create an irc client");
//!
//!     // Take the IRC stream and process all the messages every 10 seconds
//!     let mut buffered_receiver = IntervalBuffer::<_, _, failure::Error>::new(
//!         client.stream().unwrap().map_err(|e| e.into()),
//!         std::time::Duration::from_secs(10),
//!     );
//!
//!     while let Some(item) = buffered_receiver.next().await {
//!         println!("Buffer: {:?}", item);
//!     }
//! }
//! ```

use std::{
    pin::Pin,
    task::{Context, Poll},
    time::Duration,
};
use tokio::time::Interval;

/// This buffer takes a stream and an interval, and will emit a Vec<Stream::Item> every interval.
///
/// If no messages are send in that interval, the stream will not send anything.
/// If the initial stream ends, this stream ends as well.
///
/// If either the stream or the interval timer emits an error, this stream will emit the same error.
/// For the timer, `From<tokio::timer::Error>` has to be emitted for the error.
/// In the future I want to be able to configure a `.map_err` function for this.
pub struct IntervalBuffer<Stream, Item, Error, Container: Insertable<Item> = Vec<Item>>
where
    Stream: futures::Stream<Item = Result<Item, Error>>,
    Error: From<tokio::time::error::Error>,
{
    stream: Stream,
    timer: Interval,
    buffer: Container,
}

/// A generic component used for the container for the IntervalBuffer.
/// This can be implemented for any type, and is only implemented for Vec<T> by default.
///
/// For performance-specific implementations this should be replaced for whatever works for your situation.
pub trait Insertable<T>: Sized {
    /// Insert an item in the buffer
    fn insert(&mut self, t: T);

    /// Return the current content of the buffer, and clear itself.
    ///
    /// If the container is empty, you can return None.
    ///
    /// For Vec<T> this is implemented as `std::mem::replace(self, Vec::new())`
    fn return_content_and_clear(&mut self) -> Option<Self>;
}

impl<T> Insertable<T> for Vec<T> {
    fn insert(&mut self, t: T) {
        self.push(t);
    }

    fn return_content_and_clear(&mut self) -> Option<Vec<T>> {
        if self.is_empty() {
            None
        } else {
            // Make sure to preserve the capacity so we have a decent estimate of how big the buffer should be.
            // TODO: Maybe keep a history of the last 5 capacities and take the average of this? Currently this will stay the largest size the application has seen.
            let new_vec = Vec::with_capacity(self.capacity());
            Some(std::mem::replace(self, new_vec))
        }
    }
}

impl<Stream, Item, Error, Container> IntervalBuffer<Stream, Item, Error, Container>
where
    Stream: futures::Stream<Item = Result<Item, Error>>,
    Error: From<tokio::time::error::Error>,
    Container: Insertable<Item>,
{
    /// Create a new IntervalBuffer with a default container. This will simply call `new_with_container(.., .., Container::default())`. See that function for more informaiton.
    pub fn new(stream: Stream, interval: Duration) -> Self
    where
        Container: Default,
    {
        Self::new_with_container(stream, interval, Container::default())
    }

    /// Create a new IntervalBuffer with a given stream, interval and container.
    ///
    /// The first time this stream will be able to send data is after `interval: Duration`. This will not emit immediately.
    ///
    /// If either the stream or the internal timer emits an error, this stream will emit an error.
    ///
    /// If the stream ends (by returning `Ok(Poll::Ready(None))`), this stream will immediately return. The internal timer will not be polled.
    pub fn new_with_container(stream: Stream, interval: Duration, container: Container) -> Self {
        let timer = tokio::time::interval(interval);
        IntervalBuffer {
            stream,
            timer,
            buffer: container,
        }
    }

    /// Get a reference to the internal buffer
    pub fn buffer(&self) -> &Container {
        &self.buffer
    }

    /// Get a mutable reference to the internal buffer.
    ///
    /// This can be used to e.g. set the capacity for a Vec.
    pub fn buffer_mut(&mut self) -> &mut Container {
        &mut self.buffer
    }
}

impl<Stream, Item, Error, Container> futures::Stream
    for IntervalBuffer<Stream, Item, Error, Container>
where
    Stream: futures::Stream<Item = Result<Item, Error>>,
    Error: From<tokio::time::error::Error>,
    Container: Insertable<Item>,
{
    type Item = Result<Container, Error>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let inner = unsafe { Pin::get_unchecked_mut(self) };
        let mut stream = unsafe { Pin::new_unchecked(&mut inner.stream) };

        loop {
            match stream.as_mut().poll_next(cx) {
                Poll::Pending => break,
                Poll::Ready(Some(Ok(v))) => inner.buffer.insert(v),
                Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
                Poll::Ready(None) => return Poll::Ready(None),
            }
        }

        let mut timer = unsafe { Pin::new_unchecked(&mut inner.timer) };
        match timer.poll_tick(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(_) => {
                let result = inner.buffer.return_content_and_clear();
                if let Some(container) = result {
                    Poll::Ready(Some(Ok(container)))
                } else {
                    Poll::Pending
                }
            }
        }
    }
}