par_map/
lib.rs

1// Copyright (c) 2016-2017 Guillaume Pinot <texitoi(a)texitoi.eu>
2//
3// This work is free. You can redistribute it and/or modify it under
4// the terms of the Do What The Fuck You Want To Public License,
5// Version 2, as published by Sam Hocevar. See the COPYING file for
6// more details.
7
//! This crate provides an easy way to get parallel iteration.  The
//! contracts of the added methods are (almost) exactly the same as
//! those of the methods without the `par_` prefix provided by `std`.
11
12#[deny(missing_docs)]
13extern crate futures;
14extern crate futures_cpupool;
15extern crate num_cpus;
16#[macro_use]
17extern crate pub_iterator_type;
18
19use futures::Future;
20use futures_cpupool::{CpuFuture, CpuPool};
21use std::collections::VecDeque;
22use std::sync::Arc;
23
24/// This trait extends `std::iter::Iterator` with parallel
25/// iterator adaptors.  Just `use` it to get access to the methods:
26///
27/// ```
28/// use par_map::ParMap;
29/// ```
30///
31/// Each iterator adaptor will have its own thread pool of the number
32/// of CPU.  At maximum, 2 times the number of defined threads
33/// (the default is the number of cpus) will be
34/// launched in advance, guarantying that the memory will not be
35/// exceeded if the iterator is not consumed faster that the
36/// production.  To be effective, the given function should be costy
37/// to compute and each call should take about the same time.  The
38/// `packed` variants will do the same, processing by batch instead of
39/// doing one job for each item.
40///
41/// The `'static` constraints are needed to have such a simple
42/// interface.  These adaptors are well suited for big iterators that
43/// can't be collected into a `Vec`.  Else, crates such as `rayon` are
44/// more suited for this kind of task.
45pub trait ParMap: Iterator + Sized {
46    /// Takes a closure and creates an iterator which calls that
47    /// closure on each element, exactly as
48    /// `std::iter::Iterator::map`.
49    ///
50    /// The order of the elements are guaranted to be unchanged.  Of
51    /// course, the given closures can be executed in parallel out of
52    /// order.
53    ///
54    /// # Example
55    ///
56    /// ```
57    /// use par_map::ParMap;
58    /// let a = [1, 2, 3];
59    /// let mut iter = a.iter().cloned().par_map(|x| 2 * x);
60    /// assert_eq!(iter.next(), Some(2));
61    /// assert_eq!(iter.next(), Some(4));
62    /// assert_eq!(iter.next(), Some(6));
63    /// assert_eq!(iter.next(), None);
64    /// ```
65    fn par_map<B, F>(self, f: F) -> Map<Self, B, F>
66    where
67        F: Sync + Send + 'static + Fn(Self::Item) -> B,
68        B: Send + 'static,
69        Self::Item: Send + 'static,
70    {
71        self.with_nb_threads(num_cpus::get()).par_map(f)
72    }
73
74    /// Creates an iterator that works like map, but flattens nested
75    /// structure, exactly as `std::iter::Iterator::flat_map`.
76    ///
77    /// The order of the elements are guaranted to be unchanged.  Of
78    /// course, the given closures can be executed in parallel out of
79    /// order.
80    ///
81    /// # Example
82    ///
83    /// ```
84    /// use par_map::ParMap;
85    /// let words = ["alpha", "beta", "gamma"];
86    /// let merged: String = words.iter()
87    ///     .cloned() // as items must be 'static
88    ///     .par_flat_map(|s| s.chars()) // exactly as std::iter::Iterator::flat_map
89    ///     .collect();
90    /// assert_eq!(merged, "alphabetagamma");
91    /// ```
92    fn par_flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
93    where
94        F: Sync + Send + 'static + Fn(Self::Item) -> U,
95        U: IntoIterator,
96        U::Item: Send + 'static,
97        Self::Item: Send + 'static,
98    {
99        self.with_nb_threads(num_cpus::get()).par_flat_map(f)
100    }
101
102    /// Creates an iterator that yields `Vec<Self::Item>` of size `nb`
103    /// (or less on the last element).
104    ///
105    /// # Example
106    ///
107    /// ```
108    /// use par_map::ParMap;
109    /// let nbs = [1, 2, 3, 4, 5, 6, 7];
110    /// let mut iter = nbs.iter().cloned().pack(3);
111    /// assert_eq!(Some(vec![1, 2, 3]), iter.next());
112    /// assert_eq!(Some(vec![4, 5, 6]), iter.next());
113    /// assert_eq!(Some(vec![7]), iter.next());
114    /// assert_eq!(None, iter.next());
115    /// ```
116    fn pack(self, nb: usize) -> Pack<Self> {
117        Pack { iter: self, nb: nb }
118    }
119
120    /// Same as `par_map`, but the parallel work is batched by `nb` items.
121    ///
122    /// # Example
123    ///
124    /// ```
125    /// use par_map::ParMap;
126    /// let a = [1, 2, 3];
127    /// let mut iter = a.iter().cloned().par_packed_map(2, |x| 2 * x);
128    /// assert_eq!(iter.next(), Some(2));
129    /// assert_eq!(iter.next(), Some(4));
130    /// assert_eq!(iter.next(), Some(6));
131    /// assert_eq!(iter.next(), None);
132    /// ```
133    fn par_packed_map<'a, B, F>(self, nb: usize, f: F) -> PackedMap<'a, B>
134    where
135        F: Sync + Send + 'static + Fn(Self::Item) -> B,
136        B: Send + 'static,
137        Self::Item: Send + 'static,
138        Self: 'a,
139    {
140        self.with_nb_threads(num_cpus::get()).par_packed_map(nb, f)
141    }
142
143    /// Same as `par_flat_map`, but the parallel work is batched by `nb` items.
144    ///
145    /// # Example
146    ///
147    /// ```
148    /// use par_map::ParMap;
149    /// let words = ["alpha", "beta", "gamma"];
150    /// let merged: String = words.iter()
151    ///     .cloned()
152    ///     .par_packed_flat_map(2, |s| s.chars())
153    ///     .collect();
154    /// assert_eq!(merged, "alphabetagamma");
155    /// ```
156    fn par_packed_flat_map<'a, U, F>(self, nb: usize, f: F) -> PackedFlatMap<'a, U::Item>
157    where
158        F: Sync + Send + 'static + Fn(Self::Item) -> U,
159        U: IntoIterator + 'a,
160        U::Item: Send + 'static,
161        Self::Item: Send + 'static,
162        Self: 'a,
163    {
164        self.with_nb_threads(num_cpus::get()).par_packed_flat_map(nb, f)
165    }
166
167    /// Configure the number of thread used.
168    /// If not set, the default is the number of cpus available
169    ///
170    /// # Example
171    ///
172    /// ```
173    /// use par_map::ParMap;
174    /// let a = [1, 2, 3];
175    /// let mut iter = a.iter().cloned().with_nb_threads(2).par_map(|x| 2 * x);
176    /// assert_eq!(iter.next(), Some(2));
177    /// assert_eq!(iter.next(), Some(4));
178    /// assert_eq!(iter.next(), Some(6));
179    /// assert_eq!(iter.next(), None);
180    /// ```
181    fn with_nb_threads(self, nb: usize) -> ParMapBuilder<Self> {
182        ParMapBuilder {
183            iter: self,
184            nb_threads: nb,
185        }
186    }
187}
188impl<I: Iterator> ParMap for I {}
189
190/// An iterator that maps the values of `iter` with `f`.
191///
192/// This struct is created by the `flat_map()` method on
193/// `ParIter`. See its documentation for more.
194#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
195pub struct Map<I, B, F> {
196    pool: CpuPool,
197    queue: VecDeque<CpuFuture<B, ()>>,
198    iter: I,
199    f: Arc<F>,
200}
201impl<I: Iterator, B: Send + 'static, F> Map<I, B, F>
202where
203    F: Sync + Send + 'static + Fn(I::Item) -> B,
204    I::Item: Send + 'static,
205{
206    fn spawn(&mut self) {
207        let future = match self.iter.next() {
208            None => return,
209            Some(item) => {
210                let f = self.f.clone();
211                self.pool.spawn_fn(move || Ok(f(item)))
212            }
213        };
214        self.queue.push_back(future);
215    }
216    fn with_nb_threads(iter: I, f: F, nb_thread: usize) -> Self {
217        let mut res = Map {
218            pool: CpuPool::new(nb_thread),
219            queue: VecDeque::new(),
220            iter: iter,
221            f: Arc::new(f),
222        };
223        for _ in 0..nb_thread * 2 {
224            res.spawn();
225        }
226        res
227    }
228}
229impl<I: Iterator, B: Send + 'static, F> Iterator for Map<I, B, F>
230where
231    F: Sync + Send + 'static + Fn(I::Item) -> B,
232    I::Item: Send + 'static,
233{
234    type Item = B;
235    fn next(&mut self) -> Option<Self::Item> {
236        self.queue.pop_front().map(|future| {
237            let i = future.wait().unwrap();
238            self.spawn();
239            i
240        })
241    }
242}
243
244/// An iterator that maps each element to an iterator, and yields the
245/// elements of the produced iterators.
246///
247/// This struct is created by the `par_flat_map()` method on
248/// `ParIter`.  See its documentation for more.
249#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
250pub struct FlatMap<I: Iterator, U: IntoIterator, F> {
251    pool: CpuPool,
252    queue: VecDeque<CpuFuture<Vec<U::Item>, ()>>,
253    iter: I,
254    f: Arc<F>,
255    cur_iter: ::std::vec::IntoIter<U::Item>,
256}
257impl<I: Iterator, U: IntoIterator, F> FlatMap<I, U, F>
258where
259    F: Sync + Send + 'static + Fn(I::Item) -> U,
260    U::Item: Send + 'static,
261    I::Item: Send + 'static,
262{
263    fn spawn(&mut self) {
264        let future = match self.iter.next() {
265            None => return,
266            Some(item) => {
267                let f = self.f.clone();
268                self.pool
269                    .spawn_fn(move || Ok(f(item).into_iter().collect()))
270            }
271        };
272        self.queue.push_back(future);
273    }
274    fn with_nb_threads(iter: I, f: F, nb_thread: usize) -> Self {
275        let mut res = FlatMap {
276            pool: CpuPool::new(nb_thread),
277            queue: VecDeque::new(),
278            iter: iter,
279            f: Arc::new(f),
280            cur_iter: vec![].into_iter(),
281        };
282        for _ in 0..nb_thread * 2 {
283            res.spawn();
284        }
285        res
286    }
287}
288impl<I: Iterator, U: IntoIterator, F> Iterator for FlatMap<I, U, F>
289where
290    F: Sync + Send + 'static + Fn(I::Item) -> U,
291    U::Item: Send + 'static,
292    I::Item: Send + 'static,
293{
294    type Item = U::Item;
295    fn next(&mut self) -> Option<Self::Item> {
296        loop {
297            if let Some(item) = self.cur_iter.next() {
298                return Some(item);
299            }
300            let v = match self.queue.pop_front() {
301                Some(future) => future.wait().unwrap(),
302                None => return None,
303            };
304            self.cur_iter = v.into_iter();
305            self.spawn();
306        }
307    }
308}
309
/// An iterator that yields `Vec<I::Item>` of size `nb` (or less on
/// the last element).
///
/// With `nb == 0` no element can be taken, so the iterator yields
/// nothing and leaves the inner iterator untouched.
///
/// This struct is created by the `pack()` method on
/// `ParMap`.  See its documentation for more.
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
pub struct Pack<I> {
    iter: I,
    nb: usize,
}
impl<I: Iterator> Iterator for Pack<I> {
    type Item = Vec<I::Item>;

