use columnar::{Columnar, Container, ContainerOf, Vecs, Borrow, Index, IndexAs, Len, Push};
use columnar::primitive::offsets::Strides;
use crate::difference::{Semigroup, IsZero};
use super::layout::ColumnarUpdate as Update;
/// A container of lists of `C` items: a `Vecs` over `C` whose list bounds are held in `Strides`.
pub type Lists<C> = Vecs<C, Strides>;
/// Copies out of `lists` only the items flagged in `keep`, dropping lists left empty.
///
/// `keep` holds one flag per item of `lists.values`. The result pairs the
/// filtered lists with one flag per *input* list that says whether that list
/// kept at least one item, so the flags can drive the same filter one layer up.
///
/// # Panics
///
/// Panics if `keep.len()` differs from the number of items in `lists.values`.
pub fn retain_items<'a, C: Container>(lists: <Lists<C> as Borrow>::Borrowed<'a>, keep: &[bool]) -> (Lists<C>, Vec<bool>) {
    let mut retained = <Lists::<C> as Container>::with_capacity_for([lists].into_iter());
    let list_count = lists.len();
    let mut survivors = Vec::with_capacity(list_count);
    assert_eq!(keep.len(), lists.values.len());

    for list in 0..list_count {
        let (lower, upper) = lists.bounds.bounds(list);
        for item in (lower..upper).filter(|&item| keep[item]) {
            retained.values.push(lists.values.get(item));
        }
        // A list survives only if it added items beyond the last recorded bound.
        let sealed = columnar::Index::last(&retained.bounds.borrow()).unwrap_or(0) as usize;
        let grew = retained.values.len() > sealed;
        if grew {
            retained.bounds.push(retained.values.len() as u64);
        }
        survivors.push(grew);
    }

    assert_eq!(survivors.len(), lists.len());
    (retained, survivors)
}
/// Updates stored as four layers of lists: keys, vals, times, and diffs.
///
/// Each layer's lists are indexed by the items of the layer above, as walked by
/// `UpdatesView::iter`: list `k` of `vals` belongs to key item `k`, list `v` of
/// `times` belongs to val item `v`, and list `t` of `diffs` belongs to time item `t`.
pub struct UpdatesTyped<U: Update> {
/// Lists of keys.
pub keys: Lists<ContainerOf<U::Key>>,
/// Lists of vals, one list per key item.
pub vals: Lists<ContainerOf<U::Val>>,
/// Lists of times, one list per val item.
pub times: Lists<ContainerOf<U::Time>>,
/// Lists of diffs, one list per time item.
pub diffs: Lists<ContainerOf<U::Diff>>,
}
impl<U: Update> Default for UpdatesTyped<U> {
    /// Creates an empty collection: every layer starts from its own default.
    fn default() -> Self {
        // Tuples are `Default` element-wise; the field types drive inference.
        let (keys, vals, times, diffs) = Default::default();
        Self { keys, vals, times, diffs }
    }
}
impl<U: Update> std::fmt::Debug for UpdatesTyped<U> {
    /// Formats as `UpdatesTyped { .. }`.
    ///
    /// The column contents are deliberately left out; the trailing `..` tells
    /// the reader that fields were elided, instead of implying the struct has none.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("UpdatesTyped").finish_non_exhaustive()
    }
}
impl<U: Update> Clone for UpdatesTyped<U> {
    /// Clones each of the four layers independently.
    fn clone(&self) -> Self {
        let Self { keys, vals, times, diffs } = self;
        Self {
            keys: Clone::clone(keys),
            vals: Clone::clone(vals),
            times: Clone::clone(times),
            diffs: Clone::clone(diffs),
        }
    }
}
/// A borrowed view of the four layers of an update collection.
///
/// Each field is the `Borrow::Borrowed` form of the matching `Lists` layer;
/// both `UpdatesTyped::view` and `Updates::view` produce this type.
pub struct UpdatesView<'a, U: Update> {
/// Borrowed lists of keys.
pub keys: <Lists<ContainerOf<U::Key>> as Borrow>::Borrowed<'a>,
/// Borrowed lists of vals, one list per key item.
pub vals: <Lists<ContainerOf<U::Val>> as Borrow>::Borrowed<'a>,
/// Borrowed lists of times, one list per val item.
pub times: <Lists<ContainerOf<U::Time>> as Borrow>::Borrowed<'a>,
/// Borrowed lists of diffs, one list per time item.
pub diffs: <Lists<ContainerOf<U::Diff>> as Borrow>::Borrowed<'a>,
}
// Implemented by hand so that the only bound is `U: Update`; the view is copied
// as a whole, and `clone` simply returns that copy.
impl<'a, U: Update> Copy for UpdatesView<'a, U> {}
impl<'a, U: Update> Clone for UpdatesView<'a, U> { fn clone(&self) -> Self { *self } }
impl<'a, U: Update> UpdatesView<'a, U> {
    /// Iterates over every `(key, val, time, diff)` in storage order.
    ///
    /// Walks each key list, then for each key its vals, for each val its times,
    /// and for each time its diffs, yielding one tuple per diff.
    pub fn iter(self) -> impl Iterator<Item = (
        columnar::Ref<'a, U::Key>,
        columnar::Ref<'a, U::Val>,
        columnar::Ref<'a, U::Time>,
        columnar::Ref<'a, U::Diff>,
    )> {
        let UpdatesView { keys, vals, times, diffs } = self;
        (0..Len::len(&keys)).flat_map(move |list| {
            child_range(keys.bounds, list).flat_map(move |k| {
                let key = keys.values.get(k);
                child_range(vals.bounds, k).flat_map(move |v| {
                    let val = vals.values.get(v);
                    child_range(times.bounds, v).flat_map(move |t| {
                        let time = times.values.get(t);
                        child_range(diffs.bounds, t)
                            .map(move |d| (key, val, time, diffs.values.get(d)))
                    })
                })
            })
        })
    }

