IteratorExt

Trait IteratorExt 

Source
pub trait IteratorExt {
Show 16 methods // Provided methods fn parallel_map<F, O>(self, f: F) -> ParallelMap<Self, O> where Self: Sized + Iterator + 'static, F: 'static + Send + Clone + FnMut(Self::Item) -> O, Self::Item: Send + 'static, O: Send + 'static { ... } fn parallel_map_custom<F, O, OF>(self, of: OF, f: F) -> ParallelMap<Self, O> where Self: Sized + Iterator + 'static, F: 'static + Send + Clone + FnMut(Self::Item) -> O, Self::Item: Send + 'static, O: Send + 'static, OF: FnOnce(ParallelMapBuilder<Self>) -> ParallelMapBuilder<Self> { ... } fn parallel_map_scoped<'env, 'scope, F, O>( self, scope: &'scope Scope<'env>, f: F, ) -> ParallelMap<Self, O> where Self: Sized + Iterator + 'env, F: 'env + Send + Clone + FnMut(Self::Item) -> O, Self::Item: Send + 'env, O: Send + 'env { ... } fn parallel_map_scoped_custom<'env, 'scope, F, O, OF>( self, scope: &'scope Scope<'env>, of: OF, f: F, ) -> ParallelMap<Self, O> where Self: Sized + Iterator + 'env, F: 'env + Send + Clone + FnMut(Self::Item) -> O, Self::Item: Send + 'env, O: Send + 'env, OF: FnOnce(ParallelMapBuilder<Self>) -> ParallelMapBuilder<Self> { ... } fn parallel_filter<F>(self, f: F) -> ParallelFilter<Self> where Self: Sized + Iterator + 'static, F: 'static + Send + Clone + FnMut(&Self::Item) -> bool, Self::Item: Send + 'static { ... } fn parallel_filter_custom<F, OF>(self, of: OF, f: F) -> ParallelFilter<Self> where Self: Sized + Iterator + 'static, F: 'static + Send + Clone + FnMut(&Self::Item) -> bool, Self::Item: Send + 'static, OF: FnOnce(ParallelFilterBuilder<Self>) -> ParallelFilterBuilder<Self> { ... } fn parallel_filter_scoped<'env, 'scope, F>( self, scope: &'scope Scope<'env>, f: F, ) -> ParallelFilter<Self> where Self: Sized + Iterator + 'env, F: 'env + Send + Clone + FnMut(&Self::Item) -> bool, Self::Item: Send + 'env { ... 
} fn parallel_filter_scoped_custom<'env, 'scope, F, OF>( self, scope: &'scope Scope<'env>, of: OF, f: F, ) -> ParallelFilter<Self> where Self: Sized + Iterator + 'env, F: 'env + Send + Clone + FnMut(&Self::Item) -> bool, Self::Item: Send + 'env, OF: FnOnce(ParallelFilterBuilder<Self>) -> ParallelFilterBuilder<Self> { ... } fn readahead(self) -> Readahead<Self> where Self: Iterator + Send + 'static + Sized, Self::Item: Send + 'static { ... } fn readahead_custom<OF>(self, of: OF) -> Readahead<Self> where Self: Iterator + Sized + Send + 'static, Self::Item: Send + 'static, OF: FnOnce(ReadaheadBuilder<Self>) -> ReadaheadBuilder<Self> { ... } fn readahead_scoped<'env, 'scope>( self, scope: &'scope Scope<'env>, ) -> Readahead<Self> where Self: Sized + Send + Iterator + 'scope + 'env, Self::Item: Send + 'env + 'scope { ... } fn readahead_scoped_custom<'env, 'scope, OF>( self, scope: &'scope Scope<'env>, of: OF, ) -> Readahead<Self> where Self: Sized + Send + Iterator + 'scope + 'env, Self::Item: Send + 'env + 'scope, OF: FnOnce(ReadaheadBuilder<Self>) -> ReadaheadBuilder<Self> { ... } fn profile_egress<P>(self, profiler: P) -> ProfileEgress<Self, P> where P: Profiler, Self: Iterator + Sized { ... } fn profile_ingress<P>(self, profiler: P) -> ProfileIngress<Self, P> where P: Profiler, Self: Iterator + Sized { ... } fn readahead_profiled<TxP, RxP>( self, tx_profiler: TxP, rx_profiler: RxP, ) -> ProfileIngress<Readahead<ProfileEgress<Self, TxP>>, RxP> where TxP: Profiler + Send + 'static, RxP: Profiler, Self: Iterator + Sized + Send + 'static, Self::Item: Send + 'static { ... } fn readahead_scoped_profiled<'env, 'scope, TxP, RxP>( self, scope: &'scope Scope<'env>, tx_profiler: TxP, rx_profiler: RxP, ) -> ProfileIngress<Readahead<ProfileEgress<Self, TxP>>, RxP> where TxP: Profiler + Send + 'static, RxP: Profiler, Self: Sized + Send + Iterator + 'scope + 'env, Self::Item: Send + 'env + 'scope { ... }
}
Expand description

