use std::collections::VecDeque;
use timely::container::{ContainerBuilder, PushInto};
use crate::Data;
use crate::difference::Semigroup;
#[inline]
pub fn consolidate<T: Ord, R: Semigroup>(vec: &mut Vec<(T, R)>) {
consolidate_from(vec, 0);
}
/// Consolidates only the suffix `vec[offset..]`, leaving `vec[..offset]` untouched.
///
/// The suffix is sorted by key and its equal keys are accumulated. Zero
/// accumulations are dropped, and `vec` is shortened to the prefix plus the
/// retained entries.
///
/// # Panics
///
/// Panics if `offset > vec.len()`.
#[inline]
pub fn consolidate_from<T: Ord, R: Semigroup>(vec: &mut Vec<(T, R)>, offset: usize) {
    let suffix = &mut vec[offset..];
    let retained = consolidate_slice(suffix);
    vec.truncate(retained + offset);
}
/// Consolidates `slice` in place and returns the number of retained entries.
///
/// The retained entries occupy the front of the slice.
///
/// - A slice of length zero or one is left unmodified. Its single entry (if
///   any) counts as retained exactly when its diff is non-zero.
/// - Longer slices are sorted by key and compacted. Entries at or beyond the
///   returned length are leftovers.
#[inline]
pub fn consolidate_slice<T: Ord, R: Semigroup>(slice: &mut [(T, R)]) -> usize {
    match slice.len() {
        0 | 1 => slice.iter().filter(|(_, diff)| !diff.is_zero()).count(),
        _ => consolidate_slice_slow(slice),
    }
}
/// Sorts `slice` by key and compacts it in place, returning how many entries survive.
///
/// Each run of equal keys is folded into one entry whose diff is the sum of
/// the run; runs that sum to zero are discarded. Survivors are written to the
/// front of `slice` in ascending key order. Entries at or beyond the returned
/// length are leftovers in no particular order.
///
/// Each survivor keeps the key of the *last* entry of its run, as ordered by
/// the stable sort.
///
/// # Panics
///
/// Panics if `slice` is empty. `consolidate_slice` only calls it with two or
/// more entries.
fn consolidate_slice_slow<T: Ord, R: Semigroup>(slice: &mut [(T, R)]) -> usize {
    slice.sort_by(|a, b| a.0.cmp(&b.0));

    // `write` is the next output position. `running` sums the diffs of the
    // run that ends at the most recently inspected entry.
    let mut write = 0;
    let mut running = slice[0].1.clone();
    let last = slice.len() - 1;

    for read in 1..=last {
        let same_key = slice[read].0 == slice[read - 1].0;
        if same_key {
            running.plus_equals(&slice[read].1);
            continue;
        }
        // The run ended at `read - 1`. Entries in `write..read` are no longer
        // needed, so moving the run's representative forward loses nothing.
        if !running.is_zero() {
            slice.swap(write, read - 1);
            slice[write].1.clone_from(&running);
            write += 1;
        }
        running.clone_from(&slice[read].1);
    }

    // Flush the final run, which ends at `last`.
    if running.is_zero() {
        write
    } else {
        slice.swap(write, last);
        slice[write].1 = running;
        write + 1
    }
}
#[inline]
pub fn consolidate_updates<D: Ord, T: Ord, R: Semigroup>(vec: &mut Vec<(D, T, R)>) {
consolidate_updates_from(vec, 0);
}
/// Consolidates only the suffix `vec[offset..]` by `(D, T)`, leaving
/// `vec[..offset]` untouched.
///
/// `vec` is shortened to the prefix plus the retained, non-zero entries.
///
/// # Panics
///
/// Panics if `offset > vec.len()`.
#[inline]
pub fn consolidate_updates_from<D: Ord, T: Ord, R: Semigroup>(vec: &mut Vec<(D, T, R)>, offset: usize) {
    let suffix = &mut vec[offset..];
    let retained = consolidate_updates_slice(suffix);
    vec.truncate(retained + offset);
}
/// Consolidates `slice` in place by `(D, T)` and returns the number of retained entries.
///
/// The retained entries occupy the front of the slice.
///
/// - A slice of length zero or one is left unmodified. Its single entry (if
///   any) counts as retained exactly when its diff is non-zero.
/// - Longer slices are sorted and compacted. Entries at or beyond the
///   returned length are leftovers.
#[inline]
pub fn consolidate_updates_slice<D: Ord, T: Ord, R: Semigroup>(slice: &mut [(D, T, R)]) -> usize {
    match slice.len() {
        0 | 1 => slice.iter().filter(|(_, _, diff)| !diff.is_zero()).count(),
        _ => consolidate_updates_slice_slow(slice),
    }
}
/// Sorts `slice` by `(D, T)` and compacts it in place, returning how many entries survive.
///
/// Each run of equal `(D, T)` pairs is folded into one entry carrying the
/// summed diff; runs that sum to zero are discarded. Survivors are written to
/// the front of `slice` in ascending order. Entries at or beyond the returned
/// length are leftovers in no particular order. The sort is unstable, so which
/// member of a run supplies the surviving `(D, T)` is unspecified.
///
/// # Panics
///
/// Panics if `slice` is empty. `consolidate_updates_slice` only calls it with
/// two or more entries.
fn consolidate_updates_slice_slow<D: Ord, T: Ord, R: Semigroup>(slice: &mut [(D, T, R)]) -> usize {
    slice.sort_unstable_by(|a, b| (&a.0, &a.1).cmp(&(&b.0, &b.1)));

    // `write` is the next output position. `running` sums the diffs of the
    // run that ends at the most recently inspected entry.
    let mut write = 0;
    let mut running = slice[0].2.clone();
    let last = slice.len() - 1;

    for read in 1..=last {
        let same_key = {
            let (curr, prev) = (&slice[read], &slice[read - 1]);
            (&curr.0, &curr.1) == (&prev.0, &prev.1)
        };
        if same_key {
            running.plus_equals(&slice[read].2);
            continue;
        }
        // The run ended at `read - 1`. Entries in `write..read` are no longer
        // needed, so moving the run's representative forward loses nothing.
        if !running.is_zero() {
            slice.swap(write, read - 1);
            slice[write].2.clone_from(&running);
            write += 1;
        }
        running.clone_from(&slice[read].2);
    }

    // Flush the final run, which ends at `last`.
    if running.is_zero() {
        write
    } else {
        slice.swap(write, last);
        slice[write].2 = running;
        write + 1
    }
}
/// Container builder that stages `(D, T, R)` updates, consolidates them, and
/// emits them in containers no larger than the preferred buffer capacity.
///
/// `Debug` is derived so the builder can be inspected in logs and assertions,
/// in line with the Rust API guideline that public types implement `Debug`.
#[derive(Debug, Default)]
pub struct ConsolidatingContainerBuilder<C> {
    /// Staging buffer that incoming updates are pushed into.
    current: C,
    /// Previously extracted containers, kept so their allocations can be reused.
    empty: Vec<C>,
    /// Completed containers waiting to be handed out by `extract`.
    outbound: VecDeque<C>,
}
impl<D, T, R> ConsolidatingContainerBuilder<Vec<(D, T, R)>>
where
    D: Data,
    T: Data,
    R: Semigroup + 'static,
{
    /// Consolidates the staged updates, then moves the longest prefix whose
    /// length is a multiple of `multiple` into outbound containers.
    ///
    /// Each outbound container holds at most the preferred capacity of
    /// updates. Spare containers from `empty` are reused before new ones are
    /// allocated. Updates beyond the flushed prefix stay in `current`.
    #[cold]
    fn consolidate_and_flush_through(&mut self, multiple: usize) {
        let preferred_capacity = timely::container::buffer::default_capacity::<(D, T, R)>();
        consolidate_updates(&mut self.current);

        let staged = self.current.len();
        let flushable = staged - staged % multiple;
        let mut remaining = flushable;
        let mut drain = self.current.drain(..flushable);

        while remaining > 0 {
            let batch = remaining.min(preferred_capacity);
            let mut container = self
                .empty
                .pop()
                .unwrap_or_else(|| Vec::with_capacity(preferred_capacity));
            // Recycled containers may still hold data handed out by `extract`.
            container.clear();
            container.extend(drain.by_ref().take(batch));
            self.outbound.push_back(container);
            remaining -= batch;
        }
    }
}
impl<D, T, R, P> PushInto<P> for ConsolidatingContainerBuilder<Vec<(D, T, R)>>
where
    D: Data,
    T: Data,
    R: Semigroup + 'static,
    Vec<(D, T, R)>: PushInto<P>,
{
    /// Pushes `item` into the staging buffer.
    ///
    /// The buffer is sized to hold twice the preferred container capacity.
    /// Once it is full, it is consolidated and whole multiples of the
    /// preferred capacity are flushed to `outbound`.
    #[inline]
    fn push_into(&mut self, item: P) {
        let preferred_capacity = timely::container::buffer::default_capacity::<(D, T, R)>();
        let target_capacity = preferred_capacity * 2;
        if self.current.capacity() < target_capacity {
            // `Vec::reserve` takes the number of elements to add beyond `len`,
            // so the request must be relative to `len` (not `capacity`) to
            // guarantee `capacity >= target_capacity`. No underflow:
            // `len <= capacity < target_capacity`.
            self.current.reserve(target_capacity - self.current.len());
        }
        self.current.push_into(item);
        if self.current.len() == self.current.capacity() {
            // Full: consolidate, then flush whole multiples of the preferred capacity.
            self.consolidate_and_flush_through(preferred_capacity);
        }
    }
}
impl<D, T, R> ContainerBuilder for ConsolidatingContainerBuilder<Vec<(D, T, R)>>
where
    D: Data,
    T: Data,
    R: Semigroup + 'static,
{
    type Container = Vec<(D, T, R)>;

    /// Hands out the oldest completed container, if there is one.
    ///
    /// The container is parked in `empty` so a later flush can reuse its
    /// allocation. The returned reference points at that parked entry.
    #[inline]
    fn extract(&mut self) -> Option<&mut Vec<(D, T, R)>> {
        let ready = self.outbound.pop_front()?;
        self.empty.push(ready);
        self.empty.last_mut()
    }

    /// Flushes every staged update (consolidated first), then hands out the
    /// oldest completed container, if there is one.
    #[inline]
    fn finish(&mut self) -> Option<&mut Vec<(D, T, R)>> {
        let has_pending = !self.current.is_empty();
        if has_pending {
            // A multiple of 1 drains everything that survives consolidation.
            self.consolidate_and_flush_through(1);
            // Keep at most two spare allocations around.
            self.empty.truncate(2);
        }
        self.extract()
    }
}
/// Containers whose updates can be consolidated into another container of the
/// same type.
///
/// Consolidation means accumulating diffs per key and dropping zero
/// accumulations. This file implements it for `Vec<(D, T, R)>`, keyed by `(D, T)`.
pub trait Consolidate {
/// Number of updates currently held.
fn len(&self) -> usize;
/// Removes all updates.
fn clear(&mut self);
/// Consolidates the updates in `self` and leaves the result in `target`.
///
/// NOTE(review): the `Vec` implementation swaps buffers, so `self` ends up
/// holding `target`'s previous contents. Callers presumably pass an empty
/// `target`; confirm before relying on other behavior.
fn consolidate_into(&mut self, target: &mut Self);
}
impl<D: Ord, T: Ord, R: Semigroup> Consolidate for Vec<(D, T, R)> {
    fn len(&self) -> usize {
        // Go through the slice so this cannot resolve back to the trait method.
        self.as_slice().len()
    }

