1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
// Copyright 2020, 2021 Martin Pool // // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your // option. This file may not be copied, modified, or distributed // except according to those terms. //! Readahead adaptor for iterators. //! //! Items are generated from the iterator in a separate thread, and returned //! to the caller as a regular iterator, in the same order. //! //! This is useful when the wrapped iterator does significant work that //! can be parallelized with other work on the calling thread. For example, //! if both the iterator and its client are CPU-intensive, they utilize separate //! cores. Or if the iterator does blocking IO on multiple files, opening of //! later files can be overlapped with processing of earlier files. //! //! The wrapped iterator (and its items) must be `Send` so that they can be //! sent between threads. //! //! The iterator must also have `'static` lifetime, so that it lives long //! enough for the thread and wrapper. Often this can be accomplished by //! making sure the inner iterator is by-value, rather than iterating //! references through a collection: construct it with //! [`into_iter()`](https://doc.rust-lang.org/std/iter/index.html#the-three-forms-of-iteration). //! //! For example, to overlap opening files with reading from them: //! //! ``` //! use std::fs::File; //! use std::io::{BufRead, BufReader}; //! use readahead_iterator::IntoReadahead; //! //! let filenames = vec!["src/lib.rs", "examples/linecount.rs", "Cargo.toml"]; //! let total_lines: usize = filenames //! .into_iter() //! .filter_map(|filename| { //! File::open(filename.clone()) //! .map_err(|err| eprintln!("failed to open {}: {:?}", filename, err)) //! .map(|file| (filename, file)) //! .ok() //! }) //! .readahead(5) //! .map(|(filename, file)| { //! let line_count = BufReader::new(file).lines().count(); //! println!("{:>8} {}", line_count, filename); //! line_count //! }) //! .sum(); //! println!("{:>8} TOTAL", total_lines); //! ``` #![warn(missing_docs)] #![forbid(unsafe_code)] use std::sync::mpsc::{sync_channel, Receiver}; use std::thread; /// An iterator adaptor that evaluates the iterator on a separate thread, /// and transports the items back to be consumed from the original thread. pub struct Readahead<T: Send + 'static> { receiver: Option<Receiver<Option<T>>>, } impl<T> Readahead<T> where T: Send + 'static, { /// Apply a threaded readahead to an iterator. /// /// Items from the iterator are produced on a separate thread and passed /// back to the calling thread. /// /// `buffer_size` is the maximum number of items that can be buffered. /// /// ``` /// use readahead_iterator::Readahead; /// let c = Readahead::new("Hello Ferris".chars(), 10) /// .filter(|c| c.is_uppercase()) /// .count(); /// # assert_eq!(c, 2); /// ``` pub fn new<I>(inner: I, buffer_size: usize) -> Self where I: Iterator<Item = T> + Send + 'static, { // TODO: What if the iterator is dropped? let (sender, receiver) = sync_channel(buffer_size); thread::Builder::new() .name("readahead_iterator".to_owned()) .spawn(move || { for item in inner { sender .send(Some(item)) .expect("send from inner iterator failed"); } sender.send(None).expect("send of final None failed"); }) .expect("failed to spawn readahead_iterator thread"); Readahead { receiver: Some(receiver), } } } impl<T> Iterator for Readahead<T> where T: Send + 'static, { type Item = T; fn next(&mut self) -> Option<T> { let r = self .receiver .as_ref() .and_then(|r| r.recv().expect("recv of iterator value failed")); if r.is_none() { self.receiver = None } r } } /// Adds a `.readahead(buffer_size)` method to any iterator. /// /// ``` /// use readahead_iterator::IntoReadahead; /// /// let c = "Some input data".chars() /// .readahead(10) /// .filter(|c| c.is_alphabetic()) /// .count(); /// # assert_eq!(c, 13); /// ``` pub trait IntoReadahead<T> where T: Send + 'static, { /// Apply a readahead adaptor to an iterator. /// /// `buffer_size` is the maximum number of buffered items. fn readahead(self, buffer_size: usize) -> Readahead<T> where Self: Send + 'static; } impl<I, T> IntoReadahead<T> for I where T: Send + 'static, I: Iterator<Item = T>, { fn readahead(self, buffer_size: usize) -> Readahead<T> where Self: Send + 'static, { Readahead::new(self, buffer_size) } }