use serde::Serialize;
use streamling_e2e::{init_tracing, PipelineOpts, TestContext};
/// Record type that the tests in this file build and hand to
/// `ctx.kafka.produce_avro_records`.
///
/// The field names (`id`, `value`, `timestamp`) match, by name, the fields declared in
/// `TEST_SCHEMA`; `Serialize` is derived so the harness can encode instances.
#[derive(Debug, Clone, Serialize)]
struct TestRecord {
    /// Record identifier. The tests assign consecutive values starting at 1, and the
    /// pipeline configs in this file name `id` as the source's `primary_key`.
    id: i64,
    /// Text payload; the tests fill it with `value_<id>`.
    value: String,
    /// Numeric timestamp; the tests fill it with `1000 + id` (unit is not specified here).
    timestamp: i64,
}
/// Schema text registered through `ctx.kafka.register_schema` at the start of every test in
/// this file, before any records are produced.
///
/// It declares a record named `TestRecord` with `long` fields `id` and `timestamp` and a
/// `string` field `value`, mirroring the `i64`/`String` fields of the Rust `TestRecord`
/// struct. Presumably an Avro record schema, given that records are sent with
/// `produce_avro_records` — confirm against the harness.
const TEST_SCHEMA: &str = r#"{
"type": "record",
"name": "TestRecord",
"fields": [
{"name": "id", "type": "long"},
{"name": "value", "type": "string"},
{"name": "timestamp", "type": "long"}
]
}"#;
/// Runs a Kafka-source to print-sink pipeline over 100 freshly produced records.
///
/// The pipeline is launched with `record_limit` equal to the number of records produced and
/// a 60-second timeout. The test passes when the returned status reports success.
#[tokio::test]
async fn test_print_sink_basic() {
    init_tracing();

    let total: i64 = 100;
    // Builds the record for one sequence number; ids run from 1 through `total`.
    let make_record = |seq: i64| TestRecord {
        id: seq,
        value: format!("value_{}", seq),
        timestamp: 1000 + seq,
    };

    let ctx = TestContext::new()
        .await
        .expect("Failed to create test context");

    // The schema is registered before anything is produced.
    let registration = ctx.kafka.register_schema(TEST_SCHEMA).await;
    registration.expect("Failed to register schema");

    let batch: Vec<TestRecord> = (1..=total).map(make_record).collect();
    let produced = ctx.kafka.produce_avro_records(&batch).await;
    produced.expect("Failed to produce records");

    // NOTE(review): the config text is passed verbatim; if the pipeline format is
    // indentation-sensitive (it looks like YAML), confirm the nesting inside this literal.
    let config = format!(
        r#"
sources:
kafka_source:
type: kafka
topic: {topic}
starting_offsets: earliest
primary_key: id
transforms: {{}}
sinks:
print_sink:
type: print
from: kafka_source
"#,
        topic = ctx.kafka_topic,
    );

    let deadline = std::time::Duration::from_secs(60);
    let opts = PipelineOpts::new()
        .record_limit(total as u64)
        .timeout(deadline);

    let outcome = ctx.run_pipeline_with_opts(&config, opts).await;
    let status = outcome.expect("Streamling execution failed");
    assert!(status.success(), "Streamling should exit successfully");
}
/// Runs a Kafka-source to print-sink pipeline that is asked to stop at 50 while 100 records
/// are available.
///
/// The value 50 is passed as the second argument of `run_pipeline`; presumably it is a
/// record limit (the assertion message speaks of an "early stop") — confirm against the
/// harness. The test passes when the returned status reports success.
#[tokio::test]
async fn test_print_sink_early_exit() {
    init_tracing();

    let total: i64 = 100;
    let limit = 50;
    // Builds the record for one sequence number; ids run from 1 through `total`.
    let make_record = |seq: i64| TestRecord {
        id: seq,
        value: format!("value_{}", seq),
        timestamp: 1000 + seq,
    };

    let ctx = TestContext::new()
        .await
        .expect("Failed to create test context");

    // The schema is registered before anything is produced.
    let registration = ctx.kafka.register_schema(TEST_SCHEMA).await;
    registration.expect("Failed to register schema");

    let batch: Vec<TestRecord> = (1..=total).map(make_record).collect();
    let produced = ctx.kafka.produce_avro_records(&batch).await;
    produced.expect("Failed to produce records");

    // NOTE(review): the config text is passed verbatim; if the pipeline format is
    // indentation-sensitive (it looks like YAML), confirm the nesting inside this literal.
    let config = format!(
        r#"
sources:
kafka_source:
type: kafka
topic: {topic}
starting_offsets: earliest
primary_key: id
transforms: {{}}
sinks:
print_sink:
type: print
from: kafka_source
"#,
        topic = ctx.kafka_topic,
    );

    let outcome = ctx.run_pipeline(&config, limit).await;
    let status = outcome.expect("Streamling execution failed");
    assert!(
        status.success(),
        "Streamling should exit successfully even with early stop"
    );
}
/// Runs a Kafka-source to print-sink pipeline over 500 freshly produced records.
///
/// Uses a larger input than the basic print-sink test; the batch size that makes this span
/// "multiple batches" is not visible in this file. The pipeline is launched with
/// `record_limit` equal to the number of records produced and a 60-second timeout, and the
/// test passes when the returned status reports success.
#[tokio::test]
async fn test_print_sink_multiple_batches() {
    init_tracing();

    let total: i64 = 500;
    // Builds the record for one sequence number; ids run from 1 through `total`.
    let make_record = |seq: i64| TestRecord {
        id: seq,
        value: format!("value_{}", seq),
        timestamp: 1000 + seq,
    };

    let ctx = TestContext::new()
        .await
        .expect("Failed to create test context");

    // The schema is registered before anything is produced.
    let registration = ctx.kafka.register_schema(TEST_SCHEMA).await;
    registration.expect("Failed to register schema");

    let batch: Vec<TestRecord> = (1..=total).map(make_record).collect();
    let produced = ctx.kafka.produce_avro_records(&batch).await;
    produced.expect("Failed to produce records");

    // NOTE(review): the config text is passed verbatim; if the pipeline format is
    // indentation-sensitive (it looks like YAML), confirm the nesting inside this literal.
    let config = format!(
        r#"
sources:
kafka_source:
type: kafka
topic: {topic}
starting_offsets: earliest
primary_key: id
transforms: {{}}
sinks:
print_sink:
type: print
from: kafka_source
"#,
        topic = ctx.kafka_topic,
    );

    let deadline = std::time::Duration::from_secs(60);
    let opts = PipelineOpts::new()
        .record_limit(total as u64)
        .timeout(deadline);

    let outcome = ctx.run_pipeline_with_opts(&config, opts).await;
    let status = outcome.expect("Streamling execution failed");
    assert!(status.success(), "Streamling should exit successfully");
}
/// Runs a Kafka-source to blackhole-sink pipeline over 500 freshly produced records.
///
/// Same flow as the print-sink tests, except the sink is declared with `type: blackhole`.
/// The pipeline is launched with `record_limit` equal to the number of records produced and
/// a 60-second timeout, and the test passes when the returned status reports success.
#[tokio::test]
async fn test_blackhole_sink() {
    init_tracing();

    let total: i64 = 500;
    // Builds the record for one sequence number; ids run from 1 through `total`.
    let make_record = |seq: i64| TestRecord {
        id: seq,
        value: format!("value_{}", seq),
        timestamp: 1000 + seq,
    };

    let ctx = TestContext::new()
        .await
        .expect("Failed to create test context");

    // The schema is registered before anything is produced.
    let registration = ctx.kafka.register_schema(TEST_SCHEMA).await;
    registration.expect("Failed to register schema");

    let batch: Vec<TestRecord> = (1..=total).map(make_record).collect();
    let produced = ctx.kafka.produce_avro_records(&batch).await;
    produced.expect("Failed to produce records");

    // NOTE(review): the config text is passed verbatim; if the pipeline format is
    // indentation-sensitive (it looks like YAML), confirm the nesting inside this literal.
    let config = format!(
        r#"
sources:
kafka_source:
type: kafka
topic: {topic}
starting_offsets: earliest
primary_key: id
transforms: {{}}
sinks:
blackhole_sink:
type: blackhole
from: kafka_source
"#,
        topic = ctx.kafka_topic,
    );

    let deadline = std::time::Duration::from_secs(60);
    let opts = PipelineOpts::new()
        .record_limit(total as u64)
        .timeout(deadline);

    let outcome = ctx.run_pipeline_with_opts(&config, opts).await;
    let status = outcome.expect("Streamling execution failed");
    assert!(status.success(), "Streamling should exit successfully");
}