pub struct Session<'a: 'b, 'b, T: Timestamp, CB: ContainerBuilder, CT: CapabilityTrait<T>> { /* private fields */ }
Expand description
An active output building session, which accepts items and builds containers.
Implementations§
Source§impl<'a: 'b, 'b, T: Timestamp, CB: ContainerBuilder, CT: CapabilityTrait<T>> Session<'a, 'b, T, CB, CT>
impl<'a: 'b, 'b, T: Timestamp, CB: ContainerBuilder, CT: CapabilityTrait<T>> Session<'a, 'b, T, CB, CT>
Source
pub fn give<D>(&mut self, data: D)
where
CB: PushInto<D>,
pub fn give<D>(&mut self, data: D)
where
CB: PushInto<D>,
Provides one record at the time specified by the Session.
Examples found in repository?
examples/unordered_input.rs (line 13)
4fn main() {
5 timely::execute(Config::thread(), |worker| {
6 let (mut input, mut cap) = worker.dataflow::<usize,_,_>(|scope| {
7 let (input, stream) = scope.new_unordered_input();
8 stream.container::<Vec<_>>().inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
9 input
10 });
11
12 for round in 0..10 {
13 input.activate().session(&cap).give(round);
14 cap = cap.delayed(&(round + 1));
15 worker.step();
16 }
17 }).unwrap();
18}
More examples
examples/unionfind.rs (line 82)
57 fn union_find(self) -> StreamVec<G, (usize, usize)> {
58
59 self.unary(Pipeline, "UnionFind", |_,_| {
60
61 let mut roots = vec![]; // u32 works, and is smaller than uint/u64
62 let mut ranks = vec![]; // u8 should be large enough (n < 2^256)
63
64 move |input, output| {
65
66 input.for_each_time(|time, data| {
67 let mut session = output.session(&time);
68 for &mut (mut x, mut y) in data.flatten() {
69
70 // grow arrays if required.
71 let m = ::std::cmp::max(x, y);
72 for i in roots.len() .. (m + 1) {
73 roots.push(i);
74 ranks.push(0);
75 }
76
77 // look up roots for `x` and `y`.
78 while x != roots[x] { x = roots[x]; }
79 while y != roots[y] { y = roots[y]; }
80
81 if x != y {
82 session.give((x, y));
83 match ranks[x].cmp(&ranks[y]) {
84 Ordering::Less => { roots[x] = y },
85 Ordering::Greater => { roots[y] = x },
86 Ordering::Equal => { roots[y] = x; ranks[x] += 1 },
87 }
88 }
89 }
90 });
91 }
92 })
93 }
examples/distinct.rs (line 31)
8fn main() {
9 // initializes and runs a timely dataflow.
10 timely::execute_from_args(std::env::args(), |worker| {
11 let index = worker.index();
12 let mut input = InputHandle::new();
13 let probe = ProbeHandle::new();
14
15 // create a new input, exchange data, and inspect its output
16 worker.dataflow::<usize,_,_>(|scope| {
17 let mut counts_by_time = HashMap::new();
18 scope.input_from(&mut input)
19 .unary(Exchange::new(|x| *x), "Distinct", move |_, _|
20 move |input, output| {
21 input.for_each_time(|time, data| {
22 let counts =
23 counts_by_time
24 .entry(*time.time())
25 .or_insert(HashMap::new());
26 let mut session = output.session(&time);
27 for data in data {
28 for &datum in data.iter() {
29 let count = counts.entry(datum).or_insert(0);
30 if *count == 0 {
31 session.give(datum);
32 }
33 *count += 1;
34 }
35 }
36 })
37 })
38 .container::<Vec<_>>()
39 .inspect(move |x| println!("worker {}:\tvalue {}", index, x))
40 .probe_with(&probe);
41 });
42
43 // introduce data and watch!
44 for round in 0..1 {
45 if index == 0 {
46 [0, 1, 2, 2, 2, 3, 3, 4].iter().for_each(|x| input.send(*x));
47 } else if index == 1 {
48 [0, 0, 3, 4, 4, 5, 7, 7].iter().for_each(|x| input.send(*x));
49 }
50 input.advance_to(round + 1);
51 while probe.less_than(input.time()) {
52 worker.step();
53 }
54 }
55 }).unwrap();
56}
examples/wordcount.rs (line 45)
8fn main() {
9 // initializes and runs a timely dataflow.
10 timely::execute_from_args(std::env::args(), |worker| {
11
12 let mut input = InputHandle::new();
13 let probe = ProbeHandle::new();
14
15 // define a distribution function for strings.
16 let exchange = Exchange::new(|x: &(String, i64)| (x.0).len() as u64);
17
18 // create a new input, exchange data, and inspect its output
19 worker.dataflow::<usize,_,_>(|scope| {
20 input.to_stream(scope)
21 .flat_map(|(text, diff): (String, i64)|
22 text.split_whitespace()
23 .map(move |word| (word.to_owned(), diff))
24 .collect::<Vec<_>>()
25 )
26 .unary_frontier(exchange, "WordCount", |_capability, _info| {
27
28 let mut queues = HashMap::new();
29 let mut counts = HashMap::new();
30
31 move |(input, frontier), output| {
32 input.for_each_time(|time, data| {
33 queues.entry(time.retain(output.output_index()))
34 .or_insert(Vec::new())
35 .extend(data.map(std::mem::take));
36 });
37
38 for (key, val) in queues.iter_mut() {
39 if !frontier.less_equal(key.time()) {
40 let mut session = output.session(key);
41 for mut batch in val.drain(..) {
42 for (word, diff) in batch.drain(..) {
43 let entry = counts.entry(word.clone()).or_insert(0i64);
44 *entry += diff;
45 session.give((word, *entry));
46 }
47 }
48 }
49 }
50
51 queues.retain(|_key, val| !val.is_empty());
52 }})
53 .container::<Vec<_>>()
54 .inspect(|x| println!("seen: {:?}", x))
55 .probe_with(&probe);
56 });
57
58 // introduce data and watch!
59 for round in 0..10 {
60 input.send(("round".to_owned(), 1));
61 input.advance_to(round + 1);
62 while probe.less_than(input.time()) {
63 worker.step();
64 }
65 }
66 }).unwrap();
67}
examples/hashjoin.rs (line 46)
8fn main() {
9
10 // command-line args: number of distinct keys, number of values to send, and the per-round batch size.
11 let keys: u64 = std::env::args().nth(1).unwrap().parse().unwrap();
12 let vals: usize = std::env::args().nth(2).unwrap().parse().unwrap();
13 let batch: usize = std::env::args().nth(3).unwrap().parse().unwrap();
14
15 timely::execute_from_args(std::env::args().skip(4), move |worker| {
16
17 let index = worker.index();
18 let peers = worker.peers();
19
20 let mut input1 = InputHandle::new();
21 let mut input2 = InputHandle::new();
22 let probe = ProbeHandle::new();
23
24 worker.dataflow(|scope| {
25
26 let stream1 = scope.input_from(&mut input1);
27 let stream2 = scope.input_from(&mut input2);
28
29 let exchange1 = Exchange::new(|x: &(u64, u64)| x.0);
30 let exchange2 = Exchange::new(|x: &(u64, u64)| x.0);
31
32 stream1
33 .binary(stream2, exchange1, exchange2, "HashJoin", |_capability, _info| {
34
35 let mut map1 = HashMap::<u64, Vec<u64>>::new();
36 let mut map2 = HashMap::<u64, Vec<u64>>::new();
37
38 move |input1, input2, output| {
39
40 // Drain first input, check second map, update first map.
41 input1.for_each_time(|time, data| {
42 let mut session = output.session(&time);
43 for (key, val1) in data.flat_map(|d| d.drain(..)) {
44 if let Some(values) = map2.get(&key) {
45 for val2 in values.iter() {
46 session.give((val1, *val2));
47 }
48 }
49
50 map1.entry(key).or_default().push(val1);
51 }
52 });
53
54 // Drain second input, check first map, update second map.
55 input2.for_each_time(|time, data| {
56 let mut session = output.session(&time);
57 for (key, val2) in data.flat_map(|d| d.drain(..)) {
58 if let Some(values) = map1.get(&key) {
59 for val1 in values.iter() {
60 session.give((*val1, val2));
61 }
62 }
63
64 map2.entry(key).or_default().push(val2);
65 }
66 });
67 }
68 })
69 .container::<Vec<_>>()
70 .probe_with(&probe);
71 });
72
73 // Generate roughly random data.
74 use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
75 let hasher = BuildHasherDefault::<DefaultHasher>::new();
76 let mut insert = (0..).map(move |i| (hasher.hash_one(&(i,index,0)) % keys,
77 hasher.hash_one(&(i,index,1)) % keys,
78 hasher.hash_one(&(i,index,2)) % keys,
79 hasher.hash_one(&(i,index,3)) % keys));
80
81 let timer = std::time::Instant::now();
82
83 let mut sent = 0;
84 while sent < (vals / peers) {
85
86 // Send some amount of data, no more than `batch`.
87 let to_send = std::cmp::min(batch, vals/peers - sent);
88 for (src0, dst0, src1, dst1) in (&mut insert).take(to_send) {
89 input1.send((src0, dst0));
90 input2.send((src1, dst1));
91 }
92 sent += to_send;
93
94 // Advance input, iterate until data cleared.
95 let next = input1.time() + 1;
96 input1.advance_to(next);
97 input2.advance_to(next);
98 while probe.less_than(input1.time()) {
99 worker.step();
100 }
101
102 println!("{:?}\tworker {} batch complete", timer.elapsed(), index)
103 }
104
105 }).unwrap(); // asserts error-free execution;
106}
examples/columnar.rs (line 53)
21fn main() {
22
23 type InnerContainer = <WordCount as columnar::Columnar>::Container;
24 type Container = Column<InnerContainer>;
25
26 use columnar::Len;
27
28 let config = timely::Config {
29 communication: timely::CommunicationConfig::ProcessBinary(3),
30 worker: timely::WorkerConfig::default(),
31 };
32
33 // initializes and runs a timely dataflow.
34 timely::execute(config, |worker| {
35 let mut input = <InputHandle<_, CapacityContainerBuilder<Container>>>::new();
36 let probe = ProbeHandle::new();
37
38 // create a new input, exchange data, and inspect its output
39 worker.dataflow::<usize, _, _>(|scope| {
40 input
41 .to_stream(scope)
42 .unary(
43 Pipeline,
44 "Split",
45 |_cap, _info| {
46 move |input, output| {
47 input.for_each_time(|time, data| {
48 let mut session = output.session(&time);
49 for data in data {
50 for wordcount in data.borrow().into_index_iter().flat_map(|wordcount| {
51 wordcount.text.split(|b| b.is_ascii_whitespace()).filter(|s| !s.is_empty()).map(move |text| WordCountReference { text, diff: wordcount.diff })
52 }) {
53 session.give(wordcount);
54 }
55 }
56 });
57 }
58 },
59 )
60 .container::<Container>()
61 .unary_frontier(
62 ExchangeCore::<ColumnBuilder<InnerContainer>,_>::new_core(|x: &WordCountReference<&[u8],&i64>| x.text.len() as u64),
63 "WordCount",
64 |_capability, _info| {
65 let mut queues = HashMap::new();
66 let mut counts = HashMap::new();
67
68 move |(input, frontier), output| {
69 input.for_each_time(|time, data| {
70 queues
71 .entry(time.retain(output.output_index()))
72 .or_insert(Vec::new())
73 .extend(data.map(std::mem::take));
74
75 });
76
77 for (key, val) in queues.iter_mut() {
78 if !frontier.less_equal(key.time()) {
79 let mut session = output.session(key);
80 for batch in val.drain(..) {
81 for wordcount in batch.borrow().into_index_iter() {
82 let total =
83 if let Some(count) = counts.get_mut(wordcount.text) {
84 *count += wordcount.diff;
85 *count
86 }
87 else {
88 counts.insert(wordcount.text.to_vec(), *wordcount.diff);
89 *wordcount.diff
90 };
91 session.give(WordCountReference { text: wordcount.text, diff: total });
92 }
93 }
94 }
95 }
96
97 queues.retain(|_key, val| !val.is_empty());
98 }
99 },
100 )
101 .container::<Container>()
102 .inspect_container(|x| {
103 match x {
104 Ok((time, data)) => {
105 println!("seen at: {:?}\t{:?} records", time, data.record_count());
106 for wc in data.borrow().into_index_iter() {
107 println!(" {}: {}", std::str::from_utf8(wc.text).unwrap_or("<invalid utf8>"), wc.diff);
108 }
109 },
110 Err(frontier) => println!("frontier advanced to {:?}", frontier),
111 }
112 })
113 .probe_with(&probe);
114 });
115
116 // introduce data and watch!
117 for round in 0..10 {
118 input.send(WordCountReference { text: "flat container", diff: 1 });
119 input.advance_to(round + 1);
120 while probe.less_than(input.time()) {
121 worker.step();
122 }
123 }
124 })
125 .unwrap();
126}
Additional examples can be found in:
Source
pub fn give_iterator<I>(&mut self, iter: I)
pub fn give_iterator<I>(&mut self, iter: I)
Provides an iterator of records at the time specified by the Session.
Source
pub fn give_container(&mut self, container: &mut CB::Container)
pub fn give_container(&mut self, container: &mut CB::Container)
Provide a container at the time specified by the Session.
Source
pub fn give_containers<'c>(
&mut self,
containers: impl Iterator<Item = &'c mut CB::Container>,
)
pub fn give_containers<'c>( &mut self, containers: impl Iterator<Item = &'c mut CB::Container>, )
Provide multiple containers at the time specified by the Session.
Source
pub fn extract_and_send(&mut self)
pub fn extract_and_send(&mut self)
Extracts built containers and sends them.
Trait Implementations§
Auto Trait Implementations§
impl<'a, 'b, T, CB, CT> Freeze for Session<'a, 'b, T, CB, CT>
impl<'a, 'b, T, CB, CT> !RefUnwindSafe for Session<'a, 'b, T, CB, CT>
impl<'a, 'b, T, CB, CT> !Send for Session<'a, 'b, T, CB, CT>
impl<'a, 'b, T, CB, CT> !Sync for Session<'a, 'b, T, CB, CT>
impl<'a, 'b, T, CB, CT> Unpin for Session<'a, 'b, T, CB, CT>
impl<'a, 'b, T, CB, CT> UnsafeUnpin for Session<'a, 'b, T, CB, CT>
impl<'a, 'b, T, CB, CT> !UnwindSafe for Session<'a, 'b, T, CB, CT>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T
where
T: ?Sized,
impl<T> BorrowMut<T> for T
where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true.
Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self> otherwise. Read more