readahead_iterator/
lib.rs

1// Copyright 2020, 2021, 2025 Martin Pool
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms.
8
9//! Readahead adaptor for iterators.
10//!
11//! Items are generated from the iterator in a separate thread, and returned
12//! to the caller as a regular iterator, in the same order.
13//!
14//! This is useful when the wrapped iterator does significant work that
15//! can be parallelized with other work on the calling thread.
16//!
17//! For example:
18//!
19//! 1. Both the iterator and its client are CPU-intensive: allowing the iterator to
20//!    run ahead will let it do some work in parallel on a separate core.
//! 2. The wrapped iterator does blocking or lengthy I/O, such as opening and
//!    reading many files: opening the next files can proceed in parallel with
//!    processing the files that are already open.
24//!
25//! The wrapped iterator (and its items) must be `Send` so that they can be
26//! sent between threads.
27//!
28//! The iterator must also have `'static` lifetime, so that it lives long
29//! enough for the thread and wrapper. Often this can be accomplished by
30//! making sure the inner iterator is by-value, rather than iterating
31//! references through a collection: construct it with
32//! [`into_iter()`](https://doc.rust-lang.org/std/iter/index.html#the-three-forms-of-iteration).
33//!
34//! For example, to overlap opening files with reading from them:
35//!
36//! ```
37//! use std::fs::File;
38//! use std::io::{BufRead, BufReader};
39//! use readahead_iterator::IntoReadahead;
40//!
41//! let filenames = vec!["src/lib.rs", "examples/linecount.rs", "Cargo.toml"];
42//! let total_lines: usize = filenames
43//!     .into_iter()
44//!     .filter_map(|filename| {
45//!         File::open(filename.clone())
46//!             .map_err(|err| eprintln!("failed to open {}: {:?}", filename, err))
47//!             .map(|file| (filename, file))
48//!             .ok()
49//!     })
50//!     .readahead(5)
51//!     .map(|(filename, file)| {
52//!         let line_count = BufReader::new(file).lines().count();
53//!         println!("{:>8} {}", line_count, filename);
54//!         line_count
55//!     })
56//!     .sum();
57//! println!("{:>8} TOTAL", total_lines);
58//! ```
59//!
//! # Potential future features
//!
//! 1. A threaded `map` over a bounded readahead from the iterator, processing
//!    items out of order within a sliding window.
64
65#![warn(missing_docs)]
66#![forbid(unsafe_code)]
67
// Empty module whose rustdoc page is the changelog: the text is pulled in
// from `../NEWS.md` (path relative to this source file) at compile time.
// A plain `//` comment is used here so it is not merged into that page.
#[doc = include_str!("../NEWS.md")]
pub mod _changelog {}
70
71use std::sync::mpsc::{sync_channel, Receiver};
72use std::thread;
73
/// An iterator adaptor that evaluates the iterator on a separate thread,
/// and transports the items back to be consumed from the original thread.
///
/// Construct one with [`Readahead::new`] or the
/// [`IntoReadahead::readahead`] extension method.
pub struct Readahead<T> {
    /// Receiving end of the channel fed by the producer thread.
    ///
    /// Each message is `Some(item)`, or `None` marking the normal end of the
    /// inner iterator. This field becomes `None` once the iterator has
    /// finished (end marker received or sender disconnected), so later calls
    /// to `next` return `None` without touching the channel.
    receiver: Option<Receiver<Option<T>>>,
}

