perspective_js/client.rs
1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████ █ █ █ █ █ █▄ ▀███ █ ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█ ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄ ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄ █ ▄▄▄▄▄ ┃
5// ┃ █ ██████ █ ▀█▄ █ ██████ █ ███▌▐███ ███████▄ █ ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors. ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::error::Error;
14use std::future::Future;
15use std::sync::Arc;
16
17use derivative::Derivative;
18use futures::channel::oneshot;
19use js_sys::{Function, Uint8Array};
20#[cfg(doc)]
21use perspective_client::SystemInfo;
22use perspective_client::{
23 ClientError, ReconnectCallback, Session, TableData, TableInitOptions, asyncfn,
24};
25use wasm_bindgen::prelude::*;
26use wasm_bindgen_derive::TryFromJsValue;
27use wasm_bindgen_futures::{JsFuture, future_to_promise};
28
29pub use crate::table::*;
30use crate::utils::{ApiError, ApiResult, JsValueSerdeExt, LocalPollLoop};
31use crate::{TableDataExt, apierror};
32
33#[wasm_bindgen]
34extern "C" {
35 #[derive(Clone)]
36 #[wasm_bindgen(typescript_type = "TableInitOptions")]
37 pub type JsTableInitOptions;
38}
39
40#[wasm_bindgen]
41#[derive(Clone)]
42pub struct ProxySession(perspective_client::ProxySession);
43
44#[wasm_bindgen]
45impl ProxySession {
46 #[wasm_bindgen(constructor)]
47 pub fn new(client: &Client, on_response: &Function) -> Self {
48 let poll_loop = LocalPollLoop::new({
49 let on_response = on_response.clone();
50 move |msg: Vec<u8>| {
51 let msg = Uint8Array::from(&msg[..]);
52 on_response.call1(&JsValue::UNDEFINED, &JsValue::from(msg))?;
53 Ok(JsValue::null())
54 }
55 });
56 // NB: This swallows any errors raised by the inner callback
57 let on_response = Box::new(move |msg: &[u8]| {
58 wasm_bindgen_futures::spawn_local(poll_loop.poll(msg.to_vec()));
59 Ok(())
60 });
61 Self(perspective_client::ProxySession::new(
62 client.client.clone(),
63 on_response,
64 ))
65 }
66
67 #[wasm_bindgen]
68 pub async fn handle_request(&self, value: JsValue) -> ApiResult<()> {
69 let uint8array = Uint8Array::new(&value);
70 let slice = uint8array.to_vec();
71 self.0.handle_request(&slice).await?;
72 Ok(())
73 }
74
75 pub async fn close(self) {
76 self.0.close().await;
77 }
78}
79
80/// An instance of a [`Client`] is a connection to a single
81/// `perspective_server::Server`, whether locally in-memory or remote over some
82/// transport like a WebSocket.
83///
84/// The browser and node.js libraries both support the `websocket(url)`
85/// constructor, which connects to a remote `perspective_server::Server`
86/// instance over a WebSocket transport.
87///
88/// In the browser, the `worker()` constructor creates a new Web Worker
89/// `perspective_server::Server` and returns a [`Client`] connected to it.
90///
91/// In node.js, a pre-instantied [`Client`] connected synhronously to a global
92/// singleton `perspective_server::Server` is the default module export.
93///
94/// # JavaScript Examples
95///
96/// Create a Web Worker `perspective_server::Server` in the browser and return a
97/// [`Client`] instance connected for it:
98///
99/// ```javascript
100/// import perspective from "@perspective-dev/client";
101/// const client = await perspective.worker();
102/// ```
103///
104/// Create a WebSocket connection to a remote `perspective_server::Server`:
105///
106/// ```javascript
107/// import perspective from "@perspective-dev/client";
108/// const client = await perspective.websocket("ws://locahost:8080/ws");
109/// ```
110///
111/// Access the synchronous client in node.js:
112///
113/// ```javascript
114/// import { default as client } from "@perspective-dev/client";
115/// ```
116#[wasm_bindgen]
117#[derive(TryFromJsValue, Clone)]
118pub struct Client {
119 pub(crate) close: Option<Function>,
120 pub(crate) client: perspective_client::Client,
121}
122
123impl PartialEq for Client {
124 fn eq(&self, other: &Self) -> bool {
125 self.client.get_name() == other.client.get_name()
126 }
127}
128
129/// A wrapper around [`js_sys::Function`] to ease async integration for the
130/// `reconnect` argument of [`Client::on_error`] callback.
131#[derive(Derivative)]
132#[derivative(Clone(bound = ""))]
133struct JsReconnect<I>(Arc<dyn Fn(I) -> js_sys::Promise>);
134
135unsafe impl<I> Send for JsReconnect<I> {}
136unsafe impl<I> Sync for JsReconnect<I> {}
137
138impl<I> JsReconnect<I> {
139 fn run(&self, args: I) -> js_sys::Promise {
140 self.0(args)
141 }
142
143 fn run_all(
144 &self,
145 args: I,
146 ) -> impl Future<Output = Result<(), Box<dyn Error + Send + Sync + 'static>>>
147 + Send
148 + Sync
149 + 'static
150 + use<I> {
151 let (sender, receiver) = oneshot::channel::<Result<(), Box<dyn Error + Send + Sync>>>();
152 let p = self.0(args);
153 let _ = future_to_promise(async move {
154 let result = JsFuture::from(p)
155 .await
156 .map(|_| ())
157 .map_err(|x| format!("{x:?}").into());
158
159 sender.send(result).unwrap();
160 Ok(JsValue::UNDEFINED)
161 });
162
163 async move { receiver.await.unwrap() }
164 }
165}
166
167impl<F, I> From<F> for JsReconnect<I>
168where
169 F: Fn(I) -> js_sys::Promise + 'static,
170{
171 fn from(value: F) -> Self {
172 JsReconnect(Arc::new(value))
173 }
174}
175
176impl Client {
177 pub fn get_client(&self) -> &'_ perspective_client::Client {
178 &self.client
179 }
180}
181
182#[wasm_bindgen]
183impl Client {
184 #[wasm_bindgen(constructor)]
185 pub fn new(send_request: Function, close: Option<Function>) -> ApiResult<Self> {
186 let send_request = JsReconnect::from(move |mut v: Vec<u8>| {
187 let buff2 = unsafe { js_sys::Uint8Array::view_mut_raw(v.as_mut_ptr(), v.len()) };
188 send_request
189 .call1(&JsValue::UNDEFINED, &buff2)
190 .unwrap()
191 .unchecked_into::<js_sys::Promise>()
192 });
193
194 let client = perspective_client::Client::new_with_callback(None, move |msg| {
195 send_request.run_all(msg)
196 })?;
197
198 Ok(Client { close, client })
199 }
200
201 #[wasm_bindgen]
202 pub fn new_proxy_session(&self, on_response: &Function) -> ProxySession {
203 ProxySession::new(self, on_response)
204 }
205
206 #[wasm_bindgen]
207 pub async fn handle_response(&self, value: &JsValue) -> ApiResult<()> {
208 let uint8array = Uint8Array::new(value);
209 let slice = uint8array.to_vec();
210 self.client.handle_response(&slice).await?;
211 Ok(())
212 }
213
214 #[wasm_bindgen]
215 pub async fn handle_error(&self, error: String, reconnect: Option<Function>) -> ApiResult<()> {
216 self.client
217 .handle_error(
218 ClientError::Unknown(error),
219 reconnect.map(|reconnect| {
220 let reconnect =
221 JsReconnect::from(move |()| match reconnect.call0(&JsValue::UNDEFINED) {
222 Ok(x) => x.unchecked_into::<js_sys::Promise>(),
223 Err(e) => {
224 // This error may occur when _invoking_ the function
225 tracing::warn!("{:?}", e);
226 js_sys::Promise::reject(&format!("C {e:?}").into())
227 },
228 });
229
230 asyncfn!(reconnect, async move || {
231 if let Err(e) = JsFuture::from(reconnect.run(())).await {
232 if let Some(e) = e.dyn_ref::<js_sys::Object>() {
233 Err(ClientError::Unknown(e.to_string().as_string().unwrap()))
234 } else {
235 Err(ClientError::Unknown(e.as_string().unwrap()))
236 }
237 } else {
238 Ok(())
239 }
240 })
241 }),
242 )
243 .await?;
244
245 Ok(())
246 }
247
248 #[wasm_bindgen]
249 pub async fn on_error(&self, callback: Function) -> ApiResult<u32> {
250 let callback = JsReconnect::from(
251 move |(message, reconnect): (ClientError, Option<ReconnectCallback>)| {
252 let cl: Closure<dyn Fn() -> js_sys::Promise> = Closure::new(move || {
253 let reconnect = reconnect.clone();
254 future_to_promise(async move {
255 if let Some(f) = reconnect {
256 (*f)().await.map_err(|e| JsValue::from(format!("A {e}")))?;
257 }
258
259 Ok(JsValue::UNDEFINED)
260 })
261 });
262
263 if let Err(e) = callback.call2(
264 &JsValue::UNDEFINED,
265 &JsValue::from(apierror!(ClientError(message))),
266 &cl.into_js_value(),
267 ) {
268 tracing::warn!("{:?}", e);
269 }
270
271 js_sys::Promise::resolve(&JsValue::UNDEFINED)
272 },
273 );
274
275 let poll_loop = LocalPollLoop::new_async(move |x| JsFuture::from(callback.run(x)));
276 let id = self
277 .client
278 .on_error(asyncfn!(poll_loop, async move |message, reconnect| {
279 poll_loop.poll((message, reconnect)).await;
280 Ok(())
281 }))
282 .await?;
283
284 Ok(id)
285 }
286
287 /// Creates a new [`Table`] from either a _schema_ or _data_.
288 ///
289 /// The [`Client::table`] factory function can be initialized with either a
290 /// _schema_ (see [`Table::schema`]), or data in one of these formats:
291 ///
292 /// - Apache Arrow
293 /// - CSV
294 /// - JSON row-oriented
295 /// - JSON column-oriented
296 /// - NDJSON
297 ///
298 /// When instantiated with _data_, the schema is inferred from this data.
299 /// While this is convenient, inferrence is sometimes imperfect e.g.
300 /// when the input is empty, null or ambiguous. For these cases,
301 /// [`Client::table`] can first be instantiated with a explicit schema.
302 ///
303 /// When instantiated with a _schema_, the resulting [`Table`] is empty but
304 /// with known column names and column types. When subsqeuently
305 /// populated with [`Table::update`], these columns will be _coerced_ to
306 /// the schema's type. This behavior can be useful when
307 /// [`Client::table`]'s column type inferences doesn't work.
308 ///
309 /// The resulting [`Table`] is _virtual_, and invoking its methods
310 /// dispatches events to the `perspective_server::Server` this
311 /// [`Client`] connects to, where the data is stored and all calculation
312 /// occurs.
313 ///
314 /// # Arguments
315 ///
316 /// - `arg` - Either _schema_ or initialization _data_.
317 /// - `options` - Optional configuration which provides one of:
318 /// - `limit` - The max number of rows the resulting [`Table`] can
319 /// store.
320 /// - `index` - The column name to use as an _index_ column. If this
321 /// `Table` is being instantiated by _data_, this column name must be
322 /// present in the data.
323 /// - `name` - The name of the table. This will be generated if it is
324 /// not provided.
325 /// - `format` - The explicit format of the input data, can be one of
326 /// `"json"`, `"columns"`, `"csv"` or `"arrow"`. This overrides
327 /// language-specific type dispatch behavior, which allows stringified
328 /// and byte array alternative inputs.
329 ///
330 /// # JavaScript Examples
331 ///
332 /// Load a CSV from a `string`:
333 ///
334 /// ```javascript
335 /// const table = await client.table("x,y\n1,2\n3,4");
336 /// ```
337 ///
338 /// Load an Arrow from an `ArrayBuffer`:
339 ///
340 /// ```javascript
341 /// import * as fs from "node:fs/promises";
342 /// const table2 = await client.table(await fs.readFile("superstore.arrow"));
343 /// ```
344 ///
345 /// Load a CSV from a `UInt8Array` (the default for this type is Arrow)
346 /// using a format override:
347 ///
348 /// ```javascript
349 /// const enc = new TextEncoder();
350 /// const table = await client.table(enc.encode("x,y\n1,2\n3,4"), {
351 /// format: "csv",
352 /// });
353 /// ```
354 ///
355 /// Create a table with an `index`:
356 ///
357 /// ```javascript
358 /// const table = await client.table(data, { index: "Row ID" });
359 /// ```
360 #[wasm_bindgen]
361 pub async fn table(
362 &self,
363 value: &JsTableInitData,
364 options: Option<JsTableInitOptions>,
365 ) -> ApiResult<Table> {
366 let options = options
367 .into_serde_ext::<Option<TableInitOptions>>()?
368 .unwrap_or_default();
369
370 let args = TableData::from_js_value(value, options.format)?;
371 Ok(Table(self.client.table(args, options).await?))
372 }
373
374 /// Terminates this [`Client`], cleaning up any [`crate::View`] handles the
375 /// [`Client`] has open as well as its callbacks.
376 #[wasm_bindgen]
377 pub fn terminate(&self) -> ApiResult<JsValue> {
378 if let Some(f) = self.close.clone() {
379 Ok(f.call0(&JsValue::UNDEFINED)?)
380 } else {
381 Err(ApiError::new("Client type cannot be terminated"))
382 }
383 }
384
385 /// Opens a [`Table`] that is hosted on the `perspective_server::Server`
386 /// that is connected to this [`Client`].
387 ///
388 /// The `name` property of [`TableInitOptions`] is used to identify each
389 /// [`Table`]. [`Table`] `name`s can be looked up for each [`Client`]
390 /// via [`Client::get_hosted_table_names`].
391 ///
392 /// # JavaScript Examples
393 ///
394 /// Get a virtual [`Table`] named "table_one" from this [`Client`]
395 ///
396 /// ```javascript
397 /// const tables = await client.open_table("table_one");
398 /// ```
399 #[wasm_bindgen]
400 pub async fn open_table(&self, entity_id: String) -> ApiResult<Table> {
401 Ok(Table(self.client.open_table(entity_id).await?))
402 }
403
404 /// Retrieves the names of all tables that this client has access to.
405 ///
406 /// `name` is a string identifier unique to the [`Table`] (per [`Client`]),
407 /// which can be used in conjunction with [`Client::open_table`] to get
408 /// a [`Table`] instance without the use of [`Client::table`]
409 /// constructor directly (e.g., one created by another [`Client`]).
410 ///
411 /// # JavaScript Examples
412 ///
413 /// ```javascript
414 /// const tables = await client.get_hosted_table_names();
415 /// ```
416 #[wasm_bindgen]
417 pub async fn get_hosted_table_names(&self) -> ApiResult<JsValue> {
418 Ok(JsValue::from_serde_ext(
419 &self.client.get_hosted_table_names().await?,
420 )?)
421 }
422
423 /// Register a callback which is invoked whenever [`Client::table`] (on this
424 /// [`Client`]) or [`Table::delete`] (on a [`Table`] belinging to this
425 /// [`Client`]) are called.
426 #[wasm_bindgen]
427 pub async fn on_hosted_tables_update(&self, on_update_js: Function) -> ApiResult<u32> {
428 let poll_loop = LocalPollLoop::new(move |_| on_update_js.call0(&JsValue::UNDEFINED));
429 let on_update = Box::new(move || poll_loop.poll(()));
430 let id = self.client.on_hosted_tables_update(on_update).await?;
431 Ok(id)
432 }
433
434 /// Remove a callback previously registered via
435 /// `Client::on_hosted_tables_update`.
436 #[wasm_bindgen]
437 pub async fn remove_hosted_tables_update(&self, update_id: u32) -> ApiResult<()> {
438 self.client.remove_hosted_tables_update(update_id).await?;
439 Ok(())
440 }
441
442 /// Provides the [`SystemInfo`] struct, implementation-specific metadata
443 /// about the [`perspective_server::Server`] runtime such as Memory and
444 /// CPU usage.
445 ///
446 /// For WebAssembly servers, this method includes the WebAssembly heap size.
447 ///
448 /// # JavaScript Examples
449 ///
450 /// ```javascript
451 /// const info = await client.system_info();
452 /// ```
453 #[wasm_bindgen]
454 pub async fn system_info(&self) -> ApiResult<JsSystemInfo> {
455 let mut info = self.client.system_info().await?;
456
457 #[cfg(feature = "trace-allocator")]
458 if let perspective_client::SystemInfo {
459 client_used: None,
460 client_heap: None,
461 ..
462 } = &info
463 {
464 let (client_used, client_heap) = crate::utils::get_used();
465 info.client_used = Some(client_used as u64);
466 info.client_heap = Some(client_heap as u64);
467 };
468
469 let timestamp = web_sys::window()
470 .and_then(|x| x.performance())
471 .map(|x| x.now() as u64);
472
473 info.timestamp = timestamp;
474 let record = JsValue::from_serde_ext(&info.cast::<f64>())?;
475 Ok(record.unchecked_into())
476 }
477}
478
479#[wasm_bindgen]
480extern "C" {
481 #[wasm_bindgen(typescript_type = "SystemInfo")]
482 pub type JsSystemInfo;
483}