readahead_iterator/lib.rs
1// Copyright 2020, 2021 Martin Pool
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms.
8
9//! Readahead adaptor for iterators.
10//!
11//! Items are generated from the iterator in a separate thread, and returned
12//! to the caller as a regular iterator, in the same order.
13//!
14//! This is useful when the wrapped iterator does significant work that
//! can be parallelized with other work on the calling thread. For example,
//! if both the iterator and its client are CPU-intensive, they can run on
//! separate cores. Or, if the iterator does blocking IO on multiple files,
//! opening later files can be overlapped with processing earlier ones.
19//!
20//! The wrapped iterator (and its items) must be `Send` so that they can be
21//! sent between threads.
22//!
23//! The iterator must also have `'static` lifetime, so that it lives long
24//! enough for the thread and wrapper. Often this can be accomplished by
25//! making sure the inner iterator is by-value, rather than iterating
26//! references through a collection: construct it with
27//! [`into_iter()`](https://doc.rust-lang.org/std/iter/index.html#the-three-forms-of-iteration).
28//!
29//! For example, to overlap opening files with reading from them:
30//!
31//! ```
32//! use std::fs::File;
33//! use std::io::{BufRead, BufReader};
34//! use readahead_iterator::IntoReadahead;
35//!
36//! let filenames = vec!["src/lib.rs", "examples/linecount.rs", "Cargo.toml"];
37//! let total_lines: usize = filenames
38//! .into_iter()
39//! .filter_map(|filename| {
40//! File::open(filename.clone())
41//! .map_err(|err| eprintln!("failed to open {}: {:?}", filename, err))
42//! .map(|file| (filename, file))
43//! .ok()
44//! })
45//! .readahead(5)
46//! .map(|(filename, file)| {
47//! let line_count = BufReader::new(file).lines().count();
48//! println!("{:>8} {}", line_count, filename);
49//! line_count
50//! })
51//! .sum();
52//! println!("{:>8} TOTAL", total_lines);
53//! ```
54
55#![warn(missing_docs)]
56#![forbid(unsafe_code)]
57
58use std::sync::mpsc::{sync_channel, Receiver};
59use std::thread;
60
/// An iterator adaptor that evaluates the iterator on a separate thread,
/// and transports the items back to be consumed from the original thread.
///
/// Construct one with [`Readahead::new`].
pub struct Readahead<T> {
    /// Receiving end of the channel fed by the background thread.
    ///
    /// `Some(item)` messages carry items; a final `None` message marks normal
    /// exhaustion of the inner iterator. This field is set to `None` once that
    /// end marker has been seen, so later calls to `next` return `None`
    /// without touching the channel again.
    receiver: Option<Receiver<Option<T>>>,
}

impl<T> Readahead<T>
where
    T: Send + 'static,
{
    /// Apply a threaded readahead to an iterator.
    ///
    /// Items from the iterator are produced on a separate thread and passed
    /// back to the calling thread, in the same order.
    ///
    /// `buffer_size` is the maximum number of items that can be buffered in
    /// the channel between the two threads. In addition, the background thread
    /// may hold one further item that it has produced but is still waiting to
    /// send. A `buffer_size` of 0 makes every hand-off synchronous.
    ///
    /// If the returned `Readahead` is dropped before the inner iterator is
    /// exhausted, the background thread stops the next time it tries to send
    /// an item, dropping the inner iterator, rather than panicking.
    ///
    /// ```
    /// use readahead_iterator::Readahead;
    /// let c = Readahead::new("Hello Ferris".chars(), 10)
    ///     .filter(|c| c.is_uppercase())
    ///     .count();
    /// # assert_eq!(c, 2);
    /// ```
    ///
    /// # Panics
    ///
    /// Panics if the background thread cannot be spawned.
    pub fn new<I>(inner: I, buffer_size: usize) -> Self
    where
        I: Iterator<Item = T> + Send + 'static,
    {
        let (sender, receiver) = sync_channel(buffer_size);
        thread::Builder::new()
            .name("readahead_iterator".to_owned())
            .spawn(move || {
                for item in inner {
                    // `send` only fails once the receiver (i.e. the
                    // `Readahead`) has been dropped. Nobody will ever read
                    // more items, so stop quietly instead of panicking.
                    if sender.send(Some(item)).is_err() {
                        return;
                    }
                }
                // The consumer may legitimately drop the `Readahead` right
                // after taking the last item, so failure here is not an error.
                let _ = sender.send(None);
            })
            .expect("failed to spawn readahead_iterator thread");
        Readahead {
            receiver: Some(receiver),
        }
    }
}

impl<T> Iterator for Readahead<T> {
    type Item = T;

    /// Return the next item produced by the background thread, blocking
    /// until one is available.
    ///
    /// # Panics
    ///
    /// Panics if the background thread exits without sending the end marker,
    /// which happens when the inner iterator panics.
    fn next(&mut self) -> Option<T> {
        // Already finished: the channel has been dropped.
        let receiver = self.receiver.as_ref()?;
        let item = receiver.recv().expect("recv of iterator value failed");
        if item.is_none() {
            // End marker: drop the receiver so every later call returns
            // `None` immediately.
            self.receiver = None;
        }
        item
    }
}

// Once `next` has returned `None` the receiver is gone, so all later calls
// return `None` too.
impl<T> std::iter::FusedIterator for Readahead<T> {}
125
126/// Adds a `.readahead(buffer_size)` method to any iterator.
127///
128/// ```
129/// use readahead_iterator::IntoReadahead;
130///
131/// let c = "Some input data".chars()
132/// .readahead(10)
133/// .filter(|c| c.is_alphabetic())
134/// .count();
135/// # assert_eq!(c, 13);
136/// ```
137pub trait IntoReadahead<T>
138where
139 T: Send + 'static,
140{
141 /// Apply a readahead adaptor to an iterator.
142 ///
143 /// `buffer_size` is the maximum number of buffered items.
144 fn readahead(self, buffer_size: usize) -> Readahead<T>
145 where
146 Self: Send + 'static;
147}
148
149impl<I, T> IntoReadahead<T> for I
150where
151 T: Send + 'static,
152 I: Iterator<Item = T>,
153{
154 fn readahead(self, buffer_size: usize) -> Readahead<T>
155 where
156 Self: Send + 'static,
157 {
158 Readahead::new(self, buffer_size)
159 }
160}