1pub use impl_::*;
4
/// Serialization mode used on the child-process IPC pipe.
///
/// `op_node_child_ipc_pipe` uses this value to decide which stream resource
/// to create for the pipe.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ChildIpcSerialization {
  /// Parsed from the string `"json"`.
  Json,
  /// Parsed from the string `"advanced"`.
  Advanced,
}
10
11impl std::str::FromStr for ChildIpcSerialization {
12 type Err = deno_core::anyhow::Error;
13 fn from_str(s: &str) -> Result<Self, Self::Err> {
14 match s {
15 "json" => Ok(ChildIpcSerialization::Json),
16 "advanced" => Ok(ChildIpcSerialization::Advanced),
17 _ => Err(deno_core::anyhow::anyhow!(
18 "Invalid serialization type: {}",
19 s
20 )),
21 }
22 }
23}
24
/// Descriptor of the child-process IPC pipe together with the serialization
/// mode to use on it.
///
/// `op_node_child_ipc_pipe` passes field `0` as the `fd` argument when it
/// constructs the stream resource, and uses field `1` to choose between the
/// JSON and the advanced stream resource.
pub struct ChildPipeFd(pub i64, pub ChildIpcSerialization);
26
27mod impl_ {
28 use std::cell::RefCell;
29 use std::future::Future;
30 use std::io;
31 use std::rc::Rc;
32
33 use deno_core::CancelFuture;
34 use deno_core::OpState;
35 use deno_core::RcRef;
36 use deno_core::ResourceId;
37 use deno_core::ToV8;
38 use deno_core::op2;
39 use deno_core::serde;
40 use deno_core::serde::Serializer;
41 use deno_core::serde_json;
42 use deno_core::v8;
43 use deno_core::v8::ValueDeserializerHelper;
44 use deno_core::v8::ValueSerializerHelper;
45 use deno_error::JsErrorBox;
46 pub use deno_process::ipc::INITIAL_CAPACITY;
47 use deno_process::ipc::IpcAdvancedStreamError;
48 use deno_process::ipc::IpcAdvancedStreamResource;
49 use deno_process::ipc::IpcJsonStreamError;
50 pub use deno_process::ipc::IpcJsonStreamResource;
51 pub use deno_process::ipc::IpcRefTracker;
52 use serde::Serialize;
53
54 use crate::ChildPipeFd;
55 use crate::ops::ipc::ChildIpcSerialization;
56
  /// Adapter that lets a V8 value be passed to serde as a `Serialize` item.
  ///
  /// The scope is kept in a `RefCell` because `Serialize::serialize` only
  /// receives `&self`, while `serialize_v8_value` takes the scope by mutable
  /// reference.
  struct SerializeWrapper<'a, 'b, 'c>(
    RefCell<&'b mut v8::PinScope<'a, 'c>>,
    v8::Local<'a, v8::Value>,
  );
