use std::{
cmp::max,
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
};
use feldera_storage::tokio::TOKIO;
use proptest::{collection::vec, prelude::*, strategy::BoxedStrategy};
use size_of::SizeOf;
use tempfile::tempdir;
use crate::{
DynZWeight, Runtime, ZWeight,
algebra::{
AddByRef, IndexedZSet, NegByRef, OrdIndexedZSet, OrdIndexedZSetFactories, OrdZSet,
OrdZSetFactories, ZBatch, ZSet,
},
circuit::{CircuitConfig, mkconfig},
dynamic::{DowncastTrait, DynData, DynUnit, DynWeightedPairs, Erase, LeanVec, pair::DynPair},
storage::{buffer_cache::CacheStats, file::FilterKind},
trace::{
Batch, BatchLocation, BatchReader, BatchReaderFactories, Builder, FileIndexedWSetFactories,
FileWSetFactories, GroupFilter, ListMerger, Spine, Trace, VecIndexedWSet,
VecIndexedWSetFactories, VecKeyBatch, VecKeyBatchFactories, VecValBatch,
VecValBatchFactories, VecWSet, VecWSetFactories,
cursor::{Cursor, CursorPair},
ord::{
FileIndexedWSet, FileKeyBatch, FileKeyBatchFactories, FileValBatch,
FileValBatchFactories, FileWSet, OrdKeyBatch, OrdKeyBatchFactories, OrdValBatch,
OrdValBatchFactories,
},
test::test_batch::{
TestBatch, TestBatchFactories, assert_batch_cursors_eq, assert_batch_eq,
assert_trace_eq, test_batch_sampling, test_trace_sampling,
},
},
utils::{Tup1, Tup2, Tup3, Tup4},
};
use super::Filter;
use itertools::Itertools;
pub mod test_batch;
/// Alias for the type-erased `DynData`, used in this file for keys and values whose
/// concrete type is `i32` (see the `::new::<i32, ...>` factory instantiations).
type DynI32 = DynData;
/// Proptest strategy producing a vector of `Tup4(key, val, time, weight)` tuples.
///
/// Components are drawn from `0..max_key`, `0..max_val`, `0..max_time` and
/// `-max_weight..max_weight` respectively.
///
/// NOTE(review): `max_tuples` is handed to `vec` as a bare `usize`, which proptest treats
/// as an exact length rather than an upper bound — confirm this is intended.
fn kvtr_batch(
    max_key: i32,
    max_val: i32,
    max_time: u32,
    max_weight: ZWeight,
    max_tuples: usize,
) -> BoxedStrategy<Vec<Tup4<i32, i32, u32, ZWeight>>> {
    let tuple = (0..max_key, 0..max_val, 0..max_time, -max_weight..max_weight)
        .prop_map(|(key, val, time, weight)| Tup4(key, val, time, weight));
    vec(tuple, max_tuples).boxed()
}
/// Proptest strategy producing a vector of `Tup3(key, time, weight)` tuples.
///
/// Components are drawn from `0..max_key`, `0..max_time` and `-max_weight..max_weight`.
///
/// NOTE(review): `max_tuples` is handed to `vec` as a bare `usize`, which proptest treats
/// as an exact length rather than an upper bound — confirm this is intended.
fn ktr_batch(
    max_key: i32,
    max_time: u32,
    max_weight: ZWeight,
    max_tuples: usize,
) -> BoxedStrategy<Vec<Tup3<i32, u32, ZWeight>>> {
    let tuple = (0..max_key, 0..max_time, -max_weight..max_weight)
        .prop_map(|(key, time, weight)| Tup3(key, time, weight));
    vec(tuple, max_tuples).boxed()
}
/// Proptest strategy producing up to `max_batches` (exclusive) batches of key/weight tuples.
///
/// Each element pairs a vector of `Tup2(key, weight)` (length in `0..max_tuples`) with an
/// extra `i32` drawn from `0..max_key`; the spine tests in this file use that extra value
/// as a key bound for `retain_keys`.
fn kr_batches(
    max_key: i32,
    max_weight: ZWeight,
    max_tuples: usize,
    max_batches: usize,
) -> BoxedStrategy<Vec<(Vec<Tup2<i32, ZWeight>>, i32)>> {
    let tuple =
        (0..max_key, -max_weight..max_weight).prop_map(|(key, weight)| Tup2(key, weight));
    let batch_with_bound = (vec(tuple, 0..max_tuples), 0..max_key);
    vec(batch_with_bound, 0..max_batches).boxed()
}
/// Proptest strategy producing up to `max_batches` (exclusive) batches of
/// `Tup2(Tup2(key, val), weight)` tuples.
///
/// Each element is `(tuples, key_bound, val_bound)`, where `tuples` has a length in
/// `0..max_tuples`, `key_bound` is drawn from `0..max_key` and `val_bound` from
/// `0..max_val`.
fn kvr_batches(
    max_key: i32,
    max_val: i32,
    max_weight: ZWeight,
    max_tuples: usize,
    max_batches: usize,
) -> BoxedStrategy<Vec<(Vec<Tup2<Tup2<i32, i32>, ZWeight>>, i32, i32)>> {
    let key_val = (0..max_key, 0..max_val).prop_map(|(key, val)| Tup2(key, val));
    let tuple = (key_val, -max_weight..max_weight).prop_map(|(kv, weight)| Tup2(kv, weight));
    let batch_with_bounds = (vec(tuple, 0..max_tuples), 0..max_key, 0..max_val);
    vec(batch_with_bounds, 0..max_batches).boxed()
}
/// Proptest strategy producing exactly `batches` batches whose key window slides upwards.
///
/// Batch `i` draws keys from `i * window_step..i * window_step + window_size`, values from
/// `0..max_value`, and always uses weight `1` (the range `1..2`). Each batch holds
/// `0..max_tuples` tuples.
fn kvr_batches_monotone_keys(
    window_size: i32,
    window_step: i32,
    max_value: i32,
    max_tuples: usize,
    batches: usize,
) -> BoxedStrategy<Vec<Vec<Tup2<Tup2<i32, i32>, ZWeight>>>> {
    let per_batch: Vec<_> = (0..batches)
        .map(|index| {
            let first_key = index as i32 * window_step;
            let key_val = (first_key..first_key + window_size, 0..max_value)
                .prop_map(|(key, val)| Tup2(key, val));
            let tuple = (key_val, 1..2i64).prop_map(|(kv, weight)| Tup2(kv, weight));
            vec(tuple, 0..max_tuples)
        })
        .collect();
    per_batch.boxed()
}
/// Proptest strategy producing exactly `batches` batches whose value window slides upwards.
///
/// Batch `i` draws keys from `0..max_key`, values from
/// `i * window_step..i * window_step + window_size`, and always uses weight `1` (the range
/// `1..2`). Each batch holds `0..max_tuples` tuples.
fn kvr_batches_monotone_values(
    max_key: i32,
    window_size: i32,
    window_step: i32,
    max_tuples: usize,
    batches: usize,
) -> BoxedStrategy<Vec<Vec<Tup2<Tup2<i32, i32>, ZWeight>>>> {
    let per_batch: Vec<_> = (0..batches)
        .map(|index| {
            let first_val = index as i32 * window_step;
            let key_val = (0..max_key, first_val..first_val + window_size)
                .prop_map(|(key, val)| Tup2(key, val));
            let tuple = (key_val, 1..2i64).prop_map(|(kv, weight)| Tup2(kv, weight));
            vec(tuple, 0..max_tuples)
        })
        .collect();
    per_batch.boxed()
}
/// Converts concrete `((key, val), weight)` tuples into a boxed, type-erased
/// `DynWeightedPairs` vector.
fn indexed_zset_tuples(
    tuples: Vec<Tup2<Tup2<i32, i32>, ZWeight>>,
) -> Box<DynWeightedPairs<DynPair<DynI32, DynI32>, DynZWeight>> {
    let pairs = LeanVec::from(tuples);
    Box::new(pairs).erase_box()
}
/// Converts concrete `(key, weight)` tuples into a boxed, type-erased `DynWeightedPairs`
/// vector, pairing every key with the unit value `()`.
pub fn zset_tuples(
    tuples: Vec<Tup2<i32, ZWeight>>,
) -> Box<DynWeightedPairs<DynPair<DynI32, DynUnit>, DynZWeight>> {
    let with_unit_vals: Vec<_> = tuples
        .into_iter()
        .map(|Tup2(key, weight)| Tup2(Tup2(key, ()), weight))
        .collect();
    Box::new(LeanVec::from(with_unit_vals)).erase_box()
}
/// Checks a `Spine<B>` of Z-set batches against the `TestBatch` reference implementation.
///
/// For every `(tuples, bound)` input the same batch is built with `B` and with `TestBatch`;
/// the two are compared, inserted into their respective traces, and the traces are
/// compared again. Afterwards both traces get a `retain_keys` filter keeping only keys
/// `>=` the largest `bound` seen so far, and are compared once more.
///
/// `seed` is forwarded to `assert_batch_cursors_eq`.
fn test_zset_spine<B: ZSet<Key = DynI32>>(
    factories: &B::Factories,
    batches: Vec<(Vec<Tup2<i32, ZWeight>>, i32)>,
    seed: u64,
) {
    let mut trace: Spine<B> = Spine::new(factories);
    let mut ref_trace: TestBatch<DynI32, DynUnit, (), DynZWeight> =
        TestBatch::new(&TestBatchFactories::new());

    let mut kbound = 0;
    for (tuples, bound) in batches {
        // `tuples` is not used after erasure, so it is moved instead of cloned.
        let mut erased_tuples = zset_tuples(tuples);

        // The erased tuples are cloned here because they are needed again for `ref_batch`.
        let batch = B::dyn_from_tuples(factories, (), &mut erased_tuples.clone());
        let ref_batch: TestBatch<DynData, DynUnit, (), DynZWeight> =
            TestBatch::dyn_from_tuples(&TestBatchFactories::new(), (), &mut erased_tuples);

        test_batch_sampling(&batch);
        assert_batch_eq(&batch, &ref_batch);

        // The reference trace is updated first, so that the cursor pair over the new batch
        // and the not-yet-updated trace can be compared against it.
        TOKIO.block_on(ref_trace.insert(ref_batch));
        assert_batch_cursors_eq(
            CursorPair::new(&mut batch.cursor(), &mut trace.cursor()),
            &ref_trace,
            seed,
        );

        TOKIO.block_on(trace.insert(batch));
        test_trace_sampling(&trace);
        assert_trace_eq(&trace, &ref_trace);

        // The key bound only ever grows.
        kbound = max(kbound, bound);
        trace.retain_keys(Filter::new(Box::new(move |key| {
            *key.downcast_checked::<i32>() >= kbound
        })));
        ref_trace.retain_keys(Filter::new(Box::new(move |key| {
            *key.downcast_checked::<i32>() >= kbound
        })));
        test_trace_sampling(&trace);
        assert_trace_eq(&trace, &ref_trace);
    }
}
/// Checks a `Spine<B>` of indexed Z-set batches against the `TestBatch` reference.
///
/// For every `(tuples, key_bound, val_bound)` input the same batch is built with `B` and
/// with `TestBatch`; batches and traces are compared before and after insertion. Both
/// traces then get `retain_keys` and `retain_values` filters keeping only keys/values `>=`
/// the largest respective bound seen so far, and are compared again.
///
/// `seed` is forwarded to `assert_batch_cursors_eq`.
fn test_indexed_zset_spine<B: IndexedZSet<Key = DynI32, Val = DynI32>>(
    factories: &B::Factories,
    batches: Vec<(Vec<Tup2<Tup2<i32, i32>, ZWeight>>, i32, i32)>,
    seed: u64,
) {
    let mut trace: Spine<B> = Spine::new(factories);
    let mut ref_trace: TestBatch<DynI32, DynI32, (), DynZWeight> =
        TestBatch::new(&TestBatchFactories::new());

    // Running maxima of the value and key bounds supplied with each batch.
    let mut min_val = 0;
    let mut min_key = 0;

    for (tuples, key_bound, val_bound) in batches {
        let mut erased = indexed_zset_tuples(tuples);
        let batch = B::dyn_from_tuples(factories, (), &mut erased.clone());
        let ref_batch = TestBatch::dyn_from_tuples(&TestBatchFactories::new(), (), &mut erased);

        // The freshly built batch must match the reference on its own.
        test_batch_sampling(&batch);
        assert_batch_eq(&batch, &ref_batch);
        assert_batch_cursors_eq(batch.cursor(), &ref_batch, seed);

        // Compare the cursor pair (new batch, old trace) against the updated reference.
        TOKIO.block_on(ref_trace.insert(ref_batch));
        assert_batch_cursors_eq(
            CursorPair::new(&mut batch.cursor(), &mut trace.cursor()),
            &ref_trace,
            seed,
        );

        TOKIO.block_on(trace.insert(batch));
        test_trace_sampling(&trace);
        assert_trace_eq(&trace, &ref_trace);
        assert_batch_cursors_eq(trace.cursor(), &ref_trace, seed);

        // Key filter.
        min_key = max(min_key, key_bound);
        trace.retain_keys(Filter::new(Box::new(move |key| {
            *key.downcast_checked::<i32>() >= min_key
        })));
        ref_trace.retain_keys(Filter::new(Box::new(move |key| {
            *key.downcast_checked::<i32>() >= min_key
        })));
        test_trace_sampling(&trace);

        // Value filter.
        min_val = max(min_val, val_bound);
        trace.retain_values(GroupFilter::Simple(Filter::new(Box::new(
            move |val: &DynI32| *val.downcast_checked::<i32>() >= min_val,
        ))));
        ref_trace.retain_values(GroupFilter::Simple(Filter::new(Box::new(
            move |val: &DynI32| *val.downcast_checked::<i32>() >= min_val,
        ))));
        test_trace_sampling(&trace);

        assert_trace_eq(&trace, &ref_trace);
        assert_batch_cursors_eq(trace.cursor(), &ref_trace, seed);
    }
}
/// Checks a `Spine<B>` of timestamped key/value batches against the `TestBatch` reference.
///
/// The position of each input in `batches` is used as the batch's `u32` timestamp. After
/// each insertion, both traces get `retain_keys` and `retain_values` filters keeping only
/// keys/values `>=` the largest respective bound seen so far.
///
/// `seed` is forwarded to `assert_batch_cursors_eq`.
fn test_val_batch_trace_spine<B: ZBatch<Key = DynI32, Val = DynI32, Time = u32>>(
    factories: &B::Factories,
    batches: Vec<(Vec<Tup2<Tup2<i32, i32>, ZWeight>>, i32, i32)>,
    seed: u64,
) {
    let mut trace: Spine<B> = Spine::new(factories);
    let mut ref_trace: TestBatch<DynI32, DynI32, u32, DynZWeight> =
        TestBatch::new(&TestBatchFactories::new());

    // Running maxima of the value and key bounds supplied with each batch.
    let mut min_val = 0;
    let mut min_key = 0;

    for (step, (tuples, key_bound, val_bound)) in batches.into_iter().enumerate() {
        let time = step as u32;
        let mut erased = indexed_zset_tuples(tuples);
        let batch = B::dyn_from_tuples(factories, time, &mut erased.clone());
        let ref_batch = TestBatch::dyn_from_tuples(&TestBatchFactories::new(), time, &mut erased);

        assert_batch_eq(&batch, &ref_batch);
        assert_batch_cursors_eq(batch.cursor(), &ref_batch, seed);

        // Compare the cursor pair (old trace, new batch) against the updated reference.
        TOKIO.block_on(ref_trace.insert(ref_batch));
        assert_batch_cursors_eq(
            CursorPair::new(&mut trace.cursor(), &mut batch.cursor()),
            &ref_trace,
            seed,
        );

        TOKIO.block_on(trace.insert(batch));
        assert_trace_eq(&trace, &ref_trace);
        assert_batch_cursors_eq(trace.cursor(), &ref_trace, seed);

        // Key filter.
        min_key = max(min_key, key_bound);
        trace.retain_keys(Filter::new(Box::new(move |key| {
            *key.downcast_checked::<i32>() >= min_key
        })));
        ref_trace.retain_keys(Filter::new(Box::new(move |key| {
            *key.downcast_checked::<i32>() >= min_key
        })));

        // Value filter.
        min_val = max(min_val, val_bound);
        trace.retain_values(GroupFilter::Simple(Filter::new(Box::new(
            move |val: &DynI32| *val.downcast_checked::<i32>() >= min_val,
        ))));
        ref_trace.retain_values(GroupFilter::Simple(Filter::new(Box::new(
            move |val: &DynI32| *val.downcast_checked::<i32>() >= min_val,
        ))));

        assert_trace_eq(&trace, &ref_trace);
        assert_batch_cursors_eq(trace.cursor(), &ref_trace, seed);
    }
}
/// Builds a batch of type `B` from `(key, val, time, weight)` tuples using `B::Builder`.
///
/// Consecutive tuples with equal keys, and within those equal values, are grouped: all
/// time/weight pairs of a group are pushed first, then the value, and finally the key.
/// Only adjacent tuples are grouped; the visible caller sorts the slice before calling.
fn val_batch_from_tuples<B>(factories: &B::Factories, tuples: &[Tup4<i32, i32, u32, ZWeight>]) -> B
where
    B: ZBatch<Key = DynI32, Val = DynI32, Time = u32>,
{
    let mut builder = B::Builder::with_capacity(factories, tuples.len(), tuples.len());

    // Kept as `into_iter()` on the slice reference, exactly as originally written.
    #[allow(clippy::into_iter_on_ref)]
    let by_key = tuples.into_iter().chunk_by(|Tup4(key, _, _, _)| key);
    for (key, key_group) in &by_key {
        let by_val = key_group.into_iter().chunk_by(|Tup4(_, val, _, _)| val);
        for (val, val_group) in &by_val {
            for Tup4(_, _, time, diff) in val_group {
                builder.push_time_diff(time, diff);
            }
            builder.push_val(val);
        }
        builder.push_key(key);
    }

    builder.done()
}
/// Builds the same batch through `B::Builder` and through the `TestBatch` builder, then
/// asserts that both the batches and their cursors agree.
///
/// The input is normalized first: sorted, zero-weight tuples removed, and consecutive
/// tuples sharing `(key, val, time)` collapsed to the first one.
fn test_val_batch_trace_builder<B>(
    factories: &B::Factories,
    mut tuples: Vec<Tup4<i32, i32, u32, ZWeight>>,
    seed: u64,
) where
    B: ZBatch<Key = DynI32, Val = DynI32, Time = u32>,
{
    // Normalize the input.
    tuples.sort_unstable();
    tuples.retain(|Tup4(_, _, _, weight)| *weight != 0);
    tuples.dedup_by_key(|Tup4(key, val, time, _)| (*key, *val, *time));

    let reference_factories = TestBatchFactories::new();
    let expected = val_batch_from_tuples::<TestBatch<DynI32, DynI32, u32, DynZWeight>>(
        &reference_factories,
        &tuples,
    );
    let actual = val_batch_from_tuples::<B>(factories, &tuples);

    assert_batch_eq(&actual, &expected);
    assert_batch_cursors_eq(actual.cursor(), &expected, seed);
}
/// Builds a unit-valued batch of type `B` from `(key, time, weight)` tuples using
/// `B::Builder`.
///
/// Consecutive tuples with equal keys are grouped: all time/weight pairs of a group are
/// pushed first, followed by the unit value and then the key. Only adjacent tuples are
/// grouped; the visible caller sorts the slice before calling.
fn timed_batch_from_tuples<B>(factories: &B::Factories, tuples: &[Tup3<i32, u32, ZWeight>]) -> B
where
    B: ZBatch<Key = DynI32, Val = DynUnit, Time = u32>,
{
    let mut builder = B::Builder::with_capacity(factories, tuples.len(), tuples.len());

    // Kept as `into_iter()` on the slice reference, exactly as originally written.
    #[allow(clippy::into_iter_on_ref)]
    let by_key = tuples.into_iter().chunk_by(|Tup3(key, _time, _diff)| key);
    for (key, key_group) in &by_key {
        for Tup3(_key, time, diff) in key_group {
            builder.push_time_diff(time, diff);
        }
        builder.push_val(&());
        builder.push_key(key);
    }

    builder.done()
}
/// Builds the same unit-valued batch through `B::Builder` and through the `TestBatch`
/// builder, then asserts that both the batches and their cursors agree.
///
/// The input is normalized first: sorted, zero-weight tuples removed, and consecutive
/// tuples sharing `(key, time)` collapsed to the first one.
fn test_key_batch_builder<B>(
    factories: &B::Factories,
    mut tuples: Vec<Tup3<i32, u32, ZWeight>>,
    seed: u64,
) where
    B: ZBatch<Key = DynI32, Val = DynUnit, Time = u32>,
{
    // Normalize the input.
    tuples.sort_unstable();
    tuples.retain(|Tup3(_, _, weight)| *weight != 0);
    tuples.dedup_by_key(|Tup3(key, time, _)| (*key, *time));

    let reference_factories = TestBatchFactories::new();
    let expected = timed_batch_from_tuples::<TestBatch<DynI32, DynUnit, u32, DynZWeight>>(
        &reference_factories,
        &tuples,
    );
    let actual = timed_batch_from_tuples::<B>(factories, &tuples);

    assert_batch_eq(&actual, &expected);
    assert_batch_cursors_eq(actual.cursor(), &expected, seed);
}
/// Checks a `Spine<B>` of timestamped, unit-valued batches against the `TestBatch`
/// reference implementation.
///
/// The position of each input in `batches` is used as the batch's `u32` timestamp. For
/// every `(tuples, bound)` input the same batch is built with `B` and with `TestBatch`;
/// batches and traces are compared, and afterwards both traces get a `retain_keys` filter
/// keeping only keys `>=` the largest `bound` seen so far.
///
/// `seed` is forwarded to `assert_batch_cursors_eq`.
fn test_key_batch_spine<B: ZBatch<Key = DynI32, Val = DynUnit, Time = u32>>(
    factories: &B::Factories,
    batches: Vec<(Vec<Tup2<i32, ZWeight>>, i32)>,
    seed: u64,
) {
    let mut trace: Spine<B> = Spine::new(factories);
    let mut ref_trace: TestBatch<DynI32, DynUnit, u32, DynZWeight> =
        TestBatch::new(&TestBatchFactories::new());

    let mut kbound = 0;
    for (time, (tuples, bound)) in batches.into_iter().enumerate() {
        // `tuples` is not used after erasure, so it is moved instead of cloned.
        let mut erased_tuples = zset_tuples(tuples);

        // The erased tuples are cloned here because they are needed again for `ref_batch`.
        let batch = B::dyn_from_tuples(factories, time as u32, &mut erased_tuples.clone());
        let ref_batch =
            TestBatch::dyn_from_tuples(&TestBatchFactories::new(), time as u32, &mut erased_tuples);
        assert_batch_eq(&batch, &ref_batch);

        // The reference trace is updated first, so that the cursor pair over the
        // not-yet-updated trace and the new batch can be compared against it.
        TOKIO.block_on(ref_trace.insert(ref_batch));
        assert_batch_cursors_eq(
            CursorPair::new(&mut trace.cursor(), &mut batch.cursor()),
            &ref_trace,
            seed,
        );

        TOKIO.block_on(trace.insert(batch));
        assert_trace_eq(&trace, &ref_trace);

        // The key bound only ever grows.
        kbound = max(bound, kbound);
        trace.retain_keys(Filter::new(Box::new(move |key| {
            *key.downcast_checked::<i32>() >= kbound
        })));
        ref_trace.retain_keys(Filter::new(Box::new(move |key| {
            *key.downcast_checked::<i32>() >= kbound
        })));
        assert_trace_eq(&trace, &ref_trace);
    }
}
/// Asserts that exact seeks for `low` and `high` both miss and that the two misses are
/// accounted to the batch's range filter.
///
/// The range filter must report a non-zero `size_byte` before the seeks. Afterwards its
/// `misses` counter must have grown by exactly 2, while its `hits`, its `size_byte` and
/// the membership filter stats must be unchanged. Callers are expected to pass keys that
/// lie outside the batch's key range.
fn assert_out_of_range_seek_uses_range_filter<B>(batch: &B, low: i32, high: i32)
where
    B: BatchReader<Key = DynI32, Time = ()>,
{
    // Snapshot the filter statistics before touching the cursor.
    let range_stats_before = batch.range_filter_stats();
    let membership_stats_before = batch.membership_filter_stats();
    assert!(range_stats_before.size_byte > 0);

    let low_key: Box<DynI32> = Box::new(low).erase_box();
    let high_key: Box<DynI32> = Box::new(high).erase_box();

    let mut cursor = batch.cursor();
    assert!(!cursor.seek_key_exact(low_key.as_ref(), None));
    assert!(!cursor.seek_key_exact(high_key.as_ref(), None));

    let range_stats_after = batch.range_filter_stats();
    let membership_stats_after = batch.membership_filter_stats();

    assert_eq!(range_stats_after.size_byte, range_stats_before.size_byte);
    assert_eq!(range_stats_after.hits, range_stats_before.hits);
    assert_eq!(range_stats_after.misses, range_stats_before.misses + 2);
    assert_eq!(membership_stats_after, membership_stats_before);
}
/// Negating a `FileWSet` with `neg_by_ref` must negate every weight and still answer
/// seeks for the keys -20 and 40 (outside the inserted keys -10..=25) via the range filter.
#[test]
fn test_file_wset_neg_by_ref_preserves_key_bounds() {
    run_in_circuit_with_storage(|| {
        let factories = <FileWSetFactories<DynI32, DynZWeight>>::new::<i32, (), ZWeight>();

        let tuples = vec![Tup2(-10, 1), Tup2(0, -2), Tup2(25, 3)];
        let mut erased = zset_tuples(tuples.clone());
        let original =
            FileWSet::<DynI32, DynZWeight>::dyn_from_tuples(&factories, (), &mut erased);
        let negated = original.neg_by_ref();

        // Expected contents: the same keys with every weight negated.
        let negated_tuples = tuples
            .into_iter()
            .map(|Tup2(key, weight)| Tup2(key, -weight))
            .collect();
        let mut expected_erased = zset_tuples(negated_tuples);
        let expected =
            TestBatch::dyn_from_tuples(&TestBatchFactories::new(), (), &mut expected_erased);

        assert_batch_eq(&negated, &expected);
        assert_out_of_range_seek_uses_range_filter(&negated, -20, 40);
    });
}
/// Negating a `FileIndexedWSet` with `neg_by_ref` must negate every weight and still
/// answer seeks for the keys -20 and 40 (outside the inserted keys -10..=25) via the
/// range filter.
#[test]
fn test_file_indexed_wset_neg_by_ref_preserves_key_bounds() {
    run_in_circuit_with_storage(|| {
        let factories =
            <FileIndexedWSetFactories<DynI32, DynI32, DynZWeight>>::new::<i32, i32, ZWeight>();

        let tuples = vec![
            Tup2(Tup2(-10, 1), 1),
            Tup2(Tup2(0, 5), -2),
            Tup2(Tup2(25, 7), 3),
        ];
        let mut erased = indexed_zset_tuples(tuples.clone());
        let original = FileIndexedWSet::<DynI32, DynI32, DynZWeight>::dyn_from_tuples(
            &factories,
            (),
            &mut erased,
        );
        let negated = original.neg_by_ref();

        // Expected contents: the same key/value pairs with every weight negated.
        let negated_tuples = tuples
            .into_iter()
            .map(|Tup2(key_val, weight)| Tup2(key_val, -weight))
            .collect();
        let mut expected_erased = indexed_zset_tuples(negated_tuples);
        let expected =
            TestBatch::dyn_from_tuples(&TestBatchFactories::new(), (), &mut expected_erased);

        assert_batch_eq(&negated, &expected);
        assert_out_of_range_seek_uses_range_filter(&negated, -20, 40);
    });
}
/// `key_bounds` of a `FileIndexedWSet` reports the smallest and largest key, and `None`
/// for an empty batch.
#[test]
fn test_file_indexed_wset_key_bounds() {
    run_in_circuit_with_storage(|| {
        type Tested = FileIndexedWSet<DynI32, DynI32, DynZWeight>;

        let factories =
            <FileIndexedWSetFactories<DynI32, DynI32, DynZWeight>>::new::<i32, i32, ZWeight>();

        // Keys -10, 0 and 25.
        let mut erased = indexed_zset_tuples(vec![
            Tup2(Tup2(-10, 1), 1),
            Tup2(Tup2(0, 5), -2),
            Tup2(Tup2(25, 7), 3),
        ]);
        let batch = Tested::dyn_from_tuples(&factories, (), &mut erased);

        let Some((lowest, highest)) = batch.key_bounds() else {
            panic!("expected non-empty key bounds");
        };
        assert_eq!(*lowest.downcast_checked::<i32>(), -10);
        assert_eq!(*highest.downcast_checked::<i32>(), 25);

        // A builder that never receives a key yields a batch without bounds.
        let empty_batch = <Tested as Batch>::Builder::with_capacity(&factories, 0, 0).done();
        assert!(empty_batch.key_bounds().is_none());
    });
}
/// `key_bounds` of a `FileKeyBatch` with `i16` keys reports the smallest and largest key,
/// and `None` for an empty batch.
#[test]
fn test_file_key_batch_key_bounds() {
    run_in_circuit_with_storage(|| {
        type Tested = FileKeyBatch<DynData, u32, DynZWeight>;

        let factories =
            <FileKeyBatchFactories<DynData, u32, DynZWeight>>::new::<i16, (), ZWeight>();

        // One (time, weight) entry per key, pushed in ascending key order.
        let mut builder = <Tested as Batch>::Builder::with_capacity(&factories, 3, 3);
        let rows = [(-7i16, 1u32, 1i64), (3, 2, -2), (12, 4, 3)];
        for (key, time, weight) in rows {
            let weight: ZWeight = weight;
            builder.push_time_diff(&time, weight.erase());
            builder.push_val(&());
            builder.push_key(key.erase());
        }
        let batch = builder.done();

        let Some((lowest, highest)) = batch.key_bounds() else {
            panic!("expected non-empty key bounds");
        };
        assert_eq!(*lowest.downcast_checked::<i16>(), -7);
        assert_eq!(*highest.downcast_checked::<i16>(), 12);

        // A builder that never receives a key yields a batch without bounds.
        let empty_batch = <Tested as Batch>::Builder::with_capacity(&factories, 0, 0).done();
        assert!(empty_batch.key_bounds().is_none());
    });
}
/// `key_bounds` of a `FileValBatch` with `u64` keys reports the smallest and largest key,
/// and `None` for an empty batch.
#[test]
fn test_file_val_batch_key_bounds() {
    run_in_circuit_with_storage(|| {
        type Tested = FileValBatch<DynData, DynData, u32, DynZWeight>;

        let factories =
            <FileValBatchFactories<DynData, DynData, u32, DynZWeight>>::new::<u64, i16, ZWeight>();

        // One (val, time, weight) entry per key, pushed in ascending key order.
        let mut builder = <Tested as Batch>::Builder::with_capacity(&factories, 3, 3);
        let rows = [(5u64, -1i16, 1u32, 1i64), (11, 2, 2, -1), (42, 9, 7, 4)];
        for (key, val, time, weight) in rows {
            let weight: ZWeight = weight;
            builder.push_time_diff(&time, weight.erase());
            builder.push_val(val.erase());
            builder.push_key(key.erase());
        }
        let batch = builder.done();

        let Some((lowest, highest)) = batch.key_bounds() else {
            panic!("expected non-empty key bounds");
        };
        assert_eq!(*lowest.downcast_checked::<u64>(), 5);
        assert_eq!(*highest.downcast_checked::<u64>(), 42);

        // A builder that never receives a key yields a batch without bounds.
        let empty_batch = <Tested as Batch>::Builder::with_capacity(&factories, 0, 0).done();
        assert!(empty_batch.key_bounds().is_none());
    });
}
/// `key_bounds` of a `FileWSet` with `Tup1<i32>` keys reports the smallest and largest
/// key, and `None` for an empty batch.
#[test]
fn test_file_wset_key_bounds() {
    run_in_circuit_with_storage(|| {
        type Tested = FileWSet<DynData, DynZWeight>;

        let batch = build_file_wset_tup1_i32(&[-4, 6, 15]);
        let Some((lowest, highest)) = batch.key_bounds() else {
            panic!("expected non-empty key bounds");
        };
        assert_eq!(lowest.downcast_checked::<Tup1<i32>>(), &Tup1(-4));
        assert_eq!(highest.downcast_checked::<Tup1<i32>>(), &Tup1(15));

        // A builder that never receives a key yields a batch without bounds.
        let factories = <FileWSetFactories<DynData, DynZWeight>>::new::<Tup1<i32>, (), ZWeight>();
        let empty_batch = <Tested as Batch>::Builder::with_capacity(&factories, 0, 0).done();
        assert!(empty_batch.key_bounds().is_none());
    });
}
/// `key_bounds` of a `VecIndexedWSet` with `i64` keys reports the smallest and largest
/// key, and `None` for an empty batch.
#[test]
fn test_vec_indexed_wset_key_bounds() {
    type Tested = VecIndexedWSet<DynData, DynData, DynZWeight>;

    let factories =
        <VecIndexedWSetFactories<DynData, DynData, DynZWeight>>::new::<i64, u8, ZWeight>();

    // One (val, weight) entry per key, pushed in ascending key order.
    let mut builder = <Tested as Batch>::Builder::with_capacity(&factories, 3, 3);
    let rows = [(-50i64, 1u8, 1i64), (4, 2, -1), (999, 3, 5)];
    for (key, val, weight) in rows {
        let weight: ZWeight = weight;
        builder.push_val_diff(val.erase(), weight.erase());
        builder.push_key(key.erase());
    }
    let batch = builder.done();

    let Some((lowest, highest)) = batch.key_bounds() else {
        panic!("expected non-empty key bounds");
    };
    assert_eq!(*lowest.downcast_checked::<i64>(), -50);
    assert_eq!(*highest.downcast_checked::<i64>(), 999);

    // A builder that never receives a key yields a batch without bounds.
    let empty_batch = <Tested as Batch>::Builder::with_capacity(&factories, 0, 0).done();
    assert!(empty_batch.key_bounds().is_none());
}
/// `key_bounds` of a `VecKeyBatch` with `String` keys reports the smallest and largest
/// key, and `None` for an empty batch.
#[test]
fn test_vec_key_batch_key_bounds() {
    type Tested = VecKeyBatch<DynData, u32, DynZWeight>;

    let factories = <VecKeyBatchFactories<DynData, u32, DynZWeight>>::new::<String, (), ZWeight>();

    // One (time, weight) entry per key, pushed in ascending key order.
    let mut builder = <Tested as Batch>::Builder::with_capacity(&factories, 3, 3);
    let rows = [
        (String::from("ant"), 1u32, 1i64),
        (String::from("bee"), 2, -3),
        (String::from("cat"), 7, 4),
    ];
    for (key, time, weight) in rows {
        let weight: ZWeight = weight;
        builder.push_time_diff(&time, weight.erase());
        builder.push_val(&());
        builder.push_key(key.erase());
    }
    let batch = builder.done();

    let Some((lowest, highest)) = batch.key_bounds() else {
        panic!("expected non-empty key bounds");
    };
    assert_eq!(lowest.downcast_checked::<String>(), "ant");
    assert_eq!(highest.downcast_checked::<String>(), "cat");

    // A builder that never receives a key yields a batch without bounds.
    let empty_batch = <Tested as Batch>::Builder::with_capacity(&factories, 0, 0).done();
    assert!(empty_batch.key_bounds().is_none());
}
/// `key_bounds` of a `VecValBatch` with `u16` keys reports the smallest and largest key,
/// and `None` for an empty batch.
#[test]
fn test_vec_val_batch_key_bounds() {
    type Tested = VecValBatch<DynData, DynData, u32, DynZWeight>;

    let factories =
        <VecValBatchFactories<DynData, DynData, u32, DynZWeight>>::new::<u16, i32, ZWeight>();

    // One (val, time, weight) entry per key, pushed in ascending key order.
    let mut builder = <Tested as Batch>::Builder::with_capacity(&factories, 3, 3);
    let rows = [(2u16, -8i32, 1u32, 1i64), (10, 4, 2, -2), (70, 9, 9, 3)];
    for (key, val, time, weight) in rows {
        let weight: ZWeight = weight;
        builder.push_time_diff(&time, weight.erase());
        builder.push_val(val.erase());
        builder.push_key(key.erase());
    }
    let batch = builder.done();

    let Some((lowest, highest)) = batch.key_bounds() else {
        panic!("expected non-empty key bounds");
    };
    assert_eq!(*lowest.downcast_checked::<u16>(), 2);
    assert_eq!(*highest.downcast_checked::<u16>(), 70);

    // A builder that never receives a key yields a batch without bounds.
    let empty_batch = <Tested as Batch>::Builder::with_capacity(&factories, 0, 0).done();
    assert!(empty_batch.key_bounds().is_none());
}
/// `key_bounds` of a `VecWSet` with `u8` keys reports the smallest and largest key, and
/// `None` for an empty batch.
#[test]
fn test_vec_wset_key_bounds() {
    type Tested = VecWSet<DynData, DynZWeight>;

    let factories = <VecWSetFactories<DynData, DynZWeight>>::new::<u8, (), ZWeight>();

    // Every key gets the unit value with weight 1, pushed in ascending key order.
    let mut builder = <Tested as Batch>::Builder::with_capacity(&factories, 3, 0);
    let keys = [1u8, 4, 9];
    for key in keys {
        let weight: ZWeight = 1;
        builder.push_val_diff(&(), weight.erase());
        builder.push_key(key.erase());
    }
    let batch = builder.done();

    let Some((lowest, highest)) = batch.key_bounds() else {
        panic!("expected non-empty key bounds");
    };
    assert_eq!(*lowest.downcast_checked::<u8>(), 1);
    assert_eq!(*highest.downcast_checked::<u8>(), 9);

    // A builder that never receives a key yields a batch without bounds.
    let empty_batch = <Tested as Batch>::Builder::with_capacity(&factories, 0, 0).done();
    assert!(empty_batch.key_bounds().is_none());
}
proptest! {
// Inserts 500 batches whose key window advances by 20 per batch, discarding keys below
// `20 * batch_index` after every insertion, and checks that the size reported for the
// spine stays below 200000 bytes once merges have completed.
#[test]
fn test_truncate_key_bounded_memory(batches in kvr_batches_monotone_keys(100, 20, 50, 20, 500)) {
    let config = CircuitConfig::with_workers(1);
    Runtime::run(config, move |_parker| {
        let factories = <OrdIndexedZSetFactories<DynI32, DynI32>>::new::<i32, i32, ZWeight>();
        let mut trace: Spine<OrdIndexedZSet<DynI32, DynI32>> = Spine::new(&factories);

        for (step, tuples) in batches.into_iter().enumerate() {
            let mut erased = indexed_zset_tuples(tuples);
            let batch =
                <OrdIndexedZSet<DynI32, DynI32>>::dyn_from_tuples(&factories, (), &mut erased);
            test_batch_sampling(&batch);
            TOKIO.block_on(trace.insert(batch));

            // Drop every key below the start of the current window.
            let min_key = (step * 20) as i32;
            trace.retain_keys(Filter::new(Box::new(move |key| {
                *key.downcast_checked::<i32>() >= min_key
            })));
            trace.complete_merges();

            let trace_total_bytes = trace.size_of().total_bytes();
            assert!(trace_total_bytes < 200000, "total bytes={}", trace_total_bytes);
        }
    }).unwrap().join().unwrap();
}
// Inserts 500 batches whose value window advances by 20 per batch, installing a value
// filter for `20 * batch_index` before every insertion, and checks that the size reported
// for the spine stays below 200000 bytes once merges have completed.
#[test]
fn test_truncate_value_bounded_memory(batches in kvr_batches_monotone_values(50, 100, 20, 20, 500)) {
    let config = CircuitConfig::with_workers(1);
    Runtime::run(config, move |_parker| {
        let factories = <OrdIndexedZSetFactories<DynI32, DynI32>>::new::<i32, i32, ZWeight>();
        let mut trace: Spine<OrdIndexedZSet<DynI32, DynI32>> = Spine::new(&factories);

        for (step, tuples) in batches.into_iter().enumerate() {
            let mut erased = indexed_zset_tuples(tuples);
            let batch =
                <OrdIndexedZSet<DynI32, DynI32>>::dyn_from_tuples(&factories, (), &mut erased);
            test_batch_sampling(&batch);

            // Drop every value below the start of the current window.
            let min_val = (step * 20) as i32;
            trace.retain_values(GroupFilter::Simple(Filter::new(Box::new(
                move |val: &DynI32| *val.downcast_checked::<i32>() >= min_val,
            ))));
            TOKIO.block_on(trace.insert(batch));
            trace.complete_merges();

            let trace_total_bytes = trace.size_of().total_bytes();
            assert!(trace_total_bytes < 200000, "total bytes={}", trace_total_bytes);
        }
    }).unwrap().join().unwrap();
}
// Runs the generic Z-set spine check with `OrdZSet` in a single-worker runtime.
#[test]
fn test_vec_zset_spine(batches in kr_batches(50, 2, 100, 20), seed in 0..u64::MAX) {
    // The factories are created on the test thread and moved into the worker closure.
    let factories = <OrdZSetFactories<DynI32>>::new::<i32, (), ZWeight>();
    let config = CircuitConfig::with_workers(1);
    Runtime::run(config, move |_parker| {
        test_zset_spine::<OrdZSet<DynI32>>(&factories, batches, seed)
    })
    .unwrap()
    .join()
    .unwrap();
}
// Runs the generic Z-set spine check with `FileWSet`, using a single-worker runtime
// configured with temporary storage in a fresh temporary directory.
#[test]
fn test_file_zset_spine(batches in kr_batches(50, 2, 50, 10), seed in 0..u64::MAX) {
    let tempdir = tempfile::tempdir().unwrap();
    let config = CircuitConfig::with_workers(1).with_temporary_storage(tempdir.path());
    Runtime::run(config, move |_parker| {
        let factories = <FileWSetFactories<DynI32, DynZWeight>>::new::<i32, (), ZWeight>();
        test_zset_spine::<FileWSet<DynI32, DynZWeight>>(&factories, batches, seed)
    })
    .unwrap()
    .join()
    .unwrap();
}
// Runs the generic indexed Z-set spine check with `OrdIndexedZSet` in a single-worker
// runtime.
#[test]
fn test_vec_indexed_zset_spine(batches in kvr_batches(100, 5, 2, 500, 20), seed in 0..u64::MAX) {
    let config = CircuitConfig::with_workers(1);
    Runtime::run(config, move |_parker| {
        let factories = <OrdIndexedZSetFactories<DynI32, DynI32>>::new::<i32, i32, ZWeight>();
        test_indexed_zset_spine::<OrdIndexedZSet<DynI32, DynI32>>(&factories, batches, seed)
    })
    .unwrap()
    .join()
    .unwrap();
}
// Runs the generic indexed Z-set spine check with `FileIndexedWSet`, using a
// single-worker runtime configured with temporary storage in a fresh temporary directory.
#[test]
fn test_file_indexed_zset_spine(batches in kvr_batches(100, 5, 2, 200, 10), seed in 0..u64::MAX) {
    let tempdir = tempfile::tempdir().unwrap();
    let config = CircuitConfig::with_workers(1).with_temporary_storage(tempdir.path());
    Runtime::run(config, move |_parker| {
        let factories =
            <FileIndexedWSetFactories<DynI32, DynI32, DynZWeight>>::new::<i32, i32, ZWeight>();
        test_indexed_zset_spine::<FileIndexedWSet<DynI32, DynI32, DynZWeight>>(
            &factories,
            batches,
            seed,
        )
    })
    .unwrap()
    .join()
    .unwrap();
}
// Installs an "even values only" filter on an `OrdIndexedZSet` spine and on the reference
// trace before any batch is inserted, then checks that both stay in agreement while the
// batches are added one by one. The generated key/value bounds are ignored.
#[test]
fn test_indexed_zset_spine_even_values(batches in kvr_batches(100, 5, 2, 500, 10), seed in 0..u64::MAX) {
    let config = CircuitConfig::with_workers(1);
    Runtime::run(config, move |_parker| {
        let factories = <OrdIndexedZSetFactories<DynI32, DynI32>>::new::<i32, i32, ZWeight>();
        let mut trace: Spine<OrdIndexedZSet<DynI32, DynI32>> = Spine::new(&factories);
        let mut ref_trace: TestBatch<DynI32, DynI32, (), DynZWeight> =
            TestBatch::new(&TestBatchFactories::new());

        // The same predicate goes to both the tested and the reference trace.
        trace.retain_values(GroupFilter::Simple(Filter::new(Box::new(
            move |val: &DynI32| *val.downcast_checked::<i32>() % 2 == 0,
        ))));
        ref_trace.retain_values(GroupFilter::Simple(Filter::new(Box::new(
            move |val: &DynI32| *val.downcast_checked::<i32>() % 2 == 0,
        ))));

        for (tuples, _key_bound, _val_bound) in batches {
            let mut erased = indexed_zset_tuples(tuples);
            let batch = OrdIndexedZSet::dyn_from_tuples(&factories, (), &mut erased.clone());
            let ref_batch =
                TestBatch::dyn_from_tuples(&TestBatchFactories::new(), (), &mut erased);

            test_batch_sampling(&batch);
            assert_batch_eq(&batch, &ref_batch);
            assert_batch_cursors_eq(batch.cursor(), &ref_batch, seed);

            TOKIO.block_on(ref_trace.insert(ref_batch));
            assert_batch_cursors_eq(
                CursorPair::new(&mut batch.cursor(), &mut trace.cursor()),
                &ref_trace,
                seed,
            );

            TOKIO.block_on(trace.insert(batch));
            test_trace_sampling(&trace);
            assert_trace_eq(&trace, &ref_trace);
            assert_batch_cursors_eq(trace.cursor(), &ref_trace, seed);
        }
    })
    .unwrap()
    .join()
    .unwrap();
}
// Installs an "even keys only" filter on an `OrdIndexedZSet` spine and on the reference
// trace before any batch is inserted, then checks that both stay in agreement while the
// batches are added one by one. The generated key/value bounds are ignored.
#[test]
fn test_indexed_zset_spine_even_keys(batches in kvr_batches(100, 5, 2, 500, 10), seed in 0..u64::MAX) {
    let config = CircuitConfig::with_workers(1);
    Runtime::run(config, move |_parker| {
        let factories = <OrdIndexedZSetFactories<DynI32, DynI32>>::new::<i32, i32, ZWeight>();
        let mut trace: Spine<OrdIndexedZSet<DynI32, DynI32>> = Spine::new(&factories);
        let mut ref_trace: TestBatch<DynI32, DynI32, (), DynZWeight> =
            TestBatch::new(&TestBatchFactories::new());

        // The same predicate goes to both the tested and the reference trace.
        trace.retain_keys(Filter::new(Box::new(move |key| {
            *key.downcast_checked::<i32>() % 2 == 0
        })));
        ref_trace.retain_keys(Filter::new(Box::new(move |key| {
            *key.downcast_checked::<i32>() % 2 == 0
        })));

        for (tuples, _key_bound, _val_bound) in batches {
            let mut erased = indexed_zset_tuples(tuples);
            let batch = OrdIndexedZSet::dyn_from_tuples(&factories, (), &mut erased.clone());
            let ref_batch =
                TestBatch::dyn_from_tuples(&TestBatchFactories::new(), (), &mut erased);

            test_batch_sampling(&batch);
            assert_batch_eq(&batch, &ref_batch);
            assert_batch_cursors_eq(batch.cursor(), &ref_batch, seed);

            TOKIO.block_on(ref_trace.insert(ref_batch));
            assert_batch_cursors_eq(
                CursorPair::new(&mut batch.cursor(), &mut trace.cursor()),
                &ref_trace,
                seed,
            );

            TOKIO.block_on(trace.insert(batch));
            test_trace_sampling(&trace);
            assert_trace_eq(&trace, &ref_trace);
            assert_batch_cursors_eq(trace.cursor(), &ref_trace, seed);
        }
    })
    .unwrap()
    .join()
    .unwrap();
}
// Runs the generic key-batch spine check with `OrdKeyBatch` in a single-worker runtime.
#[test]
fn test_vec_key_batch_trace_spine(batches in kr_batches(100, 2, 500, 20), seed in 0..u64::MAX) {
    let config = CircuitConfig::with_workers(1);
    Runtime::run(config, move |_parker| {
        let factories =
            <OrdKeyBatchFactories<DynI32, u32, DynZWeight>>::new::<i32, (), ZWeight>();
        test_key_batch_spine::<OrdKeyBatch<DynI32, u32, DynZWeight>>(&factories, batches, seed)
    })
    .unwrap()
    .join()
    .unwrap();
}
// Runs the generic key-batch spine check with `FileKeyBatch` inside a circuit that has
// storage configured.
#[test]
fn test_file_key_batch_spine(batches in kr_batches(100, 2, 200, 10), seed in 0..u64::MAX) {
    let check = move || {
        let factories =
            <FileKeyBatchFactories<DynI32, u32, DynZWeight>>::new::<i32, (), ZWeight>();
        test_key_batch_spine::<FileKeyBatch<DynI32, u32, DynZWeight>>(&factories, batches, seed);
    };
    run_in_circuit_with_storage(check);
}
// Runs the generic value-batch spine check with `OrdValBatch` in a single-worker runtime.
#[test]
fn test_vec_val_batch_spine(batches in kvr_batches(100, 5, 2, 300, 20), seed in 0..u64::MAX) {
    // The factories are created on the test thread and moved into the worker closure.
    let factories =
        <OrdValBatchFactories<DynI32, DynI32, u32, DynZWeight>>::new::<i32, i32, ZWeight>();
    let config = CircuitConfig::with_workers(1);
    Runtime::run(config, move |_parker| {
        test_val_batch_trace_spine::<OrdValBatch<DynI32, DynI32, u32, DynZWeight>>(
            &factories,
            batches,
            seed,
        )
    })
    .unwrap()
    .join()
    .unwrap();
}
// Runs the generic value-batch spine check with `FileValBatch` inside a circuit that has
// storage configured.
#[test]
fn test_file_val_batch_spine(batches in kvr_batches(100, 5, 2, 100, 10), seed in 0..u64::MAX) {
    let check = move || {
        let factories =
            <FileValBatchFactories<DynI32, DynI32, u32, DynZWeight>>::new::<i32, i32, ZWeight>();
        test_val_batch_trace_spine::<FileValBatch<DynI32, DynI32, u32, DynZWeight>>(
            &factories,
            batches,
            seed,
        );
    };
    run_in_circuit_with_storage(check);
}
// Installs an "even values only" filter on an `OrdValBatch` spine and on the reference
// trace before any batch is inserted, then checks that both stay in agreement while the
// timestamped batches are added one by one. The generated key/value bounds are ignored.
#[test]
fn test_val_batch_spine_retain_even_values(batches in kvr_batches(100, 5, 2, 300, 20), seed in 0..u64::MAX) {
    let config = CircuitConfig::with_workers(1);
    Runtime::run(config, move |_parker| {
        let factories =
            <OrdValBatchFactories<DynI32, DynI32, u32, DynZWeight>>::new::<i32, i32, ZWeight>();
        let mut trace: Spine<OrdValBatch<DynI32, DynI32, u32, DynZWeight>> =
            Spine::new(&factories);
        let mut ref_trace: TestBatch<DynI32, DynI32, u32, DynZWeight> =
            TestBatch::new(&TestBatchFactories::new());

        // The same predicate goes to both the tested and the reference trace.
        trace.retain_values(GroupFilter::Simple(Filter::new(Box::new(
            move |val: &DynI32| *val.downcast_checked::<i32>() % 2 == 0,
        ))));
        ref_trace.retain_values(GroupFilter::Simple(Filter::new(Box::new(
            move |val: &DynI32| *val.downcast_checked::<i32>() % 2 == 0,
        ))));

        // The batch index doubles as the batch timestamp.
        for (step, (tuples, _key_bound, _val_bound)) in batches.into_iter().enumerate() {
            let time = step as u32;
            let mut erased = indexed_zset_tuples(tuples);
            let batch = OrdValBatch::dyn_from_tuples(&factories, time, &mut erased.clone());
            let ref_batch =
                TestBatch::dyn_from_tuples(&TestBatchFactories::new(), time, &mut erased);

            assert_batch_eq(&batch, &ref_batch);
            assert_batch_cursors_eq(batch.cursor(), &ref_batch, seed);

            TOKIO.block_on(ref_trace.insert(ref_batch));
            assert_batch_cursors_eq(
                CursorPair::new(&mut trace.cursor(), &mut batch.cursor()),
                &ref_trace,
                seed,
            );

            TOKIO.block_on(trace.insert(batch));
            assert_trace_eq(&trace, &ref_trace);
            assert_batch_cursors_eq(trace.cursor(), &ref_trace, seed);
        }
    })
    .unwrap()
    .join()
    .unwrap();
}
// Installs an "even keys only" filter on an `OrdValBatch` spine and on the reference
// trace before any batch is inserted, then checks that both stay in agreement while the
// timestamped batches are added one by one. The generated key/value bounds are ignored.
#[test]
fn test_val_batch_spine_retain_even_keys(batches in kvr_batches(100, 5, 2, 300, 10), seed in 0..u64::MAX) {
    let config = CircuitConfig::with_workers(1);
    Runtime::run(config, move |_parker| {
        let factories =
            <OrdValBatchFactories<DynI32, DynI32, u32, DynZWeight>>::new::<i32, i32, ZWeight>();
        let mut trace: Spine<OrdValBatch<DynI32, DynI32, u32, DynZWeight>> =
            Spine::new(&factories);
        let mut ref_trace: TestBatch<DynI32, DynI32, u32, DynZWeight> =
            TestBatch::new(&TestBatchFactories::new());

        // The same predicate goes to both the tested and the reference trace.
        trace.retain_keys(Filter::new(Box::new(move |key| {
            *key.downcast_checked::<i32>() % 2 == 0
        })));
        ref_trace.retain_keys(Filter::new(Box::new(move |key| {
            *key.downcast_checked::<i32>() % 2 == 0
        })));

        // The batch index doubles as the batch timestamp.
        for (step, (tuples, _key_bound, _val_bound)) in batches.into_iter().enumerate() {
            let time = step as u32;
            let mut erased = indexed_zset_tuples(tuples);
            let batch = OrdValBatch::dyn_from_tuples(&factories, time, &mut erased.clone());
            let ref_batch =
                TestBatch::dyn_from_tuples(&TestBatchFactories::new(), time, &mut erased);

            assert_batch_eq(&batch, &ref_batch);
            assert_batch_cursors_eq(batch.cursor(), &ref_batch, seed);

            TOKIO.block_on(ref_trace.insert(ref_batch));
            assert_batch_cursors_eq(
                CursorPair::new(&mut trace.cursor(), &mut batch.cursor()),
                &ref_trace,
                seed,
            );

            TOKIO.block_on(trace.insert(batch));
            assert_trace_eq(&trace, &ref_trace);
            assert_batch_cursors_eq(trace.cursor(), &ref_trace, seed);
        }
    })
    .unwrap()
    .join()
    .unwrap();
}
}
/// Runs `f` via `run_in_circuit_with_storage_config`, using a configuration built by
/// `mkconfig` from a freshly created temporary directory.
///
/// The temporary directory handle stays bound until this function returns.
///
/// # Panics
///
/// Panics if the temporary directory cannot be created.
fn run_in_circuit_with_storage<F>(f: F)
where
    F: FnOnce() + Clone + Send + 'static,
{
    let storage_dir = tempdir().expect("Can't create temp dir for storage");
    let config = mkconfig(storage_dir.path());
    run_in_circuit_with_storage_config(config, f);
}
/// Runs `f` inside the circuit constructor passed to `Runtime::init_circuit` with the
/// given `config`, and asserts that the constructor was invoked exactly once.
///
/// # Panics
///
/// Panics if `Runtime::init_circuit` returns an error or if the constructor ran a number
/// of times other than one.
fn run_in_circuit_with_storage_config<F>(config: CircuitConfig, f: F)
where
    F: FnOnce() + Clone + Send + 'static,
{
    // Shared counter: one handle is moved into the constructor, one is kept for the check.
    let invocations = Arc::new(AtomicUsize::new(0));
    let constructor_invocations = invocations.clone();

    Runtime::init_circuit(config, move |_| {
        constructor_invocations.fetch_add(1, Ordering::Relaxed);
        f();
        Ok(())
    })
    .unwrap();

    assert_eq!(invocations.load(Ordering::Relaxed), 1);
}
/// Adds up the `count` field of every entry in the two-level table wrapped by
/// `stats` and returns the grand total.
fn total_cache_accesses(stats: CacheStats) -> u64 {
    let mut total: u64 = 0;
    // Outer level: one group of access counters per entry of `stats.0`.
    for (_, accesses) in stats.0.iter() {
        // Inner level: one counter record per entry of the group.
        for (_, counts) in accesses.iter() {
            total += counts.count;
        }
    }
    total
}
/// Builds a `FileWSet` with `u32` keys, inserting each element of `keys` with
/// weight 1 in slice order.
///
/// NOTE(review): the builder presumably expects keys in ascending order;
/// every caller in this file passes a sorted slice — confirm.
fn build_file_wset_u32(keys: &[u32]) -> FileWSet<DynData, DynZWeight> {
    let factories = <FileWSetFactories<DynData, DynZWeight>>::new::<u32, (), ZWeight>();
    let mut builder =
        <FileWSet<DynData, DynZWeight> as Batch>::Builder::with_capacity(&factories, keys.len(), 0);

    // Every key carries the same weight.
    let unit_weight: ZWeight = 1;
    for key in keys {
        // The (time, weight) pair is pushed before the key it belongs to.
        builder.push_time_diff(&(), unit_weight.erase());
        builder.push_key(key.erase());
    }

    builder.done()
}
/// Builds a `FileWSet` whose keys are `Tup1<i32>` values, wrapping each
/// element of `keys` and inserting it with weight 1 in slice order.
///
/// NOTE(review): the builder presumably expects keys in ascending order;
/// every caller in this file passes a sorted slice — confirm.
fn build_file_wset_tup1_i32(keys: &[i32]) -> FileWSet<DynData, DynZWeight> {
    let factories = <FileWSetFactories<DynData, DynZWeight>>::new::<Tup1<i32>, (), ZWeight>();
    let mut builder =
        <FileWSet<DynData, DynZWeight> as Batch>::Builder::with_capacity(&factories, keys.len(), 0);

    // Every key carries the same weight.
    let unit_weight: ZWeight = 1;
    for &raw_key in keys {
        // The (time, weight) pair is pushed before the key it belongs to.
        builder.push_time_diff(&(), unit_weight.erase());
        let wrapped_key = Tup1(raw_key);
        builder.push_key(wrapped_key.erase());
    }

    builder.done()
}
fn build_fallback_wset_i32(keys: &[i32]) -> crate::trace::FallbackWSet<DynI32, DynZWeight> {
let factories =
<crate::trace::FallbackWSetFactories<DynI32, DynZWeight>>::new::<i32, (), ZWeight>();
let mut erased_tuples = zset_tuples(keys.iter().copied().map(|key| Tup2(key, 1)).collect());
crate::trace::FallbackWSet::<DynI32, DynZWeight>::dyn_from_tuples(
&factories,
(),
&mut erased_tuples,
)
}
/// With `dev_tweaks.enable_roaring` set, an exact seek for a `u32` key that is
/// not in a `FileWSet` must return `false` without changing the total cache
/// access count, while a seek for a key that is present must return `true`.
#[test]
fn test_file_wset_roaring_u32_seek_key_exact_skips_absent_reads() {
    let storage_dir = tempdir().expect("Can't create temp dir for storage");
    let config = {
        let mut config = mkconfig(storage_dir.path());
        config.dev_tweaks.enable_roaring = Some(true);
        config
    };

    run_in_circuit_with_storage_config(config, move || {
        let batch = build_file_wset_u32(&[1, 3, 7]);
        let mut cursor = batch.cursor();
        // Baseline is sampled after the cursor is created, so the comparison
        // below only covers what happens during the seek.
        let baseline_accesses = total_cache_accesses(batch.cache_stats());

        // Absent key: rejected, and the access total is unchanged.
        let missing = 2u32;
        assert!(!cursor.seek_key_exact(missing.erase(), None));
        assert_eq!(total_cache_accesses(batch.cache_stats()), baseline_accesses);

        // Present key: found.
        let present = 3u32;
        assert!(cursor.seek_key_exact(present.erase(), None));
    });
}
/// Same check as the `u32` variant, but with `Tup1<i32>` keys (including a
/// negative one): with `dev_tweaks.enable_roaring` set, an exact seek for an
/// absent key must return `false` without changing the total cache access
/// count, and a seek for a present key must return `true`.
#[test]
fn test_file_wset_tup1_i32_roaring_seek_key_exact_skips_absent_reads() {
    let storage_dir = tempdir().expect("Can't create temp dir for storage");
    let config = {
        let mut config = mkconfig(storage_dir.path());
        config.dev_tweaks.enable_roaring = Some(true);
        config
    };

    run_in_circuit_with_storage_config(config, move || {
        let batch = build_file_wset_tup1_i32(&[-7, 1, 3]);
        let mut cursor = batch.cursor();
        // Baseline is sampled after the cursor is created, so the comparison
        // below only covers what happens during the seek.
        let baseline_accesses = total_cache_accesses(batch.cache_stats());

        // Absent key: rejected, and the access total is unchanged.
        let missing = Tup1(2i32);
        assert!(!cursor.seek_key_exact(missing.erase(), None));
        assert_eq!(total_cache_accesses(batch.cache_stats()), baseline_accesses);

        // Present key: found.
        let present = Tup1(3i32);
        assert!(cursor.seek_key_exact(present.erase(), None));
    });
}
/// With `dev_tweaks.enable_roaring` set, the batch produced by `add_by_ref` on
/// two `FileWSet`s must reject an exact seek for an absent key without
/// changing the total cache access count, and must find a key that came from
/// one of the inputs.
#[test]
fn test_file_wset_roaring_filter_rebuilt_after_merge() {
    let storage_dir = tempdir().expect("Can't create temp dir for storage");
    let config = {
        let mut config = mkconfig(storage_dir.path());
        config.dev_tweaks.enable_roaring = Some(true);
        config
    };

    run_in_circuit_with_storage_config(config, move || {
        // Two inputs with interleaved keys; 4 is in neither of them.
        let lhs = build_file_wset_u32(&[1, 5]);
        let rhs = build_file_wset_u32(&[3, 7]);
        let merged = lhs.add_by_ref(&rhs);

        let mut cursor = merged.cursor();
        // Baseline is sampled after the cursor is created, so the comparison
        // below only covers what happens during the seek.
        let baseline_accesses = total_cache_accesses(merged.cache_stats());

        // Absent key: rejected, and the access total is unchanged.
        let missing = 4u32;
        assert!(!cursor.seek_key_exact(missing.erase(), None));
        assert_eq!(total_cache_accesses(merged.cache_stats()), baseline_accesses);

        // Present key: found.
        let present = 7u32;
        assert!(cursor.seek_key_exact(present.erase(), None));
    });
}
/// With `dev_tweaks.enable_roaring` set and `min_storage_bytes` forced to 0,
/// merging two `FallbackWSet`s through `ListMerger` with a builder targeting
/// `BatchLocation::Storage` must yield a batch that reports a roaring
/// membership filter and rejects an exact seek for an absent key without
/// changing the total cache access count.
#[test]
fn test_fallback_wset_roaring_filter_rebuilt_after_storage_merge() {
    use crate::trace::{FallbackWSet, FallbackWSetFactories};
    type WSet = FallbackWSet<DynI32, DynZWeight>;

    let storage_dir = tempdir().expect("Can't create temp dir for storage");
    let config = {
        let mut config = mkconfig(storage_dir.path());
        config.dev_tweaks.enable_roaring = Some(true);
        config.storage.as_mut().unwrap().options.min_storage_bytes = Some(0);
        config
    };

    run_in_circuit_with_storage_config(config, move || {
        // Two inputs with interleaved keys; 4 is in neither of them.
        let lhs = build_fallback_wset_i32(&[1, 5]);
        let rhs = build_fallback_wset_i32(&[3, 7]);
        let factories = <FallbackWSetFactories<DynI32, DynZWeight>>::new::<i32, (), ZWeight>();

        // Merge output is requested to be placed in storage.
        let builder = <WSet as Batch>::Builder::for_merge(
            &factories,
            [&lhs, &rhs],
            Some(BatchLocation::Storage),
        );
        let cursors = vec![lhs.merge_cursor(None, None), rhs.merge_cursor(None, None)];
        let merged: WSet = ListMerger::merge(&factories, builder, cursors);

        assert_eq!(merged.membership_filter_kind(), FilterKind::Roaring);

        let mut cursor = merged.cursor();
        // Baseline is sampled after the cursor is created, so the comparison
        // below only covers what happens during the seek.
        let baseline_accesses = total_cache_accesses(merged.cache_stats());

        // Absent key: rejected, and the access total is unchanged.
        let missing = 4i32;
        assert!(!cursor.seek_key_exact(missing.erase(), None));
        assert_eq!(total_cache_accesses(merged.cache_stats()), baseline_accesses);
    });
}
// Builder property tests: each case feeds randomly generated tuples to the
// shared builder checks (`test_val_batch_trace_builder` /
// `test_key_batch_builder`) for one concrete batch type. The `File*` variants
// run inside a circuit with storage configured.
proptest! {
    #![proptest_config(ProptestConfig::with_cases(1000))]

    #[test]
    fn test_vec_val_batch_builder(tuples in kvtr_batch(5, 5, 5, 5, 20), seed in 0..u64::MAX) {
        type ValBatch = OrdValBatch<DynI32, DynI32, u32, DynZWeight>;
        let factories = <OrdValBatchFactories<DynI32, DynI32, u32, DynZWeight>>::new::<i32, i32, ZWeight>();
        test_val_batch_trace_builder::<ValBatch>(&factories, tuples, seed);
    }

    #[test]
    fn test_file_val_batch_builder(tuples in kvtr_batch(5, 5, 5, 5, 20), seed in 0..u64::MAX) {
        run_in_circuit_with_storage(move || {
            type ValBatch = FileValBatch<DynI32, DynI32, u32, DynZWeight>;
            let factories = <FileValBatchFactories<DynI32, DynI32, u32, DynZWeight>>::new::<i32, i32, ZWeight>();
            test_val_batch_trace_builder::<ValBatch>(&factories, tuples, seed);
        });
    }

    #[test]
    fn test_vec_key_batch_builder(tuples in ktr_batch(5, 5, 5, 20), seed in 0..u64::MAX) {
        type KeyBatch = OrdKeyBatch<DynI32, u32, DynZWeight>;
        let factories = <OrdKeyBatchFactories<DynI32, u32, DynZWeight>>::new::<i32, (), ZWeight>();
        test_key_batch_builder::<KeyBatch>(&factories, tuples, seed);
    }

    #[test]
    fn test_file_key_batch_builder(tuples in ktr_batch(5, 5, 5, 20), seed in 0..u64::MAX) {
        run_in_circuit_with_storage(move || {
            type KeyBatch = FileKeyBatch<DynI32, u32, DynZWeight>;
            let factories = <FileKeyBatchFactories<DynI32, u32, DynZWeight>>::new::<i32, (), ZWeight>();
            test_key_batch_builder::<KeyBatch>(&factories, tuples, seed);
        });
    }
}