par_iter_sync/lib.rs
1#![cfg_attr(feature = "bench", feature(test))]
2//!
3//! # par_iter_sync: Parallel Iterator With Sequential Output
4//!
//! Crates like `rayon` do not offer a synchronization mechanism.
//! This crate provides an easy mixture of parallelism and synchronization:
//! tasks execute concurrently, with synchronization at any step.
8//!
//! Consider the case where multiple threads share a cache which can be read
//! only after prior tasks have written to it (e.g., the reads of task 4 depend
//! on the writes of tasks 1-4).
12//!
13//! Using `IntoParallelIteratorSync` trait
14//!```
15//! // in concurrency: task1 write | task2 write | task3 write | task4 write
16//! // \_____________\_____________\_____________\
17//! // task4 read depends on task 1-4 write \___________
18//! // \
19//! // in concurrency: | task2 read | task3 read | task4 read
20//!
21//! use par_iter_sync::IntoParallelIteratorSync;
22//! use std::sync::{Arc, Mutex};
23//! use std::collections::HashSet;
24//!
25//! // there are 100 tasks
26//! let tasks = 0..100;
27//!
28//! // an in-memory cache for integers
29//! let cache: Arc<Mutex<HashSet<i32>>> = Arc::new(Mutex::new(HashSet::new()));
30//! let cache_clone = cache.clone();
31//!
32//! // iterate through tasks
33//! tasks.into_par_iter_sync(move |task_number| {
34//!
35//! // writes cache (write the integer in cache), in parallel
36//! cache.lock().unwrap().insert(task_number);
37//! // return the task number to the next iterator
38//! Ok(task_number)
39//!
40//! }).into_par_iter_sync(move |task_number| { // <- synced to sequential order
41//!
42//! // reads
43//! assert!(cache_clone.lock().unwrap().contains(&task_number));
44//! Ok(())
45//! // append a for each to actually run the whole chain
46//! }).for_each(|_| ());
47//!```
48//!
49//! ## Usage Caveat
//! This crate is designed to clone all resources captured by the closure
//! for each thread. To prevent unintended RAM usage, you may wrap
//! large data structures in `Arc`.
53//!
54//! ## Sequential Consistency
55//! The output order is guaranteed to be the same as the upstream iterator,
56//! but the execution order is not sequential.
57//!
58//! ## Examples
59//!
60//! ### Mix Syncing and Parallelism By Chaining
61//! ```
62//! use par_iter_sync::IntoParallelIteratorSync;
63//!
64//! (0..100).into_par_iter_sync(|i| {
65//! Ok(i) // <~ async execution
66//! }).into_par_iter_sync(|i| { // <- sync order
67//! Ok(i) // <~async execution
68//! }).into_par_iter_sync(|i| { // <- sync order
69//! Ok(i) // <~async execution
70//! }).for_each(|x| ()); // <- sync order
71//! ```
72//!
73//! ### Use `std::iter::IntoIterator` interface
74//! ```
75//! use par_iter_sync::IntoParallelIteratorSync;
76//!
77//! let mut count = 0;
78//!
79//! // for loop
80//! for i in (0..100).into_par_iter_sync(|i| Ok(i)) {
81//! assert_eq!(i, count);
82//! count += 1;
83//! }
84//!
85//! // sum
86//! let sum: i32 = (1..=100).into_par_iter_sync(|i| Ok(i)).sum();
87//!
88//! // skip, take and collect
89//! let results: Vec<i32> = (0..10)
90//! .into_par_iter_sync(|i| Ok(i))
91//! .skip(1)
92//! .take(5)
93//! .collect();
94//!
95//! assert_eq!(sum, 5050);
96//! assert_eq!(results, vec![1, 2, 3, 4, 5])
97//! ```
98//!
99//! ### Bridge To Rayon
100//! ```
101//! use par_iter_sync::IntoParallelIteratorSync;
102//! use rayon::prelude::*;
103//!
104//! // sum with rayon
105//! let sum: i32 = (1..=100)
106//! .into_par_iter_sync(|i| Ok(i))
107//! .par_bridge() // <- switch to rayon
108//! .into_par_iter()
109//! .sum();
110//!
111//! assert_eq!(sum, 5050);
112//! ```
113//!
114//! ### Closure Captures Variables
115//! Variables captured are cloned to each thread automatically.
116//! ```
117//! use par_iter_sync::IntoParallelIteratorSync;
118//! use std::sync::Arc;
119//!
120//! // use `Arc` to save RAM
121//! let resource_captured = Arc::new(vec![3, 1, 4, 1, 5, 9, 2, 6, 5, 3]);
122//! let len = resource_captured.len();
123//!
124//! let result_iter = (0..len).into_par_iter_sync(move |i| {
125//! // `resource_captured` is moved into the closure
126//! // and cloned to worker threads.
127//! let read_from_resource = resource_captured.get(i).unwrap();
128//! Ok(*read_from_resource)
129//! });
130//!
131//! // the result is produced in sequential order
132//! let collected: Vec<i32> = result_iter.collect();
133//! assert_eq!(collected, vec![3, 1, 4, 1, 5, 9, 2, 6, 5, 3])
134//! ```
135//!
136//! ### Fast Fail During Exception
137//! The iterator stops once the inner function returns an `Err`.
138//! ```
139//! use par_iter_sync::IntoParallelIteratorSync;
140//! use std::sync::Arc;
141//! use log::warn;
142//!
143//! /// this function returns `Err` when it reads 1000
144//! fn error_at_1000(n: i32) -> Result<i32, ()> {
145//! if n == 1000 {
146//! // you may log this error
147//! warn!("Some Error Occurs");
148//! Err(())
149//! } else {
150//! Ok(n)
151//! }
152//! }
153//!
154//! let results: Vec<i32> = (0..10000).into_par_iter_sync(move |a| {
155//! Ok(a)
156//! }).into_par_iter_sync(move |a| {
157//! // error at 1000
158//! error_at_1000(a)
159//! }).into_par_iter_sync(move |a| {
160//! Ok(a)
161//! }).collect();
162//!
163//! let expected: Vec<i32> = (0..1000).collect();
164//! assert_eq!(results, expected)
165//! ```
166//!
167//! #### You may choose to skip error
168//! If you do not want to stop on `Err`, this is a workaround.
169//! ```
170//! use par_iter_sync::IntoParallelIteratorSync;
171//! use std::sync::Arc;
172//!
173//! let results: Vec<Result<i32, ()>> = (0..5).into_par_iter_sync(move |n| {
174//! // error at 3, but skip
175//! if n == 3 {
176//! Ok(Err(()))
177//! } else {
178//! Ok(Ok(n))
179//! }
180//! }).collect();
181//!
182//! assert_eq!(results, vec![Ok(0), Ok(1), Ok(2), Err(()), Ok(4)])
183//! ```
184//! ## Overhead Benchmark
185//! Platform: Macbook Air (2015 Late) 8 GB RAM, Intel Core i5, 1.6GHZ (2 Core).
186//!
187//! ### Result
//! One million (1,000,000) empty iterations for each run.
189//! ```text
190//! test iter_async::test_par_iter_async::bench_into_par_iter_async
191//! ... bench: 110,277,577 ns/iter (+/- 28,510,054)
192//!
193//! test test_par_iter::bench_into_par_iter_sync
194//! ... bench: 121,063,787 ns/iter (+/- 103,787,056)
195//! ```
196//!
//! Result (per iteration, i.e. the totals above divided by 1,000,000):
//! - Async iterator overhead `110 ns (+/- 28 ns)`.
//! - Sync iterator overhead `121 ns (+/- 103 ns)`.
200//!
201//! ## Implementation Note
202//!
203//! ### Output Buffering
//! - Each worker uses a bounded single-producer channel to buffer its outputs.
//!   So, when a thread is waiting for its turn to get polled, it does not
//!   get blocked until that buffer is full. The channel size is hard-coded
//!   to 128 for each thread.
//! - The number of worker threads equals the number of logical cores.
208//!
209//! ### Synchronization and Exception Handling
//! - When a thread fetches a task, it registers its thread ID and the task ID into a registry.
//! - When `next()` is called, the consumer fetches the next thread ID from the task registry.
//! - `next()` returns `None` if there are no more tasks or if an error occurs.
213//!
214mod iter_async;
215
216use crossbeam::channel::{bounded, Receiver};
217use crossbeam::sync::{Parker, Unparker};
218use crossbeam::utils::Backoff;
219pub use iter_async::*;
220use num_cpus;
221use std::ops::Deref;
222use std::sync::atomic::{AtomicBool, AtomicIsize, Ordering};
223use std::sync::Arc;
224use std::thread;
225use std::thread::JoinHandle;
226use std::time::Duration;
227
/// Capacity of each worker's bounded output channel; it also determines the
/// size of the task registry (`(1 + MAX_SIZE_FOR_THREAD) * cpus` slots).
const MAX_SIZE_FOR_THREAD: usize = 128;
/// Capacity of the bounded channel that carries tasks from the dispatcher
/// thread to the worker threads.
const BUFFER_SIZE: usize = 64;
230
///
/// Lock-free parallel iterator adaptor whose output keeps the order of its input.
///
/// Type parameters:
/// - `T`: item type produced by the upstream iterator,
/// - `TL`: the upstream collection / iterator itself,
/// - `F`: the closure applied to every item,
/// - `R`: the value yielded for every `Ok` returned by `F`.
///
pub trait IntoParallelIteratorSync<R, T, TL, F>
where
    F: Send + Clone + 'static + Fn(T) -> Result<R, ()>,
    T: Send + 'static,
    TL: Send + IntoIterator<Item = T> + 'static,
    <TL as IntoIterator>::IntoIter: Send + 'static,
    R: Send,
{
    ///
    /// # Usage
    ///
    /// This method executes `func` in parallel.
    ///
    /// The `func` is a closure that takes the elements returned
    /// by the upstream iterator as its argument and returns
    /// some `Result<R, ()>`.
    ///
    /// The resulting iterator yields a value of type `R` for every `Ok(R)`
    /// and stops when it gets an `Err(())`.
    ///
    /// ## Example
    ///
    /// ```
    /// use par_iter_sync::IntoParallelIteratorSync;
    ///
    /// let mut count = 0;
    ///
    /// // for loop
    /// for i in (0..100).into_par_iter_sync(|i| Ok(i)) {
    ///     assert_eq!(i, count);
    ///     count += 1;
    /// }
    ///
    /// // sum
    /// let sum: i32 = (1..=100).into_par_iter_sync(|i| Ok(i)).sum();
    ///
    /// // take and collect
    /// let results: Vec<i32> = (0..10).into_par_iter_sync(|i| Ok(i)).take(5).collect();
    ///
    /// assert_eq!(sum, 5050);
    /// assert_eq!(results, vec![0, 1, 2, 3, 4])
    /// ```
    ///
    /// If the results are not polled using `next()`,
    /// the parallel execution will stop and wait.
    ///
    /// ## Sequential Consistency
    /// The output order is guaranteed to be the same as that of the provided iterator.
    ///
    /// See [crate] module-level doc.
    ///
    fn into_par_iter_sync(self, func: F) -> ParIterSync<R>;
}
287
/// Blanket implementation: every `Send + 'static` value that is `IntoIterator`
/// over `Send + 'static` items gains `into_par_iter_sync`.
impl<R, T, TL, F> IntoParallelIteratorSync<R, T, TL, F> for TL
where
    F: Send + Clone + 'static + Fn(T) -> Result<R, ()>,
    T: Send + 'static,
    TL: Send + IntoIterator<Item = T> + 'static,
    <TL as IntoIterator>::IntoIter: Send + 'static,
    R: Send + 'static,
{
    // Thin delegation: `ParIterSync::new` spawns the dispatcher and worker
    // threads immediately, so work starts as soon as this is called.
    fn into_par_iter_sync(self, func: F) -> ParIterSync<R> {
        ParIterSync::new(self, func)
    }
}
300
///
/// A lookup table used to register and look up the worker thread ID of a task.
///
/// A slot holding `-1` means that nothing is registered there: either the
/// task ID has not been registered yet or the task does not exist.
///
/// This is the read side, owned by the consumer; workers get a
/// `TaskRegistryWrite` handle through `to_write`.
///
struct TaskRegistry {
    // one slot per key; each slot holds a worker thread ID or `-1`
    inner: Arc<Vec<AtomicIsize>>,
    // one parker per slot; `lookup` parks on it while waiting for a registration
    parkers: Vec<Parker>,
}
312
313impl Deref for TaskRegistry {
314 type Target = Vec<AtomicIsize>;
315
316 fn deref(&self) -> &Self::Target {
317 self.inner.deref()
318 }
319}
320
/// Write client for the task registry, one per worker thread.
///
/// It shares the slot vector with the `TaskRegistry` it was created from and
/// carries an unparker for every slot so that a registration can wake the
/// consumer.
struct TaskRegistryWrite {
    // slot vector shared with the read side
    inner: Arc<Vec<AtomicIsize>>,
    // one unparker per slot, paired with the read side's parkers
    unparkers: Vec<Unparker>
}
326
327impl Deref for TaskRegistryWrite {
328 type Target = Vec<AtomicIsize>;
329
330 fn deref(&self) -> &Self::Target {
331 self.inner.deref()
332 }
333}
334
335impl Drop for TaskRegistryWrite {
336 fn drop(&mut self) {
337 for unparker in &self.unparkers {
338 unparker.unpark();
339 }
340 }
341}
342
343impl TaskRegistry {
344
345 ///
346 /// Initialize the registry with `-1` to represent an empty registry
347 ///
348 /// The `size` must be (just) big enough to ensure that no key collision would
349 /// possibly occur.
350 ///
351 fn new(size: usize) -> TaskRegistry {
352 TaskRegistry {
353 inner: Arc::new((0..size).map(|_| AtomicIsize::new(-1)).collect()),
354 parkers: (0..size).map(|_| Parker::new()).collect()
355 }
356 }
357
358 ///
359 /// Look up a thread_number of a task and set that slot to `-1`.
360 ///
361 /// This function blocks to wait for a task to be registered,
362 /// unless all worker threads have stopped so that no more new
363 /// task can possibly be registered.
364 ///
365 /// It should block very rarely since task dispatcher is not blocking,
366 /// and is registered immediately after fetching in `get_task`.
367 ///
368 /// returns `None` only when all worker threads have stopped
369 ///
370 #[inline(always)]
371 pub(crate) fn lookup(&self, task_id: usize) -> Option<isize> {
372 let registry_len = self.len();
373 let pos = TaskRegistry::id_to_key(task_id, registry_len);
374 let backoff = Backoff::new();
375 loop {
376 // check if worker threads are still active
377 if !self.is_disconnected() {
378 let thread_num = self[pos].swap(-1, Ordering::SeqCst);
379 // if `-1` is read, would continue in the loop
380 if thread_num >= 0 {
381 return Some(thread_num);
382 } else {
383 // snooze
384 if backoff.is_completed() {
385 // park but no more than 500 millis
386 self.parkers[pos].park_timeout(Duration::from_millis(500));
387 } else {
388 backoff.snooze();
389 }
390 }
391 // if worker threads are no more active, might return `None`
392 } else {
393 let thread_num = self[pos].swap(-1, Ordering::SeqCst);
394 return if thread_num >= 0 {
395 Some(thread_num)
396 } else {
397 None
398 };
399 }
400 }
401 }
402
403 /// key of task ID in registry
404 #[inline(always)]
405 fn id_to_key(task_id: usize, registry_len: usize) -> usize {
406 task_id % registry_len
407 }
408
409 fn to_write(&self) -> TaskRegistryWrite {
410 TaskRegistryWrite {
411 inner: self.inner.clone(),
412 unparkers: self.parkers.iter().map(|p| p.unparker().clone()).collect(),
413 }
414 }
415
416 #[inline(always)]
417 fn is_disconnected(&self) -> bool {
418 Arc::strong_count(&self.inner) == 1
419 }
420}
421
422impl TaskRegistryWrite {
423 ///
424 /// When a worker fetches a new task, it calls `register` to tell
425 /// the user thread its own thread ID.
426 ///
427 /// register the worker thread number of a task
428 ///
429 #[inline(always)]
430 pub(crate) fn register(&self, task_id: usize, thread_id: isize) {
431 let registry_len = self.len();
432 let key = TaskRegistry::id_to_key(task_id, registry_len);
433 // never overwrite
434 debug_assert_eq!(self[key].load(Ordering::SeqCst), -1);
435 self[key].store(thread_id, Ordering::SeqCst);
436 self.unparkers[key].unpark();
437 }
438}
439
///
/// Implementation of the lock-free sequential parallel iterator.
///
/// Results are pulled with `Iterator::next` in the order of the upstream
/// iterator; dropping the value stops and joins the spawned threads.
///
pub struct ParIterSync<R> {

