use crate::circuit::metadata::OperatorMeta;
use crate::dynamic::{ClonableTrait, DynDataTyped, DynUnit, Weight};
use crate::storage::buffer_cache::CacheStats;
use crate::storage::file::SerializerInner;
pub use crate::storage::file::{DbspSerializer, Deserializable, Deserializer, Rkyv};
use crate::trace::cursor::{
DefaultPushCursor, FilteredMergeCursor, FilteredMergeCursorWithSnapshot, PushCursor,
UnfilteredMergeCursor,
};
use crate::utils::IsNone;
use crate::{dynamic::ArchivedDBData, storage::buffer_cache::FBuf};
use cursor::CursorFactory;
use enum_map::Enum;
use feldera_storage::fbuf::FBufSerializer;
use feldera_storage::{FileCommitter, FileReader, StoragePath};
use rand::{Rng, thread_rng};
use rkyv::ser::Serializer as _;
use size_of::SizeOf;
use std::any::TypeId;
use std::sync::Arc;
use std::{fmt::Debug, hash::Hash};
pub mod cursor;
pub mod filter;
pub mod layers;
pub mod ord;
pub mod spine_async;
pub use spine_async::{BatchReaderWithSnapshot, ListMerger, Spine, SpineSnapshot, WithSnapshot};
#[cfg(test)]
pub mod test;
pub use ord::{
FallbackIndexedWSet, FallbackIndexedWSetBuilder, FallbackIndexedWSetFactories,
FallbackKeyBatch, FallbackKeyBatchFactories, FallbackValBatch, FallbackValBatchFactories,
FallbackWSet, FallbackWSetBuilder, FallbackWSetFactories, FileIndexedWSet,
FileIndexedWSetFactories, FileKeyBatch, FileKeyBatchFactories, FileValBatch,
FileValBatchFactories, FileWSet, FileWSetFactories, OrdIndexedWSet, OrdIndexedWSetBuilder,
OrdIndexedWSetFactories, OrdKeyBatch, OrdKeyBatchFactories, OrdValBatch, OrdValBatchFactories,
OrdWSet, OrdWSetBuilder, OrdWSetFactories, VecIndexedWSet, VecIndexedWSetFactories,
VecKeyBatch, VecKeyBatchFactories, VecValBatch, VecValBatchFactories, VecWSet,
VecWSetFactories,
};
use rkyv::{Deserialize, archived_root};
use crate::{
Error, NumEntries, Timestamp,
algebra::MonoidValue,
dynamic::{DataTrait, DynPair, DynVec, DynWeightedPairs, Erase, Factory, WeightTrait},
storage::file::reader::Error as ReaderError,
storage::filter_stats::FilterStats,
};
pub use cursor::{Cursor, MergeCursor};
pub use filter::{Filter, GroupFilter};
pub use layers::Trie;
/// Trait satisfied by all data types stored in batches.
///
/// This is a trait alias: it has no methods of its own, and the blanket
/// implementation below makes every type with the listed bounds a `DBData`
/// automatically. Code should bound on `DBData` rather than repeating the
/// bound list.
pub trait DBData:
    Default
    + Clone
    + Eq
    + Ord
    + Hash
    + SizeOf
    + Send
    + Sync
    + Debug
    + ArchivedDBData
    + IsNone<Inner: ArchivedDBData>
    + 'static
{
}
// Blanket implementation: any type that satisfies the bounds is `DBData`,
// so the trait behaves as an alias and is never implemented by hand.
impl<T> DBData for T where
    T: Default
        + Clone
        + Eq
        + Ord
        + Hash
        + SizeOf
        + Send
        + Sync
        + Debug
        + ArchivedDBData
        + IsNone<Inner: ArchivedDBData>
        + 'static
{
}
/// Persistent metadata describing a spine that has been committed to storage.
///
/// Serialized/deserialized with `rkyv` as part of saving and restoring a
/// trace (see `Trace::save` / `Trace::restore`).
#[derive(rkyv::Serialize, rkyv::Deserialize, rkyv::Archive)]
pub(crate) struct CommittedSpine {
    // Identifiers (presumably storage paths) of the batches in the spine —
    // TODO(review): confirm against the spine save/restore code.
    pub batches: Vec<String>,
    // Pairs of batch identifiers recorded as merged — NOTE(review): exact
    // semantics (source/target?) not visible here; confirm in spine_async.
    pub merged: Vec<(String, String)>,
    // Accumulated merge effort at commit time.
    pub effort: u64,
    // Value of the spine's dirty flag at commit time.
    pub dirty: bool,
}
/// Deserializes a `T` from `bytes` without requiring the slice to be aligned.
///
/// The input is first copied into an [`FBuf`], whose allocation provides the
/// alignment that [`aligned_deserialize`] relies on, and then deserialized
/// from there.
pub fn unaligned_deserialize<T: Deserializable>(bytes: &[u8]) -> T {
    let mut buffer = FBuf::new();
    buffer.extend_from_slice(bytes);
    aligned_deserialize(&buffer)
}
/// Deserializes a `T` from `bytes`, which must contain a valid, suitably
/// aligned `rkyv` archive of `T` (use [`unaligned_deserialize`] when the
/// input slice may be unaligned).
///
/// # Panics
///
/// Panics if deserialization fails.
pub fn aligned_deserialize<T: Deserializable>(bytes: &[u8]) -> T {
    // SAFETY: `archived_root` performs no validation; the caller must supply
    // bytes that hold a valid archived `T` at the required alignment.
    unsafe { archived_root::<T>(bytes) }
        .deserialize(&mut Deserializer::default())
        .unwrap()
}
/// Trait satisfied by all weight types stored in batches: ordinary batch data
/// ([`DBData`]) that also forms a monoid under addition ([`MonoidValue`]).
/// Like [`DBData`], this is a trait alias provided by a blanket impl.
pub trait DBWeight: DBData + MonoidValue {}
impl<T> DBWeight for T where T: DBData + MonoidValue {}
/// Factories for the dynamically typed components of a batch reader with key
/// `K`, value `V`, timestamp `T`, and weight `R`.
///
/// Implementations are cheap to clone and shared across threads.
pub trait BatchReaderFactories<
    K: DataTrait + ?Sized,
    V: DataTrait + ?Sized,
    T,
    R: WeightTrait + ?Sized,
