Skip to main content

sql_fun_cli/
runnable.rs

1use std::{
2    path::PathBuf,
3    process::{ExitCode, Stdio},
4    time::Duration,
5};
6
7use anyhow::Context;
8use sql_fun_core::{CliSubCommand, SqlFunArgs};
9use tokio::{
10    io::{AsyncBufReadExt, AsyncRead, BufReader},
11    process::{Child, Command},
12};
13
14/// command output / error stream
15pub enum StdStream {
16    /// stdout line
17    Stdout(String),
18    /// stderr line
19    Stderr(String),
20}
21
22/// runnable command wrapper for [`CliSubCommand`]
23pub struct RunableCommand {
24    binary: PathBuf,
25    sub_command: CliSubCommand,
26}
27
28impl TryFrom<&CliSubCommand> for RunableCommand {
29    type Error = anyhow::Error;
30
31    fn try_from(value: &CliSubCommand) -> Result<Self, Self::Error> {
32        let binary_names = value.binary_names();
33        let mut binary_paths = Vec::new();
34
35        for bin in &binary_names {
36            if let Ok(bin_path) = which::which(bin) {
37                binary_paths.push(bin_path);
38            }
39        }
40
41        if binary_paths.is_empty() {
42            anyhow::bail!("expected binary not found {}", binary_names.join(", "))
43        }
44        if binary_paths.len() > 1 {
45            anyhow::bail!("ambiguous binary found {binary_paths:?}")
46        }
47
48        Ok(Self {
49            binary: binary_paths[0].clone(),
50            sub_command: value.clone(),
51        })
52    }
53}
54
55impl RunableCommand {
56    #[cfg(unix)]
57    fn send_sigterm(child: &Child) -> Result<(), anyhow::Error> {
58        use ::nix::unistd::Pid;
59        use nix::sys::signal::Signal;
60        let pid = Pid::from_raw(Self::pid_as_i32(child)?);
61        nix::sys::signal::kill(pid, Signal::SIGTERM)?;
62        Ok(())
63    }
64
65    async fn read_line_from<Stdio>(
66        from: &mut BufReader<Stdio>,
67        buffer: &mut String,
68        eof: &mut bool,
69    ) -> Result<Option<String>, anyhow::Error>
70    where
71        Stdio: AsyncRead + Unpin,
72    {
73        let len = from.read_line(buffer).await?;
74        if len == 0 {
75            *eof = true;
76            Ok(None)
77        } else {
78            let result = buffer.clone();
79            buffer.clear();
80            Ok(Some(result))
81        }
82    }
83
84    /// execute binary
85    ///
86    /// # Errors
87    ///
88    /// Returns an error when spawning or waiting on the child process fails.
89    pub async fn execute(&self, args: &SqlFunArgs) -> Result<ExitCode, anyhow::Error> {
90        match &self.sub_command {
91            CliSubCommand::External(external_args) => {
92                let exit_code = self.run_external(args, external_args).await?;
93                Ok(ExitCode::from(u8::try_from(exit_code).unwrap_or(u8::MAX)))
94            }
95            CliSubCommand::Start(_daemon_control_args) => todo!(),
96            CliSubCommand::Stop(_daemon_control_args) => todo!(),
97            CliSubCommand::Status(_daemon_control_args) => todo!(),
98            CliSubCommand::Restart(_daemon_control_args) => todo!(),
99            CliSubCommand::Initialize(_initialize_args) => todo!(),
100            CliSubCommand::Versions => todo!(),
101        }
102    }
103
104    async fn run_external(
105        &self,
106        core_args: &SqlFunArgs,
107        args: &[String],
108    ) -> Result<i32, anyhow::Error> {
109        let envs = core_args.environments()?;
110        let mut child = Command::new(&self.binary)
111            .env_clear()
112            .envs(&envs)
113            .args(args)
114            .stdout(Stdio::piped())
115            .stderr(Stdio::piped())
116            .spawn()?;
117        let Some(stdout) = child.stdout.take() else {
118            anyhow::bail!("can not to get STDOUT")
119        };
120        let Some(stderr) = child.stderr.take() else {
121            anyhow::bail!("can not to get STDERR")
122        };
123        let mut stdout = BufReader::new(stdout);
124        let mut stderr = BufReader::new(stderr);
125        let mut stdout_line_buf = String::new();
126        let mut stderr_line_buf = String::new();
127        let mut stream_items = Vec::new();
128        let mut stderr_eof = false;
129        let mut stdout_eof = false;
130        #[cfg(unix)]
131        let mut term_sent = false;
132        let timeout = Duration::from_secs(30);
133        let grace_timer = tokio::time::sleep(Duration::MAX);
134        tokio::pin!(grace_timer);
135        let status_result = loop {
136            let stream_item = tokio::select! {
137                biased;
138                line = Self::read_line_from( &mut stderr,&mut stderr_line_buf, &mut stderr_eof ), if !stderr_eof
139                => {
140                    let Some(line) = line? else {
141                        continue;
142                    };
143                    eprint!("{}", &line);
144                    StdStream::Stderr(line)
145                },
146                line = Self::read_line_from( &mut stdout,&mut stdout_line_buf, &mut stdout_eof ), if !stdout_eof
147                => {
148                    let Some(line) = line? else { continue; };
149                    print!("{}",&line);
150                    StdStream::Stdout(line)
151                },
152                status = child.wait() => {
153                    break status;
154                },
155                () = tokio::time::sleep(timeout) => {
156                    #[cfg(unix)]
157                    if !term_sent {
158                        let pid = Self::pid_as_i32(&child)?;
159                        eprintln!("Timeout reached, sending SIGTERM {pid}");
160                        Self::send_sigterm(&child)?;
161                        grace_timer.as_mut().reset(tokio::time::Instant::now() + timeout);
162                        term_sent =true;
163                        continue;
164                    }
165                    eprintln!("Timeout reached, killing process");
166                    child.start_kill()?;
167                    continue;
168                },
169                () = &mut grace_timer => {
170                    eprintln!("Terminating");
171                    child.start_kill()?;
172                    continue;
173                }
174            };
175            stream_items.push(stream_item);
176        };
177        if !stderr_eof {
178            while stderr.read_line(&mut stderr_line_buf).await? != 0 {
179                eprint!("{}", &stderr_line_buf);
180                stream_items.push(StdStream::Stderr(stderr_line_buf.clone()));
181                stderr_line_buf.clear();
182            }
183        }
184        if !stdout_eof {
185            while stdout.read_line(&mut stdout_line_buf).await? != 0 {
186                print!("{}", &stdout_line_buf);
187                stream_items.push(StdStream::Stdout(stdout_line_buf.clone()));
188                stdout_line_buf.clear();
189            }
190        }
191        let status_result = status_result?;
192        let exit_code = status_result.code().unwrap_or_else(|| {
193            #[cfg(windows)]
194            {
195                unreachable!()
196            }
197            #[cfg(unix)]
198            {
199                use std::os::unix::process::ExitStatusExt;
200                128 + status_result.signal().unwrap_or(0)
201            }
202        });
203        Ok(exit_code)
204    }
205
206    #[cfg(unix)]
207    fn pid_as_i32(child: &Child) -> Result<i32, anyhow::Error> {
208        let pid = child.id().context("child has pid")?;
209        i32::try_from(pid).context("pid does not fit into i32")
210    }
211}