Skip to main content

perspective_client/
client.rs

1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████       █      █      █      █      █ █▄  ▀███ █       ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█  ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄  ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄   █ ▄▄▄▄▄ ┃
5// ┃ █      ██████ █  ▀█▄       █ ██████      █      ███▌▐███ ███████▄ █       ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors.                              ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::collections::HashMap;
14use std::error::Error;
15use std::ops::Deref;
16use std::sync::Arc;
17
18use async_lock::{Mutex, RwLock};
19use futures::Future;
20use futures::future::{BoxFuture, LocalBoxFuture, join_all};
21use prost::Message;
22use serde::{Deserialize, Serialize};
23use ts_rs::TS;
24
25use crate::proto::request::ClientReq;
26use crate::proto::response::ClientResp;
27use crate::proto::{
28    ColumnType, GetFeaturesReq, GetFeaturesResp, GetHostedTablesReq, GetHostedTablesResp,
29    HostedTable, MakeTableReq, RemoveHostedTablesUpdateReq, Request, Response, ServerError,
30    ServerSystemInfoReq,
31};
32use crate::table::{Table, TableInitOptions, TableOptions};
33use crate::table_data::{TableData, UpdateData};
34use crate::utils::*;
35use crate::view::{OnUpdateData, ViewWindow};
36use crate::{OnUpdateMode, OnUpdateOptions, asyncfn, clone};
37
38/// Metadata about the engine runtime (such as total heap utilization).
39#[derive(Clone, Debug, Serialize, Deserialize, TS)]
40pub struct SystemInfo<T = u64> {
41    /// Total available bytes for allocation on the [`Server`].
42    pub heap_size: T,
43
44    /// Bytes allocated for use on the [`Server`].
45    pub used_size: T,
46
47    /// Wall-clock time spent processing requests on the [`Server`], in
48    /// milliseconds (estimated). This does not properly account for the
49    /// internal thread pool (which enables column-parallel processing of
50    /// individual requests).
51    pub cpu_time: u32,
52
53    /// Milliseconds since internal CPU time accumulator was reset.
54    pub cpu_time_epoch: u32,
55
56    /// Timestamp (POSIX) this request was made. This field may be omitted
57    /// for wasm due to `perspective-client` lacking a dependency on
58    /// `wasm_bindgen`.
59    pub timestamp: Option<T>,
60
61    /// Total available bytes for allocation on the [`Client`]. This is only
62    /// available if `trace-allocator` is enabled.
63    pub client_heap: Option<T>,
64
65    /// Bytes allocated for use on the [`Client`].  This is only
66    /// available if `trace-allocator` is enabled.
67    pub client_used: Option<T>,
68}
69
70impl<U: Copy + 'static> SystemInfo<U> {
71    /// Convert the numeric representation for `T` to something else, which is
72    /// useful for JavaScript where there is no `u64` native type.
73    pub fn cast<T: Copy + 'static>(&self) -> SystemInfo<T>
74    where
75        U: num_traits::AsPrimitive<T>,
76    {
77        SystemInfo {
78            heap_size: self.heap_size.as_(),
79            used_size: self.used_size.as_(),
80            cpu_time: self.cpu_time,
81            cpu_time_epoch: self.cpu_time_epoch,
82            timestamp: self.timestamp.map(|x| x.as_()),
83            client_heap: self.client_heap.map(|x| x.as_()),
84            client_used: self.client_used.map(|x| x.as_()),
85        }
86    }
87}
88
89/// Metadata about what features are supported by the `Server` to which this
90/// [`Client`] connects.
91#[derive(Clone, Debug, Default)]
92pub struct Features(Arc<GetFeaturesResp>);
93
94impl Features {
95    pub fn get_group_rollup_modes(&self) -> Vec<crate::config::GroupRollupMode> {
96        self.group_rollup_mode
97            .iter()
98            .map(|x| {
99                crate::config::GroupRollupMode::from(
100                    crate::proto::GroupRollupMode::try_from(*x).unwrap(),
101                )
102            })
103            .collect::<Vec<_>>()
104    }
105}
106
107impl Deref for Features {
108    type Target = GetFeaturesResp;
109
110    fn deref(&self) -> &Self::Target {
111        &self.0
112    }
113}
114
115impl GetFeaturesResp {
116    pub fn default_op(&self, col_type: ColumnType) -> Option<&str> {
117        self.filter_ops
118            .get(&(col_type as u32))?
119            .options
120            .first()
121            .map(|x| x.as_str())
122    }
123}
124
125type BoxFn<I, O> = Box<dyn Fn(I) -> O + Send + Sync + 'static>;
126type Box2Fn<I, J, O> = Box<dyn Fn(I, J) -> O + Send + Sync + 'static>;
127
128type Subscriptions<C> = Arc<RwLock<HashMap<u32, C>>>;
129type OnErrorCallback =
130    Box2Fn<ClientError, Option<ReconnectCallback>, BoxFuture<'static, Result<(), ClientError>>>;
131
132type OnceCallback = Box<dyn FnOnce(Response) -> ClientResult<()> + Send + Sync + 'static>;
133type SendCallback = Arc<
134    dyn for<'a> Fn(&'a Request) -> BoxFuture<'a, Result<(), Box<dyn Error + Send + Sync>>>
135        + Send
136        + Sync
137        + 'static,
138>;
139
140/// The client-side representation of a connection to a `Server`.
141pub trait ClientHandler: Clone + Send + Sync + 'static {
142    fn send_request(
143        &self,
144        msg: Vec<u8>,
145    ) -> impl Future<Output = Result<(), Box<dyn Error + Send + Sync>>> + Send;
146}
147
148mod name_registry {
149    use std::collections::HashSet;
150    use std::sync::{Arc, LazyLock, Mutex};
151
152    use crate::ClientError;
153    use crate::view::ClientResult;
154
155    static CLIENT_ID_GEN: LazyLock<Arc<Mutex<u32>>> = LazyLock::new(Arc::default);
156    static REGISTERED_CLIENTS: LazyLock<Arc<Mutex<HashSet<String>>>> = LazyLock::new(Arc::default);
157
158    pub(crate) fn generate_name(name: Option<&str>) -> ClientResult<String> {
159        if let Some(name) = name {
160            if let Some(name) = REGISTERED_CLIENTS
161                .lock()
162                .map_err(ClientError::from)?
163                .get(name)
164            {
165                Err(ClientError::DuplicateNameError(name.to_owned()))
166            } else {
167                Ok(name.to_owned())
168            }
169        } else {
170            let mut guard = CLIENT_ID_GEN.lock()?;
171            *guard += 1;
172            Ok(format!("client-{guard}"))
173        }
174    }
175}
176
177/// The type of the `reconnect` parameter passed to [`Client::handle_error`},
178/// and to the callback closure of [`Client::on_error`].
179///
180/// Calling this function from a [`Client::on_error`] closure should run the
181/// (implementation specific) client reconnect logic, e.g. rebindign a
182/// websocket.
183#[derive(Clone)]
184#[allow(clippy::type_complexity)]
185pub struct ReconnectCallback(
186    Arc<dyn Fn() -> LocalBoxFuture<'static, Result<(), Box<dyn Error>>> + Send + Sync>,
187);
188
189impl Deref for ReconnectCallback {
190    type Target = dyn Fn() -> LocalBoxFuture<'static, Result<(), Box<dyn Error>>> + Send + Sync;
191
192    fn deref(&self) -> &Self::Target {
193        &*self.0
194    }
195}
196
197impl ReconnectCallback {
198    pub fn new(
199        f: impl Fn() -> LocalBoxFuture<'static, Result<(), Box<dyn Error>>> + Send + Sync + 'static,
200    ) -> Self {
201        ReconnectCallback(Arc::new(f))
202    }
203}
204
205/// An instance of a [`Client`] is a connection to a single
206/// `perspective_server::Server`, whether locally in-memory or remote over some
207/// transport like a WebSocket.
208///
209/// # Examples
210///
211/// Create a `perspective_server::Server` and a synchronous [`Client`] via the
212/// `perspective` crate:
213///
214/// ```rust
215/// use perspective::LocalClient;
216/// use perspective::server::Server;
217///
218/// let server = Server::default();
219/// let client = perspective::LocalClient::new(&server);
220/// ```
221#[derive(Clone)]
222pub struct Client {
223    name: Arc<String>,
224    features: Arc<Mutex<Option<Features>>>,
225    send: SendCallback,
226    id_gen: IDGen,
227    subscriptions_errors: Subscriptions<OnErrorCallback>,
228    subscriptions_once: Subscriptions<OnceCallback>,
229    subscriptions: Subscriptions<BoxFn<Response, BoxFuture<'static, Result<(), ClientError>>>>,
230}
231
232impl PartialEq for Client {
233    fn eq(&self, other: &Self) -> bool {
234        self.name == other.name
235    }
236}
237
238impl std::fmt::Debug for Client {
239    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
240        f.debug_struct("Client").finish()
241    }
242}
243
244impl Client {
245    /// Create a new client instance with a closure that handles message
246    /// dispatch. See [`Client::new`] for details.
247    pub fn new_with_callback<T, U>(name: Option<&str>, send_request: T) -> ClientResult<Self>
248    where
249        T: Fn(Vec<u8>) -> U + 'static + Sync + Send,
250        U: Future<Output = Result<(), Box<dyn Error + Send + Sync>>> + Send + 'static,
251    {
252        let name = name_registry::generate_name(name)?;
253        let send_request = Arc::new(send_request);
254        let send: SendCallback = Arc::new(move |req| {
255            let mut bytes: Vec<u8> = Vec::new();
256            req.encode(&mut bytes).unwrap();
257            let send_request = send_request.clone();
258            Box::pin(async move { send_request(bytes).await })
259        });
260
261        Ok(Client {
262            name: Arc::new(name),
263            features: Arc::default(),
264            id_gen: IDGen::default(),
265            send,
266            subscriptions: Subscriptions::default(),
267            subscriptions_errors: Arc::default(),
268            subscriptions_once: Arc::default(),
269        })
270    }
271
272    /// Create a new [`Client`] instance with [`ClientHandler`].
273    pub fn new<T>(name: Option<&str>, client_handler: T) -> ClientResult<Self>
274    where
275        T: ClientHandler + 'static + Sync + Send,
276    {
277        Self::new_with_callback(
278            name,
279            asyncfn!(client_handler, async move |req| {
280                client_handler.send_request(req).await
281            }),
282        )
283    }
284
285    pub fn get_name(&self) -> &'_ str {
286        self.name.as_str()
287    }
288
289    /// Handle a message from the external message queue.
290    /// [`Client::handle_response`] is part of the low-level message-handling
291    /// API necessary to implement new transports for a [`Client`]
292    /// connection to a local-or-remote `perspective_server::Server`, and
293    /// doesn't generally need to be called directly by "users" of a
294    /// [`Client`] once connected.
295    pub async fn handle_response<'a>(&'a self, msg: &'a [u8]) -> ClientResult<bool> {
296        let msg = Response::decode(msg)?;
297        tracing::debug!("RECV {}", msg);
298        let mut wr = self.subscriptions_once.write().await;
299        if let Some(handler) = (*wr).remove(&msg.msg_id) {
300            drop(wr);
301            handler(msg)?;
302            return Ok(true);
303        } else if let Some(handler) = self.subscriptions.try_read().unwrap().get(&msg.msg_id) {
304            drop(wr);
305            handler(msg).await?;
306            return Ok(true);
307        }
308
309        if let Response {
310            client_resp: Some(ClientResp::ServerError(ServerError { message, .. })),
311            ..
312        } = &msg
313        {
314            tracing::error!("{}", message);
315        } else {
316            tracing::debug!("Received unsolicited server response: {}", msg);
317        }
318
319        Ok(false)
320    }
321
322    /// Handle an exception from the underlying transport.
323    pub async fn handle_error<T, U>(
324        &self,
325        message: ClientError,
326        reconnect: Option<T>,
327    ) -> ClientResult<()>
328    where
329        T: Fn() -> U + Clone + Send + Sync + 'static,
330        U: Future<Output = ClientResult<()>>,
331    {
332        let subs = self.subscriptions_errors.read().await;
333        let tasks = join_all(subs.values().map(|callback| {
334            callback(
335                message.clone(),
336                reconnect.clone().map(move |f| {
337                    ReconnectCallback(Arc::new(move || {
338                        clone!(f);
339                        Box::pin(async move { Ok(f().await?) }) as LocalBoxFuture<'static, _>
340                    }))
341                }),
342            )
343        }));
344
345        tasks.await.into_iter().collect::<Result<(), _>>()?;
346        self.close_and_error_subscriptions(&message).await
347    }
348
349    /// TODO Synthesize an error to provide to the caller, since the
350    /// server did not respond and the other option is to just drop the call
351    /// which results in a non-descript error message. It would be nice to
352    /// have client-side failures be a native part of the Client API.
353    async fn close_and_error_subscriptions(&self, message: &ClientError) -> ClientResult<()> {
354        let synthetic_error = |msg_id| Response {
355            msg_id,
356            entity_id: "".to_string(),
357            client_resp: Some(ClientResp::ServerError(ServerError {
358                message: format!("{message}"),
359                status_code: 2,
360            })),
361        };
362
363        self.subscriptions.write().await.clear();
364        let callbacks_once = self
365            .subscriptions_once
366            .write()
367            .await
368            .drain()
369            .collect::<Vec<_>>();
370
371        callbacks_once
372            .into_iter()
373            .try_for_each(|(msg_id, f)| f(synthetic_error(msg_id)))
374    }
375
376    pub async fn on_error<T, U, V>(&self, on_error: T) -> ClientResult<u32>
377    where
378        T: Fn(ClientError, Option<ReconnectCallback>) -> U + Clone + Send + Sync + 'static,
379        U: Future<Output = V> + Send + 'static,
380        V: Into<Result<(), ClientError>> + Sync + 'static,
381    {
382        let id = self.gen_id();
383        let callback = asyncfn!(on_error, async move |x, y| on_error(x, y).await.into());
384        self.subscriptions_errors
385            .write()
386            .await
387            .insert(id, Box::new(move |x, y| Box::pin(callback(x, y))));
388
389        Ok(id)
390    }
391
392    /// Generate a message ID unique to this client.
393    pub(crate) fn gen_id(&self) -> u32 {
394        self.id_gen.next()
395    }
396
397    pub(crate) async fn unsubscribe(&self, update_id: u32) -> ClientResult<()> {
398        let callback = self
399            .subscriptions
400            .write()
401            .await
402            .remove(&update_id)
403            .ok_or(ClientError::Unknown("remove_update".to_string()))?;
404
405        drop(callback);
406        Ok(())
407    }
408
409    /// Register a callback which is expected to respond exactly once.
410    pub(crate) async fn subscribe_once(
411        &self,
412        msg: &Request,
413        on_update: Box<dyn FnOnce(Response) -> ClientResult<()> + Send + Sync + 'static>,
414    ) -> ClientResult<()> {
415        self.subscriptions_once
416            .write()
417            .await
418            .insert(msg.msg_id, on_update);
419
420        tracing::debug!("SEND {}", msg);
421        if let Err(e) = (self.send)(msg).await {
422            self.subscriptions_once.write().await.remove(&msg.msg_id);
423            Err(ClientError::Unknown(e.to_string()))
424        } else {
425            Ok(())
426        }
427    }
428
429    pub(crate) async fn subscribe<T, U>(&self, msg: &Request, on_update: T) -> ClientResult<()>
430    where
431        T: Fn(Response) -> U + Send + Sync + 'static,
432        U: Future<Output = Result<(), ClientError>> + Send + 'static,
433    {
434        self.subscriptions
435            .write()
436            .await
437            .insert(msg.msg_id, Box::new(move |x| Box::pin(on_update(x))));
438
439        tracing::debug!("SEND {}", msg);
440        if let Err(e) = (self.send)(msg).await {
441            self.subscriptions.write().await.remove(&msg.msg_id);
442            Err(ClientError::Unknown(e.to_string()))
443        } else {
444            Ok(())
445        }
446    }
447
448    /// Send a `ClientReq` and await both the successful completion of the
449    /// `send`, _and_ the `ClientResp` which is returned.
450    pub(crate) async fn oneshot(&self, req: &Request) -> ClientResult<ClientResp> {
451        let (sender, receiver) = futures::channel::oneshot::channel::<ClientResp>();
452        let on_update = Box::new(move |res: Response| {
453            sender.send(res.client_resp.unwrap()).map_err(|x| x.into())
454        });
455
456        self.subscribe_once(req, on_update).await?;
457        receiver
458            .await
459            .map_err(|_| ClientError::Unknown(format!("Internal error for req {req}")))
460    }
461
462    pub(crate) async fn get_features(&self) -> ClientResult<Features> {
463        let mut guard = self.features.lock().await;
464        let features = if let Some(features) = &*guard {
465            features.clone()
466        } else {
467            let msg = Request {
468                msg_id: self.gen_id(),
469                entity_id: "".to_owned(),
470                client_req: Some(ClientReq::GetFeaturesReq(GetFeaturesReq {})),
471            };
472
473            let features = Features(Arc::new(match self.oneshot(&msg).await? {
474                ClientResp::GetFeaturesResp(features) => Ok(features),
475                resp => Err(resp),
476            }?));
477
478            *guard = Some(features.clone());
479            features
480        };
481
482        Ok(features)
483    }
484
485    /// Creates a new [`Table`] from either a _schema_ or _data_.
486    ///
487    /// The [`Client::table`] factory function can be initialized with either a
488    /// _schema_ (see [`Table::schema`]), or data in one of these formats:
489    ///
490    /// - Apache Arrow
491    /// - CSV
492    /// - JSON row-oriented
493    /// - JSON column-oriented
494    /// - NDJSON
495    ///
496    /// When instantiated with _data_, the schema is inferred from this data.
497    /// While this is convenient, inferrence is sometimes imperfect e.g.
498    /// when the input is empty, null or ambiguous. For these cases,
499    /// [`Client::table`] can first be instantiated with a explicit schema.
500    ///
501    /// When instantiated with a _schema_, the resulting [`Table`] is empty but
502    /// with known column names and column types. When subsqeuently
503    /// populated with [`Table::update`], these columns will be _coerced_ to
504    /// the schema's type. This behavior can be useful when
505    /// [`Client::table`]'s column type inferences doesn't work.
506    ///
507    /// The resulting [`Table`] is _virtual_, and invoking its methods
508    /// dispatches events to the `perspective_server::Server` this
509    /// [`Client`] connects to, where the data is stored and all calculation
510    /// occurs.
511    ///
512    /// # Arguments
513    ///
514    /// - `arg` - Either _schema_ or initialization _data_.
515    /// - `options` - Optional configuration which provides one of:
516    ///     - `limit` - The max number of rows the resulting [`Table`] can
517    ///       store.
518    ///     - `index` - The column name to use as an _index_ column. If this
519    ///       `Table` is being instantiated by _data_, this column name must be
520    ///       present in the data.
521    ///     - `name` - The name of the table. This will be generated if it is
522    ///       not provided.
523    ///     - `format` - The explicit format of the input data, can be one of
524    ///       `"json"`, `"columns"`, `"csv"` or `"arrow"`. This overrides
525    ///       language-specific type dispatch behavior, which allows stringified
526    ///       and byte array alternative inputs.
527    ///
528    /// # Examples
529    ///
530    /// Load a CSV from a `String`:
531    ///
532    /// ```rust
533    /// let opts = TableInitOptions::default();
534    /// let data = TableData::Update(UpdateData::Csv("x,y\n1,2\n3,4".into()));
535    /// let table = client.table(data, opts).await?;
536    /// ```
537    pub async fn table(&self, input: TableData, options: TableInitOptions) -> ClientResult<Table> {
538        let entity_id = match options.name.clone() {
539            Some(x) => x.to_owned(),
540            None => randid(),
541        };
542
543        if let TableData::View(view) = &input {
544            let window = ViewWindow::default();
545            let arrow = view.to_arrow(window).await?;
546            let mut table = self
547                .crate_table_inner(UpdateData::Arrow(arrow).into(), options.into(), entity_id)
548                .await?;
549
550            let table_ = table.clone();
551            let callback = asyncfn!(table_, update, async move |update: OnUpdateData| {
552                let update = UpdateData::Arrow(update.delta.expect("Malformed message").into());
553                let options = crate::UpdateOptions::default();
554                table_.update(update, options).await.unwrap_or_log();
555            });
556
557            let options = OnUpdateOptions {
558                mode: Some(OnUpdateMode::Row),
559            };
560
561            let on_update_token = view.on_update(callback, options).await?;
562            table.view_update_token = Some(on_update_token);
563            Ok(table)
564        } else {
565            self.crate_table_inner(input, options.into(), entity_id)
566                .await
567        }
568    }
569
570    async fn crate_table_inner(
571        &self,
572        input: TableData,
573        options: TableOptions,
574        entity_id: String,
575    ) -> ClientResult<Table> {
576        let msg = Request {
577            msg_id: self.gen_id(),
578            entity_id: entity_id.clone(),
579            client_req: Some(ClientReq::MakeTableReq(MakeTableReq {
580                data: Some(input.into()),
581                options: Some(options.clone().try_into()?),
582            })),
583        };
584
585        let client = self.clone();
586        match self.oneshot(&msg).await? {
587            ClientResp::MakeTableResp(_) => Ok(Table::new(entity_id, client, options)),
588            resp => Err(resp.into()),
589        }
590    }
591
592    async fn get_table_infos(&self) -> ClientResult<Vec<HostedTable>> {
593        let msg = Request {
594            msg_id: self.gen_id(),
595            entity_id: "".to_owned(),
596            client_req: Some(ClientReq::GetHostedTablesReq(GetHostedTablesReq {
597                subscribe: false,
598            })),
599        };
600
601        match self.oneshot(&msg).await? {
602            ClientResp::GetHostedTablesResp(GetHostedTablesResp { table_infos }) => Ok(table_infos),
603            resp => Err(resp.into()),
604        }
605    }
606
607    /// Opens a [`Table`] that is hosted on the `perspective_server::Server`
608    /// that is connected to this [`Client`].
609    ///
610    /// The `name` property of [`TableInitOptions`] is used to identify each
611    /// [`Table`]. [`Table`] `name`s can be looked up for each [`Client`]
612    /// via [`Client::get_hosted_table_names`].
613    ///
614    /// # Examples
615    ///
616    /// ```rust
617    /// let tables = client.open_table("table_one").await;
618    /// ```  
619    pub async fn open_table(&self, entity_id: String) -> ClientResult<Table> {
620        let infos = self.get_table_infos().await?;
621
622        // TODO fix this - name is repeated 2x
623        if let Some(info) = infos.into_iter().find(|i| i.entity_id == entity_id) {
624            let options = TableOptions {
625                index: info.index,
626                limit: info.limit,
627            };
628
629            let client = self.clone();
630            Ok(Table::new(entity_id, client, options))
631        } else {
632            Err(ClientError::Unknown(format!(
633                "Unknown table \"{}\"",
634                entity_id
635            )))
636        }
637    }
638
639    /// Retrieves the names of all tables that this client has access to.
640    ///
641    /// `name` is a string identifier unique to the [`Table`] (per [`Client`]),
642    /// which can be used in conjunction with [`Client::open_table`] to get
643    /// a [`Table`] instance without the use of [`Client::table`]
644    /// constructor directly (e.g., one created by another [`Client`]).
645    ///
646    /// # Examples
647    ///
648    /// ```rust
649    /// let tables = client.get_hosted_table_names().await;
650    /// ```
651    pub async fn get_hosted_table_names(&self) -> ClientResult<Vec<String>> {
652        let msg = Request {
653            msg_id: self.gen_id(),
654            entity_id: "".to_owned(),
655            client_req: Some(ClientReq::GetHostedTablesReq(GetHostedTablesReq {
656                subscribe: false,
657            })),
658        };
659
660        match self.oneshot(&msg).await? {
661            ClientResp::GetHostedTablesResp(GetHostedTablesResp { table_infos }) => {
662                Ok(table_infos.into_iter().map(|i| i.entity_id).collect())
663            },
664            resp => Err(resp.into()),
665        }
666    }
667
668    /// Register a callback which is invoked whenever [`Client::table`] (on this
669    /// [`Client`]) or [`Table::delete`] (on a [`Table`] belinging to this
670    /// [`Client`]) are called.
671    pub async fn on_hosted_tables_update<T, U>(&self, on_update: T) -> ClientResult<u32>
672    where
673        T: Fn() -> U + Send + Sync + 'static,
674        U: Future<Output = ()> + Send + 'static,
675    {
676        let on_update = Arc::new(on_update);
677        let callback = asyncfn!(on_update, async move |resp: Response| {
678            match resp.client_resp {
679                Some(ClientResp::GetHostedTablesResp(_)) | None => {
680                    on_update().await;
681                    Ok(())
682                },
683                resp => Err(resp.into()),
684            }
685        });
686
687        let msg = Request {
688            msg_id: self.gen_id(),
689            entity_id: "".to_owned(),
690            client_req: Some(ClientReq::GetHostedTablesReq(GetHostedTablesReq {
691                subscribe: true,
692            })),
693        };
694
695        self.subscribe(&msg, callback).await?;
696        Ok(msg.msg_id)
697    }
698
699    /// Remove a callback previously registered via
700    /// `Client::on_hosted_tables_update`.
701    pub async fn remove_hosted_tables_update(&self, update_id: u32) -> ClientResult<()> {
702        let msg = Request {
703            msg_id: self.gen_id(),
704            entity_id: "".to_owned(),
705            client_req: Some(ClientReq::RemoveHostedTablesUpdateReq(
706                RemoveHostedTablesUpdateReq { id: update_id },
707            )),
708        };
709
710        self.unsubscribe(update_id).await?;
711        match self.oneshot(&msg).await? {
712            ClientResp::RemoveHostedTablesUpdateResp(_) => Ok(()),
713            resp => Err(resp.into()),
714        }
715    }
716
717    /// Provides the [`SystemInfo`] struct, implementation-specific metadata
718    /// about the [`perspective_server::Server`] runtime such as Memory and
719    /// CPU usage.
720    pub async fn system_info(&self) -> ClientResult<SystemInfo> {
721        let msg = Request {
722            msg_id: self.gen_id(),
723            entity_id: "".to_string(),
724            client_req: Some(ClientReq::ServerSystemInfoReq(ServerSystemInfoReq {})),
725        };
726
727        match self.oneshot(&msg).await? {
728            ClientResp::ServerSystemInfoResp(resp) => {
729                #[cfg(not(target_family = "wasm"))]
730                let timestamp = Some(
731                    std::time::SystemTime::now()
732                        .duration_since(std::time::UNIX_EPOCH)?
733                        .as_millis() as u64,
734                );
735
736                #[cfg(target_family = "wasm")]
737                let timestamp = None;
738
739                #[cfg(feature = "talc-allocator")]
740                let (client_used, client_heap) = {
741                    let (client_used, client_heap) = crate::utils::get_used();
742                    (Some(client_used as u64), Some(client_heap as u64))
743                };
744
745                #[cfg(not(feature = "talc-allocator"))]
746                let (client_used, client_heap) = (None, None);
747
748                let info = SystemInfo {
749                    heap_size: resp.heap_size,
750                    used_size: resp.used_size,
751                    cpu_time: resp.cpu_time,
752                    cpu_time_epoch: resp.cpu_time_epoch,
753                    timestamp,
754                    client_heap,
755                    client_used,
756                };
757
758                Ok(info)
759            },
760            resp => Err(resp.into()),
761        }
762    }
763}