# par_iter_sync: Lock-free Sequential Parallel Iterator
Crates like `rayon` do not offer a synchronization mechanism.
This crate provides an easy mixture of parallelism and synchronization.
Consider the case where multiple threads share a cache which can be read only after prior tasks have written to it (e.g., the reads of task 4 depend on the writes of tasks 1-4).
Using the `IntoParallelIteratorSync` trait:

```rust
// in concurrency: task1 write | task2 write | task3 write | task4 write
//                      \_____________\_____________\_____________\
// task4 read depends on task 1-4 write  \___________
//                                                   \
// in concurrency:             | task2 read | task3 read | task4 read

use par_iter_sync::IntoParallelIteratorSync;
use std::sync::{Arc, Mutex};
use std::collections::HashSet;

// there are 100 tasks
let tasks = 0..100;

// an in-memory cache for integers
let cache: Arc<Mutex<HashSet<i32>>> = Arc::new(Mutex::new(HashSet::new()));
let cache_clone = cache.clone();

// iterate through tasks
tasks.into_par_iter_sync(move |task_number| {
    // writes are executed in parallel
    cache.lock().unwrap().insert(task_number);
    Ok(task_number)
}).into_par_iter_sync(move |task_number| { // <- synced to sequential order
    // by now, tasks 0..=task_number have all written to the cache
    assert!(cache_clone.lock().unwrap().contains(&task_number));
    Ok(())
}).for_each(|_| ());
```
## Usage Caveat
This crate is designed to clone all resources captured by the closure
for each thread. To prevent unintended RAM usage, you may wrap
large data structures in an `Arc`
(especially vectors of `Clone` objects).
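The caveat above can be illustrated with plain `std` threads (a sketch, not the crate's internals; the function and data below are made up for illustration): cloning a large `Vec` into each thread copies the whole buffer, while cloning an `Arc<Vec<_>>` only bumps a reference count.

```rust
use std::sync::Arc;
use std::thread;

// Wrap the large resource in `Arc` once; each thread then clones
// only the smart pointer, not the underlying data.
fn shared_sum(data: Vec<u64>, num_threads: usize) -> u64 {
    let data = Arc::new(data); // wrap once ...
    let mut handles = Vec::new();
    for _ in 0..num_threads {
        let data = data.clone(); // ... cheap pointer clone per thread
        handles.push(thread::spawn(move || data.iter().sum::<u64>()));
    }
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}

fn main() {
    // 4 threads each sum the same shared vector
    assert_eq!(shared_sum(vec![1, 2, 3], 4), 24);
}
```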
## Sequential Consistency
The output order is guaranteed to be the same as the upstream iterator, but the execution order is not sequential.
## Overhead Benchmark
Platform: MacBook Air (Late 2015), 8 GB RAM, Intel Core i5 @ 1.6 GHz (2 cores).
One million (1,000,000) empty iterations per run:

```text
test iter_async::test_par_iter_async::bench_into_par_iter_async ... bench: 120,003,398 ns/iter (+/- 52,401,527)
test test_par_iter::bench_into_par_iter_sync                    ... bench:  98,472,767 ns/iter (+/- 9,901,593)
```

Per-iteration overhead:
- Async iterator: 120,003,398 ns / 1,000,000 = 120 ns (+/- 52 ns)
- Sync iterator: 98,472,767 ns / 1,000,000 = 98 ns (+/- 10 ns)
### Bench Programs
- `iter_async`
- `iter_sync`
## Examples
### Mix Syncing and Parallelism By Chaining
```rust
use par_iter_sync::IntoParallelIteratorSync;

(0..100).into_par_iter_sync(|i| {
    Ok(i)
}).into_par_iter_sync(|i| {
    Ok(i)
}).into_par_iter_sync(|i| {
    Ok(i)
}); // <- sync order
```
### Use the `std::iter::IntoIterator` Interface
```rust
use par_iter_sync::IntoParallelIteratorSync;

let mut count = 0;

// for loop
for i in (0..100).into_par_iter_sync(|i| Ok(i)) {
    assert_eq!(i, count);
    count += 1;
}

// sum
let sum: i32 = (1..=100).into_par_iter_sync(|i| Ok(i)).sum();

// take and collect
let results: Vec<i32> = (0..10).into_par_iter_sync(|i| Ok(i)).take(5).collect();

assert_eq!(sum, 5050);
assert_eq!(results, vec![0, 1, 2, 3, 4]);
```
### Closure Captures Variables
Variables captured are cloned to each thread automatically.
```rust
use par_iter_sync::IntoParallelIteratorSync;
use std::sync::Arc;

// use `Arc` to save RAM
let resource_captured = Arc::new(vec![3, 1, 4, 1, 5, 9, 2, 6]);
let len = resource_captured.len();

let result_iter = (0..len).into_par_iter_sync(move |i| {
    // `resource_captured` is cloned into each thread here
    let read_from_resource = resource_captured.get(i).unwrap().to_owned();
    Ok(read_from_resource)
});

// the result is produced in sequential order
let collected: Vec<i32> = result_iter.collect();
assert_eq!(collected, vec![3, 1, 4, 1, 5, 9, 2, 6]);
```
### Fast Fail During Exception
The iterator stops once the inner function returns an `Err`.
```rust
use par_iter_sync::IntoParallelIteratorSync;

/// this function returns `Err` when it reads 1000
fn error_at_1000(n: i32) -> Result<i32, ()> {
    if n == 1000 {
        Err(())
    } else {
        Ok(n)
    }
}

let results: Vec<i32> = (0..10000).into_par_iter_sync(|n| {
    Ok(n)
}).into_par_iter_sync(|n| {
    error_at_1000(n)
}).into_par_iter_sync(|n| {
    Ok(n)
}).collect();

// only the results before the first `Err` are produced
let expected: Vec<i32> = (0..1000).collect();
assert_eq!(results, expected);
```
### You May Choose to Skip Errors

If you do not want to stop on `Err`, this is a workaround:
```rust
use par_iter_sync::IntoParallelIteratorSync;

let results: Vec<Result<i32, ()>> = (0..5).into_par_iter_sync(|n| {
    // wrap the error in `Ok` so the iterator keeps going
    if n == 3 {
        Ok(Err(()))
    } else {
        Ok(Ok(n))
    }
}).collect();

assert_eq!(results, vec![Ok(0), Ok(1), Ok(2), Err(()), Ok(4)]);
```
## Implementation Note
### Output Buffering
- Each worker uses a synced (bounded) single-producer channel to buffer its outputs, so a thread that is waiting for its turn to be polled is not blocked. The channel size is hard-coded to 10 for each thread.
- The number of threads equals the number of logical cores.
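The per-worker buffer described above can be sketched with `std::sync::mpsc::sync_channel` (a simplified illustration, not the crate's actual code; the function name is made up): the worker can run up to 10 results ahead of the consumer before `send` blocks.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

// A single worker sends its outputs through a bounded channel of
// size 10, so it is not blocked while the consumer is busy elsewhere.
fn buffered_worker(tasks: Vec<i32>) -> Vec<i32> {
    let (tx, rx) = sync_channel(10); // hard-coded buffer of 10
    let handle = thread::spawn(move || {
        for t in tasks {
            // blocks only when 10 un-consumed outputs are pending
            tx.send(t * 2).unwrap();
        }
        // `tx` is dropped here, which ends the consumer's iteration
    });
    let out: Vec<i32> = rx.iter().collect();
    handle.join().unwrap();
    out
}

fn main() {
    assert_eq!(buffered_worker(vec![1, 2, 3]), vec![2, 4, 6]);
}
```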
### Synchronization and Exception Handling
- When a thread fetches a task, it registers its thread ID and task ID in a registry.
- When `next()` is called, the consumer fetches the next thread ID from the task registry. `next()` returns `None` when there are no more tasks or when an error has occurred.
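The registry scheme above can be sketched with `std` primitives (an illustrative simplification, not the crate's actual code; names like `collect_in_order` are made up). Workers claim task IDs from a shared counter and record which worker took each task; the consumer walks tasks in order and reads from the recorded worker's channel, which restores sequential output.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::sync_channel;
use std::sync::{Arc, Mutex};
use std::thread;

// Each worker squares the task IDs it claims; the consumer uses the
// registry to reassemble the results in task order.
fn collect_in_order(num_tasks: usize, num_workers: usize) -> Vec<usize> {
    let next_task = Arc::new(AtomicUsize::new(0));
    // registry[t] = which worker claimed task t
    let registry = Arc::new(Mutex::new(vec![None::<usize>; num_tasks]));
    let mut receivers = Vec::new();
    let mut handles = Vec::new();

    for w in 0..num_workers {
        let (tx, rx) = sync_channel(10); // per-worker output buffer
        receivers.push(rx);
        let next_task = next_task.clone();
        let registry = registry.clone();
        handles.push(thread::spawn(move || loop {
            let t = next_task.fetch_add(1, Ordering::SeqCst);
            if t >= num_tasks {
                break;
            }
            registry.lock().unwrap()[t] = Some(w); // register the claim
            tx.send((t, t * t)).unwrap(); // do the "work" and buffer it
        }));
    }

    let mut out = Vec::with_capacity(num_tasks);
    for t in 0..num_tasks {
        // wait until some worker has claimed task t
        let w = loop {
            if let Some(w) = registry.lock().unwrap()[t] {
                break w;
            }
            thread::yield_now();
        };
        // each worker sends in its own claim order, so the next unread
        // value on worker w's channel is exactly task t's result
        let (task_id, result) = receivers[w].recv().unwrap();
        assert_eq!(task_id, t);
        out.push(result);
    }
    for h in handles {
        h.join().unwrap();
    }
    out
}

fn main() {
    assert_eq!(collect_in_order(8, 3), vec![0, 1, 4, 9, 16, 25, 36, 49]);
}
```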