use serde::Serialize;
use streamling_e2e::{init_tracing, PipelineOpts, TestContext, TestContextOptions};
/// One synthetic row produced to the Kafka test topic.
///
/// Fields correspond one-to-one with the Avro fields declared in `TEST_SCHEMA`
/// (`block` ↔ `long`, `id` ↔ `string`, `data` ↔ `string`). The struct is handed
/// to `produce_avro_records`, which presumably serializes it through `Serialize`
/// against the registered schema — keep the two definitions in sync.
#[derive(Debug, Clone, Serialize)]
struct TestRecord {
    block: i64,
    id: String,
    data: String,
}
/// Avro record schema passed to `register_schema` at the start of every test.
///
/// Declares the three fields of `TestRecord`: `block` (`long`, i.e. `i64`),
/// `id` (`string`) and `data` (`string`). Changing a field here requires the
/// matching change on `TestRecord`.
const TEST_SCHEMA: &str = r#"{
"type": "record",
"name": "TestMessage",
"fields": [
{"name": "block", "type": "long"},
{"name": "id", "type": "string"},
{"name": "data", "type": "string"}
]
}"#;
/// Builds `count` synthetic records numbered `1..=count`.
///
/// Record number `n` has `block = n`, `id = "id_n"` and `data = "datan"`
/// (note: no separator between `data` and the number). Returns an empty
/// vector when `count` is zero.
fn create_test_records(count: usize) -> Vec<TestRecord> {
    let mut batch = Vec::with_capacity(count);
    for n in 1..=count {
        batch.push(TestRecord {
            block: n as i64,
            id: format!("id_{}", n),
            data: format!("data{}", n),
        });
    }
    batch
}
/// A SQL transform that does not select `_gs_op` must still expose it in its
/// output schema, and must pass every input row through.
#[tokio::test]
async fn test_sql_transform_propagates_gs_op_when_missing() {
    init_tracing();

    let context = TestContext::new()
        .await
        .expect("Failed to create test context");

    context
        .kafka
        .register_schema(TEST_SCHEMA)
        .await
        .expect("Failed to register schema");

    let fixtures = create_test_records(10);
    context
        .kafka
        .produce_avro_records(&fixtures)
        .await
        .expect("Failed to produce records");

    // The SELECT list intentionally omits `_gs_op`.
    let pipeline_yaml = format!(
        r#"
sources:
  kafka_source:
    type: kafka
    topic: {topic}
    primary_key: id
transforms:
  sql_transform:
    type: sql
    sql: "SELECT id, data FROM kafka_source"
    primary_key: id
sinks:
  print_sink:
    type: print
    from: sql_transform
    sample_every: 1
"#,
        topic = context.kafka_topic
    );

    let run_opts = PipelineOpts::new().record_limit(10);
    let captured = context
        .run_pipeline_with_capture(&pipeline_yaml, run_opts)
        .await
        .expect("Pipeline should complete successfully");

    assert!(
        captured.has_column("_gs_op"),
        "_gs_op should be present in output schema even when not explicitly selected. Got columns: {:?}",
        captured.column_names()
    );
    assert_eq!(captured.len(), 10, "Should have processed 10 records");
}
/// A SQL transform that explicitly selects `_gs_op` must not get a second,
/// auto-propagated copy of the column.
///
/// Also checks that all 10 produced rows come out the other side, matching
/// the row-count check in the sibling "propagates" test. Without it, a
/// transform that dropped rows would go unnoticed.
#[tokio::test]
async fn test_sql_transform_preserves_existing_gs_op() {
    init_tracing();
    let ctx = TestContext::new()
        .await
        .expect("Failed to create test context");
    ctx.kafka
        .register_schema(TEST_SCHEMA)
        .await
        .expect("Failed to register schema");
    let records = create_test_records(10);
    ctx.kafka
        .produce_avro_records(&records)
        .await
        .expect("Failed to produce records");
    let pipeline = format!(
        r#"
sources:
  kafka_source:
    type: kafka
    topic: {topic}
    primary_key: id
transforms:
  sql_transform:
    type: sql
    sql: "SELECT id, data, _gs_op FROM kafka_source"
    primary_key: id
sinks:
  print_sink:
    type: print
    from: sql_transform
    sample_every: 1
"#,
        topic = ctx.kafka_topic
    );
    let output = ctx
        .run_pipeline_with_capture(&pipeline, PipelineOpts::new().record_limit(10))
        .await
        .expect("Pipeline should complete successfully");

    // `_gs_op` was selected explicitly, so propagation must not duplicate it.
    let columns = output.column_names();
    let gs_op_count = columns.iter().filter(|c| *c == "_gs_op").count();
    assert_eq!(
        gs_op_count, 1,
        "_gs_op should appear exactly once in schema"
    );
    assert_eq!(output.len(), 10, "Should have processed 10 records");
}
/// A `UNION ALL` whose branches both omit `_gs_op` must still carry `_gs_op`
/// in the output schema and emit every row from both branches.
#[tokio::test]
async fn test_sql_union_propagates_gs_op_when_missing() {
    init_tracing();

    let context = TestContext::new()
        .await
        .expect("Failed to create test context");

    context
        .kafka
        .register_schema(TEST_SCHEMA)
        .await
        .expect("Failed to register schema");

    let fixtures = create_test_records(10);
    context
        .kafka
        .produce_avro_records(&fixtures)
        .await
        .expect("Failed to produce records");

    // Each source row appears once per UNION branch, hence 2 x 10 rows.
    let pipeline_yaml = format!(
        r#"
sources:
  kafka_source:
    type: kafka
    topic: {topic}
    primary_key: id
transforms:
  sql_transform:
    type: sql
    sql: "SELECT id, data FROM kafka_source UNION ALL SELECT id, data FROM kafka_source"
    primary_key: id
sinks:
  print_sink:
    type: print
    from: sql_transform
    sample_every: 1
"#,
        topic = context.kafka_topic
    );

    let run_opts = PipelineOpts::new().record_limit(20);
    let captured = context
        .run_pipeline_with_capture(&pipeline_yaml, run_opts)
        .await
        .expect("Pipeline should complete successfully");

    assert!(
        captured.has_column("_gs_op"),
        "_gs_op should be present in output schema for UNION"
    );
    assert_eq!(captured.len(), 20, "UNION ALL should produce 20 records");
}
/// A `UNION ALL` whose branches both select `_gs_op` must expose exactly one
/// `_gs_op` column.
///
/// Also checks that both branches contribute their 10 rows (20 total),
/// matching the row-count check in the sibling "propagates" UNION test.
#[tokio::test]
async fn test_sql_union_preserves_existing_gs_op() {
    init_tracing();
    let ctx = TestContext::new()
        .await
        .expect("Failed to create test context");
    ctx.kafka
        .register_schema(TEST_SCHEMA)
        .await
        .expect("Failed to register schema");
    let records = create_test_records(10);
    ctx.kafka
        .produce_avro_records(&records)
        .await
        .expect("Failed to produce records");
    let pipeline = format!(
        r#"
sources:
  kafka_source:
    type: kafka
    topic: {topic}
    primary_key: id
transforms:
  sql_transform:
    type: sql
    sql: "SELECT id, data, _gs_op FROM kafka_source UNION ALL SELECT id, data, _gs_op FROM kafka_source"
    primary_key: id
sinks:
  print_sink:
    type: print
    from: sql_transform
    sample_every: 1
"#,
        topic = ctx.kafka_topic
    );
    let output = ctx
        .run_pipeline_with_capture(&pipeline, PipelineOpts::new().record_limit(20))
        .await
        .expect("Pipeline should complete successfully");

    // Both branches select `_gs_op`; propagation must not add another copy.
    let columns = output.column_names();
    let gs_op_count = columns.iter().filter(|c| *c == "_gs_op").count();
    assert_eq!(
        gs_op_count, 1,
        "_gs_op should appear exactly once in schema for UNION"
    );
    assert_eq!(output.len(), 20, "UNION ALL should produce 20 records");
}
/// Exercises Flink-style SQL string functions (`charLength`, `TRANSLATE`,
/// `REGEXP`) in a SQL transform and checks the derived columns.
///
/// Beyond checking that the columns exist, it verifies `data_len` against the
/// character count of `data` for every row that carries both values. It also
/// requires at least one such row. Previously a row with missing or
/// differently-typed values was silently skipped, so the check could pass
/// without verifying anything.
#[tokio::test]
async fn test_sql_transform_uses_flink_string_functions() {
    init_tracing();
    let ctx = TestContext::new()
        .await
        .expect("Failed to create test context");
    ctx.kafka
        .register_schema(TEST_SCHEMA)
        .await
        .expect("Failed to register schema");
    let records = create_test_records(10);
    ctx.kafka
        .produce_avro_records(&records)
        .await
        .expect("Failed to produce records");
    let pipeline = format!(
        r#"
sources:
  kafka_source:
    type: kafka
    topic: {topic}
    primary_key: id
transforms:
  sql_transform:
    type: sql
    sql: |
      SELECT
        id,
        data,
        _gs_op,
        charLength(data) AS data_len,
        TRANSLATE(data || 'a', 'a', 'z') AS translated,
        REGEXP(data, '^data') AS matches_prefix
      FROM kafka_source
    primary_key: id
sinks:
  print_sink:
    type: print
    from: sql_transform
    sample_every: 1
"#,
        topic = ctx.kafka_topic
    );
    let output = ctx
        .run_pipeline_with_capture(&pipeline, PipelineOpts::new().record_limit(10))
        .await
        .expect("Pipeline should complete successfully");
    assert!(
        output.has_column("data_len"),
        "data_len column should exist"
    );
    assert!(
        output.has_column("translated"),
        "translated column should exist"
    );
    assert!(
        output.has_column("matches_prefix"),
        "matches_prefix column should exist"
    );
    assert!(!output.is_empty(), "Should have processed records");

    let mut verified_rows = 0usize;
    for row in output.rows() {
        let data = row.data.get("data").and_then(|v| v.as_str());
        let data_len = row.data.get("data_len").and_then(|v| v.as_i64());
        if let (Some(data), Some(data_len)) = (data, data_len) {
            // charLength counts characters, not UTF-8 bytes; a negative
            // length is never valid, so a failed conversion is a mismatch.
            assert_eq!(
                usize::try_from(data_len).ok(),
                Some(data.chars().count()),
                "charLength should match actual string length"
            );
            verified_rows += 1;
        }
    }
    assert!(
        verified_rows > 0,
        "Expected at least one row with both `data` and an integer `data_len` to verify charLength"
    );
}
/// Runs a filtering SQL transform (`WHERE block % 2 = 0`) over 100 produced
/// records into a blackhole sink. It then polls Prometheus until both the
/// input-row and output-row metrics for `sql_transform` reach at least 50.
///
/// NOTE(review): despite its name, this test only checks a lower bound on
/// each metric; it never asserts that input rows exceed output rows. Consider
/// comparing the two values once the return type of
/// `wait_for_metric_at_least` is confirmed.
#[tokio::test]
async fn test_pipeline_sql_filter_diff_in_input_output_rows() {
    use streamling_e2e::resources::PrometheusResource;

    init_tracing();

    let context = TestContext::with_options(TestContextOptions::new().with_prometheus())
        .await
        .expect("Failed to create test context");
    let prom = context
        .prometheus
        .as_ref()
        .expect("Prometheus should be available");

    context
        .kafka
        .register_schema(TEST_SCHEMA)
        .await
        .expect("Failed to register schema");

    let fixtures = create_test_records(100);
    context
        .kafka
        .produce_avro_records(&fixtures)
        .await
        .expect("Failed to produce records");

    let pipeline_yaml = format!(
        r#"
sources:
  kafka_source:
    type: kafka
    topic: {topic}
    primary_key: id
transforms:
  sql_transform:
    type: sql
    sql: "SELECT id, data, _gs_op FROM kafka_source WHERE block % 2 = 0"
    primary_key: id
sinks:
  blackhole_sink:
    type: blackhole
    from: sql_transform
"#,
        topic = context.kafka_topic
    );

    let run_opts = PipelineOpts::new().record_limit(50);
    let _run_status = context
        .run_pipeline_with_opts(&pipeline_yaml, run_opts)
        .await
        .expect("Pipeline should complete successfully");

    let input_query = PrometheusResource::input_rows_query("sql_transform", Some(&context.test_id));
    let output_query =
        PrometheusResource::output_rows_query("sql_transform", Some(&context.test_id));

    // Poll input first, then output, with the same threshold/retry settings.
    for (direction, query) in [("input", &input_query), ("output", &output_query)] {
        let observed = prom.wait_for_metric_at_least(query, 50, 10, 500).await;
        assert!(
            observed.is_ok(),
            "Should have {} rows metric for sql_transform: {:?}",
            direction,
            observed
        );
    }
}
/// Chains three SQL transforms (`step1` -> `step2` -> `step3`). The first two
/// contain SQL line comments with apostrophes; these must not confuse
/// dependency resolution or SQL parsing. Computed columns from earlier steps
/// must survive to the final output.
#[tokio::test]
async fn test_chained_sql_transforms_with_comment_apostrophes() {
    init_tracing();

    let context = TestContext::new()
        .await
        .expect("Failed to create test context");

    context
        .kafka
        .register_schema(TEST_SCHEMA)
        .await
        .expect("Failed to register schema");

    let fixtures = create_test_records(10);
    context
        .kafka
        .produce_avro_records(&fixtures)
        .await
        .expect("Failed to produce records");

    // Every fixture has block >= 1, so `WHERE block > 0` keeps all 10 rows.
    let pipeline_yaml = format!(
        r#"
sources:
  kafka_source:
    type: kafka
    topic: {topic}
    primary_key: id
transforms:
  step1:
    type: sql
    sql: |
      SELECT
        id,
        block,
        -- don't remove this comment — it has apostrophes
        CONCAT('prefix_', data) AS prefixed_data
      FROM kafka_source
    primary_key: id
  step2:
    type: sql
    sql: |
      SELECT
        id,
        block,
        -- it's important that this comment doesn't break parsing
        prefixed_data,
        block * 2 AS double_block
      FROM step1
      WHERE block > 0
    primary_key: id
  step3:
    type: sql
    sql: "SELECT id, double_block, prefixed_data FROM step2"
    primary_key: id
sinks:
  print_sink:
    type: print
    from: step3
    sample_every: 1
"#,
        topic = context.kafka_topic
    );

    let run_opts = PipelineOpts::new().record_limit(10);
    let captured = context
        .run_pipeline_with_capture(&pipeline_yaml, run_opts)
        .await
        .expect("Pipeline should complete — apostrophes in comments must not break topology sort");

    assert!(
        captured.has_column("double_block"),
        "step2's computed column should propagate through step3"
    );
    assert!(
        captured.has_column("prefixed_data"),
        "step1's computed column should propagate through"
    );
    assert_eq!(captured.len(), 10, "Should have processed all 10 records");
}