>: Clone + Send + Sync
{
    /// Builds the factory set from the concrete types that erase to `K`, `V`,
    /// and `R`.
    fn new<KType, VType, RType>() -> Self
    where
        KType: DBData + Erase<K>,
        VType: DBData + Erase<V>,
        RType: DBWeight + Erase<R>;
    /// Factory for individual keys.
    fn key_factory(&self) -> &'static dyn Factory<K>;
    /// Factory for vectors of keys.
    fn keys_factory(&self) -> &'static dyn Factory<DynVec<K>>;
    /// Factory for individual values.
    fn val_factory(&self) -> &'static dyn Factory<V>;
    /// Factory for individual weights.
    fn weight_factory(&self) -> &'static dyn Factory<R>;
}
/// A single weighted tuple: `((key, value), weight)`.
pub type WeightedItem<K, V, R> = DynPair<DynPair<K, V>, R>;
/// Factories needed to *construct* batches, extending
/// [`BatchReaderFactories`] with factories for the tuple containers used
/// while building.
pub trait BatchFactories<K: DataTrait + ?Sized, V: DataTrait + ?Sized, T, R: WeightTrait + ?Sized>:
    BatchReaderFactories<K, V, T, R>
{
    /// Factory for `(key, value)` pairs.
    fn item_factory(&self) -> &'static dyn Factory<DynPair<K, V>>;
    /// Factory for vectors of weighted `(key, value)` pairs.
    fn weighted_items_factory(&self) -> &'static dyn Factory<DynWeightedPairs<DynPair<K, V>, R>>;
    /// Factory for vectors of weighted values.
    fn weighted_vals_factory(&self) -> &'static dyn Factory<DynWeightedPairs<V, R>>;
    /// Factory for a single weighted `(key, value)` item.
    fn weighted_item_factory(&self) -> &'static dyn Factory<WeightedItem<K, V, R>>;
    /// Factory for vectors of `(time, weight)` pairs, or `None` if this batch
    /// type does not store per-tuple times.
    fn time_diffs_factory(
        &self,
    ) -> Option<&'static dyn Factory<DynWeightedPairs<DynDataTyped<T>, R>>>;
}
/// A mutable collection of updates: a [`BatchReader`] that batches can be
/// inserted into, and that can be compacted, filtered, and persisted.
pub trait Trace: BatchReader {
    /// The batch type stored in this trace; its key/value/time/weight types
    /// and factories match the trace's own.
    type Batch: Batch<
        Key = Self::Key,
        Val = Self::Val,
        Time = Self::Time,
        R = Self::R,
        Factories = Self::Factories,
    >;
    /// Creates an empty trace.
    fn new(factories: &Self::Factories) -> Self;
    /// Sets the trace's frontier — NOTE(review): exact compaction semantics
    /// are implementation-defined; confirm against the spine implementation.
    fn set_frontier(&mut self, frontier: &Self::Time);
    /// Performs an incremental amount of maintenance work (e.g. merging),
    /// bounded by `effort`, which implementations decrement as work is done.
    fn exert(&mut self, effort: &mut isize);
    /// Consumes the trace and merges its contents into a single batch, or
    /// `None` if the trace is empty.
    fn consolidate(self) -> Option<Self::Batch>;
    /// Inserts a batch of updates into the trace.
    fn insert(&mut self, batch: Self::Batch);
    /// Inserts a shared batch into the trace.
    fn insert_arc(&mut self, batch: Arc<Self::Batch>);
    /// Clears the dirty flag (set by insertions since the last clear).
    fn clear_dirty_flag(&mut self);
    /// Returns the dirty flag.
    fn dirty(&self) -> bool;
    /// Installs a key filter: keys rejected by `filter` may be discarded.
    fn retain_keys(&mut self, filter: Filter<Self::Key>);
    /// Installs a value filter: values rejected by `filter` may be discarded.
    fn retain_values(&mut self, filter: GroupFilter<Self::Val>);
    /// Returns the currently installed key filter, if any.
    fn key_filter(&self) -> &Option<Filter<Self::Key>>;
    /// Returns the currently installed value filter, if any.
    fn value_filter(&self) -> &Option<GroupFilter<Self::Val>>;
    /// Persists the trace under `base`/`pid`, appending the files written to
    /// `files` so the caller can commit them.
    fn save(
        &mut self,
        base: &StoragePath,
        pid: &str,
        files: &mut Vec<Arc<dyn FileCommitter>>,
    ) -> Result<(), Error>;
    /// Restores the trace previously saved under `base`/`pid`.
    fn restore(&mut self, base: &StoragePath, pid: &str) -> Result<(), Error>;
    /// Reports operator metadata; the default reports nothing.
    fn metadata(&self, _meta: &mut OperatorMeta) {}
}
/// Where a batch's data currently resides.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Enum)]
pub enum BatchLocation {
    /// The batch is held in memory.
    Memory,
    /// The batch is backed by storage.
    Storage,
}
/// A read-only, indexed collection of `(key, value, time, weight)` tuples.
pub trait BatchReader: Debug + NumEntries + Rkyv + SizeOf + 'static
where
    Self: Sized,
{
    /// Factories for this batch's dynamically typed components.
    type Factories: BatchFactories<Self::Key, Self::Val, Self::Time, Self::R>;
    /// Key type.
    type Key: DataTrait + ?Sized;
    /// Value type.
    type Val: DataTrait + ?Sized;
    /// Timestamp type.
    type Time: Timestamp;
    /// Weight type.
    type R: WeightTrait + ?Sized;
    /// Cursor type used to navigate the batch.
    type Cursor<'s>: Cursor<Self::Key, Self::Val, Self::Time, Self::R> + Clone + Send
    where
        Self: 's;
    /// Returns this batch's factories.
    fn factories(&self) -> Self::Factories;
    /// Returns a cursor over the batch's contents.
    fn cursor(&self) -> Self::Cursor<'_>;
    /// Returns a push-based cursor; the default wraps the ordinary cursor.
    fn push_cursor(
        &self,
    ) -> Box<dyn PushCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + '_> {
        Box::new(DefaultPushCursor::new(self.cursor()))
    }
    /// Returns a cursor suitable for merging, with the given filters applied.
    ///
    /// With no filters, the cheaper unfiltered wrapper is used. Only
    /// `GroupFilter::Simple` value filters are applied per-value; any other
    /// `GroupFilter` variant is dropped by this default implementation (only
    /// the key filter is applied in that case).
    fn merge_cursor(
        &self,
        key_filter: Option<Filter<Self::Key>>,
        value_filter: Option<GroupFilter<Self::Val>>,
    ) -> Box<dyn MergeCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + '_> {
        if key_filter.is_none() && value_filter.is_none() {
            Box::new(UnfilteredMergeCursor::new(self.cursor()))
        } else if let Some(GroupFilter::Simple(filter)) = value_filter {
            Box::new(FilteredMergeCursor::new(
                self.cursor(),
                key_filter,
                Some(filter),
            ))
        } else {
            Box::new(FilteredMergeCursor::new(self.cursor(), key_filter, None))
        }
    }
    /// Like [`Self::merge_cursor`], but non-`Simple` value filters are
    /// evaluated against `snapshot` using
    /// [`FilteredMergeCursorWithSnapshot`] instead of being dropped.
    ///
    /// With `snapshot == None` this falls back to `merge_cursor` (and thus to
    /// its filter-dropping behavior for non-`Simple` value filters).
    fn merge_cursor_with_snapshot<'a, S>(
        &'a self,
        key_filter: Option<Filter<Self::Key>>,
        value_filter: Option<GroupFilter<Self::Val>>,
        snapshot: &'a Option<Arc<S>>,
    ) -> Box<dyn MergeCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + 'a>
    where
        S: BatchReader<Key = Self::Key, Val = Self::Val, Time = Self::Time, R = Self::R>,
    {
        let Some(snapshot) = snapshot else {
            return self.merge_cursor(key_filter, value_filter);
        };
        if key_filter.is_none() && value_filter.is_none() {
            Box::new(UnfilteredMergeCursor::new(self.cursor()))
        } else if value_filter.is_none() {
            Box::new(FilteredMergeCursor::new(self.cursor(), key_filter, None))
        } else if let Some(GroupFilter::Simple(filter)) = value_filter {
            Box::new(FilteredMergeCursor::new(
                self.cursor(),
                key_filter,
                Some(filter),
            ))
        } else {
            Box::new(FilteredMergeCursorWithSnapshot::new(
                self.cursor(),
                key_filter,
                // `value_filter` is `Some` and non-`Simple` in this branch.
                value_filter.unwrap(),
                snapshot,
            ))
        }
    }
    /// Returns a merge cursor that is allowed to consume the batch's
    /// contents; the default simply delegates to [`Self::merge_cursor`].
    fn consuming_cursor(
        &mut self,
        key_filter: Option<Filter<Self::Key>>,
        value_filter: Option<GroupFilter<Self::Val>>,
    ) -> Box<dyn MergeCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + '_> {
        self.merge_cursor(key_filter, value_filter)
    }
    /// Number of distinct keys in the batch.
    fn key_count(&self) -> usize;
    /// Number of tuples in the batch.
    fn len(&self) -> usize;
    /// Approximate in-memory/on-disk size of the batch, in bytes.
    fn approximate_byte_size(&self) -> usize;
    /// Statistics for the batch's membership filter.
    fn membership_filter_stats(&self) -> FilterStats;
    /// Statistics for the batch's range filter; default is empty stats.
    fn range_filter_stats(&self) -> FilterStats {
        FilterStats::default()
    }
    /// Where the batch's data resides; default is memory.
    fn location(&self) -> BatchLocation {
        BatchLocation::Memory
    }
    /// Buffer-cache statistics for this batch; default is empty stats.
    fn cache_stats(&self) -> CacheStats {
        CacheStats::default()
    }
    /// True if the batch contains no tuples.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
    /// Samples up to `sample_size` keys from the batch into `sample`.
    /// Only available for untimed batches (`Time: PartialEq<()>`).
    fn sample_keys<RG>(&self, rng: &mut RG, sample_size: usize, sample: &mut DynVec<Self::Key>)
    where
        Self::Time: PartialEq<()>,
        RG: Rng;
    /// Computes up to `num_partitions - 1` key bounds that split the batch
    /// into roughly equal partitions, based on a random sample of
    /// `num_partitions²` keys. With a sample smaller than `num_partitions`,
    /// every sampled key becomes a bound.
    fn partition_keys(&self, num_partitions: usize, bounds: &mut DynVec<Self::Key>)
    where
        Self::Time: PartialEq<()>,
    {
        bounds.clear();
        if num_partitions <= 1 {
            return;
        }
        // Oversample so the chosen bounds are reasonably evenly spaced.
        let sample_size = num_partitions * num_partitions;
        let mut sample = self.factories().keys_factory().default_box();
        self.sample_keys(&mut thread_rng(), sample_size, sample.as_mut());
        let sample_len = sample.len();
        if sample_len == 0 {
            return;
        }
        if sample_len >= num_partitions {
            // Pick evenly spaced keys from the (sorted-by-sampling) sample.
            for i in 0..num_partitions - 1 {
                let idx = ((i + 1) * sample_len) / num_partitions;
                let idx = idx.min(sample_len - 1);
                bounds.push_ref(sample.index(idx));
            }
        } else {
            // Too few samples: use every one as a bound.
            for i in 0..sample_len {
                bounds.push_ref(sample.index(i));
            }
        }
    }
    /// Asynchronously fetches the tuples whose keys appear in `keys`,
    /// returning a cursor factory over them, or `None` if the batch type does
    /// not support fetching (the default).
    #[allow(async_fn_in_trait)]
    async fn fetch<B>(
        &self,
        keys: &B,
    ) -> Option<Box<dyn CursorFactory<Self::Key, Self::Val, Self::Time, Self::R>>>
    where
        B: BatchReader<Key = Self::Key, Time = ()>,
    {
        let _ = keys;
        None
    }
    /// Returns the batch's keys as a vector, if the representation stores
    /// them that way; default is `None`.
    fn keys(&self) -> Option<&DynVec<Self::Key>> {
        None
    }
}
/// An `Arc<B>` is itself a batch reader: every method delegates to the inner
/// reader.
impl<B> BatchReader for Arc<B>
where
    B: BatchReader,
{
    type Factories = B::Factories;
    type Key = B::Key;
    type Val = B::Val;
    type Time = B::Time;
    type R = B::R;
    type Cursor<'s> = B::Cursor<'s>;
    fn factories(&self) -> Self::Factories {
        (**self).factories()
    }
    fn cursor(&self) -> Self::Cursor<'_> {
        (**self).cursor()
    }
    fn merge_cursor(
        &self,
        key_filter: Option<Filter<Self::Key>>,
        value_filter: Option<GroupFilter<Self::Val>>,
    ) -> Box<dyn MergeCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + '_> {
        (**self).merge_cursor(key_filter, value_filter)
    }
    fn key_count(&self) -> usize {
        (**self).key_count()
    }
    fn len(&self) -> usize {
        (**self).len()
    }
    fn approximate_byte_size(&self) -> usize {
        (**self).approximate_byte_size()
    }
    fn membership_filter_stats(&self) -> FilterStats {
        (**self).membership_filter_stats()
    }
    fn range_filter_stats(&self) -> FilterStats {
        (**self).range_filter_stats()
    }
    fn location(&self) -> BatchLocation {
        (**self).location()
    }
    fn cache_stats(&self) -> CacheStats {
        (**self).cache_stats()
    }
    fn is_empty(&self) -> bool {
        (**self).is_empty()
    }
    fn sample_keys<RG>(&self, rng: &mut RG, sample_size: usize, sample: &mut DynVec<Self::Key>)
    where
        Self::Time: PartialEq<()>,
        RG: Rng,
    {
        (**self).sample_keys(rng, sample_size, sample)
    }
    // The batch is shared, so we cannot actually consume its contents;
    // fall back to the non-consuming merge cursor.
    fn consuming_cursor(
        &mut self,
        key_filter: Option<Filter<Self::Key>>,
        value_filter: Option<GroupFilter<Self::Val>>,
    ) -> Box<dyn MergeCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + '_> {
        (**self).merge_cursor(key_filter, value_filter)
    }
    async fn fetch<KB>(
        &self,
        keys: &KB,
    ) -> Option<Box<dyn CursorFactory<Self::Key, Self::Val, Self::Time, Self::R>>>
    where
        KB: BatchReader<Key = Self::Key, Time = ()>,
    {
        (**self).fetch(keys).await
    }
    fn keys(&self) -> Option<&DynVec<Self::Key>> {
        (**self).keys()
    }
}
/// An immutable collection of updates: a [`BatchReader`] that can also be
/// constructed from tuples, cursors, or other batches.
pub trait Batch: BatchReader + Clone + Send + Sync
where
    Self: Sized,
{
    /// The same batch shape with the timestamp type replaced by `T`.
    type Timed<T: Timestamp>: Batch<
        Key = <Self as BatchReader>::Key,
        Val = <Self as BatchReader>::Val,
        Time = T,
        R = <Self as BatchReader>::R,
    >;
    /// Batcher type used to accumulate unordered tuples into this batch.
    type Batcher: Batcher<Self>;
    /// Builder type used to assemble this batch from ordered pushes.
    type Builder: Builder<Self>;
    /// Builds a batch at `time` from an unordered vector of weighted tuples.
    #[allow(clippy::type_complexity)]
    fn dyn_from_tuples(
        factories: &Self::Factories,
        time: Self::Time,
        tuples: &mut Box<DynWeightedPairs<DynPair<Self::Key, Self::Val>, Self::R>>,
    ) -> Self {
        let mut batcher = Self::Batcher::new_batcher(factories, time);
        batcher.push_batch(tuples);
        batcher.seal()
    }
    /// Copies an untimed batch into this batch type, stamping every tuple
    /// with `timestamp`. When `BI` is the same type as `Self`, this is just a
    /// clone.
    fn from_batch<BI>(batch: &BI, timestamp: &Self::Time, factories: &Self::Factories) -> Self
    where
        BI: BatchReader<Key = Self::Key, Val = Self::Val, Time = (), R = Self::R>,
    {
        if TypeId::of::<BI>() == TypeId::of::<Self>() {
            // SAFETY: the TypeId check proves `BI` and `Self` are the same
            // concrete type, so `&BI` and `&Self` have identical layout.
            unsafe { std::mem::transmute::<&BI, &Self>(batch).clone() }
        } else {
            Self::from_cursor(
                batch.cursor(),
                timestamp,
                factories,
                batch.key_count(),
                batch.len(),
            )
        }
    }
    /// Like [`Self::from_batch`], but for shared batches, avoiding a deep
    /// copy when `BI` is the same type as `Self`.
    fn from_arc_batch<BI>(
        batch: &Arc<BI>,
        timestamp: &Self::Time,
        factories: &Self::Factories,
    ) -> Arc<Self>
    where
        BI: BatchReader<Key = Self::Key, Val = Self::Val, Time = (), R = Self::R>,
    {
        if TypeId::of::<BI>() == TypeId::of::<Self>() {
            // SAFETY: the TypeId check proves `BI` and `Self` are the same
            // concrete type, so `&Arc<BI>` and `&Arc<Self>` have identical
            // layout; cloning bumps the refcount only.
            unsafe { std::mem::transmute::<&Arc<BI>, &Arc<Self>>(batch).clone() }
        } else {
            Arc::new(Self::from_cursor(
                batch.cursor(),
                timestamp,
                factories,
                batch.key_count(),
                batch.len(),
            ))
        }
    }
    /// Builds a batch from an untimed cursor, stamping every tuple with
    /// `timestamp`. Keys whose value range turns out empty are skipped.
    fn from_cursor<C>(
        mut cursor: C,
        timestamp: &Self::Time,
        factories: &Self::Factories,
        key_capacity: usize,
        value_capacity: usize,
    ) -> Self
    where
        C: Cursor<Self::Key, Self::Val, (), Self::R>,
    {
        let mut builder = Self::Builder::with_capacity(factories, key_capacity, value_capacity);
        while cursor.key_valid() {
            let mut any_values = false;
            while cursor.val_valid() {
                let weight = cursor.weight();
                // Cursors over consolidated data never yield zero weights.
                debug_assert!(!weight.is_zero());
                builder.push_time_diff(timestamp, weight);
                builder.push_val(cursor.val());
                any_values = true;
                cursor.step_val();
            }
            // Only emit the key if at least one value was pushed for it.
            if any_values {
                builder.push_key(cursor.key());
            }
            cursor.step_key();
        }
        builder.done()
    }
    /// Creates an empty batch.
    fn dyn_empty(factories: &Self::Factories) -> Self {
        Self::Builder::new_builder(factories).done()
    }
    /// Returns a copy of the batch containing only the tuples accepted by
    /// `predicate`. Only available for untimed batches.
    fn filter(&self, predicate: &dyn Fn(&Self::Key, &Self::Val) -> bool) -> Self
    where
        Self::Time: PartialEq<()> + From<()>,
    {
        let factories = self.factories();
        let mut builder = Self::Builder::new_builder(&factories);
        let mut cursor = self.cursor();
        while cursor.key_valid() {
            let mut any_values = false;
            while cursor.val_valid() {
                if predicate(cursor.key(), cursor.val()) {
                    builder.push_diff(cursor.weight());
                    builder.push_val(cursor.val());
                    any_values = true;
                }
                cursor.step_val();
            }
            // Keys with no surviving values are dropped.
            if any_values {
                builder.push_key(cursor.key());
            }
            cursor.step_key();
        }
        builder.done()
    }
    /// Returns a storage-backed copy of this batch, or `None` if the batch
    /// type does not support persistence (the default).
    fn persisted(&self) -> Option<Self> {
        None
    }
    /// Returns the underlying file reader, if the batch is file-backed;
    /// default is `None`.
    fn file_reader(&self) -> Option<Arc<dyn FileReader>> {
        None
    }
    /// Loads a batch from `path`; batch types without storage support return
    /// `ReaderError::Unsupported` (the default).
    fn from_path(_factories: &Self::Factories, _path: &StoragePath) -> Result<Self, ReaderError> {
        Err(ReaderError::Unsupported)
    }
    /// Number of tuples with negative weights, or `None` if the batch does
    /// not track this — NOTE(review): semantics inferred from the name;
    /// confirm against implementations.
    fn negative_weight_count(&self) -> Option<u64>;
}
/// Accumulates unordered weighted tuples and seals them into a batch.
pub trait Batcher<Output>: SizeOf
where
    Output: Batch,
{
    /// Creates an empty batcher whose output batch will carry `time`.
    fn new_batcher(vtables: &Output::Factories, time: Output::Time) -> Self;
    /// Adds a vector of weighted tuples; the vector is passed mutably so the
    /// batcher may take ownership of / reorder its contents.
    fn push_batch(
        &mut self,
        batch: &mut Box<DynWeightedPairs<DynPair<Output::Key, Output::Val>, Output::R>>,
    );
    /// Like [`Self::push_batch`], but for tuples that are already
    /// consolidated — presumably sorted with duplicates and zero weights
    /// removed; confirm against implementations.
    fn push_consolidated_batch(
        &mut self,
        batch: &mut Box<DynWeightedPairs<DynPair<Output::Key, Output::Val>, Output::R>>,
    );
    /// Number of tuples accumulated so far.
    fn tuples(&self) -> usize;
    /// Consumes the batcher and produces the output batch.
    fn seal(self) -> Output;
}
/// Assembles a batch from ordered, columnar pushes.
///
/// The push protocol (see `Batch::from_cursor` for a canonical caller) is:
/// for each key, push each value's `(time, diff)` pairs followed by the value
/// itself, and after all of a key's values, push the key.
pub trait Builder<Output>: Send + SizeOf
where
    Self: Sized,
    Output: Batch,
{
    /// Creates a builder with no preallocated capacity.
    fn new_builder(factories: &Output::Factories) -> Self {
        Self::with_capacity(factories, 0, 0)
    }
    /// Creates a builder preallocated for the given key and value counts.
    fn with_capacity(
        factories: &Output::Factories,
        key_capacity: usize,
        value_capacity: usize,
    ) -> Self;
    /// Creates a builder sized for merging `batches`; the default sums the
    /// inputs' key counts and lengths (an upper bound on the merge result).
    /// `location` is a hint the default implementation ignores.
    fn for_merge<'a, B, I>(
        factories: &Output::Factories,
        batches: I,
        location: Option<BatchLocation>,
    ) -> Self
    where
        B: BatchReader,
        I: IntoIterator<Item = &'a B> + Clone,
    {
        let _ = location;
        let key_capacity = batches.clone().into_iter().map(|b| b.key_count()).sum();
        let value_capacity = batches.into_iter().map(|b| b.len()).sum();
        Self::with_capacity(factories, key_capacity, value_capacity)
    }
    /// Pushes a `(time, diff)` pair for the value currently being assembled.
    fn push_time_diff(&mut self, time: &Output::Time, weight: &Output::R);
    /// By-`&mut` variant of [`Self::push_time_diff`]; implementations may
    /// move out of the arguments. Default just borrows.
    fn push_time_diff_mut(&mut self, time: &mut Output::Time, weight: &mut Output::R) {
        self.push_time_diff(time, weight);
    }
    /// Pushes a value (after its time/diff pairs).
    fn push_val(&mut self, val: &Output::Val);
    /// By-`&mut` variant of [`Self::push_val`].
    fn push_val_mut(&mut self, val: &mut Output::Val) {
        self.push_val(val);
    }
    /// Pushes a key (after all of its values).
    fn push_key(&mut self, key: &Output::Key);
    /// By-`&mut` variant of [`Self::push_key`].
    fn push_key_mut(&mut self, key: &mut Output::Key) {
        self.push_key(key);
    }
    /// Pushes a diff with the default (unit-like) time; only for untimed
    /// batches.
    fn push_diff(&mut self, weight: &Output::R)
    where
        Output::Time: PartialEq<()>,
    {
        self.push_time_diff(&Output::Time::default(), weight);
    }
    /// By-`&mut` variant of [`Self::push_diff`].
    fn push_diff_mut(&mut self, weight: &mut Output::R)
    where
        Output::Time: PartialEq<()>,
    {
        self.push_diff(weight);
    }
    /// Pushes a `(value, diff)` pair with the default time; only for untimed
    /// batches.
    fn push_val_diff(&mut self, val: &Output::Val, weight: &Output::R)
    where
        Output::Time: PartialEq<()>,
    {
        self.push_time_diff(&Output::Time::default(), weight);
        self.push_val(val);
    }
    /// By-`&mut` variant of [`Self::push_val_diff`].
    fn push_val_diff_mut(&mut self, val: &mut Output::Val, weight: &mut Output::R)
    where
        Output::Time: PartialEq<()>,
    {
        self.push_val_diff(val, weight);
    }
    /// Hints that `additional` more tuples are coming; default is a no-op.
    fn reserve(&mut self, additional: usize) {
        let _ = additional;
    }
    /// Number of keys pushed so far.
    fn num_keys(&self) -> usize;
    /// Number of tuples pushed so far.
    fn num_tuples(&self) -> usize;
    /// Consumes the builder and produces the batch.
    fn done(self) -> Output;
}
/// Adapter that lets callers push whole `(key, value, time, weight)` tuples
/// into a columnar [`Builder`].
///
/// It buffers the most recent `(key, value)` pair so that consecutive tuples
/// with the same key (and possibly value) are coalesced into the builder's
/// push protocol (time/diffs, then value, then key).
pub struct TupleBuilder<B, Output>
where
    B: Builder<Output>,
    Output: Batch,
{
    /// The underlying columnar builder.
    builder: B,
    /// The buffered `(key, value)` pair not yet flushed to `builder`.
    kv: Box<DynPair<Output::Key, Output::Val>>,
    /// Whether `kv` currently holds a valid pair.
    has_kv: bool,
}
impl<B, Output> TupleBuilder<B, Output>
where
    B: Builder<Output>,
    Output: Batch,
{
    /// Wraps `builder`, allocating the internal `(key, value)` buffer from
    /// `factories`.
    pub fn new(factories: &Output::Factories, builder: B) -> Self {
        Self {
            builder,
            kv: factories.item_factory().default_box(),
            has_kv: false,
        }
    }
    /// Number of keys pushed to the underlying builder so far (excludes the
    /// buffered pair).
    pub fn num_keys(&self) -> usize {
        self.builder.num_keys()
    }
    /// Number of tuples pushed to the underlying builder so far.
    pub fn num_tuples(&self) -> usize {
        self.builder.num_tuples()
    }
    /// Pushes a weighted `((key, value), weight)` item with the default time;
    /// only for untimed batches.
    pub fn push(&mut self, element: &mut DynPair<DynPair<Output::Key, Output::Val>, Output::R>)
    where
        Output::Time: PartialEq<()>,
    {
        let (kv, w) = element.split_mut();
        let (k, v) = kv.split_mut();
        self.push_vals(k, v, &mut Output::Time::default(), w);
    }
    /// Pushes one `(key, value, time, weight)` tuple by reference.
    ///
    /// Input must arrive grouped: when the key changes, the buffered value
    /// and key are flushed to the builder; when only the value changes, just
    /// the value is flushed; when both match the buffered pair, only the
    /// `(time, weight)` is appended.
    pub fn push_refs(
        &mut self,
        key: &Output::Key,
        val: &Output::Val,
        time: &Output::Time,
        weight: &Output::R,
    ) {
        if self.has_kv {
            let (k, v) = self.kv.split_mut();
            if k != key {
                // New key: flush the buffered value and key, buffer the new pair.
                self.builder.push_val_mut(v);
                self.builder.push_key_mut(k);
                self.kv.from_refs(key, val);
            } else if v != val {
                // Same key, new value: flush the buffered value only.
                self.builder.push_val_mut(v);
                val.clone_to(v);
            }
        } else {
            // First tuple: just start buffering.
            self.has_kv = true;
            self.kv.from_refs(key, val);
        }
        self.builder.push_time_diff(time, weight);
    }
    /// Like [`Self::push_refs`], but takes the tuple by `&mut` so its parts
    /// may be moved into the builder instead of cloned.
    pub fn push_vals(
        &mut self,
        key: &mut Output::Key,
        val: &mut Output::Val,
        time: &mut Output::Time,
        weight: &mut Output::R,
    ) {
        if self.has_kv {
            let (k, v) = self.kv.split_mut();
            if k != key {
                // New key: flush the buffered value and key, buffer the new pair.
                self.builder.push_val_mut(v);
                self.builder.push_key_mut(k);
                self.kv.from_vals(key, val);
            } else if v != val {
                // Same key, new value: flush the buffered value only.
                self.builder.push_val_mut(v);
                val.move_to(v);
            }
        } else {
            // First tuple: just start buffering.
            self.has_kv = true;
            self.kv.from_vals(key, val);
        }
        self.builder.push_time_diff_mut(time, weight);
    }
    /// Forwards a capacity hint to the underlying builder.
    pub fn reserve(&mut self, additional: usize) {
        self.builder.reserve(additional)
    }
    /// Pushes every tuple yielded by `iter` (untimed batches only),
    /// reserving capacity from the iterator's size hint first.
    pub fn extend<'a, I>(&mut self, iter: I)
    where
        Output::Time: PartialEq<()>,
        I: Iterator<Item = &'a mut WeightedItem<Output::Key, Output::Val, Output::R>>,
    {
        let (lower, upper) = iter.size_hint();
        self.reserve(upper.unwrap_or(lower));
        for item in iter {
            let (kv, w) = item.split_mut();
            let (k, v) = kv.split_mut();
            self.push_vals(k, v, &mut Output::Time::default(), w);
        }
    }
    /// Flushes the buffered pair (if any) and finishes the batch.
    pub fn done(mut self) -> Output {
        if self.has_kv {
            let (k, v) = self.kv.split_mut();
            self.builder.push_val_mut(v);
            self.builder.push_key_mut(k);
        }
        self.builder.done()
    }
}
/// Merges a collection of batches into a single batch, applying the optional
/// key and value filters during the merge.
///
/// Merging proceeds in rounds of at most 64 batches taken from the tail of
/// the list until a single batch remains. Empty inputs and empty intermediate
/// results are discarded; if everything is empty, an empty batch is returned.
pub fn merge_batches<B, T>(
    factories: &B::Factories,
    batches: T,
    key_filter: &Option<Filter<B::Key>>,
    value_filter: &Option<GroupFilter<B::Val>>,
) -> B
where
    T: IntoIterator<Item = B>,
    B: Batch,
{
    // Empty batches contribute nothing; drop them up front.
    let mut pending: Vec<B> = batches.into_iter().filter(|b| !b.is_empty()).collect();
    while pending.len() > 1 {
        // Peel off up to 64 batches from the tail for this round.
        let split_at = pending.len().saturating_sub(64);
        let mut round = pending.split_off(split_at);
        // Size the builder before taking consuming cursors (which borrow
        // `round` mutably).
        let builder = B::Builder::for_merge(factories, &round, Some(BatchLocation::Memory));
        let cursors = round
            .iter_mut()
            .map(|b| b.consuming_cursor(key_filter.clone(), value_filter.clone()))
            .collect();
        let merged: B = ListMerger::merge(factories, builder, cursors);
        if !merged.is_empty() {
            pending.push(merged);
        }
    }
    pending.pop().unwrap_or_else(|| B::dyn_empty(factories))
}
/// Merges a collection of borrowed batches into a single new batch, applying
/// the optional key and value filters during the merge.
///
/// The inputs are merged in groups of at most 64 using non-consuming merge
/// cursors; the per-group results are then combined with [`merge_batches`].
pub fn merge_batches_by_reference<'a, B, T>(
    factories: &B::Factories,
    batches: T,
    key_filter: &Option<Filter<B::Key>>,
    value_filter: &Option<GroupFilter<B::Val>>,
) -> B
where
    T: IntoIterator<Item = &'a B>,
    B: Batch,
{
    // Drop empty inputs; they contribute nothing to the merge.
    let mut batches = batches
        .into_iter()
        .filter(|b| !b.is_empty())
        .collect::<Vec<_>>();
    let mut outputs = Vec::with_capacity(batches.len().div_ceil(64));
    while !batches.is_empty() {
        // Take up to 64 batches from the tail for this round.
        let inputs = batches.split_off(batches.len().saturating_sub(64));
        let result: B = ListMerger::merge(
            factories,
            B::Builder::for_merge(
                factories,
                inputs.iter().cloned(),
                Some(BatchLocation::Memory),
            ),
            inputs
                .into_iter()
                .map(|b| b.merge_cursor(key_filter.clone(), value_filter.clone()))
                .collect(),
        );
        if !result.is_empty() {
            outputs.push(result);
        }
    }
    // Combine the per-round results (at most ceil(n/64) of them).
    merge_batches(factories, outputs, key_filter, value_filter)
}
/// Compares two untimed batches for equality, tuple by tuple.
///
/// The batches are equal iff their cursors yield pairwise-equal keys, values,
/// and weights, and both cursors exhaust their keys (and each key's values)
/// at the same point.
pub fn eq_batch<A, B, KA, VA, RA, KB, VB, RB>(a: &A, b: &B) -> bool
where
    A: BatchReader<Key = KA, Val = VA, Time = (), R = RA>,
    B: BatchReader<Key = KB, Val = VB, Time = (), R = RB>,
    KA: PartialEq<KB> + ?Sized,
    VA: PartialEq<VB> + ?Sized,
    RA: PartialEq<RB> + ?Sized,
    KB: ?Sized,
    VB: ?Sized,
    RB: ?Sized,
{
    let mut ca = a.cursor();
    let mut cb = b.cursor();
    loop {
        // Equal iff both cursors run out of keys simultaneously.
        match (ca.key_valid(), cb.key_valid()) {
            (false, false) => return true,
            (true, true) => {}
            _ => return false,
        }
        if ca.key() != cb.key() {
            return false;
        }
        loop {
            // Both keys must also have the same number of values.
            match (ca.val_valid(), cb.val_valid()) {
                (false, false) => break,
                (true, true) => {}
                _ => return false,
            }
            if ca.val() != cb.val() || ca.weight() != cb.weight() {
                return false;
            }
            ca.step_val();
            cb.step_val();
        }
        ca.step_key();
        cb.step_key();
    }
}
/// Serializes a W-set (an untimed batch with unit values) to bytes.
///
/// Each key and its weight are serialized via the thread-local serializer,
/// and the vector of their byte offsets (key offset, then weight offset, per
/// tuple) is serialized last so [`deserialize_wset`] can read it from the
/// archive root.
fn serialize_wset<B, K, R>(batch: &B) -> Vec<u8>
where
    B: BatchReader<Key = K, Val = DynUnit, Time = (), R = R>,
    K: DataTrait + ?Sized,
    R: WeightTrait + ?Sized,
{
    SerializerInner::to_fbuf_with_thread_local(|s| {
        // Two offsets (key, weight) per tuple.
        let mut offsets = Vec::with_capacity(2 * batch.len());
        let mut cursor = batch.cursor();
        while cursor.key_valid() {
            offsets.push(cursor.key().serialize(s)?);
            offsets.push(cursor.weight().serialize(s)?);
            cursor.step_key();
        }
        s.serialize_value(&offsets)
    })
    .into_vec()
}
/// Deserializes a W-set previously produced by [`serialize_wset`].
///
/// Reads the offsets vector from the archive root, then deserializes each
/// (key, weight) pair in place and feeds it to the batch builder.
fn deserialize_wset<B, K, R>(factories: &B::Factories, data: &[u8]) -> B
where
    B: Batch<Key = K, Val = DynUnit, Time = (), R = R>,
    K: DataTrait + ?Sized,
    R: WeightTrait + ?Sized,
{
    // SAFETY: `data` must be the output of `serialize_wset`, whose root value
    // is the offsets vector; `archived_root` performs no validation.
    let offsets = unsafe { archived_root::<Vec<usize>>(data) };
    // Offsets come in (key, weight) pairs.
    assert!(offsets.len() % 2 == 0);
    let n = offsets.len() / 2;
    let mut builder = B::Builder::with_capacity(factories, n, n);
    let mut key = factories.key_factory().default_box();
    let mut diff = factories.weight_factory().default_box();
    for i in 0..n {
        // SAFETY: the offsets were recorded by `serialize_wset`, so they
        // point at valid archived values of the corresponding types.
        unsafe { key.deserialize_from_bytes(data, offsets[i * 2] as usize) };
        unsafe { diff.deserialize_from_bytes(data, offsets[i * 2 + 1] as usize) };
        // Builder protocol: (time, diff) and value first, then the key.
        builder.push_val_diff(&(), &diff);
        builder.push_key(&key);
    }
    builder.done()
}
/// Sentinel stored in the offsets array to mark the end of a key's
/// (diff, value) offset pairs; the key's own offset follows it.
/// NOTE(review): compared against `usize` offsets after an `as usize` cast,
/// which assumes a 64-bit target — confirm if 32-bit targets matter.
const SEPARATOR: u64 = u64::MAX;
/// Debug-only state machine enforcing the `push_diff` → `push_val` →
/// (repeat) → `push_key` call order on [`IndexedWSetSerializer`].
#[cfg(debug_assertions)]
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum State {
    Key,
    Val,
    Diff,
}
/// Incremental serializer for indexed W-sets.
///
/// Values are serialized into `fbuf` and their byte offsets recorded in
/// `offsets`; [`Self::done`] patches the key/value counts into the first two
/// offset slots and appends the whole offsets vector to the buffer.
pub struct IndexedWSetSerializer {
    // Destination buffer holding the serialized values and, last, the offsets.
    fbuf: FBuf,
    // offsets[0] = key count, offsets[1] = value count (patched in `done`);
    // the rest are value offsets with a SEPARATOR before each key's offset.
    offsets: Vec<usize>,
    // Keys serialized so far.
    n_keys: usize,
    // Values serialized so far.
    n_values: usize,
    // Debug-only enforcement of the push call order.
    #[cfg(debug_assertions)]
    state: State,
}
impl IndexedWSetSerializer {
    /// Creates a serializer preallocated for the given key/value estimate.
    ///
    /// The first two offset slots are reserved for the key and value counts,
    /// which [`Self::done`] fills in.
    pub fn with_capacity(estimated_keys: usize, estimated_values: usize) -> Self {
        let mut offsets = Vec::with_capacity(2 + 2 * estimated_keys + 2 * estimated_values);
        offsets.push(0);
        offsets.push(0);
        Self {
            fbuf: FBuf::new(),
            offsets,
            n_keys: 0,
            n_values: 0,
            #[cfg(debug_assertions)]
            state: State::Key,
        }
    }
    /// Serializes a weight. Must be followed by [`Self::push_val`]; may not
    /// be called twice in a row (checked in debug builds).
    pub fn push_diff<R: WeightTrait + ?Sized>(
        &mut self,
        weight: &R,
        serializer_inner: &mut SerializerInner,
    ) {
        #[cfg(debug_assertions)]
        {
            debug_assert_ne!(self.state, State::Diff);
            self.state = State::Diff;
        }
        serializer_inner.with(FBufSerializer::new(&mut self.fbuf), |s| {
            self.offsets.push(weight.serialize(s).unwrap())
        });
    }
    /// Serializes a value; must directly follow a [`Self::push_diff`]
    /// (checked in debug builds).
    pub fn push_val<V: DataTrait + ?Sized>(
        &mut self,
        val: &V,
        serializer_inner: &mut SerializerInner,
    ) {
        #[cfg(debug_assertions)]
        {
            debug_assert_eq!(self.state, State::Diff);
            self.state = State::Val;
        }
        self.n_values += 1;
        serializer_inner.with(FBufSerializer::new(&mut self.fbuf), |s| {
            self.offsets.push(val.serialize(s).unwrap())
        });
    }
    /// Serializes a key, ending its run of (diff, value) pairs; must follow
    /// a [`Self::push_val`] (checked in debug builds). A [`SEPARATOR`] is
    /// written before the key's offset so the deserializer can find it.
    pub fn push_key<K: DataTrait + ?Sized>(
        &mut self,
        key: &K,
        serializer_inner: &mut SerializerInner,
    ) {
        #[cfg(debug_assertions)]
        {
            debug_assert_eq!(self.state, State::Val);
            self.state = State::Key;
        }
        self.offsets.push(SEPARATOR as usize);
        self.n_keys += 1;
        serializer_inner.with(FBufSerializer::new(&mut self.fbuf), |s| {
            self.offsets.push(key.serialize(s).unwrap())
        });
    }
    /// Finalizes the serialization: patches the key/value counts into the
    /// first two offset slots, appends the offsets vector (making it the
    /// archive root), and returns the buffer. Must be called after a
    /// [`Self::push_key`] (checked in debug builds).
    pub fn done(mut self, serializer_inner: &mut SerializerInner) -> FBuf {
        #[cfg(debug_assertions)]
        debug_assert_eq!(self.state, State::Key);
        self.offsets[0] = self.n_keys;
        self.offsets[1] = self.n_values;
        serializer_inner.with(FBufSerializer::new(&mut self.fbuf), |s| {
            s.serialize_value(&self.offsets).unwrap()
        });
        self.fbuf
    }
}
/// Serializes an indexed W-set (an untimed batch with keys, values, and
/// weights) into an [`FBuf`] using [`IndexedWSetSerializer`].
///
/// The layout is the one [`deserialize_indexed_wset`] expects: for each key,
/// every (weight, value) pair is written first, followed by the key itself.
pub fn serialize_indexed_wset<B, K, V, R>(batch: &B, serializer_inner: &mut SerializerInner) -> FBuf
where
    B: BatchReader<Key = K, Val = V, Time = (), R = R>,
    K: DataTrait + ?Sized,
    V: DataTrait + ?Sized,
    R: WeightTrait + ?Sized,
{
    let mut writer = IndexedWSetSerializer::with_capacity(batch.key_count(), batch.len());
    let mut cur = batch.cursor();
    // Walk the batch in cursor order, emitting (diff, value)* then the key.
    while cur.key_valid() {
        while cur.val_valid() {
            writer.push_diff(cur.weight(), serializer_inner);
            writer.push_val(cur.val(), serializer_inner);
            cur.step_val();
        }
        writer.push_key(cur.key(), serializer_inner);
        cur.step_key();
    }
    writer.done(serializer_inner)
}
/// Deserializes an indexed W-set previously produced by
/// [`serialize_indexed_wset`].
///
/// Reads the offsets vector from the archive root (slots 0 and 1 hold the
/// key and value counts), then walks it: runs of (diff, value) offset pairs
/// terminated by a [`SEPARATOR`] followed by the key's offset.
pub fn deserialize_indexed_wset<B, K, V, R>(factories: &B::Factories, data: &[u8]) -> B
where
    B: Batch<Key = K, Val = V, Time = (), R = R>,
    K: DataTrait + ?Sized,
    V: DataTrait + ?Sized,
    R: WeightTrait + ?Sized,
{
    // SAFETY: `data` must be the output of `serialize_indexed_wset`, whose
    // root value is the offsets vector; `archived_root` performs no
    // validation.
    let offsets = unsafe { archived_root::<Vec<usize>>(data) };
    let n_keys = offsets[0] as usize;
    let n_values = offsets[1] as usize;
    let mut builder = B::Builder::with_capacity(factories, n_keys, n_values);
    let mut key = factories.key_factory().default_box();
    let mut val = factories.val_factory().default_box();
    let mut diff = factories.weight_factory().default_box();
    // Skip the two count slots.
    let mut current_offset = 2;
    while current_offset < offsets.len() {
        // Consume this key's (diff, value) pairs up to the separator.
        while offsets[current_offset] != SEPARATOR {
            // SAFETY: offsets recorded by the serializer point at valid
            // archived values of the corresponding types.
            unsafe { diff.deserialize_from_bytes(data, offsets[current_offset] as usize) };
            current_offset += 1;
            unsafe { val.deserialize_from_bytes(data, offsets[current_offset] as usize) };
            current_offset += 1;
            builder.push_val_diff(&val, &diff);
        }
        // Skip the separator; the key's offset follows it.
        current_offset += 1;
        unsafe { key.deserialize_from_bytes(data, offsets[current_offset] as usize) };
        current_offset += 1;
        builder.push_key(&key);
    }
    builder.done()
}
#[cfg(test)]
mod serialize_test {
    use crate::{
        DynZWeight, OrdIndexedZSet,
        algebra::OrdIndexedZSet as DynOrdIndexedZSet,
        dynamic::DynData,
        indexed_zset,
        storage::file::SerializerInner,
        trace::{BatchReader, deserialize_indexed_wset, serialize_indexed_wset},
    };
    /// Round-trips empty, single-tuple, and multi-key indexed Z-sets through
    /// `serialize_indexed_wset` / `deserialize_indexed_wset`.
    #[test]
    fn test_serialize_indexed_wset() {
        let test1: OrdIndexedZSet<u64, u64> = indexed_zset! {};
        let test2 = indexed_zset! { 1 => { 1 => 1 } };
        let test3 =
            indexed_zset! { 1 => { 1 => 1, 2 => 2, 3 => 3 }, 2 => { 1 => 1, 2 => 2, 3 => 3 } };
        for test in [test1, test2, test3] {
            let serialized = serialize_indexed_wset(&*test, &mut SerializerInner::new());
            let deserialized = deserialize_indexed_wset::<
                DynOrdIndexedZSet<DynData, DynData>,
                DynData,
                DynData,
                DynZWeight,
            >(&test.factories(), &serialized);
            assert_eq!(&*test, &deserialized);
        }
    }
    /// Round-trip with a zero-sized (unit) key type.
    #[test]
    fn test_serialize_indexed_wset_tup0_key() {
        let test1: OrdIndexedZSet<(), u64> = indexed_zset! {};
        let test2 = indexed_zset! { () => { 1 => 1 } };
        for test in [test1, test2] {
            let serialized = serialize_indexed_wset(&*test, &mut SerializerInner::new());
            let deserialized = deserialize_indexed_wset::<
                DynOrdIndexedZSet<DynData, DynData>,
                DynData,
                DynData,
                DynZWeight,
            >(&test.factories(), &serialized);
            assert_eq!(&*test, &deserialized);
        }
    }
    /// Round-trip with a zero-sized (unit) value type.
    #[test]
    fn test_serialize_indexed_wset_tup0_val() {
        let test1: OrdIndexedZSet<u64, ()> = indexed_zset! {};
        let test2 = indexed_zset! { 1 => { () => 1 } };
        let test3 = indexed_zset! { 1 => { () => 1 }, 2 => { () => 1 } };
        for test in [test1, test2, test3] {
            let serialized = serialize_indexed_wset(&*test, &mut SerializerInner::new());
            let deserialized = deserialize_indexed_wset::<
                DynOrdIndexedZSet<DynData, DynData>,
                DynData,
                DynData,
                DynZWeight,
            >(&test.factories(), &serialized);
            assert_eq!(&*test, &deserialized);
        }
    }
}