use feldera_types::config::StorageConfig;
use crate::{
CmpFunc, DBData, IndexedZSetHandle, OrdIndexedZSet, OrdZSet, OutputHandle, RootCircuit,
Runtime, Stream, ZSetHandle, ZWeight,
circuit::dbsp_handle::CircuitStorageConfig,
default_hash, indexed_zset,
operator::{
Max, Min,
time_series::{RelOffset, RelRange},
},
typed_batch::SpineSnapshot,
utils::{Tup2, Tup3, Tup4, test::init_test_logger},
};
use std::{cmp::Ordering, fmt::Debug, marker::PhantomData, path::PathBuf, sync::Arc};
use super::{CircuitConfig, dbsp_handle::Mode};
const NUM_WORKERS: usize = 4;
/// Type-level description of one group of test inputs or outputs.
///
/// `test_replay` is generic over implementors of this trait: it calls
/// `push_inputs` to feed one chunk of records through the input handles and
/// `read_outputs` to collect whatever the output handles produced.
trait TestDataType {
/// Handle(s) through which input records are pushed into the circuit.
type InputHandles: Send + 'static;
/// Handle(s) from which circuit output is read.
type OutputHandles: Send + 'static;
/// Input records pushed before a single `transaction()` call.
type Chunk: Clone;
/// Value read from the outputs; it must be comparable and printable because
/// `test_replay` passes it to `assert_eq!` and `println!`.
type ZSet: Debug + PartialEq + Eq;
/// Pushes one chunk of input records through `handles`.
fn push_inputs(chunks: Self::Chunk, handles: &Self::InputHandles);
/// Reads the output currently available through `handles`.
fn read_outputs(handles: &Self::OutputHandles) -> Self::ZSet;
}
/// Generates a marker struct named `$data` with one `DBData` type parameter
/// per `index: Type` pair, and implements `TestDataType` for it.
///
/// The handles, chunks and outputs of the generated type are tuples with one
/// element per pair; `index` is the tuple field used to reach that element.
macro_rules! impl_test_data {
    ($data:ident, $($idx:tt: $ty:ident),*) => {
        struct $data<$($ty: DBData),*> {
            phantom: PhantomData<($($ty),*)>,
        }

        impl<$($ty: DBData),*> TestDataType for $data<$($ty),*> {
            type InputHandles = ($(ZSetHandle<$ty>),*);
            type OutputHandles = ($(OutputHandle<SpineSnapshot<OrdZSet<$ty>>>),*);
            type Chunk = ($(Vec<Tup2<$ty, ZWeight>>),*);
            type ZSet = ($(OrdZSet<$ty>),*);

            // Appends every element of the chunk tuple to the matching handle.
            fn push_inputs(chunks: Self::Chunk, handles: &Self::InputHandles) {
                let mut pending = chunks;
                $(
                    handles.$idx.append(&mut pending.$idx);
                )*
            }

            // Concatenates and consolidates the snapshots taken from each
            // output handle, producing one Z-set per tuple element.
            fn read_outputs(handles: &Self::OutputHandles) -> Self::ZSet {
                ($({
                    let snapshots = handles.$idx.take_from_all();
                    SpineSnapshot::<OrdZSet<$ty>>::concat(&snapshots).consolidate()
                }),*)
            }
        }
    };
}
// Marker types for groups of two and four collections. The numeric prefix of
// each pair is the tuple index the macro uses to access the matching handle.
impl_test_data!(TestData2, 0: T1, 1: T2);
impl_test_data!(TestData4, 0: T1, 1: T2, 2: T3, 3: T4);
/// Marker type describing a single collection of `T1` records.
///
/// It is only used at the type level; the `PhantomData` field exists to make
/// the type parameter used.
struct TestData1<T1>
where
    T1: DBData,
{
    phantom: PhantomData<T1>,
}
/// The unit type stands for "no data": it has no handles, its chunks carry
/// nothing, and reading its outputs yields `()`.
impl TestDataType for () {
    type Chunk = ();
    type ZSet = ();
    type InputHandles = ();
    type OutputHandles = ();

    /// Nothing to push.
    fn push_inputs(_chunks: Self::Chunk, _handles: &Self::InputHandles) {}

    /// Nothing to read.
    fn read_outputs(_handles: &Self::OutputHandles) -> Self::ZSet {}
}
/// `TestDataType` for a single collection: one input handle, one output
/// handle, and plain (non-tuple) chunk and Z-set types.
impl<T1: DBData> TestDataType for TestData1<T1> {
    type InputHandles = ZSetHandle<T1>;
    type OutputHandles = OutputHandle<SpineSnapshot<OrdZSet<T1>>>;
    type Chunk = Vec<Tup2<T1, ZWeight>>;
    type ZSet = OrdZSet<T1>;

    /// Appends all records of the chunk to the input handle.
    fn push_inputs(chunks: Self::Chunk, handles: &Self::InputHandles) {
        let mut records = chunks;
        handles.append(&mut records);
    }

