perspective_client/
table.rs

1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████       █      █      █      █      █ █▄  ▀███ █       ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█  ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄  ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄   █ ▄▄▄▄▄ ┃
5// ┃ █      ██████ █  ▀█▄       █ ██████      █      ███▌▐███ ███████▄ █       ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors.                              ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::collections::HashMap;
14use std::fmt::Display;
15
16use nanoid::*;
17use serde::{Deserialize, Serialize};
18use ts_rs::TS;
19
20use crate::assert_table_api;
21use crate::client::{Client, Features};
22use crate::config::{Expressions, ViewConfigUpdate};
23use crate::proto::make_table_req::make_table_options::MakeTableType;
24use crate::proto::make_table_req::MakeTableOptions;
25use crate::proto::request::ClientReq;
26use crate::proto::response::ClientResp;
27use crate::proto::*;
28use crate::table_data::UpdateData;
29use crate::utils::*;
30use crate::view::View;
31
32pub type Schema = HashMap<String, ColumnType>;
33
34#[derive(Clone, Copy, Debug, Serialize, Deserialize, TS)]
35pub enum TableReadFormat {
36    #[serde(rename = "csv")]
37    Csv,
38
39    #[serde(rename = "json")]
40    JsonString,
41
42    #[serde(rename = "columns")]
43    ColumnsString,
44
45    #[serde(rename = "arrow")]
46    Arrow,
47
48    #[serde(rename = "ndjson")]
49    Ndjson,
50}
51
52impl TableReadFormat {
53    pub fn parse(value: Option<String>) -> Result<Option<Self>, String> {
54        Ok(match value.as_deref() {
55            Some("csv") => Some(TableReadFormat::Csv),
56            Some("json") => Some(TableReadFormat::JsonString),
57            Some("columns") => Some(TableReadFormat::ColumnsString),
58            Some("arrow") => Some(TableReadFormat::Arrow),
59            Some("ndjson") => Some(TableReadFormat::Ndjson),
60            None => None,
61            Some(x) => return Err(format!("Unknown format \"{}\"", x)),
62        })
63    }
64}
65
66/// Options which impact the behavior of [`Client::table`], as well as
67/// subsequent calls to [`Table::update`].
68#[derive(Clone, Debug, Default, Serialize, Deserialize, TS)]
69pub struct TableInitOptions {
70    #[serde(default)]
71    #[ts(optional)]
72    pub name: Option<String>,
73
74    #[serde(default)]
75    #[ts(optional)]
76    pub format: Option<TableReadFormat>,
77
78    /// This [`Table`] should use the column named by the `index` parameter as
79    /// the `index`, which causes [`Table::update`] and [`Client::table`] input
80    /// to either insert or update existing rows based on `index` column
81    /// value equality.
82    #[serde(default)]
83    #[ts(optional)]
84    pub index: Option<String>,
85
86    /// This [`Table`] should be limited to `limit` rows, after which the
87    /// _earliest_ rows will be overwritten (where _earliest_ is defined as
88    /// relative to insertion order).
89    #[serde(default)]
90    #[ts(optional)]
91    pub limit: Option<u32>,
92}
93
94impl TableInitOptions {
95    pub fn set_name<D: Display>(&mut self, name: D) {
96        self.name = Some(format!("{}", name))
97    }
98}
99
100impl TryFrom<TableOptions> for MakeTableOptions {
101    type Error = ClientError;
102
103    fn try_from(value: TableOptions) -> Result<Self, Self::Error> {
104        Ok(MakeTableOptions {
105            make_table_type: match value {
106                TableOptions {
107                    index: Some(_),
108                    limit: Some(_),
109                } => Err(ClientError::BadTableOptions)?,
110                TableOptions {
111                    index: Some(index), ..
112                } => Some(MakeTableType::MakeIndexTable(index)),
113                TableOptions {
114                    limit: Some(limit), ..
115                } => Some(MakeTableType::MakeLimitTable(limit)),
116                _ => None,
117            },
118        })
119    }
120}
121
/// The subset of [`TableInitOptions`] that a [`Table`] keeps after creation
/// and that is converted into [`MakeTableOptions`].
#[derive(Clone, Debug)]
pub(crate) struct TableOptions {
    /// Name of the index column, if any.
    pub index: Option<String>,

