use crate::Workspace;
use crate::cmd::{Command, CommandError, ProcessLinesActions, ProcessOutput, ProcessStatistics};
use log::{error, info};
use serde::Deserialize;
use std::{
error::Error,
fmt,
ops::RangeInclusive,
path::{Path, PathBuf},
time::Duration,
};
#[derive(Debug)]
pub struct SandboxImage {
name: String,
}
impl SandboxImage {
pub fn local(name: &str) -> Result<Self, CommandError> {
let image = SandboxImage { name: name.into() };
info!("sandbox image is local, skipping pull");
image.ensure_exists_locally()?;
Ok(image)
}
pub fn remote(name: &str) -> Result<Self, CommandError> {
let mut image = SandboxImage { name: name.into() };
info!("pulling image {name} from Docker Hub");
Command::new_workspaceless("docker")
.args(&["pull", name])
.run()
.map_err(|e| CommandError::SandboxImagePullFailed(Box::new(e)))?;
if let Some(name_with_hash) = image.get_name_with_hash() {
image.name = name_with_hash;
info!("pulled image {}", image.name);
}
image.ensure_exists_locally()?;
Ok(image)
}
fn ensure_exists_locally(&self) -> Result<(), CommandError> {
info!("checking the image {} is available locally", self.name);
Command::new_workspaceless("docker")
.args(&["image", "inspect", &self.name])
.log_output(false)
.run()
.map_err(|e| CommandError::SandboxImageMissing(Box::new(e)))?;
Ok(())
}
pub fn get_name_with_hash(&self) -> Option<String> {
Command::new_workspaceless("docker")
.args(&[
"inspect",
&self.name,
"--format",
"{{index .RepoDigests 0}}",
])
.log_output(false)
.run_capture()
.ok()?
.stdout_lines()
.first()
.cloned()
}
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum MountKind {
ReadWrite,
ReadOnly,
}
#[derive(Clone)]
struct MountConfig {
host_path: PathBuf,
sandbox_path: PathBuf,
perm: MountKind,
}
impl MountConfig {
fn host_path(&self, workspace: &Workspace) -> Result<PathBuf, CommandError> {
if let Some(container) = workspace.current_container() {
let inside_container_path = crate::utils::normalize_path(&self.host_path);
for mount in container.mounts() {
let dest = crate::utils::normalize_path(Path::new(mount.destination()));
if let Ok(shared) = inside_container_path.strip_prefix(&dest) {
return Ok(Path::new(mount.source()).join(shared));
}
}
Err(CommandError::WorkspaceNotMountedCorrectly)
} else {
Ok(crate::utils::normalize_path(&self.host_path))
}
}
fn to_volume_arg(&self, workspace: &Workspace) -> Result<String, CommandError> {
let perm = match self.perm {
MountKind::ReadWrite => "rw",
MountKind::ReadOnly => "ro",
};
Ok(format!(
"{}:{}:{},Z",
self.host_path(workspace)?.to_string_lossy(),
self.sandbox_path.to_string_lossy(),
perm
))
}
fn to_mount_arg(&self, workspace: &Workspace) -> Result<String, CommandError> {
let mut opts_with_leading_comma = vec![];
if self.perm == MountKind::ReadOnly {
opts_with_leading_comma.push(",readonly");
}
Ok(format!(
"type=bind,src={},dst={}{}",
self.host_path(workspace)?.to_string_lossy(),
self.sandbox_path.to_string_lossy(),
opts_with_leading_comma.join(""),
))
}
}
#[derive(Clone)]
pub struct SandboxBuilder {
mounts: Vec<MountConfig>,
env: Vec<(String, String)>,
memory_limit: Option<usize>,
cpu_limit: Option<f32>,
cpuset_cpus: Option<RangeInclusive<usize>>,
workdir: Option<String>,
user: Option<String>,
cmd: Vec<String>,
enable_networking: bool,
}
impl SandboxBuilder {
pub fn new() -> Self {
Self {
mounts: Vec::new(),
env: Vec::new(),
workdir: None,
memory_limit: None,
cpu_limit: None,
cpuset_cpus: None,
user: None,
cmd: Vec::new(),
enable_networking: true,
}
}
pub fn mount(mut self, host_path: &Path, sandbox_path: &Path, kind: MountKind) -> Self {
self.mounts.push(MountConfig {
host_path: host_path.into(),
sandbox_path: sandbox_path.into(),
perm: kind,
});
self
}
pub fn memory_limit(mut self, limit: Option<usize>) -> Self {
self.memory_limit = limit;
self
}
pub fn cpu_limit(mut self, limit: Option<f32>) -> Self {
self.cpu_limit = limit;
self
}
pub fn cpuset_cpus(mut self, cpus: Option<RangeInclusive<usize>>) -> Self {
self.cpuset_cpus = cpus;
self
}
pub fn enable_networking(mut self, enable: bool) -> Self {
self.enable_networking = enable;
self
}
pub(super) fn env<S1: Into<String>, S2: Into<String>>(mut self, key: S1, value: S2) -> Self {
self.env.push((key.into(), value.into()));
self
}
pub(super) fn cmd(mut self, cmd: Vec<String>) -> Self {
self.cmd = cmd;
self
}
pub(super) fn workdir<S: Into<String>>(mut self, workdir: S) -> Self {
self.workdir = Some(workdir.into());
self
}
pub(super) fn user(mut self, user: u32, group: u32) -> Self {
self.user = Some(format!("{user}:{group}"));
self
}
#[cfg_attr(
feature = "tracing",
tracing::instrument(
skip_all,
fields(
image = %workspace.sandbox_image().name,
mounts = self.mounts.len(),
env = self.env.len(),
memory_limit = ?self.memory_limit,
cpu_limit = ?self.cpu_limit,
cpuset_cpus = ?self.cpuset_cpus,
enable_networking = self.enable_networking,
command = ?self.cmd,
)
)
)]
fn create(self, workspace: &Workspace) -> Result<Container<'_>, CommandError> {
let mut args: Vec<String> = vec!["create".into()];
for mount in &self.mounts {
std::fs::create_dir_all(&mount.host_path)?;
if cfg!(windows) {
args.push("--mount".into());
args.push(mount.to_mount_arg(workspace)?)
} else {
args.push("-v".into());
args.push(mount.to_volume_arg(workspace)?)
}
}
if let Some(limit) = self.memory_limit {
args.push("-m".into());
args.push(limit.to_string());
}
if let Some(limit) = self.cpu_limit {
args.push("--cpus".into());
args.push(limit.to_string());
}
if let Some(cpus) = self.cpuset_cpus {
args.push("--cpuset-cpus".into());
args.push(format_cpuset_cpus(&cpus));
}
if !self.enable_networking {
args.push("--network".into());
args.push("none".into());
}
if cfg!(windows) {
args.push("--isolation=process".into());
}
args.push(workspace.sandbox_image().name.clone());
args.push("sleep".into());
args.push("infinity".into());
let out = Command::new(workspace, "docker")
.args(&args)
.run_capture()
.map_err(|err| CommandError::SandboxContainerCreate(Box::new(err)))?;
Ok(Container {
id: out.stdout_lines()[0].clone(),
workspace,
cmd: self.cmd,
env: self.env,
workdir: self.workdir,
user: self.user,
})
}
#[allow(clippy::too_many_arguments)]
#[allow(clippy::type_complexity)]
#[cfg_attr(
feature = "tracing",
tracing::instrument(
skip_all,
fields(
image = %workspace.sandbox_image().name,
mounts = self.mounts.len(),
env = self.env.len(),
memory_limit = ?self.memory_limit,
cpu_limit = ?self.cpu_limit,
cpuset_cpus = ?self.cpuset_cpus,
enable_networking = self.enable_networking,
command = ?self.cmd,
capture,
timeout_secs = ?timeout.map(|timeout| timeout.as_secs()),
no_output_timeout_secs = ?no_output_timeout.map(|timeout| timeout.as_secs()),
)
)
)]
pub(super) fn run(
self,
workspace: &Workspace,
timeout: Option<Duration>,
no_output_timeout: Option<Duration>,
process_lines: Option<&mut dyn FnMut(&str, &mut ProcessLinesActions)>,
log_output: bool,
log_command: bool,
capture: bool,
) -> Result<ProcessOutput, CommandError> {
let container = self.create(workspace)?;
scopeguard::defer! {{
if let Err(err) = container.delete() {
error!("failed to delete container {}", container.id);
error!("caused by: {err}");
let mut err: &dyn Error = &err;
while let Some(cause) = err.source() {
error!("caused by: {cause}");
err = cause;
}
}
}}
container.run(
timeout,
no_output_timeout,
process_lines,
log_output,
log_command,
capture,
)
}
}
#[derive(Deserialize)]
#[serde(rename_all = "PascalCase")]
struct InspectContainer {
state: InspectState,
}
#[derive(Deserialize)]
struct InspectState {
#[serde(rename = "OOMKilled")]
oom_killed: bool,
}
#[derive(Clone)]
struct Container<'w> {
id: String,
workspace: &'w Workspace,
cmd: Vec<String>,
env: Vec<(String, String)>,
workdir: Option<String>,
user: Option<String>,
}
impl fmt::Display for Container<'_> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.id.fmt(f)
}
}
impl Container<'_> {
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
fn inspect(&self) -> Result<InspectContainer, CommandError> {
let output = Command::new(self.workspace, "docker")
.args(&["inspect", &self.id])
.log_output(false)
.run_capture()?;
let mut data: Vec<InspectContainer> =
::serde_json::from_str(&output.stdout_lines().join("\n"))
.map_err(CommandError::InvalidDockerInspectOutput)?;
assert_eq!(data.len(), 1);
Ok(data.pop().unwrap())
}
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
fn start(&self) -> Result<(), CommandError> {
Command::new(self.workspace, "docker")
.args(&["start", &self.id])
.log_output(false)
.run()
.map(|_| ())
}
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
fn stop(&self) -> Result<(), CommandError> {
Command::new(self.workspace, "docker")
.args(&["stop", "-t", "1", &self.id])
.log_output(false)
.run()
.map(|_| ())
}
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
fn exec_cat_file(&self, path: &str) -> Option<Vec<String>> {
Command::new(self.workspace, "docker")
.args(&["exec", &self.id, "cat", path])
.log_output(false)
.log_command(false)
.run_capture()
.ok()
.map(|o| o.stdout_lines().to_vec())
}
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
fn read_memory_peak(&self) -> Option<u64> {
let paths = [
"/sys/fs/cgroup/memory.peak", "/sys/fs/cgroup/memory/memory.max_usage_in_bytes", ];
for path in paths {
if let Some(val) = self
.exec_cat_file(path)
.and_then(|lines| lines.first()?.trim().parse::<u64>().ok())
{
return Some(val);
}
}
None
}
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
fn check_cgroup_oom(&self) -> bool {
let paths = [
"/sys/fs/cgroup/memory.events", "/sys/fs/cgroup/memory/memory.oom_control", ];
for path in paths {
if let Some(lines) = self.exec_cat_file(path) {
let found = lines.iter().any(|line| {
line.strip_prefix("oom_kill ")
.and_then(|rest| rest.trim().parse::<u64>().ok())
.is_some_and(|count| count > 0)
});
if found {
return true;
}
return false;
}
}
false
}
#[allow(clippy::type_complexity)]
#[cfg_attr(
feature = "tracing",
tracing::instrument(skip_all, fields(container_id = %self.id, capture))
)]
fn run(
&self,
timeout: Option<Duration>,
no_output_timeout: Option<Duration>,
process_lines: Option<&mut dyn FnMut(&str, &mut ProcessLinesActions)>,
log_output: bool,
log_command: bool,
capture: bool,
) -> Result<ProcessOutput, CommandError> {
self.start()?;
let mut args: Vec<String> = vec!["exec".into()];
for (var, value) in &self.env {
args.push("-e".into());
args.push(format!("{var}={value}"));
}
if let Some(ref workdir) = self.workdir {
args.push("-w".into());
args.push(workdir.clone());
}
if let Some(ref user) = self.user {
args.push("--user".into());
args.push(user.clone());
}
args.push(self.id.clone());
args.extend(self.cmd.iter().cloned());
let mut cmd = Command::new(self.workspace, "docker")
.args(&args)
.timeout(timeout)
.log_output(log_output)
.log_command(log_command)
.no_output_timeout(no_output_timeout);
if let Some(f) = process_lines {
cmd = cmd.process_lines(f);
}
let res = cmd.run_inner(capture);
let memory_peak = self.read_memory_peak();
let cgroup_oom = self.check_cgroup_oom();
let _ = self.stop();
let details = self.inspect()?;
if details.state.oom_killed || cgroup_oom {
Err(match res {
Ok(_) | Err(CommandError::ExecutionFailed { .. }) => CommandError::SandboxOOM,
Err(err) => err,
})
} else {
res.map(|mut output| {
output.statistics = ProcessStatistics { memory_peak };
output
})
}
}
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
fn delete(&self) -> Result<(), CommandError> {
Command::new(self.workspace, "docker")
.args(&["rm", "-f", &self.id])
.run()
.map(|_| ())
}
}
pub fn docker_running(workspace: &Workspace) -> bool {
info!("checking if the docker daemon is running");
Command::new(workspace, "docker")
.args(&["info"])
.log_output(false)
.run()
.is_ok()
}
fn format_cpuset_cpus(cpus: &RangeInclusive<usize>) -> String {
format!("{}-{}", cpus.start(), cpus.end())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn formats_cpuset_cpus() {
assert_eq!(format_cpuset_cpus(&(2..=4)), "2-4");
}
}