Skip to main content

perspective_client/
table.rs

1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████       █      █      █      █      █ █▄  ▀███ █       ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█  ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄  ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄   █ ▄▄▄▄▄ ┃
5// ┃ █      ██████ █  ▀█▄       █ ██████      █      ███▌▐███ ███████▄ █       ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors.                              ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::collections::HashMap;
14use std::fmt::Display;
15
16use serde::{Deserialize, Serialize};
17use ts_rs::TS;
18
19use crate::assert_table_api;
20use crate::client::{Client, Features};
21use crate::config::{Expressions, ViewConfigUpdate};
22use crate::proto::make_table_req::MakeTableOptions;
23use crate::proto::make_table_req::make_table_options::MakeTableType;
24use crate::proto::request::ClientReq;
25use crate::proto::response::ClientResp;
26use crate::proto::*;
27use crate::table_data::UpdateData;
28use crate::utils::*;
29use crate::view::View;
30
/// A mapping of column names to their [`ColumnType`], as returned by
/// [`Table::schema`] and carried in [`ExprValidationResult`].
pub type Schema = HashMap<String, ColumnType>;
32
/// The format to interpret data provided to [`Client::table`].
///
/// When serialized, these values are `"csv"`, `"json"`, `"columns"`, `"arrow"`
/// and `"ndjson"`.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, TS)]
pub enum TableReadFormat {
    /// Serialized as `"csv"`.
    #[serde(rename = "csv")]
    Csv,

    /// Serialized as `"json"`.
    #[serde(rename = "json")]
    JsonString,

    /// Serialized as `"columns"`.
    #[serde(rename = "columns")]
    ColumnsString,

    /// Serialized as `"arrow"`.
    #[serde(rename = "arrow")]
    Arrow,

    /// Serialized as `"ndjson"`.
    #[serde(rename = "ndjson")]
    Ndjson,
}
54
55impl TableReadFormat {
56    pub fn parse(value: Option<String>) -> Result<Option<Self>, String> {
57        Ok(match value.as_deref() {
58            Some("csv") => Some(TableReadFormat::Csv),
59            Some("json") => Some(TableReadFormat::JsonString),
60            Some("columns") => Some(TableReadFormat::ColumnsString),
61            Some("arrow") => Some(TableReadFormat::Arrow),
62            Some("ndjson") => Some(TableReadFormat::Ndjson),
63            None => None,
64            Some(x) => return Err(format!("Unknown format \"{x}\"")),
65        })
66    }
67}
68
/// Options which impact the behavior of [`Client::table`], as well as
/// subsequent calls to [`Table::update`].
///
/// Every field is optional and marked `#[serde(default)]`, so a key which is
/// missing from the serialized form deserializes to `None`.
///
/// `index` and `limit` cannot both be set: converting the derived
/// `TableOptions` into `MakeTableOptions` fails with
/// `ClientError::BadTableOptions` in that case.
#[derive(Clone, Debug, Default, Serialize, Deserialize, TS)]
pub struct TableInitOptions {
    /// The user-specified name for the [`Table`]; see also
    /// [`TableInitOptions::set_name`].
    #[serde(default)]
    #[ts(optional)]
    pub name: Option<String>,

    /// The [`TableReadFormat`] in which to interpret the input data.
    #[serde(default)]
    #[ts(optional)]
    pub format: Option<TableReadFormat>,

    /// This [`Table`] should use the column named by the `index` parameter as
    /// the `index`, which causes [`Table::update`] and [`Client::table`] input
    /// to either insert or update existing rows based on `index` column
    /// value equality.
    #[serde(default)]
    #[ts(optional)]
    pub index: Option<String>,

