timeout-iterator 1.1.7

TimeoutIterator is a wrapper over any iterator that adds peek_timeout and next_timeout functions. The canonical use-case is parsing multi-line free-form records (such as tailing a log file) where it is desirable to consume the very last line, and peek whether the record continues on the next line, but not block indefinitely on the peek.
Documentation
use std::time::Duration;

use crate::error;
use std::sync::mpsc;
use std::thread;

/// Wrapper that serves an iterator's items through a channel so that reads
/// can be bounded by a timeout.
pub struct TimeoutIterator<T> {
    /// Receiving end of the channel that is fed by the thread spawned in
    /// `with_iter`.
    source: mpsc::Receiver<T>,
    /// Items that have been peeked but not yet consumed. The peek methods
    /// push only while this is empty, so it holds at most one item.
    buffer: Vec<T>,
}

impl<T> TimeoutIterator<T>
where
    T: Send + 'static,
{
    pub fn with_iter<R>(iter: R) -> Result<TimeoutIterator<T>, error::Error>
    where
        R: Iterator<Item = T> + Send + 'static,
    {
        let (sink, source): (mpsc::Sender<T>, mpsc::Receiver<T>) = mpsc::channel();

        thread::Builder::new().name("TimeoutIterator::sender".to_owned()).spawn(move || {
            for item in iter {
                if let Err(e) = sink.send(item) {
                    eprintln!("TimeoutIterator:: Error sending data to channel. Receiver may have closed. Closing up sender. Error: {}", e);
                    return;
                }
            }
        })?;

        Ok(TimeoutIterator {
            source,
            buffer: Vec::new(),
        })
    }

    pub fn next_timeout(&mut self, timeout: Duration) -> Result<T, error::Error> {
        if !self.buffer.is_empty() {
            return Ok(self.buffer.remove(0));
        };

        match self.source.recv_timeout(timeout) {
            Ok(item) => Ok(item),
            Err(e) => match e {
                mpsc::RecvTimeoutError::Timeout => Err(error::Error::TimedOut),
                mpsc::RecvTimeoutError::Disconnected => Err(error::Error::Disconnected),
            },
        }
    }

    pub fn peek_timeout(&mut self, timeout: Duration) -> Result<&T, error::Error> {
        if self.buffer.is_empty() {
            match self.source.recv_timeout(timeout) {
                Ok(item) => self.buffer.push(item),
                Err(e) => match e {
                    mpsc::RecvTimeoutError::Timeout => return Err(error::Error::TimedOut),
                    mpsc::RecvTimeoutError::Disconnected => return Err(error::Error::Disconnected),
                },
            }
        };

        Ok(self.buffer.first().unwrap())
    }

    pub fn peek(&mut self) -> Option<&T> {
        if self.buffer.is_empty() {
            match self.next() {
                Some(item) => self.buffer.push(item),
                None => {
                    return None;
                }
            }
        };

        Some(self.buffer.first().unwrap())
    }
}

/// Blocking iteration over the items delivered through the channel.
impl<T> Iterator for TimeoutIterator<T> {
    type Item = T;

    /// Returns the next item, blocking until one is available.
    ///
    /// An item left in the buffer by an earlier peek is handed out first.
    /// Otherwise this blocks on the channel and returns `None` once the
    /// channel reports that the sending side is gone and nothing is left to
    /// receive.
    fn next(&mut self) -> Option<Self::Item> {
        if !self.buffer.is_empty() {
            return Some(self.buffer.remove(0));
        }

        // `recv` fails only when every sender has disconnected and the
        // channel is drained. That is the ordinary end of iteration, not a
        // fault, so it is reported as `None` without writing to stderr.
        self.source.recv().ok()
    }
}

#[cfg(all(test, feature = "sync"))]
mod tests {
    use super::*;
    use std::io;
    use std::io::prelude::*;

    /// Five short lines standing in for a multi-line record.
    const REALISTIC_MESSAGE: &str = r"1
2
3
4
5";

    /// Upper bound passed to every timed call in these tests.
    const WAIT: Duration = Duration::from_secs(1);

    /// `TimeoutIterator` over the lines of a byte buffer.
    type LineIter = TimeoutIterator<io::Result<String>>;

