// differential_dataflow/columnar/updates.rs

//! Trie-structured update storage.
//!
//! `UpdatesTyped<U>` is the core trie: four nested `Lists` (keys, vals, times, diffs).
//! `Consolidating` is a streaming consolidator over sorted `(k,v,t,d)` data.
//! `UpdatesBuilder` melds sorted, consolidated chunks into a single trie.
//!
//! NOTE: `UpdatesTyped::iter` / `form` / `form_unsorted` / `consolidate` are escape hatches
//! that flatten the trie, and `filter_zero` rebuilds it layer by layer. Prefer trie-native
//! operations where possible — flattening + rebuilding is a significant cost on hot paths.

11use columnar::{Columnar, Container, ContainerOf, Vecs, Borrow, Index, IndexAs, Len, Push};
12use columnar::primitive::offsets::Strides;
13use crate::difference::{Semigroup, IsZero};
14
15use super::layout::ColumnarUpdate as Update;
16
17/// A `Vecs` using strided offsets.
18pub type Lists<C> = Vecs<C, Strides>;
19
20/// Returns the non-empty lists once values are filtered by `keep`, and the bitmap of lists to keep.
21pub fn retain_items<'a, C: Container>(lists: <Lists<C> as Borrow>::Borrowed<'a>, keep: &[bool]) -> (Lists<C>, Vec<bool>) {
22
23    // In principle we can copy runs described in `bools` for bulk copying.
24    let mut output = <Lists::<C> as Container>::with_capacity_for([lists].into_iter());
25    let mut bitmap = Vec::with_capacity(lists.len());
26    assert_eq!(keep.len(), lists.values.len());
27    for list_index in 0 .. lists.len() {
28        let (lower, upper) = lists.bounds.bounds(list_index);
29        for item_index in lower .. upper {
30            if keep[item_index] {
31                output.values.push(lists.values.get(item_index));
32            }
33        }
34        if output.values.len() > columnar::Index::last(&output.bounds.borrow()).unwrap_or(0) as usize {
35            output.bounds.push(output.values.len() as u64);
36            bitmap.push(true);
37        }
38        else { bitmap.push(false); }
39    }
40
41    assert_eq!(bitmap.len(), lists.len());
42    (output, bitmap)
43}
44
45
46/// Trie-structured update storage using columnar containers.
47///
48/// Four nested layers of `Lists`:
49/// - `keys`: lists of keys (outer lists are independent groups)
50/// - `vals`: per-key, lists of vals
51/// - `times`: per-val, lists of times
52/// - `diffs`: per-time, lists of diffs (singletons when consolidated)
53///
54/// A flat unsorted input has stride 1 at every level (one key per entry,
55/// one val per key, one time per val, one diff per time).
56/// A fully consolidated trie has a single outer key list, all lists sorted
57/// and deduplicated, and singleton diff lists.
58pub struct UpdatesTyped<U: Update> {
59    /// Outer key list (one entry per group of keys at the trie root).
60    pub keys:  Lists<ContainerOf<U::Key>>,
61    /// Per-key list of vals.
62    pub vals:  Lists<ContainerOf<U::Val>>,
63    /// Per-val list of times.
64    pub times: Lists<ContainerOf<U::Time>>,
65    /// Per-time list of diffs (one diff per time after consolidation).
66    pub diffs: Lists<ContainerOf<U::Diff>>,
67}
68
69impl<U: Update> Default for UpdatesTyped<U> {
70    fn default() -> Self {
71        Self {
72            keys: Default::default(),
73            vals: Default::default(),
74            times: Default::default(),
75            diffs: Default::default(),
76        }
77    }
78}
79
80impl<U: Update> std::fmt::Debug for UpdatesTyped<U> {
81    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82        f.debug_struct("UpdatesTyped").finish()
83    }
84}
85
86impl<U: Update> Clone for UpdatesTyped<U> {
87    fn clone(&self) -> Self {
88        Self {
89            keys: self.keys.clone(),
90            vals: self.vals.clone(),
91            times: self.times.clone(),
92            diffs: self.diffs.clone(),
93        }
94    }
95}
96
97/// Borrowed view of an [`UpdatesTyped<U>`] with the same four-field shape.
98///
99/// Reader code should consume an `UpdatesTyped` through this view rather than reading
100/// fields directly. This decouples readers from the storage representation: the
101/// view's shape stays the same whether the underlying `UpdatesTyped` holds owned
102/// `Lists` or (later) `Stash`-backed columns that may be borrowed from wire bytes.
103pub struct UpdatesView<'a, U: Update> {
104    /// Outer key list (one entry per group of keys at the trie root).
105    pub keys:  <Lists<ContainerOf<U::Key>>  as Borrow>::Borrowed<'a>,
106    /// Per-key list of vals.
107    pub vals:  <Lists<ContainerOf<U::Val>>  as Borrow>::Borrowed<'a>,
108    /// Per-val list of times.
109    pub times: <Lists<ContainerOf<U::Time>> as Borrow>::Borrowed<'a>,
110    /// Per-time list of diffs.
111    pub diffs: <Lists<ContainerOf<U::Diff>> as Borrow>::Borrowed<'a>,
112}
113
114impl<'a, U: Update> Copy for UpdatesView<'a, U> {}
115impl<'a, U: Update> Clone for UpdatesView<'a, U> { fn clone(&self) -> Self { *self } }
116
117impl<'a, U: Update> UpdatesView<'a, U> {
118    /// Iterate all `(key, val, time, diff)` entries as refs.
119    pub fn iter(self) -> impl Iterator<Item = (
120        columnar::Ref<'a, U::Key>,
121        columnar::Ref<'a, U::Val>,
122        columnar::Ref<'a, U::Time>,
123        columnar::Ref<'a, U::Diff>,
124    )> {
125        let UpdatesView { keys, vals, times, diffs } = self;
126        (0..Len::len(&keys))
127            .flat_map(move |outer| child_range(keys.bounds, outer))
128            .flat_map(move |k| {
129                let key = keys.values.get(k);
130                child_range(vals.bounds, k).map(move |v| (key, v))
131            })
132            .flat_map(move |(key, v)| {
133                let val = vals.values.get(v);
134                child_range(times.bounds, v).map(move |t| (key, val, t))
135            })
136            .flat_map(move |(key, val, t)| {
137                let time = times.values.get(t);
138                child_range(diffs.bounds, t).map(move |d| (key, val, time, diffs.values.get(d)))
139            })
140    }
141
142    /// Translate a key-range into the corresponding val-range via `vals.bounds`.
143    pub fn vals_bounds(self, key_range: std::ops::Range<usize>) -> std::ops::Range<usize> {
144        if !key_range.is_empty() {
145            let bounds = self.vals.bounds;
146            let lower = if key_range.start == 0 { 0 } else { bounds.index_as(key_range.start - 1) as usize };
147            let upper = bounds.index_as(key_range.end - 1) as usize;
148            lower..upper
149        } else { key_range }
150    }
151    /// Translate a val-range into the corresponding time-range via `times.bounds`.
152    pub fn times_bounds(self, val_range: std::ops::Range<usize>) -> std::ops::Range<usize> {
153        if !val_range.is_empty() {
154            let bounds = self.times.bounds;
155            let lower = if val_range.start == 0 { 0 } else { bounds.index_as(val_range.start - 1) as usize };
156            let upper = bounds.index_as(val_range.end - 1) as usize;
157            lower..upper
158        } else { val_range }
159    }
160}
161
162impl<U: Update> UpdatesTyped<U> {
163    /// Borrow the four columns as a single `UpdatesView`.
164    pub fn view(&self) -> UpdatesView<'_, U> {
165        UpdatesView {
166            keys:  self.keys.borrow(),
167            vals:  self.vals.borrow(),
168            times: self.times.borrow(),
169            diffs: self.diffs.borrow(),
170        }
171    }
172}
173
174/// `Stash`-backed update storage: each column may be typed (writable) or
175/// borrowed from wire bytes (read-only, zero-copy).
176///
177/// Construction sites work in [`UpdatesTyped`]; convert via `From` at the
178/// boundary. Reader code uses [`UpdatesView`] via [`Updates::view`], which
179/// produces the same shape regardless of whether the columns are typed or
180/// borrowed.
181pub struct Updates<U: Update, B = timely::bytes::arc::Bytes> {
182    /// Outer key list (one entry per group of keys at the trie root).
183    pub keys:  columnar::bytes::stash::Stash<Lists<ContainerOf<U::Key>>,  B>,
184    /// Per-key list of vals.
185    pub vals:  columnar::bytes::stash::Stash<Lists<ContainerOf<U::Val>>,  B>,
186    /// Per-val list of times.
187    pub times: columnar::bytes::stash::Stash<Lists<ContainerOf<U::Time>>, B>,
188    /// Per-time list of diffs.
189    pub diffs: columnar::bytes::stash::Stash<Lists<ContainerOf<U::Diff>>, B>,
190}
191
192impl<U: Update, B> Default for Updates<U, B> {
193    fn default() -> Self {
194        Self {
195            keys:  Default::default(),
196            vals:  Default::default(),
197            times: Default::default(),
198            diffs: Default::default(),
199        }
200    }
201}
202
203impl<U: Update, B: Clone> Clone for Updates<U, B> {
204    fn clone(&self) -> Self {
205        Self {
206            keys:  self.keys.clone(),
207            vals:  self.vals.clone(),
208            times: self.times.clone(),
209            diffs: self.diffs.clone(),
210        }
211    }
212}
213
214impl<U: Update, B> From<UpdatesTyped<U>> for Updates<U, B> {
215    fn from(owned: UpdatesTyped<U>) -> Self {
216        use columnar::bytes::stash::Stash;
217        Self {
218            keys:  Stash::Typed(owned.keys),
219            vals:  Stash::Typed(owned.vals),
220            times: Stash::Typed(owned.times),
221            diffs: Stash::Typed(owned.diffs),
222        }
223    }
224}
225
226impl<U: Update, B: std::ops::Deref<Target = [u8]> + Clone + 'static> Updates<U, B> {
227    /// Borrow the four columns as a single `UpdatesView`.
228    pub fn view(&self) -> UpdatesView<'_, U> {
229        UpdatesView {
230            keys:  self.keys.borrow(),
231            vals:  self.vals.borrow(),
232            times: self.times.borrow(),
233            diffs: self.diffs.borrow(),
234        }
235    }
236
237    /// Total number of updates (records) in the trie.
238    pub fn len(&self) -> usize {
239        self.view().diffs.values.len()
240    }
241
242    /// Whether the trie is empty.
243    pub fn is_empty(&self) -> bool { self.len() == 0 }
244
245    /// Convert to fully owned form, copying any `Stash::Bytes` columns into
246    /// typed `Lists`. Already-typed columns pass through with no copy.
247    ///
248    /// This method should be avoided unless typed containers are truly needed.
249    pub fn into_typed(mut self) -> UpdatesTyped<U> {
250        use columnar::bytes::stash::Stash;
251        self.keys.make_typed();
252        self.vals.make_typed();
253        self.times.make_typed();
254        self.diffs.make_typed();
255        let Stash::Typed(keys)  = self.keys  else { unreachable!() };
256        let Stash::Typed(vals)  = self.vals  else { unreachable!() };
257        let Stash::Typed(times) = self.times else { unreachable!() };
258        let Stash::Typed(diffs) = self.diffs else { unreachable!() };
259        UpdatesTyped { keys, vals, times, diffs }
260    }
261}
262
263/// The flat `(key, val, time, diff)` tuple for an [`Update`].
264pub type Tuple<U> = (<U as Update>::Key, <U as Update>::Val, <U as Update>::Time, <U as Update>::Diff);
265
266/// Returns the value-index range for list `i` given cumulative bounds.
267#[inline]
268pub fn child_range<B: IndexAs<u64>>(bounds: B, i: usize) -> std::ops::Range<usize> {
269    let lower = if i == 0 { 0 } else { bounds.index_as(i - 1) as usize };
270    let upper = bounds.index_as(i) as usize;
271    lower..upper
272}
273
/// Streaming consolidation over `(key, val, time, diff)` data sorted by `(key, val, time)`.
///
/// Each run of adjacent updates with equal `(key, val, time)` is collapsed into one
/// output carrying the summed diff. Runs whose sum is zero produce no output at all.
pub struct Consolidating<I: Iterator, D> {
    /// Source updates; peekable so the end of a run can be detected.
    iter: std::iter::Peekable<I>,
    /// Accumulator for the diff of the run currently being collapsed.
    diff: D,
}
283
284impl<K, V, T, D, I> Consolidating<I, D>
285where
286    K: Copy + Eq,
287    V: Copy + Eq,
288    T: Copy + Eq,
289    D: Semigroup + IsZero + Default,
290    I: Iterator<Item = (K, V, T, D)>,
291{
292    /// Wrap a sorted `(K, V, T, D)` iterator so adjacent equal `(K, V, T)`
293    /// runs accumulate into a single output with the summed diff.
294    pub fn new(iter: I) -> Self {
295        Self { iter: iter.peekable(), diff: D::default() }
296    }
297}
298
299impl<K, V, T, D, I> Iterator for Consolidating<I, D>
300where
301    K: Copy + Eq,
302    V: Copy + Eq,
303    T: Copy + Eq,
304    D: Semigroup + IsZero + Default + Clone,
305    I: Iterator<Item = (K, V, T, D)>,
306{
307    type Item = (K, V, T, D);
308    fn next(&mut self) -> Option<Self::Item> {
309        loop {
310            let (k, v, t, d) = self.iter.next()?;
311            self.diff = d;
312            while let Some(&(k2, v2, t2, _)) = self.iter.peek() {
313                if k2 == k && v2 == v && t2 == t {
314                    let (_, _, _, d2) = self.iter.next().unwrap();
315                    self.diff.plus_equals(&d2);
316                } else {
317                    break;
318                }
319            }
320            if !self.diff.is_zero() {
321                return Some((k, v, t, self.diff.clone()));
322            }
323        }
324    }
325}
326
327impl<U: Update> UpdatesTyped<U> {
328
329    /// Copies `other[key_range]` into self, keys and all.
330    pub fn extend_from_keys(&mut self, other: UpdatesView<'_, U>, key_range: std::ops::Range<usize>) {
331        self.keys.values.extend_from_self(other.keys.values, key_range.clone());
332        self.vals.extend_from_self(other.vals, key_range.clone());
333        let val_range = other.vals_bounds(key_range);
334        self.times.extend_from_self(other.times, val_range.clone());
335        let time_range = other.times_bounds(val_range);
336        self.diffs.extend_from_self(other.diffs, time_range);
337    }
338
339    /// Forms a consolidated `UpdatesTyped` trie from unsorted `(key, val, time, diff)` refs.
340    pub fn form_unsorted<'a>(unsorted: impl Iterator<Item = columnar::Ref<'a, Tuple<U>>>) -> Self {
341        let mut data = unsorted.collect::<Vec<_>>();
342        data.sort();
343        Self::form(data.into_iter())
344    }
345
346    /// Forms a consolidated `UpdatesTyped` trie from sorted `(key, val, time, diff)` refs.
347    pub fn form<'a>(sorted: impl Iterator<Item = columnar::Ref<'a, Tuple<U>>>) -> Self {
348
349        // Step 1: Streaming consolidation — accumulate diffs, drop zeros.
350        let consolidated = Consolidating::new(
351            sorted.map(|(k, v, t, d)| (k, v, t, <U::Diff as Columnar>::into_owned(d)))
352        );
353
354        // Step 2: Build the trie from consolidated, sorted, non-zero data.
355        let mut output = Self::default();
356        let mut updates = consolidated;
357        if let Some((key, val, time, diff)) = updates.next() {
358            let mut prev = (key, val, time);
359            output.keys.values.push(key);
360            output.vals.values.push(val);
361            output.times.values.push(time);
362            output.diffs.values.push(&diff);
363            output.diffs.bounds.push(output.diffs.values.len() as u64);
364
365            // As we proceed, seal up known complete runs.
366            for (key, val, time, diff) in updates {
367
368                // If keys differ, record key and seal vals and times.
369                if key != prev.0 {
370                    output.vals.bounds.push(output.vals.values.len() as u64);
371                    output.times.bounds.push(output.times.values.len() as u64);
372                    output.keys.values.push(key);
373                    output.vals.values.push(val);
374                }
375                // If vals differ, record val and seal times.
376                else if val != prev.1 {
377                    output.times.bounds.push(output.times.values.len() as u64);
378                    output.vals.values.push(val);
379                }
380                else {
381                    // We better not find a duplicate time.
382                    assert!(time != prev.2);
383                }
384
385                // Always record (time, diff).
386                output.times.values.push(time);
387                output.diffs.values.push(&diff);
388                output.diffs.bounds.push(output.diffs.values.len() as u64);
389
390                prev = (key, val, time);
391            }
392
393            // Seal up open lists.
394            output.keys.bounds.push(output.keys.values.len() as u64);
395            output.vals.bounds.push(output.vals.values.len() as u64);
396            output.times.bounds.push(output.times.values.len() as u64);
397        }
398
399        output
400    }
401
402    /// Consolidates into canonical trie form:
403    /// single outer key list, all lists sorted and deduplicated,
404    /// diff lists are singletons (or absent if cancelled).
405    pub fn consolidate(self) -> Self { Self::form_unsorted(self.iter()) }
406    /// Drop entries whose diff list is empty (cancelled), rebuilding the trie.
407    pub fn filter_zero(self) -> Self {
408        if self.diffs.bounds.strided() == Some(1) { self }
409        // TODO: rework to move from trie structure to trie structure.
410        else {
411            let mut keep = Vec::with_capacity(self.times.values.len());
412            for index in 0 .. self.times.values.len() {
413                keep.push({
414                    let (lower, upper) = self.diffs.bounds.bounds(index);
415                    lower < upper
416                });
417            }
418            let (times, keep) = retain_items(self.times.borrow(), &keep[..]);
419            let (vals, keep) = retain_items(self.vals.borrow(), &keep[..]);
420            let (keys, _keep) = retain_items(self.keys.borrow(), &keep[..]);
421            UpdatesTyped {
422                keys,
423                vals,
424                times,
425                diffs: Lists {
426                    bounds: Strides::new(1, self.diffs.values.len() as u64),
427                    values: self.diffs.values,
428                },
429            }
430        }
431        // else { Self::form(self.iter()) }
432    }
433
434    /// The number of leaf-level diff entries (total updates).
435    pub fn len(&self) -> usize { self.diffs.values.len() }
436}
437
438/// Push a single flat update as a stride-1 entry.
439///
440/// Each field is independently typed — columnar refs, `&Owned`, owned values,
441/// or any other type the column container accepts via its `Push` impl.
442impl<KP, VP, TP, DP, U: Update> Push<(KP, VP, TP, DP)> for UpdatesTyped<U>
443where
444    ContainerOf<U::Key>: Push<KP>,
445    ContainerOf<U::Val>: Push<VP>,
446    ContainerOf<U::Time>: Push<TP>,
447    ContainerOf<U::Diff>: Push<DP>,
448{
449    fn push(&mut self, (key, val, time, diff): (KP, VP, TP, DP)) {
450        self.keys.values.push(key);
451        self.keys.bounds.push(self.keys.values.len() as u64);
452        self.vals.values.push(val);
453        self.vals.bounds.push(self.vals.values.len() as u64);
454        self.times.values.push(time);
455        self.times.bounds.push(self.times.values.len() as u64);
456        self.diffs.values.push(diff);
457        self.diffs.bounds.push(self.diffs.values.len() as u64);
458    }
459}
460
461/// PushInto for the `((K, V), T, R)` shape that reduce_trace uses.
462impl<U: Update> timely::container::PushInto<((U::Key, U::Val), U::Time, U::Diff)> for UpdatesTyped<U> {
463    fn push_into(&mut self, ((key, val), time, diff): ((U::Key, U::Val), U::Time, U::Diff)) {
464        self.push((&key, &val, &time, &diff));
465    }
466}
467
468impl<U: Update> UpdatesTyped<U> {
469
470    /// Iterate all `(key, val, time, diff)` entries as refs.
471    pub fn iter(&self) -> impl Iterator<Item = (
472        columnar::Ref<'_, U::Key>,
473        columnar::Ref<'_, U::Val>,
474        columnar::Ref<'_, U::Time>,
475        columnar::Ref<'_, U::Diff>,
476    )> {
477        self.view().iter()
478    }
479}
480
481impl<U: Update> timely::Accountable for UpdatesTyped<U> {
482    #[inline] fn record_count(&self) -> i64 { Len::len(&self.diffs.values) as i64 }
483}
484
485impl<U: Update> timely::dataflow::channels::ContainerBytes for UpdatesTyped<U> {
486    fn from_bytes(_bytes: timely::bytes::arc::Bytes) -> Self { unimplemented!() }
487    fn length_in_bytes(&self) -> usize { unimplemented!() }
488    fn into_bytes<W: std::io::Write>(&self, _writer: &mut W) { unimplemented!() }
489}
490
491/// An incremental trie builder that accepts sorted, consolidated `UpdatesTyped` chunks
492/// and melds them into a single `UpdatesTyped` trie.
493///
494/// The internal `UpdatesTyped` has open (unsealed) bounds at the keys, vals, and times
495/// levels — the last group at each level has its values pushed but no corresponding
496/// bounds entry. `diffs.bounds` is always 1:1 with `times.values`.
497///
498/// `meld` accepts a consolidated `UpdatesTyped` whose first `(key, val, time)` is
499/// strictly greater than the builder's last `(key, val, time)`. The key and val
500/// may equal the builder's current open key/val, as long as the time is greater.
501///
502/// `done` seals all open bounds and returns the completed `UpdatesTyped`.
503pub struct UpdatesBuilder<U: Update> {
504    /// Non-empty, consolidated updates.
505    updates: UpdatesTyped<U>,
506}
507
impl<U: Update> UpdatesBuilder<U> {
    /// Construct a new builder from consolidated, sealed updates.
    ///
    /// Unseals the last group at keys, vals, and times levels so that
    /// subsequent `meld` calls can extend the open groups. Concretely, the last
    /// bound of each of those three layers is popped. `diffs.bounds` is left
    /// alone because diff lists are never kept open. Input with no key values
    /// is taken as is.
    ///
    /// If the updates are not consolidated none of this works: the popped bounds
    /// are assumed to close the single outer key list and its trailing val and
    /// time groups.
    pub fn new_from(mut updates: UpdatesTyped<U>) -> Self {
        use columnar::Len;
        if Len::len(&updates.keys.values) > 0 {
            updates.keys.bounds.pop();
            updates.vals.bounds.pop();
            updates.times.bounds.pop();
        }
        Self { updates }
    }

