use crate::{E2eError, Result};
use std::path::{Path, PathBuf};
use std::process::ExitStatus;
use tokio::process::Command;
use tracing::info;
#[derive(Debug, Clone)]
pub struct StreamlingOutput {
pub status: ExitStatus,
pub stdout: String,
pub stderr: String,
}
fn find_streamling_dir() -> Option<PathBuf> {
std::env::var("E2E_STREAMLING_DIR")
.ok()
.map(PathBuf::from)
.or_else(|| {
std::env::var("CARGO_MANIFEST_DIR")
.ok()
.and_then(|manifest_dir| {
Path::new(&manifest_dir)
.parent()
.and_then(|p| p.parent())
.map(|p| p.join("crates/streamling"))
})
})
}
fn construct_program_with_args(binary_path: Option<&Path>) -> (String, Vec<String>) {
if let Some(bin) = binary_path {
(bin.to_string_lossy().to_string(), vec![])
} else {
(
"cargo".to_string(),
vec![
"run".to_string(),
"--release".to_string(),
"-p".to_string(),
"streamling".to_string(),
"--".to_string(),
],
)
}
}
pub async fn run_streamling(
pipeline_path: &Path,
binary_path: Option<&Path>,
env_vars: &[(String, String)],
) -> Result<ExitStatus> {
let streamling_dir = find_streamling_dir();
let (program, args) = construct_program_with_args(binary_path);
let mut cmd = Command::new(&program);
for arg in &args {
cmd.arg(arg);
}
if let Some(ref dir) = streamling_dir {
cmd.current_dir(dir);
info!("Running streamling from directory: {}", dir.display());
}
cmd.env(
"STREAMLING__PIPELINE_DEFINITION_LOCATION",
pipeline_path.to_string_lossy().to_string(),
);
for (key, value) in env_vars {
cmd.env(key, value);
}
info!(
"Running streamling with pipeline: {}",
pipeline_path.display()
);
let show_output = std::env::var("E2E_SHOW_STREAMLING_OUTPUT")
.map(|v| v != "0" && v != "false")
.unwrap_or(true);
let status = if show_output {
cmd.stdout(std::process::Stdio::piped());
cmd.stderr(std::process::Stdio::piped());
let mut child = cmd.spawn()?;
let stdout_handle = child.stdout.take().map(|stdout| {
tokio::spawn(async move {
use tokio::io::{AsyncBufReadExt, BufReader};
let reader = BufReader::new(stdout);
let mut lines = reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
tracing::info!(target: "streamling", "{}", line);
}
})
});
let stderr_handle = child.stderr.take().map(|stderr| {
tokio::spawn(async move {
use tokio::io::{AsyncBufReadExt, BufReader};
let reader = BufReader::new(stderr);
let mut lines = reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
tracing::warn!(target: "streamling", "{}", line);
}
})
});
let exit_status = child.wait().await?;
if let Some(handle) = stdout_handle {
let _ = handle.await;
}
if let Some(handle) = stderr_handle {
let _ = handle.await;
}
exit_status
} else {
let output = cmd.output().await?;
if !output.stdout.is_empty() {
let stdout = String::from_utf8_lossy(&output.stdout);
for line in stdout.lines() {
tracing::debug!(target: "streamling", "{}", line);
}
}
if !output.stderr.is_empty() {
let stderr = String::from_utf8_lossy(&output.stderr);
for line in stderr.lines() {
if output.status.success() {
tracing::debug!(target: "streamling", "{}", line);
} else {
tracing::error!(target: "streamling", "{}", line);
}
}
}
output.status
};
if !status.success() {
return Err(E2eError::StreamlingFailed(format!(
"streamling exited with status: {:?}",
status.code()
)));
}
Ok(status)
}
pub async fn run_streamling_with_limit(
pipeline_path: &Path,
binary_path: Option<&Path>,
env_vars: &[(String, String)],
record_limit: u64,
) -> Result<ExitStatus> {
let mut all_env_vars = env_vars.to_vec();
all_env_vars.push((
"STREAMLING__NUM_RECORDS_BEFORE_STOP".to_string(),
record_limit.to_string(),
));
run_streamling(pipeline_path, binary_path, &all_env_vars).await
}
pub async fn run_streamling_with_capture(
pipeline_path: &Path,
binary_path: Option<&Path>,
env_vars: &[(String, String)],
) -> Result<StreamlingOutput> {
let streamling_dir = find_streamling_dir();
let (program, args) = construct_program_with_args(binary_path);
let mut cmd = Command::new(&program);
for arg in &args {
cmd.arg(arg);
}
if let Some(ref dir) = streamling_dir {
cmd.current_dir(dir);
info!("Running streamling from directory: {}", dir.display());
}
cmd.env(
"STREAMLING__PIPELINE_DEFINITION_LOCATION",
pipeline_path.to_string_lossy().to_string(),
);
for (key, value) in env_vars {
cmd.env(key, value);
}
info!(
"Running streamling with pipeline (capture mode): {}",
pipeline_path.display()
);
let output = cmd.output().await?;
let stdout = String::from_utf8_lossy(&output.stdout).to_string();
let stderr = String::from_utf8_lossy(&output.stderr).to_string();
if !stdout.is_empty() {
for line in stdout.lines() {
tracing::debug!(target: "streamling", "{}", line);
}
}
if !stderr.is_empty() {
for line in stderr.lines() {
if output.status.success() {
tracing::debug!(target: "streamling", "{}", line);
} else {
tracing::error!(target: "streamling", "{}", line);
}
}
}
if !output.status.success() {
return Err(E2eError::StreamlingFailed(format!(
"streamling exited with status: {:?}\nstderr: {}",
output.status.code(),
stderr
)));
}
Ok(StreamlingOutput {
status: output.status,
stdout,
stderr,
})
}
pub async fn run_streamling_with_capture_and_limit(
pipeline_path: &Path,
binary_path: Option<&Path>,
env_vars: &[(String, String)],
record_limit: u64,
) -> Result<StreamlingOutput> {
let mut all_env_vars = env_vars.to_vec();
all_env_vars.push((
"STREAMLING__NUM_RECORDS_BEFORE_STOP".to_string(),
record_limit.to_string(),
));
run_streamling_with_capture(pipeline_path, binary_path, &all_env_vars).await
}
pub async fn run_streamling_raw(
pipeline_path: &Path,
binary_path: Option<&Path>,
env_vars: &[(String, String)],
extra_args: &[String],
) -> Result<StreamlingOutput> {
let streamling_dir = find_streamling_dir();
let (program, args) = construct_program_with_args(binary_path);
let mut cmd = Command::new(&program);
for arg in &args {
cmd.arg(arg);
}
for arg in extra_args {
cmd.arg(arg);
}
if let Some(ref dir) = streamling_dir {
cmd.current_dir(dir);
info!("Running streamling from directory: {}", dir.display());
}
cmd.env(
"STREAMLING__PIPELINE_DEFINITION_LOCATION",
pipeline_path.to_string_lossy().to_string(),
);
for (key, value) in env_vars {
cmd.env(key, value);
}
info!(
"Running streamling with pipeline (raw mode): {}",
pipeline_path.display()
);
let output = cmd.output().await?;
let stdout = String::from_utf8_lossy(&output.stdout).to_string();
let stderr = String::from_utf8_lossy(&output.stderr).to_string();
if !stdout.is_empty() {
for line in stdout.lines() {
tracing::debug!(target: "streamling", "{}", line);
}
}
if !stderr.is_empty() {
for line in stderr.lines() {
if output.status.success() {
tracing::debug!(target: "streamling", "{}", line);
} else {
tracing::error!(target: "streamling", "{}", line);
}
}
}
Ok(StreamlingOutput {
status: output.status,
stdout,
stderr,
})
}