pub struct InputCapability<T: Timestamp> { /* private fields */ }Expand description
A capability for an input port.
Holding onto this capability implicitly holds on to a capability for all the output ports this input is connected to, after the connection summaries have been applied.
This input capability supplies a retain method which turns the input
capability into a Capability for a specific output port.
Implementations§
Source§impl<T: Timestamp> InputCapability<T>
impl<T: Timestamp> InputCapability<T>
Sourcepub fn time(&self) -> &T
pub fn time(&self) -> &T
The timestamp associated with this capability.
Examples found in repository?
8fn main() {
9 // initializes and runs a timely dataflow.
10 timely::execute_from_args(std::env::args(), |worker| {
11 let index = worker.index();
12 let mut input = InputHandle::new();
13 let probe = ProbeHandle::new();
14
15 // create a new input, exchange data, and inspect its output
16 worker.dataflow::<usize,_,_>(|scope| {
17 let mut counts_by_time = HashMap::new();
18 scope.input_from(&mut input)
19 .unary(Exchange::new(|x| *x), "Distinct", move |_, _|
20 move |input, output| {
21 input.for_each_time(|time, data| {
22 let counts =
23 counts_by_time
24 .entry(*time.time())
25 .or_insert(HashMap::new());
26 let mut session = output.session(&time);
27 for data in data {
28 for &datum in data.iter() {
29 let count = counts.entry(datum).or_insert(0);
30 if *count == 0 {
31 session.give(datum);
32 }
33 *count += 1;
34 }
35 }
36 })
37 })
38 .container::<Vec<_>>()
39 .inspect(move |x| println!("worker {}:\tvalue {}", index, x))
40 .probe_with(&probe);
41 });
42
43 // introduce data and watch!
44 for round in 0..1 {
45 if index == 0 {
46 [0, 1, 2, 2, 2, 3, 3, 4].iter().for_each(|x| input.send(*x));
47 } else if index == 1 {
48 [0, 0, 3, 4, 4, 5, 7, 7].iter().for_each(|x| input.send(*x));
49 }
50 input.advance_to(round + 1);
51 while probe.less_than(input.time()) {
52 worker.step();
53 }
54 }
55 }).unwrap();
56}

