par_map/lib.rs
1// Copyright (c) 2016-2017 Guillaume Pinot <texitoi(a)texitoi.eu>
2//
3// This work is free. You can redistribute it and/or modify it under
4// the terms of the Do What The Fuck You Want To Public License,
5// Version 2, as published by Sam Hocevar. See the COPYING file for
6// more details.
7
//! This crate provides an easy way to get parallel iteration. The
//! contracts of the added methods are (almost) exactly the same as those
//! of the methods without the `par_` prefix provided by `std`.
11
12#[deny(missing_docs)]
13extern crate futures;
14extern crate futures_cpupool;
15extern crate num_cpus;
16#[macro_use]
17extern crate pub_iterator_type;
18
19use futures::Future;
20use futures_cpupool::{CpuFuture, CpuPool};
21use std::collections::VecDeque;
22use std::sync::Arc;
23
24/// This trait extends `std::iter::Iterator` with parallel
25/// iterator adaptors. Just `use` it to get access to the methods:
26///
27/// ```
28/// use par_map::ParMap;
29/// ```
30///
31/// Each iterator adaptor will have its own thread pool of the number
32/// of CPU. At maximum, 2 times the number of defined threads
33/// (the default is the number of cpus) will be
34/// launched in advance, guarantying that the memory will not be
35/// exceeded if the iterator is not consumed faster that the
36/// production. To be effective, the given function should be costy
37/// to compute and each call should take about the same time. The
38/// `packed` variants will do the same, processing by batch instead of
39/// doing one job for each item.
40///
41/// The `'static` constraints are needed to have such a simple
42/// interface. These adaptors are well suited for big iterators that
43/// can't be collected into a `Vec`. Else, crates such as `rayon` are
44/// more suited for this kind of task.
45pub trait ParMap: Iterator + Sized {
46 /// Takes a closure and creates an iterator which calls that
47 /// closure on each element, exactly as
48 /// `std::iter::Iterator::map`.
49 ///
50 /// The order of the elements are guaranted to be unchanged. Of
51 /// course, the given closures can be executed in parallel out of
52 /// order.
53 ///
54 /// # Example
55 ///
56 /// ```
57 /// use par_map::ParMap;
58 /// let a = [1, 2, 3];
59 /// let mut iter = a.iter().cloned().par_map(|x| 2 * x);
60 /// assert_eq!(iter.next(), Some(2));
61 /// assert_eq!(iter.next(), Some(4));
62 /// assert_eq!(iter.next(), Some(6));
63 /// assert_eq!(iter.next(), None);
64 /// ```
65 fn par_map<B, F>(self, f: F) -> Map<Self, B, F>
66 where
67 F: Sync + Send + 'static + Fn(Self::Item) -> B,
68 B: Send + 'static,
69 Self::Item: Send + 'static,
70 {
71 self.with_nb_threads(num_cpus::get()).par_map(f)
72 }
73
74 /// Creates an iterator that works like map, but flattens nested
75 /// structure, exactly as `std::iter::Iterator::flat_map`.
76 ///
77 /// The order of the elements are guaranted to be unchanged. Of
78 /// course, the given closures can be executed in parallel out of
79 /// order.
80 ///
81 /// # Example
82 ///
83 /// ```
84 /// use par_map::ParMap;
85 /// let words = ["alpha", "beta", "gamma"];
86 /// let merged: String = words.iter()
87 /// .cloned() // as items must be 'static
88 /// .par_flat_map(|s| s.chars()) // exactly as std::iter::Iterator::flat_map
89 /// .collect();
90 /// assert_eq!(merged, "alphabetagamma");
91 /// ```
92 fn par_flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
93 where
94 F: Sync + Send + 'static + Fn(Self::Item) -> U,
95 U: IntoIterator,
96 U::Item: Send + 'static,
97 Self::Item: Send + 'static,
98 {
99 self.with_nb_threads(num_cpus::get()).par_flat_map(f)
100 }
101
102 /// Creates an iterator that yields `Vec<Self::Item>` of size `nb`
103 /// (or less on the last element).
104 ///
105 /// # Example
106 ///
107 /// ```
108 /// use par_map::ParMap;
109 /// let nbs = [1, 2, 3, 4, 5, 6, 7];
110 /// let mut iter = nbs.iter().cloned().pack(3);
111 /// assert_eq!(Some(vec![1, 2, 3]), iter.next());
112 /// assert_eq!(Some(vec![4, 5, 6]), iter.next());
113 /// assert_eq!(Some(vec![7]), iter.next());
114 /// assert_eq!(None, iter.next());
115 /// ```
116 fn pack(self, nb: usize) -> Pack<Self> {
117 Pack { iter: self, nb: nb }
118 }
119
120 /// Same as `par_map`, but the parallel work is batched by `nb` items.
121 ///
122 /// # Example
123 ///
124 /// ```
125 /// use par_map::ParMap;
126 /// let a = [1, 2, 3];
127 /// let mut iter = a.iter().cloned().par_packed_map(2, |x| 2 * x);
128 /// assert_eq!(iter.next(), Some(2));
129 /// assert_eq!(iter.next(), Some(4));
130 /// assert_eq!(iter.next(), Some(6));
131 /// assert_eq!(iter.next(), None);
132 /// ```
133 fn par_packed_map<'a, B, F>(self, nb: usize, f: F) -> PackedMap<'a, B>
134 where
135 F: Sync + Send + 'static + Fn(Self::Item) -> B,
136 B: Send + 'static,
137 Self::Item: Send + 'static,
138 Self: 'a,
139 {
140 self.with_nb_threads(num_cpus::get()).par_packed_map(nb, f)
141 }
142
143 /// Same as `par_flat_map`, but the parallel work is batched by `nb` items.
144 ///
145 /// # Example
146 ///
147 /// ```
148 /// use par_map::ParMap;
149 /// let words = ["alpha", "beta", "gamma"];
150 /// let merged: String = words.iter()
151 /// .cloned()
152 /// .par_packed_flat_map(2, |s| s.chars())
153 /// .collect();
154 /// assert_eq!(merged, "alphabetagamma");
155 /// ```
156 fn par_packed_flat_map<'a, U, F>(self, nb: usize, f: F) -> PackedFlatMap<'a, U::Item>
157 where
158 F: Sync + Send + 'static + Fn(Self::Item) -> U,
159 U: IntoIterator + 'a,
160 U::Item: Send + 'static,
161 Self::Item: Send + 'static,
162 Self: 'a,
163 {
164 self.with_nb_threads(num_cpus::get()).par_packed_flat_map(nb, f)
165 }
166
167 /// Configure the number of thread used.
168 /// If not set, the default is the number of cpus available
169 ///
170 /// # Example
171 ///
172 /// ```
173 /// use par_map::ParMap;
174 /// let a = [1, 2, 3];
175 /// let mut iter = a.iter().cloned().with_nb_threads(2).par_map(|x| 2 * x);
176 /// assert_eq!(iter.next(), Some(2));
177 /// assert_eq!(iter.next(), Some(4));
178 /// assert_eq!(iter.next(), Some(6));
179 /// assert_eq!(iter.next(), None);
180 /// ```
181 fn with_nb_threads(self, nb: usize) -> ParMapBuilder<Self> {
182 ParMapBuilder {
183 iter: self,
184 nb_threads: nb,
185 }
186 }
187}
188impl<I: Iterator> ParMap for I {}
189
190/// An iterator that maps the values of `iter` with `f`.
191///
192/// This struct is created by the `flat_map()` method on
193/// `ParIter`. See its documentation for more.
194#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
195pub struct Map<I, B, F> {
196 pool: CpuPool,
197 queue: VecDeque<CpuFuture<B, ()>>,
198 iter: I,
199 f: Arc<F>,
200}
201impl<I: Iterator, B: Send + 'static, F> Map<I, B, F>
202where
203 F: Sync + Send + 'static + Fn(I::Item) -> B,
204 I::Item: Send + 'static,
205{
206 fn spawn(&mut self) {
207 let future = match self.iter.next() {
208 None => return,
209 Some(item) => {
210 let f = self.f.clone();
211 self.pool.spawn_fn(move || Ok(f(item)))
212 }
213 };
214 self.queue.push_back(future);
215 }
216 fn with_nb_threads(iter: I, f: F, nb_thread: usize) -> Self {
217 let mut res = Map {
218 pool: CpuPool::new(nb_thread),
219 queue: VecDeque::new(),
220 iter: iter,
221 f: Arc::new(f),
222 };
223 for _ in 0..nb_thread * 2 {
224 res.spawn();
225 }
226 res
227 }
228}
229impl<I: Iterator, B: Send + 'static, F> Iterator for Map<I, B, F>
230where
231 F: Sync + Send + 'static + Fn(I::Item) -> B,
232 I::Item: Send + 'static,
233{
234 type Item = B;
235 fn next(&mut self) -> Option<Self::Item> {
236 self.queue.pop_front().map(|future| {
237 let i = future.wait().unwrap();
238 self.spawn();
239 i
240 })
241 }
242}
243
244/// An iterator that maps each element to an iterator, and yields the
245/// elements of the produced iterators.
246///
247/// This struct is created by the `par_flat_map()` method on
248/// `ParIter`. See its documentation for more.
249#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
250pub struct FlatMap<I: Iterator, U: IntoIterator, F> {
251 pool: CpuPool,
252 queue: VecDeque<CpuFuture<Vec<U::Item>, ()>>,
253 iter: I,
254 f: Arc<F>,
255 cur_iter: ::std::vec::IntoIter<U::Item>,
256}
257impl<I: Iterator, U: IntoIterator, F> FlatMap<I, U, F>
258where
259 F: Sync + Send + 'static + Fn(I::Item) -> U,
260 U::Item: Send + 'static,
261 I::Item: Send + 'static,
262{
263 fn spawn(&mut self) {
264 let future = match self.iter.next() {
265 None => return,
266 Some(item) => {
267 let f = self.f.clone();
268 self.pool
269 .spawn_fn(move || Ok(f(item).into_iter().collect()))
270 }
271 };
272 self.queue.push_back(future);
273 }
274 fn with_nb_threads(iter: I, f: F, nb_thread: usize) -> Self {
275 let mut res = FlatMap {
276 pool: CpuPool::new(nb_thread),
277 queue: VecDeque::new(),
278 iter: iter,
279 f: Arc::new(f),
280 cur_iter: vec![].into_iter(),
281 };
282 for _ in 0..nb_thread * 2 {
283 res.spawn();
284 }
285 res
286 }
287}
288impl<I: Iterator, U: IntoIterator, F> Iterator for FlatMap<I, U, F>
289where
290 F: Sync + Send + 'static + Fn(I::Item) -> U,
291 U::Item: Send + 'static,
292 I::Item: Send + 'static,
293{
294 type Item = U::Item;
295 fn next(&mut self) -> Option<Self::Item> {
296 loop {
297 if let Some(item) = self.cur_iter.next() {
298 return Some(item);
299 }
300 let v = match self.queue.pop_front() {
301 Some(future) => future.wait().unwrap(),
302 None => return None,
303 };
304 self.cur_iter = v.into_iter();
305 self.spawn();
306 }
307 }
308}
309
/// An iterator that yields `Vec<I::Item>` batches of size `nb` (or
/// fewer items for the last batch).
///
/// This struct is created by the `pack()` method on `ParMap`. See its
/// documentation for more.
///
/// With a batch size of 0 no batch is ever produced: `next()` returns
/// `None` straight away.
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
pub struct Pack<I> {
    // Wrapped iterator providing the items to batch.
    iter: I,
    // Maximum number of items per batch.
    nb: usize,
}

