Skip to main content

differential_dataflow/columnar/
mod.rs

1//! Columnar container infrastructure for differential dataflow.
2//!
3//! **Experimental.** API and internals are still settling. Expect breaking
4//! changes; do not rely on stability across releases.
5//!
6//! Known rough edges:
7//! - `ContainerBytes` for `UpdatesTyped` is `unimplemented!()`. The wire-side
8//!   container is `RecordedUpdates`, whose `ContainerBytes` is implemented;
9//!   `UpdatesTyped` is the input-builder type and isn't shipped over channels.
10//! - `leave_dynamic` consolidates eagerly on each batch; the
11//!   [`crate::dynamic`] counterpart defers consolidation. Same observable
12//!   semantics, different work distribution.
13//! - `join_function` is restricted to same-`ColumnarUpdate` input and output;
14//!   it does not yet generalize to `Key`/`Val`/`Diff`-changing maps.
15//! - Several public items (`join_function`, `leave_dynamic`, `DynTime`) have
16//!   no in-tree callers yet and are not exercised by tests.
17//!
18//! Files inside this module that touch both the local module path and the
19//! [`columnar`](https://docs.rs/columnar) crate should `use columnar as col;`
20//! to disambiguate.
21//!
22//! Module layout (bottom-up):
23//! - [`layout`] — `ColumnarUpdate` / `ColumnarLayout` / `OrdContainer`.
24//! - [`updates`] — `UpdatesTyped<U>` trie, `Consolidating`, `UpdatesBuilder`.
25//! - [`builder`] — `ValColBuilder`: the input-side `ContainerBuilder`.
26//! - [`exchange`] — `ValPact` / `ValDistributor`: PACT for shuffling.
27//! - [`arrangement`] — type aliases + `Coltainer` + `TrieChunker` +
28//!   `trie_merger` + `ValMirror` (trace Builder).
29//! - This file — `RecordedUpdates<U>` (the stream container), container-trait
30//!   impls (`Negate`, `Enter`, `Leave`, `ResultsIn`), and top-level operators
31//!   (`join_function`, `leave_dynamic`, `as_recorded_updates`).
32
33
34pub mod layout;
35pub mod updates;
36pub mod builder;
37pub mod exchange;
38pub mod arrangement;
39pub mod batcher;
40pub mod spill;
41
42pub use updates::UpdatesTyped;
43pub use builder::ValBuilder as ValColBuilder;
44pub use exchange::ValPact;
45pub use arrangement::{ValBatcher, ValBuilder, ValChunker, ValSpine};
46
/// Target size for update batches, in number of updates (64 Ki = 65,536).
pub const LINK_TARGET: usize = 1 << 16;
49
50/// A thin wrapper around `Updates` that tracks the pre-consolidation record count
51/// for timely's exchange accounting. This wrapper is the stream container type;
52/// the `TrieChunker` strips it, passing bare `UpdatesTyped` into the merge batcher.
53pub struct RecordedUpdates<U: layout::ColumnarUpdate> {
54    /// The trie of `(key, val, time, diff)` updates.
55    pub updates: updates::Updates<U>,
56    /// Number of records in `updates` before consolidation.
57    pub records: usize,
58    /// Whether `updates` is known to be sorted and consolidated
59    /// (no duplicate (key, val, time) triples, no zero diffs).
60    pub consolidated: bool,
61}
62
63impl<U: layout::ColumnarUpdate> Default for RecordedUpdates<U> {
64    fn default() -> Self { Self { updates: Default::default(), records: 0, consolidated: true } }
65}
66
67impl<U: layout::ColumnarUpdate> Clone for RecordedUpdates<U> {
68    fn clone(&self) -> Self { Self { updates: self.updates.clone(), records: self.records, consolidated: self.consolidated } }
69}
70
71impl<U: layout::ColumnarUpdate> timely::Accountable for RecordedUpdates<U> {
72    #[inline] fn record_count(&self) -> i64 { self.records as i64 }
73}
74
75impl<U: layout::ColumnarUpdate> timely::dataflow::channels::ContainerBytes for RecordedUpdates<U> {
76    fn from_bytes(mut bytes: timely::bytes::arc::Bytes) -> Self {
77        // Header: records, consolidated, and four column lengths (all u64).
78        let header = bytes.extract_to(48);
79        let records       = u64::from_le_bytes(header[0..8].try_into().unwrap()) as usize;
80        let consolidated  = u64::from_le_bytes(header[8..16].try_into().unwrap()) != 0;
81        let keys_len      = u64::from_le_bytes(header[16..24].try_into().unwrap()) as usize;
82        let vals_len      = u64::from_le_bytes(header[24..32].try_into().unwrap()) as usize;
83        let times_len     = u64::from_le_bytes(header[32..40].try_into().unwrap()) as usize;
84        let diffs_len     = u64::from_le_bytes(header[40..48].try_into().unwrap()) as usize;
85        // Slice the per-column byte ranges and wrap each as `Stash::Bytes`.
86        let keys_bytes  = bytes.extract_to(keys_len);
87        let vals_bytes  = bytes.extract_to(vals_len);
88        let times_bytes = bytes.extract_to(times_len);
89        let diffs_bytes = bytes.extract_to(diffs_len);
90        use columnar::bytes::stash::Stash;
91        let keys  = Stash::try_from_bytes(keys_bytes).expect("keys decode failed");
92        let vals  = Stash::try_from_bytes(vals_bytes).expect("vals decode failed");
93        let times = Stash::try_from_bytes(times_bytes).expect("times decode failed");
94        let diffs = Stash::try_from_bytes(diffs_bytes).expect("diffs decode failed");
95        RecordedUpdates {
96            updates: updates::Updates { keys, vals, times, diffs },
97            records,
98            consolidated,
99        }
100    }
101
102    fn length_in_bytes(&self) -> usize {
103        48 + self.updates.keys.length_in_bytes()
104           + self.updates.vals.length_in_bytes()
105           + self.updates.times.length_in_bytes()
106           + self.updates.diffs.length_in_bytes()
107    }
108
109    fn into_bytes<W: std::io::Write>(&self, writer: &mut W) {
110        let keys_len  = self.updates.keys.length_in_bytes()  as u64;
111        let vals_len  = self.updates.vals.length_in_bytes()  as u64;
112        let times_len = self.updates.times.length_in_bytes() as u64;
113        let diffs_len = self.updates.diffs.length_in_bytes() as u64;
114        // Header.
115        writer.write_all(&(self.records as u64).to_le_bytes()).unwrap();
116        writer.write_all(&(self.consolidated as u64).to_le_bytes()).unwrap();
117        writer.write_all(&keys_len.to_le_bytes()).unwrap();
118        writer.write_all(&vals_len.to_le_bytes()).unwrap();
119        writer.write_all(&times_len.to_le_bytes()).unwrap();
120        writer.write_all(&diffs_len.to_le_bytes()).unwrap();
121        // Body: each Stash writes its own indexed encoding.
122        self.updates.keys.write_bytes(writer).unwrap();
123        self.updates.vals.write_bytes(writer).unwrap();
124        self.updates.times.write_bytes(writer).unwrap();
125        self.updates.diffs.write_bytes(writer).unwrap();
126    }
127}
128
129// Container trait impls for RecordedUpdates, enabling iterative scopes.
130mod container_impls {
131    use columnar::{Columnar, Index, Len, Push};
132    use timely::progress::{Timestamp, timestamp::Refines};
133    use crate::difference::Abelian;
134    use crate::collection::containers::{Negate, Enter, Leave, ResultsIn};
135
136    use super::layout::ColumnarUpdate as Update;
137    use super::updates::UpdatesTyped;
138    use super::RecordedUpdates;
139
140    impl<U: Update<Diff: Abelian>> Negate for RecordedUpdates<U> {
141        fn negate(self) -> Self {
142            use columnar::Container;
143            let RecordedUpdates { mut updates, records, consolidated } = self;
144            let view = updates.view();
145            let old_diffs = view.diffs.values;
146            let mut new_diffs = <<U::Diff as Columnar>::Container as Container>::with_capacity_for([old_diffs].into_iter());
147            let mut owned = U::Diff::default();
148            for i in 0..old_diffs.len() {
149                columnar::Columnar::copy_from(&mut owned, old_diffs.get(i));
150                owned.negate();
151                new_diffs.push(&owned);
152            }
153            // TODO: avoid make_typed() call as we are overwriting.
154            updates.diffs.make_typed().values = new_diffs;
155            RecordedUpdates { updates, records, consolidated }
156        }
157    }
158
159    impl<K, V, T1, T2, R> Enter<T1, T2> for RecordedUpdates<(K, V, T1, R)>
160    where
161        (K, V, T1, R): Update<Key=K, Val=V, Time=T1, Diff=R>,
162        (K, V, T2, R): Update<Key=K, Val=V, Time=T2, Diff=R>,
163        T1: Timestamp + Columnar + Default + Clone,
164        T2: Refines<T1> + Columnar + Default + Clone,
165        K: Columnar, V: Columnar, R: Columnar,
166    {
167        type InnerContainer = RecordedUpdates<(K, V, T2, R)>;
168        fn enter(self) -> Self::InnerContainer {
169            // Rebuild the time column from a borrowed view; keys/vals/diffs
170            // move untouched, preserving any Stash::Bytes backing.
171            use columnar::bytes::stash::Stash;
172            let RecordedUpdates { updates, records, consolidated } = self;
173            let times = updates.times.borrow();
174            let times_values = times.values;
175            let mut new_times = <<T2 as Columnar>::Container as Default>::default();
176            let mut t1_owned = T1::default();
177            for i in 0..times_values.len() {
178                Columnar::copy_from(&mut t1_owned, times_values.get(i));
179                let t2 = T2::to_inner(t1_owned.clone());
180                new_times.push(&t2);
181            }
182            // TODO: Assumes Enter (to_inner) is order-preserving on times.
183            // Deconstruct `updates` to reform with same parts but different time type.
184            let super::updates::Updates { keys, vals, mut times, diffs } = updates;
185            // TODO: Avoid make_typed() call, as we are overwriting.
186            times.make_typed();
187            let Stash::Typed(times_lists) = times else { unreachable!() };
188            let times = Stash::Typed(super::updates::Lists {
189                values: new_times,
190                bounds: times_lists.bounds,
191            });
192            RecordedUpdates {
193                updates: super::updates::Updates { keys, vals, times, diffs },
194                records,
195                consolidated,
196            }
197        }
198    }
199
200    impl<K, V, T1, T2, R> Leave<T1, T2> for RecordedUpdates<(K, V, T1, R)>
201    where
202        (K, V, T1, R): Update<Key=K, Val=V, Time=T1, Diff=R>,
203        (K, V, T2, R): Update<Key=K, Val=V, Time=T2, Diff=R>,
204        T1: Refines<T2> + Columnar + Default + Clone,
205        T2: Timestamp + Columnar + Default + Clone,
206        K: Columnar, V: Columnar, R: Columnar,
207    {
208        type OuterContainer = RecordedUpdates<(K, V, T2, R)>;
209        fn leave(self) -> Self::OuterContainer {
210            // Rebuild the time column from a borrowed view; keys/vals/diffs
211            // move untouched. Distinct T1 times can collapse to the same T2
212            // time, so the result is consolidated.
213            use columnar::bytes::stash::Stash;
214            let RecordedUpdates { updates, records, consolidated: _ } = self;
215            let times = updates.times.borrow();
216            let times_values = times.values;
217            let mut new_times = <<T2 as Columnar>::Container as Default>::default();
218            let mut t1_owned = T1::default();
219            for i in 0..times_values.len() {
220                Columnar::copy_from(&mut t1_owned, times_values.get(i));
221                let t2: T2 = t1_owned.clone().to_outer();
222                new_times.push(&t2);
223            }
224            let super::updates::Updates { keys, vals, mut times, diffs } = updates;
225            // Extract `times` bounds via make_typed (one-column copy if Bytes-backed).
226            times.make_typed();
227            let Stash::Typed(times_lists) = times else { unreachable!() };
228            let times = Stash::Typed(super::updates::Lists {
229                values: new_times,
230                bounds: times_lists.bounds,
231            });
232            let mid = super::updates::Updates { keys, vals, times, diffs };
233            // Collapse adjacent (k,v,t2) duplicates created by `to_outer`.
234            RecordedUpdates {
235                updates: mid.into_typed().consolidate().into(),
236                records,
237                consolidated: true,
238            }
239        }
240    }
241
242    impl<U: Update> ResultsIn<<U::Time as Timestamp>::Summary> for RecordedUpdates<U> {
243        fn results_in(self, step: &<U::Time as Timestamp>::Summary) -> Self {
244            use timely::progress::PathSummary;
245            // Apply results_in to each time; drop updates whose time maps to None.
246            // This must rebuild the trie since some entries may be removed.
247            let mut output = UpdatesTyped::<U>::default();
248            let mut time_owned = U::Time::default();
249            // TODO: Build all times first, and if no `None` outputs, can re-use k, v, d.
250            for (k, v, t, d) in self.updates.view().iter() {
251                Columnar::copy_from(&mut time_owned, t);
252                if let Some(new_time) = step.results_in(&time_owned) {
253                    output.push((k, v, &new_time, d));
254                }
255            }
256            // TODO: Time advancement may not be order preserving, but .. it could be.
257            // TODO: Before this is consolidated the above would need to be `form`ed.
258            RecordedUpdates { updates: output.into(), records: self.records, consolidated: false }
259        }
260    }
261}
262
263/// A columnar flat_map: iterates RecordedUpdates, calls logic per (key, val, time, diff),
264/// joins output times with input times, multiplies output diffs with input diffs.
265///
266/// This subsumes map, filter, negate, and enter_at for columnar collections.
267pub fn join_function<U, I, L>(
268    input: crate::Collection<U::Time, RecordedUpdates<U>>,
269    mut logic: L,
270) -> crate::Collection<U::Time, RecordedUpdates<U>>
271where
272    U::Time: crate::lattice::Lattice,
273    U: layout::ColumnarUpdate<Diff: crate::difference::Multiply<U::Diff, Output = U::Diff>>,
274    I: IntoIterator<Item = (U::Key, U::Val, U::Time, U::Diff)>,
275    L: FnMut(
276        columnar::Ref<'_, U::Key>,
277        columnar::Ref<'_, U::Val>,
278        columnar::Ref<'_, U::Time>,
279        columnar::Ref<'_, U::Diff>,
280    ) -> I + 'static,
281{
282    use timely::dataflow::operators::generic::Operator;
283    use timely::dataflow::channels::pact::Pipeline;
284    use crate::AsCollection;
285    use crate::difference::Multiply;
286    use crate::lattice::Lattice;
287    use columnar::Columnar;
288
289    input
290        .inner
291        .unary::<ValColBuilder<U>, _, _, _>(Pipeline, "JoinFunction", move |_, _| {
292            move |input, output| {
293                let mut t1o = U::Time::default();
294                let mut d1o = U::Diff::default();
295                input.for_each(|time, data| {
296                    let mut session = output.session_with_builder(&time);
297                    for (k1, v1, t1, d1) in data.updates.view().iter() {
298                        Columnar::copy_from(&mut t1o, t1);
299                        Columnar::copy_from(&mut d1o, d1);
300                        for (k2, v2, t2, d2) in logic(k1, v1, t1, d1) {
301                            let t3 = t2.join(&t1o);
302                            let d3 = d2.multiply(&d1o);
303                            session.give((&k2, &v2, &t3, &d3));
304                        }
305                    }
306                });
307            }
308        })
309        .as_collection()
310}
311
312/// Timestamp shape of a dynamic iterative scope: an outer timestamp paired
313/// with a per-level `PointStamp` of loop counters.
314pub type DynTime<TOuter, T> = timely::order::Product<TOuter, crate::dynamic::pointstamp::PointStamp<T>>;
315
316/// Leave a dynamic iterative scope, truncating PointStamp coordinates.
317///
318/// Uses OperatorBuilder (not unary) for the custom input connection summary
319/// that tells timely how the PointStamp is affected (retain `level - 1` coordinates).
320///
321/// Consolidates after truncation since distinct PointStamp coordinates can collapse.
322pub fn leave_dynamic<K, V, R, TOuter, T>(
323    input: crate::Collection<DynTime<TOuter, T>, RecordedUpdates<(K, V, DynTime<TOuter, T>, R)>>,
324    level: usize,
325) -> crate::Collection<DynTime<TOuter, T>, RecordedUpdates<(K, V, DynTime<TOuter, T>, R)>>
326where
327    K: columnar::Columnar,
328    V: columnar::Columnar,
329    R: columnar::Columnar,
330    TOuter: timely::progress::Timestamp + Default + columnar::Columnar,
331    T: timely::progress::Timestamp + Default + columnar::Columnar,
332    (K, V, DynTime<TOuter, T>, R): layout::ColumnarUpdate<Key = K, Val = V, Time = DynTime<TOuter, T>, Diff = R>,
333{
334    assert!(level > 0, "leave_dynamic requires level > 0");
335    use timely::dataflow::channels::pact::Pipeline;
336    use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
337    use timely::dataflow::operators::generic::OutputBuilder;
338    use timely::order::Product;
339    use timely::progress::Antichain;
340    use timely::container::{ContainerBuilder, PushInto};
341    use crate::AsCollection;
342    use crate::dynamic::pointstamp::{PointStamp, PointStampSummary};
343    use columnar::Columnar;
344
345    let mut builder = OperatorBuilder::new("LeaveDynamic".to_string(), input.inner.scope());
346    let (output, stream) = builder.new_output();
347    let mut output = OutputBuilder::from(output);
348    let mut op_input = builder.new_input_connection(
349        input.inner,
350        Pipeline,
351        [(
352            0,
353            Antichain::from_elem(Product {
354                outer: Default::default(),
355                inner: PointStampSummary {
356                    retain: Some(level - 1),
357                    actions: Vec::new(),
358                },
359            }),
360        )],
361    );
362
363    builder.build(move |_capability| {
364        let mut col_builder = ValColBuilder::<(K, V, DynTime<TOuter, T>, R)>::default();
365        let mut time = DynTime::<TOuter, T>::default();
366        move |_frontier| {
367            let mut output = output.activate();
368            op_input.for_each(|cap, data| {
369                // Truncate the capability's timestamp.
370                let mut new_time = cap.time().clone();
371                let mut vec = std::mem::take(&mut new_time.inner).into_inner();
372                vec.truncate(level - 1);
373                new_time.inner = PointStamp::new(vec);
374                let new_cap = cap.delayed(&new_time, 0);
375                // Push updates with truncated times into the builder.
376                // The builder's form call on flush sorts and consolidates,
377                // handling the duplicate times that truncation can produce.
378                // TODO: The input trie is already sorted; a streaming form
379                // that accepts pre-sorted, potentially-collapsing timestamps
380                // could avoid the re-sort inside the builder.
381                for (k, v, t, d) in data.updates.view().iter() {
382                    Columnar::copy_from(&mut time, t);
383                    let mut inner_vec = std::mem::take(&mut time.inner).into_inner();
384                    inner_vec.truncate(level - 1);
385                    time.inner = PointStamp::new(inner_vec);
386                    col_builder.push_into((k, v, &time, d));
387                }
388                let mut session = output.session(&new_cap);
389                while let Some(container) = col_builder.finish() {
390                    session.give_container(container);
391                }
392            });
393        }
394    });
395
396    stream.as_collection()
397}
398
399/// Extract a `Collection<_, RecordedUpdates<U>>` from a columnar `Arranged`.
400///
401/// Cursors through each batch and pushes `(key, val, time, diff)` refs into
402/// a `ValColBuilder`, which sorts and consolidates on flush.
403pub fn as_recorded_updates<U>(
404    arranged: crate::operators::arrange::Arranged<
405        crate::operators::arrange::TraceAgent<ValSpine<U::Key, U::Val, U::Time, U::Diff>>,
406    >,
407) -> crate::Collection<U::Time, RecordedUpdates<U>>
408where
409    U: layout::ColumnarUpdate,
410{
411    use timely::dataflow::operators::generic::Operator;
412    use timely::dataflow::channels::pact::Pipeline;
413    use crate::trace::{BatchReader, Cursor};
414    use crate::AsCollection;
415
416    arranged.stream
417        .unary::<ValColBuilder<U>, _, _, _>(Pipeline, "AsRecordedUpdates", |_, _| {
418            move |input, output| {
419                input.for_each(|time, batches| {
420                    let mut session = output.session_with_builder(&time);
421                    for batch in batches.drain(..) {
422                        let mut cursor = batch.cursor();
423                        while cursor.key_valid(&batch) {
424                            while cursor.val_valid(&batch) {
425                                let key = cursor.key(&batch);
426                                let val = cursor.val(&batch);
427                                cursor.map_times(&batch, |time, diff| {
428                                    session.give((key, val, time, diff));
429                                });
430                                cursor.step_val(&batch);
431                            }
432                            cursor.step_key(&batch);
433                        }
434                    }
435                });
436            }
437        })
438        .as_collection()
439}