use crate::{
Workspace,
cmd::{Command, CommandError, ProcessLinesActions, ProcessOutput, container_dirs},
};
use log::{error, info};
use serde::Deserialize;
use std::{
cell::RefCell,
ffi::OsString,
fmt, mem,
ops::RangeInclusive,
path::{Path, PathBuf},
rc::Rc,
time::Duration,
};
/// The Docker image every sandbox container is created from.
///
/// Obtain one with [`SandboxImage::local`] for an image already present on the host, or with
/// [`SandboxImage::remote`] to pull it from Docker Hub first.
#[derive(Debug)]
pub struct SandboxImage {
    /// Image reference handed to `docker`. After a successful pull this may be replaced by the
    /// first `RepoDigests` entry reported by `docker inspect`.
    name: String,
}

impl SandboxImage {
    /// Uses an image that must already exist locally; nothing is pulled.
    ///
    /// # Errors
    ///
    /// Returns [`CommandError::SandboxImageMissing`] when `docker image inspect` fails.
    pub fn local(name: &str) -> Result<Self, CommandError> {
        let image = Self {
            name: name.to_owned(),
        };
        info!("sandbox image is local, skipping pull");
        image.ensure_exists_locally().map(|()| image)
    }

    /// Pulls `name` with `docker pull`, then pins it to its digest when one is available.
    ///
    /// # Errors
    ///
    /// Returns [`CommandError::SandboxImagePullFailed`] when the pull fails and
    /// [`CommandError::SandboxImageMissing`] when the image is not present afterwards.
    pub fn remote(name: &str) -> Result<Self, CommandError> {
        let mut image = Self {
            name: name.to_owned(),
        };
        info!("pulling image {name} from Docker Hub");
        let pulled = Command::new_workspaceless("docker")
            .args(["pull", name])
            .run();
        pulled.map_err(|err| CommandError::SandboxImagePullFailed(Box::new(err)))?;

        let digest = image.get_name_with_hash();
        if let Some(pinned) = digest {
            image.name = pinned;
            info!("pulled image {}", image.name);
        }

        image.ensure_exists_locally()?;
        Ok(image)
    }

    /// Verifies with `docker image inspect` that the image is available on this host.
    fn ensure_exists_locally(&self) -> Result<(), CommandError> {
        info!("checking the image {} is available locally", self.name);
        Command::new_workspaceless("docker")
            .args(["image", "inspect", &self.name])
            .log_output(false)
            .run()
            .map(|_| ())
            .map_err(|err| CommandError::SandboxImageMissing(Box::new(err)))
    }

    /// Returns the image's first `RepoDigests` entry, or `None` when `docker inspect` fails or
    /// prints nothing.
    pub fn get_name_with_hash(&self) -> Option<String> {
        let output = Command::new_workspaceless("docker")
            .args([
                "inspect",
                &self.name,
                "--format",
                "{{index .RepoDigests 0}}",
            ])
            .log_output(false)
            .run_capture()
            .ok()?;
        let lines = output.stdout_lines();
        lines.first().cloned()
    }
}
/// Access mode of a directory mounted into the sandbox container.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum MountKind {
    /// The sandbox may read and write the mount (`rw` in `-v` syntax).
    ReadWrite,
    /// The sandbox may only read the mount (`ro` in `-v` syntax, `readonly` in `--mount` syntax).
    ReadOnly,
}
/// A bind mount from the host into the sandbox container.
#[derive(Clone)]
struct MountConfig {
    /// Source path as seen by this process; translated by `host_path` when this process itself
    /// runs inside a container.
    host_path: PathBuf,
    /// Destination path inside the sandbox container.
    sandbox_path: PathBuf,
    /// Access mode of the mount.
    perm: MountKind,
}

