Skip to main content

Session

Struct Session 

Source
pub struct Session<'a: 'b, 'b, T: Timestamp, CB: ContainerBuilder, CT: CapabilityTrait<T>> { /* private fields */ }
Expand description

An active output building session, which accepts items and builds containers.

Implementations§

Source§

impl<'a: 'b, 'b, T: Timestamp, CB: ContainerBuilder, CT: CapabilityTrait<T>> Session<'a, 'b, T, CB, CT>

Source

pub fn builder(&mut self) -> &mut CB

Provides access to the underlying container builder.

Source

pub fn give<D>(&mut self, data: D)
where CB: PushInto<D>,

Provides one record at the time specified by the Session.

Examples found in repository?
examples/unordered_input.rs (line 13)
4fn main() {
5    timely::execute(Config::thread(), |worker| {
6        let (mut input, mut cap) = worker.dataflow::<usize,_,_>(|scope| {
7            let (input, stream) = scope.new_unordered_input();
8            stream.container::<Vec<_>>().inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
9            input
10        });
11
12        for round in 0..10 {
13            input.activate().session(&cap).give(round);
14            cap = cap.delayed(&(round + 1));
15            worker.step();
16        }
17    }).unwrap();
18}
More examples
Hide additional examples
examples/unionfind.rs (line 82)
57    fn union_find(self) -> StreamVec<G, (usize, usize)> {
58
59        self.unary(Pipeline, "UnionFind", |_,_| {
60
61            let mut roots = vec![];  // u32 works, and is smaller than uint/u64
62            let mut ranks = vec![];  // u8 should be large enough (n < 2^256)
63
64            move |input, output| {
65
66                input.for_each_time(|time, data| {
67                    let mut session = output.session(&time);
68                    for &mut (mut x, mut y) in data.flatten() {
69
70                        // grow arrays if required.
71                        let m = ::std::cmp::max(x, y);
72                        for i in roots.len() .. (m + 1) {
73                            roots.push(i);
74                            ranks.push(0);
75                        }
76
77                        // look up roots for `x` and `y`.
78                        while x != roots[x] { x = roots[x]; }
79                        while y != roots[y] { y = roots[y]; }
80
81                        if x != y {
82                            session.give((x, y));
83                            match ranks[x].cmp(&ranks[y]) {
84                                Ordering::Less    => { roots[x] = y },
85                                Ordering::Greater => { roots[y] = x },
86                                Ordering::Equal   => { roots[y] = x; ranks[x] += 1 },
87                            }
88                        }
89                    }
90                });
91            }
92        })
93    }
examples/distinct.rs (line 31)
8fn main() {
9    // initializes and runs a timely dataflow.
10    timely::execute_from_args(std::env::args(), |worker| {
11        let index = worker.index();
12        let mut input = InputHandle::new();
13        let probe = ProbeHandle::new();
14
15        // create a new input, exchange data, and inspect its output
16        worker.dataflow::<usize,_,_>(|scope| {
17            let mut counts_by_time = HashMap::new();
18            scope.input_from(&mut input)
19                .unary(Exchange::new(|x| *x), "Distinct", move |_, _|
20                    move |input, output| {
21                        input.for_each_time(|time, data| {
22                            let counts =
23                            counts_by_time
24                                .entry(*time.time())
25                                .or_insert(HashMap::new());
26                            let mut session = output.session(&time);
27                            for data in data {
28                                for &datum in data.iter() {
29                                    let count = counts.entry(datum).or_insert(0);
30                                    if *count == 0 {
31                                        session.give(datum);
32                                    }
33                                    *count += 1;
34                                }
35                            }
36                        })
37                    })
38                .container::<Vec<_>>()
39                .inspect(move |x| println!("worker {}:\tvalue {}", index, x))
40                .probe_with(&probe);
41        });
42
43        // introduce data and watch!
44        for round in 0..1 {
45            if index == 0 {
46                [0, 1, 2, 2, 2, 3, 3, 4].iter().for_each(|x| input.send(*x));
47            } else if index == 1 {
48                [0, 0, 3, 4, 4, 5, 7, 7].iter().for_each(|x| input.send(*x));
49            }
50            input.advance_to(round + 1);
51            while probe.less_than(input.time()) {
52                worker.step();
53            }
54        }
55    }).unwrap();
56}
examples/wordcount.rs (line 45)
8fn main() {
9    // initializes and runs a timely dataflow.
10    timely::execute_from_args(std::env::args(), |worker| {
11
12        let mut input = InputHandle::new();
13        let probe = ProbeHandle::new();
14
15        // define a distribution function for strings.
16        let exchange = Exchange::new(|x: &(String, i64)| (x.0).len() as u64);
17
18        // create a new input, exchange data, and inspect its output
19        worker.dataflow::<usize,_,_>(|scope| {
20            input.to_stream(scope)
21                 .flat_map(|(text, diff): (String, i64)|
22                    text.split_whitespace()
23                        .map(move |word| (word.to_owned(), diff))
24                        .collect::<Vec<_>>()
25                 )
26                 .unary_frontier(exchange, "WordCount", |_capability, _info| {
27
28                    let mut queues = HashMap::new();
29                    let mut counts = HashMap::new();
30
31                    move |(input, frontier), output| {
32                        input.for_each_time(|time, data| {
33                            queues.entry(time.retain(output.output_index()))
34                                  .or_insert(Vec::new())
35                                  .extend(data.map(std::mem::take));
36                        });
37
38                        for (key, val) in queues.iter_mut() {
39                            if !frontier.less_equal(key.time()) {
40                                let mut session = output.session(key);
41                                for mut batch in val.drain(..) {
42                                    for (word, diff) in batch.drain(..) {
43                                        let entry = counts.entry(word.clone()).or_insert(0i64);
44                                        *entry += diff;
45                                        session.give((word, *entry));
46                                    }
47                                }
48                            }
49                        }
50
51                        queues.retain(|_key, val| !val.is_empty());
52                    }})
53                 .container::<Vec<_>>()
54                 .inspect(|x| println!("seen: {:?}", x))
55                 .probe_with(&probe);
56        });
57
58        // introduce data and watch!
59        for round in 0..10 {
60            input.send(("round".to_owned(), 1));
61            input.advance_to(round + 1);
62            while probe.less_than(input.time()) {
63                worker.step();
64            }
65        }
66    }).unwrap();
67}
examples/hashjoin.rs (line 46)
8fn main() {
9
10    // command-line args: numbers of nodes and edges in the random graph.
11    let keys: u64 = std::env::args().nth(1).unwrap().parse().unwrap();
12    let vals: usize = std::env::args().nth(2).unwrap().parse().unwrap();
13    let batch: usize = std::env::args().nth(3).unwrap().parse().unwrap();
14
15    timely::execute_from_args(std::env::args().skip(4), move |worker| {
16
17        let index = worker.index();
18        let peers = worker.peers();
19
20        let mut input1 = InputHandle::new();
21        let mut input2 = InputHandle::new();
22        let probe = ProbeHandle::new();
23
24        worker.dataflow(|scope| {
25
26            let stream1 = scope.input_from(&mut input1);
27            let stream2 = scope.input_from(&mut input2);
28
29            let exchange1 = Exchange::new(|x: &(u64, u64)| x.0);
30            let exchange2 = Exchange::new(|x: &(u64, u64)| x.0);
31
32            stream1
33                .binary(stream2, exchange1, exchange2, "HashJoin", |_capability, _info| {
34
35                    let mut map1 = HashMap::<u64, Vec<u64>>::new();
36                    let mut map2 = HashMap::<u64, Vec<u64>>::new();
37
38                    move |input1, input2, output| {
39
40                        // Drain first input, check second map, update first map.
41                        input1.for_each_time(|time, data| {
42                            let mut session = output.session(&time);
43                            for (key, val1) in data.flat_map(|d| d.drain(..)) {
44                                if let Some(values) = map2.get(&key) {
45                                    for val2 in values.iter() {
46                                        session.give((val1, *val2));
47                                    }
48                                }
49
50                                map1.entry(key).or_default().push(val1);
51                            }
52                        });
53
54                        // Drain second input, check first map, update second map.
55                        input2.for_each_time(|time, data| {
56                            let mut session = output.session(&time);
57                            for (key, val2) in data.flat_map(|d| d.drain(..)) {
58                                if let Some(values) = map1.get(&key) {
59                                    for val1 in values.iter() {
60                                        session.give((*val1, val2));
61                                    }
62                                }
63
64                                map2.entry(key).or_default().push(val2);
65                            }
66                        });
67                    }
68                })
69                .container::<Vec<_>>()
70                .probe_with(&probe);
71        });
72
73        // Generate roughly random data.
74        use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
75        let hasher = BuildHasherDefault::<DefaultHasher>::new();
76        let mut insert = (0..).map(move |i| (hasher.hash_one(&(i,index,0)) % keys,
77                                             hasher.hash_one(&(i,index,1)) % keys,
78                                             hasher.hash_one(&(i,index,2)) % keys,
79                                             hasher.hash_one(&(i,index,3)) % keys));
80
81        let timer = std::time::Instant::now();
82
83        let mut sent = 0;
84        while sent < (vals / peers) {
85
86            // Send some amount of data, no more than `batch`.
87            let to_send = std::cmp::min(batch, vals/peers - sent);
88            for (src0, dst0, src1, dst1) in (&mut insert).take(to_send) {
89                input1.send((src0, dst0));
90                input2.send((src1, dst1));
91            }
92            sent += to_send;
93
94            // Advance input, iterate until data cleared.
95            let next = input1.time() + 1;
96            input1.advance_to(next);
97            input2.advance_to(next);
98            while probe.less_than(input1.time()) {
99                worker.step();
100            }
101
102            println!("{:?}\tworker {} batch complete", timer.elapsed(), index)
103        }
104
105    }).unwrap(); // asserts error-free execution;
106}
examples/columnar.rs (line 53)
21fn main() {
22
23    type InnerContainer = <WordCount as columnar::Columnar>::Container;
24    type Container = Column<InnerContainer>;
25
26    use columnar::Len;
27
28    let config = timely::Config {
29        communication: timely::CommunicationConfig::ProcessBinary(3),
30        worker: timely::WorkerConfig::default(),
31    };
32
33    // initializes and runs a timely dataflow.
34    timely::execute(config, |worker| {
35        let mut input = <InputHandle<_, CapacityContainerBuilder<Container>>>::new();
36        let probe = ProbeHandle::new();
37
38        // create a new input, exchange data, and inspect its output
39        worker.dataflow::<usize, _, _>(|scope| {
40            input
41                .to_stream(scope)
42                .unary(
43                    Pipeline,
44                    "Split",
45                    |_cap, _info| {
46                        move |input, output| {
47                            input.for_each_time(|time, data| {
48                                let mut session = output.session(&time);
49                                for data in data {
50                                    for wordcount in data.borrow().into_index_iter().flat_map(|wordcount| {
51                                        wordcount.text.split(|b| b.is_ascii_whitespace()).filter(|s| !s.is_empty()).map(move |text| WordCountReference { text, diff: wordcount.diff })
52                                    }) {
53                                        session.give(wordcount);
54                                    }
55                                }
56                            });
57                        }
58                    },
59                )
60                .container::<Container>()
61                .unary_frontier(
62                    ExchangeCore::<ColumnBuilder<InnerContainer>,_>::new_core(|x: &WordCountReference<&[u8],&i64>| x.text.len() as u64),
63                    "WordCount",
64                    |_capability, _info| {
65                        let mut queues = HashMap::new();
66                        let mut counts = HashMap::new();
67
68                        move |(input, frontier), output| {
69                            input.for_each_time(|time, data| {
70                                queues
71                                    .entry(time.retain(output.output_index()))
72                                    .or_insert(Vec::new())
73                                    .extend(data.map(std::mem::take));
74
75                            });
76
77                            for (key, val) in queues.iter_mut() {
78                                if !frontier.less_equal(key.time()) {
79                                    let mut session = output.session(key);
80                                    for batch in val.drain(..) {
81                                        for wordcount in batch.borrow().into_index_iter() {
82                                            let total =
83                                            if let Some(count) = counts.get_mut(wordcount.text) {
84                                                *count += wordcount.diff;
85                                                *count
86                                            }
87                                            else {
88                                                counts.insert(wordcount.text.to_vec(), *wordcount.diff);
89                                                *wordcount.diff
90                                            };
91                                            session.give(WordCountReference { text: wordcount.text, diff: total });
92                                        }
93                                    }
94                                }
95                            }
96
97                            queues.retain(|_key, val| !val.is_empty());
98                        }
99                    },
100                )
101                .container::<Container>()
102                .inspect_container(|x| {
103                    match x {
104                        Ok((time, data)) => {
105                            println!("seen at: {:?}\t{:?} records", time, data.record_count());
106                            for wc in data.borrow().into_index_iter() {
107                                println!("  {}: {}", std::str::from_utf8(wc.text).unwrap_or("<invalid utf8>"), wc.diff);
108                            }
109                        },
110                        Err(frontier) => println!("frontier advanced to {:?}", frontier),
111                    }
112                })
113                .probe_with(&probe);
114        });
115
116        // introduce data and watch!
117        for round in 0..10 {
118            input.send(WordCountReference { text: "flat container", diff: 1 });
119            input.advance_to(round + 1);
120            while probe.less_than(input.time()) {
121                worker.step();
122            }
123        }
124    })
125    .unwrap();
126}
Source

