#[cfg(unix)]
mod unix {
use std::fs::File;
use std::io::{BufRead, BufReader, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::path::{Path, PathBuf};
use std::process::Command;
use std::thread;
use std::time::{Duration, Instant};
use serde_json::Value;
#[derive(Debug)]
struct CapturedRequest {
method: String,
target: String,
authorization: Option<String>,
body: Vec<u8>,
}
fn supervisor_exe() -> PathBuf {
if let Some(p) = std::env::var_os("CARGO_BIN_EXE_cellos_supervisor") {
return PathBuf::from(p);
}
let root = Path::new(env!("CARGO_MANIFEST_DIR"))
.parent()
.and_then(|p| p.parent())
.expect("cellos-supervisor crate under workspace root");
let profile = std::env::var("PROFILE").unwrap_or_else(|_| "debug".into());
root.join("target").join(profile).join("cellos-supervisor")
}
fn read_request(stream: &mut TcpStream) -> CapturedRequest {
let mut reader = BufReader::new(stream.try_clone().expect("clone stream"));
let mut request_line = String::new();
reader
.read_line(&mut request_line)
.expect("read request line");
assert!(
!request_line.trim().is_empty(),
"expected non-empty request line"
);
let mut authorization = None;
let mut content_length = 0usize;
loop {
let mut line = String::new();
reader.read_line(&mut line).expect("read header");
if line == "\r\n" || line.is_empty() {
break;
}
if let Some((name, value)) = line.split_once(':') {
let header_name = name.trim().to_ascii_lowercase();
let header_value = value.trim().to_string();
if header_name == "authorization" {
authorization = Some(header_value);
} else if header_name == "content-length" {
content_length = header_value.parse::<usize>().expect("parse content-length");
}
}
}
let mut body = vec![0u8; content_length];
reader.read_exact(&mut body).expect("read request body");
let mut parts = request_line.split_whitespace();
let method = parts.next().expect("method").to_string();
let target = parts.next().expect("target").to_string();
CapturedRequest {
method,
target,
authorization,
body,
}
}
fn write_response(stream: &mut TcpStream, status_line: &str, body: &[u8], content_type: &str) {
write!(
stream,
"HTTP/1.1 {status_line}\r\nContent-Type: {content_type}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
body.len()
)
.expect("write response headers");
stream.write_all(body).expect("write response body");
stream.flush().expect("flush response");
}
fn start_mock_runner_services(
fail_oidc: bool,
expected_requests: usize,
) -> (String, String, thread::JoinHandle<Vec<CapturedRequest>>) {
let listener = TcpListener::bind("127.0.0.1:0").expect("bind mock server");
listener
.set_nonblocking(true)
.expect("set mock server nonblocking");
let addr = listener.local_addr().expect("mock server local addr");
let handle = thread::spawn(move || {
let deadline = Instant::now() + Duration::from_secs(10);
let mut requests = Vec::new();
while requests.len() < expected_requests && Instant::now() < deadline {
match listener.accept() {
Ok((mut stream, _peer)) => {
stream
.set_nonblocking(false)
.expect("set accepted stream blocking");
let request = read_request(&mut stream);
match (request.method.as_str(), request.target.as_str()) {
("GET", target) if target.starts_with("/oidc?") => {
if fail_oidc {
write_response(
&mut stream,
"500 Internal Server Error",
b"mock oidc failure",
"text/plain",
);
} else {
write_response(
&mut stream,
"200 OK",
br#"{"value":"eyJhbGciOiJIUzI1NiJ9.mock.payload"}"#,
"application/json",
);
}
}
("PUT", target) if target.starts_with("/upload?") => {
write_response(&mut stream, "200 OK", b"", "text/plain");
}
_ => {
write_response(
&mut stream,
"404 Not Found",
b"unexpected request",
"text/plain",
);
}
}
requests.push(request);
}
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
thread::sleep(Duration::from_millis(20));
}
Err(err) => panic!("mock server accept failed: {err}"),
}
}
requests
});
(
format!("http://{addr}/oidc?api-version=2"),
format!("http://{addr}/upload?X-Amz-Signature=fake"),
handle,
)
}
fn read_jsonl_events(path: &Path) -> Vec<Value> {
let file = File::open(path).expect("open jsonl");
BufReader::new(file)
.lines()
.map(|line| {
serde_json::from_str::<Value>(&line.expect("read jsonl line"))
.expect("parse jsonl line")
})
.collect()
}
#[test]
fn github_oidc_and_s3_export_emit_identity_and_export_events() {
let tmp = tempfile::tempdir().expect("tempdir");
let jsonl_path = tmp.path().join("events.jsonl");
let spec_path = tmp.path().join("spec.json");
let artifact_path = tmp.path().join("test-results.xml");
let (oidc_url, s3_url, server) = start_mock_runner_services(false, 2);
let spec = serde_json::json!({
"apiVersion": "cellos.io/v1",
"kind": "ExecutionCell",
"spec": {
"id": "gha-oidc-s3-smoke",
"correlation": {
"platform": "github",
"externalRunId": "123456789",
"externalJobId": "build-and-test",
"tenantId": "org-acme"
},
"identity": {
"kind": "federatedOidc",
"provider": "github-actions",
"audience": "sts.amazonaws.com",
"secretRef": "AWS_WEB_IDENTITY",
"ttlSeconds": 300
},
"authority": {
"secretRefs": ["AWS_WEB_IDENTITY"]
},
"lifetime": {
"ttlSeconds": 600
},
"run": {
"secretDelivery": "env",
"argv": [
"/bin/sh",
"-c",
format!(
"test -n \"$AWS_WEB_IDENTITY\" && printf 'hello from cellos' > \"{}\"",
artifact_path.display()
)
]
},
"export": {
"targets": [
{
"name": "artifact-bucket",
"kind": "s3",
"bucket": "demo-bucket",
"keyPrefix": "github/run-123",
"secretRef": "AWS_WEB_IDENTITY"
}
],
"artifacts": [
{
"name": "test-results",
"path": artifact_path,
"target": "artifact-bucket",
"contentType": "application/xml"
}
]
}
}
});
std::fs::write(&spec_path, serde_json::to_string(&spec).unwrap()).expect("write spec");
let status = Command::new(supervisor_exe())
.env("CELLOS_DEPLOYMENT_PROFILE", "portable")
.env("CELL_OS_USE_NOOP_SINK", "1")
.env("CELLOS_CELL_BACKEND", "stub")
.env("CELL_OS_JSONL_EVENTS", &jsonl_path)
.env("CELLOS_BROKER", "github-oidc")
.env("ACTIONS_ID_TOKEN_REQUEST_URL", &oidc_url)
.env("ACTIONS_ID_TOKEN_REQUEST_TOKEN", "gha-request-token")
.env("CELLOS_EXPORT_S3_PRESIGNED_URL__ARTIFACT_BUCKET", &s3_url)
.env("CELL_OS_REQUIRE_S3_EXPORT", "1")
.env("CELLOS_RUN_ID", "gha-local-smoke")
.current_dir(env!("CARGO_MANIFEST_DIR"))
.arg(&spec_path)
.status()
.expect("spawn cellos-supervisor");
assert!(
status.success(),
"expected successful supervisor run: {status:?}"
);
let requests = server.join().expect("join mock server");
assert_eq!(requests.len(), 2, "expected OIDC GET and S3 PUT");
assert_eq!(requests[0].method, "GET");
assert!(
requests[0].target.contains("audience=sts.amazonaws.com"),
"OIDC request should use identity.audience, got {}",
requests[0].target
);
assert_eq!(
requests[0].authorization.as_deref(),
Some("Bearer gha-request-token")
);
assert_eq!(requests[1].method, "PUT");
assert_eq!(requests[1].target, "/upload?X-Amz-Signature=fake");
assert_eq!(
String::from_utf8(requests[1].body.clone()).unwrap(),
"hello from cellos"
);
let events = read_jsonl_events(&jsonl_path);
let event_types: Vec<&str> = events
.iter()
.filter_map(|event| event["type"].as_str())
.collect();
assert!(
event_types.contains(&"dev.cellos.events.cell.identity.v1.materialized"),
"missing identity materialized event: {event_types:?}"
);
assert!(
event_types.contains(&"dev.cellos.events.cell.identity.v1.revoked"),
"missing identity revoked event: {event_types:?}"
);
assert!(
event_types.contains(&"dev.cellos.events.cell.export.v2.completed"),
"missing export completed event: {event_types:?}"
);
assert!(
!event_types.contains(&"dev.cellos.events.cell.identity.v1.failed"),
"unexpected identity failed event: {event_types:?}"
);
let identity_event = events
.iter()
.find(|event| event["type"] == "dev.cellos.events.cell.identity.v1.materialized")
.expect("identity materialized event");
assert_eq!(
identity_event["data"]["identity"]["audience"],
"sts.amazonaws.com"
);
assert_eq!(
identity_event["data"]["identity"]["secretRef"],
"AWS_WEB_IDENTITY"
);
let export_event = events
.iter()
.find(|event| event["type"] == "dev.cellos.events.cell.export.v2.completed")
.expect("export completed event");
assert_eq!(export_event["data"]["receipt"]["targetKind"], "s3");
assert_eq!(
export_event["data"]["receipt"]["destination"],
"s3://demo-bucket/github/run-123/test-results"
);
assert!(
export_event["data"]["receipt"]["bytesWritten"]
.as_u64()
.expect("bytesWritten")
> 0
);
}
#[test]
fn github_oidc_failures_emit_identity_failed_event() {
let tmp = tempfile::tempdir().expect("tempdir");
let jsonl_path = tmp.path().join("events.jsonl");
let spec_path = tmp.path().join("spec.json");
let (oidc_url, _s3_url, server) = start_mock_runner_services(true, 1);
let spec = serde_json::json!({
"apiVersion": "cellos.io/v1",
"kind": "ExecutionCell",
"spec": {
"id": "gha-oidc-failure",
"identity": {
"kind": "federatedOidc",
"provider": "github-actions",
"audience": "sts.amazonaws.com",
"secretRef": "AWS_WEB_IDENTITY",
"ttlSeconds": 300
},
"authority": {
"secretRefs": ["AWS_WEB_IDENTITY"]
},
"lifetime": {
"ttlSeconds": 600
}
}
});
std::fs::write(&spec_path, serde_json::to_string(&spec).unwrap()).expect("write spec");
let status = Command::new(supervisor_exe())
.env("CELLOS_DEPLOYMENT_PROFILE", "portable")
.env("CELL_OS_USE_NOOP_SINK", "1")
.env("CELLOS_CELL_BACKEND", "stub")
.env("CELL_OS_JSONL_EVENTS", &jsonl_path)
.env("CELLOS_BROKER", "github-oidc")
.env("ACTIONS_ID_TOKEN_REQUEST_URL", &oidc_url)
.env("ACTIONS_ID_TOKEN_REQUEST_TOKEN", "gha-request-token")
.env("CELLOS_RUN_ID", "gha-local-failure")
.current_dir(env!("CARGO_MANIFEST_DIR"))
.arg(&spec_path)
.status()
.expect("spawn cellos-supervisor");
assert!(
!status.success(),
"expected supervisor failure when OIDC endpoint fails"
);
let requests = server.join().expect("join mock server");
assert_eq!(requests.len(), 1, "expected only the OIDC request");
assert_eq!(requests[0].method, "GET");
assert!(
requests[0].target.contains("audience=sts.amazonaws.com"),
"OIDC request should use identity.audience, got {}",
requests[0].target
);
let events = read_jsonl_events(&jsonl_path);
let identity_failed = events
.iter()
.find(|event| event["type"] == "dev.cellos.events.cell.identity.v1.failed")
.expect("identity failed event");
assert_eq!(identity_failed["data"]["operation"], "materialize");
assert!(
identity_failed["data"]["reason"]
.as_str()
.expect("failure reason")
.contains("oidc token request returned 500"),
"unexpected failure reason: {}",
identity_failed["data"]["reason"]
);
}
}