impl MountConfig {
    /// Returns the source path to hand to the Docker daemon for this mount.
    ///
    /// Outside a container this is the normalized `host_path`. When this process runs inside a
    /// container (`workspace.current_container()` is `Some`), `host_path` is only meaningful
    /// inside that container. It is therefore translated through that container's mounts into
    /// the mount's source path. If several mounts contain the path, the one with the deepest
    /// destination wins: a nested mount shadows its parent, so only it holds the real data.
    ///
    /// # Errors
    ///
    /// Returns [`CommandError::WorkspaceNotMountedCorrectly`] when no mount of the current
    /// container contains `host_path`.
    fn host_path(&self, workspace: &Workspace) -> Result<PathBuf, CommandError> {
        let Some(container) = workspace.current_container() else {
            return Ok(crate::utils::normalize_path(&self.host_path));
        };
        let inside_container_path = crate::utils::normalize_path(&self.host_path);

        // (component depth of the matching destination, translated path) of the best match.
        let mut best: Option<(usize, PathBuf)> = None;
        for mount in container.mounts() {
            let dest = crate::utils::normalize_path(Path::new(mount.destination()));
            // `strip_prefix` compares whole components, so `/data` never matches `/database`.
            let Ok(shared) = inside_container_path.strip_prefix(&dest) else {
                continue;
            };
            let depth = dest.components().count();
            if best
                .as_ref()
                .is_none_or(|(best_depth, _)| depth > *best_depth)
            {
                best = Some((depth, Path::new(mount.source()).join(shared)));
            }
        }

        best.map(|(_, translated)| translated)
            .ok_or(CommandError::WorkspaceNotMountedCorrectly)
    }

    /// Formats this mount as a `-v` argument: `<host>:<sandbox>:<rw|ro>,Z`.
    ///
    /// `Z` asks Docker to apply a private SELinux label to the mounted content.
    fn to_volume_arg(&self, workspace: &Workspace) -> Result<String, CommandError> {
        let perm = match self.perm {
            MountKind::ReadWrite => "rw",
            MountKind::ReadOnly => "ro",
        };
        Ok(format!(
            "{}:{}:{},Z",
            self.host_path(workspace)?.to_string_lossy(),
            self.sandbox_path.to_string_lossy(),
            perm
        ))
    }

    /// Formats this mount as a `--mount` argument:
    /// `type=bind,src=<host>,dst=<sandbox>` plus `,readonly` for read-only mounts.
    fn to_mount_arg(&self, workspace: &Workspace) -> Result<String, CommandError> {
        let readonly = match self.perm {
            MountKind::ReadWrite => "",
            MountKind::ReadOnly => ",readonly",
        };
        Ok(format!(
            "type=bind,src={},dst={}{}",
            self.host_path(workspace)?.to_string_lossy(),
            self.sandbox_path.to_string_lossy(),
            readonly,
        ))
    }
}
/// Configuration for sandbox containers: extra mounts, resource limits and networking.
///
/// Turned into a running [`Sandbox`] by `start`.
#[derive(Clone)]
pub struct SandboxBuilder {
    /// Extra bind mounts. The source, target, cargo-home and rustup-home mounts are added on top
    /// of these when the container is created.
    mounts: Vec<MountConfig>,
    /// Access mode used for the source-directory mount (`ReadOnly` unless changed).
    source_dir_mount_kind: MountKind,
    /// Value passed verbatim to `docker create -m`; `None` adds no memory limit.
    memory_limit: Option<usize>,
    /// Value passed to `docker create --cpus`; `None` adds no CPU limit.
    cpu_limit: Option<f32>,
    /// CPUs passed to `docker create --cpuset-cpus` as `start-end`.
    cpuset_cpus: Option<RangeInclusive<usize>>,
    /// When `false`, the container is created with `--network none`.
    enable_networking: bool,
}
/// Resource-usage figures gathered from the commands run in a sandbox.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct SandboxStatistics {
    /// Highest memory peak observed, if any reading succeeded.
    memory_peak: Option<u64>,
}

impl SandboxStatistics {
    /// Highest memory peak observed, in bytes, or `None` if it could never be read.
    pub fn memory_peak_bytes(&self) -> Option<u64> {
        self.memory_peak
    }

    /// Combines two sets of statistics, keeping the larger memory peak.
    ///
    /// A missing reading on one side never hides a present reading on the other.
    pub fn combine(self, other: Self) -> Self {
        // `Option`'s ordering ranks `None` below every `Some`, so `max` keeps the larger
        // reading and falls back to whichever side has one.
        let memory_peak = self.memory_peak.max(other.memory_peak);
        Self { memory_peak }
    }

    /// In-place form of [`SandboxStatistics::combine`].
    pub fn merge(&mut self, other: Self) {
        let current = mem::take(self);
        *self = current.combine(other);
    }
}
/// Interior-mutable accumulator of [`SandboxStatistics`], shared behind an `Rc` by a sandbox.
#[derive(Debug, Default)]
pub(crate) struct SandboxStatisticsState {
    statistics: RefCell<SandboxStatistics>,
}

