1use std::cmp::Ordering;
9use std::rc::Rc;
10
11use serde::{Deserialize, Serialize};
12
13use crate::containers::TimelyStack;
14use crate::trace::implementations::chunker::{ColumnationChunker, VecChunker};
15use crate::trace::implementations::merge_batcher::{ColMerger, MergeBatcher, VecMerger};
16use crate::trace::implementations::spine_fueled::Spine;
17use crate::trace::rc_blanket_impls::RcBuilder;
18use crate::Hashable;
19
20use super::{Layout, TStack, Vector};
21
22use self::val_batch::{RhhValBatch, RhhValBuilder};
23
/// A `Spine` of `Rc`-shared `RhhValBatch`es using the `Vector` layout for `((K, V), T, R)` updates.
pub type VecSpine<K, V, T, R> = Spine<Rc<RhhValBatch<Vector<((K, V), T, R)>>>>;
/// A `MergeBatcher` over `Vec` chunks of `((K, V), T, R)` updates, using `VecChunker` and `VecMerger`.
pub type VecBatcher<K, V, T, R> =
    MergeBatcher<Vec<((K, V), T, R)>, VecChunker<((K, V), T, R)>, VecMerger<(K, V), T, R>>;
/// An `RcBuilder` around an `RhhValBuilder` that reads `Vec` chunks and uses the `Vector` layout.
pub type VecBuilder<K, V, T, R> =
    RcBuilder<RhhValBuilder<Vector<((K, V), T, R)>, Vec<((K, V), T, R)>>>;
32
/// A `Spine` of `Rc`-shared `RhhValBatch`es using the `TStack` layout for `((K, V), T, R)` updates.
pub type ColSpine<K, V, T, R> = Spine<Rc<RhhValBatch<TStack<((K, V), T, R)>>>>;
/// A `MergeBatcher` fed `Vec`s of `((K, V), T, R)` updates, using `ColumnationChunker` and `ColMerger`.
pub type ColBatcher<K, V, T, R> =
    MergeBatcher<Vec<((K, V), T, R)>, ColumnationChunker<((K, V), T, R)>, ColMerger<(K, V), T, R>>;
/// An `RcBuilder` around an `RhhValBuilder` that reads `TimelyStack` chunks and uses the `TStack` layout.
pub type ColBuilder<K, V, T, R> =
    RcBuilder<RhhValBuilder<TStack<((K, V), T, R)>, TimelyStack<((K, V), T, R)>>>;
44
/// A marker trait layered on top of [`Hashable`]; it declares no methods of its own.
///
/// NOTE(review): presumably implementors promise that their ordering agrees with the ordering
/// of their `hashed()` values (which is how `HashWrapper` orders itself) — confirm with the
/// trait's users before relying on it.
pub trait HashOrdered: Hashable {}

