use std::time::Duration;
use cellos_core::{
CloudEventV1, ExecutionCellDocument, ExportArtifact, ExportReceiptTargetKind, ExportTarget,
RunSpec, RuntimeSecretLeaseRequest, SecretView, WorkloadIdentity, WorkloadIdentityKind,
};
use crate::runtime_secret::RuntimeSecretEntryInput;
pub(crate) fn dns_authority_refresh_enabled() -> bool {
std::env::var("CELLOS_DNS_AUTHORITY_REFRESH")
.map(|v| {
let t = v.trim().to_ascii_lowercase();
matches!(t.as_str(), "1" | "true" | "yes" | "on")
})
.unwrap_or(false)
}
pub(crate) fn dns_authority_continuous_enabled() -> bool {
std::env::var("CELLOS_DNS_AUTHORITY_CONTINUOUS")
.map(|v| {
let t = v.trim().to_ascii_lowercase();
matches!(t.as_str(), "1" | "true" | "yes" | "on")
})
.unwrap_or(false)
}
pub(crate) fn dns_authority_tick_interval(
policy: Option<&cellos_core::DnsRefreshPolicy>,
) -> std::time::Duration {
let configured: Option<u64> = std::env::var("CELLOS_DNS_AUTHORITY_TICK_INTERVAL_SECS")
.ok()
.and_then(|s| s.trim().parse::<u64>().ok());
let raw_secs = match configured {
Some(s) => s,
None => {
let from_policy = policy
.and_then(|p| p.min_ttl_seconds)
.map(u64::from)
.unwrap_or(60);
from_policy.min(60)
}
};
let clamped = cellos_supervisor::resolver_refresh::ticker::clamp_tick_interval_secs(raw_secs);
std::time::Duration::from_secs(clamped)
}
pub(crate) fn trust_plane_observability_enabled() -> bool {
match std::env::var("CELLOS_TRUST_PLANE_EVENTS") {
Ok(v) => {
let t = v.trim().to_ascii_lowercase();
if t.is_empty() {
return true;
}
!matches!(t.as_str(), "0" | "false" | "no" | "off")
}
Err(_) => true,
}
}
pub(crate) fn dns_proxy_enabled() -> bool {
std::env::var("CELLOS_DNS_PROXY")
.map(|v| {
let t = v.trim().to_ascii_lowercase();
matches!(t.as_str(), "1" | "true" | "yes" | "on")
})
.unwrap_or(false)
}
pub(crate) fn sni_proxy_enabled() -> bool {
std::env::var("CELLOS_SNI_PROXY")
.map(|v| {
let t = v.trim().to_ascii_lowercase();
matches!(t.as_str(), "1" | "true" | "yes" | "on")
})
.unwrap_or(false)
}
const SNI_PROXY_DEFAULT_BIND: &str = "127.0.0.1:8443";
pub(crate) fn sni_proxy_bind_addr() -> std::net::SocketAddr {
let raw = std::env::var("CELLOS_SNI_PROXY_BIND")
.unwrap_or_else(|_| SNI_PROXY_DEFAULT_BIND.to_string());
raw.parse().unwrap_or_else(|e| {
tracing::warn!(
target: "cellos.supervisor.sni_proxy",
error = %e,
raw = %raw,
default = SNI_PROXY_DEFAULT_BIND,
"CELLOS_SNI_PROXY_BIND parse failed — falling back to default"
);
SNI_PROXY_DEFAULT_BIND.parse().expect("default bind parses")
})
}
pub(crate) fn sni_proxy_upstream_override() -> Option<std::net::SocketAddr> {
std::env::var("CELLOS_SNI_PROXY_UPSTREAM_OVERRIDE")
.ok()
.and_then(|raw| raw.trim().parse().ok())
}
pub(crate) fn l7_gate_observability_enabled() -> bool {
std::env::var("CELLOS_L7_GATE_OBSERVABILITY")
.map(|v| {
let t = v.trim().to_ascii_lowercase();
matches!(t.as_str(), "1" | "true" | "yes" | "on")
})
.unwrap_or(false)
}
pub(crate) fn identity_resolve_key(identity: &WorkloadIdentity) -> &str {
match identity.kind {
WorkloadIdentityKind::FederatedOidc => &identity.audience,
}
}
pub(crate) fn effective_export_target_name<'a>(
doc: &'a ExecutionCellDocument,
artifact: &'a ExportArtifact,
) -> Option<&'a str> {
if let Some(target_name) = artifact.target.as_deref() {
return Some(target_name);
}
let targets = doc.spec.export.as_ref()?.targets.as_ref()?;
if targets.len() == 1 {
Some(targets[0].name())
} else {
None
}
}
pub(crate) fn configured_export_target<'a>(
doc: &'a ExecutionCellDocument,
artifact: &'a ExportArtifact,
) -> Option<&'a ExportTarget> {
let target_name = effective_export_target_name(doc, artifact)?;
doc.spec
.export
.as_ref()
.and_then(|export| export.targets.as_ref())
.and_then(|targets| targets.iter().find(|target| target.name() == target_name))
}
pub(crate) fn resolve_export_target_kind(
doc: &ExecutionCellDocument,
artifact: &ExportArtifact,
sink_kind: Option<ExportReceiptTargetKind>,
) -> ExportReceiptTargetKind {
let spec_kind = configured_export_target(doc, artifact).map(|target| match target {
ExportTarget::Http(_) => ExportReceiptTargetKind::Http,
ExportTarget::S3(_) => ExportReceiptTargetKind::S3,
});
spec_kind
.or(sink_kind)
.unwrap_or(ExportReceiptTargetKind::Local)
}
pub(crate) fn logical_export_destination(
doc: &ExecutionCellDocument,
artifact: &ExportArtifact,
sink_destination: Option<&str>,
) -> Option<String> {
match configured_export_target(doc, artifact) {
Some(ExportTarget::S3(target)) => {
let mut key = target
.key_prefix
.as_deref()
.unwrap_or("")
.trim_matches('/')
.to_string();
if !key.is_empty() {
key.push('/');
}
key.push_str(&artifact.name);
Some(format!("s3://{}/{}", target.bucket, key))
}
Some(ExportTarget::Http(target)) => sink_destination
.map(str::to_string)
.or_else(|| Some(target.base_url.clone())),
None => sink_destination.map(str::to_string),
}
}
pub(crate) fn argv0_allow_prefixes_error(argv: &[String]) -> Option<String> {
let raw = std::env::var("CELLOS_RUN_ARGV0_ALLOW_PREFIXES").ok()?;
let trimmed = raw.trim();
if trimmed.is_empty() {
return None;
}
let prefixes: Vec<String> = trimmed
.split(',')
.map(|s| s.trim().trim_end_matches(['/', '\\']).replace('\\', "/"))
.filter(|p| !p.is_empty())
.collect();
if prefixes.is_empty() {
return None;
}
let Some(argv0) = argv.first() else {
return Some("spec.run.argv is empty".into());
};
if argv0.is_empty() {
return Some("spec.run.argv[0] is empty".into());
}
let norm = argv0.replace('\\', "/");
let is_absolute = if cfg!(unix) {
norm.starts_with('/')
} else {
let b = norm.as_bytes();
(b.len() >= 3
&& b[0].is_ascii_alphabetic()
&& b[1] == b':'
&& (b[2] == b'/' || b[2] == b'\\'))
|| norm.starts_with("//")
};
if !is_absolute {
return Some(format!(
"CELLOS_RUN_ARGV0_ALLOW_PREFIXES is set but argv[0] is not an absolute path (got {argv0:?}); use a full path such as /usr/bin/true"
));
}
let ok = prefixes
.iter()
.any(|p| norm == *p || norm.starts_with(&format!("{p}/")));
if !ok {
return Some(format!(
"argv[0] {argv0:?} is not under any allowed prefix from CELLOS_RUN_ARGV0_ALLOW_PREFIXES: {prefixes:?}"
));
}
None
}
pub(crate) fn configured_run_timeout_env() -> Result<Option<Duration>, String> {
match std::env::var("CELLOS_RUN_TIMEOUT_MS") {
Err(_) => Ok(None),
Ok(raw) => {
let t = raw.trim();
if t.is_empty() {
return Ok(None);
}
let ms: u64 = t
.parse()
.map_err(|_| format!("invalid CELLOS_RUN_TIMEOUT_MS: {raw:?}"))?;
if ms == 0 {
return Err("CELLOS_RUN_TIMEOUT_MS must be > 0".into());
}
Ok(Some(Duration::from_millis(ms)))
}
}
}
#[derive(Debug, Clone, Copy)]
pub(crate) enum RunTimeoutSource {
TtlCeiling { ttl_seconds: u64 },
SpecTimeout,
EnvTimeout,
}
pub(crate) fn effective_run_timeout(
run: &RunSpec,
ttl_seconds: u64,
) -> Result<Option<(Duration, RunTimeoutSource)>, String> {
let env_timeout = configured_run_timeout_env()?;
let spec_timeout = run.timeout_ms.map(Duration::from_millis);
let ttl_ceiling = Duration::from_millis(ttl_seconds.saturating_mul(1000));
let combined: Option<(Duration, RunTimeoutSource)> = match (spec_timeout, env_timeout) {
(Some(s), Some(e)) => Some(if s <= e {
(s, RunTimeoutSource::SpecTimeout)
} else {
(e, RunTimeoutSource::EnvTimeout)
}),
(Some(s), None) => Some((s, RunTimeoutSource::SpecTimeout)),
(None, Some(e)) => Some((e, RunTimeoutSource::EnvTimeout)),
(None, None) => None,
};
Ok(Some(match combined {
Some((d, source)) => {
if d <= ttl_ceiling {
(d, source)
} else {
(ttl_ceiling, RunTimeoutSource::TtlCeiling { ttl_seconds })
}
}
None => (ttl_ceiling, RunTimeoutSource::TtlCeiling { ttl_seconds }),
}))
}
pub(crate) fn run_timeout_message(timeout: Duration, source: RunTimeoutSource) -> String {
match source {
RunTimeoutSource::TtlCeiling { ttl_seconds } => format!(
"cell killed by TTL watchdog after {} ms (ttl_seconds={ttl_seconds})",
timeout.as_millis()
),
RunTimeoutSource::SpecTimeout => format!(
"command timed out after {} ms (spec.run.timeoutMs)",
timeout.as_millis()
),
RunTimeoutSource::EnvTimeout => format!(
"command timed out after {} ms (CELLOS_RUN_TIMEOUT_MS)",
timeout.as_millis()
),
}
}
pub(crate) fn secret_ttl_for_key(doc: &ExecutionCellDocument, key: &str) -> u64 {
if let Some(identity) = &doc.spec.identity {
if identity.secret_ref == key {
return identity
.ttl_seconds
.unwrap_or(doc.spec.lifetime.ttl_seconds);
}
}
doc.spec.lifetime.ttl_seconds
}
pub(crate) fn runtime_secret_entries_for_doc(
doc: &ExecutionCellDocument,
secrets: &[SecretView],
) -> Vec<RuntimeSecretEntryInput> {
secrets
.iter()
.map(|secret| RuntimeSecretEntryInput {
key: secret.key.clone(),
value: zeroize::Zeroizing::new(secret.value.as_str().to_string()),
ttl_seconds: secret_ttl_for_key(doc, &secret.key),
})
.collect()
}
pub(crate) fn runtime_secret_lease_requests_for_doc(
doc: &ExecutionCellDocument,
) -> Vec<RuntimeSecretLeaseRequest> {
let mut requests = Vec::new();
if let Some(identity) = &doc.spec.identity {
requests.push(RuntimeSecretLeaseRequest {
key: identity.secret_ref.clone(),
ttl_seconds: identity
.ttl_seconds
.unwrap_or(doc.spec.lifetime.ttl_seconds),
});
}
if let Some(secret_refs) = &doc.spec.authority.secret_refs {
for key in secret_refs {
if doc
.spec
.identity
.as_ref()
.is_some_and(|identity| identity.secret_ref == *key)
{
continue;
}
requests.push(RuntimeSecretLeaseRequest {
key: key.clone(),
ttl_seconds: secret_ttl_for_key(doc, key),
});
}
}
requests
}
pub(crate) fn configured_broker_name() -> &'static str {
match std::env::var("CELLOS_BROKER").unwrap_or_default().as_str() {
"env" => "env",
"file" => "file",
"github-oidc" => "github-oidc",
"vault-approle" => "vault-approle",
_ => "memory",
}
}
pub(crate) fn cloud_event(ty: impl Into<String>, data: serde_json::Value) -> CloudEventV1 {
CloudEventV1 {
specversion: "1.0".into(),
id: uuid::Uuid::new_v4().to_string(),
source: "cellos-supervisor".into(),
ty: ty.into(),
datacontenttype: Some("application/json".into()),
data: Some(data),
time: Some(chrono::Utc::now().to_rfc3339()),
traceparent: None,
}
}
#[cfg(test)]
mod tests {
use super::{
effective_export_target_name, effective_run_timeout, logical_export_destination,
RunTimeoutSource,
};
use cellos_core::types::SecretDeliveryMode;
use cellos_core::{
AuthorityBundle, ExecutionCellDocument, ExecutionCellSpec, ExportArtifact, ExportChannels,
ExportTarget, HttpExportTarget, Lifetime, RunSpec, S3ExportTarget,
};
use std::time::Duration;
fn run_with_timeout(timeout_ms: Option<u64>) -> RunSpec {
RunSpec {
argv: vec!["/usr/bin/true".into()],
working_directory: None,
timeout_ms,
limits: None,
secret_delivery: SecretDeliveryMode::Env,
}
}
#[test]
fn effective_run_timeout_falls_back_to_ttl_ceiling() {
std::env::remove_var("CELLOS_RUN_TIMEOUT_MS");
let run = run_with_timeout(None);
let (got, source) = effective_run_timeout(&run, 60).expect("ok").expect("some");
assert_eq!(got, Duration::from_secs(60));
assert!(matches!(
source,
RunTimeoutSource::TtlCeiling { ttl_seconds: 60 }
));
}
#[test]
fn effective_run_timeout_prefers_spec_when_below_ttl() {
std::env::remove_var("CELLOS_RUN_TIMEOUT_MS");
let run = run_with_timeout(Some(5_000));
let (got, source) = effective_run_timeout(&run, 60).expect("ok").expect("some");
assert_eq!(got, Duration::from_millis(5_000));
assert!(matches!(source, RunTimeoutSource::SpecTimeout));
}
#[test]
fn effective_run_timeout_caps_at_ttl_ceiling() {
std::env::remove_var("CELLOS_RUN_TIMEOUT_MS");
let run = run_with_timeout(Some(120_000));
let (got, source) = effective_run_timeout(&run, 60).expect("ok").expect("some");
assert_eq!(got, Duration::from_secs(60));
assert!(matches!(
source,
RunTimeoutSource::TtlCeiling { ttl_seconds: 60 }
));
}
fn doc_with_targets(targets: Vec<ExportTarget>) -> ExecutionCellDocument {
ExecutionCellDocument {
api_version: "cellos.io/v1".into(),
kind: "ExecutionCell".into(),
spec: ExecutionCellSpec {
id: "cell-1".into(),
correlation: None,
ingress: None,
environment: None,
placement: None,
policy: None,
identity: None,
run: None,
authority: AuthorityBundle::default(),
lifetime: Lifetime { ttl_seconds: 60 },
export: Some(ExportChannels {
artifacts: None,
targets: Some(targets),
}),
telemetry: None,
},
}
}
#[test]
fn infers_single_target_name_when_artifact_target_missing() {
let doc = doc_with_targets(vec![ExportTarget::Http(HttpExportTarget {
name: "artifact-api".into(),
base_url: "https://example.com/upload".into(),
secret_ref: None,
})]);
let artifact = ExportArtifact {
name: "coverage-summary".into(),
path: "/tmp/coverage.txt".into(),
target: None,
content_type: None,
};
assert_eq!(
effective_export_target_name(&doc, &artifact),
Some("artifact-api")
);
}
#[test]
fn computes_logical_s3_destination_from_target_metadata() {
let doc = doc_with_targets(vec![ExportTarget::S3(S3ExportTarget {
name: "artifact-bucket".into(),
bucket: "acme-cellos-artifacts".into(),
key_prefix: Some("github/acme/widget/123456790".into()),
region: Some("eu-west-1".into()),
secret_ref: None,
})]);
let artifact = ExportArtifact {
name: "test-results".into(),
path: "/tmp/junit.xml".into(),
target: None,
content_type: None,
};
assert_eq!(
logical_export_destination(&doc, &artifact, Some("https://ignored.example/upload")),
Some("s3://acme-cellos-artifacts/github/acme/widget/123456790/test-results".into())
);
}
}