    /// Meld a sorted, consolidated `UpdatesTyped` chunk into this builder.
    ///
    /// The chunk's first `(key, val, time)` must be strictly greater than
    /// the builder's last `(key, val, time)`. Keys and vals may overlap
    /// (continue the current group), but times must be strictly increasing
    /// within the same `(key, val)`.
    ///
    /// An empty chunk (no diffs) is ignored. If the builder holds no keys yet, it
    /// takes a clone of `chunk` and unseals its trailing groups. Otherwise only
    /// `chunk.keys.values` is read at the outermost layer, and `chunk.keys.bounds`
    /// is ignored. The chunk is therefore presumably expected to hold a single
    /// outer key list, as `UpdatesTyped::form` produces.
    ///
    /// # Panics
    ///
    /// In debug builds, panics if the chunk continues the builder's open
    /// `(key, val)` with a first time equal to the builder's last time.
    pub fn meld(&mut self, chunk: &UpdatesTyped<U>) {
        use columnar::{Borrow, Index, Len};

        if chunk.len() == 0 { return; }

        // Empty builder: clone the chunk and unseal it.
        if Len::len(&self.updates.keys.values) == 0 {
            self.updates = chunk.clone();
            self.updates.keys.bounds.pop();
            self.updates.vals.bounds.pop();
            self.updates.times.bounds.pop();
            return;
        }

        // Pre-compute boundary comparisons before mutating.
        // Compare the builder's last key with the chunk's first key.
        let keys_match = {
            let skb = self.updates.keys.values.borrow();
            let ckb = chunk.keys.values.borrow();
            skb.get(Len::len(&skb) - 1) == ckb.get(0)
        };
        // The vals are only compared when the keys match; otherwise this is false.
        let vals_match = keys_match && {
            let svb = self.updates.vals.values.borrow();
            let cvb = chunk.vals.values.borrow();
            svb.get(Len::len(&svb) - 1) == cvb.get(0)
        };

        let chunk_num_keys = Len::len(&chunk.keys.values);
        let chunk_num_vals = Len::len(&chunk.vals.values);
        let chunk_num_times = Len::len(&chunk.times.values);

        // Child ranges for the first element at each level of the chunk.
        let first_key_vals = child_range(chunk.vals.borrow().bounds, 0);
        let first_val_times = child_range(chunk.times.borrow().bounds, 0);

        // There is a first position where coordinates disagree.
        // Strictly beyond that position: seal bounds, extend lists, re-open the last bound.
        // At that position: meld the first list, extend subsequent lists, re-open.
        let mut differ = false;

        // --- Keys ---
        // The outer key list stays open, so keys are only appended as values.
        if keys_match {
            // Skip the duplicate first key; add remaining keys.
            if chunk_num_keys > 1 {
                self.updates.keys.values.extend_from_self(chunk.keys.values.borrow(), 1..chunk_num_keys);
            }
        } else {
            // All keys are new.
            self.updates.keys.values.extend_from_self(chunk.keys.values.borrow(), 0..chunk_num_keys);
            differ = true;
        }

        // --- Vals ---
        if differ {
            // Keys differed: seal open val group, extend all val lists, unseal last.
            // Every chunk key is a new key here, so all of its val lists are copied.
            self.updates.vals.bounds.push(Len::len(&self.updates.vals.values) as u64);
            self.updates.vals.extend_from_self(chunk.vals.borrow(), 0..chunk_num_keys);
            self.updates.vals.bounds.pop();
        } else {
            // Keys matched: meld vals for the shared key.
            if vals_match {
                // Skip the duplicate first val; add remaining vals from the first key's list.
                if first_key_vals.len() > 1 {
                    self.updates.vals.values.extend_from_self(
                        chunk.vals.values.borrow(),
                        (first_key_vals.start + 1)..first_key_vals.end,
                    );
                }
            } else {
                // First val differs: add all vals from the first key's list.
                self.updates.vals.values.extend_from_self(
                    chunk.vals.values.borrow(),
                    first_key_vals.clone(),
                );
                differ = true;
            }
            // Seal the matched key's val group, extend remaining keys' val lists, unseal.
            if chunk_num_keys > 1 {
                self.updates.vals.bounds.push(Len::len(&self.updates.vals.values) as u64);
                self.updates.vals.extend_from_self(chunk.vals.borrow(), 1..chunk_num_keys);
                self.updates.vals.bounds.pop();
            }
        }

        // --- Times ---
        if differ {
            // Seal open time group, extend all time lists, unseal last.
            // Every chunk val was appended as a new val above, so each gets its full time list.
            self.updates.times.bounds.push(Len::len(&self.updates.times.values) as u64);
            self.updates.times.extend_from_self(chunk.times.borrow(), 0..chunk_num_vals);
            self.updates.times.bounds.pop();
        } else {
            // Keys and vals matched. Times must be strictly greater (precondition),
            // so we always set differ = true here.
            debug_assert!({
                let stb = self.updates.times.values.borrow();
                let ctb = chunk.times.values.borrow();
                stb.get(Len::len(&stb) - 1) != ctb.get(0)
            }, "meld: duplicate time within same (key, val)");
            // Add times from the first val's time list into the open group.
            self.updates.times.values.extend_from_self(
                chunk.times.values.borrow(),
                first_val_times.clone(),
            );
            differ = true;
            // Seal the matched val's time group, extend remaining vals' time lists, unseal.
            if chunk_num_vals > 1 {
                self.updates.times.bounds.push(Len::len(&self.updates.times.values) as u64);
                self.updates.times.extend_from_self(chunk.times.borrow(), 1..chunk_num_vals);
                self.updates.times.bounds.pop();
            }
        }

        // --- Diffs ---
        // Diffs are always sealed (1:1 with times). By the precondition that
        // times are strictly increasing for the same (key, val), differ is
        // always true by this point — just extend all diff lists.
        debug_assert!(differ);
        self.updates.diffs.extend_from_self(chunk.diffs.borrow(), 0..chunk_num_times);
    }