    /// This [`Table`] should be limited to `limit` rows, after which the
    /// _earliest_ rows will be overwritten (where _earliest_ is defined as
    /// relative to insertion order).
    #[serde(default)]
    #[ts(optional)]
    pub limit: Option<u32>,
}
96
97impl TableInitOptions {
98    pub fn set_name<D: Display>(&mut self, name: D) {
99        self.name = Some(format!("{name}"))
100    }
101}
102
103impl TryFrom<TableOptions> for MakeTableOptions {
104    type Error = ClientError;
105
106    fn try_from(value: TableOptions) -> Result<Self, Self::Error> {
107        Ok(MakeTableOptions {
108            make_table_type: match value {
109                TableOptions {
110                    index: Some(_),
111                    limit: Some(_),
112                } => Err(ClientError::BadTableOptions)?,
113                TableOptions {
114                    index: Some(index), ..
115                } => Some(MakeTableType::MakeIndexTable(index)),
116                TableOptions {
117                    limit: Some(limit), ..
118                } => Some(MakeTableType::MakeLimitTable(limit)),
119                _ => None,
120            },
121        })
122    }
123}
124
/// The subset of [`TableInitOptions`] which a [`Table`] keeps after it has
/// been created, and from which `MakeTableOptions` is derived.
///
/// At most one of `index` and `limit` may be set; the conversion to
/// `MakeTableOptions` rejects the combination.
#[derive(Clone, Debug)]
pub(crate) struct TableOptions {
    /// Name of the index column, if any; reported by [`Table::get_index`].
    pub index: Option<String>,
    /// Row limit, if any; reported by [`Table::get_limit`].
    pub limit: Option<u32>,
}
130
131impl From<TableInitOptions> for TableOptions {
132    fn from(value: TableInitOptions) -> Self {
133        TableOptions {
134            index: value.index,
135            limit: value.limit,
136        }
137    }
138}
139
/// Options for [`Client::join`].
///
/// Every field is optional and marked `#[serde(default)]`, so a key which is
/// missing from the serialized form deserializes to `None`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, TS)]
pub struct JoinOptions {
    /// The type of join to perform.
    // NOTE(review): the behavior when this is `None` is decided by
    // `Client::join`, which is not visible in this file - confirm there.
    #[serde(default)]
    #[ts(optional)]
    pub join_type: Option<crate::proto::JoinType>,

    // NOTE(review): presumably the name given to the table produced by the
    // join - confirm against `Client::join`.
    #[serde(default)]
    #[ts(optional)]
    pub name: Option<String>,

    // NOTE(review): presumably the join column on the right-hand table -
    // confirm against `Client::join`.
    #[serde(default)]
    #[ts(optional)]
    pub right_on: Option<String>,
}
155
/// Options for [`Table::delete`].
#[derive(Clone, Debug, Default, Deserialize, TS)]
pub struct DeleteOptions {
    /// Whether to delete the [`Table`] _lazily_. [`Table::delete`] sends the
    /// negation of this flag as the request's `is_immediate` field. The
    /// derived [`Default`] is `false`, i.e. an immediate delete.
    pub lazy: bool,
}
161
/// Options for [`Table::update`].
#[derive(Clone, Debug, Default, Deserialize, Serialize, TS)]
pub struct UpdateOptions {
    /// The port to attribute this update to, as created by
    /// [`Table::make_port`]. [`Table::update`] sends port `0` when this is
    /// `None`.
    pub port_id: Option<u32>,

    /// The [`TableReadFormat`] of the update's input data.
    // NOTE(review): `Table::update` in this file does not read this field;
    // presumably it is consumed by callers which construct the `UpdateData`
    // - confirm.
    pub format: Option<TableReadFormat>,
}
168
/// Result of a call to [`Table::validate_expressions`], containing a schema
/// for valid expressions and error messages for invalid ones.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ExprValidationResult {
    /// The [`ColumnType`] of each valid expression.
    pub expression_schema: Schema,

    /// The validation error reported for each invalid expression.
    // NOTE(review): keys here and in `expression_schema` are presumably the
    // expression names/aliases - confirm against the server.
    pub errors: HashMap<String, table_validate_expr_resp::ExprValidationError>,

    // NOTE(review): presumably maps each expression alias to its expression
    // string - confirm against the server.
    pub expression_alias: HashMap<String, String>,
}
177
/// [`Table`] is Perspective's columnar data frame, analogous to a Pandas/Polars
/// `DataFrame` or Apache Arrow, supporting append & in-place updates, removal
/// by index, and update notifications.
///
/// A [`Table`] contains columns, each of which have a unique name, are strongly
/// and consistently typed, and contains rows of data conforming to the column's
/// type. Each column in a [`Table`] must have the same number of rows, though
/// not every row must contain data; null-values are used to indicate missing
/// values in the dataset. The schema of a [`Table`] is _immutable after
/// creation_, which means the column names and data types cannot be changed
/// after the [`Table`] has been created. Columns cannot be added or deleted
/// after creation either, but a [`View`] can be used to select an arbitrary set
/// of columns from the [`Table`].
#[derive(Clone)]
pub struct Table {
    /// The table's name, sent as the `entity_id` of every request made
    /// through this handle.
    name: String,

