pinot_client_rust/response/
raw.rs

1use std::collections::HashMap;
2
3use serde::Deserialize;
4use serde::Serialize;
5use serde_json::Value;
6
7use crate::errors::{Error, Result};
8use crate::response::{DataType, PinotException};
9
10/// Data structure for a broker response to any query.
11#[derive(Clone, Debug, Deserialize, PartialEq, Eq)]
12pub(crate) struct RawBrokerResponse {
13    #[serde(default)]
14    #[serde(rename(deserialize = "aggregationResults"))]
15    pub aggregation_results: Vec<AggregationResult>,
16    #[serde(default)]
17    #[serde(rename(deserialize = "selectionResults"))]
18    pub selection_results: Option<SelectionResults>,
19    #[serde(default)]
20    #[serde(rename(deserialize = "resultTable"))]
21    pub result_table: Option<RawTable>,
22    pub exceptions: Vec<PinotException>,
23    #[serde(default)]
24    #[serde(rename(deserialize = "traceInfo"))]
25    pub trace_info: HashMap<String, String>,
26    #[serde(rename(deserialize = "numServersQueried"))]
27    pub num_servers_queried: i32,
28    #[serde(rename(deserialize = "numServersResponded"))]
29    pub num_servers_responded: i32,
30    #[serde(rename(deserialize = "numSegmentsQueried"))]
31    pub num_segments_queried: i32,
32    #[serde(rename(deserialize = "numSegmentsProcessed"))]
33    pub num_segments_processed: i32,
34    #[serde(rename(deserialize = "numSegmentsMatched"))]
35    pub num_segments_matched: i32,
36    #[serde(rename(deserialize = "numConsumingSegmentsQueried"))]
37    pub num_consuming_segments_queried: i32,
38    #[serde(rename(deserialize = "numDocsScanned"))]
39    pub num_docs_scanned: i64,
40    #[serde(rename(deserialize = "numEntriesScannedInFilter"))]
41    pub num_entries_scanned_in_filter: i64,
42    #[serde(rename(deserialize = "numEntriesScannedPostFilter"))]
43    pub num_entries_scanned_post_filter: i64,
44    #[serde(rename(deserialize = "numGroupsLimitReached"))]
45    pub num_groups_limit_reached: bool,
46    #[serde(rename(deserialize = "totalDocs"))]
47    pub total_docs: i64,
48    #[serde(rename(deserialize = "timeUsedMs"))]
49    pub time_used_ms: i32,
50    #[serde(rename(deserialize = "minConsumingFreshnessTimeMs"))]
51    pub min_consuming_freshness_time_ms: i64,
52}
53
54/// Data structure for a broker response to any query excluding stats information.
55#[derive(Clone, Debug, Deserialize, PartialEq, Eq)]
56pub(crate) struct RawBrokerResponseWithoutStats {
57    #[serde(default)]
58    #[serde(rename(deserialize = "aggregationResults"))]
59    pub aggregation_results: Vec<AggregationResult>,
60    #[serde(default)]
61    #[serde(rename(deserialize = "selectionResults"))]
62    pub selection_results: Option<SelectionResults>,
63    #[serde(default)]
64    #[serde(rename(deserialize = "resultTable"))]
65    pub result_table: Option<RawTable>,
66    pub exceptions: Vec<PinotException>,
67}
68
69/// Data structure for PQL aggregation result
70#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
71pub struct AggregationResult {
72    pub function: String,
73    #[serde(default)]
74    pub value: String,
75    #[serde(default)]
76    #[serde(rename(deserialize = "traceInfo"))]
77    pub group_by_columns: Vec<String>,
78    #[serde(default)]
79    #[serde(rename(deserialize = "traceInfo"))]
80    pub group_by_result: Vec<GroupValue>,
81}
82
83/// Data structure for PQL aggregation GroupBy result
84#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
85pub struct GroupValue {
86    pub value: String,
87    pub group: Vec<String>,
88}
89
90/// Data structure for PQL selection result
91#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
92pub struct SelectionResults {
93    columns: Vec<String>,
94    results: Vec<Vec<Value>>,
95}
96
97impl SelectionResults {
98    pub fn new(columns: Vec<String>, results: Vec<Vec<Value>>) -> Self {
99        Self { columns, results }
100    }
101
102    /// Returns how many rows in the ResultTable
103    pub fn get_results_count(&self) -> usize {
104        self.results.len()
105    }
106
107    /// Returns how many columns in the ResultTable
108    pub fn get_column_count(&self) -> usize {
109        self.columns.len()
110    }
111
112    /// Returns column name given column index
113    pub fn get_column_name(&self, column_index: usize) -> Result<&str> {
114        self.columns.get(column_index)
115            .map(|column| column.as_str())
116            .ok_or(Error::InvalidResultColumnIndex(column_index))
117    }
118
119    /// Returns a row given a row index
120    pub fn get_row(&self, row_index: usize) -> Result<&Vec<Value>> {
121        self.results.get(row_index)
122            .ok_or(Error::InvalidResultRowIndex(row_index))
123    }
124
125    /// Returns a json `Value` entry given row index and column index
126    pub fn get_data(&self, row_index: usize, column_index: usize) -> Result<&Value> {
127        self.get_row(row_index)?.get(column_index)
128            .ok_or(Error::InvalidResultColumnIndex(column_index))
129    }
130}
131
132/// Holder for SQL queries.
133#[derive(Clone, Debug, Deserialize, PartialEq, Eq)]
134pub(crate) struct RawTable {
135    #[serde(rename(deserialize = "dataSchema"))]
136    pub schema: RawSchema,
137    pub rows: Vec<Vec<Value>>,
138}
139
140/// Schema as returned by pinot
141#[derive(Clone, Debug, Deserialize, PartialEq, Eq)]
142pub(crate) struct RawSchema {
143    #[serde(rename(deserialize = "columnDataTypes"))]
144    pub column_data_types: Vec<DataType>,
145    #[serde(rename(deserialize = "columnNames"))]
146    pub column_names: Vec<String>,
147}
148
149#[cfg(test)]
150pub(crate) mod tests {
151    use serde_json::json;
152
153    use crate::response::{
154        DataType::Double as DubT,
155        DataType::Long as LngT,
156        DataType::String as StrT,
157    };
158    use crate::response::tests::{test_broker_response_error_msg, test_error_containing_broker_response};
159    use crate::tests::to_string_vec;
160
161    use super::*;
162
163    #[test]
164    fn pql_response_deserializes_pql_aggregation_query_correctly() {
165        let json: Value = json!({
166            "selectionResults": {
167                "columns": ["cnt", "extra"],
168                "results": [[97889, json!({"a": "b"})]]
169            },
170            "exceptions": [],
171            "numServersQueried": 1,
172            "numServersResponded": 1,
173            "numSegmentsQueried": 1,
174            "numSegmentsProcessed": 1,
175            "numSegmentsMatched": 1,
176            "numConsumingSegmentsQueried": 0,
177            "numDocsScanned": 97889,
178            "numEntriesScannedInFilter": 0,
179            "numEntriesScannedPostFilter": 0,
180            "numGroupsLimitReached": false,
181            "totalDocs": 97889,
182            "timeUsedMs": 5,
183            "segmentStatistics": [],
184            "traceInfo": {},
185            "minConsumingFreshnessTimeMs": 0
186        });
187        let broker_response: RawBrokerResponse = serde_json::from_value(json).unwrap();
188
189        assert_eq!(broker_response, RawBrokerResponse {
190            aggregation_results: vec![],
191            selection_results: Some(SelectionResults::new(
192                to_string_vec(vec!["cnt", "extra"]),
193                vec![vec![json!(97889), json!({"a": "b"})]],
194            )),
195            result_table: None,
196            exceptions: vec![],
197            trace_info: Default::default(),
198            num_servers_queried: 1,
199            num_servers_responded: 1,
200            num_segments_queried: 1,
201            num_segments_processed: 1,
202            num_segments_matched: 1,
203            num_consuming_segments_queried: 0,
204            num_docs_scanned: 97889,
205            num_entries_scanned_in_filter: 0,
206            num_entries_scanned_post_filter: 0,
207            num_groups_limit_reached: false,
208            total_docs: 97889,
209            time_used_ms: 5,
210            min_consuming_freshness_time_ms: 0,
211        });
212    }
213
214    #[test]
215    fn pql_response_deserializes_pql_aggregation_query_without_stats_correctly() {
216        let json: Value = json!({
217            "selectionResults": {
218                "columns": ["cnt", "extra"],
219                "results": [[97889, json!({"a": "b"})]]
220            },
221            "exceptions": [],
222            "numServersQueried": 1,
223            "numServersResponded": 1,
224            "numSegmentsQueried": 1,
225            "numSegmentsProcessed": 1,
226            "numSegmentsMatched": 1,
227            "numConsumingSegmentsQueried": 0,
228            "numDocsScanned": 97889,
229            "numEntriesScannedInFilter": 0,
230            "numEntriesScannedPostFilter": 0,
231            "numGroupsLimitReached": false,
232            "totalDocs": 97889,
233            "timeUsedMs": 5,
234            "segmentStatistics": [],
235            "traceInfo": {},
236            "minConsumingFreshnessTimeMs": 0
237        });
238        let broker_response: RawBrokerResponseWithoutStats = serde_json::from_value(json).unwrap();
239
240        assert_eq!(broker_response, RawBrokerResponseWithoutStats {
241            aggregation_results: vec![],
242            selection_results: Some(SelectionResults::new(
243                to_string_vec(vec!["cnt", "extra"]),
244                vec![vec![json!(97889), json!({"a": "b"})]],
245            )),
246            result_table: None,
247            exceptions: vec![],
248        });
249    }
250
251    #[test]
252    fn pql_response_deserializes_exception_correctly() {
253        let error_message = test_broker_response_error_msg();
254        let json = test_error_containing_broker_response(&error_message);
255        let broker_response: RawBrokerResponse = serde_json::from_value(json).unwrap();
256
257        assert_eq!(broker_response, RawBrokerResponse {
258            aggregation_results: vec![],
259            selection_results: None,
260            result_table: None,
261            exceptions: vec![PinotException {
262                error_code: 200,
263                message: error_message,
264            }],
265            trace_info: Default::default(),
266            num_servers_queried: 1,
267            num_servers_responded: 1,
268            num_segments_queried: 12,
269            num_segments_processed: 0,
270            num_segments_matched: 0,
271            num_consuming_segments_queried: 0,
272            num_docs_scanned: 0,
273            num_entries_scanned_in_filter: 0,
274            num_entries_scanned_post_filter: 0,
275            num_groups_limit_reached: false,
276            total_docs: 97889,
277            time_used_ms: 5,
278            min_consuming_freshness_time_ms: 0,
279        });
280    }
281
282    #[test]
283    fn selection_results_get_row_count_provides_correct_number_of_rows() {
284        assert_eq!(test_selection_results().get_results_count(), 1);
285    }
286
287    #[test]
288    fn selection_results_get_column_count_provides_correct_number_of_columns() {
289        assert_eq!(test_selection_results().get_column_count(), 2);
290    }
291
292    #[test]
293    fn selection_results_get_column_name_provides_correct_name() {
294        assert_eq!(test_selection_results().get_column_name(1).unwrap(), "extra");
295    }
296
297    #[test]
298    fn selection_results_get_column_name_returns_error_for_out_of_bounds() {
299        match test_selection_results().get_column_name(3).unwrap_err() {
300            Error::InvalidResultColumnIndex(index) => assert_eq!(index, 3),
301            _ => panic!("Incorrect error kind"),
302        }
303    }
304
305    #[test]
306    fn selection_results_get_row_provides_correct_row() {
307        assert_eq!(
308            test_selection_results().get_row(0).unwrap(),
309            &vec![json!(48547), json!({"a": "b"})]
310        );
311    }
312
313    #[test]
314    fn selection_results_get_row_returns_error_for_out_of_bounds() {
315        match test_selection_results().get_row(1).unwrap_err() {
316            Error::InvalidResultRowIndex(index) => assert_eq!(index, 1),
317            _ => panic!("Incorrect error kind"),
318        }
319    }
320
321    #[test]
322    fn selection_results_get_data_returns_error_for_out_of_bounds() {
323        match test_selection_results().get_data(1, 0).unwrap_err() {
324            Error::InvalidResultRowIndex(index) => assert_eq!(index, 1),
325            _ => panic!("Incorrect error kind"),
326        }
327        match test_selection_results().get_data(0, 2).unwrap_err() {
328            Error::InvalidResultColumnIndex(index) => assert_eq!(index, 2),
329            _ => panic!("Incorrect error kind"),
330        }
331    }
332
333    #[test]
334    fn selection_results_get_data_provides_correct_data() {
335        assert_eq!(test_selection_results().get_data(0, 0).unwrap(), &json!(48547));
336    }
337
338    #[test]
339    fn sql_response_deserializes_sql_aggregation_query_correctly() {
340        let json: Value = json!({
341            "resultTable": {
342                "dataSchema": {
343                    "columnDataTypes": ["LONG"],
344                    "columnNames": ["cnt"]
345                },
346                "rows": [[97889]]
347            },
348            "exceptions": [],
349            "numServersQueried": 1,
350            "numServersResponded": 1,
351            "numSegmentsQueried": 1,
352            "numSegmentsProcessed": 1,
353            "numSegmentsMatched": 1,
354            "numConsumingSegmentsQueried": 0,
355            "numDocsScanned": 97889,
356            "numEntriesScannedInFilter": 0,
357            "numEntriesScannedPostFilter": 0,
358            "numGroupsLimitReached": false,
359            "totalDocs": 97889,
360            "timeUsedMs": 5,
361            "segmentStatistics": [],
362            "traceInfo": {},
363            "minConsumingFreshnessTimeMs": 0
364        });
365        let broker_response: RawBrokerResponse = serde_json::from_value(json).unwrap();
366
367        assert_eq!(broker_response, RawBrokerResponse {
368            aggregation_results: vec![],
369            selection_results: None,
370            result_table: Some(RawTable {
371                schema: RawSchema {
372                    column_data_types: vec![LngT],
373                    column_names: to_string_vec(vec!["cnt"]),
374                },
375                rows: vec![vec![json!(97889)]],
376            }),
377            exceptions: vec![],
378            trace_info: Default::default(),
379            num_servers_queried: 1,
380            num_servers_responded: 1,
381            num_segments_queried: 1,
382            num_segments_processed: 1,
383            num_segments_matched: 1,
384            num_consuming_segments_queried: 0,
385            num_docs_scanned: 97889,
386            num_entries_scanned_in_filter: 0,
387            num_entries_scanned_post_filter: 0,
388            num_groups_limit_reached: false,
389            total_docs: 97889,
390            time_used_ms: 5,
391            min_consuming_freshness_time_ms: 0,
392        });
393    }
394
395    #[test]
396    fn sql_response_deserializes_sql_aggregation_query_without_stats_correctly() {
397        let json: Value = json!({
398            "resultTable": {
399                "dataSchema": {
400                    "columnDataTypes": ["LONG"],
401                    "columnNames": ["cnt"]
402                },
403                "rows": [[97889]]
404            },
405            "exceptions": [],
406            "numServersQueried": 1,
407            "numServersResponded": 1,
408            "numSegmentsQueried": 1,
409            "numSegmentsProcessed": 1,
410            "numSegmentsMatched": 1,
411            "numConsumingSegmentsQueried": 0,
412            "numDocsScanned": 97889,
413            "numEntriesScannedInFilter": 0,
414            "numEntriesScannedPostFilter": 0,
415            "numGroupsLimitReached": false,
416            "totalDocs": 97889,
417            "timeUsedMs": 5,
418            "segmentStatistics": [],
419            "traceInfo": {},
420            "minConsumingFreshnessTimeMs": 0
421        });
422        let broker_response: RawBrokerResponseWithoutStats = serde_json::from_value(json).unwrap();
423
424        assert_eq!(broker_response, RawBrokerResponseWithoutStats {
425            aggregation_results: vec![],
426            selection_results: None,
427            result_table: Some(RawTable {
428                schema: RawSchema {
429                    column_data_types: vec![LngT],
430                    column_names: to_string_vec(vec!["cnt"]),
431                },
432                rows: vec![vec![json!(97889)]],
433            }),
434            exceptions: vec![],
435        });
436    }
437
438    #[test]
439    fn sql_response_deserializes_aggregation_group_by_response_correctly() {
440        let json: Value = json!({
441            "resultTable": {
442                "dataSchema": {
443                    "columnDataTypes": ["STRING","LONG","DOUBLE"],
444                    "columnNames":["teamID","cnt","sum_homeRuns"]
445                },
446                "rows": [
447                    ["ANA",337,1324.0],
448                    ["BL2",197,136.0],
449                    ["ARI",727,2715.0],
450                    ["BL1",48,24.0],
451                    ["ALT",17,2.0],
452                    ["ATL",1951,7312.0],
453                    ["BFN",122,105.0],
454                    ["BL3",36,32.0],
455                    ["BFP",26,20.0],
456                    ["BAL",2380,9164.0]
457                ]
458            },
459            "exceptions": [],
460            "numServersQueried": 1,
461            "numServersResponded": 1,
462            "numSegmentsQueried": 1,
463            "numSegmentsProcessed": 1,
464            "numSegmentsMatched": 1,
465            "numConsumingSegmentsQueried": 0,
466            "numDocsScanned": 97889,
467            "numEntriesScannedInFilter": 0,
468            "numEntriesScannedPostFilter": 195778,
469            "numGroupsLimitReached": true,
470            "totalDocs": 97889,
471            "timeUsedMs": 24,
472            "segmentStatistics": [],
473            "traceInfo": {},
474            "minConsumingFreshnessTimeMs": 0
475        });
476        let broker_response: RawBrokerResponse = serde_json::from_value(json).unwrap();
477
478        assert_eq!(broker_response, RawBrokerResponse {
479            aggregation_results: vec![],
480            selection_results: None,
481            result_table: Some(RawTable {
482                schema: RawSchema {
483                    column_data_types: vec![StrT, LngT, DubT],
484                    column_names: to_string_vec(vec!["teamID", "cnt", "sum_homeRuns"]),
485                },
486                rows: vec![
487                    vec![json!("ANA"), json!(337), json!(1324.0)],
488                    vec![json!("BL2"), json!(197), json!(136.0)],
489                    vec![json!("ARI"), json!(727), json!(2715.0)],
490                    vec![json!("BL1"), json!(48), json!(24.0)],
491                    vec![json!("ALT"), json!(17), json!(2.0)],
492                    vec![json!("ATL"), json!(1951), json!(7312.0)],
493                    vec![json!("BFN"), json!(122), json!(105.0)],
494                    vec![json!("BL3"), json!(36), json!(32.0)],
495                    vec![json!("BFP"), json!(26), json!(20.0)],
496                    vec![json!("BAL"), json!(2380), json!(9164.0)],
497                ],
498            }),
499            exceptions: vec![],
500            trace_info: Default::default(),
501            num_servers_queried: 1,
502            num_servers_responded: 1,
503            num_segments_queried: 1,
504            num_segments_processed: 1,
505            num_segments_matched: 1,
506            num_consuming_segments_queried: 0,
507            num_docs_scanned: 97889,
508            num_entries_scanned_in_filter: 0,
509            num_entries_scanned_post_filter: 195778,
510            num_groups_limit_reached: true,
511            total_docs: 97889,
512            time_used_ms: 24,
513            min_consuming_freshness_time_ms: 0,
514        });
515    }
516
517    #[test]
518    fn sql_response_deserializes_exception_correctly() {
519        let error_message = test_broker_response_error_msg();
520        let json = test_error_containing_broker_response(&error_message);
521        let broker_response: RawBrokerResponse = serde_json::from_value(json).unwrap();
522
523        assert_eq!(broker_response, RawBrokerResponse {
524            aggregation_results: vec![],
525            selection_results: None,
526            result_table: None,
527            exceptions: vec![PinotException {
528                error_code: 200,
529                message: error_message,
530            }],
531            trace_info: Default::default(),
532            num_servers_queried: 1,
533            num_servers_responded: 1,
534            num_segments_queried: 12,
535            num_segments_processed: 0,
536            num_segments_matched: 0,
537            num_consuming_segments_queried: 0,
538            num_docs_scanned: 0,
539            num_entries_scanned_in_filter: 0,
540            num_entries_scanned_post_filter: 0,
541            num_groups_limit_reached: false,
542            total_docs: 97889,
543            time_used_ms: 5,
544            min_consuming_freshness_time_ms: 0,
545        });
546    }
547
548    pub fn test_selection_results() -> SelectionResults {
549        SelectionResults::new(
550            to_string_vec(vec!["cnt", "extra"]),
551            vec![vec![json!(48547), json!({"a": "b"})]],
552        )
553    }
554}