    /// Seal all open bounds and return the completed `UpdatesTyped`.
    ///
    /// If any key was added, pushes one closing bound each for the open time group,
    /// the open val group, and the outer key list. An empty builder returns its trie
    /// unchanged.
    pub fn done(mut self) -> UpdatesTyped<U> {
        use columnar::Len;
        if Len::len(&self.updates.keys.values) > 0 {
            // Seal the open time group.
            self.updates.times.bounds.push(Len::len(&self.updates.times.values) as u64);
            // Seal the open val group.
            self.updates.vals.bounds.push(Len::len(&self.updates.vals.values) as u64);
            // Seal the outer key group.
            self.updates.keys.bounds.push(Len::len(&self.updates.keys.values) as u64);
        }
        self.updates
    }
}
663
#[cfg(test)]
mod tests {
    use super::*;
    use columnar::Push;

    type TestUpdate = (u64, u64, u64, i64);

    /// Flattens a trie into owned tuples for comparison.
    fn collect(updates: &UpdatesTyped<TestUpdate>) -> Vec<(u64, u64, u64, i64)> {
        updates.iter().map(|(k, v, t, d)| (*k, *v, *t, *d)).collect()
    }

    /// Consolidates each chunk and melds them, in order, into a fresh `UpdatesBuilder`.
    fn build(chunks: &[Vec<(u64, u64, u64, i64)>]) -> UpdatesTyped<TestUpdate> {
        let mut builder = UpdatesBuilder::new_from(UpdatesTyped::<TestUpdate>::default());
        for chunk in chunks {
            let mut updates = UpdatesTyped::<TestUpdate>::default();
            for (k, v, t, d) in chunk {
                updates.push((k, v, t, d));
            }
            builder.meld(&updates.consolidate());
        }
        builder.done()
    }

