// timely/progress/change_batch.rs

//! A collection of updates of the form `(T, i64)`.
2
3/// A collection of updates of the form `(T, i64)`.
4///
5/// A `ChangeBatch` accumulates updates of the form `(T, i64)`, where it is capable of consolidating
6/// the representation and removing elements whose `i64` field accumulates to zero.
7///
8/// The implementation is designed to be as lazy as possible, simply appending to a list of updates
9/// until they are required. This means that several seemingly simple operations may be expensive, in
10/// that they may provoke a compaction. I've tried to prevent exposing methods that allow surprisingly
11/// expensive operations; all operations should take an amortized constant or logarithmic time.
12#[derive(Clone, Debug, Eq, PartialEq, Abomonation, Serialize, Deserialize)]
13pub struct ChangeBatch<T> {
14    // A list of updates to which we append.
15    updates: Vec<(T, i64)>,
16    // The length of the prefix of `self.updates` known to be compact.
17    clean: usize,
18}
19
20impl<T> ChangeBatch<T> {
21
22    /// Allocates a new empty `ChangeBatch`.
23    ///
24    /// # Examples
25    ///
26    ///```
27    /// use timely::progress::ChangeBatch;
28    ///
29    /// let mut batch = ChangeBatch::<usize>::new();
30    /// assert!(batch.is_empty());
31    ///```
32    pub fn new() -> ChangeBatch<T> {
33        ChangeBatch {
34            updates: Vec::new(),
35            clean: 0,
36        }
37    }
38
39    /// Allocates a new empty `ChangeBatch` with space for `capacity` updates.
40    ///
41    /// # Examples
42    ///
43    ///```
44    /// use timely::progress::ChangeBatch;
45    ///
46    /// let mut batch = ChangeBatch::<usize>::with_capacity(10);
47    /// assert!(batch.is_empty());
48    ///```
49    pub fn with_capacity(capacity: usize) -> ChangeBatch<T> {
50        ChangeBatch {
51            updates: Vec::with_capacity(capacity),
52            clean: 0,
53        }
54    }
55
56    /// Returns true if the change batch is not guaranteed compact.
57    pub fn is_dirty(&self) -> bool {
58        self.updates.len() > self.clean
59    }
60
61    /// Expose the internal vector of updates.
62    pub fn unstable_internal_updates(&self) -> &Vec<(T, i64)> { &self.updates }
63
64    /// Expose the internal value of `clean`.
65    pub fn unstable_internal_clean(&self) -> usize { self.clean }
66
67    /// Clears the map.
68    ///
69    /// # Examples
70    ///
71    ///```
72    /// use timely::progress::ChangeBatch;
73    ///
74    /// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
75    /// batch.clear();
76    /// assert!(batch.is_empty());
77    ///```
78    #[inline]
79    pub fn clear(&mut self) {
80        self.updates.clear();
81        self.clean = 0;
82    }
83}
84
85impl<T> ChangeBatch<T>
86where
87    T: Ord,
88{
89    
90    /// Allocates a new `ChangeBatch` with a single entry.
91    ///
92    /// # Examples
93    ///
94    ///```
95    /// use timely::progress::ChangeBatch;
96    ///
97    /// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
98    /// assert!(!batch.is_empty());
99    ///```
100    pub fn new_from(key: T, val: i64) -> ChangeBatch<T> {
101        let mut result = ChangeBatch::new();
102        result.update(key, val);
103        result
104    }
105
106    /// Adds a new update, for `item` with `value`.
107    ///
108    /// This could be optimized to perform compaction when the number of "dirty" elements exceeds
109    /// half the length of the list, which would keep the total footprint within reasonable bounds
110    /// even under an arbitrary number of updates. This has a cost, and it isn't clear whether it
111    /// is worth paying without some experimentation.
112    ///
113    /// # Examples
114    ///
115    ///```
116    /// use timely::progress::ChangeBatch;
117    ///
118    /// let mut batch = ChangeBatch::<usize>::new();
119    /// batch.update(17, 1);
120    /// assert!(!batch.is_empty());
121    ///```
122    #[inline]
123    pub fn update(&mut self, item: T, value: i64) {
124        self.updates.push((item, value));
125        self.maintain_bounds();
126    }
127
128    /// Performs a sequence of updates described by `iterator`.
129    ///
130    /// # Examples
131    ///
132    ///```
133    /// use timely::progress::ChangeBatch;
134    ///
135    /// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
136    /// batch.extend(vec![(17, -1)].into_iter());
137    /// assert!(batch.is_empty());
138    ///```
139    #[inline]
140    pub fn extend<I: Iterator<Item=(T, i64)>>(&mut self, iterator: I) {
141        self.updates.extend(iterator);
142        self.maintain_bounds();
143    }
144
145    /// Extracts the `Vec<(T, i64)>` from the map, consuming it.
146    ///
147    /// # Examples
148    ///
149    ///```
150    /// use timely::progress::ChangeBatch;
151    ///
152    /// let batch = ChangeBatch::<usize>::new_from(17, 1);
153    /// assert_eq!(batch.into_inner(), vec![(17, 1)]);
154    ///```
155    pub fn into_inner(mut self) -> Vec<(T, i64)> {
156        self.compact();
157        self.updates
158    }
159
160    /// Iterates over the contents of the map.
161    ///
162    /// # Examples
163    ///
164    ///```
165    /// use timely::progress::ChangeBatch;
166    ///
167    /// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
168    /// {   // scope allows borrow of `batch` to drop.
169    ///     let mut iter = batch.iter();
170    ///     assert_eq!(iter.next(), Some(&(17, 1)));
171    ///     assert_eq!(iter.next(), None);
172    /// }
173    /// assert!(!batch.is_empty());
174    ///```
175    #[inline]
176    pub fn iter(&mut self) -> ::std::slice::Iter<(T, i64)> {
177        self.compact();
178        self.updates.iter()
179    }
180
181    /// Drains the set of updates.
182    ///
183    /// This operation first compacts the set of updates so that the drained results
184    /// have at most one occurence of each item.
185    ///
186    /// # Examples
187    ///
188    ///```
189    /// use timely::progress::ChangeBatch;
190    ///
191    /// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
192    /// {   // scope allows borrow of `batch` to drop.
193    ///     let mut iter = batch.drain();
194    ///     assert_eq!(iter.next(), Some((17, 1)));
195    ///     assert_eq!(iter.next(), None);
196    /// }
197    /// assert!(batch.is_empty());
198    ///```
199    #[inline]
200    pub fn drain(&mut self) -> ::std::vec::Drain<(T, i64)> {
201        self.compact();
202        self.clean = 0;
203        self.updates.drain(..)
204    }
205
206    /// True iff all keys have value zero.
207    ///
208    /// This method requires mutable access to `self` because it may need to compact the representation
209    /// to determine if the batch of updates is indeed empty. We could also implement a weaker form of
210    /// `is_empty` which just checks the length of `self.updates`, and which could confirm the absence of
211    /// any updates, but could report false negatives if there are updates which would cancel.
212    ///
213    /// # Examples
214    ///
215    ///```
216    /// use timely::progress::ChangeBatch;
217    ///
218    /// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
219    /// batch.update(17, -1);
220    /// assert!(batch.is_empty());
221    ///```
222    #[inline]
223    pub fn is_empty(&mut self) -> bool {
224        if self.clean > self.updates.len() / 2 {
225            false
226        }
227        else {
228            self.compact();
229            self.updates.is_empty()
230        }
231    }
232
233    /// Number of compacted updates.
234    ///
235    /// This method requires mutable access to `self` because it may need to compact the
236    /// representation to determine the number of actual updates.
237    ///
238    /// # Examples
239    ///
240    ///```
241    /// use timely::progress::ChangeBatch;
242    ///
243    /// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
244    /// batch.update(17, -1);
245    /// batch.update(14, -1);
246    /// assert_eq!(batch.len(), 1);
247    ///```
248    #[inline]
249    pub fn len(&mut self) -> usize {
250        self.compact();
251        self.updates.len()
252    }
253
254    /// Drains `self` into `other`.
255    ///
256    /// This method has similar a effect to calling `other.extend(self.drain())`, but has the
257    /// opportunity to optimize this to a `::std::mem::swap(self, other)` when `other` is empty.
258    /// As many uses of this method are to propagate updates, this optimization can be quite
259    /// handy.
260    ///
261    /// # Examples
262    ///
263    ///```
264    /// use timely::progress::ChangeBatch;
265    ///
266    /// let mut batch1 = ChangeBatch::<usize>::new_from(17, 1);
267    /// let mut batch2 = ChangeBatch::new();
268    /// batch1.drain_into(&mut batch2);
269    /// assert!(batch1.is_empty());
270    /// assert!(!batch2.is_empty());
271    ///```
272    #[inline]
273    pub fn drain_into(&mut self, other: &mut ChangeBatch<T>) where T: Clone {
274        if other.updates.is_empty() {
275            ::std::mem::swap(self, other);
276        }
277        else {
278            other.extend(self.updates.drain(..));
279            self.clean = 0;
280        }
281    }
282
283    /// Compact the internal representation.
284    ///
285    /// This method sort `self.updates` and consolidates elements with equal item, discarding
286    /// any whose accumulation is zero. It is optimized to only do this if the number of dirty
287    /// elements is non-zero.
288    #[inline]
289    pub fn compact(&mut self) {
290        if self.clean < self.updates.len() && self.updates.len() > 1 {
291            self.updates.sort_by(|x,y| x.0.cmp(&y.0));
292            for i in 0 .. self.updates.len() - 1 {
293                if self.updates[i].0 == self.updates[i+1].0 {
294                    self.updates[i+1].1 += self.updates[i].1;
295                    self.updates[i].1 = 0;
296                }
297            }
298
299            self.updates.retain(|x| x.1 != 0);
300        }
301        self.clean = self.updates.len();
302    }
303
304    /// Maintain the bounds of pending (non-compacted) updates versus clean (compacted) data.
305    /// This function tries to minimize work by only compacting if enough work has accumulated.
306    fn maintain_bounds(&mut self) {
307        // if we have more than 32 elements and at least half of them are not clean, compact
308        if self.updates.len() > 32 && self.updates.len() >> 1 >= self.clean {
309            self.compact()
310        }
311    }
312}
313
314impl<T> Default for ChangeBatch<T> {
315    fn default() -> Self {
316        Self::new()
317    }
318}