perspective_client/virtual_server/
server.rs1use std::collections::HashMap;
14
15use indexmap::IndexMap;
16use prost::Message as ProstMessage;
17use prost::bytes::{Bytes, BytesMut};
18
19use super::error::VirtualServerError;
20use super::handler::VirtualServerHandler;
21use crate::config::{ViewConfig, ViewConfigUpdate};
22use crate::proto::response::ClientResp;
23use crate::proto::table_validate_expr_resp::ExprValidationError;
24use crate::proto::{
25 ColumnType, GetFeaturesResp, GetHostedTablesResp, MakeTableResp, Request, Response,
26 ServerError, TableMakePortResp, TableMakeViewResp, TableOnDeleteResp, TableRemoveDeleteResp,
27 TableSchemaResp, TableSizeResp, TableValidateExprResp, ViewColumnPathsResp, ViewDeleteResp,
28 ViewDimensionsResp, ViewExpressionSchemaResp, ViewGetConfigResp, ViewOnDeleteResp,
29 ViewOnUpdateResp, ViewRemoveDeleteResp, ViewRemoveOnUpdateResp, ViewSchemaResp,
30 ViewToColumnsStringResp, ViewToRowsStringResp,
31};
32
33macro_rules! respond {
34 ($msg:ident, $name:ident { $($rest:tt)* }) => {{
35 let mut resp = BytesMut::new();
36 let resp2 = ClientResp::$name($name {
37 $($rest)*
38 });
39
40 Response {
41 msg_id: $msg.msg_id,
42 entity_id: $msg.entity_id,
43 client_resp: Some(resp2),
44 }.encode(&mut resp).map_err(VirtualServerError::EncodeError)?;
45
46 resp.freeze()
47 }};
48}
49
50pub struct VirtualServer<T: VirtualServerHandler> {
56 handler: T,
57 view_to_table: IndexMap<String, String>,
58 view_configs: IndexMap<String, ViewConfig>,
59 view_schemas: IndexMap<String, IndexMap<String, ColumnType>>,
60}
61
62impl<T: VirtualServerHandler> VirtualServer<T> {
63 pub fn new(handler: T) -> Self {
65 Self {
66 handler,
67 view_configs: IndexMap::default(),
68 view_to_table: IndexMap::default(),
69 view_schemas: IndexMap::default(),
70 }
71 }
72
73 pub async fn handle_request(
78 &mut self,
79 bytes: Bytes,
80 ) -> Result<Bytes, VirtualServerError<T::Error>> {
81 let msg = Request::decode(bytes).map_err(VirtualServerError::DecodeError)?;
82 tracing::debug!(
83 "Handling request: entity_id={}, req={:?}",
84 msg.entity_id,
85 msg.client_req
86 );
87
88 match self.internal_handle_request(msg.clone()).await {
89 Ok(resp) => Ok(resp),
90 Err(err) => {
91 tracing::error!("{}", err);
92 Ok(respond!(msg, ServerError {
93 message: err.to_string(),
94 status_code: 1
95 }))
96 },
97 }
98 }
99
100 async fn get_cached_view_schema(
101 &mut self,
102 entity_id: &str,
103 to_psp_format: bool,
104 ) -> Result<IndexMap<String, ColumnType>, VirtualServerError<T::Error>> {
105 if !self.view_schemas.contains_key(entity_id) {
106 self.view_schemas.insert(
107 entity_id.to_string(),
108 self.handler
109 .view_schema(entity_id, self.view_configs.get(entity_id).unwrap())
110 .await?,
111 );
112 }
113
114 if to_psp_format {
115 Ok(self
116 .view_schemas
117 .get(entity_id)
118 .unwrap()
119 .iter()
120 .map(|(k, v)| {
121 (
122 k.split("_").collect::<Vec<_>>().last().unwrap().to_string(),
123 *v,
124 )
125 })
126 .collect())
127 } else {
128 Ok(self.view_schemas.get(entity_id).cloned().unwrap())
129 }
130 }
131
132 async fn internal_handle_request(
133 &mut self,
134 msg: Request,
135 ) -> Result<Bytes, VirtualServerError<T::Error>> {
136 use crate::proto::request::ClientReq::*;
137 let resp = match msg.client_req.unwrap() {
138 GetFeaturesReq(_) => {
139 let features = self.handler.get_features().await?;
140 respond!(msg, GetFeaturesResp { ..features.into() })
141 },
142 GetHostedTablesReq(_) => {
143 respond!(msg, GetHostedTablesResp {
144 table_infos: self.handler.get_hosted_tables().await?
145 })
146 },
147 TableSchemaReq(_) => {
148 respond!(msg, TableSchemaResp {
149 schema: Some(crate::proto::Schema {
150 schema: self
151 .handler
152 .table_schema(msg.entity_id.as_str())
153 .await?
154 .iter()
155 .map(|x| crate::proto::schema::KeyTypePair {
156 name: x.0.to_string(),
157 r#type: *x.1 as i32,
158 })
159 .collect()
160 })
161 })
162 },
163 TableMakePortReq(req) => {
164 respond!(msg, TableMakePortResp {
165 port_id: self.handler.table_make_port(&req).await?
166 })
167 },
168 TableMakeViewReq(req) => {
169 self.view_to_table
170 .insert(req.view_id.clone(), msg.entity_id.clone());
171
172 let mut config: ViewConfigUpdate = req.config.clone().unwrap_or_default().into();
173 let bytes = respond!(msg, TableMakeViewResp {
174 view_id: self
175 .handler
176 .table_make_view(msg.entity_id.as_str(), req.view_id.as_str(), &mut config)
177 .await?
178 });
179
180 self.view_configs.insert(req.view_id.clone(), config.into());
181 bytes
182 },
183 TableSizeReq(_) => {
184 respond!(msg, TableSizeResp {
185 size: self.handler.table_size(msg.entity_id.as_str()).await?
186 })
187 },
188 TableValidateExprReq(req) => {
189 let mut expression_schema = HashMap::<String, i32>::default();
190 let mut expression_alias = HashMap::<String, String>::default();
191 let mut errors = HashMap::<String, ExprValidationError>::default();
192 for (name, ex) in req.column_to_expr.iter() {
193 let _ = expression_alias.insert(name.clone(), ex.clone());
194 match self
195 .handler
196 .table_validate_expression(&msg.entity_id, ex.as_str())
197 .await
198 {
199 Ok(dtype) => {
200 let _ = expression_schema.insert(name.clone(), dtype as i32);
201 },
202 Err(e) => {
203 let _ = errors.insert(name.clone(), ExprValidationError {
204 error_message: format!("{}", e),
205 line: 0,
206 column: 0,
207 });
208 },
209 }
210 }
211
212 respond!(msg, TableValidateExprResp {
213 expression_schema,
214 errors,
215 expression_alias,
216 })
217 },
218 ViewSchemaReq(_) => {
219 respond!(msg, ViewSchemaResp {
220 schema: self
221 .get_cached_view_schema(&msg.entity_id, true)
222 .await?
223 .into_iter()
224 .map(|(x, y)| (x.to_string(), y as i32))
225 .collect()
226 })
227 },
228 ViewDimensionsReq(_) => {
229 let view_id = &msg.entity_id;
230 let table_id = self
231 .view_to_table
232 .get(view_id)
233 .ok_or_else(|| VirtualServerError::UnknownViewId(view_id.to_string()))?;
234
235 let num_table_rows = self.handler.table_size(table_id).await?;
236 let num_table_columns = self.handler.table_column_size(table_id).await? as u32;
237 let config = self.view_configs.get(view_id).unwrap();
238 let num_view_columns = self.handler.view_column_size(view_id, config).await? as u32;
239 let num_view_rows = self.handler.view_size(view_id).await?;
240 let resp = ViewDimensionsResp {
241 num_table_columns,
242 num_table_rows,
243 num_view_columns,
244 num_view_rows,
245 };
246
247 respond!(msg, ViewDimensionsResp { ..resp })
248 },
249 ViewGetConfigReq(_) => {
250 respond!(msg, ViewGetConfigResp {
251 config: Some(
252 ViewConfigUpdate::from(
253 self.view_configs.get(&msg.entity_id).unwrap().clone()
254 )
255 .into()
256 )
257 })
258 },
259 ViewExpressionSchemaReq(_) => {
260 let mut schema = HashMap::<String, i32>::default();
261 let table_id = self.view_to_table.get(&msg.entity_id);
262 for (name, ex) in self
263 .view_configs
264 .get(&msg.entity_id)
265 .unwrap()
266 .expressions
267 .iter()
268 {
269 match self
270 .handler
271 .table_validate_expression(table_id.unwrap(), ex.as_str())
272 .await
273 {
274 Ok(dtype) => {
275 let _ = schema.insert(name.clone(), dtype as i32);
276 },
277 Err(_e) => {
278 },
280 }
281 }
282
283 let resp = ViewExpressionSchemaResp { schema };
284 respond!(msg, ViewExpressionSchemaResp { ..resp })
285 },
286 ViewColumnPathsReq(_) => {
287 respond!(msg, ViewColumnPathsResp {
288 paths: self
289 .handler
290 .view_schema(
291 msg.entity_id.as_str(),
292 self.view_configs.get(&msg.entity_id).unwrap()
293 )
294 .await?
295 .keys()
296 .cloned()
297 .collect()
298 })
299 },
300 ViewToRowsStringReq(view_to_rows_string_req) => {
301 let viewport = view_to_rows_string_req.viewport.unwrap();
302 let schema = self.get_cached_view_schema(&msg.entity_id, false).await?;
303 let config = self.view_configs.get(&msg.entity_id).unwrap();
304 let cols = self
305 .handler
306 .view_get_data(msg.entity_id.as_str(), config, &schema, &viewport)
307 .await?;
308
309 let rows = cols.to_rows();
310 let json_string = serde_json::to_string(&rows)
311 .map_err(|e| VirtualServerError::InvalidJSON(std::sync::Arc::new(e)))?;
312
313 respond!(msg, ViewToRowsStringResp { json_string })
314 },
315 ViewToColumnsStringReq(view_to_columns_string_req) => {
316 let viewport = view_to_columns_string_req.viewport.unwrap();
317 let schema = self.get_cached_view_schema(&msg.entity_id, false).await?;
318 let config = self.view_configs.get(&msg.entity_id).unwrap();
319 let cols = self
320 .handler
321 .view_get_data(msg.entity_id.as_str(), config, &schema, &viewport)
322 .await?;
323
324 let json_string = serde_json::to_string(&cols)
325 .map_err(|e| VirtualServerError::InvalidJSON(std::sync::Arc::new(e)))?;
326
327 respond!(msg, ViewToColumnsStringResp { json_string })
328 },
329 ViewDeleteReq(_) => {
330 self.handler.view_delete(msg.entity_id.as_str()).await?;
331 self.view_to_table.shift_remove(&msg.entity_id);
332 self.view_configs.shift_remove(&msg.entity_id);
333 respond!(msg, ViewDeleteResp {})
334 },
335 MakeTableReq(req) => {
336 self.handler
337 .make_table(&msg.entity_id, req.data.as_ref().unwrap())
338 .await?;
339 respond!(msg, MakeTableResp {})
340 },
341
342 TableOnDeleteReq(_) => {
344 respond!(msg, TableOnDeleteResp {})
345 },
346 ViewOnUpdateReq(_) => {
347 respond!(msg, ViewOnUpdateResp {
348 delta: None,
349 port_id: 0
350 })
351 },
352 ViewOnDeleteReq(_) => {
353 respond!(msg, ViewOnDeleteResp {})
354 },
355 ViewRemoveOnUpdateReq(_) => {
356 respond!(msg, ViewRemoveOnUpdateResp {})
357 },
358 TableRemoveDeleteReq(_) => {
359 respond!(msg, TableRemoveDeleteResp {})
360 },
361 ViewRemoveDeleteReq(_) => {
362 respond!(msg, ViewRemoveDeleteResp {})
363 },
364
365 x => {
366 return Err(VirtualServerError::Other(format!(
368 "Unhandled request: {:?}",
369 x
370 )));
371 },
372 };
373
374 Ok(resp)
375 }
376}