Trait timely::dataflow::operators::count::Accumulate
[−]
[src]
pub trait Accumulate<G: Scope, D: Data> {
    fn accumulate<A: Data, F: Fn(&mut A, &mut Content<D>) + 'static>(
        &self,
        default: A,
        logic: F
    ) -> Stream<G, A>;

    fn count(&self) -> Stream<G, usize> { ... }
}
Accumulates records within a timestamp.
Required Methods
fn accumulate<A: Data, F: Fn(&mut A, &mut Content<D>) + 'static>(
    &self,
    default: A,
    logic: F
) -> Stream<G, A>
Accumulates records within a timestamp.
Examples
use timely::dataflow::operators::{ToStream, Accumulate, Capture};
use timely::dataflow::operators::capture::Extract;
use timely::progress::timestamp::RootTimestamp;

let captured = timely::example(|scope| {
    (0..10).to_stream(scope)
        .accumulate(0, |sum, data| { for &x in data.iter() { *sum += x; } })
        .capture()
});

let extracted = captured.extract();
assert_eq!(extracted, vec![(RootTimestamp::new(0), vec![45])]);
Provided Methods
fn count(&self) -> Stream<G, usize>
Counts the number of records observed at each time.
Examples
use timely::dataflow::operators::{ToStream, Accumulate, Capture};
use timely::dataflow::operators::capture::Extract;
use timely::progress::timestamp::RootTimestamp;

let captured = timely::example(|scope| {
    (0..10).to_stream(scope)
        .count()
        .capture()
});

let extracted = captured.extract();
assert_eq!(extracted, vec![(RootTimestamp::new(0), vec![10])]);