workflow_nw/ipc/
ipc.rs

1use crate::ipc::imports::*;
2use crate::ipc::method::*;
3use crate::ipc::notification::*;
4use crate::ipc::target::*;
5
/// Identifier carried in message headers to correlate an IPC request with its response.
pub type IpcId = Id64;
7
/// A response callback waiting for its reply, stamped with the moment it was created.
struct Pending<F> {
    /// Creation time of the entry; it is stored but not read anywhere in this file.
    _timestamp: Instant,
    /// Callback invoked once the matching response arrives.
    callback: F,
}

impl<F> Pending<F> {
    /// Wraps `callback`, recording the current instant as the entry's creation time.
    fn new(callback: F) -> Self {
        let _timestamp = Instant::now();
        Self {
            _timestamp,
            callback,
        }
    }
}
20
/// Shared, mutex-guarded map from a request id to the callback awaiting its response.
type PendingMap<Id, F> = Arc<Mutex<AHashMap<Id, Pending<F>>>>;
22
23pub type BorshResponseFn = Arc<
24    Box<(dyn Fn(Vec<u8>, ResponseResult<Vec<u8>>, Option<&Duration>) -> Result<()> + Sync + Send)>,
25>;
26
27struct Inner<Ops>
28where
29    Ops: OpsT,
30{
31    target: IpcTarget,
32    identifier: String,
33    handler: Mutex<Option<Arc<dyn AsCallback>>>,
34    methods: Mutex<AHashMap<Ops, Arc<dyn MethodTrait>>>,
35    notifications: Mutex<AHashMap<Ops, Arc<dyn NotificationTrait>>>,
36}
37
38pub struct Ipc<Ops>
39where
40    Ops: OpsT,
41{
42    inner: Arc<Inner<Ops>>,
43    _ops: PhantomData<Ops>,
44}
45
46unsafe impl<Ops> Send for Ipc<Ops> where Ops: OpsT {}
47unsafe impl<Ops> Sync for Ipc<Ops> where Ops: OpsT {}
48
49impl<Ops> Drop for Ipc<Ops>
50where
51    Ops: OpsT,
52{
53    fn drop(&mut self) {
54        self.unregister_handler().ok();
55    }
56}
57
58impl<Ops> Ipc<Ops>
59where
60    Ops: OpsT,
61{
62    pub fn try_new_global_binding<Ident>(identifier: Ident) -> Result<Arc<Self>>
63    where
64        Ident: ToString,
65    {
66        let target = IpcTarget::new(global::global().as_ref());
67        let ipc = Self::try_new_binding(&target, identifier)?;
68
69        unsafe {
70            if IPC_HANDLER_SOURCE.is_some() {
71                panic!("global ipc handler already registered");
72            }
73            IPC_HANDLER_SOURCE.replace(target);
74        }
75
76        Ok(ipc)
77    }
78
79    pub fn try_new_window_binding<Ident>(
80        window: &Arc<Window>,
81        identifier: Ident,
82    ) -> Result<Arc<Self>>
83    where
84        Ident: ToString,
85    {
86        let window = window.window();
87        let target = IpcTarget::new(window.as_ref());
88        let ipc = Self::try_new_binding(&target, identifier)?;
89
90        unsafe {
91            if IPC_HANDLER_SOURCE.is_some() {
92                panic!("global ipc handler already registered");
93            }
94            IPC_HANDLER_SOURCE.replace(target);
95        }
96
97        Ok(ipc)
98    }
99
100    fn try_new_binding<Ident>(target: &IpcTarget, identifier: Ident) -> Result<Arc<Self>>
101    where
102        Ident: ToString,
103    {
104        let ipc = Arc::new(Ipc {
105            inner: Arc::new(Inner {
106                target: target.clone(),
107                identifier: identifier.to_string(),
108                handler: Mutex::new(None),
109                methods: Mutex::new(AHashMap::default()),
110                notifications: Mutex::new(AHashMap::default()),
111            }),
112            _ops: PhantomData,
113        });
114
115        ipc.register_handler()?;
116
117        Ok(ipc)
118    }
119
120    fn register_handler(self: &Arc<Self>) -> Result<()> {
121        let this = self.clone();
122        let handler = Arc::new(callback!(move |message: ArrayBuffer, source: JsValue| {
123            let this = this.clone();
124
125            let message = Uint8Array::new(&message);
126            let vec = message.to_vec();
127
128            let source = if source == JsValue::NULL {
129                None
130            } else {
131                Some(IpcTarget::new(source.as_ref()))
132            };
133
134            spawn(async move {
135                match BorshMessage::<IpcId>::try_from(&vec) {
136                    Ok(message) => {
137                        if let Err(err) = this.handle_message(message, source).await {
138                            log_error!("IPC: handler error: {:?}", err);
139                        }
140                    }
141                    Err(err) => {
142                        log_error!("Failed to deserialize ipc message: {:?}", err);
143                    }
144                }
145            })
146        }));
147
148        js_sys::Reflect::set(
149            self.inner.target.as_ref(),
150            &JsValue::from_str("ipc_handler"),
151            handler.get_fn(),
152        )?;
153        js_sys::Reflect::set(
154            self.inner.target.as_ref(),
155            &JsValue::from_str("ipc_identifier"),
156            &JsValue::from(&self.inner.identifier),
157        )?;
158
159        self.inner.handler.lock().unwrap().replace(handler);
160
161        Ok(())
162    }
163
164    fn unregister_handler(&self) -> Result<()> {
165        if let Some(_handler) = self.inner.handler.lock().unwrap().take() {
166            let object = Object::from(self.inner.target.as_ref().clone());
167            js_sys::Reflect::delete_property(&object, &JsValue::from_str("ipc_handler"))?;
168            js_sys::Reflect::delete_property(&object, &JsValue::from_str("ipc_identifier"))?;
169        }
170
171        Ok(())
172    }
173
174    pub async fn handle_message<'data>(
175        &self,
176        message: BorshMessage<'data, IpcId>,
177        source: Option<IpcTarget>,
178    ) -> Result<()> {
179        let BorshMessage::<IpcId> { header, payload } = message;
180        let BorshHeader::<IpcId> { op, id, kind } = header;
181        match kind {
182            MessageKind::Request => {
183                let source = source.unwrap_or_else(|| {
184                    panic!("ipc received a call request with no source: {:?}", op)
185                });
186
187                let op = Ops::try_from_slice(&op)?;
188
189                let method = self.inner.methods.lock().unwrap().get(&op).cloned();
190                if let Some(method) = method {
191                    let result = method.call_with_borsh(payload).await;
192                    let buffer = borsh::to_vec(&result)?;
193                    source.call_ipc(
194                        to_msg::<Ops, IpcId>(BorshHeader::response(id, op), &buffer)?.as_ref(),
195                        None,
196                    )?;
197                } else {
198                    log_error!("ipc method handler not found: {:?}", op);
199                    let resp: ResponseResult<()> = Err(ResponseError::NotFound);
200                    let buffer = borsh::to_vec(&resp)?;
201                    source.call_ipc(
202                        to_msg::<Ops, IpcId>(BorshHeader::response(id, op), &buffer)?.as_ref(),
203                        None,
204                    )?;
205                }
206            }
207            MessageKind::Notification => {
208                let op = Ops::try_from_slice(&op)?;
209
210                let notification = self.inner.notifications.lock().unwrap().get(&op).cloned();
211
212                if let Some(notification) = notification {
213                    match notification.call_with_borsh(payload).await {
214                        Ok(_resp) => {}
215                        Err(err) => {
216                            log_error!("ipc notification error: {:?}", err);
217                        }
218                    }
219                } else {
220                    log_error!("ipc notification handler not found: {:?}", op);
221                }
222            }
223            MessageKind::Response => {
224                let id = id.expect("ipc missing success response id");
225                // let id = Id64::from(id);
226                let mut pending = pending().lock().unwrap();
227                if let Some(pending) = pending.remove(&id) {
228                    let resp = ResponseResult::<Vec<u8>>::try_from_slice(payload)?;
229                    (pending.callback)(op, resp, None)?;
230                } else {
231                    log_error!("ipc response id not found: {:?}", id);
232                }
233            }
234        }
235
236        Ok(())
237    }
238
239    pub fn method<Req, Resp>(&self, op: Ops, method: Method<Req, Resp>)
240    where
241        Ops: Debug + Clone,
242        Req: MsgT,
243        Resp: MsgT,
244    {
245        let method: Arc<dyn MethodTrait> = Arc::new(method);
246        if self
247            .inner
248            .methods
249            .lock()
250            .unwrap()
251            .insert(op.clone(), method)
252            .is_some()
253        {
254            panic!("RPC method {op:?} is declared multiple times")
255        }
256    }
257
258    pub fn notification<Msg>(&self, op: Ops, method: Notification<Msg>)
259    where
260        Ops: Debug + Clone,
261        Msg: MsgT,
262    {
263        let method: Arc<dyn NotificationTrait> = Arc::new(method);
264        if self
265            .inner
266            .notifications
267            .lock()
268            .unwrap()
269            .insert(op.clone(), method)
270            .is_some()
271        {
272            panic!("RPC notification {op:?} is declared multiple times")
273        }
274    }
275}
276
/// Low-level delivery of an encoded IPC message to a target.
trait IpcHandler {
    /// Delivers `data` to the target together with an optional `source` that a reply
    /// can be sent to.
    fn call_ipc(&self, data: &JsValue, source: Option<&IpcTarget>) -> Result<()>;
}
280
281impl IpcHandler for IpcTarget {
282    fn call_ipc(&self, data: &JsValue, source: Option<&IpcTarget>) -> Result<()> {
283        let target_fn = js_sys::Reflect::get(self.as_ref(), &JsValue::from_str("ipc_handler"))?;
284
285        let target_fn = target_fn.unchecked_into::<js_sys::Function>();
286
287        if let Some(source) = source {
288            target_fn.call2(
289                &JsValue::UNDEFINED,
290                &JsValue::from(data),
291                &JsValue::from(source.as_ref()),
292            )?;
293        } else {
294            target_fn.call2(&JsValue::UNDEFINED, &JsValue::from(data), &JsValue::NULL)?;
295        }
296
297        Ok(())
298    }
299}
300
301static mut PENDING: Option<PendingMap<IpcId, BorshResponseFn>> = None; //PendingMap::default();
302fn pending() -> &'static mut PendingMap<IpcId, BorshResponseFn> {
303    unsafe {
304        if PENDING.is_none() {
305            PENDING = Some(PendingMap::default());
306        }
307        PENDING.as_mut().unwrap()
308    }
309}
310
/// Target recorded by the global/window binding constructors; `IpcDispatch::call` passes
/// it as the source of outgoing requests. `None` until a binding has been registered.
static mut IPC_HANDLER_SOURCE: Option<IpcTarget> = None;
312
313#[async_trait]
314pub trait IpcDispatch {
    /// Returns the IPC target that messages sent through this dispatcher are delivered to.
    fn as_target(&self) -> IpcTarget;
316
317    async fn notify<Ops, Msg>(&self, op: Ops, payload: Msg) -> Result<()>
318    where
319        Ops: OpsT,
320        Msg: BorshSerialize + Send + Sync + 'static,
321    {
322        let payload = borsh::to_vec(&payload).map_err(|_| Error::BorshSerialize)?;
323        self.as_target().call_ipc(
324            to_msg::<Ops, IpcId>(BorshHeader::notification::<Ops>(op), &payload)?.as_ref(),
325            None,
326        )?;
327        Ok(())
328    }
329
330    async fn call<Ops, Req, Resp>(&self, op: Ops, req: Req) -> Result<Resp>
331    where
332        Ops: OpsT,
333        Req: MsgT,
334        Resp: MsgT,
335    {
336        let source = unsafe {
337            IPC_HANDLER_SOURCE
338                .as_ref()
339                .cloned()
340                .expect("missing ipc handler source (please register a local IPC object)")
341        };
342        self.call_with_source(op, req, &source).await
343    }
344
345    async fn call_with_source<Ops, Req, Resp>(
346        &self,
347        op: Ops,
348        req: Req,
349        source: &IpcTarget,
350    ) -> Result<Resp>
351    where
352        Ops: OpsT,
353        Req: MsgT,
354        Resp: MsgT,
355    {
356        let payload = borsh::to_vec(&req).map_err(|_| Error::BorshSerialize)?;
357
358        let id = Id64::generate();
359        let (sender, receiver) = oneshot();
360
361        {
362            let mut pending = pending().lock().unwrap();
363            pending.insert(
364                id.clone(),
365                Pending::new(Arc::new(Box::new(move |op, result, _duration| {
366                    sender.try_send((op, result.map(|data| data.to_vec())))?;
367                    Ok(())
368                }))),
369            );
370        }
371
372        self.as_target().call_ipc(
373            to_msg::<Ops, IpcId>(BorshHeader::request::<Ops>(Some(id), op.clone()), &payload)?
374                .as_ref(),
375            Some(source),
376        )?;
377
378        let (op_, data) = receiver.recv().await?;
379
380        let op_ = Ops::try_from_slice(&op_)?;
381        if op != op_ {
382            return Err(Error::Custom(format!(
383                "ipc op mismatch: expected {:?}, got {:?}",
384                op, op_
385            )));
386        }
387
388        let resp = ResponseResult::<Resp>::try_from_slice(data?.as_ref())
389            .map_err(|e| Error::BorshDeserialize(e.to_string()))?;
390
391        Ok(resp?)
392    }
393}
394
395impl IpcDispatch for IpcTarget {
396    fn as_target(&self) -> IpcTarget {
397        self.clone()
398    }
399}
400
401impl IpcDispatch for nw_sys::Window {
402    fn as_target(&self) -> IpcTarget {
403        IpcTarget::new(self.window().as_ref())
404    }
405}
406
407pub async fn get_ipc_target<Ident>(identifier: Ident) -> crate::result::Result<Option<IpcTarget>>
408where
409    Ident: ToString,
410{
411    let ident: String = identifier.to_string();
412
413    if let Some(ipc_ident) =
414        Reflect::get(&global::global(), &JsValue::from("ipc_identifier"))?.as_string()
415    {
416        if ipc_ident == ident {
417            return Ok(Some(IpcTarget::new(global::global().as_ref())));
418        }
419    }
420
421    let windows = crate::window::get_all_async().await?;
422
423    for window in windows.iter() {
424        let prop =
425            js_sys::Reflect::get(window.window().as_ref(), &JsValue::from("ipc_identifier"))?;
426        if let Some(ipc_ident) = prop.as_string() {
427            if ipc_ident == ident {
428                return Ok(Some(IpcTarget::new(window.window().as_ref())));
429            }
430        }
431    }
432    Ok(None)
433}