    /// Concatenates the snapshots taken from the output handle and
    /// consolidates them into a single Z-set.
    fn read_outputs(handles: &Self::OutputHandles) -> Self::ZSet {
        let snapshots = handles.take_from_all();
        SpineSnapshot::<OrdZSet<T1>>::concat(&snapshots).consolidate()
    }
}
/// Shared, thread-safe circuit constructor used by `test_replay`.
///
/// Given the root circuit, it builds the operators and returns a 4-tuple:
/// the input handles for `I1` and `I2`, followed by the output handles for
/// `O1` and `O2`.
type CircuitFn<I1, I2, O1, O2> = Arc<
dyn Fn(
&mut RootCircuit,
) -> (
<I1 as TestDataType>::InputHandles,
<I2 as TestDataType>::InputHandles,
<O1 as TestDataType>::OutputHandles,
<O2 as TestDataType>::OutputHandles,
) + Send
+ Sync,
>;
/// Builds the circuit configuration shared by all runs of a replay test.
///
/// The configuration uses `NUM_WORKERS` workers, a splitter chunk size of 2
/// records, `Mode::Persistent`, and storage rooted at `path` with default
/// cache and storage options.
///
/// # Panics
///
/// Panics if `CircuitStorageConfig::for_config` returns an error.
fn circuit_config(path: &PathBuf) -> CircuitConfig {
    let base = CircuitConfig::with_workers(NUM_WORKERS)
        .with_splitter_chunk_size_records(2)
        .with_mode(Mode::Persistent);

    let storage_config = StorageConfig {
        path: path.to_string_lossy().into_owned(),
        cache: Default::default(),
    };
    let storage = CircuitStorageConfig::for_config(storage_config, Default::default()).unwrap();

    base.with_storage(Some(storage))
}
/// Checks that a circuit restarted from a checkpoint of a *different* circuit
/// produces the same outputs as running the circuits from scratch.
///
/// `circuit_constructor1` builds a circuit with inputs `I1`, `I2` and outputs
/// `O1`, `O2`; `circuit_constructor2` builds a circuit with inputs `I2`, `I3`
/// and outputs `O2`, `O3`. The `I2`/`O2` parts are shared by both circuits.
///
/// The test proceeds in three phases, all using the same storage directory:
///
/// 1. **Reference runs.** Circuit 1 is fed `inputs1`/`inputs2_1` and then
///    `inputs2_2`; circuit 2 is fed `inputs2_1` and then
///    `inputs2_2`/`inputs3`. The `O2` outputs of both runs must agree.
/// 2. **Checkpoint.** Circuit 1 is run again on `inputs1`/`inputs2_1`, a
///    checkpoint is taken, and its `O1` outputs are compared with the
///    reference.
/// 3. **Restart.** Circuit 2 is started from that checkpoint, transactions
///    are executed while `bootstrap_in_progress()` reports true, and then
///    `inputs2_2`/`inputs3` are fed. The accumulated `O2` outputs and the
///    `O3` outputs must match the reference.
///
/// # Panics
///
/// Panics if `inputs1.len() != inputs2_1.len()`, if
/// `inputs2_2.len() != inputs3.len()`, if any circuit operation fails, or if
/// any of the output comparisons fails.
fn test_replay<I1, I2, I3, O1, O2, O3>(
    circuit_constructor1: CircuitFn<I1, I2, O1, O2>,
    circuit_constructor2: CircuitFn<I2, I3, O2, O3>,
    inputs1: Vec<I1::Chunk>,
    inputs2_1: Vec<I2::Chunk>,
    inputs2_2: Vec<I2::Chunk>,
    inputs3: Vec<I3::Chunk>,
) where
    I1: TestDataType,
    I2: TestDataType,
    I3: TestDataType,
    O1: TestDataType,
    O2: TestDataType,
    O3: TestDataType,
{
    // Chunks that are pushed in the same transaction are zipped together, so
    // their vectors must have the same length.
    assert_eq!(inputs1.len(), inputs2_1.len());
    assert_eq!(inputs2_2.len(), inputs3.len());

    init_test_logger();

    // `keep()` leaves the directory on disk after the test finishes.
    let path = tempfile::tempdir().unwrap().keep();
    println!("Running replay_test in {}", path.display());

    let mut reference_output1 = Vec::new();
    let mut reference_output2 = Vec::new();
    let mut reference_output2_2 = Vec::new();
    let mut reference_output3 = Vec::new();

    // Phase 1: reference runs without checkpoint/restart.
    {
        println!("Running first circuit to get reference output");
        let circuit_constructor1_clone = circuit_constructor1.clone();
        let (mut circuit, (input_handles1, input_handles2, output_handles1, output_handles2)) =
            Runtime::init_circuit(circuit_config(&path), move |circuit| {
                Ok(circuit_constructor1_clone(circuit))
            })
            .unwrap();

        for (data1, data2) in std::iter::zip(&inputs1, &inputs2_1) {
            I1::push_inputs(data1.clone(), &input_handles1);
            I2::push_inputs(data2.clone(), &input_handles2);
            circuit.transaction().unwrap();
            reference_output1.push(O1::read_outputs(&output_handles1));
            reference_output2.push(O2::read_outputs(&output_handles2));
        }

        // Circuit 1 has no `I3` input, so only the `I2` part of the second
        // batch of inputs is fed here.
        for data2 in inputs2_2.iter() {
            I2::push_inputs(data2.clone(), &input_handles2);
            circuit.transaction().unwrap();
            reference_output2.push(O2::read_outputs(&output_handles2));
        }

        circuit.kill().unwrap();

        println!("Running second circuit to get reference output");
        let circuit_constructor2_clone = circuit_constructor2.clone();
        let (mut circuit, (input_handles2, input_handles3, output_handles2, output_handles3)) =
            Runtime::init_circuit(circuit_config(&path), move |circuit| {
                Ok(circuit_constructor2_clone(circuit))
            })
            .unwrap();

        // Circuit 2 has no `I1` input, so only the `I2` part of the first
        // batch of inputs is fed here.
        for data2 in inputs2_1.iter() {
            I2::push_inputs(data2.clone(), &input_handles2);
            circuit.transaction().unwrap();
            reference_output2_2.push(O2::read_outputs(&output_handles2));
        }

        for (data2, data3) in std::iter::zip(&inputs2_2, &inputs3) {
            I2::push_inputs(data2.clone(), &input_handles2);
            I3::push_inputs(data3.clone(), &input_handles3);
            circuit.transaction().unwrap();
            reference_output2_2.push(O2::read_outputs(&output_handles2));
            reference_output3.push(O3::read_outputs(&output_handles3));
        }

        circuit.kill().unwrap();

        // Both circuits must agree on the shared outputs.
        assert_eq!(reference_output2, reference_output2_2);
    }

    let mut actual_output1 = Vec::new();
    let mut actual_output2 = Vec::new();
    let mut actual_output3 = Vec::new();

    // Phase 2: run circuit 1 on the first batch of inputs and checkpoint it.
    let checkpoint = {
        println!("Running first circuit to create checkpoint");
        let circuit_constructor1_clone = circuit_constructor1.clone();
        let (mut circuit, (input_handles1, input_handles2, output_handles1, output_handles2)) =
            Runtime::init_circuit(circuit_config(&path), move |circuit| {
                Ok(circuit_constructor1_clone(circuit))
            })
            .unwrap();

        // `inputs1` and `inputs2_1` are not needed afterwards, so the chunks
        // are moved into the circuit instead of being cloned.
        for (data1, data2) in std::iter::zip(inputs1, inputs2_1) {
            I1::push_inputs(data1, &input_handles1);
            I2::push_inputs(data2, &input_handles2);
            circuit.transaction().unwrap();
            actual_output1.push(O1::read_outputs(&output_handles1));
            actual_output2.push(O2::read_outputs(&output_handles2));
        }

        let checkpoint = circuit.checkpoint().run().unwrap();
        circuit.kill().unwrap();
        checkpoint
    };

    assert_eq!(reference_output1, actual_output1);

    // Phase 3: restart as circuit 2 from the checkpoint taken by circuit 1.
    {
        println!("Restarting circuit from checkpoint {}", checkpoint.uuid);
        let mut circuit_config = circuit_config(&path);
        circuit_config.storage.as_mut().unwrap().init_checkpoint = Some(checkpoint.uuid);

        let circuit_constructor2_clone = circuit_constructor2.clone();
        let (mut circuit, (input_handles2, input_handles3, output_handles2, output_handles3)) =
            Runtime::init_circuit(circuit_config, move |circuit| {
                Ok(circuit_constructor2_clone(circuit))
            })
            .unwrap();

        // Run transactions without new input until bootstrapping completes.
        while circuit.bootstrap_in_progress() {
            circuit.transaction().unwrap();
        }
        println!("Replay finished");

        for (data2, data3) in std::iter::zip(&inputs2_2, &inputs3) {
            I2::push_inputs(data2.clone(), &input_handles2);
            I3::push_inputs(data3.clone(), &input_handles3);
            circuit.transaction().unwrap();
            actual_output2.push(O2::read_outputs(&output_handles2));
            actual_output3.push(O3::read_outputs(&output_handles3));
        }
        println!("Additional transactions completed");

        circuit.kill().unwrap();
    }

    println!("Reference output1: {:?}", reference_output1);
    println!("Reference output2: {:?}", reference_output2);
    println!("Reference output3: {:?}", reference_output3);
    println!("Actual output1: {:?}", actual_output1);
    println!("Actual output2: {:?}", actual_output2);
    println!("Actual output3: {:?}", actual_output3);

    assert_eq!(reference_output2, actual_output2);
    assert_eq!(reference_output3, actual_output3);
}
/// Returns one single-record chunk per integer in `from..to`; each chunk
/// contains the integer with weight 1.
fn sequence(from: u64, to: u64) -> Vec<Vec<Tup2<u64, ZWeight>>> {
    let mut chunks = Vec::new();
    for value in from..to {
        chunks.push(vec![Tup2(value, 1)]);
    }
    chunks
}
/// Returns one single-record chunk per integer `x` in `from..to`; each chunk
/// contains the pair `(x, x + 1)` with weight 1.
fn chain(from: u64, to: u64) -> Vec<Vec<Tup2<Tup2<u64, u64>, ZWeight>>> {
    let mut chunks = Vec::new();
    for start in from..to {
        let edge = Tup2(start, start + 1);
        chunks.push(vec![Tup2(edge, 1)]);
    }
    chunks
}
/// First circuit of the linear replay test.
///
/// Adds one `u64` Z-set input, maps every record to `x % 1000`, and exposes
/// the result through the persistent output `"output1"`. The second input
/// and output slots are unused (`()`).
fn linear_circuit1(
    circuit: &mut RootCircuit,
) -> (
    ZSetHandle<u64>,
    (),
    OutputHandle<SpineSnapshot<OrdZSet<u64>>>,
    (),
) {
    let (numbers, numbers_handle) = circuit.add_input_zset::<u64>();

    let reduced = numbers.map(|x| x % 1000);
    let reduced_handle = reduced.accumulate_output_persistent(Some("output1"));

    (numbers_handle, (), reduced_handle, ())
}
/// Second circuit of the linear replay test.
///
/// Adds one `u64` Z-set input, maps every record to `x >> 1`, and exposes the
/// result through the persistent output `"output2"`. The first input and
/// output slots are unused (`()`).
fn linear_circuit2(
    circuit: &mut RootCircuit,
) -> (
    (),
    ZSetHandle<u64>,
    (),
    OutputHandle<SpineSnapshot<OrdZSet<u64>>>,
) {
    let (numbers, numbers_handle) = circuit.add_input_zset::<u64>();

    let halved = numbers.map(|x| x >> 1);
    let halved_handle = halved.accumulate_output_persistent(Some("output2"));

    ((), numbers_handle, (), halved_handle)
}
/// Replays `linear_circuit1` into `linear_circuit2`; the two circuits share
/// no inputs or outputs, so the shared `I2`/`O2` slots are `()`.
#[test]
fn test_linear_circuit() {
    let first_inputs = sequence(0, 2);
    let second_inputs = sequence(3, 5);

    test_replay::<TestData1<u64>, (), TestData1<u64>, TestData1<u64>, (), TestData1<u64>>(
        Arc::new(linear_circuit1),
        Arc::new(linear_circuit2),
        first_inputs,
        vec![(); 2],
        vec![(); 2],
        second_inputs,
    );
}
/// First circuit of the "materialized inputs" replay test.
///
/// Declares two `u64` inputs with persistent ids `"input1"` and `"input2"`,
/// calls `integrate_trace()` on each (the returned stream is discarded), and
/// produces:
/// * `"output1"`: `input1` mapped with `x % 1000`;
/// * `"output2"`: `input2` mapped with `x + 5`.
fn linear_circuit_materialized_inputs1(
    circuit: &mut RootCircuit,
) -> (
    ZSetHandle<u64>,
    ZSetHandle<u64>,
    OutputHandle<SpineSnapshot<OrdZSet<u64>>>,
    OutputHandle<SpineSnapshot<OrdZSet<u64>>>,
) {
    let (first, first_handle) = circuit.add_input_zset::<u64>();
    first.set_persistent_id(Some("input1"));
    let (second, second_handle) = circuit.add_input_zset::<u64>();
    second.set_persistent_id(Some("input2"));

    first.integrate_trace();
    second.integrate_trace();

    let first_mapped = first.map(|x| x % 1000);
    let first_out = first_mapped.accumulate_output_persistent(Some("output1"));

    let second_mapped = second.map(|x| x + 5);
    let second_out = second_mapped.accumulate_output_persistent(Some("output2"));

    (first_handle, second_handle, first_out, second_out)
}
/// Second circuit of the "materialized inputs" replay test.
///
/// Declares two `u64` inputs with persistent ids `"input2"` and `"input3"`,
/// calls `integrate_trace()` on each (the returned stream is discarded), and
/// produces:
/// * `"output2"`: `input2` mapped with `x + 5`;
/// * `"output3"`: `input3` mapped with `x >> 1`.
fn linear_circuit_materialized_inputs2(
    circuit: &mut RootCircuit,
) -> (
    ZSetHandle<u64>,
    ZSetHandle<u64>,
    OutputHandle<SpineSnapshot<OrdZSet<u64>>>,
    OutputHandle<SpineSnapshot<OrdZSet<u64>>>,
) {
    let (second, second_handle) = circuit.add_input_zset::<u64>();
    second.set_persistent_id(Some("input2"));
    let (third, third_handle) = circuit.add_input_zset::<u64>();
    third.set_persistent_id(Some("input3"));

    second.integrate_trace();
    third.integrate_trace();

    let second_mapped = second.map(|x| x + 5);
    let second_out = second_mapped.accumulate_output_persistent(Some("output2"));

    let third_mapped = third.map(|x| x >> 1);
    let third_out = third_mapped.accumulate_output_persistent(Some("output3"));

    (second_handle, third_handle, second_out, third_out)
}
/// Replays `linear_circuit_materialized_inputs1` into
/// `linear_circuit_materialized_inputs2`, which share `input2`/`output2`.
#[test]
fn test_linear_circuit_materialized_inputs() {
    let before_checkpoint = sequence(0, 2);
    let after_checkpoint = sequence(3, 5);

    test_replay::<
        TestData1<u64>,
        TestData1<u64>,
        TestData1<u64>,
        TestData1<u64>,
        TestData1<u64>,
        TestData1<u64>,
    >(
        Arc::new(linear_circuit_materialized_inputs1),
        Arc::new(linear_circuit_materialized_inputs2),
        before_checkpoint.clone(),
        before_checkpoint,
        after_checkpoint.clone(),
        after_checkpoint,
    );
}
/// First circuit of the "materialized inputs with backfill" replay test.
///
/// Builds the same operators as `linear_circuit_materialized_inputs1`
/// (inputs `"input1"`/`"input2"`, outputs `"output1"`/`"output2"`), but
/// returns both output handles in the first output slot and leaves the
/// second output slot empty, so no output is shared with the second circuit.
fn linear_circuit_materialized_inputs1_2(
    circuit: &mut RootCircuit,
) -> (
    ZSetHandle<u64>,
    ZSetHandle<u64>,
    (
        OutputHandle<SpineSnapshot<OrdZSet<u64>>>,
        OutputHandle<SpineSnapshot<OrdZSet<u64>>>,
    ),
    (),
) {
    let (first, first_handle) = circuit.add_input_zset::<u64>();
    first.set_persistent_id(Some("input1"));
    let (second, second_handle) = circuit.add_input_zset::<u64>();
    second.set_persistent_id(Some("input2"));

    first.integrate_trace();
    second.integrate_trace();

    let first_mapped = first.map(|x| x % 1000);
    let first_out = first_mapped.accumulate_output_persistent(Some("output1"));

    let second_mapped = second.map(|x| x + 5);
    let second_out = second_mapped.accumulate_output_persistent(Some("output2"));

    let private_outputs = (first_out, second_out);
    (first_handle, second_handle, private_outputs, ())
}
/// Second circuit of the "materialized inputs with backfill" replay test.
///
/// Declares inputs `"input2"` and `"input3"`, calls `integrate_trace()` on
/// each, and produces `"output2_2"` (`input2` mapped with `x + 5`) and
/// `"output3"` (`input3` mapped with `x >> 1`). Both output handles are
/// returned in the second output slot; the shared output slot is empty.
fn linear_circuit_materialized_inputs2_2(
    circuit: &mut RootCircuit,
) -> (
    ZSetHandle<u64>,
    ZSetHandle<u64>,
    (),
    (
        OutputHandle<SpineSnapshot<OrdZSet<u64>>>,
        OutputHandle<SpineSnapshot<OrdZSet<u64>>>,
    ),
) {
    let (second, second_handle) = circuit.add_input_zset::<u64>();
    second.set_persistent_id(Some("input2"));
    let (third, third_handle) = circuit.add_input_zset::<u64>();
    third.set_persistent_id(Some("input3"));

    second.integrate_trace();
    third.integrate_trace();

    let second_mapped = second.map(|x| x + 5);
    let second_out = second_mapped.accumulate_output_persistent(Some("output2_2"));

    let third_mapped = third.map(|x| x >> 1);
    let third_out = third_mapped.accumulate_output_persistent(Some("output3"));

    let private_outputs = (second_out, third_out);
    (second_handle, third_handle, (), private_outputs)
}
/// Replays `linear_circuit_materialized_inputs1_2` into
/// `linear_circuit_materialized_inputs2_2`. The circuits share `input2` but
/// no outputs: the second circuit's outputs are all in its private slot.
#[test]
fn test_linear_circuit_materialized_inputs_with_backfill() {
    let before_checkpoint = sequence(0, 2);
    let after_checkpoint = sequence(3, 5);

    test_replay::<
        TestData1<u64>,
        TestData1<u64>,
        TestData1<u64>,
        TestData2<u64, u64>,
        (),
        TestData2<u64, u64>,
    >(
        Arc::new(linear_circuit_materialized_inputs1_2),
        Arc::new(linear_circuit_materialized_inputs2_2),
        before_checkpoint.clone(),
        before_checkpoint,
        after_checkpoint.clone(),
        after_checkpoint,
    );
}
/// First circuit of the join replay tests.
///
/// Indexes inputs `"input1"` and `"input2"` by value, joins them into `"j1"`
/// (emitting the key), re-indexes the result as `"j1-indexed"`, and joins
/// that with the indexed `input1` again (the result of this second join is
/// discarded). `"j1"` is exposed as `"output1"`.
///
/// When `balancing` is true the joins use `join_balanced_inner`; otherwise
/// they use `join`.
fn join_circuit1(
    balancing: bool,
    circuit: &mut RootCircuit,
) -> (
    (),
    (ZSetHandle<u64>, ZSetHandle<u64>),
    (),
    OutputHandle<SpineSnapshot<OrdZSet<u64>>>,
) {
    let (left, left_handle) = circuit.add_input_zset::<u64>();
    left.set_persistent_id(Some("input1"));
    let (right, right_handle) = circuit.add_input_zset::<u64>();
    right.set_persistent_id(Some("input2"));

    let left_by_key = left
        .map_index(|x| (*x, *x))
        .set_persistent_id(Some("input_stream1_indexed"));
    let right_by_key = right
        .map_index(|x| (*x, *x))
        .set_persistent_id(Some("input_stream2_indexed"));

    let (_joined_by_key, joined_handle) = if balancing {
        let joined = left_by_key
            .join_balanced_inner(&right_by_key, |key, _v1, _v2| *key)
            .set_persistent_id(Some("j1"));
        let joined_by_key = joined
            .map_index(|x| (*x, *x))
            .set_persistent_id(Some("j1-indexed"));
        joined_by_key.join_balanced_inner(&left_by_key, |key, _v1, _v2| *key);

        let handle = joined.accumulate_output_persistent(Some("output1"));
        (joined_by_key, handle)
    } else {
        let joined = left_by_key
            .join(&right_by_key, |key, _v1, _v2| *key)
            .set_persistent_id(Some("j1"));
        let joined_by_key = joined
            .map_index(|x| (*x, *x))
            .set_persistent_id(Some("j1-indexed"));
        joined_by_key.join(&left_by_key, |key, _v1, _v2| *key);

        let handle = joined.accumulate_output_persistent(Some("output1"));
        (joined_by_key, handle)
    };

    ((), (left_handle, right_handle), (), joined_handle)
}
/// Second circuit of the join replay tests.
///
/// Contains everything `join_circuit1` builds (inputs `"input1"`/`"input2"`,
/// joins `"j1"`/`"j1-indexed"`, output `"output1"`) and adds a third input
/// `"input3"`. The new part joins the indexed `input2` with the indexed
/// `input3` into `"j2"`, re-indexes it as `"j2-indexed"`, joins that with
/// `"j1-indexed"` into `"j3"`, and exposes `"j3"` as `"output2"`.
///
/// When `balancing` is true the joins use `join_balanced_inner`; otherwise
/// they use `join`.
fn join_circuit2(
    balancing: bool,
    circuit: &mut RootCircuit,
) -> (
    (ZSetHandle<u64>, ZSetHandle<u64>),
    ZSetHandle<u64>,
    OutputHandle<SpineSnapshot<OrdZSet<u64>>>,
    OutputHandle<SpineSnapshot<OrdZSet<u64>>>,
) {
    let (left, left_handle) = circuit.add_input_zset::<u64>();
    left.set_persistent_id(Some("input1"));
    let (right, right_handle) = circuit.add_input_zset::<u64>();
    right.set_persistent_id(Some("input2"));
    let (extra, extra_handle) = circuit.add_input_zset::<u64>();
    extra.set_persistent_id(Some("input3"));

    let left_by_key = left
        .map_index(|x| (*x, *x))
        .set_persistent_id(Some("input_stream1_indexed"));
    let right_by_key = right
        .map_index(|x| (*x, *x))
        .set_persistent_id(Some("input_stream2_indexed"));
    let extra_by_key = extra
        .map_index(|x| (*x, *x))
        .set_persistent_id(Some("input_stream3_indexed"));

    // Part shared with `join_circuit1`.
    let (first_join_by_key, first_join_handle) = if balancing {
        let joined = left_by_key
            .join_balanced_inner(&right_by_key, |key, _v1, _v2| *key)
            .set_persistent_id(Some("j1"));
        let joined_by_key = joined
            .map_index(|x| (*x, *x))
            .set_persistent_id(Some("j1-indexed"));
        joined_by_key.join_balanced_inner(&left_by_key, |key, _v1, _v2| *key);

        let handle = joined.accumulate_output_persistent(Some("output1"));
        (joined_by_key, handle)
    } else {
        let joined = left_by_key
            .join(&right_by_key, |key, _v1, _v2| *key)
            .set_persistent_id(Some("j1"));
        let joined_by_key = joined
            .map_index(|x| (*x, *x))
            .set_persistent_id(Some("j1-indexed"));
        joined_by_key.join(&left_by_key, |key, _v1, _v2| *key);

        let handle = joined.accumulate_output_persistent(Some("output1"));
        (joined_by_key, handle)
    };

    // Part that exists only in this circuit.
    let second_join_handle = if balancing {
        let second_join = right_by_key
            .join_balanced_inner(&extra_by_key, |key, _v1, _v2| *key)
            .set_persistent_id(Some("j2"));
        let second_join_by_key = second_join
            .map_index(|x| (*x, *x))
            .set_persistent_id(Some("j2-indexed"));
        let third_join = second_join_by_key
            .join_balanced_inner(&first_join_by_key, |key, _v1, _v2| *key)
            .set_persistent_id(Some("j3"));
        third_join.accumulate_output_persistent(Some("output2"))
    } else {
        let second_join = right_by_key
            .join(&extra_by_key, |key, _v1, _v2| *key)
            .set_persistent_id(Some("j2"));
        let second_join_by_key = second_join
            .map_index(|x| (*x, *x))
            .set_persistent_id(Some("j2-indexed"));
        let third_join = second_join_by_key
            .join(&first_join_by_key, |key, _v1, _v2| *key)
            .set_persistent_id(Some("j3"));
        third_join.accumulate_output_persistent(Some("output2"))
    };

    (
        (left_handle, right_handle),
        extra_handle,
        first_join_handle,
        second_join_handle,
    )
}
/// Replays `join_circuit1` into `join_circuit2` using the regular `join`
/// operator (`balancing == false`).
#[test]
fn test_join_circuit() {
    let shared_before: Vec<_> = sequence(0, 2).into_iter().zip(sequence(2, 4)).collect();
    let shared_after: Vec<_> = sequence(2, 4).into_iter().zip(sequence(0, 2)).collect();
    let extra_after = sequence(1, 3);

    test_replay::<(), TestData2<u64, u64>, TestData1<u64>, (), TestData1<u64>, TestData1<u64>>(
        Arc::new(|circuit| join_circuit1(false, circuit)),
        Arc::new(|circuit| join_circuit2(false, circuit)),
        vec![(); 2],
        shared_before,
        shared_after,
        extra_after,
    );
}
/// Replays `join_circuit1` into `join_circuit2` using `join_balanced_inner`
/// (`balancing == true`).
#[test]
fn test_balanced_join_circuit() {
    let shared_before: Vec<_> = sequence(0, 2).into_iter().zip(sequence(2, 4)).collect();
    let shared_after: Vec<_> = sequence(2, 4).into_iter().zip(sequence(0, 2)).collect();
    let extra_after = sequence(1, 3);

    test_replay::<(), TestData2<u64, u64>, TestData1<u64>, (), TestData1<u64>, TestData1<u64>>(
        Arc::new(|circuit| join_circuit1(true, circuit)),
        Arc::new(|circuit| join_circuit2(true, circuit)),
        vec![(); 2],
        shared_before,
        shared_after,
        extra_after,
    );
}
/// First circuit of the balancer replay tests.
///
/// Declares four `u64` inputs (`"input1"`..`"input4"`), calls
/// `integrate_trace()` on each, indexes each by value, and builds two
/// balanced joins:
/// * `"output1"`: `input1` joined with `input2`;
/// * `"output2"`: `input3` joined with `input4`.
fn balancer_circuit1(
    circuit: &mut RootCircuit,
) -> (
    (),
    (
        ZSetHandle<u64>,
        ZSetHandle<u64>,
        ZSetHandle<u64>,
        ZSetHandle<u64>,
    ),
    (),
    (
        OutputHandle<SpineSnapshot<OrdZSet<u64>>>,
        OutputHandle<SpineSnapshot<OrdZSet<u64>>>,
    ),
) {
    let (in1, in1_handle) = circuit.add_input_zset::<u64>();
    in1.set_persistent_id(Some("input1"));
    let (in2, in2_handle) = circuit.add_input_zset::<u64>();
    in2.set_persistent_id(Some("input2"));
    let (in3, in3_handle) = circuit.add_input_zset::<u64>();
    in3.set_persistent_id(Some("input3"));
    let (in4, in4_handle) = circuit.add_input_zset::<u64>();
    in4.set_persistent_id(Some("input4"));

    in1.integrate_trace();
    in2.integrate_trace();
    in3.integrate_trace();
    in4.integrate_trace();

    let in1_by_key = in1
        .map_index(|x| (*x, *x))
        .set_persistent_id(Some("input_stream1_indexed"));
    let in2_by_key = in2
        .map_index(|x| (*x, *x))
        .set_persistent_id(Some("input_stream2_indexed"));
    let in3_by_key = in3
        .map_index(|x| (*x, *x))
        .set_persistent_id(Some("input_stream3_indexed"));
    let in4_by_key = in4
        .map_index(|x| (*x, *x))
        .set_persistent_id(Some("input_stream4_indexed"));

    let join12 = in1_by_key.join_balanced_inner(&in2_by_key, |key, _v1, _v2| *key);
    let join12_handle = join12.accumulate_output_persistent(Some("output1"));

    let join34 = in3_by_key.join_balanced_inner(&in4_by_key, |key, _v1, _v2| *key);
    let join34_handle = join34.accumulate_output_persistent(Some("output2"));

    (
        (),
        (in1_handle, in2_handle, in3_handle, in4_handle),
        (),
        (join12_handle, join34_handle),
    )
}
/// Second circuit of the balancer replay tests.
///
/// Builds everything `balancer_circuit1` builds (four inputs, `"output1"`
/// and `"output2"`) and adds a third balanced join of `input2` with
/// `input3`, exposed as `"output3"`.
fn balancer_circuit2(
    circuit: &mut RootCircuit,
) -> (
    (
        ZSetHandle<u64>,
        ZSetHandle<u64>,
        ZSetHandle<u64>,
        ZSetHandle<u64>,
    ),
    (),
    (
        OutputHandle<SpineSnapshot<OrdZSet<u64>>>,
        OutputHandle<SpineSnapshot<OrdZSet<u64>>>,
    ),
    OutputHandle<SpineSnapshot<OrdZSet<u64>>>,
) {
    let (in1, in1_handle) = circuit.add_input_zset::<u64>();
    in1.set_persistent_id(Some("input1"));
    let (in2, in2_handle) = circuit.add_input_zset::<u64>();
    in2.set_persistent_id(Some("input2"));
    let (in3, in3_handle) = circuit.add_input_zset::<u64>();
    in3.set_persistent_id(Some("input3"));
    let (in4, in4_handle) = circuit.add_input_zset::<u64>();
    in4.set_persistent_id(Some("input4"));

    in1.integrate_trace();
    in2.integrate_trace();
    in3.integrate_trace();
    in4.integrate_trace();

    let in1_by_key = in1
        .map_index(|x| (*x, *x))
        .set_persistent_id(Some("input_stream1_indexed"));
    let in2_by_key = in2
        .map_index(|x| (*x, *x))
        .set_persistent_id(Some("input_stream2_indexed"));
    let in3_by_key = in3
        .map_index(|x| (*x, *x))
        .set_persistent_id(Some("input_stream3_indexed"));
    let in4_by_key = in4
        .map_index(|x| (*x, *x))
        .set_persistent_id(Some("input_stream4_indexed"));

    let join12 = in1_by_key.join_balanced_inner(&in2_by_key, |key, _v1, _v2| *key);
    let join12_handle = join12.accumulate_output_persistent(Some("output1"));

    let join34 = in3_by_key.join_balanced_inner(&in4_by_key, |key, _v1, _v2| *key);
    let join34_handle = join34.accumulate_output_persistent(Some("output2"));

    // The join that exists only in this circuit.
    let join23 = in2_by_key.join_balanced_inner(&in3_by_key, |key, _v1, _v2| *key);
    let join23_handle = join23.accumulate_output_persistent(Some("output3"));

    (
        (in1_handle, in2_handle, in3_handle, in4_handle),
        (),
        (join12_handle, join34_handle),
        join23_handle,
    )
}
/// Replays `balancer_circuit1` into `balancer_circuit2` with 100 transactions
/// before the checkpoint (values `0..100` on all four inputs) and 100 after
/// it (values `100..200`).
#[test]
fn test_balancer1() {
    test_replay::<(), TestData4<u64, u64, u64, u64>, (), (), TestData2<u64, u64>, TestData1<u64>>(
        // The constructors take only the circuit, so they are passed directly
        // instead of being wrapped in closures.
        Arc::new(balancer_circuit1),
        Arc::new(balancer_circuit2),
        vec![(); 100],
        itertools::izip!(
            sequence(0, 100),
            sequence(0, 100),
            sequence(0, 100),
            sequence(0, 100),
        )
        .collect(),
        itertools::izip!(
            sequence(100, 200),
            sequence(100, 200),
            sequence(100, 200),
            sequence(100, 200)
        )
        .collect(),
        vec![(); 100],
    );
}
/// Replays `balancer_circuit1` into `balancer_circuit2` with skewed inputs.
///
/// Only values whose `default_hash` is divisible by `NUM_WORKERS` are used.
/// Inputs 1 and 4 receive each value with weight 100 and inputs 2 and 3 with
/// weight 1. Values come from `0..1000` before the checkpoint and from
/// `1000..2000` after it.
#[test]
fn test_balancer2() {
    /// Returns one single-record chunk, carrying `weight`, for every value in
    /// `range` whose `default_hash` is divisible by `NUM_WORKERS`.
    fn skewed_sequence(
        range: std::ops::Range<u64>,
        weight: ZWeight,
    ) -> Vec<Vec<Tup2<u64, ZWeight>>> {
        range
            .filter(|x| default_hash(&x) % NUM_WORKERS as u64 == 0)
            .map(|x| vec![Tup2(x, weight)])
            .collect()
    }

    let skewed_sequence_small1 = skewed_sequence(0..1000, 1);
    let skewed_sequence_large1 = skewed_sequence(0..1000, 100);
    let skewed_sequence_small2 = skewed_sequence(1000..2000, 1);
    let skewed_sequence_large2 = skewed_sequence(1000..2000, 100);

    // The unit-typed input slots need one chunk per transaction; take the
    // lengths before the sequences are moved into the zipped inputs.
    let steps_before = skewed_sequence_small1.len();
    let steps_after = skewed_sequence_small2.len();

    test_replay::<(), TestData4<u64, u64, u64, u64>, (), (), TestData2<u64, u64>, TestData1<u64>>(
        Arc::new(balancer_circuit1),
        Arc::new(balancer_circuit2),
        vec![(); steps_before],
        itertools::izip!(
            skewed_sequence_large1.clone(),
            skewed_sequence_small1.clone(),
            skewed_sequence_small1,
            skewed_sequence_large1,
        )
        .collect(),
        itertools::izip!(
            skewed_sequence_large2.clone(),
            skewed_sequence_small2.clone(),
            skewed_sequence_small2,
            skewed_sequence_large2,
        )
        .collect(),
        vec![(); steps_after],
    );
}
/// First circuit of the aggregate replay test.
///
/// Declares input `"input1"`, calls `integrate_trace()` on it, indexes it by
/// `x % 2`, computes the `Max` aggregate per key (`"aggregate1"`), drops the
/// key (`"aggregate1_flat"`), and exposes the values as `"output1"`.
fn aggregate_circuit1(
    circuit: &mut RootCircuit,
) -> (
    (),
    ZSetHandle<u64>,
    (),
    OutputHandle<SpineSnapshot<OrdZSet<u64>>>,
) {
    let (numbers, numbers_handle) = circuit.add_input_zset::<u64>();
    numbers.set_persistent_id(Some("input1"));
    numbers.integrate_trace();

    let by_parity = numbers
        .map_index(|x| (x % 2, *x))
        .set_persistent_id(Some("input_stream1_indexed"));

    let max_per_key = by_parity
        .aggregate_persistent(Some("aggregate1"), Max)
        .set_persistent_id(Some("aggregate1"));
    let max_values = max_per_key
        .map(|(_k, v)| *v)
        .set_persistent_id(Some("aggregate1_flat"));

    let max_handle = max_values.accumulate_output_persistent(Some("output1"));

    ((), numbers_handle, (), max_handle)
}
/// Second circuit of the aggregate replay test.
///
/// Builds everything `aggregate_circuit1` builds (`"aggregate1"`, exposed as
/// `"output1"`) and adds two aggregates:
/// * `"aggregate2"`: `Min` over the output of `"aggregate1"`, exposed as
///   `"output2"`;
/// * `"aggregate3"`: `Min` over the indexed input, exposed as `"output3"`.
fn aggregate_circuit2(
    circuit: &mut RootCircuit,
) -> (
    ZSetHandle<u64>,
    (),
    OutputHandle<SpineSnapshot<OrdZSet<u64>>>,
    (
        OutputHandle<SpineSnapshot<OrdZSet<u64>>>,
        OutputHandle<SpineSnapshot<OrdZSet<u64>>>,
    ),
) {
    let (numbers, numbers_handle) = circuit.add_input_zset::<u64>();
    numbers.set_persistent_id(Some("input1"));
    numbers.integrate_trace();

    let by_parity = numbers
        .map_index(|x| (x % 2, *x))
        .set_persistent_id(Some("input_stream1_indexed"));

    let max_per_key = by_parity
        .aggregate_persistent(Some("aggregate1"), Max)
        .set_persistent_id(Some("aggregate1"));
    let max_values = max_per_key
        .map(|(_k, v)| *v)
        .set_persistent_id(Some("aggregate1_flat"));

    let min_of_max = max_per_key
        .aggregate_persistent(Some("aggregate2"), Min)
        .set_persistent_id(Some("aggregate2"));
    let min_of_max_values = min_of_max
        .map(|(_k, v)| *v)
        .set_persistent_id(Some("aggregate2_flat"));

    let min_per_key = by_parity
        .aggregate_persistent(Some("aggregate3"), Min)
        .set_persistent_id(Some("aggregate3"));
    let min_values = min_per_key
        .map(|(_k, v)| *v)
        .set_persistent_id(Some("aggregate3_flat"));

    let max_handle = max_values.accumulate_output_persistent(Some("output1"));
    let min_of_max_handle = min_of_max_values.accumulate_output_persistent(Some("output2"));
    let min_handle = min_values.accumulate_output_persistent(Some("output3"));

    (
        numbers_handle,
        (),
        max_handle,
        (min_of_max_handle, min_handle),
    )
}
/// Replays `aggregate_circuit1` into `aggregate_circuit2` with five
/// transactions before the checkpoint and five after it.
#[test]
fn test_aggregate_circuit() {
    let before_checkpoint = sequence(0, 5);
    let after_checkpoint = sequence(5, 10);

    test_replay::<(), TestData1<u64>, (), (), TestData1<u64>, TestData2<u64, u64>>(
        Arc::new(aggregate_circuit1),
        Arc::new(aggregate_circuit2),
        vec![(); 5],
        before_checkpoint,
        after_checkpoint,
        vec![(); 5],
    );
}
/// First circuit of the recursive replay test.
///
/// The input `"input1"` holds edges `(from, to)`. A recursive subcircuit
/// computes `paths` as the edges plus every pair obtained by joining an
/// existing path `(from, via)` with an edge `(via, to)`. The result is
/// exposed as `"output1"`.
///
/// # Panics
///
/// Panics if `circuit.recursive` returns an error.
fn recursive_circuit1(
    circuit: &mut RootCircuit,
) -> (
    (),
    ZSetHandle<Tup2<u64, u64>>,
    (),
    OutputHandle<SpineSnapshot<OrdZSet<Tup2<u64, u64>>>>,
) {
    let (edge_input, edge_handle) = circuit.add_input_zset::<Tup2<u64, u64>>();
    edge_input.set_persistent_id(Some("input1"));
    edge_input.integrate_trace();

    let all_paths = circuit
        .recursive(|child, paths: Stream<_, OrdZSet<Tup2<u64, u64>>>| {
            let edges = edge_input.delta0(child);

            // Paths keyed by their end point, edges keyed by their start point.
            let paths_by_end = paths.map_index(|&Tup2(x, y)| (y, x));
            let edges_by_start = edges.map_index(|Tup2(x, y)| (*x, *y));

            let extended = paths_by_end.join(&edges_by_start, |_via, from, to| Tup2(*from, *to));
            Ok(edges.plus(&extended))
        })
        .unwrap();

    let paths_handle = all_paths.accumulate_output_persistent(Some("output1"));

    ((), edge_handle, (), paths_handle)
}
/// Second circuit of the recursive replay test.
///
/// Builds the same recursive path computation as `recursive_circuit1` and
/// exposes its result twice: as `"output1"` and as the additional
/// `"output2"`.
///
/// # Panics
///
/// Panics if `circuit.recursive` returns an error.
fn recursive_circuit2(
    circuit: &mut RootCircuit,
) -> (
    ZSetHandle<Tup2<u64, u64>>,
    (),
    OutputHandle<SpineSnapshot<OrdZSet<Tup2<u64, u64>>>>,
    OutputHandle<SpineSnapshot<OrdZSet<Tup2<u64, u64>>>>,
) {
    let (edge_input, edge_handle) = circuit.add_input_zset::<Tup2<u64, u64>>();
    edge_input.set_persistent_id(Some("input1"));
    edge_input.integrate_trace();

    let all_paths = circuit
        .recursive(|child, paths: Stream<_, OrdZSet<Tup2<u64, u64>>>| {
            let edges = edge_input.delta0(child);

            // Paths keyed by their end point, edges keyed by their start point.
            let paths_by_end = paths.map_index(|&Tup2(x, y)| (y, x));
            let edges_by_start = edges.map_index(|Tup2(x, y)| (*x, *y));

            let extended = paths_by_end.join(&edges_by_start, |_via, from, to| Tup2(*from, *to));
            Ok(edges.plus(&extended))
        })
        .unwrap();

    let shared_handle = all_paths.accumulate_output_persistent(Some("output1"));
    let extra_handle = all_paths.accumulate_output_persistent(Some("output2"));

    (edge_handle, (), shared_handle, extra_handle)
}
/// Replays `recursive_circuit1` into `recursive_circuit2`, feeding the edges
/// `(x, x + 1)` for `x` in `0..5` before the checkpoint and `5..10` after it.
#[test]
fn test_recursive_circuit1() {
    let edges_before = chain(0, 5);
    let edges_after = chain(5, 10);

    test_replay::<
        (),
        TestData1<Tup2<u64, u64>>,
        (),
        (),
        TestData1<Tup2<u64, u64>>,
        TestData1<Tup2<u64, u64>>,
    >(
        Arc::new(recursive_circuit1),
        Arc::new(recursive_circuit2),
        vec![(); 5],
        edges_before,
        edges_after,
        vec![(); 5],
    );
}
struct Asc<T>(PhantomData<T>);
impl<T: DBData> CmpFunc<T> for Asc<T> {
fn cmp(left: &T, right: &T) -> std::cmp::Ordering {
left.cmp(right)
}
}
/// Comparison function for `Tup2<u64, u64>`: orders by the first field and
/// breaks ties with the second.
struct RankValOrd(PhantomData<()>);

