1use crate::circuit::metadata::OperatorMeta;
30use crate::dynamic::{ClonableTrait, DynDataTyped, DynUnit, Weight};
31use crate::storage::buffer_cache::CacheStats;
32pub use crate::storage::file::{Deserializable, Deserializer, Rkyv, Serializer};
33use crate::trace::cursor::{
34 DefaultPushCursor, FilteredMergeCursor, FilteredMergeCursorWithSnapshot, PushCursor,
35 UnfilteredMergeCursor,
36};
37use crate::utils::IsNone;
38use crate::{dynamic::ArchivedDBData, storage::buffer_cache::FBuf};
39use cursor::CursorFactory;
40use enum_map::Enum;
41use feldera_storage::{FileCommitter, FileReader, StoragePath};
42use rand::{Rng, thread_rng};
43use rkyv::ser::Serializer as _;
44use size_of::SizeOf;
45use std::any::TypeId;
46use std::sync::Arc;
47use std::{fmt::Debug, hash::Hash};
48
49pub mod cursor;
50pub mod filter;
51pub mod layers;
52pub mod ord;
53pub mod spine_async;
54pub use spine_async::{
55 BatchReaderWithSnapshot, ListMerger, MergerType, Spine, SpineSnapshot, WithSnapshot,
56};
57
58#[cfg(test)]
59pub mod test;
60
61pub use ord::{
62 FallbackIndexedWSet, FallbackIndexedWSetBuilder, FallbackIndexedWSetFactories,
63 FallbackKeyBatch, FallbackKeyBatchFactories, FallbackValBatch, FallbackValBatchFactories,
64 FallbackWSet, FallbackWSetBuilder, FallbackWSetFactories, FileIndexedWSet,
65 FileIndexedWSetFactories, FileKeyBatch, FileKeyBatchFactories, FileValBatch,
66 FileValBatchFactories, FileWSet, FileWSetFactories, OrdIndexedWSet, OrdIndexedWSetBuilder,
67 OrdIndexedWSetFactories, OrdKeyBatch, OrdKeyBatchFactories, OrdValBatch, OrdValBatchFactories,
68 OrdWSet, OrdWSetBuilder, OrdWSetFactories, VecIndexedWSet, VecIndexedWSetFactories,
69 VecKeyBatch, VecKeyBatchFactories, VecValBatch, VecValBatchFactories, VecWSet,
70 VecWSetFactories,
71};
72
73use rkyv::{Deserialize, archived_root};
74
75use crate::storage::tracking_bloom_filter::BloomFilterStats;
76use crate::{
77 Error, NumEntries, Timestamp,
78 algebra::MonoidValue,
79 dynamic::{DataTrait, DynPair, DynVec, DynWeightedPairs, Erase, Factory, WeightTrait},
80 storage::file::reader::Error as ReaderError,
81};
82pub use cursor::{Cursor, MergeCursor};
83pub use filter::{Filter, GroupFilter};
84pub use layers::Trie;
85
/// Trait bound for data types that can be stored in batches and traces.
///
/// This is a pure bound alias: the blanket impl below makes every type that
/// satisfies the listed bounds automatically `DBData`. Types must be
/// ordered, hashable, sizeable, thread-safe, archivable (`ArchivedDBData`),
/// and expose an optional form via `IsNone`.
pub trait DBData:
    Default
    + Clone
    + Eq
    + Ord
    + Hash
    + SizeOf
    + Send
    + Sync
    + Debug
    + ArchivedDBData
    + IsNone<Inner: ArchivedDBData>
    + 'static
{
}
108
// Blanket implementation: `DBData` carries no methods of its own, so any type
// meeting the bounds is `DBData` without an explicit opt-in.
impl<T> DBData for T where
    T: Default
        + Clone
        + Eq
        + Ord
        + Hash
        + SizeOf
        + Send
        + Sync
        + Debug
        + ArchivedDBData
        + IsNone<Inner: ArchivedDBData>
        + 'static
{
}
125
/// Serialized metadata describing a committed spine checkpoint.
#[derive(rkyv::Serialize, rkyv::Deserialize, rkyv::Archive)]
pub(crate) struct CommittedSpine {
    // Identifiers/paths of the batches that make up the spine.
    pub batches: Vec<String>,
    // Pairs of batch identifiers recorded as merged.
    // NOTE(review): exact semantics of the pair elements are not visible in
    // this file — confirm against the spine commit/restore code.
    pub merged: Vec<(String, String)>,
    // Accumulated merge effort at commit time (see `Trace::exert`).
    pub effort: u64,
    // Dirty flag at commit time (see `Trace::dirty`).
    pub dirty: bool,
}
134
135pub fn unaligned_deserialize<T: Deserializable>(bytes: &[u8]) -> T {
142 let mut aligned_bytes = FBuf::new();
143 aligned_bytes.extend_from_slice(bytes);
144 unsafe { archived_root::<T>(&aligned_bytes[..]) }
145 .deserialize(&mut Deserializer::default())
146 .unwrap()
147}
148
/// Trait bound for weight types: `DBData` that additionally forms a monoid
/// (`MonoidValue`). Blanket-implemented for all qualifying types.
pub trait DBWeight: DBData + MonoidValue {}
impl<T> DBWeight for T where T: DBData + MonoidValue {}
169
/// Factories for the dynamically typed components of a [`BatchReader`].
///
/// Instances are created from concrete Rust types via [`Self::new`] and then
/// hand out `'static` factories for erased keys, values, and weights.
pub trait BatchReaderFactories<
    K: DataTrait + ?Sized,
    V: DataTrait + ?Sized,
    T,
    R: WeightTrait + ?Sized,
>: Clone + Send + Sync
{
    /// Creates factories from the concrete key, value, and weight types that
    /// erase to `K`, `V`, and `R` respectively.
    fn new<KType, VType, RType>() -> Self
    where
        KType: DBData + Erase<K>,
        VType: DBData + Erase<V>,
        RType: DBWeight + Erase<R>;

    /// Factory for individual keys.
    fn key_factory(&self) -> &'static dyn Factory<K>;
    /// Factory for vectors of keys.
    fn keys_factory(&self) -> &'static dyn Factory<DynVec<K>>;
    /// Factory for individual values.
    fn val_factory(&self) -> &'static dyn Factory<V>;
    /// Factory for individual weights.
    fn weight_factory(&self) -> &'static dyn Factory<R>;
}
189
/// A `(key, value)` pair bundled with its weight, as produced by batchers and
/// consumed by builders.
pub type WeightedItem<K, V, R> = DynPair<DynPair<K, V>, R>;