impl SandboxStatisticsState {
    /// Returns a copy of the statistics accumulated so far.
    pub(crate) fn snapshot(&self) -> SandboxStatistics {
        let guard = self.statistics.borrow();
        (*guard).clone()
    }

    /// Folds `statistics` into the accumulated value.
    fn merge(&self, statistics: SandboxStatistics) {
        let mut guard = self.statistics.borrow_mut();
        guard.merge(statistics);
    }
}
/// A sandbox: a Docker container with the source, target, cargo-home and rustup-home directories
/// mounted. It is reused across commands and recreated when it is missing or no longer running.
pub struct Sandbox<'w> {
    /// Workspace providing the sandbox image and the cargo/rustup homes.
    workspace: &'w Workspace,
    /// Configuration used to (re)create the container.
    builder: SandboxBuilder,
    /// Normalized host source directory, mounted at `container_dirs::WORK_DIR`.
    source_dir: PathBuf,
    /// Normalized host target directory, mounted read-write at `container_dirs::TARGET_DIR`.
    target_dir: PathBuf,
    /// Current container; `None` after `cleanup` or after a timed-out command deleted it.
    container: Option<Container<'w>>,
    /// Statistics accumulated over every command run here (the `Rc` may be shared with the caller).
    statistics: Rc<SandboxStatisticsState>,
}
/// A command to execute inside a sandbox container with `docker exec`.
pub(crate) struct SandboxCommand {
    /// Program followed by its arguments.
    pub(crate) cmd: Vec<OsString>,
    /// Environment variables, passed as `-e NAME=value` in insertion order.
    pub(crate) env: Vec<(OsString, OsString)>,
    /// Normalized working directory. `Sandbox::run` treats an explicit value as a host path
    /// inside the sandbox source directory and uses the container work directory for `None`.
    pub(crate) workdir: Option<PathBuf>,
    /// `uid:gid` passed to `docker exec --user`.
    pub(crate) user: Option<String>,
}

impl SandboxCommand {
    /// Starts a command that runs `program` with no arguments or extra environment.
    pub(crate) fn new(program: impl Into<OsString>) -> SandboxCommand {
        SandboxCommand {
            cmd: Vec::from([program.into()]),
            env: vec![],
            workdir: None,
            user: None,
        }
    }

    /// Runs the command as `user:group`.
    pub(crate) fn user(self, user: u32, group: u32) -> Self {
        Self {
            user: Some(format!("{user}:{group}")),
            ..self
        }
    }

    /// Sets the working directory; the path is normalized immediately.
    pub(crate) fn workdir(self, workdir: impl AsRef<Path>) -> Self {
        let normalized = crate::utils::normalize_path(workdir.as_ref());
        Self {
            workdir: Some(normalized),
            ..self
        }
    }

    /// Adds the environment variable `k=v`.
    pub(crate) fn env(mut self, k: impl Into<OsString>, v: impl Into<OsString>) -> Self {
        let pair = (k.into(), v.into());
        self.env.push(pair);
        self
    }

    /// Appends one argument.
    pub(crate) fn arg(mut self, arg: impl Into<OsString>) -> Self {
        let converted: OsString = arg.into();
        self.cmd.push(converted);
        self
    }

    /// Appends every item of `args`, in order.
    pub(crate) fn args<I, S>(mut self, args: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<OsString>,
    {
        self.cmd
            .extend(args.into_iter().map(Into::<OsString>::into));
        self
    }
}
impl SandboxBuilder {
    /// Creates a builder with no extra mounts, no resource limits, networking enabled and the
    /// source directory mounted read-only.
    pub fn new() -> Self {
        Self {
            enable_networking: true,
            source_dir_mount_kind: MountKind::ReadOnly,
            mounts: Vec::new(),
            memory_limit: None,
            cpu_limit: None,
            cpuset_cpus: None,
        }
    }

    /// Adds a bind mount of `host_path` at `sandbox_path`.
    ///
    /// The host directory is created if missing when the container is created.
    pub fn mount(mut self, host_path: &Path, sandbox_path: &Path, kind: MountKind) -> Self {
        let config = MountConfig {
            host_path: host_path.to_path_buf(),
            sandbox_path: sandbox_path.to_path_buf(),
            perm: kind,
        };
        self.mounts.push(config);
        self
    }

