Skip to main content

Handle

Struct Handle 

Source
pub struct Handle<T: Timestamp, CB: ContainerBuilder<Container: Clone>> { /* private fields */ }
Expand description

A handle to an input Stream, used to introduce data to a timely dataflow computation.

Implementations§

Source§

impl<T: Timestamp, C: Container + Clone> Handle<T, CapacityContainerBuilder<C>>

Source

pub fn new() -> Self

Allocates a new input handle, from which one can create timely streams.

§Examples
use timely::*;
use timely::dataflow::InputHandle;
use timely::dataflow::operators::{Input, Inspect};

// construct and execute a timely dataflow
timely::execute(Config::thread(), |worker| {

    // add an input and base computation off of it
    let mut input = InputHandle::new();
    worker.dataflow(|scope| {
        scope.input_from(&mut input)
             .container::<Vec<_>>()
             .inspect(|x| println!("hello {:?}", x));
    });

    // introduce input, advance computation
    for round in 0..10 {
        input.send(round);
        input.advance_to(round + 1);
        worker.step();
    }
});
Examples found in repository?
examples/threadless.rs (line 12)
5fn main() {
6
7    // create a naked single-threaded worker.
8    let allocator = timely::communication::allocator::Thread::default();
9    let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator, None);
10
11    // create input and probe handles.
12    let mut input = InputHandle::new();
13    let probe = ProbeHandle::new();
14
15    // directly build a dataflow.
16    worker.dataflow(|scope| {
17        input
18            .to_stream(scope)
19            .container::<Vec<_>>()
20            .inspect(|x| println!("{:?}", x))
21            .probe_with(&probe);
22    });
23
24    // manage inputs.
25    for i in 0 .. 10 {
26        input.send(i);
27        input.advance_to(i);
28        while probe.less_than(input.time()) {
29            worker.step();
30        }
31    }
32}
More examples
Hide additional examples
examples/rc.rs (line 15)
10fn main() {
11    // initializes and runs a timely dataflow.
12    timely::execute_from_args(std::env::args(), |worker| {
13        // create a new input, exchange data, and inspect its output
14        let index = worker.index();
15        let mut input = InputHandle::new();
16        let probe = ProbeHandle::new();
17        worker.dataflow(|scope| {
18            scope.input_from(&mut input)
19                 .container::<Vec<_>>()
20                 //.exchange(|x| *x) // <-- cannot exchange this; Rc is not Send.
21                 .inspect(move |x| println!("worker {}:\thello {:?}", index, x))
22                 .probe_with(&probe);
23        });
24
25        // introduce data and watch!
26        for round in 0..10 {
27            input.send(Test { _field: Rc::new(round) } );
28            input.advance_to(round + 1);
29            worker.step_while(|| probe.less_than(input.time()));
30        }
31    }).unwrap();
32}
examples/hello.rs (line 9)
4fn main() {
5    // initializes and runs a timely dataflow.
6    timely::execute_from_args(std::env::args(), |worker| {
7
8        let index = worker.index();
9        let mut input = InputHandle::new();
10        let probe = ProbeHandle::new();
11
12        // create a new input, exchange data, and inspect its output
13        worker.dataflow(|scope| {
14            scope.input_from(&mut input)
15                 .container::<Vec<_>>()
16                 .exchange(|x| *x)
17                 .inspect(move |x| println!("worker {}:\thello {}", index, x))
18                 .probe_with(&probe);
19        });
20
21        // introduce data and watch!
22        for round in 0..10 {
23            if index == 0 {
24                input.send(round);
25            }
26            input.advance_to(round + 1);
27            while probe.less_than(input.time()) {
28                worker.step();
29            }
30        }
31    }).unwrap();
32}
examples/exchange.rs (line 10)
4fn main() {
5    // initializes and runs a timely dataflow.
6    timely::execute_from_args(std::env::args(), |worker| {
7
8        let batch = std::env::args().nth(1).unwrap().parse::<usize>().unwrap();
9        let rounds = std::env::args().nth(2).unwrap().parse::<usize>().unwrap();
10        let mut input = InputHandle::new();
11
12        // create a new input, exchange data, and inspect its output
13        let probe = worker.dataflow(|scope|
14            scope
15                .input_from(&mut input)
16                .container::<Vec<_>>()
17                .exchange(|&x| x as u64)
18                .probe()
19                .0
20        );
21
22
23        let timer = std::time::Instant::now();
24
25        for round in 0 .. rounds {
26
27            for i in 0 .. batch {
28                input.send(i);
29            }
30            input.advance_to(round);
31
32            while probe.less_than(input.time()) {
33                worker.step();
34            }
35
36        }
37
38        let volume = (rounds * batch) as f64;
39        let elapsed = timer.elapsed();
40        let seconds = elapsed.as_secs() as f64 + (f64::from(elapsed.subsec_nanos())/1000000000.0);
41
42        println!("{:?}\tworker {} complete; rate: {:?}", timer.elapsed(), worker.index(), volume / seconds);
43
44    }).unwrap();
45}
examples/unionfind.rs (line 20)
8fn main() {
9
10    // command-line args: numbers of nodes and edges in the random graph.
11    let nodes: usize = std::env::args().nth(1).unwrap().parse().unwrap();
12    let edges: usize = std::env::args().nth(2).unwrap().parse().unwrap();
13    let batch: usize = std::env::args().nth(3).unwrap().parse().unwrap();
14
15    timely::execute_from_args(std::env::args().skip(4), move |worker| {
16
17        let index = worker.index();
18        let peers = worker.peers();
19
20        let mut input = InputHandle::new();
21        let probe = ProbeHandle::new();
22
23        worker.dataflow(|scope| {
24            scope.input_from(&mut input)
25                //  .exchange(move |x: &(usize, usize)| (x.0 % (peers - 1)) as u64 + 1)
26                 .union_find()
27                 .exchange(|_| 0)
28                 .union_find()
29                 .probe_with(&probe);
30        });
31
32        // Generate roughly random data.
33        use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
34        let hasher = BuildHasherDefault::<DefaultHasher>::new();
35        let insert = (0..).map(move |i| (hasher.hash_one(&(i,index,0)) as usize % nodes,
36                                         hasher.hash_one(&(i,index,1)) as usize % nodes));
37
38        for (edge, arc) in insert.take(edges / peers).enumerate() {
39            input.send(arc);
40            if edge % batch == (batch - 1) {
41                let next = input.epoch() + 1;
42                input.advance_to(next);
43                while probe.less_than(input.time()) {
44                    worker.step();
45                }
46            }
47        }
48
49    }).unwrap(); // asserts error-free execution;
50}
examples/distinct.rs (line 12)
8fn main() {
9    // initializes and runs a timely dataflow.
10    timely::execute_from_args(std::env::args(), |worker| {
11        let index = worker.index();
12        let mut input = InputHandle::new();
13        let probe = ProbeHandle::new();
14
15        // create a new input, exchange data, and inspect its output
16        worker.dataflow::<usize,_,_>(|scope| {
17            let mut counts_by_time = HashMap::new();
18            scope.input_from(&mut input)
19                .unary(Exchange::new(|x| *x), "Distinct", move |_, _|
20                    move |input, output| {
21                        input.for_each_time(|time, data| {
22                            let counts =
23                            counts_by_time
24                                .entry(*time.time())
25                                .or_insert(HashMap::new());
26                            let mut session = output.session(&time);
27                            for data in data {
28                                for &datum in data.iter() {
29                                    let count = counts.entry(datum).or_insert(0);
30                                    if *count == 0 {
31                                        session.give(datum);
32                                    }
33                                    *count += 1;
34                                }
35                            }
36                        })
37                    })
38                .container::<Vec<_>>()
39                .inspect(move |x| println!("worker {}:\tvalue {}", index, x))
40                .probe_with(&probe);
41        });
42
43        // introduce data and watch!
44        for round in 0..1 {
45            if index == 0 {
46                [0, 1, 2, 2, 2, 3, 3, 4].iter().for_each(|x| input.send(*x));
47            } else if index == 1 {
48                [0, 0, 3, 4, 4, 5, 7, 7].iter().for_each(|x| input.send(*x));
49            }
50            input.advance_to(round + 1);
51            while probe.less_than(input.time()) {
52                worker.step();
53            }
54        }
55    }).unwrap();
56}
Source§

