Skip to main content

perspective_client/virtual_server/
server.rs

1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████       █      █      █      █      █ █▄  ▀███ █       ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█  ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄  ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄   █ ▄▄▄▄▄ ┃
5// ┃ █      ██████ █  ▀█▄       █ ██████      █      ███▌▐███ ███████▄ █       ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors.                              ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::collections::HashMap;
14
15use indexmap::IndexMap;
16use prost::Message as ProstMessage;
17use prost::bytes::{Bytes, BytesMut};
18
19use super::error::VirtualServerError;
20use super::handler::VirtualServerHandler;
21use crate::config::{ViewConfig, ViewConfigUpdate};
22use crate::proto::response::ClientResp;
23use crate::proto::table_validate_expr_resp::ExprValidationError;
24use crate::proto::{
25    ColumnType, GetFeaturesResp, GetHostedTablesResp, MakeTableResp, Request, Response,
26    ServerError, TableMakePortResp, TableMakeViewResp, TableOnDeleteResp, TableRemoveDeleteResp,
27    TableSchemaResp, TableSizeResp, TableValidateExprResp, ViewColumnPathsResp, ViewDeleteResp,
28    ViewDimensionsResp, ViewExpressionSchemaResp, ViewGetConfigResp, ViewGetMinMaxResp,
29    ViewOnDeleteResp, ViewOnUpdateResp, ViewRemoveDeleteResp, ViewRemoveOnUpdateResp,
30    ViewSchemaResp, ViewToArrowResp, ViewToColumnsStringResp, ViewToCsvResp,
31    ViewToNdjsonStringResp, ViewToRowsStringResp,
32};
33
34macro_rules! respond {
35    ($msg:ident, $name:ident { $($rest:tt)* }) => {{
36        let mut resp = BytesMut::new();
37        let resp2 = ClientResp::$name($name {
38            $($rest)*
39        });
40
41        Response {
42            msg_id: $msg.msg_id,
43            entity_id: $msg.entity_id,
44            client_resp: Some(resp2),
45        }.encode(&mut resp).map_err(VirtualServerError::EncodeError)?;
46
47        resp.freeze()
48    }};
49}
50
51/// A virtual server that processes Perspective protocol messages.
52///
53/// `VirtualServer` acts as a bridge between the Perspective protocol and a
54/// custom data backend. It handles protocol decoding/encoding and delegates
55/// actual data operations to the provided [`VirtualServerHandler`].
56pub struct VirtualServer<T: VirtualServerHandler> {
57    handler: T,
58    view_to_table: IndexMap<String, String>,
59    view_configs: IndexMap<String, ViewConfig>,
60    view_schemas: IndexMap<String, IndexMap<String, ColumnType>>,
61}
62
63impl<T: VirtualServerHandler> VirtualServer<T> {
64    /// Creates a new virtual server with the given handler.
65    pub fn new(handler: T) -> Self {
66        Self {
67            handler,
68            view_configs: IndexMap::default(),
69            view_to_table: IndexMap::default(),
70            view_schemas: IndexMap::default(),
71        }
72    }
73
74    /// Processes a Perspective protocol request and returns the response.
75    ///
76    /// Decodes the incoming protobuf message, dispatches to the appropriate
77    /// handler method, and encodes the response.
78    pub async fn handle_request(
79        &mut self,
80        bytes: Bytes,
81    ) -> Result<Bytes, VirtualServerError<T::Error>> {
82        let msg = Request::decode(bytes).map_err(VirtualServerError::DecodeError)?;
83        tracing::debug!(
84            "Handling request: entity_id={}, req={:?}",
85            msg.entity_id,
86            msg.client_req
87        );
88
89        match self.internal_handle_request(msg.clone()).await {
90            Ok(resp) => Ok(resp),
91            Err(err) => {
92                tracing::error!("{}", err);
93                Ok(respond!(msg, ServerError {
94                    message: err.to_string(),
95                    status_code: 0
96                }))
97            },
98        }
99    }
100
101    async fn get_cached_view_schema(
102        &mut self,
103        entity_id: &str,
104        to_psp_format: bool,
105    ) -> Result<IndexMap<String, ColumnType>, VirtualServerError<T::Error>> {
106        if !self.view_schemas.contains_key(entity_id) {
107            self.view_schemas.insert(
108                entity_id.to_string(),
109                self.handler
110                    .view_schema(entity_id, self.view_configs.get(entity_id).unwrap())
111                    .await?,
112            );
113        }
114
115        if to_psp_format {
116            Ok(self
117                .view_schemas
118                .get(entity_id)
119                .unwrap()
120                .iter()
121                .map(|(k, v)| {
122                    (
123                        k.split("_").collect::<Vec<_>>().last().unwrap().to_string(),
124                        *v,
125                    )
126                })
127                .collect())
128        } else {
129            Ok(self.view_schemas.get(entity_id).cloned().unwrap())
130        }
131    }
132
133    async fn internal_handle_request(
134        &mut self,
135        msg: Request,
136    ) -> Result<Bytes, VirtualServerError<T::Error>> {
137        use crate::proto::request::ClientReq::*;
138        let resp = match msg.client_req.unwrap() {
139            GetFeaturesReq(_) => {
140                let features = self.handler.get_features().await?;
141                respond!(msg, GetFeaturesResp { ..features.into() })
142            },
143            GetHostedTablesReq(_) => {
144                respond!(msg, GetHostedTablesResp {
145                    table_infos: self.handler.get_hosted_tables().await?
146                })
147            },
148            TableSchemaReq(_) => {
149                respond!(msg, TableSchemaResp {
150                    schema: Some(crate::proto::Schema {
151                        schema: self
152                            .handler
153                            .table_schema(msg.entity_id.as_str())
154                            .await?
155                            .iter()
156                            .map(|x| crate::proto::schema::KeyTypePair {
157                                name: x.0.to_string(),
158                                r#type: *x.1 as i32,
159                            })
160                            .collect()
161                    })
162                })
163            },
164            TableMakePortReq(req) => {
165                respond!(msg, TableMakePortResp {
166                    port_id: self.handler.table_make_port(&req).await?
167                })
168            },
169            TableMakeViewReq(req) => {
170                self.view_to_table
171                    .insert(req.view_id.clone(), msg.entity_id.clone());
172
173                let mut config: ViewConfigUpdate = req.config.clone().unwrap_or_default().into();
174                let bytes = respond!(msg, TableMakeViewResp {
175                    view_id: self
176                        .handler
177                        .table_make_view(msg.entity_id.as_str(), req.view_id.as_str(), &mut config)
178                        .await?
179                });
180
181                self.view_configs.insert(req.view_id.clone(), config.into());
182                bytes
183            },
184            TableSizeReq(_) => {
185                respond!(msg, TableSizeResp {
186                    size: self.handler.table_size(msg.entity_id.as_str()).await?
187                })
188            },
189            TableValidateExprReq(req) => {
190                let mut expression_schema = HashMap::<String, i32>::default();
191                let mut expression_alias = HashMap::<String, String>::default();
192                let mut errors = HashMap::<String, ExprValidationError>::default();
193                for (name, ex) in req.column_to_expr.iter() {
194                    let _ = expression_alias.insert(name.clone(), ex.clone());
195                    match self
196                        .handler
197                        .table_validate_expression(&msg.entity_id, ex.as_str())
198                        .await
199                    {
200                        Ok(dtype) => {
201                            let _ = expression_schema.insert(name.clone(), dtype as i32);
202                        },
203                        Err(e) => {
204                            let _ = errors.insert(name.clone(), ExprValidationError {
205                                error_message: format!("{}", e),
206                                line: 0,
207                                column: 0,
208                            });
209                        },
210                    }
211                }
212
213                respond!(msg, TableValidateExprResp {
214                    expression_schema,
215                    errors,
216                    expression_alias,
217                })
218            },
219            ViewSchemaReq(_) => {
220                respond!(msg, ViewSchemaResp {
221                    schema: self
222                        .get_cached_view_schema(&msg.entity_id, true)
223                        .await?
224                        .into_iter()
225                        .map(|(x, y)| (x.to_string(), y as i32))
226                        .collect()
227                })
228            },
229            ViewDimensionsReq(_) => {
230                let view_id = &msg.entity_id;
231                let table_id = self
232                    .view_to_table
233                    .get(view_id)
234                    .ok_or_else(|| VirtualServerError::UnknownViewId(view_id.to_string()))?;
235
236                let num_table_rows = self.handler.table_size(table_id).await?;
237                let num_table_columns = self.handler.table_column_size(table_id).await? as u32;
238                let config = self.view_configs.get(view_id).unwrap();
239                let num_view_columns = self.handler.view_column_size(view_id, config).await? as u32;
240                let num_view_rows = self.handler.view_size(view_id).await?;
241                let resp = ViewDimensionsResp {
242                    num_table_columns,
243                    num_table_rows,
244                    num_view_columns,
245                    num_view_rows,
246                };
247
248                respond!(msg, ViewDimensionsResp { ..resp })
249            },
250            ViewGetConfigReq(_) => {
251                respond!(msg, ViewGetConfigResp {
252                    config: Some(
253                        ViewConfigUpdate::from(
254                            self.view_configs.get(&msg.entity_id).unwrap().clone()
255                        )
256                        .into()
257                    )
258                })
259            },
260            ViewExpressionSchemaReq(_) => {
261                let mut schema = HashMap::<String, i32>::default();
262                let table_id = self.view_to_table.get(&msg.entity_id);
263                for (name, ex) in self
264                    .view_configs
265                    .get(&msg.entity_id)
266                    .unwrap()
267                    .expressions
268                    .iter()
269                {
270                    match self
271                        .handler
272                        .table_validate_expression(table_id.unwrap(), ex.as_str())
273                        .await
274                    {
275                        Ok(dtype) => {
276                            let _ = schema.insert(name.clone(), dtype as i32);
277                        },
278                        Err(_e) => {
279                            // TODO: handle error
280                        },
281                    }
282                }
283
284                let resp = ViewExpressionSchemaResp { schema };
285                respond!(msg, ViewExpressionSchemaResp { ..resp })
286            },
287            ViewColumnPathsReq(_) => {
288                respond!(msg, ViewColumnPathsResp {
289                    paths: self
290                        .handler
291                        .view_schema(
292                            msg.entity_id.as_str(),
293                            self.view_configs.get(&msg.entity_id).unwrap()
294                        )
295                        .await?
296                        .keys()
297                        .cloned()
298                        .collect()
299                })
300            },
301            ViewToArrowReq(view_to_arrow_req) => {
302                let viewport = view_to_arrow_req.viewport.unwrap();
303                let schema = self.get_cached_view_schema(&msg.entity_id, false).await?;
304                let config = self.view_configs.get(&msg.entity_id).unwrap();
305                let mut cols = self
306                    .handler
307                    .view_get_data(msg.entity_id.as_str(), config, &schema, &viewport)
308                    .await?;
309
310                let arrow = cols
311                    .render_to_arrow_ipc()
312                    .map_err(|e| VirtualServerError::Other(e.to_string()))?;
313
314                respond!(msg, ViewToArrowResp { arrow })
315            },
316            ViewToCsvReq(view_to_csv_req) => {
317                let viewport = view_to_csv_req.viewport.unwrap();
318                let schema = self.get_cached_view_schema(&msg.entity_id, false).await?;
319                let config = self.view_configs.get(&msg.entity_id).unwrap();
320                let mut cols = self
321                    .handler
322                    .view_get_data(msg.entity_id.as_str(), config, &schema, &viewport)
323                    .await?;
324
325                let rows = cols.render_to_rows();
326                let mut csv = String::new();
327                if let Some(first_row) = rows.first() {
328                    let headers: Vec<&str> = first_row.keys().map(|k| k.as_str()).collect();
329                    csv.push_str(&headers.join(","));
330                    csv.push('\n');
331                }
332
333                for row in &rows {
334                    let values: Vec<String> = row
335                        .values()
336                        .map(|cell| serde_json::to_string(cell).unwrap_or_default())
337                        .collect();
338                    csv.push_str(&values.join(","));
339                    csv.push('\n');
340                }
341
342                respond!(msg, ViewToCsvResp { csv })
343            },
344            ViewToNdjsonStringReq(view_to_ndjson_req) => {
345                let viewport = view_to_ndjson_req.viewport.unwrap();
346                let schema = self.get_cached_view_schema(&msg.entity_id, false).await?;
347                let config = self.view_configs.get(&msg.entity_id).unwrap();
348                let mut cols = self
349                    .handler
350                    .view_get_data(msg.entity_id.as_str(), config, &schema, &viewport)
351                    .await?;
352
353                let rows = cols.render_to_rows();
354                let ndjson_string = rows
355                    .iter()
356                    .map(serde_json::to_string)
357                    .collect::<Result<Vec<_>, _>>()
358                    .map_err(|e| VirtualServerError::InvalidJSON(std::sync::Arc::new(e)))?
359                    .join("\n");
360
361                respond!(msg, ViewToNdjsonStringResp { ndjson_string })
362            },
363            ViewToRowsStringReq(view_to_rows_string_req) => {
364                let viewport = view_to_rows_string_req.viewport.unwrap();
365                let schema = self.get_cached_view_schema(&msg.entity_id, false).await?;
366                let config = self.view_configs.get(&msg.entity_id).unwrap();
367                let mut cols = self
368                    .handler
369                    .view_get_data(msg.entity_id.as_str(), config, &schema, &viewport)
370                    .await?;
371
372                let rows = cols.render_to_rows();
373                let json_string = serde_json::to_string(&rows)
374                    .map_err(|e| VirtualServerError::InvalidJSON(std::sync::Arc::new(e)))?;
375
376                respond!(msg, ViewToRowsStringResp { json_string })
377            },
378            ViewToColumnsStringReq(view_to_columns_string_req) => {
379                let viewport = view_to_columns_string_req.viewport.unwrap();
380                let schema = self.get_cached_view_schema(&msg.entity_id, false).await?;
381                let config = self.view_configs.get(&msg.entity_id).unwrap();
382                let mut cols = self
383                    .handler
384                    .view_get_data(msg.entity_id.as_str(), config, &schema, &viewport)
385                    .await?;
386
387                let json_string = cols
388                    .render_to_columns_json()
389                    .map_err(|e| VirtualServerError::Other(e.to_string()))?;
390
391                respond!(msg, ViewToColumnsStringResp { json_string })
392            },
393            ViewDeleteReq(_) => {
394                self.handler.view_delete(msg.entity_id.as_str()).await?;
395                self.view_to_table.shift_remove(&msg.entity_id);
396                self.view_configs.shift_remove(&msg.entity_id);
397                respond!(msg, ViewDeleteResp {})
398            },
399            MakeTableReq(req) => {
400                self.handler
401                    .make_table(&msg.entity_id, req.data.as_ref().unwrap())
402                    .await?;
403                respond!(msg, MakeTableResp {})
404            },
405            ViewGetMinMaxReq(req) => {
406                let config = self.view_configs.get(&msg.entity_id).unwrap();
407                let (min, max) = self
408                    .handler
409                    .view_get_min_max(&msg.entity_id, &req.column_name, config)
410                    .await?;
411                respond!(msg, ViewGetMinMaxResp {
412                    min: Some(min.into()),
413                    max: Some(max.into()),
414                })
415            },
416
417            // Stub implementations for callback/update requests that VirtualServer doesn't support
418            TableOnDeleteReq(_) => {
419                respond!(msg, TableOnDeleteResp {})
420            },
421            ViewOnUpdateReq(_) => {
422                respond!(msg, ViewOnUpdateResp {
423                    delta: None,
424                    port_id: 0
425                })
426            },
427            ViewOnDeleteReq(_) => {
428                respond!(msg, ViewOnDeleteResp {})
429            },
430            ViewRemoveOnUpdateReq(_) => {
431                respond!(msg, ViewRemoveOnUpdateResp {})
432            },
433            TableRemoveDeleteReq(_) => {
434                respond!(msg, TableRemoveDeleteResp {})
435            },
436            ViewRemoveDeleteReq(_) => {
437                respond!(msg, ViewRemoveDeleteResp {})
438            },
439            x => {
440                // Return an error response instead of empty bytes
441                return Err(VirtualServerError::Other(format!(
442                    "Unhandled request: {:?}",
443                    x
444                )));
445            },
446        };
447
448        Ok(resp)
449    }
450}