// merksql 0.1.0
//
// Streaming SQL engine for merkql: ksqlDB-style queries over embedded event logs.
// (See the crate documentation for the full API.)
use std::time::Duration;

use merksql::builder::*;
use merksql::expr::Expr;
use merksql::plan::*;

/// A builder with no operators applied should produce a bare `Scan` of its source.
#[test]
fn build_simple_scan() {
    let expected = QueryPlan::Scan {
        source: String::from("readings"),
    };
    let actual = QueryBuilder::from_source("readings").build();
    assert_eq!(actual, expected);
}

/// `filter` should wrap the source scan in a `Filter` node carrying the predicate.
#[test]
fn build_filter() {
    let too_hot = col("temp").gt(lit_f64(100.0));
    let plan = QueryBuilder::from_source("readings").filter(too_hot).build();

    let QueryPlan::Filter { input, predicate } = &plan else {
        panic!("Expected Filter plan");
    };

    // The filter must sit directly on top of a scan of the original source.
    let scans_readings =
        matches!(input.as_ref(), QueryPlan::Scan { source } if source == "readings");
    assert!(scans_readings);
    assert!(matches!(predicate, Expr::BinaryOp { .. }));
}

/// `filter` followed by `select` should nest as Project -> Filter -> Scan.
#[test]
fn build_filter_and_project() {
    let columns = [col("sensor_id"), col("temp")];
    let plan = QueryBuilder::from_source("readings")
        .filter(col("temp").gt(lit_f64(100.0)))
        .select(&columns)
        .build();

    let QueryPlan::Project {
        input: projected,
        expressions,
    } = &plan
    else {
        panic!("Expected Project plan");
    };
    assert_eq!(expressions.len(), 2);

    // Projection is applied last, so its child is the filter, whose child is the scan.
    let QueryPlan::Filter {
        input: filtered, ..
    } = projected.as_ref()
    else {
        panic!("Expected Filter inside Project");
    };
    assert!(matches!(filtered.as_ref(), QueryPlan::Scan { .. }));
}

/// `as_stream` should make a `Sink` of type `Stream` the plan root, recording the
/// given name and topic.
#[test]
fn build_stream_sink() {
    const SINK_NAME: &str = "high_temps";
    const SINK_TOPIC: &str = "high-temps-topic";

    let plan = QueryBuilder::from_source("readings")
        .filter(col("temp").gt(lit_f64(100.0)))
        .select(&[col("sensor_id"), col("temp")])
        .as_stream(SINK_NAME, SINK_TOPIC)
        .build();

    let QueryPlan::Sink {
        name,
        topic,
        sink_type,
        ..
    } = &plan
    else {
        panic!("Expected Sink plan");
    };

    assert_eq!(name, SINK_NAME);
    assert_eq!(topic, SINK_TOPIC);
    assert_eq!(sink_type, &SinkType::Stream);
}

/// `group_by` + `count_star` should produce an unwindowed `Aggregate` with one group
/// key, a single `Count` aggregate under the requested alias, and no HAVING clause.
#[test]
fn build_aggregate_count() {
    let keys = [col("sensor_id")];
    let plan = QueryBuilder::from_source("readings")
        .group_by(&keys)
        .count_star("cnt")
        .build();

    let QueryPlan::Aggregate {
        group_by,
        aggregates,
        window,
        having,
        ..
    } = &plan
    else {
        panic!("Expected Aggregate plan");
    };

    assert_eq!(group_by.len(), 1);
    assert_eq!(aggregates.len(), 1);

    let count = &aggregates[0];
    assert_eq!(count.function, AggregateFunction::Count);
    assert_eq!(count.alias, "cnt");

    assert!(window.is_none());
    assert!(having.is_none());
}

/// Chaining several aggregate builders should append one aggregate per call, in call
/// order, each carrying the requested function and alias.
#[test]
fn build_aggregate_with_multiple_functions() {
    let plan = QueryBuilder::from_source("readings")
        .group_by(&[col("sensor_id")])
        .count_star("cnt")
        .sum(col("temp"), "total_temp")
        .avg(col("temp"), "avg_temp")
        .min(col("temp"), "min_temp")
        .max(col("temp"), "max_temp")
        .build();

    // (function, alias) pairs in the order the builder calls were made above.
    let expected = [
        (AggregateFunction::Count, "cnt"),
        (AggregateFunction::Sum, "total_temp"),
        (AggregateFunction::Avg, "avg_temp"),
        (AggregateFunction::Min, "min_temp"),
        (AggregateFunction::Max, "max_temp"),
    ];

    match &plan {
        QueryPlan::Aggregate { aggregates, .. } => {
            assert_eq!(aggregates.len(), expected.len());
            for (i, (agg, (function, alias))) in aggregates.iter().zip(&expected).enumerate() {
                // Checking the alias as well catches a builder that records the right
                // function under the wrong output name.
                assert_eq!(&agg.function, function, "function of aggregate #{i}");
                assert_eq!(agg.alias, *alias, "alias of aggregate #{i}");
            }
        }
        _ => panic!("Expected Aggregate plan"),
    }
}