Extension trait for std::iter::Iterator bringing parallel operations

§TODO

  • parallel_for_each
  • parallel_flat_map
  • possibly others

PRs welcome

Provided Methods§

Source

fn parallel_map<F, O>(self, f: F) -> ParallelMap<Self, O>
where Self: Sized + Iterator + 'static, F: 'static + Send + Clone + FnMut(Self::Item) -> O, Self::Item: Send + 'static, O: Send + 'static,

Run map function in parallel on multiple threads

Results will be returned in order.

No items will be pulled from the underlying iterator until the first time ParallelMap is asked for an element with ParallelMap::next. In that respect, ParallelMap is lazy and behaves like every other iterator.

Source

fn parallel_map_custom<F, O, OF>(self, of: OF, f: F) -> ParallelMap<Self, O>
where Self: Sized + Iterator + 'static, F: 'static + Send + Clone + FnMut(Self::Item) -> O, Self::Item: Send + 'static, O: Send + 'static, OF: FnOnce(ParallelMapBuilder<Self>) -> ParallelMapBuilder<Self>,

Source

fn parallel_map_scoped<'env, 'scope, F, O>( self, scope: &'scope Scope<'env>, f: F, ) -> ParallelMap<Self, O>
where Self: Sized + Iterator + 'env, F: 'env + Send + Clone + FnMut(Self::Item) -> O, Self::Item: Send + 'env, O: Send + 'env,

A version of IteratorExt::parallel_map that supports iterating over borrowed values.

See IteratorExt::parallel_map

Source

fn parallel_map_scoped_custom<'env, 'scope, F, O, OF>( self, scope: &'scope Scope<'env>, of: OF, f: F, ) -> ParallelMap<Self, O>
where Self: Sized + Iterator + 'env, F: 'env + Send + Clone + FnMut(Self::Item) -> O, Self::Item: Send + 'env, O: Send + 'env, OF: FnOnce(ParallelMapBuilder<Self>) -> ParallelMapBuilder<Self>,

Source

fn parallel_filter<F>(self, f: F) -> ParallelFilter<Self>
where Self: Sized + Iterator + 'static, F: 'static + Send + Clone + FnMut(&Self::Item) -> bool, Self::Item: Send + 'static,

Run filter function in parallel on multiple threads

A wrapper around IteratorExt::parallel_map really, so it has similar properties.

Source

fn parallel_filter_custom<F, OF>(self, of: OF, f: F) -> ParallelFilter<Self>
where Self: Sized + Iterator + 'static, F: 'static + Send + Clone + FnMut(&Self::Item) -> bool, Self::Item: Send + 'static, OF: FnOnce(ParallelFilterBuilder<Self>) -> ParallelFilterBuilder<Self>,

Source

fn parallel_filter_scoped<'env, 'scope, F>( self, scope: &'scope Scope<'env>, f: F, ) -> ParallelFilter<Self>
where Self: Sized + Iterator + 'env, F: 'env + Send + Clone + FnMut(&Self::Item) -> bool, Self::Item: Send + 'env,

Source

fn parallel_filter_scoped_custom<'env, 'scope, F, OF>( self, scope: &'scope Scope<'env>, of: OF, f: F, ) -> ParallelFilter<Self>
where Self: Sized + Iterator + 'env, F: 'env + Send + Clone + FnMut(&Self::Item) -> bool, Self::Item: Send + 'env, OF: FnOnce(ParallelFilterBuilder<Self>) -> ParallelFilterBuilder<Self>,

Source

fn readahead(self) -> Readahead<Self>
where Self: Iterator + Send + 'static + Sized, Self::Item: Send + 'static,

Run the current iterator in another thread and return elements through a buffered channel.

buffer_size defines the size of the output channel connecting the current and the inner thread. It’s a common mistake to use large channel sizes needlessly in hopes of achieving higher performance. The only benefit a large buffer size provides is smoothing out the variance of the inner iterator returning items. The cost is wasted memory. In normal circumstances 0 is recommended (the default).

