deno_node/ops/
ipc.rs

1// Copyright 2018-2025 the Deno authors. MIT license.
2
3pub use impl_::*;
4
/// Raw descriptor value of the child IPC pipe.
///
/// `op_node_child_ipc_pipe` looks this up in the op state and hands the
/// wrapped value to `IpcJsonStreamResource::new`.
// NOTE(review): presumably `i64` so the same type can carry a Unix fd or a
// Windows handle value — confirm against the code that stores it.
#[derive(Debug, Clone, Copy)]
pub struct ChildPipeFd(pub i64);
6
7mod impl_ {
8  use std::cell::RefCell;
9  use std::future::Future;
10  use std::io;
11  use std::rc::Rc;
12
13  use deno_core::CancelFuture;
14  use deno_core::OpState;
15  use deno_core::RcRef;
16  use deno_core::ResourceId;
17  use deno_core::ToV8;
18  use deno_core::op2;
19  use deno_core::serde;
20  use deno_core::serde::Serializer;
21  use deno_core::serde_json;
22  use deno_core::v8;
23  use deno_error::JsErrorBox;
24  pub use deno_process::ipc::INITIAL_CAPACITY;
25  use deno_process::ipc::IpcJsonStreamError;
26  pub use deno_process::ipc::IpcJsonStreamResource;
27  pub use deno_process::ipc::IpcRefTracker;
28  use serde::Serialize;
29
30  /// Wrapper around v8 value that implements Serialize.
31  struct SerializeWrapper<'a, 'b, 'c>(
32    RefCell<&'b mut v8::PinScope<'a, 'c>>,
33    v8::Local<'a, v8::Value>,
34  );
35
36  impl Serialize for SerializeWrapper<'_, '_, '_> {
37    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
38    where
39      S: Serializer,
40    {
41      serialize_v8_value(*self.0.borrow_mut(), self.1, serializer)
42    }
43  }
44
45  /// Serialize a v8 value directly into a serde serializer.
46  /// This allows us to go from v8 values to JSON without having to
47  /// deserialize into a `serde_json::Value` and then reserialize to JSON
48  fn serialize_v8_value<'a, S: Serializer>(
49    scope: &mut v8::PinScope<'a, '_>,
50    value: v8::Local<'a, v8::Value>,
51    ser: S,
52  ) -> Result<S::Ok, S::Error> {
53    use serde::ser::Error;
54    if value.is_null_or_undefined() {
55      ser.serialize_unit()
56    } else if value.is_number() || value.is_number_object() {
57      let num_value = value.number_value(scope).unwrap();
58      if (num_value as i64 as f64) == num_value {
59        ser.serialize_i64(num_value as i64)
60      } else {
61        ser.serialize_f64(num_value)
62      }
63    } else if value.is_string() {
64      let str = deno_core::serde_v8::to_utf8(value.try_into().unwrap(), scope);
65      ser.serialize_str(&str)
66    } else if value.is_string_object() {
67      let str = deno_core::serde_v8::to_utf8(
68        value.to_string(scope).ok_or_else(|| {
69          S::Error::custom(deno_error::JsErrorBox::generic(
70            "toString on string object failed",
71          ))
72        })?,
73        scope,
74      );
75      ser.serialize_str(&str)
76    } else if value.is_boolean() {
77      ser.serialize_bool(value.is_true())
78    } else if value.is_boolean_object() {
79      ser.serialize_bool(value.boolean_value(scope))
80    } else if value.is_array() {
81      use serde::ser::SerializeSeq;
82      let array = value.cast::<v8::Array>();
83      let length = array.length();
84      let mut seq = ser.serialize_seq(Some(length as usize))?;
85      for i in 0..length {
86        let element = array.get_index(scope, i).unwrap();
87        seq
88          .serialize_element(&SerializeWrapper(RefCell::new(scope), element))?;
89      }
90      seq.end()
91    } else if value.is_object() {
92      use serde::ser::SerializeMap;
93      if value.is_array_buffer_view() {
94        let buffer = value.cast::<v8::ArrayBufferView>();
95        let mut buf = vec![0u8; buffer.byte_length()];
96        let copied = buffer.copy_contents(&mut buf);
97        debug_assert_eq!(copied, buf.len());
98        return ser.serialize_bytes(&buf);
99      }
100      let object = value.cast::<v8::Object>();
101      // node uses `JSON.stringify`, so to match its behavior (and allow serializing custom objects)
102      // we need to respect the `toJSON` method if it exists.
103      let to_json_key = v8::String::new_from_utf8(
104        scope,
105        b"toJSON",
106        v8::NewStringType::Internalized,
107      )
108      .unwrap()
109      .into();
110      if let Some(to_json) = object.get(scope, to_json_key)
111        && let Ok(to_json) = to_json.try_cast::<v8::Function>()
112      {
113        let json_value = to_json.call(scope, object.into(), &[]).unwrap();
114        return serialize_v8_value(scope, json_value, ser);
115      }
116
117      let keys = object
118        .get_own_property_names(
119          scope,
120          v8::GetPropertyNamesArgs {
121            ..Default::default()
122          },
123        )
124        .unwrap();
125      let num_keys = keys.length();
126      let mut map = ser.serialize_map(Some(num_keys as usize))?;
127      for i in 0..num_keys {
128        let key = keys.get_index(scope, i).unwrap();
129        let key_str = key.to_rust_string_lossy(scope);
130        let value = object.get(scope, key).unwrap();
131        if value.is_undefined() {
132          continue;
133        }
134        map.serialize_entry(
135          &key_str,
136          &SerializeWrapper(RefCell::new(scope), value),
137        )?;
138      }
139      map.end()
140    } else {
141      // TODO(nathanwhit): better error message
142      Err(S::Error::custom(JsErrorBox::type_error(format!(
143        "Unsupported type: {}",
144        value.type_repr()
145      ))))
146    }
147  }
148
149  // Open IPC pipe from bootstrap options.
150  #[op2]
151  #[smi]
152  pub fn op_node_child_ipc_pipe(
153    state: &mut OpState,
154  ) -> Result<Option<ResourceId>, io::Error> {
155    let fd = match state.try_borrow_mut::<crate::ChildPipeFd>() {
156      Some(child_pipe_fd) => child_pipe_fd.0,
157      None => return Ok(None),
158    };
159    let ref_tracker = IpcRefTracker::new(state.external_ops_tracker.clone());
160    Ok(Some(
161      state
162        .resource_table
163        .add(IpcJsonStreamResource::new(fd, ref_tracker)?),
164    ))
165  }
166
167  #[derive(Debug, thiserror::Error, deno_error::JsError)]
168  pub enum IpcError {
169    #[class(inherit)]
170    #[error(transparent)]
171    Resource(#[from] deno_core::error::ResourceError),
172    #[class(inherit)]
173    #[error(transparent)]
174    IpcJsonStream(#[from] IpcJsonStreamError),
175    #[class(inherit)]
176    #[error(transparent)]
177    Canceled(#[from] deno_core::Canceled),
178    #[class(inherit)]
179    #[error("failed to serialize json value: {0}")]
180    SerdeJson(serde_json::Error),
181  }
182
183  #[op2(async)]
184  pub fn op_node_ipc_write<'a>(
185    scope: &mut v8::PinScope<'a, '_>,
186    state: Rc<RefCell<OpState>>,
187    #[smi] rid: ResourceId,
188    value: v8::Local<'a, v8::Value>,
189    // using an array as an "out parameter".
190    // index 0 is a boolean indicating whether the queue is under the limit.
191    //
192    // ideally we would just return `Result<(impl Future, bool), ..>`, but that's not
193    // supported by `op2` currently.
194    queue_ok: v8::Local<'a, v8::Array>,
195  ) -> Result<impl Future<Output = Result<(), io::Error>> + use<>, IpcError> {
196    let mut serialized = Vec::with_capacity(64);
197    let mut ser = serde_json::Serializer::new(&mut serialized);
198    serialize_v8_value(scope, value, &mut ser).map_err(IpcError::SerdeJson)?;
199    serialized.push(b'\n');
200
201    let stream = state
202      .borrow()
203      .resource_table
204      .get::<IpcJsonStreamResource>(rid)?;
205    let old = stream
206      .queued_bytes
207      .fetch_add(serialized.len(), std::sync::atomic::Ordering::Relaxed);
208    if old + serialized.len() > 2 * INITIAL_CAPACITY {
209      // sending messages too fast
210      let v = false.to_v8(scope).unwrap(); // Infallible
211      queue_ok.set_index(scope, 0, v);
212    }
213    Ok(async move {
214      let cancel = stream.cancel.clone();
215      let result = stream
216        .clone()
217        .write_msg_bytes(&serialized)
218        .or_cancel(cancel)
219        .await;
220      // adjust count even on error
221      stream
222        .queued_bytes
223        .fetch_sub(serialized.len(), std::sync::atomic::Ordering::Relaxed);
224      result??;
225      Ok(())
226    })
227  }
228
229  /// Value signaling that the other end ipc channel has closed.
230  ///
231  /// Node reserves objects of this form (`{ "cmd": "NODE_<something>"`)
232  /// for internal use, so we use it here as well to avoid breaking anyone.
233  fn stop_sentinel() -> serde_json::Value {
234    serde_json::json!({
235      "cmd": "NODE_CLOSE"
236    })
237  }
238
239  #[op2(async)]
240  #[serde]
241  pub async fn op_node_ipc_read(
242    state: Rc<RefCell<OpState>>,
243    #[smi] rid: ResourceId,
244  ) -> Result<serde_json::Value, IpcError> {
245    let stream = state
246      .borrow()
247      .resource_table
248      .get::<IpcJsonStreamResource>(rid)?;
249
250    let cancel = stream.cancel.clone();
251    let mut stream = RcRef::map(stream, |r| &r.read_half).borrow_mut().await;
252    let msgs = stream.read_msg().or_cancel(cancel).await??;
253    if let Some(msg) = msgs {
254      Ok(msg)
255    } else {
256      Ok(stop_sentinel())
257    }
258  }
259
260  #[op2(fast)]
261  pub fn op_node_ipc_ref(state: &mut OpState, #[smi] rid: ResourceId) {
262    let stream = state
263      .resource_table
264      .get::<IpcJsonStreamResource>(rid)
265      .expect("Invalid resource ID");
266    stream.ref_tracker.ref_();
267  }
268
269  #[op2(fast)]
270  pub fn op_node_ipc_unref(state: &mut OpState, #[smi] rid: ResourceId) {
271    let stream = state
272      .resource_table
273      .get::<IpcJsonStreamResource>(rid)
274      .expect("Invalid resource ID");
275    stream.ref_tracker.unref();
276  }
277
278  #[cfg(test)]
279  mod tests {
280    use deno_core::JsRuntime;
281    use deno_core::RuntimeOptions;
282    use deno_core::v8;
283
284    fn wrap_expr(s: &str) -> String {
285      format!("(function () {{ return {s}; }})()")
286    }
287
288    fn serialize_js_to_json(runtime: &mut JsRuntime, js: String) -> String {
289      let val = runtime.execute_script("", js).unwrap();
290      deno_core::scope!(scope, runtime);
291      let val = v8::Local::new(scope, val);
292      let mut buf = Vec::new();
293      let mut ser = deno_core::serde_json::Serializer::new(&mut buf);
294      super::serialize_v8_value(scope, val, &mut ser).unwrap();
295      String::from_utf8(buf).unwrap()
296    }
297
298    #[test]
299    fn ipc_serialization() {
300      let mut runtime = JsRuntime::new(RuntimeOptions::default());
301
302      let cases = [
303        ("'hello'", "\"hello\""),
304        ("1", "1"),
305        ("1.5", "1.5"),
306        ("Number.NaN", "null"),
307        ("Infinity", "null"),
308        ("Number.MAX_SAFE_INTEGER", &(2i64.pow(53) - 1).to_string()),
309        (
310          "Number.MIN_SAFE_INTEGER",
311          &(-(2i64.pow(53) - 1)).to_string(),
312        ),
313        ("[1, 2, 3]", "[1,2,3]"),
314        ("new Uint8Array([1,2,3])", "[1,2,3]"),
315        (
316          "{ a: 1.5, b: { c: new ArrayBuffer(5) }}",
317          r#"{"a":1.5,"b":{"c":{}}}"#,
318        ),
319        ("new Number(1)", "1"),
320        ("new Boolean(true)", "true"),
321        ("true", "true"),
322        (r#"new String("foo")"#, "\"foo\""),
323        ("null", "null"),
324        (
325          r#"{ a: "field", toJSON() { return "custom"; } }"#,
326          "\"custom\"",
327        ),
328        (r#"{ a: undefined, b: 1 }"#, "{\"b\":1}"),
329      ];
330
331      for (input, expect) in cases {
332        let js = wrap_expr(input);
333        let actual = serialize_js_to_json(&mut runtime, js);
334        assert_eq!(actual, expect);
335      }
336    }
337  }
338}