use crate::algebra::ZBatchReader;
use crate::circuit::circuit_builder::StreamId;
use crate::circuit::metadata::{
BatchSizeStats, INPUT_BATCHES_STATS, MEMORY_ALLOCATIONS_COUNT, OUTPUT_BATCHES_STATS,
};
use crate::circuit::splitter_output_chunk_size;
use crate::dynamic::{ClonableTrait, Data, DynData};
use crate::operator::async_stream_operators::{StreamingBinaryOperator, StreamingBinaryWrapper};
use crate::trace::spine_async::{SpineCursor, WithSnapshot};
use crate::trace::{Spine, TupleBuilder};
use crate::{
DBData, Runtime, Timestamp, ZWeight,
algebra::{
AddByRef, HasOne, HasZero, IndexedZSet, Lattice, OrdIndexedZSet, OrdIndexedZSetFactories,
PartialOrder, ZRingValue,
},
circuit::{
Circuit, Scope, Stream, WithClock,
metadata::{
MetaItem, OperatorLocation, OperatorMeta, SHARED_MEMORY_BYTES, STATE_RECORDS_COUNT,
USED_MEMORY_BYTES,
},
operator_traits::{Operator, UnaryOperator},
},
circuit_cache_key,
dynamic::{DynPair, DynWeightedPairs, Erase},
trace::{Batch, BatchFactories, BatchReader, BatchReaderFactories, Builder, Cursor},
utils::Tup2,
};
use crate::{NestedCircuit, Position, RootCircuit};
use async_stream::stream;
use futures::Stream as AsyncStream;
use size_of::SizeOf;
use std::cell::{Cell, RefCell};
use std::panic::Location;
use std::rc::Rc;
use std::{
borrow::Cow,
cmp::{Ordering, min},
collections::BTreeMap,
marker::PhantomData,
ops::Neg,
};
use super::filter_map::DynFilterMap;
use super::{MonoIndexedZSet, MonoZSet};
// Circuit-cache key under which `dyn_stream_distinct` stores its output,
// keyed by the id of the (sharded) input stream.
circuit_cache_key!(DistinctId<C, D>(StreamId => Stream<C, D>));
// Circuit-cache key mapping a stream id to the "distinct version" of that
// stream. Written by `dyn_distinct` and `dyn_hash_distinct` (keyed by the
// sharded input's id) and by `mark_distinct` (which maps a stream to itself).
circuit_cache_key!(DistinctIncrementalId<C, D>(StreamId => Stream<C, D>));
/// Factories needed to build the incremental distinct operators over
/// collections of type `Z` in a circuit whose timestamp type is `T`.
pub struct DistinctFactories<Z: IndexedZSet, T: Timestamp> {
    /// Factories for batches of the input (and output) type `Z`.
    pub input_factories: Z::Factories,
    /// Factories for the timed trace that `dyn_distinct_inner` builds via
    /// `dyn_accumulate_trace` in non-root circuits.
    trace_factories: <T::TimedBatch<Z> as BatchReader>::Factories,
    /// Factories for the auxiliary indexed Z-set of (key, value) pairs that
    /// `DistinctIncremental` schedules for re-evaluation at future times.
    aux_factories: OrdIndexedZSetFactories<Z::Key, Z::Val>,
}
impl<Z: IndexedZSet, T: Timestamp> Clone for DistinctFactories<Z, T> {
    /// Field-by-field clone.
    ///
    /// Written by hand because `#[derive(Clone)]` would add `Z: Clone` and
    /// `T: Clone` bounds, which are not needed to clone the factories.
    fn clone(&self) -> Self {
        let Self {
            input_factories,
            trace_factories,
            aux_factories,
        } = self;
        Self {
            input_factories: input_factories.clone(),
            trace_factories: trace_factories.clone(),
            aux_factories: aux_factories.clone(),
        }
    }
}
impl<Z, T> DistinctFactories<Z, T>
where
Z: IndexedZSet,
T: Timestamp,
{
pub fn new<KType, VType>() -> Self
where
KType: DBData + Erase<Z::Key>,
VType: DBData + Erase<Z::Val>,
{
Self {
input_factories: BatchReaderFactories::new::<KType, VType, ZWeight>(),
trace_factories: BatchReaderFactories::new::<KType, VType, ZWeight>(),
aux_factories: BatchReaderFactories::new::<KType, VType, ZWeight>(),
}
}
}
/// Factories for `dyn_hash_distinct`, which re-keys the input by a hash of
/// its key before running the regular incremental distinct.
pub struct HashDistinctFactories<Z: IndexedZSet, T: Timestamp> {
    /// Factories for batches of the input (and output) type `Z`.
    pub input_factories: Z::Factories,
    /// Factories for the inner distinct over the re-keyed collection: the key
    /// is the hash and the value is the original (key, value) pair.
    pub distinct_factories: DistinctFactories<OrdIndexedZSet<DynData, DynPair<Z::Key, Z::Val>>, T>,
}
impl<Z: IndexedZSet, T: Timestamp> Clone for HashDistinctFactories<Z, T> {
    /// Field-by-field clone, written by hand to avoid the `Z: Clone` /
    /// `T: Clone` bounds that `#[derive(Clone)]` would introduce.
    fn clone(&self) -> Self {
        let Self {
            input_factories,
            distinct_factories,
        } = self;
        Self {
            input_factories: input_factories.clone(),
            distinct_factories: distinct_factories.clone(),
        }
    }
}
impl<Z, T> HashDistinctFactories<Z, T>
where
Z: IndexedZSet,
T: Timestamp,
{
pub fn new<KType, VType>() -> Self
where
KType: DBData + Erase<Z::Key>,
VType: DBData + Erase<Z::Val>,
{
Self {
input_factories: BatchReaderFactories::new::<KType, VType, ZWeight>(),
distinct_factories: DistinctFactories::new::<u64, Tup2<KType, VType>>(),
}
}
}
impl<C, D> Stream<C, D>
where
    C: Circuit,
    D: 'static,
{
    /// Marks this stream as distinct by registering it as its own distinct
    /// version in the circuit cache.
    ///
    /// A subsequent `dyn_distinct`/`dyn_hash_distinct` whose sharded input has
    /// this stream's id finds the cached entry and returns it instead of
    /// building a new operator. Returns a clone of `self` for chaining.
    pub fn mark_distinct(&self) -> Self {
        self.circuit()
            .cache_insert(DistinctIncrementalId::new(self.stream_id()), self.clone());
        self.clone()
    }

    /// Returns `true` if the circuit cache holds a distinct version of this
    /// stream.
    ///
    /// This is the case when the stream was marked distinct, but also when a
    /// distinct operator was built over it. It therefore does NOT imply that
    /// the stream itself is distinct; use [`Self::is_distinct`] for that.
    pub fn has_distinct_version(&self) -> bool {
        self.circuit()
            .cache_contains(&DistinctIncrementalId::<C, D>::new(self.stream_id()))
    }

    /// Returns `true` if this stream is its own cached distinct version,
    /// i.e. it is known to be distinct.
    pub fn is_distinct(&self) -> bool {
        self.circuit()
            .cache_get(&DistinctIncrementalId::new(self.stream_id()))
            .is_some_and(|value: Stream<C, D>| value.ptr_eq(self))
    }

    /// Returns the cached distinct version of this stream, or a clone of
    /// `self` if no distinct version has been cached.
    pub fn try_distinct_version(&self) -> Self {
        self.circuit()
            .cache_get(&DistinctIncrementalId::new(self.stream_id()))
            .unwrap_or_else(|| self.clone())
    }

    /// Marks `self` as distinct if `input` is known to be distinct.
    ///
    /// This checks `is_distinct`, not `has_distinct_version`. The latter is
    /// also `true` when a distinct operator was merely built over a
    /// non-distinct `input`. Using it would wrongly mark `self` as distinct and
    /// turn a later `distinct()` on `self` into a no-op.
    pub fn mark_distinct_if<C2, D2>(&self, input: &Stream<C2, D2>)
    where
        C2: Circuit,
        D2: 'static,
    {
        if input.is_distinct() {
            self.mark_distinct();
        }
    }
}
impl Stream<RootCircuit, MonoIndexedZSet> {
    /// Monomorphic entry point for [`Stream::dyn_distinct`] on root-circuit
    /// indexed Z-sets.
    pub fn dyn_distinct_mono(
        &self,
        factories: &DistinctFactories<MonoIndexedZSet, ()>,
    ) -> Stream<RootCircuit, MonoIndexedZSet> {
        Self::dyn_distinct(self, factories)
    }

