1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
/*
 * Copyright 2020 Actyx AG
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
//! Utility for consolidating differential dataflow deltas

use std::collections::btree_map::{self, Iter};
use std::collections::BTreeMap;
use std::ops::AddAssign;

use differential_dataflow::difference::Semigroup;
use differential_dataflow::Data;

/// A collection implementation for consolidating and batching deltas
///
/// Insert incoming pairs of data and multiplicity using the `+=` operator.
/// Turn into an iterator or a vector to further consume the batch.
#[derive(Debug, Default)]
pub struct Coll<D: Data, R: Semigroup>(BTreeMap<D, R>);

impl<D: Data, R: Semigroup> Coll<D, R> {
    pub fn new() -> Coll<D, R> {
        Coll(Default::default())
    }

    pub fn from_map(map: BTreeMap<D, R>) -> Coll<D, R> {
        Coll(map)
    }

    pub fn to_vec<Out: From<D>>(&self) -> Vec<(Out, R)> {
        self.0
            .iter()
            .map(|(d, r)| (Out::from(d.clone()), r.clone()))
            .collect()
    }

    pub fn iter(&self) -> Iter<D, R> {
        self.0.iter()
    }

    pub fn clear(&mut self) {
        self.0.clear();
    }

    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    pub fn len(&self) -> usize {
        self.0.len()
    }
}

impl<D: Data, R: Semigroup> IntoIterator for Coll<D, R> {
    type Item = (D, R);
    type IntoIter = btree_map::IntoIter<D, R>;
    fn into_iter(self) -> Self::IntoIter {
        self.0.into_iter()
    }
}

impl<'a, D: Data, R: Semigroup> IntoIterator for &'a Coll<D, R> {
    type Item = (&'a D, &'a R);
    type IntoIter = btree_map::Iter<'a, D, R>;
    fn into_iter(self) -> Self::IntoIter {
        self.iter()
    }
}

impl Coll<usize, i64> {
    pub fn new_time() -> Coll<usize, i64> {
        let mut coll = Self::new();
        coll += (0, 1);
        coll
    }
}

impl<D: Data, R: Semigroup> AddAssign<(D, R)> for Coll<D, R> {
    fn add_assign(&mut self, other: (D, R)) {
        let (key, value) = other;
        let v = self
            .0
            .entry(key.clone())
            .and_modify(|x| *x += &value)
            .or_insert(value);
        if v.is_zero() {
            self.0.remove(&key);
        }
    }
}