glory_cli/ext/
sync.rs

1use crate::ext::anyhow::{bail, Context, Result};
2use std::{
3    net::SocketAddr,
4    process::{Output, Stdio},
5    time::Duration,
6};
7use tokio::{
8    net::TcpStream,
9    process::{Child, Command},
10    sync::broadcast,
11    time::sleep,
12};
13
/// Convenience accessors for the captured output streams of a finished process.
pub trait OutputExt {
    /// Returns the captured stderr as text.
    fn stderr(&self) -> String;
    /// Reports whether stderr captured anything worth showing.
    fn has_stderr(&self) -> bool;
    /// Returns the captured stdout as text.
    fn stdout(&self) -> String;
    /// Reports whether stdout captured anything worth showing.
    fn has_stdout(&self) -> bool;
}

impl OutputExt for Output {
    /// Decodes the stderr bytes as UTF-8, replacing invalid sequences with U+FFFD.
    fn stderr(&self) -> String {
        // `into_owned` reuses the allocation when the lossy conversion already produced a `String`.
        String::from_utf8_lossy(&self.stderr).into_owned()
    }

    /// Returns `true` when more than one byte was captured on stderr.
    fn has_stderr(&self) -> bool {
        // NOTE(review): a single captured byte is deliberately not counted — presumably to
        // ignore a lone trailing newline; confirm the intent with the callers.
        self.stderr.len() > 1
    }

    /// Decodes the stdout bytes as UTF-8, replacing invalid sequences with U+FFFD.
    fn stdout(&self) -> String {
        String::from_utf8_lossy(&self.stdout).into_owned()
    }

    /// Returns `true` when more than one byte was captured on stdout.
    fn has_stdout(&self) -> bool {
        // Same threshold as `has_stderr`: one byte alone does not count as output.
        self.stdout.len() > 1
    }
}
/// Outcome of waiting on a child process that can be interrupted.
///
/// `T` is the payload attached to a finished process (for example `()` or the
/// captured output).
#[derive(Debug)]
pub enum CommandResult<T> {
    /// The process exited and reported success.
    Success(T),
    /// The process exited without reporting success.
    Failure(T),
    /// Waiting was cut short by an interrupt signal before the process finished.
    Interrupted,
}
44
45pub async fn wait_interruptible(name: &str, mut process: Child, mut interrupt_rx: broadcast::Receiver<()>) -> Result<CommandResult<()>> {
46    tokio::select! {
47        res = process.wait() => match res {
48            Ok(exit) => {
49                if exit.success() {
50                    log::trace!("{name} process finished with success");
51                    Ok(CommandResult::Success(()))
52                } else {
53                    log::trace!("{name} process finished with code {:?}", exit.code());
54                    Ok(CommandResult::Failure(()))
55                }
56            }
57            Err(e) => bail!("Command failed due to: {e}"),
58        },
59        _ = interrupt_rx.recv() => {
60            process.kill().await.context("Could not kill process")?;
61            log::trace!("{name} process interrupted");
62            Ok(CommandResult::Interrupted)
63        }
64    }
65}
66
67pub async fn wait_piped_interruptible(name: &str, mut cmd: Command, mut interrupt_rx: broadcast::Receiver<()>) -> Result<CommandResult<Output>> {
68    // see: https://docs.rs/tokio/latest/tokio/process/index.html
69
70    cmd.kill_on_drop(true);
71    cmd.stdout(Stdio::piped());
72    cmd.stderr(Stdio::piped());
73    let process = cmd.spawn()?;
74    tokio::select! {
75        res = process.wait_with_output() => match res {
76            Ok(output) => {
77                if output.status.success() {
78                    log::trace!("{name} process finished with success");
79                    Ok(CommandResult::Success(output))
80                } else {
81                    log::trace!("{name} process finished with code {:?}", output.status.code());
82                    Ok(CommandResult::Failure(output))
83                }
84            }
85            Err(e) => bail!("Command failed due to: {e}"),
86        },
87        _ = interrupt_rx.recv() => {
88            log::trace!("{name} process interrupted");
89            Ok(CommandResult::Interrupted)
90        }
91    }
92}
93pub async fn wait_for_socket(name: &str, addr: SocketAddr) -> bool {
94    let duration = Duration::from_millis(500);
95
96    for _ in 0..20 {
97        if TcpStream::connect(&addr).await.is_ok() {
98            log::debug!("{name} server port {addr} open");
99            return true;
100        }
101        sleep(duration).await;
102    }
103    log::warn!("{name} timed out waiting for port {addr}");
104    false
105}