impl<I: Iterator> Iterator for Pack<I> {
    type Item = Vec<I::Item>;

    /// Collects up to `nb` items of the wrapped iterator into a batch.
    /// Returns `None` when no item could be collected.
    fn next(&mut self) -> Option<Self::Item> {
        let batch: Vec<I::Item> = self.iter.by_ref().take(self.nb).collect();
        if batch.is_empty() {
            None
        } else {
            Some(batch)
        }
    }

    /// Reports how many batches remain, derived from the size hint of
    /// the wrapped iterator: `len` remaining items give
    /// `ceil(len / nb)` batches.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let nb = self.nb;
        if nb == 0 {
            // `next()` never yields anything for an empty batch size.
            return (0, Some(0));
        }
        // Ceiling division written so that it cannot overflow.
        let batches = |len: usize| len / nb + if len % nb == 0 { 0 } else { 1 };
        let (lower, upper) = self.iter.size_hint();
        (batches(lower), upper.map(batches))
    }
}
331
// Declares the public `PackedMap<'a, B>` type returned by
// `par_packed_map`. It is constructed there as `PackedMap(Box::new(..))`,
// i.e. it wraps a boxed `Iterator<Item = B>` valid for `'a`, which keeps
// the concrete adaptor chain out of the public signatures.
// NOTE(review): the struct and its `Iterator` implementation are presumably
// generated by `pub_iterator_type!` — confirm against that crate's docs.
pub_iterator_type! {
    #[doc="As `Map` but packed."]
    PackedMap['a, B] = Box<Iterator<Item = B> + 'a>
}
// Declares the public `PackedFlatMap<'a, T>` type returned by
// `par_packed_flat_map`. It is constructed there as
// `PackedFlatMap(Box::new(..))`, i.e. it wraps a boxed `Iterator<Item = T>`
// valid for `'a`, which keeps the concrete adaptor chain out of the public
// signatures.
// NOTE(review): the struct and its `Iterator` implementation are presumably
// generated by `pub_iterator_type!` — confirm against that crate's docs.
pub_iterator_type! {
    #[doc="As `FlatMap` but packed."]
    PackedFlatMap['a, T] = Box<Iterator<Item = T> + 'a>
}
340
341/// A builder used to configure the parallele work.
342///
343/// This struct is created by the `with_nb_threads()` method on
344/// `ParIter`. See its documentation for more.
345#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
346pub struct ParMapBuilder<I> {
347 iter: I,
348 nb_threads: usize,
349}
350
351impl<I: Iterator> ParMapBuilder<I> {
352 /// As ParMap::par_map, but with a custom number of threads
353 /// # Example
354 ///
355 /// ```
356 /// use par_map::ParMap;
357 /// let a = [1, 2, 3];
358 /// let mut iter = a.iter()
359 /// .cloned()
360 /// .with_nb_threads(2)
361 /// .par_map(|x| 2 * x);
362 /// assert_eq!(iter.next(), Some(2));
363 /// assert_eq!(iter.next(), Some(4));
364 /// assert_eq!(iter.next(), Some(6));
365 /// assert_eq!(iter.next(), None);
366 /// ```
367 pub fn par_map<B, F>(self, f: F) -> Map<I, B, F>
368 where
369 F: Sync + Send + 'static + Fn(I::Item) -> B,
370 B: Send + 'static,
371 I::Item: Send + 'static,
372 {
373 Map::with_nb_threads(self.iter, f, self.nb_threads)
374 }
375
376 /// As ParMap::par_flat_map, but with a custom number of threads
377 /// # Example
378 ///
379 /// ```
380 /// use par_map::ParMap;
381 /// let words = ["alpha", "beta", "gamma"];
382 /// let merged: String = words.iter()
383 /// .cloned() // as items must be 'static
384 /// .with_nb_threads(2)
385 /// .par_flat_map(|s| s.chars()) // exactly as std::iter::Iterator::flat_map
386 /// .collect();
387 /// assert_eq!(merged, "alphabetagamma");
388 /// ```
389 pub fn par_flat_map<U, F>(self, f: F) -> FlatMap<I, U, F>
390 where
391 F: Sync + Send + 'static + Fn(I::Item) -> U,
392 U: IntoIterator,
393 U::Item: Send + 'static,
394 I::Item: Send + 'static,
395 {
396 FlatMap::with_nb_threads(self.iter, f, self.nb_threads)
397 }
398
399 /// As ParMap::par_packed_map, but with a custom number of threads
400 ///
401 /// # Example
402 ///
403 /// ```
404 /// use par_map::ParMap;
405 /// let a = [1, 2, 3];
406 /// let mut iter = a.iter()
407 /// .cloned()
408 /// .with_nb_threads(2)
409 /// .par_packed_map(2, |x| 2 * x);
410 /// assert_eq!(iter.next(), Some(2));
411 /// assert_eq!(iter.next(), Some(4));
412 /// assert_eq!(iter.next(), Some(6));
413 /// assert_eq!(iter.next(), None);
414 /// ```
415 pub fn par_packed_map<'a, B, F>(self, nb: usize, f: F) -> PackedMap<'a, B>
416 where
417 F: Sync + Send + 'static + Fn(I::Item) -> B,
418 B: Send + 'static,
419 I::Item: Send + 'static,
420 Self: 'a,
421 {
422 let f = Arc::new(f);
423 let f = move |iter: Vec<I::Item>| {
424 let f = f.clone();
425 iter.into_iter().map(move |i| f(i))
426 };
427 PackedMap(Box::new(
428 self.iter
429 .pack(nb)
430 .with_nb_threads(self.nb_threads)
431 .par_flat_map(f),
432 ))
433 }
434
435 /// As ParMap::par_packed_flat_map, but with a custom number of threads
436 ///
437 /// # Example
438 ///
439 /// ```
440 /// use par_map::ParMap;
441 /// let words = ["alpha", "beta", "gamma"];
442 /// let merged: String = words.iter()
443 /// .cloned()
444 /// .with_nb_threads(2)
445 /// .par_packed_flat_map(2, |s| s.chars())
446 /// .collect();
447 /// assert_eq!(merged, "alphabetagamma");
448 /// ```
449 pub fn par_packed_flat_map<'a, U, F>(self, nb: usize, f: F) -> PackedFlatMap<'a, U::Item>
450 where
451 F: Sync + Send + 'static + Fn(I::Item) -> U,
452 U: IntoIterator + 'a,
453 U::Item: Send + 'static,
454 I::Item: Send + 'static,
455 Self: 'a,
456 {
457 let f = Arc::new(f);
458 let f = move |iter: Vec<I::Item>| {
459 let f = f.clone();
460 iter.into_iter().flat_map(move |i| f(i))
461 };
462 PackedFlatMap(Box::new(
463 self.iter
464 .pack(nb)
465 .with_nb_threads(self.nb_threads)
466 .par_flat_map(f),
467 ))
468 }
469}