1use std::rc::Rc;
9use std::cmp::Ordering;
10
11use serde::{Deserialize, Serialize};
12
13use crate::Hashable;
14use crate::containers::TimelyStack;
15use crate::trace::implementations::chunker::{ColumnationChunker, ContainerChunker};
16use crate::trace::implementations::merge_batcher::MergeBatcher;
17use crate::trace::implementations::merge_batcher::container::{VecInternalMerger, ColInternalMerger};
18use crate::trace::implementations::spine_fueled::Spine;
19use crate::trace::rc_blanket_impls::RcBuilder;
20
21use super::{Layout, Vector, TStack};
22
23use self::val_batch::{RhhValBatch, RhhValBuilder};
24
/// A trace of hash-ordered batches backed by `Vec` containers, shared via `Rc`.
pub type VecSpine<K, V, T, R> = Spine<Rc<RhhValBatch<Vector<((K,V),T,R)>>>>;
/// A batcher for `Vec`-backed updates, chunked in place and merged by `VecInternalMerger`.
pub type VecBatcher<K,V,T,R> = MergeBatcher<Vec<((K,V),T,R)>, ContainerChunker<Vec<((K,V),T,R)>>, VecInternalMerger<(K, V), T, R>>;
/// A builder producing `Rc`-wrapped, `Vec`-backed hash-ordered batches.
pub type VecBuilder<K,V,T,R> = RcBuilder<RhhValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;

/// A trace of hash-ordered batches backed by columnar `TimelyStack` containers.
pub type ColSpine<K, V, T, R> = Spine<Rc<RhhValBatch<TStack<((K,V),T,R)>>>>;
/// A batcher that chunks updates into columnar storage before merging.
pub type ColBatcher<K,V,T,R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColInternalMerger<(K,V),T,R>>;
/// A builder producing `Rc`-wrapped, columnar hash-ordered batches.
pub type ColBuilder<K,V,T,R> = RcBuilder<RhhValBuilder<TStack<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>;
41
/// Marker trait for types whose ordering is consistent with ordering by their hash first.
///
/// NOTE(review): the trait has no methods; implementors promise that comparisons sort
/// primarily by `Hashable::hashed()` (as `HashWrapper` below does) -- confirm this
/// contract before adding new implementations.
pub trait HashOrdered: Hashable { }

// References inherit the property from the referent.
impl<'a, T: std::hash::Hash + HashOrdered> HashOrdered for &'a T { }
49
/// Wraps a hashable value so that comparisons are performed hash-first.
///
/// The `PartialOrd`/`Ord` implementations in this file order wrappers by the pair
/// `(hashed(), inner)`, grouping equal hashes together while retaining a total
/// order on the inner values.
#[derive(Copy, Clone, Eq, PartialEq, Debug, Default, Serialize, Deserialize)]
pub struct HashWrapper<T: std::hash::Hash + Hashable> {
    // The wrapped value; its hash drives the primary sort order.
    pub inner: T
}
56
57impl<T: PartialOrd + std::hash::Hash + Hashable<Output: PartialOrd>> PartialOrd for HashWrapper<T> {
58 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
59 let this_hash = self.inner.hashed();
60 let that_hash = other.inner.hashed();
61 (this_hash, &self.inner).partial_cmp(&(that_hash, &other.inner))
62 }
63}
64
impl<T: Ord + PartialOrd + std::hash::Hash + Hashable<Output: PartialOrd>> Ord for HashWrapper<T> {
    /// Total order delegating to `partial_cmp` (hash first, then inner value).
    ///
    /// NOTE(review): this unwraps `partial_cmp`, so it panics if the hash type's
    /// `PartialOrd` is not total for the values compared -- confirm that all
    /// `Hashable::Output` types used here are totally ordered (e.g. integers).
    fn cmp(&self, other: &Self) -> Ordering {
        self.partial_cmp(other).unwrap()
    }
}
70
// `HashWrapper` compares hash-first by construction, so it satisfies `HashOrdered`.
impl<T: std::hash::Hash + Hashable> HashOrdered for HashWrapper<T> { }

/// Hashing a wrapper delegates to the wrapped value.
impl<T: std::hash::Hash + Hashable> Hashable for HashWrapper<T> {
    type Output = T::Output;
    fn hashed(&self) -> Self::Output { self.inner.hashed() }
}

