1use std::collections::HashMap;
14
15use indexmap::IndexMap;
16use prost::Message as ProstMessage;
17use prost::bytes::{Bytes, BytesMut};
18
19use super::error::VirtualServerError;
20use super::handler::VirtualServerHandler;
21use crate::config::{ViewConfig, ViewConfigUpdate};
22use crate::proto::response::ClientResp;
23use crate::proto::table_validate_expr_resp::ExprValidationError;
24use crate::proto::{
25 ColumnType, GetFeaturesResp, GetHostedTablesResp, MakeTableResp, Request, Response,
26 ServerError, TableMakePortResp, TableMakeViewResp, TableOnDeleteResp, TableRemoveDeleteResp,
27 TableSchemaResp, TableSizeResp, TableValidateExprResp, ViewColumnPathsResp, ViewDeleteResp,
28 ViewDimensionsResp, ViewExpressionSchemaResp, ViewGetConfigResp, ViewGetMinMaxResp,
29 ViewOnDeleteResp, ViewOnUpdateResp, ViewRemoveDeleteResp, ViewRemoveOnUpdateResp,
30 ViewSchemaResp, ViewToArrowResp, ViewToColumnsStringResp, ViewToCsvResp,
31 ViewToNdjsonStringResp, ViewToRowsStringResp,
32};
33
/// Builds the `ClientResp::$name` payload from the given struct-literal body,
/// wraps it in a `Response` carrying `$msg`'s `msg_id` and `entity_id`, and
/// evaluates to the encoded message as frozen `Bytes`.
///
/// The expansion uses `?` on the encode step, so it can only be used inside a
/// function whose error type is `VirtualServerError`. The payload expression
/// is evaluated before `$msg.entity_id` is moved into the response, so the
/// struct body may still borrow from `$msg`.
macro_rules! respond {
    ($msg:ident, $name:ident { $($rest:tt)* }) => {{
        let payload = ClientResp::$name($name {
            $($rest)*
        });

        let envelope = Response {
            msg_id: $msg.msg_id,
            entity_id: $msg.entity_id,
            client_resp: Some(payload),
        };

        let mut encoded = BytesMut::new();
        envelope
            .encode(&mut encoded)
            .map_err(VirtualServerError::EncodeError)?;

        encoded.freeze()
    }};
}
50
51pub struct VirtualServer<T: VirtualServerHandler> {
57 handler: T,
58 view_to_table: IndexMap<String, String>,
59 view_configs: IndexMap<String, ViewConfig>,
60 view_schemas: IndexMap<String, IndexMap<String, ColumnType>>,
61}
62
63impl<T: VirtualServerHandler> VirtualServer<T> {
64 pub fn new(handler: T) -> Self {
66 Self {
67 handler,
68 view_configs: IndexMap::default(),
69 view_to_table: IndexMap::default(),
70 view_schemas: IndexMap::default(),
71 }
72 }
73
74 pub async fn handle_request(
79 &mut self,
80 bytes: Bytes,
81 ) -> Result<Bytes, VirtualServerError<T::Error>> {
82 let msg = Request::decode(bytes).map_err(VirtualServerError::DecodeError)?;
83 tracing::debug!(
84 "Handling request: entity_id={}, req={:?}",
85 msg.entity_id,
86 msg.client_req
87 );
88
89 match self.internal_handle_request(msg.clone()).await {
90 Ok(resp) => Ok(resp),
91 Err(err) => {
92 tracing::error!("{}", err);
93 Ok(respond!(msg, ServerError {
94 message: err.to_string(),
95 status_code: 0
96 }))
97 },
98 }
99 }
100
101 async fn get_cached_view_schema(
102 &mut self,
103 entity_id: &str,
104 to_psp_format: bool,
105 ) -> Result<IndexMap<String, ColumnType>, VirtualServerError<T::Error>> {
106 if !self.view_schemas.contains_key(entity_id) {
107 self.view_schemas.insert(
108 entity_id.to_string(),
109 self.handler
110 .view_schema(entity_id, self.view_configs.get(entity_id).unwrap())
111 .await?,
112 );
113 }
114
115 if to_psp_format {
116 Ok(self
117 .view_schemas
118 .get(entity_id)
119 .unwrap()
120 .iter()
121 .map(|(k, v)| {
122 (
123 k.split("_").collect::<Vec<_>>().last().unwrap().to_string(),
124 *v,
125 )
126 })
127 .collect())
128 } else {
129 Ok(self.view_schemas.get(entity_id).cloned().unwrap())
130 }
131 }
132
133 async fn internal_handle_request(
134 &mut self,
135 msg: Request,
136 ) -> Result<Bytes, VirtualServerError<T::Error>> {
137 use crate::proto::request::ClientReq::*;
138 let resp = match msg.client_req.unwrap() {
139 GetFeaturesReq(_) => {
140 let features = self.handler.get_features().await?;
141 respond!(msg, GetFeaturesResp { ..features.into() })
142 },
143 GetHostedTablesReq(_) => {
144 respond!(msg, GetHostedTablesResp {
145 table_infos: self.handler.get_hosted_tables().await?
146 })
147 },
148 TableSchemaReq(_) => {
149 respond!(msg, TableSchemaResp {
150 schema: Some(crate::proto::Schema {
151 schema: self
152 .handler
153 .table_schema(msg.entity_id.as_str())
154 .await?
155 .iter()
156 .map(|x| crate::proto::schema::KeyTypePair {
157 name: x.0.to_string(),
158 r#type: *x.1 as i32,
159 })
160 .collect()
161 })
162 })
163 },
164 TableMakePortReq(req) => {
165 respond!(msg, TableMakePortResp {
166 port_id: self.handler.table_make_port(&req).await?
167 })
168 },
169 TableMakeViewReq(req) => {
170 self.view_to_table
171 .insert(req.view_id.clone(), msg.entity_id.clone());
172
173 let mut config: ViewConfigUpdate = req.config.clone().unwrap_or_default().into();
174 let bytes = respond!(msg, TableMakeViewResp {
175 view_id: self
176 .handler
177 .table_make_view(msg.entity_id.as_str(), req.view_id.as_str(), &mut config)
178 .await?
179 });
180
181 self.view_configs.insert(req.view_id.clone(), config.into());
182 bytes
183 },
184 TableSizeReq(_) => {
185 respond!(msg, TableSizeResp {
186 size: self.handler.table_size(msg.entity_id.as_str()).await?
187 })
188 },
189 TableValidateExprReq(req) => {
190 let mut expression_schema = HashMap::<String, i32>::default();
191 let mut expression_alias = HashMap::<String, String>::default();
192 let mut errors = HashMap::<String, ExprValidationError>::default();
193 for (name, ex) in req.column_to_expr.iter() {
194 let _ = expression_alias.insert(name.clone(), ex.clone());
195 match self
196 .handler
197 .table_validate_expression(&msg.entity_id, ex.as_str())
198 .await
199 {
200 Ok(dtype) => {
201 let _ = expression_schema.insert(name.clone(), dtype as i32);
202 },
203 Err(e) => {
204 let _ = errors.insert(name.clone(), ExprValidationError {
205 error_message: format!("{}", e),
206 line: 0,
207 column: 0,
208 });
209 },
210 }
211 }
212
213 respond!(msg, TableValidateExprResp {
214 expression_schema,
215 errors,
216 expression_alias,
217 })
218 },
219 ViewSchemaReq(_) => {
220 respond!(msg, ViewSchemaResp {
221 schema: self
222 .get_cached_view_schema(&msg.entity_id, true)
223 .await?
224 .into_iter()
225 .map(|(x, y)| (x.to_string(), y as i32))
226 .collect()
227 })
228 },
229 ViewDimensionsReq(_) => {
230 let view_id = &msg.entity_id;
231 let table_id = self
232 .view_to_table
233 .get(view_id)
234 .ok_or_else(|| VirtualServerError::UnknownViewId(view_id.to_string()))?;
235
236 let num_table_rows = self.handler.table_size(table_id).await?;
237 let num_table_columns = self.handler.table_column_size(table_id).await? as u32;
238 let config = self.view_configs.get(view_id).unwrap();
239 let num_view_columns = self.handler.view_column_size(view_id, config).await? as u32;
240 let num_view_rows = self.handler.view_size(view_id).await?;
241 let resp = ViewDimensionsResp {
242 num_table_columns,
243 num_table_rows,
244 num_view_columns,
245 num_view_rows,
246 };
247
248 respond!(msg, ViewDimensionsResp { ..resp })
249 },
250 ViewGetConfigReq(_) => {
251 respond!(msg, ViewGetConfigResp {
252 config: Some(
253 ViewConfigUpdate::from(
254 self.view_configs.get(&msg.entity_id).unwrap().clone()
255 )
256 .into()
257 )
258 })
259 },
260 ViewExpressionSchemaReq(_) => {
261 let mut schema = HashMap::<String, i32>::default();
262 let table_id = self.view_to_table.get(&msg.entity_id);
263 for (name, ex) in self
264 .view_configs
265 .get(&msg.entity_id)
266 .unwrap()
267 .expressions
268 .iter()
269 {
270 match self
271 .handler
272 .table_validate_expression(table_id.unwrap(), ex.as_str())
273 .await
274 {
275 Ok(dtype) => {
276 let _ = schema.insert(name.clone(), dtype as i32);
277 },
278 Err(_e) => {
279 },
281 }
282 }
283
284 let resp = ViewExpressionSchemaResp { schema };
285 respond!(msg, ViewExpressionSchemaResp { ..resp })
286 },
287 ViewColumnPathsReq(_) => {
288 respond!(msg, ViewColumnPathsResp {
289 paths: self
290 .handler
291 .view_schema(
292 msg.entity_id.as_str(),
293 self.view_configs.get(&msg.entity_id).unwrap()
294 )
295 .await?
296 .keys()
297 .cloned()
298 .collect()
299 })
300 },
301 ViewToArrowReq(view_to_arrow_req) => {
302 let viewport = view_to_arrow_req.viewport.unwrap();
303 let schema = self.get_cached_view_schema(&msg.entity_id, false).await?;
304 let config = self.view_configs.get(&msg.entity_id).unwrap();
305 let mut cols = self
306 .handler
307 .view_get_data(msg.entity_id.as_str(), config, &schema, &viewport)
308 .await?;
309
310 let arrow = cols
311 .render_to_arrow_ipc()
312 .map_err(|e| VirtualServerError::Other(e.to_string()))?;
313
314 respond!(msg, ViewToArrowResp { arrow })
315 },
316 ViewToCsvReq(view_to_csv_req) => {
317 let viewport = view_to_csv_req.viewport.unwrap();
318 let schema = self.get_cached_view_schema(&msg.entity_id, false).await?;
319 let config = self.view_configs.get(&msg.entity_id).unwrap();
320 let mut cols = self
321 .handler
322 .view_get_data(msg.entity_id.as_str(), config, &schema, &viewport)
323 .await?;
324
325 let rows = cols.render_to_rows();
326 let mut csv = String::new();
327 if let Some(first_row) = rows.first() {
328 let headers: Vec<&str> = first_row.keys().map(|k| k.as_str()).collect();
329 csv.push_str(&headers.join(","));
330 csv.push('\n');
331 }
332
333 for row in &rows {
334 let values: Vec<String> = row
335 .values()
336 .map(|cell| serde_json::to_string(cell).unwrap_or_default())
337 .collect();
338 csv.push_str(&values.join(","));
339 csv.push('\n');
340 }
341
342 respond!(msg, ViewToCsvResp { csv })
343 },
344 ViewToNdjsonStringReq(view_to_ndjson_req) => {
345 let viewport = view_to_ndjson_req.viewport.unwrap();
346 let schema = self.get_cached_view_schema(&msg.entity_id, false).await?;
347 let config = self.view_configs.get(&msg.entity_id).unwrap();
348 let mut cols = self
349 .handler
350 .view_get_data(msg.entity_id.as_str(), config, &schema, &viewport)
351 .await?;
352
353 let rows = cols.render_to_rows();
354 let ndjson_string = rows
355 .iter()
356 .map(serde_json::to_string)
357 .collect::<Result<Vec<_>, _>>()
358 .map_err(|e| VirtualServerError::InvalidJSON(std::sync::Arc::new(e)))?
359 .join("\n");
360
361 respond!(msg, ViewToNdjsonStringResp { ndjson_string })
362 },
363 ViewToRowsStringReq(view_to_rows_string_req) => {
364 let viewport = view_to_rows_string_req.viewport.unwrap();
365 let schema = self.get_cached_view_schema(&msg.entity_id, false).await?;
366 let config = self.view_configs.get(&msg.entity_id).unwrap();
367 let mut cols = self
368 .handler
369 .view_get_data(msg.entity_id.as_str(), config, &schema, &viewport)
370 .await?;
371
372 let rows = cols.render_to_rows();
373 let json_string = serde_json::to_string(&rows)
374 .map_err(|e| VirtualServerError::InvalidJSON(std::sync::Arc::new(e)))?;
375
376 respond!(msg, ViewToRowsStringResp { json_string })
377 },
378 ViewToColumnsStringReq(view_to_columns_string_req) => {
379 let viewport = view_to_columns_string_req.viewport.unwrap();
380 let schema = self.get_cached_view_schema(&msg.entity_id, false).await?;
381 let config = self.view_configs.get(&msg.entity_id).unwrap();
382 let mut cols = self
383 .handler
384 .view_get_data(msg.entity_id.as_str(), config, &schema, &viewport)
385 .await?;
386
387 let json_string = cols
388 .render_to_columns_json()
389 .map_err(|e| VirtualServerError::Other(e.to_string()))?;
390
391 respond!(msg, ViewToColumnsStringResp { json_string })
392 },
393 ViewDeleteReq(_) => {
394 self.handler.view_delete(msg.entity_id.as_str()).await?;
395 self.view_to_table.shift_remove(&msg.entity_id);
396 self.view_configs.shift_remove(&msg.entity_id);
397 respond!(msg, ViewDeleteResp {})
398 },
399 MakeTableReq(req) => {
400 self.handler
401 .make_table(&msg.entity_id, req.data.as_ref().unwrap())
402 .await?;
403 respond!(msg, MakeTableResp {})
404 },
405 ViewGetMinMaxReq(req) => {
406 let config = self.view_configs.get(&msg.entity_id).unwrap();
407 let (min, max) = self
408 .handler
409 .view_get_min_max(&msg.entity_id, &req.column_name, config)
410 .await?;
411 respond!(msg, ViewGetMinMaxResp {
412 min: Some(min.into()),
413 max: Some(max.into()),
414 })
415 },
416
417 TableOnDeleteReq(_) => {
419 respond!(msg, TableOnDeleteResp {})
420 },
421 ViewOnUpdateReq(_) => {
422 respond!(msg, ViewOnUpdateResp {
423 delta: None,
424 port_id: 0
425 })
426 },
427 ViewOnDeleteReq(_) => {
428 respond!(msg, ViewOnDeleteResp {})
429 },
430 ViewRemoveOnUpdateReq(_) => {
431 respond!(msg, ViewRemoveOnUpdateResp {})
432 },
433 TableRemoveDeleteReq(_) => {
434 respond!(msg, TableRemoveDeleteResp {})
435 },
436 ViewRemoveDeleteReq(_) => {
437 respond!(msg, ViewRemoveDeleteResp {})
438 },
439 x => {
440 return Err(VirtualServerError::Other(format!(
442 "Unhandled request: {:?}",
443 x
444 )));
445 },
446 };
447
448 Ok(resp)
449 }
450}