1use crate::storage::tracking_bloom_filter::BloomFilterStats;
2use crate::trace::ord::merge_batcher::MergeBatcher;
3use crate::{
4 DBData, DBWeight, NumEntries, Timestamp,
5 dynamic::{
6 DataTrait, DynDataTyped, DynPair, DynUnit, DynVec, DynWeightedPairs, Erase, Factory,
7 LeanVec, WeightTrait, WithFactory,
8 },
9 trace::{
10 Batch, BatchFactories, BatchReader, BatchReaderFactories, Builder, Cursor, Deserializer,
11 Serializer, WeightedItem,
12 cursor::Position,
13 layers::{
14 Cursor as TrieCursor, Layer, LayerCursor, LayerFactories, Leaf, LeafFactories,
15 OrdOffset, Trie,
16 },
17 },
18 utils::{ConsolidatePairedSlices, Tup2},
19};
20use feldera_storage::FileReader;
21use rand::Rng;
22use rkyv::{Archive, Deserialize, Serialize};
23use size_of::SizeOf;
24use std::any::TypeId;
25use std::{
26 fmt::{self, Debug, Display},
27 sync::Arc,
28};
29
/// Bundle of factory objects used to build a [`VecKeyBatch`] and the
/// dynamically typed containers that go with it.
///
/// Apart from `layer_factories`, every field is a `&'static dyn` reference;
/// the `BatchReaderFactories` / `BatchFactories` impls for this type hand
/// those references out unchanged.
pub struct VecKeyBatchFactories<K, T, R>
where
    K: DataTrait + ?Sized,
    T: Timestamp,
    R: WeightTrait + ?Sized,
{
    /// Factories for the key layer; its `child` holds the factories of the
    /// leaf that stores `(time, weight)` entries.
    layer_factories: LayerFactories<K, LeafFactories<DynDataTyped<T>, R>>,
    /// Consolidation routine over paired time/weight slices.
    /// NOTE(review): not read anywhere in the visible code — presumably used
    /// by the batcher; confirm.
    consolidate_weights: &'static dyn ConsolidatePairedSlices<DynDataTyped<T>, R>,
    /// Factory for `(key, unit)` pairs.
    item_factory: &'static dyn Factory<DynPair<K, DynUnit>>,
    /// Factory for a single weighted `(key, unit)` item.
    weighted_item_factory: &'static dyn Factory<WeightedItem<K, DynUnit, R>>,
    /// Factory for vectors of weighted `(key, unit)` items.
    weighted_items_factory: &'static dyn Factory<DynWeightedPairs<DynPair<K, DynUnit>, R>>,
    /// Factory for vectors of weighted unit values.
    weighted_vals_factory: &'static dyn Factory<DynWeightedPairs<DynUnit, R>>,
    /// Factory for vectors of `(time, weight)` pairs.
    time_diffs_factory: &'static dyn Factory<DynWeightedPairs<DynDataTyped<T>, R>>,
}
44
// SAFETY: NOTE(review) — this impl asserts that a `VecKeyBatchFactories` may
// be moved to another thread. All fields except `layer_factories` are
// `&'static dyn` references, so soundness depends on the referenced factory
// objects (and `LayerFactories`) being safe to access from any thread.
// Nothing visible in this file proves that; confirm against the `Factory`
// and `LayerFactories` definitions.
unsafe impl<K, T, R> Send for VecKeyBatchFactories<K, T, R>
where
    K: DataTrait + ?Sized,
    T: Timestamp,
    R: WeightTrait + ?Sized,
{
}
52
53impl<K, T, R> Clone for VecKeyBatchFactories<K, T, R>
54where
55 K: DataTrait + ?Sized,
56 T: Timestamp,
57 R: WeightTrait + ?Sized,
58{
59 fn clone(&self) -> Self {
60 Self {
61 layer_factories: self.layer_factories.clone(),
62 consolidate_weights: self.consolidate_weights,
63 item_factory: self.item_factory,
64 weighted_item_factory: self.weighted_item_factory,
65 weighted_items_factory: self.weighted_items_factory,
66 weighted_vals_factory: self.weighted_vals_factory,
67 time_diffs_factory: self.time_diffs_factory,
68 }
69 }
70}
71
72impl<K, T, R> BatchReaderFactories<K, DynUnit, T, R> for VecKeyBatchFactories<K, T, R>
73where
74 K: DataTrait + ?Sized,
75 T: Timestamp,
76 R: WeightTrait + ?Sized,
77{
78 fn new<KType, VType, RType>() -> Self
79 where
80 KType: DBData + Erase<K>,
81 VType: DBData + Erase<DynUnit>,
82 RType: DBWeight + Erase<R>,
83 {
84 Self {
85 layer_factories: LayerFactories::new::<KType>(
86 <LeafFactories<DynDataTyped<T>, R>>::new::<T, RType>(),
87 ),
88 consolidate_weights: <dyn ConsolidatePairedSlices<_, _>>::factory::<T, RType>(),
89 item_factory: WithFactory::<Tup2<KType, ()>>::FACTORY,
90 weighted_item_factory: WithFactory::<Tup2<Tup2<KType, ()>, RType>>::FACTORY,
91 weighted_items_factory: WithFactory::<LeanVec<Tup2<Tup2<KType, ()>, RType>>>::FACTORY,
92 weighted_vals_factory: WithFactory::<LeanVec<Tup2<(), RType>>>::FACTORY,
93 time_diffs_factory: WithFactory::<LeanVec<Tup2<T, RType>>>::FACTORY,
94 }
95 }
96
97 fn key_factory(&self) -> &'static dyn Factory<K> {
98 self.layer_factories.key
99 }
100
101 fn keys_factory(&self) -> &'static dyn Factory<DynVec<K>> {
102 self.layer_factories.keys
103 }
104
105 fn val_factory(&self) -> &'static dyn Factory<DynUnit> {
106 WithFactory::<()>::FACTORY
107 }
108
109 fn weight_factory(&self) -> &'static dyn Factory<R> {
110 self.layer_factories.child.diff
111 }
112}
113
/// Accessors that expose the stored `&'static` factory references.
impl<K, R, T> BatchFactories<K, DynUnit, T, R> for VecKeyBatchFactories<K, T, R>
where
    K: DataTrait + ?Sized,
    T: Timestamp,
    R: WeightTrait + ?Sized,
{
    /// Returns the factory for `(key, unit)` pairs.
    fn item_factory(&self) -> &'static dyn Factory<DynPair<K, DynUnit>> {
        self.item_factory
    }

    /// Returns the factory for a single weighted `(key, unit)` item.
    fn weighted_item_factory(&self) -> &'static dyn Factory<WeightedItem<K, DynUnit, R>> {
        self.weighted_item_factory
    }

    /// Returns the factory for vectors of weighted `(key, unit)` items.
    fn weighted_items_factory(
        &self,
    ) -> &'static dyn Factory<DynWeightedPairs<DynPair<K, DynUnit>, R>> {
        self.weighted_items_factory
    }

    /// Returns the factory for vectors of weighted unit values.
    fn weighted_vals_factory(&self) -> &'static dyn Factory<DynWeightedPairs<DynUnit, R>> {
        self.weighted_vals_factory
    }

    /// Returns the factory for vectors of `(time, weight)` pairs.
    ///
    /// This implementation always returns `Some`.
    fn time_diffs_factory(
        &self,
    ) -> Option<&'static dyn Factory<DynWeightedPairs<DynDataTyped<T>, R>>> {
        Some(self.time_diffs_factory)
    }
}
146
/// Trie layout backing a [`VecKeyBatch`]: a layer of keys `K` whose child is a
/// leaf of `(time, weight)` entries, with offsets of type `O`.
pub type VecKeyBatchLayer<K, T, R, O> = Layer<K, Leaf<DynDataTyped<T>, R>, O>;
148
/// A batch of keys (the value type is always the unit type) where each key
/// carries a list of `(time, weight)` entries, stored in a
/// [`VecKeyBatchLayer`].
#[derive(SizeOf)]
pub struct VecKeyBatch<K, T, R, O = usize>
where
    K: DataTrait + ?Sized,
    T: Timestamp,
    R: WeightTrait + ?Sized,
    O: OrdOffset,
{
    /// The trie holding the keys and their `(time, weight)` entries.
    pub layer: VecKeyBatchLayer<K, T, R, O>,
    /// Factories this batch was built with; excluded from size accounting.
    #[size_of(skip)]
    factories: VecKeyBatchFactories<K, T, R>,
}
164
165impl<K, T, R, O> Debug for VecKeyBatch<K, T, R, O>
166where
167 K: DataTrait + ?Sized,
168 R: WeightTrait + ?Sized,
169 T: Timestamp,
170 O: OrdOffset,
171{
172 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
173 f.debug_struct("VecKeyBatch")
174 .field("layer", &self.layer)
175 .finish()
176 }
177}
178
179impl<K, T, R, O: OrdOffset> Deserialize<VecKeyBatch<K, T, R, O>, Deserializer> for ()
180where
181 K: DataTrait + ?Sized,
182 T: Timestamp,
183 R: WeightTrait + ?Sized,
184 O: OrdOffset,
185{
186 fn deserialize(
187 &self,
188 _deserializer: &mut Deserializer,
189 ) -> Result<VecKeyBatch<K, T, R, O>, <Deserializer as rkyv::Fallible>::Error> {
190 todo!()
191 }
192}
193
/// Placeholder rkyv `Archive` impl for [`VecKeyBatch`]: both the archived
/// form and the resolver are the unit type, and resolving is not implemented.
impl<K, T, R, O> Archive for VecKeyBatch<K, T, R, O>
where
    K: DataTrait + ?Sized,
    T: Timestamp,
    R: WeightTrait + ?Sized,
    O: OrdOffset,
{
    type Archived = ();
    type Resolver = ();

    /// # Panics
    ///
    /// Always panics via `todo!()`; none of the arguments are touched.
    unsafe fn resolve(&self, _pos: usize, _resolver: Self::Resolver, _out: *mut Self::Archived) {
        todo!()
    }
}
208impl<K, T, R, O: OrdOffset> Serialize<Serializer> for VecKeyBatch<K, T, R, O>
209where
210 K: DataTrait + ?Sized,
211 T: Timestamp,
212 R: WeightTrait + ?Sized,
213 O: OrdOffset,
214{
215 fn serialize(
216 &self,
217 _serializer: &mut Serializer,
218 ) -> Result<Self::Resolver, <Serializer as rkyv::Fallible>::Error> {
219 todo!()
220 }
221}
222
223impl<K, T, R, O> Clone for VecKeyBatch<K, T, R, O>
224where
225 K: DataTrait + ?Sized,
226 T: Timestamp,
227 R: WeightTrait + ?Sized,
228 O: OrdOffset,
229{
230 fn clone(&self) -> Self {
231 Self {
232 layer: self.layer.clone(),
233 factories: self.factories.clone(),
234 }
235 }
236}
237
238impl<K, T, R, O> Display for VecKeyBatch<K, T, R, O>
239where
240 K: DataTrait + ?Sized,
241 T: Timestamp,
242 R: WeightTrait + ?Sized,
243 O: OrdOffset,
244{
245 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
246 writeln!(
247 f,
248 "layer:\n{}",
249 textwrap::indent(&self.layer.to_string(), " ")
250 )
251 }
252}
253
/// Entry counting for [`VecKeyBatch`]; everything is delegated to the
/// underlying [`VecKeyBatchLayer`].
impl<K, T, R, O> NumEntries for VecKeyBatch<K, T, R, O>
where
    K: DataTrait + ?Sized,
    T: Timestamp,
    R: WeightTrait + ?Sized,
    O: OrdOffset,
{
    /// Same constant as the layer type reports.
    const CONST_NUM_ENTRIES: Option<usize> = <VecKeyBatchLayer<K, T, R, O>>::CONST_NUM_ENTRIES;

    /// Returns the layer's shallow entry count.
    #[inline]
    fn num_entries_shallow(&self) -> usize {
        self.layer.num_entries_shallow()
    }

    /// Returns the layer's deep entry count.
    #[inline]
    fn num_entries_deep(&self) -> usize {
        self.layer.num_entries_deep()
    }
}
273
274impl<K, T, R, O> BatchReader for VecKeyBatch<K, T, R, O>
275where
276 K: DataTrait + ?Sized,
277 T: Timestamp,
278 R: WeightTrait + ?Sized,
279 O: OrdOffset,
280{
281 type Key = K;
282 type Val = DynUnit;
283 type Time = T;
284 type R = R;
285 type Cursor<'s>
286 = ValKeyCursor<'s, K, T, R, O>
287 where
288 O: 's;
289 type Factories = VecKeyBatchFactories<K, T, R>;
290 fn factories(&self) -> Self::Factories {
293 self.factories.clone()
294 }
295
296 fn cursor(&self) -> Self::Cursor<'_> {
297 ValKeyCursor {
298 valid: true,
299 cursor: self.layer.cursor(),
300 }
301 }
302
303 fn key_count(&self) -> usize {
308 <VecKeyBatchLayer<K, T, R, O> as Trie>::keys(&self.layer)
309 }
310
311 fn len(&self) -> usize {
312 <VecKeyBatchLayer<K, T, R, O> as Trie>::tuples(&self.layer)
313 }
314
315 fn approximate_byte_size(&self) -> usize {
316 self.layer.approximate_byte_size()
317 }
318
319 fn filter_stats(&self) -> BloomFilterStats {
320 BloomFilterStats::default()
321 }
322
323 fn sample_keys<RG>(&self, rng: &mut RG, sample_size: usize, sample: &mut DynVec<Self::Key>)
324 where
325 Self::Time: PartialEq<()>,
326 RG: Rng,
327 {
328 self.layer.sample_keys(rng, sample_size, sample);
329 }
330}
331
/// Write-side associated types for [`VecKeyBatch`].
impl<K, T, R, O> Batch for VecKeyBatch<K, T, R, O>
where
    K: DataTrait + ?Sized,
    T: Timestamp,
    R: WeightTrait + ?Sized,
    O: OrdOffset,
{
    /// The same batch type with a different timestamp type.
    type Timed<T2: Timestamp> = VecKeyBatch<K, T2, R, O>;
    type Batcher = MergeBatcher<Self>;
    type Builder = VecKeyBuilder<K, T, R, O>;

    /// # Panics
    ///
    /// Always panics via `unimplemented!()`.
    /// NOTE(review): if callers may invoke this on any batch, returning
    /// `None` might be more appropriate — confirm against the trait contract.
    fn file_reader(&self) -> Option<Arc<dyn FileReader>> {
        unimplemented!()
    }
}
350
/// Cursor over a [`VecKeyBatch`].
///
/// Keys are navigated through the wrapped layer cursor. Because the value
/// type is always the unit type, the value position is tracked by a single
/// flag.
#[derive(Debug, SizeOf)]
pub struct ValKeyCursor<'s, K, T, R, O = usize>
where
    K: DataTrait + ?Sized,
    T: Timestamp,
    R: WeightTrait + ?Sized,
    O: OrdOffset,
{
    /// `true` while the cursor is positioned on the unit value of the current
    /// key; cleared by stepping past it and reset by every key movement.
    valid: bool,
    /// Cursor over the key layer; its `child` walks the `(time, weight)`
    /// entries of the current key.
    cursor: LayerCursor<'s, K, Leaf<DynDataTyped<T>, R>, O>,
}
363
364impl<K, T, R, O> Clone for ValKeyCursor<'_, K, T, R, O>
365where
366 K: DataTrait + ?Sized,
367 T: Timestamp,
368 R: WeightTrait + ?Sized,
369 O: OrdOffset,
370{
371 fn clone(&self) -> Self {
372 Self {
373 valid: self.valid,
374 cursor: self.cursor.clone(),
375 }
376 }
377}
378
379impl<K, T, R, O> Cursor<K, DynUnit, T, R> for ValKeyCursor<'_, K, T, R, O>
380where
381 K: DataTrait + ?Sized,
382 T: Timestamp,
383 R: WeightTrait + ?Sized,
384 O: OrdOffset,
385{
386 fn weight_factory(&self) -> &'static dyn Factory<R> {
395 self.cursor.child.storage.factories.diff
396 }
397
398 fn key(&self) -> &K {
399 self.cursor.item()
400 }
401
402 fn val(&self) -> &DynUnit {
403 &()
404 }
405
406 fn map_times(&mut self, logic: &mut dyn FnMut(&T, &R)) {
407 self.cursor.child.rewind();
408 while self.cursor.child.valid() {
409 logic(
410 self.cursor.child.current_key(),
411 self.cursor.child.current_diff(),
412 );
413 self.cursor.child.step();
414 }
415 }
416
417 fn map_times_through(&mut self, upper: &T, logic: &mut dyn FnMut(&T, &R)) {
418 self.cursor.child.rewind();
419 while self.cursor.child.valid() {
420 if self.cursor.child.item().0.less_equal(upper) {
421 logic(
422 self.cursor.child.current_key(),
423 self.cursor.child.current_diff(),
424 );
425 }
426 self.cursor.child.step();
427 }
428 }
429
430 fn weight(&mut self) -> &R
431 where
432 T: PartialEq<()>,
433 {
434 self.weight_checked()
435 }
436
437 fn weight_checked(&mut self) -> &R {
438 if TypeId::of::<T>() == TypeId::of::<()>() {
439 debug_assert!(&self.cursor.child.valid());
440 self.cursor.child.current_diff()
441 } else {
442 panic!("VecKeyCursor::weight_checked called on non-unit timestamp type");
443 }
444 }
445
446 fn map_values(&mut self, logic: &mut dyn FnMut(&DynUnit, &R))
447 where
448 T: PartialEq<()>,
449 {
450 if self.val_valid() {
451 logic(self.val(), self.cursor.child.current_diff());
452 }
453 }
454
455 fn key_valid(&self) -> bool {
456 self.cursor.valid()
457 }
458
459 fn val_valid(&self) -> bool {
460 self.valid
461 }
462
463 fn step_key(&mut self) {
464 self.cursor.step();
465 self.valid = true;
466 }
467
468 fn step_key_reverse(&mut self) {
469 self.cursor.step_reverse();
470 self.valid = true;
471 }
472
473 fn seek_key(&mut self, key: &K) {
474 self.cursor.seek(key);
475 self.valid = true;
476 }
477
478 fn seek_key_exact(&mut self, key: &K, _hash: Option<u64>) -> bool {
479 self.seek_key(key);
480 self.key_valid() && self.key().eq(key)
481 }
482
483 fn seek_key_with(&mut self, predicate: &dyn Fn(&K) -> bool) {
484 self.cursor.seek_with(predicate);
485 self.valid = true;
486 }
487
488 fn seek_key_with_reverse(&mut self, predicate: &dyn Fn(&K) -> bool) {
489 self.cursor.seek_with_reverse(predicate);
490 self.valid = true;
491 }
492
493 fn seek_key_reverse(&mut self, key: &K) {
494 self.cursor.seek_reverse(key);
495 self.valid = true;
496 }
497
498 fn step_val(&mut self) {
499 self.valid = false;
500 }
501
502 fn seek_val(&mut self, _val: &DynUnit) {}
503
504 fn seek_val_with(&mut self, predicate: &dyn Fn(&DynUnit) -> bool) {
505 if !predicate(&()) {
506 self.valid = false;
507 }
508 }
509
510 fn rewind_keys(&mut self) {
511 self.cursor.rewind();
512 self.valid = true;
513 }
514
515 fn fast_forward_keys(&mut self) {
516 self.cursor.fast_forward();
517 self.valid = true;
518 }
519
520 fn rewind_vals(&mut self) {
521 self.valid = true;
522 }
523
524 fn step_val_reverse(&mut self) {
525 self.valid = false;
526 }
527
528 fn seek_val_reverse(&mut self, _val: &DynUnit) {}
529
530 fn seek_val_with_reverse(&mut self, predicate: &dyn Fn(&DynUnit) -> bool) {
531 if !predicate(&()) {
532 self.valid = false;
533 }
534 }
535
536 fn fast_forward_vals(&mut self) {
537 self.valid = true;
538 }
539
540 fn position(&self) -> Option<Position> {
541 Some(Position {
542 total: TrieCursor::keys(&self.cursor) as u64,
543 offset: self.cursor.pos() as u64,
544 })
545 }
546}
547
/// Builder that assembles a [`VecKeyBatch`] from keys and their
/// `(time, weight)` entries.
#[derive(SizeOf)]
pub struct VecKeyBuilder<K, T, R, O = usize>
where
    K: DataTrait + ?Sized,
    T: Timestamp,
    R: WeightTrait + ?Sized,
    O: OrdOffset,
{
    /// Factories used to allocate the buffers and to build the final batch;
    /// excluded from size accounting.
    #[size_of(skip)]
    factories: VecKeyBatchFactories<K, T, R>,
    /// Keys pushed so far.
    keys: Box<DynVec<K>>,
    /// Offsets into `times`/`diffs`: starts with a single zero, then gains
    /// one end offset per pushed key.
    offs: Vec<O>,
    /// Times of the pushed `(time, weight)` entries.
    times: Box<DynVec<DynDataTyped<T>>>,
    /// Weights of the pushed `(time, weight)` entries, parallel to `times`.
    diffs: Box<DynVec<R>>,
}
564
565impl<K, T, R, O> VecKeyBuilder<K, T, R, O>
566where
567 K: DataTrait + ?Sized,
568 T: Timestamp,
569 R: WeightTrait + ?Sized,
570 O: OrdOffset,
571{
572 fn pushed_key(&mut self) {
573 let off = O::from_usize(self.times.len());
574 debug_assert!(off > *self.offs.last().unwrap());
575 self.offs.push(off);
576 }
577}
578
579impl<K, T, R, O> Builder<VecKeyBatch<K, T, R, O>> for VecKeyBuilder<K, T, R, O>
580where
581 K: DataTrait + ?Sized,
582 T: Timestamp,
583 R: WeightTrait + ?Sized,
584 O: OrdOffset,
585{
586 fn with_capacity(
587 factories: &VecKeyBatchFactories<K, T, R>,
588 key_capacity: usize,
589 value_capacity: usize,
590 ) -> Self {
591 let mut keys = factories.layer_factories.keys.default_box();
592 keys.reserve_exact(key_capacity);
593
594 let mut offs = Vec::with_capacity(key_capacity + 1);
595 offs.push(O::zero());
596
597 let mut times = factories.layer_factories.child.keys.default_box();
598 times.reserve_exact(value_capacity);
599
600 let mut diffs = factories.layer_factories.child.diffs.default_box();
601 diffs.reserve_exact(value_capacity);
602 Self {
603 factories: factories.clone(),
604 keys,
605 offs,
606 times,
607 diffs,
608 }
609 }
610
611 fn reserve(&mut self, additional: usize) {
612 self.keys.reserve(additional);
613 self.offs.reserve(additional);
614 self.times.reserve(additional);
615 self.diffs.reserve(additional);
616 }
617
618 fn push_key(&mut self, key: &K) {
619 self.keys.push_ref(key);
620 self.pushed_key();
621 }
622
623 fn push_key_mut(&mut self, key: &mut K) {
624 self.keys.push_val(key);
625 self.pushed_key();
626 }
627
628 fn push_val(&mut self, _val: &DynUnit) {}
629
630 fn push_time_diff(&mut self, time: &T, weight: &R) {
631 debug_assert!(!weight.is_zero());
632 self.times.push(time.clone());
633 self.diffs.push_ref(weight);
634 }
635
636 fn push_time_diff_mut(&mut self, time: &mut T, weight: &mut R) {
637 debug_assert!(!weight.is_zero());
638 self.times.push(time.clone());
639 self.diffs.push_val(weight);
640 }
641
642 fn done(self) -> VecKeyBatch<K, T, R, O> {
643 VecKeyBatch {
644 layer: Layer::from_parts(
645 &self.factories.layer_factories,
646 self.keys,
647 self.offs,
648 Leaf::from_parts(
649 &self.factories.layer_factories.child,
650 self.times,
651 self.diffs,
652 ),
653 ),
654 factories: self.factories,
655 }
656 }
657
658 fn num_keys(&self) -> usize {
659 self.keys.len()
660 }
661
662 fn num_tuples(&self) -> usize {
663 self.diffs.len()
664 }
665}
666
667