// differential_dataflow/difference.rs

//! A type that can be treated as a difference.
//!
//! Differential dataflow most commonly tracks the counts associated with records in a multiset, but it
//! generalizes to tracking any map from the records to an Abelian group. The most common generalization
//! is when we maintain both a count and another accumulation, for example height. The differential
//! dataflow collections would then track for each record the total of counts and heights, which allows
//! us to track something like the average.

9#[deprecated]
10pub use self::Abelian as Diff;
11
/// A type whose values can report whether they are the additive identity.
///
/// The test lives in its own trait, rather than on `Semigroup`, to avoid ambiguity when used.
/// It refers exclusively to the type itself, and whether a value will act as the identity
/// in the course of `Semigroup<Self>::plus_equals()`.
pub trait IsZero {
    /// Returns true if the element is the additive identity.
    ///
    /// Differential dataflow primarily uses this to know when it is safe to delete an update:
    /// once a difference accumulates to zero it has no effect on any accumulation and can be
    /// removed.
    ///
    /// A semigroup is not obligated to have a zero element, and in such a setting this method
    /// could always return false.
    fn is_zero(&self) -> bool;
}

29/// A type with addition and a test for zero.
30///
31/// These traits are currently the minimal requirements for a type to be a "difference" in differential
32/// dataflow. Addition allows differential dataflow to compact multiple updates to the same data, and
33/// the test for zero allows differential dataflow to retire updates that have no effect. There is no
34/// requirement that the test for zero ever return true, and the zero value does not need to inhabit the
35/// type.
36///
37/// There is a light presumption of commutativity here, in that while we will largely perform addition
38/// in order of timestamps, for many types of timestamps there is no total order and consequently no
39/// obvious order to respect. Non-commutative semigroups should be used with care.
40pub trait Semigroup<Rhs: ?Sized = Self> : Clone + IsZero {
41    /// The method of `std::ops::AddAssign`, for types that do not implement `AddAssign`.
42    fn plus_equals(&mut self, rhs: &Rhs);
43}
44
45// Blanket implementation to support GATs of the form `&'a Diff`.
46impl<'a, S, T: Semigroup<S>> Semigroup<&'a S> for T {
47    fn plus_equals(&mut self, rhs: &&'a S) {
48        self.plus_equals(&**rhs);
49    }
50}
51
52/// A semigroup with an explicit zero element.
53pub trait Monoid : Semigroup {
54    /// A zero element under the semigroup addition operator.
55    fn zero() -> Self;
56}
57
58/// A `Monoid` with negation.
59///
60/// This trait extends the requirements of `Semigroup` to include a negation operator.
61/// Several differential dataflow operators require negation in order to retract prior outputs, but
62/// not quite as many as you might imagine.
63pub trait Abelian : Monoid {
64    /// The method of `std::ops::Neg`, for types that do not implement `Neg`.
65    fn negate(&mut self);
66}
67
/// A replacement for `std::ops::Mul` for types that do not implement it.
///
/// The left operand is consumed while the right operand `Rhs` (defaulting to `Self`) is only
/// borrowed.
pub trait Multiply<Rhs = Self> {
    /// Output type per the `Mul` trait.
    type Output;
    /// Core method per the `Mul` trait.
    fn multiply(self, rhs: &Rhs) -> Self::Output;
}

