readahead_iterator/
lib.rs

1// Copyright 2020, 2021 Martin Pool
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms.
8
9//! Readahead adaptor for iterators.
10//!
11//! Items are generated from the iterator in a separate thread, and returned
12//! to the caller as a regular iterator, in the same order.
13//!
//! This is useful when the wrapped iterator does significant work that
//! can run in parallel with other work on the calling thread. For example,
//! if both the iterator and its client are CPU-intensive, they can run on
//! separate cores. Or, if the iterator does blocking IO on multiple files,
//! opening later files can be overlapped with processing earlier ones.
19//!
20//! The wrapped iterator (and its items) must be `Send` so that they can be
21//! sent between threads.
22//!
23//! The iterator must also have `'static` lifetime, so that it lives long
24//! enough for the thread and wrapper. Often this can be accomplished by
25//! making sure the inner iterator is by-value, rather than iterating
26//! references through a collection: construct it with
27//! [`into_iter()`](https://doc.rust-lang.org/std/iter/index.html#the-three-forms-of-iteration).
28//!
29//! For example, to overlap opening files with reading from them:
30//!
31//! ```
32//! use std::fs::File;
33//! use std::io::{BufRead, BufReader};
34//! use readahead_iterator::IntoReadahead;
35//!
36//! let filenames = vec!["src/lib.rs", "examples/linecount.rs", "Cargo.toml"];
37//! let total_lines: usize = filenames
38//!     .into_iter()
39//!     .filter_map(|filename| {
40//!         File::open(filename.clone())
41//!             .map_err(|err| eprintln!("failed to open {}: {:?}", filename, err))
42//!             .map(|file| (filename, file))
43//!             .ok()
44//!     })
45//!     .readahead(5)
46//!     .map(|(filename, file)| {
47//!         let line_count = BufReader::new(file).lines().count();
48//!         println!("{:>8} {}", line_count, filename);
49//!         line_count
50//!     })
51//!     .sum();
52//! println!("{:>8} TOTAL", total_lines);
53//! ```
54
55#![warn(missing_docs)]
56#![forbid(unsafe_code)]
57
58use std::sync::mpsc::{sync_channel, Receiver};
59use std::thread;
60
/// An iterator adaptor that evaluates the iterator on a separate thread,
/// and transports the items back to be consumed from the original thread.
///
/// Construct one with [`Readahead::new`] or [`IntoReadahead::readahead`].
pub struct Readahead<T: Send + 'static> {
    /// Channel from the background thread: `Some(item)` for each produced
    /// item, then `None` as an end-of-stream marker. Replaced with `None`
    /// by `next` once that marker has been received.
    receiver: Option<Receiver<Option<T>>>,
}

// Written by hand rather than derived: `#[derive(Debug)]` would add a
// `T: Debug` bound that isn't needed to describe the adaptor's state.
impl<T: Send + 'static> std::fmt::Debug for Readahead<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Readahead")
            .field("exhausted", &self.receiver.is_none())
            .finish()
    }
}
66
67impl<T> Readahead<T>
68where
69    T: Send + 'static,
70{
71    /// Apply a threaded readahead to an iterator.
72    ///
73    /// Items from the iterator are produced on a separate thread and passed
74    /// back to the calling thread.
75    ///
76    /// `buffer_size` is the maximum number of items that can be buffered.
77    ///
78    /// ```
79    /// use readahead_iterator::Readahead;
80    /// let c = Readahead::new("Hello Ferris".chars(), 10)
81    ///     .filter(|c| c.is_uppercase())
82    ///     .count();
83    /// # assert_eq!(c, 2);
84    /// ```
85    pub fn new<I>(inner: I, buffer_size: usize) -> Self
86    where
87        I: Iterator<Item = T> + Send + 'static,
88    {
89        // TODO: What if the iterator is dropped?
90        let (sender, receiver) = sync_channel(buffer_size);
91        thread::Builder::new()
92            .name("readahead_iterator".to_owned())
93            .spawn(move || {
94                for item in inner {
95                    sender
96                        .send(Some(item))
97                        .expect("send from inner iterator failed");
98                }
99                sender.send(None).expect("send of final None failed");
100            })
101            .expect("failed to spawn readahead_iterator thread");
102        Readahead {
103            receiver: Some(receiver),
104        }
105    }
106}
107
108impl<T> Iterator for Readahead<T>
109where
110    T: Send + 'static,
111{
112    type Item = T;
113
114    fn next(&mut self) -> Option<T> {
115        let r = self
116            .receiver
117            .as_ref()
118            .and_then(|r| r.recv().expect("recv of iterator value failed"));
119        if r.is_none() {
120            self.receiver = None
121        }
122        r
123    }
124}
125
126/// Adds a `.readahead(buffer_size)` method to any iterator.
127///
128/// ```
129/// use readahead_iterator::IntoReadahead;
130///
131/// let c = "Some input data".chars()
132///     .readahead(10)
133///     .filter(|c| c.is_alphabetic())
134///     .count();
135/// # assert_eq!(c, 13);
136/// ```
137pub trait IntoReadahead<T>
138where
139    T: Send + 'static,
140{
141    /// Apply a readahead adaptor to an iterator.
142    ///
143    /// `buffer_size` is the maximum number of buffered items.
144    fn readahead(self, buffer_size: usize) -> Readahead<T>
145    where
146        Self: Send + 'static;
147}
148
149impl<I, T> IntoReadahead<T> for I
150where
151    T: Send + 'static,
152    I: Iterator<Item = T>,
153{
154    fn readahead(self, buffer_size: usize) -> Readahead<T>
155    where
156        Self: Send + 'static,
157    {
158        Readahead::new(self, buffer_size)
159    }
160}