1use crate::storage::tracking_bloom_filter::BloomFilterStats;
2use crate::trace::cursor::Position;
3use crate::{
4 DBData, DBWeight, NumEntries, Runtime, Timestamp,
5 dynamic::{
6 DataTrait, DynDataTyped, DynOpt, DynPair, DynUnit, DynVec, DynWeightedPairs, Erase,
7 Factory, LeanVec, WeightTrait, WithFactory,
8 },
9 storage::{
10 buffer_cache::CacheStats,
11 file::{
12 Factories as FileFactories,
13 reader::{Cursor as FileCursor, Error as ReaderError, Reader},
14 writer::Writer2,
15 },
16 },
17 trace::{
18 Batch, BatchFactories, BatchLocation, BatchReader, BatchReaderFactories, Builder, Cursor,
19 WeightedItem,
20 ord::{file::UnwrapStorage, merge_batcher::MergeBatcher},
21 },
22 utils::Tup2,
23};
24use feldera_storage::{FileReader, StoragePath};
25use rand::{Rng, seq::index::sample};
26use rkyv::{Archive, Archived, Deserialize, Fallible, Serialize, ser::Serializer};
27use size_of::SizeOf;
28use std::any::TypeId;
29use std::{
30 fmt::{self, Debug},
31 sync::Arc,
32};
33
34pub struct FileKeyBatchFactories<K, T, R>
35where
36 K: DataTrait + ?Sized,
37 T: Timestamp,
38 R: WeightTrait + ?Sized,
39{
40 key_factory: &'static dyn Factory<K>,
41 weight_factory: &'static dyn Factory<R>,
42 weights_factory: &'static dyn Factory<DynVec<R>>,
43 keys_factory: &'static dyn Factory<DynVec<K>>,
44 item_factory: &'static dyn Factory<DynPair<K, DynUnit>>,
45 factories0: FileFactories<K, DynUnit>,
46 factories1: FileFactories<DynDataTyped<T>, R>,
47 opt_key_factory: &'static dyn Factory<DynOpt<K>>,
48 weighted_item_factory: &'static dyn Factory<WeightedItem<K, DynUnit, R>>,
49 weighted_items_factory: &'static dyn Factory<DynWeightedPairs<DynPair<K, DynUnit>, R>>,
50 weighted_vals_factory: &'static dyn Factory<DynWeightedPairs<DynUnit, R>>,
51 pub timediff_factory: &'static dyn Factory<DynWeightedPairs<DynDataTyped<T>, R>>,
52}
53
54impl<K, T, R> Clone for FileKeyBatchFactories<K, T, R>
55where
56 K: DataTrait + ?Sized,
57 T: Timestamp,
58 R: WeightTrait + ?Sized,
59{
60 fn clone(&self) -> Self {
61 Self {
62 key_factory: self.key_factory,
63 weight_factory: self.weight_factory,
64 weights_factory: self.weights_factory,
65 keys_factory: self.keys_factory,
66 item_factory: self.item_factory,
67 factories0: self.factories0.clone(),
68 factories1: self.factories1.clone(),
69 opt_key_factory: self.opt_key_factory,
70 weighted_item_factory: self.weighted_item_factory,
71 weighted_items_factory: self.weighted_items_factory,
72 weighted_vals_factory: self.weighted_vals_factory,
73 timediff_factory: self.timediff_factory,
74 }
75 }
76}
77
78impl<K, T, R> BatchReaderFactories<K, DynUnit, T, R> for FileKeyBatchFactories<K, T, R>
79where
80 K: DataTrait + ?Sized,
81 T: Timestamp,
82 R: WeightTrait + ?Sized,
83{
84 fn new<KType, VType, RType>() -> Self
85 where
86 KType: DBData + Erase<K>,
87 VType: DBData + Erase<DynUnit>,
88 RType: DBWeight + Erase<R>,
89 {
90 Self {
91 key_factory: WithFactory::<KType>::FACTORY,
92 weight_factory: WithFactory::<RType>::FACTORY,
93 weights_factory: WithFactory::<LeanVec<RType>>::FACTORY,
94 keys_factory: WithFactory::<LeanVec<KType>>::FACTORY,
95 item_factory: WithFactory::<Tup2<KType, ()>>::FACTORY,
96 factories0: FileFactories::new::<KType, ()>(),
97 factories1: FileFactories::new::<T, RType>(),
98 opt_key_factory: WithFactory::<Option<KType>>::FACTORY,
99 weighted_item_factory: WithFactory::<Tup2<Tup2<KType, ()>, RType>>::FACTORY,
100 weighted_items_factory: WithFactory::<LeanVec<Tup2<Tup2<KType, ()>, RType>>>::FACTORY,
101 weighted_vals_factory: WithFactory::<LeanVec<Tup2<(), RType>>>::FACTORY,
102 timediff_factory: WithFactory::<LeanVec<Tup2<T, RType>>>::FACTORY,
103 }
104 }
105
106 fn key_factory(&self) -> &'static dyn Factory<K> {
107 self.key_factory
108 }
109
110 fn keys_factory(&self) -> &'static dyn Factory<DynVec<K>> {
111 self.keys_factory
112 }
113
114 fn val_factory(&self) -> &'static dyn Factory<DynUnit> {
115 WithFactory::<()>::FACTORY
116 }
117
118 fn weight_factory(&self) -> &'static dyn Factory<R> {
119 self.weight_factory
120 }
121}
122
123impl<K, T, R> BatchFactories<K, DynUnit, T, R> for FileKeyBatchFactories<K, T, R>
124where
125 K: DataTrait + ?Sized,
126 T: Timestamp,
127 R: WeightTrait + ?Sized,
128{
129 fn item_factory(&self) -> &'static dyn Factory<DynPair<K, DynUnit>> {
130 self.item_factory
131 }
132
133 fn weighted_item_factory(&self) -> &'static dyn Factory<WeightedItem<K, DynUnit, R>> {
134 self.weighted_item_factory
135 }
136
137 fn weighted_items_factory(
138 &self,
139 ) -> &'static dyn Factory<DynWeightedPairs<DynPair<K, DynUnit>, R>> {
140 self.weighted_items_factory
141 }
142
143 fn weighted_vals_factory(&self) -> &'static dyn Factory<DynWeightedPairs<DynUnit, R>> {
144 self.weighted_vals_factory
145 }
146
147 fn time_diffs_factory(
148 &self,
149 ) -> Option<&'static dyn Factory<DynWeightedPairs<DynDataTyped<T>, R>>> {
150 Some(self.timediff_factory)
151 }
152}
153
154#[derive(SizeOf)]
159pub struct FileKeyBatch<K, T, R>
160where
161 K: DataTrait + ?Sized,
162 T: Timestamp,
163 R: WeightTrait + ?Sized,
164{
165 #[size_of(skip)]
166 factories: FileKeyBatchFactories<K, T, R>,
167 #[allow(clippy::type_complexity)]
168 file: Arc<
169 Reader<(
170 &'static K,
171 &'static DynUnit,
172 (&'static DynDataTyped<T>, &'static R, ()),
173 )>,
174 >,
175}
176
177impl<K, T, R> Debug for FileKeyBatch<K, T, R>
178where
179 K: DataTrait + ?Sized,
180 T: Timestamp,
181 R: WeightTrait + ?Sized,
182{
183 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
184 write!(f, "FileKeyBatch {{ data: ")?;
185 let mut cursor = self.cursor();
186 let mut n_keys = 0;
187 while cursor.key_valid() {
188 if n_keys > 0 {
189 write!(f, ", ")?;
190 }
191 write!(f, "{:?}(", cursor.key())?;
192 let mut n_values = 0;
193 cursor.map_times(&mut |time, diff| {
194 if n_values > 0 {
195 let _ = write!(f, ", ");
196 }
197 let _ = write!(f, "({time:?}, {diff:+?})");
198 n_values += 1;
199 });
200 write!(f, ")")?;
201 n_keys += 1;
202 cursor.step_key();
203 }
204 write!(f, " }}")
205 }
206}
207
208impl<K, T, R> Clone for FileKeyBatch<K, T, R>
209where
210 K: DataTrait + ?Sized,
211 T: Timestamp,
212 R: WeightTrait + ?Sized,
213{
214 fn clone(&self) -> Self {
215 Self {
216 factories: self.factories.clone(),
217 file: self.file.clone(),
218 }
219 }
220}
221
222impl<K, T, R> NumEntries for FileKeyBatch<K, T, R>
223where
224 K: DataTrait + ?Sized,
225 T: Timestamp,
226 R: WeightTrait + ?Sized,
227{
228 const CONST_NUM_ENTRIES: Option<usize> = None;
229
230 fn num_entries_shallow(&self) -> usize {
231 self.file.rows().len() as usize
232 }
233
234 fn num_entries_deep(&self) -> usize {
235 self.file.n_rows(1) as usize
236 }
237}
238
239impl<K, T, R> BatchReader for FileKeyBatch<K, T, R>
240where
241 K: DataTrait + ?Sized,
242 T: Timestamp,
243 R: WeightTrait + ?Sized,
244{
245 type Factories = FileKeyBatchFactories<K, T, R>;
246 type Key = K;
247 type Val = DynUnit;
248 type Time = T;
249 type R = R;
250 type Cursor<'s> = FileKeyCursor<'s, K, T, R>;
251
252 fn factories(&self) -> Self::Factories {
253 self.factories.clone()
254 }
255
256 fn cursor(&self) -> Self::Cursor<'_> {
257 FileKeyCursor::new(self)
258 }
259
260 #[inline]
261 fn key_count(&self) -> usize {
262 self.file.n_rows(0) as usize
263 }
264
265 #[inline]
266 fn len(&self) -> usize {
267 self.file.n_rows(1) as usize
268 }
269
270 fn approximate_byte_size(&self) -> usize {
271 self.file.byte_size().unwrap_storage() as usize
272 }
273
274 fn filter_stats(&self) -> BloomFilterStats {
275 self.file.filter_stats()
276 }
277
278 #[inline]
279 fn location(&self) -> BatchLocation {
280 BatchLocation::Storage
281 }
282
283 fn cache_stats(&self) -> CacheStats {
284 self.file.cache_stats()
285 }
286
287 fn sample_keys<RG>(&self, rng: &mut RG, sample_size: usize, output: &mut DynVec<Self::Key>)
288 where
289 RG: Rng,
290 {
291 let size = self.key_count();
292 let mut cursor = self.cursor();
293 if sample_size >= size {
294 output.reserve(size);
295
296 while cursor.key_valid() {
297 output.push_ref(cursor.key());
298 cursor.step_key();
299 }
300 } else {
301 output.reserve(sample_size);
302
303 let mut indexes = sample(rng, size, sample_size).into_vec();
304 indexes.sort_unstable();
305 for index in indexes.into_iter() {
306 cursor.move_key(|key_cursor| unsafe { key_cursor.move_to_row(index as u64) });
307 output.push_ref(cursor.key());
308 }
309 }
310 }
311
312 fn maybe_contains_key(&self, hash: u64) -> bool {
313 self.file.maybe_contains_key(hash)
314 }
315}
316
317impl<K, T, R> Batch for FileKeyBatch<K, T, R>
318where
319 K: DataTrait + ?Sized,
320 T: Timestamp,
321 R: WeightTrait + ?Sized,
322{
323 type Timed<T2: Timestamp> = FileKeyBatch<K, T2, R>;
324 type Batcher = MergeBatcher<Self>;
325 type Builder = FileKeyBuilder<K, T, R>;
326
327 fn file_reader(&self) -> Option<Arc<dyn FileReader>> {
328 self.file.mark_for_checkpoint();
329 Some(self.file.file_handle().clone())
330 }
331
332 fn from_path(factories: &Self::Factories, path: &StoragePath) -> Result<Self, ReaderError> {
333 let any_factory0 = factories.factories0.any_factories();
334 let any_factory1 = factories.factories1.any_factories();
335 let file = Arc::new(Reader::open(
336 &[&any_factory0, &any_factory1],
337 Runtime::buffer_cache,
338 &*Runtime::storage_backend().unwrap_storage(),
339 path,
340 )?);
341
342 Ok(Self {
343 factories: factories.clone(),
344 file,
345 })
346 }
347}
348
349type RawKeyCursor<'s, K, T, R> = FileCursor<
350 's,
351 K,
352 DynUnit,
353 (&'static DynDataTyped<T>, &'static R, ()),
354 (
355 &'static K,
356 &'static DynUnit,
357 (&'static DynDataTyped<T>, &'static R, ()),
358 ),
359>;
360
361#[derive(Debug, SizeOf)]
363pub struct FileKeyCursor<'s, K, T, R>
364where
365 K: DataTrait + ?Sized,
366 T: Timestamp,
367 R: WeightTrait + ?Sized,
368{
369 batch: &'s FileKeyBatch<K, T, R>,
370 pub(crate) cursor: RawKeyCursor<'s, K, T, R>,
371 val_valid: bool,
372
373 pub(crate) time: Box<DynDataTyped<T>>,
374 pub(crate) diff: Box<R>,
375}
376
377impl<K, T, R> Clone for FileKeyCursor<'_, K, T, R>
378where
379 K: DataTrait + ?Sized,
380 T: Timestamp,
381 R: WeightTrait + ?Sized,
382{
383 fn clone(&self) -> Self {
384 Self {
385 batch: self.batch,
386 cursor: self.cursor.clone(),
387 val_valid: self.val_valid,
388
389 time: self.batch.factories.factories1.key_factory.default_box(),
392 diff: self.batch.factories.weight_factory.default_box(),
393 }
394 }
395}
396
397impl<'s, K, T, R> FileKeyCursor<'s, K, T, R>
398where
399 K: DataTrait + ?Sized,
400 T: Timestamp,
401 R: WeightTrait + ?Sized,
402{
403 fn new_from(batch: &'s FileKeyBatch<K, T, R>, lower_bound: usize) -> Self {
404 let cursor = unsafe {
405 batch
406 .file
407 .rows()
408 .subset(lower_bound as u64..)
409 .first()
410 .unwrap_storage()
411 };
412 let key_valid = cursor.has_value();
413
414 Self {
415 batch,
416 cursor,
417 val_valid: key_valid,
418 time: batch.factories.factories1.key_factory.default_box(),
419 diff: batch.factories.weight_factory.default_box(),
420 }
421 }
422
423 fn new(batch: &'s FileKeyBatch<K, T, R>) -> Self {
424 Self::new_from(batch, 0)
425 }
426
427 fn move_key<F>(&mut self, op: F)
428 where
429 F: Fn(&mut RawKeyCursor<'s, K, T, R>) -> Result<(), ReaderError>,
430 {
431 op(&mut self.cursor).unwrap_storage();
432 self.val_valid = self.cursor.has_value();
433 }
434}
435
436impl<K, T, R> Cursor<K, DynUnit, T, R> for FileKeyCursor<'_, K, T, R>
437where
438 K: DataTrait + ?Sized,
439 T: Timestamp,
440 R: WeightTrait + ?Sized,
441{
442 fn weight_factory(&self) -> &'static dyn Factory<R> {
443 self.batch.factories.weight_factory
444 }
445
446 fn key(&self) -> &K {
447 debug_assert!(self.key_valid());
448 self.cursor.key().unwrap()
449 }
450
451 fn val(&self) -> &DynUnit {
452 &()
453 }
454
455 fn map_times(&mut self, logic: &mut dyn FnMut(&T, &R)) {
456 let mut val_cursor = unsafe {
457 self.cursor
458 .next_column()
459 .unwrap_storage()
460 .first()
461 .unwrap_storage()
462 };
463 while unsafe { val_cursor.item((self.time.as_mut(), &mut self.diff)) }.is_some() {
464 logic(self.time.as_ref(), self.diff.as_ref());
465 unsafe { val_cursor.move_next() }.unwrap_storage();
466 }
467 }
468
469 fn map_times_through(&mut self, upper: &T, logic: &mut dyn FnMut(&T, &R)) {
470 let mut val_cursor = unsafe {
471 self.cursor
472 .next_column()
473 .unwrap_storage()
474 .first()
475 .unwrap_storage()
476 };
477 while unsafe { val_cursor.item((self.time.as_mut(), &mut self.diff)) }.is_some() {
478 if self.time.less_equal(upper) {
479 logic(self.time.as_ref(), self.diff.as_ref());
480 }
481 unsafe { val_cursor.move_next() }.unwrap_storage();
482 }
483 }
484
485 fn map_values(&mut self, logic: &mut dyn FnMut(&DynUnit, &R))
486 where
487 T: PartialEq<()>,
488 {
489 if self.val_valid {
490 logic(&(), self.weight())
491 }
492 }
493
494 fn weight(&mut self) -> &R
495 where
496 T: PartialEq<()>,
497 {
498 self.weight_checked()
499 }
500
501 fn weight_checked(&mut self) -> &R {
502 if TypeId::of::<T>() == TypeId::of::<()>() {
503 let val_cursor = unsafe {
504 self.cursor
505 .next_column()
506 .unwrap_storage()
507 .first()
508 .unwrap_storage()
509 };
510 unsafe { val_cursor.aux(&mut self.diff) }.unwrap();
511 self.diff.as_ref()
512 } else {
513 panic!("FileKeyCursor::weight_checked called on non-unit timestamp type");
514 }
515 }
516
517 fn key_valid(&self) -> bool {
518 self.cursor.has_value()
519 }
520
521 fn val_valid(&self) -> bool {
522 self.val_valid
523 }
524
525 fn step_key(&mut self) {
526 self.move_key(|key_cursor| unsafe { key_cursor.move_next() });
527 }
528
529 fn step_key_reverse(&mut self) {
530 self.move_key(|key_cursor| unsafe { key_cursor.move_prev() });
531 }
532
533 fn seek_key(&mut self, key: &K) {
534 self.move_key(|key_cursor| unsafe { key_cursor.advance_to_value_or_larger(key) });
535 }
536
537 fn seek_key_exact(&mut self, key: &K, hash: Option<u64>) -> bool {
538 let hash = hash.unwrap_or_else(|| key.default_hash());
539 if !self.batch.maybe_contains_key(hash) {
540 return false;
541 }
542 self.seek_key(key);
543 self.key_valid() && self.key().eq(key)
544 }
545
546 fn seek_key_with(&mut self, predicate: &dyn Fn(&K) -> bool) {
547 self.move_key(|key_cursor| unsafe { key_cursor.seek_forward_until(predicate) });
548 }
549
550 fn seek_key_with_reverse(&mut self, predicate: &dyn Fn(&K) -> bool) {
551 self.move_key(|key_cursor| unsafe { key_cursor.seek_backward_until(predicate) });
552 }
553
554 fn seek_key_reverse(&mut self, key: &K) {
555 self.move_key(|key_cursor| unsafe { key_cursor.rewind_to_value_or_smaller(key) });
556 }
557
558 fn step_val(&mut self) {
559 self.val_valid = false;
560 }
561
562 fn seek_val(&mut self, _val: &DynUnit) {}
563
564 fn seek_val_with(&mut self, predicate: &dyn Fn(&DynUnit) -> bool) {
565 if !predicate(&()) {
566 self.val_valid = false;
567 }
568 }
569
570 fn rewind_keys(&mut self) {
571 self.move_key(|key_cursor| unsafe { key_cursor.move_first() });
572 }
573
574 fn fast_forward_keys(&mut self) {
575 self.move_key(|key_cursor| unsafe { key_cursor.move_last() });
576 }
577
578 fn rewind_vals(&mut self) {
579 self.val_valid = true;
580 }
581
582 fn step_val_reverse(&mut self) {
583 self.val_valid = false;
584 }
585
586 fn seek_val_reverse(&mut self, _val: &DynUnit) {}
587
588 fn seek_val_with_reverse(&mut self, predicate: &dyn Fn(&DynUnit) -> bool) {
589 if !predicate(&()) {
590 self.val_valid = false;
591 }
592 }
593
594 fn fast_forward_vals(&mut self) {
595 self.val_valid = true;
596 }
597
598 fn position(&self) -> Option<Position> {
599 Some(Position {
600 total: self.cursor.len(),
601 offset: self.cursor.absolute_position(),
602 })
603 }
604}
605
606#[derive(SizeOf)]
608pub struct FileKeyBuilder<K, T, R>
609where
610 K: DataTrait + ?Sized,
611 T: Timestamp,
612 R: WeightTrait + ?Sized,
613{
614 #[size_of(skip)]
615 factories: FileKeyBatchFactories<K, T, R>,
616 #[size_of(skip)]
617 writer: Writer2<K, DynUnit, DynDataTyped<T>, R>,
618 key: Box<DynOpt<K>>,
619 num_tuples: usize,
620}
621
622impl<K, T, R> Builder<FileKeyBatch<K, T, R>> for FileKeyBuilder<K, T, R>
623where
624 Self: SizeOf,
625 K: DataTrait + ?Sized,
626 T: Timestamp,
627 R: WeightTrait + ?Sized,
628{
629 #[inline]
630 fn with_capacity(
631 factories: &FileKeyBatchFactories<K, T, R>,
632 key_capacity: usize,
633 _value_capacity: usize,
634 ) -> Self {
635 Self {
636 factories: factories.clone(),
637 writer: Writer2::new(
638 &factories.factories0,
639 &factories.factories1,
640 Runtime::buffer_cache,
641 &*Runtime::storage_backend().unwrap_storage(),
642 Runtime::file_writer_parameters(),
643 key_capacity,
644 )
645 .unwrap_storage(),
646 key: factories.opt_key_factory.default_box(),
647 num_tuples: 0,
648 }
649 }
650
651 fn push_key(&mut self, key: &K) {
652 self.writer.write0((key, &())).unwrap_storage();
653 }
654
655 fn push_val(&mut self, _val: &DynUnit) {}
656
657 fn push_time_diff(&mut self, time: &T, weight: &R) {
658 debug_assert!(!weight.is_zero());
659 self.writer.write1((time, weight)).unwrap_storage();
660 self.num_tuples += 1;
661 }
662
663 fn done(self) -> FileKeyBatch<K, T, R> {
664 FileKeyBatch {
665 factories: self.factories,
666 file: Arc::new(self.writer.into_reader().unwrap_storage()),
667 }
668 }
669
670 fn num_keys(&self) -> usize {
671 self.writer.n_rows() as usize
672 }
673
674 fn num_tuples(&self) -> usize {
675 self.num_tuples
676 }
677}
678
679impl<K, T, R> Archive for FileKeyBatch<K, T, R>
680where
681 K: DataTrait + ?Sized,
682 T: Timestamp,
683 R: WeightTrait + ?Sized,
684{
685 type Archived = ();
686 type Resolver = ();
687
688 unsafe fn resolve(&self, _pos: usize, _resolver: Self::Resolver, _out: *mut Self::Archived) {
689 unimplemented!();
690 }
691}
692
693impl<K, T, R, S> Serialize<S> for FileKeyBatch<K, T, R>
694where
695 K: DataTrait + ?Sized,
696 T: Timestamp,
697 R: WeightTrait + ?Sized,
698 S: Serializer + ?Sized,
699{
700 fn serialize(&self, _serializer: &mut S) -> Result<Self::Resolver, S::Error> {
701 unimplemented!();
702 }
703}
704
705impl<K, T, R, D> Deserialize<FileKeyBatch<K, T, R>, D> for Archived<FileKeyBatch<K, T, R>>
706where
707 K: DataTrait + ?Sized,
708 T: Timestamp,
709 R: WeightTrait + ?Sized,
710 D: Fallible,
711{
712 fn deserialize(&self, _deserializer: &mut D) -> Result<FileKeyBatch<K, T, R>, D::Error> {
713 unimplemented!();
714 }
715}