use std::sync::Arc;
use std::time::Duration;
use nika::ast::{FetchParams, TaskAction};
use nika::binding::ResolvedBindings;
use nika::error::NikaError;
use nika::event::EventLog;
use nika::runtime::TaskExecutor;
use nika::store::RunContext;
use rustc_hash::FxHashMap;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
fn mock_executor() -> TaskExecutor {
let event_log = EventLog::new();
TaskExecutor::new("mock", None, None, event_log)
}
fn test_context() -> (ResolvedBindings, RunContext) {
(ResolvedBindings::new(), RunContext::new())
}
fn fetch_params(url: &str) -> FetchParams {
FetchParams {
url: url.to_string(),
method: "GET".to_string(),
headers: FxHashMap::default(),
body: None,
json: None,
timeout: None,
retry: None,
follow_redirects: None,
response: None,
extract: None,
selector: None,
}
}
async fn start_delayed_server(delay: Duration) -> String {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let url = format!("http://{}", addr);
tokio::spawn(async move {
if let Ok((mut socket, _)) = listener.accept().await {
tokio::time::sleep(delay).await;
let response = "HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK";
let _ = socket.write_all(response.as_bytes()).await;
}
});
url
}
async fn start_status_server(status: u16, body: &str) -> String {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let url = format!("http://{}", addr);
let response = format!(
"HTTP/1.1 {} Status\r\nContent-Type: text/plain\r\nContent-Length: {}\r\n\r\n{}",
status,
body.len(),
body
);
tokio::spawn(async move {
if let Ok((mut socket, _)) = listener.accept().await {
let mut buf = [0u8; 1024];
let _ = tokio::io::AsyncReadExt::read(&mut socket, &mut buf).await;
let _ = socket.write_all(response.as_bytes()).await;
}
});
tokio::time::sleep(Duration::from_millis(10)).await;
url
}
async fn start_malformed_server() -> String {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let url = format!("http://{}", addr);
tokio::spawn(async move {
if let Ok((mut socket, _)) = listener.accept().await {
let mut buf = [0u8; 1024];
let _ = tokio::io::AsyncReadExt::read(&mut socket, &mut buf).await;
let _ = socket.write_all(b"NOT HTTP AT ALL\r\n").await;
}
});
tokio::time::sleep(Duration::from_millis(10)).await;
url
}
#[tokio::test]
async fn test_fetch_invalid_url_returns_execution_error() {
let executor = mock_executor();
let task_id: Arc<str> = Arc::from("fetch_invalid");
let (bindings, datastore) = test_context();
let action = TaskAction::Fetch {
fetch: fetch_params("not-a-url"),
};
let result = executor
.execute(&task_id, &action, &bindings, &datastore, None)
.await;
assert!(result.is_err(), "Invalid URL should fail");
match result.unwrap_err() {
NikaError::Execution(msg) => {
assert!(
msg.contains("HTTP request failed"),
"Error should mention HTTP failure: {msg}"
);
}
err => panic!("Expected Execution error, got: {err:?}"),
}
}
#[tokio::test]
async fn test_fetch_invalid_scheme_returns_error() {
let executor = mock_executor();
let task_id: Arc<str> = Arc::from("fetch_bad_scheme");
let (bindings, datastore) = test_context();
let action = TaskAction::Fetch {
fetch: fetch_params("ftp://example.com/file"),
};
let result = executor
.execute(&task_id, &action, &bindings, &datastore, None)
.await;
assert!(result.is_err(), "Unsupported scheme should fail");
match result.unwrap_err() {
NikaError::Execution(msg) => {
assert!(
msg.contains("HTTP request failed"),
"Error should mention HTTP failure: {msg}"
);
}
err => panic!("Expected Execution error, got: {err:?}"),
}
}
#[tokio::test]
async fn test_fetch_non_2xx_status_returns_body_not_error() {
let executor = mock_executor();
let task_id: Arc<str> = Arc::from("fetch_500");
let (bindings, datastore) = test_context();
let url = start_status_server(500, "Internal Server Error").await;
let action = TaskAction::Fetch {
fetch: fetch_params(&url),
};
let result = executor
.execute(&task_id, &action, &bindings, &datastore, None)
.await;
assert!(
result.is_ok(),
"Current impl returns body for any HTTP response: {:?}",
result.err()
);
let body = result.unwrap();
assert!(
body.contains("Internal Server Error"),
"Body should contain server response: {body}"
);
}
#[tokio::test]
async fn test_fetch_404_returns_body() {
let executor = mock_executor();
let task_id: Arc<str> = Arc::from("fetch_404");
let (bindings, datastore) = test_context();
let url = start_status_server(404, "Not Found").await;
let action = TaskAction::Fetch {
fetch: fetch_params(&url),
};
let result = executor
.execute(&task_id, &action, &bindings, &datastore, None)
.await;
assert!(result.is_ok(), "404 returns body: {:?}", result.err());
assert_eq!(result.unwrap(), "Not Found");
}
#[tokio::test]
async fn test_fetch_connection_refused_returns_error() {
let executor = mock_executor();
let task_id: Arc<str> = Arc::from("fetch_refused");
let (bindings, datastore) = test_context();
let action = TaskAction::Fetch {
fetch: fetch_params("http://127.0.0.1:1/"),
};
let result = executor
.execute(&task_id, &action, &bindings, &datastore, None)
.await;
assert!(result.is_err(), "Connection refused should fail");
match result.unwrap_err() {
NikaError::Execution(msg) => {
assert!(
msg.contains("HTTP request failed"),
"Error should mention HTTP failure: {msg}"
);
}
err => panic!("Expected Execution error, got: {err:?}"),
}
}
#[tokio::test]
async fn test_fetch_malformed_response_returns_error() {
let executor = mock_executor();
let task_id: Arc<str> = Arc::from("fetch_malformed");
let (bindings, datastore) = test_context();
let url = start_malformed_server().await;
let action = TaskAction::Fetch {
fetch: fetch_params(&url),
};
let result = executor
.execute(&task_id, &action, &bindings, &datastore, None)
.await;
assert!(result.is_err(), "Malformed HTTP should fail");
match result.unwrap_err() {
NikaError::Execution(msg) => {
assert!(
msg.contains("HTTP request failed") || msg.contains("Failed to read"),
"Error should mention HTTP or read failure: {msg}"
);
}
err => panic!("Expected Execution error, got: {err:?}"),
}
}
#[tokio::test]
#[ignore = "Requires custom executor with short timeout - takes too long for CI"]
async fn test_fetch_timeout_with_delayed_server() {
let executor = mock_executor();
let task_id: Arc<str> = Arc::from("fetch_timeout");
let (bindings, datastore) = test_context();
let url = start_delayed_server(Duration::from_secs(35)).await;
let action = TaskAction::Fetch {
fetch: fetch_params(&url),
};
let result = executor
.execute(&task_id, &action, &bindings, &datastore, None)
.await;
assert!(result.is_err(), "Timeout should fail");
match result.unwrap_err() {
NikaError::Execution(msg) => {
assert!(
msg.contains("timeout") || msg.contains("timed out"),
"Error should mention timeout: {msg}"
);
}
err => panic!("Expected Execution error with timeout, got: {err:?}"),
}
}
#[tokio::test]
async fn test_fetch_success_returns_body() {
let executor = mock_executor();
let task_id: Arc<str> = Arc::from("fetch_success");
let (bindings, datastore) = test_context();
let url = start_status_server(200, "Hello, World!").await;
let action = TaskAction::Fetch {
fetch: fetch_params(&url),
};
let result = executor
.execute(&task_id, &action, &bindings, &datastore, None)
.await;
assert!(result.is_ok(), "200 OK should succeed: {:?}", result.err());
assert_eq!(result.unwrap(), "Hello, World!");
}
#[tokio::test]
async fn test_fetch_with_json_body() {
let executor = mock_executor();
let task_id: Arc<str> = Arc::from("fetch_json");
let (bindings, datastore) = test_context();
let json_body = r#"{"status":"ok","count":42}"#;
let url = start_status_server(200, json_body).await;
let action = TaskAction::Fetch {
fetch: fetch_params(&url),
};
let result = executor
.execute(&task_id, &action, &bindings, &datastore, None)
.await;
assert!(result.is_ok(), "JSON response should succeed");
let body = result.unwrap();
assert_eq!(body, json_body);
let parsed: serde_json::Value = serde_json::from_str(&body).unwrap();
assert_eq!(parsed["count"], 42);
}
#[tokio::test]
async fn test_fetch_empty_url_fails() {
let executor = mock_executor();
let task_id: Arc<str> = Arc::from("fetch_empty");
let (bindings, datastore) = test_context();
let action = TaskAction::Fetch {
fetch: fetch_params(""),
};
let result = executor
.execute(&task_id, &action, &bindings, &datastore, None)
.await;
assert!(result.is_err(), "Empty URL should fail");
match result.unwrap_err() {
NikaError::Execution(msg) => {
assert!(
msg.contains("HTTP request failed"),
"Error should mention HTTP failure: {msg}"
);
}
err => panic!("Expected Execution error, got: {err:?}"),
}
}
#[tokio::test]
async fn test_fetch_url_with_invalid_characters() {
let executor = mock_executor();
let task_id: Arc<str> = Arc::from("fetch_invalid_chars");
let (bindings, datastore) = test_context();
let action = TaskAction::Fetch {
fetch: fetch_params("http://[invalid-ipv6/path"),
};
let result = executor
.execute(&task_id, &action, &bindings, &datastore, None)
.await;
assert!(result.is_err(), "Malformed URL should fail");
match result.unwrap_err() {
NikaError::Execution(msg) => {
assert!(
msg.contains("HTTP request failed"),
"Error should mention HTTP failure: {msg}"
);
}
err => panic!("Expected Execution error, got: {err:?}"),
}
}
#[tokio::test]
async fn test_fetch_localhost_unreachable_fails() {
let executor = mock_executor();
let task_id: Arc<str> = Arc::from("fetch_unreachable");
let (bindings, datastore) = test_context();
let action = TaskAction::Fetch {
fetch: fetch_params("http://127.0.0.1:59999/nonexistent"),
};
let result = executor
.execute(&task_id, &action, &bindings, &datastore, None)
.await;
assert!(result.is_err(), "Unreachable host should fail");
}
#[tokio::test]
async fn test_fetch_dns_resolution_failure() {
let executor = mock_executor();
let task_id: Arc<str> = Arc::from("fetch_dns_fail");
let (bindings, datastore) = test_context();
let action = TaskAction::Fetch {
fetch: fetch_params("http://this-domain-definitely-does-not-exist-12345.invalid/"),
};
let result = executor
.execute(&task_id, &action, &bindings, &datastore, None)
.await;
assert!(result.is_err(), "DNS failure should fail");
match result.unwrap_err() {
NikaError::Execution(msg) => {
assert!(
msg.contains("HTTP request failed"),
"Error should mention HTTP failure: {msg}"
);
}
err => panic!("Expected Execution error, got: {err:?}"),
}
}