62
63 impl Serialize for SerializeWrapper<'_, '_, '_> {
64 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
65 where
66 S: Serializer,
67 {
68 serialize_v8_value(*self.0.borrow_mut(), self.1, serializer)
69 }
70 }
71
72 fn serialize_v8_value<'a, S: Serializer>(
76 scope: &mut v8::PinScope<'a, '_>,
77 value: v8::Local<'a, v8::Value>,
78 ser: S,
79 ) -> Result<S::Ok, S::Error> {
80 use serde::ser::Error;
81 if value.is_null_or_undefined() {
82 ser.serialize_unit()
83 } else if value.is_number() || value.is_number_object() {
84 let num_value = value.number_value(scope).unwrap();
85 if (num_value as i64 as f64) == num_value {
86 ser.serialize_i64(num_value as i64)
87 } else {
88 ser.serialize_f64(num_value)
89 }
90 } else if value.is_string() {
91 let str = deno_core::serde_v8::to_utf8(value.try_into().unwrap(), scope);
92 ser.serialize_str(&str)
93 } else if value.is_string_object() {
94 let str = deno_core::serde_v8::to_utf8(
95 value.to_string(scope).ok_or_else(|| {
96 S::Error::custom(deno_error::JsErrorBox::generic(
97 "toString on string object failed",
98 ))
99 })?,
100 scope,
101 );
102 ser.serialize_str(&str)
103 } else if value.is_boolean() {
104 ser.serialize_bool(value.is_true())
105 } else if value.is_boolean_object() {
106 ser.serialize_bool(value.boolean_value(scope))
107 } else if value.is_array() {
108 use serde::ser::SerializeSeq;
109 let array = value.cast::<v8::Array>();
110 let length = array.length();
111 let mut seq = ser.serialize_seq(Some(length as usize))?;
112 for i in 0..length {
113 let element = array.get_index(scope, i).unwrap();
114 seq
115 .serialize_element(&SerializeWrapper(RefCell::new(scope), element))?;
116 }
117 seq.end()
118 } else if value.is_object() {
119 use serde::ser::SerializeMap;
120 if value.is_array_buffer_view() {
121 let buffer = value.cast::<v8::ArrayBufferView>();
122 let mut buf = vec![0u8; buffer.byte_length()];
123 let copied = buffer.copy_contents(&mut buf);
124 debug_assert_eq!(copied, buf.len());
125 return ser.serialize_bytes(&buf);
126 }
127 let object = value.cast::<v8::Object>();
128 let to_json_key = v8::String::new_from_utf8(
131 scope,
132 b"toJSON",
133 v8::NewStringType::Internalized,
134 )
135 .unwrap()
136 .into();
137 if let Some(to_json) = object.get(scope, to_json_key)
138 && let Ok(to_json) = to_json.try_cast::<v8::Function>()
139 {
140 let json_value = to_json.call(scope, object.into(), &[]).unwrap();
141 return serialize_v8_value(scope, json_value, ser);
142 }
143
144 let keys = object
145 .get_own_property_names(
146 scope,
147 v8::GetPropertyNamesArgs {
148 ..Default::default()
149 },
150 )
151 .unwrap();
152 let num_keys = keys.length();
153 let mut map = ser.serialize_map(Some(num_keys as usize))?;
154 for i in 0..num_keys {
155 let key = keys.get_index(scope, i).unwrap();
156 let key_str = key.to_rust_string_lossy(scope);
157 let value = object.get(scope, key).unwrap();
158 if value.is_undefined() {
159 continue;
160 }
161 map.serialize_entry(
162 &key_str,
163 &SerializeWrapper(RefCell::new(scope), value),
164 )?;
165 }
166 map.end()
167 } else {
168 Err(S::Error::custom(JsErrorBox::type_error(format!(
170 "Unsupported type: {}",
171 value.type_repr()
172 ))))
173 }
174 }
175
176 #[op2]
178 pub fn op_node_child_ipc_pipe(
179 state: &mut OpState,
180 ) -> Result<Option<(ResourceId, u8)>, io::Error> {
181 let (fd, serialization) = match state.try_borrow_mut::<crate::ChildPipeFd>()
182 {
183 Some(ChildPipeFd(fd, serialization)) => (*fd, *serialization),
184 None => return Ok(None),
185 };
186 log::debug!("op_node_child_ipc_pipe: {:?}, {:?}", fd, serialization);
187 let ref_tracker = IpcRefTracker::new(state.external_ops_tracker.clone());
188 match serialization {
189 ChildIpcSerialization::Json => {
190 match IpcJsonStreamResource::new(fd, ref_tracker) {
191 Ok(resource) => Ok(Some((state.resource_table.add(resource), 0))),
192 Err(err) => {
193 log::error!(
194 "Failed to open IPC channel from NODE_CHANNEL_FD ({fd}): {err}"
195 );
196 std::process::exit(1);
197 }
198 }
199 }
200 ChildIpcSerialization::Advanced => {
201 match IpcAdvancedStreamResource::new(fd, ref_tracker) {
202 Ok(resource) => Ok(Some((state.resource_table.add(resource), 1))),
203 Err(err) => {
204 log::error!(
205 "Failed to open IPC channel from NODE_CHANNEL_FD ({fd}): {err}"
206 );
207 std::process::exit(1);
208 }
209 }
210 }
211 }
212 }
213
  /// Errors returned by the IPC ops in this module.
  #[derive(Debug, thiserror::Error, deno_error::JsError)]
  pub enum IpcError {
    /// A resource-table lookup failed.
    #[class(inherit)]
    #[error(transparent)]
    Resource(#[from] deno_core::error::ResourceError),
    /// Error reported by the advanced-serialization stream.
    #[class(inherit)]
    #[error(transparent)]
    IpcAdvancedStream(#[from] IpcAdvancedStreamError),
    /// Error reported by the JSON stream.
    #[class(inherit)]
    #[error(transparent)]
    IpcJsonStream(#[from] IpcJsonStreamError),
    /// The pending operation was canceled.
    #[class(inherit)]
    #[error(transparent)]
    Canceled(#[from] deno_core::Canceled),
    /// An outgoing value could not be serialized to JSON.
    #[class(inherit)]
    #[error("failed to serialize json value: {0}")]
    SerdeJson(serde_json::Error),
    /// The header of an incoming advanced message could not be read.
    #[class(type)]
    #[error("Failed to read header")]
    ReadHeaderFailed,
    /// The value of an incoming advanced message could not be read.
    #[class(type)]
    #[error("Failed to read value")]
    ReadValueFailed,
  }
238
239 #[op2]
240 pub fn op_node_ipc_write_json<'a>(
241 scope: &mut v8::PinScope<'a, '_>,
242 state: Rc<RefCell<OpState>>,
243 #[smi] rid: ResourceId,
244 value: v8::Local<'a, v8::Value>,
245 queue_ok: v8::Local<'a, v8::Array>,
251 ) -> Result<impl Future<Output = Result<(), io::Error>> + use<>, IpcError> {
252 let mut serialized = Vec::with_capacity(64);
253 let mut ser = serde_json::Serializer::new(&mut serialized);
254 serialize_v8_value(scope, value, &mut ser).map_err(IpcError::SerdeJson)?;
255 serialized.push(b'\n');
256
257 let stream = state
258 .borrow()
259 .resource_table
260 .get::<IpcJsonStreamResource>(rid)?;
261 let old = stream
262 .queued_bytes
263 .fetch_add(serialized.len(), std::sync::atomic::Ordering::Relaxed);
264 if old + serialized.len() > 2 * INITIAL_CAPACITY {
265 let v = false.to_v8(scope).unwrap(); queue_ok.set_index(scope, 0, v);
268 }
269 Ok(async move {
270 let cancel = stream.cancel.clone();
271 let result = stream
272 .clone()
273 .write_msg_bytes(&serialized)
274 .or_cancel(cancel)
275 .await;
276 stream
278 .queued_bytes
279 .fetch_sub(serialized.len(), std::sync::atomic::Ordering::Relaxed);
280 result??;
281 Ok(())
282 })
283 }
284
  /// `v8::ValueSerializerImpl` delegate used for advanced IPC serialization.
  pub struct AdvancedSerializerDelegate {
    /// V8 handles passed to `ab_view_to_index` when classifying views.
    constants: AdvancedIpcConstants,
  }
288
289 impl AdvancedSerializerDelegate {
290 fn new(constants: AdvancedIpcConstants) -> Self {
291 Self { constants }
292 }
293 }
294
  /// Host-object tag written before an array buffer view; it is followed by
  /// the view type index, the byte length and the raw bytes.
  const ARRAY_BUFFER_VIEW_TAG: u32 = 0;
  /// Host-object tag written before any other object, which is then encoded
  /// through the serializer's `write_value`.
  const NOT_ARRAY_BUFFER_VIEW_TAG: u32 = 1;
297
298 fn ab_view_to_index<'s>(
299 scope: &mut v8::PinScope<'s, '_>,
300 view: v8::Local<'s, v8::ArrayBufferView>,
301 constants: &AdvancedIpcConstants,
302 ) -> Option<u32> {
303 if view.is_int8_array() {
304 Some(0)
305 } else if view.is_uint8_array() {
306 let constructor = view
307 .get(
308 scope,
309 v8::Local::new(scope, &constants.inner.constructor_key).into(),
310 )
311 .unwrap();
312 let buffer_constructor = v8::Local::<v8::Value>::from(v8::Local::new(
313 scope,
314 &constants.inner.buffer_constructor,
315 ));
316 if constructor == buffer_constructor {
317 Some(10)
318 } else {
319 Some(1)
320 }
321 } else if view.is_uint8_clamped_array() {
322 Some(2)
323 } else if view.is_int16_array() {
324 Some(3)
325 } else if view.is_uint16_array() {
326 Some(4)
327 } else if view.is_int32_array() {
328 Some(5)
329 } else if view.is_uint32_array() {
330 Some(6)
331 } else if view.is_float32_array() {
332 Some(7)
333 } else if view.is_float64_array() {
334 Some(8)
335 } else if view.is_data_view() {
336 Some(9)
337 } else if view.is_big_int64_array() {
338 Some(11)
339 } else if view.is_big_uint64_array() {
340 Some(12)
341 } else if view.is_float16_array() {
342 Some(13)
343 } else {
344 None
345 }
346 }
347
348 impl v8::ValueSerializerImpl for AdvancedSerializerDelegate {
349 fn throw_data_clone_error<'s>(
350 &self,
351 scope: &mut v8::PinScope<'s, '_>,
352 message: v8::Local<'s, v8::String>,
353 ) {
354 let error = v8::Exception::type_error(scope, message);
355 scope.throw_exception(error);
356 }
357
358 fn has_custom_host_object(&self, _isolate: &v8::Isolate) -> bool {
359 false
360 }
361
362 fn write_host_object<'s>(
363 &self,
364 scope: &mut v8::PinScope<'s, '_>,
365 object: v8::Local<'s, v8::Object>,
366 value_serializer: &dyn v8::ValueSerializerHelper,
367 ) -> Option<bool> {
368 if object.is_array_buffer_view() {
369 let ab_view = object.cast::<v8::ArrayBufferView>();
370 value_serializer.write_uint32(ARRAY_BUFFER_VIEW_TAG);
371 let Some(index) = ab_view_to_index(scope, ab_view, &self.constants)
372 else {
373 scope.throw_exception(v8::Exception::type_error(
374 scope,
375 v8::String::new_from_utf8(
376 scope,
377 format!("Unserializable host object: {}", object.type_repr())
378 .as_bytes(),
379 v8::NewStringType::Normal,
380 )
381 .unwrap(),
382 ));
383 return None;
384 };
385 value_serializer.write_uint32(index);
386 value_serializer.write_uint32(ab_view.byte_length() as u32);
387 let mut storage = [0u8; v8::TYPED_ARRAY_MAX_SIZE_IN_HEAP];
388 let slice = ab_view.get_contents(&mut storage);
389 value_serializer.write_raw_bytes(slice);
390 Some(true)
391 } else {
392 value_serializer.write_uint32(NOT_ARRAY_BUFFER_VIEW_TAG);
393 value_serializer
394 .write_value(scope.get_current_context(), object.into());
395 Some(true)
396 }
397 }
398
399 fn get_shared_array_buffer_id<'s>(
400 &self,
401 _scope: &mut v8::PinScope<'s, '_>,
402 _shared_array_buffer: v8::Local<'s, v8::SharedArrayBuffer>,
403 ) -> Option<u32> {
404 None
405 }
406 }
407
  /// Reference-counted handle to the V8 globals used by advanced IPC
  /// serialization and deserialization; cloning only clones the `Rc`.
  #[derive(Clone)]
  struct AdvancedIpcConstants {
    inner: Rc<AdvancedIpcConstantsInner>,
  }
  /// V8 globals registered through `op_node_ipc_buffer_constructor`.
  struct AdvancedIpcConstantsInner {
    /// Compared with a `Uint8Array`'s `constructor` in `ab_view_to_index`.
    buffer_constructor: v8::Global<v8::Function>,
    /// The internalized string `"constructor"`, used as the lookup key.
    constructor_key: v8::Global<v8::String>,
    /// Prototype installed on views deserialized with type index 10.
    fast_buffer_prototype: v8::Global<v8::Object>,
  }