/// Factories for batch types that can be constructed, not just read.
///
/// Extends [`BatchReaderFactories`] with factories for the intermediate
/// weighted-tuple representations used while building batches.
pub trait BatchFactories<K: DataTrait + ?Sized, V: DataTrait + ?Sized, T, R: WeightTrait + ?Sized>:
    BatchReaderFactories<K, V, T, R>
{
    /// Factory for `(key, value)` pairs.
    fn item_factory(&self) -> &'static dyn Factory<DynPair<K, V>>;

    /// Factory for vectors of weighted `(key, value)` pairs.
    fn weighted_items_factory(&self) -> &'static dyn Factory<DynWeightedPairs<DynPair<K, V>, R>>;
    /// Factory for vectors of weighted values.
    fn weighted_vals_factory(&self) -> &'static dyn Factory<DynWeightedPairs<V, R>>;
    /// Factory for a single weighted `(key, value)` pair.
    fn weighted_item_factory(&self) -> &'static dyn Factory<WeightedItem<K, V, R>>;

    /// Factory for `(time, weight)` vectors, or `None` if this batch type
    /// does not store per-tuple timestamps.
    fn time_diffs_factory(
        &self,
    ) -> Option<&'static dyn Factory<DynWeightedPairs<DynDataTyped<T>, R>>>;
}
207
/// A mutable accumulation of batches, queryable through the [`BatchReader`]
/// interface it inherits.
pub trait Trace: BatchReader {
    /// The batch type stored (and produced) by this trace.
    type Batch: Batch<
        Key = Self::Key,
        Val = Self::Val,
        Time = Self::Time,
        R = Self::R,
        Factories = Self::Factories,
    >;

    /// Creates an empty trace.
    fn new(factories: &Self::Factories) -> Self;

    /// Advances the trace's frontier.
    /// NOTE(review): exact consolidation semantics are not visible in this
    /// file — confirm against implementations.
    fn set_frontier(&mut self, frontier: &Self::Time);

    /// Performs background maintenance (merging) work.
    /// NOTE(review): contract of the `effort` in/out parameter inferred from
    /// the name — confirm with implementations.
    fn exert(&mut self, effort: &mut isize);

    /// Merges all contained batches into a single batch, or returns `None`
    /// if the trace holds no updates.
    fn consolidate(self) -> Option<Self::Batch>;

    /// Adds a batch of updates to the trace.
    fn insert(&mut self, batch: Self::Batch);

    /// Adds a shared batch of updates to the trace.
    fn insert_arc(&mut self, batch: Arc<Self::Batch>);

    /// Clears the dirty flag set by insertions.
    fn clear_dirty_flag(&mut self);

    /// Returns `true` if the trace was modified since the dirty flag was
    /// last cleared.
    fn dirty(&self) -> bool;

    /// Installs a key retention filter; keys rejected by `filter` may be
    /// discarded during subsequent merges (see `BatchReader::merge_cursor`).
    fn retain_keys(&mut self, filter: Filter<Self::Key>);

    /// Installs a value retention filter; values rejected by `filter` may be
    /// discarded during subsequent merges.
    fn retain_values(&mut self, filter: GroupFilter<Self::Val>);

    /// The currently installed key filter, if any.
    fn key_filter(&self) -> &Option<Filter<Self::Key>>;
    /// The currently installed value filter, if any.
    fn value_filter(&self) -> &Option<GroupFilter<Self::Val>>;

    /// Persists the trace under `base`/`pid`, appending the files it wrote
    /// to `files` so the caller can commit them later.
    fn save(
        &mut self,
        base: &StoragePath,
        pid: &str,
        files: &mut Vec<Arc<dyn FileCommitter>>,
    ) -> Result<(), Error>;

    /// Restores a trace previously persisted under `base`/`pid`.
    fn restore(&mut self, base: &StoragePath, pid: &str) -> Result<(), Error>;

    /// Contributes operator metadata; the default reports nothing.
    fn metadata(&self, _meta: &mut OperatorMeta) {}
}
310
/// Where a batch's data physically resides.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Enum)]
pub enum BatchLocation {
    /// Batch data is held in memory.
    Memory,

