1use crate::event::{Event, Fields, Level};
2use crate::event_matcher::{Count, EventMatcher, Events};
3use anyhow::{anyhow, Context, Result};
4use cargo_metadata::{Metadata, MetadataCommand};
5use itertools::Itertools;
6use nix::sys::signal::Signal;
7use nix::unistd::Pid;
8use nu_ansi_term::Color;
9use std::collections::HashSet;
10use std::env;
11use std::path::{Path, PathBuf};
12use std::process::Stdio;
13use std::sync::{LazyLock, Mutex};
14use subprocess::{Exec, Redirection};
15use tokio::sync::mpsc;
16use tokio::{
17 io::{AsyncBufReadExt, BufReader},
18 process::{Child, ChildStdout, Command},
19};
20
21enum BuilderKind {
22 Path(PathBuf),
23 CargoName {
24 name: String,
25 profile: Option<String>,
26 },
27}
28
29pub struct BinProcessBuilder {
112 kind: BuilderKind,
113 log_name: Option<String>,
114 binary_args: Vec<String>,
115 env_vars: Vec<(String, String)>,
116}
117
118impl BinProcessBuilder {
119 pub fn from_path(bin_path: PathBuf) -> Self {
123 BinProcessBuilder {
124 kind: BuilderKind::Path(bin_path),
125 log_name: None,
126 binary_args: vec![],
127 env_vars: vec![],
128 }
129 }
130
131 pub fn from_cargo_name(name: String, profile: Option<String>) -> Self {
144 BinProcessBuilder {
145 kind: BuilderKind::CargoName { name, profile },
146 log_name: None,
147 binary_args: vec![],
148 env_vars: vec![],
149 }
150 }
151
152 pub fn with_log_name(mut self, log_name: Option<String>) -> Self {
156 self.log_name = log_name;
157 self
158 }
159
160 pub fn with_args(mut self, args: Vec<String>) -> Self {
163 self.binary_args = args;
164 self
165 }
166
167 pub fn with_env_vars(mut self, env_vars: Vec<(String, String)>) -> Self {
170 self.env_vars = env_vars;
171 self
172 }
173
174 pub async fn start(self) -> BinProcess {
176 match self.kind {
177 BuilderKind::Path(path) => {
178 let binary_name = path.file_name().expect("Path needs at least one element");
179 let log_name = self
180 .log_name
181 .as_deref()
182 .unwrap_or(binary_name.to_str().unwrap());
183
184 BinProcess::start_binary(&path, log_name, &self.env_vars, &self.binary_args).await
185 }
186 BuilderKind::CargoName { name, profile } => {
187 let log_name = self.log_name.as_deref().unwrap_or(&name);
188
189 BinProcess::start_binary_name(
190 &name,
191 log_name,
192 &self.env_vars,
193 &self.binary_args,
194 profile.as_deref(),
195 )
196 .await
197 }
198 }
199 }
200}
201
202struct CargoCache {
203 metadata: Option<Metadata>,
204 built_binaries: HashSet<BuiltBinary>,
205}
206
207#[derive(Hash, PartialEq, Eq)]
208struct BuiltBinary {
209 name: String,
210 profile: String,
211}
212
213static CARGO_CACHE: LazyLock<Mutex<CargoCache>> = LazyLock::new(|| {
221 Mutex::new(CargoCache {
222 metadata: None,
223 built_binaries: HashSet::new(),
224 })
225});
226
227pub struct BinProcess {
228 child: Option<Child>,
230 event_rx: mpsc::UnboundedReceiver<Event>,
231}
232
233impl Drop for BinProcess {
234 fn drop(&mut self) {
235 if self.child.is_some() && !std::thread::panicking() {
236 panic!("Need to call either wait or shutdown_and_assert_success method on BinProcess before dropping it.");
237 }
238 }
239}
240
241impl BinProcess {
242 async fn start_binary_name(
243 cargo_bin_name: &str,
244 log_name: &str,
245 env_vars: &[(String, String)],
246 binary_args: &[String],
247 cargo_profile: Option<&str>,
248 ) -> BinProcess {
249 let profile = cargo_profile.unwrap_or(if env!("PROFILE") == "release" {
251 "release"
252 } else {
253 "dev"
254 });
255
256 let target_dir = {
258 let mut cargo_cache = CARGO_CACHE.lock().unwrap();
259 if cargo_cache.metadata.is_none() {
260 cargo_cache.metadata = Some(MetadataCommand::new().exec().unwrap());
261 }
262 let built_package = BuiltBinary {
263 name: cargo_bin_name.to_owned(),
264 profile: profile.to_owned(),
265 };
266 if !cargo_cache.built_binaries.contains(&built_package) {
267 let all_args = vec![
268 "build",
269 "--all-features",
270 "--profile",
271 profile,
272 "--bin",
273 cargo_bin_name,
274 ];
275 let metadata = cargo_cache.metadata.as_ref().unwrap();
276 run_command(
277 metadata.workspace_root.as_std_path(),
278 env!("CARGO"),
279 &all_args,
280 )
281 .unwrap();
282 cargo_cache.built_binaries.insert(built_package);
283 }
284
285 cargo_cache
286 .metadata
287 .as_ref()
288 .unwrap()
289 .target_directory
290 .clone()
291 };
292
293 let target_profile_name = match profile {
294 "dev" => "debug",
296 "test" => "debug",
298 "bench" => "release",
299 profile => profile,
300 };
301 let bin_path = target_dir.join(target_profile_name).join(cargo_bin_name);
302 BinProcess::start_binary(
303 bin_path.into_std_path_buf().as_path(),
304 log_name,
305 env_vars,
306 binary_args,
307 )
308 .await
309 }
310
311 async fn start_binary(
312 bin_path: &Path,
313 log_name: &str,
314 env_vars: &[(String, String)],
315 binary_args: &[String],
316 ) -> BinProcess {
317 let log_name = if log_name.len() > 10 {
318 panic!("In order to line up in log outputs, argument log_name to BinProcess::start_with_args must be of length <= 10 but the value was: {log_name}");
319 } else {
320 format!("{log_name: <10}") };
322
323 let mut child = Command::new(bin_path)
324 .args(binary_args)
325 .envs(env_vars.iter().map(|(k, v)| (k.as_str(), v.as_str())))
326 .stdout(Stdio::piped())
327 .stderr(Stdio::piped())
328 .kill_on_drop(true)
329 .spawn()
330 .context(format!("Failed to run {bin_path:?}"))
331 .unwrap();
332
333 let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
334 let stdout_reader = BufReader::new(child.stdout.take().unwrap()).lines();
335 let mut stderr_reader = BufReader::new(child.stderr.take().unwrap()).lines();
336 tokio::spawn(async move {
337 if let Err(err) = process_stdout_events(stdout_reader, &event_tx, log_name).await {
338 event_tx
341 .send(Event {
342 timestamp: "".to_owned(),
343 level: Level::Error,
344 target: "tokio-bin-process".to_owned(),
345 fields: Fields {
346 message: err.to_string(),
347 fields: Default::default(),
348 },
349 span: Default::default(),
350 spans: Default::default(),
351 })
352 .ok();
353 }
354 });
355 tokio::spawn(async move {
356 while let Some(line) = stderr_reader.next_line().await.expect("An IO error occured while reading stderr from the application, I'm not actually sure when this happens?") {
357 tracing::error!("stderr from process: {line}");
358 }
359 });
360
361 BinProcess {
362 child: Some(child),
363 event_rx,
364 }
365 }
366
367 pub fn pid(&self) -> i32 {
370 self.child.as_ref().unwrap().id().unwrap() as i32
371 }
372
373 fn send_signal(&self, signal: Signal) {
376 nix::sys::signal::kill(Pid::from_raw(self.pid()), signal).unwrap();
377 }
378
379 pub fn send_sigterm(&self) {
382 self.send_signal(Signal::SIGTERM)
383 }
384
385 pub fn send_sigint(&self) {
388 self.send_signal(Signal::SIGINT)
389 }
390
391 pub async fn wait_for(
394 &mut self,
395 ready: &EventMatcher,
396 expected_errors_and_warnings: &[EventMatcher],
397 ) -> Events {
398 let mut events = vec![];
399 while let Some(event) = self.event_rx.recv().await {
400 let ready_match = ready.matches(&event);
401 events.push(event);
402 if ready_match {
403 BinProcess::assert_no_errors_or_warnings(&events, expected_errors_and_warnings);
404 return Events { events };
405 }
406 }
407 panic!("bin process shutdown before an event was found matching {ready:?}")
408 }
409
410 pub async fn consume_events(
413 &mut self,
414 event_count: usize,
415 expected_errors_and_warnings: &[EventMatcher],
416 ) -> Events {
417 let mut events = vec![];
418 for _ in 0..event_count {
419 match self.event_rx.recv().await {
420 Some(event) => events.push(event),
421 None => {
422 if events.is_empty() {
423 panic!("The process was terminated before the expected count of {event_count} events occured. No events received so far");
424 } else {
425 let events_received = events.iter().map(|x| format!("{x}")).join("\n");
426 panic!("The process was terminated before the expected count of {event_count} events occured. Events received so far:\n{events_received}");
427 }
428 }
429 }
430 }
431 BinProcess::assert_no_errors_or_warnings(&events, expected_errors_and_warnings);
432 Events { events }
433 }
434
435 pub async fn shutdown_and_then_consume_events(
438 self,
439 expected_errors_and_warnings: &[EventMatcher],
440 ) -> Events {
441 self.send_signal(nix::sys::signal::Signal::SIGTERM);
442 self.consume_remaining_events(expected_errors_and_warnings)
443 .await
444 }
445
446 pub async fn consume_remaining_events(
450 mut self,
451 expected_errors_and_warnings: &[EventMatcher],
452 ) -> Events {
453 let (events, status) = self
454 .consume_remaining_events_inner(expected_errors_and_warnings)
455 .await;
456
457 if status != 0 {
458 panic!("The bin process exited with {status} but expected 0 exit code (Success).\nevents:\n{events}");
459 }
460
461 events
462 }
463
464 pub async fn consume_remaining_events_expect_failure(
466 mut self,
467 expected_errors_and_warnings: &[EventMatcher],
468 ) -> Events {
469 let (events, status) = self
470 .consume_remaining_events_inner(expected_errors_and_warnings)
471 .await;
472
473 if status == 0 {
474 panic!("The bin process exited with {status} but expected non 0 exit code (Failure).\nevents:\n{events}");
475 }
476
477 events
478 }
479
480 fn assert_no_errors_or_warnings(
481 events: &[Event],
482 expected_errors_and_warnings: &[EventMatcher],
483 ) {
484 let mut error_count = vec![0; expected_errors_and_warnings.len()];
485 for event in events {
486 if let Level::Error | Level::Warn = event.level {
487 let mut matched = false;
488 for (matcher, count) in expected_errors_and_warnings
489 .iter()
490 .zip(error_count.iter_mut())
491 {
492 if matcher.matches(event) {
493 *count += 1;
494 matched = true;
495 }
496 }
497 if !matched {
498 panic!("Unexpected event {event}\nAny ERROR or WARN events that occur in integration tests must be explicitly allowed by adding an appropriate EventMatcher to the method call.")
499 }
500 }
501 }
502
503 for (matcher, count) in expected_errors_and_warnings.iter().zip(error_count.iter()) {
505 match matcher.count {
506 Count::Any => {}
507 Count::Times(matcher_count) => {
508 if matcher_count != *count {
509 panic!("Expected to find matches for {matcher:?}, {matcher_count} times but actually matched {count} times")
510 }
511 }
512 Count::GreaterThanOrEqual(x) => {
513 if *count < x {
514 panic!("Expected to find matches for {matcher:?}, greater than or equal to {x} times but actually matched {count} times")
515 }
516 }
517 Count::LessThanOrEqual(x) => {
518 if *count > x {
519 panic!("Expected to find matches for {matcher:?}, less than or equal to {x} times but actually matched {count} times")
520 }
521 }
522 }
523 }
524 }
525
526 async fn consume_remaining_events_inner(
527 &mut self,
528 expected_errors_and_warnings: &[EventMatcher],
529 ) -> (Events, i32) {
530 let child = self.child.take().unwrap();
534
535 let mut events = vec![];
536 while let Some(event) = self.event_rx.recv().await {
537 events.push(event);
538 }
539
540 BinProcess::assert_no_errors_or_warnings(&events, expected_errors_and_warnings);
541
542 use std::os::unix::process::ExitStatusExt;
543 let output = child.wait_with_output().await.unwrap();
544 let status = output.status.code().unwrap_or_else(|| {
545 panic!(
546 r#"Failed to get exit status.
547The signal that killed the process was {:?}.
548Possible causes:
549* a SIGKILL was issued, something is going very wrong.
550* a SIGINT or SIGTERM was issued but the aplications handler aborted without returning an exit value. (The default handler does this)
551 If you are building a long running application you should handle SIGKILL and SIGTERM such that your application cleanly shutsdown and returns an exit value.
552 Consider referring to how the tokio-bin-process example uses https://docs.rs/tokio/latest/tokio/signal/unix/struct.Signal.html
553* a SIGINT or SIGTERM was issued and the aplication has an appropriate handler but the process was killed before the handler could be setup.
554"#,
555 output.status.signal()
556 )
557 });
558
559 (Events { events }, status)
560 }
561}
562
563async fn process_stdout_events(
564 mut reader: tokio::io::Lines<BufReader<ChildStdout>>,
565 event_tx: &mpsc::UnboundedSender<Event>,
566 name: String,
567) -> Result<()> {
568 while let Some(line) = reader.next_line().await.context("An IO error occured while reading stdout from the application, I'm not actually sure when this happens?")? {
569 let event = Event::from_json_str(&line).context(format!(
570 "The application emitted a line that was not a valid event encoded in json: {}",
571 line
572 ))?;
573 println!("{} {event}", Color::Default.dimmed().paint(&name));
574 if event_tx.send(event).is_err() {
575 return Ok(());
577 }
578 }
579 Ok(())
580}
581
582fn run_command(working_dir: &Path, command: &str, args: &[&str]) -> Result<String> {
585 let data = Exec::cmd(command)
586 .args(args)
587 .cwd(working_dir)
588 .stdout(Redirection::Pipe)
589 .stderr(Redirection::Merge)
590 .capture()?;
591
592 if data.exit_status.success() {
593 Ok(data.stdout_str())
594 } else {
595 Err(anyhow!(
596 "command {} {:?} exited with {:?} and output:\n{}",
597 command,
598 args,
599 data.exit_status,
600 data.stdout_str()
601 ))
602 }
603}