1use crate::circuit::metadata::OperatorMeta;
30use crate::dynamic::{ClonableTrait, DynDataTyped, DynUnit, Weight};
31use crate::storage::buffer_cache::CacheStats;
32use crate::storage::file::SerializerInner;
33use crate::storage::file::TouchedWindowCount;
34pub use crate::storage::file::{DbspSerializer, Deserializable, Deserializer, Rkyv};
35use crate::storage::file::{FilterKind, FilterStats};
36use crate::trace::cursor::{
37 DefaultPushCursor, FilteredMergeCursor, FilteredMergeCursorWithSnapshot, PushCursor,
38 UnfilteredMergeCursor,
39};
40use crate::utils::{IsNone, SupportsRoaring};
41use crate::{dynamic::ArchivedDBData, storage::buffer_cache::FBuf};
42use cursor::CursorFactory;
43use enum_map::Enum;
44use feldera_storage::fbuf::FBufSerializer;
45use feldera_storage::{FileCommitter, FileReader, StoragePath};
46use rand::{Rng, thread_rng};
47use rkyv::ser::Serializer as _;
48use size_of::SizeOf;
49use std::any::TypeId;
50use std::future::Future;
51use std::sync::Arc;
52use std::{fmt::Debug, hash::Hash};
53
54pub mod cursor;
55pub mod filter;
56pub mod layers;
57pub mod ord;
58mod sampling;
59pub mod spine_async;
60pub(crate) use sampling::sample_keys_from_batches;
61pub use spine_async::{BatchReaderWithSnapshot, ListMerger, Spine, SpineSnapshot, WithSnapshot};
62
63#[cfg(test)]
64pub mod test;
65
66pub use ord::{
67 FallbackIndexedWSet, FallbackIndexedWSetBuilder, FallbackIndexedWSetFactories,
68 FallbackKeyBatch, FallbackKeyBatchFactories, FallbackValBatch, FallbackValBatchFactories,
69 FallbackWSet, FallbackWSetBuilder, FallbackWSetFactories, FileIndexedWSet,
70 FileIndexedWSetFactories, FileKeyBatch, FileKeyBatchFactories, FileValBatch,
71 FileValBatchFactories, FileWSet, FileWSetFactories, OrdIndexedWSet, OrdIndexedWSetBuilder,
72 OrdIndexedWSetFactories, OrdKeyBatch, OrdKeyBatchFactories, OrdValBatch, OrdValBatchFactories,
73 OrdWSet, OrdWSetBuilder, OrdWSetFactories, VecIndexedWSet, VecIndexedWSetFactories,
74 VecKeyBatch, VecKeyBatchFactories, VecValBatch, VecValBatchFactories, VecWSet,
75 VecWSetFactories,
76};
77
78use rkyv::bytecheck;
79use rkyv::{Deserialize, archived_root};
80
81use crate::{
82 Error, NumEntries, Timestamp,
83 algebra::MonoidValue,
84 dynamic::{DataTrait, DynPair, DynVec, DynWeightedPairs, Erase, Factory, WeightTrait},
85 storage::file::reader::Error as ReaderError,
86};
87pub use cursor::{Cursor, MergeCursor};
88pub use filter::{BatchFilterStats, BatchFilters, Filter, GroupFilter};
89pub use layers::Trie;
90
91pub trait DBData:
99 Default
100 + Clone
101 + Eq
102 + Ord
103 + Hash
104 + SizeOf
105 + Send
106 + Sync
107 + Debug
108 + ArchivedDBData
109 + IsNone<Inner: ArchivedDBData>
110 + SupportsRoaring
111 + 'static
112{
113}
114
115impl<T> DBData for T where
117 T: Default
118 + Clone
119 + Eq
120 + Ord
121 + Hash
122 + SizeOf
123 + Send
124 + Sync
125 + Debug
126 + ArchivedDBData
127 + IsNone<Inner: ArchivedDBData>
128 + SupportsRoaring
129 + 'static
130{
131}
132
/// Serializable description of a spine's committed state.
///
/// NOTE(review): presumably written by `Trace::save` and read back by
/// `Trace::restore` for spine-based traces; confirm in `spine_async`.
///
/// The type derives rkyv `Serialize`/`Deserialize`/`Archive`, and its
/// archived form derives `CheckBytes` so archived bytes can be validated.
/// The rkyv-derived archived layout follows the field declaration order, so
/// reordering or inserting fields changes the serialized format.
#[derive(rkyv::Serialize, rkyv::Deserialize, rkyv::Archive)]
#[archive_attr(derive(rkyv::CheckBytes))]
pub(crate) struct CommittedSpine {
    /// Batch identifiers, as strings.
    /// NOTE(review): presumably file names/paths of the persisted batches; TODO confirm.
    pub batches: Vec<String>,
    /// Pairs of strings describing merges.
    /// NOTE(review): the meaning of each element of the pair is not visible here.
    pub merged: Vec<(String, String)>,
    /// Effort counter. NOTE(review): semantics defined by the spine implementation.
    pub effort: u64,
    /// Dirty flag. NOTE(review): presumably mirrors `Trace::dirty()` at save time.
    pub dirty: bool,
}
142
143pub fn unaligned_deserialize<T: Deserializable>(bytes: &[u8]) -> T {
146 let mut aligned_bytes = FBuf::new();
147 aligned_bytes.extend_from_slice(bytes);
148 aligned_deserialize(&aligned_bytes)
149}
150
151pub fn aligned_deserialize<T: Deserializable>(bytes: &[u8]) -> T {
154 unsafe { archived_root::<T>(bytes) }
155 .deserialize(&mut Deserializer::default())
156 .unwrap()
157}
158
159pub trait DBWeight: DBData + MonoidValue {}
178impl<T> DBWeight for T where T: DBData + MonoidValue {}
179
180pub trait BatchReaderFactories<
181 K: DataTrait + ?Sized,
182 V: DataTrait + ?Sized,
183 T,
184 R: WeightTrait + ?Sized,
185>: Clone + Send + Sync
186{
187 fn new<KType, VType, RType>() -> Self
189 where
190 KType: DBData + Erase<K>,
191 VType: DBData + Erase<V>,
192 RType: DBWeight + Erase<R>;
193
194 fn key_factory(&self) -> &'static dyn Factory<K>;
195 fn keys_factory(&self) -> &'static dyn Factory<DynVec<K>>;
196 fn val_factory(&self) -> &'static dyn Factory<V>;
197 fn weight_factory(&self) -> &'static dyn Factory<R>;
198}
199
/// A `((key, value), weight)` triple, represented as a nested dynamic pair.
pub type WeightedItem<K, V, R> = DynPair<DynPair<K, V>, R>;
202
203pub trait BatchFactories<K: DataTrait + ?Sized, V: DataTrait + ?Sized, T, R: WeightTrait + ?Sized>:
204 BatchReaderFactories<K, V, T, R>
205{
206 fn item_factory(&self) -> &'static dyn Factory<DynPair<K, V>>;
207
208 fn weighted_items_factory(&self) -> &'static dyn Factory<DynWeightedPairs<DynPair<K, V>, R>>;
209 fn weighted_vals_factory(&self) -> &'static dyn Factory<DynWeightedPairs<V, R>>;
210 fn weighted_item_factory(&self) -> &'static dyn Factory<WeightedItem<K, V, R>>;
211
212 fn time_diffs_factory(
214 &self,
215 ) -> Option<&'static dyn Factory<DynWeightedPairs<DynDataTyped<T>, R>>>;
216}
217
/// A mutable collection of batches that can itself be read as a
/// [`BatchReader`]. New batches can be inserted, contents can be filtered,
/// and state can be saved to and restored from storage.
pub trait Trace: BatchReader {
    /// Batch type stored in the trace; it shares the trace's key, value,
    /// time, weight and factory types.
    type Batch: Batch<
        Key = Self::Key,
        Val = Self::Val,
        Time = Self::Time,
        R = Self::R,
        Factories = Self::Factories,
    >;

