use std::marker::PhantomData;
pub use local_hash::JoinStreamLocalHash;
pub use local_sort_merge::JoinStreamLocalSortMerge;
pub use ship::{ShipBroadcastRight, ShipHash, ShipStrategy};
pub use crate::operator::join::ship::{JoinStreamShipBroadcastRight, JoinStreamShipHash};
use crate::operator::{Data, DataKey, ExchangeData, KeyerFn, Operator};
use crate::stream::{KeyedStream, Stream};
mod keyed_join;
mod local_hash;
mod local_sort_merge;
mod ship;
/// Value produced by an inner join: both sides are always present.
pub type InnerJoinTuple<Lhs, Rhs> = (Lhs, Rhs);
/// Value produced by a left join: the left side is always present, the right side is optional.
pub type LeftJoinTuple<Lhs, Rhs> = (Lhs, Option<Rhs>);
/// Value produced by a full outer join: either side may be missing.
pub type OuterJoinTuple<Lhs, Rhs> = (Option<Lhs>, Option<Rhs>);
/// Which kind of join to compute.
///
/// Each variant corresponds to one of the output tuple shapes (`InnerJoinTuple`,
/// `LeftJoinTuple`, `OuterJoinTuple`).
#[derive(Clone, Debug)]
pub(crate) enum JoinVariant {
    Inner,
    Left,
    Outer,
}

impl JoinVariant {
    /// Returns `true` for `Left` and `Outer`, `false` for `Inner`.
    ///
    /// NOTE(review): presumably tells the local join operator to keep left-hand items
    /// that found no match (right side `None`); confirm against the operators.
    pub(crate) fn left_outer(&self) -> bool {
        // Exhaustive on purpose: adding a variant must force a decision here.
        match self {
            JoinVariant::Left | JoinVariant::Outer => true,
            JoinVariant::Inner => false,
        }
    }

