deno_web/
message_port.rs

1// Copyright 2018-2025 the Deno authors. MIT license.
2
3use std::borrow::Cow;
4use std::cell::RefCell;
5use std::future::poll_fn;
6use std::rc::Rc;
7
8use deno_core::CancelFuture;
9use deno_core::CancelHandle;
10use deno_core::DetachedBuffer;
11use deno_core::OpState;
12use deno_core::RcRef;
13use deno_core::Resource;
14use deno_core::ResourceId;
15use deno_core::op2;
16use serde::Deserialize;
17use serde::Serialize;
18use tokio::sync::mpsc::UnboundedReceiver;
19use tokio::sync::mpsc::UnboundedSender;
20use tokio::sync::mpsc::error::TryRecvError;
21use tokio::sync::mpsc::unbounded_channel;
22
23#[derive(Debug, thiserror::Error, deno_error::JsError)]
24pub enum MessagePortError {
25  #[class(type)]
26  #[error("Invalid message port transfer")]
27  InvalidTransfer,
28  #[class(type)]
29  #[error("Message port is not ready for transfer")]
30  NotReady,
31  #[class(type)]
32  #[error("Can not transfer self message port")]
33  TransferSelf,
34  #[class(inherit)]
35  #[error(transparent)]
36  Canceled(#[from] deno_core::Canceled),
37  #[class(inherit)]
38  #[error(transparent)]
39  Resource(deno_core::error::ResourceError),
40}
41
42pub enum Transferable {
43  MessagePort(MessagePort),
44  ArrayBuffer(u32),
45}
46
47type MessagePortMessage = (DetachedBuffer, Vec<Transferable>);
48
49pub struct MessagePort {
50  rx: RefCell<UnboundedReceiver<MessagePortMessage>>,
51  tx: RefCell<Option<UnboundedSender<MessagePortMessage>>>,
52}
53
54impl MessagePort {
55  pub fn send(
56    &self,
57    state: &mut OpState,
58    data: JsMessageData,
59  ) -> Result<(), MessagePortError> {
60    let transferables =
61      deserialize_js_transferables(state, data.transferables)?;
62
63    // Swallow the failed to send error. It means the channel was disentangled,
64    // but not cleaned up.
65    if let Some(tx) = &*self.tx.borrow() {
66      tx.send((data.data, transferables)).ok();
67    }
68
69    Ok(())
70  }
71
72  pub async fn recv(
73    &self,
74    state: Rc<RefCell<OpState>>,
75  ) -> Result<Option<JsMessageData>, MessagePortError> {
76    let rx = &self.rx;
77
78    let maybe_data = poll_fn(|cx| {
79      let mut rx = rx.borrow_mut();
80      rx.poll_recv(cx)
81    })
82    .await;
83
84    if let Some((data, transferables)) = maybe_data {
85      let js_transferables =
86        serialize_transferables(&mut state.borrow_mut(), transferables);
87      return Ok(Some(JsMessageData {
88        data,
89        transferables: js_transferables,
90      }));
91    }
92    Ok(None)
93  }
94
95  /// This forcefully disconnects the message port from its paired port. This
96  /// will wake up the `.recv` on the paired port, which will return `Ok(None)`.
97  pub fn disentangle(&self) {
98    let mut tx = self.tx.borrow_mut();
99    tx.take();
100  }
101}
102
103pub fn create_entangled_message_port() -> (MessagePort, MessagePort) {
104  let (port1_tx, port2_rx) = unbounded_channel::<MessagePortMessage>();
105  let (port2_tx, port1_rx) = unbounded_channel::<MessagePortMessage>();
106
107  let port1 = MessagePort {
108    rx: RefCell::new(port1_rx),
109    tx: RefCell::new(Some(port1_tx)),
110  };
111
112  let port2 = MessagePort {
113    rx: RefCell::new(port2_rx),
114    tx: RefCell::new(Some(port2_tx)),
115  };
116
117  (port1, port2)
118}
119
120pub struct MessagePortResource {
121  port: MessagePort,
122  cancel: CancelHandle,
123}
124
125impl Resource for MessagePortResource {
126  fn name(&self) -> Cow<'_, str> {
127    "messagePort".into()
128  }
129
130  fn close(self: Rc<Self>) {
131    self.cancel.cancel();
132  }
133}
134
135#[op2]
136#[serde]
137pub fn op_message_port_create_entangled(
138  state: &mut OpState,
139) -> (ResourceId, ResourceId) {
140  let (port1, port2) = create_entangled_message_port();
141
142  let port1_id = state.resource_table.add(MessagePortResource {
143    port: port1,
144    cancel: CancelHandle::new(),
145  });
146
147  let port2_id = state.resource_table.add(MessagePortResource {
148    port: port2,
149    cancel: CancelHandle::new(),
150  });
151
152  (port1_id, port2_id)
153}
154
155#[derive(Deserialize, Serialize)]
156#[serde(tag = "kind", content = "data", rename_all = "camelCase")]
157pub enum JsTransferable {
158  #[serde(rename_all = "camelCase")]
159  MessagePort(ResourceId),
160  ArrayBuffer(u32),
161}
162
163pub fn deserialize_js_transferables(
164  state: &mut OpState,
165  js_transferables: Vec<JsTransferable>,
166) -> Result<Vec<Transferable>, MessagePortError> {
167  let mut transferables = Vec::with_capacity(js_transferables.len());
168  for js_transferable in js_transferables {
169    match js_transferable {
170      JsTransferable::MessagePort(id) => {
171        let resource = state
172          .resource_table
173          .take::<MessagePortResource>(id)
174          .map_err(|_| MessagePortError::InvalidTransfer)?;
175        resource.cancel.cancel();
176        let resource =
177          Rc::try_unwrap(resource).map_err(|_| MessagePortError::NotReady)?;
178        transferables.push(Transferable::MessagePort(resource.port));
179      }
180      JsTransferable::ArrayBuffer(id) => {
181        transferables.push(Transferable::ArrayBuffer(id));
182      }
183    }
184  }
185  Ok(transferables)
186}
187
188pub fn serialize_transferables(
189  state: &mut OpState,
190  transferables: Vec<Transferable>,
191) -> Vec<JsTransferable> {
192  let mut js_transferables = Vec::with_capacity(transferables.len());
193  for transferable in transferables {
194    match transferable {
195      Transferable::MessagePort(port) => {
196        let rid = state.resource_table.add(MessagePortResource {
197          port,
198          cancel: CancelHandle::new(),
199        });
200        js_transferables.push(JsTransferable::MessagePort(rid));
201      }
202      Transferable::ArrayBuffer(id) => {
203        js_transferables.push(JsTransferable::ArrayBuffer(id));
204      }
205    }
206  }
207  js_transferables
208}
209
210#[derive(Deserialize, Serialize)]
211pub struct JsMessageData {
212  pub data: DetachedBuffer,
213  pub transferables: Vec<JsTransferable>,
214}
215
216#[op2]
217pub fn op_message_port_post_message(
218  state: &mut OpState,
219  #[smi] rid: ResourceId,
220  #[serde] data: JsMessageData,
221) -> Result<(), MessagePortError> {
222  for js_transferable in &data.transferables {
223    if let JsTransferable::MessagePort(id) = js_transferable
224      && *id == rid
225    {
226      return Err(MessagePortError::TransferSelf);
227    }
228  }
229
230  let resource = state
231    .resource_table
232    .get::<MessagePortResource>(rid)
233    .map_err(MessagePortError::Resource)?;
234  resource.port.send(state, data)
235}
236
237#[op2(async)]
238#[serde]
239pub async fn op_message_port_recv_message(
240  state: Rc<RefCell<OpState>>,
241  #[smi] rid: ResourceId,
242) -> Result<Option<JsMessageData>, MessagePortError> {
243  let resource = {
244    let state = state.borrow();
245    match state.resource_table.get::<MessagePortResource>(rid) {
246      Ok(resource) => resource,
247      Err(_) => return Ok(None),
248    }
249  };
250  let cancel = RcRef::map(resource.clone(), |r| &r.cancel);
251  resource.port.recv(state).or_cancel(cancel).await?
252}
253
254#[op2]
255#[serde]
256pub fn op_message_port_recv_message_sync(
257  state: &mut OpState, // Rc<RefCell<OpState>>,
258  #[smi] rid: ResourceId,
259) -> Result<Option<JsMessageData>, MessagePortError> {
260  let resource = state
261    .resource_table
262    .get::<MessagePortResource>(rid)
263    .map_err(MessagePortError::Resource)?;
264  let mut rx = resource.port.rx.borrow_mut();
265
266  match rx.try_recv() {
267    Ok((d, t)) => Ok(Some(JsMessageData {
268      data: d,
269      transferables: serialize_transferables(state, t),
270    })),
271    Err(TryRecvError::Empty) => Ok(None),
272    Err(TryRecvError::Disconnected) => Ok(None),
273  }
274}