Skip to main content

InputCapability

Struct InputCapability 

Source
pub struct InputCapability<T: Timestamp> { /* private fields */ }
Expand description

An capability of an input port.

Holding onto this capability will implicitly hold on to a capability for all the outputs ports this input is connected to, after the connection summaries have been applied.

This input capability supplies a retain_for_output(self) method which consumes the input capability and turns it into a Capability for a specific output port.

Implementations§

Source§

impl<T: Timestamp> InputCapability<T>

Source

pub fn time(&self) -> &T

The timestamp associated with this capability.

Examples found in repository?
examples/distinct.rs (line 24)
8fn main() {
9    // initializes and runs a timely dataflow.
10    timely::execute_from_args(std::env::args(), |worker| {
11        let index = worker.index();
12        let mut input = InputHandle::new();
13        let probe = ProbeHandle::new();
14
15        // create a new input, exchange data, and inspect its output
16        worker.dataflow::<usize,_,_>(|scope| {
17            let mut counts_by_time = HashMap::new();
18            scope.input_from(&mut input)
19                .unary(Exchange::new(|x| *x), "Distinct", move |_, _|
20                    move |input, output| {
21                        input.for_each_time(|time, data| {
22                            let counts =
23                            counts_by_time
24                                .entry(*time.time())
25                                .or_insert(HashMap::new());
26                            let mut session = output.session(&time);
27                            for data in data {
28                                for &datum in data.iter() {
29                                    let count = counts.entry(datum).or_insert(0);
30                                    if *count == 0 {
31                                        session.give(datum);
32                                    }
33                                    *count += 1;
34                                }
35                            }
36                        })
37                    })
38                .container::<Vec<_>>()
39                .inspect(move |x| println!("worker {}:\tvalue {}", index, x))
40                .probe_with(&probe);
41        });
42
43        // introduce data and watch!
44        for round in 0..1 {
45            if index == 0 {
46                [0, 1, 2, 2, 2, 3, 3, 4].iter().for_each(|x| input.send(*x));
47            } else if index == 1 {
48                [0, 0, 3, 4, 4, 5, 7, 7].iter().for_each(|x| input.send(*x));
49            }
50            input.advance_to(round + 1);
51            while probe.less_than(input.time()) {
52                worker.step();
53            }
54        }
55    }).unwrap();
56}
More examples
Hide additional examples
examples/bfs.rs (line 65)
7fn main() {
8
9    // command-line args: numbers of nodes and edges in the random graph.
10    let nodes: usize = std::env::args().nth(1).unwrap().parse().unwrap();
11    let edges: usize = std::env::args().nth(2).unwrap().parse().unwrap();
12
13    // let logging = ::timely::logging::to_tcp_socket();
14    timely::execute_from_args(std::env::args().skip(3), move |worker| {
15
16        let index = worker.index();
17        let peers = worker.peers();
18
19        // pending edges and node updates.
20        let mut edge_list = Vec::new();
21        let mut node_lists = HashMap::new();
22
23        // graph data; offsets into targets.
24        let mut offsets = Vec::new();
25        let mut targets = Vec::new();
26
27        // holds the bfs parent of each node, or u32::MAX if unset.
28        let mut done = vec![u32::MAX; 1 + (nodes / peers)];
29
30        let start = std::time::Instant::now();
31
32        worker.dataflow::<usize,_,_>(move |scope| {
33
34            // generate part of a random graph.
35            use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
36            let hasher = BuildHasherDefault::<DefaultHasher>::new();
37            let graph =
38            (0..edges/peers)
39                .map(move |i| (hasher.hash_one(&(i,index,0)) as usize % nodes,
40                               hasher.hash_one(&(i,index,1)) as usize % nodes))
41                .map(|(src,dst)| (src as u32, dst as u32))
42                .to_stream(scope)
43                .container::<Vec<_>>();
44
45            // define a loop variable, for the (node, worker) pairs.
46            let (handle, stream) = scope.feedback(1usize);
47
48            // use the stream of edges
49            graph.binary_notify(
50                stream,
51                Exchange::new(|x: &(u32, u32)| u64::from(x.0)),
52                Exchange::new(|x: &(u32, u32)| u64::from(x.0)),
53                "BFS",
54                vec![],
55                move |input1, input2, output, notify| {
56
57                    // receive edges, start to sort them
58                    input1.for_each_time(|time, data| {
59                        notify.notify_at(time.retain(output.output_index()));
60                        edge_list.extend(data.map(std::mem::take));
61                    });
62
63                    // receive (node, worker) pairs, note any new ones.
64                    input2.for_each_time(|time, data| {
65                        node_lists.entry(*time.time())
66                                  .or_insert_with(|| {
67                                      notify.notify_at(time.retain(output.output_index()));
68                                      Vec::new()
69                                  })
70                                  .extend(data.map(std::mem::take));
71                    });
72
73                    notify.for_each(|time, _num, _notify| {
74
75                        // maybe process the graph
76                        if *time == 0 {
77
78                            // print some diagnostic timing information
79                            if index == 0 { println!("{:?}:\tsorting", start.elapsed()); }
80
81                            // sort the edges (previously: radix sorted).
82                            edge_list.sort();
83
84                            let mut count = 0;
85                            for buffer in &edge_list { count += buffer.len(); }
86
87                            // allocate sufficient memory, to avoid resizing.
88                            offsets = Vec::with_capacity(1 + (nodes / peers));
89                            targets = Vec::with_capacity(count);
90
91                            // construct the graph
92                            offsets.push(0);
93                            let mut prev_node = 0;
94                            for buffer in edge_list.drain(..) {
95                                for (node, edge) in buffer {
96                                    let temp = node / peers as u32;
97                                    while prev_node < temp {
98                                        prev_node += 1;
99                                        offsets.push(targets.len() as u32)
100                                    }
101                                    targets.push(edge);
102                                }
103                            }
104                            while offsets.len() < offsets.capacity() {
105                                offsets.push(targets.len() as u32);
106                            }
107                        }
108
109                        // print some diagnostic timing information
110                        if index == 0 { println!("{:?}:\ttime: {:?}", start.elapsed(), time.time()); }
111
112                        if let Some(mut todo) = node_lists.remove(&time) {
113                            let mut session = output.session(&time);
114
115                            // we could sort these, or not (previously: radix sorted).
116                            // todo.sort();
117
118                            for buffer in todo.drain(..) {
119                                for (node, prev) in buffer {
120                                    let temp = (node as usize) / peers;
121                                    if done[temp] == u32::MAX {
122                                        done[temp] = prev;
123                                        let lower = offsets[temp] as usize;
124                                        let upper = offsets[temp + 1] as usize;
125                                        for &target in &targets[lower..upper] {
126                                            session.give((target, node));
127                                        }
128                                    }
129                                }
130                            }
131                        }
132                    });
133                }
134            )
135            .concat((0..1).map(|x| (x,x)).to_stream(scope))
136            .connect_loop(handle);
137        });
138    }).unwrap(); // asserts error-free execution;
139}
Source

