cellos-supervisor 0.5.1

CellOS execution-cell runner — boots cells in Firecracker microVMs or gVisor, enforces narrow typed authority, emits signed CloudEvents.
Documentation
//! Mixed-target export routing and metadata propagation.
//!
//! Verifies that artifact-level target selection fans out to distinct endpoints
//! and forwards `contentType` as the HTTP `Content-Type` header.

#[cfg(unix)]
mod unix {
    use std::fs::File;
    use std::io::{BufRead, BufReader, Read, Write};
    use std::net::{TcpListener, TcpStream};
    use std::path::{Path, PathBuf};
    use std::process::Command;
    use std::thread;
    use std::time::{Duration, Instant};

    use serde_json::Value;

    /// The parts of a single HTTP request recorded by the mock export server.
    #[derive(Debug)]
    struct CapturedRequest {
        /// First whitespace-separated token of the request line (e.g. `PUT`).
        method: String,
        /// Second token of the request line: the request target, including any query string.
        target: String,
        /// Value of the `Content-Type` header, or `None` when the request had none.
        content_type: Option<String>,
        /// Body bytes; the length is taken from the `Content-Length` header (0 if absent).
        body: Vec<u8>,
    }

    /// Locates the `cellos-supervisor` executable to run in this test.
    ///
    /// Resolution order:
    /// 1. the runtime environment variable `CARGO_BIN_EXE_cellos_supervisor`, if set
    ///    (kept as an explicit override hook);
    /// 2. the compile-time value of `CARGO_BIN_EXE_cellos-supervisor`, if it was defined
    ///    when this file was compiled — Cargo documents this variable as being set while
    ///    building integration tests, spelled with the binary target's exact name;
    /// 3. `<root>/target/<profile>/cellos-supervisor`, where `<root>` is two directories
    ///    above `CARGO_MANIFEST_DIR` and `<profile>` is the runtime `PROFILE` variable or
    ///    `debug` when that is unset.
    ///
    /// # Panics
    ///
    /// Panics if `CARGO_MANIFEST_DIR` has fewer than two parent directories.
    fn supervisor_exe() -> PathBuf {
        if let Some(p) = std::env::var_os("CARGO_BIN_EXE_cellos_supervisor") {
            return PathBuf::from(p);
        }
        // NOTE(review): assumes the bin target is named `cellos-supervisor`, matching the
        // file name used by the fallback below. `option_env!` yields `None` (and still
        // compiles) when the variable is not defined, so this step is harmless otherwise.
        if let Some(p) = option_env!("CARGO_BIN_EXE_cellos-supervisor") {
            return PathBuf::from(p);
        }
        // Last resort: guess the conventional target directory layout. This does not
        // account for a relocated target directory.
        let root = Path::new(env!("CARGO_MANIFEST_DIR"))
            .parent()
            .and_then(|p| p.parent())
            .expect("cellos-supervisor crate under workspace root");
        let profile = std::env::var("PROFILE").unwrap_or_else(|_| "debug".into());
        root.join("target").join(profile).join("cellos-supervisor")
    }

    /// Reads one HTTP/1.1 request from `stream` and captures the parts the test inspects.
    ///
    /// Parses the request line, scans the headers for `Content-Length` and `Content-Type`
    /// (header names are compared case-insensitively), then reads exactly `Content-Length`
    /// body bytes (zero when the header is absent). Lines may be terminated by `\r\n` or
    /// by a bare `\n`.
    ///
    /// # Panics
    ///
    /// Panics on any I/O error, on an empty request line, on a request line without a
    /// method or target, on an unparsable `Content-Length`, or when the peer sends fewer
    /// body bytes than it declared.
    fn read_request(stream: &mut TcpStream) -> CapturedRequest {
        let mut reader = BufReader::new(stream.try_clone().expect("clone stream"));
        let mut request_line = String::new();
        reader
            .read_line(&mut request_line)
            .expect("read request line");
        assert!(!request_line.trim().is_empty(), "expected request line");

        let mut content_length = 0usize;
        let mut content_type = None;
        // One buffer reused for every header line.
        let mut line = String::new();
        loop {
            line.clear();
            let bytes_read = reader.read_line(&mut line).expect("read header");
            // Zero bytes means EOF. A line consisting solely of its terminator (CRLF or a
            // bare LF) is the blank line that ends the header section; without accepting
            // the bare-LF form the loop would run on into the body.
            if bytes_read == 0 || line.trim_end_matches(['\r', '\n']).is_empty() {
                break;
            }
            if let Some((name, value)) = line.split_once(':') {
                let name = name.trim().to_ascii_lowercase();
                let value = value.trim().to_string();
                if name == "content-length" {
                    content_length = value.parse::<usize>().expect("parse content-length");
                } else if name == "content-type" {
                    content_type = Some(value);
                }
            }
        }

        // Read through `reader`, not `stream`: the BufReader may already hold body bytes.
        let mut body = vec![0u8; content_length];
        reader.read_exact(&mut body).expect("read request body");

        let mut parts = request_line.split_whitespace();
        let method = parts.next().expect("method").to_string();
        let target = parts.next().expect("target").to_string();
        CapturedRequest {
            method,
            target,
            content_type,
            body,
        }
    }

