pipebase 0.2.0

A tokio based runtime library for data integration app
Documentation
use super::{AggregateAs, GroupAs};
use crate::common::Convert;
use std::{
    cmp::{Ord, Ordering, PartialOrd},
    fmt::Debug,
    hash::Hash,
};

pub trait LeftRight {
    type L;
    type R;
    fn left(&self) -> &Self::L;
    fn right(&self) -> &Self::R;
    fn into_tuple(self) -> (Self::L, Self::R);
}

impl<L, R> LeftRight for (L, R) {
    type L = L;
    type R = R;

    fn left(&self) -> &Self::L {
        &self.0
    }

    fn right(&self) -> &Self::R {
        &self.1
    }

    fn into_tuple(self) -> (L, R) {
        self
    }
}

#[derive(Debug, Clone, Eq)]
pub struct Pair<L, R>(pub L, pub R);

impl<L, R> Pair<L, R> {
    pub fn new(k: L, v: R) -> Self {
        Pair(k, v)
    }
}

impl<L, R> LeftRight for Pair<L, R> {
    type L = L;
    type R = R;
    fn left(&self) -> &L {
        &self.0
    }

    fn right(&self) -> &R {
        &self.1
    }

    fn into_tuple(self) -> (Self::L, Self::R) {
        (self.0, self.1)
    }
}

impl<L, R> From<(L, R)> for Pair<L, R> {
    fn from(t: (L, R)) -> Self {
        Pair::new(t.0, t.1)
    }
}

impl<L, R, P> Convert<P> for Pair<L, R>
where
    P: LeftRight<L = L, R = R>,
    L: Clone,
    R: Clone,
{
    fn convert(rhs: P) -> Self {
        let (l, r) = rhs.into_tuple();
        Pair::new(l, r)
    }
}

impl<L, R> Ord for Pair<L, R>
where
    L: Eq,
    R: Ord,
{
    fn cmp(&self, other: &Self) -> Ordering {
        self.1.cmp(&other.1)
    }
}

impl<L, R> PartialOrd for Pair<L, R>
where
    L: Eq,
    R: Ord,
{
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<L, R> PartialEq for Pair<L, R>
where
    R: Eq,
{
    fn eq(&self, other: &Self) -> bool {
        self.1 == other.1
    }
}

impl<L, R> GroupAs<L> for Pair<L, R>
where
    L: Clone + Hash + Eq + PartialEq,
{
    fn group(&self) -> L {
        self.0.to_owned()
    }
}

impl<L, R> AggregateAs<R> for Pair<L, R>
where
    R: Clone,
{
    fn aggregate_value(&self) -> R {
        self.1.to_owned()
    }
}

impl<L, R> AggregateAs<Vec<Pair<L, R>>> for Pair<L, R>
where
    L: Clone,
    R: Clone,
{
    fn aggregate_value(&self) -> Vec<Pair<L, R>> {
        vec![self.to_owned()]
    }
}

impl<L, R> std::ops::AddAssign<Self> for Pair<L, R>
where
    L: Eq + PartialEq + Debug,
    R: std::ops::AddAssign<R>,
{
    fn add_assign(&mut self, rhs: Self) {
        if !self.0.eq(&rhs.0) {
            panic!(
                "can not add assign pair with different left: self {:?}, rhs {:?}",
                self.0, rhs.0
            );
        }
        self.1 += rhs.1
    }
}

impl<L, R> Hash for Pair<L, R>
where
    L: Hash,
{
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.0.hash(state)
    }
}

#[cfg(test)]
mod left_right_tests {

    use crate::prelude::*;

    #[derive(LeftRight)]
    struct Record {
        #[left]
        id: String,
        #[right]
        val: i32,
    }

    #[test]
    fn test_left_right() {
        let r = Record {
            id: "foo".to_owned(),
            val: 1,
        };
        assert_eq!("foo", r.left());
        assert_eq!(&1, r.right());
    }
}

#[cfg(test)]
mod pair_tests {

    use crate::prelude::*;
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    fn get_hash<H>(h: &H) -> u64
    where
        H: Hash,
    {
        let mut hasher = DefaultHasher::new();
        h.hash(&mut hasher);
        hasher.finish()
    }

    #[test]
    fn test_right_ordered_pair_cmp() {
        let p0 = Pair::new("foo".to_owned(), 1);
        let p1 = Pair::new("foo".to_owned(), 2);
        assert!(p0 < p1);
        assert_eq!(get_hash(&p0), get_hash(&p1));
        let p2 = Pair::new("bar".to_owned(), 2);
        assert_eq!(p1, p2);
        assert_ne!(get_hash(&p1), get_hash(&p2));
        assert!(p0 < p2);
    }

    use crate::*;

    #[tokio::test]
    async fn test_right_ordered_pair_group_sum() {
        let (tx0, rx0) = channel!(Vec<Pair<String, u32>>, 1024);
        let (tx1, mut rx1) = channel!(Vec<Pair<String, u32>>, 1024);
        let channels = pipe_channels!(rx0, [tx1]);
        let config = config!(UnorderedGroupAddAggregatorConfig);
        let pipe = mapper!("pair_group_summation");
        let f0 = populate_records(
            tx0,
            vec![vec![
                Pair::new("foo".to_owned(), 1),
                Pair::new("foo".to_owned(), 2),
                Pair::new("bar".to_owned(), 2),
            ]],
        );
        f0.await;
        join_pipes!([run_pipe!(pipe, config, channels)]);
        let gs = rx1.recv().await.unwrap();
        for p in gs {
            match p.left().as_str() {
                "foo" => assert_eq!(&3, p.right()),
                "bar" => assert_eq!(&2, p.right()),
                _ => unreachable!(),
            }
        }
    }

    #[tokio::test]
    async fn test_top_pair() {
        let (tx0, rx0) = channel!(Vec<Pair<String, Count32>>, 1024);
        let (tx1, mut rx1) = channel!(Vec<Pair<String, Count32>>, 1024);
        let channels = pipe_channels!(rx0, [tx1]);
        let config = config!(
            TopAggregatorConfig,
            "resources/catalogs/top_aggregator_desc.yml"
        );
        let pipe = Mapper::new("top_word");
        let f0 = populate_records(
            tx0,
            vec![vec![
                Pair::new("d".to_owned(), Count32::new(4)),
                Pair::new("a".to_owned(), Count32::new(1)),
                Pair::new("e".to_owned(), Count32::new(5)),
                Pair::new("b".to_owned(), Count32::new(2)),
                Pair::new("c".to_owned(), Count32::new(3)),
            ]],
        );
        f0.await;
        join_pipes!([run_pipe!(pipe, config, channels)]);
        let top = rx1.recv().await.unwrap();
        assert_eq!(3, top.len());
        let top1 = top.get(0).unwrap();
        assert_eq!(5, top1.right().get());
        assert_eq!("e", top1.left());
        let top2 = top.get(1).unwrap();
        assert_eq!(4, top2.right().get());
        assert_eq!("d", top2.left());
        let top3 = top.get(2).unwrap();
        assert_eq!(3, top3.right().get());
        assert_eq!("c", top3.left());
    }
}