1use std::collections::{HashMap, VecDeque};
2use std::io::{ErrorKind, Read, Write};
3use std::os::unix::io::RawFd;
4
5use crate::http::conn::Connection;
6use crate::http::connect_state::{ConnEvent, ConnectState};
7use crate::http::headers::Headers;
8use crate::http::method::HttpMethod;
9use crate::http::request::{HttpRequest, RequestEvent, RequestState};
10use crate::http::response::HttpResponse;
11use crate::http::status::HttpStatus;
12use crate::http::url::Url;
13use crate::http::ws::frame::{OpCode, WsFrame};
14use crate::http::ws::handshake::WsHandshake;
15use crate::util::iomux::Poller;
16
17use super::context::JSContext;
18use super::extension::MacroTaskExtension;
19
/// Kind of readiness reported for a task by `IoReactor::poll`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PollEvent {
    /// The poller flagged the task's descriptor as readable.
    Readable,
    /// The poller flagged the task's descriptor as writable.
    Writable,
    /// The poller flagged an error condition on the task's descriptor.
    Error,
}
26
/// A unit of asynchronous network work tracked by `IoReactor`.
pub enum ReactorTask {
    /// An HTTP request whose result settles a promise.
    Fetch(FetchTask),
    /// A WebSocket connection (connect, handshake, then framed traffic).
    Ws(WsTask),
    /// A server-sent-events stream.
    Sse(SseTask),
}
32
/// State for one in-flight fetch.
pub struct FetchTask {
    /// Current stage of the fetch (connecting or exchanging the request).
    phase: FetchPhase,
    /// Opaque promise handle passed to the promise fulfill/reject builtins.
    promise_ptr: usize,
    /// Target URL; also reported back on the response object.
    url: Url,
    /// HTTP method used when the request is built after connecting.
    method: HttpMethod,
    /// Request headers cloned into the `HttpRequest` after connecting.
    req_headers: Headers,
    /// Optional request body cloned into the `HttpRequest` after connecting.
    body: Option<Vec<u8>>,
}
41
/// Stages a fetch task moves through.
pub enum FetchPhase {
    /// The connection is still being established.
    Connecting(ConnectState),
    /// Connected; the `HttpRequest` state machine drives the exchange.
    Active(HttpRequest),
}
46
/// State for one WebSocket connection.
pub struct WsTask {
    /// Current stage of the connection.
    phase: WsPhase,
    /// Address of the JS WebSocket object; dereferenced as a `JSObject`
    /// when event handlers are looked up.
    ws_obj_ptr: usize,
    /// Opaque promise handle settled when the handshake succeeds or fails.
    promise_ptr: usize,
    /// Target URL; supplies the host, port and request target of the handshake.
    url: Url,
}
53
/// Stages a WebSocket task moves through.
pub enum WsPhase {
    /// The connection is still being established.
    Connecting(ConnectState),
    /// The HTTP upgrade request is being written and its response read.
    Handshake {
        /// Established connection.
        conn: Connection,
        /// Key produced by `WsHandshake::generate_key`, later checked with
        /// `WsHandshake::verify_accept`.
        key: String,
        /// Serialized upgrade request.
        write_buf: Vec<u8>,
        /// Number of bytes of `write_buf` already written.
        write_pos: usize,
        /// Response bytes accumulated so far.
        read_data: Vec<u8>,
    },
    /// Handshake completed; frames are exchanged.
    Open {
        /// Established connection.
        conn: Connection,
        /// Scratch buffer handed to `conn.read`.
        read_buf: [u8; 8192],
        /// Received bytes not yet consumed as frames.
        read_data: Vec<u8>,
        /// Frames queued for writing.
        pending_out: VecDeque<WsFrame>,
        /// Set once a close frame has been queued (locally or in reply to the peer).
        closing: bool,
        /// Close code reported to `onclose`.
        close_code: u16,
        /// Close reason reported to `onclose`.
        close_reason: String,
    },
}
73
/// State for one server-sent-events stream.
pub struct SseTask {
    /// Current stage of the stream.
    phase: SsePhase,
    /// Address of the JS EventSource object; dereferenced as a `JSObject`
    /// when event handlers are looked up.
    es_obj_ptr: usize,
    /// Opaque promise handle settled once the response headers are accepted or rejected.
    promise_ptr: usize,
    /// Target URL; supplies the host, port and request target.
    url: Url,
}
80
/// Stages an SSE task moves through.
pub enum SsePhase {
    /// The connection is still being established.
    Connecting(ConnectState),
    /// The GET request is being written.
    WritingRequest {
        /// Established connection.
        conn: Connection,
        /// Serialized request.
        write_buf: Vec<u8>,
        /// Number of bytes of `write_buf` already written.
        write_pos: usize,
    },
    /// The response head is being read.
    ReadingHeaders {
        /// Established connection.
        conn: Connection,
        /// Scratch buffer handed to `conn.read`.
        read_buf: [u8; 8192],
        /// Response bytes accumulated until the blank line is seen.
        read_data: Vec<u8>,
    },
    /// Headers accepted; events are parsed from the body.
    Streaming {
        /// Established connection.
        conn: Connection,
        /// Scratch buffer handed to `conn.read`.
        read_buf: [u8; 8192],
        /// Not read by `advance_sse`, which re-creates it empty on every pass.
        read_data: Vec<u8>,
        /// Body bytes awaiting `parse_sse_event`.
        sse_buf: Vec<u8>,
    },
}
100
101impl ReactorTask {
102 pub fn fd(&self) -> Option<RawFd> {
103 match self {
104 ReactorTask::Fetch(t) => match &t.phase {
105 FetchPhase::Connecting(cs) => cs.fd(),
106 FetchPhase::Active(req) => req.fd(),
107 },
108 ReactorTask::Ws(t) => match &t.phase {
109 WsPhase::Connecting(cs) => cs.fd(),
110 WsPhase::Handshake { conn, .. } => Some(conn.raw_fd()),
111 WsPhase::Open { conn, .. } => Some(conn.raw_fd()),
112 },
113 ReactorTask::Sse(t) => match &t.phase {
114 SsePhase::Connecting(cs) => cs.fd(),
115 SsePhase::WritingRequest { conn, .. } => Some(conn.raw_fd()),
116 SsePhase::ReadingHeaders { conn, .. } => Some(conn.raw_fd()),
117 SsePhase::Streaming { conn, .. } => Some(conn.raw_fd()),
118 },
119 }
120 }
121
122 pub fn wants_read(&self) -> bool {
123 match self {
124 ReactorTask::Fetch(t) => match &t.phase {
125 FetchPhase::Connecting(cs) => cs.wants_read(),
126 FetchPhase::Active(req) => req.wants_read(),
127 },
128 ReactorTask::Ws(t) => match &t.phase {
129 WsPhase::Connecting(cs) => cs.wants_read(),
130 WsPhase::Handshake { .. } => true,
131 WsPhase::Open { .. } => true,
132 },
133 ReactorTask::Sse(t) => match &t.phase {
134 SsePhase::Connecting(cs) => cs.wants_read(),
135 SsePhase::WritingRequest { .. } => false,
136 SsePhase::ReadingHeaders { .. } => true,
137 SsePhase::Streaming { .. } => true,
138 },
139 }
140 }
141
142 pub fn wants_write(&self) -> bool {
143 match self {
144 ReactorTask::Fetch(t) => match &t.phase {
145 FetchPhase::Connecting(cs) => cs.wants_write(),
146 FetchPhase::Active(req) => req.wants_write(),
147 },
148 ReactorTask::Ws(t) => match &t.phase {
149 WsPhase::Connecting(cs) => cs.wants_write(),
150 WsPhase::Handshake {
151 write_buf,
152 write_pos,
153 ..
154 } => *write_pos < write_buf.len(),
155 WsPhase::Open { pending_out, .. } => !pending_out.is_empty(),
156 },
157 ReactorTask::Sse(t) => match &t.phase {
158 SsePhase::Connecting(cs) => cs.wants_write(),
159 SsePhase::WritingRequest {
160 write_buf,
161 write_pos,
162 ..
163 } => *write_pos < write_buf.len(),
164 SsePhase::ReadingHeaders { .. } => false,
165 SsePhase::Streaming { .. } => false,
166 },
167 }
168 }
169
170 fn task_type(&self) -> &str {
171 match self {
172 ReactorTask::Fetch(_) => "fetch",
173 ReactorTask::Ws(_) => "ws",
174 ReactorTask::Sse(_) => "sse",
175 }
176 }
177}
178
179impl FetchTask {
180 pub fn new(
181 url: Url,
182 method: HttpMethod,
183 req_headers: Headers,
184 body: Option<Vec<u8>>,
185 promise_ptr: usize,
186 ) -> Result<Self, String> {
187 let use_tls = url.is_tls();
188 let cs = ConnectState::new(&url.host, url.port, use_tls, Vec::new())?;
189 Ok(FetchTask {
190 phase: FetchPhase::Connecting(cs),
191 promise_ptr,
192 url,
193 method,
194 req_headers,
195 body,
196 })
197 }
198}
199
200impl WsTask {
201 pub fn new(url: Url, ws_obj_ptr: usize, promise_ptr: usize) -> Result<Self, String> {
202 let use_tls = url.is_tls();
203 let cs = ConnectState::new(&url.host, url.port, use_tls, Vec::new())?;
204 Ok(WsTask {
205 phase: WsPhase::Connecting(cs),
206 ws_obj_ptr,
207 promise_ptr,
208 url,
209 })
210 }
211}
212
213impl SseTask {
214 pub fn new(url: Url, es_obj_ptr: usize, promise_ptr: usize) -> Result<Self, String> {
215 let use_tls = url.is_tls();
216 let cs = ConnectState::new(&url.host, url.port, use_tls, Vec::new())?;
217 Ok(SseTask {
218 phase: SsePhase::Connecting(cs),
219 es_obj_ptr,
220 promise_ptr,
221 url,
222 })
223 }
224}
225
/// Owns the poller and the set of in-flight network tasks.
pub struct IoReactor {
    /// Readiness poller the tasks' descriptors are registered with.
    poller: Poller,
    /// Live tasks keyed by the id handed out by `register`.
    tasks: HashMap<u64, ReactorTask>,
    /// Id assigned to the next registered task; starts at 1.
    next_id: u64,
}
231
232impl IoReactor {
233 pub fn new() -> Result<Self, String> {
234 let poller = Poller::new()?;
235 Ok(IoReactor {
236 poller,
237 tasks: HashMap::new(),
238 next_id: 1,
239 })
240 }
241
242 pub fn get_from_ctx(ctx: &mut JSContext) -> Option<&mut IoReactor> {
243 let el = ctx.event_loop_mut();
244 for ext in &mut el.extensions {
245 let any = ext.as_any_mut();
246 if any.is::<IoReactor>() {
247 return any.downcast_mut::<IoReactor>();
248 }
249 }
250 None
251 }
252
253 pub fn register(&mut self, task: ReactorTask) -> Result<u64, String> {
254 let id = self.next_id;
255 self.next_id += 1;
256
257 if let Some(fd) = task.fd() {
258 self.poller
259 .register(fd, task.wants_read(), task.wants_write())?;
260 }
261
262 self.tasks.insert(id, task);
263 Ok(id)
264 }
265
266 fn unregister(&mut self, id: u64) -> Option<ReactorTask> {
267 let task = self.tasks.remove(&id);
268 if let Some(ref t) = task {
269 if let Some(fd) = t.fd() {
270 let _ = self.poller.unregister(fd);
271 }
272 }
273 task
274 }
275
276 fn reregister(&mut self, id: u64, task: ReactorTask) -> Result<(), String> {
277 let old = self.tasks.remove(&id);
278 let old_fd = old.as_ref().and_then(|t| t.fd());
279 let new_fd = task.fd();
280
281 if let Some(fd) = new_fd {
282 let _ = self.poller.unregister(fd);
283 self.poller
284 .register(fd, task.wants_read(), task.wants_write())?;
285 } else if let Some(ofd) = old_fd {
286 let _ = self.poller.unregister(ofd);
287 }
288
289 self.tasks.insert(id, task);
290 Ok(())
291 }
292
293 fn poll(&mut self, timeout_ms: i32) -> Result<Vec<(u64, String, PollEvent)>, String> {
294 if self.tasks.is_empty() {
295 return Ok(Vec::new());
296 }
297
298 let events = self.poller.wait(timeout_ms)?;
299 let mut results = Vec::new();
300
301 for event in &events {
302 let ids_to_check: Vec<u64> = self
303 .tasks
304 .iter()
305 .filter(|(_, t)| t.fd() == Some(event.fd))
306 .map(|(id, _)| *id)
307 .collect();
308
309 for id in ids_to_check {
310 let task_type = self.tasks.get(&id).map(|t| t.task_type().to_string());
311 if event.error {
312 results.push((id, task_type.unwrap_or_default(), PollEvent::Error));
313 } else if event.readable {
314 results.push((id, task_type.unwrap_or_default(), PollEvent::Readable));
315 } else if event.writable {
316 results.push((id, task_type.unwrap_or_default(), PollEvent::Writable));
317 }
318 }
319 }
320
321 Ok(results)
322 }
323
324 pub fn ws_send(&mut self, ws_obj_ptr: usize, data: Vec<u8>) {
325 let frame = WsFrame::new_text(data);
326 for (_, task) in &mut self.tasks {
327 if let ReactorTask::Ws(ws_task) = task {
328 if ws_task.ws_obj_ptr == ws_obj_ptr {
329 if let WsPhase::Open { pending_out, .. } = &mut ws_task.phase {
330 pending_out.push_back(frame);
331 }
332 break;
333 }
334 }
335 }
336 }
337
338 pub fn ws_close(&mut self, ws_obj_ptr: usize, code: u16, reason: &str) {
339 for (_, task) in &mut self.tasks {
340 if let ReactorTask::Ws(ws_task) = task {
341 if ws_task.ws_obj_ptr == ws_obj_ptr {
342 if let WsPhase::Open {
343 pending_out,
344 closing,
345 close_code,
346 close_reason,
347 ..
348 } = &mut ws_task.phase
349 {
350 if !*closing {
351 let frame = WsFrame::new_close(code, reason);
352 pending_out.push_back(frame);
353 *closing = true;
354 *close_code = code;
355 *close_reason = reason.to_string();
356 }
357 }
358 }
359 }
360 }
361 }
362
363 pub fn sse_close(&mut self, es_obj_ptr: usize) {
364 let ids_to_remove: Vec<u64> = self
365 .tasks
366 .iter()
367 .filter(|(_, t)| {
368 if let ReactorTask::Sse(sse_task) = t {
369 sse_task.es_obj_ptr == es_obj_ptr
370 } else {
371 false
372 }
373 })
374 .map(|(id, _)| *id)
375 .collect();
376 for id in ids_to_remove {
377 self.unregister(id);
378 }
379 }
380
381 fn advance_task(&mut self, ctx: &mut JSContext, id: u64) -> Result<(), String> {
382 let task_type = self
383 .tasks
384 .get(&id)
385 .map(|t| t.task_type().to_string())
386 .unwrap_or_default();
387
388 match task_type.as_str() {
389 "fetch" => self.advance_fetch(ctx, id),
390 "ws" => self.advance_ws(ctx, id),
391 "sse" => self.advance_sse(ctx, id),
392 _ => Ok(()),
393 }
394 }
395
396 fn advance_fetch(&mut self, ctx: &mut JSContext, id: u64) -> Result<(), String> {
397 let task = self.tasks.remove(&id);
398 let mut task = match task {
399 Some(t) => t,
400 None => return Ok(()),
401 };
402
403 if let ReactorTask::Fetch(fetch_task) = &mut task {
404 let phase = std::mem::replace(
405 &mut fetch_task.phase,
406 FetchPhase::Active(HttpRequest::dummy()),
407 );
408 match phase {
409 FetchPhase::Connecting(mut cs) => match cs.try_advance() {
410 ConnEvent::Connected(conn) => {
411 let conn = set_conn_nonblocking(conn)?;
412 let mut req = HttpRequest::new(
413 fetch_task.url.clone(),
414 fetch_task.method,
415 fetch_task.req_headers.clone(),
416 fetch_task.body.clone(),
417 );
418 req.conn = Some(conn);
419 req.state = RequestState::WritingRequest;
420 req.build_request();
421 fetch_task.phase = FetchPhase::Active(req);
422 self.reregister(id, task)?;
423 let _ = self.advance_task(ctx, id);
424 return Ok(());
425 }
426 ConnEvent::Error(e) => {
427 let val = crate::value::JSValue::new_string(ctx.intern(&e));
428 crate::builtins::promise::reject_promise_with_value(
429 ctx,
430 fetch_task.promise_ptr,
431 val,
432 );
433 return Ok(());
434 }
435 _ => {
436 fetch_task.phase = FetchPhase::Connecting(cs);
437 self.reregister(id, task)?;
438 return Ok(());
439 }
440 },
441 FetchPhase::Active(mut req) => match req.try_advance() {
442 Ok(RequestEvent::Complete(resp)) => {
443 let resp_val = build_response_js_object(ctx, resp, &fetch_task.url.full);
444 crate::builtins::promise::fulfill_promise_with_value(
445 ctx,
446 fetch_task.promise_ptr,
447 resp_val,
448 );
449 }
450 Ok(RequestEvent::Error(e)) => {
451 let val = crate::value::JSValue::new_string(ctx.intern(&e));
452 crate::builtins::promise::reject_promise_with_value(
453 ctx,
454 fetch_task.promise_ptr,
455 val,
456 );
457 }
458 Ok(_) => {
459 fetch_task.phase = FetchPhase::Active(req);
460 self.reregister(id, task)?;
461 }
462 Err(e) => {
463 let val = crate::value::JSValue::new_string(ctx.intern(&e));
464 crate::builtins::promise::reject_promise_with_value(
465 ctx,
466 fetch_task.promise_ptr,
467 val,
468 );
469 }
470 },
471 }
472 }
473 Ok(())
474 }
475
476 fn advance_ws(&mut self, ctx: &mut JSContext, id: u64) -> Result<(), String> {
477 let task = self.tasks.remove(&id);
478 let mut task = match task {
479 Some(t) => t,
480 None => return Ok(()),
481 };
482
483 if let ReactorTask::Ws(ws_task) = &mut task {
484 let phase = std::mem::replace(
485 &mut ws_task.phase,
486 WsPhase::Connecting(ConnectState::dummy()),
487 );
488
489 match phase {
490 WsPhase::Connecting(mut cs) => match cs.try_advance() {
491 ConnEvent::Connected(conn) => {
492 let conn = set_conn_nonblocking(conn)?;
493 let key = WsHandshake::generate_key();
494 let path = ws_task.url.request_target();
495 let host = if ws_task.url.port != 80 && ws_task.url.port != 443 {
496 format!("{}:{}", ws_task.url.host, ws_task.url.port)
497 } else {
498 ws_task.url.host.clone()
499 };
500 let mut hdrs = WsHandshake::build_request(&host, &path, &key);
501 if !hdrs.contains("user-agent") {
502 hdrs.set("User-Agent", "pipa/0.1");
503 }
504 let mut write_buf = Vec::new();
505 write_buf.extend_from_slice(b"GET ");
506 write_buf.extend_from_slice(path.as_bytes());
507 write_buf.extend_from_slice(b" HTTP/1.1\r\n");
508 write_buf.extend_from_slice(hdrs.to_request_bytes().as_ref());
509 write_buf.extend_from_slice(b"\r\n");
510 ws_task.phase = WsPhase::Handshake {
511 conn,
512 key,
513 write_buf,
514 write_pos: 0,
515 read_data: Vec::new(),
516 };
517 self.reregister(id, task)?;
518 let _ = self.advance_task(ctx, id);
519 return Ok(());
520 }
521 ConnEvent::Error(e) => {
522 let val = crate::value::JSValue::new_string(ctx.intern(&e));
523 crate::builtins::promise::reject_promise_with_value(
524 ctx,
525 ws_task.promise_ptr,
526 val,
527 );
528 return Ok(());
529 }
530 _ => {
531 ws_task.phase = WsPhase::Connecting(cs);
532 self.reregister(id, task)?;
533 return Ok(());
534 }
535 },
536 WsPhase::Handshake {
537 mut conn,
538 key,
539 write_buf,
540 mut write_pos,
541 mut read_data,
542 } => {
543 if write_pos < write_buf.len() {
544 let remaining = &write_buf[write_pos..];
545 match conn.write(remaining) {
546 Ok(n) => {
547 write_pos += n;
548 }
549 Err(e) if e.kind() == ErrorKind::WouldBlock => {}
550 Err(e) => {
551 let val = crate::value::JSValue::new_string(
552 ctx.intern(&format!("ws write: {e}")),
553 );
554 crate::builtins::promise::reject_promise_with_value(
555 ctx,
556 ws_task.promise_ptr,
557 val,
558 );
559 return Ok(());
560 }
561 }
562 }
563 if write_pos >= write_buf.len() {
564 let mut tmp = [0u8; 8192];
565 match conn.read(&mut tmp) {
566 Ok(0) => {
567 let val = crate::value::JSValue::new_string(
568 ctx.intern("ws handshake closed"),
569 );
570 crate::builtins::promise::reject_promise_with_value(
571 ctx,
572 ws_task.promise_ptr,
573 val,
574 );
575 return Ok(());
576 }
577 Ok(n) => {
578 read_data.extend_from_slice(&tmp[..n]);
579 if let Some(pos) =
580 read_data.windows(4).position(|w| w == b"\r\n\r\n")
581 {
582 let status_line_end = read_data
583 .windows(2)
584 .position(|w| w == b"\r\n")
585 .unwrap_or(0);
586 let status_line = &read_data[..status_line_end];
587 let code = if status_line.len() >= 12 {
588 String::from_utf8_lossy(&status_line[9..12])
589 .parse::<u16>()
590 .unwrap_or(0)
591 } else {
592 0
593 };
594 let status = HttpStatus(code);
595 let header_bytes = &read_data[status_line_end + 2..pos + 4];
596 let (headers, _) = Headers::from_bytes(header_bytes)?;
597 let accept = WsHandshake::validate_response(status, &headers)?;
598 if !WsHandshake::verify_accept(&key, &accept) {
599 let val = crate::value::JSValue::new_string(
600 ctx.intern("ws accept mismatch"),
601 );
602 crate::builtins::promise::reject_promise_with_value(
603 ctx,
604 ws_task.promise_ptr,
605 val,
606 );
607 return Ok(());
608 }
609
610 let ws_obj = unsafe {
611 &mut *(ws_task.ws_obj_ptr
612 as *mut crate::object::object::JSObject)
613 };
614 ws_obj.set(
615 ctx.intern("readyState"),
616 crate::value::JSValue::new_int(1),
617 );
618
619 let val = crate::value::JSValue::new_object(ws_task.ws_obj_ptr);
620 crate::builtins::promise::fulfill_promise_with_value(
621 ctx,
622 ws_task.promise_ptr,
623 val,
624 );
625
626 let onopen = ws_obj
627 .get(ctx.intern("onopen"))
628 .unwrap_or(crate::value::JSValue::undefined());
629 if onopen.is_function() {
630 ctx.event_loop_mut().schedule_macrotask(onopen, vec![]);
631 }
632
633 ws_task.phase = WsPhase::Open {
634 conn,
635 read_buf: [0u8; 8192],
636 read_data: Vec::new(),
637 pending_out: VecDeque::new(),
638 closing: false,
639 close_code: 0,
640 close_reason: String::new(),
641 };
642 self.reregister(id, task)?;
643 return Ok(());
644 }
645 }
646 Err(e) if e.kind() == ErrorKind::WouldBlock => {}
647 Err(e) => {
648 let val = crate::value::JSValue::new_string(
649 ctx.intern(&format!("ws handshake read: {e}")),
650 );
651 crate::builtins::promise::reject_promise_with_value(
652 ctx,
653 ws_task.promise_ptr,
654 val,
655 );
656 return Ok(());
657 }
658 }
659 }
660 ws_task.phase = WsPhase::Handshake {
661 conn,
662 key,
663 write_buf,
664 write_pos,
665 read_data,
666 };
667 self.reregister(id, task)?;
668 return Ok(());
669 }
670 WsPhase::Open {
671 mut conn,
672 mut read_buf,
673 mut read_data,
674 mut pending_out,
675 closing,
676 close_code,
677 close_reason,
678 } => {
679 while let Some(frame) = pending_out.pop_front() {
680 let encoded = frame.encode();
681 match conn.write_all(&encoded) {
682 Ok(_) => {}
683 Err(e) if e.kind() == ErrorKind::WouldBlock => {
684 pending_out.push_front(frame);
685 break;
686 }
687 Err(e) => {
688 dispatch_ws_error(
689 ctx,
690 ws_task.ws_obj_ptr,
691 &format!("ws write: {e}"),
692 );
693 return Ok(());
694 }
695 }
696 }
697
698 if closing && pending_out.is_empty() {
699 let ws_obj = unsafe {
700 &mut *(ws_task.ws_obj_ptr as *mut crate::object::object::JSObject)
701 };
702 ws_obj.set(ctx.intern("readyState"), crate::value::JSValue::new_int(3));
703 dispatch_ws_close(ctx, ws_task.ws_obj_ptr, close_code, close_reason);
704 return Ok(());
705 }
706
707 match conn.read(&mut read_buf) {
708 Ok(0) => {
709 let ws_obj = unsafe {
710 &mut *(ws_task.ws_obj_ptr as *mut crate::object::object::JSObject)
711 };
712 ws_obj.set(ctx.intern("readyState"), crate::value::JSValue::new_int(3));
713 dispatch_ws_close(
714 ctx,
715 ws_task.ws_obj_ptr,
716 1006,
717 "connection closed".into(),
718 );
719 return Ok(());
720 }
721 Ok(n) => {
722 read_data.extend_from_slice(&read_buf[..n]);
723 if let Ok(frames) = WsFrame::parse_all(&read_data) {
724 let consumed = calculate_consumed(&frames);
725 read_data.drain(..consumed);
726 for frame in frames {
727 match frame.opcode {
728 OpCode::Text => {
729 let text = String::from_utf8_lossy(&frame.payload);
730 dispatch_ws_message(
731 ctx,
732 ws_task.ws_obj_ptr,
733 &text,
734 true,
735 );
736 }
737 OpCode::Binary => {
738 let text = String::from_utf8_lossy(&frame.payload);
739 dispatch_ws_message(
740 ctx,
741 ws_task.ws_obj_ptr,
742 &text,
743 false,
744 );
745 }
746 OpCode::Ping => {
747 let pong = WsFrame::new_pong(frame.payload);
748 pending_out.push_back(pong);
749 }
750 OpCode::Close => {
751 let (code, reason) =
752 parse_close_payload(&frame.payload);
753 let close_frame = WsFrame::new_close(code, &reason);
754 pending_out.push_back(close_frame);
755 ws_task.phase = WsPhase::Open {
756 conn,
757 read_buf,
758 read_data,
759 pending_out,
760 closing: true,
761 close_code: code,
762 close_reason: reason,
763 };
764 self.reregister(id, task)?;
765 return Ok(());
766 }
767 OpCode::Pong | OpCode::Continuation => {}
768 }
769 }
770 }
771 }
772 Err(e) if e.kind() == ErrorKind::WouldBlock => {}
773 Err(e) => {
774 dispatch_ws_error(ctx, ws_task.ws_obj_ptr, &format!("ws read: {e}"));
775 return Ok(());
776 }
777 }
778 ws_task.phase = WsPhase::Open {
779 conn,
780 read_buf,
781 read_data,
782 pending_out,
783 closing,
784 close_code,
785 close_reason,
786 };
787 self.reregister(id, task)?;
788 return Ok(());
789 }
790 }
791 }
792 Ok(())
793 }
794
795 fn advance_sse(&mut self, ctx: &mut JSContext, id: u64) -> Result<(), String> {
796 let task = self.tasks.remove(&id);
797 let mut task = match task {
798 Some(t) => t,
799 None => return Ok(()),
800 };
801
802 if let ReactorTask::Sse(sse_task) = &mut task {
803 let phase = std::mem::replace(
804 &mut sse_task.phase,
805 SsePhase::Connecting(ConnectState::dummy()),
806 );
807
808 match phase {
809 SsePhase::Connecting(mut cs) => match cs.try_advance() {
810 ConnEvent::Connected(conn) => {
811 let conn = set_conn_nonblocking(conn)?;
812 let path = sse_task.url.request_target();
813 let host_header = format!("{}:{}", sse_task.url.host, sse_task.url.port);
814 let mut write_buf = Vec::new();
815 write_buf.extend_from_slice(b"GET ");
816 write_buf.extend_from_slice(path.as_bytes());
817 write_buf.extend_from_slice(b" HTTP/1.1\r\nHost: ");
818 write_buf.extend_from_slice(host_header.as_bytes());
819 write_buf.extend_from_slice(b"\r\nAccept: text/event-stream\r\nCache-Control: no-cache\r\nConnection: keep-alive\r\n\r\n");
820 sse_task.phase = SsePhase::WritingRequest {
821 conn,
822 write_buf,
823 write_pos: 0,
824 };
825 self.reregister(id, task)?;
826 let _ = self.advance_task(ctx, id);
827 return Ok(());
828 }
829 ConnEvent::Error(e) => {
830 let val = crate::value::JSValue::new_string(ctx.intern(&e));
831 crate::builtins::promise::reject_promise_with_value(
832 ctx,
833 sse_task.promise_ptr,
834 val,
835 );
836 return Ok(());
837 }
838 _ => {
839 sse_task.phase = SsePhase::Connecting(cs);
840 self.reregister(id, task)?;
841 return Ok(());
842 }
843 },
844 SsePhase::WritingRequest {
845 mut conn,
846 write_buf,
847 mut write_pos,
848 } => {
849 if write_pos < write_buf.len() {
850 let remaining = &write_buf[write_pos..];
851 match conn.write(remaining) {
852 Ok(n) => {
853 write_pos += n;
854 }
855 Err(e) if e.kind() == ErrorKind::WouldBlock => {}
856 Err(e) => {
857 let val = crate::value::JSValue::new_string(
858 ctx.intern(&format!("sse write: {e}")),
859 );
860 crate::builtins::promise::reject_promise_with_value(
861 ctx,
862 sse_task.promise_ptr,
863 val,
864 );
865 return Ok(());
866 }
867 }
868 }
869 if write_pos >= write_buf.len() {
870 sse_task.phase = SsePhase::ReadingHeaders {
871 conn,
872 read_buf: [0u8; 8192],
873 read_data: Vec::new(),
874 };
875 self.reregister(id, task)?;
876 return Ok(());
877 }
878 sse_task.phase = SsePhase::WritingRequest {
879 conn,
880 write_buf,
881 write_pos,
882 };
883 self.reregister(id, task)?;
884 return Ok(());
885 }
886 SsePhase::ReadingHeaders {
887 mut conn,
888 mut read_buf,
889 mut read_data,
890 } => {
891 match conn.read(&mut read_buf) {
892 Ok(0) => {
893 let val = crate::value::JSValue::new_string(
894 ctx.intern("sse closed during headers"),
895 );
896 crate::builtins::promise::reject_promise_with_value(
897 ctx,
898 sse_task.promise_ptr,
899 val,
900 );
901 return Ok(());
902 }
903 Ok(n) => {
904 read_data.extend_from_slice(&read_buf[..n]);
905 if let Some(pos) = read_data.windows(4).position(|w| w == b"\r\n\r\n") {
906 let header_str = String::from_utf8_lossy(&read_data[..pos]);
907 if !header_str.contains("200") && !header_str.contains("201") {
908 let val =
909 crate::value::JSValue::new_string(ctx.intern(&format!(
910 "sse bad status: {}",
911 header_str.lines().next().unwrap_or("")
912 )));
913 crate::builtins::promise::reject_promise_with_value(
914 ctx,
915 sse_task.promise_ptr,
916 val,
917 );
918 return Ok(());
919 }
920
921 let val = crate::value::JSValue::new_object(sse_task.es_obj_ptr);
922 crate::builtins::promise::fulfill_promise_with_value(
923 ctx,
924 sse_task.promise_ptr,
925 val,
926 );
927
928 let es_obj = unsafe {
929 &mut *(sse_task.es_obj_ptr
930 as *mut crate::object::object::JSObject)
931 };
932 es_obj.set(
933 ctx.intern("readyState"),
934 crate::value::JSValue::new_int(1),
935 );
936 let onopen = es_obj
937 .get(ctx.intern("onopen"))
938 .unwrap_or(crate::value::JSValue::undefined());
939 if onopen.is_function() {
940 ctx.event_loop_mut().schedule_macrotask(onopen, vec![]);
941 }
942
943 let remaining = read_data[pos + 4..].to_vec();
944 sse_task.phase = SsePhase::Streaming {
945 conn,
946 read_buf: [0u8; 8192],
947 read_data: Vec::new(),
948 sse_buf: remaining,
949 };
950 self.reregister(id, task)?;
951 return Ok(());
952 }
953 }
954 Err(e) if e.kind() == ErrorKind::WouldBlock => {}
955 Err(e) => {
956 let val = crate::value::JSValue::new_string(
957 ctx.intern(&format!("sse header read: {e}")),
958 );
959 crate::builtins::promise::reject_promise_with_value(
960 ctx,
961 sse_task.promise_ptr,
962 val,
963 );
964 return Ok(());
965 }
966 }
967 sse_task.phase = SsePhase::ReadingHeaders {
968 conn,
969 read_buf,
970 read_data,
971 };
972 self.reregister(id, task)?;
973 return Ok(());
974 }
975 SsePhase::Streaming {
976 mut conn,
977 mut read_buf,
978 read_data: _,
979 mut sse_buf,
980 } => {
981 match conn.read(&mut read_buf) {
982 Ok(0) => {
983 let es_obj = unsafe {
984 &mut *(sse_task.es_obj_ptr as *mut crate::object::object::JSObject)
985 };
986 es_obj.set(ctx.intern("readyState"), crate::value::JSValue::new_int(2));
987 let onclose = es_obj
988 .get(ctx.intern("onclose"))
989 .unwrap_or(crate::value::JSValue::undefined());
990 if onclose.is_function() {
991 ctx.event_loop_mut().schedule_macrotask(onclose, vec![]);
992 }
993 return Ok(());
994 }
995 Ok(n) => {
996 sse_buf.extend_from_slice(&read_buf[..n]);
997 while let Some(event) =
998 crate::builtins::eventsource::parse_sse_event(&mut sse_buf)
999 {
1000 if event.is_closed {
1001 let es_obj = unsafe {
1002 &mut *(sse_task.es_obj_ptr
1003 as *mut crate::object::object::JSObject)
1004 };
1005 es_obj.set(
1006 ctx.intern("readyState"),
1007 crate::value::JSValue::new_int(2),
1008 );
1009 let onclose = es_obj
1010 .get(ctx.intern("onclose"))
1011 .unwrap_or(crate::value::JSValue::undefined());
1012 if onclose.is_function() {
1013 ctx.event_loop_mut().schedule_macrotask(onclose, vec![]);
1014 }
1015 return Ok(());
1016 }
1017 dispatch_sse_event(ctx, sse_task.es_obj_ptr, &event);
1018 }
1019 }
1020 Err(e) if e.kind() == ErrorKind::WouldBlock => {}
1021 Err(e) => {
1022 let es_obj = unsafe {
1023 &mut *(sse_task.es_obj_ptr as *mut crate::object::object::JSObject)
1024 };
1025 let onerror = es_obj
1026 .get(ctx.intern("onerror"))
1027 .unwrap_or(crate::value::JSValue::undefined());
1028 if onerror.is_function() {
1029 let err_val =
1030 crate::value::JSValue::new_string(ctx.intern(&format!("{e}")));
1031 ctx.event_loop_mut()
1032 .schedule_macrotask(onerror, vec![err_val]);
1033 }
1034 return Ok(());
1035 }
1036 }
1037 sse_task.phase = SsePhase::Streaming {
1038 conn,
1039 read_buf,
1040 read_data: Vec::new(),
1041 sse_buf,
1042 };
1043 self.reregister(id, task)?;
1044 return Ok(());
1045 }
1046 }
1047 }
1048 Ok(())
1049 }
1050
    /// Returns `true` when no tasks are currently stored in the reactor.
    pub fn is_empty(&self) -> bool {
        self.tasks.is_empty()
    }
1054}
1055
1056impl MacroTaskExtension for IoReactor {
1057 fn tick(&mut self, ctx: &mut JSContext) -> Result<bool, String> {
1058 if self.is_empty() {
1059 return Ok(false);
1060 }
1061
1062 let events = self.poll(1)?;
1063 let ids: Vec<u64> = events.iter().map(|(id, _, _)| *id).collect();
1064 for id in ids {
1065 let _ = self.advance_task(ctx, id);
1066 }
1067
1068 Ok(!events.is_empty())
1069 }
1070
1071 fn has_pending(&self) -> bool {
1072 !self.is_empty()
1073 }
1074
1075 fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
1076 self
1077 }
1078}
1079
1080fn build_response_js_object(
1081 ctx: &mut JSContext,
1082 mut resp: HttpResponse,
1083 url: &str,
1084) -> crate::value::JSValue {
1085 let body = resp.body_reader.take_body();
1086 let body_text = String::from_utf8_lossy(&body).to_string();
1087
1088 let status = resp.status.0 as i64;
1089 let status_text = resp.status_text;
1090
1091 let mut obj = crate::object::object::JSObject::new();
1092 obj.set(ctx.intern("status"), crate::value::JSValue::new_int(status));
1093 obj.set(
1094 ctx.intern("ok"),
1095 crate::value::JSValue::bool(status >= 200 && status < 300),
1096 );
1097 obj.set(
1098 ctx.intern("statusText"),
1099 crate::value::JSValue::new_string(ctx.intern(&status_text)),
1100 );
1101 obj.set(
1102 ctx.intern("url"),
1103 crate::value::JSValue::new_string(ctx.intern(url)),
1104 );
1105 obj.set(
1106 ctx.intern("__body__"),
1107 crate::value::JSValue::new_string(ctx.intern(&body_text)),
1108 );
1109
1110 let text_fn = create_builtin_function(ctx, "response_text");
1111 obj.set(ctx.intern("text"), text_fn);
1112
1113 let json_fn = create_builtin_function(ctx, "response_json");
1114 obj.set(ctx.intern("json"), json_fn);
1115
1116 let ptr = Box::into_raw(Box::new(obj)) as usize;
1117 crate::value::JSValue::new_object(ptr)
1118}
1119
1120fn create_builtin_function(ctx: &mut JSContext, name: &str) -> crate::value::JSValue {
1121 let mut func = crate::object::function::JSFunction::new_builtin(ctx.intern(name), 1);
1122 func.set_builtin_marker(ctx, name);
1123 let ptr = Box::into_raw(Box::new(func)) as usize;
1124 ctx.runtime_mut().gc_heap_mut().track_function(ptr);
1125 crate::value::JSValue::new_function(ptr)
1126}
1127
1128fn dispatch_ws_message(ctx: &mut JSContext, ws_obj_ptr: usize, data: &str, _is_text: bool) {
1129 let ws_obj = unsafe { &*(ws_obj_ptr as *const crate::object::object::JSObject) };
1130 let on_msg = ws_obj
1131 .get(ctx.intern("onmessage"))
1132 .unwrap_or(crate::value::JSValue::undefined());
1133 if on_msg.is_function() {
1134 let msg_val = crate::value::JSValue::new_string(ctx.intern(data));
1135 ctx.event_loop_mut()
1136 .schedule_macrotask(on_msg, vec![msg_val]);
1137 }
1138}
1139
1140fn dispatch_ws_close(ctx: &mut JSContext, ws_obj_ptr: usize, code: u16, reason: String) {
1141 let ws_obj = unsafe { &*(ws_obj_ptr as *const crate::object::object::JSObject) };
1142 let on_close = ws_obj
1143 .get(ctx.intern("onclose"))
1144 .unwrap_or(crate::value::JSValue::undefined());
1145 if on_close.is_function() {
1146 let mut evt = crate::object::object::JSObject::new();
1147 evt.set(
1148 ctx.intern("code"),
1149 crate::value::JSValue::new_int(code as i64),
1150 );
1151 evt.set(
1152 ctx.intern("reason"),
1153 crate::value::JSValue::new_string(ctx.intern(&reason)),
1154 );
1155 let ptr = Box::into_raw(Box::new(evt)) as usize;
1156 let evt_val = crate::value::JSValue::new_object(ptr);
1157 ctx.event_loop_mut()
1158 .schedule_macrotask(on_close, vec![evt_val]);
1159 }
1160}
1161
1162fn dispatch_ws_error(ctx: &mut JSContext, ws_obj_ptr: usize, error: &str) {
1163 let ws_obj = unsafe { &*(ws_obj_ptr as *const crate::object::object::JSObject) };
1164 let on_error = ws_obj
1165 .get(ctx.intern("onerror"))
1166 .unwrap_or(crate::value::JSValue::undefined());
1167 if on_error.is_function() {
1168 let err_val = crate::value::JSValue::new_string(ctx.intern(error));
1169 ctx.event_loop_mut()
1170 .schedule_macrotask(on_error, vec![err_val]);
1171 }
1172}
1173
1174fn dispatch_sse_event(
1175 ctx: &mut JSContext,
1176 es_obj_ptr: usize,
1177 event: &crate::builtins::eventsource::SseEvent,
1178) {
1179 let es_obj = unsafe { &*(es_obj_ptr as *const crate::object::object::JSObject) };
1180 let on_msg = es_obj
1181 .get(ctx.intern("onmessage"))
1182 .unwrap_or(crate::value::JSValue::undefined());
1183 if on_msg.is_function() {
1184 let mut evt = crate::object::object::JSObject::new();
1185 evt.set(
1186 ctx.intern("data"),
1187 crate::value::JSValue::new_string(ctx.intern(&event.data)),
1188 );
1189 if !event.event_type.is_empty() {
1190 evt.set(
1191 ctx.intern("event"),
1192 crate::value::JSValue::new_string(ctx.intern(&event.event_type)),
1193 );
1194 }
1195 if !event.last_event_id.is_empty() {
1196 evt.set(
1197 ctx.intern("lastEventId"),
1198 crate::value::JSValue::new_string(ctx.intern(&event.last_event_id)),
1199 );
1200 }
1201 let ptr = Box::into_raw(Box::new(evt)) as usize;
1202 let evt_val = crate::value::JSValue::new_object(ptr);
1203 ctx.event_loop_mut()
1204 .schedule_macrotask(on_msg, vec![evt_val]);
1205 }
1206}
1207
1208fn set_conn_nonblocking(conn: Connection) -> Result<Connection, String> {
1209 conn.set_nonblocking(true)?;
1210 Ok(conn)
1211}
1212
/// Splits a close-frame payload into its status code and reason.
///
/// The first two bytes are read as a big-endian code and the rest is decoded
/// lossily as UTF-8. A payload shorter than two bytes yields code 1005 with
/// an empty reason.
fn parse_close_payload(payload: &[u8]) -> (u16, String) {
    match payload {
        [hi, lo, reason_bytes @ ..] => {
            let code = u16::from_be_bytes([*hi, *lo]);
            let reason = String::from_utf8_lossy(reason_bytes).into_owned();
            (code, reason)
        }
        _ => (1005, String::new()),
    }
}
1226
1227fn calculate_consumed(frames: &[WsFrame]) -> usize {
1228 let mut total = 0usize;
1229 for frame in frames {
1230 let mut frame_size = 2;
1231 let payload_len = frame.payload.len();
1232 if payload_len >= 126 && payload_len <= 0xFFFF {
1233 frame_size += 2;
1234 } else if payload_len > 0xFFFF {
1235 frame_size += 8;
1236 }
1237 if frame.mask.is_some() {
1238 frame_size += 4;
1239 }
1240 frame_size += payload_len;
1241 total += frame_size;
1242 }
1243 total
1244}