#![allow(clippy::type_complexity)]
use std::marker::PhantomData;
use crate::block::NextStrategy;
use crate::operator::join::local_hash::JoinStreamLocalHash;
use crate::operator::join::local_sort_merge::JoinStreamLocalSortMerge;
use crate::operator::join::JoinStream;
use crate::operator::start::{BinaryStartOperator, Start};
use crate::operator::{Data, DataKey, ExchangeData, KeyerFn, Operator};
use crate::stream::Stream;
#[derive(Clone, Copy)]
pub struct ShipHash;
#[derive(Clone, Copy)]
pub struct ShipBroadcastRight;
pub trait ShipStrategy: Clone + Send {}
impl ShipStrategy for ShipHash {}
impl ShipStrategy for ShipBroadcastRight {}
pub struct JoinStreamShipHash<Key: DataKey, Out1: ExchangeData, Out2: ExchangeData, Keyer1, Keyer2>
where
Keyer1: KeyerFn<Key, Out1>,
Keyer2: KeyerFn<Key, Out2>,
{
inner: Stream<BinaryStartOperator<Out1, Out2>>,
keyer1: Keyer1,
keyer2: Keyer2,
_key: PhantomData<Key>,
}
pub struct JoinStreamShipBroadcastRight<
Key: Data,
Out1: ExchangeData,
Out2: ExchangeData,
Keyer1,
Keyer2,
> where
Keyer1: KeyerFn<Key, Out1>,
Keyer2: KeyerFn<Key, Out2>,
{
inner: Stream<BinaryStartOperator<Out1, Out2>>,
keyer1: Keyer1,
keyer2: Keyer2,
_key: PhantomData<Key>,
}
impl<Key: DataKey, Out1: ExchangeData, Out2: ExchangeData, Keyer1, Keyer2>
JoinStreamShipHash<Key, Out1, Out2, Keyer1, Keyer2>
where
Keyer1: KeyerFn<Key, Out1>,
Keyer2: KeyerFn<Key, Out2>,
{
pub(crate) fn new<OperatorChain1, OperatorChain2>(
prev: JoinStream<Key, Out1, Out2, OperatorChain1, OperatorChain2, Keyer1, Keyer2>,
) -> Self
where
OperatorChain1: Operator<Out = Out1> + 'static,
OperatorChain2: Operator<Out = Out2> + 'static,
{
let keyer1 = prev.keyer1;
let keyer2 = prev.keyer2;
let next_strategy1 = NextStrategy::group_by(keyer1.clone());
let next_strategy2 = NextStrategy::group_by(keyer2.clone());
let inner =
prev.lhs
.binary_connection(prev.rhs, Start::multiple, next_strategy1, next_strategy2);
JoinStreamShipHash {
inner,
keyer1,
keyer2,
_key: Default::default(),
}
}
pub fn local_hash(self) -> JoinStreamLocalHash<Key, Out1, Out2, Keyer1, Keyer2, ShipHash> {
JoinStreamLocalHash::new(self.inner, self.keyer1, self.keyer2)
}
pub fn local_sort_merge(
self,
) -> JoinStreamLocalSortMerge<Key, Out1, Out2, Keyer1, Keyer2, ShipHash>
where
Key: Ord,
{
JoinStreamLocalSortMerge::new(self.inner, self.keyer1, self.keyer2)
}
}
impl<Key: Data, Out1: ExchangeData, Out2: ExchangeData, Keyer1, Keyer2>
JoinStreamShipBroadcastRight<Key, Out1, Out2, Keyer1, Keyer2>
where
Keyer1: KeyerFn<Key, Out1>,
Keyer2: KeyerFn<Key, Out2>,
{
pub(crate) fn new<OperatorChain1, OperatorChain2>(
prev: JoinStream<Key, Out1, Out2, OperatorChain1, OperatorChain2, Keyer1, Keyer2>,
) -> Self
where
OperatorChain1: Operator<Out = Out1> + 'static,
OperatorChain2: Operator<Out = Out2> + 'static,
{
let keyer1 = prev.keyer1;
let keyer2 = prev.keyer2;
let inner = prev.lhs.binary_connection(
prev.rhs,
Start::multiple,
NextStrategy::only_one(),
NextStrategy::all(),
);
JoinStreamShipBroadcastRight {
inner,
keyer1,
keyer2,
_key: Default::default(),
}
}
pub fn local_hash(
self,
) -> JoinStreamLocalHash<Key, Out1, Out2, Keyer1, Keyer2, ShipBroadcastRight>
where
Key: DataKey,
{
JoinStreamLocalHash::new(self.inner, self.keyer1, self.keyer2)
}
pub fn local_sort_merge(
self,
) -> JoinStreamLocalSortMerge<Key, Out1, Out2, Keyer1, Keyer2, ShipBroadcastRight>
where
Key: Ord,
{
JoinStreamLocalSortMerge::new(self.inner, self.keyer1, self.keyer2)
}
}