Skip to main content

dbsp/trace/ord/fallback/
val_batch.rs

1use std::fmt::{Display, Formatter};
2use std::sync::Arc;
3
4use super::utils::{copy_to_builder, pick_merge_destination};
5use crate::storage::buffer_cache::CacheStats;
6use crate::storage::tracking_bloom_filter::BloomFilterStats;
7use crate::trace::cursor::{DelegatingCursor, PushCursor};
8use crate::trace::ord::file::val_batch::FileValBuilder;
9use crate::trace::ord::vec::val_batch::VecValBuilder;
10use crate::trace::{BatchLocation, GroupFilter, MergeCursor};
11use crate::{
12    DBData, DBWeight, NumEntries, Timestamp,
13    dynamic::{
14        DataTrait, DynDataTyped, DynPair, DynVec, DynWeightedPairs, Erase, Factory, WeightTrait,
15    },
16    storage::file::reader::Error as ReaderError,
17    trace::{
18        Batch, BatchFactories, BatchReader, BatchReaderFactories, Builder, FileValBatch,
19        FileValBatchFactories, Filter, VecValBatch, VecValBatchFactories, WeightedItem,
20        ord::merge_batcher::MergeBatcher,
21    },
22};
23use derive_more::Debug;
24use feldera_storage::{FileReader, StoragePath};
25use rand::Rng;
26use rkyv::{Archive, Archived, Deserialize, Fallible, Serialize, ser::Serializer};
27use size_of::SizeOf;
28
29pub struct FallbackValBatchFactories<K, V, T, R>
30where
31    K: DataTrait + ?Sized,
32    V: DataTrait + ?Sized,
33    T: Timestamp,
34    R: WeightTrait + ?Sized,
35{
36    file: FileValBatchFactories<K, V, T, R>,
37    vec: VecValBatchFactories<K, V, T, R>,
38}
39
40impl<K, V, T, R> Clone for FallbackValBatchFactories<K, V, T, R>
41where
42    K: DataTrait + ?Sized,
43    V: DataTrait + ?Sized,
44    T: Timestamp,
45    R: WeightTrait + ?Sized,
46{
47    fn clone(&self) -> Self {
48        Self {
49            file: self.file.clone(),
50            vec: self.vec.clone(),
51        }
52    }
53}
54
55impl<K, V, T, R> BatchReaderFactories<K, V, T, R> for FallbackValBatchFactories<K, V, T, R>
56where
57    K: DataTrait + ?Sized,
58    V: DataTrait + ?Sized,
59    T: Timestamp,
60    R: WeightTrait + ?Sized,
61{
62    fn new<KType, VType, RType>() -> Self
63    where
64        KType: DBData + Erase<K>,
65        VType: DBData + Erase<V>,
66        RType: DBWeight + Erase<R>,
67    {
68        Self {
69            file: FileValBatchFactories::new::<KType, VType, RType>(),
70            vec: VecValBatchFactories::new::<KType, VType, RType>(),
71        }
72    }
73
74    fn key_factory(&self) -> &'static dyn Factory<K> {
75        self.file.key_factory()
76    }
77
78    fn keys_factory(&self) -> &'static dyn Factory<DynVec<K>> {
79        self.file.keys_factory()
80    }
81
82    fn val_factory(&self) -> &'static dyn Factory<V> {
83        self.file.val_factory()
84    }
85
86    fn weight_factory(&self) -> &'static dyn Factory<R> {
87        self.file.weight_factory()
88    }
89}
90
91impl<K, V, T, R> BatchFactories<K, V, T, R> for FallbackValBatchFactories<K, V, T, R>
92where
93    K: DataTrait + ?Sized,
94    V: DataTrait + ?Sized,
95    T: Timestamp,
96    R: WeightTrait + ?Sized,
97{
98    fn item_factory(&self) -> &'static dyn Factory<DynPair<K, V>> {
99        self.file.item_factory()
100    }
101
102    fn weighted_item_factory(&self) -> &'static dyn Factory<WeightedItem<K, V, R>> {
103        self.file.weighted_item_factory()
104    }
105
106    fn weighted_items_factory(&self) -> &'static dyn Factory<DynWeightedPairs<DynPair<K, V>, R>> {
107        self.file.weighted_items_factory()
108    }
109
110    fn weighted_vals_factory(&self) -> &'static dyn Factory<DynWeightedPairs<V, R>> {
111        self.file.weighted_vals_factory()
112    }
113
114    fn time_diffs_factory(
115        &self,
116    ) -> Option<&'static dyn Factory<DynWeightedPairs<DynDataTyped<T>, R>>> {
117        self.file.time_diffs_factory()
118    }
119}
120
121/// An immutable collection of update tuples, from a contiguous interval of
122/// logical times.
123#[derive(Debug, SizeOf)]
124pub struct FallbackValBatch<K, V, T, R>
125where
126    K: DataTrait + ?Sized,
127    V: DataTrait + ?Sized,
128    T: Timestamp,
129    R: WeightTrait + ?Sized,
130{
131    #[size_of(skip)]
132    #[debug(skip)]
133    factories: FallbackValBatchFactories<K, V, T, R>,
134    inner: Inner<K, V, T, R>,
135}
136
137#[derive(Debug, SizeOf)]
138enum Inner<K, V, T, R>
139where
140    K: DataTrait + ?Sized,
141    V: DataTrait + ?Sized,
142    T: Timestamp,
143    R: WeightTrait + ?Sized,
144{
145    Vec(VecValBatch<K, V, T, R>),
146    File(FileValBatch<K, V, T, R>),
147}
148
149impl<K, V, T, R> Clone for FallbackValBatch<K, V, T, R>
150where
151    K: DataTrait + ?Sized,
152    V: DataTrait + ?Sized,
153    T: Timestamp,
154    R: WeightTrait + ?Sized,
155{
156    fn clone(&self) -> Self {
157        Self {
158            factories: self.factories.clone(),
159            inner: match &self.inner {
160                Inner::Vec(vec) => Inner::Vec(vec.clone()),
161                Inner::File(file) => Inner::File(file.clone()),
162            },
163        }
164    }
165}
166
167impl<K, V, T, R> NumEntries for FallbackValBatch<K, V, T, R>
168where
169    K: DataTrait + ?Sized,
170    V: DataTrait + ?Sized,
171    T: Timestamp,
172    R: WeightTrait + ?Sized,
173{
174    const CONST_NUM_ENTRIES: Option<usize> = None;
175
176    #[inline]
177    fn num_entries_shallow(&self) -> usize {
178        match &self.inner {
179            Inner::Vec(vec) => vec.num_entries_shallow(),
180            Inner::File(file) => file.num_entries_shallow(),
181        }
182    }
183
184    #[inline]
185    fn num_entries_deep(&self) -> usize {
186        match &self.inner {
187            Inner::Vec(vec) => vec.num_entries_deep(),
188            Inner::File(file) => file.num_entries_deep(),
189        }
190    }
191}
192
193impl<K, V, T, R> Display for FallbackValBatch<K, V, T, R>
194where
195    K: DataTrait + ?Sized,
196    V: DataTrait + ?Sized,
197    T: Timestamp,
198    R: WeightTrait + ?Sized,
199{
200    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
201        match &self.inner {
202            Inner::Vec(vec) => Display::fmt(vec, f),
203            Inner::File(file) => Display::fmt(file, f),
204        }
205    }
206}
207
208impl<K, V, T, R> BatchReader for FallbackValBatch<K, V, T, R>
209where
210    K: DataTrait + ?Sized,
211    V: DataTrait + ?Sized,
212    T: Timestamp,
213    R: WeightTrait + ?Sized,
214{
215    type Factories = FallbackValBatchFactories<K, V, T, R>;
216    type Key = K;
217    type Val = V;
218    type Time = T;
219    type R = R;
220
221    type Cursor<'s> = DelegatingCursor<'s, K, V, T, R>;
222
223    fn factories(&self) -> Self::Factories {
224        self.factories.clone()
225    }
226
227    fn cursor(&self) -> Self::Cursor<'_> {
228        DelegatingCursor(match &self.inner {
229            Inner::Vec(vec) => Box::new(vec.cursor()),
230            Inner::File(file) => Box::new(file.cursor()),
231        })
232    }
233
234    fn push_cursor(
235        &self,
236    ) -> Box<dyn PushCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + '_> {
237        match &self.inner {
238            Inner::Vec(vec) => vec.push_cursor(),
239            Inner::File(file) => file.push_cursor(),
240        }
241    }
242
243    fn merge_cursor(
244        &self,
245        key_filter: Option<Filter<Self::Key>>,
246        value_filter: Option<GroupFilter<Self::Val>>,
247    ) -> Box<dyn MergeCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + '_> {
248        match &self.inner {
249            Inner::Vec(vec) => vec.merge_cursor(key_filter, value_filter),
250            Inner::File(file) => file.merge_cursor(key_filter, value_filter),
251        }
252    }
253
254    fn key_count(&self) -> usize {
255        match &self.inner {
256            Inner::Vec(vec) => vec.key_count(),
257            Inner::File(file) => file.key_count(),
258        }
259    }
260
261    fn len(&self) -> usize {
262        match &self.inner {
263            Inner::Vec(vec) => vec.len(),
264            Inner::File(file) => file.len(),
265        }
266    }
267
268    #[inline]
269    fn approximate_byte_size(&self) -> usize {
270        match &self.inner {
271            Inner::File(file) => file.approximate_byte_size(),
272            Inner::Vec(vec) => vec.approximate_byte_size(),
273        }
274    }
275
276    #[inline]
277    fn filter_stats(&self) -> BloomFilterStats {
278        match &self.inner {
279            Inner::File(file) => file.filter_stats(),
280            Inner::Vec(vec) => vec.filter_stats(),
281        }
282    }
283
284    #[inline]
285    fn location(&self) -> BatchLocation {
286        match &self.inner {
287            Inner::Vec(vec) => vec.location(),
288            Inner::File(file) => file.location(),
289        }
290    }
291
292    fn cache_stats(&self) -> CacheStats {
293        match &self.inner {
294            Inner::Vec(vec) => vec.cache_stats(),
295            Inner::File(file) => file.cache_stats(),
296        }
297    }
298
299    fn sample_keys<RG>(&self, rng: &mut RG, sample_size: usize, output: &mut DynVec<Self::Key>)
300    where
301        RG: Rng,
302        T: PartialEq<()>,
303    {
304        match &self.inner {
305            Inner::Vec(vec) => vec.sample_keys(rng, sample_size, output),
306            Inner::File(file) => file.sample_keys(rng, sample_size, output),
307        }
308    }
309
310    fn maybe_contains_key(&self, hash: u64) -> bool {
311        match &self.inner {
312            Inner::Vec(vec) => vec.maybe_contains_key(hash),
313            Inner::File(file) => file.maybe_contains_key(hash),
314        }
315    }
316}
317
318impl<K, V, T, R> Batch for FallbackValBatch<K, V, T, R>
319where
320    K: DataTrait + ?Sized,
321    V: DataTrait + ?Sized,
322    T: Timestamp,
323    R: WeightTrait + ?Sized,
324{
325    type Timed<T2: Timestamp> = FallbackValBatch<K, V, T2, R>;
326    type Batcher = MergeBatcher<Self>;
327    type Builder = FallbackValBuilder<K, V, T, R>;
328
329    fn persisted(&self) -> Option<Self> {
330        match &self.inner {
331            Inner::Vec(vec) => {
332                let mut file = FileValBuilder::with_capacity(
333                    &self.factories.file,
334                    self.key_count(),
335                    self.len(),
336                );
337                copy_to_builder(&mut file, vec.cursor());
338                Some(Self {
339                    inner: Inner::File(file.done()),
340                    factories: self.factories.clone(),
341                })
342            }
343            Inner::File(_) => None,
344        }
345    }
346
347    fn file_reader(&self) -> Option<Arc<dyn FileReader>> {
348        match &self.inner {
349            Inner::Vec(vec) => vec.file_reader(),
350            Inner::File(file) => file.file_reader(),
351        }
352    }
353
354    fn from_path(factories: &Self::Factories, path: &StoragePath) -> Result<Self, ReaderError> {
355        Ok(Self {
356            factories: factories.clone(),
357            inner: Inner::File(FileValBatch::<K, V, T, R>::from_path(
358                &factories.file,
359                path,
360            )?),
361        })
362    }
363}
364
365/// A builder for creating layers from unsorted update tuples.
366#[derive(SizeOf)]
367pub struct FallbackValBuilder<K, V, T, R>
368where
369    K: DataTrait + ?Sized,
370    V: DataTrait + ?Sized,
371    T: Timestamp,
372    R: WeightTrait + ?Sized,
373{
374    #[size_of(skip)]
375    factories: FallbackValBatchFactories<K, V, T, R>,
376    inner: BuilderInner<K, V, T, R>,
377}
378
379#[derive(SizeOf)]
380enum BuilderInner<K, V, T, R>
381where
382    K: DataTrait + ?Sized,
383    V: DataTrait + ?Sized,
384    T: Timestamp,
385    R: WeightTrait + ?Sized,
386{
387    Vec(VecValBuilder<K, V, T, R, usize>),
388    File(FileValBuilder<K, V, T, R>),
389}
390
391impl<K, V, T, R> Builder<FallbackValBatch<K, V, T, R>> for FallbackValBuilder<K, V, T, R>
392where
393    Self: SizeOf,
394    K: DataTrait + ?Sized,
395    V: DataTrait + ?Sized,
396    T: Timestamp,
397    R: WeightTrait + ?Sized,
398{
399    fn with_capacity(
400        factories: &FallbackValBatchFactories<K, V, T, R>,
401        key_capacity: usize,
402        value_capacity: usize,
403    ) -> Self {
404        Self {
405            factories: factories.clone(),
406            inner: BuilderInner::Vec(VecValBuilder::with_capacity(
407                &factories.vec,
408                key_capacity,
409                value_capacity,
410            )),
411        }
412    }
413
414    fn for_merge<'a, B, I>(
415        factories: &FallbackValBatchFactories<K, V, T, R>,
416        batches: I,
417        location: Option<BatchLocation>,
418    ) -> Self
419    where
420        B: BatchReader,
421        I: IntoIterator<Item = &'a B> + Clone,
422    {
423        let key_capacity = batches.clone().into_iter().map(|b| b.key_count()).sum();
424        let value_capacity = batches.clone().into_iter().map(|b| b.len()).sum();
425        Self {
426            factories: factories.clone(),
427            inner: match pick_merge_destination(batches, location) {
428                BatchLocation::Memory => BuilderInner::Vec(VecValBuilder::with_capacity(
429                    &factories.vec,
430                    key_capacity,
431                    value_capacity,
432                )),
433                BatchLocation::Storage => BuilderInner::File(FileValBuilder::with_capacity(
434                    &factories.file,
435                    key_capacity,
436                    value_capacity,
437                )),
438            },
439        }
440    }
441
442    fn push_time_diff(&mut self, time: &T, weight: &R) {
443        match &mut self.inner {
444            BuilderInner::Vec(vec) => vec.push_time_diff(time, weight),
445            BuilderInner::File(file) => file.push_time_diff(time, weight),
446        }
447    }
448
449    fn push_val(&mut self, val: &V) {
450        match &mut self.inner {
451            BuilderInner::Vec(vec) => vec.push_val(val),
452            BuilderInner::File(file) => file.push_val(val),
453        }
454    }
455
456    fn push_key(&mut self, key: &K) {
457        match &mut self.inner {
458            BuilderInner::Vec(vec) => vec.push_key(key),
459            BuilderInner::File(file) => file.push_key(key),
460        }
461    }
462
463    fn push_time_diff_mut(&mut self, time: &mut T, weight: &mut R) {
464        match &mut self.inner {
465            BuilderInner::Vec(vec) => vec.push_time_diff_mut(time, weight),
466            BuilderInner::File(file) => file.push_time_diff_mut(time, weight),
467        }
468    }
469
470    fn push_val_mut(&mut self, val: &mut V) {
471        match &mut self.inner {
472            BuilderInner::Vec(vec) => vec.push_val_mut(val),
473            BuilderInner::File(file) => file.push_val_mut(val),
474        }
475    }
476
477    fn push_key_mut(&mut self, key: &mut K) {
478        match &mut self.inner {
479            BuilderInner::Vec(vec) => vec.push_key_mut(key),
480            BuilderInner::File(file) => file.push_key_mut(key),
481        }
482    }
483
484    fn push_val_diff(&mut self, val: &V, weight: &R)
485    where
486        T: PartialEq<()>,
487    {
488        match &mut self.inner {
489            BuilderInner::Vec(vec) => vec.push_val_diff(val, weight),
490            BuilderInner::File(file) => file.push_val_diff(val, weight),
491        }
492    }
493
494    fn push_val_diff_mut(&mut self, val: &mut V, weight: &mut R)
495    where
496        T: PartialEq<()>,
497    {
498        match &mut self.inner {
499            BuilderInner::Vec(vec) => vec.push_val_diff_mut(val, weight),
500            BuilderInner::File(file) => file.push_val_diff_mut(val, weight),
501        }
502    }
503
504    fn reserve(&mut self, additional: usize) {
505        match &mut self.inner {
506            BuilderInner::Vec(vec) => vec.reserve(additional),
507            BuilderInner::File(file) => file.reserve(additional),
508        }
509    }
510
511    fn done(self) -> FallbackValBatch<K, V, T, R> {
512        FallbackValBatch {
513            factories: self.factories,
514            inner: match self.inner {
515                BuilderInner::File(file) => Inner::File(file.done()),
516                BuilderInner::Vec(vec) => Inner::Vec(vec.done()),
517            },
518        }
519    }
520
521    fn num_keys(&self) -> usize {
522        match &self.inner {
523            BuilderInner::Vec(vec) => vec.num_keys(),
524            BuilderInner::File(file) => file.num_keys(),
525        }
526    }
527
528    fn num_tuples(&self) -> usize {
529        match &self.inner {
530            BuilderInner::Vec(vec) => vec.num_tuples(),
531            BuilderInner::File(file) => file.num_tuples(),
532        }
533    }
534}
535
536impl<K, V, T, R> Archive for FallbackValBatch<K, V, T, R>
537where
538    K: DataTrait + ?Sized,
539    V: DataTrait + ?Sized,
540    T: Timestamp,
541    R: WeightTrait + ?Sized,
542{
543    type Archived = ();
544    type Resolver = ();
545
546    unsafe fn resolve(&self, _pos: usize, _resolver: Self::Resolver, _out: *mut Self::Archived) {
547        unimplemented!();
548    }
549}
550
551impl<K, V, T, R, S> Serialize<S> for FallbackValBatch<K, V, T, R>
552where
553    K: DataTrait + ?Sized,
554    V: DataTrait + ?Sized,
555    T: Timestamp,
556    R: WeightTrait + ?Sized,
557    S: Serializer + ?Sized,
558{
559    fn serialize(&self, _serializer: &mut S) -> Result<Self::Resolver, S::Error> {
560        unimplemented!();
561    }
562}
563
564impl<K, V, T, R, D> Deserialize<FallbackValBatch<K, V, T, R>, D>
565    for Archived<FallbackValBatch<K, V, T, R>>
566where
567    K: DataTrait + ?Sized,
568    V: DataTrait + ?Sized,
569    T: Timestamp,
570    R: WeightTrait + ?Sized,
571    D: Fallible,
572{
573    fn deserialize(&self, _deserializer: &mut D) -> Result<FallbackValBatch<K, V, T, R>, D::Error> {
574        unimplemented!();
575    }
576}