1use std::collections::HashMap;
14use std::fmt::Display;
15
16use nanoid::*;
17use serde::{Deserialize, Serialize};
18use ts_rs::TS;
19
20use crate::assert_table_api;
21use crate::client::{Client, Features};
22use crate::config::{Expressions, ViewConfigUpdate};
23use crate::proto::make_table_req::make_table_options::MakeTableType;
24use crate::proto::make_table_req::MakeTableOptions;
25use crate::proto::request::ClientReq;
26use crate::proto::response::ClientResp;
27use crate::proto::*;
28use crate::table_data::UpdateData;
29use crate::utils::*;
30use crate::view::View;
31
32pub type Schema = HashMap<String, ColumnType>;
33
34#[derive(Clone, Copy, Debug, Serialize, Deserialize, TS)]
35pub enum TableReadFormat {
36 #[serde(rename = "csv")]
37 Csv,
38
39 #[serde(rename = "json")]
40 JsonString,
41
42 #[serde(rename = "columns")]
43 ColumnsString,
44
45 #[serde(rename = "arrow")]
46 Arrow,
47
48 #[serde(rename = "ndjson")]
49 Ndjson,
50}
51
52impl TableReadFormat {
53 pub fn parse(value: Option<String>) -> Result<Option<Self>, String> {
54 Ok(match value.as_deref() {
55 Some("csv") => Some(TableReadFormat::Csv),
56 Some("json") => Some(TableReadFormat::JsonString),
57 Some("columns") => Some(TableReadFormat::ColumnsString),
58 Some("arrow") => Some(TableReadFormat::Arrow),
59 Some("ndjson") => Some(TableReadFormat::Ndjson),
60 None => None,
61 Some(x) => return Err(format!("Unknown format \"{}\"", x)),
62 })
63 }
64}
65
66#[derive(Clone, Debug, Default, Serialize, Deserialize, TS)]
69pub struct TableInitOptions {
70 #[serde(default)]
71 #[ts(optional)]
72 pub name: Option<String>,
73
74 #[serde(default)]
75 #[ts(optional)]
76 pub format: Option<TableReadFormat>,
77
78 #[serde(default)]
83 #[ts(optional)]
84 pub index: Option<String>,
85
86 #[serde(default)]
90 #[ts(optional)]
91 pub limit: Option<u32>,
92}
93
94impl TableInitOptions {
95 pub fn set_name<D: Display>(&mut self, name: D) {
96 self.name = Some(format!("{}", name))
97 }
98}
99
100impl TryFrom<TableOptions> for MakeTableOptions {
101 type Error = ClientError;
102
103 fn try_from(value: TableOptions) -> Result<Self, Self::Error> {
104 Ok(MakeTableOptions {
105 make_table_type: match value {
106 TableOptions {
107 index: Some(_),
108 limit: Some(_),
109 } => Err(ClientError::BadTableOptions)?,
110 TableOptions {
111 index: Some(index), ..
112 } => Some(MakeTableType::MakeIndexTable(index)),
113 TableOptions {
114 limit: Some(limit), ..
115 } => Some(MakeTableType::MakeLimitTable(limit)),
116 _ => None,
117 },
118 })
119 }
120}
121
122#[derive(Clone, Debug)]
123pub(crate) struct TableOptions {
124 pub index: Option<String>,
125 pub limit: Option<u32>,
126}
127
128impl From<TableInitOptions> for TableOptions {
129 fn from(value: TableInitOptions) -> Self {
130 TableOptions {
131 index: value.index,
132 limit: value.limit,
133 }
134 }
135}
136
137#[derive(Clone, Debug, Default, Deserialize, Serialize, TS)]
138pub struct UpdateOptions {
139 pub port_id: Option<u32>,
140 pub format: Option<TableReadFormat>,
141}
142
143#[derive(Clone, Debug, Serialize, Deserialize)]
144pub struct ValidateExpressionsData {
145 pub expression_schema: HashMap<String, ColumnType>,
146 pub errors: HashMap<String, table_validate_expr_resp::ExprValidationError>,
147 pub expression_alias: HashMap<String, String>,
148}
149
150#[derive(Clone)]
151pub struct Table {
152 name: String,
153 client: Client,
154 options: TableOptions,
155
156 pub(crate) view_update_token: Option<u32>,
160}
161
162assert_table_api!(Table);
163
164impl Table {
165 pub(crate) fn new(name: String, client: Client, options: TableOptions) -> Self {
166 Table {
167 name,
168 client,
169 options,
170 view_update_token: None,
171 }
172 }
173
174 fn client_message(&self, req: ClientReq) -> Request {
175 Request {
176 msg_id: self.client.gen_id(),
177 entity_id: self.name.clone(),
178 client_req: Some(req),
179 }
180 }
181
182 #[doc = include_str!("../../docs/table/get_client.md")]
183 pub fn get_client(&self) -> Client {
184 self.client.clone()
185 }
186
187 #[doc = include_str!("../../docs/table/get_features.md")]
188 pub fn get_features(&self) -> ClientResult<Features> {
189 self.client.get_features()
190 }
191
192 #[doc = include_str!("../../docs/table/get_index.md")]
193 pub fn get_index(&self) -> Option<String> {
194 self.options.index.as_ref().map(|index| index.to_owned())
195 }
196
197 #[doc = include_str!("../../docs/table/get_limit.md")]
198 pub fn get_limit(&self) -> Option<u32> {
199 self.options.limit.as_ref().map(|limit| *limit)
200 }
201
202 pub fn get_name(&self) -> &str {
204 self.name.as_str()
205 }
206
207 #[doc = include_str!("../../docs/table/clear.md")]
208 pub async fn clear(&self) -> ClientResult<()> {
209 self.replace(UpdateData::JsonRows("[]".to_owned())).await
210 }
211
212 #[doc = include_str!("../../docs/table/delete.md")]
213 pub async fn delete(&self) -> ClientResult<()> {
214 let msg = self.client_message(ClientReq::TableDeleteReq(TableDeleteReq {}));
215 match self.client.oneshot(&msg).await? {
216 ClientResp::TableDeleteResp(_) => Ok(()),
217 resp => Err(resp.into()),
218 }
219 }
220
221 #[doc = include_str!("../../docs/table/columns.md")]
222 pub async fn columns(&self) -> ClientResult<Vec<String>> {
223 let msg = self.client_message(ClientReq::TableSchemaReq(TableSchemaReq {}));
224 match self.client.oneshot(&msg).await? {
225 ClientResp::TableSchemaResp(TableSchemaResp { schema }) => Ok(schema
226 .map(|x| x.schema.into_iter().map(|x| x.name.to_owned()).collect())
227 .unwrap()),
228 resp => Err(resp.into()),
229 }
230 }
231
232 #[doc = include_str!("../../docs/table/size.md")]
233 pub async fn size(&self) -> ClientResult<usize> {
234 let msg = self.client_message(ClientReq::TableSizeReq(TableSizeReq {}));
235 match self.client.oneshot(&msg).await? {
236 ClientResp::TableSizeResp(TableSizeResp { size }) => Ok(size as usize),
237 resp => Err(resp.into()),
238 }
239 }
240
241 #[doc = include_str!("../../docs/table/schema.md")]
242 pub async fn schema(&self) -> ClientResult<HashMap<String, ColumnType>> {
243 let msg = self.client_message(ClientReq::TableSchemaReq(TableSchemaReq {}));
244 match self.client.oneshot(&msg).await? {
245 ClientResp::TableSchemaResp(TableSchemaResp { schema }) => Ok(schema
246 .map(|x| {
247 x.schema
248 .into_iter()
249 .map(|x| (x.name, ColumnType::try_from(x.r#type).unwrap()))
250 .collect()
251 })
252 .unwrap()),
253 resp => Err(resp.into()),
254 }
255 }
256
257 #[doc = include_str!("../../docs/table/make_port.md")]
258 pub async fn make_port(&self) -> ClientResult<i32> {
259 let msg = self.client_message(ClientReq::TableMakePortReq(TableMakePortReq {}));
260 match self.client.oneshot(&msg).await? {
261 ClientResp::TableMakePortResp(TableMakePortResp { port_id }) => Ok(port_id as i32),
262 _ => Err(ClientError::Unknown("make_port".to_string())),
263 }
264 }
265
266 #[doc = include_str!("../../docs/table/on_delete.md")]
267 pub async fn on_delete(
268 &self,
269 on_delete: Box<dyn Fn() + Send + Sync + 'static>,
270 ) -> ClientResult<u32> {
271 let callback = move |resp: Response| match resp.client_resp {
272 Some(ClientResp::TableOnDeleteResp(_)) => {
273 on_delete();
274 Ok(())
275 },
276 resp => Err(ClientError::OptionResponseFailed(resp.into())),
277 };
278
279 let msg = self.client_message(ClientReq::TableOnDeleteReq(TableOnDeleteReq {}));
280 self.client.subscribe_once(&msg, Box::new(callback)).await?;
281 Ok(msg.msg_id)
282 }
283
284 #[doc = include_str!("../../docs/table/remove_delete.md")]
285 pub async fn remove_delete(&self, callback_id: u32) -> ClientResult<()> {
286 let msg = self.client_message(ClientReq::TableRemoveDeleteReq(TableRemoveDeleteReq {
287 id: callback_id,
288 }));
289
290 match self.client.oneshot(&msg).await? {
291 ClientResp::TableRemoveDeleteResp(_) => Ok(()),
292 resp => Err(resp.into()),
293 }
294 }
295
296 #[doc = include_str!("../../docs/table/remove.md")]
297 pub async fn remove(&self, input: UpdateData) -> ClientResult<()> {
298 let msg = self.client_message(ClientReq::TableRemoveReq(TableRemoveReq {
299 data: Some(input.into()),
300 }));
301
302 match self.client.oneshot(&msg).await? {
303 ClientResp::TableRemoveResp(_) => Ok(()),
304 resp => Err(resp.into()),
305 }
306 }
307
308 #[doc = include_str!("../../docs/table/replace.md")]
309 pub async fn replace(&self, input: UpdateData) -> ClientResult<()> {
310 let msg = self.client_message(ClientReq::TableReplaceReq(TableReplaceReq {
311 data: Some(input.into()),
312 }));
313
314 match self.client.oneshot(&msg).await? {
315 ClientResp::TableReplaceResp(_) => Ok(()),
316 resp => Err(resp.into()),
317 }
318 }
319
320 #[doc = include_str!("../../docs/table/update.md")]
321 pub async fn update(&self, input: UpdateData, options: UpdateOptions) -> ClientResult<()> {
322 let msg = self.client_message(ClientReq::TableUpdateReq(TableUpdateReq {
323 data: Some(input.into()),
324 port_id: options.port_id.unwrap_or(0),
325 }));
326
327 match self.client.oneshot(&msg).await? {
328 ClientResp::TableUpdateResp(_) => Ok(()),
329 resp => Err(resp.into()),
330 }
331 }
332
333 #[doc = include_str!("../../docs/table/validate_expressions.md")]
334 pub async fn validate_expressions(
335 &self,
336 expressions: Expressions,
337 ) -> ClientResult<ValidateExpressionsData> {
338 let msg = self.client_message(ClientReq::TableValidateExprReq(TableValidateExprReq {
339 column_to_expr: expressions.0,
340 }));
341
342 match self.client.oneshot(&msg).await? {
343 ClientResp::TableValidateExprResp(result) => Ok(ValidateExpressionsData {
344 errors: result.errors,
345 expression_alias: result.expression_alias,
346 expression_schema: result
347 .expression_schema
348 .into_iter()
349 .map(|(x, y)| (x, ColumnType::try_from(y).unwrap()))
350 .collect(),
351 }),
352 resp => Err(resp.into()),
353 }
354 }
355
356 #[doc = include_str!("../../docs/table/view.md")]
357 pub async fn view(&self, config: Option<ViewConfigUpdate>) -> ClientResult<View> {
358 let view_name = nanoid!();
359 let msg = Request {
360 msg_id: self.client.gen_id(),
361 entity_id: self.name.clone(),
362 client_req: ClientReq::TableMakeViewReq(TableMakeViewReq {
363 view_id: view_name.clone(),
364 config: config.map(|x| x.into()),
365 })
366 .into(),
367 };
368
369 match self.client.oneshot(&msg).await? {
370 ClientResp::TableMakeViewResp(TableMakeViewResp { view_id })
371 if view_id == view_name =>
372 {
373 Ok(View::new(view_name, self.client.clone()))
374 },
375 resp => Err(resp.into()),
376 }
377 }
378}