Skip to main content

perspective_client/
client.rs

1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████       █      █      █      █      █ █▄  ▀███ █       ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█  ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄  ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄   █ ▄▄▄▄▄ ┃
5// ┃ █      ██████ █  ▀█▄       █ ██████      █      ███▌▐███ ███████▄ █       ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors.                              ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::collections::HashMap;
14use std::error::Error;
15use std::ops::Deref;
16use std::sync::Arc;
17
18use async_lock::{Mutex, RwLock};
19use futures::Future;
20use futures::future::{BoxFuture, LocalBoxFuture, join_all};
21use prost::Message;
22use serde::{Deserialize, Serialize};
23use ts_rs::TS;
24
25use crate::proto::request::ClientReq;
26use crate::proto::response::ClientResp;
27use crate::proto::{
28    ColumnType, GetFeaturesReq, GetFeaturesResp, GetHostedTablesReq, GetHostedTablesResp,
29    HostedTable, JoinType, MakeJoinTableReq, MakeTableReq, RemoveHostedTablesUpdateReq, Request,
30    Response, ServerError, ServerSystemInfoReq,
31};
32use crate::table::{JoinOptions, Table, TableInitOptions, TableOptions};
33use crate::table_data::{TableData, UpdateData};
34use crate::table_ref::TableRef;
35use crate::utils::*;
36use crate::view::{OnUpdateData, ViewWindow};
37use crate::{OnUpdateMode, OnUpdateOptions, asyncfn, clone};
38
39/// Metadata about the engine runtime (such as total heap utilization).
40#[derive(Clone, Debug, Serialize, Deserialize, TS)]
41pub struct SystemInfo<T = u64> {
42    /// Total available bytes for allocation on the [`Server`].
43    pub heap_size: T,
44
45    /// Bytes allocated for use on the [`Server`].
46    pub used_size: T,
47
48    /// Wall-clock time spent processing requests on the [`Server`], in
49    /// milliseconds (estimated). This does not properly account for the
50    /// internal thread pool (which enables column-parallel processing of
51    /// individual requests).
52    pub cpu_time: u32,
53
54    /// Milliseconds since internal CPU time accumulator was reset.
55    pub cpu_time_epoch: u32,
56
57    /// Timestamp (POSIX) this request was made. This field may be omitted
58    /// for wasm due to `perspective-client` lacking a dependency on
59    /// `wasm_bindgen`.
60    pub timestamp: Option<T>,
61
62    /// Total available bytes for allocation on the [`Client`]. This is only
63    /// available if `trace-allocator` is enabled.
64    pub client_heap: Option<T>,
65
66    /// Bytes allocated for use on the [`Client`].  This is only
67    /// available if `trace-allocator` is enabled.
68    pub client_used: Option<T>,
69}
70
71impl<U: Copy + 'static> SystemInfo<U> {
72    /// Convert the numeric representation for `T` to something else, which is
73    /// useful for JavaScript where there is no `u64` native type.
74    pub fn cast<T: Copy + 'static>(&self) -> SystemInfo<T>
75    where
76        U: num_traits::AsPrimitive<T>,
77    {
78        SystemInfo {
79            heap_size: self.heap_size.as_(),
80            used_size: self.used_size.as_(),
81            cpu_time: self.cpu_time,
82            cpu_time_epoch: self.cpu_time_epoch,
83            timestamp: self.timestamp.map(|x| x.as_()),
84            client_heap: self.client_heap.map(|x| x.as_()),
85            client_used: self.client_used.map(|x| x.as_()),
86        }
87    }
88}
89
90/// Metadata about what features are supported by the `Server` to which this
91/// [`Client`] connects.
92#[derive(Clone, Debug, Default, PartialEq)]
93pub struct Features(Arc<GetFeaturesResp>);
94
95impl Features {
96    pub fn get_group_rollup_modes(&self) -> Vec<crate::config::GroupRollupMode> {
97        self.group_rollup_mode
98            .iter()
99            .map(|x| {
100                crate::config::GroupRollupMode::from(
101                    crate::proto::GroupRollupMode::try_from(*x).unwrap(),
102                )
103            })
104            .collect::<Vec<_>>()
105    }
106}
107
108impl Deref for Features {
109    type Target = GetFeaturesResp;
110
111    fn deref(&self) -> &Self::Target {
112        &self.0
113    }
114}
115
116impl GetFeaturesResp {
117    pub fn default_op(&self, col_type: ColumnType) -> Option<&str> {
118        self.filter_ops
119            .get(&(col_type as u32))?
120            .options
121            .first()
122            .map(|x| x.as_str())
123    }
124}
125
126type BoxFn<I, O> = Box<dyn Fn(I) -> O + Send + Sync + 'static>;
127type Box2Fn<I, J, O> = Box<dyn Fn(I, J) -> O + Send + Sync + 'static>;
128
129type Subscriptions<C> = Arc<RwLock<HashMap<u32, C>>>;
130type OnErrorCallback =
131    Box2Fn<ClientError, Option<ReconnectCallback>, BoxFuture<'static, Result<(), ClientError>>>;
132
133type OnceCallback = Box<dyn FnOnce(Response) -> ClientResult<()> + Send + Sync + 'static>;
134type SendCallback = Arc<
135    dyn for<'a> Fn(&'a Request) -> BoxFuture<'a, Result<(), Box<dyn Error + Send + Sync>>>
136        + Send
137        + Sync
138        + 'static,
139>;
140
141/// The client-side representation of a connection to a `Server`.
142pub trait ClientHandler: Clone + Send + Sync + 'static {
143    fn send_request(
144        &self,
145        msg: Vec<u8>,
146    ) -> impl Future<Output = Result<(), Box<dyn Error + Send + Sync>>> + Send;
147}
148
149mod name_registry {
150    use std::collections::HashSet;
151    use std::sync::{Arc, LazyLock, Mutex};
152
153    use crate::ClientError;
154    use crate::view::ClientResult;
155
156    static CLIENT_ID_GEN: LazyLock<Arc<Mutex<u32>>> = LazyLock::new(Arc::default);
157    static REGISTERED_CLIENTS: LazyLock<Arc<Mutex<HashSet<String>>>> = LazyLock::new(Arc::default);
158
159    pub(crate) fn generate_name(name: Option<&str>) -> ClientResult<String> {
160        if let Some(name) = name {
161            if let Some(name) = REGISTERED_CLIENTS
162                .lock()
163                .map_err(ClientError::from)?
164                .get(name)
165            {
166                Err(ClientError::DuplicateNameError(name.to_owned()))
167            } else {
168                Ok(name.to_owned())
169            }
170        } else {
171            let mut guard = CLIENT_ID_GEN.lock()?;
172            *guard += 1;
173            Ok(format!("client-{guard}"))
174        }
175    }
176}
177
178/// The type of the `reconnect` parameter passed to [`Client::handle_error`},
179/// and to the callback closure of [`Client::on_error`].
180///
181/// Calling this function from a [`Client::on_error`] closure should run the
182/// (implementation specific) client reconnect logic, e.g. rebindign a
183/// websocket.
184#[derive(Clone)]
185#[allow(clippy::type_complexity)]
186pub struct ReconnectCallback(
187    Arc<dyn Fn() -> LocalBoxFuture<'static, Result<(), Box<dyn Error>>> + Send + Sync>,
188);
189
190impl Deref for ReconnectCallback {
191    type Target = dyn Fn() -> LocalBoxFuture<'static, Result<(), Box<dyn Error>>> + Send + Sync;
192
193    fn deref(&self) -> &Self::Target {
194        &*self.0
195    }
196}
197
198impl ReconnectCallback {
199    pub fn new(
200        f: impl Fn() -> LocalBoxFuture<'static, Result<(), Box<dyn Error>>> + Send + Sync + 'static,
201    ) -> Self {
202        ReconnectCallback(Arc::new(f))
203    }
204}
205
206/// An instance of a [`Client`] is a connection to a single
207/// `perspective_server::Server`, whether locally in-memory or remote over some
208/// transport like a WebSocket.
209///
210/// # Examples
211///
212/// Create a `perspective_server::Server` and a synchronous [`Client`] via the
213/// `perspective` crate:
214///
215/// ```rust
216/// use perspective::LocalClient;
217/// use perspective::server::Server;
218///
219/// let server = Server::default();
220/// let client = perspective::LocalClient::new(&server);
221/// ```
222#[derive(Clone)]
223pub struct Client {
224    name: Arc<String>,
225    features: Arc<Mutex<Option<Features>>>,
226    send: SendCallback,
227    id_gen: IDGen,
228    subscriptions_errors: Subscriptions<OnErrorCallback>,
229    subscriptions_once: Subscriptions<OnceCallback>,
230    subscriptions: Subscriptions<BoxFn<Response, BoxFuture<'static, Result<(), ClientError>>>>,
231}
232
233impl PartialEq for Client {
234    fn eq(&self, other: &Self) -> bool {
235        self.name == other.name
236    }
237}
238
239impl std::fmt::Debug for Client {
240    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
241        f.debug_struct("Client").finish()
242    }
243}
244
245impl Client {
246    /// Create a new client instance with a closure that handles message
247    /// dispatch. See [`Client::new`] for details.
248    pub fn new_with_callback<T, U>(name: Option<&str>, send_request: T) -> ClientResult<Self>
249    where
250        T: Fn(Vec<u8>) -> U + 'static + Sync + Send,
251        U: Future<Output = Result<(), Box<dyn Error + Send + Sync>>> + Send + 'static,
252    {
253        let name = name_registry::generate_name(name)?;
254        let send_request = Arc::new(send_request);
255        let send: SendCallback = Arc::new(move |req| {
256            let mut bytes: Vec<u8> = Vec::new();
257            req.encode(&mut bytes).unwrap();
258            let send_request = send_request.clone();
259            Box::pin(async move { send_request(bytes).await })
260        });
261
262        Ok(Client {
263            name: Arc::new(name),
264            features: Arc::default(),
265            id_gen: IDGen::default(),
266            send,
267            subscriptions: Subscriptions::default(),
268            subscriptions_errors: Arc::default(),
269            subscriptions_once: Arc::default(),
270        })
271    }
272
273    /// Create a new [`Client`] instance with [`ClientHandler`].
274    pub fn new<T>(name: Option<&str>, client_handler: T) -> ClientResult<Self>
275    where
276        T: ClientHandler + 'static + Sync + Send,
277    {
278        Self::new_with_callback(
279            name,
280            asyncfn!(client_handler, async move |req| {
281                client_handler.send_request(req).await
282            }),
283        )
284    }
285
286    pub fn get_name(&self) -> &'_ str {
287        self.name.as_str()
288    }
289
290    /// Handle a message from the external message queue.
291    /// [`Client::handle_response`] is part of the low-level message-handling
292    /// API necessary to implement new transports for a [`Client`]
293    /// connection to a local-or-remote `perspective_server::Server`, and
294    /// doesn't generally need to be called directly by "users" of a
295    /// [`Client`] once connected.
296    pub async fn handle_response<'a>(&'a self, msg: &'a [u8]) -> ClientResult<bool> {
297        let msg = Response::decode(msg)?;
298        tracing::debug!("RECV {}", msg);
299        let mut wr = self.subscriptions_once.write().await;
300        if let Some(handler) = (*wr).remove(&msg.msg_id) {
301            drop(wr);
302            handler(msg)?;
303            return Ok(true);
304        } else if let Some(handler) = self.subscriptions.try_read().unwrap().get(&msg.msg_id) {
305            drop(wr);
306            handler(msg).await?;
307            return Ok(true);
308        }
309
310        if let Response {
311            client_resp: Some(ClientResp::ServerError(ServerError { message, .. })),
312            ..
313        } = &msg
314        {
315            tracing::error!("{}", message);
316        } else {
317            tracing::debug!("Received unsolicited server response: {}", msg);
318        }
319
320        Ok(false)
321    }
322
323    /// Handle an exception from the underlying transport.
324    pub async fn handle_error<T, U>(
325        &self,
326        message: ClientError,
327        reconnect: Option<T>,
328    ) -> ClientResult<()>
329    where
330        T: Fn() -> U + Clone + Send + Sync + 'static,
331        U: Future<Output = ClientResult<()>>,
332    {
333        let subs = self.subscriptions_errors.read().await;
334        let tasks = join_all(subs.values().map(|callback| {
335            callback(
336                message.clone(),
337                reconnect.clone().map(move |f| {
338                    ReconnectCallback(Arc::new(move || {
339                        clone!(f);
340                        Box::pin(async move { Ok(f().await?) }) as LocalBoxFuture<'static, _>
341                    }))
342                }),
343            )
344        }));
345
346        tasks.await.into_iter().collect::<Result<(), _>>()?;
347        self.close_and_error_subscriptions(&message).await
348    }
349
350    /// TODO Synthesize an error to provide to the caller, since the
351    /// server did not respond and the other option is to just drop the call
352    /// which results in a non-descript error message. It would be nice to
353    /// have client-side failures be a native part of the Client API.
354    async fn close_and_error_subscriptions(&self, message: &ClientError) -> ClientResult<()> {
355        let synthetic_error = |msg_id| Response {
356            msg_id,
357            entity_id: "".to_string(),
358            client_resp: Some(ClientResp::ServerError(ServerError {
359                message: format!("{message}"),
360                status_code: 2,
361            })),
362        };
363
364        self.subscriptions.write().await.clear();
365        let callbacks_once = self
366            .subscriptions_once
367            .write()
368            .await
369            .drain()
370            .collect::<Vec<_>>();
371
372        callbacks_once
373            .into_iter()
374            .try_for_each(|(msg_id, f)| f(synthetic_error(msg_id)))
375    }
376
377    pub async fn on_error<T, U, V>(&self, on_error: T) -> ClientResult<u32>
378    where
379        T: Fn(ClientError, Option<ReconnectCallback>) -> U + Clone + Send + Sync + 'static,
380        U: Future<Output = V> + Send + 'static,
381        V: Into<Result<(), ClientError>> + Sync + 'static,
382    {
383        let id = self.gen_id();
384        let callback = asyncfn!(on_error, async move |x, y| on_error(x, y).await.into());
385        self.subscriptions_errors
386            .write()
387            .await
388            .insert(id, Box::new(move |x, y| Box::pin(callback(x, y))));
389
390        Ok(id)
391    }
392
393    /// Generate a message ID unique to this client.
394    pub(crate) fn gen_id(&self) -> u32 {
395        self.id_gen.next()
396    }
397
398    pub(crate) async fn unsubscribe(&self, update_id: u32) -> ClientResult<()> {
399        let callback = self
400            .subscriptions
401            .write()
402            .await
403            .remove(&update_id)
404            .ok_or(ClientError::Unknown("remove_update".to_string()))?;
405
406        drop(callback);
407        Ok(())
408    }
409
410    /// Register a callback which is expected to respond exactly once.
411    pub(crate) async fn subscribe_once(
412        &self,
413        msg: &Request,
414        on_update: Box<dyn FnOnce(Response) -> ClientResult<()> + Send + Sync + 'static>,
415    ) -> ClientResult<()> {
416        self.subscriptions_once
417            .write()
418            .await
419            .insert(msg.msg_id, on_update);
420
421        tracing::debug!("SEND {}", msg);
422        if let Err(e) = (self.send)(msg).await {
423            self.subscriptions_once.write().await.remove(&msg.msg_id);
424            Err(ClientError::Unknown(e.to_string()))
425        } else {
426            Ok(())
427        }
428    }
429
430    pub(crate) async fn subscribe<T, U>(&self, msg: &Request, on_update: T) -> ClientResult<()>
431    where
432        T: Fn(Response) -> U + Send + Sync + 'static,
433        U: Future<Output = Result<(), ClientError>> + Send + 'static,
434    {
435        self.subscriptions
436            .write()
437            .await
438            .insert(msg.msg_id, Box::new(move |x| Box::pin(on_update(x))));
439
440        tracing::debug!("SEND {}", msg);
441        if let Err(e) = (self.send)(msg).await {
442            self.subscriptions.write().await.remove(&msg.msg_id);
443            Err(ClientError::Unknown(e.to_string()))
444        } else {
445            Ok(())
446        }
447    }
448
449    /// Send a `ClientReq` and await both the successful completion of the
450    /// `send`, _and_ the `ClientResp` which is returned.
451    pub(crate) async fn oneshot(&self, req: &Request) -> ClientResult<ClientResp> {
452        let (sender, receiver) = futures::channel::oneshot::channel::<ClientResp>();
453        let on_update = Box::new(move |res: Response| {
454            sender.send(res.client_resp.unwrap()).map_err(|x| x.into())
455        });
456
457        self.subscribe_once(req, on_update).await?;
458        receiver
459            .await
460            .map_err(|_| ClientError::Unknown(format!("Internal error for req {req}")))
461    }
462
463    pub(crate) async fn get_features(&self) -> ClientResult<Features> {
464        let mut guard = self.features.lock().await;
465        let features = if let Some(features) = &*guard {
466            features.clone()
467        } else {
468            let msg = Request {
469                msg_id: self.gen_id(),
470                entity_id: "".to_owned(),
471                client_req: Some(ClientReq::GetFeaturesReq(GetFeaturesReq {})),
472            };
473
474            let features = Features(Arc::new(match self.oneshot(&msg).await? {
475                ClientResp::GetFeaturesResp(features) => Ok(features),
476                resp => Err(resp),
477            }?));
478
479            *guard = Some(features.clone());
480            features
481        };
482
483        Ok(features)
484    }
485
486    /// Creates a new [`Table`] from either a _schema_ or _data_.
487    ///
488    /// The [`Client::table`] factory function can be initialized with either a
489    /// _schema_ (see [`Table::schema`]), or data in one of these formats:
490    ///
491    /// - Apache Arrow
492    /// - CSV
493    /// - JSON row-oriented
494    /// - JSON column-oriented
495    /// - NDJSON
496    ///
497    /// When instantiated with _data_, the schema is inferred from this data.
498    /// While this is convenient, inferrence is sometimes imperfect e.g.
499    /// when the input is empty, null or ambiguous. For these cases,
500    /// [`Client::table`] can first be instantiated with a explicit schema.
501    ///
502    /// When instantiated with a _schema_, the resulting [`Table`] is empty but
503    /// with known column names and column types. When subsqeuently
504    /// populated with [`Table::update`], these columns will be _coerced_ to
505    /// the schema's type. This behavior can be useful when
506    /// [`Client::table`]'s column type inferences doesn't work.
507    ///
508    /// The resulting [`Table`] is _virtual_, and invoking its methods
509    /// dispatches events to the `perspective_server::Server` this
510    /// [`Client`] connects to, where the data is stored and all calculation
511    /// occurs.
512    ///
513    /// # Arguments
514    ///
515    /// - `arg` - Either _schema_ or initialization _data_.
516    /// - `options` - Optional configuration which provides one of:
517    ///     - `limit` - The max number of rows the resulting [`Table`] can
518    ///       store.
519    ///     - `index` - The column name to use as an _index_ column. If this
520    ///       `Table` is being instantiated by _data_, this column name must be
521    ///       present in the data.
522    ///     - `name` - The name of the table. This will be generated if it is
523    ///       not provided.
524    ///     - `format` - The explicit format of the input data, can be one of
525    ///       `"json"`, `"columns"`, `"csv"` or `"arrow"`. This overrides
526    ///       language-specific type dispatch behavior, which allows stringified
527    ///       and byte array alternative inputs.
528    ///
529    /// # Examples
530    ///
531    /// Load a CSV from a `String`:
532    ///
533    /// ```rust
534    /// let opts = TableInitOptions::default();
535    /// let data = TableData::Update(UpdateData::Csv("x,y\n1,2\n3,4".into()));
536    /// let table = client.table(data, opts).await?;
537    /// ```
538    pub async fn table(&self, input: TableData, options: TableInitOptions) -> ClientResult<Table> {
539        let entity_id = match options.name.clone() {
540            Some(x) => x.to_owned(),
541            None => randid(),
542        };
543
544        if let TableData::View(view) = &input {
545            let window = ViewWindow::default();
546            let arrow = view.to_arrow(window).await?;
547            let mut table = self
548                .crate_table_inner(UpdateData::Arrow(arrow).into(), options.into(), entity_id)
549                .await?;
550
551            let table_ = table.clone();
552            let callback = asyncfn!(table_, update, async move |update: OnUpdateData| {
553                let update = UpdateData::Arrow(update.delta.expect("Malformed message").into());
554                let options = crate::UpdateOptions::default();
555                table_.update(update, options).await.unwrap_or_log();
556            });
557
558            let options = OnUpdateOptions {
559                mode: Some(OnUpdateMode::Row),
560            };
561
562            let on_update_token = view.on_update(callback, options).await?;
563            table.view_update_token = Some(on_update_token);
564            Ok(table)
565        } else {
566            self.crate_table_inner(input, options.into(), entity_id)
567                .await
568        }
569    }
570
571    async fn crate_table_inner(
572        &self,
573        input: TableData,
574        options: TableOptions,
575        entity_id: String,
576    ) -> ClientResult<Table> {
577        let msg = Request {
578            msg_id: self.gen_id(),
579            entity_id: entity_id.clone(),
580            client_req: Some(ClientReq::MakeTableReq(MakeTableReq {
581                data: Some(input.into()),
582                options: Some(options.clone().try_into()?),
583            })),
584        };
585
586        let client = self.clone();
587        match self.oneshot(&msg).await? {
588            ClientResp::MakeTableResp(_) => Ok(Table::new(entity_id, client, options)),
589            resp => Err(resp.into()),
590        }
591    }
592
593    /// Create a new read-only [`Table`] by performing a JOIN on two source
594    /// tables. The resulting table is reactive: when either source table is
595    /// updated, the join is automatically recomputed.
596    ///
597    /// # Arguments
598    ///
599    /// * `left` - The left source table (as a [`Table`] or name string).
600    /// * `right` - The right source table (as a [`Table`] or name string).
601    /// * `on` - The column name to join on. Must exist in both tables with the
602    ///   same type.
603    /// * `options` - Join configuration (join type, table name).
604    pub async fn join(
605        &self,
606        left: TableRef,
607        right: TableRef,
608        on: &str,
609        options: JoinOptions,
610    ) -> ClientResult<Table> {
611        let entity_id = options.name.unwrap_or_else(randid);
612        let join_type: JoinType = options.join_type.unwrap_or_default();
613        let right_on_column = options.right_on.unwrap_or_default();
614        let msg = Request {
615            msg_id: self.gen_id(),
616            entity_id: entity_id.clone(),
617            client_req: Some(ClientReq::MakeJoinTableReq(MakeJoinTableReq {
618                left_table_id: left.table_name().to_owned(),
619                right_table_id: right.table_name().to_owned(),
620                on_column: on.to_owned(),
621                join_type: join_type.into(),
622                right_on_column,
623            })),
624        };
625
626        let client = self.clone();
627        match self.oneshot(&msg).await? {
628            ClientResp::MakeJoinTableResp(_) => Ok(Table::new(entity_id, client, TableOptions {
629                index: Some(on.to_owned()),
630                limit: None,
631            })),
632            resp => Err(resp.into()),
633        }
634    }
635
636    async fn get_table_infos(&self) -> ClientResult<Vec<HostedTable>> {
637        let msg = Request {
638            msg_id: self.gen_id(),
639            entity_id: "".to_owned(),
640            client_req: Some(ClientReq::GetHostedTablesReq(GetHostedTablesReq {
641                subscribe: false,
642            })),
643        };
644
645        match self.oneshot(&msg).await? {
646            ClientResp::GetHostedTablesResp(GetHostedTablesResp { table_infos }) => Ok(table_infos),
647            resp => Err(resp.into()),
648        }
649    }
650
651    /// Opens a [`Table`] that is hosted on the `perspective_server::Server`
652    /// that is connected to this [`Client`].
653    ///
654    /// The `name` property of [`TableInitOptions`] is used to identify each
655    /// [`Table`]. [`Table`] `name`s can be looked up for each [`Client`]
656    /// via [`Client::get_hosted_table_names`].
657    ///
658    /// # Examples
659    ///
660    /// ```rust
661    /// let tables = client.open_table("table_one").await;
662    /// ```  
663    pub async fn open_table(&self, entity_id: String) -> ClientResult<Table> {
664        let infos = self.get_table_infos().await?;
665
666        // TODO fix this - name is repeated 2x
667        if let Some(info) = infos.into_iter().find(|i| i.entity_id == entity_id) {
668            let options = TableOptions {
669                index: info.index,
670                limit: info.limit,
671            };
672
673            let client = self.clone();
674            Ok(Table::new(entity_id, client, options))
675        } else {
676            Err(ClientError::Unknown(format!(
677                "Unknown table \"{}\"",
678                entity_id
679            )))
680        }
681    }
682
683    /// Retrieves the names of all tables that this client has access to.
684    ///
685    /// `name` is a string identifier unique to the [`Table`] (per [`Client`]),
686    /// which can be used in conjunction with [`Client::open_table`] to get
687    /// a [`Table`] instance without the use of [`Client::table`]
688    /// constructor directly (e.g., one created by another [`Client`]).
689    ///
690    /// # Examples
691    ///
692    /// ```rust
693    /// let tables = client.get_hosted_table_names().await;
694    /// ```
695    pub async fn get_hosted_table_names(&self) -> ClientResult<Vec<String>> {
696        let msg = Request {
697            msg_id: self.gen_id(),
698            entity_id: "".to_owned(),
699            client_req: Some(ClientReq::GetHostedTablesReq(GetHostedTablesReq {
700                subscribe: false,
701            })),
702        };
703
704        match self.oneshot(&msg).await? {
705            ClientResp::GetHostedTablesResp(GetHostedTablesResp { table_infos }) => {
706                Ok(table_infos.into_iter().map(|i| i.entity_id).collect())
707            },
708            resp => Err(resp.into()),
709        }
710    }
711
712    /// Register a callback which is invoked whenever [`Client::table`] (on this
713    /// [`Client`]) or [`Table::delete`] (on a [`Table`] belinging to this
714    /// [`Client`]) are called.
715    pub async fn on_hosted_tables_update<T, U>(&self, on_update: T) -> ClientResult<u32>
716    where
717        T: Fn() -> U + Send + Sync + 'static,
718        U: Future<Output = ()> + Send + 'static,
719    {
720        let on_update = Arc::new(on_update);
721        let callback = asyncfn!(on_update, async move |resp: Response| {
722            match resp.client_resp {
723                Some(ClientResp::GetHostedTablesResp(_)) | None => {
724                    on_update().await;
725                    Ok(())
726                },
727                resp => Err(resp.into()),
728            }
729        });
730
731        let msg = Request {
732            msg_id: self.gen_id(),
733            entity_id: "".to_owned(),
734            client_req: Some(ClientReq::GetHostedTablesReq(GetHostedTablesReq {
735                subscribe: true,
736            })),
737        };
738
739        self.subscribe(&msg, callback).await?;
740        Ok(msg.msg_id)
741    }
742
743    /// Remove a callback previously registered via
744    /// `Client::on_hosted_tables_update`.
745    pub async fn remove_hosted_tables_update(&self, update_id: u32) -> ClientResult<()> {
746        let msg = Request {
747            msg_id: self.gen_id(),
748            entity_id: "".to_owned(),
749            client_req: Some(ClientReq::RemoveHostedTablesUpdateReq(
750                RemoveHostedTablesUpdateReq { id: update_id },
751            )),
752        };
753
754        self.unsubscribe(update_id).await?;
755        match self.oneshot(&msg).await? {
756            ClientResp::RemoveHostedTablesUpdateResp(_) => Ok(()),
757            resp => Err(resp.into()),
758        }
759    }
760
761    /// Provides the [`SystemInfo`] struct, implementation-specific metadata
762    /// about the [`perspective_server::Server`] runtime such as Memory and
763    /// CPU usage.
764    pub async fn system_info(&self) -> ClientResult<SystemInfo> {
765        let msg = Request {
766            msg_id: self.gen_id(),
767            entity_id: "".to_string(),
768            client_req: Some(ClientReq::ServerSystemInfoReq(ServerSystemInfoReq {})),
769        };
770
771        match self.oneshot(&msg).await? {
772            ClientResp::ServerSystemInfoResp(resp) => {
773                #[cfg(not(target_family = "wasm"))]
774                let timestamp = Some(
775                    std::time::SystemTime::now()
776                        .duration_since(std::time::UNIX_EPOCH)?
777                        .as_millis() as u64,
778                );
779
780                #[cfg(target_family = "wasm")]
781                let timestamp = None;
782
783                #[cfg(feature = "talc-allocator")]
784                let (client_used, client_heap) = {
785                    let (client_used, client_heap) = crate::utils::get_used();
786                    (Some(client_used as u64), Some(client_heap as u64))
787                };
788
789                #[cfg(not(feature = "talc-allocator"))]
790                let (client_used, client_heap) = (None, None);
791
792                let info = SystemInfo {
793                    heap_size: resp.heap_size,
794                    used_size: resp.used_size,
795                    cpu_time: resp.cpu_time,
796                    cpu_time_epoch: resp.cpu_time_epoch,
797                    timestamp,
798                    client_heap,
799                    client_used,
800                };
801
802                Ok(info)
803            },
804            resp => Err(resp.into()),
805        }
806    }
807}