1use std::rc::Rc;
12
13use crate::containers::TimelyStack;
14use crate::trace::implementations::chunker::{ColumnationChunker, VecChunker};
15use crate::trace::implementations::merge_batcher::{ColMerger, MergeBatcher, VecMerger};
16use crate::trace::implementations::spine_fueled::Spine;
17use crate::trace::rc_blanket_impls::RcBuilder;
18
19use super::{Layout, TStack, Vector};
20
21pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder};
22pub use self::val_batch::{OrdValBatch, OrdValBuilder};
23
24pub type OrdValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Vector<((K, V), T, R)>>>>;
26pub type OrdValBatcher<K, V, T, R> =
28 MergeBatcher<Vec<((K, V), T, R)>, VecChunker<((K, V), T, R)>, VecMerger<(K, V), T, R>>;
29pub type RcOrdValBuilder<K, V, T, R> =
31 RcBuilder<OrdValBuilder<Vector<((K, V), T, R)>, Vec<((K, V), T, R)>>>;
32
33pub type ColValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<TStack<((K, V), T, R)>>>>;
38pub type ColValBatcher<K, V, T, R> =
40 MergeBatcher<Vec<((K, V), T, R)>, ColumnationChunker<((K, V), T, R)>, ColMerger<(K, V), T, R>>;
41pub type ColValBuilder<K, V, T, R> =
43 RcBuilder<OrdValBuilder<TStack<((K, V), T, R)>, TimelyStack<((K, V), T, R)>>>;
44
45pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K, ()), T, R)>>>>;
47pub type OrdKeyBatcher<K, T, R> =
49 MergeBatcher<Vec<((K, ()), T, R)>, VecChunker<((K, ()), T, R)>, VecMerger<(K, ()), T, R>>;
50pub type RcOrdKeyBuilder<K, T, R> =
52 RcBuilder<OrdKeyBuilder<Vector<((K, ()), T, R)>, Vec<((K, ()), T, R)>>>;
53
54pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<TStack<((K, ()), T, R)>>>>;
59pub type ColKeyBatcher<K, T, R> = MergeBatcher<
61 Vec<((K, ()), T, R)>,
62 ColumnationChunker<((K, ()), T, R)>,
63 ColMerger<(K, ()), T, R>,
64>;
65pub type ColKeyBuilder<K, T, R> =
67 RcBuilder<OrdKeyBuilder<TStack<((K, ()), T, R)>, TimelyStack<((K, ()), T, R)>>>;
68
69pub use layers::{Upds, Vals};
73pub mod layers {
82
83 use crate::trace::implementations::BatchContainer;
84 use serde::{Deserialize, Serialize};
85
86 #[derive(Debug, Serialize, Deserialize)]
88 pub struct Vals<O, V> {
89 pub offs: O,
93 pub vals: V,
95 }
96
97 impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, V: BatchContainer> Default for Vals<O, V> {
98 fn default() -> Self {
99 Self::with_capacity(0, 0)
100 }
101 }
102
103 impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, V: BatchContainer> Vals<O, V> {
104 pub fn bounds(&self, index: usize) -> (usize, usize) {
106 (self.offs.index(index), self.offs.index(index + 1))
107 }
108 pub fn get_rel(&self, list_idx: usize, item_idx: usize) -> V::ReadItem<'_> {
115 self.get_abs(self.bounds(list_idx).0 + item_idx)
116 }
117
118 pub fn len(&self) -> usize {
120 self.offs.len() - 1
121 }
122 pub fn get_abs(&self, index: usize) -> V::ReadItem<'_> {
124 self.vals.index(index)
125 }
126 pub fn with_capacity(o_size: usize, v_size: usize) -> Self {
128 let mut offs = <O as BatchContainer>::with_capacity(o_size);
129 offs.push_ref(0);
130 Self {
131 offs,
132 vals: <V as BatchContainer>::with_capacity(v_size),
133 }
134 }
135 pub fn merge_capacity(this: &Self, that: &Self) -> Self {
137 let mut offs = <O as BatchContainer>::with_capacity(this.offs.len() + that.offs.len());
138 offs.push_ref(0);
139 Self {
140 offs,
141 vals: <V as BatchContainer>::merge_capacity(&this.vals, &that.vals),
142 }
143 }
144 }
145
146 #[derive(Debug, Serialize, Deserialize)]
151 pub struct Upds<O, T, D> {
152 pub offs: O,
154 pub times: T,
156 pub diffs: D,
158 }
159
160 impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, T: BatchContainer, D: BatchContainer>
161 Default for Upds<O, T, D>
162 {
163 fn default() -> Self {
164 Self::with_capacity(0, 0)
165 }
166 }
167 impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, T: BatchContainer, D: BatchContainer>
168 Upds<O, T, D>
169 {
170 pub fn bounds(&self, index: usize) -> (usize, usize) {
172 let mut lower = self.offs.index(index);
173 let upper = self.offs.index(index + 1);
174 if lower == upper {
177 assert!(lower > 0);
178 lower -= 1;
179 }
180 (lower, upper)
181 }
182 pub fn get_rel(
189 &self,
190 list_idx: usize,
191 item_idx: usize,
192 ) -> (T::ReadItem<'_>, D::ReadItem<'_>) {
193 self.get_abs(self.bounds(list_idx).0 + item_idx)
194 }
195
196 pub fn len(&self) -> usize {
198 self.offs.len() - 1
199 }
200 pub fn get_abs(&self, index: usize) -> (T::ReadItem<'_>, D::ReadItem<'_>) {
202 (self.times.index(index), self.diffs.index(index))
203 }
204 pub fn with_capacity(o_size: usize, u_size: usize) -> Self {
206 let mut offs = <O as BatchContainer>::with_capacity(o_size);
207 offs.push_ref(0);
208 Self {
209 offs,
210 times: <T as BatchContainer>::with_capacity(u_size),
211 diffs: <D as BatchContainer>::with_capacity(u_size),
212 }
213 }
214 pub fn merge_capacity(this: &Self, that: &Self) -> Self {
216 let mut offs = <O as BatchContainer>::with_capacity(this.offs.len() + that.offs.len());
217 offs.push_ref(0);
218 Self {
219 offs,
220 times: <T as BatchContainer>::merge_capacity(&this.times, &that.times),
221 diffs: <D as BatchContainer>::merge_capacity(&this.diffs, &that.diffs),
222 }
223 }
224 }
225
226 pub struct UpdsBuilder<T: BatchContainer, D: BatchContainer> {
228 stash: Vec<(T::Owned, D::Owned)>,
233 total: usize,
237
238 time_con: T,
240 diff_con: D,
242 }
243
244 impl<T: BatchContainer, D: BatchContainer> Default for UpdsBuilder<T, D> {
245 fn default() -> Self {
246 Self {
247 stash: Vec::default(),
248 total: 0,
249 time_con: BatchContainer::with_capacity(1),
250 diff_con: BatchContainer::with_capacity(1),
251 }
252 }
253 }
254
255 impl<T, D> UpdsBuilder<T, D>
256 where
257 T: BatchContainer<Owned: Ord>,
258 D: BatchContainer<Owned: crate::difference::Semigroup>,
259 {
260 pub fn push(&mut self, time: T::Owned, diff: D::Owned) {
262 self.stash.push((time, diff));
263 }
264
265 pub fn seal<O: for<'a> BatchContainer<ReadItem<'a> = usize>>(
269 &mut self,
270 upds: &mut Upds<O, T, D>,
271 ) -> bool {
272 use crate::consolidation;
273 consolidation::consolidate(&mut self.stash);
274 if self.stash.is_empty() {
276 return false;
277 }
278 if self.stash.len() == 1 {
280 let (time, diff) = self.stash.last().unwrap();
281 self.time_con.clear();
282 self.time_con.push_own(time);
283 self.diff_con.clear();
284 self.diff_con.push_own(diff);
285 if upds.times.last() == self.time_con.get(0)
286 && upds.diffs.last() == self.diff_con.get(0)
287 {
288 self.total += 1;
289 self.stash.clear();
290 upds.offs.push_ref(upds.times.len());
291 return true;
292 }
293 }
294 self.total += self.stash.len();
296 for (time, diff) in self.stash.drain(..) {
297 upds.times.push_own(&time);
298 upds.diffs.push_own(&diff);
299 }
300 upds.offs.push_ref(upds.times.len());
301 true
302 }
303
304 pub fn total(&self) -> usize {
306 self.total
307 }
308 }
309}
310
311pub mod val_batch {
313
314 use serde::{Deserialize, Serialize};
315 use std::marker::PhantomData;
316 use timely::container::PushInto;
317 use timely::progress::{frontier::AntichainRef, Antichain};
318
319 use crate::trace::implementations::layout;
320 use crate::trace::implementations::{BatchContainer, BuilderInput};
321 use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
322
323 use super::{layers::UpdsBuilder, Layout, Upds, Vals};
324
325 #[derive(Debug, Serialize, Deserialize)]
327 #[serde(bound = "
328 L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
329 L::ValContainer: Serialize + for<'a> Deserialize<'a>,
330 L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
331 L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
332 L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
333 ")]
334 pub struct OrdValStorage<L: Layout> {
335 pub keys: L::KeyContainer,
337 pub vals: Vals<L::OffsetContainer, L::ValContainer>,
339 pub upds: Upds<L::OffsetContainer, L::TimeContainer, L::DiffContainer>,
341 }
342
343 #[derive(Serialize, Deserialize)]
348 #[serde(bound = "
349 L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
350 L::ValContainer: Serialize + for<'a> Deserialize<'a>,
351 L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
352 L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
353 L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
354 ")]
355 pub struct OrdValBatch<L: Layout> {
356 pub storage: OrdValStorage<L>,
358 pub description: Description<layout::Time<L>>,
360 pub updates: usize,
366 }
367
368 impl<L: Layout> WithLayout for OrdValBatch<L> {
369 type Layout = L;
370 }
371
372 impl<L: Layout> BatchReader for OrdValBatch<L> {
373 type Cursor = OrdValCursor<L>;
374 fn cursor(&self) -> Self::Cursor {
375 OrdValCursor {
376 key_cursor: 0,
377 val_cursor: 0,
378 phantom: PhantomData,
379 }
380 }
381 fn len(&self) -> usize {
382 self.updates
385 }
386 fn description(&self) -> &Description<layout::Time<L>> {
387 &self.description
388 }
389 }
390
391 impl<L: Layout> Batch for OrdValBatch<L> {
392 type Merger = OrdValMerger<L>;
393
394 fn begin_merge(
395 &self,
396 other: &Self,
397 compaction_frontier: AntichainRef<layout::Time<L>>,
398 ) -> Self::Merger {
399 OrdValMerger::new(self, other, compaction_frontier)
400 }
401
402 fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
403 use timely::progress::Timestamp;
404 Self {
405 storage: OrdValStorage {
406 keys: L::KeyContainer::with_capacity(0),
407 vals: Default::default(),
408 upds: Default::default(),
409 },
410 description: Description::new(
411 lower,
412 upper,
413 Antichain::from_elem(Self::Time::minimum()),
414 ),
415 updates: 0,
416 }
417 }
418 }
419
420 pub struct OrdValMerger<L: Layout> {
422 key_cursor1: usize,
424 key_cursor2: usize,
426 result: OrdValStorage<L>,
428 description: Description<layout::Time<L>>,
430 staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
432 }
433
434 impl<L: Layout> Merger<OrdValBatch<L>> for OrdValMerger<L>
435 where
436 OrdValBatch<L>: Batch<Time = layout::Time<L>>,
437 {
438 fn new(
439 batch1: &OrdValBatch<L>,
440 batch2: &OrdValBatch<L>,
441 compaction_frontier: AntichainRef<layout::Time<L>>,
442 ) -> Self {
443 assert!(batch1.upper() == batch2.lower());
444 use crate::lattice::Lattice;
445 let mut since = batch1
446 .description()
447 .since()
448 .join(batch2.description().since());
449 since = since.join(&compaction_frontier.to_owned());
450
451 let description =
452 Description::new(batch1.lower().clone(), batch2.upper().clone(), since);
453
454 let batch1 = &batch1.storage;
455 let batch2 = &batch2.storage;
456
457 OrdValMerger {
458 key_cursor1: 0,
459 key_cursor2: 0,
460 result: OrdValStorage {
461 keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
462 vals: Vals::merge_capacity(&batch1.vals, &batch2.vals),
463 upds: Upds::merge_capacity(&batch1.upds, &batch2.upds),
464 },
465 description,
466 staging: UpdsBuilder::default(),
467 }
468 }
469 fn done(self) -> OrdValBatch<L> {
470 OrdValBatch {
471 updates: self.staging.total(),
472 storage: self.result,
473 description: self.description,
474 }
475 }
476 fn work(&mut self, source1: &OrdValBatch<L>, source2: &OrdValBatch<L>, fuel: &mut isize) {
477 let starting_updates = self.staging.total();
479 let mut effort = 0isize;
480
481 while self.key_cursor1 < source1.storage.keys.len()
483 && self.key_cursor2 < source2.storage.keys.len()
484 && effort < *fuel
485 {
486 self.merge_key(&source1.storage, &source2.storage);
487 effort = (self.staging.total() - starting_updates) as isize;
489 }
490
491 while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
494 self.copy_key(&source1.storage, self.key_cursor1);
495 self.key_cursor1 += 1;
496 effort = (self.staging.total() - starting_updates) as isize;
497 }
498 while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
499 self.copy_key(&source2.storage, self.key_cursor2);
500 self.key_cursor2 += 1;
501 effort = (self.staging.total() - starting_updates) as isize;
502 }
503
504 *fuel -= effort;
505 }
506 }
507
508 impl<L: Layout> OrdValMerger<L> {
510 fn copy_key(&mut self, source: &OrdValStorage<L>, cursor: usize) {
518 let init_vals = self.result.vals.vals.len();
520 let (mut lower, upper) = source.vals.bounds(cursor);
521 while lower < upper {
522 self.stash_updates_for_val(source, lower);
523 if self.staging.seal(&mut self.result.upds) {
524 self.result.vals.vals.push_ref(source.vals.get_abs(lower));
525 }
526 lower += 1;
527 }
528
529 if self.result.vals.vals.len() > init_vals {
531 self.result.keys.push_ref(source.keys.index(cursor));
532 self.result.vals.offs.push_ref(self.result.vals.vals.len());
533 }
534 }
535 fn merge_key(&mut self, source1: &OrdValStorage<L>, source2: &OrdValStorage<L>) {
540 use ::std::cmp::Ordering;
541 match source1
542 .keys
543 .index(self.key_cursor1)
544 .cmp(&source2.keys.index(self.key_cursor2))
545 {
546 Ordering::Less => {
547 self.copy_key(source1, self.key_cursor1);
548 self.key_cursor1 += 1;
549 }
550 Ordering::Equal => {
551 let (lower1, upper1) = source1.vals.bounds(self.key_cursor1);
553 let (lower2, upper2) = source2.vals.bounds(self.key_cursor2);
554 if let Some(off) =
555 self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2))
556 {
557 self.result
558 .keys
559 .push_ref(source1.keys.index(self.key_cursor1));
560 self.result.vals.offs.push_ref(off);
561 }
562 self.key_cursor1 += 1;
564 self.key_cursor2 += 1;
565 }
566 Ordering::Greater => {
567 self.copy_key(source2, self.key_cursor2);
568 self.key_cursor2 += 1;
569 }
570 }
571 }
572 fn merge_vals(
577 &mut self,
578 (source1, mut lower1, upper1): (&OrdValStorage<L>, usize, usize),
579 (source2, mut lower2, upper2): (&OrdValStorage<L>, usize, usize),
580 ) -> Option<usize> {
581 let init_vals = self.result.vals.vals.len();
583 while lower1 < upper1 && lower2 < upper2 {
584 use ::std::cmp::Ordering;
588 match source1
589 .vals
590 .get_abs(lower1)
591 .cmp(&source2.vals.get_abs(lower2))
592 {
593 Ordering::Less => {
594 self.stash_updates_for_val(source1, lower1);
596 if self.staging.seal(&mut self.result.upds) {
597 self.result.vals.vals.push_ref(source1.vals.get_abs(lower1));
598 }
599 lower1 += 1;
600 }
601 Ordering::Equal => {
602 self.stash_updates_for_val(source1, lower1);
603 self.stash_updates_for_val(source2, lower2);
604 if self.staging.seal(&mut self.result.upds) {
605 self.result.vals.vals.push_ref(source1.vals.get_abs(lower1));
606 }
607 lower1 += 1;
608 lower2 += 1;
609 }
610 Ordering::Greater => {
611 self.stash_updates_for_val(source2, lower2);
613 if self.staging.seal(&mut self.result.upds) {
614 self.result.vals.vals.push_ref(source2.vals.get_abs(lower2));
615 }
616 lower2 += 1;
617 }
618 }
619 }
620 while lower1 < upper1 {
622 self.stash_updates_for_val(source1, lower1);
623 if self.staging.seal(&mut self.result.upds) {
624 self.result.vals.vals.push_ref(source1.vals.get_abs(lower1));
625 }
626 lower1 += 1;
627 }
628 while lower2 < upper2 {
629 self.stash_updates_for_val(source2, lower2);
630 if self.staging.seal(&mut self.result.upds) {
631 self.result.vals.vals.push_ref(source2.vals.get_abs(lower2));
632 }
633 lower2 += 1;
634 }
635
636 if self.result.vals.vals.len() > init_vals {
638 Some(self.result.vals.vals.len())
639 } else {
640 None
641 }
642 }
643
644 fn stash_updates_for_val(&mut self, source: &OrdValStorage<L>, index: usize) {
646 let (lower, upper) = source.upds.bounds(index);
647 for i in lower..upper {
648 let (time, diff) = source.upds.get_abs(i);
650 use crate::lattice::Lattice;
651 let mut new_time: layout::Time<L> = L::TimeContainer::into_owned(time);
652 new_time.advance_by(self.description.since().borrow());
653 self.staging
654 .push(new_time, L::DiffContainer::into_owned(diff));
655 }
656 }
657 }
658
659 pub struct OrdValCursor<L: Layout> {
661 key_cursor: usize,
663 val_cursor: usize,
665 phantom: PhantomData<L>,
667 }
668
669 use crate::trace::implementations::WithLayout;
670 impl<L: Layout> WithLayout for OrdValCursor<L> {
671 type Layout = L;
672 }
673
674 impl<L: Layout> Cursor for OrdValCursor<L> {
675 type Storage = OrdValBatch<L>;
676
677 fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> {
678 storage.storage.keys.get(self.key_cursor)
679 }
680 fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> {
681 if self.val_valid(storage) {
682 Some(self.val(storage))
683 } else {
684 None
685 }
686 }
687
688 fn key<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Key<'a> {
689 storage.storage.keys.index(self.key_cursor)
690 }
691 fn val<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Val<'a> {
692 storage.storage.vals.get_abs(self.val_cursor)
693 }
694 fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(
695 &mut self,
696 storage: &OrdValBatch<L>,
697 mut logic: L2,
698 ) {
699 let (lower, upper) = storage.storage.upds.bounds(self.val_cursor);
700 for index in lower..upper {
701 let (time, diff) = storage.storage.upds.get_abs(index);
702 logic(time, diff);
703 }
704 }
705 fn key_valid(&self, storage: &OrdValBatch<L>) -> bool {
706 self.key_cursor < storage.storage.keys.len()
707 }
708 fn val_valid(&self, storage: &OrdValBatch<L>) -> bool {
709 self.val_cursor < storage.storage.vals.bounds(self.key_cursor).1
710 }
711 fn step_key(&mut self, storage: &OrdValBatch<L>) {
712 self.key_cursor += 1;
713 if self.key_valid(storage) {
714 self.rewind_vals(storage);
715 } else {
716 self.key_cursor = storage.storage.keys.len();
717 }
718 }
719 fn seek_key(&mut self, storage: &OrdValBatch<L>, key: Self::Key<'_>) {
720 self.key_cursor +=
721 storage
722 .storage
723 .keys
724 .advance(self.key_cursor, storage.storage.keys.len(), |x| {
725 <L::KeyContainer as BatchContainer>::reborrow(x)
726 .lt(&<L::KeyContainer as BatchContainer>::reborrow(key))
727 });
728 if self.key_valid(storage) {
729 self.rewind_vals(storage);
730 }
731 }
732 fn step_val(&mut self, storage: &OrdValBatch<L>) {
733 self.val_cursor += 1;
734 if !self.val_valid(storage) {
735 self.val_cursor = storage.storage.vals.bounds(self.key_cursor).1;
736 }
737 }
738 fn seek_val(&mut self, storage: &OrdValBatch<L>, val: Self::Val<'_>) {
739 self.val_cursor += storage.storage.vals.vals.advance(
740 self.val_cursor,
741 storage.storage.vals.bounds(self.key_cursor).1,
742 |x| {
743 <L::ValContainer as BatchContainer>::reborrow(x)
744 .lt(&<L::ValContainer as BatchContainer>::reborrow(val))
745 },
746 );
747 }
748 fn rewind_keys(&mut self, storage: &OrdValBatch<L>) {
749 self.key_cursor = 0;
750 if self.key_valid(storage) {
751 self.rewind_vals(storage)
752 }
753 }
754 fn rewind_vals(&mut self, storage: &OrdValBatch<L>) {
755 self.val_cursor = storage.storage.vals.bounds(self.key_cursor).0;
756 }
757 }
758
759 pub struct OrdValBuilder<L: Layout, CI> {
761 pub result: OrdValStorage<L>,
765 staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
766 _marker: PhantomData<CI>,
767 }
768
769 impl<L, CI> Builder for OrdValBuilder<L, CI>
770 where
771 L: for<'a> Layout<KeyContainer: PushInto<CI::Key<'a>>, ValContainer: PushInto<CI::Val<'a>>>,
772 CI: for<'a> BuilderInput<
773 L::KeyContainer,
774 L::ValContainer,
775 Time = layout::Time<L>,
776 Diff = layout::Diff<L>,
777 >,
778 {
779 type Input = CI;
780 type Time = layout::Time<L>;
781 type Output = OrdValBatch<L>;
782
783 fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
784 Self {
785 result: OrdValStorage {
786 keys: L::KeyContainer::with_capacity(keys),
787 vals: Vals::with_capacity(keys + 1, vals),
788 upds: Upds::with_capacity(vals + 1, upds),
789 },
790 staging: UpdsBuilder::default(),
791 _marker: PhantomData,
792 }
793 }
794
795 #[inline]
796 fn push(&mut self, chunk: &mut Self::Input) {
797 for item in chunk.drain() {
798 let (key, val, time, diff) = CI::into_parts(item);
799
800 if self.result.keys.is_empty() {
802 self.result.vals.vals.push_into(val);
803 self.result.keys.push_into(key);
804 self.staging.push(time, diff);
805 }
806 else if self
808 .result
809 .keys
810 .last()
811 .map(|k| CI::key_eq(&key, k))
812 .unwrap_or(false)
813 {
814 if self
816 .result
817 .vals
818 .vals
819 .last()
820 .map(|v| CI::val_eq(&val, v))
821 .unwrap_or(false)
822 {
823 self.staging.push(time, diff);
824 } else {
825 self.staging.seal(&mut self.result.upds);
827 self.staging.push(time, diff);
828 self.result.vals.vals.push_into(val);
829 }
830 } else {
831 self.staging.seal(&mut self.result.upds);
833 self.staging.push(time, diff);
834 self.result.vals.offs.push_ref(self.result.vals.vals.len());
835 self.result.vals.vals.push_into(val);
836 self.result.keys.push_into(key);
837 }
838 }
839 }
840
841 #[inline(never)]
842 fn done(mut self, description: Description<Self::Time>) -> OrdValBatch<L> {
843 self.staging.seal(&mut self.result.upds);
844 self.result.vals.offs.push_ref(self.result.vals.vals.len());
845 OrdValBatch {
846 updates: self.staging.total(),
847 storage: self.result,
848 description,
849 }
850 }
851
852 fn seal(
853 chain: &mut Vec<Self::Input>,
854 description: Description<Self::Time>,
855 ) -> Self::Output {
856 let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
857 let mut builder = Self::with_capacity(keys, vals, upds);
858 for mut chunk in chain.drain(..) {
859 builder.push(&mut chunk);
860 }
861
862 builder.done(description)
863 }
864 }
865}
866
867pub mod key_batch {
869
870 use serde::{Deserialize, Serialize};
871 use std::marker::PhantomData;
872 use timely::container::PushInto;
873 use timely::progress::{frontier::AntichainRef, Antichain};
874
875 use crate::trace::implementations::layout;
876 use crate::trace::implementations::{BatchContainer, BuilderInput};
877 use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
878
879 use super::{layers::UpdsBuilder, Layout, Upds};
880
881 #[derive(Debug, Serialize, Deserialize)]
883 #[serde(bound = "
884 L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
885 L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
886 L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
887 L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
888 ")]
889 pub struct OrdKeyStorage<L: Layout> {
890 pub keys: L::KeyContainer,
892 pub upds: Upds<L::OffsetContainer, L::TimeContainer, L::DiffContainer>,
894 }
895
896 #[derive(Serialize, Deserialize)]
901 #[serde(bound = "
902 L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
903 L::ValContainer: Serialize + for<'a> Deserialize<'a>,
904 L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
905 L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
906 L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
907 ")]
908 pub struct OrdKeyBatch<L: Layout> {
909 pub storage: OrdKeyStorage<L>,
911 pub description: Description<layout::Time<L>>,
913 pub updates: usize,
919
920 pub value: L::ValContainer,
922 }
923
924 impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> OrdKeyBatch<L> {
925 pub fn create_value() -> L::ValContainer {
927 let mut value = L::ValContainer::with_capacity(1);
928 value.push_own(&Default::default());
929 value
930 }
931 }
932
933 impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> WithLayout for OrdKeyBatch<L> {
934 type Layout = L;
935 }
936
937 impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> BatchReader for OrdKeyBatch<L> {
938 type Cursor = OrdKeyCursor<L>;
939 fn cursor(&self) -> Self::Cursor {
940 OrdKeyCursor {
941 key_cursor: 0,
942 val_stepped: false,
943 phantom: std::marker::PhantomData,
944 }
945 }
946 fn len(&self) -> usize {
947 self.updates
950 }
951 fn description(&self) -> &Description<layout::Time<L>> {
952 &self.description
953 }
954 }
955
956 impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> Batch for OrdKeyBatch<L> {
957 type Merger = OrdKeyMerger<L>;
958
959 fn begin_merge(
960 &self,
961 other: &Self,
962 compaction_frontier: AntichainRef<layout::Time<L>>,
963 ) -> Self::Merger {
964 OrdKeyMerger::new(self, other, compaction_frontier)
965 }
966
967 fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
968 use timely::progress::Timestamp;
969 Self {
970 storage: OrdKeyStorage {
971 keys: L::KeyContainer::with_capacity(0),
972 upds: Upds::default(),
973 },
974 description: Description::new(
975 lower,
976 upper,
977 Antichain::from_elem(Self::Time::minimum()),
978 ),
979 updates: 0,
980 value: Self::create_value(),
981 }
982 }
983 }
984
985 pub struct OrdKeyMerger<L: Layout> {
987 key_cursor1: usize,
989 key_cursor2: usize,
991 result: OrdKeyStorage<L>,
993 description: Description<layout::Time<L>>,
995
996 staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
998 }
999
1000 impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> Merger<OrdKeyBatch<L>>
1001 for OrdKeyMerger<L>
1002 where
1003 OrdKeyBatch<L>: Batch<Time = layout::Time<L>>,
1004 {
1005 fn new(
1006 batch1: &OrdKeyBatch<L>,
1007 batch2: &OrdKeyBatch<L>,
1008 compaction_frontier: AntichainRef<layout::Time<L>>,
1009 ) -> Self {
1010 assert!(batch1.upper() == batch2.lower());
1011 use crate::lattice::Lattice;
1012 let mut since = batch1
1013 .description()
1014 .since()
1015 .join(batch2.description().since());
1016 since = since.join(&compaction_frontier.to_owned());
1017
1018 let description =
1019 Description::new(batch1.lower().clone(), batch2.upper().clone(), since);
1020
1021 let batch1 = &batch1.storage;
1022 let batch2 = &batch2.storage;
1023
1024 OrdKeyMerger {
1025 key_cursor1: 0,
1026 key_cursor2: 0,
1027 result: OrdKeyStorage {
1028 keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
1029 upds: Upds::merge_capacity(&batch1.upds, &batch2.upds),
1030 },
1031 description,
1032 staging: UpdsBuilder::default(),
1033 }
1034 }
1035 fn done(self) -> OrdKeyBatch<L> {
1036 OrdKeyBatch {
1037 updates: self.staging.total(),
1038 storage: self.result,
1039 description: self.description,
1040 value: OrdKeyBatch::<L>::create_value(),
1041 }
1042 }
1043 fn work(&mut self, source1: &OrdKeyBatch<L>, source2: &OrdKeyBatch<L>, fuel: &mut isize) {
1044 let starting_updates = self.staging.total();
1046 let mut effort = 0isize;
1047
1048 while self.key_cursor1 < source1.storage.keys.len()
1050 && self.key_cursor2 < source2.storage.keys.len()
1051 && effort < *fuel
1052 {
1053 self.merge_key(&source1.storage, &source2.storage);
1054 effort = (self.staging.total() - starting_updates) as isize;
1056 }
1057
1058 while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
1061 self.copy_key(&source1.storage, self.key_cursor1);
1062 self.key_cursor1 += 1;
1063 effort = (self.staging.total() - starting_updates) as isize;
1064 }
1065 while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
1066 self.copy_key(&source2.storage, self.key_cursor2);
1067 self.key_cursor2 += 1;
1068 effort = (self.staging.total() - starting_updates) as isize;
1069 }
1070
1071 *fuel -= effort;
1072 }
1073 }
1074
1075 impl<L: Layout> OrdKeyMerger<L> {
1077 fn copy_key(&mut self, source: &OrdKeyStorage<L>, cursor: usize) {
1085 self.stash_updates_for_key(source, cursor);
1086 if self.staging.seal(&mut self.result.upds) {
1087 self.result.keys.push_ref(source.keys.index(cursor));
1088 }
1089 }
1090 fn merge_key(&mut self, source1: &OrdKeyStorage<L>, source2: &OrdKeyStorage<L>) {
1095 use ::std::cmp::Ordering;
1096 match source1
1097 .keys
1098 .index(self.key_cursor1)
1099 .cmp(&source2.keys.index(self.key_cursor2))
1100 {
1101 Ordering::Less => {
1102 self.copy_key(source1, self.key_cursor1);
1103 self.key_cursor1 += 1;
1104 }
1105 Ordering::Equal => {
1106 self.stash_updates_for_key(source1, self.key_cursor1);
1108 self.stash_updates_for_key(source2, self.key_cursor2);
1109 if self.staging.seal(&mut self.result.upds) {
1110 self.result
1111 .keys
1112 .push_ref(source1.keys.index(self.key_cursor1));
1113 }
1114 self.key_cursor1 += 1;
1116 self.key_cursor2 += 1;
1117 }
1118 Ordering::Greater => {
1119 self.copy_key(source2, self.key_cursor2);
1120 self.key_cursor2 += 1;
1121 }
1122 }
1123 }
1124
1125 fn stash_updates_for_key(&mut self, source: &OrdKeyStorage<L>, index: usize) {
1127 let (lower, upper) = source.upds.bounds(index);
1128 for i in lower..upper {
1129 let (time, diff) = source.upds.get_abs(i);
1131 use crate::lattice::Lattice;
1132 let mut new_time = L::TimeContainer::into_owned(time);
1133 new_time.advance_by(self.description.since().borrow());
1134 self.staging
1135 .push(new_time, L::DiffContainer::into_owned(diff));
1136 }
1137 }
1138 }
1139
1140 pub struct OrdKeyCursor<L: Layout> {
1142 key_cursor: usize,
1144 val_stepped: bool,
1146 phantom: PhantomData<L>,
1148 }
1149
1150 use crate::trace::implementations::WithLayout;
1151 impl<L: Layout<ValContainer: BatchContainer>> WithLayout for OrdKeyCursor<L> {
1152 type Layout = L;
1153 }
1154
1155 impl<L: for<'a> Layout<ValContainer: BatchContainer<Owned: Default>>> Cursor for OrdKeyCursor<L> {
1156 type Storage = OrdKeyBatch<L>;
1157
1158 fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> {
1159 storage.storage.keys.get(self.key_cursor)
1160 }
1161 fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> {
1162 if self.val_valid(storage) {
1163 Some(self.val(storage))
1164 } else {
1165 None
1166 }
1167 }
1168
1169 fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> {
1170 storage.storage.keys.index(self.key_cursor)
1171 }
1172 fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> {
1173 storage.value.index(0)
1174 }
1175 fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(
1176 &mut self,
1177 storage: &Self::Storage,
1178 mut logic: L2,
1179 ) {
1180 let (lower, upper) = storage.storage.upds.bounds(self.key_cursor);
1181 for index in lower..upper {
1182 let (time, diff) = storage.storage.upds.get_abs(index);
1183 logic(time, diff);
1184 }
1185 }
1186 fn key_valid(&self, storage: &Self::Storage) -> bool {
1187 self.key_cursor < storage.storage.keys.len()
1188 }
1189 fn val_valid(&self, _storage: &Self::Storage) -> bool {
1190 !self.val_stepped
1191 }
1192 fn step_key(&mut self, storage: &Self::Storage) {
1193 self.key_cursor += 1;
1194 if self.key_valid(storage) {
1195 self.rewind_vals(storage);
1196 } else {
1197 self.key_cursor = storage.storage.keys.len();
1198 }
1199 }
1200 fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
1201 self.key_cursor +=
1202 storage
1203 .storage
1204 .keys
1205 .advance(self.key_cursor, storage.storage.keys.len(), |x| {
1206 <L::KeyContainer as BatchContainer>::reborrow(x)
1207 .lt(&<L::KeyContainer as BatchContainer>::reborrow(key))
1208 });
1209 if self.key_valid(storage) {
1210 self.rewind_vals(storage);
1211 }
1212 }
1213 fn step_val(&mut self, _storage: &Self::Storage) {
1214 self.val_stepped = true;
1215 }
1216 fn seek_val(&mut self, _storage: &Self::Storage, _val: Self::Val<'_>) {}
1217 fn rewind_keys(&mut self, storage: &Self::Storage) {
1218 self.key_cursor = 0;
1219 if self.key_valid(storage) {
1220 self.rewind_vals(storage)
1221 }
1222 }
1223 fn rewind_vals(&mut self, _storage: &Self::Storage) {
1224 self.val_stepped = false;
1225 }
1226 }
1227
1228 pub struct OrdKeyBuilder<L: Layout, CI> {
1230 pub result: OrdKeyStorage<L>,
1234 staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
1235 _marker: PhantomData<CI>,
1236 }
1237
1238 impl<L: Layout, CI> Builder for OrdKeyBuilder<L, CI>
1239 where
1240 L: for<'a> Layout<KeyContainer: PushInto<CI::Key<'a>>>,
1241 L: Layout<ValContainer: BatchContainer<Owned: Default>>,
1242 CI: BuilderInput<
1243 L::KeyContainer,
1244 L::ValContainer,
1245 Time = layout::Time<L>,
1246 Diff = layout::Diff<L>,
1247 >,
1248 {
1249 type Input = CI;
1250 type Time = layout::Time<L>;
1251 type Output = OrdKeyBatch<L>;
1252
1253 fn with_capacity(keys: usize, _vals: usize, upds: usize) -> Self {
1254 Self {
1255 result: OrdKeyStorage {
1256 keys: L::KeyContainer::with_capacity(keys),
1257 upds: Upds::with_capacity(keys + 1, upds),
1258 },
1259 staging: UpdsBuilder::default(),
1260 _marker: PhantomData,
1261 }
1262 }
1263
1264 #[inline]
1265 fn push(&mut self, chunk: &mut Self::Input) {
1266 for item in chunk.drain() {
1267 let (key, _val, time, diff) = CI::into_parts(item);
1268 if self.result.keys.is_empty() {
1269 self.result.keys.push_into(key);
1270 self.staging.push(time, diff);
1271 }
1272 else if self
1274 .result
1275 .keys
1276 .last()
1277 .map(|k| CI::key_eq(&key, k))
1278 .unwrap_or(false)
1279 {
1280 self.staging.push(time, diff);
1281 } else {
1282 self.staging.seal(&mut self.result.upds);
1283 self.staging.push(time, diff);
1284 self.result.keys.push_into(key);
1285 }
1286 }
1287 }
1288
1289 #[inline(never)]
1290 fn done(mut self, description: Description<Self::Time>) -> OrdKeyBatch<L> {
1291 self.staging.seal(&mut self.result.upds);
1292 OrdKeyBatch {
1293 updates: self.staging.total(),
1294 storage: self.result,
1295 description,
1296 value: OrdKeyBatch::<L>::create_value(),
1297 }
1298 }
1299
1300 fn seal(
1301 chain: &mut Vec<Self::Input>,
1302 description: Description<Self::Time>,
1303 ) -> Self::Output {
1304 let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
1305 let mut builder = Self::with_capacity(keys, vals, upds);
1306 for mut chunk in chain.drain(..) {
1307 builder.push(&mut chunk);
1308 }
1309
1310 builder.done(description)
1311 }
1312 }
1313}