pub mod memory_broker;
pub use memory_broker::MemorySecretBroker;
use std::collections::HashMap;
#[cfg(target_os = "linux")]
use std::path::PathBuf;
use std::sync::Arc;
use async_trait::async_trait;
use tokio::sync::Mutex;
use tracing::instrument;
use uuid::Uuid;
use cellos_core::ports::{CellBackend, CellHandle, TeardownReport};
#[cfg(target_os = "linux")]
use cellos_core::sanitize_cgroup_leaf_segment;
use cellos_core::{CellosError, ExecutionCellDocument};
/// Ordered list of environment variables handed to a spawned workload.
///
/// Entries are validated on insertion (see [`WorkloadEnv::push`]) and kept in
/// insertion order. Duplicate keys are not merged or rejected here.
#[derive(Debug, Clone, Default)]
pub struct WorkloadEnv {
    pairs: Vec<(String, String)>,
}

impl WorkloadEnv {
    /// Creates an empty environment.
    pub fn new() -> Self {
        Self::default()
    }

    /// Appends one `key=value` entry after validating both halves.
    ///
    /// # Errors
    ///
    /// Returns [`CellosError::InvalidSpec`] when the key is empty, when the key
    /// contains `=` or a NUL byte, or when the value contains a NUL byte.
    /// Nothing is stored in that case.
    pub fn push(
        &mut self,
        key: impl Into<String>,
        value: impl Into<String>,
    ) -> Result<(), CellosError> {
        let (key, value): (String, String) = (key.into(), value.into());

        // Decide on at most one refusal reason; the checks run key-first so a
        // bad key is reported even if the value is also bad.
        let refusal = if key.is_empty() {
            Some(String::from("env key must be non-empty"))
        } else if key.contains(['=', '\0']) {
            Some(format!("env key {key:?} contains '=' or NUL — refused"))
        } else if value.contains('\0') {
            Some(format!("env value for {key:?} contains NUL — refused"))
        } else {
            None
        };

        match refusal {
            Some(reason) => Err(CellosError::InvalidSpec(reason)),
            None => {
                self.pairs.push((key, value));
                Ok(())
            }
        }
    }

    /// Iterates over the stored entries as borrowed `(key, value)` pairs, in
    /// insertion order.
    pub fn iter(&self) -> impl Iterator<Item = (&str, &str)> {
        self.pairs
            .iter()
            .map(|entry| (entry.0.as_str(), entry.1.as_str()))
    }

    /// Number of stored entries (duplicates counted individually).
    pub fn len(&self) -> usize {
        self.pairs.len()
    }

    /// Returns `true` when no entry has been stored.
    pub fn is_empty(&self) -> bool {
        self.pairs.is_empty()
    }
}
/// Handle to a child process started by `spawn_isolated_workload`.
///
/// This is a thin wrapper over [`std::process::Child`]. As with `Child`
/// itself, dropping the handle neither kills nor reaps the process; call
/// [`SpawnedWorkload::wait`] (optionally after [`SpawnedWorkload::kill`]) to
/// collect its exit status.
///
/// `Debug` is derived so that a `Result<SpawnedWorkload, _>` can be used with
/// `unwrap_err` / `expect_err`, which require the `Ok` type to be `Debug`.
#[cfg(unix)]
#[derive(Debug)]
pub struct SpawnedWorkload {
    child: std::process::Child,
}

#[cfg(unix)]
impl SpawnedWorkload {
    /// OS-assigned process identifier of the child.
    pub fn pid(&self) -> u32 {
        self.child.id()
    }

    /// Blocks until the child exits and returns its exit status.
    ///
    /// # Errors
    ///
    /// Propagates the I/O error reported by [`std::process::Child::wait`].
    pub fn wait(&mut self) -> std::io::Result<std::process::ExitStatus> {
        self.child.wait()
    }