/// A reference to a hash-ordered (and `Hash`) type is itself hash ordered.
impl<'a, T: std::hash::Hash + HashOrdered> HashOrdered for &'a T {}
52
/// A wrapper around a hashable value whose ordering compares hashes before values.
///
/// The `PartialOrd`/`Ord` implementations for this type compare `(hash, value)` pairs, so
/// sorting wrapped values sorts them primarily by `hashed()`.
#[derive(Copy, Clone, Eq, PartialEq, Debug, Default, Serialize, Deserialize)]
pub struct HashWrapper<T: std::hash::Hash + Hashable> {
    /// The wrapped value.
    pub inner: T,
}
59
60impl<T: PartialOrd + std::hash::Hash + Hashable<Output: PartialOrd>> PartialOrd for HashWrapper<T> {
61 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
62 let this_hash = self.inner.hashed();
63 let that_hash = other.inner.hashed();
64 (this_hash, &self.inner).partial_cmp(&(that_hash, &other.inner))
65 }
66}
67
68impl<T: Ord + PartialOrd + std::hash::Hash + Hashable<Output: PartialOrd>> Ord for HashWrapper<T> {
69 fn cmp(&self, other: &Self) -> Ordering {
70 self.partial_cmp(other).unwrap()
71 }
72}
73
/// Marks `HashWrapper` as hash ordered; its `PartialOrd`/`Ord` impls compare hashes first.
impl<T: std::hash::Hash + Hashable> HashOrdered for HashWrapper<T> {}
75
76impl<T: std::hash::Hash + Hashable> Hashable for HashWrapper<T> {
77 type Output = T::Output;
78 fn hashed(&self) -> Self::Output {
79 self.inner.hashed()
80 }
81}
82
/// Marks references to `HashWrapper` as hash ordered as well.
impl<T: std::hash::Hash + Hashable> HashOrdered for &HashWrapper<T> {}
84
85impl<T: std::hash::Hash + Hashable> Hashable for &HashWrapper<T> {
86 type Output = T::Output;
87 fn hashed(&self) -> Self::Output {
88 self.inner.hashed()
89 }
90}
91
92mod val_batch {
93
94 use serde::{Deserialize, Serialize};
95 use std::convert::TryInto;
96 use std::marker::PhantomData;
97 use timely::container::PushInto;
98 use timely::progress::{frontier::AntichainRef, Antichain};
99
100 use crate::hashable::Hashable;
101 use crate::trace::implementations::layout;
102 use crate::trace::implementations::{BatchContainer, BuilderInput};
103 use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
104
105 use super::{HashOrdered, Layout};
106
    /// The columnar storage behind an `RhhValBatch`.
    ///
    /// Keys are placed at, or after, the position suggested by their hash (see
    /// `desired_location`); positions that no real key occupies hold `Default` placeholder
    /// keys with an empty range of values.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct RhhValStorage<L: Layout>
    where
        layout::Key<L>: Default + HashOrdered,
    {
        /// The key capacity this storage was created for; the constructors in this module
        /// derive `divisor` from the same number.
        pub key_capacity: usize,
        /// A key's desired position is its hash divided by this value. Zero (used for
        /// capacities of zero and one) sends every key to position zero.
        pub divisor: u64,
        /// The number of keys added through `insert_key`; placeholder keys are not counted.
        pub key_count: usize,

        /// Keys in position order, including `Default` placeholders.
        pub keys: L::KeyContainer,
        /// Offsets into `vals`: the values of the key at position `i` occupy
        /// `keys_offs[i] .. keys_offs[i + 1]`. Placeholders repeat the preceding offset and so
        /// have an empty range.
        pub keys_offs: L::OffsetContainer,
        /// Values, grouped by key as described by `keys_offs`.
        pub vals: L::ValContainer,
        /// Offsets into `times` and `diffs`: the updates of the value at position `i` occupy
        /// `vals_offs[i] .. vals_offs[i + 1]`. An empty range stands for a single update equal
        /// to the one stored immediately before it (see `updates_for_value`).
        pub vals_offs: L::OffsetContainer,
        /// Update times, grouped by value as described by `vals_offs`.
        pub times: L::TimeContainer,
        /// Update differences, stored in lockstep with `times`.
        pub diffs: L::DiffContainer,
    }
158
159 impl<L: Layout> RhhValStorage<L>
160 where
161 layout::Key<L>: Default + HashOrdered,
162 for<'a> layout::KeyRef<'a, L>: HashOrdered,
163 {
164 fn values_for_key(&self, index: usize) -> (usize, usize) {
166 let lower = self.keys_offs.index(index);
167 let upper = self.keys_offs.index(index + 1);
168 assert!(lower < upper, "{:?} v {:?} at {:?}", lower, upper, index);
170 (lower, upper)
171 }
172 fn updates_for_value(&self, index: usize) -> (usize, usize) {
174 let mut lower = self.vals_offs.index(index);
175 let upper = self.vals_offs.index(index + 1);
176 if lower == upper {
179 assert!(lower > 0);
180 lower -= 1;
181 }
182 (lower, upper)
183 }
184
185 fn insert_key(&mut self, key: layout::KeyRef<'_, L>, offset: Option<usize>) {
195 let desired = self.desired_location(&key);
196 while self.keys.len() < desired {
199 let current_offset = self.keys_offs.index(self.keys.len());
201 self.keys.push_own(&<layout::Key<L> as Default>::default());
202 self.keys_offs.push_ref(current_offset);
203 }
204
205 self.keys.push_ref(key);
208 if let Some(offset) = offset {
209 self.keys_offs.push_ref(offset);
210 }
211 self.key_count += 1;
212 }
213
214 fn insert_key_own(&mut self, key: &layout::Key<L>, offset: Option<usize>) {
216 let mut key_con = L::KeyContainer::with_capacity(1);
217 key_con.push_own(&key);
218 self.insert_key(key_con.index(0), offset)
219 }
220
221 fn desired_location<K: Hashable>(&self, key: &K) -> usize {
223 if self.divisor == 0 {
224 0
225 } else {
226 (key.hashed().into() / self.divisor)
227 .try_into()
228 .expect("divisor not large enough to force u64 into uisze")
229 }
230 }
231
232 fn advance_key(&self, index: usize, key: layout::KeyRef<'_, L>) -> bool {
234 !self.live_key(index)
236 || self
237 .keys
238 .index(index)
239 .lt(&<L::KeyContainer as BatchContainer>::reborrow(key))
240 }
241
242 fn live_key(&self, index: usize) -> bool {
244 self.keys_offs.index(index) != self.keys_offs.index(index + 1)
245 }
246
247 fn advance_to_live_key(&self, index: &mut usize) {
249 while *index < self.keys.len() && !self.live_key(*index) {
250 *index += 1;
251 }
252 }
253
254 fn divisor_for_capacity(capacity: usize) -> u64 {
261 let capacity: u64 = capacity.try_into().expect("usize exceeds u64");
262 if capacity == 0 || capacity == 1 {
263 0
264 } else {
265 ((1 << 63) / capacity) << 1
266 }
267 }
268 }
269
    /// An immutable batch of updates stored in an `RhhValStorage`.
    #[derive(Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::ValContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct RhhValBatch<L: Layout>
    where
        layout::Key<L>: Default + HashOrdered,
    {
        /// The keys, values, times and differences of the batch.
        pub storage: RhhValStorage<L>,
        /// The lower, upper and since frontiers describing the batch.
        pub description: Description<layout::Time<L>>,
        /// The number of updates in the batch. The builder and merger set this to the number
        /// of stored times plus the number of elided singleton updates.
        pub updates: usize,
    }
297
    /// Associates the batch with its layout `L`.
    impl<L: Layout> WithLayout for RhhValBatch<L>
    where
        layout::Key<L>: Default + HashOrdered,
        for<'a> layout::KeyRef<'a, L>: HashOrdered,
    {
        type Layout = L;
    }
305
306 impl<L: Layout> BatchReader for RhhValBatch<L>
307 where
308 layout::Key<L>: Default + HashOrdered,
309 for<'a> layout::KeyRef<'a, L>: HashOrdered,
310 {
311 type Cursor = RhhValCursor<L>;
312 fn cursor(&self) -> Self::Cursor {
313 let mut cursor = RhhValCursor {
314 key_cursor: 0,
315 val_cursor: 0,
316 phantom: std::marker::PhantomData,
317 };
318 cursor.step_key(self);
319 cursor
320 }
321 fn len(&self) -> usize {
322 self.updates
325 }
326 fn description(&self) -> &Description<layout::Time<L>> {
327 &self.description
328 }
329 }
330
331 impl<L: Layout> Batch for RhhValBatch<L>
332 where
333 layout::Key<L>: Default + HashOrdered,
334 for<'a> layout::KeyRef<'a, L>: HashOrdered,
335 {
336 type Merger = RhhValMerger<L>;
337
338 fn begin_merge(
339 &self,
340 other: &Self,
341 compaction_frontier: AntichainRef<layout::Time<L>>,
342 ) -> Self::Merger {
343 RhhValMerger::new(self, other, compaction_frontier)
344 }
345
346 fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
347 use timely::progress::Timestamp;
348 Self {
349 storage: RhhValStorage {
350 keys: L::KeyContainer::with_capacity(0),
351 keys_offs: L::OffsetContainer::with_capacity(0),
352 vals: L::ValContainer::with_capacity(0),
353 vals_offs: L::OffsetContainer::with_capacity(0),
354 times: L::TimeContainer::with_capacity(0),
355 diffs: L::DiffContainer::with_capacity(0),
356 key_count: 0,
357 key_capacity: 0,
358 divisor: 0,
359 },
360 description: Description::new(
361 lower,
362 upper,
363 Antichain::from_elem(Self::Time::minimum()),
364 ),
365 updates: 0,
366 }
367 }
368 }
369
    /// State for an in-progress, fuel-limited merge of two `RhhValBatch`es.
    pub struct RhhValMerger<L: Layout>
    where
        layout::Key<L>: Default + HashOrdered,
    {
        /// Position of the next key to consider in the first source batch.
        key_cursor1: usize,
        /// Position of the next key to consider in the second source batch.
        key_cursor2: usize,
        /// The storage being assembled for the merged batch.
        result: RhhValStorage<L>,
        /// The description of the merged batch.
        description: Description<layout::Time<L>>,

        /// Scratch space holding the updates of the value currently being merged, reused
        /// across values.
        update_stash: Vec<(layout::Time<L>, layout::Diff<L>)>,
        /// The number of singleton updates elided from `result` (see `consolidate_updates`).
        singletons: usize,
    }