// Manual impl so that `Readahead<T>` is `Debug` even when `T` is not: the
// buffered items live inside the channel and are never shown.
impl<T> std::fmt::Debug for Readahead<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Readahead")
            .field("finished", &self.receiver.is_none())
            .finish_non_exhaustive()
    }
}
79
80impl<T> Readahead<T>
81where
82    T: Send + 'static,
83{
84    /// Apply a threaded readahead to an iterator.
85    ///
86    /// Items from the iterator are produced on a separate thread and passed
87    /// back to the calling thread.
88    ///
89    /// `buffer_size` is the maximum number of items that can be buffered.
90    ///
91    /// ```
92    /// use readahead_iterator::Readahead;
93    /// let c = Readahead::new("Hello Ferris".chars(), 10)
94    ///     .filter(|c| c.is_uppercase())
95    ///     .count();
96    /// # assert_eq!(c, 2);
97    /// ```
98    ///
99    /// # Panics
100    ///
101    /// On failing to spawn a new thread.
102    pub fn new<I>(inner: I, buffer_size: usize) -> Self
103    where
104        I: Iterator<Item = T> + Send + 'static,
105    {
106        let (sender, receiver) = sync_channel(buffer_size);
107        thread::Builder::new()
108            .name("readahead_iterator".to_owned())
109            .spawn(move || {
110                for item in inner {
111                    if sender.send(Some(item)).is_err() {
112                        // Receiver has been dropped, stop sending
113                        return;
114                    }
115                }
116                // Receiver has been dropped, no need to send final None
117                let _ = sender.send(None);
118            })
119            .expect("failed to spawn readahead_iterator thread"); // TODO: Optionally return an error instead.
120        Readahead {
121            receiver: Some(receiver),
122        }
123    }
124}
125
126impl<T> Iterator for Readahead<T>
127where
128    T: Send + 'static,
129{
130    type Item = T;
131
132    fn next(&mut self) -> Option<T> {
133        // Iterator returns None when:
134        // 1. receiver is already None, i.e. we already ended.
135        // 2. sender sent an explicit None indicating the end, i.e. normal termination
136        // 3. the sender hung up: this shouldn't normally happen but let's not panic.
137        let r = self
138            .receiver
139            .as_ref()
140            .and_then(|r| r.recv().ok())
141            .unwrap_or_default();
142        if r.is_none() {
143            self.receiver = None;
144        }
145        r
146    }
147}
148
149/// Adds a `.readahead(buffer_size)` method to any iterator.
150///
151/// ```
152/// use readahead_iterator::IntoReadahead;
153///
154/// let c = "Some input data".chars()
155///     .readahead(10)
156///     .filter(|c| c.is_alphabetic())
157///     .count();
158/// # assert_eq!(c, 13);
159/// ```
160pub trait IntoReadahead<T>
161where
162    T: Send + 'static,
163{
164    /// Apply a readahead adaptor to an iterator.
165    ///
166    /// `buffer_size` is the maximum number of buffered items.
167    fn readahead(self, buffer_size: usize) -> Readahead<T>
168    where
169        Self: Send + 'static;
170}
171
172impl<I, T> IntoReadahead<T> for I
173where
174    T: Send + 'static,
175    I: Iterator<Item = T>,
176{
177    fn readahead(self, buffer_size: usize) -> Readahead<T>
178    where
179        Self: Send + 'static,
180    {
181        Readahead::new(self, buffer_size)
182    }
183}
184
#[cfg(test)]
mod test {
    use std::sync::mpsc::SyncSender;
    use std::time::Duration;

    use super::*;

    /// Test that we don't panic if the sender thread exits without sending
    /// the end-of-stream `None` marker.
    ///
    /// This might not be possible to construct through the public interface
    /// but it's still good to avoid a potential panic.
    #[test]
    fn sender_exits_unexpectedly() {
        let (sender, receiver) = sync_channel(4);
        thread::spawn(move || {
            sender.send(Some(1)).expect("send failed");
        });
        let mut r = Readahead {
            receiver: Some(receiver),
        };
        assert_eq!(r.next(), Some(1));
        // The sender quit without sending `None`, but we shouldn't panic: just
        // treat it as the end.
        assert_eq!(r.next(), None);
        assert_eq!(r.next(), None);
    }

    #[test]
    fn receiver_doesnt_panic_if_sender_panics() {
        // TODO: Possibly some callers might want to propagate panics??
        //
        // Note: this will display a panic warning on the test's stderr, but the
        // calling thread continues on and succeeds.
        let vals = vec![false, true];
        let iter = vals.into_iter().map(|v| {
            if v {
                panic!("deliberate panic in the inner iterator")
            } else {
                2
            }
        });
        let mut r = iter.readahead(1);
        assert_eq!(r.next(), Some(2));
        assert_eq!(r.next(), None);
        assert_eq!(r.next(), None);
    }

    #[test]
    fn yields_all_items_in_order() {
        let expected: Vec<u32> = (0..1000).collect();
        let actual: Vec<u32> = (0..1000).readahead(3).collect();
        assert_eq!(actual, expected);
    }

    #[test]
    fn zero_buffer_size_still_yields_everything() {
        let actual: Vec<char> = "abc".chars().readahead(0).collect();
        assert_eq!(actual, ['a', 'b', 'c']);
    }

    #[test]
    fn empty_iterator_ends_immediately() {
        let mut r = std::iter::empty::<u8>().readahead(2);
        assert_eq!(r.next(), None);
        assert_eq!(r.next(), None);
    }

    /// An endless iterator that reports on a channel when it is dropped.
    struct Endless {
        dropped: SyncSender<()>,
    }

    impl Iterator for Endless {
        type Item = u32;

        fn next(&mut self) -> Option<u32> {
            Some(7)
        }
    }

    impl Drop for Endless {
        fn drop(&mut self) {
            let _ = self.dropped.send(());
        }
    }

    #[test]
    fn producer_thread_stops_when_readahead_is_dropped() {
        let (dropped_tx, dropped_rx) = sync_channel(1);
        let mut r = Endless {
            dropped: dropped_tx,
        }
        .readahead(2);
        assert_eq!(r.next(), Some(7));
        drop(r);
        // The producer sees the closed channel on its next send, returns, and
        // drops the inner iterator, which signals us.
        dropped_rx
            .recv_timeout(Duration::from_secs(10))
            .expect("producer thread did not stop after the Readahead was dropped");
    }
}