use serde::Serialize;
use streamling_e2e::{init_tracing, PipelineOpts, TestContext};
/// Test payload produced to Kafka as Avro; its fields mirror the
/// `BlockRecord` record declared in `BLOCK_SCHEMA` (`long` -> `i64`,
/// `string` -> `String`).
#[derive(Clone, Debug, Serialize)]
struct BlockRecord {
    /// Unique key; the pipelines use it as `primary_key` for both the Kafka
    /// source and the Postgres sink.
    id: i64,
    /// Numeric column that the `block > N` source filters are evaluated on.
    block: i64,
    /// String payload; the string-filter test matches on this column.
    data: String,
}
/// Avro record schema passed to `ctx.kafka.register_schema` in every test
/// before any records are produced.
///
/// Must stay in sync with `BlockRecord`: same field names, in the same order,
/// with `long` <-> `i64` and `string` <-> `String`.
const BLOCK_SCHEMA: &str = r#"{
"type": "record",
"name": "BlockRecord",
"fields": [
{"name": "id", "type": "long"},
{"name": "block", "type": "long"},
{"name": "data", "type": "string"}
]
}"#;
/// Produces 100 records with `block` = 1..=100 and checks that the source-level
/// filter `block > 60` forwards exactly the 40 records with `block` 61..=100 to
/// the Postgres sink.
#[tokio::test]
async fn test_kafka_source_filter_basic() {
    init_tracing();
    let ctx = TestContext::new()
        .await
        .expect("Failed to create test context");
    ctx.kafka
        .register_schema(BLOCK_SCHEMA)
        .await
        .expect("Failed to register schema");

    // `id` and `block` are both set to the sequence number, so block values are
    // unique and the expected surviving set is exactly 61..=100.
    let records_to_produce: i64 = 100;
    let records: Vec<BlockRecord> = (1..=records_to_produce)
        .map(|i| BlockRecord {
            id: i,
            block: i,
            data: format!("data_{}", i),
        })
        .collect();
    ctx.kafka
        .produce_avro_records(&records)
        .await
        .expect("Failed to produce records");

    let pipeline = format!(
        r#"
sources:
  kafka_source:
    type: kafka
    topic: {topic}
    starting_offsets: earliest
    primary_key: id
    filter: "block > 60"
transforms: {{}}
sinks:
  pg_sink:
    type: postgres
    from: kafka_source
    table: filter_basic_test
    schema: public
    primary_key: id
    on_conflict: update
"#,
        topic = ctx.kafka_topic,
    );

    // The record limit equals the number of records the filter should pass.
    let status = ctx
        .run_pipeline_with_opts(
            &pipeline,
            PipelineOpts::new()
                .record_limit(40)
                .timeout(std::time::Duration::from_secs(60)),
        )
        .await
        .expect("Streamling execution failed");
    assert!(status.success(), "Streamling should exit successfully");

    let count = ctx
        .postgres
        .count("SELECT COUNT(*) FROM public.filter_basic_test")
        .await
        .expect("Failed to query count");
    assert_eq!(count, 40, "Filter 'block > 60' should pass 40 records");

    // The count alone does not prove *which* rows passed; the minimum block
    // confirms nothing at or below the threshold was written.
    let min_rows: Vec<(i64,)> = ctx
        .postgres
        .query("SELECT MIN(block) FROM public.filter_basic_test")
        .await
        .expect("Failed to query min block");
    let (min_block,) = *min_rows
        .first()
        .expect("MIN(block) query returned no rows");
    assert_eq!(min_block, 61, "Minimum block should be 61");
}
/// Produces 10 records with `block` = 1..=10 and checks that the filter
/// `block > 5` forwards only the 5 records with `block` 6..=10.
#[tokio::test]
async fn test_kafka_source_filter_partial_matches() {
    init_tracing();
    let ctx = TestContext::new()
        .await
        .expect("Failed to create test context");
    ctx.kafka
        .register_schema(BLOCK_SCHEMA)
        .await
        .expect("Failed to register schema");

    let records_to_produce: i64 = 10;
    let records: Vec<BlockRecord> = (1..=records_to_produce)
        .map(|i| BlockRecord {
            id: i,
            block: i,
            data: format!("data_{}", i),
        })
        .collect();
    ctx.kafka
        .produce_avro_records(&records)
        .await
        .expect("Failed to produce records");

    let pipeline = format!(
        r#"
sources:
  kafka_source:
    type: kafka
    topic: {topic}
    starting_offsets: earliest
    primary_key: id
    filter: "block > 5"
transforms: {{}}
sinks:
  pg_sink:
    type: postgres
    from: kafka_source
    table: filter_partial_test
    schema: public
    primary_key: id
    on_conflict: update
"#,
        topic = ctx.kafka_topic,
    );

    let status = ctx
        .run_pipeline_with_opts(
            &pipeline,
            PipelineOpts::new()
                .record_limit(5)
                .timeout(std::time::Duration::from_secs(60)),
        )
        .await
        .expect("Streamling execution failed");
    assert!(status.success(), "Streamling should exit successfully");

    let count = ctx
        .postgres
        .count("SELECT COUNT(*) FROM public.filter_partial_test")
        .await
        .expect("Failed to query count");
    assert_eq!(count, 5, "Filter 'block > 5' should pass 5 records");

    // With `record_limit(5)`, a count of 5 does not by itself prove the filter
    // ran (an unfiltered run could presumably also stop at 5 rows), so also
    // verify that none of the records at or below the threshold were written.
    let filtered_out = ctx
        .postgres
        .count("SELECT COUNT(*) FROM public.filter_partial_test WHERE block <= 5")
        .await
        .expect("Failed to query filtered-out records");
    assert_eq!(
        filtered_out, 0,
        "Filter 'block > 5' should drop every record with block <= 5"
    );
}
/// Checks string-equality filtering at the source: of 100 produced records,
/// only the 50 even-numbered ones carry `data = 'target_data'`, so exactly
/// those should land in the Postgres sink.
#[tokio::test]
async fn test_kafka_source_filter_string() {
    init_tracing();

    let ctx = TestContext::new()
        .await
        .expect("Failed to create test context");
    ctx.kafka
        .register_schema(BLOCK_SCHEMA)
        .await
        .expect("Failed to register schema");

    // Odd sequence numbers each get a distinct non-matching payload; every even
    // one shares the payload the filter looks for.
    let payload_for = |n: i64| -> String {
        if n % 2 != 0 {
            format!("other_{}", n)
        } else {
            "target_data".to_string()
        }
    };
    let total_records: i64 = 100;
    let batch: Vec<BlockRecord> = (1..=total_records)
        .map(|n| BlockRecord {
            id: n,
            block: n,
            data: payload_for(n),
        })
        .collect();
    ctx.kafka
        .produce_avro_records(&batch)
        .await
        .expect("Failed to produce records");

    let pipeline_yaml = format!(
        r#"
sources:
  kafka_source:
    type: kafka
    topic: {topic}
    starting_offsets: earliest
    primary_key: id
    filter: "data = 'target_data'"
transforms: {{}}
sinks:
  pg_sink:
    type: postgres
    from: kafka_source
    table: filter_string_test
    schema: public
    primary_key: id
    on_conflict: update
"#,
        topic = &ctx.kafka_topic,
    );

    let run_timeout = std::time::Duration::from_secs(60);
    let exit = ctx
        .run_pipeline_with_opts(
            &pipeline_yaml,
            PipelineOpts::new().record_limit(50).timeout(run_timeout),
        )
        .await
        .expect("Streamling execution failed");
    assert!(exit.success(), "Streamling should exit successfully");

    // Total row count first, then the subset that actually matches the filter:
    // both must equal the number of even ids.
    let stored_rows = ctx
        .postgres
        .count("SELECT COUNT(*) FROM public.filter_string_test")
        .await
        .expect("Failed to query count");
    assert_eq!(
        stored_rows, 50,
        "Filter should pass 50 records with 'target_data'"
    );

    let matching_rows = ctx
        .postgres
        .count("SELECT COUNT(*) FROM public.filter_string_test WHERE data = 'target_data'")
        .await
        .expect("Failed to query matching records");
    assert_eq!(
        matching_rows, 50,
        "All records should have data = 'target_data'"
    );
}