    /// Batch data is held on storage.
    Storage,
}
320
/// A read-only, ordered collection of weighted `(key, value, time)` updates.
///
/// Both individual batches and whole traces implement this trait; it is the
/// common query interface, traversed with [`Cursor`]s.
pub trait BatchReader: Debug + NumEntries + Rkyv + SizeOf + 'static
where
    Self: Sized,
{
    /// Factories for this batch type's dynamically typed components.
    type Factories: BatchFactories<Self::Key, Self::Val, Self::Time, Self::R>;

    /// Key type.
    type Key: DataTrait + ?Sized;

    /// Value type.
    type Val: DataTrait + ?Sized;

    /// Timestamp type.
    type Time: Timestamp;

    /// Weight type.
    type R: WeightTrait + ?Sized;

    /// Cursor type used to traverse this batch.
    type Cursor<'s>: Cursor<Self::Key, Self::Val, Self::Time, Self::R> + Clone + Send
    where
        Self: 's;

    /// Returns the factories associated with this batch.
    fn factories(&self) -> Self::Factories;

    /// Creates a cursor positioned at the start of the batch.
    fn cursor(&self) -> Self::Cursor<'_>;

    /// Creates a push-based cursor; the default wraps the ordinary cursor in
    /// [`DefaultPushCursor`].
    fn push_cursor(
        &self,
    ) -> Box<dyn PushCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + '_> {
        Box::new(DefaultPushCursor::new(self.cursor()))
    }

    /// Creates a merge cursor, applying the optional key and value filters
    /// while iterating.
    ///
    /// NOTE(review): a value filter that is `Some` but not
    /// `GroupFilter::Simple` falls through to the final branch and is passed
    /// as `None`, i.e. it is not applied here; only
    /// `merge_cursor_with_snapshot` handles that case. Confirm this is
    /// intentional.
    fn merge_cursor(
        &self,
        key_filter: Option<Filter<Self::Key>>,
        value_filter: Option<GroupFilter<Self::Val>>,
    ) -> Box<dyn MergeCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + '_> {
        if key_filter.is_none() && value_filter.is_none() {
            Box::new(UnfilteredMergeCursor::new(self.cursor()))
        } else if let Some(GroupFilter::Simple(filter)) = value_filter {
            Box::new(FilteredMergeCursor::new(
                self.cursor(),
                key_filter,
                Some(filter),
            ))
        } else {
            Box::new(FilteredMergeCursor::new(self.cursor(), key_filter, None))
        }
    }

    /// Like [`Self::merge_cursor`], but non-`Simple` group filters are
    /// evaluated against `snapshot` via `FilteredMergeCursorWithSnapshot`.
    /// Falls back to `merge_cursor` when `snapshot` is `None`.
    fn merge_cursor_with_snapshot<'a, S>(
        &'a self,
        key_filter: Option<Filter<Self::Key>>,
        value_filter: Option<GroupFilter<Self::Val>>,
        snapshot: &'a Option<Arc<S>>,
    ) -> Box<dyn MergeCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + 'a>
    where
        S: BatchReader<Key = Self::Key, Val = Self::Val, Time = Self::Time, R = Self::R>,
    {
        let Some(snapshot) = snapshot else {
            return self.merge_cursor(key_filter, value_filter);
        };
        if key_filter.is_none() && value_filter.is_none() {
            Box::new(UnfilteredMergeCursor::new(self.cursor()))
        } else if value_filter.is_none() {
            Box::new(FilteredMergeCursor::new(self.cursor(), key_filter, None))
        } else if let Some(GroupFilter::Simple(filter)) = value_filter {
            Box::new(FilteredMergeCursor::new(
                self.cursor(),
                key_filter,
                Some(filter),
            ))
        } else {
            // Non-`Simple` group filter: needs the snapshot to evaluate.
            // `unwrap()` is safe: the `is_none` branch above was not taken.
            Box::new(FilteredMergeCursorWithSnapshot::new(
                self.cursor(),
                key_filter,
                value_filter.unwrap(),
                snapshot,
            ))
        }
    }

    /// Merge cursor that may consume/destroy the batch as it goes; the
    /// default simply delegates to the non-consuming [`Self::merge_cursor`].
    fn consuming_cursor(
        &mut self,
        key_filter: Option<Filter<Self::Key>>,
        value_filter: Option<GroupFilter<Self::Val>>,
    ) -> Box<dyn MergeCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + '_> {
        self.merge_cursor(key_filter, value_filter)
    }
    /// Number of distinct keys in the batch.
    fn key_count(&self) -> usize;

    /// Number of updates (tuples) in the batch.
    fn len(&self) -> usize;

    /// Approximate size of the batch in bytes.
    fn approximate_byte_size(&self) -> usize;

    /// Statistics about this batch's Bloom filter.
    fn filter_stats(&self) -> BloomFilterStats;

    /// Where this batch's data resides; defaults to memory.
    fn location(&self) -> BatchLocation {
        BatchLocation::Memory
    }

    /// Cache statistics for this batch; defaults to none.
    fn cache_stats(&self) -> CacheStats {
        CacheStats::default()
    }

    /// Returns `true` if the batch holds no updates.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Bloom-filter style membership probe by key hash. `true` means the key
    /// *may* be present; the default (no filter) always answers `true`.
    fn maybe_contains_key(&self, _hash: u64) -> bool {
        true
    }

    /// Samples up to `sample_size` keys from the batch into `sample`.
    fn sample_keys<RG>(&self, rng: &mut RG, sample_size: usize, sample: &mut DynVec<Self::Key>)
    where
        Self::Time: PartialEq<()>,
        RG: Rng;

    /// Estimates `num_partitions - 1` key boundaries that split the batch
    /// into roughly equal partitions, based on a random key sample.
    fn partition_keys(&self, num_partitions: usize, bounds: &mut DynVec<Self::Key>)
    where
        Self::Time: PartialEq<()>,
    {
        bounds.clear();
        if num_partitions <= 1 {
            return;
        }

        // Oversample quadratically to reduce boundary variance.
        let sample_size = num_partitions * num_partitions;

        let mut sample = self.factories().keys_factory().default_box();
        self.sample_keys(&mut thread_rng(), sample_size, sample.as_mut());

        let sample_len = sample.len();
        if sample_len == 0 {
            return;
        }

        if sample_len >= num_partitions {
            // Pick evenly spaced sample elements as boundaries.
            for i in 0..num_partitions - 1 {
                let idx = ((i + 1) * sample_len) / num_partitions;
                let idx = idx.min(sample_len - 1);
                bounds.push_ref(sample.index(idx));
            }
        } else {
            // Fewer samples than partitions: use every sample as a boundary.
            for i in 0..sample_len {
                bounds.push_ref(sample.index(i));
            }
        }
    }

    /// Asynchronously pre-fetches the data for the given `keys`, returning a
    /// cursor factory over them, or `None` if unsupported (the default).
    #[allow(async_fn_in_trait)]
    async fn fetch<B>(
        &self,
        keys: &B,
    ) -> Option<Box<dyn CursorFactory<Self::Key, Self::Val, Self::Time, Self::R>>>
    where
        B: BatchReader<Key = Self::Key, Time = ()>,
    {
        let _ = keys;
        None
    }

    /// Direct access to the batch's key vector, if the representation
    /// supports it; `None` by default.
    fn keys(&self) -> Option<&DynVec<Self::Key>> {
        None
    }
}
625
/// A shared batch is itself a batch: every method delegates to the inner `B`.
impl<B> BatchReader for Arc<B>
where
    B: BatchReader,
{
    type Factories = B::Factories;
    type Key = B::Key;
    type Val = B::Val;
    type Time = B::Time;
    type R = B::R;
    type Cursor<'s> = B::Cursor<'s>;
    fn factories(&self) -> Self::Factories {
        (**self).factories()
    }
    fn cursor(&self) -> Self::Cursor<'_> {
        (**self).cursor()
    }
    fn merge_cursor(
        &self,
        key_filter: Option<Filter<Self::Key>>,
        value_filter: Option<GroupFilter<Self::Val>>,
    ) -> Box<dyn MergeCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + '_> {
        (**self).merge_cursor(key_filter, value_filter)
    }
    fn key_count(&self) -> usize {
        (**self).key_count()
    }
    fn len(&self) -> usize {
        (**self).len()
    }
    fn approximate_byte_size(&self) -> usize {
        (**self).approximate_byte_size()
    }
    fn filter_stats(&self) -> BloomFilterStats {
        (**self).filter_stats()
    }
    fn location(&self) -> BatchLocation {
        (**self).location()
    }
    fn cache_stats(&self) -> CacheStats {
        (**self).cache_stats()
    }
    fn is_empty(&self) -> bool {
        (**self).is_empty()
    }
    fn maybe_contains_key(&self, hash: u64) -> bool {
        (**self).maybe_contains_key(hash)
    }
    fn sample_keys<RG>(&self, rng: &mut RG, sample_size: usize, sample: &mut DynVec<Self::Key>)
    where
        Self::Time: PartialEq<()>,
        RG: Rng,
    {
        (**self).sample_keys(rng, sample_size, sample)
    }
    fn consuming_cursor(
        &mut self,
        key_filter: Option<Filter<Self::Key>>,
        value_filter: Option<GroupFilter<Self::Val>>,
    ) -> Box<dyn MergeCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + '_> {
        // The contents of a (possibly shared) `Arc` cannot be consumed, so
        // delegate to the non-consuming merge cursor instead.
        (**self).merge_cursor(key_filter, value_filter)
    }
    async fn fetch<KB>(
        &self,
        keys: &KB,
    ) -> Option<Box<dyn CursorFactory<Self::Key, Self::Val, Self::Time, Self::R>>>
    where
        KB: BatchReader<Key = Self::Key, Time = ()>,
    {
        (**self).fetch(keys).await
    }
    fn keys(&self) -> Option<&DynVec<Self::Key>> {
        (**self).keys()
    }
}
700
/// An immutable batch of updates that, unlike a plain [`BatchReader`], can
/// also be constructed — via its [`Batcher`] (unordered input) or
/// [`Builder`] (ordered input).
pub trait Batch: BatchReader + Clone + Send + Sync
where
    Self: Sized,
{
    /// The same batch shape re-parameterized with timestamp type `T`.
    type Timed<T: Timestamp>: Batch<
        Key = <Self as BatchReader>::Key,
        Val = <Self as BatchReader>::Val,
        Time = T,
        R = <Self as BatchReader>::R,
    >;

    /// Batcher for assembling this batch from unordered tuples.
    type Batcher: Batcher<Self>;

    /// Builder for assembling this batch from ordered tuples.
    type Builder: Builder<Self>;

    /// Builds a batch from weighted `(key, value)` tuples, all stamped with
    /// `time`. The tuples need not be sorted or consolidated.
    #[allow(clippy::type_complexity)]
    fn dyn_from_tuples(
        factories: &Self::Factories,
        time: Self::Time,
        tuples: &mut Box<DynWeightedPairs<DynPair<Self::Key, Self::Val>, Self::R>>,
    ) -> Self {
        let mut batcher = Self::Batcher::new_batcher(factories, time);
        batcher.push_batch(tuples);
        batcher.seal()
    }

    /// Converts a batch with unit timestamps into this batch type, stamping
    /// every tuple with `timestamp`. When the source already has this exact
    /// type, it is cloned instead of copied tuple by tuple.
    fn from_batch<BI>(batch: &BI, timestamp: &Self::Time, factories: &Self::Factories) -> Self
    where
        BI: BatchReader<Key = Self::Key, Val = Self::Val, Time = (), R = Self::R>,
    {
        if TypeId::of::<BI>() == TypeId::of::<Self>() {
            // SAFETY: the `TypeId` check proves `BI` and `Self` are the same
            // type, so this is a reference cast between identical types.
            unsafe { std::mem::transmute::<&BI, &Self>(batch).clone() }
        } else {
            Self::from_cursor(
                batch.cursor(),
                timestamp,
                factories,
                batch.key_count(),
                batch.len(),
            )
        }
    }

    /// `Arc` variant of [`Self::from_batch`]: the fast path clones only the
    /// `Arc`, not the underlying batch.
    fn from_arc_batch<BI>(
        batch: &Arc<BI>,
        timestamp: &Self::Time,
        factories: &Self::Factories,
    ) -> Arc<Self>
    where
        BI: BatchReader<Key = Self::Key, Val = Self::Val, Time = (), R = Self::R>,
    {
        if TypeId::of::<BI>() == TypeId::of::<Self>() {
            // SAFETY: the `TypeId` check proves `BI` and `Self` are the same
            // type, so `Arc<BI>` and `Arc<Self>` have identical layout.
            unsafe { std::mem::transmute::<&Arc<BI>, &Arc<Self>>(batch).clone() }
        } else {
            Arc::new(Self::from_cursor(
                batch.cursor(),
                timestamp,
                factories,
                batch.key_count(),
                batch.len(),
            ))
        }
    }

    /// Builds a batch by draining a unit-timestamp cursor, stamping each
    /// tuple with `timestamp`. Capacities are hints for the builder.
    fn from_cursor<C>(
        mut cursor: C,
        timestamp: &Self::Time,
        factories: &Self::Factories,
        key_capacity: usize,
        value_capacity: usize,
    ) -> Self
    where
        C: Cursor<Self::Key, Self::Val, (), Self::R>,
    {
        let mut builder = Self::Builder::with_capacity(factories, key_capacity, value_capacity);
        while cursor.key_valid() {
            let mut any_values = false;
            while cursor.val_valid() {
                let weight = cursor.weight();
                debug_assert!(!weight.is_zero());
                builder.push_time_diff(timestamp, weight);
                builder.push_val(cursor.val());
                any_values = true;
                cursor.step_val();
            }
            // The key is only emitted if at least one value survived.
            if any_values {
                builder.push_key(cursor.key());
            }
            cursor.step_key();
        }
        builder.done()
    }

    /// Creates an empty batch.
    fn dyn_empty(factories: &Self::Factories) -> Self {
        Self::Builder::new_builder(factories).done()
    }

    /// Returns a copy of the batch keeping only `(key, value)` pairs accepted
    /// by `predicate`.
    fn filter(&self, predicate: &dyn Fn(&Self::Key, &Self::Val) -> bool) -> Self
    where
        Self::Time: PartialEq<()> + From<()>,
    {
        let factories = self.factories();
        let mut builder = Self::Builder::new_builder(&factories);
        let mut cursor = self.cursor();

        while cursor.key_valid() {
            let mut any_values = false;
            while cursor.val_valid() {
                if predicate(cursor.key(), cursor.val()) {
                    builder.push_diff(cursor.weight());
                    builder.push_val(cursor.val());
                    any_values = true;
                }
                cursor.step_val();
            }
            // Drop keys whose values were all filtered out.
            if any_values {
                builder.push_key(cursor.key());
            }
            cursor.step_key();
        }

        builder.done()
    }

    /// Returns a persisted (on-storage) copy of this batch, or `None` if the
    /// batch type does not support persistence (the default).
    fn persisted(&self) -> Option<Self> {
        None
    }

    /// Returns the underlying storage file reader, if this batch is backed
    /// by a file; `None` by default.
    fn file_reader(&self) -> Option<Arc<dyn FileReader>> {
        None
    }

    /// Loads a batch from a storage path; unsupported by default.
    fn from_path(_factories: &Self::Factories, _path: &StoragePath) -> Result<Self, ReaderError> {
        Err(ReaderError::Unsupported)
    }
}
879
/// Accumulates unordered weighted tuples and seals them into a batch.
pub trait Batcher<Output>: SizeOf
where
    Output: Batch,
{
    /// Creates a batcher; all tuples it receives are stamped with `time`.
    fn new_batcher(vtables: &Output::Factories, time: Output::Time) -> Self;

    /// Adds a vector of weighted tuples (not necessarily sorted or
    /// consolidated); the vector's contents may be consumed.
    fn push_batch(
        &mut self,
        batch: &mut Box<DynWeightedPairs<DynPair<Output::Key, Output::Val>, Output::R>>,
    );

    /// Adds a vector of tuples that is already consolidated, allowing the
    /// batcher to skip re-consolidation work.
    fn push_consolidated_batch(
        &mut self,
        batch: &mut Box<DynWeightedPairs<DynPair<Output::Key, Output::Val>, Output::R>>,
    );

    /// Number of tuples accumulated so far.
    fn tuples(&self) -> usize;

    /// Consumes the batcher and produces the finished batch.
    fn seal(self) -> Output;
}
910
/// Assembles a batch from tuples supplied in order.
///
/// Tuples are pushed bottom-up: first the `(time, weight)` pairs for a
/// value via `push_time_diff`, then the value via `push_val`, and — after
/// all of a key's values — the key itself via `push_key` (see
/// `Batch::from_cursor` for the canonical usage pattern).
pub trait Builder<Output>: SizeOf
where
    Self: Sized,
    Output: Batch,
{
    /// Creates an empty builder.
    fn new_builder(factories: &Output::Factories) -> Self {
        Self::with_capacity(factories, 0, 0)
    }

    /// Creates a builder pre-sized for `key_capacity` keys and
    /// `value_capacity` tuples.
    fn with_capacity(
        factories: &Output::Factories,
        key_capacity: usize,
        value_capacity: usize,
    ) -> Self;

    /// Creates a builder sized to hold the merge of `batches`.
    /// `location` is a hint for where the output should live; the default
    /// implementation ignores it.
    fn for_merge<'a, B, I>(
        factories: &Output::Factories,
        batches: I,
        location: Option<BatchLocation>,
    ) -> Self
    where
        B: BatchReader,
        I: IntoIterator<Item = &'a B> + Clone,
    {
        let _ = location;
        let key_capacity = batches.clone().into_iter().map(|b| b.key_count()).sum();
        let value_capacity = batches.into_iter().map(|b| b.len()).sum();
        Self::with_capacity(factories, key_capacity, value_capacity)
    }

    /// Appends a `(time, weight)` pair for the value pushed next.
    fn push_time_diff(&mut self, time: &Output::Time, weight: &Output::R);

    /// By-`&mut` variant of [`Self::push_time_diff`]; implementations may
    /// move out of the arguments. Defaults to the by-ref version.
    fn push_time_diff_mut(&mut self, time: &mut Output::Time, weight: &mut Output::R) {
        self.push_time_diff(time, weight);
    }

    /// Appends a value (after its time/weight pairs).
    fn push_val(&mut self, val: &Output::Val);

    /// By-`&mut` variant of [`Self::push_val`].
    fn push_val_mut(&mut self, val: &mut Output::Val) {
        self.push_val(val);
    }

    /// Appends a key (after all of its values).
    fn push_key(&mut self, key: &Output::Key);

    /// By-`&mut` variant of [`Self::push_key`].
    fn push_key_mut(&mut self, key: &mut Output::Key) {
        self.push_key(key);
    }

    /// Appends a weight with the default (unit-like) timestamp.
    fn push_diff(&mut self, weight: &Output::R)
    where
        Output::Time: PartialEq<()>,
    {
        self.push_time_diff(&Output::Time::default(), weight);
    }

    /// By-`&mut` variant of [`Self::push_diff`].
    fn push_diff_mut(&mut self, weight: &mut Output::R)
    where
        Output::Time: PartialEq<()>,
    {
        self.push_diff(weight);
    }

    /// Appends a value together with its weight at the default timestamp.
    fn push_val_diff(&mut self, val: &Output::Val, weight: &Output::R)
    where
        Output::Time: PartialEq<()>,
    {
        self.push_time_diff(&Output::Time::default(), weight);
        self.push_val(val);
    }

    /// By-`&mut` variant of [`Self::push_val_diff`].
    fn push_val_diff_mut(&mut self, val: &mut Output::Val, weight: &mut Output::R)
    where
        Output::Time: PartialEq<()>,
    {
        self.push_val_diff(val, weight);
    }

    /// Reserves room for `additional` tuples; a no-op by default.
    fn reserve(&mut self, additional: usize) {
        let _ = additional;
    }

    /// Number of keys pushed so far.
    fn num_keys(&self) -> usize;
    /// Number of tuples pushed so far.
    fn num_tuples(&self) -> usize;

    /// Consumes the builder and produces the finished batch.
    fn done(self) -> Output;
}
1064
/// Adapter over a [`Builder`] that accepts whole `(key, value, time, weight)`
/// tuples, buffering the most recent `(key, value)` pair so that consecutive
/// duplicates collapse into a single value/key push.
pub struct TupleBuilder<B, Output>
where
    B: Builder<Output>,
    Output: Batch,
{
    // Underlying ordered builder.
    builder: B,
    // Buffered most-recent `(key, value)` pair, valid only if `has_kv`.
    kv: Box<DynPair<Output::Key, Output::Val>>,
    // Whether `kv` currently holds a pending pair to be flushed.
    has_kv: bool,
}
1077
impl<B, Output> TupleBuilder<B, Output>
where
    B: Builder<Output>,
    Output: Batch,
{
    /// Wraps `builder`, allocating the `(key, value)` buffer from `factories`.
    pub fn new(factories: &Output::Factories, builder: B) -> Self {
        Self {
            builder,
            kv: factories.item_factory().default_box(),
            has_kv: false,
        }
    }

    /// Number of keys pushed to the underlying builder so far (does not
    /// include the buffered, unflushed pair).
    pub fn num_keys(&self) -> usize {
        self.builder.num_keys()
    }

    /// Number of tuples pushed to the underlying builder so far.
    pub fn num_tuples(&self) -> usize {
        self.builder.num_tuples()
    }

    /// Pushes one weighted `(key, value)` item at the default timestamp,
    /// consuming its contents.
    pub fn push(&mut self, element: &mut DynPair<DynPair<Output::Key, Output::Val>, Output::R>)
    where
        Output::Time: PartialEq<()>,
    {
        let (kv, w) = element.split_mut();
        let (k, v) = kv.split_mut();
        self.push_vals(k, v, &mut Output::Time::default(), w);
    }

    /// Pushes a tuple by reference.
    ///
    /// The `(key, value)` pair is buffered: a new key flushes the previous
    /// value and key; a new value under the same key flushes only the
    /// previous value. The `(time, weight)` pair is always pushed.
    pub fn push_refs(
        &mut self,
        key: &Output::Key,
        val: &Output::Val,
        time: &Output::Time,
        weight: &Output::R,
    ) {
        if self.has_kv {
            let (k, v) = self.kv.split_mut();
            if k != key {
                // Key changed: flush the buffered value and key, then buffer
                // the new pair.
                self.builder.push_val_mut(v);
                self.builder.push_key_mut(k);
                self.kv.from_refs(key, val);
            } else if v != val {
                // Same key, new value: flush the buffered value only.
                self.builder.push_val_mut(v);
                val.clone_to(v);
            }
        } else {
            self.has_kv = true;
            self.kv.from_refs(key, val);
        }
        self.builder.push_time_diff(time, weight);
    }

    /// Like [`Self::push_refs`], but consumes the arguments' contents
    /// (moving rather than cloning where possible).
    pub fn push_vals(
        &mut self,
        key: &mut Output::Key,
        val: &mut Output::Val,
        time: &mut Output::Time,
        weight: &mut Output::R,
    ) {
        if self.has_kv {
            let (k, v) = self.kv.split_mut();
            if k != key {
                self.builder.push_val_mut(v);
                self.builder.push_key_mut(k);
                self.kv.from_vals(key, val);
            } else if v != val {
                self.builder.push_val_mut(v);
                val.move_to(v);
            }
        } else {
            self.has_kv = true;
            self.kv.from_vals(key, val);
        }
        self.builder.push_time_diff_mut(time, weight);
    }

    /// Reserves room for `additional` tuples in the underlying builder.
    pub fn reserve(&mut self, additional: usize) {
        self.builder.reserve(additional)
    }

    /// Pushes every item from `iter` at the default timestamp, reserving
    /// capacity up front based on the iterator's size hint.
    pub fn extend<'a, I>(&mut self, iter: I)
    where
        Output::Time: PartialEq<()>,
        I: Iterator<Item = &'a mut WeightedItem<Output::Key, Output::Val, Output::R>>,
    {
        let (lower, upper) = iter.size_hint();
        self.reserve(upper.unwrap_or(lower));

        for item in iter {
            let (kv, w) = item.split_mut();
            let (k, v) = kv.split_mut();

            self.push_vals(k, v, &mut Output::Time::default(), w);
        }
    }

    /// Flushes the buffered `(key, value)` pair, if any, and finishes the
    /// underlying builder.
    pub fn done(mut self) -> Output {
        if self.has_kv {
            let (k, v) = self.kv.split_mut();
            self.builder.push_val_mut(v);
            self.builder.push_key_mut(k);
        }
        self.builder.done()
    }
}
1190
/// Merges a collection of batches into a single batch, applying the optional
/// key and value filters while merging. Consumes the input batches.
///
/// At most 64 batches are merged per `ListMerger` pass; when there are more,
/// each pass merges the last 64 and feeds the result back until one batch
/// remains. Returns an empty batch if all inputs are empty.
pub fn merge_batches<B, T>(
    factories: &B::Factories,
    batches: T,
    key_filter: &Option<Filter<B::Key>>,
    value_filter: &Option<GroupFilter<B::Val>>,
) -> B
where
    T: IntoIterator<Item = B>,
    B: Batch,
{
    // Empty batches contribute nothing; drop them up front.
    let mut batches = batches
        .into_iter()
        .filter(|b| !b.is_empty())
        .collect::<Vec<_>>();

    while batches.len() > 1 {
        // Take (up to) the last 64 batches and merge them into one.
        let mut inputs = batches.split_off(batches.len().saturating_sub(64));
        let result: B = ListMerger::merge(
            factories,
            B::Builder::for_merge(factories, &inputs, Some(BatchLocation::Memory)),
            inputs
                .iter_mut()
                .map(|b| b.consuming_cursor(key_filter.clone(), value_filter.clone()))
                .collect(),
        );
        // Filters or cancellation may leave the merge result empty.
        if !result.is_empty() {
            batches.push(result);
        }
    }

    batches.pop().unwrap_or_else(|| B::dyn_empty(factories))
}
1237
/// Like [`merge_batches`], but borrows the input batches instead of
/// consuming them.
///
/// Performs one pass of up-to-64-way merges over the borrowed inputs, then
/// delegates to [`merge_batches`] to combine the (owned) intermediate
/// results.
pub fn merge_batches_by_reference<'a, B, T>(
    factories: &B::Factories,
    batches: T,
    key_filter: &Option<Filter<B::Key>>,
    value_filter: &Option<GroupFilter<B::Val>>,
) -> B
where
    T: IntoIterator<Item = &'a B>,
    B: Batch,
{
    // Empty batches contribute nothing; drop them up front.
    let mut batches = batches
        .into_iter()
        .filter(|b| !b.is_empty())
        .collect::<Vec<_>>();

    let mut outputs = Vec::with_capacity(batches.len().div_ceil(64));
    while !batches.is_empty() {
        // Merge (up to) the last 64 borrowed batches into one owned batch.
        let inputs = batches.split_off(batches.len().saturating_sub(64));
        let result: B = ListMerger::merge(
            factories,
            B::Builder::for_merge(
                factories,
                inputs.iter().cloned(),
                Some(BatchLocation::Memory),
            ),
            inputs
                .into_iter()
                .map(|b| b.merge_cursor(key_filter.clone(), value_filter.clone()))
                .collect(),
        );
        if !result.is_empty() {
            outputs.push(result);
        }
    }

    merge_batches(factories, outputs, key_filter, value_filter)
}
1286
1287pub fn eq_batch<A, B, KA, VA, RA, KB, VB, RB>(a: &A, b: &B) -> bool
1295where
1296 A: BatchReader<Key = KA, Val = VA, Time = (), R = RA>,
1297 B: BatchReader<Key = KB, Val = VB, Time = (), R = RB>,
1298 KA: PartialEq<KB> + ?Sized,
1299 VA: PartialEq<VB> + ?Sized,
1300 RA: PartialEq<RB> + ?Sized,
1301 KB: ?Sized,
1302 VB: ?Sized,
1303 RB: ?Sized,
1304{
1305 let mut c1 = a.cursor();
1306 let mut c2 = b.cursor();
1307 while c1.key_valid() && c2.key_valid() {
1308 if c1.key() != c2.key() {
1309 return false;
1310 }
1311 while c1.val_valid() && c2.val_valid() {
1312 if c1.val() != c2.val() || c1.weight() != c2.weight() {
1313 return false;
1314 }
1315 c1.step_val();
1316 c2.step_val();
1317 }
1318 if c1.val_valid() || c2.val_valid() {
1319 return false;
1320 }
1321 c1.step_key();
1322 c2.step_key();
1323 }
1324 !c1.key_valid() && !c2.key_valid()
1325}
1326
/// Serializes a weighted set (a batch with unit values and unit timestamps)
/// into a byte vector.
///
/// Layout: each key and its weight are archived in cursor order; their
/// archive offsets are collected into a `Vec<usize>` which is archived last,
/// so `deserialize_wset` can locate it with `archived_root`.
fn serialize_wset<B, K, R>(batch: &B) -> Vec<u8>
where
    B: BatchReader<Key = K, Val = DynUnit, Time = (), R = R>,
    K: DataTrait + ?Sized,
    R: WeightTrait + ?Sized,
{
    let mut s = Serializer::default();
    // Two offsets (key, weight) per tuple.
    let mut offsets = Vec::with_capacity(2 * batch.len());
    let mut cursor = batch.cursor();
    while cursor.key_valid() {
        offsets.push(cursor.key().serialize(&mut s).unwrap());
        offsets.push(cursor.weight().serialize(&mut s).unwrap());
        cursor.step_key();
    }
    // Archive the offset table last so it becomes the root object.
    let _offset = s.serialize_value(&offsets).unwrap();
    s.into_serializer().into_inner().into_vec()
}
1344
/// Reconstructs a weighted set from bytes produced by `serialize_wset`.
///
/// # Panics
///
/// Panics (assert / out-of-bounds / failed deserialization) if `data` is not
/// well-formed `serialize_wset` output.
fn deserialize_wset<B, K, R>(factories: &B::Factories, data: &[u8]) -> B
where
    B: Batch<Key = K, Val = DynUnit, Time = (), R = R>,
    K: DataTrait + ?Sized,
    R: WeightTrait + ?Sized,
{
    // SAFETY(review): assumes `data` is trusted `serialize_wset` output with
    // the offset table archived as the root object.
    let offsets = unsafe { archived_root::<Vec<usize>>(data) };
    // Offsets come in (key, weight) pairs.
    assert!(offsets.len() % 2 == 0);
    let n = offsets.len() / 2;
    let mut builder = B::Builder::with_capacity(factories, n, n);
    let mut key = factories.key_factory().default_box();
    let mut diff = factories.weight_factory().default_box();
    for i in 0..n {
        unsafe { key.deserialize_from_bytes(data, offsets[i * 2] as usize) };
        unsafe { diff.deserialize_from_bytes(data, offsets[i * 2 + 1] as usize) };
        // Builder protocol: weight (+ unit value) first, then the key.
        builder.push_val_diff(&(), &diff);
        builder.push_key(&key);
    }
    builder.done()
}
1365
/// Sentinel stored in the offset table to mark the end of a key's value list
/// in `serialize_indexed_wset` output.
const SEPARATOR: u64 = u64::MAX;