    /// Monomorphic entry point for [`Stream::dyn_hash_distinct`] on
    /// root-circuit indexed Z-sets.
    pub fn dyn_hash_distinct_mono(
        &self,
        factories: &HashDistinctFactories<MonoIndexedZSet, ()>,
    ) -> Stream<RootCircuit, MonoIndexedZSet> {
        Self::dyn_hash_distinct(self, factories)
    }
}
impl Stream<RootCircuit, MonoZSet> {
    /// Monomorphic entry point for [`Stream::dyn_distinct`] on root-circuit
    /// Z-sets.
    pub fn dyn_distinct_mono(
        &self,
        factories: &DistinctFactories<MonoZSet, ()>,
    ) -> Stream<RootCircuit, MonoZSet> {
        Self::dyn_distinct(self, factories)
    }

    /// Monomorphic entry point for [`Stream::dyn_hash_distinct`] on
    /// root-circuit Z-sets.
    pub fn dyn_hash_distinct_mono(
        &self,
        factories: &HashDistinctFactories<MonoZSet, ()>,
    ) -> Stream<RootCircuit, MonoZSet> {
        Self::dyn_hash_distinct(self, factories)
    }
}
impl Stream<NestedCircuit, MonoIndexedZSet> {
    /// Monomorphic entry point for [`Stream::dyn_distinct`] on nested-circuit
    /// indexed Z-sets.
    pub fn dyn_distinct_mono(
        &self,
        factories: &DistinctFactories<MonoIndexedZSet, <NestedCircuit as WithClock>::Time>,
    ) -> Stream<NestedCircuit, MonoIndexedZSet> {
        Self::dyn_distinct(self, factories)
    }

    /// Monomorphic entry point for [`Stream::dyn_hash_distinct`] on
    /// nested-circuit indexed Z-sets.
    pub fn dyn_hash_distinct_mono(
        &self,
        factories: &HashDistinctFactories<MonoIndexedZSet, <NestedCircuit as WithClock>::Time>,
    ) -> Stream<NestedCircuit, MonoIndexedZSet> {
        Self::dyn_hash_distinct(self, factories)
    }
}
impl Stream<NestedCircuit, MonoZSet> {
    /// Monomorphic entry point for [`Stream::dyn_distinct`] on nested-circuit
    /// Z-sets.
    pub fn dyn_distinct_mono(
        &self,
        factories: &DistinctFactories<MonoZSet, <NestedCircuit as WithClock>::Time>,
    ) -> Stream<NestedCircuit, MonoZSet> {
        Self::dyn_distinct(self, factories)
    }

    /// Monomorphic entry point for [`Stream::dyn_hash_distinct`] on
    /// nested-circuit Z-sets.
    pub fn dyn_hash_distinct_mono(
        &self,
        factories: &HashDistinctFactories<MonoZSet, <NestedCircuit as WithClock>::Time>,
    ) -> Stream<NestedCircuit, MonoZSet> {
        Self::dyn_hash_distinct(self, factories)
    }
}
impl<C, Z> Stream<C, Z>
where
    C: Circuit,
{
    /// Non-incremental distinct.
    ///
    /// Shards the stream and applies [`Distinct`] to every batch on its own.
    /// The operator is cached under [`DistinctId`] of the sharded stream, so
    /// repeated calls reuse it. The output is marked sharded and distinct.
    pub fn dyn_stream_distinct(&self, input_factories: &Z::Factories) -> Stream<C, Z>
    where
        Z: IndexedZSet + Send,
    {
        let stream = self.dyn_shard(input_factories);
        self.circuit()
            .cache_get_or_insert_with(DistinctId::new(stream.stream_id()), || {
                self.circuit()
                    .add_unary_operator(Distinct::new(), &stream)
                    .mark_sharded()
                    .mark_distinct()
            })
            .clone()
    }

    /// Incremental distinct computed over a re-keyed collection.
    ///
    /// Every record `(k, v)` is mapped to `(hash(k), (k, v))`. The regular
    /// incremental distinct (`dyn_distinct_inner`) runs over that collection,
    /// and the result is mapped back to `Z`. Operators are created inside a
    /// `"hash_distinct"` region.
    ///
    /// The result is cached under the same [`DistinctIncrementalId`] key as
    /// `dyn_distinct` (the sharded stream's id). Whichever of the two runs
    /// first for a given stream is therefore reused by the other.
    pub fn dyn_hash_distinct(&self, factories: &HashDistinctFactories<Z, C::Time>) -> Stream<C, Z>
    where
        Z: IndexedZSet + Send + DynFilterMap,
    {
        let circuit = self.circuit();
        circuit.region("hash_distinct", || {
            let stream = self.dyn_shard(&factories.input_factories);
            circuit
                .cache_get_or_insert_with(DistinctIncrementalId::new(stream.stream_id()), || {
                    // Re-key each record by the hash of its key; the value is
                    // the original (key, value) pair.
                    let by_hash: Stream<C, OrdIndexedZSet<DynData, DynPair<Z::Key, Z::Val>>> =
                        stream
                            .dyn_map_index(
                                &factories.distinct_factories.input_factories,
                                Box::new(|item, kv| {
                                    let (k, v) = Z::item_ref_keyval(item);
                                    let (new_k, new_v) = kv.split_mut();
                                    let hash = k.default_hash();
                                    Erase::<DynData>::erase(&hash).clone_to(new_k);
                                    new_v.from_refs(k, v);
                                }),
                            )
                            // NOTE(review): declared sharded without an
                            // exchange. This relies on equal keys (hence equal
                            // hashes) already living on one worker after
                            // `dyn_shard` above. Confirm `dyn_shard`
                            // partitions by key.
                            .mark_sharded();
                    let distinct =
                        Stream::dyn_distinct_inner(&by_hash, &factories.distinct_factories);
                    // Drop the hash and restore the original (key, value) records.
                    distinct
                        .dyn_map_generic(
                            &factories.input_factories,
                            Box::new(|(_hash, kv), new_kv| kv.clone_to(new_kv)),
                        )
                        .mark_sharded()
                        .mark_distinct()
                })
                .clone()
        })
    }

    /// Incremental distinct.
    ///
    /// Shards the input (inside a `"distinct"` region). It builds
    /// `dyn_distinct_inner` only if no distinct version of the sharded stream
    /// is cached yet. A cached version exists if it was built before or if
    /// the stream was marked distinct.
    pub fn dyn_distinct(&self, factories: &DistinctFactories<Z, C::Time>) -> Stream<C, Z>
    where
        Z: IndexedZSet + Send,
    {
        let circuit = self.circuit();
        circuit.region("distinct", || {
            let stream = self.dyn_shard(&factories.input_factories);
            circuit
                .cache_get_or_insert_with(DistinctIncrementalId::new(stream.stream_id()), || {
                    stream.dyn_distinct_inner(factories)
                })
                .clone()
        })
    }

    /// Builds the incremental distinct operator over an already-sharded stream.
    ///
    /// - When `circuit.root_scope() == 0`, it uses [`DistinctIncrementalTotal`].
    ///   Its inputs are the accumulated delta and the delayed accumulated
    ///   integral trace of `self`.
    /// - Otherwise it uses [`DistinctIncremental`]. Its inputs are the
    ///   accumulated delta and an accumulated timed trace of `self`.
    ///
    /// The output is marked sharded and distinct. It is not inserted into the
    /// cache here; callers do that.
    ///
    /// # Panics
    ///
    /// Panics if `self` is not marked sharded.
    pub fn dyn_distinct_inner(&self, factories: &DistinctFactories<Z, C::Time>) -> Stream<C, Z>
    where
        Z: IndexedZSet + Send,
    {
        let circuit = self.circuit();
        assert!(self.is_sharded());
        // NOTE(review): this fn is not `#[track_caller]`, so
        // `Location::caller()` below resolves to this file rather than to the
        // user's call site.
        if circuit.root_scope() == 0 {
            circuit.add_binary_operator(
                StreamingBinaryWrapper::new(DistinctIncrementalTotal::new(
                    Location::caller(),
                    &factories.input_factories,
                )),
                &self.dyn_accumulate(&factories.input_factories),
                &self
                    .dyn_accumulate_integrate_trace(&factories.input_factories)
                    .accumulate_delay_trace(),
            )
        } else {
            circuit.add_binary_operator(
                StreamingBinaryWrapper::new(DistinctIncremental::new(
                    Location::caller(),
                    &factories.input_factories,
                    &factories.aux_factories,
                    circuit.clone(),
                )),
                &self.dyn_accumulate(&factories.input_factories),
                &self.dyn_accumulate_trace(&factories.trace_factories, &factories.input_factories),
            )
        }
        .mark_sharded()
        .mark_distinct()
    }
}
/// Stateless operator that applies `distinct` to each input batch
/// independently. It keeps no state between batches.
pub struct Distinct<Z> {
    _type: PhantomData<Z>,
}

