perspective_js/
client.rs

1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████       █      █      █      █      █ █▄  ▀███ █       ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█  ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄  ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄   █ ▄▄▄▄▄ ┃
5// ┃ █      ██████ █  ▀█▄       █ ██████      █      ███▌▐███ ███████▄ █       ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors.                              ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::error::Error;
14use std::future::Future;
15use std::sync::Arc;
16
17use derivative::Derivative;
18use futures::channel::oneshot;
19use futures::future::LocalBoxFuture;
20use js_sys::{Function, Uint8Array};
21use macro_rules_attribute::apply;
22#[cfg(doc)]
23use perspective_client::SystemInfo;
24use perspective_client::{ReconnectCallback, Session, TableData, TableInitOptions};
25use wasm_bindgen::prelude::*;
26use wasm_bindgen_futures::{JsFuture, future_to_promise};
27
28pub use crate::table::*;
29use crate::utils::{ApiError, ApiResult, JsValueSerdeExt, LocalPollLoop, inherit_docs};
30
31#[wasm_bindgen]
32extern "C" {
33    #[derive(Clone)]
34    #[wasm_bindgen(typescript_type = "TableInitOptions")]
35    pub type JsTableInitOptions;
36}
37
38#[wasm_bindgen]
39#[derive(Clone)]
40pub struct ProxySession(perspective_client::ProxySession);
41
42#[wasm_bindgen]
43impl ProxySession {
44    #[wasm_bindgen(constructor)]
45    pub fn new(client: &Client, on_response: &Function) -> Self {
46        let poll_loop = LocalPollLoop::new({
47            let on_response = on_response.clone();
48            move |msg: Vec<u8>| {
49                let msg = Uint8Array::from(&msg[..]);
50                on_response.call1(&JsValue::UNDEFINED, &JsValue::from(msg))?;
51                Ok(JsValue::null())
52            }
53        });
54        // NB: This swallows any errors raised by the inner callback
55        let on_response = Box::new(move |msg: &[u8]| {
56            wasm_bindgen_futures::spawn_local(poll_loop.poll(msg.to_vec()));
57            Ok(())
58        });
59        Self(perspective_client::ProxySession::new(
60            client.client.clone(),
61            on_response,
62        ))
63    }
64
65    #[wasm_bindgen]
66    pub async fn handle_request(&self, value: JsValue) -> ApiResult<()> {
67        let uint8array = Uint8Array::new(&value);
68        let slice = uint8array.to_vec();
69        self.0.handle_request(&slice).await?;
70        Ok(())
71    }
72
73    #[wasm_bindgen]
74    pub async fn poll(&self) -> ApiResult<()> {
75        self.0.poll().await?;
76        Ok(())
77    }
78
79    pub async fn close(self) {
80        self.0.close().await;
81    }
82}
83
84#[apply(inherit_docs)]
85#[inherit_doc = "client.md"]
86#[wasm_bindgen]
87pub struct Client {
88    pub(crate) close: Option<Function>,
89    pub(crate) client: perspective_client::Client,
90}
91
92/// A wrapper around [`js_sys::Function`] to ease async integration for the
93/// `reconnect` argument of [`Client::on_error`] callback.
94#[derive(Derivative)]
95#[derivative(Clone(bound = ""))]
96struct JsReconnect<I>(Arc<dyn Fn(I) -> js_sys::Promise>);
97
98unsafe impl<I> Send for JsReconnect<I> {}
99unsafe impl<I> Sync for JsReconnect<I> {}
100
101impl<I> JsReconnect<I> {
102    fn run(&self, args: I) -> js_sys::Promise {
103        self.0(args)
104    }
105
106    fn run_all(
107        &self,
108        args: I,
109    ) -> impl Future<Output = Result<(), Box<dyn Error + Send + Sync + 'static>>> + Send + Sync + 'static
110    {
111        let (sender, receiver) = oneshot::channel::<Result<(), Box<dyn Error + Send + Sync>>>();
112        let p = self.0(args);
113        let _ = future_to_promise(async move {
114            let result = JsFuture::from(p)
115                .await
116                .map(|_| ())
117                .map_err(|x| format!("{:?}", x).into());
118
119            sender.send(result).unwrap();
120            Ok(JsValue::UNDEFINED)
121        });
122
123        async move { receiver.await.unwrap() }
124    }
125}
126
127impl<F, I> From<F> for JsReconnect<I>
128where
129    F: Fn(I) -> js_sys::Promise + 'static,
130{
131    fn from(value: F) -> Self {
132        JsReconnect(Arc::new(value))
133    }
134}
135
136#[wasm_bindgen]
137impl Client {
138    #[wasm_bindgen(constructor)]
139    pub fn new(send_request: Function, close: Option<Function>) -> Self {
140        let send_request = JsReconnect::from(move |mut v: Vec<u8>| {
141            let buff2 = unsafe { js_sys::Uint8Array::view_mut_raw(v.as_mut_ptr(), v.len()) };
142            send_request
143                .call1(&JsValue::UNDEFINED, &buff2)
144                .unwrap()
145                .unchecked_into::<js_sys::Promise>()
146        });
147
148        let client = perspective_client::Client::new_with_callback(move |msg| {
149            Box::pin(send_request.run_all(msg))
150        });
151
152        Client { close, client }
153    }
154
155    #[wasm_bindgen]
156    pub fn new_proxy_session(&self, on_response: &Function) -> ProxySession {
157        ProxySession::new(self, on_response)
158    }
159
160    #[wasm_bindgen]
161    pub async fn init(&self) -> ApiResult<()> {
162        self.client.clone().init().await?;
163        Ok(())
164    }
165
166    #[doc(hidden)]
167    #[wasm_bindgen]
168    pub async fn handle_response(&self, value: &JsValue) -> ApiResult<()> {
169        let uint8array = Uint8Array::new(value);
170        let slice = uint8array.to_vec();
171        self.client.handle_response(&slice).await?;
172        Ok(())
173    }
174
175    #[doc(hidden)]
176    #[wasm_bindgen]
177    pub async fn handle_error(
178        &self,
179        error: Option<String>,
180        reconnect: Option<Function>,
181    ) -> ApiResult<()> {
182        self.client
183            .handle_error(
184                error,
185                reconnect.map(|reconnect| {
186                    let reconnect =
187                        JsReconnect::from(move |()| match reconnect.call0(&JsValue::UNDEFINED) {
188                            Ok(x) => x.unchecked_into::<js_sys::Promise>(),
189                            Err(e) => {
190                                // This error may occur when _invoking_ the function
191                                tracing::warn!("{:?}", e);
192                                js_sys::Promise::reject(&format!("C {:?}", e).into())
193                            },
194                        });
195
196                    Arc::new(move || {
197                        let fut = JsFuture::from(reconnect.run(()));
198                        Box::pin(async move {
199                            // This error may occur when _awaiting_ the promise returned by the
200                            // function
201                            if let Err(e) = fut.await {
202                                if let Some(e) = e.dyn_ref::<js_sys::Object>() {
203                                    Err(e.to_string().as_string().unwrap().into())
204                                } else {
205                                    Err(e.as_string().unwrap().into())
206                                }
207                            } else {
208                                Ok(())
209                            }
210                        })
211                            as LocalBoxFuture<'static, Result<(), Box<dyn Error>>>
212                    }) as Arc<(dyn Fn() -> _ + Send + Sync)>
213                }),
214            )
215            .await?;
216
217        Ok(())
218    }
219
220    #[doc(hidden)]
221    #[wasm_bindgen]
222    pub async fn on_error(&self, callback: Function) -> ApiResult<u32> {
223        let callback = JsReconnect::from(
224            move |(message, reconnect): (Option<String>, Option<ReconnectCallback>)| {
225                let cl: Closure<dyn Fn() -> js_sys::Promise> = Closure::new(move || {
226                    let reconnect = reconnect.clone();
227                    future_to_promise(async move {
228                        if let Some(f) = reconnect {
229                            f().await.map_err(|e| JsValue::from(format!("A {}", e)))?;
230                        }
231
232                        Ok(JsValue::UNDEFINED)
233                    })
234                });
235
236                if let Err(e) = callback.call2(
237                    &JsValue::UNDEFINED,
238                    &JsValue::from(message),
239                    &cl.into_js_value(),
240                ) {
241                    tracing::warn!("D {:?}", e);
242                }
243
244                js_sys::Promise::resolve(&JsValue::UNDEFINED)
245            },
246        );
247
248        let id = self
249            .client
250            .on_error(Box::new(move |message, reconnect| {
251                let callback = callback.clone();
252                Box::pin(async move {
253                    let _promise = callback.run((message, reconnect));
254                    Ok(())
255                })
256            }))
257            .await?;
258
259        Ok(id)
260    }
261
262    #[apply(inherit_docs)]
263    #[inherit_doc = "client/table.md"]
264    #[wasm_bindgen]
265    pub async fn table(
266        &self,
267        value: &JsTableInitData,
268        options: Option<JsTableInitOptions>,
269    ) -> ApiResult<Table> {
270        let options = options
271            .into_serde_ext::<Option<TableInitOptions>>()?
272            .unwrap_or_default();
273
274        let args = TableData::from_js_value(value, options.format)?;
275        Ok(Table(self.client.table(args, options).await?))
276    }
277
278    #[apply(inherit_docs)]
279    #[inherit_doc = "client/terminate.md"]
280    #[wasm_bindgen]
281    pub fn terminate(&self) -> ApiResult<JsValue> {
282        if let Some(f) = self.close.clone() {
283            Ok(f.call0(&JsValue::UNDEFINED)?)
284        } else {
285            Err(ApiError::new("Client type cannot be terminated"))
286        }
287    }
288
289    #[apply(inherit_docs)]
290    #[inherit_doc = "client/open_table.md"]
291    #[wasm_bindgen]
292    pub async fn open_table(&self, entity_id: String) -> ApiResult<Table> {
293        Ok(Table(self.client.open_table(entity_id).await?))
294    }
295
296    #[apply(inherit_docs)]
297    #[inherit_doc = "client/get_hosted_table_names.md"]
298    #[wasm_bindgen]
299    pub async fn get_hosted_table_names(&self) -> ApiResult<JsValue> {
300        Ok(JsValue::from_serde_ext(
301            &self.client.get_hosted_table_names().await?,
302        )?)
303    }
304
305    #[apply(inherit_docs)]
306    #[inherit_doc = "client/on_hosted_tables_update.md"]
307    #[wasm_bindgen]
308    pub async fn on_hosted_tables_update(&self, on_update_js: Function) -> ApiResult<u32> {
309        let poll_loop = LocalPollLoop::new(move |_| on_update_js.call0(&JsValue::UNDEFINED));
310        let on_update = Box::new(move || poll_loop.poll(()));
311        let id = self.client.on_hosted_tables_update(on_update).await?;
312        Ok(id)
313    }
314
315    #[apply(inherit_docs)]
316    #[inherit_doc = "client/remove_hosted_tables_update.md"]
317    #[wasm_bindgen]
318    pub async fn remove_hosted_tables_update(&self, update_id: u32) -> ApiResult<()> {
319        self.client.remove_hosted_tables_update(update_id).await?;
320        Ok(())
321    }
322
323    #[apply(inherit_docs)]
324    #[inherit_doc = "client/system_info.md"]
325    #[wasm_bindgen]
326    pub async fn system_info(&self) -> ApiResult<JsValue> {
327        let info = self.client.system_info().await?;
328        Ok(JsValue::from_serde_ext(&info)?)
329    }
330}