Skip to main content

palimpsest_dataflow/
difference.rs

1//! A type that can be treated as a difference.
2//!
3//! Differential dataflow most commonly tracks the counts associated with records in a multiset, but it
4//! generalizes to tracking any map from the records to an Abelian group. The most common generalization
5//! is when we maintain both a count and another accumulation, for example height. The differential
6//! dataflow collections would then track for each record the total of counts and heights, which allows
7//! us to track something like the average.
8
9/// A type that can be an additive identity for all `Semigroup` implementations.
10///
11/// This method is extracted from `Semigroup` to avoid ambiguity when used.
12/// It refers exclusively to the type itself, and whether it will act as the identity
13/// in the course of `Semigroup<Self>::plus_equals()`.
14pub trait IsZero {
15    /// Returns true if the element is the additive identity.
16    ///
17    /// This is primarily used by differential dataflow to know when it is safe to delete an update.
18    /// When a difference accumulates to zero, the difference has no effect on any accumulation and can
19    /// be removed.
20    ///
21    /// A semigroup is not obligated to have a zero element, and this method could always return
22    /// false in such a setting.
23    fn is_zero(&self) -> bool;
24}
25
26/// A type with addition and a test for zero.
27///
28/// These traits are currently the minimal requirements for a type to be a "difference" in differential
29/// dataflow. Addition allows differential dataflow to compact multiple updates to the same data, and
30/// the test for zero allows differential dataflow to retire updates that have no effect. There is no
31/// requirement that the test for zero ever return true, and the zero value does not need to inhabit the
32/// type.
33///
34/// There is a light presumption of commutativity here, in that while we will largely perform addition
35/// in order of timestamps, for many types of timestamps there is no total order and consequently no
36/// obvious order to respect. Non-commutative semigroups should be used with care.
37pub trait Semigroup<Rhs: ?Sized = Self>: Clone + IsZero {
38    /// The method of `std::ops::AddAssign`, for types that do not implement `AddAssign`.
39    fn plus_equals(&mut self, rhs: &Rhs);
40}
41
42// Blanket implementation to support GATs of the form `&'a Diff`.
43impl<'a, S, T: Semigroup<S>> Semigroup<&'a S> for T {
44    fn plus_equals(&mut self, rhs: &&'a S) {
45        self.plus_equals(&**rhs);
46    }
47}
48
49/// A semigroup with an explicit zero element.
50pub trait Monoid: Semigroup {
51    /// A zero element under the semigroup addition operator.
52    fn zero() -> Self;
53}
54
55/// A `Monoid` with negation.
56///
57/// This trait extends the requirements of `Semigroup` to include a negation operator.
58/// Several differential dataflow operators require negation in order to retract prior outputs, but
59/// not quite as many as you might imagine.
60pub trait Abelian: Monoid {
61    /// The method of `std::ops::Neg`, for types that do not implement `Neg`.
62    fn negate(&mut self);
63}
64
65/// A replacement for `std::ops::Mul` for types that do not implement it.
66pub trait Multiply<Rhs = Self> {
67    /// Output type per the `Mul` trait.
68    type Output;
69    /// Core method per the `Mul` trait.
70    fn multiply(self, rhs: &Rhs) -> Self::Output;
71}
72
73/// Implementation for built-in signed integers.
74macro_rules! builtin_implementation {
75    ($t:ty) => {
76        impl IsZero for $t {
77            #[inline]
78            fn is_zero(&self) -> bool {
79                self == &0
80            }
81        }
82        impl Semigroup for $t {
83            #[inline]
84            fn plus_equals(&mut self, rhs: &Self) {
85                *self += rhs;
86            }
87        }
88
89        impl Monoid for $t {
90            #[inline]
91            fn zero() -> Self {
92                0
93            }
94        }
95
96        impl Multiply<Self> for $t {
97            type Output = Self;
98            fn multiply(self, rhs: &Self) -> Self {
99                self * rhs
100            }
101        }
102    };
103}
104
105macro_rules! builtin_abelian_implementation {
106    ($t:ty) => {
107        impl Abelian for $t {
108            #[inline]
109            fn negate(&mut self) {
110                *self = -*self;
111            }
112        }
113    };
114}
115
116builtin_implementation!(i8);
117builtin_implementation!(i16);
118builtin_implementation!(i32);
119builtin_implementation!(i64);
120builtin_implementation!(i128);
121builtin_implementation!(isize);
122builtin_implementation!(u8);
123builtin_implementation!(u16);
124builtin_implementation!(u32);
125builtin_implementation!(u64);
126builtin_implementation!(u128);
127builtin_implementation!(usize);
128
129builtin_abelian_implementation!(i8);
130builtin_abelian_implementation!(i16);
131builtin_abelian_implementation!(i32);
132builtin_abelian_implementation!(i64);
133builtin_abelian_implementation!(i128);
134builtin_abelian_implementation!(isize);
135
136/// Implementations for wrapping signed integers, which have a different zero.
137macro_rules! wrapping_implementation {
138    ($t:ty) => {
139        impl IsZero for $t {
140            #[inline]
141            fn is_zero(&self) -> bool {
142                self == &std::num::Wrapping(0)
143            }
144        }
145        impl Semigroup for $t {
146            #[inline]
147            fn plus_equals(&mut self, rhs: &Self) {
148                *self += rhs;
149            }
150        }
151
152        impl Monoid for $t {
153            #[inline]
154            fn zero() -> Self {
155                std::num::Wrapping(0)
156            }
157        }
158
159        impl Abelian for $t {
160            #[inline]
161            fn negate(&mut self) {
162                *self = -*self;
163            }
164        }
165
166        impl Multiply<Self> for $t {
167            type Output = Self;
168            fn multiply(self, rhs: &Self) -> Self {
169                self * rhs
170            }
171        }
172    };
173}
174
175wrapping_implementation!(std::num::Wrapping<i8>);
176wrapping_implementation!(std::num::Wrapping<i16>);
177wrapping_implementation!(std::num::Wrapping<i32>);
178wrapping_implementation!(std::num::Wrapping<i64>);
179wrapping_implementation!(std::num::Wrapping<i128>);
180wrapping_implementation!(std::num::Wrapping<isize>);
181
182pub use self::present::Present;
183mod present {
184    use serde::{Deserialize, Serialize};
185
186    /// A zero-sized difference that indicates the presence of a record.
187    ///
188    /// This difference type has no negation, and present records cannot be retracted.
189    /// Addition and multiplication maintain presence, and zero does not inhabit the type.
190    ///
191    /// The primary feature of this type is that it has zero size, which reduces the overhead
192    /// of differential dataflow's representations for settings where collections either do
193    /// not change, or for which records are only added (for example, derived facts in Datalog).
194    #[derive(Copy, Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Hash)]
195    pub struct Present;
196
197    impl<T: Clone> super::Multiply<T> for Present {
198        type Output = T;
199        fn multiply(self, rhs: &T) -> T {
200            rhs.clone()
201        }
202    }
203
204    impl super::IsZero for Present {
205        fn is_zero(&self) -> bool {
206            false
207        }
208    }
209
210    impl super::Semigroup for Present {
211        fn plus_equals(&mut self, _rhs: &Self) {}
212    }
213}
214
215// Pair implementations.
216mod tuples {
217
218    use super::{Abelian, IsZero, Monoid, Multiply, Semigroup};
219
220    /// Implementations for tuples. The two arguments must have the same length.
221    macro_rules! tuple_implementation {
222        ( ($($name:ident)*), ($($name2:ident)*) ) => (
223
224            impl<$($name: IsZero),*> IsZero for ($($name,)*) {
225                #[allow(unused_mut)]
226                #[allow(non_snake_case)]
227                #[inline] fn is_zero(&self) -> bool {
228                    let mut zero = true;
229                    let ($(ref $name,)*) = *self;
230                    $( zero &= $name.is_zero(); )*
231                    zero
232                }
233            }
234
235            impl<$($name: Semigroup),*> Semigroup for ($($name,)*) {
236                #[allow(non_snake_case)]
237                #[inline] fn plus_equals(&mut self, rhs: &Self) {
238                    let ($(ref mut $name,)*) = *self;
239                    let ($(ref $name2,)*) = *rhs;
240                    $($name.plus_equals($name2);)*
241                }
242            }
243
244            impl<$($name: Monoid),*> Monoid for ($($name,)*) {
245                #[allow(non_snake_case)]
246                #[inline] fn zero() -> Self {
247                    ( $($name::zero(), )* )
248                }
249            }
250
251            impl<$($name: Abelian),*> Abelian for ($($name,)*) {
252                #[allow(non_snake_case)]
253                #[inline] fn negate(&mut self) {
254                    let ($(ref mut $name,)*) = self;
255                    $($name.negate();)*
256                }
257            }
258
259            impl<T, $($name: Multiply<T>),*> Multiply<T> for ($($name,)*) {
260                type Output = ($(<$name as Multiply<T>>::Output,)*);
261                #[allow(unused_variables)]
262                #[allow(non_snake_case)]
263                #[inline] fn multiply(self, rhs: &T) -> Self::Output {
264                    let ($($name,)*) = self;
265                    ( $($name.multiply(rhs), )* )
266                }
267            }
268        )
269    }
270
271    tuple_implementation!((), ());
272    tuple_implementation!((A1), (A2));
273    tuple_implementation!((A1 B1), (A2 B2));
274    tuple_implementation!((A1 B1 C1), (A2 B2 C2));
275    tuple_implementation!((A1 B1 C1 D1), (A2 B2 C2 D2));
276}
277
278// Vector implementations
279mod vector {
280
281    use super::{Abelian, IsZero, Monoid, Multiply, Semigroup};
282
283    impl<R: IsZero> IsZero for Vec<R> {
284        fn is_zero(&self) -> bool {
285            self.iter().all(|x| x.is_zero())
286        }
287    }
288
289    impl<R: Semigroup> Semigroup for Vec<R> {
290        fn plus_equals(&mut self, rhs: &Self) {
291            self.plus_equals(&rhs[..])
292        }
293    }
294
295    impl<R: Semigroup> Semigroup<[R]> for Vec<R> {
296        fn plus_equals(&mut self, rhs: &[R]) {
297            // Apply all updates to existing elements
298            for (index, update) in rhs.iter().enumerate().take(self.len()) {
299                self[index].plus_equals(update);
300            }
301
302            // Clone leftover elements from `rhs`
303            while self.len() < rhs.len() {
304                let element = &rhs[self.len()];
305                self.push(element.clone());
306            }
307        }
308    }
309
310    #[cfg(test)]
311    mod tests {
312        use crate::difference::Semigroup;
313
314        #[test]
315        fn test_semigroup_vec() {
316            let mut a = vec![1, 2, 3];
317            a.plus_equals([1, 1, 1, 1].as_slice());
318            assert_eq!(vec![2, 3, 4, 1], a);
319        }
320    }
321
322    impl<R: Monoid> Monoid for Vec<R> {
323        fn zero() -> Self {
324            Self::new()
325        }
326    }
327
328    impl<R: Abelian> Abelian for Vec<R> {
329        fn negate(&mut self) {
330            for update in self.iter_mut() {
331                update.negate();
332            }
333        }
334    }
335
336    impl<T, R: Multiply<T>> Multiply<T> for Vec<R> {
337        type Output = Vec<<R as Multiply<T>>::Output>;
338        fn multiply(self, rhs: &T) -> Self::Output {
339            self.into_iter().map(|x| x.multiply(rhs)).collect()
340        }
341    }
342}