use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use serde_json::json;
/// Pair produced by `build_export_sinks`: the default export sink, plus the
/// additional sinks keyed by export target name.
type ExportSinkBundle = (Arc<dyn ExportSink>, HashMap<String, Arc<dyn ExportSink>>);
/// One environment variable whose value was ignored, or found misconfigured,
/// while the supervisor was being composed.
#[derive(Debug)]
struct ConfigWarning {
/// Name of the environment variable the warning is about.
var: &'static str,
/// The value as supplied by the recording call site (some call sites in this
/// file pass an empty string when the variable was missing altogether).
value: String,
/// Human-readable explanation of why the value was ignored / what was used instead.
reason: String,
}
/// Accumulator for [`ConfigWarning`]s gathered during startup; `finalize`
/// decides whether the collected warnings abort startup.
#[derive(Debug, Default)]
struct StartupConfigWarnings {
/// Warnings in the order they were recorded.
items: Vec<ConfigWarning>,
}
impl StartupConfigWarnings {
    /// Creates an empty collector.
    fn new() -> Self {
        Self::default()
    }

    /// Logs a warning about `var` and remembers it for [`Self::finalize`].
    ///
    /// * `var` – name of the environment variable.
    /// * `value` – the value that was ignored.
    /// * `reason` – why it was ignored and what is used instead.
    fn record(&mut self, var: &'static str, value: impl Into<String>, reason: impl Into<String>) {
        let warning = ConfigWarning {
            var,
            value: value.into(),
            reason: reason.into(),
        };
        tracing::warn!(
            target: "cellos.supervisor.config",
            var = %warning.var,
            value = %warning.value,
            reason = %warning.reason,
            "env-var fallback: ignored value, using default (set CELLOS_STRICT_CONFIG=1 to fail loud)"
        );
        self.items.push(warning);
    }

    /// Consumes the collector and decides whether startup may continue.
    ///
    /// Returns `Ok(())` when nothing was recorded or when strict config mode
    /// is off. Otherwise logs an error listing every recorded warning and
    /// returns an `Err` carrying the same list.
    fn finalize(self) -> anyhow::Result<()> {
        // The strict flag is only consulted when there is something to report.
        if self.items.is_empty() || !strict_config_enabled() {
            return Ok(());
        }

        let count = self.items.len();
        let mut summary: Vec<String> = Vec::with_capacity(count);
        for warning in &self.items {
            summary.push(format!(
                "{}={:?} ({})",
                warning.var, warning.value, warning.reason
            ));
        }

        tracing::error!(
            target: "cellos.supervisor.config",
            strict = true,
            count = count,
            ignored = ?summary,
            "CELLOS_STRICT_CONFIG=1: refusing to start — one or more env vars were ignored or misconfigured during composition"
        );
        Err(anyhow::anyhow!(
            "CELLOS_STRICT_CONFIG is set: refusing to start with {count} ignored/misconfigured env var(s): [{joined}]",
            count = count,
            joined = summary.join("; ")
        ))
    }
}
/// Reports whether `CELLOS_STRICT_CONFIG` holds a truthy value.
///
/// The value is trimmed and compared case-insensitively against
/// `1`, `true`, `yes` and `on`; anything else (or an unset/unreadable
/// variable) yields `false`.
fn strict_config_enabled() -> bool {
    match std::env::var("CELLOS_STRICT_CONFIG") {
        Ok(raw) => {
            let flag = raw.trim().to_ascii_lowercase();
            ["1", "true", "yes", "on"].contains(&flag.as_str())
        }
        Err(_) => false,
    }
}
use cellos_broker_env::EnvSecretBroker;
use cellos_broker_file::FileSecretBroker;
use cellos_broker_oidc::GithubActionsOidcBroker;
use cellos_broker_vault::VaultAppRoleBroker;
use cellos_core::ports::{
CellBackend, EventSink, ExportSink, NoopEventSink, NoopExportSink, SecretBroker,
};
use cellos_core::{
redact_url_credentials_for_logs, validate_authorization_policy, validate_policy_pack_document,
AuthorizationPolicy, AuthorizationPolicyDocument, ExecutionCellDocument, ExportTarget,
PolicyPackDocument,
};
use cellos_export_http::HttpExportSink;
use cellos_export_local::LocalExportSink;
use cellos_export_s3::PresignedS3ExportSink;
use cellos_host_cellos::{MemorySecretBroker, ProprietaryCellBackend};
use cellos_host_firecracker::FirecrackerCellBackend;
use cellos_host_stub::StubCellBackend;
use cellos_sink_dlq::DlqSink;
use cellos_sink_jetstream::JetStreamEventSink;
use cellos_sink_jsonl::JsonlEventSink;
use cellos_sink_redact::RedactingEventSink;
use crate::event_signing::SigningEventSink;
use crate::spec_input::resolve_event_subject;
use crate::supervisor::Supervisor;
/// Returns the caller identity taken from `CELLOS_CALLER_IDENTITY`.
///
/// The value is trimmed; when the variable is unset, unreadable, or blank
/// after trimming, the literal `"default"` is returned.
pub(crate) fn resolve_caller_identity() -> String {
    match std::env::var("CELLOS_CALLER_IDENTITY") {
        Ok(raw) if !raw.trim().is_empty() => raw.trim().to_string(),
        _ => "default".to_string(),
    }
}
/// Names the host backend that `CELLOS_CELL_BACKEND` selects.
///
/// Exact (case-sensitive) matches for `firecracker`, `gvisor` and `stub` are
/// echoed back; every other value, including unset, maps to `"proprietary"`.
#[allow(dead_code)]
fn selected_host_backend_kind() -> &'static str {
    const KNOWN_KINDS: [&str; 3] = ["firecracker", "gvisor", "stub"];
    let configured = std::env::var("CELLOS_CELL_BACKEND").unwrap_or_default();
    KNOWN_KINDS
        .iter()
        .copied()
        .find(|kind| *kind == configured.as_str())
        .unwrap_or("proprietary")
}
/// Names the secret broker that `CELLOS_BROKER` selects.
///
/// Exact (case-sensitive) matches for `env`, `file`, `github-oidc` and
/// `vault-approle` are echoed back; every other value, including unset,
/// maps to `"memory"`.
#[allow(dead_code)]
fn selected_secret_broker_kind() -> &'static str {
    const KNOWN_KINDS: [&str; 4] = ["env", "file", "github-oidc", "vault-approle"];
    let configured = std::env::var("CELLOS_BROKER").unwrap_or_default();
    KNOWN_KINDS
        .iter()
        .copied()
        .find(|kind| *kind == configured.as_str())
        .unwrap_or("memory")
}
/// Names the default export sink implied by the environment.
///
/// `"http"` when a non-blank `CELLOS_EXPORT_HTTP_BASE_URL` is present,
/// otherwise `"local"` when `CELLOS_EXPORT_DIR` is readable, otherwise `"noop"`.
#[allow(dead_code)]
fn selected_default_export_sink_kind() -> &'static str {
    let http_configured = scoped_env("CELLOS_EXPORT_HTTP_BASE_URL", None).is_some();
    if http_configured {
        return "http";
    }
    match std::env::var("CELLOS_EXPORT_DIR") {
        Ok(_) => "local",
        Err(_) => "noop",
    }
}
#[allow(dead_code)]
pub(crate) fn build_validation_summary(
doc: &ExecutionCellDocument,
run_id: &str,
raw_spec_hash: &str,
spec_path: &Path,
) -> serde_json::Value {
let named_targets = doc
.spec
.export
.as_ref()
.and_then(|e| e.targets.as_ref())
.map(|targets| {
targets
.iter()
.map(|target| match target {
ExportTarget::Http(http) => json!({
"name": http.name,
"kind": "http",
}),
ExportTarget::S3(s3) => json!({
"name": s3.name,
"kind": "s3",
}),
})
.collect::<Vec<_>>()
})
.unwrap_or_default();
json!({
"mode": "validate",
"specPath": spec_path.display().to_string(),
"specId": doc.spec.id,
"runId": run_id,
"specSha256": raw_spec_hash,
"deploymentProfile": std::env::var("CELLOS_DEPLOYMENT_PROFILE").unwrap_or_else(|_| "hardened".to_string()),
"hostBackend": selected_host_backend_kind(),
"secretBroker": selected_secret_broker_kind(),
"eventSink": {
"primary": if std::env::var("CELL_OS_USE_NOOP_SINK").is_ok() { "noop" } else { "jetstream-or-noop-fallback" },
"subject": resolve_event_subject(
&doc.spec.id,
run_id,
doc.spec.correlation.as_ref().and_then(|c| c.tenant_id.as_deref()),
),
"jsonlMirrorEnabled": std::env::var("CELL_OS_JSONL_EVENTS").is_ok(),
},
"export": {
"defaultSink": selected_default_export_sink_kind(),
"namedTargets": named_targets,
},
"policyPackConfigured": std::env::var("CELLOS_POLICY_PACK_PATH").ok().filter(|v| !v.trim().is_empty()).is_some(),
"authorityKeysConfigured": std::env::var("CELLOS_AUTHORITY_KEYS_PATH").ok().filter(|v| !v.trim().is_empty()).is_some(),
})
}
/// Applies the `hardened` deployment profile when `CELLOS_DEPLOYMENT_PROFILE`
/// explicitly names it (case-insensitive, surrounding whitespace ignored).
///
/// Under the hardened profile this function:
/// * fails unless `CELLOS_AUTHORITY_KEYS_PATH` is present in the environment;
/// * sets `CELL_OS_REQUIRE_JETSTREAM`, `CELLOS_REQUIRE_AUTHORITY_DERIVATION`,
///   `CELLOS_REQUIRE_TELEMETRY_DECLARED` and `CELLOS_STRICT_CONFIG` to `1`
///   when they are not already readable from the environment;
/// * forces `CELLOS_REQUIRE_SCOPED_DERIVATION_TOKENS` to `1` when it is unset
///   or explicitly permissive (`0`/`false`/`no`/`off`).
///
/// For any other profile value (including unset/blank) nothing is changed.
///
/// # Errors
/// Returns an error when the hardened profile is selected and
/// `CELLOS_AUTHORITY_KEYS_PATH` is not set.
pub(crate) fn enforce_deployment_profile() -> anyhow::Result<()> {
    let profile = std::env::var("CELLOS_DEPLOYMENT_PROFILE").unwrap_or_default();
    // Trim before comparing: `resolve_deployment_mode` trims the same variable,
    // so a value such as "hardened\n" must not resolve to Hardened there while
    // silently skipping the hardened flags here.
    // NOTE(review): an unset/blank profile is still a no-op here even though
    // `resolve_deployment_mode` maps it to Hardened — confirm that is intended.
    if !profile.trim().eq_ignore_ascii_case("hardened") {
        return Ok(());
    }

    if std::env::var_os("CELLOS_AUTHORITY_KEYS_PATH").is_none() {
        return Err(anyhow::anyhow!(
            "CELLOS_DEPLOYMENT_PROFILE=hardened requires CELLOS_AUTHORITY_KEYS_PATH to be set"
        ));
    }

    if std::env::var("CELL_OS_REQUIRE_JETSTREAM").is_err() {
        std::env::set_var("CELL_OS_REQUIRE_JETSTREAM", "1");
        tracing::info!(
            target: "cellos.supervisor.profile",
            "hardened profile: CELL_OS_REQUIRE_JETSTREAM auto-enabled"
        );
    }

    // Scoped derivation tokens: unlike the other flags, an explicit permissive
    // value is overridden rather than respected.
    let scoped_value = std::env::var("CELLOS_REQUIRE_SCOPED_DERIVATION_TOKENS")
        .ok()
        .map(|v| v.trim().to_ascii_lowercase());
    let is_explicit_permissive = matches!(
        scoped_value.as_deref(),
        Some("0") | Some("false") | Some("no") | Some("off")
    );
    if scoped_value.is_none() || is_explicit_permissive {
        std::env::set_var("CELLOS_REQUIRE_SCOPED_DERIVATION_TOKENS", "1");
        if is_explicit_permissive {
            tracing::warn!(
                target: "cellos.supervisor.profile",
                "hardened profile: CELLOS_REQUIRE_SCOPED_DERIVATION_TOKENS permissive opt-out OVERRIDDEN — strict mode required under hardened"
            );
        } else {
            tracing::info!(
                target: "cellos.supervisor.profile",
                "hardened profile: CELLOS_REQUIRE_SCOPED_DERIVATION_TOKENS auto-enabled"
            );
        }
    }

    if std::env::var("CELLOS_REQUIRE_AUTHORITY_DERIVATION").is_err() {
        std::env::set_var("CELLOS_REQUIRE_AUTHORITY_DERIVATION", "1");
        tracing::info!(
            target: "cellos.supervisor.profile",
            "hardened profile: CELLOS_REQUIRE_AUTHORITY_DERIVATION auto-enabled"
        );
    }

    if std::env::var("CELLOS_REQUIRE_TELEMETRY_DECLARED").is_err() {
        std::env::set_var("CELLOS_REQUIRE_TELEMETRY_DECLARED", "1");
        tracing::info!(
            target: "cellos.supervisor.profile",
            "hardened profile: CELLOS_REQUIRE_TELEMETRY_DECLARED auto-enabled"
        );
    }

    if std::env::var("CELLOS_STRICT_CONFIG").is_err() {
        std::env::set_var("CELLOS_STRICT_CONFIG", "1");
        tracing::info!(
            target: "cellos.supervisor.profile",
            "hardened profile: CELLOS_STRICT_CONFIG auto-enabled (env-var typos fail-loud)"
        );
    }

    tracing::info!(
        target: "cellos.supervisor.profile",
        profile = "hardened",
        "deployment profile enforced: authority keys required, JetStream fail-closed, scoped derivation tokens required, authority derivation required for specs declaring egress or secrets, telemetry block required, strict config fail-closed"
    );
    Ok(())
}
/// Deployment posture the supervisor starts in.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum DeploymentMode {
    /// Isolation primitives are required; startup fails without them.
    Hardened,
    /// Isolation is not enforced.
    Portable,
}

