Skip to main content

Worker

Struct Worker 

Source
pub struct Worker<A: Allocate> { /* private fields */ }
Expand description

A Worker is the entry point to a timely dataflow computation. It wraps an Allocate implementation and manages a list of dataflows.

Implementations§

Source§

impl<A: Allocate> Worker<A>

Source

pub fn new(config: Config, c: A, now: Option<Instant>) -> Worker<A>

Allocates a new Worker bound to a channel allocator.

Examples found in repository?
examples/threadless.rs (line 9)
5fn main() {
6
7    // create a naked single-threaded worker.
8    let allocator = timely::communication::allocator::Thread::default();
9    let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator, None);
10
11    // create input and probe handles.
12    let mut input = InputHandle::new();
13    let probe = ProbeHandle::new();
14
15    // directly build a dataflow.
16    worker.dataflow(|scope| {
17        input
18            .to_stream(scope)
19            .container::<Vec<_>>()
20            .inspect(|x| println!("{:?}", x))
21            .probe_with(&probe);
22    });
23
24    // manage inputs.
25    for i in 0 .. 10 {
26        input.send(i);
27        input.advance_to(i);
28        while probe.less_than(input.time()) {
29            worker.step();
30        }
31    }
32}
Source

pub fn step(&mut self) -> bool

Performs one step of the computation.

A step gives each dataflow operator a chance to run, and is the main way to ensure that a computation proceeds.

§Examples
timely::execute_from_args(::std::env::args(), |worker| {

    use timely::dataflow::operators::{ToStream, Inspect};

    worker.dataflow::<usize,_,_>(|scope| {
        (0 .. 10)
            .to_stream(scope)
            .container::<Vec<_>>()
            .inspect(|x| println!("{:?}", x));
    });

    worker.step();
});
Examples found in repository?
examples/unordered_input.rs (line 15)
4fn main() {
5    timely::execute(Config::thread(), |worker| {
6        let (mut input, mut cap) = worker.dataflow::<usize,_,_>(|scope| {
7            let (input, stream) = scope.new_unordered_input();
8            stream.container::<Vec<_>>().inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
9            input
10        });
11
12        for round in 0..10 {
13            input.activate().session(&cap).give(round);
14            cap = cap.delayed(&(round + 1));
15            worker.step();
16        }
17    }).unwrap();
18}
More examples
Hide additional examples
examples/sequence.rs (line 20)
6fn main() {
7    timely::execute(Config::process(4), |worker| {
8
9        let timer = Instant::now();
10        let mut sequencer = Sequencer::new(worker, Instant::now());
11
12        for round in 0 .. {
13            // if worker.index() < 3 {
14                std::thread::sleep(Duration::from_secs(1 + worker.index() as u64));
15                sequencer.push(format!("worker {:?}, round {:?}", worker.index(), round));
16            // }
17            for element in &mut sequencer {
18                println!("{:?}:\tWorker {:?}:\t recv'd: {:?}", timer.elapsed(), worker.index(), element);
19            }
20            worker.step();
21        }
22
23    }).unwrap(); // asserts error-free execution;
24}
examples/threadless.rs (line 29)
5fn main() {
6
7    // create a naked single-threaded worker.
8    let allocator = timely::communication::allocator::Thread::default();
9    let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator, None);
10
11    // create input and probe handles.
12    let mut input = InputHandle::new();
13    let probe = ProbeHandle::new();
14
15    // directly build a dataflow.
16    worker.dataflow(|scope| {
17        input
18            .to_stream(scope)
19            .container::<Vec<_>>()
20            .inspect(|x| println!("{:?}", x))
21            .probe_with(&probe);
22    });
23
24    // manage inputs.
25    for i in 0 .. 10 {
26        input.send(i);
27        input.advance_to(i);
28        while probe.less_than(input.time()) {
29            worker.step();
30        }
31    }
32}
examples/hello.rs (line 28)
4fn main() {
5    // initializes and runs a timely dataflow.
6    timely::execute_from_args(std::env::args(), |worker| {
7
8        let index = worker.index();
9        let mut input = InputHandle::new();
10        let probe = ProbeHandle::new();
11
12        // create a new input, exchange data, and inspect its output
13        worker.dataflow(|scope| {
14            scope.input_from(&mut input)
15                 .container::<Vec<_>>()
16                 .exchange(|x| *x)
17                 .inspect(move |x| println!("worker {}:\thello {}", index, x))
18                 .probe_with(&probe);
19        });
20
21        // introduce data and watch!
22        for round in 0..10 {
23            if index == 0 {
24                input.send(round);
25            }
26            input.advance_to(round + 1);
27            while probe.less_than(input.time()) {
28                worker.step();
29            }
30        }
31    }).unwrap();
32}
examples/exchange.rs (line 33)
4fn main() {
5    // initializes and runs a timely dataflow.
6    timely::execute_from_args(std::env::args(), |worker| {
7
8        let batch = std::env::args().nth(1).unwrap().parse::<usize>().unwrap();
9        let rounds = std::env::args().nth(2).unwrap().parse::<usize>().unwrap();
10        let mut input = InputHandle::new();
11
12        // create a new input, exchange data, and inspect its output
13        let probe = worker.dataflow(|scope|
14            scope
15                .input_from(&mut input)
16                .container::<Vec<_>>()
17                .exchange(|&x| x as u64)
18                .probe()
19                .0
20        );
21
22
23        let timer = std::time::Instant::now();
24
25        for round in 0 .. rounds {
26
27            for i in 0 .. batch {
28                input.send(i);
29            }
30            input.advance_to(round);
31
32            while probe.less_than(input.time()) {
33                worker.step();
34            }
35
36        }
37
38        let volume = (rounds * batch) as f64;
39        let elapsed = timer.elapsed();
40        let seconds = elapsed.as_secs() as f64 + (f64::from(elapsed.subsec_nanos())/1000000000.0);
41
42        println!("{:?}\tworker {} complete; rate: {:?}", timer.elapsed(), worker.index(), volume / seconds);
43
44    }).unwrap();
45}
examples/event_driven.rs (line 43)
4fn main() {
5    // initializes and runs a timely dataflow.
6    timely::execute_from_args(std::env::args(), |worker| {
7
8        let timer = std::time::Instant::now();
9
10        let mut args = std::env::args();
11        args.next();
12
13        let dataflows = args.next().unwrap().parse::<usize>().unwrap();
14        let length = args.next().unwrap().parse::<usize>().unwrap();
15        let record = args.next() == Some("record".to_string());
16
17        let mut inputs = Vec::new();
18        let mut probes = Vec::new();
19
20        // create a new input, exchange data, and inspect its output
21        for _dataflow in 0 .. dataflows {
22            worker.dataflow(|scope| {
23                let (input, mut stream) = scope.new_input();
24                for _step in 0 .. length {
25                    stream = stream.map(|x: ()| x);
26                }
27                let (probe, _stream) = stream.probe();
28                inputs.push(input);
29                probes.push(probe);
30            });
31        }
32
33        println!("{:?}\tdataflows built ({} x {})", timer.elapsed(), dataflows, length);
34
35        for round in 0 .. {
36            let dataflow = round % dataflows;
37            if record {
38                inputs[dataflow].send(());
39            }
40            inputs[dataflow].advance_to(round);
41            let mut steps = 0;
42            while probes[dataflow].less_than(&round) {
43                worker.step();
44                steps += 1;
45            }
46            println!("{:?}\tround {} complete in {} steps", timer.elapsed(), round, steps);
47        }
48
49    }).unwrap();
50}
Source

