pinot_client_rust/response/
sql.rs

1use bimap::BiMap;
2use serde::{Deserialize, Serialize};
3use serde_json::Value;
4
5use crate::errors::{Error, Result};
6use crate::response::{DataType, ResponseStats};
7use crate::response::raw::{RawBrokerResponse, RawBrokerResponseWithoutStats, RawSchema, RawTable};
8
9/// Data structure for a broker response to an SQL query.
10#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
11pub struct SqlResponse<T: FromRow> {
12    pub table: Option<Table<T>>,
13    pub stats: Option<ResponseStats>,
14}
15
16impl<T: FromRow> From<RawBrokerResponse> for Result<SqlResponse<T>> {
17    fn from(raw: RawBrokerResponse) -> Self {
18        if !raw.exceptions.is_empty() {
19            return Err(Error::PinotExceptions(raw.exceptions));
20        };
21
22        let table: Option<Table<T>> = match raw.result_table {
23            None => None,
24            Some(raw) => Some(Result::from(raw)?),
25        };
26        Ok(SqlResponse {
27            table,
28            stats: Some(ResponseStats {
29                trace_info: raw.trace_info,
30                num_servers_queried: raw.num_servers_queried,
31                num_servers_responded: raw.num_servers_responded,
32                num_segments_queried: raw.num_segments_queried,
33                num_segments_processed: raw.num_segments_processed,
34                num_segments_matched: raw.num_segments_matched,
35                num_consuming_segments_queried: raw.num_consuming_segments_queried,
36                num_docs_scanned: raw.num_docs_scanned,
37                num_entries_scanned_in_filter: raw.num_entries_scanned_in_filter,
38                num_entries_scanned_post_filter: raw.num_entries_scanned_post_filter,
39                num_groups_limit_reached: raw.num_groups_limit_reached,
40                total_docs: raw.total_docs,
41                time_used_ms: raw.time_used_ms,
42                min_consuming_freshness_time_ms: raw.min_consuming_freshness_time_ms,
43            }),
44        })
45    }
46}
47
48impl<T: FromRow> From<RawBrokerResponseWithoutStats> for Result<SqlResponse<T>> {
49    fn from(raw: RawBrokerResponseWithoutStats) -> Self {
50        if !raw.exceptions.is_empty() {
51            return Err(Error::PinotExceptions(raw.exceptions));
52        };
53
54        let table: Option<Table<T>> = match raw.result_table {
55            None => None,
56            Some(raw) => Some(Result::from(raw)?),
57        };
58        Ok(SqlResponse {
59            table,
60            stats: None,
61        })
62    }
63}
64
65/// Represents any structure which can deserialize
66/// a table row of json fields provided a `Schema`
67pub trait FromRow: Sized {
68    fn from_row(
69        data_schema: &Schema,
70        row: Vec<Value>,
71    ) -> std::result::Result<Self, serde_json::Error>;
72}
73
74/// Holder for SQL queries.
75#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
76pub struct Table<T: FromRow> {
77    schema: Schema,
78    rows: Vec<T>,
79}
80
81impl<T: FromRow> Table<T> {
82    pub fn new(
83        schema: Schema,
84        rows: Vec<T>,
85    ) -> Self {
86        Table { schema, rows }
87    }
88
89    /// Returns the schema
90    pub fn get_schema(&self) -> &Schema {
91        &self.schema
92    }
93
94    /// Returns how many rows in the Table
95    pub fn get_row_count(&self) -> usize {
96        self.rows.len()
97    }
98
99    /// Returns a row given a row index
100    pub fn get_row(&self, row_index: usize) -> Result<&T> {
101        self.rows.get(row_index).ok_or(Error::InvalidResultRowIndex(row_index))
102    }
103
104    /// Converts result table into rows vector
105    pub fn into_rows(self) -> Vec<T> {
106        self.rows
107    }
108
109    /// Converts result table into a `Schema` and rows vector
110    pub fn into_schema_and_rows(self) -> (Schema, Vec<T>) {
111        (self.schema, self.rows)
112    }
113}
114
115impl<T: FromRow> From<RawTable> for Result<Table<T>> {
116    fn from(raw: RawTable) -> Self {
117        let schema: Schema = raw.schema.into();
118        let rows = raw.rows
119            .into_iter()
120            .map(|row| T::from_row(&schema, row))
121            .collect::<std::result::Result<Vec<T>, serde_json::Error>>()?;
122        Ok(Table::new(schema, rows))
123    }
124}
125
126/// Schema with a bimap to allow easy name <-> index retrieval
127#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
128pub struct Schema {
129    column_data_types: Vec<DataType>,
130    column_name_to_index: bimap::BiMap::<String, usize>,
131}
132
133impl Schema {
134    pub fn new(
135        column_data_types: Vec<DataType>,
136        column_name_to_index: bimap::BiMap::<String, usize>,
137    ) -> Self {
138        Self { column_data_types, column_name_to_index }
139    }
140
141    /// Returns how many columns in the Table
142    pub fn get_column_count(&self) -> usize {
143        self.column_data_types.len()
144    }
145
146    /// Returns column name given a column index
147    pub fn get_column_name(&self, column_index: usize) -> Result<&str> {
148        self.column_name_to_index.get_by_right(&column_index)
149            .map(|column| column.as_str())
150            .ok_or(Error::InvalidResultColumnIndex(column_index))
151    }
152
153    /// Returns column index given a column name
154    pub fn get_column_index(&self, column_name: &str) -> Result<usize> {
155        self.column_name_to_index.get_by_left(column_name)
156            .copied()
157            .ok_or_else(|| Error::InvalidResultColumnName(column_name.to_string()))
158    }
159
160    /// Returns column data type given a column index
161    pub fn get_column_data_type(&self, column_index: usize) -> Result<DataType> {
162        self.column_data_types.get(column_index)
163            .copied()
164            .ok_or(Error::InvalidResultColumnIndex(column_index))
165    }
166
167    /// Returns column data type given a column index
168    pub fn get_column_data_type_by_name(&self, column_name: &str) -> Result<DataType> {
169        let column_index = self.get_column_index(column_name)?;
170        self.get_column_data_type(column_index)
171    }
172
173    /// Returns column data types
174    pub fn get_colum_data_types(&self) -> &[DataType] {
175        &self.column_data_types
176    }
177
178    /// Returns column data types
179    pub fn get_column_name_to_index_map(&self) -> &BiMap<String, usize> {
180        &self.column_name_to_index
181    }
182
183    pub fn into_data_types_and_name_to_index_map(self) -> (Vec<DataType>, BiMap<String, usize>) {
184        (self.column_data_types, self.column_name_to_index)
185    }
186}
187
188impl From<RawSchema> for Schema {
189    fn from(raw_schema: RawSchema) -> Self {
190        let column_data_types = raw_schema.column_data_types;
191        let mut column_name_to_index: BiMap::<String, usize> = BiMap::with_capacity(
192            raw_schema.column_names.len());
193        for (index, column_name) in raw_schema.column_names.into_iter().enumerate() {
194            column_name_to_index.insert(column_name, index);
195        }
196        Schema { column_data_types, column_name_to_index }
197    }
198}
199
#[cfg(test)]
pub(crate) mod tests {
    use std::iter::FromIterator;

