perspective_client/table.rs
1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████ █ █ █ █ █ █▄ ▀███ █ ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█ ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄ ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄ █ ▄▄▄▄▄ ┃
5// ┃ █ ██████ █ ▀█▄ █ ██████ █ ███▌▐███ ███████▄ █ ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors. ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::collections::HashMap;
14use std::fmt::Display;
15
16use serde::{Deserialize, Serialize};
17use ts_rs::TS;
18
19use crate::assert_table_api;
20use crate::client::{Client, Features};
21use crate::config::{Expressions, ViewConfigUpdate};
22use crate::proto::make_table_req::MakeTableOptions;
23use crate::proto::make_table_req::make_table_options::MakeTableType;
24use crate::proto::request::ClientReq;
25use crate::proto::response::ClientResp;
26use crate::proto::*;
27use crate::table_data::UpdateData;
28use crate::utils::*;
29use crate::view::View;
30
/// A mapping of column names to their [`ColumnType`], as returned by
/// [`Table::schema`].
pub type Schema = HashMap<String, ColumnType>;
32
/// The format to interpret data provided to [`Client::table`].
///
/// When serialized, these values are `"csv"`, `"json"`, `"columns"`, `"arrow"`
/// and `"ndjson"`.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, TS)]
pub enum TableReadFormat {
    /// Serialized as `"csv"`.
    #[serde(rename = "csv")]
    Csv,

    /// Serialized as `"json"`.
    #[serde(rename = "json")]
    JsonString,

    /// Serialized as `"columns"`.
    #[serde(rename = "columns")]
    ColumnsString,

    /// Serialized as `"arrow"`.
    #[serde(rename = "arrow")]
    Arrow,

    /// Serialized as `"ndjson"`.
    #[serde(rename = "ndjson")]
    Ndjson,
}
54
55impl TableReadFormat {
56 pub fn parse(value: Option<String>) -> Result<Option<Self>, String> {
57 Ok(match value.as_deref() {
58 Some("csv") => Some(TableReadFormat::Csv),
59 Some("json") => Some(TableReadFormat::JsonString),
60 Some("columns") => Some(TableReadFormat::ColumnsString),
61 Some("arrow") => Some(TableReadFormat::Arrow),
62 Some("ndjson") => Some(TableReadFormat::Ndjson),
63 None => None,
64 Some(x) => return Err(format!("Unknown format \"{x}\"")),
65 })
66 }
67}
68
/// Options which impact the behavior of [`Client::table`], as well as
/// subsequent calls to [`Table::update`].
///
/// Every field is optional; fields missing from deserialized input default
/// to `None`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, TS)]
pub struct TableInitOptions {
    /// The name to give the new [`Table`], if any.
    ///
    /// NOTE(review): when `None`, a name is presumably auto-generated (see
    /// [`Table::get_name`]) - confirm against [`Client::table`].
    #[serde(default)]
    #[ts(optional)]
    pub name: Option<String>,

    /// The [`TableReadFormat`] the input data should be interpreted as, if
    /// specified.
    #[serde(default)]
    #[ts(optional)]
    pub format: Option<TableReadFormat>,

    /// This [`Table`] should use the column named by the `index` parameter as
    /// the `index`, which causes [`Table::update`] and [`Client::table`] input
    /// to either insert or update existing rows based on `index` column
    /// value equality.
    #[serde(default)]
    #[ts(optional)]
    pub index: Option<String>,

