// cellos-supervisor 0.5.1
//
// CellOS execution-cell runner — boots cells in Firecracker microVMs or gVisor, enforces narrow typed authority, emits signed CloudEvents.
// Documentation
//! Remote export retries for transient HTTP/S3 failures.

#[cfg(unix)]
mod unix {
    use std::collections::HashMap;
    use std::fs::File;
    use std::io::{BufRead, BufReader, Read, Write};
    use std::net::{TcpListener, TcpStream};
    use std::path::{Path, PathBuf};
    use std::process::Command;
    use std::thread;
    use std::time::{Duration, Instant};

    use serde_json::Value;

    /// One HTTP request as recorded by the mock export server.
    ///
    /// Only the parts the test asserts on are kept; all other headers are
    /// discarded while parsing.
    #[derive(Debug)]
    struct CapturedRequest {
        /// Request target: the second whitespace-separated token of the request
        /// line (path plus query string, if any).
        target: String,
        /// Trimmed value of the `Content-Type` header, or `None` if the client
        /// did not send one.
        content_type: Option<String>,
        /// Raw request body, exactly `Content-Length` bytes long (empty when the
        /// header is absent).
        body: Vec<u8>,
    }

    /// What the mock server thread hands back when joined: every captured
    /// request in arrival order, plus the number of attempts seen per route key
    /// (`"s3"` / `"http"`).
    type FlakyServerResult = (Vec<CapturedRequest>, HashMap<String, usize>);

    /// Resolves the path of the `cellos-supervisor` binary under test.
    ///
    /// Lookup order:
    /// 1. The runtime environment variable `CARGO_BIN_EXE_cellos_supervisor`,
    ///    kept as an explicit override for harnesses that set it.
    /// 2. The compile-time variable `CARGO_BIN_EXE_cellos-supervisor`. Cargo
    ///    defines `CARGO_BIN_EXE_<name>` (binary name verbatim, hyphens kept)
    ///    while building integration tests, and it points at the binary built
    ///    for the active profile and target directory. `option_env!` makes this
    ///    step a no-op when the variable is absent.
    ///    NOTE(review): assumes the binary target is named `cellos-supervisor`,
    ///    as the fallback file name below suggests — confirm against Cargo.toml.
    /// 3. `<workspace root>/target/<profile>/cellos-supervisor`, where the
    ///    workspace root is two directories above this crate's manifest and
    ///    `<profile>` comes from the `PROFILE` environment variable, defaulting
    ///    to `debug`.
    ///
    /// # Panics
    /// Panics if the manifest directory has fewer than two ancestors.
    fn supervisor_exe() -> PathBuf {
        if let Some(p) = std::env::var_os("CARGO_BIN_EXE_cellos_supervisor") {
            return PathBuf::from(p);
        }
        // Cargo only provides this at compile time, so a runtime lookup alone
        // would never see it and release/custom-target builds would resolve to
        // a stale or missing debug binary.
        if let Some(p) = option_env!("CARGO_BIN_EXE_cellos-supervisor") {
            return PathBuf::from(p);
        }
        let root = Path::new(env!("CARGO_MANIFEST_DIR"))
            .parent()
            .and_then(|p| p.parent())
            .expect("cellos-supervisor crate under workspace root");
        let profile = std::env::var("PROFILE").unwrap_or_else(|_| "debug".into());
        root.join("target").join(profile).join("cellos-supervisor")
    }

    /// Reads a single HTTP request from `stream` and captures the pieces the
    /// test inspects.
    ///
    /// The parser is intentionally minimal: it reads the request line, scans
    /// headers until a bare `\r\n` (or end of stream), remembers only
    /// `Content-Length` and `Content-Type` (matched case-insensitively), and
    /// then reads exactly `Content-Length` body bytes.
    ///
    /// # Panics
    /// Panics on any I/O error, on an empty request line, on an unparsable
    /// `Content-Length`, when the method is not `PUT`, or when the request line
    /// has no target.
    fn read_request(stream: &mut TcpStream) -> CapturedRequest {
        let mut reader = BufReader::new(stream.try_clone().expect("clone stream"));

        let mut request_line = String::new();
        reader
            .read_line(&mut request_line)
            .expect("read request line");
        assert!(!request_line.trim().is_empty(), "expected request line");

        let mut content_length = 0usize;
        let mut content_type = None;
        let mut header = String::new();
        loop {
            header.clear();
            let bytes_read = reader.read_line(&mut header).expect("read header");
            // Zero bytes means the peer closed; "\r\n" is the blank line that
            // ends the header section.
            if bytes_read == 0 || header == "\r\n" {
                break;
            }
            if let Some((name, value)) = header.split_once(':') {
                let value = value.trim();
                match name.trim().to_ascii_lowercase().as_str() {
                    "content-length" => {
                        content_length = value.parse::<usize>().expect("parse content-length");
                    }
                    "content-type" => content_type = Some(value.to_string()),
                    _ => {}
                }
            }
        }

        let mut body = vec![0u8; content_length];
        reader.read_exact(&mut body).expect("read request body");

        // The method is validated only after the whole request has been consumed.
        let mut tokens = request_line.split_whitespace();
        let method = tokens.next();
        assert_eq!(method, Some("PUT"), "expected PUT export request");
        let target = tokens.next().expect("target").to_string();

        CapturedRequest {
            target,
            content_type,
            body,
        }
    }

