pipa-js 0.1.3

A fast, minimal ES2023 JavaScript runtime built in Rust.
Documentation
use std::io::{Read, Write};

use crate::host::HostFunction;
use crate::http::conn::Connection;
use crate::http::url::Url;
use crate::object::function::JSFunction;
use crate::runtime::context::JSContext;
use crate::util::iomux::Poller;
use crate::value::JSValue;

/// Installs the EventSource host functions on `ctx`.
///
/// Three builtins are registered: the `EventSource` constructor, plus
/// `es_read` and `es_close`, whose names match the builtin functions the
/// constructor attaches to every object it returns as `read` and `close`.
pub fn register_eventsource(ctx: &mut JSContext) {
    let constructor = HostFunction::new("EventSource", 1, es_ctor);
    ctx.register_builtin("EventSource", constructor);

    let reader = HostFunction::new("read", 1, es_read);
    ctx.register_builtin("es_read", reader);

    let closer = HostFunction::new("close", 1, es_close);
    ctx.register_builtin("es_close", closer);
}

/// Builds a one-argument builtin function value named `name`.
///
/// The function is marked as a builtin under `name`, moved to the heap, and
/// its address is handed to the runtime's GC heap via `track_function`
/// before being wrapped in a `JSValue`.
fn create_builtin_function(ctx: &mut JSContext, name: &str) -> JSValue {
    let atom = ctx.intern(name);
    let mut builtin = JSFunction::new_builtin(atom, 1);
    builtin.set_builtin_marker(ctx, name);

    // The box is deliberately released to a raw address; from here on the
    // function is referred to only through that address.
    let addr = Box::into_raw(Box::new(builtin)) as usize;
    ctx.runtime_mut().gc_heap_mut().track_function(addr);

    JSValue::new_function(addr)
}

/// Host implementation of the `EventSource(url)` constructor.
///
/// Parses `args[0]` as a URL, opens a connection (TLS when the URL asks for
/// it), sends a `GET` request with `Accept: text/event-stream`, and reads
/// the response head. On success it returns a new object with:
///
/// * `url` – the URL string that was passed in,
/// * `readyState` – set to `1`,
/// * `onopen`, `onmessage`, `onerror` – initially `undefined`,
/// * `read`, `close` – builtin functions named `es_read` / `es_close`,
/// * `__conn__` – the index under which the connection was stored in the
///   runtime.
///
/// Every failure (missing or non-string URL, parse error, connect or write
/// failure, header read error, unexpected status) is reported on stderr and
/// yields `undefined`.
///
/// The response is accepted only when the status code on the status line is
/// 200 or 201. Searching the whole head for those digits would also match
/// header values such as `Content-Length: 2001` on an error response.
fn es_ctor(ctx: &mut JSContext, args: &[JSValue]) -> JSValue {
    if args.is_empty() || !args[0].is_string() {
        eprintln!("EventSource: url required");
        return JSValue::undefined();
    }

    let url_str = ctx.get_atom_str(args[0].get_atom()).to_string();
    let url = match Url::parse(&url_str) {
        Ok(u) => u,
        Err(e) => {
            eprintln!("EventSource: invalid URL: {e}");
            return JSValue::undefined();
        }
    };

    let use_tls = url.is_tls();
    let rx = match Connection::connect_async(url.host.clone(), url.port, use_tls) {
        Ok(r) => r,
        Err(e) => {
            eprintln!("EventSource: connect failed: {e}");
            return JSValue::undefined();
        }
    };

    // The connect is started asynchronously but the constructor waits for
    // its outcome right here.
    let mut conn = match rx.recv() {
        Ok(Ok(c)) => c,
        _ => {
            eprintln!("EventSource: connection failed");
            return JSValue::undefined();
        }
    };

    let path = url.request_target();
    let host_header = format!("{}:{}", url.host, url.port);
    let request = format!(
        "GET {path} HTTP/1.1\r\n\
         Host: {host}\r\n\
         Accept: text/event-stream\r\n\
         Cache-Control: no-cache\r\n\
         Connection: keep-alive\r\n\r\n",
        host = host_header
    );

    // The handshake is done in blocking mode; failures to switch modes are
    // ignored on a best-effort basis.
    conn.set_nonblocking(false).ok();
    if conn.write_all(request.as_bytes()).is_err() {
        eprintln!("EventSource: write failed");
        return JSValue::undefined();
    }

    // Each header read is bounded by a five second timeout.
    conn.set_read_timeout(Some(std::time::Duration::from_secs(5)))
        .ok();
    let mut resp_buf = Vec::new();
    loop {
        let mut tmp = [0u8; 4096];
        match conn.read(&mut tmp) {
            Ok(0) => break,
            Ok(n) => {
                resp_buf.extend_from_slice(&tmp[..n]);
                // Stop as soon as the blank line ending the head is seen.
                if resp_buf.windows(4).any(|w| w == b"\r\n\r\n") {
                    break;
                }
            }
            Err(e) => {
                eprintln!("EventSource: header read error: {e}");
                return JSValue::undefined();
            }
        }
    }
    // NOTE(review): any stream bytes that arrived in the same read as the end
    // of the head stay in `resp_buf` and are not handed to later reads.

    let resp_str = String::from_utf8_lossy(&resp_buf);
    let status = es_status_code(&resp_str);
    if !matches!(status, Some(200) | Some(201)) {
        eprintln!(
            "EventSource: bad status: {}",
            &resp_str[..resp_str.find('\r').unwrap_or(resp_str.len())]
        );
        return JSValue::undefined();
    }

    let mut es_obj = crate::object::object::JSObject::new();
    es_obj.set(ctx.intern("url"), JSValue::new_string(ctx.intern(&url_str)));
    es_obj.set(ctx.intern("readyState"), JSValue::new_int(1));
    es_obj.set(ctx.intern("onopen"), JSValue::undefined());
    es_obj.set(ctx.intern("onmessage"), JSValue::undefined());
    es_obj.set(ctx.intern("onerror"), JSValue::undefined());

    let read_fn = create_builtin_function(ctx, "es_read");
    es_obj.set(ctx.intern("read"), read_fn);

    let close_fn = create_builtin_function(ctx, "es_close");
    es_obj.set(ctx.intern("close"), close_fn);

    // The connection itself lives in the runtime; the object only keeps the
    // index needed to look it up again.
    let conn_idx = ctx.runtime_mut().add_connection(conn);
    es_obj.set(ctx.intern("__conn__"), JSValue::new_int(conn_idx as i64));

    let ptr = Box::into_raw(Box::new(es_obj)) as usize;
    JSValue::new_object(ptr)
}