    /// Creates a new trace using `factories`.
    /// NOTE(review): presumably the new trace is empty; confirm in implementations.
    fn new(factories: &Self::Factories) -> Self;

    /// Sets the trace's frontier to `frontier`.
    /// NOTE(review): how the frontier affects stored timestamps is
    /// implementation-defined and not visible here.
    fn set_frontier(&mut self, frontier: &Self::Time);

    /// Performs background maintenance work bounded by `effort`.
    /// NOTE(review): `effort` is taken by `&mut`, presumably so the
    /// implementation can report how much effort it consumed; TODO confirm.
    fn exert(&mut self, effort: &mut isize);

    /// Consumes the trace and returns its contents as a single batch, or
    /// `None`. NOTE(review): the conditions under which `None` is returned
    /// are implementation-defined.
    fn consolidate(self) -> Option<Self::Batch>;

    /// Adds `batch` (anything convertible into an `Arc<Self::Batch>`) to the
    /// trace and returns a future for the operation.
    /// NOTE(review): confirm whether any work happens before the future is polled.
    fn insert(&mut self, batch: impl Into<Arc<Self::Batch>>) -> impl Future<Output = ()>;

    /// Clears the flag reported by `dirty()`.
    fn clear_dirty_flag(&mut self);

    /// Returns the trace's dirty flag.
    /// NOTE(review): presumably set when the trace changes after the last
    /// `clear_dirty_flag()`; confirm in implementations.
    fn dirty(&self) -> bool;

    /// Installs a key filter.
    /// NOTE(review): presumably reported afterwards by `key_filter()` and
    /// applied to keys as data is merged; confirm.
    fn retain_keys(&mut self, filter: Filter<Self::Key>);

    /// Installs a value (group) filter.
    /// NOTE(review): presumably reported afterwards by `value_filter()`; confirm.
    fn retain_values(&mut self, filter: GroupFilter<Self::Val>);

    /// Returns the installed key filter, if any.
    fn key_filter(&self) -> &Option<Filter<Self::Key>>;
    /// Returns the installed value filter, if any.
    fn value_filter(&self) -> &Option<GroupFilter<Self::Val>>;

    /// Saves the trace's state under `base`, identified by `pid`, and
    /// appends the files to be committed to `files`.
    /// NOTE(review): the commit responsibility split between trace and caller is
    /// inferred from the `FileCommitter` type; confirm.
    fn save(
        &mut self,
        base: &StoragePath,
        pid: &str,
        files: &mut Vec<Arc<dyn FileCommitter>>,
    ) -> Result<(), Error>;

    /// Restores state from `base` for `pid`.
    /// NOTE(review): presumably the counterpart of `save()` with the same arguments.
    fn restore(&mut self, base: &StoragePath, pid: &str) -> Result<(), Error>;

    /// Adds trace-specific information to operator metadata. The default
    /// implementation does nothing.
    fn metadata(&self, _meta: &mut OperatorMeta) {}

    /// Requests compaction of the trace. The method takes `&self`; the
    /// mechanism by which compaction is carried out is implementation-defined.
    fn initiate_compaction(&self);
}
321
/// Where a batch's data is kept.
///
/// Derives `enum_map::Enum`, so it can be used as an `EnumMap` key. Variant
/// order determines the enum's index mapping, so do not reorder variants
/// casually.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Enum)]
pub enum BatchLocation {
    /// Data is held in memory.
    Memory,

