Skip to main content

dbsp/trace/ord/file/
key_batch.rs

1use crate::storage::tracking_bloom_filter::BloomFilterStats;
2use crate::trace::cursor::Position;
3use crate::{
4    DBData, DBWeight, NumEntries, Runtime, Timestamp,
5    dynamic::{
6        DataTrait, DynDataTyped, DynOpt, DynPair, DynUnit, DynVec, DynWeightedPairs, Erase,
7        Factory, LeanVec, WeightTrait, WithFactory,
8    },
9    storage::{
10        buffer_cache::CacheStats,
11        file::{
12            Factories as FileFactories,
13            reader::{Cursor as FileCursor, Error as ReaderError, Reader},
14            writer::Writer2,
15        },
16    },
17    trace::{
18        Batch, BatchFactories, BatchLocation, BatchReader, BatchReaderFactories, Builder, Cursor,
19        WeightedItem,
20        ord::{file::UnwrapStorage, merge_batcher::MergeBatcher},
21    },
22    utils::Tup2,
23};
24use feldera_storage::{FileReader, StoragePath};
25use rand::{Rng, seq::index::sample};
26use rkyv::{Archive, Archived, Deserialize, Fallible, Serialize, ser::Serializer};
27use size_of::SizeOf;
28use std::any::TypeId;
29use std::{
30    fmt::{self, Debug},
31    sync::Arc,
32};
33
/// Bundle of factories needed to create and read a [`FileKeyBatch`].
///
/// Holds `&'static` factory objects for the dynamically typed keys, weights,
/// and the containers built from them, plus the per-column file factories
/// handed to the storage reader and writer.
pub struct FileKeyBatchFactories<K, T, R>
where
    K: DataTrait + ?Sized,
    T: Timestamp,
    R: WeightTrait + ?Sized,
{
    /// Factory for individual keys.
    key_factory: &'static dyn Factory<K>,
    /// Factory for individual weights.
    weight_factory: &'static dyn Factory<R>,
    /// Factory for vectors of weights.
    weights_factory: &'static dyn Factory<DynVec<R>>,
    /// Factory for vectors of keys.
    keys_factory: &'static dyn Factory<DynVec<K>>,
    /// Factory for `(key, ())` items.
    item_factory: &'static dyn Factory<DynPair<K, DynUnit>>,
    /// File factories for column 0, which stores keys with unit auxiliary data.
    factories0: FileFactories<K, DynUnit>,
    /// File factories for column 1, which stores times with their weights.
    factories1: FileFactories<DynDataTyped<T>, R>,
    /// Factory for optional keys.
    opt_key_factory: &'static dyn Factory<DynOpt<K>>,
    /// Factory for `((key, ()), weight)` items.
    weighted_item_factory: &'static dyn Factory<WeightedItem<K, DynUnit, R>>,
    /// Factory for vectors of `((key, ()), weight)` items.
    weighted_items_factory: &'static dyn Factory<DynWeightedPairs<DynPair<K, DynUnit>, R>>,
    /// Factory for vectors of `((), weight)` pairs.
    weighted_vals_factory: &'static dyn Factory<DynWeightedPairs<DynUnit, R>>,
    /// Factory for vectors of `(time, weight)` pairs.
    pub timediff_factory: &'static dyn Factory<DynWeightedPairs<DynDataTyped<T>, R>>,
}
53
54impl<K, T, R> Clone for FileKeyBatchFactories<K, T, R>
55where
56    K: DataTrait + ?Sized,
57    T: Timestamp,
58    R: WeightTrait + ?Sized,
59{
60    fn clone(&self) -> Self {
61        Self {
62            key_factory: self.key_factory,
63            weight_factory: self.weight_factory,
64            weights_factory: self.weights_factory,
65            keys_factory: self.keys_factory,
66            item_factory: self.item_factory,
67            factories0: self.factories0.clone(),
68            factories1: self.factories1.clone(),
69            opt_key_factory: self.opt_key_factory,
70            weighted_item_factory: self.weighted_item_factory,
71            weighted_items_factory: self.weighted_items_factory,
72            weighted_vals_factory: self.weighted_vals_factory,
73            timediff_factory: self.timediff_factory,
74        }
75    }
76}
77
78impl<K, T, R> BatchReaderFactories<K, DynUnit, T, R> for FileKeyBatchFactories<K, T, R>
79where
80    K: DataTrait + ?Sized,
81    T: Timestamp,
82    R: WeightTrait + ?Sized,
83{
84    fn new<KType, VType, RType>() -> Self
85    where
86        KType: DBData + Erase<K>,
87        VType: DBData + Erase<DynUnit>,
88        RType: DBWeight + Erase<R>,
89    {
90        Self {
91            key_factory: WithFactory::<KType>::FACTORY,
92            weight_factory: WithFactory::<RType>::FACTORY,
93            weights_factory: WithFactory::<LeanVec<RType>>::FACTORY,
94            keys_factory: WithFactory::<LeanVec<KType>>::FACTORY,
95            item_factory: WithFactory::<Tup2<KType, ()>>::FACTORY,
96            factories0: FileFactories::new::<KType, ()>(),
97            factories1: FileFactories::new::<T, RType>(),
98            opt_key_factory: WithFactory::<Option<KType>>::FACTORY,
99            weighted_item_factory: WithFactory::<Tup2<Tup2<KType, ()>, RType>>::FACTORY,
100            weighted_items_factory: WithFactory::<LeanVec<Tup2<Tup2<KType, ()>, RType>>>::FACTORY,
101            weighted_vals_factory: WithFactory::<LeanVec<Tup2<(), RType>>>::FACTORY,
102            timediff_factory: WithFactory::<LeanVec<Tup2<T, RType>>>::FACTORY,
103        }
104    }
105
106    fn key_factory(&self) -> &'static dyn Factory<K> {
107        self.key_factory
108    }
109
110    fn keys_factory(&self) -> &'static dyn Factory<DynVec<K>> {
111        self.keys_factory
112    }
113
114    fn val_factory(&self) -> &'static dyn Factory<DynUnit> {
115        WithFactory::<()>::FACTORY
116    }
117
118    fn weight_factory(&self) -> &'static dyn Factory<R> {
119        self.weight_factory
120    }
121}
122
123impl<K, T, R> BatchFactories<K, DynUnit, T, R> for FileKeyBatchFactories<K, T, R>
124where
125    K: DataTrait + ?Sized,
126    T: Timestamp,
127    R: WeightTrait + ?Sized,
128{
129    fn item_factory(&self) -> &'static dyn Factory<DynPair<K, DynUnit>> {
130        self.item_factory
131    }
132
133    fn weighted_item_factory(&self) -> &'static dyn Factory<WeightedItem<K, DynUnit, R>> {
134        self.weighted_item_factory
135    }
136
137    fn weighted_items_factory(
138        &self,
139    ) -> &'static dyn Factory<DynWeightedPairs<DynPair<K, DynUnit>, R>> {
140        self.weighted_items_factory
141    }
142
143    fn weighted_vals_factory(&self) -> &'static dyn Factory<DynWeightedPairs<DynUnit, R>> {
144        self.weighted_vals_factory
145    }
146
147    fn time_diffs_factory(
148        &self,
149    ) -> Option<&'static dyn Factory<DynWeightedPairs<DynDataTyped<T>, R>>> {
150        Some(self.timediff_factory)
151    }
152}
153
/// A batch of keys with weights and times.
///
/// Each tuple in `FileKeyBatch<K, T, R>` has key type `K`, value type `()`,
/// weight type `R`, and time type `T`.
///
/// The data is accessed through a file [`Reader`] with two columns: column 0
/// holds the keys and column 1 holds the `(time, weight)` entries of each key.
#[derive(SizeOf)]
pub struct FileKeyBatch<K, T, R>
where
    K: DataTrait + ?Sized,
    T: Timestamp,
    R: WeightTrait + ?Sized,
{
    /// Factories used to create keys, weights, and cursors for this batch.
    #[size_of(skip)]
    factories: FileKeyBatchFactories<K, T, R>,
    /// Shared handle to the reader for the batch's backing file.
    #[allow(clippy::type_complexity)]
    file: Arc<
        Reader<(
            &'static K,
            &'static DynUnit,
            (&'static DynDataTyped<T>, &'static R, ()),
        )>,
    >,
}
176
177impl<K, T, R> Debug for FileKeyBatch<K, T, R>
178where
179    K: DataTrait + ?Sized,
180    T: Timestamp,
181    R: WeightTrait + ?Sized,
182{
183    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
184        write!(f, "FileKeyBatch {{ data: ")?;
185        let mut cursor = self.cursor();
186        let mut n_keys = 0;
187        while cursor.key_valid() {
188            if n_keys > 0 {
189                write!(f, ", ")?;
190            }
191            write!(f, "{:?}(", cursor.key())?;
192            let mut n_values = 0;
193            cursor.map_times(&mut |time, diff| {
194                if n_values > 0 {
195                    let _ = write!(f, ", ");
196                }
197                let _ = write!(f, "({time:?}, {diff:+?})");
198                n_values += 1;
199            });
200            write!(f, ")")?;
201            n_keys += 1;
202            cursor.step_key();
203        }
204        write!(f, " }}")
205    }
206}
207
208impl<K, T, R> Clone for FileKeyBatch<K, T, R>
209where
210    K: DataTrait + ?Sized,
211    T: Timestamp,
212    R: WeightTrait + ?Sized,
213{
214    fn clone(&self) -> Self {
215        Self {
216            factories: self.factories.clone(),
217            file: self.file.clone(),
218        }
219    }
220}
221
222impl<K, T, R> NumEntries for FileKeyBatch<K, T, R>
223where
224    K: DataTrait + ?Sized,
225    T: Timestamp,
226    R: WeightTrait + ?Sized,
227{
228    const CONST_NUM_ENTRIES: Option<usize> = None;
229
230    fn num_entries_shallow(&self) -> usize {
231        self.file.rows().len() as usize
232    }
233
234    fn num_entries_deep(&self) -> usize {
235        self.file.n_rows(1) as usize
236    }
237}
238
239impl<K, T, R> BatchReader for FileKeyBatch<K, T, R>
240where
241    K: DataTrait + ?Sized,
242    T: Timestamp,
243    R: WeightTrait + ?Sized,
244{
245    type Factories = FileKeyBatchFactories<K, T, R>;
246    type Key = K;
247    type Val = DynUnit;
248    type Time = T;
249    type R = R;
250    type Cursor<'s> = FileKeyCursor<'s, K, T, R>;
251
252    fn factories(&self) -> Self::Factories {
253        self.factories.clone()
254    }
255
256    fn cursor(&self) -> Self::Cursor<'_> {
257        FileKeyCursor::new(self)
258    }
259
260    #[inline]
261    fn key_count(&self) -> usize {
262        self.file.n_rows(0) as usize
263    }
264
265    #[inline]
266    fn len(&self) -> usize {
267        self.file.n_rows(1) as usize
268    }
269
270    fn approximate_byte_size(&self) -> usize {
271        self.file.byte_size().unwrap_storage() as usize
272    }
273
274    fn filter_stats(&self) -> BloomFilterStats {
275        self.file.filter_stats()
276    }
277
278    #[inline]
279    fn location(&self) -> BatchLocation {
280        BatchLocation::Storage
281    }
282
283    fn cache_stats(&self) -> CacheStats {
284        self.file.cache_stats()
285    }
286
287    fn sample_keys<RG>(&self, rng: &mut RG, sample_size: usize, output: &mut DynVec<Self::Key>)
288    where
289        RG: Rng,
290    {
291        let size = self.key_count();
292        let mut cursor = self.cursor();
293        if sample_size >= size {
294            output.reserve(size);
295
296            while cursor.key_valid() {
297                output.push_ref(cursor.key());
298                cursor.step_key();
299            }
300        } else {
301            output.reserve(sample_size);
302
303            let mut indexes = sample(rng, size, sample_size).into_vec();
304            indexes.sort_unstable();
305            for index in indexes.into_iter() {
306                cursor.move_key(|key_cursor| unsafe { key_cursor.move_to_row(index as u64) });
307                output.push_ref(cursor.key());
308            }
309        }
310    }
311
312    fn maybe_contains_key(&self, hash: u64) -> bool {
313        self.file.maybe_contains_key(hash)
314    }
315}
316
317impl<K, T, R> Batch for FileKeyBatch<K, T, R>
318where
319    K: DataTrait + ?Sized,
320    T: Timestamp,
321    R: WeightTrait + ?Sized,
322{
323    type Timed<T2: Timestamp> = FileKeyBatch<K, T2, R>;
324    type Batcher = MergeBatcher<Self>;
325    type Builder = FileKeyBuilder<K, T, R>;
326
327    fn file_reader(&self) -> Option<Arc<dyn FileReader>> {
328        self.file.mark_for_checkpoint();
329        Some(self.file.file_handle().clone())
330    }
331
332    fn from_path(factories: &Self::Factories, path: &StoragePath) -> Result<Self, ReaderError> {
333        let any_factory0 = factories.factories0.any_factories();
334        let any_factory1 = factories.factories1.any_factories();
335        let file = Arc::new(Reader::open(
336            &[&any_factory0, &any_factory1],
337            Runtime::buffer_cache,
338            &*Runtime::storage_backend().unwrap_storage(),
339            path,
340        )?);
341
342        Ok(Self {
343            factories: factories.clone(),
344            file,
345        })
346    }
347}
348
/// Storage-level cursor over the key column of a [`FileKeyBatch`] file.
///
/// Keys are of type `K` with unit auxiliary data; the next column holds
/// `DynDataTyped<T>` times with weights of type `R`.
type RawKeyCursor<'s, K, T, R> = FileCursor<
    's,
    K,
    DynUnit,
    (&'static DynDataTyped<T>, &'static R, ()),
    (
        &'static K,
        &'static DynUnit,
        (&'static DynDataTyped<T>, &'static R, ()),
    ),
