Skip to main content

perspective_js/
client.rs

1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████       █      █      █      █      █ █▄  ▀███ █       ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█  ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄  ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄   █ ▄▄▄▄▄ ┃
5// ┃ █      ██████ █  ▀█▄       █ ██████      █      ███▌▐███ ███████▄ █       ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors.                              ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::error::Error;
14use std::future::Future;
15use std::sync::Arc;
16
17use derivative::Derivative;
18use futures::channel::oneshot;
19use js_sys::{Function, Uint8Array};
20#[cfg(doc)]
21use perspective_client::SystemInfo;
22use perspective_client::{
23    ClientError, ReconnectCallback, Session, TableData, TableInitOptions, asyncfn,
24};
25use wasm_bindgen::prelude::*;
26use wasm_bindgen_derive::TryFromJsValue;
27use wasm_bindgen_futures::{JsFuture, future_to_promise};
28
29pub use crate::table::*;
30use crate::utils::{ApiError, ApiResult, JsValueSerdeExt, LocalPollLoop};
31use crate::{TableDataExt, apierror};
32
33#[wasm_bindgen]
34extern "C" {
35    #[derive(Clone)]
36    #[wasm_bindgen(typescript_type = "TableInitOptions")]
37    pub type JsTableInitOptions;
38}
39
40#[wasm_bindgen]
41#[derive(Clone)]
42pub struct ProxySession(perspective_client::ProxySession);
43
44#[wasm_bindgen]
45impl ProxySession {
46    #[wasm_bindgen(constructor)]
47    pub fn new(client: &Client, on_response: &Function) -> Self {
48        let poll_loop = LocalPollLoop::new({
49            let on_response = on_response.clone();
50            move |msg: Vec<u8>| {
51                let msg = Uint8Array::from(&msg[..]);
52                on_response.call1(&JsValue::UNDEFINED, &JsValue::from(msg))?;
53                Ok(JsValue::null())
54            }
55        });
56        // NB: This swallows any errors raised by the inner callback
57        let on_response = Box::new(move |msg: &[u8]| {
58            wasm_bindgen_futures::spawn_local(poll_loop.poll(msg.to_vec()));
59            Ok(())
60        });
61        Self(perspective_client::ProxySession::new(
62            client.client.clone(),
63            on_response,
64        ))
65    }
66
67    #[wasm_bindgen]
68    pub async fn handle_request(&self, value: JsValue) -> ApiResult<()> {
69        let uint8array = Uint8Array::new(&value);
70        let slice = uint8array.to_vec();
71        self.0.handle_request(&slice).await?;
72        Ok(())
73    }
74
75    pub async fn close(self) {
76        self.0.close().await;
77    }
78}
79
80/// An instance of a [`Client`] is a connection to a single
81/// `perspective_server::Server`, whether locally in-memory or remote over some
82/// transport like a WebSocket.
83///
84/// The browser and node.js libraries both support the `websocket(url)`
85/// constructor, which connects to a remote `perspective_server::Server`
86/// instance over a WebSocket transport.
87///
88/// In the browser, the `worker()` constructor creates a new Web Worker
89/// `perspective_server::Server` and returns a [`Client`] connected to it.
90///
91/// In node.js, a pre-instantied [`Client`] connected synhronously to a global
92/// singleton `perspective_server::Server` is the default module export.
93///
94/// # JavaScript Examples
95///
96/// Create a Web Worker `perspective_server::Server` in the browser and return a
97/// [`Client`] instance connected for it:
98///
99/// ```javascript
100/// import perspective from "@perspective-dev/client";
101/// const client = await perspective.worker();
102/// ```
103///
104/// Create a WebSocket connection to a remote `perspective_server::Server`:
105///
106/// ```javascript
107/// import perspective from "@perspective-dev/client";
108/// const client = await perspective.websocket("ws://locahost:8080/ws");
109/// ```
110///
111/// Access the synchronous client in node.js:
112///
113/// ```javascript
114/// import { default as client } from "@perspective-dev/client";
115/// ```
116#[wasm_bindgen]
117#[derive(TryFromJsValue, Clone)]
118pub struct Client {
119    pub(crate) client: perspective_client::Client,
120    pub(crate) close: Option<Function>,
121}
122
123impl From<perspective_client::Client> for Client {
124    fn from(client: perspective_client::Client) -> Self {
125        Client {
126            client,
127            close: None,
128        }
129    }
130}
131
132impl PartialEq for Client {
133    fn eq(&self, other: &Self) -> bool {
134        self.client.get_name() == other.client.get_name()
135    }
136}
137
138/// A wrapper around [`js_sys::Function`] to ease async integration for the
139/// `reconnect` argument of [`Client::on_error`] callback.
140#[derive(Derivative)]
141#[derivative(Clone(bound = ""))]
142struct JsReconnect<I>(Arc<dyn Fn(I) -> js_sys::Promise>);
143
144// This type is not thread safe, but the JavaScript environment does not allow
145// threading.
146unsafe impl<I> Send for JsReconnect<I> {}
147unsafe impl<I> Sync for JsReconnect<I> {}
148
149impl<I> JsReconnect<I> {
150    fn run(&self, args: I) -> js_sys::Promise {
151        self.0(args)
152    }
153
154    fn run_all(
155        &self,
156        args: I,
157    ) -> impl Future<Output = Result<(), Box<dyn Error + Send + Sync + 'static>>>
158    + Send
159    + Sync
160    + 'static
161    + use<I> {
162        let (sender, receiver) = oneshot::channel::<Result<(), Box<dyn Error + Send + Sync>>>();
163        let p = self.0(args);
164        let _ = future_to_promise(async move {
165            let result = JsFuture::from(p)
166                .await
167                .map(|_| ())
168                .map_err(|x| format!("{x:?}").into());
169
170            sender.send(result).unwrap();
171            Ok(JsValue::UNDEFINED)
172        });
173
174        async move { receiver.await.unwrap() }
175    }
176}
177
178impl<F, I> From<F> for JsReconnect<I>
179where
180    F: Fn(I) -> js_sys::Promise + 'static,
181{
182    fn from(value: F) -> Self {
183        JsReconnect(Arc::new(value))
184    }
185}
186
187impl Client {
188    pub fn get_client(&self) -> &'_ perspective_client::Client {
189        &self.client
190    }
191}
192
193#[wasm_bindgen]
194impl Client {
195    #[wasm_bindgen(constructor)]
196    pub fn new(send_request: Function, close: Option<Function>) -> ApiResult<Self> {
197        let send_request = JsReconnect::from(move |mut v: Vec<u8>| {
198            let buff2 = unsafe { js_sys::Uint8Array::view_mut_raw(v.as_mut_ptr(), v.len()) };
199            send_request
200                .call1(&JsValue::UNDEFINED, &buff2)
201                .unwrap()
202                .unchecked_into::<js_sys::Promise>()
203        });
204
205        let client = perspective_client::Client::new_with_callback(None, move |msg| {
206            send_request.run_all(msg)
207        })?;
208
209        Ok(Client { close, client })
210    }
211
212    #[wasm_bindgen]
213    pub fn new_proxy_session(&self, on_response: &Function) -> ProxySession {
214        ProxySession::new(self, on_response)
215    }
216
217    #[wasm_bindgen]
218    pub async fn handle_response(&self, value: &JsValue) -> ApiResult<()> {
219        let uint8array = Uint8Array::new(value);
220        let slice = uint8array.to_vec();
221        self.client.handle_response(&slice).await?;
222        Ok(())
223    }
224
225    #[wasm_bindgen]
226    pub async fn handle_error(&self, error: String, reconnect: Option<Function>) -> ApiResult<()> {
227        self.client
228            .handle_error(
229                ClientError::Unknown(error),
230                reconnect.map(|reconnect| {
231                    let reconnect =
232                        JsReconnect::from(move |()| match reconnect.call0(&JsValue::UNDEFINED) {
233                            Ok(x) => x.unchecked_into::<js_sys::Promise>(),
234                            Err(e) => {
235                                // This error may occur when _invoking_ the function
236                                tracing::warn!("{:?}", e);
237                                js_sys::Promise::reject(&format!("C {e:?}").into())
238                            },
239                        });
240
241                    asyncfn!(reconnect, async move || {
242                        if let Err(e) = JsFuture::from(reconnect.run(())).await {
243                            if let Some(e) = e.dyn_ref::<js_sys::Object>() {
244                                Err(ClientError::Unknown(e.to_string().as_string().unwrap()))
245                            } else {
246                                Err(ClientError::Unknown(e.as_string().unwrap()))
247                            }
248                        } else {
249                            Ok(())
250                        }
251                    })
252                }),
253            )
254            .await?;
255
256        Ok(())
257    }
258
259    #[wasm_bindgen]
260    pub async fn on_error(&self, callback: Function) -> ApiResult<u32> {
261        let callback = JsReconnect::from(
262            move |(message, reconnect): (ClientError, Option<ReconnectCallback>)| {
263                let cl: Closure<dyn Fn() -> js_sys::Promise> = Closure::new(move || {
264                    let reconnect = reconnect.clone();
265                    future_to_promise(async move {
266                        if let Some(f) = reconnect {
267                            (*f)().await.map_err(|e| JsValue::from(format!("A {e}")))?;
268                        }
269
270                        Ok(JsValue::UNDEFINED)
271                    })
272                });
273
274                if let Err(e) = callback.call2(
275                    &JsValue::UNDEFINED,
276                    &JsValue::from(apierror!(ClientError(message))),
277                    &cl.into_js_value(),
278                ) {
279                    tracing::warn!("{:?}", e);
280                }
281
282                js_sys::Promise::resolve(&JsValue::UNDEFINED)
283            },
284        );
285
286        let poll_loop = LocalPollLoop::new_async(move |x| JsFuture::from(callback.run(x)));
287        let id = self
288            .client
289            .on_error(asyncfn!(poll_loop, async move |message, reconnect| {
290                poll_loop.poll((message, reconnect)).await;
291                Ok(())
292            }))
293            .await?;
294
295        Ok(id)
296    }
297
298    /// Creates a new [`Table`] from either a _schema_ or _data_.
299    ///
300    /// The [`Client::table`] factory function can be initialized with either a
301    /// _schema_ (see [`Table::schema`]), or data in one of these formats:
302    ///
303    /// - Apache Arrow
304    /// - CSV
305    /// - JSON row-oriented
306    /// - JSON column-oriented
307    /// - NDJSON
308    ///
309    /// When instantiated with _data_, the schema is inferred from this data.
310    /// While this is convenient, inferrence is sometimes imperfect e.g.
311    /// when the input is empty, null or ambiguous. For these cases,
312    /// [`Client::table`] can first be instantiated with a explicit schema.
313    ///
314    /// When instantiated with a _schema_, the resulting [`Table`] is empty but
315    /// with known column names and column types. When subsqeuently
316    /// populated with [`Table::update`], these columns will be _coerced_ to
317    /// the schema's type. This behavior can be useful when
318    /// [`Client::table`]'s column type inferences doesn't work.
319    ///
320    /// The resulting [`Table`] is _virtual_, and invoking its methods
321    /// dispatches events to the `perspective_server::Server` this
322    /// [`Client`] connects to, where the data is stored and all calculation
323    /// occurs.
324    ///
325    /// # Arguments
326    ///
327    /// - `arg` - Either _schema_ or initialization _data_.
328    /// - `options` - Optional configuration which provides one of:
329    ///     - `limit` - The max number of rows the resulting [`Table`] can
330    ///       store.
331    ///     - `index` - The column name to use as an _index_ column. If this
332    ///       `Table` is being instantiated by _data_, this column name must be
333    ///       present in the data.
334    ///     - `name` - The name of the table. This will be generated if it is
335    ///       not provided.
336    ///     - `format` - The explicit format of the input data, can be one of
337    ///       `"json"`, `"columns"`, `"csv"` or `"arrow"`. This overrides
338    ///       language-specific type dispatch behavior, which allows stringified
339    ///       and byte array alternative inputs.
340    ///
341    /// # JavaScript Examples
342    ///
343    /// Load a CSV from a `string`:
344    ///
345    /// ```javascript
346    /// const table = await client.table("x,y\n1,2\n3,4");
347    /// ```
348    ///
349    /// Load an Arrow from an `ArrayBuffer`:
350    ///
351    /// ```javascript
352    /// import * as fs from "node:fs/promises";
353    /// const table2 = await client.table(await fs.readFile("superstore.arrow"));
354    /// ```
355    ///
356    /// Load a CSV from a `UInt8Array` (the default for this type is Arrow)
357    /// using a format override:
358    ///
359    /// ```javascript
360    /// const enc = new TextEncoder();
361    /// const table = await client.table(enc.encode("x,y\n1,2\n3,4"), {
362    ///     format: "csv",
363    /// });
364    /// ```
365    ///
366    /// Create a table with an `index`:
367    ///
368    /// ```javascript
369    /// const table = await client.table(data, { index: "Row ID" });
370    /// ```
371    #[wasm_bindgen]
372    pub async fn table(
373        &self,
374        value: &JsTableInitData,
375        options: Option<JsTableInitOptions>,
376    ) -> ApiResult<Table> {
377        let options = options
378            .into_serde_ext::<Option<TableInitOptions>>()?
379            .unwrap_or_default();
380
381        let args = TableData::from_js_value(value, options.format)?;
382        Ok(Table(self.client.table(args, options).await?))
383    }
384
385    /// Terminates this [`Client`], cleaning up any [`crate::View`] handles the
386    /// [`Client`] has open as well as its callbacks.
387    #[wasm_bindgen]
388    pub fn terminate(&self) -> ApiResult<JsValue> {
389        if let Some(f) = self.close.clone() {
390            Ok(f.call0(&JsValue::UNDEFINED)?)
391        } else {
392            Err(ApiError::new("Client type cannot be terminated"))
393        }
394    }
395
396    /// Opens a [`Table`] that is hosted on the `perspective_server::Server`
397    /// that is connected to this [`Client`].
398    ///
399    /// The `name` property of [`TableInitOptions`] is used to identify each
400    /// [`Table`]. [`Table`] `name`s can be looked up for each [`Client`]
401    /// via [`Client::get_hosted_table_names`].
402    ///
403    /// # JavaScript Examples
404    ///
405    /// Get a virtual [`Table`] named "table_one" from this [`Client`]
406    ///
407    /// ```javascript
408    /// const tables = await client.open_table("table_one");
409    /// ```
410    #[wasm_bindgen]
411    pub async fn open_table(&self, entity_id: String) -> ApiResult<Table> {
412        Ok(Table(self.client.open_table(entity_id).await?))
413    }
414
415    /// Retrieves the names of all tables that this client has access to.
416    ///
417    /// `name` is a string identifier unique to the [`Table`] (per [`Client`]),
418    /// which can be used in conjunction with [`Client::open_table`] to get
419    /// a [`Table`] instance without the use of [`Client::table`]
420    /// constructor directly (e.g., one created by another [`Client`]).
421    ///
422    /// # JavaScript Examples
423    ///
424    /// ```javascript
425    /// const tables = await client.get_hosted_table_names();
426    /// ```
427    #[wasm_bindgen]
428    pub async fn get_hosted_table_names(&self) -> ApiResult<Vec<String>> {
429        Ok(self.client.get_hosted_table_names().await?)
430    }
431
432    /// Register a callback which is invoked whenever [`Client::table`] (on this
433    /// [`Client`]) or [`Table::delete`] (on a [`Table`] belinging to this
434    /// [`Client`]) are called.
435    #[wasm_bindgen]
436    pub async fn on_hosted_tables_update(&self, on_update_js: Function) -> ApiResult<u32> {
437        let poll_loop = LocalPollLoop::new(move |_| on_update_js.call0(&JsValue::UNDEFINED));
438        let on_update = Box::new(move || poll_loop.poll(()));
439        let id = self.client.on_hosted_tables_update(on_update).await?;
440        Ok(id)
441    }
442
443    /// Remove a callback previously registered via
444    /// `Client::on_hosted_tables_update`.
445    #[wasm_bindgen]
446    pub async fn remove_hosted_tables_update(&self, update_id: u32) -> ApiResult<()> {
447        self.client.remove_hosted_tables_update(update_id).await?;
448        Ok(())
449    }
450
451    /// Provides the [`SystemInfo`] struct, implementation-specific metadata
452    /// about the [`perspective_server::Server`] runtime such as Memory and
453    /// CPU usage.
454    ///
455    /// For WebAssembly servers, this method includes the WebAssembly heap size.
456    ///
457    /// # JavaScript Examples
458    ///
459    /// ```javascript
460    /// const info = await client.system_info();
461    /// ```
462    #[wasm_bindgen]
463    pub async fn system_info(&self) -> ApiResult<JsSystemInfo> {
464        let mut info = self.client.system_info().await?;
465
466        #[cfg(feature = "trace-allocator")]
467        if let perspective_client::SystemInfo {
468            client_used: None,
469            client_heap: None,
470            ..
471        } = &info
472        {
473            let (client_used, client_heap) = crate::utils::get_used();
474            info.client_used = Some(client_used as u64);
475            info.client_heap = Some(client_heap as u64);
476        };
477
478        let timestamp = web_sys::window()
479            .and_then(|x| x.performance())
480            .map(|x| x.now() as u64);
481
482        info.timestamp = timestamp;
483        let record = JsValue::from_serde_ext(&info.cast::<f64>())?;
484        Ok(record.unchecked_into())
485    }
486}
487
488#[wasm_bindgen]
489extern "C" {
490    #[wasm_bindgen(typescript_type = "SystemInfo")]
491    pub type JsSystemInfo;
492}