1use std::collections::HashMap;
4
5use columnar::Index;
6use timely::Accountable;
7use timely::container::CapacityContainerBuilder;
8use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
9use timely::dataflow::InputHandle;
10use timely::dataflow::operators::{InspectCore, Operator, Probe};
11use timely::dataflow::ProbeHandle;
12
/// A text record paired with a signed count change.
///
/// `#[derive(columnar::Columnar)]` supplies the columnar container that `main` uses as
/// `<WordCount as columnar::Columnar>::Container`. `main` also builds values of a
/// `WordCountReference { text, diff }` type, which is presumably generated by this derive
/// (borrowed counterpart of this struct) — TODO confirm against the `columnar` crate docs.
#[derive(columnar::Columnar)]
struct WordCount {
    /// Text that the `Split` operator in `main` breaks into whitespace-separated words.
    text: String,
    /// Count change applied to every word of `text`; summed into per-word running totals.
    diff: i64,
}
20
21fn main() {
22
23 type InnerContainer = <WordCount as columnar::Columnar>::Container;
24 type Container = Column<InnerContainer>;
25
26 use columnar::Len;
27
28 let config = timely::Config {
29 communication: timely::CommunicationConfig::ProcessBinary(3),
30 worker: timely::WorkerConfig::default(),
31 };
32
33 timely::execute(config, |worker| {
35 let mut input = <InputHandle<_, CapacityContainerBuilder<Container>>>::new();
36 let probe = ProbeHandle::new();
37
38 worker.dataflow::<usize, _, _>(|scope| {
40 input
41 .to_stream(scope)
42 .unary(
43 Pipeline,
44 "Split",
45 |_cap, _info| {
46 move |input, output| {
47 input.for_each_time(|time, data| {
48 let mut session = output.session(&time);
49 for data in data {
50 for wordcount in data.borrow().into_index_iter().flat_map(|wordcount| {
51 wordcount.text.split_whitespace().map(move |text| WordCountReference { text, diff: wordcount.diff })
52 }) {
53 session.give(wordcount);
54 }
55 }
56 });
57 }
58 },
59 )
60 .container::<Container>()
61 .unary_frontier(
62 ExchangeCore::<ColumnBuilder<InnerContainer>,_>::new_core(|x: &WordCountReference<&str,&i64>| x.text.len() as u64),
63 "WordCount",
64 |_capability, _info| {
65 let mut queues = HashMap::new();
66 let mut counts = HashMap::new();
67
68 move |(input, frontier), output| {
69 input.for_each_time(|time, data| {
70 queues
71 .entry(time.retain(output.output_index()))
72 .or_insert(Vec::new())
73 .extend(data.map(std::mem::take));
74
75 });
76
77 for (key, val) in queues.iter_mut() {
78 if !frontier.less_equal(key.time()) {
79 let mut session = output.session(key);
80 for batch in val.drain(..) {
81 for wordcount in batch.borrow().into_index_iter() {
82 let total =
83 if let Some(count) = counts.get_mut(wordcount.text) {
84 *count += wordcount.diff;
85 *count
86 }
87 else {
88 counts.insert(wordcount.text.to_string(), *wordcount.diff);
89 *wordcount.diff
90 };
91 session.give(WordCountReference { text: wordcount.text, diff: total });
92 }
93 }
94 }
95 }
96
97 queues.retain(|_key, val| !val.is_empty());
98 }
99 },
100 )
101 .container::<Container>()
102 .inspect_container(|x| {
103 match x {
104 Ok((time, data)) => {
105 println!("seen at: {:?}\t{:?} records", time, data.record_count());
106 for wc in data.borrow().into_index_iter() {
107 println!(" {}: {}", wc.text, wc.diff);
108 }
109 },
110 Err(frontier) => println!("frontier advanced to {:?}", frontier),
111 }
112 })
113 .probe_with(&probe);
114 });
115
116 for round in 0..10 {
118 input.send(WordCountReference { text: "flat container", diff: 1 });
119 input.advance_to(round + 1);
120 while probe.less_than(input.time()) {
121 worker.step();
122 }
123 }
124 })
125 .unwrap();
126}
127
128
129pub use container::Column;
130mod container {
131
132 use columnar::bytes::stash::Stash;
133
134 #[derive(Clone, Default)]
135 pub struct Column<C> { pub stash: Stash<C, timely_bytes::arc::Bytes> }
136
137 use columnar::{Len, Index};
138 use columnar::bytes::{EncodeDecode, Indexed};
139 use columnar::common::IterOwn;
140
141 impl<C: columnar::ContainerBytes> Column<C> {
142 #[inline(always)] pub fn borrow(&self) -> C::Borrowed<'_> { self.stash.borrow() }
144 }
145
146 impl<C: columnar::ContainerBytes> timely::Accountable for Column<C> {
147 #[inline] fn record_count(&self) -> i64 { i64::try_from(self.borrow().len()).unwrap() }
148 #[inline] fn is_empty(&self) -> bool { self.borrow().is_empty() }
149 }
150 impl<C: columnar::ContainerBytes> timely::container::DrainContainer for Column<C> {
151 type Item<'a> = C::Ref<'a>;
152 type DrainIter<'a> = IterOwn<C::Borrowed<'a>>;
153 fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> { self.borrow().into_index_iter() }
154 }
155
156 impl<C: columnar::ContainerBytes> timely::container::SizableContainer for Column<C> {
157 fn at_capacity(&self) -> bool {
158 match &self.stash {
159 Stash::Typed(t) => {
160 let length_in_bytes = 8 * Indexed::length_in_words(&t.borrow());
161 length_in_bytes >= (1 << 20)
162 },
163 Stash::Bytes(_) => true,
164 Stash::Align(_) => true,
165 }
166 }
167 fn ensure_capacity(&mut self, _stash: &mut Option<Self>) { }
168 }
169
170 impl<C: columnar::Container, T> timely::container::PushInto<T> for Column<C> where C: columnar::Push<T> {
171 #[inline] fn push_into(&mut self, item: T) { use columnar::Push; self.stash.push(item) }
172 }
173
174 impl<C: columnar::ContainerBytes> timely::dataflow::channels::ContainerBytes for Column<C> {
175 fn from_bytes(bytes: timely::bytes::arc::Bytes) -> Self { Self { stash: bytes.into() } }
176 fn length_in_bytes(&self) -> usize { self.stash.length_in_bytes() }
177 fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) { self.stash.into_bytes(writer) }
178 }
179}
180
181
182use builder::ColumnBuilder;
183mod builder {
184
185 use std::collections::VecDeque;
186 use columnar::bytes::{EncodeDecode, Indexed, stash::Stash};
187 use super::Column;
188
189 #[derive(Default)]
191 pub struct ColumnBuilder<C> {
192 current: C,
194 empty: Option<Column<C>>,
196 pending: VecDeque<Column<C>>,
198 }
199
200 impl<C: columnar::ContainerBytes, T> timely::container::PushInto<T> for ColumnBuilder<C> where C: columnar::Push<T> {
201 #[inline]
202 fn push_into(&mut self, item: T) {
203 self.current.push(item);
204 let words = Indexed::length_in_words(&self.current.borrow());
206 let round = (words + ((1 << 18) - 1)) & !((1 << 18) - 1);
207 if round - words < round / 10 {
208 let mut alloc = Vec::with_capacity(round);
209 Indexed::encode(&mut alloc, &self.current.borrow());
210 self.pending.push_back(Column { stash: Stash::Align(alloc.into_boxed_slice()) });
211 self.current.clear();
212 }
213 }
214 }
215
216 use timely::container::{ContainerBuilder, LengthPreservingContainerBuilder};
217 impl<C: columnar::ContainerBytes> ContainerBuilder for ColumnBuilder<C> {
218 type Container = Column<C>;
219
220 #[inline]
221 fn extract(&mut self) -> Option<&mut Self::Container> {
222 if let Some(container) = self.pending.pop_front() {
223 self.empty = Some(container);
224 self.empty.as_mut()
225 } else {
226 None
227 }
228 }
229
230 #[inline]
231 fn finish(&mut self) -> Option<&mut Self::Container> {
232 if !self.current.is_empty() {
233 self.pending.push_back(Column { stash: Stash::Typed(std::mem::take(&mut self.current)) });
234 }
235 self.empty = self.pending.pop_front();
236 self.empty.as_mut()
237 }
238
239 #[inline]
240 fn relax(&mut self) {
241 assert!(self.current.is_empty());
244 assert!(self.pending.is_empty());
245 *self = Self::default();
246 }
247 }
248
249 impl<C: columnar::ContainerBytes> LengthPreservingContainerBuilder for ColumnBuilder<C> { }
250}