impl<T: Timestamp, CB: ContainerBuilder<Container: Clone>> Handle<T, CB>

Source

pub fn new_with_builder() -> Self

Allocates a new input handle, from which one can create timely streams.

§Examples
use timely::*;
use timely::dataflow::InputHandle;
use timely::dataflow::operators::{Input, Inspect};
use timely_container::CapacityContainerBuilder;

// construct and execute a timely dataflow
timely::execute(Config::thread(), |worker| {

    // add an input and base computation off of it
    let mut input = InputHandle::<_, CapacityContainerBuilder<_>>::new_with_builder();
    worker.dataflow(|scope| {
        scope.input_from(&mut input)
             .container::<Vec<_>>()
             .inspect(|x| println!("hello {:?}", x));
    });

    // introduce input, advance computation
    for round in 0..10 {
        input.send(round);
        input.advance_to(round + 1);
        worker.step();
    }
});
Source

pub fn to_stream<G>(&mut self, scope: &mut G) -> Stream<G, CB::Container>
where T: TotalOrder, G: Scope<Timestamp = T>,

Creates an input stream from the handle in the supplied scope.

§Examples
use timely::*;
use timely::dataflow::InputHandle;
use timely::dataflow::operators::{Input, Inspect};

// construct and execute a timely dataflow
timely::execute(Config::thread(), |worker| {

    // add an input and base computation off of it
    let mut input = InputHandle::new();
    worker.dataflow(|scope| {
        input.to_stream(scope)
             .container::<Vec<_>>()
             .inspect(|x| println!("hello {:?}", x));
    });

    // introduce input, advance computation
    for round in 0..10 {
        input.send(round);
        input.advance_to(round + 1);
        worker.step();
    }
});
Examples found in repository?
examples/threadless.rs (line 18)
5fn main() {
6
7    // create a naked single-threaded worker.
8    let allocator = timely::communication::allocator::Thread::default();
9    let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator, None);
10
11    // create input and probe handles.
12    let mut input = InputHandle::new();
13    let probe = ProbeHandle::new();
14
15    // directly build a dataflow.
16    worker.dataflow(|scope| {
17        input
18            .to_stream(scope)
19            .container::<Vec<_>>()
20            .inspect(|x| println!("{:?}", x))
21            .probe_with(&probe);
22    });
23
24    // manage inputs.
25    for i in 0 .. 10 {
26        input.send(i);
27        input.advance_to(i);
28        while probe.less_than(input.time()) {
29            worker.step();
30        }
31    }
32}
More examples
Hide additional examples
examples/wordcount.rs (line 20)
8fn main() {
9    // initializes and runs a timely dataflow.
10    timely::execute_from_args(std::env::args(), |worker| {
11
12        let mut input = InputHandle::new();
13        let probe = ProbeHandle::new();
14
15        // define a distribution function for strings.
16        let exchange = Exchange::new(|x: &(String, i64)| (x.0).len() as u64);
17
18        // create a new input, exchange data, and inspect its output
19        worker.dataflow::<usize,_,_>(|scope| {
20            input.to_stream(scope)
21                 .flat_map(|(text, diff): (String, i64)|
22                    text.split_whitespace()
23                        .map(move |word| (word.to_owned(), diff))
24                        .collect::<Vec<_>>()
25                 )
26                 .unary_frontier(exchange, "WordCount", |_capability, _info| {
27
28                    let mut queues = HashMap::new();
29                    let mut counts = HashMap::new();
30
31                    move |(input, frontier), output| {
32                        input.for_each_time(|time, data| {
33                            queues.entry(time.retain(output.output_index()))
34                                  .or_insert(Vec::new())
35                                  .extend(data.map(std::mem::take));
36                        });
37
38                        for (key, val) in queues.iter_mut() {
39                            if !frontier.less_equal(key.time()) {
40                                let mut session = output.session(key);
41                                for mut batch in val.drain(..) {
42                                    for (word, diff) in batch.drain(..) {
43                                        let entry = counts.entry(word.clone()).or_insert(0i64);
44                                        *entry += diff;
45                                        session.give((word, *entry));
46                                    }
47                                }
48                            }
49                        }
50
51                        queues.retain(|_key, val| !val.is_empty());
52                    }})
53                 .container::<Vec<_>>()
54                 .inspect(|x| println!("seen: {:?}", x))
55                 .probe_with(&probe);
56        });
57
58        // introduce data and watch!
59        for round in 0..10 {
60            input.send(("round".to_owned(), 1));
61            input.advance_to(round + 1);
62            while probe.less_than(input.time()) {
63                worker.step();
64            }
65        }
66    }).unwrap();
67}
examples/columnar.rs (line 41)
21fn main() {
22
23    type InnerContainer = <WordCount as columnar::Columnar>::Container;
24    type Container = Column<InnerContainer>;
25
26    use columnar::Len;
27
28    let config = timely::Config {
29        communication: timely::CommunicationConfig::ProcessBinary(3),
30        worker: timely::WorkerConfig::default(),
31    };
32
33    // initializes and runs a timely dataflow.
34    timely::execute(config, |worker| {
35        let mut input = <InputHandle<_, CapacityContainerBuilder<Container>>>::new();
36        let probe = ProbeHandle::new();
37
38        // create a new input, exchange data, and inspect its output
39        worker.dataflow::<usize, _, _>(|scope| {
40            input
41                .to_stream(scope)
42                .unary(
43                    Pipeline,
44                    "Split",
45                    |_cap, _info| {
46                        move |input, output| {
47                            input.for_each_time(|time, data| {
48                                let mut session = output.session(&time);
49                                for data in data {
50                                    for wordcount in data.borrow().into_index_iter().flat_map(|wordcount| {
51                                        wordcount.text.split_whitespace().map(move |text| WordCountReference { text, diff: wordcount.diff })
52                                    }) {
53                                        session.give(wordcount);
54                                    }
55                                }
56                            });
57                        }
58                    },
59                )
60                .container::<Container>()
61                .unary_frontier(
62                    ExchangeCore::<ColumnBuilder<InnerContainer>,_>::new_core(|x: &WordCountReference<&str,&i64>| x.text.len() as u64),
63                    "WordCount",
64                    |_capability, _info| {
65                        let mut queues = HashMap::new();
66                        let mut counts = HashMap::new();
67
68                        move |(input, frontier), output| {
69                            input.for_each_time(|time, data| {
70                                queues
71                                    .entry(time.retain(output.output_index()))
72                                    .or_insert(Vec::new())
73                                    .extend(data.map(std::mem::take));
74
75                            });
76
77                            for (key, val) in queues.iter_mut() {
78                                if !frontier.less_equal(key.time()) {
79                                    let mut session = output.session(key);
80                                    for batch in val.drain(..) {
81                                        for wordcount in batch.borrow().into_index_iter() {
82                                            let total =
83                                            if let Some(count) = counts.get_mut(wordcount.text) {
84                                                *count += wordcount.diff;
85                                                *count
86                                            }
87                                            else {
88                                                counts.insert(wordcount.text.to_string(), *wordcount.diff);
89                                                *wordcount.diff
90                                            };
91                                            session.give(WordCountReference { text: wordcount.text, diff: total });
92                                        }
93                                    }
94                                }
95                            }
96
97                            queues.retain(|_key, val| !val.is_empty());
98                        }
99                    },
100                )
101                .container::<Container>()
102                .inspect_container(|x| {
103                    match x {
104                        Ok((time, data)) => {
105                            println!("seen at: {:?}\t{:?} records", time, data.record_count());
106                            for wc in data.borrow().into_index_iter() {
107                                println!("  {}: {}", wc.text, wc.diff);
108                            }
109                        },
110                        Err(frontier) => println!("frontier advanced to {:?}", frontier),
111                    }
112                })
113                .probe_with(&probe);
114        });
115
116        // introduce data and watch!
117        for round in 0..10 {
118            input.send(WordCountReference { text: "flat container", diff: 1 });
119            input.advance_to(round + 1);
120            while probe.less_than(input.time()) {
121                worker.step();
122            }
123        }
124    })
125    .unwrap();
126}
examples/pagerank.rs (line 18)
8fn main() {
9
10    timely::execute_from_args(std::env::args().skip(3), move |worker| {
11
12        let mut input = InputHandle::new();
13        let probe = ProbeHandle::new();
14
15        worker.dataflow::<usize,_,_>(|scope| {
16
17            // create a new input, into which we can push edge changes.
18            let edge_stream = input.to_stream(scope);
19
20            // create a new feedback stream, which will be changes to ranks.
21            let (handle, rank_stream) = scope.feedback(1);
22
23            // bring edges and ranks together!
24            let changes = edge_stream.binary_frontier(
25                rank_stream,
26                Exchange::new(|x: &((usize, usize), i64)| (x.0).0 as u64),
27                Exchange::new(|x: &(usize, i64)| x.0 as u64),
28                "PageRank",
29                |_capability, _info| {
30
31                    // where we stash out-of-order data.
32                    let mut edge_stash: HashMap<_, Vec<_>> = HashMap::new();
33                    let mut rank_stash: HashMap<_, Vec<_>> = HashMap::new();
34
35                    // lists of edges, ranks, and changes.
36                    let mut edges = Vec::new();
37                    let mut ranks = Vec::new();
38                    let mut diffs = Vec::new(); // for received but un-acted upon deltas.
39                    let mut delta = Vec::new();
40
41                    let timer = ::std::time::Instant::now();
42
43                    move |(input1, frontier1), (input2, frontier2), output| {
44
45                        // hold on to edge changes until it is time.
46                        input1.for_each_time(|time, data| {
47                            let entry = edge_stash.entry(time.retain(output.output_index())).or_default();
48                            data.for_each(|data| entry.append(data));
49                        });
50
51                        // hold on to rank changes until it is time.
52                        input2.for_each_time(|time, data| {
53                            let entry = rank_stash.entry(time.retain(output.output_index())).or_default();
54                            data.for_each(|data| entry.append(data));
55                        });
56
57                        let frontiers = &[frontier1, frontier2];
58
59                        for (time, edge_changes) in edge_stash.iter_mut() {
60                            if frontiers.iter().all(|f| !f.less_equal(time)) {
61
62                                let mut session = output.session(time);
63
64                                compact(edge_changes);
65
66                                for ((src, dst), diff) in edge_changes.drain(..) {
67
68                                    // 0. ensure enough state allocated
69                                    while edges.len() <= src { edges.push(Vec::new()); }
70                                    while ranks.len() <= src { ranks.push(1_000); }
71                                    while diffs.len() <= src { diffs.push(0); }
72
73                                    // 1. subtract previous distribution.
74                                    allocate(ranks[src], &edges[src][..], &mut delta);
75                                    for x in delta.iter_mut() { x.1 *= -1; }
76
77                                    // 2. update edges.
78                                    edges[src].push((dst, diff));
79                                    compact(&mut edges[src]);
80
81                                    // 3. re-distribute allocations.
82                                    allocate(ranks[src], &edges[src][..], &mut delta);
83
84                                    // 4. compact down and send cumulative changes.
85                                    compact(&mut delta);
86                                    for (dst, diff) in delta.drain(..) {
87                                        session.give((dst, diff));
88                                    }
89                                }
90                            }
91                        }
92
93                        edge_stash.retain(|_key, val| !val.is_empty());
94
95                        for (time, rank_changes) in rank_stash.iter_mut() {
96                            if frontiers.iter().all(|f| !f.less_equal(time)) {
97
98                                let mut session = output.session(time);
99
100                                compact(rank_changes);
101
102                                let mut cnt = 0;
103                                let mut sum = 0;
104                                let mut max = 0;
105
106                                for (src, diff) in rank_changes.drain(..) {
107
108                                    cnt += 1;
109                                    sum += diff.abs();
110                                    max = if max < diff.abs() { diff.abs() } else { max };
111
112                                    // 0. ensure enough state allocated
113                                    while edges.len() <= src { edges.push(Vec::new()); }
114                                    while ranks.len() <= src { ranks.push(1_000); }
115                                    while diffs.len() <= src { diffs.push(0); }
116
117                                    // 1. subtract previous distribution.
118                                    allocate(ranks[src], &edges[src][..], &mut delta);
119                                    for x in delta.iter_mut() { x.1 *= -1; }
120
121                                    // 2. update ranks.
122                                    diffs[src] += diff;
123                                    if diffs[src].abs() >= 6 {
124                                        ranks[src] += diffs[src];
125                                        diffs[src] = 0;
126                                    }
127
128                                    // 3. re-distribute allocations.
129                                    allocate(ranks[src], &edges[src][..], &mut delta);
130
131                                    // 4. compact down and send cumulative changes.
132                                    compact(&mut delta);
133                                    for (dst, diff) in delta.drain(..) {
134                                        session.give((dst, diff));
135                                    }
136                                }
137
138                                println!("{:?}:\t{:?}\t{}\t{}\t{}", timer.elapsed(), time.time(), cnt, sum, max);
139                            }
140                        }
141
142                        rank_stash.retain(|_key, val| !val.is_empty());
143
144                    }
145                }
146            );
147
148            changes
149                .probe_with(&probe)
150                .connect_loop(handle);
151
152        });
153
154        let nodes: usize = std::env::args().nth(1).unwrap().parse().unwrap();
155        let edges: usize = std::env::args().nth(2).unwrap().parse().unwrap();
156
157        let index = worker.index();
158        // Generate roughly random data.
159        use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
160        let hasher = BuildHasherDefault::<DefaultHasher>::new();
161        let mut insert = (0..).map(move |i| (hasher.hash_one(&(i,index,0)) as usize % nodes,
162                                             hasher.hash_one(&(i,index,1)) as usize % nodes));
163        let remove = insert.clone();
164
165        for ins in (&mut insert).take(edges / worker.peers()) {
166            input.send((ins, 1));
167        }
168
169        input.advance_to(1);
170
171        while probe.less_than(input.time()) {
172            worker.step();
173        }
174
175        for (ins, del) in insert.zip(remove).take(1000) {
176            input.send((ins, 1));
177            input.send((del,-1));
178            input.advance_to(input.time() + 1);
179            while probe.less_than(input.time()) {
180                worker.step();
181            }
182        }
183
184    }).unwrap(); // asserts error-free execution;
185}
Source