    /// Replies on `stream` with a fixed, body-less `200 OK` and flushes it.
    ///
    /// # Panics
    ///
    /// Panics if writing or flushing the stream fails.
    fn write_response(stream: &mut TcpStream) {
        // The whole reply is constant: status line, three headers, blank line.
        const REPLY: &str =
            "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n";
        stream.write_all(REPLY.as_bytes()).expect("write response");
        stream.flush().expect("flush response");
    }

    /// Starts a throwaway HTTP server on an ephemeral loopback port.
    ///
    /// Returns the server's base URL (`http://<addr>`) and a handle to the serving
    /// thread. The thread handles connections one at a time, recording each request and
    /// answering it, until it has seen `expected_requests` requests or ten seconds have
    /// passed since it started; joining the handle yields the recorded requests.
    ///
    /// # Panics
    ///
    /// Panics if the listener cannot be bound or configured. The serving thread panics
    /// if `accept` fails with anything other than `WouldBlock`, or if a request is not
    /// a `PUT`.
    fn start_mock_export_server(
        expected_requests: usize,
    ) -> (String, thread::JoinHandle<Vec<CapturedRequest>>) {
        let listener = TcpListener::bind("127.0.0.1:0").expect("bind mock server");
        // Non-blocking accept lets the loop below poll the deadline between attempts.
        listener
            .set_nonblocking(true)
            .expect("set nonblocking listener");
        let addr = listener.local_addr().expect("local addr");

        let handle = thread::spawn(move || {
            let deadline = Instant::now() + Duration::from_secs(10);
            let mut captured = Vec::new();
            loop {
                // Done once enough requests arrived or the time budget ran out.
                if captured.len() >= expected_requests || Instant::now() >= deadline {
                    break captured;
                }

                let mut conn = match listener.accept() {
                    Ok((conn, _peer)) => conn,
                    Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
                        // Nothing pending yet; back off briefly and re-check the deadline.
                        thread::sleep(Duration::from_millis(20));
                        continue;
                    }
                    Err(err) => panic!("mock export accept failed: {err}"),
                };

                // Switch the accepted socket to blocking mode before reading from it.
                conn.set_nonblocking(false)
                    .expect("set accepted stream blocking");
                let request = read_request(&mut conn);
                assert_eq!(request.method, "PUT", "expected PUT export request");
                write_response(&mut conn);
                captured.push(request);
            }
        });

