Skip to main content

differential_dataflow/algorithms/
identifiers.rs

1//! Assign unique identifiers to records.
2
3use timely::dataflow::Scope;
4
5use crate::{VecCollection, ExchangeData, Hashable};
6use crate::lattice::Lattice;
7use crate::operators::*;
8use crate::difference::Abelian;
9
10/// Assign unique identifiers to elements of a collection.
11pub trait Identifiers<G: Scope, D: ExchangeData, R: ExchangeData+Abelian> {
12    /// Assign unique identifiers to elements of a collection.
13    ///
14    /// # Example
15    /// ```
16    /// use differential_dataflow::input::Input;
17    /// use differential_dataflow::algorithms::identifiers::Identifiers;
18    ///
19    /// ::timely::example(|scope| {
20    ///
21    ///     let identifiers =
22    ///     scope.new_collection_from(1 .. 10).1
23    ///          .identifiers()
24    ///          // assert no conflicts
25    ///          .map(|(data, id)| id)
26    ///          .threshold(|_id,cnt| if cnt > &1 { *cnt } else { 0 })
27    ///          .assert_empty();
28    /// });
29    /// ```
30    fn identifiers(self) -> VecCollection<G, (D, u64), R>;
31}
32
33impl<G, D, R> Identifiers<G, D, R> for VecCollection<G, D, R>
34where
35    G: Scope<Timestamp: Lattice>,
36    D: ExchangeData + ::std::hash::Hash,
37    R: ExchangeData + Abelian,
38{
39    fn identifiers(self) -> VecCollection<G, (D, u64), R> {
40
41        // The design here is that we iteratively develop a collection
42        // of pairs (round, record), where each pair is a proposal that
43        // the hash for record should be (round, record).hashed().
44        //
45        // Iteratively, any colliding pairs establish a winner (the one
46        // with the lower round, breaking ties by record), and indicate
47        // that the losers should increment their round and try again.
48        //
49        // Non-obviously, this happens via a `reduce` operator that yields
50        // additions and subtractions of losers, rather than reproducing
51        // the winners. This is done under the premise that losers are
52        // very rare, and maintaining winners in both the input and output
53        // of `reduce` is an unnecessary duplication.
54
55        use crate::collection::AsCollection;
56
57        let init = self.map(|record| (0, record));
58        timely::dataflow::operators::generic::operator::empty(&init.scope())
59            .as_collection()
60            .iterate(|scope, diff|
61                init.clone()
62                    .enter(&scope)
63                    .concat(diff)
64                    .map(|pair| (pair.hashed(), pair))
65                    .reduce(|_hash, input, output| {
66                        // keep round-positive records as changes.
67                        let ((round, record), count) = &input[0];
68                        if *round > 0 {
69                            let mut neg_count = count.clone();
70                            neg_count.negate();
71                            output.push(((0, record.clone()), neg_count));
72                            output.push(((*round, record.clone()), count.clone()));
73                        }
74                        // if any losers, increment their rounds.
75                        for ((round, record), count) in input[1..].iter() {
76                            let mut neg_count = count.clone();
77                            neg_count.negate();
78                            output.push(((0, record.clone()), neg_count));
79                            output.push(((*round+1, record.clone()), count.clone()));
80                        }
81                    })
82                    .map(|(_hash, pair)| pair)
83            )
84            .concat(init)
85            .map(|pair| { let hash = pair.hashed(); (pair.1, hash) })
86    }
87}
88
#[cfg(test)]
mod tests {

    /// Checks that identifiers stay distinct even when proposals collide.
    #[test]
    fn are_unique() {

        // Real hash collisions are too rare to exercise directly, so this
        // test replays the dataflow from `identifiers` with a deliberately
        // weak "hash", `(round + num) / 10`, which forces collisions and
        // lets us confirm that every record still ends up with its own id.

        use crate::collection::AsCollection;
        use crate::input::Input;
        use crate::operators::iterate::Iterate;

        ::timely::example(|scope| {

            let records = scope.new_collection_from(1 .. 4).1;
            let proposals = records.map(|record| (0, record));

            let corrections = timely::dataflow::operators::generic::operator::empty(&proposals.scope())
                .as_collection()
                .iterate(|inner, feedback| {
                    // Key each proposal by the weak hash it would claim.
                    let keyed = proposals
                        .clone()
                        .enter(&inner)
                        .concat(feedback)
                        .map(|(round, num)| ((round + num) / 10, (round, num)));

                    let resolved = keyed.reduce(|_hash, contenders, changes| {
                        println!("Input: {:?}", contenders);
                        // A winner beyond round zero is re-reported as a correction.
                        let ((winner_round, winner), winner_count) = &contenders[0];
                        if *winner_round > 0 {
                            changes.push(((0, winner.clone()), -*winner_count));
                            changes.push(((*winner_round, winner.clone()), *winner_count));
                        }
                        // Every remaining contender lost and advances one round.
                        for ((round, loser), count) in contenders.iter().skip(1) {
                            changes.push(((0, loser.clone()), -*count));
                            changes.push(((*round + 1, loser.clone()), *count));
                        }
                    });

                    resolved
                        .inspect(|x| println!("{:?}", x))
                        .map(|(_hash, pair)| pair)
                });

            // Derive each record's identifier from its surviving proposal.
            let assigned = corrections
                .concat(proposals)
                .map(|(round, num)| (num, (round + num) / 10));

            // Any identifier seen more than once is a conflict.
            assigned
                .map(|(_data, id)| id)
                .threshold(|_id, cnt| if *cnt > 1 { *cnt } else { 0 })
                .assert_empty();
        });
    }
}