impl<Z> Distinct<Z> {
    /// Creates the operator.
    pub fn new() -> Self {
        Distinct { _type: PhantomData }
    }
}

impl<Z> Default for Distinct<Z> {
    fn default() -> Self {
        Distinct::new()
    }
}

impl<Z> Operator for Distinct<Z>
where
    Z: 'static,
{
    fn name(&self) -> Cow<'static, str> {
        Cow::Borrowed("Distinct")
    }

    // The operator holds no state, so it is at a fixed point in every scope.
    fn fixedpoint(&self, _scope: Scope) -> bool {
        true
    }
}

impl<Z> UnaryOperator<Z, Z> for Distinct<Z>
where
    Z: IndexedZSet,
{
    // Borrowed input: compute the distinct batch from a reference.
    async fn eval(&mut self, input: &Z) -> Z {
        input.distinct()
    }

    // Owned input: lets the batch be consumed by `distinct_owned`.
    async fn eval_owned(&mut self, input: Z) -> Z {
        input.distinct_owned()
    }
}
/// Incremental distinct used by `dyn_distinct_inner` when
/// `root_scope() == 0`.
///
/// For each record in the input delta, it compares the record's weight in the
/// delayed integral with its weight after the delta is applied. From that it
/// decides whether the record enters or leaves the distinct set.
struct DistinctIncrementalTotal<Z: IndexedZSet, I> {
    /// Factories for building output batches.
    input_factories: Z::Factories,
    /// Source location reported through `Operator::location`.
    location: &'static Location<'static>,
    /// Sizes of input deltas, reported as operator metadata.
    input_batch_stats: RefCell<BatchSizeStats>,
    /// Sizes of emitted output chunks, reported as operator metadata.
    output_batch_stats: RefCell<BatchSizeStats>,
    /// Number of buffered output tuples at which a chunk is emitted.
    chunk_size: usize,
    _type: PhantomData<(Z, I)>,
}
impl<Z: IndexedZSet, I> DistinctIncrementalTotal<Z, I> {
    /// Creates the operator.
    ///
    /// `location` is reported via `Operator::location`. The output chunk size
    /// comes from `splitter_output_chunk_size()`.
    pub fn new(location: &'static Location<'static>, input_factories: &Z::Factories) -> Self {
        let chunk_size = splitter_output_chunk_size();
        Self {
            input_factories: input_factories.clone(),
            location,
            input_batch_stats: RefCell::new(BatchSizeStats::new()),
            output_batch_stats: RefCell::new(BatchSizeStats::new()),
            chunk_size,
            _type: PhantomData,
        }
    }