417
418 #[op2(fast)]
419 pub fn op_node_ipc_buffer_constructor(
420 scope: &mut v8::PinScope<'_, '_>,
421 state: &mut OpState,
422 buffer_constructor: v8::Local<'_, v8::Function>,
423 fast_buffer_prototype: v8::Local<'_, v8::Object>,
424 ) {
425 if state.has::<AdvancedIpcConstants>() {
426 return;
427 }
428 let constants = AdvancedIpcConstants {
429 inner: Rc::new(AdvancedIpcConstantsInner {
430 buffer_constructor: v8::Global::new(scope, buffer_constructor),
431 constructor_key: v8::Global::new(
432 scope,
433 v8::String::new_from_utf8(
434 scope,
435 b"constructor",
436 v8::NewStringType::Internalized,
437 )
438 .unwrap(),
439 ),
440 fast_buffer_prototype: v8::Global::new(scope, fast_buffer_prototype),
441 }),
442 };
443 state.put(constants);
444 }
445
446 #[op2]
447 pub fn op_node_ipc_write_advanced<'a>(
448 scope: &mut v8::PinScope<'a, '_>,
449 state: Rc<RefCell<OpState>>,
450 #[smi] rid: ResourceId,
451 value: v8::Local<'a, v8::Value>,
452 queue_ok: v8::Local<'a, v8::Array>,
458 ) -> Result<impl Future<Output = Result<(), io::Error>> + use<>, IpcError> {
459 let constants = state.borrow().borrow::<AdvancedIpcConstants>().clone();
460 let serializer = AdvancedSerializer::new(scope, constants);
461 let serialized = serializer.serialize(scope, value)?;
462
463 let stream = state
464 .borrow()
465 .resource_table
466 .get::<IpcAdvancedStreamResource>(rid)?;
467 let old = stream
468 .queued_bytes
469 .fetch_add(serialized.len(), std::sync::atomic::Ordering::Relaxed);
470 if old + serialized.len() > 2 * INITIAL_CAPACITY {
471 let Ok(v) = false.to_v8(scope);
473 queue_ok.set_index(scope, 0, v);
474 }
475 Ok(async move {
476 let cancel = stream.cancel.clone();
477 let result = stream
478 .clone()
479 .write_msg_bytes(&serialized)
480 .or_cancel(cancel)
481 .await;
482 stream
484 .queued_bytes
485 .fetch_sub(serialized.len(), std::sync::atomic::Ordering::Relaxed);
486 result??;
487 Ok(())
488 })
489 }
490
  /// Wrapper around a `v8::ValueSerializer` that produces length-prefixed
  /// advanced IPC messages.
  struct AdvancedSerializer {
    inner: v8::ValueSerializer<'static>,
  }
