perspective_js/client.rs
1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████ █ █ █ █ █ █▄ ▀███ █ ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█ ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄ ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄ █ ▄▄▄▄▄ ┃
5// ┃ █ ██████ █ ▀█▄ █ ██████ █ ███▌▐███ ███████▄ █ ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors. ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::error::Error;
14use std::future::Future;
15use std::sync::Arc;
16
17use derivative::Derivative;
18use futures::channel::oneshot;
19use js_sys::{Function, Uint8Array};
20#[cfg(doc)]
21use perspective_client::SystemInfo;
22use perspective_client::{
23 ClientError, ReconnectCallback, Session, TableData, TableInitOptions, TableRef, asyncfn,
24};
25use wasm_bindgen::prelude::*;
26use wasm_bindgen_derive::TryFromJsValue;
27use wasm_bindgen_futures::{JsFuture, future_to_promise};
28
29pub use crate::table::*;
30use crate::utils::{ApiError, ApiResult, JsValueSerdeExt, LocalPollLoop};
31use crate::{TableDataExt, apierror};
32
// Opaque handles for JavaScript values. Each type is surfaced in the generated
// TypeScript declarations under the name given by `typescript_type`.
#[wasm_bindgen]
extern "C" {
    // JS value typed as `TableInitOptions` in TypeScript; deserialized into
    // `perspective_client::TableInitOptions` by `Client::table`.
    #[derive(Clone)]
    #[wasm_bindgen(typescript_type = "TableInitOptions")]
    pub type JsTableInitOptions;

    // JS value typed as `JoinOptions` in TypeScript; deserialized into
    // `perspective_client::JoinOptions` by `Client::join`.
    #[derive(Clone)]
    #[wasm_bindgen(typescript_type = "JoinOptions")]
    pub type JsJoinOptions;
}
43
44async fn js_to_table_ref(val: &JsValue) -> ApiResult<TableRef> {
45 if let Some(name) = val.as_string() {
46 Ok(TableRef::from(name))
47 } else {
48 let get_name = js_sys::Reflect::get(val, &wasm_bindgen::intern("get_name").into())
49 .map_err(|_| apierror!(TableRefError))?
50 .dyn_into::<js_sys::Function>()
51 .map_err(|_| apierror!(TableRefError))?;
52
53 let promise = get_name
54 .call0(val)
55 .map_err(|_| apierror!(TableRefError))?
56 .dyn_into::<js_sys::Promise>()
57 .map_err(|_| apierror!(TableRefError))?;
58
59 let name = wasm_bindgen_futures::JsFuture::from(promise)
60 .await
61 .map_err(|_| apierror!(TableRefError))?
62 .as_string()
63 .ok_or_else(|| apierror!(TableRefError))?;
64
65 Ok(TableRef::from(name))
66 }
67}
68
69#[wasm_bindgen]
70#[derive(Clone)]
71pub struct ProxySession(perspective_client::ProxySession);
72
73#[wasm_bindgen]
74impl ProxySession {
75 #[wasm_bindgen(constructor)]
76 pub fn new(client: &Client, on_response: &Function) -> Self {
77 let poll_loop = LocalPollLoop::new({
78 let on_response = on_response.clone();
79 move |msg: Vec<u8>| {
80 let msg = Uint8Array::from(&msg[..]);
81 on_response.call1(&JsValue::UNDEFINED, &JsValue::from(msg))?;
82 Ok(JsValue::null())
83 }
84 });
85 // NB: This swallows any errors raised by the inner callback
86 let on_response = Box::new(move |msg: &[u8]| {
87 wasm_bindgen_futures::spawn_local(poll_loop.poll(msg.to_vec()));
88 Ok(())
89 });
90 Self(perspective_client::ProxySession::new(
91 client.client.clone(),
92 on_response,
93 ))
94 }
95
96 #[wasm_bindgen]
97 pub async fn handle_request(&self, value: JsValue) -> ApiResult<()> {
98 let uint8array = Uint8Array::new(&value);
99 let slice = uint8array.to_vec();
100 self.0.handle_request(&slice).await?;
101 Ok(())
102 }
103
104 pub async fn close(self) {
105 self.0.close().await;
106 }
107}
108
/// An instance of a [`Client`] is a connection to a single
/// `perspective_server::Server`, whether locally in-memory or remote over some
/// transport like a WebSocket.
///
/// The browser and node.js libraries both support the `websocket(url)`
/// constructor, which connects to a remote `perspective_server::Server`
/// instance over a WebSocket transport.
///
/// In the browser, the `worker()` constructor creates a new Web Worker
/// `perspective_server::Server` and returns a [`Client`] connected to it.
///
/// In node.js, a pre-instantiated [`Client`] connected synchronously to a
/// global singleton `perspective_server::Server` is the default module export.
///
/// # JavaScript Examples
///
/// Create a Web Worker `perspective_server::Server` in the browser and return a
/// [`Client`] instance connected to it:
///
/// ```javascript
/// import perspective from "@perspective-dev/client";
/// const client = await perspective.worker();
/// ```
///
/// Create a WebSocket connection to a remote `perspective_server::Server`:
///
/// ```javascript
/// import perspective from "@perspective-dev/client";
/// const client = await perspective.websocket("ws://localhost:8080/ws");
/// ```
///
/// Access the synchronous client in node.js:
///
/// ```javascript
/// import { default as client } from "@perspective-dev/client";
/// ```
#[wasm_bindgen]
#[derive(TryFromJsValue, Clone)]
pub struct Client {
    // The wrapped Rust client that all methods delegate to.
    pub(crate) client: perspective_client::Client,
    // Optional JS function invoked by `Client::terminate`; `None` when the
    // client was built via `From<perspective_client::Client>`.
    pub(crate) close: Option<Function>,
}
151
152impl From<perspective_client::Client> for Client {
153 fn from(client: perspective_client::Client) -> Self {
154 Client {
155 client,
156 close: None,
157 }
158 }
159}
160
161impl PartialEq for Client {
162 fn eq(&self, other: &Self) -> bool {
163 self.client.get_name() == other.client.get_name()
164 }
165}
166
/// A wrapper around [`js_sys::Function`] to ease async integration for the
/// `reconnect` argument of [`Client::on_error`] callback.
///
/// Holds a shared closure mapping an argument of type `I` to a
/// [`js_sys::Promise`].
// `Clone(bound = "")` derives `Clone` without requiring `I: Clone`; a clone
// only clones the inner `Arc`.
#[derive(Derivative)]
#[derivative(Clone(bound = ""))]
struct JsReconnect<I>(Arc<dyn Fn(I) -> js_sys::Promise>);
172
// SAFETY: This type is not thread safe (the wrapped closure carries no
// `Send`/`Sync` bound), but the JavaScript environment does not allow
// threading, so a value is never actually observed from another thread.
// NOTE(review): this presumably assumes the module is never run with
// shared-memory wasm threads — confirm.
unsafe impl<I> Send for JsReconnect<I> {}
unsafe impl<I> Sync for JsReconnect<I> {}
177
178impl<I> JsReconnect<I> {
179 fn run(&self, args: I) -> js_sys::Promise {
180 self.0(args)
181 }
182
183 fn run_all(
184 &self,
185 args: I,
186 ) -> impl Future<Output = Result<(), Box<dyn Error + Send + Sync + 'static>>>
187 + Send
188 + Sync
189 + 'static
190 + use<I> {
191 let (sender, receiver) = oneshot::channel::<Result<(), Box<dyn Error + Send + Sync>>>();
192 let p = self.0(args);
193 let _ = future_to_promise(async move {
194 let result = JsFuture::from(p)
195 .await
196 .map(|_| ())
197 .map_err(|x| format!("{x:?}").into());
198
199 sender.send(result).unwrap();
200 Ok(JsValue::UNDEFINED)
201 });
202
203 async move { receiver.await.unwrap() }
204 }
205}
206
207impl<F, I> From<F> for JsReconnect<I>
208where
209 F: Fn(I) -> js_sys::Promise + 'static,
210{
211 fn from(value: F) -> Self {
212 JsReconnect(Arc::new(value))
213 }
214}
215
216impl Client {
217 pub fn get_client(&self) -> &'_ perspective_client::Client {
218 &self.client
219 }
220}
221
222#[wasm_bindgen]
223impl Client {
224 #[wasm_bindgen(constructor)]
225 pub fn new(send_request: Function, close: Option<Function>) -> ApiResult<Self> {
226 let send_request = JsReconnect::from(move |mut v: Vec<u8>| {
227 let buff2 = unsafe { js_sys::Uint8Array::view_mut_raw(v.as_mut_ptr(), v.len()) };
228 send_request
229 .call1(&JsValue::UNDEFINED, &buff2)
230 .unwrap()
231 .unchecked_into::<js_sys::Promise>()
232 });
233
234 let client = perspective_client::Client::new_with_callback(None, move |msg| {
235 send_request.run_all(msg)
236 })?;
237
238 Ok(Client { close, client })
239 }
240
241 #[wasm_bindgen]
242 pub fn new_proxy_session(&self, on_response: &Function) -> ProxySession {
243 ProxySession::new(self, on_response)
244 }
245
246 #[wasm_bindgen]
247 pub async fn handle_response(&self, value: &JsValue) -> ApiResult<()> {
248 let uint8array = Uint8Array::new(value);
249 let slice = uint8array.to_vec();
250 self.client.handle_response(&slice).await?;
251 Ok(())
252 }
253
254 #[wasm_bindgen]
255 pub async fn handle_error(&self, error: String, reconnect: Option<Function>) -> ApiResult<()> {
256 self.client
257 .handle_error(
258 ClientError::Unknown(error),
259 reconnect.map(|reconnect| {
260 let reconnect =
261 JsReconnect::from(move |()| match reconnect.call0(&JsValue::UNDEFINED) {
262 Ok(x) => x.unchecked_into::<js_sys::Promise>(),
263 Err(e) => {
264 // This error may occur when _invoking_ the function
265 tracing::warn!("{:?}", e);
266 js_sys::Promise::reject(&format!("C {e:?}").into())
267 },
268 });
269
270 asyncfn!(reconnect, async move || {
271 if let Err(e) = JsFuture::from(reconnect.run(())).await {
272 if let Some(e) = e.dyn_ref::<js_sys::Object>() {
273 Err(ClientError::Unknown(e.to_string().as_string().unwrap()))
274 } else {
275 Err(ClientError::Unknown(e.as_string().unwrap()))
276 }
277 } else {
278 Ok(())
279 }
280 })
281 }),
282 )
283 .await?;
284
285 Ok(())
286 }
287
288 #[wasm_bindgen]
289 pub async fn on_error(&self, callback: Function) -> ApiResult<u32> {
290 let callback = JsReconnect::from(
291 move |(message, reconnect): (ClientError, Option<ReconnectCallback>)| {
292 let cl: Closure<dyn Fn() -> js_sys::Promise> = Closure::new(move || {
293 let reconnect = reconnect.clone();
294 future_to_promise(async move {
295 if let Some(f) = reconnect {
296 (*f)().await.map_err(|e| JsValue::from(format!("A {e}")))?;
297 }
298
299 Ok(JsValue::UNDEFINED)
300 })
301 });
302
303 if let Err(e) = callback.call2(
304 &JsValue::UNDEFINED,
305 &JsValue::from(apierror!(ClientError(message))),
306 &cl.into_js_value(),
307 ) {
308 tracing::warn!("{:?}", e);
309 }
310
311 js_sys::Promise::resolve(&JsValue::UNDEFINED)
312 },
313 );
314
315 let poll_loop = LocalPollLoop::new_async(move |x| JsFuture::from(callback.run(x)));
316 let id = self
317 .client
318 .on_error(asyncfn!(poll_loop, async move |message, reconnect| {
319 poll_loop.poll((message, reconnect)).await;
320 Ok(())
321 }))
322 .await?;
323
324 Ok(id)
325 }
326
327 /// Creates a new [`Table`] from either a _schema_ or _data_.
328 ///
329 /// The [`Client::table`] factory function can be initialized with either a
330 /// _schema_ (see [`Table::schema`]), or data in one of these formats:
331 ///
332 /// - Apache Arrow
333 /// - CSV
334 /// - JSON row-oriented
335 /// - JSON column-oriented
336 /// - NDJSON
337 ///
338 /// When instantiated with _data_, the schema is inferred from this data.
339 /// While this is convenient, inferrence is sometimes imperfect e.g.
340 /// when the input is empty, null or ambiguous. For these cases,
341 /// [`Client::table`] can first be instantiated with a explicit schema.
342 ///
343 /// When instantiated with a _schema_, the resulting [`Table`] is empty but
344 /// with known column names and column types. When subsqeuently
345 /// populated with [`Table::update`], these columns will be _coerced_ to
346 /// the schema's type. This behavior can be useful when
347 /// [`Client::table`]'s column type inferences doesn't work.
348 ///
349 /// The resulting [`Table`] is _virtual_, and invoking its methods
350 /// dispatches events to the `perspective_server::Server` this
351 /// [`Client`] connects to, where the data is stored and all calculation
352 /// occurs.
353 ///
354 /// # Arguments
355 ///
356 /// - `arg` - Either _schema_ or initialization _data_.
357 /// - `options` - Optional configuration which provides one of:
358 /// - `limit` - The max number of rows the resulting [`Table`] can
359 /// store.
360 /// - `index` - The column name to use as an _index_ column. If this
361 /// `Table` is being instantiated by _data_, this column name must be
362 /// present in the data.
363 /// - `name` - The name of the table. This will be generated if it is
364 /// not provided.
365 /// - `format` - The explicit format of the input data, can be one of
366 /// `"json"`, `"columns"`, `"csv"` or `"arrow"`. This overrides
367 /// language-specific type dispatch behavior, which allows stringified
368 /// and byte array alternative inputs.
369 ///
370 /// # JavaScript Examples
371 ///
372 /// Load a CSV from a `string`:
373 ///
374 /// ```javascript
375 /// const table = await client.table("x,y\n1,2\n3,4");
376 /// ```
377 ///
378 /// Load an Arrow from an `ArrayBuffer`:
379 ///
380 /// ```javascript
381 /// import * as fs from "node:fs/promises";
382 /// const table2 = await client.table(await fs.readFile("superstore.arrow"));
383 /// ```
384 ///
385 /// Load a CSV from a `UInt8Array` (the default for this type is Arrow)
386 /// using a format override:
387 ///
388 /// ```javascript
389 /// const enc = new TextEncoder();
390 /// const table = await client.table(enc.encode("x,y\n1,2\n3,4"), {
391 /// format: "csv",
392 /// });
393 /// ```
394 ///
395 /// Create a table with an `index`:
396 ///
397 /// ```javascript
398 /// const table = await client.table(data, { index: "Row ID" });
399 /// ```
400 #[wasm_bindgen]
401 pub async fn table(
402 &self,
403 value: &JsTableInitData,
404 options: Option<JsTableInitOptions>,
405 ) -> ApiResult<Table> {
406 let options = options
407 .into_serde_ext::<Option<TableInitOptions>>()?
408 .unwrap_or_default();
409
410 let args = TableData::from_js_value(value, options.format)?;
411 Ok(Table(self.client.table(args, options).await?))
412 }
413
414 /// Creates a new read-only [`Table`] by performing an INNER JOIN on two
415 /// source tables. The resulting table is reactive: when either source
416 /// table is updated, the join is automatically recomputed.
417 ///
418 /// # Arguments
419 ///
420 /// - `left` - The left source table (a [`Table`] instance or a table name
421 /// string).
422 /// - `right` - The right source table (a [`Table`] instance or a table name
423 /// string).
424 /// - `on` - The column name to join on. Must exist in both tables with the
425 /// same type.
426 /// - `options` - Optional join configuration: `{ join_type?: "inner" |
427 /// "left" | "outer", name?: string }`.
428 ///
429 /// # JavaScript Examples
430 ///
431 /// ```javascript
432 /// const joined = await client.join(orders_table, products_table, "Product ID", { join_type: "left" });
433 /// const joined = await client.join("orders", "products", "Product ID", { join_type: "left" });
434 /// ```
435 #[wasm_bindgen]
436 pub async fn join(
437 &self,
438 left: JsValue,
439 right: JsValue,
440 on: &str,
441 options: Option<JsJoinOptions>,
442 ) -> ApiResult<Table> {
443 let options = options
444 .into_serde_ext::<Option<perspective_client::JoinOptions>>()?
445 .unwrap_or_default();
446
447 let left_ref = js_to_table_ref(&left).await?;
448 let right_ref = js_to_table_ref(&right).await?;
449 Ok(Table(
450 self.client.join(left_ref, right_ref, on, options).await?,
451 ))
452 }
453
454 /// Terminates this [`Client`], cleaning up any [`crate::View`] handles the
455 /// [`Client`] has open as well as its callbacks.
456 #[wasm_bindgen]
457 pub fn terminate(&self) -> ApiResult<JsValue> {
458 if let Some(f) = self.close.clone() {
459 Ok(f.call0(&JsValue::UNDEFINED)?)
460 } else {
461 Err(ApiError::new("Client type cannot be terminated"))
462 }
463 }
464
465 /// Opens a [`Table`] that is hosted on the `perspective_server::Server`
466 /// that is connected to this [`Client`].
467 ///
468 /// The `name` property of [`TableInitOptions`] is used to identify each
469 /// [`Table`]. [`Table`] `name`s can be looked up for each [`Client`]
470 /// via [`Client::get_hosted_table_names`].
471 ///
472 /// # JavaScript Examples
473 ///
474 /// Get a virtual [`Table`] named "table_one" from this [`Client`]
475 ///
476 /// ```javascript
477 /// const tables = await client.open_table("table_one");
478 /// ```
479 #[wasm_bindgen]
480 pub async fn open_table(&self, entity_id: String) -> ApiResult<Table> {
481 Ok(Table(self.client.open_table(entity_id).await?))
482 }
483
484 /// Retrieves the names of all tables that this client has access to.
485 ///
486 /// `name` is a string identifier unique to the [`Table`] (per [`Client`]),
487 /// which can be used in conjunction with [`Client::open_table`] to get
488 /// a [`Table`] instance without the use of [`Client::table`]
489 /// constructor directly (e.g., one created by another [`Client`]).
490 ///
491 /// # JavaScript Examples
492 ///
493 /// ```javascript
494 /// const tables = await client.get_hosted_table_names();
495 /// ```
496 #[wasm_bindgen]
497 pub async fn get_hosted_table_names(&self) -> ApiResult<Vec<String>> {
498 Ok(self.client.get_hosted_table_names().await?)
499 }
500
501 /// Register a callback which is invoked whenever [`Client::table`] (on this
502 /// [`Client`]) or [`Table::delete`] (on a [`Table`] belinging to this
503 /// [`Client`]) are called.
504 #[wasm_bindgen]
505 pub async fn on_hosted_tables_update(&self, on_update_js: Function) -> ApiResult<u32> {
506 let poll_loop = LocalPollLoop::new(move |_| on_update_js.call0(&JsValue::UNDEFINED));
507 let on_update = Box::new(move || poll_loop.poll(()));
508 let id = self.client.on_hosted_tables_update(on_update).await?;
509 Ok(id)
510 }
511
512 /// Remove a callback previously registered via
513 /// `Client::on_hosted_tables_update`.
514 #[wasm_bindgen]
515 pub async fn remove_hosted_tables_update(&self, update_id: u32) -> ApiResult<()> {
516 self.client.remove_hosted_tables_update(update_id).await?;
517 Ok(())
518 }
519
520 /// Provides the [`SystemInfo`] struct, implementation-specific metadata
521 /// about the [`perspective_server::Server`] runtime such as Memory and
522 /// CPU usage.
523 ///
524 /// For WebAssembly servers, this method includes the WebAssembly heap size.
525 ///
526 /// # JavaScript Examples
527 ///
528 /// ```javascript
529 /// const info = await client.system_info();
530 /// ```
531 #[wasm_bindgen]
532 pub async fn system_info(&self) -> ApiResult<JsSystemInfo> {
533 let mut info = self.client.system_info().await?;
534
535 #[cfg(feature = "trace-allocator")]
536 if let perspective_client::SystemInfo {
537 client_used: None,
538 client_heap: None,
539 ..
540 } = &info
541 {
542 let (client_used, client_heap) = crate::utils::get_used();
543 info.client_used = Some(client_used as u64);
544 info.client_heap = Some(client_heap as u64);
545 };
546
547 let timestamp = web_sys::window()
548 .and_then(|x| x.performance())
549 .map(|x| x.now() as u64);
550
551 info.timestamp = timestamp;
552 let record = JsValue::from_serde_ext(&info.cast::<f64>())?;
553 Ok(record.unchecked_into())
554 }
555}
556
#[wasm_bindgen]
extern "C" {
    // Opaque JS value typed as `SystemInfo` in the generated TypeScript
    // declarations; returned by `Client::system_info`.
    #[wasm_bindgen(typescript_type = "SystemInfo")]
    pub type JsSystemInfo;
}