1use std::rc::Rc;
12
13use crate::trace::implementations::chunker::ContainerChunker;
14use crate::trace::implementations::spine_fueled::Spine;
15use crate::trace::implementations::merge_batcher::MergeBatcher;
16use crate::trace::implementations::merge_batcher::container::VecInternalMerger;
17use crate::trace::rc_blanket_impls::RcBuilder;
18
19use super::{Layout, Vector};
20
21pub use self::val_batch::{OrdValBatch, OrdValBuilder};
22pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder};
23
/// A trace: a `Spine` of reference-counted `OrdValBatch`es using the `Vector` layout
/// over `((K, V), T, R)` updates.
pub type OrdValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Vector<((K,V),T,R)>>>>;
/// A merge batcher over `Vec`s of `((K, V), T, R)` updates.
pub type OrdValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, ContainerChunker<Vec<((K,V),T,R)>>, VecInternalMerger<(K, V), T, R>>;
/// An `RcBuilder` wrapping an `OrdValBuilder` that consumes `Vec`s of `((K, V), T, R)` updates.
pub type RcOrdValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;
30
/// A trace: a `Spine` of reference-counted `OrdKeyBatch`es using the `Vector` layout
/// over `((K, ()), T, R)` updates, i.e. keys with a unit value.
pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
/// A merge batcher over `Vec`s of `((K, ()), T, R)` updates.
pub type OrdKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, ContainerChunker<Vec<((K,()),T,R)>>, VecInternalMerger<(K, ()), T, R>>;
/// An `RcBuilder` wrapping an `OrdKeyBuilder` that consumes `Vec`s of `((K, ()), T, R)` updates.
pub type RcOrdKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<Vector<((K,()),T,R)>, Vec<((K,()),T,R)>>>;
40
41pub use layers::{Vals, Upds};
45pub mod layers {
54
55 use serde::{Deserialize, Serialize};
56 use crate::trace::implementations::BatchContainer;
57
/// A sequence of lists of values, stored flat and delimited by offsets.
#[derive(Debug, Serialize, Deserialize)]
pub struct Vals<O, V> {
    /// Offsets into `vals`: list `i` occupies `offs[i] .. offs[i+1]`.
    ///
    /// The constructors in this module always seed this container with a leading `0`.
    pub offs: O,
    /// The values of all lists, concatenated.
    pub vals: V,
}
68
69 impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, V: BatchContainer> Default for Vals<O, V> {
70 fn default() -> Self { Self::with_capacity(0, 0) }
71 }
72
73 impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, V: BatchContainer> Vals<O, V> {
74 #[inline(always)] pub fn bounds(&self, index: usize) -> (usize, usize) {
76 (self.offs.index(index), self.offs.index(index+1))
77 }
78 pub fn get_rel(&self, list_idx: usize, item_idx: usize) -> V::ReadItem<'_> {
85 self.get_abs(self.bounds(list_idx).0 + item_idx)
86 }
87
88 pub fn len(&self) -> usize { self.offs.len() - 1 }
90 pub fn get_abs(&self, index: usize) -> V::ReadItem<'_> {
92 self.vals.index(index)
93 }
94 pub fn with_capacity(o_size: usize, v_size: usize) -> Self {
96 let mut offs = <O as BatchContainer>::with_capacity(o_size);
97 offs.push_ref(0);
98 Self {
99 offs,
100 vals: <V as BatchContainer>::with_capacity(v_size),
101 }
102 }
103 pub fn merge_capacity(this: &Self, that: &Self) -> Self {
105 let mut offs = <O as BatchContainer>::with_capacity(this.offs.len() + that.offs.len());
106 offs.push_ref(0);
107 Self {
108 offs,
109 vals: <V as BatchContainer>::merge_capacity(&this.vals, &that.vals),
110 }
111 }
112 }
113
/// A sequence of lists of `(time, diff)` updates, stored flat and delimited by offsets.
///
/// A list whose two offsets are equal is not empty: `bounds` interprets it as a
/// single update identical to the one stored immediately before it.
#[derive(Debug, Serialize, Deserialize)]
pub struct Upds<O, T, D> {
    /// Offsets into `times` and `diffs`: list `i` occupies `offs[i] .. offs[i+1]`.
    pub offs: O,
    /// The times of all updates, concatenated.
    pub times: T,
    /// The diffs of all updates, concatenated; read at the same index as `times`.
    pub diffs: D,
}
127
128 impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, T: BatchContainer, D: BatchContainer> Default for Upds<O, T, D> {
129 fn default() -> Self { Self::with_capacity(0, 0) }
130 }
131 impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, T: BatchContainer, D: BatchContainer> Upds<O, T, D> {
132 pub fn bounds(&self, index: usize) -> (usize, usize) {
134 let mut lower = self.offs.index(index);
135 let upper = self.offs.index(index+1);
136 if lower == upper {
139 assert!(lower > 0);
140 lower -= 1;
141 }
142 (lower, upper)
143 }
144 pub fn get_rel(&self, list_idx: usize, item_idx: usize) -> (T::ReadItem<'_>, D::ReadItem<'_>) {
151 self.get_abs(self.bounds(list_idx).0 + item_idx)
152 }
153
154 pub fn len(&self) -> usize { self.offs.len() - 1 }
156 pub fn get_abs(&self, index: usize) -> (T::ReadItem<'_>, D::ReadItem<'_>) {
158 (self.times.index(index), self.diffs.index(index))
159 }
160 pub fn with_capacity(o_size: usize, u_size: usize) -> Self {
162 let mut offs = <O as BatchContainer>::with_capacity(o_size);
163 offs.push_ref(0);
164 Self {
165 offs,
166 times: <T as BatchContainer>::with_capacity(u_size),
167 diffs: <D as BatchContainer>::with_capacity(u_size),
168 }
169 }
170 pub fn merge_capacity(this: &Self, that: &Self) -> Self {
172 let mut offs = <O as BatchContainer>::with_capacity(this.offs.len() + that.offs.len());
173 offs.push_ref(0);
174 Self {
175 offs,
176 times: <T as BatchContainer>::merge_capacity(&this.times, &that.times),
177 diffs: <D as BatchContainer>::merge_capacity(&this.diffs, &that.diffs),
178 }
179 }
180 }
181
/// Helper type for constructing `Upds` containers one list at a time.
///
/// Updates are accumulated with `push` and committed as one list with `seal`.
pub struct UpdsBuilder<T: BatchContainer, D: BatchContainer> {
    /// Owned updates pushed since the last `seal`; consolidated and drained by `seal`.
    stash: Vec<(T::Owned, D::Owned)>,
    /// Running count of updates sealed so far.
    ///
    /// A singleton recorded as a repeat of the previous update counts as one,
    /// even though nothing is physically stored for it.
    total: usize,