    /// Emits a non-final output chunk once `builder` holds at least
    /// `chunk_size` tuples; otherwise returns `None`.
    ///
    /// If values were pushed for the key under `delta_cursor` and not yet
    /// sealed, the key is pushed first, so the emitted chunk is well formed.
    /// `*any_values` is then cleared. `builder` is replaced with a fresh
    /// builder.
    fn maybe_yield(
        &self,
        builder: &mut Z::Builder,
        delta_cursor: &mut SpineCursor<Z>,
        any_values: &mut bool,
    ) -> Option<(Z, bool, Option<Position>)> {
        if builder.num_tuples() < self.chunk_size {
            return None;
        }

        if std::mem::take(any_values) {
            builder.push_key(delta_cursor.key());
        }

        // `num_keys` is read after the key above has been sealed.
        let fresh = Z::Builder::with_capacity(
            &self.input_factories,
            builder.num_keys(),
            self.chunk_size,
        );
        let chunk = std::mem::replace(builder, fresh).done();
        self.output_batch_stats.borrow_mut().add_batch(chunk.len());
        Some((chunk, false, delta_cursor.position()))
    }
}
impl<Z, I> Operator for DistinctIncrementalTotal<Z, I>
where
    Z: IndexedZSet,
    I: 'static,
{
    fn name(&self) -> Cow<'static, str> {
        Cow::Borrowed("DistinctIncrementalTotal")
    }

    fn location(&self) -> OperatorLocation {
        Some(self.location)
    }

    // Reports input and output batch-size statistics.
    fn metadata(&self, meta: &mut OperatorMeta) {
        let input_stats = self.input_batch_stats.borrow().metadata();
        let output_stats = self.output_batch_stats.borrow().metadata();
        meta.extend(metadata! {
            INPUT_BATCHES_STATS => input_stats,
            OUTPUT_BATCHES_STATS => output_stats,
        });
    }

    fn fixedpoint(&self, _scope: Scope) -> bool {
        true
    }
}
impl<Z, I> StreamingBinaryOperator<Option<Spine<Z>>, I, Z> for DistinctIncrementalTotal<Z, I>
where
    Z: IndexedZSet,
    I: WithSnapshot<Batch = Z> + 'static,
{
    /// Produces the change to the distinct set caused by `delta`, as a stream
    /// of output chunks.
    ///
    /// Each item is `(batch, is_last, position)`. Only the final item has
    /// `is_last == true`. When `delta` is `None`, a single empty final batch
    /// is emitted.
    ///
    /// Output weights are always `+1` (a record enters the set) or `-1` (a
    /// record leaves it).
    fn eval(
        self: Rc<Self>,
        delta: &Option<Spine<Z>>,
        delayed_integral: &I,
    ) -> impl AsyncStream<Item = (Z, bool, Option<Position>)> + 'static {
        // Take read-only snapshots up front so the returned `'static` stream
        // owns them instead of borrowing the inputs.
        let delta = delta.as_ref().map(|b| b.ro_snapshot());
        // The integral is only needed when there is a delta to process.
        let delayed_integral = if delta.is_some() {
            Some(delayed_integral.ro_snapshot())
        } else {
            None
        };
        stream! {
            let Some(delta) = delta else {
                yield (Z::dyn_empty(&self.input_factories), true, None);
                return
            };
            self.input_batch_stats.borrow_mut().add_batch(delta.len());
            let mut builder = Z::Builder::with_capacity(&self.input_factories, self.chunk_size, self.chunk_size);
            let mut delta_cursor = delta.cursor();
            // Dev tweak: when `fetch_distinct` is enabled, call `fetch(&delta)`
            // on the integral. If that returns something, read through its
            // cursor instead of the integral's own cursor.
            let fetched = if Runtime::with_dev_tweaks(|d| d.fetch_distinct == Some(true)) {
                delayed_integral.as_ref().unwrap().fetch(&delta).await
            } else {
                None
            };
            let mut integral_cursor = match &fetched {
                Some(fetched) => fetched.get_cursor(),
                None => Box::new(delayed_integral.unwrap().cursor()),
            };
            while delta_cursor.key_valid() {
                // Set when a value has been pushed for the current key and the
                // key itself has not been pushed yet.
                let mut any_values = false;
                if integral_cursor.seek_key_exact(delta_cursor.key(), None) {
                    // Key present in the integral: compare old and new weights
                    // value by value (values are visited in order, so
                    // `seek_val` only moves forward).
                    while delta_cursor.val_valid() {
                        let w = **delta_cursor.weight();
                        let v = delta_cursor.val();
                        integral_cursor.seek_val(v);
                        let old_weight = if integral_cursor.val_valid() && integral_cursor.val() == v {
                            **integral_cursor.weight()
                        } else {
                            HasZero::zero()
                        };
                        let new_weight = old_weight.add_by_ref(&w);
                        if old_weight.le0() {
                            // Was not in the set; enters it iff the new weight
                            // is strictly positive.
                            if new_weight.ge0() && !new_weight.is_zero() {
                                builder.push_val_diff(v, ZWeight::one().erase());
                                any_values = true;
                            }
                        } else if new_weight.le0() {
                            // Was in the set; leaves it.
                            builder.push_val_diff(v, ZWeight::one().neg().erase());
                            any_values = true;
                        }
                        if let Some(output) = self.maybe_yield(&mut builder, &mut delta_cursor, &mut any_values) {
                            yield output;
                        }
                        delta_cursor.step_val();
                    }
                } else {
                    // Key absent from the integral: the old weight is zero, so
                    // a value enters the set iff its delta weight is positive.
                    // The debug_assert checks that delta weights are non-zero.
                    while delta_cursor.val_valid() {
                        let new_weight = **delta_cursor.weight();
                        debug_assert!(!new_weight.is_zero());
                        if new_weight.ge0() {
                            builder.push_val_diff(delta_cursor.val(), ZWeight::one().erase());
                            any_values = true;
                        }
                        if let Some(output) = self.maybe_yield(&mut builder, &mut delta_cursor, &mut any_values) {
                            yield output;
                        }
                        delta_cursor.step_val();
                    }
                };
                // Seal the key for any values pushed since the last seal.
                if any_values {
                    builder.push_key(delta_cursor.key());
                }
                delta_cursor.step_key();
            }
            let result = builder.done();
            self.output_batch_stats.borrow_mut().add_batch(result.len());
            yield (result, true, delta_cursor.position())
        }
    }
}
/// Pending re-evaluations for `DistinctIncremental`.
///
/// Maps a future timestamp to the weighted (key, value) pairs that must be
/// revisited when the clock reaches that timestamp.
type KeysOfInterest<TS, K, V, R> = BTreeMap<TS, Box<DynWeightedPairs<DynPair<K, V>, R>>>;
/// Incremental distinct used by `dyn_distinct_inner` when
/// `root_scope() != 0` (nested circuits).
///
/// For each affected (key, value), it computes membership in the distinct
/// set at the current time and at times receded along each nesting scope.
/// It outputs the alternating sum of those memberships (see
/// `partial_derivative`).
#[derive(SizeOf)]
struct DistinctIncremental<Z, T, Clk>
where
    Z: IndexedZSet,
    T: WithSnapshot,
{
    /// Factories for output batches.
    #[size_of(skip)]
    input_factories: Z::Factories,
    /// Source location reported through `Operator::location`.
    location: &'static Location<'static>,
    /// Factories for the (key, value) collections kept in `keys_of_interest`.
    #[size_of(skip)]
    aux_factories: OrdIndexedZSetFactories<Z::Key, Z::Val>,
    /// Provides the current logical time.
    #[size_of(skip)]
    clock: Clk,
    /// (key, value) pairs scheduled for re-evaluation at future timestamps.
    keys_of_interest:
        RefCell<KeysOfInterest<<T::Batch as BatchReader>::Time, Z::Key, Z::Val, Z::R>>,
    /// Whether the latest input delta was empty. Reset to `false` by
    /// `clock_start(0)`.
    empty_input: Cell<bool>,
    /// Whether every chunk emitted by the latest `eval` was empty. Reset to
    /// `false` by `clock_start(0)`.
    empty_output: Cell<bool>,
    /// Scratch buffer of (time, weight) pairs used by `eval_keyval`.
    /// A `None` time is a receded time that does not exist. The buffer is kept
    /// in the struct so it can be reused across calls.
    distinct_vals: RefCell<Vec<(Option<<T::Batch as BatchReader>::Time>, ZWeight)>>,
    /// Sizes of input deltas, reported as operator metadata.
    #[size_of(skip)]
    input_batch_stats: RefCell<BatchSizeStats>,
    /// Sizes of emitted output chunks, reported as operator metadata.
    #[size_of(skip)]
    output_batch_stats: RefCell<BatchSizeStats>,
    /// Number of buffered output tuples at which a chunk is emitted.
    chunk_size: usize,
    _type: PhantomData<(Z, T)>,
}
impl<Z, T, Clk> DistinctIncremental<Z, T, Clk>
where
    Z: IndexedZSet,
    T: WithSnapshot,
    T::Batch: ZBatchReader<Key = Z::Key, Val = Z::Val>,
    Clk: WithClock<Time = <T::Batch as BatchReader>::Time>,
{
    /// Creates the operator.
    ///
    /// `distinct_vals` gets `2 << NESTING_DEPTH` slots, one per time at which
    /// membership is evaluated.
    fn new(
        location: &'static Location<'static>,
        input_factories: &Z::Factories,
        aux_factories: &OrdIndexedZSetFactories<Z::Key, Z::Val>,
        clock: Clk,
    ) -> Self {
        let depth = <T::Batch as BatchReader>::Time::NESTING_DEPTH;
        Self {
            location,
            input_factories: input_factories.clone(),
            aux_factories: aux_factories.clone(),
            clock,
            keys_of_interest: RefCell::new(BTreeMap::new()),
            empty_input: Cell::new(false),
            empty_output: Cell::new(false),
            distinct_vals: RefCell::new(vec![(None, HasZero::zero()); 2 << depth]),
            input_batch_stats: RefCell::new(BatchSizeStats::new()),
            output_batch_stats: RefCell::new(BatchSizeStats::new()),
            chunk_size: splitter_output_chunk_size(),
            _type: PhantomData,
        }
    }

    /// Re-evaluates the distinct status of `(key, val)` at `time`.
    ///
    /// Callers must already have positioned `trace_cursor` on `key`. If the
    /// trace contains `val`, this does two things:
    /// 1. For each slot in `distinct_vals`, it sums the trace weights at times
    ///    `<=` the slot's time and clamps the sum to 0 or 1 (membership). It
    ///    then pushes `partial_derivative` of those memberships to `output`
    ///    if the result is non-zero.
    /// 2. Among trace times `t` with `!(t <= time)`, it takes the smallest
    ///    (by `Ord`) `time.join(t)` and schedules `(key, val)` in
    ///    `keys_of_interest` for that time, so it is revisited then.
    ///
    /// `item` is a scratch buffer used to stage the scheduled pair.
    fn eval_keyval(
        &self,
        time: &Clk::Time,
        key: &Z::Key,
        val: &Z::Val,
        trace_cursor: &mut SpineCursor<T::Batch>,
        output: &mut TupleBuilder<Z::Builder, Z>,
        item: &mut DynPair<DynPair<Z::Key, Z::Val>, Z::R>,
    ) {
        trace_cursor.seek_val(val);
        if trace_cursor.val_valid() && trace_cursor.val() == val {
            let mut time_of_interest = None;
            self.clear_distinct_vals();
            trace_cursor.map_times(&mut |trace_ts, weight| {
                // Accumulate this update into every slot whose time it precedes.
                for (ts, total_weight) in self.distinct_vals.borrow_mut().iter_mut() {
                    if let Some(ts) = ts
                        && trace_ts.less_equal(ts)
                    {
                        *total_weight += **weight;
                    }
                }
                // Updates not yet visible at `time` become visible at
                // `time.join(trace_ts)`; keep the smallest such time.
                if !trace_ts.less_equal(time) {
                    time_of_interest = match time_of_interest.take() {
                        None => Some(time.join(trace_ts)),
                        Some(time_of_interest) => Some(min(time_of_interest, time.join(trace_ts))),
                    }
                }
            });
            // Convert accumulated weights into 0/1 membership.
            for (_time, weight) in self.distinct_vals.borrow_mut().iter_mut() {
                if weight.le0() {
                    *weight = HasZero::zero();
                } else {
                    *weight = HasOne::one();
                }
            }
            let output_weight = Self::partial_derivative(&self.distinct_vals.borrow());
            if !output_weight.is_zero() {
                output.push_refs(key, val, &(), output_weight.erase());
            }
            // Stage `(key, val)` with weight 1 for possible scheduling.
            let (kv, weight) = item.split_mut();
            **weight = HasOne::one();
            kv.from_refs(key, val);
            if let Some(t) = time_of_interest {
                self.keys_of_interest
                    .borrow_mut()
                    .entry(t)
                    .or_insert_with(|| self.aux_factories.weighted_items_factory().default_box())
                    .push_val(&mut *item);
            }
        }
    }

    /// Emits a non-final chunk once `result_builder` holds at least
    /// `chunk_size` tuples; otherwise returns `None`.
    ///
    /// The builder is replaced with a fresh one. The chunk's emptiness is ANDed
    /// into `empty_output`, and its size is recorded in `output_batch_stats`.
    fn maybe_yield<C>(
        &self,
        delta_cursor: &C,
        result_builder: &mut TupleBuilder<Z::Builder, Z>,
    ) -> Option<(Z, bool, Option<Position>)>
    where
        C: Cursor<Z::Key, Z::Val, Z::Time, Z::R>,
    {
        if result_builder.num_tuples() >= self.chunk_size {
            let builder = std::mem::replace(
                result_builder,
                TupleBuilder::new(
                    &self.input_factories,
                    Z::Builder::with_capacity(
                        &self.input_factories,
                        result_builder.num_keys(),
                        self.chunk_size,
                    ),
                ),
            );
            let result = builder.done();
            self.empty_output
                .update(|empty_output| empty_output & result.is_empty());
            self.output_batch_stats.borrow_mut().add_batch(result.len());
            Some((result, false, delta_cursor.position()))
        } else {
            None
        }
    }

    /// Assigns a timestamp to every slot of `vals` and zeroes its weight.
    ///
    /// The length of `vals` is assumed to be a power of two. The second half
    /// is filled recursively from `ts`. The first half is filled recursively
    /// from `ts` receded along scope `log2(len / 2)`. A `None` timestamp
    /// propagates when `checked_recede` fails.
    fn init_distinct_vals(
        vals: &mut [(Option<<T::Batch as BatchReader>::Time>, ZWeight)],
        ts: Option<<T::Batch as BatchReader>::Time>,
    ) {
        if vals.len() == 1 {
            vals[0] = (ts, HasZero::zero());
        } else {
            let half_len = vals.len() >> 1;
            Self::init_distinct_vals(
                &mut vals[0..half_len],
                ts.as_ref()
                    .and_then(|ts| ts.checked_recede(half_len.ilog2() as Scope)),
            );
            Self::init_distinct_vals(&mut vals[half_len..], ts);
        }
    }

    /// Zeroes every weight in `distinct_vals`, keeping the timestamps.
    fn clear_distinct_vals(&self) {
        for (_time, val) in self.distinct_vals.borrow_mut().iter_mut() {
            *val = HasZero::zero();
        }
    }

    /// Recursively computes (second half) minus (first half) over `vals`.
    ///
    /// Applied to the layout produced by `init_distinct_vals`, this is a
    /// finite difference along each receded scope.
    fn partial_derivative(vals: &[(Option<<T::Batch as BatchReader>::Time>, ZWeight)]) -> ZWeight {
        if vals.len() == 1 {
            vals[0].1
        } else {
            Self::partial_derivative(&vals[vals.len() >> 1..])
                + Self::partial_derivative(&vals[0..vals.len() >> 1]).neg()
        }
    }
}
impl<Z, T, Clk> Operator for DistinctIncremental<Z, T, Clk>
where
    Z: IndexedZSet,
    T: WithSnapshot + 'static,
    T::Batch: ZBatchReader<Key = Z::Key, Val = Z::Val>,
    Clk: WithClock<Time = <T::Batch as BatchReader>::Time> + 'static,
{
    fn name(&self) -> Cow<'static, str> {
        Cow::from("DistinctIncremental")
    }

    fn location(&self) -> OperatorLocation {
        Some(self.location)
    }

    // Reports the number of scheduled (key, value) pairs, batch statistics and
    // the operator's memory footprint.
    fn metadata(&self, meta: &mut OperatorMeta) {
        let pending_records = self
            .keys_of_interest
            .borrow()
            .values()
            .fold(0usize, |acc, pending| acc + pending.len());
        let footprint = self.size_of();
        meta.extend(metadata! {
            STATE_RECORDS_COUNT => MetaItem::Count(pending_records),
            INPUT_BATCHES_STATS => self.input_batch_stats.borrow().metadata(),
            OUTPUT_BATCHES_STATS => self.output_batch_stats.borrow().metadata(),
            USED_MEMORY_BYTES => MetaItem::bytes(footprint.used_bytes()),
            MEMORY_ALLOCATIONS_COUNT => MetaItem::Count(footprint.distinct_allocations()),
            SHARED_MEMORY_BYTES => MetaItem::bytes(footprint.shared_bytes()),
        });
    }

    // Only the innermost clock resets the emptiness flags.
    fn clock_start(&mut self, scope: Scope) {
        if scope != 0 {
            return;
        }
        self.empty_input.set(false);
        self.empty_output.set(false);
    }

    // Debug check: nothing may remain scheduled at or before the end of the
    // epoch that is closing.
    fn clock_end(&mut self, scope: Scope) {
        debug_assert!({
            let epoch_end = self.clock.time().epoch_end(scope);
            self.keys_of_interest.borrow().keys().all(|ts| {
                let premature = ts.less_equal(&epoch_end);
                if premature {
                    eprintln!("ts: {ts:?}, epoch_end: {:?}", epoch_end);
                }
                !premature
            })
        });
    }

    // At a fixed point when the last input and output were empty and no
    // pending re-evaluation falls within the current epoch.
    fn fixedpoint(&self, scope: Scope) -> bool {
        if !(self.empty_input.get() && self.empty_output.get()) {
            return false;
        }
        let epoch_end = self.clock.time().epoch_end(scope);
        self.keys_of_interest
            .borrow()
            .keys()
            .all(|ts| !ts.less_equal(&epoch_end))
    }
}
impl<Z, T, Clk> StreamingBinaryOperator<Option<Spine<Z>>, T, Z> for DistinctIncremental<Z, T, Clk>
where
    Z: IndexedZSet,
    T: WithSnapshot + 'static,
    T::Batch: ZBatchReader<Key = Z::Key, Val = Z::Val>,
    Clk: WithClock<Time = <T::Batch as BatchReader>::Time> + 'static,
{
    /// Evaluates distinct at the current clock time.
    ///
    /// Two sources of (key, value) pairs are evaluated:
    /// - the pairs in `delta`, and
    /// - the pairs scheduled for this time in `keys_of_interest` by earlier
    ///   calls.
    ///
    /// Both are merged in key order (and value order within equal keys). Each
    /// pair is re-evaluated against `trace` via `eval_keyval`; keys absent
    /// from the trace are skipped. Output is emitted in chunks as
    /// `(batch, is_last, position)`. When `delta` is `None`, a single empty
    /// final batch is emitted.
    fn eval(
        self: Rc<Self>,
        delta: &Option<Spine<Z>>,
        trace: &T,
    ) -> impl AsyncStream<Item = (Z, bool, Option<Position>)> + 'static {
        // Snapshot inputs so the `'static` stream owns them.
        let delta = delta.as_ref().map(|b| b.ro_snapshot());
        let trace = if delta.is_some() {
            Some(trace.ro_snapshot())
        } else {
            None
        };
        // Every emitted chunk ANDs its emptiness into this flag.
        self.empty_output.set(true);
        stream! {
            let Some(delta) = delta else {
                yield (Z::dyn_empty(&self.input_factories), true, None);
                return;
            };
            let time = self.clock.time();
            self.input_batch_stats.borrow_mut().add_batch(delta.len());
            // Lay out the evaluation times for `eval_keyval` relative to `time`.
            Self::init_distinct_vals(&mut self.distinct_vals.borrow_mut(), Some(time.clone()));
            // Consulted by `fixedpoint`.
            self.empty_input.set(delta.is_empty());
            let result_builder = Z::Builder::with_capacity(&self.input_factories, self.chunk_size, self.chunk_size);
            let mut result_builder = TupleBuilder::new(&self.input_factories, result_builder);
            let mut delta_cursor = delta.cursor();
            let mut trace_cursor = trace.unwrap().cursor();
            // Take (and remove) the pairs scheduled for the current time.
            let mut keys_of_interest = self
                .keys_of_interest
                .borrow_mut()
                .remove(&time)
                .unwrap_or_else(|| self.aux_factories.weighted_items_factory().default_box());
            // Build a batch from them so they can be walked with a cursor.
            let keys_of_interest = <OrdIndexedZSet<Z::Key, Z::Val>>::dyn_from_tuples(
                &self.aux_factories,
                (),
                &mut keys_of_interest,
            );
            let mut keys_of_interest_cursor = keys_of_interest.cursor();
            // Scratch buffer passed to `eval_keyval`.
            let mut item = self.aux_factories.weighted_item_factory().default_box();
            // Merge phase: both sources still have keys.
            while delta_cursor.key_valid() && keys_of_interest_cursor.key_valid() {
                match delta_cursor.key().cmp(keys_of_interest_cursor.key()) {
                    // Key only in `delta`.
                    Ordering::Less => {
                        if trace_cursor.seek_key_exact(delta_cursor.key(), None) {
                            while delta_cursor.val_valid() {
                                self.eval_keyval(
                                    &time,
                                    delta_cursor.key(),
                                    delta_cursor.val(),
                                    &mut trace_cursor,
                                    &mut result_builder,
                                    &mut *item,
                                );
                                if let Some(output) = self.maybe_yield(&delta_cursor, &mut result_builder) {
                                    yield output;
                                }
                                delta_cursor.step_val();
                            }
                        }
                        delta_cursor.step_key();
                    }
                    // Key only in `keys_of_interest`.
                    Ordering::Greater => {
                        if trace_cursor.seek_key_exact(keys_of_interest_cursor.key(), None) {
                            while keys_of_interest_cursor.val_valid() {
                                self.eval_keyval(
                                    &time,
                                    keys_of_interest_cursor.key(),
                                    keys_of_interest_cursor.val(),
                                    &mut trace_cursor,
                                    &mut result_builder,
                                    &mut *item,
                                );
                                if let Some(output) = self.maybe_yield(&delta_cursor, &mut result_builder) {
                                    yield output;
                                }
                                keys_of_interest_cursor.step_val();
                            }
                        }
                        keys_of_interest_cursor.step_key();
                    }
                    // Key in both: merge their values so each value is
                    // evaluated once and `seek_val` only moves forward.
                    Ordering::Equal => {
                        if trace_cursor.seek_key_exact(keys_of_interest_cursor.key(), None) {
                            while delta_cursor.val_valid() && keys_of_interest_cursor.val_valid() {
                                match delta_cursor.val().cmp(keys_of_interest_cursor.val()) {
                                    Ordering::Less => {
                                        self.eval_keyval(
                                            &time,
                                            delta_cursor.key(),
                                            delta_cursor.val(),
                                            &mut trace_cursor,
                                            &mut result_builder,
                                            &mut *item,
                                        );
                                        delta_cursor.step_val();
                                    }
                                    Ordering::Greater => {
                                        self.eval_keyval(
                                            &time,
                                            keys_of_interest_cursor.key(),
                                            keys_of_interest_cursor.val(),
                                            &mut trace_cursor,
                                            &mut result_builder,
                                            &mut *item,
                                        );
                                        keys_of_interest_cursor.step_val();
                                    }
                                    Ordering::Equal => {
                                        self.eval_keyval(
                                            &time,
                                            delta_cursor.key(),
                                            delta_cursor.val(),
                                            &mut trace_cursor,
                                            &mut result_builder,
                                            &mut *item,
                                        );
                                        delta_cursor.step_val();
                                        keys_of_interest_cursor.step_val();
                                    }
                                }
                                if let Some(output) = self.maybe_yield(&delta_cursor, &mut result_builder) {
                                    yield output;
                                }
                            }
                            // Leftover values from whichever side is not exhausted.
                            while delta_cursor.val_valid() {
                                self.eval_keyval(
                                    &time,
                                    delta_cursor.key(),
                                    delta_cursor.val(),
                                    &mut trace_cursor,
                                    &mut result_builder,
                                    &mut *item,
                                );
                                if let Some(output) = self.maybe_yield(&delta_cursor, &mut result_builder) {
                                    yield output;
                                }
                                delta_cursor.step_val();
                            }
                            while keys_of_interest_cursor.val_valid() {
                                self.eval_keyval(
                                    &time,
                                    keys_of_interest_cursor.key(),
                                    keys_of_interest_cursor.val(),
                                    &mut trace_cursor,
                                    &mut result_builder,
                                    &mut *item,
                                );
                                if let Some(output) = self.maybe_yield(&delta_cursor, &mut result_builder) {
                                    yield output;
                                }
                                keys_of_interest_cursor.step_val();
                            }
                        }
                        delta_cursor.step_key();
                        keys_of_interest_cursor.step_key();
                    }
                }
            }
            // Remaining keys from `delta`.
            while delta_cursor.key_valid() {
                if trace_cursor.seek_key_exact(delta_cursor.key(), None) {
                    while delta_cursor.val_valid() {
                        self.eval_keyval(
                            &time,
                            delta_cursor.key(),
                            delta_cursor.val(),
                            &mut trace_cursor,
                            &mut result_builder,
                            &mut *item,
                        );
                        if let Some(output) = self.maybe_yield(&delta_cursor, &mut result_builder) {
                            yield output;
                        }
                        delta_cursor.step_val();
                    }
                }
                delta_cursor.step_key();
            }
            // Remaining keys from `keys_of_interest`.
            while keys_of_interest_cursor.key_valid() {
                if trace_cursor.seek_key_exact(keys_of_interest_cursor.key(), None) {
                    while keys_of_interest_cursor.val_valid() {
                        self.eval_keyval(
                            &time,
                            keys_of_interest_cursor.key(),
                            keys_of_interest_cursor.val(),
                            &mut trace_cursor,
                            &mut result_builder,
                            &mut *item,
                        );
                        if let Some(output) = self.maybe_yield(&delta_cursor, &mut result_builder) {
                            yield output;
                        }
                        keys_of_interest_cursor.step_val();
                    }
                }
                keys_of_interest_cursor.step_key();
            }
            // Final chunk.
            let result = result_builder.done();
            self.empty_output.update(|empty_output| empty_output & result.is_empty());
            self.output_batch_stats.borrow_mut().add_batch(result.len());
            yield (result, true, delta_cursor.position());
        }
    }
}
// Tests checking that the incremental `distinct`, `hash_distinct` and the
// non-incremental `stream_distinct` agree, in flat and nested circuits.
#[cfg(test)]
mod test {
    use anyhow::Result as AnyResult;
    use std::{cell::RefCell, rc::Rc};