    /// This [`Table`] should be limited to `limit` rows, after which the
    /// _earliest_ rows will be overwritten (where _earliest_ is defined as
    /// relative to insertion order).
    #[serde(default)]
    #[ts(optional)]
    pub limit: Option<u32>,
}
96
97impl TableInitOptions {
98 pub fn set_name<D: Display>(&mut self, name: D) {
99 self.name = Some(format!("{name}"))
100 }
101}
102
103impl TryFrom<TableOptions> for MakeTableOptions {
104 type Error = ClientError;
105
106 fn try_from(value: TableOptions) -> Result<Self, Self::Error> {
107 Ok(MakeTableOptions {
108 make_table_type: match value {
109 TableOptions {
110 index: Some(_),
111 limit: Some(_),
112 } => Err(ClientError::BadTableOptions)?,
113 TableOptions {
114 index: Some(index), ..
115 } => Some(MakeTableType::MakeIndexTable(index)),
116 TableOptions {
117 limit: Some(limit), ..
118 } => Some(MakeTableType::MakeLimitTable(limit)),
119 _ => None,
120 },
121 })
122 }
123}
124
/// The subset of [`TableInitOptions`] retained by a [`Table`] after it has
/// been created: its `index` column and row `limit`.
#[derive(Clone, Debug)]
pub(crate) struct TableOptions {
    /// Name of the column used as the table's `index`, if any.
    pub index: Option<String>,
    /// Maximum number of rows for the table, if any.
    pub limit: Option<u32>,
}
130
131impl From<TableInitOptions> for TableOptions {
132 fn from(value: TableInitOptions) -> Self {
133 TableOptions {
134 index: value.index,
135 limit: value.limit,
136 }
137 }
138}
139
/// Options for [`Client::join`].
///
/// Every field is optional; fields missing from deserialized input default
/// to `None`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, TS)]
pub struct JoinOptions {
    /// The kind of join to perform, if specified.
    #[serde(default)]
    #[ts(optional)]
    pub join_type: Option<crate::proto::JoinType>,

    /// NOTE(review): presumably the name of the resulting joined table -
    /// confirm against [`Client::join`].
    #[serde(default)]
    #[ts(optional)]
    pub name: Option<String>,

    /// NOTE(review): presumably the join column on the right-hand table when
    /// it differs from the left-hand one - confirm against [`Client::join`].
    #[serde(default)]
    #[ts(optional)]
    pub right_on: Option<String>,
}
155
/// Options for [`Table::delete`].
#[derive(Clone, Debug, Default, Deserialize, TS)]
pub struct DeleteOptions {
    /// Whether to delete the [`Table`] _lazily_. [`Table::delete`] requests
    /// an immediate delete exactly when this is `false`, which is the
    /// [`Default`] value.
    pub lazy: bool,
}
161
/// Options for [`Table::update`].
#[derive(Clone, Debug, Default, Deserialize, Serialize, TS)]
pub struct UpdateOptions {
    /// The port (see [`Table::make_port`]) this update is associated with.
    /// [`Table::update`] uses port `0` when this is `None`.
    pub port_id: Option<u32>,

    /// The [`TableReadFormat`] of the update data, if specified.
    ///
    /// NOTE(review): [`Table::update`] itself does not read this field -
    /// presumably callers use it when building the [`UpdateData`]; confirm.
    pub format: Option<TableReadFormat>,
}
168
/// Result of a call to [`Table::validate_expressions`], containing a schema
/// for valid expressions and error messages for invalid ones.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ExprValidationResult {
    /// The [`ColumnType`] of each valid expression.
    pub expression_schema: Schema,

    /// The validation error reported for each invalid expression.
    pub errors: HashMap<String, table_validate_expr_resp::ExprValidationError>,

    /// The `expression_alias` map copied verbatim from the server response.
    ///
    /// NOTE(review): presumably maps each expression's alias to its
    /// expression text - confirm against the protocol definition.
    pub expression_alias: HashMap<String, String>,
}
177
/// [`Table`] is Perspective's columnar data frame, analogous to a Pandas/Polars
/// `DataFrame` or Apache Arrow, supporting append & in-place updates, removal
/// by index, and update notifications.
///
/// A [`Table`] contains columns, each of which have a unique name, are strongly
/// and consistently typed, and contains rows of data conforming to the column's
/// type. Each column in a [`Table`] must have the same number of rows, though
/// not every row must contain data; null-values are used to indicate missing
/// values in the dataset. The schema of a [`Table`] is _immutable after
/// creation_, which means the column names and data types cannot be changed
/// after the [`Table`] has been created. Columns cannot be added or deleted
/// after creation either, but a [`View`] can be used to select an arbitrary set
/// of columns from the [`Table`].
#[derive(Clone)]
pub struct Table {
    /// The table's name, sent as the `entity_id` of every request this
    /// handle makes.
    name: String,

