1use std::rc::Rc;
12
13use crate::trace::implementations::spine_fueled::Spine;
14use crate::trace::implementations::merge_batcher::MergeBatcher;
15use crate::trace::implementations::merge_batcher::vec::VecMerger;
16use crate::trace::rc_blanket_impls::RcBuilder;
17
18use super::{Layout, Vector};
19
20pub use self::val_batch::{OrdValBatch, OrdValBuilder};
21pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder};
22
23pub type OrdValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Vector<((K,V),T,R)>>>>;
25pub type OrdValBatcher<K, V, T, R> = MergeBatcher<VecMerger<(K, V), T, R>>;
27pub type RcOrdValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;
29
30pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
32pub type OrdKeyBatcher<K, T, R> = MergeBatcher<VecMerger<(K, ()), T, R>>;
34pub type RcOrdKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<Vector<((K,()),T,R)>, Vec<((K,()),T,R)>>>;
36
37pub use layers::{Vals, Upds};
38pub mod layers {
47
48 use serde::{Deserialize, Serialize};
49 use crate::trace::implementations::BatchContainer;
50
51 #[derive(Debug, Serialize, Deserialize)]
53 pub struct Vals<O, V> {
54 pub offs: O,
58 pub vals: V,
60 }
61
62 impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, V: BatchContainer> Default for Vals<O, V> {
63 fn default() -> Self { Self::with_capacity(0, 0) }
64 }
65
66 impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, V: BatchContainer> Vals<O, V> {
67 #[inline(always)] pub fn bounds(&self, index: usize) -> (usize, usize) {
69 (self.offs.index(index), self.offs.index(index+1))
70 }
71 pub fn get_rel(&self, list_idx: usize, item_idx: usize) -> V::ReadItem<'_> {
78 self.get_abs(self.bounds(list_idx).0 + item_idx)
79 }
80
81 pub fn len(&self) -> usize { self.offs.len() - 1 }
83 pub fn get_abs(&self, index: usize) -> V::ReadItem<'_> {
85 self.vals.index(index)
86 }
87 pub fn with_capacity(o_size: usize, v_size: usize) -> Self {
89 let mut offs = <O as BatchContainer>::with_capacity(o_size);
90 offs.push_ref(0);
91 Self {
92 offs,
93 vals: <V as BatchContainer>::with_capacity(v_size),
94 }
95 }
96 pub fn merge_capacity(this: &Self, that: &Self) -> Self {
98 let mut offs = <O as BatchContainer>::with_capacity(this.offs.len() + that.offs.len());
99 offs.push_ref(0);
100 Self {
101 offs,
102 vals: <V as BatchContainer>::merge_capacity(&this.vals, &that.vals),
103 }
104 }
105 }
106
107 #[derive(Debug, Serialize, Deserialize)]
112 pub struct Upds<O, T, D> {
113 pub offs: O,
115 pub times: T,
117 pub diffs: D,
119 }
120
121 impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, T: BatchContainer, D: BatchContainer> Default for Upds<O, T, D> {
122 fn default() -> Self { Self::with_capacity(0, 0) }
123 }
124 impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, T: BatchContainer, D: BatchContainer> Upds<O, T, D> {
125 pub fn bounds(&self, index: usize) -> (usize, usize) {
127 let mut lower = self.offs.index(index);
128 let upper = self.offs.index(index+1);
129 if lower == upper {
132 assert!(lower > 0);
133 lower -= 1;
134 }
135 (lower, upper)
136 }
137 pub fn get_rel(&self, list_idx: usize, item_idx: usize) -> (T::ReadItem<'_>, D::ReadItem<'_>) {
144 self.get_abs(self.bounds(list_idx).0 + item_idx)
145 }
146
147 pub fn len(&self) -> usize { self.offs.len() - 1 }
149 pub fn get_abs(&self, index: usize) -> (T::ReadItem<'_>, D::ReadItem<'_>) {
151 (self.times.index(index), self.diffs.index(index))
152 }
153 pub fn with_capacity(o_size: usize, u_size: usize) -> Self {
155 let mut offs = <O as BatchContainer>::with_capacity(o_size);
156 offs.push_ref(0);
157 Self {
158 offs,
159 times: <T as BatchContainer>::with_capacity(u_size),
160 diffs: <D as BatchContainer>::with_capacity(u_size),
161 }
162 }
163 pub fn merge_capacity(this: &Self, that: &Self) -> Self {
165 let mut offs = <O as BatchContainer>::with_capacity(this.offs.len() + that.offs.len());
166 offs.push_ref(0);
167 Self {
168 offs,
169 times: <T as BatchContainer>::merge_capacity(&this.times, &that.times),
170 diffs: <D as BatchContainer>::merge_capacity(&this.diffs, &that.diffs),
171 }
172 }
173 }
174
175 pub struct UpdsBuilder<T: BatchContainer, D: BatchContainer> {
177 stash: Vec<(T::Owned, D::Owned)>,
182 total: usize,
186
187 time_con: T,
189 diff_con: D,
191 }
192
193 impl<T: BatchContainer, D: BatchContainer> Default for UpdsBuilder<T, D> {
194 fn default() -> Self { Self { stash: Vec::default(), total: 0, time_con: BatchContainer::with_capacity(1), diff_con: BatchContainer::with_capacity(1) } }
195 }
196
197
198 impl<T, D> UpdsBuilder<T, D>
199 where
200 T: BatchContainer<Owned: Ord>,
201 D: BatchContainer<Owned: crate::difference::Semigroup>,
202 {
203 pub fn push(&mut self, time: T::Owned, diff: D::Owned) {
205 self.stash.push((time, diff));
206 }
207
208 pub fn seal<O: for<'a> BatchContainer<ReadItem<'a> = usize>>(&mut self, upds: &mut Upds<O, T, D>) -> bool {
212 use crate::consolidation;
213 consolidation::consolidate(&mut self.stash);
214 if self.stash.is_empty() { return false; }
216 if self.stash.len() == 1 {
218 let (time, diff) = self.stash.last().unwrap();
219 self.time_con.clear(); self.time_con.push_own(time);
220 self.diff_con.clear(); self.diff_con.push_own(diff);
221 if upds.times.last() == self.time_con.get(0) && upds.diffs.last() == self.diff_con.get(0) {
222 self.total += 1;
223 self.stash.clear();
224 upds.offs.push_ref(upds.times.len());
225 return true;
226 }
227 }
228 self.total += self.stash.len();
230 for (time, diff) in self.stash.drain(..) {
231 upds.times.push_own(&time);
232 upds.diffs.push_own(&diff);
233 }
234 upds.offs.push_ref(upds.times.len());
235 true
236 }
237
238 pub fn total(&self) -> usize { self.total }
240 }
241}
242
243pub mod val_batch {
245
246 use std::marker::PhantomData;
247 use serde::{Deserialize, Serialize};
248 use timely::container::PushInto;
249 use timely::progress::{Antichain, frontier::AntichainRef};
250
251 use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
252 use crate::trace::implementations::{BatchContainer, BuilderInput};
253 use crate::trace::implementations::layout;
254
255 use super::{Layout, Vals, Upds, layers::UpdsBuilder};
256
257 #[derive(Debug, Serialize, Deserialize)]
259 #[serde(bound = "
260 L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
261 L::ValContainer: Serialize + for<'a> Deserialize<'a>,
262 L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
263 L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
264 L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
265 ")]
266 pub struct OrdValStorage<L: Layout> {
267 pub keys: L::KeyContainer,
269 pub vals: Vals<L::OffsetContainer, L::ValContainer>,
271 pub upds: Upds<L::OffsetContainer, L::TimeContainer, L::DiffContainer>,
273 }
274
275 #[derive(Serialize, Deserialize)]
280 #[serde(bound = "
281 L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
282 L::ValContainer: Serialize + for<'a> Deserialize<'a>,
283 L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
284 L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
285 L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
286 ")]
287 pub struct OrdValBatch<L: Layout> {
288 pub storage: OrdValStorage<L>,
290 pub description: Description<layout::Time<L>>,
292 pub updates: usize,
298 }
299
300 impl<L: Layout> WithLayout for OrdValBatch<L> {
301 type Layout = L;
302 }
303
304 impl<L: Layout> BatchReader for OrdValBatch<L> {
305
306 type Cursor = OrdValCursor<L>;
307 fn cursor(&self) -> Self::Cursor {
308 OrdValCursor {
309 key_cursor: 0,
310 val_cursor: 0,
311 phantom: PhantomData,
312 }
313 }
314 fn len(&self) -> usize {
315 self.updates
318 }
319 fn description(&self) -> &Description<layout::Time<L>> { &self.description }
320 }
321
322 impl<L: Layout> Batch for OrdValBatch<L> {
323 type Merger = OrdValMerger<L>;
324
325 fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self::Merger {
326 OrdValMerger::new(self, other, compaction_frontier)
327 }
328
329 fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
330 use timely::progress::Timestamp;
331 Self {
332 storage: OrdValStorage {
333 keys: L::KeyContainer::with_capacity(0),
334 vals: Default::default(),
335 upds: Default::default(),
336 },
337 description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
338 updates: 0,
339 }
340 }
341 }
342
343 pub struct OrdValMerger<L: Layout> {
345 key_cursor1: usize,
347 key_cursor2: usize,
349 result: OrdValStorage<L>,
351 description: Description<layout::Time<L>>,
353 staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
355 }
356
357 impl<L: Layout> Merger<OrdValBatch<L>> for OrdValMerger<L>
358 where
359 OrdValBatch<L>: Batch<Time=layout::Time<L>>,
360 {
361 fn new(batch1: &OrdValBatch<L>, batch2: &OrdValBatch<L>, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self {
362
363 assert!(batch1.upper() == batch2.lower());
364 use crate::lattice::Lattice;
365 let mut since = batch1.description().since().join(batch2.description().since());
366 since = since.join(&compaction_frontier.to_owned());
367
368 let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);
369
370 let batch1 = &batch1.storage;
371 let batch2 = &batch2.storage;
372
373 OrdValMerger {
374 key_cursor1: 0,
375 key_cursor2: 0,
376 result: OrdValStorage {
377 keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
378 vals: Vals::merge_capacity(&batch1.vals, &batch2.vals),
379 upds: Upds::merge_capacity(&batch1.upds, &batch2.upds),
380 },
381 description,
382 staging: UpdsBuilder::default(),
383 }
384 }
385 fn done(self) -> OrdValBatch<L> {
386 OrdValBatch {
387 updates: self.staging.total(),
388 storage: self.result,
389 description: self.description,
390 }
391 }
392 fn work(&mut self, source1: &OrdValBatch<L>, source2: &OrdValBatch<L>, fuel: &mut isize) {
393
394 let starting_updates = self.staging.total();
396 let mut effort = 0isize;
397
398 while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
400 self.merge_key(&source1.storage, &source2.storage);
401 effort = (self.staging.total() - starting_updates) as isize;
403 }
404
405 while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
408 self.copy_key(&source1.storage, self.key_cursor1);
409 self.key_cursor1 += 1;
410 effort = (self.staging.total() - starting_updates) as isize;
411 }
412 while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
413 self.copy_key(&source2.storage, self.key_cursor2);
414 self.key_cursor2 += 1;
415 effort = (self.staging.total() - starting_updates) as isize;
416 }
417
418 *fuel -= effort;
419 }
420 }
421
422 impl<L: Layout> OrdValMerger<L> {
424 fn copy_key(&mut self, source: &OrdValStorage<L>, cursor: usize) {
432 let init_vals = self.result.vals.vals.len();
434 let (mut lower, upper) = source.vals.bounds(cursor);
435 while lower < upper {
436 self.stash_updates_for_val(source, lower);
437 if self.staging.seal(&mut self.result.upds) {
438 self.result.vals.vals.push_ref(source.vals.get_abs(lower));
439 }
440 lower += 1;
441 }
442
443 if self.result.vals.vals.len() > init_vals {
445 self.result.keys.push_ref(source.keys.index(cursor));
446 self.result.vals.offs.push_ref(self.result.vals.vals.len());
447 }
448 }
449 fn merge_key(&mut self, source1: &OrdValStorage<L>, source2: &OrdValStorage<L>) {
454 use ::std::cmp::Ordering;
455 match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
456 Ordering::Less => {
457 self.copy_key(source1, self.key_cursor1);
458 self.key_cursor1 += 1;
459 },
460 Ordering::Equal => {
461 let (lower1, upper1) = source1.vals.bounds(self.key_cursor1);
463 let (lower2, upper2) = source2.vals.bounds(self.key_cursor2);
464 if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) {
465 self.result.keys.push_ref(source1.keys.index(self.key_cursor1));
466 self.result.vals.offs.push_ref(off);
467 }
468 self.key_cursor1 += 1;
470 self.key_cursor2 += 1;
471 },
472 Ordering::Greater => {
473 self.copy_key(source2, self.key_cursor2);
474 self.key_cursor2 += 1;
475 },
476 }
477 }
478 fn merge_vals(
483 &mut self,
484 (source1, mut lower1, upper1): (&OrdValStorage<L>, usize, usize),
485 (source2, mut lower2, upper2): (&OrdValStorage<L>, usize, usize),
486 ) -> Option<usize> {
487 let init_vals = self.result.vals.vals.len();
489 while lower1 < upper1 && lower2 < upper2 {
490 use ::std::cmp::Ordering;
494 match source1.vals.get_abs(lower1).cmp(&source2.vals.get_abs(lower2)) {
495 Ordering::Less => {
496 self.stash_updates_for_val(source1, lower1);
498 if self.staging.seal(&mut self.result.upds) {
499 self.result.vals.vals.push_ref(source1.vals.get_abs(lower1));
500 }
501 lower1 += 1;
502 },
503 Ordering::Equal => {
504 self.stash_updates_for_val(source1, lower1);
505 self.stash_updates_for_val(source2, lower2);
506 if self.staging.seal(&mut self.result.upds) {
507 self.result.vals.vals.push_ref(source1.vals.get_abs(lower1));
508 }
509 lower1 += 1;
510 lower2 += 1;
511 },
512 Ordering::Greater => {
513 self.stash_updates_for_val(source2, lower2);
515 if self.staging.seal(&mut self.result.upds) {
516 self.result.vals.vals.push_ref(source2.vals.get_abs(lower2));
517 }
518 lower2 += 1;
519 },
520 }
521 }
522 while lower1 < upper1 {
524 self.stash_updates_for_val(source1, lower1);
525 if self.staging.seal(&mut self.result.upds) {
526 self.result.vals.vals.push_ref(source1.vals.get_abs(lower1));
527 }
528 lower1 += 1;
529 }
530 while lower2 < upper2 {
531 self.stash_updates_for_val(source2, lower2);
532 if self.staging.seal(&mut self.result.upds) {
533 self.result.vals.vals.push_ref(source2.vals.get_abs(lower2));
534 }
535 lower2 += 1;
536 }
537
538 if self.result.vals.vals.len() > init_vals {
540 Some(self.result.vals.vals.len())
541 } else {
542 None
543 }
544 }
545
546 fn stash_updates_for_val(&mut self, source: &OrdValStorage<L>, index: usize) {
548 let (lower, upper) = source.upds.bounds(index);
549 for i in lower .. upper {
550 let (time, diff) = source.upds.get_abs(i);
552 use crate::lattice::Lattice;
553 let mut new_time: layout::Time<L> = L::TimeContainer::into_owned(time);
554 new_time.advance_by(self.description.since().borrow());
555 self.staging.push(new_time, L::DiffContainer::into_owned(diff));
556 }
557 }
558 }
559
560 pub struct OrdValCursor<L: Layout> {
562 key_cursor: usize,
564 val_cursor: usize,
566 phantom: PhantomData<L>,
568 }
569
570 use crate::trace::implementations::WithLayout;
571 impl<L: Layout> WithLayout for OrdValCursor<L> {
572 type Layout = L;
573 }
574
575 impl<L: Layout> Cursor for OrdValCursor<L> {
576
577 type Storage = OrdValBatch<L>;
578
579 fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { storage.storage.keys.get(self.key_cursor) }
580 fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { if self.val_valid(storage) { Some(self.val(storage)) } else { None } }
581
582 fn key<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
583 fn val<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Val<'a> { storage.storage.vals.get_abs(self.val_cursor) }
584 fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &OrdValBatch<L>, mut logic: L2) {
585 let (lower, upper) = storage.storage.upds.bounds(self.val_cursor);
586 for index in lower .. upper {
587 let (time, diff) = storage.storage.upds.get_abs(index);
588 logic(time, diff);
589 }
590 }
591 fn key_valid(&self, storage: &OrdValBatch<L>) -> bool { self.key_cursor < storage.storage.keys.len() }
592 fn val_valid(&self, storage: &OrdValBatch<L>) -> bool { self.val_cursor < storage.storage.vals.bounds(self.key_cursor).1 }
593 fn step_key(&mut self, storage: &OrdValBatch<L>){
594 self.key_cursor += 1;
595 if self.key_valid(storage) {
596 self.rewind_vals(storage);
597 }
598 else {
599 self.key_cursor = storage.storage.keys.len();
600 }
601 }
602 fn seek_key(&mut self, storage: &OrdValBatch<L>, key: Self::Key<'_>) {
603 self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| <L::KeyContainer as BatchContainer>::reborrow(x).lt(&<L::KeyContainer as BatchContainer>::reborrow(key)));
604 if self.key_valid(storage) {
605 self.rewind_vals(storage);
606 }
607 }
608 fn step_val(&mut self, storage: &OrdValBatch<L>) {
609 self.val_cursor += 1;
610 if !self.val_valid(storage) {
611 self.val_cursor = storage.storage.vals.bounds(self.key_cursor).1;
612 }
613 }
614 fn seek_val(&mut self, storage: &OrdValBatch<L>, val: Self::Val<'_>) {
615 self.val_cursor += storage.storage.vals.vals.advance(self.val_cursor, storage.storage.vals.bounds(self.key_cursor).1, |x| <L::ValContainer as BatchContainer>::reborrow(x).lt(&<L::ValContainer as BatchContainer>::reborrow(val)));
616 }
617 fn rewind_keys(&mut self, storage: &OrdValBatch<L>) {
618 self.key_cursor = 0;
619 if self.key_valid(storage) {
620 self.rewind_vals(storage)
621 }
622 }
623 fn rewind_vals(&mut self, storage: &OrdValBatch<L>) {
624 self.val_cursor = storage.storage.vals.bounds(self.key_cursor).0;
625 }
626 }
627
628 pub struct OrdValBuilder<L: Layout, CI> {
630 pub result: OrdValStorage<L>,
634 staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
635 _marker: PhantomData<CI>,
636 }
637
638 impl<L, CI> Builder for OrdValBuilder<L, CI>
639 where
640 L: for<'a> Layout<
641 KeyContainer: PushInto<CI::Key<'a>>,
642 ValContainer: PushInto<CI::Val<'a>>,
643 >,
644 CI: for<'a> BuilderInput<L::KeyContainer, L::ValContainer, Time=layout::Time<L>, Diff=layout::Diff<L>>,
645 {
646
647 type Input = CI;
648 type Time = layout::Time<L>;
649 type Output = OrdValBatch<L>;
650
651 fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
652 Self {
653 result: OrdValStorage {
654 keys: L::KeyContainer::with_capacity(keys),
655 vals: Vals::with_capacity(keys + 1, vals),
656 upds: Upds::with_capacity(vals + 1, upds),
657 },
658 staging: UpdsBuilder::default(),
659 _marker: PhantomData,
660 }
661 }
662
663 #[inline]
664 fn push(&mut self, chunk: &mut Self::Input) {
665 for item in chunk.drain() {
666 let (key, val, time, diff) = CI::into_parts(item);
667
668 if self.result.keys.is_empty() {
670 self.result.vals.vals.push_into(val);
671 self.result.keys.push_into(key);
672 self.staging.push(time, diff);
673 }
674 else if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
676 if self.result.vals.vals.last().map(|v| CI::val_eq(&val, v)).unwrap_or(false) {
678 self.staging.push(time, diff);
679 } else {
680 self.staging.seal(&mut self.result.upds);
682 self.staging.push(time, diff);
683 self.result.vals.vals.push_into(val);
684 }
685 } else {
686 self.staging.seal(&mut self.result.upds);
688 self.staging.push(time, diff);
689 self.result.vals.offs.push_ref(self.result.vals.vals.len());
690 self.result.vals.vals.push_into(val);
691 self.result.keys.push_into(key);
692 }
693 }
694 }
695
696 #[inline(never)]
697 fn done(mut self, description: Description<Self::Time>) -> OrdValBatch<L> {
698 self.staging.seal(&mut self.result.upds);
699 self.result.vals.offs.push_ref(self.result.vals.vals.len());
700 OrdValBatch {
701 updates: self.staging.total(),
702 storage: self.result,
703 description,
704 }
705 }
706
707 fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
708 let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
709 let mut builder = Self::with_capacity(keys, vals, upds);
710 for mut chunk in chain.drain(..) {
711 builder.push(&mut chunk);
712 }
713
714 builder.done(description)
715 }
716 }
717}
718
719pub mod key_batch {
721
722 use std::marker::PhantomData;
723 use serde::{Deserialize, Serialize};
724 use timely::container::PushInto;
725 use timely::progress::{Antichain, frontier::AntichainRef};
726
727 use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
728 use crate::trace::implementations::{BatchContainer, BuilderInput};
729 use crate::trace::implementations::layout;
730
731 use super::{Layout, Upds, layers::UpdsBuilder};
732
733 #[derive(Debug, Serialize, Deserialize)]
735 #[serde(bound = "
736 L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
737 L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
738 L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
739 L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
740 ")]
741 pub struct OrdKeyStorage<L: Layout> {
742 pub keys: L::KeyContainer,
744 pub upds: Upds<L::OffsetContainer, L::TimeContainer, L::DiffContainer>,
746 }
747
748 #[derive(Serialize, Deserialize)]
753 #[serde(bound = "
754 L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
755 L::ValContainer: Serialize + for<'a> Deserialize<'a>,
756 L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
757 L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
758 L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
759 ")]
760 pub struct OrdKeyBatch<L: Layout> {
761 pub storage: OrdKeyStorage<L>,
763 pub description: Description<layout::Time<L>>,
765 pub updates: usize,
771
772 pub value: L::ValContainer,
774 }
775
776 impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> OrdKeyBatch<L> {
777 pub fn create_value() -> L::ValContainer {
779 let mut value = L::ValContainer::with_capacity(1);
780 value.push_own(&Default::default());
781 value
782 }
783 }
784
785 impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> WithLayout for OrdKeyBatch<L> {
786 type Layout = L;
787 }
788
789 impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> BatchReader for OrdKeyBatch<L> {
790
791 type Cursor = OrdKeyCursor<L>;
792 fn cursor(&self) -> Self::Cursor {
793 OrdKeyCursor {
794 key_cursor: 0,
795 val_stepped: false,
796 phantom: std::marker::PhantomData,
797 }
798 }
799 fn len(&self) -> usize {
800 self.updates
803 }
804 fn description(&self) -> &Description<layout::Time<L>> { &self.description }
805 }
806
807 impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> Batch for OrdKeyBatch<L> {
808 type Merger = OrdKeyMerger<L>;
809
810 fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self::Merger {
811 OrdKeyMerger::new(self, other, compaction_frontier)
812 }
813
814 fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
815 use timely::progress::Timestamp;
816 Self {
817 storage: OrdKeyStorage {
818 keys: L::KeyContainer::with_capacity(0),
819 upds: Upds::default(),
820 },
821 description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
822 updates: 0,
823 value: Self::create_value(),
824 }
825 }
826 }
827
828 pub struct OrdKeyMerger<L: Layout> {
830 key_cursor1: usize,
832 key_cursor2: usize,
834 result: OrdKeyStorage<L>,
836 description: Description<layout::Time<L>>,
838
839 staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
841 }
842
843 impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> Merger<OrdKeyBatch<L>> for OrdKeyMerger<L>
844 where
845 OrdKeyBatch<L>: Batch<Time=layout::Time<L>>,
846 {
847 fn new(batch1: &OrdKeyBatch<L>, batch2: &OrdKeyBatch<L>, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self {
848
849 assert!(batch1.upper() == batch2.lower());
850 use crate::lattice::Lattice;
851 let mut since = batch1.description().since().join(batch2.description().since());
852 since = since.join(&compaction_frontier.to_owned());
853
854 let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);
855
856 let batch1 = &batch1.storage;
857 let batch2 = &batch2.storage;
858
859 OrdKeyMerger {
860 key_cursor1: 0,
861 key_cursor2: 0,
862 result: OrdKeyStorage {
863 keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
864 upds: Upds::merge_capacity(&batch1.upds, &batch2.upds),
865 },
866 description,
867 staging: UpdsBuilder::default(),
868 }
869 }
870 fn done(self) -> OrdKeyBatch<L> {
871 OrdKeyBatch {
872 updates: self.staging.total(),
873 storage: self.result,
874 description: self.description,
875 value: OrdKeyBatch::<L>::create_value(),
876 }
877 }
878 fn work(&mut self, source1: &OrdKeyBatch<L>, source2: &OrdKeyBatch<L>, fuel: &mut isize) {
879
880 let starting_updates = self.staging.total();
882 let mut effort = 0isize;
883
884 while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
886 self.merge_key(&source1.storage, &source2.storage);
887 effort = (self.staging.total() - starting_updates) as isize;
889 }
890
891 while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
894 self.copy_key(&source1.storage, self.key_cursor1);
895 self.key_cursor1 += 1;
896 effort = (self.staging.total() - starting_updates) as isize;
897 }
898 while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
899 self.copy_key(&source2.storage, self.key_cursor2);
900 self.key_cursor2 += 1;
901 effort = (self.staging.total() - starting_updates) as isize;
902 }
903
904 *fuel -= effort;
905 }
906 }
907
908 impl<L: Layout> OrdKeyMerger<L> {
910 fn copy_key(&mut self, source: &OrdKeyStorage<L>, cursor: usize) {
918 self.stash_updates_for_key(source, cursor);
919 if self.staging.seal(&mut self.result.upds) {
920 self.result.keys.push_ref(source.keys.index(cursor));
921 }
922 }
923 fn merge_key(&mut self, source1: &OrdKeyStorage<L>, source2: &OrdKeyStorage<L>) {
928 use ::std::cmp::Ordering;
929 match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
930 Ordering::Less => {
931 self.copy_key(source1, self.key_cursor1);
932 self.key_cursor1 += 1;
933 },
934 Ordering::Equal => {
935 self.stash_updates_for_key(source1, self.key_cursor1);
937 self.stash_updates_for_key(source2, self.key_cursor2);
938 if self.staging.seal(&mut self.result.upds) {
939 self.result.keys.push_ref(source1.keys.index(self.key_cursor1));
940 }
941 self.key_cursor1 += 1;
943 self.key_cursor2 += 1;
944 },
945 Ordering::Greater => {
946 self.copy_key(source2, self.key_cursor2);
947 self.key_cursor2 += 1;
948 },
949 }
950 }
951
952 fn stash_updates_for_key(&mut self, source: &OrdKeyStorage<L>, index: usize) {
954 let (lower, upper) = source.upds.bounds(index);
955 for i in lower .. upper {
956 let (time, diff) = source.upds.get_abs(i);
958 use crate::lattice::Lattice;
959 let mut new_time = L::TimeContainer::into_owned(time);
960 new_time.advance_by(self.description.since().borrow());
961 self.staging.push(new_time, L::DiffContainer::into_owned(diff));
962 }
963 }
964 }
965
966 pub struct OrdKeyCursor<L: Layout> {
968 key_cursor: usize,
970 val_stepped: bool,
972 phantom: PhantomData<L>,
974 }
975
976 use crate::trace::implementations::WithLayout;
977 impl<L: Layout<ValContainer: BatchContainer>> WithLayout for OrdKeyCursor<L> {
978 type Layout = L;
979 }
980
981 impl<L: for<'a> Layout<ValContainer: BatchContainer<Owned: Default>>> Cursor for OrdKeyCursor<L> {
982
983 type Storage = OrdKeyBatch<L>;
984
985 fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { storage.storage.keys.get(self.key_cursor) }
986 fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { if self.val_valid(storage) { Some(self.val(storage)) } else { None } }
987
988 fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
989 fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { storage.value.index(0) }
990 fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L2) {
991 let (lower, upper) = storage.storage.upds.bounds(self.key_cursor);
992 for index in lower .. upper {
993 let (time, diff) = storage.storage.upds.get_abs(index);
994 logic(time, diff);
995 }
996 }
997 fn key_valid(&self, storage: &Self::Storage) -> bool { self.key_cursor < storage.storage.keys.len() }
998 fn val_valid(&self, _storage: &Self::Storage) -> bool { !self.val_stepped }
999 fn step_key(&mut self, storage: &Self::Storage){
1000 self.key_cursor += 1;
1001 if self.key_valid(storage) {
1002 self.rewind_vals(storage);
1003 }
1004 else {
1005 self.key_cursor = storage.storage.keys.len();
1006 }
1007 }
1008 fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
1009 self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| <L::KeyContainer as BatchContainer>::reborrow(x).lt(&<L::KeyContainer as BatchContainer>::reborrow(key)));
1010 if self.key_valid(storage) {
1011 self.rewind_vals(storage);
1012 }
1013 }
1014 fn step_val(&mut self, _storage: &Self::Storage) {
1015 self.val_stepped = true;
1016 }
1017 fn seek_val(&mut self, _storage: &Self::Storage, _val: Self::Val<'_>) { }
1018 fn rewind_keys(&mut self, storage: &Self::Storage) {
1019 self.key_cursor = 0;
1020 if self.key_valid(storage) {
1021 self.rewind_vals(storage)
1022 }
1023 }
1024 fn rewind_vals(&mut self, _storage: &Self::Storage) {
1025 self.val_stepped = false;
1026 }
1027 }
1028
1029 pub struct OrdKeyBuilder<L: Layout, CI> {
1031 pub result: OrdKeyStorage<L>,
1035 staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
1036 _marker: PhantomData<CI>,
1037 }
1038
1039 impl<L: Layout, CI> Builder for OrdKeyBuilder<L, CI>
1040 where
1041 L: for<'a> Layout<KeyContainer: PushInto<CI::Key<'a>>>,
1042 L: Layout<ValContainer: BatchContainer<Owned: Default>>,
1043 CI: BuilderInput<L::KeyContainer, L::ValContainer, Time=layout::Time<L>, Diff=layout::Diff<L>>,
1044 {
1045
1046 type Input = CI;
1047 type Time = layout::Time<L>;
1048 type Output = OrdKeyBatch<L>;
1049
1050 fn with_capacity(keys: usize, _vals: usize, upds: usize) -> Self {
1051 Self {
1052 result: OrdKeyStorage {
1053 keys: L::KeyContainer::with_capacity(keys),
1054 upds: Upds::with_capacity(keys+1, upds),
1055 },
1056 staging: UpdsBuilder::default(),
1057 _marker: PhantomData,
1058 }
1059 }
1060
1061 #[inline]
1062 fn push(&mut self, chunk: &mut Self::Input) {
1063 for item in chunk.drain() {
1064 let (key, _val, time, diff) = CI::into_parts(item);
1065 if self.result.keys.is_empty() {
1066 self.result.keys.push_into(key);
1067 self.staging.push(time, diff);
1068 }
1069 else if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
1071 self.staging.push(time, diff);
1072 } else {
1073 self.staging.seal(&mut self.result.upds);
1074 self.staging.push(time, diff);
1075 self.result.keys.push_into(key);
1076 }
1077 }
1078 }
1079
1080 #[inline(never)]
1081 fn done(mut self, description: Description<Self::Time>) -> OrdKeyBatch<L> {
1082 self.staging.seal(&mut self.result.upds);
1083 OrdKeyBatch {
1084 updates: self.staging.total(),
1085 storage: self.result,
1086 description,
1087 value: OrdKeyBatch::<L>::create_value(),
1088 }
1089 }
1090
1091 fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
1092 let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
1093 let mut builder = Self::with_capacity(keys, vals, upds);
1094 for mut chunk in chain.drain(..) {
1095 builder.push(&mut chunk);
1096 }
1097
1098 builder.done(description)
1099 }
1100 }
1101
1102}