use axum::http::StatusCode;
use serde::Serialize;
use streamling_e2e::resources::WebhookResource;
use streamling_e2e::{init_tracing, PipelineOpts, TestContext};
#[derive(Debug, Clone, Serialize)]
struct TestRecord {
id: i64,
value: String,
timestamp: i64,
}
const TEST_SCHEMA: &str = r#"{
"type": "record",
"name": "TestRecord",
"fields": [
{"name": "id", "type": "long"},
{"name": "value", "type": "string"},
{"name": "timestamp", "type": "long"}
]
}"#;
#[tokio::test]
async fn test_webhook_sink_basic() {
init_tracing();
let ctx = TestContext::new()
.await
.expect("Failed to create test context");
ctx.kafka
.register_schema(TEST_SCHEMA)
.await
.expect("Failed to register schema");
let webhook = WebhookResource::new()
.await
.expect("Failed to start webhook server");
let records_to_produce = 100;
let records: Vec<TestRecord> = (1..=records_to_produce)
.map(|i| TestRecord {
id: i,
value: format!("value_{}", i),
timestamp: 1000 + i,
})
.collect();
ctx.kafka
.produce_avro_records(&records)
.await
.expect("Failed to produce records");
let pipeline = format!(
r#"
sources:
kafka_source:
type: kafka
topic: {topic}
starting_offsets: earliest
primary_key: id
transforms: {{}}
sinks:
webhook_sink:
type: webhook
from: kafka_source
url: {webhook_url}
one_row_per_request: true
payload_version: 0
"#,
topic = ctx.kafka_topic,
webhook_url = webhook.webhook_url(),
);
let status = ctx
.run_pipeline(&pipeline, records_to_produce as u64)
.await
.expect("Streamling execution failed");
assert!(status.success(), "Streamling should exit successfully");
let received_all = webhook
.wait_for_requests(
records_to_produce as usize,
std::time::Duration::from_secs(10),
)
.await;
assert!(
received_all,
"Expected {} webhook requests, got {}",
records_to_produce,
webhook.request_count()
);
assert_eq!(
webhook.request_count(),
records_to_produce as usize,
"Should have received {} webhook requests",
records_to_produce
);
}
#[tokio::test]
async fn test_webhook_sink_multiple_batches() {
init_tracing();
let ctx = TestContext::new()
.await
.expect("Failed to create test context");
ctx.kafka
.register_schema(TEST_SCHEMA)
.await
.expect("Failed to register schema");
let webhook = WebhookResource::new()
.await
.expect("Failed to start webhook server");
let records_to_produce = 500;
let records: Vec<TestRecord> = (1..=records_to_produce)
.map(|i| TestRecord {
id: i,
value: format!("value_{}", i),
timestamp: 1000 + i,
})
.collect();
ctx.kafka
.produce_avro_records(&records)
.await
.expect("Failed to produce records");
let pipeline = format!(
r#"
sources:
kafka_source:
type: kafka
topic: {topic}
starting_offsets: earliest
primary_key: id
transforms: {{}}
sinks:
webhook_sink:
type: webhook
from: kafka_source
url: {webhook_url}
one_row_per_request: true
payload_version: 0
"#,
topic = ctx.kafka_topic,
webhook_url = webhook.webhook_url(),
);
let status = ctx
.run_pipeline_with_opts(
&pipeline,
PipelineOpts::new()
.record_limit(records_to_produce as u64)
.timeout(std::time::Duration::from_secs(120)),
)
.await
.expect("Streamling execution failed");
assert!(status.success(), "Streamling should exit successfully");
let received_all = webhook
.wait_for_requests(
records_to_produce as usize,
std::time::Duration::from_secs(30),
)
.await;
assert!(
received_all,
"Expected {} webhook requests, got {}",
records_to_produce,
webhook.request_count()
);
}
#[tokio::test]
async fn test_webhook_sink_skip_on_error_continues_after_500() {
init_tracing();
let ctx = TestContext::new()
.await
.expect("Failed to create test context");
ctx.kafka
.register_schema(TEST_SCHEMA)
.await
.expect("Failed to register schema");
let webhook = WebhookResource::new_with_response_plan(vec![
StatusCode::INTERNAL_SERVER_ERROR,
StatusCode::OK,
StatusCode::OK,
])
.await
.expect("Failed to start webhook server");
let records: Vec<TestRecord> = (1..=3)
.map(|i| TestRecord {
id: i,
value: format!("value_{}", i),
timestamp: 1000 + i,
})
.collect();
ctx.kafka
.produce_avro_records(&records)
.await
.expect("Failed to produce records");
let pipeline = format!(
r#"
sources:
kafka_source:
type: kafka
topic: {topic}
starting_offsets: earliest
primary_key: id
transforms: {{}}
sinks:
webhook_sink:
type: webhook
from: kafka_source
url: {webhook_url}
one_row_per_request: true
payload_version: 0
skip_on_error: true
"#,
topic = ctx.kafka_topic,
webhook_url = webhook.webhook_url(),
);
let status = ctx
.run_pipeline_with_opts(
&pipeline,
PipelineOpts::new()
.record_limit(records.len() as u64)
.env("STREAMLING__RECORD_BATCH_SIZE", "1"),
)
.await
.expect("Streamling execution failed");
assert!(status.success(), "Streamling should exit successfully");
let received_all = webhook
.wait_for_requests(records.len(), std::time::Duration::from_secs(10))
.await;
assert!(
received_all,
"Expected {} webhook requests, got {}",
records.len(),
webhook.request_count()
);
}