use clickhouse::Row;
use serde::{Deserialize, Serialize};
use streamling_e2e::{init_tracing, PipelineOpts, TestContext, TestContextOptions};
/// Kafka payload used by most tests in this file.
///
/// Field names, types and order mirror `TEST_SCHEMA`; keep the two in sync
/// (presumably `produce_avro_records*` encodes this struct against the
/// registered schema — confirm in the e2e helper crate).
#[derive(Debug, Clone, Serialize)]
struct TestRecord {
    // Used as both the source and sink `primary_key` in every pipeline below.
    id: i64,
    value: String,
    timestamp: i64,
}
/// Avro record schema matching [`TestRecord`]; each test that produces
/// `TestRecord`s passes it to `ctx.kafka.register_schema` first.
const TEST_SCHEMA: &str = r#"{
"type": "record",
"name": "TestRecord",
"fields": [
{"name": "id", "type": "long"},
{"name": "value", "type": "string"},
{"name": "timestamp", "type": "long"}
]
}"#;
/// Environment variables that point the pipeline's ClickHouse sink
/// (`STREAMLING__CLICKHOUSE_SINK__*`) at this test's ClickHouse instance.
///
/// The URL comes from `ctx.config.clickhouse_url` and the database from the
/// per-test ClickHouse handle. The credentials are the `default` user with an
/// empty password.
///
/// # Panics
///
/// Panics if `ctx` was created without ClickHouse enabled.
fn clickhouse_env(ctx: &TestContext) -> Vec<(String, String)> {
    let database = &ctx
        .clickhouse
        .as_ref()
        .expect("ClickHouse should be enabled")
        .database;
    let url = &ctx.config.clickhouse_url;

    [
        ("STREAMLING__CLICKHOUSE_SINK__URL", url.as_str()),
        ("STREAMLING__CLICKHOUSE_SINK__DATABASE", database.as_str()),
        ("STREAMLING__CLICKHOUSE_SINK__USER", "default"),
        ("STREAMLING__CLICKHOUSE_SINK__PASSWORD", ""),
    ]
    .iter()
    .map(|&(key, value)| (key.to_string(), value.to_string()))
    .collect()
}
/// Smoke test: ten Avro records produced to Kafka must all be present in the
/// ClickHouse table `test_output` after one pipeline run.
#[tokio::test]
async fn test_basic_kafka_to_clickhouse() {
    const RECORD_COUNT: i64 = 10;

    init_tracing();

    let ctx = TestContext::with_options(TestContextOptions::new().with_clickhouse())
        .await
        .expect("Failed to create test context");
    let clickhouse = ctx
        .clickhouse
        .as_ref()
        .expect("ClickHouse should be enabled");

    ctx.kafka
        .register_schema(TEST_SCHEMA)
        .await
        .expect("Failed to register schema");

    let records = (1..=RECORD_COUNT)
        .map(|n| TestRecord {
            id: n,
            value: format!("value_{}", n),
            timestamp: 1000 + n,
        })
        .collect::<Vec<TestRecord>>();
    ctx.kafka
        .produce_avro_records(&records)
        .await
        .expect("Failed to produce records");

    let pipeline = format!(
        r#"
sources:
  kafka_source:
    type: kafka
    topic: {topic}
    starting_offsets: earliest
    primary_key: id
transforms: {{}}
sinks:
  ch_sink:
    type: clickhouse
    from: kafka_source
    table: test_output
    primary_key: id
    compression: gzip
"#,
        topic = ctx.kafka_topic,
    );

    // Start from the record limit, then layer the sink connection settings on top.
    let opts = clickhouse_env(&ctx).into_iter().fold(
        PipelineOpts::new().record_limit(RECORD_COUNT as u64),
        |acc, (key, val)| acc.env(&key, &val),
    );

    let status = ctx
        .run_pipeline_with_opts(&pipeline, opts)
        .await
        .expect("Streamling execution failed");
    assert!(status.success(), "Streamling should exit successfully");

    let row_count = clickhouse
        .count("SELECT COUNT(*) FROM test_output")
        .await
        .expect("Failed to query count");
    assert_eq!(
        row_count,
        RECORD_COUNT as u64,
        "Should have 10 records in output table"
    );
}
/// Runs the sink with `batch_size: 5` and a 100 ms flush interval over 25
/// records, which should split the rows into several sink batches. Every record
/// must still arrive in `test_multi_batch`.
#[tokio::test]
async fn test_multiple_batches() {
    init_tracing();

    let ctx = TestContext::with_options(TestContextOptions::new().with_clickhouse())
        .await
        .expect("Failed to create test context");
    let clickhouse = ctx
        .clickhouse
        .as_ref()
        .expect("ClickHouse should be enabled");

    ctx.kafka
        .register_schema(TEST_SCHEMA)
        .await
        .expect("Failed to register schema");

    let expected_rows: i64 = 25;
    let records = (1..=expected_rows)
        .map(|n| TestRecord {
            id: n,
            value: format!("value_{}", n),
            timestamp: 1000 + n,
        })
        .collect::<Vec<TestRecord>>();
    ctx.kafka
        .produce_avro_records(&records)
        .await
        .expect("Failed to produce records");

    let pipeline = format!(
        r#"
sources:
  kafka_source:
    type: kafka
    topic: {topic}
    starting_offsets: earliest
    primary_key: id
transforms: {{}}
sinks:
  ch_sink:
    type: clickhouse
    from: kafka_source
    table: test_multi_batch
    primary_key: id
    batch_size: 5
    batch_flush_interval: 100ms
"#,
        topic = ctx.kafka_topic,
    );

    let opts = clickhouse_env(&ctx).into_iter().fold(
        PipelineOpts::new().record_limit(expected_rows as u64),
        |acc, (key, val)| acc.env(&key, &val),
    );

    let status = ctx
        .run_pipeline_with_opts(&pipeline, opts)
        .await
        .expect("Streamling execution failed");
    assert!(status.success(), "Streamling should exit successfully");

    let row_count = clickhouse
        .count("SELECT COUNT(*) FROM test_multi_batch")
        .await
        .expect("Failed to query count");
    assert_eq!(
        row_count,
        expected_rows as u64,
        "Should have {} records in output table",
        expected_rows
    );
}
/// Kafka payload for `test_schema_override`.
///
/// Field names, types and order mirror `TIMESTAMP_SCHEMA`; keep the two in
/// sync. The two timestamp columns are the ones the sink's `schema_override`
/// retypes in that test.
#[derive(Debug, Clone, Serialize)]
struct TimestampRecord {
    id: i64,
    // Avro `long`; the test fills these from a Unix-seconds-looking base value
    // (presumably seconds — confirm how the sink maps them into DateTime64(3)).
    created_at: i64,
    updated_at: i64,
    value: String,
}
/// Avro record schema matching [`TimestampRecord`]; registered via
/// `ctx.kafka.register_schema` in `test_schema_override`.
const TIMESTAMP_SCHEMA: &str = r#"{
"type": "record",
"name": "TimestampRecord",
"fields": [
{"name": "id", "type": "long"},
{"name": "created_at", "type": "long"},
{"name": "updated_at", "type": "long"},
{"name": "value", "type": "string"}
]
}"#;
/// Checks that the sink's `schema_override` map replaces the ClickHouse column
/// types of `created_at` and `updated_at` (both Avro `long`) with `DateTime64(3)`.
#[tokio::test]
async fn test_schema_override() {
    init_tracing();
    let ctx = TestContext::with_options(TestContextOptions::new().with_clickhouse())
        .await
        .expect("Failed to create test context");
    let clickhouse = ctx
        .clickhouse
        .as_ref()
        .expect("ClickHouse should be enabled");
    ctx.kafka
        .register_schema(TIMESTAMP_SCHEMA)
        .await
        .expect("Failed to register schema");

    // Looks like Unix seconds (Nov 2023) — confirm the unit the sink expects.
    let base_timestamp = 1_700_000_000i64;
    let total_records = 5;
    let records: Vec<TimestampRecord> = (0..total_records)
        .map(|i| TimestampRecord {
            id: i,
            // Different strides so the two columns never hold identical values.
            created_at: base_timestamp + i * 3600,
            updated_at: base_timestamp + i * 7200,
            value: format!("value_{}", i),
        })
        .collect();
    ctx.kafka
        .produce_avro_records(&records)
        .await
        .expect("Failed to produce records");

    let pipeline = format!(
        r#"
sources:
  kafka_source:
    type: kafka
    topic: {topic}
    starting_offsets: earliest
    primary_key: id
transforms: {{}}
sinks:
  ch_sink:
    type: clickhouse
    from: kafka_source
    table: schema_override_output
    primary_key: id
    schema_override:
      created_at: "DateTime64(3)"
      updated_at: "DateTime64(3) CODEC(Delta, ZSTD)"
"#,
        topic = ctx.kafka_topic,
    );
    let mut opts = PipelineOpts::new().record_limit(total_records as u64);
    for (k, v) in clickhouse_env(&ctx) {
        opts = opts.env(&k, &v);
    }
    let status = ctx
        .run_pipeline_with_opts(&pipeline, opts)
        .await
        .expect("Streamling execution failed");
    assert!(status.success(), "Streamling should exit successfully");

    let count = clickhouse
        .count("SELECT COUNT(*) FROM schema_override_output")
        .await
        .expect("Failed to query count");
    assert_eq!(
        count, total_records as u64,
        "Should have {} records",
        total_records
    );

    let columns = clickhouse
        .get_column_types("schema_override_output")
        .await
        .expect("Failed to get column types");

    let created_at_type = columns
        .iter()
        .find(|(name, _)| name == "created_at")
        .map(|(_, t)| t.as_str());
    // assert_eq! prints both sides on failure, unlike assert!(a == b).
    assert_eq!(
        created_at_type,
        Some("DateTime64(3)"),
        "created_at should be DateTime64(3)"
    );

    // The override for updated_at also carries a CODEC clause. It is expected to
    // be absent here on the assumption that `get_column_types` reports the bare
    // type (ClickHouse keeps codecs in a separate `compression_codec` column of
    // `system.columns`) — TODO confirm the helper's query.
    let updated_at_type = columns
        .iter()
        .find(|(name, _)| name == "updated_at")
        .map(|(_, t)| t.as_str());
    assert_eq!(
        updated_at_type,
        Some("DateTime64(3)"),
        "updated_at should be DateTime64(3)"
    );
}
/// CDC create events (`op = "c"`) must produce a table that has an injected
/// `is_deleted UInt8` column, with every ingested row marked as not deleted.
#[tokio::test]
async fn test_is_deleted_injection() {
    init_tracing();
    let ctx = TestContext::with_options(TestContextOptions::new().with_clickhouse())
        .await
        .expect("Failed to create test context");
    let clickhouse = ctx
        .clickhouse
        .as_ref()
        .expect("ClickHouse should be enabled");
    ctx.kafka
        .register_schema(TEST_SCHEMA)
        .await
        .expect("Failed to register schema");

    let total_records = 10;
    let records: Vec<TestRecord> = (1..=total_records)
        .map(|i| TestRecord {
            id: i,
            value: format!("value_{}", i),
            timestamp: 1000 + i,
        })
        .collect();
    ctx.kafka
        .produce_avro_records_with_op(&records, "c")
        .await
        .expect("Failed to produce records");

    let pipeline = format!(
        r#"
sources:
  kafka_source:
    type: kafka
    topic: {topic}
    starting_offsets: earliest
    primary_key: id
transforms: {{}}
sinks:
  ch_sink:
    type: clickhouse
    from: kafka_source
    table: cdc_test_output
    primary_key: id
"#,
        topic = ctx.kafka_topic,
    );
    let mut opts = PipelineOpts::new().record_limit(total_records as u64);
    for (k, v) in clickhouse_env(&ctx) {
        opts = opts.env(&k, &v);
    }
    let status = ctx
        .run_pipeline_with_opts(&pipeline, opts)
        .await
        .expect("Streamling execution failed");
    assert!(status.success(), "Streamling should exit successfully");

    let count = clickhouse
        .count("SELECT COUNT(*) FROM cdc_test_output")
        .await
        .expect("Failed to query count");
    assert_eq!(
        count, total_records as u64,
        "Should have {} records",
        total_records
    );

    let columns = clickhouse
        .get_column_types("cdc_test_output")
        .await
        .expect("Failed to get column types");
    // Look the column up once and fail with a clear message if it is missing,
    // instead of checking is_some() and then unwrapping separately.
    let is_deleted_type = columns
        .iter()
        .find(|(name, _)| name == "is_deleted")
        .map(|(_, t)| t.as_str())
        .expect("is_deleted column should be automatically added");
    assert_eq!(is_deleted_type, "UInt8", "is_deleted should be UInt8");

    let deleted_count = clickhouse
        .count("SELECT COUNT(*) FROM cdc_test_output WHERE is_deleted = 1")
        .await
        .expect("Failed to query deleted count");
    assert_eq!(deleted_count, 0, "All records should have is_deleted = 0");
}
/// Produces two versions of `id = 1` (plus `id = 2` and `id = 3`). It then
/// checks that, read with `FINAL`, the table holds one row per key and
/// `id = 1` carries its latest value.
#[tokio::test]
async fn test_deduplication() {
    init_tracing();
    let ctx = TestContext::with_options(TestContextOptions::new().with_clickhouse())
        .await
        .expect("Failed to create test context");
    let clickhouse = ctx
        .clickhouse
        .as_ref()
        .expect("ClickHouse should be enabled");
    ctx.kafka
        .register_schema(TEST_SCHEMA)
        .await
        .expect("Failed to register schema");

    let records = vec![
        TestRecord {
            id: 1,
            value: "first_1".to_string(),
            timestamp: 100,
        },
        TestRecord {
            id: 2,
            value: "first_2".to_string(),
            timestamp: 200,
        },
        // Second version of id=1; it must win over "first_1" after deduplication.
        TestRecord {
            id: 1,
            value: "updated_1".to_string(),
            timestamp: 300,
        },
        TestRecord {
            id: 3,
            value: "first_3".to_string(),
            timestamp: 400,
        },
    ];
    ctx.kafka
        .produce_avro_records(&records)
        .await
        .expect("Failed to produce records");

    let pipeline = format!(
        r#"
sources:
  kafka_source:
    type: kafka
    topic: {topic}
    starting_offsets: earliest
    primary_key: id
transforms: {{}}
sinks:
  ch_sink:
    type: clickhouse
    from: kafka_source
    table: dedup_test_output
    primary_key: id
"#,
        topic = ctx.kafka_topic,
    );

    // NOTE(review): the limit (2) is below the 4 produced messages, yet the
    // assertions below expect all four to be ingested. Presumably the limit is
    // checked per consumed batch rather than per message — confirm against
    // `PipelineOpts::record_limit` before relying on it.
    let mut opts = PipelineOpts::new().record_limit(2);
    for (k, v) in clickhouse_env(&ctx) {
        opts = opts.env(&k, &v);
    }
    let status = ctx
        .run_pipeline_with_opts(&pipeline, opts)
        .await
        .expect("Streamling execution failed");
    assert!(status.success(), "Streamling should exit successfully");

    let count = clickhouse
        .count("SELECT COUNT(*) FROM dedup_test_output FINAL")
        .await
        .expect("Failed to query count");
    assert_eq!(count, 3, "Should have 3 unique records after deduplication");

    #[derive(Row, Deserialize)]
    struct ResultRow {
        value: String,
    }
    // Read with FINAL, like the count above. Without it, ClickHouse can still
    // return both versions of id=1 until a background merge collapses them,
    // which would make the single-row assertion flaky.
    let rows: Vec<ResultRow> = clickhouse
        .query("SELECT value FROM dedup_test_output FINAL WHERE id = 1")
        .await
        .expect("Failed to query value");
    assert_eq!(rows.len(), 1, "Should have exactly one row for id=1");
    assert_eq!(
        rows[0].value, "updated_1",
        "id=1 should have updated value after deduplication"
    );
}
#[tokio::test]
async fn test_delete_operations() {
init_tracing();
let ctx = TestContext::with_options(TestContextOptions::new().with_clickhouse())
.await
.expect("Failed to create test context");
let clickhouse = ctx
.clickhouse
.as_ref()
.expect("ClickHouse should be enabled");
ctx.kafka
.register_schema(TEST_SCHEMA)
.await
.expect("Failed to register schema");
let initial_records = vec![
TestRecord {
id: 1,
value: "value_1".to_string(),
timestamp: 100,
},
TestRecord {
id: 2,
value: "value_2".to_string(),
timestamp: 200,
},
TestRecord {
id: 3,
value: "value_3".to_string(),
timestamp: 300,
},
];
ctx.kafka
.produce_avro_records_with_op(&initial_records, "c")
.await
.expect("Failed to produce initial records");
let delete_records = vec![
TestRecord {
id: 1,
value: String::new(),
timestamp: 0,
},
TestRecord {
id: 2,
value: String::new(),
timestamp: 0,
},
];
ctx.kafka
.produce_avro_records_with_op(&delete_records, "d")
.await
.expect("Failed to produce delete records");
let pipeline = format!(
r#"
sources:
kafka_source:
type: kafka
topic: {topic}
starting_offsets: earliest
primary_key: id
transforms: {{}}
sinks:
ch_sink:
type: clickhouse
from: kafka_source
table: delete_test_output
primary_key: id
"#,
topic = ctx.kafka_topic,
);
let mut opts = PipelineOpts::new().record_limit(initial_records.len() as u64);
for (k, v) in clickhouse_env(&ctx) {
opts = opts.env(&k, &v);
}
let status = ctx
.run_pipeline_with_opts(&pipeline, opts)
.await
.expect("Streamling execution failed");
assert!(status.success(), "Streamling should exit successfully");
let active_count = clickhouse
.count("SELECT COUNT(*) FROM delete_test_output FINAL WHERE is_deleted = 0")
.await
.expect("Failed to query active count");
assert_eq!(
active_count, 1,
"Should have 1 active record (id=3) after deletes"
);
let deleted_count = clickhouse
.count("SELECT COUNT(*) FROM delete_test_output FINAL WHERE is_deleted = 1")
.await
.expect("Failed to query deleted count");
assert_eq!(
deleted_count, 0,
"Deleted records should be cleaned up by ReplacingMergeTree"
);
}
/// With `append_only_mode: false`, the sink must not add the `is_deleted` /
/// `insert_time` bookkeeping columns. A delete event for id 2 must leave only
/// ids 1 and 3 in the table, without needing `FINAL`.
#[tokio::test]
async fn test_append_only_mode_false() {
    init_tracing();
    let ctx = TestContext::with_options(TestContextOptions::new().with_clickhouse())
        .await
        .expect("Failed to create test context");
    let clickhouse = ctx
        .clickhouse
        .as_ref()
        .expect("ClickHouse should be enabled");
    ctx.kafka
        .register_schema(TEST_SCHEMA)
        .await
        .expect("Failed to register schema");

    let insert_records = vec![
        TestRecord {
            id: 1,
            value: "value_1".to_string(),
            timestamp: 100,
        },
        TestRecord {
            id: 2,
            value: "value_2".to_string(),
            timestamp: 200,
        },
        TestRecord {
            id: 3,
            value: "value_3".to_string(),
            timestamp: 300,
        },
    ];
    ctx.kafka
        .produce_avro_records_with_op(&insert_records, "c")
        .await
        .expect("Failed to produce insert records");

    // Delete events only need the key; the other fields are placeholders.
    let delete_records = vec![TestRecord {
        id: 2,
        value: String::new(),
        timestamp: 0,
    }];
    ctx.kafka
        .produce_avro_records_with_op(&delete_records, "d")
        .await
        .expect("Failed to produce delete records");

    let pipeline = format!(
        r#"
sources:
  kafka_source:
    type: kafka
    topic: {topic}
    starting_offsets: earliest
    primary_key: id
transforms: {{}}
sinks:
  ch_sink:
    type: clickhouse
    from: kafka_source
    table: append_only_false_test
    primary_key: id
    append_only_mode: false
"#,
        topic = ctx.kafka_topic,
    );
    let mut opts = PipelineOpts::new().record_limit(insert_records.len() as u64);
    for (k, v) in clickhouse_env(&ctx) {
        opts = opts.env(&k, &v);
    }
    let status = ctx
        .run_pipeline_with_opts(&pipeline, opts)
        .await
        .expect("Streamling execution failed");
    assert!(status.success(), "Streamling should exit successfully");

    let columns = clickhouse
        .get_column_types("append_only_false_test")
        .await
        .expect("Failed to get column types");
    let has_is_deleted = columns.iter().any(|(name, _)| name == "is_deleted");
    assert!(
        !has_is_deleted,
        "append_only_mode: false should NOT create an is_deleted column"
    );
    let has_insert_time = columns.iter().any(|(name, _)| name == "insert_time");
    assert!(
        !has_insert_time,
        "append_only_mode: false should NOT create an insert_time column"
    );

    let total_count = clickhouse
        .count("SELECT COUNT(*) FROM append_only_false_test")
        .await
        .expect("Failed to query total count");
    assert_eq!(
        total_count, 2,
        "Should have 2 records after DELETE removed id=2"
    );

    #[derive(Row, Deserialize)]
    struct IdRow {
        id: i64,
    }
    let rows: Vec<IdRow> = clickhouse
        .query("SELECT id FROM append_only_false_test ORDER BY id")
        .await
        .expect("Failed to query rows");
    // The rows are not reused, so consume the Vec directly instead of draining a `mut` binding.
    let ids: Vec<i64> = rows.into_iter().map(|r| r.id).collect();
    assert_eq!(ids, vec![1, 3], "Remaining records should be id=1 and id=3");
}
/// With `append_only_mode: false`, an update event (`op = "u"`) for id 1 must
/// replace its original value. Read with `FINAL`, the table must have two
/// rows, and id 1 must carry "updated".
#[tokio::test]
async fn test_append_only_mode_false_with_updates() {
    init_tracing();
    let ctx = TestContext::with_options(TestContextOptions::new().with_clickhouse())
        .await
        .expect("Failed to create test context");
    let clickhouse = ctx
        .clickhouse
        .as_ref()
        .expect("ClickHouse should be enabled");
    ctx.kafka
        .register_schema(TEST_SCHEMA)
        .await
        .expect("Failed to register schema");

    let insert_records = vec![
        TestRecord {
            id: 1,
            value: "original".to_string(),
            timestamp: 100,
        },
        TestRecord {
            id: 2,
            value: "value_2".to_string(),
            timestamp: 200,
        },
    ];
    ctx.kafka
        .produce_avro_records_with_op(&insert_records, "c")
        .await
        .expect("Failed to produce insert records");

    let update_records = vec![TestRecord {
        id: 1,
        value: "updated".to_string(),
        timestamp: 300,
    }];
    ctx.kafka
        .produce_avro_records_with_op(&update_records, "u")
        .await
        .expect("Failed to produce update records");

    let pipeline = format!(
        r#"
sources:
  kafka_source:
    type: kafka
    topic: {topic}
    starting_offsets: earliest
    primary_key: id
transforms: {{}}
sinks:
  ch_sink:
    type: clickhouse
    from: kafka_source
    table: append_only_false_update_test
    primary_key: id
    append_only_mode: false
"#,
        topic = ctx.kafka_topic,
    );
    let mut opts = PipelineOpts::new().record_limit(insert_records.len() as u64);
    for (k, v) in clickhouse_env(&ctx) {
        opts = opts.env(&k, &v);
    }
    let status = ctx
        .run_pipeline_with_opts(&pipeline, opts)
        .await
        .expect("Streamling execution failed");
    assert!(status.success(), "Streamling should exit successfully");

    let count = clickhouse
        .count("SELECT COUNT(*) FROM append_only_false_update_test FINAL")
        .await
        .expect("Failed to query count");
    assert_eq!(count, 2, "Should have 2 unique records");

    #[derive(Row, Deserialize)]
    struct ValueRow {
        value: String,
    }
    let rows: Vec<ValueRow> = clickhouse
        .query("SELECT value FROM append_only_false_update_test FINAL WHERE id = 1")
        .await
        .expect("Failed to query value");
    // Explain the failure if the update produced a second row (or none at all)
    // instead of letting the later rows[0] access panic without context.
    assert_eq!(rows.len(), 1, "Should have exactly one row for id=1");
    assert_eq!(
        rows[0].value, "updated",
        "id=1 should have the updated value"
    );
}