impl DeploymentMode {
    /// Capitalised label for this mode (`"Hardened"` / `"Portable"`).
    fn as_str(self) -> &'static str {
        match self {
            Self::Portable => "Portable",
            Self::Hardened => "Hardened",
        }
    }
}
/// Maps `CELLOS_DEPLOYMENT_PROFILE` to a [`DeploymentMode`].
///
/// The value is trimmed and lower-cased first. Unset, blank and `hardened`
/// resolve to `Hardened`; `portable` resolves to `Portable`.
///
/// # Errors
/// Any other value is rejected with an error naming the offending profile.
pub(crate) fn resolve_deployment_mode() -> anyhow::Result<DeploymentMode> {
    let normalised = std::env::var("CELLOS_DEPLOYMENT_PROFILE")
        .map(|raw| raw.trim().to_ascii_lowercase())
        .unwrap_or_default();

    if normalised.is_empty() || normalised == "hardened" {
        return Ok(DeploymentMode::Hardened);
    }
    if normalised == "portable" {
        return Ok(DeploymentMode::Portable);
    }

    let other = normalised.as_str();
    Err(anyhow::anyhow!(
        "CELLOS_DEPLOYMENT_PROFILE: unknown profile {other:?} — expected 'hardened' or 'portable'"
    ))
}
/// Lists the isolation primitives the host appears to lack.
///
/// A test override from `forced_missing_primitives_from_env` wins when
/// present. Otherwise, on Linux, three probes are run:
/// * `unshare` – `/proc/self/ns/user` must exist;
/// * `cgroup-v2` – `/sys/fs/cgroup/cgroup.controllers` must exist;
/// * `seccomp` – `/proc/self/status` must be readable and contain a line
///   starting with `Seccomp:`.
///
/// On every other OS all three primitives are reported missing.
pub(crate) fn probe_missing_isolation_primitives() -> Vec<&'static str> {
    if let Some(forced) = forced_missing_primitives_from_env() {
        return forced;
    }

    #[cfg(target_os = "linux")]
    let missing: Vec<&'static str> = {
        let mut absent: Vec<&'static str> = Vec::new();
        let path_exists = |p: &str| std::path::Path::new(p).exists();

        if !path_exists("/proc/self/ns/user") {
            absent.push("unshare");
        }
        if !path_exists("/sys/fs/cgroup/cgroup.controllers") {
            absent.push("cgroup-v2");
        }
        let seccomp_supported = matches!(
            std::fs::read_to_string("/proc/self/status"),
            Ok(status) if status.lines().any(|line| line.starts_with("Seccomp:"))
        );
        if !seccomp_supported {
            absent.push("seccomp");
        }
        absent
    };

    #[cfg(not(target_os = "linux"))]
    let missing: Vec<&'static str> = vec!["unshare", "seccomp", "cgroup-v2"];

    missing
}
/// Parses the test override `CELLOS_TEST_FORCE_MISSING_ISOLATION_PRIMITIVES`.
///
/// Returns `None` when the variable is unset or blank. Otherwise the value is
/// split on commas; each token is trimmed and lower-cased, then:
/// * empty tokens are skipped;
/// * `all` or `*` immediately yields all three primitives;
/// * `unshare`, `seccomp` and `cgroup-v2` (also `cgroup_v2` / `cgroupv2`) are
///   collected once each, in first-seen order;
/// * unknown tokens are logged at trace level and ignored.
///
/// The result may therefore be `Some` of an empty list.
fn forced_missing_primitives_from_env() -> Option<Vec<&'static str>> {
    let raw = std::env::var("CELLOS_TEST_FORCE_MISSING_ISOLATION_PRIMITIVES").ok()?;
    if raw.trim().is_empty() {
        return None;
    }

    let mut forced: Vec<&'static str> = Vec::new();
    let tokens = raw
        .trim()
        .split(',')
        .map(|piece| piece.trim().to_ascii_lowercase());
    for token in tokens {
        // Resolve the token to its canonical primitive name (or bail/skip).
        let canonical: &'static str = match token.as_str() {
            "" => continue,
            "all" | "*" => return Some(vec!["unshare", "seccomp", "cgroup-v2"]),
            "unshare" => "unshare",
            "seccomp" => "seccomp",
            "cgroup-v2" | "cgroup_v2" | "cgroupv2" => "cgroup-v2",
            other => {
                tracing::trace!(
                    target: "cellos.supervisor.profile",
                    token = %other,
                    "CELLOS_TEST_FORCE_MISSING_ISOLATION_PRIMITIVES: ignoring unknown token"
                );
                continue;
            }
        };
        if !forced.contains(&canonical) {
            forced.push(canonical);
        }
    }
    Some(forced)
}
/// Enforces (or merely reports on) host isolation primitives for `mode`.
///
/// * `Hardened` – sets `CELLOS_REQUIRE_ISOLATION=1` when the variable is not
///   readable, then fails if any primitive is missing.
/// * `Portable` – never fails; logs a warning when primitives are missing and
///   an info line when they are all available.
///
/// # Errors
/// Returns an error in hardened mode when `probe_missing_isolation_primitives`
/// reports at least one missing primitive.
pub(crate) fn enforce_isolation_default(mode: DeploymentMode) -> anyhow::Result<()> {
    let missing = probe_missing_isolation_primitives();
    let host_is_complete = missing.is_empty();

    match mode {
        DeploymentMode::Portable => {
            if host_is_complete {
                tracing::info!(
                    target: "cellos.supervisor.profile",
                    profile = "portable",
                    "portable profile: isolation primitives available but enforcement disabled \
                     by operator opt-out"
                );
            } else {
                tracing::warn!(
                    target: "cellos.supervisor.profile",
                    profile = "portable",
                    missing = ?missing,
                    "portable profile: isolation primitives missing — degraded posture, \
                     CELLOS_REQUIRE_ISOLATION not enforced"
                );
            }
            Ok(())
        }
        DeploymentMode::Hardened => {
            // The flag is published before the capability check, so it is set
            // even when the check below fails.
            if std::env::var("CELLOS_REQUIRE_ISOLATION").is_err() {
                std::env::set_var("CELLOS_REQUIRE_ISOLATION", "1");
            }
            if host_is_complete {
                return Ok(());
            }
            tracing::error!(
                target: "cellos.supervisor.profile",
                event = "host_capability_check_failed",
                profile = "hardened",
                missing = ?missing,
                "hardened profile: host is missing isolation primitives — refusing to start"
            );
            Err(anyhow::anyhow!(
                "CELLOS_DEPLOYMENT_PROFILE=hardened (1.0 default): host is missing isolation \
                 primitives: {missing:?}. Set CELLOS_DEPLOYMENT_PROFILE=portable to run \
                 without isolation enforcement (audit-trail-only mode)."
            ))
        }
    }
}
/// Logs a single info-level startup banner describing the active profile.
pub(crate) fn emit_startup_banner(mode: DeploymentMode) {
    let label = match mode {
        DeploymentMode::Hardened => "Hardened",
        DeploymentMode::Portable => "Portable",
    };
    let summary = match mode {
        DeploymentMode::Hardened => {
            "isolation enforced (unshare/seccomp/cgroup v2 required) — 1.0 default"
        }
        DeploymentMode::Portable => {
            "isolation NOT enforced — explicit operator opt-out via CELLOS_DEPLOYMENT_PROFILE=portable"
        }
    };
    tracing::info!(
        target: "cellos.supervisor.profile",
        event = "startup_banner",
        profile = mode.as_str(),
        "[cellos-supervisor] startup-profile: {label} ({summary})"
    );
}
/// Rejects specs that declare authority without an authority derivation token.
///
/// Active only when `CELLOS_REQUIRE_AUTHORITY_DERIVATION` is readable from the
/// environment (any value). A spec "declares authority" when it has a
/// non-empty `egress_rules` or a non-empty `secret_refs` list.
///
/// # Errors
/// Returns an error when authority is declared and
/// `authority.authority_derivation` is absent.
pub(crate) fn enforce_authority_derivation_requirement(
    doc: &ExecutionCellDocument,
) -> anyhow::Result<()> {
    if std::env::var("CELLOS_REQUIRE_AUTHORITY_DERIVATION").is_err() {
        return Ok(());
    }

    let authority = &doc.spec.authority;
    // A token satisfies the requirement regardless of what is declared.
    if authority.authority_derivation.is_some() {
        return Ok(());
    }

    let declares_egress = matches!(
        authority.egress_rules.as_ref(),
        Some(rules) if !rules.is_empty()
    );
    let declares_secrets = matches!(
        authority.secret_refs.as_ref(),
        Some(refs) if !refs.is_empty()
    );
    if declares_egress || declares_secrets {
        return Err(anyhow::anyhow!(
            "CELLOS_REQUIRE_AUTHORITY_DERIVATION is set: spec declares authority \
             (egressRules or secretRefs) without an authorityDerivation token — \
             cryptographic authority proof is required"
        ));
    }
    Ok(())
}
/// Requires a `spec.telemetry` block when `CELLOS_REQUIRE_TELEMETRY_DECLARED`
/// is readable from the environment (any value).
///
/// # Errors
/// Returns an error when enforcement is on and `doc.spec.telemetry` is `None`.
pub(crate) fn enforce_telemetry_declared(doc: &ExecutionCellDocument) -> anyhow::Result<()> {
    let enforcement_on = std::env::var("CELLOS_REQUIRE_TELEMETRY_DECLARED").is_ok();
    if enforcement_on && doc.spec.telemetry.is_none() {
        return Err(anyhow::anyhow!(
            "CELLOS_REQUIRE_TELEMETRY_DECLARED is set: spec.telemetry block is required — \
             every admitted cell must declare its telemetry intent (channel, events, \
             agent version) under hardened deployments"
        ));
    }
    Ok(())
}
/// Checks that the spec's requested Kubernetes namespace matches the runner's.
///
/// The check is skipped (returns `Ok`) when `CELLOS_K8S_NAMESPACE` is unset or
/// blank, or when the spec has no placement / no `kubernetes_namespace`.
/// The runner namespace is trimmed before comparison; the spec value is
/// compared as-is.
///
/// # Errors
/// Returns an error naming both namespaces and the spec id on a mismatch.
pub(crate) fn enforce_kubernetes_namespace_placement(
    doc: &ExecutionCellDocument,
) -> anyhow::Result<()> {
    let runner_ns = match std::env::var("CELLOS_K8S_NAMESPACE") {
        Ok(raw) => raw,
        Err(_) => return Ok(()),
    };
    let runner_ns = runner_ns.trim();
    if runner_ns.is_empty() {
        return Ok(());
    }

    let requested_ns = doc
        .spec
        .placement
        .as_ref()
        .and_then(|placement| placement.kubernetes_namespace.as_deref());
    match requested_ns {
        Some(spec_ns) if spec_ns != runner_ns => Err(anyhow::anyhow!(
            "CELLOS_K8S_NAMESPACE={runner_ns}: spec.placement.kubernetesNamespace={spec_ns} \
             does not match — refusing to admit cell {spec_id}",
            spec_id = doc.spec.id,
        )),
        _ => Ok(()),
    }
}
/// Loads the authority verifying keys from the JSON file named by
/// `CELLOS_AUTHORITY_KEYS_PATH`.
///
/// Returns an empty map when the variable is unset. The file must contain a
/// JSON object mapping strings to strings. On Unix the file is opened with
/// `O_NOFOLLOW`.
///
/// # Errors
/// Returns an error when the file cannot be opened/read or is not valid JSON
/// of the expected shape.
pub(crate) fn load_authority_keys() -> anyhow::Result<HashMap<String, String>> {
    let path = match std::env::var_os("CELLOS_AUTHORITY_KEYS_PATH") {
        Some(path) => path,
        None => return Ok(HashMap::new()),
    };
    let shown = path.to_string_lossy();
    // Open and read failures share one message.
    let read_failure = |e: std::io::Error| {
        anyhow::anyhow!("CELLOS_AUTHORITY_KEYS_PATH: cannot read '{}': {e}", shown)
    };

    #[cfg(unix)]
    let raw = {
        use std::io::Read;
        use std::os::unix::fs::OpenOptionsExt;
        let mut contents = String::new();
        std::fs::OpenOptions::new()
            .read(true)
            .custom_flags(libc::O_RDONLY | libc::O_NOFOLLOW)
            .open(&path)
            .map_err(&read_failure)?
            .read_to_string(&mut contents)
            .map_err(&read_failure)?;
        contents
    };
    #[cfg(not(unix))]
    let raw = std::fs::read_to_string(&path).map_err(&read_failure)?;

    let keys: HashMap<String, String> = match serde_json::from_str(&raw) {
        Ok(parsed) => parsed,
        Err(e) => {
            return Err(anyhow::anyhow!(
                "CELLOS_AUTHORITY_KEYS_PATH: JSON parse error in '{}': {e}",
                shown
            ))
        }
    };

    tracing::info!(
        path = %shown,
        role_count = keys.len(),
        "authority verifying keys loaded"
    );
    Ok(keys)
}
/// Composes a fully wired [`Supervisor`] for one execution cell run.
///
/// Steps, in order: build the primary event sink, host backend, secret broker,
/// optional JSONL sink and export sinks; construct the supervisor; load the
/// trust verify keys and the trust keyset (emitting the keyset outcome to the
/// sinks); optionally load and validate the policy pack named by
/// `CELLOS_POLICY_PACK_PATH`; load the authorization policy; finally let the
/// collected config warnings decide whether startup may proceed.
///
/// * `doc` – the execution cell document being run.
/// * `run_id` – run identifier forwarded to the event sink builder.
/// * `authority_keys` – authority verifying keys handed to the supervisor.
///
/// # Errors
/// Propagates any failure from the component builders, key/keyset loading,
/// policy pack reading/parsing/validation, authorization policy loading, or
/// `StartupConfigWarnings::finalize`.
pub(crate) async fn build_supervisor(
doc: &ExecutionCellDocument,
run_id: &str,
authority_keys: Arc<HashMap<String, String>>,
) -> anyhow::Result<Supervisor> {
// The caller identity is only logged in this function.
let caller_identity = resolve_caller_identity();
tracing::debug!(
target: "cellos.supervisor.rbac",
caller_identity = %caller_identity,
"A2-02: resolved caller identity for secretRef admission"
);
// Shared collector: every builder below records ignored/misconfigured env vars into it.
let mut warnings = StartupConfigWarnings::new();
// The event sink is built first because the host backend receives a clone of it.
let event_sink = build_primary_event_sink(doc, run_id, &mut warnings).await?;
let host = build_host_backend(&mut warnings, Some(Arc::clone(&event_sink)))?;
let broker = build_secret_broker(&mut warnings)?;
let jsonl_sink = build_jsonl_sink();
let (default_export_sink, named_export_sinks) = build_export_sinks(doc, &mut warnings)?;
let mut supervisor = Supervisor::new(
host,
broker,
event_sink,
jsonl_sink,
default_export_sink,
named_export_sinks,
authority_keys,
);
// Trust verify keys: the same "required" flag drives both the key load and the keyset load.
let require_trust_verify_keys = trust_verify_keys_required();
let trust_keys =
crate::trust_keyset_load::load_trust_verify_keys_from_env(require_trust_verify_keys)?;
supervisor.trust_verify_keys = Arc::clone(&trust_keys);
let outcome = crate::trust_keyset_load::load_and_verify_trust_keyset_from_env(
trust_keys.as_ref(),
require_trust_verify_keys,
std::time::SystemTime::now(),
)?;
// Report the keyset outcome through the supervisor's event sink and optional JSONL sink.
crate::trust_keyset_load::emit_keyset_outcome(
&outcome,
&supervisor.event_sink,
supervisor.jsonl_sink.as_ref(),
chrono::Utc::now(),
)
.await?;
// Optional policy pack: only attempted when CELLOS_POLICY_PACK_PATH is present.
if let Some(path) = std::env::var_os("CELLOS_POLICY_PACK_PATH") {
// On Unix the file is opened with O_NOFOLLOW (a symlink as the final path component is refused).
#[cfg(unix)]
let raw = {
use std::io::Read;
use std::os::unix::fs::OpenOptionsExt;
let mut opts = std::fs::OpenOptions::new();
opts.read(true);
opts.custom_flags(libc::O_RDONLY | libc::O_NOFOLLOW);
let mut file = opts.open(&path).map_err(|e| {
anyhow::anyhow!(
"CELLOS_POLICY_PACK_PATH: cannot read '{}': {e}",
path.to_string_lossy()
)
})?;
let mut buf = String::new();
file.read_to_string(&mut buf).map_err(|e| {
anyhow::anyhow!(
"CELLOS_POLICY_PACK_PATH: cannot read '{}': {e}",
path.to_string_lossy()
)
})?;
buf
};
#[cfg(not(unix))]
let raw = std::fs::read_to_string(&path).map_err(|e| {
anyhow::anyhow!(
"CELLOS_POLICY_PACK_PATH: cannot read '{}': {e}",
path.to_string_lossy()
)
})?;
let pack_doc: PolicyPackDocument = serde_json::from_str(&raw).map_err(|e| {
anyhow::anyhow!(
"CELLOS_POLICY_PACK_PATH: JSON parse error in '{}': {e}",
path.to_string_lossy()
)
})?;
validate_policy_pack_document(&pack_doc)
.map_err(|e| {
anyhow::anyhow!(
"CELLOS_POLICY_PACK_PATH: invalid policy pack in '{}': {e}",
path.to_string_lossy()
)
})
.or_else(|err| {
// Only a "version older than floor" failure gets a second chance; it is
// recognised by matching the error text, so the wording of that message
// is load-bearing here. Every other validation error is returned as-is.
let msg = err.to_string();
let is_version_floor_err = msg.contains("is older than runtime-supported floor");
if !is_version_floor_err {
return Err(err);
}
// Truthy values of the downgrade env var: 1 / true / yes / on (trimmed, case-insensitive).
let allow_downgrade = std::env::var(cellos_core::POLICY_ALLOW_DOWNGRADE_ENV)
.map(|v| {
let t = v.trim().to_ascii_lowercase();
matches!(t.as_str(), "1" | "true" | "yes" | "on")
})
.unwrap_or(false);
// Re-run only the version compatibility check, now passing the downgrade flag.
cellos_core::check_policy_pack_version_compatibility(
pack_doc.spec.version.as_deref(),
allow_downgrade,
)
.map_err(|e| {
anyhow::anyhow!(
"CELLOS_POLICY_PACK_PATH: invalid policy pack in '{}': {e}",
path.to_string_lossy()
)
})
})?;
tracing::info!(
pack_id = %pack_doc.spec.id,
path = %path.to_string_lossy(),
"policy pack loaded"
);
supervisor.policy_pack = Some(pack_doc.spec);
}
supervisor.authz_policy = load_authz_policy()?;
// Last step: recorded warnings become a hard error when strict config mode is enabled.
warnings.finalize()?;
Ok(supervisor)
}
/// Loads and validates the authorization policy named by
/// `CELLOS_AUTHZ_POLICY_PATH`.
///
/// Returns `Ok(None)` when the variable is unset. On Unix the file is opened
/// with `O_NOFOLLOW`. The parsed document is validated with
/// `validate_authorization_policy` and its `spec` is returned.
///
/// # Errors
/// Returns an error when the file cannot be opened/read, is not valid JSON
/// for an `AuthorizationPolicyDocument`, or fails validation.
pub(crate) fn load_authz_policy() -> anyhow::Result<Option<AuthorizationPolicy>> {
    let path = match std::env::var_os("CELLOS_AUTHZ_POLICY_PATH") {
        Some(path) => path,
        None => return Ok(None),
    };
    let shown = path.to_string_lossy();
    // Open and read failures share one message.
    let read_failure = |e: std::io::Error| {
        anyhow::anyhow!("CELLOS_AUTHZ_POLICY_PATH: cannot read '{}': {e}", shown)
    };

    #[cfg(unix)]
    let raw = {
        use std::io::Read;
        use std::os::unix::fs::OpenOptionsExt;
        let mut contents = String::new();
        std::fs::OpenOptions::new()
            .read(true)
            .custom_flags(libc::O_RDONLY | libc::O_NOFOLLOW)
            .open(&path)
            .map_err(&read_failure)?
            .read_to_string(&mut contents)
            .map_err(&read_failure)?;
        contents
    };
    #[cfg(not(unix))]
    let raw = std::fs::read_to_string(&path).map_err(&read_failure)?;

    let policy_doc: AuthorizationPolicyDocument = match serde_json::from_str(&raw) {
        Ok(parsed) => parsed,
        Err(e) => {
            return Err(anyhow::anyhow!(
                "CELLOS_AUTHZ_POLICY_PATH: JSON parse error in '{}': {e}",
                shown
            ))
        }
    };
    if let Err(e) = validate_authorization_policy(&policy_doc) {
        return Err(anyhow::anyhow!(
            "CELLOS_AUTHZ_POLICY_PATH: invalid authorization policy in '{}': {e}",
            shown
        ));
    }

    tracing::info!(
        target: "cellos.supervisor.rbac",
        path = %shown,
        subjects = policy_doc.spec.subjects.len(),
        allowed_pools = policy_doc.spec.allowed_pools.len(),
        allowed_packs = policy_doc.spec.allowed_policy_packs.len(),
        max_cells_per_hour = ?policy_doc.spec.max_cells_per_hour,
        "T12: authorization policy loaded"
    );
    Ok(Some(policy_doc.spec))
}
/// Reports whether `CELLOS_REQUIRE_TRUST_VERIFY_KEYS` holds a truthy value
/// (`1`, `true`, `yes` or `on`, trimmed and case-insensitive).
///
/// Unset, unreadable or any other value yields `false`.
fn trust_verify_keys_required() -> bool {
    let Ok(raw) = std::env::var("CELLOS_REQUIRE_TRUST_VERIFY_KEYS") else {
        return false;
    };
    matches!(
        raw.trim().to_ascii_lowercase().as_str(),
        "1" | "true" | "yes" | "on"
    )
}
/// Selects and constructs the host cell backend named by `CELLOS_CELL_BACKEND`.
///
/// Recognised values (exact match): `firecracker`, `stub`, `gvisor`. An unset
/// or empty value selects the proprietary backend; any other value also falls
/// back to the proprietary backend but is recorded in `warnings`.
///
/// * `warnings` – collector for ignored/unrecognised values.
/// * `event_sink` – optional sink; only the firecracker arm attaches it to the backend.
///
/// # Errors
/// Returns an error when the firecracker backend fails to initialise, or when
/// `gvisor` is requested on a non-Linux build.
fn build_host_backend(
warnings: &mut StartupConfigWarnings,
event_sink: Option<Arc<dyn EventSink>>,
) -> anyhow::Result<Arc<dyn CellBackend>> {
let raw = std::env::var("CELLOS_CELL_BACKEND").unwrap_or_default();
let backend: Arc<dyn CellBackend> = match raw.as_str() {
"firecracker" => {
let mut backend = FirecrackerCellBackend::from_env()
.map_err(|e| anyhow::anyhow!("firecracker backend init: {e}"))?;
if let Some(sink) = event_sink.clone() {
backend = backend.with_event_sink(sink);
}
tracing::info!(
target: "cellos.supervisor.host",
backend_kind = "firecracker",
jailer_configured = backend.config().jailer_binary_path.is_some(),
chroot_base = %backend.config().chroot_base_dir.display(),
"host backend selected"
);
// Linux only: optionally kick off a background warm-pool fill before returning the backend.
#[cfg(target_os = "linux")]
{
let backend_for_fill: Arc<FirecrackerCellBackend> = Arc::new(backend);
// NOTE(review): a non-numeric pool size is silently treated as 0 here and is
// not recorded in `warnings` — confirm whether it should be.
let pool_size_env = std::env::var(cellos_host_firecracker::pool::POOL_SIZE_ENV)
.ok()
.and_then(|v| v.trim().parse::<usize>().ok())
.unwrap_or(0);
if pool_size_env > 0 {
let bg = Arc::clone(&backend_for_fill);
// The task handle is dropped, so the fill runs detached from this function.
tokio::spawn(async move {
tracing::info!(
target: "cellos.supervisor.host",
pool_size = pool_size_env,
"warm pool fill: starting background fill task"
);
bg.fill_pool().await;
tracing::info!(
target: "cellos.supervisor.host",
available = bg.pool_available().await,
"warm pool fill: background fill task complete"
);
});
}
backend_for_fill as Arc<dyn CellBackend>
}
#[cfg(not(target_os = "linux"))]
{
Arc::new(backend) as Arc<dyn CellBackend>
}
}
"stub" => {
tracing::info!(
target: "cellos.supervisor.host",
backend_kind = "stub",
"host backend selected"
);
Arc::new(StubCellBackend)
}
"gvisor" => {
// The gVisor backend type is only referenced on Linux builds.
#[cfg(target_os = "linux")]
{
use cellos_host_gvisor::GVisorCellBackend;
tracing::info!(
target: "cellos.supervisor.host",
backend_kind = "gvisor",
"host backend selected"
);
Arc::new(GVisorCellBackend::new()) as Arc<dyn CellBackend>
}
#[cfg(not(target_os = "linux"))]
{
anyhow::bail!(
"CELLOS_CELL_BACKEND=gvisor requires Linux (runsc is Linux-only); \
unset the variable or choose `stub` on non-Linux hosts"
);
}
}
// Unset/empty: the proprietary backend is the default and is not a warning.
"" => {
tracing::info!(
target: "cellos.supervisor.host",
backend_kind = "proprietary",
"host backend selected"
);
Arc::new(ProprietaryCellBackend::new())
}
// Unrecognised value: same fallback, but recorded so strict config mode can reject it.
_ => {
warnings.record(
"CELLOS_CELL_BACKEND",
raw.clone(),
"value didn't match enum {firecracker, gvisor, stub}; falling back to proprietary backend",
);
tracing::info!(
target: "cellos.supervisor.host",
backend_kind = "proprietary",
"host backend selected"
);
Arc::new(ProprietaryCellBackend::new())
}
};
Ok(backend)
}
/// Selects and constructs the secret broker named by `CELLOS_BROKER`.
///
/// Recognised values (exact match): `env`, `file`, `github-oidc`,
/// `vault-approle`. Unset or empty selects the in-memory broker; any other
/// value also falls back to the in-memory broker but is recorded in `warnings`.
///
/// # Errors
/// Returns an error when the `github-oidc` or `vault-approle` broker fails to
/// initialise.
fn build_secret_broker(
    warnings: &mut StartupConfigWarnings,
) -> anyhow::Result<Arc<dyn SecretBroker>> {
    let configured = std::env::var("CELLOS_BROKER").unwrap_or_default();
    let broker: Arc<dyn SecretBroker> = match configured.as_str() {
        "" => Arc::new(MemorySecretBroker::new()),
        "env" => Arc::new(EnvSecretBroker::new()),
        "file" => Arc::new(FileSecretBroker::new()),
        "github-oidc" => {
            let oidc = GithubActionsOidcBroker::new()
                .map_err(|e| anyhow::anyhow!("github-oidc broker init: {e}"))?;
            Arc::new(oidc)
        }
        "vault-approle" => {
            let vault = VaultAppRoleBroker::from_env()
                .map_err(|e| anyhow::anyhow!("vault-approle broker init: {e}"))?;
            Arc::new(vault)
        }
        unrecognised => {
            warnings.record(
                "CELLOS_BROKER",
                unrecognised,
                "value didn't match enum {env, file, github-oidc, vault-approle}; falling back to in-memory broker",
            );
            Arc::new(MemorySecretBroker::new())
        }
    };
    Ok(broker)
}
/// Builds the primary event sink and wraps it in the redacting, signing and
/// DLQ layers.
///
/// Base sink selection:
/// * `CELL_OS_USE_NOOP_SINK` readable → noop sink;
/// * otherwise a JetStream connection to `NATS_URL` (default
///   `nats://127.0.0.1:4222`), optionally using the root CA file from
///   `NATS_CA_FILE`;
/// * if that connection fails: an error when `CELL_OS_REQUIRE_JETSTREAM` is
///   readable, else a noop sink plus a recorded warning.
///
/// Warnings returned by the signing layer are forwarded into `warnings`.
///
/// # Errors
/// Returns an error only when JetStream is required and the connection fails.
async fn build_primary_event_sink(
doc: &ExecutionCellDocument,
run_id: &str,
warnings: &mut StartupConfigWarnings,
) -> anyhow::Result<Arc<dyn EventSink>> {
let event_sink: Arc<dyn EventSink> = if std::env::var("CELL_OS_USE_NOOP_SINK").is_ok() {
tracing::info!(
target: "cellos.supervisor.observability",
sink_kind = "noop",
sink_reason = "CELL_OS_USE_NOOP_SINK",
"primary event sink is noop (explicit env)"
);
Arc::new(NoopEventSink)
} else {
let nats_url = std::env::var("NATS_URL").unwrap_or_else(|_| "nats://127.0.0.1:4222".into());
// Only the redacted form of the URL is used in logs, errors and warnings below.
let nats_url_log = redact_url_credentials_for_logs(&nats_url);
let subject = resolve_event_subject(
&doc.spec.id,
run_id,
doc.spec
.correlation
.as_ref()
.and_then(|c| c.tenant_id.as_deref()),
);
// A blank NATS_CA_FILE is treated the same as an unset one.
let nats_ca_file = std::env::var("NATS_CA_FILE")
.ok()
.filter(|s| !s.trim().is_empty())
.map(std::path::PathBuf::from);
let nats_ca_path = nats_ca_file.as_deref();
match JetStreamEventSink::connect_with_root_ca(&nats_url, subject, nats_ca_path).await {
Ok(sink) => {
tracing::info!(
target: "cellos.supervisor.observability",
sink_kind = "jetstream",
nats_url = %nats_url_log,
"JetStream event sink connected"
);
Arc::new(sink)
}
Err(e) => {
// Fail closed when JetStream is required (any value of the variable counts).
if std::env::var("CELL_OS_REQUIRE_JETSTREAM").is_ok() {
return Err(anyhow::anyhow!(
"CELL_OS_REQUIRE_JETSTREAM is set but JetStream connect failed (broker {nats_url_log}): {e}"
));
}
tracing::warn!(
target: "cellos.supervisor.observability",
sink_kind = "noop",
sink_reason = "jetstream_connect_failed",
error = %e,
nats_url = %nats_url_log,
"JetStream unavailable; using noop event sink"
);
// Recorded so that strict config mode can still refuse to start on this degradation.
warnings.record(
"NATS_URL",
nats_url_log.to_string(),
format!(
"JetStream connect failed and CELL_OS_REQUIRE_JETSTREAM is unset; \
primary event sink degraded to noop ({e})"
),
);
Arc::new(NoopEventSink)
}
}
};
// Layering, innermost first: base sink → redacting → signing → DLQ.
let (signed_sink, signing_warnings) =
SigningEventSink::from_env_with_warnings(RedactingEventSink::from_env(event_sink));
for w in signing_warnings {
warnings.record(w.var, w.value, w.reason);
}
Ok(DlqSink::from_env(signed_sink))
}
/// Builds the optional JSONL mirror sink.
///
/// Returns `None` when `CELL_OS_JSONL_EVENTS` is not readable. Otherwise the
/// variable's value is passed to `JsonlEventSink::new` and the resulting sink
/// is wrapped in the redacting, signing and DLQ layers (innermost first).
fn build_jsonl_sink() -> Option<Arc<dyn EventSink>> {
    let path = std::env::var("CELL_OS_JSONL_EVENTS").ok()?;
    let file_sink = Arc::new(JsonlEventSink::new(path));
    let redacted = RedactingEventSink::from_env(file_sink);
    let signed = SigningEventSink::from_env(redacted);
    let sink: Arc<dyn EventSink> = DlqSink::from_env(signed);
    Some(sink)
}
/// Builds the named export sinks declared by the spec plus the default sink.
///
/// Default sink selection: HTTP when a non-blank `CELLOS_EXPORT_HTTP_BASE_URL`
/// is present, else a local sink rooted at `CELLOS_EXPORT_DIR` when that
/// variable is readable, else a noop sink. The named sinks are built first.
///
/// # Errors
/// Propagates failures from the named-sink builder, the HTTP sink builder, or
/// `LocalExportSink::new`.
fn build_export_sinks(
    doc: &ExecutionCellDocument,
    warnings: &mut StartupConfigWarnings,
) -> anyhow::Result<ExportSinkBundle> {
    let cell_id = doc.spec.id.clone();
    let named = build_named_export_sinks(doc, &cell_id, warnings)?;

    let http_configured = scoped_env("CELLOS_EXPORT_HTTP_BASE_URL", None).is_some();
    let default_sink: Arc<dyn ExportSink> = if http_configured {
        build_http_transport_sink(&cell_id, None, warnings)?
    } else {
        match std::env::var("CELLOS_EXPORT_DIR") {
            Ok(root) => {
                let local = LocalExportSink::new(root, &cell_id)
                    .map_err(|e| anyhow::anyhow!("CELLOS_EXPORT_DIR local sink: {e}"))?;
                Arc::new(local)
            }
            Err(_) => Arc::new(NoopExportSink),
        }
    };

    Ok((default_sink, named))
}
/// Turns an export target name into an environment-variable suffix.
///
/// ASCII letters and digits are upper-cased; every other character (including
/// non-ASCII ones) becomes a single `_`.
fn export_target_env_suffix(target_name: &str) -> String {
    let mut suffix = String::with_capacity(target_name.len());
    for ch in target_name.chars() {
        let mapped = if ch.is_ascii_alphanumeric() {
            ch.to_ascii_uppercase()
        } else {
            '_'
        };
        suffix.push(mapped);
    }
    suffix
}
/// Reads an environment variable with an optional per-target override.
///
/// When `target_name` is given, `{base}__{SUFFIX}` (suffix from
/// `export_target_env_suffix`) is consulted first; if it is unset or blank,
/// `base` itself is consulted. Blank means empty after trimming. The returned
/// value is the raw, untrimmed variable content.
fn scoped_env(base: &str, target_name: Option<&str>) -> Option<String> {
    let read_non_blank = |key: &str| match std::env::var(key) {
        Ok(value) if !value.trim().is_empty() => Some(value),
        _ => None,
    };
    target_name
        .and_then(|name| read_non_blank(&format!("{base}__{}", export_target_env_suffix(name))))
        .or_else(|| read_non_blank(base))
}
/// Resolves a retry-attempt count from `base` (or its per-target override).
///
/// Returns `1` when no non-blank value is configured. Surrounding whitespace
/// in the configured value is ignored before parsing: `scoped_env` returns the
/// raw value, so without trimming a value such as `"3\n"` would be rejected.
///
/// # Errors
/// Returns an error when the value is not a non-negative integer, or is `0`.
fn scoped_retry_attempts(base: &str, target_name: Option<&str>) -> anyhow::Result<usize> {
    let Some(raw) = scoped_env(base, target_name) else {
        return Ok(1);
    };
    let attempts = raw
        .trim()
        .parse::<usize>()
        .map_err(|e| anyhow::anyhow!("{base} must be an integer >= 1: {e}"))?;
    if attempts == 0 {
        return Err(anyhow::anyhow!("{base} must be >= 1"));
    }
    Ok(attempts)
}
/// Resolves a retry backoff (milliseconds) from `base` (or its per-target
/// override).
///
/// Returns `0` when no non-blank value is configured. Surrounding whitespace
/// in the configured value is ignored before parsing: `scoped_env` returns the
/// raw value, so without trimming a value such as `"250\n"` would be rejected.
///
/// # Errors
/// Returns an error when the value is not a non-negative integer.
fn scoped_retry_backoff_ms(base: &str, target_name: Option<&str>) -> anyhow::Result<u64> {
    let Some(raw) = scoped_env(base, target_name) else {
        return Ok(0);
    };
    raw.trim()
        .parse::<u64>()
        .map_err(|e| anyhow::anyhow!("{base} must be an integer >= 0: {e}"))
}
/// Builds an HTTP export sink for the default sink (`target_name == None`) or
/// for a named target.
///
/// Configuration comes from `CELLOS_EXPORT_HTTP_BASE_URL`,
/// `CELLOS_EXPORT_HTTP_BEARER_TOKEN`, `CELLOS_EXPORT_HTTP_MAX_ATTEMPTS` and
/// `CELLOS_EXPORT_HTTP_RETRY_BACKOFF_MS`, each resolved through `scoped_env`.
/// When no base URL is configured the result is a noop sink and a warning is
/// recorded — unless `CELL_OS_REQUIRE_HTTP_EXPORT` is readable, in which case
/// the call fails.
///
/// # Errors
/// Returns an error when HTTP export is required but no URL is configured,
/// when a retry setting fails to parse, or when `HttpExportSink::new` fails.
fn build_http_transport_sink(
    cell_id: &str,
    target_name: Option<&str>,
    warnings: &mut StartupConfigWarnings,
) -> anyhow::Result<Arc<dyn ExportSink>> {
    // Configured case first: build the real sink and return.
    if let Some(base_url) = scoped_env("CELLOS_EXPORT_HTTP_BASE_URL", target_name) {
        let bearer_token = scoped_env("CELLOS_EXPORT_HTTP_BEARER_TOKEN", target_name);
        let max_attempts = scoped_retry_attempts("CELLOS_EXPORT_HTTP_MAX_ATTEMPTS", target_name)?;
        let retry_backoff_ms =
            scoped_retry_backoff_ms("CELLOS_EXPORT_HTTP_RETRY_BACKOFF_MS", target_name)?;
        let sink = HttpExportSink::new(
            base_url,
            cell_id.to_string(),
            bearer_token,
            max_attempts,
            retry_backoff_ms,
        )
        .map_err(|e| anyhow::anyhow!("HTTP export sink init: {e}"))?;
        return Ok(Arc::new(sink));
    }

    // No URL: fail closed when HTTP export is mandatory.
    if std::env::var("CELL_OS_REQUIRE_HTTP_EXPORT").is_ok() {
        let scope = match target_name {
            Some(name) => format!("target {name:?}"),
            None => String::from("default export sink"),
        };
        return Err(anyhow::anyhow!(
            "CELL_OS_REQUIRE_HTTP_EXPORT is set but no HTTP export base URL is configured for {scope}"
        ));
    }

    // Otherwise degrade to a noop sink, leaving a warning behind.
    match target_name {
        Some(name) => {
            tracing::warn!(target = %name, "HTTP export URL not configured; using noop sink");
            warnings.record(
                "CELLOS_EXPORT_HTTP_BASE_URL",
                String::new(),
                format!(
                    "missing companion var for HTTP export target {name:?}; degraded to noop sink"
                ),
            );
        }
        None => {
            tracing::warn!("HTTP export URL not configured; using noop sink");
            warnings.record(
                "CELLOS_EXPORT_HTTP_BASE_URL",
                String::new(),
                "missing companion var for default HTTP export sink; degraded to noop sink",
            );
        }
    }
    Ok(Arc::new(NoopExportSink))
}
/// Builds a presigned-URL S3 export sink for the named target.
///
/// The URL comes from `CELLOS_EXPORT_S3_PRESIGNED_URL`; if absent, the legacy
/// `CELLOS_EXPORT_HTTP_BASE_URL` is accepted and a warning is recorded. With
/// no URL at all the result is a noop sink plus a recorded warning — unless
/// `CELL_OS_REQUIRE_S3_EXPORT` or `CELL_OS_REQUIRE_HTTP_EXPORT` is readable,
/// in which case the call fails. `CELLOS_EXPORT_S3_REGION` overrides the
/// `region` argument. All variables are resolved through `scoped_env` with
/// the target name.
///
/// # Errors
/// Returns an error when S3 export is required but no URL is configured, when
/// a retry setting fails to parse, or when `PresignedS3ExportSink::new` fails.
fn build_s3_transport_sink(
    cell_id: &str,
    target_name: &str,
    bucket: &str,
    key_prefix: Option<String>,
    region: Option<String>,
    warnings: &mut StartupConfigWarnings,
) -> anyhow::Result<Arc<dyn ExportSink>> {
    let scope = Some(target_name);

    // Preferred variable first, then the legacy alias (which leaves a warning).
    let configured_url = match scoped_env("CELLOS_EXPORT_S3_PRESIGNED_URL", scope) {
        Some(url) => Some(url),
        None => {
            let legacy = scoped_env("CELLOS_EXPORT_HTTP_BASE_URL", scope);
            if let Some(url) = &legacy {
                tracing::warn!(
                    target = %target_name,
                    "using legacy CELLOS_EXPORT_HTTP_BASE_URL fallback for S3 target; prefer CELLOS_EXPORT_S3_PRESIGNED_URL"
                );
                warnings.record(
                    "CELLOS_EXPORT_HTTP_BASE_URL",
                    url.clone(),
                    format!(
                        "legacy alias used for S3 target {target_name:?}; prefer CELLOS_EXPORT_S3_PRESIGNED_URL"
                    ),
                );
            }
            legacy
        }
    };

    let presigned_url = match configured_url {
        Some(url) => url,
        None => {
            let export_required = ["CELL_OS_REQUIRE_S3_EXPORT", "CELL_OS_REQUIRE_HTTP_EXPORT"]
                .iter()
                .any(|key| std::env::var(key).is_ok());
            if export_required {
                return Err(anyhow::anyhow!(
                    "S3 export is required but no presigned URL is configured for target {target_name:?}"
                ));
            }
            tracing::warn!(
                target = %target_name,
                "S3 export URL not configured; using noop sink"
            );
            warnings.record(
                "CELLOS_EXPORT_S3_PRESIGNED_URL",
                String::new(),
                format!(
                    "missing companion var for S3 export target {target_name:?}; degraded to noop sink"
                ),
            );
            return Ok(Arc::new(NoopExportSink));
        }
    };

    // Environment region wins over the region declared in the spec.
    let effective_region = scoped_env("CELLOS_EXPORT_S3_REGION", scope).or(region);
    if let Some(r) = effective_region.as_deref() {
        tracing::info!(target = %target_name, region = %r, "S3 export target region");
    }

    let max_attempts = scoped_retry_attempts("CELLOS_EXPORT_S3_MAX_ATTEMPTS", scope)?;
    let retry_backoff_ms = scoped_retry_backoff_ms("CELLOS_EXPORT_S3_RETRY_BACKOFF_MS", scope)?;
    let sink = PresignedS3ExportSink::new(
        presigned_url,
        cell_id.to_string(),
        bucket.to_string(),
        key_prefix,
        Some(target_name.to_string()),
        effective_region,
        max_attempts,
        retry_backoff_ms,
    )
    .map_err(|e| anyhow::anyhow!("S3 export sink init: {e}"))?;
    Ok(Arc::new(sink))
}
/// Builds one export sink per named target declared in `doc.spec.export.targets`.
///
/// Returns an empty map when the spec has no export section or no targets.
/// Sinks are keyed by `target.name()`; a later target with the same name
/// replaces an earlier one.
///
/// # Errors
/// Propagates the first failure from the HTTP or S3 sink builders.
fn build_named_export_sinks(
    doc: &ExecutionCellDocument,
    cell_id: &str,
    warnings: &mut StartupConfigWarnings,
) -> anyhow::Result<HashMap<String, Arc<dyn ExportSink>>> {
    let mut named: HashMap<String, Arc<dyn ExportSink>> = HashMap::new();

    let declared_targets = doc
        .spec
        .export
        .as_ref()
        .and_then(|export| export.targets.as_ref());

    for target in declared_targets.into_iter().flatten() {
        let sink: Arc<dyn ExportSink> = match target {
            ExportTarget::S3(s3) => {
                tracing::info!(
                    target = %s3.name,
                    bucket = %s3.bucket,
                    region = s3.region.as_deref().unwrap_or("(spec unset)"),
                    "configuring S3 export target"
                );
                build_s3_transport_sink(
                    cell_id,
                    &s3.name,
                    &s3.bucket,
                    s3.key_prefix.clone(),
                    s3.region.clone(),
                    warnings,
                )?
            }
            ExportTarget::Http(http) => {
                tracing::info!(target = %http.name, "configuring HTTP export target");
                build_http_transport_sink(cell_id, Some(&http.name), warnings)?
            }
        };
        named.insert(target.name().to_string(), sink);
    }

    Ok(named)
}
#[cfg(test)]
mod tests {
    //! Unit tests for the supervisor's env-driven composition helpers.
    //!
    //! Almost every test here mutates process-global environment variables, so
    //! each one takes [`lock_env`] first and performs its mutations under an
    //! [`EnvSnapshot`], which puts the previous values back when it is dropped —
    //! including when the test panics.

