Skip to main content

deno_web/
message_port.rs

1// Copyright 2018-2026 the Deno authors. MIT license.
2
3use std::borrow::Cow;
4use std::cell::RefCell;
5use std::future::poll_fn;
6use std::rc::Rc;
7
8use deno_core::CancelFuture;
9use deno_core::CancelHandle;
10use deno_core::DetachedBuffer;
11use deno_core::JsBuffer;
12use deno_core::OpState;
13use deno_core::RcRef;
14use deno_core::Resource;
15use deno_core::ResourceId;
16use deno_core::TransferredResource;
17use deno_core::op2;
18use deno_error::JsErrorBox;
19use serde::Deserialize;
20use serde::Serialize;
21use tokio::sync::mpsc::UnboundedReceiver;
22use tokio::sync::mpsc::UnboundedSender;
23use tokio::sync::mpsc::error::TryRecvError;
24use tokio::sync::mpsc::unbounded_channel;
25
/// Errors produced by the message port helpers and ops in this module.
///
/// NOTE(review): the `#[class(...)]` attributes are consumed by the
/// `deno_error::JsError` derive; `type` presumably maps to a JS `TypeError`
/// and `inherit` defers to the wrapped error's class — confirm against
/// `deno_error`.
#[derive(Debug, thiserror::Error, deno_error::JsError)]
pub enum MessagePortError {
  /// A resource id listed for transfer could not be taken out of the resource
  /// table (raised by `deserialize_js_transferables`).
  #[class(type)]
  #[error("Invalid message port transfer")]
  InvalidTransfer,
  /// The port resource is still shared (its `Rc` has other owners), so the
  /// inner port cannot be moved out for transfer.
  #[class(type)]
  #[error("Message port is not ready for transfer")]
  NotReady,
  /// A message posted on a port named that same port in its transfer list.
  #[class(type)]
  #[error("Can not transfer self message port")]
  TransferSelf,
  /// A pending operation was canceled; converted automatically from
  /// `deno_core::Canceled` via `#[from]`.
  #[class(inherit)]
  #[error(transparent)]
  Canceled(#[from] deno_core::Canceled),
  /// A resource table lookup failed.
  #[class(inherit)]
  #[error(transparent)]
  Resource(deno_core::error::ResourceError),
  /// Any other error, e.g. one returned by a resource's `transfer()`.
  #[class(inherit)]
  #[error(transparent)]
  Generic(JsErrorBox),
}
47
/// A transferable in its in-flight form, i.e. after it has been removed from
/// the sender's resource table and before it is registered with the receiver.
pub enum Transferable {
  /// A single transferred resource together with the name supplied by JS.
  Resource(String, Box<dyn TransferredResource>),
  /// A named group of transferred resources, kept in their original order.
  MultiResource(String, Vec<Box<dyn TransferredResource>>),
  /// An id passed through unchanged by this module.
  /// NOTE(review): presumably identifies a transferred array buffer on the
  /// JS side — confirm against the caller.
  ArrayBuffer(u32),
}
53
/// Payload carried over the channel between two entangled ports: the
/// serialized message buffer plus its in-flight transferables.
type MessagePortMessage = (DetachedBuffer, Vec<Transferable>);
55
/// One end of an entangled pair of message ports, backed by two unbounded
/// channels (one per direction).
pub struct MessagePort {
  /// Receiving half: messages sent by the paired port arrive here.
  pub rx: RefCell<UnboundedReceiver<MessagePortMessage>>,
  /// Sending half towards the paired port; set to `None` by `disentangle`.
  pub tx: RefCell<Option<UnboundedSender<MessagePortMessage>>>,
}
60
61impl MessagePort {
62  pub fn send(
63    &self,
64    state: &mut OpState,
65    data: JsMessageData,
66  ) -> Result<(), MessagePortError> {
67    let transferables = if data.transferables.is_empty() {
68      vec![]
69    } else {
70      deserialize_js_transferables(state, data.transferables)?
71    };
72
73    // Swallow the failed to send error. It means the channel was disentangled,
74    // but not cleaned up.
75    if let Some(tx) = &*self.tx.borrow() {
76      tx.send((data.data, transferables)).ok();
77    }
78
79    Ok(())
80  }
81
82  pub async fn recv(
83    &self,
84    state: Rc<RefCell<OpState>>,
85  ) -> Result<Option<JsMessageData>, MessagePortError> {
86    let rx = &self.rx;
87
88    let maybe_data = poll_fn(|cx| {
89      let mut rx = rx.borrow_mut();
90      rx.poll_recv(cx)
91    })
92    .await;
93
94    if let Some((data, transferables)) = maybe_data {
95      let js_transferables = if transferables.is_empty() {
96        vec![]
97      } else {
98        serialize_transferables(&mut state.borrow_mut(), transferables)
99      };
100      return Ok(Some(JsMessageData {
101        data,
102        transferables: js_transferables,
103      }));
104    }
105    Ok(None)
106  }
107
108  /// Try to receive a message synchronously without blocking.
109  /// Returns `Ok(None)` if no message is available or the channel is closed.
110  pub fn try_recv_sync(
111    &self,
112    state: &mut OpState,
113  ) -> Result<Option<JsMessageData>, MessagePortError> {
114    let mut rx = self.rx.borrow_mut();
115    match rx.try_recv() {
116      Ok((data, transferables)) => {
117        let js_transferables = if transferables.is_empty() {
118          vec![]
119        } else {
120          serialize_transferables(state, transferables)
121        };
122        Ok(Some(JsMessageData {
123          data,
124          transferables: js_transferables,
125        }))
126      }
127      Err(TryRecvError::Empty) => Ok(None),
128      Err(TryRecvError::Disconnected) => Ok(None),
129    }
130  }
131
132  /// This forcefully disconnects the message port from its paired port. This
133  /// will wake up the `.recv` on the paired port, which will return `Ok(None)`.
134  pub fn disentangle(&self) {
135    let mut tx = self.tx.borrow_mut();
136    tx.take();
137  }
138}
139
140pub fn create_entangled_message_port() -> (MessagePort, MessagePort) {
141  let (port1_tx, port2_rx) = unbounded_channel::<MessagePortMessage>();
142  let (port2_tx, port1_rx) = unbounded_channel::<MessagePortMessage>();
143
144  let port1 = MessagePort {
145    rx: RefCell::new(port1_rx),
146    tx: RefCell::new(Some(port1_tx)),
147  };
148
149  let port2 = MessagePort {
150    rx: RefCell::new(port2_rx),
151    tx: RefCell::new(Some(port2_tx)),
152  };
153
154  (port1, port2)
155}
156
/// Resource-table wrapper around a `MessagePort`.
pub struct MessagePortResource {
  /// The wrapped port.
  port: MessagePort,
  /// Canceled when the resource is closed or transferred; the async receive
  /// op races its pending receive against this handle.
  cancel: CancelHandle,
}
161
162impl Resource for MessagePortResource {
163  fn name(&self) -> Cow<'_, str> {
164    "messagePort".into()
165  }
166
167  fn close(self: Rc<Self>) {
168    self.cancel.cancel();
169  }
170
171  fn transfer(
172    self: Rc<Self>,
173  ) -> Result<Box<dyn TransferredResource>, JsErrorBox> {
174    self.cancel.cancel();
175    let resource = Rc::try_unwrap(self)
176      .map_err(|_| JsErrorBox::from_err(MessagePortError::NotReady))?;
177    Ok(Box::new(resource.port))
178  }
179}
180
181impl TransferredResource for MessagePort {
182  fn receive(self: Box<Self>) -> Rc<dyn Resource> {
183    Rc::new(MessagePortResource {
184      port: *self,
185      cancel: CancelHandle::new(),
186    })
187  }
188}
189
190#[op2]
191pub fn op_message_port_create_entangled(
192  state: &mut OpState,
193) -> (ResourceId, ResourceId) {
194  let (port1, port2) = create_entangled_message_port();
195
196  let port1_id = state.resource_table.add(MessagePortResource {
197    port: port1,
198    cancel: CancelHandle::new(),
199  });
200
201  let port2_id = state.resource_table.add(MessagePortResource {
202    port: port2,
203    cancel: CancelHandle::new(),
204  });
205
206  (port1_id, port2_id)
207}
208
/// A transferable as exchanged with JS.
///
/// Serialized with the camelCase variant name under `kind` and the variant's
/// payload under `data`, as configured by the serde attributes below.
#[derive(Deserialize, Serialize)]
#[serde(tag = "kind", content = "data", rename_all = "camelCase")]
pub enum JsTransferable {
  /// An id passed through unchanged by this module.
  /// NOTE(review): presumably identifies a transferred array buffer on the
  /// JS side — confirm against the caller.
  ArrayBuffer(u32),
  /// A name and the resource id of a single resource.
  Resource(String, ResourceId),
  /// A name and the resource ids of a group of resources.
  MultiResource(String, Vec<ResourceId>),
}
216
217pub fn deserialize_js_transferables(
218  state: &mut OpState,
219  js_transferables: Vec<JsTransferable>,
220) -> Result<Vec<Transferable>, MessagePortError> {
221  let mut transferables = Vec::with_capacity(js_transferables.len());
222  for js_transferable in js_transferables {
223    match js_transferable {
224      JsTransferable::Resource(name, rid) => {
225        let resource = state
226          .resource_table
227          .take_any(rid)
228          .map_err(|_| MessagePortError::InvalidTransfer)?;
229        let tx = resource.transfer().map_err(MessagePortError::Generic)?;
230        transferables.push(Transferable::Resource(name, tx));
231      }
232      JsTransferable::MultiResource(name, rids) => {
233        let mut txs = Vec::with_capacity(rids.len());
234        for rid in rids {
235          let resource = state
236            .resource_table
237            .take_any(rid)
238            .map_err(|_| MessagePortError::InvalidTransfer)?;
239          let tx = resource.transfer().map_err(MessagePortError::Generic)?;
240          txs.push(tx);
241        }
242        transferables.push(Transferable::MultiResource(name, txs));
243      }
244      JsTransferable::ArrayBuffer(id) => {
245        transferables.push(Transferable::ArrayBuffer(id));
246      }
247    }
248  }
249  Ok(transferables)
250}
251
252pub fn serialize_transferables(
253  state: &mut OpState,
254  transferables: Vec<Transferable>,
255) -> Vec<JsTransferable> {
256  let mut js_transferables = Vec::with_capacity(transferables.len());
257  for transferable in transferables {
258    match transferable {
259      Transferable::Resource(name, tx) => {
260        let rx = tx.receive();
261        let rid = state.resource_table.add_rc_dyn(rx);
262        js_transferables.push(JsTransferable::Resource(name, rid));
263      }
264      Transferable::MultiResource(name, txs) => {
265        let rids = txs
266          .into_iter()
267          .map(|tx| state.resource_table.add_rc_dyn(tx.receive()))
268          .collect();
269        js_transferables.push(JsTransferable::MultiResource(name, rids));
270      }
271      Transferable::ArrayBuffer(id) => {
272        js_transferables.push(JsTransferable::ArrayBuffer(id));
273      }
274    }
275  }
276  js_transferables
277}
278
/// A message as exchanged with JS: the serialized payload plus the list of
/// transferables that accompany it.
#[derive(Deserialize, Serialize)]
pub struct JsMessageData {
  /// The serialized message body.
  pub data: DetachedBuffer,
  /// Transferables sent along with the message; may be empty.
  pub transferables: Vec<JsTransferable>,
}
284
285#[op2]
286pub fn op_message_port_post_message(
287  state: &mut OpState,
288  #[smi] rid: ResourceId,
289  #[serde] data: JsMessageData,
290) -> Result<(), MessagePortError> {
291  for js_transferable in &data.transferables {
292    if let JsTransferable::Resource(_name, id) = js_transferable
293      && *id == rid
294    {
295      return Err(MessagePortError::TransferSelf);
296    }
297  }
298
299  let resource = state
300    .resource_table
301    .get::<MessagePortResource>(rid)
302    .map_err(MessagePortError::Resource)?;
303  resource.port.send(state, data)
304}
305
306#[op2]
307#[serde]
308pub async fn op_message_port_recv_message(
309  state: Rc<RefCell<OpState>>,
310  #[smi] rid: ResourceId,
311) -> Result<Option<JsMessageData>, MessagePortError> {
312  let resource = {
313    let state = state.borrow();
314    match state.resource_table.get::<MessagePortResource>(rid) {
315      Ok(resource) => resource,
316      Err(_) => return Ok(None),
317    }
318  };
319  let cancel = RcRef::map(resource.clone(), |r| &r.cancel);
320  resource.port.recv(state).or_cancel(cancel).await?
321}
322
323#[op2]
324#[serde]
325pub fn op_message_port_recv_message_sync(
326  state: &mut OpState,
327  #[smi] rid: ResourceId,
328) -> Result<Option<JsMessageData>, MessagePortError> {
329  let resource = state
330    .resource_table
331    .get::<MessagePortResource>(rid)
332    .map_err(MessagePortError::Resource)?;
333  resource.port.try_recv_sync(state)
334}
335
336/// Fast-path post: takes a pre-serialized buffer directly, bypassing
337/// the JsMessageData serde overhead. Only for messages with no transferables.
338#[op2]
339pub fn op_message_port_post_message_raw(
340  state: &mut OpState,
341  #[smi] rid: ResourceId,
342  #[buffer(detach)] data: JsBuffer,
343) -> Result<(), MessagePortError> {
344  let resource = state
345    .resource_table
346    .get::<MessagePortResource>(rid)
347    .map_err(MessagePortError::Resource)?;
348  let detached = DetachedBuffer::from_v8slice(data.into_parts());
349  if let Some(tx) = &*resource.port.tx.borrow() {
350    tx.send((detached, vec![])).ok();
351  }
352  Ok(())
353}