perspective_client/
view.rs

1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████       █      █      █      █      █ █▄  ▀███ █       ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█  ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄  ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄   █ ▄▄▄▄▄ ┃
5// ┃ █      ██████ █  ▀█▄       █ ██████      █      ███▌▐███ ███████▄ █       ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors.                              ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::collections::HashMap;
14use std::str::FromStr;
15use std::sync::Arc;
16
17use futures::Future;
18use prost::bytes::Bytes;
19use serde::{Deserialize, Serialize};
20use ts_rs::TS;
21
22use self::view_on_update_req::Mode;
23use crate::assert_view_api;
24use crate::client::Client;
25use crate::proto::request::ClientReq;
26use crate::proto::response::ClientResp;
27use crate::proto::*;
28#[cfg(doc)]
29use crate::table::Table;
30pub use crate::utils::*;
31
32#[derive(Default, Debug, Deserialize, TS)]
33pub struct OnUpdateOptions {
34    pub mode: Option<OnUpdateMode>,
35}
36
37#[derive(Default, Debug, Deserialize, TS)]
38pub enum OnUpdateMode {
39    #[default]
40    #[serde(rename = "row")]
41    Row,
42}
43
44impl FromStr for OnUpdateMode {
45    type Err = ClientError;
46
47    fn from_str(s: &str) -> Result<Self, Self::Err> {
48        if s == "row" {
49            Ok(OnUpdateMode::Row)
50        } else {
51            Err(ClientError::Option)
52        }
53    }
54}
55
56#[derive(Clone, Debug, Serialize)]
57pub struct Dimensions {
58    pub num_view_rows: usize,
59    pub num_view_columns: usize,
60    pub num_table_rows: usize,
61    pub num_table_columns: usize,
62}
63
64#[derive(Clone, Debug, Default, Deserialize, Serialize, TS)]
65pub struct ViewWindow {
66    #[serde(skip_serializing_if = "Option::is_none")]
67    pub start_row: Option<f32>,
68
69    #[serde(skip_serializing_if = "Option::is_none")]
70    pub start_col: Option<f32>,
71
72    #[serde(skip_serializing_if = "Option::is_none")]
73    pub end_row: Option<f32>,
74
75    #[serde(skip_serializing_if = "Option::is_none")]
76    pub end_col: Option<f32>,
77
78    #[serde(skip_serializing_if = "Option::is_none")]
79    pub id: Option<bool>,
80
81    #[serde(skip_serializing_if = "Option::is_none")]
82    pub index: Option<bool>,
83
84    #[serde(skip_serializing_if = "Option::is_none")]
85    pub leaves_only: Option<bool>,
86
87    #[serde(skip_serializing_if = "Option::is_none")]
88    pub formatted: Option<bool>,
89
90    #[serde(skip_serializing_if = "Option::is_none")]
91    pub compression: Option<String>,
92}
93
94impl From<ViewWindow> for ViewPort {
95    fn from(window: ViewWindow) -> Self {
96        ViewPort {
97            start_row: window.start_row.map(|x| x.floor() as u32),
98            start_col: window.start_col.map(|x| x.floor() as u32),
99            end_row: window.end_row.map(|x| x.ceil() as u32),
100            end_col: window.end_col.map(|x| x.ceil() as u32),
101        }
102    }
103}
104
105#[doc = include_str!("../../docs/view.md")]
106#[derive(Clone, Debug)]
107pub struct View {
108    pub name: String,
109    client: Client,
110}
111
112assert_view_api!(View);
113
114impl View {
115    pub fn new(name: String, client: Client) -> Self {
116        View { name, client }
117    }
118
119    fn client_message(&self, req: ClientReq) -> Request {
120        crate::proto::Request {
121            msg_id: self.client.gen_id(),
122            entity_id: self.name.clone(),
123            client_req: Some(req),
124        }
125    }
126
127    #[doc = include_str!("../../docs/view/column_paths.md")]
128    pub async fn column_paths(&self) -> ClientResult<Vec<String>> {
129        let msg = self.client_message(ClientReq::ViewColumnPathsReq(ViewColumnPathsReq {}));
130        match self.client.oneshot(&msg).await? {
131            ClientResp::ViewColumnPathsResp(ViewColumnPathsResp { paths }) => {
132                // Ok(paths.into_iter().map(|x| x.path).collect())
133                Ok(paths)
134            },
135            resp => Err(resp.into()),
136        }
137    }
138
139    #[doc = include_str!("../../docs/view/dimensions.md")]
140    pub async fn dimensions(&self) -> ClientResult<ViewDimensionsResp> {
141        let msg = self.client_message(ClientReq::ViewDimensionsReq(ViewDimensionsReq {}));
142        match self.client.oneshot(&msg).await? {
143            ClientResp::ViewDimensionsResp(resp) => Ok(resp),
144            resp => Err(resp.into()),
145        }
146    }
147
148    #[doc = include_str!("../../docs/view/expression_schema.md")]
149    pub async fn expression_schema(&self) -> ClientResult<HashMap<String, ColumnType>> {
150        let msg = self.client_message(ClientReq::ViewExpressionSchemaReq(
151            ViewExpressionSchemaReq {},
152        ));
153        match self.client.oneshot(&msg).await? {
154            ClientResp::ViewExpressionSchemaResp(ViewExpressionSchemaResp { schema }) => Ok(schema
155                .into_iter()
156                .map(|(x, y)| (x, ColumnType::try_from(y).unwrap()))
157                .collect()),
158            resp => Err(resp.into()),
159        }
160    }
161
162    #[doc = include_str!("../../docs/view/get_config.md")]
163    pub async fn get_config(&self) -> ClientResult<crate::config::ViewConfig> {
164        let msg = self.client_message(ClientReq::ViewGetConfigReq(ViewGetConfigReq {}));
165        match self.client.oneshot(&msg).await? {
166            ClientResp::ViewGetConfigResp(ViewGetConfigResp {
167                config: Some(config),
168            }) => Ok(config.into()),
169            resp => Err(resp.into()),
170        }
171    }
172
173    #[doc = include_str!("../../docs/view/num_rows.md")]
174    pub async fn num_rows(&self) -> ClientResult<u32> {
175        Ok(self.dimensions().await?.num_view_rows)
176    }
177
178    #[doc = include_str!("../../docs/view/schema.md")]
179    pub async fn schema(&self) -> ClientResult<HashMap<String, ColumnType>> {
180        let msg = self.client_message(ClientReq::ViewSchemaReq(ViewSchemaReq {}));
181        match self.client.oneshot(&msg).await? {
182            ClientResp::ViewSchemaResp(ViewSchemaResp { schema }) => Ok(schema
183                .into_iter()
184                .map(|(x, y)| (x, ColumnType::try_from(y).unwrap()))
185                .collect()),
186            resp => Err(resp.into()),
187        }
188    }
189
190    #[doc = include_str!("../../docs/view/to_arrow.md")]
191    pub async fn to_arrow(&self, window: ViewWindow) -> ClientResult<Bytes> {
192        let msg = self.client_message(ClientReq::ViewToArrowReq(ViewToArrowReq {
193            viewport: Some(window.clone().into()),
194            compression: window.compression,
195        }));
196
197        match self.client.oneshot(&msg).await? {
198            ClientResp::ViewToArrowResp(ViewToArrowResp { arrow }) => Ok(arrow.into()),
199            resp => Err(resp.into()),
200        }
201    }
202
203    #[doc = include_str!("../../docs/view/to_columns_string.md")]
204    pub async fn to_columns_string(&self, window: ViewWindow) -> ClientResult<String> {
205        let msg = self.client_message(ClientReq::ViewToColumnsStringReq(ViewToColumnsStringReq {
206            viewport: Some(window.clone().into()),
207            id: window.id,
208            index: window.index,
209            formatted: window.formatted,
210            leaves_only: window.leaves_only,
211        }));
212
213        match self.client.oneshot(&msg).await? {
214            ClientResp::ViewToColumnsStringResp(ViewToColumnsStringResp { json_string }) => {
215                Ok(json_string)
216            },
217            resp => Err(resp.into()),
218        }
219    }
220
221    #[doc = include_str!("../../docs/view/to_json_string.md")]
222    pub async fn to_json_string(&self, window: ViewWindow) -> ClientResult<String> {
223        let viewport = ViewPort {
224            start_row: window.start_row.map(|x| x.floor() as u32),
225            start_col: window.start_col.map(|x| x.floor() as u32),
226            end_row: window.end_row.map(|x| x.ceil() as u32),
227            end_col: window.end_col.map(|x| x.ceil() as u32),
228        };
229
230        let msg = self.client_message(ClientReq::ViewToRowsStringReq(ViewToRowsStringReq {
231            viewport: Some(viewport),
232            id: window.id,
233            index: window.index,
234            formatted: window.formatted,
235            leaves_only: window.leaves_only,
236        }));
237
238        match self.client.oneshot(&msg).await? {
239            ClientResp::ViewToRowsStringResp(ViewToRowsStringResp { json_string }) => {
240                Ok(json_string)
241            },
242            resp => Err(resp.into()),
243        }
244    }
245
246    #[doc = include_str!("../../docs/view/to_ndjson.md")]
247    pub async fn to_ndjson(&self, window: ViewWindow) -> ClientResult<String> {
248        let viewport = ViewPort {
249            start_row: window.start_row.map(|x| x.floor() as u32),
250            start_col: window.start_col.map(|x| x.floor() as u32),
251            end_row: window.end_row.map(|x| x.ceil() as u32),
252            end_col: window.end_col.map(|x| x.ceil() as u32),
253        };
254
255        let msg = self.client_message(ClientReq::ViewToNdjsonStringReq(ViewToNdjsonStringReq {
256            viewport: Some(viewport),
257            id: window.id,
258            index: window.index,
259            formatted: window.formatted,
260            leaves_only: window.leaves_only,
261        }));
262
263        match self.client.oneshot(&msg).await? {
264            ClientResp::ViewToNdjsonStringResp(ViewToNdjsonStringResp { ndjson_string }) => {
265                Ok(ndjson_string)
266            },
267            resp => Err(resp.into()),
268        }
269    }
270
271    #[doc = include_str!("../../docs/view/to_csv.md")]
272    pub async fn to_csv(&self, window: ViewWindow) -> ClientResult<String> {
273        let msg = self.client_message(ClientReq::ViewToCsvReq(ViewToCsvReq {
274            viewport: Some(window.into()),
275        }));
276
277        match self.client.oneshot(&msg).await? {
278            ClientResp::ViewToCsvResp(ViewToCsvResp { csv }) => Ok(csv),
279            resp => Err(resp.into()),
280        }
281    }
282
283    #[doc = include_str!("../../docs/view/delete.md")]
284    pub async fn delete(&self) -> ClientResult<()> {
285        let msg = self.client_message(ClientReq::ViewDeleteReq(ViewDeleteReq {}));
286        match self.client.oneshot(&msg).await? {
287            ClientResp::ViewDeleteResp(_) => Ok(()),
288            resp => Err(resp.into()),
289        }
290    }
291
292    #[doc = include_str!("../../docs/view/get_min_max.md")]
293    pub async fn get_min_max(&self, column_name: String) -> ClientResult<(String, String)> {
294        let msg = self.client_message(ClientReq::ViewGetMinMaxReq(ViewGetMinMaxReq {
295            column_name,
296        }));
297
298        match self.client.oneshot(&msg).await? {
299            ClientResp::ViewGetMinMaxResp(ViewGetMinMaxResp { min, max }) => Ok((min, max)),
300            resp => Err(resp.into()),
301        }
302    }
303
304    /// This is used when constructing a [`super::table::Table`] from a
305    /// [`View`]. The callback needs to be async to wire up the views
306    /// on_update to the tables.
307    pub async fn on_update<T, U>(&self, on_update: T, options: OnUpdateOptions) -> ClientResult<u32>
308    where
309        T: Fn(ViewOnUpdateResp) -> U + Send + Sync + 'static,
310        U: Future<Output = ()> + Send + 'static,
311    {
312        let on_update = Arc::new(on_update);
313        let callback = move |resp: Response| {
314            let on_update = on_update.clone();
315            async move {
316                match resp.client_resp {
317                    Some(ClientResp::ViewOnUpdateResp(resp)) => {
318                        on_update(resp).await;
319                        Ok(())
320                    },
321                    resp => Err(ClientError::OptionResponseFailed(resp.into())),
322                }
323            }
324        };
325
326        let msg = self.client_message(ClientReq::ViewOnUpdateReq(ViewOnUpdateReq {
327            mode: options.mode.map(|OnUpdateMode::Row| Mode::Row as i32),
328        }));
329
330        self.client.subscribe(&msg, callback).await?;
331        Ok(msg.msg_id)
332    }
333
334    #[doc = include_str!("../../docs/view/remove_update.md")]
335    pub async fn remove_update(&self, update_id: u32) -> ClientResult<()> {
336        let msg = self.client_message(ClientReq::ViewRemoveOnUpdateReq(ViewRemoveOnUpdateReq {
337            id: update_id,
338        }));
339
340        self.client.unsubscribe(update_id).await?;
341        match self.client.oneshot(&msg).await? {
342            ClientResp::ViewRemoveOnUpdateResp(_) => Ok(()),
343            resp => Err(resp.into()),
344        }
345    }
346
347    #[doc = include_str!("../../docs/view/on_delete.md")]
348    pub async fn on_delete(
349        &self,
350        on_delete: Box<dyn Fn() + Send + Sync + 'static>,
351    ) -> ClientResult<u32> {
352        let callback = move |resp: Response| match resp.client_resp.unwrap() {
353            ClientResp::ViewOnDeleteResp(_) => {
354                on_delete();
355                Ok(())
356            },
357            resp => Err(resp.into()),
358        };
359
360        let msg = self.client_message(ClientReq::ViewOnDeleteReq(ViewOnDeleteReq {}));
361        self.client.subscribe_once(&msg, Box::new(callback)).await?;
362        Ok(msg.msg_id)
363    }
364
365    #[doc = include_str!("../../docs/view/remove_delete.md")]
366    pub async fn remove_delete(&self, callback_id: u32) -> ClientResult<()> {
367        let msg = self.client_message(ClientReq::ViewRemoveDeleteReq(ViewRemoveDeleteReq {
368            id: callback_id,
369        }));
370
371        match self.client.oneshot(&msg).await? {
372            ClientResp::ViewRemoveDeleteResp(ViewRemoveDeleteResp {}) => Ok(()),
373            resp => Err(resp.into()),
374        }
375    }
376
377    #[doc = include_str!("../../docs/view/collapse.md")]
378    pub async fn collapse(&self, row_index: u32) -> ClientResult<u32> {
379        let msg = self.client_message(ClientReq::ViewCollapseReq(ViewCollapseReq { row_index }));
380        match self.client.oneshot(&msg).await? {
381            ClientResp::ViewCollapseResp(ViewCollapseResp { num_changed }) => Ok(num_changed),
382            resp => Err(resp.into()),
383        }
384    }
385
386    #[doc = include_str!("../../docs/view/expand.md")]
387    pub async fn expand(&self, row_index: u32) -> ClientResult<u32> {
388        let msg = self.client_message(ClientReq::ViewExpandReq(ViewExpandReq { row_index }));
389        match self.client.oneshot(&msg).await? {
390            ClientResp::ViewExpandResp(ViewExpandResp { num_changed }) => Ok(num_changed),
391            resp => Err(resp.into()),
392        }
393    }
394
395    #[doc = include_str!("../../docs/view/set_depth.md")]
396    pub async fn set_depth(&self, depth: u32) -> ClientResult<()> {
397        let msg = self.client_message(ClientReq::ViewSetDepthReq(ViewSetDepthReq { depth }));
398        match self.client.oneshot(&msg).await? {
399            ClientResp::ViewSetDepthResp(_) => Ok(()),
400            resp => Err(resp.into()),
401        }
402    }
403}