1use std::collections::VecDeque;
14use timely::container::{ContainerBuilder, PushInto};
15use crate::Data;
16use crate::difference::Semigroup;
17
18#[inline]
24pub fn consolidate<T: Ord, R: Semigroup>(vec: &mut Vec<(T, R)>) {
25 consolidate_from(vec, 0);
26}
27
28#[inline]
34pub fn consolidate_from<T: Ord, R: Semigroup>(vec: &mut Vec<(T, R)>, offset: usize) {
35 let length = consolidate_slice(&mut vec[offset..]);
36 vec.truncate(offset + length);
37}
38
39#[inline]
41pub fn consolidate_slice<T: Ord, R: Semigroup>(slice: &mut [(T, R)]) -> usize {
42 if slice.len() > 1 {
43 consolidate_slice_slow(slice)
44 }
45 else {
46 slice.iter().filter(|x| !x.1.is_zero()).count()
47 }
48}
49
50fn consolidate_slice_slow<T: Ord, R: Semigroup>(slice: &mut [(T, R)]) -> usize {
52 slice.sort_by(|x,y| x.0.cmp(&y.0));
55
56 let mut offset = 0;
58 let mut accum = slice[offset].1.clone();
59
60 for index in 1 .. slice.len() {
61 if slice[index].0 == slice[index-1].0 {
62 accum.plus_equals(&slice[index].1);
63 }
64 else {
65 if !accum.is_zero() {
66 slice.swap(offset, index-1);
67 slice[offset].1.clone_from(&accum);
68 offset += 1;
69 }
70 accum.clone_from(&slice[index].1);
71 }
72 }
73 if !accum.is_zero() {
74 slice.swap(offset, slice.len()-1);
75 slice[offset].1 = accum;
76 offset += 1;
77 }
78
79 offset
80}
81
82#[inline]
88pub fn consolidate_updates<D: Ord, T: Ord, R: Semigroup>(vec: &mut Vec<(D, T, R)>) {
89 consolidate_updates_from(vec, 0);
90}
91
92#[inline]
98pub fn consolidate_updates_from<D: Ord, T: Ord, R: Semigroup>(vec: &mut Vec<(D, T, R)>, offset: usize) {
99 let length = consolidate_updates_slice(&mut vec[offset..]);
100 vec.truncate(offset + length);
101}
102
103#[inline]
105pub fn consolidate_updates_slice<D: Ord, T: Ord, R: Semigroup>(slice: &mut [(D, T, R)]) -> usize {
106
107 if slice.len() > 1 {
108 consolidate_updates_slice_slow(slice)
109 }
110 else {
111 slice.iter().filter(|x| !x.2.is_zero()).count()
112 }
113}
114
115fn consolidate_updates_slice_slow<D: Ord, T: Ord, R: Semigroup>(slice: &mut [(D, T, R)]) -> usize {
117 slice.sort_unstable_by(|x,y| (&x.0, &x.1).cmp(&(&y.0, &y.1)));
120
121 let mut offset = 0;
123 let mut accum = slice[offset].2.clone();
124
125 for index in 1 .. slice.len() {
126 if (slice[index].0 == slice[index-1].0) && (slice[index].1 == slice[index-1].1) {
127 accum.plus_equals(&slice[index].2);
128 }
129 else {
130 if !accum.is_zero() {
131 slice.swap(offset, index-1);
132 slice[offset].2.clone_from(&accum);
133 offset += 1;
134 }
135 accum.clone_from(&slice[index].2);
136 }
137 }
138 if !accum.is_zero() {
139 slice.swap(offset, slice.len()-1);
140 slice[offset].2 = accum;
141 offset += 1;
142 }
143
144 offset
145}
146
147
/// A container builder that consolidates the updates pushed into it.
///
/// Pushed updates accumulate in `current`. When it fills, its contents are
/// consolidated and complete containers are queued in `outbound`. Containers
/// already handed to the caller are kept in `empty` so their allocations can be
/// reused by later flushes.
#[derive(Default)]
pub struct ConsolidatingContainerBuilder<C> {
    /// Staging buffer holding updates that have not been flushed yet.
    current: C,
    /// Previously extracted containers, retained for reuse.
    empty: Vec<C>,
    /// Consolidated containers waiting to be extracted, oldest first.
    outbound: VecDeque<C>,
}
156
157impl<D,T,R> ConsolidatingContainerBuilder<Vec<(D, T, R)>>
158where
159 D: Data,
160 T: Data,
161 R: Semigroup+'static,
162{
163 #[cold]
166 fn consolidate_and_flush_through(&mut self, multiple: usize) {
167 let preferred_capacity = timely::container::buffer::default_capacity::<(D, T, R)>();
168 consolidate_updates(&mut self.current);
169 let mut drain = self.current.drain(..(self.current.len()/multiple)*multiple).peekable();
170 while drain.peek().is_some() {
171 let mut container = self.empty.pop().unwrap_or_else(|| Vec::with_capacity(preferred_capacity));
172 container.clear();
173 container.extend((&mut drain).take(preferred_capacity));
174 self.outbound.push_back(container);
175 }
176 }
177}
178
179impl<D, T, R, P> PushInto<P> for ConsolidatingContainerBuilder<Vec<(D, T, R)>>
180where
181 D: Data,
182 T: Data,
183 R: Semigroup+'static,
184 Vec<(D, T, R)>: PushInto<P>,
185{
186 #[inline]
190 fn push_into(&mut self, item: P) {
191 let preferred_capacity = timely::container::buffer::default_capacity::<(D, T, R)>();
192 if self.current.capacity() < preferred_capacity * 2 {
193 self.current.reserve(preferred_capacity * 2 - self.current.capacity());
194 }
195 self.current.push_into(item);
196 if self.current.len() == self.current.capacity() {
197 self.consolidate_and_flush_through(preferred_capacity);
199 }
200 }
201}
202
203impl<D,T,R> ContainerBuilder for ConsolidatingContainerBuilder<Vec<(D, T, R)>>
204where
205 D: Data,
206 T: Data,
207 R: Semigroup+'static,
208{
209 type Container = Vec<(D,T,R)>;
210
211 #[inline]
212 fn extract(&mut self) -> Option<&mut Vec<(D,T,R)>> {
213 if let Some(container) = self.outbound.pop_front() {
214 self.empty.push(container);
215 self.empty.last_mut()
216 } else {
217 None
218 }
219 }
220
221 #[inline]
222 fn finish(&mut self) -> Option<&mut Vec<(D,T,R)>> {
223 if !self.current.is_empty() {
224 self.consolidate_and_flush_through(1);
226 self.empty.truncate(2);
229 }
230 self.extract()
231 }
232}
233
/// A container whose contents can be consolidated into another container of the same type.
pub trait Consolidate {
    /// Returns the number of items held by the container.
    fn len(&self) -> usize;