    #[test]
    fn test_push_and_consolidate_basic() {
        let mut updates = UpdatesTyped::<TestUpdate>::default();
        updates.push((&1, &10, &100, &1));
        updates.push((&1, &10, &100, &2));
        updates.push((&2, &20, &200, &5));
        assert_eq!(updates.len(), 3);
        assert_eq!(collect(&updates.consolidate()), vec![(1, 10, 100, 3), (2, 20, 200, 5)]);
    }

    #[test]
    fn test_cancellation() {
        let mut updates = UpdatesTyped::<TestUpdate>::default();
        updates.push((&1, &10, &100, &3));
        updates.push((&1, &10, &100, &-3));
        updates.push((&2, &20, &200, &1));
        assert_eq!(collect(&updates.consolidate()), vec![(2, 20, 200, 1)]);
    }

    #[test]
    fn test_multiple_vals_and_times() {
        let mut updates = UpdatesTyped::<TestUpdate>::default();
        updates.push((&1, &10, &100, &1));
        updates.push((&1, &10, &200, &2));
        updates.push((&1, &20, &100, &3));
        updates.push((&1, &20, &100, &4));
        assert_eq!(collect(&updates.consolidate()), vec![(1, 10, 100, 1), (1, 10, 200, 2), (1, 20, 100, 7)]);
    }