    /// Sets the access mode of the source-directory mount.
    pub fn source_dir_mount_kind(self, mount_kind: MountKind) -> Self {
        Self {
            source_dir_mount_kind: mount_kind,
            ..self
        }
    }

    /// Sets the value passed to `docker create -m` (`None` removes the limit).
    pub fn memory_limit(self, limit: Option<usize>) -> Self {
        Self {
            memory_limit: limit,
            ..self
        }
    }

    /// Sets the value passed to `docker create --cpus` (`None` removes the limit).
    pub fn cpu_limit(self, limit: Option<f32>) -> Self {
        Self {
            cpu_limit: limit,
            ..self
        }
    }

    /// Restricts the container to the given inclusive CPU range (`None` removes the restriction).
    pub fn cpuset_cpus(self, cpus: Option<RangeInclusive<usize>>) -> Self {
        Self {
            cpuset_cpus: cpus,
            ..self
        }
    }

    /// Enables or disables networking; disabled containers get `--network none`.
    pub fn enable_networking(self, enable: bool) -> Self {
        Self {
            enable_networking: enable,
            ..self
        }
    }

    /// Creates and starts a sandbox container, with freshly initialized statistics.
    ///
    /// # Errors
    ///
    /// Returns any error raised while creating or starting the container.
    pub fn start<'w>(
        self,
        workspace: &'w Workspace,
        source_dir: impl AsRef<Path>,
        target_dir: impl AsRef<Path>,
    ) -> Result<Sandbox<'w>, CommandError> {
        let statistics = Rc::new(SandboxStatisticsState::default());
        self.start_with_statistics(workspace, source_dir, target_dir, statistics)
    }

    /// Like [`SandboxBuilder::start`], but accumulates statistics into the given shared state.
    pub(crate) fn start_with_statistics<'w>(
        self,
        workspace: &'w Workspace,
        source_dir: impl AsRef<Path>,
        target_dir: impl AsRef<Path>,
        statistics: Rc<SandboxStatisticsState>,
    ) -> Result<Sandbox<'w>, CommandError> {
        let (source_dir, target_dir) = (
            crate::utils::normalize_path(source_dir.as_ref()),
            crate::utils::normalize_path(target_dir.as_ref()),
        );
        let container = Sandbox::create_container(&self, workspace, &source_dir, &target_dir)?;
        Ok(Sandbox {
            workspace,
            source_dir,
            target_dir,
            container: Some(container),
            statistics,
            builder: self,
        })
    }

    /// Creates the container, starts it, and records the initial OOM-kill baseline.
    fn create_started(self, workspace: &Workspace) -> Result<Container<'_>, CommandError> {
        self.create(workspace).and_then(|mut container| {
            container.start()?;
            container.record_oom_kill_count();
            Ok(container)
        })
    }

    /// Runs `docker create` with this configuration. The container's main process is
    /// `sleep infinity`, so commands can later be executed in it with `docker exec`.
    #[cfg_attr(
        feature = "tracing",
        tracing::instrument(
            skip_all,
            fields(
                image = %workspace.sandbox_image().name,
                mounts = self.mounts.len(),
                memory_limit = ?self.memory_limit,
                cpu_limit = ?self.cpu_limit,
                cpuset_cpus = ?self.cpuset_cpus,
                enable_networking = self.enable_networking,
            )
        )
    )]
    fn create(self, workspace: &Workspace) -> Result<Container<'_>, CommandError> {
        let mut args = vec![String::from("create")];

        for mount in &self.mounts {
            std::fs::create_dir_all(&mount.host_path)?;
            // Windows containers use `--mount` syntax; everywhere else uses `-v`.
            let (flag, value) = if cfg!(windows) {
                ("--mount", mount.to_mount_arg(workspace)?)
            } else {
                ("-v", mount.to_volume_arg(workspace)?)
            };
            args.push(flag.to_string());
            args.push(value);
        }

        if let Some(bytes) = self.memory_limit {
            args.extend(["-m".to_string(), bytes.to_string()]);
        }
        if let Some(cpus) = self.cpu_limit {
            args.extend(["--cpus".to_string(), cpus.to_string()]);
        }
        if let Some(range) = &self.cpuset_cpus {
            args.extend(["--cpuset-cpus".to_string(), format_cpuset_cpus(range)]);
        }
        if !self.enable_networking {
            args.extend(["--network".to_string(), "none".to_string()]);
        }
        if cfg!(windows) {
            args.push("--isolation=process".to_string());
        }

        args.extend([
            workspace.sandbox_image().name.clone(),
            "sleep".to_string(),
            "infinity".to_string(),
        ]);

        let output = Command::new(workspace, "docker")
            .args(&args)
            .run_capture()
            .map_err(|err| CommandError::SandboxContainerCreate(Box::new(err)))?;
        // `docker create` prints the new container ID as its first output line.
        let id = output.stdout_lines()[0].clone();
        Ok(Container {
            id: Some(id),
            workspace,
            running: true,
            oom_killed: false,
            oom_kill_count: None,
        })
    }
}
/// The subset of one `docker inspect <container>` entry that the sandbox reads.
#[derive(Deserialize)]
#[serde(rename_all = "PascalCase")]
struct InspectContainer {
    /// The entry's `State` object.
    state: InspectState,
}

