pub struct Handle<T: Timestamp> { /* private fields */ }

Expand description
Reports information about progress at the probe.
Implementations§
Source§impl<T: Timestamp> Handle<T>
impl<T: Timestamp> Handle<T>
Source§
pub fn less_than(&self, time: &T) -> bool
pub fn less_than(&self, time: &T) -> bool
Returns true iff the frontier is strictly less than time.
Examples found in repository?
examples/threadless.rs (line 28)
5fn main() {
6
7 // create a naked single-threaded worker.
8 let allocator = timely::communication::allocator::Thread::default();
9 let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator, None);
10
11 // create input and probe handles.
12 let mut input = InputHandle::new();
13 let probe = ProbeHandle::new();
14
15 // directly build a dataflow.
16 worker.dataflow(|scope| {
17 input
18 .to_stream(scope)
19 .container::<Vec<_>>()
20 .inspect(|x| println!("{:?}", x))
21 .probe_with(&probe);
22 });
23
24 // manage inputs.
25 for i in 0 .. 10 {
26 input.send(i);
27 input.advance_to(i);
28 while probe.less_than(input.time()) {
29 worker.step();
30 }
31 }
32}

More examples
examples/rc.rs (line 29)
10fn main() {
11 // initializes and runs a timely dataflow.
12 timely::execute_from_args(std::env::args(), |worker| {
13 // create a new input, exchange data, and inspect its output
14 let index = worker.index();
15 let mut input = InputHandle::new();
16 let probe = ProbeHandle::new();
17 worker.dataflow(|scope| {
18 scope.input_from(&mut input)
19 .container::<Vec<_>>()
20 //.exchange(|x| *x) // <-- cannot exchange this; Rc is not Send.
21 .inspect(move |x| println!("worker {}:\thello {:?}", index, x))
22 .probe_with(&probe);
23 });
24
25 // introduce data and watch!
26 for round in 0..10 {
27 input.send(Test { _field: Rc::new(round) } );
28 input.advance_to(round + 1);
29 worker.step_while(|| probe.less_than(input.time()));
30 }
31 }).unwrap();
32}

examples/hello.rs (line 27)
4fn main() {
5 // initializes and runs a timely dataflow.
6 timely::execute_from_args(std::env::args(), |worker| {
7
8 let index = worker.index();
9 let mut input = InputHandle::new();
10 let probe = ProbeHandle::new();
11
12 // create a new input, exchange data, and inspect its output
13 worker.dataflow(|scope| {
14 scope.input_from(&mut input)
15 .container::<Vec<_>>()
16 .exchange(|x| *x)
17 .inspect(move |x| println!("worker {}:\thello {}", index, x))
18 .probe_with(&probe);
19 });
20
21 // introduce data and watch!
22 for round in 0..10 {
23 if index == 0 {
24 input.send(round);
25 }
26 input.advance_to(round + 1);
27 while probe.less_than(input.time()) {
28 worker.step();
29 }
30 }
31 }).unwrap();
32}

examples/exchange.rs (line 32)
4fn main() {
5 // initializes and runs a timely dataflow.
6 timely::execute_from_args(std::env::args(), |worker| {
7
8 let batch = std::env::args().nth(1).unwrap().parse::<usize>().unwrap();
9 let rounds = std::env::args().nth(2).unwrap().parse::<usize>().unwrap();
10 let mut input = InputHandle::new();
11
12 // create a new input, exchange data, and inspect its output
13 let probe = worker.dataflow(|scope|
14 scope
15 .input_from(&mut input)
16 .container::<Vec<_>>()
17 .exchange(|&x| x as u64)
18 .probe()
19 .0
20 );
21
22
23 let timer = std::time::Instant::now();
24
25 for round in 0 .. rounds {
26
27 for i in 0 .. batch {
28 input.send(i);
29 }
30 input.advance_to(round);
31
32 while probe.less_than(input.time()) {
33 worker.step();
34 }
35
36 }
37
38 let volume = (rounds * batch) as f64;
39 let elapsed = timer.elapsed();
40 let seconds = elapsed.as_secs() as f64 + (f64::from(elapsed.subsec_nanos())/1000000000.0);
41
42 println!("{:?}\tworker {} complete; rate: {:?}", timer.elapsed(), worker.index(), volume / seconds);
43
44 }).unwrap();
45}