392
393 impl<L: Layout> Merger<RhhValBatch<L>> for RhhValMerger<L>
394 where
395 layout::Key<L>: Default + HashOrdered,
396 RhhValBatch<L>: Batch<Time = layout::Time<L>>,
397 for<'a> layout::KeyRef<'a, L>: HashOrdered,
398 {
399 fn new(
400 batch1: &RhhValBatch<L>,
401 batch2: &RhhValBatch<L>,
402 compaction_frontier: AntichainRef<layout::Time<L>>,
403 ) -> Self {
404 assert!(batch1.upper() == batch2.lower());
405 use crate::lattice::Lattice;
406 let mut since = batch1
407 .description()
408 .since()
409 .join(batch2.description().since());
410 since = since.join(&compaction_frontier.to_owned());
411
412 let description =
413 Description::new(batch1.lower().clone(), batch2.upper().clone(), since);
414
415 let max_cap = batch1.len() + batch2.len();
418 let rhh_cap = 2 * max_cap;
419
420 let batch1 = &batch1.storage;
421 let batch2 = &batch2.storage;
422
423 let mut storage = RhhValStorage {
424 keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
425 keys_offs: L::OffsetContainer::with_capacity(
426 batch1.keys_offs.len() + batch2.keys_offs.len(),
427 ),
428 vals: L::ValContainer::merge_capacity(&batch1.vals, &batch2.vals),
429 vals_offs: L::OffsetContainer::with_capacity(
430 batch1.vals_offs.len() + batch2.vals_offs.len(),
431 ),
432 times: L::TimeContainer::merge_capacity(&batch1.times, &batch2.times),
433 diffs: L::DiffContainer::merge_capacity(&batch1.diffs, &batch2.diffs),
434 key_count: 0,
435 key_capacity: rhh_cap,
436 divisor: RhhValStorage::<L>::divisor_for_capacity(rhh_cap),
437 };
438
439 let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs;
441 keys_offs.push_ref(0);
442 let vals_offs: &mut L::OffsetContainer = &mut storage.vals_offs;
443 vals_offs.push_ref(0);
444
445 RhhValMerger {
446 key_cursor1: 0,
447 key_cursor2: 0,
448 result: storage,
449 description,
450 update_stash: Vec::new(),
451 singletons: 0,
452 }
453 }
454 fn done(self) -> RhhValBatch<L> {
455 RhhValBatch {
456 updates: self.result.times.len() + self.singletons,
457 storage: self.result,
458 description: self.description,
459 }
460 }
461 fn work(&mut self, source1: &RhhValBatch<L>, source2: &RhhValBatch<L>, fuel: &mut isize) {
462 let starting_updates = self.result.times.len();
464 let mut effort = 0isize;
465
466 source1.storage.advance_to_live_key(&mut self.key_cursor1);
467 source2.storage.advance_to_live_key(&mut self.key_cursor2);
468
469 while self.key_cursor1 < source1.storage.keys.len()
471 && self.key_cursor2 < source2.storage.keys.len()
472 && effort < *fuel
473 {
474 self.merge_key(&source1.storage, &source2.storage);
475 source1.storage.advance_to_live_key(&mut self.key_cursor1);
476 source2.storage.advance_to_live_key(&mut self.key_cursor2);
477 effort = (self.result.times.len() - starting_updates) as isize;
479 }
480
481 while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
484 self.copy_key(&source1.storage, self.key_cursor1);
485 self.key_cursor1 += 1;
486 source1.storage.advance_to_live_key(&mut self.key_cursor1);
487 effort = (self.result.times.len() - starting_updates) as isize;
488 }
489 while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
490 self.copy_key(&source2.storage, self.key_cursor2);
491 self.key_cursor2 += 1;
492 source2.storage.advance_to_live_key(&mut self.key_cursor2);
493 effort = (self.result.times.len() - starting_updates) as isize;
494 }
495
496 *fuel -= effort;
497 }
498 }
499
    // Helper methods used by the `Merger` implementation.
    impl<L: Layout> RhhValMerger<L>
    where
        layout::Key<L>: Default + HashOrdered,
        for<'a> layout::KeyRef<'a, L>: HashOrdered,
    {
        /// Copies the key at position `cursor` of `source`, with its values and updates,
        /// into the result.
        ///
        /// The updates of each value are advanced and consolidated first; values whose
        /// updates cancel are dropped, and the key is only inserted if a value survives.
        /// The caller is responsible for advancing its own cursor.
        fn copy_key(&mut self, source: &RhhValStorage<L>, cursor: usize) {
            // Remember the value count so we can tell whether anything was written.
            let init_vals = self.result.vals.len();
            let (mut lower, upper) = source.values_for_key(cursor);
            while lower < upper {
                self.stash_updates_for_val(source, lower);
                if let Some(off) = self.consolidate_updates() {
                    self.result.vals_offs.push_ref(off);
                    self.result.vals.push_ref(source.vals.index(lower));
                }
                lower += 1;
            }

            // Only record the key if at least one of its values was written.
            if self.result.vals.len() > init_vals {
                self.result
                    .insert_key(source.keys.index(cursor), Some(self.result.vals.len()));
            }
        }
        /// Processes the smaller of the two keys under the cursors, advancing the cursor(s)
        /// it consumed.
        ///
        /// Distinct keys are copied from the source holding the lesser one; equal keys have
        /// their values merged and advance both cursors.
        fn merge_key(&mut self, source1: &RhhValStorage<L>, source2: &RhhValStorage<L>) {
            use ::std::cmp::Ordering;
            match source1
                .keys
                .index(self.key_cursor1)
                .cmp(&source2.keys.index(self.key_cursor2))
            {
                Ordering::Less => {
                    self.copy_key(source1, self.key_cursor1);
                    self.key_cursor1 += 1;
                }
                Ordering::Equal => {
                    // The keys match: merge their value ranges.
                    let (lower1, upper1) = source1.values_for_key(self.key_cursor1);
                    let (lower2, upper2) = source2.values_for_key(self.key_cursor2);
                    if let Some(off) =
                        self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2))
                    {
                        self.result
                            .insert_key(source1.keys.index(self.key_cursor1), Some(off));
                    }
                    // Both keys are consumed whether or not any value survived.
                    self.key_cursor1 += 1;
                    self.key_cursor2 += 1;
                }
                Ordering::Greater => {
                    self.copy_key(source2, self.key_cursor2);
                    self.key_cursor2 += 1;
                }
            }
        }
        /// Merges two ranges of values, one from each source, into the result.
        ///
        /// Values are visited in order; equal values have their updates combined before
        /// consolidation. Returns the new length of the result's values if any value was
        /// written, and `None` otherwise.
        fn merge_vals(
            &mut self,
            (source1, mut lower1, upper1): (&RhhValStorage<L>, usize, usize),
            (source2, mut lower2, upper2): (&RhhValStorage<L>, usize, usize),
        ) -> Option<usize> {
            // Remember the value count so we can tell whether anything was written.
            let init_vals = self.result.vals.len();
            while lower1 < upper1 && lower2 < upper2 {
                use ::std::cmp::Ordering;
                match source1.vals.index(lower1).cmp(&source2.vals.index(lower2)) {
                    Ordering::Less => {
                        // Only the first source has this value.
                        self.stash_updates_for_val(source1, lower1);
                        if let Some(off) = self.consolidate_updates() {
                            self.result.vals_offs.push_ref(off);
                            self.result.vals.push_ref(source1.vals.index(lower1));
                        }
                        lower1 += 1;
                    }
                    Ordering::Equal => {
                        // Both sources have this value: combine their updates.
                        self.stash_updates_for_val(source1, lower1);
                        self.stash_updates_for_val(source2, lower2);
                        if let Some(off) = self.consolidate_updates() {
                            self.result.vals_offs.push_ref(off);
                            self.result.vals.push_ref(source1.vals.index(lower1));
                        }
                        lower1 += 1;
                        lower2 += 1;
                    }
                    Ordering::Greater => {
                        // Only the second source has this value.
                        self.stash_updates_for_val(source2, lower2);
                        if let Some(off) = self.consolidate_updates() {
                            self.result.vals_offs.push_ref(off);
                            self.result.vals.push_ref(source2.vals.index(lower2));
                        }
                        lower2 += 1;
                    }
                }
            }
            // Drain whichever range still has values.
            while lower1 < upper1 {
                self.stash_updates_for_val(source1, lower1);
                if let Some(off) = self.consolidate_updates() {
                    self.result.vals_offs.push_ref(off);
                    self.result.vals.push_ref(source1.vals.index(lower1));
                }
                lower1 += 1;
            }
            while lower2 < upper2 {
                self.stash_updates_for_val(source2, lower2);
                if let Some(off) = self.consolidate_updates() {
                    self.result.vals_offs.push_ref(off);
                    self.result.vals.push_ref(source2.vals.index(lower2));
                }
                lower2 += 1;
            }

            // Report the new upper bound only if a value was written.
            if self.result.vals.len() > init_vals {
                Some(self.result.vals.len())
            } else {
                None
            }
        }

        /// Appends the updates of the value at position `index` of `source` to
        /// `update_stash`, advancing each time by the merged batch's since frontier.
        fn stash_updates_for_val(&mut self, source: &RhhValStorage<L>, index: usize) {
            let (lower, upper) = source.updates_for_value(index);
            for i in lower..upper {
                let time = source.times.index(i);
                let diff = source.diffs.index(i);
                let mut new_time = L::TimeContainer::into_owned(time);
                use crate::lattice::Lattice;
                new_time.advance_by(self.description.since().borrow());
                self.update_stash
                    .push((new_time, L::DiffContainer::into_owned(diff)));
            }
        }

        /// Consolidates `update_stash` and moves what remains into the result.
        ///
        /// Returns the new length of the result's times if any update remains, and `None`
        /// if the stash consolidated to nothing. A single remaining update equal to the last
        /// `(time, diff)` already stored is not written again: it is counted in `singletons`
        /// instead, so the returned offset repeats the previous one.
        fn consolidate_updates(&mut self) -> Option<usize> {
            use crate::consolidation;
            consolidation::consolidate(&mut self.update_stash);
            if !self.update_stash.is_empty() {
                // Compare the last stashed update with the last update already stored.
                let time_diff = self.result.times.last().zip(self.result.diffs.last());
                let last_eq =
                    self.update_stash
                        .last()
                        .zip(time_diff)
                        .map(|((t1, d1), (t2, d2))| {
                            *t1 == L::TimeContainer::into_owned(t2)
                                && *d1 == L::DiffContainer::into_owned(d2)
                        });
                if self.update_stash.len() == 1 && last_eq.unwrap_or(false) {
                    // Elide the repeated update; the empty range stands in for it.
                    self.update_stash.clear();
                    self.singletons += 1;
                } else {
                    // Write the stashed updates out, leaving the stash empty for reuse.
                    for (time, diff) in self.update_stash.drain(..) {
                        self.result.times.push_own(&time);
                        self.result.diffs.push_own(&diff);
                    }
                }
                Some(self.result.times.len())
            } else {
                None
            }
        }
    }