    /// Data is held in storage.
    /// NOTE(review): presumably file-backed via the storage layer; TODO confirm.
    Storage,
}
331
/// Read-only access to a collection of `(key, value, time, weight)` tuples.
///
/// Implementations provide a [`Cursor`] and size/statistics queries. Most
/// other capabilities (push and merge cursors, filter statistics, location,
/// partitioning) have default implementations that may be overridden.
pub trait BatchReader: Debug + NumEntries + Rkyv + SizeOf + 'static
where
    Self: Sized,
{
    /// Factories for this batch's dynamically-typed components.
    type Factories: BatchFactories<Self::Key, Self::Val, Self::Time, Self::R>;

    /// Key type.
    type Key: DataTrait + ?Sized;

    /// Value type.
    type Val: DataTrait + ?Sized;

    /// Timestamp type.
    type Time: Timestamp;

    /// Weight type.
    type R: WeightTrait + ?Sized;

    /// Cursor type returned by `cursor()`.
    type Cursor<'s>: Cursor<Self::Key, Self::Val, Self::Time, Self::R> + Clone + Send
    where
        Self: 's;

    /// Returns the factories for this batch.
    fn factories(&self) -> Self::Factories;

    /// Returns a cursor over the batch's contents.
    fn cursor(&self) -> Self::Cursor<'_>;

    /// Returns a push cursor over the batch.
    ///
    /// The default wraps `cursor()` in a `DefaultPushCursor`.
    fn push_cursor(
        &self,
    ) -> Box<dyn PushCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + '_> {
        Box::new(DefaultPushCursor::new(self.cursor()))
    }

    /// Returns a cursor for merging this batch, restricted by the optional
    /// key and value filters.
    ///
    /// Default behavior:
    /// - with no filters, returns an `UnfilteredMergeCursor`;
    /// - otherwise returns a `FilteredMergeCursor`.
    ///
    /// Only `GroupFilter::Simple` value filters are passed through; any other
    /// kind of value filter is not applied by this default.
    fn merge_cursor(
        &self,
        key_filter: Option<Filter<Self::Key>>,
        value_filter: Option<GroupFilter<Self::Val>>,
    ) -> Box<dyn MergeCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + '_> {
        if key_filter.is_none() && value_filter.is_none() {
            Box::new(UnfilteredMergeCursor::new(self.cursor()))
        } else if let Some(GroupFilter::Simple(filter)) = value_filter {
            Box::new(FilteredMergeCursor::new(
                self.cursor(),
                key_filter,
                Some(filter),
            ))
        } else {
            // Non-simple group filters are dropped here (see doc comment above).
            Box::new(FilteredMergeCursor::new(self.cursor(), key_filter, None))
        }
    }

    /// Like `merge_cursor()`, but non-simple value filters can consult
    /// `snapshot`.
    ///
    /// - If `snapshot` is `None`, delegates to `merge_cursor()`.
    /// - With no value filter or a `GroupFilter::Simple` value filter, builds
    ///   an unfiltered or filtered merge cursor directly, without the snapshot.
    /// - Any other value filter is handled by
    ///   `FilteredMergeCursorWithSnapshot`, which receives the snapshot.
    fn merge_cursor_with_snapshot<'a, S>(
        &'a self,
        key_filter: Option<Filter<Self::Key>>,
        value_filter: Option<GroupFilter<Self::Val>>,
        snapshot: &'a Option<Arc<S>>,
    ) -> Box<dyn MergeCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + 'a>
    where
        S: BatchReader<Key = Self::Key, Val = Self::Val, Time = Self::Time, R = Self::R>,
    {
        let Some(snapshot) = snapshot else {
            return self.merge_cursor(key_filter, value_filter);
        };
        if key_filter.is_none() && value_filter.is_none() {
            Box::new(UnfilteredMergeCursor::new(self.cursor()))
        } else if value_filter.is_none() {
            Box::new(FilteredMergeCursor::new(self.cursor(), key_filter, None))
        } else if let Some(GroupFilter::Simple(filter)) = value_filter {
            Box::new(FilteredMergeCursor::new(
                self.cursor(),
                key_filter,
                Some(filter),
            ))
        } else {
            // `value_filter` is `Some` here: the `is_none()` case was handled above.
            Box::new(FilteredMergeCursorWithSnapshot::new(
                self.cursor(),
                key_filter,
                value_filter.unwrap(),
                snapshot,
            ))
        }
    }

    /// Returns a merge cursor that may consume the batch's contents (it takes
    /// `&mut self`). The default just returns `merge_cursor()`.
    fn consuming_cursor(
        &mut self,
        key_filter: Option<Filter<Self::Key>>,
        value_filter: Option<GroupFilter<Self::Val>>,
    ) -> Box<dyn MergeCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + '_> {
        self.merge_cursor(key_filter, value_filter)
    }
    /// Returns the number of keys in the batch.
    fn key_count(&self) -> usize;

    /// Returns the number of tuples in the batch.
    /// NOTE(review): `Batch::from_batch` uses this as a value capacity, so it
    /// presumably counts `(key, value)` entries; confirm.
    fn len(&self) -> usize;

    /// Returns an approximation of the batch's size in bytes.
    fn approximate_byte_size(&self) -> usize;

    /// Statistics for the batch's membership filter. Defaults to
    /// `FilterStats::default()`.
    fn membership_filter_stats(&self) -> FilterStats {
        FilterStats::default()
    }

    /// Kind of membership filter the batch uses. Defaults to `FilterKind::None`.
    fn membership_filter_kind(&self) -> FilterKind {
        FilterKind::None
    }

    /// Statistics for the batch's range filter. Defaults to
    /// `FilterStats::default()`.
    fn range_filter_stats(&self) -> FilterStats {
        FilterStats::default()
    }

    /// Where the batch's data is kept. Defaults to `BatchLocation::Memory`.
    fn location(&self) -> BatchLocation {
        BatchLocation::Memory
    }

    /// Buffer-cache statistics for the batch. Defaults to `CacheStats::default()`.
    fn cache_stats(&self) -> CacheStats {
        CacheStats::default()
    }

    /// Returns true if `len()` is zero.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Appends a random sample of up to `sample_size` keys, drawn using `rng`,
    /// to `sample`.
    /// NOTE(review): whether the sample is sorted/deduplicated, and whether
    /// `sample` is cleared first, is up to implementations; `partition_keys`
    /// below presumably relies on it being sorted.
    fn sample_keys<RG>(&self, rng: &mut RG, sample_size: usize, sample: &mut DynVec<Self::Key>)
    where
        RG: Rng;

    /// Computes up to `num_partitions - 1` key boundaries for splitting the
    /// batch into `num_partitions` ranges, writing them to `bounds`.
    ///
    /// `bounds` is cleared first and left empty if `num_partitions <= 1` or
    /// if the sample is empty. Otherwise, `num_partitions²` keys are sampled
    /// with `thread_rng()`:
    /// - if the sample has at least `num_partitions` keys, the bounds are the
    ///   keys at evenly spaced sample positions;
    /// - otherwise every sampled key becomes a bound.
    fn partition_keys(&self, num_partitions: usize, bounds: &mut DynVec<Self::Key>)
    where
        Self::Time: PartialEq<()>,
    {
        bounds.clear();
        if num_partitions <= 1 {
            return;
        }

        // Oversample so that the chosen bounds are spread across the key space.
        let sample_size = num_partitions * num_partitions;

        let mut sample = self.factories().keys_factory().default_box();
        self.sample_keys(&mut thread_rng(), sample_size, sample.as_mut());

        let sample_len = sample.len();
        if sample_len == 0 {
            return;
        }

        if sample_len >= num_partitions {
            for i in 0..num_partitions - 1 {
                // Position of the (i+1)-th boundary, clamped to the last sample.
                let idx = ((i + 1) * sample_len) / num_partitions;
                let idx = idx.min(sample_len - 1);
                bounds.push_ref(sample.index(idx));
            }
        } else {
            for i in 0..sample_len {
                bounds.push_ref(sample.index(i));
            }
        }
    }

    /// Asynchronously fetches data for `keys` and returns a cursor factory,
    /// or `None`.
    ///
    /// The default ignores `keys` and returns `None`.
    // Silences the `async_fn_in_trait` lint, which warns that callers cannot
    // add `Send` bounds to the future returned by an `async fn` in a public trait.
    #[allow(async_fn_in_trait)]
    async fn fetch<B>(
        &self,
        keys: &B,
    ) -> Option<Box<dyn CursorFactory<Self::Key, Self::Val, Self::Time, Self::R>>>
    where
        B: BatchReader<Key = Self::Key, Time = ()>,
    {
        let _ = keys;
        None
    }

    /// Returns the batch's keys as a vector, if available. Defaults to `None`.
    fn keys(&self) -> Option<&DynVec<Self::Key>> {
        None
    }
}
644
645impl<B> BatchReader for Arc<B>
646where
647 B: BatchReader,
648{
649 type Factories = B::Factories;
650 type Key = B::Key;
651 type Val = B::Val;
652 type Time = B::Time;
653 type R = B::R;
654 type Cursor<'s> = B::Cursor<'s>;
655 fn factories(&self) -> Self::Factories {
656 (**self).factories()
657 }
658 fn cursor(&self) -> Self::Cursor<'_> {
659 (**self).cursor()
660 }
661 fn merge_cursor(
662 &self,
663 key_filter: Option<Filter<Self::Key>>,
664 value_filter: Option<GroupFilter<Self::Val>>,
665 ) -> Box<dyn MergeCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + '_> {
666 (**self).merge_cursor(key_filter, value_filter)
667 }
668 fn key_count(&self) -> usize {
669 (**self).key_count()
670 }
671 fn len(&self) -> usize {
672 (**self).len()
673 }
674 fn approximate_byte_size(&self) -> usize {
675 (**self).approximate_byte_size()
676 }
677 fn membership_filter_stats(&self) -> FilterStats {
678 (**self).membership_filter_stats()
679 }
680 fn membership_filter_kind(&self) -> FilterKind {
681 (**self).membership_filter_kind()
682 }
683 fn range_filter_stats(&self) -> FilterStats {
684 (**self).range_filter_stats()
685 }
686 fn location(&self) -> BatchLocation {
687 (**self).location()
688 }
689 fn cache_stats(&self) -> CacheStats {
690 (**self).cache_stats()
691 }
692 fn is_empty(&self) -> bool {
693 (**self).is_empty()
694 }
695 fn sample_keys<RG>(&self, rng: &mut RG, sample_size: usize, sample: &mut DynVec<Self::Key>)
696 where
697 RG: Rng,
698 {
699 (**self).sample_keys(rng, sample_size, sample)
700 }
701 fn consuming_cursor(
702 &mut self,
703 key_filter: Option<Filter<Self::Key>>,
704 value_filter: Option<GroupFilter<Self::Val>>,
705 ) -> Box<dyn MergeCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + '_> {
706 (**self).merge_cursor(key_filter, value_filter)
707 }
708 async fn fetch<KB>(
709 &self,
710 keys: &KB,
711 ) -> Option<Box<dyn CursorFactory<Self::Key, Self::Val, Self::Time, Self::R>>>
712 where
713 KB: BatchReader<Key = Self::Key, Time = ()>,
714 {
715 (**self).fetch(keys).await
716 }
717 fn keys(&self) -> Option<&DynVec<Self::Key>> {
718 (**self).keys()
719 }
720}
721
/// An immutable batch of `(key, value, time, weight)` tuples that can be
/// built with a [`Batcher`] or [`Builder`] and cloned cheaply or otherwise
/// (`Clone` is required).
pub trait Batch: BatchReader + Clone + Send + Sync
where
    Self: Sized,
{
    /// The same kind of batch, with timestamp type `T`.
    type Timed<T: Timestamp>: Batch<
        Key = <Self as BatchReader>::Key,
        Val = <Self as BatchReader>::Val,
        Time = T,
        R = <Self as BatchReader>::R,
    >;

    /// Batcher used by `dyn_from_tuples`.
    type Batcher: Batcher<Self>;

    /// Builder used by `from_cursor`, `dyn_empty` and `filter`.
    type Builder: Builder<Self>;

    /// Builds a batch from the weighted tuples in `tuples`, all at `time`,
    /// using `Self::Batcher`.
    // The `tuples` parameter type is deeply nested, which trips
    // `clippy::type_complexity`.
    #[allow(clippy::type_complexity)]
    fn dyn_from_tuples(
        factories: &Self::Factories,
        time: Self::Time,
        tuples: &mut Box<DynWeightedPairs<DynPair<Self::Key, Self::Val>, Self::R>>,
    ) -> Self {
        let mut batcher = Self::Batcher::new_batcher(factories, time);
        batcher.push_batch(tuples);
        batcher.seal()
    }

    /// Converts an untimed batch (`Time = ()`) into `Self`, assigning every
    /// tuple the time `timestamp`.
    ///
    /// If `BI` is `Self`, returns a clone of `batch`. In that case `Self::Time`
    /// is `()`, so `timestamp` carries no information. Otherwise rebuilds the
    /// batch with `from_cursor`.
    fn from_batch<BI>(batch: &BI, timestamp: &Self::Time, factories: &Self::Factories) -> Self
    where
        BI: BatchReader<Key = Self::Key, Val = Self::Val, Time = (), R = Self::R>,
    {
        if TypeId::of::<BI>() == TypeId::of::<Self>() {
            // SAFETY: the `TypeId` check above proves `BI` and `Self` are the
            // same type, so reinterpreting `&BI` as `&Self` is an identity cast.
            unsafe { std::mem::transmute::<&BI, &Self>(batch).clone() }
        } else {
            Self::from_cursor(
                batch.cursor(),
                timestamp,
                factories,
                batch.key_count(),
                batch.len(),
            )
        }
    }

    /// Like `from_batch`, but for an `Arc`-wrapped input.
    ///
    /// When `BI` is `Self`, the `Arc` itself is cloned, so the result shares
    /// the input's allocation. Otherwise a new batch is built with
    /// `from_cursor` and wrapped in a fresh `Arc`.
    fn from_arc_batch<BI>(
        batch: &Arc<BI>,
        timestamp: &Self::Time,
        factories: &Self::Factories,
    ) -> Arc<Self>
    where
        BI: BatchReader<Key = Self::Key, Val = Self::Val, Time = (), R = Self::R>,
    {
        if TypeId::of::<BI>() == TypeId::of::<Self>() {
            // SAFETY: the `TypeId` check above proves `BI` and `Self` are the
            // same type, so `&Arc<BI>` and `&Arc<Self>` are the same type too.
            unsafe { std::mem::transmute::<&Arc<BI>, &Arc<Self>>(batch).clone() }
        } else {
            Arc::new(Self::from_cursor(
                batch.cursor(),
                timestamp,
                factories,
                batch.key_count(),
                batch.len(),
            ))
        }
    }

    /// Builds a batch from the contents of an untimed `cursor`, assigning
    /// every tuple the time `timestamp`.
    ///
    /// `key_capacity` and `value_capacity` are passed to
    /// `Builder::with_capacity` as sizing hints. Keys for which the cursor
    /// yields no values are skipped. In debug builds, zero weights trigger an
    /// assertion failure.
    fn from_cursor<C>(
        mut cursor: C,
        timestamp: &Self::Time,
        factories: &Self::Factories,
        key_capacity: usize,
        value_capacity: usize,
    ) -> Self
    where
        C: Cursor<Self::Key, Self::Val, (), Self::R>,
    {
        let mut builder = Self::Builder::with_capacity(factories, key_capacity, value_capacity);
        while cursor.key_valid() {
            let mut any_values = false;
            while cursor.val_valid() {
                let weight = cursor.weight();
                debug_assert!(!weight.is_zero());
                // Builder protocol: time/weight first, then the value it belongs to.
                builder.push_time_diff(timestamp, weight);
                builder.push_val(cursor.val());
                any_values = true;
                cursor.step_val();
            }
            // The key follows its values; keys without values are omitted.
            if any_values {
                builder.push_key(cursor.key());
            }
            cursor.step_key();
        }
        builder.done()
    }

    /// Returns an empty batch built by `Self::Builder`.
    fn dyn_empty(factories: &Self::Factories) -> Self {
        Self::Builder::new_builder(factories).done()
    }

    /// Returns a new batch containing only the `(key, value)` entries for
    /// which `predicate` returns true, with their weights.
    ///
    /// Keys whose values are all rejected are omitted. The bounds on
    /// `Self::Time` restrict this to untimed batches (weights are pushed with
    /// `push_diff`, i.e. the default time).
    fn filter(&self, predicate: &dyn Fn(&Self::Key, &Self::Val) -> bool) -> Self
    where
        Self::Time: PartialEq<()> + From<()>,
    {
        let factories = self.factories();
        let mut builder = Self::Builder::new_builder(&factories);
        let mut cursor = self.cursor();

        while cursor.key_valid() {
            let mut any_values = false;
            while cursor.val_valid() {
                if predicate(cursor.key(), cursor.val()) {
                    builder.push_diff(cursor.weight());
                    builder.push_val(cursor.val());
                    any_values = true;
                }
                cursor.step_val();
            }
            if any_values {
                builder.push_key(cursor.key());
            }
            cursor.step_key();
        }

        builder.done()
    }

    /// Returns a persisted version of this batch, if supported. Defaults to `None`.
    /// NOTE(review): "persisted" presumably means written to storage; confirm.
    fn persisted(&self) -> Option<Self> {
        None
    }

    /// Returns the file reader backing this batch, if any. Defaults to `None`.
    fn file_reader(&self) -> Option<Arc<dyn FileReader>> {
        None
    }

    /// Opens a batch from `_path`. The default returns
    /// `Err(ReaderError::Unsupported)`.
    fn from_path(_factories: &Self::Factories, _path: &StoragePath) -> Result<Self, ReaderError> {
        Err(ReaderError::Unsupported)
    }

    /// Returns a pair of key references bounding the batch's keys, or `None`.
    /// NOTE(review): presumably (smallest, largest) key, `None` when empty; confirm.
    fn key_bounds(&self) -> Option<(&Self::Key, &Self::Key)>;

    /// Returns the batch's `TouchedWindowCount`.
    /// NOTE(review): semantics defined in `storage::file`; not visible here.
    fn touched_window_count(&self) -> TouchedWindowCount;

    /// Returns a count of tuples with negative weight, if known.
    /// NOTE(review): inferred from the name; confirm with implementations.
    fn negative_weight_count(&self) -> Option<u64>;
}
935
936pub trait Batcher<Output>: SizeOf
938where
939 Output: Batch,
940{
941 fn new_batcher(vtables: &Output::Factories, time: Output::Time) -> Self;
944
945 fn push_batch(
947 &mut self,
948 batch: &mut Box<DynWeightedPairs<DynPair<Output::Key, Output::Val>, Output::R>>,
949 );
950
951 fn push_consolidated_batch(
956 &mut self,
957 batch: &mut Box<DynWeightedPairs<DynPair<Output::Key, Output::Val>, Output::R>>,
958 );
959
960 fn tuples(&self) -> usize;
962
963 fn seal(self) -> Output;
965}
966
/// Incrementally builds a batch of type `Output`.
///
/// Data is pushed one value at a time:
/// 1. one or more time/weight pairs (`push_time_diff`, or a `push_diff` /
///    `push_val_diff` variant for untimed batches);
/// 2. the value they belong to (`push_val`);
/// 3. after all of a key's values, the key itself (`push_key`).
///
/// `Batch::from_cursor` and `TupleBuilder` follow this order.
/// NOTE(review): presumably keys and values must also arrive in sorted order;
/// confirm against implementations.
pub trait Builder<Output>: Send + SizeOf
where
    Self: Sized,
    Output: Batch,
{
    /// Creates a builder with no capacity hints.
    fn new_builder(factories: &Output::Factories) -> Self {
        Self::with_capacity(factories, 0, 0)
    }

    /// Creates a builder sized for `key_capacity` keys and `value_capacity`
    /// values, optionally requesting that the output be placed at `location`.
    fn with_capacity_in_location(
        factories: &Output::Factories,
        key_capacity: usize,
        value_capacity: usize,
        location: Option<BatchLocation>,
    ) -> Self;

    /// Creates a builder with capacity hints and no location preference.
    fn with_capacity(
        factories: &Output::Factories,
        key_capacity: usize,
        value_capacity: usize,
    ) -> Self {
        Self::with_capacity_in_location(factories, key_capacity, value_capacity, None)
    }

    /// Creates a builder for merging `batches`.
    ///
    /// The capacity hints are the sums of the inputs' `key_count()` and
    /// `len()`. `batches` is iterated twice, hence the `Clone` bound.
    fn for_merge<'a, B, I>(
        factories: &Output::Factories,
        batches: I,
        location: Option<BatchLocation>,
    ) -> Self
    where
        B: Batch<Key = Output::Key, Val = Output::Val, Time = Output::Time, R = Output::R>,
        I: IntoIterator<Item = &'a B> + Clone,
    {
        let key_capacity = batches.clone().into_iter().map(|b| b.key_count()).sum();
        let value_capacity = batches.into_iter().map(|b| b.len()).sum();
        Self::with_capacity_in_location(factories, key_capacity, value_capacity, location)
    }

    /// Pushes a time/weight pair for the value that will be pushed next.
    fn push_time_diff(&mut self, time: &Output::Time, weight: &Output::R);

    /// Mutable-reference variant of `push_time_diff`; the default forwards to
    /// it. NOTE(review): presumably lets implementations move out of the arguments.
    fn push_time_diff_mut(&mut self, time: &mut Output::Time, weight: &mut Output::R) {
        self.push_time_diff(time, weight);
    }

    /// Pushes a value, completing the time/weight pairs pushed before it.
    fn push_val(&mut self, val: &Output::Val);

    /// Mutable-reference variant of `push_val`; the default forwards to it.
    fn push_val_mut(&mut self, val: &mut Output::Val) {
        self.push_val(val);
    }

    /// Pushes a key, completing the values pushed before it.
    fn push_key(&mut self, key: &Output::Key);

    /// Mutable-reference variant of `push_key`; the default forwards to it.
    fn push_key_mut(&mut self, key: &mut Output::Key) {
        self.push_key(key);
    }

    /// Pushes `weight` at the default time. The `PartialEq<()>` bound limits
    /// this to untimed batches (presumably `Time = ()`).
    fn push_diff(&mut self, weight: &Output::R)
    where
        Output::Time: PartialEq<()>,
    {
        self.push_time_diff(&Output::Time::default(), weight);
    }

    /// Mutable-reference variant of `push_diff`; the default forwards to it.
    fn push_diff_mut(&mut self, weight: &mut Output::R)
    where
        Output::Time: PartialEq<()>,
    {
        self.push_diff(weight);
    }

    /// Pushes `weight` at the default time and then `val`, for untimed batches.
    fn push_val_diff(&mut self, val: &Output::Val, weight: &Output::R)
    where
        Output::Time: PartialEq<()>,
    {
        self.push_time_diff(&Output::Time::default(), weight);
        self.push_val(val);
    }

    /// Mutable-reference variant of `push_val_diff`; the default forwards to it.
    fn push_val_diff_mut(&mut self, val: &mut Output::Val, weight: &mut Output::R)
    where
        Output::Time: PartialEq<()>,
    {
        self.push_val_diff(val, weight);
    }

    /// Hint that about `additional` more tuples will be pushed. The default
    /// ignores it.
    fn reserve(&mut self, additional: usize) {
        let _ = additional;
    }

    /// Number of keys pushed so far.
    fn num_keys(&self) -> usize;
    /// Number of tuples pushed so far.
    fn num_tuples(&self) -> usize;

    /// Finishes building and returns the batch.
    fn done(self) -> Output;
}
1135
/// Adapter that lets callers push complete `(key, value, time, weight)`
/// tuples into a [`Builder`], which instead expects values and keys to be
/// pushed after their weights.
///
/// The most recently seen `(key, value)` pair is buffered in `kv`. It is
/// flushed to the builder only when a different key or value arrives, or
/// when `done` is called.
pub struct TupleBuilder<B, Output>
where
    B: Builder<Output>,
    Output: Batch,
{
    /// Underlying builder receiving the time/weight, value and key pushes.
    builder: B,
    /// Buffered `(key, value)` pair whose value/key have not been pushed yet.
    kv: Box<DynPair<Output::Key, Output::Val>>,
    /// Whether `kv` currently holds a buffered pair.
    has_kv: bool,
}
1148
impl<B, Output> TupleBuilder<B, Output>
where
    B: Builder<Output>,
    Output: Batch,
{
    /// Wraps `builder`; `factories` supplies the buffer for the pending
    /// `(key, value)` pair.
    pub fn new(factories: &Output::Factories, builder: B) -> Self {
        Self {
            builder,
            kv: factories.item_factory().default_box(),
            has_kv: false,
        }
    }

    /// Number of keys the underlying builder has received so far (excluding
    /// the buffered pair).
    pub fn num_keys(&self) -> usize {
        self.builder.num_keys()
    }

    /// Number of tuples reported by the underlying builder.
    pub fn num_tuples(&self) -> usize {
        self.builder.num_tuples()
    }

    /// Pushes a `((key, value), weight)` element at the default time (untimed
    /// batches only). The element's key, value and weight are passed by
    /// `&mut` to `push_vals`, which may move out of them.
    pub fn push(&mut self, element: &mut DynPair<DynPair<Output::Key, Output::Val>, Output::R>)
    where
        Output::Time: PartialEq<()>,
    {
        let (kv, w) = element.split_mut();
        let (k, v) = kv.split_mut();
        self.push_vals(k, v, &mut Output::Time::default(), w);
    }

    /// Pushes one tuple, copying `key` and `val` into the buffer as needed.
    ///
    /// Only the immediately preceding `(key, value)` pair is compared, so
    /// tuples with equal keys (and equal values within a key) must be
    /// adjacent. NOTE(review): presumably tuples must arrive in sorted
    /// order, as the underlying builder expects.
    pub fn push_refs(
        &mut self,
        key: &Output::Key,
        val: &Output::Val,
        time: &Output::Time,
        weight: &Output::R,
    ) {
        if self.has_kv {
            let (k, v) = self.kv.split_mut();
            if k != key {
                // New key: flush the buffered value and then its key.
                self.builder.push_val_mut(v);
                self.builder.push_key_mut(k);
                self.kv.from_refs(key, val);
            } else if v != val {
                // Same key, new value: flush only the buffered value.
                self.builder.push_val_mut(v);
                val.clone_to(v);
            }
        } else {
            self.has_kv = true;
            self.kv.from_refs(key, val);
        }
        // The time/weight always precedes its (still buffered) value.
        self.builder.push_time_diff(time, weight);
    }

    /// Like `push_refs`, but takes mutable references: the key and value are
    /// moved into the buffer (`from_vals` / `move_to`) instead of cloned.
    /// The time and weight go through `push_time_diff_mut`.
    pub fn push_vals(
        &mut self,
        key: &mut Output::Key,
        val: &mut Output::Val,
        time: &mut Output::Time,
        weight: &mut Output::R,
    ) {
        if self.has_kv {
            let (k, v) = self.kv.split_mut();
            if k != key {
                self.builder.push_val_mut(v);
                self.builder.push_key_mut(k);
                self.kv.from_vals(key, val);
            } else if v != val {
                self.builder.push_val_mut(v);
                val.move_to(v);
            }
        } else {
            self.has_kv = true;
            self.kv.from_vals(key, val);
        }
        self.builder.push_time_diff_mut(time, weight);
    }

    /// Forwards a capacity hint to the underlying builder.
    pub fn reserve(&mut self, additional: usize) {
        self.builder.reserve(additional)
    }

    /// Pushes every weighted item from `iter` at the default time (untimed
    /// batches only).
    ///
    /// Reserves the iterator's upper size bound, or its lower bound if there
    /// is no upper bound, before pushing.
    pub fn extend<'a, I>(&mut self, iter: I)
    where
        Output::Time: PartialEq<()>,
        I: Iterator<Item = &'a mut WeightedItem<Output::Key, Output::Val, Output::R>>,
    {
        let (lower, upper) = iter.size_hint();
        self.reserve(upper.unwrap_or(lower));

        for item in iter {
            let (kv, w) = item.split_mut();
            let (k, v) = kv.split_mut();

            self.push_vals(k, v, &mut Output::Time::default(), w);
        }
    }

    /// Flushes the buffered `(key, value)` pair, if any, and returns the
    /// finished batch.
    pub fn done(mut self) -> Output {
        if self.has_kv {
            let (k, v) = self.kv.split_mut();
            self.builder.push_val_mut(v);
            self.builder.push_key_mut(k);
        }
        self.builder.done()
    }
}
1261
1262pub fn merge_batches<B, T>(
1270 factories: &B::Factories,
1271 batches: T,
1272 key_filter: &Option<Filter<B::Key>>,
1273 value_filter: &Option<GroupFilter<B::Val>>,
1274) -> B
1275where
1276 T: IntoIterator<Item = B>,
1277 B: Batch,
1278{
1279 let mut batches = batches
1281 .into_iter()
1282 .filter(|b| !b.is_empty())
1283 .collect::<Vec<_>>();
1284
1285 while batches.len() > 1 {
1290 let mut inputs = batches.split_off(batches.len().saturating_sub(64));
1291 let result: B = ListMerger::merge(
1292 factories,
1293 B::Builder::for_merge(factories, &inputs, Some(BatchLocation::Memory)),
1294 inputs
1295 .iter_mut()
1296 .map(|b| b.consuming_cursor(key_filter.clone(), value_filter.clone()))
1297 .collect(),
1298 );
1299 if !result.is_empty() {
1300 batches.push(result);
1301 }
1302 }
1303
1304 batches.pop().unwrap_or_else(|| B::dyn_empty(factories))
1307}
1308
1309pub fn merge_batches_by_reference<'a, B, T>(
1314 factories: &B::Factories,
1315 batches: T,
1316 key_filter: &Option<Filter<B::Key>>,
1317 value_filter: &Option<GroupFilter<B::Val>>,
1318) -> B
1319where
1320 T: IntoIterator<Item = &'a B>,
1321 B: Batch,
1322{
1323 let mut batches = batches
1325 .into_iter()
1326 .filter(|b| !b.is_empty())
1327 .collect::<Vec<_>>();
1328
1329 let mut outputs = Vec::with_capacity(batches.len().div_ceil(64));
1335 while !batches.is_empty() {
1336 let inputs = batches.split_off(batches.len().saturating_sub(64));
1337 let result: B = ListMerger::merge(
1338 factories,
1339 B::Builder::for_merge(
1340 factories,
1341 inputs.iter().cloned(),
1342 Some(BatchLocation::Memory),
1343 ),
1344 inputs
1345 .into_iter()
1346 .map(|b| b.merge_cursor(key_filter.clone(), value_filter.clone()))
1347 .collect(),
1348 );
1349 if !result.is_empty() {
1350 outputs.push(result);
1351 }
1352 }
1353
1354 merge_batches(factories, outputs, key_filter, value_filter)
1356}
1357
1358pub fn eq_batch<A, B, KA, VA, RA, KB, VB, RB>(a: &A, b: &B) -> bool
1366where
1367 A: BatchReader<Key = KA, Val = VA, Time = (), R = RA>,
1368 B: BatchReader<Key = KB, Val = VB, Time = (), R = RB>,
1369 KA: PartialEq<KB> + ?Sized,
1370 VA: PartialEq<VB> + ?Sized,
1371 RA: PartialEq<RB> + ?Sized,
1372 KB: ?Sized,
1373 VB: ?Sized,
1374 RB: ?Sized,
1375{
1376 let mut c1 = a.cursor();
1377 let mut c2 = b.cursor();
1378 while c1.key_valid() && c2.key_valid() {
1379 if c1.key() != c2.key() {
1380 return false;
1381 }
1382 while c1.val_valid() && c2.val_valid() {
1383 if c1.val() != c2.val() || c1.weight() != c2.weight() {
1384 return false;
1385 }
1386 c1.step_val();
1387 c2.step_val();
1388 }
1389 if c1.val_valid() || c2.val_valid() {
1390 return false;
1391 }
1392 c1.step_key();
1393 c2.step_key();
1394 }
1395 !c1.key_valid() && !c2.key_valid()
1396}
1397
1398fn serialize_wset<B, K, R>(batch: &B) -> Vec<u8>
1399where
1400 B: BatchReader<Key = K, Val = DynUnit, Time = (), R = R>,
1401 K: DataTrait + ?Sized,
1402 R: WeightTrait + ?Sized,
1403{
1404 SerializerInner::to_fbuf_with_thread_local(|s| {
1405 let mut offsets = Vec::with_capacity(2 * batch.len());
1406 let mut cursor = batch.cursor();
1407 while cursor.key_valid() {
1408 offsets.push(cursor.key().serialize(s)?);
1409 offsets.push(cursor.weight().serialize(s)?);
1410 cursor.step_key();
1411 }
1412 s.serialize_value(&offsets)
1413 })
1414 .into_vec()
1415}
1416
1417fn deserialize_wset<B, K, R>(factories: &B::Factories, data: &[u8]) -> B
1418where
1419 B: Batch<Key = K, Val = DynUnit, Time = (), R = R>,
1420 K: DataTrait + ?Sized,
1421 R: WeightTrait + ?Sized,
1422{
1423 let offsets = unsafe { archived_root::<Vec<usize>>(data) };
1424 assert!(offsets.len() % 2 == 0);
1425 let n = offsets.len() / 2;
1426 let mut builder = B::Builder::with_capacity(factories, n, n);
1427 let mut key = factories.key_factory().default_box();
1428 let mut diff = factories.weight_factory().default_box();
1429 for i in 0..n {
1430 unsafe { key.deserialize_from_bytes(data, offsets[i * 2] as usize) };
1431 unsafe { diff.deserialize_from_bytes(data, offsets[i * 2 + 1] as usize) };
1432 builder.push_val_diff(&(), &diff);
1433 builder.push_key(&key);
1434 }
1435 builder.done()
1436}
1437
/// Marker in a serialized indexed W-set's offset table.
///
/// `IndexedWSetSerializer::push_key` writes it after a key's `(diff, value)`
/// offsets and before the key's own offset. `deserialize_indexed_wset` uses
/// it to find where each key's values end.
const SEPARATOR: u64 = u64::MAX;
1440
/// Debug-build tracker of the last call made on an `IndexedWSetSerializer`.
///
/// Valid sequence per key: (`push_diff`, `push_val`) one or more times, then
/// `push_key`. `done` is only allowed right after a `push_key`, or before
/// anything was pushed.
#[cfg(debug_assertions)]
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum State {
    /// Initial state, and the state after `push_key`.
    Key,
    /// After `push_val`.
    Val,
    /// After `push_diff`.
    Diff,
}
1448
/// Incremental serializer for untimed indexed W-sets into an [`FBuf`].
///
/// Every pushed weight, value and key is serialized into `fbuf`, and its
/// offset is recorded in `offsets`. The offset table has the layout
/// `[n_keys, n_values, (diff, val)+, SEPARATOR, key, (diff, val)+, SEPARATOR, key, ...]`.
/// `done` serializes the table as the final (root) value.
pub struct IndexedWSetSerializer {
    /// Output buffer: serialized weights, values and keys, then the offset table.
    fbuf: FBuf,
    /// Offset table; slots 0 and 1 are placeholders for the key and value counts.
    offsets: Vec<usize>,
    /// Number of keys pushed so far.
    n_keys: usize,
    /// Number of values pushed so far.
    n_values: usize,
    /// Last call made, checked only in debug builds.
    #[cfg(debug_assertions)]
    state: State,
}
1457
impl IndexedWSetSerializer {
    /// Creates a serializer with offset-table room for about
    /// `estimated_keys` keys and `estimated_values` values.
    pub fn with_capacity(estimated_keys: usize, estimated_values: usize) -> Self {
        // 2 header slots, 2 slots per key (SEPARATOR + key offset) and
        // 2 per value (diff offset + value offset).
        let mut offsets = Vec::with_capacity(2 + 2 * estimated_keys + 2 * estimated_values);
        // Header placeholders, overwritten with the counts in `done`.
        offsets.push(0);
        offsets.push(0);
        Self {
            fbuf: FBuf::new(),
            offsets,
            n_keys: 0,
            n_values: 0,
            #[cfg(debug_assertions)]
            state: State::Key,
        }
    }

