use std::time::Duration;
use chrono::{DateTime, Utc};
use merkql::broker::{Broker, BrokerConfig};
use merkql::record::ProducerRecord;
use merksql::builder::*;
use merksql::engine::pipeline;
use merksql::schema::SchemaRegistry;
use merksql::types::*;
/// Opens a broker rooted at the given temporary directory.
///
/// Panics if the broker cannot be opened.
fn setup_broker(dir: &tempfile::TempDir) -> merkql::broker::BrokerRef {
    let config = BrokerConfig::new(dir.path());
    Broker::open(config).unwrap()
}
/// Builds a registry containing one stream, `events`, bound to `topic`.
///
/// The stream schema is `(sensor_id: String, value: Double)`.
/// Panics if registration fails.
fn create_registry(topic: &str) -> SchemaRegistry {
    let mut registry = SchemaRegistry::new();
    let events_schema = Schema::new(vec![
        Column::new("sensor_id", DataType::String),
        Column::new("value", DataType::Double),
    ]);
    registry
        .register_stream("events", events_schema, topic)
        .unwrap();
    registry
}
/// Converts `(sensor_id, value, timestamp_millis)` triples into input rows.
///
/// Each row holds a single `Value::String` column containing a JSON object
/// `{"sensor_id": ..., "value": ...}`. The row metadata timestamp is set from
/// the millisecond value; all other metadata fields take their defaults.
///
/// Note: `sensor_id` is interpolated verbatim and is not JSON-escaped, so
/// callers should pass plain identifiers.
fn make_timestamped_rows(data: &[(&str, f64, i64)]) -> Vec<Row> {
    let mut rows = Vec::with_capacity(data.len());
    for &(sensor_id, value, ts_millis) in data {
        let payload = format!(r#"{{"sensor_id": "{}", "value": {}}}"#, sensor_id, value);
        let metadata = RowMetadata {
            timestamp: DateTime::from_timestamp_millis(ts_millis),
            ..Default::default()
        };
        rows.push(Row::with_metadata(vec![Value::String(payload)], metadata));
    }
    rows
}
/// One 10-second tumbling window grouped by `sensor_id`.
///
/// All four events (1s, 3s, 5s, 7s) fall into the `[0, 10000)` ms window.
/// The test expects one row per sensor with its event count and the window
/// bounds in the row metadata.
#[test]
fn tumbling_window_basic() {
    let dir = tempfile::tempdir().unwrap();
    // Bound to a named variable so it lives until the end of the test; not passed to the pipeline.
    let _broker = setup_broker(&dir);
    let registry = create_registry("test-topic");
    let plan = QueryBuilder::from_source("events")
        .group_by(&[col("sensor_id")])
        .tumbling(Duration::from_secs(10))
        .count_star("cnt")
        .build();
    let mut pipeline = pipeline::compile(&plan, &registry).unwrap();
    let rows = make_timestamped_rows(&[
        ("s1", 10.0, 1000),
        ("s1", 20.0, 3000),
        ("s2", 30.0, 5000),
        ("s1", 40.0, 7000),
    ]);
    let result = pipeline.process(rows).unwrap();
    assert_eq!(result.len(), 2);

    // Output column 0 is the group key, column 1 is the count.
    let mut results: Vec<(String, i64)> = result
        .iter()
        .map(|r| {
            let sid = r.get(0).as_str().unwrap().to_string();
            let cnt = r.get(1).as_i64().unwrap();
            (sid, cnt)
        })
        .collect();
    results.sort();
    assert_eq!(results[0], ("s1".to_string(), 3));
    assert_eq!(results[1], ("s2".to_string(), 1));

    for row in &result {
        assert!(row.metadata.window_start.is_some());
        assert!(row.metadata.window_end.is_some());
        let start = row.metadata.window_start.unwrap().timestamp_millis();
        let end = row.metadata.window_end.unwrap().timestamp_millis();
        assert_eq!(start, 0);
        assert_eq!(end, 10000);
    }
}
/// A single sensor whose events span two consecutive 10-second tumbling windows.
///
/// The events at 2s and 5s fall in `[0, 10000)`, giving count 2 and sum 30.
/// The events at 12s, 15s and 18s fall in `[10000, 20000)`, giving count 3
/// and sum 120.
#[test]
fn tumbling_window_multiple_windows() {
    let dir = tempfile::tempdir().unwrap();
    let _broker = setup_broker(&dir);
    let registry = create_registry("test-topic");
    let plan = QueryBuilder::from_source("events")
        .group_by(&[col("sensor_id")])
        .tumbling(Duration::from_secs(10))
        .count_star("cnt")
        .sum(col("value"), "total")
        .build();
    let mut pipeline = pipeline::compile(&plan, &registry).unwrap();
    let rows = make_timestamped_rows(&[
        ("s1", 10.0, 2000),
        ("s1", 20.0, 5000),
        ("s1", 30.0, 12000),
        ("s1", 40.0, 15000),
        ("s1", 50.0, 18000),
    ]);
    let result = pipeline.process(rows).unwrap();
    assert_eq!(result.len(), 2);

    // (window_start_ms, count, sum). Columns 1 and 2 are the aggregates in plan order.
    let mut results: Vec<(i64, i64, f64)> = result
        .iter()
        .map(|r| {
            let start = r.metadata.window_start.unwrap().timestamp_millis();
            let cnt = r.get(1).as_i64().unwrap();
            let total = r.get(2).as_f64().unwrap();
            (start, cnt, total)
        })
        .collect();
    results.sort_by_key(|r| r.0);

    assert_eq!(results[0].0, 0);
    assert_eq!(results[0].1, 2);
    assert!((results[0].2 - 30.0).abs() < 0.01);
    assert_eq!(results[1].0, 10000);
    assert_eq!(results[1].1, 3);
    assert!((results[1].2 - 120.0).abs() < 0.01);
}
/// A single event in a 10-second hopping window that advances every 5 seconds.
///
/// The event at 7s is covered by both `[0, 10000)` and `[5000, 15000)`, so it
/// should be counted once in each of the two windows.
#[test]
fn hopping_window_overlapping() {
    let dir = tempfile::tempdir().unwrap();
    let _broker = setup_broker(&dir);
    let registry = create_registry("test-topic");
    let plan = QueryBuilder::from_source("events")
        .group_by(&[col("sensor_id")])
        .hopping(Duration::from_secs(10), Duration::from_secs(5))
        .count_star("cnt")
        .build();
    let mut pipeline = pipeline::compile(&plan, &registry).unwrap();
    let rows = make_timestamped_rows(&[("s1", 10.0, 7000)]);
    let result = pipeline.process(rows).unwrap();
    assert_eq!(result.len(), 2);

    let mut window_starts: Vec<i64> = result
        .iter()
        .map(|r| r.metadata.window_start.unwrap().timestamp_millis())
        .collect();
    window_starts.sort();
    assert_eq!(window_starts, vec![0, 5000]);

    for row in &result {
        assert_eq!(row.get(1).as_i64().unwrap(), 1);
    }
}
/// Sums across overlapping hopping windows (size 10s, advance 5s).
///
/// Window membership for the three events:
/// - 52s is in `[45s, 55s)` and `[50s, 60s)`.
/// - 57s is in `[50s, 60s)` and `[55s, 65s)`.
/// - 62s is in `[55s, 65s)` and `[60s, 70s)`.
///
/// That yields exactly four windows with sums 10, 30, 50 and 30.
#[test]
fn hopping_window_accumulation() {
    let dir = tempfile::tempdir().unwrap();
    let _broker = setup_broker(&dir);
    let registry = create_registry("test-topic");
    let plan = QueryBuilder::from_source("events")
        .group_by(&[col("sensor_id")])
        .hopping(Duration::from_secs(10), Duration::from_secs(5))
        .sum(col("value"), "total")
        .build();
    let mut pipeline = pipeline::compile(&plan, &registry).unwrap();
    let rows = make_timestamped_rows(&[
        ("s1", 10.0, 52000),
        ("s1", 20.0, 57000),
        ("s1", 30.0, 62000),
    ]);
    let result = pipeline.process(rows).unwrap();

    let mut results: Vec<(i64, f64)> = result
        .iter()
        .map(|r| {
            let start = r.metadata.window_start.unwrap().timestamp_millis();
            let total = r.get(1).as_f64().unwrap();
            (start, total)
        })
        .collect();
    results.sort_by_key(|r| r.0);

    // Pin the window count so spurious extra windows fail the test instead of
    // being silently ignored by the index-based checks below.
    assert_eq!(results.len(), 4);
    assert_eq!(results[0].0, 45000);
    assert!((results[0].1 - 10.0).abs() < 0.01);
    assert_eq!(results[1].0, 50000);
    assert!((results[1].1 - 30.0).abs() < 0.01);
    assert_eq!(results[2].0, 55000);
    assert!((results[2].1 - 50.0).abs() < 0.01);
    assert_eq!(results[3].0, 60000);
    assert!((results[3].1 - 30.0).abs() < 0.01);
}
/// Three sensors whose events all fall in one 10-second tumbling window.
///
/// The test expects one output row per sensor, with counts 2, 2 and 1.
#[test]
fn tumbling_window_multiple_groups() {
    let dir = tempfile::tempdir().unwrap();
    let _broker = setup_broker(&dir);
    let registry = create_registry("test-topic");
    let plan = QueryBuilder::from_source("events")
        .group_by(&[col("sensor_id")])
        .tumbling(Duration::from_secs(10))
        .count_star("cnt")
        .build();
    let mut pipeline = pipeline::compile(&plan, &registry).unwrap();
    let rows = make_timestamped_rows(&[
        ("s1", 10.0, 1000),
        ("s2", 20.0, 2000),
        ("s1", 30.0, 3000),
        ("s2", 40.0, 4000),
        ("s3", 50.0, 5000),
    ]);
    let result = pipeline.process(rows).unwrap();
    assert_eq!(result.len(), 3);

    let mut results: Vec<(String, i64)> = result
        .iter()
        .map(|r| {
            let sid = r.get(0).as_str().unwrap().to_string();
            let cnt = r.get(1).as_i64().unwrap();
            (sid, cnt)
        })
        .collect();
    results.sort();
    assert_eq!(results[0], ("s1".to_string(), 2));
    assert_eq!(results[1], ("s2".to_string(), 2));
    assert_eq!(results[2], ("s3".to_string(), 1));
}
/// A `value > 15.0` filter applied before a tumbling-window count.
///
/// For s1, 20 and 30 survive the filter (count 2). For s2, only 25 survives
/// (count 1). The value 10 from s1 and 5 from s2 are dropped.
#[test]
fn tumbling_window_with_filter() {
    let dir = tempfile::tempdir().unwrap();
    let _broker = setup_broker(&dir);
    let registry = create_registry("test-topic");
    let plan = QueryBuilder::from_source("events")
        .filter(col("value").gt(lit_f64(15.0)))
        .group_by(&[col("sensor_id")])
        .tumbling(Duration::from_secs(10))
        .count_star("cnt")
        .build();
    let mut pipeline = pipeline::compile(&plan, &registry).unwrap();
    let rows = make_timestamped_rows(&[
        ("s1", 10.0, 1000),
        ("s1", 20.0, 3000),
        ("s1", 30.0, 5000),
        ("s2", 5.0, 2000),
        ("s2", 25.0, 4000),
    ]);
    let result = pipeline.process(rows).unwrap();
    // Without this, extra output rows would go unnoticed by the indexed asserts.
    assert_eq!(result.len(), 2);

    let mut results: Vec<(String, i64)> = result
        .iter()
        .map(|r| {
            let sid = r.get(0).as_str().unwrap().to_string();
            let cnt = r.get(1).as_i64().unwrap();
            (sid, cnt)
        })
        .collect();
    results.sort();
    assert_eq!(results[0], ("s1".to_string(), 2));
    assert_eq!(results[1], ("s2".to_string(), 1));
}
/// Checks that `tumbling_with_grace` records the window size and grace period
/// in the built plan. This test only builds the plan; it does not run it.
#[test]
fn tumbling_window_builder_with_grace() {
    let plan = QueryBuilder::from_source("events")
        .group_by(&[col("sensor_id")])
        .tumbling_with_grace(Duration::from_secs(10), Duration::from_secs(2))
        .count_star("cnt")
        .build();

    let window_spec = if let merksql::plan::QueryPlan::Aggregate { window, .. } = &plan {
        window.as_ref().unwrap()
    } else {
        panic!("Expected Aggregate plan");
    };

    if let merksql::plan::WindowSpec::Tumbling { size, grace } = window_spec {
        assert_eq!(*size, Duration::from_secs(10));
        assert_eq!(*grace, Some(Duration::from_secs(2)));
    } else {
        panic!("Expected Tumbling window");
    }
}
/// Session window with a 5-second inactivity gap over three closely spaced events.
///
/// The assertions only check that output is produced for `s1` and that each
/// row's count is at least 1.
#[test]
fn session_window_basic() {
    let dir = tempfile::tempdir().unwrap();
    let _broker = setup_broker(&dir);
    let registry = create_registry("test-topic");
    let plan = QueryBuilder::from_source("events")
        .group_by(&[col("sensor_id")])
        .session(Duration::from_secs(5))
        .count_star("cnt")
        .build();
    let mut pipeline = pipeline::compile(&plan, &registry).unwrap();
    let rows = make_timestamped_rows(&[
        ("s1", 10.0, 1000),
        ("s1", 20.0, 2000),
        ("s1", 30.0, 3000),
    ]);
    let result = pipeline.process(rows).unwrap();

    // NOTE(review): all three events are within the 5s gap, so a single session
    // with count 3 might be expected. These assertions are looser than that,
    // possibly because the engine emits incremental session updates. Confirm
    // the engine's session-emission semantics before tightening.
    assert!(!result.is_empty());
    for row in &result {
        assert_eq!(row.get(0), &Value::String("s1".to_string()));
        assert!(row.get(1).as_i64().unwrap() >= 1);
    }
}
/// Runs sum, avg, min and max together over one tumbling window.
///
/// For values 10, 20 and 30 the expected results are sum 60, average 20,
/// minimum 10 and maximum 30.
#[test]
fn windowed_avg_and_sum() {
    let dir = tempfile::tempdir().unwrap();
    let _broker = setup_broker(&dir);
    let registry = create_registry("test-topic");
    let plan = QueryBuilder::from_source("events")
        .group_by(&[col("sensor_id")])
        .tumbling(Duration::from_secs(10))
        .sum(col("value"), "total")
        .avg(col("value"), "average")
        .min(col("value"), "minimum")
        .max(col("value"), "maximum")
        .build();
    let mut pipeline = pipeline::compile(&plan, &registry).unwrap();
    let rows = make_timestamped_rows(&[
        ("s1", 10.0, 1000),
        ("s1", 20.0, 3000),
        ("s1", 30.0, 5000),
    ]);
    let result = pipeline.process(rows).unwrap();
    assert_eq!(result.len(), 1);

    // Columns: 0 = sensor_id, then the aggregates in plan order
    // (1 = total, 2 = average, 3 = minimum, 4 = maximum).
    let row = &result[0];
    assert_eq!(row.get(0), &Value::String("s1".to_string()));
    assert!((row.get(1).as_f64().unwrap() - 60.0).abs() < 0.01);
    assert!((row.get(2).as_f64().unwrap() - 20.0).abs() < 0.01);
    assert!((row.get(3).as_f64().unwrap() - 10.0).abs() < 0.01);
    assert!((row.get(4).as_f64().unwrap() - 30.0).abs() < 0.01);
}