// columnar/columnar.rs

//! Wordcount based on the `columnar` crate.
2
3use std::collections::HashMap;
4
5use columnar::Index;
6use timely::Accountable;
7use timely::container::CapacityContainerBuilder;
8use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
9use timely::dataflow::InputHandle;
10use timely::dataflow::operators::{InspectCore, Operator, Probe};
11use timely::dataflow::ProbeHandle;
12
13// Creates `WordCountContainer` and `WordCountReference` structs,
14// as well as various implementations relating them to `WordCount`.
15#[derive(columnar::Columnar)]
16struct WordCount {
17    text: String,
18    diff: i64,
19}
20
21fn main() {
22
23    type InnerContainer = <WordCount as columnar::Columnar>::Container;
24    type Container = Column<InnerContainer>;
25
26    use columnar::Len;
27
28    let config = timely::Config {
29        communication: timely::CommunicationConfig::ProcessBinary(3),
30        worker: timely::WorkerConfig::default(),
31    };
32
33    // initializes and runs a timely dataflow.
34    timely::execute(config, |worker| {
35        let mut input = <InputHandle<_, CapacityContainerBuilder<Container>>>::new();
36        let probe = ProbeHandle::new();
37
38        // create a new input, exchange data, and inspect its output
39        worker.dataflow::<usize, _, _>(|scope| {
40            input
41                .to_stream(scope)
42                .unary(
43                    Pipeline,
44                    "Split",
45                    |_cap, _info| {
46                        move |input, output| {
47                            input.for_each_time(|time, data| {
48                                let mut session = output.session(&time);
49                                for data in data {
50                                    for wordcount in data.borrow().into_index_iter().flat_map(|wordcount| {
51                                        wordcount.text.split_whitespace().map(move |text| WordCountReference { text, diff: wordcount.diff })
52                                    }) {
53                                        session.give(wordcount);
54                                    }
55                                }
56                            });
57                        }
58                    },
59                )
60                .container::<Container>()
61                .unary_frontier(
62                    ExchangeCore::<ColumnBuilder<InnerContainer>,_>::new_core(|x: &WordCountReference<&str,&i64>| x.text.len() as u64),
63                    "WordCount",
64                    |_capability, _info| {
65                        let mut queues = HashMap::new();
66                        let mut counts = HashMap::new();
67
68                        move |(input, frontier), output| {
69                            input.for_each_time(|time, data| {
70                                queues
71                                    .entry(time.retain(output.output_index()))
72                                    .or_insert(Vec::new())
73                                    .extend(data.map(std::mem::take));
74
75                            });
76
77                            for (key, val) in queues.iter_mut() {
78                                if !frontier.less_equal(key.time()) {
79                                    let mut session = output.session(key);
80                                    for batch in val.drain(..) {
81                                        for wordcount in batch.borrow().into_index_iter() {
82                                            let total =
83                                            if let Some(count) = counts.get_mut(wordcount.text) {
84                                                *count += wordcount.diff;
85                                                *count
86                                            }
87                                            else {
88                                                counts.insert(wordcount.text.to_string(), *wordcount.diff);
89                                                *wordcount.diff
90                                            };
91                                            session.give(WordCountReference { text: wordcount.text, diff: total });
92                                        }
93                                    }
94                                }
95                            }
96
97                            queues.retain(|_key, val| !val.is_empty());
98                        }
99                    },
100                )
101                .container::<Container>()
102                .inspect_container(|x| {
103                    match x {
104                        Ok((time, data)) => {
105                            println!("seen at: {:?}\t{:?} records", time, data.record_count());
106                            for wc in data.borrow().into_index_iter() {
107                                println!("  {}: {}", wc.text, wc.diff);
108                            }
109                        },
110                        Err(frontier) => println!("frontier advanced to {:?}", frontier),
111                    }
112                })
113                .probe_with(&probe);
114        });
115
116        // introduce data and watch!
117        for round in 0..10 {
118            input.send(WordCountReference { text: "flat container", diff: 1 });
119            input.advance_to(round + 1);
120            while probe.less_than(input.time()) {
121                worker.step();
122            }
123        }
124    })
125    .unwrap();
126}
127
128
129pub use container::Column;
130mod container {
131
132    use columnar::bytes::stash::Stash;
133
134    #[derive(Clone, Default)]
135    pub struct Column<C> { pub stash: Stash<C, timely_bytes::arc::Bytes> }
136
137    use columnar::{Len, Index};
138    use columnar::bytes::{EncodeDecode, Indexed};
139    use columnar::common::IterOwn;
140
141    impl<C: columnar::ContainerBytes> Column<C> {
142        /// Borrows the contents no matter their representation.
143        #[inline(always)] pub fn borrow(&self) -> C::Borrowed<'_> { self.stash.borrow() }
144    }
145
146    impl<C: columnar::ContainerBytes> timely::Accountable for Column<C> {
147        #[inline] fn record_count(&self) -> i64 { i64::try_from(self.borrow().len()).unwrap() }
148        #[inline] fn is_empty(&self) -> bool { self.borrow().is_empty() }
149    }
150    impl<C: columnar::ContainerBytes> timely::container::DrainContainer for Column<C> {
151        type Item<'a> = C::Ref<'a>;
152        type DrainIter<'a> = IterOwn<C::Borrowed<'a>>;
153        fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> { self.borrow().into_index_iter() }
154    }
155
156    impl<C: columnar::ContainerBytes> timely::container::SizableContainer for Column<C> {
157        fn at_capacity(&self) -> bool {
158            match &self.stash {
159                Stash::Typed(t) => {
160                    let length_in_bytes = 8 * Indexed::length_in_words(&t.borrow());
161                    length_in_bytes >= (1 << 20)
162                },
163                Stash::Bytes(_) => true,
164                Stash::Align(_) => true,
165            }
166        }
167        fn ensure_capacity(&mut self, _stash: &mut Option<Self>) { }
168    }
169
170    impl<C: columnar::Container, T> timely::container::PushInto<T> for Column<C> where C: columnar::Push<T> {
171        #[inline] fn push_into(&mut self, item: T) { use columnar::Push; self.stash.push(item) }
172    }
173
174    impl<C: columnar::ContainerBytes> timely::dataflow::channels::ContainerBytes for Column<C> {
175        fn from_bytes(bytes: timely::bytes::arc::Bytes) -> Self { Self { stash: bytes.into() } }
176        fn length_in_bytes(&self) -> usize { self.stash.length_in_bytes() }
177        fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) { self.stash.into_bytes(writer) }
178    }
179}
180
181
use builder::ColumnBuilder;
/// A container builder that accumulates records and mints `Column` containers.
mod builder {