More examples
7fn main() {
8
9 // command-line args: numbers of nodes and edges in the random graph.
10 let nodes: usize = std::env::args().nth(1).unwrap().parse().unwrap();
11 let edges: usize = std::env::args().nth(2).unwrap().parse().unwrap();
12
13 // let logging = ::timely::logging::to_tcp_socket();
14 timely::execute_from_args(std::env::args().skip(3), move |worker| {
15
16 let index = worker.index();
17 let peers = worker.peers();
18
19 // pending edges and node updates.
20 let mut edge_list = Vec::new();
21 let mut node_lists = HashMap::new();
22
23 // graph data; offsets into targets.
24 let mut offsets = Vec::new();
25 let mut targets = Vec::new();
26
27 // holds the bfs parent of each node, or u32::MAX if unset.
28 let mut done = vec![u32::MAX; 1 + (nodes / peers)];
29
30 let start = std::time::Instant::now();
31
32 worker.dataflow::<usize,_,_>(move |scope| {
33
34 // generate part of a random graph.
35 use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
36 let hasher = BuildHasherDefault::<DefaultHasher>::new();
37 let graph =
38 (0..edges/peers)
39 .map(move |i| (hasher.hash_one(&(i,index,0)) as usize % nodes,
40 hasher.hash_one(&(i,index,1)) as usize % nodes))
41 .map(|(src,dst)| (src as u32, dst as u32))
42 .to_stream(scope)
43 .container::<Vec<_>>();
44
45 // define a loop variable, for the (node, worker) pairs.
46 let (handle, stream) = scope.feedback(1usize);
47
48 // use the stream of edges
49 graph.binary_notify(
50 stream,
51 Exchange::new(|x: &(u32, u32)| u64::from(x.0)),
52 Exchange::new(|x: &(u32, u32)| u64::from(x.0)),
53 "BFS",
54 vec![],
55 move |input1, input2, output, notify| {
56
57 // receive edges, start to sort them
58 input1.for_each_time(|time, data| {
59 notify.notify_at(time.retain(output.output_index()));
60 edge_list.extend(data.map(std::mem::take));
61 });
62
63 // receive (node, worker) pairs, note any new ones.
64 input2.for_each_time(|time, data| {
65 node_lists.entry(*time.time())
66 .or_insert_with(|| {
67 notify.notify_at(time.retain(output.output_index()));
68 Vec::new()
69 })
70 .extend(data.map(std::mem::take));
71 });
72
73 notify.for_each(|time, _num, _notify| {
74
75 // maybe process the graph
76 if *time == 0 {
77
78 // print some diagnostic timing information
79 if index == 0 { println!("{:?}:\tsorting", start.elapsed()); }
80
81 // sort the edges (previously: radix sorted).
82 edge_list.sort();
83
84 let mut count = 0;
85 for buffer in &edge_list { count += buffer.len(); }
86
87 // allocate sufficient memory, to avoid resizing.
88 offsets = Vec::with_capacity(1 + (nodes / peers));
89 targets = Vec::with_capacity(count);
90
91 // construct the graph
92 offsets.push(0);
93 let mut prev_node = 0;
94 for buffer in edge_list.drain(..) {
95 for (node, edge) in buffer {
96 let temp = node / peers as u32;
97 while prev_node < temp {
98 prev_node += 1;
99 offsets.push(targets.len() as u32)
100 }
101 targets.push(edge);
102 }
103 }
104 while offsets.len() < offsets.capacity() {
105 offsets.push(targets.len() as u32);
106 }
107 }
108
109 // print some diagnostic timing information
110 if index == 0 { println!("{:?}:\ttime: {:?}", start.elapsed(), time.time()); }
111
112 if let Some(mut todo) = node_lists.remove(&time) {
113 let mut session = output.session(&time);
114
115 // we could sort these, or not (previously: radix sorted).
116 // todo.sort();
117
118 for buffer in todo.drain(..) {
119 for (node, prev) in buffer {
120 let temp = (node as usize) / peers;
121 if done[temp] == u32::MAX {
122 done[temp] = prev;
123 let lower = offsets[temp] as usize;
124 let upper = offsets[temp + 1] as usize;
125 for &target in &targets[lower..upper] {
126 session.give((target, node));
127 }
128 }
129 }
130 }
131 }
132 });
133 }
134 )
135 .concat((0..1).map(|x| (x,x)).to_stream(scope))
136 .connect_loop(handle);
137 });
138 }).unwrap(); // asserts error-free execution;
139}