// References to wrappers are likewise hash-ordered.
impl<T: std::hash::Hash + Hashable> HashOrdered for &HashWrapper<T> { }

/// Hashing a reference delegates to the referenced wrapper's inner value.
impl<T: std::hash::Hash + Hashable> Hashable for &HashWrapper<T> {
    type Output = T::Output;
    fn hashed(&self) -> Self::Output { self.inner.hashed() }
}
84
85mod val_batch {
86
87 use std::convert::TryInto;
88 use std::marker::PhantomData;
89 use serde::{Deserialize, Serialize};
90 use timely::container::PushInto;
91 use timely::progress::{Antichain, frontier::AntichainRef};
92
93 use crate::hashable::Hashable;
94 use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
95 use crate::trace::implementations::{BatchContainer, BuilderInput};
96 use crate::trace::implementations::layout;
97
98 use super::{Layout, HashOrdered};
99
    /// Storage for a batch whose keys are placed by hash value (RHH-style),
    /// padded with dead default keys so each key sits at or after its desired slot.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct RhhValStorage<L: Layout>
    where
        layout::Key<L>: Default + HashOrdered,
    {

        // Number of key slots the hash placement was sized for.
        pub key_capacity: usize,
        // Divisor mapping a key's u64 hash to its desired slot; `0` disables placement.
        pub divisor: u64,
        // Number of live (non-padding) keys actually inserted.
        pub key_count: usize,

        // Key storage, including default-valued padding entries.
        pub keys: L::KeyContainer,
        // Offsets into `vals`; a key with equal adjacent offsets is dead padding.
        pub keys_offs: L::OffsetContainer,
        // Value storage.
        pub vals: L::ValContainer,
        // Offsets into `times`/`diffs`; equal adjacent offsets encode a repeat
        // ("singleton") of the immediately preceding update.
        pub vals_offs: L::OffsetContainer,
        // Update times.
        pub times: L::TimeContainer,
        // Update differences, parallel to `times`.
        pub diffs: L::DiffContainer,
    }
152
153 impl<L: Layout> RhhValStorage<L>
154 where
155 layout::Key<L>: Default + HashOrdered,
156 for<'a> layout::KeyRef<'a, L>: HashOrdered,
157 {
158 fn values_for_key(&self, index: usize) -> (usize, usize) {
160 let lower = self.keys_offs.index(index);
161 let upper = self.keys_offs.index(index+1);
162 assert!(lower < upper, "{:?} v {:?} at {:?}", lower, upper, index);
164 (lower, upper)
165 }
166 fn updates_for_value(&self, index: usize) -> (usize, usize) {
168 let mut lower = self.vals_offs.index(index);
169 let upper = self.vals_offs.index(index+1);
170 if lower == upper {
173 assert!(lower > 0);
174 lower -= 1;
175 }
176 (lower, upper)
177 }
178
179 fn insert_key(&mut self, key: layout::KeyRef<'_, L>, offset: Option<usize>) {
189 let desired = self.desired_location(&key);
190 while self.keys.len() < desired {
193 let current_offset = self.keys_offs.index(self.keys.len());
195 self.keys.push_own(&<layout::Key<L> as Default>::default());
196 self.keys_offs.push_ref(current_offset);
197 }
198
199 self.keys.push_ref(key);
202 if let Some(offset) = offset {
203 self.keys_offs.push_ref(offset);
204 }
205 self.key_count += 1;
206 }
207
208 fn insert_key_own(&mut self, key: &layout::Key<L>, offset: Option<usize>) {
210 let mut key_con = L::KeyContainer::with_capacity(1);
211 key_con.push_own(&key);
212 self.insert_key(key_con.index(0), offset)
213 }
214
215 fn desired_location<K: Hashable>(&self, key: &K) -> usize {
217 if self.divisor == 0 { 0 }
218 else {
219 (key.hashed().into() / self.divisor).try_into().expect("divisor not large enough to force u64 into uisze")
220 }
221 }
222
223 fn advance_key(&self, index: usize, key: layout::KeyRef<'_, L>) -> bool {
225 !self.live_key(index) || self.keys.index(index).lt(&<L::KeyContainer as BatchContainer>::reborrow(key))
227 }
228
229 fn live_key(&self, index: usize) -> bool {
231 self.keys_offs.index(index) != self.keys_offs.index(index+1)
232 }
233
234 fn advance_to_live_key(&self, index: &mut usize) {
236 while *index < self.keys.len() && !self.live_key(*index) {
237 *index += 1;
238 }
239 }
240
241 fn divisor_for_capacity(capacity: usize) -> u64 {
248 let capacity: u64 = capacity.try_into().expect("usize exceeds u64");
249 if capacity == 0 || capacity == 1 { 0 }
250 else {
251 ((1 << 63) / capacity) << 1
252 }
253 }
254 }
255
    /// An immutable batch of `(key, val, time, diff)` updates whose keys are placed
    /// by hash, wrapping an `RhhValStorage` and its time `Description`.
    #[derive(Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::ValContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct RhhValBatch<L: Layout>
    where
        layout::Key<L>: Default + HashOrdered,
    {
        // The hash-placed storage itself.
        pub storage: RhhValStorage<L>,
        // Description of the times (lower, upper, since) this batch covers.
        pub description: Description<layout::Time<L>>,
        // Number of updates in the batch, counting singleton repeats.
        pub updates: usize,
    }
283
    // Associates the batch with its layout `L`.
    impl<L: Layout> WithLayout for RhhValBatch<L>
    where
        layout::Key<L>: Default + HashOrdered,
        for<'a> layout::KeyRef<'a, L>: HashOrdered,
    {
        type Layout = L;
    }
291
292 impl<L: Layout> BatchReader for RhhValBatch<L>
293 where
294 layout::Key<L>: Default + HashOrdered,
295 for<'a> layout::KeyRef<'a, L>: HashOrdered,
296 {
297 type Cursor = RhhValCursor<L>;
298 fn cursor(&self) -> Self::Cursor {
299 let mut cursor = RhhValCursor {
300 key_cursor: 0,
301 val_cursor: 0,
302 phantom: std::marker::PhantomData,
303 };
304 cursor.step_key(self);
305 cursor
306 }
307 fn len(&self) -> usize {
308 self.updates
311 }
312 fn description(&self) -> &Description<layout::Time<L>> { &self.description }
313 }
314
    impl<L: Layout> Batch for RhhValBatch<L>
    where
        layout::Key<L>: Default + HashOrdered,
        for<'a> layout::KeyRef<'a, L>: HashOrdered,
    {
        type Merger = RhhValMerger<L>;

        /// Starts a merge of `self` with `other`, compacting times to `compaction_frontier`.
        fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self::Merger {
            RhhValMerger::new(self, other, compaction_frontier)
        }

        /// Produces an empty batch over `[lower, upper)` with `since` at the minimum time.
        fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
            use timely::progress::Timestamp;
            Self {
                storage: RhhValStorage {
                    keys: L::KeyContainer::with_capacity(0),
                    keys_offs: L::OffsetContainer::with_capacity(0),
                    vals: L::ValContainer::with_capacity(0),
                    vals_offs: L::OffsetContainer::with_capacity(0),
                    times: L::TimeContainer::with_capacity(0),
                    diffs: L::DiffContainer::with_capacity(0),
                    key_count: 0,
                    key_capacity: 0,
                    // Zero divisor: hash placement is disabled for the empty batch.
                    divisor: 0,
                },
                description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
                updates: 0,
            }
        }
    }
