1use std::rc::Rc;
12
13use crate::trace::implementations::spine_fueled::Spine;
14use crate::trace::implementations::merge_batcher::MergeBatcher;
15use crate::trace::implementations::merge_batcher_col::ColumnatedMergeBatcher;
16use crate::trace::rc_blanket_impls::RcBuilder;
17
18use super::{Update, Layout, Vector, TStack, Preferred};
19
20pub use self::val_batch::{OrdValBatch, OrdValBuilder};
21pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder};
22
/// A `Spine` of reference-counted [`OrdValBatch`]es laid out with `Vector` over `((K, V), T, R)`
/// updates, batched by `MergeBatcher` and built through `RcBuilder<OrdValBuilder<_>>`.
pub type OrdValSpine<K, V, T, R> = Spine<
    Rc<OrdValBatch<Vector<((K,V),T,R)>>>,
    MergeBatcher<K,V,T,R>,
    RcBuilder<OrdValBuilder<Vector<((K,V),T,R)>>>,
>;
/// A `Spine` of reference-counted [`OrdValBatch`]es laid out with `TStack` over `((K, V), T, R)`
/// updates, batched by `ColumnatedMergeBatcher` and built through `RcBuilder<OrdValBuilder<_>>`.
pub type ColValSpine<K, V, T, R> = Spine<
    Rc<OrdValBatch<TStack<((K,V),T,R)>>>,
    ColumnatedMergeBatcher<K,V,T,R>,
    RcBuilder<OrdValBuilder<TStack<((K,V),T,R)>>>,
>;
38
/// A `Spine` of reference-counted [`OrdKeyBatch`]es laid out with `Vector` over `((K, ()), T, R)`
/// updates (the value type is `()`), batched by `MergeBatcher` and built through
/// `RcBuilder<OrdKeyBuilder<_>>`.
pub type OrdKeySpine<K, T, R> = Spine<
    Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>,
    MergeBatcher<K,(),T,R>,
    RcBuilder<OrdKeyBuilder<Vector<((K,()),T,R)>>>,
>;
/// A `Spine` of reference-counted [`OrdKeyBatch`]es laid out with `TStack` over `((K, ()), T, R)`
/// updates (the value type is `()`), batched by `ColumnatedMergeBatcher` and built through
/// `RcBuilder<OrdKeyBuilder<_>>`.
pub type ColKeySpine<K, T, R> = Spine<
    Rc<OrdKeyBatch<TStack<((K,()),T,R)>>>,
    ColumnatedMergeBatcher<K,(),T,R>,
    RcBuilder<OrdKeyBuilder<TStack<((K,()),T,R)>>>,
>;
54
/// A `Spine` of reference-counted [`OrdValBatch`]es using the `Preferred` layout.
///
/// The batcher operates on the owned forms of the key and value types
/// (`<K as ToOwned>::Owned`, `<V as ToOwned>::Owned`), while batches and the
/// builder are parameterized by `Preferred<K, V, T, R>`.
pub type PreferredSpine<K, V, T, R> = Spine<
    Rc<OrdValBatch<Preferred<K,V,T,R>>>,
    ColumnatedMergeBatcher<<K as ToOwned>::Owned,<V as ToOwned>::Owned,T,R>,
    RcBuilder<OrdValBuilder<Preferred<K,V,T,R>>>,
