datafrog/
join.rs

1//! Join functionality.
2
3use super::{Relation, Variable};
4use std::cell::Ref;
5use std::ops::Deref;
6
7/// Implements `join`. Note that `input1` must be a variable, but
8/// `input2` can be either a variable or a relation. This is necessary
9/// because relations have no "recent" tuples, so the fn would be a
10/// guaranteed no-op if both arguments were relations.  See also
11/// `join_into_relation`.
12pub(crate) fn join_into<'me, Key: Ord, Val1: Ord, Val2: Ord, Result: Ord>(
13    input1: &Variable<(Key, Val1)>,
14    input2: impl JoinInput<'me, (Key, Val2)>,
15    output: &Variable<Result>,
16    mut logic: impl FnMut(&Key, &Val1, &Val2) -> Result,
17) {
18    let mut results = Vec::new();
19
20    let recent1 = input1.recent();
21    let recent2 = input2.recent();
22
23    {
24        // scoped to let `closure` drop borrow of `results`.
25
26        let mut closure = |k: &Key, v1: &Val1, v2: &Val2| results.push(logic(k, v1, v2));
27
28        for batch2 in input2.stable().iter() {
29            join_helper(&recent1, &batch2, &mut closure);
30        }
31
32        for batch1 in input1.stable().iter() {
33            join_helper(&batch1, &recent2, &mut closure);
34        }
35
36        join_helper(&recent1, &recent2, &mut closure);
37    }
38
39    output.insert(Relation::from_vec(results));
40}
41
42/// Join, but for two relations.
43pub(crate) fn join_into_relation<'me, Key: Ord, Val1: Ord, Val2: Ord, Result: Ord>(
44    input1: &Relation<(Key, Val1)>,
45    input2: &Relation<(Key, Val2)>,
46    mut logic: impl FnMut(&Key, &Val1, &Val2) -> Result,
47) -> Relation<Result> {
48    let mut results = Vec::new();
49
50    join_helper(&input1.elements, &input2.elements, |k, v1, v2| {
51        results.push(logic(k, v1, v2));
52    });
53
54    Relation::from_vec(results)
55}
56
57/// Moves all recent tuples from `input1` that are not present in `input2` into `output`.
58pub(crate) fn antijoin<'me, Key: Ord, Val: Ord, Result: Ord>(
59    input1: impl JoinInput<'me, (Key, Val)>,
60    input2: &Relation<Key>,
61    mut logic: impl FnMut(&Key, &Val) -> Result,
62) -> Relation<Result> {
63    let mut tuples2 = &input2[..];
64
65    let results = input1
66        .recent()
67        .iter()
68        .filter(|(ref key, _)| {
69            tuples2 = gallop(tuples2, |k| k < key);
70            tuples2.first() != Some(key)
71        })
72        .map(|(ref key, ref val)| logic(key, val))
73        .collect::<Vec<_>>();
74
75    Relation::from_vec(results)
76}
77
78fn join_helper<K: Ord, V1, V2>(
79    mut slice1: &[(K, V1)],
80    mut slice2: &[(K, V2)],
81    mut result: impl FnMut(&K, &V1, &V2),
82) {
83    while !slice1.is_empty() && !slice2.is_empty() {
84        use std::cmp::Ordering;
85
86        // If the keys match produce tuples, else advance the smaller key until they might.
87        match slice1[0].0.cmp(&slice2[0].0) {
88            Ordering::Less => {
89                slice1 = gallop(slice1, |x| x.0 < slice2[0].0);
90            }
91            Ordering::Equal => {
92                // Determine the number of matching keys in each slice.
93                let count1 = slice1.iter().take_while(|x| x.0 == slice1[0].0).count();
94                let count2 = slice2.iter().take_while(|x| x.0 == slice2[0].0).count();
95
96                // Produce results from the cross-product of matches.
97                for index1 in 0..count1 {
98                    for s2 in slice2[..count2].iter() {
99                        result(&slice1[0].0, &slice1[index1].1, &s2.1);
100                    }
101                }
102
103                // Advance slices past this key.
104                slice1 = &slice1[count1..];
105                slice2 = &slice2[count2..];
106            }
107            Ordering::Greater => {
108                slice2 = gallop(slice2, |x| x.0 < slice1[0].0);
109            }
110        }
111    }
112}
113
114pub(crate) fn gallop<T>(mut slice: &[T], mut cmp: impl FnMut(&T) -> bool) -> &[T] {
115    // if empty slice, or already >= element, return
116    if !slice.is_empty() && cmp(&slice[0]) {
117        let mut step = 1;
118        while step < slice.len() && cmp(&slice[step]) {
119            slice = &slice[step..];
120            step <<= 1;
121        }
122
123        step >>= 1;
124        while step > 0 {
125            if step < slice.len() && cmp(&slice[step]) {
126                slice = &slice[step..];
127            }
128            step >>= 1;
129        }
130
131        slice = &slice[1..]; // advance one, as we always stayed < value
132    }
133
134    slice
135}
136
137/// An input that can be used with `from_join`; either a `Variable` or a `Relation`.
138pub trait JoinInput<'me, Tuple: Ord>: Copy {
139    /// If we are on iteration N of the loop, these are the tuples
140    /// added on iteration N-1. (For a `Relation`, this is always an
141    /// empty slice.)
142    type RecentTuples: Deref<Target = [Tuple]>;
143
144    /// If we are on iteration N of the loop, these are the tuples
145    /// added on iteration N - 2 or before. (For a `Relation`, this is
146    /// just `self`.)
147    type StableTuples: Deref<Target = [Relation<Tuple>]>;
148
149    /// Get the set of recent tuples.
150    fn recent(self) -> Self::RecentTuples;
151
152    /// Get the set of stable tuples.
153    fn stable(self) -> Self::StableTuples;
154}
155
156impl<'me, Tuple: Ord> JoinInput<'me, Tuple> for &'me Variable<Tuple> {
157    type RecentTuples = Ref<'me, [Tuple]>;
158    type StableTuples = Ref<'me, [Relation<Tuple>]>;
159
160    fn recent(self) -> Self::RecentTuples {
161        Ref::map(self.recent.borrow(), |r| &r.elements[..])
162    }
163
164    fn stable(self) -> Self::StableTuples {
165        Ref::map(self.stable.borrow(), |v| &v[..])
166    }
167}
168
169impl<'me, Tuple: Ord> JoinInput<'me, Tuple> for &'me Relation<Tuple> {
170    type RecentTuples = &'me [Tuple];
171    type StableTuples = &'me [Relation<Tuple>];
172
173    fn recent(self) -> Self::RecentTuples {
174        &[]
175    }
176
177    fn stable(self) -> Self::StableTuples {
178        std::slice::from_ref(self)
179    }
180}