use std::io::{BufRead, BufReader, Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::process::Command;
use std::thread;
use std::time::Duration;
/// Builds a [`Command`] that launches the `ilo` binary under test.
///
/// The executable path is taken at compile time from the `CARGO_BIN_EXE_ilo`
/// environment variable, which Cargo provides when building integration tests.
fn ilo() -> Command {
    let exe_path = env!("CARGO_BIN_EXE_ilo");
    Command::new(exe_path)
}
/// Spawns a one-shot HTTP server on an ephemeral loopback port that answers the
/// first connection with a `Transfer-Encoding: chunked` event-stream response.
///
/// Every entry of `lines` is sent as its own chunk (with a trailing `\n`), with
/// a 5 ms pause after each one, followed by the zero-length terminating chunk.
///
/// Returns the bound address together with the server thread's join handle.
/// The thread yields the request head it received (lossily decoded as UTF-8),
/// or a textual description of the first I/O error it hit.
///
/// # Panics
///
/// Panics if the listener cannot be bound or its local address cannot be read.
fn spawn_chunked_server(
    lines: Vec<&'static str>,
) -> (SocketAddr, thread::JoinHandle<Result<String, String>>) {
    let listener = TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port");
    let addr = listener.local_addr().expect("local_addr");
    let handle = thread::spawn(move || -> Result<String, String> {
        let (mut stream, _peer) = listener
            .accept()
            .map_err(|e| format!("accept failed: {e}"))?;

        // A single `read` may return only part of the request when the client
        // sends its headers in several segments. Keep reading until the blank
        // line that ends the header block (or EOF) so that the captured text
        // is complete and we never respond/close while the client is still
        // writing its request.
        let mut request = Vec::with_capacity(1024);
        let mut buf = [0u8; 8192];
        loop {
            let n = stream.read(&mut buf).map_err(|e| e.to_string())?;
            if n == 0 {
                break;
            }
            // Re-scan the last 3 old bytes too: the terminator may straddle reads.
            let scan_from = request.len().saturating_sub(3);
            request.extend_from_slice(&buf[..n]);
            if request[scan_from..].windows(4).any(|w| w == b"\r\n\r\n") {
                break;
            }
        }
        let req_text = String::from_utf8_lossy(&request).into_owned();

        let head = b"HTTP/1.1 200 OK\r\nContent-Type: text/event-stream\r\nTransfer-Encoding: chunked\r\n\r\n";
        stream.write_all(head).map_err(|e| e.to_string())?;
        stream.flush().ok();

        for line in &lines {
            let payload = format!("{line}\n");
            // Chunk framing: hex byte length, CRLF, data, CRLF.
            let chunk = format!("{:X}\r\n{}\r\n", payload.len(), payload);
            stream
                .write_all(chunk.as_bytes())
                .map_err(|e| e.to_string())?;
            stream.flush().ok();
            // Small gap so chunks tend to reach the client as separate reads.
            thread::sleep(Duration::from_millis(5));
        }

        // Zero-length chunk marks the end of the body.
        stream.write_all(b"0\r\n\r\n").map_err(|e| e.to_string())?;
        stream.flush().ok();
        Ok(req_text)
    });
    (addr, handle)
}
/// Spawns a one-shot HTTP server on an ephemeral loopback port that echoes the
/// request body back as a single chunk of a chunked response.
///
/// The server reads header lines up to the blank line (or EOF), picks the body
/// length from a `Content-Length` header (matched case-insensitively; an
/// unparsable value counts as 0), reads exactly that many body bytes, and
/// replies with the body plus a trailing `\n` as one chunk followed by the
/// terminating chunk.
///
/// Returns the bound address and the server thread's join handle. The thread
/// yields the received header lines concatenated with the (lossily decoded)
/// body, or a textual description of the first I/O error.
///
/// # Panics
///
/// Panics if the listener cannot be bound or its local address cannot be read.
fn spawn_echo_post_server() -> (SocketAddr, thread::JoinHandle<Result<String, String>>) {
    /// Converts an I/O error into the `String` error type used by the thread.
    fn describe(err: std::io::Error) -> String {
        err.to_string()
    }

    let listener = TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port");
    let addr = listener.local_addr().expect("local_addr");

    let serve = move || -> Result<String, String> {
        let (mut conn, _) = listener.accept().map_err(describe)?;
        // Separate buffered handle for reading; `conn` itself is used for writing.
        let mut input = BufReader::new(conn.try_clone().map_err(describe)?);

        // Collect header lines verbatim while looking for the body length.
        let mut seen = String::new();
        let mut body_len = 0usize;
        loop {
            let mut header = String::new();
            input.read_line(&mut header).map_err(describe)?;
            // Empty string means EOF; a bare CRLF ends the header block.
            if header.is_empty() || header == "\r\n" {
                break;
            }
            let lowered = header.to_ascii_lowercase();
            if let Some(value) = lowered.strip_prefix("content-length:") {
                body_len = value.trim().parse::<usize>().unwrap_or(0);
            }
            seen.push_str(&header);
        }

        let mut raw_body = vec![0u8; body_len];
        if !raw_body.is_empty() {
            input.read_exact(&mut raw_body).map_err(describe)?;
        }
        let body = String::from_utf8_lossy(&raw_body).into_owned();

        conn.write_all(b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n")
            .map_err(describe)?;
        // Single chunk: hex byte length, CRLF, data, CRLF.
        let echoed = format!("{body}\n");
        let framed = format!("{:X}\r\n{}\r\n", echoed.len(), echoed);
        conn.write_all(framed.as_bytes()).map_err(describe)?;
        conn.write_all(b"0\r\n\r\n").map_err(describe)?;
        conn.flush().ok();

        seen.push_str(&body);
        Ok(seen)
    };

    (addr, thread::spawn(serve))
}
/// Spawns a one-shot HTTP server on an ephemeral loopback port that sends a
/// chunked response containing two chunks (`alpha\n`, `beta\n`) and then closes
/// the connection without ever sending the terminating zero-length chunk.
///
/// All I/O errors inside the server thread are ignored; the thread is detached
/// and only the bound address is returned.
///
/// # Panics
///
/// Panics if the listener cannot be bound or its local address cannot be read.
fn spawn_truncated_server() -> SocketAddr {
    // Response head plus two complete chunks; deliberately no `0\r\n\r\n`.
    const PARTIAL_RESPONSE: &[u8] =
        b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n6\r\nalpha\n\r\n5\r\nbeta\n\r\n";

    let listener = TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port");
    let addr = listener.local_addr().expect("local_addr");

    thread::spawn(move || {
        if let Ok((mut conn, _)) = listener.accept() {
            // Consume (part of) the request before answering; result is irrelevant.
            let mut scratch = [0u8; 4096];
            let _ = conn.read(&mut scratch);
            let _ = conn.write_all(PARTIAL_RESPONSE);
            let _ = conn.flush();
            // `conn` is dropped here, closing the socket mid-body.
        }
    });

    addr
}
/// Runs an `ilo` program that loops over `get-stream` against a local chunked
/// server and checks that stdout holds the three served lines, in order,
/// followed by the count `3`.
#[test]
fn get_stream_yields_lines_in_order() {
    let (addr, srv) = spawn_chunked_server(vec!["alpha", "beta", "gamma"]);
    let url = format!("http://{addr}/events");
    let prog = r#"main url:t>n;n=0;@line (get-stream url){prnt line;n=+ n 1};n"#;

    let mut cmd = ilo();
    cmd.arg("run")
        .arg("--allow-net=*")
        .arg(prog)
        .arg("main")
        .arg(&url);
    let out = cmd.output().expect("failed to run ilo");

    // Wait for the server thread; only a panic in it is treated as fatal here.
    let _server_req = srv.join().expect("server thread panicked");

    let exited_ok = out.status.success();
    assert!(exited_ok, "stderr: {}", String::from_utf8_lossy(&out.stderr));

    let stdout = String::from_utf8_lossy(&out.stdout);
    let expected = ["alpha", "beta", "gamma", "3"];
    let lines: Vec<&str> = stdout
        .lines()
        .filter(|candidate| !candidate.is_empty())
        .collect();
    assert_eq!(
        lines, expected,
        "expected three chunk-lines then count, got {stdout:?}"
    );
}
/// Runs an `ilo` program that calls `get-stream-h` with an `Authorization`
/// header map and checks that the request text captured by the local server
/// contains that header.
#[test]
fn get_stream_h_passes_authorization_header() {
    let (addr, srv) = spawn_chunked_server(vec!["ok"]);
    let url = format!("http://{addr}/events");
    let prog = r#"main url:t>n;n=0;h=mset mmap "Authorization" "Bearer s3cret";@line (get-stream-h url h){prnt line;n=+ n 1};n"#;

    let mut cmd = ilo();
    cmd.arg("run")
        .arg("--allow-net=*")
        .arg(prog)
        .arg("main")
        .arg(&url);
    let out = cmd.output().expect("failed to run ilo");

    // Both a panic and an `Err` from the server thread fail the test.
    let joined = srv.join().expect("server thread panicked");
    let req = joined.expect("server failed");

    let exited_ok = out.status.success();
    assert!(exited_ok, "stderr: {}", String::from_utf8_lossy(&out.stderr));

    let header_seen = req.contains("Authorization: Bearer s3cret");
    assert!(
        header_seen,
        "Authorization header missing from request:\n{req}"
    );
}
/// Runs an `ilo` program that calls `pst-stream` with the body `hello-stream`
/// against the local echo server and checks that the echoed body shows up on
/// stdout.
#[test]
fn pst_stream_streams_response_body() {
    let (addr, srv) = spawn_echo_post_server();
    let url = format!("http://{addr}/echo");
    let prog = r#"main url:t>n;n=0;@line (pst-stream url "hello-stream"){prnt line;n=+ n 1};n"#;

    let mut cmd = ilo();
    cmd.arg("run")
        .arg("--allow-net=*")
        .arg(prog)
        .arg("main")
        .arg(&url);
    let out = cmd.output().expect("failed to run ilo");

    // The server's own result is not inspected; only a panic is fatal.
    let _ = srv.join().expect("server thread panicked");

    let exited_ok = out.status.success();
    assert!(exited_ok, "stderr: {}", String::from_utf8_lossy(&out.stderr));

    let stdout = String::from_utf8_lossy(&out.stdout);
    let echoed = stdout.contains("hello-stream");
    assert!(echoed, "expected echoed body in stdout, got: {stdout}");
}
/// Invokes `ilo` with only the program text (no `--allow-net` flag) and
/// expects a non-zero exit status for the `get-stream` call.
#[test]
fn get_stream_returns_err_when_net_denied() {
    let prog =
        r#"main>n;n=0;@line (get-stream "http://127.0.0.1:1/whatever"){prnt line;n=+ n 1};n"#;

    let mut cmd = ilo();
    cmd.arg(prog);
    let out = cmd.output().expect("failed to run ilo");

    let failed = !out.status.success();
    assert!(
        failed,
        "expected non-zero exit for cap-denied call, stdout={:?} stderr={:?}",
        String::from_utf8_lossy(&out.stdout),
        String::from_utf8_lossy(&out.stderr)
    );
}
/// Runs `get-stream` against a server that closes the connection before the
/// terminating chunk. The test accepts either outcome: the first line was
/// streamed to stdout, or stderr mentions `http-stream`.
#[test]
fn get_stream_surfaces_mid_stream_error() {
    let addr = spawn_truncated_server();
    let url = format!("http://{addr}/truncated");
    let prog = r#"main url:t>n;n=0;@line (get-stream url){prnt line;n=+ n 1};n"#;

    let mut cmd = ilo();
    cmd.arg("run")
        .arg("--allow-net=*")
        .arg(prog)
        .arg("main")
        .arg(&url);
    let out = cmd.output().expect("failed to run ilo");

    let stdout = String::from_utf8_lossy(&out.stdout);
    let stderr = String::from_utf8_lossy(&out.stderr);

    let streamed_some = stdout.contains("alpha");
    let acceptable = streamed_some || stderr.contains("http-stream");
    assert!(
        acceptable,
        "expected either streamed lines or a stream error, stdout={stdout:?} stderr={stderr:?}"
    );
}
/// Calls `get_stream` on the default HTTP backend directly (no `ilo` process)
/// against a local chunked server and checks that iterating the result yields
/// the served lines in order.
#[test]
fn backend_get_stream_native_iterates_lines() {
    use ilo::interpreter::http_wasm::default_backend;

    let (addr, srv) = spawn_chunked_server(vec!["one", "two", "three"]);
    let url = format!("http://{addr}/raw");

    let line_iter = default_backend()
        .get_stream(&url, &[])
        .expect("backend get_stream should open the connection");

    // Drain the stream, failing on the first item that is an error.
    let mut collected: Vec<String> = Vec::new();
    for item in line_iter {
        collected.push(item.expect("line read ok"));
    }

    let _ = srv.join().expect("server thread panicked");
    assert_eq!(collected, ["one", "two", "three"]);
}
// No-op whose signature references `TcpStream`; in the visible part of this
// file it is the only use of that import. Never called, hence the
// `dead_code` allowance.
#[allow(dead_code)]
fn _force_use(_s: &TcpStream) {}