/// Fields read from the `State` object of `docker inspect` output.
#[derive(Deserialize)]
struct InspectState {
    /// `State.OOMKilled`, as reported by Docker.
    #[serde(rename = "OOMKilled")]
    oom_killed: bool,
    /// `State.Running`: whether the container is currently running.
    #[serde(rename = "Running")]
    running: bool,
}
/// Handle to a Docker container created for a sandbox; the container is removed on drop.
struct Container<'w> {
    /// Docker container ID; `None` once the container has been deleted.
    id: Option<String>,
    /// Workspace used to run the `docker` commands.
    workspace: &'w Workspace,
    /// Last known running state, refreshed from `docker inspect` after each command.
    running: bool,
    /// Last observed `State.OOMKilled`, so each container-level OOM is reported once.
    oom_killed: bool,
    /// Last `oom_kill` counter read from the container's cgroup, if it could be read.
    oom_kill_count: Option<u64>,
}
impl Container<'_> {
    /// Returns the Docker container ID.
    ///
    /// # Panics
    ///
    /// Panics if the container has already been deleted (`delete` clears the ID).
    fn id(&self) -> &str {
        match &self.id {
            Some(id) => id,
            None => panic!("container has already been deleted"),
        }
    }
}
impl fmt::Display for Container<'_> {
    /// Writes the container ID, honouring width/alignment flags exactly as `str` does.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.pad(self.id())
    }
}
impl Container<'_> {
    /// Runs `docker inspect` on this container and deserializes the single entry it returns.
    ///
    /// # Errors
    ///
    /// Propagates a failing `docker inspect` invocation, and returns
    /// [`CommandError::InvalidDockerInspectOutput`] when its output cannot be parsed.
    ///
    /// # Panics
    ///
    /// Panics if the parsed output does not contain exactly one entry.
    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
    fn inspect(&self) -> Result<InspectContainer, CommandError> {
        let output = Command::new(self.workspace, "docker")
            .args(["inspect", self.id()])
            .log_output(false)
            .run_capture()?;
        let mut data: Vec<InspectContainer> =
            ::serde_json::from_str(&output.stdout_lines().join("\n"))
                .map_err(CommandError::InvalidDockerInspectOutput)?;
        // Only one ID was inspected, so exactly one entry is expected.
        assert_eq!(data.len(), 1);
        Ok(data.pop().unwrap())
    }

    /// Starts the previously created container with `docker start`.
    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
    fn start(&self) -> Result<(), CommandError> {
        Command::new(self.workspace, "docker")
            .args(["start", self.id()])
            .log_output(false)
            .run()
            .map(|_| ())
    }

    /// Returns the stdout lines of `cat <path>` executed inside the container, or `None` if
    /// the `docker exec` invocation fails for any reason. Command and output logging are off.
    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
    fn exec_cat_file(&self, path: &str) -> Option<Vec<String>> {
        Command::new(self.workspace, "docker")
            .args(["exec", self.id(), "cat", path])
            .log_output(false)
            .log_command(false)
            .run_capture()
            .ok()
            .map(|o| o.stdout_lines().to_vec())
    }

    /// Stores the current cgroup `oom_kill` counter as the baseline for later OOM detection.
    fn record_oom_kill_count(&mut self) {
        self.oom_kill_count = self.read_oom_kill_count();
    }

    /// Reads the container cgroup's peak memory usage. Returns `None` when neither file can be
    /// read and parsed as an integer.
    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
    fn read_memory_peak(&self) -> Option<u64> {
        let paths = [
            // cgroup v2
            "/sys/fs/cgroup/memory.peak",
            // cgroup v1
            "/sys/fs/cgroup/memory/memory.max_usage_in_bytes",
        ];
        for path in paths {
            if let Some(val) = self
                .exec_cat_file(path)
                .and_then(|lines| lines.first()?.trim().parse::<u64>().ok())
            {
                return Some(val);
            }
        }
        None
    }

    /// Reads the cumulative `oom_kill` counter of the container's cgroup.
    ///
    /// If the first readable file has no parsable `oom_kill` line, the count is reported as 0.
    /// `None` means neither file could be read.
    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
    fn read_oom_kill_count(&self) -> Option<u64> {
        let paths = [
            // cgroup v2
            "/sys/fs/cgroup/memory.events",
            // cgroup v1
            "/sys/fs/cgroup/memory/memory.oom_control",
        ];
        for path in paths {
            if let Some(lines) = self.exec_cat_file(path) {
                for line in &lines {
                    if let Some(count) = line
                        .strip_prefix("oom_kill ")
                        .and_then(|rest| rest.trim().parse::<u64>().ok())
                    {
                        return Some(count);
                    }
                }
                return Some(0);
            }
        }
        None
    }

    /// Returns `true` if the cgroup `oom_kill` counter grew since the last successful reading.
    ///
    /// A failed reading reports no OOM and keeps the stored baseline. Overwriting a known count
    /// with `None` would make the next successful reading compare against zero and re-report
    /// an OOM kill that was already accounted for.
    fn check_cgroup_oom(&mut self) -> bool {
        let Some(current) = self.read_oom_kill_count() else {
            return false;
        };
        let previous = self.oom_kill_count.replace(current);
        current > previous.unwrap_or_default()
    }

    /// Refreshes `running` from `details`. Returns `true` only when `OOMKilled` flips from
    /// `false` to `true`, so a container-level OOM is reported once.
    fn check_container_oom(&mut self, details: &InspectContainer) -> bool {
        self.running = details.state.running;
        let previous = self.oom_killed;
        self.oom_killed = details.state.oom_killed;
        details.state.oom_killed && !previous
    }

    /// Last known running state of the container.
    fn is_running(&self) -> bool {
        self.running
    }

    /// Runs `command` in this container with `docker exec`, then collects memory statistics
    /// and checks for OOM kills.
    ///
    /// Returns the statistics together with the command result. If an OOM is detected, a
    /// successful or `ExecutionFailed` result becomes [`CommandError::SandboxOOM`]. If the
    /// post-run `docker inspect` fails, its error replaces the command result.
    // The arguments mirror the independent knobs exposed by `Sandbox::run`.
    #[allow(clippy::too_many_arguments, clippy::type_complexity)]
    #[cfg_attr(
        feature = "tracing",
        tracing::instrument(skip_all, fields(container_id = %self.id(), capture))
    )]
    fn run_command(
        &mut self,
        command: SandboxCommand,
        timeout: Option<Duration>,
        no_output_timeout: Option<Duration>,
        process_lines: Option<&mut dyn FnMut(&str, &mut ProcessLinesActions)>,
        log_output: bool,
        log_command: bool,
        capture: bool,
    ) -> (SandboxStatistics, Result<ProcessOutput, CommandError>) {
        let mut cmd = Command::new(self.workspace, "docker").arg("exec");
        for (var, value) in command.env {
            // Build `VAR=value` as an `OsString` so non-UTF-8 names or values reach Docker
            // unchanged, instead of being lossily converted for display.
            let mut assignment = var;
            assignment.push("=");
            assignment.push(value);
            cmd = cmd.arg("-e").arg(assignment);
        }
        if let Some(workdir) = command.workdir {
            cmd = cmd.arg("-w").arg(workdir);
        }
        if let Some(user) = command.user {
            cmd = cmd.arg("--user").arg(user);
        }
        cmd = cmd
            .arg(self.id())
            .args(command.cmd)
            .timeout(timeout)
            .log_output(log_output)
            .log_command(log_command)
            .no_output_timeout(no_output_timeout);
        if let Some(f) = process_lines {
            cmd = cmd.process_lines(f);
        }
        let res = cmd.run_inner(capture);

        let statistics = SandboxStatistics {
            memory_peak: self.read_memory_peak(),
        };
        let cgroup_oom = self.check_cgroup_oom();
        let details = match self.inspect() {
            Ok(details) => details,
            Err(err) => return (statistics, Err(err)),
        };
        let container_oom = self.check_container_oom(&details);

        let res = if container_oom || cgroup_oom {
            Err(match res {
                Ok(_) | Err(CommandError::ExecutionFailed { .. }) => CommandError::SandboxOOM,
                Err(err) => err,
            })
        } else {
            res
        };
        (statistics, res)
    }

    /// Removes the container with `docker rm -f`.
    ///
    /// A no-op if the container was already deleted. On failure the ID is restored so a later
    /// call can retry.
    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
    fn delete(&mut self) -> Result<(), CommandError> {
        let Some(id) = self.id.take() else {
            return Ok(());
        };
        if let Err(err) = Command::new(self.workspace, "docker")
            .args(["rm", "-f", &id])
            .run()
        {
            self.id = Some(id);
            return Err(err);
        }
        Ok(())
    }
}
impl Drop for Container<'_> {
    /// Best-effort removal of the container; a failure is logged rather than propagated,
    /// because `drop` cannot return an error.
    fn drop(&mut self) {
        let Err(err) = self.delete() else {
            return;
        };
        let leaked_id = self.id.as_deref().unwrap_or_default();
        error!(
            "docker rm failed, leaked sandbox container {}:\n{:?}",
            leaked_id, err
        );
    }
}
impl<'w> Sandbox<'w> {
    /// Whether `res` failed because of an overall timeout, an output timeout, or a failure to
    /// kill the process after a timeout.
    fn command_timed_out(res: &Result<ProcessOutput, CommandError>) -> bool {
        res.as_ref().err().is_some_and(|err| {
            matches!(
                err,
                CommandError::NoOutputFor(_)
                    | CommandError::Timeout(_)
                    | CommandError::KillAfterTimeoutFailed(_)
            )
        })
    }