345
    /// State for a fueled, resumable merge of two `RhhValBatch`es.
    pub struct RhhValMerger<L: Layout>
    where
        layout::Key<L>: Default + HashOrdered,
    {
        // Position in the first source batch's keys.
        key_cursor1: usize,
        // Position in the second source batch's keys.
        key_cursor2: usize,
        // Storage accumulated for the merged output.
        result: RhhValStorage<L>,
        // Description (lower, upper, since) of the merged output.
        description: Description<layout::Time<L>>,

        // Scratch space where updates for one value are collected and consolidated.
        update_stash: Vec<(layout::Time<L>, layout::Diff<L>)>,
        // Count of updates elided via the equal-offset singleton encoding.
        singletons: usize,
    }
368
    impl<L: Layout> Merger<RhhValBatch<L>> for RhhValMerger<L>
    where
        layout::Key<L>: Default + HashOrdered,
        RhhValBatch<L>: Batch<Time=layout::Time<L>>,
        for<'a> layout::KeyRef<'a, L>: HashOrdered,
    {
        /// Prepares to merge two batches that abut in time (`batch1.upper == batch2.lower`).
        fn new(batch1: &RhhValBatch<L>, batch2: &RhhValBatch<L>, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self {

            assert!(batch1.upper() == batch2.lower());
            use crate::lattice::Lattice;
            // The output's `since` is the join of both inputs' `since` and the compaction frontier.
            let mut since = batch1.description().since().join(batch2.description().since());
            since = since.join(&compaction_frontier.to_owned());

            let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);

            // Size the hash placement for the worst case (no keys coincide), doubled
            // to leave slack in the table.
            let max_cap = batch1.len() + batch2.len();
            let rhh_cap = 2 * max_cap;

            let batch1 = &batch1.storage;
            let batch2 = &batch2.storage;

            let mut storage = RhhValStorage {
                keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
                keys_offs: L::OffsetContainer::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
                vals: L::ValContainer::merge_capacity(&batch1.vals, &batch2.vals),
                vals_offs: L::OffsetContainer::with_capacity(batch1.vals_offs.len() + batch2.vals_offs.len()),
                times: L::TimeContainer::merge_capacity(&batch1.times, &batch2.times),
                diffs: L::DiffContainer::merge_capacity(&batch1.diffs, &batch2.diffs),
                key_count: 0,
                key_capacity: rhh_cap,
                divisor: RhhValStorage::<L>::divisor_for_capacity(rhh_cap),
            };

            // Seed both offset containers with the initial lower bound of zero;
            // subsequent pushes record only upper bounds.
            let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs;
            keys_offs.push_ref(0);
            let vals_offs: &mut L::OffsetContainer = &mut storage.vals_offs;
            vals_offs.push_ref(0);

            RhhValMerger {
                key_cursor1: 0,
                key_cursor2: 0,
                result: storage,
                description,
                update_stash: Vec::new(),
                singletons: 0,
            }
        }
        /// Finalizes the merge; `updates` counts stored tuples plus elided singletons.
        fn done(self) -> RhhValBatch<L> {
            RhhValBatch {
                updates: self.result.times.len() + self.singletons,
                storage: self.result,
                description: self.description,
            }
        }
        /// Performs merge work until `fuel` is exhausted, measuring effort by the
        /// number of update tuples produced into the result.
        fn work(&mut self, source1: &RhhValBatch<L>, source2: &RhhValBatch<L>, fuel: &mut isize) {

            let starting_updates = self.result.times.len();
            let mut effort = 0isize;

            // Skip any dead (padding) keys before starting.
            source1.storage.advance_to_live_key(&mut self.key_cursor1);
            source2.storage.advance_to_live_key(&mut self.key_cursor2);

            // While both sources have live keys remaining, merge in key order.
            while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.merge_key(&source1.storage, &source2.storage);
                source1.storage.advance_to_live_key(&mut self.key_cursor1);
                source2.storage.advance_to_live_key(&mut self.key_cursor2);
                effort = (self.result.times.len() - starting_updates) as isize;
            }

            // Drain whichever source still has live keys.
            while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
                self.copy_key(&source1.storage, self.key_cursor1);
                self.key_cursor1 += 1;
                source1.storage.advance_to_live_key(&mut self.key_cursor1);
                effort = (self.result.times.len() - starting_updates) as isize;
            }
            while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.copy_key(&source2.storage, self.key_cursor2);
                self.key_cursor2 += 1;
                source2.storage.advance_to_live_key(&mut self.key_cursor2);
                effort = (self.result.times.len() - starting_updates) as isize;
            }

            *fuel -= effort;
        }
    }