    /// Maps a range of key items to the range of val items beneath them.
    ///
    /// An empty `key_range` is returned unchanged.
    pub fn vals_bounds(self, key_range: std::ops::Range<usize>) -> std::ops::Range<usize> {
        if key_range.is_empty() {
            return key_range;
        }
        let offsets = self.vals.bounds;
        // The first child starts where the preceding list ended (or at zero).
        let first = match key_range.start.checked_sub(1) {
            Some(prev) => offsets.index_as(prev) as usize,
            None => 0,
        };
        let last = offsets.index_as(key_range.end - 1) as usize;
        first..last
    }

    /// Maps a range of val items to the range of time items beneath them.
    ///
    /// An empty `val_range` is returned unchanged.
    pub fn times_bounds(self, val_range: std::ops::Range<usize>) -> std::ops::Range<usize> {
        if val_range.is_empty() {
            return val_range;
        }
        let offsets = self.times.bounds;
        // The first child starts where the preceding list ended (or at zero).
        let first = match val_range.start.checked_sub(1) {
            Some(prev) => offsets.index_as(prev) as usize,
            None => 0,
        };
        let last = offsets.index_as(val_range.end - 1) as usize;
        first..last
    }
}
impl<U: Update> UpdatesTyped<U> {
    /// Borrows all four layers as an `UpdatesView`.
    pub fn view(&self) -> UpdatesView<'_, U> {
        let keys = self.keys.borrow();
        let vals = self.vals.borrow();
        let times = self.times.borrow();
        let diffs = self.diffs.borrow();
        UpdatesView { keys, vals, times, diffs }
    }
}
/// The same four layers as `UpdatesTyped`, each held in a `Stash`.
///
/// A layer can be in the `Stash::Typed` state (see the `From<UpdatesTyped<U>>`
/// impl and `into_typed`).
/// NOTE(review): presumably the other `Stash` states are backed by bytes of type `B` — confirm against `columnar`.
pub struct Updates<U: Update, B = timely::bytes::arc::Bytes> {
/// Stashed lists of keys.
pub keys: columnar::bytes::stash::Stash<Lists<ContainerOf<U::Key>>, B>,
/// Stashed lists of vals.
pub vals: columnar::bytes::stash::Stash<Lists<ContainerOf<U::Val>>, B>,
/// Stashed lists of times.
pub times: columnar::bytes::stash::Stash<Lists<ContainerOf<U::Time>>, B>,
/// Stashed lists of diffs.
pub diffs: columnar::bytes::stash::Stash<Lists<ContainerOf<U::Diff>>, B>,
}
impl<U: Update, B> Default for Updates<U, B> {
    /// Creates an instance whose four stashes are each in their default state.
    fn default() -> Self {
        // Tuples are `Default` element-wise; the field types drive inference.
        let (keys, vals, times, diffs) = Default::default();
        Self { keys, vals, times, diffs }
    }
}
impl<U: Update, B: Clone> Clone for Updates<U, B> {
    /// Clones each of the four stashes independently.
    fn clone(&self) -> Self {
        let Self { keys, vals, times, diffs } = self;
        Self {
            keys: Clone::clone(keys),
            vals: Clone::clone(vals),
            times: Clone::clone(times),
            diffs: Clone::clone(diffs),
        }
    }
}
impl<U: Update, B> From<UpdatesTyped<U>> for Updates<U, B> {
fn from(owned: UpdatesTyped<U>) -> Self {
use columnar::bytes::stash::Stash;
Self {
keys: Stash::Typed(owned.keys),
vals: Stash::Typed(owned.vals),
times: Stash::Typed(owned.times),
diffs: Stash::Typed(owned.diffs),
}
}
}
impl<U: Update, B: std::ops::Deref<Target = [u8]> + Clone + 'static> Updates<U, B> {
    /// Borrows all four stashed layers as an `UpdatesView`.
    pub fn view(&self) -> UpdatesView<'_, U> {
        let keys = self.keys.borrow();
        let vals = self.vals.borrow();
        let times = self.times.borrow();
        let diffs = self.diffs.borrow();
        UpdatesView { keys, vals, times, diffs }
    }

    /// Returns the number of updates, counted as the number of diff items.
    pub fn len(&self) -> usize {
        let view = self.view();
        view.diffs.values.len()
    }

    /// Returns `true` when there are no updates.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Converts every layer to its typed form and returns them as an `UpdatesTyped`.
    pub fn into_typed(self) -> UpdatesTyped<U> {
        use columnar::bytes::stash::Stash;
        let Self { mut keys, mut vals, mut times, mut diffs } = self;
        keys.make_typed();
        vals.make_typed();
        times.make_typed();
        diffs.make_typed();
        // After `make_typed`, each stash is expected to be in the `Typed` state.
        match (keys, vals, times, diffs) {
            (Stash::Typed(keys), Stash::Typed(vals), Stash::Typed(times), Stash::Typed(diffs)) => {
                UpdatesTyped { keys, vals, times, diffs }
            }
            _ => unreachable!(),
        }
    }
}
/// The `(key, val, time, diff)` tuple built from the associated types of an update type `U`.
pub type Tuple<U> = (<U as Update>::Key, <U as Update>::Val, <U as Update>::Time, <U as Update>::Diff);
/// Returns the item range of list `i`, given cumulative end offsets in `bounds`.
///
/// List `i` ends at `bounds[i]` and starts where list `i - 1` ended; the first
/// list starts at zero.
#[inline]
pub fn child_range<B: IndexAs<u64>>(bounds: B, i: usize) -> std::ops::Range<usize> {
    let start = match i.checked_sub(1) {
        Some(prev) => bounds.index_as(prev) as usize,
        None => 0,
    };
    let end = bounds.index_as(i) as usize;
    start..end
}
/// An iterator adaptor that sums the diffs of adjacent updates sharing the same
/// `(key, val, time)` and drops updates whose summed diff is zero.
///
/// Only *adjacent* equal triples are combined, so input that should be fully
/// consolidated must arrive with equal triples next to each other.
pub struct Consolidating<I: Iterator, D> {
    /// The wrapped input, peekable so the next triple can be inspected.
    iter: std::iter::Peekable<I>,
    /// The running diff for the triple currently being accumulated.
    diff: D,
}

