// dbsp/trace/ord/vec/key_batch.rs

1use crate::storage::tracking_bloom_filter::BloomFilterStats;
2use crate::trace::ord::merge_batcher::MergeBatcher;
3use crate::{
4    DBData, DBWeight, NumEntries, Timestamp,
5    dynamic::{
6        DataTrait, DynDataTyped, DynPair, DynUnit, DynVec, DynWeightedPairs, Erase, Factory,
7        LeanVec, WeightTrait, WithFactory,
8    },
9    trace::{
10        Batch, BatchFactories, BatchReader, BatchReaderFactories, Builder, Cursor, Deserializer,
11        Serializer, WeightedItem,
12        cursor::Position,
13        layers::{
14            Cursor as TrieCursor, Layer, LayerCursor, LayerFactories, Leaf, LeafFactories,
15            OrdOffset, Trie,
16        },
17    },
18    utils::{ConsolidatePairedSlices, Tup2},
19};
20use feldera_storage::FileReader;
21use rand::Rng;
22use rkyv::{Archive, Deserialize, Serialize};
23use size_of::SizeOf;
24use std::any::TypeId;
25use std::{
26    fmt::{self, Debug, Display},
27    sync::Arc,
28};
29
30pub struct VecKeyBatchFactories<K, T, R>
31where
32    K: DataTrait + ?Sized,
33    T: Timestamp,
34    R: WeightTrait + ?Sized,
35{
36    layer_factories: LayerFactories<K, LeafFactories<DynDataTyped<T>, R>>,
37    consolidate_weights: &'static dyn ConsolidatePairedSlices<DynDataTyped<T>, R>,
38    item_factory: &'static dyn Factory<DynPair<K, DynUnit>>,
39    weighted_item_factory: &'static dyn Factory<WeightedItem<K, DynUnit, R>>,
40    weighted_items_factory: &'static dyn Factory<DynWeightedPairs<DynPair<K, DynUnit>, R>>,
41    weighted_vals_factory: &'static dyn Factory<DynWeightedPairs<DynUnit, R>>,
42    time_diffs_factory: &'static dyn Factory<DynWeightedPairs<DynDataTyped<T>, R>>,
43}
44
45unsafe impl<K, T, R> Send for VecKeyBatchFactories<K, T, R>
46where
47    K: DataTrait + ?Sized,
48    T: Timestamp,
49    R: WeightTrait + ?Sized,
50{
51}
52
53impl<K, T, R> Clone for VecKeyBatchFactories<K, T, R>
54where
55    K: DataTrait + ?Sized,
56    T: Timestamp,
57    R: WeightTrait + ?Sized,
58{
59    fn clone(&self) -> Self {
60        Self {
61            layer_factories: self.layer_factories.clone(),
62            consolidate_weights: self.consolidate_weights,
63            item_factory: self.item_factory,
64            weighted_item_factory: self.weighted_item_factory,
65            weighted_items_factory: self.weighted_items_factory,
66            weighted_vals_factory: self.weighted_vals_factory,
67            time_diffs_factory: self.time_diffs_factory,
68        }
69    }
70}
71
72impl<K, T, R> BatchReaderFactories<K, DynUnit, T, R> for VecKeyBatchFactories<K, T, R>
73where
74    K: DataTrait + ?Sized,
75    T: Timestamp,
76    R: WeightTrait + ?Sized,
77{
78    fn new<KType, VType, RType>() -> Self
79    where
80        KType: DBData + Erase<K>,
81        VType: DBData + Erase<DynUnit>,
82        RType: DBWeight + Erase<R>,
83    {
84        Self {
85            layer_factories: LayerFactories::new::<KType>(
86                <LeafFactories<DynDataTyped<T>, R>>::new::<T, RType>(),
87            ),
88            consolidate_weights: <dyn ConsolidatePairedSlices<_, _>>::factory::<T, RType>(),
89            item_factory: WithFactory::<Tup2<KType, ()>>::FACTORY,
90            weighted_item_factory: WithFactory::<Tup2<Tup2<KType, ()>, RType>>::FACTORY,
91            weighted_items_factory: WithFactory::<LeanVec<Tup2<Tup2<KType, ()>, RType>>>::FACTORY,
92            weighted_vals_factory: WithFactory::<LeanVec<Tup2<(), RType>>>::FACTORY,
93            time_diffs_factory: WithFactory::<LeanVec<Tup2<T, RType>>>::FACTORY,
94        }
95    }
96
97    fn key_factory(&self) -> &'static dyn Factory<K> {
98        self.layer_factories.key
99    }
100
101    fn keys_factory(&self) -> &'static dyn Factory<DynVec<K>> {
102        self.layer_factories.keys
103    }
104
105    fn val_factory(&self) -> &'static dyn Factory<DynUnit> {
106        WithFactory::<()>::FACTORY
107    }
108
109    fn weight_factory(&self) -> &'static dyn Factory<R> {
110        self.layer_factories.child.diff
111    }
112}
113
114impl<K, R, T> BatchFactories<K, DynUnit, T, R> for VecKeyBatchFactories<K, T, R>
115where
116    K: DataTrait + ?Sized,
117    T: Timestamp,
118    R: WeightTrait + ?Sized,
119{
120    // type BatchItemFactory = BatchItemFactory<K, (), K, R>;
121
122    fn item_factory(&self) -> &'static dyn Factory<DynPair<K, DynUnit>> {
123        self.item_factory
124    }
125
126    fn weighted_item_factory(&self) -> &'static dyn Factory<WeightedItem<K, DynUnit, R>> {
127        self.weighted_item_factory
128    }
129
130    fn weighted_items_factory(
131        &self,
132    ) -> &'static dyn Factory<DynWeightedPairs<DynPair<K, DynUnit>, R>> {
133        self.weighted_items_factory
134    }
135
136    fn weighted_vals_factory(&self) -> &'static dyn Factory<DynWeightedPairs<DynUnit, R>> {
137        self.weighted_vals_factory
138    }
139
140    fn time_diffs_factory(
141        &self,
142    ) -> Option<&'static dyn Factory<DynWeightedPairs<DynDataTyped<T>, R>>> {
143        Some(self.time_diffs_factory)
144    }
145}
146
147pub type VecKeyBatchLayer<K, T, R, O> = Layer<K, Leaf<DynDataTyped<T>, R>, O>;
148
149/// An immutable collection of update tuples, from a contiguous interval of
150/// logical times.
151#[derive(SizeOf)]
152pub struct VecKeyBatch<K, T, R, O = usize>
153where
154    K: DataTrait + ?Sized,
155    T: Timestamp,
156    R: WeightTrait + ?Sized,
157    O: OrdOffset,
158{
159    /// Where all the dataz is.
160    pub layer: VecKeyBatchLayer<K, T, R, O>,
161    #[size_of(skip)]
162    factories: VecKeyBatchFactories<K, T, R>,
163}
164
165impl<K, T, R, O> Debug for VecKeyBatch<K, T, R, O>
166where
167    K: DataTrait + ?Sized,
168    R: WeightTrait + ?Sized,
169    T: Timestamp,
170    O: OrdOffset,
171{
172    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
173        f.debug_struct("VecKeyBatch")
174            .field("layer", &self.layer)
175            .finish()
176    }
177}
178
179impl<K, T, R, O: OrdOffset> Deserialize<VecKeyBatch<K, T, R, O>, Deserializer> for ()
180where
181    K: DataTrait + ?Sized,
182    T: Timestamp,
183    R: WeightTrait + ?Sized,
184    O: OrdOffset,
185{
186    fn deserialize(
187        &self,
188        _deserializer: &mut Deserializer,
189    ) -> Result<VecKeyBatch<K, T, R, O>, <Deserializer as rkyv::Fallible>::Error> {
190        todo!()
191    }
192}
193
194impl<K, T, R, O> Archive for VecKeyBatch<K, T, R, O>
195where
196    K: DataTrait + ?Sized,
197    T: Timestamp,
198    R: WeightTrait + ?Sized,
199    O: OrdOffset,
200{
201    type Archived = ();
202    type Resolver = ();
203
204    unsafe fn resolve(&self, _pos: usize, _resolver: Self::Resolver, _out: *mut Self::Archived) {
205        todo!()
206    }
207}
208impl<K, T, R, O: OrdOffset> Serialize<Serializer> for VecKeyBatch<K, T, R, O>
209where
210    K: DataTrait + ?Sized,
211    T: Timestamp,
212    R: WeightTrait + ?Sized,
213    O: OrdOffset,
214{
215    fn serialize(
216        &self,
217        _serializer: &mut Serializer,
218    ) -> Result<Self::Resolver, <Serializer as rkyv::Fallible>::Error> {
219        todo!()
220    }
221}
222
223impl<K, T, R, O> Clone for VecKeyBatch<K, T, R, O>
224where
225    K: DataTrait + ?Sized,
226    T: Timestamp,
227    R: WeightTrait + ?Sized,
228    O: OrdOffset,
229{
230    fn clone(&self) -> Self {
231        Self {
232            layer: self.layer.clone(),
233            factories: self.factories.clone(),
234        }
235    }
236}
237
238impl<K, T, R, O> Display for VecKeyBatch<K, T, R, O>
239where
240    K: DataTrait + ?Sized,
241    T: Timestamp,
242    R: WeightTrait + ?Sized,
243    O: OrdOffset,
244{
245    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
246        writeln!(
247            f,
248            "layer:\n{}",
249            textwrap::indent(&self.layer.to_string(), "    ")
250        )
251    }
252}
253
254impl<K, T, R, O> NumEntries for VecKeyBatch<K, T, R, O>
255where
256    K: DataTrait + ?Sized,
257    T: Timestamp,
258    R: WeightTrait + ?Sized,
259    O: OrdOffset,
260{
261    const CONST_NUM_ENTRIES: Option<usize> = <VecKeyBatchLayer<K, T, R, O>>::CONST_NUM_ENTRIES;
262
263    #[inline]
264    fn num_entries_shallow(&self) -> usize {
265        self.layer.num_entries_shallow()
266    }
267
268    #[inline]
269    fn num_entries_deep(&self) -> usize {
270        self.layer.num_entries_deep()
271    }
272}
273
274impl<K, T, R, O> BatchReader for VecKeyBatch<K, T, R, O>
275where
276    K: DataTrait + ?Sized,
277    T: Timestamp,
278    R: WeightTrait + ?Sized,
279    O: OrdOffset,
280{
281    type Key = K;
282    type Val = DynUnit;
283    type Time = T;
284    type R = R;
285    type Cursor<'s>
286        = ValKeyCursor<'s, K, T, R, O>
287    where
288        O: 's;
289    type Factories = VecKeyBatchFactories<K, T, R>;
290    // type Consumer = VecKeyConsumer<K, T, R, O>;
291
292    fn factories(&self) -> Self::Factories {
293        self.factories.clone()
294    }
295
296    fn cursor(&self) -> Self::Cursor<'_> {
297        ValKeyCursor {
298            valid: true,
299            cursor: self.layer.cursor(),
300        }
301    }
302
303    /*fn consumer(self) -> Self::Consumer {
304        todo!()
305    }*/
306
307    fn key_count(&self) -> usize {
308        <VecKeyBatchLayer<K, T, R, O> as Trie>::keys(&self.layer)
309    }
310
311    fn len(&self) -> usize {
312        <VecKeyBatchLayer<K, T, R, O> as Trie>::tuples(&self.layer)
313    }
314
315    fn approximate_byte_size(&self) -> usize {
316        self.layer.approximate_byte_size()
317    }
318
319    fn filter_stats(&self) -> BloomFilterStats {
320        BloomFilterStats::default()
321    }
322
323    fn sample_keys<RG>(&self, rng: &mut RG, sample_size: usize, sample: &mut DynVec<Self::Key>)
324    where
325        Self::Time: PartialEq<()>,
326        RG: Rng,
327    {
328        self.layer.sample_keys(rng, sample_size, sample);
329    }
330}
331
332impl<K, T, R, O> Batch for VecKeyBatch<K, T, R, O>
333where
334    K: DataTrait + ?Sized,
335    T: Timestamp,
336    R: WeightTrait + ?Sized,
337    O: OrdOffset,
338{
339    type Timed<T2: Timestamp> = VecKeyBatch<K, T2, R, O>;
340    type Batcher = MergeBatcher<Self>;
341    type Builder = VecKeyBuilder<K, T, R, O>;
342    fn file_reader(&self) -> Option<Arc<dyn FileReader>> {
343        unimplemented!()
344    }
345
346    /*fn from_keys(time: Self::Time, keys: Vec<(Self::Key, Self::R)>) -> Self {
347        Self::from_tuples(time, keys)
348    }*/
349}
350
351/// A cursor for navigating a single layer.
352#[derive(Debug, SizeOf)]
353pub struct ValKeyCursor<'s, K, T, R, O = usize>
354where
355    K: DataTrait + ?Sized,
356    T: Timestamp,
357    R: WeightTrait + ?Sized,
358    O: OrdOffset,
359{
360    valid: bool,
361    cursor: LayerCursor<'s, K, Leaf<DynDataTyped<T>, R>, O>,
362}
363
364impl<K, T, R, O> Clone for ValKeyCursor<'_, K, T, R, O>
365where
366    K: DataTrait + ?Sized,
367    T: Timestamp,
368    R: WeightTrait + ?Sized,
369    O: OrdOffset,
370{
371    fn clone(&self) -> Self {
372        Self {
373            valid: self.valid,
374            cursor: self.cursor.clone(),
375        }
376    }
377}
378
379impl<K, T, R, O> Cursor<K, DynUnit, T, R> for ValKeyCursor<'_, K, T, R, O>
380where
381    K: DataTrait + ?Sized,
382    T: Timestamp,
383    R: WeightTrait + ?Sized,
384    O: OrdOffset,
385{
386    // fn key_factory(&self) -> &'static Factory<K> {
387    //     self.cursor.storage.factories.key
388    // }
389
390    // fn val_factory(&self) -> &'static Factory<()> {
391    //     todo!()
392    // }
393
394    fn weight_factory(&self) -> &'static dyn Factory<R> {
395        self.cursor.child.storage.factories.diff
396    }
397
398    fn key(&self) -> &K {
399        self.cursor.item()
400    }
401
402    fn val(&self) -> &DynUnit {
403        &()
404    }
405
406    fn map_times(&mut self, logic: &mut dyn FnMut(&T, &R)) {
407        self.cursor.child.rewind();
408        while self.cursor.child.valid() {
409            logic(
410                self.cursor.child.current_key(),
411                self.cursor.child.current_diff(),
412            );
413            self.cursor.child.step();
414        }
415    }
416
417    fn map_times_through(&mut self, upper: &T, logic: &mut dyn FnMut(&T, &R)) {
418        self.cursor.child.rewind();
419        while self.cursor.child.valid() {
420            if self.cursor.child.item().0.less_equal(upper) {
421                logic(
422                    self.cursor.child.current_key(),
423                    self.cursor.child.current_diff(),
424                );
425            }
426            self.cursor.child.step();
427        }
428    }
429
430    fn weight(&mut self) -> &R
431    where
432        T: PartialEq<()>,
433    {
434        self.weight_checked()
435    }
436
437    fn weight_checked(&mut self) -> &R {
438        if TypeId::of::<T>() == TypeId::of::<()>() {
439            debug_assert!(&self.cursor.child.valid());
440            self.cursor.child.current_diff()
441        } else {
442            panic!("VecKeyCursor::weight_checked called on non-unit timestamp type");
443        }
444    }
445
446    fn map_values(&mut self, logic: &mut dyn FnMut(&DynUnit, &R))
447    where
448        T: PartialEq<()>,
449    {
450        if self.val_valid() {
451            logic(self.val(), self.cursor.child.current_diff());
452        }
453    }
454
455    fn key_valid(&self) -> bool {
456        self.cursor.valid()
457    }
458
459    fn val_valid(&self) -> bool {
460        self.valid
461    }
462
463    fn step_key(&mut self) {
464        self.cursor.step();
465        self.valid = true;
466    }
467
468    fn step_key_reverse(&mut self) {
469        self.cursor.step_reverse();
470        self.valid = true;
471    }
472
473    fn seek_key(&mut self, key: &K) {
474        self.cursor.seek(key);
475        self.valid = true;
476    }
477
478    fn seek_key_exact(&mut self, key: &K, _hash: Option<u64>) -> bool {
479        self.seek_key(key);
480        self.key_valid() && self.key().eq(key)
481    }
482
483    fn seek_key_with(&mut self, predicate: &dyn Fn(&K) -> bool) {
484        self.cursor.seek_with(predicate);
485        self.valid = true;
486    }
487
488    fn seek_key_with_reverse(&mut self, predicate: &dyn Fn(&K) -> bool) {
489        self.cursor.seek_with_reverse(predicate);
490        self.valid = true;
491    }
492
493    fn seek_key_reverse(&mut self, key: &K) {
494        self.cursor.seek_reverse(key);
495        self.valid = true;
496    }
497
498    fn step_val(&mut self) {
499        self.valid = false;
500    }
501
502    fn seek_val(&mut self, _val: &DynUnit) {}
503
504    fn seek_val_with(&mut self, predicate: &dyn Fn(&DynUnit) -> bool) {
505        if !predicate(&()) {
506            self.valid = false;
507        }
508    }
509
510    fn rewind_keys(&mut self) {
511        self.cursor.rewind();
512        self.valid = true;
513    }
514
515    fn fast_forward_keys(&mut self) {
516        self.cursor.fast_forward();
517        self.valid = true;
518    }
519
520    fn rewind_vals(&mut self) {
521        self.valid = true;
522    }
523
524    fn step_val_reverse(&mut self) {
525        self.valid = false;
526    }
527
528    fn seek_val_reverse(&mut self, _val: &DynUnit) {}
529
530    fn seek_val_with_reverse(&mut self, predicate: &dyn Fn(&DynUnit) -> bool) {
531        if !predicate(&()) {
532            self.valid = false;
533        }
534    }
535
536    fn fast_forward_vals(&mut self) {
537        self.valid = true;
538    }
539
540    fn position(&self) -> Option<Position> {
541        Some(Position {
542            total: TrieCursor::keys(&self.cursor) as u64,
543            offset: self.cursor.pos() as u64,
544        })
545    }
546}
547
548/// A builder for creating layers from unsorted update tuples.
549#[derive(SizeOf)]
550pub struct VecKeyBuilder<K, T, R, O = usize>
551where
552    K: DataTrait + ?Sized,
553    T: Timestamp,
554    R: WeightTrait + ?Sized,
555    O: OrdOffset,
556{
557    #[size_of(skip)]
558    factories: VecKeyBatchFactories<K, T, R>,
559    keys: Box<DynVec<K>>,
560    offs: Vec<O>,
561    times: Box<DynVec<DynDataTyped<T>>>,
562    diffs: Box<DynVec<R>>,
563}
564
565impl<K, T, R, O> VecKeyBuilder<K, T, R, O>
566where
567    K: DataTrait + ?Sized,
568    T: Timestamp,
569    R: WeightTrait + ?Sized,
570    O: OrdOffset,
571{
572    fn pushed_key(&mut self) {
573        let off = O::from_usize(self.times.len());
574        debug_assert!(off > *self.offs.last().unwrap());
575        self.offs.push(off);
576    }
577}
578
579impl<K, T, R, O> Builder<VecKeyBatch<K, T, R, O>> for VecKeyBuilder<K, T, R, O>
580where
581    K: DataTrait + ?Sized,
582    T: Timestamp,
583    R: WeightTrait + ?Sized,
584    O: OrdOffset,
585{
586    fn with_capacity(
587        factories: &VecKeyBatchFactories<K, T, R>,
588        key_capacity: usize,
589        value_capacity: usize,
590    ) -> Self {
591        let mut keys = factories.layer_factories.keys.default_box();
592        keys.reserve_exact(key_capacity);
593
594        let mut offs = Vec::with_capacity(key_capacity + 1);
595        offs.push(O::zero());
596
597        let mut times = factories.layer_factories.child.keys.default_box();
598        times.reserve_exact(value_capacity);
599
600        let mut diffs = factories.layer_factories.child.diffs.default_box();
601        diffs.reserve_exact(value_capacity);
602        Self {
603            factories: factories.clone(),
604            keys,
605            offs,
606            times,
607            diffs,
608        }
609    }
610
611    fn reserve(&mut self, additional: usize) {
612        self.keys.reserve(additional);
613        self.offs.reserve(additional);
614        self.times.reserve(additional);
615        self.diffs.reserve(additional);
616    }
617
618    fn push_key(&mut self, key: &K) {
619        self.keys.push_ref(key);
620        self.pushed_key();
621    }
622
623    fn push_key_mut(&mut self, key: &mut K) {
624        self.keys.push_val(key);
625        self.pushed_key();
626    }
627
628    fn push_val(&mut self, _val: &DynUnit) {}
629
630    fn push_time_diff(&mut self, time: &T, weight: &R) {
631        debug_assert!(!weight.is_zero());
632        self.times.push(time.clone());
633        self.diffs.push_ref(weight);
634    }
635
636    fn push_time_diff_mut(&mut self, time: &mut T, weight: &mut R) {
637        debug_assert!(!weight.is_zero());
638        self.times.push(time.clone());
639        self.diffs.push_val(weight);
640    }
641
642    fn done(self) -> VecKeyBatch<K, T, R, O> {
643        VecKeyBatch {
644            layer: Layer::from_parts(
645                &self.factories.layer_factories,
646                self.keys,
647                self.offs,
648                Leaf::from_parts(
649                    &self.factories.layer_factories.child,
650                    self.times,
651                    self.diffs,
652                ),
653            ),
654            factories: self.factories,
655        }
656    }
657
658    fn num_keys(&self) -> usize {
659        self.keys.len()
660    }
661
662    fn num_tuples(&self) -> usize {
663        self.diffs.len()
664    }
665}
666
667/*pub struct VecKeyConsumer<K, T, R, O>
668where
669    K: 'static,
670    T: 'static,
671    R: 'static,
672    O: OrdOffset,
673{
674    consumer: OrderedLayerConsumer<K, T, R, O>,
675}
676
677impl<K, T, R, O> Consumer<K, (), R, T> for VecKeyConsumer<K, T, R, O>
678where
679    O: OrdOffset,
680{
681    type ValueConsumer<'a> = VecKeyValueConsumer<'a, K, T, R, O>
682    where
683        Self: 'a;
684
685    fn key_valid(&self) -> bool {
686        self.consumer.key_valid()
687    }
688
689    fn peek_key(&self) -> &K {
690        self.consumer.peek_key()
691    }
692
693    fn next_key(&mut self) -> (K, Self::ValueConsumer<'_>) {
694        let (key, values) = self.consumer.next_key();
695        (key, VecKeyValueConsumer::new(values))
696    }
697
698    fn seek_key(&mut self, key: &K)
699    where
700        K: Ord,
701    {
702        self.consumer.seek_key(key);
703    }
704}
705
706pub struct VecKeyValueConsumer<'a, K, T, R, O>
707where
708    T: 'static,
709    R: 'static,
710{
711    consumer: OrderedLayerValues<'a, T, R>,
712    __type: PhantomData<(K, O)>,
713}
714
715impl<'a, K, T, R, O> VecKeyValueConsumer<'a, K, T, R, O> {
716    const fn new(consumer: OrderedLayerValues<'a, T, R>) -> Self {
717        Self {
718            consumer,
719            __type: PhantomData,
720        }
721    }
722}
723
724impl<'a, K, T, R, O> ValueConsumer<'a, (), R, T> for VecKeyValueConsumer<'a, K, T, R, O> {
725    fn value_valid(&self) -> bool {
726        self.consumer.value_valid()
727    }
728
729    fn next_value(&mut self) -> ((), R, T) {
730        let (time, diff, ()) = self.consumer.next_value();
731        ((), diff, time)
732    }
733
734    fn remaining_values(&self) -> usize {
735        self.consumer.remaining_values()
736    }
737}
738*/