Skip to main content

perspective_client/
view.rs

1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████       █      █      █      █      █ █▄  ▀███ █       ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█  ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄  ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄   █ ▄▄▄▄▄ ┃
5// ┃ █      ██████ █  ▀█▄       █ ██████      █      ███▌▐███ ███████▄ █       ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors.                              ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::collections::HashMap;
14use std::ops::Deref;
15use std::str::FromStr;
16use std::sync::Arc;
17
18use futures::Future;
19use prost::bytes::Bytes;
20use serde::{Deserialize, Serialize};
21use ts_rs::TS;
22
23use self::view_on_update_req::Mode;
24use crate::assert_view_api;
25use crate::client::Client;
26use crate::proto::request::ClientReq;
27use crate::proto::response::ClientResp;
28use crate::proto::*;
29#[cfg(doc)]
30use crate::table::Table;
31pub use crate::utils::*;
32
33/// Options for [`View::on_update`].
34#[derive(Default, Debug, Deserialize, TS)]
35pub struct OnUpdateOptions {
36    pub mode: Option<OnUpdateMode>,
37}
38
39/// The update mode for [`View::on_update`].
40///
41/// `Row` mode calculates and provides the update batch new rows/columns as an
42/// Apache Arrow to the callback provided to [`View::on_update`]. This allows
43/// incremental updates if your callbakc can read this format, but should be
44/// disabled otherwise.
45#[derive(Default, Debug, Deserialize, TS)]
46pub enum OnUpdateMode {
47    #[default]
48    #[serde(rename = "row")]
49    Row,
50}
51
52impl FromStr for OnUpdateMode {
53    type Err = ClientError;
54
55    fn from_str(s: &str) -> Result<Self, Self::Err> {
56        if s == "row" {
57            Ok(OnUpdateMode::Row)
58        } else {
59            Err(ClientError::Option)
60        }
61    }
62}
63
64#[derive(Clone, Debug, Default, Deserialize, Serialize, TS, PartialEq)]
65pub struct ColumnWindow {
66    #[serde(skip_serializing_if = "Option::is_none")]
67    pub start_col: Option<f32>,
68
69    #[serde(skip_serializing_if = "Option::is_none")]
70    pub end_col: Option<f32>,
71}
72
73/// Options for serializing a window of data from a [`View`].
74///
75/// Some fields of [`ViewWindow`] are only applicable to specific methods of
76/// [`View`].
77#[derive(Clone, Debug, Default, Deserialize, Serialize, TS, PartialEq)]
78pub struct ViewWindow {
79    #[ts(optional)]
80    #[serde(skip_serializing_if = "Option::is_none")]
81    pub start_row: Option<f64>,
82
83    #[ts(optional)]
84    #[serde(skip_serializing_if = "Option::is_none")]
85    pub start_col: Option<f64>,
86
87    #[ts(optional)]
88    #[serde(skip_serializing_if = "Option::is_none")]
89    pub end_row: Option<f64>,
90
91    #[ts(optional)]
92    #[serde(skip_serializing_if = "Option::is_none")]
93    pub end_col: Option<f64>,
94
95    #[ts(optional)]
96    #[serde(skip_serializing_if = "Option::is_none")]
97    pub id: Option<bool>,
98
99    #[ts(optional)]
100    #[serde(skip_serializing_if = "Option::is_none")]
101    pub index: Option<bool>,
102
103    /// Only impacts [`View::to_csv`]
104    #[ts(optional)]
105    #[serde(skip_serializing_if = "Option::is_none")]
106    pub formatted: Option<bool>,
107
108    /// Only impacts [`View::to_arrow`]
109    #[ts(optional)]
110    #[serde(skip_serializing_if = "Option::is_none")]
111    pub compression: Option<String>,
112
113    /// When `true`, group-by columns use legacy `"colname (Group by N)"`
114    /// naming. When `false`, they use `__ROW_PATH_N__` naming consistent
115    /// with the SQL backend. Defaults to `true` for backwards compatibility.
116    #[ts(optional)]
117    #[serde(skip_serializing_if = "Option::is_none")]
118    pub emit_legacy_row_path_names: Option<bool>,
119}
120
121impl From<ViewWindow> for ViewPort {
122    fn from(window: ViewWindow) -> Self {
123        ViewPort {
124            start_row: window.start_row.map(|x| x.floor() as u32),
125            start_col: window.start_col.map(|x| x.floor() as u32),
126            end_row: window.end_row.map(|x| x.ceil() as u32),
127            end_col: window.end_col.map(|x| x.ceil() as u32),
128            emit_legacy_row_path_names: window.emit_legacy_row_path_names,
129        }
130    }
131}
132
133impl From<ViewPort> for ViewWindow {
134    fn from(window: ViewPort) -> Self {
135        ViewWindow {
136            start_row: window.start_row.map(|x| x as f64),
137            start_col: window.start_col.map(|x| x as f64),
138            end_row: window.end_row.map(|x| x as f64),
139            end_col: window.end_col.map(|x| x as f64),
140            emit_legacy_row_path_names: window.emit_legacy_row_path_names,
141            ..ViewWindow::default()
142        }
143    }
144}
145
146/// Rows updated and port ID corresponding to an update batch, provided to the
147/// callback argument to [`View::on_update`] with the "rows" mode.
148#[derive(TS)]
149pub struct OnUpdateData(crate::proto::ViewOnUpdateResp);
150
151impl Deref for OnUpdateData {
152    type Target = crate::proto::ViewOnUpdateResp;
153
154    fn deref(&self) -> &Self::Target {
155        &self.0
156    }
157}
158
159/// The [`View`] struct is Perspective's query and serialization interface. It
160/// represents a query on the `Table`'s dataset and is always created from an
161/// existing `Table` instance via the [`Table::view`] method.
162///
163/// [`View`]s are immutable with respect to the arguments provided to the
164/// [`Table::view`] method; to change these parameters, you must create a new
165/// [`View`] on the same [`Table`]. However, each [`View`] is _live_ with
166/// respect to the [`Table`]'s data, and will (within a conflation window)
167/// update with the latest state as its parent [`Table`] updates, including
168/// incrementally recalculating all aggregates, pivots, filters, etc. [`View`]
169/// query parameters are composable, in that each parameter works independently
170/// _and_ in conjunction with each other, and there is no limit to the number of
171/// pivots, filters, etc. which can be applied.
172///
173/// To construct a [`View`], call the [`Table::view`] factory method. A
174/// [`Table`] can have as many [`View`]s associated with it as you need -
175/// Perspective conserves memory by relying on a single [`Table`] to power
176/// multiple [`View`]s concurrently.
177///
178/// # Examples
179///
180/// ```no_run
181/// # use perspective_client::{Client, TableData, TableInitOptions, UpdateData, ViewWindow};
182/// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
183/// # let client: Client = todo!();
184/// let opts = TableInitOptions::default();
185/// let data = TableData::Update(UpdateData::Csv("x,y\n1,2\n3,4".into()));
186/// let table = client.table(data, opts).await?;
187///
188/// let view = table.view(None).await?;
189/// let arrow = view.to_arrow(ViewWindow::default()).await?;
190/// view.delete().await?;
191/// # Ok(()) }
192/// ```
193///
194/// ```no_run
195/// # use std::collections::HashMap;
196/// # use perspective_client::Table;
197/// # use perspective_client::config::*;
198/// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
199/// # let table: Table = todo!();
200/// let view = table
201///     .view(Some(ViewConfigUpdate {
202///         columns: Some(vec![Some("Sales".into())]),
203///         aggregates: Some(HashMap::from_iter(vec![("Sales".into(), "sum".into())])),
204///         group_by: Some(vec!["Region".into(), "Country".into()]),
205///         filter: Some(vec![Filter::new("Category", "in", &[
206///             "Furniture",
207///             "Technology",
208///         ])]),
209///         ..ViewConfigUpdate::default()
210///     }))
211///     .await?;
212/// # Ok(()) }
213/// ```
214///
215///  Group By
216///
217/// ```no_run
218/// # use perspective_client::Table;
219/// # use perspective_client::config::*;
220/// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
221/// # let table: Table = todo!();
222/// let view = table
223///     .view(Some(ViewConfigUpdate {
224///         group_by: Some(vec!["a".into(), "c".into()]),
225///         ..ViewConfigUpdate::default()
226///     }))
227///     .await?;
228/// # Ok(()) }
229/// ```
230///
231/// Split By
232///
233/// ```no_run
234/// # use perspective_client::Table;
235/// # use perspective_client::config::*;
236/// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
237/// # let table: Table = todo!();
238/// let view = table
239///     .view(Some(ViewConfigUpdate {
240///         split_by: Some(vec!["a".into(), "c".into()]),
241///         ..ViewConfigUpdate::default()
242///     }))
243///     .await?;
244/// # Ok(()) }
245/// ```
246///
247/// In Javascript, a [`Table`] can be constructed on a [`Table::view`] instance,
248/// which will return a new [`Table`] based on the [`Table::view`]'s dataset,
249/// and all future updates that affect the [`Table::view`] will be forwarded to
250/// the new [`Table`]. This is particularly useful for implementing a
251/// [Client/Server Replicated](server.md#clientserver-replicated) design, by
252/// serializing the `View` to an arrow and setting up an `on_update` callback.
253///
254/// ```no_run
255/// # use perspective_client::{Client, TableData, TableInitOptions, UpdateData, UpdateOptions};
256/// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
257/// # let client: Client = todo!();
258/// let opts = TableInitOptions::default();
259/// let data = TableData::Update(UpdateData::Csv("x,y\n1,2\n3,4".into()));
260/// let table = client.table(data, opts.clone()).await?;
261/// let view = table.view(None).await?;
262/// let table2 = client.table(TableData::View(view), opts).await?;
263/// let more = UpdateData::Csv("x,y\n5,6".into());
264/// table.update(more, UpdateOptions::default()).await?;
265/// # Ok(()) }
266/// ```
267#[derive(Clone, Debug)]
268pub struct View {
269    pub name: String,
270    client: Client,
271}
272
273assert_view_api!(View);
274
275impl View {
276    pub fn new(name: String, client: Client) -> Self {
277        View { name, client }
278    }
279
280    fn client_message(&self, req: ClientReq) -> Request {
281        crate::proto::Request {
282            msg_id: self.client.gen_id(),
283            entity_id: self.name.clone(),
284            client_req: Some(req),
285        }
286    }
287
288    /// Returns an array of strings containing the column paths of the [`View`]
289    /// without any of the source columns.
290    ///
291    /// A column path shows the columns that a given cell belongs to after
292    /// pivots are applied.
293    pub async fn column_paths(&self, window: ColumnWindow) -> ClientResult<Vec<String>> {
294        let msg = self.client_message(ClientReq::ViewColumnPathsReq(ViewColumnPathsReq {
295            start_col: window.start_col.map(|x| x as u32),
296            end_col: window.end_col.map(|x| x as u32),
297        }));
298
299        match self.client.oneshot(&msg).await? {
300            ClientResp::ViewColumnPathsResp(ViewColumnPathsResp { paths }) => {
301                // Ok(paths.into_iter().map(|x| x.path).collect())
302                Ok(paths)
303            },
304            resp => Err(resp.into()),
305        }
306    }
307
308    /// Returns this [`View`]'s _dimensions_, row and column count, as well as
309    /// those of the [`crate::Table`] from which it was derived.
310    ///
311    /// - `num_table_rows` - The number of rows in the underlying
312    ///   [`crate::Table`].
313    /// - `num_table_columns` - The number of columns in the underlying
314    ///   [`crate::Table`] (including the `index` column if this
315    ///   [`crate::Table`] was constructed with one).
316    /// - `num_view_rows` - The number of rows in this [`View`]. If this
317    ///   [`View`] has a `group_by` clause, `num_view_rows` will also include
318    ///   aggregated rows.
319    /// - `num_view_columns` - The number of columns in this [`View`]. If this
320    ///   [`View`] has a `split_by` clause, `num_view_columns` will include all
321    ///   _column paths_, e.g. the number of `columns` clause times the number
322    ///   of `split_by` groups.
323    pub async fn dimensions(&self) -> ClientResult<ViewDimensionsResp> {
324        let msg = self.client_message(ClientReq::ViewDimensionsReq(ViewDimensionsReq {}));
325        match self.client.oneshot(&msg).await? {
326            ClientResp::ViewDimensionsResp(resp) => Ok(resp),
327            resp => Err(resp.into()),
328        }
329    }
330
331    /// The expression schema of this [`View`], which contains only the
332    /// expressions created on this [`View`]. See [`View::schema`] for
333    /// details.
334    pub async fn expression_schema(&self) -> ClientResult<HashMap<String, ColumnType>> {
335        if self.client.get_features().await?.expressions {
336            let msg = self.client_message(ClientReq::ViewExpressionSchemaReq(
337                ViewExpressionSchemaReq {},
338            ));
339            match self.client.oneshot(&msg).await? {
340                ClientResp::ViewExpressionSchemaResp(ViewExpressionSchemaResp { schema }) => {
341                    Ok(schema
342                        .into_iter()
343                        .map(|(x, y)| (x, ColumnType::try_from(y).unwrap()))
344                        .collect())
345                },
346                resp => Err(resp.into()),
347            }
348        } else {
349            Ok([].into_iter().collect())
350        }
351    }
352
353    /// A copy of the [`ViewConfig`] object passed to the [`Table::view`] method
354    /// which created this [`View`].
355    pub async fn get_config(&self) -> ClientResult<crate::config::ViewConfig> {
356        let msg = self.client_message(ClientReq::ViewGetConfigReq(ViewGetConfigReq {}));
357        match self.client.oneshot(&msg).await? {
358            ClientResp::ViewGetConfigResp(ViewGetConfigResp {
359                config: Some(config),
360            }) => Ok(config.into()),
361            resp => Err(resp.into()),
362        }
363    }
364
365    /// The number of aggregated rows in this [`View`]. This is affected by the
366    /// "group_by" configuration parameter supplied to this view's contructor.
367    ///
368    /// # Returns
369    ///
370    /// The number of aggregated rows.
371    pub async fn num_rows(&self) -> ClientResult<u32> {
372        Ok(self.dimensions().await?.num_view_rows)
373    }
374
375    /// The schema of this [`View`].
376    ///
377    /// The [`View`] schema differs from the `schema` returned by
378    /// [`Table::schema`]; it may have different column names due to
379    /// `expressions` or `columns` configs, or it maye have _different
380    /// column types_ due to the application og `group_by` and `aggregates`
381    /// config. You can think of [`Table::schema`] as the _input_ schema and
382    /// [`View::schema`] as the _output_ schema of a Perspective pipeline.
383    pub async fn schema(&self) -> ClientResult<HashMap<String, ColumnType>> {
384        let msg = self.client_message(ClientReq::ViewSchemaReq(ViewSchemaReq {}));
385        match self.client.oneshot(&msg).await? {
386            ClientResp::ViewSchemaResp(ViewSchemaResp { schema }) => Ok(schema
387                .into_iter()
388                .map(|(x, y)| (x, ColumnType::try_from(y).unwrap()))
389                .collect()),
390            resp => Err(resp.into()),
391        }
392    }
393
394    /// Serializes a [`View`] to the Apache Arrow data format.
395    pub async fn to_arrow(&self, window: ViewWindow) -> ClientResult<Bytes> {
396        let msg = self.client_message(ClientReq::ViewToArrowReq(ViewToArrowReq {
397            viewport: Some(window.clone().into()),
398            compression: window.compression,
399        }));
400
401        match self.client.oneshot(&msg).await? {
402            ClientResp::ViewToArrowResp(ViewToArrowResp { arrow }) => Ok(arrow.into()),
403            resp => Err(resp.into()),
404        }
405    }
406
407    /// Serializes this [`View`] to a string of JSON data. Useful if you want to
408    /// save additional round trip serialize/deserialize cycles.    
409    pub async fn to_columns_string(&self, window: ViewWindow) -> ClientResult<String> {
410        let msg = self.client_message(ClientReq::ViewToColumnsStringReq(ViewToColumnsStringReq {
411            viewport: Some(window.clone().into()),
412            id: window.id,
413            index: window.index,
414            formatted: window.formatted,
415        }));
416
417        match self.client.oneshot(&msg).await? {
418            ClientResp::ViewToColumnsStringResp(ViewToColumnsStringResp { json_string }) => {
419                Ok(json_string)
420            },
421            resp => Err(resp.into()),
422        }
423    }
424
425    /// Render this `View` as a JSON string.
426    pub async fn to_json_string(&self, window: ViewWindow) -> ClientResult<String> {
427        let viewport = ViewPort::from(window.clone());
428        let msg = self.client_message(ClientReq::ViewToRowsStringReq(ViewToRowsStringReq {
429            viewport: Some(viewport),
430            id: window.id,
431            index: window.index,
432            formatted: window.formatted,
433        }));
434
435        match self.client.oneshot(&msg).await? {
436            ClientResp::ViewToRowsStringResp(ViewToRowsStringResp { json_string }) => {
437                Ok(json_string)
438            },
439            resp => Err(resp.into()),
440        }
441    }
442
443    /// Renders this [`View`] as an [NDJSON](https://github.com/ndjson/ndjson-spec)
444    /// formatted [`String`].
445    pub async fn to_ndjson(&self, window: ViewWindow) -> ClientResult<String> {
446        let viewport = ViewPort::from(window.clone());
447        let msg = self.client_message(ClientReq::ViewToNdjsonStringReq(ViewToNdjsonStringReq {
448            viewport: Some(viewport),
449            id: window.id,
450            index: window.index,
451            formatted: window.formatted,
452        }));
453
454        match self.client.oneshot(&msg).await? {
455            ClientResp::ViewToNdjsonStringResp(ViewToNdjsonStringResp { ndjson_string }) => {
456                Ok(ndjson_string)
457            },
458            resp => Err(resp.into()),
459        }
460    }
461
462    /// Serializes this [`View`] to CSV data in a standard format.
463    pub async fn to_csv(&self, window: ViewWindow) -> ClientResult<String> {
464        let msg = self.client_message(ClientReq::ViewToCsvReq(ViewToCsvReq {
465            viewport: Some(window.into()),
466        }));
467
468        match self.client.oneshot(&msg).await? {
469            ClientResp::ViewToCsvResp(ViewToCsvResp { csv }) => Ok(csv),
470            resp => Err(resp.into()),
471        }
472    }
473
474    /// Delete this [`View`] and clean up all resources associated with it.
475    /// [`View`] objects do not stop consuming resources or processing
476    /// updates when they are garbage collected - you must call this method
477    /// to reclaim these.
478    pub async fn delete(&self) -> ClientResult<()> {
479        let msg = self.client_message(ClientReq::ViewDeleteReq(ViewDeleteReq {}));
480        match self.client.oneshot(&msg).await? {
481            ClientResp::ViewDeleteResp(_) => Ok(()),
482            resp => Err(resp.into()),
483        }
484    }
485
486    /// Calculates the [min, max] of the leaf nodes of a column `column_name`.
487    ///
488    /// # Returns
489    ///
490    /// A tuple of [min, max], whose types are column and aggregate dependent.
491    pub async fn get_min_max(
492        &self,
493        column_name: String,
494    ) -> ClientResult<(crate::config::Scalar, crate::config::Scalar)> {
495        let msg = self.client_message(ClientReq::ViewGetMinMaxReq(ViewGetMinMaxReq {
496            column_name,
497        }));
498
499        match self.client.oneshot(&msg).await? {
500            ClientResp::ViewGetMinMaxResp(ViewGetMinMaxResp { min, max }) => {
501                let min = min.map(crate::config::Scalar::from).unwrap_or_default();
502                let max = max.map(crate::config::Scalar::from).unwrap_or_default();
503                Ok((min, max))
504            },
505            resp => Err(resp.into()),
506        }
507    }
508
509    /// Register a callback with this [`View`]. Whenever the view's underlying
510    /// table emits an update, this callback will be invoked with an object
511    /// containing `port_id`, indicating which port the update fired on, and
512    /// optionally `delta`, which is the new data that was updated for each
513    /// cell or each row.
514    ///
515    /// # Arguments
516    ///
517    /// - `on_update` - A callback function invoked on update, which receives an
518    ///   object with two keys: `port_id`, indicating which port the update was
519    ///   triggered on, and `delta`, whose value is dependent on the mode
520    ///   parameter.
521    /// - `options` - If this is provided as `OnUpdateOptions { mode:
522    ///   Some(OnUpdateMode::Row) }`, then `delta` is an Arrow of the updated
523    ///   rows. Otherwise `delta` will be [`Option::None`].
524    pub async fn on_update<T, U>(&self, on_update: T, options: OnUpdateOptions) -> ClientResult<u32>
525    where
526        T: Fn(OnUpdateData) -> U + Send + Sync + 'static,
527        U: Future<Output = ()> + Send + 'static,
528    {
529        let on_update = Arc::new(on_update);
530        let callback = move |resp: Response| {
531            let on_update = on_update.clone();
532            async move {
533                match resp.client_resp {
534                    Some(ClientResp::ViewOnUpdateResp(resp)) => {
535                        on_update(OnUpdateData(resp)).await;
536                        Ok(())
537                    },
538                    resp => Err(resp.into()),
539                }
540            }
541        };
542
543        let msg = self.client_message(ClientReq::ViewOnUpdateReq(ViewOnUpdateReq {
544            mode: options.mode.map(|OnUpdateMode::Row| Mode::Row as i32),
545        }));
546
547        self.client.subscribe(&msg, callback).await?;
548        Ok(msg.msg_id)
549    }
550
551    /// Unregister a previously registered update callback with this [`View`].
552    ///
553    /// # Arguments
554    ///
555    /// - `id` - A callback `id` as returned by a recipricol call to
556    ///   [`View::on_update`].
557    ///
558    /// # Examples
559    ///
560    /// ```no_run
561    /// # use perspective_client::{OnUpdateOptions, View};
562    /// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
563    /// # let view: View = todo!();
564    /// let callback = |_| async { print!("Updated!") };
565    /// let cid = view.on_update(callback, OnUpdateOptions::default()).await?;
566    /// view.remove_update(cid).await?;
567    /// # Ok(()) }
568    /// ```
569    pub async fn remove_update(&self, update_id: u32) -> ClientResult<()> {
570        let msg = self.client_message(ClientReq::ViewRemoveOnUpdateReq(ViewRemoveOnUpdateReq {
571            id: update_id,
572        }));
573
574        self.client.unsubscribe(update_id).await?;
575        match self.client.oneshot(&msg).await? {
576            ClientResp::ViewRemoveOnUpdateResp(_) => Ok(()),
577            resp => Err(resp.into()),
578        }
579    }
580
581    /// Register a callback with this [`View`]. Whenever the [`View`] is
582    /// deleted, this callback will be invoked.
583    pub async fn on_delete(
584        &self,
585        on_delete: Box<dyn Fn() + Send + Sync + 'static>,
586    ) -> ClientResult<u32> {
587        let callback = move |resp: Response| match resp.client_resp.unwrap() {
588            ClientResp::ViewOnDeleteResp(_) => {
589                on_delete();
590                Ok(())
591            },
592            resp => Err(resp.into()),
593        };
594
595        let msg = self.client_message(ClientReq::ViewOnDeleteReq(ViewOnDeleteReq {}));
596        self.client.subscribe_once(&msg, Box::new(callback)).await?;
597        Ok(msg.msg_id)
598    }
599
600    /// Unregister a previously registered [`View::on_delete`] callback.
601    pub async fn remove_delete(&self, callback_id: u32) -> ClientResult<()> {
602        let msg = self.client_message(ClientReq::ViewRemoveDeleteReq(ViewRemoveDeleteReq {
603            id: callback_id,
604        }));
605
606        match self.client.oneshot(&msg).await? {
607            ClientResp::ViewRemoveDeleteResp(ViewRemoveDeleteResp {}) => Ok(()),
608            resp => Err(resp.into()),
609        }
610    }
611
612    /// Collapses the `group_by` row at `row_index`.
613    pub async fn collapse(&self, row_index: u32) -> ClientResult<u32> {
614        let msg = self.client_message(ClientReq::ViewCollapseReq(ViewCollapseReq { row_index }));
615        match self.client.oneshot(&msg).await? {
616            ClientResp::ViewCollapseResp(ViewCollapseResp { num_changed }) => Ok(num_changed),
617            resp => Err(resp.into()),
618        }
619    }
620
621    /// Expand the `group_by` row at `row_index`.
622    pub async fn expand(&self, row_index: u32) -> ClientResult<u32> {
623        let msg = self.client_message(ClientReq::ViewExpandReq(ViewExpandReq { row_index }));
624        match self.client.oneshot(&msg).await? {
625            ClientResp::ViewExpandResp(ViewExpandResp { num_changed }) => Ok(num_changed),
626            resp => Err(resp.into()),
627        }
628    }
629
630    /// Set expansion `depth` of the `group_by` tree.
631    pub async fn set_depth(&self, depth: u32) -> ClientResult<()> {
632        let msg = self.client_message(ClientReq::ViewSetDepthReq(ViewSetDepthReq { depth }));
633        match self.client.oneshot(&msg).await? {
634            ClientResp::ViewSetDepthResp(_) => Ok(()),
635            resp => Err(resp.into()),
636        }
637    }
638}