system-harness 1.0.0

A system harness abstraction and configuration serialization provider for virtualization and emulation systems
Documentation
use crate::{Error, ErrorKind, Status, SystemHarness, SystemTerminal};
use serde::{Deserialize, Serialize};
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tokio::process::Command;
use tokio::process::Child;
use std::process::Output;
use std::process::Stdio;
use std::pin::Pin;
use std::task::Poll;

/// Remove a single trailing line terminator from `input`.
///
/// A trailing `"\r\n"` is preferred over a bare `"\n"`. At most one
/// terminator is removed, and input without one is returned unchanged.
fn strip_last_newline(input: &str) -> &str {
    if let Some(without_crlf) = input.strip_suffix("\r\n") {
        return without_crlf;
    }
    match input.strip_suffix('\n') {
        Some(without_lf) => without_lf,
        None => input,
    }
}

/// Process output to result
fn output_to_result(output: Output) -> Result<String, Error> {
    match output.status.success() {
        true => Ok(strip_last_newline(
                std::str::from_utf8(&output.stdout)?
        ).to_string()),
        false => {
            let error = std::str::from_utf8(&output.stderr)?;
            Err(Error::new(ErrorKind::HarnessError, error))
        }
    }
}

/// Configuration describing how to create a container-backed system.
///
/// The config can be cloned and (de)serialized with serde. Calling
/// `build` turns it into a running [`ContainerSystem`].
#[derive(Clone, Serialize, Deserialize)]
pub struct ContainerSystemConfig {

    /// Container runtime executable; it is the program spawned for every
    /// container command (`create`, `start`, ...).
    tool: String,

    /// Container image passed to the runtime's `create` command.
    image: String,

}

impl ContainerSystemConfig {

    /// Build and run a container based on name
    pub async fn build(&self) -> Result<ContainerSystem, Error> {
        let id = Command::new(&self.tool)
            .arg("create")
            .arg("-t") 
            .arg(&self.image)
            .output()
            .await
            .map_err(|err| err.into())
            .and_then(output_to_result)
            .map_err(|err| { log::warn!("{err}"); err })?;
        log::trace!("Created container: {id}");

        Command::new(&self.tool)
            .stdout(Stdio::null())
            .arg("start")
            .arg(&id)
            .status().await?;

        Ok(ContainerSystem {
            id,
            tool: self.tool.clone()
        })
    }

}

/// Handle to a container managed through a container runtime executable.
///
/// Dropping the handle runs `<tool> rm -f <id>` to remove the container.
pub struct ContainerSystem {
    /// Container runtime executable used for all commands on this container.
    tool: String,
    /// Identifier passed to the runtime to address this container.
    id: String,
}

impl Drop for ContainerSystem {
    fn drop(&mut self) {
        if let Err(err) = std::process::Command::new(&self.tool)
            .arg("rm")
            .arg("-f")
            .arg(&self.id)
            .output() {
            log::error!("{err}");
        }
    }
}

/// Terminal attached to a container through a spawned child process.
///
/// Reads are served from the child's stdout and writes go to its stdin.
pub struct ContainerSystemTerminal {
    /// Child process whose stdin/stdout pipes back the terminal.
    process: Child,
}

/// Container state flags deserialized from the runtime's `inspect` output.
///
/// Field names are mapped to PascalCase keys (`Running`, `Paused`).
#[derive(Deserialize)]
#[serde(rename_all = "PascalCase")]
struct State {
    /// Value of the `Running` key.
    running: bool,
    /// Value of the `Paused` key.
    paused: bool
}

/// One entry of the runtime's `inspect` output; only the `State` key is read.
#[derive(Deserialize)]
#[serde(rename_all = "PascalCase")]
struct Inspect {
    /// Value of the `State` key.
    state: State
}

impl SystemTerminal for ContainerSystemTerminal {

    async fn send_key(&mut self, _key: crate::Key) -> Result<(), Error> {
        Err(Error::new(ErrorKind::HarnessError, "Sending a keystroke not supported"))
    }

}

impl AsyncRead for ContainerSystemTerminal {

    /// Read from the child process's stdout.
    ///
    /// Fails with `BrokenPipe` when the child has no stdout handle.
    fn poll_read(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        match self.process.stdout.as_mut() {
            Some(stdout) => Pin::new(stdout).poll_read(cx, buf),
            None => Poll::Ready(Err(std::io::Error::new(
                std::io::ErrorKind::BrokenPipe,
                "Can't read from container",
            ))),
        }
    }
}