689
    /// A cursor over the keys and values of an `RhhValBatch`.
    ///
    /// The cursor stores positions only; the batch is passed to each method.
    pub struct RhhValCursor<L: Layout>
    where
        layout::Key<L>: Default + HashOrdered,
    {
        /// Position of the current key in the batch's `keys`.
        key_cursor: usize,
        /// Position of the current value in the batch's `vals`.
        val_cursor: usize,
        /// Ties the cursor to its layout without storing any layout data.
        phantom: PhantomData<L>,
    }
708
709 use crate::trace::implementations::WithLayout;
    /// Associates the cursor with its layout `L`.
    impl<L: Layout> WithLayout for RhhValCursor<L>
    where
        layout::Key<L>: Default + HashOrdered,
        for<'a> layout::KeyRef<'a, L>: HashOrdered,
    {
        type Layout = L;
    }
717
718 impl<L: Layout> Cursor for RhhValCursor<L>
719 where
720 layout::Key<L>: Default + HashOrdered,
721 for<'a> layout::KeyRef<'a, L>: HashOrdered,
722 {
723 type Storage = RhhValBatch<L>;
724
725 fn get_key<'a>(&self, storage: &'a RhhValBatch<L>) -> Option<Self::Key<'a>> {
726 storage.storage.keys.get(self.key_cursor)
727 }
728 fn get_val<'a>(&self, storage: &'a RhhValBatch<L>) -> Option<Self::Val<'a>> {
729 if self.val_valid(storage) {
730 storage.storage.vals.get(self.val_cursor)
731 } else {
732 None
733 }
734 }
735 fn key<'a>(&self, storage: &'a RhhValBatch<L>) -> Self::Key<'a> {
736 storage.storage.keys.index(self.key_cursor)
737 }
738 fn val<'a>(&self, storage: &'a RhhValBatch<L>) -> Self::Val<'a> {
739 storage.storage.vals.index(self.val_cursor)
740 }
741 fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(
742 &mut self,
743 storage: &RhhValBatch<L>,
744 mut logic: L2,
745 ) {
746 let (lower, upper) = storage.storage.updates_for_value(self.val_cursor);
747 for index in lower..upper {
748 let time = storage.storage.times.index(index);
749 let diff = storage.storage.diffs.index(index);
750 logic(time, diff);
751 }
752 }
753 fn key_valid(&self, storage: &RhhValBatch<L>) -> bool {
754 self.key_cursor < storage.storage.keys.len()
755 }
756 fn val_valid(&self, storage: &RhhValBatch<L>) -> bool {
757 self.val_cursor < storage.storage.values_for_key(self.key_cursor).1
758 }
759 fn step_key(&mut self, storage: &RhhValBatch<L>) {
760 self.key_cursor += 1;
762 storage.storage.advance_to_live_key(&mut self.key_cursor);
763
764 if self.key_valid(storage) {
765 self.rewind_vals(storage);
766 } else {
767 self.key_cursor = storage.storage.keys.len();
768 }
769 }
770 fn seek_key(&mut self, storage: &RhhValBatch<L>, key: Self::Key<'_>) {
771 let desired = storage.storage.desired_location(&key);
773 if self.key_cursor < desired {
775 self.key_cursor = desired;
776 }
777 while self.key_valid(storage) && storage.storage.advance_key(self.key_cursor, key) {
781 self.key_cursor += 1;
784 }
785
786 if self.key_valid(storage) {
787 self.rewind_vals(storage);
788 }
789 }
790 fn step_val(&mut self, storage: &RhhValBatch<L>) {
791 self.val_cursor += 1;
792 if !self.val_valid(storage) {
793 self.val_cursor = storage.storage.values_for_key(self.key_cursor).1;
794 }
795 }
796 fn seek_val(&mut self, storage: &RhhValBatch<L>, val: Self::Val<'_>) {
797 self.val_cursor += storage.storage.vals.advance(
798 self.val_cursor,
799 storage.storage.values_for_key(self.key_cursor).1,
800 |x| {
801 <L::ValContainer as BatchContainer>::reborrow(x)
802 .lt(&<L::ValContainer as BatchContainer>::reborrow(val))
803 },
804 );
805 }
806 fn rewind_keys(&mut self, storage: &RhhValBatch<L>) {
807 self.key_cursor = 0;
808 storage.storage.advance_to_live_key(&mut self.key_cursor);
809
810 if self.key_valid(storage) {
811 self.rewind_vals(storage)
812 }
813 }
814 fn rewind_vals(&mut self, storage: &RhhValBatch<L>) {
815 self.val_cursor = storage.storage.values_for_key(self.key_cursor).0;
816 }
817 }
818
    /// A builder that assembles an `RhhValBatch` from input chunks of type `CI`.
    pub struct RhhValBuilder<L: Layout, CI>
    where
        layout::Key<L>: Default + HashOrdered,
    {
        /// The storage being assembled.
        result: RhhValStorage<L>,
        /// An update withheld from `result` because it equals the last stored `(time, diff)`.
        /// It is either written out when more updates follow for the same value, or elided
        /// and counted in `singletons` when the value ends.
        singleton: Option<(layout::Time<L>, layout::Diff<L>)>,
        /// The number of updates elided from `result` so far.
        singletons: usize,
        /// Ties the builder to its input container type without storing one.
        _marker: PhantomData<CI>,
    }