494
495 impl AdvancedSerializer {
496 fn new(
497 scope: &mut v8::PinScope<'_, '_>,
498 constants: AdvancedIpcConstants,
499 ) -> Self {
500 let inner = v8::ValueSerializer::new(
501 scope,
502 Box::new(AdvancedSerializerDelegate::new(constants)),
503 );
504 inner.set_treat_array_buffer_views_as_host_objects(true);
505 Self { inner }
506 }
507
508 fn serialize<'s, 'i>(
509 &self,
510 scope: &mut v8::PinScope<'s, 'i>,
511 value: v8::Local<'s, v8::Value>,
512 ) -> Result<Vec<u8>, IpcError> {
513 self.inner.write_raw_bytes(&[0, 0, 0, 0]);
514 self.inner.write_header();
515 let context = scope.get_current_context();
516 self.inner.write_value(context, value);
517 let mut ser = self.inner.release();
518 let length = ser.len() - 4;
519 ser[0] = ((length >> 24) & 0xFF) as u8;
520 ser[1] = ((length >> 16) & 0xFF) as u8;
521 ser[2] = ((length >> 8) & 0xFF) as u8;
522 ser[3] = (length & 0xFF) as u8;
523 Ok(ser)
524 }
525 }
526
  /// Wrapper around a `v8::ValueDeserializer` configured with an
  /// `AdvancedIpcDeserializerDelegate`.
  struct AdvancedIpcDeserializer {
    inner: v8::ValueDeserializer<'static>,
  }