    use crate::{
        Circuit, IndexedZSetHandle, RootCircuit, Runtime, ZSetHandle,
        circuit::CircuitConfig,
        indexed_zset,
        operator::{GeneratorNested, OutputHandle},
        typed_batch::{IndexedZSetReader, OrdIndexedZSet, OrdZSet, SpineSnapshot},
        utils::Tup2,
        zset,
    };
    use proptest::{collection, prelude::*};

    // Runs `distinct_inc_test` on each of `workers` runtime worker threads.
    fn do_distinct_inc_test_mt(workers: usize) {
        let hruntime = Runtime::run(workers, |_parker| {
            distinct_inc_test();
        })
        .expect("Runtime successful");
        hruntime.join().unwrap();
    }

    #[test]
    fn distinct_inc_test_mt() {
        do_distinct_inc_test_mt(1);
        do_distinct_inc_test_mt(2);
        do_distinct_inc_test_mt(4);
        do_distinct_inc_test_mt(16);
    }

    // Nested-circuit test: `distinct` and `hash_distinct` must agree.
    // Each outer transaction takes the next list of deltas (fed by worker 0
    // only) and emits one delta per inner iteration. The inner loop stops when
    // its condition has been evaluated 4 times since the generator reset the
    // counter.
    #[test]
    fn distinct_inc_test() {
        let mut circuit = Runtime::init_circuit(1, move |circuit| {
            let mut inputs = vec![
                vec![zset! { 1 => 1, 2 => 1 }, zset! { 2 => -1, 3 => 2, 4 => 2 }],
                vec![zset! { 2 => 1, 3 => 1 }, zset! { 3 => -2, 4 => -1 }],
                vec![
                    zset! { 5 => 1, 6 => 1 },
                    zset! { 2 => -1, 7 => 1 },
                    zset! { 2 => 1, 7 => -1, 8 => 2, 9 => 1 },
                ],
            ]
            .into_iter();

            circuit
                .iterate(|child| {
                    let counter = Rc::new(RefCell::new(0));
                    let counter_clone = counter.clone();

                    let input = child.add_source(GeneratorNested::new(
                        Box::new(move || {
                            *counter_clone.borrow_mut() = 0;
                            if Runtime::worker_index() == 0 {
                                let mut deltas = inputs.next().unwrap_or_default().into_iter();
                                Box::new(move || deltas.next().unwrap_or_else(|| zset! {}))
                            } else {
                                Box::new(|| zset! {})
                            }
                        }),
                        zset! {},
                    ));

                    let distinct_inc = input.distinct().gather(0);
                    let hash_distinct_inc = input.hash_distinct().gather(0);

                    hash_distinct_inc.accumulate_apply2(&distinct_inc, |d1, d2| {
                        assert_eq!(d1.iter().collect::<Vec<_>>(), d2.iter().collect::<Vec<_>>())
                    });

                    Ok((
                        async move || {
                            *counter.borrow_mut() += 1;
                            Ok(*counter.borrow() == 4)
                        },
                        (),
                    ))
                })
                .unwrap();

            Ok(())
        })
        .unwrap()
        .0;

        for _ in 0..3 {
            circuit.transaction().unwrap();
        }
    }

