1pub use impl_::*;
4
/// Raw descriptor value of the IPC pipe, stored in the `OpState`.
///
/// `op_node_child_ipc_pipe` looks this value up and passes the wrapped
/// integer as the `fd` argument of `IpcJsonStreamResource::new`. When no
/// `ChildPipeFd` is present in the state, that op reports that there is no
/// IPC pipe.
///
/// The type is a plain integer wrapper, so it is cheap to copy and can be
/// printed in diagnostics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ChildPipeFd(pub i64);
6
7mod impl_ {
8 use std::cell::RefCell;
9 use std::future::Future;
10 use std::io;
11 use std::rc::Rc;
12
13 use deno_core::CancelFuture;
14 use deno_core::OpState;
15 use deno_core::RcRef;
16 use deno_core::ResourceId;
17 use deno_core::ToV8;
18 use deno_core::op2;
19 use deno_core::serde;
20 use deno_core::serde::Serializer;
21 use deno_core::serde_json;
22 use deno_core::v8;
23 use deno_error::JsErrorBox;
24 pub use deno_process::ipc::INITIAL_CAPACITY;
25 use deno_process::ipc::IpcJsonStreamError;
26 pub use deno_process::ipc::IpcJsonStreamResource;
27 pub use deno_process::ipc::IpcRefTracker;
28 use serde::Serialize;
29
30 struct SerializeWrapper<'a, 'b, 'c>(
32 RefCell<&'b mut v8::PinScope<'a, 'c>>,
33 v8::Local<'a, v8::Value>,
34 );
35
36 impl Serialize for SerializeWrapper<'_, '_, '_> {
37 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
38 where
39 S: Serializer,
40 {
41 serialize_v8_value(*self.0.borrow_mut(), self.1, serializer)
42 }
43 }
44
45 fn serialize_v8_value<'a, S: Serializer>(
49 scope: &mut v8::PinScope<'a, '_>,
50 value: v8::Local<'a, v8::Value>,
51 ser: S,
52 ) -> Result<S::Ok, S::Error> {
53 use serde::ser::Error;
54 if value.is_null_or_undefined() {
55 ser.serialize_unit()
56 } else if value.is_number() || value.is_number_object() {
57 let num_value = value.number_value(scope).unwrap();
58 if (num_value as i64 as f64) == num_value {
59 ser.serialize_i64(num_value as i64)
60 } else {
61 ser.serialize_f64(num_value)
62 }
63 } else if value.is_string() {
64 let str = deno_core::serde_v8::to_utf8(value.try_into().unwrap(), scope);
65 ser.serialize_str(&str)
66 } else if value.is_string_object() {
67 let str = deno_core::serde_v8::to_utf8(
68 value.to_string(scope).ok_or_else(|| {
69 S::Error::custom(deno_error::JsErrorBox::generic(
70 "toString on string object failed",
71 ))
72 })?,
73 scope,
74 );
75 ser.serialize_str(&str)
76 } else if value.is_boolean() {
77 ser.serialize_bool(value.is_true())
78 } else if value.is_boolean_object() {
79 ser.serialize_bool(value.boolean_value(scope))
80 } else if value.is_array() {
81 use serde::ser::SerializeSeq;
82 let array = value.cast::<v8::Array>();
83 let length = array.length();
84 let mut seq = ser.serialize_seq(Some(length as usize))?;
85 for i in 0..length {
86 let element = array.get_index(scope, i).unwrap();
87 seq
88 .serialize_element(&SerializeWrapper(RefCell::new(scope), element))?;
89 }
90 seq.end()
91 } else if value.is_object() {
92 use serde::ser::SerializeMap;
93 if value.is_array_buffer_view() {
94 let buffer = value.cast::<v8::ArrayBufferView>();
95 let mut buf = vec![0u8; buffer.byte_length()];
96 let copied = buffer.copy_contents(&mut buf);
97 debug_assert_eq!(copied, buf.len());
98 return ser.serialize_bytes(&buf);
99 }
100 let object = value.cast::<v8::Object>();
101 let to_json_key = v8::String::new_from_utf8(
104 scope,
105 b"toJSON",
106 v8::NewStringType::Internalized,
107 )
108 .unwrap()
109 .into();
110 if let Some(to_json) = object.get(scope, to_json_key)
111 && let Ok(to_json) = to_json.try_cast::<v8::Function>()
112 {
113 let json_value = to_json.call(scope, object.into(), &[]).unwrap();
114 return serialize_v8_value(scope, json_value, ser);
115 }
116
117 let keys = object
118 .get_own_property_names(
119 scope,
120 v8::GetPropertyNamesArgs {
121 ..Default::default()
122 },
123 )
124 .unwrap();
125 let num_keys = keys.length();
126 let mut map = ser.serialize_map(Some(num_keys as usize))?;
127 for i in 0..num_keys {
128 let key = keys.get_index(scope, i).unwrap();
129 let key_str = key.to_rust_string_lossy(scope);
130 let value = object.get(scope, key).unwrap();
131 if value.is_undefined() {
132 continue;
133 }
134 map.serialize_entry(
135 &key_str,
136 &SerializeWrapper(RefCell::new(scope), value),
137 )?;
138 }
139 map.end()
140 } else {
141 Err(S::Error::custom(JsErrorBox::type_error(format!(
143 "Unsupported type: {}",
144 value.type_repr()
145 ))))
146 }
147 }
148
149 #[op2]
151 #[smi]
152 pub fn op_node_child_ipc_pipe(
153 state: &mut OpState,
154 ) -> Result<Option<ResourceId>, io::Error> {
155 let fd = match state.try_borrow_mut::<crate::ChildPipeFd>() {
156 Some(child_pipe_fd) => child_pipe_fd.0,
157 None => return Ok(None),
158 };
159 let ref_tracker = IpcRefTracker::new(state.external_ops_tracker.clone());
160 Ok(Some(
161 state
162 .resource_table
163 .add(IpcJsonStreamResource::new(fd, ref_tracker)?),
164 ))
165 }
166
167 #[derive(Debug, thiserror::Error, deno_error::JsError)]
168 pub enum IpcError {
169 #[class(inherit)]
170 #[error(transparent)]
171 Resource(#[from] deno_core::error::ResourceError),
172 #[class(inherit)]
173 #[error(transparent)]
174 IpcJsonStream(#[from] IpcJsonStreamError),
175 #[class(inherit)]
176 #[error(transparent)]
177 Canceled(#[from] deno_core::Canceled),
178 #[class(inherit)]
179 #[error("failed to serialize json value: {0}")]
180 SerdeJson(serde_json::Error),
181 }
182
183 #[op2(async)]
184 pub fn op_node_ipc_write<'a>(
185 scope: &mut v8::PinScope<'a, '_>,
186 state: Rc<RefCell<OpState>>,
187 #[smi] rid: ResourceId,
188 value: v8::Local<'a, v8::Value>,
189 queue_ok: v8::Local<'a, v8::Array>,
195 ) -> Result<impl Future<Output = Result<(), io::Error>> + use<>, IpcError> {
196 let mut serialized = Vec::with_capacity(64);
197 let mut ser = serde_json::Serializer::new(&mut serialized);
198 serialize_v8_value(scope, value, &mut ser).map_err(IpcError::SerdeJson)?;
199 serialized.push(b'\n');
200
201 let stream = state
202 .borrow()
203 .resource_table
204 .get::<IpcJsonStreamResource>(rid)?;
205 let old = stream
206 .queued_bytes
207 .fetch_add(serialized.len(), std::sync::atomic::Ordering::Relaxed);
208 if old + serialized.len() > 2 * INITIAL_CAPACITY {
209 let v = false.to_v8(scope).unwrap(); queue_ok.set_index(scope, 0, v);
212 }
213 Ok(async move {
214 let cancel = stream.cancel.clone();
215 let result = stream
216 .clone()
217 .write_msg_bytes(&serialized)
218 .or_cancel(cancel)
219 .await;
220 stream
222 .queued_bytes
223 .fetch_sub(serialized.len(), std::sync::atomic::Ordering::Relaxed);
224 result??;
225 Ok(())
226 })
227 }
228
229 fn stop_sentinel() -> serde_json::Value {
234 serde_json::json!({
235 "cmd": "NODE_CLOSE"
236 })
237 }
238
239 #[op2(async)]
240 #[serde]
241 pub async fn op_node_ipc_read(
242 state: Rc<RefCell<OpState>>,
243 #[smi] rid: ResourceId,
244 ) -> Result<serde_json::Value, IpcError> {
245 let stream = state
246 .borrow()
247 .resource_table
248 .get::<IpcJsonStreamResource>(rid)?;
249
250 let cancel = stream.cancel.clone();
251 let mut stream = RcRef::map(stream, |r| &r.read_half).borrow_mut().await;
252 let msgs = stream.read_msg().or_cancel(cancel).await??;
253 if let Some(msg) = msgs {
254 Ok(msg)
255 } else {
256 Ok(stop_sentinel())
257 }
258 }
259
260 #[op2(fast)]
261 pub fn op_node_ipc_ref(state: &mut OpState, #[smi] rid: ResourceId) {
262 let stream = state
263 .resource_table
264 .get::<IpcJsonStreamResource>(rid)
265 .expect("Invalid resource ID");
266 stream.ref_tracker.ref_();
267 }
268
269 #[op2(fast)]
270 pub fn op_node_ipc_unref(state: &mut OpState, #[smi] rid: ResourceId) {
271 let stream = state
272 .resource_table
273 .get::<IpcJsonStreamResource>(rid)
274 .expect("Invalid resource ID");
275 stream.ref_tracker.unref();
276 }
277
#[cfg(test)]
mod tests {
  use deno_core::JsRuntime;
  use deno_core::RuntimeOptions;
  use deno_core::v8;

  /// Wraps a JS expression in an immediately invoked function that returns
  /// it, so object literals are parsed as expressions.
  fn wrap_expr(s: &str) -> String {
    ["(function () { return ", s, "; })()"].concat()
  }

  /// Evaluates `js` in `runtime` and returns the JSON text that
  /// `serialize_v8_value` produces for the result.
  fn serialize_js_to_json(runtime: &mut JsRuntime, js: String) -> String {
    let global = runtime.execute_script("", js).unwrap();
    deno_core::scope!(scope, runtime);
    let local = v8::Local::new(scope, global);
    let mut out = Vec::new();
    super::serialize_v8_value(
      scope,
      local,
      &mut deno_core::serde_json::Serializer::new(&mut out),
    )
    .unwrap();
    String::from_utf8(out).unwrap()
  }

  /// Checks the JSON output for a table of JS expressions.
  #[test]
  fn ipc_serialization() {
    let mut runtime = JsRuntime::new(RuntimeOptions::default());

    // Expected text for the largest and smallest safe integers.
    let max_safe = (2i64.pow(53) - 1).to_string();
    let min_safe = (-(2i64.pow(53) - 1)).to_string();

    let cases = [
      ("'hello'", "\"hello\""),
      ("1", "1"),
      ("1.5", "1.5"),
      ("Number.NaN", "null"),
      ("Infinity", "null"),
      ("Number.MAX_SAFE_INTEGER", max_safe.as_str()),
      ("Number.MIN_SAFE_INTEGER", min_safe.as_str()),
      ("[1, 2, 3]", "[1,2,3]"),
      ("new Uint8Array([1,2,3])", "[1,2,3]"),
      (
        "{ a: 1.5, b: { c: new ArrayBuffer(5) }}",
        r#"{"a":1.5,"b":{"c":{}}}"#,
      ),
      ("new Number(1)", "1"),
      ("new Boolean(true)", "true"),
      ("true", "true"),
      (r#"new String("foo")"#, "\"foo\""),
      ("null", "null"),
      (
        r#"{ a: "field", toJSON() { return "custom"; } }"#,
        "\"custom\"",
      ),
      (r#"{ a: undefined, b: 1 }"#, "{\"b\":1}"),
    ];

    for (input, expect) in cases {
      let actual = serialize_js_to_json(&mut runtime, wrap_expr(input));
      assert_eq!(actual, expect);
    }
  }
}
338}