1use std::rc::Rc;
9use std::cmp::Ordering;
10
11use serde::{Deserialize, Serialize};
12
13use crate::Hashable;
14use crate::trace::implementations::chunker::ContainerChunker;
15use crate::trace::implementations::merge_batcher::MergeBatcher;
16use crate::trace::implementations::merge_batcher::container::VecInternalMerger;
17use crate::trace::implementations::spine_fueled::Spine;
18use crate::trace::rc_blanket_impls::RcBuilder;
19
20use super::{Layout, Vector};
21
22use self::val_batch::{RhhValBatch, RhhValBuilder};
23
/// A trace implementation backed by reference-counted `RhhValBatch` batches of `((K, V), T, R)` updates.
pub type VecSpine<K, V, T, R> = Spine<Rc<RhhValBatch<Vector<((K,V),T,R)>>>>;
/// A batcher that organizes `((K, V), T, R)` update tuples held in `Vec` containers.
pub type VecBatcher<K,V,T,R> = MergeBatcher<Vec<((K,V),T,R)>, ContainerChunker<Vec<((K,V),T,R)>>, VecInternalMerger<(K, V), T, R>>;
/// A builder that assembles `RhhValBatch` batches from `Vec`s of `((K, V), T, R)` tuples.
pub type VecBuilder<K,V,T,R> = RcBuilder<RhhValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;
30
31pub trait HashOrdered: Hashable { }
36
37impl<'a, T: std::hash::Hash + HashOrdered> HashOrdered for &'a T { }
38
/// A wrapper around a hashable type whose ordering (see the `PartialOrd`
/// implementation below) compares `(hash, inner)`, so items sort by hash first.
#[derive(Copy, Clone, Eq, PartialEq, Debug, Default, Serialize, Deserialize)]
pub struct HashWrapper<T: std::hash::Hash + Hashable> {
    /// The wrapped value.
    pub inner: T
}
45
46impl<T: PartialOrd + std::hash::Hash + Hashable<Output: PartialOrd>> PartialOrd for HashWrapper<T> {
47 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
48 let this_hash = self.inner.hashed();
49 let that_hash = other.inner.hashed();
50 (this_hash, &self.inner).partial_cmp(&(that_hash, &other.inner))
51 }
52}
53
54impl<T: Ord + PartialOrd + std::hash::Hash + Hashable<Output: PartialOrd>> Ord for HashWrapper<T> {
55 fn cmp(&self, other: &Self) -> Ordering {
56 self.partial_cmp(other).unwrap()
57 }
58}
59
60impl<T: std::hash::Hash + Hashable> HashOrdered for HashWrapper<T> { }
61
// Hashing a wrapper forwards to the wrapped value's hash.
impl<T: std::hash::Hash + Hashable> Hashable for HashWrapper<T> {
    type Output = T::Output;
    fn hashed(&self) -> Self::Output { self.inner.hashed() }
}
66
67impl<T: std::hash::Hash + Hashable> HashOrdered for &HashWrapper<T> { }
68
// Hashing a wrapper reference forwards to the wrapped value's hash.
impl<T: std::hash::Hash + Hashable> Hashable for &HashWrapper<T> {
    type Output = T::Output;
    fn hashed(&self) -> Self::Output { self.inner.hashed() }
}
73
74mod val_batch {
75
76 use std::convert::TryInto;
77 use std::marker::PhantomData;
78 use serde::{Deserialize, Serialize};
79 use timely::container::PushInto;
80 use timely::progress::{Antichain, frontier::AntichainRef};
81
82 use crate::hashable::Hashable;
83 use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
84 use crate::trace::implementations::{BatchContainer, BuilderInput};
85 use crate::trace::implementations::layout;
86
87 use super::{Layout, HashOrdered};
88
    /// Storage backing a batch: keys placed near their hash-derived locations,
    /// with offset lists delimiting each key's values and each value's updates.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct RhhValStorage<L: Layout>
    where
        layout::Key<L>: Default + HashOrdered,
    {

        /// The intended number of key slots; `divisor` is derived from it.
        pub key_capacity: usize,
        /// Divisor mapping `u64` hashes down to key locations; zero disables hashing
        /// (everything maps to location 0 — see `desired_location`).
        pub divisor: u64,
        /// The number of live keys inserted (default-valued padding keys excluded).
        pub key_count: usize,

        /// The keys, including default-valued "dead" padding entries.
        pub keys: L::KeyContainer,
        /// For the key at position `i`, its values lie in `vals[keys_offs[i] .. keys_offs[i+1]]`;
        /// an empty range marks a dead key (see `live_key`).
        pub keys_offs: L::OffsetContainer,
        /// Concatenated values for all keys.
        pub vals: L::ValContainer,
        /// For the value at position `i`, its updates lie in `times/diffs[vals_offs[i] .. vals_offs[i+1]]`;
        /// an empty range encodes an elided "singleton" update (see `updates_for_value`).
        pub vals_offs: L::OffsetContainer,
        /// Update times.
        pub times: L::TimeContainer,
        /// Update differences, parallel to `times`.
        pub diffs: L::DiffContainer,
    }
141
142 impl<L: Layout> RhhValStorage<L>
143 where
144 layout::Key<L>: Default + HashOrdered,
145 for<'a> layout::KeyRef<'a, L>: HashOrdered,
146 {
147 fn values_for_key(&self, index: usize) -> (usize, usize) {
149 let lower = self.keys_offs.index(index);
150 let upper = self.keys_offs.index(index+1);
151 assert!(lower < upper, "{:?} v {:?} at {:?}", lower, upper, index);
153 (lower, upper)
154 }
155 fn updates_for_value(&self, index: usize) -> (usize, usize) {
157 let mut lower = self.vals_offs.index(index);
158 let upper = self.vals_offs.index(index+1);
159 if lower == upper {
162 assert!(lower > 0);
163 lower -= 1;
164 }
165 (lower, upper)
166 }
167
168 fn insert_key(&mut self, key: layout::KeyRef<'_, L>, offset: Option<usize>) {
178 let desired = self.desired_location(&key);
179 while self.keys.len() < desired {
182 let current_offset = self.keys_offs.index(self.keys.len());
184 self.keys.push_own(&<layout::Key<L> as Default>::default());
185 self.keys_offs.push_ref(current_offset);
186 }
187
188 self.keys.push_ref(key);
191 if let Some(offset) = offset {
192 self.keys_offs.push_ref(offset);
193 }
194 self.key_count += 1;
195 }
196
197 fn insert_key_own(&mut self, key: &layout::Key<L>, offset: Option<usize>) {
199 let mut key_con = L::KeyContainer::with_capacity(1);
200 key_con.push_own(&key);
201 self.insert_key(key_con.index(0), offset)
202 }
203
204 fn desired_location<K: Hashable>(&self, key: &K) -> usize {
206 if self.divisor == 0 { 0 }
207 else {
208 (key.hashed().into() / self.divisor).try_into().expect("divisor not large enough to force u64 into uisze")
209 }
210 }
211
212 fn advance_key(&self, index: usize, key: layout::KeyRef<'_, L>) -> bool {
214 !self.live_key(index) || self.keys.index(index).lt(&<L::KeyContainer as BatchContainer>::reborrow(key))
216 }
217
218 fn live_key(&self, index: usize) -> bool {
220 self.keys_offs.index(index) != self.keys_offs.index(index+1)
221 }
222
223 fn advance_to_live_key(&self, index: &mut usize) {
225 while *index < self.keys.len() && !self.live_key(*index) {
226 *index += 1;
227 }
228 }
229
230 fn divisor_for_capacity(capacity: usize) -> u64 {
237 let capacity: u64 = capacity.try_into().expect("usize exceeds u64");
238 if capacity == 0 || capacity == 1 { 0 }
239 else {
240 ((1 << 63) / capacity) << 1
241 }
242 }
243 }
244
    /// An immutable batch of `(key, val, time, diff)` updates, laid out in
    /// `RhhValStorage` and described by a `Description` of its time bounds.
    #[derive(Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::ValContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct RhhValBatch<L: Layout>
    where
        layout::Key<L>: Default + HashOrdered,
    {
        /// The data themselves.
        pub storage: RhhValStorage<L>,
        /// Description of the update times this batch represents.
        pub description: Description<layout::Time<L>>,
        /// The total number of updates, counting singletons elided from `storage.times`.
        pub updates: usize,
    }
272
    // Ties the batch to its layout `L`, from which associated types
    // (key, value, time, diff) used by `BatchReader` are derived.
    impl<L: Layout> WithLayout for RhhValBatch<L>
    where
        layout::Key<L>: Default + HashOrdered,
        for<'a> layout::KeyRef<'a, L>: HashOrdered,
    {
        type Layout = L;
    }
280
    impl<L: Layout> BatchReader for RhhValBatch<L>
    where
        layout::Key<L>: Default + HashOrdered,
        for<'a> layout::KeyRef<'a, L>: HashOrdered,
    {
        type Cursor = RhhValCursor<L>;
        /// Allocates a cursor over this batch's contents.
        fn cursor(&self) -> Self::Cursor {
            let mut cursor = RhhValCursor {
                key_cursor: 0,
                val_cursor: 0,
                phantom: std::marker::PhantomData,
            };
            // Position at a live key before handing the cursor out.
            // NOTE(review): `step_key` pre-increments `key_cursor`, so this
            // starts the cursor at index 1; confirm index 0 can never hold a
            // live key, or that callers always `rewind_keys` first.
            cursor.step_key(self);
            cursor
        }
        /// The number of updates, counting singletons elided from `storage.times`.
        fn len(&self) -> usize {
            self.updates
        }
        fn description(&self) -> &Description<layout::Time<L>> { &self.description }
    }
303
    impl<L: Layout> Batch for RhhValBatch<L>
    where
        layout::Key<L>: Default + HashOrdered,
        for<'a> layout::KeyRef<'a, L>: HashOrdered,
    {
        type Merger = RhhValMerger<L>;

        /// Starts a merge of `self` with `other`, compacting times by `compaction_frontier`.
        fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self::Merger {
            RhhValMerger::new(self, other, compaction_frontier)
        }

        /// An empty batch over `[lower, upper)` whose `since` is the minimum time.
        fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
            use timely::progress::Timestamp;
            Self {
                storage: RhhValStorage {
                    keys: L::KeyContainer::with_capacity(0),
                    keys_offs: L::OffsetContainer::with_capacity(0),
                    vals: L::ValContainer::with_capacity(0),
                    vals_offs: L::OffsetContainer::with_capacity(0),
                    times: L::TimeContainer::with_capacity(0),
                    diffs: L::DiffContainer::with_capacity(0),
                    key_count: 0,
                    key_capacity: 0,
                    // Zero divisor: `desired_location` maps every key to 0.
                    divisor: 0,
                },
                description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
                updates: 0,
            }
        }
    }
