web_rpc/
client.rs

1use std::{
2    cell::RefCell,
3    collections::HashMap,
4    pin::Pin,
5    rc::Rc,
6    task::{Context, Poll},
7};
8
9use futures_channel::oneshot;
10use futures_core::{future::LocalBoxFuture, Future};
11use futures_util::{
12    future::{self, Shared},
13    FutureExt,
14};
15
16#[doc(hidden)]
17pub trait Client {
18    type Request;
19    type Response;
20}
21
22#[doc(hidden)]
23pub type CallbackMap<Response> = HashMap<usize, oneshot::Sender<(Response, js_sys::Array)>>;
24
25#[doc(hidden)]
26pub type Configuration<Request, Response> = (
27    Rc<RefCell<CallbackMap<Response>>>,
28    crate::port::Port,
29    Rc<gloo_events::EventListener>,
30    Shared<LocalBoxFuture<'static, ()>>,
31    Rc<dyn Fn(usize, Request) -> Vec<u8>>,
32    Rc<dyn Fn(usize)>,
33);
34
35/// This future represents a RPC request that is currently being executed. Note that
36/// dropping this future will result in the RPC request being cancelled
37#[must_use = "Either await this future or remove the return type from the RPC method"]
38pub struct RequestFuture<T: 'static> {
39    result: LocalBoxFuture<'static, T>,
40    abort: Pin<Box<RequestAbort>>,
41}
42
43impl<T> RequestFuture<T> {
44    pub fn new(
45        result: impl Future<Output = T> + 'static,
46        dispatcher: Shared<LocalBoxFuture<'static, ()>>,
47        abort: Box<dyn Fn()>,
48    ) -> Self {
49        Self {
50            result: future::select(result.boxed_local(), dispatcher)
51                .map(|select| match select {
52                    future::Either::Left((result, _)) => result,
53                    future::Either::Right(_) => unreachable!("dispatcher should not complete"),
54                })
55                .boxed_local(),
56            abort: Box::pin(RequestAbort {
57                active: true,
58                abort,
59            }),
60        }
61    }
62}
63
64struct RequestAbort {
65    active: bool,
66    abort: Box<dyn Fn()>,
67}
68
69impl Drop for RequestAbort {
70    fn drop(&mut self) {
71        if self.active {
72            (self.abort)();
73        }
74    }
75}
76
77impl<T> Future for RequestFuture<T> {
78    type Output = T;
79
80    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
81        let poll_result = self.as_mut().result.poll_unpin(cx);
82        if matches!(poll_result, Poll::Ready(_)) {
83            self.as_mut().abort.active = false;
84        }
85        poll_result
86    }
87}