    use std::collections::VecDeque;
    use columnar::bytes::{EncodeDecode, Indexed, stash::Stash};
    use super::Column;

    /// Allocation granularity in words: encoded sizes are rounded up to a multiple of this.
    /// With the 2MB backing allocations targeted by `push_into`, this is 2^18 words.
    const ALLOCATION_WORDS: usize = 1 << 18;

    /// A container builder for `Column<C>`.
    #[derive(Default)]
    pub struct ColumnBuilder<C> {
        /// Container that we're writing to.
        current: C,
        /// Empty allocation.
        empty: Option<Column<C>>,
        /// Completed containers pending to be sent.
        pending: VecDeque<Column<C>>,
    }

    impl<C: columnar::ContainerBytes, T> timely::container::PushInto<T> for ColumnBuilder<C> where C: columnar::Push<T> {
        /// Appends `item` to the container being written, and mints a completed container
        /// once the encoded size comes close enough to a whole number of allocations.
        #[inline]
        fn push_into(&mut self, item: T) {
            self.current.push(item);
            // If there is less than 10% slop with 2MB backing allocations, mint a container.
            let words = Indexed::length_in_words(&self.current.borrow());
            // Round `words` up to the next multiple of `ALLOCATION_WORDS` (a power of two).
            let round = (words + (ALLOCATION_WORDS - 1)) & !(ALLOCATION_WORDS - 1);
            if round - words < round / 10 {
                let mut alloc = Vec::with_capacity(round);
                Indexed::encode(&mut alloc, &self.current.borrow());
                self.pending.push_back(Column { stash: Stash::Align(alloc.into_boxed_slice()) });
                self.current.clear();
            }
        }
    }

    use timely::container::{ContainerBuilder, LengthPreservingContainerBuilder};
    impl<C: columnar::ContainerBytes> ContainerBuilder for ColumnBuilder<C> {
        type Container = Column<C>;

        /// Returns the next completed container, if any, without touching the container
        /// currently being written.
        ///
        /// The returned container is parked in `self.empty` so a reference can be handed out.
        #[inline]
        fn extract(&mut self) -> Option<&mut Self::Container> {
            if let Some(container) = self.pending.pop_front() {
                self.empty = Some(container);
                self.empty.as_mut()
            } else {
                None
            }
        }

        /// Flushes the container being written (if non-empty) into the pending queue, then
        /// returns the next pending container, or `None` once everything has been handed out.
        #[inline]
        fn finish(&mut self) -> Option<&mut Self::Container> {
            if !self.current.is_empty() {
                self.pending.push_back(Column { stash: Stash::Typed(std::mem::take(&mut self.current)) });
            }
            self.empty = self.pending.pop_front();
            self.empty.as_mut()
        }

        /// Resets the builder to its default state, releasing whatever it was holding.
        ///
        /// # Panics
        ///
        /// Panics if records or completed containers are still held.
        #[inline]
        fn relax(&mut self) {
            // The caller is responsible for draining all contents; assert that we are empty.
            // The assertion is not strictly necessary, but it helps catch bugs.
            assert!(self.current.is_empty(), "relax called with unflushed records in `current`");
            assert!(self.pending.is_empty(), "relax called with containers still pending");
            *self = Self::default();
        }
    }

    impl<C: columnar::ContainerBytes> LengthPreservingContainerBuilder for ColumnBuilder<C> { }
}