Skip to main content

deno_node/ops/
ipc.rs

1// Copyright 2018-2026 the Deno authors. MIT license.
2
3pub use impl_::*;
4
/// Serialization mode used for messages on the child-process IPC channel.
///
/// Parsed from the strings `"json"` and `"advanced"` through the
/// `std::str::FromStr` implementation on this type.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ChildIpcSerialization {
  /// Selected by the string `"json"`.
  Json,
  /// Selected by the string `"advanced"`.
  Advanced,
}
10
11impl std::str::FromStr for ChildIpcSerialization {
12  type Err = deno_core::anyhow::Error;
13  fn from_str(s: &str) -> Result<Self, Self::Err> {
14    match s {
15      "json" => Ok(ChildIpcSerialization::Json),
16      "advanced" => Ok(ChildIpcSerialization::Advanced),
17      _ => Err(deno_core::anyhow::anyhow!(
18        "Invalid serialization type: {}",
19        s
20      )),
21    }
22  }
23}
24
/// IPC pipe description stored in the op state and consumed by
/// `op_node_child_ipc_pipe`: field `0` is the descriptor value handed to the
/// IPC stream resource constructors, field `1` is the serialization mode to
/// use on that pipe.
pub struct ChildPipeFd(pub i64, pub ChildIpcSerialization);
26
27mod impl_ {
28  use std::cell::RefCell;
29  use std::future::Future;
30  use std::io;
31  use std::rc::Rc;
32
33  use deno_core::CancelFuture;
34  use deno_core::OpState;
35  use deno_core::RcRef;
36  use deno_core::ResourceId;
37  use deno_core::ToV8;
38  use deno_core::op2;
39  use deno_core::serde;
40  use deno_core::serde::Serializer;
41  use deno_core::serde_json;
42  use deno_core::v8;
43  use deno_core::v8::ValueDeserializerHelper;
44  use deno_core::v8::ValueSerializerHelper;
45  use deno_error::JsErrorBox;
46  pub use deno_process::ipc::INITIAL_CAPACITY;
47  use deno_process::ipc::IpcAdvancedStreamError;
48  use deno_process::ipc::IpcAdvancedStreamResource;
49  use deno_process::ipc::IpcJsonStreamError;
50  pub use deno_process::ipc::IpcJsonStreamResource;
51  pub use deno_process::ipc::IpcRefTracker;
52  use serde::Serialize;
53
54  use crate::ChildPipeFd;
55  use crate::ops::ipc::ChildIpcSerialization;
56
  /// Wrapper around v8 value that implements Serialize.
  ///
  /// Field `0` is the scope needed to inspect the value, field `1` is the
  /// value itself. The scope is kept in a `RefCell` because
  /// `Serialize::serialize` only receives `&self`, while `serialize_v8_value`
  /// requires a `&mut` scope.
  struct SerializeWrapper<'a, 'b, 'c>(
    RefCell<&'b mut v8::PinScope<'a, 'c>>,
    v8::Local<'a, v8::Value>,
  );

  impl Serialize for SerializeWrapper<'_, '_, '_> {
    /// Serializes the wrapped value by delegating to `serialize_v8_value`.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
      S: Serializer,
    {
      // Take the mutable scope out of the `RefCell` for the duration of the
      // call; `borrow_mut` panics if the cell is already borrowed.
      serialize_v8_value(*self.0.borrow_mut(), self.1, serializer)
    }
  }