impl CmpFunc<Tup2<u64, u64>> for RankValOrd {
    fn cmp(left: &Tup2<u64, u64>, right: &Tup2<u64, u64>) -> Ordering {
        match left.0.cmp(&right.0) {
            Ordering::Equal => left.1.cmp(&right.1),
            unequal => unequal,
        }
    }
}
/// First circuit of the lag replay test.
///
/// Declares input `"input1"`, calls `integrate_trace()` on it, and indexes it
/// by `x % 3`. It then applies `lag_custom_order_persistent` with the `Asc`
/// ordering and an offset argument of 1 (`"lag1"`). The projection yields the
/// lagged value, or 0 when there is none, and the output pairs each value
/// with that projection. The flattened pairs are exposed as `"output1"`.
fn lag_circuit1(
    circuit: &mut RootCircuit,
) -> (
    (),
    ZSetHandle<u64>,
    (),
    OutputHandle<SpineSnapshot<OrdZSet<Tup2<u64, u64>>>>,
) {
    let (numbers, numbers_handle) = circuit.add_input_zset::<u64>();
    numbers.set_persistent_id(Some("input1"));
    numbers.integrate_trace();

    let by_remainder = numbers
        .map_index(|x| (x % 3, *x))
        .set_persistent_id(Some("input_stream1_indexed"));

    let lagged = by_remainder
        .lag_custom_order_persistent::<_, _, _, Asc<_>, _>(
            Some("lag1"),
            1,
            |x| x.cloned().unwrap_or(0),
            |x, y| Tup2(*x, *y),
        )
        .set_persistent_id(Some("lag1"));
    let lagged_values = lagged
        .map(|(_k, v)| *v)
        .set_persistent_id(Some("lag1_flat"));

    let lagged_handle = lagged_values.accumulate_output_persistent(Some("output1"));

    ((), numbers_handle, (), lagged_handle)
}
/// Second circuit of the lag replay test.
///
/// Builds everything `lag_circuit1` builds (`"lag1"`, exposed as `"output1"`)
/// and adds two lag operators:
/// * `"lag2"`: applied to the output of `"lag1"`; its projection takes the
///   second field of the lagged pair (or of `Tup2(0, 0)` when there is
///   none), and it emits `Tup3` values exposed as `"output2"`;
/// * `"lag3"`: applied to the indexed input with the same arguments as
///   `"lag1"`, exposed as `"output3"`.
fn lag_circuit2(
    circuit: &mut RootCircuit,
) -> (
    ZSetHandle<u64>,
    (),
    OutputHandle<SpineSnapshot<OrdZSet<Tup2<u64, u64>>>>,
    (
        OutputHandle<SpineSnapshot<OrdZSet<Tup3<u64, u64, u64>>>>,
        OutputHandle<SpineSnapshot<OrdZSet<Tup2<u64, u64>>>>,
    ),
) {
    let (numbers, numbers_handle) = circuit.add_input_zset::<u64>();
    numbers.set_persistent_id(Some("input1"));
    numbers.integrate_trace();

    let by_remainder = numbers
        .map_index(|x| (x % 3, *x))
        .set_persistent_id(Some("input_stream1_indexed"));

    let first_lag = by_remainder
        .lag_custom_order_persistent::<_, _, _, Asc<_>, _>(
            Some("lag1"),
            1,
            |x| x.cloned().unwrap_or(0),
            |x, y| Tup2(*x, *y),
        )
        .set_persistent_id(Some("lag1"));
    let first_lag_values = first_lag
        .map(|(_k, v)| *v)
        .set_persistent_id(Some("lag1_flat"));

    let second_lag = first_lag
        .lag_custom_order_persistent::<_, _, _, Asc<_>, _>(
            Some("lag2"),
            1,
            |x| x.cloned().unwrap_or(Tup2(0, 0)).1,
            |Tup2(x, y), z| Tup3(*x, *y, *z),
        )
        .set_persistent_id(Some("lag2"));
    let second_lag_values = second_lag
        .map(|(_k, v)| *v)
        .set_persistent_id(Some("lag2_flat"));

    let third_lag = by_remainder
        .lag_custom_order_persistent::<_, _, _, Asc<_>, _>(
            Some("lag3"),
            1,
            |x| x.cloned().unwrap_or(0),
            |x, y| Tup2(*x, *y),
        )
        .set_persistent_id(Some("lag3"));
    let third_lag_values = third_lag
        .map(|(_k, v)| *v)
        .set_persistent_id(Some("lag3_flat"));

    let first_handle = first_lag_values.accumulate_output_persistent(Some("output1"));
    let second_handle = second_lag_values.accumulate_output_persistent(Some("output2"));
    let third_handle = third_lag_values.accumulate_output_persistent(Some("output3"));

    (
        numbers_handle,
        (),
        first_handle,
        (second_handle, third_handle),
    )
}
/// Replays `lag_circuit1` into `lag_circuit2` with ten transactions before
/// the checkpoint and ten after it.
#[test]
fn test_lag_circuit() {
    let before_checkpoint = sequence(0, 10);
    let after_checkpoint = sequence(10, 20);

    test_replay::<
        (),
        TestData1<u64>,
        (),
        (),
        TestData1<Tup2<u64, u64>>,
        TestData2<Tup3<u64, u64, u64>, Tup2<u64, u64>>,
    >(
        Arc::new(lag_circuit1),
        Arc::new(lag_circuit2),
        vec![(); 10],
        before_checkpoint,
        after_checkpoint,
        vec![(); 10],
    );
}
/// Comparison function for `Tup3<u64, u64, i64>`: compares the three fields
/// in order, so later fields only break ties in earlier ones.
struct RankValOrd3(PhantomData<()>);

