differential_dataflow/
difference.rs

1//! A type that can be treated as a difference.
2//!
3//! Differential dataflow most commonly tracks the counts associated with records in a multiset, but it
4//! generalizes to tracking any map from the records to an Abelian group. The most common generalization
5//! is when we maintain both a count and another accumulation, for example height. The differential
6//! dataflow collections would then track for each record the total of counts and heights, which allows
7//! us to track something like the average.
8
9use crate::Data;
10
// `Diff` is a deprecated alias: it re-exports the `Abelian` trait under a second name.
#[deprecated]
pub use self::Abelian as Diff;
13
/// A type with addition and a test for zero.
///
/// These traits are currently the minimal requirements for a type to be a "difference" in differential
/// dataflow. Addition allows differential dataflow to compact multiple updates to the same data, and
/// the test for zero allows differential dataflow to retire updates that have no effect. There is no
/// requirement that the test for zero ever return true, and the zero value does not need to inhabit the
/// type.
///
/// There is a light presumption of commutativity here, in that while we will largely perform addition
/// in order of timestamps, for many types of timestamps there is no total order and consequently no
/// obvious order to respect. Non-commutative semigroups should be used with care.
pub trait Semigroup : ::std::marker::Sized + Data + Clone {
    /// Adds `rhs` into `self` in place.
    ///
    /// This is the method of `std::ops::AddAssign`, for types that do not implement `AddAssign`.
    /// The right-hand side is taken by reference and is left unchanged.
    fn plus_equals(&mut self, rhs: &Self);
    /// Returns true if the element is the additive identity.
    ///
    /// This is primarily used by differential dataflow to know when it is safe to delete an update.
    /// When a difference accumulates to zero, the difference has no effect on any accumulation and can
    /// be removed.
    ///
    /// A semigroup is not obligated to have a zero element, and this method could always return
    /// false in such a setting.
    fn is_zero(&self) -> bool;
}
38
/// A semigroup with an explicit zero element.
///
/// Where `Semigroup` only offers a test for zero, a `Monoid` can also produce the zero value.
pub trait Monoid : Semigroup {
    /// A zero element under the semigroup addition operator.
    ///
    /// Takes no receiver: the value is constructed from the type alone.
    fn zero() -> Self;
}
44
/// A `Monoid` with negation.
///
/// This trait extends the requirements of `Monoid` to include a negation operator.
/// Several differential dataflow operators require negation in order to retract prior outputs, but
/// not quite as many as you might imagine.
pub trait Abelian : Monoid {
    /// The method of `std::ops::Neg`, for types that do not implement `Neg`.
    ///
    /// Consumes `self` and returns the negated value.
    fn negate(self) -> Self;
}
54
/// A replacement for `std::ops::Mul` for types that do not implement it.
///
/// The right-hand side type defaults to `Self`. Unlike `Mul`, the right-hand side is passed by
/// reference, while the left-hand side is still consumed.
pub trait Multiply<Rhs = Self> {
    /// Output type per the `Mul` trait.
    type Output;
    /// Core method per the `Mul` trait.
    ///
    /// Consumes `self`, borrows `rhs`, and returns the product as `Self::Output`.
    fn multiply(self, rhs: &Rhs) -> Self::Output;
}
62
/// Implements `Semigroup`, `Monoid`, and `Multiply<Self>` for a built-in integer type.
///
/// The macro is instantiated for the signed and the unsigned primitive integers alike, so it relies
/// only on operations both families share: `+=`, `*`, and comparison against the literal `0`.
/// Negation (`Abelian`) is deliberately not part of this macro.
macro_rules! builtin_implementation {
    ($t:ty) => {
        impl Semigroup for $t {
            #[inline] fn plus_equals(&mut self, rhs: &Self) { *self += rhs; }
            #[inline] fn is_zero(&self) -> bool { self == &0 }
        }

        impl Monoid for $t {
            #[inline] fn zero() -> Self { 0 }
        }

        impl Multiply<Self> for $t {
            type Output = Self;
            // Marked `#[inline]` like every other method generated by this macro.
            #[inline] fn multiply(self, rhs: &Self) -> Self { self * rhs }
        }
    };
}
81
/// Implements `Abelian` for a built-in integer type by delegating to the unary `-` operator.
///
/// The type must support unary minus.
macro_rules! builtin_abelian_implementation {
    ($t:ty) => {
        impl Abelian for $t {
            #[inline]
            fn negate(self) -> Self {
                -self
            }
        }
    };
}
89
// Every primitive integer type, signed and unsigned, receives the implementations generated by
// `builtin_implementation!`.
builtin_implementation!(i8);
builtin_implementation!(i16);
builtin_implementation!(i32);
builtin_implementation!(i64);
builtin_implementation!(i128);
builtin_implementation!(isize);
builtin_implementation!(u8);
builtin_implementation!(u16);
builtin_implementation!(u32);
builtin_implementation!(u64);
builtin_implementation!(u128);
builtin_implementation!(usize);