>;
360
/// A cursor over the keys of a [`FileKeyBatch`] and, for each key, its
/// `(time, weight)` entries.
#[derive(Debug, SizeOf)]
pub struct FileKeyCursor<'s, K, T, R>
where
    K: DataTrait + ?Sized,
    T: Timestamp,
    R: WeightTrait + ?Sized,
{
    /// The batch being traversed.
    batch: &'s FileKeyBatch<K, T, R>,
    /// Storage-level cursor over the key column.
    pub(crate) cursor: RawKeyCursor<'s, K, T, R>,
    /// Whether the single unit value of the current key is still "current";
    /// reset whenever the key cursor moves and cleared by stepping the value.
    val_valid: bool,

    /// Scratch storage into which the current time is read.
    pub(crate) time: Box<DynDataTyped<T>>,
    /// Scratch storage into which the current weight is read.
    pub(crate) diff: Box<R>,
}
376
377impl<K, T, R> Clone for FileKeyCursor<'_, K, T, R>
378where
379    K: DataTrait + ?Sized,
380    T: Timestamp,
381    R: WeightTrait + ?Sized,
382{
383    fn clone(&self) -> Self {
384        Self {
385            batch: self.batch,
386            cursor: self.cursor.clone(),
387            val_valid: self.val_valid,
388
389            // These don't need to be cloned because they're only used for
390            // temporary storage.
391            time: self.batch.factories.factories1.key_factory.default_box(),
392            diff: self.batch.factories.weight_factory.default_box(),
393        }
394    }
395}
396
397impl<'s, K, T, R> FileKeyCursor<'s, K, T, R>
398where
399    K: DataTrait + ?Sized,
400    T: Timestamp,
401    R: WeightTrait + ?Sized,
402{
403    fn new_from(batch: &'s FileKeyBatch<K, T, R>, lower_bound: usize) -> Self {
404        let cursor = unsafe {
405            batch
406                .file
407                .rows()
408                .subset(lower_bound as u64..)
409                .first()
410                .unwrap_storage()
411        };
412        let key_valid = cursor.has_value();
413
414        Self {
415            batch,
416            cursor,
417            val_valid: key_valid,
418            time: batch.factories.factories1.key_factory.default_box(),
419            diff: batch.factories.weight_factory.default_box(),
420        }
421    }
422
423    fn new(batch: &'s FileKeyBatch<K, T, R>) -> Self {
424        Self::new_from(batch, 0)
425    }
426
427    fn move_key<F>(&mut self, op: F)
428    where
429        F: Fn(&mut RawKeyCursor<'s, K, T, R>) -> Result<(), ReaderError>,
430    {
431        op(&mut self.cursor).unwrap_storage();
432        self.val_valid = self.cursor.has_value();
433    }
434}
435
436impl<K, T, R> Cursor<K, DynUnit, T, R> for FileKeyCursor<'_, K, T, R>
437where
438    K: DataTrait + ?Sized,
439    T: Timestamp,
440    R: WeightTrait + ?Sized,
441{
442    fn weight_factory(&self) -> &'static dyn Factory<R> {
443        self.batch.factories.weight_factory
444    }
445
446    fn key(&self) -> &K {
447        debug_assert!(self.key_valid());
448        self.cursor.key().unwrap()
449    }
450
451    fn val(&self) -> &DynUnit {
452        &()
453    }
454
455    fn map_times(&mut self, logic: &mut dyn FnMut(&T, &R)) {
456        let mut val_cursor = unsafe {
457            self.cursor
458                .next_column()
459                .unwrap_storage()
460                .first()
461                .unwrap_storage()
462        };
463        while unsafe { val_cursor.item((self.time.as_mut(), &mut self.diff)) }.is_some() {
464            logic(self.time.as_ref(), self.diff.as_ref());
465            unsafe { val_cursor.move_next() }.unwrap_storage();
466        }
467    }
468
469    fn map_times_through(&mut self, upper: &T, logic: &mut dyn FnMut(&T, &R)) {
470        let mut val_cursor = unsafe {
471            self.cursor
472                .next_column()
473                .unwrap_storage()
474                .first()
475                .unwrap_storage()
476        };
477        while unsafe { val_cursor.item((self.time.as_mut(), &mut self.diff)) }.is_some() {
478            if self.time.less_equal(upper) {
479                logic(self.time.as_ref(), self.diff.as_ref());
480            }
481            unsafe { val_cursor.move_next() }.unwrap_storage();
482        }
483    }
484
485    fn map_values(&mut self, logic: &mut dyn FnMut(&DynUnit, &R))
486    where
487        T: PartialEq<()>,
488    {
489        if self.val_valid {
490            logic(&(), self.weight())
491        }
492    }
493
494    fn weight(&mut self) -> &R
495    where
496        T: PartialEq<()>,
497    {
498        self.weight_checked()
499    }
500
501    fn weight_checked(&mut self) -> &R {
502        if TypeId::of::<T>() == TypeId::of::<()>() {
503            let val_cursor = unsafe {
504                self.cursor
505                    .next_column()
506                    .unwrap_storage()
507                    .first()
508                    .unwrap_storage()
509            };
510            unsafe { val_cursor.aux(&mut self.diff) }.unwrap();
511            self.diff.as_ref()
512        } else {
513            panic!("FileKeyCursor::weight_checked called on non-unit timestamp type");
514        }
515    }
516
517    fn key_valid(&self) -> bool {
518        self.cursor.has_value()
519    }
520
521    fn val_valid(&self) -> bool {
522        self.val_valid
523    }
524
525    fn step_key(&mut self) {
526        self.move_key(|key_cursor| unsafe { key_cursor.move_next() });
527    }
528
529    fn step_key_reverse(&mut self) {
530        self.move_key(|key_cursor| unsafe { key_cursor.move_prev() });
531    }
532
533    fn seek_key(&mut self, key: &K) {
534        self.move_key(|key_cursor| unsafe { key_cursor.advance_to_value_or_larger(key) });
535    }
536
537    fn seek_key_exact(&mut self, key: &K, hash: Option<u64>) -> bool {
538        let hash = hash.unwrap_or_else(|| key.default_hash());
539        if !self.batch.maybe_contains_key(hash) {
540            return false;
541        }
542        self.seek_key(key);
543        self.key_valid() && self.key().eq(key)
544    }
545
546    fn seek_key_with(&mut self, predicate: &dyn Fn(&K) -> bool) {
547        self.move_key(|key_cursor| unsafe { key_cursor.seek_forward_until(predicate) });
548    }
549
550    fn seek_key_with_reverse(&mut self, predicate: &dyn Fn(&K) -> bool) {
551        self.move_key(|key_cursor| unsafe { key_cursor.seek_backward_until(predicate) });
552    }
553
554    fn seek_key_reverse(&mut self, key: &K) {
555        self.move_key(|key_cursor| unsafe { key_cursor.rewind_to_value_or_smaller(key) });
556    }
557
558    fn step_val(&mut self) {
559        self.val_valid = false;
560    }
561
562    fn seek_val(&mut self, _val: &DynUnit) {}
563
564    fn seek_val_with(&mut self, predicate: &dyn Fn(&DynUnit) -> bool) {
565        if !predicate(&()) {
566            self.val_valid = false;
567        }
568    }
569
570    fn rewind_keys(&mut self) {
571        self.move_key(|key_cursor| unsafe { key_cursor.move_first() });
572    }
573
574    fn fast_forward_keys(&mut self) {
575        self.move_key(|key_cursor| unsafe { key_cursor.move_last() });
576    }
577
578    fn rewind_vals(&mut self) {
579        self.val_valid = true;
580    }
581
582    fn step_val_reverse(&mut self) {
583        self.val_valid = false;
584    }
585
586    fn seek_val_reverse(&mut self, _val: &DynUnit) {}
587
588    fn seek_val_with_reverse(&mut self, predicate: &dyn Fn(&DynUnit) -> bool) {
589        if !predicate(&()) {
590            self.val_valid = false;
591        }
592    }
593
594    fn fast_forward_vals(&mut self) {
595        self.val_valid = true;
596    }
597
598    fn position(&self) -> Option<Position> {
599        Some(Position {
600            total: self.cursor.len(),
601            offset: self.cursor.absolute_position(),
602        })
603    }
604}
605
/// A builder that writes keys and their `(time, weight)` entries to a file
/// and turns the result into a [`FileKeyBatch`].
// NOTE(review): the original summary said "from unsorted update tuples", but
// the builder passes everything straight to the file writer without sorting;
// confirm the ordering contract against the `Builder` trait.
#[derive(SizeOf)]
pub struct FileKeyBuilder<K, T, R>
where
    K: DataTrait + ?Sized,
    T: Timestamp,
    R: WeightTrait + ?Sized,
{
    /// Factories handed on to the finished batch.
    #[size_of(skip)]
    factories: FileKeyBatchFactories<K, T, R>,
    /// Two-column file writer: keys in column 0, times and weights in
    /// column 1.
    #[size_of(skip)]
    writer: Writer2<K, DynUnit, DynDataTyped<T>, R>,
    /// Optional-key box created from `opt_key_factory`.
    // NOTE(review): not read by any method visible in this file; confirm
    // whether it is still needed.
    key: Box<DynOpt<K>>,
    /// Number of `(time, weight)` entries pushed so far.
    num_tuples: usize,
}
621
622impl<K, T, R> Builder<FileKeyBatch<K, T, R>> for FileKeyBuilder<K, T, R>
623where
624    Self: SizeOf,
625    K: DataTrait + ?Sized,
626    T: Timestamp,
627    R: WeightTrait + ?Sized,
628{
629    #[inline]
630    fn with_capacity(
631        factories: &FileKeyBatchFactories<K, T, R>,
632        key_capacity: usize,
633        _value_capacity: usize,
634    ) -> Self {
635        Self {
636            factories: factories.clone(),
637            writer: Writer2::new(
638                &factories.factories0,
639                &factories.factories1,
640                Runtime::buffer_cache,
641                &*Runtime::storage_backend().unwrap_storage(),
642                Runtime::file_writer_parameters(),
643                key_capacity,
644            )
645            .unwrap_storage(),
646            key: factories.opt_key_factory.default_box(),
647            num_tuples: 0,
648        }
649    }
650
651    fn push_key(&mut self, key: &K) {
652        self.writer.write0((key, &())).unwrap_storage();
653    }
654
655    fn push_val(&mut self, _val: &DynUnit) {}
656
657    fn push_time_diff(&mut self, time: &T, weight: &R) {
658        debug_assert!(!weight.is_zero());
659        self.writer.write1((time, weight)).unwrap_storage();
660        self.num_tuples += 1;
661    }
662
663    fn done(self) -> FileKeyBatch<K, T, R> {
664        FileKeyBatch {
665            factories: self.factories,
666            file: Arc::new(self.writer.into_reader().unwrap_storage()),
667        }
668    }
669
670    fn num_keys(&self) -> usize {
671        self.writer.n_rows() as usize
672    }
673
674    fn num_tuples(&self) -> usize {
675        self.num_tuples
676    }
677}
678
679impl<K, T, R> Archive for FileKeyBatch<K, T, R>
680where
681    K: DataTrait + ?Sized,
682    T: Timestamp,
683    R: WeightTrait + ?Sized,
684{
685    type Archived = ();
686    type Resolver = ();
687
688    unsafe fn resolve(&self, _pos: usize, _resolver: Self::Resolver, _out: *mut Self::Archived) {
689        unimplemented!();
690    }
691}
692
693impl<K, T, R, S> Serialize<S> for FileKeyBatch<K, T, R>
694where
695    K: DataTrait + ?Sized,
696    T: Timestamp,
697    R: WeightTrait + ?Sized,
698    S: Serializer + ?Sized,
699{
700    fn serialize(&self, _serializer: &mut S) -> Result<Self::Resolver, S::Error> {
701        unimplemented!();
702    }
703}
704
705impl<K, T, R, D> Deserialize<FileKeyBatch<K, T, R>, D> for Archived<FileKeyBatch<K, T, R>>
706where
707    K: DataTrait + ?Sized,
708    T: Timestamp,
709    R: WeightTrait + ?Sized,
710    D: Fallible,
711{
712    fn deserialize(&self, _deserializer: &mut D) -> Result<FileKeyBatch<K, T, R>, D::Error> {
713        unimplemented!();
714    }
715}