perspective_client/view.rs
1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████ █ █ █ █ █ █▄ ▀███ █ ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█ ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄ ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄ █ ▄▄▄▄▄ ┃
5// ┃ █ ██████ █ ▀█▄ █ ██████ █ ███▌▐███ ███████▄ █ ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors. ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::collections::HashMap;
14use std::ops::Deref;
15use std::str::FromStr;
16use std::sync::Arc;
17
18use futures::Future;
19use prost::bytes::Bytes;
20use serde::{Deserialize, Serialize};
21use ts_rs::TS;
22
23use self::view_on_update_req::Mode;
24use crate::assert_view_api;
25use crate::client::Client;
26use crate::proto::request::ClientReq;
27use crate::proto::response::ClientResp;
28use crate::proto::*;
29#[cfg(doc)]
30use crate::table::Table;
31pub use crate::utils::*;
32
/// Options for [`View::on_update`].
#[derive(Default, Debug, Deserialize, TS)]
pub struct OnUpdateOptions {
    /// The requested update mode. [`View::on_update`] forwards this with its
    /// subscription request; when it is `None`, no mode is sent.
    pub mode: Option<OnUpdateMode>,
}
38
/// The update mode for [`View::on_update`].
///
/// `Row` mode calculates the new rows/columns of each update batch and
/// provides them, as an Apache Arrow, to the callback passed to
/// [`View::on_update`]. This allows incremental updates if your callback can
/// read this format, but should be disabled otherwise.
#[derive(Default, Debug, Deserialize, TS)]
pub enum OnUpdateMode {
    /// The default (and only) mode; deserialized from the string `"row"`.
    #[default]
    #[serde(rename = "row")]
    Row,
}
51
52impl FromStr for OnUpdateMode {
53 type Err = ClientError;
54
55 fn from_str(s: &str) -> Result<Self, Self::Err> {
56 if s == "row" {
57 Ok(OnUpdateMode::Row)
58 } else {
59 Err(ClientError::Option)
60 }
61 }
62}
63
/// Row and column counts for a view and for its table.
///
/// NOTE(review): the field meanings below are inferred from the field names
/// and from the identically named counts documented on [`View::dimensions`]
/// (which itself returns `ViewDimensionsResp`, not this struct) — confirm
/// against wherever this struct is populated.
#[derive(Clone, Debug, Serialize)]
pub struct Dimensions {
    /// Presumably the number of rows in the view.
    pub num_view_rows: usize,
    /// Presumably the number of columns in the view.
    pub num_view_columns: usize,
    /// Presumably the number of rows in the underlying table.
    pub num_table_rows: usize,
    /// Presumably the number of columns in the underlying table.
    pub num_table_columns: usize,
}
71
/// Options for serializing a window of data from a [`View`].
///
/// Some fields of [`ViewWindow`] are only applicable to specific methods of
/// [`View`]. Fields that are `None` are omitted when this struct is
/// serialized.
#[derive(Clone, Debug, Default, Deserialize, Serialize, TS, PartialEq)]
pub struct ViewWindow {
    /// Starting row bound; rounded down (`floor`) when converted to a
    /// [`ViewPort`].
    #[serde(skip_serializing_if = "Option::is_none")]
    pub start_row: Option<f32>,

    /// Starting column bound; rounded down (`floor`) when converted to a
    /// [`ViewPort`].
    #[serde(skip_serializing_if = "Option::is_none")]
    pub start_col: Option<f32>,

    /// Ending row bound; rounded up (`ceil`) when converted to a
    /// [`ViewPort`].
    #[serde(skip_serializing_if = "Option::is_none")]
    pub end_row: Option<f32>,

    /// Ending column bound; rounded up (`ceil`) when converted to a
    /// [`ViewPort`].
    #[serde(skip_serializing_if = "Option::is_none")]
    pub end_col: Option<f32>,

    /// Forwarded by [`View::to_columns_string`], [`View::to_json_string`] and
    /// [`View::to_ndjson`] only.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub id: Option<bool>,

    /// Forwarded by [`View::to_columns_string`], [`View::to_json_string`] and
    /// [`View::to_ndjson`] only.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub index: Option<bool>,

