differential_dataflow/columnar/arrangement/
mod.rs1use std::rc::Rc;
12use crate::trace::implementations::ord_neu::OrdValBatch;
13use crate::trace::rc_blanket_impls::RcBuilder;
14use crate::trace::implementations::spine_fueled::Spine;
15
16use super::layout::ColumnarLayout;
17
18pub mod trie_merger;
19
20pub type ValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<ColumnarLayout<(K,V,T,R)>>>>;
22pub type ValBatcher<K, V, T, R> = super::batcher::MergeBatcher<(K,V,T,R)>;
24pub type ValChunker<U> = TrieChunker<U>;
26pub type ValBuilder<K, V, T, R> = RcBuilder<builder::ValMirror<(K,V,T,R)>>;
28
29pub use batch_container::Coltainer;
31pub mod batch_container {
32 use columnar::{Borrow, Columnar, Container, Clear, Push, Index, Len};
35 use crate::trace::implementations::BatchContainer;
36
37 pub struct Coltainer<C: Columnar> {
39 pub container: C::Container,
41 }
42
43 impl<C: Columnar> Default for Coltainer<C> {
44 fn default() -> Self { Self { container: Default::default() } }
45 }
46
47 impl<C: Columnar + Ord + Clone> BatchContainer for Coltainer<C> where for<'a> columnar::Ref<'a, C> : Ord {
48
49 type ReadItem<'a> = columnar::Ref<'a, C>;
50 type Owned = C;
51
52 #[inline(always)] fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { C::into_owned(item) }
53 #[inline(always)] fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { other.copy_from(item) }
54
55 #[inline(always)] fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.container.push(item) }
56 #[inline(always)] fn push_own(&mut self, item: &Self::Owned) { self.container.push(item) }
57
58 fn clear(&mut self) { self.container.clear() }
60
61 fn with_capacity(_size: usize) -> Self { Self::default() }
63 fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
65 Self {
66 container: <C as Columnar>::Container::with_capacity_for([cont1.container.borrow(), cont2.container.borrow()].into_iter()),
67 }
68 }
69
70 #[inline(always)] fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { columnar::ContainerOf::<C>::reborrow_ref(item) }
72
73 #[inline(always)] fn index(&self, index: usize) -> Self::ReadItem<'_> { self.container.borrow().get(index) }
75
76 #[inline(always)] fn len(&self) -> usize { self.container.len() }
77
78 fn advance<F: for<'a> Fn(Self::ReadItem<'a>)->bool>(&self, start: usize, end: usize, function: F) -> usize {
85
86 let borrow = self.container.borrow();
87
88 let small_limit = 8;
89
90 if end > start + small_limit && function(borrow.get(start + small_limit)) {
92
93 let mut index = small_limit + 1;
95 if start + index < end && function(borrow.get(start + index)) {
96
97 let mut step = 1;
99 while start + index + step < end && function(borrow.get(start + index + step)) {
100 index += step;
101 step <<= 1;
102 }
103
104 step >>= 1;
106 while step > 0 {
107 if start + index + step < end && function(borrow.get(start + index + step)) {
108 index += step;
109 }
110 step >>= 1;
111 }
112
113 index += 1;
114 }
115
116 index
117 }
118 else {
119 let limit = std::cmp::min(end, start + small_limit);
120 (start .. limit).filter(|x| function(borrow.get(*x))).count()
121 }
122 }
123 }
124}
125
126use super::updates::UpdatesTyped;
127use super::RecordedUpdates;
128
129pub struct TrieChunker<U: super::layout::ColumnarUpdate> {
138 blobs: Vec<(UpdatesTyped<U>, bool)>,
140 blob_records: usize,
142 ready: std::collections::VecDeque<UpdatesTyped<U>>,
145 stage: Option<UpdatesTyped<U>>,
147}
148
149impl<U: super::layout::ColumnarUpdate> Default for TrieChunker<U> {
150 fn default() -> Self {
151 Self {
152 blobs: Default::default(),
153 blob_records: 0,
154 ready: Default::default(),
155 stage: None,
156 }
157 }
158}
159
160impl<U: super::layout::ColumnarUpdate> TrieChunker<U> {
161 fn consolidate_blobs(&mut self) -> UpdatesTyped<U> {
163 if self.blobs.len() == 1 && self.blobs[0].1 {
165 let (result, _) = self.blobs.pop().unwrap();
166 self.blob_records = 0;
167 return result;
168 }
169
170 let result = UpdatesTyped::<U>::form_unsorted(self.blobs.iter().flat_map(|(u, _)| u.iter()));
172 self.blobs.clear();
173 self.blob_records = 0;
174 result
175 }
176
177 fn absorb(&mut self, updates: UpdatesTyped<U>, consolidated: bool) {
179 self.blob_records += updates.len();
180 self.blobs.push((updates, consolidated));
181 }
182}
183
184impl<'a, U: super::layout::ColumnarUpdate> timely::container::PushInto<&'a mut RecordedUpdates<U>> for TrieChunker<U> {
185 fn push_into(&mut self, container: &'a mut RecordedUpdates<U>) {
186 if container.updates.len() == 0 { return; }
188
189 let updates = std::mem::take(&mut container.updates).into_typed();
197 let consolidated = container.consolidated;
198 let len = updates.len();
199
200 if consolidated && len >= crate::columnar::LINK_TARGET { self.ready.push_back(updates); }
203 else if self.blob_records + len < 2 * crate::columnar::LINK_TARGET { self.absorb(updates, consolidated); }
205 else {
207 let input_residual = if len >= crate::columnar::LINK_TARGET {
215 let cons = if consolidated { updates } else { updates.consolidate() };
216 if cons.len() >= crate::columnar::LINK_TARGET { self.ready.push_back(cons); None }
217 else if cons.len() > 0 { Some((cons, true)) }
218 else { None }
219 }
220 else { Some((updates, consolidated)) };
221
222 let blobs_residual = if self.blob_records >= crate::columnar::LINK_TARGET {
224 let cons = self.consolidate_blobs();
225 if cons.len() >= crate::columnar::LINK_TARGET { self.ready.push_back(cons); None }
226 else if cons.len() > 0 { Some((cons, true)) }
227 else { None }
228 }
229 else { None };
230
231 if let Some((r, c)) = input_residual { self.absorb(r, c); }
233 if let Some((r, c)) = blobs_residual { self.absorb(r, c); }
234 }
235 }
236}
237
238impl<U: super::layout::ColumnarUpdate> timely::container::ContainerBuilder for TrieChunker<U> {
239 type Container = UpdatesTyped<U>;
240 fn extract(&mut self) -> Option<&mut Self::Container> {
241 self.stage = self.ready.pop_front();
242 self.stage.as_mut()
243 }
244 fn finish(&mut self) -> Option<&mut Self::Container> {
245 if !self.blobs.is_empty() {
247 let cons = self.consolidate_blobs();
248 if cons.len() > 0 { self.ready.push_back(cons); }
249 }
250 self.extract()
251 }
252}
253
254pub mod builder {
255 use crate::trace::implementations::ord_neu::{Vals, Upds};
258 use crate::trace::implementations::ord_neu::val_batch::{OrdValBatch, OrdValStorage};
259 use crate::trace::Description;
260
261 use super::super::updates::UpdatesTyped;
262 use super::super::layout::ColumnarUpdate as Update;
263 use super::super::layout::ColumnarLayout as Layout;
264 use super::Coltainer;
265
266 use columnar::{Borrow, IndexAs};
267 use columnar::primitive::offsets::Strides;
268 use crate::trace::implementations::OffsetList;
269 fn strides_to_offset_list(bounds: &Strides, count: usize) -> OffsetList {
270 let mut output = OffsetList::with_capacity(count);
271 output.push(0);
272 let bounds_b = bounds.borrow();
273 for i in 0..count {
274 output.push(bounds_b.index_as(i) as usize);
275 }
276 output
277 }
278
279 pub struct ValMirror<U: Update> {
282 chunks: Vec<UpdatesTyped<U>>,
283 }
284 impl<U: Update> crate::trace::Builder for ValMirror<U> {
285 type Time = U::Time;
286 type Input = UpdatesTyped<U>;
287 type Output = OrdValBatch<Layout<U>>;
288
289 fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self {
290 Self { chunks: Vec::new() }
291 }
292 fn push(&mut self, chunk: &mut Self::Input) {
293 if chunk.len() > 0 {
294 self.chunks.push(std::mem::take(chunk));
295 }
296 }
297 fn done(self, description: Description<Self::Time>) -> Self::Output {
298 let mut chain = self.chunks;
299 Self::seal(&mut chain, description)
300 }
301 fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
302 use columnar::Len;
303
304 use columnar::Container;
307 let mut updates = UpdatesTyped::<U>::default();
308 updates.keys.reserve_for(chain.iter().map(|c| c.view().keys));
309 updates.vals.reserve_for(chain.iter().map(|c| c.view().vals));
310 updates.times.reserve_for(chain.iter().map(|c| c.view().times));
311 updates.diffs.reserve_for(chain.iter().map(|c| c.view().diffs));
312 let mut builder = super::super::updates::UpdatesBuilder::new_from(updates);
313 for chunk in chain.iter() {
314 builder.meld(chunk);
315 }
316 let merged = builder.done();
317 chain.clear();
318
319 let updates = Len::len(&merged.diffs.values);
320 if updates == 0 {
321 let storage = OrdValStorage {
322 keys: Default::default(),
323 vals: Default::default(),
324 upds: Default::default(),
325 };
326 OrdValBatch { storage, description, updates: 0 }
327 } else {
328 let val_offs = strides_to_offset_list(&merged.vals.bounds, Len::len(&merged.keys.values));
329 let time_offs = strides_to_offset_list(&merged.times.bounds, Len::len(&merged.vals.values));
330 let storage = OrdValStorage {
331 keys: Coltainer { container: merged.keys.values },
332 vals: Vals {
333 offs: val_offs,
334 vals: Coltainer { container: merged.vals.values },
335 },
336 upds: Upds {
337 offs: time_offs,
338 times: Coltainer { container: merged.times.values },
339 diffs: Coltainer { container: merged.diffs.values },
340 },
341 };
342 OrdValBatch { storage, description, updates }
343 }
344 }
345 }
346}