    /// Sends a minimal `text/plain` HTTP response on `stream` and flushes it.
    ///
    /// `status_line` is written verbatim as the first line (for example
    /// `HTTP/1.1 200 OK`); `Content-Length` is the byte length of `body`, and
    /// `Connection: close` is always sent.
    ///
    /// # Panics
    /// Panics if writing or flushing the stream fails.
    fn write_response(stream: &mut TcpStream, status_line: &str, body: &str) {
        let response = format!(
            "{status_line}\r\nContent-Type: text/plain\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}",
            body.len()
        );
        stream
            .write_all(response.as_bytes())
            .expect("write response");
        stream.flush().expect("flush response");
    }

    /// Maps a request target to the short route name used for attempt counting.
    ///
    /// Targets starting with `/s3-upload?` map to `"s3"`, and targets starting
    /// with `/http-upload/` map to `"http"`.
    ///
    /// # Panics
    /// Panics for any other target, since the mock server only expects these
    /// two kinds of upload.
    fn route_key(target: &str) -> &'static str {
        match target {
            t if t.starts_with("/s3-upload?") => "s3",
            t if t.starts_with("/http-upload/") => "http",
            _ => panic!("unexpected target: {target}"),
        }
    }

    fn start_flaky_export_server(
        expected_requests: usize,
    ) -> (String, thread::JoinHandle<FlakyServerResult>) {
        let listener = TcpListener::bind("127.0.0.1:0").expect("bind mock server");
        listener
            .set_nonblocking(true)
            .expect("set nonblocking listener");
        let addr = listener.local_addr().expect("local addr");
        let handle = thread::spawn(move || {
            let deadline = Instant::now() + Duration::from_secs(10);
            let mut requests = Vec::new();
            let mut attempts_by_route = HashMap::new();
            while requests.len() < expected_requests && Instant::now() < deadline {
                match listener.accept() {
                    Ok((mut stream, _)) => {
                        stream
                            .set_nonblocking(false)
                            .expect("set accepted stream blocking");
                        let request = read_request(&mut stream);
                        let route = route_key(&request.target).to_string();
                        let attempts = attempts_by_route.entry(route).or_insert(0usize);
                        *attempts += 1;
                        if *attempts == 1 {
                            write_response(
                                &mut stream,
                                "HTTP/1.1 503 Service Unavailable",
                                "retry me",
                            );
                        } else {
                            write_response(&mut stream, "HTTP/1.1 200 OK", "");
                        }
                        requests.push(request);
                    }
                    Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
                        thread::sleep(Duration::from_millis(20));
                    }
                    Err(err) => panic!("mock export accept failed: {err}"),
                }
            }
            (requests, attempts_by_route)
        });
        (format!("http://{addr}"), handle)
    }

    /// Loads a JSON Lines file, parsing every line as one JSON value.
    ///
    /// Values are returned in file order.
    ///
    /// # Panics
    /// Panics if the file cannot be opened, a line cannot be read, or a line is
    /// not valid JSON.
    fn read_jsonl_events(path: &Path) -> Vec<Value> {
        let reader = BufReader::new(File::open(path).expect("open jsonl"));
        let mut events = Vec::new();
        for raw in reader.lines() {
            let text = raw.expect("read jsonl line");
            let event = serde_json::from_str::<Value>(&text).expect("parse jsonl line");
            events.push(event);
        }
        events
    }

    #[test]
    fn retries_transient_http_and_s3_failures() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let jsonl_path = tmp.path().join("events.jsonl");
        let spec_path = tmp.path().join("spec.json");
        let junit_path = tmp.path().join("junit.xml");
        let coverage_path = tmp.path().join("coverage.txt");

        std::fs::write(&junit_path, "<testsuite/>").expect("write junit");
        std::fs::write(&coverage_path, "coverage=91%").expect("write coverage");

        let (base_url, server) = start_flaky_export_server(4);

        let spec = serde_json::json!({
            "apiVersion": "cellos.io/v1",
            "kind": "ExecutionCell",
            "spec": {
                "id": "retry-remote-export",
                "authority": {
                    "secretRefs": []
                },
                "lifetime": {
                    "ttlSeconds": 300
                },
                "export": {
                    "targets": [
                        {
                            "name": "artifact-bucket",
                            "kind": "s3",
                            "bucket": "demo-bucket",
                            "keyPrefix": "retry/run-1"
                        },
                        {
                            "name": "artifact-api",
                            "kind": "http",
                            "baseUrl": "https://artifacts.example.invalid/upload"
                        }
                    ],
                    "artifacts": [
                        {
                            "name": "test-results",
                            "path": junit_path,
                            "target": "artifact-bucket",
                            "contentType": "application/xml"
                        },
                        {
                            "name": "coverage-summary",
                            "path": coverage_path,
                            "target": "artifact-api",
                            "contentType": "text/plain"
                        }
                    ]
                }
            }
        });
        std::fs::write(&spec_path, serde_json::to_string(&spec).unwrap()).expect("write spec");

        let status = Command::new(supervisor_exe())
            .env("CELLOS_DEPLOYMENT_PROFILE", "portable")
            .env("CELL_OS_USE_NOOP_SINK", "1")
            .env("CELLOS_CELL_BACKEND", "stub")
            .env("CELL_OS_JSONL_EVENTS", &jsonl_path)
            .env(
                "CELLOS_EXPORT_S3_PRESIGNED_URL__ARTIFACT_BUCKET",
                format!("{base_url}/s3-upload?X-Amz-Signature=fake"),
            )
            .env("CELLOS_EXPORT_S3_MAX_ATTEMPTS__ARTIFACT_BUCKET", "2")
            .env("CELLOS_EXPORT_S3_RETRY_BACKOFF_MS__ARTIFACT_BUCKET", "0")
            .env(
                "CELLOS_EXPORT_HTTP_BASE_URL__ARTIFACT_API",
                format!("{base_url}/http-upload/{{cell_id}}/{{artifact_name}}"),
            )
            .env("CELLOS_EXPORT_HTTP_MAX_ATTEMPTS__ARTIFACT_API", "2")
            .env("CELLOS_EXPORT_HTTP_RETRY_BACKOFF_MS__ARTIFACT_API", "0")
            .env("CELL_OS_REQUIRE_S3_EXPORT", "1")
            .env("CELL_OS_REQUIRE_HTTP_EXPORT", "1")
            .current_dir(env!("CARGO_MANIFEST_DIR"))
            .arg(&spec_path)
            .status()
            .expect("spawn cellos-supervisor");

        assert!(
            status.success(),
            "expected successful supervisor run after retries: {status:?}"
        );

        let (requests, attempts_by_route) = server.join().expect("join mock export server");
        assert_eq!(requests.len(), 4, "expected retry traffic for both targets");
        assert_eq!(attempts_by_route.get("s3"), Some(&2));
        assert_eq!(attempts_by_route.get("http"), Some(&2));

        let s3_requests: Vec<&CapturedRequest> = requests
            .iter()
            .filter(|request| request.target.starts_with("/s3-upload?"))
            .collect();
        assert_eq!(s3_requests.len(), 2, "expected one retry for S3");
        assert!(s3_requests
            .iter()
            .all(|request| request.content_type.as_deref() == Some("application/xml")));
        assert!(s3_requests
            .iter()
            .all(|request| String::from_utf8(request.body.clone()).unwrap() == "<testsuite/>"));

        let http_requests: Vec<&CapturedRequest> = requests
            .iter()
            .filter(|request| request.target == "/http-upload/retry-remote-export/coverage-summary")
            .collect();
        assert_eq!(http_requests.len(), 2, "expected one retry for HTTP");
        assert!(http_requests
            .iter()
            .all(|request| request.content_type.as_deref() == Some("text/plain")));
        assert!(http_requests
            .iter()
            .all(|request| String::from_utf8(request.body.clone()).unwrap() == "coverage=91%"));

        let events = read_jsonl_events(&jsonl_path);
        let export_completed: Vec<&Value> = events
            .iter()
            .filter(|event| event["type"] == "dev.cellos.events.cell.export.v2.completed")
            .collect();
        assert_eq!(export_completed.len(), 2, "expected two completed exports");
        assert!(
            !events
                .iter()
                .any(|event| event["type"] == "dev.cellos.events.cell.export.v2.failed"),
            "did not expect export failed events after successful retries"
        );
    }
}