use streamling_e2e::{init_tracing, PipelineOpts, TestContext, TestContextOptions};
#[tokio::test]
async fn test_clickhouse_checkpoint_state() {
init_tracing();
let ctx = TestContext::with_options(TestContextOptions::new().with_clickhouse())
.await
.expect("Failed to create test context");
let clickhouse = ctx.clickhouse.as_ref().expect("ClickHouse not initialized");
clickhouse
.execute(
"CREATE TABLE checkpoint_test (
id UInt32,
name String,
is_deleted UInt8
) ENGINE = MergeTree()
ORDER BY id",
)
.await
.expect("Failed to create ClickHouse table");
let num_records = 2000;
let batch_size = 100;
for batch_start in (0..num_records).step_by(batch_size) {
let values: Vec<String> = (batch_start..batch_start + batch_size)
.map(|i| format!("({}, 'name_{}', 0)", i, i))
.collect();
let insert_query = format!(
"INSERT INTO checkpoint_test (id, name, is_deleted) VALUES {}",
values.join(", ")
);
clickhouse
.execute(&insert_query)
.await
.expect("Failed to insert ClickHouse data");
}
let state_table = format!("checkpoint_state_{}", ctx.test_id.replace("-", "_"));
let application_id = format!("checkpoint_test_{}", ctx.test_id);
let pipeline_1 = r#"
sources:
checkpoint_source:
type: clickhouse
table_name: checkpoint_test
start_at: "5"
primary_key: id
transforms: {}
sinks:
pg_sink:
type: postgres
from: checkpoint_source
table: checkpoint_run1
schema: public
primary_key: id
on_conflict: update
batch_size: 10
batch_flush_interval: 100ms
"#;
let first_run_limit = 500u64;
let status_1 = ctx
.run_pipeline_with_opts(
pipeline_1,
PipelineOpts::new()
.record_limit(first_run_limit)
.timeout(std::time::Duration::from_secs(120))
.env("STREAMLING__APPLICATION_ID", &application_id)
.env("STREAMLING__STATE_BACKEND__BACKEND_TYPE", "Postgres")
.env(
"STREAMLING__STATE_BACKEND__POSTGRES__HOST",
&ctx.postgres.host,
)
.env(
"STREAMLING__STATE_BACKEND__POSTGRES__PORT",
ctx.postgres.port.to_string(),
)
.env("STREAMLING__STATE_BACKEND__POSTGRES__USER", "postgres")
.env("STREAMLING__STATE_BACKEND__POSTGRES__PASSWORD", "postgres")
.env("STREAMLING__STATE_BACKEND__POSTGRES__DB", &ctx.pg_database)
.env("STREAMLING__STATE_BACKEND__POSTGRES__SSLMODE", "disable")
.env(
"STREAMLING__STATE_BACKEND__POSTGRES__STATE_TABLE_NAME",
&state_table,
)
.env("STREAMLING__CHECKPOINT_INTERVAL_SEC", "1")
.env("STREAMLING__RECORD_BATCH_SIZE", "10"),
)
.await
.expect("Pipeline 1 execution failed");
assert!(
status_1.success(),
"Pipeline 1 should complete successfully"
);
let count_1 = ctx
.postgres
.count("SELECT COUNT(*) FROM public.checkpoint_run1")
.await
.expect("Failed to query count");
assert!(
count_1 >= first_run_limit as i64 - 50, "Pipeline 1 should have processed ~{} records, got {}",
first_run_limit,
count_1
);
let min_id_1: Vec<(i64,)> = ctx
.postgres
.query("SELECT MIN(id) FROM public.checkpoint_run1")
.await
.expect("Failed to query min id");
assert_eq!(
min_id_1[0].0, 5,
"Pipeline 1 should start from ID 5 as specified"
);
let max_id_1: Vec<(i64,)> = ctx
.postgres
.query("SELECT MAX(id) FROM public.checkpoint_run1")
.await
.expect("Failed to query max id");
let last_processed_id = max_id_1[0].0;
tracing::info!(
"Pipeline 1 processed IDs from 5 to {}, count={}",
last_processed_id,
count_1
);
let checkpoint_count = ctx
.postgres
.count(&format!(
"SELECT COUNT(*) FROM streamling.\"{}\"",
state_table
))
.await
.expect("Failed to query checkpoint table");
tracing::info!("Checkpoint entries in state table: {}", checkpoint_count);
let pipeline_2 = r#"
sources:
checkpoint_source:
type: clickhouse
table_name: checkpoint_test
primary_key: id
transforms: {}
sinks:
pg_sink:
type: postgres
from: checkpoint_source
table: checkpoint_run2
schema: public
primary_key: id
on_conflict: update
batch_size: 10
batch_flush_interval: 100ms
"#;
let second_run_limit = 300u64;
let status_2 = ctx
.run_pipeline_with_opts(
pipeline_2,
PipelineOpts::new()
.record_limit(second_run_limit)
.timeout(std::time::Duration::from_secs(120))
.env("STREAMLING__APPLICATION_ID", &application_id)
.env("STREAMLING__STATE_BACKEND__BACKEND_TYPE", "Postgres")
.env(
"STREAMLING__STATE_BACKEND__POSTGRES__HOST",
&ctx.postgres.host,
)
.env(
"STREAMLING__STATE_BACKEND__POSTGRES__PORT",
ctx.postgres.port.to_string(),
)
.env("STREAMLING__STATE_BACKEND__POSTGRES__USER", "postgres")
.env("STREAMLING__STATE_BACKEND__POSTGRES__PASSWORD", "postgres")
.env("STREAMLING__STATE_BACKEND__POSTGRES__DB", &ctx.pg_database)
.env("STREAMLING__STATE_BACKEND__POSTGRES__SSLMODE", "disable")
.env(
"STREAMLING__STATE_BACKEND__POSTGRES__STATE_TABLE_NAME",
&state_table,
)
.env("STREAMLING__CHECKPOINT_INTERVAL_SEC", "1")
.env("STREAMLING__RECORD_BATCH_SIZE", "10"),
)
.await
.expect("Pipeline 2 execution failed");
assert!(
status_2.success(),
"Pipeline 2 should complete successfully"
);
let count_2 = ctx
.postgres
.count("SELECT COUNT(*) FROM public.checkpoint_run2")
.await
.expect("Failed to query count");
assert!(
count_2 > 0,
"Pipeline 2 should have processed some records, got {}",
count_2
);
let min_id_2: Vec<(i64,)> = ctx
.postgres
.query("SELECT MIN(id) FROM public.checkpoint_run2")
.await
.expect("Failed to query min id");
tracing::info!(
"Pipeline 2 processed IDs starting from {}, count={}",
min_id_2[0].0,
count_2
);
if checkpoint_count > 0 {
assert!(
min_id_2[0].0 > 0,
"Pipeline 2 should NOT restart from ID 0 when checkpoint exists, got min_id={}",
min_id_2[0].0
);
let expected_min = 100; assert!(
min_id_2[0].0 >= expected_min,
"Pipeline 2 should resume from checkpoint (expected min >= {}, got min={})",
expected_min,
min_id_2[0].0
);
} else {
tracing::warn!(
"No checkpoints found in state table - pipeline may have completed before checkpoint interval"
);
}
}