334
    /// State for an in-progress merge of two batches, resumable via `work`.
    pub struct RhhValMerger<L: Layout>
    where
        layout::Key<L>: Default + HashOrdered,
    {
        /// Key position to merge next in the first batch.
        key_cursor1: usize,
        /// Key position to merge next in the second batch.
        key_cursor2: usize,
        /// Result we are in the process of assembling.
        result: RhhValStorage<L>,
        /// Description (time bounds and `since`) of the batch being built.
        description: Description<layout::Time<L>>,

        /// Scratch space for (time, diff) updates awaiting consolidation.
        update_stash: Vec<(layout::Time<L>, layout::Diff<L>)>,
        /// Number of singleton updates elided from `result.times`; added back
        /// into the final update count by `done`.
        singletons: usize,
    }
357
    impl<L: Layout> Merger<RhhValBatch<L>> for RhhValMerger<L>
    where
        layout::Key<L>: Default + HashOrdered,
        RhhValBatch<L>: Batch<Time=layout::Time<L>>,
        for<'a> layout::KeyRef<'a, L>: HashOrdered,
    {
        /// Prepares a merger for two batches that abut in time.
        fn new(batch1: &RhhValBatch<L>, batch2: &RhhValBatch<L>, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self {

            // The batches must cover adjacent time intervals.
            assert!(batch1.upper() == batch2.lower());
            use crate::lattice::Lattice;
            // The result's `since` is the join of both inputs' `since` and the
            // requested compaction frontier.
            let mut since = batch1.description().since().join(batch2.description().since());
            since = since.join(&compaction_frontier.to_owned());

            let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);

            // Size the hash-keyed result at twice the combined lengths, to
            // leave slack for keys displaced from their desired locations.
            let max_cap = batch1.len() + batch2.len();
            let rhh_cap = 2 * max_cap;

            let batch1 = &batch1.storage;
            let batch2 = &batch2.storage;

            let mut storage = RhhValStorage {
                keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
                keys_offs: L::OffsetContainer::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
                vals: L::ValContainer::merge_capacity(&batch1.vals, &batch2.vals),
                vals_offs: L::OffsetContainer::with_capacity(batch1.vals_offs.len() + batch2.vals_offs.len()),
                times: L::TimeContainer::merge_capacity(&batch1.times, &batch2.times),
                diffs: L::DiffContainer::merge_capacity(&batch1.diffs, &batch2.diffs),
                key_count: 0,
                key_capacity: rhh_cap,
                divisor: RhhValStorage::<L>::divisor_for_capacity(rhh_cap),
            };

            // Seed both offset lists with a leading zero lower bound.
            let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs;
            keys_offs.push_ref(0);
            let vals_offs: &mut L::OffsetContainer = &mut storage.vals_offs;
            vals_offs.push_ref(0);

            RhhValMerger {
                key_cursor1: 0,
                key_cursor2: 0,
                result: storage,
                description,
                update_stash: Vec::new(),
                singletons: 0,
            }
        }
        /// Finalizes the merge into a batch, restoring elided singleton counts.
        fn done(self) -> RhhValBatch<L> {
            RhhValBatch {
                updates: self.result.times.len() + self.singletons,
                storage: self.result,
                description: self.description,
            }
        }
        /// Performs a bounded amount of merging, measured by updates produced
        /// and charged against `fuel`.
        fn work(&mut self, source1: &RhhValBatch<L>, source2: &RhhValBatch<L>, fuel: &mut isize) {

            // Effort is measured as growth of `result.times` since entry.
            let starting_updates = self.result.times.len();
            let mut effort = 0isize;

            // Skip dead (padding) keys in both inputs.
            source1.storage.advance_to_live_key(&mut self.key_cursor1);
            source2.storage.advance_to_live_key(&mut self.key_cursor2);

            // While both inputs have keys and fuel remains, merge key by key.
            while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.merge_key(&source1.storage, &source2.storage);
                source1.storage.advance_to_live_key(&mut self.key_cursor1);
                source2.storage.advance_to_live_key(&mut self.key_cursor2);
                effort = (self.result.times.len() - starting_updates) as isize;
            }

            // Drain whichever input remains, still bounded by fuel.
            while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
                self.copy_key(&source1.storage, self.key_cursor1);
                self.key_cursor1 += 1;
                source1.storage.advance_to_live_key(&mut self.key_cursor1);
                effort = (self.result.times.len() - starting_updates) as isize;
            }
            while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.copy_key(&source2.storage, self.key_cursor2);
                self.key_cursor2 += 1;
                source2.storage.advance_to_live_key(&mut self.key_cursor2);
                effort = (self.result.times.len() - starting_updates) as isize;
            }

            *fuel -= effort;
        }
    }
