#[cfg(unix)]
mod unix {
use std::fs::File;
use std::io::{BufRead, BufReader, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::path::{Path, PathBuf};
use std::process::Command;
use std::thread;
use std::time::{Duration, Instant};
use serde_json::Value;
/// The parts of one HTTP request received by the mock export server that the
/// test assertions inspect.
#[derive(Debug)]
struct CapturedRequest {
/// First whitespace-separated token of the request line (e.g. `PUT`).
method: String,
/// Second whitespace-separated token of the request line: path plus any query string.
target: String,
/// Trimmed value of the `Content-Type` header, or `None` when the header was absent.
content_type: Option<String>,
/// Request body bytes; its length is the value of the `Content-Length` header (0 if absent).
body: Vec<u8>,
}
/// Locates the `cellos-supervisor` binary to run.
///
/// Uses the `CARGO_BIN_EXE_cellos_supervisor` environment variable when it is set
/// at runtime. Otherwise falls back to
/// `<two levels above CARGO_MANIFEST_DIR>/target/<profile>/cellos-supervisor`,
/// where `<profile>` is the runtime `PROFILE` environment variable or `debug`.
///
/// # Panics
///
/// Panics if the manifest directory has no grandparent directory.
fn supervisor_exe() -> PathBuf {
    match std::env::var_os("CARGO_BIN_EXE_cellos_supervisor") {
        Some(explicit) => PathBuf::from(explicit),
        None => {
            // `ancestors()` yields the path itself, then its parent, then its
            // grandparent; index 2 is therefore the grandparent directory.
            let workspace_root = Path::new(env!("CARGO_MANIFEST_DIR"))
                .ancestors()
                .nth(2)
                .expect("cellos-supervisor crate under workspace root");
            let profile = match std::env::var("PROFILE") {
                Ok(name) => name,
                Err(_) => String::from("debug"),
            };
            let mut exe = workspace_root.join("target");
            exe.push(profile);
            exe.push("cellos-supervisor");
            exe
        }
    }
}
/// Reads one HTTP/1.1 request from `stream` and captures the parts the tests
/// assert on.
///
/// The request line is split on whitespace into method and target. Headers are
/// scanned for `Content-Length` and `Content-Type` (names compared
/// case-insensitively, values trimmed); all other headers are ignored. Exactly
/// `Content-Length` body bytes are then read (zero when the header is absent).
/// Chunked transfer encoding is not handled.
///
/// The header section ends at the first blank line. Both `\r\n` and a bare
/// `\n` are accepted as the blank line, and EOF also terminates the headers.
///
/// # Panics
///
/// Panics on any I/O error, on an empty request line, on an unparsable
/// `Content-Length`, or when the request line lacks a method or target.
fn read_request(stream: &mut TcpStream) -> CapturedRequest {
    // `&mut TcpStream` is itself `Read`, so borrow the socket instead of
    // duplicating it with `try_clone`; that removes a syscall and a failure point.
    let mut reader = BufReader::new(&mut *stream);

    let mut request_line = String::new();
    reader
        .read_line(&mut request_line)
        .expect("read request line");
    assert!(!request_line.trim().is_empty(), "expected request line");

    let mut content_length = 0usize;
    let mut content_type = None;
    // One buffer reused for every header line.
    let mut line = String::new();
    loop {
        line.clear();
        reader.read_line(&mut line).expect("read header");
        // "" is EOF; "\r\n" is the standard blank line; "\n" is the bare-LF
        // variant, which previously was not recognised and made the parser
        // consume body bytes as headers.
        if matches!(line.as_str(), "" | "\n" | "\r\n") {
            break;
        }
        if let Some((name, value)) = line.split_once(':') {
            let name = name.trim();
            let value = value.trim();
            if name.eq_ignore_ascii_case("content-length") {
                content_length = value.parse::<usize>().expect("parse content-length");
            } else if name.eq_ignore_ascii_case("content-type") {
                content_type = Some(value.to_string());
            }
        }
    }

    // Read through `reader`, not the raw stream: the BufReader may already
    // hold part of the body in its buffer.
    let mut body = vec![0u8; content_length];
    reader.read_exact(&mut body).expect("read request body");

    let mut parts = request_line.split_whitespace();
    let method = parts.next().expect("method").to_string();
    let target = parts.next().expect("target").to_string();
    CapturedRequest {
        method,
        target,
        content_type,
        body,
    }
}
/// Sends a fixed, empty-bodied `200 OK` response with `Connection: close` on
/// `stream` and flushes it.
///
/// # Panics
///
/// Panics if writing or flushing the stream fails.
fn write_response(stream: &mut TcpStream) {
    const RESPONSE: &[u8] =
        b"HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n";
    stream.write_all(RESPONSE).expect("write response");
    stream.flush().expect("flush response");
}
/// Starts a background HTTP server on an ephemeral localhost port that
/// captures export uploads.
///
/// Returns the server's base URL (`http://<addr>`) and a join handle that
/// yields the captured requests. The worker thread stops once
/// `expected_requests` requests have been captured or 10 seconds have passed,
/// whichever comes first, so the returned vector may hold fewer requests than
/// expected.
///
/// # Panics
///
/// Panics if the listener cannot be set up. The worker thread panics (surfaced
/// through `join`) when accepting fails for a reason other than `WouldBlock`,
/// or when a request is not a `PUT`.
fn start_mock_export_server(
    expected_requests: usize,
) -> (String, thread::JoinHandle<Vec<CapturedRequest>>) {
    let listener = TcpListener::bind("127.0.0.1:0").expect("bind mock server");
    // Non-blocking accept lets the worker poll and still honour its deadline.
    listener
        .set_nonblocking(true)
        .expect("set nonblocking listener");
    let addr = listener.local_addr().expect("local addr");
    let base_url = format!("http://{addr}");

    let worker = thread::spawn(move || {
        let give_up_at = Instant::now() + Duration::from_secs(10);
        let mut captured = Vec::new();
        loop {
            if captured.len() >= expected_requests || Instant::now() >= give_up_at {
                break captured;
            }
            let mut conn = match listener.accept() {
                Ok((conn, _peer)) => conn,
                Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
                    // Nothing pending yet; back off briefly before polling again.
                    thread::sleep(Duration::from_millis(20));
                    continue;
                }
                Err(err) => panic!("mock export accept failed: {err}"),
            };
            // Request parsing uses blocking reads, so switch the accepted
            // socket to blocking mode explicitly.
            conn.set_nonblocking(false)
                .expect("set accepted stream blocking");
            let request = read_request(&mut conn);
            assert_eq!(request.method, "PUT", "expected PUT export request");
            write_response(&mut conn);
            captured.push(request);
        }
    });

    (base_url, worker)
}
/// Loads a JSON Lines file, parsing every line as one JSON value.
///
/// Values are returned in file order.
///
/// # Panics
///
/// Panics if the file cannot be opened, a line cannot be read, or a line is
/// not valid JSON.
fn read_jsonl_events(path: &Path) -> Vec<Value> {
    let reader = BufReader::new(File::open(path).expect("open jsonl"));
    let mut events = Vec::new();
    for line in reader.lines() {
        let text = line.expect("read jsonl line");
        let event: Value = serde_json::from_str(&text).expect("parse jsonl line");
        events.push(event);
    }
    events
}
/// End-to-end check that one spec with an `s3` and an `http` export target sends
/// each artifact to its own target, forwards the artifact's `contentType` as the
/// request `Content-Type`, and records one export-completed event per artifact.
#[test]
fn routes_mixed_targets_and_forwards_content_type() {
// Scratch directory holding the spec, the two artifacts and the JSONL event log.
let tmp = tempfile::tempdir().expect("tempdir");
let jsonl_path = tmp.path().join("events.jsonl");
let spec_path = tmp.path().join("spec.json");
let junit_path = tmp.path().join("junit.xml");
let coverage_path = tmp.path().join("coverage.txt");
// Artifact contents; the request-body assertions below compare against these exact strings.
std::fs::write(&junit_path, "<testsuite/>").expect("write junit");
std::fs::write(&coverage_path, "coverage=87%").expect("write coverage");
// One mock server receives both uploads, so it waits for two requests.
let (base_url, server) = start_mock_export_server(2);
// Spec with two export targets and two artifacts, each artifact bound to a different target.
// The http target's `baseUrl` uses an `.invalid` host; the assertions below expect the
// upload to arrive at the mock server instead.
let spec = serde_json::json!({
"apiVersion": "cellos.io/v1",
"kind": "ExecutionCell",
"spec": {
"id": "multi-target-export",
"authority": {
"secretRefs": []
},
"lifetime": {
"ttlSeconds": 300
},
"export": {
"targets": [
{
"name": "artifact-bucket",
"kind": "s3",
"bucket": "demo-bucket",
"keyPrefix": "multi/run-1"
},
{
"name": "artifact-api",
"kind": "http",
"baseUrl": "https://artifacts.example.invalid/upload"
}
],
"artifacts": [
{
"name": "test-results",
"path": junit_path,
"target": "artifact-bucket",
"contentType": "application/xml"
},
{
"name": "coverage-summary",
"path": coverage_path,
"target": "artifact-api",
"contentType": "text/plain"
}
]
}
}
});
std::fs::write(&spec_path, serde_json::to_string(&spec).unwrap()).expect("write spec");
// Run the supervisor on the spec. Both per-target URL variables point at the mock server.
// NOTE(review): the exact meaning of each CELLOS_*/CELL_OS_* variable is defined by the
// supervisor, not visible here; presumably CELL_OS_JSONL_EVENTS selects the event log file
// that is read back below. Confirm against the supervisor's configuration docs.
// In the HTTP base URL, `{{` / `}}` are `format!` escapes, so the value carries literal
// `{cell_id}` and `{artifact_name}` placeholders; the target assertion below expects them
// to be replaced with the spec id and the artifact name.
let status = Command::new(supervisor_exe())
.env("CELLOS_DEPLOYMENT_PROFILE", "portable")
.env("CELL_OS_USE_NOOP_SINK", "1")
.env("CELLOS_CELL_BACKEND", "stub")
.env("CELL_OS_JSONL_EVENTS", &jsonl_path)
.env(
"CELLOS_EXPORT_S3_PRESIGNED_URL__ARTIFACT_BUCKET",
format!("{base_url}/s3-upload?X-Amz-Signature=fake"),
)
.env(
"CELLOS_EXPORT_HTTP_BASE_URL__ARTIFACT_API",
format!("{base_url}/http-upload/{{cell_id}}/{{artifact_name}}"),
)
.env("CELL_OS_REQUIRE_S3_EXPORT", "1")
.env("CELL_OS_REQUIRE_HTTP_EXPORT", "1")
.current_dir(env!("CARGO_MANIFEST_DIR"))
.arg(&spec_path)
.status()
.expect("spawn cellos-supervisor");
assert!(
status.success(),
"expected successful supervisor run: {status:?}"
);
// Joining returns the captured requests; a panic inside the server thread surfaces here.
let requests = server.join().expect("join mock export server");
assert_eq!(requests.len(), 2, "expected two export PUTs");
// Requests are looked up by target rather than by index, so arrival order does not matter.
let s3_request = requests
.iter()
.find(|request| request.target.starts_with("/s3-upload?"))
.expect("s3 export request");
assert_eq!(
s3_request.content_type.as_deref(),
Some("application/xml"),
"s3 request should forward artifact content type"
);
assert_eq!(
String::from_utf8(s3_request.body.clone()).unwrap(),
"<testsuite/>"
);
let http_request = requests
.iter()
.find(|request| request.target == "/http-upload/multi-target-export/coverage-summary")
.expect("http export request");
assert_eq!(
http_request.content_type.as_deref(),
Some("text/plain"),
"http request should forward artifact content type"
);
assert_eq!(
String::from_utf8(http_request.body.clone()).unwrap(),
"coverage=87%"
);
// Read the event log back and keep only the export-completed events.
let events = read_jsonl_events(&jsonl_path);
let export_events: Vec<&Value> = events
.iter()
.filter(|event| event["type"] == "dev.cellos.events.cell.export.v2.completed")
.collect();
assert_eq!(
export_events.len(),
2,
"expected two export completed events"
);
// Each receipt must name the target kind and destination matching the artifact's target.
let s3_event = export_events
.iter()
.find(|event| event["data"]["artifactName"] == "test-results")
.expect("s3 export event");
assert_eq!(s3_event["data"]["receipt"]["targetKind"], "s3");
assert_eq!(
s3_event["data"]["receipt"]["destination"],
"s3://demo-bucket/multi/run-1/test-results"
);
let http_event = export_events
.iter()
.find(|event| event["data"]["artifactName"] == "coverage-summary")
.expect("http export event");
assert_eq!(http_event["data"]["receipt"]["targetKind"], "http");
assert_eq!(
http_event["data"]["receipt"]["destination"],
format!("{base_url}/http-upload/multi-target-export/coverage-summary")
);
}
}