    /// Serializes the weight of the next value and records its offset.
    ///
    /// Must not directly follow another `push_diff` (checked in debug builds).
    ///
    /// # Panics
    ///
    /// Panics if serialization fails.
    pub fn push_diff<R: WeightTrait + ?Sized>(
        &mut self,
        weight: &R,
        serializer_inner: &mut SerializerInner,
    ) {
        #[cfg(debug_assertions)]
        {
            debug_assert_ne!(self.state, State::Diff);
            self.state = State::Diff;
        }

        serializer_inner.with(FBufSerializer::new(&mut self.fbuf), |s| {
            self.offsets.push(weight.serialize(s).unwrap())
        });
    }

    /// Serializes a value (whose weight was just pushed) and records its offset.
    ///
    /// Must directly follow `push_diff` (checked in debug builds).
    ///
    /// # Panics
    ///
    /// Panics if serialization fails.
    pub fn push_val<V: DataTrait + ?Sized>(
        &mut self,
        val: &V,
        serializer_inner: &mut SerializerInner,
    ) {
        #[cfg(debug_assertions)]
        {
            debug_assert_eq!(self.state, State::Diff);
            self.state = State::Val;
        }

        self.n_values += 1;
        serializer_inner.with(FBufSerializer::new(&mut self.fbuf), |s| {
            self.offsets.push(val.serialize(s).unwrap())
        });
    }

