1use multimap::MultiMap;
4use pin_project::pin_project;
5use serde_closure::FnMutNamed;
6use std::{
7 hash::Hash, pin::Pin, task::{Context, Poll}, vec
8};
9
10use super::{FilterMapSync, MapSync, ParallelPipe, ParallelStream};
11
12#[pin_project]
13#[must_use]
14pub struct LeftJoin<P, K, V1, V2> {
15 #[pin]
16 right: MapSync<P, LeftJoinClosure<K, V1, V2>>,
17}
18
19impl<P, K, V1, V2> LeftJoin<P, K, V1, V2> {
20 pub fn new(pipe: P, right: MultiMap<K, V2>) -> Self {
21 Self {
22 right: MapSync::new(pipe, LeftJoinClosure::new(right)),
23 }
24 }
25}
26
27impl_par_dist! {
28 impl<P, K, V1, V2> ParallelStream for LeftJoin<P, K, V1, V2>
29 where
30 P: ParallelStream<Item = (K, V1)>,
31 K: Eq + Hash + Clone + Send + 'static,
32 V1: 'static,
33 V2: Clone + Send + 'static,
34 {
35 type Item = (K, V1, ImplIter<V2>);
36 type Task = <MapSync<P, LeftJoinClosure<K, V1, V2>> as ParallelStream>::Task;
37
38 fn size_hint(&self) -> (usize, Option<usize>) {
39 self.right.size_hint()
40 }
41 fn next_task(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Task>> {
42 self.project().right.next_task(cx)
43 }
44 }
45
46 impl<P, K, V1, V2, Input> ParallelPipe<Input> for LeftJoin<P, K, V1, V2>
47 where
48 P: ParallelPipe<Input, Output = (K, V1)>,
49 K: Eq + Hash + Clone + Send + 'static,
50 V1: 'static,
51 V2: Clone + Send + 'static,
52 {
53 type Output = (K, V1, ImplIter<V2>);
54 type Task = <MapSync<P, LeftJoinClosure<K, V1, V2>> as ParallelPipe<Input>>::Task;
55
56 fn task(&self) -> Self::Task {
57 self.right.task()
58 }
59 }
60}
61
62FnMutNamed! {
63 pub type LeftJoinClosure<K, V1, V2> = |self, right: MultiMap<K, V2>|item=> (K, V1)| -> (K, V1, ImplIter<V2>) where ; where K: Eq, K: Hash, V2: Clone {
64 let v2 = self.right.get_vec(&item.0).map_or_else(Vec::new, Clone::clone).into_iter();
65 (item.0, item.1, ImplIter(v2))
66 }
67}
68
69#[pin_project]
70#[must_use]
71pub struct InnerJoin<P, K, V1, V2> {
72 #[pin]
73 right: FilterMapSync<P, InnerJoinClosure<K, V1, V2>>,
74}
75
76impl<P, K, V1, V2> InnerJoin<P, K, V1, V2> {
77 pub fn new(pipe: P, right: MultiMap<K, V2>) -> Self {
78 Self {
79 right: FilterMapSync::new(pipe, InnerJoinClosure::new(right)),
80 }
81 }
82}
83
84impl_par_dist! {
85 impl<P, K, V1, V2> ParallelStream for InnerJoin<P, K, V1, V2>
86 where
87 P: ParallelStream<Item = (K, V1)>,
88 K: Eq + Hash + Clone + Send + 'static,
89 V1: 'static,
90 V2: Clone + Send + 'static,
91 {
92 type Item = (K, ImplIter<V1>, ImplIter<V2>);
93 type Task = <FilterMapSync<P, InnerJoinClosure<K, V1, V2>> as ParallelStream>::Task;
94
95 fn size_hint(&self) -> (usize, Option<usize>) {
96 self.right.size_hint()
97 }
98 fn next_task(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Task>> {
99 self.project().right.next_task(cx)
100 }
101 }
102
103 impl<P, K, V1, V2, Input> ParallelPipe<Input> for InnerJoin<P, K, V1, V2>
104 where
105 P: ParallelPipe<Input, Output = (K, V1)>,
106 K: Eq + Hash + Clone + Send + 'static,
107 V1: 'static,
108 V2: Clone + Send + 'static,
109 {
110 type Output = (K, ImplIter<V1>, ImplIter<V2>);
111 type Task = <FilterMapSync<P, InnerJoinClosure<K, V1, V2>> as ParallelPipe<Input>>::Task;
112
113 fn task(&self) -> Self::Task {
114 self.right.task()
115 }
116 }
117}
118
119FnMutNamed! {
120 pub type InnerJoinClosure<K, V1, V2> = |self, right: MultiMap<K, V2>|item=> (K, V1)| -> Option<(K, ImplIter<V1>, ImplIter<V2>)> where ; where K: Eq, K: Hash, V2: Clone {
121 self.right.get_vec(&item.0).map(|v2| {
122 (item.0, ImplIter(vec![item.1].into_iter()), ImplIter(v2.clone().into_iter()))
123 })
124 }
125}
126
/// Owning iterator over the values produced for one side of a join.
///
/// A thin newtype around [`vec::IntoIter`] that keeps the concrete iterator
/// type out of the join item signatures.
pub struct ImplIter<T>(vec::IntoIter<T>);

impl<T> Iterator for ImplIter<T> {
    type Item = T;

    /// Yields the next value from the wrapped vector iterator.
    fn next(&mut self) -> Option<Self::Item> {
        self.0.next()
    }

    /// Forwards the exact remaining length of the wrapped iterator.
    ///
    /// Without this override the default `(0, None)` hint would be reported,
    /// so consumers such as `collect` could not preallocate.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.0.size_hint()
    }
}