1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
use crate::{
Circuit, Stream,
dynamic::DynPair,
operator::dynamic::semijoin::SemijoinStreamFactories,
typed_batch::{IndexedZSet, ZSet},
};
impl<C, Pairs> Stream<C, Pairs>
where
C: Circuit,
Pairs: IndexedZSet,
Pairs::InnerBatch: Send,
{
/// Semijoin two streams of batches.
///
/// The operator takes two streams of batches indexed with the same key type
/// (`Pairs::Key = Keys::Key`) and outputs a stream obtained by joining each
/// pair of inputs.
///
/// Input streams will typically be produced by [`Stream::map_index()`].
///
/// #### Type arguments
///
/// * `Pairs` - batch type in the first input stream.
/// * `Keys` - batch type in the second input stream.
/// * `Out` - output Z-set type.
#[track_caller]
pub fn semijoin_stream<Keys, Out>(&self, keys: &Stream<C, Keys>) -> Stream<C, Out>
where
Keys: ZSet<Key = Pairs::Key, DynK = Pairs::DynK>,
Keys::InnerBatch: Send,
Out: ZSet<Key = (Pairs::Key, Pairs::Val), DynK = DynPair<Pairs::DynK, Pairs::DynV>>,
{
let factories = SemijoinStreamFactories::<Pairs::Inner, Keys::Inner, Out::Inner>::new::<
Pairs::Key,
Pairs::Val,
>();
self.inner()
.dyn_semijoin_stream(&factories, &keys.inner())
.typed()
}
}