pub fn flush(&mut self)

Flush all contents and distribute to downstream operators.

Source

pub fn send_batch(&mut self, buffer: &mut CB::Container)

Sends a batch of records into the corresponding timely dataflow Stream, at the current epoch.

This method flushes single elements previously sent with send, to keep the insertion order.

§Examples
use timely::*;
use timely::dataflow::InputHandle;
use timely::dataflow::operators::{Input, InspectCore};

// construct and execute a timely dataflow
timely::execute(Config::thread(), |worker| {

    // add an input and base computation off of it
    let mut input = InputHandle::new();
    worker.dataflow(|scope| {
        scope.input_from(&mut input)
             .inspect_container(|x| println!("hello {:?}", x));
    });

    // introduce input, advance computation
    for round in 0..10 {
        input.send_batch(&mut vec![format!("{}", round)]);
        input.advance_to(round + 1);
        worker.step();
    }
});
Source

pub fn advance_to(&mut self, next: T)

Advances the current epoch to next.

This method allows timely dataflow to issue progress notifications as it can now determine that this input can no longer produce data at earlier timestamps.

Examples found in repository?
examples/threadless.rs (line 27)
5fn main() {
6
7    // create a naked single-threaded worker.
8    let allocator = timely::communication::allocator::Thread::default();
9    let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator, None);
10
11    // create input and probe handles.
12    let mut input = InputHandle::new();
13    let probe = ProbeHandle::new();
14
15    // directly build a dataflow.
16    worker.dataflow(|scope| {
17        input
18            .to_stream(scope)
19            .container::<Vec<_>>()
20            .inspect(|x| println!("{:?}", x))
21            .probe_with(&probe);
22    });
23
24    // manage inputs.
25    for i in 0 .. 10 {
26        input.send(i);
27        input.advance_to(i);
28        while probe.less_than(input.time()) {
29            worker.step();
30        }
31    }
32}
More examples
Hide additional examples
examples/rc.rs (line 28)
10fn main() {
11    // initializes and runs a timely dataflow.
12    timely::execute_from_args(std::env::args(), |worker| {
13        // create a new input, exchange data, and inspect its output
14        let index = worker.index();
15        let mut input = InputHandle::new();
16        let probe = ProbeHandle::new();
17        worker.dataflow(|scope| {
18            scope.input_from(&mut input)
19                 .container::<Vec<_>>()
20                 //.exchange(|x| *x) // <-- cannot exchange this; Rc is not Send.
21                 .inspect(move |x| println!("worker {}:\thello {:?}", index, x))
22                 .probe_with(&probe);
23        });
24
25        // introduce data and watch!
26        for round in 0..10 {
27            input.send(Test { _field: Rc::new(round) } );
28            input.advance_to(round + 1);
29            worker.step_while(|| probe.less_than(input.time()));
30        }
31    }).unwrap();
32}
examples/hello.rs (line 26)
4fn main() {
5    // initializes and runs a timely dataflow.
6    timely::execute_from_args(std::env::args(), |worker| {
7
8        let index = worker.index();
9        let mut input = InputHandle::new();
10        let probe = ProbeHandle::new();
11
12        // create a new input, exchange data, and inspect its output
13        worker.dataflow(|scope| {
14            scope.input_from(&mut input)
15                 .container::<Vec<_>>()
16                 .exchange(|x| *x)
17                 .inspect(move |x| println!("worker {}:\thello {}", index, x))
18                 .probe_with(&probe);
19        });
20
21        // introduce data and watch!
22        for round in 0..10 {
23            if index == 0 {
24                input.send(round);
25            }
26            input.advance_to(round + 1);
27            while probe.less_than(input.time()) {
28                worker.step();
29            }
30        }
31    }).unwrap();
32}
examples/exchange.rs (line 30)
4fn main() {
5    // initializes and runs a timely dataflow.
6    timely::execute_from_args(std::env::args(), |worker| {
7
8        let batch = std::env::args().nth(1).unwrap().parse::<usize>().unwrap();
9        let rounds = std::env::args().nth(2).unwrap().parse::<usize>().unwrap();
10        let mut input = InputHandle::new();
11
12        // create a new input, exchange data, and inspect its output
13        let probe = worker.dataflow(|scope|
14            scope
15                .input_from(&mut input)
16                .container::<Vec<_>>()
17                .exchange(|&x| x as u64)
18                .probe()
19                .0
20        );
21
22
23        let timer = std::time::Instant::now();
24
25        for round in 0 .. rounds {
26
27            for i in 0 .. batch {
28                input.send(i);
29            }
30            input.advance_to(round);
31
32            while probe.less_than(input.time()) {
33                worker.step();
34            }
35
36        }
37
38        let volume = (rounds * batch) as f64;
39        let elapsed = timer.elapsed();
40        let seconds = elapsed.as_secs() as f64 + (f64::from(elapsed.subsec_nanos())/1000000000.0);
41
42        println!("{:?}\tworker {} complete; rate: {:?}", timer.elapsed(), worker.index(), volume / seconds);
43
44    }).unwrap();
45}
examples/event_driven.rs (line 40)
4fn main() {
5    // initializes and runs a timely dataflow.
6    timely::execute_from_args(std::env::args(), |worker| {
7
8        let timer = std::time::Instant::now();
9
10        let mut args = std::env::args();
11        args.next();
12
13        let dataflows = args.next().unwrap().parse::<usize>().unwrap();
14        let length = args.next().unwrap().parse::<usize>().unwrap();
15        let record = args.next() == Some("record".to_string());
16
17        let mut inputs = Vec::new();
18        let mut probes = Vec::new();
19
20        // create a new input, exchange data, and inspect its output
21        for _dataflow in 0 .. dataflows {
22            worker.dataflow(|scope| {
23                let (input, mut stream) = scope.new_input();
24                for _step in 0 .. length {
25                    stream = stream.map(|x: ()| x);
26                }
27                let (probe, _stream) = stream.probe();
28                inputs.push(input);
29                probes.push(probe);
30            });
31        }
32
33        println!("{:?}\tdataflows built ({} x {})", timer.elapsed(), dataflows, length);
34
35        for round in 0 .. {
36            let dataflow = round % dataflows;
37            if record {
38                inputs[dataflow].send(());
39            }
40            inputs[dataflow].advance_to(round);
41            let mut steps = 0;
42            while probes[dataflow].less_than(&round) {
43                worker.step();
44                steps += 1;
45            }
46            println!("{:?}\tround {} complete in {} steps", timer.elapsed(), round, steps);
47        }
48
49    }).unwrap();
50}
examples/unionfind.rs (line 42)
8fn main() {
9
10    // command-line args: numbers of nodes and edges in the random graph.
11    let nodes: usize = std::env::args().nth(1).unwrap().parse().unwrap();
12    let edges: usize = std::env::args().nth(2).unwrap().parse().unwrap();
13    let batch: usize = std::env::args().nth(3).unwrap().parse().unwrap();
14
15    timely::execute_from_args(std::env::args().skip(4), move |worker| {
16
17        let index = worker.index();
18        let peers = worker.peers();
19
20        let mut input = InputHandle::new();
21        let probe = ProbeHandle::new();
22
23        worker.dataflow(|scope| {
24            scope.input_from(&mut input)
25                //  .exchange(move |x: &(usize, usize)| (x.0 % (peers - 1)) as u64 + 1)
26                 .union_find()
27                 .exchange(|_| 0)
28                 .union_find()
29                 .probe_with(&probe);
30        });
31
32        // Generate roughly random data.
33        use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
34        let hasher = BuildHasherDefault::<DefaultHasher>::new();
35        let insert = (0..).map(move |i| (hasher.hash_one(&(i,index,0)) as usize % nodes,
36                                         hasher.hash_one(&(i,index,1)) as usize % nodes));
37
38        for (edge, arc) in insert.take(edges / peers).enumerate() {
39            input.send(arc);
40            if edge % batch == (batch - 1) {
41                let next = input.epoch() + 1;
42                input.advance_to(next);
43                while probe.less_than(input.time()) {
44                    worker.step();
45                }
46            }
47        }
48
49    }).unwrap(); // asserts error-free execution;
50}
Source