462
    impl<L: Layout> RhhValMerger<L>
    where
        layout::Key<L>: Default + HashOrdered,
        for<'a> layout::KeyRef<'a, L>: HashOrdered,
    {
        /// Copies the key at `cursor` in `source` into the result, consolidating each
        /// value's updates against the output `since` and dropping values (and the key)
        /// whose updates cancel entirely.
        fn copy_key(&mut self, source: &RhhValStorage<L>, cursor: usize) {
            let init_vals = self.result.vals.len();
            let (mut lower, upper) = source.values_for_key(cursor);
            while lower < upper {
                self.stash_updates_for_val(source, lower);
                // `off` is the upper bound into `times`; an unchanged bound encodes a singleton.
                if let Some(off) = self.consolidate_updates() {
                    self.result.vals_offs.push_ref(off);
                    self.result.vals.push_ref(source.vals.index(lower));
                }
                lower += 1;
            }

            // Insert the key only if at least one of its values survived consolidation.
            if self.result.vals.len() > init_vals {
                self.result.insert_key(source.keys.index(cursor), Some(self.result.vals.len()));
            }
        }
        /// Merges the keys under both cursors: copies the lesser key, or merges the
        /// two values lists when the keys are equal; advances the consumed cursor(s).
        fn merge_key(&mut self, source1: &RhhValStorage<L>, source2: &RhhValStorage<L>) {

            use ::std::cmp::Ordering;
            match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
                Ordering::Less => {
                    self.copy_key(source1, self.key_cursor1);
                    self.key_cursor1 += 1;
                },
                Ordering::Equal => {
                    let (lower1, upper1) = source1.values_for_key(self.key_cursor1);
                    let (lower2, upper2) = source2.values_for_key(self.key_cursor2);
                    // The key survives only if merging its values produced output.
                    if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) {
                        self.result.insert_key(source1.keys.index(self.key_cursor1), Some(off));
                    }
                    self.key_cursor1 += 1;
                    self.key_cursor2 += 1;
                },
                Ordering::Greater => {
                    self.copy_key(source2, self.key_cursor2);
                    self.key_cursor2 += 1;
                },
            }
        }
        /// Merges two ranges of values for one shared key, consolidating updates per
        /// value; returns the new upper offset into `vals` if any value survived.
        fn merge_vals(
            &mut self,
            (source1, mut lower1, upper1): (&RhhValStorage<L>, usize, usize),
            (source2, mut lower2, upper2): (&RhhValStorage<L>, usize, usize),
        ) -> Option<usize> {
            let init_vals = self.result.vals.len();
            // Merge values in sorted order; equal values have their updates combined.
            while lower1 < upper1 && lower2 < upper2 {
                use ::std::cmp::Ordering;
                match source1.vals.index(lower1).cmp(&source2.vals.index(lower2)) {
                    Ordering::Less => {
                        self.stash_updates_for_val(source1, lower1);
                        if let Some(off) = self.consolidate_updates() {
                            self.result.vals_offs.push_ref(off);
                            self.result.vals.push_ref(source1.vals.index(lower1));
                        }
                        lower1 += 1;
                    },
                    Ordering::Equal => {
                        // Same value in both inputs: stash both update ranges together.
                        self.stash_updates_for_val(source1, lower1);
                        self.stash_updates_for_val(source2, lower2);
                        if let Some(off) = self.consolidate_updates() {
                            self.result.vals_offs.push_ref(off);
                            self.result.vals.push_ref(source1.vals.index(lower1));
                        }
                        lower1 += 1;
                        lower2 += 1;
                    },
                    Ordering::Greater => {
                        self.stash_updates_for_val(source2, lower2);
                        if let Some(off) = self.consolidate_updates() {
                            self.result.vals_offs.push_ref(off);
                            self.result.vals.push_ref(source2.vals.index(lower2));
                        }
                        lower2 += 1;
                    },
                }
            }
            // Drain the remainder of either range.
            while lower1 < upper1 {
                self.stash_updates_for_val(source1, lower1);
                if let Some(off) = self.consolidate_updates() {
                    self.result.vals_offs.push_ref(off);
                    self.result.vals.push_ref(source1.vals.index(lower1));
                }
                lower1 += 1;
            }
            while lower2 < upper2 {
                self.stash_updates_for_val(source2, lower2);
                if let Some(off) = self.consolidate_updates() {
                    self.result.vals_offs.push_ref(off);
                    self.result.vals.push_ref(source2.vals.index(lower2));
                }
                lower2 += 1;
            }

            // Report the upper offset only if some value was emitted for this key.
            if self.result.vals.len() > init_vals {
                Some(self.result.vals.len())
            } else {
                None
            }
        }

        /// Stashes the updates of the `index`th value of `source`, advancing each
        /// time by the output `since` frontier.
        fn stash_updates_for_val(&mut self, source: &RhhValStorage<L>, index: usize) {
            let (lower, upper) = source.updates_for_value(index);
            for i in lower .. upper {
                let time = source.times.index(i);
                let diff = source.diffs.index(i);
                let mut new_time = L::TimeContainer::into_owned(time);
                use crate::lattice::Lattice;
                new_time.advance_by(self.description.since().borrow());
                self.update_stash.push((new_time, L::DiffContainer::into_owned(diff)));
            }
        }

        /// Consolidates the stashed updates and pushes the survivors; returns the new
        /// upper offset into `times`, or `None` if everything cancelled.
        ///
        /// A single surviving update equal to the most recently pushed one is elided
        /// (singleton optimization): nothing is pushed, so the returned offset equals
        /// the previous one, which `updates_for_value` later decodes as a repeat.
        fn consolidate_updates(&mut self) -> Option<usize> {
            use crate::consolidation;
            consolidation::consolidate(&mut self.update_stash);
            if !self.update_stash.is_empty() {
                // Compare the lone survivor to the last pushed (time, diff), if any.
                let time_diff = self.result.times.last().zip(self.result.diffs.last());
                let last_eq = self.update_stash.last().zip(time_diff).map(|((t1, d1), (t2, d2))| {
                    *t1 == L::TimeContainer::into_owned(t2) && *d1 == L::DiffContainer::into_owned(d2)
                });
                if self.update_stash.len() == 1 && last_eq.unwrap_or(false) {
                    // Singleton: elide the push and count it instead.
                    self.update_stash.clear();
                    self.singletons += 1;
                }
                else {
                    for (time, diff) in self.update_stash.drain(..) {
                        self.result.times.push_own(&time);
                        self.result.diffs.push_own(&diff);
                    }
                }
                Some(self.result.times.len())
            } else {
                None
            }
        }
    }
