pub mod inspect;
pub mod resources;
pub mod streamling;
use resources::{
ClickHouseResource, KafkaResource, MySqlResource, PostgresResource, PrometheusResource,
SqsResource,
};
use std::env;
use std::path::PathBuf;
use std::process::ExitStatus;
use tempfile::TempDir;
use thiserror::Error;
use tracing::info;
use uuid::Uuid;
pub use resources::{PrintSinkOutput, PrintSinkRow};
pub use streamling::StreamlingOutput;
#[derive(Error, Debug)]
pub enum E2eError {
#[error("Environment variable not set: {0}")]
EnvVarNotSet(String),
#[error("PostgreSQL error: {0}")]
Postgres(#[from] sqlx::Error),
#[error("Kafka error: {0}")]
Kafka(String),
#[error("ClickHouse error: {0}")]
ClickHouse(String),
#[error("MySQL error: {0}")]
Mysql(String),
#[error("Prometheus error: {0}")]
Prometheus(String),
#[error("SQS error: {0}")]
Sqs(String),
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
#[error("YAML error: {0}")]
Yaml(#[from] serde_yaml::Error),
#[error("Streamling execution failed: {0}")]
StreamlingFailed(String),
}
pub type Result<T> = std::result::Result<T, E2eError>;
/// Connection settings for the external services used by the e2e suite,
/// read from `E2E_*` environment variables by [`E2eConfig::from_env`].
#[derive(Debug, Clone)]
pub struct E2eConfig {
    /// `E2E_POSTGRES_URL` (required).
    pub postgres_url: String,
    /// `E2E_KAFKA_BROKER` (required).
    pub kafka_broker: String,
    /// `E2E_SCHEMA_REGISTRY_URL` (required).
    pub schema_registry_url: String,
    /// `E2E_CLICKHOUSE_URL` (required, even for tests that do not use ClickHouse).
    pub clickhouse_url: String,
    /// `E2E_MYSQL_URL` (optional).
    pub mysql_url: Option<String>,
    /// `E2E_PROMETHEUS_URL` (optional).
    pub prometheus_url: Option<String>,
    /// `E2E_SQS_URL` (optional).
    pub sqs_url: Option<String>,
    /// `E2E_STREAMLING_BIN` (optional): explicit path to the streamling binary.
    pub streamling_bin: Option<PathBuf>,
}

impl E2eConfig {
    /// Reads the configuration from the current process environment.
    ///
    /// Optional variables that are set to an empty string are treated as unset.
    /// This matches how the harness already handles `STREAMLING__PLUGIN__PATH`,
    /// and it stops an empty `E2E_SQS_URL=` or `E2E_STREAMLING_BIN=` from
    /// surfacing later as an obscure connection or exec failure.
    ///
    /// Required variables are only checked for presence. An empty value is
    /// accepted as before, because callers may set e.g. `E2E_CLICKHOUSE_URL=`
    /// as a placeholder when ClickHouse is not used.
    ///
    /// # Errors
    /// Returns [`E2eError::EnvVarNotSet`] naming the first required variable
    /// that is missing (or not valid Unicode).
    pub fn from_env() -> Result<Self> {
        Ok(Self {
            postgres_url: Self::env_required("E2E_POSTGRES_URL")?,
            kafka_broker: Self::env_required("E2E_KAFKA_BROKER")?,
            schema_registry_url: Self::env_required("E2E_SCHEMA_REGISTRY_URL")?,
            clickhouse_url: Self::env_required("E2E_CLICKHOUSE_URL")?,
            mysql_url: Self::env_optional("E2E_MYSQL_URL"),
            prometheus_url: Self::env_optional("E2E_PROMETHEUS_URL"),
            sqs_url: Self::env_optional("E2E_SQS_URL"),
            streamling_bin: Self::env_optional("E2E_STREAMLING_BIN").map(PathBuf::from),
        })
    }

    /// Reads `name`, failing with `EnvVarNotSet(name)` if it is unset or not Unicode.
    fn env_required(name: &str) -> Result<String> {
        env::var(name).map_err(|_| E2eError::EnvVarNotSet(name.to_string()))
    }

    /// Reads `name`, returning `None` if it is unset, not Unicode, or empty.
    fn env_optional(name: &str) -> Option<String> {
        env::var(name).ok().filter(|value| !value.is_empty())
    }
}
/// Per-test handle to isolated external resources, plus what is needed to run
/// streamling pipelines against them.
///
/// Built by [`TestContext::new`] or [`TestContext::with_options`].
///
/// NOTE(review): struct fields are dropped in declaration order. If the
/// resource types clean up in `Drop`, reordering these fields changes
/// teardown order.
pub struct TestContext {
    /// Fresh UUID v4 in string form; its first 8 characters are embedded in
    /// resource names (databases, topics, queues).
    pub test_id: String,
    /// Configuration read from the `E2E_*` environment variables.
    pub config: E2eConfig,
    /// Per-test PostgreSQL database, named by `pg_database`.
    pub postgres: PostgresResource,
    /// Per-test Kafka topic, named by `kafka_topic`.
    pub kafka: KafkaResource,
    /// `Some` only when ClickHouse was requested in `TestContextOptions`.
    pub clickhouse: Option<ClickHouseResource>,
    /// `Some` only when MySQL was requested in `TestContextOptions`.
    pub mysql: Option<MySqlResource>,
    /// `Some` only when Prometheus was requested and `E2E_PROMETHEUS_URL` is set.
    pub prometheus: Option<PrometheusResource>,
    /// `Some` only when SQS was requested and `E2E_SQS_URL` is set.
    pub sqs: Option<SqsResource>,
    /// Whether `STREAMLING__PLUGIN__PATH` is forwarded to streamling runs.
    pub use_plugin: bool,
    /// Scratch directory; pipelines are written into it as `pipeline.yaml`.
    pub temp_dir: TempDir,
    /// Name of the per-test PostgreSQL database (`test_<first 8 chars of test_id>`).
    pub pg_database: String,
    /// Name of the per-test Kafka topic (`test_<first 8 chars of test_id>_topic`).
    pub kafka_topic: String,
}
impl TestContext {
    /// Creates a context with default [`TestContextOptions`]: only the
    /// PostgreSQL database and Kafka topic are provisioned.
    ///
    /// # Errors
    /// Propagates any error from [`TestContext::with_options`].
    pub async fn new() -> Result<Self> {
        Self::with_options(TestContextOptions::default()).await
    }

    /// Creates a context with a per-test PostgreSQL database and Kafka topic,
    /// plus whichever optional backends `options` requests.
    ///
    /// Resource names embed the first 8 characters of a fresh UUID v4, so
    /// concurrently running tests get distinct databases, topics and queues.
    ///
    /// Optional backends:
    /// - ClickHouse is created whenever `with_clickhouse` is set.
    /// - MySQL requires `E2E_MYSQL_URL`; requesting it without the variable is an error.
    /// - Prometheus and SQS are skipped (left `None`, with a warning logged)
    ///   when their URL is not configured.
    ///
    /// # Errors
    /// Returns an error if required configuration is missing or any requested
    /// resource cannot be created.
    pub async fn with_options(options: TestContextOptions) -> Result<Self> {
        let config = E2eConfig::from_env()?;
        let test_id = Uuid::new_v4().to_string();
        // A UUID's string form is ASCII, so slicing at byte 8 cannot split a char.
        let short_id = &test_id[..8];
        info!("Creating test context with id: {}", test_id);
        let temp_dir = TempDir::new()?;

        let pg_database = format!("test_{}", short_id);
        let postgres = PostgresResource::new(&config.postgres_url, &pg_database).await?;
        info!("Created PostgreSQL database: {}", pg_database);

        let kafka_topic = format!("test_{}_topic", short_id);
        let kafka = KafkaResource::new(
            &config.kafka_broker,
            &config.schema_registry_url,
            &kafka_topic,
        )
        .await?;
        info!("Created Kafka topic: {}", kafka_topic);

        let clickhouse = if options.with_clickhouse {
            let ch_database = format!("test_{}", short_id);
            Some(ClickHouseResource::new(&config.clickhouse_url, &ch_database).await?)
        } else {
            None
        };

        let mysql = if options.with_mysql {
            let mysql_url = config.mysql_url.as_deref().ok_or_else(|| {
                E2eError::Mysql("E2E_MYSQL_URL must be set when with_mysql is enabled".into())
            })?;
            let mysql_database = format!("test_{}", short_id);
            Some(
                MySqlResource::new(mysql_url, &mysql_database)
                    .await
                    .map_err(|e| {
                        E2eError::Mysql(format!("failed to create MySQL resource: {}", e))
                    })?,
            )
        } else {
            None
        };

        let prometheus = if options.with_prometheus {
            let resource = config
                .prometheus_url
                .as_ref()
                .map(|url| PrometheusResource::new(url));
            if resource.is_none() {
                // Kept best-effort (no error), but make the skip visible in test logs.
                tracing::warn!(
                    "with_prometheus requested but E2E_PROMETHEUS_URL is not set; Prometheus disabled"
                );
            }
            resource
        } else {
            None
        };

        let sqs = match (options.with_sqs, &config.sqs_url) {
            (true, Some(url)) => {
                let sqs_queue = format!("test_{}_queue", short_id);
                Some(SqsResource::new(url, &sqs_queue).await?)
            }
            (true, None) => {
                // Kept best-effort (no error), but make the skip visible in test logs.
                tracing::warn!("with_sqs requested but E2E_SQS_URL is not set; SQS disabled");
                None
            }
            (false, _) => None,
        };

        Ok(Self {
            test_id,
            config,
            postgres,
            kafka,
            clickhouse,
            mysql,
            prometheus,
            sqs,
            use_plugin: options.with_plugin,
            temp_dir,
            pg_database,
            kafka_topic,
        })
    }

    /// Returns the connection string of this test's PostgreSQL resource.
    pub fn pg_connection_string(&self) -> String {
        self.postgres.connection_string()
    }

    /// Writes `pipeline_yaml` into the temp dir and runs streamling on it with
    /// [`TestContext::build_env_vars`]. No timeout is applied.
    ///
    /// # Errors
    /// Fails if the file cannot be written or the run itself errors.
    pub async fn run_streamling(&self, pipeline_yaml: &str) -> Result<ExitStatus> {
        let pipeline_path = self.write_pipeline(pipeline_yaml)?;
        streamling::run_streamling(
            &pipeline_path,
            self.config.streamling_bin.as_deref(),
            &self.build_env_vars(),
        )
        .await
    }

    /// Runs streamling on an existing pipeline file with
    /// [`TestContext::build_env_vars`]. No timeout is applied.
    pub async fn run_streamling_file(&self, pipeline_path: &std::path::Path) -> Result<ExitStatus> {
        streamling::run_streamling(
            pipeline_path,
            self.config.streamling_bin.as_deref(),
            &self.build_env_vars(),
        )
        .await
    }

    /// Builds the base environment that points streamling at this context's
    /// resources.
    ///
    /// Always included:
    /// - Kafka source/sink brokers and schema registry.
    /// - A consumer group unique to this test.
    /// - The per-test Postgres sink database.
    /// - An in-memory state backend.
    /// - Admin API port `0`.
    ///
    /// ClickHouse, SQS, plugin and Prometheus settings are added only when the
    /// corresponding resource or option is active.
    pub fn build_env_vars(&self) -> Vec<(String, String)> {
        let mut env_vars = vec![
            (
                "STREAMLING__KAFKA_SOURCE__BROKERS".to_string(),
                self.config.kafka_broker.clone(),
            ),
            (
                "STREAMLING__KAFKA_SOURCE__SCHEMA_REGISTRY_URL".to_string(),
                self.config.schema_registry_url.clone(),
            ),
            (
                "STREAMLING__KAFKA_SOURCE__CONSUMER_GROUP_ID".to_string(),
                format!("e2e-test-{}", self.test_id),
            ),
            (
                "STREAMLING__POSTGRES_SINK__HOST".to_string(),
                self.postgres.host.clone(),
            ),
            (
                "STREAMLING__POSTGRES_SINK__PORT".to_string(),
                self.postgres.port.to_string(),
            ),
            (
                "STREAMLING__POSTGRES_SINK__USER".to_string(),
                self.postgres.user.clone(),
            ),
            (
                "STREAMLING__POSTGRES_SINK__PASS".to_string(),
                self.postgres.password.clone(),
            ),
            (
                "STREAMLING__POSTGRES_SINK__DB".to_string(),
                self.pg_database.clone(),
            ),
            (
                "STREAMLING__STATE_BACKEND__BACKEND_TYPE".to_string(),
                "InMemory".to_string(),
            ),
            (
                "STREAMLING__KAFKA_SINK__BROKERS".to_string(),
                self.config.kafka_broker.clone(),
            ),
            (
                "STREAMLING__KAFKA_SINK__SCHEMA_REGISTRY_URL".to_string(),
                self.config.schema_registry_url.clone(),
            ),
            ("STREAMLING__ADMIN_API_PORT".to_string(), "0".to_string()),
        ];

        if let Some(clickhouse) = &self.clickhouse {
            // Source and sink share one server and database. Credentials are
            // fixed to user `default` with an empty password. All SOURCE keys
            // are pushed before SINK keys, as before.
            for role in &["SOURCE", "SINK"] {
                env_vars.push((
                    format!("STREAMLING__CLICKHOUSE_{}__URL", role),
                    self.config.clickhouse_url.clone(),
                ));
                env_vars.push((
                    format!("STREAMLING__CLICKHOUSE_{}__USER", role),
                    "default".to_string(),
                ));
                env_vars.push((
                    format!("STREAMLING__CLICKHOUSE_{}__PASSWORD", role),
                    String::new(),
                ));
                env_vars.push((
                    format!("STREAMLING__CLICKHOUSE_{}__DATABASE", role),
                    clickhouse.database.clone(),
                ));
            }
        }

        if let (Some(_), Some(sqs_url)) = (&self.sqs, &self.config.sqs_url) {
            // Fixed dummy credentials/region. AWS_ENDPOINT_URL redirects AWS
            // clients to E2E_SQS_URL (presumably a local emulator).
            env_vars.push(("AWS_ACCESS_KEY_ID".to_string(), "test".to_string()));
            env_vars.push(("AWS_SECRET_ACCESS_KEY".to_string(), "test".to_string()));
            env_vars.push(("AWS_DEFAULT_REGION".to_string(), "us-east-1".to_string()));
            env_vars.push(("AWS_ENDPOINT_URL".to_string(), sqs_url.clone()));
        }

        if self.use_plugin {
            // Forward the plugin path from the test process only if it names an existing path.
            if let Ok(plugin_path) = env::var("STREAMLING__PLUGIN__PATH") {
                if !plugin_path.is_empty() && PathBuf::from(&plugin_path).exists() {
                    env_vars.push(("STREAMLING__PLUGIN__PATH".to_string(), plugin_path));
                }
            }
        }

        if let Some(prometheus) = &self.prometheus {
            env_vars.push((
                "STREAMLING__APPLICATION_ID".to_string(),
                self.test_id.clone(),
            ));
            env_vars.push((
                "STREAMLING__OPEN_TELEMETRY_METRICS__INGESTION_ENDPOINT".to_string(),
                prometheus.ingestion_endpoint.clone(),
            ));
            env_vars.push((
                "STREAMLING__OPEN_TELEMETRY_METRICS__ENDPOINT_PROTOCOL".to_string(),
                "http/protobuf".to_string(),
            ));
            // Export every second so tests can observe metrics quickly.
            env_vars.push((
                "STREAMLING__OPEN_TELEMETRY_METRICS__BATCH_INTERVAL_SECS".to_string(),
                "1".to_string(),
            ));
        }

        env_vars
    }

    /// Creates an additional SQS queue named `test_<short id>_<queue_suffix>`.
    ///
    /// # Errors
    /// Returns [`E2eError::EnvVarNotSet`] if `E2E_SQS_URL` is not configured.
    pub async fn create_sqs_queue(&self, queue_suffix: &str) -> Result<SqsResource> {
        let queue_name = format!("test_{}_{}", self.short_id(), queue_suffix);
        let sqs_url = self
            .config
            .sqs_url
            .as_ref()
            .ok_or_else(|| E2eError::EnvVarNotSet("E2E_SQS_URL".to_string()))?;
        SqsResource::new(sqs_url, &queue_name).await
    }

    /// Creates an additional Kafka topic named `test_<short id>_<topic_suffix>`.
    pub async fn create_kafka_topic(&self, topic_suffix: &str) -> Result<KafkaResource> {
        let topic = format!("test_{}_{}", self.short_id(), topic_suffix);
        KafkaResource::new(
            &self.config.kafka_broker,
            &self.config.schema_registry_url,
            &topic,
        )
        .await
    }

    /// Reads messages from `topic` via `KafkaResource::inspect_topic_messages`,
    /// passing `max_messages` for both of its count arguments and discarding
    /// the second element of its result.
    ///
    /// NOTE(review): the `(i64, String, String)` layout is defined by
    /// `inspect_topic_messages`; presumably (offset, key, value) — confirm.
    pub async fn consume_kafka_messages(
        &self,
        topic: &str,
        max_messages: usize,
    ) -> Result<Vec<(i64, String, String)>> {
        let (messages, _) = KafkaResource::inspect_topic_messages(
            &self.config.kafka_broker,
            &self.config.schema_registry_url,
            topic,
            max_messages,
            max_messages,
        )
        .await?;
        Ok(messages)
    }

    /// Runs `pipeline_yaml`, stopping after `record_limit` records, with the
    /// default [`PipelineOpts`] timeout.
    pub async fn run_pipeline(&self, pipeline_yaml: &str, record_limit: u64) -> Result<ExitStatus> {
        self.run_pipeline_with_opts(
            pipeline_yaml,
            PipelineOpts::new().record_limit(record_limit),
        )
        .await
    }

    /// Runs `pipeline_yaml` with `opts` and returns the process exit status.
    ///
    /// `opts.timeout == None` (see `PipelineOpts::no_timeout`) waits
    /// indefinitely. NOTE(review): `opts.extra_args` is not forwarded here;
    /// use [`TestContext::run_pipeline_raw`] for extra CLI arguments.
    ///
    /// # Errors
    /// [`E2eError::StreamlingFailed`] on timeout, or any error from writing
    /// the pipeline or running streamling.
    pub async fn run_pipeline_with_opts(
        &self,
        pipeline_yaml: &str,
        opts: PipelineOpts,
    ) -> Result<ExitStatus> {
        let pipeline_path = self.write_pipeline(pipeline_yaml)?;
        let env_vars = self.pipeline_env_vars(opts.record_limit, opts.extra_env);
        Self::run_with_timeout(
            opts.timeout,
            streamling::run_streamling(
                &pipeline_path,
                self.config.streamling_bin.as_deref(),
                &env_vars,
            ),
        )
        .await?
    }

    /// Runs `pipeline_yaml` with `opts`, capturing output, and parses the
    /// print-sink rows from the captured stderr.
    ///
    /// Timeout semantics match [`TestContext::run_pipeline_with_opts`];
    /// `opts.extra_args` is likewise not forwarded.
    pub async fn run_pipeline_with_capture(
        &self,
        pipeline_yaml: &str,
        opts: PipelineOpts,
    ) -> Result<PrintSinkOutput> {
        let pipeline_path = self.write_pipeline(pipeline_yaml)?;
        let env_vars = self.pipeline_env_vars(opts.record_limit, opts.extra_env);
        let output = Self::run_with_timeout(
            opts.timeout,
            streamling::run_streamling_with_capture(
                &pipeline_path,
                self.config.streamling_bin.as_deref(),
                &env_vars,
            ),
        )
        .await??;
        let parsed = PrintSinkOutput::parse(&output.stderr);
        tracing::debug!("Parsed {} rows from print sink output", parsed.len());
        Ok(parsed)
    }

    /// Runs `pipeline_yaml` with `opts`, forwarding `opts.extra_args`, and
    /// returns the raw [`StreamlingOutput`].
    ///
    /// Timeout semantics match [`TestContext::run_pipeline_with_opts`].
    pub async fn run_pipeline_raw(
        &self,
        pipeline_yaml: &str,
        opts: PipelineOpts,
    ) -> Result<StreamlingOutput> {
        let pipeline_path = self.write_pipeline(pipeline_yaml)?;
        let env_vars = self.pipeline_env_vars(opts.record_limit, opts.extra_env);
        Self::run_with_timeout(
            opts.timeout,
            streamling::run_streamling_raw(
                &pipeline_path,
                self.config.streamling_bin.as_deref(),
                &env_vars,
                &opts.extra_args,
            ),
        )
        .await?
    }

    /// First 8 characters of `test_id`; the whole id if it is shorter
    /// (possible only if the public field was overwritten).
    fn short_id(&self) -> &str {
        self.test_id.get(..8).unwrap_or(self.test_id.as_str())
    }

    /// Writes `pipeline_yaml` to `pipeline.yaml` in the temp dir and returns its path.
    ///
    /// Every run on a context reuses this one path, so each call overwrites
    /// the previous pipeline.
    fn write_pipeline(&self, pipeline_yaml: &str) -> Result<PathBuf> {
        let pipeline_path = self.temp_dir.path().join("pipeline.yaml");
        std::fs::write(&pipeline_path, pipeline_yaml)?;
        Ok(pipeline_path)
    }

    /// Builds the run environment in this order: base vars, then the record
    /// limit (if any), then `extra_env`.
    fn pipeline_env_vars(
        &self,
        record_limit: Option<u64>,
        extra_env: Vec<(String, String)>,
    ) -> Vec<(String, String)> {
        let mut env_vars = self.build_env_vars();
        if let Some(limit) = record_limit {
            env_vars.push((
                "STREAMLING__NUM_RECORDS_BEFORE_STOP".to_string(),
                limit.to_string(),
            ));
        }
        env_vars.extend(extra_env);
        env_vars
    }

    /// Awaits `run`, bounded by `timeout` when it is `Some`; `None` waits indefinitely.
    ///
    /// Previously a `None` timeout silently fell back to 30 seconds, which
    /// made `PipelineOpts::no_timeout` ineffective.
    async fn run_with_timeout<F>(timeout: Option<std::time::Duration>, run: F) -> Result<F::Output>
    where
        F: std::future::Future,
    {
        match timeout {
            Some(limit) => tokio::time::timeout(limit, run).await.map_err(|_| {
                E2eError::StreamlingFailed(format!(
                    "Pipeline execution timed out after {:?}",
                    limit
                ))
            }),
            None => Ok(run.await),
        }
    }
}
/// Per-run options for the `run_pipeline*` helpers on `TestContext`.
///
/// Built fluently, e.g. `PipelineOpts::new().record_limit(10).env("KEY", "value")`.
/// Every builder method consumes and returns `self`, so its result must be used.
#[derive(Debug, Clone)]
pub struct PipelineOpts {
    /// When set, exported as `STREAMLING__NUM_RECORDS_BEFORE_STOP`.
    pub record_limit: Option<u64>,
    /// Extra environment variables, appended after the context's own variables.
    pub extra_env: Vec<(String, String)>,
    /// Maximum wall-clock time for a run. Defaults to 30 seconds;
    /// [`PipelineOpts::no_timeout`] clears it to `None`.
    pub timeout: Option<std::time::Duration>,
    /// Extra command-line arguments; currently only `TestContext::run_pipeline_raw`
    /// forwards them.
    pub extra_args: Vec<String>,
}

impl Default for PipelineOpts {
    /// No record limit, no extra env or args, and a 30-second timeout.
    fn default() -> Self {
        Self {
            record_limit: None,
            extra_env: Vec::new(),
            timeout: Some(std::time::Duration::from_secs(30)),
            extra_args: Vec::new(),
        }
    }
}

impl PipelineOpts {
    /// Same as [`PipelineOpts::default`].
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Stops the pipeline after `limit` records.
    #[must_use]
    pub fn record_limit(mut self, limit: u64) -> Self {
        self.record_limit = Some(limit);
        self
    }

    /// Appends one environment variable for the run.
    #[must_use]
    pub fn env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.extra_env.push((key.into(), value.into()));
        self
    }

    /// Sets the maximum wall-clock time for the run.
    #[must_use]
    pub fn timeout(mut self, timeout: std::time::Duration) -> Self {
        self.timeout = Some(timeout);
        self
    }

    /// Clears the timeout (sets it to `None`).
    #[must_use]
    pub fn no_timeout(mut self) -> Self {
        self.timeout = None;
        self
    }

    /// Appends one command-line argument for the streamling process.
    #[must_use]
    pub fn arg(mut self, arg: impl Into<String>) -> Self {
        self.extra_args.push(arg.into());
        self
    }
}
/// Selects which optional backends `TestContext::with_options` provisions in
/// addition to the always-created PostgreSQL database and Kafka topic.
///
/// All flags default to `false`. Each builder method consumes and returns
/// `self`, so its result must be used.
#[derive(Debug, Clone, Default)]
pub struct TestContextOptions {
    /// Create a per-test ClickHouse database.
    pub with_clickhouse: bool,
    /// Create a per-test MySQL database; context creation fails if `E2E_MYSQL_URL` is unset.
    pub with_mysql: bool,
    /// Attach Prometheus; skipped if `E2E_PROMETHEUS_URL` is unset.
    pub with_prometheus: bool,
    /// Create a per-test SQS queue; skipped if `E2E_SQS_URL` is unset.
    pub with_sqs: bool,
    /// Forward `STREAMLING__PLUGIN__PATH` from the test process to streamling
    /// runs when it names an existing path.
    pub with_plugin: bool,
}

impl TestContextOptions {
    /// All backends disabled; same as [`TestContextOptions::default`].
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Enables the ClickHouse backend.
    #[must_use]
    pub fn with_clickhouse(mut self) -> Self {
        self.with_clickhouse = true;
        self
    }

    /// Enables the MySQL backend.
    #[must_use]
    pub fn with_mysql(mut self) -> Self {
        self.with_mysql = true;
        self
    }

    /// Enables the Prometheus backend.
    #[must_use]
    pub fn with_prometheus(mut self) -> Self {
        self.with_prometheus = true;
        self
    }

    /// Enables the SQS backend.
    #[must_use]
    pub fn with_sqs(mut self) -> Self {
        self.with_sqs = true;
        self
    }

    /// Enables plugin-path forwarding.
    #[must_use]
    pub fn with_plugin(mut self) -> Self {
        self.with_plugin = true;
        self
    }
}
/// Installs a global `tracing` subscriber: a formatting layer plus an
/// `EnvFilter` built from the default environment variable.
///
/// Safe to call from every test. If a global subscriber is already
/// installed, the error is intentionally ignored.
pub fn init_tracing() {
    use tracing_subscriber::{fmt, prelude::*, EnvFilter};

    let subscriber = tracing_subscriber::registry()
        .with(fmt::layer())
        .with(EnvFilter::from_default_env());

    // `try_init` fails once a global subscriber exists, which is expected when
    // several tests call this function, so the outcome is discarded.
    let _ = subscriber.try_init();
}