    /// Removes every item from the container.
    fn clear(&mut self);

    /// Consolidates the contents of `self` and places the result in `target`.
    ///
    /// NOTE(review): the `Vec<(D, T, R)>` implementation swaps `self` and `target` after
    /// consolidating, so `target`'s previous contents end up in `self`. Callers presumably
    /// pass an empty `target` — confirm.
    fn consolidate_into(&mut self, target: &mut Self);
}
252
253impl<D: Ord, T: Ord, R: Semigroup> Consolidate for Vec<(D, T, R)> {
254 fn len(&self) -> usize { Vec::len(self) }
255 fn clear(&mut self) { Vec::clear(self) }
256 fn consolidate_into(&mut self, target: &mut Self) {
257 consolidate_updates(self);
258 std::mem::swap(self, target);
259 }
260}
261
#[cfg(test)]
mod tests {
    use super::*;

    /// `consolidate` merges equal keys, drops zero sums, and returns survivors in key order.
    #[test]
    fn test_consolidate() {
        let test_cases = vec![
            (
                vec![("a", -1), ("b", -2), ("a", 1)],
                vec![("b", -2)],
            ),
            (
                vec![("a", -1), ("b", 0), ("a", 1)],
                vec![],
            ),
            (
                vec![("a", 0)],
                vec![],
            ),
            (
                vec![("a", 0), ("b", 0)],
                vec![],
            ),
            (
                vec![("a", 1), ("b", 1)],
                vec![("a", 1), ("b", 1)],
            ),
        ];

        for (mut input, output) in test_cases {
            consolidate(&mut input);
            assert_eq!(input, output);
        }
    }


