Skip to main content

streamling_e2e/
streamling.rs

1//! Streamling binary execution helpers.
2
3use crate::{E2eError, Result};
4use std::path::{Path, PathBuf};
5use std::process::ExitStatus;
6use tokio::process::Command;
7use tracing::info;
8
/// Result of a streamling run whose standard streams were captured.
///
/// The capture helpers in this module fill `stdout` and `stderr` by decoding the raw
/// process output lossily, so invalid UTF-8 shows up as replacement characters.
#[derive(Clone, Debug)]
pub struct StreamlingOutput {
    /// How the streamling process terminated.
    pub status: ExitStatus,
    /// Everything streamling wrote to standard output.
    pub stdout: String,
    /// Everything streamling wrote to standard error.
    pub stderr: String,
}
19
/// Locate the `crates/streamling` directory that streamling should be run from.
///
/// Resolution order:
/// 1. `E2E_STREAMLING_DIR`, if set to a non-empty value. This override exists for
///    cross-repo e2e tests (e.g. the plugins repo) and is returned as-is, so a wrong
///    explicit path still surfaces as a spawn error rather than being ignored.
/// 2. `<CARGO_MANIFEST_DIR>/../../crates/streamling`, i.e. assuming this crate sits two
///    levels below the workspace root (e.g. `<root>/crates/<this crate>`). This
///    fallback is only returned when that directory actually exists.
///
/// Returns `None` when neither source yields a directory; callers then leave the
/// child's working directory unchanged.
fn find_streamling_dir() -> Option<PathBuf> {
    // An explicit override wins, but an empty value (e.g. `E2E_STREAMLING_DIR=`) is
    // treated as unset: `current_dir("")` would otherwise make every spawn fail.
    let override_dir = std::env::var_os("E2E_STREAMLING_DIR").filter(|dir| !dir.is_empty());
    if let Some(dir) = override_dir {
        return Some(PathBuf::from(dir));
    }

    // Only use the workspace layout if it really exists from here. When this crate is
    // consumed from another repository the guessed path is missing, and passing it to
    // `current_dir` would fail the spawn with a misleading "not found" error.
    let manifest_dir = PathBuf::from(std::env::var_os("CARGO_MANIFEST_DIR")?);
    let candidate = manifest_dir.parent()?.parent()?.join("crates/streamling");
    candidate.is_dir().then_some(candidate)
}
38
/// Decide how streamling is launched.
///
/// With a `binary_path`, the pre-built executable is invoked directly and needs no
/// leading arguments. Without one, streamling is built and run through
/// `cargo run --release -p streamling --`; the trailing `--` makes cargo forward any
/// arguments appended later to streamling instead of interpreting them itself.
///
/// Returns `(program, leading_args)`.
fn construct_program_with_args(binary_path: Option<&Path>) -> (String, Vec<String>) {
    match binary_path {
        Some(prebuilt) => (prebuilt.to_string_lossy().into_owned(), Vec::new()),
        None => {
            let cargo_args: Vec<String> = ["run", "--release", "-p", "streamling", "--"]
                .iter()
                .copied()
                .map(String::from)
                .collect();
            (String::from("cargo"), cargo_args)
        }
    }
}
57
58/// Run the streamling binary with the given pipeline file
59pub async fn run_streamling(
60    pipeline_path: &Path,
61    binary_path: Option<&Path>,
62    env_vars: &[(String, String)],
63) -> Result<ExitStatus> {
64    let streamling_dir = find_streamling_dir();
65    let (program, args) = construct_program_with_args(binary_path);
66
67    let mut cmd = Command::new(&program);
68
69    // Add cargo args if using cargo run
70    for arg in &args {
71        cmd.arg(arg);
72    }
73
74    // Run from crates/streamling/ so any relative paths in a pipeline (e.g. state.db)
75    // resolve consistently. Config and the WASM runtime are embedded in the binary.
76    if let Some(ref dir) = streamling_dir {
77        cmd.current_dir(dir);
78        info!("Running streamling from directory: {}", dir.display());
79    }
80
81    // Set pipeline location via environment variable (streamling reads config first)
82    cmd.env(
83        "STREAMLING__PIPELINE_DEFINITION_LOCATION",
84        pipeline_path.to_string_lossy().to_string(),
85    );
86
87    // Add environment variables
88    for (key, value) in env_vars {
89        cmd.env(key, value);
90    }
91
92    info!(
93        "Running streamling with pipeline: {}",
94        pipeline_path.display()
95    );
96
97    // Check if we should show streamling output (default to true)
98    let show_output = std::env::var("E2E_SHOW_STREAMLING_OUTPUT")
99        .map(|v| v != "0" && v != "false")
100        .unwrap_or(true);
101
102    let status = if show_output {
103        // Stream output in real-time
104        cmd.stdout(std::process::Stdio::piped());
105        cmd.stderr(std::process::Stdio::piped());
106
107        let mut child = cmd.spawn()?;
108
109        // Spawn tasks to stream stdout/stderr
110        let stdout_handle = child.stdout.take().map(|stdout| {
111            tokio::spawn(async move {
112                use tokio::io::{AsyncBufReadExt, BufReader};
113                let reader = BufReader::new(stdout);
114                let mut lines = reader.lines();
115                while let Ok(Some(line)) = lines.next_line().await {
116                    tracing::info!(target: "streamling", "{}", line);
117                }
118            })
119        });
120
121        let stderr_handle = child.stderr.take().map(|stderr| {
122            tokio::spawn(async move {
123                use tokio::io::{AsyncBufReadExt, BufReader};
124                let reader = BufReader::new(stderr);
125                let mut lines = reader.lines();
126                while let Ok(Some(line)) = lines.next_line().await {
127                    tracing::warn!(target: "streamling", "{}", line);
128                }
129            })
130        });
131
132        // Wait for process to finish
133        let exit_status = child.wait().await?;
134
135        // Wait for output streams to finish
136        if let Some(handle) = stdout_handle {
137            let _ = handle.await;
138        }
139        if let Some(handle) = stderr_handle {
140            let _ = handle.await;
141        }
142
143        exit_status
144    } else {
145        // Capture output (original behavior)
146        let output = cmd.output().await?;
147
148        // Log captured output
149        if !output.stdout.is_empty() {
150            let stdout = String::from_utf8_lossy(&output.stdout);
151            for line in stdout.lines() {
152                tracing::debug!(target: "streamling", "{}", line);
153            }
154        }
155        if !output.stderr.is_empty() {
156            let stderr = String::from_utf8_lossy(&output.stderr);
157            for line in stderr.lines() {
158                if output.status.success() {
159                    tracing::debug!(target: "streamling", "{}", line);
160                } else {
161                    tracing::error!(target: "streamling", "{}", line);
162                }
163            }
164        }
165
166        output.status
167    };
168
169    if !status.success() {
170        return Err(E2eError::StreamlingFailed(format!(
171            "streamling exited with status: {:?}",
172            status.code()
173        )));
174    }
175
176    Ok(status)
177}
178
179/// Run streamling and wait for it to process a specific number of records
180/// This is useful for tests that need streamling to run until a condition is met
181pub async fn run_streamling_with_limit(
182    pipeline_path: &Path,
183    binary_path: Option<&Path>,
184    env_vars: &[(String, String)],
185    record_limit: u64,
186) -> Result<ExitStatus> {
187    let mut all_env_vars = env_vars.to_vec();
188    all_env_vars.push((
189        "STREAMLING__NUM_RECORDS_BEFORE_STOP".to_string(),
190        record_limit.to_string(),
191    ));
192
193    run_streamling(pipeline_path, binary_path, &all_env_vars).await
194}
195
196/// Run streamling and capture stdout/stderr for inspection
197///
198/// Unlike `run_streamling`, this function always captures output rather than streaming it,
199/// and returns the captured output for parsing (e.g., print sink output).
200pub async fn run_streamling_with_capture(
201    pipeline_path: &Path,
202    binary_path: Option<&Path>,
203    env_vars: &[(String, String)],
204) -> Result<StreamlingOutput> {
205    let streamling_dir = find_streamling_dir();
206    let (program, args) = construct_program_with_args(binary_path);
207
208    let mut cmd = Command::new(&program);
209
210    // Add cargo args if using cargo run
211    for arg in &args {
212        cmd.arg(arg);
213    }
214
215    // Run from crates/streamling/ so relative pipeline paths resolve consistently.
216    // Config and the WASM runtime are embedded in the binary.
217    if let Some(ref dir) = streamling_dir {
218        cmd.current_dir(dir);
219        info!("Running streamling from directory: {}", dir.display());
220    }
221
222    // Set pipeline location via environment variable
223    cmd.env(
224        "STREAMLING__PIPELINE_DEFINITION_LOCATION",
225        pipeline_path.to_string_lossy().to_string(),
226    );
227
228    // Add environment variables
229    for (key, value) in env_vars {
230        cmd.env(key, value);
231    }
232
233    info!(
234        "Running streamling with pipeline (capture mode): {}",
235        pipeline_path.display()
236    );
237
238    // Capture output
239    let output = cmd.output().await?;
240
241    let stdout = String::from_utf8_lossy(&output.stdout).to_string();
242    let stderr = String::from_utf8_lossy(&output.stderr).to_string();
243
244    // Log captured output for debugging
245    if !stdout.is_empty() {
246        for line in stdout.lines() {
247            tracing::debug!(target: "streamling", "{}", line);
248        }
249    }
250    if !stderr.is_empty() {
251        for line in stderr.lines() {
252            if output.status.success() {
253                tracing::debug!(target: "streamling", "{}", line);
254            } else {
255                tracing::error!(target: "streamling", "{}", line);
256            }
257        }
258    }
259
260    if !output.status.success() {
261        return Err(E2eError::StreamlingFailed(format!(
262            "streamling exited with status: {:?}\nstderr: {}",
263            output.status.code(),
264            stderr
265        )));
266    }
267
268    Ok(StreamlingOutput {
269        status: output.status,
270        stdout,
271        stderr,
272    })
273}
274
275/// Run streamling with capture and a record limit
276pub async fn run_streamling_with_capture_and_limit(
277    pipeline_path: &Path,
278    binary_path: Option<&Path>,
279    env_vars: &[(String, String)],
280    record_limit: u64,
281) -> Result<StreamlingOutput> {
282    let mut all_env_vars = env_vars.to_vec();
283    all_env_vars.push((
284        "STREAMLING__NUM_RECORDS_BEFORE_STOP".to_string(),
285        record_limit.to_string(),
286    ));
287
288    run_streamling_with_capture(pipeline_path, binary_path, &all_env_vars).await
289}
290
291/// Run streamling and return raw output, even on failure
292///
293/// Unlike `run_streamling_with_capture`, this function returns the output
294/// regardless of the exit status. This is useful for tests that need to
295/// inspect error messages from failed pipeline runs.
296pub async fn run_streamling_raw(
297    pipeline_path: &Path,
298    binary_path: Option<&Path>,
299    env_vars: &[(String, String)],
300    extra_args: &[String],
301) -> Result<StreamlingOutput> {
302    let streamling_dir = find_streamling_dir();
303    let (program, args) = construct_program_with_args(binary_path);
304
305    let mut cmd = Command::new(&program);
306
307    // Add cargo args if using cargo run
308    for arg in &args {
309        cmd.arg(arg);
310    }
311
312    for arg in extra_args {
313        cmd.arg(arg);
314    }
315
316    // Run from crates/streamling/ so relative pipeline paths resolve consistently.
317    // Config and the WASM runtime are embedded in the binary.
318    if let Some(ref dir) = streamling_dir {
319        cmd.current_dir(dir);
320        info!("Running streamling from directory: {}", dir.display());
321    }
322
323    // Set pipeline location via environment variable
324    cmd.env(
325        "STREAMLING__PIPELINE_DEFINITION_LOCATION",
326        pipeline_path.to_string_lossy().to_string(),
327    );
328
329    // Add environment variables
330    for (key, value) in env_vars {
331        cmd.env(key, value);
332    }
333
334    info!(
335        "Running streamling with pipeline (raw mode): {}",
336        pipeline_path.display()
337    );
338
339    // Capture output
340    let output = cmd.output().await?;
341
342    let stdout = String::from_utf8_lossy(&output.stdout).to_string();
343    let stderr = String::from_utf8_lossy(&output.stderr).to_string();
344
345    // Log captured output for debugging
346    if !stdout.is_empty() {
347        for line in stdout.lines() {
348            tracing::debug!(target: "streamling", "{}", line);
349        }
350    }
351    if !stderr.is_empty() {
352        for line in stderr.lines() {
353            if output.status.success() {
354                tracing::debug!(target: "streamling", "{}", line);
355            } else {
356                tracing::error!(target: "streamling", "{}", line);
357            }
358        }
359    }
360
361    // Return output regardless of exit status
362    Ok(StreamlingOutput {
363        status: output.status,
364        stdout,
365        stderr,
366    })
367}