    #[test]
    fn test_val_cancellation_propagates() {
        let mut updates = UpdatesTyped::<TestUpdate>::default();
        updates.push((&1, &10, &100, &5));
        updates.push((&1, &10, &100, &-5));
        updates.push((&1, &20, &100, &1));
        assert_eq!(collect(&updates.consolidate()), vec![(1, 20, 100, 1)]);
    }

    #[test]
    fn test_empty() {
        let updates = UpdatesTyped::<TestUpdate>::default();
        assert_eq!(collect(&updates.consolidate()), vec![]);
    }

    #[test]
    fn test_total_cancellation() {
        let mut updates = UpdatesTyped::<TestUpdate>::default();
        updates.push((&1, &10, &100, &1));
        updates.push((&1, &10, &100, &-1));
        assert_eq!(collect(&updates.consolidate()), vec![]);
    }

    #[test]
    fn test_unsorted_input() {
        let mut updates = UpdatesTyped::<TestUpdate>::default();
        updates.push((&3, &30, &300, &1));
        updates.push((&1, &10, &100, &2));
        updates.push((&2, &20, &200, &3));
        assert_eq!(collect(&updates.consolidate()), vec![(1, 10, 100, 2), (2, 20, 200, 3), (3, 30, 300, 1)]);
    }

