1use std::borrow::Cow;
4use std::cell::RefCell;
5use std::future::poll_fn;
6use std::rc::Rc;
7
8use deno_core::CancelFuture;
9use deno_core::CancelHandle;
10use deno_core::DetachedBuffer;
11use deno_core::OpState;
12use deno_core::RcRef;
13use deno_core::Resource;
14use deno_core::ResourceId;
15use deno_core::op2;
16use serde::Deserialize;
17use serde::Serialize;
18use tokio::sync::mpsc::UnboundedReceiver;
19use tokio::sync::mpsc::UnboundedSender;
20use tokio::sync::mpsc::error::TryRecvError;
21use tokio::sync::mpsc::unbounded_channel;
22
/// Errors produced by the message port helpers and ops in this file.
#[derive(Debug, thiserror::Error, deno_error::JsError)]
pub enum MessagePortError {
  /// A message port id in a transfer list could not be taken out of the
  /// resource table as a `MessagePortResource`.
  #[class(type)]
  #[error("Invalid message port transfer")]
  InvalidTransfer,
  /// The resource of a port being transferred is still referenced elsewhere,
  /// so exclusive ownership of it could not be obtained.
  #[class(type)]
  #[error("Message port is not ready for transfer")]
  NotReady,
  /// A port was listed in the transfer list of a message posted on that very
  /// port.
  #[class(type)]
  #[error("Can not transfer self message port")]
  TransferSelf,
  /// A pending operation was canceled; converted automatically from
  /// `deno_core::Canceled` by `?`.
  #[class(inherit)]
  #[error(transparent)]
  Canceled(#[from] deno_core::Canceled),
  /// A resource table lookup failed. There is no `#[from]` here, so callers
  /// wrap the error explicitly (e.g. `map_err(MessagePortError::Resource)`).
  #[class(inherit)]
  #[error(transparent)]
  Resource(deno_core::error::ResourceError),
}
41
/// An object that travels to the receiving side together with a message.
pub enum Transferable {
  /// An owned message port that has been removed from the sender's resource
  /// table.
  MessagePort(MessagePort),
  /// A numeric id that is forwarded unchanged between the JS and Rust
  /// representations.
  /// NOTE(review): presumably identifies a transferred array buffer stored
  /// elsewhere — confirm against the JS side.
  ArrayBuffer(u32),
}
46
/// A message as it is sent over the channel between two ports: the payload
/// buffer together with the transferables that accompany it.
type MessagePortMessage = (DetachedBuffer, Vec<Transferable>);
48
/// One end of a pair of entangled message ports.
pub struct MessagePort {
  /// Receiving half of the channel carrying messages sent by the other port.
  rx: RefCell<UnboundedReceiver<MessagePortMessage>>,
  /// Sending half of the channel toward the other port; `None` once the port
  /// has been disentangled.
  tx: RefCell<Option<UnboundedSender<MessagePortMessage>>>,
}
53
54impl MessagePort {
55 pub fn send(
56 &self,
57 state: &mut OpState,
58 data: JsMessageData,
59 ) -> Result<(), MessagePortError> {
60 let transferables =
61 deserialize_js_transferables(state, data.transferables)?;
62
63 if let Some(tx) = &*self.tx.borrow() {
66 tx.send((data.data, transferables)).ok();
67 }
68
69 Ok(())
70 }
71
72 pub async fn recv(
73 &self,
74 state: Rc<RefCell<OpState>>,
75 ) -> Result<Option<JsMessageData>, MessagePortError> {
76 let rx = &self.rx;
77
78 let maybe_data = poll_fn(|cx| {
79 let mut rx = rx.borrow_mut();
80 rx.poll_recv(cx)
81 })
82 .await;
83
84 if let Some((data, transferables)) = maybe_data {
85 let js_transferables =
86 serialize_transferables(&mut state.borrow_mut(), transferables);
87 return Ok(Some(JsMessageData {
88 data,
89 transferables: js_transferables,
90 }));
91 }
92 Ok(None)
93 }
94
95 pub fn disentangle(&self) {
98 let mut tx = self.tx.borrow_mut();
99 tx.take();
100 }
101}
102
103pub fn create_entangled_message_port() -> (MessagePort, MessagePort) {
104 let (port1_tx, port2_rx) = unbounded_channel::<MessagePortMessage>();
105 let (port2_tx, port1_rx) = unbounded_channel::<MessagePortMessage>();
106
107 let port1 = MessagePort {
108 rx: RefCell::new(port1_rx),
109 tx: RefCell::new(Some(port1_tx)),
110 };
111
112 let port2 = MessagePort {
113 rx: RefCell::new(port2_rx),
114 tx: RefCell::new(Some(port2_tx)),
115 };
116
117 (port1, port2)
118}
119
/// Resource table entry that owns a [`MessagePort`].
pub struct MessagePortResource {
  /// The port owned by this resource.
  port: MessagePort,
  /// Cancel handle for pending receive operations on this port; triggered
  /// when the resource is closed or the port is transferred.
  cancel: CancelHandle,
}
124
125impl Resource for MessagePortResource {
126 fn name(&self) -> Cow<'_, str> {
127 "messagePort".into()
128 }
129
130 fn close(self: Rc<Self>) {
131 self.cancel.cancel();
132 }
133}
134
135#[op2]
136#[serde]
137pub fn op_message_port_create_entangled(
138 state: &mut OpState,
139) -> (ResourceId, ResourceId) {
140 let (port1, port2) = create_entangled_message_port();
141
142 let port1_id = state.resource_table.add(MessagePortResource {
143 port: port1,
144 cancel: CancelHandle::new(),
145 });
146
147 let port2_id = state.resource_table.add(MessagePortResource {
148 port: port2,
149 cancel: CancelHandle::new(),
150 });
151
152 (port1_id, port2_id)
153}
154
/// Representation of a [`Transferable`] as exchanged with JS.
///
/// Per the serde attributes below, a value is (de)serialized as an object
/// with a `kind` field holding the camelCase variant name (`messagePort` or
/// `arrayBuffer`) and a `data` field holding the variant's payload.
#[derive(Deserialize, Serialize)]
#[serde(tag = "kind", content = "data", rename_all = "camelCase")]
pub enum JsTransferable {
  /// Resource id of a `MessagePortResource`.
  #[serde(rename_all = "camelCase")]
  MessagePort(ResourceId),
  /// Numeric id passed through unchanged to/from
  /// [`Transferable::ArrayBuffer`].
  ArrayBuffer(u32),
}
162
163pub fn deserialize_js_transferables(
164 state: &mut OpState,
165 js_transferables: Vec<JsTransferable>,
166) -> Result<Vec<Transferable>, MessagePortError> {
167 let mut transferables = Vec::with_capacity(js_transferables.len());
168 for js_transferable in js_transferables {
169 match js_transferable {
170 JsTransferable::MessagePort(id) => {
171 let resource = state
172 .resource_table
173 .take::<MessagePortResource>(id)
174 .map_err(|_| MessagePortError::InvalidTransfer)?;
175 resource.cancel.cancel();
176 let resource =
177 Rc::try_unwrap(resource).map_err(|_| MessagePortError::NotReady)?;
178 transferables.push(Transferable::MessagePort(resource.port));
179 }
180 JsTransferable::ArrayBuffer(id) => {
181 transferables.push(Transferable::ArrayBuffer(id));
182 }
183 }
184 }
185 Ok(transferables)
186}
187
188pub fn serialize_transferables(
189 state: &mut OpState,
190 transferables: Vec<Transferable>,
191) -> Vec<JsTransferable> {
192 let mut js_transferables = Vec::with_capacity(transferables.len());
193 for transferable in transferables {
194 match transferable {
195 Transferable::MessagePort(port) => {
196 let rid = state.resource_table.add(MessagePortResource {
197 port,
198 cancel: CancelHandle::new(),
199 });
200 js_transferables.push(JsTransferable::MessagePort(rid));
201 }
202 Transferable::ArrayBuffer(id) => {
203 js_transferables.push(JsTransferable::ArrayBuffer(id));
204 }
205 }
206 }
207 js_transferables
208}
209
/// A message as exchanged with JS: the payload buffer and the transfer list
/// that accompanies it.
#[derive(Deserialize, Serialize)]
pub struct JsMessageData {
  /// The message payload.
  pub data: DetachedBuffer,
  /// Transferables sent along with the payload, in their JS-side form.
  pub transferables: Vec<JsTransferable>,
}
215
216#[op2]
217pub fn op_message_port_post_message(
218 state: &mut OpState,
219 #[smi] rid: ResourceId,
220 #[serde] data: JsMessageData,
221) -> Result<(), MessagePortError> {
222 for js_transferable in &data.transferables {
223 if let JsTransferable::MessagePort(id) = js_transferable
224 && *id == rid
225 {
226 return Err(MessagePortError::TransferSelf);
227 }
228 }
229
230 let resource = state
231 .resource_table
232 .get::<MessagePortResource>(rid)
233 .map_err(MessagePortError::Resource)?;
234 resource.port.send(state, data)
235}
236
237#[op2(async)]
238#[serde]
239pub async fn op_message_port_recv_message(
240 state: Rc<RefCell<OpState>>,
241 #[smi] rid: ResourceId,
242) -> Result<Option<JsMessageData>, MessagePortError> {
243 let resource = {
244 let state = state.borrow();
245 match state.resource_table.get::<MessagePortResource>(rid) {
246 Ok(resource) => resource,
247 Err(_) => return Ok(None),
248 }
249 };
250 let cancel = RcRef::map(resource.clone(), |r| &r.cancel);
251 resource.port.recv(state).or_cancel(cancel).await?
252}
253
254#[op2]
255#[serde]
256pub fn op_message_port_recv_message_sync(
257 state: &mut OpState, #[smi] rid: ResourceId,
259) -> Result<Option<JsMessageData>, MessagePortError> {
260 let resource = state
261 .resource_table
262 .get::<MessagePortResource>(rid)
263 .map_err(MessagePortError::Resource)?;
264 let mut rx = resource.port.rx.borrow_mut();
265
266 match rx.try_recv() {
267 Ok((d, t)) => Ok(Some(JsMessageData {
268 data: d,
269 transferables: serialize_transferables(state, t),
270 })),
271 Err(TryRecvError::Empty) => Ok(None),
272 Err(TryRecvError::Disconnected) => Ok(None),
273 }
274}