Skip to main content

cellos_host_cellos/
lib.rs

1//! Proprietary **CellOS** host backend (L2).
2//!
3//! Today this is a **simulated** runtime: in-memory cell records that **must** be removed on
4//! `destroy` so tests can falsify residue. When the real kernel/userspace ABI exists, replace the
5//! inner implementation with FFI/IPC while keeping the same [`CellBackend`] contract.
6//!
7//! # Subprocess spawn (E1-04)
8//!
9//! [`spawn_isolated_workload`] is the **only** sanctioned way for this backend to materialise a
10//! child process. It enforces three FD-hygiene invariants on Unix:
11//!
12//! 1. **No ambient env.** The child gets `env_clear()` plus *exactly* the declared injection set —
13//!    nothing inherited from the supervisor.
14//! 2. **Stdio replaced with pipes.** stdin / stdout / stderr are wired to fresh pipes whose
15//!    *parent* ends drop before the caller can write or read, so a child cannot reach back into
16//!    the supervisor's terminal or log streams.
17//! 3. **All FDs > 2 closed on exec.** A `pre_exec` closure walks `/proc/self/fd` and sets
18//!    `FD_CLOEXEC` on every fd > 2. The kernel atomically closes those at `execve(2)` so a
19//!    workload never inherits NATS sockets, broker handles, or audit-log writers.
20//!
21//! These guarantees are exercised by `tests/fd_isolation.rs`, which reads
22//! `/proc/<child>/environ` of a real child and asserts byte-for-byte equality with the declared
23//! injection set.
24
25pub mod memory_broker;
26
27pub use memory_broker::MemorySecretBroker;
28
29use std::collections::HashMap;
30#[cfg(target_os = "linux")]
31use std::path::PathBuf;
32use std::sync::Arc;
33
34use async_trait::async_trait;
35use tokio::sync::Mutex;
36use tracing::instrument;
37use uuid::Uuid;
38
39use cellos_core::ports::{CellBackend, CellHandle, TeardownReport};
40#[cfg(target_os = "linux")]
41use cellos_core::sanitize_cgroup_leaf_segment;
42use cellos_core::{CellosError, ExecutionCellDocument};
43
44/// Declared environment injection set for [`spawn_isolated_workload`].
45///
46/// Order is preserved. Empty values are valid. Keys must not contain `=` or NUL.
47#[derive(Debug, Clone, Default)]
48pub struct WorkloadEnv {
49    pairs: Vec<(String, String)>,
50}
51
52impl WorkloadEnv {
53    pub fn new() -> Self {
54        Self { pairs: Vec::new() }
55    }
56
57    /// Append a single `KEY=VALUE` pair. Returns an error if `key` is invalid.
58    pub fn push(
59        &mut self,
60        key: impl Into<String>,
61        value: impl Into<String>,
62    ) -> Result<(), CellosError> {
63        let k = key.into();
64        let v = value.into();
65        if k.is_empty() {
66            return Err(CellosError::InvalidSpec("env key must be non-empty".into()));
67        }
68        if k.contains('=') || k.as_bytes().contains(&0u8) {
69            return Err(CellosError::InvalidSpec(format!(
70                "env key {k:?} contains '=' or NUL — refused"
71            )));
72        }
73        if v.as_bytes().contains(&0u8) {
74            return Err(CellosError::InvalidSpec(format!(
75                "env value for {k:?} contains NUL — refused"
76            )));
77        }
78        self.pairs.push((k, v));
79        Ok(())
80    }
81
82    pub fn iter(&self) -> impl Iterator<Item = (&str, &str)> {
83        self.pairs.iter().map(|(k, v)| (k.as_str(), v.as_str()))
84    }
85
86    pub fn len(&self) -> usize {
87        self.pairs.len()
88    }
89
90    pub fn is_empty(&self) -> bool {
91        self.pairs.is_empty()
92    }
93}
94
/// Handle to a spawned workload child. Drops do not kill — caller `wait()`s.
///
/// Implements [`std::fmt::Debug`] so that `Result<SpawnedWorkload, _>` can be used with
/// `unwrap_err` / `expect_err` when exercising the spawn error paths.
#[cfg(unix)]
#[derive(Debug)]
pub struct SpawnedWorkload {
    child: std::process::Child,
}

#[cfg(unix)]
impl SpawnedWorkload {
    /// PID of the child process (still valid until [`SpawnedWorkload::wait`]).
    pub fn pid(&self) -> u32 {
        self.child.id()
    }

    /// Wait for the child to exit and return its exit status.
    ///
    /// # Errors
    ///
    /// Propagates any I/O error reported by [`std::process::Child::wait`].
    pub fn wait(&mut self) -> std::io::Result<std::process::ExitStatus> {
        self.child.wait()
    }