    /// Ends the current key's values with `SEPARATOR`, then serializes the
    /// key and records its offset.
    ///
    /// Must directly follow `push_val` (checked in debug builds), so every
    /// key needs at least one value.
    ///
    /// # Panics
    ///
    /// Panics if serialization fails.
    pub fn push_key<K: DataTrait + ?Sized>(
        &mut self,
        key: &K,
        serializer_inner: &mut SerializerInner,
    ) {
        #[cfg(debug_assertions)]
        {
            debug_assert_eq!(self.state, State::Val);
            self.state = State::Key;
        }

        self.offsets.push(SEPARATOR as usize);
        self.n_keys += 1;
        serializer_inner.with(FBufSerializer::new(&mut self.fbuf), |s| {
            self.offsets.push(key.serialize(s).unwrap())
        });
    }

    /// Fills in the key/value counts, serializes the offset table as the
    /// final (root) value, and returns the buffer.
    ///
    /// Must follow a `push_key`, or be called before anything was pushed
    /// (checked in debug builds).
    ///
    /// # Panics
    ///
    /// Panics if serialization fails.
    pub fn done(mut self, serializer_inner: &mut SerializerInner) -> FBuf {
        #[cfg(debug_assertions)]
        debug_assert_eq!(self.state, State::Key);
        self.offsets[0] = self.n_keys;
        self.offsets[1] = self.n_values;
        serializer_inner.with(FBufSerializer::new(&mut self.fbuf), |s| {
            s.serialize_value(&self.offsets).unwrap()
        });
        self.fbuf
    }
}
1535
1536pub fn serialize_indexed_wset<B, K, V, R>(batch: &B, serializer_inner: &mut SerializerInner) -> FBuf
1537where
1538 B: BatchReader<Key = K, Val = V, Time = (), R = R>,
1539 K: DataTrait + ?Sized,
1540 V: DataTrait + ?Sized,
1541 R: WeightTrait + ?Sized,
1542{
1543 let mut serializer = IndexedWSetSerializer::with_capacity(batch.key_count(), batch.len());
1544 let mut cursor = batch.cursor();
1545
1546 while cursor.key_valid() {
1547 while cursor.val_valid() {
1548 serializer.push_diff(cursor.weight(), serializer_inner);
1549 serializer.push_val(cursor.val(), serializer_inner);
1550 cursor.step_val();
1551 }
1552 serializer.push_key(cursor.key(), serializer_inner);
1553 cursor.step_key();
1554 }
1555 serializer.done(serializer_inner)
1556}
1557
1558pub fn deserialize_indexed_wset<B, K, V, R>(factories: &B::Factories, data: &[u8]) -> B
1559where
1560 B: Batch<Key = K, Val = V, Time = (), R = R>,
1561 K: DataTrait + ?Sized,
1562 V: DataTrait + ?Sized,
1563 R: WeightTrait + ?Sized,
1564{
1565 let offsets = unsafe { archived_root::<Vec<usize>>(data) };
1566 let n_keys = offsets[0] as usize;
1567 let n_values = offsets[1] as usize;
1568
1569 let mut builder = B::Builder::with_capacity(factories, n_keys, n_values);
1570 let mut key = factories.key_factory().default_box();
1571 let mut val = factories.val_factory().default_box();
1572 let mut diff = factories.weight_factory().default_box();
1573
1574 let mut current_offset = 2;
1575
1576 while current_offset < offsets.len() {
1577 while offsets[current_offset] != SEPARATOR {
1578 unsafe { diff.deserialize_from_bytes(data, offsets[current_offset] as usize) };
1579 current_offset += 1;
1580 unsafe { val.deserialize_from_bytes(data, offsets[current_offset] as usize) };
1581 current_offset += 1;
1582
1583 builder.push_val_diff(&val, &diff);
1584 }
1585 current_offset += 1;
1586
1587 unsafe { key.deserialize_from_bytes(data, offsets[current_offset] as usize) };
1588 current_offset += 1;
1589
1590 builder.push_key(&key);
1591 }
1592 builder.done()
1593}
1594
#[cfg(test)]
mod serialize_test {
    use crate::{
        DynZWeight, OrdIndexedZSet,
        algebra::OrdIndexedZSet as DynOrdIndexedZSet,
        dynamic::DynData,
        indexed_zset,
        storage::file::SerializerInner,
        trace::{BatchReader, deserialize_indexed_wset, serialize_indexed_wset},
    };

