Skip to main content

differential_dataflow/
difference.rs

1//! A type that can be treated as a difference.
2//!
3//! Differential dataflow most commonly tracks the counts associated with records in a multiset, but it
4//! generalizes to tracking any map from the records to an Abelian group. The most common generalization
5//! is when we maintain both a count and another accumulation, for example height. The differential
6//! dataflow collections would then track for each record the total of counts and heights, which allows
7//! us to track something like the average.
8
/// A type whose values can report whether they are the additive identity.
///
/// The test is kept in its own trait, apart from `Semigroup`, to avoid ambiguity at call sites.
/// It concerns only the type itself: whether a value behaves as the identity when supplied to
/// `Semigroup<Self>::plus_equals()`.
pub trait IsZero {
    /// Reports whether `self` is the additive identity.
    ///
    /// Differential dataflow relies on this to decide when an update may be discarded: once a
    /// difference has accumulated to zero it no longer influences any accumulation, and it can
    /// be dropped.
    ///
    /// A semigroup need not contain a zero element at all; for such a type this method may
    /// simply return `false` every time.
    fn is_zero(&self) -> bool;
}
25
26/// A type with addition and a test for zero.
27///
28/// These traits are currently the minimal requirements for a type to be a "difference" in differential
29/// dataflow. Addition allows differential dataflow to compact multiple updates to the same data, and
30/// the test for zero allows differential dataflow to retire updates that have no effect. There is no
31/// requirement that the test for zero ever return true, and the zero value does not need to inhabit the
32/// type.
33///
34/// There is a light presumption of commutativity here, in that while we will largely perform addition
35/// in order of timestamps, for many types of timestamps there is no total order and consequently no
36/// obvious order to respect. Non-commutative semigroups should be used with care.
37pub trait Semigroup<Rhs: ?Sized = Self> : Clone + IsZero {
38    /// The method of `std::ops::AddAssign`, for types that do not implement `AddAssign`.
39    fn plus_equals(&mut self, rhs: &Rhs);
40}
41
42// Blanket implementation to support GATs of the form `&'a Diff`.
43impl<'a, S, T: Semigroup<S>> Semigroup<&'a S> for T {
44    fn plus_equals(&mut self, rhs: &&'a S) {
45        self.plus_equals(&**rhs);
46    }
47}
48
49/// A semigroup with an explicit zero element.
50pub trait Monoid : Semigroup {
51    /// A zero element under the semigroup addition operator.
52    fn zero() -> Self;
53}
54
55/// A `Monoid` with negation.
56///
57/// This trait extends the requirements of `Semigroup` to include a negation operator.
58/// Several differential dataflow operators require negation in order to retract prior outputs, but
59/// not quite as many as you might imagine.
60pub trait Abelian : Monoid {
61    /// The method of `std::ops::Neg`, for types that do not implement `Neg`.
62    fn negate(&mut self);
63}
64
/// A stand-in for `std::ops::Mul`, for types that do not implement that trait.
pub trait Multiply<Rhs = Self> {
    /// The type of the product, mirroring `Mul::Output`.
    type Output;
    /// Multiplies `self` by `rhs`, mirroring `Mul::mul` except that `rhs` is taken by reference.
    fn multiply(self, rhs: &Rhs) -> Self::Output;
}
72
/// Implements `IsZero`, `Semigroup`, `Monoid`, and `Multiply` for a built-in integer type.
///
/// The macro is applied to signed and unsigned primitive integers alike; negation (`Abelian`)
/// is deliberately left out so that unsigned types can use it too. Addition and multiplication
/// are the primitive `+=` and `*` operators, so overflow behaves as it does for the primitive.
macro_rules! builtin_implementation {
    ($t:ty) => {
        impl IsZero for $t {
            #[inline] fn is_zero(&self) -> bool { *self == 0 }
        }
        impl Semigroup for $t {
            #[inline] fn plus_equals(&mut self, rhs: &Self) { *self += *rhs; }
        }

        impl Monoid for $t {
            #[inline] fn zero() -> Self { 0 }
        }

        impl Multiply<Self> for $t {
            type Output = Self;
            // Marked `#[inline]` like its siblings above.
            #[inline] fn multiply(self, rhs: &Self) -> Self { self * *rhs }
        }
    };
}
93
/// Implements `Abelian` for a built-in integer type by way of the primitive unary minus.
macro_rules! builtin_abelian_implementation {
    ($t:ty) => {
        impl Abelian for $t {
            #[inline]
            fn negate(&mut self) {
                let flipped = -*self;
                *self = flipped;
            }
        }
    };
}
101
102builtin_implementation!(i8);
103builtin_implementation!(i16);
104builtin_implementation!(i32);
105builtin_implementation!(i64);
106builtin_implementation!(i128);
107builtin_implementation!(isize);
108builtin_implementation!(u8);
109builtin_implementation!(u16);
110builtin_implementation!(u32);
111builtin_implementation!(u64);
112builtin_implementation!(u128);
113builtin_implementation!(usize);
114
115builtin_abelian_implementation!(i8);
116builtin_abelian_implementation!(i16);
117builtin_abelian_implementation!(i32);
118builtin_abelian_implementation!(i64);
119builtin_abelian_implementation!(i128);
120builtin_abelian_implementation!(isize);
121
122/// Implementations for wrapping signed integers, which have a different zero.
123macro_rules! wrapping_implementation {
124    ($t:ty) => {
125        impl IsZero for $t {
126            #[inline] fn is_zero(&self) -> bool { self == &std::num::Wrapping(0) }
127        }
128        impl Semigroup for $t {
129            #[inline] fn plus_equals(&mut self, rhs: &Self) { *self += rhs; }
130        }
131
132        impl Monoid for $t {
133            #[inline] fn zero() -> Self { std::num::Wrapping(0) }
134        }
135
136        impl Abelian for $t {
137            #[inline] fn negate(&mut self) { *self = -*self; }
138        }
139
140        impl Multiply<Self> for $t {
141            type Output = Self;
142            fn multiply(self, rhs: &Self) -> Self { self * rhs}
143        }
144    };
145}
146
147wrapping_implementation!(std::num::Wrapping<i8>);
148wrapping_implementation!(std::num::Wrapping<i16>);
149wrapping_implementation!(std::num::Wrapping<i32>);
150wrapping_implementation!(std::num::Wrapping<i64>);
151wrapping_implementation!(std::num::Wrapping<i128>);
152wrapping_implementation!(std::num::Wrapping<isize>);
153
154
155pub use self::present::Present;
156mod present {
157    use serde::{Deserialize, Serialize};
158
159    /// A zero-sized difference that indicates the presence of a record.
160    ///
161    /// This difference type has no negation, and present records cannot be retracted.
162    /// Addition and multiplication maintain presence, and zero does not inhabit the type.
163    ///
164    /// The primary feature of this type is that it has zero size, which reduces the overhead
165    /// of differential dataflow's representations for settings where collections either do
166    /// not change, or for which records are only added (for example, derived facts in Datalog).
167    #[derive(Copy, Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Hash, columnar::Columnar)]
168    pub struct Present;
169
170    impl<T: Clone> super::Multiply<T> for Present {
171        type Output = T;
172        fn multiply(self, rhs: &T) -> T {
173            rhs.clone()
174        }
175    }
176
177    impl super::IsZero for Present {
178        fn is_zero(&self) -> bool { false }
179    }
180
181    impl super::Semigroup for Present {
182        fn plus_equals(&mut self, _rhs: &Self) { }
183    }
184
185    impl columnation::Columnation for Present {
186       type InnerRegion = columnation::CopyRegion<Present>;
187    }
188}
189
190// Pair implementations.
191mod tuples {
192
193    use super::{IsZero, Semigroup, Monoid, Abelian, Multiply};
194
195    /// Implementations for tuples. The two arguments must have the same length.
196    macro_rules! tuple_implementation {
197        ( ($($name:ident)*), ($($name2:ident)*) ) => (
198
199            impl<$($name: IsZero),*> IsZero for ($($name,)*) {
200                #[allow(unused_mut)]
201                #[allow(non_snake_case)]
202                #[inline] fn is_zero(&self) -> bool {
203                    let mut zero = true;
204                    let ($(ref $name,)*) = *self;
205                    $( zero &= $name.is_zero(); )*
206                    zero
207                }
208            }
209
210            impl<$($name: Semigroup),*> Semigroup for ($($name,)*) {
211                #[allow(non_snake_case)]
212                #[inline] fn plus_equals(&mut self, rhs: &Self) {
213                    let ($(ref mut $name,)*) = *self;
214                    let ($(ref $name2,)*) = *rhs;
215                    $($name.plus_equals($name2);)*
216                }
217            }
218
219            impl<$($name: Monoid),*> Monoid for ($($name,)*) {
220                #[allow(non_snake_case)]
221                #[inline] fn zero() -> Self {
222                    ( $($name::zero(), )* )
223                }
224            }
225
226            impl<$($name: Abelian),*> Abelian for ($($name,)*) {
227                #[allow(non_snake_case)]
228                #[inline] fn negate(&mut self) {
229                    let ($(ref mut $name,)*) = self;
230                    $($name.negate();)*
231                }
232            }
233
234            impl<T, $($name: Multiply<T>),*> Multiply<T> for ($($name,)*) {
235                type Output = ($(<$name as Multiply<T>>::Output,)*);
236                #[allow(unused_variables)]
237                #[allow(non_snake_case)]
238                #[inline] fn multiply(self, rhs: &T) -> Self::Output {
239                    let ($($name,)*) = self;
240                    ( $($name.multiply(rhs), )* )
241                }
242            }
243        )
244    }
245
246    tuple_implementation!((), ());
247    tuple_implementation!((A1), (A2));
248    tuple_implementation!((A1 B1), (A2 B2));
249    tuple_implementation!((A1 B1 C1), (A2 B2 C2));
250    tuple_implementation!((A1 B1 C1 D1), (A2 B2 C2 D2));
251}
252
253// Vector implementations
254mod vector {
255
256    use super::{IsZero, Semigroup, Monoid, Abelian, Multiply};
257
258    impl<R: IsZero> IsZero for Vec<R> {
259        fn is_zero(&self) -> bool {
260            self.iter().all(|x| x.is_zero())
261        }
262    }
263
264    impl<R: Semigroup> Semigroup for Vec<R> {
265        fn plus_equals(&mut self, rhs: &Self) {
266            self.plus_equals(&rhs[..])
267        }
268    }
269
270    impl<R: Semigroup> Semigroup<[R]> for Vec<R> {
271        fn plus_equals(&mut self, rhs: &[R]) {
272            // Apply all updates to existing elements
273            for (index, update) in rhs.iter().enumerate().take(self.len()) {
274                self[index].plus_equals(update);
275            }
276
277            // Clone leftover elements from `rhs`
278            while self.len() < rhs.len() {
279                let element = &rhs[self.len()];
280                self.push(element.clone());
281            }
282        }
283    }
284
285    #[cfg(test)]
286    mod tests {
287        use crate::difference::Semigroup;
288
289        #[test]
290        fn test_semigroup_vec() {
291            let mut a = vec![1,2,3];
292            a.plus_equals([1,1,1,1].as_slice());
293            assert_eq!(vec![2,3,4,1], a);
294        }
295    }
296
297    impl<R: Monoid> Monoid for Vec<R> {
298        fn zero() -> Self {
299            Self::new()
300        }
301    }
302
303    impl<R: Abelian> Abelian for Vec<R> {
304        fn negate(&mut self) {
305            for update in self.iter_mut() {
306                update.negate();
307            }
308        }
309    }
310
311    impl<T, R: Multiply<T>> Multiply<T> for Vec<R> {
312        type Output = Vec<<R as Multiply<T>>::Output>;
313        fn multiply(self, rhs: &T) -> Self::Output {
314            self.into_iter()
315                .map(|x| x.multiply(rhs))
316                .collect()
317        }
318    }
319}