pub fn step_or_park(&mut self, duration: Option<Duration>) -> bool

Performs one step of the computation.

A step gives each dataflow operator a chance to run, and is the main way to ensure that a computation proceeds.

This method takes an optional timeout and may park the thread until there is work to perform or until this timeout expires. A value of None allows the worker to park indefinitely, whereas a value of Some(Duration::new(0, 0)) will return without parking the thread.

§Examples
timely::execute_from_args(::std::env::args(), |worker| {

    use std::time::Duration;
    use timely::dataflow::operators::{ToStream, Inspect};

    worker.dataflow::<usize,_,_>(|scope| {
        (0 .. 10)
            .to_stream(scope)
            .container::<Vec<_>>()
            .inspect(|x| println!("{:?}", x));
    });

    worker.step_or_park(Some(Duration::from_secs(1)));
});
Source

pub fn step_while<F: FnMut() -> bool>(&mut self, func: F)

Calls self.step() as long as func evaluates to true.

This method will continually execute even if there is no work for the worker to perform. Consider using the similar method Self::step_or_park_while(duration) to allow the worker to yield control if that is appropriate.

§Examples
timely::execute_from_args(::std::env::args(), |worker| {

    use timely::dataflow::operators::{ToStream, Inspect, Probe};

    let probe =
    worker.dataflow::<usize,_,_>(|scope| {
        (0 .. 10)
            .to_stream(scope)
            .container::<Vec<_>>()
            .inspect(|x| println!("{:?}", x))
            .probe()
            .0
    });

    worker.step_while(|| probe.less_than(&0));
});
Examples found in repository?
examples/rc.rs (line 29)
10fn main() {
11    // initializes and runs a timely dataflow.
12    timely::execute_from_args(std::env::args(), |worker| {
13        // create a new input, exchange data, and inspect its output
14        let index = worker.index();
15        let mut input = InputHandle::new();
16        let probe = ProbeHandle::new();
17        worker.dataflow(|scope| {
18            scope.input_from(&mut input)
19                 .container::<Vec<_>>()
20                 //.exchange(|x| *x) // <-- cannot exchange this; Rc is not Send.
21                 .inspect(move |x| println!("worker {}:\thello {:?}", index, x))
22                 .probe_with(&probe);
23        });
24
25        // introduce data and watch!
26        for round in 0..10 {
27            input.send(Test { _field: Rc::new(round) } );
28            input.advance_to(round + 1);
29            worker.step_while(|| probe.less_than(input.time()));
30        }
31    }).unwrap();
32}
Source

pub fn step_or_park_while<F: FnMut() -> bool>( &mut self, duration: Option<Duration>, func: F, )

Calls self.step_or_park(duration) as long as func evaluates to true.

This method may yield whenever there is no work to perform, as Self::step_or_park() does. Please consult the documentation of that method for further information about its behavior. In particular, the method can park the worker indefinitely if no new work re-awakens the worker.

§Examples
timely::execute_from_args(::std::env::args(), |worker| {

    use timely::dataflow::operators::{ToStream, Inspect, Probe};

    let probe =
    worker.dataflow::<usize,_,_>(|scope| {
        (0 .. 10)
            .to_stream(scope)
            .container::<Vec<_>>()
            .inspect(|x| println!("{:?}", x))
            .probe()
            .0
    });

    worker.step_or_park_while(None, || probe.less_than(&0));
});
Source

pub fn index(&self) -> usize

The index of the worker out of its peers.