    #[test]
    fn distinct_indexed_test_small_steps() {
        distinct_indexed_test(false);
    }

    #[test]
    fn distinct_indexed_test_big_step() {
        distinct_indexed_test(true);
    }

    // Checks the integrated output of all three distinct variants against
    // hand-computed results.
    // `transaction == false`: one transaction per input batch, checked after
    // each. `transaction == true`: all batches in a single transaction,
    // checked against the final expected output.
    fn distinct_indexed_test(transaction: bool) {
        let inputs = vec![
            vec![
                Tup2(1, Tup2(0, 1)),
                Tup2(1, Tup2(1, 2)),
                Tup2(2, Tup2(0, 1)),
                Tup2(2, Tup2(1, 1)),
            ],
            vec![Tup2(3, Tup2(1, 1)), Tup2(2, Tup2(1, 1))],
            vec![Tup2(1, Tup2(1, 3)), Tup2(2, Tup2(1, -3))],
        ];

        let expected_outputs = vec![
            indexed_zset! { 1 => { 0 => 1, 1 => 1}, 2 => { 0 => 1, 1 => 1 } },
            indexed_zset! { 1 => { 0 => 1, 1 => 1}, 2 => { 0 => 1, 1 => 1 }, 3 => { 1 => 1 } },
            indexed_zset! { 1 => { 0 => 1, 1 => 1}, 2 => { 0 => 1 }, 3 => { 1 => 1 } },
        ];

        let (mut circuit, (input, stream_distinct_output, distinct_output, hash_distinct_output)) =
            Runtime::init_circuit(4, move |circuit| {
                let (input, input_handle) = circuit.add_input_indexed_zset::<u64, u64>();

                let stream_distinct_output = circuit
                    .non_incremental(&input, |_child, input| {
                        Ok(input.integrate().stream_distinct())
                    })
                    .unwrap()
                    .accumulate_output();

                let distinct_output = input.distinct().accumulate_integrate().accumulate_output();
                let hash_distinct_output = input
                    .hash_distinct()
                    .accumulate_integrate()
                    .accumulate_output();

                Ok((
                    input_handle,
                    stream_distinct_output,
                    distinct_output,
                    hash_distinct_output,
                ))
            })
            .unwrap();

        if transaction {
            circuit.start_transaction().unwrap();
            for mut i in inputs.into_iter() {
                input.append(&mut i);
                circuit.step().unwrap();
            }
            circuit.commit_transaction().unwrap();

            let expected_output = expected_outputs.last().unwrap().clone();
            assert_eq!(
                stream_distinct_output.concat().consolidate(),
                expected_output
            );
            assert_eq!(distinct_output.concat().consolidate(), expected_output);
            assert_eq!(hash_distinct_output.concat().consolidate(), expected_output);
        } else {
            for (mut i, o) in inputs.into_iter().zip(expected_outputs.into_iter()) {
                input.append(&mut i);
                circuit.transaction().unwrap();
                assert_eq!(stream_distinct_output.concat().consolidate(), o);
                assert_eq!(distinct_output.concat().consolidate(), o);
                assert_eq!(hash_distinct_output.concat().consolidate(), o);
            }
        }

        circuit.kill().unwrap();
    }

