#![cfg(target_os = "linux")]
mod linux {
use std::fs::File;
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
use std::process::Command;
fn unshare_net_available() -> bool {
Command::new("unshare")
.args(["-n", "/bin/true"])
.status()
.map(|s| s.success())
.unwrap_or(false)
}
fn supervisor_exe() -> PathBuf {
if let Some(p) = std::env::var_os("CARGO_BIN_EXE_cellos_supervisor") {
return PathBuf::from(p);
}
let root = Path::new(env!("CARGO_MANIFEST_DIR"))
.parent()
.and_then(|p| p.parent())
.expect("cellos-supervisor crate under workspace root");
let profile = std::env::var("PROFILE").unwrap_or_else(|_| "debug".into());
root.join("target").join(profile).join("cellos-supervisor")
}
fn read_events(path: &Path) -> Vec<serde_json::Value> {
let f = match File::open(path) {
Ok(f) => f,
Err(_) => return Vec::new(),
};
let r = BufReader::new(f);
r.lines()
.flatten()
.filter(|l| !l.is_empty())
.filter_map(|l| serde_json::from_str(&l).ok())
.collect()
}
fn dns_query_events(events: &[serde_json::Value]) -> Vec<&serde_json::Value> {
events
.iter()
.filter(|e| {
e.get("type")
.and_then(|t| t.as_str())
.is_some_and(|t| t.ends_with(".dns_query"))
})
.collect()
}
fn workload_python_script() -> String {
r#"
import socket, struct, threading, sys, time
def make_a_query(qname, txn_id):
# Header: id, flags=0x0100 (RD), qdcount=1, others=0
h = struct.pack(">HHHHHH", txn_id, 0x0100, 1, 0, 0, 0)
q = b""
for label in qname.split("."):
q += bytes([len(label)]) + label.encode("ascii")
q += b"\x00"
q += struct.pack(">HH", 1, 1) # QTYPE=A, QCLASS=IN
return h + q
def make_a_response(query_bytes):
# Echo header but with QR=1, ANCOUNT=1; copy question; append A 1.2.3.4 RR.
txn_id = struct.unpack(">H", query_bytes[:2])[0]
flags = 0x8180 # QR=1, RD=1, RA=1, RCODE=0
h = struct.pack(">HHHHHH", txn_id, flags, 1, 1, 0, 0)
# Question section: header is 12 bytes; walk to end-of-name then +4.
idx = 12
while True:
b = query_bytes[idx]
if b == 0:
idx += 1
break
idx += 1 + b
idx += 4 # qtype + qclass
question = query_bytes[12:idx]
# Answer: pointer to QNAME (0xc00c), TYPE=A, CLASS=IN, TTL=300, RDLENGTH=4, RDATA=1.2.3.4
rr = b"\xc0\x0c" + struct.pack(">HHIH", 1, 1, 300, 4) + bytes([1, 2, 3, 4])
return h + question + rr
def upstream_stub(stop):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind(("127.0.0.1", 53054))
s.settimeout(0.2)
while not stop.is_set():
try:
data, peer = s.recvfrom(2048)
except socket.timeout:
continue
try:
resp = make_a_response(data)
s.sendto(resp, peer)
except Exception as e:
print(f"[upstream-stub] error: {e}", flush=True)
s.close()
def query_proxy(qname, txn_id):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.settimeout(2.0)
s.bind(("127.0.0.1", 0))
s.sendto(make_a_query(qname, txn_id), ("127.0.0.1", 53053))
data, _ = s.recvfrom(2048)
s.close()
rcode = data[3] & 0x0f
return rcode
stop = threading.Event()
upstream_thread = threading.Thread(target=upstream_stub, args=(stop,), daemon=True)
upstream_thread.start()
time.sleep(0.1) # let upstream stub bind
allow_rcode = query_proxy("api.example.com", 0xa1a1)
print(f"[workload] allow query api.example.com -> rcode={allow_rcode}", flush=True)
deny_rcode = query_proxy("evil.example.com", 0xb2b2)
print(f"[workload] deny query evil.example.com -> rcode={deny_rcode}", flush=True)
stop.set()
time.sleep(0.05)
# Allow path: NOERROR (0). Deny path: REFUSED (5).
if allow_rcode != 0:
print(f"FAIL: expected NOERROR for allow path, got rcode={allow_rcode}", flush=True)
sys.exit(2)
if deny_rcode != 5:
print(f"FAIL: expected REFUSED for deny path, got rcode={deny_rcode}", flush=True)
sys.exit(3)
print("OK", flush=True)
sys.exit(0)
"#
.to_string()
}
#[test]
#[ignore = "requires CAP_SYS_ADMIN for unshare(CLONE_NEWNET); CI runs with --ignored"]
fn dns_proxy_spawns_in_cell_netns_and_observes_allow_and_deny() {
if !unshare_net_available() {
return;
}
if Command::new("python3").arg("--version").status().is_err() {
return;
}
let tmp = tempfile::tempdir().expect("tempdir");
let spec_path = tmp.path().join("spec.json");
let jsonl_path = tmp.path().join("events.jsonl");
let workload_path = tmp.path().join("workload.py");
std::fs::write(&workload_path, workload_python_script()).expect("write workload");
let argv_json =
serde_json::to_string(&["/usr/bin/python3", workload_path.to_str().expect("utf-8")])
.expect("argv json");
let json = format!(
r#"{{
"apiVersion": "cellos.io/v1",
"kind": "ExecutionCell",
"spec": {{
"id": "seam1-phase2b-e2e",
"authority": {{
"secretRefs": [],
"egressRules": [
{{"host": "127.0.0.1", "port": 53053, "protocol": "udp"}},
{{"host": "127.0.0.1", "port": 53054, "protocol": "udp"}}
],
"dnsAuthority": {{
"hostnameAllowlist": ["api.example.com"],
"blockDirectWorkloadDns": true,
"resolvers": [
{{
"resolverId": "resolver-test-001",
"endpoint": "127.0.0.1:53053",
"protocol": "do53-udp"
}}
]
}}
}},
"lifetime": {{"ttlSeconds": 60}},
"run": {
"secretDelivery": "env",{
"argv": {argv_json},
"timeoutSeconds": 30
}}
}}
}}"#
);
let mut f = File::create(&spec_path).expect("create spec");
f.write_all(json.as_bytes()).expect("write spec");
drop(f);
let exe = supervisor_exe();
assert!(
exe.is_file(),
"supervisor binary missing at {}",
exe.display()
);
let status = Command::new(exe)
.env("CELLOS_DEPLOYMENT_PROFILE", "portable")
.env("CELLOS_CELL_BACKEND", "stub")
.env("CELLOS_JSONL_SINK_PATH", &jsonl_path)
.env("CELL_OS_USE_NOOP_SINK", "1")
.env("CELLOS_SUBPROCESS_UNSHARE", "net")
.env("CELLOS_DNS_PROXY", "1")
.env("CELLOS_DNS_PROXY_UPSTREAM_OVERRIDE", "127.0.0.1:53054")
.current_dir(env!("CARGO_MANIFEST_DIR"))
.arg(&spec_path)
.status()
.expect("spawn supervisor");
assert!(
status.success(),
"supervisor with DNS proxy spawn should succeed: {status:?}"
);
let events = read_events(&jsonl_path);
let dns_events = dns_query_events(&events);
assert!(
dns_events.len() >= 2,
"expected >=2 dns_query events (allow + deny), got {}: {:#?}",
dns_events.len(),
dns_events
);
let allow_event = dns_events.iter().find(|e| {
e.get("data")
.and_then(|d| d.get("queryName"))
.and_then(|q| q.as_str())
== Some("api.example.com")
});
let deny_event = dns_events.iter().find(|e| {
e.get("data")
.and_then(|d| d.get("queryName"))
.and_then(|q| q.as_str())
== Some("evil.example.com")
});
let allow_event = allow_event.expect("missing allow event for api.example.com");
let deny_event = deny_event.expect("missing deny event for evil.example.com");
assert_eq!(
allow_event["data"]["decision"], "allow",
"allow event decision wrong: {allow_event}"
);
assert_eq!(
allow_event["data"]["reasonCode"], "allowed_by_allowlist",
"allow event reasonCode wrong: {allow_event}"
);
assert_eq!(
allow_event["data"]["upstreamResolverId"], "resolver-test-001",
"allow event resolver id wrong: {allow_event}"
);
assert_eq!(
deny_event["data"]["decision"], "deny",
"deny event decision wrong: {deny_event}"
);
assert_eq!(
deny_event["data"]["reasonCode"], "denied_not_in_allowlist",
"deny event reasonCode wrong: {deny_event}"
);
assert_eq!(
deny_event["data"]["responseRcode"], 5,
"deny event response rcode wrong: {deny_event}"
);
}
#[test]
#[ignore = "requires CAP_SYS_ADMIN; CI runs with --ignored"]
fn dns_proxy_spawn_failure_emits_upstream_failure_event() {
if !unshare_net_available() {
return;
}
if Command::new("python3").arg("--version").status().is_err() {
return;
}
let tmp = tempfile::tempdir().expect("tempdir");
let spec_path = tmp.path().join("spec.json");
let jsonl_path = tmp.path().join("events.jsonl");
let argv_json = serde_json::to_string(&["/usr/bin/true"]).expect("argv");
let json = format!(
r#"{{
"apiVersion": "cellos.io/v1",
"kind": "ExecutionCell",
"spec": {{
"id": "seam1-phase2b-e2e-skip",
"authority": {{
"secretRefs": [],
"dnsAuthority": {{
"hostnameAllowlist": ["api.example.com"],
"resolvers": [
{{
"resolverId": "resolver-bad-001",
"endpoint": "not-an-ip:53",
"protocol": "do53-udp"
}}
]
}}
}},
"lifetime": {{"ttlSeconds": 60}},
"run": {
"secretDelivery": "env",{
"argv": {argv_json},
"timeoutSeconds": 10
}}
}}
}}"#
);
let mut f = File::create(&spec_path).expect("create spec");
f.write_all(json.as_bytes()).expect("write spec");
drop(f);
let exe = supervisor_exe();
let status = Command::new(exe)
.env("CELLOS_DEPLOYMENT_PROFILE", "portable")
.env("CELLOS_CELL_BACKEND", "stub")
.env("CELLOS_JSONL_SINK_PATH", &jsonl_path)
.env("CELL_OS_USE_NOOP_SINK", "1")
.env("CELLOS_SUBPROCESS_UNSHARE", "net")
.env("CELLOS_DNS_PROXY", "1")
.current_dir(env!("CARGO_MANIFEST_DIR"))
.arg(&spec_path)
.status()
.expect("spawn supervisor");
assert!(
status.success(),
"supervisor should succeed even when proxy activation skips"
);
let events = read_events(&jsonl_path);
let dns_events = dns_query_events(&events);
assert!(
dns_events.is_empty(),
"skipped activation must NOT emit dns_query events, got: {:#?}",
dns_events
);
}
}