71
72  /// Serialize a v8 value directly into a serde serializer.
73  /// This allows us to go from v8 values to JSON without having to
74  /// deserialize into a `serde_json::Value` and then reserialize to JSON
75  fn serialize_v8_value<'a, S: Serializer>(
76    scope: &mut v8::PinScope<'a, '_>,
77    value: v8::Local<'a, v8::Value>,
78    ser: S,
79  ) -> Result<S::Ok, S::Error> {
80    use serde::ser::Error;
81    if value.is_null_or_undefined() {
82      ser.serialize_unit()
83    } else if value.is_number() || value.is_number_object() {
84      let num_value = value.number_value(scope).unwrap();
85      if (num_value as i64 as f64) == num_value {
86        ser.serialize_i64(num_value as i64)
87      } else {
88        ser.serialize_f64(num_value)
89      }
90    } else if value.is_string() {
91      let str = deno_core::serde_v8::to_utf8(value.try_into().unwrap(), scope);
92      ser.serialize_str(&str)
93    } else if value.is_string_object() {
94      let str = deno_core::serde_v8::to_utf8(
95        value.to_string(scope).ok_or_else(|| {
96          S::Error::custom(deno_error::JsErrorBox::generic(
97            "toString on string object failed",
98          ))
99        })?,
100        scope,
101      );
102      ser.serialize_str(&str)
103    } else if value.is_boolean() {
104      ser.serialize_bool(value.is_true())
105    } else if value.is_boolean_object() {
106      ser.serialize_bool(value.boolean_value(scope))
107    } else if value.is_array() {
108      use serde::ser::SerializeSeq;
109      let array = value.cast::<v8::Array>();
110      let length = array.length();
111      let mut seq = ser.serialize_seq(Some(length as usize))?;
112      for i in 0..length {
113        let element = array.get_index(scope, i).unwrap();
114        seq
115          .serialize_element(&SerializeWrapper(RefCell::new(scope), element))?;
116      }
117      seq.end()
118    } else if value.is_object() {
119      use serde::ser::SerializeMap;
120      if value.is_array_buffer_view() {
121        let buffer = value.cast::<v8::ArrayBufferView>();
122        let mut buf = vec![0u8; buffer.byte_length()];
123        let copied = buffer.copy_contents(&mut buf);
124        debug_assert_eq!(copied, buf.len());
125        return ser.serialize_bytes(&buf);
126      }
127      let object = value.cast::<v8::Object>();
128      // node uses `JSON.stringify`, so to match its behavior (and allow serializing custom objects)
129      // we need to respect the `toJSON` method if it exists.
130      let to_json_key = v8::String::new_from_utf8(
131        scope,
132        b"toJSON",
133        v8::NewStringType::Internalized,
134      )
135      .unwrap()
136      .into();
137      if let Some(to_json) = object.get(scope, to_json_key)
138        && let Ok(to_json) = to_json.try_cast::<v8::Function>()
139      {
140        let json_value = to_json.call(scope, object.into(), &[]).unwrap();
141        return serialize_v8_value(scope, json_value, ser);
142      }
143
144      let keys = object
145        .get_own_property_names(
146          scope,
147          v8::GetPropertyNamesArgs {
148            ..Default::default()
149          },
150        )
151        .unwrap();
152      let num_keys = keys.length();
153      let mut map = ser.serialize_map(Some(num_keys as usize))?;
154      for i in 0..num_keys {
155        let key = keys.get_index(scope, i).unwrap();
156        let key_str = key.to_rust_string_lossy(scope);
157        let value = object.get(scope, key).unwrap();
158        if value.is_undefined() {
159          continue;
160        }
161        map.serialize_entry(
162          &key_str,
163          &SerializeWrapper(RefCell::new(scope), value),
164        )?;
165      }
166      map.end()
167    } else {
168      // TODO(nathanwhit): better error message
169      Err(S::Error::custom(JsErrorBox::type_error(format!(
170        "Unsupported type: {}",
171        value.type_repr()
172      ))))
173    }
174  }
175
176  // Open IPC pipe from bootstrap options.
177  #[op2]
178  pub fn op_node_child_ipc_pipe(
179    state: &mut OpState,
180  ) -> Result<Option<(ResourceId, u8)>, io::Error> {
181    let (fd, serialization) = match state.try_borrow_mut::<crate::ChildPipeFd>()
182    {
183      Some(ChildPipeFd(fd, serialization)) => (*fd, *serialization),
184      None => return Ok(None),
185    };
186    log::debug!("op_node_child_ipc_pipe: {:?}, {:?}", fd, serialization);
187    let ref_tracker = IpcRefTracker::new(state.external_ops_tracker.clone());
188    match serialization {
189      ChildIpcSerialization::Json => {
190        match IpcJsonStreamResource::new(fd, ref_tracker) {
191          Ok(resource) => Ok(Some((state.resource_table.add(resource), 0))),
192          Err(err) => {
193            log::error!(
194              "Failed to open IPC channel from NODE_CHANNEL_FD ({fd}): {err}"
195            );
196            std::process::exit(1);
197          }
198        }
199      }
200      ChildIpcSerialization::Advanced => {
201        match IpcAdvancedStreamResource::new(fd, ref_tracker) {
202          Ok(resource) => Ok(Some((state.resource_table.add(resource), 1))),
203          Err(err) => {
204            log::error!(
205              "Failed to open IPC channel from NODE_CHANNEL_FD ({fd}): {err}"
206            );
207            std::process::exit(1);
208          }
209        }
210      }
211    }
212  }
213
  /// Errors surfaced to JavaScript by the IPC ops in this module.
  ///
  /// Variants marked `#[class(inherit)]` take their JS error class from the
  /// wrapped error; the remaining variants are reported with the `type` class.
  #[derive(Debug, thiserror::Error, deno_error::JsError)]
  pub enum IpcError {
    /// Resource table lookup failed.
    #[class(inherit)]
    #[error(transparent)]
    Resource(#[from] deno_core::error::ResourceError),
    /// Error reported by the advanced-serialization IPC stream.
    #[class(inherit)]
    #[error(transparent)]
    IpcAdvancedStream(#[from] IpcAdvancedStreamError),
    /// Error reported by the JSON IPC stream.
    #[class(inherit)]
    #[error(transparent)]
    IpcJsonStream(#[from] IpcJsonStreamError),
    /// The pending operation was canceled.
    #[class(inherit)]
    #[error(transparent)]
    Canceled(#[from] deno_core::Canceled),
    /// Serializing an outgoing value to JSON failed.
    #[class(inherit)]
    #[error("failed to serialize json value: {0}")]
    SerdeJson(serde_json::Error),
    /// The header of an incoming advanced-serialization message could not be
    /// read.
    #[class(type)]
    #[error("Failed to read header")]
    ReadHeaderFailed,
    /// The payload of an incoming advanced-serialization message could not be
    /// deserialized.
    #[class(type)]
    #[error("Failed to read value")]
    ReadValueFailed,
  }
238
239  #[op2]
240  pub fn op_node_ipc_write_json<'a>(
241    scope: &mut v8::PinScope<'a, '_>,
242    state: Rc<RefCell<OpState>>,
243    #[smi] rid: ResourceId,
244    value: v8::Local<'a, v8::Value>,
245    // using an array as an "out parameter".
246    // index 0 is a boolean indicating whether the queue is under the limit.
247    //
248    // ideally we would just return `Result<(impl Future, bool), ..>`, but that's not
249    // supported by `op2` currently.
250    queue_ok: v8::Local<'a, v8::Array>,
251  ) -> Result<impl Future<Output = Result<(), io::Error>> + use<>, IpcError> {
252    let mut serialized = Vec::with_capacity(64);
253    let mut ser = serde_json::Serializer::new(&mut serialized);
254    serialize_v8_value(scope, value, &mut ser).map_err(IpcError::SerdeJson)?;
255    serialized.push(b'\n');
256
257    let stream = state
258      .borrow()
259      .resource_table
260      .get::<IpcJsonStreamResource>(rid)?;
261    let old = stream
262      .queued_bytes
263      .fetch_add(serialized.len(), std::sync::atomic::Ordering::Relaxed);
264    if old + serialized.len() > 2 * INITIAL_CAPACITY {
265      // sending messages too fast
266      let v = false.to_v8(scope).unwrap(); // Infallible
267      queue_ok.set_index(scope, 0, v);
268    }
269    Ok(async move {
270      let cancel = stream.cancel.clone();
271      let result = stream
272        .clone()
273        .write_msg_bytes(&serialized)
274        .or_cancel(cancel)
275        .await;
276      // adjust count even on error
277      stream
278        .queued_bytes
279        .fetch_sub(serialized.len(), std::sync::atomic::Ordering::Relaxed);
280      result??;
281      Ok(())
282    })
283  }
284
  /// Delegate handed to `v8::ValueSerializer` for the advanced IPC format.
  ///
  /// Holds the constants used to tell Node `Buffer` instances apart from
  /// plain `Uint8Array`s while serializing array buffer views.
  pub struct AdvancedSerializerDelegate {
    constants: AdvancedIpcConstants,
  }

  impl AdvancedSerializerDelegate {
    /// Creates a delegate that uses `constants` for its lookups.
    fn new(constants: AdvancedIpcConstants) -> Self {
      Self { constants }
    }
  }
294
  /// Host-object tag written before a serialized array buffer view
  /// (followed by its type index, byte length and raw bytes).
  const ARRAY_BUFFER_VIEW_TAG: u32 = 0;
  /// Host-object tag written before any other host object, which is then
  /// serialized as a regular value.
  const NOT_ARRAY_BUFFER_VIEW_TAG: u32 = 1;
297
298  fn ab_view_to_index<'s>(
299    scope: &mut v8::PinScope<'s, '_>,
300    view: v8::Local<'s, v8::ArrayBufferView>,
301    constants: &AdvancedIpcConstants,
302  ) -> Option<u32> {
303    if view.is_int8_array() {
304      Some(0)
305    } else if view.is_uint8_array() {
306      let constructor = view
307        .get(
308          scope,
309          v8::Local::new(scope, &constants.inner.constructor_key).into(),
310        )
311        .unwrap();
312      let buffer_constructor = v8::Local::<v8::Value>::from(v8::Local::new(
313        scope,
314        &constants.inner.buffer_constructor,
315      ));
316      if constructor == buffer_constructor {
317        Some(10)
318      } else {
319        Some(1)
320      }
321    } else if view.is_uint8_clamped_array() {
322      Some(2)
323    } else if view.is_int16_array() {
324      Some(3)
325    } else if view.is_uint16_array() {
326      Some(4)
327    } else if view.is_int32_array() {
328      Some(5)
329    } else if view.is_uint32_array() {
330      Some(6)
331    } else if view.is_float32_array() {
332      Some(7)
333    } else if view.is_float64_array() {
334      Some(8)
335    } else if view.is_data_view() {
336      Some(9)
337    } else if view.is_big_int64_array() {
338      Some(11)
339    } else if view.is_big_uint64_array() {
340      Some(12)
341    } else if view.is_float16_array() {
342      Some(13)
343    } else {
344      None
345    }
346  }
347
348  impl v8::ValueSerializerImpl for AdvancedSerializerDelegate {
349    fn throw_data_clone_error<'s>(
350      &self,
351      scope: &mut v8::PinScope<'s, '_>,
352      message: v8::Local<'s, v8::String>,
353    ) {
354      let error = v8::Exception::type_error(scope, message);
355      scope.throw_exception(error);
356    }
357
358    fn has_custom_host_object(&self, _isolate: &v8::Isolate) -> bool {
359      false
360    }
361
362    fn write_host_object<'s>(
363      &self,
364      scope: &mut v8::PinScope<'s, '_>,
365      object: v8::Local<'s, v8::Object>,
366      value_serializer: &dyn v8::ValueSerializerHelper,
367    ) -> Option<bool> {
368      if object.is_array_buffer_view() {
369        let ab_view = object.cast::<v8::ArrayBufferView>();
370        value_serializer.write_uint32(ARRAY_BUFFER_VIEW_TAG);
371        let Some(index) = ab_view_to_index(scope, ab_view, &self.constants)
372        else {
373          scope.throw_exception(v8::Exception::type_error(
374            scope,
375            v8::String::new_from_utf8(
376              scope,
377              format!("Unserializable host object: {}", object.type_repr())
378                .as_bytes(),
379              v8::NewStringType::Normal,
380            )
381            .unwrap(),
382          ));
383          return None;
384        };
385        value_serializer.write_uint32(index);
386        value_serializer.write_uint32(ab_view.byte_length() as u32);
387        let mut storage = [0u8; v8::TYPED_ARRAY_MAX_SIZE_IN_HEAP];
388        let slice = ab_view.get_contents(&mut storage);
389        value_serializer.write_raw_bytes(slice);
390        Some(true)
391      } else {
392        value_serializer.write_uint32(NOT_ARRAY_BUFFER_VIEW_TAG);
393        value_serializer
394          .write_value(scope.get_current_context(), object.into());
395        Some(true)
396      }
397    }
398
399    fn get_shared_array_buffer_id<'s>(
400      &self,
401      _scope: &mut v8::PinScope<'s, '_>,
402      _shared_array_buffer: v8::Local<'s, v8::SharedArrayBuffer>,
403    ) -> Option<u32> {
404      None
405    }
406  }
407
  /// Cheaply clonable handle to the JS values the advanced IPC
  /// (de)serializer needs; stored in the op state by
  /// `op_node_ipc_buffer_constructor`.
  #[derive(Clone)]
  struct AdvancedIpcConstants {
    inner: Rc<AdvancedIpcConstantsInner>,
  }
  /// The values shared behind `AdvancedIpcConstants`.
  struct AdvancedIpcConstantsInner {
    // Compared against a `Uint8Array`'s `constructor` property when
    // serializing, to pick the Buffer type index.
    buffer_constructor: v8::Global<v8::Function>,
    // The internalized string "constructor".
    constructor_key: v8::Global<v8::String>,
    // Installed as the prototype of deserialized Buffer-tagged arrays.
    fast_buffer_prototype: v8::Global<v8::Object>,
  }
