cellos_host_cellos/
lib.rs1pub mod memory_broker;
26
27pub use memory_broker::MemorySecretBroker;
28
29use std::collections::HashMap;
30#[cfg(target_os = "linux")]
31use std::path::PathBuf;
32use std::sync::Arc;
33
34use async_trait::async_trait;
35use tokio::sync::Mutex;
36use tracing::instrument;
37use uuid::Uuid;
38
39use cellos_core::ports::{CellBackend, CellHandle, TeardownReport};
40#[cfg(target_os = "linux")]
41use cellos_core::sanitize_cgroup_leaf_segment;
42use cellos_core::{CellosError, ExecutionCellDocument};
43
44#[derive(Debug, Clone, Default)]
48pub struct WorkloadEnv {
49 pairs: Vec<(String, String)>,
50}
51
52impl WorkloadEnv {
53 pub fn new() -> Self {
54 Self { pairs: Vec::new() }
55 }
56
57 pub fn push(
59 &mut self,
60 key: impl Into<String>,
61 value: impl Into<String>,
62 ) -> Result<(), CellosError> {
63 let k = key.into();
64 let v = value.into();
65 if k.is_empty() {
66 return Err(CellosError::InvalidSpec("env key must be non-empty".into()));
67 }
68 if k.contains('=') || k.as_bytes().contains(&0u8) {
69 return Err(CellosError::InvalidSpec(format!(
70 "env key {k:?} contains '=' or NUL — refused"
71 )));
72 }
73 if v.as_bytes().contains(&0u8) {
74 return Err(CellosError::InvalidSpec(format!(
75 "env value for {k:?} contains NUL — refused"
76 )));
77 }
78 self.pairs.push((k, v));
79 Ok(())
80 }
81
82 pub fn iter(&self) -> impl Iterator<Item = (&str, &str)> {
83 self.pairs.iter().map(|(k, v)| (k.as_str(), v.as_str()))
84 }
85
86 pub fn len(&self) -> usize {
87 self.pairs.len()
88 }
89
90 pub fn is_empty(&self) -> bool {
91 self.pairs.is_empty()
92 }
93}
94
/// Handle to a workload started by `spawn_isolated_workload`.
///
/// Dropping this handle neither kills nor reaps the child. Per the
/// `std::process::Child` documentation, the process keeps running, and after it
/// exits it remains a zombie until it is waited on. Reap it by calling
/// [`SpawnedWorkload::wait`], or by polling [`SpawnedWorkload::try_wait`] until
/// it returns `Some`.
#[cfg(unix)]
#[derive(Debug)]
pub struct SpawnedWorkload {
    child: std::process::Child,
}

#[cfg(unix)]
impl SpawnedWorkload {
    /// OS process id of the workload.
    pub fn pid(&self) -> u32 {
        self.child.id()
    }

    /// Blocks until the workload exits, reaps it, and returns its exit status.
    ///
    /// This is a blocking call. From async code, run it on a blocking thread,
    /// e.g. `tokio::task::spawn_blocking`.
    pub fn wait(&mut self) -> std::io::Result<std::process::ExitStatus> {
        self.child.wait()
    }

    /// Non-blocking status check.
    ///
    /// Returns `Ok(Some(status))` if the workload has exited, and reaps it in
    /// that case. Returns `Ok(None)` if it is still running.
    pub fn try_wait(&mut self) -> std::io::Result<Option<std::process::ExitStatus>> {
        self.child.try_wait()
    }

