1use crate::ipc::imports::*;
2use crate::ipc::method::*;
3use crate::ipc::notification::*;
4use crate::ipc::target::*;
5
6pub type IpcId = Id64;
8
9struct Pending<F> {
10 _timestamp: Instant,
11 callback: F,
12}
13impl<F> Pending<F> {
14 fn new(callback: F) -> Self {
15 Self {
16 _timestamp: Instant::now(),
17 callback,
18 }
19 }
20}
21
22type PendingMap<Id, F> = Arc<Mutex<AHashMap<Id, Pending<F>>>>;
23
24pub type BorshResponseFn = Arc<
27 Box<dyn Fn(Vec<u8>, ResponseResult<Vec<u8>>, Option<&Duration>) -> Result<()> + Sync + Send>,
28>;
29
30struct Inner<Ops>
31where
32 Ops: OpsT,
33{
34 target: IpcTarget,
35 identifier: String,
36 handler: Mutex<Option<Arc<dyn AsCallback>>>,
37 methods: Mutex<AHashMap<Ops, Arc<dyn MethodTrait>>>,
38 notifications: Mutex<AHashMap<Ops, Arc<dyn NotificationTrait>>>,
39}
40
41pub struct Ipc<Ops>
45where
46 Ops: OpsT,
47{
48 inner: Arc<Inner<Ops>>,
49 _ops: PhantomData<Ops>,
50}
51
52unsafe impl<Ops> Send for Ipc<Ops> where Ops: OpsT {}
53unsafe impl<Ops> Sync for Ipc<Ops> where Ops: OpsT {}
54
55impl<Ops> Drop for Ipc<Ops>
56where
57 Ops: OpsT,
58{
59 fn drop(&mut self) {
60 self.unregister_handler().ok();
61 }
62}
63
64impl<Ops> Ipc<Ops>
65where
66 Ops: OpsT,
67{
68 pub fn try_new_global_binding<Ident>(identifier: Ident) -> Result<Arc<Self>>
72 where
73 Ident: ToString,
74 {
75 let target = IpcTarget::new(global::global().as_ref());
76 let ipc = Self::try_new_binding(&target, identifier)?;
77
78 let ipc_handler_source_ptr = &raw mut IPC_HANDLER_SOURCE;
79
80 unsafe {
81 if (*ipc_handler_source_ptr).is_some() {
82 panic!("global ipc handler already registered");
83 }
84 (*ipc_handler_source_ptr).replace(target);
85 }
86
87 Ok(ipc)
88 }
89
90 pub fn try_new_window_binding<Ident>(
94 window: &Arc<Window>,
95 identifier: Ident,
96 ) -> Result<Arc<Self>>
97 where
98 Ident: ToString,
99 {
100 let window = window.window();
101 let target = IpcTarget::new(window.as_ref());
102 let ipc = Self::try_new_binding(&target, identifier)?;
103
104 let ipc_handler_source_ptr = &raw mut IPC_HANDLER_SOURCE;
105
106 unsafe {
107 if (*ipc_handler_source_ptr).is_some() {
108 panic!("global ipc handler already registered");
109 }
110 (*ipc_handler_source_ptr).replace(target);
111 }
112
113 Ok(ipc)
114 }
115
116 fn try_new_binding<Ident>(target: &IpcTarget, identifier: Ident) -> Result<Arc<Self>>
117 where
118 Ident: ToString,
119 {
120 let ipc = Arc::new(Ipc {
121 inner: Arc::new(Inner {
122 target: target.clone(),
123 identifier: identifier.to_string(),
124 handler: Mutex::new(None),
125 methods: Mutex::new(AHashMap::default()),
126 notifications: Mutex::new(AHashMap::default()),
127 }),
128 _ops: PhantomData,
129 });
130
131 ipc.register_handler()?;
132
133 Ok(ipc)
134 }
135
136 fn register_handler(self: &Arc<Self>) -> Result<()> {
137 let this = self.clone();
138 let handler = Arc::new(callback!(move |message: ArrayBuffer, source: JsValue| {
139 let this = this.clone();
140
141 let message = Uint8Array::new(&message);
142 let vec = message.to_vec();
143
144 let source = if source == JsValue::NULL {
145 None
146 } else {
147 Some(IpcTarget::new(source.as_ref()))
148 };
149
150 spawn(async move {
151 match BorshMessage::<IpcId>::try_from(&vec) {
152 Ok(message) => {
153 if let Err(err) = this.handle_message(message, source).await {
154 log_error!("IPC: handler error: {:?}", err);
155 }
156 }
157 Err(err) => {
158 log_error!("Failed to deserialize ipc message: {:?}", err);
159 }
160 }
161 })
162 }));
163
164 js_sys::Reflect::set(
165 self.inner.target.as_ref(),
166 &JsValue::from_str("ipc_handler"),
167 handler.get_fn(),
168 )?;
169 js_sys::Reflect::set(
170 self.inner.target.as_ref(),
171 &JsValue::from_str("ipc_identifier"),
172 &JsValue::from(&self.inner.identifier),
173 )?;
174
175 self.inner.handler.lock().unwrap().replace(handler);
176
177 Ok(())
178 }
179
180 fn unregister_handler(&self) -> Result<()> {
181 if let Some(_handler) = self.inner.handler.lock().unwrap().take() {
182 let object = Object::from(self.inner.target.as_ref().clone());
183 js_sys::Reflect::delete_property(&object, &JsValue::from_str("ipc_handler"))?;
184 js_sys::Reflect::delete_property(&object, &JsValue::from_str("ipc_identifier"))?;
185 }
186
187 Ok(())
188 }
189
190 pub async fn handle_message(
194 &self,
195 message: BorshMessage<'_, IpcId>,
196 source: Option<IpcTarget>,
197 ) -> Result<()> {
198 let BorshMessage::<IpcId> { header, payload } = message;
199 let BorshHeader::<IpcId> { op, id, kind } = header;
200 match kind {
201 MessageKind::Request => {
202 let source = source.unwrap_or_else(|| {
203 panic!("ipc received a call request with no source: {:?}", op)
204 });
205
206 let op = Ops::try_from_slice(&op)?;
207
208 let method = self.inner.methods.lock().unwrap().get(&op).cloned();
209 if let Some(method) = method {
210 let result = method.call_with_borsh(payload).await;
211 let buffer = borsh::to_vec(&result)?;
212 source.call_ipc(
213 to_msg::<Ops, IpcId>(BorshHeader::response(id, op), &buffer)?.as_ref(),
214 None,
215 )?;
216 } else {
217 log_error!("ipc method handler not found: {:?}", op);
218 let resp: ResponseResult<()> = Err(ResponseError::NotFound);
219 let buffer = borsh::to_vec(&resp)?;
220 source.call_ipc(
221 to_msg::<Ops, IpcId>(BorshHeader::response(id, op), &buffer)?.as_ref(),
222 None,
223 )?;
224 }
225 }
226 MessageKind::Notification => {
227 let op = Ops::try_from_slice(&op)?;
228
229 let notification = self.inner.notifications.lock().unwrap().get(&op).cloned();
230
231 if let Some(notification) = notification {
232 match notification.call_with_borsh(payload).await {
233 Ok(_resp) => {}
234 Err(err) => {
235 log_error!("ipc notification error: {:?}", err);
236 }
237 }
238 } else {
239 log_error!("ipc notification handler not found: {:?}", op);
240 }
241 }
242 MessageKind::Response => {
243 let id = id.expect("ipc missing success response id");
244 let mut pending = pending().lock().unwrap();
246 match pending.remove(&id) {
247 Some(pending) => {
248 let resp = ResponseResult::<Vec<u8>>::try_from_slice(payload)?;
249 (pending.callback)(op, resp, None)?;
250 }
251 _ => {
252 log_error!("ipc response id not found: {:?}", id);
253 }
254 }
255 }
256 }
257
258 Ok(())
259 }
260
261 pub fn method<Req, Resp>(&self, op: Ops, method: Method<Req, Resp>)
264 where
265 Ops: Debug + Clone,
266 Req: MsgT,
267 Resp: MsgT,
268 {
269 let method: Arc<dyn MethodTrait> = Arc::new(method);
270 if self
271 .inner
272 .methods
273 .lock()
274 .unwrap()
275 .insert(op.clone(), method)
276 .is_some()
277 {
278 panic!("RPC method {op:?} is declared multiple times")
279 }
280 }
281
282 pub fn notification<Msg>(&self, op: Ops, method: Notification<Msg>)
285 where
286 Ops: Debug + Clone,
287 Msg: MsgT,
288 {
289 let method: Arc<dyn NotificationTrait> = Arc::new(method);
290 if self
291 .inner
292 .notifications
293 .lock()
294 .unwrap()
295 .insert(op.clone(), method)
296 .is_some()
297 {
298 panic!("RPC notification {op:?} is declared multiple times")
299 }
300 }
301}
302
303trait IpcHandler {
304 fn call_ipc(&self, data: &JsValue, source: Option<&IpcTarget>) -> Result<()>;
305}
306
307impl IpcHandler for IpcTarget {
308 fn call_ipc(&self, data: &JsValue, source: Option<&IpcTarget>) -> Result<()> {
309 let target_fn = js_sys::Reflect::get(self.as_ref(), &JsValue::from_str("ipc_handler"))?;
310
311 let target_fn = target_fn.unchecked_into::<js_sys::Function>();
312
313 if let Some(source) = source {
314 target_fn.call2(
315 &JsValue::UNDEFINED,
316 &JsValue::from(data),
317 &JsValue::from(source.as_ref()),
318 )?;
319 } else {
320 target_fn.call2(&JsValue::UNDEFINED, &JsValue::from(data), &JsValue::NULL)?;
321 }
322
323 Ok(())
324 }
325}
326
327static mut PENDING: Option<PendingMap<IpcId, BorshResponseFn>> = None; fn pending() -> &'static mut PendingMap<IpcId, BorshResponseFn> {
329 let pending_ptr = &raw mut PENDING;
330 unsafe {
331 if (*pending_ptr).is_none() {
332 PENDING = Some(PendingMap::default());
333 }
334 (*pending_ptr).as_mut().unwrap()
335 }
336}
337
338static mut IPC_HANDLER_SOURCE: Option<IpcTarget> = None;
339
340#[async_trait]
344pub trait IpcDispatch {
345 fn as_target(&self) -> IpcTarget;
348
349 async fn notify<Ops, Msg>(&self, op: Ops, payload: Msg) -> Result<()>
352 where
353 Ops: OpsT,
354 Msg: BorshSerialize + Send + Sync + 'static,
355 {
356 let payload = borsh::to_vec(&payload).map_err(|_| Error::BorshSerialize)?;
357 self.as_target().call_ipc(
358 to_msg::<Ops, IpcId>(BorshHeader::notification::<Ops>(op), &payload)?.as_ref(),
359 None,
360 )?;
361 Ok(())
362 }
363
364 async fn call<Ops, Req, Resp>(&self, op: Ops, req: Req) -> Result<Resp>
368 where
369 Ops: OpsT,
370 Req: MsgT,
371 Resp: MsgT,
372 {
373 let ipc_handler_source_ptr = &raw const IPC_HANDLER_SOURCE;
374 let source = unsafe {
375 (*ipc_handler_source_ptr)
376 .as_ref()
377 .cloned()
378 .expect("missing ipc handler source (please register a local IPC object)")
379 };
380 self.call_with_source(op, req, &source).await
381 }
382
383 async fn call_with_source<Ops, Req, Resp>(
386 &self,
387 op: Ops,
388 req: Req,
389 source: &IpcTarget,
390 ) -> Result<Resp>
391 where
392 Ops: OpsT,
393 Req: MsgT,
394 Resp: MsgT,
395 {
396 let payload = borsh::to_vec(&req).map_err(|_| Error::BorshSerialize)?;
397
398 let id = Id64::generate();
399 let (sender, receiver) = oneshot();
400
401 {
402 let mut pending = pending().lock().unwrap();
403 pending.insert(
404 id.clone(),
405 Pending::new(Arc::new(Box::new(move |op, result, _duration| {
406 sender.try_send((op, result.map(|data| data.to_vec())))?;
407 Ok(())
408 }))),
409 );
410 }
411
412 self.as_target().call_ipc(
413 to_msg::<Ops, IpcId>(BorshHeader::request::<Ops>(Some(id), op.clone()), &payload)?
414 .as_ref(),
415 Some(source),
416 )?;
417
418 let (op_, data) = receiver.recv().await?;
419
420 let op_ = Ops::try_from_slice(&op_)?;
421 if op != op_ {
422 return Err(Error::Custom(format!(
423 "ipc op mismatch: expected {:?}, got {:?}",
424 op, op_
425 )));
426 }
427
428 let resp = ResponseResult::<Resp>::try_from_slice(data?.as_ref())
429 .map_err(|e| Error::BorshDeserialize(e.to_string()))?;
430
431 Ok(resp?)
432 }
433}
434
435impl IpcDispatch for IpcTarget {
436 fn as_target(&self) -> IpcTarget {
437 self.clone()
438 }
439}
440
441impl IpcDispatch for nw_sys::Window {
442 fn as_target(&self) -> IpcTarget {
443 IpcTarget::new(self.window().as_ref())
444 }
445}
446
447pub async fn get_ipc_target<Ident>(identifier: Ident) -> crate::result::Result<Option<IpcTarget>>
451where
452 Ident: ToString,
453{
454 let ident: String = identifier.to_string();
455
456 if let Some(ipc_ident) =
457 Reflect::get(&global::global(), &JsValue::from("ipc_identifier"))?.as_string()
458 && ipc_ident == ident
459 {
460 return Ok(Some(IpcTarget::new(global::global().as_ref())));
461 }
462
463 let windows = crate::window::get_all_async().await?;
464
465 for window in windows.iter() {
466 let prop =
467 js_sys::Reflect::get(window.window().as_ref(), &JsValue::from("ipc_identifier"))?;
468 if let Some(ipc_ident) = prop.as_string()
469 && ipc_ident == ident
470 {
471 return Ok(Some(IpcTarget::new(window.window().as_ref())));
472 }
473 }
474 Ok(None)
475}