Sourcepub fn delayed(&self, new_time: &T, output_port: usize) -> Capability<T>
pub fn delayed(&self, new_time: &T, output_port: usize) -> Capability<T>
Delays capability for a specific output port.
Makes a new capability for a timestamp new_time greater than or equal to the timestamp of
the source capability (self).
This method panics if self.time is not less than or equal to new_time.
Sourcepub fn retain(&self, output_port: usize) -> Capability<T>
pub fn retain(&self, output_port: usize) -> Capability<T>
Transforms to an owned capability for a specific output port.
This method produces an owned capability which must be dropped to release the capability. Users should take care that these capabilities are only stored for as long as they are required, as failing to drop them may result in livelock.
This method panics if the timestamp summary to output_port strictly advances the time.
Examples found in repository?
8fn main() {
9 // initializes and runs a timely dataflow.
10 timely::execute_from_args(std::env::args(), |worker| {
11
12 let mut input = InputHandle::new();
13 let probe = ProbeHandle::new();
14
15 // define a distribution function for strings.
16 let exchange = Exchange::new(|x: &(String, i64)| (x.0).len() as u64);
17
18 // create a new input, exchange data, and inspect its output
19 worker.dataflow::<usize,_,_>(|scope| {
20 input.to_stream(scope)
21 .flat_map(|(text, diff): (String, i64)|
22 text.split_whitespace()
23 .map(move |word| (word.to_owned(), diff))
24 .collect::<Vec<_>>()
25 )
26 .unary_frontier(exchange, "WordCount", |_capability, _info| {
27
28 let mut queues = HashMap::new();
29 let mut counts = HashMap::new();
30
31 move |(input, frontier), output| {
32 input.for_each_time(|time, data| {
33 queues.entry(time.retain(output.output_index()))
34 .or_insert(Vec::new())
35 .extend(data.map(std::mem::take));
36 });
37
38 for (key, val) in queues.iter_mut() {
39 if !frontier.less_equal(key.time()) {
40 let mut session = output.session(key);
41 for mut batch in val.drain(..) {
42 for (word, diff) in batch.drain(..) {
43 let entry = counts.entry(word.clone()).or_insert(0i64);
44 *entry += diff;
45 session.give((word, *entry));
46 }
47 }
48 }
49 }
50
51 queues.retain(|_key, val| !val.is_empty());
52 }})
53 .container::<Vec<_>>()
54 .inspect(|x| println!("seen: {:?}", x))
55 .probe_with(&probe);
56 });
57
58 // introduce data and watch!
59 for round in 0..10 {
60 input.send(("round".to_owned(), 1));
61 input.advance_to(round + 1);
62 while probe.less_than(input.time()) {
63 worker.step();
64 }
65 }
66 }).unwrap();
67}More examples
21fn main() {
22
23 type InnerContainer = <WordCount as columnar::Columnar>::Container;
24 type Container = Column<InnerContainer>;
25
26 use columnar::Len;
27
28 let config = timely::Config {
29 communication: timely::CommunicationConfig::ProcessBinary(3),
30 worker: timely::WorkerConfig::default(),
31 };
32
33 // initializes and runs a timely dataflow.
34 timely::execute(config, |worker| {
35 let mut input = <InputHandle<_, CapacityContainerBuilder<Container>>>::new();
36 let probe = ProbeHandle::new();
37
38 // create a new input, exchange data, and inspect its output
39 worker.dataflow::<usize, _, _>(|scope| {
40 input
41 .to_stream(scope)
42 .unary(
43 Pipeline,
44 "Split",
45 |_cap, _info| {
46 move |input, output| {
47 input.for_each_time(|time, data| {
48 let mut session = output.session(&time);
49 for data in data {
50 for wordcount in data.borrow().into_index_iter().flat_map(|wordcount| {
51 wordcount.text.split_whitespace().map(move |text| WordCountReference { text, diff: wordcount.diff })
52 }) {
53 session.give(wordcount);
54 }
55 }
56 });
57 }
58 },
59 )
60 .container::<Container>()
61 .unary_frontier(
62 ExchangeCore::<ColumnBuilder<InnerContainer>,_>::new_core(|x: &WordCountReference<&str,&i64>| x.text.len() as u64),
63 "WordCount",
64 |_capability, _info| {
65 let mut queues = HashMap::new();
66 let mut counts = HashMap::new();
67
68 move |(input, frontier), output| {
69 input.for_each_time(|time, data| {
70 queues
71 .entry(time.retain(output.output_index()))
72 .or_insert(Vec::new())
73 .extend(data.map(std::mem::take));
74
75 });
76
77 for (key, val) in queues.iter_mut() {
78 if !frontier.less_equal(key.time()) {
79 let mut session = output.session(key);
80 for batch in val.drain(..) {
81 for wordcount in batch.borrow().into_index_iter() {
82 let total =
83 if let Some(count) = counts.get_mut(wordcount.text) {
84 *count += wordcount.diff;
85 *count
86 }
87 else {
88 counts.insert(wordcount.text.to_string(), *wordcount.diff);
89 *wordcount.diff
90 };
91 session.give(WordCountReference { text: wordcount.text, diff: total });
92 }
93 }
94 }
95 }
96
97 queues.retain(|_key, val| !val.is_empty());
98 }
99 },
100 )
101 .container::<Container>()
102 .inspect_container(|x| {
103 match x {
104 Ok((time, data)) => {
105 println!("seen at: {:?}\t{:?} records", time, data.record_count());
106 for wc in data.borrow().into_index_iter() {
107 println!(" {}: {}", wc.text, wc.diff);
108 }
109 },
110 Err(frontier) => println!("frontier advanced to {:?}", frontier),
111 }
112 })
113 .probe_with(&probe);
114 });
115
116 // introduce data and watch!
117 for round in 0..10 {
118 input.send(WordCountReference { text: "flat container", diff: 1 });
119 input.advance_to(round + 1);
120 while probe.less_than(input.time()) {
121 worker.step();
122 }
123 }
124 })
125 .unwrap();
126}7fn main() {
8
9 // command-line args: numbers of nodes and edges in the random graph.
10 let nodes: usize = std::env::args().nth(1).unwrap().parse().unwrap();
11 let edges: usize = std::env::args().nth(2).unwrap().parse().unwrap();
12
13 // let logging = ::timely::logging::to_tcp_socket();
14 timely::execute_from_args(std::env::args().skip(3), move |worker| {
15
16 let index = worker.index();
17 let peers = worker.peers();
18
19 // pending edges and node updates.
20 let mut edge_list = Vec::new();
21 let mut node_lists = HashMap::new();
22
23 // graph data; offsets into targets.
24 let mut offsets = Vec::new();
25 let mut targets = Vec::new();
26
27 // holds the bfs parent of each node, or u32::MAX if unset.
28 let mut done = vec![u32::MAX; 1 + (nodes / peers)];
29
30 let start = std::time::Instant::now();
31
32 worker.dataflow::<usize,_,_>(move |scope| {
33
34 // generate part of a random graph.
35 use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
36 let hasher = BuildHasherDefault::<DefaultHasher>::new();
37 let graph =
38 (0..edges/peers)
39 .map(move |i| (hasher.hash_one(&(i,index,0)) as usize % nodes,
40 hasher.hash_one(&(i,index,1)) as usize % nodes))
41 .map(|(src,dst)| (src as u32, dst as u32))
42 .to_stream(scope)
43 .container::<Vec<_>>();
44
45 // define a loop variable, for the (node, worker) pairs.
46 let (handle, stream) = scope.feedback(1usize);
47
48 // use the stream of edges
49 graph.binary_notify(
50 stream,
51 Exchange::new(|x: &(u32, u32)| u64::from(x.0)),
52 Exchange::new(|x: &(u32, u32)| u64::from(x.0)),
53 "BFS",
54 vec![],
55 move |input1, input2, output, notify| {
56
57 // receive edges, start to sort them
58 input1.for_each_time(|time, data| {
59 notify.notify_at(time.retain(output.output_index()));
60 edge_list.extend(data.map(std::mem::take));
61 });
62
63 // receive (node, worker) pairs, note any new ones.
64 input2.for_each_time(|time, data| {
65 node_lists.entry(*time.time())
66 .or_insert_with(|| {
67 notify.notify_at(time.retain(output.output_index()));
68 Vec::new()
69 })
70 .extend(data.map(std::mem::take));
71 });
72
73 notify.for_each(|time, _num, _notify| {
74
75 // maybe process the graph
76 if *time == 0 {
77
78 // print some diagnostic timing information
79 if index == 0 { println!("{:?}:\tsorting", start.elapsed()); }
80
81 // sort the edges (previously: radix sorted).
82 edge_list.sort();
83
84 let mut count = 0;
85 for buffer in &edge_list { count += buffer.len(); }
86
87 // allocate sufficient memory, to avoid resizing.
88 offsets = Vec::with_capacity(1 + (nodes / peers));
89 targets = Vec::with_capacity(count);
90
91 // construct the graph
92 offsets.push(0);
93 let mut prev_node = 0;
94 for buffer in edge_list.drain(..) {
95 for (node, edge) in buffer {
96 let temp = node / peers as u32;
97 while prev_node < temp {
98 prev_node += 1;
99 offsets.push(targets.len() as u32)
100 }
101 targets.push(edge);
102 }
103 }
104 while offsets.len() < offsets.capacity() {
105 offsets.push(targets.len() as u32);
106 }
107 }
108
109 // print some diagnostic timing information
110 if index == 0 { println!("{:?}:\ttime: {:?}", start.elapsed(), time.time()); }
111
112 if let Some(mut todo) = node_lists.remove(&time) {
113 let mut session = output.session(&time);
114
115 // we could sort these, or not (previously: radix sorted).
116 // todo.sort();
117
118 for buffer in todo.drain(..) {
119 for (node, prev) in buffer {
120 let temp = (node as usize) / peers;
121 if done[temp] == u32::MAX {
122 done[temp] = prev;
123 let lower = offsets[temp] as usize;
124 let upper = offsets[temp + 1] as usize;
125 for &target in &targets[lower..upper] {
126 session.give((target, node));
127 }
128 }
129 }
130 }
131 }
132 });
133 }
134 )
135 .concat((0..1).map(|x| (x,x)).to_stream(scope))
136 .connect_loop(handle);
137 });
138 }).unwrap(); // asserts error-free execution;
139}