pub fn close(self)

Closes the input.

This method allows timely dataflow to issue all progress notifications blocked by this input and to begin to shut down operators, as this input can no longer produce data.

Source

pub fn epoch(&self) -> &T

Reports the current epoch.

Examples found in repository?
examples/unionfind.rs (line 41)
8fn main() {
9
10    // command-line args: numbers of nodes and edges in the random graph.
11    let nodes: usize = std::env::args().nth(1).unwrap().parse().unwrap();
12    let edges: usize = std::env::args().nth(2).unwrap().parse().unwrap();
13    let batch: usize = std::env::args().nth(3).unwrap().parse().unwrap();
14
15    timely::execute_from_args(std::env::args().skip(4), move |worker| {
16
17        let index = worker.index();
18        let peers = worker.peers();
19
20        let mut input = InputHandle::new();
21        let probe = ProbeHandle::new();
22
23        worker.dataflow(|scope| {
24            scope.input_from(&mut input)
25                //  .exchange(move |x: &(usize, usize)| (x.0 % (peers - 1)) as u64 + 1)
26                 .union_find()
27                 .exchange(|_| 0)
28                 .union_find()
29                 .probe_with(&probe);
30        });
31
32        // Generate roughly random data.
33        use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
34        let hasher = BuildHasherDefault::<DefaultHasher>::new();
35        let insert = (0..).map(move |i| (hasher.hash_one(&(i,index,0)) as usize % nodes,
36                                         hasher.hash_one(&(i,index,1)) as usize % nodes));
37
38        for (edge, arc) in insert.take(edges / peers).enumerate() {
39            input.send(arc);
40            if edge % batch == (batch - 1) {
41                let next = input.epoch() + 1;
42                input.advance_to(next);
43                while probe.less_than(input.time()) {
44                    worker.step();
45                }
46            }
47        }
48
49    }).unwrap(); // asserts error-free execution;
50}
More examples
Hide additional examples
examples/hashjoin.rs (line 95)
8fn main() {
9
10    // command-line args: numbers of nodes and edges in the random graph.
11    let keys: u64 = std::env::args().nth(1).unwrap().parse().unwrap();
12    let vals: usize = std::env::args().nth(2).unwrap().parse().unwrap();
13    let batch: usize = std::env::args().nth(3).unwrap().parse().unwrap();
14
15    timely::execute_from_args(std::env::args().skip(4), move |worker| {
16
17        let index = worker.index();
18        let peers = worker.peers();
19
20        let mut input1 = InputHandle::new();
21        let mut input2 = InputHandle::new();
22        let probe = ProbeHandle::new();
23
24        worker.dataflow(|scope| {
25
26            let stream1 = scope.input_from(&mut input1);
27            let stream2 = scope.input_from(&mut input2);
28
29            let exchange1 = Exchange::new(|x: &(u64, u64)| x.0);
30            let exchange2 = Exchange::new(|x: &(u64, u64)| x.0);
31
32            stream1
33                .binary(stream2, exchange1, exchange2, "HashJoin", |_capability, _info| {
34
35                    let mut map1 = HashMap::<u64, Vec<u64>>::new();
36                    let mut map2 = HashMap::<u64, Vec<u64>>::new();
37
38                    move |input1, input2, output| {
39
40                        // Drain first input, check second map, update first map.
41                        input1.for_each_time(|time, data| {
42                            let mut session = output.session(&time);
43                            for (key, val1) in data.flat_map(|d| d.drain(..)) {
44                                if let Some(values) = map2.get(&key) {
45                                    for val2 in values.iter() {
46                                        session.give((val1, *val2));
47                                    }
48                                }
49
50                                map1.entry(key).or_default().push(val1);
51                            }
52                        });
53
54                        // Drain second input, check first map, update second map.
55                        input2.for_each_time(|time, data| {
56                            let mut session = output.session(&time);
57                            for (key, val2) in data.flat_map(|d| d.drain(..)) {
58                                if let Some(values) = map1.get(&key) {
59                                    for val1 in values.iter() {
60                                        session.give((*val1, val2));
61                                    }
62                                }
63
64                                map2.entry(key).or_default().push(val2);
65                            }
66                        });
67                    }
68                })
69                .container::<Vec<_>>()
70                .probe_with(&probe);
71        });
72
73        // Generate roughly random data.
74        use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
75        let hasher = BuildHasherDefault::<DefaultHasher>::new();
76        let mut insert = (0..).map(move |i| (hasher.hash_one(&(i,index,0)) % keys,
77                                             hasher.hash_one(&(i,index,1)) % keys,
78                                             hasher.hash_one(&(i,index,2)) % keys,
79                                             hasher.hash_one(&(i,index,3)) % keys));
80
81        let timer = std::time::Instant::now();
82
83        let mut sent = 0;
84        while sent < (vals / peers) {
85
86            // Send some amount of data, no more than `batch`.
87            let to_send = std::cmp::min(batch, vals/peers - sent);
88            for (src0, dst0, src1, dst1) in (&mut insert).take(to_send) {
89                input1.send((src0, dst0));
90                input2.send((src1, dst1));
91            }
92            sent += to_send;
93
94            // Advance input, iterate until data cleared.
95            let next = input1.epoch() + 1;
96            input1.advance_to(next);
97            input2.advance_to(next);
98            while probe.less_than(input1.time()) {
99                worker.step();
100            }
101
102            println!("{:?}\tworker {} batch complete", timer.elapsed(), index)
103        }
104
105    }).unwrap(); // asserts error-free execution;
106}
Source