impl CmpFunc<Tup3<u64, u64, i64>> for RankValOrd3 {
    fn cmp(left: &Tup3<u64, u64, i64>, right: &Tup3<u64, u64, i64>) -> Ordering {
        left.0
            .cmp(&right.0)
            .then_with(|| left.1.cmp(&right.1))
            .then_with(|| left.2.cmp(&right.2))
    }
}
/// First circuit of the rank replay test.
///
/// Declares input `"input1"` and indexes it by `x % 3` with value
/// `Tup2(x, x % 5)`. It then applies `rank_custom_order_persistent` with the
/// `RankValOrd` ordering (`"rank1"`), flattens each result into
/// `Tup4(key, value.0, value.1, rank)`, and exposes it as `"output1"`.
///
/// Unlike `rank_circuit2`, this circuit does not call `integrate_trace()` on
/// its input.
fn rank_circuit1(
    circuit: &mut RootCircuit,
) -> (
    (),
    ZSetHandle<u64>,
    (),
    OutputHandle<SpineSnapshot<OrdZSet<Tup4<u64, u64, u64, i64>>>>,
) {
    let (numbers, numbers_handle) = circuit.add_input_zset::<u64>();
    numbers.set_persistent_id(Some("input1"));

    let by_remainder = numbers
        .map_index(|x| (x % 3, Tup2(*x, *x % 5)))
        .set_persistent_id(Some("input_stream1_indexed"));

    let ranked = by_remainder
        .rank_custom_order_persistent::<RankValOrd, u64, _, _, _, _>(
            Some("rank1"),
            |v| v.0,
            |a, b| a.0.cmp(b),
            |rank, t| Tup3(t.0, t.1, rank),
        )
        .set_persistent_id(Some("rank1"));
    let ranked_rows = ranked
        .map(|(k, t)| Tup4(*k, t.0, t.1, t.2))
        .set_persistent_id(Some("rank1_flat"));

    let ranked_handle = ranked_rows.accumulate_output_persistent(Some("output1"));

    ((), numbers_handle, (), ranked_handle)
}
/// Second circuit of the rank replay test.
///
/// Builds the `"rank1"` pipeline of `rank_circuit1` (exposed as `"output1"`),
/// additionally calling `integrate_trace()` on the input. It then applies
/// `dense_rank_custom_order_persistent` with the `RankValOrd3` ordering to
/// the output of `"rank1"` (`"dense1"`), flattens each result into a `Tup4`,
/// and exposes it as `"output2"`.
fn rank_circuit2(
    circuit: &mut RootCircuit,
) -> (
    ZSetHandle<u64>,
    (),
    OutputHandle<SpineSnapshot<OrdZSet<Tup4<u64, u64, u64, i64>>>>,
    OutputHandle<SpineSnapshot<OrdZSet<Tup4<u64, u64, u64, i64>>>>,
) {
    let (numbers, numbers_handle) = circuit.add_input_zset::<u64>();
    numbers.set_persistent_id(Some("input1"));
    numbers.integrate_trace();

    let by_remainder = numbers
        .map_index(|x| (x % 3, Tup2(*x, *x % 5)))
        .set_persistent_id(Some("input_stream1_indexed"));

    let ranked = by_remainder
        .rank_custom_order_persistent::<RankValOrd, u64, _, _, _, _>(
            Some("rank1"),
            |v| v.0,
            |a, b| a.0.cmp(b),
            |rank, t| Tup3(t.0, t.1, rank),
        )
        .set_persistent_id(Some("rank1"));
    let ranked_rows = ranked
        .map(|(k, t)| Tup4(*k, t.0, t.1, t.2))
        .set_persistent_id(Some("rank1_flat"));

    let dense_ranked = ranked
        .dense_rank_custom_order_persistent::<RankValOrd3, u64, _, _, _, _>(
            Some("dense1"),
            |v| v.0,
            |a, b| a.0.cmp(b),
            |rank, t| Tup3(t.0, t.1, rank),
        )
        .set_persistent_id(Some("dense1"));
    let dense_ranked_rows = dense_ranked
        .map(|(k, t)| Tup4(*k, t.0, t.1, t.2))
        .set_persistent_id(Some("dense1_flat"));

    let ranked_handle = ranked_rows.accumulate_output_persistent(Some("output1"));
    let dense_ranked_handle = dense_ranked_rows.accumulate_output_persistent(Some("output2"));

    (numbers_handle, (), ranked_handle, dense_ranked_handle)
}
/// Runs the `test_replay` harness on the `rank_circuit1` / `rank_circuit2` pair.
///
/// The data arguments are passed in this order: ten `()` placeholders,
/// `sequence(0, 10)`, `sequence(10, 20)`, and ten more `()` placeholders.
///
/// NOTE(review): how `test_replay` schedules these arguments across the two
/// circuits is defined outside this section — confirm against the harness.
#[test]
fn test_rank_circuit() {
    let leading_units = std::iter::repeat_n((), 10).collect();
    let first_sequence = sequence(0, 10);
    let second_sequence = sequence(10, 20);
    let trailing_units = std::iter::repeat_n((), 10).collect();

    test_replay::<
        (),
        TestData1<u64>,
        (),
        (),
        TestData1<Tup4<u64, u64, u64, i64>>,
        TestData1<Tup4<u64, u64, u64, i64>>,
    >(
        Arc::new(rank_circuit1),
        Arc::new(rank_circuit2),
        leading_units,
        first_sequence,
        second_sequence,
        trailing_units,
    );
}
/// Builds the first circuit of the rolling-aggregate replay test.
///
/// A `u64` Z-set input (`"input1"`, with `integrate_trace` called on it) is
/// indexed as `x -> (x, x)` and fed to two `Min` rolling aggregates over the
/// range `Before(10) ..= After(0)`:
///
/// * `"rolling1"` with partitioning closure `x -> (x % 3, x)`, output `"output1"`;
/// * `"rolling2"` with partitioning closure `x -> (x % 5, x)`, output `"output2"`.
///
/// Returns `((), input handle, output1 handle, output2 handle)`.
fn rolling_circuit1(
    circuit: &mut RootCircuit,
) -> (
    (),
    ZSetHandle<u64>,
    OutputHandle<SpineSnapshot<OrdZSet<Tup2<u64, Option<u64>>>>>,
    OutputHandle<SpineSnapshot<OrdZSet<Tup2<u64, Option<u64>>>>>,
) {
    let (numbers, numbers_handle) = circuit.add_input_zset::<u64>();
    numbers.set_persistent_id(Some("input1"));
    numbers.integrate_trace();

    let keyed = numbers
        .map_index(|x| (*x, *x))
        .set_persistent_id(Some("input_stream1_indexed"));

    // First aggregate: partitioning closure uses `x % 3`.
    let by_mod3 = keyed
        .partitioned_rolling_aggregate_persistent(
            Some("rolling1"),
            |x| (x % 3, *x),
            Min,
            RelRange::new(RelOffset::Before(10), RelOffset::After(0)),
        )
        .set_persistent_id(Some("rolling1"));
    // Re-index, then keep only the value part of each pair.
    let by_mod3_flat = by_mod3
        .map_index(|(k, v)| (*k, *v))
        .map(|(_k, v)| *v)
        .set_persistent_id(Some("rolling1_flat"));
    let by_mod3_output = by_mod3_flat.accumulate_output_persistent(Some("output1"));

    // Second aggregate: partitioning closure uses `x % 5`.
    let by_mod5 = keyed
        .partitioned_rolling_aggregate_persistent(
            Some("rolling2"),
            |x| (x % 5, *x),
            Min,
            RelRange::new(RelOffset::Before(10), RelOffset::After(0)),
        )
        .set_persistent_id(Some("rolling2"));
    let by_mod5_flat = by_mod5
        .map_index(|(k, v)| (*k, *v))
        .map(|(_k, v)| *v)
        .set_persistent_id(Some("rolling2_flat"));
    let by_mod5_output = by_mod5_flat.accumulate_output_persistent(Some("output2"));

    ((), numbers_handle, by_mod3_output, by_mod5_output)
}
/// Builds the second circuit of the rolling-aggregate replay test.
///
/// Keeps the `"rolling2"` / `"output2"` stage of `rolling_circuit1` (same
/// persistent ids and closures), omits `"rolling1"`, and adds two stages:
///
/// * `"rolling3"`: a `Min` rolling aggregate over the flattened `"rolling2"`
///   output, with partitioning closure `x -> (x.0 % 7, x.0)`, output `"output3"`;
/// * `"rolling4"`: a `Min` rolling aggregate over the indexed input, with
///   partitioning closure `x -> (x % 2, x)`, output `"output4"`.
///
/// All aggregates use the range `Before(10) ..= After(0)`.
///
/// Returns `(input handle, (), output2 handle, (output3 handle, output4 handle))`.
fn rolling_circuit2(
    circuit: &mut RootCircuit,
) -> (
    ZSetHandle<u64>,
    (),
    OutputHandle<SpineSnapshot<OrdZSet<Tup2<u64, Option<u64>>>>>,
    (
        OutputHandle<SpineSnapshot<OrdZSet<Tup2<u64, Option<u64>>>>>,
        OutputHandle<SpineSnapshot<OrdZSet<Tup2<u64, Option<u64>>>>>,
    ),
) {
    let (numbers, numbers_handle) = circuit.add_input_zset::<u64>();
    numbers.set_persistent_id(Some("input1"));
    numbers.integrate_trace();

    let keyed = numbers
        .map_index(|x| (*x, *x))
        .set_persistent_id(Some("input_stream1_indexed"));

    // Stage shared with `rolling_circuit1`.
    let by_mod5 = keyed
        .partitioned_rolling_aggregate_persistent(
            Some("rolling2"),
            |x| (x % 5, *x),
            Min,
            RelRange::new(RelOffset::Before(10), RelOffset::After(0)),
        )
        .set_persistent_id(Some("rolling2"));
    let by_mod5_flat = by_mod5
        .map_index(|(k, v)| (*k, *v))
        .map(|(_k, v)| *v)
        .set_persistent_id(Some("rolling2_flat"));
    let by_mod5_output = by_mod5_flat.accumulate_output_persistent(Some("output2"));

    // New stage stacked on top of the flattened "rolling2" output.
    let by_mod7 = by_mod5_flat
        .map_index(|Tup2(x, y)| (*x, Tup2(*x, *y)))
        .partitioned_rolling_aggregate_persistent(
            Some("rolling3"),
            |x| (x.0 % 7, x.0),
            Min,
            RelRange::new(RelOffset::Before(10), RelOffset::After(0)),
        )
        .set_persistent_id(Some("rolling3"));
    let by_mod7_flat = by_mod7
        .map_index(|(k, v)| (*k, *v))
        .map(|(_k, v)| *v)
        .set_persistent_id(Some("rolling3_flat"));
    let by_mod7_output = by_mod7_flat.accumulate_output_persistent(Some("output3"));

    // New stage reading the indexed input directly.
    let by_mod2 = keyed
        .partitioned_rolling_aggregate_persistent(
            Some("rolling4"),
            |x| (x % 2, *x),
            Min,
            RelRange::new(RelOffset::Before(10), RelOffset::After(0)),
        )
        .set_persistent_id(Some("rolling4"));
    let by_mod2_flat = by_mod2
        .map_index(|(k, v)| (*k, *v))
        .map(|(_k, v)| *v)
        .set_persistent_id(Some("rolling4_flat"));
    let by_mod2_output = by_mod2_flat.accumulate_output_persistent(Some("output4"));

    (
        numbers_handle,
        (),
        by_mod5_output,
        (by_mod7_output, by_mod2_output),
    )
}
/// Runs the `test_replay` harness on the `rolling_circuit1` /
/// `rolling_circuit2` pair.
///
/// The data arguments are passed in this order: twenty `()` placeholders,
/// `sequence(0, 20)`, `sequence(20, 40)`, and twenty more `()` placeholders.
///
/// NOTE(review): how `test_replay` schedules these arguments across the two
/// circuits is defined outside this section — confirm against the harness.
#[test]
fn test_rolling_circuit() {
    let leading_units = std::iter::repeat_n((), 20).collect();
    let first_sequence = sequence(0, 20);
    let second_sequence = sequence(20, 40);
    let trailing_units = std::iter::repeat_n((), 20).collect();

    test_replay::<
        (),
        TestData1<u64>,
        (),
        TestData1<Tup2<u64, Option<u64>>>,
        TestData1<Tup2<u64, Option<u64>>>,
        TestData2<Tup2<u64, Option<u64>>, Tup2<u64, Option<u64>>>,
    >(
        Arc::new(rolling_circuit1),
        Arc::new(rolling_circuit2),
        leading_units,
        first_sequence,
        second_sequence,
        trailing_units,
    );
}
/// Regression test: restart from a checkpoint of a circuit that contains only
/// a persistent input map, adding an aggregate and an output on restart.
///
/// Steps:
/// 1. Build a circuit with a single `"input_map"` input, insert `0 => 0`,
///    run one transaction, checkpoint, and kill the runtime.
/// 2. Restart from that checkpoint with the same input plus a `Max`
///    aggregate (`"aggregate1"`) whose accumulated trace snapshot is exposed
///    as `"output"`.
/// 3. Run transactions until `bootstrap_in_progress()` turns false.
/// 4. Assert that the output equals `{0 => {0 => 1}}`, then shut the second
///    runtime down.
#[test]
fn regression1() {
    init_test_logger();
    let path = tempfile::tempdir().unwrap().keep();

    // Phase 1: input-only circuit, one insert, checkpoint.
    let (mut circuit1, input_handle1) =
        Runtime::init_circuit(circuit_config(&path), move |circuit| {
            let (input_stream, input_handle) = circuit
                .add_input_map_persistent::<u64, u64, u64, _>(Some("input_map"), |v, u| *v = *u);
            input_stream.set_persistent_id(Some("input_map"));
            Ok(input_handle)
        })
        .unwrap();
    input_handle1.push(0, crate::operator::Update::Insert(0));
    circuit1.transaction().unwrap();
    let checkpoint = circuit1.checkpoint().run().unwrap();
    circuit1.kill().unwrap();

    // Phase 2: restart from the checkpoint with an extended circuit.
    let mut restart_config = circuit_config(&path);
    restart_config.storage.as_mut().unwrap().init_checkpoint = Some(checkpoint.uuid);
    let (mut circuit2, (_input_handle2, output_handle2)) =
        Runtime::init_circuit(restart_config, move |circuit| {
            let (input_stream, input_handle) = circuit
                .add_input_map_persistent::<u64, u64, u64, _>(Some("input_map"), |v, u| *v = *u);
            input_stream.set_persistent_id(Some("input_map"));
            let aggregate = input_stream
                .aggregate_persistent(Some("aggregate1"), Max)
                .set_persistent_id(Some("aggregate1"));
            let output_handle = aggregate
                .accumulate_trace()
                .apply(|trace| trace.ro_snapshot().consolidate())
                .output_persistent(Some("output"));
            Ok((input_handle, output_handle))
        })
        .unwrap();

    // Drive the circuit until the new operators have been bootstrapped.
    while circuit2.bootstrap_in_progress() {
        circuit2.transaction().unwrap();
    }
    println!("Replay finished");

    let actual_output = &output_handle2.concat().consolidate();
    assert_eq!(actual_output, &indexed_zset!(0 => {0 => 1}));

    // Shut the restarted runtime down explicitly, mirroring `circuit1` above,
    // so that an error reported by `kill` fails the test.
    circuit2.kill().unwrap();
}
/// Selects which trace-building call `add_replay_trace` applies to the input
/// stream of the transactional bootstrap test circuits.
#[derive(Clone, Copy, Debug)]
enum ReplayTraceKind {
    /// `add_replay_trace` calls `integrate_trace()` on the stream.
    IntegrateTrace,
    /// `add_replay_trace` calls `accumulate_trace()` on the stream.
    AccumulateTrace,
}
/// One batch of indexed input updates, each shaped as
/// `Tup2(key, Tup2(value, weight))`; a batch is appended to an
/// `IndexedZSetHandle<u64, u64>` by the transactional bootstrap tests.
type IndexedReplayBatch = Vec<Tup2<u64, Tup2<u64, ZWeight>>>;
fn add_replay_trace(
stream: &Stream<RootCircuit, OrdIndexedZSet<u64, u64>>,
trace_kind: ReplayTraceKind,
) {
match trace_kind {
ReplayTraceKind::IntegrateTrace => {
stream.integrate_trace();
}
ReplayTraceKind::AccumulateTrace => {
stream.accumulate_trace();
}
}
}
/// Builds the pre-checkpoint circuit of the transactional bootstrap test.
///
/// It consists of an indexed Z-set input with persistent id `"input"` and the
/// trace selected by `trace_kind`; it has no output. Returns the input handle.
fn transactional_bootstrap_circuit1(
    circuit: &mut RootCircuit,
    trace_kind: ReplayTraceKind,
) -> IndexedZSetHandle<u64, u64> {
    let (updates, updates_handle) = circuit.add_input_indexed_zset::<u64, u64>();
    updates.set_persistent_id(Some("input"));
    add_replay_trace(&updates, trace_kind);

    updates_handle
}
/// Builds the post-restart circuit of the transactional bootstrap test.
///
/// Same input (`"input"`) and trace as `transactional_bootstrap_circuit1`,
/// plus a persistent accumulated output (`"output"`) attached directly to the
/// input stream. Returns `(input handle, output handle)`.
fn transactional_bootstrap_circuit2(
    circuit: &mut RootCircuit,
    trace_kind: ReplayTraceKind,
) -> (
    IndexedZSetHandle<u64, u64>,
    OutputHandle<SpineSnapshot<OrdIndexedZSet<u64, u64>>>,
) {
    let (updates, updates_handle) = circuit.add_input_indexed_zset::<u64, u64>();
    updates.set_persistent_id(Some("input"));
    add_replay_trace(&updates, trace_kind);

    // The only operator that is new relative to the first circuit.
    let replayed_output = updates.accumulate_output_persistent(Some("output"));

    (updates_handle, replayed_output)
}
/// Combines all `batches` into a single indexed Z-set.
///
/// Every `Tup2(key, Tup2(value, weight))` update from every batch is
/// re-shaped into `Tup2(Tup2(key, value), weight)` and the full list is handed
/// to `OrdIndexedZSet::from_tuples`.
fn replay_batch_to_indexed_zset(batches: &[IndexedReplayBatch]) -> OrdIndexedZSet<u64, u64> {
    let mut weighted_pairs = Vec::new();
    for batch in batches {
        for Tup2(key, Tup2(value, weight)) in batch {
            weighted_pairs.push(Tup2(Tup2(*key, *value), *weight));
        }
    }

    OrdIndexedZSet::from_tuples((), weighted_pairs)
}
/// Checkpoints a circuit fed with `batches`, restarts it with an additional
/// output, and checks the replayed output.
///
/// Steps:
/// 1. Build `transactional_bootstrap_circuit1`, append each batch in its own
///    transaction, take a checkpoint, and kill the runtime.
/// 2. Restart from the checkpoint with `transactional_bootstrap_circuit2`,
///    which adds an output operator.
/// 3. Assert the output has no non-empty mailboxes before any step runs.
/// 4. If bootstrap is in progress, open one transaction, start its commit,
///    and call `step()` until it reports that the commit is complete.
/// 5. Assert that bootstrap has finished and that the output equals the
///    Z-set built from all `batches`.
///
/// # Parameters
/// * `trace_kind` - which trace both circuits attach to the input stream.
/// * `batches` - input updates fed to the first circuit, one transaction each.
/// * `expect_multistep_replay` - when `true` (and bootstrap was in progress),
///   additionally asserts that at least one `step()` returned before the
///   commit completed.
///
/// # Panics
/// Panics on any circuit error or failed assertion, as expected of a test
/// helper.
fn run_transactional_bootstrap_test(
    trace_kind: ReplayTraceKind,
    batches: Vec<IndexedReplayBatch>,
    expect_multistep_replay: bool,
) {
    init_test_logger();
    let path = tempfile::tempdir().unwrap().keep();
    let expected = replay_batch_to_indexed_zset(&batches);

    let checkpoint = {
        let (mut circuit, input_handle) =
            Runtime::init_circuit(circuit_config(&path), move |circuit| {
                Ok(transactional_bootstrap_circuit1(circuit, trace_kind))
            })
            .unwrap();
        // `batches` is not used after this loop, so consume it instead of
        // cloning the whole vector.
        for mut batch in batches {
            input_handle.append(&mut batch);
            circuit.transaction().unwrap();
        }
        let checkpoint = circuit.checkpoint().run().unwrap();
        circuit.kill().unwrap();
        checkpoint
    };

    let mut restart_config = circuit_config(&path);
    restart_config.storage.as_mut().unwrap().init_checkpoint = Some(checkpoint.uuid);
    let (mut circuit, (_input_handle, output_handle)) =
        Runtime::init_circuit(restart_config, move |circuit| {
            Ok(transactional_bootstrap_circuit2(circuit, trace_kind))
        })
        .unwrap();

    // Nothing may have reached the output before the first step.
    assert_eq!(output_handle.num_nonempty_mailboxes(), 0);

    if circuit.bootstrap_in_progress() {
        circuit.start_transaction().unwrap();
        circuit.start_commit_transaction().unwrap();

        // Count the steps that return before the commit has completed.
        let mut incomplete_commit_steps = 0;
        while !circuit.step().unwrap() {
            incomplete_commit_steps += 1;
        }

        if expect_multistep_replay {
            assert!(
                incomplete_commit_steps > 0,
                "{trace_kind:?} replay finished in a single commit step despite the splitter chunk size"
            );
        }
    }

    assert!(!circuit.bootstrap_in_progress());
    assert_eq!(output_handle.concat().consolidate(), expected);
    circuit.kill().unwrap();
}
/// Returns the input scenarios shared by the transactional bootstrap tests.
///
/// Each entry pairs the batches to feed before the checkpoint with the
/// `expect_multistep_replay` flag passed to `run_transactional_bootstrap_test`.
/// Only the last, largest scenario sets the flag.
fn transactional_bootstrap_cases() -> Vec<(Vec<IndexedReplayBatch>, bool)> {
    // No input at all.
    let no_batches: Vec<IndexedReplayBatch> = vec![];

    // One batch holding a single tuple.
    let single_tuple: Vec<IndexedReplayBatch> = vec![vec![Tup2(1, Tup2(10, 1))]];

    // One batch holding two values under the same key.
    let two_values_one_key: Vec<IndexedReplayBatch> =
        vec![vec![Tup2(1, Tup2(10, 1)), Tup2(1, Tup2(11, 1))]];

    // Two batches; the second retracts `(1, 11)` and adds several new keys.
    let first_batch: IndexedReplayBatch = vec![
        Tup2(1, Tup2(10, 1)),
        Tup2(1, Tup2(11, 1)),
        Tup2(2, Tup2(20, 1)),
    ];
    let second_batch: IndexedReplayBatch = vec![
        Tup2(1, Tup2(11, -1)),
        Tup2(1, Tup2(12, 1)),
        Tup2(4, Tup2(40, 2)),
        Tup2(5, Tup2(50, 2)),
        Tup2(6, Tup2(50, 2)),
        Tup2(7, Tup2(50, 2)),
        Tup2(8, Tup2(50, 2)),
        Tup2(9, Tup2(50, 2)),
    ];
    let two_batches: Vec<IndexedReplayBatch> = vec![first_batch, second_batch];

    vec![
        (no_batches, false),
        (single_tuple, false),
        (two_values_one_key, false),
        (two_batches, true),
    ]
}
/// Runs every scenario from `transactional_bootstrap_cases` through
/// `run_transactional_bootstrap_test` with `ReplayTraceKind::IntegrateTrace`.
#[test]
fn test_integrate_trace_bootstrap_is_transactional() {
    transactional_bootstrap_cases()
        .into_iter()
        .for_each(|(batches, expect_multistep_replay)| {
            run_transactional_bootstrap_test(
                ReplayTraceKind::IntegrateTrace,
                batches,
                expect_multistep_replay,
            )
        });
}
/// Runs every scenario from `transactional_bootstrap_cases` through
/// `run_transactional_bootstrap_test` with `ReplayTraceKind::AccumulateTrace`.
#[test]
fn test_accumulate_trace_bootstrap_is_transactional() {
    transactional_bootstrap_cases()
        .into_iter()
        .for_each(|(batches, expect_multistep_replay)| {
            run_transactional_bootstrap_test(
                ReplayTraceKind::AccumulateTrace,
                batches,
                expect_multistep_replay,
            )
        });
}