530
  /// `v8::ValueDeserializerImpl` delegate that rebuilds host objects from
  /// advanced IPC messages.
  struct AdvancedIpcDeserializerDelegate {
    /// Supplies the prototype installed on views with type index 10.
    constants: AdvancedIpcConstants,
  }
534
535 impl v8::ValueDeserializerImpl for AdvancedIpcDeserializerDelegate {
536 fn read_host_object<'s>(
537 &self,
538 scope: &mut v8::PinScope<'s, '_>,
539 deser: &dyn ValueDeserializerHelper,
540 ) -> Option<v8::Local<'s, v8::Object>> {
541 let throw_error = |message: &str| {
542 scope.throw_exception(v8::Exception::type_error(
543 scope,
544 v8::String::new_from_utf8(
545 scope,
546 message.as_bytes(),
547 v8::NewStringType::Normal,
548 )
549 .unwrap(),
550 ));
551 None
552 };
553 let mut tag = 0;
554 if !deser.read_uint32(&mut tag) {
555 return throw_error("Failed to read tag");
556 }
557 match tag {
558 ARRAY_BUFFER_VIEW_TAG => {
559 let mut index = 0;
560 if !deser.read_uint32(&mut index) {
561 return throw_error("Failed to read array buffer view type tag");
562 }
563 let mut byte_length = 0;
564 if !deser.read_uint32(&mut byte_length) {
565 return throw_error("Failed to read byte length");
566 }
567 let Some(buf) = deser.read_raw_bytes(byte_length as usize) else {
568 return throw_error("failed to read bytes for typed array");
569 };
570
571 let array_buffer = v8::ArrayBuffer::new(scope, byte_length as usize);
572 unsafe {
575 std::ptr::copy(
576 buf.as_ptr(),
577 array_buffer.data().unwrap().as_ptr().cast::<u8>(),
578 byte_length as usize,
579 );
580 }
581
582 let value = match index {
583 0 => {
584 v8::Int8Array::new(scope, array_buffer, 0, byte_length as usize)
585 .unwrap()
586 .into()
587 }
588 1 => {
589 v8::Uint8Array::new(scope, array_buffer, 0, byte_length as usize)
590 .unwrap()
591 .into()
592 }
593 10 => {
594 let obj: v8::Local<v8::Object> = v8::Uint8Array::new(
595 scope,
596 array_buffer,
597 0,
598 byte_length as usize,
599 )?
600 .into();
601 let fast_proto = v8::Local::new(
602 scope,
603 &self.constants.inner.fast_buffer_prototype,
604 );
605 obj.set_prototype(scope, fast_proto.into());
606 obj
607 }
608 2 => v8::Uint8ClampedArray::new(
609 scope,
610 array_buffer,
611 0,
612 byte_length as usize,
613 )?
614 .into(),
615 3 => v8::Int16Array::new(
616 scope,
617 array_buffer,
618 0,
619 byte_length as usize / 2,
620 )?
621 .into(),
622 4 => v8::Uint16Array::new(
623 scope,
624 array_buffer,
625 0,
626 byte_length as usize / 2,
627 )?
628 .into(),
629 5 => v8::Int32Array::new(
630 scope,
631 array_buffer,
632 0,
633 byte_length as usize / 4,
634 )?
635 .into(),
636 6 => v8::Uint32Array::new(
637 scope,
638 array_buffer,
639 0,
640 byte_length as usize / 4,
641 )?
642 .into(),
643 7 => v8::Float32Array::new(
644 scope,
645 array_buffer,
646 0,
647 byte_length as usize / 4,
648 )
649 .unwrap()
650 .into(),
651 8 => v8::Float64Array::new(
652 scope,
653 array_buffer,
654 0,
655 byte_length as usize / 8,
656 )?
657 .into(),
658 9 => {
659 v8::DataView::new(scope, array_buffer, 0, byte_length as usize)
660 .into()
661 }
662 11 => v8::BigInt64Array::new(
663 scope,
664 array_buffer,
665 0,
666 byte_length as usize / 8,
667 )?
668 .into(),
669 12 => v8::BigUint64Array::new(
670 scope,
671 array_buffer,
672 0,
673 byte_length as usize / 8,
674 )?
675 .into(),
676 13 => unsafe {
680 std::mem::transmute::<
681 v8::Local<v8::Float16Array>,
682 v8::Local<v8::Object>,
683 >(v8::Float16Array::new(
684 scope,
685 array_buffer,
686 0,
687 byte_length as usize / 2,
688 )?)
689 },
690 _ => return None,
691 };
692 Some(value)
693 }
694 NOT_ARRAY_BUFFER_VIEW_TAG => {
695 let value = deser.read_value(scope.get_current_context());
696 Some(value.unwrap_or_else(|| v8::null(scope).into()).cast())
697 }
698 _ => {
699 throw_error(&format!("Invalid tag: {}", tag));
700 None
701 }
702 }
703 }
704 }
705
706 impl AdvancedIpcDeserializer {
707 fn new(
708 scope: &mut v8::PinScope<'_, '_>,
709 constants: AdvancedIpcConstants,
710 msg_bytes: &[u8],
711 ) -> Self {
712 let inner = v8::ValueDeserializer::new(
713 scope,
714 Box::new(AdvancedIpcDeserializerDelegate { constants }),
715 msg_bytes,
716 );
717 Self { inner }
718 }
719 }
720
  /// Result of `op_node_ipc_read_advanced`, converted to a V8 value by its
  /// `ToV8` implementation.
  struct AdvancedIpcReadResult {
    /// Raw message bytes; `None` is converted into the stop sentinel.
    msg_bytes: Option<Vec<u8>>,
    /// V8 handles handed to the deserializer.
    constants: AdvancedIpcConstants,
  }
