Skip to main content

perspective_client/virtual_server/
server.rs

1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████       █      █      █      █      █ █▄  ▀███ █       ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█  ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄  ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄   █ ▄▄▄▄▄ ┃
5// ┃ █      ██████ █  ▀█▄       █ ██████      █      ███▌▐███ ███████▄ █       ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors.                              ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::collections::HashMap;
14
15use indexmap::IndexMap;
16use prost::Message as ProstMessage;
17use prost::bytes::{Bytes, BytesMut};
18
19use super::error::VirtualServerError;
20use super::handler::VirtualServerHandler;
21use crate::config::{ViewConfig, ViewConfigUpdate};
22use crate::proto::response::ClientResp;
23use crate::proto::table_validate_expr_resp::ExprValidationError;
24use crate::proto::{
25    ColumnType, GetFeaturesResp, GetHostedTablesResp, MakeTableResp, Request, Response,
26    ServerError, TableMakePortResp, TableMakeViewResp, TableOnDeleteResp, TableRemoveDeleteResp,
27    TableSchemaResp, TableSizeResp, TableValidateExprResp, ViewColumnPathsResp, ViewDeleteResp,
28    ViewDimensionsResp, ViewExpressionSchemaResp, ViewGetConfigResp, ViewOnDeleteResp,
29    ViewOnUpdateResp, ViewRemoveDeleteResp, ViewRemoveOnUpdateResp, ViewSchemaResp,
30    ViewToColumnsStringResp, ViewToRowsStringResp,
31};
32
33macro_rules! respond {
34    ($msg:ident, $name:ident { $($rest:tt)* }) => {{
35        let mut resp = BytesMut::new();
36        let resp2 = ClientResp::$name($name {
37            $($rest)*
38        });
39
40        Response {
41            msg_id: $msg.msg_id,
42            entity_id: $msg.entity_id,
43            client_resp: Some(resp2),
44        }.encode(&mut resp).map_err(VirtualServerError::EncodeError)?;
45
46        resp.freeze()
47    }};
48}
49
50/// A virtual server that processes Perspective protocol messages.
51///
52/// `VirtualServer` acts as a bridge between the Perspective protocol and a
53/// custom data backend. It handles protocol decoding/encoding and delegates
54/// actual data operations to the provided [`VirtualServerHandler`].
55pub struct VirtualServer<T: VirtualServerHandler> {
56    handler: T,
57    view_to_table: IndexMap<String, String>,
58    view_configs: IndexMap<String, ViewConfig>,
59    view_schemas: IndexMap<String, IndexMap<String, ColumnType>>,
60}
61
62impl<T: VirtualServerHandler> VirtualServer<T> {
63    /// Creates a new virtual server with the given handler.
64    pub fn new(handler: T) -> Self {
65        Self {
66            handler,
67            view_configs: IndexMap::default(),
68            view_to_table: IndexMap::default(),
69            view_schemas: IndexMap::default(),
70        }
71    }
72
73    /// Processes a Perspective protocol request and returns the response.
74    ///
75    /// Decodes the incoming protobuf message, dispatches to the appropriate
76    /// handler method, and encodes the response.
77    pub async fn handle_request(
78        &mut self,
79        bytes: Bytes,
80    ) -> Result<Bytes, VirtualServerError<T::Error>> {
81        let msg = Request::decode(bytes).map_err(VirtualServerError::DecodeError)?;
82        tracing::debug!(
83            "Handling request: entity_id={}, req={:?}",
84            msg.entity_id,
85            msg.client_req
86        );
87
88        match self.internal_handle_request(msg.clone()).await {
89            Ok(resp) => Ok(resp),
90            Err(err) => {
91                tracing::error!("{}", err);
92                Ok(respond!(msg, ServerError {
93                    message: err.to_string(),
94                    status_code: 1
95                }))
96            },
97        }
98    }
99
100    async fn get_cached_view_schema(
101        &mut self,
102        entity_id: &str,
103        to_psp_format: bool,
104    ) -> Result<IndexMap<String, ColumnType>, VirtualServerError<T::Error>> {
105        if !self.view_schemas.contains_key(entity_id) {
106            self.view_schemas.insert(
107                entity_id.to_string(),
108                self.handler
109                    .view_schema(entity_id, self.view_configs.get(entity_id).unwrap())
110                    .await?,
111            );
112        }
113
114        if to_psp_format {
115            Ok(self
116                .view_schemas
117                .get(entity_id)
118                .unwrap()
119                .iter()
120                .map(|(k, v)| {
121                    (
122                        k.split("_").collect::<Vec<_>>().last().unwrap().to_string(),
123                        *v,
124                    )
125                })
126                .collect())
127        } else {
128            Ok(self.view_schemas.get(entity_id).cloned().unwrap())
129        }
130    }
131
132    async fn internal_handle_request(
133        &mut self,
134        msg: Request,
135    ) -> Result<Bytes, VirtualServerError<T::Error>> {
136        use crate::proto::request::ClientReq::*;
137        let resp = match msg.client_req.unwrap() {
138            GetFeaturesReq(_) => {
139                let features = self.handler.get_features().await?;
140                respond!(msg, GetFeaturesResp { ..features.into() })
141            },
142            GetHostedTablesReq(_) => {
143                respond!(msg, GetHostedTablesResp {
144                    table_infos: self.handler.get_hosted_tables().await?
145                })
146            },
147            TableSchemaReq(_) => {
148                respond!(msg, TableSchemaResp {
149                    schema: Some(crate::proto::Schema {
150                        schema: self
151                            .handler
152                            .table_schema(msg.entity_id.as_str())
153                            .await?
154                            .iter()
155                            .map(|x| crate::proto::schema::KeyTypePair {
156                                name: x.0.to_string(),
157                                r#type: *x.1 as i32,
158                            })
159                            .collect()
160                    })
161                })
162            },
163            TableMakePortReq(req) => {
164                respond!(msg, TableMakePortResp {
165                    port_id: self.handler.table_make_port(&req).await?
166                })
167            },
168            TableMakeViewReq(req) => {
169                self.view_to_table
170                    .insert(req.view_id.clone(), msg.entity_id.clone());
171
172                let mut config: ViewConfigUpdate = req.config.clone().unwrap_or_default().into();
173                let bytes = respond!(msg, TableMakeViewResp {
174                    view_id: self
175                        .handler
176                        .table_make_view(msg.entity_id.as_str(), req.view_id.as_str(), &mut config)
177                        .await?
178                });
179
180                self.view_configs.insert(req.view_id.clone(), config.into());
181                bytes
182            },
183            TableSizeReq(_) => {
184                respond!(msg, TableSizeResp {
185                    size: self.handler.table_size(msg.entity_id.as_str()).await?
186                })
187            },
188            TableValidateExprReq(req) => {
189                let mut expression_schema = HashMap::<String, i32>::default();
190                let mut expression_alias = HashMap::<String, String>::default();
191                let mut errors = HashMap::<String, ExprValidationError>::default();
192                for (name, ex) in req.column_to_expr.iter() {
193                    let _ = expression_alias.insert(name.clone(), ex.clone());
194                    match self
195                        .handler
196                        .table_validate_expression(&msg.entity_id, ex.as_str())
197                        .await
198                    {
199                        Ok(dtype) => {
200                            let _ = expression_schema.insert(name.clone(), dtype as i32);
201                        },
202                        Err(e) => {
203                            let _ = errors.insert(name.clone(), ExprValidationError {
204                                error_message: format!("{}", e),
205                                line: 0,
206                                column: 0,
207                            });
208                        },
209                    }
210                }
211
212                respond!(msg, TableValidateExprResp {
213                    expression_schema,
214                    errors,
215                    expression_alias,
216                })
217            },
218            ViewSchemaReq(_) => {
219                respond!(msg, ViewSchemaResp {
220                    schema: self
221                        .get_cached_view_schema(&msg.entity_id, true)
222                        .await?
223                        .into_iter()
224                        .map(|(x, y)| (x.to_string(), y as i32))
225                        .collect()
226                })
227            },
228            ViewDimensionsReq(_) => {
229                let view_id = &msg.entity_id;
230                let table_id = self
231                    .view_to_table
232                    .get(view_id)
233                    .ok_or_else(|| VirtualServerError::UnknownViewId(view_id.to_string()))?;
234
235                let num_table_rows = self.handler.table_size(table_id).await?;
236                let num_table_columns = self.handler.table_column_size(table_id).await? as u32;
237                let config = self.view_configs.get(view_id).unwrap();
238                let num_view_columns = self.handler.view_column_size(view_id, config).await? as u32;
239                let num_view_rows = self.handler.view_size(view_id).await?;
240                let resp = ViewDimensionsResp {
241                    num_table_columns,
242                    num_table_rows,
243                    num_view_columns,
244                    num_view_rows,
245                };
246
247                respond!(msg, ViewDimensionsResp { ..resp })
248            },
249            ViewGetConfigReq(_) => {
250                respond!(msg, ViewGetConfigResp {
251                    config: Some(
252                        ViewConfigUpdate::from(
253                            self.view_configs.get(&msg.entity_id).unwrap().clone()
254                        )
255                        .into()
256                    )
257                })
258            },
259            ViewExpressionSchemaReq(_) => {
260                let mut schema = HashMap::<String, i32>::default();
261                let table_id = self.view_to_table.get(&msg.entity_id);
262                for (name, ex) in self
263                    .view_configs
264                    .get(&msg.entity_id)
265                    .unwrap()
266                    .expressions
267                    .iter()
268                {
269                    match self
270                        .handler
271                        .table_validate_expression(table_id.unwrap(), ex.as_str())
272                        .await
273                    {
274                        Ok(dtype) => {
275                            let _ = schema.insert(name.clone(), dtype as i32);
276                        },
277                        Err(_e) => {
278                            // TODO: handle error
279                        },
280                    }
281                }
282
283                let resp = ViewExpressionSchemaResp { schema };
284                respond!(msg, ViewExpressionSchemaResp { ..resp })
285            },
286            ViewColumnPathsReq(_) => {
287                respond!(msg, ViewColumnPathsResp {
288                    paths: self
289                        .handler
290                        .view_schema(
291                            msg.entity_id.as_str(),
292                            self.view_configs.get(&msg.entity_id).unwrap()
293                        )
294                        .await?
295                        .keys()
296                        .cloned()
297                        .collect()
298                })
299            },
300            ViewToRowsStringReq(view_to_rows_string_req) => {
301                let viewport = view_to_rows_string_req.viewport.unwrap();
302                let schema = self.get_cached_view_schema(&msg.entity_id, false).await?;
303                let config = self.view_configs.get(&msg.entity_id).unwrap();
304                let cols = self
305                    .handler
306                    .view_get_data(msg.entity_id.as_str(), config, &schema, &viewport)
307                    .await?;
308
309                let rows = cols.to_rows();
310                let json_string = serde_json::to_string(&rows)
311                    .map_err(|e| VirtualServerError::InvalidJSON(std::sync::Arc::new(e)))?;
312
313                respond!(msg, ViewToRowsStringResp { json_string })
314            },
315            ViewToColumnsStringReq(view_to_columns_string_req) => {
316                let viewport = view_to_columns_string_req.viewport.unwrap();
317                let schema = self.get_cached_view_schema(&msg.entity_id, false).await?;
318                let config = self.view_configs.get(&msg.entity_id).unwrap();
319                let cols = self
320                    .handler
321                    .view_get_data(msg.entity_id.as_str(), config, &schema, &viewport)
322                    .await?;
323
324                let json_string = serde_json::to_string(&cols)
325                    .map_err(|e| VirtualServerError::InvalidJSON(std::sync::Arc::new(e)))?;
326
327                respond!(msg, ViewToColumnsStringResp { json_string })
328            },
329            ViewDeleteReq(_) => {
330                self.handler.view_delete(msg.entity_id.as_str()).await?;
331                self.view_to_table.shift_remove(&msg.entity_id);
332                self.view_configs.shift_remove(&msg.entity_id);
333                respond!(msg, ViewDeleteResp {})
334            },
335            MakeTableReq(req) => {
336                self.handler
337                    .make_table(&msg.entity_id, req.data.as_ref().unwrap())
338                    .await?;
339                respond!(msg, MakeTableResp {})
340            },
341
342            // Stub implementations for callback/update requests that VirtualServer doesn't support
343            TableOnDeleteReq(_) => {
344                respond!(msg, TableOnDeleteResp {})
345            },
346            ViewOnUpdateReq(_) => {
347                respond!(msg, ViewOnUpdateResp {
348                    delta: None,
349                    port_id: 0
350                })
351            },
352            ViewOnDeleteReq(_) => {
353                respond!(msg, ViewOnDeleteResp {})
354            },
355            ViewRemoveOnUpdateReq(_) => {
356                respond!(msg, ViewRemoveOnUpdateResp {})
357            },
358            TableRemoveDeleteReq(_) => {
359                respond!(msg, TableRemoveDeleteResp {})
360            },
361            ViewRemoveDeleteReq(_) => {
362                respond!(msg, ViewRemoveDeleteResp {})
363            },
364
365            x => {
366                // Return an error response instead of empty bytes
367                return Err(VirtualServerError::Other(format!(
368                    "Unhandled request: {:?}",
369                    x
370                )));
371            },
372        };
373
374        Ok(resp)
375    }
376}