coil-runtime 0.1.1

HTTP runtime and request handling for the Coil framework.
Documentation
use std::collections::BTreeMap;
use std::ffi::OsString;
use std::io::{Read, Write};
use std::net::{Shutdown, TcpListener, TcpStream};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::thread;
use std::time::Duration;

use url::Url;

use super::super::super::super::*;
use super::RuntimeOutboundHttpBackend;

struct EnvVarGuard {
    key: &'static str,
    previous: Option<OsString>,
}

impl Drop for EnvVarGuard {
    fn drop(&mut self) {
        unsafe {
            match self.previous.take() {
                Some(value) => std::env::set_var(self.key, value),
                None => std::env::remove_var(self.key),
            }
        }
    }
}

fn execution_context() -> InvocationContext {
    InvocationContext::new(
        CustomerAppContext::new("showcase-events")
            .unwrap()
            .with_tenant_id("101")
            .unwrap()
            .with_locale("en-GB")
            .unwrap(),
        PrincipalRef::user("alice").unwrap(),
        TraceContext::new("trace-network").unwrap(),
        InvocationInput::Api(
            ApiInvocation::new("/network", coil_wasm::HttpMethod::Get).unwrap(),
        ),
    )
}

fn spawn_http_server(
    body: &'static str,
    delay: Option<Duration>,
) -> (String, thread::JoinHandle<()>) {
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let endpoint = format!("http://{}", listener.local_addr().unwrap());
    let handle = thread::spawn(move || {
        let (mut stream, _) = listener.accept().unwrap();
        let mut request = [0u8; 1024];
        let _ = stream.read(&mut request);
        if let Some(delay) = delay {
            std::thread::sleep(delay);
        }
        let response = format!(
            "HTTP/1.1 200 OK\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
            body.len(),
            body
        );
        let _ = stream.write_all(response.as_bytes());
    });

    (endpoint, handle)
}

fn read_full_http_request(stream: &mut TcpStream) {
    let mut request = Vec::new();
    let mut buffer = [0u8; 1024];
    let mut content_length = None;
    let mut header_end = None;

    loop {
        let bytes_read = match stream.read(&mut buffer) {
            Ok(0) => break,
            Ok(bytes_read) => bytes_read,
            Err(_) => break,
        };
        request.extend_from_slice(&buffer[..bytes_read]);

        if header_end.is_none() {
            if let Some(end) = request.windows(4).position(|window| window == b"\r\n\r\n") {
                let end = end + 4;
                header_end = Some(end);
                let headers = String::from_utf8_lossy(&request[..end]);
                content_length = headers.lines().find_map(|line| {
                    let (name, value) = line.split_once(':')?;
                    if name.eq_ignore_ascii_case("content-length") {
                        value.trim().parse::<usize>().ok()
                    } else {
                        None
                    }
                });
            }
        }

        if let Some(end) = header_end {
            let body_len = request.len().saturating_sub(end);
            if body_len >= content_length.unwrap_or(0) {
                break;
            }
        }
    }
}

#[test]
fn runtime_outbound_http_backend_requires_explicit_mappings() {
    let backend = RuntimeOutboundHttpBackend::with_settings(
        true,
        BTreeMap::new(),
        Duration::from_millis(100),
        1024,
    );

    let error = backend
        .submit_outbound_http_to_blocking_pool("http://127.0.0.1:8080", 64, &execution_context())
        .unwrap_err();

    assert!(error.contains("not mapped"), "unexpected error: {error}");
}

#[test]
fn runtime_outbound_http_backend_ignores_environment_fallbacks() {
    let env_key = "COIL_WASM_HTTP_NO_FALLBACK";
    let previous = std::env::var_os(env_key);
    unsafe {
        std::env::set_var(env_key, "https://env.example.com/api");
    }
    let _guard = EnvVarGuard {
        key: env_key,
        previous,
    };

    let backend = RuntimeOutboundHttpBackend::with_settings(
        true,
        BTreeMap::new(),
        Duration::from_millis(100),
        1024,
    );

    let error = backend
        .submit_outbound_http_to_blocking_pool("no-fallback", 64, &execution_context())
        .unwrap_err();

    assert!(error.contains("not mapped"), "unexpected error: {error}");
}

#[test]
fn runtime_outbound_http_backend_uses_explicit_endpoint_mappings() {
    let (endpoint, server) = spawn_http_server("mapped-response", None);
    let mapped_endpoint = Url::parse(&endpoint).unwrap().to_string();
    let mut targets = BTreeMap::new();
    targets.insert("crm".to_string(), Url::parse(&mapped_endpoint).unwrap());

    let backend =
        RuntimeOutboundHttpBackend::with_settings(true, targets, Duration::from_secs(1), 1024);
    let execution = backend
        .submit_outbound_http_to_blocking_pool("crm", 64, &execution_context())
        .expect("mapped endpoint should succeed");

    assert_eq!(execution.integration, "crm");
    assert_eq!(
        execution.endpoint,
        Url::parse(&endpoint).unwrap().to_string()
    );
    assert_eq!(execution.status, 200);
    assert_eq!(execution.response_bytes, "mapped-response".len() as u64);

    server.join().unwrap();
}