§Examples
timely::execute_from_args(::std::env::args(), |worker| {

    let index = worker.index();
    let peers = worker.peers();
    let timer = worker.timer().unwrap();

    println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);

});
Examples found in repository?
examples/capture_send.rs (line 8)
5fn main() {
6    timely::execute_from_args(std::env::args(), |worker| {
7
8        let addr = format!("127.0.0.1:{}", 8000 + worker.index());
9        let send = TcpStream::connect(addr).unwrap();
10
11        worker.dataflow::<u64,_,_>(|scope|
12            (0..10u64)
13                .to_stream(scope)
14                .container::<Vec<_>>()
15                .capture_into(EventWriter::new(send))
16        );
17    }).unwrap();
18}
More examples
Hide additional examples
examples/sequence.rs (line 14)
6fn main() {
7    timely::execute(Config::process(4), |worker| {
8
9        let timer = Instant::now();
10        let mut sequencer = Sequencer::new(worker, Instant::now());
11
12        for round in 0 .. {
13            // if worker.index() < 3 {
14                std::thread::sleep(Duration::from_secs(1 + worker.index() as u64));
15                sequencer.push(format!("worker {:?}, round {:?}", worker.index(), round));
16            // }
17            for element in &mut sequencer {
18                println!("{:?}:\tWorker {:?}:\t recv'd: {:?}", timer.elapsed(), worker.index(), element);
19            }
20            worker.step();
21        }
22
23    }).unwrap(); // asserts error-free execution;
24}
examples/pingpong.rs (line 10)
3fn main() {
4
5    let iterations = std::env::args().nth(1).unwrap().parse::<u64>().unwrap();
6    let elements = std::env::args().nth(2).unwrap().parse::<u64>().unwrap();
7
8    // initializes and runs a timely dataflow
9    timely::execute_from_args(std::env::args().skip(3), move |worker| {
10        let index = worker.index();
11        let peers = worker.peers();
12        worker.dataflow::<u64,_,_>(move |scope| {
13            let (helper, cycle) = scope.feedback(1);
14            (0 .. elements)
15                  .filter(move |&x| (x as usize) % peers == index)
16                  .to_stream(scope)
17                  .concat(cycle)
18                  .exchange(|&x| x)
19                  .map_in_place(|x| *x += 1)
20                  .branch_when(move |t| t < &iterations).1
21                  .connect_loop(helper);
22        });
23    }).unwrap();
24}
examples/capture_recv.rs (line 13)
5fn main() {
6    timely::execute_from_args(std::env::args(), |worker| {
7
8        let source_peers = std::env::args().nth(1).unwrap().parse::<usize>().unwrap();
9
10        // create replayers from disjoint partition of source worker identifiers.
11        let replayers =
12        (0 .. source_peers)
13            .filter(|i| i % worker.peers() == worker.index())
14            .map(|i| TcpListener::bind(format!("127.0.0.1:{}", 8000 + i)).unwrap())
15            .collect::<Vec<_>>()
16            .into_iter()
17            .map(|l| l.incoming().next().unwrap().unwrap())
18            .map(EventReader::<_,Vec<u64>,_>::new)
19            .collect::<Vec<_>>();
20
21        worker.dataflow::<u64,_,_>(|scope| {
22            replayers
23                .replay_into(scope)
24                .inspect(|x| println!("replayed: {:?}", x));
25        })
26    }).unwrap(); // asserts error-free execution
27}
examples/rc.rs (line 14)
10fn main() {
11    // initializes and runs a timely dataflow.
12    timely::execute_from_args(std::env::args(), |worker| {
13        // create a new input, exchange data, and inspect its output
14        let index = worker.index();
15        let mut input = InputHandle::new();
16        let probe = ProbeHandle::new();
17        worker.dataflow(|scope| {
18            scope.input_from(&mut input)
19                 .container::<Vec<_>>()
20                 //.exchange(|x| *x) // <-- cannot exchange this; Rc is not Send.
21                 .inspect(move |x| println!("worker {}:\thello {:?}", index, x))
22                 .probe_with(&probe);
23        });
24
25        // introduce data and watch!
26        for round in 0..10 {
27            input.send(Test { _field: Rc::new(round) } );
28            input.advance_to(round + 1);
29            worker.step_while(|| probe.less_than(input.time()));
30        }
31    }).unwrap();
32}
examples/hello.rs (line 8)
4fn main() {
5    // initializes and runs a timely dataflow.
6    timely::execute_from_args(std::env::args(), |worker| {
7
8        let index = worker.index();
9        let mut input = InputHandle::new();
10        let probe = ProbeHandle::new();
11
12        // create a new input, exchange data, and inspect its output
13        worker.dataflow(|scope| {
14            scope.input_from(&mut input)
15                 .container::<Vec<_>>()
16                 .exchange(|x| *x)
17                 .inspect(move |x| println!("worker {}:\thello {}", index, x))
18                 .probe_with(&probe);
19        });
20
21        // introduce data and watch!
22        for round in 0..10 {
23            if index == 0 {
24                input.send(round);
25            }
26            input.advance_to(round + 1);
27            while probe.less_than(input.time()) {
28                worker.step();
29            }
30        }
31    }).unwrap();
32}
Source

pub fn peers(&self) -> usize

The total number of peer workers.

