1use std::{
2 cell::RefCell,
3 collections::HashMap,
4 pin::Pin,
5 rc::Rc,
6 task::{Context, Poll},
7};
8
9use futures_channel::{mpsc, oneshot};
10use futures_core::{future::LocalBoxFuture, Future};
11use futures_util::{
12 future::{self, Shared},
13 FutureExt,
14};
15
/// Marker trait tying a client type to the response type it works with.
///
/// Hidden from rendered documentation. Presumably implemented by generated
/// client code rather than by hand — TODO confirm against the code that emits
/// the impls.
#[doc(hidden)]
pub trait Client {
    /// The response type associated with this client.
    type Response;
}
20
21#[doc(hidden)]
22pub type CallbackMap<Response> = HashMap<usize, oneshot::Sender<(Response, js_sys::Array)>>;
23
24#[doc(hidden)]
25pub type StreamCallbackMap<Response> =
26 HashMap<usize, mpsc::UnboundedSender<(Response, js_sys::Array)>>;
27
28#[doc(hidden)]
29pub type Configuration<Response> = (
30 Rc<RefCell<CallbackMap<Response>>>,
31 Rc<RefCell<StreamCallbackMap<Response>>>,
32 crate::port::Port,
33 Rc<gloo_events::EventListener>,
34 Shared<LocalBoxFuture<'static, ()>>,
35 Rc<dyn Fn(usize)>,
36);
37
38#[must_use = "Either await this future or remove the return type from the RPC method"]
41pub struct RequestFuture<T: 'static> {
42 result: LocalBoxFuture<'static, T>,
43 abort: Pin<Box<RequestAbort>>,
44}
45
46impl<T> RequestFuture<T> {
47 pub fn new(
48 result: impl Future<Output = T> + 'static,
49 dispatcher: Shared<LocalBoxFuture<'static, ()>>,
50 abort: Box<dyn Fn()>,
51 ) -> Self {
52 Self {
53 result: future::select(result.boxed_local(), dispatcher)
54 .map(|select| match select {
55 future::Either::Left((result, _)) => result,
56 future::Either::Right(_) => unreachable!("dispatcher should not complete"),
57 })
58 .boxed_local(),
59 abort: Box::pin(RequestAbort {
60 active: true,
61 abort,
62 }),
63 }
64 }
65}
66
/// Drop guard that runs an abort callback unless it has been disarmed.
struct RequestAbort {
    /// `true` while the callback should still run on drop.
    active: bool,
    /// Callback to run when the guard is dropped while armed.
    abort: Box<dyn Fn()>,
}

impl Drop for RequestAbort {
    /// Runs the abort callback if, and only if, the guard is still armed.
    fn drop(&mut self) {
        if !self.active {
            return;
        }
        (self.abort)();
    }
}
79
80impl<T> Future for RequestFuture<T> {
81 type Output = T;
82
83 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
84 let poll_result = self.as_mut().result.poll_unpin(cx);
85 if matches!(poll_result, Poll::Ready(_)) {
86 self.as_mut().abort.active = false;
87 }
88 poll_result
89 }
90}
91
92pub struct StreamReceiver<T: 'static> {
96 inner: Pin<Box<dyn futures_core::Stream<Item = T>>>,
97 dispatcher: Shared<LocalBoxFuture<'static, ()>>,
98 abort: Pin<Box<StreamAbort>>,
99}
100
/// State for the abort callback of a stream: the callback itself plus a flag
/// recording whether it should still be run.
struct StreamAbort {
    /// `true` while the callback has not yet been run or disarmed.
    active: bool,
    /// Callback that aborts the stream.
    abort: Box<dyn Fn()>,
}
105
106impl<T> StreamReceiver<T> {
107 pub fn new(
108 inner: impl futures_core::Stream<Item = T> + 'static,
109 dispatcher: Shared<LocalBoxFuture<'static, ()>>,
110 abort: Box<dyn Fn()>,
111 ) -> Self {
112 Self {
113 inner: Box::pin(inner),
114 dispatcher,
115 abort: Box::pin(StreamAbort {
116 active: true,
117 abort,
118 }),
119 }
120 }
121
122 pub fn close(&mut self) {
125 if self.abort.active {
126 (self.abort.abort)();
127 self.abort.active = false;
128 }
129 }
130}
131
132impl<T> futures_core::Stream for StreamReceiver<T> {
133 type Item = T;
134
135 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
136 match self.inner.as_mut().poll_next(cx) {
137 Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
138 Poll::Ready(None) => {
139 self.abort.active = false;
140 Poll::Ready(None)
141 }
142 Poll::Pending => match self.dispatcher.poll_unpin(cx) {
143 Poll::Ready(_) => unreachable!("dispatcher should not complete"),
144 Poll::Pending => Poll::Pending,
145 },
146 }
147 }
148}
149
150impl Drop for StreamAbort {
151 fn drop(&mut self) {
152 if self.active {
153 (self.abort)();
154 }
155 }
156}