/// Extracts the numeric status code from the first line of an HTTP response
/// head, e.g. `200` from `HTTP/1.1 200 OK`.
///
/// Returns `None` when the head is empty, the first line does not start with
/// an `HTTP/` version token, or the code is not a number.
fn es_status_code(head: &str) -> Option<u16> {
    let mut parts = head.lines().next()?.split_ascii_whitespace();
    if !parts.next()?.starts_with("HTTP/") {
        return None;
    }
    parts.next()?.parse().ok()
}

fn get_conn_idx(ctx: &mut JSContext, args: &[JSValue]) -> usize {
    if args.is_empty() || !args[0].is_object() {
        return usize::MAX;
    }
    let obj = args[0].as_object();
    let val = obj
        .get(ctx.intern("__conn__"))
        .unwrap_or(JSValue::undefined());
    if !val.is_int() {
        return usize::MAX;
    }
    val.get_int() as usize
}

/// One event parsed from a server-sent-events stream, or a marker that the
/// stream has ended.
#[derive(Debug, Clone)]
pub struct SseEvent {
    /// Payload built from the block's `data:` lines, joined with `\n`.
    pub data: String,
    /// Value of the block's `event:` line; empty when none was present.
    pub event_type: String,
    /// Value of the block's `id:` line; empty when none was present.
    pub last_event_id: String,
    /// `true` only for the marker produced when a read returns zero bytes;
    /// the text fields are empty in that case. Parsed events carry `false`.
    pub is_closed: bool,
}