    /// Wraps the lines of `REALISTIC_MESSAGE` in a `TimeoutIterator`.
    fn lines_ti() -> LineIter {
        let lines_iterator = Box::new(REALISTIC_MESSAGE.as_bytes()).lines();
        TimeoutIterator::with_iter(lines_iterator).unwrap()
    }

    /// Wraps the numbers 1 through 5 in a `TimeoutIterator`.
    fn numbers_ti() -> TimeoutIterator<u32> {
        let numbers: Vec<u32> = vec![1, 2, 3, 4, 5];
        TimeoutIterator::with_iter(numbers.into_iter()).unwrap()
    }

    /// Consumes the next line via the blocking `next`.
    fn take_line(ti: &mut LineIter) -> String {
        ti.next().unwrap().unwrap()
    }

    /// Peeks the next line via `peek_timeout` without consuming it.
    fn peek_line_timeout(ti: &mut LineIter) -> &str {
        ti.peek_timeout(WAIT).ok().unwrap().as_ref().unwrap()
    }

    /// Peeks the next line via the blocking `peek` without consuming it.
    fn peek_line(ti: &mut LineIter) -> &str {
        ti.peek().unwrap().as_ref().unwrap()
    }

    /// Asserts that a timed read no longer produces an item.
    fn assert_exhausted<T: Send + 'static>(ti: &mut TimeoutIterator<T>) {
        let timeout_result = ti.next_timeout(WAIT);
        assert!(timeout_result.is_err());
    }

    #[test]
    fn iterates() {
        let mut ti = lines_ti();

        for expected in ["1", "2", "3", "4", "5"].iter().copied() {
            assert_eq!(take_line(&mut ti), expected);
        }
    }

    #[test]
    fn next_timeout() {
        let mut ti = lines_ti();

        for expected in ["1", "2", "3", "4", "5"].iter().copied() {
            assert_eq!(take_line(&mut ti), expected);
        }

        assert_exhausted(&mut ti);
    }

    #[test]
    fn peek_timeout_doesnt_remove() {
        let mut ti = lines_ti();

        assert_eq!(take_line(&mut ti), "1");
        assert_eq!(take_line(&mut ti), "2");
        assert_eq!(peek_line_timeout(&mut ti), "3");
        assert_eq!(take_line(&mut ti), "3");
        assert_eq!(take_line(&mut ti), "4");
        // Peeking twice must keep returning the same, still-unconsumed line.
        assert_eq!(peek_line_timeout(&mut ti), "5");
        assert_eq!(peek_line_timeout(&mut ti), "5");
        assert_eq!(take_line(&mut ti), "5");

        assert_exhausted(&mut ti);
    }

    #[test]
    fn peek_doesnt_remove() {
        let mut ti = lines_ti();

        assert_eq!(take_line(&mut ti), "1");
        assert_eq!(take_line(&mut ti), "2");
        assert_eq!(peek_line(&mut ti), "3");
        assert_eq!(take_line(&mut ti), "3");
        assert_eq!(take_line(&mut ti), "4");
        // Peeking twice must keep returning the same, still-unconsumed line.
        assert_eq!(peek_line(&mut ti), "5");
        assert_eq!(peek_line(&mut ti), "5");
        assert_eq!(take_line(&mut ti), "5");

        assert_exhausted(&mut ti);
    }

    #[test]
    fn item_iterator() {
        let mut ti = numbers_ti();

        assert_eq!(ti.next(), Some(1));
        assert_eq!(ti.next(), Some(2));
        assert_eq!(ti.peek_timeout(WAIT).ok(), Some(&3));
        assert_eq!(ti.next(), Some(3));
        assert_eq!(ti.next(), Some(4));
        assert_eq!(ti.peek_timeout(WAIT).ok(), Some(&5));
        assert_eq!(ti.peek_timeout(WAIT).ok(), Some(&5));
        assert_eq!(ti.next(), Some(5));

        assert_exhausted(&mut ti);
    }

    #[test]
    fn is_sendable() {
        // Compiles only if `TimeoutIterator<u32>` can be moved to another thread.
        let mut ti = numbers_ti();
        thread::spawn(move || {
            ti.next();
        });
    }
}