Skip to main content

web_rpc/
client.rs

1use std::{
2    cell::RefCell,
3    collections::HashMap,
4    pin::Pin,
5    rc::Rc,
6    task::{Context, Poll},
7};
8
9use futures_channel::{mpsc, oneshot};
10use futures_core::{future::LocalBoxFuture, Future};
11use futures_util::{
12    future::{self, Shared},
13    FutureExt,
14};
15
/// Marker trait that ties a client type to the response type it receives.
///
/// The trait has no methods; implementors only name `Response`.
#[doc(hidden)]
pub trait Client {
    /// Response type associated with the implementing client.
    type Response;
}
20
21#[doc(hidden)]
22pub type CallbackMap<Response> = HashMap<usize, oneshot::Sender<(Response, js_sys::Array)>>;
23
24#[doc(hidden)]
25pub type StreamCallbackMap<Response> =
26    HashMap<usize, mpsc::UnboundedSender<(Response, js_sys::Array)>>;
27
28#[doc(hidden)]
29pub type Configuration<Response> = (
30    Rc<RefCell<CallbackMap<Response>>>,
31    Rc<RefCell<StreamCallbackMap<Response>>>,
32    crate::port::Port,
33    Rc<gloo_events::EventListener>,
34    Shared<LocalBoxFuture<'static, ()>>,
35    Rc<dyn Fn(usize)>,
36);
37
38/// This future represents a RPC request that is currently being executed. Note that
39/// dropping this future will result in the RPC request being cancelled
40#[must_use = "Either await this future or remove the return type from the RPC method"]
41pub struct RequestFuture<T: 'static> {
42    result: LocalBoxFuture<'static, T>,
43    abort: Pin<Box<RequestAbort>>,
44}
45
46impl<T> RequestFuture<T> {
47    pub fn new(
48        result: impl Future<Output = T> + 'static,
49        dispatcher: Shared<LocalBoxFuture<'static, ()>>,
50        abort: Box<dyn Fn()>,
51    ) -> Self {
52        Self {
53            result: future::select(result.boxed_local(), dispatcher)
54                .map(|select| match select {
55                    future::Either::Left((result, _)) => result,
56                    future::Either::Right(_) => unreachable!("dispatcher should not complete"),
57                })
58                .boxed_local(),
59            abort: Box::pin(RequestAbort {
60                active: true,
61                abort,
62            }),
63        }
64    }
65}
66
/// Drop guard for a request: runs `abort` on drop while `active` is set.
struct RequestAbort {
    /// `true` while dropping the guard should still invoke `abort`.
    active: bool,
    /// Callback invoked on drop when the guard is still active.
    abort: Box<dyn Fn()>,
}

impl Drop for RequestAbort {
    /// Invokes the abort callback unless the guard has been disarmed.
    fn drop(&mut self) {
        if !self.active {
            return;
        }
        (self.abort)();
    }
}
79
80impl<T> Future for RequestFuture<T> {
81    type Output = T;
82
83    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
84        let poll_result = self.as_mut().result.poll_unpin(cx);
85        if matches!(poll_result, Poll::Ready(_)) {
86            self.as_mut().abort.active = false;
87        }
88        poll_result
89    }
90}
91
92/// A stream of items from a streaming RPC method. Dropping this will send an
93/// abort to the server, cancelling the stream. Call [`close`](StreamReceiver::close)
94/// to stop the server while still draining buffered items.
95pub struct StreamReceiver<T: 'static> {
96    inner: Pin<Box<dyn futures_core::Stream<Item = T>>>,
97    dispatcher: Shared<LocalBoxFuture<'static, ()>>,
98    abort: Pin<Box<StreamAbort>>,
99}
100
/// Abort state for a stream: an armed flag plus the callback to run.
struct StreamAbort {
    /// `true` while the abort callback has not yet been run or disarmed.
    active: bool,
    /// Callback that requests cancellation of the stream.
    abort: Box<dyn Fn()>,
}
105
106impl<T> StreamReceiver<T> {
107    pub fn new(
108        inner: impl futures_core::Stream<Item = T> + 'static,
109        dispatcher: Shared<LocalBoxFuture<'static, ()>>,
110        abort: Box<dyn Fn()>,
111    ) -> Self {
112        Self {
113            inner: Box::pin(inner),
114            dispatcher,
115            abort: Box::pin(StreamAbort {
116                active: true,
117                abort,
118            }),
119        }
120    }
121
122    /// Stop the server from producing more items. Buffered items can still
123    /// be drained by continuing to poll the stream.
124    pub fn close(&mut self) {
125        if self.abort.active {
126            (self.abort.abort)();
127            self.abort.active = false;
128        }
129    }
130}
131
132impl<T> futures_core::Stream for StreamReceiver<T> {
133    type Item = T;
134
135    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
136        match self.inner.as_mut().poll_next(cx) {
137            Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
138            Poll::Ready(None) => {
139                self.abort.active = false;
140                Poll::Ready(None)
141            }
142            Poll::Pending => match self.dispatcher.poll_unpin(cx) {
143                Poll::Ready(_) => unreachable!("dispatcher should not complete"),
144                Poll::Pending => Poll::Pending,
145            },
146        }
147    }
148}
149
150impl Drop for StreamAbort {
151    fn drop(&mut self) {
152        if self.active {
153            (self.abort)();
154        }
155    }
156}