differential_dataflow/algorithms/identifiers.rs
1//! Assign unique identifiers to records.
2
3use timely::dataflow::Scope;
4
5use crate::{Collection, ExchangeData, Hashable};
6use crate::lattice::Lattice;
7use crate::operators::*;
8use crate::difference::Abelian;
9
10/// Assign unique identifiers to elements of a collection.
11pub trait Identifiers<G: Scope, D: ExchangeData, R: ExchangeData+Abelian> {
12 /// Assign unique identifiers to elements of a collection.
13 ///
14 /// # Example
15 /// ```
16 /// use differential_dataflow::input::Input;
17 /// use differential_dataflow::algorithms::identifiers::Identifiers;
18 /// use differential_dataflow::operators::Threshold;
19 ///
20 /// ::timely::example(|scope| {
21 ///
22 /// let identifiers =
23 /// scope.new_collection_from(1 .. 10).1
24 /// .identifiers()
25 /// // assert no conflicts
26 /// .map(|(data, id)| id)
27 /// .threshold(|_id,cnt| if cnt > &1 { *cnt } else { 0 })
28 /// .assert_empty();
29 /// });
30 /// ```
31 fn identifiers(&self) -> Collection<G, (D, u64), R>;
32}
33
34impl<G, D, R> Identifiers<G, D, R> for Collection<G, D, R>
35where
36 G: Scope,
37 G::Timestamp: Lattice,
38 D: ExchangeData+::std::hash::Hash,
39 R: ExchangeData+Abelian,
40{
41 fn identifiers(&self) -> Collection<G, (D, u64), R> {
42
43 // The design here is that we iteratively develop a collection
44 // of pairs (round, record), where each pair is a proposal that
45 // the hash for record should be (round, record).hashed().
46 //
47 // Iteratively, any colliding pairs establish a winner (the one
48 // with the lower round, breaking ties by record), and indicate
49 // that the losers should increment their round and try again.
50 //
51 // Non-obviously, this happens via a `reduce` operator that yields
52 // additions and subtractions of losers, rather than reproducing
53 // the winners. This is done under the premise that losers are
54 // very rare, and maintaining winners in both the input and output
55 // of `reduce` is an unneccesary duplication.
56
57 use crate::collection::AsCollection;
58
59 let init = self.map(|record| (0, record));
60 timely::dataflow::operators::generic::operator::empty(&init.scope())
61 .as_collection()
62 .iterate(|diff|
63 init.enter(&diff.scope())
64 .concat(diff)
65 .map(|pair| (pair.hashed(), pair))
66 .reduce(|_hash, input, output| {
67 // keep round-positive records as changes.
68 let ((round, record), count) = &input[0];
69 if *round > 0 {
70 output.push(((0, record.clone()), count.clone().negate()));
71 output.push(((*round, record.clone()), count.clone()));
72 }
73 // if any losers, increment their rounds.
74 for ((round, record), count) in input[1..].iter() {
75 output.push(((0, record.clone()), count.clone().negate()));
76 output.push(((*round+1, record.clone()), count.clone()));
77 }
78 })
79 .map(|(_hash, pair)| pair)
80 )
81 .concat(&init)
82 .map(|pair| { let hash = pair.hashed(); (pair.1, hash) })
83 }
84}
85
#[cfg(test)]
mod tests {

    #[test]
    fn are_unique() {

        // It is hard to test `identifiers` directly, because we want to
        // exercise hash collisions. Instead, we replay the same algorithm with
        // a crippled hash function, `(round + record) / 10`, under which
        // collisions are guaranteed. We then check that no identifier is shared
        // and that every input record keeps exactly one identifier.

        use crate::input::Input;
        use crate::operators::{Threshold, Reduce};
        use crate::operators::iterate::Iterate;
        use crate::collection::AsCollection;

        ::timely::example(|scope| {

            let input = scope.new_collection_from(1 .. 4).1;

            let init = input.map(|record| (0, record));
            let ids = timely::dataflow::operators::generic::operator::empty(&init.scope())
                .as_collection()
                .iterate(|diff|
                    init.enter(&diff.scope())
                        .concat(diff)
                        .map(|(round, num)| ((round + num) / 10, (round, num)))
                        .reduce(|_hash, vals, output| {
                            // keep round-positive winners as changes.
                            let ((round, record), count) = &vals[0];
                            if *round > 0 {
                                output.push(((0, record.clone()), -*count));
                                output.push(((*round, record.clone()), *count));
                            }
                            // if any losers, increment their rounds.
                            for ((round, record), count) in vals[1..].iter() {
                                output.push(((0, record.clone()), -*count));
                                output.push(((*round + 1, record.clone()), *count));
                            }
                        })
                        .map(|(_hash, pair)| pair)
                )
                .concat(&init)
                .map(|(round, num)| (num, (round + num) / 10));

            // No identifier is held by more than one record.
            ids.map(|(_num, id)| id)
                .threshold(|_id, cnt| if cnt > &1 { *cnt } else { 0 })
                .assert_empty();

            // Every input record ends up with exactly one identifier.
            ids.map(|(num, _id)| num)
                .negate()
                .concat(&input)
                .assert_empty();
        });
    }
}