readahead_iterator/lib.rs
1// Copyright 2020, 2021, 2025 Martin Pool
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms.
8
9//! Readahead adaptor for iterators.
10//!
11//! Items are generated from the iterator in a separate thread, and returned
12//! to the caller as a regular iterator, in the same order.
13//!
14//! This is useful when the wrapped iterator does significant work that
15//! can be parallelized with other work on the calling thread.
16//!
17//! For example:
18//!
19//! 1. Both the iterator and its client are CPU-intensive: allowing the iterator to
20//! run ahead will let it do some work in parallel on a separate core.
//! 2. The iterator does blocking or lengthy IO, such as opening and reading
//!    many files: opening the next files can proceed in parallel with
//!    processing the files that are already open.
24//!
25//! The wrapped iterator (and its items) must be `Send` so that they can be
26//! sent between threads.
27//!
28//! The iterator must also have `'static` lifetime, so that it lives long
29//! enough for the thread and wrapper. Often this can be accomplished by
30//! making sure the inner iterator is by-value, rather than iterating
31//! references through a collection: construct it with
32//! [`into_iter()`](https://doc.rust-lang.org/std/iter/index.html#the-three-forms-of-iteration).
33//!
34//! For example, to overlap opening files with reading from them:
35//!
36//! ```
37//! use std::fs::File;
38//! use std::io::{BufRead, BufReader};
39//! use readahead_iterator::IntoReadahead;
40//!
41//! let filenames = vec!["src/lib.rs", "examples/linecount.rs", "Cargo.toml"];
42//! let total_lines: usize = filenames
43//! .into_iter()
44//! .filter_map(|filename| {
45//! File::open(filename.clone())
46//! .map_err(|err| eprintln!("failed to open {}: {:?}", filename, err))
47//! .map(|file| (filename, file))
48//! .ok()
49//! })
50//! .readahead(5)
51//! .map(|(filename, file)| {
52//! let line_count = BufReader::new(file).lines().count();
53//! println!("{:>8} {}", line_count, filename);
54//! line_count
55//! })
56//! .sum();
57//! println!("{:>8} TOTAL", total_lines);
58//! ```
59//!
//! # Potential future features
//!
//! 1. A threaded `map` across a bounded readahead from the iterator, processing
//!    items out of order within a sliding window.
64
65#![warn(missing_docs)]
66#![forbid(unsafe_code)]
67
68#[doc = include_str!("../NEWS.md")]
69pub mod _changelog {}
70
71use std::sync::mpsc::{sync_channel, Receiver};
72use std::thread;
73
/// An iterator adaptor that evaluates the iterator on a separate thread,
/// and transports the items back to be consumed from the original thread.
///
/// Construct one with [`Readahead::new`] or [`IntoReadahead::readahead`].
/// If it is dropped before the end, the channel is disconnected and the
/// producer thread stops at its next attempted send.
#[derive(Debug)]
pub struct Readahead<T: Send + 'static> {
    // Channel from the producer thread. Each message is either `Some(item)`,
    // or `None` marking the normal end of the inner iterator. This field is
    // reset to `None` once the stream has ended, so later calls to `next`
    // return `None` without touching the channel.
    receiver: Option<Receiver<Option<T>>>,
}
79
80impl<T> Readahead<T>
81where
82 T: Send + 'static,
83{
84 /// Apply a threaded readahead to an iterator.
85 ///
86 /// Items from the iterator are produced on a separate thread and passed
87 /// back to the calling thread.
88 ///
89 /// `buffer_size` is the maximum number of items that can be buffered.
90 ///
91 /// ```
92 /// use readahead_iterator::Readahead;
93 /// let c = Readahead::new("Hello Ferris".chars(), 10)
94 /// .filter(|c| c.is_uppercase())
95 /// .count();
96 /// # assert_eq!(c, 2);
97 /// ```
98 ///
99 /// # Panics
100 ///
101 /// On failing to spawn a new thread.
102 pub fn new<I>(inner: I, buffer_size: usize) -> Self
103 where
104 I: Iterator<Item = T> + Send + 'static,
105 {
106 let (sender, receiver) = sync_channel(buffer_size);
107 thread::Builder::new()
108 .name("readahead_iterator".to_owned())
109 .spawn(move || {
110 for item in inner {
111 if sender.send(Some(item)).is_err() {
112 // Receiver has been dropped, stop sending
113 return;
114 }
115 }
116 // Receiver has been dropped, no need to send final None
117 let _ = sender.send(None);
118 })
119 .expect("failed to spawn readahead_iterator thread"); // TODO: Optionally return an error instead.
120 Readahead {
121 receiver: Some(receiver),
122 }
123 }
124}
125
126impl<T> Iterator for Readahead<T>
127where
128 T: Send + 'static,
129{
130 type Item = T;
131
132 fn next(&mut self) -> Option<T> {
133 // Iterator returns None when:
134 // 1. receiver is already None, i.e. we already ended.
135 // 2. sender sent an explicit None indicating the end, i.e. normal termination
136 // 3. the sender hung up: this shouldn't normally happen but let's not panic.
137 let r = self
138 .receiver
139 .as_ref()
140 .and_then(|r| r.recv().ok())
141 .unwrap_or_default();
142 if r.is_none() {
143 self.receiver = None;
144 }
145 r
146 }
147}
148
149/// Adds a `.readahead(buffer_size)` method to any iterator.
150///
151/// ```
152/// use readahead_iterator::IntoReadahead;
153///
154/// let c = "Some input data".chars()
155/// .readahead(10)
156/// .filter(|c| c.is_alphabetic())
157/// .count();
158/// # assert_eq!(c, 13);
159/// ```
160pub trait IntoReadahead<T>
161where
162 T: Send + 'static,
163{
164 /// Apply a readahead adaptor to an iterator.
165 ///
166 /// `buffer_size` is the maximum number of buffered items.
167 fn readahead(self, buffer_size: usize) -> Readahead<T>
168 where
169 Self: Send + 'static;
170}
171
172impl<I, T> IntoReadahead<T> for I
173where
174 T: Send + 'static,
175 I: Iterator<Item = T>,
176{
177 fn readahead(self, buffer_size: usize) -> Readahead<T>
178 where
179 Self: Send + 'static,
180 {
181 Readahead::new(self, buffer_size)
182 }
183}
184
#[cfg(test)]
mod test {
    use super::*;