417
418  #[op2(fast)]
419  pub fn op_node_ipc_buffer_constructor(
420    scope: &mut v8::PinScope<'_, '_>,
421    state: &mut OpState,
422    buffer_constructor: v8::Local<'_, v8::Function>,
423    fast_buffer_prototype: v8::Local<'_, v8::Object>,
424  ) {
425    if state.has::<AdvancedIpcConstants>() {
426      return;
427    }
428    let constants = AdvancedIpcConstants {
429      inner: Rc::new(AdvancedIpcConstantsInner {
430        buffer_constructor: v8::Global::new(scope, buffer_constructor),
431        constructor_key: v8::Global::new(
432          scope,
433          v8::String::new_from_utf8(
434            scope,
435            b"constructor",
436            v8::NewStringType::Internalized,
437          )
438          .unwrap(),
439        ),
440        fast_buffer_prototype: v8::Global::new(scope, fast_buffer_prototype),
441      }),
442    };
443    state.put(constants);
444  }
445
446  #[op2]
447  pub fn op_node_ipc_write_advanced<'a>(
448    scope: &mut v8::PinScope<'a, '_>,
449    state: Rc<RefCell<OpState>>,
450    #[smi] rid: ResourceId,
451    value: v8::Local<'a, v8::Value>,
452    // using an array as an "out parameter".
453    // index 0 is a boolean indicating whether the queue is under the limit.
454    //
455    // ideally we would just return `Result<(impl Future, bool), ..>`, but that's not
456    // supported by `op2` currently.
457    queue_ok: v8::Local<'a, v8::Array>,
458  ) -> Result<impl Future<Output = Result<(), io::Error>> + use<>, IpcError> {
459    let constants = state.borrow().borrow::<AdvancedIpcConstants>().clone();
460    let serializer = AdvancedSerializer::new(scope, constants);
461    let serialized = serializer.serialize(scope, value)?;
462
463    let stream = state
464      .borrow()
465      .resource_table
466      .get::<IpcAdvancedStreamResource>(rid)?;
467    let old = stream
468      .queued_bytes
469      .fetch_add(serialized.len(), std::sync::atomic::Ordering::Relaxed);
470    if old + serialized.len() > 2 * INITIAL_CAPACITY {
471      // sending messages too fast
472      let Ok(v) = false.to_v8(scope);
473      queue_ok.set_index(scope, 0, v);
474    }
475    Ok(async move {
476      let cancel = stream.cancel.clone();
477      let result = stream
478        .clone()
479        .write_msg_bytes(&serialized)
480        .or_cancel(cancel)
481        .await;
482      // adjust count even on error
483      stream
484        .queued_bytes
485        .fetch_sub(serialized.len(), std::sync::atomic::Ordering::Relaxed);
486      result??;
487      Ok(())
488    })
489  }
490
491  struct AdvancedSerializer {
492    inner: v8::ValueSerializer<'static>,
493  }
494
495  impl AdvancedSerializer {
496    fn new(
497      scope: &mut v8::PinScope<'_, '_>,
498      constants: AdvancedIpcConstants,
499    ) -> Self {
500      let inner = v8::ValueSerializer::new(
501        scope,
502        Box::new(AdvancedSerializerDelegate::new(constants)),
503      );
504      inner.set_treat_array_buffer_views_as_host_objects(true);
505      Self { inner }
506    }
507
508    fn serialize<'s, 'i>(
509      &self,
510      scope: &mut v8::PinScope<'s, 'i>,
511      value: v8::Local<'s, v8::Value>,
512    ) -> Result<Vec<u8>, IpcError> {
513      self.inner.write_raw_bytes(&[0, 0, 0, 0]);
514      self.inner.write_header();
515      let context = scope.get_current_context();
516      self.inner.write_value(context, value);
517      let mut ser = self.inner.release();
518      let length = ser.len() - 4;
519      ser[0] = ((length >> 24) & 0xFF) as u8;
520      ser[1] = ((length >> 16) & 0xFF) as u8;
521      ser[2] = ((length >> 8) & 0xFF) as u8;
522      ser[3] = (length & 0xFF) as u8;
523      Ok(ser)
524    }
525  }
526
  /// Thin wrapper around a `v8::ValueDeserializer` for the advanced IPC
  /// format.
  struct AdvancedIpcDeserializer {
    inner: v8::ValueDeserializer<'static>,
  }

  /// Delegate that reconstructs host objects (array buffer views and nested
  /// values) written by `AdvancedSerializerDelegate`.
  struct AdvancedIpcDeserializerDelegate {
    constants: AdvancedIpcConstants,
  }