pub fn time(&self) -> &T

Reports the current timestamp.

Examples found in repository?
examples/threadless.rs (line 28)
5fn main() {
6
7    // create a naked single-threaded worker.
8    let allocator = timely::communication::allocator::Thread::default();
9    let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator, None);
10
11    // create input and probe handles.
12    let mut input = InputHandle::new();
13    let probe = ProbeHandle::new();
14
15    // directly build a dataflow.
16    worker.dataflow(|scope| {
17        input
18            .to_stream(scope)
19            .container::<Vec<_>>()
20            .inspect(|x| println!("{:?}", x))
21            .probe_with(&probe);
22    });
23
24    // manage inputs.
25    for i in 0 .. 10 {
26        input.send(i);
27        input.advance_to(i);
28        while probe.less_than(input.time()) {
29            worker.step();
30        }
31    }
32}
More examples
Hide additional examples
examples/rc.rs (line 29)
10fn main() {
11    // initializes and runs a timely dataflow.
12    timely::execute_from_args(std::env::args(), |worker| {
13        // create a new input, exchange data, and inspect its output
14        let index = worker.index();
15        let mut input = InputHandle::new();
16        let probe = ProbeHandle::new();
17        worker.dataflow(|scope| {
18            scope.input_from(&mut input)
19                 .container::<Vec<_>>()
20                 //.exchange(|x| *x) // <-- cannot exchange this; Rc is not Send.
21                 .inspect(move |x| println!("worker {}:\thello {:?}", index, x))
22                 .probe_with(&probe);
23        });
24
25        // introduce data and watch!
26        for round in 0..10 {
27            input.send(Test { _field: Rc::new(round) } );
28            input.advance_to(round + 1);
29            worker.step_while(|| probe.less_than(input.time()));
30        }
31    }).unwrap();
32}
examples/hello.rs (line 27)
4fn main() {
5    // initializes and runs a timely dataflow.
6    timely::execute_from_args(std::env::args(), |worker| {
7
8        let index = worker.index();
9        let mut input = InputHandle::new();
10        let probe = ProbeHandle::new();
11
12        // create a new input, exchange data, and inspect its output
13        worker.dataflow(|scope| {
14            scope.input_from(&mut input)
15                 .container::<Vec<_>>()
16                 .exchange(|x| *x)
17                 .inspect(move |x| println!("worker {}:\thello {}", index, x))
18                 .probe_with(&probe);
19        });
20
21        // introduce data and watch!
22        for round in 0..10 {
23            if index == 0 {
24                input.send(round);
25            }
26            input.advance_to(round + 1);
27            while probe.less_than(input.time()) {
28                worker.step();
29            }
30        }
31    }).unwrap();
32}
examples/exchange.rs (line 32)
4fn main() {
5    // initializes and runs a timely dataflow.
6    timely::execute_from_args(std::env::args(), |worker| {
7
8        let batch = std::env::args().nth(1).unwrap().parse::<usize>().unwrap();
9        let rounds = std::env::args().nth(2).unwrap().parse::<usize>().unwrap();
10        let mut input = InputHandle::new();
11
12        // create a new input, exchange data, and inspect its output
13        let probe = worker.dataflow(|scope|
14            scope
15                .input_from(&mut input)
16                .container::<Vec<_>>()
17                .exchange(|&x| x as u64)
18                .probe()
19                .0
20        );
21
22
23        let timer = std::time::Instant::now();
24
25        for round in 0 .. rounds {
26
27            for i in 0 .. batch {
28                input.send(i);
29            }
30            input.advance_to(round);
31
32            while probe.less_than(input.time()) {
33                worker.step();
34            }
35
36        }
37
38        let volume = (rounds * batch) as f64;
39        let elapsed = timer.elapsed();
40        let seconds = elapsed.as_secs() as f64 + (f64::from(elapsed.subsec_nanos())/1000000000.0);
41
42        println!("{:?}\tworker {} complete; rate: {:?}", timer.elapsed(), worker.index(), volume / seconds);
43
44    }).unwrap();
45}
examples/unionfind.rs (line 43)
8fn main() {
9
10    // command-line args: numbers of nodes and edges in the random graph.
11    let nodes: usize = std::env::args().nth(1).unwrap().parse().unwrap();
12    let edges: usize = std::env::args().nth(2).unwrap().parse().unwrap();
13    let batch: usize = std::env::args().nth(3).unwrap().parse().unwrap();
14
15    timely::execute_from_args(std::env::args().skip(4), move |worker| {
16
17        let index = worker.index();
18        let peers = worker.peers();
19
20        let mut input = InputHandle::new();
21        let probe = ProbeHandle::new();
22
23        worker.dataflow(|scope| {
24            scope.input_from(&mut input)
25                //  .exchange(move |x: &(usize, usize)| (x.0 % (peers - 1)) as u64 + 1)
26                 .union_find()
27                 .exchange(|_| 0)
28                 .union_find()
29                 .probe_with(&probe);
30        });
31
32        // Generate roughly random data.
33        use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
34        let hasher = BuildHasherDefault::<DefaultHasher>::new();
35        let insert = (0..).map(move |i| (hasher.hash_one(&(i,index,0)) as usize % nodes,
36                                         hasher.hash_one(&(i,index,1)) as usize % nodes));
37
38        for (edge, arc) in insert.take(edges / peers).enumerate() {
39            input.send(arc);
40            if edge % batch == (batch - 1) {
41                let next = input.epoch() + 1;
42                input.advance_to(next);
43                while probe.less_than(input.time()) {
44                    worker.step();
45                }
46            }
47        }
48
49    }).unwrap(); // asserts error-free execution;
50}
examples/distinct.rs (line 51)
8fn main() {
9    // initializes and runs a timely dataflow.
10    timely::execute_from_args(std::env::args(), |worker| {
11        let index = worker.index();
12        let mut input = InputHandle::new();
13        let probe = ProbeHandle::new();
14
15        // create a new input, exchange data, and inspect its output
16        worker.dataflow::<usize,_,_>(|scope| {
17            let mut counts_by_time = HashMap::new();
18            scope.input_from(&mut input)
19                .unary(Exchange::new(|x| *x), "Distinct", move |_, _|
20                    move |input, output| {
21                        input.for_each_time(|time, data| {
22                            let counts =
23                            counts_by_time
24                                .entry(*time.time())
25                                .or_insert(HashMap::new());
26                            let mut session = output.session(&time);
27                            for data in data {
28                                for &datum in data.iter() {
29                                    let count = counts.entry(datum).or_insert(0);
30                                    if *count == 0 {
31                                        session.give(datum);
32                                    }
33                                    *count += 1;
34                                }
35                            }
36                        })
37                    })
38                .container::<Vec<_>>()
39                .inspect(move |x| println!("worker {}:\tvalue {}", index, x))
40                .probe_with(&probe);
41        });
42
43        // introduce data and watch!
44        for round in 0..1 {
45            if index == 0 {
46                [0, 1, 2, 2, 2, 3, 3, 4].iter().for_each(|x| input.send(*x));
47            } else if index == 1 {
48                [0, 0, 3, 4, 4, 5, 7, 7].iter().for_each(|x| input.send(*x));
49            }
50            input.advance_to(round + 1);
51            while probe.less_than(input.time()) {
52                worker.step();
53            }
54        }
55    }).unwrap();
56}
Source§