    type TestZSet = OrdZSet<u64>;
    type TestIndexedZSet = OrdIndexedZSet<u64, i64>;

    // Bounds on the size of randomly generated inputs.
    const MAX_ROUNDS: usize = 15;
    const MAX_ITERATIONS: usize = 15;
    const NUM_KEYS: u64 = 10;
    const MAX_VAL: i64 = 3;
    const MAX_TUPLES: usize = 10;

    // Random Z-set batch: up to `MAX_TUPLES` (key, weight) pairs with weights
    // in -1..=1.
    fn test_zset() -> impl Strategy<Value = Vec<Tup2<u64, i64>>> {
        collection::vec((0..NUM_KEYS, -1..=1i64), 0..MAX_TUPLES)
            .prop_map(|tuples| tuples.into_iter().map(|(k, w)| Tup2(k, w)).collect())
    }

    fn test_input() -> impl Strategy<Value = Vec<Vec<Tup2<u64, i64>>>> {
        collection::vec(test_zset(), 0..MAX_ROUNDS * MAX_ITERATIONS)
    }

    // Random indexed Z-set with keys in 0..NUM_KEYS, values in
    // -MAX_VAL..MAX_VAL and weights in -1..=1.
    fn test_indexed_zset() -> impl Strategy<Value = TestIndexedZSet> {
        collection::vec(
            (
                (0..NUM_KEYS, -MAX_VAL..MAX_VAL).prop_map(|(x, y)| Tup2(x, y)),
                -1..=1i64,
            )
                .prop_map(|(x, y)| Tup2(x, y)),
            0..MAX_TUPLES,
        )
        .prop_map(|tuples| OrdIndexedZSet::from_tuples((), tuples))
    }

    // Same distribution as `test_indexed_zset`, as raw
    // `Tup2(key, Tup2(val, weight))` tuples for an input handle.
    fn test_indexed_vec() -> impl Strategy<Value = Vec<Tup2<u64, Tup2<i64, i64>>>> {
        collection::vec(
            (
                (0..NUM_KEYS, -MAX_VAL..MAX_VAL).prop_map(|(x, y)| Tup2(x, y)),
                -1..=1i64,
            )
                .prop_map(|(x, y)| Tup2(x.0, Tup2(x.1, y))),
            0..MAX_TUPLES,
        )
    }

    fn test_indexed_input() -> impl Strategy<Value = Vec<Vec<Tup2<u64, Tup2<i64, i64>>>>> {
        collection::vec(test_indexed_vec(), 0..MAX_ROUNDS * MAX_ITERATIONS)
    }

    // Outer vector: one entry per outer round; inner vector: one batch per
    // nested iteration.
    fn test_indexed_nested_input() -> impl Strategy<Value = Vec<Vec<TestIndexedZSet>>> {
        collection::vec(
            collection::vec(test_indexed_zset(), 0..MAX_ITERATIONS),
            0..MAX_ROUNDS,
        )
    }

    // Builds a circuit with three outputs over one Z-set input: incremental
    // `distinct`, `hash_distinct`, and a non-incremental reference
    // (integrate -> stream_distinct -> differentiate).
    fn distinct_test_circuit(
        circuit: &mut RootCircuit,
    ) -> AnyResult<(
        ZSetHandle<u64>,
        OutputHandle<SpineSnapshot<TestZSet>>,
        OutputHandle<SpineSnapshot<TestZSet>>,
        OutputHandle<SpineSnapshot<TestZSet>>,
    )> {
        let (input, input_handle) = circuit.add_input_zset::<u64>();

        let distinct_inc = input.distinct().accumulate_output();
        let hash_distinct_inc = input.hash_distinct().accumulate_output();

        let distinct_noninc = circuit
            .non_incremental(&input, |_child_circuit, input| {
                Ok(input.integrate().stream_distinct().differentiate())
            })
            .unwrap()
            .accumulate_output();

        Ok((
            input_handle,
            distinct_inc,
            hash_distinct_inc,
            distinct_noninc,
        ))
    }

