1use std::fmt::{Display, Formatter};
2use std::sync::Arc;
3
4use super::utils::{copy_to_builder, pick_merge_destination};
5use crate::storage::buffer_cache::CacheStats;
6use crate::storage::tracking_bloom_filter::BloomFilterStats;
7use crate::trace::cursor::{DelegatingCursor, PushCursor};
8use crate::trace::ord::file::val_batch::FileValBuilder;
9use crate::trace::ord::vec::val_batch::VecValBuilder;
10use crate::trace::{BatchLocation, GroupFilter, MergeCursor};
11use crate::{
12 DBData, DBWeight, NumEntries, Timestamp,
13 dynamic::{
14 DataTrait, DynDataTyped, DynPair, DynVec, DynWeightedPairs, Erase, Factory, WeightTrait,
15 },
16 storage::file::reader::Error as ReaderError,
17 trace::{
18 Batch, BatchFactories, BatchReader, BatchReaderFactories, Builder, FileValBatch,
19 FileValBatchFactories, Filter, VecValBatch, VecValBatchFactories, WeightedItem,
20 ord::merge_batcher::MergeBatcher,
21 },
22};
23use derive_more::Debug;
24use feldera_storage::{FileReader, StoragePath};
25use rand::Rng;
26use rkyv::{Archive, Archived, Deserialize, Fallible, Serialize, ser::Serializer};
27use size_of::SizeOf;
28
/// Factories for [`FallbackValBatch`].
///
/// Bundles one factory set for each of the two batch representations that a
/// [`FallbackValBatch`] can wrap.
pub struct FallbackValBatchFactories<K, V, T, R>
where
    K: DataTrait + ?Sized,
    V: DataTrait + ?Sized,
    T: Timestamp,
    R: WeightTrait + ?Sized,
{
    /// Factories passed to [`FileValBatch`] / `FileValBuilder` constructors.
    file: FileValBatchFactories<K, V, T, R>,
    /// Factories passed to `VecValBuilder` constructors.
    vec: VecValBatchFactories<K, V, T, R>,
}
39
40impl<K, V, T, R> Clone for FallbackValBatchFactories<K, V, T, R>
41where
42 K: DataTrait + ?Sized,
43 V: DataTrait + ?Sized,
44 T: Timestamp,
45 R: WeightTrait + ?Sized,
46{
47 fn clone(&self) -> Self {
48 Self {
49 file: self.file.clone(),
50 vec: self.vec.clone(),
51 }
52 }
53}
54
55impl<K, V, T, R> BatchReaderFactories<K, V, T, R> for FallbackValBatchFactories<K, V, T, R>
56where
57 K: DataTrait + ?Sized,
58 V: DataTrait + ?Sized,
59 T: Timestamp,
60 R: WeightTrait + ?Sized,
61{
62 fn new<KType, VType, RType>() -> Self
63 where
64 KType: DBData + Erase<K>,
65 VType: DBData + Erase<V>,
66 RType: DBWeight + Erase<R>,
67 {
68 Self {
69 file: FileValBatchFactories::new::<KType, VType, RType>(),
70 vec: VecValBatchFactories::new::<KType, VType, RType>(),
71 }
72 }
73
74 fn key_factory(&self) -> &'static dyn Factory<K> {
75 self.file.key_factory()
76 }
77
78 fn keys_factory(&self) -> &'static dyn Factory<DynVec<K>> {
79 self.file.keys_factory()
80 }
81
82 fn val_factory(&self) -> &'static dyn Factory<V> {
83 self.file.val_factory()
84 }
85
86 fn weight_factory(&self) -> &'static dyn Factory<R> {
87 self.file.weight_factory()
88 }
89}
90
91impl<K, V, T, R> BatchFactories<K, V, T, R> for FallbackValBatchFactories<K, V, T, R>
92where
93 K: DataTrait + ?Sized,
94 V: DataTrait + ?Sized,
95 T: Timestamp,
96 R: WeightTrait + ?Sized,
97{
98 fn item_factory(&self) -> &'static dyn Factory<DynPair<K, V>> {
99 self.file.item_factory()
100 }
101
102 fn weighted_item_factory(&self) -> &'static dyn Factory<WeightedItem<K, V, R>> {
103 self.file.weighted_item_factory()
104 }
105
106 fn weighted_items_factory(&self) -> &'static dyn Factory<DynWeightedPairs<DynPair<K, V>, R>> {
107 self.file.weighted_items_factory()
108 }
109
110 fn weighted_vals_factory(&self) -> &'static dyn Factory<DynWeightedPairs<V, R>> {
111 self.file.weighted_vals_factory()
112 }
113
114 fn time_diffs_factory(
115 &self,
116 ) -> Option<&'static dyn Factory<DynWeightedPairs<DynDataTyped<T>, R>>> {
117 self.file.time_diffs_factory()
118 }
119}
120
/// A batch that wraps either a [`VecValBatch`] or a [`FileValBatch`] and
/// forwards every operation to whichever one it currently holds.
#[derive(Debug, SizeOf)]
pub struct FallbackValBatch<K, V, T, R>
where
    K: DataTrait + ?Sized,
    V: DataTrait + ?Sized,
    T: Timestamp,
    R: WeightTrait + ?Sized,
{
    /// Factory sets for both representations; excluded from the size and
    /// debug output.
    #[size_of(skip)]
    #[debug(skip)]
    factories: FallbackValBatchFactories<K, V, T, R>,
    /// The concrete batch being wrapped.
    inner: Inner<K, V, T, R>,
}
136
/// The two concrete representations a [`FallbackValBatch`] can hold.
#[derive(Debug, SizeOf)]
enum Inner<K, V, T, R>
where
    K: DataTrait + ?Sized,
    V: DataTrait + ?Sized,
    T: Timestamp,
    R: WeightTrait + ?Sized,
{
    /// Batch backed by a [`VecValBatch`].
    Vec(VecValBatch<K, V, T, R>),
    /// Batch backed by a [`FileValBatch`].
    File(FileValBatch<K, V, T, R>),
}
148
149impl<K, V, T, R> Clone for FallbackValBatch<K, V, T, R>
150where
151 K: DataTrait + ?Sized,
152 V: DataTrait + ?Sized,
153 T: Timestamp,
154 R: WeightTrait + ?Sized,
155{
156 fn clone(&self) -> Self {
157 Self {
158 factories: self.factories.clone(),
159 inner: match &self.inner {
160 Inner::Vec(vec) => Inner::Vec(vec.clone()),
161 Inner::File(file) => Inner::File(file.clone()),
162 },
163 }
164 }
165}
166
167impl<K, V, T, R> NumEntries for FallbackValBatch<K, V, T, R>
168where
169 K: DataTrait + ?Sized,
170 V: DataTrait + ?Sized,
171 T: Timestamp,
172 R: WeightTrait + ?Sized,
173{
174 const CONST_NUM_ENTRIES: Option<usize> = None;
175
176 #[inline]
177 fn num_entries_shallow(&self) -> usize {
178 match &self.inner {
179 Inner::Vec(vec) => vec.num_entries_shallow(),
180 Inner::File(file) => file.num_entries_shallow(),
181 }
182 }
183
184 #[inline]
185 fn num_entries_deep(&self) -> usize {
186 match &self.inner {
187 Inner::Vec(vec) => vec.num_entries_deep(),
188 Inner::File(file) => file.num_entries_deep(),
189 }
190 }
191}
192
193impl<K, V, T, R> Display for FallbackValBatch<K, V, T, R>
194where
195 K: DataTrait + ?Sized,
196 V: DataTrait + ?Sized,
197 T: Timestamp,
198 R: WeightTrait + ?Sized,
199{
200 fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
201 match &self.inner {
202 Inner::Vec(vec) => Display::fmt(vec, f),
203 Inner::File(file) => Display::fmt(file, f),
204 }
205 }
206}
207
208impl<K, V, T, R> BatchReader for FallbackValBatch<K, V, T, R>
209where
210 K: DataTrait + ?Sized,
211 V: DataTrait + ?Sized,
212 T: Timestamp,
213 R: WeightTrait + ?Sized,
214{
215 type Factories = FallbackValBatchFactories<K, V, T, R>;
216 type Key = K;
217 type Val = V;
218 type Time = T;
219 type R = R;
220
221 type Cursor<'s> = DelegatingCursor<'s, K, V, T, R>;
222
223 fn factories(&self) -> Self::Factories {
224 self.factories.clone()
225 }
226
227 fn cursor(&self) -> Self::Cursor<'_> {
228 DelegatingCursor(match &self.inner {
229 Inner::Vec(vec) => Box::new(vec.cursor()),
230 Inner::File(file) => Box::new(file.cursor()),
231 })
232 }
233
234 fn push_cursor(
235 &self,
236 ) -> Box<dyn PushCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + '_> {
237 match &self.inner {
238 Inner::Vec(vec) => vec.push_cursor(),
239 Inner::File(file) => file.push_cursor(),
240 }
241 }
242
243 fn merge_cursor(
244 &self,
245 key_filter: Option<Filter<Self::Key>>,
246 value_filter: Option<GroupFilter<Self::Val>>,
247 ) -> Box<dyn MergeCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + '_> {
248 match &self.inner {
249 Inner::Vec(vec) => vec.merge_cursor(key_filter, value_filter),
250 Inner::File(file) => file.merge_cursor(key_filter, value_filter),
251 }
252 }
253
254 fn key_count(&self) -> usize {
255 match &self.inner {
256 Inner::Vec(vec) => vec.key_count(),
257 Inner::File(file) => file.key_count(),
258 }
259 }
260
261 fn len(&self) -> usize {
262 match &self.inner {
263 Inner::Vec(vec) => vec.len(),
264 Inner::File(file) => file.len(),
265 }
266 }
267
268 #[inline]
269 fn approximate_byte_size(&self) -> usize {
270 match &self.inner {
271 Inner::File(file) => file.approximate_byte_size(),
272 Inner::Vec(vec) => vec.approximate_byte_size(),
273 }
274 }
275
276 #[inline]
277 fn filter_stats(&self) -> BloomFilterStats {
278 match &self.inner {
279 Inner::File(file) => file.filter_stats(),
280 Inner::Vec(vec) => vec.filter_stats(),
281 }
282 }
283
284 #[inline]
285 fn location(&self) -> BatchLocation {
286 match &self.inner {
287 Inner::Vec(vec) => vec.location(),
288 Inner::File(file) => file.location(),
289 }
290 }
291
292 fn cache_stats(&self) -> CacheStats {
293 match &self.inner {
294 Inner::Vec(vec) => vec.cache_stats(),
295 Inner::File(file) => file.cache_stats(),
296 }
297 }
298
299 fn sample_keys<RG>(&self, rng: &mut RG, sample_size: usize, output: &mut DynVec<Self::Key>)
300 where
301 RG: Rng,
302 T: PartialEq<()>,
303 {
304 match &self.inner {
305 Inner::Vec(vec) => vec.sample_keys(rng, sample_size, output),
306 Inner::File(file) => file.sample_keys(rng, sample_size, output),
307 }
308 }
309
310 fn maybe_contains_key(&self, hash: u64) -> bool {
311 match &self.inner {
312 Inner::Vec(vec) => vec.maybe_contains_key(hash),
313 Inner::File(file) => file.maybe_contains_key(hash),
314 }
315 }
316}
317
318impl<K, V, T, R> Batch for FallbackValBatch<K, V, T, R>
319where
320 K: DataTrait + ?Sized,
321 V: DataTrait + ?Sized,
322 T: Timestamp,
323 R: WeightTrait + ?Sized,
324{
325 type Timed<T2: Timestamp> = FallbackValBatch<K, V, T2, R>;
326 type Batcher = MergeBatcher<Self>;
327 type Builder = FallbackValBuilder<K, V, T, R>;
328
329 fn persisted(&self) -> Option<Self> {
330 match &self.inner {
331 Inner::Vec(vec) => {
332 let mut file = FileValBuilder::with_capacity(
333 &self.factories.file,
334 self.key_count(),
335 self.len(),
336 );
337 copy_to_builder(&mut file, vec.cursor());
338 Some(Self {
339 inner: Inner::File(file.done()),
340 factories: self.factories.clone(),
341 })
342 }
343 Inner::File(_) => None,
344 }
345 }
346
347 fn file_reader(&self) -> Option<Arc<dyn FileReader>> {
348 match &self.inner {
349 Inner::Vec(vec) => vec.file_reader(),
350 Inner::File(file) => file.file_reader(),
351 }
352 }
353
354 fn from_path(factories: &Self::Factories, path: &StoragePath) -> Result<Self, ReaderError> {
355 Ok(Self {
356 factories: factories.clone(),
357 inner: Inner::File(FileValBatch::<K, V, T, R>::from_path(
358 &factories.file,
359 path,
360 )?),
361 })
362 }
363}
364
/// Builder for [`FallbackValBatch`] that wraps either a `VecValBuilder` or a
/// `FileValBuilder`.
#[derive(SizeOf)]
pub struct FallbackValBuilder<K, V, T, R>
where
    K: DataTrait + ?Sized,
    V: DataTrait + ?Sized,
    T: Timestamp,
    R: WeightTrait + ?Sized,
{
    /// Factory sets handed to the finished batch; excluded from the size
    /// computation.
    #[size_of(skip)]
    factories: FallbackValBatchFactories<K, V, T, R>,
    /// The concrete builder being wrapped.
    inner: BuilderInner<K, V, T, R>,
}
378
/// The two concrete builders a [`FallbackValBuilder`] can hold.
#[derive(SizeOf)]
enum BuilderInner<K, V, T, R>
where
    K: DataTrait + ?Sized,
    V: DataTrait + ?Sized,
    T: Timestamp,
    R: WeightTrait + ?Sized,
{
    /// Builder that produces a `VecValBatch`.
    Vec(VecValBuilder<K, V, T, R, usize>),
    /// Builder that produces a `FileValBatch`.
    File(FileValBuilder<K, V, T, R>),
}
390
391impl<K, V, T, R> Builder<FallbackValBatch<K, V, T, R>> for FallbackValBuilder<K, V, T, R>
392where
393 Self: SizeOf,
394 K: DataTrait + ?Sized,
395 V: DataTrait + ?Sized,
396 T: Timestamp,
397 R: WeightTrait + ?Sized,
398{
399 fn with_capacity(
400 factories: &FallbackValBatchFactories<K, V, T, R>,
401 key_capacity: usize,
402 value_capacity: usize,
403 ) -> Self {
404 Self {
405 factories: factories.clone(),
406 inner: BuilderInner::Vec(VecValBuilder::with_capacity(
407 &factories.vec,
408 key_capacity,
409 value_capacity,
410 )),
411 }
412 }
413
414 fn for_merge<'a, B, I>(
415 factories: &FallbackValBatchFactories<K, V, T, R>,
416 batches: I,
417 location: Option<BatchLocation>,
418 ) -> Self
419 where
420 B: BatchReader,
421 I: IntoIterator<Item = &'a B> + Clone,
422 {
423 let key_capacity = batches.clone().into_iter().map(|b| b.key_count()).sum();
424 let value_capacity = batches.clone().into_iter().map(|b| b.len()).sum();
425 Self {
426 factories: factories.clone(),
427 inner: match pick_merge_destination(batches, location) {
428 BatchLocation::Memory => BuilderInner::Vec(VecValBuilder::with_capacity(
429 &factories.vec,
430 key_capacity,
431 value_capacity,
432 )),
433 BatchLocation::Storage => BuilderInner::File(FileValBuilder::with_capacity(
434 &factories.file,
435 key_capacity,
436 value_capacity,
437 )),
438 },
439 }
440 }
441
442 fn push_time_diff(&mut self, time: &T, weight: &R) {
443 match &mut self.inner {
444 BuilderInner::Vec(vec) => vec.push_time_diff(time, weight),
445 BuilderInner::File(file) => file.push_time_diff(time, weight),
446 }
447 }
448
449 fn push_val(&mut self, val: &V) {
450 match &mut self.inner {
451 BuilderInner::Vec(vec) => vec.push_val(val),
452 BuilderInner::File(file) => file.push_val(val),
453 }
454 }
455
456 fn push_key(&mut self, key: &K) {
457 match &mut self.inner {
458 BuilderInner::Vec(vec) => vec.push_key(key),
459 BuilderInner::File(file) => file.push_key(key),
460 }
461 }
462
463 fn push_time_diff_mut(&mut self, time: &mut T, weight: &mut R) {
464 match &mut self.inner {
465 BuilderInner::Vec(vec) => vec.push_time_diff_mut(time, weight),
466 BuilderInner::File(file) => file.push_time_diff_mut(time, weight),
467 }
468 }
469
470 fn push_val_mut(&mut self, val: &mut V) {
471 match &mut self.inner {
472 BuilderInner::Vec(vec) => vec.push_val_mut(val),
473 BuilderInner::File(file) => file.push_val_mut(val),
474 }
475 }
476
477 fn push_key_mut(&mut self, key: &mut K) {
478 match &mut self.inner {
479 BuilderInner::Vec(vec) => vec.push_key_mut(key),
480 BuilderInner::File(file) => file.push_key_mut(key),
481 }
482 }
483
484 fn push_val_diff(&mut self, val: &V, weight: &R)
485 where
486 T: PartialEq<()>,
487 {
488 match &mut self.inner {
489 BuilderInner::Vec(vec) => vec.push_val_diff(val, weight),
490 BuilderInner::File(file) => file.push_val_diff(val, weight),
491 }
492 }
493
494 fn push_val_diff_mut(&mut self, val: &mut V, weight: &mut R)
495 where
496 T: PartialEq<()>,
497 {
498 match &mut self.inner {
499 BuilderInner::Vec(vec) => vec.push_val_diff_mut(val, weight),
500 BuilderInner::File(file) => file.push_val_diff_mut(val, weight),
501 }
502 }
503
504 fn reserve(&mut self, additional: usize) {
505 match &mut self.inner {
506 BuilderInner::Vec(vec) => vec.reserve(additional),
507 BuilderInner::File(file) => file.reserve(additional),
508 }
509 }
510
511 fn done(self) -> FallbackValBatch<K, V, T, R> {
512 FallbackValBatch {
513 factories: self.factories,
514 inner: match self.inner {
515 BuilderInner::File(file) => Inner::File(file.done()),
516 BuilderInner::Vec(vec) => Inner::Vec(vec.done()),
517 },
518 }
519 }
520
521 fn num_keys(&self) -> usize {
522 match &self.inner {
523 BuilderInner::Vec(vec) => vec.num_keys(),
524 BuilderInner::File(file) => file.num_keys(),
525 }
526 }
527
528 fn num_tuples(&self) -> usize {
529 match &self.inner {
530 BuilderInner::Vec(vec) => vec.num_tuples(),
531 BuilderInner::File(file) => file.num_tuples(),
532 }
533 }
534}
535
536impl<K, V, T, R> Archive for FallbackValBatch<K, V, T, R>
537where
538 K: DataTrait + ?Sized,
539 V: DataTrait + ?Sized,
540 T: Timestamp,
541 R: WeightTrait + ?Sized,
542{
543 type Archived = ();
544 type Resolver = ();
545
546 unsafe fn resolve(&self, _pos: usize, _resolver: Self::Resolver, _out: *mut Self::Archived) {
547 unimplemented!();
548 }
549}
550
551impl<K, V, T, R, S> Serialize<S> for FallbackValBatch<K, V, T, R>
552where
553 K: DataTrait + ?Sized,
554 V: DataTrait + ?Sized,
555 T: Timestamp,
556 R: WeightTrait + ?Sized,
557 S: Serializer + ?Sized,
558{
559 fn serialize(&self, _serializer: &mut S) -> Result<Self::Resolver, S::Error> {
560 unimplemented!();
561 }
562}
563
564impl<K, V, T, R, D> Deserialize<FallbackValBatch<K, V, T, R>, D>
565 for Archived<FallbackValBatch<K, V, T, R>>
566where
567 K: DataTrait + ?Sized,
568 V: DataTrait + ?Sized,
569 T: Timestamp,
570 R: WeightTrait + ?Sized,
571 D: Fallible,
572{
573 fn deserialize(&self, _deserializer: &mut D) -> Result<FallbackValBatch<K, V, T, R>, D::Error> {
574 unimplemented!();
575 }
576}