1use std::collections::VecDeque;
14use columnation::Columnation;
15use timely::container::{ContainerBuilder, PushInto};
16use crate::Data;
17use crate::difference::Semigroup;
18
19#[inline]
25pub fn consolidate<T: Ord, R: Semigroup>(vec: &mut Vec<(T, R)>) {
26 consolidate_from(vec, 0);
27}
28
29#[inline]
35pub fn consolidate_from<T: Ord, R: Semigroup>(vec: &mut Vec<(T, R)>, offset: usize) {
36 let length = consolidate_slice(&mut vec[offset..]);
37 vec.truncate(offset + length);
38}
39
40#[inline]
42pub fn consolidate_slice<T: Ord, R: Semigroup>(slice: &mut [(T, R)]) -> usize {
43 if slice.len() > 1 {
44 consolidate_slice_slow(slice)
45 }
46 else {
47 slice.iter().filter(|x| !x.1.is_zero()).count()
48 }
49}
50
51fn consolidate_slice_slow<T: Ord, R: Semigroup>(slice: &mut [(T, R)]) -> usize {
53 slice.sort_by(|x,y| x.0.cmp(&y.0));
56
57 let mut offset = 0;
59 let mut accum = slice[offset].1.clone();
60
61 for index in 1 .. slice.len() {
62 if slice[index].0 == slice[index-1].0 {
63 accum.plus_equals(&slice[index].1);
64 }
65 else {
66 if !accum.is_zero() {
67 slice.swap(offset, index-1);
68 slice[offset].1.clone_from(&accum);
69 offset += 1;
70 }
71 accum.clone_from(&slice[index].1);
72 }
73 }
74 if !accum.is_zero() {
75 slice.swap(offset, slice.len()-1);
76 slice[offset].1 = accum;
77 offset += 1;
78 }
79
80 offset
81}
82
83#[inline]
89pub fn consolidate_updates<D: Ord, T: Ord, R: Semigroup>(vec: &mut Vec<(D, T, R)>) {
90 consolidate_updates_from(vec, 0);
91}
92
93#[inline]
99pub fn consolidate_updates_from<D: Ord, T: Ord, R: Semigroup>(vec: &mut Vec<(D, T, R)>, offset: usize) {
100 let length = consolidate_updates_slice(&mut vec[offset..]);
101 vec.truncate(offset + length);
102}
103
104#[inline]
106pub fn consolidate_updates_slice<D: Ord, T: Ord, R: Semigroup>(slice: &mut [(D, T, R)]) -> usize {
107
108 if slice.len() > 1 {
109 consolidate_updates_slice_slow(slice)
110 }
111 else {
112 slice.iter().filter(|x| !x.2.is_zero()).count()
113 }
114}
115
116fn consolidate_updates_slice_slow<D: Ord, T: Ord, R: Semigroup>(slice: &mut [(D, T, R)]) -> usize {
118 slice.sort_unstable_by(|x,y| (&x.0, &x.1).cmp(&(&y.0, &y.1)));
121
122 let mut offset = 0;
124 let mut accum = slice[offset].2.clone();
125
126 for index in 1 .. slice.len() {
127 if (slice[index].0 == slice[index-1].0) && (slice[index].1 == slice[index-1].1) {
128 accum.plus_equals(&slice[index].2);
129 }
130 else {
131 if !accum.is_zero() {
132 slice.swap(offset, index-1);
133 slice[offset].2.clone_from(&accum);
134 offset += 1;
135 }
136 accum.clone_from(&slice[index].2);
137 }
138 }
139 if !accum.is_zero() {
140 slice.swap(offset, slice.len()-1);
141 slice[offset].2 = accum;
142 offset += 1;
143 }
144
145 offset
146}
147
148
/// A container builder that consolidates buffered data before handing out containers.
///
/// Pushed items accumulate in `current`; once consolidated they are moved in
/// chunks into `outbound`, from which `extract`/`finish` hand them to the caller.
#[derive(Default)]
pub struct ConsolidatingContainerBuilder<C>{
    /// Buffer collecting pushed items that have not yet been consolidated and flushed.
    current: C,
    /// Stash of previously handed-out containers, reused as chunk storage on later flushes.
    empty: Vec<C>,
    /// Consolidated containers ready to be returned to the caller, in FIFO order.
    outbound: VecDeque<C>,
}
157
158impl<D,T,R> ConsolidatingContainerBuilder<Vec<(D, T, R)>>
159where
160 D: Data,
161 T: Data,
162 R: Semigroup+'static,
163{
164 #[cold]
167 fn consolidate_and_flush_through(&mut self, multiple: usize) {
168 let preferred_capacity = timely::container::buffer::default_capacity::<(D, T, R)>();
169 consolidate_updates(&mut self.current);
170 let mut drain = self.current.drain(..(self.current.len()/multiple)*multiple).peekable();
171 while drain.peek().is_some() {
172 let mut container = self.empty.pop().unwrap_or_else(|| Vec::with_capacity(preferred_capacity));
173 container.clear();
174 container.extend((&mut drain).take(preferred_capacity));
175 self.outbound.push_back(container);
176 }
177 }
178}
179
180impl<D, T, R, P> PushInto<P> for ConsolidatingContainerBuilder<Vec<(D, T, R)>>
181where
182 D: Data,
183 T: Data,
184 R: Semigroup+'static,
185 Vec<(D, T, R)>: PushInto<P>,
186{
187 #[inline]
191 fn push_into(&mut self, item: P) {
192 let preferred_capacity = timely::container::buffer::default_capacity::<(D, T, R)>();
193 if self.current.capacity() < preferred_capacity * 2 {
194 self.current.reserve(preferred_capacity * 2 - self.current.capacity());
195 }
196 self.current.push_into(item);
197 if self.current.len() == self.current.capacity() {
198 self.consolidate_and_flush_through(preferred_capacity);
200 }
201 }
202}
203
204impl<D,T,R> ContainerBuilder for ConsolidatingContainerBuilder<Vec<(D, T, R)>>
205where
206 D: Data,
207 T: Data,
208 R: Semigroup+'static,
209{
210 type Container = Vec<(D,T,R)>;
211
212 #[inline]
213 fn extract(&mut self) -> Option<&mut Vec<(D,T,R)>> {
214 if let Some(container) = self.outbound.pop_front() {
215 self.empty.push(container);
216 self.empty.last_mut()
217 } else {
218 None
219 }
220 }
221
222 #[inline]
223 fn finish(&mut self) -> Option<&mut Vec<(D,T,R)>> {
224 if !self.current.is_empty() {
225 self.consolidate_and_flush_through(1);
227 self.empty.truncate(2);
230 }
231 self.extract()
232 }
233}
234
/// Containers whose contents can be consolidated into another container of the same type.
///
/// In the implementations in this file, consolidation orders `(data, time, diff)`
/// updates by `(data, time)`, accumulates the diffs of equal `(data, time)` pairs,
/// and drops updates whose accumulated diff is zero.
pub trait Consolidate {
    /// The number of elements in the container.
    fn len(&self) -> usize;
    /// Clear the container.
    fn clear(&mut self);
    /// Consolidate the contents of `self`, leaving the result in `target`.
    ///
    /// The prior contents of `target` are not part of the result. What remains in
    /// `self` afterwards is implementation-specific and should not be relied upon.
    fn consolidate_into(&mut self, target: &mut Self);
}
253
254impl<D: Ord, T: Ord, R: Semigroup> Consolidate for Vec<(D, T, R)> {
255 fn len(&self) -> usize { Vec::len(self) }
256 fn clear(&mut self) { Vec::clear(self) }
257 fn consolidate_into(&mut self, target: &mut Self) {
258 consolidate_updates(self);
259 std::mem::swap(self, target);
260 }
261}
262
263impl<D: Ord + Columnation, T: Ord + Columnation, R: Semigroup + Columnation> Consolidate for crate::containers::TimelyStack<(D, T, R)> {
264 fn len(&self) -> usize { self[..].len() }
265 fn clear(&mut self) { crate::containers::TimelyStack::clear(self) }
266 fn consolidate_into(&mut self, target: &mut Self) {
267 let len = self[..].len();
268 let mut indices: Vec<usize> = (0..len).collect();
269 indices.sort_unstable_by(|&i, &j| {
270 let (d1, t1, _) = &self[i];
271 let (d2, t2, _) = &self[j];
272 (d1, t1).cmp(&(d2, t2))
273 });
274 target.clear();
275 let mut idx = 0;
276 while idx < indices.len() {
277 let (d, t, r) = &self[indices[idx]];
278 let mut r_owned = r.clone();
279 idx += 1;
280 while idx < indices.len() {
281 let (d2, t2, r2) = &self[indices[idx]];
282 if d == d2 && t == t2 {
283 r_owned.plus_equals(r2);
284 idx += 1;
285 } else { break; }
286 }
287 if !r_owned.is_zero() {
288 target.copy_destructured(d, t, &r_owned);
289 }
290 }
291 self.clear();
292 }
293}
294
#[cfg(test)]
mod tests {
    use super::*;