    /// The [`Client`] through which all requests for this table are sent.
    client: Client,

    /// The `index` and `limit` options exposed via [`Table::get_index`] and
    /// [`Table::get_limit`].
    options: TableOptions,

    /// If this table is constructed from a View, the view's on_update callback
    /// is wired into this table. So, we store the token to clean it up properly
    /// on destruction.
    pub(crate) view_update_token: Option<u32>,
}
202
// NOTE(review): presumably a compile-time check that `Table` exposes the
// expected public API surface - confirm against the `assert_table_api!`
// macro definition.
assert_table_api!(Table);
204
205impl PartialEq for Table {
206 fn eq(&self, other: &Self) -> bool {
207 self.name == other.name && self.client == other.client
208 }
209}
210
211impl Table {
212 pub(crate) fn new(name: String, client: Client, options: TableOptions) -> Self {
213 Table {
214 name,
215 client,
216 options,
217 view_update_token: None,
218 }
219 }
220
221 fn client_message(&self, req: ClientReq) -> Request {
222 Request {
223 msg_id: self.client.gen_id(),
224 entity_id: self.name.clone(),
225 client_req: Some(req),
226 }
227 }
228
229 /// Get a copy of the [`Client`] this [`Table`] came from.
230 pub fn get_client(&self) -> Client {
231 self.client.clone()
232 }
233
234 /// Get a metadata dictionary of the `perspective_server::Server`'s
235 /// features, which is (currently) implementation specific, but there is
236 /// only one implementation.
237 pub async fn get_features(&self) -> ClientResult<Features> {
238 self.client.get_features().await
239 }
240
241 /// Returns the name of the index column for the table.
242 ///
243 /// # Examples
244 ///
245 /// ```no_run
246 /// # use perspective_client::{Client, TableData, TableInitOptions, UpdateData};
247 /// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
248 /// # let client: Client = todo!();
249 /// let options = TableInitOptions {
250 /// index: Some("x".to_string()),
251 /// ..TableInitOptions::default()
252 /// };
253 /// let data = TableData::Update(UpdateData::Csv("x,y\n1,2\n3,4".into()));
254 /// let table = client.table(data, options).await?;
255 /// let index = table.get_index();
256 /// # Ok(()) }
257 /// ```
258 pub fn get_index(&self) -> Option<String> {
259 self.options.index.as_ref().map(|index| index.to_owned())
260 }
261
262 /// Returns the user-specified row limit for this table.
263 pub fn get_limit(&self) -> Option<u32> {
264 self.options.limit.as_ref().map(|limit| *limit)
265 }
266
267 /// Returns the user-specified name for this table, or the auto-generated
268 /// name if a name was not specified when the table was created.
269 pub fn get_name(&self) -> &str {
270 self.name.as_str()
271 }
272
273 /// Removes all the rows in the [`Table`], but preserves everything else
274 /// including the schema, index, and any callbacks or registered
275 /// [`View`] instances.
276 ///
277 /// Calling [`Table::clear`], like [`Table::update`] and [`Table::remove`],
278 /// will trigger an update event to any registered listeners via
279 /// [`View::on_update`].
280 pub async fn clear(&self) -> ClientResult<()> {
281 self.replace(UpdateData::JsonRows("[]".to_owned())).await
282 }
283
284 /// Delete this [`Table`] and cleans up associated resources.
285 ///
286 /// [`Table`]s do not stop consuming resources or processing updates when
287 /// they are garbage collected in their host language - you must call
288 /// this method to reclaim these.
289 ///
290 /// # Arguments
291 ///
292 /// - `options` An options dictionary.
293 /// - `lazy` Whether to delete this [`Table`] _lazily_. When false (the
294 /// default), the delete will occur immediately, assuming it has no
295 /// [`View`] instances registered to it (which must be deleted first,
296 /// otherwise this method will throw an error). When true, the
297 /// [`Table`] will only be marked for deltion once its [`View`]
298 /// dependency count reaches 0.
299 ///
300 /// # Examples
301 ///
302 /// ```no_run
303 /// # use perspective_client::{Client, DeleteOptions, TableData, TableInitOptions, UpdateData};
304 /// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
305 /// # let client: Client = todo!();
306 /// let opts = TableInitOptions::default();
307 /// let data = TableData::Update(UpdateData::Csv("x,y\n1,2\n3,4".into()));
308 /// let table = client.table(data, opts).await?;
309 ///
310 /// // ...
311 ///
312 /// table.delete(DeleteOptions::default()).await?;
313 /// # Ok(()) }
314 /// ```
315 pub async fn delete(&self, options: DeleteOptions) -> ClientResult<()> {
316 let msg = self.client_message(ClientReq::TableDeleteReq(TableDeleteReq {
317 is_immediate: !options.lazy,
318 }));
319
320 match self.client.oneshot(&msg).await? {
321 ClientResp::TableDeleteResp(_) => Ok(()),
322 resp => Err(resp.into()),
323 }
324 }
325
326 /// Returns the column names of this [`Table`] in "natural" order (the
327 /// ordering implied by the input format).
328 ///
329 /// # Examples
330 ///
331 /// ```no_run
332 /// # use perspective_client::Table;
333 /// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
334 /// # let table: Table = todo!();
335 /// let columns = table.columns().await?;
336 /// # Ok(()) }
337 /// ```
338 pub async fn columns(&self) -> ClientResult<Vec<String>> {
339 let msg = self.client_message(ClientReq::TableSchemaReq(TableSchemaReq {}));
340 match self.client.oneshot(&msg).await? {
341 ClientResp::TableSchemaResp(TableSchemaResp { schema }) => Ok(schema
342 .map(|x| x.schema.into_iter().map(|x| x.name.to_owned()).collect())
343 .unwrap()),
344 resp => Err(resp.into()),
345 }
346 }
347
348 /// Returns the number of rows in a [`Table`].
349 pub async fn size(&self) -> ClientResult<usize> {
350 let msg = self.client_message(ClientReq::TableSizeReq(TableSizeReq {}));
351 match self.client.oneshot(&msg).await? {
352 ClientResp::TableSizeResp(TableSizeResp { size }) => Ok(size as usize),
353 resp => Err(resp.into()),
354 }
355 }
356
357 /// Returns a table's [`Schema`], a mapping of column names to column types.
358 ///
359 /// The mapping of a [`Table`]'s column names to data types is referred to
360 /// as a [`Schema`]. Each column has a unique name and a data type, one
361 /// of:
362 ///
363 /// - `"boolean"` - A boolean type
364 /// - `"date"` - A timesonze-agnostic date type (month/day/year)
365 /// - `"datetime"` - A millisecond-precision datetime type in the UTC
366 /// timezone
367 /// - `"float"` - A 64 bit float
368 /// - `"integer"` - A signed 32 bit integer (the integer type supported by
369 /// JavaScript)
370 /// - `"string"` - A [`String`] data type (encoded internally as a
371 /// _dictionary_)
372 ///
373 /// Note that all [`Table`] columns are _nullable_, regardless of the data
374 /// type.
375 pub async fn schema(&self) -> ClientResult<Schema> {
376 let msg = self.client_message(ClientReq::TableSchemaReq(TableSchemaReq {}));
377 match self.client.oneshot(&msg).await? {
378 ClientResp::TableSchemaResp(TableSchemaResp { schema }) => Ok(schema
379 .map(|x| {
380 x.schema
381 .into_iter()
382 .map(|x| (x.name, ColumnType::try_from(x.r#type).unwrap()))
383 .collect()
384 })
385 .unwrap()),
386 resp => Err(resp.into()),
387 }
388 }
389
390 /// Create a unique channel ID on this [`Table`], which allows
391 /// `View::on_update` callback calls to be associated with the
392 /// `Table::update` which caused them.
393 pub async fn make_port(&self) -> ClientResult<i32> {
394 let msg = self.client_message(ClientReq::TableMakePortReq(TableMakePortReq {}));
395 match self.client.oneshot(&msg).await? {
396 ClientResp::TableMakePortResp(TableMakePortResp { port_id }) => Ok(port_id as i32),
397 _ => Err(ClientError::Unknown("make_port".to_string())),
398 }
399 }
400
401 /// Register a callback which is called exactly once, when this [`Table`] is
402 /// deleted with the [`Table::delete`] method.
403 ///
404 /// [`Table::on_delete`] resolves when the subscription message is sent, not
405 /// when the _delete_ event occurs.
406 pub async fn on_delete(
407 &self,
408 on_delete: Box<dyn Fn() + Send + Sync + 'static>,
409 ) -> ClientResult<u32> {
410 let callback = move |resp: Response| match resp.client_resp {
411 Some(ClientResp::TableOnDeleteResp(_)) => {
412 on_delete();
413 Ok(())
414 },
415 resp => Err(resp.into()),
416 };
417
418 let msg = self.client_message(ClientReq::TableOnDeleteReq(TableOnDeleteReq {}));
419 self.client.subscribe_once(&msg, Box::new(callback)).await?;
420 Ok(msg.msg_id)
421 }
422
423 /// Removes a listener with a given ID, as returned by a previous call to
424 /// [`Table::on_delete`].
425 pub async fn remove_delete(&self, callback_id: u32) -> ClientResult<()> {
426 let msg = self.client_message(ClientReq::TableRemoveDeleteReq(TableRemoveDeleteReq {
427 id: callback_id,
428 }));
429
430 match self.client.oneshot(&msg).await? {
431 ClientResp::TableRemoveDeleteResp(_) => Ok(()),
432 resp => Err(resp.into()),
433 }
434 }
435
436 /// Removes rows from this [`Table`] with the `index` column values
437 /// supplied.
438 ///
439 /// # Arguments
440 ///
441 /// - `indices` - A list of `index` column values for rows that should be
442 /// removed.
443 ///
444 /// # Examples
445 ///
446 /// ```no_run
447 /// # use perspective_client::{Table, UpdateData};
448 /// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
449 /// # let table: Table = todo!();
450 /// table
451 /// .remove(UpdateData::Csv("index\n1\n2\n3".into()))
452 /// .await?;
453 /// # Ok(()) }
454 /// ```
455 pub async fn remove(&self, input: UpdateData) -> ClientResult<()> {
456 let msg = self.client_message(ClientReq::TableRemoveReq(TableRemoveReq {
457 data: Some(input.into()),
458 }));
459
460 match self.client.oneshot(&msg).await? {
461 ClientResp::TableRemoveResp(_) => Ok(()),
462 resp => Err(resp.into()),
463 }
464 }
465
466 /// Replace all rows in this [`Table`] with the input data, coerced to this
467 /// [`Table`]'s existing [`Schema`], notifying any derived [`View`] and
468 /// [`View::on_update`] callbacks.
469 ///
470 /// Calling [`Table::replace`] is an easy way to replace _all_ the data in a
471 /// [`Table`] without losing any derived [`View`] instances or
472 /// [`View::on_update`] callbacks. [`Table::replace`] does _not_ infer
473 /// data types like [`Client::table`] does, rather it _coerces_ input
474 /// data to the `Schema` like [`Table::update`]. If you need a [`Table`]
475 /// with a different `Schema`, you must create a new one.
476 ///
477 /// # Examples
478 ///
479 /// ```no_run
480 /// # use perspective_client::{Table, UpdateData};
481 /// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
482 /// # let table: Table = todo!();
483 /// let data = UpdateData::Csv("x,y\n1,2".into());
484 /// table.replace(data).await?;
485 /// # Ok(()) }
486 /// ```
487 pub async fn replace(&self, input: UpdateData) -> ClientResult<()> {
488 let msg = self.client_message(ClientReq::TableReplaceReq(TableReplaceReq {
489 data: Some(input.into()),
490 }));
491
492 match self.client.oneshot(&msg).await? {
493 ClientResp::TableReplaceResp(_) => Ok(()),
494 resp => Err(resp.into()),
495 }
496 }
497
498 /// Updates the rows of this table and any derived [`View`] instances.
499 ///
500 /// Calling [`Table::update`] will trigger the [`View::on_update`] callbacks
501 /// register to derived [`View`], and the call itself will not resolve until
502 /// _all_ derived [`View`]'s are notified.
503 ///
504 /// When updating a [`Table`] with an `index`, [`Table::update`] supports
505 /// partial updates, by omitting columns from the update data.
506 ///
507 /// # Arguments
508 ///
509 /// - `input` - The input data for this [`Table`]. The schema of a [`Table`]
510 /// is immutable after creation, so this method cannot be called with a
511 /// schema.
512 /// - `options` - Options for this update step - see [`UpdateOptions`].
513 ///
514 /// # Examples
515 ///
516 /// ```no_run
517 /// # use perspective_client::{Table, UpdateData, UpdateOptions};
518 /// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
519 /// # let table: Table = todo!();
520 /// let data = UpdateData::Csv("x,y\n1,2".into());
521 /// let opts = UpdateOptions::default();
522 /// table.update(data, opts).await?;
523 /// # Ok(()) }
524 /// ```
525 pub async fn update(&self, input: UpdateData, options: UpdateOptions) -> ClientResult<()> {
526 let msg = self.client_message(ClientReq::TableUpdateReq(TableUpdateReq {
527 data: Some(input.into()),
528 port_id: options.port_id.unwrap_or(0),
529 }));
530
531 match self.client.oneshot(&msg).await? {
532 ClientResp::TableUpdateResp(_) => Ok(()),
533 resp => Err(resp.into()),
534 }
535 }
536
537 /// Validates the given expressions.
538 pub async fn validate_expressions(
539 &self,
540 expressions: Expressions,
541 ) -> ClientResult<ExprValidationResult> {
542 let msg = self.client_message(ClientReq::TableValidateExprReq(TableValidateExprReq {
543 column_to_expr: expressions.0,
544 }));
545
546 match self.client.oneshot(&msg).await? {
547 ClientResp::TableValidateExprResp(result) => Ok(ExprValidationResult {
548 errors: result.errors,
549 expression_alias: result.expression_alias,
550 expression_schema: result
551 .expression_schema
552 .into_iter()
553 .map(|(x, y)| (x, ColumnType::try_from(y).unwrap()))
554 .collect(),
555 }),
556 resp => Err(resp.into()),
557 }
558 }
559
560 /// Create a new [`View`] from this table with a specified
561 /// [`ViewConfigUpdate`].
562 ///
563 /// See [`View`] struct.
564 ///
565 /// # Examples
566 ///
567 /// ```no_run
568 /// # use std::collections::HashMap;
569 /// # use perspective_client::Table;
570 /// # use perspective_client::config::*;
571 /// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
572 /// # let table: Table = todo!();
573 /// let view = table
574 /// .view(Some(ViewConfigUpdate {
575 /// columns: Some(vec![Some("Sales".into())]),
576 /// aggregates: Some(HashMap::from_iter(vec![("Sales".into(), "sum".into())])),
577 /// group_by: Some(vec!["Region".into(), "Country".into()]),
578 /// filter: Some(vec![Filter::new("Category", "in", &[
579 /// "Furniture",
580 /// "Technology",
581 /// ])]),
582 /// ..ViewConfigUpdate::default()
583 /// }))
584 /// .await?;
585 /// # Ok(()) }
586 /// ```
587 pub async fn view(&self, config: Option<ViewConfigUpdate>) -> ClientResult<View> {
588 let view_name = randid();
589 let msg = Request {
590 msg_id: self.client.gen_id(),
591 entity_id: self.name.clone(),
592 client_req: ClientReq::TableMakeViewReq(TableMakeViewReq {
593 view_id: view_name.clone(),
594 config: config.map(|x| x.into()),
595 })
596 .into(),
597 };
598
599 match self.client.oneshot(&msg).await? {
600 ClientResp::TableMakeViewResp(TableMakeViewResp { view_id })
601 if view_id == view_name =>
602 {
603 Ok(View::new(view_name, self.client.clone()))
604 },
605 resp => Err(resp.into()),
606 }
607 }
608}