    #[test]
    fn test_first_key_cancels() {
        let mut updates = UpdatesTyped::<TestUpdate>::default();
        updates.push((&1, &10, &100, &5));
        updates.push((&1, &10, &100, &-5));
        updates.push((&2, &20, &200, &3));
        assert_eq!(collect(&updates.consolidate()), vec![(2, 20, 200, 3)]);
    }

    #[test]
    fn test_middle_time_cancels() {
        let mut updates = UpdatesTyped::<TestUpdate>::default();
        updates.push((&1, &10, &100, &1));
        updates.push((&1, &10, &200, &2));
        updates.push((&1, &10, &200, &-2));
        updates.push((&1, &10, &300, &3));
        assert_eq!(collect(&updates.consolidate()), vec![(1, 10, 100, 1), (1, 10, 300, 3)]);
    }

    #[test]
    fn test_first_val_cancels() {
        let mut updates = UpdatesTyped::<TestUpdate>::default();
        updates.push((&1, &10, &100, &1));
        updates.push((&1, &10, &100, &-1));
        updates.push((&1, &20, &100, &5));
        assert_eq!(collect(&updates.consolidate()), vec![(1, 20, 100, 5)]);
    }

    #[test]
    fn test_interleaved_cancellations() {
        let mut updates = UpdatesTyped::<TestUpdate>::default();
        updates.push((&1, &10, &100, &1));
        updates.push((&1, &10, &100, &-1));
        updates.push((&2, &20, &200, &7));
        updates.push((&3, &30, &300, &4));
        updates.push((&3, &30, &300, &-4));
        assert_eq!(collect(&updates.consolidate()), vec![(2, 20, 200, 7)]);
    }

