pub struct Handle<T: Timestamp, CB: ContainerBuilder<Container: Clone>> { /* private fields */ }Expand description
A handle to an input Stream, used to introduce data to a timely dataflow computation.
Implementations§
Source§impl<T: Timestamp, C: Container + Clone> Handle<T, CapacityContainerBuilder<C>>
impl<T: Timestamp, C: Container + Clone> Handle<T, CapacityContainerBuilder<C>>
Sourcepub fn new() -> Self
pub fn new() -> Self
Allocates a new input handle, from which one can create timely streams.
§Examples
use timely::*;
use timely::dataflow::InputHandle;
use timely::dataflow::operators::{Input, Inspect};
// construct and execute a timely dataflow
timely::execute(Config::thread(), |worker| {
// add an input and base computation off of it
let mut input = InputHandle::new();
worker.dataflow(|scope| {
scope.input_from(&mut input)
.container::<Vec<_>>()
.inspect(|x| println!("hello {:?}", x));
});
// introduce input, advance computation
for round in 0..10 {
input.send(round);
input.advance_to(round + 1);
worker.step();
}
});Examples found in repository?
5fn main() {
6
7 // create a naked single-threaded worker.
8 let allocator = timely::communication::allocator::Thread::default();
9 let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator, None);
10
11 // create input and probe handles.
12 let mut input = InputHandle::new();
13 let probe = ProbeHandle::new();
14
15 // directly build a dataflow.
16 worker.dataflow(|scope| {
17 input
18 .to_stream(scope)
19 .container::<Vec<_>>()
20 .inspect(|x| println!("{:?}", x))
21 .probe_with(&probe);
22 });
23
24 // manage inputs.
25 for i in 0 .. 10 {
26 input.send(i);
27 input.advance_to(i);
28 while probe.less_than(input.time()) {
29 worker.step();
30 }
31 }
32}More examples
10fn main() {
11 // initializes and runs a timely dataflow.
12 timely::execute_from_args(std::env::args(), |worker| {
13 // create a new input, exchange data, and inspect its output
14 let index = worker.index();
15 let mut input = InputHandle::new();
16 let probe = ProbeHandle::new();
17 worker.dataflow(|scope| {
18 scope.input_from(&mut input)
19 .container::<Vec<_>>()
20 //.exchange(|x| *x) // <-- cannot exchange this; Rc is not Send.
21 .inspect(move |x| println!("worker {}:\thello {:?}", index, x))
22 .probe_with(&probe);
23 });
24
25 // introduce data and watch!
26 for round in 0..10 {
27 input.send(Test { _field: Rc::new(round) } );
28 input.advance_to(round + 1);
29 worker.step_while(|| probe.less_than(input.time()));
30 }
31 }).unwrap();
32}4fn main() {
5 // initializes and runs a timely dataflow.
6 timely::execute_from_args(std::env::args(), |worker| {
7
8 let index = worker.index();
9 let mut input = InputHandle::new();
10 let probe = ProbeHandle::new();
11
12 // create a new input, exchange data, and inspect its output
13 worker.dataflow(|scope| {
14 scope.input_from(&mut input)
15 .container::<Vec<_>>()
16 .exchange(|x| *x)
17 .inspect(move |x| println!("worker {}:\thello {}", index, x))
18 .probe_with(&probe);
19 });
20
21 // introduce data and watch!
22 for round in 0..10 {
23 if index == 0 {
24 input.send(round);
25 }
26 input.advance_to(round + 1);
27 while probe.less_than(input.time()) {
28 worker.step();
29 }
30 }
31 }).unwrap();
32}4fn main() {
5 // initializes and runs a timely dataflow.
6 timely::execute_from_args(std::env::args(), |worker| {
7
8 let batch = std::env::args().nth(1).unwrap().parse::<usize>().unwrap();
9 let rounds = std::env::args().nth(2).unwrap().parse::<usize>().unwrap();
10 let mut input = InputHandle::new();
11
12 // create a new input, exchange data, and inspect its output
13 let probe = worker.dataflow(|scope|
14 scope
15 .input_from(&mut input)
16 .container::<Vec<_>>()
17 .exchange(|&x| x as u64)
18 .probe()
19 .0
20 );
21
22
23 let timer = std::time::Instant::now();
24
25 for round in 0 .. rounds {
26
27 for i in 0 .. batch {
28 input.send(i);
29 }
30 input.advance_to(round);
31
32 while probe.less_than(input.time()) {
33 worker.step();
34 }
35
36 }
37
38 let volume = (rounds * batch) as f64;
39 let elapsed = timer.elapsed();
40 let seconds = elapsed.as_secs() as f64 + (f64::from(elapsed.subsec_nanos())/1000000000.0);
41
42 println!("{:?}\tworker {} complete; rate: {:?}", timer.elapsed(), worker.index(), volume / seconds);
43
44 }).unwrap();
45}8fn main() {
9
10 // command-line args: numbers of nodes and edges in the random graph.
11 let nodes: usize = std::env::args().nth(1).unwrap().parse().unwrap();
12 let edges: usize = std::env::args().nth(2).unwrap().parse().unwrap();
13 let batch: usize = std::env::args().nth(3).unwrap().parse().unwrap();
14
15 timely::execute_from_args(std::env::args().skip(4), move |worker| {
16
17 let index = worker.index();
18 let peers = worker.peers();
19
20 let mut input = InputHandle::new();
21 let probe = ProbeHandle::new();
22
23 worker.dataflow(|scope| {
24 scope.input_from(&mut input)
25 // .exchange(move |x: &(usize, usize)| (x.0 % (peers - 1)) as u64 + 1)
26 .union_find()
27 .exchange(|_| 0)
28 .union_find()
29 .probe_with(&probe);
30 });
31
32 // Generate roughly random data.
33 use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
34 let hasher = BuildHasherDefault::<DefaultHasher>::new();
35 let insert = (0..).map(move |i| (hasher.hash_one(&(i,index,0)) as usize % nodes,
36 hasher.hash_one(&(i,index,1)) as usize % nodes));
37
38 for (edge, arc) in insert.take(edges / peers).enumerate() {
39 input.send(arc);
40 if edge % batch == (batch - 1) {
41 let next = input.epoch() + 1;
42 input.advance_to(next);
43 while probe.less_than(input.time()) {
44 worker.step();
45 }
46 }
47 }
48
49 }).unwrap(); // asserts error-free execution;
50}8fn main() {
9 // initializes and runs a timely dataflow.
10 timely::execute_from_args(std::env::args(), |worker| {
11 let index = worker.index();
12 let mut input = InputHandle::new();
13 let probe = ProbeHandle::new();
14
15 // create a new input, exchange data, and inspect its output
16 worker.dataflow::<usize,_,_>(|scope| {
17 let mut counts_by_time = HashMap::new();
18 scope.input_from(&mut input)
19 .unary(Exchange::new(|x| *x), "Distinct", move |_, _|
20 move |input, output| {
21 input.for_each_time(|time, data| {
22 let counts =
23 counts_by_time
24 .entry(*time.time())
25 .or_insert(HashMap::new());
26 let mut session = output.session(&time);
27 for data in data {
28 for &datum in data.iter() {
29 let count = counts.entry(datum).or_insert(0);
30 if *count == 0 {
31 session.give(datum);
32 }
33 *count += 1;
34 }
35 }
36 })
37 })
38 .container::<Vec<_>>()
39 .inspect(move |x| println!("worker {}:\tvalue {}", index, x))
40 .probe_with(&probe);
41 });
42
43 // introduce data and watch!
44 for round in 0..1 {
45 if index == 0 {
46 [0, 1, 2, 2, 2, 3, 3, 4].iter().for_each(|x| input.send(*x));
47 } else if index == 1 {
48 [0, 0, 3, 4, 4, 5, 7, 7].iter().for_each(|x| input.send(*x));
49 }
50 input.advance_to(round + 1);
51 while probe.less_than(input.time()) {
52 worker.step();
53 }
54 }
55 }).unwrap();
56}Source§impl<T: Timestamp, CB: ContainerBuilder<Container: Clone>> Handle<T, CB>
impl<T: Timestamp, CB: ContainerBuilder<Container: Clone>> Handle<T, CB>
Sourcepub fn new_with_builder() -> Self
pub fn new_with_builder() -> Self
Allocates a new input handle, from which one can create timely streams.
§Examples
use timely::*;
use timely::dataflow::InputHandle;
use timely::dataflow::operators::{Input, Inspect};
use timely_container::CapacityContainerBuilder;
// construct and execute a timely dataflow
timely::execute(Config::thread(), |worker| {
// add an input and base computation off of it
let mut input = InputHandle::<_, CapacityContainerBuilder<_>>::new_with_builder();
worker.dataflow(|scope| {
scope.input_from(&mut input)
.container::<Vec<_>>()
.inspect(|x| println!("hello {:?}", x));
});
// introduce input, advance computation
for round in 0..10 {
input.send(round);
input.advance_to(round + 1);
worker.step();
}
});Sourcepub fn to_stream<G>(&mut self, scope: &mut G) -> Stream<G, CB::Container>where
T: TotalOrder,
G: Scope<Timestamp = T>,
pub fn to_stream<G>(&mut self, scope: &mut G) -> Stream<G, CB::Container>where
T: TotalOrder,
G: Scope<Timestamp = T>,
Creates an input stream from the handle in the supplied scope.
§Examples
use timely::*;
use timely::dataflow::InputHandle;
use timely::dataflow::operators::{Input, Inspect};
// construct and execute a timely dataflow
timely::execute(Config::thread(), |worker| {
// add an input and base computation off of it
let mut input = InputHandle::new();
worker.dataflow(|scope| {
input.to_stream(scope)
.container::<Vec<_>>()
.inspect(|x| println!("hello {:?}", x));
});
// introduce input, advance computation
for round in 0..10 {
input.send(round);
input.advance_to(round + 1);
worker.step();
}
});Examples found in repository?
5fn main() {
6
7 // create a naked single-threaded worker.
8 let allocator = timely::communication::allocator::Thread::default();
9 let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator, None);
10
11 // create input and probe handles.
12 let mut input = InputHandle::new();
13 let probe = ProbeHandle::new();
14
15 // directly build a dataflow.
16 worker.dataflow(|scope| {
17 input
18 .to_stream(scope)
19 .container::<Vec<_>>()
20 .inspect(|x| println!("{:?}", x))
21 .probe_with(&probe);
22 });
23
24 // manage inputs.
25 for i in 0 .. 10 {
26 input.send(i);
27 input.advance_to(i);
28 while probe.less_than(input.time()) {
29 worker.step();
30 }
31 }
32}More examples
8fn main() {
9 // initializes and runs a timely dataflow.
10 timely::execute_from_args(std::env::args(), |worker| {
11
12 let mut input = InputHandle::new();
13 let probe = ProbeHandle::new();
14
15 // define a distribution function for strings.
16 let exchange = Exchange::new(|x: &(String, i64)| (x.0).len() as u64);
17
18 // create a new input, exchange data, and inspect its output
19 worker.dataflow::<usize,_,_>(|scope| {
20 input.to_stream(scope)
21 .flat_map(|(text, diff): (String, i64)|
22 text.split_whitespace()
23 .map(move |word| (word.to_owned(), diff))
24 .collect::<Vec<_>>()
25 )
26 .unary_frontier(exchange, "WordCount", |_capability, _info| {
27
28 let mut queues = HashMap::new();
29 let mut counts = HashMap::new();
30
31 move |(input, frontier), output| {
32 input.for_each_time(|time, data| {
33 queues.entry(time.retain(output.output_index()))
34 .or_insert(Vec::new())
35 .extend(data.map(std::mem::take));
36 });
37
38 for (key, val) in queues.iter_mut() {
39 if !frontier.less_equal(key.time()) {
40 let mut session = output.session(key);
41 for mut batch in val.drain(..) {
42 for (word, diff) in batch.drain(..) {
43 let entry = counts.entry(word.clone()).or_insert(0i64);
44 *entry += diff;
45 session.give((word, *entry));
46 }
47 }
48 }
49 }
50
51 queues.retain(|_key, val| !val.is_empty());
52 }})
53 .container::<Vec<_>>()
54 .inspect(|x| println!("seen: {:?}", x))
55 .probe_with(&probe);
56 });
57
58 // introduce data and watch!
59 for round in 0..10 {
60 input.send(("round".to_owned(), 1));
61 input.advance_to(round + 1);
62 while probe.less_than(input.time()) {
63 worker.step();
64 }
65 }
66 }).unwrap();
67}21fn main() {
22
23 type InnerContainer = <WordCount as columnar::Columnar>::Container;
24 type Container = Column<InnerContainer>;
25
26 use columnar::Len;
27
28 let config = timely::Config {
29 communication: timely::CommunicationConfig::ProcessBinary(3),
30 worker: timely::WorkerConfig::default(),
31 };
32
33 // initializes and runs a timely dataflow.
34 timely::execute(config, |worker| {
35 let mut input = <InputHandle<_, CapacityContainerBuilder<Container>>>::new();
36 let probe = ProbeHandle::new();
37
38 // create a new input, exchange data, and inspect its output
39 worker.dataflow::<usize, _, _>(|scope| {
40 input
41 .to_stream(scope)
42 .unary(
43 Pipeline,
44 "Split",
45 |_cap, _info| {
46 move |input, output| {
47 input.for_each_time(|time, data| {
48 let mut session = output.session(&time);
49 for data in data {
50 for wordcount in data.borrow().into_index_iter().flat_map(|wordcount| {
51 wordcount.text.split_whitespace().map(move |text| WordCountReference { text, diff: wordcount.diff })
52 }) {
53 session.give(wordcount);
54 }
55 }
56 });
57 }
58 },
59 )
60 .container::<Container>()
61 .unary_frontier(
62 ExchangeCore::<ColumnBuilder<InnerContainer>,_>::new_core(|x: &WordCountReference<&str,&i64>| x.text.len() as u64),
63 "WordCount",
64 |_capability, _info| {
65 let mut queues = HashMap::new();
66 let mut counts = HashMap::new();
67
68 move |(input, frontier), output| {
69 input.for_each_time(|time, data| {
70 queues
71 .entry(time.retain(output.output_index()))
72 .or_insert(Vec::new())
73 .extend(data.map(std::mem::take));
74
75 });
76
77 for (key, val) in queues.iter_mut() {
78 if !frontier.less_equal(key.time()) {
79 let mut session = output.session(key);
80 for batch in val.drain(..) {
81 for wordcount in batch.borrow().into_index_iter() {
82 let total =
83 if let Some(count) = counts.get_mut(wordcount.text) {
84 *count += wordcount.diff;
85 *count
86 }
87 else {
88 counts.insert(wordcount.text.to_string(), *wordcount.diff);
89 *wordcount.diff
90 };
91 session.give(WordCountReference { text: wordcount.text, diff: total });
92 }
93 }
94 }
95 }
96
97 queues.retain(|_key, val| !val.is_empty());
98 }
99 },
100 )
101 .container::<Container>()
102 .inspect_container(|x| {
103 match x {
104 Ok((time, data)) => {
105 println!("seen at: {:?}\t{:?} records", time, data.record_count());
106 for wc in data.borrow().into_index_iter() {
107 println!(" {}: {}", wc.text, wc.diff);
108 }
109 },
110 Err(frontier) => println!("frontier advanced to {:?}", frontier),
111 }
112 })
113 .probe_with(&probe);
114 });
115
116 // introduce data and watch!
117 for round in 0..10 {
118 input.send(WordCountReference { text: "flat container", diff: 1 });
119 input.advance_to(round + 1);
120 while probe.less_than(input.time()) {
121 worker.step();
122 }
123 }
124 })
125 .unwrap();
126}8fn main() {
9
10 timely::execute_from_args(std::env::args().skip(3), move |worker| {
11
12 let mut input = InputHandle::new();
13 let probe = ProbeHandle::new();
14
15 worker.dataflow::<usize,_,_>(|scope| {
16
17 // create a new input, into which we can push edge changes.
18 let edge_stream = input.to_stream(scope);
19
20 // create a new feedback stream, which will be changes to ranks.
21 let (handle, rank_stream) = scope.feedback(1);
22
23 // bring edges and ranks together!
24 let changes = edge_stream.binary_frontier(
25 rank_stream,
26 Exchange::new(|x: &((usize, usize), i64)| (x.0).0 as u64),
27 Exchange::new(|x: &(usize, i64)| x.0 as u64),
28 "PageRank",
29 |_capability, _info| {
30
31 // where we stash out-of-order data.
32 let mut edge_stash: HashMap<_, Vec<_>> = HashMap::new();
33 let mut rank_stash: HashMap<_, Vec<_>> = HashMap::new();
34
35 // lists of edges, ranks, and changes.
36 let mut edges = Vec::new();
37 let mut ranks = Vec::new();
38 let mut diffs = Vec::new(); // for received but un-acted upon deltas.
39 let mut delta = Vec::new();
40
41 let timer = ::std::time::Instant::now();
42
43 move |(input1, frontier1), (input2, frontier2), output| {
44
45 // hold on to edge changes until it is time.
46 input1.for_each_time(|time, data| {
47 let entry = edge_stash.entry(time.retain(output.output_index())).or_default();
48 data.for_each(|data| entry.append(data));
49 });
50
51 // hold on to rank changes until it is time.
52 input2.for_each_time(|time, data| {
53 let entry = rank_stash.entry(time.retain(output.output_index())).or_default();
54 data.for_each(|data| entry.append(data));
55 });
56
57 let frontiers = &[frontier1, frontier2];
58
59 for (time, edge_changes) in edge_stash.iter_mut() {
60 if frontiers.iter().all(|f| !f.less_equal(time)) {
61
62 let mut session = output.session(time);
63
64 compact(edge_changes);
65
66 for ((src, dst), diff) in edge_changes.drain(..) {
67
68 // 0. ensure enough state allocated
69 while edges.len() <= src { edges.push(Vec::new()); }
70 while ranks.len() <= src { ranks.push(1_000); }
71 while diffs.len() <= src { diffs.push(0); }
72
73 // 1. subtract previous distribution.
74 allocate(ranks[src], &edges[src][..], &mut delta);
75 for x in delta.iter_mut() { x.1 *= -1; }
76
77 // 2. update edges.
78 edges[src].push((dst, diff));
79 compact(&mut edges[src]);
80
81 // 3. re-distribute allocations.
82 allocate(ranks[src], &edges[src][..], &mut delta);
83
84 // 4. compact down and send cumulative changes.
85 compact(&mut delta);
86 for (dst, diff) in delta.drain(..) {
87 session.give((dst, diff));
88 }
89 }
90 }
91 }
92
93 edge_stash.retain(|_key, val| !val.is_empty());
94
95 for (time, rank_changes) in rank_stash.iter_mut() {
96 if frontiers.iter().all(|f| !f.less_equal(time)) {
97
98 let mut session = output.session(time);
99
100 compact(rank_changes);
101
102 let mut cnt = 0;
103 let mut sum = 0;
104 let mut max = 0;
105
106 for (src, diff) in rank_changes.drain(..) {
107
108 cnt += 1;
109 sum += diff.abs();
110 max = if max < diff.abs() { diff.abs() } else { max };
111
112 // 0. ensure enough state allocated
113 while edges.len() <= src { edges.push(Vec::new()); }
114 while ranks.len() <= src { ranks.push(1_000); }
115 while diffs.len() <= src { diffs.push(0); }
116
117 // 1. subtract previous distribution.
118 allocate(ranks[src], &edges[src][..], &mut delta);
119 for x in delta.iter_mut() { x.1 *= -1; }
120
121 // 2. update ranks.
122 diffs[src] += diff;
123 if diffs[src].abs() >= 6 {
124 ranks[src] += diffs[src];
125 diffs[src] = 0;
126 }
127
128 // 3. re-distribute allocations.
129 allocate(ranks[src], &edges[src][..], &mut delta);
130
131 // 4. compact down and send cumulative changes.
132 compact(&mut delta);
133 for (dst, diff) in delta.drain(..) {
134 session.give((dst, diff));
135 }
136 }
137
138 println!("{:?}:\t{:?}\t{}\t{}\t{}", timer.elapsed(), time.time(), cnt, sum, max);
139 }
140 }
141
142 rank_stash.retain(|_key, val| !val.is_empty());
143
144 }
145 }
146 );
147
148 changes
149 .probe_with(&probe)
150 .connect_loop(handle);
151
152 });
153
154 let nodes: usize = std::env::args().nth(1).unwrap().parse().unwrap();
155 let edges: usize = std::env::args().nth(2).unwrap().parse().unwrap();
156
157 let index = worker.index();
158 // Generate roughly random data.
159 use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
160 let hasher = BuildHasherDefault::<DefaultHasher>::new();
161 let mut insert = (0..).map(move |i| (hasher.hash_one(&(i,index,0)) as usize % nodes,
162 hasher.hash_one(&(i,index,1)) as usize % nodes));
163 let remove = insert.clone();
164
165 for ins in (&mut insert).take(edges / worker.peers()) {
166 input.send((ins, 1));
167 }
168
169 input.advance_to(1);
170
171 while probe.less_than(input.time()) {
172 worker.step();
173 }
174
175 for (ins, del) in insert.zip(remove).take(1000) {
176 input.send((ins, 1));
177 input.send((del,-1));
178 input.advance_to(input.time() + 1);
179 while probe.less_than(input.time()) {
180 worker.step();
181 }
182 }
183
184 }).unwrap(); // asserts error-free execution;
185}Sourcepub fn send_batch(&mut self, buffer: &mut CB::Container)
pub fn send_batch(&mut self, buffer: &mut CB::Container)
Sends a batch of records into the corresponding timely dataflow Stream, at the current epoch.
This method flushes single elements previously sent with send, to keep the insertion order.
§Examples
use timely::*;
use timely::dataflow::InputHandle;
use timely::dataflow::operators::{Input, InspectCore};
// construct and execute a timely dataflow
timely::execute(Config::thread(), |worker| {
// add an input and base computation off of it
let mut input = InputHandle::new();
worker.dataflow(|scope| {
scope.input_from(&mut input)
.inspect_container(|x| println!("hello {:?}", x));
});
// introduce input, advance computation
for round in 0..10 {
input.send_batch(&mut vec![format!("{}", round)]);
input.advance_to(round + 1);
worker.step();
}
});Sourcepub fn advance_to(&mut self, next: T)
pub fn advance_to(&mut self, next: T)
Advances the current epoch to next.
This method allows timely dataflow to issue progress notifications as it can now determine that this input can no longer produce data at earlier timestamps.
Examples found in repository?
5fn main() {
6
7 // create a naked single-threaded worker.
8 let allocator = timely::communication::allocator::Thread::default();
9 let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator, None);
10
11 // create input and probe handles.
12 let mut input = InputHandle::new();
13 let probe = ProbeHandle::new();
14
15 // directly build a dataflow.
16 worker.dataflow(|scope| {
17 input
18 .to_stream(scope)
19 .container::<Vec<_>>()
20 .inspect(|x| println!("{:?}", x))
21 .probe_with(&probe);
22 });
23
24 // manage inputs.
25 for i in 0 .. 10 {
26 input.send(i);
27 input.advance_to(i);
28 while probe.less_than(input.time()) {
29 worker.step();
30 }
31 }
32}More examples
10fn main() {
11 // initializes and runs a timely dataflow.
12 timely::execute_from_args(std::env::args(), |worker| {
13 // create a new input, exchange data, and inspect its output
14 let index = worker.index();
15 let mut input = InputHandle::new();
16 let probe = ProbeHandle::new();
17 worker.dataflow(|scope| {
18 scope.input_from(&mut input)
19 .container::<Vec<_>>()
20 //.exchange(|x| *x) // <-- cannot exchange this; Rc is not Send.
21 .inspect(move |x| println!("worker {}:\thello {:?}", index, x))
22 .probe_with(&probe);
23 });
24
25 // introduce data and watch!
26 for round in 0..10 {
27 input.send(Test { _field: Rc::new(round) } );
28 input.advance_to(round + 1);
29 worker.step_while(|| probe.less_than(input.time()));
30 }
31 }).unwrap();
32}4fn main() {
5 // initializes and runs a timely dataflow.
6 timely::execute_from_args(std::env::args(), |worker| {
7
8 let index = worker.index();
9 let mut input = InputHandle::new();
10 let probe = ProbeHandle::new();
11
12 // create a new input, exchange data, and inspect its output
13 worker.dataflow(|scope| {
14 scope.input_from(&mut input)
15 .container::<Vec<_>>()
16 .exchange(|x| *x)
17 .inspect(move |x| println!("worker {}:\thello {}", index, x))
18 .probe_with(&probe);
19 });
20
21 // introduce data and watch!
22 for round in 0..10 {
23 if index == 0 {
24 input.send(round);
25 }
26 input.advance_to(round + 1);
27 while probe.less_than(input.time()) {
28 worker.step();
29 }
30 }
31 }).unwrap();
32}4fn main() {
5 // initializes and runs a timely dataflow.
6 timely::execute_from_args(std::env::args(), |worker| {
7
8 let batch = std::env::args().nth(1).unwrap().parse::<usize>().unwrap();
9 let rounds = std::env::args().nth(2).unwrap().parse::<usize>().unwrap();
10 let mut input = InputHandle::new();
11
12 // create a new input, exchange data, and inspect its output
13 let probe = worker.dataflow(|scope|
14 scope
15 .input_from(&mut input)
16 .container::<Vec<_>>()
17 .exchange(|&x| x as u64)
18 .probe()
19 .0
20 );
21
22
23 let timer = std::time::Instant::now();
24
25 for round in 0 .. rounds {
26
27 for i in 0 .. batch {
28 input.send(i);
29 }
30 input.advance_to(round);
31
32 while probe.less_than(input.time()) {
33 worker.step();
34 }
35
36 }
37
38 let volume = (rounds * batch) as f64;
39 let elapsed = timer.elapsed();
40 let seconds = elapsed.as_secs() as f64 + (f64::from(elapsed.subsec_nanos())/1000000000.0);
41
42 println!("{:?}\tworker {} complete; rate: {:?}", timer.elapsed(), worker.index(), volume / seconds);
43
44 }).unwrap();
45}4fn main() {
5 // initializes and runs a timely dataflow.
6 timely::execute_from_args(std::env::args(), |worker| {
7
8 let timer = std::time::Instant::now();
9
10 let mut args = std::env::args();
11 args.next();
12
13 let dataflows = args.next().unwrap().parse::<usize>().unwrap();
14 let length = args.next().unwrap().parse::<usize>().unwrap();
15 let record = args.next() == Some("record".to_string());
16
17 let mut inputs = Vec::new();
18 let mut probes = Vec::new();
19
20 // create a new input, exchange data, and inspect its output
21 for _dataflow in 0 .. dataflows {
22 worker.dataflow(|scope| {
23 let (input, mut stream) = scope.new_input();
24 for _step in 0 .. length {
25 stream = stream.map(|x: ()| x);
26 }
27 let (probe, _stream) = stream.probe();
28 inputs.push(input);
29 probes.push(probe);
30 });
31 }
32
33 println!("{:?}\tdataflows built ({} x {})", timer.elapsed(), dataflows, length);
34
35 for round in 0 .. {
36 let dataflow = round % dataflows;
37 if record {
38 inputs[dataflow].send(());
39 }
40 inputs[dataflow].advance_to(round);
41 let mut steps = 0;
42 while probes[dataflow].less_than(&round) {
43 worker.step();
44 steps += 1;
45 }
46 println!("{:?}\tround {} complete in {} steps", timer.elapsed(), round, steps);
47 }
48
49 }).unwrap();
50}8fn main() {
9
10 // command-line args: numbers of nodes and edges in the random graph.
11 let nodes: usize = std::env::args().nth(1).unwrap().parse().unwrap();
12 let edges: usize = std::env::args().nth(2).unwrap().parse().unwrap();
13 let batch: usize = std::env::args().nth(3).unwrap().parse().unwrap();
14
15 timely::execute_from_args(std::env::args().skip(4), move |worker| {
16
17 let index = worker.index();
18 let peers = worker.peers();
19
20 let mut input = InputHandle::new();
21 let probe = ProbeHandle::new();
22
23 worker.dataflow(|scope| {
24 scope.input_from(&mut input)
25 // .exchange(move |x: &(usize, usize)| (x.0 % (peers - 1)) as u64 + 1)
26 .union_find()
27 .exchange(|_| 0)
28 .union_find()
29 .probe_with(&probe);
30 });
31
32 // Generate roughly random data.
33 use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
34 let hasher = BuildHasherDefault::<DefaultHasher>::new();
35 let insert = (0..).map(move |i| (hasher.hash_one(&(i,index,0)) as usize % nodes,
36 hasher.hash_one(&(i,index,1)) as usize % nodes));
37
38 for (edge, arc) in insert.take(edges / peers).enumerate() {
39 input.send(arc);
40 if edge % batch == (batch - 1) {
41 let next = input.epoch() + 1;
42 input.advance_to(next);
43 while probe.less_than(input.time()) {
44 worker.step();
45 }
46 }
47 }
48
49 }).unwrap(); // asserts error-free execution;
50}Sourcepub fn close(self)
pub fn close(self)
Closes the input.
This method allows timely dataflow to issue all progress notifications blocked by this input and to begin to shut down operators, as this input can no longer produce data.
Sourcepub fn epoch(&self) -> &T
pub fn epoch(&self) -> &T
Reports the current epoch.
Examples found in repository?
8fn main() {
9
10 // command-line args: numbers of nodes and edges in the random graph.
11 let nodes: usize = std::env::args().nth(1).unwrap().parse().unwrap();
12 let edges: usize = std::env::args().nth(2).unwrap().parse().unwrap();
13 let batch: usize = std::env::args().nth(3).unwrap().parse().unwrap();
14
15 timely::execute_from_args(std::env::args().skip(4), move |worker| {
16
17 let index = worker.index();
18 let peers = worker.peers();
19
20 let mut input = InputHandle::new();
21 let probe = ProbeHandle::new();
22
23 worker.dataflow(|scope| {
24 scope.input_from(&mut input)
25 // .exchange(move |x: &(usize, usize)| (x.0 % (peers - 1)) as u64 + 1)
26 .union_find()
27 .exchange(|_| 0)
28 .union_find()
29 .probe_with(&probe);
30 });
31
32 // Generate roughly random data.
33 use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
34 let hasher = BuildHasherDefault::<DefaultHasher>::new();
35 let insert = (0..).map(move |i| (hasher.hash_one(&(i,index,0)) as usize % nodes,
36 hasher.hash_one(&(i,index,1)) as usize % nodes));
37
38 for (edge, arc) in insert.take(edges / peers).enumerate() {
39 input.send(arc);
40 if edge % batch == (batch - 1) {
41 let next = input.epoch() + 1;
42 input.advance_to(next);
43 while probe.less_than(input.time()) {
44 worker.step();
45 }
46 }
47 }
48
49 }).unwrap(); // asserts error-free execution;
50}More examples
8fn main() {
9
10 // command-line args: numbers of nodes and edges in the random graph.
11 let keys: u64 = std::env::args().nth(1).unwrap().parse().unwrap();
12 let vals: usize = std::env::args().nth(2).unwrap().parse().unwrap();
13 let batch: usize = std::env::args().nth(3).unwrap().parse().unwrap();
14
15 timely::execute_from_args(std::env::args().skip(4), move |worker| {
16
17 let index = worker.index();
18 let peers = worker.peers();
19
20 let mut input1 = InputHandle::new();
21 let mut input2 = InputHandle::new();
22 let probe = ProbeHandle::new();
23
24 worker.dataflow(|scope| {
25
26 let stream1 = scope.input_from(&mut input1);
27 let stream2 = scope.input_from(&mut input2);
28
29 let exchange1 = Exchange::new(|x: &(u64, u64)| x.0);
30 let exchange2 = Exchange::new(|x: &(u64, u64)| x.0);
31
32 stream1
33 .binary(stream2, exchange1, exchange2, "HashJoin", |_capability, _info| {
34
35 let mut map1 = HashMap::<u64, Vec<u64>>::new();
36 let mut map2 = HashMap::<u64, Vec<u64>>::new();
37
38 move |input1, input2, output| {
39
40 // Drain first input, check second map, update first map.
41 input1.for_each_time(|time, data| {
42 let mut session = output.session(&time);
43 for (key, val1) in data.flat_map(|d| d.drain(..)) {
44 if let Some(values) = map2.get(&key) {
45 for val2 in values.iter() {
46 session.give((val1, *val2));
47 }
48 }
49
50 map1.entry(key).or_default().push(val1);
51 }
52 });
53
54 // Drain second input, check first map, update second map.
55 input2.for_each_time(|time, data| {
56 let mut session = output.session(&time);
57 for (key, val2) in data.flat_map(|d| d.drain(..)) {
58 if let Some(values) = map1.get(&key) {
59 for val1 in values.iter() {
60 session.give((*val1, val2));
61 }
62 }
63
64 map2.entry(key).or_default().push(val2);
65 }
66 });
67 }
68 })
69 .container::<Vec<_>>()
70 .probe_with(&probe);
71 });
72
73 // Generate roughly random data.
74 use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
75 let hasher = BuildHasherDefault::<DefaultHasher>::new();
76 let mut insert = (0..).map(move |i| (hasher.hash_one(&(i,index,0)) % keys,
77 hasher.hash_one(&(i,index,1)) % keys,
78 hasher.hash_one(&(i,index,2)) % keys,
79 hasher.hash_one(&(i,index,3)) % keys));
80
81 let timer = std::time::Instant::now();
82
83 let mut sent = 0;
84 while sent < (vals / peers) {
85
86 // Send some amount of data, no more than `batch`.
87 let to_send = std::cmp::min(batch, vals/peers - sent);
88 for (src0, dst0, src1, dst1) in (&mut insert).take(to_send) {
89 input1.send((src0, dst0));
90 input2.send((src1, dst1));
91 }
92 sent += to_send;
93
94 // Advance input, iterate until data cleared.
95 let next = input1.epoch() + 1;
96 input1.advance_to(next);
97 input2.advance_to(next);
98 while probe.less_than(input1.time()) {
99 worker.step();
100 }
101
102 println!("{:?}\tworker {} batch complete", timer.elapsed(), index)
103 }
104
105 }).unwrap(); // asserts error-free execution;
106}Sourcepub fn time(&self) -> &T
pub fn time(&self) -> &T
Reports the current timestamp.
Examples found in repository?
5fn main() {
6
7 // create a naked single-threaded worker.
8 let allocator = timely::communication::allocator::Thread::default();
9 let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator, None);
10
11 // create input and probe handles.
12 let mut input = InputHandle::new();
13 let probe = ProbeHandle::new();
14
15 // directly build a dataflow.
16 worker.dataflow(|scope| {
17 input
18 .to_stream(scope)
19 .container::<Vec<_>>()
20 .inspect(|x| println!("{:?}", x))
21 .probe_with(&probe);
22 });
23
24 // manage inputs.
25 for i in 0 .. 10 {
26 input.send(i);
27 input.advance_to(i);
28 while probe.less_than(input.time()) {
29 worker.step();
30 }
31 }
32}More examples
10fn main() {
11 // initializes and runs a timely dataflow.
12 timely::execute_from_args(std::env::args(), |worker| {
13 // create a new input, exchange data, and inspect its output
14 let index = worker.index();
15 let mut input = InputHandle::new();
16 let probe = ProbeHandle::new();
17 worker.dataflow(|scope| {
18 scope.input_from(&mut input)
19 .container::<Vec<_>>()
20 //.exchange(|x| *x) // <-- cannot exchange this; Rc is not Send.
21 .inspect(move |x| println!("worker {}:\thello {:?}", index, x))
22 .probe_with(&probe);
23 });
24
25 // introduce data and watch!
26 for round in 0..10 {
27 input.send(Test { _field: Rc::new(round) } );
28 input.advance_to(round + 1);
29 worker.step_while(|| probe.less_than(input.time()));
30 }
31 }).unwrap();
32}4fn main() {
5 // initializes and runs a timely dataflow.
6 timely::execute_from_args(std::env::args(), |worker| {
7
8 let index = worker.index();
9 let mut input = InputHandle::new();
10 let probe = ProbeHandle::new();
11
12 // create a new input, exchange data, and inspect its output
13 worker.dataflow(|scope| {
14 scope.input_from(&mut input)
15 .container::<Vec<_>>()
16 .exchange(|x| *x)
17 .inspect(move |x| println!("worker {}:\thello {}", index, x))
18 .probe_with(&probe);
19 });
20
21 // introduce data and watch!
22 for round in 0..10 {
23 if index == 0 {
24 input.send(round);
25 }
26 input.advance_to(round + 1);
27 while probe.less_than(input.time()) {
28 worker.step();
29 }
30 }
31 }).unwrap();
32}4fn main() {
5 // initializes and runs a timely dataflow.
6 timely::execute_from_args(std::env::args(), |worker| {
7
8 let batch = std::env::args().nth(1).unwrap().parse::<usize>().unwrap();
9 let rounds = std::env::args().nth(2).unwrap().parse::<usize>().unwrap();
10 let mut input = InputHandle::new();
11
12 // create a new input, exchange data, and inspect its output
13 let probe = worker.dataflow(|scope|
14 scope
15 .input_from(&mut input)
16 .container::<Vec<_>>()
17 .exchange(|&x| x as u64)
18 .probe()
19 .0
20 );
21
22
23 let timer = std::time::Instant::now();
24
25 for round in 0 .. rounds {
26
27 for i in 0 .. batch {
28 input.send(i);
29 }
30 input.advance_to(round);
31
32 while probe.less_than(input.time()) {
33 worker.step();
34 }
35
36 }
37
38 let volume = (rounds * batch) as f64;
39 let elapsed = timer.elapsed();
40 let seconds = elapsed.as_secs() as f64 + (f64::from(elapsed.subsec_nanos())/1000000000.0);
41
42 println!("{:?}\tworker {} complete; rate: {:?}", timer.elapsed(), worker.index(), volume / seconds);
43
44 }).unwrap();
45}8fn main() {
9
10 // command-line args: numbers of nodes and edges in the random graph.
11 let nodes: usize = std::env::args().nth(1).unwrap().parse().unwrap();
12 let edges: usize = std::env::args().nth(2).unwrap().parse().unwrap();
13 let batch: usize = std::env::args().nth(3).unwrap().parse().unwrap();
14
15 timely::execute_from_args(std::env::args().skip(4), move |worker| {
16
17 let index = worker.index();
18 let peers = worker.peers();
19
20 let mut input = InputHandle::new();
21 let probe = ProbeHandle::new();
22
23 worker.dataflow(|scope| {
24 scope.input_from(&mut input)
25 // .exchange(move |x: &(usize, usize)| (x.0 % (peers - 1)) as u64 + 1)
26 .union_find()
27 .exchange(|_| 0)
28 .union_find()
29 .probe_with(&probe);
30 });
31
32 // Generate roughly random data.
33 use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
34 let hasher = BuildHasherDefault::<DefaultHasher>::new();
35 let insert = (0..).map(move |i| (hasher.hash_one(&(i,index,0)) as usize % nodes,
36 hasher.hash_one(&(i,index,1)) as usize % nodes));
37
38 for (edge, arc) in insert.take(edges / peers).enumerate() {
39 input.send(arc);
40 if edge % batch == (batch - 1) {
41 let next = input.epoch() + 1;
42 input.advance_to(next);
43 while probe.less_than(input.time()) {
44 worker.step();
45 }
46 }
47 }
48
49 }).unwrap(); // asserts error-free execution;
50}8fn main() {
9 // initializes and runs a timely dataflow.
10 timely::execute_from_args(std::env::args(), |worker| {
11 let index = worker.index();
12 let mut input = InputHandle::new();
13 let probe = ProbeHandle::new();
14
15 // create a new input, exchange data, and inspect its output
16 worker.dataflow::<usize,_,_>(|scope| {
17 let mut counts_by_time = HashMap::new();
18 scope.input_from(&mut input)
19 .unary(Exchange::new(|x| *x), "Distinct", move |_, _|
20 move |input, output| {
21 input.for_each_time(|time, data| {
22 let counts =
23 counts_by_time
24 .entry(*time.time())
25 .or_insert(HashMap::new());
26 let mut session = output.session(&time);
27 for data in data {
28 for &datum in data.iter() {
29 let count = counts.entry(datum).or_insert(0);
30 if *count == 0 {
31 session.give(datum);
32 }
33 *count += 1;
34 }
35 }
36 })
37 })
38 .container::<Vec<_>>()
39 .inspect(move |x| println!("worker {}:\tvalue {}", index, x))
40 .probe_with(&probe);
41 });
42
43 // introduce data and watch!
44 for round in 0..1 {
45 if index == 0 {
46 [0, 1, 2, 2, 2, 3, 3, 4].iter().for_each(|x| input.send(*x));
47 } else if index == 1 {
48 [0, 0, 3, 4, 4, 5, 7, 7].iter().for_each(|x| input.send(*x));
49 }
50 input.advance_to(round + 1);
51 while probe.less_than(input.time()) {
52 worker.step();
53 }
54 }
55 }).unwrap();
56}Source§impl<T: Timestamp, CB: ContainerBuilder<Container: Clone>> Handle<T, CB>
impl<T: Timestamp, CB: ContainerBuilder<Container: Clone>> Handle<T, CB>
Sourcepub fn send<D>(&mut self, data: D)where
CB: PushInto<D>,
pub fn send<D>(&mut self, data: D)where
CB: PushInto<D>,
Sends one record into the corresponding timely dataflow Stream, at the current epoch.
§Examples
use timely::*;
use timely::dataflow::InputHandle;
use timely::dataflow::operators::{Input, Inspect};
// construct and execute a timely dataflow
timely::execute(Config::thread(), |worker| {
// add an input and base computation off of it
let mut input = InputHandle::new();
worker.dataflow(|scope| {
scope.input_from(&mut input)
.container::<Vec<_>>()
.inspect(|x| println!("hello {:?}", x));
});
// introduce input, advance computation
for round in 0..10 {
input.send(round);
input.advance_to(round + 1);
worker.step();
}
});Examples found in repository?
5fn main() {
6
7 // create a naked single-threaded worker.
8 let allocator = timely::communication::allocator::Thread::default();
9 let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator, None);
10
11 // create input and probe handles.
12 let mut input = InputHandle::new();
13 let probe = ProbeHandle::new();
14
15 // directly build a dataflow.
16 worker.dataflow(|scope| {
17 input
18 .to_stream(scope)
19 .container::<Vec<_>>()
20 .inspect(|x| println!("{:?}", x))
21 .probe_with(&probe);
22 });
23
24 // manage inputs.
25 for i in 0 .. 10 {
26 input.send(i);
27 input.advance_to(i);
28 while probe.less_than(input.time()) {
29 worker.step();
30 }
31 }
32}More examples
10fn main() {
11 // initializes and runs a timely dataflow.
12 timely::execute_from_args(std::env::args(), |worker| {
13 // create a new input, exchange data, and inspect its output
14 let index = worker.index();
15 let mut input = InputHandle::new();
16 let probe = ProbeHandle::new();
17 worker.dataflow(|scope| {
18 scope.input_from(&mut input)
19 .container::<Vec<_>>()
20 //.exchange(|x| *x) // <-- cannot exchange this; Rc is not Send.
21 .inspect(move |x| println!("worker {}:\thello {:?}", index, x))
22 .probe_with(&probe);
23 });
24
25 // introduce data and watch!
26 for round in 0..10 {
27 input.send(Test { _field: Rc::new(round) } );
28 input.advance_to(round + 1);
29 worker.step_while(|| probe.less_than(input.time()));
30 }
31 }).unwrap();
32}4fn main() {
5 // initializes and runs a timely dataflow.
6 timely::execute_from_args(std::env::args(), |worker| {
7
8 let index = worker.index();
9 let mut input = InputHandle::new();
10 let probe = ProbeHandle::new();
11
12 // create a new input, exchange data, and inspect its output
13 worker.dataflow(|scope| {
14 scope.input_from(&mut input)
15 .container::<Vec<_>>()
16 .exchange(|x| *x)
17 .inspect(move |x| println!("worker {}:\thello {}", index, x))
18 .probe_with(&probe);
19 });
20
21 // introduce data and watch!
22 for round in 0..10 {
23 if index == 0 {
24 input.send(round);
25 }
26 input.advance_to(round + 1);
27 while probe.less_than(input.time()) {
28 worker.step();
29 }
30 }
31 }).unwrap();
32}4fn main() {
5 // initializes and runs a timely dataflow.
6 timely::execute_from_args(std::env::args(), |worker| {
7
8 let batch = std::env::args().nth(1).unwrap().parse::<usize>().unwrap();
9 let rounds = std::env::args().nth(2).unwrap().parse::<usize>().unwrap();
10 let mut input = InputHandle::new();
11
12 // create a new input, exchange data, and inspect its output
13 let probe = worker.dataflow(|scope|
14 scope
15 .input_from(&mut input)
16 .container::<Vec<_>>()
17 .exchange(|&x| x as u64)
18 .probe()
19 .0
20 );
21
22
23 let timer = std::time::Instant::now();
24
25 for round in 0 .. rounds {
26
27 for i in 0 .. batch {
28 input.send(i);
29 }
30 input.advance_to(round);
31
32 while probe.less_than(input.time()) {
33 worker.step();
34 }
35
36 }
37
38 let volume = (rounds * batch) as f64;
39 let elapsed = timer.elapsed();
40 let seconds = elapsed.as_secs() as f64 + (f64::from(elapsed.subsec_nanos())/1000000000.0);
41
42 println!("{:?}\tworker {} complete; rate: {:?}", timer.elapsed(), worker.index(), volume / seconds);
43
44 }).unwrap();
45}4fn main() {
5 // initializes and runs a timely dataflow.
6 timely::execute_from_args(std::env::args(), |worker| {
7
8 let timer = std::time::Instant::now();
9
10 let mut args = std::env::args();
11 args.next();
12
13 let dataflows = args.next().unwrap().parse::<usize>().unwrap();
14 let length = args.next().unwrap().parse::<usize>().unwrap();
15 let record = args.next() == Some("record".to_string());
16
17 let mut inputs = Vec::new();
18 let mut probes = Vec::new();
19
20 // create a new input, exchange data, and inspect its output
21 for _dataflow in 0 .. dataflows {
22 worker.dataflow(|scope| {
23 let (input, mut stream) = scope.new_input();
24 for _step in 0 .. length {
25 stream = stream.map(|x: ()| x);
26 }
27 let (probe, _stream) = stream.probe();
28 inputs.push(input);
29 probes.push(probe);
30 });
31 }
32
33 println!("{:?}\tdataflows built ({} x {})", timer.elapsed(), dataflows, length);
34
35 for round in 0 .. {
36 let dataflow = round % dataflows;
37 if record {
38 inputs[dataflow].send(());
39 }
40 inputs[dataflow].advance_to(round);
41 let mut steps = 0;
42 while probes[dataflow].less_than(&round) {
43 worker.step();
44 steps += 1;
45 }
46 println!("{:?}\tround {} complete in {} steps", timer.elapsed(), round, steps);
47 }
48
49 }).unwrap();
50}8fn main() {
9
10 // command-line args: numbers of nodes and edges in the random graph.
11 let nodes: usize = std::env::args().nth(1).unwrap().parse().unwrap();
12 let edges: usize = std::env::args().nth(2).unwrap().parse().unwrap();
13 let batch: usize = std::env::args().nth(3).unwrap().parse().unwrap();
14
15 timely::execute_from_args(std::env::args().skip(4), move |worker| {
16
17 let index = worker.index();
18 let peers = worker.peers();
19
20 let mut input = InputHandle::new();
21 let probe = ProbeHandle::new();
22
23 worker.dataflow(|scope| {
24 scope.input_from(&mut input)
25 // .exchange(move |x: &(usize, usize)| (x.0 % (peers - 1)) as u64 + 1)
26 .union_find()
27 .exchange(|_| 0)
28 .union_find()
29 .probe_with(&probe);
30 });
31
32 // Generate roughly random data.
33 use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
34 let hasher = BuildHasherDefault::<DefaultHasher>::new();
35 let insert = (0..).map(move |i| (hasher.hash_one(&(i,index,0)) as usize % nodes,
36 hasher.hash_one(&(i,index,1)) as usize % nodes));
37
38 for (edge, arc) in insert.take(edges / peers).enumerate() {
39 input.send(arc);
40 if edge % batch == (batch - 1) {
41 let next = input.epoch() + 1;
42 input.advance_to(next);
43 while probe.less_than(input.time()) {
44 worker.step();
45 }
46 }
47 }
48
49 }).unwrap(); // asserts error-free execution;
50}Trait Implementations§
Source§impl<T: Debug + Timestamp, CB: Debug + ContainerBuilder<Container: Clone>> Debug for Handle<T, CB>
impl<T: Debug + Timestamp, CB: Debug + ContainerBuilder<Container: Clone>> Debug for Handle<T, CB>
Auto Trait Implementations§
impl<T, CB> Freeze for Handle<T, CB>
impl<T, CB> !RefUnwindSafe for Handle<T, CB>
impl<T, CB> !Send for Handle<T, CB>
impl<T, CB> !Sync for Handle<T, CB>
impl<T, CB> Unpin for Handle<T, CB>
impl<T, CB> UnsafeUnpin for Handle<T, CB>
impl<T, CB> !UnwindSafe for Handle<T, CB>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more