differential_dataflow/algorithms/
identifiers.rs

1//! Assign unique identifiers to records.
2
3use timely::dataflow::Scope;
4
5use crate::{Collection, ExchangeData, Hashable};
6use crate::lattice::Lattice;
7use crate::operators::*;
8use crate::difference::Abelian;
9
10/// Assign unique identifiers to elements of a collection.
11pub trait Identifiers<G: Scope, D: ExchangeData, R: ExchangeData+Abelian> {
12    /// Assign unique identifiers to elements of a collection.
13    ///
14    /// # Example
15    /// ```
16    /// use differential_dataflow::input::Input;
17    /// use differential_dataflow::algorithms::identifiers::Identifiers;
18    /// use differential_dataflow::operators::Threshold;
19    ///
20    /// ::timely::example(|scope| {
21    ///
22    ///     let identifiers =
23    ///     scope.new_collection_from(1 .. 10).1
24    ///          .identifiers()
25    ///          // assert no conflicts
26    ///          .map(|(data, id)| id)
27    ///          .threshold(|_id,cnt| if cnt > &1 { *cnt } else { 0 })
28    ///          .assert_empty();
29    /// });
30    /// ```
31    fn identifiers(&self) -> Collection<G, (D, u64), R>;
32}
33
34impl<G, D, R> Identifiers<G, D, R> for Collection<G, D, R>
35where
36    G: Scope,
37    G::Timestamp: Lattice,
38    D: ExchangeData+::std::hash::Hash,
39    R: ExchangeData+Abelian,
40{
41    fn identifiers(&self) -> Collection<G, (D, u64), R> {
42
43        // The design here is that we iteratively develop a collection
44        // of pairs (round, record), where each pair is a proposal that
45        // the hash for record should be (round, record).hashed().
46        //
47        // Iteratively, any colliding pairs establish a winner (the one
48        // with the lower round, breaking ties by record), and indicate
49        // that the losers should increment their round and try again.
50        //
51        // Non-obviously, this happens via a `reduce` operator that yields
52        // additions and subtractions of losers, rather than reproducing
53        // the winners. This is done under the premise that losers are
54        // very rare, and maintaining winners in both the input and output
55        // of `reduce` is an unneccesary duplication.
56
57        use crate::collection::AsCollection;
58
59        let init = self.map(|record| (0, record));
60        timely::dataflow::operators::generic::operator::empty(&init.scope())
61            .as_collection()
62            .iterate(|diff|
63                init.enter(&diff.scope())
64                    .concat(diff)
65                    .map(|pair| (pair.hashed(), pair))
66                    .reduce(|_hash, input, output| {
67                        // keep round-positive records as changes.
68                        let ((round, record), count) = &input[0];
69                        if *round > 0 {
70                            output.push(((0, record.clone()), count.clone().negate()));
71                            output.push(((*round, record.clone()), count.clone()));
72                        }
73                        // if any losers, increment their rounds.
74                        for ((round, record), count) in input[1..].iter() {
75                            output.push(((0, record.clone()), count.clone().negate()));
76                            output.push(((*round+1, record.clone()), count.clone()));
77                        }
78                    })
79                    .map(|(_hash, pair)| pair)
80            )
81            .concat(&init)
82            .map(|pair| { let hash = pair.hashed(); (pair.1, hash) })
83    }
84}
85
#[cfg(test)]
mod tests {

    #[test]
    fn are_unique() {

        // Genuine hash collisions are hard to provoke on demand, so rather
        // than calling `identifiers` directly this test rebuilds the same
        // dataflow around a deliberately weak "hash", `(round + num) / 10`.
        // Every input collides under it, and we check that each record
        // nonetheless ends up with an identifier of its own.

        use crate::collection::AsCollection;
        use crate::input::Input;
        use crate::operators::iterate::Iterate;
        use crate::operators::{Reduce, Threshold};

        ::timely::example(|scope| {

            let numbers = scope.new_collection_from(1 .. 4).1;

            // Round-zero proposal for every input number.
            let proposals = numbers.map(|num| (0, num));

            // Corrections to the round-zero proposals, starting from nothing.
            let corrections =
            timely::dataflow::operators::generic::operator::empty(&proposals.scope())
                .as_collection()
                .iterate(|delta| {
                    proposals
                        .enter(&delta.scope())
                        .concat(delta)
                        .map(|(round, num)| ((round + num) / 10, (round, num)))
                        .reduce(|_bucket, contenders, changes| {
                            println!("Input: {:?}", contenders);
                            // The first contender wins; re-state its
                            // correction if it is past round zero.
                            let (winner, weight) = &contenders[0];
                            if winner.0 > 0 {
                                changes.push(((0, winner.1.clone()), -*weight));
                                changes.push(((winner.0, winner.1.clone()), *weight));
                            }
                            // The remaining contenders advance one round.
                            for (loser, weight) in contenders.iter().skip(1) {
                                changes.push(((0, loser.1.clone()), -*weight));
                                changes.push(((loser.0 + 1, loser.1.clone()), *weight));
                            }
                        })
                        .inspect(|x| println!("{:?}", x))
                        .map(|(_bucket, proposal)| proposal)
                });

            // Resolve final identifiers and flag any that occur more than once.
            corrections
                .concat(&proposals)
                .map(|(round, num)| (num, (round + num) / 10))
                .map(|(_data, id)| id)
                .threshold(|_id,cnt| if cnt > &1 { *cnt } else { 0 })
                .assert_empty();
        });
    }
}