1use crate::ext::anyhow::{bail, Context, Result};
2use std::{
3 net::SocketAddr,
4 process::{Output, Stdio},
5 time::Duration,
6};
7use tokio::{
8 net::TcpStream,
9 process::{Child, Command},
10 sync::broadcast,
11 time::sleep,
12};
13
14pub trait OutputExt {
15 fn stderr(&self) -> String;
16 fn has_stderr(&self) -> bool;
17 fn stdout(&self) -> String;
18 fn has_stdout(&self) -> bool;
19}
20
21impl OutputExt for Output {
22 fn stderr(&self) -> String {
23 String::from_utf8_lossy(&self.stderr).to_string()
24 }
25
26 fn has_stderr(&self) -> bool {
27 println!("stderr: {}\n'{}'", self.stderr.len(), self.stderr());
28 self.stderr.len() > 1
29 }
30
31 fn stdout(&self) -> String {
32 String::from_utf8_lossy(&self.stdout).to_string()
33 }
34
35 fn has_stdout(&self) -> bool {
36 self.stdout.len() > 1
37 }
38}
39pub enum CommandResult<T> {
40 Success(T),
41 Failure(T),
42 Interrupted,
43}
44
45pub async fn wait_interruptible(name: &str, mut process: Child, mut interrupt_rx: broadcast::Receiver<()>) -> Result<CommandResult<()>> {
46 tokio::select! {
47 res = process.wait() => match res {
48 Ok(exit) => {
49 if exit.success() {
50 log::trace!("{name} process finished with success");
51 Ok(CommandResult::Success(()))
52 } else {
53 log::trace!("{name} process finished with code {:?}", exit.code());
54 Ok(CommandResult::Failure(()))
55 }
56 }
57 Err(e) => bail!("Command failed due to: {e}"),
58 },
59 _ = interrupt_rx.recv() => {
60 process.kill().await.context("Could not kill process")?;
61 log::trace!("{name} process interrupted");
62 Ok(CommandResult::Interrupted)
63 }
64 }
65}
66
67pub async fn wait_piped_interruptible(name: &str, mut cmd: Command, mut interrupt_rx: broadcast::Receiver<()>) -> Result<CommandResult<Output>> {
68 cmd.kill_on_drop(true);
71 cmd.stdout(Stdio::piped());
72 cmd.stderr(Stdio::piped());
73 let process = cmd.spawn()?;
74 tokio::select! {
75 res = process.wait_with_output() => match res {
76 Ok(output) => {
77 if output.status.success() {
78 log::trace!("{name} process finished with success");
79 Ok(CommandResult::Success(output))
80 } else {
81 log::trace!("{name} process finished with code {:?}", output.status.code());
82 Ok(CommandResult::Failure(output))
83 }
84 }
85 Err(e) => bail!("Command failed due to: {e}"),
86 },
87 _ = interrupt_rx.recv() => {
88 log::trace!("{name} process interrupted");
89 Ok(CommandResult::Interrupted)
90 }
91 }
92}
93pub async fn wait_for_socket(name: &str, addr: SocketAddr) -> bool {
94 let duration = Duration::from_millis(500);
95
96 for _ in 0..20 {
97 if TcpStream::connect(&addr).await.is_ok() {
98 log::debug!("{name} server port {addr} open");
99 return true;
100 }
101 sleep(duration).await;
102 }
103 log::warn!("{name} timed out waiting for port {addr}");
104 false
105}