timely/progress/change_batch.rs
1//! A collection of updates of the form `(T, i64)`.
2
/// A collection of updates of the form `(T, i64)`.
///
/// A `ChangeBatch` accumulates updates of the form `(T, i64)`, where it is capable of consolidating
/// the representation and removing elements whose `i64` field accumulates to zero.
///
/// The implementation is designed to be as lazy as possible, simply appending to a list of updates
/// until they are required. This means that several seemingly simple operations may be expensive, in
/// that they may provoke a compaction. The exposed methods are chosen to avoid surprisingly
/// expensive operations; all operations should take an amortized constant or logarithmic time.
///
/// Both fields are private: `updates` is the append-only log of pending changes, and `clean`
/// records how long a prefix of that log is already consolidated, which lets readers decide
/// whether a compaction is needed before the contents can be trusted.
#[derive(Clone, Debug, Eq, PartialEq, Abomonation, Serialize, Deserialize)]
pub struct ChangeBatch<T> {
    // The list of `(item, delta)` updates to which new changes are appended. Entries past
    // the first `clean` positions may repeat an item or carry deltas that cancel out.
    updates: Vec<(T, i64)>,
    // The length of the prefix of `self.updates` known to be compact. It is reset to zero
    // whenever the list is emptied and set to `updates.len()` after a compaction.
    clean: usize,
}
19
20impl<T> ChangeBatch<T> {
21
22 /// Allocates a new empty `ChangeBatch`.
23 ///
24 /// # Examples
25 ///
26 ///```
27 /// use timely::progress::ChangeBatch;
28 ///
29 /// let mut batch = ChangeBatch::<usize>::new();
30 /// assert!(batch.is_empty());
31 ///```
32 pub fn new() -> ChangeBatch<T> {
33 ChangeBatch {
34 updates: Vec::new(),
35 clean: 0,
36 }
37 }
38
39 /// Allocates a new empty `ChangeBatch` with space for `capacity` updates.
40 ///
41 /// # Examples
42 ///
43 ///```
44 /// use timely::progress::ChangeBatch;
45 ///
46 /// let mut batch = ChangeBatch::<usize>::with_capacity(10);
47 /// assert!(batch.is_empty());
48 ///```
49 pub fn with_capacity(capacity: usize) -> ChangeBatch<T> {
50 ChangeBatch {
51 updates: Vec::with_capacity(capacity),
52 clean: 0,
53 }
54 }
55
56 /// Returns true if the change batch is not guaranteed compact.
57 pub fn is_dirty(&self) -> bool {
58 self.updates.len() > self.clean
59 }
60
61 /// Expose the internal vector of updates.
62 pub fn unstable_internal_updates(&self) -> &Vec<(T, i64)> { &self.updates }
63
64 /// Expose the internal value of `clean`.
65 pub fn unstable_internal_clean(&self) -> usize { self.clean }
66
67 /// Clears the map.
68 ///
69 /// # Examples
70 ///
71 ///```
72 /// use timely::progress::ChangeBatch;
73 ///
74 /// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
75 /// batch.clear();
76 /// assert!(batch.is_empty());
77 ///```
78 #[inline]
79 pub fn clear(&mut self) {
80 self.updates.clear();
81 self.clean = 0;
82 }
83}
84
85impl<T> ChangeBatch<T>
86where
87 T: Ord,
88{
89
90 /// Allocates a new `ChangeBatch` with a single entry.
91 ///
92 /// # Examples
93 ///
94 ///```
95 /// use timely::progress::ChangeBatch;
96 ///
97 /// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
98 /// assert!(!batch.is_empty());
99 ///```
100 pub fn new_from(key: T, val: i64) -> ChangeBatch<T> {
101 let mut result = ChangeBatch::new();
102 result.update(key, val);
103 result
104 }
105
106 /// Adds a new update, for `item` with `value`.
107 ///
108 /// This could be optimized to perform compaction when the number of "dirty" elements exceeds
109 /// half the length of the list, which would keep the total footprint within reasonable bounds
110 /// even under an arbitrary number of updates. This has a cost, and it isn't clear whether it
111 /// is worth paying without some experimentation.
112 ///
113 /// # Examples
114 ///
115 ///```
116 /// use timely::progress::ChangeBatch;
117 ///
118 /// let mut batch = ChangeBatch::<usize>::new();
119 /// batch.update(17, 1);
120 /// assert!(!batch.is_empty());
121 ///```
122 #[inline]
123 pub fn update(&mut self, item: T, value: i64) {
124 self.updates.push((item, value));
125 self.maintain_bounds();
126 }
127
128 /// Performs a sequence of updates described by `iterator`.
129 ///
130 /// # Examples
131 ///
132 ///```
133 /// use timely::progress::ChangeBatch;
134 ///
135 /// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
136 /// batch.extend(vec![(17, -1)].into_iter());
137 /// assert!(batch.is_empty());
138 ///```
139 #[inline]
140 pub fn extend<I: Iterator<Item=(T, i64)>>(&mut self, iterator: I) {
141 self.updates.extend(iterator);
142 self.maintain_bounds();
143 }
144
145 /// Extracts the `Vec<(T, i64)>` from the map, consuming it.
146 ///
147 /// # Examples
148 ///
149 ///```
150 /// use timely::progress::ChangeBatch;
151 ///
152 /// let batch = ChangeBatch::<usize>::new_from(17, 1);
153 /// assert_eq!(batch.into_inner(), vec![(17, 1)]);
154 ///```
155 pub fn into_inner(mut self) -> Vec<(T, i64)> {
156 self.compact();
157 self.updates
158 }
159
160 /// Iterates over the contents of the map.
161 ///
162 /// # Examples
163 ///
164 ///```
165 /// use timely::progress::ChangeBatch;
166 ///
167 /// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
168 /// { // scope allows borrow of `batch` to drop.
169 /// let mut iter = batch.iter();
170 /// assert_eq!(iter.next(), Some(&(17, 1)));
171 /// assert_eq!(iter.next(), None);
172 /// }
173 /// assert!(!batch.is_empty());
174 ///```
175 #[inline]
176 pub fn iter(&mut self) -> ::std::slice::Iter<(T, i64)> {
177 self.compact();
178 self.updates.iter()
179 }
180
181 /// Drains the set of updates.
182 ///
183 /// This operation first compacts the set of updates so that the drained results
184 /// have at most one occurence of each item.
185 ///
186 /// # Examples
187 ///
188 ///```
189 /// use timely::progress::ChangeBatch;
190 ///
191 /// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
192 /// { // scope allows borrow of `batch` to drop.
193 /// let mut iter = batch.drain();
194 /// assert_eq!(iter.next(), Some((17, 1)));
195 /// assert_eq!(iter.next(), None);
196 /// }
197 /// assert!(batch.is_empty());
198 ///```
199 #[inline]
200 pub fn drain(&mut self) -> ::std::vec::Drain<(T, i64)> {
201 self.compact();
202 self.clean = 0;
203 self.updates.drain(..)
204 }
205
206 /// True iff all keys have value zero.
207 ///
208 /// This method requires mutable access to `self` because it may need to compact the representation
209 /// to determine if the batch of updates is indeed empty. We could also implement a weaker form of
210 /// `is_empty` which just checks the length of `self.updates`, and which could confirm the absence of
211 /// any updates, but could report false negatives if there are updates which would cancel.
212 ///
213 /// # Examples
214 ///
215 ///```
216 /// use timely::progress::ChangeBatch;
217 ///
218 /// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
219 /// batch.update(17, -1);
220 /// assert!(batch.is_empty());
221 ///```
222 #[inline]
223 pub fn is_empty(&mut self) -> bool {
224 if self.clean > self.updates.len() / 2 {
225 false
226 }
227 else {
228 self.compact();
229 self.updates.is_empty()
230 }
231 }
232
233 /// Number of compacted updates.
234 ///
235 /// This method requires mutable access to `self` because it may need to compact the
236 /// representation to determine the number of actual updates.
237 ///
238 /// # Examples
239 ///
240 ///```
241 /// use timely::progress::ChangeBatch;
242 ///
243 /// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
244 /// batch.update(17, -1);
245 /// batch.update(14, -1);
246 /// assert_eq!(batch.len(), 1);
247 ///```
248 #[inline]
249 pub fn len(&mut self) -> usize {
250 self.compact();
251 self.updates.len()
252 }
253
254 /// Drains `self` into `other`.
255 ///
256 /// This method has similar a effect to calling `other.extend(self.drain())`, but has the
257 /// opportunity to optimize this to a `::std::mem::swap(self, other)` when `other` is empty.
258 /// As many uses of this method are to propagate updates, this optimization can be quite
259 /// handy.
260 ///
261 /// # Examples
262 ///
263 ///```
264 /// use timely::progress::ChangeBatch;
265 ///
266 /// let mut batch1 = ChangeBatch::<usize>::new_from(17, 1);
267 /// let mut batch2 = ChangeBatch::new();
268 /// batch1.drain_into(&mut batch2);
269 /// assert!(batch1.is_empty());
270 /// assert!(!batch2.is_empty());
271 ///```
272 #[inline]
273 pub fn drain_into(&mut self, other: &mut ChangeBatch<T>) where T: Clone {
274 if other.updates.is_empty() {
275 ::std::mem::swap(self, other);
276 }
277 else {
278 other.extend(self.updates.drain(..));
279 self.clean = 0;
280 }
281 }
282
283 /// Compact the internal representation.
284 ///
285 /// This method sort `self.updates` and consolidates elements with equal item, discarding
286 /// any whose accumulation is zero. It is optimized to only do this if the number of dirty
287 /// elements is non-zero.
288 #[inline]
289 pub fn compact(&mut self) {
290 if self.clean < self.updates.len() && self.updates.len() > 1 {
291 self.updates.sort_by(|x,y| x.0.cmp(&y.0));
292 for i in 0 .. self.updates.len() - 1 {
293 if self.updates[i].0 == self.updates[i+1].0 {
294 self.updates[i+1].1 += self.updates[i].1;
295 self.updates[i].1 = 0;
296 }
297 }
298
299 self.updates.retain(|x| x.1 != 0);
300 }
301 self.clean = self.updates.len();
302 }
303
304 /// Maintain the bounds of pending (non-compacted) updates versus clean (compacted) data.
305 /// This function tries to minimize work by only compacting if enough work has accumulated.
306 fn maintain_bounds(&mut self) {
307 // if we have more than 32 elements and at least half of them are not clean, compact
308 if self.updates.len() > 32 && self.updates.len() >> 1 >= self.clean {
309 self.compact()
310 }
311 }
312}
313
314impl<T> Default for ChangeBatch<T> {
315 fn default() -> Self {
316 Self::new()
317 }
318}