    use super::{
        enforce_authority_derivation_requirement, enforce_deployment_profile,
        enforce_isolation_default, export_target_env_suffix, forced_missing_primitives_from_env,
        probe_missing_isolation_primitives, scoped_env, scoped_retry_attempts,
        scoped_retry_backoff_ms, strict_config_enabled, DeploymentMode, StartupConfigWarnings,
    };
    use cellos_core::{
        AuthorityBundle, AuthorityCapability, AuthorityDerivationToken, AuthoritySignature,
        EgressRule, ExecutionCellDocument, ExecutionCellSpec, Lifetime, RoleId,
    };
    use std::ffi::OsString;
    use std::sync::{Mutex, MutexGuard};

    /// Serialises every test in this module that reads or writes env vars.
    static ENV_MUTEX: Mutex<()> = Mutex::new(());

    /// Every variable the hardened-profile tests set themselves or that they
    /// snapshot because `enforce_deployment_profile` may write to it.
    ///
    /// `CELLOS_STRICT_CONFIG` is included for all of them: the strict-config
    /// tests below show the hardened profile auto-enables it, so it must be
    /// restored even by tests that do not assert on it.
    ///
    /// NOTE(review): `CELL_OS_REQUIRE_JETSTREAM` is spelled with an underscore
    /// after `CELL`, unlike the other `CELLOS_` variables — confirm this matches
    /// the name `enforce_deployment_profile` actually writes.
    const HARDENED_PROFILE_ENV: [&str; 7] = [
        "CELLOS_DEPLOYMENT_PROFILE",
        "CELLOS_AUTHORITY_KEYS_PATH",
        "CELL_OS_REQUIRE_JETSTREAM",
        "CELLOS_REQUIRE_SCOPED_DERIVATION_TOKENS",
        "CELLOS_REQUIRE_AUTHORITY_DERIVATION",
        "CELLOS_REQUIRE_TELEMETRY_DECLARED",
        "CELLOS_STRICT_CONFIG",
    ];

