perspective_client/
table.rs

1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████       █      █      █      █      █ █▄  ▀███ █       ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█  ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄  ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄   █ ▄▄▄▄▄ ┃
5// ┃ █      ██████ █  ▀█▄       █ ██████      █      ███▌▐███ ███████▄ █       ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors.                              ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::collections::HashMap;
14use std::fmt::Display;
15
16use serde::{Deserialize, Serialize};
17use ts_rs::TS;
18
19use crate::assert_table_api;
20use crate::client::{Client, Features};
21use crate::config::{Expressions, ViewConfigUpdate};
22use crate::proto::make_table_req::MakeTableOptions;
23use crate::proto::make_table_req::make_table_options::MakeTableType;
24use crate::proto::request::ClientReq;
25use crate::proto::response::ClientResp;
26use crate::proto::*;
27use crate::table_data::UpdateData;
28use crate::utils::*;
29use crate::view::View;
30
31pub type Schema = HashMap<String, ColumnType>;
32
33#[derive(Clone, Copy, Debug, Serialize, Deserialize, TS)]
34pub enum TableReadFormat {
35    #[serde(rename = "csv")]
36    Csv,
37
38    #[serde(rename = "json")]
39    JsonString,
40
41    #[serde(rename = "columns")]
42    ColumnsString,
43
44    #[serde(rename = "arrow")]
45    Arrow,
46
47    #[serde(rename = "ndjson")]
48    Ndjson,
49}
50
51impl TableReadFormat {
52    pub fn parse(value: Option<String>) -> Result<Option<Self>, String> {
53        Ok(match value.as_deref() {
54            Some("csv") => Some(TableReadFormat::Csv),
55            Some("json") => Some(TableReadFormat::JsonString),
56            Some("columns") => Some(TableReadFormat::ColumnsString),
57            Some("arrow") => Some(TableReadFormat::Arrow),
58            Some("ndjson") => Some(TableReadFormat::Ndjson),
59            None => None,
60            Some(x) => return Err(format!("Unknown format \"{x}\"")),
61        })
62    }
63}
64
65/// Options which impact the behavior of [`Client::table`], as well as
66/// subsequent calls to [`Table::update`].
67#[derive(Clone, Debug, Default, Serialize, Deserialize, TS)]
68pub struct TableInitOptions {
69    #[serde(default)]
70    #[ts(optional)]
71    pub name: Option<String>,
72
73    #[serde(default)]
74    #[ts(optional)]
75    pub format: Option<TableReadFormat>,
76
77    /// This [`Table`] should use the column named by the `index` parameter as
78    /// the `index`, which causes [`Table::update`] and [`Client::table`] input
79    /// to either insert or update existing rows based on `index` column
80    /// value equality.
81    #[serde(default)]
82    #[ts(optional)]
83    pub index: Option<String>,
84
85    /// This [`Table`] should be limited to `limit` rows, after which the
86    /// _earliest_ rows will be overwritten (where _earliest_ is defined as
87    /// relative to insertion order).
88    #[serde(default)]
89    #[ts(optional)]
90    pub limit: Option<u32>,
91}
92
93impl TableInitOptions {
94    pub fn set_name<D: Display>(&mut self, name: D) {
95        self.name = Some(format!("{name}"))
96    }
97}
98
99impl TryFrom<TableOptions> for MakeTableOptions {
100    type Error = ClientError;
101
102    fn try_from(value: TableOptions) -> Result<Self, Self::Error> {
103        Ok(MakeTableOptions {
104            make_table_type: match value {
105                TableOptions {
106                    index: Some(_),
107                    limit: Some(_),
108                } => Err(ClientError::BadTableOptions)?,
109                TableOptions {
110                    index: Some(index), ..
111                } => Some(MakeTableType::MakeIndexTable(index)),
112                TableOptions {
113                    limit: Some(limit), ..
114                } => Some(MakeTableType::MakeLimitTable(limit)),
115                _ => None,
116            },
117        })
118    }
119}
120
/// The subset of [`TableInitOptions`] a [`Table`] keeps after creation.
///
/// At most one of `index` and `limit` may be set when this is converted to
/// `MakeTableOptions`; setting both is rejected with
/// `ClientError::BadTableOptions`.
#[derive(Clone, Debug)]
pub(crate) struct TableOptions {
    /// Name of the index column, if any.
    pub index: Option<String>,
    /// Maximum row count, if any.
    pub limit: Option<u32>,
}
126
127impl From<TableInitOptions> for TableOptions {
128    fn from(value: TableInitOptions) -> Self {
129        TableOptions {
130            index: value.index,
131            limit: value.limit,
132        }
133    }
134}
135
136#[derive(Clone, Debug, Default, Deserialize, TS)]
137pub struct DeleteOptions {
138    pub lazy: bool,
139}
140
141#[derive(Clone, Debug, Default, Deserialize, Serialize, TS)]
142pub struct UpdateOptions {
143    pub port_id: Option<u32>,
144    pub format: Option<TableReadFormat>,
145}
146
147#[derive(Clone, Debug, Serialize, Deserialize)]
148pub struct ValidateExpressionsData {
149    pub expression_schema: HashMap<String, ColumnType>,
150    pub errors: HashMap<String, table_validate_expr_resp::ExprValidationError>,
151    pub expression_alias: HashMap<String, String>,
152}
153
154#[derive(Clone)]
155pub struct Table {
156    name: String,
157    client: Client,
158    options: TableOptions,
159
160    /// If this table is constructed from a View, the view's on_update callback
161    /// is wired into this table. So, we store the token to clean it up properly
162    /// on destruction.
163    pub(crate) view_update_token: Option<u32>,
164}
165
166assert_table_api!(Table);
167
168impl PartialEq for Table {
169    fn eq(&self, other: &Self) -> bool {
170        self.name == other.name && self.client == other.client
171    }
172}
173
174impl Table {
175    pub(crate) fn new(name: String, client: Client, options: TableOptions) -> Self {
176        Table {
177            name,
178            client,
179            options,
180            view_update_token: None,
181        }
182    }
183
184    fn client_message(&self, req: ClientReq) -> Request {
185        Request {
186            msg_id: self.client.gen_id(),
187            entity_id: self.name.clone(),
188            client_req: Some(req),
189        }
190    }
191
192    /// Get a copy of the [`Client`] this [`Table`] came from.
193    pub fn get_client(&self) -> Client {
194        self.client.clone()
195    }
196
197    /// Get a metadata dictionary of the `perspective_server::Server`'s
198    /// features, which is (currently) implementation specific, but there is
199    /// only one implementation.
200    pub fn get_features(&self) -> ClientResult<Features> {
201        self.client.get_features()
202    }
203
204    /// Returns the name of the index column for the table.
205    ///
206    /// # Examples
207    ///
208    /// ```rust
209    /// let options = TableInitOptions {
210    ///     index: Some("x".to_string()),
211    ///     ..default()
212    /// };
213    /// let table = client.table("x,y\n1,2\n3,4", options).await;
214    /// let index = table.get_index()
215    /// ```
216    pub fn get_index(&self) -> Option<String> {
217        self.options.index.as_ref().map(|index| index.to_owned())
218    }
219
220    /// Returns the user-specified row limit for this table.
221    pub fn get_limit(&self) -> Option<u32> {
222        self.options.limit.as_ref().map(|limit| *limit)
223    }
224
225    /// Returns the user-specified name for this table, or the auto-generated
226    /// name if a name was not specified when the table was created.
227    pub fn get_name(&self) -> &str {
228        self.name.as_str()
229    }
230
231    /// Removes all the rows in the [`Table`], but preserves everything else
232    /// including the schema, index, and any callbacks or registered
233    /// [`View`] instances.
234    ///
235    /// Calling [`Table::clear`], like [`Table::update`] and [`Table::remove`],
236    /// will trigger an update event to any registered listeners via
237    /// [`View::on_update`].
238    pub async fn clear(&self) -> ClientResult<()> {
239        self.replace(UpdateData::JsonRows("[]".to_owned())).await
240    }
241
242    /// Delete this [`Table`] and cleans up associated resources.
243    ///
244    /// [`Table`]s do not stop consuming resources or processing updates when
245    /// they are garbage collected in their host language - you must call
246    /// this method to reclaim these.
247    ///
248    /// # Arguments
249    ///
250    /// - `options` An options dictionary.
251    ///     - `lazy` Whether to delete this [`Table`] _lazily_. When false (the
252    ///       default), the delete will occur immediately, assuming it has no
253    ///       [`View`] instances registered to it (which must be deleted first,
254    ///       otherwise this method will throw an error). When true, the
255    ///       [`Table`] will only be marked for deltion once its [`View`]
256    ///       dependency count reaches 0.
257    ///
258    /// # Examples
259    ///
260    /// ```rust
261    /// let opts = TableInitOptions::default();
262    /// let data = TableData::Update(UpdateData::Csv("x,y\n1,2\n3,4".into()));
263    /// let table = client.table(data, opts).await?;
264    ///
265    /// // ...
266    ///
267    /// table.delete(DeleteOptions::default()).await?;
268    /// ```
269    pub async fn delete(&self, options: DeleteOptions) -> ClientResult<()> {
270        let msg = self.client_message(ClientReq::TableDeleteReq(TableDeleteReq {
271            is_immediate: !options.lazy,
272        }));
273
274        match self.client.oneshot(&msg).await? {
275            ClientResp::TableDeleteResp(_) => Ok(()),
276            resp => Err(resp.into()),
277        }
278    }
279
280    /// Returns the column names of this [`Table`] in "natural" order (the
281    /// ordering implied by the input format).
282    ///  
283    /// # Examples
284    ///
285    /// ```rust
286    /// let columns = table.columns().await;
287    /// ```
288    pub async fn columns(&self) -> ClientResult<Vec<String>> {
289        let msg = self.client_message(ClientReq::TableSchemaReq(TableSchemaReq {}));
290        match self.client.oneshot(&msg).await? {
291            ClientResp::TableSchemaResp(TableSchemaResp { schema }) => Ok(schema
292                .map(|x| x.schema.into_iter().map(|x| x.name.to_owned()).collect())
293                .unwrap()),
294            resp => Err(resp.into()),
295        }
296    }
297
298    /// Returns the number of rows in a [`Table`].
299    pub async fn size(&self) -> ClientResult<usize> {
300        let msg = self.client_message(ClientReq::TableSizeReq(TableSizeReq {}));
301        match self.client.oneshot(&msg).await? {
302            ClientResp::TableSizeResp(TableSizeResp { size }) => Ok(size as usize),
303            resp => Err(resp.into()),
304        }
305    }
306
307    /// Returns a table's [`Schema`], a mapping of column names to column types.
308    ///
309    /// The mapping of a [`Table`]'s column names to data types is referred to
310    /// as a [`Schema`]. Each column has a unique name and a data type, one
311    /// of:
312    ///
313    /// - `"boolean"` - A boolean type
314    /// - `"date"` - A timesonze-agnostic date type (month/day/year)
315    /// - `"datetime"` - A millisecond-precision datetime type in the UTC
316    ///   timezone
317    /// - `"float"` - A 64 bit float
318    /// - `"integer"` - A signed 32 bit integer (the integer type supported by
319    ///   JavaScript)
320    /// - `"string"` - A [`String`] data type (encoded internally as a
321    ///   _dictionary_)
322    ///
323    /// Note that all [`Table`] columns are _nullable_, regardless of the data
324    /// type.
325    pub async fn schema(&self) -> ClientResult<HashMap<String, ColumnType>> {
326        let msg = self.client_message(ClientReq::TableSchemaReq(TableSchemaReq {}));
327        match self.client.oneshot(&msg).await? {
328            ClientResp::TableSchemaResp(TableSchemaResp { schema }) => Ok(schema
329                .map(|x| {
330                    x.schema
331                        .into_iter()
332                        .map(|x| (x.name, ColumnType::try_from(x.r#type).unwrap()))
333                        .collect()
334                })
335                .unwrap()),
336            resp => Err(resp.into()),
337        }
338    }
339
340    /// Create a unique channel ID on this [`Table`], which allows
341    /// `View::on_update` callback calls to be associated with the
342    /// `Table::update` which caused them.
343    pub async fn make_port(&self) -> ClientResult<i32> {
344        let msg = self.client_message(ClientReq::TableMakePortReq(TableMakePortReq {}));
345        match self.client.oneshot(&msg).await? {
346            ClientResp::TableMakePortResp(TableMakePortResp { port_id }) => Ok(port_id as i32),
347            _ => Err(ClientError::Unknown("make_port".to_string())),
348        }
349    }
350
351    /// Register a callback which is called exactly once, when this [`Table`] is
352    /// deleted with the [`Table::delete`] method.
353    ///
354    /// [`Table::on_delete`] resolves when the subscription message is sent, not
355    /// when the _delete_ event occurs.
356    pub async fn on_delete(
357        &self,
358        on_delete: Box<dyn Fn() + Send + Sync + 'static>,
359    ) -> ClientResult<u32> {
360        let callback = move |resp: Response| match resp.client_resp {
361            Some(ClientResp::TableOnDeleteResp(_)) => {
362                on_delete();
363                Ok(())
364            },
365            resp => Err(resp.into()),
366        };
367
368        let msg = self.client_message(ClientReq::TableOnDeleteReq(TableOnDeleteReq {}));
369        self.client.subscribe_once(&msg, Box::new(callback)).await?;
370        Ok(msg.msg_id)
371    }
372
373    /// Removes a listener with a given ID, as returned by a previous call to
374    /// [`Table::on_delete`].
375    pub async fn remove_delete(&self, callback_id: u32) -> ClientResult<()> {
376        let msg = self.client_message(ClientReq::TableRemoveDeleteReq(TableRemoveDeleteReq {
377            id: callback_id,
378        }));
379
380        match self.client.oneshot(&msg).await? {
381            ClientResp::TableRemoveDeleteResp(_) => Ok(()),
382            resp => Err(resp.into()),
383        }
384    }
385
386    /// Removes rows from this [`Table`] with the `index` column values
387    /// supplied.
388    ///
389    /// # Arguments
390    ///
391    /// - `indices` - A list of `index` column values for rows that should be
392    ///   removed.
393    ///
394    /// # Examples
395    ///
396    /// ```rust
397    /// table.remove(UpdateData::Csv("index\n1\n2\n3")).await?;
398    /// ```
399    pub async fn remove(&self, input: UpdateData) -> ClientResult<()> {
400        let msg = self.client_message(ClientReq::TableRemoveReq(TableRemoveReq {
401            data: Some(input.into()),
402        }));
403
404        match self.client.oneshot(&msg).await? {
405            ClientResp::TableRemoveResp(_) => Ok(()),
406            resp => Err(resp.into()),
407        }
408    }
409
410    /// Replace all rows in this [`Table`] with the input data, coerced to this
411    /// [`Table`]'s existing [`Schema`], notifying any derived [`View`] and
412    /// [`View::on_update`] callbacks.
413    ///
414    /// Calling [`Table::replace`] is an easy way to replace _all_ the data in a
415    /// [`Table`] without losing any derived [`View`] instances or
416    /// [`View::on_update`] callbacks. [`Table::replace`] does _not_ infer
417    /// data types like [`Client::table`] does, rather it _coerces_ input
418    /// data to the `Schema` like [`Table::update`]. If you need a [`Table`]
419    /// with a different `Schema`, you must create a new one.
420    ///
421    /// # Examples
422    ///
423    /// ```rust
424    /// let data = UpdateData::Csv("x,y\n1,2".into());
425    /// let opts = UpdateOptions::default();
426    /// table.replace(data, opts).await?;
427    /// ```
428    pub async fn replace(&self, input: UpdateData) -> ClientResult<()> {
429        let msg = self.client_message(ClientReq::TableReplaceReq(TableReplaceReq {
430            data: Some(input.into()),
431        }));
432
433        match self.client.oneshot(&msg).await? {
434            ClientResp::TableReplaceResp(_) => Ok(()),
435            resp => Err(resp.into()),
436        }
437    }
438
439    /// Updates the rows of this table and any derived [`View`] instances.
440    ///
441    /// Calling [`Table::update`] will trigger the [`View::on_update`] callbacks
442    /// register to derived [`View`], and the call itself will not resolve until
443    /// _all_ derived [`View`]'s are notified.
444    ///
445    /// When updating a [`Table`] with an `index`, [`Table::update`] supports
446    /// partial updates, by omitting columns from the update data.
447    ///
448    /// # Arguments
449    ///
450    /// - `input` - The input data for this [`Table`]. The schema of a [`Table`]
451    ///   is immutable after creation, so this method cannot be called with a
452    ///   schema.
453    /// - `options` - Options for this update step - see [`UpdateOptions`].
454    ///
455    /// # Examples
456    ///
457    /// ```rust
458    /// let data = UpdateData::Csv("x,y\n1,2".into());
459    /// let opts = UpdateOptions::default();
460    /// table.update(data, opts).await?;
461    /// ```  
462    pub async fn update(&self, input: UpdateData, options: UpdateOptions) -> ClientResult<()> {
463        let msg = self.client_message(ClientReq::TableUpdateReq(TableUpdateReq {
464            data: Some(input.into()),
465            port_id: options.port_id.unwrap_or(0),
466        }));
467
468        match self.client.oneshot(&msg).await? {
469            ClientResp::TableUpdateResp(_) => Ok(()),
470            resp => Err(resp.into()),
471        }
472    }
473
474    /// Validates the given expressions.
475    pub async fn validate_expressions(
476        &self,
477        expressions: Expressions,
478    ) -> ClientResult<ValidateExpressionsData> {
479        let msg = self.client_message(ClientReq::TableValidateExprReq(TableValidateExprReq {
480            column_to_expr: expressions.0,
481        }));
482
483        match self.client.oneshot(&msg).await? {
484            ClientResp::TableValidateExprResp(result) => Ok(ValidateExpressionsData {
485                errors: result.errors,
486                expression_alias: result.expression_alias,
487                expression_schema: result
488                    .expression_schema
489                    .into_iter()
490                    .map(|(x, y)| (x, ColumnType::try_from(y).unwrap()))
491                    .collect(),
492            }),
493            resp => Err(resp.into()),
494        }
495    }
496
497    /// Create a new [`View`] from this table with a specified
498    /// [`ViewConfigUpdate`].
499    ///
500    /// See [`View`] struct.
501    ///
502    /// # Examples
503    ///
504    /// ```rust
505    /// use crate::config::*;
506    /// let view = table
507    ///     .view(Some(ViewConfigUpdate {
508    ///         columns: Some(vec![Some("Sales".into())]),
509    ///         aggregates: Some(HashMap::from_iter(vec![("Sales".into(), "sum".into())])),
510    ///         group_by: Some(vec!["Region".into(), "Country".into()]),
511    ///         filter: Some(vec![Filter::new("Category", "in", &[
512    ///             "Furniture",
513    ///             "Technology",
514    ///         ])]),
515    ///         ..ViewConfigUpdate::default()
516    ///     }))
517    ///     .await?;
518    /// ```
519    pub async fn view(&self, config: Option<ViewConfigUpdate>) -> ClientResult<View> {
520        let view_name = randid();
521        let msg = Request {
522            msg_id: self.client.gen_id(),
523            entity_id: self.name.clone(),
524            client_req: ClientReq::TableMakeViewReq(TableMakeViewReq {
525                view_id: view_name.clone(),
526                config: config.map(|x| x.into()),
527            })
528            .into(),
529        };
530
531        match self.client.oneshot(&msg).await? {
532            ClientResp::TableMakeViewResp(TableMakeViewResp { view_id })
533                if view_id == view_name =>
534            {
535                Ok(View::new(view_name, self.client.clone()))
536            },
537            resp => Err(resp.into()),
538        }
539    }
540}