8fn main() {
9
10 timely::execute_from_args(std::env::args().skip(3), move |worker| {
11
12 let mut input = InputHandle::new();
13 let probe = ProbeHandle::new();
14
15 worker.dataflow::<usize,_,_>(|scope| {
16
17 // create a new input, into which we can push edge changes.
18 let edge_stream = input.to_stream(scope);
19
20 // create a new feedback stream, which will be changes to ranks.
21 let (handle, rank_stream) = scope.feedback(1);
22
23 // bring edges and ranks together!
24 let changes = edge_stream.binary_frontier(
25 rank_stream,
26 Exchange::new(|x: &((usize, usize), i64)| (x.0).0 as u64),
27 Exchange::new(|x: &(usize, i64)| x.0 as u64),
28 "PageRank",
29 |_capability, _info| {
30
31 // where we stash out-of-order data.
32 let mut edge_stash: HashMap<_, Vec<_>> = HashMap::new();
33 let mut rank_stash: HashMap<_, Vec<_>> = HashMap::new();
34
35 // lists of edges, ranks, and changes.
36 let mut edges = Vec::new();
37 let mut ranks = Vec::new();
38 let mut diffs = Vec::new(); // for received but un-acted upon deltas.
39 let mut delta = Vec::new();
40
41 let timer = ::std::time::Instant::now();
42
43 move |(input1, frontier1), (input2, frontier2), output| {
44
45 // hold on to edge changes until it is time.
46 input1.for_each_time(|time, data| {
47 let entry = edge_stash.entry(time.retain(output.output_index())).or_default();
48 data.for_each(|data| entry.append(data));
49 });
50
51 // hold on to rank changes until it is time.
52 input2.for_each_time(|time, data| {
53 let entry = rank_stash.entry(time.retain(output.output_index())).or_default();
54 data.for_each(|data| entry.append(data));
55 });
56
57 let frontiers = &[frontier1, frontier2];
58
59 for (time, edge_changes) in edge_stash.iter_mut() {
60 if frontiers.iter().all(|f| !f.less_equal(time)) {
61
62 let mut session = output.session(time);
63
64 compact(edge_changes);
65
66 for ((src, dst), diff) in edge_changes.drain(..) {
67
68 // 0. ensure enough state allocated
69 while edges.len() <= src { edges.push(Vec::new()); }
70 while ranks.len() <= src { ranks.push(1_000); }
71 while diffs.len() <= src { diffs.push(0); }
72
73 // 1. subtract previous distribution.
74 allocate(ranks[src], &edges[src][..], &mut delta);
75 for x in delta.iter_mut() { x.1 *= -1; }
76
77 // 2. update edges.
78 edges[src].push((dst, diff));
79 compact(&mut edges[src]);
80
81 // 3. re-distribute allocations.
82 allocate(ranks[src], &edges[src][..], &mut delta);
83
84 // 4. compact down and send cumulative changes.
85 compact(&mut delta);
86 for (dst, diff) in delta.drain(..) {
87 session.give((dst, diff));
88 }
89 }
90 }
91 }
92
93 edge_stash.retain(|_key, val| !val.is_empty());
94
95 for (time, rank_changes) in rank_stash.iter_mut() {
96 if frontiers.iter().all(|f| !f.less_equal(time)) {
97
98 let mut session = output.session(time);
99
100 compact(rank_changes);
101
102 let mut cnt = 0;
103 let mut sum = 0;
104 let mut max = 0;
105
106 for (src, diff) in rank_changes.drain(..) {
107
108 cnt += 1;
109 sum += diff.abs();
110 max = if max < diff.abs() { diff.abs() } else { max };
111
112 // 0. ensure enough state allocated
113 while edges.len() <= src { edges.push(Vec::new()); }
114 while ranks.len() <= src { ranks.push(1_000); }
115 while diffs.len() <= src { diffs.push(0); }
116
117 // 1. subtract previous distribution.
118 allocate(ranks[src], &edges[src][..], &mut delta);
119 for x in delta.iter_mut() { x.1 *= -1; }
120
121 // 2. update ranks.
122 diffs[src] += diff;
123 if diffs[src].abs() >= 6 {
124 ranks[src] += diffs[src];
125 diffs[src] = 0;
126 }
127
128 // 3. re-distribute allocations.
129 allocate(ranks[src], &edges[src][..], &mut delta);
130
131 // 4. compact down and send cumulative changes.
132 compact(&mut delta);
133 for (dst, diff) in delta.drain(..) {
134 session.give((dst, diff));
135 }
136 }
137
138 println!("{:?}:\t{:?}\t{}\t{}\t{}", timer.elapsed(), time.time(), cnt, sum, max);
139 }
140 }
141
142 rank_stash.retain(|_key, val| !val.is_empty());
143
144 }
145 }
146 );
147
148 changes
149 .probe_with(&probe)
150 .connect_loop(handle);
151
152 });
153
154 let nodes: usize = std::env::args().nth(1).unwrap().parse().unwrap();
155 let edges: usize = std::env::args().nth(2).unwrap().parse().unwrap();
156
157 let index = worker.index();
158 // Generate roughly random data.
159 use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
160 let hasher = BuildHasherDefault::<DefaultHasher>::new();
161 let mut insert = (0..).map(move |i| (hasher.hash_one(&(i,index,0)) as usize % nodes,
162 hasher.hash_one(&(i,index,1)) as usize % nodes));
163 let remove = insert.clone();
164
165 for ins in (&mut insert).take(edges / worker.peers()) {
166 input.send((ins, 1));
167 }
168
169 input.advance_to(1);
170
171 while probe.less_than(input.time()) {
172 worker.step();
173 }
174
175 for (ins, del) in insert.zip(remove).take(1000) {
176 input.send((ins, 1));
177 input.send((del,-1));
178 input.advance_to(input.time() + 1);
179 while probe.less_than(input.time()) {
180 worker.step();
181 }
182 }
183
184 }).unwrap(); // asserts error-free execution;
185}

Trait Implementations§
Source§impl<T: Timestamp> CapabilityTrait<T> for InputCapability<T>
impl<T: Timestamp> CapabilityTrait<T> for InputCapability<T>
Source§impl<T: Timestamp> Debug for InputCapability<T>
impl<T: Timestamp> Debug for InputCapability<T>
Auto Trait Implementations§
impl<T> Freeze for InputCapability<T> where
T: Freeze,
impl<T> !RefUnwindSafe for InputCapability<T>
impl<T> !Send for InputCapability<T>
impl<T> !Sync for InputCapability<T>
impl<T> Unpin for InputCapability<T> where
T: Unpin,
impl<T> UnsafeUnpin for InputCapability<T> where
T: UnsafeUnpin,
impl<T> !UnwindSafe for InputCapability<T>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more

Source§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more