Skip to main content

streamling_e2e/
lib.rs

1//! End-to-end test framework for streamling.
2//!
//! This crate provides utilities for running e2e tests against a k3s cluster
//! with PostgreSQL and Kafka (Redpanda), plus optional ClickHouse, MySQL,
//! Prometheus, and SQS (ElasticMQ or another SQS-compatible endpoint) resources.
5//!
6//! ## Design Principles
7//!
8//! - **No streamling dependencies**: This crate only depends on client libraries.
9//!   Streamling is executed as an external binary.
10//! - **Resource isolation**: Each test creates unique databases/topics via UUIDs.
11//! - **Automatic cleanup**: Resources are cleaned up when `TestContext` is dropped.
12//!
13//! ## Example
14//!
15//! ```rust,ignore
16//! use streamling_e2e::TestContext;
17//!
18//! #[tokio::test]
19//! async fn test_kafka_to_postgres() {
20//!     let ctx = TestContext::new().await.unwrap();
21//!     
22//!     // Seed data
23//!     ctx.produce_json_records(&ctx.kafka_topic, records).await.unwrap();
24//!     
25//!     // Run streamling
26//!     let status = ctx.run_streamling(&pipeline_yaml).await.unwrap();
27//!     assert!(status.success());
28//!     
29//!     // Verify results
30//!     let count = ctx.postgres.count("SELECT COUNT(*) FROM output").await.unwrap();
31//!     assert_eq!(count, 100);
32//! }
33//! ```
34
35pub mod inspect;
36pub mod resources;
37pub mod streamling;
38
39use resources::{
40    ClickHouseResource, KafkaResource, MySqlResource, PostgresResource, PrometheusResource,
41    SqsResource,
42};
43use std::env;
44use std::path::PathBuf;
45use std::process::ExitStatus;
46use tempfile::TempDir;
47use thiserror::Error;
48use tracing::info;
49use uuid::Uuid;
50
51// Re-export commonly used types
52pub use resources::{PrintSinkOutput, PrintSinkRow};
53pub use streamling::StreamlingOutput;
54
55/// Errors that can occur during e2e testing
56#[derive(Error, Debug)]
57pub enum E2eError {
58    #[error("Environment variable not set: {0}")]
59    EnvVarNotSet(String),
60
61    #[error("PostgreSQL error: {0}")]
62    Postgres(#[from] sqlx::Error),
63
64    #[error("Kafka error: {0}")]
65    Kafka(String),
66
67    #[error("ClickHouse error: {0}")]
68    ClickHouse(String),
69
70    #[error("MySQL error: {0}")]
71    Mysql(String),
72
73    #[error("Prometheus error: {0}")]
74    Prometheus(String),
75
76    #[error("SQS error: {0}")]
77    Sqs(String),
78
79    #[error("IO error: {0}")]
80    Io(#[from] std::io::Error),
81
82    #[error("YAML error: {0}")]
83    Yaml(#[from] serde_yaml::Error),
84
85    #[error("Streamling execution failed: {0}")]
86    StreamlingFailed(String),
87}
88
89pub type Result<T> = std::result::Result<T, E2eError>;
90
91/// Configuration for the e2e test environment
92#[derive(Debug, Clone)]
93pub struct E2eConfig {
94    pub postgres_url: String,
95    pub kafka_broker: String,
96    pub schema_registry_url: String,
97    pub clickhouse_url: String,
98    pub mysql_url: Option<String>,
99    pub prometheus_url: Option<String>,
100    pub sqs_url: Option<String>,
101    pub streamling_bin: Option<PathBuf>,
102}
103
104impl E2eConfig {
105    /// Load configuration from environment variables
106    pub fn from_env() -> Result<Self> {
107        Ok(Self {
108            postgres_url: env::var("E2E_POSTGRES_URL")
109                .map_err(|_| E2eError::EnvVarNotSet("E2E_POSTGRES_URL".to_string()))?,
110            kafka_broker: env::var("E2E_KAFKA_BROKER")
111                .map_err(|_| E2eError::EnvVarNotSet("E2E_KAFKA_BROKER".to_string()))?,
112            schema_registry_url: env::var("E2E_SCHEMA_REGISTRY_URL")
113                .map_err(|_| E2eError::EnvVarNotSet("E2E_SCHEMA_REGISTRY_URL".to_string()))?,
114            clickhouse_url: env::var("E2E_CLICKHOUSE_URL")
115                .map_err(|_| E2eError::EnvVarNotSet("E2E_CLICKHOUSE_URL".to_string()))?,
116            mysql_url: env::var("E2E_MYSQL_URL").ok(),
117            prometheus_url: env::var("E2E_PROMETHEUS_URL").ok(),
118            sqs_url: env::var("E2E_SQS_URL").ok(),
119            streamling_bin: env::var("E2E_STREAMLING_BIN").ok().map(PathBuf::from),
120        })
121    }
122}
123
/// Test context providing isolated resources for a single test.
///
/// Built by [`TestContext::new`] or [`TestContext::with_options`]. Every
/// resource name it creates embeds the first 8 characters of `test_id`, which
/// keeps concurrently running tests apart.
///
/// NOTE: Rust drops struct fields in declaration order, so the resource fields
/// below are dropped before `temp_dir`. Consider that before reordering fields.
pub struct TestContext {
    /// Unique identifier for this test run (a UUIDv4 string); also used as the
    /// Kafka consumer-group suffix passed to streamling.
    pub test_id: String,

    /// Configuration loaded from `E2E_*` environment variables.
    pub config: E2eConfig,

    /// PostgreSQL resource manager for the isolated database named `pg_database`.
    pub postgres: PostgresResource,

    /// Kafka resource manager for the isolated topic named `kafka_topic`.
    pub kafka: KafkaResource,

    /// ClickHouse resource manager; `Some` only when
    /// `TestContextOptions::with_clickhouse` was requested.
    pub clickhouse: Option<ClickHouseResource>,

    /// MySQL resource manager; `Some` only when `TestContextOptions::with_mysql`
    /// was requested (context creation fails if `E2E_MYSQL_URL` is missing).
    pub mysql: Option<MySqlResource>,

    /// Prometheus resource manager; `Some` only when
    /// `TestContextOptions::with_prometheus` was requested AND
    /// `E2E_PROMETHEUS_URL` is configured.
    pub prometheus: Option<PrometheusResource>,

    /// SQS resource manager; `Some` only when `TestContextOptions::with_sqs` was
    /// requested AND `E2E_SQS_URL` is configured.
    pub sqs: Option<SqsResource>,

    /// When true, [`Self::build_env_vars`] forwards `STREAMLING__PLUGIN__PATH`
    /// to the streamling subprocess if it is set in the current environment
    /// and points at an existing file.
    pub use_plugin: bool,

    /// Temporary directory for pipeline files; the `run_*` methods write
    /// `pipeline.yaml` here.
    pub temp_dir: TempDir,

    /// Name of the isolated PostgreSQL database (`test_<first 8 chars of test_id>`).
    pub pg_database: String,

    /// Name of the isolated Kafka topic (`test_<first 8 chars of test_id>_topic`).
    pub kafka_topic: String,
}
164
165impl TestContext {
166    /// Create a new test context with isolated resources
167    pub async fn new() -> Result<Self> {
168        Self::with_options(TestContextOptions::default()).await
169    }
170
171    /// Create a new test context with custom options
172    pub async fn with_options(options: TestContextOptions) -> Result<Self> {
173        let config = E2eConfig::from_env()?;
174        let test_id = Uuid::new_v4().to_string();
175        let short_id = &test_id[..8];
176
177        info!("Creating test context with id: {}", test_id);
178
179        // Create temporary directory
180        let temp_dir = TempDir::new()?;
181
182        // Create isolated PostgreSQL database
183        let pg_database = format!("test_{}", short_id);
184        let postgres = PostgresResource::new(&config.postgres_url, &pg_database).await?;
185        info!("Created PostgreSQL database: {}", pg_database);
186
187        // Create isolated Kafka topic
188        let kafka_topic = format!("test_{}_topic", short_id);
189        let kafka = KafkaResource::new(
190            &config.kafka_broker,
191            &config.schema_registry_url,
192            &kafka_topic,
193        )
194        .await?;
195        info!("Created Kafka topic: {}", kafka_topic);
196
197        // Create ClickHouse database if needed
198        let clickhouse = if options.with_clickhouse {
199            let ch_database = format!("test_{}", short_id);
200            Some(ClickHouseResource::new(&config.clickhouse_url, &ch_database).await?)
201        } else {
202            None
203        };
204
205        // Create MySQL database if needed
206        let mysql = if options.with_mysql {
207            let mysql_url = config.mysql_url.as_deref().ok_or_else(|| {
208                E2eError::Mysql("E2E_MYSQL_URL must be set when with_mysql is enabled".into())
209            })?;
210            let mysql_database = format!("test_{}", short_id);
211            Some(
212                MySqlResource::new(mysql_url, &mysql_database)
213                    .await
214                    .map_err(|e| {
215                        E2eError::Mysql(format!("failed to create MySQL resource: {}", e))
216                    })?,
217            )
218        } else {
219            None
220        };
221
222        // Create Prometheus resource if URL is configured and needed
223        let prometheus = if options.with_prometheus {
224            config
225                .prometheus_url
226                .as_ref()
227                .map(|url| PrometheusResource::new(url))
228        } else {
229            None
230        };
231
232        // Create SQS resource if SQS URL is configured and needed
233        let sqs = if options.with_sqs {
234            if let Some(url) = &config.sqs_url {
235                let sqs_queue = format!("test_{}_queue", short_id);
236                Some(SqsResource::new(url, &sqs_queue).await?)
237            } else {
238                None
239            }
240        } else {
241            None
242        };
243
244        Ok(Self {
245            test_id,
246            config,
247            postgres,
248            kafka,
249            clickhouse,
250            mysql,
251            prometheus,
252            sqs,
253            use_plugin: options.with_plugin,
254            temp_dir,
255            pg_database,
256            kafka_topic,
257        })
258    }
259
260    /// Get the PostgreSQL connection string for the isolated database
261    pub fn pg_connection_string(&self) -> String {
262        self.postgres.connection_string()
263    }
264
265    /// Run streamling with the given pipeline YAML content
266    pub async fn run_streamling(&self, pipeline_yaml: &str) -> Result<ExitStatus> {
267        let pipeline_path = self.temp_dir.path().join("pipeline.yaml");
268        std::fs::write(&pipeline_path, pipeline_yaml)?;
269
270        streamling::run_streamling(
271            &pipeline_path,
272            self.config.streamling_bin.as_deref(),
273            &self.build_env_vars(),
274        )
275        .await
276    }
277
278    /// Run streamling with a pipeline file path
279    pub async fn run_streamling_file(&self, pipeline_path: &std::path::Path) -> Result<ExitStatus> {
280        streamling::run_streamling(
281            pipeline_path,
282            self.config.streamling_bin.as_deref(),
283            &self.build_env_vars(),
284        )
285        .await
286    }
287
288    /// Build environment variables for streamling execution
289    pub fn build_env_vars(&self) -> Vec<(String, String)> {
290        let mut env_vars = vec![
291            (
292                "STREAMLING__KAFKA_SOURCE__BROKERS".to_string(),
293                self.config.kafka_broker.clone(),
294            ),
295            (
296                "STREAMLING__KAFKA_SOURCE__SCHEMA_REGISTRY_URL".to_string(),
297                self.config.schema_registry_url.clone(),
298            ),
299            // Unique consumer group ID per test to avoid offset conflicts
300            (
301                "STREAMLING__KAFKA_SOURCE__CONSUMER_GROUP_ID".to_string(),
302                format!("e2e-test-{}", self.test_id),
303            ),
304            (
305                "STREAMLING__POSTGRES_SINK__HOST".to_string(),
306                self.postgres.host.clone(),
307            ),
308            (
309                "STREAMLING__POSTGRES_SINK__PORT".to_string(),
310                self.postgres.port.to_string(),
311            ),
312            (
313                "STREAMLING__POSTGRES_SINK__USER".to_string(),
314                self.postgres.user.clone(),
315            ),
316            (
317                "STREAMLING__POSTGRES_SINK__PASS".to_string(),
318                self.postgres.password.clone(),
319            ),
320            (
321                "STREAMLING__POSTGRES_SINK__DB".to_string(),
322                self.pg_database.clone(),
323            ),
324            // Use in-memory state backend for tests (don't persist checkpoints)
325            (
326                "STREAMLING__STATE_BACKEND__BACKEND_TYPE".to_string(),
327                "InMemory".to_string(),
328            ),
329            // Kafka sink configuration
330            (
331                "STREAMLING__KAFKA_SINK__BROKERS".to_string(),
332                self.config.kafka_broker.clone(),
333            ),
334            (
335                "STREAMLING__KAFKA_SINK__SCHEMA_REGISTRY_URL".to_string(),
336                self.config.schema_registry_url.clone(),
337            ),
338            // Avoid port collisions when e2e tests run streamling in parallel.
339            // Port 0 lets the OS pick an ephemeral free port for the admin API.
340            ("STREAMLING__ADMIN_API_PORT".to_string(), "0".to_string()),
341        ];
342
343        // Add ClickHouse source and sink configuration if clickhouse is enabled
344        if let Some(clickhouse) = &self.clickhouse {
345            // ClickHouse source configuration
346            env_vars.push((
347                "STREAMLING__CLICKHOUSE_SOURCE__URL".to_string(),
348                self.config.clickhouse_url.clone(),
349            ));
350            env_vars.push((
351                "STREAMLING__CLICKHOUSE_SOURCE__USER".to_string(),
352                "default".to_string(),
353            ));
354            env_vars.push((
355                "STREAMLING__CLICKHOUSE_SOURCE__PASSWORD".to_string(),
356                String::new(),
357            ));
358            env_vars.push((
359                "STREAMLING__CLICKHOUSE_SOURCE__DATABASE".to_string(),
360                clickhouse.database.clone(),
361            ));
362            // ClickHouse sink configuration
363            env_vars.push((
364                "STREAMLING__CLICKHOUSE_SINK__URL".to_string(),
365                self.config.clickhouse_url.clone(),
366            ));
367            env_vars.push((
368                "STREAMLING__CLICKHOUSE_SINK__USER".to_string(),
369                "default".to_string(),
370            ));
371            env_vars.push((
372                "STREAMLING__CLICKHOUSE_SINK__PASSWORD".to_string(),
373                String::new(),
374            ));
375            env_vars.push((
376                "STREAMLING__CLICKHOUSE_SINK__DATABASE".to_string(),
377                clickhouse.database.clone(),
378            ));
379        }
380
381        // Add AWS/SQS configuration for ElasticMQ if SQS is enabled
382        if self.sqs.is_some() {
383            if let Some(sqs_url) = &self.config.sqs_url {
384                // Default credential chain fallback (for SqsResource in test harness)
385                env_vars.push(("AWS_ACCESS_KEY_ID".to_string(), "test".to_string()));
386                env_vars.push(("AWS_SECRET_ACCESS_KEY".to_string(), "test".to_string()));
387                env_vars.push(("AWS_DEFAULT_REGION".to_string(), "us-east-1".to_string()));
388                env_vars.push(("AWS_ENDPOINT_URL".to_string(), sqs_url.clone()));
389            }
390        }
391
392        if self.use_plugin {
393            if let Ok(plugin_path) = env::var("STREAMLING__PLUGIN__PATH") {
394                if !plugin_path.is_empty() && PathBuf::from(&plugin_path).exists() {
395                    env_vars.push(("STREAMLING__PLUGIN__PATH".to_string(), plugin_path));
396                }
397            }
398        }
399
400        // Add Prometheus/OpenTelemetry metrics configuration if enabled
401        if let Some(prometheus) = &self.prometheus {
402            // Set application ID to test_id for metrics isolation
403            env_vars.push((
404                "STREAMLING__APPLICATION_ID".to_string(),
405                self.test_id.clone(),
406            ));
407            env_vars.push((
408                "STREAMLING__OPEN_TELEMETRY_METRICS__INGESTION_ENDPOINT".to_string(),
409                prometheus.ingestion_endpoint.clone(),
410            ));
411            env_vars.push((
412                "STREAMLING__OPEN_TELEMETRY_METRICS__ENDPOINT_PROTOCOL".to_string(),
413                "http/protobuf".to_string(),
414            ));
415            env_vars.push((
416                "STREAMLING__OPEN_TELEMETRY_METRICS__BATCH_INTERVAL_SECS".to_string(),
417                "1".to_string(),
418            ));
419        }
420
421        env_vars
422    }
423
424    /// Create an additional SQS queue for use in tests
425    pub async fn create_sqs_queue(&self, queue_suffix: &str) -> Result<SqsResource> {
426        let queue_name = format!("test_{}_{}", &self.test_id[..8], queue_suffix);
427        let sqs_url = self
428            .config
429            .sqs_url
430            .as_ref()
431            .ok_or_else(|| E2eError::EnvVarNotSet("E2E_SQS_URL".to_string()))?;
432        SqsResource::new(sqs_url, &queue_name).await
433    }
434
435    /// Create an additional Kafka topic for use in tests (e.g., for sink output)
436    pub async fn create_kafka_topic(&self, topic_suffix: &str) -> Result<KafkaResource> {
437        let topic = format!("test_{}_{}", &self.test_id[..8], topic_suffix);
438        KafkaResource::new(
439            &self.config.kafka_broker,
440            &self.config.schema_registry_url,
441            &topic,
442        )
443        .await
444    }
445
446    /// Consume messages from a Kafka topic and return them as decoded values
447    pub async fn consume_kafka_messages(
448        &self,
449        topic: &str,
450        max_messages: usize,
451    ) -> Result<Vec<(i64, String, String)>> {
452        let (messages, _) = KafkaResource::inspect_topic_messages(
453            &self.config.kafka_broker,
454            &self.config.schema_registry_url,
455            topic,
456            max_messages,
457            max_messages,
458        )
459        .await?;
460        Ok(messages)
461    }
462
463    /// Run streamling with pipeline YAML and stop after N records
464    ///
465    /// This is the most common pattern for e2e tests - run until a specific
466    /// number of records have been processed.
467    pub async fn run_pipeline(&self, pipeline_yaml: &str, record_limit: u64) -> Result<ExitStatus> {
468        self.run_pipeline_with_opts(
469            pipeline_yaml,
470            PipelineOpts::new().record_limit(record_limit),
471        )
472        .await
473    }
474
475    /// Run streamling with pipeline YAML and custom options
476    pub async fn run_pipeline_with_opts(
477        &self,
478        pipeline_yaml: &str,
479        opts: PipelineOpts,
480    ) -> Result<ExitStatus> {
481        let pipeline_path = self.temp_dir.path().join("pipeline.yaml");
482        std::fs::write(&pipeline_path, pipeline_yaml)?;
483
484        let mut env_vars = self.build_env_vars();
485
486        if let Some(limit) = opts.record_limit {
487            env_vars.push((
488                "STREAMLING__NUM_RECORDS_BEFORE_STOP".to_string(),
489                limit.to_string(),
490            ));
491        }
492
493        env_vars.extend(opts.extra_env);
494
495        let timeout = opts.timeout.unwrap_or(std::time::Duration::from_secs(30));
496
497        tokio::time::timeout(
498            timeout,
499            streamling::run_streamling(
500                &pipeline_path,
501                self.config.streamling_bin.as_deref(),
502                &env_vars,
503            ),
504        )
505        .await
506        .map_err(|_| {
507            E2eError::StreamlingFailed(format!("Pipeline execution timed out after {:?}", timeout))
508        })?
509    }
510
511    /// Run streamling with pipeline YAML and capture stdout/stderr for inspection
512    ///
513    /// This is useful for tests that need to inspect the print sink output.
514    /// Returns a `PrintSinkOutput` which can be used to verify the pipeline output.
515    pub async fn run_pipeline_with_capture(
516        &self,
517        pipeline_yaml: &str,
518        opts: PipelineOpts,
519    ) -> Result<PrintSinkOutput> {
520        let pipeline_path = self.temp_dir.path().join("pipeline.yaml");
521        std::fs::write(&pipeline_path, pipeline_yaml)?;
522
523        let mut env_vars = self.build_env_vars();
524
525        if let Some(limit) = opts.record_limit {
526            env_vars.push((
527                "STREAMLING__NUM_RECORDS_BEFORE_STOP".to_string(),
528                limit.to_string(),
529            ));
530        }
531
532        env_vars.extend(opts.extra_env);
533
534        let timeout = opts.timeout.unwrap_or(std::time::Duration::from_secs(30));
535
536        let output = tokio::time::timeout(
537            timeout,
538            streamling::run_streamling_with_capture(
539                &pipeline_path,
540                self.config.streamling_bin.as_deref(),
541                &env_vars,
542            ),
543        )
544        .await
545        .map_err(|_| {
546            E2eError::StreamlingFailed(format!("Pipeline execution timed out after {:?}", timeout))
547        })??;
548
549        // Parse print sink output from stderr (tracing logs are written to stderr)
550        let parsed = PrintSinkOutput::parse(&output.stderr);
551        tracing::debug!("Parsed {} rows from print sink output", parsed.len());
552
553        Ok(parsed)
554    }
555
556    /// Run streamling with pipeline YAML and return raw output (stdout, stderr, status)
557    ///
558    /// This is useful for tests that need to inspect error messages or other raw output
559    /// that doesn't fit the PrintSinkOutput format. Unlike other run methods, this
560    /// returns the output even if the pipeline fails, allowing tests to inspect
561    /// error messages.
562    pub async fn run_pipeline_raw(
563        &self,
564        pipeline_yaml: &str,
565        opts: PipelineOpts,
566    ) -> Result<StreamlingOutput> {
567        let pipeline_path = self.temp_dir.path().join("pipeline.yaml");
568        std::fs::write(&pipeline_path, pipeline_yaml)?;
569
570        let mut env_vars = self.build_env_vars();
571
572        if let Some(limit) = opts.record_limit {
573            env_vars.push((
574                "STREAMLING__NUM_RECORDS_BEFORE_STOP".to_string(),
575                limit.to_string(),
576            ));
577        }
578
579        env_vars.extend(opts.extra_env);
580
581        let timeout = opts.timeout.unwrap_or(std::time::Duration::from_secs(30));
582
583        tokio::time::timeout(
584            timeout,
585            streamling::run_streamling_raw(
586                &pipeline_path,
587                self.config.streamling_bin.as_deref(),
588                &env_vars,
589                &opts.extra_args,
590            ),
591        )
592        .await
593        .map_err(|_| {
594            E2eError::StreamlingFailed(format!("Pipeline execution timed out after {:?}", timeout))
595        })?
596    }
597}
598
/// Options for running a pipeline through the `TestContext::run_pipeline*` methods.
///
/// Build it with [`PipelineOpts::new`] and the chained setters, e.g.
/// `PipelineOpts::new().record_limit(10).env("KEY", "value")`.
#[derive(Debug, Clone)]
pub struct PipelineOpts {
    /// Stop after processing this many records (exported to streamling as
    /// `STREAMLING__NUM_RECORDS_BEFORE_STOP`).
    pub record_limit: Option<u64>,
    /// Additional environment variables, appended after the context's own
    /// variables.
    pub extra_env: Vec<(String, String)>,
    /// Timeout for pipeline execution. [`PipelineOpts::default`] sets 30
    /// seconds; [`PipelineOpts::no_timeout`] clears it to `None`.
    pub timeout: Option<std::time::Duration>,
    /// Extra CLI arguments for the streamling binary.
    ///
    /// NOTE: only `TestContext::run_pipeline_raw` passes these to the binary;
    /// the other runners do not forward them.
    pub extra_args: Vec<String>,
}

impl Default for PipelineOpts {
    /// No record limit, no extra env or args, and a 30-second timeout.
    fn default() -> Self {
        Self {
            record_limit: None,
            extra_env: Vec::new(),
            timeout: Some(std::time::Duration::from_secs(30)),
            extra_args: Vec::new(),
        }
    }
}

impl PipelineOpts {
    /// Same as [`PipelineOpts::default`].
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Stop the pipeline after `limit` records.
    #[must_use]
    pub fn record_limit(mut self, limit: u64) -> Self {
        self.record_limit = Some(limit);
        self
    }

    /// Append one extra environment variable (`key=value`).
    #[must_use]
    pub fn env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.extra_env.push((key.into(), value.into()));
        self
    }

    /// Set the execution timeout.
    #[must_use]
    pub fn timeout(mut self, timeout: std::time::Duration) -> Self {
        self.timeout = Some(timeout);
        self
    }

    /// Clear the execution timeout (sets it to `None`).
    #[must_use]
    pub fn no_timeout(mut self) -> Self {
        self.timeout = None;
        self
    }

    /// Append one extra CLI argument (see the note on [`PipelineOpts::extra_args`]).
    #[must_use]
    pub fn arg(mut self, arg: impl Into<String>) -> Self {
        self.extra_args.push(arg.into());
        self
    }
}
653
/// Options for creating a [`TestContext`].
///
/// A PostgreSQL database and a Kafka topic are always created; these flags opt
/// into additional resources. Build with the chained `with_*` methods, e.g.
/// `TestContextOptions::new().with_clickhouse().with_plugin()`.
#[derive(Debug, Clone, Default)]
pub struct TestContextOptions {
    /// Create an isolated ClickHouse database (on the `E2E_CLICKHOUSE_URL` server).
    pub with_clickhouse: bool,
    /// Create an isolated MySQL database; `E2E_MYSQL_URL` must be set or
    /// context creation fails.
    pub with_mysql: bool,
    /// Enable Prometheus metrics; skipped when `E2E_PROMETHEUS_URL` is not set.
    pub with_prometheus: bool,
    /// Create an SQS queue (requires ElasticMQ or an SQS-compatible endpoint);
    /// skipped when `E2E_SQS_URL` is not set.
    pub with_sqs: bool,
    /// Whether to forward `STREAMLING__PLUGIN__PATH` to the streamling
    /// subprocess (for tests that require plugin-provided sources, sinks, or UDFs).
    pub with_plugin: bool,
}

impl TestContextOptions {
    /// All optional resources disabled (same as `Default`).
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Request an isolated ClickHouse database.
    #[must_use]
    pub fn with_clickhouse(mut self) -> Self {
        self.with_clickhouse = true;
        self
    }

    /// Request an isolated MySQL database.
    #[must_use]
    pub fn with_mysql(mut self) -> Self {
        self.with_mysql = true;
        self
    }

    /// Request Prometheus metrics export.
    #[must_use]
    pub fn with_prometheus(mut self) -> Self {
        self.with_prometheus = true;
        self
    }

    /// Request an SQS queue.
    #[must_use]
    pub fn with_sqs(mut self) -> Self {
        self.with_sqs = true;
        self
    }

    /// Forward `STREAMLING__PLUGIN__PATH` to the streamling subprocess.
    #[must_use]
    pub fn with_plugin(mut self) -> Self {
        self.with_plugin = true;
        self
    }
}
700
701/// Initialize tracing for tests
702pub fn init_tracing() {
703    use tracing_subscriber::{fmt, prelude::*, EnvFilter};
704
705    let _ = tracing_subscriber::registry()
706        .with(fmt::layer())
707        .with(EnvFilter::from_default_env())
708        .try_init();
709}