use serde::{Deserialize, Serialize};
use streamling_e2e::{init_tracing, PipelineOpts, TestContext};
/// One Kafka message matching the `TEST_SCHEMA` Avro record.
///
/// Field names and their order mirror the schema's `fields` list.
#[derive(Clone, Debug, Serialize)]
struct TestRecord {
    /// Avro `long` field `block`.
    block: i64,
    /// Avro `string` field `id`.
    id: String,
    /// Avro `string` field `data`.
    data: String,
}
/// Avro schema (record `TestMessage`) registered by the tests that produce
/// [`TestRecord`] values: `block` (long), `id` (string), `data` (string).
const TEST_SCHEMA: &str = r#"{
"type": "record",
"name": "TestMessage",
"fields": [
{"name": "block", "type": "long"},
{"name": "id", "type": "string"},
{"name": "data", "type": "string"}
]
}"#;
/// Runs a pipeline whose Kafka source declares a `primary_key` column that is
/// not in the source schema.
///
/// The run must fail, and its output must name the node, the missing column,
/// and the columns that are available.
#[tokio::test]
async fn test_pipeline_primary_key_check() {
    init_tracing();
    let ctx = TestContext::new()
        .await
        .expect("Failed to create test context");
    ctx.kafka
        .register_schema(TEST_SCHEMA)
        .await
        .expect("Failed to register schema");
    let records: Vec<TestRecord> = (0..10)
        .map(|i| TestRecord {
            block: i,
            id: format!("id_{}", i),
            data: format!("data{}", i),
        })
        .collect();
    ctx.kafka
        .produce_avro_records(&records)
        .await
        .expect("Failed to produce records");
    // `does_not_exist` is deliberately absent from TEST_SCHEMA.
    let pipeline = format!(
        r#"
sources:
  test_kafka_source:
    type: kafka
    topic: {topic}
    primary_key: does_not_exist
transforms: {{}}
sinks:
  blackhole_sink:
    type: blackhole
    from: test_kafka_source
"#,
        topic = ctx.kafka_topic
    );
    let output = ctx
        .run_pipeline_raw(&pipeline, PipelineOpts::new().record_limit(10))
        .await
        .expect("Failed to run pipeline");
    assert!(
        !output.status.success(),
        "Pipeline should have failed due to invalid primary key"
    );
    let combined_output = format!("{}\n{}", output.stdout, output.stderr);
    // Check each fragment on its own so a failure reports exactly which part
    // of the expected error is missing.
    let expected_fragments = [
        "Primary key validation failed for node 'test_kafka_source'",
        "columns [\"does_not_exist\"] not found in schema",
        "Available columns: [\"block\", \"id\", \"data\", \"_gs_op\"]",
    ];
    for &fragment in &expected_fragments {
        assert!(
            combined_output.contains(fragment),
            "Expected primary key validation error containing {:?}, got stdout:\n{}\nstderr:\n{}",
            fragment,
            output.stdout,
            output.stderr
        );
    }
}
/// Feeds the pipeline a SQL transform with misspelled keywords
/// (`SELEKT ... FORM ...`).
///
/// The run must fail, and the combined stdout/stderr must mention SQL in
/// either all-upper or all-lower case.
#[tokio::test]
async fn test_pipeline_invalid_sql_transform() {
    init_tracing();

    let ctx = TestContext::new()
        .await
        .expect("Failed to create test context");
    ctx.kafka
        .register_schema(TEST_SCHEMA)
        .await
        .expect("Failed to register schema");

    let mut records: Vec<TestRecord> = Vec::new();
    for n in 0..5 {
        records.push(TestRecord {
            block: n,
            id: format!("id_{}", n),
            data: format!("data{}", n),
        });
    }
    ctx.kafka
        .produce_avro_records(&records)
        .await
        .expect("Failed to produce records");

    // The `bad_sql` transform is intentionally unparsable.
    let pipeline = format!(
        r#"
sources:
  test_kafka_source:
    type: kafka
    topic: {topic}
    primary_key: id
transforms:
  bad_sql:
    type: sql
    sql: "SELEKT id, data FORM test_kafka_source"
    primary_key: id
sinks:
  blackhole_sink:
    type: blackhole
    from: bad_sql
"#,
        topic = ctx.kafka_topic
    );

    let output = ctx
        .run_pipeline_raw(&pipeline, PipelineOpts::new().record_limit(5))
        .await
        .expect("Failed to run pipeline");

    let run_failed = !output.status.success();
    assert!(run_failed, "Pipeline should have failed due to invalid SQL");

    let combined_output = format!("{}\n{}", output.stdout, output.stderr);
    let mentions_sql = ["SQL", "sql"]
        .iter()
        .any(|&needle| combined_output.contains(needle));
    assert!(
        mentions_sql,
        "Error should mention SQL, got stdout:\n{}\nstderr:\n{}",
        output.stdout,
        output.stderr
    );
}
/// Calls `dynamic_table_check` with a table name that no node in the
/// pipeline defines.
///
/// The run must fail with an error that names the table and says it is not
/// defined in the pipeline topology.
#[tokio::test]
async fn test_pipeline_undefined_dynamic_table() {
    init_tracing();

    let ctx = TestContext::new()
        .await
        .expect("Failed to create test context");
    ctx.kafka
        .register_schema(TEST_SCHEMA)
        .await
        .expect("Failed to register schema");

    let records = (0..5)
        .map(|n| TestRecord {
            block: n,
            id: format!("id_{}", n),
            data: format!("data{}", n),
        })
        .collect::<Vec<TestRecord>>();
    ctx.kafka
        .produce_avro_records(&records)
        .await
        .expect("Failed to produce records");

    // `nonexistent_table` is not the name of any source or transform below.
    let topic = &ctx.kafka_topic;
    let pipeline = format!(
        r#"
sources:
  test_kafka_source:
    type: kafka
    topic: {topic}
    primary_key: id
transforms:
  filtered:
    type: sql
    sql: "SELECT id, data FROM test_kafka_source WHERE dynamic_table_check('nonexistent_table', id)"
    primary_key: id
sinks:
  blackhole_sink:
    type: blackhole
    from: filtered
"#,
        topic = topic
    );

    let output = ctx
        .run_pipeline_raw(&pipeline, PipelineOpts::new().record_limit(5))
        .await
        .expect("Failed to run pipeline");
    assert!(
        !output.status.success(),
        "Pipeline should have failed due to undefined dynamic table"
    );

    let combined_output = format!("{}\n{}", output.stdout, output.stderr);
    let names_table = combined_output.contains("dynamic table 'nonexistent_table'");
    let names_topology = combined_output.contains("not defined in pipeline topology");
    assert!(
        names_table && names_topology,
        "Error should mention the undefined dynamic table, got stdout:\n{}\nstderr:\n{}",
        output.stdout,
        output.stderr
    );
}
/// JSON report the pipeline prints on stdout when run with `--validate`.
#[derive(Debug, Deserialize)]
struct ValidationOutput {
    /// Whether validation itself completed. The tests in this file expect
    /// `true` for user-facing validation errors and `false` for internal errors.
    success: bool,
    /// Whether the pipeline definition passed validation.
    is_valid: bool,
    /// Validation error messages.
    errors: Vec<String>,
    /// Validation warnings. Deserialized so a missing field fails parsing,
    /// but no test reads it yet. The `dead_code` allow is therefore scoped to
    /// this field instead of the whole struct, so the other fields stay linted.
    #[allow(dead_code)]
    warnings: Vec<String>,
}
/// Runs the invalid-primary-key pipeline with `--validate` and checks the
/// JSON report on stdout.
///
/// The report must show that validation ran (`success`), that the pipeline
/// is rejected (`is_valid == false`), and that the error names the missing
/// column and the available ones.
#[tokio::test]
async fn test_pipeline_validate_json_output() {
    init_tracing();
    let ctx = TestContext::new()
        .await
        .expect("Failed to create test context");
    ctx.kafka
        .register_schema(TEST_SCHEMA)
        .await
        .expect("Failed to register schema");
    let records: Vec<TestRecord> = (0..10)
        .map(|i| TestRecord {
            block: i,
            id: format!("id_{}", i),
            data: format!("data{}", i),
        })
        .collect();
    ctx.kafka
        .produce_avro_records(&records)
        .await
        .expect("Failed to produce records");
    // `does_not_exist` is deliberately absent from TEST_SCHEMA.
    let pipeline = format!(
        r#"
sources:
  test_kafka_source:
    type: kafka
    topic: {topic}
    primary_key: does_not_exist
transforms: {{}}
sinks:
  blackhole_sink:
    type: blackhole
    from: test_kafka_source
"#,
        topic = ctx.kafka_topic
    );
    let output = ctx
        .run_pipeline_raw(
            &pipeline,
            PipelineOpts::new().record_limit(10).arg("--validate"),
        )
        .await
        .expect("Failed to run pipeline");
    assert!(
        !output.status.success(),
        "Pipeline with invalid primary key should exit non-zero under --validate"
    );
    // When stdout holds no JSON, stderr may carry the only explanation, so
    // print both.
    let validation: ValidationOutput = serde_json::from_str(&output.stdout).unwrap_or_else(|e| {
        panic!(
            "Failed to parse validation JSON from stdout: {}\nstdout was:\n{}\nstderr was:\n{}",
            e, output.stdout, output.stderr
        )
    });
    assert!(
        validation.success,
        "User-facing validation error → success should be true (validation ran)"
    );
    assert!(
        !validation.is_valid,
        "Pipeline has errors → is_valid should be false"
    );
    assert!(
        !validation.errors.is_empty(),
        "Expected at least one error entry"
    );
    let all_errors = validation.errors.join("\n");
    assert!(
        all_errors.contains("Primary key validation failed for node 'test_kafka_source': columns [\"does_not_exist\"] not found in schema. Available columns: [\"block\", \"id\", \"data\", \"_gs_op\"]"),
        "Errors should mention the primary key issue, got: {:?}",
        validation.errors
    );
}
/// Avro schema (record `BlockRecord`) matching [`BlockRecord`]:
/// `block_number` (long), `id` (string), `data` (string).
/// There is no column called `number`, which the source-filter test relies on.
const BLOCK_NUMBER_SCHEMA: &str = r#"{
"type": "record",
"name": "BlockRecord",
"fields": [
{"name": "block_number", "type": "long"},
{"name": "id", "type": "string"},
{"name": "data", "type": "string"}
]
}"#;
/// One Kafka message matching the `BLOCK_NUMBER_SCHEMA` Avro record.
///
/// Field names and their order mirror the schema's `fields` list.
#[derive(Clone, Debug, Serialize)]
struct BlockRecord {
    /// Avro `long` field `block_number`.
    block_number: i64,
    /// Avro `string` field `id`.
    id: String,
    /// Avro `string` field `data`.
    data: String,
}
/// Validates (`--validate`) a Kafka source whose `filter` refers to a column
/// `number` that the schema does not have; the schema has `block_number`.
///
/// The report must mark the run as an internal failure (`success == false`),
/// name the Kafka source, suggest `block_number`, and report the schema error
/// exactly once.
#[tokio::test]
async fn test_invalid_column_in_source_filter() {
    init_tracing();
    let ctx = TestContext::new()
        .await
        .expect("Failed to create test context");
    ctx.kafka
        .register_schema(BLOCK_NUMBER_SCHEMA)
        .await
        .expect("Failed to register schema");
    let records: Vec<BlockRecord> = (0..5)
        .map(|i| BlockRecord {
            block_number: 27535200 + i,
            id: format!("id_{}", i),
            data: format!("data{}", i),
        })
        .collect();
    ctx.kafka
        .produce_avro_records(&records)
        .await
        .expect("Failed to produce records");
    // `number` is intentionally wrong; the real column is `block_number`.
    let pipeline = format!(
        r#"
sources:
  blocks:
    type: kafka
    topic: {topic}
    filter: number > 27535200 and number < 27568220
transforms: {{}}
sinks:
  blackhole_sink:
    type: blackhole
    from: blocks
"#,
        topic = ctx.kafka_topic
    );
    let output = ctx
        .run_pipeline_raw(
            &pipeline,
            PipelineOpts::new().record_limit(5).arg("--validate"),
        )
        .await
        .expect("Failed to run pipeline");
    assert!(
        !output.status.success(),
        "Pipeline with invalid filter column should exit non-zero"
    );
    // When stdout holds no JSON, stderr may carry the only explanation, so
    // print both.
    let validation: ValidationOutput = serde_json::from_str(&output.stdout).unwrap_or_else(|e| {
        panic!(
            "Failed to parse validation JSON from stdout: {}\nstdout was:\n{}\nstderr was:\n{}",
            e, output.stdout, output.stderr
        )
    });
    assert!(
        !validation.success,
        "Internal error → success should be false (validation could not complete)"
    );
    assert!(
        !validation.is_valid,
        "Pipeline has errors → is_valid should be false"
    );
    assert!(
        !validation.errors.is_empty(),
        "Expected at least one error entry"
    );
    let all_errors = validation.errors.join("\n");
    assert!(
        all_errors.contains("kafka source 'blocks': failed to create Kafka source"),
        "Error should mention the Kafka source context, got: {:?}",
        validation.errors
    );
    // Separate checks so a failure says whether the missing-column message or
    // the suggestion is absent.
    assert!(
        all_errors.contains("No field named number"),
        "Error should mention the missing column 'number', got: {:?}",
        validation.errors
    );
    assert!(
        all_errors.contains("Did you mean 'block_number'"),
        "Error should suggest 'block_number' for the missing column 'number', got: {:?}",
        validation.errors
    );
    let schema_error_count = all_errors
        .matches("Schema error: No field named number")
        .count();
    assert_eq!(
        schema_error_count, 1,
        "Schema error should appear exactly once (no duplicates), got: {:?}",
        validation.errors
    );
}
/// Avro schema (record `TraceRecord`) matching [`TraceRecord`]:
/// `id` (long), `call_type` (string), `value_str` (string).
const U256_FILTER_SCHEMA: &str = r#"{
"type": "record",
"name": "TraceRecord",
"fields": [
{"name": "id", "type": "long"},
{"name": "call_type", "type": "string"},
{"name": "value_str", "type": "string"}
]
}"#;
/// One Kafka message matching the `U256_FILTER_SCHEMA` Avro record.
///
/// Field names and their order mirror the schema's `fields` list.
#[derive(Clone, Debug, Serialize)]
struct TraceRecord {
    /// Avro `long` field `id`.
    id: i64,
    /// Avro `string` field `call_type`.
    call_type: String,
    /// Avro `string` field `value_str`; the u256 test's SQL passes it to `to_u256`.
    value_str: String,
}
/// Validates (`--validate`) a two-step SQL pipeline.
///
/// `with_amount` converts `value_str` with `to_u256`, and `filtered` keeps
/// rows where `call_type <> 'delegatecall' AND amount > 0`. Validation must
/// succeed with no errors and the process must exit zero: combining the u256
/// comparison with another boolean predicate must not corrupt the SQL.
#[tokio::test]
async fn test_validate_u256_comparison_with_boolean_predicate() {
    init_tracing();

    let ctx = TestContext::new()
        .await
        .expect("Failed to create test context");
    ctx.kafka
        .register_schema(U256_FILTER_SCHEMA)
        .await
        .expect("Failed to register schema");

    // (id, call_type, value_str)
    let rows = [
        (1, "call", "1000"),
        (2, "delegatecall", "500"),
        (3, "call", "0"),
    ];
    let records: Vec<TraceRecord> = rows
        .iter()
        .map(|&(id, call_type, value_str)| TraceRecord {
            id,
            call_type: call_type.to_string(),
            value_str: value_str.to_string(),
        })
        .collect();
    ctx.kafka
        .produce_avro_records(&records)
        .await
        .expect("Failed to produce records");

    let pipeline = format!(
        r#"
sources:
  kafka_source:
    type: kafka
    topic: {topic}
    starting_offsets: earliest
    primary_key: id
transforms:
  with_amount:
    type: sql
    sql: "SELECT id, call_type, to_u256(value_str) as amount FROM kafka_source"
    primary_key: id
  filtered:
    type: sql
    sql: "SELECT id, call_type, amount FROM with_amount WHERE call_type <> 'delegatecall' AND amount > 0"
    primary_key: id
sinks:
  blackhole_sink:
    type: blackhole
    from: filtered
"#,
        topic = ctx.kafka_topic
    );

    let output = ctx
        .run_pipeline_raw(
            &pipeline,
            PipelineOpts::new().record_limit(3).arg("--validate"),
        )
        .await
        .expect("Failed to run pipeline");

    // Parse before checking the exit status so a failure shows the reported errors.
    let validation = match serde_json::from_str::<ValidationOutput>(&output.stdout) {
        Ok(parsed) => parsed,
        Err(e) => panic!(
            "Failed to parse validation JSON from stdout: {}\nstdout was:\n{}\nstderr was:\n{}",
            e, output.stdout, output.stderr
        ),
    };

    assert!(
        validation.success,
        "Validation should have run successfully, got errors: {:?}",
        validation.errors
    );
    assert!(
        validation.is_valid,
        "Pipeline should be valid — u256 comparison combined with boolean predicate \
         must not corrupt the SQL. Errors: {:?}",
        validation.errors
    );
    assert!(
        validation.errors.is_empty(),
        "Expected no validation errors, got: {:?}",
        validation.errors
    );
    assert!(
        output.status.success(),
        "Pipeline should exit zero when validation passes.\nstdout:\n{}\nstderr:\n{}",
        output.stdout,
        output.stderr
    );
}