retriever 0.0.3

Retriever is an embedded in-memory data store for rust applications.
Documentation
use crate::internal::mr::rvec::RVec;
use std::sync::Arc;

pub(crate) struct ReduceRules<Element, Summary> {
    map: Arc<dyn Fn(&Element, &Summary, usize) -> Option<Summary> + Send + Sync + 'static>,
    reduce: Arc<dyn Fn(&[Summary], &Summary) -> Option<Summary> + Send + Sync + 'static>,
}

pub(crate) struct Reduce<Element, Summary> {
    rules: ReduceRules<Element, Summary>,
    reductions: Vec<RVec<Summary>>,
    group_size: usize,
}

impl<Element, Summary> ReduceRules<Element, Summary> {
    pub(crate) fn new<Map, Reduce>(map: Map, reduce: Reduce) -> ReduceRules<Element, Summary>
    where
        Map: Fn(&Element, &Summary, usize) -> Option<Summary> + Send + Sync + 'static,
        Reduce: Fn(&[Summary], &Summary) -> Option<Summary> + Send + Sync + 'static,
    {
        ReduceRules {
            map: Arc::new(map),
            reduce: Arc::new(reduce),
        }
    }
}

impl<Element, Summary> Reduce<Element, Summary>
where
    Summary: Default,
{
    pub(crate) fn new(
        _parent: &RVec<Element>,
        group_size: usize,
        rules: ReduceRules<Element, Summary>,
    ) -> Self {
        assert!(group_size > 1);

        Reduce {
            rules,
            reductions: vec![RVec::default()],
            group_size,
        }
    }

    pub(crate) fn update(&mut self, parent: &RVec<Element>) -> Option<&Summary> {
        let mut layer = 0;
        let map = &self.rules.map;
        let reduce = &self.rules.reduce;

        if parent.len() == 0 {
            self.reductions[layer] = RVec::default();
        } else {
            self.reductions[layer].reduce(parent, 1, |xs, y, i| {
                if xs.is_empty() {
                    None
                } else {
                    (map)(&xs[0], y, i)
                }
            });
        }

        while self.reductions[layer].len() > 1 {
            if self.reductions.len() == layer + 1 {
                self.reductions.push(RVec::default());
            }

            let (left, right) = self.reductions.split_at_mut(layer + 1);
            right[0].reduce(&left[layer], self.group_size, |xs, y, _| {
                if xs.is_empty() {
                    None
                } else {
                    (reduce)(xs, y)
                }
            });

            layer += 1;
        }

        self.reductions.truncate(layer + 1);

        self.peek()
    }

    pub(crate) fn peek(&self) -> Option<&Summary> {
        let result_slice = &self.reductions[self.reductions.len() - 1];

        if result_slice.len() == 0 {
            None
        } else if result_slice.len() == 1 {
            Some(&result_slice[0])
        } else {
            panic!("bug in retriever");
        }
    }
}

impl<Element, Summary> Clone for ReduceRules<Element, Summary> {
    fn clone(&self) -> Self {
        ReduceRules {
            map: Arc::clone(&self.map),
            reduce: Arc::clone(&self.reduce),
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;

    fn summation_rules() -> ReduceRules<i64, i64> {
        ReduceRules {
            map: Arc::new(
                |n: &i64, old_n: &i64, _: usize| {
                    if n != old_n {
                        Some(*n)
                    } else {
                        None
                    }
                },
            ),
            reduce: Arc::new(|ns: &[i64], _old_n: &i64| Some(ns.iter().cloned().sum::<i64>())),
        }
    }

    #[test]
    fn test_sum() {
        use super::*;

        let mut numbers = RVec::default();

        numbers.push(1);
        numbers.push(2);
        numbers.push(3);
        numbers.push(4);
        numbers.push(5);
        numbers.push(6);
        numbers.push(7);

        let mut sum = Reduce::new(&numbers, 2, summation_rules());

        sum.update(&numbers);

        assert_eq!(sum.peek(), Some(&28));
    }

    #[test]
    fn test_sum_with_update() {
        use super::*;

        let mut numbers = RVec::default();

        numbers.push(1);
        numbers.push(2);
        numbers.push(3);
        numbers.push(4);
        numbers.push(5);
        numbers.push(6);
        numbers.push(7);

        let mut sum = Reduce::new(&numbers, 2, summation_rules());

        sum.update(&numbers);
        assert_eq!(sum.peek(), Some(&28));

        numbers[3] += 10;

        sum.update(&numbers);
        assert_eq!(sum.peek(), Some(&38));
    }

    #[test]
    fn test_sum_with_removal() {
        use super::*;

        let mut numbers = RVec::default();

        numbers.push(1);
        numbers.push(2);
        numbers.push(3);
        numbers.push(4);
        numbers.push(5);
        numbers.push(6);
        numbers.push(7);

        let mut sum = Reduce::new(&numbers, 2, summation_rules());

        sum.update(&numbers);
        assert_eq!(sum.peek(), Some(&28));

        numbers.swap_remove(3);

        sum.update(&numbers);
        assert_eq!(sum.peek(), Some(&24));
    }

    #[test]
    fn test_sum_with_addition() {
        use super::*;

        let mut numbers = RVec::default();

        numbers.push(1);
        numbers.push(2);
        numbers.push(3);
        numbers.push(4);
        numbers.push(5);
        numbers.push(6);
        numbers.push(7);

        let mut sum = Reduce::new(&numbers, 2, summation_rules());

        sum.update(&numbers);
        assert_eq!(sum.peek(), Some(&28));

        numbers.push(8);

        sum.update(&numbers);
        assert_eq!(sum.peek(), Some(&36));
    }
}