pub trait Map<S: Scope, D: 'static>: Sized {
// Required methods
fn map_in_place<L: FnMut(&mut D) + 'static>(
self,
logic: L,
) -> StreamVec<S, D>;
fn flat_map<I: IntoIterator, L: FnMut(D) -> I + 'static>(
self,
logic: L,
) -> StreamVec<S, I::Item>
where I::Item: 'static;
// Provided method
fn map<D2: 'static, L: FnMut(D) -> D2 + 'static>(
self,
logic: L,
) -> StreamVec<S, D2> { ... }
}
Expand description
Extension trait for StreamVec.
Required Methods§
Source
fn map_in_place<L: FnMut(&mut D) + 'static>(self, logic: L) -> StreamVec<S, D>
fn map_in_place<L: FnMut(&mut D) + 'static>(self, logic: L) -> StreamVec<S, D>
Updates each element of the stream and yields the element, re-using memory where possible.
§Examples
use timely::dataflow::operators::{ToStream, Inspect, vec::Map};
timely::example(|scope| {
(0..10).to_stream(scope)
.map_in_place(|x| *x += 1)
.inspect(|x| println!("seen: {:?}", x));
});
Source
fn flat_map<I: IntoIterator, L: FnMut(D) -> I + 'static>(
self,
logic: L,
) -> StreamVec<S, I::Item>
where
I::Item: 'static,
fn flat_map<I: IntoIterator, L: FnMut(D) -> I + 'static>(
self,
logic: L,
) -> StreamVec<S, I::Item>
where
I::Item: 'static,
Consumes each element of the stream and yields some number of new elements.
§Examples
use timely::dataflow::operators::{ToStream, Inspect, vec::Map};
timely::example(|scope| {
(0..10).to_stream(scope)
.flat_map(|x| (0..x))
.inspect(|x| println!("seen: {:?}", x));
});
Provided Methods§
Source
fn map<D2: 'static, L: FnMut(D) -> D2 + 'static>(
self,
logic: L,
) -> StreamVec<S, D2>
fn map<D2: 'static, L: FnMut(D) -> D2 + 'static>( self, logic: L, ) -> StreamVec<S, D2>
Consumes each element of the stream and yields a new element.
§Examples
use timely::dataflow::operators::{ToStream, Inspect, vec::Map};
timely::example(|scope| {
(0..10).to_stream(scope)
.map(|x| x + 1)
.inspect(|x| println!("seen: {:?}", x));
});
Examples found in repository§
examples/event_driven.rs (line 28)
5fn main() {
6 // initializes and runs a timely dataflow.
7 timely::execute_from_args(std::env::args(), |worker| {
8
9 let timer = std::time::Instant::now();
10
11 let mut args = std::env::args();
12 args.next();
13
14 let dataflows = args.next().unwrap().parse::<usize>().unwrap();
15 let length = args.next().unwrap().parse::<usize>().unwrap();
16 let record = args.next() == Some("record".to_string());
17
18 let mut inputs = Vec::new();
19 let mut probes = Vec::new();
20
21 // create a new input, exchange data, and inspect its output
22 for _dataflow in 0 .. dataflows {
23 worker.dataflow(|scope| {
24 let (input, stream) = scope.new_input();
25 let stream = scope.region(|inner| {
26 let mut stream = stream.enter(inner);
27 for _step in 0 .. length {
28 stream = stream.map(|x: ()| x);
29 }
30 stream.leave()
31 });
32 let (probe, _stream) = stream.probe();
33 inputs.push(input);
34 probes.push(probe);
35 });
36 }
37
38 println!("{:?}\tdataflows built ({} x {})", timer.elapsed(), dataflows, length);
39
40 for round in 0 .. {
41 let dataflow = round % dataflows;
42 if record {
43 inputs[dataflow].send(());
44 }
45 inputs[dataflow].advance_to(round);
46 let mut steps = 0;
47 while probes[dataflow].less_than(&round) {
48 worker.step();
49 steps += 1;
50 }
51 if round % 1000 == 0 { println!("{:?}\tround {} complete in {} steps", timer.elapsed(), round, steps); }
52 }
53
54 }).unwrap();
55}
More examples
examples/loopdemo.rs (line 32)
5fn main() {
6
7 let mut args = std::env::args();
8 args.next();
9 let rate: usize = args.next().expect("Must specify rate").parse().expect("Rate must be an usize");
10 let duration_s: usize = args.next().expect("Must specify duration_s").parse().expect("duration_s must be an usize");
11
12 timely::execute_from_args(args, move |worker| {
13
14 let index = worker.index();
15 let peers = worker.peers();
16
17 let timer = std::time::Instant::now();
18
19 let mut input = InputHandle::new();
20 let probe = ProbeHandle::new();
21
22 // Create a dataflow that discards input data (just synchronizes).
23 worker.dataflow(|scope| {
24
25 let stream = scope.input_from(&mut input);
26
27 let (loop_handle, loop_stream) = scope.feedback(1);
28
29 let step =
30 stream
31 .concat(loop_stream)
32 .map(|x| if x % 2 == 0 { x / 2 } else { 3 * x + 1 })
33 .filter(|x| x > &1);
34
35 step.probe_with(&probe)
36 .connect_loop(loop_handle);
37 });
38
39 let ns_per_request = 1_000_000_000 / rate;
40 let mut insert_counter = index; // counts up as we insert records.
41 let mut retire_counter = index; // counts up as we retire records.
42
43 let mut inserted_ns = 0;
44
45 // We repeatedly consult the elapsed time, and introduce any data now considered available.
46 // At the same time, we observe the output and record which inputs are considered retired.
47
48 let mut counts = vec![[0u64; 16]; 64];
49
50 let counter_limit = rate * duration_s;
51 while retire_counter < counter_limit {
52
53 // Open-loop latency-throughput test, parameterized by offered rate `ns_per_request`.
54 let elapsed = timer.elapsed();
55 let elapsed_ns = elapsed.as_secs() * 1_000_000_000 + (elapsed.subsec_nanos() as u64);
56
57 // Determine completed ns.
58 let acknowledged_ns: u64 = probe.with_frontier(|frontier| frontier[0]);
59
60 // Notice any newly-retired records.
61 while ((retire_counter * ns_per_request) as u64) < acknowledged_ns && retire_counter < counter_limit {
62 let requested_at = (retire_counter * ns_per_request) as u64;
63 let latency_ns = elapsed_ns - requested_at;
64
65 let count_index = latency_ns.next_power_of_two().trailing_zeros() as usize;
66 let low_bits = ((elapsed_ns - requested_at) >> (count_index - 5)) & 0xF;
67 counts[count_index][low_bits as usize] += 1;
68
69 retire_counter += peers;
70 }
71
72 // Now, should we introduce more records before stepping the worker?
73 // Three choices here:
74 //
75 // 1. Wait until previous batch acknowledged.
76 // 2. Tick at most once every millisecond-ish.
77 // 3. Geometrically increasing outstanding batches.
78
79 // Technique 1:
80 // let target_ns = if acknowledged_ns >= inserted_ns { elapsed_ns } else { inserted_ns };
81
82 // Technique 2:
83 // let target_ns = elapsed_ns & !((1 << 20) - 1);
84
85 // Technique 3:
86 let scale = (inserted_ns - acknowledged_ns).next_power_of_two();
87 let target_ns = elapsed_ns & !(scale - 1);
88
89 if inserted_ns < target_ns {
90
91 while ((insert_counter * ns_per_request) as u64) < target_ns {
92 input.send(insert_counter);
93 insert_counter += peers;
94 }
95 input.advance_to(target_ns);
96 inserted_ns = target_ns;
97 }
98
99 worker.step();
100 }
101
102 // Report observed latency measurements.
103 if index == 0 {
104
105 let mut results = Vec::new();
106 let total = counts.iter().map(|x| x.iter().sum::<u64>()).sum();
107 let mut sum = 0;
108 for index in (10 .. counts.len()).rev() {
109 for sub in (0 .. 16).rev() {
110 if sum > 0 && sum < total {
111 let latency = (1 << (index-1)) + (sub << (index-5));
112 let fraction = (sum as f64) / (total as f64);
113 results.push((latency, fraction));
114 }
115 sum += counts[index][sub];
116 }
117 }
118 for (latency, fraction) in results.drain(..).rev() {
119 println!("{}\t{}", latency, fraction);
120 }
121 }
122
123 }).unwrap();
124}
Dyn Compatibility§
This trait is not dyn compatible.
In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.