use std::fmt::Debug;
use std::time::{Duration, Instant};
use std::vec::Drain;
#[derive(Debug)]
pub enum PollResult {
Ready,
NotReady(Option<Duration>),
}
#[derive(Debug)]
pub struct BufBatch<I: Debug> {
items: Vec<I>,
first_item: Option<Instant>,
max_size: usize,
max_duration: Duration,
}
impl<I: Debug> BufBatch<I> {
pub fn new(max_size: usize, max_duration: Duration) -> BufBatch<I> {
assert!(max_size > 0, "BufBatch::new bad max_size");
BufBatch {
items: Vec::new(),
first_item: None,
max_size,
max_duration,
}
}
pub fn poll(&self) -> PollResult {
debug_assert!(self.items.is_empty() ^ self.first_item.is_some());
if self.items.len() >= self.max_size {
return PollResult::Ready;
}
if let Some(first_item) = self.first_item {
let since_start = Instant::now().duration_since(first_item);
if since_start >= self.max_duration {
return PollResult::Ready;
}
return PollResult::NotReady(Some(self.max_duration - since_start));
}
PollResult::NotReady(None)
}
pub fn append(&mut self, item: I) -> &I {
debug_assert!(self.items.is_empty() ^ self.first_item.is_some());
assert!(
self.items.len() < self.max_size,
"BufBatch::append on full batch"
);
self.first_item.get_or_insert_with(|| Instant::now());
self.items.push(item);
self.items.last().unwrap()
}
pub fn clear(&mut self) {
self.first_item = None;
self.items.clear();
}
pub fn drain(&mut self) -> Drain<I> {
self.first_item = None;
self.items.drain(0..)
}
pub fn pop(&mut self) -> Option<I> {
if self.items.len() == 1 {
self.first_item = None;
}
self.items.pop()
}
pub fn remove(&mut self, index: usize) -> I {
if self.items.len() == 1 {
self.first_item = None;
}
self.items.remove(index)
}
pub fn into_vec(self) -> Vec<I> {
self.items
}
pub fn as_slice(&self) -> &[I] {
self.items.as_slice()
}
}
#[cfg(test)]
mod tests {
pub use super::*;
use assert_matches::assert_matches;
use std::time::Duration;
#[test]
fn test_batch_poll() {
let mut batch = BufBatch::new(4, Duration::from_secs(10));
assert_matches!(batch.poll(), PollResult::NotReady(None));
batch.append(1);
assert_matches!(batch.poll(), PollResult::NotReady(Some(_instant)));
batch.append(2);
batch.append(3);
batch.append(4);
assert_matches!(batch.poll(), PollResult::Ready =>
assert_eq!(batch.drain().collect::<Vec<_>>().as_slice(), [1, 2, 3, 4])
);
assert_matches!(batch.poll(), PollResult::NotReady(None));
}
#[test]
fn test_batch_max_size() {
let mut batch = BufBatch::new(4, Duration::from_secs(10));
batch.append(1);
batch.append(2);
batch.append(3);
batch.append(4);
assert_matches!(batch.poll(), PollResult::Ready =>
assert_eq!(batch.drain().collect::<Vec<_>>().as_slice(), [1, 2, 3, 4])
);
batch.append(5);
batch.append(6);
batch.append(7);
batch.append(8);
assert_matches!(batch.poll(), PollResult::Ready =>
assert_eq!(batch.drain().collect::<Vec<_>>().as_slice(), [5, 6, 7, 8])
);
}
#[test]
fn test_batch_max_duration() {
let mut batch = BufBatch::new(4, Duration::from_millis(100));
batch.append(1);
batch.append(2);
let ready_after = match batch.poll() {
PollResult::NotReady(Some(ready_after)) => ready_after,
_ => panic!("expected NotReady with instant"),
};
std::thread::sleep(ready_after);
assert_matches!(batch.poll(), PollResult::Ready =>
assert_eq!(batch.drain().collect::<Vec<_>>().as_slice(), [1, 2])
);
batch.append(3);
batch.append(4);
batch.append(5);
batch.append(6);
assert_matches!(batch.poll(), PollResult::Ready =>
assert_eq!(batch.drain().collect::<Vec<_>>().as_slice(), [3, 4, 5, 6])
);
}
#[test]
fn test_drain_stream() {
let mut batch = BufBatch::new(4, Duration::from_secs(10));
batch.append(1);
batch.append(2);
batch.append(3);
assert_eq!(batch.drain().collect::<Vec<_>>().as_slice(), [1, 2, 3]);
batch.append(1);
batch.append(2);
assert_eq!(batch.drain().collect::<Vec<_>>().as_slice(), [1, 2]);
batch.append(5);
batch.append(6);
batch.append(7);
batch.append(8);
assert_matches!(batch.poll(), PollResult::Ready =>
assert_eq!(batch.drain().collect::<Vec<_>>().as_slice(), [5, 6, 7, 8])
);
}
}