pub struct Worker<A: Allocate> { /* private fields */ }Expand description
A Worker is the entry point to a timely dataflow computation. It wraps an Allocate,
and has a list of dataflows that it manages.
Implementations§
Source§impl<A: Allocate> Worker<A>
impl<A: Allocate> Worker<A>
Sourcepub fn new(config: Config, c: A, now: Option<Instant>) -> Worker<A>
pub fn new(config: Config, c: A, now: Option<Instant>) -> Worker<A>
Allocates a new Worker bound to a channel allocator.
Examples found in repository?
5fn main() {
6
7 // create a naked single-threaded worker.
8 let allocator = timely::communication::allocator::Thread::default();
9 let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator, None);
10
11 // create input and probe handles.
12 let mut input = InputHandle::new();
13 let probe = ProbeHandle::new();
14
15 // directly build a dataflow.
16 worker.dataflow(|scope| {
17 input
18 .to_stream(scope)
19 .container::<Vec<_>>()
20 .inspect(|x| println!("{:?}", x))
21 .probe_with(&probe);
22 });
23
24 // manage inputs.
25 for i in 0 .. 10 {
26 input.send(i);
27 input.advance_to(i);
28 while probe.less_than(input.time()) {
29 worker.step();
30 }
31 }
32}Sourcepub fn step(&mut self) -> bool
pub fn step(&mut self) -> bool
Performs one step of the computation.
A step gives each dataflow operator a chance to run, and is the main way to ensure that a computation proceeds.
§Examples
timely::execute_from_args(::std::env::args(), |worker| {
use timely::dataflow::operators::{ToStream, Inspect};
worker.dataflow::<usize,_,_>(|scope| {
(0 .. 10)
.to_stream(scope)
.container::<Vec<_>>()
.inspect(|x| println!("{:?}", x));
});
worker.step();
});Examples found in repository?
4fn main() {
5 timely::execute(Config::thread(), |worker| {
6 let (mut input, mut cap) = worker.dataflow::<usize,_,_>(|scope| {
7 let (input, stream) = scope.new_unordered_input();
8 stream.container::<Vec<_>>().inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
9 input
10 });
11
12 for round in 0..10 {
13 input.activate().session(&cap).give(round);
14 cap = cap.delayed(&(round + 1));
15 worker.step();
16 }
17 }).unwrap();
18}More examples
6fn main() {
7 timely::execute(Config::process(4), |worker| {
8
9 let timer = Instant::now();
10 let mut sequencer = Sequencer::new(worker, Instant::now());
11
12 for round in 0 .. {
13 // if worker.index() < 3 {
14 std::thread::sleep(Duration::from_secs(1 + worker.index() as u64));
15 sequencer.push(format!("worker {:?}, round {:?}", worker.index(), round));
16 // }
17 for element in &mut sequencer {
18 println!("{:?}:\tWorker {:?}:\t recv'd: {:?}", timer.elapsed(), worker.index(), element);
19 }
20 worker.step();
21 }
22
23 }).unwrap(); // asserts error-free execution;
24}5fn main() {
6
7 // create a naked single-threaded worker.
8 let allocator = timely::communication::allocator::Thread::default();
9 let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator, None);
10
11 // create input and probe handles.
12 let mut input = InputHandle::new();
13 let probe = ProbeHandle::new();
14
15 // directly build a dataflow.
16 worker.dataflow(|scope| {
17 input
18 .to_stream(scope)
19 .container::<Vec<_>>()
20 .inspect(|x| println!("{:?}", x))
21 .probe_with(&probe);
22 });
23
24 // manage inputs.
25 for i in 0 .. 10 {
26 input.send(i);
27 input.advance_to(i);
28 while probe.less_than(input.time()) {
29 worker.step();
30 }
31 }
32}4fn main() {
5 // initializes and runs a timely dataflow.
6 timely::execute_from_args(std::env::args(), |worker| {
7
8 let index = worker.index();
9 let mut input = InputHandle::new();
10 let probe = ProbeHandle::new();
11
12 // create a new input, exchange data, and inspect its output
13 worker.dataflow(|scope| {
14 scope.input_from(&mut input)
15 .container::<Vec<_>>()
16 .exchange(|x| *x)
17 .inspect(move |x| println!("worker {}:\thello {}", index, x))
18 .probe_with(&probe);
19 });
20
21 // introduce data and watch!
22 for round in 0..10 {
23 if index == 0 {
24 input.send(round);
25 }
26 input.advance_to(round + 1);
27 while probe.less_than(input.time()) {
28 worker.step();
29 }
30 }
31 }).unwrap();
32}4fn main() {
5 // initializes and runs a timely dataflow.
6 timely::execute_from_args(std::env::args(), |worker| {
7
8 let batch = std::env::args().nth(1).unwrap().parse::<usize>().unwrap();
9 let rounds = std::env::args().nth(2).unwrap().parse::<usize>().unwrap();
10 let mut input = InputHandle::new();
11
12 // create a new input, exchange data, and inspect its output
13 let probe = worker.dataflow(|scope|
14 scope
15 .input_from(&mut input)
16 .container::<Vec<_>>()
17 .exchange(|&x| x as u64)
18 .probe()
19 .0
20 );
21
22
23 let timer = std::time::Instant::now();
24
25 for round in 0 .. rounds {
26
27 for i in 0 .. batch {
28 input.send(i);
29 }
30 input.advance_to(round);
31
32 while probe.less_than(input.time()) {
33 worker.step();
34 }
35
36 }
37
38 let volume = (rounds * batch) as f64;
39 let elapsed = timer.elapsed();
40 let seconds = elapsed.as_secs() as f64 + (f64::from(elapsed.subsec_nanos())/1000000000.0);
41
42 println!("{:?}\tworker {} complete; rate: {:?}", timer.elapsed(), worker.index(), volume / seconds);
43
44 }).unwrap();
45}4fn main() {
5 // initializes and runs a timely dataflow.
6 timely::execute_from_args(std::env::args(), |worker| {
7
8 let timer = std::time::Instant::now();
9
10 let mut args = std::env::args();
11 args.next();
12
13 let dataflows = args.next().unwrap().parse::<usize>().unwrap();
14 let length = args.next().unwrap().parse::<usize>().unwrap();
15 let record = args.next() == Some("record".to_string());
16
17 let mut inputs = Vec::new();
18 let mut probes = Vec::new();
19
20 // create a new input, exchange data, and inspect its output
21 for _dataflow in 0 .. dataflows {
22 worker.dataflow(|scope| {
23 let (input, mut stream) = scope.new_input();
24 for _step in 0 .. length {
25 stream = stream.map(|x: ()| x);
26 }
27 let (probe, _stream) = stream.probe();
28 inputs.push(input);
29 probes.push(probe);
30 });
31 }
32
33 println!("{:?}\tdataflows built ({} x {})", timer.elapsed(), dataflows, length);
34
35 for round in 0 .. {
36 let dataflow = round % dataflows;
37 if record {
38 inputs[dataflow].send(());
39 }
40 inputs[dataflow].advance_to(round);
41 let mut steps = 0;
42 while probes[dataflow].less_than(&round) {
43 worker.step();
44 steps += 1;
45 }
46 println!("{:?}\tround {} complete in {} steps", timer.elapsed(), round, steps);
47 }
48
49 }).unwrap();
50}Sourcepub fn step_or_park(&mut self, duration: Option<Duration>) -> bool
pub fn step_or_park(&mut self, duration: Option<Duration>) -> bool
Performs one step of the computation.
A step gives each dataflow operator a chance to run, and is the main way to ensure that a computation proceeds.
This method takes an optional timeout and may park the thread until
there is work to perform or until this timeout expires. A value of
None allows the worker to park indefinitely, whereas a value of
Some(Duration::new(0, 0)) will return without parking the thread.
§Examples
timely::execute_from_args(::std::env::args(), |worker| {
use std::time::Duration;
use timely::dataflow::operators::{ToStream, Inspect};
worker.dataflow::<usize,_,_>(|scope| {
(0 .. 10)
.to_stream(scope)
.container::<Vec<_>>()
.inspect(|x| println!("{:?}", x));
});
worker.step_or_park(Some(Duration::from_secs(1)));
});Sourcepub fn step_while<F: FnMut() -> bool>(&mut self, func: F)
pub fn step_while<F: FnMut() -> bool>(&mut self, func: F)
Calls self.step() as long as func evaluates to true.
This method will continually execute even if there is no work
for the worker to perform. Consider using the similar method
Self::step_or_park_while(duration) to allow the worker to yield
control if that is appropriate.
§Examples
timely::execute_from_args(::std::env::args(), |worker| {
use timely::dataflow::operators::{ToStream, Inspect, Probe};
let probe =
worker.dataflow::<usize,_,_>(|scope| {
(0 .. 10)
.to_stream(scope)
.container::<Vec<_>>()
.inspect(|x| println!("{:?}", x))
.probe()
.0
});
worker.step_while(|| probe.less_than(&0));
});Examples found in repository?
10fn main() {
11 // initializes and runs a timely dataflow.
12 timely::execute_from_args(std::env::args(), |worker| {
13 // create a new input, exchange data, and inspect its output
14 let index = worker.index();
15 let mut input = InputHandle::new();
16 let probe = ProbeHandle::new();
17 worker.dataflow(|scope| {
18 scope.input_from(&mut input)
19 .container::<Vec<_>>()
20 //.exchange(|x| *x) // <-- cannot exchange this; Rc is not Send.
21 .inspect(move |x| println!("worker {}:\thello {:?}", index, x))
22 .probe_with(&probe);
23 });
24
25 // introduce data and watch!
26 for round in 0..10 {
27 input.send(Test { _field: Rc::new(round) } );
28 input.advance_to(round + 1);
29 worker.step_while(|| probe.less_than(input.time()));
30 }
31 }).unwrap();
32}Sourcepub fn step_or_park_while<F: FnMut() -> bool>(
&mut self,
duration: Option<Duration>,
func: F,
)
pub fn step_or_park_while<F: FnMut() -> bool>( &mut self, duration: Option<Duration>, func: F, )
Calls self.step_or_park(duration) as long as func evaluates to true.
This method may yield whenever there is no work to perform, in the same way
as Self::step_or_park(). Please consult the documentation for further
information about that method and its behavior. In particular, the method
can park the worker indefinitely, if no new work re-awakens the worker.
§Examples
timely::execute_from_args(::std::env::args(), |worker| {
use timely::dataflow::operators::{ToStream, Inspect, Probe};
let probe =
worker.dataflow::<usize,_,_>(|scope| {
(0 .. 10)
.to_stream(scope)
.container::<Vec<_>>()
.inspect(|x| println!("{:?}", x))
.probe()
.0
});
worker.step_or_park_while(None, || probe.less_than(&0));
});Sourcepub fn index(&self) -> usize
pub fn index(&self) -> usize
The index of the worker out of its peers.
§Examples
timely::execute_from_args(::std::env::args(), |worker| {
let index = worker.index();
let peers = worker.peers();
let timer = worker.timer().unwrap();
println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
});Examples found in repository?
5fn main() {
6 timely::execute_from_args(std::env::args(), |worker| {
7
8 let addr = format!("127.0.0.1:{}", 8000 + worker.index());
9 let send = TcpStream::connect(addr).unwrap();
10
11 worker.dataflow::<u64,_,_>(|scope|
12 (0..10u64)
13 .to_stream(scope)
14 .container::<Vec<_>>()
15 .capture_into(EventWriter::new(send))
16 );
17 }).unwrap();
18}More examples
6fn main() {
7 timely::execute(Config::process(4), |worker| {
8
9 let timer = Instant::now();
10 let mut sequencer = Sequencer::new(worker, Instant::now());
11
12 for round in 0 .. {
13 // if worker.index() < 3 {
14 std::thread::sleep(Duration::from_secs(1 + worker.index() as u64));
15 sequencer.push(format!("worker {:?}, round {:?}", worker.index(), round));
16 // }
17 for element in &mut sequencer {
18 println!("{:?}:\tWorker {:?}:\t recv'd: {:?}", timer.elapsed(), worker.index(), element);
19 }
20 worker.step();
21 }
22
23 }).unwrap(); // asserts error-free execution;
24}3fn main() {
4
5 let iterations = std::env::args().nth(1).unwrap().parse::<u64>().unwrap();
6 let elements = std::env::args().nth(2).unwrap().parse::<u64>().unwrap();
7
8 // initializes and runs a timely dataflow
9 timely::execute_from_args(std::env::args().skip(3), move |worker| {
10 let index = worker.index();
11 let peers = worker.peers();
12 worker.dataflow::<u64,_,_>(move |scope| {
13 let (helper, cycle) = scope.feedback(1);
14 (0 .. elements)
15 .filter(move |&x| (x as usize) % peers == index)
16 .to_stream(scope)
17 .concat(cycle)
18 .exchange(|&x| x)
19 .map_in_place(|x| *x += 1)
20 .branch_when(move |t| t < &iterations).1
21 .connect_loop(helper);
22 });
23 }).unwrap();
24}5fn main() {
6 timely::execute_from_args(std::env::args(), |worker| {
7
8 let source_peers = std::env::args().nth(1).unwrap().parse::<usize>().unwrap();
9
10 // create replayers from disjoint partition of source worker identifiers.
11 let replayers =
12 (0 .. source_peers)
13 .filter(|i| i % worker.peers() == worker.index())
14 .map(|i| TcpListener::bind(format!("127.0.0.1:{}", 8000 + i)).unwrap())
15 .collect::<Vec<_>>()
16 .into_iter()
17 .map(|l| l.incoming().next().unwrap().unwrap())
18 .map(EventReader::<_,Vec<u64>,_>::new)
19 .collect::<Vec<_>>();
20
21 worker.dataflow::<u64,_,_>(|scope| {
22 replayers
23 .replay_into(scope)
24 .inspect(|x| println!("replayed: {:?}", x));
25 })
26 }).unwrap(); // asserts error-free execution
27}10fn main() {
11 // initializes and runs a timely dataflow.
12 timely::execute_from_args(std::env::args(), |worker| {
13 // create a new input, exchange data, and inspect its output
14 let index = worker.index();
15 let mut input = InputHandle::new();
16 let probe = ProbeHandle::new();
17 worker.dataflow(|scope| {
18 scope.input_from(&mut input)
19 .container::<Vec<_>>()
20 //.exchange(|x| *x) // <-- cannot exchange this; Rc is not Send.
21 .inspect(move |x| println!("worker {}:\thello {:?}", index, x))
22 .probe_with(&probe);
23 });
24
25 // introduce data and watch!
26 for round in 0..10 {
27 input.send(Test { _field: Rc::new(round) } );
28 input.advance_to(round + 1);
29 worker.step_while(|| probe.less_than(input.time()));
30 }
31 }).unwrap();
32}4fn main() {
5 // initializes and runs a timely dataflow.
6 timely::execute_from_args(std::env::args(), |worker| {
7
8 let index = worker.index();
9 let mut input = InputHandle::new();
10 let probe = ProbeHandle::new();
11
12 // create a new input, exchange data, and inspect its output
13 worker.dataflow(|scope| {
14 scope.input_from(&mut input)
15 .container::<Vec<_>>()
16 .exchange(|x| *x)
17 .inspect(move |x| println!("worker {}:\thello {}", index, x))
18 .probe_with(&probe);
19 });
20
21 // introduce data and watch!
22 for round in 0..10 {
23 if index == 0 {
24 input.send(round);
25 }
26 input.advance_to(round + 1);
27 while probe.less_than(input.time()) {
28 worker.step();
29 }
30 }
31 }).unwrap();
32}Sourcepub fn peers(&self) -> usize
pub fn peers(&self) -> usize
The total number of peer workers.
§Examples
timely::execute_from_args(::std::env::args(), |worker| {
let index = worker.index();
let peers = worker.peers();
let timer = worker.timer().unwrap();
println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
});Examples found in repository?
3fn main() {
4
5 let iterations = std::env::args().nth(1).unwrap().parse::<u64>().unwrap();
6 let elements = std::env::args().nth(2).unwrap().parse::<u64>().unwrap();
7
8 // initializes and runs a timely dataflow
9 timely::execute_from_args(std::env::args().skip(3), move |worker| {
10 let index = worker.index();
11 let peers = worker.peers();
12 worker.dataflow::<u64,_,_>(move |scope| {
13 let (helper, cycle) = scope.feedback(1);
14 (0 .. elements)
15 .filter(move |&x| (x as usize) % peers == index)
16 .to_stream(scope)
17 .concat(cycle)
18 .exchange(|&x| x)
19 .map_in_place(|x| *x += 1)
20 .branch_when(move |t| t < &iterations).1
21 .connect_loop(helper);
22 });
23 }).unwrap();
24}More examples
5fn main() {
6 timely::execute_from_args(std::env::args(), |worker| {
7
8 let source_peers = std::env::args().nth(1).unwrap().parse::<usize>().unwrap();
9
10 // create replayers from disjoint partition of source worker identifiers.
11 let replayers =
12 (0 .. source_peers)
13 .filter(|i| i % worker.peers() == worker.index())
14 .map(|i| TcpListener::bind(format!("127.0.0.1:{}", 8000 + i)).unwrap())
15 .collect::<Vec<_>>()
16 .into_iter()
17 .map(|l| l.incoming().next().unwrap().unwrap())
18 .map(EventReader::<_,Vec<u64>,_>::new)
19 .collect::<Vec<_>>();
20
21 worker.dataflow::<u64,_,_>(|scope| {
22 replayers
23 .replay_into(scope)
24 .inspect(|x| println!("replayed: {:?}", x));
25 })
26 }).unwrap(); // asserts error-free execution
27}8fn main() {
9 timely::execute_from_args(std::env::args(), |worker| {
10
11 let source_peers = std::env::args().nth(1).unwrap().parse::<usize>().unwrap();
12
13 // create replayers from disjoint partition of source worker identifiers.
14 let replayers =
15 (0 .. source_peers)
16 .filter(|i| i % worker.peers() == worker.index())
17 .map(|i| TcpListener::bind(format!("127.0.0.1:{}", 8000 + i)).unwrap())
18 .collect::<Vec<_>>()
19 .into_iter()
20 .map(|l| l.incoming().next().unwrap().unwrap())
21 .map(EventReader::<Duration,Vec<(Duration,TimelySetup,TimelyEvent)>,_>::new)
22 .collect::<Vec<_>>();
23
24 worker.dataflow(|scope| {
25 replayers
26 .replay_into(scope)
27 .inspect(|x| println!("replayed: {:?}", x));
28 })
29 }).unwrap(); // asserts error-free execution
30}8fn main() {
9
10 // command-line args: numbers of nodes and edges in the random graph.
11 let nodes: usize = std::env::args().nth(1).unwrap().parse().unwrap();
12 let edges: usize = std::env::args().nth(2).unwrap().parse().unwrap();
13 let batch: usize = std::env::args().nth(3).unwrap().parse().unwrap();
14
15 timely::execute_from_args(std::env::args().skip(4), move |worker| {
16
17 let index = worker.index();
18 let peers = worker.peers();
19
20 let mut input = InputHandle::new();
21 let probe = ProbeHandle::new();
22
23 worker.dataflow(|scope| {
24 scope.input_from(&mut input)
25 // .exchange(move |x: &(usize, usize)| (x.0 % (peers - 1)) as u64 + 1)
26 .union_find()
27 .exchange(|_| 0)
28 .union_find()
29 .probe_with(&probe);
30 });
31
32 // Generate roughly random data.
33 use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
34 let hasher = BuildHasherDefault::<DefaultHasher>::new();
35 let insert = (0..).map(move |i| (hasher.hash_one(&(i,index,0)) as usize % nodes,
36 hasher.hash_one(&(i,index,1)) as usize % nodes));
37
38 for (edge, arc) in insert.take(edges / peers).enumerate() {
39 input.send(arc);
40 if edge % batch == (batch - 1) {
41 let next = input.epoch() + 1;
42 input.advance_to(next);
43 while probe.less_than(input.time()) {
44 worker.step();
45 }
46 }
47 }
48
49 }).unwrap(); // asserts error-free execution;
50}8fn main() {
9
10 // command-line args: key range, total number of records to send, and batch size.
11 let keys: u64 = std::env::args().nth(1).unwrap().parse().unwrap();
12 let vals: usize = std::env::args().nth(2).unwrap().parse().unwrap();
13 let batch: usize = std::env::args().nth(3).unwrap().parse().unwrap();
14
15 timely::execute_from_args(std::env::args().skip(4), move |worker| {
16
17 let index = worker.index();
18 let peers = worker.peers();
19
20 let mut input1 = InputHandle::new();
21 let mut input2 = InputHandle::new();
22 let probe = ProbeHandle::new();
23
24 worker.dataflow(|scope| {
25
26 let stream1 = scope.input_from(&mut input1);
27 let stream2 = scope.input_from(&mut input2);
28
29 let exchange1 = Exchange::new(|x: &(u64, u64)| x.0);
30 let exchange2 = Exchange::new(|x: &(u64, u64)| x.0);
31
32 stream1
33 .binary(stream2, exchange1, exchange2, "HashJoin", |_capability, _info| {
34
35 let mut map1 = HashMap::<u64, Vec<u64>>::new();
36 let mut map2 = HashMap::<u64, Vec<u64>>::new();
37
38 move |input1, input2, output| {
39
40 // Drain first input, check second map, update first map.
41 input1.for_each_time(|time, data| {
42 let mut session = output.session(&time);
43 for (key, val1) in data.flat_map(|d| d.drain(..)) {
44 if let Some(values) = map2.get(&key) {
45 for val2 in values.iter() {
46 session.give((val1, *val2));
47 }
48 }
49
50 map1.entry(key).or_default().push(val1);
51 }
52 });
53
54 // Drain second input, check first map, update second map.
55 input2.for_each_time(|time, data| {
56 let mut session = output.session(&time);
57 for (key, val2) in data.flat_map(|d| d.drain(..)) {
58 if let Some(values) = map1.get(&key) {
59 for val1 in values.iter() {
60 session.give((*val1, val2));
61 }
62 }
63
64 map2.entry(key).or_default().push(val2);
65 }
66 });
67 }
68 })
69 .container::<Vec<_>>()
70 .probe_with(&probe);
71 });
72
73 // Generate roughly random data.
74 use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
75 let hasher = BuildHasherDefault::<DefaultHasher>::new();
76 let mut insert = (0..).map(move |i| (hasher.hash_one(&(i,index,0)) % keys,
77 hasher.hash_one(&(i,index,1)) % keys,
78 hasher.hash_one(&(i,index,2)) % keys,
79 hasher.hash_one(&(i,index,3)) % keys));
80
81 let timer = std::time::Instant::now();
82
83 let mut sent = 0;
84 while sent < (vals / peers) {
85
86 // Send some amount of data, no more than `batch`.
87 let to_send = std::cmp::min(batch, vals/peers - sent);
88 for (src0, dst0, src1, dst1) in (&mut insert).take(to_send) {
89 input1.send((src0, dst0));
90 input2.send((src1, dst1));
91 }
92 sent += to_send;
93
94 // Advance input, iterate until data cleared.
95 let next = input1.epoch() + 1;
96 input1.advance_to(next);
97 input2.advance_to(next);
98 while probe.less_than(input1.time()) {
99 worker.step();
100 }
101
102 println!("{:?}\tworker {} batch complete", timer.elapsed(), index)
103 }
104
105 }).unwrap(); // asserts error-free execution;
106}5fn main() {
6
7 let mut args = std::env::args();
8 args.next();
9 let rate: usize = args.next().expect("Must specify rate").parse().expect("Rate must be an usize");
10 let duration_s: usize = args.next().expect("Must specify duration_s").parse().expect("duration_s must be an usize");
11
12 timely::execute_from_args(args, move |worker| {
13
14 let index = worker.index();
15 let peers = worker.peers();
16
17 let timer = std::time::Instant::now();
18
19 let mut input = InputHandle::new();
20 let probe = ProbeHandle::new();
21
22 // Create a dataflow that discards input data (just synchronizes).
23 worker.dataflow(|scope| {
24
25 let stream = scope.input_from(&mut input);
26
27 let (loop_handle, loop_stream) = scope.feedback(1);
28
29 let step =
30 stream
31 .concat(loop_stream)
32 .map(|x| if x % 2 == 0 { x / 2 } else { 3 * x + 1 })
33 .filter(|x| x > &1);
34
35 step.probe_with(&probe)
36 .connect_loop(loop_handle);
37 });
38
39 let ns_per_request = 1_000_000_000 / rate;
40 let mut insert_counter = index; // counts up as we insert records.
41 let mut retire_counter = index; // counts up as we retire records.
42
43 let mut inserted_ns = 0;
44
45 // We repeatedly consult the elapsed time, and introduce any data now considered available.
46 // At the same time, we observe the output and record which inputs are considered retired.
47
48 let mut counts = vec![[0u64; 16]; 64];
49
50 let counter_limit = rate * duration_s;
51 while retire_counter < counter_limit {
52
53 // Open-loop latency-throughput test, parameterized by offered rate `ns_per_request`.
54 let elapsed = timer.elapsed();
55 let elapsed_ns = elapsed.as_secs() * 1_000_000_000 + (elapsed.subsec_nanos() as u64);
56
57 // Determine completed ns.
58 let acknowledged_ns: u64 = probe.with_frontier(|frontier| frontier[0]);
59
60 // Notice any newly-retired records.
61 while ((retire_counter * ns_per_request) as u64) < acknowledged_ns && retire_counter < counter_limit {
62 let requested_at = (retire_counter * ns_per_request) as u64;
63 let latency_ns = elapsed_ns - requested_at;
64
65 let count_index = latency_ns.next_power_of_two().trailing_zeros() as usize;
66 let low_bits = ((elapsed_ns - requested_at) >> (count_index - 5)) & 0xF;
67 counts[count_index][low_bits as usize] += 1;
68
69 retire_counter += peers;
70 }
71
72 // Now, should we introduce more records before stepping the worker?
73 // Three choices here:
74 //
75 // 1. Wait until previous batch acknowledged.
76 // 2. Tick at most once every millisecond-ish.
77 // 3. Geometrically increasing outstanding batches.
78
79 // Technique 1:
80 // let target_ns = if acknowledged_ns >= inserted_ns { elapsed_ns } else { inserted_ns };
81
82 // Technique 2:
83 // let target_ns = elapsed_ns & !((1 << 20) - 1);
84
85 // Technique 3:
86 let scale = (inserted_ns - acknowledged_ns).next_power_of_two();
87 let target_ns = elapsed_ns & !(scale - 1);
88
89 if inserted_ns < target_ns {
90
91 while ((insert_counter * ns_per_request) as u64) < target_ns {
92 input.send(insert_counter);
93 insert_counter += peers;
94 }
95 input.advance_to(target_ns);
96 inserted_ns = target_ns;
97 }
98
99 worker.step();
100 }
101
102 // Report observed latency measurements.
103 if index == 0 {
104
105 let mut results = Vec::new();
106 let total = counts.iter().map(|x| x.iter().sum::<u64>()).sum();
107 let mut sum = 0;
108 for index in (10 .. counts.len()).rev() {
109 for sub in (0 .. 16).rev() {
110 if sum > 0 && sum < total {
111 let latency = (1 << (index-1)) + (sub << (index-5));
112 let fraction = (sum as f64) / (total as f64);
113 results.push((latency, fraction));
114 }
115 sum += counts[index][sub];
116 }
117 }
118 for (latency, fraction) in results.drain(..).rev() {
119 println!("{}\t{}", latency, fraction);
120 }
121 }
122
123 }).unwrap();
124}Sourcepub fn timer(&self) -> Option<Instant>
pub fn timer(&self) -> Option<Instant>
A timer started at the initiation of the timely computation.
§Examples
timely::execute_from_args(::std::env::args(), |worker| {
let index = worker.index();
let peers = worker.peers();
let timer = worker.timer().unwrap();
println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
});Sourcepub fn new_identifier(&mut self) -> usize
pub fn new_identifier(&mut self) -> usize
Allocate a new worker-unique identifier.
This method is public, though it is not expected to be widely used outside of the timely dataflow system.
Sourcepub fn peek_identifier(&self) -> usize
pub fn peek_identifier(&self) -> usize
The next worker-unique identifier to be allocated.
Sourcepub fn log_register(&self) -> Option<RefMut<'_, Registry>>
pub fn log_register(&self) -> Option<RefMut<'_, Registry>>
Access to named loggers.
§Examples
timely::execute_from_args(::std::env::args(), |worker| {
worker.log_register()
.unwrap()
.insert::<timely::logging::TimelyEventBuilder,_>("timely", |time, data|
println!("{:?}\t{:?}", time, data)
);
});Examples found in repository?
9fn main() {
10 // initializes and runs a timely dataflow.
11 timely::execute_from_args(std::env::args(), |worker| {
12
13 let batch = std::env::args().nth(1).unwrap().parse::<usize>().unwrap();
14 let rounds = std::env::args().nth(2).unwrap().parse::<usize>().unwrap();
15 let mut input = InputHandle::new();
16 let probe = ProbeHandle::new();
17
18 // Register timely worker logging.
19 worker.log_register().unwrap().insert::<TimelyEventBuilder,_>("timely", |time, data|
20 if let Some(data) = data {
21 data.iter().for_each(|x| println!("LOG1: {:?}", x))
22 }
23 else {
24 println!("LOG1: Flush {time:?}");
25 }
26 );
27
28 // Register timely progress logging.
29 // Less generally useful: intended for debugging advanced custom operators or timely
30 // internals.
31 worker.log_register().unwrap().insert::<TimelyProgressEventBuilder<usize>,_>("timely/progress/usize", |time, data|
32 if let Some(data) = data {
33 data.iter().for_each(|x| {
34 println!("PROGRESS: {:?}", x);
35 let (_, ev) = x;
36 print!("PROGRESS: TYPED MESSAGES: ");
37 for (n, p, t, d) in ev.messages.iter() {
38 print!("{:?}, ", (n, p, t, d));
39 }
40 println!();
41 print!("PROGRESS: TYPED INTERNAL: ");
42 for (n, p, t, d) in ev.internal.iter() {
43 print!("{:?}, ", (n, p, t, d));
44 }
45 println!();
46 })
47 }
48 else {
49 println!("PROGRESS: Flush {time:?}");
50 }
51 );
52
53 worker.log_register().unwrap().insert::<TrackerEventBuilder<usize>,_>("timely/reachability/usize", |time, data|
54 if let Some(data) = data {
55 data.iter().for_each(|x| {
56 println!("REACHABILITY: {:?}", x);
57 })
58 }
59 else {
60 println!("REACHABILITY: Flush {time:?}");
61 }
62 );
63
64 worker.log_register().unwrap().insert::<TimelySummaryEventBuilder<usize>,_>("timely/summary/usize", |time, data|
65 if let Some(data) = data {
66 data.iter().for_each(|(_, x)| {
67 println!("SUMMARY: {:?}", x);
68 })
69 }
70 else {
71 println!("SUMMARY: Flush {time:?}");
72 }
73 );
74
75 // create a new input, exchange data, and inspect its output
76 worker.dataflow(|scope| {
77 scope
78 .input_from(&mut input)
79 .container::<Vec<_>>()
80 .exchange(|&x| x as u64)
81 .probe_with(&probe);
82 });
83
84 // Register timely worker logging.
85 worker.log_register().unwrap().insert::<TimelyEventBuilder,_>("timely", |time, data|
86 if let Some(data) = data {
87 data.iter().for_each(|x| println!("LOG2: {:?}", x))
88 }
89 else {
90 println!("LOG2: Flush {time:?}");
91 }
92 );
93
94 // create a new input, exchange data, and inspect its output
95 worker.dataflow(|scope| {
96 scope
97 .input_from(&mut input)
98 .exchange(|&x| x as u64)
99 .probe_with(&probe);
100 });
101
102 // Register user-level logging.
103 type MyBuilder = CapacityContainerBuilder<Vec<(Duration, ())>>;
104 worker.log_register().unwrap().insert::<MyBuilder,_>("input", |time, data|
105 if let Some(data) = data {
106 for element in data.iter() {
107 println!("Round tick at: {:?}", element.0);
108 }
109 }
110 else {
111 println!("Round flush at: {time:?}");
112 }
113 );
114
115 let input_logger = worker.log_register().unwrap().get::<MyBuilder>("input").expect("Input logger absent");
116
117 let timer = std::time::Instant::now();
118
119 for round in 0 .. rounds {
120
121 for i in 0 .. batch {
122 input.send(i);
123 }
124 input.advance_to(round);
125 input_logger.log(());
126
127 while probe.less_than(input.time()) {
128 worker.step();
129 }
130
131 }
132
133 let volume = (rounds * batch) as f64;
134 let elapsed = timer.elapsed();
135 let seconds = elapsed.as_secs() as f64 + (f64::from(elapsed.subsec_nanos())/1000000000.0);
136
137 println!("{:?}\tworker {} complete; rate: {:?}", timer.elapsed(), worker.index(), volume / seconds);
138
139 }).unwrap();
140}Sourcepub fn dataflow<T, R, F>(&mut self, func: F) -> R
pub fn dataflow<T, R, F>(&mut self, func: F) -> R
Construct a new dataflow.
§Examples
timely::execute_from_args(::std::env::args(), |worker| {
// We must supply the timestamp type here, although
// it would generally be determined by type inference.
worker.dataflow::<usize,_,_>(|scope| {
// uses of `scope` to build dataflow
});
});Examples found in repository?
5fn main() {
6 timely::execute_from_args(std::env::args(), |worker| {
7
8 let addr = format!("127.0.0.1:{}", 8000 + worker.index());
9 let send = TcpStream::connect(addr).unwrap();
10
11 worker.dataflow::<u64,_,_>(|scope|
12 (0..10u64)
13 .to_stream(scope)
14 .container::<Vec<_>>()
15 .capture_into(EventWriter::new(send))
16 );
17 }).unwrap();
18}More examples
4fn main() {
5 timely::execute(Config::thread(), |worker| {
6 let (mut input, mut cap) = worker.dataflow::<usize,_,_>(|scope| {
7 let (input, stream) = scope.new_unordered_input();
8 stream.container::<Vec<_>>().inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
9 input
10 });
11
12 for round in 0..10 {
13 input.activate().session(&cap).give(round);
14 cap = cap.delayed(&(round + 1));
15 worker.step();
16 }
17 }).unwrap();
18}5fn main() {
6
7 // create a naked single-threaded worker.
8 let allocator = timely::communication::allocator::Thread::default();
9 let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator, None);
10
11 // create input and probe handles.
12 let mut input = InputHandle::new();
13 let probe = ProbeHandle::new();
14
15 // directly build a dataflow.
16 worker.dataflow(|scope| {
17 input
18 .to_stream(scope)
19 .container::<Vec<_>>()
20 .inspect(|x| println!("{:?}", x))
21 .probe_with(&probe);
22 });
23
24 // manage inputs.
25 for i in 0 .. 10 {
26 input.send(i);
27 input.advance_to(i);
28 while probe.less_than(input.time()) {
29 worker.step();
30 }
31 }
32}3fn main() {
4
5 let iterations = std::env::args().nth(1).unwrap().parse::<u64>().unwrap();
6 let elements = std::env::args().nth(2).unwrap().parse::<u64>().unwrap();
7
8 // initializes and runs a timely dataflow
9 timely::execute_from_args(std::env::args().skip(3), move |worker| {
10 let index = worker.index();
11 let peers = worker.peers();
12 worker.dataflow::<u64,_,_>(move |scope| {
13 let (helper, cycle) = scope.feedback(1);
14 (0 .. elements)
15 .filter(move |&x| (x as usize) % peers == index)
16 .to_stream(scope)
17 .concat(cycle)
18 .exchange(|&x| x)
19 .map_in_place(|x| *x += 1)
20 .branch_when(move |t| t < &iterations).1
21 .connect_loop(helper);
22 });
23 }).unwrap();
24}5fn main() {
6 timely::execute_from_args(std::env::args(), |worker| {
7
8 let source_peers = std::env::args().nth(1).unwrap().parse::<usize>().unwrap();
9
10 // create replayers from disjoint partition of source worker identifiers.
11 let replayers =
12 (0 .. source_peers)
13 .filter(|i| i % worker.peers() == worker.index())
14 .map(|i| TcpListener::bind(format!("127.0.0.1:{}", 8000 + i)).unwrap())
15 .collect::<Vec<_>>()
16 .into_iter()
17 .map(|l| l.incoming().next().unwrap().unwrap())
18 .map(EventReader::<_,Vec<u64>,_>::new)
19 .collect::<Vec<_>>();
20
21 worker.dataflow::<u64,_,_>(|scope| {
22 replayers
23 .replay_into(scope)
24 .inspect(|x| println!("replayed: {:?}", x));
25 })
26 }).unwrap(); // asserts error-free execution
27}10fn main() {
11 // initializes and runs a timely dataflow.
12 timely::execute_from_args(std::env::args(), |worker| {
13 // create a new input, exchange data, and inspect its output
14 let index = worker.index();
15 let mut input = InputHandle::new();
16 let probe = ProbeHandle::new();
17 worker.dataflow(|scope| {
18 scope.input_from(&mut input)
19 .container::<Vec<_>>()
20 //.exchange(|x| *x) // <-- cannot exchange this; Rc is not Send.
21 .inspect(move |x| println!("worker {}:\thello {:?}", index, x))
22 .probe_with(&probe);
23 });
24
25 // introduce data and watch!
26 for round in 0..10 {
27 input.send(Test { _field: Rc::new(round) } );
28 input.advance_to(round + 1);
29 worker.step_while(|| probe.less_than(input.time()));
30 }
31 }).unwrap();
32}- examples/hello.rs
- examples/logging-recv.rs
- examples/barrier.rs
- examples/flow_controlled.rs
- examples/exchange.rs
- examples/event_driven.rs
- examples/unionfind.rs
- examples/distinct.rs
- examples/wordcount.rs
- examples/hashjoin.rs
- examples/loopdemo.rs
- examples/logging-send.rs
- examples/openloop.rs
- examples/columnar.rs
- examples/bfs.rs
- examples/pagerank.rs
Sourcepub fn dataflow_named<T, R, F>(&mut self, name: &str, func: F) -> R
pub fn dataflow_named<T, R, F>(&mut self, name: &str, func: F) -> R
Construct a new dataflow with a (purely cosmetic) name.
§Examples
timely::execute_from_args(::std::env::args(), |worker| {
// We must supply the timestamp type here, although
// it would generally be determined by type inference.
worker.dataflow_named::<usize,_,_>("Some Dataflow", |scope| {
// uses of `scope` to build dataflow
});
});Sourcepub fn dataflow_core<T, R, F, V>(
&mut self,
name: &str,
logging: Option<TimelyLogger>,
resources: V,
func: F,
) -> R
pub fn dataflow_core<T, R, F, V>( &mut self, name: &str, logging: Option<TimelyLogger>, resources: V, func: F, ) -> R
Construct a new dataflow with specific configurations.
This method constructs a new dataflow, using a name, logger, and additional resources specified as arguments. The name is cosmetic, the logger is used to handle events generated by the dataflow, and the additional resources are kept alive for as long as the dataflow is alive (use case: shared library bindings).
§Examples
timely::execute_from_args(::std::env::args(), |worker| {
// We must supply the timestamp type here, although
// it would generally be determined by type inference.
worker.dataflow_core::<usize,_,_,_>(
"dataflow X", // Dataflow name
None, // Optional logger
37, // Any resources
|resources, scope| { // Closure
// uses of `resources`, `scope` to build dataflow
}
);
});Sourcepub fn drop_dataflow(&mut self, dataflow_identifier: usize)
pub fn drop_dataflow(&mut self, dataflow_identifier: usize)
Drops an identified dataflow.
This method removes the identified dataflow, which will no longer be scheduled. Various other resources will be cleaned up, though the method is currently in public beta and is not yet guaranteed to work correctly. Please report all crashes and unmet expectations!
Sourcepub fn next_dataflow_index(&self) -> usize
pub fn next_dataflow_index(&self) -> usize
Returns the next index to be used for dataflow construction.
This identifier will appear in the address of contained operators, and can
be used to drop the dataflow using self.drop_dataflow().
Sourcepub fn installed_dataflows(&self) -> Vec<usize>
pub fn installed_dataflows(&self) -> Vec<usize>
List the current dataflow indices.
Sourcepub fn has_dataflows(&self) -> bool
pub fn has_dataflows(&self) -> bool
Returns true if there is at least one dataflow under management.
Trait Implementations§
Source§impl<A: Allocate> AsWorker for Worker<A>
impl<A: Allocate> AsWorker for Worker<A>
Source§fn allocate<D: Exchangeable>(
&mut self,
identifier: usize,
address: Rc<[usize]>,
) -> (Vec<Box<dyn Push<D>>>, Box<dyn Pull<D>>)
fn allocate<D: Exchangeable>( &mut self, identifier: usize, address: Rc<[usize]>, ) -> (Vec<Box<dyn Push<D>>>, Box<dyn Pull<D>>)
Source§fn pipeline<T: 'static>(
&mut self,
identifier: usize,
address: Rc<[usize]>,
) -> (ThreadPusher<T>, ThreadPuller<T>)
fn pipeline<T: 'static>( &mut self, identifier: usize, address: Rc<[usize]>, ) -> (ThreadPusher<T>, ThreadPuller<T>)
Source§fn broadcast<T: Exchangeable + Clone>(
&mut self,
identifier: usize,
address: Rc<[usize]>,
) -> (Box<dyn Push<T>>, Box<dyn Pull<T>>)
fn broadcast<T: Exchangeable + Clone>( &mut self, identifier: usize, address: Rc<[usize]>, ) -> (Box<dyn Push<T>>, Box<dyn Pull<T>>)
Source§fn new_identifier(&mut self) -> usize
fn new_identifier(&mut self) -> usize
Source§fn peek_identifier(&self) -> usize
fn peek_identifier(&self) -> usize
Source§fn log_register(&self) -> Option<RefMut<'_, Registry>>
fn log_register(&self) -> Option<RefMut<'_, Registry>>
Source§fn logger_for<CB: ContainerBuilder>(&self, name: &str) -> Option<Logger<CB>>
fn logger_for<CB: ContainerBuilder>(&self, name: &str) -> Option<Logger<CB>>
Source§fn logging(&self) -> Option<TimelyLogger>
fn logging(&self) -> Option<TimelyLogger>
Source§impl<A: Allocate> Scheduler for Worker<A>
impl<A: Allocate> Scheduler for Worker<A>
Source§fn activations(&self) -> Rc<RefCell<Activations>>
fn activations(&self) -> Rc<RefCell<Activations>>
Source§fn activator_for(&self, path: Rc<[usize]>) -> Activator
fn activator_for(&self, path: Rc<[usize]>) -> Activator
Activator tied to the specified operator address.Source§fn sync_activator_for(&self, path: Vec<usize>) -> SyncActivator
fn sync_activator_for(&self, path: Vec<usize>) -> SyncActivator
SyncActivator tied to the specified operator address.Auto Trait Implementations§
impl<A> Freeze for Worker<A>
impl<A> !RefUnwindSafe for Worker<A>
impl<A> !Send for Worker<A>
impl<A> !Sync for Worker<A>
impl<A> Unpin for Worker<A>
impl<A> UnsafeUnpin for Worker<A>
impl<A> !UnwindSafe for Worker<A>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> CloneToUninit for T where
T: Clone,
impl<T> CloneToUninit for T where
T: Clone,
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more