    /// Forces the child to exit via [`std::process::Child::kill`].
    ///
    /// # Errors
    ///
    /// Propagates the I/O error reported by [`std::process::Child::kill`].
    pub fn kill(&mut self) -> std::io::Result<()> {
        self.child.kill()
    }
}
/// Spawns `argv[0]` with arguments `argv[1..]` as a child process whose
/// environment consists solely of the entries in `env`.
///
/// Before `exec`, every file descriptor above 2 that is open in the child is
/// flagged close-on-exec so it is not carried into the workload. The
/// descriptors are discovered through `/proc/self/fd`; if that directory
/// cannot be read, every descriptor number from 3 up to the soft
/// `RLIMIT_NOFILE` (capped at 65 536, or 1024 if the limit cannot be queried)
/// is flagged instead.
///
/// All three stdio streams are created as pipes and the parent's ends are
/// dropped immediately after the spawn succeeds.
///
/// # Errors
///
/// * [`CellosError::InvalidSpec`] when `argv` is empty.
/// * [`CellosError::Host`] when the OS fails to spawn the process.
#[cfg(unix)]
pub fn spawn_isolated_workload(
    argv: &[String],
    env: &WorkloadEnv,
) -> Result<SpawnedWorkload, CellosError> {
    use std::os::unix::process::CommandExt;
    use std::process::{Command, Stdio};

    let Some((program, rest)) = argv.split_first() else {
        return Err(CellosError::InvalidSpec(
            "spawn_isolated_workload: argv must be non-empty".into(),
        ));
    };

    let mut cmd = Command::new(program);
    cmd.args(rest)
        // Start from an empty environment so nothing from the host leaks in.
        .env_clear()
        .envs(env.iter())
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped());

    // SAFETY: `pre_exec` is unsafe because the closure runs in the forked
    // child before `exec`. The closure only flags descriptors through
    // `fcntl(F_SETFD)` and reads the NOFILE limit into a local `rlimit`.
    // NOTE(review): the `/proc/self/fd` walk goes through `std::fs::read_dir`
    // and `OsString::into_string`, both of which allocate; the std docs for
    // `pre_exec` caution that allocation is not guaranteed to work post-fork
    // when the parent is multithreaded — confirm this is acceptable here.
    unsafe {
        cmd.pre_exec(|| {
            match std::fs::read_dir("/proc/self/fd") {
                Ok(entries) => {
                    // Lazily yields each numeric entry name above stderr.
                    let inherited = entries
                        .flatten()
                        .filter_map(|entry| entry.file_name().into_string().ok())
                        .filter_map(|name| name.parse::<libc::c_int>().ok())
                        .filter(|&fd| fd > 2);
                    for fd in inherited {
                        libc::fcntl(fd, libc::F_SETFD, libc::FD_CLOEXEC);
                    }
                }
                Err(_) => {
                    // No /proc: flag every possible descriptor number, but
                    // bound the loop so a huge limit cannot stall the spawn.
                    const FD_WALK_CEILING: libc::c_int = 65_536;
                    let mut limits = libc::rlimit {
                        rlim_cur: 0,
                        rlim_max: 0,
                    };
                    let upper: libc::c_int =
                        if libc::getrlimit(libc::RLIMIT_NOFILE, &mut limits) != 0 {
                            1024
                        } else if limits.rlim_cur > FD_WALK_CEILING as libc::rlim_t {
                            FD_WALK_CEILING
                        } else {
                            limits.rlim_cur as libc::c_int
                        };
                    for fd in 3..upper {
                        libc::fcntl(fd, libc::F_SETFD, libc::FD_CLOEXEC);
                    }
                }
            }
            Ok(())
        });
    }

    let mut child = match cmd.spawn() {
        Ok(child) => child,
        Err(e) => {
            return Err(CellosError::Host(format!(
                "spawn_isolated_workload: spawn {:?} failed: {e}",
                program
            )));
        }
    };

    // Release the parent's pipe ends right away (stdin, then stdout, then
    // stderr); the handle returned to the caller exposes no stdio.
    child.stdin = None;
    child.stdout = None;
    child.stderr = None;

    Ok(SpawnedWorkload { child })
}
/// Non-Unix stand-in for the workload spawner.
///
/// # Errors
///
/// Always returns [`CellosError::Host`]; both arguments are ignored.
#[cfg(not(unix))]
pub fn spawn_isolated_workload(_argv: &[String], _env: &WorkloadEnv) -> Result<(), CellosError> {
    let reason = String::from("spawn_isolated_workload: host subprocess spawn is Unix-only");
    Err(CellosError::Host(reason))
}
/// Host-side bookkeeping kept for each live cell tracked by
/// `ProprietaryCellBackend`.
#[derive(Debug, Clone)]
struct CellRecord {
/// Fresh v4 UUID assigned when the cell is created. It is not read anywhere
/// in the visible code, hence the `dead_code` allowance.
#[allow(dead_code)]
run_token: Uuid,
/// Cgroup leaf directory created for this cell, if one was created
/// (Linux only). The backend attempts to remove it when the cell is destroyed.
#[cfg(target_os = "linux")]
cgroup_path: Option<PathBuf>,
}
/// In-process cell backend that tracks live cells in a shared, mutex-guarded
/// map keyed by cell id.
///
/// Cloning yields another handle onto the same map (the map sits behind an
/// `Arc`).
#[derive(Clone)]
pub struct ProprietaryCellBackend {
    cells: Arc<Mutex<HashMap<String, CellRecord>>>,
}

