perspective_client/view.rs
1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████ █ █ █ █ █ █▄ ▀███ █ ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█ ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄ ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄ █ ▄▄▄▄▄ ┃
5// ┃ █ ██████ █ ▀█▄ █ ██████ █ ███▌▐███ ███████▄ █ ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors. ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::collections::HashMap;
14use std::ops::Deref;
15use std::str::FromStr;
16use std::sync::Arc;
17
18use futures::Future;
19use prost::bytes::Bytes;
20use serde::{Deserialize, Serialize};
21use ts_rs::TS;
22
23use self::view_on_update_req::Mode;
24use crate::assert_view_api;
25use crate::client::Client;
26use crate::proto::request::ClientReq;
27use crate::proto::response::ClientResp;
28use crate::proto::*;
29#[cfg(doc)]
30use crate::table::Table;
31pub use crate::utils::*;
32
33/// Options for [`View::on_update`].
34#[derive(Default, Debug, Deserialize, TS)]
35pub struct OnUpdateOptions {
36 pub mode: Option<OnUpdateMode>,
37}
38
39/// The update mode for [`View::on_update`].
40///
41/// `Row` mode calculates and provides the update batch new rows/columns as an
42/// Apache Arrow to the callback provided to [`View::on_update`]. This allows
43/// incremental updates if your callbakc can read this format, but should be
44/// disabled otherwise.
45#[derive(Default, Debug, Deserialize, TS)]
46pub enum OnUpdateMode {
47 #[default]
48 #[serde(rename = "row")]
49 Row,
50}
51
52impl FromStr for OnUpdateMode {
53 type Err = ClientError;
54
55 fn from_str(s: &str) -> Result<Self, Self::Err> {
56 if s == "row" {
57 Ok(OnUpdateMode::Row)
58 } else {
59 Err(ClientError::Option)
60 }
61 }
62}
63
64#[derive(Clone, Debug, Default, Deserialize, Serialize, TS, PartialEq)]
65pub struct ColumnWindow {
66 #[serde(skip_serializing_if = "Option::is_none")]
67 pub start_col: Option<f32>,
68
69 #[serde(skip_serializing_if = "Option::is_none")]
70 pub end_col: Option<f32>,
71}
72
73/// Options for serializing a window of data from a [`View`].
74///
75/// Some fields of [`ViewWindow`] are only applicable to specific methods of
76/// [`View`].
77#[derive(Clone, Debug, Default, Deserialize, Serialize, TS, PartialEq)]
78pub struct ViewWindow {
79 #[ts(optional)]
80 #[serde(skip_serializing_if = "Option::is_none")]
81 pub start_row: Option<f64>,
82
83 #[ts(optional)]
84 #[serde(skip_serializing_if = "Option::is_none")]
85 pub start_col: Option<f64>,
86
87 #[ts(optional)]
88 #[serde(skip_serializing_if = "Option::is_none")]
89 pub end_row: Option<f64>,
90
91 #[ts(optional)]
92 #[serde(skip_serializing_if = "Option::is_none")]
93 pub end_col: Option<f64>,
94
95 #[ts(optional)]
96 #[serde(skip_serializing_if = "Option::is_none")]
97 pub id: Option<bool>,
98
99 #[ts(optional)]
100 #[serde(skip_serializing_if = "Option::is_none")]
101 pub index: Option<bool>,
102
103 /// Only impacts [`View::to_csv`]
104 #[ts(optional)]
105 #[serde(skip_serializing_if = "Option::is_none")]
106 pub formatted: Option<bool>,
107
108 /// Only impacts [`View::to_arrow`]
109 #[ts(optional)]
110 #[serde(skip_serializing_if = "Option::is_none")]
111 pub compression: Option<String>,
112
113 /// When `true`, group-by columns use legacy `"colname (Group by N)"`
114 /// naming. When `false`, they use `__ROW_PATH_N__` naming consistent
115 /// with the SQL backend. Defaults to `true` for backwards compatibility.
116 #[ts(optional)]
117 #[serde(skip_serializing_if = "Option::is_none")]
118 pub emit_legacy_row_path_names: Option<bool>,
119}
120
121impl From<ViewWindow> for ViewPort {
122 fn from(window: ViewWindow) -> Self {
123 ViewPort {
124 start_row: window.start_row.map(|x| x.floor() as u32),
125 start_col: window.start_col.map(|x| x.floor() as u32),
126 end_row: window.end_row.map(|x| x.ceil() as u32),
127 end_col: window.end_col.map(|x| x.ceil() as u32),
128 emit_legacy_row_path_names: window.emit_legacy_row_path_names,
129 }
130 }
131}
132
133impl From<ViewPort> for ViewWindow {
134 fn from(window: ViewPort) -> Self {
135 ViewWindow {
136 start_row: window.start_row.map(|x| x as f64),
137 start_col: window.start_col.map(|x| x as f64),
138 end_row: window.end_row.map(|x| x as f64),
139 end_col: window.end_col.map(|x| x as f64),
140 emit_legacy_row_path_names: window.emit_legacy_row_path_names,
141 ..ViewWindow::default()
142 }
143 }
144}
145
146/// Rows updated and port ID corresponding to an update batch, provided to the
147/// callback argument to [`View::on_update`] with the "rows" mode.
148#[derive(TS)]
149pub struct OnUpdateData(crate::proto::ViewOnUpdateResp);
150
151impl Deref for OnUpdateData {
152 type Target = crate::proto::ViewOnUpdateResp;
153
154 fn deref(&self) -> &Self::Target {
155 &self.0
156 }
157}
158
159/// The [`View`] struct is Perspective's query and serialization interface. It
160/// represents a query on the `Table`'s dataset and is always created from an
161/// existing `Table` instance via the [`Table::view`] method.
162///
163/// [`View`]s are immutable with respect to the arguments provided to the
164/// [`Table::view`] method; to change these parameters, you must create a new
165/// [`View`] on the same [`Table`]. However, each [`View`] is _live_ with
166/// respect to the [`Table`]'s data, and will (within a conflation window)
167/// update with the latest state as its parent [`Table`] updates, including
168/// incrementally recalculating all aggregates, pivots, filters, etc. [`View`]
169/// query parameters are composable, in that each parameter works independently
170/// _and_ in conjunction with each other, and there is no limit to the number of
171/// pivots, filters, etc. which can be applied.
172///
173/// To construct a [`View`], call the [`Table::view`] factory method. A
174/// [`Table`] can have as many [`View`]s associated with it as you need -
175/// Perspective conserves memory by relying on a single [`Table`] to power
176/// multiple [`View`]s concurrently.
177///
178/// # Examples
179///
180/// ```no_run
181/// # use perspective_client::{Client, TableData, TableInitOptions, UpdateData, ViewWindow};
182/// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
183/// # let client: Client = todo!();
184/// let opts = TableInitOptions::default();
185/// let data = TableData::Update(UpdateData::Csv("x,y\n1,2\n3,4".into()));
186/// let table = client.table(data, opts).await?;
187///
188/// let view = table.view(None).await?;
189/// let arrow = view.to_arrow(ViewWindow::default()).await?;
190/// view.delete().await?;
191/// # Ok(()) }
192/// ```
193///
194/// ```no_run
195/// # use std::collections::HashMap;
196/// # use perspective_client::Table;
197/// # use perspective_client::config::*;
198/// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
199/// # let table: Table = todo!();
200/// let view = table
201/// .view(Some(ViewConfigUpdate {
202/// columns: Some(vec![Some("Sales".into())]),
203/// aggregates: Some(HashMap::from_iter(vec![("Sales".into(), "sum".into())])),
204/// group_by: Some(vec!["Region".into(), "Country".into()]),
205/// filter: Some(vec![Filter::new("Category", "in", &[
206/// "Furniture",
207/// "Technology",
208/// ])]),
209/// ..ViewConfigUpdate::default()
210/// }))
211/// .await?;
212/// # Ok(()) }
213/// ```
214///
215/// Group By
216///
217/// ```no_run
218/// # use perspective_client::Table;
219/// # use perspective_client::config::*;
220/// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
221/// # let table: Table = todo!();
222/// let view = table
223/// .view(Some(ViewConfigUpdate {
224/// group_by: Some(vec!["a".into(), "c".into()]),
225/// ..ViewConfigUpdate::default()
226/// }))
227/// .await?;
228/// # Ok(()) }
229/// ```
230///
231/// Split By
232///
233/// ```no_run
234/// # use perspective_client::Table;
235/// # use perspective_client::config::*;
236/// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
237/// # let table: Table = todo!();
238/// let view = table
239/// .view(Some(ViewConfigUpdate {
240/// split_by: Some(vec!["a".into(), "c".into()]),
241/// ..ViewConfigUpdate::default()
242/// }))
243/// .await?;
244/// # Ok(()) }
245/// ```
246///
247/// In Javascript, a [`Table`] can be constructed on a [`Table::view`] instance,
248/// which will return a new [`Table`] based on the [`Table::view`]'s dataset,
249/// and all future updates that affect the [`Table::view`] will be forwarded to
250/// the new [`Table`]. This is particularly useful for implementing a
251/// [Client/Server Replicated](server.md#clientserver-replicated) design, by
252/// serializing the `View` to an arrow and setting up an `on_update` callback.
253///
254/// ```no_run
255/// # use perspective_client::{Client, TableData, TableInitOptions, UpdateData, UpdateOptions};
256/// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
257/// # let client: Client = todo!();
258/// let opts = TableInitOptions::default();
259/// let data = TableData::Update(UpdateData::Csv("x,y\n1,2\n3,4".into()));
260/// let table = client.table(data, opts.clone()).await?;
261/// let view = table.view(None).await?;
262/// let table2 = client.table(TableData::View(view), opts).await?;
263/// let more = UpdateData::Csv("x,y\n5,6".into());
264/// table.update(more, UpdateOptions::default()).await?;
265/// # Ok(()) }
266/// ```
267#[derive(Clone, Debug)]
268pub struct View {
269 pub name: String,
270 client: Client,
271}
272
273assert_view_api!(View);
274
275impl View {
276 pub fn new(name: String, client: Client) -> Self {
277 View { name, client }
278 }
279
280 fn client_message(&self, req: ClientReq) -> Request {
281 crate::proto::Request {
282 msg_id: self.client.gen_id(),
283 entity_id: self.name.clone(),
284 client_req: Some(req),
285 }
286 }
287
288 /// Returns an array of strings containing the column paths of the [`View`]
289 /// without any of the source columns.
290 ///
291 /// A column path shows the columns that a given cell belongs to after
292 /// pivots are applied.
293 pub async fn column_paths(&self, window: ColumnWindow) -> ClientResult<Vec<String>> {
294 let msg = self.client_message(ClientReq::ViewColumnPathsReq(ViewColumnPathsReq {
295 start_col: window.start_col.map(|x| x as u32),
296 end_col: window.end_col.map(|x| x as u32),
297 }));
298
299 match self.client.oneshot(&msg).await? {
300 ClientResp::ViewColumnPathsResp(ViewColumnPathsResp { paths }) => {
301 // Ok(paths.into_iter().map(|x| x.path).collect())
302 Ok(paths)
303 },
304 resp => Err(resp.into()),
305 }
306 }
307
308 /// Returns this [`View`]'s _dimensions_, row and column count, as well as
309 /// those of the [`crate::Table`] from which it was derived.
310 ///
311 /// - `num_table_rows` - The number of rows in the underlying
312 /// [`crate::Table`].
313 /// - `num_table_columns` - The number of columns in the underlying
314 /// [`crate::Table`] (including the `index` column if this
315 /// [`crate::Table`] was constructed with one).
316 /// - `num_view_rows` - The number of rows in this [`View`]. If this
317 /// [`View`] has a `group_by` clause, `num_view_rows` will also include
318 /// aggregated rows.
319 /// - `num_view_columns` - The number of columns in this [`View`]. If this
320 /// [`View`] has a `split_by` clause, `num_view_columns` will include all
321 /// _column paths_, e.g. the number of `columns` clause times the number
322 /// of `split_by` groups.
323 pub async fn dimensions(&self) -> ClientResult<ViewDimensionsResp> {
324 let msg = self.client_message(ClientReq::ViewDimensionsReq(ViewDimensionsReq {}));
325 match self.client.oneshot(&msg).await? {
326 ClientResp::ViewDimensionsResp(resp) => Ok(resp),
327 resp => Err(resp.into()),
328 }
329 }
330
331 /// The expression schema of this [`View`], which contains only the
332 /// expressions created on this [`View`]. See [`View::schema`] for
333 /// details.
334 pub async fn expression_schema(&self) -> ClientResult<HashMap<String, ColumnType>> {
335 if self.client.get_features().await?.expressions {
336 let msg = self.client_message(ClientReq::ViewExpressionSchemaReq(
337 ViewExpressionSchemaReq {},
338 ));
339 match self.client.oneshot(&msg).await? {
340 ClientResp::ViewExpressionSchemaResp(ViewExpressionSchemaResp { schema }) => {
341 Ok(schema
342 .into_iter()
343 .map(|(x, y)| (x, ColumnType::try_from(y).unwrap()))
344 .collect())
345 },
346 resp => Err(resp.into()),
347 }
348 } else {
349 Ok([].into_iter().collect())
350 }
351 }
352
353 /// A copy of the [`ViewConfig`] object passed to the [`Table::view`] method
354 /// which created this [`View`].
355 pub async fn get_config(&self) -> ClientResult<crate::config::ViewConfig> {
356 let msg = self.client_message(ClientReq::ViewGetConfigReq(ViewGetConfigReq {}));
357 match self.client.oneshot(&msg).await? {
358 ClientResp::ViewGetConfigResp(ViewGetConfigResp {
359 config: Some(config),
360 }) => Ok(config.into()),
361 resp => Err(resp.into()),
362 }
363 }
364
365 /// The number of aggregated rows in this [`View`]. This is affected by the
366 /// "group_by" configuration parameter supplied to this view's contructor.
367 ///
368 /// # Returns
369 ///
370 /// The number of aggregated rows.
371 pub async fn num_rows(&self) -> ClientResult<u32> {
372 Ok(self.dimensions().await?.num_view_rows)
373 }
374
375 /// The schema of this [`View`].
376 ///
377 /// The [`View`] schema differs from the `schema` returned by
378 /// [`Table::schema`]; it may have different column names due to
379 /// `expressions` or `columns` configs, or it maye have _different
380 /// column types_ due to the application og `group_by` and `aggregates`
381 /// config. You can think of [`Table::schema`] as the _input_ schema and
382 /// [`View::schema`] as the _output_ schema of a Perspective pipeline.
383 pub async fn schema(&self) -> ClientResult<HashMap<String, ColumnType>> {
384 let msg = self.client_message(ClientReq::ViewSchemaReq(ViewSchemaReq {}));
385 match self.client.oneshot(&msg).await? {
386 ClientResp::ViewSchemaResp(ViewSchemaResp { schema }) => Ok(schema
387 .into_iter()
388 .map(|(x, y)| (x, ColumnType::try_from(y).unwrap()))
389 .collect()),
390 resp => Err(resp.into()),
391 }
392 }
393
394 /// Serializes a [`View`] to the Apache Arrow data format.
395 pub async fn to_arrow(&self, window: ViewWindow) -> ClientResult<Bytes> {
396 let msg = self.client_message(ClientReq::ViewToArrowReq(ViewToArrowReq {
397 viewport: Some(window.clone().into()),
398 compression: window.compression,
399 }));
400
401 match self.client.oneshot(&msg).await? {
402 ClientResp::ViewToArrowResp(ViewToArrowResp { arrow }) => Ok(arrow.into()),
403 resp => Err(resp.into()),
404 }
405 }
406
407 /// Serializes this [`View`] to a string of JSON data. Useful if you want to
408 /// save additional round trip serialize/deserialize cycles.
409 pub async fn to_columns_string(&self, window: ViewWindow) -> ClientResult<String> {
410 let msg = self.client_message(ClientReq::ViewToColumnsStringReq(ViewToColumnsStringReq {
411 viewport: Some(window.clone().into()),
412 id: window.id,
413 index: window.index,
414 formatted: window.formatted,
415 }));
416
417 match self.client.oneshot(&msg).await? {
418 ClientResp::ViewToColumnsStringResp(ViewToColumnsStringResp { json_string }) => {
419 Ok(json_string)
420 },
421 resp => Err(resp.into()),
422 }
423 }
424
425 /// Render this `View` as a JSON string.
426 pub async fn to_json_string(&self, window: ViewWindow) -> ClientResult<String> {
427 let viewport = ViewPort::from(window.clone());
428 let msg = self.client_message(ClientReq::ViewToRowsStringReq(ViewToRowsStringReq {
429 viewport: Some(viewport),
430 id: window.id,
431 index: window.index,
432 formatted: window.formatted,
433 }));
434
435 match self.client.oneshot(&msg).await? {
436 ClientResp::ViewToRowsStringResp(ViewToRowsStringResp { json_string }) => {
437 Ok(json_string)
438 },
439 resp => Err(resp.into()),
440 }
441 }
442
443 /// Renders this [`View`] as an [NDJSON](https://github.com/ndjson/ndjson-spec)
444 /// formatted [`String`].
445 pub async fn to_ndjson(&self, window: ViewWindow) -> ClientResult<String> {
446 let viewport = ViewPort::from(window.clone());
447 let msg = self.client_message(ClientReq::ViewToNdjsonStringReq(ViewToNdjsonStringReq {
448 viewport: Some(viewport),
449 id: window.id,
450 index: window.index,
451 formatted: window.formatted,
452 }));
453
454 match self.client.oneshot(&msg).await? {
455 ClientResp::ViewToNdjsonStringResp(ViewToNdjsonStringResp { ndjson_string }) => {
456 Ok(ndjson_string)
457 },
458 resp => Err(resp.into()),
459 }
460 }
461
462 /// Serializes this [`View`] to CSV data in a standard format.
463 pub async fn to_csv(&self, window: ViewWindow) -> ClientResult<String> {
464 let msg = self.client_message(ClientReq::ViewToCsvReq(ViewToCsvReq {
465 viewport: Some(window.into()),
466 }));
467
468 match self.client.oneshot(&msg).await? {
469 ClientResp::ViewToCsvResp(ViewToCsvResp { csv }) => Ok(csv),
470 resp => Err(resp.into()),
471 }
472 }
473
474 /// Delete this [`View`] and clean up all resources associated with it.
475 /// [`View`] objects do not stop consuming resources or processing
476 /// updates when they are garbage collected - you must call this method
477 /// to reclaim these.
478 pub async fn delete(&self) -> ClientResult<()> {
479 let msg = self.client_message(ClientReq::ViewDeleteReq(ViewDeleteReq {}));
480 match self.client.oneshot(&msg).await? {
481 ClientResp::ViewDeleteResp(_) => Ok(()),
482 resp => Err(resp.into()),
483 }
484 }
485
486 /// Calculates the [min, max] of the leaf nodes of a column `column_name`.
487 ///
488 /// # Returns
489 ///
490 /// A tuple of [min, max], whose types are column and aggregate dependent.
491 pub async fn get_min_max(
492 &self,
493 column_name: String,
494 ) -> ClientResult<(crate::config::Scalar, crate::config::Scalar)> {
495 let msg = self.client_message(ClientReq::ViewGetMinMaxReq(ViewGetMinMaxReq {
496 column_name,
497 }));
498
499 match self.client.oneshot(&msg).await? {
500 ClientResp::ViewGetMinMaxResp(ViewGetMinMaxResp { min, max }) => {
501 let min = min.map(crate::config::Scalar::from).unwrap_or_default();
502 let max = max.map(crate::config::Scalar::from).unwrap_or_default();
503 Ok((min, max))
504 },
505 resp => Err(resp.into()),
506 }
507 }
508
509 /// Register a callback with this [`View`]. Whenever the view's underlying
510 /// table emits an update, this callback will be invoked with an object
511 /// containing `port_id`, indicating which port the update fired on, and
512 /// optionally `delta`, which is the new data that was updated for each
513 /// cell or each row.
514 ///
515 /// # Arguments
516 ///
517 /// - `on_update` - A callback function invoked on update, which receives an
518 /// object with two keys: `port_id`, indicating which port the update was
519 /// triggered on, and `delta`, whose value is dependent on the mode
520 /// parameter.
521 /// - `options` - If this is provided as `OnUpdateOptions { mode:
522 /// Some(OnUpdateMode::Row) }`, then `delta` is an Arrow of the updated
523 /// rows. Otherwise `delta` will be [`Option::None`].
524 pub async fn on_update<T, U>(&self, on_update: T, options: OnUpdateOptions) -> ClientResult<u32>
525 where
526 T: Fn(OnUpdateData) -> U + Send + Sync + 'static,
527 U: Future<Output = ()> + Send + 'static,
528 {
529 let on_update = Arc::new(on_update);
530 let callback = move |resp: Response| {
531 let on_update = on_update.clone();
532 async move {
533 match resp.client_resp {
534 Some(ClientResp::ViewOnUpdateResp(resp)) => {
535 on_update(OnUpdateData(resp)).await;
536 Ok(())
537 },
538 resp => Err(resp.into()),
539 }
540 }
541 };
542
543 let msg = self.client_message(ClientReq::ViewOnUpdateReq(ViewOnUpdateReq {
544 mode: options.mode.map(|OnUpdateMode::Row| Mode::Row as i32),
545 }));
546
547 self.client.subscribe(&msg, callback).await?;
548 Ok(msg.msg_id)
549 }
550
551 /// Unregister a previously registered update callback with this [`View`].
552 ///
553 /// # Arguments
554 ///
555 /// - `id` - A callback `id` as returned by a recipricol call to
556 /// [`View::on_update`].
557 ///
558 /// # Examples
559 ///
560 /// ```no_run
561 /// # use perspective_client::{OnUpdateOptions, View};
562 /// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
563 /// # let view: View = todo!();
564 /// let callback = |_| async { print!("Updated!") };
565 /// let cid = view.on_update(callback, OnUpdateOptions::default()).await?;
566 /// view.remove_update(cid).await?;
567 /// # Ok(()) }
568 /// ```
569 pub async fn remove_update(&self, update_id: u32) -> ClientResult<()> {
570 let msg = self.client_message(ClientReq::ViewRemoveOnUpdateReq(ViewRemoveOnUpdateReq {
571 id: update_id,
572 }));
573
574 self.client.unsubscribe(update_id).await?;
575 match self.client.oneshot(&msg).await? {
576 ClientResp::ViewRemoveOnUpdateResp(_) => Ok(()),
577 resp => Err(resp.into()),
578 }
579 }
580
581 /// Register a callback with this [`View`]. Whenever the [`View`] is
582 /// deleted, this callback will be invoked.
583 pub async fn on_delete(
584 &self,
585 on_delete: Box<dyn Fn() + Send + Sync + 'static>,
586 ) -> ClientResult<u32> {
587 let callback = move |resp: Response| match resp.client_resp.unwrap() {
588 ClientResp::ViewOnDeleteResp(_) => {
589 on_delete();
590 Ok(())
591 },
592 resp => Err(resp.into()),
593 };
594
595 let msg = self.client_message(ClientReq::ViewOnDeleteReq(ViewOnDeleteReq {}));
596 self.client.subscribe_once(&msg, Box::new(callback)).await?;
597 Ok(msg.msg_id)
598 }
599
600 /// Unregister a previously registered [`View::on_delete`] callback.
601 pub async fn remove_delete(&self, callback_id: u32) -> ClientResult<()> {
602 let msg = self.client_message(ClientReq::ViewRemoveDeleteReq(ViewRemoveDeleteReq {
603 id: callback_id,
604 }));
605
606 match self.client.oneshot(&msg).await? {
607 ClientResp::ViewRemoveDeleteResp(ViewRemoveDeleteResp {}) => Ok(()),
608 resp => Err(resp.into()),
609 }
610 }
611
612 /// Collapses the `group_by` row at `row_index`.
613 pub async fn collapse(&self, row_index: u32) -> ClientResult<u32> {
614 let msg = self.client_message(ClientReq::ViewCollapseReq(ViewCollapseReq { row_index }));
615 match self.client.oneshot(&msg).await? {
616 ClientResp::ViewCollapseResp(ViewCollapseResp { num_changed }) => Ok(num_changed),
617 resp => Err(resp.into()),
618 }
619 }
620
621 /// Expand the `group_by` row at `row_index`.
622 pub async fn expand(&self, row_index: u32) -> ClientResult<u32> {
623 let msg = self.client_message(ClientReq::ViewExpandReq(ViewExpandReq { row_index }));
624 match self.client.oneshot(&msg).await? {
625 ClientResp::ViewExpandResp(ViewExpandResp { num_changed }) => Ok(num_changed),
626 resp => Err(resp.into()),
627 }
628 }
629
630 /// Set expansion `depth` of the `group_by` tree.
631 pub async fn set_depth(&self, depth: u32) -> ClientResult<()> {
632 let msg = self.client_message(ClientReq::ViewSetDepthReq(ViewSetDepthReq { depth }));
633 match self.client.oneshot(&msg).await? {
634 ClientResp::ViewSetDepthResp(_) => Ok(()),
635 resp => Err(resp.into()),
636 }
637 }
638}