    /// Serializes `zset` with `serialize_indexed_wset`, deserializes the
    /// bytes, and asserts that the result equals the input.
    fn assert_round_trip(zset: &DynOrdIndexedZSet<DynData, DynData>) {
        let bytes = serialize_indexed_wset(zset, &mut SerializerInner::new());
        let restored = deserialize_indexed_wset::<
            DynOrdIndexedZSet<DynData, DynData>,
            DynData,
            DynData,
            DynZWeight,
        >(&zset.factories(), &bytes);

        assert_eq!(zset, &restored);
    }

    #[test]
    fn test_serialize_indexed_wset() {
        let empty: OrdIndexedZSet<u64, u64> = indexed_zset! {};
        let single = indexed_zset! { 1 => { 1 => 1 } };
        let several =
            indexed_zset! { 1 => { 1 => 1, 2 => 2, 3 => 3 }, 2 => { 1 => 1, 2 => 2, 3 => 3 } };

        for zset in [empty, single, several] {
            assert_round_trip(&zset);
        }
    }

    #[test]
    fn test_serialize_indexed_wset_tup0_key() {
        let empty: OrdIndexedZSet<(), u64> = indexed_zset! {};
        let single = indexed_zset! { () => { 1 => 1 } };

        for zset in [empty, single] {
            assert_round_trip(&zset);
        }
    }

    #[test]
    fn test_serialize_indexed_wset_tup0_val() {
        let empty: OrdIndexedZSet<u64, ()> = indexed_zset! {};
        let single = indexed_zset! { 1 => { () => 1 } };
        let two_keys = indexed_zset! { 1 => { () => 1 }, 2 => { () => 1 } };

        for zset in [empty, single, two_keys] {
            assert_round_trip(&zset);
        }
    }
}