1use super::{Relation, Variable};
4use std::cell::Ref;
5use std::ops::Deref;
6
7pub(crate) fn join_into<'me, Key: Ord, Val1: Ord, Val2: Ord, Result: Ord>(
13 input1: &Variable<(Key, Val1)>,
14 input2: impl JoinInput<'me, (Key, Val2)>,
15 output: &Variable<Result>,
16 mut logic: impl FnMut(&Key, &Val1, &Val2) -> Result,
17) {
18 let mut results = Vec::new();
19
20 let recent1 = input1.recent();
21 let recent2 = input2.recent();
22
23 {
24 let mut closure = |k: &Key, v1: &Val1, v2: &Val2| results.push(logic(k, v1, v2));
27
28 for batch2 in input2.stable().iter() {
29 join_helper(&recent1, &batch2, &mut closure);
30 }
31
32 for batch1 in input1.stable().iter() {
33 join_helper(&batch1, &recent2, &mut closure);
34 }
35
36 join_helper(&recent1, &recent2, &mut closure);
37 }
38
39 output.insert(Relation::from_vec(results));
40}
41
42pub(crate) fn join_into_relation<'me, Key: Ord, Val1: Ord, Val2: Ord, Result: Ord>(
44 input1: &Relation<(Key, Val1)>,
45 input2: &Relation<(Key, Val2)>,
46 mut logic: impl FnMut(&Key, &Val1, &Val2) -> Result,
47) -> Relation<Result> {
48 let mut results = Vec::new();
49
50 join_helper(&input1.elements, &input2.elements, |k, v1, v2| {
51 results.push(logic(k, v1, v2));
52 });
53
54 Relation::from_vec(results)
55}
56
57pub(crate) fn antijoin<'me, Key: Ord, Val: Ord, Result: Ord>(
59 input1: impl JoinInput<'me, (Key, Val)>,
60 input2: &Relation<Key>,
61 mut logic: impl FnMut(&Key, &Val) -> Result,
62) -> Relation<Result> {
63 let mut tuples2 = &input2[..];
64
65 let results = input1
66 .recent()
67 .iter()
68 .filter(|(ref key, _)| {
69 tuples2 = gallop(tuples2, |k| k < key);
70 tuples2.first() != Some(key)
71 })
72 .map(|(ref key, ref val)| logic(key, val))
73 .collect::<Vec<_>>();
74
75 Relation::from_vec(results)
76}
77
78fn join_helper<K: Ord, V1, V2>(
79 mut slice1: &[(K, V1)],
80 mut slice2: &[(K, V2)],
81 mut result: impl FnMut(&K, &V1, &V2),
82) {
83 while !slice1.is_empty() && !slice2.is_empty() {
84 use std::cmp::Ordering;
85
86 match slice1[0].0.cmp(&slice2[0].0) {
88 Ordering::Less => {
89 slice1 = gallop(slice1, |x| x.0 < slice2[0].0);
90 }
91 Ordering::Equal => {
92 let count1 = slice1.iter().take_while(|x| x.0 == slice1[0].0).count();
94 let count2 = slice2.iter().take_while(|x| x.0 == slice2[0].0).count();
95
96 for index1 in 0..count1 {
98 for s2 in slice2[..count2].iter() {
99 result(&slice1[0].0, &slice1[index1].1, &s2.1);
100 }
101 }
102
103 slice1 = &slice1[count1..];
105 slice2 = &slice2[count2..];
106 }
107 Ordering::Greater => {
108 slice2 = gallop(slice2, |x| x.0 < slice1[0].0);
109 }
110 }
111 }
112}
113
114pub(crate) fn gallop<T>(mut slice: &[T], mut cmp: impl FnMut(&T) -> bool) -> &[T] {
115 if !slice.is_empty() && cmp(&slice[0]) {
117 let mut step = 1;
118 while step < slice.len() && cmp(&slice[step]) {
119 slice = &slice[step..];
120 step <<= 1;
121 }
122
123 step >>= 1;
124 while step > 0 {
125 if step < slice.len() && cmp(&slice[step]) {
126 slice = &slice[step..];
127 }
128 step >>= 1;
129 }
130
131 slice = &slice[1..]; }
133
134 slice
135}
136
137pub trait JoinInput<'me, Tuple: Ord>: Copy {
139 type RecentTuples: Deref<Target = [Tuple]>;
143
144 type StableTuples: Deref<Target = [Relation<Tuple>]>;
148
149 fn recent(self) -> Self::RecentTuples;
151
152 fn stable(self) -> Self::StableTuples;
154}
155
156impl<'me, Tuple: Ord> JoinInput<'me, Tuple> for &'me Variable<Tuple> {
157 type RecentTuples = Ref<'me, [Tuple]>;
158 type StableTuples = Ref<'me, [Relation<Tuple>]>;
159
160 fn recent(self) -> Self::RecentTuples {
161 Ref::map(self.recent.borrow(), |r| &r.elements[..])
162 }
163
164 fn stable(self) -> Self::StableTuples {
165 Ref::map(self.stable.borrow(), |v| &v[..])
166 }
167}
168
169impl<'me, Tuple: Ord> JoinInput<'me, Tuple> for &'me Relation<Tuple> {
170 type RecentTuples = &'me [Tuple];
171 type StableTuples = &'me [Relation<Tuple>];
172
173 fn recent(self) -> Self::RecentTuples {
174 &[]
175 }
176
177 fn stable(self) -> Self::StableTuples {
178 std::slice::from_ref(self)
179 }
180}