    /// The [`Client`] through which all requests for this table are sent.
    client: Client,

    /// The `index`/`limit` options this handle was constructed with.
    options: TableOptions,

    /// If this table is constructed from a View, the view's on_update callback
    /// is wired into this table. So, we store the token to clean it up properly
    /// on destruction.
    pub(crate) view_update_token: Option<u32>,
}
202
// NOTE(review): `assert_table_api!` is defined elsewhere in the crate;
// presumably it is a compile-time check that `Table` exposes the expected
// public API surface - confirm against the macro definition.
assert_table_api!(Table);
204
205impl PartialEq for Table {
206    fn eq(&self, other: &Self) -> bool {
207        self.name == other.name && self.client == other.client
208    }
209}
210
211impl Table {
212    pub(crate) fn new(name: String, client: Client, options: TableOptions) -> Self {
213        Table {
214            name,
215            client,
216            options,
217            view_update_token: None,
218        }
219    }
220
221    fn client_message(&self, req: ClientReq) -> Request {
222        Request {
223            msg_id: self.client.gen_id(),
224            entity_id: self.name.clone(),
225            client_req: Some(req),
226        }
227    }
228
229    /// Get a copy of the [`Client`] this [`Table`] came from.
230    pub fn get_client(&self) -> Client {
231        self.client.clone()
232    }
233
234    /// Get a metadata dictionary of the `perspective_server::Server`'s
235    /// features, which is (currently) implementation specific, but there is
236    /// only one implementation.
237    pub async fn get_features(&self) -> ClientResult<Features> {
238        self.client.get_features().await
239    }
240
241    /// Returns the name of the index column for the table.
242    ///
243    /// # Examples
244    ///
245    /// ```rust
246    /// let options = TableInitOptions {
247    ///     index: Some("x".to_string()),
248    ///     ..default()
249    /// };
250    /// let table = client.table("x,y\n1,2\n3,4", options).await;
251    /// let index = table.get_index()
252    /// ```
253    pub fn get_index(&self) -> Option<String> {
254        self.options.index.as_ref().map(|index| index.to_owned())
255    }
256
257    /// Returns the user-specified row limit for this table.
258    pub fn get_limit(&self) -> Option<u32> {
259        self.options.limit.as_ref().map(|limit| *limit)
260    }
261
262    /// Returns the user-specified name for this table, or the auto-generated
263    /// name if a name was not specified when the table was created.
264    pub fn get_name(&self) -> &str {
265        self.name.as_str()
266    }
267
268    /// Removes all the rows in the [`Table`], but preserves everything else
269    /// including the schema, index, and any callbacks or registered
270    /// [`View`] instances.
271    ///
272    /// Calling [`Table::clear`], like [`Table::update`] and [`Table::remove`],
273    /// will trigger an update event to any registered listeners via
274    /// [`View::on_update`].
275    pub async fn clear(&self) -> ClientResult<()> {
276        self.replace(UpdateData::JsonRows("[]".to_owned())).await
277    }
278
279    /// Delete this [`Table`] and cleans up associated resources.
280    ///
281    /// [`Table`]s do not stop consuming resources or processing updates when
282    /// they are garbage collected in their host language - you must call
283    /// this method to reclaim these.
284    ///
285    /// # Arguments
286    ///
287    /// - `options` An options dictionary.
288    ///     - `lazy` Whether to delete this [`Table`] _lazily_. When false (the
289    ///       default), the delete will occur immediately, assuming it has no
290    ///       [`View`] instances registered to it (which must be deleted first,
291    ///       otherwise this method will throw an error). When true, the
292    ///       [`Table`] will only be marked for deltion once its [`View`]
293    ///       dependency count reaches 0.
294    ///
295    /// # Examples
296    ///
297    /// ```rust
298    /// let opts = TableInitOptions::default();
299    /// let data = TableData::Update(UpdateData::Csv("x,y\n1,2\n3,4".into()));
300    /// let table = client.table(data, opts).await?;
301    ///
302    /// // ...
303    ///
304    /// table.delete(DeleteOptions::default()).await?;
305    /// ```
306    pub async fn delete(&self, options: DeleteOptions) -> ClientResult<()> {
307        let msg = self.client_message(ClientReq::TableDeleteReq(TableDeleteReq {
308            is_immediate: !options.lazy,
309        }));
310
311        match self.client.oneshot(&msg).await? {
312            ClientResp::TableDeleteResp(_) => Ok(()),
313            resp => Err(resp.into()),
314        }
315    }
316
317    /// Returns the column names of this [`Table`] in "natural" order (the
318    /// ordering implied by the input format).
319    ///  
320    /// # Examples
321    ///
322    /// ```rust
323    /// let columns = table.columns().await;
324    /// ```
325    pub async fn columns(&self) -> ClientResult<Vec<String>> {
326        let msg = self.client_message(ClientReq::TableSchemaReq(TableSchemaReq {}));
327        match self.client.oneshot(&msg).await? {
328            ClientResp::TableSchemaResp(TableSchemaResp { schema }) => Ok(schema
329                .map(|x| x.schema.into_iter().map(|x| x.name.to_owned()).collect())
330                .unwrap()),
331            resp => Err(resp.into()),
332        }
333    }
334
335    /// Returns the number of rows in a [`Table`].
336    pub async fn size(&self) -> ClientResult<usize> {
337        let msg = self.client_message(ClientReq::TableSizeReq(TableSizeReq {}));
338        match self.client.oneshot(&msg).await? {
339            ClientResp::TableSizeResp(TableSizeResp { size }) => Ok(size as usize),
340            resp => Err(resp.into()),
341        }
342    }
343
344    /// Returns a table's [`Schema`], a mapping of column names to column types.
345    ///
346    /// The mapping of a [`Table`]'s column names to data types is referred to
347    /// as a [`Schema`]. Each column has a unique name and a data type, one
348    /// of:
349    ///
350    /// - `"boolean"` - A boolean type
351    /// - `"date"` - A timesonze-agnostic date type (month/day/year)
352    /// - `"datetime"` - A millisecond-precision datetime type in the UTC
353    ///   timezone
354    /// - `"float"` - A 64 bit float
355    /// - `"integer"` - A signed 32 bit integer (the integer type supported by
356    ///   JavaScript)
357    /// - `"string"` - A [`String`] data type (encoded internally as a
358    ///   _dictionary_)
359    ///
360    /// Note that all [`Table`] columns are _nullable_, regardless of the data
361    /// type.
362    pub async fn schema(&self) -> ClientResult<Schema> {
363        let msg = self.client_message(ClientReq::TableSchemaReq(TableSchemaReq {}));
364        match self.client.oneshot(&msg).await? {
365            ClientResp::TableSchemaResp(TableSchemaResp { schema }) => Ok(schema
366                .map(|x| {
367                    x.schema
368                        .into_iter()
369                        .map(|x| (x.name, ColumnType::try_from(x.r#type).unwrap()))
370                        .collect()
371                })
372                .unwrap()),
373            resp => Err(resp.into()),
374        }
375    }
376
377    /// Create a unique channel ID on this [`Table`], which allows
378    /// `View::on_update` callback calls to be associated with the
379    /// `Table::update` which caused them.
380    pub async fn make_port(&self) -> ClientResult<i32> {
381        let msg = self.client_message(ClientReq::TableMakePortReq(TableMakePortReq {}));
382        match self.client.oneshot(&msg).await? {
383            ClientResp::TableMakePortResp(TableMakePortResp { port_id }) => Ok(port_id as i32),
384            _ => Err(ClientError::Unknown("make_port".to_string())),
385        }
386    }
387
388    /// Register a callback which is called exactly once, when this [`Table`] is
389    /// deleted with the [`Table::delete`] method.
390    ///
391    /// [`Table::on_delete`] resolves when the subscription message is sent, not
392    /// when the _delete_ event occurs.
393    pub async fn on_delete(
394        &self,
395        on_delete: Box<dyn Fn() + Send + Sync + 'static>,
396    ) -> ClientResult<u32> {
397        let callback = move |resp: Response| match resp.client_resp {
398            Some(ClientResp::TableOnDeleteResp(_)) => {
399                on_delete();
400                Ok(())
401            },
402            resp => Err(resp.into()),
403        };
404
405        let msg = self.client_message(ClientReq::TableOnDeleteReq(TableOnDeleteReq {}));
406        self.client.subscribe_once(&msg, Box::new(callback)).await?;
407        Ok(msg.msg_id)
408    }
409
410    /// Removes a listener with a given ID, as returned by a previous call to
411    /// [`Table::on_delete`].
412    pub async fn remove_delete(&self, callback_id: u32) -> ClientResult<()> {
413        let msg = self.client_message(ClientReq::TableRemoveDeleteReq(TableRemoveDeleteReq {
414            id: callback_id,
415        }));
416
417        match self.client.oneshot(&msg).await? {
418            ClientResp::TableRemoveDeleteResp(_) => Ok(()),
419            resp => Err(resp.into()),
420        }
421    }
422
423    /// Removes rows from this [`Table`] with the `index` column values
424    /// supplied.
425    ///
426    /// # Arguments
427    ///
428    /// - `indices` - A list of `index` column values for rows that should be
429    ///   removed.
430    ///
431    /// # Examples
432    ///
433    /// ```rust
434    /// table.remove(UpdateData::Csv("index\n1\n2\n3")).await?;
435    /// ```
436    pub async fn remove(&self, input: UpdateData) -> ClientResult<()> {
437        let msg = self.client_message(ClientReq::TableRemoveReq(TableRemoveReq {
438            data: Some(input.into()),
439        }));
440
441        match self.client.oneshot(&msg).await? {
442            ClientResp::TableRemoveResp(_) => Ok(()),
443            resp => Err(resp.into()),
444        }
445    }
446
447    /// Replace all rows in this [`Table`] with the input data, coerced to this
448    /// [`Table`]'s existing [`Schema`], notifying any derived [`View`] and
449    /// [`View::on_update`] callbacks.
450    ///
451    /// Calling [`Table::replace`] is an easy way to replace _all_ the data in a
452    /// [`Table`] without losing any derived [`View`] instances or
453    /// [`View::on_update`] callbacks. [`Table::replace`] does _not_ infer
454    /// data types like [`Client::table`] does, rather it _coerces_ input
455    /// data to the `Schema` like [`Table::update`]. If you need a [`Table`]
456    /// with a different `Schema`, you must create a new one.
457    ///
458    /// # Examples
459    ///
460    /// ```rust
461    /// let data = UpdateData::Csv("x,y\n1,2".into());
462    /// let opts = UpdateOptions::default();
463    /// table.replace(data, opts).await?;
464    /// ```
465    pub async fn replace(&self, input: UpdateData) -> ClientResult<()> {
466        let msg = self.client_message(ClientReq::TableReplaceReq(TableReplaceReq {
467            data: Some(input.into()),
468        }));
469
470        match self.client.oneshot(&msg).await? {
471            ClientResp::TableReplaceResp(_) => Ok(()),
472            resp => Err(resp.into()),
473        }
474    }
475
476    /// Updates the rows of this table and any derived [`View`] instances.
477    ///
478    /// Calling [`Table::update`] will trigger the [`View::on_update`] callbacks
479    /// register to derived [`View`], and the call itself will not resolve until
480    /// _all_ derived [`View`]'s are notified.
481    ///
482    /// When updating a [`Table`] with an `index`, [`Table::update`] supports
483    /// partial updates, by omitting columns from the update data.
484    ///
485    /// # Arguments
486    ///
487    /// - `input` - The input data for this [`Table`]. The schema of a [`Table`]
488    ///   is immutable after creation, so this method cannot be called with a
489    ///   schema.
490    /// - `options` - Options for this update step - see [`UpdateOptions`].
491    ///
492    /// # Examples
493    ///
494    /// ```rust
495    /// let data = UpdateData::Csv("x,y\n1,2".into());
496    /// let opts = UpdateOptions::default();
497    /// table.update(data, opts).await?;
498    /// ```  
499    pub async fn update(&self, input: UpdateData, options: UpdateOptions) -> ClientResult<()> {
500        let msg = self.client_message(ClientReq::TableUpdateReq(TableUpdateReq {
501            data: Some(input.into()),
502            port_id: options.port_id.unwrap_or(0),
503        }));
504
505        match self.client.oneshot(&msg).await? {
506            ClientResp::TableUpdateResp(_) => Ok(()),
507            resp => Err(resp.into()),
508        }
509    }
510
511    /// Validates the given expressions.
512    pub async fn validate_expressions(
513        &self,
514        expressions: Expressions,
515    ) -> ClientResult<ExprValidationResult> {
516        let msg = self.client_message(ClientReq::TableValidateExprReq(TableValidateExprReq {
517            column_to_expr: expressions.0,
518        }));
519
520        match self.client.oneshot(&msg).await? {
521            ClientResp::TableValidateExprResp(result) => Ok(ExprValidationResult {
522                errors: result.errors,
523                expression_alias: result.expression_alias,
524                expression_schema: result
525                    .expression_schema
526                    .into_iter()
527                    .map(|(x, y)| (x, ColumnType::try_from(y).unwrap()))
528                    .collect(),
529            }),
530            resp => Err(resp.into()),
531        }
532    }
533
534    /// Create a new [`View`] from this table with a specified
535    /// [`ViewConfigUpdate`].
536    ///
537    /// See [`View`] struct.
538    ///
539    /// # Examples
540    ///
541    /// ```rust
542    /// use crate::config::*;
543    /// let view = table
544    ///     .view(Some(ViewConfigUpdate {
545    ///         columns: Some(vec![Some("Sales".into())]),
546    ///         aggregates: Some(HashMap::from_iter(vec![("Sales".into(), "sum".into())])),
547    ///         group_by: Some(vec!["Region".into(), "Country".into()]),
548    ///         filter: Some(vec![Filter::new("Category", "in", &[
549    ///             "Furniture",
550    ///             "Technology",
551    ///         ])]),
552    ///         ..ViewConfigUpdate::default()
553    ///     }))
554    ///     .await?;
555    /// ```
556    pub async fn view(&self, config: Option<ViewConfigUpdate>) -> ClientResult<View> {
557        let view_name = randid();
558        let msg = Request {
559            msg_id: self.client.gen_id(),
560            entity_id: self.name.clone(),
561            client_req: ClientReq::TableMakeViewReq(TableMakeViewReq {
562                view_id: view_name.clone(),
563                config: config.map(|x| x.into()),
564            })
565            .into(),
566        };
567
568        match self.client.oneshot(&msg).await? {
569            ClientResp::TableMakeViewResp(TableMakeViewResp { view_id })
570                if view_id == view_name =>
571            {
572                Ok(View::new(view_name, self.client.clone()))
573            },
574            resp => Err(resp.into()),
575        }
576    }
577}