/// Host implementation of `read` on an EventSource object.
///
/// `args[0]` is the EventSource object; `args[1]`, when it is an integer, is
/// forwarded to `read_parsed` as a timeout in milliseconds.
///
/// Returns:
/// * `undefined` when no event could be read,
/// * the string `"__CLOSED__"` when the stream has ended,
/// * otherwise a new object with `data` and, when non-empty, `event` and
///   `lastEventId`. If the EventSource object has a function in
///   `onmessage`, it is called with that object first.
fn es_read(ctx: &mut JSContext, args: &[JSValue]) -> JSValue {
    let incoming = match read_parsed(ctx, args) {
        None => return JSValue::undefined(),
        Some(evt) if evt.is_closed => {
            return JSValue::new_string(ctx.intern("__CLOSED__"));
        }
        Some(evt) => evt,
    };

    let mut payload = crate::object::object::JSObject::new();
    let data_key = ctx.intern("data");
    let data_val = JSValue::new_string(ctx.intern(&incoming.data));
    payload.set(data_key, data_val);

    // Optional fields are only materialised when the stream supplied them.
    let optional = [
        ("event", &incoming.event_type),
        ("lastEventId", &incoming.last_event_id),
    ];
    for &(name, text) in &optional {
        if text.is_empty() {
            continue;
        }
        let key = ctx.intern(name);
        let val = JSValue::new_string(ctx.intern(text));
        payload.set(key, val);
    }

    let addr = Box::into_raw(Box::new(payload)) as usize;
    let event_value = JSValue::new_object(addr);

    if !args[0].is_object() {
        return event_value;
    }

    let handler = args[0]
        .as_object()
        .get(ctx.intern("onmessage"))
        .unwrap_or(JSValue::undefined());
    if !handler.is_function() {
        return event_value;
    }

    if let Some(vm_addr) = ctx.get_register_vm_ptr() {
        // SAFETY: NOTE(review) - this relies on the address returned by
        // `get_register_vm_ptr` referring to a live `VM` that nothing else
        // is borrowing during the call; confirm against the code that
        // registers it.
        let vm = unsafe { &mut *(vm_addr as *mut crate::runtime::vm::VM) };
        // The handler's result (or failure) is intentionally ignored.
        let _ = vm.call_function(ctx, handler, &[event_value]);
    }

    event_value
}

/// Reads from the connection behind `args[0]` until one event is available.
///
/// `args[1]`, when it is an integer, is a timeout in milliseconds (negative
/// values count as zero). With a zero timeout the function drains whatever
/// the socket has ready and returns immediately; with a positive timeout it
/// waits on a poller for readability until the deadline.
///
/// Returns:
/// * `Some(event)` with `is_closed == false` for a parsed event,
/// * `Some(event)` with `is_closed == true` when a read returns zero bytes,
/// * `None` when the connection index cannot be resolved, the poller cannot
///   be set up, or no event arrived in time.
///
/// Complete blocks that carry no data (for example keep-alive comments) are
/// skipped, so an event queued behind them in the same read is still
/// returned.
///
/// NOTE(review): the receive buffer is local to this call, so bytes that
/// follow the returned event, or a partial event still pending at timeout,
/// are not carried over to the next call.
fn read_parsed(ctx: &mut JSContext, args: &[JSValue]) -> Option<SseEvent> {
    let idx = get_conn_idx(ctx, args);
    if idx == usize::MAX {
        return None;
    }

    let timeout_ms = if args.len() > 1 && args[1].is_int() {
        args[1].get_int().max(0)
    } else {
        0i64
    };

    let conn = ctx.runtime_mut().get_connection(idx)?;

    conn.set_nonblocking(true).ok();
    let fd = conn.raw_fd();

    // A poller is only needed when the caller is willing to wait.
    let mut poller = if timeout_ms > 0 {
        let mut p = Poller::new().ok()?;
        p.register(fd, true, false).ok()?;
        Some(p)
    } else {
        None
    };

    let start = std::time::Instant::now();
    let mut sse_buf = Vec::new();
    let mut buf = [0u8; 16384];

    loop {
        match conn.read(&mut buf) {
            Ok(0) => {
                return Some(SseEvent {
                    data: String::new(),
                    event_type: String::new(),
                    last_event_id: String::new(),
                    is_closed: true,
                });
            }
            Ok(n) => {
                sse_buf.extend_from_slice(&buf[..n]);
                if let Some(evt) = es_next_buffered_event(&mut sse_buf) {
                    return Some(evt);
                }
                if timeout_ms > 0 && start.elapsed().as_millis() as i64 >= timeout_ms {
                    return None;
                }
            }
            // NOTE(review): every read error is treated like "no data yet";
            // a hard socket error is therefore only reported as a timeout.
            Err(_) => {
                if let Some(evt) = es_next_buffered_event(&mut sse_buf) {
                    return Some(evt);
                }
                if timeout_ms <= 0 {
                    return None;
                }
                let elapsed = start.elapsed().as_millis() as i64;
                if elapsed >= timeout_ms {
                    return None;
                }
                if let Some(ref mut p) = poller {
                    // Clamp before narrowing so a very large timeout cannot
                    // wrap around to a negative or tiny wait.
                    let remaining = (timeout_ms - elapsed).clamp(1, i64::from(i32::MAX)) as i32;
                    let _ = p.wait(remaining);
                }
            }
        }
    }
}

