1use std::borrow::Cow;
4use std::cell::RefCell;
5use std::future::poll_fn;
6use std::rc::Rc;
7
8use deno_core::CancelFuture;
9use deno_core::CancelHandle;
10use deno_core::DetachedBuffer;
11use deno_core::OpState;
12use deno_core::RcRef;
13use deno_core::Resource;
14use deno_core::ResourceId;
15use deno_core::TransferredResource;
16use deno_core::op2;
17use deno_error::JsErrorBox;
18use serde::Deserialize;
19use serde::Serialize;
20use tokio::sync::mpsc::UnboundedReceiver;
21use tokio::sync::mpsc::UnboundedSender;
22use tokio::sync::mpsc::error::TryRecvError;
23use tokio::sync::mpsc::unbounded_channel;
24
25#[derive(Debug, thiserror::Error, deno_error::JsError)]
26pub enum MessagePortError {
27 #[class(type)]
28 #[error("Invalid message port transfer")]
29 InvalidTransfer,
30 #[class(type)]
31 #[error("Message port is not ready for transfer")]
32 NotReady,
33 #[class(type)]
34 #[error("Can not transfer self message port")]
35 TransferSelf,
36 #[class(inherit)]
37 #[error(transparent)]
38 Canceled(#[from] deno_core::Canceled),
39 #[class(inherit)]
40 #[error(transparent)]
41 Resource(deno_core::error::ResourceError),
42 #[class(inherit)]
43 #[error(transparent)]
44 Generic(JsErrorBox),
45}
46
47pub enum Transferable {
48 Resource(String, Box<dyn TransferredResource>),
49 MultiResource(String, Vec<Box<dyn TransferredResource>>),
50 ArrayBuffer(u32),
51}
52
53type MessagePortMessage = (DetachedBuffer, Vec<Transferable>);
54
55pub struct MessagePort {
56 rx: RefCell<UnboundedReceiver<MessagePortMessage>>,
57 tx: RefCell<Option<UnboundedSender<MessagePortMessage>>>,
58}
59
60impl MessagePort {
61 pub fn send(
62 &self,
63 state: &mut OpState,
64 data: JsMessageData,
65 ) -> Result<(), MessagePortError> {
66 let transferables =
67 deserialize_js_transferables(state, data.transferables)?;
68
69 if let Some(tx) = &*self.tx.borrow() {
72 tx.send((data.data, transferables)).ok();
73 }
74
75 Ok(())
76 }
77
78 pub async fn recv(
79 &self,
80 state: Rc<RefCell<OpState>>,
81 ) -> Result<Option<JsMessageData>, MessagePortError> {
82 let rx = &self.rx;
83
84 let maybe_data = poll_fn(|cx| {
85 let mut rx = rx.borrow_mut();
86 rx.poll_recv(cx)
87 })
88 .await;
89
90 if let Some((data, transferables)) = maybe_data {
91 let js_transferables =
92 serialize_transferables(&mut state.borrow_mut(), transferables);
93 return Ok(Some(JsMessageData {
94 data,
95 transferables: js_transferables,
96 }));
97 }
98 Ok(None)
99 }
100
101 pub fn disentangle(&self) {
104 let mut tx = self.tx.borrow_mut();
105 tx.take();
106 }
107}
108
109pub fn create_entangled_message_port() -> (MessagePort, MessagePort) {
110 let (port1_tx, port2_rx) = unbounded_channel::<MessagePortMessage>();
111 let (port2_tx, port1_rx) = unbounded_channel::<MessagePortMessage>();
112
113 let port1 = MessagePort {
114 rx: RefCell::new(port1_rx),
115 tx: RefCell::new(Some(port1_tx)),
116 };
117
118 let port2 = MessagePort {
119 rx: RefCell::new(port2_rx),
120 tx: RefCell::new(Some(port2_tx)),
121 };
122
123 (port1, port2)
124}
125
126pub struct MessagePortResource {
127 port: MessagePort,
128 cancel: CancelHandle,
129}
130
131impl Resource for MessagePortResource {
132 fn name(&self) -> Cow<'_, str> {
133 "messagePort".into()
134 }
135
136 fn close(self: Rc<Self>) {
137 self.cancel.cancel();
138 }
139
140 fn transfer(
141 self: Rc<Self>,
142 ) -> Result<Box<dyn TransferredResource>, JsErrorBox> {
143 self.cancel.cancel();
144 let resource = Rc::try_unwrap(self)
145 .map_err(|_| JsErrorBox::from_err(MessagePortError::NotReady))?;
146 Ok(Box::new(resource.port))
147 }
148}
149
150impl TransferredResource for MessagePort {
151 fn receive(self: Box<Self>) -> Rc<dyn Resource> {
152 Rc::new(MessagePortResource {
153 port: *self,
154 cancel: CancelHandle::new(),
155 })
156 }
157}
158
159#[op2]
160pub fn op_message_port_create_entangled(
161 state: &mut OpState,
162) -> (ResourceId, ResourceId) {
163 let (port1, port2) = create_entangled_message_port();
164
165 let port1_id = state.resource_table.add(MessagePortResource {
166 port: port1,
167 cancel: CancelHandle::new(),
168 });
169
170 let port2_id = state.resource_table.add(MessagePortResource {
171 port: port2,
172 cancel: CancelHandle::new(),
173 });
174
175 (port1_id, port2_id)
176}
177
178#[derive(Deserialize, Serialize)]
179#[serde(tag = "kind", content = "data", rename_all = "camelCase")]
180pub enum JsTransferable {
181 ArrayBuffer(u32),
182 Resource(String, ResourceId),
183 MultiResource(String, Vec<ResourceId>),
184}
185
186pub fn deserialize_js_transferables(
187 state: &mut OpState,
188 js_transferables: Vec<JsTransferable>,
189) -> Result<Vec<Transferable>, MessagePortError> {
190 let mut transferables = Vec::with_capacity(js_transferables.len());
191 for js_transferable in js_transferables {
192 match js_transferable {
193 JsTransferable::Resource(name, rid) => {
194 let resource = state
195 .resource_table
196 .take_any(rid)
197 .map_err(|_| MessagePortError::InvalidTransfer)?;
198 let tx = resource.transfer().map_err(MessagePortError::Generic)?;
199 transferables.push(Transferable::Resource(name, tx));
200 }
201 JsTransferable::MultiResource(name, rids) => {
202 let mut txs = Vec::with_capacity(rids.len());
203 for rid in rids {
204 let resource = state
205 .resource_table
206 .take_any(rid)
207 .map_err(|_| MessagePortError::InvalidTransfer)?;
208 let tx = resource.transfer().map_err(MessagePortError::Generic)?;
209 txs.push(tx);
210 }
211 transferables.push(Transferable::MultiResource(name, txs));
212 }
213 JsTransferable::ArrayBuffer(id) => {
214 transferables.push(Transferable::ArrayBuffer(id));
215 }
216 }
217 }
218 Ok(transferables)
219}
220
221pub fn serialize_transferables(
222 state: &mut OpState,
223 transferables: Vec<Transferable>,
224) -> Vec<JsTransferable> {
225 let mut js_transferables = Vec::with_capacity(transferables.len());
226 for transferable in transferables {
227 match transferable {
228 Transferable::Resource(name, tx) => {
229 let rx = tx.receive();
230 let rid = state.resource_table.add_rc_dyn(rx);
231 js_transferables.push(JsTransferable::Resource(name, rid));
232 }
233 Transferable::MultiResource(name, txs) => {
234 let rids = txs
235 .into_iter()
236 .map(|tx| state.resource_table.add_rc_dyn(tx.receive()))
237 .collect();
238 js_transferables.push(JsTransferable::MultiResource(name, rids));
239 }
240 Transferable::ArrayBuffer(id) => {
241 js_transferables.push(JsTransferable::ArrayBuffer(id));
242 }
243 }
244 }
245 js_transferables
246}
247
248#[derive(Deserialize, Serialize)]
249pub struct JsMessageData {
250 pub data: DetachedBuffer,
251 pub transferables: Vec<JsTransferable>,
252}
253
254#[op2]
255pub fn op_message_port_post_message(
256 state: &mut OpState,
257 #[smi] rid: ResourceId,
258 #[serde] data: JsMessageData,
259) -> Result<(), MessagePortError> {
260 for js_transferable in &data.transferables {
261 if let JsTransferable::Resource(_name, id) = js_transferable
262 && *id == rid
263 {
264 return Err(MessagePortError::TransferSelf);
265 }
266 }
267
268 let resource = state
269 .resource_table
270 .get::<MessagePortResource>(rid)
271 .map_err(MessagePortError::Resource)?;
272 resource.port.send(state, data)
273}
274
275#[op2]
276#[serde]
277pub async fn op_message_port_recv_message(
278 state: Rc<RefCell<OpState>>,
279 #[smi] rid: ResourceId,
280) -> Result<Option<JsMessageData>, MessagePortError> {
281 let resource = {
282 let state = state.borrow();
283 match state.resource_table.get::<MessagePortResource>(rid) {
284 Ok(resource) => resource,
285 Err(_) => return Ok(None),
286 }
287 };
288 let cancel = RcRef::map(resource.clone(), |r| &r.cancel);
289 resource.port.recv(state).or_cancel(cancel).await?
290}
291
292#[op2]
293#[serde]
294pub fn op_message_port_recv_message_sync(
295 state: &mut OpState, #[smi] rid: ResourceId,
297) -> Result<Option<JsMessageData>, MessagePortError> {
298 let resource = state
299 .resource_table
300 .get::<MessagePortResource>(rid)
301 .map_err(MessagePortError::Resource)?;
302 let mut rx = resource.port.rx.borrow_mut();
303
304 match rx.try_recv() {
305 Ok((d, t)) => Ok(Some(JsMessageData {
306 data: d,
307 transferables: serialize_transferables(state, t),
308 })),
309 Err(TryRecvError::Empty) => Ok(None),
310 Err(TryRecvError::Disconnected) => Ok(None),
311 }
312}