    /// `consolidate` merges equal keys and drops entries that sum to zero.
    #[test]
    fn test_consolidate() {
        let test_cases = vec![
            (
                vec![("a", -1), ("b", -2), ("a", 1)],
                vec![("b", -2)],
            ),
            (
                vec![("a", -1), ("b", 0), ("a", 1)],
                vec![],
            ),
            (
                vec![("a", 0)],
                vec![],
            ),
            (
                vec![("a", 0), ("b", 0)],
                vec![],
            ),
            (
                vec![("a", 1), ("b", 1)],
                vec![("a", 1), ("b", 1)],
            ),
        ];

        for (mut input, output) in test_cases {
            consolidate(&mut input);
            assert_eq!(input, output);
        }
    }

    /// `consolidate_updates` merges equal `(data, time)` pairs and drops zero diffs.
    #[test]
    fn test_consolidate_updates() {
        let test_cases = vec![
            (
                vec![("a", 1, -1), ("b", 1, -2), ("a", 1, 1)],
                vec![("b", 1, -2)],
            ),
            (
                vec![("a", 1, -1), ("b", 1, 0), ("a", 1, 1)],
                vec![],
            ),
            (
                vec![("a", 1, 0)],
                vec![],
            ),
            (
                vec![("a", 1, 0), ("b", 1, 0)],
                vec![],
            ),
            (
                vec![("a", 1, 1), ("b", 2, 1)],
                vec![("a", 1, 1), ("b", 2, 1)],
            ),
        ];

        for (mut input, output) in test_cases {
            consolidate_updates(&mut input);
            assert_eq!(input, output);
        }
    }