pub fn delayed(&self, new_time: &T, output_port: usize) -> Capability<T>

Delays capability for a specific output port.

Makes a new capability for a timestamp new_time greater or equal to the timestamp of the source capability (self).

This method panics if self.time is not less or equal to new_time.

Source

pub fn retain(&self, output_port: usize) -> Capability<T>

Transforms to an owned capability for a specific output port.

This method produces an owned capability which must be dropped to release the capability. Users should take care that these capabilities are only stored for as long as they are required, as failing to drop them may result in livelock.

This method panics if the timestamp summary to output_port strictly advances the time.

Examples found in repository?
examples/wordcount.rs (line 33)
8fn main() {
9    // initializes and runs a timely dataflow.
10    timely::execute_from_args(std::env::args(), |worker| {
11
12        let mut input = InputHandle::new();
13        let probe = ProbeHandle::new();
14
15        // define a distribution function for strings.
16        let exchange = Exchange::new(|x: &(String, i64)| (x.0).len() as u64);
17
18        // create a new input, exchange data, and inspect its output
19        worker.dataflow::<usize,_,_>(|scope| {
20            input.to_stream(scope)
21                 .flat_map(|(text, diff): (String, i64)|
22                    text.split_whitespace()
23                        .map(move |word| (word.to_owned(), diff))
24                        .collect::<Vec<_>>()
25                 )
26                 .unary_frontier(exchange, "WordCount", |_capability, _info| {
27
28                    let mut queues = HashMap::new();
29                    let mut counts = HashMap::new();
30
31                    move |(input, frontier), output| {
32                        input.for_each_time(|time, data| {
33                            queues.entry(time.retain(output.output_index()))
34                                  .or_insert(Vec::new())
35                                  .extend(data.map(std::mem::take));
36                        });
37
38                        for (key, val) in queues.iter_mut() {
39                            if !frontier.less_equal(key.time()) {
40                                let mut session = output.session(key);
41                                for mut batch in val.drain(..) {
42                                    for (word, diff) in batch.drain(..) {
43                                        let entry = counts.entry(word.clone()).or_insert(0i64);
44                                        *entry += diff;
45                                        session.give((word, *entry));
46                                    }
47                                }
48                            }
49                        }
50
51                        queues.retain(|_key, val| !val.is_empty());
52                    }})
53                 .container::<Vec<_>>()
54                 .inspect(|x| println!("seen: {:?}", x))
55                 .probe_with(&probe);
56        });
57
58        // introduce data and watch!
59        for round in 0..10 {
60            input.send(("round".to_owned(), 1));
61            input.advance_to(round + 1);
62            while probe.less_than(input.time()) {
63                worker.step();
64            }
65        }
66    }).unwrap();
67}
More examples
Hide additional examples
examples/columnar.rs (line 71)
21fn main() {
22
23    type InnerContainer = <WordCount as columnar::Columnar>::Container;
24    type Container = Column<InnerContainer>;
25
26    use columnar::Len;
27
28    let config = timely::Config {
29        communication: timely::CommunicationConfig::ProcessBinary(3),
30        worker: timely::WorkerConfig::default(),
31    };
32
33    // initializes and runs a timely dataflow.
34    timely::execute(config, |worker| {
35        let mut input = <InputHandle<_, CapacityContainerBuilder<Container>>>::new();
36        let probe = ProbeHandle::new();
37
38        // create a new input, exchange data, and inspect its output
39        worker.dataflow::<usize, _, _>(|scope| {
40            input
41                .to_stream(scope)
42                .unary(
43                    Pipeline,
44                    "Split",
45                    |_cap, _info| {
46                        move |input, output| {
47                            input.for_each_time(|time, data| {
48                                let mut session = output.session(&time);
49                                for data in data {
50                                    for wordcount in data.borrow().into_index_iter().flat_map(|wordcount| {
51                                        wordcount.text.split_whitespace().map(move |text| WordCountReference { text, diff: wordcount.diff })
52                                    }) {
53                                        session.give(wordcount);
54                                    }
55                                }
56                            });
57                        }
58                    },
59                )
60                .container::<Container>()
61                .unary_frontier(
62                    ExchangeCore::<ColumnBuilder<InnerContainer>,_>::new_core(|x: &WordCountReference<&str,&i64>| x.text.len() as u64),
63                    "WordCount",
64                    |_capability, _info| {
65                        let mut queues = HashMap::new();
66                        let mut counts = HashMap::new();
67
68                        move |(input, frontier), output| {
69                            input.for_each_time(|time, data| {
70                                queues
71                                    .entry(time.retain(output.output_index()))
72                                    .or_insert(Vec::new())
73                                    .extend(data.map(std::mem::take));
74
75                            });
76
77                            for (key, val) in queues.iter_mut() {
78                                if !frontier.less_equal(key.time()) {
79                                    let mut session = output.session(key);
80                                    for batch in val.drain(..) {
81                                        for wordcount in batch.borrow().into_index_iter() {
82                                            let total =
83                                            if let Some(count) = counts.get_mut(wordcount.text) {
84                                                *count += wordcount.diff;
85                                                *count
86                                            }
87                                            else {
88                                                counts.insert(wordcount.text.to_string(), *wordcount.diff);
89                                                *wordcount.diff
90                                            };
91                                            session.give(WordCountReference { text: wordcount.text, diff: total });
92                                        }
93                                    }
94                                }
95                            }
96
97                            queues.retain(|_key, val| !val.is_empty());
98                        }
99                    },
100                )
101                .container::<Container>()
102                .inspect_container(|x| {
103                    match x {
104                        Ok((time, data)) => {
105                            println!("seen at: {:?}\t{:?} records", time, data.record_count());
106                            for wc in data.borrow().into_index_iter() {
107                                println!("  {}: {}", wc.text, wc.diff);
108                            }
109                        },
110                        Err(frontier) => println!("frontier advanced to {:?}", frontier),
111                    }
112                })
113                .probe_with(&probe);
114        });
115
116        // introduce data and watch!
117        for round in 0..10 {
118            input.send(WordCountReference { text: "flat container", diff: 1 });
119            input.advance_to(round + 1);
120            while probe.less_than(input.time()) {
121                worker.step();
122            }
123        }
124    })
125    .unwrap();
126}
examples/bfs.rs (line 59)
7fn main() {
8
9    // command-line args: numbers of nodes and edges in the random graph.
10    let nodes: usize = std::env::args().nth(1).unwrap().parse().unwrap();
11    let edges: usize = std::env::args().nth(2).unwrap().parse().unwrap();
12
13    // let logging = ::timely::logging::to_tcp_socket();
14    timely::execute_from_args(std::env::args().skip(3), move |worker| {
15
16        let index = worker.index();
17        let peers = worker.peers();
18
19        // pending edges and node updates.
20        let mut edge_list = Vec::new();
21        let mut node_lists = HashMap::new();
22
23        // graph data; offsets into targets.
24        let mut offsets = Vec::new();
25        let mut targets = Vec::new();
26
27        // holds the bfs parent of each node, or u32::MAX if unset.
28        let mut done = vec![u32::MAX; 1 + (nodes / peers)];
29
30        let start = std::time::Instant::now();
31
32        worker.dataflow::<usize,_,_>(move |scope| {
33
34            // generate part of a random graph.
35            use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
36            let hasher = BuildHasherDefault::<DefaultHasher>::new();
37            let graph =
38            (0..edges/peers)
39                .map(move |i| (hasher.hash_one(&(i,index,0)) as usize % nodes,
40                               hasher.hash_one(&(i,index,1)) as usize % nodes))
41                .map(|(src,dst)| (src as u32, dst as u32))
42                .to_stream(scope)
43                .container::<Vec<_>>();
44
45            // define a loop variable, for the (node, worker) pairs.
46            let (handle, stream) = scope.feedback(1usize);
47
48            // use the stream of edges
49            graph.binary_notify(
50                stream,
51                Exchange::new(|x: &(u32, u32)| u64::from(x.0)),
52                Exchange::new(|x: &(u32, u32)| u64::from(x.0)),
53                "BFS",
54                vec![],
55                move |input1, input2, output, notify| {
56
57                    // receive edges, start to sort them
58                    input1.for_each_time(|time, data| {
59                        notify.notify_at(time.retain(output.output_index()));
60                        edge_list.extend(data.map(std::mem::take));
61                    });
62
63                    // receive (node, worker) pairs, note any new ones.
64                    input2.for_each_time(|time, data| {
65                        node_lists.entry(*time.time())
66                                  .or_insert_with(|| {
67                                      notify.notify_at(time.retain(output.output_index()));
68                                      Vec::new()
69                                  })
70                                  .extend(data.map(std::mem::take));
71                    });
72
73                    notify.for_each(|time, _num, _notify| {
74
75                        // maybe process the graph
76                        if *time == 0 {
77
78                            // print some diagnostic timing information
79                            if index == 0 { println!("{:?}:\tsorting", start.elapsed()); }
80
81                            // sort the edges (previously: radix sorted).
82                            edge_list.sort();
83
84                            let mut count = 0;
85                            for buffer in &edge_list { count += buffer.len(); }
86
87                            // allocate sufficient memory, to avoid resizing.
88                            offsets = Vec::with_capacity(1 + (nodes / peers));
89                            targets = Vec::with_capacity(count);
90
91                            // construct the graph
92                            offsets.push(0);
93                            let mut prev_node = 0;
94                            for buffer in edge_list.drain(..) {
95                                for (node, edge) in buffer {
96                                    let temp = node / peers as u32;
97                                    while prev_node < temp {
98                                        prev_node += 1;
99                                        offsets.push(targets.len() as u32)
100                                    }
101                                    targets.push(edge);
102                                }
103                            }
104                            while offsets.len() < offsets.capacity() {
105                                offsets.push(targets.len() as u32);
106                            }
107                        }
108
109                        // print some diagnostic timing information
110                        if index == 0 { println!("{:?}:\ttime: {:?}", start.elapsed(), time.time()); }
111
112                        if let Some(mut todo) = node_lists.remove(&time) {
113                            let mut session = output.session(&time);
114
115                            // we could sort these, or not (previously: radix sorted).
116                            // todo.sort();
117
118                            for buffer in todo.drain(..) {
119                                for (node, prev) in buffer {
120                                    let temp = (node as usize) / peers;
121                                    if done[temp] == u32::MAX {
122                                        done[temp] = prev;
123                                        let lower = offsets[temp] as usize;
124                                        let upper = offsets[temp + 1] as usize;
125                                        for &target in &targets[lower..upper] {
126                                            session.give((target, node));
127                                        }
128                                    }
129                                }
130                            }
131                        }
132                    });
133                }
134            )
135            .concat((0..1).map(|x| (x,x)).to_stream(scope))
136            .connect_loop(handle);
137        });
138    }).unwrap(); // asserts error-free execution;
139}
examples/pagerank.rs (line 47)
8fn main() {
9
10    timely::execute_from_args(std::env::args().skip(3), move |worker| {
11
12        let mut input = InputHandle::new();
13        let probe = ProbeHandle::new();
14
15        worker.dataflow::<usize,_,_>(|scope| {
16
17            // create a new input, into which we can push edge changes.
18            let edge_stream = input.to_stream(scope);
19
20            // create a new feedback stream, which will be changes to ranks.
21            let (handle, rank_stream) = scope.feedback(1);
22
23            // bring edges and ranks together!
24            let changes = edge_stream.binary_frontier(
25                rank_stream,
26                Exchange::new(|x: &((usize, usize), i64)| (x.0).0 as u64),
27                Exchange::new(|x: &(usize, i64)| x.0 as u64),
28                "PageRank",
29                |_capability, _info| {
30
31                    // where we stash out-of-order data.
32                    let mut edge_stash: HashMap<_, Vec<_>> = HashMap::new();
33                    let mut rank_stash: HashMap<_, Vec<_>> = HashMap::new();
34
35                    // lists of edges, ranks, and changes.
36                    let mut edges = Vec::new();
37                    let mut ranks = Vec::new();
38                    let mut diffs = Vec::new(); // for received but un-acted upon deltas.
39                    let mut delta = Vec::new();
40
41                    let timer = ::std::time::Instant::now();
42
43                    move |(input1, frontier1), (input2, frontier2), output| {
44
45                        // hold on to edge changes until it is time.
46                        input1.for_each_time(|time, data| {
47                            let entry = edge_stash.entry(time.retain(output.output_index())).or_default();
48                            data.for_each(|data| entry.append(data));
49                        });
50
51                        // hold on to rank changes until it is time.
52                        input2.for_each_time(|time, data| {
53                            let entry = rank_stash.entry(time.retain(output.output_index())).or_default();
54                            data.for_each(|data| entry.append(data));
55                        });
56
57                        let frontiers = &[frontier1, frontier2];
58
59                        for (time, edge_changes) in edge_stash.iter_mut() {
60                            if frontiers.iter().all(|f| !f.less_equal(time)) {
61
62                                let mut session = output.session(time);
63
64                                compact(edge_changes);
65
66                                for ((src, dst), diff) in edge_changes.drain(..) {
67
68                                    // 0. ensure enough state allocated
69                                    while edges.len() <= src { edges.push(Vec::new()); }
70                                    while ranks.len() <= src { ranks.push(1_000); }
71                                    while diffs.len() <= src { diffs.push(0); }
72
73                                    // 1. subtract previous distribution.
74                                    allocate(ranks[src], &edges[src][..], &mut delta);
75                                    for x in delta.iter_mut() { x.1 *= -1; }
76
77                                    // 2. update edges.
78                                    edges[src].push((dst, diff));
79                                    compact(&mut edges[src]);
80
81                                    // 3. re-distribute allocations.
82                                    allocate(ranks[src], &edges[src][..], &mut delta);
83
84                                    // 4. compact down and send cumulative changes.
85                                    compact(&mut delta);
86                                    for (dst, diff) in delta.drain(..) {
87                                        session.give((dst, diff));
88                                    }
89                                }
90                            }
91                        }
92
93                        edge_stash.retain(|_key, val| !val.is_empty());
94
95                        for (time, rank_changes) in rank_stash.iter_mut() {
96                            if frontiers.iter().all(|f| !f.less_equal(time)) {
97
98                                let mut session = output.session(time);
99
100                                compact(rank_changes);
101
102                                let mut cnt = 0;
103                                let mut sum = 0;
104                                let mut max = 0;
105
106                                for (src, diff) in rank_changes.drain(..) {
107
108                                    cnt += 1;
109                                    sum += diff.abs();
110                                    max = if max < diff.abs() { diff.abs() } else { max };
111
112                                    // 0. ensure enough state allocated
113                                    while edges.len() <= src { edges.push(Vec::new()); }
114                                    while ranks.len() <= src { ranks.push(1_000); }
115                                    while diffs.len() <= src { diffs.push(0); }
116
117                                    // 1. subtract previous distribution.
118                                    allocate(ranks[src], &edges[src][..], &mut delta);
119                                    for x in delta.iter_mut() { x.1 *= -1; }
120
121                                    // 2. update ranks.
122                                    diffs[src] += diff;
123                                    if diffs[src].abs() >= 6 {
124                                        ranks[src] += diffs[src];
125                                        diffs[src] = 0;
126                                    }
127
128                                    // 3. re-distribute allocations.
129                                    allocate(ranks[src], &edges[src][..], &mut delta);
130
131                                    // 4. compact down and send cumulative changes.
132                                    compact(&mut delta);
133                                    for (dst, diff) in delta.drain(..) {
134                                        session.give((dst, diff));
135                                    }
136                                }
137
138                                println!("{:?}:\t{:?}\t{}\t{}\t{}", timer.elapsed(), time.time(), cnt, sum, max);
139                            }
140                        }
141
142                        rank_stash.retain(|_key, val| !val.is_empty());
143
144                    }
145                }
146            );
147
148            changes
149                .probe_with(&probe)
150                .connect_loop(handle);
151
152        });
153
154        let nodes: usize = std::env::args().nth(1).unwrap().parse().unwrap();
155        let edges: usize = std::env::args().nth(2).unwrap().parse().unwrap();
156
157        let index = worker.index();
158        // Generate roughly random data.
159        use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
160        let hasher = BuildHasherDefault::<DefaultHasher>::new();
161        let mut insert = (0..).map(move |i| (hasher.hash_one(&(i,index,0)) as usize % nodes,
162                                             hasher.hash_one(&(i,index,1)) as usize % nodes));
163        let remove = insert.clone();
164
165        for ins in (&mut insert).take(edges / worker.peers()) {
166            input.send((ins, 1));
167        }
168
169        input.advance_to(1);
170
171        while probe.less_than(input.time()) {
172            worker.step();
173        }
174
175        for (ins, del) in insert.zip(remove).take(1000) {
176            input.send((ins, 1));
177            input.send((del,-1));
178            input.advance_to(input.time() + 1);
179            while probe.less_than(input.time()) {
180                worker.step();
181            }
182        }
183
184    }).unwrap(); // asserts error-free execution;
185}

