noir_compute/operator/join/ship.rs

1#![allow(clippy::type_complexity)]
2
3use std::marker::PhantomData;
4
5use crate::block::NextStrategy;
6use crate::operator::join::local_hash::JoinStreamLocalHash;
7use crate::operator::join::local_sort_merge::JoinStreamLocalSortMerge;
8use crate::operator::join::JoinStream;
9use crate::operator::start::{BinaryStartOperator, Start};
10use crate::operator::{Data, DataKey, ExchangeData, KeyerFn, Operator};
11use crate::stream::Stream;
12
/// Marker type recording that _hash_ was selected as the ship strategy.
///
/// It carries no data; it only appears as a type parameter of the local-strategy builders.
/// The extra derives (`Debug`, `Default`, `PartialEq`, `Eq`, `Hash`) make the marker usable in
/// diagnostics and generic code, at no cost for a zero-sized type.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
pub struct ShipHash;
16
/// Marker type recording that _broadcast_right_ was selected as the ship strategy.
///
/// It carries no data; it only appears as a type parameter of the local-strategy builders.
/// The extra derives (`Debug`, `Default`, `PartialEq`, `Eq`, `Hash`) make the marker usable in
/// diagnostics and generic code, at no cost for a zero-sized type.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
pub struct ShipBroadcastRight;
20
21/// Marker trait for the ship strategy marker types.
22pub trait ShipStrategy: Clone + Send {}
23
24impl ShipStrategy for ShipHash {}
25impl ShipStrategy for ShipBroadcastRight {}
26
27/// This is an intermediate type for building a join operator.
28///
29/// The ship strategy has been selected as hash, and now the local strategy has to be selected.
30pub struct JoinStreamShipHash<Key: DataKey, Out1: ExchangeData, Out2: ExchangeData, Keyer1, Keyer2>
31where
32    Keyer1: KeyerFn<Key, Out1>,
33    Keyer2: KeyerFn<Key, Out2>,
34{
35    inner: Stream<BinaryStartOperator<Out1, Out2>>,
36    keyer1: Keyer1,
37    keyer2: Keyer2,
38    _key: PhantomData<Key>,
39}
40
41/// This is an intermediate type for building a join operator.
42///
43/// The ship strategy has been selected as broadcast_right, and now the local strategy has to be
44/// selected.
45pub struct JoinStreamShipBroadcastRight<
46    Key: Data,
47    Out1: ExchangeData,
48    Out2: ExchangeData,
49    Keyer1,
50    Keyer2,
51> where
52    Keyer1: KeyerFn<Key, Out1>,
53    Keyer2: KeyerFn<Key, Out2>,
54{
55    inner: Stream<BinaryStartOperator<Out1, Out2>>,
56    keyer1: Keyer1,
57    keyer2: Keyer2,
58    _key: PhantomData<Key>,
59}
60
61impl<Key: DataKey, Out1: ExchangeData, Out2: ExchangeData, Keyer1, Keyer2>
62    JoinStreamShipHash<Key, Out1, Out2, Keyer1, Keyer2>
63where
64    Keyer1: KeyerFn<Key, Out1>,
65    Keyer2: KeyerFn<Key, Out2>,
66{
67    pub(crate) fn new<OperatorChain1, OperatorChain2>(
68        prev: JoinStream<Key, Out1, Out2, OperatorChain1, OperatorChain2, Keyer1, Keyer2>,
69    ) -> Self
70    where
71        OperatorChain1: Operator<Out = Out1> + 'static,
72        OperatorChain2: Operator<Out = Out2> + 'static,
73    {
74        let keyer1 = prev.keyer1;
75        let keyer2 = prev.keyer2;
76        let next_strategy1 = NextStrategy::group_by(keyer1.clone());
77        let next_strategy2 = NextStrategy::group_by(keyer2.clone());
78        let inner =
79            prev.lhs
80                .binary_connection(prev.rhs, Start::multiple, next_strategy1, next_strategy2);
81        JoinStreamShipHash {
82            inner,
83            keyer1,
84            keyer2,
85            _key: Default::default(),
86        }
87    }
88
89    /// Select _local hash_ as local strategy.
90    ///
91    /// An hash-table will be used to generate the join tuples.
92    pub fn local_hash(self) -> JoinStreamLocalHash<Key, Out1, Out2, Keyer1, Keyer2, ShipHash> {
93        JoinStreamLocalHash::new(self.inner, self.keyer1, self.keyer2)
94    }
95
96    /// Select _sort-merge_ as local strategy.
97    ///
98    /// The tuples will be collected and sorted, then the tuples are generated.
99    pub fn local_sort_merge(
100        self,
101    ) -> JoinStreamLocalSortMerge<Key, Out1, Out2, Keyer1, Keyer2, ShipHash>
102    where
103        Key: Ord,
104    {
105        JoinStreamLocalSortMerge::new(self.inner, self.keyer1, self.keyer2)
106    }
107}
108
109impl<Key: Data, Out1: ExchangeData, Out2: ExchangeData, Keyer1, Keyer2>
110    JoinStreamShipBroadcastRight<Key, Out1, Out2, Keyer1, Keyer2>
111where
112    Keyer1: KeyerFn<Key, Out1>,
113    Keyer2: KeyerFn<Key, Out2>,
114{
115    pub(crate) fn new<OperatorChain1, OperatorChain2>(
116        prev: JoinStream<Key, Out1, Out2, OperatorChain1, OperatorChain2, Keyer1, Keyer2>,
117    ) -> Self
118    where
119        OperatorChain1: Operator<Out = Out1> + 'static,
120        OperatorChain2: Operator<Out = Out2> + 'static,
121    {
122        let keyer1 = prev.keyer1;
123        let keyer2 = prev.keyer2;
124        let inner = prev.lhs.binary_connection(
125            prev.rhs,
126            Start::multiple,
127            NextStrategy::only_one(),
128            NextStrategy::all(),
129        );
130        JoinStreamShipBroadcastRight {
131            inner,
132            keyer1,
133            keyer2,
134            _key: Default::default(),
135        }
136    }
137
138    /// Select _local hash_ as local strategy.
139    ///
140    /// An hash-table will be used to generate the join tuples.
141    pub fn local_hash(
142        self,
143    ) -> JoinStreamLocalHash<Key, Out1, Out2, Keyer1, Keyer2, ShipBroadcastRight>
144    where
145        Key: DataKey,
146    {
147        JoinStreamLocalHash::new(self.inner, self.keyer1, self.keyer2)
148    }
149
150    /// Select _sort-merge_ as local strategy.
151    ///
152    /// The tuples will be collected and sorted, then the tuples are generated.
153    pub fn local_sort_merge(
154        self,
155    ) -> JoinStreamLocalSortMerge<Key, Out1, Out2, Keyer1, Keyer2, ShipBroadcastRight>
156    where
157        Key: Ord,
158    {
159        JoinStreamLocalSortMerge::new(self.inner, self.keyer1, self.keyer2)
160    }
161}