use std::fs;
use std::process::Command;
use std::time::{Duration, Instant};
use anyhow::{Context, Result, anyhow, bail};
use rlmesh_grpc::EnvClient;
use serde::Serialize;
use tempfile::TempDir;
use uuid::Uuid;
use crate::source::ResolvedEnvironmentSourceRef;
use crate::{EffectiveSandboxSpec, EnvironmentSourceRef, hf, shell_quote};
/// Port the sandbox server uses inside the container; exported as `RLMESH_PORT` /
/// `RLMESH_ENV_PORT` in the generated Dockerfile and published to an ephemeral host port.
const DEFAULT_CONTAINER_PORT: u16 = 50051;
/// Upper bound on a single connect-and-handshake readiness probe.
const READY_PROBE_TIMEOUT: Duration = Duration::from_secs(2);
/// Maximum number of trailing log bytes included in a startup failure report.
const CONTAINER_LOG_TAIL_BYTES: usize = 64 * 1024;
/// `docker run --label` value that marks a container as an rlmesh sandbox.
const OWNER_LABEL: &str = "rlmesh.sandbox=1";
/// `docker ps --filter` expression selecting containers that carry the owner label.
const OWNER_LABEL_FILTER: &str = "label=rlmesh.sandbox=1";
/// Label key recording the pid of the process that started the container.
const OWNER_PID_LABEL_KEY: &str = "rlmesh.sandbox.owner-pid";
/// Label key recording the pid namespace of the process that started the container.
const OWNER_PID_NS_LABEL_KEY: &str = "rlmesh.sandbox.owner-pid-ns";
/// `docker ps --format` template producing one `id|owner pid|owner pid namespace|state`
/// line per container.
const REAP_PS_FORMAT: &str = "{{.ID}}|{{.Label \"rlmesh.sandbox.owner-pid\"}}|{{.Label \"rlmesh.sandbox.owner-pid-ns\"}}|{{.State}}";
/// Outcome of ensuring that a sandbox image exists locally.
#[derive(Debug, Clone)]
pub struct BuildArtifact {
/// Image id as printed by `docker image inspect --format {{.Id}}`.
pub image_id: String,
}
/// Handle to a sandbox container that answered the readiness handshake.
#[derive(Debug, Clone)]
pub struct StartedContainer {
/// Container id printed by `docker run -d`.
pub container_id: String,
/// Client address of the form `tcp://127.0.0.1:<published host port>`.
pub address: String,
}
/// Stateless sandbox backend that drives the `docker` command-line client.
#[derive(Debug, Clone, Default)]
pub struct DockerBackend;
impl DockerBackend {
/// Ensures the image for `spec` exists under the tag returned by `spec.image_tag()`.
///
/// # Errors
/// Propagates any failure from [`Self::ensure_image_tagged`].
pub fn ensure_image(&self, spec: &EffectiveSandboxSpec) -> Result<BuildArtifact> {
    let image_tag = spec.image_tag();
    self.ensure_image_tagged(spec, &image_tag)
}
fn ensure_image_tagged(
&self,
spec: &EffectiveSandboxSpec,
image_tag: &str,
) -> Result<BuildArtifact> {
if let Some(image_id) = inspect_image_id(image_tag)? {
return Ok(BuildArtifact { image_id });
}
let tempdir = tempfile::tempdir().context("failed to create sandbox build context")?;
self.write_build_context(spec, &tempdir)?;
let build_memory = match spec.build_memory.as_deref() {
Some(raw) => resolve_build_memory(raw)?,
None => None,
};
let output = match &build_memory {
Some(memory) => {
let builder = ensure_bounded_builder(memory)?;
Command::new("docker")
.args([
"buildx",
"build",
"--builder",
&builder,
"--load",
"-t",
image_tag,
".",
])
.current_dir(tempdir.path())
.output()
.context("failed to invoke docker buildx build")?
}
None => Command::new("docker")
.args(["build", "-t", image_tag, "."])
.current_dir(tempdir.path())
.output()
.context("failed to invoke docker build")?,
};
if !output.status.success() {
bail!("docker build failed:\n{}", command_output(&output));
}
let image_id = inspect_image_id(image_tag)?
.ok_or_else(|| anyhow!("docker build completed but image id was not found"))?;
Ok(BuildArtifact { image_id })
}
/// Starts a detached container from `artifact` and waits until it answers a handshake.
///
/// The container gets a unique `rlmesh-sandbox-<slug>-<uuid>` name and receives the
/// bootstrap payload through the `docker run` arguments. On any failure after the
/// container exists, it is stopped and removed on a best-effort basis before the error
/// is returned.
///
/// NOTE(review): the `docker` invocations here are synchronous `std::process::Command`
/// calls made from an async fn.
///
/// # Errors
/// Fails when the bootstrap payload cannot be rendered, `docker run` fails, the
/// published port cannot be resolved, or the container does not become ready within
/// 30 seconds (in which case the error is a startup failure report).
pub async fn run_container_async(
    &self,
    spec: &EffectiveSandboxSpec,
    artifact: &BuildArtifact,
) -> Result<StartedContainer> {
    let container_name = format!("rlmesh-sandbox-{}-{}", spec.slug(), Uuid::new_v4().simple());
    let bootstrap_json = render_bootstrap_json(spec)?;
    let run_args = docker_run_args(
        &container_name,
        &artifact.image_id,
        &bootstrap_json,
        std::process::id(),
    );
    let output = Command::new("docker")
        .args(run_args)
        .output()
        .context("failed to start docker container")?;
    if !output.status.success() {
        // A failed `docker run` can still leave a created container behind under the name.
        let _ = self.remove_container(&container_name);
        bail!("docker run failed:\n{}", command_output(&output));
    }

    let container_id = String::from_utf8_lossy(&output.stdout).trim().to_string();
    let host_port = resolve_published_port(&container_id).map_err(|err| {
        let _ = self.stop_container(&container_id);
        err
    })?;
    let address = format!("tcp://127.0.0.1:{host_port}");

    let readiness = wait_for_ready(&address, &container_id, Duration::from_secs(30)).await;
    let Err(err) = readiness else {
        return Ok(StartedContainer {
            container_id,
            address,
        });
    };

    // Gather state and logs before tearing the container down.
    let hint = spec.rlmesh_package.skew_hint();
    let report =
        self.startup_failure_report(&container_id, &container_name, &err, hint.as_deref());
    let _ = self.stop_container(&container_id);
    Err(report)
}
/// Stops (when running) and removes the container identified by `container_id`.
///
/// A container that docker no longer knows about counts as already stopped.
///
/// # Errors
/// Propagates failures from inspecting, stopping, or removing the container.
pub fn stop_container(&self, container_id: &str) -> Result<()> {
    match inspect_container_state(container_id)? {
        None => Ok(()),
        Some(state) => {
            if state.status == "running" {
                self.stop_running_container(container_id)?;
            }
            self.remove_container(container_id)
        }
    }
}
pub fn reap_orphaned_containers(&self) -> Result<Vec<String>> {
let candidates = list_owned_containers()?;
let self_pid = std::process::id();
let self_pid_namespace = current_pid_namespace_id();
let mut reaped = Vec::new();
for candidate in candidates {
if candidate.owner_pid == Some(self_pid) {
continue;
}
let owner_liveness = candidate
.owner_pid
.map_or(OwnerPidLiveness::Unknown, |pid| {
owner_pid_liveness(
pid,
candidate.owner_pid_namespace.as_deref(),
self_pid_namespace.as_deref(),
)
});
if !is_orphan(&candidate.status, candidate.owner_pid, owner_liveness) {
continue;
}
if self.stop_container(&candidate.id).is_ok() {
reaped.push(candidate.id);
}
}
Ok(reaped)
}
/// Runs `docker stop` for `container_id`, tolerating a container that is already gone.
///
/// # Errors
/// Fails when `docker` cannot be invoked or reports any error other than
/// "No such container".
fn stop_running_container(&self, container_id: &str) -> Result<()> {
    let output = Command::new("docker")
        .arg("stop")
        .arg(container_id)
        .output()
        .context("failed to stop docker container")?;
    if output.status.success() {
        return Ok(());
    }

    let stderr = String::from_utf8_lossy(&output.stderr);
    if !stderr.contains("No such container") {
        bail!("docker stop failed: {}", stderr.trim());
    }
    Ok(())
}
/// Runs `docker rm` for `container_id`, tolerating a container that is already gone.
///
/// # Errors
/// Fails when `docker` cannot be invoked or reports any error other than
/// "No such container" / "No such object".
fn remove_container(&self, container_id: &str) -> Result<()> {
    let output = Command::new("docker")
        .arg("rm")
        .arg(container_id)
        .output()
        .context("failed to remove docker container")?;
    if output.status.success() {
        return Ok(());
    }

    let stderr = String::from_utf8_lossy(&output.stderr);
    let already_gone =
        stderr.contains("No such container") || stderr.contains("No such object");
    if !already_gone {
        bail!("docker rm failed: {}", stderr.trim());
    }
    Ok(())
}
fn startup_failure_report(
&self,
container_id: &str,
container_name: &str,
cause: &anyhow::Error,
hint: Option<&str>,
) -> anyhow::Error {
let state = match inspect_container_state(container_id) {
Ok(Some(state)) => state.summary(),
Ok(None) => "container state: unavailable (container not found)".to_string(),
Err(err) => format!("container state: unavailable ({err})"),
};
let logs = match self.container_logs(container_id) {
Ok(logs) if logs.trim().is_empty() => "container logs: <empty>".to_string(),
Ok(logs) => format!(
"container logs:\n{}",
tail_text(&logs, CONTAINER_LOG_TAIL_BYTES)
),
Err(err) => format!("container logs: unavailable ({err})"),
};
anyhow!(format_startup_failure_report(
container_id,
container_name,
&cause.to_string(),
&state,
&logs,
hint,
))
}
/// Returns the output of `docker logs`: stdout first, then stderr on a new line.
///
/// The exit status of `docker logs` is not inspected; whatever the command printed is
/// returned as-is.
///
/// # Errors
/// Fails only when `docker` cannot be invoked.
fn container_logs(&self, container_id: &str) -> Result<String> {
    let output = Command::new("docker")
        .arg("logs")
        .arg(container_id)
        .output()
        .context("failed to read docker logs")?;

    let mut combined = String::from_utf8_lossy(&output.stdout).into_owned();
    if output.stderr.is_empty() {
        return Ok(combined);
    }
    // Separate the two streams only when stdout actually produced text.
    if !combined.is_empty() {
        combined.push('\n');
    }
    combined.push_str(&String::from_utf8_lossy(&output.stderr));
    Ok(combined)
}
fn write_build_context(&self, spec: &EffectiveSandboxSpec, tempdir: &TempDir) -> Result<()> {
if let Some(source_path) = spec.rlmesh_package.source_path() {
let filename = source_path
.file_name()
.ok_or_else(|| anyhow!("RLMesh wheel path must have a filename"))?;
let package_dir = tempdir.path().join("packages");
fs::create_dir_all(&package_dir)
.context("failed to create sandbox package build context")?;
fs::copy(source_path, package_dir.join(filename)).with_context(|| {
format!("failed to copy RLMesh wheel {}", source_path.display())
})?;
}
if let ResolvedEnvironmentSourceRef::Hf(source) = &spec.resolved_source {
let EnvironmentSourceRef::Hf(requested_source) = &spec.requested_source else {
bail!("resolved HF source did not match requested source");
};
hf::materialize_source(
requested_source,
&source.resolved_revision,
&tempdir.path().join("source"),
)?;
}
let dockerfile = render_dockerfile(spec)?;
fs::write(tempdir.path().join("Dockerfile"), dockerfile)
.context("failed to write generated Dockerfile")?;
Ok(())
}
}
/// Snapshot of the container state fields read from `docker inspect`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ContainerState {
    /// Value of `.State.Status`.
    status: String,
    /// Value of `.State.ExitCode`, when one was reported.
    exit_code: Option<i32>,
    /// Value of `.State.Error`; may be empty.
    error: String,
}