impl<K, V, T, D, I> Consolidating<I, D>
where
    K: Copy + Eq,
    V: Copy + Eq,
    T: Copy + Eq,
    D: Semigroup + IsZero + Default,
    I: Iterator<Item = (K, V, T, D)>,
{
    /// Wraps `iter`, starting with a default-valued accumulator.
    pub fn new(iter: I) -> Self {
        let iter = iter.peekable();
        let diff = D::default();
        Self { iter, diff }
    }
}

impl<K, V, T, D, I> Iterator for Consolidating<I, D>
where
    K: Copy + Eq,
    V: Copy + Eq,
    T: Copy + Eq,
    D: Semigroup + IsZero + Default + Clone,
    I: Iterator<Item = (K, V, T, D)>,
{
    type Item = (K, V, T, D);

    /// Returns the next triple with a non-zero accumulated diff, if any.
    fn next(&mut self) -> Option<Self::Item> {
        while let Some((key, val, time, first)) = self.iter.next() {
            self.diff = first;
            // Absorb every immediately following update on the same triple.
            while let Some((_, _, _, more)) = self
                .iter
                .next_if(|peeked| peeked.0 == key && peeked.1 == val && peeked.2 == time)
            {
                self.diff.plus_equals(&more);
            }
            if !self.diff.is_zero() {
                return Some((key, val, time, self.diff.clone()));
            }
        }
        None
    }
}
impl<U: Update> UpdatesTyped<U> {
    /// Appends the keys in `key_range` of `other`, with everything beneath them, to `self`.
    ///
    /// Key items are copied without recording a key bound. The val, time, and
    /// diff lists that belong to those keys are copied as whole lists, using
    /// `vals_bounds` and `times_bounds` to find the matching ranges one layer down.
    pub fn extend_from_keys(&mut self, other: UpdatesView<'_, U>, key_range: std::ops::Range<usize>) {
        self.keys.values.extend_from_self(other.keys.values, key_range.clone());
        self.vals.extend_from_self(other.vals, key_range.clone());
        let val_range = other.vals_bounds(key_range);
        self.times.extend_from_self(other.times, val_range.clone());
        let time_range = other.times_bounds(val_range);
        self.diffs.extend_from_self(other.diffs, time_range);
    }

