Skip to main content

perspective_client/virtual_server/
server.rs

1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████       █      █      █      █      █ █▄  ▀███ █       ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█  ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄  ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄   █ ▄▄▄▄▄ ┃
5// ┃ █      ██████ █  ▀█▄       █ ██████      █      ███▌▐███ ███████▄ █       ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors.                              ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::collections::HashMap;
14
15use indexmap::IndexMap;
16use prost::Message as ProstMessage;
17use prost::bytes::{Bytes, BytesMut};
18
19use super::error::VirtualServerError;
20use super::handler::VirtualServerHandler;
21use crate::config::{ViewConfig, ViewConfigUpdate};
22use crate::proto::response::ClientResp;
23use crate::proto::table_validate_expr_resp::ExprValidationError;
24use crate::proto::{
25    GetFeaturesResp, GetHostedTablesResp, MakeTableResp, Request, Response, ServerError,
26    TableMakePortResp, TableMakeViewResp, TableOnDeleteResp, TableRemoveDeleteResp,
27    TableSchemaResp, TableSizeResp, TableValidateExprResp, ViewColumnPathsResp, ViewDeleteResp,
28    ViewDimensionsResp, ViewExpressionSchemaResp, ViewGetConfigResp, ViewOnDeleteResp,
29    ViewOnUpdateResp, ViewRemoveDeleteResp, ViewRemoveOnUpdateResp, ViewSchemaResp,
30    ViewToColumnsStringResp, ViewToRowsStringResp,
31};
32
// Builds a `Response` envelope for the request bound to `$msg`, wrapping the
// payload struct `$name { .. }` in the `ClientResp` variant of the same name,
// and evaluates to the encoded, frozen `Bytes`.
//
// The expansion uses `?`, so it may only be invoked inside a function whose
// error type is `VirtualServerError<_>`. It moves `entity_id` out of `$msg`,
// so `$msg` cannot be used as a whole value afterwards.
macro_rules! respond {
    ($msg:ident, $name:ident { $($rest:tt)* }) => {{
        // Evaluate the payload first: its field expressions may still borrow
        // from `$msg`, which is only moved from when the envelope is built.
        let payload = ClientResp::$name($name { $($rest)* });
        let envelope = Response {
            msg_id: $msg.msg_id,
            entity_id: $msg.entity_id,
            client_resp: Some(payload),
        };

        let mut buf = BytesMut::new();
        envelope
            .encode(&mut buf)
            .map_err(VirtualServerError::EncodeError)?;

        buf.freeze()
    }};
}
49
50/// A virtual server that processes Perspective protocol messages.
51///
52/// `VirtualServer` acts as a bridge between the Perspective protocol and a
53/// custom data backend. It handles protocol decoding/encoding and delegates
54/// actual data operations to the provided [`VirtualServerHandler`].
55pub struct VirtualServer<T: VirtualServerHandler> {
56    handler: T,
57    view_to_table: IndexMap<String, String>,
58    view_configs: IndexMap<String, ViewConfig>,
59}
60
61impl<T: VirtualServerHandler> VirtualServer<T> {
62    /// Creates a new virtual server with the given handler.
63    pub fn new(handler: T) -> Self {
64        Self {
65            handler,
66            view_configs: IndexMap::default(),
67            view_to_table: IndexMap::default(),
68        }
69    }
70
71    /// Processes a Perspective protocol request and returns the response.
72    ///
73    /// Decodes the incoming protobuf message, dispatches to the appropriate
74    /// handler method, and encodes the response.
75    pub async fn handle_request(
76        &mut self,
77        bytes: Bytes,
78    ) -> Result<Bytes, VirtualServerError<T::Error>> {
79        let msg = Request::decode(bytes).map_err(VirtualServerError::DecodeError)?;
80        tracing::debug!(
81            "Handling request: entity_id={}, req={:?}",
82            msg.entity_id,
83            msg.client_req
84        );
85
86        match self.internal_handle_request(msg.clone()).await {
87            Ok(resp) => Ok(resp),
88            Err(err) => Ok(respond!(msg, ServerError {
89                message: err.to_string(),
90                status_code: 1
91            })),
92        }
93    }
94
95    async fn internal_handle_request(
96        &mut self,
97        msg: Request,
98    ) -> Result<Bytes, VirtualServerError<T::Error>> {
99        use crate::proto::request::ClientReq::*;
100        let resp = match msg.client_req.unwrap() {
101            GetFeaturesReq(_) => {
102                let features = self.handler.get_features().await?;
103                respond!(msg, GetFeaturesResp { ..features.into() })
104            },
105            GetHostedTablesReq(_) => {
106                respond!(msg, GetHostedTablesResp {
107                    table_infos: self.handler.get_hosted_tables().await?
108                })
109            },
110            TableSchemaReq(_) => {
111                respond!(msg, TableSchemaResp {
112                    schema: self
113                        .handler
114                        .table_schema(msg.entity_id.as_str())
115                        .await
116                        .ok()
117                        .map(|value| crate::proto::Schema {
118                            schema: value
119                                .iter()
120                                .map(|x| crate::proto::schema::KeyTypePair {
121                                    name: x.0.to_string(),
122                                    r#type: *x.1 as i32,
123                                })
124                                .collect(),
125                        })
126                })
127            },
128            TableMakePortReq(req) => {
129                respond!(msg, TableMakePortResp {
130                    port_id: self.handler.table_make_port(&req).await?
131                })
132            },
133            TableMakeViewReq(req) => {
134                self.view_to_table
135                    .insert(req.view_id.clone(), msg.entity_id.clone());
136
137                let mut config: ViewConfigUpdate = req.config.clone().unwrap_or_default().into();
138                let bytes = respond!(msg, TableMakeViewResp {
139                    view_id: self
140                        .handler
141                        .table_make_view(msg.entity_id.as_str(), req.view_id.as_str(), &mut config)
142                        .await?
143                });
144
145                self.view_configs.insert(req.view_id.clone(), config.into());
146                bytes
147            },
148            TableSizeReq(_) => {
149                respond!(msg, TableSizeResp {
150                    size: self.handler.table_size(msg.entity_id.as_str()).await?
151                })
152            },
153            TableValidateExprReq(req) => {
154                let mut expression_schema = HashMap::<String, i32>::default();
155                let mut expression_alias = HashMap::<String, String>::default();
156                let mut errors = HashMap::<String, ExprValidationError>::default();
157                for (name, ex) in req.column_to_expr.iter() {
158                    let _ = expression_alias.insert(name.clone(), ex.clone());
159                    match self
160                        .handler
161                        .table_validate_expression(&msg.entity_id, ex.as_str())
162                        .await
163                    {
164                        Ok(dtype) => {
165                            let _ = expression_schema.insert(name.clone(), dtype as i32);
166                        },
167                        Err(e) => {
168                            let _ = errors.insert(name.clone(), ExprValidationError {
169                                error_message: format!("{}", e),
170                                line: 0,
171                                column: 0,
172                            });
173                        },
174                    }
175                }
176
177                respond!(msg, TableValidateExprResp {
178                    expression_schema,
179                    errors,
180                    expression_alias,
181                })
182            },
183            ViewSchemaReq(_) => {
184                respond!(msg, ViewSchemaResp {
185                    schema: self
186                        .handler
187                        .view_schema(
188                            msg.entity_id.as_str(),
189                            self.view_configs.get(&msg.entity_id).unwrap()
190                        )
191                        .await?
192                        .into_iter()
193                        .map(|(x, y)| (x, y as i32))
194                        .collect()
195                })
196            },
197            ViewDimensionsReq(_) => {
198                let view_id = &msg.entity_id;
199                let table_id = self
200                    .view_to_table
201                    .get(view_id)
202                    .ok_or_else(|| VirtualServerError::UnknownViewId(view_id.to_string()))?;
203
204                let num_table_rows = self.handler.table_size(table_id).await?;
205                let num_table_columns = self.handler.table_column_size(table_id).await? as u32;
206                let config = self.view_configs.get(view_id).unwrap();
207                let num_view_columns = self.handler.view_column_size(view_id, config).await? as u32;
208                let num_view_rows = self.handler.view_size(view_id).await?;
209                let resp = ViewDimensionsResp {
210                    num_table_columns,
211                    num_table_rows,
212                    num_view_columns,
213                    num_view_rows,
214                };
215
216                respond!(msg, ViewDimensionsResp { ..resp })
217            },
218            ViewGetConfigReq(_) => {
219                respond!(msg, ViewGetConfigResp {
220                    config: Some(
221                        ViewConfigUpdate::from(
222                            self.view_configs.get(&msg.entity_id).unwrap().clone()
223                        )
224                        .into()
225                    )
226                })
227            },
228            ViewExpressionSchemaReq(_) => {
229                let mut schema = HashMap::<String, i32>::default();
230                let table_id = self.view_to_table.get(&msg.entity_id);
231                for (name, ex) in self
232                    .view_configs
233                    .get(&msg.entity_id)
234                    .unwrap()
235                    .expressions
236                    .iter()
237                {
238                    match self
239                        .handler
240                        .table_validate_expression(table_id.unwrap(), ex.as_str())
241                        .await
242                    {
243                        Ok(dtype) => {
244                            let _ = schema.insert(name.clone(), dtype as i32);
245                        },
246                        Err(_e) => {
247                            // TODO: handle error
248                        },
249                    }
250                }
251
252                let resp = ViewExpressionSchemaResp { schema };
253                respond!(msg, ViewExpressionSchemaResp { ..resp })
254            },
255            ViewColumnPathsReq(_) => {
256                respond!(msg, ViewColumnPathsResp {
257                    paths: self
258                        .handler
259                        .view_schema(
260                            msg.entity_id.as_str(),
261                            self.view_configs.get(&msg.entity_id).unwrap()
262                        )
263                        .await?
264                        .keys()
265                        .cloned()
266                        .collect()
267                })
268            },
269            ViewToRowsStringReq(view_to_rows_string_req) => {
270                let viewport = view_to_rows_string_req.viewport.unwrap();
271                let config = self.view_configs.get(&msg.entity_id).unwrap();
272                let cols = self
273                    .handler
274                    .view_get_data(msg.entity_id.as_str(), config, &viewport)
275                    .await?;
276
277                let rows = cols.to_rows();
278                let json_string = serde_json::to_string(&rows)
279                    .map_err(|e| VirtualServerError::InvalidJSON(std::sync::Arc::new(e)))?;
280
281                respond!(msg, ViewToRowsStringResp { json_string })
282            },
283            ViewToColumnsStringReq(view_to_columns_string_req) => {
284                let viewport = view_to_columns_string_req.viewport.unwrap();
285                let config = self.view_configs.get(&msg.entity_id).unwrap();
286                let cols = self
287                    .handler
288                    .view_get_data(msg.entity_id.as_str(), config, &viewport)
289                    .await?;
290
291                let json_string = serde_json::to_string(&cols)
292                    .map_err(|e| VirtualServerError::InvalidJSON(std::sync::Arc::new(e)))?;
293
294                respond!(msg, ViewToColumnsStringResp { json_string })
295            },
296            ViewDeleteReq(_) => {
297                self.handler.view_delete(msg.entity_id.as_str()).await?;
298                self.view_to_table.shift_remove(&msg.entity_id);
299                self.view_configs.shift_remove(&msg.entity_id);
300                respond!(msg, ViewDeleteResp {})
301            },
302            MakeTableReq(req) => {
303                self.handler
304                    .make_table(&msg.entity_id, req.data.as_ref().unwrap())
305                    .await?;
306                respond!(msg, MakeTableResp {})
307            },
308
309            // Stub implementations for callback/update requests that VirtualServer doesn't support
310            TableOnDeleteReq(_) => {
311                respond!(msg, TableOnDeleteResp {})
312            },
313            ViewOnUpdateReq(_) => {
314                respond!(msg, ViewOnUpdateResp {
315                    delta: None,
316                    port_id: 0
317                })
318            },
319            ViewOnDeleteReq(_) => {
320                respond!(msg, ViewOnDeleteResp {})
321            },
322            ViewRemoveOnUpdateReq(_) => {
323                respond!(msg, ViewRemoveOnUpdateResp {})
324            },
325            TableRemoveDeleteReq(_) => {
326                respond!(msg, TableRemoveDeleteResp {})
327            },
328            ViewRemoveDeleteReq(_) => {
329                respond!(msg, ViewRemoveDeleteResp {})
330            },
331
332            x => {
333                // Return an error response instead of empty bytes
334                return Err(VirtualServerError::Other(format!(
335                    "Unhandled request: {:?}",
336                    x
337                )));
338            },
339        };
340
341        Ok(resp)
342    }
343}