Trait Implementations§

Source§

impl<T: Timestamp> CapabilityTrait<T> for InputCapability<T>

Source§

fn time(&self) -> &T

The timestamp associated with the capability.
Source§

fn valid_for_output( &self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, port: usize, ) -> bool

Validates that the capability is valid for a specific internal buffer and output port.
Source§

impl<T: Timestamp> Debug for InputCapability<T>

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
Source§

impl<T: Timestamp> Deref for InputCapability<T>

Source§

type Target = T

The resulting type after dereferencing.
Source§

fn deref(&self) -> &T

Dereferences the value.

Auto Trait Implementations§

§

impl<T> Freeze for InputCapability<T>
where T: Freeze,

§

impl<T> !RefUnwindSafe for InputCapability<T>

§

impl<T> !Send for InputCapability<T>

§

impl<T> !Sync for InputCapability<T>

§

impl<T> Unpin for InputCapability<T>
where T: Unpin,

§

impl<T> UnsafeUnpin for InputCapability<T>
where T: UnsafeUnpin,

§

impl<T> !UnwindSafe for InputCapability<T>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<P, T> Receiver for P
where P: Deref<Target = T> + ?Sized, T: ?Sized,

Source§

type Target = T

🔬This is a nightly-only experimental API. (arbitrary_self_types)
The target type on which the method may be called.
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.