#![warn(missing_docs)]
#![forbid(unsafe_code)]
#[doc = include_str!("../NEWS.md")]
pub mod _changelog {}
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;
pub struct Readahead<T: Send + 'static> {
receiver: Option<Receiver<Option<T>>>,
}
impl<T> Readahead<T>
where
T: Send + 'static,
{
pub fn new<I>(inner: I, buffer_size: usize) -> Self
where
I: Iterator<Item = T> + Send + 'static,
{
let (sender, receiver) = sync_channel(buffer_size);
thread::Builder::new()
.name("readahead_iterator".to_owned())
.spawn(move || {
for item in inner {
if sender.send(Some(item)).is_err() {
return;
}
}
let _ = sender.send(None);
})
.expect("failed to spawn readahead_iterator thread"); Readahead {
receiver: Some(receiver),
}
}
}
impl<T> Iterator for Readahead<T>
where
T: Send + 'static,
{
type Item = T;
fn next(&mut self) -> Option<T> {
let r = self
.receiver
.as_ref()
.and_then(|r| r.recv().ok())
.unwrap_or_default();
if r.is_none() {
self.receiver = None;
}
r
}
}
pub trait IntoReadahead<T>
where
T: Send + 'static,
{
fn readahead(self, buffer_size: usize) -> Readahead<T>
where
Self: Send + 'static;
}
impl<I, T> IntoReadahead<T> for I
where
T: Send + 'static,
I: Iterator<Item = T>,
{
fn readahead(self, buffer_size: usize) -> Readahead<T>
where
Self: Send + 'static,
{
Readahead::new(self, buffer_size)
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn sender_exits_unexpectedly() {
let (sender, receiver) = sync_channel(4);
thread::Builder::new()
.spawn(move || {
sender.send(Some(1)).expect("send failed");
})
.expect("failed to spawn readahead_iterator thread"); let mut r = Readahead {
receiver: Some(receiver),
};
assert_eq!(r.next(), Some(1));
assert_eq!(r.next(), None);
assert_eq!(r.next(), None);
}
#[test]
fn receiver_doesnt_panic_if_sender_panics() {
let vals = vec![false, true];
let iter = vals.into_iter().map(|v| if v { panic!() } else { 2 });
let mut r = iter.readahead(1);
assert_eq!(r.next(), Some(2));
assert_eq!(r.next(), None);
assert_eq!(r.next(), None);
}
}