examples/unionfind.rs (line 43)
8fn main() {
9
10 // command-line args: numbers of nodes and edges in the random graph.
11 let nodes: usize = std::env::args().nth(1).unwrap().parse().unwrap();
12 let edges: usize = std::env::args().nth(2).unwrap().parse().unwrap();
13 let batch: usize = std::env::args().nth(3).unwrap().parse().unwrap();
14
15 timely::execute_from_args(std::env::args().skip(4), move |worker| {
16
17 let index = worker.index();
18 let peers = worker.peers();
19
20 let mut input = InputHandle::new();
21 let probe = ProbeHandle::new();
22
23 worker.dataflow(|scope| {
24 scope.input_from(&mut input)
25 // .exchange(move |x: &(usize, usize)| (x.0 % (peers - 1)) as u64 + 1)
26 .union_find()
27 .exchange(|_| 0)
28 .union_find()
29 .probe_with(&probe);
30 });
31
32 // Generate roughly random data.
33 use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
34 let hasher = BuildHasherDefault::<DefaultHasher>::new();
35 let insert = (0..).map(move |i| (hasher.hash_one(&(i,index,0)) as usize % nodes,
36 hasher.hash_one(&(i,index,1)) as usize % nodes));
37
38 for (edge, arc) in insert.take(edges / peers).enumerate() {
39 input.send(arc);
40 if edge % batch == (batch - 1) {
41 let next = input.time() + 1;
42 input.advance_to(next);
43 while probe.less_than(input.time()) {
44 worker.step();
45 }
46 }
47 }
48
49 }).unwrap(); // asserts error-free execution;
50}

examples/event_driven.rs (line 47)
5fn main() {
6 // initializes and runs a timely dataflow.
7 timely::execute_from_args(std::env::args(), |worker| {
8
9 let timer = std::time::Instant::now();
10
11 let mut args = std::env::args();
12 args.next();
13
14 let dataflows = args.next().unwrap().parse::<usize>().unwrap();
15 let length = args.next().unwrap().parse::<usize>().unwrap();
16 let record = args.next() == Some("record".to_string());
17
18 let mut inputs = Vec::new();
19 let mut probes = Vec::new();
20
21 // create a new input, exchange data, and inspect its output
22 for _dataflow in 0 .. dataflows {
23 worker.dataflow(|scope| {
24 let (input, stream) = scope.new_input();
25 let stream = scope.region(|inner| {
26 let mut stream = stream.enter(inner);
27 for _step in 0 .. length {
28 stream = stream.map(|x: ()| x);
29 }
30 stream.leave()
31 });
32 let (probe, _stream) = stream.probe();
33 inputs.push(input);
34 probes.push(probe);
35 });
36 }
37
38 println!("{:?}\tdataflows built ({} x {})", timer.elapsed(), dataflows, length);
39
40 for round in 0 .. {
41 let dataflow = round % dataflows;
42 if record {
43 inputs[dataflow].send(());
44 }
45 inputs[dataflow].advance_to(round);
46 let mut steps = 0;
47 while probes[dataflow].less_than(&round) {
48 worker.step();
49 steps += 1;
50 }
51 if round % 1000 == 0 { println!("{:?}\tround {} complete in {} steps", timer.elapsed(), round, steps); }
52 }
53
54 }).unwrap();
55}

Additional examples can be found in:
Source§
pub fn less_equal(&self, time: &T) -> bool
pub fn less_equal(&self, time: &T) -> bool
Returns true iff the frontier is less than or equal to time.
Source§
pub fn new() -> Self
pub fn new() -> Self
Allocates a new handle.
Examples found in repository?
examples/threadless.rs (line 13)
5fn main() {
6
7 // create a naked single-threaded worker.
8 let allocator = timely::communication::allocator::Thread::default();
9 let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator, None);
10
11 // create input and probe handles.
12 let mut input = InputHandle::new();
13 let probe = ProbeHandle::new();
14
15 // directly build a dataflow.
16 worker.dataflow(|scope| {
17 input
18 .to_stream(scope)
19 .container::<Vec<_>>()
20 .inspect(|x| println!("{:?}", x))
21 .probe_with(&probe);
22 });
23
24 // manage inputs.
25 for i in 0 .. 10 {
26 input.send(i);
27 input.advance_to(i);
28 while probe.less_than(input.time()) {
29 worker.step();
30 }
31 }
32}

