use serde::Serialize;
use streamling_e2e::{init_tracing, PipelineOpts, TestContext};
/// One money transfer, produced to Kafka as an Avro `Transfer` record.
///
/// Field names and types line up with the fields declared in `TRANSFER_SCHEMA`
/// (`long` -> `i64`, `string` -> `String`).
#[derive(Clone, Debug, Serialize)]
struct Transfer {
    /// Record key; declared as the primary key in the schema's `doc` field.
    id: i64,
    /// Account the transfer belongs to; the tests group balances by it.
    account: String,
    /// Amount aggregated (summed) into the account's balance by the test pipelines.
    amount: i64,
}
/// Avro record schema for [`Transfer`]; each test registers it before producing records.
///
/// The `doc` string embeds `{"primaryKeys":["id"]}`. NOTE(review): presumably this is
/// read by the pipeline to identify the record key — confirm against the Kafka source.
const TRANSFER_SCHEMA: &str = r#"{
"type": "record",
"name": "Transfer",
"doc": "{\"primaryKeys\":[\"id\"]}",
"fields": [
{"name": "id", "type": "long"},
{"name": "account", "type": "string"},
{"name": "amount", "type": "long"}
]
}"#;
/// End-to-end check of the `postgres_aggregate` sink with a mix of inserts and deletes.
///
/// Produces five inserts and three deletes (op `"d"`), runs the pipeline until all
/// eight records are consumed, then verifies that:
/// - the landing table keeps one row per record, has a `_gs_op` column and is keyed
///   by exactly `(id, _gs_op)`;
/// - the aggregate table holds one balance per account, with the amounts of deleted
///   records subtracted.
#[tokio::test]
async fn test_basic_aggregation_with_inserts_and_deletes() {
    init_tracing();
    let ctx = TestContext::new()
        .await
        .expect("Failed to create test context");
    ctx.kafka
        .register_schema(TRANSFER_SCHEMA)
        .await
        .expect("Failed to register schema");

    let transfer = |id: i64, account: &str, amount: i64| Transfer {
        id,
        account: account.to_string(),
        amount,
    };

    // Id 8 carries a zero amount: it must still land (8 landing rows) without
    // changing alice's balance.
    let insert_records = vec![
        transfer(1, "alice", 100),
        transfer(2, "alice", 50),
        transfer(4, "bob", 200),
        transfer(5, "bob", 75),
        transfer(8, "alice", 0),
    ];
    ctx.kafka
        .produce_avro_records(&insert_records)
        .await
        .expect("Failed to produce insert records");

    // These ids were never inserted; the expected balances below still subtract
    // their amounts.
    let delete_records = vec![
        transfer(3, "alice", 30),
        transfer(6, "bob", 100),
        transfer(7, "bob", 50),
    ];
    ctx.kafka
        .produce_avro_records_with_op(&delete_records, "d")
        .await
        .expect("Failed to produce delete records");

    let pipeline = format!(
        r#"
sources:
  kafka_source:
    type: kafka
    topic: {topic}
    starting_offsets: earliest
    primary_key: id
transforms: {{}}
sinks:
  postgres_agg:
    type: postgres_aggregate
    from: kafka_source
    landing_table: transfers
    agg_table: account_balances
    schema: streamling
    batch_flush_interval: 1s
    batch_size: 10
    primary_key: id
    group_by:
      account:
        type: text
    aggregate:
      balance:
        from: amount
        fn: sum
"#,
        topic = ctx.kafka_topic,
    );

    // Stop after exactly the 5 inserts + 3 deletes produced above.
    let status = ctx
        .run_pipeline_with_opts(
            &pipeline,
            PipelineOpts::new()
                .record_limit(8)
                .env("STREAMLING__RECORD_BATCH_SIZE", "10")
                .env("STREAMLING__RECORD_BATCH_INTERVAL_MS", "1000"),
        )
        .await
        .expect("Streamling execution failed");
    assert!(status.success());

    let landing_count = ctx
        .postgres
        .count("SELECT COUNT(*) FROM streamling.transfers")
        .await
        .expect("Failed to count landing table rows");
    assert_eq!(
        landing_count, 8,
        "Landing table should have exactly 8 rows (unique id+op combinations)"
    );

    let has_gs_op: Vec<(bool,)> = ctx
        .postgres
        .query(
            r#"SELECT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_schema = 'streamling'
AND table_name = 'transfers'
AND column_name = '_gs_op'
)"#,
        )
        .await
        .expect("Failed to check _gs_op column");
    // `SELECT EXISTS (...)` yields exactly one row; match on it instead of indexing.
    assert!(
        matches!(has_gs_op.as_slice(), [(true,)]),
        "Landing table should have _gs_op column in append-only mode"
    );

    let pk_columns: Vec<(String,)> = ctx
        .postgres
        .query(
            r#"SELECT a.attname
FROM pg_index i
JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
WHERE i.indrelid = '"streamling"."transfers"'::regclass AND i.indisprimary
ORDER BY a.attnum"#,
        )
        .await
        .expect("Failed to fetch primary key columns");
    // Compare as a sorted set: `ORDER BY a.attnum` follows table column order, and a
    // bare `contains` check would also accept a key with extra columns.
    let mut pk_col_names: Vec<String> = pk_columns.into_iter().map(|(name,)| name).collect();
    pk_col_names.sort_unstable();
    assert_eq!(
        pk_col_names,
        ["_gs_op", "id"],
        "Landing table should have composite PK (id, _gs_op)"
    );

    let agg_count = ctx
        .postgres
        .count("SELECT COUNT(*) FROM streamling.account_balances")
        .await
        .expect("Failed to count aggregation table rows");
    assert_eq!(agg_count, 2, "Aggregation table should have 2 accounts");

    // Fetch the whole table so a failure shows every account's balance at once.
    let balances: Vec<(String, i64)> = ctx
        .postgres
        .query("SELECT account, balance FROM streamling.account_balances ORDER BY account")
        .await
        .expect("Failed to fetch account balances");
    assert_eq!(
        balances,
        [("alice".to_string(), 120), ("bob".to_string(), 125)],
        "Alice's balance should be 120 (100 + 50 + 0 - 30) and Bob's 125 (200 + 75 - 100 - 50)"
    );
}
#[tokio::test]
async fn test_aggregation_deduplication() {
init_tracing();
let ctx = TestContext::new()
.await
.expect("Failed to create test context");
ctx.kafka
.register_schema(TRANSFER_SCHEMA)
.await
.expect("Failed to register schema");
let records = vec![
Transfer {
id: 1,
account: "alice".to_string(),
amount: 100,
},
Transfer {
id: 1,
account: "alice".to_string(),
amount: 100,
},
Transfer {
id: 1,
account: "alice".to_string(),
amount: 100,
},
Transfer {
id: 2,
account: "bob".to_string(),
amount: 50,
},
Transfer {
id: 2,
account: "bob".to_string(),
amount: 50,
},
];
ctx.kafka
.produce_avro_records(&records)
.await
.expect("Failed to produce records");
let pipeline = format!(
r#"
sources:
kafka_source:
type: kafka
topic: {topic}
starting_offsets: earliest
primary_key: id
transforms: {{}}
sinks:
postgres_agg:
type: postgres_aggregate
from: kafka_source
landing_table: transfers_dedup
agg_table: balances_dedup
schema: streamling
batch_flush_interval: 1s
batch_size: 1
primary_key: id
group_by:
account:
type: text
aggregate:
balance:
from: amount
fn: sum
"#,
topic = ctx.kafka_topic,
);
let status = ctx
.run_pipeline_with_opts(
&pipeline,
PipelineOpts::new()
.record_limit(5)
.env("STREAMLING__RECORD_BATCH_SIZE", "1")
.env("STREAMLING__RECORD_BATCH_INTERVAL_MS", "1000"),
)
.await
.expect("Streamling execution failed");
assert!(status.success());
let landing_count = ctx
.postgres
.count("SELECT COUNT(*) FROM streamling.transfers_dedup")
.await
.expect("Failed to count landing table rows");
assert_eq!(
landing_count, 2,
"Landing table should have 2 rows after deduplication (not 5)"
);
let agg_count = ctx
.postgres
.count("SELECT COUNT(*) FROM streamling.balances_dedup")
.await
.expect("Failed to count aggregation table rows");
assert_eq!(agg_count, 2, "Aggregation table should have 2 accounts");
let alice_balance: Vec<(String, i64)> = ctx
.postgres
.query("SELECT account, balance FROM streamling.balances_dedup WHERE account = 'alice'")
.await
.expect("Failed to fetch alice's balance");
assert_eq!(alice_balance.len(), 1);
assert_eq!(
alice_balance[0].1, 100,
"Alice's balance should be 100 (deduplicated, not 300)"
);
let bob_balance: Vec<(String, i64)> = ctx
.postgres
.query("SELECT account, balance FROM streamling.balances_dedup WHERE account = 'bob'")
.await
.expect("Failed to fetch bob's balance");
assert_eq!(bob_balance.len(), 1);
assert_eq!(
bob_balance[0].1, 50,
"Bob's balance should be 50 (deduplicated, not 100)"
);
}