    /// Acquires [`ENV_MUTEX`], recovering the guard if an earlier test panicked
    /// while holding it.
    ///
    /// The mutex protects no data of its own, so a poisoned lock carries no
    /// broken invariant; recovering keeps one failing test from cascading into
    /// `PoisonError` panics in every other env test.
    fn lock_env() -> MutexGuard<'static, ()> {
        ENV_MUTEX
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// RAII snapshot of a set of env vars; restores each one on drop.
    ///
    /// Bind it to a named local (`let _env = ...`), not `_`, and declare it
    /// *after* the [`lock_env`] guard so that it is dropped (and the
    /// environment restored) before the lock is released.
    #[must_use]
    struct EnvSnapshot {
        saved: Vec<(&'static str, Option<OsString>)>,
    }

    impl EnvSnapshot {
        /// Records the current value (or absence) of every variable in `vars`.
        fn capture(vars: &[&'static str]) -> Self {
            Self {
                saved: vars
                    .iter()
                    .map(|&name| (name, std::env::var_os(name)))
                    .collect(),
            }
        }
    }

    impl Drop for EnvSnapshot {
        fn drop(&mut self) {
            for (name, previous) in self.saved.drain(..) {
                match previous {
                    Some(value) => std::env::set_var(name, value),
                    None => std::env::remove_var(name),
                }
            }
        }
    }