impl ContainerState {
    /// Returns `true` when the status is `dead` or `exited`.
    fn is_terminal(&self) -> bool {
        self.status == "dead" || self.status == "exited"
    }

    /// Renders a one-line summary; the error is appended only when it is not blank.
    fn summary(&self) -> String {
        let exit_code = match self.exit_code {
            Some(code) => code.to_string(),
            None => "unknown".to_string(),
        };
        let mut line = format!(
            "container state: status={}, exit_code={exit_code}",
            self.status
        );
        if !self.error.trim().is_empty() {
            line.push_str(", error=");
            line.push_str(&self.error);
        }
        line
    }
}
/// Top-level bootstrap payload serialized to JSON for the container.
#[derive(Debug, Clone, Serialize)]
struct BootstrapConfigFile {
/// Source-specific bootstrap parameters.
spec: BootstrapSpec,
}
/// Bootstrap parameters for one environment source kind.
///
/// Serialized with an internal `kind` tag whose value is the snake_case variant name.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
enum BootstrapSpec {
/// Environment identified by a Gym environment id.
Gym(GymBootstrapSpec),
/// Environment loaded from HF source copied into the image.
Hf(HfBootstrapSpec),
}
impl BootstrapSpec {
    /// Derives the bootstrap parameters from the resolved source of `spec`.
    ///
    /// Imports, kwargs, environment count and vectorization mode are copied for both
    /// source kinds; HF sources always use the `source` subdirectory.
    fn from_effective_spec(spec: &EffectiveSandboxSpec) -> Result<Self> {
        let imports = spec.imports.clone();
        let kwargs = spec.kwargs.clone();
        let num_envs = spec.num_envs;
        let vectorization_mode = spec.vectorization_mode.as_str().to_string();

        let bootstrap = match &spec.resolved_source {
            ResolvedEnvironmentSourceRef::Gym(source) => Self::Gym(GymBootstrapSpec {
                env_id: source.env_id.clone(),
                imports,
                kwargs,
                num_envs,
                vectorization_mode,
            }),
            ResolvedEnvironmentSourceRef::Hf(source) => Self::Hf(HfBootstrapSpec {
                source_subdir: "source".to_string(),
                suite: source.suite.clone(),
                task: source.task.clone(),
                imports,
                kwargs,
                num_envs,
                vectorization_mode,
            }),
        };
        Ok(bootstrap)
    }
}
/// Bootstrap parameters for a Gym-sourced environment.
#[derive(Debug, Clone, Serialize)]
struct GymBootstrapSpec {
/// Gym environment id, copied from the resolved source.
env_id: String,
/// Copied from the sandbox spec's `imports`.
imports: Vec<String>,
/// Copied from the sandbox spec's `kwargs`.
kwargs: std::collections::BTreeMap<String, serde_json::Value>,
/// Copied from the sandbox spec's `num_envs`.
num_envs: usize,
/// String form of the sandbox spec's vectorization mode.
vectorization_mode: String,
}
/// Bootstrap parameters for an HF-sourced environment.
#[derive(Debug, Clone, Serialize)]
struct HfBootstrapSpec {
/// Subdirectory holding the copied source; set to `source` by `from_effective_spec`.
source_subdir: String,
/// Optional suite, copied from the resolved source.
suite: Option<String>,
/// Optional task, copied from the resolved source.
task: Option<String>,
/// Copied from the sandbox spec's `imports`.
imports: Vec<String>,
/// Copied from the sandbox spec's `kwargs`.
kwargs: std::collections::BTreeMap<String, serde_json::Value>,
/// Copied from the sandbox spec's `num_envs`.
num_envs: usize,
/// String form of the sandbox spec's vectorization mode.
vectorization_mode: String,
}
/// Polls `address` until a connect-and-handshake probe succeeds or `timeout` elapses.
///
/// Each probe is capped at `READY_PROBE_TIMEOUT`, and failed probes are retried every
/// 300 ms. After every failed probe the container is inspected so that a container
/// which has already exited is reported immediately.
///
/// # Errors
/// Fails when the container is confirmed exited, when the handshake error is fatal,
/// when a probe error occurs after the deadline, or when a probe times out after the
/// deadline.
async fn wait_for_ready(address: &str, container_id: &str, timeout: Duration) -> Result<()> {
    let retry_delay = Duration::from_millis(300);
    let give_up_at = Instant::now() + timeout;

    loop {
        let attempt = async {
            let mut client = EnvClient::connect(address).await?;
            client.handshake().await?;
            Ok::<_, rlmesh_grpc::error::Error>(())
        };

        // `None` means the probe itself timed out while the overall deadline is still open.
        let probe_error = match tokio::time::timeout(READY_PROBE_TIMEOUT, attempt).await {
            Ok(Ok(())) => return Ok(()),
            Ok(Err(err)) => Some(err),
            Err(_elapsed) => {
                if Instant::now() >= give_up_at {
                    bail!(
                        "sandbox container did not respond within {} seconds",
                        timeout.as_secs()
                    );
                }
                None
            }
        };

        if let Some(state) = container_terminated(container_id) {
            bail!("sandbox container exited before ready ({state})");
        }
        if let Some(err) = probe_error {
            if err.is_fatal_handshake() || Instant::now() >= give_up_at {
                return Err(err.into());
            }
        }
        tokio::time::sleep(retry_delay).await;
    }
}
/// Inspects `container_id` and returns its state summary when it is confirmed terminal.
fn container_terminated(container_id: &str) -> Option<String> {
    let inspected = inspect_container_state(container_id);
    confirmed_terminal_summary(inspected)
}
/// Returns the state summary only for a successfully inspected, terminal container.
///
/// Inspection errors and missing containers yield `None`, so a transient inspect failure
/// is never mistaken for termination.
fn confirmed_terminal_summary(inspected: Result<Option<ContainerState>>) -> Option<String> {
    inspected
        .ok()
        .flatten()
        .filter(|state| state.is_terminal())
        .map(|state| state.summary())
}
/// Renders the Dockerfile used to build the sandbox image for `spec`.
///
/// The image starts from `spec.base_image`, optionally copies the HF source and the
/// local RLMesh wheel directory, runs the pip install command in a single `RUN`
/// instruction, and starts `rlmesh._bootstrap.sandbox_env` as its entrypoint.
///
/// # Errors
/// Fails when the base image or the rendered package install command contains a newline.
fn render_dockerfile(spec: &EffectiveSandboxSpec) -> Result<String> {
    validate_dockerfile_token("base_image", &spec.base_image)?;
    let source_copy = match &spec.resolved_source {
        ResolvedEnvironmentSourceRef::Gym(_) => "",
        ResolvedEnvironmentSourceRef::Hf(_) => "COPY source /opt/rlmesh/source\n",
    };
    let package_copy = if spec.rlmesh_package.source_path().is_some() {
        "COPY packages /opt/rlmesh/packages\n"
    } else {
        ""
    };
    let package_command = render_package_install_command(spec);
    // Dockerfiles are line-oriented: shell quoting does not stop a newline inside a
    // package spec from ending the `RUN` instruction and starting a new one, so reject
    // it the same way a newline in the base image is rejected.
    validate_dockerfile_token("package install command", &package_command)?;
    Ok(format!(
        "# syntax=docker/dockerfile:1.7\n\n\
         FROM {}\n\n\
         ENV RLMESH_PORT={DEFAULT_CONTAINER_PORT}\n\
         ENV RLMESH_ENV_PORT={DEFAULT_CONTAINER_PORT}\n\
         ENV PYTHONUNBUFFERED=1\n\n\
         WORKDIR /opt/rlmesh\n\
         {}\
         {}\
         \n\
         RUN sh -lc {}\n\n\
         EXPOSE {DEFAULT_CONTAINER_PORT}\n\
         ENTRYPOINT [\"python\", \"-m\", \"rlmesh._bootstrap.sandbox_env\"]\n",
        spec.base_image,
        source_copy,
        package_copy,
        shell_quote(&package_command),
    ))
}
/// Serializes the bootstrap payload for `spec` as a compact JSON string.
///
/// # Errors
/// Fails when the bootstrap spec cannot be derived or serialized.
fn render_bootstrap_json(spec: &EffectiveSandboxSpec) -> Result<String> {
    let payload = BootstrapConfigFile {
        spec: BootstrapSpec::from_effective_spec(spec)?,
    };
    serde_json::to_string(&payload).context("failed to serialize sandbox bootstrap payload")
}
/// Builds the `&&`-chained shell command that installs Python dependencies in the image.
///
/// Steps, in order: upgrade pip, install the RLMesh package, install gymnasium, install
/// the HF source's `requirements.txt` when present (HF sources only), and finally install
/// any extra packages listed in `spec.packages`.
fn render_package_install_command(spec: &EffectiveSandboxSpec) -> String {
    let mut steps: Vec<String> = Vec::new();
    steps.push("python -m pip install --no-cache-dir --upgrade pip".to_string());
    steps.push(format!(
        "python -m pip install --no-cache-dir {}",
        shell_quote(spec.rlmesh_package.install_ref())
    ));
    steps.push("python -m pip install --no-cache-dir gymnasium".to_string());

    if let ResolvedEnvironmentSourceRef::Hf(_) = &spec.resolved_source {
        steps.push(
            "if [ -f /opt/rlmesh/source/requirements.txt ]; then python -m pip install --no-cache-dir -r /opt/rlmesh/source/requirements.txt; fi"
                .to_string(),
        );
    }

    if !spec.packages.is_empty() {
        let quoted = spec
            .packages
            .iter()
            .map(|package| shell_quote(package))
            .collect::<Vec<_>>();
        let package_args = quoted.join(" ");
        steps.push(format!(
            "python -m pip install --no-cache-dir {package_args}"
        ));
    }

    steps.join(" && ")
}
fn validate_dockerfile_token(label: &str, value: &str) -> Result<()> {
anyhow::ensure!(
!value.contains('\n') && !value.contains('\r'),
"{label} must not contain newlines"
);
Ok(())
}
/// Formats the captured output of a finished command for inclusion in an error message.
///
/// Non-blank stdout and stderr are emitted as trimmed `stdout:` / `stderr:` sections
/// (in that order, newline-separated). When both are blank, the exit status is reported
/// instead.
fn command_output(output: &std::process::Output) -> String {
    let streams = [("stdout", &output.stdout), ("stderr", &output.stderr)];
    let sections: Vec<String> = streams
        .iter()
        .filter_map(|(label, bytes)| {
            let text = String::from_utf8_lossy(bytes);
            let text = text.trim();
            (!text.is_empty()).then(|| format!("{label}:\n{text}"))
        })
        .collect();

    if sections.is_empty() {
        format!("exit status: {}", output.status)
    } else {
        sections.join("\n")
    }
}
/// Bytes (8 GiB) subtracted from total host RAM when deriving the automatic build limit.
const BUILD_MEMORY_HEADROOM: u64 = 8 * 1024 * 1024 * 1024;
/// Smallest automatic build memory limit in bytes (4 GiB).
const BUILD_MEMORY_FLOOR: u64 = 4 * 1024 * 1024 * 1024;
/// Interprets the user-supplied `build_memory` setting.
///
/// Returns `None` for an empty value or `off` / `none` / `unbounded`, a computed byte
/// count for `auto` (keywords are case-insensitive), and the trimmed value itself when it
/// is a docker size.
///
/// # Errors
/// Fails when the value is neither a recognized keyword nor a docker size.
fn resolve_build_memory(raw: &str) -> Result<Option<String>> {
    let value = raw.trim();
    let keyword = value.to_ascii_lowercase();

    if matches!(keyword.as_str(), "" | "off" | "none" | "unbounded") {
        return Ok(None);
    }
    if keyword == "auto" {
        return Ok(Some(auto_build_memory()));
    }
    if !is_docker_size(value) {
        bail!(
            "build_memory must be a docker size like \"20g\", or \"auto\"/\"off\"; got {value:?}"
        );
    }
    Ok(Some(value.to_string()))
}
/// Returns `true` when `value` looks like a docker memory size such as `20g` or `1.5gb`.
///
/// Accepted form: a positive, finite number followed by an optional `k` / `m` / `g` / `t`
/// unit and an optional trailing `b`, case-insensitively.
fn is_docker_size(value: &str) -> bool {
    let lower = value.trim().to_ascii_lowercase();
    let without_b = lower.strip_suffix('b').unwrap_or(&lower);
    let number = without_b
        .strip_suffix(['k', 'm', 'g', 't'])
        .unwrap_or(without_b);
    // `f64::from_str` also parses "inf" / "infinity" and overflows like "1e999" to
    // infinity; none of those is a usable size, so require a finite value.
    !number.is_empty()
        && number
            .parse::<f64>()
            .is_ok_and(|n| n.is_finite() && n > 0.0)
}
/// Computes the `auto` build memory limit as a decimal byte count.
///
/// When host RAM cannot be determined, a total of floor + headroom is assumed, which
/// yields exactly the floor.
fn auto_build_memory() -> String {
    let assumed_total = BUILD_MEMORY_FLOOR + BUILD_MEMORY_HEADROOM;
    let total = match host_total_ram_bytes() {
        Some(bytes) => bytes,
        None => assumed_total,
    };
    bounded_build_memory(total).to_string()
}
/// Returns `total_ram` minus the headroom, never less than the floor.
fn bounded_build_memory(total_ram: u64) -> u64 {
    let after_headroom = total_ram.saturating_sub(BUILD_MEMORY_HEADROOM);
    std::cmp::max(after_headroom, BUILD_MEMORY_FLOOR)
}
fn host_total_ram_bytes() -> Option<u64> {
proc_meminfo_total().or_else(sysctl_memsize)
}
/// Reads `MemTotal` from `/proc/meminfo` and converts it from KiB to bytes.
///
/// Returns `None` when the file is unreadable, has no `MemTotal:` line, or the value on
/// the first such line does not parse.
fn proc_meminfo_total() -> Option<u64> {
    let meminfo = fs::read_to_string("/proc/meminfo").ok()?;
    for line in meminfo.lines() {
        if let Some(rest) = line.strip_prefix("MemTotal:") {
            // Only the first `MemTotal:` line is considered.
            let kb: u64 = rest.split_whitespace().next()?.parse().ok()?;
            return Some(kb * 1024);
        }
    }
    None
}
/// Queries `sysctl -n hw.memsize` and parses its output as a byte count.
///
/// Returns `None` when the command cannot be run, exits unsuccessfully, or prints
/// something that is not an unsigned integer.
fn sysctl_memsize() -> Option<u64> {
    let output = match Command::new("sysctl").arg("-n").arg("hw.memsize").output() {
        Ok(output) => output,
        Err(_) => return None,
    };
    if output.status.success() {
        String::from_utf8_lossy(&output.stdout).trim().parse().ok()
    } else {
        None
    }
}
/// Derives the buildx builder name for a memory limit.
///
/// The name is `rlmesh-build-` followed by `memory` with ASCII alphanumerics lowercased
/// and every other character replaced by `-`.
fn builder_name(memory: &str) -> String {
    let mut name = String::from("rlmesh-build-");
    for c in memory.chars() {
        name.push(if c.is_ascii_alphanumeric() {
            c.to_ascii_lowercase()
        } else {
            '-'
        });
    }
    name
}
/// Returns the name of a `docker-container` buildx builder limited to `memory`,
/// creating it when it does not exist yet.
///
/// The builder's `memory` and `memory-swap` driver options are both set to `memory`.
/// If creation fails but the builder exists afterwards, the existing builder is used.
///
/// # Errors
/// Fails when `docker` cannot be invoked or the builder can neither be created nor found.
fn ensure_bounded_builder(memory: &str) -> Result<String> {
    let name = builder_name(memory);
    if builder_exists(&name)? {
        return Ok(name);
    }

    let memory_opt = format!("memory={memory}");
    let swap_opt = format!("memory-swap={memory}");
    let output = Command::new("docker")
        .args(["buildx", "create"])
        .args(["--name", &name])
        .args(["--driver", "docker-container"])
        .args(["--driver-opt", &memory_opt])
        .args(["--driver-opt", &swap_opt])
        .output()
        .context("failed to create bounded docker buildx builder")?;

    // The existence re-check runs only after a failed create.
    if output.status.success() || builder_exists(&name)? {
        return Ok(name);
    }
    bail!(
        "failed to create bounded docker buildx builder:\n{}",
        command_output(&output)
    );
}
/// Reports whether `docker buildx inspect <name>` exits successfully.
///
/// # Errors
/// Fails only when `docker` cannot be invoked.
fn builder_exists(name: &str) -> Result<bool> {
    let inspection = Command::new("docker")
        .arg("buildx")
        .arg("inspect")
        .arg(name)
        .output()
        .context("failed to inspect docker buildx builder")?;
    Ok(inspection.status.success())
}
/// Assembles the multi-line startup failure message.
///
/// Lines, in order: the cause, the container id, the container name, `state`, `logs`,
/// and — when provided — a final `hint:` line.
fn format_startup_failure_report(
    container_id: &str,
    container_name: &str,
    cause: &str,
    state: &str,
    logs: &str,
    hint: Option<&str>,
) -> String {
    let body = format!(
        "sandbox container did not become ready: {cause}\ncontainer id: {container_id}\ncontainer name: {container_name}\n{state}\n{logs}"
    );
    match hint {
        Some(hint) => format!("{body}\nhint: {hint}"),
        None => body,
    }
}
/// Builds the argument list for `docker run` that starts a sandbox container.
///
/// The container runs detached with all capabilities dropped and `no-new-privileges`,
/// carries the owner labels (including the pid namespace when it can be read), receives
/// the bootstrap payload through the `RLMESH_BOOTSTRAP_JSON` environment variable, and
/// publishes the container port on an ephemeral loopback host port.
fn docker_run_args(
    container_name: &str,
    image_id: &str,
    bootstrap_json: &str,
    owner_pid: u32,
) -> Vec<String> {
    let fixed_flags = [
        "run",
        "-d",
        "--cap-drop",
        "ALL",
        "--security-opt",
        "no-new-privileges",
        "--label",
        OWNER_LABEL,
        "--label",
    ];
    let mut args: Vec<String> = fixed_flags.iter().map(|flag| (*flag).to_string()).collect();
    args.push(format!("{OWNER_PID_LABEL_KEY}={owner_pid}"));

    if let Some(pid_namespace) = current_pid_namespace_id() {
        args.push("--label".to_string());
        args.push(format!("{OWNER_PID_NS_LABEL_KEY}={pid_namespace}"));
    }

    args.push("--env".to_string());
    args.push(format!("RLMESH_BOOTSTRAP_JSON={bootstrap_json}"));
    args.push("--name".to_string());
    args.push(container_name.to_string());
    args.push("-p".to_string());
    args.push(format!("127.0.0.1:0:{DEFAULT_CONTAINER_PORT}"));
    args.push(image_id.to_string());
    args
}
/// Asks `docker port` which host port the container's RLMesh port was published on.
///
/// # Errors
/// Fails when `docker` cannot be invoked, exits unsuccessfully, or prints output that
/// contains no parsable port.
fn resolve_published_port(container_id: &str) -> Result<u16> {
    let container_port = DEFAULT_CONTAINER_PORT.to_string();
    let output = Command::new("docker")
        .arg("port")
        .arg(container_id)
        .arg(&container_port)
        .output()
        .context("failed to read published docker port")?;
    if !output.status.success() {
        bail!(
            "docker port failed: {}",
            String::from_utf8_lossy(&output.stderr).trim()
        );
    }

    let stdout = String::from_utf8_lossy(&output.stdout);
    match parse_published_port(&stdout) {
        Some(port) => Ok(port),
        None => Err(anyhow!(
            "could not parse published port from docker port output: {stdout:?}"
        )),
    }
}
/// Extracts the host port from `docker port` output.
///
/// Returns the number after the last `:` of the first line where that number parses as a
/// `u16`, or `None` when no line qualifies.
fn parse_published_port(raw: &str) -> Option<u16> {
    for line in raw.lines() {
        if let Some((_host, port)) = line.trim().rsplit_once(':') {
            if let Ok(port) = port.trim().parse::<u16>() {
                return Some(port);
            }
        }
    }
    None
}
/// One labelled sandbox container as parsed from a `docker ps` output line.
#[derive(Debug, Clone, PartialEq, Eq)]
struct OwnedContainer {
/// Container id.
id: String,
/// Pid from the owner-pid label; `None` when the label is missing or not a number.
owner_pid: Option<u32>,
/// Pid namespace from the owner-pid-ns label; `None` when missing or empty.
owner_pid_namespace: Option<String>,
/// Container state column as printed by `docker ps`.
status: String,
}
/// Lists every container (running or not) that carries the rlmesh sandbox label.
///
/// # Errors
/// Fails when `docker` cannot be invoked or `docker ps` exits unsuccessfully.
fn list_owned_containers() -> Result<Vec<OwnedContainer>> {
    let output = Command::new("docker")
        .args(["ps", "--all", "--no-trunc"])
        .args(["--filter", OWNER_LABEL_FILTER])
        .args(["--format", REAP_PS_FORMAT])
        .output()
        .context("failed to list rlmesh sandbox containers")?;
    if !output.status.success() {
        bail!(
            "docker ps failed: {}",
            String::from_utf8_lossy(&output.stderr).trim()
        );
    }

    let listing = String::from_utf8_lossy(&output.stdout);
    Ok(parse_owned_containers(&listing))
}
/// Parses `docker ps` lines of the form `id|owner pid|owner pid namespace|state`.
///
/// Lines with only three columns are read as `id|owner pid|state`. Lines with an empty
/// id or an empty state are dropped, and columns beyond the fourth are ignored.
fn parse_owned_containers(raw: &str) -> Vec<OwnedContainer> {
    let mut containers = Vec::new();
    for line in raw.lines() {
        let columns: Vec<&str> = line.split('|').map(str::trim).collect();
        let id = columns.first().copied().unwrap_or("");
        let owner_pid = columns.get(1).copied().unwrap_or("");
        let third = columns.get(2).copied().unwrap_or("");

        // With a fourth column the third one is the namespace; without it, the state.
        let (owner_pid_namespace, status) = match columns.get(3) {
            Some(status) => ((!third.is_empty()).then(|| third.to_string()), *status),
            None => (None, third),
        };

        if id.is_empty() || status.is_empty() {
            continue;
        }
        containers.push(OwnedContainer {
            id: id.to_string(),
            owner_pid: owner_pid.parse::<u32>().ok(),
            owner_pid_namespace,
            status: status.to_string(),
        });
    }
    containers
}
/// What is known about whether a container's owner process still exists.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum OwnerPidLiveness {
/// The owner pid was checked and the process exists.
Alive,
/// The owner pid was checked and no such process exists.
Dead,
/// The owner pid could not be checked (no pid, or pid namespaces missing or different).
Unknown,
}
/// Decides whether a labelled container should be reaped.
///
/// A container with a recorded owner pid is an orphan only when that owner is known to
/// be dead. A container without an owner pid is an orphan unless it is running.
fn is_orphan(status: &str, owner_pid: Option<u32>, owner_liveness: OwnerPidLiveness) -> bool {
    if owner_pid.is_some() {
        owner_liveness == OwnerPidLiveness::Dead
    } else {
        !is_running_status(status)
    }
}
/// Returns `true` when `status` equals `running`, ignoring ASCII case.
fn is_running_status(status: &str) -> bool {
    const RUNNING: &[u8] = b"running";
    let bytes = status.as_bytes();
    bytes.len() == RUNNING.len()
        && bytes
            .iter()
            .zip(RUNNING)
            .all(|(actual, expected)| actual.to_ascii_lowercase() == *expected)
}
/// Checks whether the owner `pid` is alive, but only when it is comparable.
///
/// The pid is looked up only when both pid namespaces are known and equal; in every
/// other case the result is `Unknown`.
fn owner_pid_liveness(
    pid: u32,
    owner_pid_namespace: Option<&str>,
    self_pid_namespace: Option<&str>,
) -> OwnerPidLiveness {
    let same_namespace = matches!(
        (owner_pid_namespace, self_pid_namespace),
        (Some(owner), Some(current)) if owner == current
    );
    if !same_namespace {
        return OwnerPidLiveness::Unknown;
    }
    if pid_is_alive(pid) {
        OwnerPidLiveness::Alive
    } else {
        OwnerPidLiveness::Dead
    }
}
/// Returns the target of the `/proc/self/ns/pid` symlink, or `None` when it cannot be read.
fn current_pid_namespace_id() -> Option<String> {
    match fs::read_link("/proc/self/ns/pid") {
        Ok(target) => Some(target.to_string_lossy().into_owned()),
        Err(_) => None,
    }
}
/// Reports whether a process with `pid` currently exists.
///
/// On Linux this checks for the `/proc/<pid>` entry; elsewhere it runs `kill -0 <pid>`
/// and treats a successful exit as "alive".
fn pid_is_alive(pid: u32) -> bool {
    #[cfg(target_os = "linux")]
    let alive = {
        let proc_entry = format!("/proc/{pid}");
        std::path::Path::new(&proc_entry).exists()
    };

    #[cfg(not(target_os = "linux"))]
    let alive = {
        let probe = std::process::Command::new("kill")
            .args(["-0", &pid.to_string()])
            .stdout(std::process::Stdio::null())
            .stderr(std::process::Stdio::null())
            .status();
        match probe {
            Ok(status) => status.success(),
            Err(_) => false,
        }
    };

    alive
}
/// Reads status, exit code and error of a container via `docker inspect`.
///
/// Returns `Ok(None)` when docker reports that the container does not exist.
///
/// # Errors
/// Fails when `docker` cannot be invoked, reports any other error, or prints output
/// that cannot be parsed.
fn inspect_container_state(container_id: &str) -> Result<Option<ContainerState>> {
    const STATE_FORMAT: &str = "{{.State.Status}}\n{{.State.ExitCode}}\n{{.State.Error}}";

    let output = Command::new("docker")
        .args(["inspect", "--format", STATE_FORMAT, container_id])
        .output()
        .context("failed to inspect docker container")?;

    if output.status.success() {
        let state = parse_container_state(&String::from_utf8_lossy(&output.stdout))?;
        return Ok(Some(state));
    }

    let stderr = String::from_utf8_lossy(&output.stderr);
    let missing = stderr.contains("No such container") || stderr.contains("No such object");
    if !missing {
        bail!("docker inspect failed: {}", stderr.trim());
    }
    Ok(None)
}
/// Parses the three-part `docker inspect` output: status, exit code, error text.
///
/// The first line is the status, the second the exit code (optional), and everything
/// after that is joined back together as the error text.
///
/// # Errors
/// Fails when the status line is missing or blank, or when the exit code is not an `i32`.
fn parse_container_state(raw: &str) -> Result<ContainerState> {
    let mut lines = raw.lines();

    let status = match lines.next().map(str::trim) {
        Some(status) if !status.is_empty() => status.to_string(),
        _ => bail!("docker inspect did not report container status"),
    };

    let exit_code = match lines.next().map(str::trim) {
        Some(value) if !value.is_empty() => Some(
            value
                .parse::<i32>()
                .context("docker inspect reported invalid container exit code")?,
        ),
        _ => None,
    };

    let remaining: Vec<&str> = lines.collect();
    let error = remaining.join("\n").trim().to_string();

    Ok(ContainerState {
        status,
        exit_code,
        error,
    })
}
fn tail_text(value: &str, max_bytes: usize) -> String {
if value.len() <= max_bytes {
return value.to_string();
}
let mut start = value.len() - max_bytes;
while !value.is_char_boundary(start) {
start += 1;
}
format!(
"[truncated to last {max_bytes} bytes]\n{}",
value[start..].trim_start()
)
}
/// Looks up the id of a local image via `docker image inspect`.
///
/// Any unsuccessful exit of the command is reported as `Ok(None)`.
///
/// # Errors
/// Fails only when `docker` cannot be invoked.
fn inspect_image_id(image_ref: &str) -> Result<Option<String>> {
    let output = Command::new("docker")
        .args(["image", "inspect"])
        .args(["--format", "{{.Id}}"])
        .arg(image_ref)
        .output()
        .context("failed to inspect docker image id")?;

    if output.status.success() {
        let image_id = String::from_utf8_lossy(&output.stdout).trim().to_string();
        Ok(Some(image_id))
    } else {
        Ok(None)
    }
}
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use anyhow::anyhow;
use serde_json::json;
use super::{
BootstrapSpec, ContainerState, OwnedContainer, OwnerPidLiveness, bounded_build_memory,
builder_name, confirmed_terminal_summary, current_pid_namespace_id, docker_run_args,
format_startup_failure_report, is_docker_size, is_orphan, owner_pid_liveness,
parse_container_state, parse_owned_containers, parse_published_port, pid_is_alive,
render_bootstrap_json, render_dockerfile, resolve_build_memory, shell_quote, tail_text,
};
use crate::source::{ResolvedEnvironmentSourceRef, ResolvedHfSourceRef};
use crate::{
EffectiveSandboxSpec, EnvironmentSourceRef, GymSourceRef, ResolvedRlmeshPackage,
VectorizationMode,
};
fn pip_rlmesh_package() -> ResolvedRlmeshPackage {
ResolvedRlmeshPackage::Pip {
spec: "rlmesh==0.1.0b2".to_string(),
}
}
fn gym_spec() -> EffectiveSandboxSpec {
EffectiveSandboxSpec {
schema_version: crate::BOOTSTRAP_SCHEMA_VERSION,
requested_source: EnvironmentSourceRef::parse("CartPole-v1").unwrap(),
resolved_source: ResolvedEnvironmentSourceRef::Gym(GymSourceRef {
env_id: "CartPole-v1".to_string(),
}),
base_image: "python:3.12-slim".to_string(),
rlmesh_package: pip_rlmesh_package(),
packages: vec![],
imports: vec![],
kwargs: BTreeMap::new(),
num_envs: 1,
vectorization_mode: VectorizationMode::Sync,
build_memory: None,
build_hash: "abcdef0123456789".to_string(),
}
}
#[test]
fn dockerfile_installs_rlmesh_gymnasium_and_packages() {
let spec = EffectiveSandboxSpec {
packages: vec!["pygame".to_string()],
..gym_spec()
};
let dockerfile = render_dockerfile(&spec).unwrap();
assert!(dockerfile.contains("FROM python:3.12-slim"));
assert!(dockerfile.contains("rlmesh==0.1.0b2"));
assert!(dockerfile.contains("gymnasium"));
assert!(dockerfile.contains("pygame"));
assert!(dockerfile.contains("rlmesh._bootstrap.sandbox_env"));
assert!(!dockerfile.contains("COPY source"));
assert!(!dockerfile.contains("COPY bootstrap.json"));
assert!(!dockerfile.contains("bootstrap.json"));
}
#[test]
fn build_memory_resolution_ceiling_and_builder_name() {
const GIB: u64 = 1024 * 1024 * 1024;
assert_eq!(bounded_build_memory(32 * GIB), 24 * GIB);
assert_eq!(bounded_build_memory(8 * GIB), 4 * GIB);
assert_eq!(bounded_build_memory(0), 4 * GIB);
assert_eq!(resolve_build_memory("off").unwrap(), None);
assert_eq!(resolve_build_memory("").unwrap(), None);
assert_eq!(resolve_build_memory("20g").unwrap().as_deref(), Some("20g"));
assert!(
resolve_build_memory("auto")
.unwrap()
.is_some_and(|m| m.parse::<u64>().is_ok())
);
for bad in ["garbage", ";;;", "20 g", "-5g", "0"] {
assert!(
resolve_build_memory(bad).is_err(),
"{bad} should be rejected"
);
}
assert!(
["20g", "2048m", "1.5g", "1073741824", "512k", "1t"]
.iter()
.all(|s| is_docker_size(s))
);
assert!(
!["", "g", "0", "-5g", "garbage", "20 g", "b"]
.iter()
.any(|s| is_docker_size(s))
);
assert_eq!(builder_name("20g"), "rlmesh-build-20g");
assert_eq!(builder_name("25769803776"), "rlmesh-build-25769803776");
}
#[test]
fn docker_run_args_do_not_auto_remove_container() {
let args = docker_run_args("rlmesh-sandbox-test", "sha256:abc", "{}", 4242);
assert_eq!(args.first().map(String::as_str), Some("run"));
assert!(args.iter().any(|arg| arg == "-d"));
assert!(!args.iter().any(|arg| arg == "--rm"));
assert!(args.iter().any(|arg| arg == "--cap-drop"));
assert!(args.iter().any(|arg| arg == "no-new-privileges"));
assert!(args.iter().any(|arg| arg == "rlmesh-sandbox-test"));
}
#[test]
fn docker_run_args_publish_ephemeral_host_port() {
let args = docker_run_args("rlmesh-sandbox-test", "sha256:abc", "{}", 4242);
assert!(args.iter().any(|arg| arg == "127.0.0.1:0:50051"));
assert!(!args.iter().any(|arg| arg.starts_with("127.0.0.1:4")));
}
#[test]
fn docker_run_args_label_container_for_reaping() {
let args = docker_run_args("rlmesh-sandbox-test", "sha256:abc", "{}", 4242);
let label_idx = args.iter().position(|arg| arg == "--label");
assert!(label_idx.is_some(), "containers must carry an owner label");
assert_eq!(
args.get(label_idx.unwrap() + 1).map(String::as_str),
Some("rlmesh.sandbox=1")
);
}
#[test]
fn docker_run_args_stamp_owner_pid_label() {
let args = docker_run_args("rlmesh-sandbox-test", "sha256:abc", "{}", 4242);
assert!(
args.iter()
.any(|arg| arg == "rlmesh.sandbox.owner-pid=4242"),
"containers must record their owner pid: {args:?}"
);
}
#[test]
fn docker_run_args_stamp_owner_pid_namespace_when_available() {
let Some(pid_namespace) = current_pid_namespace_id() else {
return;
};
let args = docker_run_args("rlmesh-sandbox-test", "sha256:abc", "{}", 4242);
assert!(
args.iter()
.any(|arg| arg == &format!("rlmesh.sandbox.owner-pid-ns={pid_namespace}")),
"containers must record their owner pid namespace when available: {args:?}"
);
}
#[test]
fn docker_run_args_inject_bootstrap_payload_at_run_time() {
let args = docker_run_args(
"rlmesh-sandbox-test",
"sha256:abc",
"{\"spec\":{\"kind\":\"gym\"}}",
4242,
);
let env_idx = args.iter().position(|arg| arg == "--env");
assert!(env_idx.is_some(), "bootstrap must be passed via --env");
assert_eq!(
args.get(env_idx.unwrap() + 1).map(String::as_str),
Some("RLMESH_BOOTSTRAP_JSON={\"spec\":{\"kind\":\"gym\"}}")
);
}
#[test]
fn render_bootstrap_json_carries_runtime_params() {
let mut kwargs = BTreeMap::new();
kwargs.insert("render_mode".to_string(), json!("rgb_array"));
let spec = EffectiveSandboxSpec {
kwargs,
num_envs: 4,
vectorization_mode: VectorizationMode::Async,
..gym_spec()
};
let json = render_bootstrap_json(&spec).unwrap();
assert!(json.contains("rgb_array"));
assert!(json.contains("\"num_envs\":4"));
assert!(json.contains("async"));
}
#[test]
fn transient_inspect_failure_does_not_confirm_termination() {
let running = ContainerState {
status: "running".to_string(),
exit_code: None,
error: String::new(),
};
let exited = ContainerState {
status: "exited".to_string(),
exit_code: Some(0),
error: String::new(),
};
assert!(confirmed_terminal_summary(Ok(Some(exited))).is_some());
assert!(confirmed_terminal_summary(Ok(Some(running))).is_none());
assert!(confirmed_terminal_summary(Ok(None)).is_none());
assert!(
confirmed_terminal_summary(Err(anyhow!("docker inspect failed: daemon busy")))
.is_none()
);
}
#[test]
fn parse_owned_containers_reads_id_pid_and_status() {
let parsed = parse_owned_containers(
"abc123|4242|pid:[4026531836]|running\ndef456|7|pid:[4026531837]|exited\n",
);
assert_eq!(
parsed,
vec![
OwnedContainer {
id: "abc123".to_string(),
owner_pid: Some(4242),
owner_pid_namespace: Some("pid:[4026531836]".to_string()),
status: "running".to_string(),
},
OwnedContainer {
id: "def456".to_string(),
owner_pid: Some(7),
owner_pid_namespace: Some("pid:[4026531837]".to_string()),
status: "exited".to_string(),
},
]
);
}
#[test]
fn parse_owned_containers_handles_legacy_unlabeled_pid() {
let parsed = parse_owned_containers("abc123|||running\nxyz789|notapid||exited\n");
assert_eq!(
parsed,
vec![
OwnedContainer {
id: "abc123".to_string(),
owner_pid: None,
owner_pid_namespace: None,
status: "running".to_string(),
},
OwnedContainer {
id: "xyz789".to_string(),
owner_pid: None,
owner_pid_namespace: None,
status: "exited".to_string(),
},
]
);
assert!(parse_owned_containers("\n \n").is_empty());
assert!(parse_owned_containers("").is_empty());
}
#[test]
fn is_orphan_spares_containers_owned_by_a_live_process() {
assert!(!is_orphan("running", Some(4242), OwnerPidLiveness::Alive));
assert!(!is_orphan("exited", Some(4242), OwnerPidLiveness::Alive));
}
#[test]
fn is_orphan_reaps_containers_whose_owner_is_gone() {
assert!(is_orphan("running", Some(4242), OwnerPidLiveness::Dead));
assert!(is_orphan("exited", Some(4242), OwnerPidLiveness::Dead));
}
#[test]
fn is_orphan_spares_containers_when_owner_liveness_is_unknown() {
assert!(!is_orphan("running", Some(4242), OwnerPidLiveness::Unknown));
assert!(!is_orphan("exited", Some(4242), OwnerPidLiveness::Unknown));
}
#[test]
fn is_orphan_treats_legacy_unlabeled_containers_conservatively() {
assert!(is_orphan("exited", None, OwnerPidLiveness::Unknown));
assert!(is_orphan("dead", None, OwnerPidLiveness::Unknown));
assert!(is_orphan("created", None, OwnerPidLiveness::Unknown));
assert!(!is_orphan("running", None, OwnerPidLiveness::Unknown));
assert!(!is_orphan("Running", None, OwnerPidLiveness::Unknown));
}
#[test]
fn owner_pid_liveness_is_unknown_without_matching_namespace() {
assert_eq!(
owner_pid_liveness(4242, None, Some("pid:[1]")),
OwnerPidLiveness::Unknown
);
assert_eq!(
owner_pid_liveness(4242, Some("pid:[2]"), Some("pid:[1]")),
OwnerPidLiveness::Unknown
);
assert_eq!(
owner_pid_liveness(4242, Some("pid:[1]"), None),
OwnerPidLiveness::Unknown
);
}
#[test]
fn pid_is_alive_detects_current_process() {
assert!(pid_is_alive(std::process::id()));
assert!(!pid_is_alive(u32::MAX));
}
#[test]
fn parse_published_port_reads_host_port() {
assert_eq!(parse_published_port("127.0.0.1:49153\n"), Some(49153));
assert_eq!(parse_published_port("0.0.0.0:50000"), Some(50000));
assert_eq!(parse_published_port("[::]:51000\n"), Some(51000));
assert_eq!(
parse_published_port("0.0.0.0:49153\n[::]:49153\n"),
Some(49153)
);
assert_eq!(parse_published_port(""), None);
assert_eq!(parse_published_port("garbage"), None);
}
#[test]
fn container_state_parses_exit_details() {
let state = parse_container_state("exited\n2\nboom\n").unwrap();
assert_eq!(
state,
ContainerState {
status: "exited".to_string(),
exit_code: Some(2),
error: "boom".to_string(),
}
);
assert!(state.is_terminal());
assert_eq!(
state.summary(),
"container state: status=exited, exit_code=2, error=boom"
);
}
/// The startup failure report built without a hint must still contain the
/// error text, the container id and name, the state summary and the logs.
#[test]
fn startup_failure_report_includes_container_context() {
    let report = format_startup_failure_report(
        "abc123",
        "rlmesh-sandbox-test",
        "connection refused",
        "container state: status=exited, exit_code=1",
        "container logs:\ntraceback",
        None,
    );
    let expected_fragments = [
        "connection refused",
        "container id: abc123",
        "container name: rlmesh-sandbox-test",
        "exit_code=1",
        "traceback",
    ];
    for fragment in expected_fragments {
        assert!(report.contains(fragment), "report is missing {fragment:?}");
    }
}
/// When a hint is supplied, the report must contain it prefixed with `hint: `.
#[test]
fn startup_failure_report_appends_skew_hint() {
    // Raw literals keep the embedded double quotes readable.
    let hint = r#"pass rlmesh_package="local""#;
    let report = format_startup_failure_report(
        "abc123",
        "rlmesh-sandbox-test",
        "handshake failed",
        "container state: status=exited, exit_code=2",
        "container logs:\nusage: ...",
        Some(hint),
    );
    assert!(report.contains(r#"hint: pass rlmesh_package="local""#));
}
/// `tail_text` must return the input untouched when the limit covers it, and
/// otherwise emit a truncation marker followed by only the end of the input.
#[test]
fn tail_text_keeps_log_tail() {
    let value = "alpha\nbeta\ngamma";
    // A limit equal to the input length leaves the text unchanged.
    assert_eq!(tail_text(value, value.len()), value);
    let tail = tail_text(value, 10);
    assert!(tail.starts_with("[truncated to last 10 bytes]"));
    assert!(tail.ends_with("eta\ngamma"));
    // The dropped head must not appear anywhere in the output. Checking only
    // the suffix would be vacuous: a string that ends in "gamma" can never
    // end in "alpha", so that check could not catch a leaked head.
    assert!(!tail.contains("alpha"));
}
/// Renders a Dockerfile for an HF-sourced spec with one extra package and
/// checks that it copies the source tree, references the source's
/// `requirements.txt` and mentions the pinned package.
#[test]
fn dockerfile_copies_hf_source_and_installs_requirements() {
    let requested_source = EnvironmentSourceRef::parse("hf://org/repo@main:suite").unwrap();
    let resolved_source = ResolvedEnvironmentSourceRef::Hf(ResolvedHfSourceRef {
        repo: String::from("org/repo"),
        resolved_revision: String::from("0123456789abcdef0123456789abcdef01234567"),
        suite: Some(String::from("suite")),
        task: None,
    });
    let spec = EffectiveSandboxSpec {
        schema_version: crate::BOOTSTRAP_SCHEMA_VERSION,
        requested_source,
        resolved_source,
        base_image: String::from("python:3.12-slim"),
        rlmesh_package: pip_rlmesh_package(),
        packages: vec![String::from("numpy==2.0.0")],
        imports: Vec::new(),
        kwargs: BTreeMap::new(),
        num_envs: 1,
        vectorization_mode: VectorizationMode::Sync,
        build_memory: None,
        build_hash: String::from("abcdef0123456789"),
    };
    let rendered = render_dockerfile(&spec).unwrap();
    let expected_fragments = [
        "COPY source /opt/rlmesh/source",
        "/opt/rlmesh/source/requirements.txt",
        "numpy==2.0.0",
    ];
    for fragment in expected_fragments {
        assert!(
            rendered.contains(fragment),
            "dockerfile is missing {fragment:?}"
        );
    }
}
/// With a wheel-based rlmesh package, the rendered Dockerfile must copy the
/// `packages` directory and reference the wheel's install path.
#[test]
fn dockerfile_copies_and_installs_rlmesh_wheel_package() {
    // Shared between the spec and the assertion so the two cannot drift apart.
    let install_path = "/opt/rlmesh/packages/rlmesh-0.1.0b2-cp311-abi3-manylinux_x86_64.whl";
    let rlmesh_package = ResolvedRlmeshPackage::Wheel {
        source_path: "/tmp/rlmesh-0.1.0b2-cp311-abi3-manylinux_x86_64.whl".into(),
        install_path: install_path.to_owned(),
        sha256: String::from("abc"),
    };
    let spec = EffectiveSandboxSpec {
        rlmesh_package,
        ..gym_spec()
    };
    let rendered = render_dockerfile(&spec).unwrap();
    assert!(rendered.contains("COPY packages /opt/rlmesh/packages"));
    assert!(rendered.contains(install_path));
}
/// Converting an HF-sourced effective spec must yield the `Hf` bootstrap
/// variant carrying the same suite and task as the resolved source.
#[test]
fn hf_bootstrap_spec_includes_suite_and_task() {
    let requested_source = EnvironmentSourceRef::parse("hf://org/repo@main:suite/0").unwrap();
    let resolved_source = ResolvedEnvironmentSourceRef::Hf(ResolvedHfSourceRef {
        repo: String::from("org/repo"),
        resolved_revision: String::from("0123456789abcdef0123456789abcdef01234567"),
        suite: Some(String::from("suite")),
        task: Some(String::from("0")),
    });
    let spec = EffectiveSandboxSpec {
        schema_version: crate::BOOTSTRAP_SCHEMA_VERSION,
        requested_source,
        resolved_source,
        base_image: String::from("python:3.12-slim"),
        rlmesh_package: pip_rlmesh_package(),
        packages: Vec::new(),
        imports: Vec::new(),
        kwargs: BTreeMap::new(),
        num_envs: 1,
        vectorization_mode: VectorizationMode::Sync,
        build_memory: None,
        build_hash: String::from("abcdef0123456789"),
    };
    // Any other variant means the conversion picked the wrong source kind.
    let bootstrap = match BootstrapSpec::from_effective_spec(&spec).unwrap() {
        BootstrapSpec::Hf(inner) => inner,
        _ => panic!("expected HF bootstrap spec"),
    };
    assert_eq!(bootstrap.suite.as_deref(), Some("suite"));
    assert_eq!(bootstrap.task.as_deref(), Some("0"));
}
/// Each embedded single quote must come out as the sequence `'"'"'` inside
/// the single-quoted result.
#[test]
fn shell_quote_handles_single_quotes() {
    let quoted = shell_quote("pkg=='x'");
    // Raw literal: the expected text mixes single and double quotes.
    let expected = r#"'pkg=='"'"'x'"'"''"#;
    assert_eq!(quoted, expected);
}
/// Overrides `imports` and `kwargs` on the gym fixture and checks that the
/// effective spec still resolves to `CartPole-v1` and carries both overrides.
///
/// NOTE(review): despite its name, this test never builds a `BootstrapSpec`;
/// it only inspects the `EffectiveSandboxSpec` — confirm whether a call to
/// `BootstrapSpec::from_effective_spec` was intended here.
#[test]
fn bootstrap_spec_includes_kwargs() {
    let kwargs = BTreeMap::from([("render_mode".to_string(), json!("rgb_array"))]);
    let spec = EffectiveSandboxSpec {
        imports: vec!["my_envs".to_string()],
        kwargs,
        ..gym_spec()
    };
    let ResolvedEnvironmentSourceRef::Gym(source) = &spec.resolved_source else {
        panic!("expected gym source");
    };
    assert_eq!(source.env_id, "CartPole-v1");
    assert_eq!(spec.imports, vec!["my_envs"]);
    assert_eq!(spec.kwargs["render_mode"], json!("rgb_array"));
}
}