    /// Returns `true` only for `Outer`.
    ///
    /// NOTE(review): presumably tells the local join operator to keep right-hand items
    /// that found no match (left side `None`); confirm against the operators.
    pub(crate) fn right_outer(&self) -> bool {
        // Exhaustive on purpose: adding a variant must force a decision here.
        match self {
            JoinVariant::Outer => true,
            JoinVariant::Inner | JoinVariant::Left => false,
        }
    }
}
/// A join between two streams whose keyers are set but whose execution strategy is not yet chosen.
///
/// Built by `Stream::join_with`. A ship strategy (`ship_hash` or `ship_broadcast_right`)
/// must be picked next to continue building the join.
pub struct JoinStream<Key, Out1, Out2, OperatorChain1, OperatorChain2, Keyer1, Keyer2>
where
    Out1: ExchangeData,
    Out2: ExchangeData,
    OperatorChain1: Operator<Out = Out1>,
    OperatorChain2: Operator<Out = Out2>,
    Keyer1: KeyerFn<Key, Out1>,
    Keyer2: KeyerFn<Key, Out2>,
{
    /// Left-hand input stream (the one `join_with` was called on).
    pub(crate) lhs: Stream<OperatorChain1>,
    /// Right-hand input stream.
    pub(crate) rhs: Stream<OperatorChain2>,
    /// Extracts the join key from left-hand items.
    pub(crate) keyer1: Keyer1,
    /// Extracts the join key from right-hand items.
    pub(crate) keyer2: Keyer2,
    /// Zero-sized marker that makes `Key` a used type parameter; stores no data.
    _key: PhantomData<Key>,
}
impl<Out: ExchangeData, OperatorChain> Stream<OperatorChain>
where
OperatorChain: Operator<Out = Out> + 'static,
{
pub fn join<Out2: ExchangeData, OperatorChain2, Key, Keyer1, Keyer2>(
self,
rhs: Stream<OperatorChain2>,
keyer1: Keyer1,
keyer2: Keyer2,
) -> KeyedStream<impl Operator<Out = (Key, InnerJoinTuple<Out, Out2>)>>
where
Key: DataKey,
OperatorChain2: Operator<Out = Out2> + 'static,
Keyer1: Fn(&Out) -> Key + KeyerFn<Key, Out>,
Keyer2: Fn(&Out2) -> Key + KeyerFn<Key, Out2>,
{
self.join_with(rhs, keyer1, keyer2)
.ship_hash()
.local_hash()
.inner()
}
pub fn left_join<Out2: ExchangeData, OperatorChain2, Key, Keyer1, Keyer2>(
self,
rhs: Stream<OperatorChain2>,
keyer1: Keyer1,
keyer2: Keyer2,
) -> KeyedStream<impl Operator<Out = (Key, LeftJoinTuple<Out, Out2>)>>
where
Key: DataKey,
OperatorChain2: Operator<Out = Out2> + 'static,
Keyer1: Fn(&Out) -> Key + KeyerFn<Key, Out>,
Keyer2: Fn(&Out2) -> Key + KeyerFn<Key, Out2>,
{
self.join_with(rhs, keyer1, keyer2)
.ship_hash()
.local_hash()
.left()
}
pub fn outer_join<Out2: ExchangeData, OperatorChain2, Key, Keyer1, Keyer2>(
self,
rhs: Stream<OperatorChain2>,
keyer1: Keyer1,
keyer2: Keyer2,
) -> KeyedStream<impl Operator<Out = (Key, OuterJoinTuple<Out, Out2>)>>
where
Key: DataKey,
OperatorChain2: Operator<Out = Out2> + 'static,
Keyer1: Fn(&Out) -> Key + KeyerFn<Key, Out>,
Keyer2: Fn(&Out2) -> Key + KeyerFn<Key, Out2>,
{
self.join_with(rhs, keyer1, keyer2)
.ship_hash()
.local_hash()
.outer()
}
pub fn join_with<Out2: ExchangeData, OperatorChain2, Key, Keyer1, Keyer2>(
self,
rhs: Stream<OperatorChain2>,
keyer1: Keyer1,
keyer2: Keyer2,
) -> JoinStream<Key, Out, Out2, OperatorChain, OperatorChain2, Keyer1, Keyer2>
where
OperatorChain2: Operator<Out = Out2>,
Keyer1: Fn(&Out) -> Key + KeyerFn<Key, Out>,
Keyer2: Fn(&Out2) -> Key + KeyerFn<Key, Out2>,
{
JoinStream {
lhs: self,
rhs,
keyer1,
keyer2,
_key: PhantomData,
}
}
}
impl<Key, Out1, Out2, OperatorChain1, OperatorChain2, Keyer1, Keyer2>
    JoinStream<Key, Out1, Out2, OperatorChain1, OperatorChain2, Keyer1, Keyer2>
where
    Key: Data,
    Out1: ExchangeData,
    Out2: ExchangeData,
    OperatorChain1: Operator<Out = Out1> + 'static,
    OperatorChain2: Operator<Out = Out2> + 'static,
    Keyer1: KeyerFn<Key, Out1>,
    Keyer2: KeyerFn<Key, Out2>,
{
    /// Selects the hash ship strategy and wraps `self` in a [`JoinStreamShipHash`].
    ///
    /// Unlike the enclosing impl, this requires `Key: DataKey`.
    /// NOTE(review): presumably both sides are partitioned by a hash of the key; confirm in
    /// the `ship` module.
    pub fn ship_hash(self) -> JoinStreamShipHash<Key, Out1, Out2, Keyer1, Keyer2>
    where
        Key: DataKey,
    {
        JoinStreamShipHash::new(self)
    }

    /// Selects the broadcast-right ship strategy and wraps `self` in a
    /// [`JoinStreamShipBroadcastRight`].
    ///
    /// Only the enclosing impl's `Key: Data` bound is required; `DataKey` is not needed.
    /// NOTE(review): presumably the right-hand stream is broadcast to every left-hand
    /// replica; confirm in the `ship` module.
    pub fn ship_broadcast_right(
        self,
    ) -> JoinStreamShipBroadcastRight<Key, Out1, Out2, Keyer1, Keyer2> {
        JoinStreamShipBroadcastRight::new(self)
    }
}