// declarative-dataflow 0.1.0
//
// A reactive query engine built on Differential Dataflow.
// Documentation
use std::collections::HashSet;
use std::sync::mpsc::channel;
use std::time::Duration;

use declarative_dataflow::plan::{Pull, PullLevel};
use declarative_dataflow::server::Server;
use declarative_dataflow::{AttributeConfig, InputSemantics, Plan, Rule, TxData, Value};
use InputSemantics::Raw;
use Value::{Aid, Bool, Eid, Number, String};

/// Checks a single-level pull rooted at a value-filtered match.
///
/// The inner plan is `MatchAV(e, "admin?", Bool(false))` and the pull requests
/// the `name` and `age` attributes. Of the three transacted entities only 200
/// and 300 carry `admin? = false`, so the test expects exactly their `name` /
/// `age` facts (entity 300 has no `age`) and nothing for entity 100.
#[test]
fn pull_level() {
    timely::execute_directly(|worker| {
        let mut server = Server::<u64, u64>::new(Default::default());
        let (send_results, results) = channel();

        let (e,) = (1,);
        let plan = Plan::PullLevel(PullLevel {
            variables: vec![],
            plan: Box::new(Plan::MatchAV(e, "admin?".to_string(), Bool(false))),
            pull_attributes: vec!["name".to_string(), "age".to_string()],
            path_attributes: vec![],
        });

        worker.dataflow::<u64, _, _>(|scope| {
            // Every attribute used by the plan or the transaction is created first.
            server
                .context
                .internal
                .create_attribute("admin?", AttributeConfig::tx_time(Raw), scope)
                .unwrap();
            server
                .context
                .internal
                .create_attribute("name", AttributeConfig::tx_time(Raw), scope)
                .unwrap();
            server
                .context
                .internal
                .create_attribute("age", AttributeConfig::tx_time(Raw), scope)
                .unwrap();

            // Forward each output as (tuple, diff) to the test thread's channel.
            server
                .test_single(
                    scope,
                    Rule {
                        name: "pull_level".to_string(),
                        plan,
                    },
                )
                .inspect(move |x| {
                    send_results.send((x.0.clone(), x.2)).unwrap();
                });
        });

        server
            .transact(
                vec![
                    TxData(1, 100, "admin?".to_string(), Bool(true)),
                    TxData(1, 200, "admin?".to_string(), Bool(false)),
                    TxData(1, 300, "admin?".to_string(), Bool(false)),
                    TxData(1, 100, "name".to_string(), String("Mabel".to_string())),
                    TxData(1, 200, "name".to_string(), String("Dipper".to_string())),
                    TxData(1, 300, "name".to_string(), String("Soos".to_string())),
                    TxData(1, 100, "age".to_string(), Number(12)),
                    TxData(1, 200, "age".to_string(), Number(13)),
                ],
                0,
                0,
            )
            .unwrap();

        server.advance_domain(None, 1).unwrap();

        worker.step_while(|| server.is_any_outdated());

        let mut expected = HashSet::new();
        expected.insert((vec![Eid(200), Aid("age".to_string()), Number(13)], 1));
        expected.insert((
            vec![
                Eid(200),
                Aid("name".to_string()),
                String("Dipper".to_string()),
            ],
            1,
        ));
        expected.insert((
            vec![
                Eid(300),
                Aid("name".to_string()),
                String("Soos".to_string()),
            ],
            1,
        ));

        // Receive with a bounded timeout (as the `pull` test does). The sender
        // lives inside the `inspect` closure, so a plain `recv()` would
        // presumably never see a disconnect and a missing result would hang the
        // test instead of failing it.
        for _ in 0..expected.len() {
            let result = results
                .recv_timeout(Duration::from_millis(400))
                .unwrap_or_else(|err| panic!("missing results {:?}: {}", expected, err));
            if !expected.remove(&result) {
                panic!("unknown result {:?}", result);
            }
        }

        // No further (unexpected) results may show up.
        assert!(results.recv_timeout(Duration::from_millis(400)).is_err());
    });
}

