use crate::event::{Event, Fields, Level};
use crate::event_matcher::{Count, EventMatcher, Events};
use anyhow::{anyhow, Context, Result};
use cargo_metadata::{Metadata, MetadataCommand};
use itertools::Itertools;
use nix::sys::signal::Signal;
use nix::unistd::Pid;
use nu_ansi_term::Color;
use std::collections::HashSet;
use std::env;
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::sync::{LazyLock, Mutex};
use subprocess::{Exec, Redirection};
use tokio::sync::mpsc;
use tokio::{
io::{AsyncBufReadExt, BufReader},
process::{Child, ChildStdout, Command},
};
/// How the binary that a [`BinProcessBuilder`] launches is located.
enum BuilderKind {
/// Run the executable at this filesystem path as-is.
Path(PathBuf),
/// Build a cargo binary target by name and then run the resulting executable.
CargoName {
/// Name of the binary target, passed to `cargo build --bin`.
name: String,
/// Cargo profile to build with. When `None`, the profile is chosen at
/// start time from the `PROFILE` compile-time environment variable.
profile: Option<String>,
},
}
/// Builder that collects everything needed to launch a [`BinProcess`].
pub struct BinProcessBuilder {
/// Where the binary comes from (explicit path or cargo target name).
kind: BuilderKind,
/// Label printed in front of every event emitted by the process.
/// When `None`, the binary's file name / cargo name is used instead.
log_name: Option<String>,
/// Command line arguments handed to the binary.
binary_args: Vec<String>,
/// Extra `(key, value)` environment variables set for the binary.
env_vars: Vec<(String, String)>,
}
impl BinProcessBuilder {
    /// Creates a builder that runs the executable located at `bin_path`.
    ///
    /// No arguments, environment variables or log name are set initially.
    pub fn from_path(bin_path: PathBuf) -> Self {
        BinProcessBuilder {
            kind: BuilderKind::Path(bin_path),
            log_name: None,
            binary_args: vec![],
            env_vars: vec![],
        }
    }

    /// Creates a builder that builds the cargo binary target `name` (using
    /// `profile` when given) and then runs it.
    ///
    /// No arguments, environment variables or log name are set initially.
    pub fn from_cargo_name(name: String, profile: Option<String>) -> Self {
        BinProcessBuilder {
            kind: BuilderKind::CargoName { name, profile },
            log_name: None,
            binary_args: vec![],
            env_vars: vec![],
        }
    }

    /// Sets the label printed in front of the process's events.
    ///
    /// Passing `None` falls back to the binary's own name.
    pub fn with_log_name(mut self, log_name: Option<String>) -> Self {
        self.log_name = log_name;
        self
    }

    /// Replaces the command line arguments passed to the binary.
    pub fn with_args(mut self, args: Vec<String>) -> Self {
        self.binary_args = args;
        self
    }

    /// Replaces the extra environment variables set for the binary.
    pub fn with_env_vars(mut self, env_vars: Vec<(String, String)>) -> Self {
        self.env_vars = env_vars;
        self
    }