impl Default for ProprietaryCellBackend {
    /// Builds a backend with no tracked cells.
    fn default() -> Self {
        let registry: HashMap<String, CellRecord> = HashMap::new();
        Self {
            cells: Arc::new(Mutex::new(registry)),
        }
    }
}

impl ProprietaryCellBackend {
    /// Creates a backend with no tracked cells.
    pub fn new() -> Self {
        Self::default()
    }

    /// Number of cells currently tracked as live.
    pub async fn tracked_cell_count(&self) -> usize {
        let registry = self.cells.lock().await;
        registry.len()
    }

    /// Returns `true` when `cell_id` is currently tracked as live.
    pub async fn has_tracked_state(&self, cell_id: &str) -> bool {
        let registry = self.cells.lock().await;
        registry.contains_key(cell_id)
    }
}
/// Creates a per-cell cgroup leaf directory under `CELLOS_CGROUP_PARENT`.
///
/// Returns `Ok(None)` when the variable cannot be read as Unicode text (which
/// includes it being unset) or is blank after trimming. Otherwise a directory
/// named `cellos_<sanitized cell id>_<fresh v4 UUID>` is created directly
/// under the trimmed parent path and returned.
///
/// # Errors
///
/// Returns [`CellosError::Host`] when the directory cannot be created.
#[cfg(target_os = "linux")]
fn linux_cgroup_leaf_for_cell(cell_id: &str) -> Result<Option<PathBuf>, CellosError> {
    let configured = match std::env::var("CELLOS_CGROUP_PARENT") {
        Ok(value) => value,
        Err(_) => return Ok(None),
    };
    let parent = configured.trim();
    if parent.is_empty() {
        return Ok(None);
    }

    let leaf_name = format!(
        "cellos_{}_{}",
        sanitize_cgroup_leaf_segment(cell_id),
        Uuid::new_v4()
    );
    let leaf = PathBuf::from(parent).join(leaf_name);

    match std::fs::create_dir(&leaf) {
        Ok(()) => Ok(Some(leaf)),
        Err(e) => Err(CellosError::Host(format!(
            "CELLOS_CGROUP_PARENT: create cgroup leaf {}: {e}",
            leaf.display()
        ))),
    }
}
#[async_trait]
impl CellBackend for ProprietaryCellBackend {
    /// Registers a new live cell for `spec` and returns its handle.
    ///
    /// On Linux a cgroup leaf directory is created first when
    /// `CELLOS_CGROUP_PARENT` is configured; its path is stored in the
    /// tracking record and echoed in the handle.
    ///
    /// # Errors
    ///
    /// * [`CellosError::InvalidSpec`] when `spec.spec.id` is empty.
    /// * [`CellosError::Host`] when a cell with the same id is already
    ///   tracked, or when the cgroup leaf cannot be created. Nothing is
    ///   tracked in either case.
    #[instrument(skip(self, spec))]
    async fn create(&self, spec: &ExecutionCellDocument) -> Result<CellHandle, CellosError> {
        use std::collections::hash_map::Entry;

        let cell_id = spec.spec.id.as_str();
        if cell_id.is_empty() {
            return Err(CellosError::InvalidSpec("spec.id must be non-empty".into()));
        }

        let mut registry = self.cells.lock().await;
        // Claim the slot up front; it is only filled once every fallible step
        // below has succeeded.
        let slot = match registry.entry(cell_id.to_owned()) {
            Entry::Occupied(_) => {
                return Err(CellosError::Host(format!(
                    "cell id {cell_id:?} already active on host (no duplicate live cells)"
                )));
            }
            Entry::Vacant(slot) => slot,
        };

        #[cfg(target_os = "linux")]
        let cgroup_path = linux_cgroup_leaf_for_cell(cell_id)?;
        #[cfg(not(target_os = "linux"))]
        let cgroup_path = None;

        slot.insert(CellRecord {
            run_token: Uuid::new_v4(),
            #[cfg(target_os = "linux")]
            cgroup_path: cgroup_path.clone(),
        });

        Ok(CellHandle {
            cell_id: cell_id.to_owned(),
            cgroup_path,
            nft_rules_applied: None,
            kernel_digest_sha256: None,
            rootfs_digest_sha256: None,
            firecracker_digest_sha256: None,
        })
    }

