use serde::Serialize;
use streamling_e2e::{init_tracing, PipelineOpts, TestContext, TestContextOptions};
/// One test message produced to Kafka as an Avro record by these tests.
///
/// The fields mirror `TEST_SCHEMA` (`block`, `id`, `data`, `timestamp`) and are
/// declared in the same order as the schema's fields. NOTE(review): presumably
/// `produce_avro_records` relies on this correspondence when encoding — confirm
/// before renaming or reordering fields.
#[derive(Debug, Clone, Serialize)]
struct TestRecord {
    /// Block number; the filter test applies `block > 25` to this column.
    block: i64,
    /// Record identifier; the pipelines in this file use `id` as `primary_key`.
    id: String,
    /// Free-form payload string.
    data: String,
    /// Timestamp value (units are not established by this file).
    timestamp: i64,
}
/// Avro record schema for [`TestRecord`], registered via
/// `ctx.kafka.register_schema` before each test produces Kafka records.
/// Avro `long` corresponds to the struct's `i64` fields and `string` to its
/// `String` fields.
const TEST_SCHEMA: &str = r#"{"type":"record","name":"TestMessage","fields":[
{"name":"block","type":"long"},
{"name":"id","type":"string"},
{"name":"data","type":"string"},
{"name":"timestamp","type":"long"}
]}"#;
/// Hybrid source that scans a bounded ClickHouse table, then consumes an
/// unbounded Kafka topic, writing everything into a Postgres sink.
///
/// Seeds 3 ClickHouse rows and 3 Kafka records, runs the pipeline with a record
/// limit equal to their sum, and asserts that the sink holds exactly those rows:
/// the 3 ClickHouse ids and the 3 `kafka_user_*` ids.
#[tokio::test]
async fn test_hybrid_clickhouse_to_kafka() {
    init_tracing();
    // 3 ClickHouse rows (bounded phase) + 3 Kafka records (unbounded phase).
    // The sink upserts on `id` and every id is distinct, so the sink table must
    // end up with exactly this many rows.
    let expected_total = 6u64;
    let ctx = TestContext::with_options(TestContextOptions::new().with_clickhouse())
        .await
        .expect("Failed to create test context");
    let clickhouse = ctx.clickhouse.as_ref().expect("ClickHouse not initialized");
    clickhouse
        .execute(
            "CREATE TABLE hybrid_source_test (
block Int64,
id String,
data String,
timestamp Int64,
is_deleted UInt8
) ENGINE = MergeTree()
ORDER BY (block, id)",
        )
        .await
        .expect("Failed to create ClickHouse table");
    clickhouse
        .execute(
            "INSERT INTO hybrid_source_test VALUES
(1, 'Alice', 'A', 0, 0),
(2, 'Bob', 'B', 0, 0),
(3, 'Charlie', 'C', 0, 0)",
        )
        .await
        .expect("Failed to insert ClickHouse data");
    ctx.kafka
        .register_schema(TEST_SCHEMA)
        .await
        .expect("Failed to register schema");
    let kafka_records: Vec<TestRecord> = (1..=3)
        .map(|i| TestRecord {
            block: 100 + i,
            id: format!("kafka_user_{}", i),
            data: format!("kafka_data_{}", i),
            timestamp: 2000 + i,
        })
        .collect();
    ctx.kafka
        .produce_avro_records(&kafka_records)
        .await
        .expect("Failed to produce Kafka records");
    clickhouse
        .execute(
            "CREATE TABLE kafka_offsets (
topic String,
partition Int32,
offset UInt32
) ENGINE = MergeTree()
ORDER BY (topic, partition)",
        )
        .await
        .expect("Failed to create offset table");
    let pipeline = format!(
        r#"
sources:
hybrid_source:
type: hybrid
bounded_sources:
- source_type: clickhouse
table_name: hybrid_source_test
columns: block,id,data,timestamp
unbounded_source:
source_type: kafka
topic: {kafka_topic}
start_at: earliest
offset_table:
topic_name: {kafka_topic}
table_name: kafka_offsets
primary_key: id
transforms: {{}}
sinks:
pg_sink:
type: postgres
from: hybrid_source
table: hybrid_results
schema: public
primary_key: id
on_conflict: update
"#,
        kafka_topic = ctx.kafka_topic,
    );
    let status = ctx
        .run_pipeline_with_opts(
            &pipeline,
            PipelineOpts::new()
                .record_limit(expected_total)
                .timeout(std::time::Duration::from_secs(120)),
        )
        .await
        .expect("Streamling execution failed");
    assert!(status.success(), "Streamling should exit successfully");
    let count = ctx
        .postgres
        .count("SELECT COUNT(*) FROM public.hybrid_results")
        .await
        .expect("Failed to query count");
    // This counts rows from BOTH phases, so pin the exact total rather than a
    // loose lower bound that would also accept a sink missing every Kafka row.
    assert_eq!(
        count, expected_total as i64,
        "Should have {} records total (3 ClickHouse + 3 Kafka), got {}",
        expected_total, count
    );
    let ch_data: Vec<(String,)> = ctx
        .postgres
        .query(
            "SELECT id FROM public.hybrid_results WHERE id IN ('Alice', 'Bob', 'Charlie') ORDER BY id",
        )
        .await
        .expect("Failed to query ClickHouse data");
    assert_eq!(
        ch_data.len(),
        3,
        "Should have 3 ClickHouse records (Alice, Bob, Charlie)"
    );
    let kafka_data: Vec<(String,)> = ctx
        .postgres
        .query("SELECT id FROM public.hybrid_results WHERE id LIKE 'kafka_user_%' ORDER BY id")
        .await
        .expect("Failed to query Kafka data");
    assert_eq!(
        kafka_data.len(),
        3,
        "Should have 3 Kafka records (kafka_user_1, kafka_user_2, kafka_user_3)"
    );
}
/// Hybrid source where each phase applies its own row filter.
///
/// ClickHouse receives 30 rows in block 0 and 20 rows in block 1; the bounded
/// filter `block = 1` keeps the 20. Kafka receives records for blocks 1..=50;
/// the unbounded filter `block > 25` keeps 25. The sink must end up with exactly
/// 45 rows, split 20 `ch_id_*` / 25 `kafka_id_*`.
#[tokio::test]
async fn test_hybrid_source_with_filters() {
    init_tracing();
    let ctx = TestContext::with_options(TestContextOptions::new().with_clickhouse())
        .await
        .expect("Failed to create test context");
    let ch = ctx.clickhouse.as_ref().expect("ClickHouse not initialized");
    ch.execute(
        "CREATE TABLE hybrid_filter_test_ch (
block Int64,
id String,
data String,
timestamp Int64,
is_deleted UInt8
) ENGINE = MergeTree()
ORDER BY (block, id)",
    )
    .await
    .expect("Failed to create ClickHouse table");

    // Rows 0..30 land in block 0 (dropped by the filter), rows 30..50 in block 1.
    let tuples: Vec<String> = (0..50)
        .map(|n| {
            let block = if n < 30 { 0 } else { 1 };
            format!(
                "({}, 'ch_id_{}', 'ch_data_{}', {}, 0)",
                block,
                n,
                n,
                1700000000 + n
            )
        })
        .collect();
    ch.execute(&format!(
        "INSERT INTO hybrid_filter_test_ch (block, id, data, timestamp, is_deleted) VALUES {}",
        tuples.join(", ")
    ))
    .await
    .expect("Failed to insert ClickHouse data");

    ctx.kafka
        .register_schema(TEST_SCHEMA)
        .await
        .expect("Failed to register schema");
    // Kafka blocks 1..=50; only 26..=50 survive `block > 25`.
    let stream_records: Vec<TestRecord> = (1..=50)
        .map(|b| TestRecord {
            block: b,
            id: format!("kafka_id_{}", b),
            data: format!("kafka_data_{}", b),
            timestamp: 1700000000 + b,
        })
        .collect();
    ctx.kafka
        .produce_avro_records(&stream_records)
        .await
        .expect("Failed to produce Kafka records");
    ch.execute(
        "CREATE TABLE kafka_offsets_filter (
topic String,
partition Int32,
offset UInt32
) ENGINE = MergeTree()
ORDER BY (topic, partition)",
    )
    .await
    .expect("Failed to create offset table");

    let pipeline_yaml = format!(
        r#"
sources:
hybrid_filtered_source:
type: hybrid
bounded_sources:
- source_type: clickhouse
table_name: hybrid_filter_test_ch
columns: block,id,data,timestamp
filter: "block = 1"
unbounded_source:
source_type: kafka
topic: {kafka_topic}
start_at: earliest
filter: "block > 25"
offset_table:
topic_name: {kafka_topic}
table_name: kafka_offsets_filter
primary_key: id
transforms: {{}}
sinks:
pg_sink:
type: postgres
from: hybrid_filtered_source
table: hybrid_filter_results
schema: public
primary_key: id
on_conflict: update
"#,
        kafka_topic = ctx.kafka_topic,
    );

    // 20 ClickHouse rows + 25 Kafka records pass their respective filters.
    let expected_total = 45u64;
    let run_opts = PipelineOpts::new()
        .record_limit(expected_total)
        .timeout(std::time::Duration::from_secs(120));
    let exit_status = ctx
        .run_pipeline_with_opts(&pipeline_yaml, run_opts)
        .await
        .expect("Streamling execution failed");
    assert!(exit_status.success(), "Streamling should exit successfully");

    let pg = &ctx.postgres;
    let total = pg
        .count("SELECT COUNT(*) FROM public.hybrid_filter_results")
        .await
        .expect("Failed to query count");
    assert_eq!(
        total, expected_total as i64,
        "Should have {} records total (20 ClickHouse + 25 Kafka), got {}",
        expected_total, total
    );

    let from_clickhouse = pg
        .count("SELECT COUNT(*) FROM public.hybrid_filter_results WHERE id LIKE 'ch_id_%'")
        .await
        .expect("Failed to query ClickHouse count");
    assert_eq!(
        from_clickhouse, 20,
        "Should have 20 ClickHouse records with block=1, got {}",
        from_clickhouse
    );

    let from_kafka = pg
        .count("SELECT COUNT(*) FROM public.hybrid_filter_results WHERE id LIKE 'kafka_id_%'")
        .await
        .expect("Failed to query Kafka count");
    assert_eq!(
        from_kafka, 25,
        "Should have 25 Kafka records with block > 25, got {}",
        from_kafka
    );
}
/// In job mode a hybrid pipeline must exit on its own once its bounded phase is
/// finished, without delivering anything from the unbounded Kafka source.
///
/// Three `ch_*` rows are seeded in ClickHouse and three `kafka_*` records in
/// Kafka. The pipeline runs with `STREAMLING__JOB_MODE=true` and no record
/// limit; afterwards the sink must hold exactly the three ClickHouse rows.
#[tokio::test]
async fn test_hybrid_source_job_mode_terminates() {
    init_tracing();
    let ctx = TestContext::with_options(TestContextOptions::new().with_clickhouse())
        .await
        .expect("Failed to create test context");
    let ch = ctx.clickhouse.as_ref().expect("ClickHouse not initialized");

    // Bounded input: the three rows the job is expected to deliver.
    ch.execute(
        "CREATE TABLE hybrid_job_mode_test (
block Int64,
id String,
data String,
timestamp Int64,
is_deleted UInt8
) ENGINE = MergeTree()
ORDER BY (block, id)",
    )
    .await
    .expect("Failed to create ClickHouse table");
    ch.execute(
        "INSERT INTO hybrid_job_mode_test VALUES
(1, 'ch_1', 'bounded_A', 100, 0),
(2, 'ch_2', 'bounded_B', 200, 0),
(3, 'ch_3', 'bounded_C', 300, 0)",
    )
    .await
    .expect("Failed to insert ClickHouse data");

    // Unbounded input: records that must NOT reach the sink in job mode.
    ctx.kafka
        .register_schema(TEST_SCHEMA)
        .await
        .expect("Failed to register schema");
    let unbounded_records: Vec<TestRecord> = (1..=3)
        .map(|n| TestRecord {
            block: 100 + n,
            id: format!("kafka_{}", n),
            data: format!("unbounded_{}", n),
            timestamp: 1000 + n,
        })
        .collect();
    ctx.kafka
        .produce_avro_records(&unbounded_records)
        .await
        .expect("Failed to produce Kafka records");
    ch.execute(
        "CREATE TABLE kafka_offsets_job_mode (
topic String,
partition Int32,
offset UInt32
) ENGINE = MergeTree()
ORDER BY (topic, partition)",
    )
    .await
    .expect("Failed to create offset table");

    let pipeline_yaml = format!(
        r#"
sources:
hybrid_source:
type: hybrid
bounded_sources:
- source_type: clickhouse
table_name: hybrid_job_mode_test
columns: block,id,data,timestamp
unbounded_source:
source_type: kafka
topic: {kafka_topic}
start_at: earliest
offset_table:
topic_name: {kafka_topic}
table_name: kafka_offsets_job_mode
primary_key: id
transforms: {{}}
sinks:
pg_sink:
type: postgres
from: hybrid_source
table: hybrid_job_mode_results
schema: public
primary_key: id
on_conflict: update
batch_size: 1
"#,
        kafka_topic = ctx.kafka_topic,
    );

    // No record limit is set: the run is expected to end because of job mode
    // (per the assertion message below), with a 120 s safety timeout.
    let job_opts = PipelineOpts::new()
        .env("STREAMLING__JOB_MODE", "true")
        .env("STREAMLING__RECORD_BATCH_SIZE", "1")
        .timeout(std::time::Duration::from_secs(120));
    let exit_status = ctx
        .run_pipeline_with_opts(&pipeline_yaml, job_opts)
        .await
        .expect("Pipeline execution failed");
    assert!(
        exit_status.success(),
        "Job mode pipeline should terminate successfully after bounded phase"
    );

    let pg = &ctx.postgres;
    let total_count = pg
        .count("SELECT COUNT(*) FROM public.hybrid_job_mode_results")
        .await
        .expect("Failed to query count");
    assert_eq!(
        total_count, 3,
        "Should have exactly 3 records from bounded phase, got {}",
        total_count
    );

    let ch_ids: Vec<(String,)> = pg
        .query("SELECT id FROM public.hybrid_job_mode_results WHERE id LIKE 'ch_%' ORDER BY id")
        .await
        .expect("Failed to query ClickHouse records");
    let ch_total = ch_ids.len();
    assert_eq!(
        ch_total,
        3,
        "Should have 3 ClickHouse records, got {}",
        ch_total
    );

    let kafka_count = pg
        .count("SELECT COUNT(*) FROM public.hybrid_job_mode_results WHERE id LIKE 'kafka_%'")
        .await
        .expect("Failed to query Kafka count");
    assert_eq!(
        kafka_count, 0,
        "Should have 0 Kafka records (unbounded phase skipped in job mode), got {}",
        kafka_count
    );
}
#[tokio::test]
async fn test_hybrid_clickhouse_resume_from_high_saved_split() {
init_tracing();
let ctx = TestContext::with_options(TestContextOptions::new().with_clickhouse())
.await
.expect("Failed to create test context");
let clickhouse = ctx.clickhouse.as_ref().expect("ClickHouse not initialized");
clickhouse
.execute(
"CREATE TABLE hybrid_resume_test (
block Int64,
id String,
data String,
timestamp Int64,
is_deleted UInt8
) ENGINE = MergeTree()
ORDER BY (block, id)",
)
.await
.expect("Failed to create ClickHouse table");
let mut values: Vec<String> = (0..50)
.map(|i| {
format!(
"({}, 'low_{}', 'low_data_{}', {}, 0)",
i,
i,
i,
1_700_000_000 + i
)
})
.collect();
let high_start = 5_000_000i64;
let high_rows = 20_000i64;
values.extend((0..high_rows).map(|i| {
let block = high_start + i;
format!(
"({}, 'high_{}', 'high_data_{}', {}, 0)",
block,
i,
i,
1_800_000_000 + i
)
}));
for chunk in values.chunks(200) {
clickhouse
.execute(&format!(
"INSERT INTO hybrid_resume_test (block, id, data, timestamp, is_deleted) VALUES {}",
chunk.join(", ")
))
.await
.expect("Failed to insert ClickHouse data");
}
ctx.kafka
.register_schema(TEST_SCHEMA)
.await
.expect("Failed to register schema");
clickhouse
.execute(
"CREATE TABLE kafka_offsets_hybrid_resume (
topic String,
partition Int32,
offset UInt32
) ENGINE = MergeTree()
ORDER BY (topic, partition)",
)
.await
.expect("Failed to create offset table");
let state_table = format!("hybrid_resume_{}", ctx.test_id.replace("-", "_"));
let application_id = format!("hybrid_resume_{}", ctx.test_id);
let checkpoint_interval_sec =
std::env::var("STREAMLING_E2E_HYBRID_RESUME_CHECKPOINT_INTERVAL_SEC")
.unwrap_or_else(|_| "1".to_string());
let expected_high_rows = high_rows as u64;
let ready_deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(30);
while tokio::time::Instant::now() < ready_deadline {
let visible_rows = clickhouse
.count("SELECT COUNT(*) FROM hybrid_resume_test WHERE id LIKE 'high_%'")
.await
.expect("Failed to query ClickHouse visibility for run 1");
if visible_rows >= expected_high_rows {
break;
}
tokio::time::sleep(std::time::Duration::from_millis(250)).await;
}
let pipeline_run1 = format!(
r#"
sources:
hybrid_source:
type: hybrid
bounded_sources:
- source_type: clickhouse
table_name: hybrid_resume_test
columns: block,id,data,timestamp
start_at: "{high_start}"
unbounded_source:
source_type: kafka
topic: {kafka_topic}
start_at: earliest
offset_table:
topic_name: {kafka_topic}
table_name: kafka_offsets_hybrid_resume
primary_key: id
transforms: {{}}
sinks:
blackhole_sink:
type: blackhole
from: hybrid_source
"#,
high_start = high_start,
kafka_topic = ctx.kafka_topic
);
let status_1 = ctx
.run_pipeline_with_opts(
&pipeline_run1,
PipelineOpts::new()
.record_limit(800)
.timeout(std::time::Duration::from_secs(240))
.env("STREAMLING__APPLICATION_ID", &application_id)
.env("STREAMLING__STATE_BACKEND__BACKEND_TYPE", "Postgres")
.env(
"STREAMLING__STATE_BACKEND__POSTGRES__HOST",
&ctx.postgres.host,
)
.env(
"STREAMLING__STATE_BACKEND__POSTGRES__PORT",
ctx.postgres.port.to_string(),
)
.env("STREAMLING__STATE_BACKEND__POSTGRES__USER", "postgres")
.env("STREAMLING__STATE_BACKEND__POSTGRES__PASSWORD", "postgres")
.env("STREAMLING__STATE_BACKEND__POSTGRES__DB", &ctx.pg_database)
.env("STREAMLING__STATE_BACKEND__POSTGRES__SSLMODE", "disable")
.env(
"STREAMLING__STATE_BACKEND__POSTGRES__STATE_TABLE_NAME",
&state_table,
)
.env(
"STREAMLING__CHECKPOINT_INTERVAL_SEC",
&checkpoint_interval_sec,
)
.env("STREAMLING__RECORD_BATCH_SIZE", "1")
.env("STREAMLING__CLICKHOUSE_SOURCE__PAGE_SIZE", "1")
.env("STREAMLING__CLICKHOUSE_SOURCE__BLOCK_RANGE", "100")
.env("STREAMLING__JOB_MODE", "true"),
)
.await
.expect("Pipeline run 1 failed");
assert!(status_1.success(), "Pipeline run 1 should succeed");
let poll_deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(60);
let mut state_keys: Vec<String> = Vec::new();
let mut has_clickhouse_split = false;
while tokio::time::Instant::now() < poll_deadline {
let rows: Vec<(String,)> = ctx
.postgres
.query(&format!(
"SELECT \"key\" FROM streamling.\"{}\" ORDER BY \"key\"",
state_table
))
.await
.expect("Failed to query state keys");
state_keys = rows.into_iter().map(|row| row.0).collect();
has_clickhouse_split = state_keys
.iter()
.any(|k| k.starts_with("clickhouse_source:"));
if has_clickhouse_split {
break;
}
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
assert!(
has_clickhouse_split,
"Run 1 did not persist clickhouse split key within timeout; state keys={:?}",
state_keys
);
let run1_offset_count = clickhouse
.count("SELECT COUNT(*) FROM kafka_offsets_hybrid_resume")
.await
.expect("Failed to query Kafka offset table after run 1");
assert_eq!(
run1_offset_count, 0,
"Run 1 should stay in bounded stage and avoid unbounded Kafka offsets writes"
);
ctx.postgres
.execute(&format!(
"DELETE FROM streamling.\"{}\" WHERE \"key\" = 'hybrid_source_hybrid_source_v1'",
state_table
))
.await
.expect("Failed to clear hybrid phase-state key");
let pipeline_run2 = format!(
r#"
sources:
hybrid_source:
type: hybrid
bounded_sources:
- source_type: clickhouse
table_name: hybrid_resume_test
columns: block,id,data,timestamp
unbounded_source:
source_type: kafka
topic: {kafka_topic}
start_at: earliest
offset_table:
topic_name: {kafka_topic}
table_name: kafka_offsets_hybrid_resume
primary_key: id
transforms: {{}}
sinks:
pg_sink:
type: postgres
from: hybrid_source
table: hybrid_resume_run2
schema: public
primary_key: id
on_conflict: update
batch_size: 1
"#,
kafka_topic = ctx.kafka_topic
);
let status_2 = ctx
.run_pipeline_with_opts(
&pipeline_run2,
PipelineOpts::new()
.record_limit(40)
.timeout(std::time::Duration::from_secs(120))
.env("STREAMLING__APPLICATION_ID", &application_id)
.env("STREAMLING__STATE_BACKEND__BACKEND_TYPE", "Postgres")
.env(
"STREAMLING__STATE_BACKEND__POSTGRES__HOST",
&ctx.postgres.host,
)
.env(
"STREAMLING__STATE_BACKEND__POSTGRES__PORT",
ctx.postgres.port.to_string(),
)
.env("STREAMLING__STATE_BACKEND__POSTGRES__USER", "postgres")
.env("STREAMLING__STATE_BACKEND__POSTGRES__PASSWORD", "postgres")
.env("STREAMLING__STATE_BACKEND__POSTGRES__DB", &ctx.pg_database)
.env("STREAMLING__STATE_BACKEND__POSTGRES__SSLMODE", "disable")
.env(
"STREAMLING__STATE_BACKEND__POSTGRES__STATE_TABLE_NAME",
&state_table,
)
.env("STREAMLING__CHECKPOINT_INTERVAL_SEC", "1")
.env("STREAMLING__RECORD_BATCH_SIZE", "1")
.env("STREAMLING__CLICKHOUSE_SOURCE__PAGE_SIZE", "20")
.env("STREAMLING__CLICKHOUSE_SOURCE__BLOCK_RANGE", "100")
.env("STREAMLING__JOB_MODE", "true"),
)
.await
.expect("Pipeline run 2 failed");
assert!(status_2.success(), "Pipeline run 2 should succeed");
let count_run2 = ctx
.postgres
.count("SELECT COUNT(*) FROM public.hybrid_resume_run2")
.await
.expect("Failed to query run 2 count");
assert!(count_run2 > 0, "Run 2 should process resumed rows");
let high_count_run2 = ctx
.postgres
.count("SELECT COUNT(*) FROM public.hybrid_resume_run2 WHERE id LIKE 'high_%'")
.await
.expect("Failed to query high_* count for run 2");
assert!(
high_count_run2 > 0,
"Run 2 should include bounded high_* rows from resumed ClickHouse scan, got {}",
high_count_run2
);
let low_count_run2 = ctx
.postgres
.count("SELECT COUNT(*) FROM public.hybrid_resume_run2 WHERE id LIKE 'low_%'")
.await
.expect("Failed to query low_* count for run 2");
assert_eq!(
low_count_run2, 0,
"Run 2 should not restart from low_* rows, got {} low_* rows",
low_count_run2
);
let min_high_block_run2: Vec<(i64,)> = ctx
.postgres
.query("SELECT MIN(block) FROM public.hybrid_resume_run2 WHERE id LIKE 'high_%'")
.await
.expect("Failed to query min high block for run 2");
let min_high_block_run2 = min_high_block_run2[0].0;
assert!(
min_high_block_run2 >= high_start,
"Run 2 high_* rows should remain in high range (min_high_block={}, high_start={})",
min_high_block_run2,
high_start
);
}
/// Job mode must be rejected for pipelines without a hybrid source.
///
/// A plain Kafka source is configured and `STREAMLING__JOB_MODE=true` is set.
/// The process must exit unsuccessfully, and its combined stdout/stderr must
/// name `job_mode`, say the source does not support it, and quote the source
/// name.
#[tokio::test]
async fn test_job_mode_validation_rejects_non_hybrid() {
    init_tracing();
    let ctx = TestContext::new()
        .await
        .expect("Failed to create test context");
    ctx.kafka
        .register_schema(TEST_SCHEMA)
        .await
        .expect("Failed to register schema");

    let seed: Vec<TestRecord> = (0..3)
        .map(|n| TestRecord {
            block: n,
            id: format!("id_{}", n),
            data: format!("data_{}", n),
            timestamp: n * 100,
        })
        .collect();
    ctx.kafka
        .produce_avro_records(&seed)
        .await
        .expect("Failed to produce records");

    let pipeline_yaml = format!(
        r#"
sources:
my_kafka_source:
type: kafka
topic: {topic}
primary_key: id
transforms: {{}}
sinks:
blackhole_sink:
type: blackhole
from: my_kafka_source
"#,
        topic = ctx.kafka_topic
    );

    let job_opts = PipelineOpts::new()
        .env("STREAMLING__JOB_MODE", "true")
        .timeout(std::time::Duration::from_secs(30));
    let output = ctx
        .run_pipeline_raw(&pipeline_yaml, job_opts)
        .await
        .expect("Failed to run pipeline");
    assert!(
        !output.status.success(),
        "Pipeline should fail when job_mode is enabled without hybrid sources"
    );

    // Every fragment must appear somewhere in stdout or stderr.
    let combined_output = format!("{}\n{}", output.stdout, output.stderr);
    let required_fragments = ["job_mode", "do not support it", "'my_kafka_source'"];
    let mentions_all = required_fragments
        .iter()
        .all(|fragment| combined_output.contains(*fragment));
    assert!(
        mentions_all,
        "Error should mention job_mode, unsupported source, and the source name, got:\n{}",
        combined_output
    );
}
/// Checks that a hybrid source keeps checkpointing after it moves from its
/// bounded (ClickHouse) phase to its unbounded (Kafka) phase.
///
/// Three futures run concurrently under `tokio::join!`:
/// - the pipeline, with a Postgres state backend,
///   `STREAMLING__CHECKPOINT_INTERVAL_SEC=1`, record limit 100 and a 90 s
///   timeout;
/// - a producer that appends one `live_user_*` record to Kafka about every
///   200 ms;
/// - an observer. It waits for a state key prefixed `hybrid_source:`, then
///   samples that row's `data` for a fixed window. It asserts the value changes
///   at least a minimum number of times, and that the median gap between
///   changes is within [500, 3000] ms.
///
/// Environment variables read by this test:
/// - `STREAMLING_E2E_HYBRID_UNBOUNDED_SAMPLE_WINDOW_SEC` (default 10)
/// - `STREAMLING_E2E_HYBRID_UNBOUNDED_MIN_CHANGE_EVENTS` (default 5)
#[tokio::test]
async fn test_hybrid_source_checkpoints_after_unbounded_transition() {
    init_tracing();
    let ctx = TestContext::with_options(TestContextOptions::new().with_clickhouse())
        .await
        .expect("Failed to create test context");
    let clickhouse = ctx.clickhouse.as_ref().expect("ClickHouse not initialized");
    // Bounded-phase input: three ClickHouse rows.
    clickhouse
        .execute(
            "CREATE TABLE hybrid_unbounded_ckpt_test (
block Int64,
id String,
data String,
timestamp Int64,
is_deleted UInt8
) ENGINE = MergeTree()
ORDER BY (block, id)",
        )
        .await
        .expect("Failed to create ClickHouse table");
    clickhouse
        .execute(
            "INSERT INTO hybrid_unbounded_ckpt_test VALUES
(1, 'Alice', 'A', 0, 0),
(2, 'Bob', 'B', 0, 0),
(3, 'Charlie', 'C', 0, 0)",
        )
        .await
        .expect("Failed to insert ClickHouse data");
    ctx.kafka
        .register_schema(TEST_SCHEMA)
        .await
        .expect("Failed to register schema");
    // Five Kafka records already on the topic before the pipeline starts.
    let bootstrap: Vec<TestRecord> = (1..=5)
        .map(|i| TestRecord {
            block: 100 + i,
            id: format!("bootstrap_{}", i),
            data: format!("boot_data_{}", i),
            timestamp: 2000 + i,
        })
        .collect();
    ctx.kafka
        .produce_avro_records(&bootstrap)
        .await
        .expect("Failed to produce bootstrap Kafka records");
    clickhouse
        .execute(
            "CREATE TABLE kafka_offsets_unbounded_ckpt (
topic String,
partition Int32,
offset UInt32
) ENGINE = MergeTree()
ORDER BY (topic, partition)",
        )
        .await
        .expect("Failed to create offset table");
    // Per-test state table name (hyphens in the test id become underscores) and
    // application id.
    let state_table = format!("hybrid_unbounded_ckpt_{}", ctx.test_id.replace('-', "_"));
    let application_id = format!("hybrid_unbounded_ckpt_{}", ctx.test_id);
    let pipeline = format!(
        r#"
sources:
hybrid_source:
type: hybrid
bounded_sources:
- source_type: clickhouse
table_name: hybrid_unbounded_ckpt_test
columns: block,id,data,timestamp
unbounded_source:
source_type: kafka
topic: {kafka_topic}
start_at: earliest
offset_table:
topic_name: {kafka_topic}
table_name: kafka_offsets_unbounded_ckpt
primary_key: id
transforms: {{}}
sinks:
pg_sink:
type: postgres
from: hybrid_source
table: hybrid_unbounded_ckpt_results
schema: public
primary_key: id
on_conflict: update
batch_size: 10
batch_flush_interval: 100ms
"#,
        kafka_topic = ctx.kafka_topic,
    );
    // Postgres state backend with a 1 s checkpoint interval; the observer's
    // median-gap bound below is written against this interval.
    let opts = PipelineOpts::new()
        .record_limit(100)
        .timeout(std::time::Duration::from_secs(90))
        .env("STREAMLING__APPLICATION_ID", &application_id)
        .env("STREAMLING__STATE_BACKEND__BACKEND_TYPE", "Postgres")
        .env(
            "STREAMLING__STATE_BACKEND__POSTGRES__HOST",
            &ctx.postgres.host,
        )
        .env(
            "STREAMLING__STATE_BACKEND__POSTGRES__PORT",
            ctx.postgres.port.to_string(),
        )
        .env("STREAMLING__STATE_BACKEND__POSTGRES__USER", "postgres")
        .env("STREAMLING__STATE_BACKEND__POSTGRES__PASSWORD", "postgres")
        .env("STREAMLING__STATE_BACKEND__POSTGRES__DB", &ctx.pg_database)
        .env("STREAMLING__STATE_BACKEND__POSTGRES__SSLMODE", "disable")
        .env(
            "STREAMLING__STATE_BACKEND__POSTGRES__STATE_TABLE_NAME",
            &state_table,
        )
        .env("STREAMLING__CHECKPOINT_INTERVAL_SEC", "1")
        .env("STREAMLING__RECORD_BATCH_SIZE", "10");
    // NOTE(review): presumably `run_pipeline_with_opts` is an `async fn`, so the
    // pipeline only starts when `tokio::join!` below polls this future — confirm.
    let pipeline_fut = ctx.run_pipeline_with_opts(&pipeline, opts);
    let sample_window_secs: u64 =
        std::env::var("STREAMLING_E2E_HYBRID_UNBOUNDED_SAMPLE_WINDOW_SEC")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(10);
    // The producer runs 15 s longer than the sampling window, presumably so that
    // fresh Kafka data keeps arriving for the whole window after the transition.
    let producer_window_secs = sample_window_secs + 15;
    // Producer: one `live_user_*` record roughly every 200 ms until its deadline.
    // A produce error is logged and ends the producer without failing the test.
    let producer_fut = async {
        let producer_deadline =
            tokio::time::Instant::now() + std::time::Duration::from_secs(producer_window_secs);
        let mut next_id: u64 = 1;
        while tokio::time::Instant::now() < producer_deadline {
            let rec = TestRecord {
                block: 200 + next_id as i64,
                id: format!("live_user_{}", next_id),
                data: format!("live_data_{}", next_id),
                timestamp: 3000 + next_id as i64,
            };
            if let Err(e) = ctx.kafka.produce_avro_records(&[rec]).await {
                tracing::warn!("Producer side task error (non-fatal): {}", e);
                break;
            }
            next_id += 1;
            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
        }
    };
    // Observer: detect the unbounded transition, then sample the state row.
    let observer_fut = async {
        let key_query = format!(
            "SELECT key FROM streamling.\"{}\" \
             WHERE key LIKE 'hybrid_source:%' \
             ORDER BY key",
            state_table
        );
        // The test treats the first `hybrid_source:`-prefixed state key as the
        // signal that the hybrid source entered its unbounded phase (see the
        // panic message below). Query errors are ignored while polling —
        // presumably because the state table may not exist yet.
        let transition_deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(90);
        let mut kafka_key: Option<String> = None;
        while tokio::time::Instant::now() < transition_deadline {
            if let Ok(rows) = ctx.postgres.query::<(String,)>(&key_query).await {
                if let Some(first) = rows.first() {
                    kafka_key = Some(first.0.clone());
                    break;
                }
            }
            tokio::time::sleep(std::time::Duration::from_millis(250)).await;
        }
        let kafka_key = match kafka_key {
            Some(k) => k,
            None => {
                // Dump every key present to make the failure diagnosable.
                let all_keys_query = format!(
                    "SELECT key FROM streamling.\"{}\" ORDER BY key",
                    state_table
                );
                let all_keys = ctx
                    .postgres
                    .query::<(String,)>(&all_keys_query)
                    .await
                    .unwrap_or_default();
                panic!(
                    "Hybrid pipeline did not transition to unbounded phase within 90s; \
                     no key matching 'hybrid_source:%' in state table '{}'. \
                     Observed keys: {:?}",
                    state_table,
                    all_keys.iter().map(|r| &r.0).collect::<Vec<_>>()
                );
            }
        };
        tracing::info!("Observed kafka source state key: {}", kafka_key);
        // The key is interpolated into SQL, so single quotes are doubled.
        let kafka_key_sql_escaped = kafka_key.replace('\'', "''");
        let data_query = format!(
            "SELECT data::text FROM streamling.\"{}\" WHERE key = '{}'",
            state_table, kafka_key_sql_escaped
        );
        let min_change_events: usize =
            std::env::var("STREAMLING_E2E_HYBRID_UNBOUNDED_MIN_CHANGE_EVENTS")
                .ok()
                .and_then(|v| v.parse().ok())
                .unwrap_or(5);
        // Sample the row's `data` (as text) every 250 ms for the window, keeping
        // (elapsed ms since sampling began, value) pairs. Failed queries and
        // empty results are skipped.
        let sample_start = tokio::time::Instant::now();
        let sample_window = std::time::Duration::from_secs(sample_window_secs);
        let sample_interval = std::time::Duration::from_millis(250);
        let mut samples: Vec<(u64, String)> = Vec::new();
        while sample_start.elapsed() < sample_window {
            if let Ok(rows) = ctx.postgres.query::<(String,)>(&data_query).await {
                if let Some(row) = rows.first() {
                    samples.push((sample_start.elapsed().as_millis() as u64, row.0.clone()));
                }
            }
            tokio::time::sleep(sample_interval).await;
        }
        assert!(
            !samples.is_empty(),
            "Observer captured zero samples for kafka source state row '{}'",
            kafka_key
        );
        // A "change event" is a sample whose value differs from the previous
        // sample; record the elapsed time at which each change was seen.
        let change_offsets_ms: Vec<u64> = samples
            .windows(2)
            .filter_map(|w| if w[0].1 != w[1].1 { Some(w[1].0) } else { None })
            .collect();
        tracing::info!(
            "Captured {} samples; {} change events at offsets (ms): {:?}",
            samples.len(),
            change_offsets_ms.len(),
            change_offsets_ms
        );
        assert!(
            change_offsets_ms.len() >= min_change_events,
            "Expected >= {} checkpoint-driven state changes in the {}s window \
             after the unbounded transition, got {}. samples={}, \
             change_offsets_ms={:?}, final_state_jsonb_text={:?}",
            min_change_events,
            sample_window_secs,
            change_offsets_ms.len(),
            samples.len(),
            change_offsets_ms,
            samples.last().map(|s| &s.1)
        );
        // Gaps between consecutive change events. Offsets are non-decreasing
        // (taken from `elapsed()`), so the u64 subtraction cannot underflow.
        let mut gaps: Vec<u64> = change_offsets_ms.windows(2).map(|w| w[1] - w[0]).collect();
        assert!(
            !gaps.is_empty(),
            "Cannot compute checkpoint-interval gaps from < 2 change events"
        );
        gaps.sort_unstable();
        // Middle element after sorting (the upper median for an even count).
        let median_gap_ms = gaps[gaps.len() / 2];
        assert!(
            (500..=3000).contains(&median_gap_ms),
            "Median gap between checkpoint-driven state changes was {} ms; \
             expected within [500, 3000] (CHECKPOINT_INTERVAL_SEC=1). gaps_ms={:?}",
            median_gap_ms,
            gaps
        );
    };
    // Drive all three concurrently; a panic in the observer fails the test.
    let (pipeline_result, _, _) = tokio::join!(pipeline_fut, producer_fut, observer_fut);
    let status = pipeline_result.expect("Pipeline execution failed");
    assert!(status.success(), "Pipeline should complete successfully");
    let count = ctx
        .postgres
        .count("SELECT COUNT(*) FROM public.hybrid_unbounded_ckpt_results")
        .await
        .expect("Failed to query sink count");
    assert!(
        count >= 3,
        "Sink should contain at least 3 bounded records, got {}",
        count
    );
}