    /// Runs `f` with env var `name` set to `value` (or removed when `None`),
    /// restoring the previous state afterwards even if `f` panics.
    fn with_env_var<F: FnOnce() -> R, R>(name: &'static str, value: Option<&str>, f: F) -> R {
        let _restore = EnvSnapshot::capture(&[name]);
        match value {
            Some(v) => std::env::set_var(name, v),
            None => std::env::remove_var(name),
        }
        f()
    }

    #[test]
    fn scoped_env_prefers_target_specific_value() {
        let _guard = lock_env();
        let _env = EnvSnapshot::capture(&[
            "CELLOS_EXPORT_HTTP_BASE_URL",
            "CELLOS_EXPORT_HTTP_BASE_URL__ARTIFACT_BUCKET",
        ]);
        std::env::set_var(
            "CELLOS_EXPORT_HTTP_BASE_URL",
            "https://default.example/upload",
        );
        std::env::set_var(
            "CELLOS_EXPORT_HTTP_BASE_URL__ARTIFACT_BUCKET",
            "https://scoped.example/upload",
        );
        let value = scoped_env("CELLOS_EXPORT_HTTP_BASE_URL", Some("artifact-bucket"));
        assert_eq!(value.as_deref(), Some("https://scoped.example/upload"));
    }

    #[test]
    fn scoped_env_falls_back_to_unsuffixed_value() {
        let _guard = lock_env();
        let _env = EnvSnapshot::capture(&[
            "CELLOS_EXPORT_S3_PRESIGNED_URL",
            "CELLOS_EXPORT_S3_PRESIGNED_URL__ARTIFACT_BUCKET",
        ]);
        std::env::set_var(
            "CELLOS_EXPORT_S3_PRESIGNED_URL",
            "https://default.example/upload",
        );
        std::env::remove_var("CELLOS_EXPORT_S3_PRESIGNED_URL__ARTIFACT_BUCKET");
        let value = scoped_env("CELLOS_EXPORT_S3_PRESIGNED_URL", Some("artifact-bucket"));
        assert_eq!(value.as_deref(), Some("https://default.example/upload"));
    }