725
726 fn make_stop_sentinel<'s>(
727 scope: &mut v8::PinScope<'s, '_>,
728 ) -> v8::Local<'s, v8::Value> {
729 let obj = v8::Object::new(scope);
730 obj.set(
731 scope,
732 v8::String::new_from_utf8(scope, b"cmd", v8::NewStringType::Internalized)
733 .unwrap()
734 .into(),
735 v8::String::new_from_utf8(
736 scope,
737 b"NODE_CLOSE",
738 v8::NewStringType::Internalized,
739 )
740 .unwrap()
741 .into(),
742 );
743 obj.into()
744 }
745
746 impl<'a> deno_core::ToV8<'a> for AdvancedIpcReadResult {
747 type Error = IpcError;
748 fn to_v8(
749 self,
750 scope: &mut v8::PinScope<'a, '_>,
751 ) -> Result<v8::Local<'a, v8::Value>, Self::Error> {
752 let Some(msg_bytes) = self.msg_bytes else {
753 return Ok(make_stop_sentinel(scope));
754 };
755 let deser =
756 AdvancedIpcDeserializer::new(scope, self.constants, &msg_bytes);
757 let context = scope.get_current_context();
758 let header_success = deser.inner.read_header(context).unwrap_or(false);
759 if !header_success {
760 return Err(IpcError::ReadHeaderFailed);
761 }
762 let Some(value) = deser.inner.read_value(context) else {
763 return Err(IpcError::ReadValueFailed);
764 };
765 Ok(value)
766 }
767 }
768
769 #[op2]
770 pub async fn op_node_ipc_read_advanced(
771 state: Rc<RefCell<OpState>>,
772 #[smi] rid: ResourceId,
773 ) -> Result<AdvancedIpcReadResult, IpcError> {
774 let stream = state
775 .borrow()
776 .resource_table
777 .get::<IpcAdvancedStreamResource>(rid)?;
778 let cancel = stream.cancel.clone();
779 let mut stream = RcRef::map(stream, |r| &r.read_half).borrow_mut().await;
780 let msg_bytes = stream.read_msg_bytes().or_cancel(cancel).await??;
781
782 Ok(AdvancedIpcReadResult {
783 msg_bytes,
784 constants: state.borrow().borrow::<AdvancedIpcConstants>().clone(),
785 })
786 }
787
  /// Returns the JSON value `{"cmd": "NODE_CLOSE"}`, which
  /// `op_node_ipc_read_json` yields when a read produces no message.
  fn stop_sentinel() -> serde_json::Value {
    serde_json::json!({
      "cmd": "NODE_CLOSE"
    })
  }
