Skip to main content

perspective_client/virtual_server/
server.rs

1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████       █      █      █      █      █ █▄  ▀███ █       ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█  ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄  ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄   █ ▄▄▄▄▄ ┃
5// ┃ █      ██████ █  ▀█▄       █ ██████      █      ███▌▐███ ███████▄ █       ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors.                              ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::collections::HashMap;
14
15use indexmap::IndexMap;
16use prost::Message as ProstMessage;
17use prost::bytes::{Bytes, BytesMut};
18
19use super::error::VirtualServerError;
20use super::handler::VirtualServerHandler;
21use crate::config::{ViewConfig, ViewConfigUpdate};
22use crate::proto::response::ClientResp;
23use crate::proto::table_validate_expr_resp::ExprValidationError;
24use crate::proto::{
25    GetFeaturesResp, GetHostedTablesResp, MakeTableResp, Request, Response, TableMakePortResp,
26    TableMakeViewResp, TableOnDeleteResp, TableRemoveDeleteResp, TableSchemaResp, TableSizeResp,
27    TableValidateExprResp, ViewColumnPathsResp, ViewDeleteResp, ViewDimensionsResp,
28    ViewExpressionSchemaResp, ViewGetConfigResp, ViewOnDeleteResp, ViewOnUpdateResp,
29    ViewRemoveDeleteResp, ViewRemoveOnUpdateResp, ViewSchemaResp, ViewToColumnsStringResp,
30    ViewToRowsStringResp,
31};
32
// Wraps a typed response payload in a `Response` envelope that echoes the
// request's `msg_id` and `entity_id`, protobuf-encodes it, and evaluates to
// the frozen bytes.
//
// `$name` is used both as the payload struct name and as the `ClientResp`
// variant name; the tokens inside the braces are pasted verbatim as the
// struct's field initialisers. Because the expansion contains `?`, this can
// only be used inside a function returning a compatible `Result`.
macro_rules! respond {
    ($msg:ident, $name:ident { $($rest:tt)* }) => {{
        let payload = ClientResp::$name($name { $($rest)* });
        let envelope = Response {
            msg_id: $msg.msg_id,
            entity_id: $msg.entity_id,
            client_resp: Some(payload),
        };

        let mut buffer = BytesMut::new();
        envelope
            .encode(&mut buffer)
            .map_err(VirtualServerError::EncodeError)?;

        buffer.freeze()
    }};
}
49
50/// A virtual server that processes Perspective protocol messages.
51///
52/// `VirtualServer` acts as a bridge between the Perspective protocol and a
53/// custom data backend. It handles protocol decoding/encoding and delegates
54/// actual data operations to the provided [`VirtualServerHandler`].
55pub struct VirtualServer<T: VirtualServerHandler> {
56    handler: T,
57    view_to_table: IndexMap<String, String>,
58    view_configs: IndexMap<String, ViewConfig>,
59}
60
61impl<T: VirtualServerHandler> VirtualServer<T> {
62    /// Creates a new virtual server with the given handler.
63    pub fn new(handler: T) -> Self {
64        Self {
65            handler,
66            view_configs: IndexMap::default(),
67            view_to_table: IndexMap::default(),
68        }
69    }
70
71    /// Processes a Perspective protocol request and returns the response.
72    ///
73    /// Decodes the incoming protobuf message, dispatches to the appropriate
74    /// handler method, and encodes the response.
75    pub async fn handle_request(
76        &mut self,
77        bytes: Bytes,
78    ) -> Result<Bytes, VirtualServerError<T::Error>> {
79        use crate::proto::request::ClientReq::*;
80        let msg = Request::decode(bytes).map_err(VirtualServerError::DecodeError)?;
81        tracing::debug!(
82            "Handling request: entity_id={}, req={:?}",
83            msg.entity_id,
84            msg.client_req
85        );
86
87        let resp = match msg.client_req.unwrap() {
88            GetFeaturesReq(_) => {
89                let features = self.handler.get_features().await?;
90                respond!(msg, GetFeaturesResp { ..features.into() })
91            },
92            GetHostedTablesReq(_) => {
93                respond!(msg, GetHostedTablesResp {
94                    table_infos: self.handler.get_hosted_tables().await?
95                })
96            },
97            TableSchemaReq(_) => {
98                respond!(msg, TableSchemaResp {
99                    schema: self
100                        .handler
101                        .table_schema(msg.entity_id.as_str())
102                        .await
103                        .ok()
104                        .map(|value| crate::proto::Schema {
105                            schema: value
106                                .iter()
107                                .map(|x| crate::proto::schema::KeyTypePair {
108                                    name: x.0.to_string(),
109                                    r#type: *x.1 as i32,
110                                })
111                                .collect(),
112                        })
113                })
114            },
115            TableMakePortReq(req) => {
116                respond!(msg, TableMakePortResp {
117                    port_id: self.handler.table_make_port(&req).await?
118                })
119            },
120            TableMakeViewReq(req) => {
121                self.view_to_table
122                    .insert(req.view_id.clone(), msg.entity_id.clone());
123
124                let mut config: ViewConfigUpdate = req.config.clone().unwrap_or_default().into();
125                let bytes = respond!(msg, TableMakeViewResp {
126                    view_id: self
127                        .handler
128                        .table_make_view(msg.entity_id.as_str(), req.view_id.as_str(), &mut config)
129                        .await?
130                });
131
132                self.view_configs.insert(req.view_id.clone(), config.into());
133                bytes
134            },
135            TableSizeReq(_) => {
136                respond!(msg, TableSizeResp {
137                    size: self.handler.table_size(msg.entity_id.as_str()).await?
138                })
139            },
140            TableValidateExprReq(req) => {
141                let mut expression_schema = HashMap::<String, i32>::default();
142                let mut expression_alias = HashMap::<String, String>::default();
143                let mut errors = HashMap::<String, ExprValidationError>::default();
144                for (name, ex) in req.column_to_expr.iter() {
145                    let _ = expression_alias.insert(name.clone(), ex.clone());
146                    match self
147                        .handler
148                        .table_validate_expression(&msg.entity_id, ex.as_str())
149                        .await
150                    {
151                        Ok(dtype) => {
152                            let _ = expression_schema.insert(name.clone(), dtype as i32);
153                        },
154                        Err(e) => {
155                            let _ = errors.insert(name.clone(), ExprValidationError {
156                                error_message: format!("{}", e),
157                                line: 0,
158                                column: 0,
159                            });
160                        },
161                    }
162                }
163
164                respond!(msg, TableValidateExprResp {
165                    expression_schema,
166                    errors,
167                    expression_alias,
168                })
169            },
170            ViewSchemaReq(_) => {
171                respond!(msg, ViewSchemaResp {
172                    schema: self
173                        .handler
174                        .view_schema(
175                            msg.entity_id.as_str(),
176                            self.view_configs.get(&msg.entity_id).unwrap()
177                        )
178                        .await?
179                        .into_iter()
180                        .map(|(x, y)| (x, y as i32))
181                        .collect()
182                })
183            },
184            ViewDimensionsReq(_) => {
185                let view_id = &msg.entity_id;
186                let table_id = self
187                    .view_to_table
188                    .get(view_id)
189                    .ok_or_else(|| VirtualServerError::UnknownViewId(view_id.to_string()))?;
190
191                let num_table_rows = self.handler.table_size(table_id).await?;
192                let num_table_columns = self.handler.table_column_size(table_id).await? as u32;
193                let config = self.view_configs.get(view_id).unwrap();
194                let num_view_columns = self.handler.view_column_size(view_id, config).await? as u32;
195                let num_view_rows = self.handler.view_size(view_id).await?;
196                let resp = ViewDimensionsResp {
197                    num_table_columns,
198                    num_table_rows,
199                    num_view_columns,
200                    num_view_rows,
201                };
202
203                respond!(msg, ViewDimensionsResp { ..resp })
204            },
205            ViewGetConfigReq(_) => {
206                respond!(msg, ViewGetConfigResp {
207                    config: Some(
208                        ViewConfigUpdate::from(
209                            self.view_configs.get(&msg.entity_id).unwrap().clone()
210                        )
211                        .into()
212                    )
213                })
214            },
215            ViewExpressionSchemaReq(_) => {
216                let mut schema = HashMap::<String, i32>::default();
217                let table_id = self.view_to_table.get(&msg.entity_id);
218                for (name, ex) in self
219                    .view_configs
220                    .get(&msg.entity_id)
221                    .unwrap()
222                    .expressions
223                    .iter()
224                {
225                    match self
226                        .handler
227                        .table_validate_expression(table_id.unwrap(), ex.as_str())
228                        .await
229                    {
230                        Ok(dtype) => {
231                            let _ = schema.insert(name.clone(), dtype as i32);
232                        },
233                        Err(_e) => {
234                            // TODO: handle error
235                        },
236                    }
237                }
238
239                let resp = ViewExpressionSchemaResp { schema };
240                respond!(msg, ViewExpressionSchemaResp { ..resp })
241            },
242            ViewColumnPathsReq(_) => {
243                respond!(msg, ViewColumnPathsResp {
244                    paths: self
245                        .handler
246                        .view_schema(
247                            msg.entity_id.as_str(),
248                            self.view_configs.get(&msg.entity_id).unwrap()
249                        )
250                        .await?
251                        .keys()
252                        .cloned()
253                        .collect()
254                })
255            },
256            ViewToRowsStringReq(view_to_rows_string_req) => {
257                let viewport = view_to_rows_string_req.viewport.unwrap();
258                let config = self.view_configs.get(&msg.entity_id).unwrap();
259                let cols = self
260                    .handler
261                    .view_get_data(msg.entity_id.as_str(), config, &viewport)
262                    .await?;
263
264                let rows = cols.to_rows();
265                let json_string = serde_json::to_string(&rows)
266                    .map_err(|e| VirtualServerError::InvalidJSON(std::sync::Arc::new(e)))?;
267
268                respond!(msg, ViewToRowsStringResp { json_string })
269            },
270            ViewToColumnsStringReq(view_to_columns_string_req) => {
271                let viewport = view_to_columns_string_req.viewport.unwrap();
272                let config = self.view_configs.get(&msg.entity_id).unwrap();
273                let cols = self
274                    .handler
275                    .view_get_data(msg.entity_id.as_str(), config, &viewport)
276                    .await?;
277
278                let json_string = serde_json::to_string(&cols)
279                    .map_err(|e| VirtualServerError::InvalidJSON(std::sync::Arc::new(e)))?;
280
281                respond!(msg, ViewToColumnsStringResp { json_string })
282            },
283            ViewDeleteReq(_) => {
284                self.handler.view_delete(msg.entity_id.as_str()).await?;
285                self.view_to_table.shift_remove(&msg.entity_id);
286                self.view_configs.shift_remove(&msg.entity_id);
287                respond!(msg, ViewDeleteResp {})
288            },
289            MakeTableReq(req) => {
290                self.handler
291                    .make_table(&msg.entity_id, req.data.as_ref().unwrap())
292                    .await?;
293                respond!(msg, MakeTableResp {})
294            },
295
296            // Stub implementations for callback/update requests that VirtualServer doesn't support
297            TableOnDeleteReq(_) => {
298                respond!(msg, TableOnDeleteResp {})
299            },
300            ViewOnUpdateReq(_) => {
301                respond!(msg, ViewOnUpdateResp {
302                    delta: None,
303                    port_id: 0
304                })
305            },
306            ViewOnDeleteReq(_) => {
307                respond!(msg, ViewOnDeleteResp {})
308            },
309            ViewRemoveOnUpdateReq(_) => {
310                respond!(msg, ViewRemoveOnUpdateResp {})
311            },
312            TableRemoveDeleteReq(_) => {
313                respond!(msg, TableRemoveDeleteResp {})
314            },
315            ViewRemoveDeleteReq(_) => {
316                respond!(msg, ViewRemoveDeleteResp {})
317            },
318
319            x => {
320                // Return an error response instead of empty bytes
321                return Err(VirtualServerError::Other(format!(
322                    "Unhandled request: {:?}",
323                    x
324                )));
325            },
326        };
327
328        Ok(resp)
329    }
330}