More examples
examples/rc.rs (line 16)
10fn main() {
11 // initializes and runs a timely dataflow.
12 timely::execute_from_args(std::env::args(), |worker| {
13 // create a new input, exchange data, and inspect its output
14 let index = worker.index();
15 let mut input = InputHandle::new();
16 let probe = ProbeHandle::new();
17 worker.dataflow(|scope| {
18 scope.input_from(&mut input)
19 .container::<Vec<_>>()
20 //.exchange(|x| *x) // <-- cannot exchange this; Rc is not Send.
21 .inspect(move |x| println!("worker {}:\thello {:?}", index, x))
22 .probe_with(&probe);
23 });
24
25 // introduce data and watch!
26 for round in 0..10 {
27 input.send(Test { _field: Rc::new(round) } );
28 input.advance_to(round + 1);
29 worker.step_while(|| probe.less_than(input.time()));
30 }
31 }).unwrap();
32}examples/hello.rs (line 10)
4fn main() {
5 // initializes and runs a timely dataflow.
6 timely::execute_from_args(std::env::args(), |worker| {
7
8 let index = worker.index();
9 let mut input = InputHandle::new();
10 let probe = ProbeHandle::new();
11
12 // create a new input, exchange data, and inspect its output
13 worker.dataflow(|scope| {
14 scope.input_from(&mut input)
15 .container::<Vec<_>>()
16 .exchange(|x| *x)
17 .inspect(move |x| println!("worker {}:\thello {}", index, x))
18 .probe_with(&probe);
19 });
20
21 // introduce data and watch!
22 for round in 0..10 {
23 if index == 0 {
24 input.send(round);
25 }
26 input.advance_to(round + 1);
27 while probe.less_than(input.time()) {
28 worker.step();
29 }
30 }
31 }).unwrap();
32}examples/flow_controlled.rs (line 8)
4fn main() {
5 timely::execute_from_args(std::env::args(), |worker| {
6 let mut input = (0u64..100000).peekable();
7 worker.dataflow(|scope| {
8 let probe_handle = probe::Handle::new();
9 let probe_handle_2 = probe_handle.clone();
10
11 iterator_source(
12 scope,
13 "Source",
14 move |prev_t| {
15 if let Some(first_x) = input.peek().cloned() {
16 let next_t = first_x / 100 * 100;
17 Some(IteratorSourceInput {
18 lower_bound: Default::default(),
19 data: vec![
20 (next_t,
21 input.by_ref().take(10).map(|x| (/* "timestamp" */ x, x)).collect::<Vec<_>>())],
22 target: *prev_t,
23 })
24 } else {
25 None
26 }
27 },
28 probe_handle_2)
29 .inspect_time(|t, d| eprintln!("@ {:?}: {:?}", t, d))
30 .probe_with(&probe_handle);
31 });
32 }).unwrap();
33}

examples/unionfind.rs (line 21)
8fn main() {
9
10 // command-line args: numbers of nodes and edges in the random graph.
11 let nodes: usize = std::env::args().nth(1).unwrap().parse().unwrap();
12 let edges: usize = std::env::args().nth(2).unwrap().parse().unwrap();
13 let batch: usize = std::env::args().nth(3).unwrap().parse().unwrap();
14
15 timely::execute_from_args(std::env::args().skip(4), move |worker| {
16
17 let index = worker.index();
18 let peers = worker.peers();
19
20 let mut input = InputHandle::new();
21 let probe = ProbeHandle::new();
22
23 worker.dataflow(|scope| {
24 scope.input_from(&mut input)
25 // .exchange(move |x: &(usize, usize)| (x.0 % (peers - 1)) as u64 + 1)
26 .union_find()
27 .exchange(|_| 0)
28 .union_find()
29 .probe_with(&probe);
30 });
31
32 // Generate roughly random data.
33 use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
34 let hasher = BuildHasherDefault::<DefaultHasher>::new();
35 let insert = (0..).map(move |i| (hasher.hash_one(&(i,index,0)) as usize % nodes,
36 hasher.hash_one(&(i,index,1)) as usize % nodes));
37
38 for (edge, arc) in insert.take(edges / peers).enumerate() {
39 input.send(arc);
40 if edge % batch == (batch - 1) {
41 let next = input.time() + 1;
42 input.advance_to(next);
43 while probe.less_than(input.time()) {
44 worker.step();
45 }
46 }
47 }
48
49 }).unwrap(); // asserts error-free execution;
50}