797
798 #[op2]
799 #[serde]
800 pub async fn op_node_ipc_read_json(
801 state: Rc<RefCell<OpState>>,
802 #[smi] rid: ResourceId,
803 ) -> Result<serde_json::Value, IpcError> {
804 let stream = state
805 .borrow()
806 .resource_table
807 .get::<IpcJsonStreamResource>(rid)?;
808
809 let cancel = stream.cancel.clone();
810 let mut stream = RcRef::map(stream, |r| &r.read_half).borrow_mut().await;
811 let msgs = stream.read_msg().or_cancel(cancel).await??;
812 if let Some(msg) = msgs {
813 Ok(msg)
814 } else {
815 Ok(stop_sentinel())
816 }
817 }
818
819 #[op2(fast)]
820 pub fn op_node_ipc_ref(
821 state: &mut OpState,
822 #[smi] rid: ResourceId,
823 serialization_json: bool,
824 ) {
825 if serialization_json {
826 let stream = state
827 .resource_table
828 .get::<IpcJsonStreamResource>(rid)
829 .expect("Invalid resource ID");
830 stream.ref_tracker.ref_();
831 } else {
832 let stream = state
833 .resource_table
834 .get::<IpcAdvancedStreamResource>(rid)
835 .expect("Invalid resource ID");
836 stream.ref_tracker.ref_();
837 }
838 }
839
840 #[op2(fast)]
841 pub fn op_node_ipc_unref(
842 state: &mut OpState,
843 #[smi] rid: ResourceId,
844 serialization_json: bool,
845 ) {
846 if serialization_json {
847 let stream = state
848 .resource_table
849 .get::<IpcJsonStreamResource>(rid)
850 .expect("Invalid resource ID");
851 stream.ref_tracker.unref();
852 } else {
853 let stream = state
854 .resource_table
855 .get::<IpcAdvancedStreamResource>(rid)
856 .expect("Invalid resource ID");
857 stream.ref_tracker.unref();
858 }
859 }
860
861 #[cfg(test)]
862 mod tests {
863 use deno_core::JsRuntime;
864 use deno_core::RuntimeOptions;
865 use deno_core::v8;
866
867 fn wrap_expr(s: &str) -> String {
868 format!("(function () {{ return {s}; }})()")
869 }
870
871 fn serialize_js_to_json(runtime: &mut JsRuntime, js: String) -> String {
872 let val = runtime.execute_script("", js).unwrap();
873 deno_core::scope!(scope, runtime);
874 let val = v8::Local::new(scope, val);
875 let mut buf = Vec::new();
876 let mut ser = deno_core::serde_json::Serializer::new(&mut buf);
877 super::serialize_v8_value(scope, val, &mut ser).unwrap();
878 String::from_utf8(buf).unwrap()
879 }
880
881 #[test]
882 fn ipc_serialization() {
883 let mut runtime = JsRuntime::new(RuntimeOptions::default());
884
885 let cases = [
886 ("'hello'", "\"hello\""),
887 ("1", "1"),
888 ("1.5", "1.5"),
889 ("Number.NaN", "null"),
890 ("Infinity", "null"),
891 ("Number.MAX_SAFE_INTEGER", &(2i64.pow(53) - 1).to_string()),
892 (
893 "Number.MIN_SAFE_INTEGER",
894 &(-(2i64.pow(53) - 1)).to_string(),
895 ),
896 ("[1, 2, 3]", "[1,2,3]"),
897 ("new Uint8Array([1,2,3])", "[1,2,3]"),
898 (
899 "{ a: 1.5, b: { c: new ArrayBuffer(5) }}",
900 r#"{"a":1.5,"b":{"c":{}}}"#,
901 ),
902 ("new Number(1)", "1"),
903 ("new Boolean(true)", "true"),
904 ("true", "true"),
905 (r#"new String("foo")"#, "\"foo\""),
906 ("null", "null"),
907 (
908 r#"{ a: "field", toJSON() { return "custom"; } }"#,
909 "\"custom\"",
910 ),
911 (r#"{ a: undefined, b: 1 }"#, "{\"b\":1}"),
912 ];
913
914 for (input, expect) in cases {
915 let js = wrap_expr(input);
916 let actual = serialize_js_to_json(&mut runtime, js);
917 assert_eq!(actual, expect);
918 }
919 }
920 }
921}