Skip to main content

differential_dataflow/columnar/arrangement/
mod.rs

1//! Columnar arrangement plumbing.
2//!
3//! - Type aliases (`ValSpine`, `ValBatcher`, `ValBuilder`) glue columnar storage
4//!   into DD's trace machinery.
5//! - `Coltainer<C>` wraps a columnar `C::Container` as a DD `BatchContainer`.
6//! - `TrieChunker` strips `RecordedUpdates` down to `UpdatesTyped` for the merge batcher.
7//! - `trie_merger` is the batch-at-a-time merging logic.
8//! - `builder::ValMirror` is the `trace::Builder` that seals melded chunks into
9//!   an `OrdValBatch`.
10
11use std::rc::Rc;
12use crate::trace::implementations::ord_neu::OrdValBatch;
13use crate::trace::rc_blanket_impls::RcBuilder;
14use crate::trace::implementations::spine_fueled::Spine;
15
16use super::layout::ColumnarLayout;
17
18pub mod trie_merger;
19
20/// A trace implementation backed by columnar storage.
21pub type ValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<ColumnarLayout<(K,V,T,R)>>>>;
22/// A batcher for columnar storage.
23pub type ValBatcher<K, V, T, R> = super::batcher::MergeBatcher<(K,V,T,R)>;
24/// A chunker that maps `RecordedUpdates<U>` streams into the batcher's `UpdatesTyped<U>` chunks.
25pub type ValChunker<U> = TrieChunker<U>;
26/// A builder for columnar storage.
27pub type ValBuilder<K, V, T, R> = RcBuilder<builder::ValMirror<(K,V,T,R)>>;
28
29/// A batch container implementation for Coltainer<C>.
30pub use batch_container::Coltainer;
31pub mod batch_container {
32    //! [`Coltainer`] wraps a columnar container as a DD [`BatchContainer`].
33
34    use columnar::{Borrow, Columnar, Container, Clear, Push, Index, Len};
35    use crate::trace::implementations::BatchContainer;
36
37    /// Container, anchored by `C` to provide an owned type.
38    pub struct Coltainer<C: Columnar> {
39        /// The underlying columnar container.
40        pub container: C::Container,
41    }
42
43    impl<C: Columnar> Default for Coltainer<C> {
44        fn default() -> Self { Self { container: Default::default() } }
45    }
46
47    impl<C: Columnar + Ord + Clone> BatchContainer for Coltainer<C> where for<'a> columnar::Ref<'a, C> : Ord {
48
49        type ReadItem<'a> = columnar::Ref<'a, C>;
50        type Owned = C;
51
52        #[inline(always)] fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { C::into_owned(item) }
53        #[inline(always)] fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { other.copy_from(item) }
54
55        #[inline(always)] fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.container.push(item) }
56        #[inline(always)] fn push_own(&mut self, item: &Self::Owned) { self.container.push(item) }
57
58        /// Clears the container. May not release resources.
59        fn clear(&mut self) { self.container.clear() }
60
61        /// Creates a new container with sufficient capacity.
62        fn with_capacity(_size: usize) -> Self { Self::default() }
63        /// Creates a new container with sufficient capacity.
64        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
65            Self {
66                container: <C as Columnar>::Container::with_capacity_for([cont1.container.borrow(), cont2.container.borrow()].into_iter()),
67            }
68         }
69
70        /// Converts a read item into one with a narrower lifetime.
71        #[inline(always)] fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { columnar::ContainerOf::<C>::reborrow_ref(item) }
72
73        /// Reference to the element at this position.
74        #[inline(always)] fn index(&self, index: usize) -> Self::ReadItem<'_> { self.container.borrow().get(index) }
75
76        #[inline(always)] fn len(&self) -> usize { self.container.len() }
77
78        /// Reports the number of elements satisfying the predicate.
79        ///
80        /// This methods *relies strongly* on the assumption that the predicate
81        /// stays false once it becomes false, a joint property of the predicate
82        /// and the layout of `Self. This allows `advance` to use exponential search to
83        /// count the number of elements in time logarithmic in the result.
84        fn advance<F: for<'a> Fn(Self::ReadItem<'a>)->bool>(&self, start: usize, end: usize, function: F) -> usize {
85
86            let borrow = self.container.borrow();
87
88            let small_limit = 8;
89
90            // Exponential search if the answer isn't within `small_limit`.
91            if end > start + small_limit && function(borrow.get(start + small_limit)) {
92
93                // start with no advance
94                let mut index = small_limit + 1;
95                if start + index < end && function(borrow.get(start + index)) {
96
97                    // advance in exponentially growing steps.
98                    let mut step = 1;
99                    while start + index + step < end && function(borrow.get(start + index + step)) {
100                        index += step;
101                        step <<= 1;
102                    }
103
104                    // advance in exponentially shrinking steps.
105                    step >>= 1;
106                    while step > 0 {
107                        if start + index + step < end && function(borrow.get(start + index + step)) {
108                            index += step;
109                        }
110                        step >>= 1;
111                    }
112
113                    index += 1;
114                }
115
116                index
117            }
118            else {
119                let limit = std::cmp::min(end, start + small_limit);
120                (start .. limit).filter(|x| function(borrow.get(*x))).count()
121            }
122        }
123    }
124}
125
126use super::updates::UpdatesTyped;
127use super::RecordedUpdates;
128
129/// A chunker that unwraps `RecordedUpdates` into bare `UpdatesTyped` for the merge batcher.
130///
131/// The intended behavior is to produce chunks whose size is within 1-2x `LINK_TARGET`.
132/// It ships large batches immediately, accumulates small batches, consolidates as they
133/// exceed 2xLINK_TARGET, and ships them unless they drop below 1xLINK_TARGET.
134///
135/// The flow is into (or around) `self.stage`, then consolidated blocks into `self.ready`,
136/// each of which is put in `self.stage`
137pub struct TrieChunker<U: super::layout::ColumnarUpdate> {
138    /// Insufficiently large updates we haven't figured out how to ship yet.
139    blobs: Vec<(UpdatesTyped<U>, bool)>,
140    /// Sum of `len()` across `blobs`.
141    blob_records: usize,
142    /// Ready-to-emit chunks. Each is sorted and consolidated; size ≥ `LINK_TARGET`
143    /// (or smaller, only for the final chunk produced by `finish`).
144    ready: std::collections::VecDeque<UpdatesTyped<U>>,
145    /// Staging area for the next pull call.
146    stage: Option<UpdatesTyped<U>>,
147}
148
149impl<U: super::layout::ColumnarUpdate> Default for TrieChunker<U> {
150    fn default() -> Self {
151        Self {
152            blobs: Default::default(),
153            blob_records: 0,
154            ready: Default::default(),
155            stage: None,
156        }
157    }
158}
159
160impl<U: super::layout::ColumnarUpdate> TrieChunker<U> {
161    /// Consolidate and empty `self.blobs`, into `self.ready` if large enough or else return.
162    fn consolidate_blobs(&mut self) -> UpdatesTyped<U> {
163        // Single consolidated entry: pass through, no work.
164        if self.blobs.len() == 1 && self.blobs[0].1 {
165            let (result, _) = self.blobs.pop().unwrap();
166            self.blob_records = 0;
167            return result;
168        }
169
170        // TODO: Improve consolidation through column-oriented sorts.
171        let result = UpdatesTyped::<U>::form_unsorted(self.blobs.iter().flat_map(|(u, _)| u.iter()));
172        self.blobs.clear();
173        self.blob_records = 0;
174        result
175    }
176
177    /// Push a non-empty `UpdatesTyped` into blobs and update accounting.
178    fn absorb(&mut self, updates: UpdatesTyped<U>, consolidated: bool) {
179        self.blob_records += updates.len();
180        self.blobs.push((updates, consolidated));
181    }
182}
183
184impl<'a, U: super::layout::ColumnarUpdate> timely::container::PushInto<&'a mut RecordedUpdates<U>> for TrieChunker<U> {
185    fn push_into(&mut self, container: &'a mut RecordedUpdates<U>) {
186        // Early return if an empty container (legit, for accountable progress tracking).
187        if container.updates.len() == 0 { return; }
188
189        // Our main goal is to only ship links that are 1-2 x LINK_TARGET, using blobs
190        // to accumulate updates until they are ready to go or we are asked to finish.
191        //
192        // Informally, we are aiming to move `container` into or around `self.blobs`.
193        // Into if small enough, as we can further consolidate, but if not we need to
194        // consolidate and then either ship (if large) or hold (if small) the results.
195
196        let updates = std::mem::take(&mut container.updates).into_typed();
197        let consolidated = container.consolidated;
198        let len = updates.len();
199
200        // The input may be ready to ship on its own.
201        // This is ideal, if we've used an accumulating container builder elsewhere.
202        if consolidated && len >= crate::columnar::LINK_TARGET { self.ready.push_back(updates); }
203        // Can move into blobs if the combined length is not too large.
204        else if self.blob_records + len < 2 * crate::columnar::LINK_TARGET { self.absorb(updates, consolidated); }
205        // Otherwise, we'll need to manage `self.blobs`.
206        else {
207            // Together `updates` and `self.blobs` exceed 2 * LINK_TARGET.
208            // At least one, perhaps both of them, are LINK_TARGET in size.
209            // We'll consolidate any that are, and ship or merge the results.
210            // We'll end up with at most LINK_TARGET in `self.blobs`, retiring
211            // a constant factor of the pending work we started with.
212
213            // Consolidate and move to ready if large; stash otherwise.
214            let input_residual = if len >= crate::columnar::LINK_TARGET {
215                let cons = if consolidated { updates } else { updates.consolidate() };
216                if cons.len() >= crate::columnar::LINK_TARGET { self.ready.push_back(cons); None }
217                else if cons.len() > 0 { Some((cons, true)) }
218                else { None }
219            }
220            else { Some((updates, consolidated)) };
221
222            // Consolidate and move to ready if large; stash otherwise.
223            let blobs_residual = if self.blob_records >= crate::columnar::LINK_TARGET {
224                let cons = self.consolidate_blobs();
225                if cons.len() >= crate::columnar::LINK_TARGET { self.ready.push_back(cons); None }
226                else if cons.len() > 0 { Some((cons, true)) }
227                else { None }
228            }
229            else { None };
230
231            // Return un-shipped
232            if let Some((r, c)) = input_residual { self.absorb(r, c); }
233            if let Some((r, c)) = blobs_residual { self.absorb(r, c); }
234        }
235    }
236}
237
238impl<U: super::layout::ColumnarUpdate> timely::container::ContainerBuilder for TrieChunker<U> {
239    type Container = UpdatesTyped<U>;
240    fn extract(&mut self) -> Option<&mut Self::Container> {
241        self.stage = self.ready.pop_front();
242        self.stage.as_mut()
243    }
244    fn finish(&mut self) -> Option<&mut Self::Container> {
245        // Drain whatever's left in blobs as a single (possibly small) final chunk.
246        if !self.blobs.is_empty() {
247            let cons = self.consolidate_blobs();
248            if cons.len() > 0 { self.ready.push_back(cons); }
249        }
250        self.extract()
251    }
252}
253
254pub mod builder {
255    //! [`ValMirror`] trace builder that seals melded chunks into [`OrdValBatch`].
256
257    use crate::trace::implementations::ord_neu::{Vals, Upds};
258    use crate::trace::implementations::ord_neu::val_batch::{OrdValBatch, OrdValStorage};
259    use crate::trace::Description;
260
261    use super::super::updates::UpdatesTyped;
262    use super::super::layout::ColumnarUpdate as Update;
263    use super::super::layout::ColumnarLayout as Layout;
264    use super::Coltainer;
265
266    use columnar::{Borrow, IndexAs};
267    use columnar::primitive::offsets::Strides;
268    use crate::trace::implementations::OffsetList;
269    fn strides_to_offset_list(bounds: &Strides, count: usize) -> OffsetList {
270        let mut output = OffsetList::with_capacity(count);
271        output.push(0);
272        let bounds_b = bounds.borrow();
273        for i in 0..count {
274            output.push(bounds_b.index_as(i) as usize);
275        }
276        output
277    }
278
279    /// Trace [`Builder`](crate::trace::Builder) that accumulates `UpdatesTyped`
280    /// chunks and seals them into a single [`OrdValBatch`].
281    pub struct ValMirror<U: Update> {
282        chunks: Vec<UpdatesTyped<U>>,
283    }
284    impl<U: Update> crate::trace::Builder for ValMirror<U> {
285        type Time = U::Time;
286        type Input = UpdatesTyped<U>;
287        type Output = OrdValBatch<Layout<U>>;
288
289        fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self {
290            Self { chunks: Vec::new() }
291        }
292        fn push(&mut self, chunk: &mut Self::Input) {
293            if chunk.len() > 0 {
294                self.chunks.push(std::mem::take(chunk));
295            }
296        }
297        fn done(self, description: Description<Self::Time>) -> Self::Output {
298            let mut chain = self.chunks;
299            Self::seal(&mut chain, description)
300        }
301        fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
302            use columnar::Len;
303
304            // Meld sorted, consolidated chain entries in order.
305            // Pre-allocate to avoid reallocations during meld.
306            use columnar::Container;
307            let mut updates = UpdatesTyped::<U>::default();
308            updates.keys.reserve_for(chain.iter().map(|c| c.view().keys));
309            updates.vals.reserve_for(chain.iter().map(|c| c.view().vals));
310            updates.times.reserve_for(chain.iter().map(|c| c.view().times));
311            updates.diffs.reserve_for(chain.iter().map(|c| c.view().diffs));
312            let mut builder = super::super::updates::UpdatesBuilder::new_from(updates);
313            for chunk in chain.iter() {
314                builder.meld(chunk);
315            }
316            let merged = builder.done();
317            chain.clear();
318
319            let updates = Len::len(&merged.diffs.values);
320            if updates == 0 {
321                let storage = OrdValStorage {
322                    keys: Default::default(),
323                    vals: Default::default(),
324                    upds: Default::default(),
325                };
326                OrdValBatch { storage, description, updates: 0 }
327            } else {
328                let val_offs = strides_to_offset_list(&merged.vals.bounds, Len::len(&merged.keys.values));
329                let time_offs = strides_to_offset_list(&merged.times.bounds, Len::len(&merged.vals.values));
330                let storage = OrdValStorage {
331                    keys: Coltainer { container: merged.keys.values },
332                    vals: Vals {
333                        offs: val_offs,
334                        vals: Coltainer { container: merged.vals.values },
335                    },
336                    upds: Upds {
337                        offs: time_offs,
338                        times: Coltainer { container: merged.times.values },
339                        diffs: Coltainer { container: merged.diffs.values },
340                    },
341                };
342                OrdValBatch { storage, description, updates }
343            }
344        }
345    }
346}