1use std::collections::HashMap;
2
3use serde::Deserialize;
4use serde::Serialize;
5use serde_json::Value;
6
7use crate::errors::{Error, Result};
8use crate::response::{DataType, PinotException};
9
10#[derive(Clone, Debug, Deserialize, PartialEq, Eq)]
12pub(crate) struct RawBrokerResponse {
13 #[serde(default)]
14 #[serde(rename(deserialize = "aggregationResults"))]
15 pub aggregation_results: Vec<AggregationResult>,
16 #[serde(default)]
17 #[serde(rename(deserialize = "selectionResults"))]
18 pub selection_results: Option<SelectionResults>,
19 #[serde(default)]
20 #[serde(rename(deserialize = "resultTable"))]
21 pub result_table: Option<RawTable>,
22 pub exceptions: Vec<PinotException>,
23 #[serde(default)]
24 #[serde(rename(deserialize = "traceInfo"))]
25 pub trace_info: HashMap<String, String>,
26 #[serde(rename(deserialize = "numServersQueried"))]
27 pub num_servers_queried: i32,
28 #[serde(rename(deserialize = "numServersResponded"))]
29 pub num_servers_responded: i32,
30 #[serde(rename(deserialize = "numSegmentsQueried"))]
31 pub num_segments_queried: i32,
32 #[serde(rename(deserialize = "numSegmentsProcessed"))]
33 pub num_segments_processed: i32,
34 #[serde(rename(deserialize = "numSegmentsMatched"))]
35 pub num_segments_matched: i32,
36 #[serde(rename(deserialize = "numConsumingSegmentsQueried"))]
37 pub num_consuming_segments_queried: i32,
38 #[serde(rename(deserialize = "numDocsScanned"))]
39 pub num_docs_scanned: i64,
40 #[serde(rename(deserialize = "numEntriesScannedInFilter"))]
41 pub num_entries_scanned_in_filter: i64,
42 #[serde(rename(deserialize = "numEntriesScannedPostFilter"))]
43 pub num_entries_scanned_post_filter: i64,
44 #[serde(rename(deserialize = "numGroupsLimitReached"))]
45 pub num_groups_limit_reached: bool,
46 #[serde(rename(deserialize = "totalDocs"))]
47 pub total_docs: i64,
48 #[serde(rename(deserialize = "timeUsedMs"))]
49 pub time_used_ms: i32,
50 #[serde(rename(deserialize = "minConsumingFreshnessTimeMs"))]
51 pub min_consuming_freshness_time_ms: i64,
52}
53
54#[derive(Clone, Debug, Deserialize, PartialEq, Eq)]
56pub(crate) struct RawBrokerResponseWithoutStats {
57 #[serde(default)]
58 #[serde(rename(deserialize = "aggregationResults"))]
59 pub aggregation_results: Vec<AggregationResult>,
60 #[serde(default)]
61 #[serde(rename(deserialize = "selectionResults"))]
62 pub selection_results: Option<SelectionResults>,
63 #[serde(default)]
64 #[serde(rename(deserialize = "resultTable"))]
65 pub result_table: Option<RawTable>,
66 pub exceptions: Vec<PinotException>,
67}
68
69#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
71pub struct AggregationResult {
72 pub function: String,
73 #[serde(default)]
74 pub value: String,
75 #[serde(default)]
76 #[serde(rename(deserialize = "traceInfo"))]
77 pub group_by_columns: Vec<String>,
78 #[serde(default)]
79 #[serde(rename(deserialize = "traceInfo"))]
80 pub group_by_result: Vec<GroupValue>,
81}
82
83#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
85pub struct GroupValue {
86 pub value: String,
87 pub group: Vec<String>,
88}
89
90#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
92pub struct SelectionResults {
93 columns: Vec<String>,
94 results: Vec<Vec<Value>>,
95}
96
97impl SelectionResults {
98 pub fn new(columns: Vec<String>, results: Vec<Vec<Value>>) -> Self {
99 Self { columns, results }
100 }
101
102 pub fn get_results_count(&self) -> usize {
104 self.results.len()
105 }
106
107 pub fn get_column_count(&self) -> usize {
109 self.columns.len()
110 }
111
112 pub fn get_column_name(&self, column_index: usize) -> Result<&str> {
114 self.columns.get(column_index)
115 .map(|column| column.as_str())
116 .ok_or(Error::InvalidResultColumnIndex(column_index))
117 }
118
119 pub fn get_row(&self, row_index: usize) -> Result<&Vec<Value>> {
121 self.results.get(row_index)
122 .ok_or(Error::InvalidResultRowIndex(row_index))
123 }
124
125 pub fn get_data(&self, row_index: usize, column_index: usize) -> Result<&Value> {
127 self.get_row(row_index)?.get(column_index)
128 .ok_or(Error::InvalidResultColumnIndex(column_index))
129 }
130}
131
132#[derive(Clone, Debug, Deserialize, PartialEq, Eq)]
134pub(crate) struct RawTable {
135 #[serde(rename(deserialize = "dataSchema"))]
136 pub schema: RawSchema,
137 pub rows: Vec<Vec<Value>>,
138}
139
140#[derive(Clone, Debug, Deserialize, PartialEq, Eq)]
142pub(crate) struct RawSchema {
143 #[serde(rename(deserialize = "columnDataTypes"))]
144 pub column_data_types: Vec<DataType>,
145 #[serde(rename(deserialize = "columnNames"))]
146 pub column_names: Vec<String>,
147}
148
149#[cfg(test)]
150pub(crate) mod tests {
151 use serde_json::json;
152
153 use crate::response::{
154 DataType::Double as DubT,
155 DataType::Long as LngT,
156 DataType::String as StrT,
157 };
158 use crate::response::tests::{test_broker_response_error_msg, test_error_containing_broker_response};
159 use crate::tests::to_string_vec;
160
161 use super::*;
162
163 #[test]
164 fn pql_response_deserializes_pql_aggregation_query_correctly() {
165 let json: Value = json!({
166 "selectionResults": {
167 "columns": ["cnt", "extra"],
168 "results": [[97889, json!({"a": "b"})]]
169 },
170 "exceptions": [],
171 "numServersQueried": 1,
172 "numServersResponded": 1,
173 "numSegmentsQueried": 1,
174 "numSegmentsProcessed": 1,
175 "numSegmentsMatched": 1,
176 "numConsumingSegmentsQueried": 0,
177 "numDocsScanned": 97889,
178 "numEntriesScannedInFilter": 0,
179 "numEntriesScannedPostFilter": 0,
180 "numGroupsLimitReached": false,
181 "totalDocs": 97889,
182 "timeUsedMs": 5,
183 "segmentStatistics": [],
184 "traceInfo": {},
185 "minConsumingFreshnessTimeMs": 0
186 });
187 let broker_response: RawBrokerResponse = serde_json::from_value(json).unwrap();
188
189 assert_eq!(broker_response, RawBrokerResponse {
190 aggregation_results: vec![],
191 selection_results: Some(SelectionResults::new(
192 to_string_vec(vec!["cnt", "extra"]),
193 vec![vec![json!(97889), json!({"a": "b"})]],
194 )),
195 result_table: None,
196 exceptions: vec![],
197 trace_info: Default::default(),
198 num_servers_queried: 1,
199 num_servers_responded: 1,
200 num_segments_queried: 1,
201 num_segments_processed: 1,
202 num_segments_matched: 1,
203 num_consuming_segments_queried: 0,
204 num_docs_scanned: 97889,
205 num_entries_scanned_in_filter: 0,
206 num_entries_scanned_post_filter: 0,
207 num_groups_limit_reached: false,
208 total_docs: 97889,
209 time_used_ms: 5,
210 min_consuming_freshness_time_ms: 0,
211 });
212 }
213
214 #[test]
215 fn pql_response_deserializes_pql_aggregation_query_without_stats_correctly() {
216 let json: Value = json!({
217 "selectionResults": {
218 "columns": ["cnt", "extra"],
219 "results": [[97889, json!({"a": "b"})]]
220 },
221 "exceptions": [],
222 "numServersQueried": 1,
223 "numServersResponded": 1,
224 "numSegmentsQueried": 1,
225 "numSegmentsProcessed": 1,
226 "numSegmentsMatched": 1,
227 "numConsumingSegmentsQueried": 0,
228 "numDocsScanned": 97889,
229 "numEntriesScannedInFilter": 0,
230 "numEntriesScannedPostFilter": 0,
231 "numGroupsLimitReached": false,
232 "totalDocs": 97889,
233 "timeUsedMs": 5,
234 "segmentStatistics": [],
235 "traceInfo": {},
236 "minConsumingFreshnessTimeMs": 0
237 });
238 let broker_response: RawBrokerResponseWithoutStats = serde_json::from_value(json).unwrap();
239
240 assert_eq!(broker_response, RawBrokerResponseWithoutStats {
241 aggregation_results: vec![],
242 selection_results: Some(SelectionResults::new(
243 to_string_vec(vec!["cnt", "extra"]),
244 vec![vec![json!(97889), json!({"a": "b"})]],
245 )),
246 result_table: None,
247 exceptions: vec![],
248 });
249 }
250
251 #[test]
252 fn pql_response_deserializes_exception_correctly() {
253 let error_message = test_broker_response_error_msg();
254 let json = test_error_containing_broker_response(&error_message);
255 let broker_response: RawBrokerResponse = serde_json::from_value(json).unwrap();
256
257 assert_eq!(broker_response, RawBrokerResponse {
258 aggregation_results: vec![],
259 selection_results: None,
260 result_table: None,
261 exceptions: vec![PinotException {
262 error_code: 200,
263 message: error_message,
264 }],
265 trace_info: Default::default(),
266 num_servers_queried: 1,
267 num_servers_responded: 1,
268 num_segments_queried: 12,
269 num_segments_processed: 0,
270 num_segments_matched: 0,
271 num_consuming_segments_queried: 0,
272 num_docs_scanned: 0,
273 num_entries_scanned_in_filter: 0,
274 num_entries_scanned_post_filter: 0,
275 num_groups_limit_reached: false,
276 total_docs: 97889,
277 time_used_ms: 5,
278 min_consuming_freshness_time_ms: 0,
279 });
280 }
281
282 #[test]
283 fn selection_results_get_row_count_provides_correct_number_of_rows() {
284 assert_eq!(test_selection_results().get_results_count(), 1);
285 }
286
287 #[test]
288 fn selection_results_get_column_count_provides_correct_number_of_columns() {
289 assert_eq!(test_selection_results().get_column_count(), 2);
290 }
291
292 #[test]
293 fn selection_results_get_column_name_provides_correct_name() {
294 assert_eq!(test_selection_results().get_column_name(1).unwrap(), "extra");
295 }
296
297 #[test]
298 fn selection_results_get_column_name_returns_error_for_out_of_bounds() {
299 match test_selection_results().get_column_name(3).unwrap_err() {
300 Error::InvalidResultColumnIndex(index) => assert_eq!(index, 3),
301 _ => panic!("Incorrect error kind"),
302 }
303 }
304
305 #[test]
306 fn selection_results_get_row_provides_correct_row() {
307 assert_eq!(
308 test_selection_results().get_row(0).unwrap(),
309 &vec![json!(48547), json!({"a": "b"})]
310 );
311 }
312
313 #[test]
314 fn selection_results_get_row_returns_error_for_out_of_bounds() {
315 match test_selection_results().get_row(1).unwrap_err() {
316 Error::InvalidResultRowIndex(index) => assert_eq!(index, 1),
317 _ => panic!("Incorrect error kind"),
318 }
319 }
320
321 #[test]
322 fn selection_results_get_data_returns_error_for_out_of_bounds() {
323 match test_selection_results().get_data(1, 0).unwrap_err() {
324 Error::InvalidResultRowIndex(index) => assert_eq!(index, 1),
325 _ => panic!("Incorrect error kind"),
326 }
327 match test_selection_results().get_data(0, 2).unwrap_err() {
328 Error::InvalidResultColumnIndex(index) => assert_eq!(index, 2),
329 _ => panic!("Incorrect error kind"),
330 }
331 }
332
333 #[test]
334 fn selection_results_get_data_provides_correct_data() {
335 assert_eq!(test_selection_results().get_data(0, 0).unwrap(), &json!(48547));
336 }
337
338 #[test]
339 fn sql_response_deserializes_sql_aggregation_query_correctly() {
340 let json: Value = json!({
341 "resultTable": {
342 "dataSchema": {
343 "columnDataTypes": ["LONG"],
344 "columnNames": ["cnt"]
345 },
346 "rows": [[97889]]
347 },
348 "exceptions": [],
349 "numServersQueried": 1,
350 "numServersResponded": 1,
351 "numSegmentsQueried": 1,
352 "numSegmentsProcessed": 1,
353 "numSegmentsMatched": 1,
354 "numConsumingSegmentsQueried": 0,
355 "numDocsScanned": 97889,
356 "numEntriesScannedInFilter": 0,
357 "numEntriesScannedPostFilter": 0,
358 "numGroupsLimitReached": false,
359 "totalDocs": 97889,
360 "timeUsedMs": 5,
361 "segmentStatistics": [],
362 "traceInfo": {},
363 "minConsumingFreshnessTimeMs": 0
364 });
365 let broker_response: RawBrokerResponse = serde_json::from_value(json).unwrap();
366
367 assert_eq!(broker_response, RawBrokerResponse {
368 aggregation_results: vec![],
369 selection_results: None,
370 result_table: Some(RawTable {
371 schema: RawSchema {
372 column_data_types: vec![LngT],
373 column_names: to_string_vec(vec!["cnt"]),
374 },
375 rows: vec![vec![json!(97889)]],
376 }),
377 exceptions: vec![],
378 trace_info: Default::default(),
379 num_servers_queried: 1,
380 num_servers_responded: 1,
381 num_segments_queried: 1,
382 num_segments_processed: 1,
383 num_segments_matched: 1,
384 num_consuming_segments_queried: 0,
385 num_docs_scanned: 97889,
386 num_entries_scanned_in_filter: 0,
387 num_entries_scanned_post_filter: 0,
388 num_groups_limit_reached: false,
389 total_docs: 97889,
390 time_used_ms: 5,
391 min_consuming_freshness_time_ms: 0,
392 });
393 }
394
395 #[test]
396 fn sql_response_deserializes_sql_aggregation_query_without_stats_correctly() {
397 let json: Value = json!({
398 "resultTable": {
399 "dataSchema": {
400 "columnDataTypes": ["LONG"],
401 "columnNames": ["cnt"]
402 },
403 "rows": [[97889]]
404 },
405 "exceptions": [],
406 "numServersQueried": 1,
407 "numServersResponded": 1,
408 "numSegmentsQueried": 1,
409 "numSegmentsProcessed": 1,
410 "numSegmentsMatched": 1,
411 "numConsumingSegmentsQueried": 0,
412 "numDocsScanned": 97889,
413 "numEntriesScannedInFilter": 0,
414 "numEntriesScannedPostFilter": 0,
415 "numGroupsLimitReached": false,
416 "totalDocs": 97889,
417 "timeUsedMs": 5,
418 "segmentStatistics": [],
419 "traceInfo": {},
420 "minConsumingFreshnessTimeMs": 0
421 });
422 let broker_response: RawBrokerResponseWithoutStats = serde_json::from_value(json).unwrap();
423
424 assert_eq!(broker_response, RawBrokerResponseWithoutStats {
425 aggregation_results: vec![],
426 selection_results: None,
427 result_table: Some(RawTable {
428 schema: RawSchema {
429 column_data_types: vec![LngT],
430 column_names: to_string_vec(vec!["cnt"]),
431 },
432 rows: vec![vec![json!(97889)]],
433 }),
434 exceptions: vec![],
435 });
436 }
437
438 #[test]
439 fn sql_response_deserializes_aggregation_group_by_response_correctly() {
440 let json: Value = json!({
441 "resultTable": {
442 "dataSchema": {
443 "columnDataTypes": ["STRING","LONG","DOUBLE"],
444 "columnNames":["teamID","cnt","sum_homeRuns"]
445 },
446 "rows": [
447 ["ANA",337,1324.0],
448 ["BL2",197,136.0],
449 ["ARI",727,2715.0],
450 ["BL1",48,24.0],
451 ["ALT",17,2.0],
452 ["ATL",1951,7312.0],
453 ["BFN",122,105.0],
454 ["BL3",36,32.0],
455 ["BFP",26,20.0],
456 ["BAL",2380,9164.0]
457 ]
458 },
459 "exceptions": [],
460 "numServersQueried": 1,
461 "numServersResponded": 1,
462 "numSegmentsQueried": 1,
463 "numSegmentsProcessed": 1,
464 "numSegmentsMatched": 1,
465 "numConsumingSegmentsQueried": 0,
466 "numDocsScanned": 97889,
467 "numEntriesScannedInFilter": 0,
468 "numEntriesScannedPostFilter": 195778,
469 "numGroupsLimitReached": true,
470 "totalDocs": 97889,
471 "timeUsedMs": 24,
472 "segmentStatistics": [],
473 "traceInfo": {},
474 "minConsumingFreshnessTimeMs": 0
475 });
476 let broker_response: RawBrokerResponse = serde_json::from_value(json).unwrap();
477
478 assert_eq!(broker_response, RawBrokerResponse {
479 aggregation_results: vec![],
480 selection_results: None,
481 result_table: Some(RawTable {
482 schema: RawSchema {
483 column_data_types: vec![StrT, LngT, DubT],
484 column_names: to_string_vec(vec!["teamID", "cnt", "sum_homeRuns"]),
485 },
486 rows: vec![
487 vec![json!("ANA"), json!(337), json!(1324.0)],
488 vec![json!("BL2"), json!(197), json!(136.0)],
489 vec![json!("ARI"), json!(727), json!(2715.0)],
490 vec![json!("BL1"), json!(48), json!(24.0)],
491 vec![json!("ALT"), json!(17), json!(2.0)],
492 vec![json!("ATL"), json!(1951), json!(7312.0)],
493 vec![json!("BFN"), json!(122), json!(105.0)],
494 vec![json!("BL3"), json!(36), json!(32.0)],
495 vec![json!("BFP"), json!(26), json!(20.0)],
496 vec![json!("BAL"), json!(2380), json!(9164.0)],
497 ],
498 }),
499 exceptions: vec![],
500 trace_info: Default::default(),
501 num_servers_queried: 1,
502 num_servers_responded: 1,
503 num_segments_queried: 1,
504 num_segments_processed: 1,
505 num_segments_matched: 1,
506 num_consuming_segments_queried: 0,
507 num_docs_scanned: 97889,
508 num_entries_scanned_in_filter: 0,
509 num_entries_scanned_post_filter: 195778,
510 num_groups_limit_reached: true,
511 total_docs: 97889,
512 time_used_ms: 24,
513 min_consuming_freshness_time_ms: 0,
514 });
515 }
516
517 #[test]
518 fn sql_response_deserializes_exception_correctly() {
519 let error_message = test_broker_response_error_msg();
520 let json = test_error_containing_broker_response(&error_message);
521 let broker_response: RawBrokerResponse = serde_json::from_value(json).unwrap();
522
523 assert_eq!(broker_response, RawBrokerResponse {
524 aggregation_results: vec![],
525 selection_results: None,
526 result_table: None,
527 exceptions: vec![PinotException {
528 error_code: 200,
529 message: error_message,
530 }],
531 trace_info: Default::default(),
532 num_servers_queried: 1,
533 num_servers_responded: 1,
534 num_segments_queried: 12,
535 num_segments_processed: 0,
536 num_segments_matched: 0,
537 num_consuming_segments_queried: 0,
538 num_docs_scanned: 0,
539 num_entries_scanned_in_filter: 0,
540 num_entries_scanned_post_filter: 0,
541 num_groups_limit_reached: false,
542 total_docs: 97889,
543 time_used_ms: 5,
544 min_consuming_freshness_time_ms: 0,
545 });
546 }
547
548 pub fn test_selection_results() -> SelectionResults {
549 SelectionResults::new(
550 to_string_vec(vec!["cnt", "extra"]),
551 vec![vec![json!(48547), json!({"a": "b"})]],
552 )
553 }
554}