640
641
    /// A cursor for navigating a single `RhhValBatch`.
    ///
    /// Maintains the invariant that `key_cursor` rests on a live key or at `keys.len()`.
    pub struct RhhValCursor<L: Layout>
    where
        layout::Key<L>: Default + HashOrdered,
    {
        // Absolute position of the current key in the batch's key storage.
        key_cursor: usize,
        // Absolute position of the current value in the batch's value storage.
        val_cursor: usize,
        // Ties the cursor to its layout `L` without storing layout data.
        phantom: PhantomData<L>,
    }
660
    use crate::trace::implementations::WithLayout;
    // Associates the cursor with its layout `L`.
    impl<L: Layout> WithLayout for RhhValCursor<L>
    where
        layout::Key<L>: Default + HashOrdered,
        for<'a> layout::KeyRef<'a, L>: HashOrdered,
    {
        type Layout = L;
    }
669
    impl<L: Layout> Cursor for RhhValCursor<L>
    where
        layout::Key<L>: Default + HashOrdered,
        for<'a> layout::KeyRef<'a, L>: HashOrdered,
    {
        type Storage = RhhValBatch<L>;

        /// The key under the cursor, or `None` past the end of the key storage.
        fn get_key<'a>(&self, storage: &'a RhhValBatch<L>) -> Option<Self::Key<'a>> { storage.storage.keys.get(self.key_cursor) }
        /// The value under the cursor, or `None` when the value cursor is exhausted.
        fn get_val<'a>(&self, storage: &'a RhhValBatch<L>) -> Option<Self::Val<'a>> { if self.val_valid(storage) { storage.storage.vals.get(self.val_cursor) } else { None } }
        /// The key under the cursor; callers must ensure `key_valid`.
        fn key<'a>(&self, storage: &'a RhhValBatch<L>) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
        /// The value under the cursor; callers must ensure `val_valid`.
        fn val<'a>(&self, storage: &'a RhhValBatch<L>) -> Self::Val<'a> { storage.storage.vals.index(self.val_cursor) }
        /// Applies `logic` to each `(time, diff)` update of the current value.
        fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &RhhValBatch<L>, mut logic: L2) {
            let (lower, upper) = storage.storage.updates_for_value(self.val_cursor);
            for index in lower .. upper {
                let time = storage.storage.times.index(index);
                let diff = storage.storage.diffs.index(index);
                logic(time, diff);
            }
        }
        /// True while the key cursor is within the key storage (live by invariant).
        fn key_valid(&self, storage: &RhhValBatch<L>) -> bool { self.key_cursor < storage.storage.keys.len() }
        /// True while the value cursor is below the current key's upper value bound.
        fn val_valid(&self, storage: &RhhValBatch<L>) -> bool { self.val_cursor < storage.storage.values_for_key(self.key_cursor).1 }
        /// Advances to the next live key, rewinding the value cursor for it.
        fn step_key(&mut self, storage: &RhhValBatch<L>){
            // Advance at least once, then skip any dead padding keys.
            self.key_cursor += 1;
            storage.storage.advance_to_live_key(&mut self.key_cursor);

            if self.key_valid(storage) {
                self.rewind_vals(storage);
            }
            else {
                // Clamp so repeated stepping stays at the end.
                self.key_cursor = storage.storage.keys.len();
            }
        }
        /// Seeks forward to `key`, first jumping to its hash-desired slot and then
        /// scanning linearly past dead or lesser keys.
        ///
        /// NOTE(review): the cursor never moves backwards here, so this presumably
        /// assumes keys are sought in non-decreasing order -- confirm against callers.
        fn seek_key(&mut self, storage: &RhhValBatch<L>, key: Self::Key<'_>) {
            let desired = storage.storage.desired_location(&key);
            if self.key_cursor < desired {
                self.key_cursor = desired;
            }
            while self.key_valid(storage) && storage.storage.advance_key(self.key_cursor, key) {
                self.key_cursor += 1;
            }

            if self.key_valid(storage) {
                self.rewind_vals(storage);
            }
        }
        /// Advances to the next value, parking at the current key's upper bound when done.
        fn step_val(&mut self, storage: &RhhValBatch<L>) {
            self.val_cursor += 1;
            if !self.val_valid(storage) {
                self.val_cursor = storage.storage.values_for_key(self.key_cursor).1;
            }
        }
        /// Seeks forward among the current key's values to the first not less than `val`.
        fn seek_val(&mut self, storage: &RhhValBatch<L>, val: Self::Val<'_>) {
            self.val_cursor += storage.storage.vals.advance(self.val_cursor, storage.storage.values_for_key(self.key_cursor).1, |x| <L::ValContainer as BatchContainer>::reborrow(x).lt(&<L::ValContainer as BatchContainer>::reborrow(val)));
        }
        /// Repositions at the first live key, rewinding the value cursor for it.
        fn rewind_keys(&mut self, storage: &RhhValBatch<L>) {
            self.key_cursor = 0;
            storage.storage.advance_to_live_key(&mut self.key_cursor);

            if self.key_valid(storage) {
                self.rewind_vals(storage)
            }
        }
        /// Repositions the value cursor at the current key's first value.
        fn rewind_vals(&mut self, storage: &RhhValBatch<L>) {
            self.val_cursor = storage.storage.values_for_key(self.key_cursor).0;
        }
    }
