Skip to main content

perspective_client/
view.rs

1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████       █      █      █      █      █ █▄  ▀███ █       ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█  ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄  ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄   █ ▄▄▄▄▄ ┃
5// ┃ █      ██████ █  ▀█▄       █ ██████      █      ███▌▐███ ███████▄ █       ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors.                              ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::collections::HashMap;
14use std::ops::Deref;
15use std::str::FromStr;
16use std::sync::Arc;
17
18use futures::Future;
19use prost::bytes::Bytes;
20use serde::{Deserialize, Serialize};
21use ts_rs::TS;
22
23use self::view_on_update_req::Mode;
24use crate::assert_view_api;
25use crate::client::Client;
26use crate::proto::request::ClientReq;
27use crate::proto::response::ClientResp;
28use crate::proto::*;
29#[cfg(doc)]
30use crate::table::Table;
31pub use crate::utils::*;
32
33/// Options for [`View::on_update`].
34#[derive(Default, Debug, Deserialize, TS)]
35pub struct OnUpdateOptions {
36    pub mode: Option<OnUpdateMode>,
37}
38
39/// The update mode for [`View::on_update`].
40///
41/// `Row` mode calculates and provides the update batch new rows/columns as an
42/// Apache Arrow to the callback provided to [`View::on_update`]. This allows
43/// incremental updates if your callbakc can read this format, but should be
44/// disabled otherwise.
45#[derive(Default, Debug, Deserialize, TS)]
46pub enum OnUpdateMode {
47    #[default]
48    #[serde(rename = "row")]
49    Row,
50}
51
52impl FromStr for OnUpdateMode {
53    type Err = ClientError;
54
55    fn from_str(s: &str) -> Result<Self, Self::Err> {
56        if s == "row" {
57            Ok(OnUpdateMode::Row)
58        } else {
59            Err(ClientError::Option)
60        }
61    }
62}
63
64#[derive(Clone, Debug, Default, Deserialize, Serialize, TS, PartialEq)]
65pub struct ColumnWindow {
66    #[serde(skip_serializing_if = "Option::is_none")]
67    pub start_col: Option<f32>,
68
69    #[serde(skip_serializing_if = "Option::is_none")]
70    pub end_col: Option<f32>,
71}
72
73/// Options for serializing a window of data from a [`View`].
74///
75/// Some fields of [`ViewWindow`] are only applicable to specific methods of
76/// [`View`].
77#[derive(Clone, Debug, Default, Deserialize, Serialize, TS, PartialEq)]
78pub struct ViewWindow {
79    #[ts(optional)]
80    #[serde(skip_serializing_if = "Option::is_none")]
81    pub start_row: Option<f64>,
82
83    #[ts(optional)]
84    #[serde(skip_serializing_if = "Option::is_none")]
85    pub start_col: Option<f64>,
86
87    #[ts(optional)]
88    #[serde(skip_serializing_if = "Option::is_none")]
89    pub end_row: Option<f64>,
90
91    #[ts(optional)]
92    #[serde(skip_serializing_if = "Option::is_none")]
93    pub end_col: Option<f64>,
94
95    #[ts(optional)]
96    #[serde(skip_serializing_if = "Option::is_none")]
97    pub id: Option<bool>,
98
99    #[ts(optional)]
100    #[serde(skip_serializing_if = "Option::is_none")]
101    pub index: Option<bool>,
102
103    /// Only impacts [`View::to_csv`]
104    #[ts(optional)]
105    #[serde(skip_serializing_if = "Option::is_none")]
106    pub formatted: Option<bool>,
107
108    /// Only impacts [`View::to_arrow`]
109    #[ts(optional)]
110    #[serde(skip_serializing_if = "Option::is_none")]
111    pub compression: Option<String>,
112}
113
114impl From<ViewWindow> for ViewPort {
115    fn from(window: ViewWindow) -> Self {
116        ViewPort {
117            start_row: window.start_row.map(|x| x.floor() as u32),
118            start_col: window.start_col.map(|x| x.floor() as u32),
119            end_row: window.end_row.map(|x| x.ceil() as u32),
120            end_col: window.end_col.map(|x| x.ceil() as u32),
121        }
122    }
123}
124
125impl From<ViewPort> for ViewWindow {
126    fn from(window: ViewPort) -> Self {
127        ViewWindow {
128            start_row: window.start_row.map(|x| x as f64),
129            start_col: window.start_col.map(|x| x as f64),
130            end_row: window.end_row.map(|x| x as f64),
131            end_col: window.end_col.map(|x| x as f64),
132            ..ViewWindow::default()
133        }
134    }
135}
136
137/// Rows updated and port ID corresponding to an update batch, provided to the
138/// callback argument to [`View::on_update`] with the "rows" mode.
139#[derive(TS)]
140pub struct OnUpdateData(crate::proto::ViewOnUpdateResp);
141
142impl Deref for OnUpdateData {
143    type Target = crate::proto::ViewOnUpdateResp;
144
145    fn deref(&self) -> &Self::Target {
146        &self.0
147    }
148}
149
150/// The [`View`] struct is Perspective's query and serialization interface. It
151/// represents a query on the `Table`'s dataset and is always created from an
152/// existing `Table` instance via the [`Table::view`] method.
153///
154/// [`View`]s are immutable with respect to the arguments provided to the
155/// [`Table::view`] method; to change these parameters, you must create a new
156/// [`View`] on the same [`Table`]. However, each [`View`] is _live_ with
157/// respect to the [`Table`]'s data, and will (within a conflation window)
158/// update with the latest state as its parent [`Table`] updates, including
159/// incrementally recalculating all aggregates, pivots, filters, etc. [`View`]
160/// query parameters are composable, in that each parameter works independently
161/// _and_ in conjunction with each other, and there is no limit to the number of
162/// pivots, filters, etc. which can be applied.
163///
164/// To construct a [`View`], call the [`Table::view`] factory method. A
165/// [`Table`] can have as many [`View`]s associated with it as you need -
166/// Perspective conserves memory by relying on a single [`Table`] to power
167/// multiple [`View`]s concurrently.
168///
169/// # Examples
170///
171/// ```rust
172/// let opts = TableInitOptions::default();
173/// let data = TableData::Update(UpdateData::Csv("x,y\n1,2\n3,4".into()));
174/// let table = client.table(data, opts).await?;
175///
176/// let view = table.view(None).await?;
177/// let arrow = view.to_arrow().await?;
178/// view.delete().await?;
179/// ```
180///
181/// ```rust
182/// use crate::config::*;
183/// let view = table
184///     .view(Some(ViewConfigUpdate {
185///         columns: Some(vec![Some("Sales".into())]),
186///         aggregates: Some(HashMap::from_iter(vec![("Sales".into(), "sum".into())])),
187///         group_by: Some(vec!["Region".into(), "Country".into()]),
188///         filter: Some(vec![Filter::new("Category", "in", &[
189///             "Furniture",
190///             "Technology",
191///         ])]),
192///         ..ViewConfigUpdate::default()
193///     }))
194///     .await?;
195/// ```
196///
197///  Group By
198///
199/// ```rust
200/// let view = table
201///     .view(Some(ViewConfigUpdate {
202///         group_by: Some(vec!["a".into(), "c".into()]),
203///         ..ViewConfigUpdate::default()
204///     }))
205///     .await?;
206/// ```
207///
208/// Split By
209///
210/// ```rust
211/// let view = table
212///     .view(Some(ViewConfigUpdate {
213///         split_by: Some(vec!["a".into(), "c".into()]),
214///         ..ViewConfigUpdate::default()
215///     }))
216///     .await?;
217/// ```
218///
219/// In Javascript, a [`Table`] can be constructed on a [`Table::view`] instance,
220/// which will return a new [`Table`] based on the [`Table::view`]'s dataset,
221/// and all future updates that affect the [`Table::view`] will be forwarded to
222/// the new [`Table`]. This is particularly useful for implementing a
223/// [Client/Server Replicated](server.md#clientserver-replicated) design, by
224/// serializing the `View` to an arrow and setting up an `on_update` callback.
225///
226/// ```rust
227/// let opts = TableInitOptions::default();
228/// let data = TableData::Update(UpdateData::Csv("x,y\n1,2\n3,4".into()));
229/// let table = client.table(data, opts).await?;
230/// let view = table.view(None).await?;
231/// let table2 = client.table(TableData::View(view)).await?;
232/// table.update(data).await?;
233/// ```
234#[derive(Clone, Debug)]
235pub struct View {
236    pub name: String,
237    client: Client,
238}
239
240assert_view_api!(View);
241
242impl View {
243    pub fn new(name: String, client: Client) -> Self {
244        View { name, client }
245    }
246
247    fn client_message(&self, req: ClientReq) -> Request {
248        crate::proto::Request {
249            msg_id: self.client.gen_id(),
250            entity_id: self.name.clone(),
251            client_req: Some(req),
252        }
253    }
254
255    /// Returns an array of strings containing the column paths of the [`View`]
256    /// without any of the source columns.
257    ///
258    /// A column path shows the columns that a given cell belongs to after
259    /// pivots are applied.
260    pub async fn column_paths(&self, window: ColumnWindow) -> ClientResult<Vec<String>> {
261        let msg = self.client_message(ClientReq::ViewColumnPathsReq(ViewColumnPathsReq {
262            start_col: window.start_col.map(|x| x as u32),
263            end_col: window.end_col.map(|x| x as u32),
264        }));
265
266        match self.client.oneshot(&msg).await? {
267            ClientResp::ViewColumnPathsResp(ViewColumnPathsResp { paths }) => {
268                // Ok(paths.into_iter().map(|x| x.path).collect())
269                Ok(paths)
270            },
271            resp => Err(resp.into()),
272        }
273    }
274
275    /// Returns this [`View`]'s _dimensions_, row and column count, as well as
276    /// those of the [`crate::Table`] from which it was derived.
277    ///
278    /// - `num_table_rows` - The number of rows in the underlying
279    ///   [`crate::Table`].
280    /// - `num_table_columns` - The number of columns in the underlying
281    ///   [`crate::Table`] (including the `index` column if this
282    ///   [`crate::Table`] was constructed with one).
283    /// - `num_view_rows` - The number of rows in this [`View`]. If this
284    ///   [`View`] has a `group_by` clause, `num_view_rows` will also include
285    ///   aggregated rows.
286    /// - `num_view_columns` - The number of columns in this [`View`]. If this
287    ///   [`View`] has a `split_by` clause, `num_view_columns` will include all
288    ///   _column paths_, e.g. the number of `columns` clause times the number
289    ///   of `split_by` groups.
290    pub async fn dimensions(&self) -> ClientResult<ViewDimensionsResp> {
291        let msg = self.client_message(ClientReq::ViewDimensionsReq(ViewDimensionsReq {}));
292        match self.client.oneshot(&msg).await? {
293            ClientResp::ViewDimensionsResp(resp) => Ok(resp),
294            resp => Err(resp.into()),
295        }
296    }
297
298    /// The expression schema of this [`View`], which contains only the
299    /// expressions created on this [`View`]. See [`View::schema`] for
300    /// details.
301    pub async fn expression_schema(&self) -> ClientResult<HashMap<String, ColumnType>> {
302        if self.client.get_features().await?.expressions {
303            let msg = self.client_message(ClientReq::ViewExpressionSchemaReq(
304                ViewExpressionSchemaReq {},
305            ));
306            match self.client.oneshot(&msg).await? {
307                ClientResp::ViewExpressionSchemaResp(ViewExpressionSchemaResp { schema }) => {
308                    Ok(schema
309                        .into_iter()
310                        .map(|(x, y)| (x, ColumnType::try_from(y).unwrap()))
311                        .collect())
312                },
313                resp => Err(resp.into()),
314            }
315        } else {
316            Ok([].into_iter().collect())
317        }
318    }
319
320    /// A copy of the [`ViewConfig`] object passed to the [`Table::view`] method
321    /// which created this [`View`].
322    pub async fn get_config(&self) -> ClientResult<crate::config::ViewConfig> {
323        let msg = self.client_message(ClientReq::ViewGetConfigReq(ViewGetConfigReq {}));
324        match self.client.oneshot(&msg).await? {
325            ClientResp::ViewGetConfigResp(ViewGetConfigResp {
326                config: Some(config),
327            }) => Ok(config.into()),
328            resp => Err(resp.into()),
329        }
330    }
331
332    /// The number of aggregated rows in this [`View`]. This is affected by the
333    /// "group_by" configuration parameter supplied to this view's contructor.
334    ///
335    /// # Returns
336    ///
337    /// The number of aggregated rows.
338    pub async fn num_rows(&self) -> ClientResult<u32> {
339        Ok(self.dimensions().await?.num_view_rows)
340    }
341
342    /// The schema of this [`View`].
343    ///
344    /// The [`View`] schema differs from the `schema` returned by
345    /// [`Table::schema`]; it may have different column names due to
346    /// `expressions` or `columns` configs, or it maye have _different
347    /// column types_ due to the application og `group_by` and `aggregates`
348    /// config. You can think of [`Table::schema`] as the _input_ schema and
349    /// [`View::schema`] as the _output_ schema of a Perspective pipeline.
350    pub async fn schema(&self) -> ClientResult<HashMap<String, ColumnType>> {
351        let msg = self.client_message(ClientReq::ViewSchemaReq(ViewSchemaReq {}));
352        match self.client.oneshot(&msg).await? {
353            ClientResp::ViewSchemaResp(ViewSchemaResp { schema }) => Ok(schema
354                .into_iter()
355                .map(|(x, y)| (x, ColumnType::try_from(y).unwrap()))
356                .collect()),
357            resp => Err(resp.into()),
358        }
359    }
360
361    /// Serializes a [`View`] to the Apache Arrow data format.
362    pub async fn to_arrow(&self, window: ViewWindow) -> ClientResult<Bytes> {
363        let msg = self.client_message(ClientReq::ViewToArrowReq(ViewToArrowReq {
364            viewport: Some(window.clone().into()),
365            compression: window.compression,
366        }));
367
368        match self.client.oneshot(&msg).await? {
369            ClientResp::ViewToArrowResp(ViewToArrowResp { arrow }) => Ok(arrow.into()),
370            resp => Err(resp.into()),
371        }
372    }
373
374    /// Serializes this [`View`] to a string of JSON data. Useful if you want to
375    /// save additional round trip serialize/deserialize cycles.    
376    pub async fn to_columns_string(&self, window: ViewWindow) -> ClientResult<String> {
377        let msg = self.client_message(ClientReq::ViewToColumnsStringReq(ViewToColumnsStringReq {
378            viewport: Some(window.clone().into()),
379            id: window.id,
380            index: window.index,
381            formatted: window.formatted,
382        }));
383
384        match self.client.oneshot(&msg).await? {
385            ClientResp::ViewToColumnsStringResp(ViewToColumnsStringResp { json_string }) => {
386                Ok(json_string)
387            },
388            resp => Err(resp.into()),
389        }
390    }
391
392    /// Render this `View` as a JSON string.
393    pub async fn to_json_string(&self, window: ViewWindow) -> ClientResult<String> {
394        let viewport = ViewPort::from(window.clone());
395        let msg = self.client_message(ClientReq::ViewToRowsStringReq(ViewToRowsStringReq {
396            viewport: Some(viewport),
397            id: window.id,
398            index: window.index,
399            formatted: window.formatted,
400        }));
401
402        match self.client.oneshot(&msg).await? {
403            ClientResp::ViewToRowsStringResp(ViewToRowsStringResp { json_string }) => {
404                Ok(json_string)
405            },
406            resp => Err(resp.into()),
407        }
408    }
409
410    /// Renders this [`View`] as an [NDJSON](https://github.com/ndjson/ndjson-spec)
411    /// formatted [`String`].
412    pub async fn to_ndjson(&self, window: ViewWindow) -> ClientResult<String> {
413        let viewport = ViewPort::from(window.clone());
414        let msg = self.client_message(ClientReq::ViewToNdjsonStringReq(ViewToNdjsonStringReq {
415            viewport: Some(viewport),
416            id: window.id,
417            index: window.index,
418            formatted: window.formatted,
419        }));
420
421        match self.client.oneshot(&msg).await? {
422            ClientResp::ViewToNdjsonStringResp(ViewToNdjsonStringResp { ndjson_string }) => {
423                Ok(ndjson_string)
424            },
425            resp => Err(resp.into()),
426        }
427    }
428
429    /// Serializes this [`View`] to CSV data in a standard format.
430    pub async fn to_csv(&self, window: ViewWindow) -> ClientResult<String> {
431        let msg = self.client_message(ClientReq::ViewToCsvReq(ViewToCsvReq {
432            viewport: Some(window.into()),
433        }));
434
435        match self.client.oneshot(&msg).await? {
436            ClientResp::ViewToCsvResp(ViewToCsvResp { csv }) => Ok(csv),
437            resp => Err(resp.into()),
438        }
439    }
440
441    /// Delete this [`View`] and clean up all resources associated with it.
442    /// [`View`] objects do not stop consuming resources or processing
443    /// updates when they are garbage collected - you must call this method
444    /// to reclaim these.
445    pub async fn delete(&self) -> ClientResult<()> {
446        let msg = self.client_message(ClientReq::ViewDeleteReq(ViewDeleteReq {}));
447        match self.client.oneshot(&msg).await? {
448            ClientResp::ViewDeleteResp(_) => Ok(()),
449            resp => Err(resp.into()),
450        }
451    }
452
453    /// Calculates the [min, max] of the leaf nodes of a column `column_name`.
454    ///
455    /// # Returns
456    ///
457    /// A tuple of [min, max], whose types are column and aggregate dependent.
458    pub async fn get_min_max(&self, column_name: String) -> ClientResult<(String, String)> {
459        let msg = self.client_message(ClientReq::ViewGetMinMaxReq(ViewGetMinMaxReq {
460            column_name,
461        }));
462
463        match self.client.oneshot(&msg).await? {
464            ClientResp::ViewGetMinMaxResp(ViewGetMinMaxResp { min, max }) => Ok((min, max)),
465            resp => Err(resp.into()),
466        }
467    }
468
469    /// Register a callback with this [`View`]. Whenever the view's underlying
470    /// table emits an update, this callback will be invoked with an object
471    /// containing `port_id`, indicating which port the update fired on, and
472    /// optionally `delta`, which is the new data that was updated for each
473    /// cell or each row.
474    ///
475    /// # Arguments
476    ///
477    /// - `on_update` - A callback function invoked on update, which receives an
478    ///   object with two keys: `port_id`, indicating which port the update was
479    ///   triggered on, and `delta`, whose value is dependent on the mode
480    ///   parameter.
481    /// - `options` - If this is provided as `OnUpdateOptions { mode:
482    ///   Some(OnUpdateMode::Row) }`, then `delta` is an Arrow of the updated
483    ///   rows. Otherwise `delta` will be [`Option::None`].
484    pub async fn on_update<T, U>(&self, on_update: T, options: OnUpdateOptions) -> ClientResult<u32>
485    where
486        T: Fn(OnUpdateData) -> U + Send + Sync + 'static,
487        U: Future<Output = ()> + Send + 'static,
488    {
489        let on_update = Arc::new(on_update);
490        let callback = move |resp: Response| {
491            let on_update = on_update.clone();
492            async move {
493                match resp.client_resp {
494                    Some(ClientResp::ViewOnUpdateResp(resp)) => {
495                        on_update(OnUpdateData(resp)).await;
496                        Ok(())
497                    },
498                    resp => Err(resp.into()),
499                }
500            }
501        };
502
503        let msg = self.client_message(ClientReq::ViewOnUpdateReq(ViewOnUpdateReq {
504            mode: options.mode.map(|OnUpdateMode::Row| Mode::Row as i32),
505        }));
506
507        self.client.subscribe(&msg, callback).await?;
508        Ok(msg.msg_id)
509    }
510
511    /// Unregister a previously registered update callback with this [`View`].
512    ///
513    /// # Arguments
514    ///
515    /// - `id` - A callback `id` as returned by a recipricol call to
516    ///   [`View::on_update`].
517    ///
518    /// # Examples
519    ///
520    /// ```rust
521    /// let callback = |_| async { print!("Updated!") };
522    /// let cid = view.on_update(callback, OnUpdateOptions::default()).await?;
523    /// view.remove_update(cid).await?;
524    /// ```
525    pub async fn remove_update(&self, update_id: u32) -> ClientResult<()> {
526        let msg = self.client_message(ClientReq::ViewRemoveOnUpdateReq(ViewRemoveOnUpdateReq {
527            id: update_id,
528        }));
529
530        self.client.unsubscribe(update_id).await?;
531        match self.client.oneshot(&msg).await? {
532            ClientResp::ViewRemoveOnUpdateResp(_) => Ok(()),
533            resp => Err(resp.into()),
534        }
535    }
536
537    /// Register a callback with this [`View`]. Whenever the [`View`] is
538    /// deleted, this callback will be invoked.
539    pub async fn on_delete(
540        &self,
541        on_delete: Box<dyn Fn() + Send + Sync + 'static>,
542    ) -> ClientResult<u32> {
543        let callback = move |resp: Response| match resp.client_resp.unwrap() {
544            ClientResp::ViewOnDeleteResp(_) => {
545                on_delete();
546                Ok(())
547            },
548            resp => Err(resp.into()),
549        };
550
551        let msg = self.client_message(ClientReq::ViewOnDeleteReq(ViewOnDeleteReq {}));
552        self.client.subscribe_once(&msg, Box::new(callback)).await?;
553        Ok(msg.msg_id)
554    }
555
556    /// Unregister a previously registered [`View::on_delete`] callback.
557    pub async fn remove_delete(&self, callback_id: u32) -> ClientResult<()> {
558        let msg = self.client_message(ClientReq::ViewRemoveDeleteReq(ViewRemoveDeleteReq {
559            id: callback_id,
560        }));
561
562        match self.client.oneshot(&msg).await? {
563            ClientResp::ViewRemoveDeleteResp(ViewRemoveDeleteResp {}) => Ok(()),
564            resp => Err(resp.into()),
565        }
566    }
567
568    /// Collapses the `group_by` row at `row_index`.
569    pub async fn collapse(&self, row_index: u32) -> ClientResult<u32> {
570        let msg = self.client_message(ClientReq::ViewCollapseReq(ViewCollapseReq { row_index }));
571        match self.client.oneshot(&msg).await? {
572            ClientResp::ViewCollapseResp(ViewCollapseResp { num_changed }) => Ok(num_changed),
573            resp => Err(resp.into()),
574        }
575    }
576
577    /// Expand the `group_by` row at `row_index`.
578    pub async fn expand(&self, row_index: u32) -> ClientResult<u32> {
579        let msg = self.client_message(ClientReq::ViewExpandReq(ViewExpandReq { row_index }));
580        match self.client.oneshot(&msg).await? {
581            ClientResp::ViewExpandResp(ViewExpandResp { num_changed }) => Ok(num_changed),
582            resp => Err(resp.into()),
583        }
584    }
585
586    /// Set expansion `depth` of the `group_by` tree.
587    pub async fn set_depth(&self, depth: u32) -> ClientResult<()> {
588        let msg = self.client_message(ClientReq::ViewSetDepthReq(ViewSetDepthReq { depth }));
589        match self.client.oneshot(&msg).await? {
590            ClientResp::ViewSetDepthResp(_) => Ok(()),
591            resp => Err(resp.into()),
592        }
593    }
594}