833
834 impl<L: Layout, CI> RhhValBuilder<L, CI>
835 where
836 layout::Key<L>: Default + HashOrdered,
837 {
838 fn push_update(&mut self, time: layout::Time<L>, diff: layout::Diff<L>) {
850 if self
853 .result
854 .times
855 .last()
856 .map(|t| L::TimeContainer::into_owned(t) == time)
857 .unwrap_or(false)
858 && self
859 .result
860 .diffs
861 .last()
862 .map(|d| L::DiffContainer::into_owned(d) == diff)
863 .unwrap_or(false)
864 {
865 assert!(self.singleton.is_none());
866 self.singleton = Some((time, diff));
867 } else {
868 if let Some((time, diff)) = self.singleton.take() {
870 self.result.times.push_own(&time);
871 self.result.diffs.push_own(&diff);
872 }
873 self.result.times.push_own(&time);
874 self.result.diffs.push_own(&diff);
875 }
876 }
877 }
878
    impl<L: Layout, CI> Builder for RhhValBuilder<L, CI>
    where
        layout::Key<L>: Default + HashOrdered,
        CI: for<'a> BuilderInput<
            L::KeyContainer,
            L::ValContainer,
            Key<'a> = layout::Key<L>,
            Time = layout::Time<L>,
            Diff = layout::Diff<L>,
        >,
        for<'a> L::ValContainer: PushInto<CI::Val<'a>>,
        for<'a> layout::KeyRef<'a, L>: HashOrdered,
    {
        type Input = CI;
        type Time = layout::Time<L>;
        type Output = RhhValBatch<L>;

        /// Creates a builder sized for the given numbers of keys, values and updates.
        ///
        /// The hash capacity is twice the number of keys, and the divisor is derived from it.
        fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
            // Double the capacity to leave room for hashing.
            let rhh_capacity = 2 * keys;
            let divisor = RhhValStorage::<L>::divisor_for_capacity(rhh_capacity);
            // NOTE(review): the extra 10 slots presumably leave headroom for keys pushed past
            // `rhh_capacity` by contention — confirm.
            let keys = rhh_capacity + 10;

            Self {
                result: RhhValStorage {
                    keys: L::KeyContainer::with_capacity(keys),
                    keys_offs: L::OffsetContainer::with_capacity(keys + 1),
                    vals: L::ValContainer::with_capacity(vals),
                    vals_offs: L::OffsetContainer::with_capacity(vals + 1),
                    times: L::TimeContainer::with_capacity(upds),
                    diffs: L::DiffContainer::with_capacity(upds),
                    key_count: 0,
                    key_capacity: rhh_capacity,
                    divisor,
                },
                singleton: None,
                singletons: 0,
                _marker: PhantomData,
            }
        }

        /// Drains `chunk` into the builder.
        ///
        /// Each item is compared with the last key and value already pushed to decide
        /// whether it continues the current value, starts a new value for the current key,
        /// or starts a new key.
        #[inline]
        fn push(&mut self, chunk: &mut Self::Input) {
            for item in chunk.drain() {
                let (key, val, time, diff) = CI::into_parts(item);
                // Does the item belong to the key pushed most recently?
                if self
                    .result
                    .keys
                    .last()
                    .map(|k| CI::key_eq(&key, k))
                    .unwrap_or(false)
                {
                    // Does the item belong to the value pushed most recently?
                    if self
                        .result
                        .vals
                        .last()
                        .map(|v| CI::val_eq(&val, v))
                        .unwrap_or(false)
                    {
                        self.push_update(time, diff);
                    } else {
                        // New value: close the update range of the previous value.
                        self.result.vals_offs.push_ref(self.result.times.len());
                        // A still-withheld update is elided; count it.
                        if self.singleton.take().is_some() {
                            self.singletons += 1;
                        }
                        self.push_update(time, diff);
                        self.result.vals.push_into(val);
                    }
                } else {
                    // New key: close the update range of the previous value. For the first
                    // item this pushes the initial zero offset.
                    self.result.vals_offs.push_ref(self.result.times.len());
                    // A still-withheld update is elided; count it.
                    if self.singleton.take().is_some() {
                        self.singletons += 1;
                    }
                    // Close the value range of the previous key, likewise zero at first.
                    self.result.keys_offs.push_ref(self.result.vals.len());
                    self.push_update(time, diff);
                    self.result.vals.push_into(val);
                    // The key's own upper bound is pushed when the next key starts or in `done`.
                    self.result.insert_key_own(&key, None);
                }
            }
        }

        /// Finishes the batch: closes the last value and key ranges and attaches `description`.
        #[inline(never)]
        fn done(mut self, description: Description<Self::Time>) -> RhhValBatch<L> {
            // Close the update range of the last value.
            self.result.vals_offs.push_ref(self.result.times.len());
            // A still-withheld update is elided; count it.
            if self.singleton.take().is_some() {
                self.singletons += 1;
            }
            // Close the value range of the last key.
            self.result.keys_offs.push_ref(self.result.vals.len());
            RhhValBatch {
                updates: self.result.times.len() + self.singletons,
                storage: self.result,
                description,
            }
        }

        /// Builds a batch from a whole chain of chunks, draining `chain` in the process.
        fn seal(
            chain: &mut Vec<Self::Input>,
            description: Description<Self::Time>,
        ) -> Self::Output {
            let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
            let mut builder = Self::with_capacity(keys, vals, upds);
            for mut chunk in chain.drain(..) {
                builder.push(&mut chunk);
            }

            builder.done(description)
        }
    }
998}
999
// This module currently defines no items.
// NOTE(review): presumably reserved for a key-only counterpart of `val_batch` — confirm.
mod key_batch {

}