    /// Forwarded by [`View::to_columns_string`], [`View::to_json_string`] and
    /// [`View::to_ndjson`] only.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub leaves_only: Option<bool>,

    /// Forwarded by [`View::to_columns_string`], [`View::to_json_string`] and
    /// [`View::to_ndjson`].
    ///
    /// NOTE(review): this was previously documented as only impacting
    /// [`View::to_csv`], but `to_csv` sends just the viewport and never
    /// forwards this flag — confirm which behavior is intended.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub formatted: Option<bool>,

    /// Only impacts [`View::to_arrow`]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub compression: Option<String>,
}
107
108impl From<ViewWindow> for ViewPort {
109 fn from(window: ViewWindow) -> Self {
110 ViewPort {
111 start_row: window.start_row.map(|x| x.floor() as u32),
112 start_col: window.start_col.map(|x| x.floor() as u32),
113 end_row: window.end_row.map(|x| x.ceil() as u32),
114 end_col: window.end_col.map(|x| x.ceil() as u32),
115 }
116 }
117}
118
/// Rows updated and port ID corresponding to an update batch, provided to the
/// callback argument to [`View::on_update`] (the updated rows are only
/// requested in the `"row"` mode).
///
/// This is a newtype around [`crate::proto::ViewOnUpdateResp`].
#[derive(TS)]
pub struct OnUpdateData(crate::proto::ViewOnUpdateResp);
123
124impl Deref for OnUpdateData {
125 type Target = crate::proto::ViewOnUpdateResp;
126
127 fn deref(&self) -> &Self::Target {
128 &self.0
129 }
130}
131
132/// The [`View`] struct is Perspective's query and serialization interface. It
133/// represents a query on the `Table`'s dataset and is always created from an
134/// existing `Table` instance via the [`Table::view`] method.
135///
136/// [`View`]s are immutable with respect to the arguments provided to the
137/// [`Table::view`] method; to change these parameters, you must create a new
138/// [`View`] on the same [`Table`]. However, each [`View`] is _live_ with
139/// respect to the [`Table`]'s data, and will (within a conflation window)
140/// update with the latest state as its parent [`Table`] updates, including
141/// incrementally recalculating all aggregates, pivots, filters, etc. [`View`]
142/// query parameters are composable, in that each parameter works independently
143/// _and_ in conjunction with each other, and there is no limit to the number of
144/// pivots, filters, etc. which can be applied.
145///
146/// To construct a [`View`], call the [`Table::view`] factory method. A
147/// [`Table`] can have as many [`View`]s associated with it as you need -
148/// Perspective conserves memory by relying on a single [`Table`] to power
149/// multiple [`View`]s concurrently.
150///
151/// # Examples
152///
153/// ```rust
154/// let opts = TableInitOptions::default();
155/// let data = TableData::Update(UpdateData::Csv("x,y\n1,2\n3,4".into()));
156/// let table = client.table(data, opts).await?;
157///
158/// let view = table.view(None).await?;
/// let arrow = view.to_arrow(ViewWindow::default()).await?;
160/// view.delete().await?;
161/// ```
162///
163/// ```rust
164/// use crate::config::*;
165/// let view = table
166/// .view(Some(ViewConfigUpdate {
167/// columns: Some(vec![Some("Sales".into())]),
168/// aggregates: Some(HashMap::from_iter(vec![("Sales".into(), "sum".into())])),
169/// group_by: Some(vec!["Region".into(), "Country".into()]),
170/// filter: Some(vec![Filter::new("Category", "in", &[
171/// "Furniture",
172/// "Technology",
173/// ])]),
174/// ..ViewConfigUpdate::default()
175/// }))
176/// .await?;
177/// ```
178///
179/// Group By
180///
181/// ```rust
182/// let view = table
183/// .view(Some(ViewConfigUpdate {
184/// group_by: Some(vec!["a".into(), "c".into()]),
185/// ..ViewConfigUpdate::default()
186/// }))
187/// .await?;
188/// ```
189///
190/// Split By
191///
192/// ```rust
193/// let view = table
194/// .view(Some(ViewConfigUpdate {
195/// split_by: Some(vec!["a".into(), "c".into()]),
196/// ..ViewConfigUpdate::default()
197/// }))
198/// .await?;
199/// ```
200///
201/// In Javascript, a [`Table`] can be constructed on a [`Table::view`] instance,
202/// which will return a new [`Table`] based on the [`Table::view`]'s dataset,
203/// and all future updates that affect the [`Table::view`] will be forwarded to
204/// the new [`Table`]. This is particularly useful for implementing a
205/// [Client/Server Replicated](server.md#clientserver-replicated) design, by
206/// serializing the `View` to an arrow and setting up an `on_update` callback.
207///
208/// ```rust
209/// let opts = TableInitOptions::default();
210/// let data = TableData::Update(UpdateData::Csv("x,y\n1,2\n3,4".into()));
211/// let table = client.table(data, opts).await?;
212/// let view = table.view(None).await?;
213/// let table2 = client.table(TableData::View(view)).await?;
214/// table.update(data).await?;
215/// ```
#[derive(Clone, Debug)]
pub struct View {
    /// Identifier of this view; sent as the `entity_id` of every request made
    /// through this handle.
    pub name: String,
    /// The [`Client`] through which this view's requests are sent.
    client: Client,
}
221
// NOTE(review): presumably a compile-time check that `View` exposes the
// expected set of API methods — confirm against the definition of
// `assert_view_api!`.
assert_view_api!(View);
223
224impl View {
225 pub fn new(name: String, client: Client) -> Self {
226 View { name, client }
227 }
228
229 fn client_message(&self, req: ClientReq) -> Request {
230 crate::proto::Request {
231 msg_id: self.client.gen_id(),
232 entity_id: self.name.clone(),
233 client_req: Some(req),
234 }
235 }
236
237 /// Returns an array of strings containing the column paths of the [`View`]
238 /// without any of the source columns.
239 ///
240 /// A column path shows the columns that a given cell belongs to after
241 /// pivots are applied.
242 pub async fn column_paths(&self) -> ClientResult<Vec<String>> {
243 let msg = self.client_message(ClientReq::ViewColumnPathsReq(ViewColumnPathsReq {}));
244 match self.client.oneshot(&msg).await? {
245 ClientResp::ViewColumnPathsResp(ViewColumnPathsResp { paths }) => {
246 // Ok(paths.into_iter().map(|x| x.path).collect())
247 Ok(paths)
248 },
249 resp => Err(resp.into()),
250 }
251 }
252
253 /// Returns this [`View`]'s _dimensions_, row and column count, as well as
254 /// those of the [`crate::Table`] from which it was derived.
255 ///
256 /// - `num_table_rows` - The number of rows in the underlying
257 /// [`crate::Table`].
258 /// - `num_table_columns` - The number of columns in the underlying
259 /// [`crate::Table`] (including the `index` column if this
260 /// [`crate::Table`] was constructed with one).
261 /// - `num_view_rows` - The number of rows in this [`View`]. If this
262 /// [`View`] has a `group_by` clause, `num_view_rows` will also include
263 /// aggregated rows.
264 /// - `num_view_columns` - The number of columns in this [`View`]. If this
265 /// [`View`] has a `split_by` clause, `num_view_columns` will include all
266 /// _column paths_, e.g. the number of `columns` clause times the number
267 /// of `split_by` groups.
268 pub async fn dimensions(&self) -> ClientResult<ViewDimensionsResp> {
269 let msg = self.client_message(ClientReq::ViewDimensionsReq(ViewDimensionsReq {}));
270 match self.client.oneshot(&msg).await? {
271 ClientResp::ViewDimensionsResp(resp) => Ok(resp),
272 resp => Err(resp.into()),
273 }
274 }
275
276 /// The expression schema of this [`View`], which contains only the
277 /// expressions created on this [`View`]. See [`View::schema`] for
278 /// details.
279 pub async fn expression_schema(&self) -> ClientResult<HashMap<String, ColumnType>> {
280 let msg = self.client_message(ClientReq::ViewExpressionSchemaReq(
281 ViewExpressionSchemaReq {},
282 ));
283 match self.client.oneshot(&msg).await? {
284 ClientResp::ViewExpressionSchemaResp(ViewExpressionSchemaResp { schema }) => Ok(schema
285 .into_iter()
286 .map(|(x, y)| (x, ColumnType::try_from(y).unwrap()))
287 .collect()),
288 resp => Err(resp.into()),
289 }
290 }
291
292 /// A copy of the [`ViewConfig`] object passed to the [`Table::view`] method
293 /// which created this [`View`].
294 pub async fn get_config(&self) -> ClientResult<crate::config::ViewConfig> {
295 let msg = self.client_message(ClientReq::ViewGetConfigReq(ViewGetConfigReq {}));
296 match self.client.oneshot(&msg).await? {
297 ClientResp::ViewGetConfigResp(ViewGetConfigResp {
298 config: Some(config),
299 }) => Ok(config.into()),
300 resp => Err(resp.into()),
301 }
302 }
303
304 /// The number of aggregated rows in this [`View`]. This is affected by the
305 /// "group_by" configuration parameter supplied to this view's contructor.
306 ///
307 /// # Returns
308 ///
309 /// The number of aggregated rows.
310 pub async fn num_rows(&self) -> ClientResult<u32> {
311 Ok(self.dimensions().await?.num_view_rows)
312 }
313
314 /// The schema of this [`View`].
315 ///
316 /// The [`View`] schema differs from the `schema` returned by
317 /// [`Table::schema`]; it may have different column names due to
318 /// `expressions` or `columns` configs, or it maye have _different
319 /// column types_ due to the application og `group_by` and `aggregates`
320 /// config. You can think of [`Table::schema`] as the _input_ schema and
321 /// [`View::schema`] as the _output_ schema of a Perspective pipeline.
322 pub async fn schema(&self) -> ClientResult<HashMap<String, ColumnType>> {
323 let msg = self.client_message(ClientReq::ViewSchemaReq(ViewSchemaReq {}));
324 match self.client.oneshot(&msg).await? {
325 ClientResp::ViewSchemaResp(ViewSchemaResp { schema }) => Ok(schema
326 .into_iter()
327 .map(|(x, y)| (x, ColumnType::try_from(y).unwrap()))
328 .collect()),
329 resp => Err(resp.into()),
330 }
331 }
332
333 /// Serializes a [`View`] to the Apache Arrow data format.
334 pub async fn to_arrow(&self, window: ViewWindow) -> ClientResult<Bytes> {
335 let msg = self.client_message(ClientReq::ViewToArrowReq(ViewToArrowReq {
336 viewport: Some(window.clone().into()),
337 compression: window.compression,
338 }));
339
340 match self.client.oneshot(&msg).await? {
341 ClientResp::ViewToArrowResp(ViewToArrowResp { arrow }) => Ok(arrow.into()),
342 resp => Err(resp.into()),
343 }
344 }
345
346 /// Serializes this [`View`] to a string of JSON data. Useful if you want to
347 /// save additional round trip serialize/deserialize cycles.
348 pub async fn to_columns_string(&self, window: ViewWindow) -> ClientResult<String> {
349 let msg = self.client_message(ClientReq::ViewToColumnsStringReq(ViewToColumnsStringReq {
350 viewport: Some(window.clone().into()),
351 id: window.id,
352 index: window.index,
353 formatted: window.formatted,
354 leaves_only: window.leaves_only,
355 }));
356
357 match self.client.oneshot(&msg).await? {
358 ClientResp::ViewToColumnsStringResp(ViewToColumnsStringResp { json_string }) => {
359 Ok(json_string)
360 },
361 resp => Err(resp.into()),
362 }
363 }
364
365 /// Render this `View` as a JSON string.
366 pub async fn to_json_string(&self, window: ViewWindow) -> ClientResult<String> {
367 let viewport = ViewPort {
368 start_row: window.start_row.map(|x| x.floor() as u32),
369 start_col: window.start_col.map(|x| x.floor() as u32),
370 end_row: window.end_row.map(|x| x.ceil() as u32),
371 end_col: window.end_col.map(|x| x.ceil() as u32),
372 };
373
374 let msg = self.client_message(ClientReq::ViewToRowsStringReq(ViewToRowsStringReq {
375 viewport: Some(viewport),
376 id: window.id,
377 index: window.index,
378 formatted: window.formatted,
379 leaves_only: window.leaves_only,
380 }));
381
382 match self.client.oneshot(&msg).await? {
383 ClientResp::ViewToRowsStringResp(ViewToRowsStringResp { json_string }) => {
384 Ok(json_string)
385 },
386 resp => Err(resp.into()),
387 }
388 }
389
390 /// Renders this [`View`] as an [NDJSON](https://github.com/ndjson/ndjson-spec)
391 /// formatted [`String`].
392 pub async fn to_ndjson(&self, window: ViewWindow) -> ClientResult<String> {
393 let viewport = ViewPort {
394 start_row: window.start_row.map(|x| x.floor() as u32),
395 start_col: window.start_col.map(|x| x.floor() as u32),
396 end_row: window.end_row.map(|x| x.ceil() as u32),
397 end_col: window.end_col.map(|x| x.ceil() as u32),
398 };
399
400 let msg = self.client_message(ClientReq::ViewToNdjsonStringReq(ViewToNdjsonStringReq {
401 viewport: Some(viewport),
402 id: window.id,
403 index: window.index,
404 formatted: window.formatted,
405 leaves_only: window.leaves_only,
406 }));
407
408 match self.client.oneshot(&msg).await? {
409 ClientResp::ViewToNdjsonStringResp(ViewToNdjsonStringResp { ndjson_string }) => {
410 Ok(ndjson_string)
411 },
412 resp => Err(resp.into()),
413 }
414 }
415
416 /// Serializes this [`View`] to CSV data in a standard format.
417 pub async fn to_csv(&self, window: ViewWindow) -> ClientResult<String> {
418 let msg = self.client_message(ClientReq::ViewToCsvReq(ViewToCsvReq {
419 viewport: Some(window.into()),
420 }));
421
422 match self.client.oneshot(&msg).await? {
423 ClientResp::ViewToCsvResp(ViewToCsvResp { csv }) => Ok(csv),
424 resp => Err(resp.into()),
425 }
426 }
427
428 /// Delete this [`View`] and clean up all resources associated with it.
429 /// [`View`] objects do not stop consuming resources or processing
430 /// updates when they are garbage collected - you must call this method
431 /// to reclaim these.
432 pub async fn delete(&self) -> ClientResult<()> {
433 let msg = self.client_message(ClientReq::ViewDeleteReq(ViewDeleteReq {}));
434 match self.client.oneshot(&msg).await? {
435 ClientResp::ViewDeleteResp(_) => Ok(()),
436 resp => Err(resp.into()),
437 }
438 }
439
440 /// Calculates the [min, max] of the leaf nodes of a column `column_name`.
441 ///
442 /// # Returns
443 ///
444 /// A tuple of [min, max], whose types are column and aggregate dependent.
445 pub async fn get_min_max(&self, column_name: String) -> ClientResult<(String, String)> {
446 let msg = self.client_message(ClientReq::ViewGetMinMaxReq(ViewGetMinMaxReq {
447 column_name,
448 }));
449
450 match self.client.oneshot(&msg).await? {
451 ClientResp::ViewGetMinMaxResp(ViewGetMinMaxResp { min, max }) => Ok((min, max)),
452 resp => Err(resp.into()),
453 }
454 }
455
456 /// Register a callback with this [`View`]. Whenever the view's underlying
457 /// table emits an update, this callback will be invoked with an object
458 /// containing `port_id`, indicating which port the update fired on, and
459 /// optionally `delta`, which is the new data that was updated for each
460 /// cell or each row.
461 ///
462 /// # Arguments
463 ///
464 /// - `on_update` - A callback function invoked on update, which receives an
465 /// object with two keys: `port_id`, indicating which port the update was
466 /// triggered on, and `delta`, whose value is dependent on the mode
467 /// parameter.
468 /// - `options` - If this is provided as `OnUpdateOptions { mode:
469 /// Some(OnUpdateMode::Row) }`, then `delta` is an Arrow of the updated
470 /// rows. Otherwise `delta` will be [`Option::None`].
471 pub async fn on_update<T, U>(&self, on_update: T, options: OnUpdateOptions) -> ClientResult<u32>
472 where
473 T: Fn(OnUpdateData) -> U + Send + Sync + 'static,
474 U: Future<Output = ()> + Send + 'static,
475 {
476 let on_update = Arc::new(on_update);
477 let callback = move |resp: Response| {
478 let on_update = on_update.clone();
479 async move {
480 match resp.client_resp {
481 Some(ClientResp::ViewOnUpdateResp(resp)) => {
482 on_update(OnUpdateData(resp)).await;
483 Ok(())
484 },
485 resp => Err(resp.into()),
486 }
487 }
488 };
489
490 let msg = self.client_message(ClientReq::ViewOnUpdateReq(ViewOnUpdateReq {
491 mode: options.mode.map(|OnUpdateMode::Row| Mode::Row as i32),
492 }));
493
494 self.client.subscribe(&msg, callback).await?;
495 Ok(msg.msg_id)
496 }
497
498 /// Unregister a previously registered update callback with this [`View`].
499 ///
500 /// # Arguments
501 ///
502 /// - `id` - A callback `id` as returned by a recipricol call to
503 /// [`View::on_update`].
504 ///
505 /// # Examples
506 ///
507 /// ```rust
508 /// let callback = |_| async { print!("Updated!") };
509 /// let cid = view.on_update(callback, OnUpdateOptions::default()).await?;
510 /// view.remove_update(cid).await?;
511 /// ```
512 pub async fn remove_update(&self, update_id: u32) -> ClientResult<()> {
513 let msg = self.client_message(ClientReq::ViewRemoveOnUpdateReq(ViewRemoveOnUpdateReq {
514 id: update_id,
515 }));
516
517 self.client.unsubscribe(update_id).await?;
518 match self.client.oneshot(&msg).await? {
519 ClientResp::ViewRemoveOnUpdateResp(_) => Ok(()),
520 resp => Err(resp.into()),
521 }
522 }
523
524 /// Register a callback with this [`View`]. Whenever the [`View`] is
525 /// deleted, this callback will be invoked.
526 pub async fn on_delete(
527 &self,
528 on_delete: Box<dyn Fn() + Send + Sync + 'static>,
529 ) -> ClientResult<u32> {
530 let callback = move |resp: Response| match resp.client_resp.unwrap() {
531 ClientResp::ViewOnDeleteResp(_) => {
532 on_delete();
533 Ok(())
534 },
535 resp => Err(resp.into()),
536 };
537
538 let msg = self.client_message(ClientReq::ViewOnDeleteReq(ViewOnDeleteReq {}));
539 self.client.subscribe_once(&msg, Box::new(callback)).await?;
540 Ok(msg.msg_id)
541 }
542
543 /// Unregister a previously registered [`View::on_delete`] callback.
544 pub async fn remove_delete(&self, callback_id: u32) -> ClientResult<()> {
545 let msg = self.client_message(ClientReq::ViewRemoveDeleteReq(ViewRemoveDeleteReq {
546 id: callback_id,
547 }));
548
549 match self.client.oneshot(&msg).await? {
550 ClientResp::ViewRemoveDeleteResp(ViewRemoveDeleteResp {}) => Ok(()),
551 resp => Err(resp.into()),
552 }
553 }
554
555 /// Collapses the `group_by` row at `row_index`.
556 pub async fn collapse(&self, row_index: u32) -> ClientResult<u32> {
557 let msg = self.client_message(ClientReq::ViewCollapseReq(ViewCollapseReq { row_index }));
558 match self.client.oneshot(&msg).await? {
559 ClientResp::ViewCollapseResp(ViewCollapseResp { num_changed }) => Ok(num_changed),
560 resp => Err(resp.into()),
561 }
562 }
563
564 /// Expand the `group_by` row at `row_index`.
565 pub async fn expand(&self, row_index: u32) -> ClientResult<u32> {
566 let msg = self.client_message(ClientReq::ViewExpandReq(ViewExpandReq { row_index }));
567 match self.client.oneshot(&msg).await? {
568 ClientResp::ViewExpandResp(ViewExpandResp { num_changed }) => Ok(num_changed),
569 resp => Err(resp.into()),
570 }
571 }
572
573 /// Set expansion `depth` of the `group_by` tree.
574 pub async fn set_depth(&self, depth: u32) -> ClientResult<()> {
575 let msg = self.client_message(ClientReq::ViewSetDepthReq(ViewSetDepthReq { depth }));
576 match self.client.oneshot(&msg).await? {
577 ClientResp::ViewSetDepthResp(_) => Ok(()),
578 resp => Err(resp.into()),
579 }
580 }
581}