// Only the signed integer types additionally receive `Abelian`; the unsigned types are not listed
// here and therefore have no `negate`.
builtin_abelian_implementation!(i8);
builtin_abelian_implementation!(i16);
builtin_abelian_implementation!(i32);
builtin_abelian_implementation!(i64);
builtin_abelian_implementation!(i128);
builtin_abelian_implementation!(isize);
109
/// Implements the difference traits for a `std::num::Wrapping` integer type.
///
/// A separate macro is needed because the zero of these types is `Wrapping(0)` rather than the
/// bare literal `0`. Addition, negation, and multiplication use the operators of `Wrapping`.
macro_rules! wrapping_implementation {
    ($t:ty) => {
        impl Semigroup for $t {
            #[inline]
            fn plus_equals(&mut self, rhs: &Self) {
                *self += *rhs;
            }
            #[inline]
            fn is_zero(&self) -> bool {
                self.0 == 0
            }
        }

        impl Monoid for $t {
            #[inline]
            fn zero() -> Self {
                std::num::Wrapping(0)
            }
        }

        impl Abelian for $t {
            #[inline]
            fn negate(self) -> Self {
                -self
            }
        }

        impl Multiply<Self> for $t {
            type Output = Self;
            fn multiply(self, rhs: &Self) -> Self {
                self * *rhs
            }
        }
    };
}
132
// The wrapping implementations are instantiated for the signed `Wrapping` integer types only.
wrapping_implementation!(std::num::Wrapping<i8>);
wrapping_implementation!(std::num::Wrapping<i16>);
wrapping_implementation!(std::num::Wrapping<i32>);
wrapping_implementation!(std::num::Wrapping<i64>);
wrapping_implementation!(std::num::Wrapping<i128>);
wrapping_implementation!(std::num::Wrapping<isize>);
139
140
141pub use self::present::Present;
142mod present {
143    use abomonation_derive::Abomonation;
144    use serde::{Deserialize, Serialize};
145
146    /// A zero-sized difference that indicates the presence of a record.
147    ///
148    /// This difference type has no negation, and present records cannot be retracted.
149    /// Addition and multiplication maintain presence, and zero does not inhabit the type.
150    ///
151    /// The primary feature of this type is that it has zero size, which reduces the overhead
152    /// of differential dataflow's representations for settings where collections either do
153    /// not change, or for which records are only added (for example, derived facts in Datalog).
154    #[derive(Abomonation, Copy, Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Hash)]
155    pub struct Present;
156
157    impl<T: Clone> super::Multiply<T> for Present {
158        type Output = T;
159        fn multiply(self, rhs: &T) -> T {
160            rhs.clone()
161        }
162    }
163
164    impl super::Semigroup for Present {
165        fn plus_equals(&mut self, _rhs: &Self) { }
166        fn is_zero(&self) -> bool { false }
167    }
168}
169
170// Pair implementations.
171mod tuples {
172
173    use super::{Semigroup, Monoid, Abelian, Multiply};
174
175    /// Implementations for tuples. The two arguments must have the same length.
176    macro_rules! tuple_implementation {
177        ( ($($name:ident)*), ($($name2:ident)*) ) => (
178            impl<$($name: Semigroup),*> Semigroup for ($($name,)*) {
179                #[allow(non_snake_case)]
180                #[inline] fn plus_equals(&mut self, rhs: &Self) {
181                    let ($(ref mut $name,)*) = *self;
182                    let ($(ref $name2,)*) = *rhs;
183                    $($name.plus_equals($name2);)*
184                }
185                #[allow(unused_mut)]
186                #[allow(non_snake_case)]
187                #[inline] fn is_zero(&self) -> bool {
188                    let mut zero = true;
189                    let ($(ref $name,)*) = *self;
190                    $( zero &= $name.is_zero(); )*
191                    zero
192                }
193            }
194
195            impl<$($name: Monoid),*> Monoid for ($($name,)*) {
196                #[allow(non_snake_case)]
197                #[inline] fn zero() -> Self {
198                    ( $($name::zero(), )* )
199                }
200            }
201
202            impl<$($name: Abelian),*> Abelian for ($($name,)*) {
203                #[allow(non_snake_case)]
204                #[inline] fn negate(self) -> Self {
205                    let ($($name,)*) = self;
206                    ( $($name.negate(), )* )
207                }
208            }
209
210            impl<T, $($name: Multiply<T>),*> Multiply<T> for ($($name,)*) {
211                type Output = ($(<$name as Multiply<T>>::Output,)*);
212                #[allow(unused_variables)]
213                #[allow(non_snake_case)]
214                #[inline] fn multiply(self, rhs: &T) -> Self::Output {
215                    let ($($name,)*) = self;
216                    ( $($name.multiply(rhs), )* )
217                }
218            }
219        )
220    }
221
222    tuple_implementation!((), ());
223    tuple_implementation!((A1), (A2));
224    tuple_implementation!((A1 B1), (A2 B2));
225    tuple_implementation!((A1 B1 C1), (A2 B2 C2));
226    tuple_implementation!((A1 B1 C1 D1), (A2 B2 C2 D2));
227}
228
229// Vector implementations
230mod vector {
231
232    use super::{Semigroup, Monoid, Abelian, Multiply};
233
234    impl<R: Semigroup> Semigroup for Vec<R> {
235        fn plus_equals(&mut self, rhs: &Self) {
236            // Ensure sufficient length to receive addition.
237            while self.len() < rhs.len() {
238                let element = &rhs[self.len()];
239                self.push(element.clone());
240            }
241
242            // As other is not longer, apply updates without tests.
243            for (index, update) in rhs.iter().enumerate() {
244                self[index].plus_equals(update);
245            }
246        }
247        fn is_zero(&self) -> bool {
248            self.iter().all(|x| x.is_zero())
249        }
250    }
251
252    impl<R: Monoid> Monoid for Vec<R> {
253        fn zero() -> Self {
254            Self::new()
255        }
256    }
257
258    impl<R: Abelian> Abelian for Vec<R> {
259        fn negate(mut self) -> Self {
260            for update in self.iter_mut() {
261                *update = update.clone().negate();
262            }
263            self
264        }
265    }
266
267    impl<T, R: Multiply<T>> Multiply<T> for Vec<R> {
268        type Output = Vec<<R as Multiply<T>>::Output>;
269        fn multiply(self, rhs: &T) -> Self::Output {
270            self.into_iter()
271                .map(|x| x.multiply(rhs))
272                .collect()
273        }
274    }
275}