#![allow(clippy::type_complexity)]
use std::cmp::Ordering;
use crate::{
DBData, DynZWeight, RootCircuit, Runtime, ZWeight,
circuit::CircuitConfig,
dynamic::{DowncastTrait, DynData, DynPair},
indexed_zset,
operator::{CmpFunc, IndexedZSetHandle, OutputHandle},
trace::{
Cursor, SpineSnapshot as DynSpineSnapshot, Trace,
test::test_batch::{TestBatch, TestBatchFactories, assert_batch_eq, assert_typed_batch_eq},
},
typed_batch::{
BatchReader, DynBatchReader, DynOrdIndexedZSet, OrdIndexedZSet, SpineSnapshot, TypedBatch,
},
utils::{Tup2, Tup3, Tup4},
};
use anyhow::Result as AnyResult;
use proptest::{collection::vec, prelude::*};
fn input_trace(
max_key: i32,
max_val: i32,
max_batch_size: usize,
max_batches: usize,
) -> impl Strategy<Value = Vec<Vec<(i32, i32, ZWeight)>>> {
vec(
vec((0..max_key, 0..max_val, -1..2i64), 0..max_batch_size),
0..max_batches,
)
}
impl TestBatch<DynData, DynData, (), DynZWeight> {
fn topk_asc<K, V>(&self, k: usize) -> Self
where
K: DBData,
V: DBData,
{
let mut result: Vec<((K, V, ()), ZWeight)> = Vec::new();
let mut cursor = self.cursor();
while cursor.key_valid() {
let mut count = 0;
while cursor.val_valid() && count < k {
let w = *cursor.weight().downcast_checked::<ZWeight>();
result.push((
(
cursor.key().downcast_checked::<K>().clone(),
cursor.val().downcast_checked::<V>().clone(),
(),
),
w,
));
count += 1;
cursor.step_val();
}
cursor.step_key();
}
TestBatch::from_typed_data(result.as_slice())
}
fn topk_desc<K, V>(&self, k: usize) -> Self
where
K: DBData,
V: DBData,
{
let mut result: Vec<((K, V, ()), ZWeight)> = Vec::new();
let mut cursor = self.cursor();
while cursor.key_valid() {
let mut count = 0;
cursor.fast_forward_vals();
while cursor.val_valid() && count < k {
let w = *cursor.weight().downcast_checked::<ZWeight>();
result.push((
(
cursor.key().downcast_checked::<K>().clone(),
cursor.val().downcast_checked::<V>().clone(),
(),
),
w,
));
count += 1;
cursor.step_val_reverse();
}
cursor.step_key();
}
TestBatch::from_typed_data(result.as_slice())
}
fn lag<K, V>(
&self,
lag: usize,
) -> TestBatch<DynData, DynPair<DynData, DynData> , (), DynZWeight>
where
K: DBData,
V: DBData,
{
let mut result = Vec::new();
let mut cursor = self.cursor();
while cursor.key_valid() {
let mut vals = Vec::new();
while cursor.val_valid() {
let w = *cursor.weight().downcast_checked::<ZWeight>();
for _i in 0..w {
vals.push(cursor.val().downcast_checked::<V>().clone());
}
cursor.step_val();
}
for i in 0..vals.len() {
let v = vals[i].clone();
let old_v = if i >= lag {
Some(vals[i - lag].clone())
} else {
None
};
result.push((
(
cursor.key().downcast_checked::<K>().clone(),
Tup2(v, old_v),
(),
),
1,
));
}
cursor.step_key();
}
TestBatch::from_typed_data(result.as_slice())
}
fn lead<K, V>(
&self,
lag: usize,
) -> TestBatch<DynData, DynPair<DynData, DynData> , (), DynZWeight>
where
K: DBData,
V: DBData,
{
let mut result = Vec::new();
let mut cursor = self.cursor();
while cursor.key_valid() {
let mut vals = Vec::new();
while cursor.val_valid() {
let w = *cursor.weight().downcast_checked::<ZWeight>();
for _i in 0..w {
vals.push(cursor.val().downcast_checked::<V>().clone());
}
cursor.step_val();
}
for i in 0..vals.len() {
let v = vals[i].clone();
let old_v = if vals.len() - i > lag {
Some(vals[i + lag].clone())
} else {
None
};
result.push((
(
cursor.key().downcast_checked::<K>().clone(),
Tup2(v, old_v),
(),
),
1,
));
}
cursor.step_key();
}
TestBatch::from_typed_data(result.as_slice())
}
}
fn topk_test_circuit(
circuit: &mut RootCircuit,
) -> AnyResult<(
IndexedZSetHandle<i32, i32>,
OutputHandle<SpineSnapshot<OrdIndexedZSet<i32, i32>>>,
OutputHandle<SpineSnapshot<OrdIndexedZSet<i32, i32>>>,
)> {
let (input_stream, input_handle) = circuit.add_input_indexed_zset::<i32, i32>();
let topk_asc_handle = input_stream
.topk_asc(5)
.accumulate_integrate()
.accumulate_output();
let topk_desc_handle = input_stream
.topk_desc(5)
.accumulate_integrate()
.accumulate_output();
Ok((input_handle, topk_asc_handle, topk_desc_handle))
}
fn topk_custom_ord_test_circuit(
circuit: &mut RootCircuit,
) -> AnyResult<(
IndexedZSetHandle<i32, Tup3<String, i32, i32>>,
OutputHandle<OrdIndexedZSet<i32, Tup3<String, i32, i32>>>,
OutputHandle<OrdIndexedZSet<i32, Tup4<i64, String, i32, i32>>>,
OutputHandle<OrdIndexedZSet<i32, Tup4<i64, String, i32, i32>>>,
OutputHandle<OrdIndexedZSet<i32, Tup4<i64, String, i32, i32>>>,
)> {
let (input_stream, input_handle) =
circuit.add_input_indexed_zset::<i32, Tup3<String, i32, i32>>();
struct AscDesc;
impl CmpFunc<Tup3<String, i32, i32>> for AscDesc {
fn cmp(
left: &Tup3<String, i32, i32>,
right: &Tup3<String, i32, i32>,
) -> std::cmp::Ordering {
let ord = left.1.cmp(&right.1);
if ord == Ordering::Equal {
let ord = right.2.cmp(&left.2);
if ord == Ordering::Equal {
left.0.cmp(&right.0)
} else {
ord
}
} else {
ord
}
}
}
let topk_handle = input_stream
.topk_custom_order::<AscDesc>(5)
.integrate()
.output();
let topk_rank_handle = input_stream
.topk_rank_custom_order::<AscDesc, _, _, _>(
5,
|Tup3(_, x1, y1), Tup3(_, x2, y2)| x1 == x2 && y1 == y2,
|rank, Tup3(s, x, y)| Tup4(rank, s.clone(), *x, *y),
)
.integrate()
.output();
let topk_dense_rank_handle = input_stream
.topk_dense_rank_custom_order::<AscDesc, _, _, _>(
5,
|Tup3(_, x1, y1), Tup3(_, x2, y2)| x1 == x2 && y1 == y2,
|rank, Tup3(s, x, y)| Tup4(rank, s.clone(), *x, *y),
)
.integrate()
.output();
let topk_row_number_handle = input_stream
.topk_row_number_custom_order::<AscDesc, _, _>(5, |rank, Tup3(s, x, y)| {
Tup4(rank, s.clone(), *x, *y)
})
.integrate()
.output();
Ok((
input_handle,
topk_handle,
topk_rank_handle,
topk_dense_rank_handle,
topk_row_number_handle,
))
}
fn lag_test_circuit(
circuit: &mut RootCircuit,
) -> AnyResult<(
IndexedZSetHandle<i32, i32>,
OutputHandle<
TypedBatch<
i32,
Tup2<i32, Option<i32>>,
ZWeight,
DynSpineSnapshot<
DynOrdIndexedZSet<
DynData,
DynPair<DynData , DynData >,
>,
>,
>,
>,
)> {
let (input_stream, input_handle) = circuit.add_input_indexed_zset::<i32, i32>();
let lag_handle = input_stream
.lag(3, |v| v.cloned())
.accumulate_integrate()
.accumulate_output();
Ok((input_handle, lag_handle))
}
fn lead_test_circuit(
circuit: &mut RootCircuit,
) -> AnyResult<(
IndexedZSetHandle<i32, i32>,
OutputHandle<
TypedBatch<
i32,
Tup2<i32, Option<i32>>,
ZWeight,
DynSpineSnapshot<
DynOrdIndexedZSet<
DynData,
DynPair<DynData , DynData >,
>,
>,
>,
>,
)> {
let (input_stream, input_handle) = circuit.add_input_indexed_zset::<i32, i32>();
let lead_handle = input_stream
.lag(-3, |v| v.cloned())
.accumulate_integrate()
.accumulate_output();
Ok((input_handle, lead_handle))
}
fn lag_custom_order_test_circuit(
circuit: &mut RootCircuit,
) -> AnyResult<(
IndexedZSetHandle<i32, Tup2<i32, String>>,
OutputHandle<SpineSnapshot<OrdIndexedZSet<i32, Tup3<i32, String, Option<Tup2<i32, String>>>>>>,
)> {
struct AscDesc;
impl CmpFunc<Tup2<i32, String>> for AscDesc {
fn cmp(left: &Tup2<i32, String>, right: &Tup2<i32, String>) -> std::cmp::Ordering {
let ord = left.0.cmp(&right.0);
if ord == Ordering::Equal {
right.1.cmp(&left.1)
} else {
ord
}
}
}
let (input_stream, input_handle) = circuit.add_input_indexed_zset::<i32, Tup2<i32, String>>();
let lag_handle = input_stream
.lag_custom_order::<_, _, _, AscDesc, _>(
3,
|v| v.cloned(),
|v, vl| Tup3(v.0, v.1.clone(), vl.clone()),
)
.accumulate_integrate()
.accumulate_output();
Ok((input_handle, lag_handle))
}
fn lead_test(trace: Vec<Vec<(i32, i32, ZWeight)>>, transaction: bool) {
let (mut dbsp, (input_handle, lead_handle)) = Runtime::init_circuit(
CircuitConfig::from(4).with_splitter_chunk_size_records(2),
lead_test_circuit,
)
.unwrap();
let mut ref_trace = TestBatch::new(&TestBatchFactories::new());
if transaction {
dbsp.start_transaction().unwrap();
for batch in trace.into_iter() {
let records = batch
.iter()
.map(|(k, v, r)| ((*k, *v, ()), *r))
.collect::<Vec<_>>();
let ref_batch = TestBatch::from_typed_data(&records);
ref_trace.insert(ref_batch);
for (k, v, r) in batch.into_iter() {
input_handle.push(k, (v, r));
}
dbsp.step().unwrap();
}
dbsp.commit_transaction().unwrap();
let lead_result = lead_handle.concat().consolidate();
let ref_lead = ref_trace.lead::<i32, i32>(3);
assert_batch_eq(lead_result.inner(), &ref_lead);
} else {
for batch in trace.into_iter() {
let records = batch
.iter()
.map(|(k, v, r)| ((*k, *v, ()), *r))
.collect::<Vec<_>>();
let ref_batch = TestBatch::from_typed_data(&records);
ref_trace.insert(ref_batch);
for (k, v, r) in batch.into_iter() {
input_handle.push(k, (v, r));
}
dbsp.transaction().unwrap();
let lead_result = lead_handle.concat().consolidate();
let ref_lead = ref_trace.lead::<i32, i32>(3);
assert_batch_eq(lead_result.inner(), &ref_lead);
}
}
}
fn lag_test(trace: Vec<Vec<(i32, i32, ZWeight)>>, transaction: bool) {
let (mut dbsp, (input_handle, lag_handle)) = Runtime::init_circuit(
CircuitConfig::from(4).with_splitter_chunk_size_records(2),
lag_test_circuit,
)
.unwrap();
let mut ref_trace = TestBatch::new(&TestBatchFactories::new());
if transaction {
dbsp.start_transaction().unwrap();
for batch in trace.into_iter() {
let records = batch
.iter()
.map(|(k, v, r)| ((*k, *v, ()), *r))
.collect::<Vec<_>>();
let ref_batch = TestBatch::from_typed_data(&records);
ref_trace.insert(ref_batch);
for (k, v, r) in batch.into_iter() {
input_handle.push(k, (v, r));
}
dbsp.step().unwrap();
}
dbsp.commit_transaction().unwrap();
let lag_result = lag_handle.concat().consolidate();
let ref_lag = ref_trace.lag::<i32, i32>(3);
assert_batch_eq(lag_result.inner(), &ref_lag);
} else {
for batch in trace.into_iter() {
let records = batch
.iter()
.map(|(k, v, r)| ((*k, *v, ()), *r))
.collect::<Vec<_>>();
let ref_batch = TestBatch::from_typed_data(&records);
ref_trace.insert(ref_batch);
for (k, v, r) in batch.into_iter() {
input_handle.push(k, (v, r));
}
dbsp.transaction().unwrap();
let lag_result = SpineSnapshot::<TypedBatch<i32, Tup2<i32, Option<i32>>, _, _>>::concat(
&lag_handle.take_from_all(),
);
let ref_lag = ref_trace.lag::<i32, i32>(3);
assert_batch_eq(lag_result.inner(), &ref_lag);
}
}
}
#[test]
fn test_lead_regressions() {
let trace = vec![vec![(0, 0, 1), (0, 73, -1), (0, 1, 1)], vec![(0, 0, 1)]];
lead_test(trace, false);
}
#[test]
fn test_lag_regressions() {
let traces = vec![
vec![vec![(2, 64, -1), (2, 0, 1)]],
vec![vec![(0, 0, -1)]],
vec![vec![(4, 0, 4), (4, 4, 1)]],
vec![vec![(4, 0, 4), (4, 69, 2)]],
vec![
vec![(0, 87, 1)],
vec![(0, 84, 1), (0, 87, 1), (0, 88, 1)],
vec![(0, 0, -1)],
],
];
for trace in traces {
lag_test(trace, false);
}
}
#[test]
fn test_topk_custom_ord() {
let (
mut dbsp,
(
input_handle,
topk_handle,
topk_rank_handle,
topk_dense_rank_handle,
topk_row_number_handle,
),
) = Runtime::init_circuit(4, topk_custom_ord_test_circuit).unwrap();
let trace = vec![
vec![
(1, Tup3("foo".to_string(), 10, 100), 1),
(1, Tup3("foo".to_string(), 9, 99), 1),
(1, Tup3("foo".to_string(), 8, 98), 1),
(1, Tup3("foo".to_string(), 10, 90), 1),
(1, Tup3("foo".to_string(), 9, 98), 1),
(1, Tup3("foo".to_string(), 8, 97), 1),
],
vec![
(1, Tup3("foo".to_string(), 10, 80), 1),
(1, Tup3("foo".to_string(), 9, 97), 1),
(1, Tup3("foo".to_string(), 8, 96), 1),
(1, Tup3("foo".to_string(), 10, 79), 1),
(1, Tup3("foo".to_string(), 9, 96), 1),
(1, Tup3("foo".to_string(), 8, 95), 1),
],
vec![
(1, Tup3("foo".to_string(), 9, 99), -1),
(1, Tup3("foo".to_string(), 8, 98), -1),
(1, Tup3("foo".to_string(), 9, 98), -1),
(1, Tup3("foo".to_string(), 8, 97), -1),
],
vec![(1, Tup3("bar".to_string(), 8, 96), 1)],
vec![(1, Tup3("foo".to_string(), 7, 96), 1)],
vec![
(1, Tup3("baz".to_string(), 8, 96), 1),
(1, Tup3("buzz".to_string(), 8, 96), 1),
(1, Tup3("foobar".to_string(), 8, 96), 1),
(1, Tup3("fubar".to_string(), 8, 96), 1),
],
vec![
(1, Tup3("foo".to_string(), 7, 96), 1),
(1, Tup3("baz".to_string(), 8, 96), 1),
],
];
let mut expected_output = vec![indexed_zset! {
1 => {Tup3("foo".to_string(), 8, 98) => 1, Tup3("foo".to_string(), 8, 97) => 1, Tup3("foo".to_string(), 9, 99) => 1, Tup3("foo".to_string(), 9, 98) => 1, Tup3("foo".to_string(), 10, 100) => 1},
},
indexed_zset! {
1 => {Tup3("foo".to_string(), 8, 98) => 1, Tup3("foo".to_string(), 8, 97) => 1, Tup3("foo".to_string(), 8, 96) => 1, Tup3("foo".to_string(), 8, 95) => 1, Tup3("foo".to_string(), 9, 99) => 1},
},
indexed_zset! {
1 => {Tup3("foo".to_string(), 8, 96) => 1, Tup3("foo".to_string(), 8, 95) => 1, Tup3("foo".to_string(), 9, 97) => 1, Tup3("foo".to_string(), 9, 96) => 1, Tup3("foo".to_string(), 10, 100) => 1},
},
indexed_zset! {
1 => {Tup3("bar".to_string(), 8, 96) => 1, Tup3("foo".to_string(), 8, 96) => 1, Tup3("foo".to_string(), 8, 95) => 1, Tup3("foo".to_string(), 9, 97) => 1, Tup3("foo".to_string(), 9, 96) => 1}
},
indexed_zset! {
1 => {Tup3("foo".to_string(), 7, 96) => 1, Tup3("bar".to_string(), 8, 96) => 1, Tup3("foo".to_string(), 8, 96) => 1, Tup3("foo".to_string(), 8, 95) => 1, Tup3("foo".to_string(), 9, 97) => 1},
},
indexed_zset! {
1 => {Tup3("foo".to_string(), 7, 96) => 1, Tup3("bar".to_string(), 8, 96) => 1, Tup3("baz".to_string(), 8, 96) => 1, Tup3("buzz".to_string(), 8, 96) => 1, Tup3("foo".to_string(), 8, 96) => 1},
},
indexed_zset! {
1 => {Tup3("foo".to_string(), 7, 96) => 2, Tup3("bar".to_string(), 8, 96) => 1, Tup3("baz".to_string(), 8, 96) => 2, Tup3("buzz".to_string(), 8, 96) => 1, Tup3("foo".to_string(), 8, 96) => 1},
}]
.into_iter();
let mut expected_ranked_output = vec![indexed_zset! {
1 => {Tup4(1, "foo".to_string(), 8, 98) => 1, Tup4(2, "foo".to_string(), 8, 97) => 1, Tup4(3, "foo".to_string(), 9, 99) => 1, Tup4(4, "foo".to_string(), 9, 98) => 1, Tup4(5, "foo".to_string(), 10, 100) => 1},
},
indexed_zset! {
1 => {Tup4(1, "foo".to_string(), 8, 98) => 1, Tup4(2, "foo".to_string(), 8, 97) => 1, Tup4(3, "foo".to_string(), 8, 96) => 1, Tup4(4, "foo".to_string(), 8, 95) => 1, Tup4(5, "foo".to_string(), 9, 99) => 1},
},
indexed_zset! {
1 => {Tup4(1, "foo".to_string(), 8, 96) => 1, Tup4(2, "foo".to_string(), 8, 95) => 1, Tup4(3, "foo".to_string(), 9, 97) => 1, Tup4(4, "foo".to_string(), 9, 96) => 1, Tup4(5, "foo".to_string(), 10, 100) => 1},
},
indexed_zset! {
1 => {Tup4(1, "bar".to_string(), 8, 96) => 1, Tup4(1, "foo".to_string(), 8, 96) => 1, Tup4(3, "foo".to_string(), 8, 95) => 1, Tup4(4, "foo".to_string(), 9, 97) => 1, Tup4(5, "foo".to_string(), 9, 96) => 1},
},
indexed_zset! {
1 => {Tup4(1, "foo".to_string(), 7, 96) => 1, Tup4(2, "bar".to_string(), 8, 96) => 1, Tup4(2, "foo".to_string(), 8, 96) => 1, Tup4(4, "foo".to_string(), 8, 95) => 1, Tup4(5, "foo".to_string(), 9, 97) => 1},
},
indexed_zset! {
1 => {Tup4(1, "foo".to_string(), 7, 96) => 1, Tup4(2, "bar".to_string(), 8, 96) => 1, Tup4(2, "baz".to_string(), 8, 96) => 1, Tup4(2, "buzz".to_string(), 8, 96) => 1, Tup4(2, "foo".to_string(), 8, 96) => 1, Tup4(2, "foobar".to_string(), 8, 96) => 1, Tup4(2, "fubar".to_string(), 8, 96) => 1},
},
indexed_zset! {
1 => {Tup4(1, "foo".to_string(), 7, 96) => 2, Tup4(3, "bar".to_string(), 8, 96) => 1, Tup4(3, "baz".to_string(), 8, 96) => 2, Tup4(3, "buzz".to_string(), 8, 96) => 1, Tup4(3, "foo".to_string(), 8, 96) => 1, Tup4(3, "foobar".to_string(), 8, 96) => 1, Tup4(3, "fubar".to_string(), 8, 96) => 1},
}]
.into_iter();
let mut expected_dense_ranked_output = vec![indexed_zset! {
1 => {Tup4(1, "foo".to_string(), 8, 98) => 1, Tup4(2, "foo".to_string(), 8, 97) => 1, Tup4(3, "foo".to_string(), 9, 99) => 1, Tup4(4, "foo".to_string(), 9, 98) => 1, Tup4(5, "foo".to_string(), 10, 100) => 1},
},
indexed_zset! {
1 => {Tup4(1, "foo".to_string(), 8, 98) => 1, Tup4(2, "foo".to_string(), 8, 97) => 1, Tup4(3, "foo".to_string(), 8, 96) => 1, Tup4(4, "foo".to_string(), 8, 95) => 1, Tup4(5, "foo".to_string(), 9, 99) => 1},
},
indexed_zset! {
1 => {Tup4(1, "foo".to_string(), 8, 96) => 1, Tup4(2, "foo".to_string(), 8, 95) => 1, Tup4(3, "foo".to_string(), 9, 97) => 1, Tup4(4, "foo".to_string(), 9, 96) => 1, Tup4(5, "foo".to_string(), 10, 100) => 1},
},
indexed_zset! {
1 => {Tup4(1, "bar".to_string(), 8, 96) => 1, Tup4(1, "foo".to_string(), 8, 96) => 1, Tup4(2, "foo".to_string(), 8, 95) => 1, Tup4(3, "foo".to_string(), 9, 97) => 1, Tup4(4, "foo".to_string(), 9, 96) => 1, Tup4(5, "foo".to_string(), 10, 100) => 1},
},
indexed_zset! {
1 => {Tup4(1, "foo".to_string(), 7, 96) => 1, Tup4(2, "bar".to_string(), 8, 96) => 1, Tup4(2, "foo".to_string(), 8, 96) => 1, Tup4(3, "foo".to_string(), 8, 95) => 1, Tup4(4, "foo".to_string(), 9, 97) => 1, Tup4(5, "foo".to_string(), 9, 96) => 1},
},
indexed_zset! {
1 => {Tup4(1, "foo".to_string(), 7, 96) => 1, Tup4(2, "bar".to_string(), 8, 96) => 1, Tup4(2, "baz".to_string(), 8, 96) => 1, Tup4(2, "buzz".to_string(), 8, 96) => 1, Tup4(2, "foo".to_string(), 8, 96) => 1, Tup4(2, "foobar".to_string(), 8, 96) => 1, Tup4(2, "fubar".to_string(), 8, 96) => 1, Tup4(3, "foo".to_string(), 8, 95) => 1, Tup4(4, "foo".to_string(), 9, 97) => 1, Tup4(5, "foo".to_string(), 9, 96) => 1},
},
indexed_zset! {
1 => {Tup4(1, "foo".to_string(), 7, 96) => 2, Tup4(2, "bar".to_string(), 8, 96) => 1, Tup4(2, "baz".to_string(), 8, 96) => 2, Tup4(2, "buzz".to_string(), 8, 96) => 1, Tup4(2, "foo".to_string(), 8, 96) => 1, Tup4(2, "foobar".to_string(), 8, 96) => 1, Tup4(2, "fubar".to_string(), 8, 96) => 1, Tup4(3, "foo".to_string(), 8, 95) => 1, Tup4(4, "foo".to_string(), 9, 97) => 1, Tup4(5, "foo".to_string(), 9, 96) => 1},
}]
.into_iter();
let mut expected_row_number_output = vec![indexed_zset! {
1 => {Tup4(1, "foo".to_string(), 8, 98) => 1, Tup4(2, "foo".to_string(), 8, 97) => 1, Tup4(3, "foo".to_string(), 9, 99) => 1, Tup4(4, "foo".to_string(), 9, 98) => 1, Tup4(5, "foo".to_string(), 10, 100) => 1},
},
indexed_zset! {
1 => {Tup4(1, "foo".to_string(), 8, 98) => 1, Tup4(2, "foo".to_string(), 8, 97) => 1, Tup4(3, "foo".to_string(), 8, 96) => 1, Tup4(4, "foo".to_string(), 8, 95) => 1, Tup4(5, "foo".to_string(), 9, 99) => 1},
},
indexed_zset! {
1 => {Tup4(1, "foo".to_string(), 8, 96) => 1, Tup4(2, "foo".to_string(), 8, 95) => 1, Tup4(3, "foo".to_string(), 9, 97) => 1, Tup4(4, "foo".to_string(), 9, 96) => 1, Tup4(5, "foo".to_string(), 10, 100) => 1},
},
indexed_zset! {
1 => {Tup4(1, "bar".to_string(), 8, 96) => 1, Tup4(2, "foo".to_string(), 8, 96) => 1, Tup4(3, "foo".to_string(), 8, 95) => 1, Tup4(4, "foo".to_string(), 9, 97) => 1, Tup4(5, "foo".to_string(), 9, 96) => 1},
},
indexed_zset! {
1 => {Tup4(1, "foo".to_string(), 7, 96) => 1, Tup4(2, "bar".to_string(), 8, 96) => 1, Tup4(3, "foo".to_string(), 8, 96) => 1, Tup4(4, "foo".to_string(), 8, 95) => 1, Tup4(5, "foo".to_string(), 9, 97) => 1},
},
indexed_zset! {
1 => {Tup4(1, "foo".to_string(), 7, 96) => 1, Tup4(2, "bar".to_string(), 8, 96) => 1, Tup4(3, "baz".to_string(), 8, 96) => 1, Tup4(4, "buzz".to_string(), 8, 96) => 1, Tup4(5, "foo".to_string(), 8, 96) => 1},
},
indexed_zset! {
1 => {Tup4(1, "foo".to_string(), 7, 96) => 1, Tup4(2, "foo".to_string(), 7, 96) => 1, Tup4(3, "bar".to_string(), 8, 96) => 1, Tup4(4, "baz".to_string(), 8, 96) => 1, Tup4(5, "baz".to_string(), 8, 96) => 1},
}]
.into_iter();
for batch in trace.into_iter() {
for (k, v, r) in batch.into_iter() {
input_handle.push(k, (v, r));
}
dbsp.transaction().unwrap();
let topk_result = topk_handle.consolidate();
assert_typed_batch_eq(&topk_result, &expected_output.next().unwrap());
let topk_rank_result = topk_rank_handle.consolidate();
assert_typed_batch_eq(&topk_rank_result, &expected_ranked_output.next().unwrap());
let topk_dense_rank_result = topk_dense_rank_handle.consolidate();
assert_typed_batch_eq(
&topk_dense_rank_result,
&expected_dense_ranked_output.next().unwrap(),
);
let topk_row_number_result = topk_row_number_handle.consolidate();
assert_typed_batch_eq(
&topk_row_number_result,
&expected_row_number_output.next().unwrap(),
);
}
}
#[test]
fn test_lag_custom_ord_small_step() {
test_lag_custom_ord(false)
}
#[test]
fn test_lag_custom_ord_big_step() {
test_lag_custom_ord(true)
}
fn test_lag_custom_ord(transaction: bool) {
let (mut dbsp, (input_handle, lag_handle)) = Runtime::init_circuit(
CircuitConfig::from(4).with_splitter_chunk_size_records(2),
lag_custom_order_test_circuit,
)
.unwrap();
let trace = [
vec![
(1, Tup2(1, "f".to_string()), 1),
(1, Tup2(1, "e".to_string()), 1),
(1, Tup2(1, "d".to_string()), 1),
(1, Tup2(1, "c".to_string()), 1),
(1, Tup2(1, "b".to_string()), 1),
(1, Tup2(1, "a".to_string()), 1),
(1, Tup2(2, "d".to_string()), 1),
(1, Tup2(2, "c".to_string()), 1),
(1, Tup2(2, "b".to_string()), 1),
(1, Tup2(2, "a".to_string()), 1),
],
vec![
(1, Tup2(2, "e".to_string()), 1),
(1, Tup2(2, "d".to_string()), 1),
(1, Tup2(3, "d".to_string()), 1),
(1, Tup2(3, "c".to_string()), 1),
(1, Tup2(3, "b".to_string()), 1),
(1, Tup2(3, "a".to_string()), 1),
],
vec![
(1, Tup2(1, "f".to_string()), -1),
(1, Tup2(1, "d".to_string()), -1),
(1, Tup2(1, "b".to_string()), -1),
(1, Tup2(2, "d".to_string()), -1),
(1, Tup2(2, "b".to_string()), -1),
],
];
let expected_output: Vec<OrdIndexedZSet<i32, Tup3<i32, String, Option<Tup2<i32, String>>>>> = vec![
indexed_zset! {
1i32 => { Tup3(1i32, "f".to_string(), None) => 1
, Tup3(1i32, "e".to_string(), None) => 1
, Tup3(1i32, "d".to_string(), None) => 1
, Tup3(1i32, "c".to_string(), Some(Tup2(1, "f".to_string()))) => 1
, Tup3(1i32, "b".to_string(), Some(Tup2(1, "e".to_string()))) => 1
, Tup3(1i32, "a".to_string(), Some(Tup2(1, "d".to_string()))) => 1
, Tup3(2i32, "d".to_string(), Some(Tup2(1, "c".to_string()))) => 1
, Tup3(2i32, "c".to_string(), Some(Tup2(1, "b".to_string()))) => 1
, Tup3(2i32, "b".to_string(), Some(Tup2(1, "a".to_string()))) => 1
, Tup3(2i32, "a".to_string(), Some(Tup2(2, "d".to_string()))) => 1
}
},
indexed_zset! {
1i32 => { Tup3(1i32, "f".to_string(), None) => 1
, Tup3(1i32, "e".to_string(), None) => 1
, Tup3(1i32, "d".to_string(), None) => 1
, Tup3(1i32, "c".to_string(), Some(Tup2(1, "f".to_string()))) => 1
, Tup3(1i32, "b".to_string(), Some(Tup2(1, "e".to_string()))) => 1
, Tup3(1i32, "a".to_string(), Some(Tup2(1, "d".to_string()))) => 1
, Tup3(2i32, "e".to_string(), Some(Tup2(1, "c".to_string()))) => 1
, Tup3(2i32, "d".to_string(), Some(Tup2(1, "b".to_string()))) => 1
, Tup3(2i32, "d".to_string(), Some(Tup2(1, "a".to_string()))) => 1
, Tup3(2i32, "c".to_string(), Some(Tup2(2, "e".to_string()))) => 1
, Tup3(2i32, "b".to_string(), Some(Tup2(2, "d".to_string()))) => 1
, Tup3(2i32, "a".to_string(), Some(Tup2(2, "d".to_string()))) => 1
, Tup3(3i32, "d".to_string(), Some(Tup2(2, "c".to_string()))) => 1
, Tup3(3i32, "c".to_string(), Some(Tup2(2, "b".to_string()))) => 1
, Tup3(3i32, "b".to_string(), Some(Tup2(2, "a".to_string()))) => 1
, Tup3(3i32, "a".to_string(), Some(Tup2(3, "d".to_string()))) => 1
}
},
indexed_zset! {
1i32 => { Tup3(1i32, "e".to_string(), None) => 1
, Tup3(1i32, "c".to_string(), None) => 1
, Tup3(1i32, "a".to_string(), None) => 1
, Tup3(2i32, "e".to_string(), Some(Tup2(1, "e".to_string()))) => 1
, Tup3(2i32, "d".to_string(), Some(Tup2(1, "c".to_string()))) => 1
, Tup3(2i32, "c".to_string(), Some(Tup2(1, "a".to_string()))) => 1
, Tup3(2i32, "a".to_string(), Some(Tup2(2, "e".to_string()))) => 1
, Tup3(3i32, "d".to_string(), Some(Tup2(2, "d".to_string()))) => 1
, Tup3(3i32, "c".to_string(), Some(Tup2(2, "c".to_string()))) => 1
, Tup3(3i32, "b".to_string(), Some(Tup2(2, "a".to_string()))) => 1
, Tup3(3i32, "a".to_string(), Some(Tup2(3, "d".to_string()))) => 1
}
},
];
if transaction {
dbsp.start_transaction().unwrap();
for i in trace {
println!("step");
let batch = i.clone();
for (k, v, r) in batch.into_iter() {
input_handle.push(k, (v, r));
}
dbsp.step().unwrap();
}
dbsp.commit_transaction().unwrap();
let topk_result = lag_handle.concat().consolidate();
assert_typed_batch_eq(&topk_result, &expected_output.last().unwrap().clone());
} else {
for i in 0..trace.len() {
println!("step");
let batch = trace[i].clone();
for (k, v, r) in batch.into_iter() {
input_handle.push(k, (v, r));
}
dbsp.transaction().unwrap();
let topk_result = lag_handle.concat().consolidate();
assert_typed_batch_eq(&topk_result, &expected_output[i]);
}
}
}
fn test_topk(trace: Vec<Vec<(i32, i32, ZWeight)>>, transaction: bool) {
let (mut dbsp, (input_handle, topk_asc_handle, topk_desc_handle)) = Runtime::init_circuit(
CircuitConfig::from(4).with_splitter_chunk_size_records(2),
topk_test_circuit,
)
.unwrap();
let mut ref_trace = TestBatch::new(&TestBatchFactories::new());
if transaction {
dbsp.start_transaction().unwrap();
for batch in trace.into_iter() {
let records = batch
.iter()
.map(|(k, v, r)| ((*k, *v, ()), *r))
.collect::<Vec<_>>();
let ref_batch = TestBatch::from_typed_data(&records);
ref_trace.insert(ref_batch);
for (k, v, r) in batch.into_iter() {
input_handle.push(k, (v, r));
}
dbsp.step().unwrap();
}
dbsp.commit_transaction().unwrap();
let topk_asc_result = topk_asc_handle.concat().consolidate();
let topk_desc_result = topk_desc_handle.concat().consolidate();
let ref_topk_asc = ref_trace.topk_asc::<i32, i32>(5);
let ref_topk_desc = ref_trace.topk_desc::<i32, i32>(5);
assert_batch_eq(topk_asc_result.inner(), &ref_topk_asc);
assert_batch_eq(topk_desc_result.inner(), &ref_topk_desc);
} else {
for batch in trace.into_iter() {
let records = batch
.iter()
.map(|(k, v, r)| ((*k, *v, ()), *r))
.collect::<Vec<_>>();
let ref_batch = TestBatch::from_typed_data(&records);
ref_trace.insert(ref_batch);
for (k, v, r) in batch.into_iter() {
input_handle.push(k, (v, r));
}
dbsp.transaction().unwrap();
let topk_asc_result = topk_asc_handle.concat().consolidate();
let topk_desc_result = topk_desc_handle.concat().consolidate();
let ref_topk_asc = ref_trace.topk_asc::<i32, i32>(5);
let ref_topk_desc = ref_trace.topk_desc::<i32, i32>(5);
assert_batch_eq(topk_asc_result.inner(), &ref_topk_asc);
assert_batch_eq(topk_desc_result.inner(), &ref_topk_desc);
}
}
}
proptest! {
#[test]
fn test_topk_small_step(trace in input_trace(5, 1_000, 200, 20)) {
test_topk(trace, false)
}
#[test]
fn test_topk_big_step(trace in input_trace(5, 1_000, 200, 20)) {
test_topk(trace, true)
}
#[test]
fn test_lag_small_step(trace in input_trace(5, 100, 200, 20)) {
lag_test(trace, false)
}
#[test]
fn test_lag_big_step(trace in input_trace(5, 100, 200, 20)) {
lag_test(trace, true)
}
#[test]
fn test_lead_small_step(trace in input_trace(5, 100, 200, 20)) {
lead_test(trace, false)
}
#[test]
fn test_lead_big_step(trace in input_trace(5, 100, 200, 20)) {
lead_test(trace, true)
}
}