perspective_client/virtual_server/
server.rs1use std::collections::HashMap;
14
15use indexmap::IndexMap;
16use prost::Message as ProstMessage;
17use prost::bytes::{Bytes, BytesMut};
18
19use super::error::VirtualServerError;
20use super::handler::VirtualServerHandler;
21use crate::config::{ViewConfig, ViewConfigUpdate};
22use crate::proto::response::ClientResp;
23use crate::proto::table_validate_expr_resp::ExprValidationError;
24use crate::proto::{
25 GetFeaturesResp, GetHostedTablesResp, MakeTableResp, Request, Response, TableMakePortResp,
26 TableMakeViewResp, TableOnDeleteResp, TableRemoveDeleteResp, TableSchemaResp, TableSizeResp,
27 TableValidateExprResp, ViewColumnPathsResp, ViewDeleteResp, ViewDimensionsResp,
28 ViewExpressionSchemaResp, ViewGetConfigResp, ViewOnDeleteResp, ViewOnUpdateResp,
29 ViewRemoveDeleteResp, ViewRemoveOnUpdateResp, ViewSchemaResp, ViewToColumnsStringResp,
30 ViewToRowsStringResp,
31};
32
33macro_rules! respond {
34 ($msg:ident, $name:ident { $($rest:tt)* }) => {{
35 let mut resp = BytesMut::new();
36 let resp2 = ClientResp::$name($name {
37 $($rest)*
38 });
39
40 Response {
41 msg_id: $msg.msg_id,
42 entity_id: $msg.entity_id,
43 client_resp: Some(resp2),
44 }.encode(&mut resp).map_err(VirtualServerError::EncodeError)?;
45
46 resp.freeze()
47 }};
48}
49
50pub struct VirtualServer<T: VirtualServerHandler> {
56 handler: T,
57 view_to_table: IndexMap<String, String>,
58 view_configs: IndexMap<String, ViewConfig>,
59}
60
61impl<T: VirtualServerHandler> VirtualServer<T> {
62 pub fn new(handler: T) -> Self {
64 Self {
65 handler,
66 view_configs: IndexMap::default(),
67 view_to_table: IndexMap::default(),
68 }
69 }
70
71 pub async fn handle_request(
76 &mut self,
77 bytes: Bytes,
78 ) -> Result<Bytes, VirtualServerError<T::Error>> {
79 use crate::proto::request::ClientReq::*;
80 let msg = Request::decode(bytes).map_err(VirtualServerError::DecodeError)?;
81 tracing::debug!(
82 "Handling request: entity_id={}, req={:?}",
83 msg.entity_id,
84 msg.client_req
85 );
86
87 let resp = match msg.client_req.unwrap() {
88 GetFeaturesReq(_) => {
89 let features = self.handler.get_features().await?;
90 respond!(msg, GetFeaturesResp { ..features.into() })
91 },
92 GetHostedTablesReq(_) => {
93 respond!(msg, GetHostedTablesResp {
94 table_infos: self.handler.get_hosted_tables().await?
95 })
96 },
97 TableSchemaReq(_) => {
98 respond!(msg, TableSchemaResp {
99 schema: self
100 .handler
101 .table_schema(msg.entity_id.as_str())
102 .await
103 .ok()
104 .map(|value| crate::proto::Schema {
105 schema: value
106 .iter()
107 .map(|x| crate::proto::schema::KeyTypePair {
108 name: x.0.to_string(),
109 r#type: *x.1 as i32,
110 })
111 .collect(),
112 })
113 })
114 },
115 TableMakePortReq(req) => {
116 respond!(msg, TableMakePortResp {
117 port_id: self.handler.table_make_port(&req).await?
118 })
119 },
120 TableMakeViewReq(req) => {
121 self.view_to_table
122 .insert(req.view_id.clone(), msg.entity_id.clone());
123
124 let mut config: ViewConfigUpdate = req.config.clone().unwrap_or_default().into();
125 let bytes = respond!(msg, TableMakeViewResp {
126 view_id: self
127 .handler
128 .table_make_view(msg.entity_id.as_str(), req.view_id.as_str(), &mut config)
129 .await?
130 });
131
132 self.view_configs.insert(req.view_id.clone(), config.into());
133 bytes
134 },
135 TableSizeReq(_) => {
136 respond!(msg, TableSizeResp {
137 size: self.handler.table_size(msg.entity_id.as_str()).await?
138 })
139 },
140 TableValidateExprReq(req) => {
141 let mut expression_schema = HashMap::<String, i32>::default();
142 let mut expression_alias = HashMap::<String, String>::default();
143 let mut errors = HashMap::<String, ExprValidationError>::default();
144 for (name, ex) in req.column_to_expr.iter() {
145 let _ = expression_alias.insert(name.clone(), ex.clone());
146 match self
147 .handler
148 .table_validate_expression(&msg.entity_id, ex.as_str())
149 .await
150 {
151 Ok(dtype) => {
152 let _ = expression_schema.insert(name.clone(), dtype as i32);
153 },
154 Err(e) => {
155 let _ = errors.insert(name.clone(), ExprValidationError {
156 error_message: format!("{}", e),
157 line: 0,
158 column: 0,
159 });
160 },
161 }
162 }
163
164 respond!(msg, TableValidateExprResp {
165 expression_schema,
166 errors,
167 expression_alias,
168 })
169 },
170 ViewSchemaReq(_) => {
171 respond!(msg, ViewSchemaResp {
172 schema: self
173 .handler
174 .view_schema(
175 msg.entity_id.as_str(),
176 self.view_configs.get(&msg.entity_id).unwrap()
177 )
178 .await?
179 .into_iter()
180 .map(|(x, y)| (x, y as i32))
181 .collect()
182 })
183 },
184 ViewDimensionsReq(_) => {
185 let view_id = &msg.entity_id;
186 let table_id = self
187 .view_to_table
188 .get(view_id)
189 .ok_or_else(|| VirtualServerError::UnknownViewId(view_id.to_string()))?;
190
191 let num_table_rows = self.handler.table_size(table_id).await?;
192 let num_table_columns = self.handler.table_column_size(table_id).await? as u32;
193 let config = self.view_configs.get(view_id).unwrap();
194 let num_view_columns = self.handler.view_column_size(view_id, config).await? as u32;
195 let num_view_rows = self.handler.view_size(view_id).await?;
196 let resp = ViewDimensionsResp {
197 num_table_columns,
198 num_table_rows,
199 num_view_columns,
200 num_view_rows,
201 };
202
203 respond!(msg, ViewDimensionsResp { ..resp })
204 },
205 ViewGetConfigReq(_) => {
206 respond!(msg, ViewGetConfigResp {
207 config: Some(
208 ViewConfigUpdate::from(
209 self.view_configs.get(&msg.entity_id).unwrap().clone()
210 )
211 .into()
212 )
213 })
214 },
215 ViewExpressionSchemaReq(_) => {
216 let mut schema = HashMap::<String, i32>::default();
217 let table_id = self.view_to_table.get(&msg.entity_id);
218 for (name, ex) in self
219 .view_configs
220 .get(&msg.entity_id)
221 .unwrap()
222 .expressions
223 .iter()
224 {
225 match self
226 .handler
227 .table_validate_expression(table_id.unwrap(), ex.as_str())
228 .await
229 {
230 Ok(dtype) => {
231 let _ = schema.insert(name.clone(), dtype as i32);
232 },
233 Err(_e) => {
234 },
236 }
237 }
238
239 let resp = ViewExpressionSchemaResp { schema };
240 respond!(msg, ViewExpressionSchemaResp { ..resp })
241 },
242 ViewColumnPathsReq(_) => {
243 respond!(msg, ViewColumnPathsResp {
244 paths: self
245 .handler
246 .view_schema(
247 msg.entity_id.as_str(),
248 self.view_configs.get(&msg.entity_id).unwrap()
249 )
250 .await?
251 .keys()
252 .cloned()
253 .collect()
254 })
255 },
256 ViewToRowsStringReq(view_to_rows_string_req) => {
257 let viewport = view_to_rows_string_req.viewport.unwrap();
258 let config = self.view_configs.get(&msg.entity_id).unwrap();
259 let cols = self
260 .handler
261 .view_get_data(msg.entity_id.as_str(), config, &viewport)
262 .await?;
263
264 let rows = cols.to_rows();
265 let json_string = serde_json::to_string(&rows)
266 .map_err(|e| VirtualServerError::InvalidJSON(std::sync::Arc::new(e)))?;
267
268 respond!(msg, ViewToRowsStringResp { json_string })
269 },
270 ViewToColumnsStringReq(view_to_columns_string_req) => {
271 let viewport = view_to_columns_string_req.viewport.unwrap();
272 let config = self.view_configs.get(&msg.entity_id).unwrap();
273 let cols = self
274 .handler
275 .view_get_data(msg.entity_id.as_str(), config, &viewport)
276 .await?;
277
278 let json_string = serde_json::to_string(&cols)
279 .map_err(|e| VirtualServerError::InvalidJSON(std::sync::Arc::new(e)))?;
280
281 respond!(msg, ViewToColumnsStringResp { json_string })
282 },
283 ViewDeleteReq(_) => {
284 self.handler.view_delete(msg.entity_id.as_str()).await?;
285 self.view_to_table.shift_remove(&msg.entity_id);
286 self.view_configs.shift_remove(&msg.entity_id);
287 respond!(msg, ViewDeleteResp {})
288 },
289 MakeTableReq(req) => {
290 self.handler
291 .make_table(&msg.entity_id, req.data.as_ref().unwrap())
292 .await?;
293 respond!(msg, MakeTableResp {})
294 },
295
296 TableOnDeleteReq(_) => {
298 respond!(msg, TableOnDeleteResp {})
299 },
300 ViewOnUpdateReq(_) => {
301 respond!(msg, ViewOnUpdateResp {
302 delta: None,
303 port_id: 0
304 })
305 },
306 ViewOnDeleteReq(_) => {
307 respond!(msg, ViewOnDeleteResp {})
308 },
309 ViewRemoveOnUpdateReq(_) => {
310 respond!(msg, ViewRemoveOnUpdateResp {})
311 },
312 TableRemoveDeleteReq(_) => {
313 respond!(msg, TableRemoveDeleteResp {})
314 },
315 ViewRemoveDeleteReq(_) => {
316 respond!(msg, ViewRemoveDeleteResp {})
317 },
318
319 x => {
320 return Err(VirtualServerError::Other(format!(
322 "Unhandled request: {:?}",
323 x
324 )));
325 },
326 };
327
328 Ok(resp)
329 }
330}