perspective_client/
table.rs

1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████       █      █      █      █      █ █▄  ▀███ █       ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█  ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄  ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄   █ ▄▄▄▄▄ ┃
5// ┃ █      ██████ █  ▀█▄       █ ██████      █      ███▌▐███ ███████▄ █       ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors.                              ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::collections::HashMap;
14use std::fmt::Display;
15
16use nanoid::*;
17use serde::{Deserialize, Serialize};
18use ts_rs::TS;
19
20use crate::assert_table_api;
21use crate::client::{Client, Features};
22use crate::config::{Expressions, ViewConfigUpdate};
23use crate::proto::make_table_req::MakeTableOptions;
24use crate::proto::make_table_req::make_table_options::MakeTableType;
25use crate::proto::request::ClientReq;
26use crate::proto::response::ClientResp;
27use crate::proto::*;
28use crate::table_data::UpdateData;
29use crate::utils::*;
30use crate::view::View;
31
32pub type Schema = HashMap<String, ColumnType>;
33
34#[derive(Clone, Copy, Debug, Serialize, Deserialize, TS)]
35pub enum TableReadFormat {
36    #[serde(rename = "csv")]
37    Csv,
38
39    #[serde(rename = "json")]
40    JsonString,
41
42    #[serde(rename = "columns")]
43    ColumnsString,
44
45    #[serde(rename = "arrow")]
46    Arrow,
47
48    #[serde(rename = "ndjson")]
49    Ndjson,
50}
51
52impl TableReadFormat {
53    pub fn parse(value: Option<String>) -> Result<Option<Self>, String> {
54        Ok(match value.as_deref() {
55            Some("csv") => Some(TableReadFormat::Csv),
56            Some("json") => Some(TableReadFormat::JsonString),
57            Some("columns") => Some(TableReadFormat::ColumnsString),
58            Some("arrow") => Some(TableReadFormat::Arrow),
59            Some("ndjson") => Some(TableReadFormat::Ndjson),
60            None => None,
61            Some(x) => return Err(format!("Unknown format \"{}\"", x)),
62        })
63    }
64}
65
66/// Options which impact the behavior of [`Client::table`], as well as
67/// subsequent calls to [`Table::update`].
68#[derive(Clone, Debug, Default, Serialize, Deserialize, TS)]
69pub struct TableInitOptions {
70    #[serde(default)]
71    #[ts(optional)]
72    pub name: Option<String>,
73
74    #[serde(default)]
75    #[ts(optional)]
76    pub format: Option<TableReadFormat>,
77
78    /// This [`Table`] should use the column named by the `index` parameter as
79    /// the `index`, which causes [`Table::update`] and [`Client::table`] input
80    /// to either insert or update existing rows based on `index` column
81    /// value equality.
82    #[serde(default)]
83    #[ts(optional)]
84    pub index: Option<String>,
85
86    /// This [`Table`] should be limited to `limit` rows, after which the
87    /// _earliest_ rows will be overwritten (where _earliest_ is defined as
88    /// relative to insertion order).
89    #[serde(default)]
90    #[ts(optional)]
91    pub limit: Option<u32>,
92}
93
94impl TableInitOptions {
95    pub fn set_name<D: Display>(&mut self, name: D) {
96        self.name = Some(format!("{}", name))
97    }
98}
99
100impl TryFrom<TableOptions> for MakeTableOptions {
101    type Error = ClientError;
102
103    fn try_from(value: TableOptions) -> Result<Self, Self::Error> {
104        Ok(MakeTableOptions {
105            make_table_type: match value {
106                TableOptions {
107                    index: Some(_),
108                    limit: Some(_),
109                } => Err(ClientError::BadTableOptions)?,
110                TableOptions {
111                    index: Some(index), ..
112                } => Some(MakeTableType::MakeIndexTable(index)),
113                TableOptions {
114                    limit: Some(limit), ..
115                } => Some(MakeTableType::MakeLimitTable(limit)),
116                _ => None,
117            },
118        })
119    }
120}
121
/// The subset of [`TableInitOptions`] retained by a [`Table`] after it is
/// created.
#[derive(Clone, Debug)]
pub(crate) struct TableOptions {
    /// Name of the index column, if the table is indexed.
    pub index: Option<String>,