pub fn give_iterator<I>(&mut self, iter: I)
where I: Iterator, CB: PushInto<I::Item>,

Provides an iterator of records at the time specified by the Session.

Source

pub fn give_container(&mut self, container: &mut CB::Container)

Provide a container at the time specified by the Session.

Source

pub fn give_containers<'c>( &mut self, containers: impl Iterator<Item = &'c mut CB::Container>, )

Provide multiple containers at the time specifid by the Session.

Source

pub fn extract_and_send(&mut self)

Extracts built containers and sends them.

Source

pub fn flush(&mut self)

Finalizes containers and sends them.

Trait Implementations§

Source§

impl<'a: 'b, 'b, T: Timestamp, CB: ContainerBuilder, CT: CapabilityTrait<T>> Drop for Session<'a, 'b, T, CB, CT>

Source§

fn drop(&mut self)

Executes the destructor for this type. Read more

Auto Trait Implementations§

§

impl<'a, 'b, T, CB, CT> Freeze for Session<'a, 'b, T, CB, CT>

§

impl<'a, 'b, T, CB, CT> !RefUnwindSafe for Session<'a, 'b, T, CB, CT>

§

impl<'a, 'b, T, CB, CT> !Send for Session<'a, 'b, T, CB, CT>

§

impl<'a, 'b, T, CB, CT> !Sync for Session<'a, 'b, T, CB, CT>

§

impl<'a, 'b, T, CB, CT> Unpin for Session<'a, 'b, T, CB, CT>

§

impl<'a, 'b, T, CB, CT> UnsafeUnpin for Session<'a, 'b, T, CB, CT>

§

impl<'a, 'b, T, CB, CT> !UnwindSafe for Session<'a, 'b, T, CB, CT>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.