use serde::Serialize;
use streamling_e2e::{init_tracing, PipelineOpts, Result, TestContext, TestContextOptions};
/// Record type that the tests in this file produce to Kafka.
///
/// Its fields match the three fields declared in `TEST_SCHEMA`
/// (`id`, `data`, `timestamp`).
#[derive(Debug, Clone, Serialize)]
struct TestRecord {
// Tests assign sequential ids starting at 1; the pipelines use `id` as `primary_key`.
id: i64,
// Payload string; the tests fill it with `data_<id>`.
data: String,
// Tests set this to a base value plus the id; several pipelines name it as
// the event-time column with `unit: seconds`.
timestamp: i64,
}
/// JSON record schema for `TestRecord`, passed to `register_schema` by every
/// test before records are produced with `produce_avro_records`.
///
/// Declares three fields: `id` (long), `data` (string) and `timestamp` (long).
const TEST_SCHEMA: &str = r#"{
"type": "record",
"name": "TestRecord",
"fields": [
{"name": "id", "type": "long"},
{"name": "data", "type": "string"},
{"name": "timestamp", "type": "long"}
]
}"#;
/// Initialises tracing and builds a [`TestContext`] whose options request Prometheus.
///
/// # Errors
///
/// Returns whatever error `TestContext::with_options` reports; the tests in
/// this file treat that as a reason to skip rather than fail.
async fn setup_with_prometheus() -> Result<TestContext> {
    init_tracing();
    let options = TestContextOptions::new().with_prometheus();
    TestContext::with_options(options).await
}
/// End-to-end check that a plain Kafka -> Postgres pipeline emits row-count metrics.
///
/// Produces 25 records, runs the pipeline with a record limit of 25, then asks
/// Prometheus for the output-row count of `kafka_source` and the input-row
/// count of `postgres_sink`. A metric that is present must be at least 25; a
/// metric that is missing only produces a warning on stderr. Finally the sink
/// table `public.test_output` must hold exactly 25 rows.
///
/// Returns early without failing when the test context cannot be created or
/// carries no Prometheus resource.
#[tokio::test]
async fn test_basic_metrics_emission() {
    use streamling_e2e::resources::PrometheusResource;

    let ctx = match setup_with_prometheus().await {
        Err(err) => {
            eprintln!("Skipping test - could not create context: {}", err);
            return;
        }
        Ok(created) => created,
    };
    let prometheus = if let Some(resource) = &ctx.prometheus {
        resource
    } else {
        eprintln!("Skipping test - Prometheus not configured");
        return;
    };

    let expected_rows = 25u64;

    let registration = ctx.kafka.register_schema(TEST_SCHEMA).await;
    registration.expect("Failed to register schema");

    // Ids run 1..=expected_rows; timestamps are offset from 1000 by the id.
    let mut batch: Vec<TestRecord> = Vec::with_capacity(expected_rows as usize);
    for seq in 1..=expected_rows as i64 {
        batch.push(TestRecord {
            id: seq,
            data: format!("data_{}", seq),
            timestamp: 1000 + seq,
        });
    }
    let produced = ctx.kafka.produce_avro_records(&batch).await;
    produced.expect("Failed to produce records");

    let pipeline_yaml = format!(
        r#"
sources:
kafka_source:
type: kafka
topic: {}
primary_key: id
transforms: {{}}
sinks:
postgres_sink:
type: postgres
from: kafka_source
table: test_output
schema: public
on_conflict: update
"#,
        ctx.kafka_topic
    );

    let run_opts = PipelineOpts::new().record_limit(expected_rows);
    let exit_status = ctx
        .run_pipeline_with_opts(&pipeline_yaml, run_opts)
        .await
        .expect("Failed to run pipeline");
    assert!(
        exit_status.success(),
        "Pipeline should complete successfully"
    );

    // Fixed pause before querying; presumably gives Prometheus time to collect
    // the final metric values (confirm against the harness).
    let settle = std::time::Duration::from_secs(3);
    tokio::time::sleep(settle).await;

    let source_query = PrometheusResource::output_rows_query("kafka_source", None);
    let source_rows = prometheus
        .query_count(&source_query)
        .await
        .expect("Failed to query output rows");

    let sink_query = PrometheusResource::input_rows_query("postgres_sink", None);
    let sink_rows = prometheus
        .query_count(&sink_query)
        .await
        .expect("Failed to query input rows");

    match source_rows {
        Some(seen) => assert!(
            seen >= expected_rows,
            "Expected at least {} output rows, got {}",
            expected_rows,
            seen
        ),
        None => eprintln!(
            "Warning: output_rows metric not found - this may indicate metrics are not configured"
        ),
    }
    match sink_rows {
        Some(seen) => assert!(
            seen >= expected_rows,
            "Expected at least {} input rows, got {}",
            expected_rows,
            seen
        ),
        None => eprintln!(
            "Warning: input_rows metric not found - this may indicate metrics are not configured"
        ),
    }

    let stored = ctx
        .postgres
        .count("SELECT COUNT(*) FROM public.test_output")
        .await
        .expect("Failed to count rows");
    assert_eq!(stored, expected_rows as i64);
}
/// End-to-end check that checkpoint metrics are emitted when checkpointing runs.
///
/// Produces 50 records and runs a Kafka -> Postgres pipeline (sink
/// `batch_size: 10`, `batch_flush_interval: 100ms`) with a record limit of 50
/// and a 120 second timeout. The run is configured through
/// `STREAMLING__STATE_BACKEND__*` environment variables to use a Postgres state
/// backend with a per-test state table, a checkpoint interval of 1 second and a
/// record batch size of 10.
///
/// Afterwards each of four checkpoint coordinator counters is queried: a
/// counter that is present must be at least 1, a missing one only produces a
/// warning. The sink-flush histogram query is issued as well; its absence also
/// only produces a warning.
///
/// Returns early without failing when the test context cannot be created or
/// carries no Prometheus resource.
///
/// # Panics
///
/// Panics when any setup step, the pipeline run, or a Prometheus query fails.
/// The panic for a failed coordinator query includes the underlying error.
#[tokio::test]
async fn test_checkpoint_metrics_emission() {
    let ctx = match setup_with_prometheus().await {
        Ok(ctx) => ctx,
        Err(e) => {
            eprintln!("Skipping test - could not create context: {}", e);
            return;
        }
    };
    let prometheus = match &ctx.prometheus {
        Some(p) => p,
        None => {
            eprintln!("Skipping test - Prometheus not configured");
            return;
        }
    };
    let total_records = 50u64;
    ctx.kafka
        .register_schema(TEST_SCHEMA)
        .await
        .expect("Failed to register schema");
    let records: Vec<TestRecord> = (1..=total_records as i64)
        .map(|i| TestRecord {
            id: i,
            data: format!("data_{}", i),
            timestamp: 1000 + i,
        })
        .collect();
    ctx.kafka
        .produce_avro_records(&records)
        .await
        .expect("Failed to produce records");
    // The state table name is derived from the test id with '-' replaced by '_'.
    let state_table = format!("checkpoint_metrics_state_{}", ctx.test_id.replace('-', "_"));
    let pipeline_yaml = format!(
        r#"
sources:
kafka_source:
type: kafka
topic: {}
primary_key: id
transforms: {{}}
sinks:
postgres_sink:
type: postgres
from: kafka_source
table: test_ckpt_metrics
schema: public
on_conflict: update
batch_size: 10
batch_flush_interval: 100ms
"#,
        ctx.kafka_topic
    );
    let status = ctx
        .run_pipeline_with_opts(
            &pipeline_yaml,
            PipelineOpts::new()
                .record_limit(total_records)
                .timeout(std::time::Duration::from_secs(120))
                .env("STREAMLING__STATE_BACKEND__BACKEND_TYPE", "Postgres")
                .env(
                    "STREAMLING__STATE_BACKEND__POSTGRES__HOST",
                    &ctx.postgres.host,
                )
                .env(
                    "STREAMLING__STATE_BACKEND__POSTGRES__PORT",
                    ctx.postgres.port.to_string(),
                )
                .env("STREAMLING__STATE_BACKEND__POSTGRES__USER", "postgres")
                .env("STREAMLING__STATE_BACKEND__POSTGRES__PASSWORD", "postgres")
                .env("STREAMLING__STATE_BACKEND__POSTGRES__DB", &ctx.pg_database)
                .env("STREAMLING__STATE_BACKEND__POSTGRES__SSLMODE", "disable")
                .env(
                    "STREAMLING__STATE_BACKEND__POSTGRES__STATE_TABLE_NAME",
                    &state_table,
                )
                .env("STREAMLING__CHECKPOINT_INTERVAL_SEC", "1")
                .env("STREAMLING__RECORD_BATCH_SIZE", "10"),
        )
        .await
        .expect("Failed to run pipeline");
    assert!(status.success(), "Pipeline should complete successfully");
    // Fixed pause before querying; presumably gives Prometheus time to collect
    // the final metric values (confirm against the harness).
    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
    let coordinator_metrics = [
        "streamling_checkpoint_epochs_succeeded_total",
        "streamling_checkpoint_markers_sent_total",
        "streamling_checkpoint_acks_received_total",
        "streamling_checkpoint_finalizers_sent_total",
    ];
    for metric_name in &coordinator_metrics {
        let query = streamling_e2e::resources::PrometheusResource::checkpoint_coordinator_query(
            metric_name,
            None,
        );
        // Include the underlying error so a failed query can be diagnosed from
        // the panic message alone.
        let result = prometheus
            .query_count(&query)
            .await
            .unwrap_or_else(|err| panic!("Failed to query {}: {:?}", metric_name, err));
        if let Some(count) = result {
            assert!(
                count >= 1,
                "Expected at least 1 for {}, got {}",
                metric_name,
                count
            );
        } else {
            eprintln!(
                "Warning: {} metric not found - query: {}",
                metric_name, query
            );
        }
    }
    let sink_flush_query =
        streamling_e2e::resources::PrometheusResource::checkpoint_histogram_query(
            "streamling_checkpoint_sink_flush_milliseconds",
            "postgres_sink",
            None,
        );
    let sink_flush = prometheus
        .query_count(&sink_flush_query)
        .await
        .expect("Failed to query checkpoint_sink_flush_milliseconds");
    // Only the presence of the histogram matters here; its value is not checked.
    if sink_flush.is_none() {
        eprintln!(
            "Warning: checkpoint_sink_flush_milliseconds metric not found - query: {}",
            sink_flush_query
        );
    }
}
/// Checks that the source's output-row metric can be queried for this test's id.
///
/// Produces 10 records, runs a Kafka -> Postgres pipeline with a record limit
/// of 10, then builds the output-rows query for `kafka_source`, passing
/// `ctx.test_id` as the optional second argument (called the instance id in
/// the messages below). A returned count must be at least 10; when nothing is
/// returned only a warning is printed.
///
/// Returns early without failing when the test context cannot be created or
/// carries no Prometheus resource.
#[tokio::test]
async fn test_metrics_with_instance_id() {
    use streamling_e2e::resources::PrometheusResource;

    let ctx = match setup_with_prometheus().await {
        Err(err) => {
            eprintln!("Skipping test - could not create context: {}", err);
            return;
        }
        Ok(created) => created,
    };
    let prometheus = if let Some(resource) = &ctx.prometheus {
        resource
    } else {
        eprintln!("Skipping test - Prometheus not configured");
        return;
    };

    let expected_rows = 10u64;

    ctx.kafka
        .register_schema(TEST_SCHEMA)
        .await
        .expect("Failed to register schema");

    let make_record = |seq: i64| TestRecord {
        id: seq,
        data: format!("data_{}", seq),
        timestamp: 1000 + seq,
    };
    let batch = (1..=expected_rows as i64)
        .map(make_record)
        .collect::<Vec<TestRecord>>();
    ctx.kafka
        .produce_avro_records(&batch)
        .await
        .expect("Failed to produce records");

    let pipeline_yaml = format!(
        r#"
sources:
kafka_source:
type: kafka
topic: {}
primary_key: id
transforms: {{}}
sinks:
postgres_sink:
type: postgres
from: kafka_source
table: test_metrics_instance
schema: public
on_conflict: update
"#,
        ctx.kafka_topic
    );

    let run_opts = PipelineOpts::new().record_limit(expected_rows);
    let exit_status = ctx
        .run_pipeline_with_opts(&pipeline_yaml, run_opts)
        .await
        .expect("Failed to run pipeline");
    assert!(
        exit_status.success(),
        "Pipeline should complete successfully"
    );

    // Fixed pause before querying; presumably gives Prometheus time to collect
    // the final metric values (confirm against the harness).
    let settle = std::time::Duration::from_secs(3);
    tokio::time::sleep(settle).await;

    let query = PrometheusResource::output_rows_query("kafka_source", Some(&ctx.test_id));
    let observed = prometheus
        .query_count(&query)
        .await
        .expect("Failed to query metrics");

    match observed {
        Some(seen) => assert!(
            seen >= expected_rows,
            "Expected at least {} rows with instance_id {}, got {}",
            expected_rows,
            ctx.test_id,
            seen
        ),
        None => eprintln!(
            "Warning: metric with instance_id {} not found - query: {}",
            ctx.test_id, query
        ),
    }
}
/// Checks that the Kafka consumer lag metric reads zero once all records are consumed.
///
/// Produces 10 records and runs a Kafka -> Postgres pipeline (sink
/// `batch_size: 1`) without a record limit, with a 20 second timeout, record
/// batch size 1, a 1 second checkpoint interval and a 1000 ms lag report
/// interval. The outcome of the run itself is intentionally not inspected.
/// Afterwards the `streamling_kafka_consumer_messages_lag` series for
/// `kafka_source` and this test's instance must equal `Some(0.0)`.
///
/// Returns early without failing when the test context cannot be created or
/// carries no Prometheus resource.
#[tokio::test]
async fn test_kafka_lag_reports_zero_when_caught_up() {
    let ctx = match setup_with_prometheus().await {
        Err(err) => {
            eprintln!("Skipping test - could not create context: {}", err);
            return;
        }
        Ok(created) => created,
    };
    let prometheus = if let Some(resource) = &ctx.prometheus {
        resource
    } else {
        eprintln!("Skipping test - Prometheus not configured");
        return;
    };

    let produced_rows = 10u64;

    let registration = ctx.kafka.register_schema(TEST_SCHEMA).await;
    registration.expect("Failed to register schema");

    let mut batch: Vec<TestRecord> = Vec::with_capacity(produced_rows as usize);
    for seq in 1..=produced_rows as i64 {
        batch.push(TestRecord {
            id: seq,
            data: format!("data_{}", seq),
            timestamp: 1000 + seq,
        });
    }
    let produced = ctx.kafka.produce_avro_records(&batch).await;
    produced.expect("Failed to produce records");

    let pipeline_yaml = format!(
        r#"
sources:
kafka_source:
type: kafka
topic: {}
primary_key: id
transforms: {{}}
sinks:
postgres_sink:
type: postgres
from: kafka_source
table: test_lag_zero
schema: public
on_conflict: update
batch_size: 1
"#,
        ctx.kafka_topic
    );

    let run_opts = PipelineOpts::new()
        .timeout(std::time::Duration::from_secs(20))
        .env("STREAMLING__RECORD_BATCH_SIZE", "1")
        .env("STREAMLING__CHECKPOINT_INTERVAL_SEC", "1")
        .env("STREAMLING__KAFKA_SOURCE__LAG_REPORT_INTERVAL_MS", "1000");
    // The result is bound but never examined: with no record limit the run is
    // only bounded by the timeout above, and the assertion is on the metric.
    let _run_outcome = ctx.run_pipeline_with_opts(&pipeline_yaml, run_opts).await;

    let settle = std::time::Duration::from_secs(3);
    tokio::time::sleep(settle).await;

    let lag_query = format!(
        "streamling_kafka_consumer_messages_lag{{id=\"kafka_source\",instance=\"{}\"}}",
        ctx.test_id
    );
    let observed_lag = prometheus
        .query(&lag_query)
        .await
        .expect("Failed to query lag metric");
    assert_eq!(
        observed_lag,
        Some(0.0),
        "Kafka consumer lag gauge should report 0 after consuming all records (query: {})",
        lag_query
    );
}
/// Checks event-time lag and watermark metrics for a Kafka source with event-time telemetry.
///
/// Produces 10 records whose `timestamp` values are 1001..=1010 and runs a
/// pipeline whose source declares `timestamp` as the event-time column with
/// `unit: seconds`. Afterwards, for `kafka_source` and this test's instance:
///
/// * `streamling_event_time_lag_milliseconds_count` must be at least 10;
/// * `streamling_event_time_watermark_milliseconds` must be within 0.5 of the
///   largest produced timestamp multiplied by 1000.
///
/// Returns early without failing when the test context cannot be created or
/// carries no Prometheus resource.
#[tokio::test]
async fn test_event_time_metrics_kafka_source() {
    let ctx = match setup_with_prometheus().await {
        Err(err) => {
            eprintln!("Skipping test - could not create context: {}", err);
            return;
        }
        Ok(created) => created,
    };
    let prometheus = if let Some(resource) = &ctx.prometheus {
        resource
    } else {
        eprintln!("Skipping test - Prometheus not configured");
        return;
    };

    let expected_rows = 10u64;

    ctx.kafka
        .register_schema(TEST_SCHEMA)
        .await
        .expect("Failed to register schema");

    let batch = (1..=expected_rows as i64)
        .map(|seq| TestRecord {
            id: seq,
            data: format!("data_{}", seq),
            timestamp: 1000 + seq,
        })
        .collect::<Vec<TestRecord>>();
    // The last record carries the largest timestamp (1000 + expected_rows), in
    // seconds; the watermark is compared against it in milliseconds.
    let newest_event_secs = (1000 + expected_rows as i64) as f64;
    let expected_watermark_ms = newest_event_secs * 1_000.0;

    ctx.kafka
        .produce_avro_records(&batch)
        .await
        .expect("Failed to produce records");

    let pipeline_yaml = format!(
        r#"
sources:
kafka_source:
type: kafka
topic: {}
primary_key: id
telemetry:
event_time:
column: timestamp
unit: seconds
transforms: {{}}
sinks:
postgres_sink:
type: postgres
from: kafka_source
table: test_event_time_kafka
schema: public
on_conflict: update
"#,
        ctx.kafka_topic
    );

    let run_opts = PipelineOpts::new().record_limit(expected_rows);
    let exit_status = ctx
        .run_pipeline_with_opts(&pipeline_yaml, run_opts)
        .await
        .expect("Failed to run pipeline");
    assert!(
        exit_status.success(),
        "Pipeline should complete successfully"
    );

    let settle = std::time::Duration::from_secs(3);
    tokio::time::sleep(settle).await;

    let lag_count_query = format!(
        "streamling_event_time_lag_milliseconds_count{{id=\"kafka_source\",instance=\"{}\"}}",
        ctx.test_id
    );
    let lag_count = prometheus
        .query_count(&lag_count_query)
        .await
        .expect("Failed to query event_time_lag count");
    let enough_observations = matches!(lag_count, Some(seen) if seen >= expected_rows);
    assert!(
        enough_observations,
        "Expected at least {} event_time_lag observations, got {:?} (query: {})",
        expected_rows,
        lag_count,
        lag_count_query
    );

    let watermark_query = format!(
        "streamling_event_time_watermark_milliseconds{{id=\"kafka_source\",instance=\"{}\"}}",
        ctx.test_id
    );
    let watermark = prometheus
        .query(&watermark_query)
        .await
        .expect("Failed to query event_time_watermark");
    let watermark_matches =
        matches!(watermark, Some(value) if (value - expected_watermark_ms).abs() < 0.5);
    assert!(
        watermark_matches,
        "Expected watermark = {} ms (max_timestamp_seconds * 1000), got {:?} (query: {})",
        expected_watermark_ms,
        watermark,
        watermark_query
    );
}
/// Checks that event-time lag is observed on rows before the source filter is applied.
///
/// Produces 10 records (ids 1..=10) and runs a pipeline whose source has
/// `filter: "id > 6"` and event-time telemetry on `timestamp`. The run is
/// limited to 4 records, the number of ids that pass the filter. Afterwards
/// the `streamling_event_time_lag_milliseconds_count` series for
/// `kafka_source` must be at least 10 (the pre-filter row count), while the
/// sink table must contain exactly 4 rows.
///
/// Returns early without failing when the test context cannot be created or
/// carries no Prometheus resource.
#[tokio::test]
async fn test_event_time_metrics_pre_filter_observation() {
    let ctx = match setup_with_prometheus().await {
        Err(err) => {
            eprintln!("Skipping test - could not create context: {}", err);
            return;
        }
        Ok(created) => created,
    };
    let prometheus = if let Some(resource) = &ctx.prometheus {
        resource
    } else {
        eprintln!("Skipping test - Prometheus not configured");
        return;
    };

    let produced_rows = 10u64;
    // Ids 7, 8, 9 and 10 satisfy the source filter `id > 6`.
    let rows_after_filter = 4u64;

    let registration = ctx.kafka.register_schema(TEST_SCHEMA).await;
    registration.expect("Failed to register schema");

    let mut batch: Vec<TestRecord> = Vec::with_capacity(produced_rows as usize);
    for seq in 1..=produced_rows as i64 {
        batch.push(TestRecord {
            id: seq,
            data: format!("data_{}", seq),
            timestamp: 1_700_000_000 + seq,
        });
    }
    let produced = ctx.kafka.produce_avro_records(&batch).await;
    produced.expect("Failed to produce records");

    let pipeline_yaml = format!(
        r#"
sources:
kafka_source:
type: kafka
topic: {}
primary_key: id
filter: "id > 6"
telemetry:
event_time:
column: timestamp
unit: seconds
transforms: {{}}
sinks:
postgres_sink:
type: postgres
from: kafka_source
table: test_event_time_pre_filter
schema: public
on_conflict: update
"#,
        ctx.kafka_topic
    );

    let run_opts = PipelineOpts::new().record_limit(rows_after_filter);
    let exit_status = ctx
        .run_pipeline_with_opts(&pipeline_yaml, run_opts)
        .await
        .expect("Failed to run pipeline");
    assert!(
        exit_status.success(),
        "Pipeline should complete successfully"
    );

    let settle = std::time::Duration::from_secs(3);
    tokio::time::sleep(settle).await;

    let lag_count_query = format!(
        "streamling_event_time_lag_milliseconds_count{{id=\"kafka_source\",instance=\"{}\"}}",
        ctx.test_id
    );
    let lag_count = prometheus
        .query_count(&lag_count_query)
        .await
        .expect("Failed to query event_time_lag count");
    let saw_all_rows = matches!(lag_count, Some(seen) if seen >= produced_rows);
    assert!(
        saw_all_rows,
        "Expected histogram count >= {} (pre-filter row count), got {:?}. \
         If this is {} the filter is being observed before the histogram, \
         which violates R7. (query: {})",
        produced_rows,
        lag_count,
        rows_after_filter,
        lag_count_query
    );

    let stored = ctx
        .postgres
        .count("SELECT COUNT(*) FROM public.test_event_time_pre_filter")
        .await
        .expect("Failed to count rows");
    assert_eq!(
        stored, rows_after_filter as i64,
        "Sink should receive only post-filter rows"
    );
}
/// Checks that no event-time metrics are emitted when the pipeline has no telemetry config.
///
/// Produces 5 records and runs a Kafka -> Postgres pipeline with a record
/// limit of 5 and no `telemetry` section. Afterwards neither
/// `streamling_event_time_watermark_milliseconds` nor
/// `streamling_event_time_lag_milliseconds_count` may exist for `kafka_source`
/// and this test's instance.
///
/// A failed Prometheus query fails the test instead of being read as "metric
/// absent"; otherwise an unreachable Prometheus would make these negative
/// assertions pass without checking anything.
///
/// Returns early without failing when the test context cannot be created or
/// carries no Prometheus resource.
///
/// # Panics
///
/// Panics when any setup step, the pipeline run, or a Prometheus query fails.
#[tokio::test]
async fn test_event_time_metrics_absent_when_not_configured() {
    let ctx = match setup_with_prometheus().await {
        Ok(ctx) => ctx,
        Err(e) => {
            eprintln!("Skipping test - could not create context: {}", e);
            return;
        }
    };
    let prometheus = match &ctx.prometheus {
        Some(p) => p,
        None => {
            eprintln!("Skipping test - Prometheus not configured");
            return;
        }
    };
    let total_records = 5u64;
    ctx.kafka
        .register_schema(TEST_SCHEMA)
        .await
        .expect("Failed to register schema");
    let records: Vec<TestRecord> = (1..=total_records as i64)
        .map(|i| TestRecord {
            id: i,
            data: format!("data_{}", i),
            timestamp: 1000 + i,
        })
        .collect();
    ctx.kafka
        .produce_avro_records(&records)
        .await
        .expect("Failed to produce records");
    let pipeline_yaml = format!(
        r#"
sources:
kafka_source:
type: kafka
topic: {}
primary_key: id
transforms: {{}}
sinks:
postgres_sink:
type: postgres
from: kafka_source
table: test_event_time_absent
schema: public
on_conflict: update
"#,
        ctx.kafka_topic
    );
    let status = ctx
        .run_pipeline_with_opts(
            &pipeline_yaml,
            PipelineOpts::new().record_limit(total_records),
        )
        .await
        .expect("Failed to run pipeline");
    assert!(status.success(), "Pipeline should complete successfully");
    // Fixed pause before querying; presumably gives Prometheus time to collect
    // any metrics the run did emit (confirm against the harness).
    tokio::time::sleep(std::time::Duration::from_secs(3)).await;
    let watermark_query = format!(
        "streamling_event_time_watermark_milliseconds{{id=\"kafka_source\",instance=\"{}\"}}",
        ctx.test_id
    );
    // A query error must not be mistaken for an absent series.
    let watermark = prometheus
        .query(&watermark_query)
        .await
        .expect("Failed to query event_time_watermark");
    assert!(
        watermark.is_none(),
        "event_time_watermark must NOT be emitted when telemetry is unconfigured, got {:?}",
        watermark
    );
    let histogram_count_query = format!(
        "streamling_event_time_lag_milliseconds_count{{id=\"kafka_source\",instance=\"{}\"}}",
        ctx.test_id
    );
    let count = prometheus
        .query_count(&histogram_count_query)
        .await
        .expect("Failed to query event_time_lag count");
    assert!(
        count.is_none(),
        "event_time_lag must NOT be emitted when telemetry is unconfigured, got {:?}",
        count
    );
}
/// Checks that an event-time column that does not exist is skipped with a warning.
///
/// Produces 5 records and runs a pipeline whose source names
/// `nonexistent_column` as its event-time column. The run must still succeed
/// and deliver all 5 rows to the sink table. No
/// `streamling_event_time_watermark_milliseconds` series may exist for
/// `kafka_source` and this test's instance, and the pipeline's combined stdout
/// and stderr must contain the text `event-time instrumentation skipped`.
///
/// NOTE(review): the test name says "logs once", but only the presence of the
/// message is asserted, not how many times it appears.
///
/// A failed watermark query fails the test instead of being read as "metric
/// absent", so the negative assertion cannot pass on a query error.
///
/// Returns early without failing when the test context cannot be created or
/// carries no Prometheus resource.
///
/// # Panics
///
/// Panics when any setup step, the pipeline run, or a Prometheus query fails.
#[tokio::test]
async fn test_event_time_metrics_misconfigured_column_logs_once_and_skips() {
    let ctx = match setup_with_prometheus().await {
        Ok(ctx) => ctx,
        Err(e) => {
            eprintln!("Skipping test - could not create context: {}", e);
            return;
        }
    };
    let prometheus = match &ctx.prometheus {
        Some(p) => p,
        None => {
            eprintln!("Skipping test - Prometheus not configured");
            return;
        }
    };
    let total_records = 5u64;
    ctx.kafka
        .register_schema(TEST_SCHEMA)
        .await
        .expect("Failed to register schema");
    let records: Vec<TestRecord> = (1..=total_records as i64)
        .map(|i| TestRecord {
            id: i,
            data: format!("data_{}", i),
            timestamp: 1000 + i,
        })
        .collect();
    ctx.kafka
        .produce_avro_records(&records)
        .await
        .expect("Failed to produce records");
    let pipeline_yaml = format!(
        r#"
sources:
kafka_source:
type: kafka
topic: {}
primary_key: id
telemetry:
event_time:
column: nonexistent_column
unit: seconds
transforms: {{}}
sinks:
postgres_sink:
type: postgres
from: kafka_source
table: test_event_time_misconfigured
schema: public
on_conflict: update
"#,
        ctx.kafka_topic
    );
    // The raw runner is used because the captured stdout/stderr are inspected below.
    let output = ctx
        .run_pipeline_raw(
            &pipeline_yaml,
            PipelineOpts::new().record_limit(total_records),
        )
        .await
        .expect("Pipeline execution failed");
    assert!(
        output.status.success(),
        "Pipeline should complete successfully despite misconfiguration. stderr: {}",
        output.stderr
    );
    let sink_count = ctx
        .postgres
        .count("SELECT COUNT(*) FROM public.test_event_time_misconfigured")
        .await
        .expect("Failed to count rows");
    assert_eq!(sink_count, total_records as i64);
    tokio::time::sleep(std::time::Duration::from_secs(3)).await;
    let watermark_query = format!(
        "streamling_event_time_watermark_milliseconds{{id=\"kafka_source\",instance=\"{}\"}}",
        ctx.test_id
    );
    // A query error must not be mistaken for an absent series.
    let watermark = prometheus
        .query(&watermark_query)
        .await
        .expect("Failed to query event_time_watermark");
    assert!(
        watermark.is_none(),
        "watermark must not be emitted on misconfigured column, got {:?}",
        watermark
    );
    let combined_output = format!("{}\n{}", output.stdout, output.stderr);
    // Print everything that was searched, so a failure shows stdout as well as stderr.
    assert!(
        combined_output.contains("event-time instrumentation skipped"),
        "expected misconfiguration warning in pipeline output (stdout + stderr):\n{}",
        combined_output
    );
}
/// Checks that event-time metrics are emitted for a source, a transform and a sink.
///
/// Produces 5 records and runs a pipeline in which `kafka_source`, the SQL
/// transform `passthrough` and `postgres_sink` each declare `timestamp` as the
/// event-time column with `unit: seconds`. Afterwards, for every one of those
/// three node ids and this test's instance:
///
/// * a `streamling_event_time_watermark_milliseconds` series must exist;
/// * `streamling_event_time_lag_milliseconds_count` must be at least 5.
///
/// All watermark queries are issued before any lag-count query.
///
/// Returns early without failing when the test context cannot be created or
/// carries no Prometheus resource.
#[tokio::test]
async fn test_event_time_metrics_across_source_transform_and_sink() {
    let ctx = match setup_with_prometheus().await {
        Err(err) => {
            eprintln!("Skipping test - could not create context: {}", err);
            return;
        }
        Ok(created) => created,
    };
    let prometheus = if let Some(resource) = &ctx.prometheus {
        resource
    } else {
        eprintln!("Skipping test - Prometheus not configured");
        return;
    };

    let expected_rows = 5u64;

    ctx.kafka
        .register_schema(TEST_SCHEMA)
        .await
        .expect("Failed to register schema");

    let make_record = |seq: i64| TestRecord {
        id: seq,
        data: format!("data_{}", seq),
        timestamp: 1_700_000_000 + seq,
    };
    let batch = (1..=expected_rows as i64)
        .map(make_record)
        .collect::<Vec<TestRecord>>();
    ctx.kafka
        .produce_avro_records(&batch)
        .await
        .expect("Failed to produce records");

    let pipeline_yaml = format!(
        r#"
sources:
kafka_source:
type: kafka
topic: {}
primary_key: id
telemetry:
event_time:
column: timestamp
unit: seconds
transforms:
passthrough:
type: sql
primary_key: id
sql: "SELECT id, data, timestamp FROM kafka_source"
telemetry:
event_time:
column: timestamp
unit: seconds
sinks:
postgres_sink:
type: postgres
from: passthrough
table: test_event_time_all_nodes
schema: public
on_conflict: update
telemetry:
event_time:
column: timestamp
unit: seconds
"#,
        ctx.kafka_topic
    );

    let run_opts = PipelineOpts::new().record_limit(expected_rows);
    let exit_status = ctx
        .run_pipeline_with_opts(&pipeline_yaml, run_opts)
        .await
        .expect("Failed to run pipeline");
    assert!(
        exit_status.success(),
        "Pipeline should complete successfully"
    );

    let settle = std::time::Duration::from_secs(3);
    tokio::time::sleep(settle).await;

    // The same three node ids are checked for both metrics.
    let instrumented_nodes = ["kafka_source", "passthrough", "postgres_sink"];

    for node_ref in instrumented_nodes {
        let watermark_query = format!(
            "streamling_event_time_watermark_milliseconds{{id=\"{}\",instance=\"{}\"}}",
            node_ref, ctx.test_id
        );
        let watermark = prometheus
            .query(&watermark_query)
            .await
            .expect("Failed to query watermark metric");
        assert!(
            watermark.is_some(),
            "Expected watermark series for node '{}' (query: {}), got None",
            node_ref,
            watermark_query
        );
    }

    for node_ref in instrumented_nodes {
        let lag_query = format!(
            "streamling_event_time_lag_milliseconds_count{{id=\"{}\",instance=\"{}\"}}",
            node_ref, ctx.test_id
        );
        let lag_count = prometheus
            .query_count(&lag_query)
            .await
            .expect("Failed to query lag count");
        let enough_observations = matches!(lag_count, Some(seen) if seen >= expected_rows);
        assert!(
            enough_observations,
            "Expected lag count >= {} for node '{}', got {:?} (query: {})",
            expected_rows,
            node_ref,
            lag_count,
            lag_query
        );
    }
}
/// Checks that labels declared under `telemetry.labels` in the YAML appear on emitted metrics.
///
/// Produces 10 records and runs a pipeline whose source declares the labels
/// `tier: critical` and `dataset: e2e_yaml_labels` and whose sink declares
/// `destination: e2e_sink`. Afterwards:
///
/// * `streamling_output_rows_total` for `kafka_source`, selected by both
///   source labels, must be at least 10;
/// * `streamling_input_rows_total` for `postgres_sink`, selected by the sink
///   label, must be at least 10.
///
/// Unlike the other label-based queries in this file, these two selectors do
/// not include an `instance` label.
///
/// Returns early without failing when the test context cannot be created or
/// carries no Prometheus resource.
#[tokio::test]
async fn test_yaml_telemetry_labels_appear_on_emitted_metrics() {
    let ctx = match setup_with_prometheus().await {
        Err(err) => {
            eprintln!("Skipping test - could not create context: {}", err);
            return;
        }
        Ok(created) => created,
    };
    let prometheus = if let Some(resource) = &ctx.prometheus {
        resource
    } else {
        eprintln!("Skipping test - Prometheus not configured");
        return;
    };

    let expected_rows = 10u64;

    let registration = ctx.kafka.register_schema(TEST_SCHEMA).await;
    registration.expect("Failed to register schema");

    let mut batch: Vec<TestRecord> = Vec::with_capacity(expected_rows as usize);
    for seq in 1..=expected_rows as i64 {
        batch.push(TestRecord {
            id: seq,
            data: format!("data_{}", seq),
            timestamp: 1000 + seq,
        });
    }
    let produced = ctx.kafka.produce_avro_records(&batch).await;
    produced.expect("Failed to produce records");

    let pipeline_yaml = format!(
        r#"
sources:
kafka_source:
type: kafka
topic: {}
primary_key: id
telemetry:
labels:
tier: critical
dataset: e2e_yaml_labels
transforms: {{}}
sinks:
postgres_sink:
type: postgres
from: kafka_source
table: test_output
schema: public
on_conflict: update
telemetry:
labels:
destination: e2e_sink
"#,
        ctx.kafka_topic
    );

    let run_opts = PipelineOpts::new().record_limit(expected_rows);
    let exit_status = ctx
        .run_pipeline_with_opts(&pipeline_yaml, run_opts)
        .await
        .expect("Failed to run pipeline");
    assert!(
        exit_status.success(),
        "Pipeline should complete successfully"
    );

    let settle = std::time::Duration::from_secs(3);
    tokio::time::sleep(settle).await;

    let source_query = r#"streamling_output_rows_total{id="kafka_source",tier="critical",dataset="e2e_yaml_labels"}"#;
    let source_rows = prometheus
        .query_count(source_query)
        .await
        .expect("Failed to query source output rows");
    let source_ok = matches!(source_rows, Some(seen) if seen >= expected_rows);
    assert!(
        source_ok,
        "Expected source output_rows_total with tier=critical,dataset=e2e_yaml_labels to be >= {}, got {:?} (query: {})",
        expected_rows,
        source_rows,
        source_query
    );

    let sink_query = r#"streamling_input_rows_total{id="postgres_sink",destination="e2e_sink"}"#;
    let sink_rows = prometheus
        .query_count(sink_query)
        .await
        .expect("Failed to query sink input rows");
    let sink_ok = matches!(sink_rows, Some(seen) if seen >= expected_rows);
    assert!(
        sink_ok,
        "Expected sink input_rows_total with destination=e2e_sink to be >= {}, got {:?} (query: {})",
        expected_rows,
        sink_rows,
        sink_query
    );
}