Skip to main content

deno_web/
message_port.rs

1// Copyright 2018-2026 the Deno authors. MIT license.
2
3use std::borrow::Cow;
4use std::cell::RefCell;
5use std::future::poll_fn;
6use std::rc::Rc;
7
8use deno_core::CancelFuture;
9use deno_core::CancelHandle;
10use deno_core::DetachedBuffer;
11use deno_core::OpState;
12use deno_core::RcRef;
13use deno_core::Resource;
14use deno_core::ResourceId;
15use deno_core::TransferredResource;
16use deno_core::op2;
17use deno_error::JsErrorBox;
18use serde::Deserialize;
19use serde::Serialize;
20use tokio::sync::mpsc::UnboundedReceiver;
21use tokio::sync::mpsc::UnboundedSender;
22use tokio::sync::mpsc::error::TryRecvError;
23use tokio::sync::mpsc::unbounded_channel;
24
25#[derive(Debug, thiserror::Error, deno_error::JsError)]
26pub enum MessagePortError {
27  #[class(type)]
28  #[error("Invalid message port transfer")]
29  InvalidTransfer,
30  #[class(type)]
31  #[error("Message port is not ready for transfer")]
32  NotReady,
33  #[class(type)]
34  #[error("Can not transfer self message port")]
35  TransferSelf,
36  #[class(inherit)]
37  #[error(transparent)]
38  Canceled(#[from] deno_core::Canceled),
39  #[class(inherit)]
40  #[error(transparent)]
41  Resource(deno_core::error::ResourceError),
42  #[class(inherit)]
43  #[error(transparent)]
44  Generic(JsErrorBox),
45}
46
47pub enum Transferable {
48  Resource(String, Box<dyn TransferredResource>),
49  MultiResource(String, Vec<Box<dyn TransferredResource>>),
50  ArrayBuffer(u32),
51}
52
53type MessagePortMessage = (DetachedBuffer, Vec<Transferable>);
54
55pub struct MessagePort {
56  rx: RefCell<UnboundedReceiver<MessagePortMessage>>,
57  tx: RefCell<Option<UnboundedSender<MessagePortMessage>>>,
58}
59
60impl MessagePort {
61  pub fn send(
62    &self,
63    state: &mut OpState,
64    data: JsMessageData,
65  ) -> Result<(), MessagePortError> {
66    let transferables =
67      deserialize_js_transferables(state, data.transferables)?;
68
69    // Swallow the failed to send error. It means the channel was disentangled,
70    // but not cleaned up.
71    if let Some(tx) = &*self.tx.borrow() {
72      tx.send((data.data, transferables)).ok();
73    }
74
75    Ok(())
76  }
77
78  pub async fn recv(
79    &self,
80    state: Rc<RefCell<OpState>>,
81  ) -> Result<Option<JsMessageData>, MessagePortError> {
82    let rx = &self.rx;
83
84    let maybe_data = poll_fn(|cx| {
85      let mut rx = rx.borrow_mut();
86      rx.poll_recv(cx)
87    })
88    .await;
89
90    if let Some((data, transferables)) = maybe_data {
91      let js_transferables =
92        serialize_transferables(&mut state.borrow_mut(), transferables);
93      return Ok(Some(JsMessageData {
94        data,
95        transferables: js_transferables,
96      }));
97    }
98    Ok(None)
99  }
100
101  /// This forcefully disconnects the message port from its paired port. This
102  /// will wake up the `.recv` on the paired port, which will return `Ok(None)`.
103  pub fn disentangle(&self) {
104    let mut tx = self.tx.borrow_mut();
105    tx.take();
106  }
107}
108
109pub fn create_entangled_message_port() -> (MessagePort, MessagePort) {
110  let (port1_tx, port2_rx) = unbounded_channel::<MessagePortMessage>();
111  let (port2_tx, port1_rx) = unbounded_channel::<MessagePortMessage>();
112
113  let port1 = MessagePort {
114    rx: RefCell::new(port1_rx),
115    tx: RefCell::new(Some(port1_tx)),
116  };
117
118  let port2 = MessagePort {
119    rx: RefCell::new(port2_rx),
120    tx: RefCell::new(Some(port2_tx)),
121  };
122
123  (port1, port2)
124}
125
126pub struct MessagePortResource {
127  port: MessagePort,
128  cancel: CancelHandle,
129}
130
131impl Resource for MessagePortResource {
132  fn name(&self) -> Cow<'_, str> {
133    "messagePort".into()
134  }
135
136  fn close(self: Rc<Self>) {
137    self.cancel.cancel();
138  }
139
140  fn transfer(
141    self: Rc<Self>,
142  ) -> Result<Box<dyn TransferredResource>, JsErrorBox> {
143    self.cancel.cancel();
144    let resource = Rc::try_unwrap(self)
145      .map_err(|_| JsErrorBox::from_err(MessagePortError::NotReady))?;
146    Ok(Box::new(resource.port))
147  }
148}
149
150impl TransferredResource for MessagePort {
151  fn receive(self: Box<Self>) -> Rc<dyn Resource> {
152    Rc::new(MessagePortResource {
153      port: *self,
154      cancel: CancelHandle::new(),
155    })
156  }
157}
158
159#[op2]
160pub fn op_message_port_create_entangled(
161  state: &mut OpState,
162) -> (ResourceId, ResourceId) {
163  let (port1, port2) = create_entangled_message_port();
164
165  let port1_id = state.resource_table.add(MessagePortResource {
166    port: port1,
167    cancel: CancelHandle::new(),
168  });
169
170  let port2_id = state.resource_table.add(MessagePortResource {
171    port: port2,
172    cancel: CancelHandle::new(),
173  });
174
175  (port1_id, port2_id)
176}
177
178#[derive(Deserialize, Serialize)]
179#[serde(tag = "kind", content = "data", rename_all = "camelCase")]
180pub enum JsTransferable {
181  ArrayBuffer(u32),
182  Resource(String, ResourceId),
183  MultiResource(String, Vec<ResourceId>),
184}
185
186pub fn deserialize_js_transferables(
187  state: &mut OpState,
188  js_transferables: Vec<JsTransferable>,
189) -> Result<Vec<Transferable>, MessagePortError> {
190  let mut transferables = Vec::with_capacity(js_transferables.len());
191  for js_transferable in js_transferables {
192    match js_transferable {
193      JsTransferable::Resource(name, rid) => {
194        let resource = state
195          .resource_table
196          .take_any(rid)
197          .map_err(|_| MessagePortError::InvalidTransfer)?;
198        let tx = resource.transfer().map_err(MessagePortError::Generic)?;
199        transferables.push(Transferable::Resource(name, tx));
200      }
201      JsTransferable::MultiResource(name, rids) => {
202        let mut txs = Vec::with_capacity(rids.len());
203        for rid in rids {
204          let resource = state
205            .resource_table
206            .take_any(rid)
207            .map_err(|_| MessagePortError::InvalidTransfer)?;
208          let tx = resource.transfer().map_err(MessagePortError::Generic)?;
209          txs.push(tx);
210        }
211        transferables.push(Transferable::MultiResource(name, txs));
212      }
213      JsTransferable::ArrayBuffer(id) => {
214        transferables.push(Transferable::ArrayBuffer(id));
215      }
216    }
217  }
218  Ok(transferables)
219}
220
221pub fn serialize_transferables(
222  state: &mut OpState,
223  transferables: Vec<Transferable>,
224) -> Vec<JsTransferable> {
225  let mut js_transferables = Vec::with_capacity(transferables.len());
226  for transferable in transferables {
227    match transferable {
228      Transferable::Resource(name, tx) => {
229        let rx = tx.receive();
230        let rid = state.resource_table.add_rc_dyn(rx);
231        js_transferables.push(JsTransferable::Resource(name, rid));
232      }
233      Transferable::MultiResource(name, txs) => {
234        let rids = txs
235          .into_iter()
236          .map(|tx| state.resource_table.add_rc_dyn(tx.receive()))
237          .collect();
238        js_transferables.push(JsTransferable::MultiResource(name, rids));
239      }
240      Transferable::ArrayBuffer(id) => {
241        js_transferables.push(JsTransferable::ArrayBuffer(id));
242      }
243    }
244  }
245  js_transferables
246}
247
248#[derive(Deserialize, Serialize)]
249pub struct JsMessageData {
250  pub data: DetachedBuffer,
251  pub transferables: Vec<JsTransferable>,
252}
253
254#[op2]
255pub fn op_message_port_post_message(
256  state: &mut OpState,
257  #[smi] rid: ResourceId,
258  #[serde] data: JsMessageData,
259) -> Result<(), MessagePortError> {
260  for js_transferable in &data.transferables {
261    if let JsTransferable::Resource(_name, id) = js_transferable
262      && *id == rid
263    {
264      return Err(MessagePortError::TransferSelf);
265    }
266  }
267
268  let resource = state
269    .resource_table
270    .get::<MessagePortResource>(rid)
271    .map_err(MessagePortError::Resource)?;
272  resource.port.send(state, data)
273}
274
275#[op2]
276#[serde]
277pub async fn op_message_port_recv_message(
278  state: Rc<RefCell<OpState>>,
279  #[smi] rid: ResourceId,
280) -> Result<Option<JsMessageData>, MessagePortError> {
281  let resource = {
282    let state = state.borrow();
283    match state.resource_table.get::<MessagePortResource>(rid) {
284      Ok(resource) => resource,
285      Err(_) => return Ok(None),
286    }
287  };
288  let cancel = RcRef::map(resource.clone(), |r| &r.cancel);
289  resource.port.recv(state).or_cancel(cancel).await?
290}
291
292#[op2]
293#[serde]
294pub fn op_message_port_recv_message_sync(
295  state: &mut OpState, // Rc<RefCell<OpState>>,
296  #[smi] rid: ResourceId,
297) -> Result<Option<JsMessageData>, MessagePortError> {
298  let resource = state
299    .resource_table
300    .get::<MessagePortResource>(rid)
301    .map_err(MessagePortError::Resource)?;
302  let mut rx = resource.port.rx.borrow_mut();
303
304  match rx.try_recv() {
305    Ok((d, t)) => Ok(Some(JsMessageData {
306      data: d,
307      transferables: serialize_transferables(state, t),
308    })),
309    Err(TryRecvError::Empty) => Ok(None),
310    Err(TryRecvError::Disconnected) => Ok(None),
311  }
312}