744
    /// A builder that accumulates sorted `(key, val, time, diff)` input chunks of
    /// type `CI` into an `RhhValStorage`, then seals them into an `RhhValBatch`.
    pub struct RhhValBuilder<L: Layout, CI>
    where
        layout::Key<L>: Default + HashOrdered,
    {
        // Storage under construction.
        result: RhhValStorage<L>,
        // A pending update identical to the last pushed one; flushed or elided later.
        singleton: Option<(layout::Time<L>, layout::Diff<L>)>,
        // Count of updates elided via the equal-offset singleton encoding.
        singletons: usize,
        // Marker for the chunk input type `CI`.
        _marker: PhantomData<CI>,
    }
759
    impl<L: Layout, CI> RhhValBuilder<L, CI>
    where
        layout::Key<L>: Default + HashOrdered,
    {
        /// Records an update, deferring it when it exactly repeats the last pushed one.
        ///
        /// A repeat is stashed in `self.singleton` instead of pushed; if a new value or
        /// key arrives before another update, the stash is counted (not stored), which
        /// yields the equal-offset "singleton" encoding decoded by `updates_for_value`.
        fn push_update(&mut self, time: layout::Time<L>, diff: layout::Diff<L>) {
            if self.result.times.last().map(|t| L::TimeContainer::into_owned(t) == time).unwrap_or(false) && self.result.diffs.last().map(|d| L::DiffContainer::into_owned(d) == diff).unwrap_or(false) {
                // Exact repeat of the previous update: stash it rather than push.
                assert!(self.singleton.is_none());
                self.singleton = Some((time, diff));
            }
            else {
                // A different update arrived: flush any stashed repeat first.
                if let Some((time, diff)) = self.singleton.take() {
                    self.result.times.push_own(&time);
                    self.result.diffs.push_own(&diff);
                }
                self.result.times.push_own(&time);
                self.result.diffs.push_own(&diff);
            }
        }
    }