examples/distinct.rs (line 13)
8fn main() {
9 // initializes and runs a timely dataflow.
10 timely::execute_from_args(std::env::args(), |worker| {
11 let index = worker.index();
12 let mut input = InputHandle::new();
13 let probe = ProbeHandle::new();
14
15 // create a new input, exchange data, and inspect its output
16 worker.dataflow::<usize,_,_>(|scope| {
17 let mut counts_by_time = HashMap::new();
18 scope.input_from(&mut input)
19 .unary(Exchange::new(|x| *x), "Distinct", move |_, _|
20 move |input, output| {
21 input.for_each_time(|time, data| {
22 let counts =
23 counts_by_time
24 .entry(*time.time())
25 .or_insert(HashMap::new());
26 let mut session = output.session(&time);
27 for data in data {
28 for &datum in data.iter() {
29 let count = counts.entry(datum).or_insert(0);
30 if *count == 0 {
31 session.give(datum);
32 }
33 *count += 1;
34 }
35 }
36 })
37 })
38 .container::<Vec<_>>()
39 .inspect(move |x| println!("worker {}:\tvalue {}", index, x))
40 .probe_with(&probe);
41 });
42
43 // introduce data and watch!
44 for round in 0..1 {
45 if index == 0 {
46 [0, 1, 2, 2, 2, 3, 3, 4].iter().for_each(|x| input.send(*x));
47 } else if index == 1 {
48 [0, 0, 3, 4, 4, 5, 7, 7].iter().for_each(|x| input.send(*x));
49 }
50 input.advance_to(round + 1);
51 while probe.less_than(input.time()) {
52 worker.step();
53 }
54 }
55 }).unwrap();
56}