impl AsyncWrite for ContainerSystemTerminal {

    /// Write `buf` to the child process's stdin.
    ///
    /// Fails with `BrokenPipe` when the child has no stdin handle.
    fn poll_write(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, std::io::Error>> {
        match self.process.stdin.as_mut() {
            Some(stdin) => Pin::new(stdin).poll_write(cx, buf),
            None => Poll::Ready(Err(std::io::Error::new(
                std::io::ErrorKind::BrokenPipe,
                "Can't write to container",
            ))),
        }
    }

    /// Flush the child process's stdin.
    ///
    /// Fails with `BrokenPipe` when the child has no stdin handle.
    fn poll_flush(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), std::io::Error>> {
        match self.process.stdin.as_mut() {
            Some(stdin) => Pin::new(stdin).poll_flush(cx),
            None => Poll::Ready(Err(std::io::Error::new(
                std::io::ErrorKind::BrokenPipe,
                "Can't flush write to container",
            ))),
        }
    }

    /// Shut down the write side by delegating to the stdin handle's own
    /// `poll_shutdown`.
    ///
    /// Fails with `BrokenPipe` when the child has no stdin handle.
    fn poll_shutdown(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), std::io::Error>> {
        match self.process.stdin.as_mut() {
            // Must forward to `poll_shutdown`, not `poll_flush`: flushing
            // alone never performs the inner writer's shutdown.
            Some(stdin) => Pin::new(stdin).poll_shutdown(cx),
            None => Poll::Ready(Err(std::io::Error::new(
                std::io::ErrorKind::BrokenPipe,
                "Can't shutdown write to container",
            ))),
        }
    }
}



impl<'sys> SystemHarness<'sys> for ContainerSystem {

    type Terminal = ContainerSystemTerminal;

    async fn terminal(&'sys mut self) -> Result<Self::Terminal, Error> {
        let process = Command::new(&self.tool)
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .arg("exec")
            .arg("-it")
            .arg(&self.id)
            .arg("sh")
            .spawn()?;
        Ok(Self::Terminal {
            process,
        })
    }

    async fn pause(&mut self) -> Result<(), Error> {
        log::trace!("Pausing container: {}", &self.id); 
        Command::new(&self.tool)
            .arg("pause")
            .arg(&self.id)
            .output()
            .await
            .map_err(|err| err.into())
            .and_then(output_to_result)
            .map(|_| log::trace!("Paused container: {}", self.id))
    }

    async fn resume(&mut self) -> Result<(), Error> {
        log::trace!("Resuming container: {}", &self.id); 
        Command::new(&self.tool)
            .arg("unpause")
            .arg(&self.id)
            .output()
            .await
            .map_err(|err| err.into())
            .and_then(output_to_result)
            .map(|_| log::trace!("Resumed container: {}", self.id))
    }

    async fn shutdown(&mut self) -> Result<(), Error> {
        log::trace!("Shutting down container: {}", &self.id); 
        Command::new(&self.tool)
            .arg("stop")
            .arg("--time")
            .arg("1")
            .arg(&self.id)
            .output()
            .await
            .map_err(|err| err.into())
            .and_then(output_to_result)
            .map(|_| log::trace!("Stopped container: {}", self.id))
    }

    async fn status(&mut self) -> Result<Status, Error> {
        Command::new(&self.tool)
            .arg("inspect")
            .arg(&self.id)
            .output()
            .await
            .map_err(|err| err.into())
            .and_then(output_to_result)
            .map_err(|err| { log::warn!("{err}"); err })
            .and_then(|stdout| {
                let inspect: Vec<Inspect> = serde_json::from_str(&stdout)?;
                inspect.into_iter()
                    .next()
                    .ok_or(Error::new(ErrorKind::HarnessError, "Container doesn't exist"))
                    .and_then(|inspect| {
                        let state = &inspect.state;
                        if state.running {
                            Ok(Status::Running)
                        } else if state.paused {
                            Ok(Status::Paused)
                        } else if !state.running && !state.paused {
                            Ok(Status::Shutdown)
                        } else {
                            Err(Error::new(ErrorKind::HarnessError,
                                    format!("Unhandled status")))
                        }
                    })
            })
    }

    async fn running(&mut self) -> Result<bool, Error> {
        self.status().await.map(|status| status == Status::Running)
    }

}