1use std::borrow::Cow;
4use std::cell::RefCell;
5use std::future::poll_fn;
6use std::rc::Rc;
7
8use deno_core::CancelFuture;
9use deno_core::CancelHandle;
10use deno_core::DetachedBuffer;
11use deno_core::JsBuffer;
12use deno_core::OpState;
13use deno_core::RcRef;
14use deno_core::Resource;
15use deno_core::ResourceId;
16use deno_core::TransferredResource;
17use deno_core::op2;
18use deno_error::JsErrorBox;
19use serde::Deserialize;
20use serde::Serialize;
21use tokio::sync::mpsc::UnboundedReceiver;
22use tokio::sync::mpsc::UnboundedSender;
23use tokio::sync::mpsc::error::TryRecvError;
24use tokio::sync::mpsc::unbounded_channel;
25
/// Errors returned by the message port functions and ops in this file.
///
/// Every variant carries a `#[class(...)]` attribute consumed by the
/// `deno_error::JsError` derive.
// NOTE(review): presumably `class` selects the JavaScript error class the
// variant is surfaced as — confirm against the `deno_error` documentation.
#[derive(Debug, thiserror::Error, deno_error::JsError)]
pub enum MessagePortError {
  /// A resource id listed for transfer could not be taken out of the
  /// resource table.
  #[class(type)]
  #[error("Invalid message port transfer")]
  InvalidTransfer,
  /// A port resource is still referenced elsewhere, so it cannot be unwrapped
  /// for transfer.
  #[class(type)]
  #[error("Message port is not ready for transfer")]
  NotReady,
  /// A port was listed as a transferable in a message posted through itself.
  #[class(type)]
  #[error("Can not transfer self message port")]
  TransferSelf,
  /// A pending operation was canceled; created automatically from
  /// `deno_core::Canceled` via `#[from]`.
  #[class(inherit)]
  #[error(transparent)]
  Canceled(#[from] deno_core::Canceled),
  /// A resource table lookup failed.
  #[class(inherit)]
  #[error(transparent)]
  Resource(deno_core::error::ResourceError),
  /// An error reported by a resource's own `transfer` implementation.
  #[class(inherit)]
  #[error(transparent)]
  Generic(JsErrorBox),
}
47
/// An item transferred alongside a message, in the form it has while the
/// message sits in the channel between two ports.
pub enum Transferable {
  /// A single transferred resource, tagged with a name string.
  Resource(String, Box<dyn TransferredResource>),
  /// Several transferred resources grouped under one name string.
  MultiResource(String, Vec<Box<dyn TransferredResource>>),
  /// An array buffer identified by a numeric id; the id is passed through
  /// unchanged by the (de)serialization functions in this file.
  ArrayBuffer(u32),
}
53
/// What travels through a port's channel: the detached message payload and the
/// transferables sent along with it.
type MessagePortMessage = (DetachedBuffer, Vec<Transferable>);
55
/// One end of a pair of entangled message ports, backed by two unbounded
/// channels (one per direction).
pub struct MessagePort {
  /// Receiving half for messages addressed to this port.
  pub rx: RefCell<UnboundedReceiver<MessagePortMessage>>,
  /// Sending half toward the peer port; becomes `None` once `disentangle`
  /// has been called.
  pub tx: RefCell<Option<UnboundedSender<MessagePortMessage>>>,
}
60
61impl MessagePort {
62 pub fn send(
63 &self,
64 state: &mut OpState,
65 data: JsMessageData,
66 ) -> Result<(), MessagePortError> {
67 let transferables = if data.transferables.is_empty() {
68 vec![]
69 } else {
70 deserialize_js_transferables(state, data.transferables)?
71 };
72
73 if let Some(tx) = &*self.tx.borrow() {
76 tx.send((data.data, transferables)).ok();
77 }
78
79 Ok(())
80 }
81
82 pub async fn recv(
83 &self,
84 state: Rc<RefCell<OpState>>,
85 ) -> Result<Option<JsMessageData>, MessagePortError> {
86 let rx = &self.rx;
87
88 let maybe_data = poll_fn(|cx| {
89 let mut rx = rx.borrow_mut();
90 rx.poll_recv(cx)
91 })
92 .await;
93
94 if let Some((data, transferables)) = maybe_data {
95 let js_transferables = if transferables.is_empty() {
96 vec![]
97 } else {
98 serialize_transferables(&mut state.borrow_mut(), transferables)
99 };
100 return Ok(Some(JsMessageData {
101 data,
102 transferables: js_transferables,
103 }));
104 }
105 Ok(None)
106 }
107
108 pub fn try_recv_sync(
111 &self,
112 state: &mut OpState,
113 ) -> Result<Option<JsMessageData>, MessagePortError> {
114 let mut rx = self.rx.borrow_mut();
115 match rx.try_recv() {
116 Ok((data, transferables)) => {
117 let js_transferables = if transferables.is_empty() {
118 vec![]
119 } else {
120 serialize_transferables(state, transferables)
121 };
122 Ok(Some(JsMessageData {
123 data,
124 transferables: js_transferables,
125 }))
126 }
127 Err(TryRecvError::Empty) => Ok(None),
128 Err(TryRecvError::Disconnected) => Ok(None),
129 }
130 }
131
132 pub fn disentangle(&self) {
135 let mut tx = self.tx.borrow_mut();
136 tx.take();
137 }
138}
139
140pub fn create_entangled_message_port() -> (MessagePort, MessagePort) {
141 let (port1_tx, port2_rx) = unbounded_channel::<MessagePortMessage>();
142 let (port2_tx, port1_rx) = unbounded_channel::<MessagePortMessage>();
143
144 let port1 = MessagePort {
145 rx: RefCell::new(port1_rx),
146 tx: RefCell::new(Some(port1_tx)),
147 };
148
149 let port2 = MessagePort {
150 rx: RefCell::new(port2_rx),
151 tx: RefCell::new(Some(port2_tx)),
152 };
153
154 (port1, port2)
155}
156
/// Resource-table entry wrapping a [`MessagePort`].
pub struct MessagePortResource {
  /// The wrapped port.
  port: MessagePort,
  /// Cancel handle triggered when the resource is closed or transferred; the
  /// async receive op attaches it to its pending receive.
  cancel: CancelHandle,
}
161
162impl Resource for MessagePortResource {
163 fn name(&self) -> Cow<'_, str> {
164 "messagePort".into()
165 }
166
167 fn close(self: Rc<Self>) {
168 self.cancel.cancel();
169 }
170
171 fn transfer(
172 self: Rc<Self>,
173 ) -> Result<Box<dyn TransferredResource>, JsErrorBox> {
174 self.cancel.cancel();
175 let resource = Rc::try_unwrap(self)
176 .map_err(|_| JsErrorBox::from_err(MessagePortError::NotReady))?;
177 Ok(Box::new(resource.port))
178 }
179}
180
181impl TransferredResource for MessagePort {
182 fn receive(self: Box<Self>) -> Rc<dyn Resource> {
183 Rc::new(MessagePortResource {
184 port: *self,
185 cancel: CancelHandle::new(),
186 })
187 }
188}
189
190#[op2]
191pub fn op_message_port_create_entangled(
192 state: &mut OpState,
193) -> (ResourceId, ResourceId) {
194 let (port1, port2) = create_entangled_message_port();
195
196 let port1_id = state.resource_table.add(MessagePortResource {
197 port: port1,
198 cancel: CancelHandle::new(),
199 });
200
201 let port2_id = state.resource_table.add(MessagePortResource {
202 port: port2,
203 cancel: CancelHandle::new(),
204 });
205
206 (port1_id, port2_id)
207}
208
/// Serde-facing description of a transferable, referring to resources by id.
///
/// Per the `serde` attribute below, values are adjacently tagged: the variant
/// name (renamed to camelCase) is stored under `kind` and the payload under
/// `data`.
#[derive(Deserialize, Serialize)]
#[serde(tag = "kind", content = "data", rename_all = "camelCase")]
pub enum JsTransferable {
  /// An array buffer identified by a numeric id.
  ArrayBuffer(u32),
  /// A single resource: a name string and its resource id.
  Resource(String, ResourceId),
  /// Several resources grouped under one name string.
  MultiResource(String, Vec<ResourceId>),
}
216
217pub fn deserialize_js_transferables(
218 state: &mut OpState,
219 js_transferables: Vec<JsTransferable>,
220) -> Result<Vec<Transferable>, MessagePortError> {
221 let mut transferables = Vec::with_capacity(js_transferables.len());
222 for js_transferable in js_transferables {
223 match js_transferable {
224 JsTransferable::Resource(name, rid) => {
225 let resource = state
226 .resource_table
227 .take_any(rid)
228 .map_err(|_| MessagePortError::InvalidTransfer)?;
229 let tx = resource.transfer().map_err(MessagePortError::Generic)?;
230 transferables.push(Transferable::Resource(name, tx));
231 }
232 JsTransferable::MultiResource(name, rids) => {
233 let mut txs = Vec::with_capacity(rids.len());
234 for rid in rids {
235 let resource = state
236 .resource_table
237 .take_any(rid)
238 .map_err(|_| MessagePortError::InvalidTransfer)?;
239 let tx = resource.transfer().map_err(MessagePortError::Generic)?;
240 txs.push(tx);
241 }
242 transferables.push(Transferable::MultiResource(name, txs));
243 }
244 JsTransferable::ArrayBuffer(id) => {
245 transferables.push(Transferable::ArrayBuffer(id));
246 }
247 }
248 }
249 Ok(transferables)
250}
251
252pub fn serialize_transferables(
253 state: &mut OpState,
254 transferables: Vec<Transferable>,
255) -> Vec<JsTransferable> {
256 let mut js_transferables = Vec::with_capacity(transferables.len());
257 for transferable in transferables {
258 match transferable {
259 Transferable::Resource(name, tx) => {
260 let rx = tx.receive();
261 let rid = state.resource_table.add_rc_dyn(rx);
262 js_transferables.push(JsTransferable::Resource(name, rid));
263 }
264 Transferable::MultiResource(name, txs) => {
265 let rids = txs
266 .into_iter()
267 .map(|tx| state.resource_table.add_rc_dyn(tx.receive()))
268 .collect();
269 js_transferables.push(JsTransferable::MultiResource(name, rids));
270 }
271 Transferable::ArrayBuffer(id) => {
272 js_transferables.push(JsTransferable::ArrayBuffer(id));
273 }
274 }
275 }
276 js_transferables
277}
278
/// A message as exchanged with the ops in this file: the payload plus the
/// transferables that accompany it.
#[derive(Deserialize, Serialize)]
pub struct JsMessageData {
  /// Detached buffer holding the message payload.
  pub data: DetachedBuffer,
  /// Transferables sent with the message, referring to resources by id.
  pub transferables: Vec<JsTransferable>,
}
284
285#[op2]
286pub fn op_message_port_post_message(
287 state: &mut OpState,
288 #[smi] rid: ResourceId,
289 #[serde] data: JsMessageData,
290) -> Result<(), MessagePortError> {
291 for js_transferable in &data.transferables {
292 if let JsTransferable::Resource(_name, id) = js_transferable
293 && *id == rid
294 {
295 return Err(MessagePortError::TransferSelf);
296 }
297 }
298
299 let resource = state
300 .resource_table
301 .get::<MessagePortResource>(rid)
302 .map_err(MessagePortError::Resource)?;
303 resource.port.send(state, data)
304}
305
306#[op2]
307#[serde]
308pub async fn op_message_port_recv_message(
309 state: Rc<RefCell<OpState>>,
310 #[smi] rid: ResourceId,
311) -> Result<Option<JsMessageData>, MessagePortError> {
312 let resource = {
313 let state = state.borrow();
314 match state.resource_table.get::<MessagePortResource>(rid) {
315 Ok(resource) => resource,
316 Err(_) => return Ok(None),
317 }
318 };
319 let cancel = RcRef::map(resource.clone(), |r| &r.cancel);
320 resource.port.recv(state).or_cancel(cancel).await?
321}
322
323#[op2]
324#[serde]
325pub fn op_message_port_recv_message_sync(
326 state: &mut OpState,
327 #[smi] rid: ResourceId,
328) -> Result<Option<JsMessageData>, MessagePortError> {
329 let resource = state
330 .resource_table
331 .get::<MessagePortResource>(rid)
332 .map_err(MessagePortError::Resource)?;
333 resource.port.try_recv_sync(state)
334}
335
336#[op2]
339pub fn op_message_port_post_message_raw(
340 state: &mut OpState,
341 #[smi] rid: ResourceId,
342 #[buffer(detach)] data: JsBuffer,
343) -> Result<(), MessagePortError> {
344 let resource = state
345 .resource_table
346 .get::<MessagePortResource>(rid)
347 .map_err(MessagePortError::Resource)?;
348 let detached = DetachedBuffer::from_v8slice(data.into_parts());
349 if let Some(tx) = &*resource.port.tx.borrow() {
350 tx.send((detached, vec![])).ok();
351 }
352 Ok(())
353}