/// Checks a pull that follows a `parent/child` path attribute.
///
/// The inner plan is `MatchA(parent, "parent/child", child)` with
/// `parent/child` as the only path attribute, pulling `name` and `age`.
/// The test expects one five-element tuple per pulled child fact, of the form
/// `[parent eid, parent/child, child eid, attribute, value]`, and no output
/// for the parents' own `name` facts.
#[test]
fn pull_children() {
    timely::execute_directly(|worker| {
        let mut server = Server::<u64, u64>::new(Default::default());
        let (send_results, results) = channel();

        let (parent, child) = (1, 2);
        let plan = Plan::PullLevel(PullLevel {
            variables: vec![],
            plan: Box::new(Plan::MatchA(parent, "parent/child".to_string(), child)),
            pull_attributes: vec!["name".to_string(), "age".to_string()],
            path_attributes: vec!["parent/child".to_string()],
        });

        worker.dataflow::<u64, _, _>(|scope| {
            // Every attribute used by the plan or the transaction is created first.
            server
                .context
                .internal
                .create_attribute("parent/child", AttributeConfig::tx_time(Raw), scope)
                .unwrap();
            server
                .context
                .internal
                .create_attribute("name", AttributeConfig::tx_time(Raw), scope)
                .unwrap();
            server
                .context
                .internal
                .create_attribute("age", AttributeConfig::tx_time(Raw), scope)
                .unwrap();

            // Forward each output as (tuple, diff) to the test thread's channel.
            server
                .test_single(
                    scope,
                    Rule {
                        name: "pull_children".to_string(),
                        plan,
                    },
                )
                .inspect(move |x| {
                    send_results.send((x.0.clone(), x.2)).unwrap();
                });
        });

        server
            .transact(
                vec![
                    TxData(1, 100, "name".to_string(), String("Alice".to_string())),
                    TxData(1, 100, "parent/child".to_string(), Eid(300)),
                    TxData(1, 200, "name".to_string(), String("Bob".to_string())),
                    TxData(1, 200, "parent/child".to_string(), Eid(400)),
                    TxData(1, 300, "name".to_string(), String("Mabel".to_string())),
                    TxData(1, 300, "age".to_string(), Number(13)),
                    TxData(1, 400, "name".to_string(), String("Dipper".to_string())),
                    TxData(1, 400, "age".to_string(), Number(12)),
                ],
                0,
                0,
            )
            .unwrap();

        server.advance_domain(None, 1).unwrap();

        worker.step_while(|| server.is_any_outdated());

        let mut expected = HashSet::new();
        expected.insert((
            vec![
                Eid(100),
                Aid("parent/child".to_string()),
                Eid(300),
                Aid("age".to_string()),
                Number(13),
            ],
            1,
        ));
        expected.insert((
            vec![
                Eid(100),
                Aid("parent/child".to_string()),
                Eid(300),
                Aid("name".to_string()),
                String("Mabel".to_string()),
            ],
            1,
        ));
        expected.insert((
            vec![
                Eid(200),
                Aid("parent/child".to_string()),
                Eid(400),
                Aid("age".to_string()),
                Number(12),
            ],
            1,
        ));
        expected.insert((
            vec![
                Eid(200),
                Aid("parent/child".to_string()),
                Eid(400),
                Aid("name".to_string()),
                String("Dipper".to_string()),
            ],
            1,
        ));

        // Receive with a bounded timeout (as the `pull` test does). The sender
        // lives inside the `inspect` closure, so a plain `recv()` would
        // presumably never see a disconnect and a missing result would hang the
        // test instead of failing it.
        for _ in 0..expected.len() {
            let result = results
                .recv_timeout(Duration::from_millis(400))
                .unwrap_or_else(|err| panic!("missing results {:?}: {}", expected, err));
            if !expected.remove(&result) {
                panic!("unknown result {:?}", result);
            }
        }

        // No further (unexpected) results may show up.
        assert!(results.recv_timeout(Duration::from_millis(400)).is_err());
    });
}

