1use crate::{E2eError, Result};
4use std::path::{Path, PathBuf};
5use std::process::ExitStatus;
6use tokio::process::Command;
7use tracing::info;
8
/// Result of a streamling invocation whose output was captured rather than
/// streamed.
///
/// The capture helpers in this file fill `stdout` and `stderr` with the
/// child's output decoded as lossy UTF-8.
#[derive(Clone, Debug)]
pub struct StreamlingOutput {
    /// Exit status reported for the child process.
    pub status: ExitStatus,
    /// Everything the child wrote to standard output.
    pub stdout: String,
    /// Everything the child wrote to standard error.
    pub stderr: String,
}
19
/// Resolves the directory streamling should be launched from.
///
/// Resolution order:
/// 1. `E2E_STREAMLING_DIR`, used verbatim when set to a non-empty value.
/// 2. Otherwise `CARGO_MANIFEST_DIR` (read from the process environment at
///    run time): its grandparent directory joined with `crates/streamling`.
///
/// Returns `None` when neither variable yields a directory, for example when
/// both are unset or the manifest path has fewer than two ancestors.
///
/// The variables are read as OS strings so that directories whose names are
/// not valid UTF-8 are still honoured, and empty values are treated as unset
/// so an accidental `E2E_STREAMLING_DIR=` does not produce an empty working
/// directory.
fn find_streamling_dir() -> Option<PathBuf> {
    let non_empty_var = |name: &str| std::env::var_os(name).filter(|value| !value.is_empty());

    if let Some(explicit) = non_empty_var("E2E_STREAMLING_DIR") {
        return Some(PathBuf::from(explicit));
    }

    let manifest_dir = non_empty_var("CARGO_MANIFEST_DIR")?;
    Path::new(&manifest_dir)
        .parent()
        .and_then(Path::parent)
        .map(|grandparent| grandparent.join("crates/streamling"))
}
38
/// Chooses the executable and leading arguments used to launch streamling.
///
/// With an explicit `binary_path` the binary is run directly with no leading
/// arguments (the path is converted to a `String` lossily). Without one, the
/// program is `cargo` with `run --release -p streamling --`, so anything
/// appended afterwards is passed on to streamling itself.
fn construct_program_with_args(binary_path: Option<&Path>) -> (String, Vec<String>) {
    const CARGO_RUN_ARGS: [&str; 5] = ["run", "--release", "-p", "streamling", "--"];

    match binary_path {
        Some(bin) => (bin.to_string_lossy().into_owned(), Vec::new()),
        None => (
            String::from("cargo"),
            CARGO_RUN_ARGS.iter().map(|arg| arg.to_string()).collect(),
        ),
    }
}
57
58pub async fn run_streamling(
60 pipeline_path: &Path,
61 binary_path: Option<&Path>,
62 env_vars: &[(String, String)],
63) -> Result<ExitStatus> {
64 let streamling_dir = find_streamling_dir();
65 let (program, args) = construct_program_with_args(binary_path);
66
67 let mut cmd = Command::new(&program);
68
69 for arg in &args {
71 cmd.arg(arg);
72 }
73
74 if let Some(ref dir) = streamling_dir {
77 cmd.current_dir(dir);
78 info!("Running streamling from directory: {}", dir.display());
79 }
80
81 cmd.env(
83 "STREAMLING__PIPELINE_DEFINITION_LOCATION",
84 pipeline_path.to_string_lossy().to_string(),
85 );
86
87 for (key, value) in env_vars {
89 cmd.env(key, value);
90 }
91
92 info!(
93 "Running streamling with pipeline: {}",
94 pipeline_path.display()
95 );
96
97 let show_output = std::env::var("E2E_SHOW_STREAMLING_OUTPUT")
99 .map(|v| v != "0" && v != "false")
100 .unwrap_or(true);
101
102 let status = if show_output {
103 cmd.stdout(std::process::Stdio::piped());
105 cmd.stderr(std::process::Stdio::piped());
106
107 let mut child = cmd.spawn()?;
108
109 let stdout_handle = child.stdout.take().map(|stdout| {
111 tokio::spawn(async move {
112 use tokio::io::{AsyncBufReadExt, BufReader};
113 let reader = BufReader::new(stdout);
114 let mut lines = reader.lines();
115 while let Ok(Some(line)) = lines.next_line().await {
116 tracing::info!(target: "streamling", "{}", line);
117 }
118 })
119 });
120
121 let stderr_handle = child.stderr.take().map(|stderr| {
122 tokio::spawn(async move {
123 use tokio::io::{AsyncBufReadExt, BufReader};
124 let reader = BufReader::new(stderr);
125 let mut lines = reader.lines();
126 while let Ok(Some(line)) = lines.next_line().await {
127 tracing::warn!(target: "streamling", "{}", line);
128 }
129 })
130 });
131
132 let exit_status = child.wait().await?;
134
135 if let Some(handle) = stdout_handle {
137 let _ = handle.await;
138 }
139 if let Some(handle) = stderr_handle {
140 let _ = handle.await;
141 }
142
143 exit_status
144 } else {
145 let output = cmd.output().await?;
147
148 if !output.stdout.is_empty() {
150 let stdout = String::from_utf8_lossy(&output.stdout);
151 for line in stdout.lines() {
152 tracing::debug!(target: "streamling", "{}", line);
153 }
154 }
155 if !output.stderr.is_empty() {
156 let stderr = String::from_utf8_lossy(&output.stderr);
157 for line in stderr.lines() {
158 if output.status.success() {
159 tracing::debug!(target: "streamling", "{}", line);
160 } else {
161 tracing::error!(target: "streamling", "{}", line);
162 }
163 }
164 }
165
166 output.status
167 };
168
169 if !status.success() {
170 return Err(E2eError::StreamlingFailed(format!(
171 "streamling exited with status: {:?}",
172 status.code()
173 )));
174 }
175
176 Ok(status)
177}
178
179pub async fn run_streamling_with_limit(
182 pipeline_path: &Path,
183 binary_path: Option<&Path>,
184 env_vars: &[(String, String)],
185 record_limit: u64,
186) -> Result<ExitStatus> {
187 let mut all_env_vars = env_vars.to_vec();
188 all_env_vars.push((
189 "STREAMLING__NUM_RECORDS_BEFORE_STOP".to_string(),
190 record_limit.to_string(),
191 ));
192
193 run_streamling(pipeline_path, binary_path, &all_env_vars).await
194}
195
196pub async fn run_streamling_with_capture(
201 pipeline_path: &Path,
202 binary_path: Option<&Path>,
203 env_vars: &[(String, String)],
204) -> Result<StreamlingOutput> {
205 let streamling_dir = find_streamling_dir();
206 let (program, args) = construct_program_with_args(binary_path);
207
208 let mut cmd = Command::new(&program);
209
210 for arg in &args {
212 cmd.arg(arg);
213 }
214
215 if let Some(ref dir) = streamling_dir {
218 cmd.current_dir(dir);
219 info!("Running streamling from directory: {}", dir.display());
220 }
221
222 cmd.env(
224 "STREAMLING__PIPELINE_DEFINITION_LOCATION",
225 pipeline_path.to_string_lossy().to_string(),
226 );
227
228 for (key, value) in env_vars {
230 cmd.env(key, value);
231 }
232
233 info!(
234 "Running streamling with pipeline (capture mode): {}",
235 pipeline_path.display()
236 );
237
238 let output = cmd.output().await?;
240
241 let stdout = String::from_utf8_lossy(&output.stdout).to_string();
242 let stderr = String::from_utf8_lossy(&output.stderr).to_string();
243
244 if !stdout.is_empty() {
246 for line in stdout.lines() {
247 tracing::debug!(target: "streamling", "{}", line);
248 }
249 }
250 if !stderr.is_empty() {
251 for line in stderr.lines() {
252 if output.status.success() {
253 tracing::debug!(target: "streamling", "{}", line);
254 } else {
255 tracing::error!(target: "streamling", "{}", line);
256 }
257 }
258 }
259
260 if !output.status.success() {
261 return Err(E2eError::StreamlingFailed(format!(
262 "streamling exited with status: {:?}\nstderr: {}",
263 output.status.code(),
264 stderr
265 )));
266 }
267
268 Ok(StreamlingOutput {
269 status: output.status,
270 stdout,
271 stderr,
272 })
273}
274
275pub async fn run_streamling_with_capture_and_limit(
277 pipeline_path: &Path,
278 binary_path: Option<&Path>,
279 env_vars: &[(String, String)],
280 record_limit: u64,
281) -> Result<StreamlingOutput> {
282 let mut all_env_vars = env_vars.to_vec();
283 all_env_vars.push((
284 "STREAMLING__NUM_RECORDS_BEFORE_STOP".to_string(),
285 record_limit.to_string(),
286 ));
287
288 run_streamling_with_capture(pipeline_path, binary_path, &all_env_vars).await
289}
290
291pub async fn run_streamling_raw(
297 pipeline_path: &Path,
298 binary_path: Option<&Path>,
299 env_vars: &[(String, String)],
300 extra_args: &[String],
301) -> Result<StreamlingOutput> {
302 let streamling_dir = find_streamling_dir();
303 let (program, args) = construct_program_with_args(binary_path);
304
305 let mut cmd = Command::new(&program);
306
307 for arg in &args {
309 cmd.arg(arg);
310 }
311
312 for arg in extra_args {
313 cmd.arg(arg);
314 }
315
316 if let Some(ref dir) = streamling_dir {
319 cmd.current_dir(dir);
320 info!("Running streamling from directory: {}", dir.display());
321 }
322
323 cmd.env(
325 "STREAMLING__PIPELINE_DEFINITION_LOCATION",
326 pipeline_path.to_string_lossy().to_string(),
327 );
328
329 for (key, value) in env_vars {
331 cmd.env(key, value);
332 }
333
334 info!(
335 "Running streamling with pipeline (raw mode): {}",
336 pipeline_path.display()
337 );
338
339 let output = cmd.output().await?;
341
342 let stdout = String::from_utf8_lossy(&output.stdout).to_string();
343 let stderr = String::from_utf8_lossy(&output.stderr).to_string();
344
345 if !stdout.is_empty() {
347 for line in stdout.lines() {
348 tracing::debug!(target: "streamling", "{}", line);
349 }
350 }
351 if !stderr.is_empty() {
352 for line in stderr.lines() {
353 if output.status.success() {
354 tracing::debug!(target: "streamling", "{}", line);
355 } else {
356 tracing::error!(target: "streamling", "{}", line);
357 }
358 }
359 }
360
361 Ok(StreamlingOutput {
363 status: output.status,
364 stdout,
365 stderr,
366 })
367}