    /// Forcibly terminate the child by delegating to [`std::process::Child::kill`]
    /// (SIGKILL on Unix).
    ///
    /// # Errors
    ///
    /// Whatever the underlying call reports is returned unchanged.
    pub fn kill(&mut self) -> std::io::Result<()> {
        self.child.kill()
    }
}
118
119/// Spawn a workload subprocess with closed FD inheritance and a declared env injection set.
120///
121/// Contract (Unix):
122/// - Child env is *exactly* `env` (no inheritance).
123/// - stdin/stdout/stderr are connected to fresh pipes; the parent ends drop before this function
124///   returns, so the child sees stdin EOF on its first read and any stdout/stderr writes go to
125///   pipe ends with no parent reader (the kernel buffers up to one pipe-page, then SIGPIPE on
126///   overflow — workloads that need stdio capture should use an explicit channel).
127/// - All open file descriptors with `fd > 2` in the parent get `FD_CLOEXEC` set in the forked
128///   child via `pre_exec`, so `execve(2)` atomically closes them. We deliberately set CLOEXEC
129///   rather than calling `close(fd)` directly: another thread may have just opened a new FD with
130///   the same number, and CLOEXEC is race-free against fork.
131///
132/// The argv must be non-empty. `argv[0]` is the program path.
133#[cfg(unix)]
134pub fn spawn_isolated_workload(
135    argv: &[String],
136    env: &WorkloadEnv,
137) -> Result<SpawnedWorkload, CellosError> {
138    use std::os::unix::process::CommandExt;
139    use std::process::{Command, Stdio};
140
141    if argv.is_empty() {
142        return Err(CellosError::InvalidSpec(
143            "spawn_isolated_workload: argv must be non-empty".into(),
144        ));
145    }
146
147    let mut cmd = Command::new(&argv[0]);
148    if argv.len() > 1 {
149        cmd.args(&argv[1..]);
150    }
151
152    // Invariant 1: env_clear + only declared keys. No PATH, no LANG, nothing ambient.
153    cmd.env_clear();
154    for (k, v) in env.iter() {
155        cmd.env(k, v);
156    }
157
158    // Invariant 2: stdio = pipes. The parent ends are owned by `Command` until spawn(); after
159    // spawn() returns, we drop them immediately (we don't take stdin/stdout/stderr off `child`),
160    // so the child sees EOF on stdin and orphaned write-ends on stdout/stderr.
161    cmd.stdin(Stdio::piped());
162    cmd.stdout(Stdio::piped());
163    cmd.stderr(Stdio::piped());
164
165    // Invariant 3: close all inherited FDs > 2 on exec. We CLOEXEC rather than close() to avoid
166    // a race with concurrent threads opening new FDs at the same numeric slot.
167    // SAFETY: pre_exec runs in the forked child only, before execve. Only async-signal-safe
168    // operations are used (open/readdir/fcntl on /proc/self/fd, all syscalls).
169    unsafe {
170        cmd.pre_exec(|| {
171            // /proc/self/fd is the canonical Linux/Unix-ish way to enumerate open FDs from
172            // inside the process; on non-Linux Unix this may be missing — fall back to the
173            // RLIMIT_NOFILE-bounded loop below.
174            let mut walked = false;
175            if let Ok(dir) = std::fs::read_dir("/proc/self/fd") {
176                walked = true;
177                for entry in dir.flatten() {
178                    if let Ok(name) = entry.file_name().into_string() {
179                        if let Ok(fd) = name.parse::<libc::c_int>() {
180                            if fd > 2 {
181                                // FD_CLOEXEC: closed atomically by the kernel on execve.
182                                libc::fcntl(fd, libc::F_SETFD, libc::FD_CLOEXEC);
183                            }
184                        }
185                    }
186                }
187            }
188            if !walked {
189                // Fallback for kernels without /proc: bound by RLIMIT_NOFILE, capped to a sane
190                // ceiling so an `RLIM_INFINITY` limit doesn't make us walk billions of FDs.
191                const FD_WALK_CEILING: libc::c_int = 65_536;
192                let mut rl: libc::rlimit = libc::rlimit {
193                    rlim_cur: 0,
194                    rlim_max: 0,
195                };
196                let max: libc::c_int = if libc::getrlimit(libc::RLIMIT_NOFILE, &mut rl) == 0 {
197                    if rl.rlim_cur > FD_WALK_CEILING as libc::rlim_t {
198                        FD_WALK_CEILING
199                    } else {
200                        rl.rlim_cur as libc::c_int
201                    }
202                } else {
203                    1024
204                };
205                let mut fd: libc::c_int = 3;
206                while fd < max {
207                    libc::fcntl(fd, libc::F_SETFD, libc::FD_CLOEXEC);
208                    fd += 1;
209                }
210            }
211            Ok(())
212        });
213    }
214
215    let mut child = cmd.spawn().map_err(|e| {
216        CellosError::Host(format!(
217            "spawn_isolated_workload: spawn {:?} failed: {e}",
218            argv[0]
219        ))
220    })?;
221
222    // Drop the **parent** ends of the pipes immediately so the child sees stdin EOF and any
223    // stdout/stderr writes go to a pipe with no parent reader. We do *not* expose
224    // stdin/stdout/stderr through `SpawnedWorkload` — capture is an explicit, separate concern.
225    drop(child.stdin.take());
226    drop(child.stdout.take());
227    drop(child.stderr.take());
228
229    Ok(SpawnedWorkload { child })
230}
231
/// Non-Unix stub: always fails, because this backend only spawns host subprocesses on Unix.
///
/// # Errors
///
/// Unconditionally returns [`CellosError::Host`].
#[cfg(not(unix))]
pub fn spawn_isolated_workload(_argv: &[String], _env: &WorkloadEnv) -> Result<(), CellosError> {
    let reason = "spawn_isolated_workload: host subprocess spawn is Unix-only";
    Err(CellosError::Host(reason.into()))
}
239
/// Host-side bookkeeping entry kept per live cell id in [`ProprietaryCellBackend`]'s map.
#[derive(Debug, Clone)]
struct CellRecord {
    /// Fresh v4 UUID assigned when the record is built in `create`. Nothing in this file reads
    /// it back, which is why the `dead_code` lint is silenced for this field.
    #[allow(dead_code)]
    run_token: Uuid,
    /// cgroup v2 leaf created at `create` (Linux + `CELLOS_CGROUP_PARENT`); removed in `destroy`.
    #[cfg(target_os = "linux")]
    cgroup_path: Option<PathBuf>,
}
248
249/// Simulated proprietary host: tracks active cells; **destroy** removes all host-side state for that id.
250#[derive(Clone)]
251pub struct ProprietaryCellBackend {
252    cells: Arc<Mutex<HashMap<String, CellRecord>>>,
253}
254
255impl Default for ProprietaryCellBackend {
256    fn default() -> Self {
257        Self::new()
258    }
259}
260
261impl ProprietaryCellBackend {
262    pub fn new() -> Self {
263        Self {
264            cells: Arc::new(Mutex::new(HashMap::new())),
265        }
266    }
267
268    /// How many cells the host still considers **live** (tests + operators; not an auth decision).
269    pub async fn tracked_cell_count(&self) -> usize {
270        self.cells.lock().await.len()
271    }
272
273    /// True if any host-tracked state remains for this `cell_id`.
274    pub async fn has_tracked_state(&self, cell_id: &str) -> bool {
275        self.cells.lock().await.contains_key(cell_id)
276    }
277}
278
279/// cgroup v2 leaf under `CELLOS_CGROUP_PARENT` (unified hierarchy; parent must exist and be writable).
280#[cfg(target_os = "linux")]
281fn linux_cgroup_leaf_for_cell(cell_id: &str) -> Result<Option<PathBuf>, CellosError> {
282    let Ok(raw) = std::env::var("CELLOS_CGROUP_PARENT") else {
283        return Ok(None);
284    };
285    let parent = raw.trim();
286    if parent.is_empty() {
287        return Ok(None);
288    }
289    let leaf = PathBuf::from(parent).join(format!(
290        "cellos_{}_{}",
291        sanitize_cgroup_leaf_segment(cell_id),
292        Uuid::new_v4()
293    ));
294    std::fs::create_dir(&leaf).map_err(|e| {
295        CellosError::Host(format!(
296            "CELLOS_CGROUP_PARENT: create cgroup leaf {}: {e}",
297            leaf.display()
298        ))
299    })?;
300    Ok(Some(leaf))
301}
302
303#[async_trait]
304impl CellBackend for ProprietaryCellBackend {
305    #[instrument(skip(self, spec))]
306    async fn create(&self, spec: &ExecutionCellDocument) -> Result<CellHandle, CellosError> {
307        if spec.spec.id.is_empty() {
308            return Err(CellosError::InvalidSpec("spec.id must be non-empty".into()));
309        }
310        let id = spec.spec.id.clone();
311        let mut map = self.cells.lock().await;
312        if map.contains_key(&id) {
313            return Err(CellosError::Host(format!(
314                "cell id {id:?} already active on host (no duplicate live cells)"
315            )));
316        }
317        #[cfg(target_os = "linux")]
318        let cgroup_path = linux_cgroup_leaf_for_cell(&id)?;
319        #[cfg(not(target_os = "linux"))]
320        let cgroup_path = None;
321
322        map.insert(
323            id.clone(),
324            CellRecord {
325                run_token: Uuid::new_v4(),
326                #[cfg(target_os = "linux")]
327                cgroup_path: cgroup_path.clone(),
328            },
329        );
330        Ok(CellHandle {
331            cell_id: id,
332            cgroup_path,
333            // This backend does not own nftables enforcement; the supervisor's
334            // host-subprocess path surfaces the signal via `run_cell_command`.
335            nft_rules_applied: None,
336            // FC-08: host-subprocess backend has no boot artifact manifest.
337            kernel_digest_sha256: None,
338            rootfs_digest_sha256: None,
339            firecracker_digest_sha256: None,
340        })
341    }
342
343    #[instrument(skip(self, handle))]
344    async fn destroy(&self, handle: &CellHandle) -> Result<TeardownReport, CellosError> {
345        let mut map = self.cells.lock().await;
346        let removed = map.remove(&handle.cell_id);
347        if removed.is_none() {
348            return Err(CellosError::Host(format!(
349                "cell {:?} unknown or already destroyed (no double-teardown)",
350                handle.cell_id
351            )));
352        }
353        #[cfg(target_os = "linux")]
354        if let Some(rec) = &removed {
355            if let Some(ref p) = rec.cgroup_path {
356                if let Err(e) = std::fs::remove_dir(p) {
357                    tracing::warn!(
358                        target: "cellos.host.proprietary",
359                        path = %p.display(),
360                        error = %e,
361                        "cgroup leaf cleanup failed (non-fatal)"
362                    );
363                }
364            }
365        }
366        let peers_tracked_after = map.len();
367        Ok(TeardownReport {
368            cell_id: handle.cell_id.clone(),
369            destroyed: true,
370            peers_tracked_after,
371        })
372    }
373}
374
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an `ExecutionCellDocument` from inline JSON carrying the given cell id.
    fn sample_doc(id: &str) -> ExecutionCellDocument {
        let raw = serde_json::json!({
            "apiVersion": "cellos.io/v1",
            "kind": "ExecutionCell",
            "spec": {
                "id": id,
                "authority": { "secretRefs": [] },
                "lifetime": { "ttlSeconds": 60 }
            }
        });
        serde_json::from_value(raw).unwrap()
    }

    /// After `destroy`, no state remains for the id and the same id can be created again.
    #[tokio::test]
    async fn destroy_removes_tracked_state_same_id_can_run_again() {
        let backend = ProprietaryCellBackend::new();
        let cell_doc = sample_doc("cell-a");

        let first = backend.create(&cell_doc).await.unwrap();
        assert_eq!(backend.tracked_cell_count().await, 1);
        backend.destroy(&first).await.unwrap();
        assert_eq!(backend.tracked_cell_count().await, 0);
        assert!(!backend.has_tracked_state("cell-a").await);

        let second = backend.create(&cell_doc).await.unwrap();
        assert_eq!(second.cell_id, "cell-a");
        backend.destroy(&second).await.unwrap();
        assert_eq!(backend.tracked_cell_count().await, 0);
    }

    /// A second `destroy` on the same handle is rejected with a `Host` error.
    #[tokio::test]
    async fn double_destroy_errors() {
        let backend = ProprietaryCellBackend::new();
        let handle = backend.create(&sample_doc("x")).await.unwrap();
        backend.destroy(&handle).await.unwrap();

        let err = backend.destroy(&handle).await.unwrap_err();
        assert!(
            matches!(err, CellosError::Host(_)),
            "expected Host error, got {err:?}"
        );
    }

    /// `peers_tracked_after` reflects the number of cells still tracked after each teardown.
    #[tokio::test]
    async fn teardown_report_peers_tracked_after_counts_remaining_cells() {
        let backend = ProprietaryCellBackend::new();
        let cell_a = backend.create(&sample_doc("a")).await.unwrap();
        let cell_b = backend.create(&sample_doc("b")).await.unwrap();

        let first_report = backend.destroy(&cell_a).await.unwrap();
        assert!(first_report.destroyed);
        assert_eq!(first_report.peers_tracked_after, 1);
        assert!(backend.has_tracked_state("b").await);

        let second_report = backend.destroy(&cell_b).await.unwrap();
        assert_eq!(second_report.peers_tracked_after, 0);
    }
}