Skip to main content

perspective_js/
client.rs

1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████       █      █      █      █      █ █▄  ▀███ █       ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█  ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄  ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄   █ ▄▄▄▄▄ ┃
5// ┃ █      ██████ █  ▀█▄       █ ██████      █      ███▌▐███ ███████▄ █       ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors.                              ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::error::Error;
14use std::future::Future;
15use std::sync::Arc;
16
17use derivative::Derivative;
18use futures::channel::oneshot;
19use js_sys::{Function, Uint8Array};
20#[cfg(doc)]
21use perspective_client::SystemInfo;
22use perspective_client::{
23    ClientError, ReconnectCallback, Session, TableData, TableInitOptions, TableRef, asyncfn,
24};
25use wasm_bindgen::prelude::*;
26use wasm_bindgen_derive::TryFromJsValue;
27use wasm_bindgen_futures::{JsFuture, future_to_promise};
28
29pub use crate::table::*;
30use crate::utils::{ApiError, ApiResult, JsValueSerdeExt, LocalPollLoop};
31use crate::{TableDataExt, apierror};
32
33#[wasm_bindgen]
34extern "C" {
35    #[derive(Clone)]
36    #[wasm_bindgen(typescript_type = "TableInitOptions")]
37    pub type JsTableInitOptions;
38
39    #[derive(Clone)]
40    #[wasm_bindgen(typescript_type = "JoinOptions")]
41    pub type JsJoinOptions;
42}
43
44async fn js_to_table_ref(val: &JsValue) -> ApiResult<TableRef> {
45    if let Some(name) = val.as_string() {
46        Ok(TableRef::from(name))
47    } else {
48        let get_name = js_sys::Reflect::get(val, &wasm_bindgen::intern("get_name").into())
49            .map_err(|_| apierror!(TableRefError))?
50            .dyn_into::<js_sys::Function>()
51            .map_err(|_| apierror!(TableRefError))?;
52
53        let promise = get_name
54            .call0(val)
55            .map_err(|_| apierror!(TableRefError))?
56            .dyn_into::<js_sys::Promise>()
57            .map_err(|_| apierror!(TableRefError))?;
58
59        let name = wasm_bindgen_futures::JsFuture::from(promise)
60            .await
61            .map_err(|_| apierror!(TableRefError))?
62            .as_string()
63            .ok_or_else(|| apierror!(TableRefError))?;
64
65        Ok(TableRef::from(name))
66    }
67}
68
69#[wasm_bindgen]
70#[derive(Clone)]
71pub struct ProxySession(perspective_client::ProxySession);
72
73#[wasm_bindgen]
74impl ProxySession {
75    #[wasm_bindgen(constructor)]
76    pub fn new(client: &Client, on_response: &Function) -> Self {
77        let poll_loop = LocalPollLoop::new({
78            let on_response = on_response.clone();
79            move |msg: Vec<u8>| {
80                let msg = Uint8Array::from(&msg[..]);
81                on_response.call1(&JsValue::UNDEFINED, &JsValue::from(msg))?;
82                Ok(JsValue::null())
83            }
84        });
85        // NB: This swallows any errors raised by the inner callback
86        let on_response = Box::new(move |msg: &[u8]| {
87            wasm_bindgen_futures::spawn_local(poll_loop.poll(msg.to_vec()));
88            Ok(())
89        });
90        Self(perspective_client::ProxySession::new(
91            client.client.clone(),
92            on_response,
93        ))
94    }
95
96    #[wasm_bindgen]
97    pub async fn handle_request(&self, value: JsValue) -> ApiResult<()> {
98        let uint8array = Uint8Array::new(&value);
99        let slice = uint8array.to_vec();
100        self.0.handle_request(&slice).await?;
101        Ok(())
102    }
103
104    pub async fn close(self) {
105        self.0.close().await;
106    }
107}
108
109/// An instance of a [`Client`] is a connection to a single
110/// `perspective_server::Server`, whether locally in-memory or remote over some
111/// transport like a WebSocket.
112///
113/// The browser and node.js libraries both support the `websocket(url)`
114/// constructor, which connects to a remote `perspective_server::Server`
115/// instance over a WebSocket transport.
116///
117/// In the browser, the `worker()` constructor creates a new Web Worker
118/// `perspective_server::Server` and returns a [`Client`] connected to it.
119///
120/// In node.js, a pre-instantied [`Client`] connected synhronously to a global
121/// singleton `perspective_server::Server` is the default module export.
122///
123/// # JavaScript Examples
124///
125/// Create a Web Worker `perspective_server::Server` in the browser and return a
126/// [`Client`] instance connected for it:
127///
128/// ```javascript
129/// import perspective from "@perspective-dev/client";
130/// const client = await perspective.worker();
131/// ```
132///
133/// Create a WebSocket connection to a remote `perspective_server::Server`:
134///
135/// ```javascript
136/// import perspective from "@perspective-dev/client";
137/// const client = await perspective.websocket("ws://locahost:8080/ws");
138/// ```
139///
140/// Access the synchronous client in node.js:
141///
142/// ```javascript
143/// import { default as client } from "@perspective-dev/client";
144/// ```
145#[wasm_bindgen]
146#[derive(TryFromJsValue, Clone)]
147pub struct Client {
148    pub(crate) client: perspective_client::Client,
149    pub(crate) close: Option<Function>,
150}
151
152impl From<perspective_client::Client> for Client {
153    fn from(client: perspective_client::Client) -> Self {
154        Client {
155            client,
156            close: None,
157        }
158    }
159}
160
161impl PartialEq for Client {
162    fn eq(&self, other: &Self) -> bool {
163        self.client.get_name() == other.client.get_name()
164    }
165}
166
167/// A wrapper around [`js_sys::Function`] to ease async integration for the
168/// `reconnect` argument of [`Client::on_error`] callback.
169#[derive(Derivative)]
170#[derivative(Clone(bound = ""))]
171struct JsReconnect<I>(Arc<dyn Fn(I) -> js_sys::Promise>);
172
173// This type is not thread safe, but the JavaScript environment does not allow
174// threading.
175unsafe impl<I> Send for JsReconnect<I> {}
176unsafe impl<I> Sync for JsReconnect<I> {}
177
178impl<I> JsReconnect<I> {
179    fn run(&self, args: I) -> js_sys::Promise {
180        self.0(args)
181    }
182
183    fn run_all(
184        &self,
185        args: I,
186    ) -> impl Future<Output = Result<(), Box<dyn Error + Send + Sync + 'static>>>
187    + Send
188    + Sync
189    + 'static
190    + use<I> {
191        let (sender, receiver) = oneshot::channel::<Result<(), Box<dyn Error + Send + Sync>>>();
192        let p = self.0(args);
193        let _ = future_to_promise(async move {
194            let result = JsFuture::from(p)
195                .await
196                .map(|_| ())
197                .map_err(|x| format!("{x:?}").into());
198
199            sender.send(result).unwrap();
200            Ok(JsValue::UNDEFINED)
201        });
202
203        async move { receiver.await.unwrap() }
204    }
205}
206
207impl<F, I> From<F> for JsReconnect<I>
208where
209    F: Fn(I) -> js_sys::Promise + 'static,
210{
211    fn from(value: F) -> Self {
212        JsReconnect(Arc::new(value))
213    }
214}
215
216impl Client {
217    pub fn get_client(&self) -> &'_ perspective_client::Client {
218        &self.client
219    }
220}
221
222#[wasm_bindgen]
223impl Client {
224    #[wasm_bindgen(constructor)]
225    pub fn new(send_request: Function, close: Option<Function>) -> ApiResult<Self> {
226        let send_request = JsReconnect::from(move |mut v: Vec<u8>| {
227            let buff2 = unsafe { js_sys::Uint8Array::view_mut_raw(v.as_mut_ptr(), v.len()) };
228            send_request
229                .call1(&JsValue::UNDEFINED, &buff2)
230                .unwrap()
231                .unchecked_into::<js_sys::Promise>()
232        });
233
234        let client = perspective_client::Client::new_with_callback(None, move |msg| {
235            send_request.run_all(msg)
236        })?;
237
238        Ok(Client { close, client })
239    }
240
241    #[wasm_bindgen]
242    pub fn new_proxy_session(&self, on_response: &Function) -> ProxySession {
243        ProxySession::new(self, on_response)
244    }
245
246    #[wasm_bindgen]
247    pub async fn handle_response(&self, value: &JsValue) -> ApiResult<()> {
248        let uint8array = Uint8Array::new(value);
249        let slice = uint8array.to_vec();
250        self.client.handle_response(&slice).await?;
251        Ok(())
252    }
253
254    #[wasm_bindgen]
255    pub async fn handle_error(&self, error: String, reconnect: Option<Function>) -> ApiResult<()> {
256        self.client
257            .handle_error(
258                ClientError::Unknown(error),
259                reconnect.map(|reconnect| {
260                    let reconnect =
261                        JsReconnect::from(move |()| match reconnect.call0(&JsValue::UNDEFINED) {
262                            Ok(x) => x.unchecked_into::<js_sys::Promise>(),
263                            Err(e) => {
264                                // This error may occur when _invoking_ the function
265                                tracing::warn!("{:?}", e);
266                                js_sys::Promise::reject(&format!("C {e:?}").into())
267                            },
268                        });
269
270                    asyncfn!(reconnect, async move || {
271                        if let Err(e) = JsFuture::from(reconnect.run(())).await {
272                            if let Some(e) = e.dyn_ref::<js_sys::Object>() {
273                                Err(ClientError::Unknown(e.to_string().as_string().unwrap()))
274                            } else {
275                                Err(ClientError::Unknown(e.as_string().unwrap()))
276                            }
277                        } else {
278                            Ok(())
279                        }
280                    })
281                }),
282            )
283            .await?;
284
285        Ok(())
286    }
287
288    #[wasm_bindgen]
289    pub async fn on_error(&self, callback: Function) -> ApiResult<u32> {
290        let callback = JsReconnect::from(
291            move |(message, reconnect): (ClientError, Option<ReconnectCallback>)| {
292                let cl: Closure<dyn Fn() -> js_sys::Promise> = Closure::new(move || {
293                    let reconnect = reconnect.clone();
294                    future_to_promise(async move {
295                        if let Some(f) = reconnect {
296                            (*f)().await.map_err(|e| JsValue::from(format!("A {e}")))?;
297                        }
298
299                        Ok(JsValue::UNDEFINED)
300                    })
301                });
302
303                if let Err(e) = callback.call2(
304                    &JsValue::UNDEFINED,
305                    &JsValue::from(apierror!(ClientError(message))),
306                    &cl.into_js_value(),
307                ) {
308                    tracing::warn!("{:?}", e);
309                }
310
311                js_sys::Promise::resolve(&JsValue::UNDEFINED)
312            },
313        );
314
315        let poll_loop = LocalPollLoop::new_async(move |x| JsFuture::from(callback.run(x)));
316        let id = self
317            .client
318            .on_error(asyncfn!(poll_loop, async move |message, reconnect| {
319                poll_loop.poll((message, reconnect)).await;
320                Ok(())
321            }))
322            .await?;
323
324        Ok(id)
325    }
326
327    /// Creates a new [`Table`] from either a _schema_ or _data_.
328    ///
329    /// The [`Client::table`] factory function can be initialized with either a
330    /// _schema_ (see [`Table::schema`]), or data in one of these formats:
331    ///
332    /// - Apache Arrow
333    /// - CSV
334    /// - JSON row-oriented
335    /// - JSON column-oriented
336    /// - NDJSON
337    ///
338    /// When instantiated with _data_, the schema is inferred from this data.
339    /// While this is convenient, inferrence is sometimes imperfect e.g.
340    /// when the input is empty, null or ambiguous. For these cases,
341    /// [`Client::table`] can first be instantiated with a explicit schema.
342    ///
343    /// When instantiated with a _schema_, the resulting [`Table`] is empty but
344    /// with known column names and column types. When subsqeuently
345    /// populated with [`Table::update`], these columns will be _coerced_ to
346    /// the schema's type. This behavior can be useful when
347    /// [`Client::table`]'s column type inferences doesn't work.
348    ///
349    /// The resulting [`Table`] is _virtual_, and invoking its methods
350    /// dispatches events to the `perspective_server::Server` this
351    /// [`Client`] connects to, where the data is stored and all calculation
352    /// occurs.
353    ///
354    /// # Arguments
355    ///
356    /// - `arg` - Either _schema_ or initialization _data_.
357    /// - `options` - Optional configuration which provides one of:
358    ///     - `limit` - The max number of rows the resulting [`Table`] can
359    ///       store.
360    ///     - `index` - The column name to use as an _index_ column. If this
361    ///       `Table` is being instantiated by _data_, this column name must be
362    ///       present in the data.
363    ///     - `name` - The name of the table. This will be generated if it is
364    ///       not provided.
365    ///     - `format` - The explicit format of the input data, can be one of
366    ///       `"json"`, `"columns"`, `"csv"` or `"arrow"`. This overrides
367    ///       language-specific type dispatch behavior, which allows stringified
368    ///       and byte array alternative inputs.
369    ///
370    /// # JavaScript Examples
371    ///
372    /// Load a CSV from a `string`:
373    ///
374    /// ```javascript
375    /// const table = await client.table("x,y\n1,2\n3,4");
376    /// ```
377    ///
378    /// Load an Arrow from an `ArrayBuffer`:
379    ///
380    /// ```javascript
381    /// import * as fs from "node:fs/promises";
382    /// const table2 = await client.table(await fs.readFile("superstore.arrow"));
383    /// ```
384    ///
385    /// Load a CSV from a `UInt8Array` (the default for this type is Arrow)
386    /// using a format override:
387    ///
388    /// ```javascript
389    /// const enc = new TextEncoder();
390    /// const table = await client.table(enc.encode("x,y\n1,2\n3,4"), {
391    ///     format: "csv",
392    /// });
393    /// ```
394    ///
395    /// Create a table with an `index`:
396    ///
397    /// ```javascript
398    /// const table = await client.table(data, { index: "Row ID" });
399    /// ```
400    #[wasm_bindgen]
401    pub async fn table(
402        &self,
403        value: &JsTableInitData,
404        options: Option<JsTableInitOptions>,
405    ) -> ApiResult<Table> {
406        let options = options
407            .into_serde_ext::<Option<TableInitOptions>>()?
408            .unwrap_or_default();
409
410        let args = TableData::from_js_value(value, options.format)?;
411        Ok(Table(self.client.table(args, options).await?))
412    }
413
414    /// Creates a new read-only [`Table`] by performing an INNER JOIN on two
415    /// source tables. The resulting table is reactive: when either source
416    /// table is updated, the join is automatically recomputed.
417    ///
418    /// # Arguments
419    ///
420    /// - `left` - The left source table (a [`Table`] instance or a table name
421    ///   string).
422    /// - `right` - The right source table (a [`Table`] instance or a table name
423    ///   string).
424    /// - `on` - The column name to join on. Must exist in both tables with the
425    ///   same type.
426    /// - `options` - Optional join configuration: `{ join_type?: "inner" |
427    ///   "left" | "outer", name?: string }`.
428    ///
429    /// # JavaScript Examples
430    ///
431    /// ```javascript
432    /// const joined = await client.join(orders_table, products_table, "Product ID", { join_type: "left" });
433    /// const joined = await client.join("orders", "products", "Product ID", { join_type: "left" });
434    /// ```
435    #[wasm_bindgen]
436    pub async fn join(
437        &self,
438        left: JsValue,
439        right: JsValue,
440        on: &str,
441        options: Option<JsJoinOptions>,
442    ) -> ApiResult<Table> {
443        let options = options
444            .into_serde_ext::<Option<perspective_client::JoinOptions>>()?
445            .unwrap_or_default();
446
447        let left_ref = js_to_table_ref(&left).await?;
448        let right_ref = js_to_table_ref(&right).await?;
449        Ok(Table(
450            self.client.join(left_ref, right_ref, on, options).await?,
451        ))
452    }
453
454    /// Terminates this [`Client`], cleaning up any [`crate::View`] handles the
455    /// [`Client`] has open as well as its callbacks.
456    #[wasm_bindgen]
457    pub fn terminate(&self) -> ApiResult<JsValue> {
458        if let Some(f) = self.close.clone() {
459            Ok(f.call0(&JsValue::UNDEFINED)?)
460        } else {
461            Err(ApiError::new("Client type cannot be terminated"))
462        }
463    }
464
465    /// Opens a [`Table`] that is hosted on the `perspective_server::Server`
466    /// that is connected to this [`Client`].
467    ///
468    /// The `name` property of [`TableInitOptions`] is used to identify each
469    /// [`Table`]. [`Table`] `name`s can be looked up for each [`Client`]
470    /// via [`Client::get_hosted_table_names`].
471    ///
472    /// # JavaScript Examples
473    ///
474    /// Get a virtual [`Table`] named "table_one" from this [`Client`]
475    ///
476    /// ```javascript
477    /// const tables = await client.open_table("table_one");
478    /// ```
479    #[wasm_bindgen]
480    pub async fn open_table(&self, entity_id: String) -> ApiResult<Table> {
481        Ok(Table(self.client.open_table(entity_id).await?))
482    }
483
484    /// Retrieves the names of all tables that this client has access to.
485    ///
486    /// `name` is a string identifier unique to the [`Table`] (per [`Client`]),
487    /// which can be used in conjunction with [`Client::open_table`] to get
488    /// a [`Table`] instance without the use of [`Client::table`]
489    /// constructor directly (e.g., one created by another [`Client`]).
490    ///
491    /// # JavaScript Examples
492    ///
493    /// ```javascript
494    /// const tables = await client.get_hosted_table_names();
495    /// ```
496    #[wasm_bindgen]
497    pub async fn get_hosted_table_names(&self) -> ApiResult<Vec<String>> {
498        Ok(self.client.get_hosted_table_names().await?)
499    }
500
501    /// Register a callback which is invoked whenever [`Client::table`] (on this
502    /// [`Client`]) or [`Table::delete`] (on a [`Table`] belinging to this
503    /// [`Client`]) are called.
504    #[wasm_bindgen]
505    pub async fn on_hosted_tables_update(&self, on_update_js: Function) -> ApiResult<u32> {
506        let poll_loop = LocalPollLoop::new(move |_| on_update_js.call0(&JsValue::UNDEFINED));
507        let on_update = Box::new(move || poll_loop.poll(()));
508        let id = self.client.on_hosted_tables_update(on_update).await?;
509        Ok(id)
510    }
511
512    /// Remove a callback previously registered via
513    /// `Client::on_hosted_tables_update`.
514    #[wasm_bindgen]
515    pub async fn remove_hosted_tables_update(&self, update_id: u32) -> ApiResult<()> {
516        self.client.remove_hosted_tables_update(update_id).await?;
517        Ok(())
518    }
519
520    /// Provides the [`SystemInfo`] struct, implementation-specific metadata
521    /// about the [`perspective_server::Server`] runtime such as Memory and
522    /// CPU usage.
523    ///
524    /// For WebAssembly servers, this method includes the WebAssembly heap size.
525    ///
526    /// # JavaScript Examples
527    ///
528    /// ```javascript
529    /// const info = await client.system_info();
530    /// ```
531    #[wasm_bindgen]
532    pub async fn system_info(&self) -> ApiResult<JsSystemInfo> {
533        let mut info = self.client.system_info().await?;
534
535        #[cfg(feature = "trace-allocator")]
536        if let perspective_client::SystemInfo {
537            client_used: None,
538            client_heap: None,
539            ..
540        } = &info
541        {
542            let (client_used, client_heap) = crate::utils::get_used();
543            info.client_used = Some(client_used as u64);
544            info.client_heap = Some(client_heap as u64);
545        };
546
547        let timestamp = web_sys::window()
548            .and_then(|x| x.performance())
549            .map(|x| x.now() as u64);
550
551        info.timestamp = timestamp;
552        let record = JsValue::from_serde_ext(&info.cast::<f64>())?;
553        Ok(record.unchecked_into())
554    }
555}
556
557#[wasm_bindgen]
558extern "C" {
559    #[wasm_bindgen(typescript_type = "SystemInfo")]
560    pub type JsSystemInfo;
561}