    /// Row limit, if any.
    pub limit: Option<u32>,
}
127
128impl From<TableInitOptions> for TableOptions {
129    fn from(value: TableInitOptions) -> Self {
130        TableOptions {
131            index: value.index,
132            limit: value.limit,
133        }
134    }
135}
136
137#[derive(Clone, Debug, Default, Deserialize, Serialize, TS)]
138pub struct UpdateOptions {
139    pub port_id: Option<u32>,
140    pub format: Option<TableReadFormat>,
141}
142
143#[derive(Clone, Debug, Serialize, Deserialize)]
144pub struct ValidateExpressionsData {
145    pub expression_schema: HashMap<String, ColumnType>,
146    pub errors: HashMap<String, table_validate_expr_resp::ExprValidationError>,
147    pub expression_alias: HashMap<String, String>,
148}
149
150#[derive(Clone)]
151pub struct Table {
152    name: String,
153    client: Client,
154    options: TableOptions,
155
156    /// If this table is constructed from a View, the view's on_update callback
157    /// is wired into this table. So, we store the token to clean it up properly
158    /// on destruction.
159    pub(crate) view_update_token: Option<u32>,
160}
161
162assert_table_api!(Table);
163
164impl Table {
165    pub(crate) fn new(name: String, client: Client, options: TableOptions) -> Self {
166        Table {
167            name,
168            client,
169            options,
170            view_update_token: None,
171        }
172    }
173
174    fn client_message(&self, req: ClientReq) -> Request {
175        Request {
176            msg_id: self.client.gen_id(),
177            entity_id: self.name.clone(),
178            client_req: Some(req),
179        }
180    }
181
182    #[doc = include_str!("../../docs/table/get_client.md")]
183    pub fn get_client(&self) -> Client {
184        self.client.clone()
185    }
186
187    #[doc = include_str!("../../docs/table/get_features.md")]
188    pub fn get_features(&self) -> ClientResult<Features> {
189        self.client.get_features()
190    }
191
192    #[doc = include_str!("../../docs/table/get_index.md")]
193    pub fn get_index(&self) -> Option<String> {
194        self.options.index.as_ref().map(|index| index.to_owned())
195    }
196
197    #[doc = include_str!("../../docs/table/get_limit.md")]
198    pub fn get_limit(&self) -> Option<u32> {
199        self.options.limit.as_ref().map(|limit| *limit)
200    }
201
202    // #[doc = include_str!("../../docs/table/get_limit.md")]
203    pub fn get_name(&self) -> &str {
204        self.name.as_str()
205    }
206
207    #[doc = include_str!("../../docs/table/clear.md")]
208    pub async fn clear(&self) -> ClientResult<()> {
209        self.replace(UpdateData::JsonRows("[]".to_owned())).await
210    }
211
212    #[doc = include_str!("../../docs/table/delete.md")]
213    pub async fn delete(&self) -> ClientResult<()> {
214        let msg = self.client_message(ClientReq::TableDeleteReq(TableDeleteReq {}));
215        match self.client.oneshot(&msg).await? {
216            ClientResp::TableDeleteResp(_) => Ok(()),
217            resp => Err(resp.into()),
218        }
219    }
220
221    #[doc = include_str!("../../docs/table/columns.md")]
222    pub async fn columns(&self) -> ClientResult<Vec<String>> {
223        let msg = self.client_message(ClientReq::TableSchemaReq(TableSchemaReq {}));
224        match self.client.oneshot(&msg).await? {
225            ClientResp::TableSchemaResp(TableSchemaResp { schema }) => Ok(schema
226                .map(|x| x.schema.into_iter().map(|x| x.name.to_owned()).collect())
227                .unwrap()),
228            resp => Err(resp.into()),
229        }
230    }
231
232    #[doc = include_str!("../../docs/table/size.md")]
233    pub async fn size(&self) -> ClientResult<usize> {
234        let msg = self.client_message(ClientReq::TableSizeReq(TableSizeReq {}));
235        match self.client.oneshot(&msg).await? {
236            ClientResp::TableSizeResp(TableSizeResp { size }) => Ok(size as usize),
237            resp => Err(resp.into()),
238        }
239    }
240
241    #[doc = include_str!("../../docs/table/schema.md")]
242    pub async fn schema(&self) -> ClientResult<HashMap<String, ColumnType>> {
243        let msg = self.client_message(ClientReq::TableSchemaReq(TableSchemaReq {}));
244        match self.client.oneshot(&msg).await? {
245            ClientResp::TableSchemaResp(TableSchemaResp { schema }) => Ok(schema
246                .map(|x| {
247                    x.schema
248                        .into_iter()
249                        .map(|x| (x.name, ColumnType::try_from(x.r#type).unwrap()))
250                        .collect()
251                })
252                .unwrap()),
253            resp => Err(resp.into()),
254        }
255    }
256
257    #[doc = include_str!("../../docs/table/make_port.md")]
258    pub async fn make_port(&self) -> ClientResult<i32> {
259        let msg = self.client_message(ClientReq::TableMakePortReq(TableMakePortReq {}));
260        match self.client.oneshot(&msg).await? {
261            ClientResp::TableMakePortResp(TableMakePortResp { port_id }) => Ok(port_id as i32),
262            _ => Err(ClientError::Unknown("make_port".to_string())),
263        }
264    }
265
266    #[doc = include_str!("../../docs/table/on_delete.md")]
267    pub async fn on_delete(
268        &self,
269        on_delete: Box<dyn Fn() + Send + Sync + 'static>,
270    ) -> ClientResult<u32> {
271        let callback = move |resp: Response| match resp.client_resp {
272            Some(ClientResp::TableOnDeleteResp(_)) => {
273                on_delete();
274                Ok(())
275            },
276            resp => Err(ClientError::OptionResponseFailed(resp.into())),
277        };
278
279        let msg = self.client_message(ClientReq::TableOnDeleteReq(TableOnDeleteReq {}));
280        self.client.subscribe_once(&msg, Box::new(callback)).await?;
281        Ok(msg.msg_id)
282    }
283
284    #[doc = include_str!("../../docs/table/remove_delete.md")]
285    pub async fn remove_delete(&self, callback_id: u32) -> ClientResult<()> {
286        let msg = self.client_message(ClientReq::TableRemoveDeleteReq(TableRemoveDeleteReq {
287            id: callback_id,
288        }));
289
290        match self.client.oneshot(&msg).await? {
291            ClientResp::TableRemoveDeleteResp(_) => Ok(()),
292            resp => Err(resp.into()),
293        }
294    }
295
296    #[doc = include_str!("../../docs/table/remove.md")]
297    pub async fn remove(&self, input: UpdateData) -> ClientResult<()> {
298        let msg = self.client_message(ClientReq::TableRemoveReq(TableRemoveReq {
299            data: Some(input.into()),
300        }));
301
302        match self.client.oneshot(&msg).await? {
303            ClientResp::TableRemoveResp(_) => Ok(()),
304            resp => Err(resp.into()),
305        }
306    }
307
308    #[doc = include_str!("../../docs/table/replace.md")]
309    pub async fn replace(&self, input: UpdateData) -> ClientResult<()> {
310        let msg = self.client_message(ClientReq::TableReplaceReq(TableReplaceReq {
311            data: Some(input.into()),
312        }));
313
314        match self.client.oneshot(&msg).await? {
315            ClientResp::TableReplaceResp(_) => Ok(()),
316            resp => Err(resp.into()),
317        }
318    }
319
320    #[doc = include_str!("../../docs/table/update.md")]
321    pub async fn update(&self, input: UpdateData, options: UpdateOptions) -> ClientResult<()> {
322        let msg = self.client_message(ClientReq::TableUpdateReq(TableUpdateReq {
323            data: Some(input.into()),
324            port_id: options.port_id.unwrap_or(0),
325        }));
326
327        match self.client.oneshot(&msg).await? {
328            ClientResp::TableUpdateResp(_) => Ok(()),
329            resp => Err(resp.into()),
330        }
331    }
332
333    #[doc = include_str!("../../docs/table/validate_expressions.md")]
334    pub async fn validate_expressions(
335        &self,
336        expressions: Expressions,
337    ) -> ClientResult<ValidateExpressionsData> {
338        let msg = self.client_message(ClientReq::TableValidateExprReq(TableValidateExprReq {
339            column_to_expr: expressions.0,
340        }));
341
342        match self.client.oneshot(&msg).await? {
343            ClientResp::TableValidateExprResp(result) => Ok(ValidateExpressionsData {
344                errors: result.errors,
345                expression_alias: result.expression_alias,
346                expression_schema: result
347                    .expression_schema
348                    .into_iter()
349                    .map(|(x, y)| (x, ColumnType::try_from(y).unwrap()))
350                    .collect(),
351            }),
352            resp => Err(resp.into()),
353        }
354    }
355
356    #[doc = include_str!("../../docs/table/view.md")]
357    pub async fn view(&self, config: Option<ViewConfigUpdate>) -> ClientResult<View> {
358        let view_name = nanoid!();
359        let msg = Request {
360            msg_id: self.client.gen_id(),
361            entity_id: self.name.clone(),
362            client_req: ClientReq::TableMakeViewReq(TableMakeViewReq {
363                view_id: view_name.clone(),
364                config: config.map(|x| x.into()),
365            })
366            .into(),
367        };
368
369        match self.client.oneshot(&msg).await? {
370            ClientResp::TableMakeViewResp(TableMakeViewResp { view_id })
371                if view_id == view_name =>
372            {
373                Ok(View::new(view_name, self.client.clone()))
374            },
375            resp => Err(resp.into()),
376        }
377    }
378}