differential_dataflow/algorithms/identifiers.rs
1//! Assign unique identifiers to records.
2
3use timely::dataflow::Scope;
4
5use crate::{VecCollection, ExchangeData, Hashable};
6use crate::lattice::Lattice;
7use crate::operators::*;
8use crate::difference::Abelian;
9
10/// Assign unique identifiers to elements of a collection.
11pub trait Identifiers<G: Scope, D: ExchangeData, R: ExchangeData+Abelian> {
12 /// Assign unique identifiers to elements of a collection.
13 ///
14 /// # Example
15 /// ```
16 /// use differential_dataflow::input::Input;
17 /// use differential_dataflow::algorithms::identifiers::Identifiers;
18 ///
19 /// ::timely::example(|scope| {
20 ///
21 /// let identifiers =
22 /// scope.new_collection_from(1 .. 10).1
23 /// .identifiers()
24 /// // assert no conflicts
25 /// .map(|(data, id)| id)
26 /// .threshold(|_id,cnt| if cnt > &1 { *cnt } else { 0 })
27 /// .assert_empty();
28 /// });
29 /// ```
30 fn identifiers(self) -> VecCollection<G, (D, u64), R>;
31}
32
33impl<G, D, R> Identifiers<G, D, R> for VecCollection<G, D, R>
34where
35 G: Scope<Timestamp: Lattice>,
36 D: ExchangeData + ::std::hash::Hash,
37 R: ExchangeData + Abelian,
38{
39 fn identifiers(self) -> VecCollection<G, (D, u64), R> {
40
41 // The design here is that we iteratively develop a collection
42 // of pairs (round, record), where each pair is a proposal that
43 // the hash for record should be (round, record).hashed().
44 //
45 // Iteratively, any colliding pairs establish a winner (the one
46 // with the lower round, breaking ties by record), and indicate
47 // that the losers should increment their round and try again.
48 //
49 // Non-obviously, this happens via a `reduce` operator that yields
50 // additions and subtractions of losers, rather than reproducing
51 // the winners. This is done under the premise that losers are
52 // very rare, and maintaining winners in both the input and output
53 // of `reduce` is an unnecessary duplication.
54
55 use crate::collection::AsCollection;
56
57 let init = self.map(|record| (0, record));
58 timely::dataflow::operators::generic::operator::empty(&init.scope())
59 .as_collection()
60 .iterate(|scope, diff|
61 init.clone()
62 .enter(&scope)
63 .concat(diff)
64 .map(|pair| (pair.hashed(), pair))
65 .reduce(|_hash, input, output| {
66 // keep round-positive records as changes.
67 let ((round, record), count) = &input[0];
68 if *round > 0 {
69 let mut neg_count = count.clone();
70 neg_count.negate();
71 output.push(((0, record.clone()), neg_count));
72 output.push(((*round, record.clone()), count.clone()));
73 }
74 // if any losers, increment their rounds.
75 for ((round, record), count) in input[1..].iter() {
76 let mut neg_count = count.clone();
77 neg_count.negate();
78 output.push(((0, record.clone()), neg_count));
79 output.push(((*round+1, record.clone()), count.clone()));
80 }
81 })
82 .map(|(_hash, pair)| pair)
83 )
84 .concat(init)
85 .map(|pair| { let hash = pair.hashed(); (pair.1, hash) })
86 }
87}
88
#[cfg(test)]
mod tests {

    #[test]
    fn are_unique() {

        // It is hard to test `identifiers` directly, because we would want to
        // exercise the case with hash collisions. Instead, we run a copy of the
        // same algorithm with a crippled hash function, under which every input
        // collides in round zero. We then check that everyone still ends up
        // with a unique identifier.

        use crate::input::Input;
        use crate::operators::iterate::Iterate;
        use crate::collection::AsCollection;

        // Deliberately weak hash: all small `(round, num)` pairs share bucket 0.
        // It is used both to group proposals and to read off the final
        // identifiers, so the two uses are guaranteed to agree.
        fn crippled_hash(round: i32, num: i32) -> i32 {
            (round + num) / 10
        }

        ::timely::example(|scope| {

            let input = scope.new_collection_from(1 .. 4).1;

            let init = input.map(|record| (0, record));
            timely::dataflow::operators::generic::operator::empty(&init.scope())
                .as_collection()
                .iterate(|scope, diff|
                    init.clone()
                        .enter(&scope)
                        .concat(diff)
                        .map(|(round, num)| (crippled_hash(round, num), (round, num)))
                        .reduce(|_hash, bucket, output| {
                            // keep round-positive records as changes.
                            let ((round, record), count) = &bucket[0];
                            if *round > 0 {
                                output.push(((0, record.clone()), -*count));
                                output.push(((*round, record.clone()), *count));
                            }
                            // if any losers, increment their rounds.
                            for ((round, record), count) in bucket[1..].iter() {
                                output.push(((0, record.clone()), -*count));
                                output.push(((*round+1, record.clone()), *count));
                            }
                        })
                        .map(|(_hash, pair)| pair)
                )
                .concat(init)
                // Keep only the identifier of each accepted proposal.
                .map(|(round, num)| crippled_hash(round, num))
                .threshold(|_id,cnt| if cnt > &1 { *cnt } else { 0 })
                .assert_empty();
        });
    }
}