use serde::Serialize;
use streamling_e2e::{init_tracing, PipelineOpts, TestContext};
/// Flat Avro payload shared by most tests in this file.
///
/// Fields are declared in the same order, and with the same names, as the
/// fields of [`TEST_SCHEMA`]; keep the two in sync when changing either.
#[derive(Debug, Clone, Serialize)]
struct TestRecord {
    /// Configured as `primary_key` on both the Kafka source and the Postgres sink.
    id: i64,
    /// Payload column whose stored value the tests assert on.
    value: String,
    /// Compared by the `update_where` guard in `test_update_where`.
    timestamp: i64,
}
/// Avro schema registered before producing [`TestRecord`] payloads: a record
/// named `TestRecord` with `id` (long), `value` (string) and `timestamp` (long).
const TEST_SCHEMA: &str = r#"{
"type": "record",
"name": "TestRecord",
"fields": [
{"name": "id", "type": "long"},
{"name": "value", "type": "string"},
{"name": "timestamp", "type": "long"}
]
}"#;
/// Payload for `test_jsonb_types`, declared field-for-field in the same order
/// as [`JSONB_SCHEMA`].
///
/// That test expects the nested `metadata` record and the `tags` array to be
/// stored as `jsonb` columns in Postgres.
#[derive(Debug, Clone, Serialize)]
struct JsonbTestRecord {
    /// Primary key of the sink table.
    id: i64,
    /// Nested record (the Avro `Metadata` record type).
    metadata: Metadata,
    /// Avro `array` of `string`.
    tags: Vec<String>,
    /// Plain Avro `string` field.
    data: String,
}
/// Nested record carried in [`JsonbTestRecord::metadata`]; mirrors the inner
/// `Metadata` record declared in [`JSONB_SCHEMA`].
#[derive(Debug, Clone, Serialize)]
struct Metadata {
    /// Avro `string`.
    key: String,
    /// Avro `int`, hence `i32` rather than `i64`.
    value: i32,
}
/// Avro schema registered before producing [`JsonbTestRecord`] payloads.
///
/// NOTE(review): the record is named `JsonbTestMessage` while the Rust struct
/// is `JsonbTestRecord`; presumably only field names matter when serializing —
/// confirm before relying on the record name anywhere.
const JSONB_SCHEMA: &str = r#"{
"type": "record",
"name": "JsonbTestMessage",
"fields": [
{"name": "id", "type": "long"},
{"name": "metadata", "type": {"type": "record", "name": "Metadata", "fields": [
{"name": "key", "type": "string"},
{"name": "value", "type": "int"}
]}},
{"name": "tags", "type": {"type": "array", "items": "string"}},
{"name": "data", "type": "string"}
]
}"#;
/// Payload for `test_composite_primary_key`, whose sink is keyed on
/// `(id, version)`. Fields are declared in the same order as
/// [`COMPOSITE_PK_SCHEMA`].
#[derive(Debug, Clone, Serialize)]
struct CompositePkRecord {
    /// First component of the sink's composite primary key.
    id: i64,
    /// Second component of the sink's composite primary key.
    version: i64,
    /// Payload column asserted on per `(id, version)` pair.
    value: String,
}
/// Avro schema registered before producing [`CompositePkRecord`] payloads:
/// `id` (long), `version` (long) and `value` (string).
const COMPOSITE_PK_SCHEMA: &str = r#"{
"type": "record",
"name": "CompositePkTestMessage",
"fields": [
{"name": "id", "type": "long"},
{"name": "version", "type": "long"},
{"name": "value", "type": "string"}
]
}"#;
#[tokio::test]
async fn test_basic_postgres_sink() {
init_tracing();
let ctx = TestContext::new()
.await
.expect("Failed to create test context");
ctx.kafka
.register_schema(TEST_SCHEMA)
.await
.expect("Failed to register schema");
let records: Vec<TestRecord> = (1..=10)
.map(|i| TestRecord {
id: i,
value: format!("value_{}", i),
timestamp: 1000 + i,
})
.collect();
ctx.kafka
.produce_avro_records(&records)
.await
.expect("Failed to produce records");
let pipeline = format!(
r#"
sources:
kafka_source:
type: kafka
topic: {topic}
starting_offsets: earliest
primary_key: id
transforms: {{}}
sinks:
pg_sink:
type: postgres
from: kafka_source
table: test_basic
schema: public
primary_key: id
on_conflict: update
"#,
topic = ctx.kafka_topic,
);
let status = ctx
.run_pipeline(&pipeline, 10)
.await
.expect("Streamling execution failed");
assert!(status.success(), "Streamling should exit successfully");
let count = ctx
.postgres
.count("SELECT COUNT(*) FROM public.test_basic")
.await
.expect("Failed to query count");
assert_eq!(count, 10, "Should have 10 records in output table");
let rows: Vec<(i64, String, i64)> = ctx
.postgres
.query("SELECT id, value, timestamp FROM public.test_basic WHERE id = 1")
.await
.expect("Failed to query record");
assert_eq!(rows.len(), 1);
assert_eq!(rows[0].0, 1);
assert_eq!(rows[0].1, "value_1");
assert_eq!(rows[0].2, 1001);
}
/// Streams 18 records through a sink configured with `batch_size: 10`, so the
/// writes span more than one sink batch, and checks that every record arrives.
#[tokio::test]
async fn test_multiple_batches() {
    init_tracing();

    let ctx = TestContext::new()
        .await
        .expect("Failed to create test context");
    ctx.kafka
        .register_schema(TEST_SCHEMA)
        .await
        .expect("Failed to register schema");

    let mut records: Vec<TestRecord> = Vec::new();
    records.extend((1..=18).map(|n| TestRecord {
        id: n,
        value: format!("batch_value_{}", n),
        timestamp: 1000 + n,
    }));
    ctx.kafka
        .produce_avro_records(&records)
        .await
        .expect("Failed to produce records");

    let topic = &ctx.kafka_topic;
    let batched_pipeline = format!(
        r#"
sources:
kafka_source:
type: kafka
topic: {topic}
starting_offsets: earliest
primary_key: id
transforms: {{}}
sinks:
pg_sink:
type: postgres
from: kafka_source
table: test_batches
schema: public
primary_key: id
on_conflict: update
batch_size: 10
batch_flush_interval: 1000ms
"#,
        topic = topic,
    );

    let exit = ctx
        .run_pipeline(&batched_pipeline, 18)
        .await
        .expect("Streamling execution failed");
    assert!(exit.success());

    let total = ctx
        .postgres
        .count("SELECT COUNT(*) FROM public.test_batches")
        .await
        .expect("Failed to query count");
    assert_eq!(total, 18, "Should have 18 records after multiple batches");
}
/// Streams records that carry a nested record (`metadata`) and a string array
/// (`tags`) and checks that both columns are created as `jsonb` in Postgres.
#[tokio::test]
async fn test_jsonb_types() {
    init_tracing();

    let ctx = TestContext::new()
        .await
        .expect("Failed to create test context");
    ctx.kafka
        .register_schema(JSONB_SCHEMA)
        .await
        .expect("Failed to register schema");

    // Zero-based `k` drives every field except the one-based primary key.
    let records: Vec<JsonbTestRecord> = (0..10)
        .map(|k| JsonbTestRecord {
            id: k + 1,
            metadata: Metadata {
                key: format!("key_{}", k),
                value: (k * 10) as i32,
            },
            tags: vec![format!("tag_{}", k), format!("tag_{}", k + 100)],
            data: format!("data_{}", k),
        })
        .collect();
    ctx.kafka
        .produce_avro_records(&records)
        .await
        .expect("Failed to produce records");

    let topic = &ctx.kafka_topic;
    let jsonb_pipeline = format!(
        r#"
sources:
kafka_source:
type: kafka
topic: {topic}
starting_offsets: earliest
primary_key: id
transforms: {{}}
sinks:
pg_sink:
type: postgres
from: kafka_source
table: test_jsonb
schema: public
primary_key: id
on_conflict: update
"#,
        topic = topic,
    );

    let exit = ctx
        .run_pipeline(&jsonb_pipeline, 10)
        .await
        .expect("Streamling execution failed");
    assert!(exit.success());

    let total = ctx
        .postgres
        .count("SELECT COUNT(*) FROM public.test_jsonb")
        .await
        .expect("Failed to query count");
    assert_eq!(total, 10, "Should have 10 records");

    let metadata_types: Vec<(String,)> = ctx
        .postgres
        .query("SELECT pg_typeof(metadata)::text FROM public.test_jsonb LIMIT 1")
        .await
        .expect("Failed to check type");
    let (metadata_type,) = &metadata_types[0];
    assert!(
        metadata_type.contains("jsonb"),
        "metadata should be JSONB type"
    );

    let tag_types: Vec<(String,)> = ctx
        .postgres
        .query("SELECT pg_typeof(tags)::text FROM public.test_jsonb LIMIT 1")
        .await
        .expect("Failed to check type");
    let (tags_type,) = &tag_types[0];
    assert!(tags_type.contains("jsonb"), "tags should be JSONB type");
}
#[tokio::test]
async fn test_deduplication() {
init_tracing();
let ctx = TestContext::new()
.await
.expect("Failed to create test context");
ctx.kafka
.register_schema(TEST_SCHEMA)
.await
.expect("Failed to register schema");
let records = vec![
TestRecord {
id: 1,
value: "first".to_string(),
timestamp: 100,
},
TestRecord {
id: 2,
value: "second".to_string(),
timestamp: 200,
},
TestRecord {
id: 1,
value: "third".to_string(),
timestamp: 300,
},
TestRecord {
id: 3,
value: "fourth".to_string(),
timestamp: 400,
},
TestRecord {
id: 1,
value: "fifth".to_string(),
timestamp: 500,
},
];
ctx.kafka
.produce_avro_records(&records)
.await
.expect("Failed to produce records");
let pipeline = format!(
r#"
sources:
kafka_source:
type: kafka
topic: {topic}
starting_offsets: earliest
primary_key: id
transforms: {{}}
sinks:
pg_sink:
type: postgres
from: kafka_source
table: test_dedup
schema: public
primary_key: id
on_conflict: update
batch_size: 1
batch_flush_interval: 100ms
"#,
topic = ctx.kafka_topic,
);
let status = ctx
.run_pipeline_with_opts(
&pipeline,
PipelineOpts::new()
.record_limit(5)
.env("STREAMLING__RECORD_BATCH_SIZE", "1"),
)
.await
.expect("Streamling execution failed");
assert!(status.success());
let count = ctx
.postgres
.count("SELECT COUNT(*) FROM public.test_dedup")
.await
.expect("Failed to query count");
assert_eq!(count, 3, "Should have 3 unique records after deduplication");
let rows: Vec<(i64, String, i64)> = ctx
.postgres
.query("SELECT id, value, timestamp FROM public.test_dedup WHERE id = 1")
.await
.expect("Failed to query record");
assert_eq!(rows[0].1, "fifth", "id=1 should have latest value 'fifth'");
assert_eq!(rows[0].2, 500, "id=1 should have latest timestamp 500");
let id2: Vec<(String,)> = ctx
.postgres
.query("SELECT value FROM public.test_dedup WHERE id = 2")
.await
.expect("Failed to query");
assert_eq!(id2[0].0, "second");
let id3: Vec<(String,)> = ctx
.postgres
.query("SELECT value FROM public.test_dedup WHERE id = 3")
.await
.expect("Failed to query");
assert_eq!(id3[0].0, "fourth");
}
#[tokio::test]
async fn test_delete_operations() {
init_tracing();
let ctx = TestContext::new()
.await
.expect("Failed to create test context");
ctx.kafka
.register_schema(TEST_SCHEMA)
.await
.expect("Failed to register schema");
let insert_records = vec![
TestRecord {
id: 1,
value: "value_1".to_string(),
timestamp: 100,
},
TestRecord {
id: 2,
value: "value_2".to_string(),
timestamp: 200,
},
TestRecord {
id: 3,
value: "value_3".to_string(),
timestamp: 300,
},
];
ctx.kafka
.produce_avro_records(&insert_records)
.await
.expect("Failed to produce insert records");
ctx.kafka
.produce_avro_records_with_op(
&[
TestRecord {
id: 1,
value: "".to_string(),
timestamp: 0,
},
TestRecord {
id: 2,
value: "".to_string(),
timestamp: 0,
},
],
"d",
)
.await
.expect("Failed to produce delete records");
let pipeline = format!(
r#"
sources:
kafka_source:
type: kafka
topic: {topic}
starting_offsets: earliest
primary_key: id
transforms: {{}}
sinks:
pg_sink:
type: postgres
from: kafka_source
table: test_delete
schema: public
primary_key: id
on_conflict: update
batch_size: 1
batch_flush_interval: 100ms
"#,
topic = ctx.kafka_topic,
);
let status = ctx
.run_pipeline_with_opts(
&pipeline,
PipelineOpts::new()
.record_limit(5)
.env("STREAMLING__RECORD_BATCH_SIZE", "1"),
)
.await
.expect("Streamling execution failed");
assert!(status.success());
let count = ctx
.postgres
.count("SELECT COUNT(*) FROM public.test_delete")
.await
.expect("Failed to query count");
assert_eq!(count, 1, "Should have 1 record after delete operations");
let remaining: Vec<(i64,)> = ctx
.postgres
.query("SELECT id FROM public.test_delete")
.await
.expect("Failed to query");
assert_eq!(remaining[0].0, 3, "Only id=3 should remain");
}
#[tokio::test]
async fn test_composite_primary_key() {
init_tracing();
let ctx = TestContext::new()
.await
.expect("Failed to create test context");
ctx.kafka
.register_schema(COMPOSITE_PK_SCHEMA)
.await
.expect("Failed to register schema");
let records = vec![
CompositePkRecord {
id: 1,
version: 1,
value: "initial_v1".to_string(),
},
CompositePkRecord {
id: 1,
version: 2,
value: "value_v2".to_string(),
},
CompositePkRecord {
id: 2,
version: 1,
value: "value_v1".to_string(),
},
CompositePkRecord {
id: 2,
version: 2,
value: "value_v2".to_string(),
},
CompositePkRecord {
id: 1,
version: 1,
value: "updated_v1".to_string(),
}, ];
ctx.kafka
.produce_avro_records(&records)
.await
.expect("Failed to produce records");
let pipeline = format!(
r#"
sources:
kafka_source:
type: kafka
topic: {topic}
starting_offsets: earliest
primary_key: id
transforms: {{}}
sinks:
pg_sink:
type: postgres
from: kafka_source
table: test_composite_pk
schema: public
primary_key: id,version
on_conflict: update
batch_size: 1
batch_flush_interval: 100ms
"#,
topic = ctx.kafka_topic,
);
let status = ctx
.run_pipeline_with_opts(
&pipeline,
PipelineOpts::new()
.record_limit(5)
.env("STREAMLING__RECORD_BATCH_SIZE", "1"),
)
.await
.expect("Streamling execution failed");
assert!(status.success());
let count = ctx
.postgres
.count("SELECT COUNT(*) FROM public.test_composite_pk")
.await
.expect("Failed to query count");
assert_eq!(count, 4, "Should have 4 unique records");
let row_1_1: Vec<(String,)> = ctx
.postgres
.query("SELECT value FROM public.test_composite_pk WHERE id = 1 AND version = 1")
.await
.expect("Failed to query");
assert_eq!(
row_1_1[0].0, "updated_v1",
"(1,1) should have updated value"
);
let row_1_2: Vec<(String,)> = ctx
.postgres
.query("SELECT value FROM public.test_composite_pk WHERE id = 1 AND version = 2")
.await
.expect("Failed to query");
assert_eq!(row_1_2[0].0, "value_v2");
let row_2_1: Vec<(String,)> = ctx
.postgres
.query("SELECT value FROM public.test_composite_pk WHERE id = 2 AND version = 1")
.await
.expect("Failed to query");
assert_eq!(row_2_1[0].0, "value_v1");
let row_2_2: Vec<(String,)> = ctx
.postgres
.query("SELECT value FROM public.test_composite_pk WHERE id = 2 AND version = 2")
.await
.expect("Failed to query");
assert_eq!(row_2_2[0].0, "value_v2");
}
/// Verifies that `on_conflict: nothing` leaves an existing row untouched.
///
/// A first run with `on_conflict: update` seeds `id = 1` with "original".
/// A conflicting record for the same key is then produced, and a second run
/// with `on_conflict: nothing` must consume it without overwriting the row.
#[tokio::test]
async fn test_on_conflict_nothing() {
    init_tracing();

    let ctx = TestContext::new()
        .await
        .expect("Failed to create test context");
    ctx.kafka
        .register_schema(TEST_SCHEMA)
        .await
        .expect("Failed to register schema");

    // Both runs use the same pipeline apart from the `on_conflict` policy.
    let pipeline_with = |on_conflict: &str| {
        format!(
            r#"
sources:
kafka_source:
type: kafka
topic: {topic}
starting_offsets: earliest
primary_key: id
transforms: {{}}
sinks:
pg_sink:
type: postgres
from: kafka_source
table: test_on_conflict
schema: public
primary_key: id
on_conflict: {on_conflict}
batch_size: 1
batch_flush_interval: 100ms
"#,
            topic = ctx.kafka_topic,
            on_conflict = on_conflict,
        )
    };

    let initial = vec![TestRecord {
        id: 1,
        value: "original".to_string(),
        timestamp: 100,
    }];
    ctx.kafka
        .produce_avro_records(&initial)
        .await
        .expect("Failed to produce records");

    let pipeline_update = pipeline_with("update");
    let status = ctx
        .run_pipeline_with_opts(
            &pipeline_update,
            PipelineOpts::new()
                .record_limit(1)
                .env("STREAMLING__RECORD_BATCH_SIZE", "1"),
        )
        .await
        .expect("Streamling execution failed");
    assert!(status.success());

    let rows: Vec<(String,)> = ctx
        .postgres
        .query("SELECT value FROM public.test_on_conflict WHERE id = 1")
        .await
        .expect("Failed to query");
    assert_eq!(rows.len(), 1, "Seed run should have written id=1");
    assert_eq!(rows[0].0, "original");

    let conflict = vec![TestRecord {
        id: 1,
        value: "should_not_update".to_string(),
        timestamp: 200,
    }];
    ctx.kafka
        .produce_avro_records(&conflict)
        .await
        .expect("Failed to produce records");

    // With `starting_offsets: earliest` this run replays the topic from the
    // start (test_update_where relies on the same behaviour: its second run
    // reads 3 seed + 3 update records with `record_limit(6)`). The topic now
    // holds "original" followed by the conflicting record, so the limit must
    // be 2. With a limit of 1 the run would only re-read "original" and the
    // `nothing` policy would never be applied to the conflicting value.
    let pipeline_nothing = pipeline_with("nothing");
    let status = ctx
        .run_pipeline_with_opts(
            &pipeline_nothing,
            PipelineOpts::new()
                .record_limit(2)
                .env("STREAMLING__RECORD_BATCH_SIZE", "1"),
        )
        .await
        .expect("Streamling execution failed");
    assert!(status.success());

    let rows: Vec<(String,)> = ctx
        .postgres
        .query("SELECT value FROM public.test_on_conflict WHERE id = 1")
        .await
        .expect("Failed to query");
    assert_eq!(rows.len(), 1, "id=1 should still exist after the second run");
    assert_eq!(
        rows[0].0, "original",
        "Value should remain 'original' with on_conflict='nothing'"
    );
}
#[tokio::test]
async fn test_update_where() {
init_tracing();
let ctx = TestContext::new()
.await
.expect("Failed to create test context");
ctx.kafka
.register_schema(TEST_SCHEMA)
.await
.expect("Failed to register schema");
let initial_records = vec![
TestRecord {
id: 1,
value: "original_1".to_string(),
timestamp: 100,
},
TestRecord {
id: 2,
value: "original_2".to_string(),
timestamp: 200,
},
TestRecord {
id: 3,
value: "original_3".to_string(),
timestamp: 300,
},
];
ctx.kafka
.produce_avro_records(&initial_records)
.await
.expect("Failed to produce initial records");
let pipeline_seed = format!(
r#"
sources:
kafka_source:
type: kafka
topic: {topic}
starting_offsets: earliest
primary_key: id
transforms: {{}}
sinks:
pg_sink:
type: postgres
from: kafka_source
table: test_cond_update
schema: public
primary_key: id
on_conflict: update
batch_size: 1
batch_flush_interval: 100ms
"#,
topic = ctx.kafka_topic,
);
let status = ctx
.run_pipeline_with_opts(
&pipeline_seed,
PipelineOpts::new()
.record_limit(3)
.env("STREAMLING__RECORD_BATCH_SIZE", "1"),
)
.await
.expect("Streamling execution failed");
assert!(status.success(), "Seed pipeline should succeed");
let count = ctx
.postgres
.count("SELECT COUNT(*) FROM public.test_cond_update")
.await
.expect("Failed to query count");
assert_eq!(count, 3, "Should have 3 seeded rows");
let update_records = vec![
TestRecord {
id: 1,
value: "updated_1".to_string(),
timestamp: 999, },
TestRecord {
id: 2,
value: "should_not_update".to_string(),
timestamp: 50, },
TestRecord {
id: 3,
value: "should_not_update_either".to_string(),
timestamp: 300, },
];
ctx.kafka
.produce_avro_records(&update_records)
.await
.expect("Failed to produce update records");
let pipeline_conditional = format!(
r#"
sources:
kafka_source:
type: kafka
topic: {topic}
starting_offsets: earliest
primary_key: id
transforms: {{}}
sinks:
pg_sink:
type: postgres
from: kafka_source
table: test_cond_update
schema: public
primary_key: id
on_conflict: update
update_where:
timestamp: '>'
batch_size: 1
batch_flush_interval: 100ms
"#,
topic = ctx.kafka_topic,
);
let status = ctx
.run_pipeline_with_opts(
&pipeline_conditional,
PipelineOpts::new()
.record_limit(6)
.env("STREAMLING__RECORD_BATCH_SIZE", "1"),
)
.await
.expect("Streamling execution failed");
assert!(
status.success(),
"Conditional update pipeline should succeed"
);
let row1: Vec<(String, i64)> = ctx
.postgres
.query("SELECT value, timestamp FROM public.test_cond_update WHERE id = 1")
.await
.expect("Failed to query id=1");
assert_eq!(
row1[0].0, "updated_1",
"id=1 should be updated (newer timestamp)"
);
assert_eq!(row1[0].1, 999);
let row2: Vec<(String, i64)> = ctx
.postgres
.query("SELECT value, timestamp FROM public.test_cond_update WHERE id = 2")
.await
.expect("Failed to query id=2");
assert_eq!(
row2[0].0, "original_2",
"id=2 should NOT be updated (older timestamp)"
);
assert_eq!(row2[0].1, 200);
let row3: Vec<(String, i64)> = ctx
.postgres
.query("SELECT value, timestamp FROM public.test_cond_update WHERE id = 3")
.await
.expect("Failed to query id=3");
assert_eq!(
row3[0].0, "original_3",
"id=3 should NOT be updated (equal timestamp, strict >)"
);
assert_eq!(row3[0].1, 300);
let final_count = ctx
.postgres
.count("SELECT COUNT(*) FROM public.test_cond_update")
.await
.expect("Failed to query final count");
assert_eq!(final_count, 3, "Should still have exactly 3 rows");
}
#[tokio::test]
async fn test_parallel_postgres_sink() {
init_tracing();
let ctx = TestContext::new()
.await
.expect("Failed to create test context");
ctx.kafka
.register_schema(TEST_SCHEMA)
.await
.expect("Failed to register schema");
let records: Vec<TestRecord> = (1..=100)
.map(|i| TestRecord {
id: i,
value: format!("parallel_value_{}", i),
timestamp: 1000 + i,
})
.collect();
ctx.kafka
.produce_avro_records(&records)
.await
.expect("Failed to produce records");
let pipeline = format!(
r#"
sources:
kafka_source:
type: kafka
topic: {topic}
starting_offsets: earliest
primary_key: id
transforms: {{}}
sinks:
pg_sink:
type: postgres
from: kafka_source
table: test_parallel
schema: public
primary_key: id
on_conflict: update
parallelism: 4
batch_size: 25
"#,
topic = ctx.kafka_topic,
);
let status = ctx
.run_pipeline(&pipeline, 100)
.await
.expect("Streamling execution failed");
assert!(status.success(), "Streamling should exit successfully");
let count = ctx
.postgres
.count("SELECT COUNT(*) FROM public.test_parallel")
.await
.expect("Failed to query count");
assert_eq!(count, 100, "Should have all 100 records");
let rows: Vec<(i64, String, i64)> = ctx
.postgres
.query("SELECT id, value, timestamp FROM public.test_parallel WHERE id = 50")
.await
.expect("Failed to query record");
assert_eq!(rows.len(), 1);
assert_eq!(rows[0].0, 50);
assert_eq!(rows[0].1, "parallel_value_50");
assert_eq!(rows[0].2, 1050);
}
#[tokio::test]
async fn test_parallel_postgres_sink_deduplication() {
init_tracing();
let ctx = TestContext::new()
.await
.expect("Failed to create test context");
ctx.kafka
.register_schema(TEST_SCHEMA)
.await
.expect("Failed to register schema");
let mut records = Vec::new();
for i in 1..=5 {
records.push(TestRecord {
id: i,
value: format!("original_{}", i),
timestamp: 1000 + i,
});
}
for i in 1..=5 {
records.push(TestRecord {
id: i,
value: format!("updated_{}", i),
timestamp: 2000 + i,
});
}
ctx.kafka
.produce_avro_records(&records)
.await
.expect("Failed to produce records");
let pipeline = format!(
r#"
sources:
kafka_source:
type: kafka
topic: {topic}
starting_offsets: earliest
primary_key: id
transforms: {{}}
sinks:
pg_sink:
type: postgres
from: kafka_source
table: test_parallel_dedup
schema: public
primary_key: id
on_conflict: update
parallelism: 3
batch_size: 5
batch_flush_interval: 100ms
"#,
topic = ctx.kafka_topic,
);
let status = ctx
.run_pipeline_with_opts(
&pipeline,
PipelineOpts::new()
.record_limit(10)
.env("STREAMLING__RECORD_BATCH_SIZE", "1"),
)
.await
.expect("Streamling execution failed");
assert!(status.success());
let count = ctx
.postgres
.count("SELECT COUNT(*) FROM public.test_parallel_dedup")
.await
.expect("Failed to query count");
assert_eq!(count, 5, "Should have 5 unique records after deduplication");
let rows: Vec<(i64, String, i64)> = ctx
.postgres
.query("SELECT id, value, timestamp FROM public.test_parallel_dedup ORDER BY id")
.await
.expect("Failed to query records");
assert_eq!(rows.len(), 5);
for (idx, row) in rows.iter().enumerate() {
let expected_id = (idx + 1) as i64;
assert_eq!(row.0, expected_id);
assert_eq!(row.1, format!("updated_{}", expected_id));
assert_eq!(row.2, 2000 + expected_id);
}
}
/// Inserts ids 1..=10, then produces delete (`"d"`) events for the odd ids,
/// and checks that a parallel sink (`parallelism: 3`) leaves only the even ids.
#[tokio::test]
async fn test_parallel_postgres_sink_deletes() {
    init_tracing();

    let ctx = TestContext::new()
        .await
        .expect("Failed to create test context");
    ctx.kafka
        .register_schema(TEST_SCHEMA)
        .await
        .expect("Failed to register schema");

    let mut insert_records = Vec::with_capacity(10);
    for n in 1..=10 {
        insert_records.push(TestRecord {
            id: n,
            value: format!("value_{}", n),
            timestamp: 1000 + n,
        });
    }
    ctx.kafka
        .produce_avro_records(&insert_records)
        .await
        .expect("Failed to produce insert records");

    // Deletes for ids 1, 3, 5, 7, 9; non-key fields are filled with empty/zero values.
    let delete_records: Vec<TestRecord> = (1..=10)
        .filter(|id| *id % 2 == 1)
        .map(|id| TestRecord {
            id,
            value: String::new(),
            timestamp: 0,
        })
        .collect();
    ctx.kafka
        .produce_avro_records_with_op(&delete_records, "d")
        .await
        .expect("Failed to produce delete records");

    let topic = &ctx.kafka_topic;
    let delete_pipeline = format!(
        r#"
sources:
kafka_source:
type: kafka
topic: {topic}
starting_offsets: earliest
primary_key: id
transforms: {{}}
sinks:
pg_sink:
type: postgres
from: kafka_source
table: test_parallel_delete
schema: public
primary_key: id
on_conflict: update
parallelism: 3
batch_size: 5
batch_flush_interval: 100ms
"#,
        topic = topic,
    );

    // 10 inserts + 5 deletes.
    let exit = ctx
        .run_pipeline_with_opts(
            &delete_pipeline,
            PipelineOpts::new()
                .record_limit(15)
                .env("STREAMLING__RECORD_BATCH_SIZE", "1"),
        )
        .await
        .expect("Streamling execution failed");
    assert!(exit.success());

    let total = ctx
        .postgres
        .count("SELECT COUNT(*) FROM public.test_parallel_delete")
        .await
        .expect("Failed to query count");
    assert_eq!(total, 5, "Should have 5 records after deleting odds");

    let remaining: Vec<(i64,)> = ctx
        .postgres
        .query("SELECT id FROM public.test_parallel_delete ORDER BY id")
        .await
        .expect("Failed to query");
    let ids: Vec<i64> = remaining.into_iter().map(|(id,)| id).collect();
    let expected_ids: Vec<i64> = (2..=10).step_by(2).collect();
    assert_eq!(ids, expected_ids, "Only even IDs should remain");
}
#[tokio::test]
async fn test_generic_batch_accumulation_on_sink() {
init_tracing();
let ctx = TestContext::new()
.await
.expect("Failed to create test context");
ctx.kafka
.register_schema(TEST_SCHEMA)
.await
.expect("Failed to register schema");
let records: Vec<TestRecord> = (1..=50)
.map(|i| TestRecord {
id: i,
value: format!("batch_value_{}", i),
timestamp: 1000 + i,
})
.collect();
ctx.kafka
.produce_avro_records(&records)
.await
.expect("Failed to produce records");
let pipeline = format!(
r#"
sources:
kafka_source:
type: kafka
topic: {topic}
starting_offsets: earliest
primary_key: id
batch_size: 1
transforms: {{}}
sinks:
pg_sink:
type: postgres
from: kafka_source
table: test_batch_accumulation
schema: public
primary_key: id
on_conflict: update
batch_size: 25
batch_flush_interval: 500ms
"#,
topic = ctx.kafka_topic,
);
let status = ctx
.run_pipeline_with_opts(&pipeline, PipelineOpts::new().record_limit(50))
.await
.expect("Streamling execution failed");
assert!(status.success(), "Streamling should exit successfully");
let count = ctx
.postgres
.count("SELECT COUNT(*) FROM public.test_batch_accumulation")
.await
.expect("Failed to query count");
assert_eq!(count, 50, "All 50 records should be written");
}