    /// The builder emits nothing for all-zero input and every update for distinct input.
    #[test]
    fn test_consolidating_container_builder() {
        let mut ccb = <ConsolidatingContainerBuilder<Vec<(usize, usize, usize)>>>::default();
        // Updates with a zero diff must vanish entirely.
        for _ in 0..1024 {
            ccb.push_into((0, 0, 0));
        }
        assert_eq!(ccb.extract(), None);
        assert_eq!(ccb.finish(), None);

        for i in 0..1024 {
            ccb.push_into((i, 0, 1));
        }

        let mut collected = Vec::default();
        while let Some(container) = ccb.finish() {
            collected.append(container);
        }
        // Sort before comparing so the check does not depend on emission order.
        collected.sort();
        // Comparing whole vectors also verifies that exactly 1024 updates came out,
        // rather than panicking on a short result or ignoring trailing extras.
        let expected: Vec<(usize, usize, usize)> = (0..1024).map(|i| (i, 0, 1)).collect();
        assert_eq!(collected, expected);
    }

    /// `Consolidate::consolidate_into` for `Vec` leaves the consolidated result in `target`.
    #[test]
    fn test_consolidate_into() {
        let mut data = vec![(1, 1, 1), (2, 1, 1), (1, 1, -1)];
        let mut target = Vec::default();
        data.sort();
        data.consolidate_into(&mut target);
        assert_eq!(target, [(2, 1, 1)]);
    }

    // Workload sizes for the timing tests: large in optimized builds, tiny in
    // debug builds so the test suite stays fast.
    #[cfg(not(debug_assertions))]
    const LEN: usize = 256 << 10;
    #[cfg(not(debug_assertions))]
    const REPS: usize = 10 << 10;

    #[cfg(debug_assertions)]
    const LEN: usize = 256 << 1;
    #[cfg(debug_assertions)]
    const REPS: usize = 10 << 1;

    /// Times `consolidate_into` and cross-checks its output against `consolidate_updates`.
    #[test]
    fn test_consolidator_duration() {
        let mut data = Vec::with_capacity(LEN);
        let mut data2 = Vec::with_capacity(LEN);
        let mut target = Vec::new();
        let mut duration = std::time::Duration::default();
        for _ in 0..REPS {
            data.clear();
            data2.clear();
            target.clear();
            data.extend((0..LEN).map(|i| (i/4, 1, -2isize + ((i % 4) as isize))));
            data2.extend((0..LEN).map(|i| (i/4, 1, -2isize + ((i % 4) as isize))));
            data.sort_by(|x,y| x.0.cmp(&y.0));
            // Only the consolidation itself is timed.
            let start = std::time::Instant::now();
            data.consolidate_into(&mut target);
            duration += start.elapsed();

            consolidate_updates(&mut data2);
            assert_eq!(target, data2);
        }
        println!("elapsed consolidator {duration:?}");
    }

    /// Times `consolidate_updates` on the same workload, as a baseline.
    #[test]
    fn test_consolidator_duration_vec() {
        let mut data = Vec::with_capacity(LEN);
        let mut duration = std::time::Duration::default();
        for _ in 0..REPS {
            data.clear();
            data.extend((0..LEN).map(|i| (i/4, 1, -2isize + ((i % 4) as isize))));
            data.sort_by(|x,y| x.0.cmp(&y.0));
            // Only the consolidation itself is timed.
            let start = std::time::Instant::now();
            consolidate_updates(&mut data);
            duration += start.elapsed();
        }
        println!("elapsed vec {duration:?}");
    }
}