1use std::collections::HashMap;
14use std::str::FromStr;
15use std::sync::Arc;
16
17use futures::Future;
18use prost::bytes::Bytes;
19use serde::{Deserialize, Serialize};
20use ts_rs::TS;
21
22use self::view_on_update_req::Mode;
23use crate::assert_view_api;
24use crate::client::Client;
25use crate::proto::request::ClientReq;
26use crate::proto::response::ClientResp;
27use crate::proto::*;
28#[cfg(doc)]
29use crate::table::Table;
30pub use crate::utils::*;
31
/// Options accepted by `View::on_update`.
#[derive(Default, Debug, Deserialize, TS)]
pub struct OnUpdateOptions {
    /// Optional update mode. When present it is translated into the `mode`
    /// field of the `ViewOnUpdateReq` sent by `View::on_update`; when `None`
    /// the request carries no mode.
    pub mode: Option<OnUpdateMode>,
}
36
/// Update mode selectable through [`OnUpdateOptions`].
///
/// `Row` is the only variant and also the `Default`.
#[derive(Default, Debug, Deserialize, TS)]
pub enum OnUpdateMode {
    /// Deserialized from the string `"row"`; mapped to `Mode::Row` by
    /// `View::on_update`.
    #[default]
    #[serde(rename = "row")]
    Row,
}
43
44impl FromStr for OnUpdateMode {
45 type Err = ClientError;
46
47 fn from_str(s: &str) -> Result<Self, Self::Err> {
48 if s == "row" {
49 Ok(OnUpdateMode::Row)
50 } else {
51 Err(ClientError::Option)
52 }
53 }
54}
55
/// Serializable bundle of four size counters.
///
/// NOTE(review): nothing in this section of the file constructs or returns
/// this type (`View::dimensions` returns `ViewDimensionsResp`); the field
/// meanings below are read off the field names — confirm against callers.
#[derive(Clone, Debug, Serialize)]
pub struct Dimensions {
    /// Presumably the number of rows in the view.
    pub num_view_rows: usize,
    /// Presumably the number of columns in the view.
    pub num_view_columns: usize,
    /// Presumably the number of rows in the underlying table.
    pub num_table_rows: usize,
    /// Presumably the number of columns in the underlying table.
    pub num_table_columns: usize,
}
63
/// Window and formatting options accepted by the `View::to_*` export methods.
///
/// Every field is optional and is left out of the serialized form when it is
/// `None`. The four bounds are converted into a `ViewPort` via
/// `From<ViewWindow> for ViewPort`.
#[derive(Clone, Debug, Default, Deserialize, Serialize, TS)]
pub struct ViewWindow {
    /// Start row bound; rounded down (`floor`) when converted to a `ViewPort`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub start_row: Option<f32>,

    /// Start column bound; rounded down (`floor`) when converted to a `ViewPort`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub start_col: Option<f32>,

    /// End row bound; rounded up (`ceil`) when converted to a `ViewPort`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub end_row: Option<f32>,

    /// End column bound; rounded up (`ceil`) when converted to a `ViewPort`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub end_col: Option<f32>,

    /// Forwarded unchanged as the `id` field of the columns/rows/ndjson
    /// string requests.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub id: Option<bool>,

    /// Forwarded unchanged as the `index` field of the columns/rows/ndjson
    /// string requests.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub index: Option<bool>,

    /// Forwarded unchanged as the `leaves_only` field of the
    /// columns/rows/ndjson string requests.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub leaves_only: Option<bool>,

    /// Forwarded unchanged as the `formatted` field of the
    /// columns/rows/ndjson string requests.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub formatted: Option<bool>,

    /// Forwarded as the `compression` field of the Arrow export request
    /// (`View::to_arrow`); the other export methods in this file do not read it.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub compression: Option<String>,
}
93
94impl From<ViewWindow> for ViewPort {
95 fn from(window: ViewWindow) -> Self {
96 ViewPort {
97 start_row: window.start_row.map(|x| x.floor() as u32),
98 start_col: window.start_col.map(|x| x.floor() as u32),
99 end_row: window.end_row.map(|x| x.ceil() as u32),
100 end_col: window.end_col.map(|x| x.ceil() as u32),
101 }
102 }
103}
104
#[doc = include_str!("../../docs/view.md")]
#[derive(Clone, Debug)]
pub struct View {
    /// Identifier of this view; sent as the `entity_id` of every request
    /// built by this handle.
    pub name: String,
    /// Client used to generate message ids and to send this view's requests
    /// and subscriptions.
    client: Client,
}

