perspective_client/
view.rs

1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████       █      █      █      █      █ █▄  ▀███ █       ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█  ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄  ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄   █ ▄▄▄▄▄ ┃
5// ┃ █      ██████ █  ▀█▄       █ ██████      █      ███▌▐███ ███████▄ █       ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors.                              ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::collections::HashMap;
14use std::ops::Deref;
15use std::str::FromStr;
16use std::sync::Arc;
17
18use futures::Future;
19use prost::bytes::Bytes;
20use serde::{Deserialize, Serialize};
21use ts_rs::TS;
22
23use self::view_on_update_req::Mode;
24use crate::assert_view_api;
25use crate::client::Client;
26use crate::proto::request::ClientReq;
27use crate::proto::response::ClientResp;
28use crate::proto::*;
29#[cfg(doc)]
30use crate::table::Table;
31pub use crate::utils::*;
32
33/// Options for [`View::on_update`].
34#[derive(Default, Debug, Deserialize, TS)]
35pub struct OnUpdateOptions {
36    pub mode: Option<OnUpdateMode>,
37}
38
39/// The update mode for [`View::on_update`].
40///
41/// `Row` mode calculates and provides the update batch new rows/columns as an
42/// Apache Arrow to the callback provided to [`View::on_update`]. This allows
43/// incremental updates if your callbakc can read this format, but should be
44/// disabled otherwise.
45#[derive(Default, Debug, Deserialize, TS)]
46pub enum OnUpdateMode {
47    #[default]
48    #[serde(rename = "row")]
49    Row,
50}
51
52impl FromStr for OnUpdateMode {
53    type Err = ClientError;
54
55    fn from_str(s: &str) -> Result<Self, Self::Err> {
56        if s == "row" {
57            Ok(OnUpdateMode::Row)
58        } else {
59            Err(ClientError::Option)
60        }
61    }
62}
63
64#[derive(Clone, Debug, Serialize)]
65pub struct Dimensions {
66    pub num_view_rows: usize,
67    pub num_view_columns: usize,
68    pub num_table_rows: usize,
69    pub num_table_columns: usize,
70}
71
72#[derive(Clone, Debug, Default, Deserialize, Serialize, TS, PartialEq)]
73pub struct ColumnWindow {
74    #[serde(skip_serializing_if = "Option::is_none")]
75    pub start_col: Option<f32>,
76
77    #[serde(skip_serializing_if = "Option::is_none")]
78    pub end_col: Option<f32>,
79}
80
81/// Options for serializing a window of data from a [`View`].
82///
83/// Some fields of [`ViewWindow`] are only applicable to specific methods of
84/// [`View`].
85#[derive(Clone, Debug, Default, Deserialize, Serialize, TS, PartialEq)]
86pub struct ViewWindow {
87    #[serde(skip_serializing_if = "Option::is_none")]
88    pub start_row: Option<f64>,
89
90    #[serde(skip_serializing_if = "Option::is_none")]
91    pub start_col: Option<f64>,
92
93    #[serde(skip_serializing_if = "Option::is_none")]
94    pub end_row: Option<f64>,
95
96    #[serde(skip_serializing_if = "Option::is_none")]
97    pub end_col: Option<f64>,
98
99    #[serde(skip_serializing_if = "Option::is_none")]
100    pub id: Option<bool>,
101
102    #[serde(skip_serializing_if = "Option::is_none")]
103    pub index: Option<bool>,
104
105    #[serde(skip_serializing_if = "Option::is_none")]
106    pub leaves_only: Option<bool>,
107
108    /// Only impacts [`View::to_csv`]
109    #[serde(skip_serializing_if = "Option::is_none")]
110    pub formatted: Option<bool>,
111
112    /// Only impacts [`View::to_arrow`]
113    #[serde(skip_serializing_if = "Option::is_none")]
114    pub compression: Option<String>,
115}
116
117impl From<ViewWindow> for ViewPort {
118    fn from(window: ViewWindow) -> Self {
119        ViewPort {
120            start_row: window.start_row.map(|x| x.floor() as u32),
121            start_col: window.start_col.map(|x| x.floor() as u32),
122            end_row: window.end_row.map(|x| x.ceil() as u32),
123            end_col: window.end_col.map(|x| x.ceil() as u32),
124        }
125    }
126}
127
128impl From<ViewPort> for ViewWindow {
129    fn from(window: ViewPort) -> Self {
130        ViewWindow {
131            start_row: window.start_row.map(|x| x as f64),
132            start_col: window.start_col.map(|x| x as f64),
133            end_row: window.end_row.map(|x| x as f64),
134            end_col: window.end_col.map(|x| x as f64),
135            ..ViewWindow::default()
136        }
137    }
138}
139
140/// Rows updated and port ID corresponding to an update batch, provided to the
141/// callback argument to [`View::on_update`] with the "rows" mode.
142#[derive(TS)]
143pub struct OnUpdateData(crate::proto::ViewOnUpdateResp);
144
145impl Deref for OnUpdateData {
146    type Target = crate::proto::ViewOnUpdateResp;
147
148    fn deref(&self) -> &Self::Target {
149        &self.0
150    }
151}
152
153/// The [`View`] struct is Perspective's query and serialization interface. It
154/// represents a query on the `Table`'s dataset and is always created from an
155/// existing `Table` instance via the [`Table::view`] method.
156///
157/// [`View`]s are immutable with respect to the arguments provided to the
158/// [`Table::view`] method; to change these parameters, you must create a new
159/// [`View`] on the same [`Table`]. However, each [`View`] is _live_ with
160/// respect to the [`Table`]'s data, and will (within a conflation window)
161/// update with the latest state as its parent [`Table`] updates, including
162/// incrementally recalculating all aggregates, pivots, filters, etc. [`View`]
163/// query parameters are composable, in that each parameter works independently
164/// _and_ in conjunction with each other, and there is no limit to the number of
165/// pivots, filters, etc. which can be applied.
166///
167/// To construct a [`View`], call the [`Table::view`] factory method. A
168/// [`Table`] can have as many [`View`]s associated with it as you need -
169/// Perspective conserves memory by relying on a single [`Table`] to power
170/// multiple [`View`]s concurrently.
171///
172/// # Examples
173///
174/// ```rust
175/// let opts = TableInitOptions::default();
176/// let data = TableData::Update(UpdateData::Csv("x,y\n1,2\n3,4".into()));
177/// let table = client.table(data, opts).await?;
178///
179/// let view = table.view(None).await?;
180/// let arrow = view.to_arrow().await?;
181/// view.delete().await?;
182/// ```
183///
184/// ```rust
185/// use crate::config::*;
186/// let view = table
187///     .view(Some(ViewConfigUpdate {
188///         columns: Some(vec![Some("Sales".into())]),
189///         aggregates: Some(HashMap::from_iter(vec![("Sales".into(), "sum".into())])),
190///         group_by: Some(vec!["Region".into(), "Country".into()]),
191///         filter: Some(vec![Filter::new("Category", "in", &[
192///             "Furniture",
193///             "Technology",
194///         ])]),
195///         ..ViewConfigUpdate::default()
196///     }))
197///     .await?;
198/// ```
199///
200///  Group By
201///
202/// ```rust
203/// let view = table
204///     .view(Some(ViewConfigUpdate {
205///         group_by: Some(vec!["a".into(), "c".into()]),
206///         ..ViewConfigUpdate::default()
207///     }))
208///     .await?;
209/// ```
210///
211/// Split By
212///
213/// ```rust
214/// let view = table
215///     .view(Some(ViewConfigUpdate {
216///         split_by: Some(vec!["a".into(), "c".into()]),
217///         ..ViewConfigUpdate::default()
218///     }))
219///     .await?;
220/// ```
221///
222/// In Javascript, a [`Table`] can be constructed on a [`Table::view`] instance,
223/// which will return a new [`Table`] based on the [`Table::view`]'s dataset,
224/// and all future updates that affect the [`Table::view`] will be forwarded to
225/// the new [`Table`]. This is particularly useful for implementing a
226/// [Client/Server Replicated](server.md#clientserver-replicated) design, by
227/// serializing the `View` to an arrow and setting up an `on_update` callback.
228///
229/// ```rust
230/// let opts = TableInitOptions::default();
231/// let data = TableData::Update(UpdateData::Csv("x,y\n1,2\n3,4".into()));
232/// let table = client.table(data, opts).await?;
233/// let view = table.view(None).await?;
234/// let table2 = client.table(TableData::View(view)).await?;
235/// table.update(data).await?;
236/// ```
237#[derive(Clone, Debug)]
238pub struct View {
239    pub name: String,
240    client: Client,
241}
242
243assert_view_api!(View);
244
245impl View {
246    pub fn new(name: String, client: Client) -> Self {
247        View { name, client }
248    }
249
250    fn client_message(&self, req: ClientReq) -> Request {
251        crate::proto::Request {
252            msg_id: self.client.gen_id(),
253            entity_id: self.name.clone(),
254            client_req: Some(req),
255        }
256    }
257
258    /// Returns an array of strings containing the column paths of the [`View`]
259    /// without any of the source columns.
260    ///
261    /// A column path shows the columns that a given cell belongs to after
262    /// pivots are applied.
263    pub async fn column_paths(&self, window: ColumnWindow) -> ClientResult<Vec<String>> {
264        let msg = self.client_message(ClientReq::ViewColumnPathsReq(ViewColumnPathsReq {
265            start_col: window.start_col.map(|x| x as u32),
266            end_col: window.end_col.map(|x| x as u32),
267        }));
268
269        match self.client.oneshot(&msg).await? {
270            ClientResp::ViewColumnPathsResp(ViewColumnPathsResp { paths }) => {
271                // Ok(paths.into_iter().map(|x| x.path).collect())
272                Ok(paths)
273            },
274            resp => Err(resp.into()),
275        }
276    }
277
278    /// Returns this [`View`]'s _dimensions_, row and column count, as well as
279    /// those of the [`crate::Table`] from which it was derived.
280    ///
281    /// - `num_table_rows` - The number of rows in the underlying
282    ///   [`crate::Table`].
283    /// - `num_table_columns` - The number of columns in the underlying
284    ///   [`crate::Table`] (including the `index` column if this
285    ///   [`crate::Table`] was constructed with one).
286    /// - `num_view_rows` - The number of rows in this [`View`]. If this
287    ///   [`View`] has a `group_by` clause, `num_view_rows` will also include
288    ///   aggregated rows.
289    /// - `num_view_columns` - The number of columns in this [`View`]. If this
290    ///   [`View`] has a `split_by` clause, `num_view_columns` will include all
291    ///   _column paths_, e.g. the number of `columns` clause times the number
292    ///   of `split_by` groups.
293    pub async fn dimensions(&self) -> ClientResult<ViewDimensionsResp> {
294        let msg = self.client_message(ClientReq::ViewDimensionsReq(ViewDimensionsReq {}));
295        match self.client.oneshot(&msg).await? {
296            ClientResp::ViewDimensionsResp(resp) => Ok(resp),
297            resp => Err(resp.into()),
298        }
299    }
300
301    /// The expression schema of this [`View`], which contains only the
302    /// expressions created on this [`View`]. See [`View::schema`] for
303    /// details.
304    pub async fn expression_schema(&self) -> ClientResult<HashMap<String, ColumnType>> {
305        if self.client.get_features().await?.expressions {
306            let msg = self.client_message(ClientReq::ViewExpressionSchemaReq(
307                ViewExpressionSchemaReq {},
308            ));
309            match self.client.oneshot(&msg).await? {
310                ClientResp::ViewExpressionSchemaResp(ViewExpressionSchemaResp { schema }) => {
311                    Ok(schema
312                        .into_iter()
313                        .map(|(x, y)| (x, ColumnType::try_from(y).unwrap()))
314                        .collect())
315                },
316                resp => Err(resp.into()),
317            }
318        } else {
319            Ok([].into_iter().collect())
320        }
321    }
322
323    /// A copy of the [`ViewConfig`] object passed to the [`Table::view`] method
324    /// which created this [`View`].
325    pub async fn get_config(&self) -> ClientResult<crate::config::ViewConfig> {
326        let msg = self.client_message(ClientReq::ViewGetConfigReq(ViewGetConfigReq {}));
327        match self.client.oneshot(&msg).await? {
328            ClientResp::ViewGetConfigResp(ViewGetConfigResp {
329                config: Some(config),
330            }) => Ok(config.into()),
331            resp => Err(resp.into()),
332        }
333    }
334
335    /// The number of aggregated rows in this [`View`]. This is affected by the
336    /// "group_by" configuration parameter supplied to this view's contructor.
337    ///
338    /// # Returns
339    ///
340    /// The number of aggregated rows.
341    pub async fn num_rows(&self) -> ClientResult<u32> {
342        Ok(self.dimensions().await?.num_view_rows)
343    }
344
345    /// The schema of this [`View`].
346    ///
347    /// The [`View`] schema differs from the `schema` returned by
348    /// [`Table::schema`]; it may have different column names due to
349    /// `expressions` or `columns` configs, or it maye have _different
350    /// column types_ due to the application og `group_by` and `aggregates`
351    /// config. You can think of [`Table::schema`] as the _input_ schema and
352    /// [`View::schema`] as the _output_ schema of a Perspective pipeline.
353    pub async fn schema(&self) -> ClientResult<HashMap<String, ColumnType>> {
354        let msg = self.client_message(ClientReq::ViewSchemaReq(ViewSchemaReq {}));
355        match self.client.oneshot(&msg).await? {
356            ClientResp::ViewSchemaResp(ViewSchemaResp { schema }) => Ok(schema
357                .into_iter()
358                .map(|(x, y)| (x, ColumnType::try_from(y).unwrap()))
359                .collect()),
360            resp => Err(resp.into()),
361        }
362    }
363
364    /// Serializes a [`View`] to the Apache Arrow data format.
365    pub async fn to_arrow(&self, window: ViewWindow) -> ClientResult<Bytes> {
366        let msg = self.client_message(ClientReq::ViewToArrowReq(ViewToArrowReq {
367            viewport: Some(window.clone().into()),
368            compression: window.compression,
369        }));
370
371        match self.client.oneshot(&msg).await? {
372            ClientResp::ViewToArrowResp(ViewToArrowResp { arrow }) => Ok(arrow.into()),
373            resp => Err(resp.into()),
374        }
375    }
376
377    /// Serializes this [`View`] to a string of JSON data. Useful if you want to
378    /// save additional round trip serialize/deserialize cycles.    
379    pub async fn to_columns_string(&self, window: ViewWindow) -> ClientResult<String> {
380        let msg = self.client_message(ClientReq::ViewToColumnsStringReq(ViewToColumnsStringReq {
381            viewport: Some(window.clone().into()),
382            id: window.id,
383            index: window.index,
384            formatted: window.formatted,
385            leaves_only: window.leaves_only,
386        }));
387
388        match self.client.oneshot(&msg).await? {
389            ClientResp::ViewToColumnsStringResp(ViewToColumnsStringResp { json_string }) => {
390                Ok(json_string)
391            },
392            resp => Err(resp.into()),
393        }
394    }
395
396    /// Render this `View` as a JSON string.
397    pub async fn to_json_string(&self, window: ViewWindow) -> ClientResult<String> {
398        let viewport = ViewPort::from(window.clone());
399        let msg = self.client_message(ClientReq::ViewToRowsStringReq(ViewToRowsStringReq {
400            viewport: Some(viewport),
401            id: window.id,
402            index: window.index,
403            formatted: window.formatted,
404            leaves_only: window.leaves_only,
405        }));
406
407        match self.client.oneshot(&msg).await? {
408            ClientResp::ViewToRowsStringResp(ViewToRowsStringResp { json_string }) => {
409                Ok(json_string)
410            },
411            resp => Err(resp.into()),
412        }
413    }
414
415    /// Renders this [`View`] as an [NDJSON](https://github.com/ndjson/ndjson-spec)
416    /// formatted [`String`].
417    pub async fn to_ndjson(&self, window: ViewWindow) -> ClientResult<String> {
418        let viewport = ViewPort::from(window.clone());
419        let msg = self.client_message(ClientReq::ViewToNdjsonStringReq(ViewToNdjsonStringReq {
420            viewport: Some(viewport),
421            id: window.id,
422            index: window.index,
423            formatted: window.formatted,
424            leaves_only: window.leaves_only,
425        }));
426
427        match self.client.oneshot(&msg).await? {
428            ClientResp::ViewToNdjsonStringResp(ViewToNdjsonStringResp { ndjson_string }) => {
429                Ok(ndjson_string)
430            },
431            resp => Err(resp.into()),
432        }
433    }
434
435    /// Serializes this [`View`] to CSV data in a standard format.
436    pub async fn to_csv(&self, window: ViewWindow) -> ClientResult<String> {
437        let msg = self.client_message(ClientReq::ViewToCsvReq(ViewToCsvReq {
438            viewport: Some(window.into()),
439        }));
440
441        match self.client.oneshot(&msg).await? {
442            ClientResp::ViewToCsvResp(ViewToCsvResp { csv }) => Ok(csv),
443            resp => Err(resp.into()),
444        }
445    }
446
447    /// Delete this [`View`] and clean up all resources associated with it.
448    /// [`View`] objects do not stop consuming resources or processing
449    /// updates when they are garbage collected - you must call this method
450    /// to reclaim these.
451    pub async fn delete(&self) -> ClientResult<()> {
452        let msg = self.client_message(ClientReq::ViewDeleteReq(ViewDeleteReq {}));
453        match self.client.oneshot(&msg).await? {
454            ClientResp::ViewDeleteResp(_) => Ok(()),
455            resp => Err(resp.into()),
456        }
457    }
458
459    /// Calculates the [min, max] of the leaf nodes of a column `column_name`.
460    ///
461    /// # Returns
462    ///
463    /// A tuple of [min, max], whose types are column and aggregate dependent.
464    pub async fn get_min_max(&self, column_name: String) -> ClientResult<(String, String)> {
465        let msg = self.client_message(ClientReq::ViewGetMinMaxReq(ViewGetMinMaxReq {
466            column_name,
467        }));
468
469        match self.client.oneshot(&msg).await? {
470            ClientResp::ViewGetMinMaxResp(ViewGetMinMaxResp { min, max }) => Ok((min, max)),
471            resp => Err(resp.into()),
472        }
473    }
474
475    /// Register a callback with this [`View`]. Whenever the view's underlying
476    /// table emits an update, this callback will be invoked with an object
477    /// containing `port_id`, indicating which port the update fired on, and
478    /// optionally `delta`, which is the new data that was updated for each
479    /// cell or each row.
480    ///
481    /// # Arguments
482    ///
483    /// - `on_update` - A callback function invoked on update, which receives an
484    ///   object with two keys: `port_id`, indicating which port the update was
485    ///   triggered on, and `delta`, whose value is dependent on the mode
486    ///   parameter.
487    /// - `options` - If this is provided as `OnUpdateOptions { mode:
488    ///   Some(OnUpdateMode::Row) }`, then `delta` is an Arrow of the updated
489    ///   rows. Otherwise `delta` will be [`Option::None`].
490    pub async fn on_update<T, U>(&self, on_update: T, options: OnUpdateOptions) -> ClientResult<u32>
491    where
492        T: Fn(OnUpdateData) -> U + Send + Sync + 'static,
493        U: Future<Output = ()> + Send + 'static,
494    {
495        let on_update = Arc::new(on_update);
496        let callback = move |resp: Response| {
497            let on_update = on_update.clone();
498            async move {
499                match resp.client_resp {
500                    Some(ClientResp::ViewOnUpdateResp(resp)) => {
501                        on_update(OnUpdateData(resp)).await;
502                        Ok(())
503                    },
504                    resp => Err(resp.into()),
505                }
506            }
507        };
508
509        let msg = self.client_message(ClientReq::ViewOnUpdateReq(ViewOnUpdateReq {
510            mode: options.mode.map(|OnUpdateMode::Row| Mode::Row as i32),
511        }));
512
513        self.client.subscribe(&msg, callback).await?;
514        Ok(msg.msg_id)
515    }
516
517    /// Unregister a previously registered update callback with this [`View`].
518    ///
519    /// # Arguments
520    ///
521    /// - `id` - A callback `id` as returned by a recipricol call to
522    ///   [`View::on_update`].
523    ///
524    /// # Examples
525    ///
526    /// ```rust
527    /// let callback = |_| async { print!("Updated!") };
528    /// let cid = view.on_update(callback, OnUpdateOptions::default()).await?;
529    /// view.remove_update(cid).await?;
530    /// ```
531    pub async fn remove_update(&self, update_id: u32) -> ClientResult<()> {
532        let msg = self.client_message(ClientReq::ViewRemoveOnUpdateReq(ViewRemoveOnUpdateReq {
533            id: update_id,
534        }));
535
536        self.client.unsubscribe(update_id).await?;
537        match self.client.oneshot(&msg).await? {
538            ClientResp::ViewRemoveOnUpdateResp(_) => Ok(()),
539            resp => Err(resp.into()),
540        }
541    }
542
543    /// Register a callback with this [`View`]. Whenever the [`View`] is
544    /// deleted, this callback will be invoked.
545    pub async fn on_delete(
546        &self,
547        on_delete: Box<dyn Fn() + Send + Sync + 'static>,
548    ) -> ClientResult<u32> {
549        let callback = move |resp: Response| match resp.client_resp.unwrap() {
550            ClientResp::ViewOnDeleteResp(_) => {
551                on_delete();
552                Ok(())
553            },
554            resp => Err(resp.into()),
555        };
556
557        let msg = self.client_message(ClientReq::ViewOnDeleteReq(ViewOnDeleteReq {}));
558        self.client.subscribe_once(&msg, Box::new(callback)).await?;
559        Ok(msg.msg_id)
560    }
561
562    /// Unregister a previously registered [`View::on_delete`] callback.
563    pub async fn remove_delete(&self, callback_id: u32) -> ClientResult<()> {
564        let msg = self.client_message(ClientReq::ViewRemoveDeleteReq(ViewRemoveDeleteReq {
565            id: callback_id,
566        }));
567
568        match self.client.oneshot(&msg).await? {
569            ClientResp::ViewRemoveDeleteResp(ViewRemoveDeleteResp {}) => Ok(()),
570            resp => Err(resp.into()),
571        }
572    }
573
574    /// Collapses the `group_by` row at `row_index`.
575    pub async fn collapse(&self, row_index: u32) -> ClientResult<u32> {
576        let msg = self.client_message(ClientReq::ViewCollapseReq(ViewCollapseReq { row_index }));
577        match self.client.oneshot(&msg).await? {
578            ClientResp::ViewCollapseResp(ViewCollapseResp { num_changed }) => Ok(num_changed),
579            resp => Err(resp.into()),
580        }
581    }
582
583    /// Expand the `group_by` row at `row_index`.
584    pub async fn expand(&self, row_index: u32) -> ClientResult<u32> {
585        let msg = self.client_message(ClientReq::ViewExpandReq(ViewExpandReq { row_index }));
586        match self.client.oneshot(&msg).await? {
587            ClientResp::ViewExpandResp(ViewExpandResp { num_changed }) => Ok(num_changed),
588            resp => Err(resp.into()),
589        }
590    }
591
592    /// Set expansion `depth` of the `group_by` tree.
593    pub async fn set_depth(&self, depth: u32) -> ClientResult<()> {
594        let msg = self.client_message(ClientReq::ViewSetDepthReq(ViewSetDepthReq { depth }));
595        match self.client.oneshot(&msg).await? {
596            ClientResp::ViewSetDepthResp(_) => Ok(()),
597            resp => Err(resp.into()),
598        }
599    }
600}