system_harness/
container.rs

1use crate::{Error, ErrorKind, Status, SystemHarness, SystemTerminal};
2use serde::{Deserialize, Serialize};
3use tokio::io::AsyncRead;
4use tokio::io::AsyncWrite;
5use tokio::process::Command;
6use tokio::process::Child;
7use std::process::Output;
8use std::process::Stdio;
9use std::pin::Pin;
10use std::task::Poll;
11
/// Remove a single trailing line terminator from `input`.
///
/// A trailing `"\r\n"` is removed as a unit; otherwise a trailing `"\n"` is
/// removed. At most one terminator is stripped, and input that does not end
/// in a newline is returned unchanged.
fn strip_last_newline(input: &str) -> &str {
    // Try the two-byte terminator first so no stray '\r' is left behind.
    if let Some(head) = input.strip_suffix("\r\n") {
        return head;
    }
    match input.strip_suffix("\n") {
        Some(head) => head,
        None => input,
    }
}
18
19/// Process output to result
20fn output_to_result(output: Output) -> Result<String, Error> {
21    match output.status.success() {
22        true => Ok(strip_last_newline(
23                std::str::from_utf8(&output.stdout)?
24        ).to_string()),
25        false => {
26            let error = std::str::from_utf8(&output.stderr)?;
27            Err(Error::new(ErrorKind::HarnessError, error))
28        }
29    }
30}
31
32/// A container system config
33#[derive(Clone, Serialize, Deserialize)]
34pub struct ContainerSystemConfig {
35
36    /// Container runtime
37    tool: String,
38
39    /// Container image
40    image: String,
41
42}
43
44impl ContainerSystemConfig {
45
46    /// Build and run a container based on name
47    pub async fn build(&self) -> Result<ContainerSystem, Error> {
48        let id = Command::new(&self.tool)
49            .arg("create")
50            .arg("-t") 
51            .arg(&self.image)
52            .output()
53            .await
54            .map_err(|err| err.into())
55            .and_then(output_to_result)
56            .map_err(|err| { log::warn!("{err}"); err })?;
57        log::trace!("Created container: {id}");
58
59        Command::new(&self.tool)
60            .stdout(Stdio::null())
61            .arg("start")
62            .arg(&id)
63            .status().await?;
64
65        Ok(ContainerSystem {
66            id,
67            tool: self.tool.clone()
68        })
69    }
70
71}
72
/// Handle to a container controlled through an external container runtime.
///
/// Dropping the handle force-removes the container.
pub struct ContainerSystem {
    /// Container runtime executable used for every command.
    tool: String,
    /// Identifier of the container the commands are addressed to.
    id: String,
}
77
78impl Drop for ContainerSystem {
79    fn drop(&mut self) {
80        if let Err(err) = std::process::Command::new(&self.tool)
81            .arg("rm")
82            .arg("-f")
83            .arg(&self.id)
84            .output() {
85            log::error!("{err}");
86        }
87    }
88}
89
90pub struct ContainerSystemTerminal {
91    process: Child,
92}
93
94#[derive(Deserialize)]
95#[serde(rename_all = "PascalCase")]
96struct State {
97    running: bool,
98    paused: bool
99}
100
101#[derive(Deserialize)]
102#[serde(rename_all = "PascalCase")]
103struct Inspect {
104    state: State
105}
106
107impl SystemTerminal for ContainerSystemTerminal {
108
109    async fn send_key(&mut self, _key: crate::Key) -> Result<(), Error> {
110        Err(Error::new(ErrorKind::HarnessError, "Sending a keystroke not supported"))
111    }
112
113}
114
115impl AsyncRead for ContainerSystemTerminal {
116
117    fn poll_read(
118        mut self: std::pin::Pin<&mut Self>,
119        cx: &mut std::task::Context<'_>,
120        buf: &mut tokio::io::ReadBuf<'_>,
121    ) -> Poll<std::io::Result<()>> {
122        if let Some(stdout) = &mut self.process.stdout {
123            Pin::new(stdout).poll_read(cx, buf)
124        } else {
125            Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, "Can't read from container")))
126        }
127    }
128}
129
130impl AsyncWrite for ContainerSystemTerminal {
131
132    fn poll_write(
133        mut self: std::pin::Pin<&mut Self>,
134        cx: &mut std::task::Context<'_>,
135        buf: &[u8],
136    ) -> Poll<Result<usize, std::io::Error>> {
137        if let Some(stdin) = &mut self.as_mut().process.stdin {
138            Pin::new(stdin).poll_write(cx, buf)
139        } else {
140            Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, "Can't write to container")))
141        }
142    }
143
144    fn poll_flush(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), std::io::Error>> {
145        if let Some(stdin) = &mut self.as_mut().process.stdin {
146            Pin::new(stdin).poll_flush(cx)
147        } else {
148            Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, "Can't flush write to container")))
149        }
150    }
151
152    fn poll_shutdown(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), std::io::Error>> {
153        if let Some(stdin) = &mut self.as_mut().process.stdin {
154            Pin::new(stdin).poll_flush(cx)
155        } else {
156            Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, "Can't shutdown write to container")))
157        }
158    }
159}
160
161
162
163impl<'sys> SystemHarness<'sys> for ContainerSystem {
164
165    type Terminal = ContainerSystemTerminal;
166
167    async fn terminal(&'sys mut self) -> Result<Self::Terminal, Error> {
168        let process = Command::new(&self.tool)
169            .stdin(Stdio::piped())
170            .stdout(Stdio::piped())
171            .arg("exec")
172            .arg("-it")
173            .arg(&self.id)
174            .arg("sh")
175            .spawn()?;
176        Ok(Self::Terminal {
177            process,
178        })
179    }
180
181    async fn pause(&mut self) -> Result<(), Error> {
182        log::trace!("Pausing container: {}", &self.id); 
183        Command::new(&self.tool)
184            .arg("pause")
185            .arg(&self.id)
186            .output()
187            .await
188            .map_err(|err| err.into())
189            .and_then(output_to_result)
190            .map(|_| log::trace!("Paused container: {}", self.id))
191    }
192
193    async fn resume(&mut self) -> Result<(), Error> {
194        log::trace!("Resuming container: {}", &self.id); 
195        Command::new(&self.tool)
196            .arg("unpause")
197            .arg(&self.id)
198            .output()
199            .await
200            .map_err(|err| err.into())
201            .and_then(output_to_result)
202            .map(|_| log::trace!("Resumed container: {}", self.id))
203    }
204
205    async fn shutdown(&mut self) -> Result<(), Error> {
206        log::trace!("Shutting down container: {}", &self.id); 
207        Command::new(&self.tool)
208            .arg("stop")
209            .arg("--time")
210            .arg("1")
211            .arg(&self.id)
212            .output()
213            .await
214            .map_err(|err| err.into())
215            .and_then(output_to_result)
216            .map(|_| log::trace!("Stopped container: {}", self.id))
217    }
218
219    async fn status(&mut self) -> Result<Status, Error> {
220        Command::new(&self.tool)
221            .arg("inspect")
222            .arg(&self.id)
223            .output()
224            .await
225            .map_err(|err| err.into())
226            .and_then(output_to_result)
227            .map_err(|err| { log::warn!("{err}"); err })
228            .and_then(|stdout| {
229                let inspect: Vec<Inspect> = serde_json::from_str(&stdout)?;
230                inspect.into_iter()
231                    .next()
232                    .ok_or(Error::new(ErrorKind::HarnessError, "Container doesn't exist"))
233                    .and_then(|inspect| {
234                        let state = &inspect.state;
235                        if state.running {
236                            Ok(Status::Running)
237                        } else if state.paused {
238                            Ok(Status::Paused)
239                        } else if !state.running && !state.paused {
240                            Ok(Status::Shutdown)
241                        } else {
242                            Err(Error::new(ErrorKind::HarnessError,
243                                    format!("Unhandled status")))
244                        }
245                    })
246            })
247    }
248
249    async fn running(&mut self) -> Result<bool, Error> {
250        self.status().await.map(|status| status == Status::Running)
251    }
252
253}