    /// Returns a snapshot of the statistics accumulated so far.
    pub fn statistics(&self) -> SandboxStatistics {
        self.statistics.snapshot()
    }

    /// Maps a host `path` inside the source directory to the matching path under the container
    /// work directory. Returns `None` if `path` is outside the source directory.
    pub(crate) fn container_workdir(&self, path: &Path) -> Option<PathBuf> {
        path.strip_prefix(&self.source_dir)
            .ok()
            .map(|relative| container_dirs::WORK_DIR.join(relative))
    }

    /// Creates and starts a container from `builder`, adding the standard mounts: the source
    /// directory, the read-write target directory, and the read-only cargo and rustup homes.
    fn create_container(
        builder: &SandboxBuilder,
        workspace: &'w Workspace,
        source_dir: &Path,
        target_dir: &Path,
    ) -> Result<Container<'w>, CommandError> {
        let with_mounts = builder
            .clone()
            .mount(source_dir, &container_dirs::WORK_DIR, builder.source_dir_mount_kind)
            .mount(target_dir, &container_dirs::TARGET_DIR, MountKind::ReadWrite)
            .mount(&workspace.cargo_home(), &container_dirs::CARGO_HOME, MountKind::ReadOnly)
            .mount(&workspace.rustup_home(), &container_dirs::RUSTUP_HOME, MountKind::ReadOnly);
        with_mounts.create_started(workspace)
    }

    /// Ensures a running container is available, creating a fresh one when there is none or
    /// the current one has stopped.
    fn ensure_reusable_container(&mut self) -> Result<(), CommandError> {
        if let Some(container) = &self.container
            && container.is_running()
        {
            return Ok(());
        }
        let fresh = Self::create_container(
            &self.builder,
            self.workspace,
            &self.source_dir,
            &self.target_dir,
        )?;
        self.container = Some(fresh);
        Ok(())
    }

    /// Runs `command` inside the sandbox and merges its statistics into this sandbox's totals.
    ///
    /// A timed-out command causes the container to be deleted, so the next run starts from a
    /// fresh one.
    ///
    /// # Panics
    ///
    /// Panics if `command` has an explicit workdir outside the sandbox source directory.
    // The arguments mirror the independent knobs of the underlying `docker exec` call.
    #[allow(clippy::too_many_arguments, clippy::type_complexity)]
    #[cfg_attr(
        feature = "tracing",
        tracing::instrument(
            skip_all,
            fields(
                image = %self.workspace.sandbox_image().name,
                mounts = self.builder.mounts.len(),
                memory_limit = ?self.builder.memory_limit,
                cpu_limit = ?self.builder.cpu_limit,
                cpuset_cpus = ?self.builder.cpuset_cpus,
                enable_networking = self.builder.enable_networking,
                capture,
                timeout_secs = ?timeout.map(|timeout| timeout.as_secs()),
                no_output_timeout_secs = ?no_output_timeout.map(|timeout| timeout.as_secs()),
            )
        )
    )]
    pub(crate) fn run(
        &mut self,
        command: SandboxCommand,
        timeout: Option<Duration>,
        no_output_timeout: Option<Duration>,
        process_lines: Option<&mut dyn FnMut(&str, &mut ProcessLinesActions)>,
        log_output: bool,
        log_command: bool,
        capture: bool,
    ) -> Result<ProcessOutput, CommandError> {
        // An explicit workdir is a host path inside the source directory; translate it into
        // the container's view of that directory.
        let workdir = match command.workdir.as_deref() {
            Some(host_dir) => self
                .container_workdir(host_dir)
                .expect("explicit workdir must be inside the sandbox source directory"),
            None => container_dirs::WORK_DIR.clone(),
        };
        let command = SandboxCommand {
            workdir: Some(workdir),
            ..command
        };

        self.ensure_reusable_container()?;
        let container = self.container.as_mut().unwrap();
        let (statistics, res) = container.run_command(
            command,
            timeout,
            no_output_timeout,
            process_lines,
            log_output,
            log_command,
            capture,
        );
        self.statistics.merge(statistics);

        let timed_out = Self::command_timed_out(&res);
        if timed_out && let Some(mut stale) = self.container.take() {
            stale.delete()?;
        }
        res
    }

    /// Deletes the current container, if any, and returns the accumulated statistics.
    ///
    /// # Errors
    ///
    /// Returns the `docker rm` error when deletion fails; the container handle is kept in
    /// that case.
    pub fn cleanup(&mut self) -> Result<SandboxStatistics, CommandError> {
        if let Some(container) = &mut self.container {
            container.delete()?;
        }
        self.container = None;
        Ok(self.statistics())
    }
}
/// Returns `true` when `docker info` succeeds, i.e. the Docker daemon can be reached.
pub fn docker_running(workspace: &Workspace) -> bool {
    info!("checking if the docker daemon is running");
    let probe = Command::new(workspace, "docker")
        .arg("info")
        .log_output(false);
    probe.run().is_ok()
}
/// Renders an inclusive CPU range in Docker's `--cpuset-cpus` range syntax, `start-end`.
fn format_cpuset_cpus(cpus: &RangeInclusive<usize>) -> String {
    let (first, last) = (cpus.start(), cpus.end());
    format!("{first}-{last}")
}
#[cfg(test)]
mod tests {
    use super::*;
    use test_case::test_case;