§Examples
timely::execute_from_args(::std::env::args(), |worker| {

    let index = worker.index();
    let peers = worker.peers();
    let timer = worker.timer().unwrap();

    println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);

});
Examples found in repository?
examples/pingpong.rs (line 11)
3fn main() {
4
5    let iterations = std::env::args().nth(1).unwrap().parse::<u64>().unwrap();
6    let elements = std::env::args().nth(2).unwrap().parse::<u64>().unwrap();
7
8    // initializes and runs a timely dataflow
9    timely::execute_from_args(std::env::args().skip(3), move |worker| {
10        let index = worker.index();
11        let peers = worker.peers();
12        worker.dataflow::<u64,_,_>(move |scope| {
13            let (helper, cycle) = scope.feedback(1);
14            (0 .. elements)
15                  .filter(move |&x| (x as usize) % peers == index)
16                  .to_stream(scope)
17                  .concat(cycle)
18                  .exchange(|&x| x)
19                  .map_in_place(|x| *x += 1)
20                  .branch_when(move |t| t < &iterations).1
21                  .connect_loop(helper);
22        });
23    }).unwrap();
24}
More examples
Hide additional examples
examples/capture_recv.rs (line 13)
5fn main() {
6    timely::execute_from_args(std::env::args(), |worker| {
7
8        let source_peers = std::env::args().nth(1).unwrap().parse::<usize>().unwrap();
9
10        // create replayers from disjoint partition of source worker identifiers.
11        let replayers =
12        (0 .. source_peers)
13            .filter(|i| i % worker.peers() == worker.index())
14            .map(|i| TcpListener::bind(format!("127.0.0.1:{}", 8000 + i)).unwrap())
15            .collect::<Vec<_>>()
16            .into_iter()
17            .map(|l| l.incoming().next().unwrap().unwrap())
18            .map(EventReader::<_,Vec<u64>,_>::new)
19            .collect::<Vec<_>>();
20
21        worker.dataflow::<u64,_,_>(|scope| {
22            replayers
23                .replay_into(scope)
24                .inspect(|x| println!("replayed: {:?}", x));
25        })
26    }).unwrap(); // asserts error-free execution
27}
examples/logging-recv.rs (line 16)
8fn main() {
9    timely::execute_from_args(std::env::args(), |worker| {
10
11        let source_peers = std::env::args().nth(1).unwrap().parse::<usize>().unwrap();
12
13        // create replayers from disjoint partition of source worker identifiers.
14        let replayers =
15        (0 .. source_peers)
16            .filter(|i| i % worker.peers() == worker.index())
17            .map(|i| TcpListener::bind(format!("127.0.0.1:{}", 8000 + i)).unwrap())
18            .collect::<Vec<_>>()
19            .into_iter()
20            .map(|l| l.incoming().next().unwrap().unwrap())
21            .map(EventReader::<Duration,Vec<(Duration,TimelySetup,TimelyEvent)>,_>::new)
22            .collect::<Vec<_>>();
23
24        worker.dataflow(|scope| {
25            replayers
26                .replay_into(scope)
27                .inspect(|x| println!("replayed: {:?}", x));
28        })
29    }).unwrap(); // asserts error-free execution
30}
examples/unionfind.rs (line 18)
8fn main() {
9
10    // command-line args: numbers of nodes and edges in the random graph.
11    let nodes: usize = std::env::args().nth(1).unwrap().parse().unwrap();
12    let edges: usize = std::env::args().nth(2).unwrap().parse().unwrap();
13    let batch: usize = std::env::args().nth(3).unwrap().parse().unwrap();
14
15    timely::execute_from_args(std::env::args().skip(4), move |worker| {
16
17        let index = worker.index();
18        let peers = worker.peers();
19
20        let mut input = InputHandle::new();
21        let probe = ProbeHandle::new();
22
23        worker.dataflow(|scope| {
24            scope.input_from(&mut input)
25                //  .exchange(move |x: &(usize, usize)| (x.0 % (peers - 1)) as u64 + 1)
26                 .union_find()
27                 .exchange(|_| 0)
28                 .union_find()
29                 .probe_with(&probe);
30        });
31
32        // Generate roughly random data.
33        use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
34        let hasher = BuildHasherDefault::<DefaultHasher>::new();
35        let insert = (0..).map(move |i| (hasher.hash_one(&(i,index,0)) as usize % nodes,
36                                         hasher.hash_one(&(i,index,1)) as usize % nodes));
37
38        for (edge, arc) in insert.take(edges / peers).enumerate() {
39            input.send(arc);
40            if edge % batch == (batch - 1) {
41                let next = input.epoch() + 1;
42                input.advance_to(next);
43                while probe.less_than(input.time()) {
44                    worker.step();
45                }
46            }
47        }
48
49    }).unwrap(); // asserts error-free execution;
50}
examples/hashjoin.rs (line 18)
8fn main() {
9
10    // command-line args: key range, total number of records to send, and batch size.
11    let keys: u64 = std::env::args().nth(1).unwrap().parse().unwrap();
12    let vals: usize = std::env::args().nth(2).unwrap().parse().unwrap();
13    let batch: usize = std::env::args().nth(3).unwrap().parse().unwrap();
14
15    timely::execute_from_args(std::env::args().skip(4), move |worker| {
16
17        let index = worker.index();
18        let peers = worker.peers();
19
20        let mut input1 = InputHandle::new();
21        let mut input2 = InputHandle::new();
22        let probe = ProbeHandle::new();
23
24        worker.dataflow(|scope| {
25
26            let stream1 = scope.input_from(&mut input1);
27            let stream2 = scope.input_from(&mut input2);
28
29            let exchange1 = Exchange::new(|x: &(u64, u64)| x.0);
30            let exchange2 = Exchange::new(|x: &(u64, u64)| x.0);
31
32            stream1
33                .binary(stream2, exchange1, exchange2, "HashJoin", |_capability, _info| {
34
35                    let mut map1 = HashMap::<u64, Vec<u64>>::new();
36                    let mut map2 = HashMap::<u64, Vec<u64>>::new();
37
38                    move |input1, input2, output| {
39
40                        // Drain first input, check second map, update first map.
41                        input1.for_each_time(|time, data| {
42                            let mut session = output.session(&time);
43                            for (key, val1) in data.flat_map(|d| d.drain(..)) {
44                                if let Some(values) = map2.get(&key) {
45                                    for val2 in values.iter() {
46                                        session.give((val1, *val2));
47                                    }
48                                }
49
50                                map1.entry(key).or_default().push(val1);
51                            }
52                        });
53
54                        // Drain second input, check first map, update second map.
55                        input2.for_each_time(|time, data| {
56                            let mut session = output.session(&time);
57                            for (key, val2) in data.flat_map(|d| d.drain(..)) {
58                                if let Some(values) = map1.get(&key) {
59                                    for val1 in values.iter() {
60                                        session.give((*val1, val2));
61                                    }
62                                }
63
64                                map2.entry(key).or_default().push(val2);
65                            }
66                        });
67                    }
68                })
69                .container::<Vec<_>>()
70                .probe_with(&probe);
71        });
72
73        // Generate roughly random data.
74        use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
75        let hasher = BuildHasherDefault::<DefaultHasher>::new();
76        let mut insert = (0..).map(move |i| (hasher.hash_one(&(i,index,0)) % keys,
77                                             hasher.hash_one(&(i,index,1)) % keys,
78                                             hasher.hash_one(&(i,index,2)) % keys,
79                                             hasher.hash_one(&(i,index,3)) % keys));
80
81        let timer = std::time::Instant::now();
82
83        let mut sent = 0;
84        while sent < (vals / peers) {
85
86            // Send some amount of data, no more than `batch`.
87            let to_send = std::cmp::min(batch, vals/peers - sent);
88            for (src0, dst0, src1, dst1) in (&mut insert).take(to_send) {
89                input1.send((src0, dst0));
90                input2.send((src1, dst1));
91            }
92            sent += to_send;
93
94            // Advance input, iterate until data cleared.
95            let next = input1.epoch() + 1;
96            input1.advance_to(next);
97            input2.advance_to(next);
98            while probe.less_than(input1.time()) {
99                worker.step();
100            }
101
102            println!("{:?}\tworker {} batch complete", timer.elapsed(), index)
103        }
104
105    }).unwrap(); // asserts error-free execution;
106}
examples/loopdemo.rs (line 15)
5fn main() {
6
7    let mut args = std::env::args();
8    args.next();
9    let rate: usize = args.next().expect("Must specify rate").parse().expect("Rate must be an usize");
10    let duration_s: usize = args.next().expect("Must specify duration_s").parse().expect("duration_s must be an usize");
11
12    timely::execute_from_args(args, move |worker| {
13
14        let index = worker.index();
15        let peers = worker.peers();
16
17        let timer = std::time::Instant::now();
18
19        let mut input = InputHandle::new();
20        let probe = ProbeHandle::new();
21
22        // Create a dataflow that discards input data (just synchronizes).
23        worker.dataflow(|scope| {
24
25            let stream = scope.input_from(&mut input);
26
27            let (loop_handle, loop_stream) = scope.feedback(1);
28
29            let step =
30            stream
31                .concat(loop_stream)
32                .map(|x| if x % 2 == 0 { x / 2 } else { 3 * x + 1 })
33                .filter(|x| x > &1);
34
35            step.probe_with(&probe)
36                .connect_loop(loop_handle);
37        });
38
39        let ns_per_request = 1_000_000_000 / rate;
40        let mut insert_counter = index;           // counts up as we insert records.
41        let mut retire_counter = index;           // counts up as we retire records.
42
43        let mut inserted_ns = 0;
44
45        // We repeatedly consult the elapsed time, and introduce any data now considered available.
46        // At the same time, we observe the output and record which inputs are considered retired.
47
48        let mut counts = vec![[0u64; 16]; 64];
49
50        let counter_limit = rate * duration_s;
51        while retire_counter < counter_limit {
52
53            // Open-loop latency-throughput test, parameterized by offered rate `ns_per_request`.
54            let elapsed = timer.elapsed();
55            let elapsed_ns = elapsed.as_secs() * 1_000_000_000 + (elapsed.subsec_nanos() as u64);
56
57            // Determine completed ns.
58            let acknowledged_ns: u64 = probe.with_frontier(|frontier| frontier[0]);
59
60            // Notice any newly-retired records.
61            while ((retire_counter * ns_per_request) as u64) < acknowledged_ns && retire_counter < counter_limit {
62                let requested_at = (retire_counter * ns_per_request) as u64;
63                let latency_ns = elapsed_ns - requested_at;
64
65                let count_index = latency_ns.next_power_of_two().trailing_zeros() as usize;
66                let low_bits = ((elapsed_ns - requested_at) >> (count_index - 5)) & 0xF;
67                counts[count_index][low_bits as usize] += 1;
68
69                retire_counter += peers;
70            }
71
72            // Now, should we introduce more records before stepping the worker?
73            // Three choices here:
74            //
75            //   1. Wait until previous batch acknowledged.
76            //   2. Tick at most once every millisecond-ish.
77            //   3. Geometrically increasing outstanding batches.
78
79            // Technique 1:
80            // let target_ns = if acknowledged_ns >= inserted_ns { elapsed_ns } else { inserted_ns };
81
82            // Technique 2:
83            // let target_ns = elapsed_ns & !((1 << 20) - 1);
84
85            // Technique 3:
86            let scale = (inserted_ns - acknowledged_ns).next_power_of_two();
87            let target_ns = elapsed_ns & !(scale - 1);
88
89            if inserted_ns < target_ns {
90
91                while ((insert_counter * ns_per_request) as u64) < target_ns {
92                    input.send(insert_counter);
93                    insert_counter += peers;
94                }
95                input.advance_to(target_ns);
96                inserted_ns = target_ns;
97            }
98
99            worker.step();
100        }
101
102        // Report observed latency measurements.
103        if index == 0 {
104
105            let mut results = Vec::new();
106            let total = counts.iter().map(|x| x.iter().sum::<u64>()).sum();
107            let mut sum = 0;
108            for index in (10 .. counts.len()).rev() {
109                for sub in (0 .. 16).rev() {
110                    if sum > 0 && sum < total {
111                        let latency = (1 << (index-1)) + (sub << (index-5));
112                        let fraction = (sum as f64) / (total as f64);
113                        results.push((latency, fraction));
114                    }
115                    sum += counts[index][sub];
116                }
117            }
118            for (latency, fraction) in results.drain(..).rev() {
119                println!("{}\t{}", latency, fraction);
120            }
121        }
122
123    }).unwrap();
124}
Source

