1use columnar::{Columnar, Len};
10use timely::progress::frontier::{Antichain, AntichainRef};
11
12use super::super::layout::ColumnarUpdate as Update;
13use super::super::updates::UpdatesTyped;
14
/// Two-way merge over a pair of iterators that each yield `(key, val, time, diff)`
/// tuples in ascending `(key, val, time)` order.
///
/// Both inputs are wrapped in `Peekable` so the heads can be compared without
/// consuming them.
struct Merging<I1, I2>
where
    I1: Iterator,
    I2: Iterator,
{
    iter1: std::iter::Peekable<I1>,
    iter2: std::iter::Peekable<I2>,
}

impl<K, V, T, D, I1, I2> Iterator for Merging<I1, I2>
where
    K: Copy + Ord,
    V: Copy + Ord,
    T: Copy + Ord,
    I1: Iterator<Item = (K, V, T, D)>,
    I2: Iterator<Item = (K, V, T, D)>,
{
    type Item = (K, V, T, D);

    /// Yields the smaller of the two heads by `(key, val, time)`.
    ///
    /// Ties are resolved in favour of `iter1`; the diff component never takes part
    /// in the comparison and equal items are not combined.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        // Decide which side to advance first; the peeked borrows end with the match.
        let take_first = match (self.iter1.peek(), self.iter2.peek()) {
            (None, None) => return None,
            (Some(_), None) => true,
            (None, Some(_)) => false,
            (Some(lhs), Some(rhs)) => (lhs.0, lhs.1, lhs.2) <= (rhs.0, rhs.1, rhs.2),
        };
        if take_first {
            self.iter1.next()
        } else {
            self.iter2.next()
        }
    }
}
46
47fn form_chunks<'a, U: Update>(
50 sorted: impl Iterator<Item = columnar::Ref<'a, super::super::updates::Tuple<U>>>,
51 output: &mut Vec<UpdatesTyped<U>>,
52) {
53 let mut sorted = sorted.peekable();
54 while sorted.peek().is_some() {
55 let chunk = UpdatesTyped::<U>::form((&mut sorted).take(crate::columnar::LINK_TARGET));
56 if chunk.len() > 0 {
57 output.push(chunk);
58 }
59 }
60}
61
62pub fn extract<U, I, FShip, FKept>(
68 merged: I,
69 upper: AntichainRef<U::Time>,
70 frontier: &mut Antichain<U::Time>,
71 mut ship: FShip,
72 mut kept: FKept,
73)
74where
75 U: Update,
76 U::Time: 'static,
77 I: IntoIterator<Item = UpdatesTyped<U>>,
78 FShip: FnMut(UpdatesTyped<U>),
79 FKept: FnMut(UpdatesTyped<U>),
80{
81 use columnar::{Container, ContainerOf, Index, Push};
82 use columnar::primitive::offsets::Strides;
83 use crate::columnar::updates::{Lists, retain_items};
84
85 let mut time_owned = U::Time::default();
87 let mut bitmap = Vec::new(); for chunk in merged {
89 bitmap.clear();
90 let view = chunk.view();
91 let times = view.times.values;
92 for idx in 0 .. times.len() {
93 Columnar::copy_from(&mut time_owned, times.get(idx));
94 if upper.less_equal(&time_owned) {
95 frontier.insert_ref(&time_owned);
96 bitmap.push(true);
97 }
98 else { bitmap.push(false); }
99 }
100 if bitmap.iter().all(|x| *x) { kept(chunk); }
101 else if bitmap.iter().all(|x| !*x) { ship(chunk); }
102 else {
103
104 let (times, temp) = retain_items::<ContainerOf<U::Time>>(view.times, &bitmap[..]);
105 let (vals, temp) = retain_items::<ContainerOf<U::Val>>(view.vals, &temp[..]);
106 let (keys, _temp) = retain_items::<ContainerOf<U::Key>>(view.keys, &temp[..]);
107 let d_borrow = view.diffs;
108 let mut diffs = <Lists::<ContainerOf<U::Diff>> as Container>::with_capacity_for([d_borrow].into_iter());
109 for (index, bit) in bitmap.iter().enumerate() {
110 if *bit { diffs.values.push(d_borrow.values.get(index)); }
111 }
112 diffs.bounds = Strides::new(1, times.values.len() as u64);
113 kept(UpdatesTyped {
114 keys,
115 vals,
116 times,
117 diffs,
118 });
119
120 for bit in bitmap.iter_mut() { *bit = !*bit; }
121
122 let (times, temp) = retain_items::<ContainerOf<U::Time>>(view.times, &bitmap[..]);
123 let (vals, temp) = retain_items::<ContainerOf<U::Val>>(view.vals, &temp[..]);
124 let (keys, _temp) = retain_items::<ContainerOf<U::Key>>(view.keys, &temp[..]);
125 let d_borrow = view.diffs;
126 let mut diffs = <Lists::<ContainerOf<U::Diff>> as Container>::with_capacity_for([d_borrow].into_iter());
127 for (index, bit) in bitmap.iter().enumerate() {
128 if *bit { diffs.values.push(d_borrow.values.get(index)); }
129 }
130 diffs.bounds = Strides::new(1, times.values.len() as u64);
131 ship(UpdatesTyped {
132 keys,
133 vals,
134 times,
135 diffs,
136 });
137 }
138 }
139}
140
141#[allow(dead_code)]
144fn merge_iterator<U: Update>(
145 list1: &[UpdatesTyped<U>],
146 list2: &[UpdatesTyped<U>],
147 output: &mut Vec<UpdatesTyped<U>>,
148)
149where
150 U::Time: 'static,
151{
152 let iter1 = list1.iter().flat_map(|chunk| chunk.iter());
153 let iter2 = list2.iter().flat_map(|chunk| chunk.iter());
154
155 let merged = Merging {
156 iter1: iter1.peekable(),
157 iter2: iter2.peekable(),
158 };
159
160 form_chunks::<U>(merged, output);
161}
162
163#[inline(never)]
171pub fn merge_batches<U, I1, I2, S>(
172 list1: I1,
173 list2: I2,
174 sink: S,
175)
176where
177 U: Update,
178 U::Time: 'static,
179 I1: IntoIterator<Item = UpdatesTyped<U>>,
180 I2: IntoIterator<Item = UpdatesTyped<U>>,
181 S: FnMut(UpdatesTyped<U>),
182{
183
184 let mut builder = ChainBuilder::new(sink);
200
201 let mut iter1 = list1.into_iter();
202 let mut iter2 = list2.into_iter();
203
204 let mut cursor1 = iter1.next().map(|b| ((0,0,0), b));
207 let mut cursor2 = iter2.next().map(|b| ((0,0,0), b));
208
209 while cursor1.is_some() && cursor2.is_some() {
211 merge_batch(&mut cursor1, &mut cursor2, &mut builder);
212 if cursor1.is_none() { cursor1 = iter1.next().map(|b| ((0,0,0), b)); }
213 if cursor2.is_none() { cursor2 = iter2.next().map(|b| ((0,0,0), b)); }
214 }
215
216 if let Some(((k,v,t),batch)) = cursor1 {
218 let mut out_batch = UpdatesTyped::<U>::default();
219 let empty: UpdatesTyped<U> = Default::default();
220 let view = batch.view();
221 write_from_surveys(
222 &batch,
223 &empty,
224 &[Report::This(0, 1)],
225 &[Report::This(k, view.keys.values.len())],
226 &[Report::This(v, view.vals.values.len())],
227 &[Report::This(t, view.times.values.len())],
228 &mut out_batch,
229 );
230 builder.push(out_batch);
231 }
232 if let Some(((k,v,t),batch)) = cursor2 {
233 let mut out_batch = UpdatesTyped::<U>::default();
234 let empty: UpdatesTyped<U> = Default::default();
235 let view = batch.view();
236 write_from_surveys(
237 &empty,
238 &batch,
239 &[Report::That(0, 1)],
240 &[Report::That(k, view.keys.values.len())],
241 &[Report::That(v, view.vals.values.len())],
242 &[Report::That(t, view.times.values.len())],
243 &mut out_batch,
244 );
245 builder.push(out_batch);
246 }
247
248 builder.extend(iter1);
249 builder.extend(iter2);
250 builder.done();
251 }
253
/// Merges the two batches held by `batch1` and `batch2` as far as both have data,
/// pushes the merged output into `builder`, and puts back whichever batch still has
/// an unmerged tail.
///
/// Each cursor is a `(key, val, time)` offset triple plus the batch; a non-zero
/// triple marks where an earlier call stopped in that batch.
///
/// On return at most one of the two options is `Some`: it holds the batch whose
/// trailing reports were held back, with the offsets to resume from. If the last
/// time report is `Both`, both options are left `None`.
///
/// # Panics
///
/// Panics if either option is `None` on entry, or if the time survey is empty.
#[inline(never)]
fn merge_batch<U: Update, F: FnMut(UpdatesTyped<U>)>(
    batch1: &mut Option<((usize, usize, usize), UpdatesTyped<U>)>,
    batch2: &mut Option<((usize, usize, usize), UpdatesTyped<U>)>,
    builder: &mut ChainBuilder<U, F>,
)
where
    U::Time: 'static,
{
    // Both cursors are emptied here; one may be refilled at the end.
    let ((k0_idx, v0_idx, t0_idx), updates0) = batch1.take().unwrap();
    let ((k1_idx, v1_idx, t1_idx), updates1) = batch2.take().unwrap();

    let view0 = updates0.view();
    let view1 = updates1.view();
    let keys0 = view0.keys;
    let keys1 = view1.keys;
    let vals0 = view0.vals;
    let vals1 = view1.vals;
    let times0 = view0.times;
    let times1 = view1.times;

    // Survey layer by layer: the key survey starts from the single root list of each
    // batch, and each later survey is driven by the reports of the layer above.
    let mut key_survey = survey::<columnar::ContainerOf<U::Key>>(keys0, keys1, &[Report::Both(0,0)]);
    let mut val_survey = survey::<columnar::ContainerOf<U::Val>>(vals0, vals1, &key_survey);
    let mut time_survey = survey::<columnar::ContainerOf<U::Time>>(times0, times1, &val_survey);

    // Batch 1 resumes mid-way: drop leading `This` reports that end at or before the
    // resume offset, and raise the lower bound of the first one extending past it.
    if (k0_idx, v0_idx, t0_idx) != (0,0,0) {
        let mut done = false; while !done { if let Report::This(l,u) = &mut key_survey[0] { if *u <= k0_idx { key_survey.remove(0); } else { *l = k0_idx; done = true; } } else { done = true; } }
        let mut done = false; while !done { if let Report::This(l,u) = &mut val_survey[0] { if *u <= v0_idx { val_survey.remove(0); } else { *l = v0_idx; done = true; } } else { done = true; } }
        let mut done = false; while !done { if let Report::This(l,u) = &mut time_survey[0] { if *u <= t0_idx { time_survey.remove(0); } else { *l = t0_idx; done = true; } } else { done = true; } }
    }

    // The same trimming for batch 2, which is described by `That` reports.
    if (k1_idx, v1_idx, t1_idx) != (0,0,0) {
        let mut done = false; while !done { if let Report::That(l,u) = &mut key_survey[0] { if *u <= k1_idx { key_survey.remove(0); } else { *l = k1_idx; done = true; } } else { done = true; } }
        let mut done = false; while !done { if let Report::That(l,u) = &mut val_survey[0] { if *u <= v1_idx { val_survey.remove(0); } else { *l = v1_idx; done = true; } } else { done = true; } }
        let mut done = false; while !done { if let Report::That(l,u) = &mut time_survey[0] { if *u <= t1_idx { time_survey.remove(0); } else { *l = t1_idx; done = true; } } else { done = true; } }
    }

    // If the surveys end with a run that belongs to only one batch, that run is held
    // back rather than written: it is popped from the surveys and the lowest popped
    // lower bound becomes the offset at which that batch resumes.
    // `Ok` carries offsets for batch 1, `Err` offsets for batch 2.
    let next_cursor = match time_survey.last().unwrap() {
        Report::This(_,_) => {
            let mut t = times0.values.len();
            while let Some(Report::This(l,_)) = time_survey.last() { t = *l; time_survey.pop(); }
            let mut v = vals0.values.len();
            while let Some(Report::This(l,_)) = val_survey.last() { v = *l; val_survey.pop(); }
            let mut k = keys0.values.len();
            while let Some(Report::This(l,_)) = key_survey.last() { k = *l; key_survey.pop(); }
            // NOTE(review): these step the val/key offset back by one when it is past
            // the end or its list starts after the offset one layer down, presumably
            // so the cursor names the list containing the resume point. Confirm.
            if v == times0.len() || times0.bounds.bounds(v).0 > t { v -= 1; }
            if k == vals0.len() || vals0.bounds.bounds(k).0 > v { k -= 1; }
            Some(Ok((k,v,t)))
        }
        // The final time appears in both batches, so nothing is held back.
        Report::Both(_,_) => { None }
        Report::That(_,_) => {
            let mut t = times1.values.len();
            while let Some(Report::That(l,_)) = time_survey.last() { t = *l; time_survey.pop(); }
            let mut v = vals1.values.len();
            while let Some(Report::That(l,_)) = val_survey.last() { v = *l; val_survey.pop(); }
            let mut k = keys1.values.len();
            while let Some(Report::That(l,_)) = key_survey.last() { k = *l; key_survey.pop(); }
            // Mirror of the adjustment above, against batch 2's bounds.
            if v == times1.len() || times1.bounds.bounds(v).0 > t { v -= 1; }
            if k == vals1.len() || vals1.bounds.bounds(k).0 > v { k -= 1; }
            Some(Err((k,v,t)))
        }
    };

    // Write whatever the trimmed surveys still describe into one output batch.
    let mut out_batch = UpdatesTyped::<U>::default();
    write_from_surveys(&updates0, &updates1, &[Report::Both(0,0)], &key_survey, &val_survey, &time_survey, &mut out_batch);
    builder.push(out_batch);

    // Return the batch with held-back data to its slot; the other batch is dropped.
    match next_cursor {
        Some(Ok(kvt)) => { *batch1 = Some((kvt, updates0)); }
        Some(Err(kvt)) => {*batch2 = Some((kvt, updates1)); }
        None => { }
    }
}
373
374#[inline(never)]
379fn write_from_surveys<U: Update>(
380 updates0: &UpdatesTyped<U>,
381 updates1: &UpdatesTyped<U>,
382 root_survey: &[Report],
383 key_survey: &[Report],
384 val_survey: &[Report],
385 time_survey: &[Report],
386 output: &mut UpdatesTyped<U>,
387) {
388 let view0 = updates0.view();
389 let view1 = updates1.view();
390 write_layer(view0.keys, view1.keys, root_survey, key_survey, &mut output.keys);
391 write_layer(view0.vals, view1.vals, key_survey, val_survey, &mut output.vals);
392 write_layer(view0.times, view1.times, val_survey, time_survey, &mut output.times);
393 write_diffs::<U>(view0.diffs, view1.diffs, time_survey, &mut output.diffs);
394}
395
396#[inline(never)]
405pub fn survey<'a, C: columnar::Container<Ref<'a>: Ord>>(
406 lists0: <super::super::updates::Lists<C> as columnar::Borrow>::Borrowed<'a>,
407 lists1: <super::super::updates::Lists<C> as columnar::Borrow>::Borrowed<'a>,
408 reports: &[Report],
409) -> Vec<Report> {
410 use columnar::Index;
411 let mut output = Vec::with_capacity(reports.len()); for report in reports.iter() {
413 match report {
414 Report::This(lower0, upper0) => {
415 let (new_lower, _) = lists0.bounds.bounds(*lower0);
416 let (_, new_upper) = lists0.bounds.bounds(*upper0-1);
417 output.push(Report::This(new_lower, new_upper));
418 }
419 Report::Both(index0, index1) => {
420
421 let (mut lower0, upper0) = lists0.bounds.bounds(*index0);
423 let (mut lower1, upper1) = lists1.bounds.bounds(*index1);
424
425 while lower0 < upper0 && lower1 < upper1 {
427 let val0 = lists0.values.get(lower0);
428 let val1 = lists1.values.get(lower1);
429 match val0.cmp(&val1) {
430 std::cmp::Ordering::Less => {
431 let start = lower0;
432 lower0 += 1;
433 gallop(lists0.values, &mut lower0, upper0, |x| x < val1);
434 output.push(Report::This(start, lower0));
435 },
436 std::cmp::Ordering::Equal => {
437 output.push(Report::Both(lower0, lower1));
438 lower0 += 1;
439 lower1 += 1;
440 },
441 std::cmp::Ordering::Greater => {
442 let start = lower1;
443 lower1 += 1;
444 gallop(lists1.values, &mut lower1, upper1, |x| x < val0);
445 output.push(Report::That(start, lower1));
446 },
447 }
448 }
449 if lower0 < upper0 { output.push(Report::This(lower0, upper0)); }
450 if lower1 < upper1 { output.push(Report::That(lower1, upper1)); }
451
452 }
453 Report::That(lower1, upper1) => {
454 let (new_lower, _) = lists1.bounds.bounds(*lower1);
455 let (_, new_upper) = lists1.bounds.bounds(*upper1-1);
456 output.push(Report::That(new_lower, new_upper));
457 }
458 }
459 }
460
461 output
462}
463
/// Writes one layer of the merged output, appending lists and their items to `output`.
///
/// `list_survey` holds the reports for the lists of this layer (the survey of the
/// layer above) and `item_survey` the reports for the items inside them. The item
/// survey is consumed front to back through `item_idx` while the list reports are
/// visited.
///
/// # Panics
///
/// Panics via `unreachable!` if a `This`/`That` list report is not matched by an item
/// report of the same variant, and on out-of-bounds indexing if `item_survey` runs
/// out while such a list report is being handled.
#[inline(never)]
pub fn write_layer<'a, C: columnar::Container<Ref<'a>: Ord>>(
    lists0: <super::super::updates::Lists<C> as columnar::Borrow>::Borrowed<'a>,
    lists1: <super::super::updates::Lists<C> as columnar::Borrow>::Borrowed<'a>,
    list_survey: &[Report],
    item_survey: &[Report],
    output: &mut super::super::updates::Lists<C>,
) {
    use columnar::{Container, Index};

    // Position of the next unconsumed report in `item_survey`.
    let mut item_idx = 0;

    for (pos, list_report) in list_survey.iter().enumerate() {
        // Only the first and last list reports get the clamped copy below; presumably
        // because callers trim surveys only at the front and back, so only those
        // reports can have an item range narrower than their lists. TODO confirm.
        let is_first = pos == 0;
        let is_last = pos == list_survey.len() - 1;
        let may_be_pruned = is_first || is_last;

        match list_report {
            // A range `lo..hi` of lists that come from the first input only.
            Report::This(lo, hi) => {
                let Report::This(item_lo, item_hi) = item_survey[item_idx] else { unreachable!("Expected This in item survey for This list") };
                item_idx += 1;
                if may_be_pruned {
                    let base = output.values.len();
                    // Copy exactly the surveyed items, then rebuild each list's end
                    // bound: clamp to `item_hi` and rebase from `item_lo` to `base`.
                    // NOTE(review): assumes every list's end is >= item_lo; otherwise
                    // the subtraction would underflow.
                    output.values.extend_from_self(lists0.values, item_lo..item_hi);
                    for i in *lo..*hi {
                        let (_, nat_hi) = lists0.bounds.bounds(i);
                        output.bounds.push((base + nat_hi.min(item_hi) - item_lo) as u64);
                    }
                } else {
                    // Interior range: copy the lists together with their bounds.
                    output.extend_from_self(lists0, *lo..*hi);
                }
            }
            // A range `lo..hi` of lists that come from the second input only.
            Report::That(lo, hi) => {
                let Report::That(item_lo, item_hi) = item_survey[item_idx] else { unreachable!("Expected That in item survey for That list") };
                item_idx += 1;
                if may_be_pruned {
                    let base = output.values.len();
                    output.values.extend_from_self(lists1.values, item_lo..item_hi);
                    for i in *lo..*hi {
                        let (_, nat_hi) = lists1.bounds.bounds(i);
                        output.bounds.push((base + nat_hi.min(item_hi) - item_lo) as u64);
                    }
                } else {
                    output.extend_from_self(lists1, *lo..*hi);
                }
            }
            // One list from each input, merged into a single output list.
            Report::Both(i0, i1) => {
                // `c0`/`c1` track how far each input list has been consumed.
                let (mut c0, end0) = lists0.bounds.bounds(*i0);
                let (mut c1, end1) = lists1.bounds.bounds(*i1);
                // Take item reports until both lists are consumed, the item survey is
                // exhausted, or the next report starts at or beyond the end of the
                // list it refers to (it then belongs to a later list report).
                while (c0 < end0 || c1 < end1) && item_idx < item_survey.len() {
                    match item_survey[item_idx] {
                        Report::This(lo, hi) => {
                            if lo >= end0 { break; }
                            output.values.extend_from_self(lists0.values, lo..hi);
                            c0 = hi;
                        }
                        Report::That(lo, hi) => {
                            if lo >= end1 { break; }
                            output.values.extend_from_self(lists1.values, lo..hi);
                            c1 = hi;
                        }
                        Report::Both(v0, v1) => {
                            if v0 >= end0 && v1 >= end1 { break; }
                            // Equal items: the copy from the first input is written.
                            output.values.push(lists0.values.get(v0));
                            c0 = v0 + 1;
                            c1 = v1 + 1;
                        }
                    }
                    item_idx += 1;
                }
                // Close the merged list at the current end of the values.
                output.bounds.push(output.values.len() as u64);
            }
        }
    }
}
553
554#[inline(never)]
564pub fn write_diffs<U: super::super::layout::ColumnarUpdate>(
565 diffs0: <super::super::updates::Lists<columnar::ContainerOf<U::Diff>> as columnar::Borrow>::Borrowed<'_>,
566 diffs1: <super::super::updates::Lists<columnar::ContainerOf<U::Diff>> as columnar::Borrow>::Borrowed<'_>,
567 time_survey: &[Report],
568 output: &mut super::super::updates::Lists<columnar::ContainerOf<U::Diff>>,
569) {
570 use columnar::{Columnar, Container, Index, Len, Push};
571 use crate::difference::{Semigroup, IsZero};
572
573 for report in time_survey.iter() {
574 match report {
575 Report::This(lo, hi) => { output.extend_from_self(diffs0, *lo..*hi); }
576 Report::That(lo, hi) => { output.extend_from_self(diffs1, *lo..*hi); }
577 Report::Both(t0, t1) => {
578 let (d0_lo, d0_hi) = diffs0.bounds.bounds(*t0);
580 let (d1_lo, d1_hi) = diffs1.bounds.bounds(*t1);
581 assert_eq!(d0_hi - d0_lo, 1, "Expected singleton diff list at t0={t0}");
582 assert_eq!(d1_hi - d1_lo, 1, "Expected singleton diff list at t1={t1}");
583 let mut diff: U::Diff = Columnar::into_owned(diffs0.values.get(d0_lo));
584 diff.plus_equals(&Columnar::into_owned(diffs1.values.get(d1_lo)));
585 if !diff.is_zero() { output.values.push(&diff); }
586 output.bounds.push(output.values.len() as u64);
587 }
588 }
589 }
590}
591
592#[inline(always)]
597pub(crate) fn gallop<C: columnar::Index>(input: C, lower: &mut usize, upper: usize, mut cmp: impl FnMut(<C as columnar::Index>::Ref) -> bool) {
598 if *lower < upper && cmp(input.get(*lower)) {
600 let mut step = 1;
601 while *lower + step < upper && cmp(input.get(*lower + step)) {
602 *lower += step;
603 step <<= 1;
604 }
605
606 step >>= 1;
607 while step > 0 {
608 if *lower + step < upper && cmp(input.get(*lower + step)) {
609 *lower += step;
610 }
611 step >>= 1;
612 }
613
614 *lower += 1;
615 }
616}
617
/// One entry of a survey: says where a stretch of a merged layer comes from.
///
/// The "first" and "second" inputs are the `lists0` and `lists1` arguments of
/// `survey`, `write_layer` and `write_diffs`.
#[derive(Copy, Clone, columnar::Columnar, Debug)]
pub enum Report {
    /// A half-open index range `(lower, upper)` taken from the first input only.
    This(usize, usize),
    /// A half-open index range `(lower, upper)` taken from the second input only.
    That(usize, usize),
    /// A single matching element: its index in the first input, then in the second.
    Both(usize, usize),
}
632
633pub struct ChainBuilder<U: super::super::layout::ColumnarUpdate, F: FnMut(UpdatesTyped<U>)> {
639 last: Option<UpdatesTyped<U>>,
640 sink: F,
641}
642
643impl<U: super::super::layout::ColumnarUpdate, F: FnMut(UpdatesTyped<U>)> ChainBuilder<U, F>
644where
645 U::Time: 'static,
646{
647 fn new(sink: F) -> Self { Self { last: None, sink } }
648
649 fn push(&mut self, mut link: UpdatesTyped<U>) {
650 link = link.filter_zero();
651 if link.len() == 0 { return; }
652 if link.len() > 2 * crate::columnar::LINK_TARGET {
656 let (first, rest) = split_at::<U>(link, crate::columnar::LINK_TARGET);
657 self.push(first);
658 self.push(rest);
659 return;
660 }
661 match self.last.as_mut() {
662 Some(last) if last.len() + link.len() < 2 * crate::columnar::LINK_TARGET => {
663 let mut build = super::super::updates::UpdatesBuilder::new_from(std::mem::take(last));
664 build.meld(&link);
665 *last = build.done();
666 }
667 _ => {
668 if let Some(prev) = self.last.take() {
669 (self.sink)(prev);
670 }
671 self.last = Some(link);
672 }
673 }
674 }
675 fn extend(&mut self, iter: impl IntoIterator<Item=UpdatesTyped<U>>) {
676 for link in iter { self.push(link); }
677 }
678 fn done(mut self) {
679 if let Some(last) = self.last.take() {
680 (self.sink)(last);
681 }
682 }
683}
684
685fn split_at<U: Update>(chunk: UpdatesTyped<U>, n: usize) -> (UpdatesTyped<U>, UpdatesTyped<U>)
689where
690 U::Time: 'static,
691{
692 use columnar::{Container, ContainerOf, Index, Push};
693 use columnar::primitive::offsets::Strides;
694 use crate::columnar::updates::{Lists, retain_items};
695
696 let total = chunk.len();
697 if n == 0 { return (UpdatesTyped::default(), chunk); }
698 if n >= total { return (chunk, UpdatesTyped::default()); }
699
700 let view = chunk.view();
701 let mut bitmap: Vec<bool> = (0..total).map(|i| i < n).collect();
702
703 let (times, temp) = retain_items::<ContainerOf<U::Time>>(view.times, &bitmap[..]);
705 let (vals, temp) = retain_items::<ContainerOf<U::Val>>(view.vals, &temp[..]);
706 let (keys, _temp) = retain_items::<ContainerOf<U::Key>>(view.keys, &temp[..]);
707 let d_borrow = view.diffs;
708 let mut diffs = <Lists::<ContainerOf<U::Diff>> as Container>::with_capacity_for([d_borrow].into_iter());
709 for (i, &bit) in bitmap.iter().enumerate() {
710 if bit { diffs.values.push(d_borrow.values.get(i)); }
711 }
712 diffs.bounds = Strides::new(1, times.values.len() as u64);
713 let first = UpdatesTyped { keys, vals, times, diffs };
714
715 for bit in bitmap.iter_mut() { *bit = !*bit; }
717 let (times, temp) = retain_items::<ContainerOf<U::Time>>(view.times, &bitmap[..]);
718 let (vals, temp) = retain_items::<ContainerOf<U::Val>>(view.vals, &temp[..]);
719 let (keys, _temp) = retain_items::<ContainerOf<U::Key>>(view.keys, &temp[..]);
720 let mut diffs = <Lists::<ContainerOf<U::Diff>> as Container>::with_capacity_for([d_borrow].into_iter());
721 for (i, &bit) in bitmap.iter().enumerate() {
722 if bit { diffs.values.push(d_borrow.values.get(i)); }
723 }
724 diffs.bounds = Strides::new(1, times.values.len() as u64);
725 let second = UpdatesTyped { keys, vals, times, diffs };
726
727 (first, second)
728}