1use std::{
2 path::PathBuf,
3 process::{ExitCode, Stdio},
4 time::Duration,
5};
6
7use anyhow::Context;
8use sql_fun_core::{CliSubCommand, SqlFunArgs};
9use tokio::{
10 io::{AsyncBufReadExt, AsyncRead, BufReader},
11 process::{Child, Command},
12};
13
14pub enum StdStream {
16 Stdout(String),
18 Stderr(String),
20}
21
22pub struct RunableCommand {
24 binary: PathBuf,
25 sub_command: CliSubCommand,
26}
27
28impl TryFrom<&CliSubCommand> for RunableCommand {
29 type Error = anyhow::Error;
30
31 fn try_from(value: &CliSubCommand) -> Result<Self, Self::Error> {
32 let binary_names = value.binary_names();
33 let mut binary_paths = Vec::new();
34
35 for bin in &binary_names {
36 if let Ok(bin_path) = which::which(bin) {
37 binary_paths.push(bin_path);
38 }
39 }
40
41 if binary_paths.is_empty() {
42 anyhow::bail!("expected binary not found {}", binary_names.join(", "))
43 }
44 if binary_paths.len() > 1 {
45 anyhow::bail!("ambiguous binary found {binary_paths:?}")
46 }
47
48 Ok(Self {
49 binary: binary_paths[0].clone(),
50 sub_command: value.clone(),
51 })
52 }
53}
54
55impl RunableCommand {
56 #[cfg(unix)]
57 fn send_sigterm(child: &Child) -> Result<(), anyhow::Error> {
58 use ::nix::unistd::Pid;
59 use nix::sys::signal::Signal;
60 let pid = Pid::from_raw(Self::pid_as_i32(child)?);
61 nix::sys::signal::kill(pid, Signal::SIGTERM)?;
62 Ok(())
63 }
64
65 async fn read_line_from<Stdio>(
66 from: &mut BufReader<Stdio>,
67 buffer: &mut String,
68 eof: &mut bool,
69 ) -> Result<Option<String>, anyhow::Error>
70 where
71 Stdio: AsyncRead + Unpin,
72 {
73 let len = from.read_line(buffer).await?;
74 if len == 0 {
75 *eof = true;
76 Ok(None)
77 } else {
78 let result = buffer.clone();
79 buffer.clear();
80 Ok(Some(result))
81 }
82 }
83
84 pub async fn execute(&self, args: &SqlFunArgs) -> Result<ExitCode, anyhow::Error> {
90 match &self.sub_command {
91 CliSubCommand::External(external_args) => {
92 let exit_code = self.run_external(args, external_args).await?;
93 Ok(ExitCode::from(u8::try_from(exit_code).unwrap_or(u8::MAX)))
94 }
95 CliSubCommand::Start(_daemon_control_args) => todo!(),
96 CliSubCommand::Stop(_daemon_control_args) => todo!(),
97 CliSubCommand::Status(_daemon_control_args) => todo!(),
98 CliSubCommand::Restart(_daemon_control_args) => todo!(),
99 CliSubCommand::Initialize(_initialize_args) => todo!(),
100 CliSubCommand::Versions => todo!(),
101 }
102 }
103
104 async fn run_external(
105 &self,
106 core_args: &SqlFunArgs,
107 args: &[String],
108 ) -> Result<i32, anyhow::Error> {
109 let envs = core_args.environments()?;
110 let mut child = Command::new(&self.binary)
111 .env_clear()
112 .envs(&envs)
113 .args(args)
114 .stdout(Stdio::piped())
115 .stderr(Stdio::piped())
116 .spawn()?;
117 let Some(stdout) = child.stdout.take() else {
118 anyhow::bail!("can not to get STDOUT")
119 };
120 let Some(stderr) = child.stderr.take() else {
121 anyhow::bail!("can not to get STDERR")
122 };
123 let mut stdout = BufReader::new(stdout);
124 let mut stderr = BufReader::new(stderr);
125 let mut stdout_line_buf = String::new();
126 let mut stderr_line_buf = String::new();
127 let mut stream_items = Vec::new();
128 let mut stderr_eof = false;
129 let mut stdout_eof = false;
130 #[cfg(unix)]
131 let mut term_sent = false;
132 let timeout = Duration::from_secs(30);
133 let grace_timer = tokio::time::sleep(Duration::MAX);
134 tokio::pin!(grace_timer);
135 let status_result = loop {
136 let stream_item = tokio::select! {
137 biased;
138 line = Self::read_line_from( &mut stderr,&mut stderr_line_buf, &mut stderr_eof ), if !stderr_eof
139 => {
140 let Some(line) = line? else {
141 continue;
142 };
143 eprint!("{}", &line);
144 StdStream::Stderr(line)
145 },
146 line = Self::read_line_from( &mut stdout,&mut stdout_line_buf, &mut stdout_eof ), if !stdout_eof
147 => {
148 let Some(line) = line? else { continue; };
149 print!("{}",&line);
150 StdStream::Stdout(line)
151 },
152 status = child.wait() => {
153 break status;
154 },
155 () = tokio::time::sleep(timeout) => {
156 #[cfg(unix)]
157 if !term_sent {
158 let pid = Self::pid_as_i32(&child)?;
159 eprintln!("Timeout reached, sending SIGTERM {pid}");
160 Self::send_sigterm(&child)?;
161 grace_timer.as_mut().reset(tokio::time::Instant::now() + timeout);
162 term_sent =true;
163 continue;
164 }
165 eprintln!("Timeout reached, killing process");
166 child.start_kill()?;
167 continue;
168 },
169 () = &mut grace_timer => {
170 eprintln!("Terminating");
171 child.start_kill()?;
172 continue;
173 }
174 };
175 stream_items.push(stream_item);
176 };
177 if !stderr_eof {
178 while stderr.read_line(&mut stderr_line_buf).await? != 0 {
179 eprint!("{}", &stderr_line_buf);
180 stream_items.push(StdStream::Stderr(stderr_line_buf.clone()));
181 stderr_line_buf.clear();
182 }
183 }
184 if !stdout_eof {
185 while stdout.read_line(&mut stdout_line_buf).await? != 0 {
186 print!("{}", &stdout_line_buf);
187 stream_items.push(StdStream::Stdout(stdout_line_buf.clone()));
188 stdout_line_buf.clear();
189 }
190 }
191 let status_result = status_result?;
192 let exit_code = status_result.code().unwrap_or_else(|| {
193 #[cfg(windows)]
194 {
195 unreachable!()
196 }
197 #[cfg(unix)]
198 {
199 use std::os::unix::process::ExitStatusExt;
200 128 + status_result.signal().unwrap_or(0)
201 }
202 });
203 Ok(exit_code)
204 }
205
206 #[cfg(unix)]
207 fn pid_as_i32(child: &Child) -> Result<i32, anyhow::Error> {
208 let pid = child.id().context("child has pid")?;
209 i32::try_from(pid).context("pid does not fit into i32")
210 }
211}