fctools/runtime/
tokio.rs

//! A runtime implementation that uses Tokio's various features for all of its components.
2
3use std::{
4    ffi::{OsStr, OsString},
5    future::Future,
6    os::fd::OwnedFd,
7    path::Path,
8    pin::Pin,
9    process::{Output, Stdio},
10    task::{Context, Poll},
11    time::Duration,
12};
13
14use tokio::{
15    io::unix::AsyncFd,
16    process::{Child, ChildStderr, ChildStdin, ChildStdout},
17    task::JoinHandle,
18};
19use tokio_util::compat::{Compat, TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
20
21use super::{
22    util::{chown_all_blocking, get_stdio_from_piped},
23    Runtime, RuntimeAsyncFd, RuntimeChild, RuntimeTask,
24};
25
/// The [Runtime] implementation backed by the [tokio] crate. Since [tokio] heavily utilizes thread-local
/// storage, this struct is zero-sized and doesn't store anything.
///
/// Because the type carries no state, it is trivially copyable, default-constructible and printable,
/// so it can be embedded in types that derive those traits themselves.
#[derive(Debug, Clone, Copy, Default)]
pub struct TokioRuntime;
30
31impl Runtime for TokioRuntime {
32    type Task<O: Send + 'static> = TokioRuntimeTask<O>;
33    type TimeoutError = tokio::time::error::Elapsed;
34    type File = Compat<tokio::fs::File>;
35    type AsyncFd = TokioRuntimeAsyncFd;
36    type Child = TokioRuntimeChild;
37
38    #[cfg(feature = "vmm-process")]
39    #[cfg_attr(docsrs, doc(cfg(feature = "vmm-process")))]
40    type SocketBackend = hyper_client_sockets::tokio::TokioBackend;
41
42    fn spawn_task<F>(&self, future: F) -> Self::Task<F::Output>
43    where
44        F: Future + Send + 'static,
45        F::Output: Send + 'static,
46    {
47        TokioRuntimeTask(tokio::task::spawn(future))
48    }
49
50    fn timeout<F>(
51        &self,
52        duration: Duration,
53        future: F,
54    ) -> impl Future<Output = Result<F::Output, Self::TimeoutError>> + Send
55    where
56        F: Future + Send,
57        F::Output: Send,
58    {
59        tokio::time::timeout(duration, future)
60    }
61
62    fn fs_exists(&self, path: &Path) -> impl Future<Output = Result<bool, std::io::Error>> + Send {
63        tokio::fs::try_exists(path)
64    }
65
66    fn fs_remove_file(&self, path: &Path) -> impl Future<Output = Result<(), std::io::Error>> + Send {
67        tokio::fs::remove_file(path)
68    }
69
70    fn fs_create_dir_all(&self, path: &Path) -> impl Future<Output = Result<(), std::io::Error>> + Send {
71        tokio::fs::create_dir_all(path)
72    }
73
74    async fn fs_create_file(&self, path: &Path) -> Result<(), std::io::Error> {
75        tokio::fs::File::create(path).await.map(|_| ())
76    }
77
78    fn fs_write(&self, path: &Path, content: String) -> impl Future<Output = Result<(), std::io::Error>> + Send {
79        tokio::fs::write(path, content)
80    }
81
82    fn fs_read_to_string(&self, path: &Path) -> impl Future<Output = Result<String, std::io::Error>> + Send {
83        tokio::fs::read_to_string(path)
84    }
85
86    fn fs_rename(
87        &self,
88        source_path: &Path,
89        destination_path: &Path,
90    ) -> impl Future<Output = Result<(), std::io::Error>> + Send {
91        tokio::fs::rename(source_path, destination_path)
92    }
93
94    fn fs_remove_dir_all(&self, path: &Path) -> impl Future<Output = Result<(), std::io::Error>> + Send {
95        tokio::fs::remove_dir_all(path)
96    }
97
98    async fn fs_copy(&self, source_path: &Path, destination_path: &Path) -> Result<(), std::io::Error> {
99        tokio::fs::copy(source_path, destination_path).await.map(|_| ())
100    }
101
102    async fn fs_chown_all(&self, path: &Path, uid: u32, gid: u32) -> Result<(), std::io::Error> {
103        let path = path.to_owned();
104        match tokio::task::spawn_blocking(move || chown_all_blocking(&path, uid, gid)).await {
105            Ok(result) => result,
106            Err(_) => Err(std::io::Error::other("chown_all_blocking blocking task panicked")),
107        }
108    }
109
110    fn fs_hard_link(
111        &self,
112        source_path: &Path,
113        destination_path: &Path,
114    ) -> impl Future<Output = Result<(), std::io::Error>> + Send {
115        tokio::fs::hard_link(source_path, destination_path)
116    }
117
118    async fn fs_open_file_for_read(&self, path: &Path) -> Result<Self::File, std::io::Error> {
119        let mut open_options = tokio::fs::OpenOptions::new();
120        open_options.read(true);
121        let file = open_options.open(path).await?;
122        Ok(file.compat())
123    }
124
125    fn create_async_fd(&self, fd: OwnedFd) -> Result<Self::AsyncFd, std::io::Error> {
126        Ok(TokioRuntimeAsyncFd(AsyncFd::new(fd)?))
127    }
128
129    fn spawn_process(
130        &self,
131        program: &OsStr,
132        args: &[OsString],
133        stdout: bool,
134        stderr: bool,
135        stdin: bool,
136    ) -> Result<Self::Child, std::io::Error> {
137        let mut child = tokio::process::Command::new(program)
138            .args(args)
139            .stdout(get_stdio_from_piped(stdout))
140            .stderr(get_stdio_from_piped(stderr))
141            .stdin(get_stdio_from_piped(stdin))
142            .spawn()?;
143
144        let stdout = child.stdout.take().map(|stdout| stdout.compat());
145        let stderr = child.stderr.take().map(|stderr| stderr.compat());
146        let stdin = child.stdin.take().map(|stdin| stdin.compat_write());
147
148        Ok(TokioRuntimeChild {
149            child,
150            stdout,
151            stdin,
152            stderr,
153        })
154    }
155
156    fn run_process(
157        &self,
158        program: &OsStr,
159        args: &[OsString],
160        stdout: bool,
161        stderr: bool,
162    ) -> impl Future<Output = Result<Output, std::io::Error>> + Send {
163        tokio::process::Command::new(program)
164            .args(args)
165            .stdout(get_stdio_from_piped(stdout))
166            .stderr(get_stdio_from_piped(stderr))
167            .stdin(Stdio::null())
168            .output()
169    }
170}
171
172/// The [RuntimeTask] implementation for the [TokioRuntime].
173pub struct TokioRuntimeTask<O: Send + 'static>(JoinHandle<O>);
174
175impl<O: Send + 'static> RuntimeTask<O> for TokioRuntimeTask<O> {
176    fn cancel(self) -> impl Future<Output = Option<O>> {
177        self.0.abort();
178        self.join()
179    }
180
181    async fn join(self) -> Option<O> {
182        match self.0.await {
183            Ok(output) => Some(output),
184            Err(_) => None,
185        }
186    }
187
188    fn poll_join(&mut self, context: &mut Context) -> Poll<Option<O>> {
189        Pin::new(&mut self.0).poll(context).map(|r| r.ok())
190    }
191}
192
193/// The [RuntimeAsyncFd] implementation for the [TokioRuntime].
194pub struct TokioRuntimeAsyncFd(AsyncFd<OwnedFd>);
195
196impl RuntimeAsyncFd for TokioRuntimeAsyncFd {
197    async fn readable(&self) -> Result<(), std::io::Error> {
198        let mut guard = self.0.readable().await?;
199        guard.retain_ready();
200        Ok(())
201    }
202}
203
204/// The [RuntimeChild] implementation for the [TokioRuntime].
205#[derive(Debug)]
206pub struct TokioRuntimeChild {
207    child: Child,
208    stdout: Option<Compat<ChildStdout>>,
209    stdin: Option<Compat<ChildStdin>>,
210    stderr: Option<Compat<ChildStderr>>,
211}
212
213impl RuntimeChild for TokioRuntimeChild {
214    type Stdout = Compat<ChildStdout>;
215
216    type Stderr = Compat<ChildStderr>;
217
218    type Stdin = Compat<ChildStdin>;
219
220    fn try_wait(&mut self) -> Result<Option<std::process::ExitStatus>, std::io::Error> {
221        self.child.try_wait()
222    }
223
224    fn wait(&mut self) -> impl Future<Output = Result<std::process::ExitStatus, std::io::Error>> {
225        self.child.wait()
226    }
227
228    fn kill(&mut self) -> Result<(), std::io::Error> {
229        self.child.start_kill()
230    }
231
232    fn get_stdout(&mut self) -> &mut Option<Self::Stdout> {
233        &mut self.stdout
234    }
235
236    fn get_stdin(&mut self) -> &mut Option<Self::Stdin> {
237        &mut self.stdin
238    }
239
240    fn get_stderr(&mut self) -> &mut Option<Self::Stderr> {
241        &mut self.stderr
242    }
243
244    fn take_stdout(&mut self) -> Option<Self::Stdout> {
245        self.stdout.take()
246    }
247
248    fn take_stderr(&mut self) -> Option<Self::Stderr> {
249        self.stderr.take()
250    }
251
252    fn take_stdin(&mut self) -> Option<Self::Stdin> {
253        self.stdin.take()
254    }
255}