    /// Launches the configured binary and returns a handle to it.
    ///
    /// # Panics
    ///
    /// * If no log name was supplied and the path has no final component or
    ///   that component is not valid UTF-8.
    /// * For any reason `BinProcess::start_binary` /
    ///   `BinProcess::start_binary_name` panic (e.g. log name too long,
    ///   build failure, spawn failure).
    pub async fn start(self) -> BinProcess {
        match self.kind {
            BuilderKind::Path(path) => {
                // Derive the fallback lazily: the path is only inspected (and
                // can only cause a panic) when no explicit log name was given.
                let log_name = self.log_name.as_deref().unwrap_or_else(|| {
                    path.file_name()
                        .expect("Path needs at least one element")
                        .to_str()
                        .expect("Binary file name must be valid UTF-8 when no log name is provided")
                });
                BinProcess::start_binary(&path, log_name, &self.env_vars, &self.binary_args).await
            }
            BuilderKind::CargoName { name, profile } => {
                let log_name = self.log_name.as_deref().unwrap_or(&name);
                BinProcess::start_binary_name(
                    &name,
                    log_name,
                    &self.env_vars,
                    &self.binary_args,
                    profile.as_deref(),
                )
                .await
            }
        }
    }
}
/// State shared between all `BinProcess::start_binary_name` calls in this process
/// so that cargo is not invoked more often than necessary.
struct CargoCache {
/// Result of `MetadataCommand::new().exec()`, fetched on first use.
metadata: Option<Metadata>,
/// The (binary name, profile) pairs that have already been built.
built_binaries: HashSet<BuiltBinary>,
}
/// Key identifying one completed `cargo build` of a binary target.
#[derive(Hash, PartialEq, Eq)]
struct BuiltBinary {
/// Name of the cargo binary target.
name: String,
/// Cargo profile the target was built with.
profile: String,
}
/// Lazily initialised, mutex-guarded [`CargoCache`].
/// Starts out with no metadata and no recorded builds.
static CARGO_CACHE: LazyLock<Mutex<CargoCache>> = LazyLock::new(|| {
Mutex::new(CargoCache {
metadata: None,
built_binaries: HashSet::new(),
})
});
/// Handle to a running child process whose stdout is parsed into [`Event`]s.
pub struct BinProcess {
/// The spawned child. Set to `None` once one of the `consume_remaining_events*`
/// methods has taken it; the `Drop` impl panics if it is still `Some`.
child: Option<Child>,
/// Receives the events parsed from the child's stdout by a background task.
event_rx: mpsc::UnboundedReceiver<Event>,
}
impl Drop for BinProcess {
    /// Enforces that the process was explicitly consumed before being dropped.
    ///
    /// # Panics
    ///
    /// Panics if the child has not been taken by one of the consuming methods,
    /// unless the thread is already panicking (to avoid a double panic that
    /// would hide the original failure).
    fn drop(&mut self) {
        if std::thread::panicking() {
            return;
        }
        if self.child.is_some() {
            panic!("Need to call one of shutdown_and_then_consume_events, consume_remaining_events or consume_remaining_events_expect_failure on BinProcess before dropping it.");
        }
    }
}
impl BinProcess {
async fn start_binary_name(
cargo_bin_name: &str,
log_name: &str,
env_vars: &[(String, String)],
binary_args: &[String],
cargo_profile: Option<&str>,
) -> BinProcess {
let profile = cargo_profile.unwrap_or(if env!("PROFILE") == "release" {
"release"
} else {
"dev"
});
let target_dir = {
let mut cargo_cache = CARGO_CACHE.lock().unwrap();
if cargo_cache.metadata.is_none() {
cargo_cache.metadata = Some(MetadataCommand::new().exec().unwrap());
}
let built_package = BuiltBinary {
name: cargo_bin_name.to_owned(),
profile: profile.to_owned(),
};
if !cargo_cache.built_binaries.contains(&built_package) {
let all_args = vec![
"build",
"--all-features",
"--profile",
profile,
"--bin",
cargo_bin_name,
];
let metadata = cargo_cache.metadata.as_ref().unwrap();
run_command(
metadata.workspace_root.as_std_path(),
env!("CARGO"),
&all_args,
)
.unwrap();
cargo_cache.built_binaries.insert(built_package);
}
cargo_cache
.metadata
.as_ref()
.unwrap()
.target_directory
.clone()
};
let target_profile_name = match profile {
"dev" => "debug",
"test" => "debug",
"bench" => "release",
profile => profile,
};
let bin_path = target_dir.join(target_profile_name).join(cargo_bin_name);
BinProcess::start_binary(
bin_path.into_std_path_buf().as_path(),
log_name,
env_vars,
binary_args,
)
.await
}
/// Spawns the executable at `bin_path` and wires up its output streams.
///
/// * stdout is read line by line on a background task; each line is parsed
///   as a JSON [`Event`] and forwarded to the returned process's channel.
///   If reading or parsing fails, a single synthetic `Level::Error` event
///   carrying the error text is sent instead and the task ends.
/// * stderr is read on a second background task and re-emitted through
///   `tracing::error!`.
///
/// `log_name` is padded to 10 characters so that output of several
/// processes lines up.
///
/// # Panics
///
/// Panics if `log_name` is longer than 10 bytes or if the process cannot be
/// spawned.
async fn start_binary(
    bin_path: &Path,
    log_name: &str,
    env_vars: &[(String, String)],
    binary_args: &[String],
) -> BinProcess {
    if log_name.len() > 10 {
        panic!("In order to line up in log outputs, the log name (set via BinProcessBuilder::with_log_name, or defaulting to the binary name) must be of length <= 10 but the value was: {log_name}");
    }
    let log_name = format!("{log_name: <10}");

    let mut child = Command::new(bin_path)
        .args(binary_args)
        .envs(env_vars.iter().map(|(k, v)| (k.as_str(), v.as_str())))
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .kill_on_drop(true)
        .spawn()
        // Only build the message if spawning actually failed.
        .with_context(|| format!("Failed to run {bin_path:?}"))
        .unwrap();

    let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
    let stdout_reader = BufReader::new(child.stdout.take().unwrap()).lines();
    let mut stderr_reader = BufReader::new(child.stderr.take().unwrap()).lines();

    tokio::spawn(async move {
        if let Err(err) = process_stdout_events(stdout_reader, &event_tx, log_name).await {
            // Surface the failure as an error event so that the test's
            // error/warning assertions pick it up. The receiver may already
            // be gone, in which case there is nobody left to notify.
            event_tx
                .send(Event {
                    timestamp: "".to_owned(),
                    level: Level::Error,
                    target: "tokio-bin-process".to_owned(),
                    fields: Fields {
                        message: err.to_string(),
                        fields: Default::default(),
                    },
                    span: Default::default(),
                    spans: Default::default(),
                })
                .ok();
        }
    });

    tokio::spawn(async move {
        while let Some(line) = stderr_reader.next_line().await.expect("An IO error occured while reading stderr from the application, I'm not actually sure when this happens?") {
            tracing::error!("stderr from process: {line}");
        }
    });

    BinProcess {
        child: Some(child),
        event_rx,
    }
}
/// Returns the OS process id of the child.
///
/// # Panics
///
/// Panics if the child has already been taken out of this handle or if
/// `Child::id` returns `None`.
pub fn pid(&self) -> i32 {
    let child = self.child.as_ref().unwrap();
    let raw_id = child.id().unwrap();
    raw_id as i32
}
/// Delivers `signal` to the child process.
///
/// # Panics
///
/// Panics if the pid cannot be obtained or if `kill` reports an error.
fn send_signal(&self, signal: Signal) {
    let target = Pid::from_raw(self.pid());
    nix::sys::signal::kill(target, signal).unwrap();
}
/// Sends `SIGTERM` to the child process.
///
/// Panics under the same conditions as `send_signal`.
pub fn send_sigterm(&self) {
self.send_signal(Signal::SIGTERM)
}
/// Sends `SIGINT` to the child process.
///
/// Panics under the same conditions as `send_signal`.
pub fn send_sigint(&self) {
self.send_signal(Signal::SIGINT)
}
/// Receives events until one matches `ready` and returns every event seen,
/// including the matching one.
///
/// Once the ready event is found, all collected events are checked with
/// `assert_no_errors_or_warnings` against `expected_errors_and_warnings`.
///
/// # Panics
///
/// Panics if the event channel closes before a matching event arrives, or
/// if the error/warning check fails.
pub async fn wait_for(
    &mut self,
    ready: &EventMatcher,
    expected_errors_and_warnings: &[EventMatcher],
) -> Events {
    let mut seen = Vec::new();
    loop {
        let Some(event) = self.event_rx.recv().await else {
            panic!("bin process shutdown before an event was found matching {ready:?}")
        };
        let is_ready = ready.matches(&event);
        seen.push(event);
        if !is_ready {
            continue;
        }
        BinProcess::assert_no_errors_or_warnings(&seen, expected_errors_and_warnings);
        return Events { events: seen };
    }
}
pub async fn consume_events(
&mut self,
event_count: usize,
expected_errors_and_warnings: &[EventMatcher],
) -> Events {
let mut events = vec![];
for _ in 0..event_count {
match self.event_rx.recv().await {
Some(event) => events.push(event),
None => {
if events.is_empty() {
panic!("The process was terminated before the expected count of {event_count} events occured. No events received so far");
} else {
let events_received = events.iter().map(|x| format!("{x}")).join("\n");
panic!("The process was terminated before the expected count of {event_count} events occured. Events received so far:\n{events_received}");
}
}
}
}
BinProcess::assert_no_errors_or_warnings(&events, expected_errors_and_warnings);
Events { events }
}
pub async fn shutdown_and_then_consume_events(
self,
expected_errors_and_warnings: &[EventMatcher],
) -> Events {
self.send_signal(nix::sys::signal::Signal::SIGTERM);
self.consume_remaining_events(expected_errors_and_warnings)
.await
}
/// Drains every remaining event, waits for the process to exit and returns
/// the events.
///
/// # Panics
///
/// Panics if the process exits with a non-zero code, or for any reason
/// `consume_remaining_events_inner` panics.
pub async fn consume_remaining_events(
    mut self,
    expected_errors_and_warnings: &[EventMatcher],
) -> Events {
    let (events, status) = self
        .consume_remaining_events_inner(expected_errors_and_warnings)
        .await;
    match status {
        0 => events,
        _ => panic!("The bin process exited with {status} but expected 0 exit code (Success).\nevents:\n{events}"),
    }
}
/// Drains every remaining event, waits for the process to exit and returns
/// the events, requiring that the process failed.
///
/// # Panics
///
/// Panics if the process exits with code 0, or for any reason
/// `consume_remaining_events_inner` panics.
pub async fn consume_remaining_events_expect_failure(
    mut self,
    expected_errors_and_warnings: &[EventMatcher],
) -> Events {
    let (events, status) = self
        .consume_remaining_events_inner(expected_errors_and_warnings)
        .await;
    if status != 0 {
        return events;
    }
    panic!("The bin process exited with {status} but expected non 0 exit code (Failure).\nevents:\n{events}");
}
fn assert_no_errors_or_warnings(
events: &[Event],
expected_errors_and_warnings: &[EventMatcher],
) {
let mut error_count = vec![0; expected_errors_and_warnings.len()];
for event in events {
if let Level::Error | Level::Warn = event.level {
let mut matched = false;
for (matcher, count) in expected_errors_and_warnings
.iter()
.zip(error_count.iter_mut())
{
if matcher.matches(event) {
*count += 1;
matched = true;
}
}
if !matched {
panic!("Unexpected event {event}\nAny ERROR or WARN events that occur in integration tests must be explicitly allowed by adding an appropriate EventMatcher to the method call.")
}
}
}
for (matcher, count) in expected_errors_and_warnings.iter().zip(error_count.iter()) {
match matcher.count {
Count::Any => {}
Count::Times(matcher_count) => {
if matcher_count != *count {
panic!("Expected to find matches for {matcher:?}, {matcher_count} times but actually matched {count} times")
}
}
Count::GreaterThanOrEqual(x) => {
if *count < x {
panic!("Expected to find matches for {matcher:?}, greater than or equal to {x} times but actually matched {count} times")
}
}
Count::LessThanOrEqual(x) => {
if *count > x {
panic!("Expected to find matches for {matcher:?}, less than or equal to {x} times but actually matched {count} times")
}
}
}
}
}
/// Shared implementation of the `consume_remaining_events*` methods.
///
/// Takes the child out of `self` (so that dropping the handle afterwards
/// does not panic), receives events until the channel closes, checks them
/// with `assert_no_errors_or_warnings`, then waits for the process to exit.
///
/// Returns the collected events together with the process's exit code.
///
/// # Panics
///
/// Panics if the child was already taken, if the error/warning check fails,
/// if waiting on the process fails, or if the process has no exit code
/// (for example because it was terminated by a signal).
async fn consume_remaining_events_inner(
    &mut self,
    expected_errors_and_warnings: &[EventMatcher],
) -> (Events, i32) {
    use std::os::unix::process::ExitStatusExt;

    let child = self.child.take().unwrap();

    let mut events = vec![];
    while let Some(event) = self.event_rx.recv().await {
        events.push(event);
    }
    BinProcess::assert_no_errors_or_warnings(&events, expected_errors_and_warnings);

    let output = child.wait_with_output().await.unwrap();
    // `code()` is `None` here when there is no exit code to report; the
    // signal is included in the message to help diagnose that case.
    let status = output.status.code().unwrap_or_else(|| {
        panic!(
            r#"Failed to get exit status.
The signal that killed the process was {:?}.
Possible causes:
* a SIGKILL was issued, something is going very wrong.
* a SIGINT or SIGTERM was issued but the aplications handler aborted without returning an exit value. (The default handler does this)
If you are building a long running application you should handle SIGINT and SIGTERM such that your application cleanly shutsdown and returns an exit value.
Consider referring to how the tokio-bin-process example uses https://docs.rs/tokio/latest/tokio/signal/unix/struct.Signal.html
* a SIGINT or SIGTERM was issued and the aplication has an appropriate handler but the process was killed before the handler could be setup.
"#,
            output.status.signal()
        )
    });
    (Events { events }, status)
}
}
/// Reads the child's stdout line by line, parses each line as a JSON
/// [`Event`], prints it prefixed with `name`, and forwards it on `event_tx`.
///
/// Returns `Ok(())` when stdout reaches end-of-file or when the receiving
/// side of `event_tx` has been dropped.
///
/// # Errors
///
/// Returns an error if reading from stdout fails or if a line is not a
/// valid JSON-encoded event.
async fn process_stdout_events(
    mut reader: tokio::io::Lines<BufReader<ChildStdout>>,
    event_tx: &mpsc::UnboundedSender<Event>,
    name: String,
) -> Result<()> {
    while let Some(line) = reader.next_line().await.context("An IO error occured while reading stdout from the application, I'm not actually sure when this happens?")? {
        // Build the error message lazily: this runs for every log line and
        // the message is only needed when parsing fails.
        let event = Event::from_json_str(&line).with_context(|| {
            format!(
                "The application emitted a line that was not a valid event encoded in json: {}",
                line
            )
        })?;
        println!("{} {event}", Color::Default.dimmed().paint(&name));
        if event_tx.send(event).is_err() {
            // Nobody is listening any more, so there is no point in reading further.
            return Ok(());
        }
    }
    Ok(())
}
/// Runs `command` with `args` inside `working_dir`, capturing stdout with
/// stderr merged into it.
///
/// Returns the captured output when the command succeeds.
///
/// # Errors
///
/// Returns an error if the command cannot be run, or if it exits
/// unsuccessfully (the error then contains the command, its arguments, the
/// exit status and the captured output).
fn run_command(working_dir: &Path, command: &str, args: &[&str]) -> Result<String> {
    let captured = Exec::cmd(command)
        .args(args)
        .cwd(working_dir)
        .stdout(Redirection::Pipe)
        .stderr(Redirection::Merge)
        .capture()?;

    if !captured.exit_status.success() {
        return Err(anyhow!(
            "command {} {:?} exited with {:?} and output:\n{}",
            command,
            args,
            captured.exit_status,
            captured.stdout_str()
        ));
    }
    Ok(captured.stdout_str())
}