1use crate::storage::tracking_bloom_filter::BloomFilterStats;
2use crate::trace::cursor::Position;
3use crate::trace::ord::merge_batcher::MergeBatcher;
4use crate::{
5 DBData, DBWeight, NumEntries, Timestamp,
6 algebra::Lattice,
7 dynamic::{
8 DataTrait, DynDataTyped, DynPair, DynVec, DynWeightedPairs, Erase, Factory, LeanVec,
9 WeightTrait, WithFactory,
10 },
11 trace::{
12 Batch, BatchFactories, BatchReader, BatchReaderFactories, Builder, Cursor, Deserializer,
13 Serializer,
14 layers::{
15 Cursor as TrieCursor, Layer, LayerCursor, LayerFactories, Leaf, LeafFactories,
16 OrdOffset, Trie,
17 },
18 },
19 utils::{ConsolidatePairedSlices, Tup2},
20};
21use feldera_storage::FileReader;
22use rand::Rng;
23use rkyv::{Archive, Deserialize, Serialize};
24use size_of::SizeOf;
25use std::any::TypeId;
26use std::fmt::{self, Debug, Display, Formatter};
27use std::sync::Arc;
28
/// Storage layout used by [`VecValBatch`]: a [`Layer`] keyed by `K` whose child is a
/// [`Layer`] keyed by `V`, whose child in turn is a [`Leaf`] pairing `DynDataTyped<T>`
/// entries with `R` entries. Both `Layer` levels use `O` as their offset type.
pub type VecValBatchLayer<K, V, T, R, O> = Layer<K, Layer<V, Leaf<DynDataTyped<T>, R>, O>, O>;
30
/// Bundle of factory objects used by [`VecValBatch`], its cursor and its builder.
///
/// Every field except `layer_factories` is a `&'static` trait-object reference.
pub struct VecValBatchFactories<K, V, T, R>
where
    K: DataTrait + ?Sized,
    V: DataTrait + ?Sized,
    R: WeightTrait + ?Sized,
    T: Timestamp,
{
    /// Factories for the three nested storage levels: key layer, value layer and
    /// `(DynDataTyped<T>, R)` leaf.
    layer_factories: LayerFactories<K, LayerFactories<V, LeafFactories<DynDataTyped<T>, R>>>,
    /// Factory for `DynPair<K, V>` values.
    item_factory: &'static dyn Factory<DynPair<K, V>>,
    /// `ConsolidatePairedSlices` implementation for `DynDataTyped<T>` / `R` slices.
    // NOTE(review): stored and cloned, but not read anywhere in the visible part of this file.
    consolidate_weights: &'static dyn ConsolidatePairedSlices<DynDataTyped<T>, R>,
    /// Factory for a single `((K, V), R)` pair.
    weighted_item_factory: &'static dyn Factory<DynPair<DynPair<K, V>, R>>,
    /// Factory for `DynWeightedPairs` of `((K, V), R)` entries.
    weighted_items_factory: &'static dyn Factory<DynWeightedPairs<DynPair<K, V>, R>>,
    /// Factory for `DynWeightedPairs` of `(V, R)` entries.
    weighted_vals_factory: &'static dyn Factory<DynWeightedPairs<V, R>>,
    /// Factory for `DynWeightedPairs` of `(DynDataTyped<T>, R)` entries.
    time_diffs_factory: &'static dyn Factory<DynWeightedPairs<DynDataTyped<T>, R>>,
}
46
47impl<K, V, T, R> Clone for VecValBatchFactories<K, V, T, R>
48where
49 K: DataTrait + ?Sized,
50 V: DataTrait + ?Sized,
51 R: WeightTrait + ?Sized,
52 T: Timestamp,
53{
54 fn clone(&self) -> Self {
55 Self {
56 layer_factories: self.layer_factories.clone(),
57 item_factory: self.item_factory,
58 consolidate_weights: self.consolidate_weights,
59 weighted_item_factory: self.weighted_item_factory,
60 weighted_items_factory: self.weighted_items_factory,
61 weighted_vals_factory: self.weighted_vals_factory,
62 time_diffs_factory: self.time_diffs_factory,
63 }
64 }
65}
66
// SAFETY: NOTE(review) — this impl asserts that a factory bundle may be moved to another
// thread. All fields other than `layer_factories` are `&'static dyn ...` references;
// nothing visible in this file establishes that the referenced factory objects are safe
// to share across threads, so the justification must be confirmed against the
// definitions of `Factory`, `ConsolidatePairedSlices` and `LayerFactories`.
unsafe impl<K, V, T, R> Send for VecValBatchFactories<K, V, T, R>
where
    K: DataTrait + ?Sized,
    V: DataTrait + ?Sized,
    R: WeightTrait + ?Sized,
    T: Timestamp,
{
}
75
76impl<K, V, T, R> BatchReaderFactories<K, V, T, R> for VecValBatchFactories<K, V, T, R>
77where
78 K: DataTrait + ?Sized,
79 V: DataTrait + ?Sized,
80 R: WeightTrait + ?Sized,
81 T: Timestamp,
82{
83 fn new<KType, VType, RType>() -> Self
84 where
85 KType: DBData + Erase<K>,
86 VType: DBData + Erase<V>,
87 RType: DBWeight + Erase<R>,
88 {
89 Self {
90 layer_factories: LayerFactories::new::<KType>(LayerFactories::new::<VType>(
91 LeafFactories::new::<T, RType>(),
92 )),
93 item_factory: WithFactory::<Tup2<KType, VType>>::FACTORY,
94 consolidate_weights: <dyn ConsolidatePairedSlices<_, _>>::factory::<T, RType>(),
95 weighted_item_factory: WithFactory::<Tup2<Tup2<KType, VType>, RType>>::FACTORY,
96 weighted_items_factory:
97 WithFactory::<LeanVec<Tup2<Tup2<KType, VType>, RType>>>::FACTORY,
98 weighted_vals_factory: WithFactory::<LeanVec<Tup2<VType, RType>>>::FACTORY,
99 time_diffs_factory: WithFactory::<LeanVec<Tup2<T, RType>>>::FACTORY,
100 }
101 }
102
103 fn key_factory(&self) -> &'static dyn Factory<K> {
104 self.layer_factories.key
105 }
106
107 fn keys_factory(&self) -> &'static dyn Factory<DynVec<K>> {
108 self.layer_factories.keys
109 }
110
111 fn val_factory(&self) -> &'static dyn Factory<V> {
112 self.layer_factories.child.key
113 }
114
115 fn weight_factory(&self) -> &'static dyn Factory<R> {
116 self.layer_factories.child.child.diff
117 }
118}
119
120impl<K, V, T, R> BatchFactories<K, V, T, R> for VecValBatchFactories<K, V, T, R>
121where
122 K: DataTrait + ?Sized,
123 V: DataTrait + ?Sized,
124 R: WeightTrait + ?Sized,
125 T: Timestamp,
126{
127 fn item_factory(&self) -> &'static dyn Factory<DynPair<K, V>> {
130 self.item_factory
131 }
132
133 fn weighted_item_factory(&self) -> &'static dyn Factory<DynPair<DynPair<K, V>, R>> {
134 self.weighted_item_factory
135 }
136
137 fn weighted_items_factory(&self) -> &'static dyn Factory<DynWeightedPairs<DynPair<K, V>, R>> {
138 self.weighted_items_factory
139 }
140
141 fn weighted_vals_factory(&self) -> &'static dyn Factory<DynWeightedPairs<V, R>> {
142 self.weighted_vals_factory
143 }
144
145 fn time_diffs_factory(
146 &self,
147 ) -> Option<&'static dyn Factory<DynWeightedPairs<DynDataTyped<T>, R>>> {
148 Some(self.time_diffs_factory)
149 }
150
151 }
181
/// Batch of `(key, value, time, weight)` data stored in a [`VecValBatchLayer`].
///
/// `O` is the offset type used by the nested layers and defaults to `usize`.
#[derive(SizeOf)]
pub struct VecValBatch<K, V, T, R, O = usize>
where
    K: DataTrait + ?Sized,
    V: DataTrait + ?Sized,
    R: WeightTrait + ?Sized,
    T: Timestamp,
    O: OrdOffset,
{
    /// Factories associated with this batch; skipped by the `SizeOf` derive.
    #[size_of(skip)]
    factories: VecValBatchFactories<K, V, T, R>,

    /// Nested key layer / value layer / `(time, weight)` leaf holding the batch contents.
    pub layer: VecValBatchLayer<K, V, T, R, O>,
}
205
// SAFETY: NOTE(review) — this impl asserts that a batch may be moved to another thread.
// The batch consists of `factories` and `layer`; whether their contents (trait-object
// references and dynamically typed vectors) are actually safe to send is not shown in
// this file and should be confirmed against the `Layer`/`Leaf` and factory definitions.
unsafe impl<K, V, T, R, O> Send for VecValBatch<K, V, T, R, O>
where
    K: DataTrait + ?Sized,
    V: DataTrait + ?Sized,
    R: WeightTrait + ?Sized,
    T: Timestamp,
    O: OrdOffset,
{
}
215
216impl<K, V, T, R, O> Debug for VecValBatch<K, V, T, R, O>
217where
218 K: DataTrait + ?Sized,
219 V: DataTrait + ?Sized,
220 R: WeightTrait + ?Sized,
221 T: Timestamp,
222 O: OrdOffset,
223{
224 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
225 f.debug_struct("VecValBatch")
226 .field("layer", &self.layer)
227 .finish()
228 }
229}
230
231impl<K, V, T: Lattice, R, O: OrdOffset> Deserialize<VecValBatch<K, V, T, R, O>, Deserializer> for ()
232where
233 K: DataTrait + ?Sized,
234 V: DataTrait + ?Sized,
235 R: WeightTrait + ?Sized,
236 T: Timestamp,
237 O: OrdOffset,
238{
239 fn deserialize(
240 &self,
241 _deserializer: &mut Deserializer,
242 ) -> Result<VecValBatch<K, V, T, R, O>, <Deserializer as rkyv::Fallible>::Error> {
243 todo!()
244 }
245}
246
247impl<K, V, T: Lattice, R, O: OrdOffset> Archive for VecValBatch<K, V, T, R, O>
248where
249 K: DataTrait + ?Sized,
250 V: DataTrait + ?Sized,
251 R: WeightTrait + ?Sized,
252 T: Timestamp,
253 O: OrdOffset,
254{
255 type Archived = ();
256 type Resolver = ();
257
258 unsafe fn resolve(&self, _pos: usize, _resolver: Self::Resolver, _out: *mut Self::Archived) {
259 todo!()
260 }
261}
262impl<K, V, T: Lattice, R, O: OrdOffset> Serialize<Serializer> for VecValBatch<K, V, T, R, O>
263where
264 K: DataTrait + ?Sized,
265 V: DataTrait + ?Sized,
266 R: WeightTrait + ?Sized,
267 T: Timestamp,
268 O: OrdOffset,
269{
270 fn serialize(
271 &self,
272 _serializer: &mut Serializer,
273 ) -> Result<Self::Resolver, <Serializer as rkyv::Fallible>::Error> {
274 todo!()
275 }
276}
277
278impl<K, V, T, R, O> Clone for VecValBatch<K, V, T, R, O>
279where
280 K: DataTrait + ?Sized,
281 V: DataTrait + ?Sized,
282 R: WeightTrait + ?Sized,
283 T: Timestamp,
284 O: OrdOffset,
285{
286 fn clone(&self) -> Self {
287 Self {
288 factories: self.factories.clone(),
289 layer: self.layer.clone(),
290 }
291 }
292}
293
294impl<K, V, T, R, O> NumEntries for VecValBatch<K, V, T, R, O>
295where
296 K: DataTrait + ?Sized,
297 V: DataTrait + ?Sized,
298 R: WeightTrait + ?Sized,
299 T: Timestamp,
300 O: OrdOffset,
301{
302 const CONST_NUM_ENTRIES: Option<usize> = <VecValBatchLayer<K, V, T, R, O>>::CONST_NUM_ENTRIES;
303
304 #[inline]
305 fn num_entries_shallow(&self) -> usize {
306 self.layer.num_entries_shallow()
307 }
308
309 #[inline]
310 fn num_entries_deep(&self) -> usize {
311 self.layer.num_entries_deep()
312 }
313}
314
315impl<K, V, T, R, O> Display for VecValBatch<K, V, T, R, O>
316where
317 K: DataTrait + ?Sized,
318 V: DataTrait + ?Sized,
319 R: WeightTrait + ?Sized,
320 T: Timestamp,
321 O: OrdOffset,
322{
323 fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
324 writeln!(
325 f,
326 "layer:\n{}",
327 textwrap::indent(&self.layer.to_string(), " ")
328 )
329 }
330}
331
332impl<K, V, T, R, O> BatchReader for VecValBatch<K, V, T, R, O>
333where
334 K: DataTrait + ?Sized,
335 V: DataTrait + ?Sized,
336 R: WeightTrait + ?Sized,
337 T: Timestamp,
338 O: OrdOffset,
339{
340 type Key = K;
341 type Val = V;
342 type Time = T;
343 type R = R;
344 type Factories = VecValBatchFactories<K, V, T, R>;
345
346 type Cursor<'s>
347 = VecValCursor<'s, K, V, T, R, O>
348 where
349 O: 's;
350
351 fn factories(&self) -> Self::Factories {
354 self.factories.clone()
355 }
356
357 fn cursor(&self) -> Self::Cursor<'_> {
358 VecValCursor {
359 cursor: self.layer.cursor(),
360 }
361 }
362
363 fn key_count(&self) -> usize {
368 <VecValBatchLayer<K, V, T, R, O> as Trie>::keys(&self.layer)
369 }
370
371 fn len(&self) -> usize {
372 <VecValBatchLayer<K, V, T, R, O> as Trie>::tuples(&self.layer)
373 }
374
375 fn approximate_byte_size(&self) -> usize {
376 self.layer.approximate_byte_size()
377 }
378
379 fn filter_stats(&self) -> BloomFilterStats {
380 BloomFilterStats::default()
381 }
382
383 fn sample_keys<RG>(&self, rng: &mut RG, sample_size: usize, sample: &mut DynVec<Self::Key>)
384 where
385 Self::Time: PartialEq<()>,
386 RG: Rng,
387 {
388 self.layer.sample_keys(rng, sample_size, sample);
389 }
390}
391
impl<K, V, T, R, O> Batch for VecValBatch<K, V, T, R, O>
where
    K: DataTrait + ?Sized,
    V: DataTrait + ?Sized,
    R: WeightTrait + ?Sized,
    T: Timestamp,
    O: OrdOffset,
{
    /// The same batch type with its timestamp parameter replaced by `T2`.
    type Timed<T2: Timestamp> = VecValBatch<K, V, T2, R, O>;
    /// Batcher type: a [`MergeBatcher`] parameterized by this batch type.
    type Batcher = MergeBatcher<Self>;
    /// Builder type that produces this batch (see [`VecValBuilder`]).
    type Builder = VecValBuilder<K, V, T, R, O>;

    /// Not implemented for this batch type.
    ///
    /// # Panics
    ///
    /// Always panics via `unimplemented!()`.
    fn file_reader(&self) -> Option<Arc<dyn FileReader>> {
        unimplemented!()
    }

}
420
/// Cursor over a [`VecValBatch`], borrowing the batch's layer storage for `'s`.
#[derive(Debug, SizeOf)]
pub struct VecValCursor<'s, K, V, T, R, O = usize>
where
    K: DataTrait + ?Sized,
    V: DataTrait + ?Sized,
    R: WeightTrait + ?Sized,
    T: Timestamp,
    O: OrdOffset,
{
    /// Key-level cursor; `cursor.child` is the value-level cursor and
    /// `cursor.child.child` the `(time, weight)` leaf cursor.
    cursor: LayerCursor<'s, K, Layer<V, Leaf<DynDataTyped<T>, R>, O>, O>,
}
433
434impl<K, V, T, R, O> Clone for VecValCursor<'_, K, V, T, R, O>
435where
436 K: DataTrait + ?Sized,
437 V: DataTrait + ?Sized,
438 R: WeightTrait + ?Sized,
439 T: Timestamp,
440 O: OrdOffset,
441{
442 fn clone(&self) -> Self {
443 Self {
444 cursor: self.cursor.clone(),
445 }
446 }
447}
448
449impl<K, V, T, R, O> Cursor<K, V, T, R> for VecValCursor<'_, K, V, T, R, O>
450where
451 K: DataTrait + ?Sized,
452 V: DataTrait + ?Sized,
453 R: WeightTrait + ?Sized,
454 T: Timestamp,
455 O: OrdOffset,
456{
457 fn weight_factory(&self) -> &'static dyn Factory<R> {
458 self.cursor.child.child.storage.factories.diff
459 }
460
461 fn key(&self) -> &K {
462 self.cursor.item()
463 }
464
465 fn val(&self) -> &V {
466 self.cursor.child.item()
467 }
468
469 fn map_times(&mut self, logic: &mut dyn FnMut(&T, &R)) {
470 self.cursor.child.child.rewind();
471 while self.cursor.child.child.valid() {
472 logic(
473 self.cursor.child.child.current_key(),
474 self.cursor.child.child.current_diff(),
475 );
476 self.cursor.child.child.step();
477 }
478 }
479
480 fn map_times_through(&mut self, upper: &T, logic: &mut dyn FnMut(&T, &R)) {
481 self.cursor.child.child.rewind();
482 while self.cursor.child.child.valid() {
483 if self.cursor.child.child.item().0.less_equal(upper) {
484 logic(
485 self.cursor.child.child.current_key(),
486 self.cursor.child.child.current_diff(),
487 );
488 }
489 self.cursor.child.child.step();
490 }
491 }
492
493 fn map_values(&mut self, logic: &mut dyn FnMut(&V, &R))
494 where
495 T: PartialEq<()>,
496 {
497 while self.val_valid() {
498 logic(self.val(), self.cursor.child.child.current_diff());
499 self.step_val();
500 }
501 }
502
503 fn weight(&mut self) -> &R
504 where
505 T: PartialEq<()>,
506 {
507 self.weight_checked()
508 }
509
510 fn weight_checked(&mut self) -> &R {
511 if TypeId::of::<T>() == TypeId::of::<()>() {
512 debug_assert!(&self.cursor.child.child.valid());
513 self.cursor.child.child.current_diff()
514 } else {
515 panic!("VecValCursor::weight_checked called on non-unit timestamp type");
516 }
517 }
518
519 fn key_valid(&self) -> bool {
520 self.cursor.valid()
521 }
522 fn val_valid(&self) -> bool {
523 self.cursor.child.valid()
524 }
525 fn step_key(&mut self) {
526 self.cursor.step();
527 }
528 fn step_key_reverse(&mut self) {
529 self.cursor.step_reverse();
530 }
531 fn seek_key(&mut self, key: &K) {
532 self.cursor.seek(key);
533 }
534
535 fn seek_key_exact(&mut self, key: &K, _hash: Option<u64>) -> bool {
536 self.seek_key(key);
537 self.key_valid() && self.key().eq(key)
538 }
539
540 fn seek_key_with(&mut self, predicate: &dyn Fn(&K) -> bool) {
541 self.cursor.seek_with(predicate);
542 }
543 fn seek_key_with_reverse(&mut self, predicate: &dyn Fn(&K) -> bool) {
544 self.cursor.seek_with_reverse(predicate);
545 }
546 fn seek_key_reverse(&mut self, key: &K) {
547 self.cursor.seek_reverse(key);
548 }
549 fn step_val(&mut self) {
550 self.cursor.child.step();
551 }
552 fn seek_val(&mut self, val: &V) {
553 self.cursor.child.seek(val);
554 }
555 fn seek_val_with(&mut self, predicate: &dyn Fn(&V) -> bool) {
556 self.cursor.child.seek_with(predicate);
557 }
558 fn rewind_keys(&mut self) {
559 self.cursor.rewind();
560 }
561 fn fast_forward_keys(&mut self) {
562 self.cursor.fast_forward();
563 }
564 fn rewind_vals(&mut self) {
565 self.cursor.child.rewind();
566 }
567
568 fn step_val_reverse(&mut self) {
569 self.cursor.child.step_reverse();
570 }
571
572 fn seek_val_reverse(&mut self, val: &V) {
573 self.cursor.child.seek_reverse(val);
574 }
575
576 fn seek_val_with_reverse(&mut self, predicate: &dyn Fn(&V) -> bool) {
577 self.cursor.child.seek_with_reverse(predicate);
578 }
579
580 fn fast_forward_vals(&mut self) {
581 self.cursor.child.fast_forward();
582 }
583
584 fn position(&self) -> Option<Position> {
585 Some(Position {
586 total: TrieCursor::keys(&self.cursor) as u64,
587 offset: self.cursor.pos() as u64,
588 })
589 }
590}
591
/// Builder that accumulates the flat vectors of a [`VecValBatch`] and assembles them
/// into nested layers when finished.
#[derive(SizeOf)]
pub struct VecValBuilder<K, V, T, R, O = usize>
where
    K: DataTrait + ?Sized,
    V: DataTrait + ?Sized,
    R: WeightTrait + ?Sized,
    T: Timestamp,
    O: OrdOffset,
{
    /// Factories used to create the vectors and the final layers; skipped by `SizeOf`.
    #[size_of(skip)]
    factories: VecValBatchFactories<K, V, T, R>,
    /// Keys pushed so far.
    keys: Box<DynVec<K>>,
    /// Key offsets: each entry is the length of `vals` recorded when a key was pushed.
    offs: Vec<O>,
    /// Values pushed so far.
    vals: Box<DynVec<V>>,
    /// Value offsets: each entry is the length of `times` recorded when a value was pushed.
    val_offs: Vec<O>,
    /// Timestamps pushed so far; parallel to `diffs`.
    times: Box<DynVec<DynDataTyped<T>>>,
    /// Weights pushed so far; parallel to `times`.
    diffs: Box<DynVec<R>>,
}
611
612impl<K, V, T, R, O> VecValBuilder<K, V, T, R, O>
613where
614 K: DataTrait + ?Sized,
615 V: DataTrait + ?Sized,
616 R: WeightTrait + ?Sized,
617 T: Timestamp,
618 O: OrdOffset,
619{
620 fn pushed_key(&mut self) {
621 let off = O::from_usize(self.vals.len());
622 debug_assert!(off > *self.offs.last().unwrap());
623 self.offs.push(off);
624 }
625
626 fn pushed_val(&mut self) {
627 let val_off = O::from_usize(self.times.len());
628 debug_assert!(val_off > *self.val_offs.last().unwrap());
629 self.val_offs.push(val_off);
630 }
631}
632
633impl<K, V, T, R, O> Builder<VecValBatch<K, V, T, R, O>> for VecValBuilder<K, V, T, R, O>
634where
635 Self: SizeOf,
636 K: DataTrait + ?Sized,
637 V: DataTrait + ?Sized,
638 R: WeightTrait + ?Sized,
639 T: Timestamp,
640 O: OrdOffset,
641{
642 fn with_capacity(
643 factories: &VecValBatchFactories<K, V, T, R>,
644 key_capacity: usize,
645 value_capacity: usize,
646 ) -> Self {
647 let mut keys = factories.layer_factories.keys.default_box();
648 keys.reserve_exact(key_capacity);
649
650 let mut offs = Vec::with_capacity(key_capacity + 1);
651 offs.push(O::zero());
652
653 let mut vals = factories.layer_factories.child.keys.default_box();
654 vals.reserve_exact(value_capacity);
655
656 let mut val_offs = Vec::with_capacity(value_capacity + 1);
657 val_offs.push(O::zero());
658
659 let mut times = factories.layer_factories.child.child.keys.default_box();
660 times.reserve_exact(value_capacity);
661
662 let mut diffs = factories.layer_factories.child.child.diffs.default_box();
663 diffs.reserve_exact(value_capacity);
664 Self {
665 factories: factories.clone(),
666 keys,
667 offs,
668 vals,
669 val_offs,
670 times,
671 diffs,
672 }
673 }
674
675 fn reserve(&mut self, additional: usize) {
676 self.keys.reserve(additional);
677 self.offs.reserve(additional);
678 self.vals.reserve(additional);
679 self.times.reserve(additional);
680 self.diffs.reserve(additional);
681 }
682
683 fn push_key(&mut self, key: &K) {
684 self.keys.push_ref(key);
685 self.pushed_key();
686 }
687
688 fn push_key_mut(&mut self, key: &mut K) {
689 self.keys.push_val(key);
690 self.pushed_key();
691 }
692
693 fn push_val(&mut self, val: &V) {
694 self.vals.push_ref(val);
695 self.pushed_val();
696 }
697
698 fn push_val_mut(&mut self, val: &mut V) {
699 self.vals.push_val(val);
700 self.pushed_val();
701 }
702
703 fn push_time_diff(&mut self, time: &T, weight: &R) {
704 debug_assert!(!weight.is_zero());
705 self.times.push(time.clone());
706 self.diffs.push_ref(weight);
707 }
708
709 fn push_time_diff_mut(&mut self, time: &mut T, weight: &mut R) {
710 debug_assert!(!weight.is_zero());
711 self.times.push(time.clone());
712 self.diffs.push_val(weight);
713 }
714
715 fn done(self) -> VecValBatch<K, V, T, R, O> {
716 VecValBatch {
717 layer: Layer::from_parts(
718 &self.factories.layer_factories,
719 self.keys,
720 self.offs,
721 Layer::from_parts(
722 &self.factories.layer_factories.child,
723 self.vals,
724 self.val_offs,
725 Leaf::from_parts(
726 &self.factories.layer_factories.child.child,
727 self.times,
728 self.diffs,
729 ),
730 ),
731 ),
732 factories: self.factories,
733 }
734 }
735
736 fn num_keys(&self) -> usize {
737 self.keys.len()
738 }
739
740 fn num_tuples(&self) -> usize {
741 self.diffs.len()
742 }
743}
744
745