    /// Scratch container holding a single time, used by `seal` to compare the
    /// stashed time against the last stored time.
    time_con: T,
    /// Scratch container holding a single diff, used by `seal` to compare the
    /// stashed diff against the last stored diff.
    diff_con: D,
}
199
200 impl<T: BatchContainer, D: BatchContainer> Default for UpdsBuilder<T, D> {
201 fn default() -> Self { Self { stash: Vec::default(), total: 0, time_con: BatchContainer::with_capacity(1), diff_con: BatchContainer::with_capacity(1) } }
202 }
203
204
205 impl<T, D> UpdsBuilder<T, D>
206 where
207 T: BatchContainer<Owned: Ord>,
208 D: BatchContainer<Owned: crate::difference::Semigroup>,
209 {
210 pub fn push(&mut self, time: T::Owned, diff: D::Owned) {
212 self.stash.push((time, diff));
213 }
214
215 pub fn seal<O: for<'a> BatchContainer<ReadItem<'a> = usize>>(&mut self, upds: &mut Upds<O, T, D>) -> bool {
219 use crate::consolidation;
220 consolidation::consolidate(&mut self.stash);
221 if self.stash.is_empty() { return false; }
223 if self.stash.len() == 1 {
225 let (time, diff) = self.stash.last().unwrap();
226 self.time_con.clear(); self.time_con.push_own(time);
227 self.diff_con.clear(); self.diff_con.push_own(diff);
228 if upds.times.last() == self.time_con.get(0) && upds.diffs.last() == self.diff_con.get(0) {
229 self.total += 1;
230 self.stash.clear();
231 upds.offs.push_ref(upds.times.len());
232 return true;
233 }
234 }
235 self.total += self.stash.len();
237 for (time, diff) in self.stash.drain(..) {
238 upds.times.push_own(&time);
239 upds.diffs.push_own(&diff);
240 }
241 upds.offs.push_ref(upds.times.len());
242 true
243 }
244
245 pub fn total(&self) -> usize { self.total }
247 }
248}
249
250pub mod val_batch {
252
253 use std::marker::PhantomData;
254 use serde::{Deserialize, Serialize};
255 use timely::container::PushInto;
256 use timely::progress::{Antichain, frontier::AntichainRef};
257
258 use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
259 use crate::trace::implementations::{BatchContainer, BuilderInput};
260 use crate::trace::implementations::layout;
261
262 use super::{Layout, Vals, Upds, layers::UpdsBuilder};
263
/// The columnar storage of a key-value batch: keys, per-key lists of values,
/// and per-value lists of `(time, diff)` updates.
#[derive(Debug, Serialize, Deserialize)]
#[serde(bound = "
    L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
    L::ValContainer: Serialize + for<'a> Deserialize<'a>,
    L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
    L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
    L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
")]
pub struct OrdValStorage<L: Layout> {
    /// The keys; the key at position `i` owns the `i`-th list of `vals`.
    pub keys: L::KeyContainer,
    /// Lists of values, one list per key.
    pub vals: Vals<L::OffsetContainer, L::ValContainer>,
    /// Lists of `(time, diff)` updates, one list per value in `vals`.
    pub upds: Upds<L::OffsetContainer, L::TimeContainer, L::DiffContainer>,
}
281
/// An immutable batch of key-value updates, together with its description.
#[derive(Serialize, Deserialize)]
#[serde(bound = "
    L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
    L::ValContainer: Serialize + for<'a> Deserialize<'a>,
    L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
    L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
    L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
")]
pub struct OrdValBatch<L: Layout> {
    /// The keys, values, and updates of the batch.
    pub storage: OrdValStorage<L>,
    /// The description (lower, upper, since) of the batch.
    pub description: Description<layout::Time<L>>,
    /// The number of updates, as reported by `BatchReader::len`.
    ///
    /// Builders and mergers set this from `UpdsBuilder::total`, which counts
    /// repeated singleton updates even though they are not physically stored,
    /// so it may exceed the length of `storage.upds.times`.
    pub updates: usize,
}
306
307 impl<L: Layout> WithLayout for OrdValBatch<L> {
308 type Layout = L;
309 }
310
311 impl<L: Layout> BatchReader for OrdValBatch<L> {
312
313 type Cursor = OrdValCursor<L>;
314 fn cursor(&self) -> Self::Cursor {
315 OrdValCursor {
316 key_cursor: 0,
317 val_cursor: 0,
318 phantom: PhantomData,
319 }
320 }
321 fn len(&self) -> usize {
322 self.updates
325 }
326 fn description(&self) -> &Description<layout::Time<L>> { &self.description }
327 }
328
329 impl<L: Layout> Batch for OrdValBatch<L> {
330 type Merger = OrdValMerger<L>;
331
332 fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self::Merger {
333 OrdValMerger::new(self, other, compaction_frontier)
334 }
335
336 fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
337 use timely::progress::Timestamp;
338 Self {
339 storage: OrdValStorage {
340 keys: L::KeyContainer::with_capacity(0),
341 vals: Default::default(),
342 upds: Default::default(),
343 },
344 description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
345 updates: 0,
346 }
347 }
348 }
349
/// State for an in-progress, resumable merge of two `OrdValBatch`es.
pub struct OrdValMerger<L: Layout> {
    /// Index of the next key to consume from the first source batch.
    key_cursor1: usize,
    /// Index of the next key to consume from the second source batch.
    key_cursor2: usize,
    /// The merged storage produced so far.
    result: OrdValStorage<L>,
    /// The description of the batch being produced.
    description: Description<layout::Time<L>>,
    /// Staging area that consolidates the updates of one value before they are
    /// committed to `result.upds`; also tracks the total number of updates written.
    staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
}
363
364 impl<L: Layout> Merger<OrdValBatch<L>> for OrdValMerger<L>
365 where
366 OrdValBatch<L>: Batch<Time=layout::Time<L>>,
367 {
368 fn new(batch1: &OrdValBatch<L>, batch2: &OrdValBatch<L>, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self {
369
370 assert!(batch1.upper() == batch2.lower());
371 use crate::lattice::Lattice;
372 let mut since = batch1.description().since().join(batch2.description().since());
373 since = since.join(&compaction_frontier.to_owned());
374
375 let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);
376
377 let batch1 = &batch1.storage;
378 let batch2 = &batch2.storage;
379
380 OrdValMerger {
381 key_cursor1: 0,
382 key_cursor2: 0,
383 result: OrdValStorage {
384 keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
385 vals: Vals::merge_capacity(&batch1.vals, &batch2.vals),
386 upds: Upds::merge_capacity(&batch1.upds, &batch2.upds),
387 },
388 description,
389 staging: UpdsBuilder::default(),
390 }
391 }
392 fn done(self) -> OrdValBatch<L> {
393 OrdValBatch {
394 updates: self.staging.total(),
395 storage: self.result,
396 description: self.description,
397 }
398 }
399 fn work(&mut self, source1: &OrdValBatch<L>, source2: &OrdValBatch<L>, fuel: &mut isize) {
400
401 let starting_updates = self.staging.total();
403 let mut effort = 0isize;
404
405 while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
407 self.merge_key(&source1.storage, &source2.storage);
408 effort = (self.staging.total() - starting_updates) as isize;
410 }
411
412 while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
415 self.copy_key(&source1.storage, self.key_cursor1);
416 self.key_cursor1 += 1;
417 effort = (self.staging.total() - starting_updates) as isize;
418 }
419 while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
420 self.copy_key(&source2.storage, self.key_cursor2);
421 self.key_cursor2 += 1;
422 effort = (self.staging.total() - starting_updates) as isize;
423 }
424
425 *fuel -= effort;
426 }
427 }
428
429 impl<L: Layout> OrdValMerger<L> {
431 fn copy_key(&mut self, source: &OrdValStorage<L>, cursor: usize) {
439 let init_vals = self.result.vals.vals.len();
441 let (mut lower, upper) = source.vals.bounds(cursor);
442 while lower < upper {
443 self.stash_updates_for_val(source, lower);
444 if self.staging.seal(&mut self.result.upds) {
445 self.result.vals.vals.push_ref(source.vals.get_abs(lower));
446 }
447 lower += 1;
448 }
449
450 if self.result.vals.vals.len() > init_vals {
452 self.result.keys.push_ref(source.keys.index(cursor));
453 self.result.vals.offs.push_ref(self.result.vals.vals.len());
454 }
455 }
456 fn merge_key(&mut self, source1: &OrdValStorage<L>, source2: &OrdValStorage<L>) {
461 use ::std::cmp::Ordering;
462 match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
463 Ordering::Less => {
464 self.copy_key(source1, self.key_cursor1);
465 self.key_cursor1 += 1;
466 },
467 Ordering::Equal => {
468 let (lower1, upper1) = source1.vals.bounds(self.key_cursor1);
470 let (lower2, upper2) = source2.vals.bounds(self.key_cursor2);
471 if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) {
472 self.result.keys.push_ref(source1.keys.index(self.key_cursor1));
473 self.result.vals.offs.push_ref(off);
474 }
475 self.key_cursor1 += 1;
477 self.key_cursor2 += 1;
478 },
479 Ordering::Greater => {
480 self.copy_key(source2, self.key_cursor2);
481 self.key_cursor2 += 1;
482 },
483 }
484 }
485 fn merge_vals(
490 &mut self,
491 (source1, mut lower1, upper1): (&OrdValStorage<L>, usize, usize),
492 (source2, mut lower2, upper2): (&OrdValStorage<L>, usize, usize),
493 ) -> Option<usize> {
494 let init_vals = self.result.vals.vals.len();
496 while lower1 < upper1 && lower2 < upper2 {
497 use ::std::cmp::Ordering;
501 match source1.vals.get_abs(lower1).cmp(&source2.vals.get_abs(lower2)) {
502 Ordering::Less => {
503 self.stash_updates_for_val(source1, lower1);
505 if self.staging.seal(&mut self.result.upds) {
506 self.result.vals.vals.push_ref(source1.vals.get_abs(lower1));
507 }
508 lower1 += 1;
509 },
510 Ordering::Equal => {
511 self.stash_updates_for_val(source1, lower1);
512 self.stash_updates_for_val(source2, lower2);
513 if self.staging.seal(&mut self.result.upds) {
514 self.result.vals.vals.push_ref(source1.vals.get_abs(lower1));
515 }
516 lower1 += 1;
517 lower2 += 1;
518 },
519 Ordering::Greater => {
520 self.stash_updates_for_val(source2, lower2);
522 if self.staging.seal(&mut self.result.upds) {
523 self.result.vals.vals.push_ref(source2.vals.get_abs(lower2));
524 }
525 lower2 += 1;
526 },
527 }
528 }
529 while lower1 < upper1 {
531 self.stash_updates_for_val(source1, lower1);
532 if self.staging.seal(&mut self.result.upds) {
533 self.result.vals.vals.push_ref(source1.vals.get_abs(lower1));
534 }
535 lower1 += 1;
536 }
537 while lower2 < upper2 {
538 self.stash_updates_for_val(source2, lower2);
539 if self.staging.seal(&mut self.result.upds) {
540 self.result.vals.vals.push_ref(source2.vals.get_abs(lower2));
541 }
542 lower2 += 1;
543 }
544
545 if self.result.vals.vals.len() > init_vals {
547 Some(self.result.vals.vals.len())
548 } else {
549 None
550 }
551 }
552
553 fn stash_updates_for_val(&mut self, source: &OrdValStorage<L>, index: usize) {
555 let (lower, upper) = source.upds.bounds(index);
556 for i in lower .. upper {
557 let (time, diff) = source.upds.get_abs(i);
559 use crate::lattice::Lattice;
560 let mut new_time: layout::Time<L> = L::TimeContainer::into_owned(time);
561 new_time.advance_by(self.description.since().borrow());
562 self.staging.push(new_time, L::DiffContainer::into_owned(diff));
563 }
564 }
565 }
566
/// A cursor for navigating a single `OrdValBatch`.
///
/// The cursor stores only positions; the batch is supplied to each method call.
pub struct OrdValCursor<L: Layout> {
    /// Index of the current key in the batch's key container.
    key_cursor: usize,
    /// Absolute index of the current value in the batch's value container.
    val_cursor: usize,
    /// Ties the cursor to its layout without storing any layout data.
    phantom: PhantomData<L>,
}
576
577 use crate::trace::implementations::WithLayout;
578 impl<L: Layout> WithLayout for OrdValCursor<L> {
579 type Layout = L;
580 }
581
582 impl<L: Layout> Cursor for OrdValCursor<L> {
583
584 type Storage = OrdValBatch<L>;
585
586 fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { storage.storage.keys.get(self.key_cursor) }
587 fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { if self.val_valid(storage) { Some(self.val(storage)) } else { None } }
588
589 fn key<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
590 fn val<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Val<'a> { storage.storage.vals.get_abs(self.val_cursor) }
591 fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &OrdValBatch<L>, mut logic: L2) {
592 let (lower, upper) = storage.storage.upds.bounds(self.val_cursor);
593 for index in lower .. upper {
594 let (time, diff) = storage.storage.upds.get_abs(index);
595 logic(time, diff);
596 }
597 }
598 fn key_valid(&self, storage: &OrdValBatch<L>) -> bool { self.key_cursor < storage.storage.keys.len() }
599 fn val_valid(&self, storage: &OrdValBatch<L>) -> bool { self.val_cursor < storage.storage.vals.bounds(self.key_cursor).1 }
600 fn step_key(&mut self, storage: &OrdValBatch<L>){
601 self.key_cursor += 1;
602 if self.key_valid(storage) {
603 self.rewind_vals(storage);
604 }
605 else {
606 self.key_cursor = storage.storage.keys.len();
607 }
608 }
609 fn seek_key(&mut self, storage: &OrdValBatch<L>, key: Self::Key<'_>) {
610 self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| <L::KeyContainer as BatchContainer>::reborrow(x).lt(&<L::KeyContainer as BatchContainer>::reborrow(key)));
611 if self.key_valid(storage) {
612 self.rewind_vals(storage);
613 }
614 }
615 fn step_val(&mut self, storage: &OrdValBatch<L>) {
616 self.val_cursor += 1;
617 if !self.val_valid(storage) {
618 self.val_cursor = storage.storage.vals.bounds(self.key_cursor).1;
619 }
620 }
621 fn seek_val(&mut self, storage: &OrdValBatch<L>, val: Self::Val<'_>) {
622 self.val_cursor += storage.storage.vals.vals.advance(self.val_cursor, storage.storage.vals.bounds(self.key_cursor).1, |x| <L::ValContainer as BatchContainer>::reborrow(x).lt(&<L::ValContainer as BatchContainer>::reborrow(val)));
623 }
624 fn rewind_keys(&mut self, storage: &OrdValBatch<L>) {
625 self.key_cursor = 0;
626 if self.key_valid(storage) {
627 self.rewind_vals(storage)
628 }
629 }
630 fn rewind_vals(&mut self, storage: &OrdValBatch<L>) {
631 self.val_cursor = storage.storage.vals.bounds(self.key_cursor).0;
632 }
633 }
634
/// A builder for creating an `OrdValBatch` from input chunks of type `CI`.
pub struct OrdValBuilder<L: Layout, CI> {
    /// The in-progress result.
    ///
    /// This is public to allow others to populate it directly.
    pub result: OrdValStorage<L>,
    /// Staging area for the updates of the value currently being built; sealed
    /// into `result.upds` when the value or key changes, and in `done`.
    staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
    /// Records the input container type without storing one.
    _marker: PhantomData<CI>,
}
644
645 impl<L, CI> Builder for OrdValBuilder<L, CI>
646 where
647 L: for<'a> Layout<
648 KeyContainer: PushInto<CI::Key<'a>>,
649 ValContainer: PushInto<CI::Val<'a>>,
650 >,
651 CI: for<'a> BuilderInput<L::KeyContainer, L::ValContainer, Time=layout::Time<L>, Diff=layout::Diff<L>>,
652 {
653
654 type Input = CI;
655 type Time = layout::Time<L>;
656 type Output = OrdValBatch<L>;
657
658 fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
659 Self {
660 result: OrdValStorage {
661 keys: L::KeyContainer::with_capacity(keys),
662 vals: Vals::with_capacity(keys + 1, vals),
663 upds: Upds::with_capacity(vals + 1, upds),
664 },
665 staging: UpdsBuilder::default(),
666 _marker: PhantomData,
667 }
668 }
669
670 #[inline]
671 fn push(&mut self, chunk: &mut Self::Input) {
672 for item in chunk.drain() {
673 let (key, val, time, diff) = CI::into_parts(item);
674
675 if self.result.keys.is_empty() {
677 self.result.vals.vals.push_into(val);
678 self.result.keys.push_into(key);
679 self.staging.push(time, diff);
680 }
681 else if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
683 if self.result.vals.vals.last().map(|v| CI::val_eq(&val, v)).unwrap_or(false) {
685 self.staging.push(time, diff);
686 } else {
687 self.staging.seal(&mut self.result.upds);
689 self.staging.push(time, diff);
690 self.result.vals.vals.push_into(val);
691 }
692 } else {
693 self.staging.seal(&mut self.result.upds);
695 self.staging.push(time, diff);
696 self.result.vals.offs.push_ref(self.result.vals.vals.len());
697 self.result.vals.vals.push_into(val);
698 self.result.keys.push_into(key);
699 }
700 }
701 }
702
703 #[inline(never)]
704 fn done(mut self, description: Description<Self::Time>) -> OrdValBatch<L> {
705 self.staging.seal(&mut self.result.upds);
706 self.result.vals.offs.push_ref(self.result.vals.vals.len());
707 OrdValBatch {
708 updates: self.staging.total(),
709 storage: self.result,
710 description,
711 }
712 }
713
714 fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
715 let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
716 let mut builder = Self::with_capacity(keys, vals, upds);
717 for mut chunk in chain.drain(..) {
718 builder.push(&mut chunk);
719 }
720
721 builder.done(description)
722 }
723 }
724}
725
726pub mod key_batch {
728
729 use std::marker::PhantomData;
730 use serde::{Deserialize, Serialize};
731 use timely::container::PushInto;
732 use timely::progress::{Antichain, frontier::AntichainRef};
733
734 use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
735 use crate::trace::implementations::{BatchContainer, BuilderInput};
736 use crate::trace::implementations::layout;
737
738 use super::{Layout, Upds, layers::UpdsBuilder};
739
/// The columnar storage of a key-only batch: keys and per-key lists of
/// `(time, diff)` updates.
#[derive(Debug, Serialize, Deserialize)]
#[serde(bound = "
    L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
    L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
    L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
    L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
")]
pub struct OrdKeyStorage<L: Layout> {
    /// The keys; the key at position `i` owns the `i`-th list of `upds`.
    pub keys: L::KeyContainer,
    /// Lists of `(time, diff)` updates, one list per key.
    pub upds: Upds<L::OffsetContainer, L::TimeContainer, L::DiffContainer>,
}
754
/// An immutable batch of key-only updates, together with its description.
#[derive(Serialize, Deserialize)]
#[serde(bound = "
    L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
    L::ValContainer: Serialize + for<'a> Deserialize<'a>,
    L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
    L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
    L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
")]
pub struct OrdKeyBatch<L: Layout> {
    /// The keys and updates of the batch.
    pub storage: OrdKeyStorage<L>,
    /// The description (lower, upper, since) of the batch.
    pub description: Description<layout::Time<L>>,
    /// The number of updates, as reported by `BatchReader::len`.
    ///
    /// Builders and mergers set this from `UpdsBuilder::total`, which counts
    /// repeated singleton updates even though they are not physically stored,
    /// so it may exceed the length of `storage.upds.times`.
    pub updates: usize,

    /// A container holding a single default value (see `create_value`), which
    /// the cursor hands out as the value for every key.
    pub value: L::ValContainer,
}
782
783 impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> OrdKeyBatch<L> {
784 pub fn create_value() -> L::ValContainer {
786 let mut value = L::ValContainer::with_capacity(1);
787 value.push_own(&Default::default());
788 value
789 }
790 }
791
792 impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> WithLayout for OrdKeyBatch<L> {
793 type Layout = L;
794 }
795
796 impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> BatchReader for OrdKeyBatch<L> {
797
798 type Cursor = OrdKeyCursor<L>;
799 fn cursor(&self) -> Self::Cursor {
800 OrdKeyCursor {
801 key_cursor: 0,
802 val_stepped: false,
803 phantom: std::marker::PhantomData,
804 }
805 }
806 fn len(&self) -> usize {
807 self.updates
810 }
811 fn description(&self) -> &Description<layout::Time<L>> { &self.description }
812 }
813
814 impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> Batch for OrdKeyBatch<L> {
815 type Merger = OrdKeyMerger<L>;
816
817 fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self::Merger {
818 OrdKeyMerger::new(self, other, compaction_frontier)
819 }
820
821 fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
822 use timely::progress::Timestamp;
823 Self {
824 storage: OrdKeyStorage {
825 keys: L::KeyContainer::with_capacity(0),
826 upds: Upds::default(),
827 },
828 description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
829 updates: 0,
830 value: Self::create_value(),
831 }
832 }
833 }
834
/// State for an in-progress, resumable merge of two `OrdKeyBatch`es.
pub struct OrdKeyMerger<L: Layout> {
    /// Index of the next key to consume from the first source batch.
    key_cursor1: usize,
    /// Index of the next key to consume from the second source batch.
    key_cursor2: usize,
    /// The merged storage produced so far.
    result: OrdKeyStorage<L>,
    /// The description of the batch being produced.
    description: Description<layout::Time<L>>,