/// Returns the next event with data from `buf`, consuming any complete
/// data-less blocks in front of it.
///
/// `parse_sse_event` yields `None` both when no complete block is buffered
/// and when it consumed a block without data; the two cases are told apart
/// by whether the buffer shrank.
fn es_next_buffered_event(buf: &mut Vec<u8>) -> Option<SseEvent> {
    loop {
        let before = buf.len();
        if let Some(evt) = parse_sse_event(buf) {
            return Some(evt);
        }
        if buf.len() == before {
            return None;
        }
    }
}

/// Host implementation of `close` on an EventSource object.
///
/// Releases the runtime connection referenced by the object in `args[0]`,
/// if one can be resolved, and always returns `undefined`.
fn es_close(ctx: &mut JSContext, args: &[JSValue]) -> JSValue {
    match get_conn_idx(ctx, args) {
        // No usable connection index on the receiver: nothing to release.
        usize::MAX => {}
        slot => {
            ctx.runtime_mut().release_connection(slot);
        }
    }
    JSValue::undefined()
}

/// Extracts the next server-sent event from the front of `buf`.
///
/// The buffer is scanned line by line. Lines end with `\n`, optionally
/// preceded by `\r`, so both LF and CRLF streams are understood. A blank
/// line ends a block. Within a block:
///
/// * `data:` lines are collected and joined with `\n`,
/// * `event:` sets the event type,
/// * `id:` sets the last event id,
/// * anything else (including `:` comment lines) is ignored.
///
/// One leading space after the colon is dropped from each value.
///
/// Blocks that end without any data are consumed and skipped, so an event
/// that follows a keep-alive comment in the same buffer is still returned.
///
/// Returns `Some(event)` (with `is_closed == false`) and removes everything
/// up to and including the blank line that ended it. Returns `None` when no
/// complete block with data is buffered; in that case only the skipped
/// data-less blocks are removed and any incomplete trailing block stays in
/// `buf` for the next call.
pub fn parse_sse_event(buf: &mut Vec<u8>) -> Option<SseEvent> {
    let mut data = String::new();
    let mut event_type = String::new();
    let mut last_event_id = String::new();

    let mut cursor = 0; // start of the line currently being examined
    let mut consumed = 0; // end of the last fully processed block
    let mut found = false;

    while let Some(offset) = buf[cursor..].iter().position(|&b| b == b'\n') {
        let raw = &buf[cursor..cursor + offset];
        let line = raw.strip_suffix(b"\r").unwrap_or(raw);
        cursor += offset + 1;

        if line.is_empty() {
            // Blank line: the current block is complete.
            consumed = cursor;
            if !data.is_empty() {
                found = true;
                break;
            }
            // Nothing to dispatch; forget this block's fields and move on.
            event_type.clear();
            last_event_id.clear();
            continue;
        }

        if let Some(rest) = line.strip_prefix(b"data:") {
            let val = rest.strip_prefix(b" ").unwrap_or(rest);
            if !data.is_empty() {
                data.push('\n');
            }
            data.push_str(&String::from_utf8_lossy(val));
        } else if let Some(rest) = line.strip_prefix(b"event:") {
            let val = rest.strip_prefix(b" ").unwrap_or(rest);
            event_type = String::from_utf8_lossy(val).into_owned();
        } else if let Some(rest) = line.strip_prefix(b"id:") {
            let val = rest.strip_prefix(b" ").unwrap_or(rest);
            last_event_id = String::from_utf8_lossy(val).into_owned();
        }
    }

    buf.drain(..consumed);

    if found {
        Some(SseEvent {
            data,
            event_type,
            last_event_id,
            is_closed: false,
        })
    } else {
        None
    }
}