    /// Result receivers, one for each worker thread (indexed by thread number)
    output_receivers: Vec<Receiver<R>>,

    /// Lookup table mapping a task to the worker thread number that took it
    task_registry: TaskRegistry,

    /// Handles of the worker threads and of the dispatcher thread;
    /// taken out of the `Option` when they are joined
    worker_thread: Option<Vec<JoinHandle<()>>>,

    /// Atomic flag telling workers to stop fetching new tasks
    iterator_stopper: Arc<AtomicBool>,

    /// Set by `kill()`; once `true`, `next()` always returns `None`
    is_killed: bool,

    /// ID of the task whose result `next()` will return next
    current: usize,
}
463
464impl<R> ParIterSync<R>
465 where
466 R: Send + 'static,
467{
468 ///
469 /// the worker threads are dispatched in this `new` constructor!
470 ///
471 pub fn new<T, TL, F>(tasks: TL, task_executor: F) -> Self
472 where
473 F: Send + Clone + 'static + Fn(T) -> Result<R, ()>,
474 T: Send + 'static,
475 TL: Send + IntoIterator<Item = T> + 'static,
476 <TL as IntoIterator>::IntoIter: Send + 'static,
477 {
478 let cpus = num_cpus::get();
479 let iterator_stopper = Arc::new(AtomicBool::new(false));
480
481 // `(1 + MAX_SIZE_FOR_THREAD)` * cpus as there might be one more fetching after send blocking
482 let task_registry: TaskRegistry = TaskRegistry::new((1 + MAX_SIZE_FOR_THREAD) * cpus);
483
484 // this thread dispatches tasks to worker threads
485 let (dispatcher, task_receiver) = bounded(BUFFER_SIZE);
486 let sender_thread = thread::spawn(move || {
487 for (task_id, t) in tasks.into_iter().enumerate() {
488 if dispatcher.send((t, task_id)).is_err() {
489 break;
490 }
491 }
492 });
493
494 // spawn worker threads
495 let mut handles = Vec::with_capacity(cpus + 1);
496 let mut output_receivers = Vec::with_capacity(cpus);
497 for thread_number in 0..cpus as isize {
498 let (output_sender, output_receiver) = bounded(MAX_SIZE_FOR_THREAD);
499 let task_receiver = task_receiver.clone();
500 let task_registry = task_registry.to_write();
501 let iterator_stopper = iterator_stopper.clone();
502 let task_executor = task_executor.clone();
503
504 // workers
505 let handle = thread::spawn(move || {
506 loop {
507 // check stopper flag
508 if iterator_stopper.load(Ordering::SeqCst) {
509 break;
510 }
511 // fetch task and register thread number
512 match get_task(&task_receiver, &task_registry, thread_number) {
513 // stop if no more task
514 None => break,
515 Some(task) => match task_executor(task) {
516 Ok(blk) => {
517 // send output
518 output_sender.send(blk).unwrap();
519 }
520 Err(_) => {
521 // stop other thread when Error is returned
522 iterator_stopper.fetch_or(true, Ordering::SeqCst);
523 break;
524 }
525 },
526 }
527 }
528 });
529 output_receivers.push(output_receiver);
530 handles.push(handle);
531 }
532 handles.push(sender_thread);
533
534 ParIterSync {
535 output_receivers,
536 task_registry,
537 worker_thread: Some(handles),
538 iterator_stopper,
539 is_killed: false,
540 current: 0,
541 }
542 }
543}
544
545impl<R> ParIterSync<R> {
546 ///
547 /// - stop workers from fetching new tasks
548 /// - pull one result from each worker to prevent `send` blocking
549 ///
550 pub fn kill(&mut self) {
551 if !self.is_killed {
552 // stop threads from getting new tasks
553 self.iterator_stopper.fetch_or(true, Ordering::SeqCst);
554 // receive one for each channel to prevent blocking
555 for receiver in &self.output_receivers {
556 let _ = receiver.try_recv();
557 }
558 // loop break only when task_order is dropped (all workers have stopped)
559 self.is_killed = true;
560 }
561 }
562}
563
564///
565/// A helper function to receive task from task receiver.
566/// - it also registers thread ID into task registry immediately.
567///
568/// It guarantees to return None if and only if there is no more new task.
569///
570#[inline(always)]
571fn get_task<T>(
572 tasks: &Receiver<(T, usize)>,
573 registry: &TaskRegistryWrite,
574 thread_number: isize,
575) -> Option<T>
576 where
577 T: Send,
578{
579 // lock task list
580 // let mut task = tasks.lock().unwrap();
581 // registry task stealing
582 match tasks.recv() {
583 Ok((task, task_id)) => {
584 registry.register(task_id, thread_number);
585 Some(task)
586 }
587 Err(_) => None,
588 }
589}
590
591impl<R> Iterator for ParIterSync<R> {
592 type Item = R;
593
594 ///
595 /// The output API, use next to fetch result from the iterator.
596 ///
597 fn next(&mut self) -> Option<Self::Item> {
598 if self.is_killed {
599 return None;
600 }
601
602 // look up which thread to fetch result from
603 match self.task_registry.lookup(self.current) {
604 // no more task
605 None => None,
606 Some(thread_num) => {
607 match self.output_receivers[thread_num as usize].recv() {
608 Ok(block) => {
609 self.current += 1;
610 Some(block)
611 }
612 // some worker have stopped
613 Err(_) => {
614 self.kill();
615 None
616 }
617 }
618 }
619 }
620 }
621}
622
623impl<R> ParIterSync<R> {
624 ///
625 /// Join worker threads. This can be only called once.
626 /// Otherwise it results in panic.
627 /// This is automatically called in `join()`
628 ///
629 fn join(&mut self) {
630 for handle in self.worker_thread.take().unwrap() {
631 handle.join().unwrap()
632 }
633 }
634}
635
impl<R> Drop for ParIterSync<R> {
    ///
    /// Stop the worker threads, then join every spawned thread.
    ///
    fn drop(&mut self) {
        // does nothing if the iterator has already been killed
        self.kill();
        // blocks until the spawned threads have exited
        self.join();
    }
}
645
#[cfg(test)]
mod test_par_iter {
    #[cfg(feature = "bench")]
    extern crate test;
    use crate::IntoParallelIteratorSync;
    #[cfg(feature = "bench")]
    use test::Bencher;