    /// Staging area that consolidates the updates of one key before they are
    /// committed to `result.upds`; also tracks the total number of updates written.
    staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
}
849
850 impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> Merger<OrdKeyBatch<L>> for OrdKeyMerger<L>
851 where
852 OrdKeyBatch<L>: Batch<Time=layout::Time<L>>,
853 {
854 fn new(batch1: &OrdKeyBatch<L>, batch2: &OrdKeyBatch<L>, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self {
855
856 assert!(batch1.upper() == batch2.lower());
857 use crate::lattice::Lattice;
858 let mut since = batch1.description().since().join(batch2.description().since());
859 since = since.join(&compaction_frontier.to_owned());
860
861 let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);
862
863 let batch1 = &batch1.storage;
864 let batch2 = &batch2.storage;
865
866 OrdKeyMerger {
867 key_cursor1: 0,
868 key_cursor2: 0,
869 result: OrdKeyStorage {
870 keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
871 upds: Upds::merge_capacity(&batch1.upds, &batch2.upds),
872 },
873 description,
874 staging: UpdsBuilder::default(),
875 }
876 }
877 fn done(self) -> OrdKeyBatch<L> {
878 OrdKeyBatch {
879 updates: self.staging.total(),
880 storage: self.result,
881 description: self.description,
882 value: OrdKeyBatch::<L>::create_value(),
883 }
884 }
885 fn work(&mut self, source1: &OrdKeyBatch<L>, source2: &OrdKeyBatch<L>, fuel: &mut isize) {
886
887 let starting_updates = self.staging.total();
889 let mut effort = 0isize;
890
891 while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
893 self.merge_key(&source1.storage, &source2.storage);
894 effort = (self.staging.total() - starting_updates) as isize;
896 }
897
898 while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
901 self.copy_key(&source1.storage, self.key_cursor1);
902 self.key_cursor1 += 1;
903 effort = (self.staging.total() - starting_updates) as isize;
904 }
905 while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
906 self.copy_key(&source2.storage, self.key_cursor2);
907 self.key_cursor2 += 1;
908 effort = (self.staging.total() - starting_updates) as isize;
909 }
910
911 *fuel -= effort;
912 }
913 }
914
915 impl<L: Layout> OrdKeyMerger<L> {
917 fn copy_key(&mut self, source: &OrdKeyStorage<L>, cursor: usize) {
925 self.stash_updates_for_key(source, cursor);
926 if self.staging.seal(&mut self.result.upds) {
927 self.result.keys.push_ref(source.keys.index(cursor));
928 }
929 }
930 fn merge_key(&mut self, source1: &OrdKeyStorage<L>, source2: &OrdKeyStorage<L>) {
935 use ::std::cmp::Ordering;
936 match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
937 Ordering::Less => {
938 self.copy_key(source1, self.key_cursor1);
939 self.key_cursor1 += 1;
940 },
941 Ordering::Equal => {
942 self.stash_updates_for_key(source1, self.key_cursor1);
944 self.stash_updates_for_key(source2, self.key_cursor2);
945 if self.staging.seal(&mut self.result.upds) {
946 self.result.keys.push_ref(source1.keys.index(self.key_cursor1));
947 }
948 self.key_cursor1 += 1;
950 self.key_cursor2 += 1;
951 },
952 Ordering::Greater => {
953 self.copy_key(source2, self.key_cursor2);
954 self.key_cursor2 += 1;
955 },
956 }
957 }
958
959 fn stash_updates_for_key(&mut self, source: &OrdKeyStorage<L>, index: usize) {
961 let (lower, upper) = source.upds.bounds(index);
962 for i in lower .. upper {
963 let (time, diff) = source.upds.get_abs(i);
965 use crate::lattice::Lattice;
966 let mut new_time = L::TimeContainer::into_owned(time);
967 new_time.advance_by(self.description.since().borrow());
968 self.staging.push(new_time, L::DiffContainer::into_owned(diff));
969 }
970 }
971 }
972
/// A cursor for navigating a single `OrdKeyBatch`.
///
/// The cursor stores only positions; the batch is supplied to each method call.
pub struct OrdKeyCursor<L: Layout> {
    /// Index of the current key in the batch's key container.
    key_cursor: usize,
    /// Whether the single value of the current key has been stepped past.
    val_stepped: bool,
    /// Ties the cursor to its layout without storing any layout data.
    phantom: PhantomData<L>,
}
982
983 use crate::trace::implementations::WithLayout;
984 impl<L: Layout<ValContainer: BatchContainer>> WithLayout for OrdKeyCursor<L> {
985 type Layout = L;
986 }
987
988 impl<L: for<'a> Layout<ValContainer: BatchContainer<Owned: Default>>> Cursor for OrdKeyCursor<L> {
989
990 type Storage = OrdKeyBatch<L>;
991
992 fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { storage.storage.keys.get(self.key_cursor) }
993 fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { if self.val_valid(storage) { Some(self.val(storage)) } else { None } }
994
995 fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
996 fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { storage.value.index(0) }
997 fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L2) {
998 let (lower, upper) = storage.storage.upds.bounds(self.key_cursor);
999 for index in lower .. upper {
1000 let (time, diff) = storage.storage.upds.get_abs(index);
1001 logic(time, diff);
1002 }
1003 }
1004 fn key_valid(&self, storage: &Self::Storage) -> bool { self.key_cursor < storage.storage.keys.len() }
1005 fn val_valid(&self, _storage: &Self::Storage) -> bool { !self.val_stepped }
1006 fn step_key(&mut self, storage: &Self::Storage){
1007 self.key_cursor += 1;
1008 if self.key_valid(storage) {
1009 self.rewind_vals(storage);
1010 }
1011 else {
1012 self.key_cursor = storage.storage.keys.len();
1013 }
1014 }
1015 fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
1016 self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| <L::KeyContainer as BatchContainer>::reborrow(x).lt(&<L::KeyContainer as BatchContainer>::reborrow(key)));
1017 if self.key_valid(storage) {
1018 self.rewind_vals(storage);
1019 }
1020 }
1021 fn step_val(&mut self, _storage: &Self::Storage) {
1022 self.val_stepped = true;
1023 }
1024 fn seek_val(&mut self, _storage: &Self::Storage, _val: Self::Val<'_>) { }
1025 fn rewind_keys(&mut self, storage: &Self::Storage) {
1026 self.key_cursor = 0;
1027 if self.key_valid(storage) {
1028 self.rewind_vals(storage)
1029 }
1030 }
1031 fn rewind_vals(&mut self, _storage: &Self::Storage) {
1032 self.val_stepped = false;
1033 }
1034 }
1035
/// A builder for creating an `OrdKeyBatch` from input chunks of type `CI`.
pub struct OrdKeyBuilder<L: Layout, CI> {
    /// The in-progress result.
    ///
    /// This is public to allow others to populate it directly.
    pub result: OrdKeyStorage<L>,
    /// Staging area for the updates of the key currently being built; sealed
    /// into `result.upds` when the key changes, and in `done`.
    staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
    /// Records the input container type without storing one.
    _marker: PhantomData<CI>,
}
1045
1046 impl<L: Layout, CI> Builder for OrdKeyBuilder<L, CI>
1047 where
1048 L: for<'a> Layout<KeyContainer: PushInto<CI::Key<'a>>>,
1049 L: Layout<ValContainer: BatchContainer<Owned: Default>>,
1050 CI: BuilderInput<L::KeyContainer, L::ValContainer, Time=layout::Time<L>, Diff=layout::Diff<L>>,
1051 {
1052
1053 type Input = CI;
1054 type Time = layout::Time<L>;
1055 type Output = OrdKeyBatch<L>;
1056
1057 fn with_capacity(keys: usize, _vals: usize, upds: usize) -> Self {
1058 Self {
1059 result: OrdKeyStorage {
1060 keys: L::KeyContainer::with_capacity(keys),
1061 upds: Upds::with_capacity(keys+1, upds),
1062 },
1063 staging: UpdsBuilder::default(),
1064 _marker: PhantomData,
1065 }
1066 }
1067
1068 #[inline]
1069 fn push(&mut self, chunk: &mut Self::Input) {
1070 for item in chunk.drain() {
1071 let (key, _val, time, diff) = CI::into_parts(item);
1072 if self.result.keys.is_empty() {
1073 self.result.keys.push_into(key);
1074 self.staging.push(time, diff);
1075 }
1076 else if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
1078 self.staging.push(time, diff);
1079 } else {
1080 self.staging.seal(&mut self.result.upds);
1081 self.staging.push(time, diff);
1082 self.result.keys.push_into(key);
1083 }
1084 }
1085 }
1086
1087 #[inline(never)]
1088 fn done(mut self, description: Description<Self::Time>) -> OrdKeyBatch<L> {
1089 self.staging.seal(&mut self.result.upds);
1090 OrdKeyBatch {
1091 updates: self.staging.total(),
1092 storage: self.result,
1093 description,
1094 value: OrdKeyBatch::<L>::create_value(),
1095 }
1096 }
1097
1098 fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
1099 let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
1100 let mut builder = Self::with_capacity(keys, vals, upds);
1101 for mut chunk in chain.drain(..) {
1102 builder.push(&mut chunk);
1103 }
1104
1105 builder.done(description)
1106 }
1107 }
1108
1109}