perspective_client/view.rs
1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████ █ █ █ █ █ █▄ ▀███ █ ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█ ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄ ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄ █ ▄▄▄▄▄ ┃
5// ┃ █ ██████ █ ▀█▄ █ ██████ █ ███▌▐███ ███████▄ █ ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors. ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::collections::HashMap;
14use std::ops::Deref;
15use std::str::FromStr;
16use std::sync::Arc;
17
18use futures::Future;
19use prost::bytes::Bytes;
20use serde::{Deserialize, Serialize};
21use ts_rs::TS;
22
23use self::view_on_update_req::Mode;
24use crate::assert_view_api;
25use crate::client::Client;
26use crate::proto::request::ClientReq;
27use crate::proto::response::ClientResp;
28use crate::proto::*;
29#[cfg(doc)]
30use crate::table::Table;
31pub use crate::utils::*;
32
33/// Options for [`View::on_update`].
34#[derive(Default, Debug, Deserialize, TS)]
35pub struct OnUpdateOptions {
 /// Requested update mode. [`View::on_update`] forwards this as the `mode`
 /// of its subscription request; `None` sends no mode.
36 pub mode: Option<OnUpdateMode>,
37}
38
39/// The update mode for [`View::on_update`].
40///
41/// `Row` mode calculates the new rows/columns of each update batch and
42/// provides them, as an Apache Arrow, to the callback given to
43/// [`View::on_update`]. This allows incremental updates if your callback can
44/// read this format, but should be disabled otherwise.
45#[derive(Default, Debug, Deserialize, TS)]
46pub enum OnUpdateMode {
47 #[default]
48 #[serde(rename = "row")]
49 Row,
50}
51
52impl FromStr for OnUpdateMode {
53 type Err = ClientError;
54
55 fn from_str(s: &str) -> Result<Self, Self::Err> {
56 if s == "row" {
57 Ok(OnUpdateMode::Row)
58 } else {
59 Err(ClientError::Option)
60 }
61 }
62}
63
/// Optional column bounds for [`View::column_paths`].
///
/// Each bound that is present is cast to `u32` with `as` before being sent;
/// a bound left as `None` is sent as unset.
64#[derive(Clone, Debug, Default, Deserialize, Serialize, TS, PartialEq)]
65pub struct ColumnWindow {
 /// First column of the window, if any.
66 #[serde(skip_serializing_if = "Option::is_none")]
67 pub start_col: Option<f32>,
68
 /// End column of the window, if any.
69 #[serde(skip_serializing_if = "Option::is_none")]
70 pub end_col: Option<f32>,
71}
72
73/// Options for serializing a window of data from a [`View`].
74///
75/// Some fields of [`ViewWindow`] are only applicable to specific methods of
76/// [`View`].
77#[derive(Clone, Debug, Default, Deserialize, Serialize, TS, PartialEq)]
78pub struct ViewWindow {
 /// Start row; rounded down (`floor`) when converted to a `ViewPort`.
79 #[ts(optional)]
80 #[serde(skip_serializing_if = "Option::is_none")]
81 pub start_row: Option<f64>,
82
 /// Start column; rounded down (`floor`) when converted to a `ViewPort`.
83 #[ts(optional)]
84 #[serde(skip_serializing_if = "Option::is_none")]
85 pub start_col: Option<f64>,
86
 /// End row; rounded up (`ceil`) when converted to a `ViewPort`.
87 #[ts(optional)]
88 #[serde(skip_serializing_if = "Option::is_none")]
89 pub end_row: Option<f64>,
90
 /// End column; rounded up (`ceil`) when converted to a `ViewPort`.
91 #[ts(optional)]
92 #[serde(skip_serializing_if = "Option::is_none")]
93 pub end_col: Option<f64>,
94
 /// Forwarded by [`View::to_columns_string`], [`View::to_json_string`] and
 /// [`View::to_ndjson`] only.
95 #[ts(optional)]
96 #[serde(skip_serializing_if = "Option::is_none")]
97 pub id: Option<bool>,
98
 /// Forwarded by [`View::to_columns_string`], [`View::to_json_string`] and
 /// [`View::to_ndjson`] only.
99 #[ts(optional)]
100 #[serde(skip_serializing_if = "Option::is_none")]
101 pub index: Option<bool>,
102
 /// Forwarded by [`View::to_columns_string`], [`View::to_json_string`] and
 /// [`View::to_ndjson`] only.
103 #[ts(optional)]
104 #[serde(skip_serializing_if = "Option::is_none")]
105 pub leaves_only: Option<bool>,
106
107 /// Forwarded by [`View::to_columns_string`], [`View::to_json_string`] and
 /// [`View::to_ndjson`] only. [`View::to_csv`] sends just the viewport
 /// bounds, so this field has no effect there.
108 #[ts(optional)]
109 #[serde(skip_serializing_if = "Option::is_none")]
110 pub formatted: Option<bool>,
111
112 /// Only impacts [`View::to_arrow`]
113 #[ts(optional)]
114 #[serde(skip_serializing_if = "Option::is_none")]
115 pub compression: Option<String>,
116}
117
118impl From<ViewWindow> for ViewPort {
119 fn from(window: ViewWindow) -> Self {
120 ViewPort {
121 start_row: window.start_row.map(|x| x.floor() as u32),
122 start_col: window.start_col.map(|x| x.floor() as u32),
123 end_row: window.end_row.map(|x| x.ceil() as u32),
124 end_col: window.end_col.map(|x| x.ceil() as u32),
125 }
126 }
127}
128
129impl From<ViewPort> for ViewWindow {
130 fn from(window: ViewPort) -> Self {
131 ViewWindow {
132 start_row: window.start_row.map(|x| x as f64),
133 start_col: window.start_col.map(|x| x as f64),
134 end_row: window.end_row.map(|x| x as f64),
135 end_col: window.end_col.map(|x| x as f64),
136 ..ViewWindow::default()
137 }
138 }
139}
140
141/// Rows updated and port ID corresponding to an update batch, provided to the
142/// callback argument to [`View::on_update`]. See [`OnUpdateMode::Row`].
143#[derive(TS)]
144pub struct OnUpdateData(crate::proto::ViewOnUpdateResp);
145
146impl Deref for OnUpdateData {
147 type Target = crate::proto::ViewOnUpdateResp;
148
149 fn deref(&self) -> &Self::Target {
150 &self.0
151 }
152}
153
154/// The [`View`] struct is Perspective's query and serialization interface. It
155/// represents a query on the `Table`'s dataset and is always created from an
156/// existing `Table` instance via the [`Table::view`] method.
157///
158/// [`View`]s are immutable with respect to the arguments provided to the
159/// [`Table::view`] method; to change these parameters, you must create a new
160/// [`View`] on the same [`Table`]. However, each [`View`] is _live_ with
161/// respect to the [`Table`]'s data, and will (within a conflation window)
162/// update with the latest state as its parent [`Table`] updates, including
163/// incrementally recalculating all aggregates, pivots, filters, etc. [`View`]
164/// query parameters are composable, in that each parameter works independently
165/// _and_ in conjunction with each other, and there is no limit to the number of
166/// pivots, filters, etc. which can be applied.
167///
168/// To construct a [`View`], call the [`Table::view`] factory method. A
169/// [`Table`] can have as many [`View`]s associated with it as you need -
170/// Perspective conserves memory by relying on a single [`Table`] to power
171/// multiple [`View`]s concurrently.
172///
173/// # Examples
174///
175/// ```rust
176/// let opts = TableInitOptions::default();
177/// let data = TableData::Update(UpdateData::Csv("x,y\n1,2\n3,4".into()));
178/// let table = client.table(data, opts).await?;
179///
180/// let view = table.view(None).await?;
181/// let arrow = view.to_arrow(ViewWindow::default()).await?;
182/// view.delete().await?;
183/// ```
184///
185/// ```rust
186/// use crate::config::*;
187/// let view = table
188/// .view(Some(ViewConfigUpdate {
189/// columns: Some(vec![Some("Sales".into())]),
190/// aggregates: Some(HashMap::from_iter(vec![("Sales".into(), "sum".into())])),
191/// group_by: Some(vec!["Region".into(), "Country".into()]),
192/// filter: Some(vec![Filter::new("Category", "in", &[
193/// "Furniture",
194/// "Technology",
195/// ])]),
196/// ..ViewConfigUpdate::default()
197/// }))
198/// .await?;
199/// ```
200///
201/// Group By
202///
203/// ```rust
204/// let view = table
205/// .view(Some(ViewConfigUpdate {
206/// group_by: Some(vec!["a".into(), "c".into()]),
207/// ..ViewConfigUpdate::default()
208/// }))
209/// .await?;
210/// ```
211///
212/// Split By
213///
214/// ```rust
215/// let view = table
216/// .view(Some(ViewConfigUpdate {
217/// split_by: Some(vec!["a".into(), "c".into()]),
218/// ..ViewConfigUpdate::default()
219/// }))
220/// .await?;
221/// ```
222///
223/// In Javascript, a [`Table`] can be constructed on a [`Table::view`] instance,
224/// which will return a new [`Table`] based on the [`Table::view`]'s dataset,
225/// and all future updates that affect the [`Table::view`] will be forwarded to
226/// the new [`Table`]. This is particularly useful for implementing a
227/// [Client/Server Replicated](server.md#clientserver-replicated) design, by
228/// serializing the `View` to an arrow and setting up an `on_update` callback.
229///
230/// ```rust
231/// let opts = TableInitOptions::default();
232/// let data = TableData::Update(UpdateData::Csv("x,y\n1,2\n3,4".into()));
233/// let table = client.table(data, opts).await?;
234/// let view = table.view(None).await?;
235/// let table2 = client.table(TableData::View(view)).await?;
236/// table.update(data).await?;
237/// ```
238#[derive(Clone, Debug)]
239pub struct View {
 /// Name of this view; sent as the `entity_id` of every request this
 /// [`View`] issues.
240 pub name: String,
 // Client used to generate message ids and to send every request.
241 client: Client,
242}
243
// NOTE(review): presumably a compile-time check that `View` exposes the
// expected view API - confirm against the `assert_view_api!` definition.
244assert_view_api!(View);
245
246impl View {
247 pub fn new(name: String, client: Client) -> Self {
248 View { name, client }
249 }
250
251 fn client_message(&self, req: ClientReq) -> Request {
252 crate::proto::Request {
253 msg_id: self.client.gen_id(),
254 entity_id: self.name.clone(),
255 client_req: Some(req),
256 }
257 }
258
259 /// Returns an array of strings containing the column paths of the [`View`]
260 /// without any of the source columns.
261 ///
262 /// A column path shows the columns that a given cell belongs to after
263 /// pivots are applied.
264 pub async fn column_paths(&self, window: ColumnWindow) -> ClientResult<Vec<String>> {
265 let msg = self.client_message(ClientReq::ViewColumnPathsReq(ViewColumnPathsReq {
266 start_col: window.start_col.map(|x| x as u32),
267 end_col: window.end_col.map(|x| x as u32),
268 }));
269
270 match self.client.oneshot(&msg).await? {
271 ClientResp::ViewColumnPathsResp(ViewColumnPathsResp { paths }) => {
272 // Ok(paths.into_iter().map(|x| x.path).collect())
273 Ok(paths)
274 },
275 resp => Err(resp.into()),
276 }
277 }
278
279 /// Returns this [`View`]'s _dimensions_, row and column count, as well as
280 /// those of the [`crate::Table`] from which it was derived.
281 ///
282 /// - `num_table_rows` - The number of rows in the underlying
283 /// [`crate::Table`].
284 /// - `num_table_columns` - The number of columns in the underlying
285 /// [`crate::Table`] (including the `index` column if this
286 /// [`crate::Table`] was constructed with one).
287 /// - `num_view_rows` - The number of rows in this [`View`]. If this
288 /// [`View`] has a `group_by` clause, `num_view_rows` will also include
289 /// aggregated rows.
290 /// - `num_view_columns` - The number of columns in this [`View`]. If this
291 /// [`View`] has a `split_by` clause, `num_view_columns` will include all
292 /// _column paths_, e.g. the number of `columns` clause times the number
293 /// of `split_by` groups.
294 pub async fn dimensions(&self) -> ClientResult<ViewDimensionsResp> {
295 let msg = self.client_message(ClientReq::ViewDimensionsReq(ViewDimensionsReq {}));
296 match self.client.oneshot(&msg).await? {
297 ClientResp::ViewDimensionsResp(resp) => Ok(resp),
298 resp => Err(resp.into()),
299 }
300 }
301
302 /// The expression schema of this [`View`], which contains only the
303 /// expressions created on this [`View`]. See [`View::schema`] for
304 /// details.
305 pub async fn expression_schema(&self) -> ClientResult<HashMap<String, ColumnType>> {
306 if self.client.get_features().await?.expressions {
307 let msg = self.client_message(ClientReq::ViewExpressionSchemaReq(
308 ViewExpressionSchemaReq {},
309 ));
310 match self.client.oneshot(&msg).await? {
311 ClientResp::ViewExpressionSchemaResp(ViewExpressionSchemaResp { schema }) => {
312 Ok(schema
313 .into_iter()
314 .map(|(x, y)| (x, ColumnType::try_from(y).unwrap()))
315 .collect())
316 },
317 resp => Err(resp.into()),
318 }
319 } else {
320 Ok([].into_iter().collect())
321 }
322 }
323
324 /// A copy of the [`ViewConfig`] object passed to the [`Table::view`] method
325 /// which created this [`View`].
326 pub async fn get_config(&self) -> ClientResult<crate::config::ViewConfig> {
327 let msg = self.client_message(ClientReq::ViewGetConfigReq(ViewGetConfigReq {}));
328 match self.client.oneshot(&msg).await? {
329 ClientResp::ViewGetConfigResp(ViewGetConfigResp {
330 config: Some(config),
331 }) => Ok(config.into()),
332 resp => Err(resp.into()),
333 }
334 }
335
336 /// The number of aggregated rows in this [`View`]. This is affected by the
337 /// "group_by" configuration parameter supplied to this view's contructor.
338 ///
339 /// # Returns
340 ///
341 /// The number of aggregated rows.
342 pub async fn num_rows(&self) -> ClientResult<u32> {
343 Ok(self.dimensions().await?.num_view_rows)
344 }
345
346 /// The schema of this [`View`].
347 ///
348 /// The [`View`] schema differs from the `schema` returned by
349 /// [`Table::schema`]; it may have different column names due to
350 /// `expressions` or `columns` configs, or it maye have _different
351 /// column types_ due to the application og `group_by` and `aggregates`
352 /// config. You can think of [`Table::schema`] as the _input_ schema and
353 /// [`View::schema`] as the _output_ schema of a Perspective pipeline.
354 pub async fn schema(&self) -> ClientResult<HashMap<String, ColumnType>> {
355 let msg = self.client_message(ClientReq::ViewSchemaReq(ViewSchemaReq {}));
356 match self.client.oneshot(&msg).await? {
357 ClientResp::ViewSchemaResp(ViewSchemaResp { schema }) => Ok(schema
358 .into_iter()
359 .map(|(x, y)| (x, ColumnType::try_from(y).unwrap()))
360 .collect()),
361 resp => Err(resp.into()),
362 }
363 }
364
365 /// Serializes a [`View`] to the Apache Arrow data format.
366 pub async fn to_arrow(&self, window: ViewWindow) -> ClientResult<Bytes> {
367 let msg = self.client_message(ClientReq::ViewToArrowReq(ViewToArrowReq {
368 viewport: Some(window.clone().into()),
369 compression: window.compression,
370 }));
371
372 match self.client.oneshot(&msg).await? {
373 ClientResp::ViewToArrowResp(ViewToArrowResp { arrow }) => Ok(arrow.into()),
374 resp => Err(resp.into()),
375 }
376 }
377
378 /// Serializes this [`View`] to a string of JSON data. Useful if you want to
379 /// save additional round trip serialize/deserialize cycles.
380 pub async fn to_columns_string(&self, window: ViewWindow) -> ClientResult<String> {
381 let msg = self.client_message(ClientReq::ViewToColumnsStringReq(ViewToColumnsStringReq {
382 viewport: Some(window.clone().into()),
383 id: window.id,
384 index: window.index,
385 formatted: window.formatted,
386 leaves_only: window.leaves_only,
387 }));
388
389 match self.client.oneshot(&msg).await? {
390 ClientResp::ViewToColumnsStringResp(ViewToColumnsStringResp { json_string }) => {
391 Ok(json_string)
392 },
393 resp => Err(resp.into()),
394 }
395 }
396
397 /// Render this `View` as a JSON string.
398 pub async fn to_json_string(&self, window: ViewWindow) -> ClientResult<String> {
399 let viewport = ViewPort::from(window.clone());
400 let msg = self.client_message(ClientReq::ViewToRowsStringReq(ViewToRowsStringReq {
401 viewport: Some(viewport),
402 id: window.id,
403 index: window.index,
404 formatted: window.formatted,
405 leaves_only: window.leaves_only,
406 }));
407
408 match self.client.oneshot(&msg).await? {
409 ClientResp::ViewToRowsStringResp(ViewToRowsStringResp { json_string }) => {
410 Ok(json_string)
411 },
412 resp => Err(resp.into()),
413 }
414 }
415
416 /// Renders this [`View`] as an [NDJSON](https://github.com/ndjson/ndjson-spec)
417 /// formatted [`String`].
418 pub async fn to_ndjson(&self, window: ViewWindow) -> ClientResult<String> {
419 let viewport = ViewPort::from(window.clone());
420 let msg = self.client_message(ClientReq::ViewToNdjsonStringReq(ViewToNdjsonStringReq {
421 viewport: Some(viewport),
422 id: window.id,
423 index: window.index,
424 formatted: window.formatted,
425 leaves_only: window.leaves_only,
426 }));
427
428 match self.client.oneshot(&msg).await? {
429 ClientResp::ViewToNdjsonStringResp(ViewToNdjsonStringResp { ndjson_string }) => {
430 Ok(ndjson_string)
431 },
432 resp => Err(resp.into()),
433 }
434 }
435
436 /// Serializes this [`View`] to CSV data in a standard format.
437 pub async fn to_csv(&self, window: ViewWindow) -> ClientResult<String> {
438 let msg = self.client_message(ClientReq::ViewToCsvReq(ViewToCsvReq {
439 viewport: Some(window.into()),
440 }));
441
442 match self.client.oneshot(&msg).await? {
443 ClientResp::ViewToCsvResp(ViewToCsvResp { csv }) => Ok(csv),
444 resp => Err(resp.into()),
445 }
446 }
447
448 /// Delete this [`View`] and clean up all resources associated with it.
449 /// [`View`] objects do not stop consuming resources or processing
450 /// updates when they are garbage collected - you must call this method
451 /// to reclaim these.
452 pub async fn delete(&self) -> ClientResult<()> {
453 let msg = self.client_message(ClientReq::ViewDeleteReq(ViewDeleteReq {}));
454 match self.client.oneshot(&msg).await? {
455 ClientResp::ViewDeleteResp(_) => Ok(()),
456 resp => Err(resp.into()),
457 }
458 }
459
460 /// Calculates the [min, max] of the leaf nodes of a column `column_name`.
461 ///
462 /// # Returns
463 ///
464 /// A tuple of [min, max], whose types are column and aggregate dependent.
465 pub async fn get_min_max(&self, column_name: String) -> ClientResult<(String, String)> {
466 let msg = self.client_message(ClientReq::ViewGetMinMaxReq(ViewGetMinMaxReq {
467 column_name,
468 }));
469
470 match self.client.oneshot(&msg).await? {
471 ClientResp::ViewGetMinMaxResp(ViewGetMinMaxResp { min, max }) => Ok((min, max)),
472 resp => Err(resp.into()),
473 }
474 }
475
476 /// Register a callback with this [`View`]. Whenever the view's underlying
477 /// table emits an update, this callback will be invoked with an object
478 /// containing `port_id`, indicating which port the update fired on, and
479 /// optionally `delta`, which is the new data that was updated for each
480 /// cell or each row.
481 ///
482 /// # Arguments
483 ///
484 /// - `on_update` - A callback function invoked on update, which receives an
485 /// object with two keys: `port_id`, indicating which port the update was
486 /// triggered on, and `delta`, whose value is dependent on the mode
487 /// parameter.
488 /// - `options` - If this is provided as `OnUpdateOptions { mode:
489 /// Some(OnUpdateMode::Row) }`, then `delta` is an Arrow of the updated
490 /// rows. Otherwise `delta` will be [`Option::None`].
491 pub async fn on_update<T, U>(&self, on_update: T, options: OnUpdateOptions) -> ClientResult<u32>
492 where
493 T: Fn(OnUpdateData) -> U + Send + Sync + 'static,
494 U: Future<Output = ()> + Send + 'static,
495 {
496 let on_update = Arc::new(on_update);
497 let callback = move |resp: Response| {
498 let on_update = on_update.clone();
499 async move {
500 match resp.client_resp {
501 Some(ClientResp::ViewOnUpdateResp(resp)) => {
502 on_update(OnUpdateData(resp)).await;
503 Ok(())
504 },
505 resp => Err(resp.into()),
506 }
507 }
508 };
509
510 let msg = self.client_message(ClientReq::ViewOnUpdateReq(ViewOnUpdateReq {
511 mode: options.mode.map(|OnUpdateMode::Row| Mode::Row as i32),
512 }));
513
514 self.client.subscribe(&msg, callback).await?;
515 Ok(msg.msg_id)
516 }
517
518 /// Unregister a previously registered update callback with this [`View`].
519 ///
520 /// # Arguments
521 ///
522 /// - `id` - A callback `id` as returned by a recipricol call to
523 /// [`View::on_update`].
524 ///
525 /// # Examples
526 ///
527 /// ```rust
528 /// let callback = |_| async { print!("Updated!") };
529 /// let cid = view.on_update(callback, OnUpdateOptions::default()).await?;
530 /// view.remove_update(cid).await?;
531 /// ```
532 pub async fn remove_update(&self, update_id: u32) -> ClientResult<()> {
533 let msg = self.client_message(ClientReq::ViewRemoveOnUpdateReq(ViewRemoveOnUpdateReq {
534 id: update_id,
535 }));
536
537 self.client.unsubscribe(update_id).await?;
538 match self.client.oneshot(&msg).await? {
539 ClientResp::ViewRemoveOnUpdateResp(_) => Ok(()),
540 resp => Err(resp.into()),
541 }
542 }
543
544 /// Register a callback with this [`View`]. Whenever the [`View`] is
545 /// deleted, this callback will be invoked.
546 pub async fn on_delete(
547 &self,
548 on_delete: Box<dyn Fn() + Send + Sync + 'static>,
549 ) -> ClientResult<u32> {
550 let callback = move |resp: Response| match resp.client_resp.unwrap() {
551 ClientResp::ViewOnDeleteResp(_) => {
552 on_delete();
553 Ok(())
554 },
555 resp => Err(resp.into()),
556 };
557
558 let msg = self.client_message(ClientReq::ViewOnDeleteReq(ViewOnDeleteReq {}));
559 self.client.subscribe_once(&msg, Box::new(callback)).await?;
560 Ok(msg.msg_id)
561 }
562
563 /// Unregister a previously registered [`View::on_delete`] callback.
564 pub async fn remove_delete(&self, callback_id: u32) -> ClientResult<()> {
565 let msg = self.client_message(ClientReq::ViewRemoveDeleteReq(ViewRemoveDeleteReq {
566 id: callback_id,
567 }));
568
569 match self.client.oneshot(&msg).await? {
570 ClientResp::ViewRemoveDeleteResp(ViewRemoveDeleteResp {}) => Ok(()),
571 resp => Err(resp.into()),
572 }
573 }
574
575 /// Collapses the `group_by` row at `row_index`.
576 pub async fn collapse(&self, row_index: u32) -> ClientResult<u32> {
577 let msg = self.client_message(ClientReq::ViewCollapseReq(ViewCollapseReq { row_index }));
578 match self.client.oneshot(&msg).await? {
579 ClientResp::ViewCollapseResp(ViewCollapseResp { num_changed }) => Ok(num_changed),
580 resp => Err(resp.into()),
581 }
582 }
583
584 /// Expand the `group_by` row at `row_index`.
585 pub async fn expand(&self, row_index: u32) -> ClientResult<u32> {
586 let msg = self.client_message(ClientReq::ViewExpandReq(ViewExpandReq { row_index }));
587 match self.client.oneshot(&msg).await? {
588 ClientResp::ViewExpandResp(ViewExpandResp { num_changed }) => Ok(num_changed),
589 resp => Err(resp.into()),
590 }
591 }
592
593 /// Set expansion `depth` of the `group_by` tree.
594 pub async fn set_depth(&self, depth: u32) -> ClientResult<()> {
595 let msg = self.client_message(ClientReq::ViewSetDepthReq(ViewSetDepthReq { depth }));
596 match self.client.oneshot(&msg).await? {
597 ClientResp::ViewSetDepthResp(_) => Ok(()),
598 resp => Err(resp.into()),
599 }
600 }
601}