    fn clear(&mut self) {
        self.truncate(0);
    }

    /// Consolidates `self` in place by `(D, T)`, then exchanges buffers with `target`.
    ///
    /// Afterwards `target` holds the consolidated updates, and `self` holds
    /// whatever `target` held before.
    fn consolidate_into(&mut self, target: &mut Self) {
        consolidate_updates(self);
        std::mem::swap(target, self);
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_consolidate() {
        let test_cases = vec![
            (
                vec![("a", -1), ("b", -2), ("a", 1)],
                vec![("b", -2)],
            ),
            (
                vec![("a", -1), ("b", 0), ("a", 1)],
                vec![],
            ),
            (
                vec![("a", 0)],
                vec![],
            ),
            (
                vec![("a", 0), ("b", 0)],
                vec![],
            ),
            (
                vec![("a", 1), ("b", 1)],
                vec![("a", 1), ("b", 1)],
            ),
        ];
        for (mut input, output) in test_cases {
            consolidate(&mut input);
            assert_eq!(input, output);
        }
    }

    /// Entries before `offset` must be left alone, even when their diff is zero.
    #[test]
    fn test_consolidate_from_preserves_prefix() {
        let mut input = vec![("z", 0), ("a", 1), ("b", 2), ("a", -1)];
        consolidate_from(&mut input, 1);
        assert_eq!(input, vec![("z", 0), ("b", 2)]);
    }

    #[test]
    fn test_consolidate_updates() {
        let test_cases = vec![
            (
                vec![("a", 1, -1), ("b", 1, -2), ("a", 1, 1)],
                vec![("b", 1, -2)],
            ),
            (
                vec![("a", 1, -1), ("b", 1, 0), ("a", 1, 1)],
                vec![],
            ),
            (
                vec![("a", 1, 0)],
                vec![],
            ),
            (
                vec![("a", 1, 0), ("b", 1, 0)],
                vec![],
            ),
            (
                vec![("a", 1, 1), ("b", 2, 1)],
                vec![("a", 1, 1), ("b", 2, 1)],
            ),
        ];
        for (mut input, output) in test_cases {
            consolidate_updates(&mut input);
            assert_eq!(input, output);
        }
    }

    /// Entries before `offset` must be left alone, even when their diff is zero.
    #[test]
    fn test_consolidate_updates_from_preserves_prefix() {
        let mut input = vec![("z", 0, 0), ("b", 1, 1), ("a", 1, 2), ("b", 1, -1)];
        consolidate_updates_from(&mut input, 1);
        assert_eq!(input, vec![("z", 0, 0), ("a", 1, 2)]);
    }

    #[test]
    fn test_consolidating_container_builder() {
        let mut ccb = <ConsolidatingContainerBuilder<Vec<(usize, usize, usize)>>>::default();
        for _ in 0..1024 {
            ccb.push_into((0, 0, 0));
        }
        assert_eq!(ccb.extract(), None);
        assert_eq!(ccb.finish(), None);
        for i in 0..1024 {
            ccb.push_into((i, 0, 1));
        }
        let mut collected = Vec::default();
        while let Some(container) = ccb.finish() {
            collected.append(container);
        }
        collected.sort();
        // Without a length check, spurious extra output would go unnoticed.
        assert_eq!(collected.len(), 1024);
        for i in 0..1024 {
            assert_eq!((i, 0, 1), collected[i]);
        }
    }

    #[test]
    fn test_consolidate_into() {
        // Deliberately unsorted: `consolidate_into` must not require sorted input.
        let mut data = vec![(1, 1, 1), (2, 1, 1), (1, 1, -1)];
        let mut target = Vec::default();
        data.consolidate_into(&mut target);
        assert_eq!(target, [(2, 1, 1)]);
        // `target` started empty, so after the buffer exchange `data` is empty.
        assert!(data.is_empty());
    }

    #[cfg(not(debug_assertions))]
    const LEN: usize = 256 << 10;
    #[cfg(not(debug_assertions))]
    const REPS: usize = 10 << 10;

    #[cfg(debug_assertions)]
    const LEN: usize = 256 << 1;
    #[cfg(debug_assertions)]
    const REPS: usize = 10 << 1;

    #[test]
    fn test_consolidator_duration() {
        let mut data = Vec::with_capacity(LEN);
        let mut data2 = Vec::with_capacity(LEN);
        let mut target = Vec::new();
        let mut duration = std::time::Duration::default();
        for _ in 0..REPS {
            data.clear();
            data2.clear();
            target.clear();
            data.extend((0..LEN).map(|i| (i/4, 1, -2isize + ((i % 4) as isize))));
            data2.extend((0..LEN).map(|i| (i/4, 1, -2isize + ((i % 4) as isize))));
            data.sort_by(|x,y| x.0.cmp(&y.0));
            let start = std::time::Instant::now();
            data.consolidate_into(&mut target);
            duration += start.elapsed();
            consolidate_updates(&mut data2);
            assert_eq!(target, data2);
        }
        println!("elapsed consolidator {duration:?}");
    }

    #[test]
    fn test_consolidator_duration_vec() {
        let mut data = Vec::with_capacity(LEN);
        let mut duration = std::time::Duration::default();
        for _ in 0..REPS {
            data.clear();
            data.extend((0..LEN).map(|i| (i/4, 1, -2isize + ((i % 4) as isize))));
            data.sort_by(|x,y| x.0.cmp(&y.0));
            let start = std::time::Instant::now();
            consolidate_updates(&mut data);
            duration += start.elapsed();
        }
        println!("elapsed vec {duration:?}");
    }
}