Examples found in repository?
examples/simple_profiler.rs (line 83)
55fn main() {
56    dpc_pariter::scope(|scope| {
57        (0..22)
58            .map(|i| {
59                // make producing values slow
60                std::thread::sleep(time::Duration::from_millis(10));
61                i
62            })
63            .readahead_scoped_profiled(
64                scope,
65                dpc_pariter::TotalTimeProfiler::periodically_millis(2_000, || {
66                    eprintln!("Blocked on sending")
67                }),
68                dpc_pariter::TotalTimeProfiler::new(|stat| {
69                    eprintln!(
70                        "Sending receiving wait time: {}ms",
71                        stat.total().as_millis()
72                    )
73                }),
74            )
75            .for_each(|i| {
76                println!("{i}");
77            })
78    })
79    .expect("thread panicked");
80
81    (0..22)
82        .profile_egress(StderrMsgProfiler::new("sending"))
83        .readahead()
84        .profile_ingress(StderrMsgProfiler::new("receiving"))
85        .for_each(|i| {
86            println!("{i}");
87            // make consuming values slow
88            std::thread::sleep(time::Duration::from_millis(10));
89        });
90
91    dpc_pariter::scope(|scope| {
92        (0..22)
93            .map(|i| {
94                // make producing values slow
95                std::thread::sleep(time::Duration::from_millis(10));
96                i
97            })
98            .readahead_scoped_profiled(
99                scope,
100                StderrMsgProfiler::new("sending2"),
101                StderrMsgProfiler::new("receiving2"),
102            )
103            .for_each(|i| {
104                println!("{i}");
105            })
106    })
107    .expect("thread panicked");
108}
Source

fn readahead_custom<OF>(self, of: OF) -> Readahead<Self>
where Self: Iterator + Sized + Send + 'static, Self::Item: Send + 'static, OF: FnOnce(ReadaheadBuilder<Self>) -> ReadaheadBuilder<Self>,

Source

fn readahead_scoped<'env, 'scope>( self, scope: &'scope Scope<'env>, ) -> Readahead<Self>
where Self: Sized + Send + Iterator + 'scope + 'env, Self::Item: Send + 'env + 'scope,

Scoped version of IteratorExt::readahead

Use when you want to process in parallel items that contain borrowed references.

See scope.

Source

fn readahead_scoped_custom<'env, 'scope, OF>( self, scope: &'scope Scope<'env>, of: OF, ) -> Readahead<Self>
where Self: Sized + Send + Iterator + 'scope + 'env, Self::Item: Send + 'env + 'scope, OF: FnOnce(ReadaheadBuilder<Self>) -> ReadaheadBuilder<Self>,

Source

fn profile_egress<P>(self, profiler: P) -> ProfileEgress<Self, P>
where P: Profiler, Self: Iterator + Sized,

Profile the time it takes the downstream iterator step to consume the returned items.

See ProfileEgress and profile::Profiler.

Examples found in repository?
examples/simple_profiler.rs (line 82)
55fn main() {
56    dpc_pariter::scope(|scope| {
57        (0..22)
58            .map(|i| {
59                // make producing values slow
60                std::thread::sleep(time::Duration::from_millis(10));
61                i
62            })
63            .readahead_scoped_profiled(
64                scope,
65                dpc_pariter::TotalTimeProfiler::periodically_millis(2_000, || {
66                    eprintln!("Blocked on sending")
67                }),
68                dpc_pariter::TotalTimeProfiler::new(|stat| {
69                    eprintln!(
70                        "Sending receiving wait time: {}ms",
71                        stat.total().as_millis()
72                    )
73                }),
74            )
75            .for_each(|i| {
76                println!("{i}");
77            })
78    })
79    .expect("thread panicked");
80
81    (0..22)
82        .profile_egress(StderrMsgProfiler::new("sending"))
83        .readahead()
84        .profile_ingress(StderrMsgProfiler::new("receiving"))
85        .for_each(|i| {
86            println!("{i}");
87            // make consuming values slow
88            std::thread::sleep(time::Duration::from_millis(10));
89        });
90
91    dpc_pariter::scope(|scope| {
92        (0..22)
93            .map(|i| {
94                // make producing values slow
95                std::thread::sleep(time::Duration::from_millis(10));
96                i
97            })
98            .readahead_scoped_profiled(
99                scope,
100                StderrMsgProfiler::new("sending2"),
101                StderrMsgProfiler::new("receiving2"),
102            )
103            .for_each(|i| {
104                println!("{i}");
105            })
106    })
107    .expect("thread panicked");
108}
Source

fn profile_ingress<P>(self, profiler: P) -> ProfileIngress<Self, P>
where P: Profiler, Self: Iterator + Sized,

Profile the time it takes the upstream iterator step to produce the returned items.

