Skip to main content

pipa/runtime/
io_reactor.rs

1use std::collections::{HashMap, VecDeque};
2use std::io::{ErrorKind, Read, Write};
3use std::os::unix::io::RawFd;
4
5use crate::http::conn::Connection;
6use crate::http::connect_state::{ConnEvent, ConnectState};
7use crate::http::headers::Headers;
8use crate::http::method::HttpMethod;
9use crate::http::request::{HttpRequest, RequestEvent, RequestState};
10use crate::http::response::HttpResponse;
11use crate::http::status::HttpStatus;
12use crate::http::url::Url;
13use crate::http::ws::frame::{OpCode, WsFrame};
14use crate::http::ws::handshake::WsHandshake;
15use crate::util::iomux::Poller;
16
17use super::context::JSContext;
18use super::extension::MacroTaskExtension;
19
20#[derive(Debug, Clone, Copy, PartialEq, Eq)]
21pub enum PollEvent {
22    Readable,
23    Writable,
24    Error,
25}
26
27pub enum ReactorTask {
28    Fetch(FetchTask),
29    Ws(WsTask),
30    Sse(SseTask),
31}
32
33pub struct FetchTask {
34    phase: FetchPhase,
35    promise_ptr: usize,
36    url: Url,
37    method: HttpMethod,
38    req_headers: Headers,
39    body: Option<Vec<u8>>,
40}
41
42pub enum FetchPhase {
43    Connecting(ConnectState),
44    Active(HttpRequest),
45}
46
47pub struct WsTask {
48    phase: WsPhase,
49    ws_obj_ptr: usize,
50    promise_ptr: usize,
51    url: Url,
52}
53
54pub enum WsPhase {
55    Connecting(ConnectState),
56    Handshake {
57        conn: Connection,
58        key: String,
59        write_buf: Vec<u8>,
60        write_pos: usize,
61        read_data: Vec<u8>,
62    },
63    Open {
64        conn: Connection,
65        read_buf: [u8; 8192],
66        read_data: Vec<u8>,
67        pending_out: VecDeque<WsFrame>,
68        closing: bool,
69        close_code: u16,
70        close_reason: String,
71    },
72}
73
74pub struct SseTask {
75    phase: SsePhase,
76    es_obj_ptr: usize,
77    promise_ptr: usize,
78    url: Url,
79}
80
81pub enum SsePhase {
82    Connecting(ConnectState),
83    WritingRequest {
84        conn: Connection,
85        write_buf: Vec<u8>,
86        write_pos: usize,
87    },
88    ReadingHeaders {
89        conn: Connection,
90        read_buf: [u8; 8192],
91        read_data: Vec<u8>,
92    },
93    Streaming {
94        conn: Connection,
95        read_buf: [u8; 8192],
96        read_data: Vec<u8>,
97        sse_buf: Vec<u8>,
98    },
99}
100
101impl ReactorTask {
102    pub fn fd(&self) -> Option<RawFd> {
103        match self {
104            ReactorTask::Fetch(t) => match &t.phase {
105                FetchPhase::Connecting(cs) => cs.fd(),
106                FetchPhase::Active(req) => req.fd(),
107            },
108            ReactorTask::Ws(t) => match &t.phase {
109                WsPhase::Connecting(cs) => cs.fd(),
110                WsPhase::Handshake { conn, .. } => Some(conn.raw_fd()),
111                WsPhase::Open { conn, .. } => Some(conn.raw_fd()),
112            },
113            ReactorTask::Sse(t) => match &t.phase {
114                SsePhase::Connecting(cs) => cs.fd(),
115                SsePhase::WritingRequest { conn, .. } => Some(conn.raw_fd()),
116                SsePhase::ReadingHeaders { conn, .. } => Some(conn.raw_fd()),
117                SsePhase::Streaming { conn, .. } => Some(conn.raw_fd()),
118            },
119        }
120    }
121
122    pub fn wants_read(&self) -> bool {
123        match self {
124            ReactorTask::Fetch(t) => match &t.phase {
125                FetchPhase::Connecting(cs) => cs.wants_read(),
126                FetchPhase::Active(req) => req.wants_read(),
127            },
128            ReactorTask::Ws(t) => match &t.phase {
129                WsPhase::Connecting(cs) => cs.wants_read(),
130                WsPhase::Handshake { .. } => true,
131                WsPhase::Open { .. } => true,
132            },
133            ReactorTask::Sse(t) => match &t.phase {
134                SsePhase::Connecting(cs) => cs.wants_read(),
135                SsePhase::WritingRequest { .. } => false,
136                SsePhase::ReadingHeaders { .. } => true,
137                SsePhase::Streaming { .. } => true,
138            },
139        }
140    }
141
142    pub fn wants_write(&self) -> bool {
143        match self {
144            ReactorTask::Fetch(t) => match &t.phase {
145                FetchPhase::Connecting(cs) => cs.wants_write(),
146                FetchPhase::Active(req) => req.wants_write(),
147            },
148            ReactorTask::Ws(t) => match &t.phase {
149                WsPhase::Connecting(cs) => cs.wants_write(),
150                WsPhase::Handshake {
151                    write_buf,
152                    write_pos,
153                    ..
154                } => *write_pos < write_buf.len(),
155                WsPhase::Open { pending_out, .. } => !pending_out.is_empty(),
156            },
157            ReactorTask::Sse(t) => match &t.phase {
158                SsePhase::Connecting(cs) => cs.wants_write(),
159                SsePhase::WritingRequest {
160                    write_buf,
161                    write_pos,
162                    ..
163                } => *write_pos < write_buf.len(),
164                SsePhase::ReadingHeaders { .. } => false,
165                SsePhase::Streaming { .. } => false,
166            },
167        }
168    }
169
170    fn task_type(&self) -> &str {
171        match self {
172            ReactorTask::Fetch(_) => "fetch",
173            ReactorTask::Ws(_) => "ws",
174            ReactorTask::Sse(_) => "sse",
175        }
176    }
177}
178
179impl FetchTask {
180    pub fn new(
181        url: Url,
182        method: HttpMethod,
183        req_headers: Headers,
184        body: Option<Vec<u8>>,
185        promise_ptr: usize,
186    ) -> Result<Self, String> {
187        let use_tls = url.is_tls();
188        let cs = ConnectState::new(&url.host, url.port, use_tls, Vec::new())?;
189        Ok(FetchTask {
190            phase: FetchPhase::Connecting(cs),
191            promise_ptr,
192            url,
193            method,
194            req_headers,
195            body,
196        })
197    }
198}
199
200impl WsTask {
201    pub fn new(url: Url, ws_obj_ptr: usize, promise_ptr: usize) -> Result<Self, String> {
202        let use_tls = url.is_tls();
203        let cs = ConnectState::new(&url.host, url.port, use_tls, Vec::new())?;
204        Ok(WsTask {
205            phase: WsPhase::Connecting(cs),
206            ws_obj_ptr,
207            promise_ptr,
208            url,
209        })
210    }
211}
212
213impl SseTask {
214    pub fn new(url: Url, es_obj_ptr: usize, promise_ptr: usize) -> Result<Self, String> {
215        let use_tls = url.is_tls();
216        let cs = ConnectState::new(&url.host, url.port, use_tls, Vec::new())?;
217        Ok(SseTask {
218            phase: SsePhase::Connecting(cs),
219            es_obj_ptr,
220            promise_ptr,
221            url,
222        })
223    }
224}
225
226pub struct IoReactor {
227    poller: Poller,
228    tasks: HashMap<u64, ReactorTask>,
229    next_id: u64,
230}
231
232impl IoReactor {
233    pub fn new() -> Result<Self, String> {
234        let poller = Poller::new()?;
235        Ok(IoReactor {
236            poller,
237            tasks: HashMap::new(),
238            next_id: 1,
239        })
240    }
241
242    pub fn get_from_ctx(ctx: &mut JSContext) -> Option<&mut IoReactor> {
243        let el = ctx.event_loop_mut();
244        for ext in &mut el.extensions {
245            let any = ext.as_any_mut();
246            if any.is::<IoReactor>() {
247                return any.downcast_mut::<IoReactor>();
248            }
249        }
250        None
251    }
252
253    pub fn register(&mut self, task: ReactorTask) -> Result<u64, String> {
254        let id = self.next_id;
255        self.next_id += 1;
256
257        if let Some(fd) = task.fd() {
258            self.poller
259                .register(fd, task.wants_read(), task.wants_write())?;
260        }
261
262        self.tasks.insert(id, task);
263        Ok(id)
264    }
265
266    fn unregister(&mut self, id: u64) -> Option<ReactorTask> {
267        let task = self.tasks.remove(&id);
268        if let Some(ref t) = task {
269            if let Some(fd) = t.fd() {
270                let _ = self.poller.unregister(fd);
271            }
272        }
273        task
274    }
275
276    fn reregister(&mut self, id: u64, task: ReactorTask) -> Result<(), String> {
277        let old = self.tasks.remove(&id);
278        let old_fd = old.as_ref().and_then(|t| t.fd());
279        let new_fd = task.fd();
280
281        if let Some(fd) = new_fd {
282            let _ = self.poller.unregister(fd);
283            self.poller
284                .register(fd, task.wants_read(), task.wants_write())?;
285        } else if let Some(ofd) = old_fd {
286            let _ = self.poller.unregister(ofd);
287        }
288
289        self.tasks.insert(id, task);
290        Ok(())
291    }
292
293    fn poll(&mut self, timeout_ms: i32) -> Result<Vec<(u64, String, PollEvent)>, String> {
294        if self.tasks.is_empty() {
295            return Ok(Vec::new());
296        }
297
298        let events = self.poller.wait(timeout_ms)?;
299        let mut results = Vec::new();
300
301        for event in &events {
302            let ids_to_check: Vec<u64> = self
303                .tasks
304                .iter()
305                .filter(|(_, t)| t.fd() == Some(event.fd))
306                .map(|(id, _)| *id)
307                .collect();
308
309            for id in ids_to_check {
310                let task_type = self.tasks.get(&id).map(|t| t.task_type().to_string());
311                if event.error {
312                    results.push((id, task_type.unwrap_or_default(), PollEvent::Error));
313                } else if event.readable {
314                    results.push((id, task_type.unwrap_or_default(), PollEvent::Readable));
315                } else if event.writable {
316                    results.push((id, task_type.unwrap_or_default(), PollEvent::Writable));
317                }
318            }
319        }
320
321        Ok(results)
322    }
323
324    pub fn ws_send(&mut self, ws_obj_ptr: usize, data: Vec<u8>) {
325        let frame = WsFrame::new_text(data);
326        for (_, task) in &mut self.tasks {
327            if let ReactorTask::Ws(ws_task) = task {
328                if ws_task.ws_obj_ptr == ws_obj_ptr {
329                    if let WsPhase::Open { pending_out, .. } = &mut ws_task.phase {
330                        pending_out.push_back(frame);
331                    }
332                    break;
333                }
334            }
335        }
336    }
337
338    pub fn ws_close(&mut self, ws_obj_ptr: usize, code: u16, reason: &str) {
339        for (_, task) in &mut self.tasks {
340            if let ReactorTask::Ws(ws_task) = task {
341                if ws_task.ws_obj_ptr == ws_obj_ptr {
342                    if let WsPhase::Open {
343                        pending_out,
344                        closing,
345                        close_code,
346                        close_reason,
347                        ..
348                    } = &mut ws_task.phase
349                    {
350                        if !*closing {
351                            let frame = WsFrame::new_close(code, reason);
352                            pending_out.push_back(frame);
353                            *closing = true;
354                            *close_code = code;
355                            *close_reason = reason.to_string();
356                        }
357                    }
358                }
359            }
360        }
361    }
362
363    pub fn sse_close(&mut self, es_obj_ptr: usize) {
364        let ids_to_remove: Vec<u64> = self
365            .tasks
366            .iter()
367            .filter(|(_, t)| {
368                if let ReactorTask::Sse(sse_task) = t {
369                    sse_task.es_obj_ptr == es_obj_ptr
370                } else {
371                    false
372                }
373            })
374            .map(|(id, _)| *id)
375            .collect();
376        for id in ids_to_remove {
377            self.unregister(id);
378        }
379    }
380
381    fn advance_task(&mut self, ctx: &mut JSContext, id: u64) -> Result<(), String> {
382        let task_type = self
383            .tasks
384            .get(&id)
385            .map(|t| t.task_type().to_string())
386            .unwrap_or_default();
387
388        match task_type.as_str() {
389            "fetch" => self.advance_fetch(ctx, id),
390            "ws" => self.advance_ws(ctx, id),
391            "sse" => self.advance_sse(ctx, id),
392            _ => Ok(()),
393        }
394    }
395
396    fn advance_fetch(&mut self, ctx: &mut JSContext, id: u64) -> Result<(), String> {
397        let task = self.tasks.remove(&id);
398        let mut task = match task {
399            Some(t) => t,
400            None => return Ok(()),
401        };
402
403        if let ReactorTask::Fetch(fetch_task) = &mut task {
404            let phase = std::mem::replace(
405                &mut fetch_task.phase,
406                FetchPhase::Active(HttpRequest::dummy()),
407            );
408            match phase {
409                FetchPhase::Connecting(mut cs) => match cs.try_advance() {
410                    ConnEvent::Connected(conn) => {
411                        let conn = set_conn_nonblocking(conn)?;
412                        let mut req = HttpRequest::new(
413                            fetch_task.url.clone(),
414                            fetch_task.method,
415                            fetch_task.req_headers.clone(),
416                            fetch_task.body.clone(),
417                        );
418                        req.conn = Some(conn);
419                        req.state = RequestState::WritingRequest;
420                        req.build_request();
421                        fetch_task.phase = FetchPhase::Active(req);
422                        self.reregister(id, task)?;
423                        let _ = self.advance_task(ctx, id);
424                        return Ok(());
425                    }
426                    ConnEvent::Error(e) => {
427                        let val = crate::value::JSValue::new_string(ctx.intern(&e));
428                        crate::builtins::promise::reject_promise_with_value(
429                            ctx,
430                            fetch_task.promise_ptr,
431                            val,
432                        );
433                        return Ok(());
434                    }
435                    _ => {
436                        fetch_task.phase = FetchPhase::Connecting(cs);
437                        self.reregister(id, task)?;
438                        return Ok(());
439                    }
440                },
441                FetchPhase::Active(mut req) => match req.try_advance() {
442                    Ok(RequestEvent::Complete(resp)) => {
443                        let resp_val = build_response_js_object(ctx, resp, &fetch_task.url.full);
444                        crate::builtins::promise::fulfill_promise_with_value(
445                            ctx,
446                            fetch_task.promise_ptr,
447                            resp_val,
448                        );
449                    }
450                    Ok(RequestEvent::Error(e)) => {
451                        let val = crate::value::JSValue::new_string(ctx.intern(&e));
452                        crate::builtins::promise::reject_promise_with_value(
453                            ctx,
454                            fetch_task.promise_ptr,
455                            val,
456                        );
457                    }
458                    Ok(_) => {
459                        fetch_task.phase = FetchPhase::Active(req);
460                        self.reregister(id, task)?;
461                    }
462                    Err(e) => {
463                        let val = crate::value::JSValue::new_string(ctx.intern(&e));
464                        crate::builtins::promise::reject_promise_with_value(
465                            ctx,
466                            fetch_task.promise_ptr,
467                            val,
468                        );
469                    }
470                },
471            }
472        }
473        Ok(())
474    }
475
476    fn advance_ws(&mut self, ctx: &mut JSContext, id: u64) -> Result<(), String> {
477        let task = self.tasks.remove(&id);
478        let mut task = match task {
479            Some(t) => t,
480            None => return Ok(()),
481        };
482
483        if let ReactorTask::Ws(ws_task) = &mut task {
484            let phase = std::mem::replace(
485                &mut ws_task.phase,
486                WsPhase::Connecting(ConnectState::dummy()),
487            );
488
489            match phase {
490                WsPhase::Connecting(mut cs) => match cs.try_advance() {
491                    ConnEvent::Connected(conn) => {
492                        let conn = set_conn_nonblocking(conn)?;
493                        let key = WsHandshake::generate_key();
494                        let path = ws_task.url.request_target();
495                        let host = if ws_task.url.port != 80 && ws_task.url.port != 443 {
496                            format!("{}:{}", ws_task.url.host, ws_task.url.port)
497                        } else {
498                            ws_task.url.host.clone()
499                        };
500                        let mut hdrs = WsHandshake::build_request(&host, &path, &key);
501                        if !hdrs.contains("user-agent") {
502                            hdrs.set("User-Agent", "pipa/0.1");
503                        }
504                        let mut write_buf = Vec::new();
505                        write_buf.extend_from_slice(b"GET ");
506                        write_buf.extend_from_slice(path.as_bytes());
507                        write_buf.extend_from_slice(b" HTTP/1.1\r\n");
508                        write_buf.extend_from_slice(hdrs.to_request_bytes().as_ref());
509                        write_buf.extend_from_slice(b"\r\n");
510                        ws_task.phase = WsPhase::Handshake {
511                            conn,
512                            key,
513                            write_buf,
514                            write_pos: 0,
515                            read_data: Vec::new(),
516                        };
517                        self.reregister(id, task)?;
518                        let _ = self.advance_task(ctx, id);
519                        return Ok(());
520                    }
521                    ConnEvent::Error(e) => {
522                        let val = crate::value::JSValue::new_string(ctx.intern(&e));
523                        crate::builtins::promise::reject_promise_with_value(
524                            ctx,
525                            ws_task.promise_ptr,
526                            val,
527                        );
528                        return Ok(());
529                    }
530                    _ => {
531                        ws_task.phase = WsPhase::Connecting(cs);
532                        self.reregister(id, task)?;
533                        return Ok(());
534                    }
535                },
536                WsPhase::Handshake {
537                    mut conn,
538                    key,
539                    write_buf,
540                    mut write_pos,
541                    mut read_data,
542                } => {
543                    if write_pos < write_buf.len() {
544                        let remaining = &write_buf[write_pos..];
545                        match conn.write(remaining) {
546                            Ok(n) => {
547                                write_pos += n;
548                            }
549                            Err(e) if e.kind() == ErrorKind::WouldBlock => {}
550                            Err(e) => {
551                                let val = crate::value::JSValue::new_string(
552                                    ctx.intern(&format!("ws write: {e}")),
553                                );
554                                crate::builtins::promise::reject_promise_with_value(
555                                    ctx,
556                                    ws_task.promise_ptr,
557                                    val,
558                                );
559                                return Ok(());
560                            }
561                        }
562                    }
563                    if write_pos >= write_buf.len() {
564                        let mut tmp = [0u8; 8192];
565                        match conn.read(&mut tmp) {
566                            Ok(0) => {
567                                let val = crate::value::JSValue::new_string(
568                                    ctx.intern("ws handshake closed"),
569                                );
570                                crate::builtins::promise::reject_promise_with_value(
571                                    ctx,
572                                    ws_task.promise_ptr,
573                                    val,
574                                );
575                                return Ok(());
576                            }
577                            Ok(n) => {
578                                read_data.extend_from_slice(&tmp[..n]);
579                                if let Some(pos) =
580                                    read_data.windows(4).position(|w| w == b"\r\n\r\n")
581                                {
582                                    let status_line_end = read_data
583                                        .windows(2)
584                                        .position(|w| w == b"\r\n")
585                                        .unwrap_or(0);
586                                    let status_line = &read_data[..status_line_end];
587                                    let code = if status_line.len() >= 12 {
588                                        String::from_utf8_lossy(&status_line[9..12])
589                                            .parse::<u16>()
590                                            .unwrap_or(0)
591                                    } else {
592                                        0
593                                    };
594                                    let status = HttpStatus(code);
595                                    let header_bytes = &read_data[status_line_end + 2..pos + 4];
596                                    let (headers, _) = Headers::from_bytes(header_bytes)?;
597                                    let accept = WsHandshake::validate_response(status, &headers)?;
598                                    if !WsHandshake::verify_accept(&key, &accept) {
599                                        let val = crate::value::JSValue::new_string(
600                                            ctx.intern("ws accept mismatch"),
601                                        );
602                                        crate::builtins::promise::reject_promise_with_value(
603                                            ctx,
604                                            ws_task.promise_ptr,
605                                            val,
606                                        );
607                                        return Ok(());
608                                    }
609
610                                    let ws_obj = unsafe {
611                                        &mut *(ws_task.ws_obj_ptr
612                                            as *mut crate::object::object::JSObject)
613                                    };
614                                    ws_obj.set(
615                                        ctx.intern("readyState"),
616                                        crate::value::JSValue::new_int(1),
617                                    );
618
619                                    let val = crate::value::JSValue::new_object(ws_task.ws_obj_ptr);
620                                    crate::builtins::promise::fulfill_promise_with_value(
621                                        ctx,
622                                        ws_task.promise_ptr,
623                                        val,
624                                    );
625
626                                    let onopen = ws_obj
627                                        .get(ctx.intern("onopen"))
628                                        .unwrap_or(crate::value::JSValue::undefined());
629                                    if onopen.is_function() {
630                                        ctx.event_loop_mut().schedule_macrotask(onopen, vec![]);
631                                    }
632
633                                    ws_task.phase = WsPhase::Open {
634                                        conn,
635                                        read_buf: [0u8; 8192],
636                                        read_data: Vec::new(),
637                                        pending_out: VecDeque::new(),
638                                        closing: false,
639                                        close_code: 0,
640                                        close_reason: String::new(),
641                                    };
642                                    self.reregister(id, task)?;
643                                    return Ok(());
644                                }
645                            }
646                            Err(e) if e.kind() == ErrorKind::WouldBlock => {}
647                            Err(e) => {
648                                let val = crate::value::JSValue::new_string(
649                                    ctx.intern(&format!("ws handshake read: {e}")),
650                                );
651                                crate::builtins::promise::reject_promise_with_value(
652                                    ctx,
653                                    ws_task.promise_ptr,
654                                    val,
655                                );
656                                return Ok(());
657                            }
658                        }
659                    }
660                    ws_task.phase = WsPhase::Handshake {
661                        conn,
662                        key,
663                        write_buf,
664                        write_pos,
665                        read_data,
666                    };
667                    self.reregister(id, task)?;
668                    return Ok(());
669                }
670                WsPhase::Open {
671                    mut conn,
672                    mut read_buf,
673                    mut read_data,
674                    mut pending_out,
675                    closing,
676                    close_code,
677                    close_reason,
678                } => {
679                    while let Some(frame) = pending_out.pop_front() {
680                        let encoded = frame.encode();
681                        match conn.write_all(&encoded) {
682                            Ok(_) => {}
683                            Err(e) if e.kind() == ErrorKind::WouldBlock => {
684                                pending_out.push_front(frame);
685                                break;
686                            }
687                            Err(e) => {
688                                dispatch_ws_error(
689                                    ctx,
690                                    ws_task.ws_obj_ptr,
691                                    &format!("ws write: {e}"),
692                                );
693                                return Ok(());
694                            }
695                        }
696                    }
697
698                    if closing && pending_out.is_empty() {
699                        let ws_obj = unsafe {
700                            &mut *(ws_task.ws_obj_ptr as *mut crate::object::object::JSObject)
701                        };
702                        ws_obj.set(ctx.intern("readyState"), crate::value::JSValue::new_int(3));
703                        dispatch_ws_close(ctx, ws_task.ws_obj_ptr, close_code, close_reason);
704                        return Ok(());
705                    }
706
707                    match conn.read(&mut read_buf) {
708                        Ok(0) => {
709                            let ws_obj = unsafe {
710                                &mut *(ws_task.ws_obj_ptr as *mut crate::object::object::JSObject)
711                            };
712                            ws_obj.set(ctx.intern("readyState"), crate::value::JSValue::new_int(3));
713                            dispatch_ws_close(
714                                ctx,
715                                ws_task.ws_obj_ptr,
716                                1006,
717                                "connection closed".into(),
718                            );
719                            return Ok(());
720                        }
721                        Ok(n) => {
722                            read_data.extend_from_slice(&read_buf[..n]);
723                            if let Ok(frames) = WsFrame::parse_all(&read_data) {
724                                let consumed = calculate_consumed(&frames);
725                                read_data.drain(..consumed);
726                                for frame in frames {
727                                    match frame.opcode {
728                                        OpCode::Text => {
729                                            let text = String::from_utf8_lossy(&frame.payload);
730                                            dispatch_ws_message(
731                                                ctx,
732                                                ws_task.ws_obj_ptr,
733                                                &text,
734                                                true,
735                                            );
736                                        }
737                                        OpCode::Binary => {
738                                            let text = String::from_utf8_lossy(&frame.payload);
739                                            dispatch_ws_message(
740                                                ctx,
741                                                ws_task.ws_obj_ptr,
742                                                &text,
743                                                false,
744                                            );
745                                        }
746                                        OpCode::Ping => {
747                                            let pong = WsFrame::new_pong(frame.payload);
748                                            pending_out.push_back(pong);
749                                        }
750                                        OpCode::Close => {
751                                            let (code, reason) =
752                                                parse_close_payload(&frame.payload);
753                                            let close_frame = WsFrame::new_close(code, &reason);
754                                            pending_out.push_back(close_frame);
755                                            ws_task.phase = WsPhase::Open {
756                                                conn,
757                                                read_buf,
758                                                read_data,
759                                                pending_out,
760                                                closing: true,
761                                                close_code: code,
762                                                close_reason: reason,
763                                            };
764                                            self.reregister(id, task)?;
765                                            return Ok(());
766                                        }
767                                        OpCode::Pong | OpCode::Continuation => {}
768                                    }
769                                }
770                            }
771                        }
772                        Err(e) if e.kind() == ErrorKind::WouldBlock => {}
773                        Err(e) => {
774                            dispatch_ws_error(ctx, ws_task.ws_obj_ptr, &format!("ws read: {e}"));
775                            return Ok(());
776                        }
777                    }
778                    ws_task.phase = WsPhase::Open {
779                        conn,
780                        read_buf,
781                        read_data,
782                        pending_out,
783                        closing,
784                        close_code,
785                        close_reason,
786                    };
787                    self.reregister(id, task)?;
788                    return Ok(());
789                }
790            }
791        }
792        Ok(())
793    }
794
795    fn advance_sse(&mut self, ctx: &mut JSContext, id: u64) -> Result<(), String> {
796        let task = self.tasks.remove(&id);
797        let mut task = match task {
798            Some(t) => t,
799            None => return Ok(()),
800        };
801
802        if let ReactorTask::Sse(sse_task) = &mut task {
803            let phase = std::mem::replace(
804                &mut sse_task.phase,
805                SsePhase::Connecting(ConnectState::dummy()),
806            );
807
808            match phase {
809                SsePhase::Connecting(mut cs) => match cs.try_advance() {
810                    ConnEvent::Connected(conn) => {
811                        let conn = set_conn_nonblocking(conn)?;
812                        let path = sse_task.url.request_target();
813                        let host_header = format!("{}:{}", sse_task.url.host, sse_task.url.port);
814                        let mut write_buf = Vec::new();
815                        write_buf.extend_from_slice(b"GET ");
816                        write_buf.extend_from_slice(path.as_bytes());
817                        write_buf.extend_from_slice(b" HTTP/1.1\r\nHost: ");
818                        write_buf.extend_from_slice(host_header.as_bytes());
819                        write_buf.extend_from_slice(b"\r\nAccept: text/event-stream\r\nCache-Control: no-cache\r\nConnection: keep-alive\r\n\r\n");
820                        sse_task.phase = SsePhase::WritingRequest {
821                            conn,
822                            write_buf,
823                            write_pos: 0,
824                        };
825                        self.reregister(id, task)?;
826                        let _ = self.advance_task(ctx, id);
827                        return Ok(());
828                    }
829                    ConnEvent::Error(e) => {
830                        let val = crate::value::JSValue::new_string(ctx.intern(&e));
831                        crate::builtins::promise::reject_promise_with_value(
832                            ctx,
833                            sse_task.promise_ptr,
834                            val,
835                        );
836                        return Ok(());
837                    }
838                    _ => {
839                        sse_task.phase = SsePhase::Connecting(cs);
840                        self.reregister(id, task)?;
841                        return Ok(());
842                    }
843                },
844                SsePhase::WritingRequest {
845                    mut conn,
846                    write_buf,
847                    mut write_pos,
848                } => {
849                    if write_pos < write_buf.len() {
850                        let remaining = &write_buf[write_pos..];
851                        match conn.write(remaining) {
852                            Ok(n) => {
853                                write_pos += n;
854                            }
855                            Err(e) if e.kind() == ErrorKind::WouldBlock => {}
856                            Err(e) => {
857                                let val = crate::value::JSValue::new_string(
858                                    ctx.intern(&format!("sse write: {e}")),
859                                );
860                                crate::builtins::promise::reject_promise_with_value(
861                                    ctx,
862                                    sse_task.promise_ptr,
863                                    val,
864                                );
865                                return Ok(());
866                            }
867                        }
868                    }
869                    if write_pos >= write_buf.len() {
870                        sse_task.phase = SsePhase::ReadingHeaders {
871                            conn,
872                            read_buf: [0u8; 8192],
873                            read_data: Vec::new(),
874                        };
875                        self.reregister(id, task)?;
876                        return Ok(());
877                    }
878                    sse_task.phase = SsePhase::WritingRequest {
879                        conn,
880                        write_buf,
881                        write_pos,
882                    };
883                    self.reregister(id, task)?;
884                    return Ok(());
885                }
886                SsePhase::ReadingHeaders {
887                    mut conn,
888                    mut read_buf,
889                    mut read_data,
890                } => {
891                    match conn.read(&mut read_buf) {
892                        Ok(0) => {
893                            let val = crate::value::JSValue::new_string(
894                                ctx.intern("sse closed during headers"),
895                            );
896                            crate::builtins::promise::reject_promise_with_value(
897                                ctx,
898                                sse_task.promise_ptr,
899                                val,
900                            );
901                            return Ok(());
902                        }
903                        Ok(n) => {
904                            read_data.extend_from_slice(&read_buf[..n]);
905                            if let Some(pos) = read_data.windows(4).position(|w| w == b"\r\n\r\n") {
906                                let header_str = String::from_utf8_lossy(&read_data[..pos]);
907                                if !header_str.contains("200") && !header_str.contains("201") {
908                                    let val =
909                                        crate::value::JSValue::new_string(ctx.intern(&format!(
910                                            "sse bad status: {}",
911                                            header_str.lines().next().unwrap_or("")
912                                        )));
913                                    crate::builtins::promise::reject_promise_with_value(
914                                        ctx,
915                                        sse_task.promise_ptr,
916                                        val,
917                                    );
918                                    return Ok(());
919                                }
920
921                                let val = crate::value::JSValue::new_object(sse_task.es_obj_ptr);
922                                crate::builtins::promise::fulfill_promise_with_value(
923                                    ctx,
924                                    sse_task.promise_ptr,
925                                    val,
926                                );
927
928                                let es_obj = unsafe {
929                                    &mut *(sse_task.es_obj_ptr
930                                        as *mut crate::object::object::JSObject)
931                                };
932                                es_obj.set(
933                                    ctx.intern("readyState"),
934                                    crate::value::JSValue::new_int(1),
935                                );
936                                let onopen = es_obj
937                                    .get(ctx.intern("onopen"))
938                                    .unwrap_or(crate::value::JSValue::undefined());
939                                if onopen.is_function() {
940                                    ctx.event_loop_mut().schedule_macrotask(onopen, vec![]);
941                                }
942
943                                let remaining = read_data[pos + 4..].to_vec();
944                                sse_task.phase = SsePhase::Streaming {
945                                    conn,
946                                    read_buf: [0u8; 8192],
947                                    read_data: Vec::new(),
948                                    sse_buf: remaining,
949                                };
950                                self.reregister(id, task)?;
951                                return Ok(());
952                            }
953                        }
954                        Err(e) if e.kind() == ErrorKind::WouldBlock => {}
955                        Err(e) => {
956                            let val = crate::value::JSValue::new_string(
957                                ctx.intern(&format!("sse header read: {e}")),
958                            );
959                            crate::builtins::promise::reject_promise_with_value(
960                                ctx,
961                                sse_task.promise_ptr,
962                                val,
963                            );
964                            return Ok(());
965                        }
966                    }
967                    sse_task.phase = SsePhase::ReadingHeaders {
968                        conn,
969                        read_buf,
970                        read_data,
971                    };
972                    self.reregister(id, task)?;
973                    return Ok(());
974                }
975                SsePhase::Streaming {
976                    mut conn,
977                    mut read_buf,
978                    read_data: _,
979                    mut sse_buf,
980                } => {
981                    match conn.read(&mut read_buf) {
982                        Ok(0) => {
983                            let es_obj = unsafe {
984                                &mut *(sse_task.es_obj_ptr as *mut crate::object::object::JSObject)
985                            };
986                            es_obj.set(ctx.intern("readyState"), crate::value::JSValue::new_int(2));
987                            let onclose = es_obj
988                                .get(ctx.intern("onclose"))
989                                .unwrap_or(crate::value::JSValue::undefined());
990                            if onclose.is_function() {
991                                ctx.event_loop_mut().schedule_macrotask(onclose, vec![]);
992                            }
993                            return Ok(());
994                        }
995                        Ok(n) => {
996                            sse_buf.extend_from_slice(&read_buf[..n]);
997                            while let Some(event) =
998                                crate::builtins::eventsource::parse_sse_event(&mut sse_buf)
999                            {
1000                                if event.is_closed {
1001                                    let es_obj = unsafe {
1002                                        &mut *(sse_task.es_obj_ptr
1003                                            as *mut crate::object::object::JSObject)
1004                                    };
1005                                    es_obj.set(
1006                                        ctx.intern("readyState"),
1007                                        crate::value::JSValue::new_int(2),
1008                                    );
1009                                    let onclose = es_obj
1010                                        .get(ctx.intern("onclose"))
1011                                        .unwrap_or(crate::value::JSValue::undefined());
1012                                    if onclose.is_function() {
1013                                        ctx.event_loop_mut().schedule_macrotask(onclose, vec![]);
1014                                    }
1015                                    return Ok(());
1016                                }
1017                                dispatch_sse_event(ctx, sse_task.es_obj_ptr, &event);
1018                            }
1019                        }
1020                        Err(e) if e.kind() == ErrorKind::WouldBlock => {}
1021                        Err(e) => {
1022                            let es_obj = unsafe {
1023                                &mut *(sse_task.es_obj_ptr as *mut crate::object::object::JSObject)
1024                            };
1025                            let onerror = es_obj
1026                                .get(ctx.intern("onerror"))
1027                                .unwrap_or(crate::value::JSValue::undefined());
1028                            if onerror.is_function() {
1029                                let err_val =
1030                                    crate::value::JSValue::new_string(ctx.intern(&format!("{e}")));
1031                                ctx.event_loop_mut()
1032                                    .schedule_macrotask(onerror, vec![err_val]);
1033                            }
1034                            return Ok(());
1035                        }
1036                    }
1037                    sse_task.phase = SsePhase::Streaming {
1038                        conn,
1039                        read_buf,
1040                        read_data: Vec::new(),
1041                        sse_buf,
1042                    };
1043                    self.reregister(id, task)?;
1044                    return Ok(());
1045                }
1046            }
1047        }
1048        Ok(())
1049    }
1050
1051    pub fn is_empty(&self) -> bool {
1052        self.tasks.is_empty()
1053    }
1054}
1055
1056impl MacroTaskExtension for IoReactor {
1057    fn tick(&mut self, ctx: &mut JSContext) -> Result<bool, String> {
1058        if self.is_empty() {
1059            return Ok(false);
1060        }
1061
1062        let events = self.poll(1)?;
1063        let ids: Vec<u64> = events.iter().map(|(id, _, _)| *id).collect();
1064        for id in ids {
1065            let _ = self.advance_task(ctx, id);
1066        }
1067
1068        Ok(!events.is_empty())
1069    }
1070
1071    fn has_pending(&self) -> bool {
1072        !self.is_empty()
1073    }
1074
1075    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
1076        self
1077    }
1078}
1079
1080fn build_response_js_object(
1081    ctx: &mut JSContext,
1082    mut resp: HttpResponse,
1083    url: &str,
1084) -> crate::value::JSValue {
1085    let body = resp.body_reader.take_body();
1086    let body_text = String::from_utf8_lossy(&body).to_string();
1087
1088    let status = resp.status.0 as i64;
1089    let status_text = resp.status_text;
1090
1091    let mut obj = crate::object::object::JSObject::new();
1092    obj.set(ctx.intern("status"), crate::value::JSValue::new_int(status));
1093    obj.set(
1094        ctx.intern("ok"),
1095        crate::value::JSValue::bool(status >= 200 && status < 300),
1096    );
1097    obj.set(
1098        ctx.intern("statusText"),
1099        crate::value::JSValue::new_string(ctx.intern(&status_text)),
1100    );
1101    obj.set(
1102        ctx.intern("url"),
1103        crate::value::JSValue::new_string(ctx.intern(url)),
1104    );
1105    obj.set(
1106        ctx.intern("__body__"),
1107        crate::value::JSValue::new_string(ctx.intern(&body_text)),
1108    );
1109
1110    let text_fn = create_builtin_function(ctx, "response_text");
1111    obj.set(ctx.intern("text"), text_fn);
1112
1113    let json_fn = create_builtin_function(ctx, "response_json");
1114    obj.set(ctx.intern("json"), json_fn);
1115
1116    let ptr = Box::into_raw(Box::new(obj)) as usize;
1117    crate::value::JSValue::new_object(ptr)
1118}
1119
1120fn create_builtin_function(ctx: &mut JSContext, name: &str) -> crate::value::JSValue {
1121    let mut func = crate::object::function::JSFunction::new_builtin(ctx.intern(name), 1);
1122    func.set_builtin_marker(ctx, name);
1123    let ptr = Box::into_raw(Box::new(func)) as usize;
1124    ctx.runtime_mut().gc_heap_mut().track_function(ptr);
1125    crate::value::JSValue::new_function(ptr)
1126}
1127
1128fn dispatch_ws_message(ctx: &mut JSContext, ws_obj_ptr: usize, data: &str, _is_text: bool) {
1129    let ws_obj = unsafe { &*(ws_obj_ptr as *const crate::object::object::JSObject) };
1130    let on_msg = ws_obj
1131        .get(ctx.intern("onmessage"))
1132        .unwrap_or(crate::value::JSValue::undefined());
1133    if on_msg.is_function() {
1134        let msg_val = crate::value::JSValue::new_string(ctx.intern(data));
1135        ctx.event_loop_mut()
1136            .schedule_macrotask(on_msg, vec![msg_val]);
1137    }
1138}
1139
1140fn dispatch_ws_close(ctx: &mut JSContext, ws_obj_ptr: usize, code: u16, reason: String) {
1141    let ws_obj = unsafe { &*(ws_obj_ptr as *const crate::object::object::JSObject) };
1142    let on_close = ws_obj
1143        .get(ctx.intern("onclose"))
1144        .unwrap_or(crate::value::JSValue::undefined());
1145    if on_close.is_function() {
1146        let mut evt = crate::object::object::JSObject::new();
1147        evt.set(
1148            ctx.intern("code"),
1149            crate::value::JSValue::new_int(code as i64),
1150        );
1151        evt.set(
1152            ctx.intern("reason"),
1153            crate::value::JSValue::new_string(ctx.intern(&reason)),
1154        );
1155        let ptr = Box::into_raw(Box::new(evt)) as usize;
1156        let evt_val = crate::value::JSValue::new_object(ptr);
1157        ctx.event_loop_mut()
1158            .schedule_macrotask(on_close, vec![evt_val]);
1159    }
1160}
1161
1162fn dispatch_ws_error(ctx: &mut JSContext, ws_obj_ptr: usize, error: &str) {
1163    let ws_obj = unsafe { &*(ws_obj_ptr as *const crate::object::object::JSObject) };
1164    let on_error = ws_obj
1165        .get(ctx.intern("onerror"))
1166        .unwrap_or(crate::value::JSValue::undefined());
1167    if on_error.is_function() {
1168        let err_val = crate::value::JSValue::new_string(ctx.intern(error));
1169        ctx.event_loop_mut()
1170            .schedule_macrotask(on_error, vec![err_val]);
1171    }
1172}
1173
1174fn dispatch_sse_event(
1175    ctx: &mut JSContext,
1176    es_obj_ptr: usize,
1177    event: &crate::builtins::eventsource::SseEvent,
1178) {
1179    let es_obj = unsafe { &*(es_obj_ptr as *const crate::object::object::JSObject) };
1180    let on_msg = es_obj
1181        .get(ctx.intern("onmessage"))
1182        .unwrap_or(crate::value::JSValue::undefined());
1183    if on_msg.is_function() {
1184        let mut evt = crate::object::object::JSObject::new();
1185        evt.set(
1186            ctx.intern("data"),
1187            crate::value::JSValue::new_string(ctx.intern(&event.data)),
1188        );
1189        if !event.event_type.is_empty() {
1190            evt.set(
1191                ctx.intern("event"),
1192                crate::value::JSValue::new_string(ctx.intern(&event.event_type)),
1193            );
1194        }
1195        if !event.last_event_id.is_empty() {
1196            evt.set(
1197                ctx.intern("lastEventId"),
1198                crate::value::JSValue::new_string(ctx.intern(&event.last_event_id)),
1199            );
1200        }
1201        let ptr = Box::into_raw(Box::new(evt)) as usize;
1202        let evt_val = crate::value::JSValue::new_object(ptr);
1203        ctx.event_loop_mut()
1204            .schedule_macrotask(on_msg, vec![evt_val]);
1205    }
1206}
1207
1208fn set_conn_nonblocking(conn: Connection) -> Result<Connection, String> {
1209    conn.set_nonblocking(true)?;
1210    Ok(conn)
1211}
1212
1213fn parse_close_payload(payload: &[u8]) -> (u16, String) {
1214    if payload.len() >= 2 {
1215        let code = u16::from_be_bytes([payload[0], payload[1]]);
1216        let reason = if payload.len() > 2 {
1217            String::from_utf8_lossy(&payload[2..]).to_string()
1218        } else {
1219            String::new()
1220        };
1221        (code, reason)
1222    } else {
1223        (1005, String::new())
1224    }
1225}
1226
1227fn calculate_consumed(frames: &[WsFrame]) -> usize {
1228    let mut total = 0usize;
1229    for frame in frames {
1230        let mut frame_size = 2;
1231        let payload_len = frame.payload.len();
1232        if payload_len >= 126 && payload_len <= 0xFFFF {
1233            frame_size += 2;
1234        } else if payload_len > 0xFFFF {
1235            frame_size += 8;
1236        }
1237        if frame.mask.is_some() {
1238            frame_size += 4;
1239        }
1240        frame_size += payload_len;
1241        total += frame_size;
1242    }
1243    total
1244}