pub trait IteratorExt {
Show 16 methods
// Provided methods
fn parallel_map<F, O>(self, f: F) -> ParallelMap<Self, O> ⓘ
where Self: Sized + Iterator + 'static,
F: 'static + Send + Clone + FnMut(Self::Item) -> O,
Self::Item: Send + 'static,
O: Send + 'static { ... }
fn parallel_map_custom<F, O, OF>(self, of: OF, f: F) -> ParallelMap<Self, O> ⓘ
where Self: Sized + Iterator + 'static,
F: 'static + Send + Clone + FnMut(Self::Item) -> O,
Self::Item: Send + 'static,
O: Send + 'static,
OF: FnOnce(ParallelMapBuilder<Self>) -> ParallelMapBuilder<Self> { ... }
fn parallel_map_scoped<'env, 'scope, F, O>(
self,
scope: &'scope Scope<'env>,
f: F,
) -> ParallelMap<Self, O> ⓘ
where Self: Sized + Iterator + 'env,
F: 'env + Send + Clone + FnMut(Self::Item) -> O,
Self::Item: Send + 'env,
O: Send + 'env { ... }
fn parallel_map_scoped_custom<'env, 'scope, F, O, OF>(
self,
scope: &'scope Scope<'env>,
of: OF,
f: F,
) -> ParallelMap<Self, O> ⓘ
where Self: Sized + Iterator + 'env,
F: 'env + Send + Clone + FnMut(Self::Item) -> O,
Self::Item: Send + 'env,
O: Send + 'env,
OF: FnOnce(ParallelMapBuilder<Self>) -> ParallelMapBuilder<Self> { ... }
fn parallel_filter<F>(self, f: F) -> ParallelFilter<Self> ⓘ
where Self: Sized + Iterator + 'static,
F: 'static + Send + Clone + FnMut(&Self::Item) -> bool,
Self::Item: Send + 'static { ... }
fn parallel_filter_custom<F, OF>(self, of: OF, f: F) -> ParallelFilter<Self> ⓘ
where Self: Sized + Iterator + 'static,
F: 'static + Send + Clone + FnMut(&Self::Item) -> bool,
Self::Item: Send + 'static,
OF: FnOnce(ParallelFilterBuilder<Self>) -> ParallelFilterBuilder<Self> { ... }
fn parallel_filter_scoped<'env, 'scope, F>(
self,
scope: &'scope Scope<'env>,
f: F,
) -> ParallelFilter<Self> ⓘ
where Self: Sized + Iterator + 'env,
F: 'env + Send + Clone + FnMut(&Self::Item) -> bool,
Self::Item: Send + 'env { ... }
fn parallel_filter_scoped_custom<'env, 'scope, F, OF>(
self,
scope: &'scope Scope<'env>,
of: OF,
f: F,
) -> ParallelFilter<Self> ⓘ
where Self: Sized + Iterator + 'env,
F: 'env + Send + Clone + FnMut(&Self::Item) -> bool,
Self::Item: Send + 'env,
OF: FnOnce(ParallelFilterBuilder<Self>) -> ParallelFilterBuilder<Self> { ... }
fn readahead(self) -> Readahead<Self> ⓘ
where Self: Iterator + Send + 'static + Sized,
Self::Item: Send + 'static { ... }
fn readahead_custom<OF>(self, of: OF) -> Readahead<Self> ⓘ
where Self: Iterator + Sized + Send + 'static,
Self::Item: Send + 'static,
OF: FnOnce(ReadaheadBuilder<Self>) -> ReadaheadBuilder<Self> { ... }
fn readahead_scoped<'env, 'scope>(
self,
scope: &'scope Scope<'env>,
) -> Readahead<Self> ⓘ
where Self: Sized + Send + Iterator + 'scope + 'env,
Self::Item: Send + 'env + 'scope { ... }
fn readahead_scoped_custom<'env, 'scope, OF>(
self,
scope: &'scope Scope<'env>,
of: OF,
) -> Readahead<Self> ⓘ
where Self: Sized + Send + Iterator + 'scope + 'env,
Self::Item: Send + 'env + 'scope,
OF: FnOnce(ReadaheadBuilder<Self>) -> ReadaheadBuilder<Self> { ... }
fn profile_egress<P>(self, profiler: P) -> ProfileEgress<Self, P> ⓘ
where P: Profiler,
Self: Iterator + Sized { ... }
fn profile_ingress<P>(self, profiler: P) -> ProfileIngress<Self, P> ⓘ
where P: Profiler,
Self: Iterator + Sized { ... }
fn readahead_profiled<TxP, RxP>(
self,
tx_profiler: TxP,
rx_profiler: RxP,
) -> ProfileIngress<Readahead<ProfileEgress<Self, TxP>>, RxP> ⓘ
where TxP: Profiler + Send + 'static,
RxP: Profiler,
Self: Iterator + Sized + Send + 'static,
Self::Item: Send + 'static { ... }
fn readahead_scoped_profiled<'env, 'scope, TxP, RxP>(
self,
scope: &'scope Scope<'env>,
tx_profiler: TxP,
rx_profiler: RxP,
) -> ProfileIngress<Readahead<ProfileEgress<Self, TxP>>, RxP> ⓘ
where TxP: Profiler + Send + 'static,
RxP: Profiler,
Self: Sized + Send + Iterator + 'scope + 'env,
Self::Item: Send + 'env + 'scope { ... }
}Expand description
Extension trait for std::iter::Iterator bringing parallel operations
§TODO
- parallel_for_each
- parallel_flat_map
- possibly others
PRs welcome
Provided Methods§
Sourcefn parallel_map<F, O>(self, f: F) -> ParallelMap<Self, O> ⓘ
fn parallel_map<F, O>(self, f: F) -> ParallelMap<Self, O> ⓘ
Run map function in parallel on multiple threads
Results will be returned in order.
No items will be pulled until the first time ParallelMap is pulled for elements with ParallelMap::next.
In that respect, ParallelMap behaves like every other iterator and is lazy.
Sourcefn parallel_map_custom<F, O, OF>(self, of: OF, f: F) -> ParallelMap<Self, O> ⓘ
fn parallel_map_custom<F, O, OF>(self, of: OF, f: F) -> ParallelMap<Self, O> ⓘ
Sourcefn parallel_map_scoped<'env, 'scope, F, O>(
self,
scope: &'scope Scope<'env>,
f: F,
) -> ParallelMap<Self, O> ⓘ
fn parallel_map_scoped<'env, 'scope, F, O>( self, scope: &'scope Scope<'env>, f: F, ) -> ParallelMap<Self, O> ⓘ
A version of IteratorExt::parallel_map supporting iterating over
borrowed values.
Sourcefn parallel_map_scoped_custom<'env, 'scope, F, O, OF>(
self,
scope: &'scope Scope<'env>,
of: OF,
f: F,
) -> ParallelMap<Self, O> ⓘ
fn parallel_map_scoped_custom<'env, 'scope, F, O, OF>( self, scope: &'scope Scope<'env>, of: OF, f: F, ) -> ParallelMap<Self, O> ⓘ
Sourcefn parallel_filter<F>(self, f: F) -> ParallelFilter<Self> ⓘ
fn parallel_filter<F>(self, f: F) -> ParallelFilter<Self> ⓘ
Run filter function in parallel on multiple threads
A wrapper around IteratorExt::parallel_map really, so it has similar properties.
Sourcefn parallel_filter_custom<F, OF>(self, of: OF, f: F) -> ParallelFilter<Self> ⓘ
fn parallel_filter_custom<F, OF>(self, of: OF, f: F) -> ParallelFilter<Self> ⓘ
Sourcefn parallel_filter_scoped<'env, 'scope, F>(
self,
scope: &'scope Scope<'env>,
f: F,
) -> ParallelFilter<Self> ⓘ
fn parallel_filter_scoped<'env, 'scope, F>( self, scope: &'scope Scope<'env>, f: F, ) -> ParallelFilter<Self> ⓘ
Sourcefn parallel_filter_scoped_custom<'env, 'scope, F, OF>(
self,
scope: &'scope Scope<'env>,
of: OF,
f: F,
) -> ParallelFilter<Self> ⓘ
fn parallel_filter_scoped_custom<'env, 'scope, F, OF>( self, scope: &'scope Scope<'env>, of: OF, f: F, ) -> ParallelFilter<Self> ⓘ
Sourcefn readahead(self) -> Readahead<Self> ⓘ
fn readahead(self) -> Readahead<Self> ⓘ
Run the current iterator in another thread and return elements through a buffered channel.
buffer_size defines the size of the output channel connecting
current and the inner thread.
It’s a common mistake to use large channel sizes needlessly
in hopes of achieving higher performance. The only benefit
a large buffer size provides is smoothing out the variance
of the inner iterator returning items. The cost - wasting memory.
In normal circumstances 0 is recommended (the default).
Examples found in repository?
55fn main() {
56 dpc_pariter::scope(|scope| {
57 (0..22)
58 .map(|i| {
59 // make producting values slow
60 std::thread::sleep(time::Duration::from_millis(10));
61 i
62 })
63 .readahead_scoped_profiled(
64 scope,
65 dpc_pariter::TotalTimeProfiler::periodically_millis(2_000, || {
66 eprintln!("Blocked on sending")
67 }),
68 dpc_pariter::TotalTimeProfiler::new(|stat| {
69 eprintln!(
70 "Sending receiving wait time: {}ms",
71 stat.total().as_millis()
72 )
73 }),
74 )
75 .for_each(|i| {
76 println!("{i}");
77 })
78 })
79 .expect("thread panicked");
80
81 (0..22)
82 .profile_egress(StderrMsgProfiler::new("sending"))
83 .readahead()
84 .profile_ingress(StderrMsgProfiler::new("receiving"))
85 .for_each(|i| {
86 println!("{i}");
87 // make consuming values slow
88 std::thread::sleep(time::Duration::from_millis(10));
89 });
90
91 dpc_pariter::scope(|scope| {
92 (0..22)
93 .map(|i| {
94 // make producting values slow
95 std::thread::sleep(time::Duration::from_millis(10));
96 i
97 })
98 .readahead_scoped_profiled(
99 scope,
100 StderrMsgProfiler::new("sending2"),
101 StderrMsgProfiler::new("receiving2"),
102 )
103 .for_each(|i| {
104 println!("{i}");
105 })
106 })
107 .expect("thread panicked");
108}
fn readahead_custom<OF>(self, of: OF) -> Readahead<Self> ⓘ
where
Self: Iterator + Sized + Send + 'static,
Self::Item: Send + 'static,
OF: FnOnce(ReadaheadBuilder<Self>) -> ReadaheadBuilder<Self>,
Sourcefn readahead_scoped<'env, 'scope>(
self,
scope: &'scope Scope<'env>,
) -> Readahead<Self> ⓘ
fn readahead_scoped<'env, 'scope>( self, scope: &'scope Scope<'env>, ) -> Readahead<Self> ⓘ
Scoped version of IteratorExt::readahead
Use when you want to process in parallel items that contain borrowed references.
See scope.
fn readahead_scoped_custom<'env, 'scope, OF>(
self,
scope: &'scope Scope<'env>,
of: OF,
) -> Readahead<Self> ⓘwhere
Self: Sized + Send + Iterator + 'scope + 'env,
Self::Item: Send + 'env + 'scope,
OF: FnOnce(ReadaheadBuilder<Self>) -> ReadaheadBuilder<Self>,
Sourcefn profile_egress<P>(self, profiler: P) -> ProfileEgress<Self, P> ⓘ
fn profile_egress<P>(self, profiler: P) -> ProfileEgress<Self, P> ⓘ
Profile the time it takes the downstream iterator step to consume the returned items.
See ProfileEgress and profile::Profiler.
Examples found in repository?
55fn main() {
56 dpc_pariter::scope(|scope| {
57 (0..22)
58 .map(|i| {
59 // make producting values slow
60 std::thread::sleep(time::Duration::from_millis(10));
61 i
62 })
63 .readahead_scoped_profiled(
64 scope,
65 dpc_pariter::TotalTimeProfiler::periodically_millis(2_000, || {
66 eprintln!("Blocked on sending")
67 }),
68 dpc_pariter::TotalTimeProfiler::new(|stat| {
69 eprintln!(
70 "Sending receiving wait time: {}ms",
71 stat.total().as_millis()
72 )
73 }),
74 )
75 .for_each(|i| {
76 println!("{i}");
77 })
78 })
79 .expect("thread panicked");
80
81 (0..22)
82 .profile_egress(StderrMsgProfiler::new("sending"))
83 .readahead()
84 .profile_ingress(StderrMsgProfiler::new("receiving"))
85 .for_each(|i| {
86 println!("{i}");
87 // make consuming values slow
88 std::thread::sleep(time::Duration::from_millis(10));
89 });
90
91 dpc_pariter::scope(|scope| {
92 (0..22)
93 .map(|i| {
94 // make producting values slow
95 std::thread::sleep(time::Duration::from_millis(10));
96 i
97 })
98 .readahead_scoped_profiled(
99 scope,
100 StderrMsgProfiler::new("sending2"),
101 StderrMsgProfiler::new("receiving2"),
102 )
103 .for_each(|i| {
104 println!("{i}");
105 })
106 })
107 .expect("thread panicked");
108}
Sourcefn profile_ingress<P>(self, profiler: P) -> ProfileIngress<Self, P> ⓘ
fn profile_ingress<P>(self, profiler: P) -> ProfileIngress<Self, P> ⓘ
Profile the time it takes the upstream iterator step to produce the returned items.
See ProfileIngress and profile::Profiler.
Examples found in repository?
55fn main() {
56 dpc_pariter::scope(|scope| {
57 (0..22)
58 .map(|i| {
59 // make producting values slow
60 std::thread::sleep(time::Duration::from_millis(10));
61 i
62 })
63 .readahead_scoped_profiled(
64 scope,
65 dpc_pariter::TotalTimeProfiler::periodically_millis(2_000, || {
66 eprintln!("Blocked on sending")
67 }),
68 dpc_pariter::TotalTimeProfiler::new(|stat| {
69 eprintln!(
70 "Sending receiving wait time: {}ms",
71 stat.total().as_millis()
72 )
73 }),
74 )
75 .for_each(|i| {
76 println!("{i}");
77 })
78 })
79 .expect("thread panicked");
80
81 (0..22)
82 .profile_egress(StderrMsgProfiler::new("sending"))
83 .readahead()
84 .profile_ingress(StderrMsgProfiler::new("receiving"))
85 .for_each(|i| {
86 println!("{i}");
87 // make consuming values slow
88 std::thread::sleep(time::Duration::from_millis(10));
89 });
90
91 dpc_pariter::scope(|scope| {
92 (0..22)
93 .map(|i| {
94 // make producting values slow
95 std::thread::sleep(time::Duration::from_millis(10));
96 i
97 })
98 .readahead_scoped_profiled(
99 scope,
100 StderrMsgProfiler::new("sending2"),
101 StderrMsgProfiler::new("receiving2"),
102 )
103 .for_each(|i| {
104 println!("{i}");
105 })
106 })
107 .expect("thread panicked");
108}
Sourcefn readahead_profiled<TxP, RxP>(
self,
tx_profiler: TxP,
rx_profiler: RxP,
) -> ProfileIngress<Readahead<ProfileEgress<Self, TxP>>, RxP> ⓘ
fn readahead_profiled<TxP, RxP>( self, tx_profiler: TxP, rx_profiler: RxP, ) -> ProfileIngress<Readahead<ProfileEgress<Self, TxP>>, RxP> ⓘ
Profiled version of IteratorExt::readahead
Literally .profile_egress(tx_profiler).readahead().profile_ingress(rx_profiler)
See Profiler for more info.
Sourcefn readahead_scoped_profiled<'env, 'scope, TxP, RxP>(
self,
scope: &'scope Scope<'env>,
tx_profiler: TxP,
rx_profiler: RxP,
) -> ProfileIngress<Readahead<ProfileEgress<Self, TxP>>, RxP> ⓘ
fn readahead_scoped_profiled<'env, 'scope, TxP, RxP>( self, scope: &'scope Scope<'env>, tx_profiler: TxP, rx_profiler: RxP, ) -> ProfileIngress<Readahead<ProfileEgress<Self, TxP>>, RxP> ⓘ
Profiled version of IteratorExt::readahead_scoped
Literally .profile_egress(tx_profiler).readahead_scoped(scope).profile_ingress(rx_profiler)
See Profiler for more info.
Examples found in repository?
55fn main() {
56 dpc_pariter::scope(|scope| {
57 (0..22)
58 .map(|i| {
59 // make producting values slow
60 std::thread::sleep(time::Duration::from_millis(10));
61 i
62 })
63 .readahead_scoped_profiled(
64 scope,
65 dpc_pariter::TotalTimeProfiler::periodically_millis(2_000, || {
66 eprintln!("Blocked on sending")
67 }),
68 dpc_pariter::TotalTimeProfiler::new(|stat| {
69 eprintln!(
70 "Sending receiving wait time: {}ms",
71 stat.total().as_millis()
72 )
73 }),
74 )
75 .for_each(|i| {
76 println!("{i}");
77 })
78 })
79 .expect("thread panicked");
80
81 (0..22)
82 .profile_egress(StderrMsgProfiler::new("sending"))
83 .readahead()
84 .profile_ingress(StderrMsgProfiler::new("receiving"))
85 .for_each(|i| {
86 println!("{i}");
87 // make consuming values slow
88 std::thread::sleep(time::Duration::from_millis(10));
89 });
90
91 dpc_pariter::scope(|scope| {
92 (0..22)
93 .map(|i| {
94 // make producting values slow
95 std::thread::sleep(time::Duration::from_millis(10));
96 i
97 })
98 .readahead_scoped_profiled(
99 scope,
100 StderrMsgProfiler::new("sending2"),
101 StderrMsgProfiler::new("receiving2"),
102 )
103 .for_each(|i| {
104 println!("{i}");
105 })
106 })
107 .expect("thread panicked");
108}