pub fn timer(&self) -> Option<Instant>

A timer started at the initiation of the timely computation.

§Examples
timely::execute_from_args(::std::env::args(), |worker| {

    let index = worker.index();
    let peers = worker.peers();
    let timer = worker.timer().unwrap();

    println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);

});
Source

pub fn new_identifier(&mut self) -> usize

Allocate a new worker-unique identifier.

This method is public, though it is not expected to be widely used outside of the timely dataflow system.

Source

pub fn peek_identifier(&self) -> usize

The next worker-unique identifier to be allocated.

Source

pub fn log_register(&self) -> Option<RefMut<'_, Registry>>

Access to named loggers.

§Examples
timely::execute_from_args(::std::env::args(), |worker| {

    worker.log_register()
          .unwrap()
          .insert::<timely::logging::TimelyEventBuilder,_>("timely", |time, data|
              println!("{:?}\t{:?}", time, data)
          );
});
Examples found in repository?
examples/logging-send.rs (line 19)
9fn main() {
10    // initializes and runs a timely dataflow.
11    timely::execute_from_args(std::env::args(), |worker| {
12
13        let batch = std::env::args().nth(1).unwrap().parse::<usize>().unwrap();
14        let rounds = std::env::args().nth(2).unwrap().parse::<usize>().unwrap();
15        let mut input = InputHandle::new();
16        let probe = ProbeHandle::new();
17
18        // Register timely worker logging.
19        worker.log_register().unwrap().insert::<TimelyEventBuilder,_>("timely", |time, data|
20            if let Some(data) = data {
21                data.iter().for_each(|x| println!("LOG1: {:?}", x))
22            }
23            else {
24                println!("LOG1: Flush {time:?}");
25            }
26        );
27
28        // Register timely progress logging.
29        // Less generally useful: intended for debugging advanced custom operators or timely
30        // internals.
31        worker.log_register().unwrap().insert::<TimelyProgressEventBuilder<usize>,_>("timely/progress/usize", |time, data|
32            if let Some(data) = data {
33                data.iter().for_each(|x| {
34                    println!("PROGRESS: {:?}", x);
35                    let (_, ev) = x;
36                    print!("PROGRESS: TYPED MESSAGES: ");
37                    for (n, p, t, d) in ev.messages.iter() {
38                        print!("{:?}, ", (n, p, t, d));
39                    }
40                    println!();
41                    print!("PROGRESS: TYPED INTERNAL: ");
42                    for (n, p, t, d) in ev.internal.iter() {
43                        print!("{:?}, ", (n, p, t, d));
44                    }
45                    println!();
46                })
47            }
48            else {
49                println!("PROGRESS: Flush {time:?}");
50            }
51        );
52
53        worker.log_register().unwrap().insert::<TrackerEventBuilder<usize>,_>("timely/reachability/usize", |time, data|
54            if let Some(data) = data {
55                data.iter().for_each(|x| {
56                    println!("REACHABILITY: {:?}", x);
57                })
58            }
59            else {
60                println!("REACHABILITY: Flush {time:?}");
61            }
62        );
63
64        worker.log_register().unwrap().insert::<TimelySummaryEventBuilder<usize>,_>("timely/summary/usize", |time, data|
65            if let Some(data) = data {
66                data.iter().for_each(|(_, x)| {
67                    println!("SUMMARY: {:?}", x);
68                })
69            }
70            else {
71                println!("SUMMARY: Flush {time:?}");
72            }
73        );
74
75        // create a new input, exchange data, and inspect its output
76        worker.dataflow(|scope| {
77            scope
78                .input_from(&mut input)
79                .container::<Vec<_>>()
80                .exchange(|&x| x as u64)
81                .probe_with(&probe);
82        });
83
84        // Register timely worker logging.
85        worker.log_register().unwrap().insert::<TimelyEventBuilder,_>("timely", |time, data|
86            if let Some(data) = data {
87                data.iter().for_each(|x| println!("LOG2: {:?}", x))
88            }
89            else {
90                println!("LOG2: Flush {time:?}");
91            }
92        );
93
94        // create a new input, exchange data, and inspect its output
95        worker.dataflow(|scope| {
96            scope
97                .input_from(&mut input)
98                .exchange(|&x| x as u64)
99                .probe_with(&probe);
100        });
101
102        // Register user-level logging.
103        type MyBuilder = CapacityContainerBuilder<Vec<(Duration, ())>>;
104        worker.log_register().unwrap().insert::<MyBuilder,_>("input", |time, data|
105            if let Some(data) = data {
106                for element in data.iter() {
107                    println!("Round tick at: {:?}", element.0);
108                }
109            }
110            else {
111                println!("Round flush at: {time:?}");
112            }
113        );
114
115        let input_logger = worker.log_register().unwrap().get::<MyBuilder>("input").expect("Input logger absent");
116
117        let timer = std::time::Instant::now();
118
119        for round in 0 .. rounds {
120
121            for i in 0 .. batch {
122                input.send(i);
123            }
124            input.advance_to(round);
125            input_logger.log(());
126
127            while probe.less_than(input.time()) {
128                worker.step();
129            }
130
131        }
132
133        let volume = (rounds * batch) as f64;
134        let elapsed = timer.elapsed();
135        let seconds = elapsed.as_secs() as f64 + (f64::from(elapsed.subsec_nanos())/1000000000.0);
136
137        println!("{:?}\tworker {} complete; rate: {:?}", timer.elapsed(), worker.index(), volume / seconds);
138
139    }).unwrap();
140}
Source

