perspective_client/view.rs
1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████ █ █ █ █ █ █▄ ▀███ █ ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█ ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄ ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄ █ ▄▄▄▄▄ ┃
5// ┃ █ ██████ █ ▀█▄ █ ██████ █ ███▌▐███ ███████▄ █ ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors. ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::collections::HashMap;
14use std::ops::Deref;
15use std::str::FromStr;
16use std::sync::Arc;
17
18use futures::Future;
19use prost::bytes::Bytes;
20use serde::{Deserialize, Serialize};
21use ts_rs::TS;
22
23use self::view_on_update_req::Mode;
24use crate::assert_view_api;
25use crate::client::Client;
26use crate::proto::request::ClientReq;
27use crate::proto::response::ClientResp;
28use crate::proto::*;
29#[cfg(doc)]
30use crate::table::Table;
31pub use crate::utils::*;
32
33/// Options for [`View::on_update`].
34#[derive(Default, Debug, Deserialize, TS)]
35pub struct OnUpdateOptions {
36 pub mode: Option<OnUpdateMode>,
37}
38
39/// The update mode for [`View::on_update`].
40///
41/// `Row` mode calculates and provides the update batch new rows/columns as an
42/// Apache Arrow to the callback provided to [`View::on_update`]. This allows
43/// incremental updates if your callbakc can read this format, but should be
44/// disabled otherwise.
45#[derive(Default, Debug, Deserialize, TS)]
46pub enum OnUpdateMode {
47 #[default]
48 #[serde(rename = "row")]
49 Row,
50}
51
52impl FromStr for OnUpdateMode {
53 type Err = ClientError;
54
55 fn from_str(s: &str) -> Result<Self, Self::Err> {
56 if s == "row" {
57 Ok(OnUpdateMode::Row)
58 } else {
59 Err(ClientError::Option)
60 }
61 }
62}
63
64#[derive(Clone, Debug, Default, Deserialize, Serialize, TS, PartialEq)]
65pub struct ColumnWindow {
66 #[serde(skip_serializing_if = "Option::is_none")]
67 pub start_col: Option<f32>,
68
69 #[serde(skip_serializing_if = "Option::is_none")]
70 pub end_col: Option<f32>,
71}
72
73/// Options for serializing a window of data from a [`View`].
74///
75/// Some fields of [`ViewWindow`] are only applicable to specific methods of
76/// [`View`].
77#[derive(Clone, Debug, Default, Deserialize, Serialize, TS, PartialEq)]
78pub struct ViewWindow {
79 #[ts(optional)]
80 #[serde(skip_serializing_if = "Option::is_none")]
81 pub start_row: Option<f64>,
82
83 #[ts(optional)]
84 #[serde(skip_serializing_if = "Option::is_none")]
85 pub start_col: Option<f64>,
86
87 #[ts(optional)]
88 #[serde(skip_serializing_if = "Option::is_none")]
89 pub end_row: Option<f64>,
90
91 #[ts(optional)]
92 #[serde(skip_serializing_if = "Option::is_none")]
93 pub end_col: Option<f64>,
94
95 #[ts(optional)]
96 #[serde(skip_serializing_if = "Option::is_none")]
97 pub id: Option<bool>,
98
99 #[ts(optional)]
100 #[serde(skip_serializing_if = "Option::is_none")]
101 pub index: Option<bool>,
102
103 /// Only impacts [`View::to_csv`]
104 #[ts(optional)]
105 #[serde(skip_serializing_if = "Option::is_none")]
106 pub formatted: Option<bool>,
107
108 /// Only impacts [`View::to_arrow`]
109 #[ts(optional)]
110 #[serde(skip_serializing_if = "Option::is_none")]
111 pub compression: Option<String>,
112}
113
114impl From<ViewWindow> for ViewPort {
115 fn from(window: ViewWindow) -> Self {
116 ViewPort {
117 start_row: window.start_row.map(|x| x.floor() as u32),
118 start_col: window.start_col.map(|x| x.floor() as u32),
119 end_row: window.end_row.map(|x| x.ceil() as u32),
120 end_col: window.end_col.map(|x| x.ceil() as u32),
121 }
122 }
123}
124
125impl From<ViewPort> for ViewWindow {
126 fn from(window: ViewPort) -> Self {
127 ViewWindow {
128 start_row: window.start_row.map(|x| x as f64),
129 start_col: window.start_col.map(|x| x as f64),
130 end_row: window.end_row.map(|x| x as f64),
131 end_col: window.end_col.map(|x| x as f64),
132 ..ViewWindow::default()
133 }
134 }
135}
136
137/// Rows updated and port ID corresponding to an update batch, provided to the
138/// callback argument to [`View::on_update`] with the "rows" mode.
139#[derive(TS)]
140pub struct OnUpdateData(crate::proto::ViewOnUpdateResp);
141
142impl Deref for OnUpdateData {
143 type Target = crate::proto::ViewOnUpdateResp;
144
145 fn deref(&self) -> &Self::Target {
146 &self.0
147 }
148}
149
150/// The [`View`] struct is Perspective's query and serialization interface. It
151/// represents a query on the `Table`'s dataset and is always created from an
152/// existing `Table` instance via the [`Table::view`] method.
153///
154/// [`View`]s are immutable with respect to the arguments provided to the
155/// [`Table::view`] method; to change these parameters, you must create a new
156/// [`View`] on the same [`Table`]. However, each [`View`] is _live_ with
157/// respect to the [`Table`]'s data, and will (within a conflation window)
158/// update with the latest state as its parent [`Table`] updates, including
159/// incrementally recalculating all aggregates, pivots, filters, etc. [`View`]
160/// query parameters are composable, in that each parameter works independently
161/// _and_ in conjunction with each other, and there is no limit to the number of
162/// pivots, filters, etc. which can be applied.
163///
164/// To construct a [`View`], call the [`Table::view`] factory method. A
165/// [`Table`] can have as many [`View`]s associated with it as you need -
166/// Perspective conserves memory by relying on a single [`Table`] to power
167/// multiple [`View`]s concurrently.
168///
169/// # Examples
170///
171/// ```rust
172/// let opts = TableInitOptions::default();
173/// let data = TableData::Update(UpdateData::Csv("x,y\n1,2\n3,4".into()));
174/// let table = client.table(data, opts).await?;
175///
176/// let view = table.view(None).await?;
177/// let arrow = view.to_arrow().await?;
178/// view.delete().await?;
179/// ```
180///
181/// ```rust
182/// use crate::config::*;
183/// let view = table
184/// .view(Some(ViewConfigUpdate {
185/// columns: Some(vec![Some("Sales".into())]),
186/// aggregates: Some(HashMap::from_iter(vec![("Sales".into(), "sum".into())])),
187/// group_by: Some(vec!["Region".into(), "Country".into()]),
188/// filter: Some(vec![Filter::new("Category", "in", &[
189/// "Furniture",
190/// "Technology",
191/// ])]),
192/// ..ViewConfigUpdate::default()
193/// }))
194/// .await?;
195/// ```
196///
197/// Group By
198///
199/// ```rust
200/// let view = table
201/// .view(Some(ViewConfigUpdate {
202/// group_by: Some(vec!["a".into(), "c".into()]),
203/// ..ViewConfigUpdate::default()
204/// }))
205/// .await?;
206/// ```
207///
208/// Split By
209///
210/// ```rust
211/// let view = table
212/// .view(Some(ViewConfigUpdate {
213/// split_by: Some(vec!["a".into(), "c".into()]),
214/// ..ViewConfigUpdate::default()
215/// }))
216/// .await?;
217/// ```
218///
219/// In Javascript, a [`Table`] can be constructed on a [`Table::view`] instance,
220/// which will return a new [`Table`] based on the [`Table::view`]'s dataset,
221/// and all future updates that affect the [`Table::view`] will be forwarded to
222/// the new [`Table`]. This is particularly useful for implementing a
223/// [Client/Server Replicated](server.md#clientserver-replicated) design, by
224/// serializing the `View` to an arrow and setting up an `on_update` callback.
225///
226/// ```rust
227/// let opts = TableInitOptions::default();
228/// let data = TableData::Update(UpdateData::Csv("x,y\n1,2\n3,4".into()));
229/// let table = client.table(data, opts).await?;
230/// let view = table.view(None).await?;
231/// let table2 = client.table(TableData::View(view)).await?;
232/// table.update(data).await?;
233/// ```
234#[derive(Clone, Debug)]
235pub struct View {
236 pub name: String,
237 client: Client,
238}
239
// NOTE(review): `assert_view_api!` is defined elsewhere in the crate;
// presumably it checks that `View` exposes the expected API surface - confirm
// against the macro definition.
assert_view_api!(View);
241
242impl View {
243 pub fn new(name: String, client: Client) -> Self {
244 View { name, client }
245 }
246
247 fn client_message(&self, req: ClientReq) -> Request {
248 crate::proto::Request {
249 msg_id: self.client.gen_id(),
250 entity_id: self.name.clone(),
251 client_req: Some(req),
252 }
253 }
254
255 /// Returns an array of strings containing the column paths of the [`View`]
256 /// without any of the source columns.
257 ///
258 /// A column path shows the columns that a given cell belongs to after
259 /// pivots are applied.
260 pub async fn column_paths(&self, window: ColumnWindow) -> ClientResult<Vec<String>> {
261 let msg = self.client_message(ClientReq::ViewColumnPathsReq(ViewColumnPathsReq {
262 start_col: window.start_col.map(|x| x as u32),
263 end_col: window.end_col.map(|x| x as u32),
264 }));
265
266 match self.client.oneshot(&msg).await? {
267 ClientResp::ViewColumnPathsResp(ViewColumnPathsResp { paths }) => {
268 // Ok(paths.into_iter().map(|x| x.path).collect())
269 Ok(paths)
270 },
271 resp => Err(resp.into()),
272 }
273 }
274
275 /// Returns this [`View`]'s _dimensions_, row and column count, as well as
276 /// those of the [`crate::Table`] from which it was derived.
277 ///
278 /// - `num_table_rows` - The number of rows in the underlying
279 /// [`crate::Table`].
280 /// - `num_table_columns` - The number of columns in the underlying
281 /// [`crate::Table`] (including the `index` column if this
282 /// [`crate::Table`] was constructed with one).
283 /// - `num_view_rows` - The number of rows in this [`View`]. If this
284 /// [`View`] has a `group_by` clause, `num_view_rows` will also include
285 /// aggregated rows.
286 /// - `num_view_columns` - The number of columns in this [`View`]. If this
287 /// [`View`] has a `split_by` clause, `num_view_columns` will include all
288 /// _column paths_, e.g. the number of `columns` clause times the number
289 /// of `split_by` groups.
290 pub async fn dimensions(&self) -> ClientResult<ViewDimensionsResp> {
291 let msg = self.client_message(ClientReq::ViewDimensionsReq(ViewDimensionsReq {}));
292 match self.client.oneshot(&msg).await? {
293 ClientResp::ViewDimensionsResp(resp) => Ok(resp),
294 resp => Err(resp.into()),
295 }
296 }
297
298 /// The expression schema of this [`View`], which contains only the
299 /// expressions created on this [`View`]. See [`View::schema`] for
300 /// details.
301 pub async fn expression_schema(&self) -> ClientResult<HashMap<String, ColumnType>> {
302 if self.client.get_features().await?.expressions {
303 let msg = self.client_message(ClientReq::ViewExpressionSchemaReq(
304 ViewExpressionSchemaReq {},
305 ));
306 match self.client.oneshot(&msg).await? {
307 ClientResp::ViewExpressionSchemaResp(ViewExpressionSchemaResp { schema }) => {
308 Ok(schema
309 .into_iter()
310 .map(|(x, y)| (x, ColumnType::try_from(y).unwrap()))
311 .collect())
312 },
313 resp => Err(resp.into()),
314 }
315 } else {
316 Ok([].into_iter().collect())
317 }
318 }
319
320 /// A copy of the [`ViewConfig`] object passed to the [`Table::view`] method
321 /// which created this [`View`].
322 pub async fn get_config(&self) -> ClientResult<crate::config::ViewConfig> {
323 let msg = self.client_message(ClientReq::ViewGetConfigReq(ViewGetConfigReq {}));
324 match self.client.oneshot(&msg).await? {
325 ClientResp::ViewGetConfigResp(ViewGetConfigResp {
326 config: Some(config),
327 }) => Ok(config.into()),
328 resp => Err(resp.into()),
329 }
330 }
331
332 /// The number of aggregated rows in this [`View`]. This is affected by the
333 /// "group_by" configuration parameter supplied to this view's contructor.
334 ///
335 /// # Returns
336 ///
337 /// The number of aggregated rows.
338 pub async fn num_rows(&self) -> ClientResult<u32> {
339 Ok(self.dimensions().await?.num_view_rows)
340 }
341
342 /// The schema of this [`View`].
343 ///
344 /// The [`View`] schema differs from the `schema` returned by
345 /// [`Table::schema`]; it may have different column names due to
346 /// `expressions` or `columns` configs, or it maye have _different
347 /// column types_ due to the application og `group_by` and `aggregates`
348 /// config. You can think of [`Table::schema`] as the _input_ schema and
349 /// [`View::schema`] as the _output_ schema of a Perspective pipeline.
350 pub async fn schema(&self) -> ClientResult<HashMap<String, ColumnType>> {
351 let msg = self.client_message(ClientReq::ViewSchemaReq(ViewSchemaReq {}));
352 match self.client.oneshot(&msg).await? {
353 ClientResp::ViewSchemaResp(ViewSchemaResp { schema }) => Ok(schema
354 .into_iter()
355 .map(|(x, y)| (x, ColumnType::try_from(y).unwrap()))
356 .collect()),
357 resp => Err(resp.into()),
358 }
359 }
360
361 /// Serializes a [`View`] to the Apache Arrow data format.
362 pub async fn to_arrow(&self, window: ViewWindow) -> ClientResult<Bytes> {
363 let msg = self.client_message(ClientReq::ViewToArrowReq(ViewToArrowReq {
364 viewport: Some(window.clone().into()),
365 compression: window.compression,
366 }));
367
368 match self.client.oneshot(&msg).await? {
369 ClientResp::ViewToArrowResp(ViewToArrowResp { arrow }) => Ok(arrow.into()),
370 resp => Err(resp.into()),
371 }
372 }
373
374 /// Serializes this [`View`] to a string of JSON data. Useful if you want to
375 /// save additional round trip serialize/deserialize cycles.
376 pub async fn to_columns_string(&self, window: ViewWindow) -> ClientResult<String> {
377 let msg = self.client_message(ClientReq::ViewToColumnsStringReq(ViewToColumnsStringReq {
378 viewport: Some(window.clone().into()),
379 id: window.id,
380 index: window.index,
381 formatted: window.formatted,
382 }));
383
384 match self.client.oneshot(&msg).await? {
385 ClientResp::ViewToColumnsStringResp(ViewToColumnsStringResp { json_string }) => {
386 Ok(json_string)
387 },
388 resp => Err(resp.into()),
389 }
390 }
391
392 /// Render this `View` as a JSON string.
393 pub async fn to_json_string(&self, window: ViewWindow) -> ClientResult<String> {
394 let viewport = ViewPort::from(window.clone());
395 let msg = self.client_message(ClientReq::ViewToRowsStringReq(ViewToRowsStringReq {
396 viewport: Some(viewport),
397 id: window.id,
398 index: window.index,
399 formatted: window.formatted,
400 }));
401
402 match self.client.oneshot(&msg).await? {
403 ClientResp::ViewToRowsStringResp(ViewToRowsStringResp { json_string }) => {
404 Ok(json_string)
405 },
406 resp => Err(resp.into()),
407 }
408 }
409
410 /// Renders this [`View`] as an [NDJSON](https://github.com/ndjson/ndjson-spec)
411 /// formatted [`String`].
412 pub async fn to_ndjson(&self, window: ViewWindow) -> ClientResult<String> {
413 let viewport = ViewPort::from(window.clone());
414 let msg = self.client_message(ClientReq::ViewToNdjsonStringReq(ViewToNdjsonStringReq {
415 viewport: Some(viewport),
416 id: window.id,
417 index: window.index,
418 formatted: window.formatted,
419 }));
420
421 match self.client.oneshot(&msg).await? {
422 ClientResp::ViewToNdjsonStringResp(ViewToNdjsonStringResp { ndjson_string }) => {
423 Ok(ndjson_string)
424 },
425 resp => Err(resp.into()),
426 }
427 }
428
429 /// Serializes this [`View`] to CSV data in a standard format.
430 pub async fn to_csv(&self, window: ViewWindow) -> ClientResult<String> {
431 let msg = self.client_message(ClientReq::ViewToCsvReq(ViewToCsvReq {
432 viewport: Some(window.into()),
433 }));
434
435 match self.client.oneshot(&msg).await? {
436 ClientResp::ViewToCsvResp(ViewToCsvResp { csv }) => Ok(csv),
437 resp => Err(resp.into()),
438 }
439 }
440
441 /// Delete this [`View`] and clean up all resources associated with it.
442 /// [`View`] objects do not stop consuming resources or processing
443 /// updates when they are garbage collected - you must call this method
444 /// to reclaim these.
445 pub async fn delete(&self) -> ClientResult<()> {
446 let msg = self.client_message(ClientReq::ViewDeleteReq(ViewDeleteReq {}));
447 match self.client.oneshot(&msg).await? {
448 ClientResp::ViewDeleteResp(_) => Ok(()),
449 resp => Err(resp.into()),
450 }
451 }
452
453 /// Calculates the [min, max] of the leaf nodes of a column `column_name`.
454 ///
455 /// # Returns
456 ///
457 /// A tuple of [min, max], whose types are column and aggregate dependent.
458 pub async fn get_min_max(&self, column_name: String) -> ClientResult<(String, String)> {
459 let msg = self.client_message(ClientReq::ViewGetMinMaxReq(ViewGetMinMaxReq {
460 column_name,
461 }));
462
463 match self.client.oneshot(&msg).await? {
464 ClientResp::ViewGetMinMaxResp(ViewGetMinMaxResp { min, max }) => Ok((min, max)),
465 resp => Err(resp.into()),
466 }
467 }
468
469 /// Register a callback with this [`View`]. Whenever the view's underlying
470 /// table emits an update, this callback will be invoked with an object
471 /// containing `port_id`, indicating which port the update fired on, and
472 /// optionally `delta`, which is the new data that was updated for each
473 /// cell or each row.
474 ///
475 /// # Arguments
476 ///
477 /// - `on_update` - A callback function invoked on update, which receives an
478 /// object with two keys: `port_id`, indicating which port the update was
479 /// triggered on, and `delta`, whose value is dependent on the mode
480 /// parameter.
481 /// - `options` - If this is provided as `OnUpdateOptions { mode:
482 /// Some(OnUpdateMode::Row) }`, then `delta` is an Arrow of the updated
483 /// rows. Otherwise `delta` will be [`Option::None`].
484 pub async fn on_update<T, U>(&self, on_update: T, options: OnUpdateOptions) -> ClientResult<u32>
485 where
486 T: Fn(OnUpdateData) -> U + Send + Sync + 'static,
487 U: Future<Output = ()> + Send + 'static,
488 {
489 let on_update = Arc::new(on_update);
490 let callback = move |resp: Response| {
491 let on_update = on_update.clone();
492 async move {
493 match resp.client_resp {
494 Some(ClientResp::ViewOnUpdateResp(resp)) => {
495 on_update(OnUpdateData(resp)).await;
496 Ok(())
497 },
498 resp => Err(resp.into()),
499 }
500 }
501 };
502
503 let msg = self.client_message(ClientReq::ViewOnUpdateReq(ViewOnUpdateReq {
504 mode: options.mode.map(|OnUpdateMode::Row| Mode::Row as i32),
505 }));
506
507 self.client.subscribe(&msg, callback).await?;
508 Ok(msg.msg_id)
509 }
510
511 /// Unregister a previously registered update callback with this [`View`].
512 ///
513 /// # Arguments
514 ///
515 /// - `id` - A callback `id` as returned by a recipricol call to
516 /// [`View::on_update`].
517 ///
518 /// # Examples
519 ///
520 /// ```rust
521 /// let callback = |_| async { print!("Updated!") };
522 /// let cid = view.on_update(callback, OnUpdateOptions::default()).await?;
523 /// view.remove_update(cid).await?;
524 /// ```
525 pub async fn remove_update(&self, update_id: u32) -> ClientResult<()> {
526 let msg = self.client_message(ClientReq::ViewRemoveOnUpdateReq(ViewRemoveOnUpdateReq {
527 id: update_id,
528 }));
529
530 self.client.unsubscribe(update_id).await?;
531 match self.client.oneshot(&msg).await? {
532 ClientResp::ViewRemoveOnUpdateResp(_) => Ok(()),
533 resp => Err(resp.into()),
534 }
535 }
536
537 /// Register a callback with this [`View`]. Whenever the [`View`] is
538 /// deleted, this callback will be invoked.
539 pub async fn on_delete(
540 &self,
541 on_delete: Box<dyn Fn() + Send + Sync + 'static>,
542 ) -> ClientResult<u32> {
543 let callback = move |resp: Response| match resp.client_resp.unwrap() {
544 ClientResp::ViewOnDeleteResp(_) => {
545 on_delete();
546 Ok(())
547 },
548 resp => Err(resp.into()),
549 };
550
551 let msg = self.client_message(ClientReq::ViewOnDeleteReq(ViewOnDeleteReq {}));
552 self.client.subscribe_once(&msg, Box::new(callback)).await?;
553 Ok(msg.msg_id)
554 }
555
556 /// Unregister a previously registered [`View::on_delete`] callback.
557 pub async fn remove_delete(&self, callback_id: u32) -> ClientResult<()> {
558 let msg = self.client_message(ClientReq::ViewRemoveDeleteReq(ViewRemoveDeleteReq {
559 id: callback_id,
560 }));
561
562 match self.client.oneshot(&msg).await? {
563 ClientResp::ViewRemoveDeleteResp(ViewRemoveDeleteResp {}) => Ok(()),
564 resp => Err(resp.into()),
565 }
566 }
567
568 /// Collapses the `group_by` row at `row_index`.
569 pub async fn collapse(&self, row_index: u32) -> ClientResult<u32> {
570 let msg = self.client_message(ClientReq::ViewCollapseReq(ViewCollapseReq { row_index }));
571 match self.client.oneshot(&msg).await? {
572 ClientResp::ViewCollapseResp(ViewCollapseResp { num_changed }) => Ok(num_changed),
573 resp => Err(resp.into()),
574 }
575 }
576
577 /// Expand the `group_by` row at `row_index`.
578 pub async fn expand(&self, row_index: u32) -> ClientResult<u32> {
579 let msg = self.client_message(ClientReq::ViewExpandReq(ViewExpandReq { row_index }));
580 match self.client.oneshot(&msg).await? {
581 ClientResp::ViewExpandResp(ViewExpandResp { num_changed }) => Ok(num_changed),
582 resp => Err(resp.into()),
583 }
584 }
585
586 /// Set expansion `depth` of the `group_by` tree.
587 pub async fn set_depth(&self, depth: u32) -> ClientResult<()> {
588 let msg = self.client_message(ClientReq::ViewSetDepthReq(ViewSetDepthReq { depth }));
589 match self.client.oneshot(&msg).await? {
590 ClientResp::ViewSetDepthResp(_) => Ok(()),
591 resp => Err(resp.into()),
592 }
593 }
594}