Skip to main content

differential_dataflow/columnar/arrangement/
trie_merger.rs

//! Batch-at-a-time merging of sorted, consolidated `UpdatesTyped` chains.
//!
//! The core is `merge_batches`, which walks pairs of chunks via
//! `merge_batch`, building a chain of merged outputs with `ChainBuilder`.
//! `survey` maps the interleaving of the two inputs at each trie layer, and
//! `write_from_surveys` (via `write_layer` and `write_diffs`) copies the
//! ranges that the surveys identify into the output trie.
8
9use columnar::{Columnar, Len};
10use timely::progress::frontier::{Antichain, AntichainRef};
11
12use super::super::layout::ColumnarUpdate as Update;
13use super::super::updates::UpdatesTyped;
14
/// Two-way merge adaptor over a pair of already-sorted update streams.
///
/// Both inputs yield `(key, val, time, diff)` tuples. The adaptor interleaves
/// them by `(key, val, time)` and never combines diffs: when both inputs hold
/// the same triple, the element from `iter1` is yielded first and the one
/// from `iter2` follows on a later call.
struct Merging<I1: Iterator, I2: Iterator> {
    iter1: std::iter::Peekable<I1>,
    iter2: std::iter::Peekable<I2>,
}

impl<K, V, T, D, I1, I2> Iterator for Merging<I1, I2>
where
    K: Copy + Ord,
    V: Copy + Ord,
    T: Copy + Ord,
    I1: Iterator<Item = (K, V, T, D)>,
    I2: Iterator<Item = (K, V, T, D)>,
{
    type Item = (K, V, T, D);

    /// Yields the smaller of the two heads, preferring `iter1` on ties.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        // Decide which side to advance while only peeking, so that neither
        // input is consumed before the decision is made.
        let take_first = match (self.iter1.peek(), self.iter2.peek()) {
            (None, None) => return None,
            (Some(_), None) => true,
            (None, Some(_)) => false,
            // The diff (fourth component) deliberately takes no part in the order.
            (Some(lhs), Some(rhs)) => (lhs.0, lhs.1, lhs.2) <= (rhs.0, rhs.1, rhs.2),
        };
        if take_first {
            self.iter1.next()
        } else {
            self.iter2.next()
        }
    }
}
46
47/// Build sorted `UpdatesTyped` chunks from a sorted iterator of refs,
48/// using `UpdatesTyped::form` (which consolidates internally) on batches.
49fn form_chunks<'a, U: Update>(
50    sorted: impl Iterator<Item = columnar::Ref<'a, super::super::updates::Tuple<U>>>,
51    output: &mut Vec<UpdatesTyped<U>>,
52) {
53    let mut sorted = sorted.peekable();
54    while sorted.peek().is_some() {
55        let chunk = UpdatesTyped::<U>::form((&mut sorted).take(crate::columnar::LINK_TARGET));
56        if chunk.len() > 0 {
57            output.push(chunk);
58        }
59    }
60}
61
62/// Partition `merged` into chunks ready to ship (times strictly less than `upper`)
63/// and chunks kept for future seals (times at-or-after `upper`), updating
64/// `frontier` to the antichain of kept times. `merged` is consumed lazily,
65/// and outputs flow through `ship` / `kept` sinks so the caller can spill or
66/// forward as chunks are produced rather than buffering them.
67pub fn extract<U, I, FShip, FKept>(
68    merged: I,
69    upper: AntichainRef<U::Time>,
70    frontier: &mut Antichain<U::Time>,
71    mut ship: FShip,
72    mut kept: FKept,
73)
74where
75    U: Update,
76    U::Time: 'static,
77    I: IntoIterator<Item = UpdatesTyped<U>>,
78    FShip: FnMut(UpdatesTyped<U>),
79    FKept: FnMut(UpdatesTyped<U>),
80{
81    use columnar::{Container, ContainerOf, Index, Push};
82    use columnar::primitive::offsets::Strides;
83    use crate::columnar::updates::{Lists, retain_items};
84
85    // TODO: rework to move from trie structure to trie structure.
86    let mut time_owned = U::Time::default();
87    let mut bitmap = Vec::new();    // update should be kept.
88    for chunk in merged {
89        bitmap.clear();
90        let view = chunk.view();
91        let times = view.times.values;
92        for idx in 0 .. times.len() {
93            Columnar::copy_from(&mut time_owned, times.get(idx));
94            if upper.less_equal(&time_owned) {
95                frontier.insert_ref(&time_owned);
96                bitmap.push(true);
97            }
98            else { bitmap.push(false); }
99        }
100        if bitmap.iter().all(|x| *x) { kept(chunk); }
101        else if bitmap.iter().all(|x| !*x) { ship(chunk); }
102        else {
103
104            let (times, temp) = retain_items::<ContainerOf<U::Time>>(view.times, &bitmap[..]);
105            let (vals, temp) = retain_items::<ContainerOf<U::Val>>(view.vals, &temp[..]);
106            let (keys, _temp) = retain_items::<ContainerOf<U::Key>>(view.keys, &temp[..]);
107            let d_borrow = view.diffs;
108            let mut diffs = <Lists::<ContainerOf<U::Diff>> as Container>::with_capacity_for([d_borrow].into_iter());
109            for (index, bit) in bitmap.iter().enumerate() {
110                if *bit { diffs.values.push(d_borrow.values.get(index)); }
111            }
112            diffs.bounds = Strides::new(1, times.values.len() as u64);
113            kept(UpdatesTyped {
114                keys,
115                vals,
116                times,
117                diffs,
118            });
119
120            for bit in bitmap.iter_mut() { *bit = !*bit; }
121
122            let (times, temp) = retain_items::<ContainerOf<U::Time>>(view.times, &bitmap[..]);
123            let (vals, temp) = retain_items::<ContainerOf<U::Val>>(view.vals, &temp[..]);
124            let (keys, _temp) = retain_items::<ContainerOf<U::Key>>(view.keys, &temp[..]);
125            let d_borrow = view.diffs;
126            let mut diffs = <Lists::<ContainerOf<U::Diff>> as Container>::with_capacity_for([d_borrow].into_iter());
127            for (index, bit) in bitmap.iter().enumerate() {
128                if *bit { diffs.values.push(d_borrow.values.get(index)); }
129            }
130            diffs.bounds = Strides::new(1, times.values.len() as u64);
131            ship(UpdatesTyped {
132                keys,
133                vals,
134                times,
135                diffs,
136            });
137        }
138    }
139}
140
141/// Iterator-based merge: flatten, merge, consolidate, form.
142/// Correct but slow — used as fallback.
143#[allow(dead_code)]
144fn merge_iterator<U: Update>(
145    list1: &[UpdatesTyped<U>],
146    list2: &[UpdatesTyped<U>],
147    output: &mut Vec<UpdatesTyped<U>>,
148)
149where
150    U::Time: 'static,
151{
152    let iter1 = list1.iter().flat_map(|chunk| chunk.iter());
153    let iter2 = list2.iter().flat_map(|chunk| chunk.iter());
154
155    let merged = Merging {
156        iter1: iter1.peekable(),
157        iter2: iter2.peekable(),
158    };
159
160    form_chunks::<U>(merged, output);
161}
162
163/// A merge implementation that operates batch-at-a-time.
164///
165/// Inputs are taken as `IntoIterator` so the caller can stream chunks in
166/// lazily — e.g. fetching paged-out chunks one group at a time — rather than
167/// materializing entire chains up front. Output chunks are emitted via the
168/// caller-supplied `sink` as they become stable, so the caller can apply a
169/// spill policy mid-merge rather than buffering the full result.
170#[inline(never)]
171pub fn merge_batches<U, I1, I2, S>(
172    list1: I1,
173    list2: I2,
174    sink: S,
175)
176where
177    U: Update,
178    U::Time: 'static,
179    I1: IntoIterator<Item = UpdatesTyped<U>>,
180    I2: IntoIterator<Item = UpdatesTyped<U>>,
181    S: FnMut(UpdatesTyped<U>),
182{
183
184    // The design for efficient "batch" merginging of chains of links is:
185    // 0.   We choose a target link size, K, and will keep the average link size at least K and the max size at 2k.
186    //      K should be large enough to amortize some set-up, but not so large that one or two extra break the bank.
187    // 1.   We will repeatedly consider pairs of links, and fully merge one with a prefix of the other.
188    //      The last elements of each link will tell us which of the two suffixes must be held back.
189    // 2.   We then have a chain of as many links as we started with, with potential defects to correct:
190    //      a.  A link may contain some number of zeros: we can remove them if we are eager, based on size.
191    //      b.  A link may contain more than 2K updates; we can split it.
192    //      c.  Two adjacent links may contain fewer than 2K updates; we can meld (careful append) them.
193    // 3.   After a pass of the above, we should have restored the invariant.
194    //      We can try and me smarter and fuse some of the above work rather than explicitly stage results.
195    //
196    // The challenging moment is the merge that can start with a suffix of one link, involving a prefix of one link.
197    // These could be the same link, different links, and generally there is the potential for complexity here.
198
199    let mut builder = ChainBuilder::new(sink);
200
201    let mut iter1 = list1.into_iter();
202    let mut iter2 = list2.into_iter();
203
204    // The first unconsumed update in each block, via (k_idx, v_idx, t_idx), or None if exhausted.
205    // These are (0,0,0) for a new block, and should become None once there are no remaining updates.
206    let mut cursor1 = iter1.next().map(|b| ((0,0,0), b));
207    let mut cursor2 = iter2.next().map(|b| ((0,0,0), b));
208
209    // For each pair of batches
210    while cursor1.is_some() && cursor2.is_some() {
211        merge_batch(&mut cursor1, &mut cursor2, &mut builder);
212        if cursor1.is_none() { cursor1 = iter1.next().map(|b| ((0,0,0), b)); }
213        if cursor2.is_none() { cursor2 = iter2.next().map(|b| ((0,0,0), b)); }
214    }
215
216    // TODO: create batch for the non-empty cursor.
217    if let Some(((k,v,t),batch)) = cursor1 {
218        let mut out_batch = UpdatesTyped::<U>::default();
219        let empty: UpdatesTyped<U> = Default::default();
220        let view = batch.view();
221        write_from_surveys(
222            &batch,
223            &empty,
224            &[Report::This(0, 1)],
225            &[Report::This(k, view.keys.values.len())],
226            &[Report::This(v, view.vals.values.len())],
227            &[Report::This(t, view.times.values.len())],
228            &mut out_batch,
229        );
230        builder.push(out_batch);
231    }
232    if let Some(((k,v,t),batch)) = cursor2 {
233        let mut out_batch = UpdatesTyped::<U>::default();
234        let empty: UpdatesTyped<U> = Default::default();
235        let view = batch.view();
236        write_from_surveys(
237            &empty,
238            &batch,
239            &[Report::That(0, 1)],
240            &[Report::That(k, view.keys.values.len())],
241            &[Report::That(v, view.vals.values.len())],
242            &[Report::That(t, view.times.values.len())],
243            &mut out_batch,
244        );
245        builder.push(out_batch);
246    }
247
248    builder.extend(iter1);
249    builder.extend(iter2);
250    builder.done();
251    // TODO: Tidy output to satisfy structural invariants.
252}
253
254/// Merge two batches, one completely and another through the corresponding prefix.
255///
256/// Each invocation determines the maximum amount of both batches we can merge, determined
257/// by comparing the elements at the tails of each batch, and locating the lesser in other.
258/// We will merge the whole of the batch containing the lesser, and the prefix up through
259/// the lesser element in the other batch, setting the cursor to the first element strictly
260/// greater than that lesser element.
261///
262/// The algorithm uses a list of `Report` findings to map the interleavings of the layers.
263/// Each indicates either a range exclusive to one of the inputs, or a one element common
264/// to the layers from both inputs, which must be further explored. This map would normally
265/// allow the full merge to happen, but we need to carefully start at each cursor, and end
266/// just before the first element greater than the lesser bound.
267///
268/// The consumed prefix and disjoint suffix should be single report entries, and it seems
269/// fine to first produce all reports and then reflect on the cursors, rather than use the
270/// cursors as part of the mapping.
271#[inline(never)]
272fn merge_batch<U: Update, F: FnMut(UpdatesTyped<U>)>(
273    batch1: &mut Option<((usize, usize, usize), UpdatesTyped<U>)>,
274    batch2: &mut Option<((usize, usize, usize), UpdatesTyped<U>)>,
275    builder: &mut ChainBuilder<U, F>,
276)
277where
278    U::Time: 'static,
279{
280    // TODO: Optimization for one batch exceeding the other.
281
282    let ((k0_idx, v0_idx, t0_idx), updates0) = batch1.take().unwrap();
283    let ((k1_idx, v1_idx, t1_idx), updates1) = batch2.take().unwrap();
284
285    let view0 = updates0.view();
286    let view1 = updates1.view();
287    let keys0 = view0.keys;
288    let keys1 = view1.keys;
289    let vals0 = view0.vals;
290    let vals1 = view1.vals;
291    let times0 = view0.times;
292    let times1 = view1.times;
293
294    // Survey the interleaving of the two inputs.
295    let mut key_survey = survey::<columnar::ContainerOf<U::Key>>(keys0, keys1, &[Report::Both(0,0)]);
296    let mut val_survey = survey::<columnar::ContainerOf<U::Val>>(vals0, vals1, &key_survey);
297    let mut time_survey = survey::<columnar::ContainerOf<U::Time>>(times0, times1, &val_survey);
298
299    // We now know enough to start writing into an output batch.
300    // We should update the input surveys to reflect the subset
301    // of data that we want.
302    //
303    // At most one cursor should be non-zero (assert!).
304    // A non-zero cursor must correspond to the first entry of the surveys,
305    // as there is at least one consumed update that precedes the other batch.
306    // We need to nudge that report forward to align with the cursor, potentially
307    // squeezing the report to nothing (to the upper bound).
308
309    // We start by updating the surveys to reflect the cursors.
310    // If either cursor is set, then its batch has an element strictly less than the other batch.
311    // We therefore expect to find a prefix of This/That at the start of the survey.
312    if (k0_idx, v0_idx, t0_idx) != (0,0,0) {
313        let mut done = false; while !done { if let Report::This(l,u) = &mut key_survey[0] { if *u <= k0_idx { key_survey.remove(0); } else { *l = k0_idx; done = true; } } else { done = true; } }
314        let mut done = false; while !done { if let Report::This(l,u) = &mut val_survey[0] { if *u <= v0_idx { val_survey.remove(0); } else { *l = v0_idx; done = true; } } else { done = true; } }
315        let mut done = false; while !done { if let Report::This(l,u) = &mut time_survey[0] { if *u <= t0_idx { time_survey.remove(0); } else { *l = t0_idx; done = true; } } else { done = true; } }
316    }
317
318    if (k1_idx, v1_idx, t1_idx) != (0,0,0) {
319        let mut done = false; while !done { if let Report::That(l,u) = &mut key_survey[0] { if *u <= k1_idx { key_survey.remove(0); } else { *l = k1_idx; done = true; } } else { done = true; } }
320        let mut done = false; while !done { if let Report::That(l,u) = &mut val_survey[0] { if *u <= v1_idx { val_survey.remove(0); } else { *l = v1_idx; done = true; } } else { done = true; } }
321        let mut done = false; while !done { if let Report::That(l,u) = &mut time_survey[0] { if *u <= t1_idx { time_survey.remove(0); } else { *l = t1_idx; done = true; } } else { done = true; } }
322    }
323
324    // We want to trim the tails of the surveys to only cover ranges present in both inputs.
325    // We can determine which was "longer" by looking at the last entry of the bottom layer,
326    // which tells us which input (or both) contained the last element.
327    //
328    // From the bottom layer up, we'll identify the index of the last item, and then determine
329    // the index of the list it belongs to. We use that index in the next layer, to locate the
330    // index of the list it belongs to, on upward.
331    let next_cursor = match time_survey.last().unwrap() {
332        Report::This(_,_) => {
333            // Collect the last value indexes known to strictly exceed an entry in the other batch.
334            let mut t = times0.values.len();
335            while let Some(Report::This(l,_)) = time_survey.last() { t = *l; time_survey.pop(); }
336            let mut v = vals0.values.len();
337            while let Some(Report::This(l,_)) = val_survey.last() { v = *l; val_survey.pop(); }
338            let mut k = keys0.values.len();
339            while let Some(Report::This(l,_)) = key_survey.last() { k = *l; key_survey.pop(); }
340            // Now we may need to correct by nudging down.
341            if v == times0.len() || times0.bounds.bounds(v).0 > t { v -= 1; }
342            if k == vals0.len() || vals0.bounds.bounds(k).0 > v { k -= 1; }
343            Some(Ok((k,v,t)))
344        }
345        Report::Both(_,_) => { None }
346        Report::That(_,_) => {
347            // Collect the last value indexes known to strictly exceed an entry in the other batch.
348            let mut t = times1.values.len();
349            while let Some(Report::That(l,_)) = time_survey.last() { t = *l; time_survey.pop(); }
350            let mut v = vals1.values.len();
351            while let Some(Report::That(l,_)) = val_survey.last() { v = *l; val_survey.pop(); }
352            let mut k = keys1.values.len();
353            while let Some(Report::That(l,_)) = key_survey.last() { k = *l; key_survey.pop(); }
354            // Now we may need to correct by nudging down.
355            if v == times1.len() || times1.bounds.bounds(v).0 > t { v -= 1; }
356            if k == vals1.len() || vals1.bounds.bounds(k).0 > v { k -= 1; }
357            Some(Err((k,v,t)))
358        }
359    };
360
361    // Having updated the surveys, we now copy over the ranges they identify.
362    let mut out_batch = UpdatesTyped::<U>::default();
363    // TODO: We should be able to size `out_batch` pretty accurately from the survey.
364    write_from_surveys(&updates0, &updates1, &[Report::Both(0,0)], &key_survey, &val_survey, &time_survey, &mut out_batch);
365    builder.push(out_batch);
366
367    match next_cursor {
368        Some(Ok(kvt)) => { *batch1 = Some((kvt, updates0)); }
369        Some(Err(kvt)) => {*batch2 = Some((kvt, updates1)); }
370        None => { }
371    }
372}
373
374/// Write merged output from four levels of survey reports.
375///
376/// Each layer is written independently: `write_layer` handles keys, vals,
377/// and times; `write_diffs` handles diff consolidation.
378#[inline(never)]
379fn write_from_surveys<U: Update>(
380    updates0: &UpdatesTyped<U>,
381    updates1: &UpdatesTyped<U>,
382    root_survey: &[Report],
383    key_survey: &[Report],
384    val_survey: &[Report],
385    time_survey: &[Report],
386    output: &mut UpdatesTyped<U>,
387) {
388    let view0 = updates0.view();
389    let view1 = updates1.view();
390    write_layer(view0.keys, view1.keys, root_survey, key_survey, &mut output.keys);
391    write_layer(view0.vals, view1.vals, key_survey, val_survey, &mut output.vals);
392    write_layer(view0.times, view1.times, val_survey, time_survey, &mut output.times);
393    write_diffs::<U>(view0.diffs, view1.diffs, time_survey, &mut output.diffs);
394}
395
396/// From two sequences of interleaved lists, map out the interleaving of their values.
397///
398/// The sequence of input reports identify constraints on the sorted order of lists in the two inputs,
399/// callout out ranges of each that are exclusively order, and elements that have equal prefixes and
400/// therefore "overlap" and should be further investigated through the values of the lists.
401///
402/// The output should have the same form but for the next layer: subject to the ordering of `reports`,
403/// a similar report for the values of the two lists, appropriate for the next layer.
404#[inline(never)]
405pub fn survey<'a, C: columnar::Container<Ref<'a>: Ord>>(
406    lists0: <super::super::updates::Lists<C> as columnar::Borrow>::Borrowed<'a>,
407    lists1: <super::super::updates::Lists<C> as columnar::Borrow>::Borrowed<'a>,
408    reports: &[Report],
409) -> Vec<Report> {
410    use columnar::Index;
411    let mut output = Vec::with_capacity(reports.len()); // may grow larger, but at least this large.
412    for report in reports.iter() {
413        match report {
414            Report::This(lower0, upper0) => {
415                let (new_lower, _) = lists0.bounds.bounds(*lower0);
416                let (_, new_upper) = lists0.bounds.bounds(*upper0-1);
417                output.push(Report::This(new_lower, new_upper));
418            }
419            Report::Both(index0, index1) => {
420
421                // Fetch the bounds from the layers.
422                let (mut lower0, upper0) = lists0.bounds.bounds(*index0);
423                let (mut lower1, upper1) = lists1.bounds.bounds(*index1);
424
425                // Scour the intersecting range for matches.
426                while lower0 < upper0 && lower1 < upper1 {
427                    let val0 = lists0.values.get(lower0);
428                    let val1 = lists1.values.get(lower1);
429                    match val0.cmp(&val1) {
430                        std::cmp::Ordering::Less => {
431                            let start = lower0;
432                            lower0 += 1;
433                            gallop(lists0.values, &mut lower0, upper0, |x| x < val1);
434                            output.push(Report::This(start, lower0));
435                        },
436                        std::cmp::Ordering::Equal => {
437                            output.push(Report::Both(lower0, lower1));
438                            lower0 += 1;
439                            lower1 += 1;
440                        },
441                        std::cmp::Ordering::Greater => {
442                            let start = lower1;
443                            lower1 += 1;
444                            gallop(lists1.values, &mut lower1, upper1, |x| x < val0);
445                            output.push(Report::That(start, lower1));
446                        },
447                    }
448                }
449                if lower0 < upper0 { output.push(Report::This(lower0, upper0)); }
450                if lower1 < upper1 { output.push(Report::That(lower1, upper1)); }
451
452            }
453            Report::That(lower1, upper1) => {
454                let (new_lower, _) = lists1.bounds.bounds(*lower1);
455                let (_, new_upper) = lists1.bounds.bounds(*upper1-1);
456                output.push(Report::That(new_lower, new_upper));
457            }
458        }
459    }
460
461    output
462}
463
464/// Write one layer of merged output from a list survey and item survey.
465///
466/// The list survey describes which lists to produce (from the layer above).
467/// The item survey describes how the items within those lists interleave.
468/// Both surveys are consumed completely; a mismatch is a bug.
469///
470/// Pruning (from cursor adjustments) can affect the first and last list
471/// survey entries: the item survey's ranges may not match the natural
472/// bounds of those lists. Middle entries are guaranteed unpruned and can
473/// be bulk-copied.
474#[inline(never)]
475pub fn write_layer<'a, C: columnar::Container<Ref<'a>: Ord>>(
476    lists0: <super::super::updates::Lists<C> as columnar::Borrow>::Borrowed<'a>,
477    lists1: <super::super::updates::Lists<C> as columnar::Borrow>::Borrowed<'a>,
478    list_survey: &[Report],
479    item_survey: &[Report],
480    output: &mut super::super::updates::Lists<C>,
481) {
482    use columnar::{Container, Index};
483
484    let mut item_idx = 0;
485
486    for (pos, list_report) in list_survey.iter().enumerate() {
487        let is_first = pos == 0;
488        let is_last = pos == list_survey.len() - 1;
489        let may_be_pruned = is_first || is_last;
490
491        match list_report {
492            Report::This(lo, hi) => {
493                let Report::This(item_lo, item_hi) = item_survey[item_idx] else { unreachable!("Expected This in item survey for This list") };
494                item_idx += 1;
495                if may_be_pruned {
496                    // Item range may not match natural bounds; copy items in bulk
497                    // but compute per-list bounds from natural bounds clamped to
498                    // the item range.
499                    let base = output.values.len();
500                    output.values.extend_from_self(lists0.values, item_lo..item_hi);
501                    for i in *lo..*hi {
502                        let (_, nat_hi) = lists0.bounds.bounds(i);
503                        output.bounds.push((base + nat_hi.min(item_hi) - item_lo) as u64);
504                    }
505                } else {
506                    output.extend_from_self(lists0, *lo..*hi);
507                }
508            }
509            Report::That(lo, hi) => {
510                let Report::That(item_lo, item_hi) = item_survey[item_idx] else { unreachable!("Expected That in item survey for That list") };
511                item_idx += 1;
512                if may_be_pruned {
513                    let base = output.values.len();
514                    output.values.extend_from_self(lists1.values, item_lo..item_hi);
515                    for i in *lo..*hi {
516                        let (_, nat_hi) = lists1.bounds.bounds(i);
517                        output.bounds.push((base + nat_hi.min(item_hi) - item_lo) as u64);
518                    }
519                } else {
520                    output.extend_from_self(lists1, *lo..*hi);
521                }
522            }
523            Report::Both(i0, i1) => {
524                // Merge: consume item survey entries until both sides are covered.
525                let (mut c0, end0) = lists0.bounds.bounds(*i0);
526                let (mut c1, end1) = lists1.bounds.bounds(*i1);
527                while (c0 < end0 || c1 < end1) && item_idx < item_survey.len() {
528                    match item_survey[item_idx] {
529                        Report::This(lo, hi) => {
530                            if lo >= end0 { break; }
531                            output.values.extend_from_self(lists0.values, lo..hi);
532                            c0 = hi;
533                        }
534                        Report::That(lo, hi) => {
535                            if lo >= end1 { break; }
536                            output.values.extend_from_self(lists1.values, lo..hi);
537                            c1 = hi;
538                        }
539                        Report::Both(v0, v1) => {
540                            if v0 >= end0 && v1 >= end1 { break; }
541                            output.values.push(lists0.values.get(v0));
542                            c0 = v0 + 1;
543                            c1 = v1 + 1;
544                        }
545                    }
546                    item_idx += 1;
547                }
548                output.bounds.push(output.values.len() as u64);
549            }
550        }
551    }
552}
553
554/// Write the diff layer from a time survey and two diff inputs.
555///
556/// The time survey is the item-level survey for the time layer, which
557/// doubles as the list survey for diffs (one diff list per time entry).
558///
559/// - `This(lo, hi)`: bulk-copy diff lists from input 0.
560/// - `That(lo, hi)`: bulk-copy diff lists from input 1.
561/// - `Both(t0, t1)`: consolidate the two singleton diffs. Push `[sum]`
562///   if non-zero, or an empty list `[]` if they cancel.
563#[inline(never)]
564pub fn write_diffs<U: super::super::layout::ColumnarUpdate>(
565    diffs0: <super::super::updates::Lists<columnar::ContainerOf<U::Diff>> as columnar::Borrow>::Borrowed<'_>,
566    diffs1: <super::super::updates::Lists<columnar::ContainerOf<U::Diff>> as columnar::Borrow>::Borrowed<'_>,
567    time_survey: &[Report],
568    output: &mut super::super::updates::Lists<columnar::ContainerOf<U::Diff>>,
569) {
570    use columnar::{Columnar, Container, Index, Len, Push};
571    use crate::difference::{Semigroup, IsZero};
572
573    for report in time_survey.iter() {
574        match report {
575            Report::This(lo, hi) => { output.extend_from_self(diffs0, *lo..*hi); }
576            Report::That(lo, hi) => { output.extend_from_self(diffs1, *lo..*hi); }
577            Report::Both(t0, t1) => {
578                // Read singleton diffs via list bounds, consolidate.
579                let (d0_lo, d0_hi) = diffs0.bounds.bounds(*t0);
580                let (d1_lo, d1_hi) = diffs1.bounds.bounds(*t1);
581                assert_eq!(d0_hi - d0_lo, 1, "Expected singleton diff list at t0={t0}");
582                assert_eq!(d1_hi - d1_lo, 1, "Expected singleton diff list at t1={t1}");
583                let mut diff: U::Diff = Columnar::into_owned(diffs0.values.get(d0_lo));
584                diff.plus_equals(&Columnar::into_owned(diffs1.values.get(d1_lo)));
585                if !diff.is_zero() { output.values.push(&diff); }
586                output.bounds.push(output.values.len() as u64);
587            }
588        }
589    }
590}
591
592/// Increments `index` until just after the last element of `input` to satisfy `cmp`.
593///
594/// The method assumes that `cmp` is monotonic, never becoming true once it is false.
595/// If an `upper` is supplied, it acts as a constraint on the interval of `input` explored.
596#[inline(always)]
597pub(crate) fn gallop<C: columnar::Index>(input: C, lower: &mut usize, upper: usize, mut cmp: impl FnMut(<C as columnar::Index>::Ref) -> bool) {
598    // if empty input, or already >= element, return
599    if *lower < upper && cmp(input.get(*lower)) {
600        let mut step = 1;
601        while *lower + step < upper && cmp(input.get(*lower + step)) {
602            *lower += step;
603            step <<= 1;
604        }
605
606        step >>= 1;
607        while step > 0 {
608            if *lower + step < upper && cmp(input.get(*lower + step)) {
609                *lower += step;
610            }
611            step >>= 1;
612        }
613
614        *lower += 1;
615    }
616}
617
/// A report we would expect to see in a sequence about two layers.
///
/// A sequence of these reports reveals an ordered traversal of the keys
/// of two layers, with ranges exclusive to one, ranges exclusive to the
/// other, and individual elements (not ranges) common to both.
///
/// Within this file, range variants are treated as half-open:
/// `This(lower, upper)` covers indices `lower .. upper` of the first input,
/// and `That(lower, upper)` covers `lower .. upper` of the second.
#[derive(Copy, Clone, columnar::Columnar, Debug)]
pub enum Report {
    /// Range of indices `(lower, upper)` found only in this (the first) input.
    This(usize, usize),
    /// Range of indices `(lower, upper)` found only in that (the second) input.
    That(usize, usize),
    /// Matching indices in both inputs: the index in the first input, then
    /// the index in the second.
    Both(usize, usize),
}
632
633/// Accumulates `UpdatesTyped` chunks one at a time, melding small adjacent
634/// chunks. Holds at most one chunk in memory (the meld target); whenever a
635/// push doesn't meld, the prior target becomes "stable" and is emitted via
636/// the caller-provided sink. The sink can spill, count, or forward the chunk
637/// however it likes.
638pub struct ChainBuilder<U: super::super::layout::ColumnarUpdate, F: FnMut(UpdatesTyped<U>)> {
639    last: Option<UpdatesTyped<U>>,
640    sink: F,
641}
642
643impl<U: super::super::layout::ColumnarUpdate, F: FnMut(UpdatesTyped<U>)> ChainBuilder<U, F>
644where
645    U::Time: 'static,
646{
647    fn new(sink: F) -> Self { Self { last: None, sink } }
648
649    fn push(&mut self, mut link: UpdatesTyped<U>) {
650        link = link.filter_zero();
651        if link.len() == 0 { return; }
652        // Split links larger than twice the link target so downstream chains
653        // have multiple entries — required for per-chain spill policies (e.g.
654        // `Threshold` in the columnar_spill example) to actually spill anything.
655        if link.len() > 2 * crate::columnar::LINK_TARGET {
656            let (first, rest) = split_at::<U>(link, crate::columnar::LINK_TARGET);
657            self.push(first);
658            self.push(rest);
659            return;
660        }
661        match self.last.as_mut() {
662            Some(last) if last.len() + link.len() < 2 * crate::columnar::LINK_TARGET => {
663                let mut build = super::super::updates::UpdatesBuilder::new_from(std::mem::take(last));
664                build.meld(&link);
665                *last = build.done();
666            }
667            _ => {
668                if let Some(prev) = self.last.take() {
669                    (self.sink)(prev);
670                }
671                self.last = Some(link);
672            }
673        }
674    }
675    fn extend(&mut self, iter: impl IntoIterator<Item=UpdatesTyped<U>>) {
676        for link in iter { self.push(link); }
677    }
678    fn done(mut self) {
679        if let Some(last) = self.last.take() {
680            (self.sink)(last);
681        }
682    }
683}
684
685/// Split `chunk` into two `UpdatesTyped` parts at record index `n`: the first
686/// `n` records and the remaining `chunk.len() - n`. Bitmap pattern mirrors
687/// `extract`'s split between ship/kept halves.
688fn split_at<U: Update>(chunk: UpdatesTyped<U>, n: usize) -> (UpdatesTyped<U>, UpdatesTyped<U>)
689where
690    U::Time: 'static,
691{
692    use columnar::{Container, ContainerOf, Index, Push};
693    use columnar::primitive::offsets::Strides;
694    use crate::columnar::updates::{Lists, retain_items};
695
696    let total = chunk.len();
697    if n == 0 { return (UpdatesTyped::default(), chunk); }
698    if n >= total { return (chunk, UpdatesTyped::default()); }
699
700    let view = chunk.view();
701    let mut bitmap: Vec<bool> = (0..total).map(|i| i < n).collect();
702
703    // First half: records [0, n).
704    let (times, temp) = retain_items::<ContainerOf<U::Time>>(view.times, &bitmap[..]);
705    let (vals, temp) = retain_items::<ContainerOf<U::Val>>(view.vals, &temp[..]);
706    let (keys, _temp) = retain_items::<ContainerOf<U::Key>>(view.keys, &temp[..]);
707    let d_borrow = view.diffs;
708    let mut diffs = <Lists::<ContainerOf<U::Diff>> as Container>::with_capacity_for([d_borrow].into_iter());
709    for (i, &bit) in bitmap.iter().enumerate() {
710        if bit { diffs.values.push(d_borrow.values.get(i)); }
711    }
712    diffs.bounds = Strides::new(1, times.values.len() as u64);
713    let first = UpdatesTyped { keys, vals, times, diffs };
714
715    // Invert and build second half: records [n, total).
716    for bit in bitmap.iter_mut() { *bit = !*bit; }
717    let (times, temp) = retain_items::<ContainerOf<U::Time>>(view.times, &bitmap[..]);
718    let (vals, temp) = retain_items::<ContainerOf<U::Val>>(view.vals, &temp[..]);
719    let (keys, _temp) = retain_items::<ContainerOf<U::Key>>(view.keys, &temp[..]);
720    let mut diffs = <Lists::<ContainerOf<U::Diff>> as Container>::with_capacity_for([d_borrow].into_iter());
721    for (i, &bit) in bitmap.iter().enumerate() {
722        if bit { diffs.values.push(d_borrow.values.get(i)); }
723    }
724    diffs.bounds = Strides::new(1, times.values.len() as u64);
725    let second = UpdatesTyped { keys, vals, times, diffs };
726
727    (first, second)
728}