Skip to main content

workflow_nw/ipc/
ipc.rs

1use crate::ipc::imports::*;
2use crate::ipc::method::*;
3use crate::ipc::notification::*;
4use crate::ipc::target::*;
5
6/// Identifier type used to correlate IPC requests with their responses.
7pub type IpcId = Id64;
8
9struct Pending<F> {
10    _timestamp: Instant,
11    callback: F,
12}
13impl<F> Pending<F> {
14    fn new(callback: F) -> Self {
15        Self {
16            _timestamp: Instant::now(),
17            callback,
18        }
19    }
20}
21
22type PendingMap<Id, F> = Arc<Mutex<AHashMap<Id, Pending<F>>>>;
23
24/// Callback type invoked to deliver a Borsh-encoded response (or error) back
25/// to the originator of a pending IPC request.
26pub type BorshResponseFn = Arc<
27    Box<dyn Fn(Vec<u8>, ResponseResult<Vec<u8>>, Option<&Duration>) -> Result<()> + Sync + Send>,
28>;
29
30struct Inner<Ops>
31where
32    Ops: OpsT,
33{
34    target: IpcTarget,
35    identifier: String,
36    handler: Mutex<Option<Arc<dyn AsCallback>>>,
37    methods: Mutex<AHashMap<Ops, Arc<dyn MethodTrait>>>,
38    notifications: Mutex<AHashMap<Ops, Arc<dyn NotificationTrait>>>,
39}
40
41/// IPC endpoint that handles inbound messages for a target context and
42/// maintains the registered method and notification handlers, keyed by the
43/// operation type `Ops`.
44pub struct Ipc<Ops>
45where
46    Ops: OpsT,
47{
48    inner: Arc<Inner<Ops>>,
49    _ops: PhantomData<Ops>,
50}
51
52unsafe impl<Ops> Send for Ipc<Ops> where Ops: OpsT {}
53unsafe impl<Ops> Sync for Ipc<Ops> where Ops: OpsT {}
54
55impl<Ops> Drop for Ipc<Ops>
56where
57    Ops: OpsT,
58{
59    fn drop(&mut self) {
60        self.unregister_handler().ok();
61    }
62}
63
64impl<Ops> Ipc<Ops>
65where
66    Ops: OpsT,
67{
68    /// Creates an IPC binding bound to the global context, registering it as
69    /// the global IPC handler source. Panics if a handler source is already
70    /// registered.
71    pub fn try_new_global_binding<Ident>(identifier: Ident) -> Result<Arc<Self>>
72    where
73        Ident: ToString,
74    {
75        let target = IpcTarget::new(global::global().as_ref());
76        let ipc = Self::try_new_binding(&target, identifier)?;
77
78        let ipc_handler_source_ptr = &raw mut IPC_HANDLER_SOURCE;
79
80        unsafe {
81            if (*ipc_handler_source_ptr).is_some() {
82                panic!("global ipc handler already registered");
83            }
84            (*ipc_handler_source_ptr).replace(target);
85        }
86
87        Ok(ipc)
88    }
89
90    /// Creates an IPC binding bound to the given window's context, registering
91    /// it as the global IPC handler source. Panics if a handler source is
92    /// already registered.
93    pub fn try_new_window_binding<Ident>(
94        window: &Arc<Window>,
95        identifier: Ident,
96    ) -> Result<Arc<Self>>
97    where
98        Ident: ToString,
99    {
100        let window = window.window();
101        let target = IpcTarget::new(window.as_ref());
102        let ipc = Self::try_new_binding(&target, identifier)?;
103
104        let ipc_handler_source_ptr = &raw mut IPC_HANDLER_SOURCE;
105
106        unsafe {
107            if (*ipc_handler_source_ptr).is_some() {
108                panic!("global ipc handler already registered");
109            }
110            (*ipc_handler_source_ptr).replace(target);
111        }
112
113        Ok(ipc)
114    }
115
116    fn try_new_binding<Ident>(target: &IpcTarget, identifier: Ident) -> Result<Arc<Self>>
117    where
118        Ident: ToString,
119    {
120        let ipc = Arc::new(Ipc {
121            inner: Arc::new(Inner {
122                target: target.clone(),
123                identifier: identifier.to_string(),
124                handler: Mutex::new(None),
125                methods: Mutex::new(AHashMap::default()),
126                notifications: Mutex::new(AHashMap::default()),
127            }),
128            _ops: PhantomData,
129        });
130
131        ipc.register_handler()?;
132
133        Ok(ipc)
134    }
135
136    fn register_handler(self: &Arc<Self>) -> Result<()> {
137        let this = self.clone();
138        let handler = Arc::new(callback!(move |message: ArrayBuffer, source: JsValue| {
139            let this = this.clone();
140
141            let message = Uint8Array::new(&message);
142            let vec = message.to_vec();
143
144            let source = if source == JsValue::NULL {
145                None
146            } else {
147                Some(IpcTarget::new(source.as_ref()))
148            };
149
150            spawn(async move {
151                match BorshMessage::<IpcId>::try_from(&vec) {
152                    Ok(message) => {
153                        if let Err(err) = this.handle_message(message, source).await {
154                            log_error!("IPC: handler error: {:?}", err);
155                        }
156                    }
157                    Err(err) => {
158                        log_error!("Failed to deserialize ipc message: {:?}", err);
159                    }
160                }
161            })
162        }));
163
164        js_sys::Reflect::set(
165            self.inner.target.as_ref(),
166            &JsValue::from_str("ipc_handler"),
167            handler.get_fn(),
168        )?;
169        js_sys::Reflect::set(
170            self.inner.target.as_ref(),
171            &JsValue::from_str("ipc_identifier"),
172            &JsValue::from(&self.inner.identifier),
173        )?;
174
175        self.inner.handler.lock().unwrap().replace(handler);
176
177        Ok(())
178    }
179
180    fn unregister_handler(&self) -> Result<()> {
181        if let Some(_handler) = self.inner.handler.lock().unwrap().take() {
182            let object = Object::from(self.inner.target.as_ref().clone());
183            js_sys::Reflect::delete_property(&object, &JsValue::from_str("ipc_handler"))?;
184            js_sys::Reflect::delete_property(&object, &JsValue::from_str("ipc_identifier"))?;
185        }
186
187        Ok(())
188    }
189
190    /// Processes an incoming Borsh-encoded IPC message, dispatching it to the
191    /// appropriate registered method or notification handler, or resolving a
192    /// pending outbound call when the message is a response.
193    pub async fn handle_message(
194        &self,
195        message: BorshMessage<'_, IpcId>,
196        source: Option<IpcTarget>,
197    ) -> Result<()> {
198        let BorshMessage::<IpcId> { header, payload } = message;
199        let BorshHeader::<IpcId> { op, id, kind } = header;
200        match kind {
201            MessageKind::Request => {
202                let source = source.unwrap_or_else(|| {
203                    panic!("ipc received a call request with no source: {:?}", op)
204                });
205
206                let op = Ops::try_from_slice(&op)?;
207
208                let method = self.inner.methods.lock().unwrap().get(&op).cloned();
209                if let Some(method) = method {
210                    let result = method.call_with_borsh(payload).await;
211                    let buffer = borsh::to_vec(&result)?;
212                    source.call_ipc(
213                        to_msg::<Ops, IpcId>(BorshHeader::response(id, op), &buffer)?.as_ref(),
214                        None,
215                    )?;
216                } else {
217                    log_error!("ipc method handler not found: {:?}", op);
218                    let resp: ResponseResult<()> = Err(ResponseError::NotFound);
219                    let buffer = borsh::to_vec(&resp)?;
220                    source.call_ipc(
221                        to_msg::<Ops, IpcId>(BorshHeader::response(id, op), &buffer)?.as_ref(),
222                        None,
223                    )?;
224                }
225            }
226            MessageKind::Notification => {
227                let op = Ops::try_from_slice(&op)?;
228
229                let notification = self.inner.notifications.lock().unwrap().get(&op).cloned();
230
231                if let Some(notification) = notification {
232                    match notification.call_with_borsh(payload).await {
233                        Ok(_resp) => {}
234                        Err(err) => {
235                            log_error!("ipc notification error: {:?}", err);
236                        }
237                    }
238                } else {
239                    log_error!("ipc notification handler not found: {:?}", op);
240                }
241            }
242            MessageKind::Response => {
243                let id = id.expect("ipc missing success response id");
244                // let id = Id64::from(id);
245                let mut pending = pending().lock().unwrap();
246                match pending.remove(&id) {
247                    Some(pending) => {
248                        let resp = ResponseResult::<Vec<u8>>::try_from_slice(payload)?;
249                        (pending.callback)(op, resp, None)?;
250                    }
251                    _ => {
252                        log_error!("ipc response id not found: {:?}", id);
253                    }
254                }
255            }
256        }
257
258        Ok(())
259    }
260
261    /// Registers a request/response method handler for the given operation.
262    /// Panics if a handler for the same operation has already been registered.
263    pub fn method<Req, Resp>(&self, op: Ops, method: Method<Req, Resp>)
264    where
265        Ops: Debug + Clone,
266        Req: MsgT,
267        Resp: MsgT,
268    {
269        let method: Arc<dyn MethodTrait> = Arc::new(method);
270        if self
271            .inner
272            .methods
273            .lock()
274            .unwrap()
275            .insert(op.clone(), method)
276            .is_some()
277        {
278            panic!("RPC method {op:?} is declared multiple times")
279        }
280    }
281
282    /// Registers a notification handler for the given operation. Panics if a
283    /// handler for the same operation has already been registered.
284    pub fn notification<Msg>(&self, op: Ops, method: Notification<Msg>)
285    where
286        Ops: Debug + Clone,
287        Msg: MsgT,
288    {
289        let method: Arc<dyn NotificationTrait> = Arc::new(method);
290        if self
291            .inner
292            .notifications
293            .lock()
294            .unwrap()
295            .insert(op.clone(), method)
296            .is_some()
297        {
298            panic!("RPC notification {op:?} is declared multiple times")
299        }
300    }
301}
302
303trait IpcHandler {
304    fn call_ipc(&self, data: &JsValue, source: Option<&IpcTarget>) -> Result<()>;
305}
306
307impl IpcHandler for IpcTarget {
308    fn call_ipc(&self, data: &JsValue, source: Option<&IpcTarget>) -> Result<()> {
309        let target_fn = js_sys::Reflect::get(self.as_ref(), &JsValue::from_str("ipc_handler"))?;
310
311        let target_fn = target_fn.unchecked_into::<js_sys::Function>();
312
313        if let Some(source) = source {
314            target_fn.call2(
315                &JsValue::UNDEFINED,
316                &JsValue::from(data),
317                &JsValue::from(source.as_ref()),
318            )?;
319        } else {
320            target_fn.call2(&JsValue::UNDEFINED, &JsValue::from(data), &JsValue::NULL)?;
321        }
322
323        Ok(())
324    }
325}
326
327static mut PENDING: Option<PendingMap<IpcId, BorshResponseFn>> = None; //PendingMap::default();
328fn pending() -> &'static mut PendingMap<IpcId, BorshResponseFn> {
329    let pending_ptr = &raw mut PENDING;
330    unsafe {
331        if (*pending_ptr).is_none() {
332            PENDING = Some(PendingMap::default());
333        }
334        (*pending_ptr).as_mut().unwrap()
335    }
336}
337
338static mut IPC_HANDLER_SOURCE: Option<IpcTarget> = None;
339
340/// Trait implemented by types that can act as an IPC peer, providing the
341/// ability to send notifications and issue request/response calls to a
342/// target context (a window or the global object).
343#[async_trait]
344pub trait IpcDispatch {
345    /// Returns the [`IpcTarget`] that messages dispatched through this peer
346    /// should be delivered to.
347    fn as_target(&self) -> IpcTarget;
348
349    /// Sends a one-way notification carrying `op` and a Borsh-serialized
350    /// `payload` to the target context, without awaiting a response.
351    async fn notify<Ops, Msg>(&self, op: Ops, payload: Msg) -> Result<()>
352    where
353        Ops: OpsT,
354        Msg: BorshSerialize + Send + Sync + 'static,
355    {
356        let payload = borsh::to_vec(&payload).map_err(|_| Error::BorshSerialize)?;
357        self.as_target().call_ipc(
358            to_msg::<Ops, IpcId>(BorshHeader::notification::<Ops>(op), &payload)?.as_ref(),
359            None,
360        )?;
361        Ok(())
362    }
363
364    /// Issues a request/response call carrying `op` and `req`, awaiting and
365    /// deserializing the response. Uses the registered local IPC object as the
366    /// reply source.
367    async fn call<Ops, Req, Resp>(&self, op: Ops, req: Req) -> Result<Resp>
368    where
369        Ops: OpsT,
370        Req: MsgT,
371        Resp: MsgT,
372    {
373        let ipc_handler_source_ptr = &raw const IPC_HANDLER_SOURCE;
374        let source = unsafe {
375            (*ipc_handler_source_ptr)
376                .as_ref()
377                .cloned()
378                .expect("missing ipc handler source (please register a local IPC object)")
379        };
380        self.call_with_source(op, req, &source).await
381    }
382
383    /// Like [`call`](Self::call), but routes the response to an explicit
384    /// `source` [`IpcTarget`] instead of the registered local IPC object.
385    async fn call_with_source<Ops, Req, Resp>(
386        &self,
387        op: Ops,
388        req: Req,
389        source: &IpcTarget,
390    ) -> Result<Resp>
391    where
392        Ops: OpsT,
393        Req: MsgT,
394        Resp: MsgT,
395    {
396        let payload = borsh::to_vec(&req).map_err(|_| Error::BorshSerialize)?;
397
398        let id = Id64::generate();
399        let (sender, receiver) = oneshot();
400
401        {
402            let mut pending = pending().lock().unwrap();
403            pending.insert(
404                id.clone(),
405                Pending::new(Arc::new(Box::new(move |op, result, _duration| {
406                    sender.try_send((op, result.map(|data| data.to_vec())))?;
407                    Ok(())
408                }))),
409            );
410        }
411
412        self.as_target().call_ipc(
413            to_msg::<Ops, IpcId>(BorshHeader::request::<Ops>(Some(id), op.clone()), &payload)?
414                .as_ref(),
415            Some(source),
416        )?;
417
418        let (op_, data) = receiver.recv().await?;
419
420        let op_ = Ops::try_from_slice(&op_)?;
421        if op != op_ {
422            return Err(Error::Custom(format!(
423                "ipc op mismatch: expected {:?}, got {:?}",
424                op, op_
425            )));
426        }
427
428        let resp = ResponseResult::<Resp>::try_from_slice(data?.as_ref())
429            .map_err(|e| Error::BorshDeserialize(e.to_string()))?;
430
431        Ok(resp?)
432    }
433}
434
435impl IpcDispatch for IpcTarget {
436    fn as_target(&self) -> IpcTarget {
437        self.clone()
438    }
439}
440
441impl IpcDispatch for nw_sys::Window {
442    fn as_target(&self) -> IpcTarget {
443        IpcTarget::new(self.window().as_ref())
444    }
445}
446
447/// Locates an [`IpcTarget`] (the global context or one of the open windows)
448/// whose registered IPC handler matches the given identifier, returning
449/// `Ok(None)` if no such target exists.
450pub async fn get_ipc_target<Ident>(identifier: Ident) -> crate::result::Result<Option<IpcTarget>>
451where
452    Ident: ToString,
453{
454    let ident: String = identifier.to_string();
455
456    if let Some(ipc_ident) =
457        Reflect::get(&global::global(), &JsValue::from("ipc_identifier"))?.as_string()
458        && ipc_ident == ident
459    {
460        return Ok(Some(IpcTarget::new(global::global().as_ref())));
461    }
462
463    let windows = crate::window::get_all_async().await?;
464
465    for window in windows.iter() {
466        let prop =
467            js_sys::Reflect::get(window.window().as_ref(), &JsValue::from("ipc_identifier"))?;
468        if let Some(ipc_ident) = prop.as_string()
469            && ipc_ident == ident
470        {
471            return Ok(Some(IpcTarget::new(window.window().as_ref())));
472        }
473    }
474    Ok(None)
475}