perspective_js/
client.rs

1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████       █      █      █      █      █ █▄  ▀███ █       ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█  ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄  ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄   █ ▄▄▄▄▄ ┃
5// ┃ █      ██████ █  ▀█▄       █ ██████      █      ███▌▐███ ███████▄ █       ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors.                              ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::error::Error;
14use std::future::Future;
15use std::sync::Arc;
16
17use derivative::Derivative;
18use futures::channel::oneshot;
19use js_sys::{Function, Uint8Array};
20use macro_rules_attribute::apply;
21#[cfg(doc)]
22use perspective_client::SystemInfo;
23use perspective_client::{
24    ClientError, ReconnectCallback, Session, TableData, TableInitOptions, asyncfn,
25};
26use wasm_bindgen::prelude::*;
27use wasm_bindgen_derive::TryFromJsValue;
28use wasm_bindgen_futures::{JsFuture, future_to_promise};
29
30pub use crate::table::*;
31use crate::utils::{ApiError, ApiResult, JsValueSerdeExt, LocalPollLoop, inherit_docs};
32
#[wasm_bindgen]
extern "C" {
    /// Opaque JS value accepted as the `options` argument of
    /// [`Client::table`], which deserializes it into [`TableInitOptions`].
    /// The attribute below gives it the TypeScript type name
    /// `TableInitOptions`.
    #[derive(Clone)]
    #[wasm_bindgen(typescript_type = "TableInitOptions")]
    pub type JsTableInitOptions;
}
39
40#[wasm_bindgen]
41#[derive(Clone)]
42pub struct ProxySession(perspective_client::ProxySession);
43
44#[wasm_bindgen]
45impl ProxySession {
46    #[wasm_bindgen(constructor)]
47    pub fn new(client: &Client, on_response: &Function) -> Self {
48        let poll_loop = LocalPollLoop::new({
49            let on_response = on_response.clone();
50            move |msg: Vec<u8>| {
51                let msg = Uint8Array::from(&msg[..]);
52                on_response.call1(&JsValue::UNDEFINED, &JsValue::from(msg))?;
53                Ok(JsValue::null())
54            }
55        });
56        // NB: This swallows any errors raised by the inner callback
57        let on_response = Box::new(move |msg: &[u8]| {
58            wasm_bindgen_futures::spawn_local(poll_loop.poll(msg.to_vec()));
59            Ok(())
60        });
61        Self(perspective_client::ProxySession::new(
62            client.client.clone(),
63            on_response,
64        ))
65    }
66
67    #[wasm_bindgen]
68    pub async fn handle_request(&self, value: JsValue) -> ApiResult<()> {
69        let uint8array = Uint8Array::new(&value);
70        let slice = uint8array.to_vec();
71        self.0.handle_request(&slice).await?;
72        Ok(())
73    }
74
75    #[wasm_bindgen]
76    pub async fn poll(&self) -> ApiResult<()> {
77        self.0.poll().await?;
78        Ok(())
79    }
80
81    pub async fn close(self) {
82        self.0.close().await;
83    }
84}
85
// NOTE(review): the rustdoc for this type appears to be injected from
// `client.md` by the `inherit_docs` macro — confirm before adding `///` here.
#[apply(inherit_docs)]
#[inherit_doc = "client.md"]
#[wasm_bindgen]
#[derive(TryFromJsValue, Clone)]
pub struct Client {
    // Optional JS callback passed to the constructor; `terminate` invokes it
    // and returns an error when it is `None`.
    pub(crate) close: Option<Function>,
    // The wrapped `perspective_client::Client` that the methods of this type
    // forward to.
    pub(crate) client: perspective_client::Client,
}
94
/// A wrapper around [`js_sys::Function`] to ease async integration for the
/// `reconnect` argument of [`Client::on_error`] callback.
///
/// Internally this is a shared, type-erased closure from an argument `I` to a
/// [`js_sys::Promise`]. `Clone` is derived with an empty bound, so cloning
/// only clones the `Arc` and does not require `I: Clone`.
#[derive(Derivative)]
#[derivative(Clone(bound = ""))]
struct JsReconnect<I>(Arc<dyn Fn(I) -> js_sys::Promise>);