pub fn dataflow<T, R, F>(&mut self, func: F) -> R
where T: Refines<()>, F: FnOnce(&mut Child<'_, Self, T>) -> R,

Construct a new dataflow.

§Examples
timely::execute_from_args(::std::env::args(), |worker| {

    // We must supply the timestamp type here, although
    // it would generally be determined by type inference.
    worker.dataflow::<usize,_,_>(|scope| {

        // uses of `scope` to build dataflow

    });
});
Examples found in repository?
examples/capture_send.rs (lines 11-16)
5fn main() {
6    timely::execute_from_args(std::env::args(), |worker| {
7
8        let addr = format!("127.0.0.1:{}", 8000 + worker.index());
9        let send = TcpStream::connect(addr).unwrap();
10
11        worker.dataflow::<u64,_,_>(|scope|
12            (0..10u64)
13                .to_stream(scope)
14                .container::<Vec<_>>()
15                .capture_into(EventWriter::new(send))
16        );
17    }).unwrap();
18}
More examples
Hide additional examples
examples/unordered_input.rs (lines 6-10)
4fn main() {
5    timely::execute(Config::thread(), |worker| {
6        let (mut input, mut cap) = worker.dataflow::<usize,_,_>(|scope| {
7            let (input, stream) = scope.new_unordered_input();
8            stream.container::<Vec<_>>().inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
9            input
10        });
11
12        for round in 0..10 {
13            input.activate().session(&cap).give(round);
14            cap = cap.delayed(&(round + 1));
15            worker.step();
16        }
17    }).unwrap();
18}
examples/threadless.rs (lines 16-22)
5fn main() {
6
7    // create a naked single-threaded worker.
8    let allocator = timely::communication::allocator::Thread::default();
9    let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator, None);
10
11    // create input and probe handles.
12    let mut input = InputHandle::new();
13    let probe = ProbeHandle::new();
14
15    // directly build a dataflow.
16    worker.dataflow(|scope| {
17        input
18            .to_stream(scope)
19            .container::<Vec<_>>()
20            .inspect(|x| println!("{:?}", x))
21            .probe_with(&probe);
22    });
23
24    // manage inputs.
25    for i in 0 .. 10 {
26        input.send(i);
27        input.advance_to(i);
28        while probe.less_than(input.time()) {
29            worker.step();
30        }
31    }
32}
examples/pingpong.rs (lines 12-22)
3fn main() {
4
5    let iterations = std::env::args().nth(1).unwrap().parse::<u64>().unwrap();
6    let elements = std::env::args().nth(2).unwrap().parse::<u64>().unwrap();
7
8    // initializes and runs a timely dataflow
9    timely::execute_from_args(std::env::args().skip(3), move |worker| {
10        let index = worker.index();
11        let peers = worker.peers();
12        worker.dataflow::<u64,_,_>(move |scope| {
13            let (helper, cycle) = scope.feedback(1);
14            (0 .. elements)
15                  .filter(move |&x| (x as usize) % peers == index)
16                  .to_stream(scope)
17                  .concat(cycle)
18                  .exchange(|&x| x)
19                  .map_in_place(|x| *x += 1)
20                  .branch_when(move |t| t < &iterations).1
21                  .connect_loop(helper);
22        });
23    }).unwrap();
24}
examples/capture_recv.rs (lines 21-25)
5fn main() {
6    timely::execute_from_args(std::env::args(), |worker| {
7
8        let source_peers = std::env::args().nth(1).unwrap().parse::<usize>().unwrap();
9
10        // create replayers from disjoint partition of source worker identifiers.
11        let replayers =
12        (0 .. source_peers)
13            .filter(|i| i % worker.peers() == worker.index())
14            .map(|i| TcpListener::bind(format!("127.0.0.1:{}", 8000 + i)).unwrap())
15            .collect::<Vec<_>>()
16            .into_iter()
17            .map(|l| l.incoming().next().unwrap().unwrap())
18            .map(EventReader::<_,Vec<u64>,_>::new)
19            .collect::<Vec<_>>();
20
21        worker.dataflow::<u64,_,_>(|scope| {
22            replayers
23                .replay_into(scope)
24                .inspect(|x| println!("replayed: {:?}", x));
25        })
26    }).unwrap(); // asserts error-free execution
27}
examples/rc.rs (lines 17-23)
10fn main() {
11    // initializes and runs a timely dataflow.
12    timely::execute_from_args(std::env::args(), |worker| {
13        // create a new input, exchange data, and inspect its output
14        let index = worker.index();
15        let mut input = InputHandle::new();
16        let probe = ProbeHandle::new();
17        worker.dataflow(|scope| {
18            scope.input_from(&mut input)
19                 .container::<Vec<_>>()
20                 //.exchange(|x| *x) // <-- cannot exchange this; Rc is not Send.
21                 .inspect(move |x| println!("worker {}:\thello {:?}", index, x))
22                 .probe_with(&probe);
23        });
24
25        // introduce data and watch!
26        for round in 0..10 {
27            input.send(Test { _field: Rc::new(round) } );
28            input.advance_to(round + 1);
29            worker.step_while(|| probe.less_than(input.time()));
30        }
31    }).unwrap();
32}
Source