// NOTE(review): presumably a compile-time check that `View` exposes the
// expected view API — confirm against the `assert_view_api!` definition.
assert_view_api!(View);
113
114impl View {
115 pub fn new(name: String, client: Client) -> Self {
116 View { name, client }
117 }
118
119 fn client_message(&self, req: ClientReq) -> Request {
120 crate::proto::Request {
121 msg_id: self.client.gen_id(),
122 entity_id: self.name.clone(),
123 client_req: Some(req),
124 }
125 }
126
127 #[doc = include_str!("../../docs/view/column_paths.md")]
128 pub async fn column_paths(&self) -> ClientResult<Vec<String>> {
129 let msg = self.client_message(ClientReq::ViewColumnPathsReq(ViewColumnPathsReq {}));
130 match self.client.oneshot(&msg).await? {
131 ClientResp::ViewColumnPathsResp(ViewColumnPathsResp { paths }) => {
132 Ok(paths)
134 },
135 resp => Err(resp.into()),
136 }
137 }
138
139 #[doc = include_str!("../../docs/view/dimensions.md")]
140 pub async fn dimensions(&self) -> ClientResult<ViewDimensionsResp> {
141 let msg = self.client_message(ClientReq::ViewDimensionsReq(ViewDimensionsReq {}));
142 match self.client.oneshot(&msg).await? {
143 ClientResp::ViewDimensionsResp(resp) => Ok(resp),
144 resp => Err(resp.into()),
145 }
146 }
147
148 #[doc = include_str!("../../docs/view/expression_schema.md")]
149 pub async fn expression_schema(&self) -> ClientResult<HashMap<String, ColumnType>> {
150 let msg = self.client_message(ClientReq::ViewExpressionSchemaReq(
151 ViewExpressionSchemaReq {},
152 ));
153 match self.client.oneshot(&msg).await? {
154 ClientResp::ViewExpressionSchemaResp(ViewExpressionSchemaResp { schema }) => Ok(schema
155 .into_iter()
156 .map(|(x, y)| (x, ColumnType::try_from(y).unwrap()))
157 .collect()),
158 resp => Err(resp.into()),
159 }
160 }
161
162 #[doc = include_str!("../../docs/view/get_config.md")]
163 pub async fn get_config(&self) -> ClientResult<crate::config::ViewConfig> {
164 let msg = self.client_message(ClientReq::ViewGetConfigReq(ViewGetConfigReq {}));
165 match self.client.oneshot(&msg).await? {
166 ClientResp::ViewGetConfigResp(ViewGetConfigResp {
167 config: Some(config),
168 }) => Ok(config.into()),
169 resp => Err(resp.into()),
170 }
171 }
172
173 #[doc = include_str!("../../docs/view/num_rows.md")]
174 pub async fn num_rows(&self) -> ClientResult<u32> {
175 Ok(self.dimensions().await?.num_view_rows)
176 }
177
178 #[doc = include_str!("../../docs/view/schema.md")]
179 pub async fn schema(&self) -> ClientResult<HashMap<String, ColumnType>> {
180 let msg = self.client_message(ClientReq::ViewSchemaReq(ViewSchemaReq {}));
181 match self.client.oneshot(&msg).await? {
182 ClientResp::ViewSchemaResp(ViewSchemaResp { schema }) => Ok(schema
183 .into_iter()
184 .map(|(x, y)| (x, ColumnType::try_from(y).unwrap()))
185 .collect()),
186 resp => Err(resp.into()),
187 }
188 }
189
190 #[doc = include_str!("../../docs/view/to_arrow.md")]
191 pub async fn to_arrow(&self, window: ViewWindow) -> ClientResult<Bytes> {
192 let msg = self.client_message(ClientReq::ViewToArrowReq(ViewToArrowReq {
193 viewport: Some(window.clone().into()),
194 compression: window.compression,
195 }));
196
197 match self.client.oneshot(&msg).await? {
198 ClientResp::ViewToArrowResp(ViewToArrowResp { arrow }) => Ok(arrow.into()),
199 resp => Err(resp.into()),
200 }
201 }
202
203 #[doc = include_str!("../../docs/view/to_columns_string.md")]
204 pub async fn to_columns_string(&self, window: ViewWindow) -> ClientResult<String> {
205 let msg = self.client_message(ClientReq::ViewToColumnsStringReq(ViewToColumnsStringReq {
206 viewport: Some(window.clone().into()),
207 id: window.id,
208 index: window.index,
209 formatted: window.formatted,
210 leaves_only: window.leaves_only,
211 }));
212
213 match self.client.oneshot(&msg).await? {
214 ClientResp::ViewToColumnsStringResp(ViewToColumnsStringResp { json_string }) => {
215 Ok(json_string)
216 },
217 resp => Err(resp.into()),
218 }
219 }
220
221 #[doc = include_str!("../../docs/view/to_json_string.md")]
222 pub async fn to_json_string(&self, window: ViewWindow) -> ClientResult<String> {
223 let viewport = ViewPort {
224 start_row: window.start_row.map(|x| x.floor() as u32),
225 start_col: window.start_col.map(|x| x.floor() as u32),
226 end_row: window.end_row.map(|x| x.ceil() as u32),
227 end_col: window.end_col.map(|x| x.ceil() as u32),
228 };
229
230 let msg = self.client_message(ClientReq::ViewToRowsStringReq(ViewToRowsStringReq {
231 viewport: Some(viewport),
232 id: window.id,
233 index: window.index,
234 formatted: window.formatted,
235 leaves_only: window.leaves_only,
236 }));
237
238 match self.client.oneshot(&msg).await? {
239 ClientResp::ViewToRowsStringResp(ViewToRowsStringResp { json_string }) => {
240 Ok(json_string)
241 },
242 resp => Err(resp.into()),
243 }
244 }
245
246 #[doc = include_str!("../../docs/view/to_ndjson.md")]
247 pub async fn to_ndjson(&self, window: ViewWindow) -> ClientResult<String> {
248 let viewport = ViewPort {
249 start_row: window.start_row.map(|x| x.floor() as u32),
250 start_col: window.start_col.map(|x| x.floor() as u32),
251 end_row: window.end_row.map(|x| x.ceil() as u32),
252 end_col: window.end_col.map(|x| x.ceil() as u32),
253 };
254
255 let msg = self.client_message(ClientReq::ViewToNdjsonStringReq(ViewToNdjsonStringReq {
256 viewport: Some(viewport),
257 id: window.id,
258 index: window.index,
259 formatted: window.formatted,
260 leaves_only: window.leaves_only,
261 }));
262
263 match self.client.oneshot(&msg).await? {
264 ClientResp::ViewToNdjsonStringResp(ViewToNdjsonStringResp { ndjson_string }) => {
265 Ok(ndjson_string)
266 },
267 resp => Err(resp.into()),
268 }
269 }
270
271 #[doc = include_str!("../../docs/view/to_csv.md")]
272 pub async fn to_csv(&self, window: ViewWindow) -> ClientResult<String> {
273 let msg = self.client_message(ClientReq::ViewToCsvReq(ViewToCsvReq {
274 viewport: Some(window.into()),
275 }));
276
277 match self.client.oneshot(&msg).await? {
278 ClientResp::ViewToCsvResp(ViewToCsvResp { csv }) => Ok(csv),
279 resp => Err(resp.into()),
280 }
281 }
282
283 #[doc = include_str!("../../docs/view/delete.md")]
284 pub async fn delete(&self) -> ClientResult<()> {
285 let msg = self.client_message(ClientReq::ViewDeleteReq(ViewDeleteReq {}));
286 match self.client.oneshot(&msg).await? {
287 ClientResp::ViewDeleteResp(_) => Ok(()),
288 resp => Err(resp.into()),
289 }
290 }
291
292 #[doc = include_str!("../../docs/view/get_min_max.md")]
293 pub async fn get_min_max(&self, column_name: String) -> ClientResult<(String, String)> {
294 let msg = self.client_message(ClientReq::ViewGetMinMaxReq(ViewGetMinMaxReq {
295 column_name,
296 }));
297
298 match self.client.oneshot(&msg).await? {
299 ClientResp::ViewGetMinMaxResp(ViewGetMinMaxResp { min, max }) => Ok((min, max)),
300 resp => Err(resp.into()),
301 }
302 }
303
304 pub async fn on_update<T, U>(&self, on_update: T, options: OnUpdateOptions) -> ClientResult<u32>
308 where
309 T: Fn(ViewOnUpdateResp) -> U + Send + Sync + 'static,
310 U: Future<Output = ()> + Send + 'static,
311 {
312 let on_update = Arc::new(on_update);
313 let callback = move |resp: Response| {
314 let on_update = on_update.clone();
315 async move {
316 match resp.client_resp {
317 Some(ClientResp::ViewOnUpdateResp(resp)) => {
318 on_update(resp).await;
319 Ok(())
320 },
321 resp => Err(ClientError::OptionResponseFailed(resp.into())),
322 }
323 }
324 };
325
326 let msg = self.client_message(ClientReq::ViewOnUpdateReq(ViewOnUpdateReq {
327 mode: options.mode.map(|OnUpdateMode::Row| Mode::Row as i32),
328 }));
329
330 self.client.subscribe(&msg, callback).await?;
331 Ok(msg.msg_id)
332 }
333
334 #[doc = include_str!("../../docs/view/remove_update.md")]
335 pub async fn remove_update(&self, update_id: u32) -> ClientResult<()> {
336 let msg = self.client_message(ClientReq::ViewRemoveOnUpdateReq(ViewRemoveOnUpdateReq {
337 id: update_id,
338 }));
339
340 self.client.unsubscribe(update_id).await?;
341 match self.client.oneshot(&msg).await? {
342 ClientResp::ViewRemoveOnUpdateResp(_) => Ok(()),
343 resp => Err(resp.into()),
344 }
345 }
346
347 #[doc = include_str!("../../docs/view/on_delete.md")]
348 pub async fn on_delete(
349 &self,
350 on_delete: Box<dyn Fn() + Send + Sync + 'static>,
351 ) -> ClientResult<u32> {
352 let callback = move |resp: Response| match resp.client_resp.unwrap() {
353 ClientResp::ViewOnDeleteResp(_) => {
354 on_delete();
355 Ok(())
356 },
357 resp => Err(resp.into()),
358 };
359
360 let msg = self.client_message(ClientReq::ViewOnDeleteReq(ViewOnDeleteReq {}));
361 self.client.subscribe_once(&msg, Box::new(callback)).await?;
362 Ok(msg.msg_id)
363 }
364
365 #[doc = include_str!("../../docs/view/remove_delete.md")]
366 pub async fn remove_delete(&self, callback_id: u32) -> ClientResult<()> {
367 let msg = self.client_message(ClientReq::ViewRemoveDeleteReq(ViewRemoveDeleteReq {
368 id: callback_id,
369 }));
370
371 match self.client.oneshot(&msg).await? {
372 ClientResp::ViewRemoveDeleteResp(ViewRemoveDeleteResp {}) => Ok(()),
373 resp => Err(resp.into()),
374 }
375 }
376
377 #[doc = include_str!("../../docs/view/collapse.md")]
378 pub async fn collapse(&self, row_index: u32) -> ClientResult<u32> {
379 let msg = self.client_message(ClientReq::ViewCollapseReq(ViewCollapseReq { row_index }));
380 match self.client.oneshot(&msg).await? {
381 ClientResp::ViewCollapseResp(ViewCollapseResp { num_changed }) => Ok(num_changed),
382 resp => Err(resp.into()),
383 }
384 }
385
386 #[doc = include_str!("../../docs/view/expand.md")]
387 pub async fn expand(&self, row_index: u32) -> ClientResult<u32> {
388 let msg = self.client_message(ClientReq::ViewExpandReq(ViewExpandReq { row_index }));
389 match self.client.oneshot(&msg).await? {
390 ClientResp::ViewExpandResp(ViewExpandResp { num_changed }) => Ok(num_changed),
391 resp => Err(resp.into()),
392 }
393 }
394
395 #[doc = include_str!("../../docs/view/set_depth.md")]
396 pub async fn set_depth(&self, depth: u32) -> ClientResult<()> {
397 let msg = self.client_message(ClientReq::ViewSetDepthReq(ViewSetDepthReq { depth }));
398 match self.client.oneshot(&msg).await? {
399 ClientResp::ViewSetDepthResp(_) => Ok(()),
400 resp => Err(resp.into()),
401 }
402 }
403}