    /// Builds a consolidated collection from updates in arbitrary order.
    ///
    /// The updates are collected, sorted, and then handed to [`Self::form`].
    pub fn form_unsorted<'a>(unsorted: impl Iterator<Item = columnar::Ref<'a, Tuple<U>>>) -> Self {
        let mut data = unsorted.collect::<Vec<_>>();
        data.sort();
        Self::form(data.into_iter())
    }

    /// Builds a consolidated collection from already-sorted updates.
    ///
    /// Adjacent updates on the same `(key, val, time)` are summed and zero sums
    /// are dropped (via `Consolidating`). The survivors are laid out as a single
    /// list of keys, one val list per key, one time list per val, and one
    /// single-element diff list per time.
    ///
    /// # Panics
    ///
    /// Panics if two consecutive consolidated updates share the same
    /// `(key, val, time)`; consolidation merges such neighbours, so this guards
    /// an internal invariant.
    pub fn form<'a>(sorted: impl Iterator<Item = columnar::Ref<'a, Tuple<U>>>) -> Self {
        let mut updates = Consolidating::new(
            sorted.map(|(k, v, t, d)| (k, v, t, <U::Diff as Columnar>::into_owned(d)))
        );
        let mut output = Self::default();
        if let Some((key, val, time, diff)) = updates.next() {
            let mut prev = (key, val, time);
            output.keys.values.push(key);
            output.vals.values.push(val);
            output.times.values.push(time);
            output.diffs.values.push(&diff);
            output.diffs.bounds.push(output.diffs.values.len() as u64);
            for (key, val, time, diff) in updates {
                if key != prev.0 {
                    // New key: close the previous key's val list and the previous val's time list.
                    output.vals.bounds.push(output.vals.values.len() as u64);
                    output.times.bounds.push(output.times.values.len() as u64);
                    output.keys.values.push(key);
                    output.vals.values.push(val);
                }
                else if val != prev.1 {
                    // Same key, new val: close only the previous val's time list.
                    output.times.bounds.push(output.times.values.len() as u64);
                    output.vals.values.push(val);
                }
                else {
                    // Same key and val: the time has to differ after consolidation.
                    assert!(time != prev.2);
                }
                output.times.values.push(time);
                output.diffs.values.push(&diff);
                output.diffs.bounds.push(output.diffs.values.len() as u64);
                prev = (key, val, time);
            }
            // Close the lists that were still open at every layer above diffs.
            output.keys.bounds.push(output.keys.values.len() as u64);
            output.vals.bounds.push(output.vals.values.len() as u64);
            output.times.bounds.push(output.times.values.len() as u64);
        }
        output
    }

    /// Sorts and consolidates the contents, returning the rebuilt collection.
    pub fn consolidate(self) -> Self { Self::form_unsorted(self.iter()) }

    /// Removes times whose diff list is empty, then any vals and keys left without children.
    ///
    /// Returns `self` untouched when the diff bounds report `strided() == Some(1)`.
    /// Otherwise the time, val, and key layers are filtered bottom-up with
    /// `retain_items`, and the diff bounds are rebuilt as `Strides::new(1, n)`
    /// for the `n` existing diff items.
    /// NOTE(review): presumably stride 1 means exactly one diff per time — confirm against `Strides`.
    pub fn filter_zero(self) -> Self {
        if self.diffs.bounds.strided() == Some(1) { self }
        else {
            // One flag per time item: does it still own at least one diff?
            let keep: Vec<bool> = (0..self.times.values.len())
                .map(|index| {
                    let (lower, upper) = self.diffs.bounds.bounds(index);
                    lower < upper
                })
                .collect();
            // Each pass returns the flags for the layer above.
            let (times, keep) = retain_items(self.times.borrow(), &keep[..]);
            let (vals, keep) = retain_items(self.vals.borrow(), &keep[..]);
            let (keys, _keep) = retain_items(self.keys.borrow(), &keep[..]);
            UpdatesTyped {
                keys,
                vals,
                times,
                diffs: Lists {
                    bounds: Strides::new(1, self.diffs.values.len() as u64),
                    values: self.diffs.values,
                },
            }
        }
    }

    /// Returns the number of updates, counted as the number of diff items.
    pub fn len(&self) -> usize { self.diffs.values.len() }

    /// Returns `true` when there are no updates.
    pub fn is_empty(&self) -> bool { self.len() == 0 }
}
impl<KP, VP, TP, DP, U: Update> Push<(KP, VP, TP, DP)> for UpdatesTyped<U>
where
    ContainerOf<U::Key>: Push<KP>,
    ContainerOf<U::Val>: Push<VP>,
    ContainerOf<U::Time>: Push<TP>,
    ContainerOf<U::Diff>: Push<DP>,
{
    /// Appends one update as its own singleton list at every layer.
    ///
    /// No attempt is made to share a key, val, or time with earlier updates;
    /// each push records a new bound in all four layers.
    fn push(&mut self, (key, val, time, diff): (KP, VP, TP, DP)) {
        let Self { keys, vals, times, diffs } = self;

        keys.values.push(key);
        let key_end = keys.values.len() as u64;
        keys.bounds.push(key_end);

        vals.values.push(val);
        let val_end = vals.values.len() as u64;
        vals.bounds.push(val_end);

        times.values.push(time);
        let time_end = times.values.len() as u64;
        times.bounds.push(time_end);

        diffs.values.push(diff);
        let diff_end = diffs.values.len() as u64;
        diffs.bounds.push(diff_end);
    }
}
impl<U: Update> timely::container::PushInto<((U::Key, U::Val), U::Time, U::Diff)> for UpdatesTyped<U> {
fn push_into(&mut self, ((key, val), time, diff): ((U::Key, U::Val), U::Time, U::Diff)) {
self.push((&key, &val, &time, &diff));
}
}
impl<U: Update> UpdatesTyped<U> {
    /// Iterates over every `(key, val, time, diff)` by way of a borrowed view.
    pub fn iter(&self) -> impl Iterator<Item = (
        columnar::Ref<'_, U::Key>,
        columnar::Ref<'_, U::Val>,
        columnar::Ref<'_, U::Time>,
        columnar::Ref<'_, U::Diff>,
    )> {
        let view = self.view();
        view.iter()
    }
}
impl<U: Update> timely::Accountable for UpdatesTyped<U> {
    /// Reports the number of diff items as the record count.
    #[inline]
    fn record_count(&self) -> i64 {
        let records: usize = self.diffs.values.len();
        records as i64
    }
}
// Placeholder implementation: every method panics via `unimplemented!()`, so
// `UpdatesTyped` cannot actually be converted to or from bytes through this trait.
impl<U: Update> timely::dataflow::channels::ContainerBytes for UpdatesTyped<U> {
/// Not implemented; always panics.
fn from_bytes(_bytes: timely::bytes::arc::Bytes) -> Self { unimplemented!() }
/// Not implemented; always panics.
fn length_in_bytes(&self) -> usize { unimplemented!() }
/// Not implemented; always panics.
fn into_bytes<W: std::io::Write>(&self, _writer: &mut W) { unimplemented!() }
}
/// Builds an `UpdatesTyped` incrementally by melding chunks onto its end.
///
/// While building, the last key, val, and time lists of `updates` are kept
/// open (their final bound is absent); `done` pushes those bounds.
pub struct UpdatesBuilder<U: Update> {
/// The partially built collection, with its trailing key/val/time bounds removed.
updates: UpdatesTyped<U>,
}
impl<U: Update> UpdatesBuilder<U> {
/// Starts a builder from existing `updates`.
///
/// When `updates` holds at least one key, the final bound of the key, val, and
/// time layers is popped so that the last list of each stays open for `meld`;
/// `done` pushes those bounds back.
pub fn new_from(mut updates: UpdatesTyped<U>) -> Self {
use columnar::Len;
if Len::len(&updates.keys.values) > 0 {
updates.keys.bounds.pop();
updates.vals.bounds.pop();
updates.times.bounds.pop();
}
Self { updates }
}
/// Appends `chunk` to the end of the builder, merging at the seam.
///
/// If the builder's last key equals the chunk's first key, that key is not
/// duplicated and the chunk's vals for it join the builder's open val list;
/// the same applies one layer down when the last val also equals the first val.
/// An empty chunk is ignored, and an empty builder simply adopts a clone of the chunk.
///
/// NOTE(review): presumably `chunk` must be sorted and consolidated and must not
/// order before the builder's contents; only a repeated time within a shared
/// `(key, val)` is checked, and only in debug builds — confirm with callers.
pub fn meld(&mut self, chunk: &UpdatesTyped<U>) {
use columnar::{Borrow, Index, Len};
if chunk.len() == 0 { return; }
// Empty builder: take a copy of the chunk and reopen its last key/val/time lists.
if Len::len(&self.updates.keys.values) == 0 {
self.updates = chunk.clone();
self.updates.keys.bounds.pop();
self.updates.vals.bounds.pop();
self.updates.times.bounds.pop();
return;
}
// Seam comparisons are made up front, before anything is mutated.
// Does the builder's last key equal the chunk's first key?
let keys_match = {
let skb = self.updates.keys.values.borrow();
let ckb = chunk.keys.values.borrow();
skb.get(Len::len(&skb) - 1) == ckb.get(0)
};
// If so, does the builder's last val also equal the chunk's first val?
let vals_match = keys_match && {
let svb = self.updates.vals.values.borrow();
let cvb = chunk.vals.values.borrow();
svb.get(Len::len(&svb) - 1) == cvb.get(0)
};
// Item counts per layer; each also equals the number of lists in the layer below.
let chunk_num_keys = Len::len(&chunk.keys.values);
let chunk_num_vals = Len::len(&chunk.vals.values);
let chunk_num_times = Len::len(&chunk.times.values);
// The vals under the chunk's first key, and the times under its first val.
let first_key_vals = child_range(chunk.vals.borrow().bounds, 0);
let first_val_times = child_range(chunk.times.borrow().bounds, 0);
// Set once a layer has been found where the chunk's first item differs from
// the builder's last; every layer below that point is appended wholesale.
let mut differ = false;
// Keys: skip the chunk's first key when it is already the builder's last.
if keys_match {
if chunk_num_keys > 1 {
self.updates.keys.values.extend_from_self(chunk.keys.values.borrow(), 1..chunk_num_keys);
}
} else {
self.updates.keys.values.extend_from_self(chunk.keys.values.borrow(), 0..chunk_num_keys);
differ = true;
}
// Vals.
if differ {
// Keys differ: close the open val list, append all of the chunk's val
// lists, then pop the final bound so the last list is open again.
self.updates.vals.bounds.push(Len::len(&self.updates.vals.values) as u64);
self.updates.vals.extend_from_self(chunk.vals.borrow(), 0..chunk_num_keys);
self.updates.vals.bounds.pop();
} else {
// Shared key: its vals go into the builder's open val list, minus the
// first one when it matches the builder's last val.
if vals_match {
if first_key_vals.len() > 1 {
self.updates.vals.values.extend_from_self(
chunk.vals.values.borrow(),
(first_key_vals.start + 1)..first_key_vals.end,
);
}
} else {
self.updates.vals.values.extend_from_self(
chunk.vals.values.borrow(),
first_key_vals.clone(),
);
differ = true;
}
// Val lists of the chunk's remaining keys: close, append, reopen.
if chunk_num_keys > 1 {
self.updates.vals.bounds.push(Len::len(&self.updates.vals.values) as u64);
self.updates.vals.extend_from_self(chunk.vals.borrow(), 1..chunk_num_keys);
self.updates.vals.bounds.pop();
}
}
// Times.
if differ {
// Key or val differs: close the open time list, append all of the chunk's
// time lists, then reopen the last one.
self.updates.times.bounds.push(Len::len(&self.updates.times.values) as u64);
self.updates.times.extend_from_self(chunk.times.borrow(), 0..chunk_num_vals);
self.updates.times.bounds.pop();
} else {
// Shared (key, val): the chunk's first time is expected to differ from the
// builder's last time (checked in debug builds only).
debug_assert!({
let stb = self.updates.times.values.borrow();
let ctb = chunk.times.values.borrow();
stb.get(Len::len(&stb) - 1) != ctb.get(0)
}, "meld: duplicate time within same (key, val)");
// The first val's times join the builder's open time list.
self.updates.times.values.extend_from_self(
chunk.times.values.borrow(),
first_val_times.clone(),
);
differ = true;
// Time lists of the chunk's remaining vals: close, append, reopen.
if chunk_num_vals > 1 {
self.updates.times.bounds.push(Len::len(&self.updates.times.values) as u64);
self.updates.times.extend_from_self(chunk.times.borrow(), 1..chunk_num_vals);
self.updates.times.bounds.pop();
}
}
// Both branches of the time step leave `differ` set.
debug_assert!(differ);
// Diffs: every diff list of the chunk is appended as-is. The diff bounds are
// never popped by this builder, so there is nothing to close or reopen here.
self.updates.diffs.extend_from_self(chunk.diffs.borrow(), 0..chunk_num_times);
}
/// Finishes building and returns the collection.
///
/// When at least one key is present, the bounds of the still-open time, val,
/// and key lists are pushed; an empty builder is returned as-is.
pub fn done(mut self) -> UpdatesTyped<U> {
use columnar::Len;
if Len::len(&self.updates.keys.values) > 0 {
self.updates.times.bounds.push(Len::len(&self.updates.times.values) as u64);
self.updates.vals.bounds.push(Len::len(&self.updates.vals.values) as u64);
self.updates.keys.bounds.push(Len::len(&self.updates.keys.values) as u64);
}
self.updates
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use columnar::Push;

    type TestUpdate = (u64, u64, u64, i64);

    /// One `(key, val, time, diff)` row of test input or expected output.
    type Row = (u64, u64, u64, i64);

    /// Reads `updates` back out as owned rows, in iteration order.
    fn collect(updates: &UpdatesTyped<TestUpdate>) -> Vec<(u64, u64, u64, i64)> {
        updates.iter().map(|(k, v, t, d)| (*k, *v, *t, *d)).collect()
    }

    /// Pushes `rows` one by one, in order, into a fresh collection.
    fn pushed(rows: &[Row]) -> UpdatesTyped<TestUpdate> {
        let mut updates = UpdatesTyped::<TestUpdate>::default();
        for (key, val, time, diff) in rows {
            updates.push((key, val, time, diff));
        }
        updates
    }

    /// Pushes `rows`, consolidates, and reads the result back as rows.
    fn consolidated(rows: &[Row]) -> Vec<Row> {
        collect(&pushed(rows).consolidate())
    }

    #[test]
    fn test_push_and_consolidate_basic() {
        let updates = pushed(&[(1, 10, 100, 1), (1, 10, 100, 2), (2, 20, 200, 5)]);
        assert_eq!(updates.len(), 3);
        assert_eq!(collect(&updates.consolidate()), vec![(1, 10, 100, 3), (2, 20, 200, 5)]);
    }

    #[test]
    fn test_cancellation() {
        let rows = [(1, 10, 100, 3), (1, 10, 100, -3), (2, 20, 200, 1)];
        assert_eq!(consolidated(&rows), vec![(2, 20, 200, 1)]);
    }

    #[test]
    fn test_multiple_vals_and_times() {
        let rows = [(1, 10, 100, 1), (1, 10, 200, 2), (1, 20, 100, 3), (1, 20, 100, 4)];
        assert_eq!(consolidated(&rows), vec![(1, 10, 100, 1), (1, 10, 200, 2), (1, 20, 100, 7)]);
    }

    #[test]
    fn test_val_cancellation_propagates() {
        let rows = [(1, 10, 100, 5), (1, 10, 100, -5), (1, 20, 100, 1)];
        assert_eq!(consolidated(&rows), vec![(1, 20, 100, 1)]);
    }

    #[test]
    fn test_empty() {
        assert_eq!(consolidated(&[]), vec![]);
    }

    #[test]
    fn test_total_cancellation() {
        let rows = [(1, 10, 100, 1), (1, 10, 100, -1)];
        assert_eq!(consolidated(&rows), vec![]);
    }

    #[test]
    fn test_unsorted_input() {
        let rows = [(3, 30, 300, 1), (1, 10, 100, 2), (2, 20, 200, 3)];
        assert_eq!(consolidated(&rows), vec![(1, 10, 100, 2), (2, 20, 200, 3), (3, 30, 300, 1)]);
    }

    #[test]
    fn test_first_key_cancels() {
        let rows = [(1, 10, 100, 5), (1, 10, 100, -5), (2, 20, 200, 3)];
        assert_eq!(consolidated(&rows), vec![(2, 20, 200, 3)]);
    }

    #[test]
    fn test_middle_time_cancels() {
        let rows = [(1, 10, 100, 1), (1, 10, 200, 2), (1, 10, 200, -2), (1, 10, 300, 3)];
        assert_eq!(consolidated(&rows), vec![(1, 10, 100, 1), (1, 10, 300, 3)]);
    }

    #[test]
    fn test_first_val_cancels() {
        let rows = [(1, 10, 100, 1), (1, 10, 100, -1), (1, 20, 100, 5)];
        assert_eq!(consolidated(&rows), vec![(1, 20, 100, 5)]);
    }

    #[test]
    fn test_interleaved_cancellations() {
        let rows = [
            (1, 10, 100, 1),
            (1, 10, 100, -1),
            (2, 20, 200, 7),
            (3, 30, 300, 4),
            (3, 30, 300, -4),
        ];
        assert_eq!(consolidated(&rows), vec![(2, 20, 200, 7)]);
    }
}