pub fn dataflow_named<T, R, F>(&mut self, name: &str, func: F) -> R
where T: Refines<()>, F: FnOnce(&mut Child<'_, Self, T>) -> R,

Construct a new dataflow with a (purely cosmetic) name.

§Examples
timely::execute_from_args(::std::env::args(), |worker| {

    // We must supply the timestamp type here, although
    // it would generally be determined by type inference.
    worker.dataflow_named::<usize,_,_>("Some Dataflow", |scope| {

        // uses of `scope` to build dataflow

    });
});
Source

pub fn dataflow_core<T, R, F, V>( &mut self, name: &str, logging: Option<TimelyLogger>, resources: V, func: F, ) -> R
where T: Refines<()>, F: FnOnce(&mut V, &mut Child<'_, Self, T>) -> R, V: Any + 'static,

Construct a new dataflow with specific configurations.

This method constructs a new dataflow, using a name, logger, and additional resources specified as arguments. The name is cosmetic, the logger is used to handle events generated by the dataflow, and the additional resources are kept alive for as long as the dataflow is alive (use case: shared library bindings).

§Examples
timely::execute_from_args(::std::env::args(), |worker| {

    // We must supply the timestamp type here, although
    // it would generally be determined by type inference.
    worker.dataflow_core::<usize,_,_,_>(
        "dataflow X",           // Dataflow name
        None,                   // Optional logger
        37,                     // Any resources
        |resources, scope| {    // Closure

            // uses of `resources`, `scope` to build dataflow

        }
    );
});
Source

pub fn drop_dataflow(&mut self, dataflow_identifier: usize)

Drops an identified dataflow.

This method removes the identified dataflow, which will no longer be scheduled. Various other resources will be cleaned up, though the method is currently in public beta and is not yet guaranteed to work. Please report all crashes and unmet expectations!

Source

pub fn next_dataflow_index(&self) -> usize

Returns the next index to be used for dataflow construction.

This identifier will appear in the address of contained operators, and can be used to drop the dataflow using self.drop_dataflow().

Source

pub fn installed_dataflows(&self) -> Vec<usize>

List the current dataflow indices.

Source

pub fn has_dataflows(&self) -> bool

Returns true if there is at least one dataflow under management.

Trait Implementations§

Source§

impl<A: Allocate> AsWorker for Worker<A>

Source§

fn config(&self) -> &Config

Returns the worker configuration parameters.
Source§

fn index(&self) -> usize

Index of the worker among its peers.
Source§

fn peers(&self) -> usize

Number of peer workers.
Source§

fn allocate<D: Exchangeable>( &mut self, identifier: usize, address: Rc<[usize]>, ) -> (Vec<Box<dyn Push<D>>>, Box<dyn Pull<D>>)

Allocates a new channel from a supplied identifier and address. Read more
Source§

fn pipeline<T: 'static>( &mut self, identifier: usize, address: Rc<[usize]>, ) -> (ThreadPusher<T>, ThreadPuller<T>)

Constructs a pipeline channel from the worker to itself. Read more
Source§

fn broadcast<T: Exchangeable + Clone>( &mut self, identifier: usize, address: Rc<[usize]>, ) -> (Box<dyn Push<T>>, Box<dyn Pull<T>>)

Allocates a broadcast channel, where each pushed message is received by all.
Source§

fn new_identifier(&mut self) -> usize

Allocates a new worker-unique identifier.
Source§

fn peek_identifier(&self) -> usize

The next worker-unique identifier to be allocated.
Source§

fn log_register(&self) -> Option<RefMut<'_, Registry>>

Provides access to named logging streams.
Source§

fn logger_for<CB: ContainerBuilder>(&self, name: &str) -> Option<Logger<CB>>

Acquires a logger by name, if the log register exists and the name is registered. Read more
Source§

fn logging(&self) -> Option<TimelyLogger>

Provides access to the timely logging stream.
Source§

impl<A: Allocate> Clone for Worker<A>

Source§

fn clone(&self) -> Self

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl<A: Allocate> Scheduler for Worker<A>

Source§

fn activations(&self) -> Rc<RefCell<Activations>>

Provides a shared handle to the activation scheduler.
Source§

fn activator_for(&self, path: Rc<[usize]>) -> Activator

Constructs an Activator tied to the specified operator address.
Source§

fn sync_activator_for(&self, path: Vec<usize>) -> SyncActivator

Constructs a SyncActivator tied to the specified operator address.
Source§

impl<A: Allocate> ScopeParent for Worker<A>

Source§

type Timestamp = ()

The timestamp associated with data in this scope.

Auto Trait Implementations§

§

impl<A> Freeze for Worker<A>

§

impl<A> !RefUnwindSafe for Worker<A>

§

impl<A> !Send for Worker<A>

§

impl<A> !Sync for Worker<A>

§

impl<A> Unpin for Worker<A>

§

impl<A> UnsafeUnpin for Worker<A>

§

impl<A> !UnwindSafe for Worker<A>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.