See ProfileIngress and profile::Profiler.

Examples found in repository?
examples/simple_profiler.rs (line 84)
55fn main() {
56    dpc_pariter::scope(|scope| {
57        (0..22)
58            .map(|i| {
59                // make producing values slow
60                std::thread::sleep(time::Duration::from_millis(10));
61                i
62            })
63            .readahead_scoped_profiled(
64                scope,
65                dpc_pariter::TotalTimeProfiler::periodically_millis(2_000, || {
66                    eprintln!("Blocked on sending")
67                }),
68                dpc_pariter::TotalTimeProfiler::new(|stat| {
69                    eprintln!(
70                        "Sending receiving wait time: {}ms",
71                        stat.total().as_millis()
72                    )
73                }),
74            )
75            .for_each(|i| {
76                println!("{i}");
77            })
78    })
79    .expect("thread panicked");
80
81    (0..22)
82        .profile_egress(StderrMsgProfiler::new("sending"))
83        .readahead()
84        .profile_ingress(StderrMsgProfiler::new("receiving"))
85        .for_each(|i| {
86            println!("{i}");
87            // make consuming values slow
88            std::thread::sleep(time::Duration::from_millis(10));
89        });
90
91    dpc_pariter::scope(|scope| {
92        (0..22)
93            .map(|i| {
94                // make producing values slow
95                std::thread::sleep(time::Duration::from_millis(10));
96                i
97            })
98            .readahead_scoped_profiled(
99                scope,
100                StderrMsgProfiler::new("sending2"),
101                StderrMsgProfiler::new("receiving2"),
102            )
103            .for_each(|i| {
104                println!("{i}");
105            })
106    })
107    .expect("thread panicked");
108}
Source

fn readahead_profiled<TxP, RxP>( self, tx_profiler: TxP, rx_profiler: RxP, ) -> ProfileIngress<Readahead<ProfileEgress<Self, TxP>>, RxP>
where TxP: Profiler + Send + 'static, RxP: Profiler, Self: Iterator + Sized + Send + 'static, Self::Item: Send + 'static,

Profiled version of IteratorExt::readahead

Literally .profile_egress(tx_profiler).readahead().profile_ingress(rx_profiler)

See Profiler for more info.

Source

fn readahead_scoped_profiled<'env, 'scope, TxP, RxP>( self, scope: &'scope Scope<'env>, tx_profiler: TxP, rx_profiler: RxP, ) -> ProfileIngress<Readahead<ProfileEgress<Self, TxP>>, RxP>
where TxP: Profiler + Send + 'static, RxP: Profiler, Self: Sized + Send + Iterator + 'scope + 'env, Self::Item: Send + 'env + 'scope,

Profiled version of IteratorExt::readahead_scoped

Literally .profile_egress(tx_profiler).readahead_scoped(scope).profile_ingress(rx_profiler)

See Profiler for more info.

Examples found in repository?
examples/simple_profiler.rs (lines 63-74)
55fn main() {
56    dpc_pariter::scope(|scope| {
57        (0..22)
58            .map(|i| {
59                // make producing values slow
60                std::thread::sleep(time::Duration::from_millis(10));
61                i
62            })
63            .readahead_scoped_profiled(
64                scope,
65                dpc_pariter::TotalTimeProfiler::periodically_millis(2_000, || {
66                    eprintln!("Blocked on sending")
67                }),
68                dpc_pariter::TotalTimeProfiler::new(|stat| {
69                    eprintln!(
70                        "Sending receiving wait time: {}ms",
71                        stat.total().as_millis()
72                    )
73                }),
74            )
75            .for_each(|i| {
76                println!("{i}");
77            })
78    })
79    .expect("thread panicked");
80
81    (0..22)
82        .profile_egress(StderrMsgProfiler::new("sending"))
83        .readahead()
84        .profile_ingress(StderrMsgProfiler::new("receiving"))
85        .for_each(|i| {
86            println!("{i}");
87            // make consuming values slow
88            std::thread::sleep(time::Duration::from_millis(10));
89        });
90
91    dpc_pariter::scope(|scope| {
92        (0..22)
93            .map(|i| {
94                // make producing values slow
95                std::thread::sleep(time::Duration::from_millis(10));
96                i
97            })
98            .readahead_scoped_profiled(
99                scope,
100                StderrMsgProfiler::new("sending2"),
101                StderrMsgProfiler::new("receiving2"),
102            )
103            .for_each(|i| {
104                println!("{i}");
105            })
106    })
107    .expect("thread panicked");
108}

Implementors§

Source§

impl<I> IteratorExt for I
where I: Iterator,