1pub mod inspect;
36pub mod resources;
37pub mod streamling;
38
39use resources::{
40 ClickHouseResource, KafkaResource, MySqlResource, PostgresResource, PrometheusResource,
41 SqsResource,
42};
43use std::env;
44use std::path::PathBuf;
45use std::process::ExitStatus;
46use tempfile::TempDir;
47use thiserror::Error;
48use tracing::info;
49use uuid::Uuid;
50
51pub use resources::{PrintSinkOutput, PrintSinkRow};
53pub use streamling::StreamlingOutput;
54
55#[derive(Error, Debug)]
57pub enum E2eError {
58 #[error("Environment variable not set: {0}")]
59 EnvVarNotSet(String),
60
61 #[error("PostgreSQL error: {0}")]
62 Postgres(#[from] sqlx::Error),
63
64 #[error("Kafka error: {0}")]
65 Kafka(String),
66
67 #[error("ClickHouse error: {0}")]
68 ClickHouse(String),
69
70 #[error("MySQL error: {0}")]
71 Mysql(String),
72
73 #[error("Prometheus error: {0}")]
74 Prometheus(String),
75
76 #[error("SQS error: {0}")]
77 Sqs(String),
78
79 #[error("IO error: {0}")]
80 Io(#[from] std::io::Error),
81
82 #[error("YAML error: {0}")]
83 Yaml(#[from] serde_yaml::Error),
84
85 #[error("Streamling execution failed: {0}")]
86 StreamlingFailed(String),
87}
88
89pub type Result<T> = std::result::Result<T, E2eError>;
90
91#[derive(Debug, Clone)]
93pub struct E2eConfig {
94 pub postgres_url: String,
95 pub kafka_broker: String,
96 pub schema_registry_url: String,
97 pub clickhouse_url: String,
98 pub mysql_url: Option<String>,
99 pub prometheus_url: Option<String>,
100 pub sqs_url: Option<String>,
101 pub streamling_bin: Option<PathBuf>,
102}
103
104impl E2eConfig {
105 pub fn from_env() -> Result<Self> {
107 Ok(Self {
108 postgres_url: env::var("E2E_POSTGRES_URL")
109 .map_err(|_| E2eError::EnvVarNotSet("E2E_POSTGRES_URL".to_string()))?,
110 kafka_broker: env::var("E2E_KAFKA_BROKER")
111 .map_err(|_| E2eError::EnvVarNotSet("E2E_KAFKA_BROKER".to_string()))?,
112 schema_registry_url: env::var("E2E_SCHEMA_REGISTRY_URL")
113 .map_err(|_| E2eError::EnvVarNotSet("E2E_SCHEMA_REGISTRY_URL".to_string()))?,
114 clickhouse_url: env::var("E2E_CLICKHOUSE_URL")
115 .map_err(|_| E2eError::EnvVarNotSet("E2E_CLICKHOUSE_URL".to_string()))?,
116 mysql_url: env::var("E2E_MYSQL_URL").ok(),
117 prometheus_url: env::var("E2E_PROMETHEUS_URL").ok(),
118 sqs_url: env::var("E2E_SQS_URL").ok(),
119 streamling_bin: env::var("E2E_STREAMLING_BIN").ok().map(PathBuf::from),
120 })
121 }
122}
123
124pub struct TestContext {
126 pub test_id: String,
128
129 pub config: E2eConfig,
131
132 pub postgres: PostgresResource,
134
135 pub kafka: KafkaResource,
137
138 pub clickhouse: Option<ClickHouseResource>,
140
141 pub mysql: Option<MySqlResource>,
143
144 pub prometheus: Option<PrometheusResource>,
146
147 pub sqs: Option<SqsResource>,
149
150 pub use_plugin: bool,
154
155 pub temp_dir: TempDir,
157
158 pub pg_database: String,
160
161 pub kafka_topic: String,
163}
164
165impl TestContext {
166 pub async fn new() -> Result<Self> {
168 Self::with_options(TestContextOptions::default()).await
169 }
170
171 pub async fn with_options(options: TestContextOptions) -> Result<Self> {
173 let config = E2eConfig::from_env()?;
174 let test_id = Uuid::new_v4().to_string();
175 let short_id = &test_id[..8];
176
177 info!("Creating test context with id: {}", test_id);
178
179 let temp_dir = TempDir::new()?;
181
182 let pg_database = format!("test_{}", short_id);
184 let postgres = PostgresResource::new(&config.postgres_url, &pg_database).await?;
185 info!("Created PostgreSQL database: {}", pg_database);
186
187 let kafka_topic = format!("test_{}_topic", short_id);
189 let kafka = KafkaResource::new(
190 &config.kafka_broker,
191 &config.schema_registry_url,
192 &kafka_topic,
193 )
194 .await?;
195 info!("Created Kafka topic: {}", kafka_topic);
196
197 let clickhouse = if options.with_clickhouse {
199 let ch_database = format!("test_{}", short_id);
200 Some(ClickHouseResource::new(&config.clickhouse_url, &ch_database).await?)
201 } else {
202 None
203 };
204
205 let mysql = if options.with_mysql {
207 let mysql_url = config.mysql_url.as_deref().ok_or_else(|| {
208 E2eError::Mysql("E2E_MYSQL_URL must be set when with_mysql is enabled".into())
209 })?;
210 let mysql_database = format!("test_{}", short_id);
211 Some(
212 MySqlResource::new(mysql_url, &mysql_database)
213 .await
214 .map_err(|e| {
215 E2eError::Mysql(format!("failed to create MySQL resource: {}", e))
216 })?,
217 )
218 } else {
219 None
220 };
221
222 let prometheus = if options.with_prometheus {
224 config
225 .prometheus_url
226 .as_ref()
227 .map(|url| PrometheusResource::new(url))
228 } else {
229 None
230 };
231
232 let sqs = if options.with_sqs {
234 if let Some(url) = &config.sqs_url {
235 let sqs_queue = format!("test_{}_queue", short_id);
236 Some(SqsResource::new(url, &sqs_queue).await?)
237 } else {
238 None
239 }
240 } else {
241 None
242 };
243
244 Ok(Self {
245 test_id,
246 config,
247 postgres,
248 kafka,
249 clickhouse,
250 mysql,
251 prometheus,
252 sqs,
253 use_plugin: options.with_plugin,
254 temp_dir,
255 pg_database,
256 kafka_topic,
257 })
258 }
259
260 pub fn pg_connection_string(&self) -> String {
262 self.postgres.connection_string()
263 }
264
265 pub async fn run_streamling(&self, pipeline_yaml: &str) -> Result<ExitStatus> {
267 let pipeline_path = self.temp_dir.path().join("pipeline.yaml");
268 std::fs::write(&pipeline_path, pipeline_yaml)?;
269
270 streamling::run_streamling(
271 &pipeline_path,
272 self.config.streamling_bin.as_deref(),
273 &self.build_env_vars(),
274 )
275 .await
276 }
277
278 pub async fn run_streamling_file(&self, pipeline_path: &std::path::Path) -> Result<ExitStatus> {
280 streamling::run_streamling(
281 pipeline_path,
282 self.config.streamling_bin.as_deref(),
283 &self.build_env_vars(),
284 )
285 .await
286 }
287
288 pub fn build_env_vars(&self) -> Vec<(String, String)> {
290 let mut env_vars = vec![
291 (
292 "STREAMLING__KAFKA_SOURCE__BROKERS".to_string(),
293 self.config.kafka_broker.clone(),
294 ),
295 (
296 "STREAMLING__KAFKA_SOURCE__SCHEMA_REGISTRY_URL".to_string(),
297 self.config.schema_registry_url.clone(),
298 ),
299 (
301 "STREAMLING__KAFKA_SOURCE__CONSUMER_GROUP_ID".to_string(),
302 format!("e2e-test-{}", self.test_id),
303 ),
304 (
305 "STREAMLING__POSTGRES_SINK__HOST".to_string(),
306 self.postgres.host.clone(),
307 ),
308 (
309 "STREAMLING__POSTGRES_SINK__PORT".to_string(),
310 self.postgres.port.to_string(),
311 ),
312 (
313 "STREAMLING__POSTGRES_SINK__USER".to_string(),
314 self.postgres.user.clone(),
315 ),
316 (
317 "STREAMLING__POSTGRES_SINK__PASS".to_string(),
318 self.postgres.password.clone(),
319 ),
320 (
321 "STREAMLING__POSTGRES_SINK__DB".to_string(),
322 self.pg_database.clone(),
323 ),
324 (
326 "STREAMLING__STATE_BACKEND__BACKEND_TYPE".to_string(),
327 "InMemory".to_string(),
328 ),
329 (
331 "STREAMLING__KAFKA_SINK__BROKERS".to_string(),
332 self.config.kafka_broker.clone(),
333 ),
334 (
335 "STREAMLING__KAFKA_SINK__SCHEMA_REGISTRY_URL".to_string(),
336 self.config.schema_registry_url.clone(),
337 ),
338 ("STREAMLING__ADMIN_API_PORT".to_string(), "0".to_string()),
341 ];
342
343 if let Some(clickhouse) = &self.clickhouse {
345 env_vars.push((
347 "STREAMLING__CLICKHOUSE_SOURCE__URL".to_string(),
348 self.config.clickhouse_url.clone(),
349 ));
350 env_vars.push((
351 "STREAMLING__CLICKHOUSE_SOURCE__USER".to_string(),
352 "default".to_string(),
353 ));
354 env_vars.push((
355 "STREAMLING__CLICKHOUSE_SOURCE__PASSWORD".to_string(),
356 String::new(),
357 ));
358 env_vars.push((
359 "STREAMLING__CLICKHOUSE_SOURCE__DATABASE".to_string(),
360 clickhouse.database.clone(),
361 ));
362 env_vars.push((
364 "STREAMLING__CLICKHOUSE_SINK__URL".to_string(),
365 self.config.clickhouse_url.clone(),
366 ));
367 env_vars.push((
368 "STREAMLING__CLICKHOUSE_SINK__USER".to_string(),
369 "default".to_string(),
370 ));
371 env_vars.push((
372 "STREAMLING__CLICKHOUSE_SINK__PASSWORD".to_string(),
373 String::new(),
374 ));
375 env_vars.push((
376 "STREAMLING__CLICKHOUSE_SINK__DATABASE".to_string(),
377 clickhouse.database.clone(),
378 ));
379 }
380
381 if self.sqs.is_some() {
383 if let Some(sqs_url) = &self.config.sqs_url {
384 env_vars.push(("AWS_ACCESS_KEY_ID".to_string(), "test".to_string()));
386 env_vars.push(("AWS_SECRET_ACCESS_KEY".to_string(), "test".to_string()));
387 env_vars.push(("AWS_DEFAULT_REGION".to_string(), "us-east-1".to_string()));
388 env_vars.push(("AWS_ENDPOINT_URL".to_string(), sqs_url.clone()));
389 }
390 }
391
392 if self.use_plugin {
393 if let Ok(plugin_path) = env::var("STREAMLING__PLUGIN__PATH") {
394 if !plugin_path.is_empty() && PathBuf::from(&plugin_path).exists() {
395 env_vars.push(("STREAMLING__PLUGIN__PATH".to_string(), plugin_path));
396 }
397 }
398 }
399
400 if let Some(prometheus) = &self.prometheus {
402 env_vars.push((
404 "STREAMLING__APPLICATION_ID".to_string(),
405 self.test_id.clone(),
406 ));
407 env_vars.push((
408 "STREAMLING__OPEN_TELEMETRY_METRICS__INGESTION_ENDPOINT".to_string(),
409 prometheus.ingestion_endpoint.clone(),
410 ));
411 env_vars.push((
412 "STREAMLING__OPEN_TELEMETRY_METRICS__ENDPOINT_PROTOCOL".to_string(),
413 "http/protobuf".to_string(),
414 ));
415 env_vars.push((
416 "STREAMLING__OPEN_TELEMETRY_METRICS__BATCH_INTERVAL_SECS".to_string(),
417 "1".to_string(),
418 ));
419 }
420
421 env_vars
422 }
423
424 pub async fn create_sqs_queue(&self, queue_suffix: &str) -> Result<SqsResource> {
426 let queue_name = format!("test_{}_{}", &self.test_id[..8], queue_suffix);
427 let sqs_url = self
428 .config
429 .sqs_url
430 .as_ref()
431 .ok_or_else(|| E2eError::EnvVarNotSet("E2E_SQS_URL".to_string()))?;
432 SqsResource::new(sqs_url, &queue_name).await
433 }
434
435 pub async fn create_kafka_topic(&self, topic_suffix: &str) -> Result<KafkaResource> {
437 let topic = format!("test_{}_{}", &self.test_id[..8], topic_suffix);
438 KafkaResource::new(
439 &self.config.kafka_broker,
440 &self.config.schema_registry_url,
441 &topic,
442 )
443 .await
444 }
445
446 pub async fn consume_kafka_messages(
448 &self,
449 topic: &str,
450 max_messages: usize,
451 ) -> Result<Vec<(i64, String, String)>> {
452 let (messages, _) = KafkaResource::inspect_topic_messages(
453 &self.config.kafka_broker,
454 &self.config.schema_registry_url,
455 topic,
456 max_messages,
457 max_messages,
458 )
459 .await?;
460 Ok(messages)
461 }
462
463 pub async fn run_pipeline(&self, pipeline_yaml: &str, record_limit: u64) -> Result<ExitStatus> {
468 self.run_pipeline_with_opts(
469 pipeline_yaml,
470 PipelineOpts::new().record_limit(record_limit),
471 )
472 .await
473 }
474
475 pub async fn run_pipeline_with_opts(
477 &self,
478 pipeline_yaml: &str,
479 opts: PipelineOpts,
480 ) -> Result<ExitStatus> {
481 let pipeline_path = self.temp_dir.path().join("pipeline.yaml");
482 std::fs::write(&pipeline_path, pipeline_yaml)?;
483
484 let mut env_vars = self.build_env_vars();
485
486 if let Some(limit) = opts.record_limit {
487 env_vars.push((
488 "STREAMLING__NUM_RECORDS_BEFORE_STOP".to_string(),
489 limit.to_string(),
490 ));
491 }
492
493 env_vars.extend(opts.extra_env);
494
495 let timeout = opts.timeout.unwrap_or(std::time::Duration::from_secs(30));
496
497 tokio::time::timeout(
498 timeout,
499 streamling::run_streamling(
500 &pipeline_path,
501 self.config.streamling_bin.as_deref(),
502 &env_vars,
503 ),
504 )
505 .await
506 .map_err(|_| {
507 E2eError::StreamlingFailed(format!("Pipeline execution timed out after {:?}", timeout))
508 })?
509 }
510
511 pub async fn run_pipeline_with_capture(
516 &self,
517 pipeline_yaml: &str,
518 opts: PipelineOpts,
519 ) -> Result<PrintSinkOutput> {
520 let pipeline_path = self.temp_dir.path().join("pipeline.yaml");
521 std::fs::write(&pipeline_path, pipeline_yaml)?;
522
523 let mut env_vars = self.build_env_vars();
524
525 if let Some(limit) = opts.record_limit {
526 env_vars.push((
527 "STREAMLING__NUM_RECORDS_BEFORE_STOP".to_string(),
528 limit.to_string(),
529 ));
530 }
531
532 env_vars.extend(opts.extra_env);
533
534 let timeout = opts.timeout.unwrap_or(std::time::Duration::from_secs(30));
535
536 let output = tokio::time::timeout(
537 timeout,
538 streamling::run_streamling_with_capture(
539 &pipeline_path,
540 self.config.streamling_bin.as_deref(),
541 &env_vars,
542 ),
543 )
544 .await
545 .map_err(|_| {
546 E2eError::StreamlingFailed(format!("Pipeline execution timed out after {:?}", timeout))
547 })??;
548
549 let parsed = PrintSinkOutput::parse(&output.stderr);
551 tracing::debug!("Parsed {} rows from print sink output", parsed.len());
552
553 Ok(parsed)
554 }
555
556 pub async fn run_pipeline_raw(
563 &self,
564 pipeline_yaml: &str,
565 opts: PipelineOpts,
566 ) -> Result<StreamlingOutput> {
567 let pipeline_path = self.temp_dir.path().join("pipeline.yaml");
568 std::fs::write(&pipeline_path, pipeline_yaml)?;
569
570 let mut env_vars = self.build_env_vars();
571
572 if let Some(limit) = opts.record_limit {
573 env_vars.push((
574 "STREAMLING__NUM_RECORDS_BEFORE_STOP".to_string(),
575 limit.to_string(),
576 ));
577 }
578
579 env_vars.extend(opts.extra_env);
580
581 let timeout = opts.timeout.unwrap_or(std::time::Duration::from_secs(30));
582
583 tokio::time::timeout(
584 timeout,
585 streamling::run_streamling_raw(
586 &pipeline_path,
587 self.config.streamling_bin.as_deref(),
588 &env_vars,
589 &opts.extra_args,
590 ),
591 )
592 .await
593 .map_err(|_| {
594 E2eError::StreamlingFailed(format!("Pipeline execution timed out after {:?}", timeout))
595 })?
596 }
597}
598
/// Options controlling a single pipeline run.
///
/// Built fluently: `PipelineOpts::new().record_limit(10).env("K", "v")`.
#[derive(Debug, Clone)]
pub struct PipelineOpts {
    /// Optional record limit for the run; `None` leaves it unset.
    pub record_limit: Option<u64>,
    /// Additional environment variables as `(key, value)` pairs, in insertion order.
    pub extra_env: Vec<(String, String)>,
    /// Timeout for the run. `Default` sets 30 seconds; `no_timeout` clears it.
    pub timeout: Option<std::time::Duration>,
    /// Additional command-line arguments, in insertion order.
    pub extra_args: Vec<String>,
}

impl Default for PipelineOpts {
    /// No record limit, no extra env or args, and a 30-second timeout.
    fn default() -> Self {
        Self {
            record_limit: None,
            extra_env: Vec::new(),
            timeout: Some(std::time::Duration::from_secs(30)),
            extra_args: Vec::new(),
        }
    }
}

impl PipelineOpts {
    /// Equivalent to `PipelineOpts::default()`.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the record limit.
    #[must_use = "builder methods return the updated options instead of mutating in place"]
    pub fn record_limit(mut self, limit: u64) -> Self {
        self.record_limit = Some(limit);
        self
    }

    /// Appends one environment variable. Earlier entries are kept; duplicates
    /// are not removed.
    #[must_use = "builder methods return the updated options instead of mutating in place"]
    pub fn env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.extra_env.push((key.into(), value.into()));
        self
    }

    /// Replaces the timeout with `timeout`.
    #[must_use = "builder methods return the updated options instead of mutating in place"]
    pub fn timeout(mut self, timeout: std::time::Duration) -> Self {
        self.timeout = Some(timeout);
        self
    }

    /// Clears the timeout (sets it to `None`).
    #[must_use = "builder methods return the updated options instead of mutating in place"]
    pub fn no_timeout(mut self) -> Self {
        self.timeout = None;
        self
    }

    /// Appends one command-line argument.
    #[must_use = "builder methods return the updated options instead of mutating in place"]
    pub fn arg(mut self, arg: impl Into<String>) -> Self {
        self.extra_args.push(arg.into());
        self
    }
}
653
/// Flags selecting which optional resources a test context should set up.
///
/// All flags default to `false`. Built fluently:
/// `TestContextOptions::new().with_clickhouse().with_sqs()`.
#[derive(Debug, Clone, Default)]
pub struct TestContextOptions {
    /// Request a ClickHouse resource.
    pub with_clickhouse: bool,
    /// Request a MySQL resource.
    pub with_mysql: bool,
    /// Request a Prometheus resource.
    pub with_prometheus: bool,
    /// Request an SQS resource.
    pub with_sqs: bool,
    /// Request plugin support.
    pub with_plugin: bool,
}

impl TestContextOptions {
    /// Equivalent to `TestContextOptions::default()`: every flag is `false`.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the `with_clickhouse` flag.
    #[must_use = "builder methods return the updated options instead of mutating in place"]
    pub fn with_clickhouse(mut self) -> Self {
        self.with_clickhouse = true;
        self
    }

    /// Sets the `with_mysql` flag.
    #[must_use = "builder methods return the updated options instead of mutating in place"]
    pub fn with_mysql(mut self) -> Self {
        self.with_mysql = true;
        self
    }

    /// Sets the `with_prometheus` flag.
    #[must_use = "builder methods return the updated options instead of mutating in place"]
    pub fn with_prometheus(mut self) -> Self {
        self.with_prometheus = true;
        self
    }

    /// Sets the `with_sqs` flag.
    #[must_use = "builder methods return the updated options instead of mutating in place"]
    pub fn with_sqs(mut self) -> Self {
        self.with_sqs = true;
        self
    }

    /// Sets the `with_plugin` flag.
    #[must_use = "builder methods return the updated options instead of mutating in place"]
    pub fn with_plugin(mut self) -> Self {
        self.with_plugin = true;
        self
    }
}
700
701pub fn init_tracing() {
703 use tracing_subscriber::{fmt, prelude::*, EnvFilter};
704
705 let _ = tracing_subscriber::registry()
706 .with(fmt::layer())
707 .with(EnvFilter::from_default_env())
708 .try_init();
709}