451
    impl<L: Layout> RhhValMerger<L>
    where
        layout::Key<L>: Default + HashOrdered,
        for<'a> layout::KeyRef<'a, L>: HashOrdered,
    {
        /// Copies the key at `cursor` in `source` into the result, along with
        /// every value whose updates survive consolidation to the new `since`.
        /// The key itself is inserted only if at least one value survives.
        fn copy_key(&mut self, source: &RhhValStorage<L>, cursor: usize) {
            let init_vals = self.result.vals.len();
            let (mut lower, upper) = source.values_for_key(cursor);
            while lower < upper {
                self.stash_updates_for_val(source, lower);
                // `None` means the value's updates consolidated to nothing.
                if let Some(off) = self.consolidate_updates() {
                    self.result.vals_offs.push_ref(off);
                    self.result.vals.push_ref(source.vals.index(lower));
                }
                lower += 1;
            }

            // Only insert the key if some values were written for it.
            if self.result.vals.len() > init_vals {
                self.result.insert_key(source.keys.index(cursor), Some(self.result.vals.len()));
            }
        }
        /// Merges one key from each input: copies the lesser key, or merges
        /// values when the keys are equal, advancing the respective cursor(s).
        fn merge_key(&mut self, source1: &RhhValStorage<L>, source2: &RhhValStorage<L>) {

            use ::std::cmp::Ordering;
            match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
                Ordering::Less => {
                    self.copy_key(source1, self.key_cursor1);
                    self.key_cursor1 += 1;
                },
                Ordering::Equal => {
                    // Same key in both inputs: merge their value lists, and
                    // insert the key only if any values survive.
                    let (lower1, upper1) = source1.values_for_key(self.key_cursor1);
                    let (lower2, upper2) = source2.values_for_key(self.key_cursor2);
                    if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) {
                        self.result.insert_key(source1.keys.index(self.key_cursor1), Some(off));
                    }
                    self.key_cursor1 += 1;
                    self.key_cursor2 += 1;
                },
                Ordering::Greater => {
                    self.copy_key(source2, self.key_cursor2);
                    self.key_cursor2 += 1;
                },
            }
        }
        /// Merges the value ranges of a key present in both inputs.
        ///
        /// Returns `Some(upper)` — the new upper bound into `result.vals` —
        /// if any values were written, and `None` otherwise.
        fn merge_vals(
            &mut self,
            (source1, mut lower1, upper1): (&RhhValStorage<L>, usize, usize),
            (source2, mut lower2, upper2): (&RhhValStorage<L>, usize, usize),
        ) -> Option<usize> {
            let init_vals = self.result.vals.len();
            // Standard ordered merge over the two value ranges.
            while lower1 < upper1 && lower2 < upper2 {
                use ::std::cmp::Ordering;
                match source1.vals.index(lower1).cmp(&source2.vals.index(lower2)) {
                    Ordering::Less => {
                        self.stash_updates_for_val(source1, lower1);
                        if let Some(off) = self.consolidate_updates() {
                            self.result.vals_offs.push_ref(off);
                            self.result.vals.push_ref(source1.vals.index(lower1));
                        }
                        lower1 += 1;
                    },
                    Ordering::Equal => {
                        // Same value in both inputs: combine their updates
                        // before consolidating.
                        self.stash_updates_for_val(source1, lower1);
                        self.stash_updates_for_val(source2, lower2);
                        if let Some(off) = self.consolidate_updates() {
                            self.result.vals_offs.push_ref(off);
                            self.result.vals.push_ref(source1.vals.index(lower1));
                        }
                        lower1 += 1;
                        lower2 += 1;
                    },
                    Ordering::Greater => {
                        self.stash_updates_for_val(source2, lower2);
                        if let Some(off) = self.consolidate_updates() {
                            self.result.vals_offs.push_ref(off);
                            self.result.vals.push_ref(source2.vals.index(lower2));
                        }
                        lower2 += 1;
                    },
                }
            }
            // Drain any remaining values from either input.
            while lower1 < upper1 {
                self.stash_updates_for_val(source1, lower1);
                if let Some(off) = self.consolidate_updates() {
                    self.result.vals_offs.push_ref(off);
                    self.result.vals.push_ref(source1.vals.index(lower1));
                }
                lower1 += 1;
            }
            while lower2 < upper2 {
                self.stash_updates_for_val(source2, lower2);
                if let Some(off) = self.consolidate_updates() {
                    self.result.vals_offs.push_ref(off);
                    self.result.vals.push_ref(source2.vals.index(lower2));
                }
                lower2 += 1;
            }

            // Report the new upper offset only if values were written.
            if self.result.vals.len() > init_vals {
                Some(self.result.vals.len())
            } else {
                None
            }
        }

        /// Loads the updates for the value at `index` into `update_stash`,
        /// advancing each time by the result's `since` frontier.
        fn stash_updates_for_val(&mut self, source: &RhhValStorage<L>, index: usize) {
            let (lower, upper) = source.updates_for_value(index);
            for i in lower .. upper {
                let time = source.times.index(i);
                let diff = source.diffs.index(i);
                let mut new_time = L::TimeContainer::into_owned(time);
                use crate::lattice::Lattice;
                new_time.advance_by(self.description.since().borrow());
                self.update_stash.push((new_time, L::DiffContainer::into_owned(diff)));
            }
        }

        /// Consolidates `update_stash` and writes it into the result.
        ///
        /// Returns `Some(result.times.len())` — the new upper offset — if any
        /// updates remain, and `None` if they all cancelled. A single update
        /// that repeats the most recently written (time, diff) is elided as a
        /// "singleton" (an empty offset range; see `updates_for_value`).
        fn consolidate_updates(&mut self) -> Option<usize> {
            use crate::consolidation;
            consolidation::consolidate(&mut self.update_stash);
            if !self.update_stash.is_empty() {
                // Compare the lone stashed update against the last one written.
                let time_diff = self.result.times.last().zip(self.result.diffs.last());
                let last_eq = self.update_stash.last().zip(time_diff).map(|((t1, d1), (t2, d2))| {
                    *t1 == L::TimeContainer::into_owned(t2) && *d1 == L::DiffContainer::into_owned(d2)
                });
                if self.update_stash.len() == 1 && last_eq.unwrap_or(false) {
                    // Elide the repeated singleton; `done` adds it back to the count.
                    self.update_stash.clear();
                    self.singletons += 1;
                }
                else {
                    for (time, diff) in self.update_stash.drain(..) {
                        self.result.times.push_own(&time);
                        self.result.diffs.push_own(&diff);
                    }
                }
                Some(self.result.times.len())
            } else {
                None
            }
        }
    }