    #[test]
    fn export_target_env_suffix_normalizes_name() {
        assert_eq!(
            export_target_env_suffix("artifact-bucket.eu-west-1"),
            "ARTIFACT_BUCKET_EU_WEST_1"
        );
    }

    #[test]
    fn scoped_retry_attempts_prefers_target_specific_value() {
        let _guard = lock_env();
        let _env = EnvSnapshot::capture(&[
            "CELLOS_EXPORT_HTTP_MAX_ATTEMPTS",
            "CELLOS_EXPORT_HTTP_MAX_ATTEMPTS__ARTIFACT_API",
        ]);
        std::env::set_var("CELLOS_EXPORT_HTTP_MAX_ATTEMPTS", "2");
        std::env::set_var("CELLOS_EXPORT_HTTP_MAX_ATTEMPTS__ARTIFACT_API", "4");
        let attempts =
            scoped_retry_attempts("CELLOS_EXPORT_HTTP_MAX_ATTEMPTS", Some("artifact-api"))
                .expect("parse attempts");
        assert_eq!(attempts, 4);
    }

    #[test]
    fn scoped_retry_attempts_rejects_zero() {
        let _guard = lock_env();
        let _env = EnvSnapshot::capture(&[
            "CELLOS_EXPORT_S3_MAX_ATTEMPTS",
            "CELLOS_EXPORT_S3_MAX_ATTEMPTS__ARTIFACT_BUCKET",
        ]);
        std::env::set_var("CELLOS_EXPORT_S3_MAX_ATTEMPTS", "0");
        // An ambient target-scoped value would take precedence over the "0"
        // under test, so make sure none is present.
        std::env::remove_var("CELLOS_EXPORT_S3_MAX_ATTEMPTS__ARTIFACT_BUCKET");
        let err = scoped_retry_attempts("CELLOS_EXPORT_S3_MAX_ATTEMPTS", Some("artifact-bucket"))
            .expect_err("zero attempts should fail");
        assert!(err.to_string().contains("must be >= 1"));
    }

