noir_compute/operator/join/
ship.rs1#![allow(clippy::type_complexity)]
2
3use std::marker::PhantomData;
4
5use crate::block::NextStrategy;
6use crate::operator::join::local_hash::JoinStreamLocalHash;
7use crate::operator::join::local_sort_merge::JoinStreamLocalSortMerge;
8use crate::operator::join::JoinStream;
9use crate::operator::start::{BinaryStartOperator, Start};
10use crate::operator::{Data, DataKey, ExchangeData, KeyerFn, Operator};
11use crate::stream::Stream;
12
/// Zero-sized tag type naming the hash shipping strategy.
///
/// It carries no data; it is only used at the type level, as the strategy
/// parameter of the join types returned by `JoinStreamShipHash::local_hash`
/// and `JoinStreamShipHash::local_sort_merge`.
#[derive(Debug, Clone, Copy)]
pub struct ShipHash;
16
/// Zero-sized tag type naming the "broadcast right" shipping strategy.
///
/// It carries no data; it is only used at the type level, as the strategy
/// parameter of the join types returned by
/// `JoinStreamShipBroadcastRight::local_hash` and
/// `JoinStreamShipBroadcastRight::local_sort_merge`.
#[derive(Debug, Clone, Copy)]
pub struct ShipBroadcastRight;
20
/// Marker trait for the type-level tags that name a shipping strategy.
///
/// The trait declares no methods; implementors only need to be `Send` and
/// `Clone`.
pub trait ShipStrategy: Send + Clone {}
23
24impl ShipStrategy for ShipHash {}
25impl ShipStrategy for ShipBroadcastRight {}
26
27pub struct JoinStreamShipHash<Key: DataKey, Out1: ExchangeData, Out2: ExchangeData, Keyer1, Keyer2>
31where
32 Keyer1: KeyerFn<Key, Out1>,
33 Keyer2: KeyerFn<Key, Out2>,
34{
35 inner: Stream<BinaryStartOperator<Out1, Out2>>,
36 keyer1: Keyer1,
37 keyer2: Keyer2,
38 _key: PhantomData<Key>,
39}
40
41pub struct JoinStreamShipBroadcastRight<
46 Key: Data,
47 Out1: ExchangeData,
48 Out2: ExchangeData,
49 Keyer1,
50 Keyer2,
51> where
52 Keyer1: KeyerFn<Key, Out1>,
53 Keyer2: KeyerFn<Key, Out2>,
54{
55 inner: Stream<BinaryStartOperator<Out1, Out2>>,
56 keyer1: Keyer1,
57 keyer2: Keyer2,
58 _key: PhantomData<Key>,
59}
60
61impl<Key: DataKey, Out1: ExchangeData, Out2: ExchangeData, Keyer1, Keyer2>
62 JoinStreamShipHash<Key, Out1, Out2, Keyer1, Keyer2>
63where
64 Keyer1: KeyerFn<Key, Out1>,
65 Keyer2: KeyerFn<Key, Out2>,
66{
67 pub(crate) fn new<OperatorChain1, OperatorChain2>(
68 prev: JoinStream<Key, Out1, Out2, OperatorChain1, OperatorChain2, Keyer1, Keyer2>,
69 ) -> Self
70 where
71 OperatorChain1: Operator<Out = Out1> + 'static,
72 OperatorChain2: Operator<Out = Out2> + 'static,
73 {
74 let keyer1 = prev.keyer1;
75 let keyer2 = prev.keyer2;
76 let next_strategy1 = NextStrategy::group_by(keyer1.clone());
77 let next_strategy2 = NextStrategy::group_by(keyer2.clone());
78 let inner =
79 prev.lhs
80 .binary_connection(prev.rhs, Start::multiple, next_strategy1, next_strategy2);
81 JoinStreamShipHash {
82 inner,
83 keyer1,
84 keyer2,
85 _key: Default::default(),
86 }
87 }
88
89 pub fn local_hash(self) -> JoinStreamLocalHash<Key, Out1, Out2, Keyer1, Keyer2, ShipHash> {
93 JoinStreamLocalHash::new(self.inner, self.keyer1, self.keyer2)
94 }
95
96 pub fn local_sort_merge(
100 self,
101 ) -> JoinStreamLocalSortMerge<Key, Out1, Out2, Keyer1, Keyer2, ShipHash>
102 where
103 Key: Ord,
104 {
105 JoinStreamLocalSortMerge::new(self.inner, self.keyer1, self.keyer2)
106 }
107}
108
109impl<Key: Data, Out1: ExchangeData, Out2: ExchangeData, Keyer1, Keyer2>
110 JoinStreamShipBroadcastRight<Key, Out1, Out2, Keyer1, Keyer2>
111where
112 Keyer1: KeyerFn<Key, Out1>,
113 Keyer2: KeyerFn<Key, Out2>,
114{
115 pub(crate) fn new<OperatorChain1, OperatorChain2>(
116 prev: JoinStream<Key, Out1, Out2, OperatorChain1, OperatorChain2, Keyer1, Keyer2>,
117 ) -> Self
118 where
119 OperatorChain1: Operator<Out = Out1> + 'static,
120 OperatorChain2: Operator<Out = Out2> + 'static,
121 {
122 let keyer1 = prev.keyer1;
123 let keyer2 = prev.keyer2;
124 let inner = prev.lhs.binary_connection(
125 prev.rhs,
126 Start::multiple,
127 NextStrategy::only_one(),
128 NextStrategy::all(),
129 );
130 JoinStreamShipBroadcastRight {
131 inner,
132 keyer1,
133 keyer2,
134 _key: Default::default(),
135 }
136 }
137
138 pub fn local_hash(
142 self,
143 ) -> JoinStreamLocalHash<Key, Out1, Out2, Keyer1, Keyer2, ShipBroadcastRight>
144 where
145 Key: DataKey,
146 {
147 JoinStreamLocalHash::new(self.inner, self.keyer1, self.keyer2)
148 }
149
150 pub fn local_sort_merge(
154 self,
155 ) -> JoinStreamLocalSortMerge<Key, Out1, Out2, Keyer1, Keyer2, ShipBroadcastRight>
156 where
157 Key: Ord,
158 {
159 JoinStreamLocalSortMerge::new(self.inner, self.keyer1, self.keyer2)
160 }
161}