#![cfg(target_os = "linux")]
use noether_core::effects::{Effect, EffectSet};
use noether_engine::executor::isolation::{build_bwrap_command, find_bwrap, IsolationPolicy};
use serde_json::{json, Value};
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
fn nix_python3() -> Option<PathBuf> {
let out = Command::new("nix")
.args([
"build",
"--no-link",
"--print-out-paths",
"--quiet",
"nixpkgs#python3",
])
.output()
.ok()?;
if !out.status.success() {
return None;
}
let store_path = String::from_utf8(out.stdout).ok()?;
let store_path = store_path.trim();
if store_path.is_empty() {
return None;
}
let python = Path::new(store_path).join("bin").join("python3");
python.exists().then_some(python)
}
fn deps() -> Option<(PathBuf, PathBuf)> {
let bwrap = find_bwrap()?;
let python = nix_python3()?;
Some((bwrap, python))
}
fn skip_if_deps_missing() -> Option<(PathBuf, PathBuf)> {
match deps() {
Some(d) => Some(d),
None => {
eprintln!(
"isolation_escape: skipping — missing bwrap or nix-built \
python3; both are required to drive real sandbox escape \
probes. Unit tests in `executor::isolation` still verify \
the argv-construction contract."
);
None
}
}
}
fn run_attack(bwrap: &Path, python: &Path, policy: &IsolationPolicy, code: &str) -> Value {
let inner = vec![
python.to_string_lossy().into_owned(),
"-c".into(),
code.into(),
];
let mut cmd = build_bwrap_command(bwrap, policy, &inner);
cmd.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
let child = match cmd.spawn() {
Ok(c) => c,
Err(e) => return json!({ "spawn_error": format!("{e}") }),
};
let out = match child.wait_with_output() {
Ok(o) => o,
Err(e) => return json!({ "wait_error": format!("{e}") }),
};
let stdout = String::from_utf8_lossy(&out.stdout);
let stderr = String::from_utf8_lossy(&out.stderr);
if !out.status.success() {
return json!({
"exit_failure": out.status.code(),
"stderr": stderr.to_string(),
"stdout": stdout.to_string(),
});
}
serde_json::from_str(stdout.trim()).unwrap_or_else(
|_| json!({ "unparseable_stdout": stdout.to_string(), "stderr": stderr.to_string() }),
)
}
fn assert_ran(result: &Value) {
for bad in [
"spawn_error",
"wait_error",
"exit_failure",
"unparseable_stdout",
] {
assert!(
result.get(bad).is_none(),
"sandboxed probe did not run cleanly ({bad}): {result}"
);
}
}
#[test]
fn network_blocked_when_effect_not_declared() {
let Some((bwrap, python)) = skip_if_deps_missing() else {
return;
};
let policy = IsolationPolicy::from_effects(&EffectSet::pure());
let code = r#"
import socket, json
try:
socket.gethostbyname("example.com")
print(json.dumps({"blocked": False}))
except OSError as e:
print(json.dumps({"blocked": True, "errno": e.errno}))
"#;
let result = run_attack(&bwrap, &python, &policy, code);
assert_ran(&result);
assert_eq!(
result.get("blocked"),
Some(&json!(true)),
"pure-effect stage resolved DNS — network namespace not \
unshared: {result}"
);
}
#[test]
fn network_allowed_when_effect_declared() {
let Some((bwrap, python)) = skip_if_deps_missing() else {
return;
};
let policy = IsolationPolicy::from_effects(&EffectSet::new([Effect::Pure, Effect::Network]));
let code = r#"
import socket, json
def resolve(host):
try:
socket.gethostbyname(host)
return {"ok": True}
except OSError as e:
return {"ok": False, "errno": e.errno, "msg": str(e)}
print(json.dumps({
"localhost": resolve("localhost"),
"example_com": resolve("example.com"),
}))
"#;
let result = run_attack(&bwrap, &python, &policy, code);
assert_ran(&result);
assert_eq!(
result.get("localhost").and_then(|v| v.get("ok")),
Some(&json!(true)),
"localhost failed to resolve inside sandbox — NSS config \
(/etc/hosts, /etc/nsswitch.conf) not correctly bound: {result}"
);
if result.get("example_com").and_then(|v| v.get("ok")) != Some(&json!(true)) {
eprintln!(
"network_allowed_when_effect_declared: example.com did not \
resolve — test host may be offline. localhost worked, so \
the sandbox's NSS wiring is verified. Result: {result}"
);
}
}
#[test]
fn cannot_read_etc_shadow() {
let Some((bwrap, python)) = skip_if_deps_missing() else {
return;
};
let policy = IsolationPolicy::from_effects(&EffectSet::pure());
let code = r#"
import json
try:
with open("/etc/shadow", "r") as f:
f.read()
print(json.dumps({"leaked": True}))
except (FileNotFoundError, PermissionError) as e:
print(json.dumps({"leaked": False, "error": type(e).__name__}))
"#;
let result = run_attack(&bwrap, &python, &policy, code);
assert_ran(&result);
assert_eq!(
result.get("leaked"),
Some(&json!(false)),
"/etc/shadow readable inside sandbox — bind-mount policy too \
wide: {result}"
);
}
#[test]
fn cannot_read_host_ssh_keys() {
let Some((bwrap, python)) = skip_if_deps_missing() else {
return;
};
let policy = IsolationPolicy::from_effects(&EffectSet::pure());
let code = r#"
import os, json
candidates = [
os.path.expanduser("~/.ssh/id_rsa"),
os.path.expanduser("~/.ssh/id_ed25519"),
os.path.expanduser("~/.aws/credentials"),
]
leaked = []
for p in candidates:
try:
with open(p, "rb") as f:
f.read(1)
leaked.append(p)
except (FileNotFoundError, PermissionError, IsADirectoryError, NotADirectoryError):
pass
print(json.dumps({"leaked_paths": leaked}))
"#;
let result = run_attack(&bwrap, &python, &policy, code);
assert_ran(&result);
let leaked = result
.get("leaked_paths")
.and_then(|v| v.as_array())
.cloned()
.unwrap_or_default();
assert!(
leaked.is_empty(),
"sandbox leaked host credentials: {leaked:?}; result: {result}"
);
}
#[test]
fn uid_mapped_to_nobody_inside_sandbox() {
let Some((bwrap, python)) = skip_if_deps_missing() else {
return;
};
let policy = IsolationPolicy::from_effects(&EffectSet::pure());
let code = r#"
import os, json
print(json.dumps({"uid": os.getuid(), "gid": os.getgid()}))
"#;
let result = run_attack(&bwrap, &python, &policy, code);
assert_ran(&result);
assert_eq!(
result.get("uid"),
Some(&json!(65534)),
"sandbox did not apply --uid 65534: {result}"
);
assert_eq!(
result.get("gid"),
Some(&json!(65534)),
"sandbox did not apply --gid 65534: {result}"
);
}
#[test]
fn cannot_escalate_to_root() {
let Some((bwrap, python)) = skip_if_deps_missing() else {
return;
};
let policy = IsolationPolicy::from_effects(&EffectSet::pure());
let code = r#"
import os, json
attempts = {}
try:
os.setuid(0)
attempts["setuid_0"] = True
except (PermissionError, OSError) as e:
attempts["setuid_0"] = False
attempts["setuid_error"] = type(e).__name__
try:
os.setgid(0)
attempts["setgid_0"] = True
except (PermissionError, OSError) as e:
attempts["setgid_0"] = False
try:
os.chroot("/")
attempts["chroot"] = True
except (PermissionError, OSError) as e:
attempts["chroot"] = False
print(json.dumps(attempts))
"#;
let result = run_attack(&bwrap, &python, &policy, code);
assert_ran(&result);
assert_eq!(
result.get("setuid_0"),
Some(&json!(false)),
"setuid(0) succeeded inside sandbox — capability drop misconfigured: {result}"
);
assert_eq!(
result.get("setgid_0"),
Some(&json!(false)),
"setgid(0) succeeded inside sandbox — capability drop misconfigured: {result}"
);
assert_eq!(
result.get("chroot"),
Some(&json!(false)),
"chroot succeeded inside sandbox — CAP_SYS_CHROOT not dropped: {result}"
);
}
#[test]
fn home_env_is_sandbox_consistent() {
let Some((bwrap, python)) = skip_if_deps_missing() else {
return;
};
let policy = IsolationPolicy::from_effects(&EffectSet::pure());
let code = r#"
import os, json
print(json.dumps({
"home": os.environ.get("HOME"),
"user": os.environ.get("USER"),
}))
"#;
let result = run_attack(&bwrap, &python, &policy, code);
assert_ran(&result);
assert_eq!(
result.get("home"),
Some(&json!("/work")),
"HOME inside sandbox leaked host value — `--setenv` wiring broken: {result}"
);
assert_eq!(
result.get("user"),
Some(&json!("nobody")),
"USER inside sandbox leaked host value: {result}"
);
}
#[test]
fn work_dir_is_private_tmpfs_and_empty() {
let Some((bwrap, python)) = skip_if_deps_missing() else {
return;
};
let policy = IsolationPolicy::from_effects(&EffectSet::pure());
let code = r#"
import os, json
entries = sorted(os.listdir("/work"))
with open("/work/scratch.txt", "w") as f:
f.write("ok")
with open("/work/scratch.txt") as f:
content = f.read()
print(json.dumps({"initial_entries": entries, "roundtrip": content}))
"#;
let result = run_attack(&bwrap, &python, &policy, code);
assert_ran(&result);
assert_eq!(
result.get("initial_entries"),
Some(&json!([])),
"/work was not empty at stage entry — state leak or stale tmpdir: {result}"
);
assert_eq!(
result.get("roundtrip"),
Some(&json!("ok")),
"/work not writable inside sandbox: {result}"
);
}