    /// `consolidate_updates` merges on the `(data, time)` pair, not on data alone.
    #[test]
    fn test_consolidate_updates() {
        let test_cases = vec![
            (
                vec![("a", 1, -1), ("b", 1, -2), ("a", 1, 1)],
                vec![("b", 1, -2)],
            ),
            (
                vec![("a", 1, -1), ("b", 1, 0), ("a", 1, 1)],
                vec![],
            ),
            (
                vec![("a", 1, 0)],
                vec![],
            ),
            (
                vec![("a", 1, 0), ("b", 1, 0)],
                vec![],
            ),
            (
                vec![("a", 1, 1), ("b", 2, 1)],
                vec![("a", 1, 1), ("b", 2, 1)],
            ),
        ];

        for (mut input, output) in test_cases {
            consolidate_updates(&mut input);
            assert_eq!(input, output);
        }
    }

    #[test]
    fn test_consolidating_container_builder() {
        let mut ccb = <ConsolidatingContainerBuilder<Vec<(usize, usize, usize)>>>::default();
        // Updates with a zero diff must consolidate away and produce no containers.
        for _ in 0..1024 {
            ccb.push_into((0, 0, 0));
        }
        assert_eq!(ccb.extract(), None);
        assert_eq!(ccb.finish(), None);

        // Distinct updates must all come out, each exactly once.
        for i in 0..1024 {
            ccb.push_into((i, 0, 1));
        }

        let mut collected = Vec::default();
        while let Some(container) = ccb.finish() {
            collected.append(container);
        }
        collected.sort();
        // Guard against duplicated or extra output, which the per-index check alone would miss.
        assert_eq!(collected.len(), 1024);
        for i in 0..1024 {
            assert_eq!((i, 0, 1), collected[i]);
        }
    }

    #[test]
    fn test_consolidate_into() {
        let mut data = vec![(1, 1, 1), (2, 1, 1), (1, 1, -1)];
        let mut target = Vec::default();
        data.sort();
        data.consolidate_into(&mut target);
        assert_eq!(target, [(2, 1, 1)]);
    }

    // Timing-oriented tests: large sizes in release builds, small ones in debug builds.
    #[cfg(not(debug_assertions))]
    const LEN: usize = 256 << 10;
    #[cfg(not(debug_assertions))]
    const REPS: usize = 10 << 10;

    #[cfg(debug_assertions)]
    const LEN: usize = 256 << 1;
    #[cfg(debug_assertions)]
    const REPS: usize = 10 << 1;

    /// Times `consolidate_into` and checks it agrees with `consolidate_updates`.
    #[test]
    fn test_consolidator_duration() {
        let mut data = Vec::with_capacity(LEN);
        let mut data2 = Vec::with_capacity(LEN);
        let mut target = Vec::new();
        let mut duration = std::time::Duration::default();
        for _ in 0..REPS {
            data.clear();
            data2.clear();
            target.clear();
            data.extend((0..LEN).map(|i| (i/4, 1, -2isize + ((i % 4) as isize))));
            data2.extend((0..LEN).map(|i| (i/4, 1, -2isize + ((i % 4) as isize))));
            data.sort_by(|x,y| x.0.cmp(&y.0));
            let start = std::time::Instant::now();
            data.consolidate_into(&mut target);
            duration += start.elapsed();

            consolidate_updates(&mut data2);
            assert_eq!(target, data2);
        }
        println!("elapsed consolidator {duration:?}");
    }

    /// Times `consolidate_updates` on pre-sorted input.
    #[test]
    fn test_consolidator_duration_vec() {
        let mut data = Vec::with_capacity(LEN);
        let mut duration = std::time::Duration::default();
        for _ in 0..REPS {
            data.clear();
            data.extend((0..LEN).map(|i| (i/4, 1, -2isize + ((i % 4) as isize))));
            data.sort_by(|x,y| x.0.cmp(&y.0));
            let start = std::time::Instant::now();
            consolidate_updates(&mut data);
            duration += start.elapsed();
        }
        println!("elapsed vec {duration:?}");
    }
}