>;
61
62
63mod val_batch {
67
68 use std::marker::PhantomData;
69 use abomonation_derive::Abomonation;
70 use timely::progress::{Antichain, frontier::AntichainRef};
71
72 use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
73 use crate::trace::implementations::{BatchContainer, OffsetList};
74 use crate::trace::cursor::MyTrait;
75
76 use super::{Layout, Update};
77
78 #[derive(Abomonation, Debug)]
80 pub struct OrdValStorage<L: Layout> {
81 pub keys: L::KeyContainer,
83 pub keys_offs: OffsetList,
87 pub vals: L::ValContainer,
89 pub vals_offs: OffsetList,
98 pub updates: L::UpdContainer,
100 }
101
102 impl<L: Layout> OrdValStorage<L> {
103 fn values_for_key(&self, index: usize) -> (usize, usize) {
105 (self.keys_offs.index(index), self.keys_offs.index(index+1))
106 }
107 fn updates_for_value(&self, index: usize) -> (usize, usize) {
109 let mut lower = self.vals_offs.index(index);
110 let upper = self.vals_offs.index(index+1);
111 if lower == upper {
114 assert!(lower > 0);
115 lower -= 1;
116 }
117 (lower, upper)
118 }
119 }
120
121 #[derive(Abomonation)]
126 pub struct OrdValBatch<L: Layout> {
127 pub storage: OrdValStorage<L>,
129 pub description: Description<<L::Target as Update>::Time>,
131 pub updates: usize,
137 }
138
139 impl<L: Layout> BatchReader for OrdValBatch<L> {
140 type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
141 type KeyOwned = <L::Target as Update>::Key;
142 type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
143 type ValOwned = <L::Target as Update>::Val;
144 type Time = <L::Target as Update>::Time;
145 type Diff = <L::Target as Update>::Diff;
146
147 type Cursor = OrdValCursor<L>;
148 fn cursor(&self) -> Self::Cursor {
149 OrdValCursor {
150 key_cursor: 0,
151 val_cursor: 0,
152 phantom: std::marker::PhantomData,
153 }
154 }
155 fn len(&self) -> usize {
156 self.updates
159 }
160 fn description(&self) -> &Description<<L::Target as Update>::Time> { &self.description }
161 }
162
163 impl<L: Layout> Batch for OrdValBatch<L> {
164 type Merger = OrdValMerger<L>;
165
166 fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
167 OrdValMerger::new(self, other, compaction_frontier)
168 }
169 }
170
171 pub struct OrdValMerger<L: Layout> {
173 key_cursor1: usize,
175 key_cursor2: usize,
177 result: OrdValStorage<L>,
179 description: Description<<L::Target as Update>::Time>,
181
182 update_stash: Vec<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
187 singletons: usize,
189 }
190
191 impl<L: Layout> Merger<OrdValBatch<L>> for OrdValMerger<L>
192 where
193 OrdValBatch<L>: Batch<Time=<L::Target as Update>::Time>
194 {
195 fn new(batch1: &OrdValBatch<L>, batch2: &OrdValBatch<L>, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self {
196
197 assert!(batch1.upper() == batch2.lower());
198 use crate::lattice::Lattice;
199 let mut since = batch1.description().since().join(batch2.description().since());
200 since = since.join(&compaction_frontier.to_owned());
201
202 let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);
203
204 let batch1 = &batch1.storage;
205 let batch2 = &batch2.storage;
206
207 let mut storage = OrdValStorage {
208 keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
209 keys_offs: OffsetList::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
210 vals: L::ValContainer::merge_capacity(&batch1.vals, &batch2.vals),
211 vals_offs: OffsetList::with_capacity(batch1.vals_offs.len() + batch2.vals_offs.len()),
212 updates: L::UpdContainer::merge_capacity(&batch1.updates, &batch2.updates),
213 };
214
215 storage.keys_offs.push(0);
216 storage.vals_offs.push(0);
217
218 OrdValMerger {
219 key_cursor1: 0,
220 key_cursor2: 0,
221 result: storage,
222 description,
223 update_stash: Vec::new(),
224 singletons: 0,
225 }
226 }
227 fn done(self) -> OrdValBatch<L> {
228 OrdValBatch {
229 updates: self.result.updates.len() + self.singletons,
230 storage: self.result,
231 description: self.description,
232 }
233 }
234 fn work(&mut self, source1: &OrdValBatch<L>, source2: &OrdValBatch<L>, fuel: &mut isize) {
235
236 let starting_updates = self.result.updates.len();
238 let mut effort = 0isize;
239
240 while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
242 self.merge_key(&source1.storage, &source2.storage);
243 effort = (self.result.updates.len() - starting_updates) as isize;
245 }
246
247 while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
250 self.copy_key(&source1.storage, self.key_cursor1);
251 self.key_cursor1 += 1;
252 effort = (self.result.updates.len() - starting_updates) as isize;
253 }
254 while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
255 self.copy_key(&source2.storage, self.key_cursor2);
256 self.key_cursor2 += 1;
257 effort = (self.result.updates.len() - starting_updates) as isize;
258 }
259
260 *fuel -= effort;
261 }
262 }
263
264 impl<L: Layout> OrdValMerger<L> {
266 fn copy_key(&mut self, source: &OrdValStorage<L>, cursor: usize) {
274 let init_vals = self.result.vals.len();
276 let (mut lower, upper) = source.values_for_key(cursor);
277 while lower < upper {
278 self.stash_updates_for_val(source, lower);
279 if let Some(off) = self.consolidate_updates() {
280 self.result.vals_offs.push(off);
281 self.result.vals.copy(source.vals.index(lower));
282 }
283 lower += 1;
284 }
285
286 if self.result.vals.len() > init_vals {
288 self.result.keys.copy(source.keys.index(cursor));
289 self.result.keys_offs.push(self.result.vals.len());
290 }
291 }
292 fn merge_key(&mut self, source1: &OrdValStorage<L>, source2: &OrdValStorage<L>) {
297 use ::std::cmp::Ordering;
298 match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
299 Ordering::Less => {
300 self.copy_key(source1, self.key_cursor1);
301 self.key_cursor1 += 1;
302 },
303 Ordering::Equal => {
304 let (lower1, upper1) = source1.values_for_key(self.key_cursor1);
306 let (lower2, upper2) = source2.values_for_key(self.key_cursor2);
307 if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) {
308 self.result.keys.copy(source1.keys.index(self.key_cursor1));
309 self.result.keys_offs.push(off);
310 }
311 self.key_cursor1 += 1;
313 self.key_cursor2 += 1;
314 },
315 Ordering::Greater => {
316 self.copy_key(source2, self.key_cursor2);
317 self.key_cursor2 += 1;
318 },
319 }
320 }
321 fn merge_vals(
326 &mut self,
327 (source1, mut lower1, upper1): (&OrdValStorage<L>, usize, usize),
328 (source2, mut lower2, upper2): (&OrdValStorage<L>, usize, usize),
329 ) -> Option<usize> {
330 let init_vals = self.result.vals.len();
332 while lower1 < upper1 && lower2 < upper2 {
333 use ::std::cmp::Ordering;
337 match source1.vals.index(lower1).cmp(&source2.vals.index(lower2)) {
338 Ordering::Less => {
339 self.stash_updates_for_val(source1, lower1);
341 if let Some(off) = self.consolidate_updates() {
342 self.result.vals_offs.push(off);
343 self.result.vals.copy(source1.vals.index(lower1));
344 }
345 lower1 += 1;
346 },
347 Ordering::Equal => {
348 self.stash_updates_for_val(source1, lower1);
349 self.stash_updates_for_val(source2, lower2);
350 if let Some(off) = self.consolidate_updates() {
351 self.result.vals_offs.push(off);
352 self.result.vals.copy(source1.vals.index(lower1));
353 }
354 lower1 += 1;
355 lower2 += 1;
356 },
357 Ordering::Greater => {
358 self.stash_updates_for_val(source2, lower2);
360 if let Some(off) = self.consolidate_updates() {
361 self.result.vals_offs.push(off);
362 self.result.vals.copy(source2.vals.index(lower2));
363 }
364 lower2 += 1;
365 },
366 }
367 }
368 while lower1 < upper1 {
370 self.stash_updates_for_val(source1, lower1);
371 if let Some(off) = self.consolidate_updates() {
372 self.result.vals_offs.push(off);
373 self.result.vals.copy(source1.vals.index(lower1));
374 }
375 lower1 += 1;
376 }
377 while lower2 < upper2 {
378 self.stash_updates_for_val(source2, lower2);
379 if let Some(off) = self.consolidate_updates() {
380 self.result.vals_offs.push(off);
381 self.result.vals.copy(source2.vals.index(lower2));
382 }
383 lower2 += 1;
384 }
385
386 if self.result.vals.len() > init_vals {
388 Some(self.result.vals.len())
389 } else {
390 None
391 }
392 }
393
394 fn stash_updates_for_val(&mut self, source: &OrdValStorage<L>, index: usize) {
396 let (lower, upper) = source.updates_for_value(index);
397 for i in lower .. upper {
398 let (time, diff) = &source.updates.index(i).into_owned();
400 use crate::lattice::Lattice;
401 let mut new_time = time.clone();
402 new_time.advance_by(self.description.since().borrow());
403 self.update_stash.push((new_time, diff.clone()));
404 }
405 }
406
407 fn consolidate_updates(&mut self) -> Option<usize> {
409 use crate::consolidation;
410 consolidation::consolidate(&mut self.update_stash);
411 if !self.update_stash.is_empty() {
412 if self.update_stash.len() == 1 && self.result.updates.last().map(|ud| self.update_stash.last().unwrap().equals(ud)).unwrap_or(false) {
415 self.update_stash.clear();
417 self.singletons += 1;
418 }
419 else {
420 for item in self.update_stash.drain(..) {
422 self.result.updates.push(item);
423 }
424 }
425 Some(self.result.updates.len())
426 } else {
427 None
428 }
429 }
430 }
431
432 pub struct OrdValCursor<L: Layout> {
434 key_cursor: usize,
436 val_cursor: usize,
438 phantom: PhantomData<L>,
440 }
441
442 impl<L: Layout> Cursor for OrdValCursor<L> {
443
444 type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
445 type KeyOwned = <L::Target as Update>::Key;
446 type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
447 type ValOwned = <L::Target as Update>::Val;
448 type Time = <L::Target as Update>::Time;
449 type Diff = <L::Target as Update>::Diff;
450
451 type Storage = OrdValBatch<L>;
452
453 fn key<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
454 fn val<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Val<'a> { storage.storage.vals.index(self.val_cursor) }
455 fn map_times<L2: FnMut(&Self::Time, &Self::Diff)>(&mut self, storage: &OrdValBatch<L>, mut logic: L2) {
456 let (lower, upper) = storage.storage.updates_for_value(self.val_cursor);
457 for index in lower .. upper {
458 let (time, diff) = &storage.storage.updates.index(index);
459 logic(time, diff);
460 }
461 }
462 fn key_valid(&self, storage: &OrdValBatch<L>) -> bool { self.key_cursor < storage.storage.keys.len() }
463 fn val_valid(&self, storage: &OrdValBatch<L>) -> bool { self.val_cursor < storage.storage.values_for_key(self.key_cursor).1 }
464 fn step_key(&mut self, storage: &OrdValBatch<L>){
465 self.key_cursor += 1;
466 if self.key_valid(storage) {
467 self.rewind_vals(storage);
468 }
469 else {
470 self.key_cursor = storage.storage.keys.len();
471 }
472 }
473 fn seek_key(&mut self, storage: &OrdValBatch<L>, key: Self::Key<'_>) {
474 self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| x.lt(&key));
475 if self.key_valid(storage) {
476 self.rewind_vals(storage);
477 }
478 }
479 fn step_val(&mut self, storage: &OrdValBatch<L>) {
480 self.val_cursor += 1;
481 if !self.val_valid(storage) {
482 self.val_cursor = storage.storage.values_for_key(self.key_cursor).1;
483 }
484 }
485 fn seek_val(&mut self, storage: &OrdValBatch<L>, val: Self::Val<'_>) {
486 self.val_cursor += storage.storage.vals.advance(self.val_cursor, storage.storage.values_for_key(self.key_cursor).1, |x| x.lt(&val));
487 }
488 fn rewind_keys(&mut self, storage: &OrdValBatch<L>) {
489 self.key_cursor = 0;
490 if self.key_valid(storage) {
491 self.rewind_vals(storage)
492 }
493 }
494 fn rewind_vals(&mut self, storage: &OrdValBatch<L>) {
495 self.val_cursor = storage.storage.values_for_key(self.key_cursor).0;
496 }
497 }
498
499 pub struct OrdValBuilder<L: Layout> {
501 result: OrdValStorage<L>,
502 singleton: Option<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
503 singletons: usize,
508 }
509
510 impl<L: Layout> OrdValBuilder<L> {
511 fn push_update(&mut self, time: <L::Target as Update>::Time, diff: <L::Target as Update>::Diff) {
523 if self.result.updates.last().map(|(t, d)| t == &time && d == &diff) == Some(true) {
525 assert!(self.singleton.is_none());
526 self.singleton = Some((time, diff));
527 }
528 else {
529 if let Some(time_diff) = self.singleton.take() {
531 self.result.updates.push(time_diff);
532 }
533 self.result.updates.push((time, diff));
534 }
535 }
536 }
537
538 impl<L: Layout> Builder for OrdValBuilder<L> {
539
540 type Item = ((<L::Target as Update>::Key, <L::Target as Update>::Val), <L::Target as Update>::Time, <L::Target as Update>::Diff);
541 type Time = <L::Target as Update>::Time;
542 type Output = OrdValBatch<L>;
543
544 fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
545 Self {
547 result: OrdValStorage {
548 keys: L::KeyContainer::with_capacity(keys),
549 keys_offs: OffsetList::with_capacity(keys + 1),
550 vals: L::ValContainer::with_capacity(vals),
551 vals_offs: OffsetList::with_capacity(vals + 1),
552 updates: L::UpdContainer::with_capacity(upds),
553 },
554 singleton: None,
555 singletons: 0,
556 }
557 }
558
559 #[inline]
560 fn push(&mut self, ((key, val), time, diff): Self::Item) {
561
562 if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) {
564 if self.result.vals.last().map(|v| v.equals(&val)).unwrap_or(false) {
566 self.push_update(time, diff);
567 } else {
568 self.result.vals_offs.push(self.result.updates.len());
570 if self.singleton.take().is_some() { self.singletons += 1; }
571 self.push_update(time, diff);
572 self.result.vals.push(val);
573 }
574 } else {
575 self.result.vals_offs.push(self.result.updates.len());
577 if self.singleton.take().is_some() { self.singletons += 1; }
578 self.result.keys_offs.push(self.result.vals.len());
579 self.push_update(time, diff);
580 self.result.vals.push(val);
581 self.result.keys.push(key);
582 }
583 }
584
585 #[inline]
586 fn copy(&mut self, ((key, val), time, diff): &Self::Item) {
587
588 if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) {
590 if self.result.vals.last().map(|v| v.equals(val)).unwrap_or(false) {
592 self.push_update(time.clone(), diff.clone());
595 } else {
596 self.result.vals_offs.push(self.result.updates.len());
598 if self.singleton.take().is_some() { self.singletons += 1; }
600 self.push_update(time.clone(), diff.clone());
601 self.result.vals.copy_push(val);
602 }
603 } else {
604 self.result.vals_offs.push(self.result.updates.len());
606 if self.singleton.take().is_some() { self.singletons += 1; }
608 self.result.keys_offs.push(self.result.vals.len());
609 self.push_update(time.clone(), diff.clone());
610 self.result.vals.copy_push(val);
611 self.result.keys.copy_push(key);
612 }
613 }
614
615 #[inline(never)]
616 fn done(mut self, lower: Antichain<Self::Time>, upper: Antichain<Self::Time>, since: Antichain<Self::Time>) -> OrdValBatch<L> {
617 self.result.vals_offs.push(self.result.updates.len());
619 if self.singleton.take().is_some() { self.singletons += 1; }
621 self.result.keys_offs.push(self.result.vals.len());
622 OrdValBatch {
623 updates: self.result.updates.len() + self.singletons,
624 storage: self.result,
625 description: Description::new(lower, upper, since),
626 }
627 }
628 }
629
630}
631
632mod key_batch {
633
634 use std::marker::PhantomData;
635 use abomonation_derive::Abomonation;
636 use timely::progress::{Antichain, frontier::AntichainRef};
637
638 use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
639 use crate::trace::implementations::{BatchContainer, OffsetList};
640 use crate::trace::cursor::MyTrait;
641
642 use super::{Layout, Update};
643
644 #[derive(Abomonation, Debug)]
646 pub struct OrdKeyStorage<L: Layout> {
647 pub keys: L::KeyContainer,
649 pub keys_offs: OffsetList,
658 pub updates: L::UpdContainer,
660 }
661
662 impl<L: Layout> OrdKeyStorage<L> {
663 fn updates_for_key(&self, index: usize) -> (usize, usize) {
665 let mut lower = self.keys_offs.index(index);
666 let upper = self.keys_offs.index(index+1);
667 if lower == upper {
670 assert!(lower > 0);
671 lower -= 1;
672 }
673 (lower, upper)
674 }
675 }
676
677 #[derive(Abomonation)]
682 pub struct OrdKeyBatch<L: Layout> {
683 pub storage: OrdKeyStorage<L>,
685 pub description: Description<<L::Target as Update>::Time>,
687 pub updates: usize,
693 }
694
695 impl<L: Layout> BatchReader for OrdKeyBatch<L> {
696
697 type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
698 type KeyOwned = <L::Target as Update>::Key;
699 type Val<'a> = &'a ();
700 type ValOwned = ();
701 type Time = <L::Target as Update>::Time;
702 type Diff = <L::Target as Update>::Diff;
703
704 type Cursor = OrdKeyCursor<L>;
705 fn cursor(&self) -> Self::Cursor {
706 OrdKeyCursor {
707 key_cursor: 0,
708 val_stepped: false,
709 phantom: std::marker::PhantomData,
710 }
711 }
712 fn len(&self) -> usize {
713 self.updates
716 }
717 fn description(&self) -> &Description<<L::Target as Update>::Time> { &self.description }
718 }
719
720 impl<L: Layout> Batch for OrdKeyBatch<L> {
721 type Merger = OrdKeyMerger<L>;
722
723 fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
724 OrdKeyMerger::new(self, other, compaction_frontier)
725 }
726 }
727
728 pub struct OrdKeyMerger<L: Layout> {
730 key_cursor1: usize,
732 key_cursor2: usize,
734 result: OrdKeyStorage<L>,
736 description: Description<<L::Target as Update>::Time>,
738
739 update_stash: Vec<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
744 singletons: usize,
746 }
747
748 impl<L: Layout> Merger<OrdKeyBatch<L>> for OrdKeyMerger<L>
749 where
750 OrdKeyBatch<L>: Batch<Time=<L::Target as Update>::Time>
751 {
752 fn new(batch1: &OrdKeyBatch<L>, batch2: &OrdKeyBatch<L>, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self {
753
754 assert!(batch1.upper() == batch2.lower());
755 use crate::lattice::Lattice;
756 let mut since = batch1.description().since().join(batch2.description().since());
757 since = since.join(&compaction_frontier.to_owned());
758
759 let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);
760
761 let batch1 = &batch1.storage;
762 let batch2 = &batch2.storage;
763
764 let mut storage = OrdKeyStorage {
765 keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
766 keys_offs: OffsetList::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
767 updates: L::UpdContainer::merge_capacity(&batch1.updates, &batch2.updates),
768 };
769
770 storage.keys_offs.push(0);
771
772 OrdKeyMerger {
773 key_cursor1: 0,
774 key_cursor2: 0,
775 result: storage,
776 description,
777 update_stash: Vec::new(),
778 singletons: 0,
779 }
780 }
781 fn done(self) -> OrdKeyBatch<L> {
782 OrdKeyBatch {
783 updates: self.result.updates.len() + self.singletons,
784 storage: self.result,
785 description: self.description,
786 }
787 }
788 fn work(&mut self, source1: &OrdKeyBatch<L>, source2: &OrdKeyBatch<L>, fuel: &mut isize) {
789
790 let starting_updates = self.result.updates.len();
792 let mut effort = 0isize;
793
794 while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
796 self.merge_key(&source1.storage, &source2.storage);
797 effort = (self.result.updates.len() - starting_updates) as isize;
799 }
800
801 while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
804 self.copy_key(&source1.storage, self.key_cursor1);
805 self.key_cursor1 += 1;
806 effort = (self.result.updates.len() - starting_updates) as isize;
807 }
808 while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
809 self.copy_key(&source2.storage, self.key_cursor2);
810 self.key_cursor2 += 1;
811 effort = (self.result.updates.len() - starting_updates) as isize;
812 }
813
814 *fuel -= effort;
815 }
816 }
817
818 impl<L: Layout> OrdKeyMerger<L> {
820 fn copy_key(&mut self, source: &OrdKeyStorage<L>, cursor: usize) {
828 self.stash_updates_for_key(source, cursor);
829 if let Some(off) = self.consolidate_updates() {
830 self.result.keys_offs.push(off);
831 self.result.keys.copy(source.keys.index(cursor));
832 }
833 }
834 fn merge_key(&mut self, source1: &OrdKeyStorage<L>, source2: &OrdKeyStorage<L>) {
839 use ::std::cmp::Ordering;
840 match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
841 Ordering::Less => {
842 self.copy_key(source1, self.key_cursor1);
843 self.key_cursor1 += 1;
844 },
845 Ordering::Equal => {
846 self.stash_updates_for_key(source1, self.key_cursor1);
848 self.stash_updates_for_key(source2, self.key_cursor2);
849 if let Some(off) = self.consolidate_updates() {
850 self.result.keys_offs.push(off);
851 self.result.keys.copy(source1.keys.index(self.key_cursor1));
852 }
853 self.key_cursor1 += 1;
855 self.key_cursor2 += 1;
856 },
857 Ordering::Greater => {
858 self.copy_key(source2, self.key_cursor2);
859 self.key_cursor2 += 1;
860 },
861 }
862 }
863
864 fn stash_updates_for_key(&mut self, source: &OrdKeyStorage<L>, index: usize) {
866 let (lower, upper) = source.updates_for_key(index);
867 for i in lower .. upper {
868 let (time, diff) = &source.updates.index(i);
870 use crate::lattice::Lattice;
871 let mut new_time = time.clone();
872 new_time.advance_by(self.description.since().borrow());
873 self.update_stash.push((new_time, diff.clone()));
874 }
875 }
876
877 fn consolidate_updates(&mut self) -> Option<usize> {
879 use crate::consolidation;
880 consolidation::consolidate(&mut self.update_stash);
881 if !self.update_stash.is_empty() {
882 if self.update_stash.len() == 1 && self.update_stash.last() == self.result.updates.last() {
885 self.update_stash.clear();
887 self.singletons += 1;
888 }
889 else {
890 for item in self.update_stash.drain(..) {
892 self.result.updates.push(item);
893 }
894 }
895 Some(self.result.updates.len())
896 } else {
897 None
898 }
899 }
900 }
901
902 pub struct OrdKeyCursor<L: Layout> {
904 key_cursor: usize,
906 val_stepped: bool,
908 phantom: PhantomData<L>,
910 }
911
912 impl<L: Layout> Cursor for OrdKeyCursor<L> {
913
914 type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
915 type KeyOwned = <L::Target as Update>::Key;
916 type Val<'a> = &'a ();
917 type ValOwned = ();
918 type Time = <L::Target as Update>::Time;
919 type Diff = <L::Target as Update>::Diff;
920
921 type Storage = OrdKeyBatch<L>;
922
923 fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
924 fn val<'a>(&self, _storage: &'a Self::Storage) -> &'a () { &() }
925 fn map_times<L2: FnMut(&Self::Time, &Self::Diff)>(&mut self, storage: &Self::Storage, mut logic: L2) {
926 let (lower, upper) = storage.storage.updates_for_key(self.key_cursor);
927 for index in lower .. upper {
928 let (time, diff) = &storage.storage.updates.index(index);
929 logic(time, diff);
930 }
931 }
932 fn key_valid(&self, storage: &Self::Storage) -> bool { self.key_cursor < storage.storage.keys.len() }
933 fn val_valid(&self, _storage: &Self::Storage) -> bool { !self.val_stepped }
934 fn step_key(&mut self, storage: &Self::Storage){
935 self.key_cursor += 1;
936 if self.key_valid(storage) {
937 self.rewind_vals(storage);
938 }
939 else {
940 self.key_cursor = storage.storage.keys.len();
941 }
942 }
943 fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
944 self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| x.lt(&key));
945 if self.key_valid(storage) {
946 self.rewind_vals(storage);
947 }
948 }
949 fn step_val(&mut self, _storage: &Self::Storage) {
950 self.val_stepped = true;
951 }
952 fn seek_val(&mut self, _storage: &Self::Storage, _val: Self::Val<'_>) { }
953 fn rewind_keys(&mut self, storage: &Self::Storage) {
954 self.key_cursor = 0;
955 if self.key_valid(storage) {
956 self.rewind_vals(storage)
957 }
958 }
959 fn rewind_vals(&mut self, _storage: &Self::Storage) {
960 self.val_stepped = false;
961 }
962 }
963
964 pub struct OrdKeyBuilder<L: Layout> {
966 result: OrdKeyStorage<L>,
967 singleton: Option<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
968 singletons: usize,
973 }
974
975 impl<L: Layout> OrdKeyBuilder<L> {
976 fn push_update(&mut self, time: <L::Target as Update>::Time, diff: <L::Target as Update>::Diff) {
988 if self.result.updates.last().map(|(t, d)| t == &time && d == &diff) == Some(true) {
990 assert!(self.singleton.is_none());
991 self.singleton = Some((time, diff));
992 }
993 else {
994 if let Some(time_diff) = self.singleton.take() {
996 self.result.updates.push(time_diff);
997 }
998 self.result.updates.push((time, diff));
999 }
1000 }
1001 }
1002
1003 impl<L: Layout> Builder for OrdKeyBuilder<L> {
1004
1005 type Item = ((<L::Target as Update>::Key, ()), <L::Target as Update>::Time, <L::Target as Update>::Diff);
1006 type Time = <L::Target as Update>::Time;
1007 type Output = OrdKeyBatch<L>;
1008
1009 fn with_capacity(keys: usize, _vals: usize, upds: usize) -> Self {
1010 Self {
1012 result: OrdKeyStorage {
1013 keys: L::KeyContainer::with_capacity(keys),
1014 keys_offs: OffsetList::with_capacity(keys + 1),
1015 updates: L::UpdContainer::with_capacity(upds),
1016 },
1017 singleton: None,
1018 singletons: 0,
1019 }
1020 }
1021
1022 #[inline]
1023 fn push(&mut self, ((key, ()), time, diff): Self::Item) {
1024
1025 if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) {
1027 self.push_update(time, diff);
1028 } else {
1029 self.result.keys_offs.push(self.result.updates.len());
1031 if self.singleton.take().is_some() { self.singletons += 1; }
1033 self.push_update(time, diff);
1034 self.result.keys.push(key);
1035 }
1036 }
1037
1038 #[inline]
1039 fn copy(&mut self, ((key, ()), time, diff): &Self::Item) {
1040
1041 if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) {
1043 self.push_update(time.clone(), diff.clone());
1044 } else {
1045 self.result.keys_offs.push(self.result.updates.len());
1047 if self.singleton.take().is_some() { self.singletons += 1; }
1049 self.push_update(time.clone(), diff.clone());
1050 self.result.keys.copy_push(key);
1051 }
1052 }
1053
1054 #[inline(never)]
1055 fn done(mut self, lower: Antichain<Self::Time>, upper: Antichain<Self::Time>, since: Antichain<Self::Time>) -> OrdKeyBatch<L> {
1056 self.result.keys_offs.push(self.result.updates.len());
1058 if self.singleton.take().is_some() { self.singletons += 1; }
1060 OrdKeyBatch {
1061 updates: self.result.updates.len() + self.singletons,
1062 storage: self.result,
1063 description: Description::new(lower, upper, since),
1064 }
1065 }
1066 }
1067
1068}