    /// Test that we don't panic if the sender thread quits without sending
    /// the final `None` end marker.
    ///
    /// This might not be possible to construct through the public interface
    /// but it's still good to avoid a potential panic.
    #[test]
    fn sender_exits_unexpectedly() {
        let (sender, receiver) = sync_channel(4);
        thread::spawn(move || {
            sender.send(Some(1)).expect("send failed");
        });
        let mut r = Readahead {
            receiver: Some(receiver),
        };
        assert_eq!(r.next(), Some(1));
        // The sender quit without sending `None`, but we shouldn't panic:
        // just treat the hang-up as the end.
        assert_eq!(r.next(), None);
        assert_eq!(r.next(), None);
    }

    #[test]
    fn receiver_doesnt_panic_if_sender_panics() {
        // TODO: Possibly some callers might want to propagate panics??
        //
        // Note: this will display a panic warning on the test's stderr, but the
        // calling thread continues on and succeeds.
        let vals = vec![false, true];
        let iter = vals.into_iter().map(|v| if v { panic!() } else { 2 });
        let mut r = iter.readahead(1);
        assert_eq!(r.next(), Some(2));
        assert_eq!(r.next(), None);
        assert_eq!(r.next(), None);
    }

    /// With a zero-capacity (rendezvous) channel, every item is still
    /// delivered, in order.
    #[test]
    fn zero_buffer_size_yields_every_item() {
        let items: Vec<i32> = vec![1, 2, 3].into_iter().readahead(0).collect();
        assert_eq!(items, [1, 2, 3]);
    }

    /// The producer only runs a bounded distance ahead, so an infinite inner
    /// iterator works as long as the caller stops early.
    #[test]
    fn infinite_inner_iterator_can_be_truncated() {
        let first: Vec<u32> = (0..).readahead(2).take(3).collect();
        assert_eq!(first, [0, 1, 2]);
    }
}