534
535  impl v8::ValueDeserializerImpl for AdvancedIpcDeserializerDelegate {
536    fn read_host_object<'s>(
537      &self,
538      scope: &mut v8::PinScope<'s, '_>,
539      deser: &dyn ValueDeserializerHelper,
540    ) -> Option<v8::Local<'s, v8::Object>> {
541      let throw_error = |message: &str| {
542        scope.throw_exception(v8::Exception::type_error(
543          scope,
544          v8::String::new_from_utf8(
545            scope,
546            message.as_bytes(),
547            v8::NewStringType::Normal,
548          )
549          .unwrap(),
550        ));
551        None
552      };
553      let mut tag = 0;
554      if !deser.read_uint32(&mut tag) {
555        return throw_error("Failed to read tag");
556      }
557      match tag {
558        ARRAY_BUFFER_VIEW_TAG => {
559          let mut index = 0;
560          if !deser.read_uint32(&mut index) {
561            return throw_error("Failed to read array buffer view type tag");
562          }
563          let mut byte_length = 0;
564          if !deser.read_uint32(&mut byte_length) {
565            return throw_error("Failed to read byte length");
566          }
567          let Some(buf) = deser.read_raw_bytes(byte_length as usize) else {
568            return throw_error("failed to read bytes for typed array");
569          };
570
571          let array_buffer = v8::ArrayBuffer::new(scope, byte_length as usize);
572          // SAFETY: array_buffer is valid as v8 is keeping it alive, and is byte_length bytes
573          // buf is also byte_length bytes long
574          unsafe {
575            std::ptr::copy(
576              buf.as_ptr(),
577              array_buffer.data().unwrap().as_ptr().cast::<u8>(),
578              byte_length as usize,
579            );
580          }
581
582          let value = match index {
583            0 => {
584              v8::Int8Array::new(scope, array_buffer, 0, byte_length as usize)
585                .unwrap()
586                .into()
587            }
588            1 => {
589              v8::Uint8Array::new(scope, array_buffer, 0, byte_length as usize)
590                .unwrap()
591                .into()
592            }
593            10 => {
594              let obj: v8::Local<v8::Object> = v8::Uint8Array::new(
595                scope,
596                array_buffer,
597                0,
598                byte_length as usize,
599              )?
600              .into();
601              let fast_proto = v8::Local::new(
602                scope,
603                &self.constants.inner.fast_buffer_prototype,
604              );
605              obj.set_prototype(scope, fast_proto.into());
606              obj
607            }
608            2 => v8::Uint8ClampedArray::new(
609              scope,
610              array_buffer,
611              0,
612              byte_length as usize,
613            )?
614            .into(),
615            3 => v8::Int16Array::new(
616              scope,
617              array_buffer,
618              0,
619              byte_length as usize / 2,
620            )?
621            .into(),
622            4 => v8::Uint16Array::new(
623              scope,
624              array_buffer,
625              0,
626              byte_length as usize / 2,
627            )?
628            .into(),
629            5 => v8::Int32Array::new(
630              scope,
631              array_buffer,
632              0,
633              byte_length as usize / 4,
634            )?
635            .into(),
636            6 => v8::Uint32Array::new(
637              scope,
638              array_buffer,
639              0,
640              byte_length as usize / 4,
641            )?
642            .into(),
643            7 => v8::Float32Array::new(
644              scope,
645              array_buffer,
646              0,
647              byte_length as usize / 4,
648            )
649            .unwrap()
650            .into(),
651            8 => v8::Float64Array::new(
652              scope,
653              array_buffer,
654              0,
655              byte_length as usize / 8,
656            )?
657            .into(),
658            9 => {
659              v8::DataView::new(scope, array_buffer, 0, byte_length as usize)
660                .into()
661            }
662            11 => v8::BigInt64Array::new(
663              scope,
664              array_buffer,
665              0,
666              byte_length as usize / 8,
667            )?
668            .into(),
669            12 => v8::BigUint64Array::new(
670              scope,
671              array_buffer,
672              0,
673              byte_length as usize / 8,
674            )?
675            .into(),
676            // TODO(nathanwhit): this should just be `into()`, but I forgot to impl it in rusty_v8.
677            // the underlying impl is just a transmute though.
678            // SAFETY: float16array is an object
679            13 => unsafe {
680              std::mem::transmute::<
681                v8::Local<v8::Float16Array>,
682                v8::Local<v8::Object>,
683              >(v8::Float16Array::new(
684                scope,
685                array_buffer,
686                0,
687                byte_length as usize / 2,
688              )?)
689            },
690            _ => return None,
691          };
692          Some(value)
693        }
694        NOT_ARRAY_BUFFER_VIEW_TAG => {
695          let value = deser.read_value(scope.get_current_context());
696          Some(value.unwrap_or_else(|| v8::null(scope).into()).cast())
697        }
698        _ => {
699          throw_error(&format!("Invalid tag: {}", tag));
700          None
701        }
702      }
703    }
704  }
705
706  impl AdvancedIpcDeserializer {
707    fn new(
708      scope: &mut v8::PinScope<'_, '_>,
709      constants: AdvancedIpcConstants,
710      msg_bytes: &[u8],
711    ) -> Self {
712      let inner = v8::ValueDeserializer::new(
713        scope,
714        Box::new(AdvancedIpcDeserializerDelegate { constants }),
715        msg_bytes,
716      );
717      Self { inner }
718    }
719  }
720
  /// Result of `op_node_ipc_read_advanced`; converted to a JS value by its
  /// `ToV8` implementation.
  struct AdvancedIpcReadResult {
    // Raw message bytes; `None` is converted to the stop sentinel object.
    msg_bytes: Option<Vec<u8>>,
    // Constants handed to the deserializer delegate.
    constants: AdvancedIpcConstants,
  }