76/// Implementation for built-in signed integers.
77macro_rules! builtin_implementation {
78    ($t:ty) => {
79        impl IsZero for $t {
80            #[inline] fn is_zero(&self) -> bool { self == &0 }
81        }
82        impl Semigroup for $t {
83            #[inline] fn plus_equals(&mut self, rhs: &Self) { *self += rhs; }
84        }
85
86        impl Monoid for $t {
87            #[inline] fn zero() -> Self { 0 }
88        }
89
90        impl Multiply<Self> for $t {
91            type Output = Self;
92            fn multiply(self, rhs: &Self) -> Self { self * rhs}
93        }
94    };
95}
96
97macro_rules! builtin_abelian_implementation {
98    ($t:ty) => {
99        impl Abelian for $t {
100            #[inline] fn negate(&mut self) { *self = -*self; }
101        }
102    };
103}
104
105builtin_implementation!(i8);
106builtin_implementation!(i16);
107builtin_implementation!(i32);
108builtin_implementation!(i64);
109builtin_implementation!(i128);
110builtin_implementation!(isize);
111builtin_implementation!(u8);
112builtin_implementation!(u16);
113builtin_implementation!(u32);
114builtin_implementation!(u64);
115builtin_implementation!(u128);
116builtin_implementation!(usize);
117
118builtin_abelian_implementation!(i8);
119builtin_abelian_implementation!(i16);
120builtin_abelian_implementation!(i32);
121builtin_abelian_implementation!(i64);
122builtin_abelian_implementation!(i128);
123builtin_abelian_implementation!(isize);
124
125/// Implementations for wrapping signed integers, which have a different zero.
126macro_rules! wrapping_implementation {
127    ($t:ty) => {
128        impl IsZero for $t {
129            #[inline] fn is_zero(&self) -> bool { self == &std::num::Wrapping(0) }
130        }
131        impl Semigroup for $t {
132            #[inline] fn plus_equals(&mut self, rhs: &Self) { *self += rhs; }
133        }
134
135        impl Monoid for $t {
136            #[inline] fn zero() -> Self { std::num::Wrapping(0) }
137        }
138
139        impl Abelian for $t {
140            #[inline] fn negate(&mut self) { *self = -*self; }
141        }
142
143        impl Multiply<Self> for $t {
144            type Output = Self;
145            fn multiply(self, rhs: &Self) -> Self { self * rhs}
146        }
147    };
148}
149
150wrapping_implementation!(std::num::Wrapping<i8>);
151wrapping_implementation!(std::num::Wrapping<i16>);
152wrapping_implementation!(std::num::Wrapping<i32>);
153wrapping_implementation!(std::num::Wrapping<i64>);
154wrapping_implementation!(std::num::Wrapping<i128>);
155wrapping_implementation!(std::num::Wrapping<isize>);
156
157
158pub use self::present::Present;
159mod present {
160    use serde::{Deserialize, Serialize};
161
162    /// A zero-sized difference that indicates the presence of a record.
163    ///
164    /// This difference type has no negation, and present records cannot be retracted.
165    /// Addition and multiplication maintain presence, and zero does not inhabit the type.
166    ///
167    /// The primary feature of this type is that it has zero size, which reduces the overhead
168    /// of differential dataflow's representations for settings where collections either do
169    /// not change, or for which records are only added (for example, derived facts in Datalog).
170    #[derive(Copy, Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Hash)]
171    pub struct Present;
172
173    impl<T: Clone> super::Multiply<T> for Present {
174        type Output = T;
175        fn multiply(self, rhs: &T) -> T {
176            rhs.clone()
177        }
178    }
179
180    impl super::IsZero for Present {
181        fn is_zero(&self) -> bool { false }
182    }
183
184    impl super::Semigroup for Present {
185        fn plus_equals(&mut self, _rhs: &Self) { }
186    }
187}
188
189// Pair implementations.
190mod tuples {
191
192    use super::{IsZero, Semigroup, Monoid, Abelian, Multiply};
193
194    /// Implementations for tuples. The two arguments must have the same length.
195    macro_rules! tuple_implementation {
196        ( ($($name:ident)*), ($($name2:ident)*) ) => (
197
198            impl<$($name: IsZero),*> IsZero for ($($name,)*) {
199                #[allow(unused_mut)]
200                #[allow(non_snake_case)]
201                #[inline] fn is_zero(&self) -> bool {
202                    let mut zero = true;
203                    let ($(ref $name,)*) = *self;
204                    $( zero &= $name.is_zero(); )*
205                    zero
206                }
207            }
208
209            impl<$($name: Semigroup),*> Semigroup for ($($name,)*) {
210                #[allow(non_snake_case)]
211                #[inline] fn plus_equals(&mut self, rhs: &Self) {
212                    let ($(ref mut $name,)*) = *self;
213                    let ($(ref $name2,)*) = *rhs;
214                    $($name.plus_equals($name2);)*
215                }
216            }
217
218            impl<$($name: Monoid),*> Monoid for ($($name,)*) {
219                #[allow(non_snake_case)]
220                #[inline] fn zero() -> Self {
221                    ( $($name::zero(), )* )
222                }
223            }
224
225            impl<$($name: Abelian),*> Abelian for ($($name,)*) {
226                #[allow(non_snake_case)]
227                #[inline] fn negate(&mut self) {
228                    let ($(ref mut $name,)*) = self;
229                    $($name.negate();)*
230                }
231            }
232
233            impl<T, $($name: Multiply<T>),*> Multiply<T> for ($($name,)*) {
234                type Output = ($(<$name as Multiply<T>>::Output,)*);
235                #[allow(unused_variables)]
236                #[allow(non_snake_case)]
237                #[inline] fn multiply(self, rhs: &T) -> Self::Output {
238                    let ($($name,)*) = self;
239                    ( $($name.multiply(rhs), )* )
240                }
241            }
242        )
243    }
244
245    tuple_implementation!((), ());
246    tuple_implementation!((A1), (A2));
247    tuple_implementation!((A1 B1), (A2 B2));
248    tuple_implementation!((A1 B1 C1), (A2 B2 C2));
249    tuple_implementation!((A1 B1 C1 D1), (A2 B2 C2 D2));
250}
251
252// Vector implementations
253mod vector {
254
255    use super::{IsZero, Semigroup, Monoid, Abelian, Multiply};
256
257    impl<R: IsZero> IsZero for Vec<R> {
258        fn is_zero(&self) -> bool {
259            self.iter().all(|x| x.is_zero())
260        }
261    }
262
263    impl<R: Semigroup> Semigroup for Vec<R> {
264        fn plus_equals(&mut self, rhs: &Self) {
265            self.plus_equals(&rhs[..])
266        }
267    }
268
269    impl<R: Semigroup> Semigroup<[R]> for Vec<R> {
270        fn plus_equals(&mut self, rhs: &[R]) {
271            // Apply all updates to existing elements
272            for (index, update) in rhs.iter().enumerate().take(self.len()) {
273                self[index].plus_equals(update);
274            }
275
276            // Clone leftover elements from `rhs`
277            while self.len() < rhs.len() {
278                let element = &rhs[self.len()];
279                self.push(element.clone());
280            }
281        }
282    }
283
284    #[cfg(test)]
285    mod tests {
286        use crate::difference::Semigroup;
287
288        #[test]
289        fn test_semigroup_vec() {
290            let mut a = vec![1,2,3];
291            a.plus_equals([1,1,1,1].as_slice());
292            assert_eq!(vec![2,3,4,1], a);
293        }
294    }
295
296    impl<R: Monoid> Monoid for Vec<R> {
297        fn zero() -> Self {
298            Self::new()
299        }
300    }
301
302    impl<R: Abelian> Abelian for Vec<R> {
303        fn negate(&mut self) {
304            for update in self.iter_mut() {
305                update.negate();
306            }
307        }
308    }
309
310    impl<T, R: Multiply<T>> Multiply<T> for Vec<R> {
311        type Output = Vec<<R as Multiply<T>>::Output>;
312        fn multiply(self, rhs: &T) -> Self::Output {
313            self.into_iter()
314                .map(|x| x.multiply(rhs))
315                .collect()
316        }
317    }
318}