actyxos_data_flow 0.1.0

Tools for exporting live data from ActyxOS to external systems
Documentation
/*
 * Copyright 2020 Actyx AG
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
//! Utility for consolidating differential dataflow deltas

use std::collections::btree_map::{self, Iter};
use std::collections::BTreeMap;
use std::ops::AddAssign;

use differential_dataflow::difference::Semigroup;
use differential_dataflow::Data;

/// A collection implementation for consolidating and batching deltas
///
/// Internally this is an ordered map from a data item to its accumulated
/// multiplicity. Insert incoming pairs of data and multiplicity using the `+=`
/// operator: multiplicities of equal data items are summed, and an item whose
/// accumulated multiplicity becomes zero is dropped from the collection.
/// Turn into an iterator or a vector to further consume the batch.
#[derive(Debug)]
pub struct Coll<D: Data, R: Semigroup>(BTreeMap<D, R>);

// Implemented by hand rather than derived: `#[derive(Default)]` would add
// `D: Default` and `R: Default` bounds, although an empty map needs neither.
impl<D: Data, R: Semigroup> Default for Coll<D, R> {
    /// Creates an empty collection.
    fn default() -> Self {
        Coll(BTreeMap::new())
    }
}

impl<D: Data, R: Semigroup> Coll<D, R> {
    pub fn new() -> Coll<D, R> {
        Coll(Default::default())
    }

    pub fn from_map(map: BTreeMap<D, R>) -> Coll<D, R> {
        Coll(map)
    }

    pub fn to_vec<Out: From<D>>(&self) -> Vec<(Out, R)> {
        self.0
            .iter()
            .map(|(d, r)| (Out::from(d.clone()), r.clone()))
            .collect()
    }

    pub fn iter(&self) -> Iter<D, R> {
        self.0.iter()
    }

    pub fn clear(&mut self) {
        self.0.clear();
    }

    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    pub fn len(&self) -> usize {
        self.0.len()
    }
}

impl<D: Data, R: Semigroup> IntoIterator for Coll<D, R> {
    type Item = (D, R);
    type IntoIter = btree_map::IntoIter<D, R>;
    fn into_iter(self) -> Self::IntoIter {
        self.0.into_iter()
    }
}

impl<'a, D: Data, R: Semigroup> IntoIterator for &'a Coll<D, R> {
    type Item = (&'a D, &'a R);
    type IntoIter = btree_map::Iter<'a, D, R>;
    fn into_iter(self) -> Self::IntoIter {
        self.iter()
    }
}

impl Coll<usize, i64> {
    pub fn new_time() -> Coll<usize, i64> {
        let mut coll = Self::new();
        coll += (0, 1);
        coll
    }
}

impl<D: Data, R: Semigroup> AddAssign<(D, R)> for Coll<D, R> {
    fn add_assign(&mut self, other: (D, R)) {
        let (key, value) = other;
        let v = self
            .0
            .entry(key.clone())
            .and_modify(|x| *x += &value)
            .or_insert(value);
        if v.is_zero() {
            self.0.remove(&key);
        }
    }
}