    /// Returns `test_vec[a]`, or `Err(())` when that element equals 1000.
    ///
    /// Panics if `a` is out of bounds.
    fn error_at_1000(test_vec: &[i32], a: i32) -> Result<i32, ()> {
        let n = test_vec[a as usize];
        if n == 1000 {
            Err(())
        } else {
            Ok(n)
        }
    }

    /// The iterator stops at the first `Err(())`: only the values before the
    /// first `5` are produced.
    #[test]
    fn par_iter_test_exception() {
        for _ in 0..100 {
            let resource_captured = vec![3, 1, 4, 1, 5, 9, 2, 6, 5, 3];
            let results_expected = vec![3, 1, 4, 1];

            // if Err(()) is returned, the iterator stops early
            let results: Vec<i32> = (0..resource_captured.len())
                .into_par_iter_sync(move |a| {
                    let n = resource_captured[a];
                    if n == 5 {
                        Err(())
                    } else {
                        Ok(n)
                    }
                })
                .collect();

            assert_eq!(results, results_expected)
        }
    }

    ///
    /// The iterators can be chained.
    ///
    /// par_iter_0 -> owned by -> par_iter_1 -> owned by -> par_iter_2
    ///
    /// par_iter_1 exception at height 1000,
    ///
    /// the final output should contain 0..1000;
    ///
    #[test]
    fn par_iter_chained_exception() {
        for _ in 0..100 {
            let resource_captured: Vec<i32> = (0..10000).collect();
            let resource_captured_1 = resource_captured.clone();
            let resource_captured_2 = resource_captured.clone();
            let results_expected: Vec<i32> = (0..1000).collect();

            let results: Vec<i32> = (0..resource_captured.len())
                .into_par_iter_sync(move |a| Ok(resource_captured[a]))
                .into_par_iter_sync(move |a| error_at_1000(&resource_captured_1, a))
                .into_par_iter_sync(move |a| Ok(resource_captured_2[a as usize]))
                .collect();

            assert_eq!(results, results_expected)
        }
    }

