1use std::collections::HashMap;
14
15use indexmap::IndexMap;
16use prost::Message as ProstMessage;
17use prost::bytes::{Bytes, BytesMut};
18
19use super::data::RowPathStyle;
20use super::error::VirtualServerError;
21use super::handler::VirtualServerHandler;
22use crate::config::{ViewConfig, ViewConfigUpdate};
23use crate::proto::response::ClientResp;
24use crate::proto::table_validate_expr_resp::ExprValidationError;
25use crate::proto::{
26 ColumnType, GetFeaturesResp, GetHostedTablesResp, MakeTableResp, Request, Response,
27 ServerError, TableMakePortResp, TableMakeViewResp, TableOnDeleteResp, TableRemoveDeleteResp,
28 TableSchemaResp, TableSizeResp, TableValidateExprResp, ViewColumnPathsResp, ViewDeleteResp,
29 ViewDimensionsResp, ViewExpressionSchemaResp, ViewGetConfigResp, ViewGetMinMaxResp,
30 ViewOnDeleteResp, ViewOnUpdateResp, ViewRemoveDeleteResp, ViewRemoveOnUpdateResp,
31 ViewSchemaResp, ViewToArrowResp, ViewToColumnsStringResp, ViewToCsvResp,
32 ViewToNdjsonStringResp, ViewToRowsStringResp,
33};
34
/// Encodes a reply to `$msg`.
///
/// The payload struct `$name { .. }` is wrapped in the `ClientResp` variant of
/// the same name and placed in a `Response` that echoes `$msg.msg_id` and
/// `$msg.entity_id`. The macro evaluates to the encoded, frozen `Bytes`.
///
/// The expansion uses `?` on the encode step (mapped to
/// `VirtualServerError::EncodeError`), so it can only be invoked inside a
/// function whose error type accepts that value. It also moves `entity_id` out
/// of `$msg`, so `$msg` must not be used as a whole afterwards.
macro_rules! respond {
    ($msg:ident, $name:ident { $($rest:tt)* }) => {{
        // The payload is built first: its field expressions may still borrow
        // from `$msg`, which is partially moved into the envelope below.
        let payload = ClientResp::$name($name { $($rest)* });
        let envelope = Response {
            msg_id: $msg.msg_id,
            entity_id: $msg.entity_id,
            client_resp: Some(payload),
        };

        let mut buf = BytesMut::new();
        envelope
            .encode(&mut buf)
            .map_err(VirtualServerError::EncodeError)?;

        buf.freeze()
    }};
}
51
/// Protocol front-end that decodes encoded `Request` messages, dispatches them
/// to a [`VirtualServerHandler`], and encodes the matching `Response`.
///
/// Besides the handler it keeps per-view bookkeeping between requests, keyed
/// by view id.
pub struct VirtualServer<T: VirtualServerHandler> {
    /// Backend that services each decoded request.
    handler: T,
    /// Maps a view id to the id of the table the view was created on.
    view_to_table: IndexMap<String, String>,
    /// Config recorded for each view when it was created.
    view_configs: IndexMap<String, ViewConfig>,
    /// Per-view schema cache, filled the first time a view's schema is needed.
    view_schemas: IndexMap<String, IndexMap<String, ColumnType>>,
}
63
64impl<T: VirtualServerHandler> VirtualServer<T> {
65 pub fn new(handler: T) -> Self {
67 Self {
68 handler,
69 view_configs: IndexMap::default(),
70 view_to_table: IndexMap::default(),
71 view_schemas: IndexMap::default(),
72 }
73 }
74
75 pub async fn handle_request(
80 &mut self,
81 bytes: Bytes,
82 ) -> Result<Bytes, VirtualServerError<T::Error>> {
83 let msg = Request::decode(bytes).map_err(VirtualServerError::DecodeError)?;
84 tracing::debug!(
85 "Handling request: entity_id={}, req={:?}",
86 msg.entity_id,
87 msg.client_req
88 );
89
90 match self.internal_handle_request(msg.clone()).await {
91 Ok(resp) => Ok(resp),
92 Err(err) => {
93 tracing::error!("{}", err);
94 Ok(respond!(msg, ServerError {
95 message: err.to_string(),
96 status_code: 0
97 }))
98 },
99 }
100 }
101
102 async fn get_cached_view_schema(
103 &mut self,
104 entity_id: &str,
105 to_psp_format: bool,
106 ) -> Result<IndexMap<String, ColumnType>, VirtualServerError<T::Error>> {
107 if !self.view_schemas.contains_key(entity_id) {
108 self.view_schemas.insert(
109 entity_id.to_string(),
110 self.handler
111 .view_schema(entity_id, self.view_configs.get(entity_id).unwrap())
112 .await?,
113 );
114 }
115
116 if to_psp_format {
117 Ok(self
118 .view_schemas
119 .get(entity_id)
120 .unwrap()
121 .iter()
122 .map(|(k, v)| {
123 (
124 k.split("_").collect::<Vec<_>>().last().unwrap().to_string(),
125 *v,
126 )
127 })
128 .collect())
129 } else {
130 Ok(self.view_schemas.get(entity_id).cloned().unwrap())
131 }
132 }
133
134 async fn internal_handle_request(
135 &mut self,
136 msg: Request,
137 ) -> Result<Bytes, VirtualServerError<T::Error>> {
138 use crate::proto::request::ClientReq::*;
139 let resp = match msg.client_req.unwrap() {
140 GetFeaturesReq(_) => {
141 let features = self.handler.get_features().await?;
142 respond!(msg, GetFeaturesResp { ..features.into() })
143 },
144 GetHostedTablesReq(_) => {
145 respond!(msg, GetHostedTablesResp {
146 table_infos: self.handler.get_hosted_tables().await?
147 })
148 },
149 TableSchemaReq(_) => {
150 respond!(msg, TableSchemaResp {
151 schema: Some(crate::proto::Schema {
152 schema: self
153 .handler
154 .table_schema(msg.entity_id.as_str())
155 .await?
156 .iter()
157 .map(|x| crate::proto::schema::KeyTypePair {
158 name: x.0.to_string(),
159 r#type: *x.1 as i32,
160 })
161 .collect()
162 })
163 })
164 },
165 TableMakePortReq(req) => {
166 respond!(msg, TableMakePortResp {
167 port_id: self.handler.table_make_port(&req).await?
168 })
169 },
170 TableMakeViewReq(req) => {
171 self.view_to_table
172 .insert(req.view_id.clone(), msg.entity_id.clone());
173
174 let mut config: ViewConfigUpdate = req.config.clone().unwrap_or_default().into();
175 let bytes = respond!(msg, TableMakeViewResp {
176 view_id: self
177 .handler
178 .table_make_view(msg.entity_id.as_str(), req.view_id.as_str(), &mut config)
179 .await?
180 });
181
182 self.view_configs.insert(req.view_id.clone(), config.into());
183 bytes
184 },
185 TableSizeReq(_) => {
186 respond!(msg, TableSizeResp {
187 size: self.handler.table_size(msg.entity_id.as_str()).await?
188 })
189 },
190 TableValidateExprReq(req) => {
191 let mut expression_schema = HashMap::<String, i32>::default();
192 let mut expression_alias = HashMap::<String, String>::default();
193 let mut errors = HashMap::<String, ExprValidationError>::default();
194 for (name, ex) in req.column_to_expr.iter() {
195 let _ = expression_alias.insert(name.clone(), ex.clone());
196 match self
197 .handler
198 .table_validate_expression(&msg.entity_id, ex.as_str())
199 .await
200 {
201 Ok(dtype) => {
202 let _ = expression_schema.insert(name.clone(), dtype as i32);
203 },
204 Err(e) => {
205 let _ = errors.insert(name.clone(), ExprValidationError {
206 error_message: format!("{}", e),
207 line: 0,
208 column: 0,
209 });
210 },
211 }
212 }
213
214 respond!(msg, TableValidateExprResp {
215 expression_schema,
216 errors,
217 expression_alias,
218 })
219 },
220 ViewSchemaReq(_) => {
221 respond!(msg, ViewSchemaResp {
222 schema: self
223 .get_cached_view_schema(&msg.entity_id, true)
224 .await?
225 .into_iter()
226 .map(|(x, y)| (x.to_string(), y as i32))
227 .collect()
228 })
229 },
230 ViewDimensionsReq(_) => {
231 let view_id = &msg.entity_id;
232 let table_id = self
233 .view_to_table
234 .get(view_id)
235 .ok_or_else(|| VirtualServerError::UnknownViewId(view_id.to_string()))?;
236
237 let num_table_rows = self.handler.table_size(table_id).await?;
238 let num_table_columns = self.handler.table_column_size(table_id).await? as u32;
239 let config = self.view_configs.get(view_id).unwrap();
240 let num_view_columns = self.handler.view_column_size(view_id, config).await? as u32;
241 let num_view_rows = self.handler.view_size(view_id).await?;
242 let resp = ViewDimensionsResp {
243 num_table_columns,
244 num_table_rows,
245 num_view_columns,
246 num_view_rows,
247 };
248
249 respond!(msg, ViewDimensionsResp { ..resp })
250 },
251 ViewGetConfigReq(_) => {
252 respond!(msg, ViewGetConfigResp {
253 config: Some(
254 ViewConfigUpdate::from(
255 self.view_configs.get(&msg.entity_id).unwrap().clone()
256 )
257 .into()
258 )
259 })
260 },
261 ViewExpressionSchemaReq(_) => {
262 let mut schema = HashMap::<String, i32>::default();
263 let table_id = self.view_to_table.get(&msg.entity_id);
264 for (name, ex) in self
265 .view_configs
266 .get(&msg.entity_id)
267 .unwrap()
268 .expressions
269 .iter()
270 {
271 match self
272 .handler
273 .table_validate_expression(table_id.unwrap(), ex.as_str())
274 .await
275 {
276 Ok(dtype) => {
277 let _ = schema.insert(name.clone(), dtype as i32);
278 },
279 Err(_e) => {
280 },
282 }
283 }
284
285 let resp = ViewExpressionSchemaResp { schema };
286 respond!(msg, ViewExpressionSchemaResp { ..resp })
287 },
288 ViewColumnPathsReq(_) => {
289 respond!(msg, ViewColumnPathsResp {
290 paths: self
291 .handler
292 .view_schema(
293 msg.entity_id.as_str(),
294 self.view_configs.get(&msg.entity_id).unwrap()
295 )
296 .await?
297 .keys()
298 .cloned()
299 .collect()
300 })
301 },
302 ViewToArrowReq(view_to_arrow_req) => {
303 let viewport = view_to_arrow_req.viewport.unwrap();
304 let schema = self.get_cached_view_schema(&msg.entity_id, false).await?;
305 let config = self.view_configs.get(&msg.entity_id).unwrap();
306 let mut cols = self
307 .handler
308 .view_get_data(msg.entity_id.as_str(), config, &schema, &viewport)
309 .await?;
310
311 let arrow = cols
312 .render_to_arrow_ipc()
313 .map_err(|e| VirtualServerError::Other(e.to_string()))?;
314
315 respond!(msg, ViewToArrowResp { arrow })
316 },
317 ViewToCsvReq(view_to_csv_req) => {
318 let viewport = view_to_csv_req.viewport.unwrap();
319 let schema = self.get_cached_view_schema(&msg.entity_id, false).await?;
320 let config = self.view_configs.get(&msg.entity_id).unwrap();
321 let mut cols = self
322 .handler
323 .view_get_data(msg.entity_id.as_str(), config, &schema, &viewport)
324 .await?;
325
326 let rows = cols.render_to_rows(RowPathStyle::PerLevel);
327 let mut csv = String::new();
328 if let Some(first_row) = rows.first() {
329 let headers: Vec<&str> = first_row.keys().map(|k| k.as_str()).collect();
330 csv.push_str(&headers.join(","));
331 csv.push('\n');
332 }
333
334 for row in &rows {
335 let values: Vec<String> = row
336 .values()
337 .map(|cell| serde_json::to_string(cell).unwrap_or_default())
338 .collect();
339 csv.push_str(&values.join(","));
340 csv.push('\n');
341 }
342
343 respond!(msg, ViewToCsvResp { csv })
344 },
345 ViewToNdjsonStringReq(view_to_ndjson_req) => {
346 let viewport = view_to_ndjson_req.viewport.unwrap();
347 let schema = self.get_cached_view_schema(&msg.entity_id, false).await?;
348 let config = self.view_configs.get(&msg.entity_id).unwrap();
349 let mut cols = self
350 .handler
351 .view_get_data(msg.entity_id.as_str(), config, &schema, &viewport)
352 .await?;
353
354 let rows = cols.render_to_rows(RowPathStyle::PerLevel);
355 let ndjson_string = rows
356 .iter()
357 .map(serde_json::to_string)
358 .collect::<Result<Vec<_>, _>>()
359 .map_err(|e| VirtualServerError::InvalidJSON(std::sync::Arc::new(e)))?
360 .join("\n");
361
362 respond!(msg, ViewToNdjsonStringResp { ndjson_string })
363 },
364 ViewToRowsStringReq(view_to_rows_string_req) => {
365 let viewport = view_to_rows_string_req.viewport.unwrap();
366 let schema = self.get_cached_view_schema(&msg.entity_id, false).await?;
367 let config = self.view_configs.get(&msg.entity_id).unwrap();
368 let mut cols = self
369 .handler
370 .view_get_data(msg.entity_id.as_str(), config, &schema, &viewport)
371 .await?;
372
373 let rows = cols.render_to_rows(RowPathStyle::Sidecar);
374 let json_string = serde_json::to_string(&rows)
375 .map_err(|e| VirtualServerError::InvalidJSON(std::sync::Arc::new(e)))?;
376
377 respond!(msg, ViewToRowsStringResp { json_string })
378 },
379 ViewToColumnsStringReq(view_to_columns_string_req) => {
380 let viewport = view_to_columns_string_req.viewport.unwrap();
381 let schema = self.get_cached_view_schema(&msg.entity_id, false).await?;
382 let config = self.view_configs.get(&msg.entity_id).unwrap();
383 let mut cols = self
384 .handler
385 .view_get_data(msg.entity_id.as_str(), config, &schema, &viewport)
386 .await?;
387
388 let json_string = cols
389 .render_to_columns_json(RowPathStyle::Sidecar)
390 .map_err(|e| VirtualServerError::Other(e.to_string()))?;
391
392 respond!(msg, ViewToColumnsStringResp { json_string })
393 },
394 ViewDeleteReq(_) => {
395 self.handler.view_delete(msg.entity_id.as_str()).await?;
396 self.view_to_table.shift_remove(&msg.entity_id);
397 self.view_configs.shift_remove(&msg.entity_id);
398 respond!(msg, ViewDeleteResp {})
399 },
400 MakeTableReq(req) => {
401 self.handler
402 .make_table(&msg.entity_id, req.data.as_ref().unwrap())
403 .await?;
404 respond!(msg, MakeTableResp {})
405 },
406 ViewGetMinMaxReq(req) => {
407 let config = self.view_configs.get(&msg.entity_id).unwrap();
408 let (min, max) = self
409 .handler
410 .view_get_min_max(&msg.entity_id, &req.column_name, config)
411 .await?;
412 respond!(msg, ViewGetMinMaxResp {
413 min: Some(min.into()),
414 max: Some(max.into()),
415 })
416 },
417
418 TableOnDeleteReq(_) => {
420 respond!(msg, TableOnDeleteResp {})
421 },
422 ViewOnUpdateReq(_) => {
423 respond!(msg, ViewOnUpdateResp {
424 delta: None,
425 port_id: 0
426 })
427 },
428 ViewOnDeleteReq(_) => {
429 respond!(msg, ViewOnDeleteResp {})
430 },
431 ViewRemoveOnUpdateReq(_) => {
432 respond!(msg, ViewRemoveOnUpdateResp {})
433 },
434 TableRemoveDeleteReq(_) => {
435 respond!(msg, TableRemoveDeleteResp {})
436 },
437 ViewRemoveDeleteReq(_) => {
438 respond!(msg, ViewRemoveDeleteResp {})
439 },
440 x => {
441 return Err(VirtualServerError::Other(format!(
443 "Unhandled request: {:?}",
444 x
445 )));
446 },
447 };
448
449 Ok(resp)
450 }
451}