    #[test]
    fn scoped_retry_backoff_defaults_to_zero() {
        let _guard = lock_env();
        let _env = EnvSnapshot::capture(&[
            "CELLOS_EXPORT_HTTP_RETRY_BACKOFF_MS",
            "CELLOS_EXPORT_HTTP_RETRY_BACKOFF_MS__ARTIFACT_API",
        ]);
        // Clear both the shared and the target-scoped variable so the default
        // is what gets exercised regardless of the ambient environment.
        std::env::remove_var("CELLOS_EXPORT_HTTP_RETRY_BACKOFF_MS");
        std::env::remove_var("CELLOS_EXPORT_HTTP_RETRY_BACKOFF_MS__ARTIFACT_API");
        let backoff =
            scoped_retry_backoff_ms("CELLOS_EXPORT_HTTP_RETRY_BACKOFF_MS", Some("artifact-api"))
                .expect("default backoff");
        assert_eq!(backoff, 0);
    }

    #[test]
    fn hardened_profile_requires_authority_keys_path() {
        let _guard = lock_env();
        let _env = EnvSnapshot::capture(&HARDENED_PROFILE_ENV);
        std::env::set_var("CELLOS_DEPLOYMENT_PROFILE", "hardened");
        std::env::remove_var("CELLOS_AUTHORITY_KEYS_PATH");
        let err = enforce_deployment_profile()
            .expect_err("hardened profile without authority keys path must error");
        let msg = err.to_string();
        assert!(
            msg.contains("CELLOS_AUTHORITY_KEYS_PATH"),
            "error message should mention CELLOS_AUTHORITY_KEYS_PATH; got: {msg}"
        );
    }

    #[test]
    fn hardened_profile_auto_enables_require_authority_derivation() {
        let _guard = lock_env();
        let _env = EnvSnapshot::capture(&HARDENED_PROFILE_ENV);
        std::env::set_var("CELLOS_DEPLOYMENT_PROFILE", "hardened");
        std::env::set_var("CELLOS_AUTHORITY_KEYS_PATH", "/dev/null");
        std::env::remove_var("CELLOS_REQUIRE_AUTHORITY_DERIVATION");
        std::env::remove_var("CELLOS_REQUIRE_TELEMETRY_DECLARED");
        let result = enforce_deployment_profile();
        let after = std::env::var("CELLOS_REQUIRE_AUTHORITY_DERIVATION").ok();
        let after_telemetry = std::env::var("CELLOS_REQUIRE_TELEMETRY_DECLARED").ok();
        result.expect("hardened profile with keys path must succeed");
        assert_eq!(
            after.as_deref(),
            Some("1"),
            "hardened profile should auto-enable CELLOS_REQUIRE_AUTHORITY_DERIVATION"
        );
        assert_eq!(
            after_telemetry.as_deref(),
            Some("1"),
            "hardened profile should auto-enable CELLOS_REQUIRE_TELEMETRY_DECLARED"
        );
    }

    #[test]
    fn hardened_profile_auto_enables_strict_config() {
        let _guard = lock_env();
        let _env = EnvSnapshot::capture(&HARDENED_PROFILE_ENV);
        std::env::set_var("CELLOS_DEPLOYMENT_PROFILE", "hardened");
        std::env::set_var("CELLOS_AUTHORITY_KEYS_PATH", "/dev/null");
        std::env::remove_var("CELLOS_STRICT_CONFIG");
        let result = enforce_deployment_profile();
        let after_strict = std::env::var("CELLOS_STRICT_CONFIG").ok();
        result.expect("hardened profile with keys path must succeed");
        assert_eq!(
            after_strict.as_deref(),
            Some("1"),
            "hardened profile must auto-enable CELLOS_STRICT_CONFIG so env-var typos fail loud"
        );
    }

    #[test]
    fn hardened_profile_respects_explicit_strict_config_value() {
        let _guard = lock_env();
        let _env = EnvSnapshot::capture(&HARDENED_PROFILE_ENV);
        std::env::set_var("CELLOS_DEPLOYMENT_PROFILE", "hardened");
        std::env::set_var("CELLOS_AUTHORITY_KEYS_PATH", "/dev/null");
        std::env::set_var("CELLOS_STRICT_CONFIG", "0");
        let result = enforce_deployment_profile();
        let after_strict = std::env::var("CELLOS_STRICT_CONFIG").ok();
        result.expect("hardened profile with keys path must succeed");
        assert_eq!(
            after_strict.as_deref(),
            Some("0"),
            "explicit operator CELLOS_STRICT_CONFIG=0 must be preserved (auto-enable is opt-out)"
        );
    }

    /// Minimal execution-cell document whose only interesting field is the
    /// supplied authority bundle; every optional section is `None`.
    fn doc_with_authority(bundle: AuthorityBundle) -> ExecutionCellDocument {
        ExecutionCellDocument {
            api_version: "cellos.io/v1".into(),
            kind: "ExecutionCell".into(),
            spec: ExecutionCellSpec {
                id: "seam3-test".into(),
                correlation: None,
                ingress: None,
                environment: None,
                placement: None,
                policy: None,
                identity: None,
                run: None,
                authority: bundle,
                lifetime: Lifetime { ttl_seconds: 60 },
                export: None,
                telemetry: None,
            },
        }
    }

    /// Placeholder derivation token with empty capability lists and a dummy
    /// signature, used where only the token's presence matters to the test.
    fn stub_token() -> AuthorityDerivationToken {
        AuthorityDerivationToken {
            role_root: RoleId("role-test".into()),
            parent_run_id: Some("run-seam3".into()),
            derivation_steps: vec![],
            leaf_capability: AuthorityCapability {
                egress_rules: vec![],
                secret_refs: vec![],
            },
            grantor_signature: AuthoritySignature {
                algorithm: "ed25519".into(),
                bytes: "AA==".into(),
            },
        }
    }

    /// The single HTTPS egress rule shared by the authority-derivation tests.
    fn example_egress_rule() -> EgressRule {
        EgressRule {
            host: "api.example.com".into(),
            port: 443,
            protocol: Some("https".into()),
            dns_egress_justification: None,
        }
    }

    /// Runs `f` with `CELLOS_REQUIRE_AUTHORITY_DERIVATION` set to `"1"` when
    /// `set` is true and removed otherwise.
    fn with_require_authority_derivation<F: FnOnce() -> R, R>(set: bool, f: F) -> R {
        with_env_var("CELLOS_REQUIRE_AUTHORITY_DERIVATION", set.then_some("1"), f)
    }

    #[test]
    fn require_authority_derivation_passes_when_flag_unset() {
        let _guard = lock_env();
        let doc = doc_with_authority(AuthorityBundle {
            secret_refs: Some(vec!["MY_SECRET".into()]),
            ..AuthorityBundle::default()
        });
        with_require_authority_derivation(false, || {
            enforce_authority_derivation_requirement(&doc)
                .expect("flag unset → spec without token must pass");
        });
    }

    #[test]
    fn require_authority_derivation_rejects_secret_refs_without_token() {
        let _guard = lock_env();
        let doc = doc_with_authority(AuthorityBundle {
            secret_refs: Some(vec!["MY_SECRET".into()]),
            ..AuthorityBundle::default()
        });
        let err = with_require_authority_derivation(true, || {
            enforce_authority_derivation_requirement(&doc)
                .expect_err("flag set + authority + no token must fail")
        });
        let msg = err.to_string();
        assert!(
            msg.contains("authorityDerivation"),
            "error must mention authorityDerivation; got: {msg}"
        );
    }

    #[test]
    fn require_authority_derivation_rejects_egress_rules_without_token() {
        let _guard = lock_env();
        let doc = doc_with_authority(AuthorityBundle {
            egress_rules: Some(vec![example_egress_rule()]),
            ..AuthorityBundle::default()
        });
        let err = with_require_authority_derivation(true, || {
            enforce_authority_derivation_requirement(&doc)
                .expect_err("flag set + egress + no token must fail")
        });
        let msg = err.to_string();
        assert!(
            msg.contains("authorityDerivation"),
            "error must mention authorityDerivation; got: {msg}"
        );
    }

    #[test]
    fn require_authority_derivation_passes_when_token_present() {
        let _guard = lock_env();
        let doc = doc_with_authority(AuthorityBundle {
            secret_refs: Some(vec!["MY_SECRET".into()]),
            authority_derivation: Some(stub_token()),
            ..AuthorityBundle::default()
        });
        with_require_authority_derivation(true, || {
            enforce_authority_derivation_requirement(&doc)
                .expect("flag set + authority + token must pass");
        });
    }

    #[test]
    fn require_authority_derivation_passes_for_empty_authority() {
        let _guard = lock_env();
        let doc = doc_with_authority(AuthorityBundle::default());
        with_require_authority_derivation(true, || {
            enforce_authority_derivation_requirement(&doc)
                .expect("flag set + empty authority must pass");
        });
    }

    #[test]
    fn require_authority_derivation_passes_for_empty_secret_refs_vec() {
        let _guard = lock_env();
        let doc = doc_with_authority(AuthorityBundle {
            egress_rules: Some(vec![]),
            secret_refs: Some(vec![]),
            ..AuthorityBundle::default()
        });
        with_require_authority_derivation(true, || {
            enforce_authority_derivation_requirement(&doc)
                .expect("flag set + empty (Some(vec![])) authority must pass");
        });
    }

    /// Runs `f` with `CELLOS_TEST_FORCE_MISSING_ISOLATION_PRIMITIVES` set to
    /// `value`, or removed when `value` is `None`.
    fn with_forced_missing<F: FnOnce() -> R, R>(value: Option<&str>, f: F) -> R {
        with_env_var("CELLOS_TEST_FORCE_MISSING_ISOLATION_PRIMITIVES", value, f)
    }

    /// Runs `f` with `CELLOS_REQUIRE_TELEMETRY_DECLARED` set to `"1"` when
    /// `set` is true and removed otherwise.
    fn with_require_telemetry_declared<F: FnOnce() -> R, R>(set: bool, f: F) -> R {
        with_env_var("CELLOS_REQUIRE_TELEMETRY_DECLARED", set.then_some("1"), f)
    }

    /// Runs `f` with `CELLOS_STRICT_CONFIG` set to `value`, or removed when
    /// `value` is `None`.
    fn with_strict_config<F: FnOnce() -> R, R>(value: Option<&str>, f: F) -> R {
        with_env_var("CELLOS_STRICT_CONFIG", value, f)
    }

    #[test]
    fn forced_missing_single_primitive() {
        let _guard = lock_env();
        let parsed = with_forced_missing(Some("unshare"), forced_missing_primitives_from_env);
        assert_eq!(parsed, Some(vec!["unshare"]));
    }

    #[test]
    fn forced_missing_multiple_primitives() {
        let _guard = lock_env();
        let parsed =
            with_forced_missing(Some("unshare,seccomp"), forced_missing_primitives_from_env);
        assert_eq!(parsed, Some(vec!["unshare", "seccomp"]));
    }

