use crate::{Error, ErrorKind, Status, SystemHarness, SystemTerminal};
use serde::{Deserialize, Serialize};
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tokio::process::Command;
use tokio::process::Child;
use std::process::Output;
use std::process::Stdio;
use std::pin::Pin;
use std::task::Poll;
/// Removes at most one trailing line terminator from `input`.
///
/// A trailing `"\r\n"` is preferred over a bare `"\n"`; anything else
/// (including a lone `'\r'`) is returned untouched.
fn strip_last_newline(input: &str) -> &str {
    if let Some(without_crlf) = input.strip_suffix("\r\n") {
        without_crlf
    } else if let Some(without_lf) = input.strip_suffix('\n') {
        without_lf
    } else {
        input
    }
}
fn output_to_result(output: Output) -> Result<String, Error> {
match output.status.success() {
true => Ok(strip_last_newline(
std::str::from_utf8(&output.stdout)?
).to_string()),
false => {
let error = std::str::from_utf8(&output.stderr)?;
Err(Error::new(ErrorKind::HarnessError, error))
}
}
}
/// Serializable description of a container-backed system.
///
/// `build` uses it to create and start a container.
#[derive(Clone, Serialize, Deserialize)]
pub struct ContainerSystemConfig {
/// Program invoked for every container command (`create`, `start`, ...).
/// Presumably a Docker-compatible CLI such as `docker` or `podman` — confirm.
tool: String,
/// Image argument passed to `<tool> create`.
image: String,
}
impl ContainerSystemConfig {
    /// Creates a container from `image` with `<tool> create -t` and starts it.
    ///
    /// The returned [`ContainerSystem`] owns the container and removes it
    /// when dropped.
    ///
    /// # Errors
    ///
    /// Returns an error if the tool cannot be spawned, or if `create` or
    /// `start` exits with a non-zero status (the error message is the
    /// command's stderr). If `start` fails, the freshly created container
    /// is removed again before the error is returned.
    pub async fn build(&self) -> Result<ContainerSystem, Error> {
        let id = Command::new(&self.tool)
            .arg("create")
            .arg("-t")
            .arg(&self.image)
            .output()
            .await
            .map_err(|err| err.into())
            .and_then(output_to_result)
            .map_err(|err| {
                log::warn!("{err}");
                err
            })?;
        log::trace!("Created container: {id}");

        // Take ownership right away: if starting fails below, dropping
        // `system` removes the container instead of leaking it.
        let system = ContainerSystem {
            id,
            tool: self.tool.clone(),
        };

        // Check the exit status of `start`; a container that failed to start
        // must not be handed out as a working system.
        Command::new(&system.tool)
            .stdout(Stdio::null())
            .arg("start")
            .arg(&system.id)
            .output()
            .await
            .map_err(|err| err.into())
            .and_then(output_to_result)
            .map_err(|err| {
                log::warn!("{err}");
                err
            })?;

        Ok(system)
    }
}
/// Handle to a container created by `ContainerSystemConfig::build`.
///
/// Dropping the handle runs `<tool> rm -f <id>`.
pub struct ContainerSystem {
/// Program invoked for every container command.
tool: String,
/// Container identifier: the trimmed stdout of `<tool> create`.
id: String,
}
impl Drop for ContainerSystem {
fn drop(&mut self) {
if let Err(err) = std::process::Command::new(&self.tool)
.arg("rm")
.arg("-f")
.arg(&self.id)
.output() {
log::error!("{err}");
}
}
}
/// Terminal attached to a container.
///
/// Reads come from the child process's stdout and writes go to its stdin.
pub struct ContainerSystemTerminal {
/// Child process spawned by `ContainerSystem::terminal` (`<tool> exec -it <id> sh`).
process: Child,
}
/// The `State` object of one `<tool> inspect` entry.
///
/// Only the fields read by `status` are declared; JSON keys are PascalCase.
#[derive(Deserialize)]
#[serde(rename_all = "PascalCase")]
struct State {
/// Value of the `Running` key.
running: bool,
/// Value of the `Paused` key.
paused: bool
}
/// One element of the JSON array printed by `<tool> inspect <id>`.
///
/// Only the `State` key is deserialized.
#[derive(Deserialize)]
#[serde(rename_all = "PascalCase")]
struct Inspect {
/// Value of the `State` key.
state: State
}
impl SystemTerminal for ContainerSystemTerminal {
async fn send_key(&mut self, _key: crate::Key) -> Result<(), Error> {
Err(Error::new(ErrorKind::HarnessError, "Sending a keystroke not supported"))
}
}
impl AsyncRead for ContainerSystemTerminal {
    /// Reads from the child process's stdout.
    ///
    /// Fails with `BrokenPipe` when the child has no stdout handle.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        match self.get_mut().process.stdout.as_mut() {
            Some(child_stdout) => Pin::new(child_stdout).poll_read(cx, buf),
            None => Poll::Ready(Err(std::io::Error::new(
                std::io::ErrorKind::BrokenPipe,
                "Can't read from container",
            ))),
        }
    }
}
impl AsyncWrite for ContainerSystemTerminal {
    /// Writes to the child process's stdin.
    ///
    /// Fails with `BrokenPipe` when the child has no stdin handle.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, std::io::Error>> {
        match self.get_mut().process.stdin.as_mut() {
            Some(stdin) => Pin::new(stdin).poll_write(cx, buf),
            None => Poll::Ready(Err(std::io::Error::new(
                std::io::ErrorKind::BrokenPipe,
                "Can't write to container",
            ))),
        }
    }

    /// Flushes the child process's stdin.
    ///
    /// Fails with `BrokenPipe` when the child has no stdin handle.
    fn poll_flush(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Result<(), std::io::Error>> {
        match self.get_mut().process.stdin.as_mut() {
            Some(stdin) => Pin::new(stdin).poll_flush(cx),
            None => Poll::Ready(Err(std::io::Error::new(
                std::io::ErrorKind::BrokenPipe,
                "Can't flush write to container",
            ))),
        }
    }

    /// Shuts down the write half by delegating to the stdin handle's own
    /// `poll_shutdown`.
    ///
    /// Fails with `BrokenPipe` when the child has no stdin handle.
    fn poll_shutdown(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Result<(), std::io::Error>> {
        match self.get_mut().process.stdin.as_mut() {
            // Must forward to `poll_shutdown`, not `poll_flush`: flushing
            // alone never shuts the writer down.
            Some(stdin) => Pin::new(stdin).poll_shutdown(cx),
            None => Poll::Ready(Err(std::io::Error::new(
                std::io::ErrorKind::BrokenPipe,
                "Can't shutdown write to container",
            ))),
        }
    }
}
impl<'sys> SystemHarness<'sys> for ContainerSystem {
type Terminal = ContainerSystemTerminal;
async fn terminal(&'sys mut self) -> Result<Self::Terminal, Error> {
let process = Command::new(&self.tool)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.arg("exec")
.arg("-it")
.arg(&self.id)
.arg("sh")
.spawn()?;
Ok(Self::Terminal {
process,
})
}
async fn pause(&mut self) -> Result<(), Error> {
log::trace!("Pausing container: {}", &self.id);
Command::new(&self.tool)
.arg("pause")
.arg(&self.id)
.output()
.await
.map_err(|err| err.into())
.and_then(output_to_result)
.map(|_| log::trace!("Paused container: {}", self.id))
}
async fn resume(&mut self) -> Result<(), Error> {
log::trace!("Resuming container: {}", &self.id);
Command::new(&self.tool)
.arg("unpause")
.arg(&self.id)
.output()
.await
.map_err(|err| err.into())
.and_then(output_to_result)
.map(|_| log::trace!("Resumed container: {}", self.id))
}
async fn shutdown(&mut self) -> Result<(), Error> {
log::trace!("Shutting down container: {}", &self.id);
Command::new(&self.tool)
.arg("stop")
.arg("--time")
.arg("1")
.arg(&self.id)
.output()
.await
.map_err(|err| err.into())
.and_then(output_to_result)
.map(|_| log::trace!("Stopped container: {}", self.id))
}
async fn status(&mut self) -> Result<Status, Error> {
Command::new(&self.tool)
.arg("inspect")
.arg(&self.id)
.output()
.await
.map_err(|err| err.into())
.and_then(output_to_result)
.map_err(|err| { log::warn!("{err}"); err })
.and_then(|stdout| {
let inspect: Vec<Inspect> = serde_json::from_str(&stdout)?;
inspect.into_iter()
.next()
.ok_or(Error::new(ErrorKind::HarnessError, "Container doesn't exist"))
.and_then(|inspect| {
let state = &inspect.state;
if state.running {
Ok(Status::Running)
} else if state.paused {
Ok(Status::Paused)
} else if !state.running && !state.paused {
Ok(Status::Shutdown)
} else {
Err(Error::new(ErrorKind::HarnessError,
format!("Unhandled status")))
}
})
})
}
async fn running(&mut self) -> Result<bool, Error> {
self.status().await.map(|status| status == Status::Running)
}
}