perspective_client/virtual_server/
server.rs1use std::collections::HashMap;
14
15use indexmap::IndexMap;
16use prost::Message as ProstMessage;
17use prost::bytes::{Bytes, BytesMut};
18
19use super::error::VirtualServerError;
20use super::handler::VirtualServerHandler;
21use crate::config::{ViewConfig, ViewConfigUpdate};
22use crate::proto::response::ClientResp;
23use crate::proto::table_validate_expr_resp::ExprValidationError;
24use crate::proto::{
25 GetFeaturesResp, GetHostedTablesResp, MakeTableResp, Request, Response, ServerError,
26 TableMakePortResp, TableMakeViewResp, TableOnDeleteResp, TableRemoveDeleteResp,
27 TableSchemaResp, TableSizeResp, TableValidateExprResp, ViewColumnPathsResp, ViewDeleteResp,
28 ViewDimensionsResp, ViewExpressionSchemaResp, ViewGetConfigResp, ViewOnDeleteResp,
29 ViewOnUpdateResp, ViewRemoveDeleteResp, ViewRemoveOnUpdateResp, ViewSchemaResp,
30 ViewToColumnsStringResp, ViewToRowsStringResp,
31};
32
/// Wraps a typed response payload in a `Response` envelope addressed to the
/// request `$msg`, encodes it, and evaluates to the frozen `Bytes`.
///
/// `$name` is used both as the `ClientResp` variant and as the payload struct
/// name; the braces hold that struct's field initializers. The envelope copies
/// `msg_id` and `entity_id` from `$msg`, which moves `$msg.entity_id`, so the
/// invocation must be the last use of `$msg`.
///
/// The expansion contains a `?`: an encoding failure makes the *enclosing*
/// function return `VirtualServerError::EncodeError`.
macro_rules! respond {
    ($msg:ident, $name:ident { $($rest:tt)* }) => {{
        // Build the payload first: its field initializers may still borrow
        // `$msg.entity_id`, which the envelope below takes ownership of.
        let payload = ClientResp::$name($name { $($rest)* });
        let envelope = Response {
            msg_id: $msg.msg_id,
            entity_id: $msg.entity_id,
            client_resp: Some(payload),
        };

        let mut buf = BytesMut::new();
        envelope
            .encode(&mut buf)
            .map_err(VirtualServerError::EncodeError)?;

        buf.freeze()
    }};
}
49
50pub struct VirtualServer<T: VirtualServerHandler> {
56 handler: T,
57 view_to_table: IndexMap<String, String>,
58 view_configs: IndexMap<String, ViewConfig>,
59}
60
61impl<T: VirtualServerHandler> VirtualServer<T> {
62 pub fn new(handler: T) -> Self {
64 Self {
65 handler,
66 view_configs: IndexMap::default(),
67 view_to_table: IndexMap::default(),
68 }
69 }
70
71 pub async fn handle_request(
76 &mut self,
77 bytes: Bytes,
78 ) -> Result<Bytes, VirtualServerError<T::Error>> {
79 let msg = Request::decode(bytes).map_err(VirtualServerError::DecodeError)?;
80 tracing::debug!(
81 "Handling request: entity_id={}, req={:?}",
82 msg.entity_id,
83 msg.client_req
84 );
85
86 match self.internal_handle_request(msg.clone()).await {
87 Ok(resp) => Ok(resp),
88 Err(err) => Ok(respond!(msg, ServerError {
89 message: err.to_string(),
90 status_code: 1
91 })),
92 }
93 }
94
95 async fn internal_handle_request(
96 &mut self,
97 msg: Request,
98 ) -> Result<Bytes, VirtualServerError<T::Error>> {
99 use crate::proto::request::ClientReq::*;
100 let resp = match msg.client_req.unwrap() {
101 GetFeaturesReq(_) => {
102 let features = self.handler.get_features().await?;
103 respond!(msg, GetFeaturesResp { ..features.into() })
104 },
105 GetHostedTablesReq(_) => {
106 respond!(msg, GetHostedTablesResp {
107 table_infos: self.handler.get_hosted_tables().await?
108 })
109 },
110 TableSchemaReq(_) => {
111 respond!(msg, TableSchemaResp {
112 schema: self
113 .handler
114 .table_schema(msg.entity_id.as_str())
115 .await
116 .ok()
117 .map(|value| crate::proto::Schema {
118 schema: value
119 .iter()
120 .map(|x| crate::proto::schema::KeyTypePair {
121 name: x.0.to_string(),
122 r#type: *x.1 as i32,
123 })
124 .collect(),
125 })
126 })
127 },
128 TableMakePortReq(req) => {
129 respond!(msg, TableMakePortResp {
130 port_id: self.handler.table_make_port(&req).await?
131 })
132 },
133 TableMakeViewReq(req) => {
134 self.view_to_table
135 .insert(req.view_id.clone(), msg.entity_id.clone());
136
137 let mut config: ViewConfigUpdate = req.config.clone().unwrap_or_default().into();
138 let bytes = respond!(msg, TableMakeViewResp {
139 view_id: self
140 .handler
141 .table_make_view(msg.entity_id.as_str(), req.view_id.as_str(), &mut config)
142 .await?
143 });
144
145 self.view_configs.insert(req.view_id.clone(), config.into());
146 bytes
147 },
148 TableSizeReq(_) => {
149 respond!(msg, TableSizeResp {
150 size: self.handler.table_size(msg.entity_id.as_str()).await?
151 })
152 },
153 TableValidateExprReq(req) => {
154 let mut expression_schema = HashMap::<String, i32>::default();
155 let mut expression_alias = HashMap::<String, String>::default();
156 let mut errors = HashMap::<String, ExprValidationError>::default();
157 for (name, ex) in req.column_to_expr.iter() {
158 let _ = expression_alias.insert(name.clone(), ex.clone());
159 match self
160 .handler
161 .table_validate_expression(&msg.entity_id, ex.as_str())
162 .await
163 {
164 Ok(dtype) => {
165 let _ = expression_schema.insert(name.clone(), dtype as i32);
166 },
167 Err(e) => {
168 let _ = errors.insert(name.clone(), ExprValidationError {
169 error_message: format!("{}", e),
170 line: 0,
171 column: 0,
172 });
173 },
174 }
175 }
176
177 respond!(msg, TableValidateExprResp {
178 expression_schema,
179 errors,
180 expression_alias,
181 })
182 },
183 ViewSchemaReq(_) => {
184 respond!(msg, ViewSchemaResp {
185 schema: self
186 .handler
187 .view_schema(
188 msg.entity_id.as_str(),
189 self.view_configs.get(&msg.entity_id).unwrap()
190 )
191 .await?
192 .into_iter()
193 .map(|(x, y)| (x, y as i32))
194 .collect()
195 })
196 },
197 ViewDimensionsReq(_) => {
198 let view_id = &msg.entity_id;
199 let table_id = self
200 .view_to_table
201 .get(view_id)
202 .ok_or_else(|| VirtualServerError::UnknownViewId(view_id.to_string()))?;
203
204 let num_table_rows = self.handler.table_size(table_id).await?;
205 let num_table_columns = self.handler.table_column_size(table_id).await? as u32;
206 let config = self.view_configs.get(view_id).unwrap();
207 let num_view_columns = self.handler.view_column_size(view_id, config).await? as u32;
208 let num_view_rows = self.handler.view_size(view_id).await?;
209 let resp = ViewDimensionsResp {
210 num_table_columns,
211 num_table_rows,
212 num_view_columns,
213 num_view_rows,
214 };
215
216 respond!(msg, ViewDimensionsResp { ..resp })
217 },
218 ViewGetConfigReq(_) => {
219 respond!(msg, ViewGetConfigResp {
220 config: Some(
221 ViewConfigUpdate::from(
222 self.view_configs.get(&msg.entity_id).unwrap().clone()
223 )
224 .into()
225 )
226 })
227 },
228 ViewExpressionSchemaReq(_) => {
229 let mut schema = HashMap::<String, i32>::default();
230 let table_id = self.view_to_table.get(&msg.entity_id);
231 for (name, ex) in self
232 .view_configs
233 .get(&msg.entity_id)
234 .unwrap()
235 .expressions
236 .iter()
237 {
238 match self
239 .handler
240 .table_validate_expression(table_id.unwrap(), ex.as_str())
241 .await
242 {
243 Ok(dtype) => {
244 let _ = schema.insert(name.clone(), dtype as i32);
245 },
246 Err(_e) => {
247 },
249 }
250 }
251
252 let resp = ViewExpressionSchemaResp { schema };
253 respond!(msg, ViewExpressionSchemaResp { ..resp })
254 },
255 ViewColumnPathsReq(_) => {
256 respond!(msg, ViewColumnPathsResp {
257 paths: self
258 .handler
259 .view_schema(
260 msg.entity_id.as_str(),
261 self.view_configs.get(&msg.entity_id).unwrap()
262 )
263 .await?
264 .keys()
265 .cloned()
266 .collect()
267 })
268 },
269 ViewToRowsStringReq(view_to_rows_string_req) => {
270 let viewport = view_to_rows_string_req.viewport.unwrap();
271 let config = self.view_configs.get(&msg.entity_id).unwrap();
272 let cols = self
273 .handler
274 .view_get_data(msg.entity_id.as_str(), config, &viewport)
275 .await?;
276
277 let rows = cols.to_rows();
278 let json_string = serde_json::to_string(&rows)
279 .map_err(|e| VirtualServerError::InvalidJSON(std::sync::Arc::new(e)))?;
280
281 respond!(msg, ViewToRowsStringResp { json_string })
282 },
283 ViewToColumnsStringReq(view_to_columns_string_req) => {
284 let viewport = view_to_columns_string_req.viewport.unwrap();
285 let config = self.view_configs.get(&msg.entity_id).unwrap();
286 let cols = self
287 .handler
288 .view_get_data(msg.entity_id.as_str(), config, &viewport)
289 .await?;
290
291 let json_string = serde_json::to_string(&cols)
292 .map_err(|e| VirtualServerError::InvalidJSON(std::sync::Arc::new(e)))?;
293
294 respond!(msg, ViewToColumnsStringResp { json_string })
295 },
296 ViewDeleteReq(_) => {
297 self.handler.view_delete(msg.entity_id.as_str()).await?;
298 self.view_to_table.shift_remove(&msg.entity_id);
299 self.view_configs.shift_remove(&msg.entity_id);
300 respond!(msg, ViewDeleteResp {})
301 },
302 MakeTableReq(req) => {
303 self.handler
304 .make_table(&msg.entity_id, req.data.as_ref().unwrap())
305 .await?;
306 respond!(msg, MakeTableResp {})
307 },
308
309 TableOnDeleteReq(_) => {
311 respond!(msg, TableOnDeleteResp {})
312 },
313 ViewOnUpdateReq(_) => {
314 respond!(msg, ViewOnUpdateResp {
315 delta: None,
316 port_id: 0
317 })
318 },
319 ViewOnDeleteReq(_) => {
320 respond!(msg, ViewOnDeleteResp {})
321 },
322 ViewRemoveOnUpdateReq(_) => {
323 respond!(msg, ViewRemoveOnUpdateResp {})
324 },
325 TableRemoveDeleteReq(_) => {
326 respond!(msg, TableRemoveDeleteResp {})
327 },
328 ViewRemoveDeleteReq(_) => {
329 respond!(msg, ViewRemoveDeleteResp {})
330 },
331
332 x => {
333 return Err(VirtualServerError::Other(format!(
335 "Unhandled request: {:?}",
336 x
337 )));
338 },
339 };
340
341 Ok(resp)
342 }
343}