tokio_bin_process/
process.rs

1use crate::event::{Event, Fields, Level};
2use crate::event_matcher::{Count, EventMatcher, Events};
3use anyhow::{anyhow, Context, Result};
4use cargo_metadata::{Metadata, MetadataCommand};
5use itertools::Itertools;
6use nix::sys::signal::Signal;
7use nix::unistd::Pid;
8use nu_ansi_term::Color;
9use std::collections::HashSet;
10use std::env;
11use std::path::{Path, PathBuf};
12use std::process::Stdio;
13use std::sync::{LazyLock, Mutex};
14use subprocess::{Exec, Redirection};
15use tokio::sync::mpsc;
16use tokio::{
17    io::{AsyncBufReadExt, BufReader},
18    process::{Child, ChildStdout, Command},
19};
20
/// Where [`BinProcessBuilder::start`] obtains the binary it runs.
enum BuilderKind {
    /// A binary that already exists at this path; it is run as-is.
    Path(PathBuf),
    /// A binary target of the current Cargo workspace that is built before it is run.
    CargoName {
        /// Name of the binary target, as passed to `cargo build --bin`.
        name: String,
        /// Cargo profile to build with; `None` picks "release" or "dev" to match this crate's own build.
        profile: Option<String>,
    },
}
28
29/// Start a running process of a binary.
30///
31/// All `tracing` events emitted by your binary in JSON over stdout will be processed by `BinProcess` and then emitted to the tests stdout in the default human readable tracing format.
32/// To ensure any WARN/ERROR's from your test logic are visible, `BinProcess` will setup its own subscriber that outputs to the tests stdout in the default human readable format.
33/// If you set your own subscriber before constructing a [`BinProcess`] that will take preference instead.
34///
35/// Dropping the BinProcess will trigger a panic unless [`BinProcess::shutdown_and_then_consume_events`] or [`BinProcess::consume_remaining_events`] has been called.
36/// This is done to avoid missing important assertions run by those methods.
37///
38/// ## Which constructor to use:
39/// A guide to constructing `BinProcess` based on your use case:
40///
41/// ### You are writing an integration test or bench and the binary you want to run is defined in the same package as the test or bench you are writing.
42///
43/// Use [`BinProcessBuilder::from_path`] like this:
44/// ```rust
45/// # use tokio_bin_process::BinProcessBuilder;
46/// # // hack to make the doc test compile
47/// # macro_rules! bin_path {
48/// #     ($bin_name:expr) => {
49/// #         std::path::PathBuf::from("foo")
50/// #     };
51/// # }
52/// # async {
53/// BinProcessBuilder::from_path(bin_path!("cooldb"))
54///     .with_args(vec!["--log-format".to_owned(), "json".to_owned()])
55///     .start()
56///     .await;
57/// # };
58/// ```
59/// Using `from_path` instead of `from_cargo_name` here is faster and more robust as `BinProcess`
60/// does not need to invoke Cargo.
61///
62/// ### You are writing an integration test or bench and the binary you want to test is in the same
63/// workspace but in a different package to the test or bench you are writing.
64/// Use [`BinProcessBuilder::from_cargo_name`] like this:
65/// ```rust
66/// # use tokio_bin_process::BinProcessBuilder;
67/// # async {
68/// BinProcessBuilder::from_cargo_name("cooldb".to_owned(), None)
69///     .with_args(vec!["--log-format".to_owned(), "json".to_owned()])
70///     .start()
71///     .await;
72/// # };
73/// ```
74///
75/// ### You are writing an example or other binary within a package
76/// Use [`BinProcessBuilder::from_cargo_name`] like this:
77/// ```rust
78/// # use tokio_bin_process::BinProcessBuilder;
79/// # async {
80/// BinProcessBuilder::from_cargo_name("cooldb".to_owned(), None)
81///     .with_args(vec!["--log-format".to_owned(), "json".to_owned()])
82///     .start()
83///     .await;
84/// # };
85/// ```
86///
87/// ### You need to compile the binary with an arbitrary profile
88/// Use [`BinProcessBuilder::from_cargo_name`] like this:
89/// ```rust
90/// # use tokio_bin_process::BinProcessBuilder;
91/// # async {
92/// BinProcessBuilder::from_cargo_name("cooldb".to_owned(), Some("profilename".to_owned()))
93///     .with_args(vec!["--log-format".to_owned(), "json".to_owned()])
94///     .start()
95///     .await;
96/// # };
97/// ```
98///
99/// ### You have an arbitrary pre-compiled binary to run
100/// Use [`BinProcessBuilder::from_path`] like this:
101/// ```rust
102/// # use tokio_bin_process::BinProcessBuilder;
103/// # use std::path::PathBuf;
104/// # async {
105/// BinProcessBuilder::from_path(PathBuf::from("some/path/to/precompiled/cooldb"))
106///     .with_args(vec!["--log-format".to_owned(), "json".to_owned()])
107///     .start()
108///     .await;
109/// # };
110/// ```
111pub struct BinProcessBuilder {
112    kind: BuilderKind,
113    log_name: Option<String>,
114    binary_args: Vec<String>,
115    env_vars: Vec<(String, String)>,
116}
117
118impl BinProcessBuilder {
119    /// Start the binary specified in `bin_path`.
120    ///
121    /// Make sure to also call `with_args` or `with_env_vars` to enable the `tracing` JSON logger to stdout if that is not the default.
122    pub fn from_path(bin_path: PathBuf) -> Self {
123        BinProcessBuilder {
124            kind: BuilderKind::Path(bin_path),
125            log_name: None,
126            binary_args: vec![],
127            env_vars: vec![],
128        }
129    }
130
131    /// Prefer [`BinProcessBuilder::from_path`] where possible as it is faster and more robust.
132    ///
133    /// Start the binary named `cargo_bin_name` in the current workspace in a new process.
134    /// A `BinProcess` is returned which can be used to interact with the process.
135    ///
136    /// The crate will be compiled with the Cargo profile specified in `cargo_profile`.
137    /// * When it is `Some(_)` the value specified is used.
138    /// * When it is `None` it will use "release" if `tokio-bin-process` was compiled in a release derived profile or "dev" if it was compiled in a dev derived profile.
139    ///
140    /// The reason `None` will only ever result in a "release" or "dev" profile is due to a limitation on what profile information Cargo exposes to us.
141    ///
142    /// Make sure to also call `with_args` or `with_env_vars` to enable the `tracing` JSON logger to stdout if that is not the default.
143    pub fn from_cargo_name(name: String, profile: Option<String>) -> Self {
144        BinProcessBuilder {
145            kind: BuilderKind::CargoName { name, profile },
146            log_name: None,
147            binary_args: vec![],
148            env_vars: vec![],
149        }
150    }
151
152    /// `log_name` is prepended to the logs that `BinProcess` forwards to stdout.
153    /// This helps to differentiate between `tracing` logs generated by the test itself and the process under test.
154    /// The log name must be <= 10 characters.
155    pub fn with_log_name(mut self, log_name: Option<String>) -> Self {
156        self.log_name = log_name;
157        self
158    }
159
160    /// The `args` will be used as the args to the binary.
161    /// The args should give the desired setup for the given integration test.
162    pub fn with_args(mut self, args: Vec<String>) -> Self {
163        self.binary_args = args;
164        self
165    }
166
167    /// The `env_vars` will be used as the env vars given to the binary.
168    /// The env vars should give the desired setup for the given integration test.
169    pub fn with_env_vars(mut self, env_vars: Vec<(String, String)>) -> Self {
170        self.env_vars = env_vars;
171        self
172    }
173
174    /// Start the binary specified by from_path or from_cargo_name
175    pub async fn start(self) -> BinProcess {
176        match self.kind {
177            BuilderKind::Path(path) => {
178                let binary_name = path.file_name().expect("Path needs at least one element");
179                let log_name = self
180                    .log_name
181                    .as_deref()
182                    .unwrap_or(binary_name.to_str().unwrap());
183
184                BinProcess::start_binary(&path, log_name, &self.env_vars, &self.binary_args).await
185            }
186            BuilderKind::CargoName { name, profile } => {
187                let log_name = self.log_name.as_deref().unwrap_or(&name);
188
189                BinProcess::start_binary_name(
190                    &name,
191                    log_name,
192                    &self.env_vars,
193                    &self.binary_args,
194                    profile.as_deref(),
195                )
196                .await
197            }
198        }
199    }
200}
201
202struct CargoCache {
203    metadata: Option<Metadata>,
204    built_binaries: HashSet<BuiltBinary>,
205}
206
/// Key identifying one completed `cargo build --bin <name> --profile <profile>` invocation,
/// so the same build can be skipped on later starts.
#[derive(PartialEq, Eq, Hash)]
struct BuiltBinary {
    /// Name of the binary target.
    name: String,
    /// Cargo profile the binary was built with.
    profile: String,
}
212
213// It is actually quite expensive to invoke `cargo build` even when there is nothing to build.
214// On my machine, on a specific project, it takes 170ms.
215// To avoid this cost for every call to start_with_args we use this global to keep track of which packages have been built by a BinProcess for the lifetime of the test run.
216//
217// Unfortunately this doesnt work when running each test in its own process. e.g. when using nextest
218// But worst case it just unnecessarily reruns `cargo build`.
219// TODO: we might be able to use CARGO_TARGET_TMPDIR to fix for nextest
220static CARGO_CACHE: LazyLock<Mutex<CargoCache>> = LazyLock::new(|| {
221    Mutex::new(CargoCache {
222        metadata: None,
223        built_binaries: HashSet::new(),
224    })
225});
226
227pub struct BinProcess {
228    /// Always Some while BinProcess is owned
229    child: Option<Child>,
230    event_rx: mpsc::UnboundedReceiver<Event>,
231}
232
233impl Drop for BinProcess {
234    fn drop(&mut self) {
235        if self.child.is_some() && !std::thread::panicking() {
236            panic!("Need to call either wait or shutdown_and_assert_success method on BinProcess before dropping it.");
237        }
238    }
239}
240
241impl BinProcess {
242    async fn start_binary_name(
243        cargo_bin_name: &str,
244        log_name: &str,
245        env_vars: &[(String, String)],
246        binary_args: &[String],
247        cargo_profile: Option<&str>,
248    ) -> BinProcess {
249        // PROFILE is set in build.rs from PROFILE listed in https://doc.rust-lang.org/cargo/reference/environment-variables.html#environment-variables-cargo-sets-for-build-scripts
250        let profile = cargo_profile.unwrap_or(if env!("PROFILE") == "release" {
251            "release"
252        } else {
253            "dev"
254        });
255
256        // First build the binary if its not yet built
257        let target_dir = {
258            let mut cargo_cache = CARGO_CACHE.lock().unwrap();
259            if cargo_cache.metadata.is_none() {
260                cargo_cache.metadata = Some(MetadataCommand::new().exec().unwrap());
261            }
262            let built_package = BuiltBinary {
263                name: cargo_bin_name.to_owned(),
264                profile: profile.to_owned(),
265            };
266            if !cargo_cache.built_binaries.contains(&built_package) {
267                let all_args = vec![
268                    "build",
269                    "--all-features",
270                    "--profile",
271                    profile,
272                    "--bin",
273                    cargo_bin_name,
274                ];
275                let metadata = cargo_cache.metadata.as_ref().unwrap();
276                run_command(
277                    metadata.workspace_root.as_std_path(),
278                    env!("CARGO"),
279                    &all_args,
280                )
281                .unwrap();
282                cargo_cache.built_binaries.insert(built_package);
283            }
284
285            cargo_cache
286                .metadata
287                .as_ref()
288                .unwrap()
289                .target_directory
290                .clone()
291        };
292
293        let target_profile_name = match profile {
294            // dev is mapped to debug for legacy reasons
295            "dev" => "debug",
296            // test and bench are hardcoded to reuse dev and release directories
297            "test" => "debug",
298            "bench" => "release",
299            profile => profile,
300        };
301        let bin_path = target_dir.join(target_profile_name).join(cargo_bin_name);
302        BinProcess::start_binary(
303            bin_path.into_std_path_buf().as_path(),
304            log_name,
305            env_vars,
306            binary_args,
307        )
308        .await
309    }
310
311    async fn start_binary(
312        bin_path: &Path,
313        log_name: &str,
314        env_vars: &[(String, String)],
315        binary_args: &[String],
316    ) -> BinProcess {
317        let log_name = if log_name.len() > 10 {
318            panic!("In order to line up in log outputs, argument log_name to BinProcess::start_with_args must be of length <= 10 but the value was: {log_name}");
319        } else {
320            format!("{log_name: <10}") // pads log_name up to 10 chars so that it lines up properly when included in log output.
321        };
322
323        let mut child = Command::new(bin_path)
324            .args(binary_args)
325            .envs(env_vars.iter().map(|(k, v)| (k.as_str(), v.as_str())))
326            .stdout(Stdio::piped())
327            .stderr(Stdio::piped())
328            .kill_on_drop(true)
329            .spawn()
330            .context(format!("Failed to run {bin_path:?}"))
331            .unwrap();
332
333        let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
334        let stdout_reader = BufReader::new(child.stdout.take().unwrap()).lines();
335        let mut stderr_reader = BufReader::new(child.stderr.take().unwrap()).lines();
336        tokio::spawn(async move {
337            if let Err(err) = process_stdout_events(stdout_reader, &event_tx, log_name).await {
338                // Because we are in a task, panicking is likely to be ignored.
339                // Instead we generate a fake error event, which is possibly a bit confusing for the user but will at least cause the test to fail.
340                event_tx
341                    .send(Event {
342                        timestamp: "".to_owned(),
343                        level: Level::Error,
344                        target: "tokio-bin-process".to_owned(),
345                        fields: Fields {
346                            message: err.to_string(),
347                            fields: Default::default(),
348                        },
349                        span: Default::default(),
350                        spans: Default::default(),
351                    })
352                    .ok();
353            }
354        });
355        tokio::spawn(async move {
356            while let Some(line) = stderr_reader.next_line().await.expect("An IO error occured while reading stderr from the application, I'm not actually sure when this happens?") {
357                tracing::error!("stderr from process: {line}");
358            }
359        });
360
361        BinProcess {
362            child: Some(child),
363            event_rx,
364        }
365    }
366
367    /// Return the PID of
368    /// TODO: when nix crate is 1.0 this method should return a `nix::unistd::Pid` instead of an i32
369    pub fn pid(&self) -> i32 {
370        self.child.as_ref().unwrap().id().unwrap() as i32
371    }
372
373    /// TODO: make this public when nix crate goes 1.0
374    ///       Additionally reexport the Signal enum so that the user doesnt need to manually add nix crate to their Cargo.toml
375    fn send_signal(&self, signal: Signal) {
376        nix::sys::signal::kill(Pid::from_raw(self.pid()), signal).unwrap();
377    }
378
379    /// Send SIGTERM to the process
380    /// TODO: This will be replaced with a `send_signal` method when nix crate is 1.0
381    pub fn send_sigterm(&self) {
382        self.send_signal(Signal::SIGTERM)
383    }
384
385    /// Send SIGINT to the process
386    /// TODO: This will be replaced with a `send_signal` method when nix crate is 1.0
387    pub fn send_sigint(&self) {
388        self.send_signal(Signal::SIGINT)
389    }
390
391    /// Waits for the `ready` `EventMatcher` to match on an incoming event.
392    /// All events that were encountered while waiting are returned.
393    pub async fn wait_for(
394        &mut self,
395        ready: &EventMatcher,
396        expected_errors_and_warnings: &[EventMatcher],
397    ) -> Events {
398        let mut events = vec![];
399        while let Some(event) = self.event_rx.recv().await {
400            let ready_match = ready.matches(&event);
401            events.push(event);
402            if ready_match {
403                BinProcess::assert_no_errors_or_warnings(&events, expected_errors_and_warnings);
404                return Events { events };
405            }
406        }
407        panic!("bin process shutdown before an event was found matching {ready:?}")
408    }
409
410    /// Await `event_count` messages to be emitted from the process.
411    /// The emitted events are returned.
412    pub async fn consume_events(
413        &mut self,
414        event_count: usize,
415        expected_errors_and_warnings: &[EventMatcher],
416    ) -> Events {
417        let mut events = vec![];
418        for _ in 0..event_count {
419            match self.event_rx.recv().await {
420                Some(event) => events.push(event),
421                None => {
422                    if events.is_empty() {
423                        panic!("The process was terminated before the expected count of {event_count} events occured. No events received so far");
424                    } else {
425                        let events_received = events.iter().map(|x| format!("{x}")).join("\n");
426                        panic!("The process was terminated before the expected count of {event_count} events occured. Events received so far:\n{events_received}");
427                    }
428                }
429            }
430        }
431        BinProcess::assert_no_errors_or_warnings(&events, expected_errors_and_warnings);
432        Events { events }
433    }
434
435    /// Issues SIGTERM to the process and then awaits its shutdown.
436    /// All remaining events will be returned.
437    pub async fn shutdown_and_then_consume_events(
438        self,
439        expected_errors_and_warnings: &[EventMatcher],
440    ) -> Events {
441        self.send_signal(nix::sys::signal::Signal::SIGTERM);
442        self.consume_remaining_events(expected_errors_and_warnings)
443            .await
444    }
445
446    /// prefer `shutdown_and_then_consume_events`.
447    /// This method will not return until the process has terminated.
448    /// It is useful when you need to test a shutdown method other than SIGTERM.
449    pub async fn consume_remaining_events(
450        mut self,
451        expected_errors_and_warnings: &[EventMatcher],
452    ) -> Events {
453        let (events, status) = self
454            .consume_remaining_events_inner(expected_errors_and_warnings)
455            .await;
456
457        if status != 0 {
458            panic!("The bin process exited with {status} but expected 0 exit code (Success).\nevents:\n{events}");
459        }
460
461        events
462    }
463
464    /// Identical to `consume_remaining_events` but asserts that the process exited with failure code instead of success
465    pub async fn consume_remaining_events_expect_failure(
466        mut self,
467        expected_errors_and_warnings: &[EventMatcher],
468    ) -> Events {
469        let (events, status) = self
470            .consume_remaining_events_inner(expected_errors_and_warnings)
471            .await;
472
473        if status == 0 {
474            panic!("The bin process exited with {status} but expected non 0 exit code (Failure).\nevents:\n{events}");
475        }
476
477        events
478    }
479
480    fn assert_no_errors_or_warnings(
481        events: &[Event],
482        expected_errors_and_warnings: &[EventMatcher],
483    ) {
484        let mut error_count = vec![0; expected_errors_and_warnings.len()];
485        for event in events {
486            if let Level::Error | Level::Warn = event.level {
487                let mut matched = false;
488                for (matcher, count) in expected_errors_and_warnings
489                    .iter()
490                    .zip(error_count.iter_mut())
491                {
492                    if matcher.matches(event) {
493                        *count += 1;
494                        matched = true;
495                    }
496                }
497                if !matched {
498                    panic!("Unexpected event {event}\nAny ERROR or WARN events that occur in integration tests must be explicitly allowed by adding an appropriate EventMatcher to the method call.")
499                }
500            }
501        }
502
503        // TODO: move into Events::contains
504        for (matcher, count) in expected_errors_and_warnings.iter().zip(error_count.iter()) {
505            match matcher.count {
506                Count::Any => {}
507                Count::Times(matcher_count) => {
508                    if matcher_count != *count {
509                        panic!("Expected to find matches for {matcher:?}, {matcher_count} times but actually matched {count} times")
510                    }
511                }
512                Count::GreaterThanOrEqual(x) => {
513                    if *count < x {
514                        panic!("Expected to find matches for {matcher:?}, greater than or equal to {x} times but actually matched {count} times")
515                    }
516                }
517                Count::LessThanOrEqual(x) => {
518                    if *count > x {
519                        panic!("Expected to find matches for {matcher:?}, less than or equal to {x} times but actually matched {count} times")
520                    }
521                }
522            }
523        }
524    }
525
526    async fn consume_remaining_events_inner(
527        &mut self,
528        expected_errors_and_warnings: &[EventMatcher],
529    ) -> (Events, i32) {
530        // Take the child before we wait for the process to terminate.
531        // This ensures that the drop bomb wont go off if the future is dropped partway through.
532        // e.g. the user might have run BinProcess through `tokio::time::timeout`
533        let child = self.child.take().unwrap();
534
535        let mut events = vec![];
536        while let Some(event) = self.event_rx.recv().await {
537            events.push(event);
538        }
539
540        BinProcess::assert_no_errors_or_warnings(&events, expected_errors_and_warnings);
541
542        use std::os::unix::process::ExitStatusExt;
543        let output = child.wait_with_output().await.unwrap();
544        let status = output.status.code().unwrap_or_else(|| {
545            panic!(
546                r#"Failed to get exit status.
547The signal that killed the process was {:?}.
548Possible causes:
549* a SIGKILL was issued, something is going very wrong.
550* a SIGINT or SIGTERM was issued but the aplications handler aborted without returning an exit value. (The default handler does this)
551  If you are building a long running application you should handle SIGKILL and SIGTERM such that your application cleanly shutsdown and returns an exit value.
552  Consider referring to how the tokio-bin-process example uses https://docs.rs/tokio/latest/tokio/signal/unix/struct.Signal.html
553* a SIGINT or SIGTERM was issued and the aplication has an appropriate handler but the process was killed before the handler could be setup.
554"#,
555                output.status.signal()
556            )
557        });
558
559        (Events { events }, status)
560    }
561}
562
563async fn process_stdout_events(
564    mut reader: tokio::io::Lines<BufReader<ChildStdout>>,
565    event_tx: &mpsc::UnboundedSender<Event>,
566    name: String,
567) -> Result<()> {
568    while let Some(line) = reader.next_line().await.context("An IO error occured while reading stdout from the application, I'm not actually sure when this happens?")? {
569        let event = Event::from_json_str(&line).context(format!(
570            "The application emitted a line that was not a valid event encoded in json: {}",
571            line
572        ))?;
573        println!("{} {event}", Color::Default.dimmed().paint(&name));
574        if event_tx.send(event).is_err() {
575            // BinProcess is no longer interested in events
576            return Ok(());
577        }
578    }
579    Ok(())
580}
581
582/// Runs a command and returns the output as a string.
583/// Both stderr and stdout are returned in the result.
584fn run_command(working_dir: &Path, command: &str, args: &[&str]) -> Result<String> {
585    let data = Exec::cmd(command)
586        .args(args)
587        .cwd(working_dir)
588        .stdout(Redirection::Pipe)
589        .stderr(Redirection::Merge)
590        .capture()?;
591
592    if data.exit_status.success() {
593        Ok(data.stdout_str())
594    } else {
595        Err(anyhow!(
596            "command {} {:?} exited with {:?} and output:\n{}",
597            command,
598            args,
599            data.exit_status,
600            data.stdout_str()
601        ))
602    }
603}