impl<T: Timestamp, CB: ContainerBuilder<Container: Clone>> Handle<T, CB>

Source

pub fn send<D>(&mut self, data: D)
where CB: PushInto<D>,

Sends one record into the corresponding timely dataflow Stream, at the current epoch.

§Examples
use timely::*;
use timely::dataflow::InputHandle;
use timely::dataflow::operators::{Input, Inspect};

// construct and execute a timely dataflow
timely::execute(Config::thread(), |worker| {

    // add an input and base computation off of it
    let mut input = InputHandle::new();
    worker.dataflow(|scope| {
        scope.input_from(&mut input)
             .container::<Vec<_>>()
             .inspect(|x| println!("hello {:?}", x));
    });

    // introduce input, advance computation
    for round in 0..10 {
        input.send(round);
        input.advance_to(round + 1);
        worker.step();
    }
});
Examples found in repository?
examples/threadless.rs (line 26)
5fn main() {
6
7    // create a naked single-threaded worker.
8    let allocator = timely::communication::allocator::Thread::default();
9    let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator, None);
10
11    // create input and probe handles.
12    let mut input = InputHandle::new();
13    let probe = ProbeHandle::new();
14
15    // directly build a dataflow.
16    worker.dataflow(|scope| {
17        input
18            .to_stream(scope)
19            .container::<Vec<_>>()
20            .inspect(|x| println!("{:?}", x))
21            .probe_with(&probe);
22    });
23
24    // manage inputs.
25    for i in 0 .. 10 {
26        input.send(i);
27        input.advance_to(i);
28        while probe.less_than(input.time()) {
29            worker.step();
30        }
31    }
32}
More examples
Hide additional examples
examples/rc.rs (line 27)
10fn main() {
11    // initializes and runs a timely dataflow.
12    timely::execute_from_args(std::env::args(), |worker| {
13        // create a new input, exchange data, and inspect its output
14        let index = worker.index();
15        let mut input = InputHandle::new();
16        let probe = ProbeHandle::new();
17        worker.dataflow(|scope| {
18            scope.input_from(&mut input)
19                 .container::<Vec<_>>()
20                 //.exchange(|x| *x) // <-- cannot exchange this; Rc is not Send.
21                 .inspect(move |x| println!("worker {}:\thello {:?}", index, x))
22                 .probe_with(&probe);
23        });
24
25        // introduce data and watch!
26        for round in 0..10 {
27            input.send(Test { _field: Rc::new(round) } );
28            input.advance_to(round + 1);
29            worker.step_while(|| probe.less_than(input.time()));
30        }
31    }).unwrap();
32}
examples/hello.rs (line 24)
4fn main() {
5    // initializes and runs a timely dataflow.
6    timely::execute_from_args(std::env::args(), |worker| {
7
8        let index = worker.index();
9        let mut input = InputHandle::new();
10        let probe = ProbeHandle::new();
11
12        // create a new input, exchange data, and inspect its output
13        worker.dataflow(|scope| {
14            scope.input_from(&mut input)
15                 .container::<Vec<_>>()
16                 .exchange(|x| *x)
17                 .inspect(move |x| println!("worker {}:\thello {}", index, x))
18                 .probe_with(&probe);
19        });
20
21        // introduce data and watch!
22        for round in 0..10 {
23            if index == 0 {
24                input.send(round);
25            }
26            input.advance_to(round + 1);
27            while probe.less_than(input.time()) {
28                worker.step();
29            }
30        }
31    }).unwrap();
32}
examples/exchange.rs (line 28)
4fn main() {
5    // initializes and runs a timely dataflow.
6    timely::execute_from_args(std::env::args(), |worker| {
7
8        let batch = std::env::args().nth(1).unwrap().parse::<usize>().unwrap();
9        let rounds = std::env::args().nth(2).unwrap().parse::<usize>().unwrap();
10        let mut input = InputHandle::new();
11
12        // create a new input, exchange data, and inspect its output
13        let probe = worker.dataflow(|scope|
14            scope
15                .input_from(&mut input)
16                .container::<Vec<_>>()
17                .exchange(|&x| x as u64)
18                .probe()
19                .0
20        );
21
22
23        let timer = std::time::Instant::now();
24
25        for round in 0 .. rounds {
26
27            for i in 0 .. batch {
28                input.send(i);
29            }
30            input.advance_to(round);
31
32            while probe.less_than(input.time()) {
33                worker.step();
34            }
35
36        }
37
38        let volume = (rounds * batch) as f64;
39        let elapsed = timer.elapsed();
40        let seconds = elapsed.as_secs() as f64 + (f64::from(elapsed.subsec_nanos())/1000000000.0);
41
42        println!("{:?}\tworker {} complete; rate: {:?}", timer.elapsed(), worker.index(), volume / seconds);
43
44    }).unwrap();
45}
examples/event_driven.rs (line 38)
4fn main() {
5    // initializes and runs a timely dataflow.
6    timely::execute_from_args(std::env::args(), |worker| {
7
8        let timer = std::time::Instant::now();
9
10        let mut args = std::env::args();
11        args.next();
12
13        let dataflows = args.next().unwrap().parse::<usize>().unwrap();
14        let length = args.next().unwrap().parse::<usize>().unwrap();
15        let record = args.next() == Some("record".to_string());
16
17        let mut inputs = Vec::new();
18        let mut probes = Vec::new();
19
20        // create a new input, exchange data, and inspect its output
21        for _dataflow in 0 .. dataflows {
22            worker.dataflow(|scope| {
23                let (input, mut stream) = scope.new_input();
24                for _step in 0 .. length {
25                    stream = stream.map(|x: ()| x);
26                }
27                let (probe, _stream) = stream.probe();
28                inputs.push(input);
29                probes.push(probe);
30            });
31        }
32
33        println!("{:?}\tdataflows built ({} x {})", timer.elapsed(), dataflows, length);
34
35        for round in 0 .. {
36            let dataflow = round % dataflows;
37            if record {
38                inputs[dataflow].send(());
39            }
40            inputs[dataflow].advance_to(round);
41            let mut steps = 0;
42            while probes[dataflow].less_than(&round) {
43                worker.step();
44                steps += 1;
45            }
46            println!("{:?}\tround {} complete in {} steps", timer.elapsed(), round, steps);
47        }
48
49    }).unwrap();
50}
examples/unionfind.rs (line 39)
8fn main() {
9
10    // command-line args: numbers of nodes and edges in the random graph.
11    let nodes: usize = std::env::args().nth(1).unwrap().parse().unwrap();
12    let edges: usize = std::env::args().nth(2).unwrap().parse().unwrap();
13    let batch: usize = std::env::args().nth(3).unwrap().parse().unwrap();
14
15    timely::execute_from_args(std::env::args().skip(4), move |worker| {
16
17        let index = worker.index();
18        let peers = worker.peers();
19
20        let mut input = InputHandle::new();
21        let probe = ProbeHandle::new();
22
23        worker.dataflow(|scope| {
24            scope.input_from(&mut input)
25                //  .exchange(move |x: &(usize, usize)| (x.0 % (peers - 1)) as u64 + 1)
26                 .union_find()
27                 .exchange(|_| 0)
28                 .union_find()
29                 .probe_with(&probe);
30        });
31
32        // Generate roughly random data.
33        use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
34        let hasher = BuildHasherDefault::<DefaultHasher>::new();
35        let insert = (0..).map(move |i| (hasher.hash_one(&(i,index,0)) as usize % nodes,
36                                         hasher.hash_one(&(i,index,1)) as usize % nodes));
37
38        for (edge, arc) in insert.take(edges / peers).enumerate() {
39            input.send(arc);
40            if edge % batch == (batch - 1) {
41                let next = input.epoch() + 1;
42                input.advance_to(next);
43                while probe.less_than(input.time()) {
44                    worker.step();
45                }
46            }
47        }
48
49    }).unwrap(); // asserts error-free execution;
50}