    use serde::Deserialize;
    use serde_json::json;

    use crate::response::{DataType::Double as DubT, DataType::Int as IntT, DataType::Long as LngT, PinotException};
    use crate::response::data::{
        Data::Double as DubD,
        Data::Long as LngD,
    };
    use crate::response::data::DataRow;
    use crate::response::data::tests::test_data_row;
    use crate::tests::to_string_vec;

    use super::*;

    /// Row type used to check conversion into a user-defined deserializable struct.
    #[derive(Deserialize, PartialEq, Debug)]
    struct TestRow {
        cnt: i64,
        score: f64,
    }

    /// Raw table with a `cnt` (long) and a `score` (double) column and one row.
    fn cnt_score_raw_table() -> RawTable {
        RawTable {
            schema: RawSchema {
                column_data_types: vec![LngT, DubT],
                column_names: to_string_vec(vec!["cnt", "score"]),
            },
            rows: vec![vec![json!(97889), json!(232.1)]],
        }
    }

    /// Schema expected after converting `cnt_score_raw_table`.
    fn cnt_score_schema() -> Schema {
        Schema {
            column_data_types: vec![LngT, DubT],
            column_name_to_index: BiMap::from_iter(vec![
                ("cnt".to_string(), 0), ("score".to_string(), 1),
            ]),
        }
    }

