Skip to main content

perspective_client/virtual_server/
server.rs

1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████       █      █      █      █      █ █▄  ▀███ █       ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█  ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄  ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄   █ ▄▄▄▄▄ ┃
5// ┃ █      ██████ █  ▀█▄       █ ██████      █      ███▌▐███ ███████▄ █       ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors.                              ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::collections::HashMap;
14
15use indexmap::IndexMap;
16use prost::Message as ProstMessage;
17use prost::bytes::{Bytes, BytesMut};
18
19use super::data::RowPathStyle;
20use super::error::VirtualServerError;
21use super::handler::VirtualServerHandler;
22use crate::config::{ViewConfig, ViewConfigUpdate};
23use crate::proto::response::ClientResp;
24use crate::proto::table_validate_expr_resp::ExprValidationError;
25use crate::proto::{
26    ColumnType, GetFeaturesResp, GetHostedTablesResp, MakeTableResp, Request, Response,
27    ServerError, TableMakePortResp, TableMakeViewResp, TableOnDeleteResp, TableRemoveDeleteResp,
28    TableSchemaResp, TableSizeResp, TableValidateExprResp, ViewColumnPathsResp, ViewDeleteResp,
29    ViewDimensionsResp, ViewExpressionSchemaResp, ViewGetConfigResp, ViewGetMinMaxResp,
30    ViewOnDeleteResp, ViewOnUpdateResp, ViewRemoveDeleteResp, ViewRemoveOnUpdateResp,
31    ViewSchemaResp, ViewToArrowResp, ViewToColumnsStringResp, ViewToCsvResp,
32    ViewToNdjsonStringResp, ViewToRowsStringResp,
33};
34
/// Builds and encodes a [`Response`] for the request bound to `$msg`,
/// evaluating to the frozen response bytes.
///
/// `$name { .. }` is constructed as the payload and wrapped in the
/// `ClientResp` variant of the same name. `msg_id` and `entity_id` are taken
/// from `$msg` (the latter is moved out of it).
///
/// The expansion contains a `?`: an encoding failure is mapped to
/// `VirtualServerError::EncodeError` and returned from the *enclosing*
/// function, so the macro can only be used where that error type is accepted.
macro_rules! respond {
    ($msg:ident, $name:ident { $($rest:tt)* }) => {{
        let payload = ClientResp::$name($name {
            $($rest)*
        });

        let envelope = Response {
            msg_id: $msg.msg_id,
            entity_id: $msg.entity_id,
            client_resp: Some(payload),
        };

        let mut buffer = BytesMut::new();
        envelope
            .encode(&mut buffer)
            .map_err(VirtualServerError::EncodeError)?;

        buffer.freeze()
    }};
}
51
/// A virtual server that processes Perspective protocol messages.
///
/// `VirtualServer` acts as a bridge between the Perspective protocol and a
/// custom data backend. It handles protocol decoding/encoding and delegates
/// actual data operations to the provided [`VirtualServerHandler`].
pub struct VirtualServer<T: VirtualServerHandler> {
    /// Backend that performs the actual table and view operations.
    handler: T,
    /// Maps a view id to the id of the table the view was created on.
    view_to_table: IndexMap<String, String>,
    /// Configuration recorded for each view id when the view was created.
    view_configs: IndexMap<String, ViewConfig>,
    /// View schemas previously returned by the handler, keyed by view id, so
    /// repeated requests do not have to ask the handler again.
    view_schemas: IndexMap<String, IndexMap<String, ColumnType>>,
}
63
64impl<T: VirtualServerHandler> VirtualServer<T> {
65    /// Creates a new virtual server with the given handler.
66    pub fn new(handler: T) -> Self {
67        Self {
68            handler,
69            view_configs: IndexMap::default(),
70            view_to_table: IndexMap::default(),
71            view_schemas: IndexMap::default(),
72        }
73    }
74
75    /// Processes a Perspective protocol request and returns the response.
76    ///
77    /// Decodes the incoming protobuf message, dispatches to the appropriate
78    /// handler method, and encodes the response.
79    pub async fn handle_request(
80        &mut self,
81        bytes: Bytes,
82    ) -> Result<Bytes, VirtualServerError<T::Error>> {
83        let msg = Request::decode(bytes).map_err(VirtualServerError::DecodeError)?;
84        tracing::debug!(
85            "Handling request: entity_id={}, req={:?}",
86            msg.entity_id,
87            msg.client_req
88        );
89
90        match self.internal_handle_request(msg.clone()).await {
91            Ok(resp) => Ok(resp),
92            Err(err) => {
93                tracing::error!("{}", err);
94                Ok(respond!(msg, ServerError {
95                    message: err.to_string(),
96                    status_code: 0
97                }))
98            },
99        }
100    }
101
102    async fn get_cached_view_schema(
103        &mut self,
104        entity_id: &str,
105        to_psp_format: bool,
106    ) -> Result<IndexMap<String, ColumnType>, VirtualServerError<T::Error>> {
107        if !self.view_schemas.contains_key(entity_id) {
108            self.view_schemas.insert(
109                entity_id.to_string(),
110                self.handler
111                    .view_schema(entity_id, self.view_configs.get(entity_id).unwrap())
112                    .await?,
113            );
114        }
115
116        if to_psp_format {
117            Ok(self
118                .view_schemas
119                .get(entity_id)
120                .unwrap()
121                .iter()
122                .map(|(k, v)| {
123                    (
124                        k.split("_").collect::<Vec<_>>().last().unwrap().to_string(),
125                        *v,
126                    )
127                })
128                .collect())
129        } else {
130            Ok(self.view_schemas.get(entity_id).cloned().unwrap())
131        }
132    }
133
134    async fn internal_handle_request(
135        &mut self,
136        msg: Request,
137    ) -> Result<Bytes, VirtualServerError<T::Error>> {
138        use crate::proto::request::ClientReq::*;
139        let resp = match msg.client_req.unwrap() {
140            GetFeaturesReq(_) => {
141                let features = self.handler.get_features().await?;
142                respond!(msg, GetFeaturesResp { ..features.into() })
143            },
144            GetHostedTablesReq(_) => {
145                respond!(msg, GetHostedTablesResp {
146                    table_infos: self.handler.get_hosted_tables().await?
147                })
148            },
149            TableSchemaReq(_) => {
150                respond!(msg, TableSchemaResp {
151                    schema: Some(crate::proto::Schema {
152                        schema: self
153                            .handler
154                            .table_schema(msg.entity_id.as_str())
155                            .await?
156                            .iter()
157                            .map(|x| crate::proto::schema::KeyTypePair {
158                                name: x.0.to_string(),
159                                r#type: *x.1 as i32,
160                            })
161                            .collect()
162                    })
163                })
164            },
165            TableMakePortReq(req) => {
166                respond!(msg, TableMakePortResp {
167                    port_id: self.handler.table_make_port(&req).await?
168                })
169            },
170            TableMakeViewReq(req) => {
171                self.view_to_table
172                    .insert(req.view_id.clone(), msg.entity_id.clone());
173
174                let mut config: ViewConfigUpdate = req.config.clone().unwrap_or_default().into();
175                let bytes = respond!(msg, TableMakeViewResp {
176                    view_id: self
177                        .handler
178                        .table_make_view(msg.entity_id.as_str(), req.view_id.as_str(), &mut config)
179                        .await?
180                });
181
182                self.view_configs.insert(req.view_id.clone(), config.into());
183                bytes
184            },
185            TableSizeReq(_) => {
186                respond!(msg, TableSizeResp {
187                    size: self.handler.table_size(msg.entity_id.as_str()).await?
188                })
189            },
190            TableValidateExprReq(req) => {
191                let mut expression_schema = HashMap::<String, i32>::default();
192                let mut expression_alias = HashMap::<String, String>::default();
193                let mut errors = HashMap::<String, ExprValidationError>::default();
194                for (name, ex) in req.column_to_expr.iter() {
195                    let _ = expression_alias.insert(name.clone(), ex.clone());
196                    match self
197                        .handler
198                        .table_validate_expression(&msg.entity_id, ex.as_str())
199                        .await
200                    {
201                        Ok(dtype) => {
202                            let _ = expression_schema.insert(name.clone(), dtype as i32);
203                        },
204                        Err(e) => {
205                            let _ = errors.insert(name.clone(), ExprValidationError {
206                                error_message: format!("{}", e),
207                                line: 0,
208                                column: 0,
209                            });
210                        },
211                    }
212                }
213
214                respond!(msg, TableValidateExprResp {
215                    expression_schema,
216                    errors,
217                    expression_alias,
218                })
219            },
220            ViewSchemaReq(_) => {
221                respond!(msg, ViewSchemaResp {
222                    schema: self
223                        .get_cached_view_schema(&msg.entity_id, true)
224                        .await?
225                        .into_iter()
226                        .map(|(x, y)| (x.to_string(), y as i32))
227                        .collect()
228                })
229            },
230            ViewDimensionsReq(_) => {
231                let view_id = &msg.entity_id;
232                let table_id = self
233                    .view_to_table
234                    .get(view_id)
235                    .ok_or_else(|| VirtualServerError::UnknownViewId(view_id.to_string()))?;
236
237                let num_table_rows = self.handler.table_size(table_id).await?;
238                let num_table_columns = self.handler.table_column_size(table_id).await? as u32;
239                let config = self.view_configs.get(view_id).unwrap();
240                let num_view_columns = self.handler.view_column_size(view_id, config).await? as u32;
241                let num_view_rows = self.handler.view_size(view_id).await?;
242                let resp = ViewDimensionsResp {
243                    num_table_columns,
244                    num_table_rows,
245                    num_view_columns,
246                    num_view_rows,
247                };
248
249                respond!(msg, ViewDimensionsResp { ..resp })
250            },
251            ViewGetConfigReq(_) => {
252                respond!(msg, ViewGetConfigResp {
253                    config: Some(
254                        ViewConfigUpdate::from(
255                            self.view_configs.get(&msg.entity_id).unwrap().clone()
256                        )
257                        .into()
258                    )
259                })
260            },
261            ViewExpressionSchemaReq(_) => {
262                let mut schema = HashMap::<String, i32>::default();
263                let table_id = self.view_to_table.get(&msg.entity_id);
264                for (name, ex) in self
265                    .view_configs
266                    .get(&msg.entity_id)
267                    .unwrap()
268                    .expressions
269                    .iter()
270                {
271                    match self
272                        .handler
273                        .table_validate_expression(table_id.unwrap(), ex.as_str())
274                        .await
275                    {
276                        Ok(dtype) => {
277                            let _ = schema.insert(name.clone(), dtype as i32);
278                        },
279                        Err(_e) => {
280                            // TODO: handle error
281                        },
282                    }
283                }
284
285                let resp = ViewExpressionSchemaResp { schema };
286                respond!(msg, ViewExpressionSchemaResp { ..resp })
287            },
288            ViewColumnPathsReq(_) => {
289                respond!(msg, ViewColumnPathsResp {
290                    paths: self
291                        .handler
292                        .view_schema(
293                            msg.entity_id.as_str(),
294                            self.view_configs.get(&msg.entity_id).unwrap()
295                        )
296                        .await?
297                        .keys()
298                        .cloned()
299                        .collect()
300                })
301            },
302            ViewToArrowReq(view_to_arrow_req) => {
303                let viewport = view_to_arrow_req.viewport.unwrap();
304                let schema = self.get_cached_view_schema(&msg.entity_id, false).await?;
305                let config = self.view_configs.get(&msg.entity_id).unwrap();
306                let mut cols = self
307                    .handler
308                    .view_get_data(msg.entity_id.as_str(), config, &schema, &viewport)
309                    .await?;
310
311                let arrow = cols
312                    .render_to_arrow_ipc()
313                    .map_err(|e| VirtualServerError::Other(e.to_string()))?;
314
315                respond!(msg, ViewToArrowResp { arrow })
316            },
317            ViewToCsvReq(view_to_csv_req) => {
318                let viewport = view_to_csv_req.viewport.unwrap();
319                let schema = self.get_cached_view_schema(&msg.entity_id, false).await?;
320                let config = self.view_configs.get(&msg.entity_id).unwrap();
321                let mut cols = self
322                    .handler
323                    .view_get_data(msg.entity_id.as_str(), config, &schema, &viewport)
324                    .await?;
325
326                let rows = cols.render_to_rows(RowPathStyle::PerLevel);
327                let mut csv = String::new();
328                if let Some(first_row) = rows.first() {
329                    let headers: Vec<&str> = first_row.keys().map(|k| k.as_str()).collect();
330                    csv.push_str(&headers.join(","));
331                    csv.push('\n');
332                }
333
334                for row in &rows {
335                    let values: Vec<String> = row
336                        .values()
337                        .map(|cell| serde_json::to_string(cell).unwrap_or_default())
338                        .collect();
339                    csv.push_str(&values.join(","));
340                    csv.push('\n');
341                }
342
343                respond!(msg, ViewToCsvResp { csv })
344            },
345            ViewToNdjsonStringReq(view_to_ndjson_req) => {
346                let viewport = view_to_ndjson_req.viewport.unwrap();
347                let schema = self.get_cached_view_schema(&msg.entity_id, false).await?;
348                let config = self.view_configs.get(&msg.entity_id).unwrap();
349                let mut cols = self
350                    .handler
351                    .view_get_data(msg.entity_id.as_str(), config, &schema, &viewport)
352                    .await?;
353
354                let rows = cols.render_to_rows(RowPathStyle::PerLevel);
355                let ndjson_string = rows
356                    .iter()
357                    .map(serde_json::to_string)
358                    .collect::<Result<Vec<_>, _>>()
359                    .map_err(|e| VirtualServerError::InvalidJSON(std::sync::Arc::new(e)))?
360                    .join("\n");
361
362                respond!(msg, ViewToNdjsonStringResp { ndjson_string })
363            },
364            ViewToRowsStringReq(view_to_rows_string_req) => {
365                let viewport = view_to_rows_string_req.viewport.unwrap();
366                let schema = self.get_cached_view_schema(&msg.entity_id, false).await?;
367                let config = self.view_configs.get(&msg.entity_id).unwrap();
368                let mut cols = self
369                    .handler
370                    .view_get_data(msg.entity_id.as_str(), config, &schema, &viewport)
371                    .await?;
372
373                let rows = cols.render_to_rows(RowPathStyle::Sidecar);
374                let json_string = serde_json::to_string(&rows)
375                    .map_err(|e| VirtualServerError::InvalidJSON(std::sync::Arc::new(e)))?;
376
377                respond!(msg, ViewToRowsStringResp { json_string })
378            },
379            ViewToColumnsStringReq(view_to_columns_string_req) => {
380                let viewport = view_to_columns_string_req.viewport.unwrap();
381                let schema = self.get_cached_view_schema(&msg.entity_id, false).await?;
382                let config = self.view_configs.get(&msg.entity_id).unwrap();
383                let mut cols = self
384                    .handler
385                    .view_get_data(msg.entity_id.as_str(), config, &schema, &viewport)
386                    .await?;
387
388                let json_string = cols
389                    .render_to_columns_json(RowPathStyle::Sidecar)
390                    .map_err(|e| VirtualServerError::Other(e.to_string()))?;
391
392                respond!(msg, ViewToColumnsStringResp { json_string })
393            },
394            ViewDeleteReq(_) => {
395                self.handler.view_delete(msg.entity_id.as_str()).await?;
396                self.view_to_table.shift_remove(&msg.entity_id);
397                self.view_configs.shift_remove(&msg.entity_id);
398                respond!(msg, ViewDeleteResp {})
399            },
400            MakeTableReq(req) => {
401                self.handler
402                    .make_table(&msg.entity_id, req.data.as_ref().unwrap())
403                    .await?;
404                respond!(msg, MakeTableResp {})
405            },
406            ViewGetMinMaxReq(req) => {
407                let config = self.view_configs.get(&msg.entity_id).unwrap();
408                let (min, max) = self
409                    .handler
410                    .view_get_min_max(&msg.entity_id, &req.column_name, config)
411                    .await?;
412                respond!(msg, ViewGetMinMaxResp {
413                    min: Some(min.into()),
414                    max: Some(max.into()),
415                })
416            },
417
418            // Stub implementations for callback/update requests that VirtualServer doesn't support
419            TableOnDeleteReq(_) => {
420                respond!(msg, TableOnDeleteResp {})
421            },
422            ViewOnUpdateReq(_) => {
423                respond!(msg, ViewOnUpdateResp {
424                    delta: None,
425                    port_id: 0
426                })
427            },
428            ViewOnDeleteReq(_) => {
429                respond!(msg, ViewOnDeleteResp {})
430            },
431            ViewRemoveOnUpdateReq(_) => {
432                respond!(msg, ViewRemoveOnUpdateResp {})
433            },
434            TableRemoveDeleteReq(_) => {
435                respond!(msg, TableRemoveDeleteResp {})
436            },
437            ViewRemoveDeleteReq(_) => {
438                respond!(msg, ViewRemoveDeleteResp {})
439            },
440            x => {
441                // Return an error response instead of empty bytes
442                return Err(VirtualServerError::Other(format!(
443                    "Unhandled request: {:?}",
444                    x
445                )));
446            },
447        };
448
449        Ok(resp)
450    }
451}