use std::path::PathBuf;
use std::process;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::{Child, ChildStdin};
use tokio::time::timeout;
#[allow(deprecated)]
fn workspace_bin(name: &str) -> PathBuf {
assert_cmd::cargo::cargo_bin(name)
}
struct TestServer {
child: Child,
address: String,
}
impl TestServer {
async fn new(addr: &str, closure: &str, tls: bool) -> Self {
Self::new_with_options(addr, closure, tls, &[], None, false).await
}
async fn new_with_plugins(
addr: &str,
closure: &str,
tls: bool,
plugins: &[std::path::PathBuf],
) -> Self {
Self::new_with_options(addr, closure, tls, plugins, None, false).await
}
async fn new_with_store(addr: &str, closure: &str, store_path: &std::path::Path) -> Self {
Self::new_with_options(addr, closure, false, &[], Some(store_path), false).await
}
async fn new_with_store_and_services(
addr: &str,
closure: &str,
store_path: &std::path::Path,
) -> Self {
Self::new_with_options(addr, closure, false, &[], Some(store_path), true).await
}
async fn new_with_options(
addr: &str,
closure: &str,
tls: bool,
plugins: &[std::path::PathBuf],
store_path: Option<&std::path::Path>,
services: bool,
) -> Self {
let mut cmd = tokio::process::Command::new(assert_cmd::cargo::cargo_bin!("http-nu"));
cmd.arg("--log-format").arg("jsonl");
for plugin in plugins {
cmd.arg("--plugin").arg(plugin);
}
if let Some(path) = store_path {
cmd.arg("--store").arg(path);
if services {
cmd.arg("--services");
}
}
cmd.arg(addr).arg("-c").arg(closure);
if tls {
cmd.arg("--tls").arg("tests/combined.pem");
}
let mut child = cmd
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.expect("Failed to start http-nu server");
let stdout = child.stdout.take().unwrap();
let stderr = child.stderr.take().unwrap();
let (addr_tx, addr_rx) = tokio::sync::oneshot::channel();
let mut addr_tx = Some(addr_tx);
tokio::spawn(async move {
let mut reader = BufReader::new(stdout).lines();
while let Ok(Some(line)) = reader.next_line().await {
eprintln!("[HTTP-NU STDOUT] {line}");
if addr_tx.is_some() {
if let Ok(json) = serde_json::from_str::<serde_json::Value>(&line) {
if let Some(addr_str) = json.get("address").and_then(|a| a.as_str()) {
if let Some(tx) = addr_tx.take() {
let _ = tx.send(addr_str.trim().to_string());
}
}
}
}
}
});
tokio::spawn(async move {
let mut reader = BufReader::new(stderr).lines();
while let Ok(Some(line)) = reader.next_line().await {
eprintln!("[HTTP-NU STDERR] {line}");
}
});
let address = timeout(std::time::Duration::from_secs(5), addr_rx)
.await
.expect("Failed to get address from http-nu server")
.expect("Channel closed before address received");
Self { child, address }
}
async fn curl(&self, path: &str) -> process::Output {
let mut cmd = tokio::process::Command::new("curl");
if self.address.starts_with('/') {
cmd.arg("--unix-socket").arg(&self.address);
cmd.arg(format!("http://localhost{path}"));
} else {
cmd.arg(format!("{}{path}", self.address));
}
cmd.output().await.expect("Failed to execute curl")
}
async fn curl_tls(&self, path: &str) -> process::Output {
let port = self.address.split(':').next_back().unwrap();
let mut cmd = tokio::process::Command::new("curl");
cmd.arg("--cacert")
.arg("tests/cert.pem")
.arg("--resolve")
.arg(format!("localhost:{port}:127.0.0.1"))
.arg(format!("https://localhost:{port}{path}"));
cmd.output().await.expect("Failed to execute curl")
}
fn send_ctrl_c(&mut self) {
#[cfg(unix)]
{
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
let pid = Pid::from_raw(self.child.id().expect("child id") as i32);
kill(pid, Signal::SIGINT).expect("failed to send SIGINT");
}
#[cfg(not(unix))]
{
let _ = self.child.start_kill();
}
}
fn send_sigterm(&mut self) {
#[cfg(unix)]
{
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
let pid = Pid::from_raw(self.child.id().expect("child id") as i32);
kill(pid, Signal::SIGTERM).expect("failed to send SIGTERM");
}
#[cfg(not(unix))]
{
let _ = self.child.start_kill();
}
}
async fn wait_for_exit(&mut self) -> std::process::ExitStatus {
use tokio::time::{timeout, Duration};
timeout(Duration::from_secs(5), self.child.wait())
.await
.expect("server did not exit in time")
.expect("failed waiting for child")
}
fn has_exited(&mut self) -> bool {
matches!(self.child.try_wait(), Ok(Some(_)))
}
}
impl Drop for TestServer {
fn drop(&mut self) {
if !self.has_exited() {
let _ = self.child.start_kill();
}
}
}
struct TestServerWithStdin {
child: Child,
address: String,
stdin: Option<ChildStdin>,
}
impl TestServerWithStdin {
fn spawn(addr: &str, tls: bool) -> (Child, ChildStdin, tokio::sync::oneshot::Receiver<String>) {
Self::spawn_with_watch(addr, tls)
}
fn spawn_with_watch(
addr: &str,
tls: bool,
) -> (Child, ChildStdin, tokio::sync::oneshot::Receiver<String>) {
let mut cmd = tokio::process::Command::new(assert_cmd::cargo::cargo_bin!("http-nu"));
cmd.arg("--log-format").arg("jsonl");
cmd.arg(addr).arg("-").arg("-w");
if tls {
cmd.arg("--tls").arg("tests/combined.pem");
}
let mut child = cmd
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.expect("Failed to start http-nu server");
let stdin = child.stdin.take().unwrap();
let stdout = child.stdout.take().unwrap();
let stderr = child.stderr.take().unwrap();
let (addr_tx, addr_rx) = tokio::sync::oneshot::channel();
let mut addr_tx = Some(addr_tx);
tokio::spawn(async move {
let mut reader = BufReader::new(stdout).lines();
while let Ok(Some(line)) = reader.next_line().await {
eprintln!("[HTTP-NU STDOUT] {line}");
if addr_tx.is_some() {
if let Ok(json) = serde_json::from_str::<serde_json::Value>(&line) {
if let Some(addr_str) = json.get("address").and_then(|a| a.as_str()) {
if let Some(tx) = addr_tx.take() {
let _ = tx.send(addr_str.trim().to_string());
}
}
}
}
}
});
tokio::spawn(async move {
let mut reader = BufReader::new(stderr).lines();
while let Ok(Some(line)) = reader.next_line().await {
eprintln!("[HTTP-NU STDERR] {line}");
}
});
(child, stdin, addr_rx)
}
async fn write_script(&mut self, script: &str) {
let stdin = self.stdin.as_mut().expect("stdin already closed");
stdin
.write_all(script.as_bytes())
.await
.expect("Failed to write script to stdin");
stdin
.write_all(b"\0")
.await
.expect("Failed to write null terminator to stdin");
stdin.flush().await.expect("Failed to flush stdin");
}
async fn close_stdin(&mut self) {
self.stdin.take();
}
async fn curl_get(&self) -> String {
let mut cmd = tokio::process::Command::new("curl");
cmd.arg("-s").arg(format!("{}/", self.address));
let output = cmd.output().await.expect("Failed to execute curl");
String::from_utf8_lossy(&output.stdout).trim().to_string()
}
}
impl Drop for TestServerWithStdin {
fn drop(&mut self) {
let _ = self.child.start_kill();
}
}
#[tokio::test]
async fn test_server_startup_and_shutdown() {
let _server = TestServer::new("127.0.0.1:0", "{|req| $req.method}", false).await;
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
}
#[cfg(unix)]
#[tokio::test]
async fn test_server_unix_socket() {
let tmp = tempfile::tempdir().unwrap();
let socket_path = tmp.path().join("test.sock");
let socket_path_str = socket_path.to_str().unwrap();
let server = TestServer::new(socket_path_str, "{|req| $req.method}", false).await;
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
let output = server.curl("").await;
assert!(output.status.success());
let stdout = String::from_utf8_lossy(&output.stdout);
assert_eq!(stdout.trim(), "GET");
}
#[tokio::test]
async fn test_server_tcp_socket() {
let server = TestServer::new("127.0.0.1:0", "{|req| $req.method}", false).await;
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
let output = server.curl("").await;
assert!(output.status.success());
let stdout = String::from_utf8_lossy(&output.stdout);
assert_eq!(stdout.trim(), "GET");
}
#[tokio::test]
async fn test_server_tls_socket() {
let server = TestServer::new("127.0.0.1:0", "{|req| $req.method}", true).await;
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
let output = server.curl_tls("").await;
assert!(output.status.success());
let stdout = String::from_utf8_lossy(&output.stdout);
assert_eq!(stdout.trim(), "GET");
}
#[tokio::test]
async fn test_server_static_files() {
let tmp = tempfile::tempdir().unwrap();
let file_path = tmp.path().join("test.txt");
std::fs::write(&file_path, "Hello from static file").unwrap();
let closure = format!(
"{{|req| .static '{}' $req.path }}",
tmp.path().to_str().unwrap()
);
let server = TestServer::new("127.0.0.1:0", &closure, false).await;
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
let output = server.curl("/test.txt").await;
assert!(output.status.success());
let stdout = String::from_utf8_lossy(&output.stdout);
assert_eq!(stdout.trim(), "Hello from static file");
}
#[tokio::test]
async fn test_server_static_files_fallback() {
let tmp = tempfile::tempdir().unwrap();
let index_path = tmp.path().join("index.html");
std::fs::write(&index_path, "fallback page").unwrap();
let closure = format!(
"{{|req| .static '{}' $req.path --fallback 'index.html' }}",
tmp.path().to_str().unwrap()
);
let server = TestServer::new("127.0.0.1:0", &closure, false).await;
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
let output = server.curl("/missing/route").await;
assert!(output.status.success());
let stdout = String::from_utf8_lossy(&output.stdout);
assert_eq!(stdout.trim(), "fallback page");
}
#[tokio::test]
async fn test_server_reverse_proxy() {
let backend = TestServer::new(
"127.0.0.1:0",
r#"{|req|
let method = $req.method
let path = $req.path
let query = ($req.query | get foo | default 'none')
let header = ($req.headers | get "x-custom-header" | default "not-found")
$"Backend: ($method) ($path) ($query) ($header)"
}"#,
false,
)
.await;
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let proxy_closure = format!(
r#"{{|req| .reverse-proxy "{}" {{ headers: {{ "x-custom-header": "proxy-added" }} }} }}"#,
backend.address
);
let proxy = TestServer::new("127.0.0.1:0", &proxy_closure, false).await;
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let output = proxy.curl("/test?foo=bar").await;
assert!(output.status.success());
let stdout = String::from_utf8_lossy(&output.stdout);
assert_eq!(stdout.trim(), "Backend: GET /test bar proxy-added");
}
#[tokio::test]
async fn test_server_reverse_proxy_strip_prefix() {
let backend = TestServer::new("127.0.0.1:0", r#"{|req| $"Path: ($req.path)"}"#, false).await;
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let proxy_closure = format!(
r#"{{|req| .reverse-proxy "{}" {{ strip_prefix: "/api" }} }}"#,
backend.address
);
let proxy = TestServer::new("127.0.0.1:0", &proxy_closure, false).await;
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let output = proxy.curl("/api/users").await;
assert!(output.status.success());
let stdout = String::from_utf8_lossy(&output.stdout);
assert_eq!(stdout.trim(), "Path: /users");
}
#[tokio::test]
async fn test_server_reverse_proxy_strip_prefix_ssrf() {
let backend = TestServer::new("127.0.0.1:0", r#"{|req| $"Path: ($req.path)"}"#, false).await;
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let proxy_closure = format!(
r#"{{|req| .reverse-proxy "{}" {{ strip_prefix: "/api" }} }}"#,
backend.address
);
let proxy = TestServer::new("127.0.0.1:0", &proxy_closure, false).await;
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let output = proxy.curl("/api@evil.com/").await;
assert!(output.status.success());
let stdout = String::from_utf8_lossy(&output.stdout);
assert_eq!(stdout.trim(), "Path: /@evil.com/");
}
#[tokio::test]
async fn test_server_reverse_proxy_body_handling() {
let backend = TestServer::new("127.0.0.1:0", r#"{|req| $in}"#, false).await;
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let proxy_closure = format!(r#"{{|req| .reverse-proxy "{}" }}"#, backend.address);
let proxy_forward = TestServer::new("127.0.0.1:0", &proxy_closure, false).await;
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let mut cmd = tokio::process::Command::new("curl");
cmd.arg("-s")
.arg("-d")
.arg("forwarded")
.arg(&proxy_forward.address);
let output = cmd.output().await.expect("Failed to execute curl");
assert!(output.status.success());
let stdout = String::from_utf8_lossy(&output.stdout);
assert_eq!(stdout.trim(), "forwarded");
let proxy_closure = format!(
r#"{{|req| "override" | .reverse-proxy "{}" }}"#,
backend.address
);
let proxy_override = TestServer::new("127.0.0.1:0", &proxy_closure, false).await;
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let mut cmd = tokio::process::Command::new("curl");
cmd.arg("-s")
.arg("-d")
.arg("original")
.arg(&proxy_override.address);
let output = cmd.output().await.expect("Failed to execute curl");
assert!(output.status.success());
let stdout = String::from_utf8_lossy(&output.stdout);
assert_eq!(stdout.trim(), "override");
}
#[tokio::test]
async fn test_server_reverse_proxy_host_header() {
let backend =
TestServer::new("127.0.0.1:0", r#"{|req| $req.headers | get "host"}"#, false).await;
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let proxy_closure = format!(r#"{{|req| .reverse-proxy "{}" }}"#, backend.address);
let proxy = TestServer::new("127.0.0.1:0", &proxy_closure, false).await;
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let mut cmd = tokio::process::Command::new("curl");
cmd.arg("-s")
.arg("-H")
.arg("Host: example.com")
.arg(&proxy.address);
let output = cmd.output().await.expect("Failed to execute curl");
assert!(output.status.success());
let stdout = String::from_utf8_lossy(&output.stdout);
assert_eq!(stdout.trim(), "example.com");
}
#[tokio::test]
async fn test_reverse_proxy_streaming() {
let backend = TestServer::new(
"127.0.0.1:0",
r#"{|req|
1..3 | each {|i|
sleep 100ms
$"chunk-($i)\n"
}
}"#,
false,
)
.await;
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let proxy_closure = format!(r#"{{|req| .reverse-proxy "{}" }}"#, backend.address);
let proxy = TestServer::new("127.0.0.1:0", &proxy_closure, false).await;
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
println!("Testing backend directly...");
let backend_start = std::time::Instant::now();
let mut backend_child = tokio::process::Command::new("curl")
.arg("-s")
.arg("--raw")
.arg("-N") .arg(&backend.address)
.stdout(std::process::Stdio::piped())
.spawn()
.expect("Failed to start curl for backend");
let backend_stdout = backend_child.stdout.take().unwrap();
use tokio::io::AsyncReadExt;
let mut backend_reader = backend_stdout;
let mut backend_first_byte = [0u8; 1];
backend_reader
.read_exact(&mut backend_first_byte)
.await
.unwrap();
let backend_first_byte_time = backend_start.elapsed();
let mut backend_remaining = Vec::new();
backend_reader
.read_to_end(&mut backend_remaining)
.await
.unwrap();
let backend_total_time = backend_start.elapsed();
backend_child.wait().await.unwrap();
println!(
"Backend - First byte: {:?}, Total: {:?}, Diff: {:?}",
backend_first_byte_time,
backend_total_time,
backend_total_time.saturating_sub(backend_first_byte_time)
);
let all_backend_data = [&backend_first_byte[..], &backend_remaining[..]].concat();
println!(
"Backend data: {:?}",
String::from_utf8_lossy(&all_backend_data)
);
let start = std::time::Instant::now();
let mut child = tokio::process::Command::new("curl")
.arg("-s")
.arg("--raw") .arg("-N") .arg(&proxy.address)
.stdout(std::process::Stdio::piped())
.spawn()
.expect("Failed to start curl");
let stdout = child.stdout.take().unwrap();
let mut reader = stdout;
let mut first_byte = [0u8; 1];
reader.read_exact(&mut first_byte).await.unwrap();
let first_byte_time = start.elapsed();
let mut remaining = Vec::new();
reader.read_to_end(&mut remaining).await.unwrap();
let total_time = start.elapsed();
child.wait().await.unwrap();
println!("First byte at: {first_byte_time:?}, Total time: {total_time:?}");
let time_difference = total_time.saturating_sub(first_byte_time);
assert!(total_time >= std::time::Duration::from_millis(280));
assert!(
time_difference >= std::time::Duration::from_millis(150),
"Expected at least 150ms between first byte and completion for streaming. Got: {time_difference:?}"
);
}
#[tokio::test]
async fn test_server_reverse_proxy_custom_query() {
let backend = TestServer::new("127.0.0.1:0", r#"{|req| $req.query | to json}"#, false).await;
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let proxy_closure = format!(
r#"{{|req| .reverse-proxy "{}" {{ query: ($req.query | upsert "context-id" "smidgeons" | reject "debug") }} }}"#,
backend.address
);
let proxy = TestServer::new("127.0.0.1:0", &proxy_closure, false).await;
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let mut cmd = tokio::process::Command::new("curl");
cmd.arg("-s")
.arg(format!("{}/test?page=1&debug=true&limit=10", proxy.address));
let output = cmd.output().await.unwrap();
assert!(output.status.success());
let stdout = String::from_utf8_lossy(&output.stdout);
let json: serde_json::Value = serde_json::from_str(&stdout).unwrap();
assert_eq!(json["context-id"], "smidgeons");
assert_eq!(json["page"], "1");
assert_eq!(json["limit"], "10");
assert!(json.get("debug").is_none()); }
#[cfg(unix)]
#[tokio::test]
async fn test_server_tcp_graceful_shutdown() {
let mut server = TestServer::new("127.0.0.1:0", "{|req| $req.method}", false).await;
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
server.send_ctrl_c();
let status = server.wait_for_exit().await;
assert!(status.success());
}
#[cfg(unix)]
#[tokio::test]
async fn test_server_tls_graceful_shutdown() {
let mut server = TestServer::new("127.0.0.1:0", "{|req| $req.method}", true).await;
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
server.send_ctrl_c();
let status = server.wait_for_exit().await;
assert!(status.success());
}
#[cfg(unix)]
#[tokio::test]
async fn test_server_unix_graceful_shutdown() {
let tmp = tempfile::tempdir().unwrap();
let socket_path = tmp.path().join("test_sigint.sock");
let socket_path_str = socket_path.to_str().unwrap();
let mut server = TestServer::new(socket_path_str, "{|req| $req.method}", false).await;
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
server.send_ctrl_c();
let status = server.wait_for_exit().await;
assert!(status.success());
}
#[cfg(unix)]
#[tokio::test]
async fn test_graceful_shutdown_waits_for_inflight_requests() {
let mut server =
TestServer::new("127.0.0.1:0", r#"{|req| sleep 500ms; "completed"}"#, false).await;
let url = format!("{}/", server.address);
let request_handle = tokio::spawn(async move {
tokio::process::Command::new("curl")
.arg("-s")
.arg("--retry")
.arg("3")
.arg("--retry-delay")
.arg("1")
.arg("--retry-connrefused")
.arg(&url)
.output()
.await
.expect("curl failed")
});
tokio::time::sleep(std::time::Duration::from_millis(300)).await;
server.send_sigterm();
let output = request_handle.await.expect("request task panicked");
assert!(output.status.success(), "curl failed: {output:?}");
let body = String::from_utf8_lossy(&output.stdout);
assert_eq!(body, "completed");
let status = server.wait_for_exit().await;
assert!(status.success());
}
#[tokio::test]
async fn test_http1_support() {
let mut server = TestServer::new("127.0.0.1:0", r#"{|req| $req.proto}"#, false).await;
let output = tokio::process::Command::new("curl")
.arg("-s")
.arg("--http1.1")
.arg(format!("{}/", server.address))
.output()
.await
.expect("curl failed");
assert!(output.status.success(), "curl failed: {output:?}");
let body = String::from_utf8_lossy(&output.stdout);
assert_eq!(body, "HTTP/1.1");
server.send_sigterm();
let status = server.wait_for_exit().await;
assert!(status.success());
}
#[tokio::test]
async fn test_http2_support() {
let mut server = TestServer::new("127.0.0.1:0", r#"{|req| $req.proto}"#, false).await;
let output = tokio::process::Command::new("curl")
.arg("-s")
.arg("--http2-prior-knowledge")
.arg(format!("{}/", server.address))
.output()
.await
.expect("curl failed");
assert!(output.status.success(), "curl failed: {output:?}");
let body = String::from_utf8_lossy(&output.stdout);
assert_eq!(body, "HTTP/2.0");
server.send_sigterm();
let status = server.wait_for_exit().await;
assert!(status.success());
}
#[tokio::test]
async fn test_http2_tls_support() {
let mut server = TestServer::new("127.0.0.1:0", r#"{|req| $req.proto}"#, true).await;
let port = server.address.split(':').next_back().unwrap();
let output = tokio::process::Command::new("curl")
.arg("-s")
.arg("--http2")
.arg("--cacert")
.arg("tests/cert.pem")
.arg("--resolve")
.arg(format!("localhost:{port}:127.0.0.1"))
.arg(format!("https://localhost:{port}/"))
.output()
.await
.expect("curl failed");
assert!(output.status.success(), "curl failed: {output:?}");
let body = String::from_utf8_lossy(&output.stdout);
assert_eq!(body, "HTTP/2.0");
server.send_sigterm();
let status = server.wait_for_exit().await;
assert!(status.success());
}
#[tokio::test]
async fn test_parse_error_ansi_formatting() {
let output = tokio::process::Command::new(assert_cmd::cargo::cargo_bin!("http-nu"))
.arg("127.0.0.1:0")
.arg("-c")
.arg("{|req| use nonexistent oauth}")
.output()
.await
.expect("Failed to execute http-nu");
assert!(!output.status.success());
let stderr = String::from_utf8_lossy(&output.stderr);
assert!(
!stderr.contains(r"\u{1b}"),
"stderr contains escaped ANSI codes: {stderr}"
);
assert!(
stderr.contains("Parse error") || stderr.contains("ExportNotFound"),
"stderr missing expected error text: {stderr}"
);
}
#[tokio::test]
async fn test_sse_brotli_compression_streams_immediately() {
let server = TestServer::new(
"127.0.0.1:0",
r#"{|req|
1..4 | each {|i|
sleep 200ms
{data: $"event-($i)"}
} | to sse
}"#,
false,
)
.await;
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let start = std::time::Instant::now();
let mut child = tokio::process::Command::new("curl")
.arg("-s")
.arg("-N") .arg("-H")
.arg("Accept-Encoding: br")
.arg(&server.address)
.stdout(std::process::Stdio::piped())
.spawn()
.expect("Failed to start curl");
let stdout = child.stdout.take().unwrap();
use tokio::io::AsyncReadExt;
let mut reader = stdout;
let mut first_chunk = vec![0u8; 64];
let n = reader.read(&mut first_chunk).await.unwrap();
let first_chunk_time = start.elapsed();
assert!(n > 0, "Expected to receive data");
assert!(
first_chunk_time < std::time::Duration::from_millis(500),
"First SSE chunk took {first_chunk_time:?}, expected < 500ms. SSE compression may be buffering instead of streaming.",
);
let mut remaining = Vec::new();
reader.read_to_end(&mut remaining).await.unwrap();
let total_time = start.elapsed();
child.wait().await.unwrap();
assert!(
total_time >= std::time::Duration::from_millis(700),
"Total time {total_time:?} too short, expected ~800ms for streaming",
);
let all_compressed: Vec<u8> = first_chunk[..n]
.iter()
.chain(remaining.iter())
.copied()
.collect();
let mut decompressed = Vec::new();
brotli::BrotliDecompress(&mut &all_compressed[..], &mut decompressed)
.expect("Failed to decompress brotli SSE data");
let text = String::from_utf8(decompressed).expect("Invalid UTF-8");
assert!(text.contains("data: event-1"), "Missing event-1");
assert!(text.contains("data: event-2"), "Missing event-2");
assert!(text.contains("data: event-3"), "Missing event-3");
println!(
"SSE brotli streaming verified: first chunk at {first_chunk_time:?}, total {total_time:?}"
);
}
#[tokio::test]
async fn test_to_sse_command() {
let server = TestServer::new(
"127.0.0.1:0",
r#"{|req|
[
{id: "1", event: "greeting", data: "hello"}
{id: "2", event: "update", data: "world", retry: 5000}
{data: {count: 42}}
] | to sse
}"#,
false,
)
.await;
let output = std::process::Command::new("curl")
.arg("-s")
.arg("-i")
.arg(&server.address)
.output()
.expect("curl failed");
assert!(output.status.success());
let response = String::from_utf8_lossy(&output.stdout);
assert!(
response.contains("content-type: text/event-stream"),
"Missing content-type header"
);
assert!(
response.contains("cache-control: no-cache"),
"Missing cache-control header"
);
assert!(
response.contains("connection: keep-alive"),
"Missing connection header"
);
assert!(response.contains("id: 1"), "Missing id: 1");
assert!(
response.contains("event: greeting"),
"Missing event: greeting"
);
assert!(response.contains("data: hello"), "Missing data: hello");
assert!(response.contains("id: 2"), "Missing id: 2");
assert!(response.contains("event: update"), "Missing event: update");
assert!(response.contains("data: world"), "Missing data: world");
assert!(response.contains("retry: 5000"), "Missing retry: 5000");
assert!(
response.contains(r#"data: {"count":42}"#),
"Missing JSON data"
);
}
#[tokio::test]
async fn test_to_sse_ignores_null_fields() {
let server = TestServer::new(
"127.0.0.1:0",
r#"{|req|
[
{event: "test", data: "hello", id: null, retry: null}
{event: "with-id", data: "world", id: "123", retry: null}
{event: "with-retry", data: "foo", id: null, retry: 5000}
] | to sse
}"#,
false,
)
.await;
let output = std::process::Command::new("curl")
.arg("-s")
.arg(&server.address)
.output()
.expect("curl failed");
assert!(output.status.success());
let response = String::from_utf8_lossy(&output.stdout);
assert!(response.contains("event: test"), "Missing event: test");
assert!(response.contains("data: hello"), "Missing data: hello");
assert!(response.contains("id: 123"), "Missing id: 123");
assert!(
response.contains("event: with-id"),
"Missing event: with-id"
);
assert!(response.contains("retry: 5000"), "Missing retry: 5000");
assert!(
response.contains("event: with-retry"),
"Missing event: with-retry"
);
assert!(!response.contains("id: \n"), "Should not contain empty id");
assert!(
!response.contains("retry: \n"),
"Should not contain empty retry"
);
assert!(
!response.contains("id: null"),
"Should not contain id: null"
);
assert!(
!response.contains("retry: null"),
"Should not contain retry: null"
);
}
#[tokio::test]
async fn test_to_sse_data_list() {
let server = TestServer::new(
"127.0.0.1:0",
r#"{|req|
[
{event: "test", data: ["line1", "line2", "line3"]}
{event: "embedded", data: ["first", "has\nnewline", "last"]}
{event: "mixed", data: ["string", {num: 42}, "another"]}
] | to sse
}"#,
false,
)
.await;
let output = std::process::Command::new("curl")
.arg("-s")
.arg(&server.address)
.output()
.expect("curl failed");
assert!(output.status.success());
let response = String::from_utf8_lossy(&output.stdout);
assert!(response.contains("data: line1"), "Missing data: line1");
assert!(response.contains("data: line2"), "Missing data: line2");
assert!(response.contains("data: line3"), "Missing data: line3");
assert!(response.contains("data: first"), "Missing data: first");
assert!(response.contains("data: has"), "Missing data: has");
assert!(response.contains("data: newline"), "Missing data: newline");
assert!(response.contains("data: last"), "Missing data: last");
assert!(response.contains("data: string"), "Missing data: string");
assert!(
response.contains(r#"data: {"num":42}"#),
"Missing JSON data in list"
);
assert!(response.contains("data: another"), "Missing data: another");
}
#[tokio::test]
async fn test_server_missing_host_header() {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
let mut server = TestServer::new(
"127.0.0.1:0",
"{|req| let host = $req.headers.host; $\"Host: ($host)\" }",
false,
)
.await;
let addr = server.address.strip_prefix("http://").unwrap();
let mut stream = TcpStream::connect(addr).await.expect("connect to server");
stream
.write_all(b"GET / HTTP/1.0\r\n\r\n")
.await
.expect("send request");
let mut buf = Vec::new();
stream.read_to_end(&mut buf).await.expect("read response");
let text = String::from_utf8_lossy(&buf);
assert!(text.contains("500"), "expected 500 status, got: {text}");
server.send_sigterm();
let status = server.wait_for_exit().await;
assert!(status.success());
}
#[tokio::test]
async fn test_router_exact_path() {
let server = TestServer::new(
"127.0.0.1:0",
r#"{|req|
use http-nu/router *
dispatch $req [
(route {path: "/health"} {|req ctx| "OK"})
(route {path: "/status"} {|req ctx| "RUNNING"})
(route true {|req ctx| "NOT FOUND"})
]
}"#,
false,
)
.await;
let output = server.curl("/health").await;
assert!(output.status.success());
assert_eq!(String::from_utf8_lossy(&output.stdout).trim(), "OK");
let output = server.curl("/status").await;
assert!(output.status.success());
assert_eq!(String::from_utf8_lossy(&output.stdout).trim(), "RUNNING");
let output = server.curl("/unknown").await;
assert!(output.status.success());
assert_eq!(String::from_utf8_lossy(&output.stdout).trim(), "NOT FOUND");
}
#[tokio::test]
async fn test_router_path_parameters() {
let server = TestServer::new(
"127.0.0.1:0",
r#"{|req|
use http-nu/router *
dispatch $req [
(route {path-matches: "/users/:id"} {|req ctx| $"User: ($ctx.id)"})
(route {path-matches: "/posts/:userId/:postId"} {|req ctx| $"Post ($ctx.postId) by user ($ctx.userId)"})
(route true {|req ctx| "NOT FOUND"})
]
}"#,
false,
)
.await;
let output = server.curl("/users/alice").await;
assert!(output.status.success());
assert_eq!(
String::from_utf8_lossy(&output.stdout).trim(),
"User: alice"
);
let output = server.curl("/posts/bob/123").await;
assert!(output.status.success());
assert_eq!(
String::from_utf8_lossy(&output.stdout).trim(),
"Post 123 by user bob"
);
}
#[tokio::test]
async fn test_router_method_matching() {
let server = TestServer::new(
"127.0.0.1:0",
r#"{|req|
use http-nu/router *
dispatch $req [
(route {method: "GET", path: "/items"} {|req ctx| "LIST"})
(route {method: "POST", path: "/items"} {|req ctx| "CREATE"})
(route true {|req ctx| "NOT FOUND"})
]
}"#,
false,
)
.await;
let output = server.curl("/items").await;
assert!(output.status.success());
assert_eq!(String::from_utf8_lossy(&output.stdout).trim(), "LIST");
let output = tokio::process::Command::new("curl")
.arg("-X")
.arg("POST")
.arg(format!("{}/items", server.address))
.output()
.await
.expect("curl failed");
assert!(output.status.success());
assert_eq!(String::from_utf8_lossy(&output.stdout).trim(), "CREATE");
}
#[tokio::test]
async fn test_router_header_matching() {
let server = TestServer::new(
"127.0.0.1:0",
r#"{|req|
use http-nu/router *
dispatch $req [
(route {has-header: {accept: "application/json"}} {|req ctx| "JSON"})
(route true {|req ctx| "OTHER"})
]
}"#,
false,
)
.await;
let output = tokio::process::Command::new("curl")
.arg("-H")
.arg("Accept: application/json")
.arg(format!("{}/", server.address))
.output()
.await
.expect("curl failed");
assert!(output.status.success());
assert_eq!(String::from_utf8_lossy(&output.stdout).trim(), "JSON");
let output = tokio::process::Command::new("curl")
.arg("-H")
.arg("Accept: text/html")
.arg(format!("{}/", server.address))
.output()
.await
.expect("curl failed");
assert!(output.status.success());
assert_eq!(String::from_utf8_lossy(&output.stdout).trim(), "OTHER");
}
#[tokio::test]
async fn test_router_combined_conditions() {
let server = TestServer::new(
"127.0.0.1:0",
r#"{|req|
use http-nu/router *
dispatch $req [
(route {
method: "POST"
path-matches: "/api/:version/data"
has-header: {accept: "application/json"}
} {|req ctx| $"API ($ctx.version) JSON"})
(route true {|req ctx| "FALLBACK"})
]
}"#,
false,
)
.await;
let output = tokio::process::Command::new("curl")
.arg("-X")
.arg("POST")
.arg("-H")
.arg("Accept: application/json")
.arg(format!("{}/api/v1/data", server.address))
.output()
.await
.expect("curl failed");
assert!(output.status.success());
assert_eq!(
String::from_utf8_lossy(&output.stdout).trim(),
"API v1 JSON"
);
let output = tokio::process::Command::new("curl")
.arg("-H")
.arg("Accept: application/json")
.arg(format!("{}/api/v1/data", server.address))
.output()
.await
.expect("curl failed");
assert!(output.status.success());
assert_eq!(String::from_utf8_lossy(&output.stdout).trim(), "FALLBACK");
}
#[tokio::test]
async fn test_router_no_match_501() {
let server = TestServer::new(
"127.0.0.1:0",
r#"{|req|
use http-nu/router *
dispatch $req [
(route {method: "POST", path: "/users"} {|req ctx| "CREATED"})
]
}"#,
false,
)
.await;
let output = tokio::process::Command::new("curl")
.arg("-i")
.arg(format!("{}/unknown", server.address))
.output()
.await
.expect("curl failed");
assert!(output.status.success());
let response = String::from_utf8_lossy(&output.stdout);
assert!(response.contains("501 Not Implemented"));
assert!(response.contains("No route configured"));
}
#[tokio::test]
async fn test_plugin_loading() {
let plugin_path = workspace_bin("nu_plugin_test");
let server = TestServer::new_with_plugins(
"127.0.0.1:0",
"{|req| test-plugin-cmd}",
false,
&[plugin_path],
)
.await;
let output = server.curl("/").await;
assert!(output.status.success());
let stdout = String::from_utf8_lossy(&output.stdout);
assert_eq!(stdout.trim(), "PLUGIN_WORKS");
}
#[tokio::test]
async fn test_plugin_process_shared_across_requests() {
let plugin_path = workspace_bin("nu_plugin_test");
let server = TestServer::new_with_plugins(
"127.0.0.1:0",
"{|req| test-plugin-cmd}",
false,
&[plugin_path],
)
.await;
let start = std::time::Instant::now();
for _ in 0..10 {
let output = server.curl("/").await;
assert!(output.status.success());
let stdout = String::from_utf8_lossy(&output.stdout);
assert_eq!(stdout.trim(), "PLUGIN_WORKS");
}
let elapsed = start.elapsed();
assert!(
elapsed < std::time::Duration::from_millis(400),
"10 requests took {elapsed:?}, expected < 400ms (plugin should be shared, not spawned per-request)"
);
}
#[tokio::test]
async fn test_watch_flag_incompatible_with_commands() {
let output = tokio::process::Command::new(assert_cmd::cargo::cargo_bin!("http-nu"))
.arg("127.0.0.1:0")
.arg("-c")
.arg("{|req| 'hello'}")
.arg("-w")
.output()
.await
.expect("Failed to execute http-nu");
assert!(!output.status.success());
let stderr = String::from_utf8_lossy(&output.stderr);
assert!(
stderr.contains("cannot be used with") || stderr.contains("conflict"),
"Expected error about -w and -c being incompatible, got: {stderr}"
);
}
#[tokio::test]
async fn test_watch_file_reload_on_change() {
let tmp = tempfile::tempdir().unwrap();
let script_path = tmp.path().join("handler.nu");
std::fs::write(&script_path, r#"{|req| "version1"}"#).unwrap();
let mut cmd = tokio::process::Command::new(assert_cmd::cargo::cargo_bin!("http-nu"));
cmd.arg("--log-format")
.arg("jsonl")
.arg("127.0.0.1:0")
.arg(&script_path)
.arg("-w");
let mut child = cmd
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.expect("Failed to start http-nu server");
let stdout = child.stdout.take().unwrap();
let stderr = child.stderr.take().unwrap();
let (addr_tx, addr_rx) = tokio::sync::oneshot::channel();
let mut addr_tx = Some(addr_tx);
tokio::spawn(async move {
let mut reader = BufReader::new(stdout).lines();
while let Ok(Some(line)) = reader.next_line().await {
eprintln!("[HTTP-NU STDOUT] {line}");
if addr_tx.is_some() {
if let Ok(json) = serde_json::from_str::<serde_json::Value>(&line) {
if let Some(addr_str) = json.get("address").and_then(|a| a.as_str()) {
if let Some(tx) = addr_tx.take() {
let _ = tx.send(addr_str.trim().to_string());
}
}
}
}
}
});
tokio::spawn(async move {
let mut reader = BufReader::new(stderr).lines();
while let Ok(Some(line)) = reader.next_line().await {
eprintln!("[HTTP-NU STDERR] {line}");
}
});
let address = timeout(std::time::Duration::from_secs(5), addr_rx)
.await
.expect("Failed to get address")
.expect("Channel closed");
let output = tokio::process::Command::new("curl")
.arg("-s")
.arg(format!("{address}/"))
.output()
.await
.expect("curl failed");
assert_eq!(String::from_utf8_lossy(&output.stdout).trim(), "version1");
let dummy_path = tmp.path().join("trigger.txt");
std::fs::write(&dummy_path, "trigger").unwrap();
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
std::fs::write(&script_path, r#"{|req| "version2"}"#).unwrap();
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let output = tokio::process::Command::new("curl")
.arg("-s")
.arg(format!("{address}/"))
.output()
.await
.expect("curl failed");
assert_eq!(String::from_utf8_lossy(&output.stdout).trim(), "version2");
let _ = child.kill().await;
}
#[tokio::test]
async fn test_watch_directory_change_triggers_reload() {
let tmp = tempfile::tempdir().unwrap();
let script_path = tmp.path().join("handler.nu");
let include_path = tmp.path().join("helpers.nu");
std::fs::write(&include_path, r#"def get-version [] { "v1" }"#).unwrap();
std::fs::write(
&script_path,
format!(
r#"source "{}"
{{|req| get-version}}"#,
include_path.display()
),
)
.unwrap();
let mut cmd = tokio::process::Command::new(assert_cmd::cargo::cargo_bin!("http-nu"));
cmd.arg("--log-format")
.arg("jsonl")
.arg("127.0.0.1:0")
.arg(&script_path)
.arg("-w");
let mut child = cmd
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.expect("Failed to start http-nu server");
let stdout = child.stdout.take().unwrap();
let stderr = child.stderr.take().unwrap();
let (addr_tx, addr_rx) = tokio::sync::oneshot::channel();
let mut addr_tx = Some(addr_tx);
tokio::spawn(async move {
let mut reader = BufReader::new(stdout).lines();
while let Ok(Some(line)) = reader.next_line().await {
eprintln!("[HTTP-NU STDOUT] {line}");
if addr_tx.is_some() {
if let Ok(json) = serde_json::from_str::<serde_json::Value>(&line) {
if let Some(addr_str) = json.get("address").and_then(|a| a.as_str()) {
if let Some(tx) = addr_tx.take() {
let _ = tx.send(addr_str.trim().to_string());
}
}
}
}
}
});
tokio::spawn(async move {
let mut reader = BufReader::new(stderr).lines();
while let Ok(Some(line)) = reader.next_line().await {
eprintln!("[HTTP-NU STDERR] {line}");
}
});
let address = timeout(std::time::Duration::from_secs(5), addr_rx)
.await
.expect("Failed to get address")
.expect("Channel closed");
let output = tokio::process::Command::new("curl")
.arg("-s")
.arg(format!("{address}/"))
.output()
.await
.expect("curl failed");
assert_eq!(String::from_utf8_lossy(&output.stdout).trim(), "v1");
std::fs::write(&include_path, r#"def get-version [] { "v2" }"#).unwrap();
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let output = tokio::process::Command::new("curl")
.arg("-s")
.arg(format!("{address}/"))
.output()
.await
.expect("curl failed");
assert_eq!(String::from_utf8_lossy(&output.stdout).trim(), "v2");
let _ = child.kill().await;
}
#[tokio::test]
async fn test_stdin_one_shot() {
let mut cmd = tokio::process::Command::new(assert_cmd::cargo::cargo_bin!("http-nu"));
cmd.arg("--log-format")
.arg("jsonl")
.arg("127.0.0.1:0")
.arg("-");
let mut child = cmd
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.expect("Failed to start http-nu");
let mut stdin = child.stdin.take().unwrap();
stdin.write_all(br#"{|req| "one-shot"}"#).await.unwrap();
drop(stdin);
let stdout = child.stdout.take().unwrap();
let (addr_tx, addr_rx) = tokio::sync::oneshot::channel();
let mut addr_tx = Some(addr_tx);
tokio::spawn(async move {
let mut reader = BufReader::new(stdout).lines();
while let Ok(Some(line)) = reader.next_line().await {
if addr_tx.is_some() {
if let Ok(json) = serde_json::from_str::<serde_json::Value>(&line) {
if let Some(addr_str) = json.get("address").and_then(|a| a.as_str()) {
if let Some(tx) = addr_tx.take() {
let _ = tx.send(addr_str.trim().to_string());
}
}
}
}
}
});
let address = timeout(std::time::Duration::from_secs(5), addr_rx)
.await
.expect("Server didn't start")
.expect("Channel closed");
let output = tokio::process::Command::new("curl")
.arg("-s")
.arg(format!("{address}/"))
.output()
.await
.expect("curl failed");
assert_eq!(String::from_utf8_lossy(&output.stdout).trim(), "one-shot");
let _ = child.kill().await;
}
#[tokio::test]
async fn test_watch_stdin_dynamic_reload() {
let (child, mut stdin, addr_rx) = TestServerWithStdin::spawn("127.0.0.1:0", false);
async fn write_script(stdin: &mut ChildStdin, script: &str) {
stdin.write_all(script.as_bytes()).await.unwrap();
stdin.write_all(b"\0").await.unwrap();
stdin.flush().await.unwrap();
tokio::task::yield_now().await;
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
write_script(&mut stdin, "{|req| { unclosed").await;
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
write_script(&mut stdin, r#"{|req| "1"}"#).await;
let address = timeout(std::time::Duration::from_secs(5), addr_rx)
.await
.expect("Server didn't start")
.expect("Channel closed");
let mut server = TestServerWithStdin {
child,
address,
stdin: Some(stdin),
};
assert_eq!(server.curl_get().await, "1");
server.write_script("{|req| ] unbalanced").await;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
assert_eq!(server.curl_get().await, "1");
server.write_script(r#"{|req| "2"}"#).await;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
assert_eq!(server.curl_get().await, "2");
{
let stdin = server.stdin.as_mut().unwrap();
stdin.write_all(br#"{|req| "3"}"#).await.unwrap();
stdin.flush().await.unwrap();
tokio::task::yield_now().await;
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
server.close_stdin().await;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
assert_eq!(server.curl_get().await, "3");
}
#[cfg(feature = "cross-stream")]
#[tokio::test]
async fn test_store_cat_follow_receives_appended_frames() {
use tokio::io::{AsyncBufReadExt, BufReader};
let tmp = tempfile::tempdir().unwrap();
let store_path = tmp.path().join("store");
let server = TestServer::new_with_store(
"127.0.0.1:0",
r#"{|req|
if $req.method == "GET" and $req.path == "/stream" {
.cat -f -n -T ping | each {|frame| $frame | to json -r | $"($in)\n" }
} else if $req.method == "POST" and $req.path == "/append" {
$in | .append ping --meta {source: "test"}
"ok"
} else {
"not found" | metadata set { merge {'http.response': {status: 404}}}
}
}"#,
&store_path,
)
.await;
let stream_url = format!("{}/stream", server.address);
let mut stream_child = tokio::process::Command::new("curl")
.arg("-s")
.arg("-N") .arg(&stream_url)
.stdout(std::process::Stdio::piped())
.spawn()
.expect("Failed to start curl for stream");
let stream_stdout = stream_child.stdout.take().unwrap();
let mut stream_reader = BufReader::new(stream_stdout).lines();
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
let append_output = tokio::process::Command::new("curl")
.arg("-s")
.arg("-X")
.arg("POST")
.arg("-d")
.arg("hello world")
.arg(format!("{}/append", server.address))
.output()
.await
.expect("Failed to execute append curl");
assert!(append_output.status.success());
assert_eq!(String::from_utf8_lossy(&append_output.stdout).trim(), "ok");
let frame_line = timeout(std::time::Duration::from_secs(5), stream_reader.next_line())
.await
.expect("Timed out waiting for streamed frame")
.expect("Failed to read line")
.expect("Stream ended unexpectedly");
let frame: serde_json::Value =
serde_json::from_str(&frame_line).expect("Failed to parse frame JSON");
assert_eq!(frame["topic"], "ping", "Frame should have topic 'ping'");
assert_eq!(
frame["meta"]["source"], "test",
"Frame should have meta.source 'test'"
);
assert!(frame["hash"].is_string(), "Frame should have a hash");
let _ = stream_child.kill().await;
}
#[cfg(feature = "cross-stream")]
#[tokio::test]
async fn test_services_flag_enables_handlers() {
use tokio::io::{AsyncBufReadExt, BufReader};
let tmp = tempfile::tempdir().unwrap();
let store_path = tmp.path().join("store");
let server =
TestServer::new_with_store_and_services("127.0.0.1:0", r#"{|req| "ok"}"#, &store_path)
.await;
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
let sock_path = store_path.join("sock");
let sock_ready = timeout(std::time::Duration::from_secs(5), async {
loop {
if sock_path.exists() {
return;
}
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
})
.await;
assert!(
sock_ready.is_ok(),
"Socket was not created at {sock_path:?}"
);
let mut stream_child = tokio::process::Command::new("curl")
.arg("-s")
.arg("-N") .arg("--unix-socket")
.arg(&sock_path)
.arg("http://localhost/?follow=true")
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.expect("Failed to start curl for stream");
let stream_stdout = stream_child.stdout.take().unwrap();
let mut stream_reader = BufReader::new(stream_stdout).lines();
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let handler_script = r#"{run: {|frame, state| $frame.topic | .append echo.out}}"#;
let register_output = tokio::process::Command::new("curl")
.arg("-s")
.arg("-X")
.arg("POST")
.arg("--unix-socket")
.arg(&sock_path)
.arg("-d")
.arg(handler_script)
.arg("http://localhost/append/echo.register")
.output()
.await
.expect("Failed to register handler");
assert!(
register_output.status.success(),
"Handler registration failed: {}",
String::from_utf8_lossy(®ister_output.stderr)
);
let active_frame = timeout(std::time::Duration::from_secs(5), async {
loop {
let line = stream_reader
.next_line()
.await
.expect("Failed to read line")
.expect("Stream ended unexpectedly");
eprintln!("[TEST] Received frame: {line}");
let frame: serde_json::Value = serde_json::from_str(&line).unwrap();
let topic = frame["topic"].as_str().unwrap();
if topic == "echo.active" || topic == "echo.unregistered" {
return frame;
}
}
})
.await
.expect("Handler did not become active or fail");
assert_eq!(
active_frame["topic"], "echo.active",
"Handler failed to activate"
);
let trigger_output = tokio::process::Command::new("curl")
.arg("-s")
.arg("-X")
.arg("POST")
.arg("--unix-socket")
.arg(&sock_path)
.arg("http://localhost/append/test.trigger")
.output()
.await
.expect("Failed to trigger handler");
assert!(
trigger_output.status.success(),
"Trigger failed: {}",
String::from_utf8_lossy(&trigger_output.stderr)
);
let output_frame = timeout(std::time::Duration::from_secs(5), async {
loop {
let line = stream_reader.next_line().await.unwrap().unwrap();
let frame: serde_json::Value = serde_json::from_str(&line).unwrap();
if frame["topic"] == "echo.out" {
return frame;
}
}
})
.await
.expect("Handler did not produce output");
assert_eq!(output_frame["topic"], "echo.out");
let hash = output_frame["hash"].as_str().unwrap();
let cas_output = tokio::process::Command::new("curl")
.arg("-s")
.arg("--unix-socket")
.arg(&sock_path)
.arg(format!("http://localhost/cas/{hash}"))
.output()
.await
.expect("Failed to fetch CAS content");
assert_eq!(String::from_utf8_lossy(&cas_output.stdout), "test.trigger");
let _ = stream_child.kill().await;
drop(server);
}
#[tokio::test]
async fn test_record_json_content_type() {
let server = TestServer::new("127.0.0.1:0", "{|req| {foo: 1, bar: 'hello'}}", false).await;
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let output = std::process::Command::new("curl")
.arg("-s")
.arg("-i")
.arg(&server.address)
.output()
.expect("curl failed");
assert!(output.status.success());
let response = String::from_utf8_lossy(&output.stdout);
assert!(
response.contains("content-type: application/json"),
"Expected application/json content-type, got: {response}"
);
assert!(
response.contains(r#""foo":1"#) || response.contains(r#""foo": 1"#),
"Expected JSON body with foo:1"
);
}
#[tokio::test]
async fn test_list_of_records_json_content_type() {
let server = TestServer::new("127.0.0.1:0", "{|req| [{a: 1}, {b: 2}, {c: 3}]}", false).await;
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let output = std::process::Command::new("curl")
.arg("-s")
.arg("-i")
.arg(&server.address)
.output()
.expect("curl failed");
assert!(output.status.success());
let response = String::from_utf8_lossy(&output.stdout);
assert!(
response.contains("content-type: application/json"),
"Expected application/json content-type, got: {response}"
);
assert!(
response.contains(r#"[{"a":1},{"b":2},{"c":3}]"#),
"Expected JSON array, got: {response}"
);
}
#[tokio::test]
async fn test_html_record_not_jsonl() {
let server = TestServer::new(
"127.0.0.1:0",
r#"{|req| {__html: "<h1>Hello</h1>"}}"#,
false,
)
.await;
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let output = std::process::Command::new("curl")
.arg("-s")
.arg("-i")
.arg(&server.address)
.output()
.expect("curl failed");
assert!(output.status.success());
let response = String::from_utf8_lossy(&output.stdout);
assert!(
response.contains("content-type: text/html"),
"Expected text/html content-type for __html record, got: {response}"
);
assert!(response.contains("<h1>Hello</h1>"), "Expected HTML body");
}
#[tokio::test]
async fn test_binary_octet_stream_content_type() {
let server = TestServer::new("127.0.0.1:0", "{|req| 0x[deadbeef]}", false).await;
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let output = std::process::Command::new("curl")
.arg("-s")
.arg("-i")
.arg(&server.address)
.output()
.expect("curl failed");
assert!(output.status.success());
let response = String::from_utf8_lossy(&output.stdout);
assert!(
response.contains("content-type: application/octet-stream"),
"Expected application/octet-stream content-type for binary, got: {response}"
);
}
#[tokio::test]
async fn test_sse_cancelled_on_hot_reload() {
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
let (mut child, mut stdin, addr_rx) = TestServerWithStdin::spawn("127.0.0.1:0", false);
let sse_script = r#"{|req|
1..100 | each {|i|
sleep 100ms
{data: $"event-($i)"}
} | to sse
}"#;
stdin.write_all(sse_script.as_bytes()).await.unwrap();
stdin.write_all(b"\0").await.unwrap();
stdin.flush().await.unwrap();
let address = tokio::time::timeout(std::time::Duration::from_secs(5), addr_rx)
.await
.expect("Server didn't start")
.expect("Channel closed");
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
let mut sse_child = tokio::process::Command::new("curl")
.arg("-sN")
.arg(&address)
.stdout(std::process::Stdio::piped())
.spawn()
.expect("Failed to start curl");
let stdout = sse_child.stdout.take().expect("Failed to get stdout");
let mut reader = BufReader::new(stdout).lines();
let mut events_received = 0;
for _ in 0..3 {
if let Ok(Ok(Some(line))) =
tokio::time::timeout(std::time::Duration::from_secs(2), reader.next_line()).await
{
if line.starts_with("data:") {
events_received += 1;
}
}
}
assert!(
events_received >= 1,
"Should have received at least one SSE event before reload"
);
let new_script = r#"{|req| "reloaded"}"#;
stdin.write_all(new_script.as_bytes()).await.unwrap();
stdin.write_all(b"\0").await.unwrap();
stdin.flush().await.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
let more_events =
tokio::time::timeout(std::time::Duration::from_millis(500), reader.next_line()).await;
let stream_stopped = match more_events {
Err(_) => true, Ok(Ok(None)) => true, Ok(Ok(Some(line))) => !line.starts_with("data:"), Ok(Err(_)) => true, };
assert!(stream_stopped, "SSE stream should stop after reload");
sse_child.kill().await.ok();
let output = std::process::Command::new("curl")
.arg("-s")
.arg(&address)
.output()
.expect("curl failed");
assert_eq!(
String::from_utf8_lossy(&output.stdout).trim(),
"reloaded",
"New handler should be active after reload"
);
child.kill().await.ok();
}
#[tokio::test]
async fn test_sse_cancelled_on_hot_reload_with_brotli() {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
let (mut child, mut stdin, addr_rx) = TestServerWithStdin::spawn("127.0.0.1:0", false);
let sse_script = r#"{|req|
1..100 | each {|i|
sleep 100ms
{data: $"event-($i)"}
} | to sse
}"#;
stdin.write_all(sse_script.as_bytes()).await.unwrap();
stdin.write_all(b"\0").await.unwrap();
stdin.flush().await.unwrap();
let address = tokio::time::timeout(std::time::Duration::from_secs(5), addr_rx)
.await
.expect("Server didn't start")
.expect("Channel closed");
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
let mut sse_child = tokio::process::Command::new("curl")
.arg("-sN")
.arg("-H")
.arg("Accept-Encoding: br")
.arg(&address)
.stdout(std::process::Stdio::piped())
.spawn()
.expect("Failed to start curl");
let stdout = sse_child.stdout.take().expect("Failed to get stdout");
let mut reader = stdout;
let mut buf = vec![0u8; 256];
let n = tokio::time::timeout(std::time::Duration::from_secs(2), reader.read(&mut buf))
.await
.expect("Timeout reading initial SSE data")
.expect("Read error");
assert!(n > 0, "Should have received SSE data");
let new_script = r#"{|req| "reloaded"}"#;
stdin.write_all(new_script.as_bytes()).await.unwrap();
stdin.write_all(b"\0").await.unwrap();
stdin.flush().await.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
let stream_ended = tokio::time::timeout(std::time::Duration::from_secs(2), async {
loop {
match reader.read(&mut buf).await {
Ok(0) => return true, Ok(_) => continue, Err(_) => return true, }
}
})
.await;
assert!(
matches!(stream_ended, Ok(true)),
"Compressed SSE stream should stop after reload"
);
sse_child.kill().await.ok();
let output = std::process::Command::new("curl")
.arg("-s")
.arg(&address)
.output()
.expect("curl failed");
assert_eq!(
String::from_utf8_lossy(&output.stdout).trim(),
"reloaded",
"New handler should be active after reload"
);
child.kill().await.ok();
}
#[cfg(feature = "cross-stream")]
#[tokio::test]
async fn test_watch_topic_reload_on_append() {
let tmp = tempfile::tempdir().unwrap();
let store_path = tmp.path().join("store");
let mut cmd = tokio::process::Command::new(assert_cmd::cargo::cargo_bin!("http-nu"));
cmd.arg("--log-format")
.arg("jsonl")
.arg("--store")
.arg(&store_path)
.arg("--topic")
.arg("serve.nu")
.arg("-w")
.arg("127.0.0.1:0");
let mut child = cmd
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.expect("Failed to start http-nu server");
let stdout = child.stdout.take().unwrap();
let stderr = child.stderr.take().unwrap();
let (addr_tx, addr_rx) = tokio::sync::oneshot::channel();
let mut addr_tx = Some(addr_tx);
tokio::spawn(async move {
let mut reader = BufReader::new(stdout).lines();
while let Ok(Some(line)) = reader.next_line().await {
eprintln!("[HTTP-NU STDOUT] {line}");
if addr_tx.is_some() {
if let Ok(json) = serde_json::from_str::<serde_json::Value>(&line) {
if let Some(addr_str) = json.get("address").and_then(|a| a.as_str()) {
if let Some(tx) = addr_tx.take() {
let _ = tx.send(addr_str.trim().to_string());
}
}
}
}
}
});
tokio::spawn(async move {
let mut reader = BufReader::new(stderr).lines();
while let Ok(Some(line)) = reader.next_line().await {
eprintln!("[HTTP-NU STDERR] {line}");
}
});
let address = timeout(std::time::Duration::from_secs(5), addr_rx)
.await
.expect("Failed to get address")
.expect("Channel closed");
let output = tokio::process::Command::new("curl")
.arg("-s")
.arg("-o")
.arg("/dev/null")
.arg("-w")
.arg("%{http_code}")
.arg(format!("{address}/"))
.output()
.await
.expect("curl failed");
assert_eq!(
String::from_utf8_lossy(&output.stdout).trim(),
"503",
"Empty topic should serve placeholder with 503"
);
let sock_path = store_path.join("sock");
let output = tokio::process::Command::new("curl")
.arg("-s")
.arg("--unix-socket")
.arg(&sock_path)
.arg("-X")
.arg("POST")
.arg("-d")
.arg(r#"{|req| "version1"}"#)
.arg("http://localhost/append/serve.nu")
.output()
.await
.expect("curl append failed");
assert!(
output.status.success(),
"append should succeed: {}",
String::from_utf8_lossy(&output.stderr)
);
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let output = tokio::process::Command::new("curl")
.arg("-s")
.arg(format!("{address}/"))
.output()
.await
.expect("curl failed");
assert_eq!(
String::from_utf8_lossy(&output.stdout).trim(),
"version1",
"Should serve handler from topic"
);
let output = tokio::process::Command::new("curl")
.arg("-s")
.arg("--unix-socket")
.arg(&sock_path)
.arg("-X")
.arg("POST")
.arg("-d")
.arg(r#"{|req| "version2"}"#)
.arg("http://localhost/append/serve.nu")
.output()
.await
.expect("curl append failed");
assert!(output.status.success(), "second append should succeed");
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let output = tokio::process::Command::new("curl")
.arg("-s")
.arg(format!("{address}/"))
.output()
.await
.expect("curl failed");
assert_eq!(
String::from_utf8_lossy(&output.stdout).trim(),
"version2",
"Should serve updated handler after topic update"
);
let _ = child.kill().await;
}
#[cfg(feature = "cross-stream")]
#[tokio::test]
async fn test_watch_topic_reload_picks_up_module_changes() {
let tmp = tempfile::tempdir().unwrap();
let store_path = tmp.path().join("store");
let mut cmd = tokio::process::Command::new(assert_cmd::cargo::cargo_bin!("http-nu"));
cmd.arg("--log-format")
.arg("jsonl")
.arg("--store")
.arg(&store_path)
.arg("--topic")
.arg("serve.nu")
.arg("-w")
.arg("127.0.0.1:0");
let mut child = cmd
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.expect("Failed to start http-nu server");
let stdout = child.stdout.take().unwrap();
let stderr = child.stderr.take().unwrap();
let (addr_tx, addr_rx) = tokio::sync::oneshot::channel();
let mut addr_tx = Some(addr_tx);
tokio::spawn(async move {
let mut reader = BufReader::new(stdout).lines();
while let Ok(Some(line)) = reader.next_line().await {
eprintln!("[HTTP-NU STDOUT] {line}");
if addr_tx.is_some() {
if let Ok(json) = serde_json::from_str::<serde_json::Value>(&line) {
if let Some(addr_str) = json.get("address").and_then(|a| a.as_str()) {
if let Some(tx) = addr_tx.take() {
let _ = tx.send(addr_str.trim().to_string());
}
}
}
}
}
});
tokio::spawn(async move {
let mut reader = BufReader::new(stderr).lines();
while let Ok(Some(line)) = reader.next_line().await {
eprintln!("[HTTP-NU STDERR] {line}");
}
});
let address = timeout(std::time::Duration::from_secs(5), addr_rx)
.await
.expect("Failed to get address")
.expect("Channel closed");
let sock_path = store_path.join("sock");
let output = tokio::process::Command::new("curl")
.arg("-s")
.arg("--unix-socket")
.arg(&sock_path)
.arg("-X")
.arg("POST")
.arg("-d")
.arg(r#"export def hello [] { "foo" }"#)
.arg("http://localhost/append/greeter.nu")
.output()
.await
.expect("curl append module failed");
assert!(output.status.success(), "append module should succeed");
let output = tokio::process::Command::new("curl")
.arg("-s")
.arg("--unix-socket")
.arg(&sock_path)
.arg("-X")
.arg("POST")
.arg("-d")
.arg("{|req| use greeter; greeter hello}")
.arg("http://localhost/append/serve.nu")
.output()
.await
.expect("curl append topic failed");
assert!(output.status.success(), "append topic should succeed");
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let output = tokio::process::Command::new("curl")
.arg("-s")
.arg(format!("{address}/"))
.output()
.await
.expect("curl failed");
assert_eq!(
String::from_utf8_lossy(&output.stdout).trim(),
"foo",
"Should return foo from greeter module"
);
let output = tokio::process::Command::new("curl")
.arg("-s")
.arg("--unix-socket")
.arg(&sock_path)
.arg("-X")
.arg("POST")
.arg("-d")
.arg(r#"export def hello [] { "bar" }"#)
.arg("http://localhost/append/greeter.nu")
.output()
.await
.expect("curl append updated module failed");
assert!(
output.status.success(),
"append updated module should succeed"
);
let output = tokio::process::Command::new("curl")
.arg("-s")
.arg("--unix-socket")
.arg(&sock_path)
.arg("-X")
.arg("POST")
.arg("-d")
.arg("{|req| use greeter; greeter hello}")
.arg("http://localhost/append/serve.nu")
.output()
.await
.expect("curl append updated topic failed");
assert!(
output.status.success(),
"append updated topic should succeed"
);
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let output = tokio::process::Command::new("curl")
.arg("-s")
.arg(format!("{address}/"))
.output()
.await
.expect("curl failed");
assert_eq!(
String::from_utf8_lossy(&output.stdout).trim(),
"bar",
"Should return bar from updated greeter module after hot reload"
);
let _ = child.kill().await;
}
#[cfg(feature = "cross-stream")]
#[tokio::test]
async fn test_mj_topic_resolves_templates_from_store() {
let tmp = tempfile::tempdir().unwrap();
let store_path = tmp.path().join("store");
let server = TestServer::new_with_store(
"127.0.0.1:0",
r#"{|req|
if $req.path == "/include" {
{name: "world"} | .mj --topic page.include
} else if $req.path == "/extends" {
{title: "Home"} | .mj --topic page.extends
} else {
"not found"
}
}"#,
&store_path,
)
.await;
let sock_path = store_path.join("sock");
for _ in 0..20 {
if sock_path.exists() {
break;
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
let append = |topic: &str, content: &str| {
let sock = sock_path.clone();
let topic = topic.to_string();
let content = content.to_string();
async move {
let output = tokio::process::Command::new("curl")
.arg("-s")
.arg("--unix-socket")
.arg(&sock)
.arg("-X")
.arg("POST")
.arg("-d")
.arg(&content)
.arg(format!("http://localhost/append/{topic}"))
.output()
.await
.expect("curl append failed");
assert!(
output.status.success(),
"append {topic} should succeed: {}",
String::from_utf8_lossy(&output.stderr)
);
}
};
append("header", "<h1>Header</h1>\n").await;
append("layout", "LAYOUT[{% block body %}{% endblock %}]LAYOUT").await;
append(
"page.include",
r#"{% include "header" %}Hello, {{ name }}!"#,
)
.await;
append(
"page.extends",
r#"{% extends "layout" %}{% block body %}page content{% endblock %}"#,
)
.await;
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
let output = server.curl("/include").await;
let body = String::from_utf8_lossy(&output.stdout);
assert!(
body.contains("<h1>Header</h1>"),
"should include header from store, got: {body}"
);
assert!(
body.contains("Hello, world!"),
"should render template content, got: {body}"
);
let output = server.curl("/extends").await;
let body = String::from_utf8_lossy(&output.stdout);
assert!(
body.contains("LAYOUT[page content]LAYOUT"),
"should extend layout from store, got: {body}"
);
}
#[cfg(feature = "cross-stream")]
#[tokio::test]
async fn test_mj_compile_topic_and_render() {
let tmp = tempfile::tempdir().unwrap();
let store_path = tmp.path().join("store");
let server = TestServer::new_with_store(
"127.0.0.1:0",
r#"{|req|
let t = .mj compile --topic page
{name: "world"} | .mj render $t
}"#,
&store_path,
)
.await;
let sock_path = store_path.join("sock");
for _ in 0..20 {
if sock_path.exists() {
break;
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
let output = tokio::process::Command::new("curl")
.arg("-s")
.arg("--unix-socket")
.arg(&sock_path)
.arg("-X")
.arg("POST")
.arg("-d")
.arg("Hello, {{ name }}!")
.arg("http://localhost/append/page")
.output()
.await
.expect("curl append failed");
assert!(output.status.success());
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
let output = server.curl("/").await;
let body = String::from_utf8_lossy(&output.stdout);
assert_eq!(body.trim(), "Hello, world!");
}
#[cfg(feature = "cross-stream")]
#[tokio::test]
async fn test_mj_topic_missing_returns_error() {
let tmp = tempfile::tempdir().unwrap();
let store_path = tmp.path().join("store");
let server = TestServer::new_with_store(
"127.0.0.1:0",
r#"{|req|
{} | .mj --topic nonexistent
}"#,
&store_path,
)
.await;
let sock_path = store_path.join("sock");
for _ in 0..20 {
if sock_path.exists() {
break;
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
let output = server.curl("/").await;
let body = String::from_utf8_lossy(&output.stdout);
assert!(
body.contains("Topic not found") || body.contains("error"),
"should report topic not found, got: {body}"
);
}
#[tokio::test]
async fn test_watch_script_error_ctrl_c_exits() {
let tmp = tempfile::tempdir().unwrap();
let script_path = tmp.path().join("handler.nu");
std::fs::write(
&script_path,
r#"let page = .mj compile "nonexistent/template.html"
{|req| "hello"}"#,
)
.unwrap();
let mut child = tokio::process::Command::new(assert_cmd::cargo::cargo_bin!("http-nu"))
.arg("--log-format")
.arg("jsonl")
.arg("127.0.0.1:0")
.arg(&script_path)
.arg("-w")
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.expect("Failed to start http-nu server");
let stdout = child.stdout.take().unwrap();
let stderr = child.stderr.take().unwrap();
let (err_tx, err_rx) = tokio::sync::oneshot::channel();
let mut err_tx = Some(err_tx);
tokio::spawn(async move {
let mut reader = BufReader::new(stdout).lines();
while let Ok(Some(line)) = reader.next_line().await {
eprintln!("[HTTP-NU STDOUT] {line}");
if err_tx.is_some() && line.contains("Failed to read template file") {
if let Some(tx) = err_tx.take() {
let _ = tx.send(());
}
}
}
});
tokio::spawn(async move {
let mut reader = BufReader::new(stderr).lines();
while let Ok(Some(line)) = reader.next_line().await {
eprintln!("[HTTP-NU STDERR] {line}");
}
});
timeout(std::time::Duration::from_secs(5), err_rx)
.await
.expect("Timed out waiting for script error")
.expect("Channel closed");
#[cfg(unix)]
{
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
let pid = Pid::from_raw(child.id().expect("child id") as i32);
kill(pid, Signal::SIGINT).expect("failed to send SIGINT");
}
#[cfg(not(unix))]
{
let _ = child.start_kill();
}
let status = timeout(std::time::Duration::from_secs(3), child.wait())
.await
.expect(
"server did not exit after SIGINT - Ctrl+C is broken when script fails in watch mode",
)
.expect("failed waiting for child");
eprintln!("Server exited with status: {status}");
}