793
    impl<L: Layout, CI> Builder for RhhValBuilder<L, CI>
    where
        layout::Key<L>: Default + HashOrdered,
        CI: for<'a> BuilderInput<L::KeyContainer, L::ValContainer, Key<'a> = layout::Key<L>, Time=layout::Time<L>, Diff=layout::Diff<L>>,
        for<'a> L::ValContainer: PushInto<CI::Val<'a>>,
        for<'a> layout::KeyRef<'a, L>: HashOrdered,
    {
        type Input = CI;
        type Time = layout::Time<L>;
        type Output = RhhValBatch<L>;

        /// Allocates a builder sized for `keys` keys, `vals` values, and `upds` updates.
        fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {

            // Double the key count for hash-placement slack.
            let rhh_capacity = 2 * keys;
            let divisor = RhhValStorage::<L>::divisor_for_capacity(rhh_capacity);
            // NOTE(review): the extra `+ 10` looks like headroom for keys displaced
            // past the last desired slot -- confirm the intended margin.
            let keys = rhh_capacity + 10;

            Self {
                result: RhhValStorage {
                    keys: L::KeyContainer::with_capacity(keys),
                    keys_offs: L::OffsetContainer::with_capacity(keys + 1),
                    vals: L::ValContainer::with_capacity(vals),
                    vals_offs: L::OffsetContainer::with_capacity(vals + 1),
                    times: L::TimeContainer::with_capacity(upds),
                    diffs: L::DiffContainer::with_capacity(upds),
                    key_count: 0,
                    key_capacity: rhh_capacity,
                    divisor,
                },
                singleton: None,
                singletons: 0,
                _marker: PhantomData,
            }
        }

        /// Drains one input chunk into the storage, deduplicating against the most
        /// recent key and value and threading updates through `push_update`.
        #[inline]
        fn push(&mut self, chunk: &mut Self::Input) {
            for item in chunk.drain() {
                let (key, val, time, diff) = CI::into_parts(item);
                if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
                    if self.result.vals.last().map(|v| CI::val_eq(&val, v)).unwrap_or(false) {
                        // Same key and value: just another update.
                        self.push_update(time, diff);
                    } else {
                        // Same key, new value: close the previous value's update range.
                        self.result.vals_offs.push_ref(self.result.times.len());
                        // A pending repeat is elided; count it as a singleton.
                        if self.singleton.take().is_some() { self.singletons += 1; }
                        self.push_update(time, diff);
                        self.result.vals.push_into(val);
                    }
                } else {
                    // New key: close the previous value's and key's ranges first.
                    self.result.vals_offs.push_ref(self.result.times.len());
                    if self.singleton.take().is_some() { self.singletons += 1; }
                    self.result.keys_offs.push_ref(self.result.vals.len());
                    self.push_update(time, diff);
                    self.result.vals.push_into(val);
                    // Offset is pushed above (None): the key's upper bound comes later.
                    self.result.insert_key_own(&key, None);
                }
            }
        }

        /// Seals the storage: closes the final value and key ranges and wraps the
        /// result with its description; `updates` counts elided singletons too.
        #[inline(never)]
        fn done(mut self, description: Description<Self::Time>) -> RhhValBatch<L> {
            self.result.vals_offs.push_ref(self.result.times.len());
            if self.singleton.take().is_some() { self.singletons += 1; }
            self.result.keys_offs.push_ref(self.result.vals.len());
            RhhValBatch {
                updates: self.result.times.len() + self.singletons,
                storage: self.result,
                description,
            }
        }

        /// Builds a batch from a chain of chunks, pre-sizing from the chain's counts.
        fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
            let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
            let mut builder = Self::with_capacity(keys, vals, upds);
            for mut chunk in chain.drain(..) {
                builder.push(&mut chunk);
            }

            builder.done(description)
        }
    }
887
888}
889
mod key_batch {

    // Intentionally empty: a key-only (valueless) batch implementation is not provided here.
}