#[test]
fn runtime_outbound_http_backend_serves_linked_customer_hook_requests_through_the_same_mapping() {
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let endpoint = format!("http://{}", listener.local_addr().unwrap());
    let handle = thread::spawn(move || {
        let (mut stream, _) = listener.accept().unwrap();
        read_full_http_request(&mut stream);
        let response = concat!(
            "HTTP/1.1 200 OK\r\n",
            "Content-Type: application/json\r\n",
            "X-Trace: linked-hook\r\n",
            "Content-Length: 15\r\n",
            "Connection: close\r\n\r\n",
            "{\"status\":\"ok\"}"
        );
        let _ = stream.write_all(response.as_bytes());
        let _ = stream.flush();
        let _ = stream.shutdown(Shutdown::Both);
    });
    let mapped_endpoint = Url::parse(&endpoint).unwrap().to_string();
    let mut targets = BTreeMap::new();
    targets.insert("crm".to_string(), Url::parse(&mapped_endpoint).unwrap());

    let backend =
        RuntimeOutboundHttpBackend::with_settings(true, targets, Duration::from_secs(1), 1024);
    let response = backend
        .send(&coil_customer_sdk::OutboundHttpRequest {
            integration: "crm".to_string(),
            method: "POST".to_string(),
            url: mapped_endpoint,
            headers: BTreeMap::from([("content-type".to_string(), "application/json".to_string())]),
            body: br#"{"ping":true}"#.to_vec(),
        })
        .expect("mapped linked-hook request should succeed");
    let normalized_headers = response
        .headers
        .iter()
        .map(|(name, value)| (name.to_ascii_lowercase(), value.clone()))
        .collect::<BTreeMap<_, _>>();

    assert_eq!(response.status, 200);
    assert_eq!(
        normalized_headers.get("content-type").map(String::as_str),
        Some("application/json")
    );
    assert_eq!(
        normalized_headers.get("x-trace").map(String::as_str),
        Some("linked-hook")
    );
    assert_eq!(response.body, br#"{"status":"ok"}"#);

    handle.join().unwrap();
}

#[test]
fn runtime_outbound_http_backend_rejects_url_drift_for_linked_customer_hooks() {
    let mut targets = BTreeMap::new();
    targets.insert(
        "crm".to_string(),
        Url::parse("https://crm.example.com/api").unwrap(),
    );

    let backend =
        RuntimeOutboundHttpBackend::with_settings(true, targets, Duration::from_secs(1), 1024);
    let error = backend
        .send(&coil_customer_sdk::OutboundHttpRequest {
            integration: "crm".to_string(),
            method: "GET".to_string(),
            url: "https://crm.example.com/api/customers".to_string(),
            headers: BTreeMap::new(),
            body: Vec::new(),
        })
        .unwrap_err();

    assert!(
        error.contains("approved endpoint"),
        "unexpected error: {error}"
    );
}

#[test]
fn runtime_outbound_http_backend_enforces_response_size_limits() {
    let (endpoint, server) = spawn_http_server("too-many-bytes", None);
    let mut targets = BTreeMap::new();
    targets.insert("search".to_string(), Url::parse(&endpoint).unwrap());

    let backend =
        RuntimeOutboundHttpBackend::with_settings(true, targets, Duration::from_secs(1), 1024);
    let error = backend
        .submit_outbound_http_to_blocking_pool("search", 4, &execution_context())
        .unwrap_err();

    assert!(
        error.contains("exceeded the configured limit"),
        "unexpected error: {error}"
    );

    server.join().unwrap();
}

#[test]
fn runtime_outbound_http_backend_times_out_slow_endpoints() {
    let (endpoint, server) = spawn_http_server("slow-response", Some(Duration::from_millis(200)));
    let mut targets = BTreeMap::new();
    targets.insert("billing".to_string(), Url::parse(&endpoint).unwrap());

    let backend =
        RuntimeOutboundHttpBackend::with_settings(true, targets, Duration::from_millis(25), 1024);
    let error = backend
        .submit_outbound_http_to_blocking_pool("billing", 64, &execution_context())
        .unwrap_err();

    assert!(error.contains("timed out"), "unexpected error: {error}");

    server.join().unwrap();
}

#[test]
fn runtime_outbound_http_backend_yields_worker_threads_while_waiting() {
    let (endpoint, server) = spawn_http_server("slow-response", Some(Duration::from_millis(150)));
    let mut targets = BTreeMap::new();
    targets.insert("billing".to_string(), Url::parse(&endpoint).unwrap());

    let backend =
        RuntimeOutboundHttpBackend::with_settings(true, targets, Duration::from_millis(500), 1024);
    let progress = Arc::new(AtomicUsize::new(0));
    let armed = Arc::new(AtomicBool::new(false));
    let progress_probe = Arc::clone(&progress);
    let armed_probe = Arc::clone(&armed);
    let arming_thread = thread::spawn({
        let armed = Arc::clone(&armed);
        move || {
            std::thread::sleep(Duration::from_millis(20));
            armed.store(true, Ordering::SeqCst);
        }
    });
    let worker = thread::spawn(move || {
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(1)
            .enable_all()
            .build()
            .unwrap();

        runtime.block_on(async move {
            let probe = tokio::spawn(async move {
                while !armed_probe.load(Ordering::SeqCst) {
                    tokio::task::yield_now().await;
                }
                progress_probe.fetch_add(1, Ordering::SeqCst);
            });

            let result =
                backend.submit_outbound_http_to_blocking_pool("billing", 64, &execution_context());
            assert!(result.is_ok(), "unexpected error: {result:?}");
            probe.await.unwrap();
        });
    });

    std::thread::sleep(Duration::from_millis(50));
    assert!(
        progress.load(Ordering::SeqCst) > 0,
        "worker thread was pinned while outbound HTTP was in flight"
    );

    worker.join().unwrap();
    arming_thread.join().unwrap();
    server.join().unwrap();
}