/// Serializes an indexed weighted set (unit timestamps, arbitrary values)
/// into a byte vector.
///
/// Offset-table layout: `[len, key0, (val, weight)*, SEPARATOR, key1, ...]`,
/// where each non-sentinel entry is the archive offset of the corresponding
/// object; the table itself is archived last as the root object.
///
/// NOTE(review): the sentinel is pushed as `SEPARATOR as usize`, which only
/// round-trips against the `u64` comparison in `deserialize_indexed_wset` on
/// targets where `usize` is 64 bits — confirm 32-bit targets are out of
/// scope.
pub fn serialize_indexed_wset<B, K, V, R>(batch: &B) -> Vec<u8>
where
    B: BatchReader<Key = K, Val = V, Time = (), R = R>,
    K: DataTrait + ?Sized,
    V: DataTrait + ?Sized,
    R: WeightTrait + ?Sized,
{
    let mut s = Serializer::default();
    let mut offsets = Vec::with_capacity(2 * batch.len());
    let mut cursor = batch.cursor();
    // First entry records the tuple count, used as a capacity hint on read.
    offsets.push(batch.len());

    while cursor.key_valid() {
        offsets.push(cursor.key().serialize(&mut s).unwrap());
        while cursor.val_valid() {
            offsets.push(cursor.val().serialize(&mut s).unwrap());
            offsets.push(cursor.weight().serialize(&mut s).unwrap());

            cursor.step_val();
        }
        cursor.step_key();
        // Terminate this key's (val, weight) list.
        offsets.push(SEPARATOR as usize);
    }
    // Archive the offset table last so it becomes the root object.
    let _offset = s.serialize_value(&offsets).unwrap();
    s.into_serializer().into_inner().into_vec()
}
1395
/// Reconstructs an indexed weighted set from bytes produced by
/// `serialize_indexed_wset`.
///
/// # Panics
///
/// Panics (out-of-bounds index or failed deserialization) if `data` is not
/// well-formed `serialize_indexed_wset` output, e.g. if a key's value list
/// is not terminated by the `SEPARATOR` sentinel.
pub fn deserialize_indexed_wset<B, K, V, R>(factories: &B::Factories, data: &[u8]) -> B
where
    B: Batch<Key = K, Val = V, Time = (), R = R>,
    K: DataTrait + ?Sized,
    V: DataTrait + ?Sized,
    R: WeightTrait + ?Sized,
{
    // SAFETY(review): assumes `data` is trusted `serialize_indexed_wset`
    // output with the offset table archived as the root object.
    let offsets = unsafe { archived_root::<Vec<usize>>(data) };
    // Entry 0 is the tuple count, used only as a capacity hint.
    let len = offsets[0];

    let mut builder = B::Builder::with_capacity(factories, len as usize, len as usize);
    let mut key = factories.key_factory().default_box();
    let mut val = factories.val_factory().default_box();
    let mut diff = factories.weight_factory().default_box();

    let mut current_offset = 1;

    while current_offset < offsets.len() {
        // One key, followed by its (val, weight) pairs up to the sentinel.
        unsafe { key.deserialize_from_bytes(data, offsets[current_offset] as usize) };
        current_offset += 1;
        while offsets[current_offset] != SEPARATOR {
            unsafe { val.deserialize_from_bytes(data, offsets[current_offset] as usize) };
            current_offset += 1;
            unsafe { diff.deserialize_from_bytes(data, offsets[current_offset] as usize) };

            builder.push_val_diff(&val, &diff);
            current_offset += 1;
        }

        // Skip the sentinel; key is pushed after all of its values.
        current_offset += 1;
        builder.push_key(&key);
    }
    builder.done()
}
1430
// Round-trip tests for `serialize_indexed_wset` / `deserialize_indexed_wset`,
// covering empty batches and unit (`()`) keys and values.
#[cfg(test)]
mod serialize_test {
    use crate::{
        DynZWeight, OrdIndexedZSet,
        algebra::OrdIndexedZSet as DynOrdIndexedZSet,
        dynamic::DynData,
        indexed_zset,
        trace::{BatchReader, deserialize_indexed_wset, serialize_indexed_wset},
    };