629
630
    /// A cursor over the contents of an `RhhValBatch`.
    ///
    /// The cursor holds only positions; the batch itself is passed to every method.
    pub struct RhhValCursor<L: Layout>
    where
        layout::Key<L>: Default + HashOrdered,
    {
        /// Absolute position of the current key (into `storage.keys`).
        key_cursor: usize,
        /// Absolute position of the current value (into `storage.vals`).
        val_cursor: usize,
        /// Marker tying the cursor to its layout type.
        phantom: PhantomData<L>,
    }
649
650 use crate::trace::implementations::WithLayout;
    // Ties the cursor to its layout `L`, supplying the associated
    // key/value/time/diff types used by the `Cursor` implementation.
    impl<L: Layout> WithLayout for RhhValCursor<L>
    where
        layout::Key<L>: Default + HashOrdered,
        for<'a> layout::KeyRef<'a, L>: HashOrdered,
    {
        type Layout = L;
    }
658
    impl<L: Layout> Cursor for RhhValCursor<L>
    where
        layout::Key<L>: Default + HashOrdered,
        for<'a> layout::KeyRef<'a, L>: HashOrdered,
    {
        type Storage = RhhValBatch<L>;

        fn get_key<'a>(&self, storage: &'a RhhValBatch<L>) -> Option<Self::Key<'a>> { storage.storage.keys.get(self.key_cursor) }
        fn get_val<'a>(&self, storage: &'a RhhValBatch<L>) -> Option<Self::Val<'a>> { if self.val_valid(storage) { storage.storage.vals.get(self.val_cursor) } else { None } }
        fn key<'a>(&self, storage: &'a RhhValBatch<L>) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
        fn val<'a>(&self, storage: &'a RhhValBatch<L>) -> Self::Val<'a> { storage.storage.vals.index(self.val_cursor) }
        /// Applies `logic` to every (time, diff) update of the current value.
        fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &RhhValBatch<L>, mut logic: L2) {
            let (lower, upper) = storage.storage.updates_for_value(self.val_cursor);
            for index in lower .. upper {
                let time = storage.storage.times.index(index);
                let diff = storage.storage.diffs.index(index);
                logic(time, diff);
            }
        }
        fn key_valid(&self, storage: &RhhValBatch<L>) -> bool { self.key_cursor < storage.storage.keys.len() }
        // Valid while within the current key's value range (its exclusive upper bound).
        fn val_valid(&self, storage: &RhhValBatch<L>) -> bool { self.val_cursor < storage.storage.values_for_key(self.key_cursor).1 }
        /// Advances to the next live key, rewinding the value cursor for it.
        fn step_key(&mut self, storage: &RhhValBatch<L>){
            // Move off the current key, then skip any dead (padding) keys.
            self.key_cursor += 1;
            storage.storage.advance_to_live_key(&mut self.key_cursor);

            if self.key_valid(storage) {
                self.rewind_vals(storage);
            }
            else {
                // Clamp to the exhausted position.
                self.key_cursor = storage.storage.keys.len();
            }
        }
        /// Seeks forward to `key` (or the first key beyond it), starting from
        /// the key's hash-derived desired location and probing linearly.
        fn seek_key(&mut self, storage: &RhhValBatch<L>, key: Self::Key<'_>) {
            // The cursor only moves forward: jump to the desired location if ahead.
            let desired = storage.storage.desired_location(&key);
            if self.key_cursor < desired {
                self.key_cursor = desired;
            }
            // Probe past dead slots and keys ordered before `key`.
            while self.key_valid(storage) && storage.storage.advance_key(self.key_cursor, key) {
                self.key_cursor += 1;
            }

            if self.key_valid(storage) {
                self.rewind_vals(storage);
            }
        }
        /// Advances the value cursor, clamping at the current key's upper bound.
        fn step_val(&mut self, storage: &RhhValBatch<L>) {
            self.val_cursor += 1;
            if !self.val_valid(storage) {
                self.val_cursor = storage.storage.values_for_key(self.key_cursor).1;
            }
        }
        /// Seeks forward within the current key to the first value not less than `val`.
        fn seek_val(&mut self, storage: &RhhValBatch<L>, val: Self::Val<'_>) {
            self.val_cursor += storage.storage.vals.advance(self.val_cursor, storage.storage.values_for_key(self.key_cursor).1, |x| <L::ValContainer as BatchContainer>::reborrow(x).lt(&<L::ValContainer as BatchContainer>::reborrow(val)));
        }
        /// Returns the key cursor to the first live key, if any.
        fn rewind_keys(&mut self, storage: &RhhValBatch<L>) {
            self.key_cursor = 0;
            storage.storage.advance_to_live_key(&mut self.key_cursor);

            if self.key_valid(storage) {
                self.rewind_vals(storage)
            }
        }
        /// Returns the value cursor to the first value of the current key.
        fn rewind_vals(&mut self, storage: &RhhValBatch<L>) {
            self.val_cursor = storage.storage.values_for_key(self.key_cursor).0;
        }
    }
