#[cfg(unix)]
mod unix {
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufRead, BufReader, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::path::{Path, PathBuf};
use std::process::Command;
use std::thread;
use std::time::{Duration, Instant};
use serde_json::Value;
/// One HTTP request as observed by the mock export server.
#[derive(Debug)]
struct CapturedRequest {
/// Request target from the request line (path plus any query string).
target: String,
/// Value of the `Content-Type` header, if the client sent one.
content_type: Option<String>,
/// Request body: exactly the number of bytes announced by `Content-Length`
/// (empty when that header is absent).
body: Vec<u8>,
}
/// What the mock export server thread returns when joined: every request it
/// received, in arrival order, plus the number of attempts seen per route key
/// (`"s3"` or `"http"`).
type FlakyServerResult = (Vec<CapturedRequest>, HashMap<String, usize>);
/// Resolve the path of the `cellos-supervisor` executable under test.
///
/// Lookup order:
/// 1. A runtime `CARGO_BIN_EXE_cellos_supervisor` environment variable, if set
///    (explicit override, checked first so existing setups keep working).
/// 2. `CARGO_BIN_EXE_cellos-supervisor` as captured at compile time. Cargo hands
///    `CARGO_BIN_EXE_<name>` to integration tests as a compile-time variable, with
///    `<name>` spelled exactly like the binary target (hyphens included). The
///    underscore spelling in step 1 therefore does not match a binary named
///    `cellos-supervisor`, which is the name step 3 also assumes.
/// 3. `<workspace>/target/<PROFILE>/cellos-supervisor`, assuming this crate's
///    manifest directory sits two levels below the workspace root. `PROFILE` falls
///    back to `debug` when unset.
///    NOTE(review): step 3 ignores `CARGO_TARGET_DIR` and other custom target dirs.
///
/// # Panics
///
/// Panics if step 3 is reached and the manifest directory has no grandparent.
fn supervisor_exe() -> PathBuf {
    if let Some(p) = std::env::var_os("CARGO_BIN_EXE_cellos_supervisor") {
        return PathBuf::from(p);
    }
    // `option_env!` yields `None` (rather than a compile error) when this test target
    // is built without a `cellos-supervisor` binary, so this is safe in any build.
    if let Some(p) = option_env!("CARGO_BIN_EXE_cellos-supervisor") {
        return PathBuf::from(p);
    }
    let root = Path::new(env!("CARGO_MANIFEST_DIR"))
        .parent()
        .and_then(Path::parent)
        .expect("cellos-supervisor crate under workspace root");
    let profile = std::env::var("PROFILE").unwrap_or_else(|_| "debug".into());
    root.join("target").join(profile).join("cellos-supervisor")
}
/// Read a single HTTP/1.1 request from `stream` and capture the parts the test asserts on.
///
/// The request is parsed in three steps:
/// 1. the request line;
/// 2. header lines, up to the blank separator line (`"\r\n"` or a bare `"\n"`) or EOF;
/// 3. exactly `Content-Length` bytes of body (zero when that header is absent).
///
/// Only `Content-Length` and `Content-Type` are interpreted, and header names are
/// compared case-insensitively.
///
/// # Panics
///
/// Panics if any of the following happens:
/// - the stream cannot be cloned or read;
/// - the request line is blank;
/// - `Content-Length` is not a valid `usize`;
/// - the body is shorter than advertised;
/// - the method is not `PUT`;
/// - the request line has no target.
fn read_request(stream: &mut TcpStream) -> CapturedRequest {
    // Read through a cloned handle so the caller keeps `stream` for the response.
    let mut reader = BufReader::new(stream.try_clone().expect("clone stream"));

    let mut request_line = String::new();
    reader
        .read_line(&mut request_line)
        .expect("read request line");
    assert!(!request_line.trim().is_empty(), "expected request line");

    let mut content_length = 0usize;
    let mut content_type = None;
    // One buffer reused for every header line.
    let mut line = String::new();
    loop {
        line.clear();
        let read = reader.read_line(&mut line).expect("read header");
        // Stop at EOF or at the blank line separating headers from the body.
        // A bare "\n" is accepted alongside "\r\n". Otherwise a lenient client would
        // make us parse its body as headers and then stall in `read_exact`.
        if read == 0 || line.trim_end_matches(|c: char| c == '\r' || c == '\n').is_empty() {
            break;
        }
        if let Some((name, value)) = line.split_once(':') {
            let name = name.trim();
            let value = value.trim();
            if name.eq_ignore_ascii_case("content-length") {
                content_length = value.parse::<usize>().expect("parse content-length");
            } else if name.eq_ignore_ascii_case("content-type") {
                content_type = Some(value.to_string());
            }
        }
    }

    let mut body = vec![0u8; content_length];
    reader.read_exact(&mut body).expect("read request body");

    let mut parts = request_line.split_whitespace();
    assert_eq!(parts.next(), Some("PUT"), "expected PUT export request");
    let target = parts.next().expect("target").to_string();
    CapturedRequest {
        target,
        content_type,
        body,
    }
}
/// Send one complete `text/plain` HTTP response on `stream` and flush it.
///
/// `status_line` is emitted verbatim (for example `"HTTP/1.1 200 OK"`) and must not
/// carry its own CRLF. `Content-Length` is the byte length of `body`. The mock server
/// serves a single request per accepted connection, so the response announces
/// `Connection: close`.
///
/// # Panics
///
/// Panics if writing to or flushing `stream` fails.
fn write_response(stream: &mut TcpStream, status_line: &str, body: &str) {
    let content_length = body.len();
    let response = format!(
        "{status_line}\r\nContent-Type: text/plain\r\nContent-Length: {content_length}\r\nConnection: close\r\n\r\n{body}"
    );
    stream
        .write_all(response.as_bytes())
        .expect("write response");
    stream.flush().expect("flush response");
}
/// Map a request target to the route bucket used for per-route attempt counting.
///
/// Targets beginning with `/s3-upload?` map to `"s3"`. Targets beginning with
/// `/http-upload/` map to `"http"`.
///
/// # Panics
///
/// Panics with `unexpected target: …` for any other target.
fn route_key(target: &str) -> &'static str {
    // (prefix, bucket) pairs, checked in order.
    const ROUTES: [(&str, &str); 2] = [("/s3-upload?", "s3"), ("/http-upload/", "http")];
    ROUTES
        .iter()
        .find(|(prefix, _)| target.starts_with(*prefix))
        .map(|&(_, bucket)| bucket)
        .unwrap_or_else(|| panic!("unexpected target: {target}"))
}
/// Start a mock export endpoint that fails the first request on each route.
///
/// Binds an ephemeral port on `127.0.0.1` and serves connections on a background
/// thread, one request per connection. Each request is counted under its route key
/// (`"s3"` or `"http"`, via `route_key`). The first request on a route gets
/// `503 Service Unavailable`; every later one gets `200 OK`. The thread stops after
/// `expected_requests` requests or after a 10-second overall budget, whichever comes
/// first.
///
/// Returns the base URL (`http://127.0.0.1:<port>`) and a handle. Joining the handle
/// yields the captured requests (in arrival order) and the per-route attempt counts.
///
/// # Panics
///
/// Panics while binding or configuring the listener. The server thread itself panics,
/// which surfaces as an `Err` from `join`, in these cases:
/// - an accept error other than `WouldBlock`;
/// - a per-connection I/O failure or timeout;
/// - a request to an unexpected target.
fn start_flaky_export_server(
    expected_requests: usize,
) -> (String, thread::JoinHandle<FlakyServerResult>) {
    // Overall budget for the supervisor to deliver every expected request.
    const ACCEPT_BUDGET: Duration = Duration::from_secs(10);
    // Per-connection read/write bound. A client that connects and then stalls fails
    // the server thread instead of blocking it (and the test's `join`) forever.
    const IO_TIMEOUT: Duration = Duration::from_secs(5);
    // Sleep between polls of the non-blocking listener.
    const POLL_INTERVAL: Duration = Duration::from_millis(20);

    let listener = TcpListener::bind("127.0.0.1:0").expect("bind mock server");
    // Non-blocking accept lets the loop keep re-checking the deadline instead of
    // parking in `accept` if the supervisor never connects.
    listener
        .set_nonblocking(true)
        .expect("set nonblocking listener");
    let addr = listener.local_addr().expect("local addr");
    let handle = thread::spawn(move || {
        let deadline = Instant::now() + ACCEPT_BUDGET;
        let mut requests = Vec::new();
        let mut attempts_by_route: HashMap<String, usize> = HashMap::new();
        while requests.len() < expected_requests && Instant::now() < deadline {
            match listener.accept() {
                Ok((mut stream, _)) => {
                    // Accepted sockets may inherit the listener's non-blocking flag,
                    // depending on the platform. Request parsing needs blocking reads,
                    // bounded by the timeouts below.
                    stream
                        .set_nonblocking(false)
                        .expect("set accepted stream blocking");
                    stream
                        .set_read_timeout(Some(IO_TIMEOUT))
                        .expect("set accepted stream read timeout");
                    stream
                        .set_write_timeout(Some(IO_TIMEOUT))
                        .expect("set accepted stream write timeout");
                    let request = read_request(&mut stream);
                    let attempts = attempts_by_route
                        .entry(route_key(&request.target).to_string())
                        .or_default();
                    *attempts += 1;
                    if *attempts == 1 {
                        // First attempt on this route: force the client to retry.
                        write_response(
                            &mut stream,
                            "HTTP/1.1 503 Service Unavailable",
                            "retry me",
                        );
                    } else {
                        write_response(&mut stream, "HTTP/1.1 200 OK", "");
                    }
                    requests.push(request);
                }
                Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
                    thread::sleep(POLL_INTERVAL);
                }
                Err(err) => panic!("mock export accept failed: {err}"),
            }
        }
        (requests, attempts_by_route)
    });
    (format!("http://{addr}"), handle)
}
/// Parse a JSON Lines file into one `serde_json::Value` per non-blank line.
///
/// Whitespace-only lines are skipped, so a stray separator newline does not fail the
/// test.
///
/// # Panics
///
/// Panics in these cases, naming the file (and, where applicable, the 1-based line
/// number) in the message:
/// - the file cannot be opened;
/// - a line cannot be read;
/// - a non-blank line is not valid JSON.
fn read_jsonl_events(path: &Path) -> Vec<Value> {
    let file =
        File::open(path).unwrap_or_else(|err| panic!("open jsonl {}: {err}", path.display()));
    BufReader::new(file)
        .lines()
        .enumerate()
        .filter_map(|(idx, line)| {
            let lineno = idx + 1;
            let line = line.unwrap_or_else(|err| {
                panic!("read jsonl line {lineno} of {}: {err}", path.display())
            });
            if line.trim().is_empty() {
                return None;
            }
            let event = serde_json::from_str::<Value>(&line).unwrap_or_else(|err| {
                panic!("parse jsonl line {lineno} of {}: {err}", path.display())
            });
            Some(event)
        })
        .collect()
}
/// End-to-end check that the supervisor retries transient export failures.
///
/// Setup:
/// - A mock server answers the first request on each route (`s3`, `http`) with
///   `503 Service Unavailable` and every later one with `200 OK`.
/// - The supervisor is configured with two attempts and zero backoff per target.
/// - Both exports are marked required via `CELL_OS_REQUIRE_*_EXPORT`, which presumably
///   makes an unrecovered export failure fail the run (TODO confirm).
///
/// Checks:
/// - the supervisor exits successfully;
/// - each route saw exactly two attempts carrying the expected content type and body;
/// - the JSONL event stream holds two `export.v2.completed` events and no
///   `export.v2.failed` events.
#[test]
fn retries_transient_http_and_s3_failures() {
    let tmp = tempfile::tempdir().expect("tempdir");
    let jsonl_path = tmp.path().join("events.jsonl");
    let spec_path = tmp.path().join("spec.json");
    let junit_path = tmp.path().join("junit.xml");
    let coverage_path = tmp.path().join("coverage.txt");
    std::fs::write(&junit_path, "<testsuite/>").expect("write junit");
    std::fs::write(&coverage_path, "coverage=91%").expect("write coverage");

    // Two targets, each failing once: 4 requests in total.
    let (base_url, server) = start_flaky_export_server(4);

    let spec = serde_json::json!({
        "apiVersion": "cellos.io/v1",
        "kind": "ExecutionCell",
        "spec": {
            "id": "retry-remote-export",
            "authority": {
                "secretRefs": []
            },
            "lifetime": {
                "ttlSeconds": 300
            },
            "export": {
                "targets": [
                    {
                        "name": "artifact-bucket",
                        "kind": "s3",
                        "bucket": "demo-bucket",
                        "keyPrefix": "retry/run-1"
                    },
                    {
                        "name": "artifact-api",
                        "kind": "http",
                        "baseUrl": "https://artifacts.example.invalid/upload"
                    }
                ],
                "artifacts": [
                    {
                        "name": "test-results",
                        "path": junit_path,
                        "target": "artifact-bucket",
                        "contentType": "application/xml"
                    },
                    {
                        "name": "coverage-summary",
                        "path": coverage_path,
                        "target": "artifact-api",
                        "contentType": "text/plain"
                    }
                ]
            }
        }
    });
    std::fs::write(&spec_path, serde_json::to_string(&spec).unwrap()).expect("write spec");

    // The env overrides below point both export targets at the mock server.
    let status = Command::new(supervisor_exe())
        .env("CELLOS_DEPLOYMENT_PROFILE", "portable")
        .env("CELL_OS_USE_NOOP_SINK", "1")
        .env("CELLOS_CELL_BACKEND", "stub")
        .env("CELL_OS_JSONL_EVENTS", &jsonl_path)
        .env(
            "CELLOS_EXPORT_S3_PRESIGNED_URL__ARTIFACT_BUCKET",
            format!("{base_url}/s3-upload?X-Amz-Signature=fake"),
        )
        .env("CELLOS_EXPORT_S3_MAX_ATTEMPTS__ARTIFACT_BUCKET", "2")
        .env("CELLOS_EXPORT_S3_RETRY_BACKOFF_MS__ARTIFACT_BUCKET", "0")
        .env(
            "CELLOS_EXPORT_HTTP_BASE_URL__ARTIFACT_API",
            format!("{base_url}/http-upload/{{cell_id}}/{{artifact_name}}"),
        )
        .env("CELLOS_EXPORT_HTTP_MAX_ATTEMPTS__ARTIFACT_API", "2")
        .env("CELLOS_EXPORT_HTTP_RETRY_BACKOFF_MS__ARTIFACT_API", "0")
        .env("CELL_OS_REQUIRE_S3_EXPORT", "1")
        .env("CELL_OS_REQUIRE_HTTP_EXPORT", "1")
        .current_dir(env!("CARGO_MANIFEST_DIR"))
        .arg(&spec_path)
        .status()
        .expect("spawn cellos-supervisor");
    assert!(
        status.success(),
        "expected successful supervisor run after retries: {status:?}"
    );

    let (requests, attempts_by_route) = server.join().expect("join mock export server");
    assert_eq!(requests.len(), 4, "expected retry traffic for both targets");
    assert_eq!(attempts_by_route.get("s3"), Some(&2));
    assert_eq!(attempts_by_route.get("http"), Some(&2));

    let s3_requests: Vec<&CapturedRequest> = requests
        .iter()
        .filter(|request| request.target.starts_with("/s3-upload?"))
        .collect();
    assert_eq!(s3_requests.len(), 2, "expected one retry for S3");
    for request in &s3_requests {
        assert_eq!(
            request.content_type.as_deref(),
            Some("application/xml"),
            "S3 upload content type: {request:?}"
        );
        // Compare the borrowed body directly; no clone needed just to inspect it.
        assert_eq!(
            String::from_utf8_lossy(&request.body),
            "<testsuite/>",
            "S3 upload body: {request:?}"
        );
    }

    let http_requests: Vec<&CapturedRequest> = requests
        .iter()
        .filter(|request| request.target == "/http-upload/retry-remote-export/coverage-summary")
        .collect();
    assert_eq!(http_requests.len(), 2, "expected one retry for HTTP");
    for request in &http_requests {
        assert_eq!(
            request.content_type.as_deref(),
            Some("text/plain"),
            "HTTP upload content type: {request:?}"
        );
        assert_eq!(
            String::from_utf8_lossy(&request.body),
            "coverage=91%",
            "HTTP upload body: {request:?}"
        );
    }

    let events = read_jsonl_events(&jsonl_path);
    let export_completed: Vec<&Value> = events
        .iter()
        .filter(|event| event["type"] == "dev.cellos.events.cell.export.v2.completed")
        .collect();
    assert_eq!(
        export_completed.len(),
        2,
        "expected two completed exports: {export_completed:?}"
    );
    let export_failed: Vec<&Value> = events
        .iter()
        .filter(|event| event["type"] == "dev.cellos.events.cell.export.v2.failed")
        .collect();
    assert!(
        export_failed.is_empty(),
        "did not expect export failed events after successful retries: {export_failed:?}"
    );
}
}