1use std::rc::Rc;
9use std::cmp::Ordering;
10
11use serde::{Deserialize, Serialize};
12
13use crate::Hashable;
14use crate::containers::TimelyStack;
15use crate::trace::implementations::chunker::{ColumnationChunker, VecChunker};
16use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColMerger};
17use crate::trace::implementations::spine_fueled::Spine;
18use crate::trace::rc_blanket_impls::RcBuilder;
19
20use super::{Update, Layout, Vector, TStack};
21
22use self::val_batch::{RhhValBatch, RhhValBuilder};
23
/// A `Spine` of reference-counted `RhhValBatch` batches using the `Vector` layout.
pub type VecSpine<K, V, T, R> = Spine<Rc<RhhValBatch<Vector<((K,V),T,R)>>>>;
/// A `MergeBatcher` over `Vec` chunks of `((K,V),T,R)` updates, using `VecChunker` and `VecMerger`.
pub type VecBatcher<K,V,T,R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<(K, V), T, R>>;
/// An `RcBuilder` around `RhhValBuilder` that consumes `Vec` chunks and uses the `Vector` layout.
pub type VecBuilder<K,V,T,R> = RcBuilder<RhhValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;
30
/// A `Spine` of reference-counted `RhhValBatch` batches using the `TStack` layout.
pub type ColSpine<K, V, T, R> = Spine<Rc<RhhValBatch<TStack<((K,V),T,R)>>>>;
/// A `MergeBatcher` over `Vec` input of `((K,V),T,R)` updates, using `ColumnationChunker` and `ColMerger`.
pub type ColBatcher<K,V,T,R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>;
/// An `RcBuilder` around `RhhValBuilder` that consumes `TimelyStack` chunks and uses the `TStack` layout.
pub type ColBuilder<K,V,T,R> = RcBuilder<RhhValBuilder<TStack<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>;
40
/// A marker trait (no methods) layered on top of `Hashable`.
///
/// NOTE(review): presumably signals that the type's ordering compares by
/// `Hashable::hashed()` first, as `HashWrapper` does — confirm with implementors.
pub trait HashOrdered: Hashable { }
46
/// References to `HashOrdered` types are themselves `HashOrdered`.
impl<'a, T: std::hash::Hash + HashOrdered> HashOrdered for &'a T { }
48
/// A wrapper around a hashable value.
///
/// Its `PartialOrd`/`Ord` implementations compare by the wrapped value's hash first
/// and by the value itself second.
#[derive(Copy, Clone, Eq, PartialEq, Debug, Default, Serialize, Deserialize)]
pub struct HashWrapper<T: std::hash::Hash + Hashable> {
    /// The wrapped value.
    pub inner: T
}
55
56impl<T: PartialOrd + std::hash::Hash + Hashable> PartialOrd for HashWrapper<T>
57where <T as Hashable>::Output: PartialOrd {
58 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
59 let this_hash = self.inner.hashed();
60 let that_hash = other.inner.hashed();
61 (this_hash, &self.inner).partial_cmp(&(that_hash, &other.inner))
62 }
63}
64
65impl<T: Ord + PartialOrd + std::hash::Hash + Hashable> Ord for HashWrapper<T>
66where <T as Hashable>::Output: PartialOrd {
67 fn cmp(&self, other: &Self) -> Ordering {
68 self.partial_cmp(other).unwrap()
69 }
70}
71
/// `HashWrapper` is `HashOrdered`: its ordering (see its `PartialOrd` impl) compares hashes first.
impl<T: std::hash::Hash + Hashable> HashOrdered for HashWrapper<T> { }
73
74impl<T: std::hash::Hash + Hashable> Hashable for HashWrapper<T> {
75 type Output = T::Output;
76 fn hashed(&self) -> Self::Output { self.inner.hashed() }
77}
78
/// References to `HashWrapper` are `HashOrdered` as well.
impl<T: std::hash::Hash + Hashable> HashOrdered for &HashWrapper<T> { }
80
81impl<T: std::hash::Hash + Hashable> Hashable for &HashWrapper<T> {
82 type Output = T::Output;
83 fn hashed(&self) -> Self::Output { self.inner.hashed() }
84}
85
86mod val_batch {
87
88 use std::convert::TryInto;
89 use std::marker::PhantomData;
90 use serde::{Deserialize, Serialize};
91 use timely::container::PushInto;
92 use timely::progress::{Antichain, frontier::AntichainRef};
93
94 use crate::hashable::Hashable;
95 use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
96 use crate::trace::implementations::{BatchContainer, BuilderInput};
97 use crate::IntoOwned;
98
99 use super::{Layout, Update, HashOrdered};
100
/// Columnar storage for the contents of a batch, with keys positioned by their hash.
///
/// `keys` may contain placeholder ("dead") entries, pushed by `insert_key` so that
/// each real key lands at or after its desired location.
#[derive(Debug, Serialize, Deserialize)]
pub struct RhhValStorage<L: Layout>
where
    <L::Target as Update>::Key: Default + HashOrdered,
{

    /// The key capacity this storage was sized for; `divisor` is derived from it.
    pub key_capacity: usize,
    /// A key's hash is divided by this to get its desired location in `keys`.
    /// Zero means every key desires location zero.
    pub divisor: u64,
    /// The number of keys inserted through `insert_key`; placeholder entries are not counted.
    pub key_count: usize,

    /// Keys, including placeholder entries. Entry `i` is live only when
    /// `keys_offs[i] != keys_offs[i+1]`.
    pub keys: L::KeyContainer,
    /// Offsets into `vals`: key `i` owns the values in `keys_offs[i] .. keys_offs[i+1]`.
    pub keys_offs: L::OffsetContainer,
    /// Values for all keys, bracketed by `keys_offs`.
    pub vals: L::ValContainer,
    /// Offsets into `times`/`diffs`: value `i` owns updates `vals_offs[i] .. vals_offs[i+1]`.
    ///
    /// An empty range is decoded by `updates_for_value` as the single update just before it.
    pub vals_offs: L::OffsetContainer,
    /// Update times, bracketed by `vals_offs`.
    pub times: L::TimeContainer,
    /// Update diffs, parallel to `times`.
    pub diffs: L::DiffContainer,
}
153
154 impl<L: Layout> RhhValStorage<L>
155 where
156 <L::Target as Update>::Key: Default + HashOrdered,
157 for<'a> <L::KeyContainer as BatchContainer>::ReadItem<'a>: HashOrdered,
158 {
159 fn values_for_key(&self, index: usize) -> (usize, usize) {
161 let lower = self.keys_offs.index(index);
162 let upper = self.keys_offs.index(index+1);
163 assert!(lower < upper, "{:?} v {:?} at {:?}", lower, upper, index);
165 (lower, upper)
166 }
167 fn updates_for_value(&self, index: usize) -> (usize, usize) {
169 let mut lower = self.vals_offs.index(index);
170 let upper = self.vals_offs.index(index+1);
171 if lower == upper {
174 assert!(lower > 0);
175 lower -= 1;
176 }
177 (lower, upper)
178 }
179
/// Inserts `key` at or after its desired location, padding `keys` with placeholders as needed.
///
/// When `offset` is `Some`, it is pushed onto `keys_offs` as the upper bound of the key's values.
/// When it is `None`, nothing is pushed for the key itself and the caller maintains `keys_offs`.
fn insert_key(&mut self, key: <L::KeyContainer as BatchContainer>::ReadItem<'_>, offset: Option<usize>) {
    let desired = self.desired_location(&key);
    // Pad with default-valued ("dead") keys until the desired slot is reached.
    while self.keys.len() < desired {
        // Repeat the current offset so the placeholder covers an empty value range.
        let current_offset = self.keys_offs.index(self.keys.len());
        self.keys.push(<<L::Target as Update>::Key as Default>::default());
        self.keys_offs.push(current_offset);
    }

    // The key lands at `desired`, or later if earlier insertions already filled past it.
    self.keys.push(key);
    if let Some(offset) = offset {
        self.keys_offs.push(offset);
    }
    self.key_count += 1;
}
208
209 fn desired_location<K: Hashable>(&self, key: &K) -> usize {
211 if self.divisor == 0 { 0 }
212 else {
213 (key.hashed().into() / self.divisor).try_into().expect("divisor not large enough to force u64 into uisze")
214 }
215 }
216
217 fn advance_key(&self, index: usize, key: <L::KeyContainer as BatchContainer>::ReadItem<'_>) -> bool {
219 !self.live_key(index) || self.keys.index(index).lt(&<L::KeyContainer as BatchContainer>::reborrow(key))
221 }
222
223 fn live_key(&self, index: usize) -> bool {
225 self.keys_offs.index(index) != self.keys_offs.index(index+1)
226 }
227
228 fn advance_to_live_key(&self, index: &mut usize) {
230 while *index < self.keys.len() && !self.live_key(*index) {
231 *index += 1;
232 }
233 }
234
/// Computes the divisor that maps `u64` hashes onto `capacity` slots.
///
/// Returns zero for capacities of zero or one; otherwise `((2^63) / capacity) * 2`,
/// computed this way because `2^64` itself does not fit in a `u64`.
fn divisor_for_capacity(capacity: usize) -> u64 {
    let slots: u64 = capacity.try_into().expect("usize exceeds u64");
    match slots {
        0 | 1 => 0,
        _ => ((1u64 << 63) / slots) << 1,
    }
}
248 }
249
/// A batch of updates: hash-positioned storage plus the description of the times it covers.
#[derive(Serialize, Deserialize)]
#[serde(bound = "
    L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
    L::ValContainer: Serialize + for<'a> Deserialize<'a>,
    L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
    L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
    L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
")]
pub struct RhhValBatch<L: Layout>
where
    <L::Target as Update>::Key: Default + HashOrdered,
{
    /// The updates themselves.
    pub storage: RhhValStorage<L>,
    /// Description of the update times this batch represents.
    pub description: Description<<L::Target as Update>::Time>,
    /// The number of updates reported by `len()`.
    ///
    /// The builder and merger set this to `storage.times.len()` plus the number of
    /// updates that were encoded as singletons rather than stored explicitly.
    pub updates: usize,
}
277
278 impl<L: Layout> BatchReader for RhhValBatch<L>
279 where
280 <L::Target as Update>::Key: Default + HashOrdered,
281 for<'a> <L::KeyContainer as BatchContainer>::ReadItem<'a>: HashOrdered,
282 {
283 type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
284 type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
285 type Time = <L::Target as Update>::Time;
286 type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
287 type Diff = <L::Target as Update>::Diff;
288 type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;
289
290 type Cursor = RhhValCursor<L>;
291 fn cursor(&self) -> Self::Cursor {
292 let mut cursor = RhhValCursor {
293 key_cursor: 0,
294 val_cursor: 0,
295 phantom: std::marker::PhantomData,
296 };
297 cursor.step_key(self);
298 cursor
299 }
300 fn len(&self) -> usize {
301 self.updates
304 }
305 fn description(&self) -> &Description<<L::Target as Update>::Time> { &self.description }
306 }
307
308 impl<L: Layout> Batch for RhhValBatch<L>
309 where
310 <L::Target as Update>::Key: Default + HashOrdered,
311 for<'a> <L::KeyContainer as BatchContainer>::ReadItem<'a>: HashOrdered,
312 {
313 type Merger = RhhValMerger<L>;
314
315 fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
316 RhhValMerger::new(self, other, compaction_frontier)
317 }
318
319 fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
320 use timely::progress::Timestamp;
321 Self {
322 storage: RhhValStorage {
323 keys: L::KeyContainer::with_capacity(0),
324 keys_offs: L::OffsetContainer::with_capacity(0),
325 vals: L::ValContainer::with_capacity(0),
326 vals_offs: L::OffsetContainer::with_capacity(0),
327 times: L::TimeContainer::with_capacity(0),
328 diffs: L::DiffContainer::with_capacity(0),
329 key_count: 0,
330 key_capacity: 0,
331 divisor: 0,
332 },
333 description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
334 updates: 0,
335 }
336 }
337 }
338
/// State for an in-progress merge of two `RhhValBatch` batches.
pub struct RhhValMerger<L: Layout>
where
    <L::Target as Update>::Key: Default + HashOrdered,
{
    /// Key position in the first batch.
    key_cursor1: usize,
    /// Key position in the second batch.
    key_cursor2: usize,
    /// The merged storage built so far.
    result: RhhValStorage<L>,
    /// Description of the batch that will be produced.
    description: Description<<L::Target as Update>::Time>,

    /// Scratch space for the updates of one value while they are consolidated.
    update_stash: Vec<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
    /// The number of updates encoded as singletons, which do not appear in `result.times`.
    singletons: usize,
}
361
362 impl<L: Layout> Merger<RhhValBatch<L>> for RhhValMerger<L>
363 where
364 <L::Target as Update>::Key: Default + HashOrdered,
365 RhhValBatch<L>: Batch<Time=<L::Target as Update>::Time>,
366 for<'a> <L::KeyContainer as BatchContainer>::ReadItem<'a>: HashOrdered,
367 {
368 fn new(batch1: &RhhValBatch<L>, batch2: &RhhValBatch<L>, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self {
369
370 assert!(batch1.upper() == batch2.lower());
371 use crate::lattice::Lattice;
372 let mut since = batch1.description().since().join(batch2.description().since());
373 since = since.join(&compaction_frontier.to_owned());
374
375 let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);
376
377 let max_cap = batch1.len() + batch2.len();
380 let rhh_cap = 2 * max_cap;
381
382 let batch1 = &batch1.storage;
383 let batch2 = &batch2.storage;
384
385 let mut storage = RhhValStorage {
386 keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
387 keys_offs: L::OffsetContainer::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
388 vals: L::ValContainer::merge_capacity(&batch1.vals, &batch2.vals),
389 vals_offs: L::OffsetContainer::with_capacity(batch1.vals_offs.len() + batch2.vals_offs.len()),
390 times: L::TimeContainer::merge_capacity(&batch1.times, &batch2.times),
391 diffs: L::DiffContainer::merge_capacity(&batch1.diffs, &batch2.diffs),
392 key_count: 0,
393 key_capacity: rhh_cap,
394 divisor: RhhValStorage::<L>::divisor_for_capacity(rhh_cap),
395 };
396
397 let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs;
399 keys_offs.push(0);
400 let vals_offs: &mut L::OffsetContainer = &mut storage.vals_offs;
401 vals_offs.push(0);
402
403 RhhValMerger {
404 key_cursor1: 0,
405 key_cursor2: 0,
406 result: storage,
407 description,
408 update_stash: Vec::new(),
409 singletons: 0,
410 }
411 }
412 fn done(self) -> RhhValBatch<L> {
413 RhhValBatch {
414 updates: self.result.times.len() + self.singletons,
415 storage: self.result,
416 description: self.description,
417 }
418 }
/// Performs merge work on `source1` and `source2`, bounded by `fuel`.
///
/// Effort is measured as the number of updates added to `result.times` during this call.
/// It is checked between keys, so a single key may overshoot the budget. On return,
/// `fuel` is reduced by the effort spent.
fn work(&mut self, source1: &RhhValBatch<L>, source2: &RhhValBatch<L>, fuel: &mut isize) {

    // Baseline against which this call's effort is measured.
    let starting_updates = self.result.times.len();
    let mut effort = 0isize;

    // Skip placeholder entries so both cursors rest on live keys (or the end).
    source1.storage.advance_to_live_key(&mut self.key_cursor1);
    source2.storage.advance_to_live_key(&mut self.key_cursor2);

    // While both sources have keys remaining, merge them key by key.
    while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
        self.merge_key(&source1.storage, &source2.storage);
        source1.storage.advance_to_live_key(&mut self.key_cursor1);
        source2.storage.advance_to_live_key(&mut self.key_cursor2);
        effort = (self.result.times.len() - starting_updates) as isize;
    }

    // At most one source still has keys at this point (unless fuel ran out); copy the rest.
    while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
        self.copy_key(&source1.storage, self.key_cursor1);
        self.key_cursor1 += 1;
        source1.storage.advance_to_live_key(&mut self.key_cursor1);
        effort = (self.result.times.len() - starting_updates) as isize;
    }
    while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
        self.copy_key(&source2.storage, self.key_cursor2);
        self.key_cursor2 += 1;
        source2.storage.advance_to_live_key(&mut self.key_cursor2);
        effort = (self.result.times.len() - starting_updates) as isize;
    }

    *fuel -= effort;
}
454 }
455
456 impl<L: Layout> RhhValMerger<L>
458 where
459 <L::Target as Update>::Key: Default + HashOrdered,
460 for<'a> <L::KeyContainer as BatchContainer>::ReadItem<'a>: HashOrdered,
461 {
/// Copies the key at `cursor` in `source`, with all its values and updates, into `result`.
///
/// Updates are advanced and consolidated on the way. Values whose updates cancel are
/// dropped, and the key is inserted only if at least one value survives.
fn copy_key(&mut self, source: &RhhValStorage<L>, cursor: usize) {
    // Remember the value count so we can tell whether any value was written.
    let init_vals = self.result.vals.len();
    let (mut lower, upper) = source.values_for_key(cursor);
    while lower < upper {
        self.stash_updates_for_val(source, lower);
        // `Some(off)` means the value kept at least one update; `off` is its upper bound.
        if let Some(off) = self.consolidate_updates() {
            self.result.vals_offs.push(off);
            self.result.vals.push(source.vals.index(lower));
        }
        lower += 1;
    }

    // Only insert the key if it ended up with values.
    if self.result.vals.len() > init_vals {
        self.result.insert_key(source.keys.index(cursor), Some(self.result.vals.len()));
    }
}
487 fn merge_key(&mut self, source1: &RhhValStorage<L>, source2: &RhhValStorage<L>) {
492
493 use ::std::cmp::Ordering;
494 match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
495 Ordering::Less => {
496 self.copy_key(source1, self.key_cursor1);
497 self.key_cursor1 += 1;
498 },
499 Ordering::Equal => {
500 let (lower1, upper1) = source1.values_for_key(self.key_cursor1);
502 let (lower2, upper2) = source2.values_for_key(self.key_cursor2);
503 if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) {
504 self.result.insert_key(source1.keys.index(self.key_cursor1), Some(off));
505 }
506 self.key_cursor1 += 1;
508 self.key_cursor2 += 1;
509 },
510 Ordering::Greater => {
511 self.copy_key(source2, self.key_cursor2);
512 self.key_cursor2 += 1;
513 },
514 }
515 }
/// Merges two ranges of values (for the same key) from two sources into `result`.
///
/// Each argument is `(source, lower, upper)`, a range of value indices in that source.
/// Returns `Some(result.vals.len())` if at least one value was written, so the caller
/// can use it as the key's upper value offset. Returns `None` if every value's updates cancelled.
fn merge_vals(
    &mut self,
    (source1, mut lower1, upper1): (&RhhValStorage<L>, usize, usize),
    (source2, mut lower2, upper2): (&RhhValStorage<L>, usize, usize),
) -> Option<usize> {
    // Remember the value count so we can tell whether any value was written.
    let init_vals = self.result.vals.len();
    while lower1 < upper1 && lower2 < upper2 {
        use ::std::cmp::Ordering;
        match source1.vals.index(lower1).cmp(&source2.vals.index(lower2)) {
            Ordering::Less => {
                // Value only in the first source: carry its updates over.
                self.stash_updates_for_val(source1, lower1);
                if let Some(off) = self.consolidate_updates() {
                    self.result.vals_offs.push(off);
                    self.result.vals.push(source1.vals.index(lower1));
                }
                lower1 += 1;
            },
            Ordering::Equal => {
                // Same value in both sources: combine updates before consolidating.
                self.stash_updates_for_val(source1, lower1);
                self.stash_updates_for_val(source2, lower2);
                if let Some(off) = self.consolidate_updates() {
                    self.result.vals_offs.push(off);
                    self.result.vals.push(source1.vals.index(lower1));
                }
                lower1 += 1;
                lower2 += 1;
            },
            Ordering::Greater => {
                // Value only in the second source: carry its updates over.
                self.stash_updates_for_val(source2, lower2);
                if let Some(off) = self.consolidate_updates() {
                    self.result.vals_offs.push(off);
                    self.result.vals.push(source2.vals.index(lower2));
                }
                lower2 += 1;
            },
        }
    }
    // Drain whichever source still has values remaining.
    while lower1 < upper1 {
        self.stash_updates_for_val(source1, lower1);
        if let Some(off) = self.consolidate_updates() {
            self.result.vals_offs.push(off);
            self.result.vals.push(source1.vals.index(lower1));
        }
        lower1 += 1;
    }
    while lower2 < upper2 {
        self.stash_updates_for_val(source2, lower2);
        if let Some(off) = self.consolidate_updates() {
            self.result.vals_offs.push(off);
            self.result.vals.push(source2.vals.index(lower2));
        }
        lower2 += 1;
    }

    // Report the new upper value offset only if something was written.
    if self.result.vals.len() > init_vals {
        Some(self.result.vals.len())
    } else {
        None
    }
}
588
589 fn stash_updates_for_val(&mut self, source: &RhhValStorage<L>, index: usize) {
591 let (lower, upper) = source.updates_for_value(index);
592 for i in lower .. upper {
593 let time = source.times.index(i);
595 let diff = source.diffs.index(i);
596 let mut new_time = time.into_owned();
597 use crate::lattice::Lattice;
598 new_time.advance_by(self.description.since().borrow());
599 self.update_stash.push((new_time, diff.into_owned()));
600 }
601 }
602
/// Consolidates `update_stash` and moves what remains into `result.times`/`result.diffs`.
///
/// Returns `Some(result.times.len())` (the upper update offset for the current value)
/// if any update survived consolidation, and `None` otherwise. The stash is left empty
/// in both cases.
fn consolidate_updates(&mut self) -> Option<usize> {
    use crate::consolidation;
    consolidation::consolidate(&mut self.update_stash);
    if !self.update_stash.is_empty() {
        // Compare the last stashed update against the last update already written.
        let time_diff = self.result.times.last().zip(self.result.diffs.last());
        let last_eq = self.update_stash.last().zip(time_diff).map(|((t1, d1), (t2, d2))| {
            let t1 = <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(t1);
            let d1 = <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(d1);
            t1.eq(&t2) && d1.eq(&d2)
        });
        if self.update_stash.len() == 1 && last_eq.unwrap_or(false) {
            // A lone update identical to the previous one is not written again. The
            // unchanged offset returned below yields an empty range, which
            // `updates_for_value` decodes as "the update just before".
            self.update_stash.clear();
            self.singletons += 1;
        }
        else {
            for (time, diff) in self.update_stash.drain(..) {
                self.result.times.push(time);
                self.result.diffs.push(diff);
            }
        }
        Some(self.result.times.len())
    } else {
        None
    }
}
633 }
634
635
/// A cursor over an `RhhValBatch`, holding positions into the batch's keys and values.
///
/// The cursor stores no reference to the batch; every method takes the batch as an argument.
pub struct RhhValCursor<L: Layout>
where
    <L::Target as Update>::Key: Default + HashOrdered,
{
    /// Absolute position of the current key in `storage.keys`.
    key_cursor: usize,
    /// Absolute position of the current value in `storage.vals`.
    val_cursor: usize,
    /// Ties the cursor to the layout `L` without storing one.
    phantom: PhantomData<L>,
}
654
655 impl<L: Layout> Cursor for RhhValCursor<L>
656 where
657 <L::Target as Update>::Key: Default + HashOrdered,
658 for<'a> <L::KeyContainer as BatchContainer>::ReadItem<'a>: HashOrdered,
659 {
660 type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
661 type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
662 type Time = <L::Target as Update>::Time;
663 type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
664 type Diff = <L::Target as Update>::Diff;
665 type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;
666
667 type Storage = RhhValBatch<L>;
668
669 fn key<'a>(&self, storage: &'a RhhValBatch<L>) -> Self::Key<'a> {
670 storage.storage.keys.index(self.key_cursor)
671 }
672 fn val<'a>(&self, storage: &'a RhhValBatch<L>) -> Self::Val<'a> { storage.storage.vals.index(self.val_cursor) }
673 fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &RhhValBatch<L>, mut logic: L2) {
674 let (lower, upper) = storage.storage.updates_for_value(self.val_cursor);
675 for index in lower .. upper {
676 let time = storage.storage.times.index(index);
677 let diff = storage.storage.diffs.index(index);
678 logic(time, diff);
679 }
680 }
681 fn key_valid(&self, storage: &RhhValBatch<L>) -> bool { self.key_cursor < storage.storage.keys.len() }
682 fn val_valid(&self, storage: &RhhValBatch<L>) -> bool { self.val_cursor < storage.storage.values_for_key(self.key_cursor).1 }
683 fn step_key(&mut self, storage: &RhhValBatch<L>){
684 self.key_cursor += 1;
686 storage.storage.advance_to_live_key(&mut self.key_cursor);
687
688 if self.key_valid(storage) {
689 self.rewind_vals(storage);
690 }
691 else {
692 self.key_cursor = storage.storage.keys.len();
693 }
694 }
695 fn seek_key(&mut self, storage: &RhhValBatch<L>, key: Self::Key<'_>) {
696 let desired = storage.storage.desired_location(&key);
698 if self.key_cursor < desired {
700 self.key_cursor = desired;
701 }
702 while self.key_valid(storage) && storage.storage.advance_key(self.key_cursor, key) {
706 self.key_cursor += 1;
709 }
710
711 if self.key_valid(storage) {
712 self.rewind_vals(storage);
713 }
714 }
715 fn step_val(&mut self, storage: &RhhValBatch<L>) {
716 self.val_cursor += 1;
717 if !self.val_valid(storage) {
718 self.val_cursor = storage.storage.values_for_key(self.key_cursor).1;
719 }
720 }
721 fn seek_val(&mut self, storage: &RhhValBatch<L>, val: Self::Val<'_>) {
722 self.val_cursor += storage.storage.vals.advance(self.val_cursor, storage.storage.values_for_key(self.key_cursor).1, |x| <L::ValContainer as BatchContainer>::reborrow(x).lt(&<L::ValContainer as BatchContainer>::reborrow(val)));
723 }
724 fn rewind_keys(&mut self, storage: &RhhValBatch<L>) {
725 self.key_cursor = 0;
726 storage.storage.advance_to_live_key(&mut self.key_cursor);
727
728 if self.key_valid(storage) {
729 self.rewind_vals(storage)
730 }
731 }
732 fn rewind_vals(&mut self, storage: &RhhValBatch<L>) {
733 self.val_cursor = storage.storage.values_for_key(self.key_cursor).0;
734 }
735 }
736
/// A builder that assembles an `RhhValBatch` from chunks of input of type `CI`.
pub struct RhhValBuilder<L: Layout, CI>
where
    <L::Target as Update>::Key: Default + HashOrdered,
{
    /// The storage being filled.
    result: RhhValStorage<L>,
    /// An update that matched the last written `(time, diff)` and is being held back.
    /// It is either flushed by a later update for the same value or counted as a singleton.
    singleton: Option<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
    /// The number of updates encoded as singletons, which do not appear in `result.times`.
    singletons: usize,
    /// Marks the input container type without storing one.
    _marker: PhantomData<CI>,
}
751
752 impl<L: Layout, CI> RhhValBuilder<L, CI>
753 where
754 <L::Target as Update>::Key: Default + HashOrdered,
755 {
756 fn push_update(&mut self, time: <L::Target as Update>::Time, diff: <L::Target as Update>::Diff) {
768 let t1 = <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&time);
770 let d1 = <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&diff);
771 if self.result.times.last().map(|t| t == t1).unwrap_or(false) && self.result.diffs.last().map(|d| d == d1).unwrap_or(false) {
772 assert!(self.singleton.is_none());
773 self.singleton = Some((time, diff));
774 }
775 else {
776 if let Some((time, diff)) = self.singleton.take() {
778 self.result.times.push(time);
779 self.result.diffs.push(diff);
780 }
781 self.result.times.push(time);
782 self.result.diffs.push(diff);
783 }
784 }
785 }
786
787 impl<L: Layout, CI> Builder for RhhValBuilder<L, CI>
788 where
789 <L::Target as Update>::Key: Default + HashOrdered,
790 CI: for<'a> BuilderInput<L::KeyContainer, L::ValContainer, Key<'a> = <L::Target as Update>::Key, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
791 for<'a> L::ValContainer: PushInto<CI::Val<'a>>,
792 for<'a> <L::KeyContainer as BatchContainer>::ReadItem<'a>: HashOrdered + IntoOwned<'a, Owned = <L::Target as Update>::Key>,
793 {
794 type Input = CI;
795 type Time = <L::Target as Update>::Time;
796 type Output = RhhValBatch<L>;
797
798 fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
799
800 let rhh_capacity = 2 * keys;
802 let divisor = RhhValStorage::<L>::divisor_for_capacity(rhh_capacity);
803 let keys = rhh_capacity + 10;
807
808 Self {
810 result: RhhValStorage {
811 keys: L::KeyContainer::with_capacity(keys),
812 keys_offs: L::OffsetContainer::with_capacity(keys + 1),
813 vals: L::ValContainer::with_capacity(vals),
814 vals_offs: L::OffsetContainer::with_capacity(vals + 1),
815 times: L::TimeContainer::with_capacity(upds),
816 diffs: L::DiffContainer::with_capacity(upds),
817 key_count: 0,
818 key_capacity: rhh_capacity,
819 divisor,
820 },
821 singleton: None,
822 singletons: 0,
823 _marker: PhantomData,
824 }
825 }
826
/// Drains `chunk` and appends each `(key, val, time, diff)` item to the batch under construction.
///
/// Items are compared only against the most recently pushed key and value.
/// NOTE(review): this presumably relies on input arriving sorted by key, then value — confirm.
#[inline]
fn push(&mut self, chunk: &mut Self::Input) {
    for item in chunk.drain() {
        let (key, val, time, diff) = CI::into_parts(item);
        if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
            // Same key as the previous item.
            if self.result.vals.last().map(|v| CI::val_eq(&val, v)).unwrap_or(false) {
                // Same value too: just one more update.
                self.push_update(time, diff);
            } else {
                // New value: close the previous value's update range.
                self.result.vals_offs.push(self.result.times.len());
                // A still-held update becomes a singleton (empty range).
                if self.singleton.take().is_some() { self.singletons += 1; }
                self.push_update(time, diff);
                self.result.vals.push(val);
            }
        } else {
            // New key: close the previous value's update range and the previous key's value range.
            self.result.vals_offs.push(self.result.times.len());
            if self.singleton.take().is_some() { self.singletons += 1; }
            self.result.keys_offs.push(self.result.vals.len());
            self.push_update(time, diff);
            self.result.vals.push(val);
            // `None`: the key's upper offset is pushed later, by the next key or by `done`.
            self.result.insert_key(IntoOwned::borrow_as(&key), None);
        }
    }
}
855
856 #[inline(never)]
857 fn done(mut self, description: Description<Self::Time>) -> RhhValBatch<L> {
858 self.result.vals_offs.push(self.result.times.len());
860 if self.singleton.take().is_some() { self.singletons += 1; }
862 self.result.keys_offs.push(self.result.vals.len());
863 RhhValBatch {
864 updates: self.result.times.len() + self.singletons,
865 storage: self.result,
866 description,
867 }
868 }
869
870 fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
871 let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
872 let mut builder = Self::with_capacity(keys, vals, upds);
873 for mut chunk in chain.drain(..) {
874 builder.push(&mut chunk);
875 }
876
877 builder.done(description)
878 }
879 }
880
881}
882
/// Currently empty.
///
/// NOTE(review): presumably a placeholder for a key-only counterpart to `val_batch` — confirm.
mod key_batch {

}