/// `tumbling(size)` should attach a `Tumbling` window of that size to the aggregate
/// and leave `grace` unset when no grace period was requested.
#[test]
fn build_tumbling_window() {
    let plan = QueryBuilder::from_source("readings")
        .group_by(&[col("sensor_id")])
        .tumbling(Duration::from_secs(300))
        .count_star("cnt")
        .build();

    let QueryPlan::Aggregate { window, .. } = &plan else {
        panic!("Expected Aggregate plan");
    };

    // Match the Option directly instead of `unwrap()`, so a missing window fails with
    // a descriptive message rather than a generic `None` unwrap panic.
    match window {
        Some(WindowSpec::Tumbling { size, grace }) => {
            assert_eq!(*size, Duration::from_secs(300));
            assert!(grace.is_none());
        }
        Some(_) => panic!("Expected Tumbling window"),
        None => panic!("Expected a window on the Aggregate plan"),
    }
}

/// `hopping(size, advance)` should attach a `Hopping` window with exactly that size
/// and advance interval to the aggregate.
#[test]
fn build_hopping_window() {
    let plan = QueryBuilder::from_source("readings")
        .group_by(&[col("sensor_id")])
        .hopping(Duration::from_secs(300), Duration::from_secs(60))
        .count_star("cnt")
        .build();

    let QueryPlan::Aggregate { window, .. } = &plan else {
        panic!("Expected Aggregate plan");
    };

    // Match the Option directly instead of `unwrap()`, so a missing window fails with
    // a descriptive message rather than a generic `None` unwrap panic.
    match window {
        Some(WindowSpec::Hopping { size, advance, .. }) => {
            assert_eq!(*size, Duration::from_secs(300));
            assert_eq!(*advance, Duration::from_secs(60));
        }
        Some(_) => panic!("Expected Hopping window"),
        None => panic!("Expected a window on the Aggregate plan"),
    }
}

/// Chaining `having` after an aggregate should populate the `Aggregate` node's
/// HAVING predicate.
#[test]
fn build_having() {
    let threshold = col("cnt").gt(lit_i64(10));
    let plan = QueryBuilder::from_source("readings")
        .group_by(&[col("sensor_id")])
        .count_star("cnt")
        .having(threshold)
        .build();

    if let QueryPlan::Aggregate { having, .. } = &plan {
        assert!(having.is_some());
    } else {
        panic!("Expected Aggregate plan");
    }
}

/// `as_table` on an aggregate should make a `Sink` of type `Table` the plan root,
/// recording the given name and topic (mirrors `build_stream_sink`).
#[test]
fn build_aggregate_table_sink() {
    let plan = QueryBuilder::from_source("readings")
        .group_by(&[col("sensor_id")])
        .count_star("cnt")
        .as_table("sensor_counts", "sensor-counts-topic")
        .build();

    match &plan {
        QueryPlan::Sink {
            name,
            topic,
            sink_type,
            ..
        } => {
            // Verify name/topic too, not just the sink kind, so a table sink that drops
            // or swaps its destination is caught.
            assert_eq!(name, "sensor_counts");
            assert_eq!(topic, "sensor-counts-topic");
            assert_eq!(*sink_type, SinkType::Table);
        }
        _ => panic!("Expected Sink plan"),
    }
}

/// `join` should produce a `Join` node with the requested join type and, absent a
/// `within` call, no time bound.
#[test]
fn build_join() {
    let on = col("orders.customer_id").eq_expr(col("customers.id"));
    let plan = QueryBuilder::from_source("orders")
        .join("customers", JoinType::Left, on)
        .build();

    let QueryPlan::Join {
        join_type, within, ..
    } = &plan
    else {
        panic!("Expected Join plan");
    };

    assert_eq!(join_type, &JoinType::Left);
    assert_eq!(*within, None);
}

/// `within(d)` after `join` should bound the `Join` node to `d`, while keeping the
/// join type that was requested.
#[test]
fn build_join_within() {
    let plan = QueryBuilder::from_source("clicks")
        .join(
            "impressions",
            JoinType::Inner,
            col("clicks.ad_id").eq_expr(col("impressions.ad_id")),
        )
        .within(Duration::from_secs(3600))
        .build();

    match &plan {
        QueryPlan::Join {
            join_type, within, ..
        } => {
            // Ensure applying `within` does not clobber the join type chosen in `join`.
            assert_eq!(*join_type, JoinType::Inner);
            assert_eq!(*within, Some(Duration::from_secs(3600)));
        }
        _ => panic!("Expected Join plan"),
    }
}

/// `source_names` should list the single scanned source for a linear plan, and both
/// sides (exactly once each) for a join.
#[test]
fn plan_source_names() {
    let single = QueryBuilder::from_source("readings")
        .filter(col("temp").gt(lit_f64(100.0)))
        .select(&[col("sensor_id")])
        .build();
    assert_eq!(single.source_names(), ["readings"]);

    let joined = QueryBuilder::from_source("orders")
        .join("customers", JoinType::Inner, col("id").eq_expr(col("id")))
        .build();

    // Order of the two join sides is not part of the contract, so compare sorted.
    let mut sources = joined.source_names();
    sources.sort();
    assert_eq!(sources, ["customers", "orders"]);
}