// SAFETY: NOTE(review) — the wrapped closure is a plain `dyn Fn` with no
// `Send`/`Sync` bound, so these impls are assertions the compiler cannot
// check. Presumably they rely on this crate only running on a single-threaded
// wasm target, where the value never actually crosses a thread — confirm
// before enabling any threaded build.
unsafe impl<I> Send for JsReconnect<I> {}
unsafe impl<I> Sync for JsReconnect<I> {}
103
104impl<I> JsReconnect<I> {
105    fn run(&self, args: I) -> js_sys::Promise {
106        self.0(args)
107    }
108
109    fn run_all(
110        &self,
111        args: I,
112    ) -> impl Future<Output = Result<(), Box<dyn Error + Send + Sync + 'static>>>
113    + Send
114    + Sync
115    + 'static
116    + use<I> {
117        let (sender, receiver) = oneshot::channel::<Result<(), Box<dyn Error + Send + Sync>>>();
118        let p = self.0(args);
119        let _ = future_to_promise(async move {
120            let result = JsFuture::from(p)
121                .await
122                .map(|_| ())
123                .map_err(|x| format!("{:?}", x).into());
124
125            sender.send(result).unwrap();
126            Ok(JsValue::UNDEFINED)
127        });
128
129        async move { receiver.await.unwrap() }
130    }
131}
132
133impl<F, I> From<F> for JsReconnect<I>
134where
135    F: Fn(I) -> js_sys::Promise + 'static,
136{
137    fn from(value: F) -> Self {
138        JsReconnect(Arc::new(value))
139    }
140}
141
142impl Client {
143    pub fn get_client(&self) -> &'_ perspective_client::Client {
144        &self.client
145    }
146}
147
148#[wasm_bindgen]
149impl Client {
150    #[wasm_bindgen(constructor)]
151    pub fn new(send_request: Function, close: Option<Function>) -> Self {
152        let send_request = JsReconnect::from(move |mut v: Vec<u8>| {
153            let buff2 = unsafe { js_sys::Uint8Array::view_mut_raw(v.as_mut_ptr(), v.len()) };
154            send_request
155                .call1(&JsValue::UNDEFINED, &buff2)
156                .unwrap()
157                .unchecked_into::<js_sys::Promise>()
158        });
159
160        let client =
161            perspective_client::Client::new_with_callback(move |msg| send_request.run_all(msg));
162
163        Client { close, client }
164    }
165
    /// Creates a [`ProxySession`] for this client by delegating to
    /// [`ProxySession::new`] with the same `on_response` callback.
    #[wasm_bindgen]
    pub fn new_proxy_session(&self, on_response: &Function) -> ProxySession {
        ProxySession::new(self, on_response)
    }
170
    /// Awaits `init` on a clone of the wrapped client and converts any error
    /// it returns into the API error type.
    #[wasm_bindgen]
    pub async fn init(&self) -> ApiResult<()> {
        // NOTE(review): the call goes through a fresh clone rather than
        // `self.client` directly; the reason is not visible from this file.
        self.client.clone().init().await?;
        Ok(())
    }
176
177    #[doc(hidden)]
178    #[wasm_bindgen]
179    pub async fn handle_response(&self, value: &JsValue) -> ApiResult<()> {
180        let uint8array = Uint8Array::new(value);
181        let slice = uint8array.to_vec();
182        self.client.handle_response(&slice).await?;
183        Ok(())
184    }
185
186    #[doc(hidden)]
187    #[wasm_bindgen]
188    pub async fn handle_error(
189        &self,
190        error: Option<String>,
191        reconnect: Option<Function>,
192    ) -> ApiResult<()> {
193        self.client
194            .handle_error(
195                error,
196                reconnect.map(|reconnect| {
197                    let reconnect =
198                        JsReconnect::from(move |()| match reconnect.call0(&JsValue::UNDEFINED) {
199                            Ok(x) => x.unchecked_into::<js_sys::Promise>(),
200                            Err(e) => {
201                                // This error may occur when _invoking_ the function
202                                tracing::warn!("{:?}", e);
203                                js_sys::Promise::reject(&format!("C {:?}", e).into())
204                            },
205                        });
206
207                    asyncfn!(reconnect, async move || {
208                        if let Err(e) = JsFuture::from(reconnect.run(())).await {
209                            if let Some(e) = e.dyn_ref::<js_sys::Object>() {
210                                Err(ClientError::Unknown(e.to_string().as_string().unwrap()))
211                            } else {
212                                Err(ClientError::Unknown(e.as_string().unwrap()))
213                            }
214                        } else {
215                            Ok(())
216                        }
217                    })
218                }),
219            )
220            .await?;
221
222        Ok(())
223    }
224
    // Registers the JS function `callback` as an error handler on the wrapped
    // client and returns the id that registration yields.
    //
    // For each reported error, `callback` is called with the optional message
    // and a JS function that triggers the reconnect logic.
    #[doc(hidden)]
    #[wasm_bindgen]
    pub async fn on_error(&self, callback: Function) -> ApiResult<u32> {
        // Adapter run once per reported error, receiving the optional message
        // and the optional Rust-side reconnect callback.
        let callback = JsReconnect::from(
            move |(message, reconnect): (Option<String>, Option<ReconnectCallback>)| {
                // JS-callable `reconnect` argument: each call returns a promise
                // that awaits the Rust reconnect callback when there is one,
                // and rejects with its error (prefixed "A ") on failure.
                let cl: Closure<dyn Fn() -> js_sys::Promise> = Closure::new(move || {
                    let reconnect = reconnect.clone();
                    future_to_promise(async move {
                        if let Some(f) = reconnect {
                            f().await.map_err(|e| JsValue::from(format!("A {}", e)))?;
                        }

                        Ok(JsValue::UNDEFINED)
                    })
                });

                // An exception thrown by the JS handler is only logged; it is
                // not propagated to the caller.
                // NOTE(review): `into_js_value` hands the closure over to JS;
                // when it is freed then depends on the JS runtime — confirm.
                if let Err(e) = callback.call2(
                    &JsValue::UNDEFINED,
                    &JsValue::from(message),
                    &cl.into_js_value(),
                ) {
                    tracing::warn!("D {:?}", e);
                }

                // The adapter always reports success, whatever the handler did.
                js_sys::Promise::resolve(&JsValue::UNDEFINED)
            },
        );

        // Route invocations of the adapter through a `LocalPollLoop`, then
        // register a handler on the wrapped client that polls that loop.
        let poll_loop = LocalPollLoop::new_async(move |x| JsFuture::from(callback.run(x)));
        let id = self
            .client
            .on_error(asyncfn!(poll_loop, async move |message, reconnect| {
                poll_loop.poll((message, reconnect)).await;
                Ok(())
            }))
            .await?;

        Ok(id)
    }
264
265    #[apply(inherit_docs)]
266    #[inherit_doc = "client/table.md"]
267    #[wasm_bindgen]
268    pub async fn table(
269        &self,
270        value: &JsTableInitData,
271        options: Option<JsTableInitOptions>,
272    ) -> ApiResult<Table> {
273        let options = options
274            .into_serde_ext::<Option<TableInitOptions>>()?
275            .unwrap_or_default();
276
277        let args = TableData::from_js_value(value, options.format)?;
278        Ok(Table(self.client.table(args, options).await?))
279    }
280
281    #[apply(inherit_docs)]
282    #[inherit_doc = "client/terminate.md"]
283    #[wasm_bindgen]
284    pub fn terminate(&self) -> ApiResult<JsValue> {
285        if let Some(f) = self.close.clone() {
286            Ok(f.call0(&JsValue::UNDEFINED)?)
287        } else {
288            Err(ApiError::new("Client type cannot be terminated"))
289        }
290    }
291
292    #[apply(inherit_docs)]
293    #[inherit_doc = "client/open_table.md"]
294    #[wasm_bindgen]
295    pub async fn open_table(&self, entity_id: String) -> ApiResult<Table> {
296        Ok(Table(self.client.open_table(entity_id).await?))
297    }
298
299    #[apply(inherit_docs)]
300    #[inherit_doc = "client/get_hosted_table_names.md"]
301    #[wasm_bindgen]
302    pub async fn get_hosted_table_names(&self) -> ApiResult<JsValue> {
303        Ok(JsValue::from_serde_ext(
304            &self.client.get_hosted_table_names().await?,
305        )?)
306    }
307
308    #[apply(inherit_docs)]
309    #[inherit_doc = "client/on_hosted_tables_update.md"]
310    #[wasm_bindgen]
311    pub async fn on_hosted_tables_update(&self, on_update_js: Function) -> ApiResult<u32> {
312        let poll_loop = LocalPollLoop::new(move |_| on_update_js.call0(&JsValue::UNDEFINED));
313        let on_update = Box::new(move || poll_loop.poll(()));
314        let id = self.client.on_hosted_tables_update(on_update).await?;
315        Ok(id)
316    }
317
318    #[apply(inherit_docs)]
319    #[inherit_doc = "client/remove_hosted_tables_update.md"]
320    #[wasm_bindgen]
321    pub async fn remove_hosted_tables_update(&self, update_id: u32) -> ApiResult<()> {
322        self.client.remove_hosted_tables_update(update_id).await?;
323        Ok(())
324    }
325
326    #[apply(inherit_docs)]
327    #[inherit_doc = "client/system_info.md"]
328    #[wasm_bindgen]
329    pub async fn system_info(&self) -> ApiResult<JsValue> {
330        let info = self.client.system_info().await?;
331        Ok(JsValue::from_serde_ext(&info)?)
332    }
333}