perspective_client/
view.rs

1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████       █      █      █      █      █ █▄  ▀███ █       ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█  ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄  ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄   █ ▄▄▄▄▄ ┃
5// ┃ █      ██████ █  ▀█▄       █ ██████      █      ███▌▐███ ███████▄ █       ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors.                              ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::collections::HashMap;
14use std::str::FromStr;
15use std::sync::Arc;
16
17use futures::Future;
18use prost::bytes::Bytes;
19use serde::{Deserialize, Serialize};
20use ts_rs::TS;
21
22use self::view_on_update_req::Mode;
23use crate::assert_view_api;
24use crate::client::Client;
25use crate::proto::request::ClientReq;
26use crate::proto::response::ClientResp;
27use crate::proto::*;
28#[cfg(doc)]
29use crate::table::Table;
30pub use crate::utils::*;
31
32#[derive(Default, Debug, Deserialize, TS)]
33pub struct OnUpdateOptions {
34    pub mode: Option<OnUpdateMode>,
35}
36
37#[derive(Default, Debug, Deserialize, TS)]
38pub enum OnUpdateMode {
39    #[default]
40    #[serde(rename = "row")]
41    Row,
42}
43
44impl FromStr for OnUpdateMode {
45    type Err = ClientError;
46
47    fn from_str(s: &str) -> Result<Self, Self::Err> {
48        if s == "row" {
49            Ok(OnUpdateMode::Row)
50        } else {
51            Err(ClientError::Option)
52        }
53    }
54}
55
56#[derive(Clone, Debug, Serialize)]
57pub struct Dimensions {
58    pub num_view_rows: usize,
59    pub num_view_columns: usize,
60    pub num_table_rows: usize,
61    pub num_table_columns: usize,
62}
63
64#[derive(Clone, Debug, Default, Deserialize, Serialize, TS, PartialEq)]
65pub struct ViewWindow {
66    #[serde(skip_serializing_if = "Option::is_none")]
67    pub start_row: Option<f32>,
68
69    #[serde(skip_serializing_if = "Option::is_none")]
70    pub start_col: Option<f32>,
71
72    #[serde(skip_serializing_if = "Option::is_none")]
73    pub end_row: Option<f32>,
74
75    #[serde(skip_serializing_if = "Option::is_none")]
76    pub end_col: Option<f32>,
77
78    #[serde(skip_serializing_if = "Option::is_none")]
79    pub id: Option<bool>,
80
81    #[serde(skip_serializing_if = "Option::is_none")]
82    pub index: Option<bool>,
83
84    #[serde(skip_serializing_if = "Option::is_none")]
85    pub leaves_only: Option<bool>,
86
87    #[serde(skip_serializing_if = "Option::is_none")]
88    pub formatted: Option<bool>,
89
90    #[serde(skip_serializing_if = "Option::is_none")]
91    pub compression: Option<String>,
92}
93
94impl From<ViewWindow> for ViewPort {
95    fn from(window: ViewWindow) -> Self {
96        ViewPort {
97            start_row: window.start_row.map(|x| x.floor() as u32),
98            start_col: window.start_col.map(|x| x.floor() as u32),
99            end_row: window.end_row.map(|x| x.ceil() as u32),
100            end_col: window.end_col.map(|x| x.ceil() as u32),
101        }
102    }
103}
104
105/// The [`View`] struct is Perspective's query and serialization interface. It
106/// represents a query on the `Table`'s dataset and is always created from an
107/// existing `Table` instance via the [`Table::view`] method.
108///
109/// [`View`]s are immutable with respect to the arguments provided to the
110/// [`Table::view`] method; to change these parameters, you must create a new
111/// [`View`] on the same [`Table`]. However, each [`View`] is _live_ with
112/// respect to the [`Table`]'s data, and will (within a conflation window)
113/// update with the latest state as its parent [`Table`] updates, including
114/// incrementally recalculating all aggregates, pivots, filters, etc. [`View`]
115/// query parameters are composable, in that each parameter works independently
116/// _and_ in conjunction with each other, and there is no limit to the number of
117/// pivots, filters, etc. which can be applied.
118///
119/// To construct a [`View`], call the [`Table::view`] factory method. A
120/// [`Table`] can have as many [`View`]s associated with it as you need -
121/// Perspective conserves memory by relying on a single [`Table`] to power
122/// multiple [`View`]s concurrently.
123///
124/// # Examples
125///
126/// ```rust
127/// let opts = TableInitOptions::default();
128/// let data = TableData::Update(UpdateData::Csv("x,y\n1,2\n3,4".into()));
129/// let table = client.table(data, opts).await?;
130///
131/// let view = table.view(None).await?;
132/// let arrow = view.to_arrow().await?;
133/// view.delete().await?;
134/// ```
135///
136/// ```rust
137/// use crate::config::*;
138/// let view = table
139///     .view(Some(ViewConfigUpdate {
140///         columns: Some(vec![Some("Sales".into())]),
141///         aggregates: Some(HashMap::from_iter(vec![("Sales".into(), "sum".into())])),
142///         group_by: Some(vec!["Region".into(), "Country".into()]),
143///         filter: Some(vec![Filter::new("Category", "in", &[
144///             "Furniture",
145///             "Technology",
146///         ])]),
147///         ..ViewConfigUpdate::default()
148///     }))
149///     .await?;
150/// ```
151///
152///  Group By
153///
154/// ```rust
155/// let view = table
156///     .view(Some(ViewConfigUpdate {
157///         group_by: Some(vec!["a".into(), "c".into()]),
158///         ..ViewConfigUpdate::default()
159///     }))
160///     .await?;
161/// ```
162///
163/// Split By
164///
165/// ```rust
166/// let view = table
167///     .view(Some(ViewConfigUpdate {
168///         split_by: Some(vec!["a".into(), "c".into()]),
169///         ..ViewConfigUpdate::default()
170///     }))
171///     .await?;
172/// ```
173///
/// In Javascript, a [`Table`] can be constructed from a [`View`] instance,
/// which will return a new [`Table`] based on the [`View`]'s dataset, and all
/// future updates that affect the [`View`] will be forwarded to the new
/// [`Table`]. This is particularly useful for implementing a
/// [Client/Server Replicated](server.md#clientserver-replicated) design, by
/// serializing the `View` to an arrow and setting up an `on_update` callback.
180///
181/// ```rust
182/// let opts = TableInitOptions::default();
183/// let data = TableData::Update(UpdateData::Csv("x,y\n1,2\n3,4".into()));
184/// let table = client.table(data, opts).await?;
185/// let view = table.view(None).await?;
186/// let table2 = client.table(TableData::View(view)).await?;
187/// table.update(data).await?;
188/// ```
189#[derive(Clone, Debug)]
190pub struct View {
191    pub name: String,
192    client: Client,
193}
194
195assert_view_api!(View);
196
197impl View {
198    pub fn new(name: String, client: Client) -> Self {
199        View { name, client }
200    }
201
202    fn client_message(&self, req: ClientReq) -> Request {
203        crate::proto::Request {
204            msg_id: self.client.gen_id(),
205            entity_id: self.name.clone(),
206            client_req: Some(req),
207        }
208    }
209
210    /// Returns an array of strings containing the column paths of the [`View`]
211    /// without any of the source columns.
212    ///
213    /// A column path shows the columns that a given cell belongs to after
214    /// pivots are applied.
215    pub async fn column_paths(&self) -> ClientResult<Vec<String>> {
216        let msg = self.client_message(ClientReq::ViewColumnPathsReq(ViewColumnPathsReq {}));
217        match self.client.oneshot(&msg).await? {
218            ClientResp::ViewColumnPathsResp(ViewColumnPathsResp { paths }) => {
219                // Ok(paths.into_iter().map(|x| x.path).collect())
220                Ok(paths)
221            },
222            resp => Err(resp.into()),
223        }
224    }
225
226    /// Returns this [`View`]'s _dimensions_, row and column count, as well as
227    /// those of the [`crate::Table`] from which it was derived.
228    ///
229    /// - `num_table_rows` - The number of rows in the underlying
230    ///   [`crate::Table`].
231    /// - `num_table_columns` - The number of columns in the underlying
232    ///   [`crate::Table`] (including the `index` column if this
233    ///   [`crate::Table`] was constructed with one).
234    /// - `num_view_rows` - The number of rows in this [`View`]. If this
235    ///   [`View`] has a `group_by` clause, `num_view_rows` will also include
236    ///   aggregated rows.
237    /// - `num_view_columns` - The number of columns in this [`View`]. If this
238    ///   [`View`] has a `split_by` clause, `num_view_columns` will include all
239    ///   _column paths_, e.g. the number of `columns` clause times the number
240    ///   of `split_by` groups.
241    pub async fn dimensions(&self) -> ClientResult<ViewDimensionsResp> {
242        let msg = self.client_message(ClientReq::ViewDimensionsReq(ViewDimensionsReq {}));
243        match self.client.oneshot(&msg).await? {
244            ClientResp::ViewDimensionsResp(resp) => Ok(resp),
245            resp => Err(resp.into()),
246        }
247    }
248
249    /// The expression schema of this [`View`], which contains only the
250    /// expressions created on this [`View`]. See [`View::schema`] for
251    /// details.
252    pub async fn expression_schema(&self) -> ClientResult<HashMap<String, ColumnType>> {
253        let msg = self.client_message(ClientReq::ViewExpressionSchemaReq(
254            ViewExpressionSchemaReq {},
255        ));
256        match self.client.oneshot(&msg).await? {
257            ClientResp::ViewExpressionSchemaResp(ViewExpressionSchemaResp { schema }) => Ok(schema
258                .into_iter()
259                .map(|(x, y)| (x, ColumnType::try_from(y).unwrap()))
260                .collect()),
261            resp => Err(resp.into()),
262        }
263    }
264
265    /// A copy of the config object passed to the [`Table::view`] method which
266    /// created this [`View`].
267    pub async fn get_config(&self) -> ClientResult<crate::config::ViewConfig> {
268        let msg = self.client_message(ClientReq::ViewGetConfigReq(ViewGetConfigReq {}));
269        match self.client.oneshot(&msg).await? {
270            ClientResp::ViewGetConfigResp(ViewGetConfigResp {
271                config: Some(config),
272            }) => Ok(config.into()),
273            resp => Err(resp.into()),
274        }
275    }
276
277    /// The number of aggregated rows in this [`View`]. This is affected by the
278    /// "group_by" configuration parameter supplied to this view's contructor.
279    ///
280    /// # Returns
281    ///
282    /// The number of aggregated rows.
283    pub async fn num_rows(&self) -> ClientResult<u32> {
284        Ok(self.dimensions().await?.num_view_rows)
285    }
286
287    /// The schema of this [`View`].
288    ///
289    /// The [`View`] schema differs from the `schema` returned by
290    /// [`Table::schema`]; it may have different column names due to
291    /// `expressions` or `columns` configs, or it maye have _different
292    /// column types_ due to the application og `group_by` and `aggregates`
293    /// config. You can think of [`Table::schema`] as the _input_ schema and
294    /// [`View::schema`] as the _output_ schema of a Perspective pipeline.
295    pub async fn schema(&self) -> ClientResult<HashMap<String, ColumnType>> {
296        let msg = self.client_message(ClientReq::ViewSchemaReq(ViewSchemaReq {}));
297        match self.client.oneshot(&msg).await? {
298            ClientResp::ViewSchemaResp(ViewSchemaResp { schema }) => Ok(schema
299                .into_iter()
300                .map(|(x, y)| (x, ColumnType::try_from(y).unwrap()))
301                .collect()),
302            resp => Err(resp.into()),
303        }
304    }
305
306    /// Serializes a [`View`] to the Apache Arrow data format.
307    pub async fn to_arrow(&self, window: ViewWindow) -> ClientResult<Bytes> {
308        let msg = self.client_message(ClientReq::ViewToArrowReq(ViewToArrowReq {
309            viewport: Some(window.clone().into()),
310            compression: window.compression,
311        }));
312
313        match self.client.oneshot(&msg).await? {
314            ClientResp::ViewToArrowResp(ViewToArrowResp { arrow }) => Ok(arrow.into()),
315            resp => Err(resp.into()),
316        }
317    }
318
319    /// Serializes this [`View`] to a string of JSON data. Useful if you want to
320    /// save additional round trip serialize/deserialize cycles.    
321    pub async fn to_columns_string(&self, window: ViewWindow) -> ClientResult<String> {
322        let msg = self.client_message(ClientReq::ViewToColumnsStringReq(ViewToColumnsStringReq {
323            viewport: Some(window.clone().into()),
324            id: window.id,
325            index: window.index,
326            formatted: window.formatted,
327            leaves_only: window.leaves_only,
328        }));
329
330        match self.client.oneshot(&msg).await? {
331            ClientResp::ViewToColumnsStringResp(ViewToColumnsStringResp { json_string }) => {
332                Ok(json_string)
333            },
334            resp => Err(resp.into()),
335        }
336    }
337
338    /// Render this `View` as a JSON string.
339    pub async fn to_json_string(&self, window: ViewWindow) -> ClientResult<String> {
340        let viewport = ViewPort {
341            start_row: window.start_row.map(|x| x.floor() as u32),
342            start_col: window.start_col.map(|x| x.floor() as u32),
343            end_row: window.end_row.map(|x| x.ceil() as u32),
344            end_col: window.end_col.map(|x| x.ceil() as u32),
345        };
346
347        let msg = self.client_message(ClientReq::ViewToRowsStringReq(ViewToRowsStringReq {
348            viewport: Some(viewport),
349            id: window.id,
350            index: window.index,
351            formatted: window.formatted,
352            leaves_only: window.leaves_only,
353        }));
354
355        match self.client.oneshot(&msg).await? {
356            ClientResp::ViewToRowsStringResp(ViewToRowsStringResp { json_string }) => {
357                Ok(json_string)
358            },
359            resp => Err(resp.into()),
360        }
361    }
362
363    /// Renders this [`View`] as an [NDJSON](https://github.com/ndjson/ndjson-spec)
364    /// formatted [`String`].
365    pub async fn to_ndjson(&self, window: ViewWindow) -> ClientResult<String> {
366        let viewport = ViewPort {
367            start_row: window.start_row.map(|x| x.floor() as u32),
368            start_col: window.start_col.map(|x| x.floor() as u32),
369            end_row: window.end_row.map(|x| x.ceil() as u32),
370            end_col: window.end_col.map(|x| x.ceil() as u32),
371        };
372
373        let msg = self.client_message(ClientReq::ViewToNdjsonStringReq(ViewToNdjsonStringReq {
374            viewport: Some(viewport),
375            id: window.id,
376            index: window.index,
377            formatted: window.formatted,
378            leaves_only: window.leaves_only,
379        }));
380
381        match self.client.oneshot(&msg).await? {
382            ClientResp::ViewToNdjsonStringResp(ViewToNdjsonStringResp { ndjson_string }) => {
383                Ok(ndjson_string)
384            },
385            resp => Err(resp.into()),
386        }
387    }
388
389    /// Serializes this [`View`] to CSV data in a standard format.
390    pub async fn to_csv(&self, window: ViewWindow) -> ClientResult<String> {
391        let msg = self.client_message(ClientReq::ViewToCsvReq(ViewToCsvReq {
392            viewport: Some(window.into()),
393        }));
394
395        match self.client.oneshot(&msg).await? {
396            ClientResp::ViewToCsvResp(ViewToCsvResp { csv }) => Ok(csv),
397            resp => Err(resp.into()),
398        }
399    }
400
401    /// Delete this [`View`] and clean up all resources associated with it.
402    /// [`View`] objects do not stop consuming resources or processing
403    /// updates when they are garbage collected - you must call this method
404    /// to reclaim these.
405    pub async fn delete(&self) -> ClientResult<()> {
406        let msg = self.client_message(ClientReq::ViewDeleteReq(ViewDeleteReq {}));
407        match self.client.oneshot(&msg).await? {
408            ClientResp::ViewDeleteResp(_) => Ok(()),
409            resp => Err(resp.into()),
410        }
411    }
412
413    /// Calculates the [min, max] of the leaf nodes of a column `column_name`.
414    ///
415    /// # Returns
416    ///
417    /// A tuple of [min, max], whose types are column and aggregate dependent.
418    pub async fn get_min_max(&self, column_name: String) -> ClientResult<(String, String)> {
419        let msg = self.client_message(ClientReq::ViewGetMinMaxReq(ViewGetMinMaxReq {
420            column_name,
421        }));
422
423        match self.client.oneshot(&msg).await? {
424            ClientResp::ViewGetMinMaxResp(ViewGetMinMaxResp { min, max }) => Ok((min, max)),
425            resp => Err(resp.into()),
426        }
427    }
428
429    /// Register a callback with this [`View`]. Whenever the view's underlying
430    /// table emits an update, this callback will be invoked with an object
431    /// containing `port_id`, indicating which port the update fired on, and
432    /// optionally `delta`, which is the new data that was updated for each
433    /// cell or each row.
434    ///
435    /// # Arguments
436    ///
437    /// - `on_update` - A callback function invoked on update, which receives an
438    ///   object with two keys: `port_id`, indicating which port the update was
439    ///   triggered on, and `delta`, whose value is dependent on the mode
440    ///   parameter.
441    /// - `options` - If this is provided as `OnUpdateOptions { mode:
442    ///   Some(OnUpdateMode::Row) }`, then `delta` is an Arrow of the updated
443    ///   rows. Otherwise `delta` will be [`Option::None`].
444    pub async fn on_update<T, U>(&self, on_update: T, options: OnUpdateOptions) -> ClientResult<u32>
445    where
446        T: Fn(ViewOnUpdateResp) -> U + Send + Sync + 'static,
447        U: Future<Output = ()> + Send + 'static,
448    {
449        let on_update = Arc::new(on_update);
450        let callback = move |resp: Response| {
451            let on_update = on_update.clone();
452            async move {
453                match resp.client_resp {
454                    Some(ClientResp::ViewOnUpdateResp(resp)) => {
455                        on_update(resp).await;
456                        Ok(())
457                    },
458                    resp => Err(resp.into()),
459                }
460            }
461        };
462
463        let msg = self.client_message(ClientReq::ViewOnUpdateReq(ViewOnUpdateReq {
464            mode: options.mode.map(|OnUpdateMode::Row| Mode::Row as i32),
465        }));
466
467        self.client.subscribe(&msg, callback).await?;
468        Ok(msg.msg_id)
469    }
470
471    /// Unregister a previously registered update callback with this [`View`].
472    ///
473    /// # Arguments
474    ///
475    /// - `id` - A callback `id` as returned by a recipricol call to
476    ///   [`View::on_update`].
477    ///
478    /// # Examples
479    ///
480    /// ```rust
481    /// let callback = |_| async { print!("Updated!") };
482    /// let cid = view.on_update(callback, OnUpdateOptions::default()).await?;
483    /// view.remove_update(cid).await?;
484    /// ```
485    pub async fn remove_update(&self, update_id: u32) -> ClientResult<()> {
486        let msg = self.client_message(ClientReq::ViewRemoveOnUpdateReq(ViewRemoveOnUpdateReq {
487            id: update_id,
488        }));
489
490        self.client.unsubscribe(update_id).await?;
491        match self.client.oneshot(&msg).await? {
492            ClientResp::ViewRemoveOnUpdateResp(_) => Ok(()),
493            resp => Err(resp.into()),
494        }
495    }
496
497    /// Register a callback with this [`View`]. Whenever the [`View`] is
498    /// deleted, this callback will be invoked.
499    pub async fn on_delete(
500        &self,
501        on_delete: Box<dyn Fn() + Send + Sync + 'static>,
502    ) -> ClientResult<u32> {
503        let callback = move |resp: Response| match resp.client_resp.unwrap() {
504            ClientResp::ViewOnDeleteResp(_) => {
505                on_delete();
506                Ok(())
507            },
508            resp => Err(resp.into()),
509        };
510
511        let msg = self.client_message(ClientReq::ViewOnDeleteReq(ViewOnDeleteReq {}));
512        self.client.subscribe_once(&msg, Box::new(callback)).await?;
513        Ok(msg.msg_id)
514    }
515
516    /// Unregister a previously registered [`View::on_delete`] callback.
517    pub async fn remove_delete(&self, callback_id: u32) -> ClientResult<()> {
518        let msg = self.client_message(ClientReq::ViewRemoveDeleteReq(ViewRemoveDeleteReq {
519            id: callback_id,
520        }));
521
522        match self.client.oneshot(&msg).await? {
523            ClientResp::ViewRemoveDeleteResp(ViewRemoveDeleteResp {}) => Ok(()),
524            resp => Err(resp.into()),
525        }
526    }
527
528    /// Collapses the `group_by` row at `row_index`.
529    pub async fn collapse(&self, row_index: u32) -> ClientResult<u32> {
530        let msg = self.client_message(ClientReq::ViewCollapseReq(ViewCollapseReq { row_index }));
531        match self.client.oneshot(&msg).await? {
532            ClientResp::ViewCollapseResp(ViewCollapseResp { num_changed }) => Ok(num_changed),
533            resp => Err(resp.into()),
534        }
535    }
536
537    /// Expand the `group_by` row at `row_index`.
538    pub async fn expand(&self, row_index: u32) -> ClientResult<u32> {
539        let msg = self.client_message(ClientReq::ViewExpandReq(ViewExpandReq { row_index }));
540        match self.client.oneshot(&msg).await? {
541            ClientResp::ViewExpandResp(ViewExpandResp { num_changed }) => Ok(num_changed),
542            resp => Err(resp.into()),
543        }
544    }
545
546    /// Set expansion `depth` of the `group_by` tree.
547    pub async fn set_depth(&self, depth: u32) -> ClientResult<()> {
548        let msg = self.client_message(ClientReq::ViewSetDepthReq(ViewSetDepthReq { depth }));
549        match self.client.oneshot(&msg).await? {
550            ClientResp::ViewSetDepthResp(_) => Ok(()),
551            resp => Err(resp.into()),
552        }
553    }
554}