    /// Sends `SIGKILL` to the workload.
    ///
    /// This does not reap the process; call `wait` afterwards.
    pub fn kill(&mut self) -> std::io::Result<()> {
        self.child.kill()
    }
}
118
119#[cfg(unix)]
134pub fn spawn_isolated_workload(
135 argv: &[String],
136 env: &WorkloadEnv,
137) -> Result<SpawnedWorkload, CellosError> {
138 use std::os::unix::process::CommandExt;
139 use std::process::{Command, Stdio};
140
141 if argv.is_empty() {
142 return Err(CellosError::InvalidSpec(
143 "spawn_isolated_workload: argv must be non-empty".into(),
144 ));
145 }
146
147 let mut cmd = Command::new(&argv[0]);
148 if argv.len() > 1 {
149 cmd.args(&argv[1..]);
150 }
151
152 cmd.env_clear();
154 for (k, v) in env.iter() {
155 cmd.env(k, v);
156 }
157
158 cmd.stdin(Stdio::piped());
162 cmd.stdout(Stdio::piped());
163 cmd.stderr(Stdio::piped());
164
165 unsafe {
170 cmd.pre_exec(|| {
171 let mut walked = false;
175 if let Ok(dir) = std::fs::read_dir("/proc/self/fd") {
176 walked = true;
177 for entry in dir.flatten() {
178 if let Ok(name) = entry.file_name().into_string() {
179 if let Ok(fd) = name.parse::<libc::c_int>() {
180 if fd > 2 {
181 libc::fcntl(fd, libc::F_SETFD, libc::FD_CLOEXEC);
183 }
184 }
185 }
186 }
187 }
188 if !walked {
189 const FD_WALK_CEILING: libc::c_int = 65_536;
192 let mut rl: libc::rlimit = libc::rlimit {
193 rlim_cur: 0,
194 rlim_max: 0,
195 };
196 let max: libc::c_int = if libc::getrlimit(libc::RLIMIT_NOFILE, &mut rl) == 0 {
197 if rl.rlim_cur > FD_WALK_CEILING as libc::rlim_t {
198 FD_WALK_CEILING
199 } else {
200 rl.rlim_cur as libc::c_int
201 }
202 } else {
203 1024
204 };
205 let mut fd: libc::c_int = 3;
206 while fd < max {
207 libc::fcntl(fd, libc::F_SETFD, libc::FD_CLOEXEC);
208 fd += 1;
209 }
210 }
211 Ok(())
212 });
213 }
214
215 let mut child = cmd.spawn().map_err(|e| {
216 CellosError::Host(format!(
217 "spawn_isolated_workload: spawn {:?} failed: {e}",
218 argv[0]
219 ))
220 })?;
221
222 drop(child.stdin.take());
226 drop(child.stdout.take());
227 drop(child.stderr.take());
228
229 Ok(SpawnedWorkload { child })
230}
231
/// Non-Unix stand-in for `spawn_isolated_workload`.
///
/// Host subprocess spawning is implemented only for Unix targets, so this
/// always fails.
///
/// # Errors
/// Always returns [`CellosError::Host`].
#[cfg(not(unix))]
pub fn spawn_isolated_workload(_argv: &[String], _env: &WorkloadEnv) -> Result<(), CellosError> {
    let reason = String::from("spawn_isolated_workload: host subprocess spawn is Unix-only");
    Err(CellosError::Host(reason))
}
239
240#[derive(Debug, Clone)]
241struct CellRecord {
242 #[allow(dead_code)]
243 run_token: Uuid,
244 #[cfg(target_os = "linux")]
246 cgroup_path: Option<PathBuf>,
247}
248
249#[derive(Clone)]
251pub struct ProprietaryCellBackend {
252 cells: Arc<Mutex<HashMap<String, CellRecord>>>,
253}
254
255impl Default for ProprietaryCellBackend {
256 fn default() -> Self {
257 Self::new()
258 }
259}
260
261impl ProprietaryCellBackend {
262 pub fn new() -> Self {
263 Self {
264 cells: Arc::new(Mutex::new(HashMap::new())),
265 }
266 }
267
268 pub async fn tracked_cell_count(&self) -> usize {
270 self.cells.lock().await.len()
271 }
272
273 pub async fn has_tracked_state(&self, cell_id: &str) -> bool {
275 self.cells.lock().await.contains_key(cell_id)
276 }
277}
278
279#[cfg(target_os = "linux")]
281fn linux_cgroup_leaf_for_cell(cell_id: &str) -> Result<Option<PathBuf>, CellosError> {
282 let Ok(raw) = std::env::var("CELLOS_CGROUP_PARENT") else {
283 return Ok(None);
284 };
285 let parent = raw.trim();
286 if parent.is_empty() {
287 return Ok(None);
288 }
289 let leaf = PathBuf::from(parent).join(format!(
290 "cellos_{}_{}",
291 sanitize_cgroup_leaf_segment(cell_id),
292 Uuid::new_v4()
293 ));
294 std::fs::create_dir(&leaf).map_err(|e| {
295 CellosError::Host(format!(
296 "CELLOS_CGROUP_PARENT: create cgroup leaf {}: {e}",
297 leaf.display()
298 ))
299 })?;
300 Ok(Some(leaf))
301}
302
303#[async_trait]
304impl CellBackend for ProprietaryCellBackend {
305 #[instrument(skip(self, spec))]
306 async fn create(&self, spec: &ExecutionCellDocument) -> Result<CellHandle, CellosError> {
307 if spec.spec.id.is_empty() {
308 return Err(CellosError::InvalidSpec("spec.id must be non-empty".into()));
309 }
310 let id = spec.spec.id.clone();
311 let mut map = self.cells.lock().await;
312 if map.contains_key(&id) {
313 return Err(CellosError::Host(format!(
314 "cell id {id:?} already active on host (no duplicate live cells)"
315 )));
316 }
317 #[cfg(target_os = "linux")]
318 let cgroup_path = linux_cgroup_leaf_for_cell(&id)?;
319 #[cfg(not(target_os = "linux"))]
320 let cgroup_path = None;
321
322 map.insert(
323 id.clone(),
324 CellRecord {
325 run_token: Uuid::new_v4(),
326 #[cfg(target_os = "linux")]
327 cgroup_path: cgroup_path.clone(),
328 },
329 );
330 Ok(CellHandle {
331 cell_id: id,
332 cgroup_path,
333 nft_rules_applied: None,
336 kernel_digest_sha256: None,
338 rootfs_digest_sha256: None,
339 firecracker_digest_sha256: None,
340 })
341 }
342
343 #[instrument(skip(self, handle))]
344 async fn destroy(&self, handle: &CellHandle) -> Result<TeardownReport, CellosError> {
345 let mut map = self.cells.lock().await;
346 let removed = map.remove(&handle.cell_id);
347 if removed.is_none() {
348 return Err(CellosError::Host(format!(
349 "cell {:?} unknown or already destroyed (no double-teardown)",
350 handle.cell_id
351 )));
352 }
353 #[cfg(target_os = "linux")]
354 if let Some(rec) = &removed {
355 if let Some(ref p) = rec.cgroup_path {
356 if let Err(e) = std::fs::remove_dir(p) {
357 tracing::warn!(
358 target: "cellos.host.proprietary",
359 path = %p.display(),
360 error = %e,
361 "cgroup leaf cleanup failed (non-fatal)"
362 );
363 }
364 }
365 }
366 let peers_tracked_after = map.len();
367 Ok(TeardownReport {
368 cell_id: handle.cell_id.clone(),
369 destroyed: true,
370 peers_tracked_after,
371 })
372 }
373}
374
#[cfg(test)]
mod tests {
    //! Unit tests for the proprietary host backend and the workload helpers.