    /// Maximum row count, if the table is row-limited.
    pub limit: Option<u32>,
}
127
128impl From<TableInitOptions> for TableOptions {
129    fn from(value: TableInitOptions) -> Self {
130        TableOptions {
131            index: value.index,
132            limit: value.limit,
133        }
134    }
135}
136
137#[derive(Clone, Debug, Default, Deserialize, TS)]
138pub struct DeleteOptions {
139    pub lazy: bool,
140}
141
142#[derive(Clone, Debug, Default, Deserialize, Serialize, TS)]
143pub struct UpdateOptions {
144    pub port_id: Option<u32>,
145    pub format: Option<TableReadFormat>,
146}
147
148#[derive(Clone, Debug, Serialize, Deserialize)]
149pub struct ValidateExpressionsData {
150    pub expression_schema: HashMap<String, ColumnType>,
151    pub errors: HashMap<String, table_validate_expr_resp::ExprValidationError>,
152    pub expression_alias: HashMap<String, String>,
153}
154
155#[derive(Clone)]
156pub struct Table {
157    name: String,
158    client: Client,
159    options: TableOptions,
160
161    /// If this table is constructed from a View, the view's on_update callback
162    /// is wired into this table. So, we store the token to clean it up properly
163    /// on destruction.
164    pub(crate) view_update_token: Option<u32>,
165}
166
// Invoke the crate-level `assert_table_api!` macro on `Table`.
// NOTE(review): presumably a compile-time check that `Table` exposes the
// expected method surface (e.g. shared with the language bindings) — TODO
// confirm against the macro definition.
assert_table_api!(Table);
168
169impl PartialEq for Table {
170    fn eq(&self, other: &Self) -> bool {
171        self.name == other.name && self.client == other.client
172    }
173}
174
175impl Table {
176    pub(crate) fn new(name: String, client: Client, options: TableOptions) -> Self {
177        Table {
178            name,
179            client,
180            options,
181            view_update_token: None,
182        }
183    }
184
185    fn client_message(&self, req: ClientReq) -> Request {
186        Request {
187            msg_id: self.client.gen_id(),
188            entity_id: self.name.clone(),
189            client_req: Some(req),
190        }
191    }
192
193    /// Get a copy of the [`Client`] this [`Table`] came from.
194    pub fn get_client(&self) -> Client {
195        self.client.clone()
196    }
197
198    /// Get a metadata dictionary of the `perspective_server::Server`'s
199    /// features, which is (currently) implementation specific, but there is
200    /// only one implementation.
201    pub fn get_features(&self) -> ClientResult<Features> {
202        self.client.get_features()
203    }
204
205    /// Returns the name of the index column for the table.
206    ///
207    /// # Examples
208    ///
209    /// ```rust
210    /// let options = TableInitOptions {
211    ///     index: Some("x".to_string()),
212    ///     ..default()
213    /// };
214    /// let table = client.table("x,y\n1,2\n3,4", options).await;
215    /// let tables = client.open_table("table_one").await;
216    /// ```
217    pub fn get_index(&self) -> Option<String> {
218        self.options.index.as_ref().map(|index| index.to_owned())
219    }
220
221    /// Returns the user-specified row limit for this table.
222    pub fn get_limit(&self) -> Option<u32> {
223        self.options.limit.as_ref().map(|limit| *limit)
224    }
225
226    /// Returns the user-specified name for this table, or the auto-generated
227    /// name if a name was not specified when the table was created.
228    pub fn get_name(&self) -> &str {
229        self.name.as_str()
230    }
231
232    /// Removes all the rows in the [`Table`], but preserves everything else
233    /// including the schema, index, and any callbacks or registered
234    /// [`View`] instances.
235    ///
236    /// Calling [`Table::clear`], like [`Table::update`] and [`Table::remove`],
237    /// will trigger an update event to any registered listeners via
238    /// [`View::on_update`].
239    pub async fn clear(&self) -> ClientResult<()> {
240        self.replace(UpdateData::JsonRows("[]".to_owned())).await
241    }
242
243    /// Delete this [`Table`] and cleans up associated resources.
244    ///
245    /// [`Table`]s do not stop consuming resources or processing updates when
246    /// they are garbage collected in their host language - you must call
247    /// this method to reclaim these.
248    ///
249    /// # Arguments
250    ///
251    /// - `options` An options dictionary.
252    ///     - `lazy` Whether to delete this [`Table`] _lazily_. When false (the
253    ///       default), the delete will occur immediately, assuming it has no
254    ///       [`View`] instances registered to it (which must be deleted first,
255    ///       otherwise this method will throw an error). When true, the
256    ///       [`Table`] will only be marked for deltion once its [`View`]
257    ///       dependency count reaches 0.
258    ///
259    /// # Examples
260    ///
261    /// ```rust
262    /// let opts = TableInitOptions::default();
263    /// let data = TableData::Update(UpdateData::Csv("x,y\n1,2\n3,4".into()));
264    /// let table = client.table(data, opts).await?;
265    ///
266    /// // ...
267    ///
268    /// table.delete(DeleteOptions::default()).await?;
269    /// ```
270    pub async fn delete(&self, options: DeleteOptions) -> ClientResult<()> {
271        let msg = self.client_message(ClientReq::TableDeleteReq(TableDeleteReq {
272            is_immediate: !options.lazy,
273        }));
274
275        match self.client.oneshot(&msg).await? {
276            ClientResp::TableDeleteResp(_) => Ok(()),
277            resp => Err(resp.into()),
278        }
279    }
280
281    /// Returns the column names of this [`Table`] in "natural" order (the
282    /// ordering implied by the input format).
283    ///  
284    /// # Examples
285    ///
286    /// ```rust
287    /// let columns = table.columns().await;
288    /// ```
289    pub async fn columns(&self) -> ClientResult<Vec<String>> {
290        let msg = self.client_message(ClientReq::TableSchemaReq(TableSchemaReq {}));
291        match self.client.oneshot(&msg).await? {
292            ClientResp::TableSchemaResp(TableSchemaResp { schema }) => Ok(schema
293                .map(|x| x.schema.into_iter().map(|x| x.name.to_owned()).collect())
294                .unwrap()),
295            resp => Err(resp.into()),
296        }
297    }
298
299    /// Returns the number of rows in a [`Table`].
300    pub async fn size(&self) -> ClientResult<usize> {
301        let msg = self.client_message(ClientReq::TableSizeReq(TableSizeReq {}));
302        match self.client.oneshot(&msg).await? {
303            ClientResp::TableSizeResp(TableSizeResp { size }) => Ok(size as usize),
304            resp => Err(resp.into()),
305        }
306    }
307
308    /// Returns a table's [`Schema`], a mapping of column names to column types.
309    ///
310    /// The mapping of a [`Table`]'s column names to data types is referred to
311    /// as a [`Schema`]. Each column has a unique name and a data type, one
312    /// of:
313    ///
314    /// - `"boolean"` - A boolean type
315    /// - `"date"` - A timesonze-agnostic date type (month/day/year)
316    /// - `"datetime"` - A millisecond-precision datetime type in the UTC
317    ///   timezone
318    /// - `"float"` - A 64 bit float
319    /// - `"integer"` - A signed 32 bit integer (the integer type supported by
320    ///   JavaScript)
321    /// - `"string"` - A [`String`] data type (encoded internally as a
322    ///   _dictionary_)
323    ///
324    /// Note that all [`Table`] columns are _nullable_, regardless of the data
325    /// type.
326    pub async fn schema(&self) -> ClientResult<HashMap<String, ColumnType>> {
327        let msg = self.client_message(ClientReq::TableSchemaReq(TableSchemaReq {}));
328        match self.client.oneshot(&msg).await? {
329            ClientResp::TableSchemaResp(TableSchemaResp { schema }) => Ok(schema
330                .map(|x| {
331                    x.schema
332                        .into_iter()
333                        .map(|x| (x.name, ColumnType::try_from(x.r#type).unwrap()))
334                        .collect()
335                })
336                .unwrap()),
337            resp => Err(resp.into()),
338        }
339    }
340
341    /// Create a unique channel ID on this [`Table`], which allows
342    /// `View::on_update` callback calls to be associated with the
343    /// `Table::update` which caused them.
344    pub async fn make_port(&self) -> ClientResult<i32> {
345        let msg = self.client_message(ClientReq::TableMakePortReq(TableMakePortReq {}));
346        match self.client.oneshot(&msg).await? {
347            ClientResp::TableMakePortResp(TableMakePortResp { port_id }) => Ok(port_id as i32),
348            _ => Err(ClientError::Unknown("make_port".to_string())),
349        }
350    }
351
352    /// Register a callback which is called exactly once, when this [`Table`] is
353    /// deleted with the [`Table::delete`] method.
354    ///
355    /// [`Table::on_delete`] resolves when the subscription message is sent, not
356    /// when the _delete_ event occurs.
357    pub async fn on_delete(
358        &self,
359        on_delete: Box<dyn Fn() + Send + Sync + 'static>,
360    ) -> ClientResult<u32> {
361        let callback = move |resp: Response| match resp.client_resp {
362            Some(ClientResp::TableOnDeleteResp(_)) => {
363                on_delete();
364                Ok(())
365            },
366            resp => Err(resp.into()),
367        };
368
369        let msg = self.client_message(ClientReq::TableOnDeleteReq(TableOnDeleteReq {}));
370        self.client.subscribe_once(&msg, Box::new(callback)).await?;
371        Ok(msg.msg_id)
372    }
373
374    /// Removes a listener with a given ID, as returned by a previous call to
375    /// [`Table::on_delete`].
376    pub async fn remove_delete(&self, callback_id: u32) -> ClientResult<()> {
377        let msg = self.client_message(ClientReq::TableRemoveDeleteReq(TableRemoveDeleteReq {
378            id: callback_id,
379        }));
380
381        match self.client.oneshot(&msg).await? {
382            ClientResp::TableRemoveDeleteResp(_) => Ok(()),
383            resp => Err(resp.into()),
384        }
385    }
386
387    /// Removes rows from this [`Table`] with the `index` column values
388    /// supplied.
389    ///
390    /// # Arguments
391    ///
392    /// - `indices` - A list of `index` column values for rows that should be
393    ///   removed.
394    ///
395    /// # Examples
396    ///
397    /// ```rust
398    /// table.remove(UpdateData::Csv("index\n1\n2\n3")).await?;
399    /// ```
400    pub async fn remove(&self, input: UpdateData) -> ClientResult<()> {
401        let msg = self.client_message(ClientReq::TableRemoveReq(TableRemoveReq {
402            data: Some(input.into()),
403        }));
404
405        match self.client.oneshot(&msg).await? {
406            ClientResp::TableRemoveResp(_) => Ok(()),
407            resp => Err(resp.into()),
408        }
409    }
410
411    /// Replace all rows in this [`Table`] with the input data, coerced to this
412    /// [`Table`]'s existing [`Schema`], notifying any derived [`View`] and
413    /// [`View::on_update`] callbacks.
414    ///
415    /// Calling [`Table::replace`] is an easy way to replace _all_ the data in a
416    /// [`Table`] without losing any derived [`View`] instances or
417    /// [`View::on_update`] callbacks. [`Table::replace`] does _not_ infer
418    /// data types like [`Client::table`] does, rather it _coerces_ input
419    /// data to the `Schema` like [`Table::update`]. If you need a [`Table`]
420    /// with a different `Schema`, you must create a new one.
421    ///
422    /// # Examples
423    ///
424    /// ```rust
425    /// let data = UpdateData::Csv("x,y\n1,2".into());
426    /// let opts = UpdateOptions::default();
427    /// table.replace(data, opts).await?;
428    /// ```
429    pub async fn replace(&self, input: UpdateData) -> ClientResult<()> {
430        let msg = self.client_message(ClientReq::TableReplaceReq(TableReplaceReq {
431            data: Some(input.into()),
432        }));
433
434        match self.client.oneshot(&msg).await? {
435            ClientResp::TableReplaceResp(_) => Ok(()),
436            resp => Err(resp.into()),
437        }
438    }
439
440    /// Updates the rows of this table and any derived [`View`] instances.
441    ///
442    /// Calling [`Table::update`] will trigger the [`View::on_update`] callbacks
443    /// register to derived [`View`], and the call itself will not resolve until
444    /// _all_ derived [`View`]'s are notified.
445    ///
446    /// When updating a [`Table`] with an `index`, [`Table::update`] supports
447    /// partial updates, by omitting columns from the update data.
448    ///
449    /// # Arguments
450    ///
451    /// - `input` - The input data for this [`Table`]. The schema of a [`Table`]
452    ///   is immutable after creation, so this method cannot be called with a
453    ///   schema.
454    /// - `options` - Options for this update step - see [`UpdateOptions`].
455    ///
456    /// # Examples
457    ///
458    /// ```rust
459    /// let data = UpdateData::Csv("x,y\n1,2".into());
460    /// let opts = UpdateOptions::default();
461    /// table.update(data, opts).await?;
462    /// ```  
463    pub async fn update(&self, input: UpdateData, options: UpdateOptions) -> ClientResult<()> {
464        let msg = self.client_message(ClientReq::TableUpdateReq(TableUpdateReq {
465            data: Some(input.into()),
466            port_id: options.port_id.unwrap_or(0),
467        }));
468
469        match self.client.oneshot(&msg).await? {
470            ClientResp::TableUpdateResp(_) => Ok(()),
471            resp => Err(resp.into()),
472        }
473    }
474
475    /// Validates the given expressions.
476    pub async fn validate_expressions(
477        &self,
478        expressions: Expressions,
479    ) -> ClientResult<ValidateExpressionsData> {
480        let msg = self.client_message(ClientReq::TableValidateExprReq(TableValidateExprReq {
481            column_to_expr: expressions.0,
482        }));
483
484        match self.client.oneshot(&msg).await? {
485            ClientResp::TableValidateExprResp(result) => Ok(ValidateExpressionsData {
486                errors: result.errors,
487                expression_alias: result.expression_alias,
488                expression_schema: result
489                    .expression_schema
490                    .into_iter()
491                    .map(|(x, y)| (x, ColumnType::try_from(y).unwrap()))
492                    .collect(),
493            }),
494            resp => Err(resp.into()),
495        }
496    }
497
498    /// Create a new [`View`] from this table with a specified
499    /// [`ViewConfigUpdate`].
500    ///
501    /// See [`View`] struct.
502    ///
503    /// # Examples
504    ///
505    /// ```rust
506    /// use crate::config::*;
507    /// let view = table
508    ///     .view(Some(ViewConfigUpdate {
509    ///         columns: Some(vec![Some("Sales".into())]),
510    ///         aggregates: Some(HashMap::from_iter(vec![("Sales".into(), "sum".into())])),
511    ///         group_by: Some(vec!["Region".into(), "Country".into()]),
512    ///         filter: Some(vec![Filter::new("Category", "in", &[
513    ///             "Furniture",
514    ///             "Technology",
515    ///         ])]),
516    ///         ..ViewConfigUpdate::default()
517    ///     }))
518    ///     .await?;
519    /// ```
520    pub async fn view(&self, config: Option<ViewConfigUpdate>) -> ClientResult<View> {
521        let view_name = nanoid!();
522        let msg = Request {
523            msg_id: self.client.gen_id(),
524            entity_id: self.name.clone(),
525            client_req: ClientReq::TableMakeViewReq(TableMakeViewReq {
526                view_id: view_name.clone(),
527                config: config.map(|x| x.into()),
528            })
529            .into(),
530        };
531
532        match self.client.oneshot(&msg).await? {
533            ClientResp::TableMakeViewResp(TableMakeViewResp { view_id })
534                if view_id == view_name =>
535            {
536                Ok(View::new(view_name, self.client.clone()))
537            },
538            resp => Err(resp.into()),
539        }
540    }
541}