    /// Exception fixture shared by the error-path tests.
    fn test_exception() -> PinotException {
        PinotException { error_code: 0, message: "msg".to_string() }
    }

    /// Raw response with fixed, distinct statistic values (1 through 12) so that a
    /// field copied to the wrong place is caught by the equality assertions.
    fn raw_response_with_stats(
        result_table: Option<RawTable>,
        exceptions: Vec<PinotException>,
    ) -> RawBrokerResponse {
        RawBrokerResponse {
            aggregation_results: vec![],
            selection_results: None,
            result_table,
            exceptions,
            trace_info: Default::default(),
            num_servers_queried: 1,
            num_servers_responded: 2,
            num_segments_queried: 3,
            num_segments_processed: 4,
            num_segments_matched: 5,
            num_consuming_segments_queried: 6,
            num_docs_scanned: 7,
            num_entries_scanned_in_filter: 8,
            num_entries_scanned_post_filter: 9,
            num_groups_limit_reached: false,
            total_docs: 10,
            time_used_ms: 11,
            min_consuming_freshness_time_ms: 12,
        }
    }

    /// Raw response of the variant that carries no statistics.
    fn raw_response_without_stats(
        result_table: Option<RawTable>,
        exceptions: Vec<PinotException>,
    ) -> RawBrokerResponseWithoutStats {
        RawBrokerResponseWithoutStats {
            aggregation_results: vec![],
            selection_results: None,
            result_table,
            exceptions,
        }
    }

    /// Statistics expected after converting `raw_response_with_stats`.
    fn expected_stats() -> ResponseStats {
        ResponseStats {
            trace_info: Default::default(),
            num_servers_queried: 1,
            num_servers_responded: 2,
            num_segments_queried: 3,
            num_segments_processed: 4,
            num_segments_matched: 5,
            num_consuming_segments_queried: 6,
            num_docs_scanned: 7,
            num_entries_scanned_in_filter: 8,
            num_entries_scanned_post_filter: 9,
            num_groups_limit_reached: false,
            total_docs: 10,
            time_used_ms: 11,
            min_consuming_freshness_time_ms: 12,
        }
    }

    #[test]
    fn sql_response_with_pinot_data_types_converts_from_raw_broker_response() {
        let raw_broker_response = raw_response_with_stats(Some(cnt_score_raw_table()), vec![]);
        let broker_response: SqlResponse<DataRow> = Result::from(raw_broker_response).unwrap();

        assert_eq!(broker_response, SqlResponse {
            table: Some(Table {
                schema: cnt_score_schema(),
                rows: vec![DataRow::new(vec![LngD(97889), DubD(232.1)])],
            }),
            stats: Some(expected_stats()),
        });
    }

    #[test]
    fn sql_response_with_pinot_data_types_converts_from_raw_broker_response_without_stats() {
        let raw_broker_response = raw_response_without_stats(Some(cnt_score_raw_table()), vec![]);
        let broker_response: SqlResponse<DataRow> = Result::from(raw_broker_response).unwrap();

        assert_eq!(broker_response, SqlResponse {
            table: Some(Table {
                schema: cnt_score_schema(),
                rows: vec![DataRow::new(vec![LngD(97889), DubD(232.1)])],
            }),
            stats: None,
        });
    }

    #[test]
    fn sql_response_deserializes_exceptions_correctly() {
        let raw_broker_response = raw_response_with_stats(None, vec![test_exception()]);
        let broker_response: Result<SqlResponse<DataRow>> = Result::from(raw_broker_response);
        match broker_response.unwrap_err() {
            Error::PinotExceptions(exceptions) => assert_eq!(exceptions, vec![test_exception()]),
            _ => panic!("Wrong variant")
        };
    }

    #[test]
    fn sql_response_deserializes_exceptions_without_stats_correctly() {
        let raw_broker_response = raw_response_without_stats(None, vec![test_exception()]);
        let broker_response: Result<SqlResponse<DataRow>> = Result::from(raw_broker_response);
        match broker_response.unwrap_err() {
            Error::PinotExceptions(exceptions) => assert_eq!(exceptions, vec![test_exception()]),
            _ => panic!("Wrong variant")
        };
    }