733
    /// A builder that assembles `RhhValBatch`es from sorted input chunks.
    pub struct RhhValBuilder<L: Layout, CI>
    where
        layout::Key<L>: Default + HashOrdered,
    {
        /// The storage being assembled.
        result: RhhValStorage<L>,
        /// A pending (time, diff) that may be elided as a "singleton" repeat of
        /// the last written update; see `push_update`.
        singleton: Option<(layout::Time<L>, layout::Diff<L>)>,
        /// Number of elided singletons, so `done` can report the true update count.
        singletons: usize,
        /// Marker for the input chunk type `CI`.
        _marker: PhantomData<CI>,
    }
748
    impl<L: Layout, CI> RhhValBuilder<L, CI>
    where
        layout::Key<L>: Default + HashOrdered,
    {
        /// Pushes an update, possibly deferring it as a "singleton".
        ///
        /// If `(time, diff)` equals the most recently written update, it is held
        /// in `self.singleton` instead of being pushed, so the enclosing value
        /// can be encoded with an empty offset range (recovered by
        /// `updates_for_value`). A previously held singleton is flushed before
        /// a differing update is pushed.
        fn push_update(&mut self, time: layout::Time<L>, diff: layout::Diff<L>) {
            // Repeat of the last written (time, diff): hold it rather than push.
            if self.result.times.last().map(|t| L::TimeContainer::into_owned(t) == time).unwrap_or(false) && self.result.diffs.last().map(|d| L::DiffContainer::into_owned(d) == diff).unwrap_or(false) {
                // At most one singleton may be pending at a time.
                assert!(self.singleton.is_none());
                self.singleton = Some((time, diff));
            }
            else {
                // Flush any pending singleton before writing the new update.
                if let Some((time, diff)) = self.singleton.take() {
                    self.result.times.push_own(&time);
                    self.result.diffs.push_own(&diff);
                }
                self.result.times.push_own(&time);
                self.result.diffs.push_own(&diff);
            }
        }
    }