Trait Implementations§

Source§

impl<T: Debug + Timestamp, CB: Debug + ContainerBuilder<Container: Clone>> Debug for Handle<T, CB>
where CB::Container: Debug,

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
Source§

impl<T: Timestamp, CB: ContainerBuilder<Container: Clone>> Default for Handle<T, CB>

Source§

fn default() -> Self

Returns the “default value” for a type. Read more
Source§

impl<T: Timestamp, CB: ContainerBuilder<Container: Clone>> Drop for Handle<T, CB>

Source§

fn drop(&mut self)

Executes the destructor for this type. Read more
Source§

impl<T, CB, D> PushInto<D> for Handle<T, CB>
where T: Timestamp, CB: ContainerBuilder<Container: Clone> + PushInto<D>,

Source§

fn push_into(&mut self, item: D)

Push item into self.

Auto Trait Implementations§

§

impl<T, CB> Freeze for Handle<T, CB>
where <CB as ContainerBuilder>::Container: Sized + Freeze, CB: Freeze, T: Freeze,

§

impl<T, CB> !RefUnwindSafe for Handle<T, CB>

§

impl<T, CB> !Send for Handle<T, CB>

§

impl<T, CB> !Sync for Handle<T, CB>

§

impl<T, CB> Unpin for Handle<T, CB>
where <CB as ContainerBuilder>::Container: Sized + Unpin, CB: Unpin, T: Unpin,

§

impl<T, CB> UnsafeUnpin for Handle<T, CB>

§

impl<T, CB> !UnwindSafe for Handle<T, CB>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.