use std::thread;
use std::time::Duration;
use merkql::broker::{Broker, BrokerConfig};
use merkql::consumer::{ConsumerConfig, OffsetReset};
use merkql::record::ProducerRecord;
use merksql::builder::*;
use merksql::runtime::persistent::{PersistentQuery, QueryStatus};
use merksql::runtime::registry::QueryRegistry;
use merksql::schema::SchemaRegistry;
use merksql::types::*;
use merksql::{ExecuteResult, MerkSql};
fn setup_broker(dir: &tempfile::TempDir) -> merkql::broker::BrokerRef {
Broker::open(BrokerConfig::new(dir.path())).unwrap()
}
fn create_registry(input_topic: &str) -> SchemaRegistry {
let mut registry = SchemaRegistry::new();
registry
.register_stream(
"readings",
Schema::new(vec![
Column::new("sensor_id", DataType::String),
Column::new("temp", DataType::Double),
Column::new("humidity", DataType::Integer),
]),
input_topic,
)
.unwrap();
registry
}
fn read_output_topic(broker: &merkql::broker::BrokerRef, topic: &str) -> Vec<String> {
let mut consumer = Broker::consumer(
broker,
ConsumerConfig {
group_id: format!("test-reader-{}", uuid::Uuid::new_v4()),
auto_commit: false,
offset_reset: OffsetReset::Earliest,
},
);
consumer.subscribe(&[topic]).unwrap();
let records = consumer.poll(Duration::from_millis(100)).unwrap();
consumer.close().unwrap();
records.into_iter().map(|r| r.value).collect()
}
#[test]
fn persistent_query_processes_records() {
let dir = tempfile::tempdir().unwrap();
let broker = setup_broker(&dir);
let input_topic = "persistent-input";
let output_topic = "persistent-output";
let producer = Broker::producer(&broker);
producer
.send(&ProducerRecord::new(
input_topic,
None,
r#"{"sensor_id": "s1", "temp": 22.5, "humidity": 45}"#,
))
.unwrap();
producer
.send(&ProducerRecord::new(
input_topic,
None,
r#"{"sensor_id": "s2", "temp": 105.3, "humidity": 80}"#,
))
.unwrap();
producer
.send(&ProducerRecord::new(
input_topic,
None,
r#"{"sensor_id": "s3", "temp": 110.0, "humidity": 90}"#,
))
.unwrap();
let registry = create_registry(input_topic);
let plan = QueryBuilder::from_source("readings")
.filter(col("temp").gt(lit_f64(100.0)))
.select(&[col("sensor_id"), col("temp")])
.as_stream("high_temps", output_topic)
.build();
let mut query = PersistentQuery::start(
"test-q1".to_string(),
plan,
output_topic.to_string(),
broker.clone(),
®istry,
)
.unwrap();
assert_eq!(query.status(), QueryStatus::Running);
thread::sleep(Duration::from_millis(500));
query.stop();
assert_ne!(query.status(), QueryStatus::Running);
let output = read_output_topic(&broker, output_topic);
assert_eq!(output.len(), 2);
}
#[test]
fn persistent_query_terminate() {
let dir = tempfile::tempdir().unwrap();
let broker = setup_broker(&dir);
let input_topic = "term-input";
let output_topic = "term-output";
let registry = create_registry(input_topic);
broker.ensure_topic(input_topic).unwrap();
let plan = QueryBuilder::from_source("readings")
.as_stream("out", output_topic)
.build();
let mut query = PersistentQuery::start(
"test-q2".to_string(),
plan,
output_topic.to_string(),
broker.clone(),
®istry,
)
.unwrap();
assert_eq!(query.status(), QueryStatus::Running);
query.terminate();
assert_eq!(query.status(), QueryStatus::Terminated);
}
#[test]
fn query_registry_lifecycle() {
let dir = tempfile::tempdir().unwrap();
let broker = setup_broker(&dir);
let input_topic = "reg-input";
let output_topic = "reg-output";
let registry = create_registry(input_topic);
broker.ensure_topic(input_topic).unwrap();
let mut query_reg = QueryRegistry::new();
let plan = QueryBuilder::from_source("readings")
.as_stream("out", output_topic)
.build();
let id = query_reg
.start_query(plan, output_topic.to_string(), &broker, ®istry)
.unwrap();
assert_eq!(id, "q1");
assert_eq!(query_reg.len(), 1);
let status = query_reg.status(&id).unwrap();
assert_eq!(status, QueryStatus::Running);
let listing = query_reg.list();
assert_eq!(listing.len(), 1);
assert_eq!(listing[0].0, "q1");
query_reg.stop(&id).unwrap();
let status = query_reg.status(&id).unwrap();
assert_ne!(status, QueryStatus::Running);
assert!(query_reg.status("nonexistent").is_none());
assert!(query_reg.stop("nonexistent").is_err());
}
#[test]
fn query_registry_multiple_queries() {
let dir = tempfile::tempdir().unwrap();
let broker = setup_broker(&dir);
let input_topic = "multi-input";
let registry = create_registry(input_topic);
broker.ensure_topic(input_topic).unwrap();
let mut query_reg = QueryRegistry::new();
let plan1 = QueryBuilder::from_source("readings")
.as_stream("out1", "output1")
.build();
let plan2 = QueryBuilder::from_source("readings")
.as_stream("out2", "output2")
.build();
let id1 = query_reg
.start_query(plan1, "output1".to_string(), &broker, ®istry)
.unwrap();
let id2 = query_reg
.start_query(plan2, "output2".to_string(), &broker, ®istry)
.unwrap();
assert_eq!(id1, "q1");
assert_eq!(id2, "q2");
assert_eq!(query_reg.len(), 2);
query_reg.stop_all();
assert_ne!(query_reg.status(&id1).unwrap(), QueryStatus::Running);
assert_ne!(query_reg.status(&id2).unwrap(), QueryStatus::Running);
}
#[test]
fn merksql_execute_ddl() {
let dir = tempfile::tempdir().unwrap();
let broker = setup_broker(&dir);
let mut engine = MerkSql::new(broker);
let result = engine.execute(
"CREATE TABLE readings (sensor_id VARCHAR, temp DOUBLE) WITH (KAFKA_TOPIC='readings-topic')"
).unwrap();
match result {
ExecuteResult::SourceCreated { name } => {
assert_eq!(name, "readings");
}
_ => panic!("Expected SourceCreated"),
}
assert!(engine.schemas.get("readings").is_some());
}
#[test]
fn merksql_execute_pull_query() {
let dir = tempfile::tempdir().unwrap();
let broker = setup_broker(&dir);
let topic = "merksql-pull";
let producer = Broker::producer(&broker);
producer
.send(&ProducerRecord::new(
topic,
None,
r#"{"sensor_id": "s1", "temp": 22.5}"#,
))
.unwrap();
producer
.send(&ProducerRecord::new(
topic,
None,
r#"{"sensor_id": "s2", "temp": 105.0}"#,
))
.unwrap();
let mut engine = MerkSql::new(broker);
engine
.execute(&format!(
"CREATE TABLE readings (sensor_id VARCHAR, temp DOUBLE) WITH (KAFKA_TOPIC='{}')",
topic
))
.unwrap();
let result = engine
.execute("SELECT sensor_id, temp FROM readings WHERE temp > 100.0")
.unwrap();
match result {
ExecuteResult::Rows { rows, schema } => {
assert_eq!(rows.len(), 1);
assert_eq!(rows[0].get(0), &Value::String("s2".to_string()));
assert_eq!(schema.len(), 2);
}
_ => panic!("Expected Rows"),
}
}
#[test]
fn merksql_execute_persistent_query() {
let dir = tempfile::tempdir().unwrap();
let broker = setup_broker(&dir);
let input_topic = "merksql-input";
let output_topic = "merksql-output";
let producer = Broker::producer(&broker);
producer
.send(&ProducerRecord::new(
input_topic,
None,
r#"{"sensor_id": "s1", "temp": 150.0}"#,
))
.unwrap();
let mut engine = MerkSql::new(broker.clone());
engine
.execute(&format!(
"CREATE TABLE readings (sensor_id VARCHAR, temp DOUBLE) WITH (KAFKA_TOPIC='{}')",
input_topic
))
.unwrap();
let result = engine.execute(
&format!("CREATE TABLE high_temps WITH (KAFKA_TOPIC='{}') AS SELECT sensor_id, temp FROM readings WHERE temp > 100.0", output_topic)
).unwrap();
match result {
ExecuteResult::QueryStarted { id } => {
assert_eq!(id, "q1");
thread::sleep(Duration::from_millis(500));
let status = engine.queries.status(&id).unwrap();
assert_eq!(status, QueryStatus::Running);
engine.queries.stop(&id).unwrap();
}
_ => panic!("Expected QueryStarted"),
}
let output = read_output_topic(&broker, output_topic);
assert_eq!(output.len(), 1);
}
#[test]
fn merksql_builder_api() {
let dir = tempfile::tempdir().unwrap();
let broker = setup_broker(&dir);
let topic = "builder-pull";
let producer = Broker::producer(&broker);
producer
.send(&ProducerRecord::new(topic, None, r#"{"x": 10, "y": 20}"#))
.unwrap();
producer
.send(&ProducerRecord::new(topic, None, r#"{"x": 30, "y": 40}"#))
.unwrap();
let mut engine = MerkSql::new(broker);
engine
.schemas
.register_stream(
"data",
Schema::new(vec![
Column::new("x", DataType::Integer),
Column::new("y", DataType::Integer),
]),
topic,
)
.unwrap();
let plan = QueryBuilder::from_source("data")
.filter(col("x").gt(lit_i64(15)))
.select(&[col("x"), col("y")])
.build();
let result = engine.query(plan).unwrap();
match result {
ExecuteResult::Rows { rows, .. } => {
assert_eq!(rows.len(), 1);
assert_eq!(rows[0].get(0), &Value::Integer(30));
assert_eq!(rows[0].get(1), &Value::Integer(40));
}
_ => panic!("Expected Rows"),
}
}