1use std::{
4 ffi::{OsStr, OsString},
5 future::Future,
6 os::fd::OwnedFd,
7 path::Path,
8 pin::Pin,
9 process::{Output, Stdio},
10 task::{Context, Poll},
11 time::Duration,
12};
13
14use tokio::{
15 io::unix::AsyncFd,
16 process::{Child, ChildStderr, ChildStdin, ChildStdout},
17 task::JoinHandle,
18};
19use tokio_util::compat::{Compat, TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
20
21use super::{
22 util::{chown_all_blocking, get_stdio_from_piped},
23 Runtime, RuntimeAsyncFd, RuntimeChild, RuntimeTask,
24};
25
26#[derive(Clone)]
29pub struct TokioRuntime;
30
31impl Runtime for TokioRuntime {
32 type Task<O: Send + 'static> = TokioRuntimeTask<O>;
33 type TimeoutError = tokio::time::error::Elapsed;
34 type File = Compat<tokio::fs::File>;
35 type AsyncFd = TokioRuntimeAsyncFd;
36 type Child = TokioRuntimeChild;
37
38 #[cfg(feature = "vmm-process")]
39 #[cfg_attr(docsrs, doc(cfg(feature = "vmm-process")))]
40 type SocketBackend = hyper_client_sockets::tokio::TokioBackend;
41
42 fn spawn_task<F>(&self, future: F) -> Self::Task<F::Output>
43 where
44 F: Future + Send + 'static,
45 F::Output: Send + 'static,
46 {
47 TokioRuntimeTask(tokio::task::spawn(future))
48 }
49
50 fn timeout<F>(
51 &self,
52 duration: Duration,
53 future: F,
54 ) -> impl Future<Output = Result<F::Output, Self::TimeoutError>> + Send
55 where
56 F: Future + Send,
57 F::Output: Send,
58 {
59 tokio::time::timeout(duration, future)
60 }
61
62 fn fs_exists(&self, path: &Path) -> impl Future<Output = Result<bool, std::io::Error>> + Send {
63 tokio::fs::try_exists(path)
64 }
65
66 fn fs_remove_file(&self, path: &Path) -> impl Future<Output = Result<(), std::io::Error>> + Send {
67 tokio::fs::remove_file(path)
68 }
69
70 fn fs_create_dir_all(&self, path: &Path) -> impl Future<Output = Result<(), std::io::Error>> + Send {
71 tokio::fs::create_dir_all(path)
72 }
73
74 async fn fs_create_file(&self, path: &Path) -> Result<(), std::io::Error> {
75 tokio::fs::File::create(path).await.map(|_| ())
76 }
77
78 fn fs_write(&self, path: &Path, content: String) -> impl Future<Output = Result<(), std::io::Error>> + Send {
79 tokio::fs::write(path, content)
80 }
81
82 fn fs_read_to_string(&self, path: &Path) -> impl Future<Output = Result<String, std::io::Error>> + Send {
83 tokio::fs::read_to_string(path)
84 }
85
86 fn fs_rename(
87 &self,
88 source_path: &Path,
89 destination_path: &Path,
90 ) -> impl Future<Output = Result<(), std::io::Error>> + Send {
91 tokio::fs::rename(source_path, destination_path)
92 }
93
94 fn fs_remove_dir_all(&self, path: &Path) -> impl Future<Output = Result<(), std::io::Error>> + Send {
95 tokio::fs::remove_dir_all(path)
96 }
97
98 async fn fs_copy(&self, source_path: &Path, destination_path: &Path) -> Result<(), std::io::Error> {
99 tokio::fs::copy(source_path, destination_path).await.map(|_| ())
100 }
101
102 async fn fs_chown_all(&self, path: &Path, uid: u32, gid: u32) -> Result<(), std::io::Error> {
103 let path = path.to_owned();
104 match tokio::task::spawn_blocking(move || chown_all_blocking(&path, uid, gid)).await {
105 Ok(result) => result,
106 Err(_) => Err(std::io::Error::other("chown_all_blocking blocking task panicked")),
107 }
108 }
109
110 fn fs_hard_link(
111 &self,
112 source_path: &Path,
113 destination_path: &Path,
114 ) -> impl Future<Output = Result<(), std::io::Error>> + Send {
115 tokio::fs::hard_link(source_path, destination_path)
116 }
117
118 async fn fs_open_file_for_read(&self, path: &Path) -> Result<Self::File, std::io::Error> {
119 let mut open_options = tokio::fs::OpenOptions::new();
120 open_options.read(true);
121 let file = open_options.open(path).await?;
122 Ok(file.compat())
123 }
124
125 fn create_async_fd(&self, fd: OwnedFd) -> Result<Self::AsyncFd, std::io::Error> {
126 Ok(TokioRuntimeAsyncFd(AsyncFd::new(fd)?))
127 }
128
129 fn spawn_process(
130 &self,
131 program: &OsStr,
132 args: &[OsString],
133 stdout: bool,
134 stderr: bool,
135 stdin: bool,
136 ) -> Result<Self::Child, std::io::Error> {
137 let mut child = tokio::process::Command::new(program)
138 .args(args)
139 .stdout(get_stdio_from_piped(stdout))
140 .stderr(get_stdio_from_piped(stderr))
141 .stdin(get_stdio_from_piped(stdin))
142 .spawn()?;
143
144 let stdout = child.stdout.take().map(|stdout| stdout.compat());
145 let stderr = child.stderr.take().map(|stderr| stderr.compat());
146 let stdin = child.stdin.take().map(|stdin| stdin.compat_write());
147
148 Ok(TokioRuntimeChild {
149 child,
150 stdout,
151 stdin,
152 stderr,
153 })
154 }
155
156 fn run_process(
157 &self,
158 program: &OsStr,
159 args: &[OsString],
160 stdout: bool,
161 stderr: bool,
162 ) -> impl Future<Output = Result<Output, std::io::Error>> + Send {
163 tokio::process::Command::new(program)
164 .args(args)
165 .stdout(get_stdio_from_piped(stdout))
166 .stderr(get_stdio_from_piped(stderr))
167 .stdin(Stdio::null())
168 .output()
169 }
170}
171
172pub struct TokioRuntimeTask<O: Send + 'static>(JoinHandle<O>);
174
175impl<O: Send + 'static> RuntimeTask<O> for TokioRuntimeTask<O> {
176 fn cancel(self) -> impl Future<Output = Option<O>> {
177 self.0.abort();
178 self.join()
179 }
180
181 async fn join(self) -> Option<O> {
182 match self.0.await {
183 Ok(output) => Some(output),
184 Err(_) => None,
185 }
186 }
187
188 fn poll_join(&mut self, context: &mut Context) -> Poll<Option<O>> {
189 Pin::new(&mut self.0).poll(context).map(|r| r.ok())
190 }
191}
192
193pub struct TokioRuntimeAsyncFd(AsyncFd<OwnedFd>);
195
196impl RuntimeAsyncFd for TokioRuntimeAsyncFd {
197 async fn readable(&self) -> Result<(), std::io::Error> {
198 let mut guard = self.0.readable().await?;
199 guard.retain_ready();
200 Ok(())
201 }
202}
203
204#[derive(Debug)]
206pub struct TokioRuntimeChild {
207 child: Child,
208 stdout: Option<Compat<ChildStdout>>,
209 stdin: Option<Compat<ChildStdin>>,
210 stderr: Option<Compat<ChildStderr>>,
211}
212
213impl RuntimeChild for TokioRuntimeChild {
214 type Stdout = Compat<ChildStdout>;
215
216 type Stderr = Compat<ChildStderr>;
217
218 type Stdin = Compat<ChildStdin>;
219
220 fn try_wait(&mut self) -> Result<Option<std::process::ExitStatus>, std::io::Error> {
221 self.child.try_wait()
222 }
223
224 fn wait(&mut self) -> impl Future<Output = Result<std::process::ExitStatus, std::io::Error>> {
225 self.child.wait()
226 }
227
228 fn kill(&mut self) -> Result<(), std::io::Error> {
229 self.child.start_kill()
230 }
231
232 fn get_stdout(&mut self) -> &mut Option<Self::Stdout> {
233 &mut self.stdout
234 }
235
236 fn get_stdin(&mut self) -> &mut Option<Self::Stdin> {
237 &mut self.stdin
238 }
239
240 fn get_stderr(&mut self) -> &mut Option<Self::Stderr> {
241 &mut self.stderr
242 }
243
244 fn take_stdout(&mut self) -> Option<Self::Stdout> {
245 self.stdout.take()
246 }
247
248 fn take_stderr(&mut self) -> Option<Self::Stderr> {
249 self.stderr.take()
250 }
251
252 fn take_stdin(&mut self) -> Option<Self::Stdin> {
253 self.stdin.take()
254 }
255}