    use super::*;

    #[tokio::test]
    async fn destroy_removes_tracked_state_same_id_can_run_again() {
        let host = ProprietaryCellBackend::new();
        let doc = sample_doc("cell-a");

        let h1 = host.create(&doc).await.unwrap();
        assert_eq!(host.tracked_cell_count().await, 1);
        host.destroy(&h1).await.unwrap();
        assert_eq!(host.tracked_cell_count().await, 0);
        assert!(!host.has_tracked_state("cell-a").await);

        let h2 = host.create(&doc).await.unwrap();
        assert_eq!(h2.cell_id, "cell-a");
        host.destroy(&h2).await.unwrap();
        assert_eq!(host.tracked_cell_count().await, 0);
    }

    #[tokio::test]
    async fn double_destroy_errors() {
        let host = ProprietaryCellBackend::new();
        let doc = sample_doc("x");
        let h = host.create(&doc).await.unwrap();
        host.destroy(&h).await.unwrap();
        let err = host.destroy(&h).await.unwrap_err();
        match err {
            CellosError::Host(_) => {}
            e => panic!("expected Host error, got {e:?}"),
        }
    }

    #[tokio::test]
    async fn duplicate_live_id_is_rejected_and_original_kept() {
        let host = ProprietaryCellBackend::new();
        let doc = sample_doc("dup");
        let h = host.create(&doc).await.unwrap();
        let err = host.create(&doc).await.unwrap_err();
        assert!(
            matches!(err, CellosError::Host(_)),
            "expected Host error, got {err:?}"
        );
        // The rejected create must not disturb the live cell.
        assert_eq!(host.tracked_cell_count().await, 1);
        assert!(host.has_tracked_state("dup").await);
        host.destroy(&h).await.unwrap();
    }

    #[tokio::test]
    async fn teardown_report_peers_tracked_after_counts_remaining_cells() {
        let host = ProprietaryCellBackend::new();
        let a = host.create(&sample_doc("a")).await.unwrap();
        let b = host.create(&sample_doc("b")).await.unwrap();
        let r = host.destroy(&a).await.unwrap();
        assert!(r.destroyed);
        assert_eq!(r.peers_tracked_after, 1);
        assert!(host.has_tracked_state("b").await);
        let r2 = host.destroy(&b).await.unwrap();
        assert_eq!(r2.peers_tracked_after, 0);
    }

    #[test]
    fn workload_env_keeps_valid_pairs_in_order() {
        let mut env = WorkloadEnv::new();
        assert!(env.is_empty());
        env.push("A", "1").unwrap();
        env.push("B", "").unwrap();
        assert_eq!(env.len(), 2);
        let pairs: Vec<(&str, &str)> = env.iter().collect();
        assert_eq!(pairs, vec![("A", "1"), ("B", "")]);
    }

    #[test]
    fn workload_env_rejects_malformed_entries() {
        let mut env = WorkloadEnv::new();
        for (k, v) in [("", "x"), ("A=B", "x"), ("A\0", "x"), ("A", "x\0y")] {
            let err = env.push(k, v).unwrap_err();
            assert!(
                matches!(err, CellosError::InvalidSpec(_)),
                "{k:?}={v:?}: expected InvalidSpec, got {err:?}"
            );
        }
        // Rejected entries must not be stored.
        assert!(env.is_empty());
    }

    #[cfg(unix)]
    #[test]
    fn spawn_rejects_empty_argv() {
        match spawn_isolated_workload(&[], &WorkloadEnv::new()) {
            Err(CellosError::InvalidSpec(_)) => {}
            Err(e) => panic!("expected InvalidSpec, got {e:?}"),
            Ok(_) => panic!("empty argv must not spawn anything"),
        }
    }

    /// Minimal valid document whose only varying field is `spec.id`.
    fn sample_doc(id: &str) -> ExecutionCellDocument {
        serde_json::from_value(serde_json::json!({
            "apiVersion": "cellos.io/v1",
            "kind": "ExecutionCell",
            "spec": {
                "id": id,
                "authority": { "secretRefs": [] },
                "lifetime": { "ttlSeconds": 60 }
            }
        }))
        .unwrap()
    }
}