782
    impl<L: Layout, CI> Builder for RhhValBuilder<L, CI>
    where
        layout::Key<L>: Default + HashOrdered,
        CI: for<'a> BuilderInput<L::KeyContainer, L::ValContainer, Key<'a> = layout::Key<L>, Time=layout::Time<L>, Diff=layout::Diff<L>>,
        for<'a> L::ValContainer: PushInto<CI::Val<'a>>,
        for<'a> layout::KeyRef<'a, L>: HashOrdered,
    {
        type Input = CI;
        type Time = layout::Time<L>;
        type Output = RhhValBatch<L>;

        /// Allocates storage sized for `keys` keys, `vals` values, and `upds` updates.
        fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {

            // Double the key capacity to leave slack for keys displaced from
            // their hash-derived desired locations, plus a little headroom.
            let rhh_capacity = 2 * keys;
            let divisor = RhhValStorage::<L>::divisor_for_capacity(rhh_capacity);
            // NOTE(review): the `+ 10` headroom appears to cover padding past
            // the last desired location — confirm against `insert_key`.
            let keys = rhh_capacity + 10;

            Self {
                result: RhhValStorage {
                    keys: L::KeyContainer::with_capacity(keys),
                    keys_offs: L::OffsetContainer::with_capacity(keys + 1),
                    vals: L::ValContainer::with_capacity(vals),
                    vals_offs: L::OffsetContainer::with_capacity(vals + 1),
                    times: L::TimeContainer::with_capacity(upds),
                    diffs: L::DiffContainer::with_capacity(upds),
                    key_count: 0,
                    key_capacity: rhh_capacity,
                    divisor,
                },
                singleton: None,
                singletons: 0,
                _marker: PhantomData,
            }
        }

        /// Drains `chunk`, appending its (key, val, time, diff) tuples; assumes
        /// tuples arrive grouped by key and then by value.
        #[inline]
        fn push(&mut self, chunk: &mut Self::Input) {
            for item in chunk.drain() {
                let (key, val, time, diff) = CI::into_parts(item);
                // Same key as the previous tuple?
                if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
                    // Same value as well: just another update for it.
                    if self.result.vals.last().map(|v| CI::val_eq(&val, v)).unwrap_or(false) {
                        self.push_update(time, diff);
                    } else {
                        // New value for the current key: close the previous
                        // value's update range (flushing any held singleton).
                        self.result.vals_offs.push_ref(self.result.times.len());
                        if self.singleton.take().is_some() { self.singletons += 1; }
                        self.push_update(time, diff);
                        self.result.vals.push_into(val);
                    }
                } else {
                    // New key: close out the previous value and key ranges.
                    self.result.vals_offs.push_ref(self.result.times.len());
                    if self.singleton.take().is_some() { self.singletons += 1; }
                    self.result.keys_offs.push_ref(self.result.vals.len());
                    self.push_update(time, diff);
                    self.result.vals.push_into(val);
                    self.result.insert_key_own(&key, None);
                }
            }
        }

        /// Seals the builder into a batch, closing any trailing offset ranges.
        #[inline(never)]
        fn done(mut self, description: Description<Self::Time>) -> RhhValBatch<L> {
            self.result.vals_offs.push_ref(self.result.times.len());
            if self.singleton.take().is_some() { self.singletons += 1; }
            self.result.keys_offs.push_ref(self.result.vals.len());
            RhhValBatch {
                // Elided singletons still count toward the total.
                updates: self.result.times.len() + self.singletons,
                storage: self.result,
                description,
            }
        }

        /// Builds a batch from a chain of input chunks in one pass.
        fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
            let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
            let mut builder = Self::with_capacity(keys, vals, upds);
            for mut chunk in chain.drain(..) {
                builder.push(&mut chunk);
            }

            builder.done(description)
        }
    }
876
877}
878
mod key_batch {

    // Empty for now: presumably reserved for a key-only batch counterpart to
    // `val_batch` above — confirm before relying on it.
}