        (format!("http://{addr}"), handle)
    }

    /// Loads a JSON Lines file, parsing each line as one JSON [`Value`].
    ///
    /// Values are returned in file order.
    ///
    /// # Panics
    ///
    /// Panics if the file cannot be opened, a line cannot be read, or a line is not
    /// valid JSON.
    fn read_jsonl_events(path: &Path) -> Vec<Value> {
        let reader = BufReader::new(File::open(path).expect("open jsonl"));
        let mut events = Vec::new();
        for raw in reader.lines() {
            let text = raw.expect("read jsonl line");
            let event: Value = serde_json::from_str(&text).expect("parse jsonl line");
            events.push(event);
        }
        events
    }

    /// End-to-end check that two artifacts bound to two different export targets
    /// (one `s3`, one `http`) are each uploaded with a `PUT` to their own endpoint,
    /// carry their declared `contentType` as the `Content-Type` header, and produce
    /// one export-completed event apiece in the JSONL event log.
    #[test]
    fn routes_mixed_targets_and_forwards_content_type() {
        // Scratch directory holding the event log, the spec, and the two artifact files.
        let tmp = tempfile::tempdir().expect("tempdir");
        let jsonl_path = tmp.path().join("events.jsonl");
        let spec_path = tmp.path().join("spec.json");
        let junit_path = tmp.path().join("junit.xml");
        let coverage_path = tmp.path().join("coverage.txt");

        // Artifact contents; the same strings are later compared with the uploaded bodies.
        std::fs::write(&junit_path, "<testsuite/>").expect("write junit");
        std::fs::write(&coverage_path, "coverage=87%").expect("write coverage");

        // A single mock server plays both export endpoints; it stops after two requests.
        let (base_url, server) = start_mock_export_server(2);

        // Two named targets and two artifacts, each artifact selecting one target by name.
        let spec = serde_json::json!({
            "apiVersion": "cellos.io/v1",
            "kind": "ExecutionCell",
            "spec": {
                "id": "multi-target-export",
                "authority": {
                    "secretRefs": []
                },
                "lifetime": {
                    "ttlSeconds": 300
                },
                "export": {
                    "targets": [
                        {
                            "name": "artifact-bucket",
                            "kind": "s3",
                            "bucket": "demo-bucket",
                            "keyPrefix": "multi/run-1"
                        },
                        {
                            "name": "artifact-api",
                            "kind": "http",
                            "baseUrl": "https://artifacts.example.invalid/upload"
                        }
                    ],
                    "artifacts": [
                        {
                            "name": "test-results",
                            "path": junit_path,
                            "target": "artifact-bucket",
                            "contentType": "application/xml"
                        },
                        {
                            "name": "coverage-summary",
                            "path": coverage_path,
                            "target": "artifact-api",
                            "contentType": "text/plain"
                        }
                    ]
                }
            }
        });
        std::fs::write(&spec_path, serde_json::to_string(&spec).unwrap()).expect("write spec");

        // Run the supervisor against the spec. The two per-target environment variables
        // point both targets at the mock server. `{{`/`}}` are escaped braces, so the HTTP
        // base URL is passed with literal `{cell_id}` and `{artifact_name}` placeholders;
        // the assertions below expect them to be replaced by the spec id and artifact name.
        // NOTE(review): the meaning of the remaining variables is defined by the supervisor
        // and is not visible in this file.
        let status = Command::new(supervisor_exe())
            .env("CELLOS_DEPLOYMENT_PROFILE", "portable")
            .env("CELL_OS_USE_NOOP_SINK", "1")
            .env("CELLOS_CELL_BACKEND", "stub")
            .env("CELL_OS_JSONL_EVENTS", &jsonl_path)
            .env(
                "CELLOS_EXPORT_S3_PRESIGNED_URL__ARTIFACT_BUCKET",
                format!("{base_url}/s3-upload?X-Amz-Signature=fake"),
            )
            .env(
                "CELLOS_EXPORT_HTTP_BASE_URL__ARTIFACT_API",
                format!("{base_url}/http-upload/{{cell_id}}/{{artifact_name}}"),
            )
            .env("CELL_OS_REQUIRE_S3_EXPORT", "1")
            .env("CELL_OS_REQUIRE_HTTP_EXPORT", "1")
            .current_dir(env!("CARGO_MANIFEST_DIR"))
            .arg(&spec_path)
            .status()
            .expect("spawn cellos-supervisor");

        assert!(
            status.success(),
            "expected successful supervisor run: {status:?}"
        );

        // Collect what the mock server recorded; it returns early with fewer entries if
        // its deadline passed first, which the length check catches.
        let requests = server.join().expect("join mock export server");
        assert_eq!(requests.len(), 2, "expected two export PUTs");

        // Requests are looked up by target rather than by position, so upload order
        // does not matter.
        let s3_request = requests
            .iter()
            .find(|request| request.target.starts_with("/s3-upload?"))
            .expect("s3 export request");
        assert_eq!(
            s3_request.content_type.as_deref(),
            Some("application/xml"),
            "s3 request should forward artifact content type"
        );
        assert_eq!(
            String::from_utf8(s3_request.body.clone()).unwrap(),
            "<testsuite/>"
        );

        let http_request = requests
            .iter()
            .find(|request| request.target == "/http-upload/multi-target-export/coverage-summary")
            .expect("http export request");
        assert_eq!(
            http_request.content_type.as_deref(),
            Some("text/plain"),
            "http request should forward artifact content type"
        );
        assert_eq!(
            String::from_utf8(http_request.body.clone()).unwrap(),
            "coverage=87%"
        );

        // The event log must contain exactly one export-completed event per artifact.
        let events = read_jsonl_events(&jsonl_path);
        let export_events: Vec<&Value> = events
            .iter()
            .filter(|event| event["type"] == "dev.cellos.events.cell.export.v2.completed")
            .collect();
        assert_eq!(
            export_events.len(),
            2,
            "expected two export completed events"
        );

        // The s3 receipt is expected to name the logical `s3://bucket/prefix/name`
        // location, not the presigned URL the bytes were sent to.
        let s3_event = export_events
            .iter()
            .find(|event| event["data"]["artifactName"] == "test-results")
            .expect("s3 export event");
        assert_eq!(s3_event["data"]["receipt"]["targetKind"], "s3");
        assert_eq!(
            s3_event["data"]["receipt"]["destination"],
            "s3://demo-bucket/multi/run-1/test-results"
        );

        // The http receipt is expected to name the expanded URL on the mock server,
        // not the `baseUrl` written in the spec.
        let http_event = export_events
            .iter()
            .find(|event| event["data"]["artifactName"] == "coverage-summary")
            .expect("http export event");
        assert_eq!(http_event["data"]["receipt"]["targetKind"], "http");
        assert_eq!(
            http_event["data"]["receipt"]["destination"],
            format!("{base_url}/http-upload/multi-target-export/coverage-summary")
        );
    }
}