use std::cell::RefCell;
use std::collections::HashMap;
use std::net::IpAddr;
pub struct FetchRequest {
pub resolver: v8::Global<v8::PromiseResolver>,
pub url: String,
pub method: String,
pub headers: HashMap<String, String>,
pub body: Option<String>,
}
thread_local! {
pub static FETCH_QUEUE: RefCell<Vec<FetchRequest>> = const { RefCell::new(Vec::new()) };
static HTTP_CLIENT: reqwest::Client = reqwest::Client::new();
static TOKIO_RT: tokio::runtime::Runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to create tokio runtime for fetch");
}
macro_rules! set_v8_prop {
($scope:expr, $obj:expr, $name:expr, $value:expr) => {
if let Some(k) = v8::String::new($scope, $name) {
$obj.set($scope, k.into(), $value);
}
};
}
pub fn fetch_callback(
scope: &mut v8::PinScope,
args: v8::FunctionCallbackArguments,
mut ret: v8::ReturnValue,
) {
if args.length() < 1 || args.get(0).is_undefined() || args.get(0).is_null() {
ret.set(v8::undefined(scope).into());
return;
}
let url = args.get(0).to_rust_string_lossy(scope);
let mut method = "GET".to_string();
let mut headers = HashMap::new();
let mut body = None;
if args.length() >= 2 && args.get(1).is_object() {
if let Some(init) = args.get(1).to_object(scope) {
if let Some(method_key) = v8::String::new(scope, "method") {
if let Some(m) = init.get(scope, method_key.into()) {
if !m.is_undefined() && !m.is_null() {
method = m.to_rust_string_lossy(scope).to_uppercase();
}
}
}
if let Some(headers_key) = v8::String::new(scope, "headers") {
if let Some(h) = init.get(scope, headers_key.into()) {
if h.is_object() && !h.is_null() {
if let Some(obj) = h.to_object(scope) {
if let Some(names) =
obj.get_own_property_names(scope, Default::default())
{
for i in 0..names.length() {
if let Some(key) = names.get_index(scope, i) {
if let Some(val) = obj.get(scope, key) {
headers.insert(
key.to_rust_string_lossy(scope),
val.to_rust_string_lossy(scope),
);
}
}
}
}
}
}
}
}
if let Some(body_key) = v8::String::new(scope, "body") {
if let Some(b) = init.get(scope, body_key.into()) {
if !b.is_undefined() && !b.is_null() {
body = Some(b.to_rust_string_lossy(scope));
}
}
}
}
}
let Some(resolver) = v8::PromiseResolver::new(scope) else {
ret.set(v8::undefined(scope).into());
return;
};
let promise = resolver.get_promise(scope);
let global_resolver = v8::Global::new(scope, resolver);
FETCH_QUEUE.with(|q| {
q.borrow_mut().push(FetchRequest {
resolver: global_resolver,
url,
method,
headers,
body,
});
});
ret.set(promise.into());
}
pub struct FetchResult {
pub status: u16,
pub status_text: String,
pub headers: HashMap<String, String>,
pub body: String,
pub url: String,
}
pub fn drain_fetch_queue() -> Vec<FetchRequest> {
FETCH_QUEUE.with(|q| q.borrow_mut().drain(..).collect())
}
pub fn execute_fetch_batch(requests: &[FetchRequest]) -> Vec<Result<FetchResult, String>> {
if requests.is_empty() {
return vec![];
}
TOKIO_RT.with(|rt| {
rt.block_on(async {
let futures: Vec<_> = requests
.iter()
.map(|req| {
let url = req.url.clone();
let method = req.method.clone();
let headers = req.headers.clone();
let body = req.body.clone();
async move { do_fetch(&url, &method, &headers, body.as_deref()).await }
})
.collect();
futures::future::join_all(futures).await
})
})
}
pub fn is_private_ip(ip: &IpAddr) -> bool {
match ip {
IpAddr::V4(v4) => {
v4.is_loopback() || v4.is_private() || v4.is_link_local() || v4.is_unspecified() || v4.is_broadcast() }
IpAddr::V6(v6) => {
v6.is_loopback() || v6.is_unspecified() || v6.to_ipv4_mapped().is_some_and(|v4| {
v4.is_loopback() || v4.is_private() || v4.is_link_local() || v4.is_unspecified()
})
}
}
}
pub async fn validate_url_not_private(url: &str) -> Result<(), String> {
if let Ok(allowed) = std::env::var("REX_INTERNAL_ORIGIN") {
if url.starts_with(&allowed) {
return Ok(());
}
}
let parsed = reqwest::Url::parse(url).map_err(|e| format!("Invalid URL: {e}"))?;
let host = parsed.host_str().ok_or("URL has no host")?;
if let Ok(ip) = host.parse::<IpAddr>() {
if is_private_ip(&ip) {
return Err(format!(
"fetch blocked: {host} resolves to a private address"
));
}
return Ok(());
}
let port = parsed.port_or_known_default().unwrap_or(80);
let addrs = tokio::net::lookup_host(format!("{host}:{port}"))
.await
.map_err(|e| format!("DNS resolution failed for {host}: {e}"))?;
for addr in addrs {
if is_private_ip(&addr.ip()) {
return Err(format!(
"fetch blocked: {host} resolves to private address {}",
addr.ip()
));
}
}
Ok(())
}
async fn do_fetch(
url: &str,
method: &str,
headers: &HashMap<String, String>,
body: Option<&str>,
) -> Result<FetchResult, String> {
validate_url_not_private(url).await?;
let method_parsed = method
.parse::<reqwest::Method>()
.map_err(|e| format!("Invalid method: {e}"))?;
let client = HTTP_CLIENT.with(|c| c.clone());
let mut builder = client.request(method_parsed, url);
for (k, v) in headers {
builder = builder.header(k.as_str(), v.as_str());
}
if let Some(b) = body {
builder = builder.body(b.to_string());
}
let resp = builder
.send()
.await
.map_err(|e| format!("fetch error: {e}"))?;
let status = resp.status().as_u16();
let status_text = resp.status().canonical_reason().unwrap_or("").to_string();
let url = resp.url().to_string();
let resp_headers: HashMap<String, String> = resp
.headers()
.iter()
.map(|(k, v)| {
(
k.as_str().to_lowercase(),
v.to_str().unwrap_or("").to_string(),
)
})
.collect();
let body_text = resp
.text()
.await
.map_err(|e| format!("fetch body error: {e}"))?;
Ok(FetchResult {
status,
status_text,
headers: resp_headers,
body: body_text,
url,
})
}
macro_rules! resolve_fetch_promise {
($scope:expr, $resolver:expr, $result:expr) => {{
let response = v8::Object::new($scope);
set_v8_prop!(
$scope,
response,
"status",
v8::Integer::new($scope, $result.status as i32).into()
);
if let Some(v) = v8::String::new($scope, &$result.status_text) {
set_v8_prop!($scope, response, "statusText", v.into());
}
set_v8_prop!(
$scope,
response,
"ok",
v8::Boolean::new($scope, (200..300).contains(&$result.status)).into()
);
if let Some(v) = v8::String::new($scope, &$result.url) {
set_v8_prop!($scope, response, "url", v.into());
}
let headers_obj = v8::Object::new($scope);
for (hk, hv) in &$result.headers {
if let (Some(k), Some(v)) = (v8::String::new($scope, hk), v8::String::new($scope, hv)) {
headers_obj.set($scope, k.into(), v.into());
}
}
let get_template = v8::FunctionTemplate::new($scope, headers_get_callback);
if let Some(get_fn) = get_template.get_function($scope) {
set_v8_prop!($scope, headers_obj, "get", get_fn.into());
}
set_v8_prop!($scope, response, "headers", headers_obj.into());
if let Some(body_str) = v8::String::new($scope, &$result.body) {
set_v8_prop!($scope, response, "_body", body_str.into());
}
let json_template = v8::FunctionTemplate::new($scope, response_json_callback);
if let Some(json_fn) = json_template.get_function($scope) {
set_v8_prop!($scope, response, "json", json_fn.into());
}
let text_template = v8::FunctionTemplate::new($scope, response_text_callback);
if let Some(text_fn) = text_template.get_function($scope) {
set_v8_prop!($scope, response, "text", text_fn.into());
}
$resolver.resolve($scope, response.into());
}};
}
macro_rules! reject_fetch_promise {
($scope:expr, $resolver:expr, $error_msg:expr) => {
if let Some(msg) = v8::String::new($scope, $error_msg) {
let err = v8::Exception::error($scope, msg);
$resolver.reject($scope, err);
}
};
}
fn headers_get_callback(
scope: &mut v8::PinScope,
args: v8::FunctionCallbackArguments,
mut ret: v8::ReturnValue,
) {
if args.length() < 1 {
ret.set(v8::undefined(scope).into());
return;
}
let name = args.get(0).to_rust_string_lossy(scope).to_lowercase();
let this = args.this();
if let Some(key) = v8::String::new(scope, &name) {
if let Some(val) = this.get(scope, key.into()) {
if !val.is_undefined() && !val.is_function() {
ret.set(val);
return;
}
}
}
ret.set(v8::null(scope).into());
}
fn response_json_callback(
scope: &mut v8::PinScope,
args: v8::FunctionCallbackArguments,
mut ret: v8::ReturnValue,
) {
let this = args.this();
let Some(body_key) = v8::String::new(scope, "_body") else {
return;
};
let body = this
.get(scope, body_key.into())
.unwrap_or_else(|| v8::undefined(scope).into());
let Some(resolver) = v8::PromiseResolver::new(scope) else {
return;
};
let promise = resolver.get_promise(scope);
let json_str = body.to_rust_string_lossy(scope);
let result = v8::String::new(scope, &json_str).and_then(|s| v8::json::parse(scope, s));
match result {
Some(parsed) => {
resolver.resolve(scope, parsed);
}
None => {
let msg = v8::String::new(scope, "Failed to parse JSON response body")
.unwrap_or_else(|| v8::String::empty(scope));
let err = v8::Exception::syntax_error(scope, msg);
resolver.reject(scope, err);
}
}
ret.set(promise.into());
}
fn response_text_callback(
scope: &mut v8::PinScope,
args: v8::FunctionCallbackArguments,
mut ret: v8::ReturnValue,
) {
let this = args.this();
let Some(body_key) = v8::String::new(scope, "_body") else {
return;
};
let body = this
.get(scope, body_key.into())
.unwrap_or_else(|| v8::undefined(scope).into());
let Some(resolver) = v8::PromiseResolver::new(scope) else {
return;
};
let promise = resolver.get_promise(scope);
resolver.resolve(scope, body);
ret.set(promise.into());
}
const FETCH_LOOP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
pub fn drain_js_timers(isolate: &mut v8::OwnedIsolate, context: &v8::Global<v8::Context>) -> bool {
v8::scope_with_context!(scope, isolate, context);
let global = context.open(scope).global(scope);
let Some(key) = v8::String::new(scope, "__rex_drain_timers") else {
return false;
};
let Some(func_val) = global.get(scope, key.into()) else {
return false;
};
let Ok(func) = v8::Local::<v8::Function>::try_from(func_val) else {
return false;
};
let recv = v8::undefined(scope);
match func.call(scope, recv.into(), &[]) {
Some(result) => result.boolean_value(scope),
None => false,
}
}
pub fn run_fetch_loop(isolate: &mut v8::OwnedIsolate, context: &v8::Global<v8::Context>) {
let deadline = std::time::Instant::now() + FETCH_LOOP_TIMEOUT;
loop {
if std::time::Instant::now() > deadline {
tracing::error!(
"IO loop timed out after {}s — possible infinite fetch/IO chain",
FETCH_LOOP_TIMEOUT.as_secs()
);
let remaining = drain_fetch_queue();
if !remaining.is_empty() {
v8::scope_with_context!(scope, isolate, context);
for req in remaining {
let resolver = v8::Local::new(scope, &req.resolver);
if let Some(msg) = v8::String::new(scope, "IO loop timed out") {
let err = v8::Exception::error(scope, msg);
resolver.reject(scope, err);
}
}
}
break;
}
isolate.perform_microtask_checkpoint();
let timer_progress = drain_js_timers(isolate, context);
if timer_progress {
isolate.perform_microtask_checkpoint();
}
let pending_fetch = drain_fetch_queue();
let has_tcp = crate::tcp::has_active_tcp_sockets();
let had_fetch = !pending_fetch.is_empty();
if !had_fetch && !has_tcp && !timer_progress {
break;
}
let mut made_progress = timer_progress;
if had_fetch {
made_progress = true;
let results = execute_fetch_batch(&pending_fetch);
v8::scope_with_context!(scope, isolate, context);
for (req, result) in pending_fetch.into_iter().zip(results) {
let resolver = v8::Local::new(scope, &req.resolver);
match result {
Ok(ref resp) => {
resolve_fetch_promise!(scope, resolver, resp);
}
Err(ref e) => {
reject_fetch_promise!(scope, resolver, e);
}
}
}
}
if has_tcp {
let tcp_progress = crate::tcp::poll_tcp_sockets(isolate, context);
if tcp_progress {
made_progress = true;
isolate.perform_microtask_checkpoint();
}
}
if made_progress {
continue;
}
std::thread::sleep(std::time::Duration::from_millis(1));
isolate.perform_microtask_checkpoint();
if drain_js_timers(isolate, context) {
continue;
}
let new_fetch = FETCH_QUEUE.with(|q| !q.borrow().is_empty());
if new_fetch {
continue;
}
if has_tcp {
let tcp_progress = crate::tcp::poll_tcp_sockets(isolate, context);
if tcp_progress {
isolate.perform_microtask_checkpoint();
continue;
}
}
break;
}
}