725
726  fn make_stop_sentinel<'s>(
727    scope: &mut v8::PinScope<'s, '_>,
728  ) -> v8::Local<'s, v8::Value> {
729    let obj = v8::Object::new(scope);
730    obj.set(
731      scope,
732      v8::String::new_from_utf8(scope, b"cmd", v8::NewStringType::Internalized)
733        .unwrap()
734        .into(),
735      v8::String::new_from_utf8(
736        scope,
737        b"NODE_CLOSE",
738        v8::NewStringType::Internalized,
739      )
740      .unwrap()
741      .into(),
742    );
743    obj.into()
744  }
745
746  impl<'a> deno_core::ToV8<'a> for AdvancedIpcReadResult {
747    type Error = IpcError;
748    fn to_v8(
749      self,
750      scope: &mut v8::PinScope<'a, '_>,
751    ) -> Result<v8::Local<'a, v8::Value>, Self::Error> {
752      let Some(msg_bytes) = self.msg_bytes else {
753        return Ok(make_stop_sentinel(scope));
754      };
755      let deser =
756        AdvancedIpcDeserializer::new(scope, self.constants, &msg_bytes);
757      let context = scope.get_current_context();
758      let header_success = deser.inner.read_header(context).unwrap_or(false);
759      if !header_success {
760        return Err(IpcError::ReadHeaderFailed);
761      }
762      let Some(value) = deser.inner.read_value(context) else {
763        return Err(IpcError::ReadValueFailed);
764      };
765      Ok(value)
766    }
767  }
768
769  #[op2]
770  pub async fn op_node_ipc_read_advanced(
771    state: Rc<RefCell<OpState>>,
772    #[smi] rid: ResourceId,
773  ) -> Result<AdvancedIpcReadResult, IpcError> {
774    let stream = state
775      .borrow()
776      .resource_table
777      .get::<IpcAdvancedStreamResource>(rid)?;
778    let cancel = stream.cancel.clone();
779    let mut stream = RcRef::map(stream, |r| &r.read_half).borrow_mut().await;
780    let msg_bytes = stream.read_msg_bytes().or_cancel(cancel).await??;
781
782    Ok(AdvancedIpcReadResult {
783      msg_bytes,
784      constants: state.borrow().borrow::<AdvancedIpcConstants>().clone(),
785    })
786  }
787
788  /// Value signaling that the other end ipc channel has closed.
789  ///
790  /// Node reserves objects of this form (`{ "cmd": "NODE_<something>"`)
791  /// for internal use, so we use it here as well to avoid breaking anyone.
792  fn stop_sentinel() -> serde_json::Value {
793    serde_json::json!({
794      "cmd": "NODE_CLOSE"
795    })
796  }
797
798  #[op2]
799  #[serde]
800  pub async fn op_node_ipc_read_json(
801    state: Rc<RefCell<OpState>>,
802    #[smi] rid: ResourceId,
803  ) -> Result<serde_json::Value, IpcError> {
804    let stream = state
805      .borrow()
806      .resource_table
807      .get::<IpcJsonStreamResource>(rid)?;
808
809    let cancel = stream.cancel.clone();
810    let mut stream = RcRef::map(stream, |r| &r.read_half).borrow_mut().await;
811    let msgs = stream.read_msg().or_cancel(cancel).await??;
812    if let Some(msg) = msgs {
813      Ok(msg)
814    } else {
815      Ok(stop_sentinel())
816    }
817  }
818
819  #[op2(fast)]
820  pub fn op_node_ipc_ref(
821    state: &mut OpState,
822    #[smi] rid: ResourceId,
823    serialization_json: bool,
824  ) {
825    if serialization_json {
826      let stream = state
827        .resource_table
828        .get::<IpcJsonStreamResource>(rid)
829        .expect("Invalid resource ID");
830      stream.ref_tracker.ref_();
831    } else {
832      let stream = state
833        .resource_table
834        .get::<IpcAdvancedStreamResource>(rid)
835        .expect("Invalid resource ID");
836      stream.ref_tracker.ref_();
837    }
838  }
839
840  #[op2(fast)]
841  pub fn op_node_ipc_unref(
842    state: &mut OpState,
843    #[smi] rid: ResourceId,
844    serialization_json: bool,
845  ) {
846    if serialization_json {
847      let stream = state
848        .resource_table
849        .get::<IpcJsonStreamResource>(rid)
850        .expect("Invalid resource ID");
851      stream.ref_tracker.unref();
852    } else {
853      let stream = state
854        .resource_table
855        .get::<IpcAdvancedStreamResource>(rid)
856        .expect("Invalid resource ID");
857      stream.ref_tracker.unref();
858    }
859  }
860
861  #[cfg(test)]
862  mod tests {
863    use deno_core::JsRuntime;
864    use deno_core::RuntimeOptions;
865    use deno_core::v8;
866
867    fn wrap_expr(s: &str) -> String {
868      format!("(function () {{ return {s}; }})()")
869    }
870
871    fn serialize_js_to_json(runtime: &mut JsRuntime, js: String) -> String {
872      let val = runtime.execute_script("", js).unwrap();
873      deno_core::scope!(scope, runtime);
874      let val = v8::Local::new(scope, val);
875      let mut buf = Vec::new();
876      let mut ser = deno_core::serde_json::Serializer::new(&mut buf);
877      super::serialize_v8_value(scope, val, &mut ser).unwrap();
878      String::from_utf8(buf).unwrap()
879    }
880
881    #[test]
882    fn ipc_serialization() {
883      let mut runtime = JsRuntime::new(RuntimeOptions::default());
884
885      let cases = [
886        ("'hello'", "\"hello\""),
887        ("1", "1"),
888        ("1.5", "1.5"),
889        ("Number.NaN", "null"),
890        ("Infinity", "null"),
891        ("Number.MAX_SAFE_INTEGER", &(2i64.pow(53) - 1).to_string()),
892        (
893          "Number.MIN_SAFE_INTEGER",
894          &(-(2i64.pow(53) - 1)).to_string(),
895        ),
896        ("[1, 2, 3]", "[1,2,3]"),
897        ("new Uint8Array([1,2,3])", "[1,2,3]"),
898        (
899          "{ a: 1.5, b: { c: new ArrayBuffer(5) }}",
900          r#"{"a":1.5,"b":{"c":{}}}"#,
901        ),
902        ("new Number(1)", "1"),
903        ("new Boolean(true)", "true"),
904        ("true", "true"),
905        (r#"new String("foo")"#, "\"foo\""),
906        ("null", "null"),
907        (
908          r#"{ a: "field", toJSON() { return "custom"; } }"#,
909          "\"custom\"",
910        ),
911        (r#"{ a: undefined, b: 1 }"#, "{\"b\":1}"),
912      ];
913
914      for (input, expect) in cases {
915        let js = wrap_expr(input);
916        let actual = serialize_js_to_json(&mut runtime, js);
917        assert_eq!(actual, expect);
918      }
919    }
920  }
921}