    /// Takes up to `nb` items from the inner iterator; returns `None`
    /// when none could be taken.
    fn next(&mut self) -> Option<Self::Item> {
        let item: Vec<_> = self.iter.by_ref().take(self.nb).collect();
        if item.is_empty() {
            None
        } else {
            Some(item)
        }
    }

    /// `n` remaining items make `ceil(n / nb)` packs.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let nb = self.nb;
        if nb == 0 {
            return (0, Some(0));
        }
        // Written without `n + nb - 1` so that it cannot overflow.
        let packs = |n: usize| n / nb + if n % nb == 0 { 0 } else { 1 };
        let (lower, upper) = self.iter.size_hint();
        (packs(lower), upper.map(packs))
    }
}
331
// Public return type of the `par_packed_map` methods; values are built
// as `PackedMap(Box::new(..))` around the boxed iterator.
// NOTE(review): `pub_iterator_type!` comes from the external
// `pub_iterator_type` crate; presumably it generates a newtype that
// forwards `Iterator` to the wrapped value -- confirm against that crate.
pub_iterator_type! {
    #[doc="As `Map` but packed."]
    PackedMap['a, B] = Box<Iterator<Item = B> + 'a>
}
// Public return type of the `par_packed_flat_map` methods; values are
// built as `PackedFlatMap(Box::new(..))` around the boxed iterator.
// NOTE(review): `pub_iterator_type!` comes from the external
// `pub_iterator_type` crate; presumably it generates a newtype that
// forwards `Iterator` to the wrapped value -- confirm against that crate.
pub_iterator_type! {
    #[doc="As `FlatMap` but packed."]
    PackedFlatMap['a, T] = Box<Iterator<Item = T> + 'a>
}
340
341/// A builder used to configure the parallele work.
342///
343/// This struct is created by the `with_nb_threads()` method on
344/// `ParIter`.  See its documentation for more.
345#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
346pub struct ParMapBuilder<I> {
347    iter: I,
348    nb_threads: usize,
349}
350
351impl<I: Iterator> ParMapBuilder<I> {
352    /// As ParMap::par_map, but with a custom number of threads
353    /// # Example
354    ///
355    /// ```
356    /// use par_map::ParMap;
357    /// let a = [1, 2, 3];
358    /// let mut iter = a.iter()
359    ///     .cloned()
360    ///     .with_nb_threads(2)
361    ///     .par_map(|x| 2 * x);
362    /// assert_eq!(iter.next(), Some(2));
363    /// assert_eq!(iter.next(), Some(4));
364    /// assert_eq!(iter.next(), Some(6));
365    /// assert_eq!(iter.next(), None);
366    /// ```
367    pub fn par_map<B, F>(self, f: F) -> Map<I, B, F>
368    where
369        F: Sync + Send + 'static + Fn(I::Item) -> B,
370        B: Send + 'static,
371        I::Item: Send + 'static,
372    {
373        Map::with_nb_threads(self.iter, f, self.nb_threads)
374    }
375
376    /// As ParMap::par_flat_map, but with a custom number of threads
377    /// # Example
378    ///
379    /// ```
380    /// use par_map::ParMap;
381    /// let words = ["alpha", "beta", "gamma"];
382    /// let merged: String = words.iter()
383    ///     .cloned() // as items must be 'static
384    ///     .with_nb_threads(2)
385    ///     .par_flat_map(|s| s.chars()) // exactly as std::iter::Iterator::flat_map
386    ///     .collect();
387    /// assert_eq!(merged, "alphabetagamma");
388    /// ```
389    pub fn par_flat_map<U, F>(self, f: F) -> FlatMap<I, U, F>
390    where
391        F: Sync + Send + 'static + Fn(I::Item) -> U,
392        U: IntoIterator,
393        U::Item: Send + 'static,
394        I::Item: Send + 'static,
395    {
396        FlatMap::with_nb_threads(self.iter, f, self.nb_threads)
397    }
398
399    /// As ParMap::par_packed_map, but with a custom number of threads
400    ///
401    /// # Example
402    ///
403    /// ```
404    /// use par_map::ParMap;
405    /// let a = [1, 2, 3];
406    /// let mut iter = a.iter()
407    ///     .cloned()
408    ///     .with_nb_threads(2)
409    ///     .par_packed_map(2, |x| 2 * x);
410    /// assert_eq!(iter.next(), Some(2));
411    /// assert_eq!(iter.next(), Some(4));
412    /// assert_eq!(iter.next(), Some(6));
413    /// assert_eq!(iter.next(), None);
414    /// ```
415    pub fn par_packed_map<'a, B, F>(self, nb: usize, f: F) -> PackedMap<'a, B>
416    where
417        F: Sync + Send + 'static + Fn(I::Item) -> B,
418        B: Send + 'static,
419        I::Item: Send + 'static,
420        Self: 'a,
421    {
422        let f = Arc::new(f);
423        let f = move |iter: Vec<I::Item>| {
424            let f = f.clone();
425            iter.into_iter().map(move |i| f(i))
426        };
427        PackedMap(Box::new(
428            self.iter
429                .pack(nb)
430                .with_nb_threads(self.nb_threads)
431                .par_flat_map(f),
432        ))
433    }
434
435    /// As ParMap::par_packed_flat_map, but with a custom number of threads
436    ///
437    /// # Example
438    ///
439    /// ```
440    /// use par_map::ParMap;
441    /// let words = ["alpha", "beta", "gamma"];
442    /// let merged: String = words.iter()
443    ///     .cloned()
444    ///     .with_nb_threads(2)
445    ///     .par_packed_flat_map(2, |s| s.chars())
446    ///     .collect();
447    /// assert_eq!(merged, "alphabetagamma");
448    /// ```
449    pub fn par_packed_flat_map<'a, U, F>(self, nb: usize, f: F) -> PackedFlatMap<'a, U::Item>
450    where
451        F: Sync + Send + 'static + Fn(I::Item) -> U,
452        U: IntoIterator + 'a,
453        U::Item: Send + 'static,
454        I::Item: Send + 'static,
455        Self: 'a,
456    {
457        let f = Arc::new(f);
458        let f = move |iter: Vec<I::Item>| {
459            let f = f.clone();
460            iter.into_iter().flat_map(move |i| f(i))
461        };
462        PackedFlatMap(Box::new(
463            self.iter
464                .pack(nb)
465                .with_nb_threads(self.nb_threads)
466                .par_flat_map(f),
467        ))
468    }
469}