Source§
pub fn with_frontier<R, F: FnMut(AntichainRef<'_, T>) -> R>(
&self,
function: F,
) -> R
pub fn with_frontier<R, F: FnMut(AntichainRef<'_, T>) -> R>( &self, function: F, ) -> R
Invokes a method on the frontier, returning its result.
This method allows inspection of the frontier, which cannot be returned by reference as
it is on the other side of a RefCell.
§Examples
use timely::dataflow::operators::probe::Handle;
let handle = Handle::<usize>::new();
let frontier = handle.with_frontier(|frontier| frontier.to_vec());

Examples found in repository?
examples/loopdemo.rs (line 58)
5fn main() {
6
7 let mut args = std::env::args();
8 args.next();
9 let rate: usize = args.next().expect("Must specify rate").parse().expect("Rate must be an usize");
10 let duration_s: usize = args.next().expect("Must specify duration_s").parse().expect("duration_s must be an usize");
11
12 timely::execute_from_args(args, move |worker| {
13
14 let index = worker.index();
15 let peers = worker.peers();
16
17 let timer = std::time::Instant::now();
18
19 let mut input = InputHandle::new();
20 let probe = ProbeHandle::new();
21
22 // Create a dataflow that discards input data (just synchronizes).
23 worker.dataflow(|scope| {
24
25 let stream = scope.input_from(&mut input);
26
27 let (loop_handle, loop_stream) = scope.feedback(1);
28
29 let step =
30 stream
31 .concat(loop_stream)
32 .map(|x| if x % 2 == 0 { x / 2 } else { 3 * x + 1 })
33 .filter(|x| x > &1);
34
35 step.probe_with(&probe)
36 .connect_loop(loop_handle);
37 });
38
39 let ns_per_request = 1_000_000_000 / rate;
40 let mut insert_counter = index; // counts up as we insert records.
41 let mut retire_counter = index; // counts up as we retire records.
42
43 let mut inserted_ns = 0;
44
45 // We repeatedly consult the elapsed time, and introduce any data now considered available.
46 // At the same time, we observe the output and record which inputs are considered retired.
47
48 let mut counts = vec![[0u64; 16]; 64];
49
50 let counter_limit = rate * duration_s;
51 while retire_counter < counter_limit {
52
53 // Open-loop latency-throughput test, parameterized by offered rate `ns_per_request`.
54 let elapsed = timer.elapsed();
55 let elapsed_ns = elapsed.as_secs() * 1_000_000_000 + (elapsed.subsec_nanos() as u64);
56
57 // Determine completed ns.
58 let acknowledged_ns: u64 = probe.with_frontier(|frontier| frontier[0]);
59
60 // Notice any newly-retired records.
61 while ((retire_counter * ns_per_request) as u64) < acknowledged_ns && retire_counter < counter_limit {
62 let requested_at = (retire_counter * ns_per_request) as u64;
63 let latency_ns = elapsed_ns - requested_at;
64
65 let count_index = latency_ns.next_power_of_two().trailing_zeros() as usize;
66 let low_bits = ((elapsed_ns - requested_at) >> (count_index - 5)) & 0xF;
67 counts[count_index][low_bits as usize] += 1;
68
69 retire_counter += peers;
70 }
71
72 // Now, should we introduce more records before stepping the worker?
73 // Three choices here:
74 //
75 // 1. Wait until previous batch acknowledged.
76 // 2. Tick at most once every millisecond-ish.
77 // 3. Geometrically increasing outstanding batches.
78
79 // Technique 1:
80 // let target_ns = if acknowledged_ns >= inserted_ns { elapsed_ns } else { inserted_ns };
81
82 // Technique 2:
83 // let target_ns = elapsed_ns & !((1 << 20) - 1);
84
85 // Technique 3:
86 let scale = (inserted_ns - acknowledged_ns).next_power_of_two();
87 let target_ns = elapsed_ns & !(scale - 1);
88
89 if inserted_ns < target_ns {
90
91 while ((insert_counter * ns_per_request) as u64) < target_ns {
92 input.send(insert_counter);
93 insert_counter += peers;
94 }
95 input.advance_to(target_ns);
96 inserted_ns = target_ns;
97 }
98
99 worker.step();
100 }
101
102 // Report observed latency measurements.
103 if index == 0 {
104
105 let mut results = Vec::new();
106 let total = counts.iter().map(|x| x.iter().sum::<u64>()).sum();
107 let mut sum = 0;
108 for index in (10 .. counts.len()).rev() {
109 for sub in (0 .. 16).rev() {
110 if sum > 0 && sum < total {
111 let latency = (1 << (index-1)) + (sub << (index-5));
112 let fraction = (sum as f64) / (total as f64);
113 results.push((latency, fraction));
114 }
115 sum += counts[index][sub];
116 }
117 }
118 for (latency, fraction) in results.drain(..).rev() {
119 println!("{}\t{}", latency, fraction);
120 }
121 }
122
123 }).unwrap();
124}

More examples
examples/openloop.rs (line 52)
5fn main() {
6
7 let mut args = std::env::args();
8 args.next();
9 let rate: usize = args.next().expect("Must specify rate").parse().expect("Rate must be an usize");
10 let duration_s: usize = args.next().expect("Must specify duration_s").parse().expect("duration_s must be an usize");
11
12 timely::execute_from_args(args, move |worker| {
13
14 let index = worker.index();
15 let peers = worker.peers();
16
17 // re-synchronize all workers (account for start-up).
18 timely::synchronization::Barrier::new(worker).wait();
19
20 let timer = std::time::Instant::now();
21
22 let mut input = InputHandle::new();
23 let probe = ProbeHandle::new();
24
25 // Create a dataflow that discards input data (just synchronizes).
26 worker.dataflow(|scope| {
27 scope
28 .input_from(&mut input) // read input.
29 .filter(|_| false) // do nothing.
30 .probe_with(&probe); // observe output.
31 });
32
33 let ns_per_request = 1_000_000_000 / rate;
34 let mut insert_counter = index; // counts up as we insert records.
35 let mut retire_counter = index; // counts up as we retire records.
36
37 let mut inserted_ns = 0;
38
39 // We repeatedly consult the elapsed time, and introduce any data now considered available.
40 // At the same time, we observe the output and record which inputs are considered retired.
41
42 let mut counts = vec![[0u64; 16]; 64];
43
44 let counter_limit = rate * duration_s;
45 while retire_counter < counter_limit {
46
47 // Open-loop latency-throughput test, parameterized by offered rate `ns_per_request`.
48 let elapsed = timer.elapsed();
49 let elapsed_ns = elapsed.as_secs() * 1_000_000_000 + (elapsed.subsec_nanos() as u64);
50
51 // Determine completed ns.
52 let acknowledged_ns: u64 = probe.with_frontier(|frontier| frontier[0]);
53
54 // Notice any newly-retired records.
55 while ((retire_counter * ns_per_request) as u64) < acknowledged_ns && retire_counter < counter_limit {
56 let requested_at = (retire_counter * ns_per_request) as u64;
57 let latency_ns = elapsed_ns - requested_at;
58
59 let count_index = latency_ns.next_power_of_two().trailing_zeros() as usize;
60 let low_bits = ((elapsed_ns - requested_at) >> (count_index - 5)) & 0xF;
61 counts[count_index][low_bits as usize] += 1;
62
63 retire_counter += peers;
64 }
65
66 // Now, should we introduce more records before stepping the worker?
67 // Three choices here:
68 //
69 // 1. Wait until previous batch acknowledged.
70 // 2. Tick at most once every millisecond-ish.
71 // 3. Geometrically increase outstanding batches.
72
73 // Technique 1:
74 // let target_ns = if acknowledged_ns >= inserted_ns { elapsed_ns } else { inserted_ns };
75
76 // Technique 2:
77 // let target_ns = elapsed_ns & !((1 << 20) - 1);
78
79 // Technique 3:
80 let target_ns = {
81 let delta: u64 = inserted_ns - acknowledged_ns;
82 let bits = ::std::mem::size_of::<u64>() * 8 - delta.leading_zeros() as usize;
83 let scale = ::std::cmp::max((1 << bits) / 4, 1024);
84 elapsed_ns & !(scale - 1)
85 };
86
87 // Common for each technique.
88 if inserted_ns < target_ns {
89
90 while ((insert_counter * ns_per_request) as u64) < target_ns {
91 input.send(insert_counter);
92 insert_counter += peers;
93 }
94 input.advance_to(target_ns);
95 inserted_ns = target_ns;
96 }
97
98 worker.step();
99 }
100
101 // Report observed latency measurements.
102 if index == 0 {
103
104 let mut results = Vec::new();
105 let total = counts.iter().map(|x| x.iter().sum::<u64>()).sum();
106 let mut sum = 0;
107 for index in (10 .. counts.len()).rev() {
108 for sub in (0 .. 16).rev() {
109 if sum > 0 && sum < total {
110 let latency = (1 << (index-1)) + (sub << (index-5));
111 let fraction = (sum as f64) / (total as f64);
112 results.push((latency, fraction));
113 }
114 sum += counts[index][sub];
115 }
116 }
117 for (latency, fraction) in results.drain(..).rev() {
118 println!("{}\t{}", latency, fraction);
119 }
120 }
121
122 }).unwrap();
123}

Trait Implementations§
Auto Trait Implementations§
impl<T> Freeze for Handle<T>
impl<T> !RefUnwindSafe for Handle<T>
impl<T> !Send for Handle<T>
impl<T> !Sync for Handle<T>
impl<T> Unpin for Handle<T>
impl<T> UnsafeUnpin for Handle<T>
impl<T> !UnwindSafe for Handle<T>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
Source§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts
self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more

Source§
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts
self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more