/// Exercises `Plan::Pull` built from two `PullLevel` paths.
///
/// The first path matches `join/binding` and pulls the `pattern/e`,
/// `pattern/a` and `pattern/v` attributes of the bound entities; the second
/// path matches `name` and pulls nothing beyond the path itself. The test
/// expects one three-element tuple for the `name` path and one five-element
/// tuple for every `pattern/*` fact transacted on entities 200 and 300.
#[test]
fn pull() {
    timely::execute_directly(|worker| {
        let mut server = Server::<u64, u64>::new(Default::default());
        let (sender, results) = channel();

        // Variable ids shared by the two path plans.
        let rule_var = 1;
        let binding_var = 2;
        let name_var = 3;

        let binding_path = PullLevel {
            variables: Vec::new(),
            plan: Box::new(Plan::MatchA(
                rule_var,
                "join/binding".to_owned(),
                binding_var,
            )),
            pull_attributes: ["pattern/e", "pattern/a", "pattern/v"]
                .iter()
                .map(|attribute| (*attribute).to_owned())
                .collect(),
            path_attributes: vec!["join/binding".to_owned()],
        };
        let name_path = PullLevel {
            variables: Vec::new(),
            plan: Box::new(Plan::MatchA(rule_var, "name".to_owned(), name_var)),
            pull_attributes: Vec::new(),
            path_attributes: vec!["name".to_owned()],
        };
        let plan = Plan::Pull(Pull {
            variables: Vec::new(),
            paths: vec![binding_path, name_path],
        });

        worker.dataflow::<u64, _, _>(|scope| {
            // Register, in order, every attribute the plan or transaction touches.
            for attribute in &[
                "name",
                "join/binding",
                "pattern/e",
                "pattern/a",
                "pattern/v",
            ] {
                server
                    .context
                    .internal
                    .create_attribute(*attribute, AttributeConfig::tx_time(Raw), scope)
                    .unwrap();
            }

            let rule = Rule {
                name: "pull".to_owned(),
                plan,
            };

            // Forward each output as (tuple, diff) to the test thread's channel.
            server.test_single(scope, rule).inspect(move |output| {
                sender.send((output.0.clone(), output.2)).unwrap();
            });
        });

        let tx_data = vec![
            TxData(1, 100, "name".to_owned(), String("rule".to_owned())),
            TxData(1, 100, "join/binding".to_owned(), Eid(200)),
            TxData(1, 100, "join/binding".to_owned(), Eid(300)),
            TxData(1, 200, "pattern/a".to_owned(), Aid("xyz".to_owned())),
            TxData(1, 300, "pattern/e".to_owned(), Eid(12345)),
            TxData(1, 300, "pattern/a".to_owned(), Aid("asd".to_owned())),
        ];
        server.transact(tx_data, 0, 0).unwrap();

        server.advance_domain(None, 1).unwrap();

        worker.step_while(|| server.is_any_outdated());

        let mut expected: HashSet<_> = vec![
            (
                vec![
                    Eid(100),
                    Aid("name".to_owned()),
                    String("rule".to_owned()),
                ],
                1,
            ),
            (
                vec![
                    Eid(100),
                    Aid("join/binding".to_owned()),
                    Eid(200),
                    Aid("pattern/a".to_owned()),
                    Aid("xyz".to_owned()),
                ],
                1,
            ),
            (
                vec![
                    Eid(100),
                    Aid("join/binding".to_owned()),
                    Eid(300),
                    Aid("pattern/e".to_owned()),
                    Eid(12345),
                ],
                1,
            ),
            (
                vec![
                    Eid(100),
                    Aid("join/binding".to_owned()),
                    Eid(300),
                    Aid("pattern/a".to_owned()),
                    Aid("asd".to_owned()),
                ],
                1,
            ),
        ]
        .into_iter()
        .collect();

        // Each received tuple must cancel exactly one expected entry; anything
        // else (or a timeout while entries remain) fails the test.
        while !expected.is_empty() {
            let received = results.recv_timeout(Duration::from_millis(400)).unwrap();
            if expected.remove(&received) {
                continue;
            }
            panic!("unknown result {:?}", received);
        }

        // No further (unexpected) results may show up.
        assert!(results.recv_timeout(Duration::from_millis(400)).is_err());
    });
}