use serde::Serialize;
use streamling_e2e::{init_tracing, PipelineOpts, TestContext};
/// Test record in its first-version shape: just `id` and `data`.
///
/// The field names mirror the fields declared in `SCHEMA_V1`. Instances are
/// serialized and sent to Kafka through `produce_avro_records` by the tests
/// in this file.
#[derive(Debug, Clone, Serialize)]
struct RecordV1 {
id: i64,
data: String,
}
/// Test record in its second-version shape: `RecordV1`'s fields plus `version`.
///
/// The field names mirror the fields declared in `SCHEMA_V2`.
#[derive(Debug, Clone, Serialize)]
struct RecordV2 {
id: i64,
data: String,
// Counterpart of the `version` int field that `SCHEMA_V2` adds.
version: i32,
}
/// First version of the `TestRecord` record schema (JSON): `id` (long) and
/// `data` (string). Passed to `register_schema` before `RecordV1` values are
/// produced.
const SCHEMA_V1: &str = r#"{
"type": "record",
"name": "TestRecord",
"fields": [
{"name": "id", "type": "long"},
{"name": "data", "type": "string"}
]
}"#;
/// Second version of the `TestRecord` record schema (JSON): the v1 fields plus
/// a `version` int field that declares a default of `1`.
const SCHEMA_V2: &str = r#"{
"type": "record",
"name": "TestRecord",
"fields": [
{"name": "id", "type": "long"},
{"name": "data", "type": "string"},
{"name": "version", "type": "int", "default": 1}
]
}"#;
/// Runs a Kafka -> Postgres pipeline over a topic that holds five records
/// written under `SCHEMA_V1` (ids 1-5) followed by five written under
/// `SCHEMA_V2` (ids 6-10), and checks that all ten rows land in
/// `public.schema_evolution_test`.
///
/// Only row counts are asserted; the stored value of the new `version`
/// column is not inspected by this test.
#[tokio::test]
async fn test_schema_evolution_new_field_with_default() {
    init_tracing();

    let env = TestContext::new()
        .await
        .expect("Failed to create test context");
    let broker = &env.kafka;

    // Phase 1: the writer still uses the original schema.
    broker
        .register_schema(SCHEMA_V1)
        .await
        .expect("Failed to register v1 schema");
    let mut old_shape: Vec<RecordV1> = Vec::new();
    for id in 1..=5 {
        old_shape.push(RecordV1 {
            id,
            data: format!("data_v1_{id}"),
        });
    }
    broker
        .produce_avro_records(&old_shape)
        .await
        .expect("Failed to produce v1 records");

    // Phase 2: the schema evolves and the writer starts emitting `version`.
    broker
        .register_schema(SCHEMA_V2)
        .await
        .expect("Failed to register v2 schema");
    let mut new_shape: Vec<RecordV2> = Vec::new();
    for id in 6..=10 {
        new_shape.push(RecordV2 {
            id,
            data: format!("data_v2_{id}"),
            version: 2,
        });
    }
    broker
        .produce_avro_records(&new_shape)
        .await
        .expect("Failed to produce v2 records");

    // The raw string below is runtime configuration and is kept verbatim.
    let topic = &env.kafka_topic;
    let pipeline = format!(
        r#"
sources:
kafka_source:
type: kafka
topic: {topic}
starting_offsets: earliest
primary_key: id
transforms: {{}}
sinks:
pg_sink:
type: postgres
from: kafka_source
table: schema_evolution_test
schema: public
primary_key: id
on_conflict: update
"#
    );

    // Stop after the ten records produced above, or give up after a minute.
    let opts = PipelineOpts::new()
        .record_limit(10)
        .timeout(std::time::Duration::from_secs(60));
    let status = env
        .run_pipeline_with_opts(&pipeline, opts)
        .await
        .expect("Streamling execution failed");
    assert!(status.success(), "Streamling should exit successfully");

    // (query, expected row count, message if the query fails, message if the count is off)
    let checks = [
        (
            "SELECT COUNT(*) FROM public.schema_evolution_test",
            10,
            "Failed to query count",
            "Should have processed all 10 records",
        ),
        (
            "SELECT COUNT(*) FROM public.schema_evolution_test WHERE id <= 5",
            5,
            "Failed to query v1 records",
            "Should have 5 v1 records (id 1-5)",
        ),
        (
            "SELECT COUNT(*) FROM public.schema_evolution_test WHERE id > 5",
            5,
            "Failed to query v2 records",
            "Should have 5 v2 records (id 6-10)",
        ),
    ];
    for (query, expected, query_failure, mismatch) in checks {
        let actual = env.postgres.count(query).await.expect(query_failure);
        assert_eq!(actual, expected, "{}", mismatch);
    }
}
/// Test user in its first-version shape: `id` and `name`, matching the fields
/// declared in `USER_SCHEMA_V1`.
#[derive(Debug, Clone, Serialize)]
struct UserV1 {
id: i64,
name: String,
}
/// Test user in its second-version shape: `UserV1`'s fields plus an optional
/// `email`, matching the fields declared in `USER_SCHEMA_V2`.
#[derive(Debug, Clone, Serialize)]
struct UserV2 {
id: i64,
name: String,
// Counterpart of the `["null", "string"]` field that `USER_SCHEMA_V2` adds.
email: Option<String>,
}
/// First version of the `User` record schema (JSON): `id` (long) and `name`
/// (string).
const USER_SCHEMA_V1: &str = r#"{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"}
]
}"#;
/// Second version of the `User` record schema (JSON): the v1 fields plus an
/// `email` field typed as a `null`/`string` union with a `null` default.
const USER_SCHEMA_V2: &str = r#"{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"},
{"name": "email", "type": ["null", "string"], "default": null}
]
}"#;
/// Runs a Kafka -> Postgres pipeline over a topic that holds three users
/// written under `USER_SCHEMA_V1` (ids 1-3) followed by three written under
/// `USER_SCHEMA_V2` (ids 4-6, each with an email), and checks that all six
/// rows land in `public.schema_evolution_nullable_test` with a name.
///
/// The `email` column itself is not queried by this test.
#[tokio::test]
async fn test_schema_evolution_nullable_field() {
    init_tracing();

    let harness = TestContext::new()
        .await
        .expect("Failed to create test context");
    let kafka = &harness.kafka;

    // Users written before the nullable field existed.
    kafka
        .register_schema(USER_SCHEMA_V1)
        .await
        .expect("Failed to register v1 schema");
    let without_email: Vec<UserV1> = (1..=3)
        .map(|n| UserV1 {
            id: n,
            name: format!("user_{n}"),
        })
        .collect();
    kafka
        .produce_avro_records(&without_email)
        .await
        .expect("Failed to produce v1 users");

    // Users written after the schema gained `email`.
    kafka
        .register_schema(USER_SCHEMA_V2)
        .await
        .expect("Failed to register v2 schema");
    let with_email: Vec<UserV2> = (4..=6)
        .map(|n| UserV2 {
            id: n,
            name: format!("user_{n}"),
            email: Some(format!("user{n}@example.com")),
        })
        .collect();
    kafka
        .produce_avro_records(&with_email)
        .await
        .expect("Failed to produce v2 users");

    // The raw string below is runtime configuration and is kept verbatim.
    let topic = &harness.kafka_topic;
    let pipeline = format!(
        r#"
sources:
kafka_source:
type: kafka
topic: {topic}
starting_offsets: earliest
primary_key: id
transforms: {{}}
sinks:
pg_sink:
type: postgres
from: kafka_source
table: schema_evolution_nullable_test
schema: public
primary_key: id
on_conflict: update
"#
    );

    // Stop after the six users produced above, or give up after a minute.
    let one_minute = std::time::Duration::from_secs(60);
    let opts = PipelineOpts::new().record_limit(6).timeout(one_minute);
    let status = harness
        .run_pipeline_with_opts(&pipeline, opts)
        .await
        .expect("Streamling execution failed");
    assert!(status.success(), "Streamling should exit successfully");

    let db = &harness.postgres;

    let total_rows = db
        .count("SELECT COUNT(*) FROM public.schema_evolution_nullable_test")
        .await
        .expect("Failed to query count");
    assert_eq!(total_rows, 6, "Should have processed all 6 records");

    // In a SQL LIKE pattern `_` matches any single character, so this accepts
    // every name that starts with "user" and has at least one more character.
    let named_rows = db
        .count(
            "SELECT COUNT(*) FROM public.schema_evolution_nullable_test WHERE name LIKE 'user_%'",
        )
        .await
        .expect("Failed to query names");
    assert_eq!(named_rows, 6, "All records should have names");
}
/// Expects the pipeline to exit unsuccessfully when a record written under a
/// schema registered after startup shows up and the source config leaves
/// `validate_writer_schema_ordering` unset.
///
/// Only `SCHEMA_V1` is registered (and nothing is produced) before the
/// pipeline is launched. Concurrently, after a fixed five-second delay,
/// `SCHEMA_V2` is registered and one v2 record is produced. The test then
/// asserts a non-success exit status and that stderr names the failure.
#[tokio::test]
async fn test_validate_writer_schema_ordering_default_fails_on_newer_writer() {
    init_tracing();

    let env = TestContext::new()
        .await
        .expect("Failed to create test context");
    let broker = &env.kafka;

    broker
        .register_schema(SCHEMA_V1)
        .await
        .expect("Failed to register v1 schema");

    // The raw string below is runtime configuration and is kept verbatim.
    let topic = &env.kafka_topic;
    let pipeline = format!(
        r#"
sources:
kafka_source:
type: kafka
topic: {topic}
starting_offsets: earliest
primary_key: id
# validate_writer_schema_ordering defaults to true
transforms: {{}}
sinks:
pg_sink:
type: postgres
from: kafka_source
table: writer_ahead_default_test
schema: public
primary_key: id
on_conflict: update
"#
    );

    let opts = PipelineOpts::new()
        .record_limit(10)
        .timeout(std::time::Duration::from_secs(30));
    // Not awaited yet: it is driven together with the schema change below.
    let run = env.run_pipeline_raw(&pipeline, opts);

    let evolve = async move {
        // Fixed delay before evolving the schema -- presumably long enough for
        // the pipeline to have started; this is timing-based (TODO confirm).
        tokio::time::sleep(std::time::Duration::from_secs(5)).await;
        broker
            .register_schema(SCHEMA_V2)
            .await
            .expect("Failed to register v2 schema");
        let ahead_of_reader = [RecordV2 {
            id: 42,
            data: String::from("newer-than-reader"),
            version: 2,
        }];
        broker
            .produce_avro_records(&ahead_of_reader)
            .await
            .expect("Failed to produce v2 record");
    };

    let (run_result, _) = tokio::join!(run, evolve);
    let output = run_result.expect("Pipeline future should not error (timeout?)");

    assert!(
        !output.status.success(),
        "Expected pipeline to fail fast when writer schema is ahead, got success status: {:?}\nstderr:\n{}",
        output.status,
        output.stderr,
    );

    // Either marker in stderr is accepted as evidence of the expected error.
    let names_the_failure = ["Schema resolution failed", "writer_schema_id"]
        .iter()
        .any(|marker| output.stderr.contains(*marker));
    assert!(
        names_the_failure,
        "Expected stderr to mention the writer-ahead error, got:\n{}",
        output.stderr,
    );
}
/// Expects the pipeline to succeed and store both records when the source
/// config sets `validate_writer_schema_ordering: false` and a record written
/// under a schema registered after startup shows up.
///
/// One v1 record is produced before launch. Concurrently with the pipeline,
/// after a fixed five-second delay, `SCHEMA_V2` is registered and one v2
/// record is produced. The run is limited to two records, and the test checks
/// that `public.writer_ahead_disabled_test` ends up with two rows.
#[tokio::test]
async fn test_validate_writer_schema_ordering_disabled_processes_newer_writer() {
    init_tracing();

    let env = TestContext::new()
        .await
        .expect("Failed to create test context");
    let broker = &env.kafka;

    broker
        .register_schema(SCHEMA_V1)
        .await
        .expect("Failed to register v1 schema");
    let baseline = [RecordV1 {
        id: 1,
        data: String::from("v1-baseline"),
    }];
    broker
        .produce_avro_records(&baseline)
        .await
        .expect("Failed to produce v1 record");

    // The raw string below is runtime configuration and is kept verbatim.
    let topic = &env.kafka_topic;
    let pipeline = format!(
        r#"
sources:
kafka_source:
type: kafka
topic: {topic}
starting_offsets: earliest
primary_key: id
validate_writer_schema_ordering: false
transforms: {{}}
sinks:
pg_sink:
type: postgres
from: kafka_source
table: writer_ahead_disabled_test
schema: public
primary_key: id
on_conflict: update
"#
    );

    let opts = PipelineOpts::new()
        .record_limit(2)
        .timeout(std::time::Duration::from_secs(60));
    // Not awaited yet: it is driven together with the schema change below.
    let run = env.run_pipeline_with_opts(&pipeline, opts);

    let evolve = async move {
        // Fixed delay before evolving the schema -- presumably long enough for
        // the pipeline to have started; this is timing-based (TODO confirm).
        tokio::time::sleep(std::time::Duration::from_secs(5)).await;
        broker
            .register_schema(SCHEMA_V2)
            .await
            .expect("Failed to register v2 schema");
        let after_startup = [RecordV2 {
            id: 2,
            data: String::from("v2-after-startup"),
            version: 99,
        }];
        broker
            .produce_avro_records(&after_startup)
            .await
            .expect("Failed to produce v2 record");
    };

    let (run_result, _) = tokio::join!(run, evolve);
    let status = run_result.expect("Pipeline future should not error");
    assert!(
        status.success(),
        "Expected pipeline to succeed with validate_writer_schema_ordering=false, got: {status:?}"
    );

    let stored_rows = env
        .postgres
        .count("SELECT COUNT(*) FROM public.writer_ahead_disabled_test")
        .await
        .expect("Failed to query count");
    assert_eq!(stored_rows, 2, "Should have processed both v1 and v2 records");
}