    // Round trip with ordinary integer keys and values.
    #[test]
    fn test_serialize_indexed_wset() {
        let test1: OrdIndexedZSet<u64, u64> = indexed_zset! {};
        let test2 = indexed_zset! { 1 => { 1 => 1 } };
        let test3 =
            indexed_zset! { 1 => { 1 => 1, 2 => 2, 3 => 3 }, 2 => { 1 => 1, 2 => 2, 3 => 3 } };

        for test in [test1, test2, test3] {
            let serialized = serialize_indexed_wset(&*test);
            let deserialized = deserialize_indexed_wset::<
                DynOrdIndexedZSet<DynData, DynData>,
                DynData,
                DynData,
                DynZWeight,
            >(&test.factories(), &serialized);

            assert_eq!(&*test, &deserialized);
        }
    }

    // Round trip where the key type is the zero-sized unit type.
    #[test]
    fn test_serialize_indexed_wset_tup0_key() {
        let test1: OrdIndexedZSet<(), u64> = indexed_zset! {};
        let test2 = indexed_zset! { () => { 1 => 1 } };

        for test in [test1, test2] {
            let serialized = serialize_indexed_wset(&*test);
            let deserialized = deserialize_indexed_wset::<
                DynOrdIndexedZSet<DynData, DynData>,
                DynData,
                DynData,
                DynZWeight,
            >(&test.factories(), &serialized);

            assert_eq!(&*test, &deserialized);
        }
    }

    // Round trip where the value type is the zero-sized unit type.
    #[test]
    fn test_serialize_indexed_wset_tup0_val() {
        let test1: OrdIndexedZSet<u64, ()> = indexed_zset! {};
        let test2 = indexed_zset! { 1 => { () => 1 } };
        let test3 = indexed_zset! { 1 => { () => 1 }, 2 => { () => 1 } };

        for test in [test1, test2, test3] {
            let serialized = serialize_indexed_wset(&*test);
            let deserialized = deserialize_indexed_wset::<
                DynOrdIndexedZSet<DynData, DynData>,
                DynData,
                DynData,
                DynZWeight,
            >(&test.factories(), &serialized);

            assert_eq!(&*test, &deserialized);
        }
    }
}