    ///
    /// par_iter_0 -> owned by -> par_iter_1 -> owned by -> par_iter_2
    ///
    /// par_iter_2 exception at height 1000,
    ///
    /// the final output should contain 0..1000;
    ///
    #[test]
    fn par_iter_chained_exception_1() {
        for _ in 0..100 {
            let resource_captured: Vec<i32> = (0..10000).collect();
            let resource_captured_1 = resource_captured.clone();
            let resource_captured_2 = resource_captured.clone();
            let results_expected: Vec<i32> = (0..1000).collect();

            let results: Vec<i32> = (0..resource_captured.len())
                .into_par_iter_sync(move |a| Ok(resource_captured[a]))
                .into_par_iter_sync(move |a| Ok(resource_captured_2[a as usize]))
                .into_par_iter_sync(move |a| error_at_1000(&resource_captured_1, a))
                .collect();

            assert_eq!(results, results_expected)
        }
    }

    ///
    /// par_iter_0 -> owned by -> par_iter_1 -> owned by -> par_iter_2
    ///
    /// par_iter_0 exception at height 1000,
    ///
    /// the final output should contain 0..1000;
    ///
    #[test]
    fn par_iter_chained_exception_2() {
        for _ in 0..100 {
            let resource_captured: Vec<i32> = (0..10000).collect();
            let resource_captured_1 = resource_captured.clone();
            let resource_captured_2 = resource_captured.clone();
            let results_expected: Vec<i32> = (0..1000).collect();

            let results: Vec<i32> = (0..resource_captured.len())
                .into_par_iter_sync(move |a| error_at_1000(&resource_captured_1, a as i32))
                .into_par_iter_sync(move |a| Ok(resource_captured[a as usize]))
                .into_par_iter_sync(move |a| Ok(resource_captured_2[a as usize]))
                .collect();

            assert_eq!(results, results_expected)
        }
    }

    /// Breaking out of the loop drops the iterator while workers are still
    /// running; this must neither hang nor lose earlier items.
    #[test]
    fn test_break() {
        for _ in 0..100 {
            let mut count = 0;
            for i in (0..20000).into_par_iter_sync(|a| Ok(a)) {
                if i == 10000 {
                    break;
                }
                count += 1;
            }
            assert_eq!(count, 10000)
        }
    }

    /// One million items come out complete and in order.
    #[test]
    fn test_large_iter() {
        for _ in 0..10 {
            let mut count = 0;
            for i in (0..1_000_000).into_par_iter_sync(|i| Ok(i)) {
                assert_eq!(i, count);
                count += 1;
            }
            assert_eq!(count, 1_000_000)
        }
    }

    /// Overhead benchmark: one million empty iterations per run.
    #[cfg(feature = "bench")]
    #[bench]
    fn bench_into_par_iter_sync(b: &mut Bencher) {
        b.iter(|| {
            (0..1_000_000)
                .into_par_iter_sync(|a| Ok(a))
                .for_each(|_| {})
        });
    }
}
805}