    /// Stops tracking the cell named by `handle` and reports how many cells
    /// remain tracked afterwards.
    ///
    /// On Linux the cell's cgroup leaf directory, if one was recorded, is
    /// removed; a failure there is logged as a warning and does not fail the
    /// teardown.
    ///
    /// # Errors
    ///
    /// Returns [`CellosError::Host`] when the cell id is not currently
    /// tracked (never created, or already destroyed).
    #[instrument(skip(self, handle))]
    async fn destroy(&self, handle: &CellHandle) -> Result<TeardownReport, CellosError> {
        let mut registry = self.cells.lock().await;

        let Some(record) = registry.remove(&handle.cell_id) else {
            return Err(CellosError::Host(format!(
                "cell {:?} unknown or already destroyed (no double-teardown)",
                handle.cell_id
            )));
        };

        #[cfg(target_os = "linux")]
        if let Some(leaf) = record.cgroup_path.as_deref() {
            if let Err(e) = std::fs::remove_dir(leaf) {
                tracing::warn!(
                    target: "cellos.host.proprietary",
                    path = %leaf.display(),
                    error = %e,
                    "cgroup leaf cleanup failed (non-fatal)"
                );
            }
        }
        // The record carries nothing to clean up on other platforms.
        #[cfg(not(target_os = "linux"))]
        let _ = &record;

        Ok(TeardownReport {
            cell_id: handle.cell_id.clone(),
            destroyed: true,
            peers_tracked_after: registry.len(),
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a minimal `ExecutionCell` document with the given spec id.
    fn sample_doc(id: &str) -> ExecutionCellDocument {
        let raw = serde_json::json!({
            "apiVersion": "cellos.io/v1",
            "kind": "ExecutionCell",
            "spec": {
                "id": id,
                "authority": { "secretRefs": [] },
                "lifetime": { "ttlSeconds": 60 }
            }
        });
        serde_json::from_value(raw).unwrap()
    }

    /// After a destroy, the same cell id can be created and destroyed again.
    #[tokio::test]
    async fn destroy_removes_tracked_state_same_id_can_run_again() {
        let backend = ProprietaryCellBackend::new();
        let doc = sample_doc("cell-a");

        let first = backend.create(&doc).await.unwrap();
        assert_eq!(backend.tracked_cell_count().await, 1);

        backend.destroy(&first).await.unwrap();
        assert_eq!(backend.tracked_cell_count().await, 0);
        assert!(!backend.has_tracked_state("cell-a").await);

        let second = backend.create(&doc).await.unwrap();
        assert_eq!(second.cell_id, "cell-a");

        backend.destroy(&second).await.unwrap();
        assert_eq!(backend.tracked_cell_count().await, 0);
    }

    /// Destroying the same handle twice yields a `Host` error the second time.
    #[tokio::test]
    async fn double_destroy_errors() {
        let backend = ProprietaryCellBackend::new();
        let handle = backend.create(&sample_doc("x")).await.unwrap();
        backend.destroy(&handle).await.unwrap();

        let second_attempt = backend.destroy(&handle).await.unwrap_err();
        assert!(
            matches!(second_attempt, CellosError::Host(_)),
            "expected Host error, got {second_attempt:?}"
        );
    }

    /// `peers_tracked_after` reflects the cells still tracked after teardown.
    #[tokio::test]
    async fn teardown_report_peers_tracked_after_counts_remaining_cells() {
        let backend = ProprietaryCellBackend::new();
        let cell_a = backend.create(&sample_doc("a")).await.unwrap();
        let cell_b = backend.create(&sample_doc("b")).await.unwrap();

        let report_a = backend.destroy(&cell_a).await.unwrap();
        assert!(report_a.destroyed);
        assert_eq!(report_a.peers_tracked_after, 1);
        assert!(backend.has_tracked_state("b").await);

        let report_b = backend.destroy(&cell_b).await.unwrap();
        assert_eq!(report_b.peers_tracked_after, 0);
    }
}