palimpsest-dataflow 0.1.1

A Postgres WAL-backed live query sync engine.
Documentation
#![allow(clippy::all, clippy::nursery, clippy::pedantic)]

use palimpsest_dataflow::operators::{Count, Reduce};
use palimpsest_dataflow::AsCollection;
use timely::dataflow::operators::capture::Extract;
use timely::dataflow::operators::{Capture, Map, ToStream};

#[test]
fn reduce() {
    let data = timely::example(|scope| {
        let col1 = vec![
            ((0, 0), Default::default(), 1),
            ((1, 2), Default::default(), 1),
            ((1, 1), Default::default(), 1),
        ]
        .into_iter()
        .to_stream(scope)
        .as_collection();

        col1.reduce(|_, s, t| t.push((*s[0].0, s.len() as isize)))
            .inner
            .capture()
    });

    let extracted = data.extract();
    assert_eq!(extracted.len(), 1);
    assert_eq!(
        extracted[0].1,
        vec![
            ((0, 0), Default::default(), 1),
            ((1, 1), Default::default(), 2)
        ]
    );
}

#[test]
fn reduce_scaling() {
    let data = timely::example(|scope| {
        // let scale = 100;
        let scale = 100_000;

        (0..1)
            .to_stream(scope)
            .flat_map(move |_| (0..scale).map(|i| ((), i, 1)))
            .as_collection()
            .count()
            .inner
            .capture()
    });

    let extracted = data.extract();
    assert_eq!(extracted.len(), 1);
}