    #[test]
    fn test_consolidating_iterator() {
        let input = vec![(1, 1, 1, 2i64), (1, 1, 1, -2), (1, 1, 2, 3), (1, 1, 2, 4), (2, 1, 1, 1)];
        let output: Vec<_> = Consolidating::new(input.into_iter()).collect();
        assert_eq!(output, vec![(1, 1, 2, 7), (2, 1, 1, 1)]);
    }

    #[test]
    fn test_builder_distinct_keys() {
        let built = build(&[vec![(1, 10, 100, 1)], vec![(2, 20, 200, 2)]]);
        assert_eq!(collect(&built), vec![(1, 10, 100, 1), (2, 20, 200, 2)]);
    }

    #[test]
    fn test_builder_shared_key() {
        let built = build(&[vec![(1, 10, 100, 1)], vec![(1, 20, 100, 2)]]);
        assert_eq!(collect(&built), vec![(1, 10, 100, 1), (1, 20, 100, 2)]);
    }

    #[test]
    fn test_builder_shared_key_and_val() {
        let built = build(&[
            vec![(1, 10, 100, 1)],
            vec![(1, 10, 200, 2), (1, 20, 100, 3), (2, 30, 300, 4)],
        ]);
        assert_eq!(
            collect(&built),
            vec![(1, 10, 100, 1), (1, 10, 200, 2), (1, 20, 100, 3), (2, 30, 300, 4)],
        );
    }

    #[test]
    fn test_builder_empty_inputs() {
        assert_eq!(collect(&build(&[])), vec![]);
        assert_eq!(collect(&build(&[vec![], vec![(1, 10, 100, 1), (1, 10, 100, -1)]])), vec![]);
    }

    #[test]
    fn test_builder_new_from_existing() {
        let mut first = UpdatesTyped::<TestUpdate>::default();
        first.push((&1, &10, &100, &1));
        let mut builder = UpdatesBuilder::new_from(first.consolidate());
        let mut second = UpdatesTyped::<TestUpdate>::default();
        second.push((&1, &10, &200, &2));
        builder.meld(&second.consolidate());
        assert_eq!(collect(&builder.done()), vec![(1, 10, 100, 1), (1, 10, 200, 2)]);
    }
}