    /// Builds statistics that carry only the given memory peak.
    const fn stats(peak: Option<u64>) -> SandboxStatistics {
        SandboxStatistics { memory_peak: peak }
    }

    #[test]
    fn formats_cpuset_cpus() {
        let cpus = 2..=4;
        assert_eq!(format_cpuset_cpus(&cpus), "2-4");
    }

    // `combine` and `merge` must agree: the larger peak wins, and a missing reading on one
    // side never hides a present one on the other.
    #[test_case(stats(None), stats(None), stats(None))]
    #[test_case(stats(Some(100)), stats(None), stats(Some(100)))]
    #[test_case(stats(None), stats(Some(100)), stats(Some(100)))]
    #[test_case(stats(Some(300)), stats(Some(100)), stats(Some(300)))]
    #[test_case(stats(Some(100)), stats(Some(300)), stats(Some(300)))]
    #[test_case(stats(Some(42)), stats(Some(42)), stats(Some(42)))]
    fn test_combine(lhs: SandboxStatistics, rhs: SandboxStatistics, expected: SandboxStatistics) {
        let combined = lhs.clone().combine(rhs.clone());
        assert_eq!(combined, expected);

        let mut merged = lhs;
        merged.merge(rhs);
        assert_eq!(merged, expected);
    }

    #[test]
    fn merge_accumulate_over_multiple() {
        let mut s = stats(None);
        for peak in [Some(50), Some(200), None, Some(150)] {
            s.merge(stats(peak));
        }
        assert_eq!(s.memory_peak, Some(200));
    }
}