    #[test]
    fn forced_missing_all_wildcard_returns_full_set() {
        let _guard = lock_env();
        let via_all = with_forced_missing(Some("all"), forced_missing_primitives_from_env);
        let via_star = with_forced_missing(Some("*"), forced_missing_primitives_from_env);
        let expected = Some(vec!["unshare", "seccomp", "cgroup-v2"]);
        assert_eq!(via_all, expected);
        assert_eq!(via_star, expected);
    }

    #[test]
    fn forced_missing_unknown_token_ignored_known_kept() {
        let _guard = lock_env();
        let parsed = with_forced_missing(
            Some("unshare,bogus-primitive,cgroup-v2"),
            forced_missing_primitives_from_env,
        );
        assert_eq!(parsed, Some(vec!["unshare", "cgroup-v2"]));
    }

    #[test]
    fn forced_missing_unset_returns_none() {
        let _guard = lock_env();
        let parsed = with_forced_missing(None, forced_missing_primitives_from_env);
        assert!(
            parsed.is_none(),
            "unset env must return None, got {parsed:?}"
        );
    }

    #[test]
    fn forced_missing_drives_probe_output() {
        let _guard = lock_env();
        let probed = with_forced_missing(Some("seccomp"), probe_missing_isolation_primitives);
        assert_eq!(probed, vec!["seccomp"]);
    }

    #[test]
    fn enforce_isolation_default_hardened_errors_when_primitives_missing() {
        let _guard = lock_env();
        let _env = EnvSnapshot::capture(&["CELLOS_REQUIRE_ISOLATION"]);
        std::env::remove_var("CELLOS_REQUIRE_ISOLATION");
        let err = with_forced_missing(Some("unshare"), || {
            enforce_isolation_default(DeploymentMode::Hardened)
                .expect_err("hardened + forced-missing must error")
        });
        let msg = err.to_string();
        assert!(
            msg.contains("host is missing isolation primitives"),
            "error must surface the structured-error phrase; got: {msg}"
        );
    }

    #[test]
    fn enforce_isolation_default_portable_warns_but_succeeds_when_primitives_missing() {
        let _guard = lock_env();
        with_forced_missing(Some("all"), || {
            enforce_isolation_default(DeploymentMode::Portable)
                .expect("portable mode must succeed even with missing primitives");
        });
    }

    #[test]
    fn require_telemetry_declared_passes_when_flag_unset() {
        let _guard = lock_env();
        let doc = doc_with_authority(AuthorityBundle::default());
        with_require_telemetry_declared(false, || {
            super::enforce_telemetry_declared(&doc)
                .expect("flag unset → spec without telemetry must pass");
        });
    }

    #[test]
    fn require_telemetry_declared_rejects_spec_without_telemetry() {
        let _guard = lock_env();
        let doc = doc_with_authority(AuthorityBundle::default());
        let err = with_require_telemetry_declared(true, || {
            super::enforce_telemetry_declared(&doc)
                .expect_err("flag set + missing telemetry must fail")
        });
        let msg = err.to_string();
        assert!(
            msg.contains("spec.telemetry"),
            "error must mention spec.telemetry; got: {msg}"
        );
    }

    #[test]
    fn require_telemetry_declared_passes_when_telemetry_present() {
        use cellos_core::{TelemetryChannel, TelemetrySpec};
        let _guard = lock_env();
        let mut doc = doc_with_authority(AuthorityBundle::default());
        doc.spec.telemetry = Some(TelemetrySpec {
            channel: TelemetryChannel::VsockCbor,
            events: vec!["process.spawn".into()],
            rate_limits: None,
            host_vs_guest_fields: None,
            agent_version: "1.0.0".into(),
        });
        with_require_telemetry_declared(true, || {
            super::enforce_telemetry_declared(&doc)
                .expect("flag set + telemetry present must pass");
        });
    }

    #[test]
    fn redteam_d_egress_gate_fires_before_telemetry_gate() {
        let _guard = lock_env();
        let doc = doc_with_authority(AuthorityBundle {
            egress_rules: Some(vec![example_egress_rule()]),
            ..AuthorityBundle::default()
        });
        with_require_authority_derivation(true, || {
            with_require_telemetry_declared(true, || {
                let egress_result = enforce_authority_derivation_requirement(&doc);
                let err = egress_result
                    .expect_err("egress gate must reject before telemetry gate is consulted");
                let msg = err.to_string();
                assert!(
                    msg.contains("authorityDerivation"),
                    "first gate to fail must be the egress / authorityDerivation gate; got: {msg}"
                );
                assert!(
                    !msg.contains("spec.telemetry"),
                    "egress error must NOT mention telemetry — confirms gate ordering; got: {msg}"
                );
            });
        });
    }

    #[test]
    fn redteam_d_telemetry_gate_still_fires_when_egress_gate_passes() {
        let _guard = lock_env();
        let doc = doc_with_authority(AuthorityBundle::default());
        with_require_authority_derivation(true, || {
            with_require_telemetry_declared(true, || {
                enforce_authority_derivation_requirement(&doc)
                    .expect("empty authority bundle must pass egress gate");
                let err = super::enforce_telemetry_declared(&doc)
                    .expect_err("missing telemetry must fail the telemetry gate");
                assert!(err.to_string().contains("spec.telemetry"));
            });
        });
    }

    #[test]
    fn redteam_d_empty_egress_with_non_empty_secrets_still_requires_token() {
        let _guard = lock_env();
        let doc = doc_with_authority(AuthorityBundle {
            egress_rules: Some(vec![]),
            secret_refs: Some(vec!["AWS_ACCESS".into()]),
            ..AuthorityBundle::default()
        });
        with_require_authority_derivation(true, || {
            let err = enforce_authority_derivation_requirement(&doc)
                .expect_err("non-empty secret_refs + no token must still fail");
            assert!(err.to_string().contains("authorityDerivation"));
        });
    }

    #[test]
    fn redteam_d_non_empty_egress_with_empty_secrets_requires_token() {
        let _guard = lock_env();
        let doc = doc_with_authority(AuthorityBundle {
            egress_rules: Some(vec![example_egress_rule()]),
            secret_refs: Some(vec![]),
            ..AuthorityBundle::default()
        });
        with_require_authority_derivation(true, || {
            let err = enforce_authority_derivation_requirement(&doc)
                .expect_err("non-empty egress_rules + empty secret_refs + no token must fail");
            assert!(err.to_string().contains("authorityDerivation"));
        });
    }

    /// Runs `f` with `CELLOS_K8S_NAMESPACE` set to `value`, or removed when
    /// `value` is `None`.
    fn with_k8s_namespace<F: FnOnce() -> R, R>(value: Option<&str>, f: F) -> R {
        with_env_var("CELLOS_K8S_NAMESPACE", value, f)
    }

    /// Document with an empty authority bundle and, when `namespace` is given,
    /// a placement section that names only that Kubernetes namespace.
    fn doc_with_namespace(namespace: Option<&str>) -> ExecutionCellDocument {
        let mut doc = doc_with_authority(AuthorityBundle::default());
        if let Some(ns) = namespace {
            doc.spec.placement = Some(cellos_core::PlacementSpec {
                pool_id: None,
                kubernetes_namespace: Some(ns.to_string()),
                queue_name: None,
            });
        }
        doc
    }

    #[test]
    fn k8s_namespace_unset_accepts_any_spec() {
        let _guard = lock_env();
        let doc = doc_with_namespace(Some("cellos-prod"));
        with_k8s_namespace(None, || {
            super::enforce_kubernetes_namespace_placement(&doc)
                .expect("unset env must accept any spec");
        });
    }

    #[test]
    fn k8s_namespace_set_matches_spec() {
        let _guard = lock_env();
        let doc = doc_with_namespace(Some("cellos-prod"));
        with_k8s_namespace(Some("cellos-prod"), || {
            super::enforce_kubernetes_namespace_placement(&doc)
                .expect("matching namespace must pass");
        });
    }

    #[test]
    fn k8s_namespace_set_rejects_mismatch() {
        let _guard = lock_env();
        let doc = doc_with_namespace(Some("cellos-staging"));
        let err = with_k8s_namespace(Some("cellos-prod"), || {
            super::enforce_kubernetes_namespace_placement(&doc)
                .expect_err("mismatching namespace must fail")
        });
        let msg = err.to_string();
        assert!(
            msg.contains("cellos-staging") && msg.contains("cellos-prod"),
            "error must mention both namespaces; got: {msg}"
        );
    }

    #[test]
    fn k8s_namespace_set_accepts_spec_without_constraint() {
        let _guard = lock_env();
        let doc = doc_with_namespace(None);
        with_k8s_namespace(Some("cellos-prod"), || {
            super::enforce_kubernetes_namespace_placement(&doc)
                .expect("portable spec must pass on any namespace");
        });
    }

    #[test]
    fn strict_config_enabled_recognises_truthy_values() {
        let _guard = lock_env();
        for v in ["1", "true", "TRUE", "Yes", "on"] {
            let got = with_strict_config(Some(v), strict_config_enabled);
            assert!(got, "value {v:?} should be recognised as truthy");
        }
    }

    #[test]
    fn strict_config_enabled_rejects_other_values() {
        let _guard = lock_env();
        for v in ["0", "false", "no", "off", "", "maybe"] {
            let got = with_strict_config(Some(v), strict_config_enabled);
            assert!(!got, "value {v:?} should NOT be recognised as truthy");
        }
        let got = with_strict_config(None, strict_config_enabled);
        assert!(!got, "unset must be falsy");
    }

    #[test]
    fn warnings_finalize_ok_when_empty_regardless_of_strict() {
        let _guard = lock_env();
        with_strict_config(Some("1"), || {
            let warnings = StartupConfigWarnings::new();
            warnings
                .finalize()
                .expect("empty warnings list must always succeed");
        });
    }

    #[test]
    fn warnings_finalize_ok_when_non_empty_and_strict_unset() {
        let _guard = lock_env();
        with_strict_config(None, || {
            let mut warnings = StartupConfigWarnings::new();
            warnings.record(
                "CELLOS_CELL_BACKEND",
                "firecraker",
                "value didn't match enum",
            );
            warnings
                .finalize()
                .expect("non-empty warnings + strict unset must succeed (WARN-then-continue)");
        });
    }

    #[test]
    fn warnings_finalize_errors_when_non_empty_and_strict_enabled() {
        let _guard = lock_env();
        with_strict_config(Some("1"), || {
            let mut warnings = StartupConfigWarnings::new();
            warnings.record(
                "CELLOS_CELL_BACKEND",
                "firecraker",
                "value didn't match enum {firecracker, gvisor, stub}",
            );
            warnings.record(
                "CELLOS_BROKER",
                "valt-approle",
                "value didn't match enum {env, file, github-oidc, vault-approle}",
            );
            let err = warnings
                .finalize()
                .expect_err("non-empty + strict must fail loud");
            let msg = err.to_string();
            assert!(
                msg.contains("CELLOS_STRICT_CONFIG"),
                "error must reference the strict flag; got: {msg}"
            );
            assert!(
                msg.contains("CELLOS_CELL_BACKEND") && msg.contains("CELLOS_BROKER"),
                "error must list every recorded warning; got: {msg}"
            );
        });
    }
}