    #[test]
    fn sql_response_with_deserializable_struct_converts_from_raw_broker_response() {
        let raw_broker_response = raw_response_with_stats(Some(cnt_score_raw_table()), vec![]);
        let broker_response: SqlResponse<TestRow> = Result::from(raw_broker_response).unwrap();

        assert_eq!(broker_response, SqlResponse {
            table: Some(Table {
                schema: cnt_score_schema(),
                rows: vec![TestRow { cnt: 97889, score: 232.1 }],
            }),
            stats: Some(expected_stats()),
        });
    }

    #[test]
    fn sql_response_with_deserializable_struct_converts_from_raw_broker_response_without_stats() {
        let raw_broker_response = raw_response_without_stats(Some(cnt_score_raw_table()), vec![]);
        let broker_response: SqlResponse<TestRow> = Result::from(raw_broker_response).unwrap();

        assert_eq!(broker_response, SqlResponse {
            table: Some(Table {
                schema: cnt_score_schema(),
                rows: vec![TestRow { cnt: 97889, score: 232.1 }],
            }),
            stats: None,
        });
    }

    #[test]
    fn table_get_row_count_provides_correct_number_of_rows() {
        assert_eq!(test_table().get_row_count(), 1);
    }

    #[test]
    fn table_get_row_provides_correct_row() {
        assert_eq!(test_table().get_row(0).unwrap(), &test_data_row());
    }

    #[test]
    fn table_get_row_returns_error_for_out_of_bounds() {
        match test_table().get_row(1).unwrap_err() {
            Error::InvalidResultRowIndex(index) => assert_eq!(index, 1),
            _ => panic!("Incorrect error kind"),
        }
    }

    #[test]
    fn schema_get_column_count_provides_correct_number_of_columns() {
        assert_eq!(test_schema().get_column_count(), 2);
    }

    #[test]
    fn schema_get_column_name_provides_correct_name() {
        assert_eq!(test_schema().get_column_name(1).unwrap(), "cnt2");
    }

    #[test]
    fn schema_get_column_name_returns_error_for_out_of_bounds() {
        match test_schema().get_column_name(3).unwrap_err() {
            Error::InvalidResultColumnIndex(index) => assert_eq!(index, 3),
            _ => panic!("Incorrect error kind"),
        }
    }

    #[test]
    fn schema_get_column_index_provides_correct_index() {
        assert_eq!(test_schema().get_column_index("cnt2").unwrap(), 1);
    }

    #[test]
    fn schema_get_column_index_returns_error_for_out_of_bounds() {
        match test_schema().get_column_index("unknown").unwrap_err() {
            Error::InvalidResultColumnName(name) => assert_eq!(name, "unknown".to_string()),
            _ => panic!("Incorrect error kind"),
        }
    }

    #[test]
    fn schema_get_column_data_type_provides_correct_date_type() {
        assert_eq!(test_schema().get_column_data_type(1).unwrap(), IntT);
    }

    #[test]
    fn schema_get_column_date_type_returns_error_for_out_of_bounds() {
        match test_schema().get_column_data_type(3).unwrap_err() {
            Error::InvalidResultColumnIndex(index) => assert_eq!(index, 3),
            _ => panic!("Incorrect error kind"),
        }
    }

    #[test]
    fn schema_get_column_data_type_by_name_provides_correct_date_type() {
        assert_eq!(test_schema().get_column_data_type_by_name("cnt2").unwrap(), IntT);
    }

    #[test]
    fn schema_get_column_date_type_by_name_returns_error_for_out_of_bounds() {
        match test_schema().get_column_data_type_by_name("unknown").unwrap_err() {
            Error::InvalidResultColumnName(name) => assert_eq!(name, "unknown".to_string()),
            _ => panic!("Incorrect error kind"),
        }
    }

    /// Table fixture with the `test_schema` schema and a single `test_data_row` row.
    pub fn test_table() -> Table<DataRow> {
        Table {
            schema: test_schema(),
            rows: vec![test_data_row()],
        }
    }

    /// Schema fixture with a long column `cnt` (index 0) and an int column `cnt2` (index 1).
    pub fn test_schema() -> Schema {
        Schema {
            column_data_types: vec![LngT, IntT],
            column_name_to_index: BiMap::from_iter(vec![
                ("cnt".to_string(), 0),
                ("cnt2".to_string(), 1),
            ]),
        }
    }
}
544}