    // Indexed-Z-set counterpart of `distinct_test_circuit`.
    fn distinct_indexed_test_circuit(
        circuit: &mut RootCircuit,
    ) -> AnyResult<(
        IndexedZSetHandle<u64, i64>,
        OutputHandle<SpineSnapshot<TestIndexedZSet>>,
        OutputHandle<SpineSnapshot<TestIndexedZSet>>,
        OutputHandle<SpineSnapshot<TestIndexedZSet>>,
    )> {
        let (input, input_handle) = circuit.add_input_indexed_zset::<u64, i64>();

        let distinct_inc = input.distinct().accumulate_output();
        let hash_distinct_inc = input.hash_distinct().accumulate_output();

        let distinct_noninc = circuit
            .non_incremental(&input, |_child_circuit, input| {
                Ok(input.integrate().stream_distinct().differentiate())
            })
            .unwrap()
            .accumulate_output();

        Ok((
            input_handle,
            distinct_inc,
            hash_distinct_inc,
            distinct_noninc,
        ))
    }

    // Nested circuit: inside an `iterate` child, incremental `distinct` must
    // agree with both the non-incremental reference and `hash_distinct` on
    // every iteration. The inner loop runs `MAX_ITERATIONS` iterations per
    // outer step.
    fn distinct_indexed_nested_test_circuit(
        circuit: &mut RootCircuit,
        inputs: Vec<Vec<TestIndexedZSet>>,
    ) -> AnyResult<()> {
        let mut inputs = inputs.into_iter();

        circuit
            .iterate(|child| {
                let counter = Rc::new(RefCell::new(0));
                let counter_clone = counter.clone();

                let input = child.add_source(GeneratorNested::new(
                    Box::new(move || {
                        *counter_clone.borrow_mut() = 0;
                        if Runtime::worker_index() == 0 {
                            let mut deltas = inputs.next().unwrap_or_default().into_iter();
                            Box::new(move || deltas.next().unwrap_or_else(|| indexed_zset! {}))
                        } else {
                            Box::new(|| indexed_zset! {})
                        }
                    }),
                    indexed_zset! {},
                ));

                let distinct_inc = input.distinct().gather(0);
                let hash_distinct_inc = input.hash_distinct().gather(0);

                let distinct_noninc = child
                    .non_incremental(&input, |_child_circuit, input| {
                        Ok(input
                            .integrate_nested()
                            .integrate()
                            .stream_distinct()
                            .differentiate()
                            .differentiate_nested()
                            .gather(0))
                    })
                    .unwrap();

                distinct_inc.accumulate_apply2(&distinct_noninc, |d1, d2| {
                    assert_eq!(d1.iter().collect::<Vec<_>>(), d2.iter().collect::<Vec<_>>());
                });

                distinct_inc.accumulate_apply2(&hash_distinct_inc, |d1, d2| {
                    assert_eq!(d1.iter().collect::<Vec<_>>(), d2.iter().collect::<Vec<_>>());
                });

                Ok((
                    async move || {
                        *counter.borrow_mut() += 1;
                        Ok(*counter.borrow() == MAX_ITERATIONS)
                    },
                    (),
                ))
            })
            .unwrap();

        Ok(())
    }

    // Feeds `inputs` to `distinct_test_circuit` (splitter chunk size 2, to
    // exercise chunked output). All three outputs must agree either after
    // every transaction or once after one big transaction.
    fn proptest_distinct_test_mt(
        inputs: Vec<Vec<Tup2<u64, i64>>>,
        workers: usize,
        transaction: bool,
    ) {
        let (mut circuit, (input_handle, inc_output, hash_inc_output, noninc_output)) =
            Runtime::init_circuit(
                CircuitConfig::from(workers).with_splitter_chunk_size_records(2),
                distinct_test_circuit,
            )
            .unwrap();

        if transaction {
            circuit.start_transaction().unwrap();
            for mut input_batch in inputs.into_iter() {
                input_handle.append(&mut input_batch);
                circuit.step().unwrap();
            }
            circuit.commit_transaction().unwrap();

            let noninc_output = noninc_output.concat().consolidate();
            let inc_output = inc_output.concat().consolidate();
            let hash_inc_output = hash_inc_output.concat().consolidate();

            assert_eq!(noninc_output, hash_inc_output);
            assert_eq!(noninc_output, inc_output);
            assert_eq!(hash_inc_output, inc_output);
        } else {
            for mut input_batch in inputs.into_iter() {
                input_handle.append(&mut input_batch);
                circuit.transaction().unwrap();

                let noninc_output = noninc_output.concat().consolidate();
                let inc_output = inc_output.concat().consolidate();
                let hash_inc_output = hash_inc_output.concat().consolidate();

                assert_eq!(noninc_output, hash_inc_output);
                assert_eq!(noninc_output, inc_output);
                assert_eq!(hash_inc_output, inc_output);
            }
        }

        circuit.kill().unwrap();
    }

    // Indexed-Z-set counterpart of `proptest_distinct_test_mt` (default
    // circuit configuration).
    fn proptest_distinct_indexed_test_mt(
        inputs: Vec<Vec<Tup2<u64, Tup2<i64, i64>>>>,
        workers: usize,
        transaction: bool,
    ) {
        let (mut circuit, (input_handle, inc_output, hash_inc_output, noninc_output)) =
            Runtime::init_circuit(workers, distinct_indexed_test_circuit).unwrap();

        if transaction {
            circuit.start_transaction().unwrap();
            for mut input_batch in inputs.into_iter() {
                input_handle.append(&mut input_batch);
                circuit.step().unwrap();
            }
            circuit.commit_transaction().unwrap();

            let noninc_output = noninc_output.concat().consolidate();
            let inc_output = inc_output.concat().consolidate();
            let hash_inc_output = hash_inc_output.concat().consolidate();

            assert_eq!(noninc_output, hash_inc_output);
            assert_eq!(noninc_output, inc_output);
            assert_eq!(hash_inc_output, inc_output);
        } else {
            for mut input_batch in inputs.into_iter() {
                input_handle.append(&mut input_batch);
                circuit.transaction().unwrap();

                let noninc_output = noninc_output.concat().consolidate();
                let inc_output = inc_output.concat().consolidate();
                let hash_inc_output = hash_inc_output.concat().consolidate();

                assert_eq!(noninc_output, hash_inc_output);
                assert_eq!(noninc_output, inc_output);
                assert_eq!(hash_inc_output, inc_output);
            }
        }

        circuit.kill().unwrap();
    }

    proptest! {
        #[test]
        fn proptest_distinct_test_mt_small_step(inputs in test_input(), workers in (2..=16usize)) {
            proptest_distinct_test_mt(inputs, workers, false);
        }

        #[test]
        fn proptest_distinct_test_mt_big_step(inputs in test_input(), workers in (2..=16usize)) {
            proptest_distinct_test_mt(inputs, workers, true);
        }

        #[test]
        fn proptest_distinct_indexed_test_mt_small_steps(inputs in test_indexed_input(), workers in (2..=4usize)) {
            proptest_distinct_indexed_test_mt(inputs, workers, false);
        }

        #[test]
        fn proptest_distinct_indexed_test_mt_big_step(inputs in test_indexed_input(), workers in (2..=4usize)) {
            proptest_distinct_indexed_test_mt(inputs, workers, true);
        }

        // One outer transaction per generated round; assertions run inside
        // the circuit (see `distinct_indexed_nested_test_circuit`).
        #[test]
        fn proptest_distinct_indexed_nested_test_mt(inputs in test_indexed_nested_input(), workers in (2..=4usize)) {
            let iterations = inputs.len();
            let mut circuit = Runtime::init_circuit(workers, |circuit| distinct_indexed_nested_test_circuit(circuit, inputs)).unwrap().0;

            for _ in 0..iterations {
                circuit.transaction().unwrap();
            }

            circuit.kill().unwrap();
        }
    }
}