use std::time::{Duration, Instant};
use cellos_core::ports::CellHandle;
use cellos_core::{EgressRule, RunSpec, SecretView};
use crate::runtime_secret::RuntimeSecretSession;
use crate::supervisor::{DnsProxyActivation, SniProxyActivation};
use crate::supervisor_helpers::{effective_run_timeout, run_timeout_message, RunTimeoutSource};
/// Runs the cell's command on a unix supervisor host.
///
/// The effective timeout is resolved first via `effective_run_timeout`; if that
/// fails, the error message is returned without spawning anything.
///
/// On Linux, when a cgroup attach, unshare flags, or a seccomp program is
/// requested, the whole run is delegated to
/// `linux_isolation::linux_run_cell_command_isolated` and its result is returned
/// unchanged. Otherwise (and on every non-Linux unix) the command is executed
/// through [`run_cell_command_tokio`].
///
/// # Returns
///
/// A 7-tuple whose first three fields are the exit code (`-1` on failure), the
/// run duration in milliseconds, and an optional error message. On the
/// non-isolated path the remaining four fields are always `None`, `None`,
/// `None` and an empty `Vec`; they can only be populated by the isolated Linux
/// runner.
#[cfg(unix)]
#[allow(clippy::too_many_arguments)]
pub(crate) async fn run_cell_command(
    run: &RunSpec,
    ttl_seconds: u64,
    handle: &CellHandle,
    egress_rules: &[EgressRule],
    dns_authority: Option<&cellos_core::DnsAuthority>,
    secrets: &[SecretView],
    runtime_secret_session: Option<&RuntimeSecretSession>,
    dns_proxy_activation: Option<DnsProxyActivation>,
    dns_proxy_emitter: Option<std::sync::Arc<dyn crate::dns_proxy::DnsQueryEmitter>>,
    sni_proxy_activation: Option<SniProxyActivation>,
    sni_proxy_emitter: Option<std::sync::Arc<dyn crate::sni_proxy::L7DecisionEmitter>>,
    per_flow_realtime: Option<(std::sync::Arc<dyn cellos_core::ports::EventSink>, String)>,
    flow_accumulator: Option<
        std::sync::Arc<std::sync::Mutex<crate::ebpf_flow::connection_tracking::FlowAccumulator>>,
    >,
) -> (
    i32,
    u64,
    Option<String>,
    Option<bool>,
    Option<String>,
    Option<String>,
    Vec<crate::nft_counters::NftCounterRow>,
) {
    // An unresolvable timeout is reported as a failed run with zero duration.
    let run_timeout = match effective_run_timeout(run, ttl_seconds) {
        Err(reason) => return (-1, 0, Some(reason), None, None, None, Vec::new()),
        Ok(resolved) => resolved,
    };

    #[cfg(target_os = "linux")]
    {
        let attach = crate::linux_isolation::linux_cgroup_attach_for_run(handle);
        let unshare = crate::linux_isolation::linux_subprocess_unshare_flags();
        let seccomp_program = match crate::linux_seccomp::load_seccomp_program_from_env() {
            Ok(program) => program,
            Err(load_err) => {
                // A seccomp load failure aborts the run rather than continuing without it.
                let message = format!("seccomp: {load_err}");
                return (-1, 0, Some(message), None, None, None, Vec::new());
            }
        };

        // Any single isolation feature is enough to route through the isolated runner.
        let cgroup_requested =
            !matches!(attach, crate::linux_isolation::LinuxCgroupAttach::None);
        let isolation_requested =
            cgroup_requested || unshare.is_some() || seccomp_program.is_some();

        if isolation_requested {
            return crate::linux_isolation::linux_run_cell_command_isolated(
                run,
                &handle.cell_id,
                attach,
                unshare,
                egress_rules,
                dns_authority.cloned(),
                secrets,
                runtime_secret_session,
                run_timeout,
                seccomp_program,
                dns_proxy_activation,
                dns_proxy_emitter,
                sni_proxy_activation,
                sni_proxy_emitter,
                per_flow_realtime,
                flow_accumulator,
            )
            .await;
        }
    }

    #[cfg(not(target_os = "linux"))]
    {
        crate::linux_isolation::maybe_warn_linux_isolation_env_on_non_linux();

        // These arguments are only consumed by the Linux isolation path; mark them
        // as used so non-Linux unix builds stay free of unused-variable warnings.
        let _ = handle;
        let _ = egress_rules;
        let _ = dns_authority;
        let _ = dns_proxy_activation;
        let _ = dns_proxy_emitter;
        let _ = sni_proxy_activation;
        let _ = sni_proxy_emitter;
        let _ = per_flow_realtime;
        let _ = flow_accumulator;

        let per_flow_events_requested = matches!(
            std::env::var("CELLOS_PER_FLOW_ENFORCEMENT_EVENTS").as_deref(),
            Ok("1")
        );
        if per_flow_events_requested {
            tracing::warn!(
                target: "cellos.supervisor.per_flow_enforcement",
                "CELLOS_PER_FLOW_ENFORCEMENT_EVENTS=1 is set but supervisor host is not Linux — \
                 per-flow network_flow_decision events require nft and are not emitted on this platform"
            );
        }
    }

    // Plain (non-isolated) execution: only the first three result fields carry data.
    let (exit_code, elapsed_ms, failure) =
        run_cell_command_tokio(run, run_timeout, secrets, runtime_secret_session).await;
    (exit_code, elapsed_ms, failure, None, None, None, Vec::new())
}
/// Runs the cell's command on a non-unix supervisor host.
///
/// This variant has no isolation path: every isolation, proxy, and flow-tracking
/// argument is accepted only to keep the signature identical to the unix
/// variant and is otherwise ignored. The command is always executed through
/// [`run_cell_command_tokio`].
///
/// # Returns
///
/// A 7-tuple whose first three fields are the exit code (`-1` on failure), the
/// run duration in milliseconds, and an optional error message. The remaining
/// four fields are always `None`, `None`, `None` and an empty `Vec` here.
#[cfg(not(unix))]
#[allow(clippy::too_many_arguments)]
pub(crate) async fn run_cell_command(
    run: &RunSpec,
    ttl_seconds: u64,
    _handle: &CellHandle,
    _egress_rules: &[EgressRule],
    _dns_authority: Option<&cellos_core::DnsAuthority>,
    secrets: &[SecretView],
    _runtime_secret_session: Option<&RuntimeSecretSession>,
    _dns_proxy_activation: Option<DnsProxyActivation>,
    _dns_proxy_emitter: Option<std::sync::Arc<dyn crate::dns_proxy::DnsQueryEmitter>>,
    _sni_proxy_activation: Option<SniProxyActivation>,
    _sni_proxy_emitter: Option<std::sync::Arc<dyn crate::sni_proxy::L7DecisionEmitter>>,
    _per_flow_realtime: Option<(std::sync::Arc<dyn cellos_core::ports::EventSink>, String)>,
    _flow_accumulator: Option<
        std::sync::Arc<std::sync::Mutex<crate::ebpf_flow::connection_tracking::FlowAccumulator>>,
    >,
) -> (
    i32,
    u64,
    Option<String>,
    Option<bool>,
    Option<String>,
    Option<String>,
    Vec<crate::nft_counters::NftCounterRow>,
) {
    // An unresolvable timeout is reported as a failed run with zero duration.
    let run_timeout = match effective_run_timeout(run, ttl_seconds) {
        Err(reason) => return (-1, 0, Some(reason), None, None, None, Vec::new()),
        Ok(resolved) => resolved,
    };

    let per_flow_events_requested = matches!(
        std::env::var("CELLOS_PER_FLOW_ENFORCEMENT_EVENTS").as_deref(),
        Ok("1")
    );
    if per_flow_events_requested {
        tracing::warn!(
            target: "cellos.supervisor.per_flow_enforcement",
            "CELLOS_PER_FLOW_ENFORCEMENT_EVENTS=1 is set but supervisor host is not unix/linux — \
             per-flow network_flow_decision events require nft and are not emitted on this platform"
        );
    }

    // NOTE(review): the runtime secret session is not forwarded here (`None`), so
    // the child environment is built from `secrets` directly. The unix variant
    // does forward the session — confirm this difference is intentional.
    let (exit_code, elapsed_ms, failure) =
        run_cell_command_tokio(run, run_timeout, secrets, None).await;
    (exit_code, elapsed_ms, failure, None, None, None, Vec::new())
}
/// Spawns `run.argv` as a child process with a scrubbed environment and waits
/// for it to finish, optionally enforcing a timeout.
///
/// The child's environment is cleared and then rebuilt from a fixed `PATH`
/// plus either the key/value pairs of `runtime_secret_session` (when present)
/// or the entries of `secrets` (otherwise). `run.working_directory`, when set,
/// becomes the child's current directory.
///
/// # Parameters
///
/// * `run` - command line (`argv`) and optional working directory.
/// * `run_timeout` - when `Some`, the child is killed once the duration elapses
///   and the error message is produced by `run_timeout_message`.
/// * `secrets` - environment entries used only when no session is supplied.
/// * `runtime_secret_session` - takes precedence over `secrets` when `Some`.
///
/// # Returns
///
/// `(exit_code, duration_ms, error)`:
/// * `exit_code` is the child's exit code, or `-1` when the child has no exit
///   code, could not be spawned or awaited, timed out, or `argv` was empty.
/// * `duration_ms` is the wall-clock time since this function started.
/// * `error` is `Some` for an empty `argv`, spawn/wait failures, and timeouts.
pub(crate) async fn run_cell_command_tokio(
    run: &RunSpec,
    run_timeout: Option<(Duration, RunTimeoutSource)>,
    secrets: &[SecretView],
    runtime_secret_session: Option<&RuntimeSecretSession>,
) -> (i32, u64, Option<String>) {
    /// Milliseconds elapsed since `start`, saturating instead of silently truncating.
    fn elapsed_ms(start: Instant) -> u64 {
        u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX)
    }

    /// Maps the result of waiting on the child to the function's return tuple.
    fn wait_outcome(
        waited: std::io::Result<std::process::ExitStatus>,
        duration_ms: u64,
    ) -> (i32, u64, Option<String>) {
        match waited {
            // `code()` is `None` when the process did not exit normally; report -1.
            Ok(status) => (status.code().unwrap_or(-1), duration_ms, None),
            Err(e) => (-1, duration_ms, Some(e.to_string())),
        }
    }

    let start = Instant::now();

    // An empty argv has no program to run; report it as a failed run instead of
    // panicking on an out-of-bounds index.
    let (program, args) = match run.argv.split_first() {
        Some(parts) => parts,
        None => {
            return (
                -1,
                elapsed_ms(start),
                Some("run.argv is empty: no command to execute".to_string()),
            );
        }
    };

    let mut cmd = tokio::process::Command::new(program);
    cmd.args(args);

    // Start from an empty environment so nothing leaks from the supervisor, then
    // add back only a fixed PATH and the secrets meant for this run.
    cmd.env_clear();
    #[cfg(unix)]
    cmd.env("PATH", "/usr/bin:/bin:/usr/local/bin");
    #[cfg(windows)]
    cmd.env("PATH", r"C:\Windows\System32;C:\Windows");

    if let Some(session) = runtime_secret_session {
        for (key, value) in session.env_pairs() {
            cmd.env(key, value);
        }
    } else {
        for s in secrets {
            cmd.env(&s.key, s.value.as_str());
        }
    }

    if let Some(wd) = &run.working_directory {
        cmd.current_dir(wd);
    }

    let mut child = match cmd.spawn() {
        Ok(c) => c,
        Err(e) => return (-1, elapsed_ms(start), Some(e.to_string())),
    };

    match run_timeout {
        Some((timeout, source)) => {
            tokio::select! {
                waited = child.wait() => {
                    let duration_ms = elapsed_ms(start);
                    wait_outcome(waited, duration_ms)
                }
                _ = tokio::time::sleep(timeout) => {
                    // Best effort: request the kill, then wait for the child to
                    // finish before reporting. Errors are ignored because the
                    // run is reported as timed out either way.
                    let _ = child.start_kill();
                    let _ = child.wait().await;
                    (-1, elapsed_ms(start), Some(run_timeout_message(timeout, source)))
                }
            }
        }
        None => {
            let waited = child.wait().await;
            let duration_ms = elapsed_ms(start);
            wait_outcome(waited, duration_ms)
        }
    }
}