amadeus_core/par_stream/
join.rs

1// TODO: remove the allocation
2
3use multimap::MultiMap;
4use pin_project::pin_project;
5use serde_closure::FnMutNamed;
6use std::{
7	hash::Hash, pin::Pin, task::{Context, Poll}, vec
8};
9
10use super::{FilterMapSync, MapSync, ParallelPipe, ParallelStream};
11
12#[pin_project]
13#[must_use]
14pub struct LeftJoin<P, K, V1, V2> {
15	#[pin]
16	right: MapSync<P, LeftJoinClosure<K, V1, V2>>,
17}
18
19impl<P, K, V1, V2> LeftJoin<P, K, V1, V2> {
20	pub fn new(pipe: P, right: MultiMap<K, V2>) -> Self {
21		Self {
22			right: MapSync::new(pipe, LeftJoinClosure::new(right)),
23		}
24	}
25}
26
27impl_par_dist! {
28	impl<P, K, V1, V2> ParallelStream for LeftJoin<P, K, V1, V2>
29	where
30		P: ParallelStream<Item = (K, V1)>,
31		K: Eq + Hash + Clone + Send + 'static,
32		V1: 'static,
33		V2: Clone + Send + 'static,
34	{
35		type Item = (K, V1, ImplIter<V2>);
36		type Task = <MapSync<P, LeftJoinClosure<K, V1, V2>> as ParallelStream>::Task;
37
38		fn size_hint(&self) -> (usize, Option<usize>) {
39			self.right.size_hint()
40		}
41		fn next_task(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Task>> {
42			self.project().right.next_task(cx)
43		}
44	}
45
46	impl<P, K, V1, V2, Input> ParallelPipe<Input> for LeftJoin<P, K, V1, V2>
47	where
48		P: ParallelPipe<Input, Output = (K, V1)>,
49		K: Eq + Hash + Clone + Send + 'static,
50		V1: 'static,
51		V2: Clone + Send + 'static,
52	{
53		type Output = (K, V1, ImplIter<V2>);
54		type Task = <MapSync<P, LeftJoinClosure<K, V1, V2>> as ParallelPipe<Input>>::Task;
55
56		fn task(&self) -> Self::Task {
57			self.right.task()
58		}
59	}
60}
61
62FnMutNamed! {
63	pub type LeftJoinClosure<K, V1, V2> = |self, right: MultiMap<K, V2>|item=> (K, V1)| -> (K, V1, ImplIter<V2>) where ; where K: Eq, K: Hash, V2: Clone {
64		let v2 = self.right.get_vec(&item.0).map_or_else(Vec::new, Clone::clone).into_iter();
65		(item.0, item.1, ImplIter(v2))
66	}
67}
68
69#[pin_project]
70#[must_use]
71pub struct InnerJoin<P, K, V1, V2> {
72	#[pin]
73	right: FilterMapSync<P, InnerJoinClosure<K, V1, V2>>,
74}
75
76impl<P, K, V1, V2> InnerJoin<P, K, V1, V2> {
77	pub fn new(pipe: P, right: MultiMap<K, V2>) -> Self {
78		Self {
79			right: FilterMapSync::new(pipe, InnerJoinClosure::new(right)),
80		}
81	}
82}
83
84impl_par_dist! {
85	impl<P, K, V1, V2> ParallelStream for InnerJoin<P, K, V1, V2>
86	where
87		P: ParallelStream<Item = (K, V1)>,
88		K: Eq + Hash + Clone + Send + 'static,
89		V1: 'static,
90		V2: Clone + Send + 'static,
91	{
92		type Item = (K, ImplIter<V1>, ImplIter<V2>);
93		type Task = <FilterMapSync<P, InnerJoinClosure<K, V1, V2>> as ParallelStream>::Task;
94
95		fn size_hint(&self) -> (usize, Option<usize>) {
96			self.right.size_hint()
97		}
98		fn next_task(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Task>> {
99			self.project().right.next_task(cx)
100		}
101	}
102
103	impl<P, K, V1, V2, Input> ParallelPipe<Input> for InnerJoin<P, K, V1, V2>
104	where
105		P: ParallelPipe<Input, Output = (K, V1)>,
106		K: Eq + Hash + Clone + Send + 'static,
107		V1: 'static,
108		V2: Clone + Send + 'static,
109	{
110		type Output = (K, ImplIter<V1>, ImplIter<V2>);
111		type Task = <FilterMapSync<P, InnerJoinClosure<K, V1, V2>> as ParallelPipe<Input>>::Task;
112
113		fn task(&self) -> Self::Task {
114			self.right.task()
115		}
116	}
117}
118
119FnMutNamed! {
120	pub type InnerJoinClosure<K, V1, V2> = |self, right: MultiMap<K, V2>|item=> (K, V1)| -> Option<(K, ImplIter<V1>, ImplIter<V2>)> where ; where K: Eq, K: Hash, V2: Clone {
121		self.right.get_vec(&item.0).map(|v2| {
122			(item.0, ImplIter(vec![item.1].into_iter()), ImplIter(v2.clone().into_iter()))
123		})
124	}
125}
126
127pub struct ImplIter<T>(vec::IntoIter<T>);
128impl<T> Iterator for ImplIter<T> {
129	type Item = T;
130
131	fn next(&mut self) -> Option<Self::Item> {
132		self.0.next()
133	}
134}