use std::iter::once;
use crate::{
Circuit, DBData, Stream,
dynamic::{DowncastTrait, DynData, DynUnit, Erase},
operator::dynamic::{
filter_map::DynFilterMap,
join::{
JoinFactories, OuterJoinFactories, StreamAntijoinFactories, StreamJoinFactories,
TraceJoinFuncs,
},
},
typed_batch::{IndexedZSet, OrdIndexedZSet, OrdZSet, ZSet},
};
/// Wraps a statically typed join closure that produces exactly one output key
/// per invocation into a type-erased [`TraceJoinFuncs`] pair.
///
/// The closure's result is turned into a single-element iterator and the work
/// is delegated to [`mk_trace_join_flatmap_funcs`]; the output value type is
/// therefore always [`DynUnit`].
///
/// # Type parameters
///
/// * `I1`, `I2` - the two indexed input batch types; they share key type and
///   dynamic key type.
/// * `Z` - the output Z-set type whose key is what `join` returns.
/// * `F` - the user-supplied join closure.
pub(crate) fn mk_trace_join_funcs<I1, I2, Z, F>(
    join: F,
) -> TraceJoinFuncs<I1::DynK, I1::DynV, I2::DynV, Z::DynK, DynUnit>
where
    I1: IndexedZSet,
    I2: IndexedZSet<Key = I1::Key, DynK = I1::DynK>,
    Z: ZSet,
    Box<Z::DynK>: Clone,
    F: Fn(&I1::Key, &I1::Val, &I2::Val) -> Z::Key + Clone + 'static,
{
    // The closure is written inline (not bound to a local first) so that its
    // signature is inferred from the callee's `Fn(&_, &_, &_)` bound.
    mk_trace_join_flatmap_funcs::<I1, I2, Z, _, _>(move |key, left_val, right_val| {
        once(join(key, left_val, right_val))
    })
}
/// Builds the type-erased [`TraceJoinFuncs`] pair for a join closure that may
/// yield any number of output keys per invocation.
///
/// Two closures are produced:
///
/// * `left` receives `(key, v1, v2, callback)` and passes the downcast values
///   to `join` in that order.
/// * `right` receives `(key, v2, v1, callback)` - the two values arrive
///   swapped - and restores the `(key, v1, v2)` order before calling a clone
///   of `join`.
///
/// Each closure owns one output key box and one unit value box that are
/// created once here and overwritten for every item the iterator yields; the
/// callback is then invoked with mutable references to those boxes.
pub(crate) fn mk_trace_join_flatmap_funcs<I1, I2, Z, F, It>(
    join: F,
) -> TraceJoinFuncs<I1::DynK, I1::DynV, I2::DynV, Z::DynK, DynUnit>
where
    I1: IndexedZSet,
    I2: IndexedZSet<Key = I1::Key, DynK = I1::DynK>,
    Z: ZSet,
    Box<Z::DynK>: Clone,
    F: Fn(&I1::Key, &I1::Val, &I2::Val) -> It + Clone + 'static,
    It: IntoIterator<Item = Z::Key> + 'static,
{
    // Scratch output slots; one set per closure because each closure takes
    // ownership of what it captures.
    let mut left_key: Box<Z::DynK> = Box::<Z::Key>::default().erase_box();
    let mut left_unit: Box<DynUnit> = Box::new(()).erase_box();
    let mut right_key: Box<Z::DynK> = Box::<Z::Key>::default().erase_box();
    let mut right_unit: Box<DynUnit> = Box::new(()).erase_box();

    let join_for_right = join.clone();

    TraceJoinFuncs {
        // SAFETY: `left_key` was created above by erasing a `Z::Key`, so the
        // mutable downcast targets the type it was built from. The downcasts
        // of `key`, `lhs` and `rhs` assume the caller passes values whose
        // concrete types are `I1::Key`, `I1::Val` and `I2::Val` -
        // NOTE(review): not checked here; confirm against the operator that
        // invokes this closure.
        left: Box::new(move |key, lhs, rhs, emit| unsafe {
            join(key.downcast(), lhs.downcast(), rhs.downcast())
                .into_iter()
                .for_each(|out| {
                    *left_key.downcast_mut() = out;
                    emit(left_key.as_mut(), left_unit.as_mut());
                });
        }),
        // SAFETY: same reasoning as for `left`, with `right_key` as the
        // output slot. Note the swapped parameter order: the second argument
        // is the `I2` value and the third is the `I1` value.
        right: Box::new(move |key, rhs, lhs, emit| unsafe {
            join_for_right(key.downcast(), lhs.downcast(), rhs.downcast())
                .into_iter()
                .for_each(|out| {
                    *right_key.downcast_mut() = out;
                    emit(right_key.as_mut(), right_unit.as_mut());
                });
        }),
    }
}
/// Builds the type-erased [`TraceJoinFuncs`] pair for a join closure that
/// yields any number of `(key, value)` output pairs per invocation.
///
/// Two closures are produced:
///
/// * `left` receives `(key, v1, v2, callback)` and passes the downcast values
///   to `join` in that order.
/// * `right` receives `(key, v2, v1, callback)` - the two values arrive
///   swapped - and restores the `(key, v1, v2)` order before calling a clone
///   of `join`.
///
/// Each closure owns one output key box and one output value box that are
/// created once here and overwritten for every pair the iterator yields; the
/// callback is then invoked with mutable references to those boxes.
pub(crate) fn mk_trace_join_generic_funcs<I1, I2, Z, F, It>(
    join: F,
) -> TraceJoinFuncs<I1::DynK, I1::DynV, I2::DynV, Z::DynK, Z::DynV>
where
    I1: IndexedZSet,
    I2: IndexedZSet<Key = I1::Key, DynK = I1::DynK>,
    Z: IndexedZSet,
    Box<Z::DynK>: Clone,
    Box<Z::DynV>: Clone,
    F: Fn(&I1::Key, &I1::Val, &I2::Val) -> It + Clone + 'static,
    It: IntoIterator<Item = (Z::Key, Z::Val)> + 'static,
{
    // Scratch output slots; one set per closure because each closure takes
    // ownership of what it captures.
    let mut left_key: Box<Z::DynK> = Box::<Z::Key>::default().erase_box();
    let mut left_val: Box<Z::DynV> = Box::<Z::Val>::default().erase_box();
    let mut right_key: Box<Z::DynK> = Box::<Z::Key>::default().erase_box();
    let mut right_val: Box<Z::DynV> = Box::<Z::Val>::default().erase_box();

    let join_for_right = join.clone();

    TraceJoinFuncs {
        // SAFETY: `left_key` / `left_val` were created above by erasing a
        // `Z::Key` / `Z::Val`, so the mutable downcasts target the types they
        // were built from. The downcasts of `key`, `lhs` and `rhs` assume the
        // caller passes values whose concrete types are `I1::Key`, `I1::Val`
        // and `I2::Val` - NOTE(review): not checked here; confirm against the
        // operator that invokes this closure.
        left: Box::new(move |key, lhs, rhs, emit| unsafe {
            join(key.downcast(), lhs.downcast(), rhs.downcast())
                .into_iter()
                .for_each(|(out_key, out_val)| {
                    *left_key.downcast_mut() = out_key;
                    *left_val.downcast_mut() = out_val;
                    emit(left_key.as_mut(), left_val.as_mut());
                });
        }),
        // SAFETY: same reasoning as for `left`, with `right_key` /
        // `right_val` as the output slots. Note the swapped parameter order:
        // the second argument is the `I2` value and the third is the `I1`
        // value.
        right: Box::new(move |key, rhs, lhs, emit| unsafe {
            join_for_right(key.downcast(), lhs.downcast(), rhs.downcast())
                .into_iter()
                .for_each(|(out_key, out_val)| {
                    *right_key.downcast_mut() = out_key;
                    *right_val.downcast_mut() = out_val;
                    emit(right_key.as_mut(), right_val.as_mut());
                });
        }),
    }
}
impl<C, K1, V1> Stream<C, OrdIndexedZSet<K1, V1>>
where
C: Circuit,
K1: DBData,
V1: DBData,
{
#[cfg(not(feature = "backend-mode"))]
#[track_caller]
pub fn join<F, V2, V>(
&self,
other: &Stream<C, OrdIndexedZSet<K1, V2>>,
join: F,
) -> Stream<C, OrdZSet<V>>
where
V2: DBData,
V: DBData,
F: Fn(&K1, &V1, &V2) -> V + Clone + 'static,
{
let join_funcs =
mk_trace_join_funcs::<OrdIndexedZSet<K1, V1>, OrdIndexedZSet<K1, V2>, OrdZSet<V>, _>(
join,
);
let join_factories = JoinFactories::new::<K1, V1, V2, V, ()>();
self.inner()
.dyn_join(&join_factories, &other.inner(), join_funcs)
.typed()
}
#[cfg(not(feature = "backend-mode"))]
#[track_caller]
pub fn join_flatmap<F, V2, V, It>(
&self,
other: &Stream<C, OrdIndexedZSet<K1, V2>>,
join: F,
) -> Stream<C, OrdZSet<V>>
where
V2: DBData,
V: DBData,
F: Fn(&K1, &V1, &V2) -> It + Clone + 'static,
It: IntoIterator<Item = V> + 'static,
{
let join_funcs = mk_trace_join_flatmap_funcs::<
OrdIndexedZSet<K1, V1>,
OrdIndexedZSet<K1, V2>,
OrdZSet<V>,
_,
It,
>(join);
let join_factories = JoinFactories::new::<K1, V1, V2, V, ()>();
self.inner()
.dyn_join(&join_factories, &other.inner(), join_funcs)
.typed()
}
#[cfg(not(feature = "backend-mode"))]
#[track_caller]
pub fn join_index<F, V2, K, V, It>(
&self,
other: &Stream<C, OrdIndexedZSet<K1, V2>>,
join: F,
) -> Stream<C, OrdIndexedZSet<K, V>>
where
V2: DBData,
K: DBData,
V: DBData,
F: Fn(&K1, &V1, &V2) -> It + Clone + 'static,
It: IntoIterator<Item = (K, V)> + 'static,
{
let join_funcs = mk_trace_join_generic_funcs::<
OrdIndexedZSet<K1, V1>,
OrdIndexedZSet<K1, V2>,
OrdIndexedZSet<K, V>,
_,
_,
>(join);
let join_factories = JoinFactories::new::<K1, V1, V2, K, V>();
self.inner()
.dyn_join_index(&join_factories, &other.inner(), join_funcs)
.typed()
}
#[track_caller]
pub fn outer_join_default<F, V2, O>(
&self,
other: &Stream<C, OrdIndexedZSet<K1, V2>>,
join_func: F,
) -> Stream<C, OrdZSet<O>>
where
V2: DBData,
O: DBData,
F: Fn(&K1, &V1, &V2) -> O + Clone + 'static,
{
let join_funcs =
mk_trace_join_funcs::<OrdIndexedZSet<K1, V1>, OrdIndexedZSet<K1, V2>, OrdZSet<O>, _>(
join_func,
);
let factories = OuterJoinFactories::new::<K1, V1, V2, O>();
self.inner()
.dyn_outer_join_default(&factories, &other.inner(), join_funcs)
.typed()
}
}
impl<C, I1> Stream<C, I1>
where
C: Circuit,
I1: IndexedZSet,
I1::InnerBatch: Send,
{
#[track_caller]
pub fn stream_join<F, I2, V>(&self, other: &Stream<C, I2>, join: F) -> Stream<C, OrdZSet<V>>
where
I2: IndexedZSet<Key = I1::Key, DynK = I1::DynK>,
I2::InnerBatch: Send,
V: DBData,
F: Fn(&I1::Key, &I1::Val, &I2::Val) -> V + Clone + 'static,
{
let factories = StreamJoinFactories::new::<I1::Key, I1::Val, I2::Val, V, ()>();
self.inner()
.dyn_stream_join(
&factories,
&other.inner(),
Box::new(move |k, v1, v2, res: &mut DynData, _| unsafe {
*res.downcast_mut() = join(k.downcast(), v1.downcast(), v2.downcast())
}),
)
.typed()
}
#[track_caller]
pub fn stream_join_generic<F, I2, Z>(&self, other: &Stream<C, I2>, join: F) -> Stream<C, Z>
where
I2: IndexedZSet<Key = I1::Key, DynK = I1::DynK>,
I2::InnerBatch: Send,
Z: IndexedZSet,
F: Fn(&I1::Key, &I1::Val, &I2::Val) -> (Z::Key, Z::Val) + Clone + 'static,
{
let factories = StreamJoinFactories::new::<I1::Key, I1::Val, I2::Val, Z::Key, Z::Val>();
self.inner()
.dyn_stream_join_generic(
&factories,
&other.inner(),
Box::new(
move |k, v1, v2, resk: &mut Z::DynK, resv: &mut Z::DynV| unsafe {
(*resk.downcast_mut(), *resv.downcast_mut()) =
join(k.downcast(), v1.downcast(), v2.downcast())
},
),
)
.typed()
}
#[track_caller]
pub fn monotonic_stream_join<F, I2, Z>(&self, other: &Stream<C, I2>, join: F) -> Stream<C, Z>
where
I1: IndexedZSet,
I2: IndexedZSet<Key = I1::Key, DynK = I1::DynK>,
I2::InnerBatch: Send,
Z: ZSet,
F: Fn(&I1::Key, &I1::Val, &I2::Val) -> Z::Key + Clone + 'static,
{
let factories = StreamJoinFactories::new::<I1::Key, I1::Val, I2::Val, Z::Key, ()>();
self.inner()
.dyn_monotonic_stream_join(
&factories,
&other.inner(),
Box::new(move |k, v1, v2, res: &mut Z::DynK, _| unsafe {
*res.downcast_mut() = join(k.downcast(), v1.downcast(), v2.downcast())
}),
)
.typed()
}
#[track_caller]
pub fn stream_antijoin<I2>(&self, other: &Stream<C, I2>) -> Stream<C, I1>
where
I1: IndexedZSet,
I2: IndexedZSet<Key = I1::Key, DynK = I1::DynK>,
I2::Inner: DynFilterMap,
{
let factories = StreamAntijoinFactories::new::<I1::Key, I1::Val, ()>();
self.inner()
.dyn_stream_antijoin(&factories, &other.inner())
.typed()
}
#[track_caller]
pub fn join_generic<I2, F, Z, It>(&self, other: &Stream<C, I2>, join: F) -> Stream<C, Z>
where
I2: IndexedZSet<Key = I1::Key, DynK = I1::DynK>,
I2::InnerBatch: Send,
Z: IndexedZSet,
Box<Z::DynK>: Clone,
Box<Z::DynV>: Clone,
F: Fn(&I1::Key, &I1::Val, &I2::Val) -> It + Clone + 'static,
It: IntoIterator<Item = (Z::Key, Z::Val)> + 'static,
{
let factories = JoinFactories::new::<I1::Key, I1::Val, I2::Val, Z::Key, Z::Val>();
let join_funcs = mk_trace_join_generic_funcs::<I1, I2, Z, _, _>(join);
self.inner()
.dyn_join_generic(&factories, &other.inner(), join_funcs)
.typed()
}
#[cfg(not(feature = "backend-mode"))]
#[track_caller]
pub fn antijoin<I2>(&self, other: &Stream<C, I2>) -> Stream<C, I1>
where
I2: IndexedZSet<Key = I1::Key, DynK = I1::DynK>,
I2::InnerBatch: Send + DynFilterMap,
Box<I1::DynK>: Clone,
Box<I1::DynV>: Clone,
{
let factories =
crate::operator::dynamic::join::AntijoinFactories::new::<I1::Key, I1::Val>();
self.inner()
.dyn_antijoin(&factories, &other.inner())
.typed()
}
#[track_caller]
pub fn outer_join<I2, F, FL, FR, O>(
&self,
other: &Stream<C, I2>,
join_func: F,
left_func: FL,
right_func: FR,
) -> Stream<C, OrdZSet<O>>
where
I2: IndexedZSet<Key = I1::Key, DynK = I1::DynK>,
I1::Inner: DynFilterMap,
I2::Inner: DynFilterMap,
I2::InnerBatch: Send,
O: DBData,
Box<I1::DynK>: Clone,
Box<I1::DynV>: Clone,
Box<I2::DynV>: Clone,
F: Fn(&I1::Key, &I1::Val, &I2::Val) -> O + Clone + 'static,
for<'a> FL: Fn(&I1::Key, &I1::Val) -> O + Clone + 'static,
for<'a> FR: Fn(&I2::Key, &I2::Val) -> O + Clone + 'static,
{
let factories = OuterJoinFactories::new::<I1::Key, I1::Val, I2::Val, O>();
let join_funcs = mk_trace_join_funcs::<I1, I2, OrdZSet<O>, _>(join_func);
self.inner()
.dyn_outer_join(
&factories,
&other.inner(),
join_funcs,
Box::new(move |item, cb| {
let (k, v) = I1::Inner::item_ref_keyval(item);
let mut out = unsafe { left_func(k.downcast(), v.downcast()) };
cb(out.erase_mut(), ().erase_mut());
}),
Box::new(move |item, cb| {
let (k, v) = I2::Inner::item_ref_keyval(item);
let mut out = unsafe { right_func(k.downcast(), v.downcast()) };
cb(out.erase_mut(), ().erase_mut());
}),
)
.typed()
}
}