par_iter_sync/
lib.rs

#![cfg_attr(feature = "bench", feature(test))]
//!
//! # par_iter_sync: Parallel Iterator With Sequential Output
//!
//! Crates like `rayon` do not offer a synchronization mechanism.
//! This crate provides an easy way to mix parallelism and synchronization:
//! execute tasks concurrently, with synchronization at any step.
//!
//! Consider the case where multiple threads share a cache which can be read
//! only after prior tasks have written to it (e.g., reads of task 4 depend
//! on writes of tasks 1-4).
//!
//! Using the `IntoParallelIteratorSync` trait:
//! ```
//! // in concurrency: task1 write | task2 write | task3 write | task4 write
//! //                      \_____________\_____________\_____________\
//! //             task4 read depends on task 1-4 write  \___________
//! //                                                               \
//! // in concurrency:              | task2 read  | task3 read  | task4 read
//!
//! use par_iter_sync::IntoParallelIteratorSync;
//! use std::sync::{Arc, Mutex};
//! use std::collections::HashSet;
//!
//! // there are 100 tasks
//! let tasks = 0..100;
//!
//! // an in-memory cache for integers
//! let cache: Arc<Mutex<HashSet<i32>>> = Arc::new(Mutex::new(HashSet::new()));
//! let cache_clone = cache.clone();
//!
//! // iterate through tasks
//! tasks.into_par_iter_sync(move |task_number| {
//!
//!     // write to the cache (insert the integer), in parallel
//!     cache.lock().unwrap().insert(task_number);
//!     // return the task number to the next iterator
//!     Ok(task_number)
//!
//! }).into_par_iter_sync(move |task_number| { // <- synced to sequential order
//!
//!     // read from the cache
//!     assert!(cache_clone.lock().unwrap().contains(&task_number));
//!     Ok(())
//! // append a `for_each` to actually run the whole chain
//! }).for_each(|_| ());
//! ```
//!
//! ## Usage Caveat
//! This crate is designed to clone all resources captured by the closure
//! for each thread. To prevent unintended RAM usage, you may wrap
//! large data structures in `Arc`, as sketched below.
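//!
//! A minimal sketch of the idea (the `big_table` name is only illustrative):
//! wrapping the data in `Arc` means each worker thread clones a pointer
//! rather than the whole vector.
//! ```
//! use par_iter_sync::IntoParallelIteratorSync;
//! use std::sync::Arc;
//!
//! // without `Arc`, this whole Vec would be cloned into every worker thread
//! let big_table: Arc<Vec<u64>> = Arc::new((0..1_000).collect());
//!
//! let total: u64 = (0..big_table.len())
//!     .into_par_iter_sync(move |i| Ok(big_table[i]))
//!     .sum();
//!
//! assert_eq!(total, (0..1_000u64).sum::<u64>());
//! ```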
//!
//! ## Sequential Consistency
//! The output order is guaranteed to be the same as that of the upstream iterator,
//! but the execution order is not sequential.
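//!
//! A minimal sketch of this guarantee (the shared `execution_order` vector is
//! only there to show that tasks may finish in any interleaving):
//! ```
//! use par_iter_sync::IntoParallelIteratorSync;
//! use std::sync::{Arc, Mutex};
//!
//! let execution_order = Arc::new(Mutex::new(Vec::new()));
//! let recorder = execution_order.clone();
//!
//! let output: Vec<i32> = (0..100)
//!     .into_par_iter_sync(move |i| {
//!         // record when this task actually ran (any interleaving is possible)
//!         recorder.lock().unwrap().push(i);
//!         Ok(i)
//!     })
//!     .collect();
//!
//! // `execution_order` now holds whatever order the tasks happened to run in,
//! // but the output is always in the upstream order
//! assert_eq!(output, (0..100).collect::<Vec<i32>>());
//! ```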
//!
//! ## Examples
//!
//! ### Mix Syncing and Parallelism By Chaining
//! ```
//! use par_iter_sync::IntoParallelIteratorSync;
//!
//! (0..100).into_par_iter_sync(|i| {
//!     Ok(i)                   // <~ async execution
//! }).into_par_iter_sync(|i| { // <- sync order
//!     Ok(i)                   // <~ async execution
//! }).into_par_iter_sync(|i| { // <- sync order
//!     Ok(i)                   // <~ async execution
//! }).for_each(|_| ());        // <- sync order
//! ```
//!
//! ### Use `std::iter::IntoIterator` interface
//! ```
//! use par_iter_sync::IntoParallelIteratorSync;
//!
//! let mut count = 0;
//!
//! // for loop
//! for i in (0..100).into_par_iter_sync(|i| Ok(i)) {
//!     assert_eq!(i, count);
//!     count += 1;
//! }
//!
//! // sum
//! let sum: i32 = (1..=100).into_par_iter_sync(|i| Ok(i)).sum();
//!
//! // skip, take and collect
//! let results: Vec<i32> = (0..10)
//!     .into_par_iter_sync(|i| Ok(i))
//!     .skip(1)
//!     .take(5)
//!     .collect();
//!
//! assert_eq!(sum, 5050);
//! assert_eq!(results, vec![1, 2, 3, 4, 5])
//! ```
//!
//! ### Bridge To Rayon
//! ```
//! use par_iter_sync::IntoParallelIteratorSync;
//! use rayon::prelude::*;
//!
//! // sum with rayon
//! let sum: i32 = (1..=100)
//!     .into_par_iter_sync(|i| Ok(i))
//!     .par_bridge()    // <- switch to rayon
//!     .sum();
//!
//! assert_eq!(sum, 5050);
//! ```
//!
//! ### Closure Captures Variables
//! Variables captured are cloned to each thread automatically.
//! ```
//! use par_iter_sync::IntoParallelIteratorSync;
//! use std::sync::Arc;
//!
//! // use `Arc` to save RAM
//! let resource_captured = Arc::new(vec![3, 1, 4, 1, 5, 9, 2, 6, 5, 3]);
//! let len = resource_captured.len();
//!
//! let result_iter = (0..len).into_par_iter_sync(move |i| {
//!     // `resource_captured` is moved into the closure
//!     // and cloned to worker threads.
//!     let read_from_resource = resource_captured.get(i).unwrap();
//!     Ok(*read_from_resource)
//! });
//!
//! // the result is produced in sequential order
//! let collected: Vec<i32> = result_iter.collect();
//! assert_eq!(collected, vec![3, 1, 4, 1, 5, 9, 2, 6, 5, 3])
//! ```
//!
//! ### Fast Fail During Exception
//! The iterator stops once the inner function returns an `Err`.
//! ```
//! use par_iter_sync::IntoParallelIteratorSync;
//! use log::warn;
//!
//! /// this function returns `Err` when it reads 1000
//! fn error_at_1000(n: i32) -> Result<i32, ()> {
//!     if n == 1000 {
//!         // you may log this error
//!         warn!("Some Error Occurs");
//!         Err(())
//!     } else {
//!         Ok(n)
//!     }
//! }
//!
//! let results: Vec<i32> = (0..10000).into_par_iter_sync(move |a| {
//!     Ok(a)
//! }).into_par_iter_sync(move |a| {
//!     // error at 1000
//!     error_at_1000(a)
//! }).into_par_iter_sync(move |a| {
//!     Ok(a)
//! }).collect();
//!
//! let expected: Vec<i32> = (0..1000).collect();
//! assert_eq!(results, expected)
//! ```
//!
//! #### You may choose to skip errors
//! If you do not want to stop on `Err`, here is a workaround:
//! wrap the inner `Result` in `Ok`.
//! ```
//! use par_iter_sync::IntoParallelIteratorSync;
//!
//! let results: Vec<Result<i32, ()>> = (0..5).into_par_iter_sync(move |n| {
//!     // error at 3, but skip
//!     if n == 3 {
//!         Ok(Err(()))
//!     } else {
//!         Ok(Ok(n))
//!     }
//! }).collect();
//!
//! assert_eq!(results, vec![Ok(0), Ok(1), Ok(2), Err(()), Ok(4)])
//! ```
//! ## Overhead Benchmark
//! Platform: MacBook Air (Late 2015), 8 GB RAM, Intel Core i5, 1.6 GHz (2 cores).
//!
//! ### Result
//! One million (1,000,000) empty iterations per run.
//! ```text
//! test iter_async::test_par_iter_async::bench_into_par_iter_async
//!     ... bench: 110,277,577 ns/iter (+/- 28,510,054)
//!
//! test test_par_iter::bench_into_par_iter_sync
//!     ... bench: 121,063,787 ns/iter (+/- 103,787,056)
//! ```
//!
//! Dividing the total time by 1,000,000 iterations gives the per-iteration overhead:
//! - Async iterator overhead: `110 ns (+/-  28 ns)`.
//! - Sync iterator overhead:  `121 ns (+/- 103 ns)`.
//!
//! ## Implementation Note
//!
//! ### Output Buffering
//! - Each worker uses a bounded single-producer channel to buffer its outputs.
//!   So, while a worker is waiting for its turn to be polled, it does not get
//!   blocked (until its buffer fills up). The buffer capacity is hard-coded
//!   to 128 (`MAX_SIZE_FOR_THREAD`) per thread.
//! - The number of worker threads equals the number of logical cores.
//!
//! ### Synchronization and Exception Handling
//! - When a worker thread fetches a task, it registers its thread ID under that
//!   task ID in a registry (see the sketch below).
//! - When `next()` is called, the consumer looks up the next task's thread ID
//!   in the registry.
//! - `next()` returns `None` when there are no more tasks or when an error occurs.
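//!
//! As a rough sketch of the registry keying described above (the slot count and
//! the IDs used here are illustrative, not the crate's actual values):
//! ```
//! use std::sync::atomic::{AtomicIsize, Ordering};
//!
//! // a registry of 8 slots, all empty (`-1`)
//! let registry: Vec<AtomicIsize> = (0..8).map(|_| AtomicIsize::new(-1)).collect();
//!
//! // a worker (say, thread 3) registers that it picked up task 10
//! let key = 10 % registry.len();
//! registry[key].store(3, Ordering::SeqCst);
//!
//! // the consumer later looks up task 10, reads thread 3, and clears the slot
//! assert_eq!(registry[key].swap(-1, Ordering::SeqCst), 3);
//! ```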
//!
mod iter_async;

use crossbeam::channel::{bounded, Receiver};
use crossbeam::sync::{Parker, Unparker};
use crossbeam::utils::Backoff;
pub use iter_async::*;
use num_cpus;
use std::ops::Deref;
use std::sync::atomic::{AtomicBool, AtomicIsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::thread::JoinHandle;
use std::time::Duration;

const MAX_SIZE_FOR_THREAD: usize = 128;
const BUFFER_SIZE: usize = 64;

///
/// Lock-free parallel iterator with sequential output.
///
pub trait IntoParallelIteratorSync<R, T, TL, F>
    where
        F: Send + Clone + 'static + Fn(T) -> Result<R, ()>,
        T: Send + 'static,
        TL: Send + IntoIterator<Item = T> + 'static,
        <TL as IntoIterator>::IntoIter: Send + 'static,
        R: Send,
{
    ///
    /// # Usage
    ///
    /// This method executes `func` in parallel.
    ///
    /// `func` is a closure that takes the elements returned
    /// by the upstream iterator as its argument and returns
    /// a `Result<R, ()>`.
    ///
    /// The resulting iterator yields `R` for every `Ok(R)`
    /// and stops once it gets an `Err(())`.
    ///
    /// ## Example
    ///
    /// ```
    /// use par_iter_sync::IntoParallelIteratorSync;
    ///
    /// let mut count = 0;
    ///
    /// // for loop
    /// for i in (0..100).into_par_iter_sync(|i| Ok(i)) {
    ///     assert_eq!(i, count);
    ///     count += 1;
    /// }
    ///
    /// // sum
    /// let sum: i32 = (1..=100).into_par_iter_sync(|i| Ok(i)).sum();
    ///
    /// // take and collect
    /// let results: Vec<i32> = (0..10).into_par_iter_sync(|i| Ok(i)).take(5).collect();
    ///
    /// assert_eq!(sum, 5050);
    /// assert_eq!(results, vec![0, 1, 2, 3, 4])
    /// ```
    ///
    /// If the results are not polled using `next()`,
    /// the parallel execution will pause and wait.
    ///
    /// ## Sequential Consistency
    /// The output order is guaranteed to be the same as that of the provided iterator.
    ///
    /// See the [crate] module-level docs.
    ///
    fn into_par_iter_sync(self, func: F) -> ParIterSync<R>;
}

impl<R, T, TL, F> IntoParallelIteratorSync<R, T, TL, F> for TL
    where
        F: Send + Clone + 'static + Fn(T) -> Result<R, ()>,
        T: Send + 'static,
        TL: Send + IntoIterator<Item = T> + 'static,
        <TL as IntoIterator>::IntoIter: Send + 'static,
        R: Send + 'static,
{
    fn into_par_iter_sync(self, func: F) -> ParIterSync<R> {
        ParIterSync::new(self, func)
    }
}

///
/// A lookup table for registering and looking up the worker thread ID
/// corresponding to a task.
///
/// `-1` means that the task is not yet registered
/// or that the task does not exist.
///
struct TaskRegistry {
    // vector of thread IDs, indexed by task key
    inner: Arc<Vec<AtomicIsize>>,
    parkers: Vec<Parker>,
}

impl Deref for TaskRegistry {
    type Target = Vec<AtomicIsize>;

    fn deref(&self) -> &Self::Target {
        self.inner.deref()
    }
}

/// Write handle to the task registry, held by worker threads.
struct TaskRegistryWrite {
    inner: Arc<Vec<AtomicIsize>>,
    unparkers: Vec<Unparker>
}

impl Deref for TaskRegistryWrite {
    type Target = Vec<AtomicIsize>;

    fn deref(&self) -> &Self::Target {
        self.inner.deref()
    }
}

/// Dropping a write handle (i.e. a worker stopping) wakes the consumer,
/// so that a parked `lookup` can re-check whether all workers have stopped.
impl Drop for TaskRegistryWrite {
    fn drop(&mut self) {
        for unparker in &self.unparkers {
            unparker.unpark();
        }
    }
}

impl TaskRegistry {

    ///
    /// Initialize every slot with `-1` to represent an empty registry.
    ///
    /// The `size` must be large enough to ensure that no key collision
    /// can possibly occur.
    ///
    fn new(size: usize) -> TaskRegistry {
        TaskRegistry {
            inner: Arc::new((0..size).map(|_| AtomicIsize::new(-1)).collect()),
            parkers: (0..size).map(|_| Parker::new()).collect()
        }
    }

    ///
    /// Look up the thread number of a task and reset that slot to `-1`.
    ///
    /// This function blocks to wait for a task to be registered,
    /// unless all worker threads have stopped so that no more new
    /// tasks can possibly be registered.
    ///
    /// It should block very rarely, since the task dispatcher is non-blocking
    /// and each task is registered immediately after being fetched in `get_task`.
    ///
    /// Returns `None` only when all worker threads have stopped.
    ///
    #[inline(always)]
    pub(crate) fn lookup(&self, task_id: usize) -> Option<isize> {
        let registry_len = self.len();
        let pos = TaskRegistry::id_to_key(task_id, registry_len);
        let backoff = Backoff::new();
        loop {
            // check if worker threads are still active
            if !self.is_disconnected() {
                let thread_num = self[pos].swap(-1, Ordering::SeqCst);
                // if `-1` is read, keep looping
                if thread_num >= 0 {
                    return Some(thread_num);
                } else {
                    // snooze
                    if backoff.is_completed() {
                        // park, but for no more than 500 ms
                        self.parkers[pos].park_timeout(Duration::from_millis(500));
                    } else {
                        backoff.snooze();
                    }
                }
                // if the worker threads are no longer active, we may return `None`
            } else {
                let thread_num = self[pos].swap(-1, Ordering::SeqCst);
                return if thread_num >= 0 {
                    Some(thread_num)
                } else {
                    None
                };
            }
        }
    }

    /// Registry key for a task ID.
    #[inline(always)]
    fn id_to_key(task_id: usize, registry_len: usize) -> usize {
        task_id % registry_len
    }

    /// Create a write handle for a worker thread.
    fn to_write(&self) -> TaskRegistryWrite {
        TaskRegistryWrite {
            inner: self.inner.clone(),
            unparkers: self.parkers.iter().map(|p| p.unparker().clone()).collect(),
        }
    }

    /// `true` when no `TaskRegistryWrite` handle is alive,
    /// i.e. all worker threads have stopped.
    #[inline(always)]
    fn is_disconnected(&self) -> bool {
        Arc::strong_count(&self.inner) == 1
    }
}

impl TaskRegistryWrite {
    ///
    /// When a worker fetches a new task, it calls `register` to record
    /// its own thread ID for that task, so that the consumer knows which
    /// output channel to read from.
    ///
    #[inline(always)]
    pub(crate) fn register(&self, task_id: usize, thread_id: isize) {
        let registry_len = self.len();
        let key = TaskRegistry::id_to_key(task_id, registry_len);
        // a slot is never overwritten while it is still occupied
        debug_assert_eq!(self[key].load(Ordering::SeqCst), -1);
        self[key].store(thread_id, Ordering::SeqCst);
        self.unparkers[key].unpark();
    }
}

///
/// Implementation of the lock-free parallel iterator with sequential output.
///
pub struct ParIterSync<R> {

    /// Result receivers, one for each worker thread
    output_receivers: Vec<Receiver<R>>,

    /// Lookup table mapping each task to the worker thread number that executed it
    task_registry: TaskRegistry,

    /// Handles to join worker threads
    worker_thread: Option<Vec<JoinHandle<()>>>,

    /// Atomic flag to stop workers from fetching new tasks
    iterator_stopper: Arc<AtomicBool>,

    /// If this is `true`, workers have been signalled to stop
    /// and the iterator yields no more items
    is_killed: bool,

    /// Current task ID
    current: usize,
}

impl<R> ParIterSync<R>
    where
        R: Send + 'static,
{
    ///
    /// The worker threads are spawned in this `new` constructor.
    ///
    pub fn new<T, TL, F>(tasks: TL, task_executor: F) -> Self
        where
            F: Send + Clone + 'static + Fn(T) -> Result<R, ()>,
            T: Send + 'static,
            TL: Send + IntoIterator<Item = T> + 'static,
            <TL as IntoIterator>::IntoIter: Send + 'static,
    {
        let cpus = num_cpus::get();
        let iterator_stopper = Arc::new(AtomicBool::new(false));

        // `(1 + MAX_SIZE_FOR_THREAD) * cpus` slots: each worker may fetch one more
        // task while its `send` is blocked on a full output buffer
        let task_registry: TaskRegistry = TaskRegistry::new((1 + MAX_SIZE_FOR_THREAD) * cpus);

        // this thread dispatches tasks to worker threads
        let (dispatcher, task_receiver) = bounded(BUFFER_SIZE);
        let sender_thread = thread::spawn(move || {
            for (task_id, t) in tasks.into_iter().enumerate() {
                if dispatcher.send((t, task_id)).is_err() {
                    break;
                }
            }
        });

        // spawn worker threads
        let mut handles = Vec::with_capacity(cpus + 1);
        let mut output_receivers = Vec::with_capacity(cpus);
        for thread_number in 0..cpus as isize {
            let (output_sender, output_receiver) = bounded(MAX_SIZE_FOR_THREAD);
            let task_receiver = task_receiver.clone();
            let task_registry = task_registry.to_write();
            let iterator_stopper = iterator_stopper.clone();
            let task_executor = task_executor.clone();

            // workers
            let handle = thread::spawn(move || {
                loop {
                    // check the stopper flag
                    if iterator_stopper.load(Ordering::SeqCst) {
                        break;
                    }
                    // fetch a task and register the thread number
                    match get_task(&task_receiver, &task_registry, thread_number) {
                        // stop if there are no more tasks
                        None => break,
                        Some(task) => match task_executor(task) {
                            Ok(blk) => {
                                // send output
                                output_sender.send(blk).unwrap();
                            }
                            Err(_) => {
                                // stop the other threads when an error is returned
                                iterator_stopper.fetch_or(true, Ordering::SeqCst);
                                break;
                            }
                        },
                    }
                }
            });
            output_receivers.push(output_receiver);
            handles.push(handle);
        }
        handles.push(sender_thread);

        ParIterSync {
            output_receivers,
            task_registry,
            worker_thread: Some(handles),
            iterator_stopper,
            is_killed: false,
            current: 0,
        }
    }
}

impl<R> ParIterSync<R> {
    ///
    /// - stop workers from fetching new tasks
    /// - pull one result from each worker so that a blocked `send` can complete
    ///
    pub fn kill(&mut self) {
        if !self.is_killed {
            // stop threads from getting new tasks
            self.iterator_stopper.fetch_or(true, Ordering::SeqCst);
            // receive one item from each channel to prevent blocking
            for receiver in &self.output_receivers {
                let _ = receiver.try_recv();
            }
            self.is_killed = true;
        }
    }
}

///
/// A helper function to receive a task from the task receiver.
/// - It also registers the thread ID in the task registry immediately.
///
/// It is guaranteed to return `None` if and only if there are no more new tasks.
///
#[inline(always)]
fn get_task<T>(
    tasks: &Receiver<(T, usize)>,
    registry: &TaskRegistryWrite,
    thread_number: isize,
) -> Option<T>
    where
        T: Send,
{
    // block until a task arrives, then register which thread took it
    match tasks.recv() {
        Ok((task, task_id)) => {
            registry.register(task_id, thread_number);
            Some(task)
        }
        Err(_) => None,
    }
}

impl<R> Iterator for ParIterSync<R> {
    type Item = R;

    ///
    /// The output API: use `next()` to fetch results from the iterator.
    ///
    fn next(&mut self) -> Option<Self::Item> {
        if self.is_killed {
            return None;
        }

        // look up which thread to fetch the result from
        match self.task_registry.lookup(self.current) {
            // no more tasks
            None => None,
            Some(thread_num) => {
                match self.output_receivers[thread_num as usize].recv() {
                    Ok(block) => {
                        self.current += 1;
                        Some(block)
                    }
                    // some worker has stopped
                    Err(_) => {
                        self.kill();
                        None
                    }
                }
            }
        }
    }
}

impl<R> ParIterSync<R> {
    ///
    /// Join the worker threads. This can only be called once;
    /// calling it again results in a panic.
    /// It is called automatically in `drop()`.
    ///
    fn join(&mut self) {
        for handle in self.worker_thread.take().unwrap() {
            handle.join().unwrap()
        }
    }
}

impl<R> Drop for ParIterSync<R> {
    ///
    /// Stop the worker threads and join them.
    ///
    fn drop(&mut self) {
        self.kill();
        self.join();
    }
}

#[cfg(test)]
mod test_par_iter {
    #[cfg(feature = "bench")]
    extern crate test;
    use crate::IntoParallelIteratorSync;
    #[cfg(feature = "bench")]
    use test::Bencher;

    fn error_at_1000(test_vec: &Vec<i32>, a: i32) -> Result<i32, ()> {
        let n = test_vec.get(a as usize).unwrap().to_owned();
        if n == 1000 {
            Err(())
        } else {
            Ok(n)
        }
    }

    #[test]
    fn par_iter_test_exception() {
        for _ in 0..100 {
            let resource_captured = vec![3, 1, 4, 1, 5, 9, 2, 6, 5, 3];
            let results_expected = vec![3, 1, 4, 1];

            // if Err(()) is returned, the iterator stops early
            let results: Vec<i32> = (0..resource_captured.len())
                .into_par_iter_sync(move |a| {
                    let n = resource_captured.get(a).unwrap().to_owned();
                    if n == 5 {
                        Err(())
                    } else {
                        Ok(n)
                    }
                })
                .collect();

            assert_eq!(results, results_expected)
        }
    }

    ///
    /// The iterators can be chained.
    ///
    /// par_iter_0 -> owned by -> par_iter_1 -> owned by -> par_iter_2
    ///
    /// par_iter_1 errors at index 1000,
    ///
    /// so the final output should contain 0..1000.
    ///
    #[test]
    fn par_iter_chained_exception() {
        for _ in 0..100 {
            let resource_captured: Vec<i32> = (0..10000).collect();
            let resource_captured_1 = resource_captured.clone();
            let resource_captured_2 = resource_captured.clone();
            let results_expected: Vec<i32> = (0..1000).collect();

            let results: Vec<i32> = (0..resource_captured.len())
                .into_par_iter_sync(move |a| Ok(resource_captured.get(a).unwrap().to_owned()))
                .into_par_iter_sync(move |a| error_at_1000(&resource_captured_1, a))
                .into_par_iter_sync(move |a| {
                    Ok(resource_captured_2.get(a as usize).unwrap().to_owned())
                })
                .collect();

            assert_eq!(results, results_expected)
        }
    }

    ///
    /// par_iter_0 -> owned by -> par_iter_1 -> owned by -> par_iter_2
    ///
    /// par_iter_2 errors at index 1000,
    ///
    /// so the final output should contain 0..1000.
    ///
    #[test]
    fn par_iter_chained_exception_1() {
        for _ in 0..100 {
            let resource_captured: Vec<i32> = (0..10000).collect();
            let resource_captured_1 = resource_captured.clone();
            let resource_captured_2 = resource_captured.clone();
            let results_expected: Vec<i32> = (0..1000).collect();

            let results: Vec<i32> = (0..resource_captured.len())
                .into_par_iter_sync(move |a| Ok(resource_captured.get(a).unwrap().to_owned()))
                .into_par_iter_sync(move |a| {
                    Ok(resource_captured_2.get(a as usize).unwrap().to_owned())
                })
                .into_par_iter_sync(move |a| error_at_1000(&resource_captured_1, a))
                .collect();

            assert_eq!(results, results_expected)
        }
    }

    ///
    /// par_iter_0 -> owned by -> par_iter_1 -> owned by -> par_iter_2
    ///
    /// par_iter_0 errors at index 1000,
    ///
    /// so the final output should contain 0..1000.
    ///
    #[test]
    fn par_iter_chained_exception_2() {
        for _ in 0..100 {
            let resource_captured: Vec<i32> = (0..10000).collect();
            let resource_captured_1 = resource_captured.clone();
            let resource_captured_2 = resource_captured.clone();
            let results_expected: Vec<i32> = (0..1000).collect();

            let results: Vec<i32> = (0..resource_captured.len())
                .into_par_iter_sync(move |a| error_at_1000(&resource_captured_1, a as i32))
                .into_par_iter_sync(move |a| {
                    Ok(resource_captured.get(a as usize).unwrap().to_owned())
                })
                .into_par_iter_sync(move |a| {
                    Ok(resource_captured_2.get(a as usize).unwrap().to_owned())
                })
                .collect();

            assert_eq!(results, results_expected)
        }
    }

    #[test]
    fn test_break() {
        for _ in 0..100 {
            let mut count = 0;
            for i in (0..20000).into_par_iter_sync(|a| Ok(a)) {
                if i == 10000 {
                    break;
                }
                count += 1;
            }
            assert_eq!(count, 10000)
        }
    }

    #[test]
    fn test_large_iter() {
        for _ in 0..10 {
            let mut count = 0;
            for i in (0..1_000_000).into_par_iter_sync(|i| Ok(i)) {
                assert_eq!(i, count);
                count += 1;
            }
            assert_eq!(count, 1_000_000)
        }
    }

    #[cfg(feature = "bench")]
    #[bench]
    fn bench_into_par_iter_sync(b: &mut Bencher) {
        b.iter(|| {
            (0..1_000_000)
                .into_par_iter_sync(|a| Ok(a))
                .for_each(|_| {})
        });
    }
}
805}