1use bimap::BiMap;
2use serde::{Deserialize, Serialize};
3use serde_json::Value;
4
5use crate::errors::{Error, Result};
6use crate::response::{DataType, ResponseStats};
7use crate::response::raw::{RawBrokerResponse, RawBrokerResponseWithoutStats, RawSchema, RawTable};
8
9#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
11pub struct SqlResponse<T: FromRow> {
12 pub table: Option<Table<T>>,
13 pub stats: Option<ResponseStats>,
14}
15
16impl<T: FromRow> From<RawBrokerResponse> for Result<SqlResponse<T>> {
17 fn from(raw: RawBrokerResponse) -> Self {
18 if !raw.exceptions.is_empty() {
19 return Err(Error::PinotExceptions(raw.exceptions));
20 };
21
22 let table: Option<Table<T>> = match raw.result_table {
23 None => None,
24 Some(raw) => Some(Result::from(raw)?),
25 };
26 Ok(SqlResponse {
27 table,
28 stats: Some(ResponseStats {
29 trace_info: raw.trace_info,
30 num_servers_queried: raw.num_servers_queried,
31 num_servers_responded: raw.num_servers_responded,
32 num_segments_queried: raw.num_segments_queried,
33 num_segments_processed: raw.num_segments_processed,
34 num_segments_matched: raw.num_segments_matched,
35 num_consuming_segments_queried: raw.num_consuming_segments_queried,
36 num_docs_scanned: raw.num_docs_scanned,
37 num_entries_scanned_in_filter: raw.num_entries_scanned_in_filter,
38 num_entries_scanned_post_filter: raw.num_entries_scanned_post_filter,
39 num_groups_limit_reached: raw.num_groups_limit_reached,
40 total_docs: raw.total_docs,
41 time_used_ms: raw.time_used_ms,
42 min_consuming_freshness_time_ms: raw.min_consuming_freshness_time_ms,
43 }),
44 })
45 }
46}
47
48impl<T: FromRow> From<RawBrokerResponseWithoutStats> for Result<SqlResponse<T>> {
49 fn from(raw: RawBrokerResponseWithoutStats) -> Self {
50 if !raw.exceptions.is_empty() {
51 return Err(Error::PinotExceptions(raw.exceptions));
52 };
53
54 let table: Option<Table<T>> = match raw.result_table {
55 None => None,
56 Some(raw) => Some(Result::from(raw)?),
57 };
58 Ok(SqlResponse {
59 table,
60 stats: None,
61 })
62 }
63}
64
65pub trait FromRow: Sized {
68 fn from_row(
69 data_schema: &Schema,
70 row: Vec<Value>,
71 ) -> std::result::Result<Self, serde_json::Error>;
72}
73
74#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
76pub struct Table<T: FromRow> {
77 schema: Schema,
78 rows: Vec<T>,
79}
80
81impl<T: FromRow> Table<T> {
82 pub fn new(
83 schema: Schema,
84 rows: Vec<T>,
85 ) -> Self {
86 Table { schema, rows }
87 }
88
89 pub fn get_schema(&self) -> &Schema {
91 &self.schema
92 }
93
94 pub fn get_row_count(&self) -> usize {
96 self.rows.len()
97 }
98
99 pub fn get_row(&self, row_index: usize) -> Result<&T> {
101 self.rows.get(row_index).ok_or(Error::InvalidResultRowIndex(row_index))
102 }
103
104 pub fn into_rows(self) -> Vec<T> {
106 self.rows
107 }
108
109 pub fn into_schema_and_rows(self) -> (Schema, Vec<T>) {
111 (self.schema, self.rows)
112 }
113}
114
115impl<T: FromRow> From<RawTable> for Result<Table<T>> {
116 fn from(raw: RawTable) -> Self {
117 let schema: Schema = raw.schema.into();
118 let rows = raw.rows
119 .into_iter()
120 .map(|row| T::from_row(&schema, row))
121 .collect::<std::result::Result<Vec<T>, serde_json::Error>>()?;
122 Ok(Table::new(schema, rows))
123 }
124}
125
126#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
128pub struct Schema {
129 column_data_types: Vec<DataType>,
130 column_name_to_index: bimap::BiMap::<String, usize>,
131}
132
133impl Schema {
134 pub fn new(
135 column_data_types: Vec<DataType>,
136 column_name_to_index: bimap::BiMap::<String, usize>,
137 ) -> Self {
138 Self { column_data_types, column_name_to_index }
139 }
140
141 pub fn get_column_count(&self) -> usize {
143 self.column_data_types.len()
144 }
145
146 pub fn get_column_name(&self, column_index: usize) -> Result<&str> {
148 self.column_name_to_index.get_by_right(&column_index)
149 .map(|column| column.as_str())
150 .ok_or(Error::InvalidResultColumnIndex(column_index))
151 }
152
153 pub fn get_column_index(&self, column_name: &str) -> Result<usize> {
155 self.column_name_to_index.get_by_left(column_name)
156 .copied()
157 .ok_or_else(|| Error::InvalidResultColumnName(column_name.to_string()))
158 }
159
160 pub fn get_column_data_type(&self, column_index: usize) -> Result<DataType> {
162 self.column_data_types.get(column_index)
163 .copied()
164 .ok_or(Error::InvalidResultColumnIndex(column_index))
165 }
166
167 pub fn get_column_data_type_by_name(&self, column_name: &str) -> Result<DataType> {
169 let column_index = self.get_column_index(column_name)?;
170 self.get_column_data_type(column_index)
171 }
172
173 pub fn get_colum_data_types(&self) -> &[DataType] {
175 &self.column_data_types
176 }
177
178 pub fn get_column_name_to_index_map(&self) -> &BiMap<String, usize> {
180 &self.column_name_to_index
181 }
182
183 pub fn into_data_types_and_name_to_index_map(self) -> (Vec<DataType>, BiMap<String, usize>) {
184 (self.column_data_types, self.column_name_to_index)
185 }
186}
187
188impl From<RawSchema> for Schema {
189 fn from(raw_schema: RawSchema) -> Self {
190 let column_data_types = raw_schema.column_data_types;
191 let mut column_name_to_index: BiMap::<String, usize> = BiMap::with_capacity(
192 raw_schema.column_names.len());
193 for (index, column_name) in raw_schema.column_names.into_iter().enumerate() {
194 column_name_to_index.insert(column_name, index);
195 }
196 Schema { column_data_types, column_name_to_index }
197 }
198}
199
200#[cfg(test)]
201pub(crate) mod tests {
202 use std::iter::FromIterator;
203
204 use serde::Deserialize;
205 use serde_json::json;
206
207 use crate::response::{DataType::Double as DubT, DataType::Int as IntT, DataType::Long as LngT, PinotException};
208 use crate::response::data::{
209 Data::Double as DubD,
210 Data::Long as LngD,
211 };
212 use crate::response::data::DataRow;
213 use crate::response::data::tests::test_data_row;
214 use crate::tests::to_string_vec;
215
216 use super::*;
217
218 #[test]
219 fn sql_response_with_pinot_data_types_converts_from_raw_broker_response() {
220 let raw_broker_response = RawBrokerResponse {
221 aggregation_results: vec![],
222 selection_results: None,
223 result_table: Some(RawTable {
224 schema: RawSchema {
225 column_data_types: vec![LngT, DubT],
226 column_names: to_string_vec(vec!["cnt", "score"]),
227 },
228 rows: vec![vec![json!(97889), json!(232.1)]],
229 }),
230 exceptions: vec![],
231 trace_info: Default::default(),
232 num_servers_queried: 1,
233 num_servers_responded: 2,
234 num_segments_queried: 3,
235 num_segments_processed: 4,
236 num_segments_matched: 5,
237 num_consuming_segments_queried: 6,
238 num_docs_scanned: 7,
239 num_entries_scanned_in_filter: 8,
240 num_entries_scanned_post_filter: 9,
241 num_groups_limit_reached: false,
242 total_docs: 10,
243 time_used_ms: 11,
244 min_consuming_freshness_time_ms: 12,
245 };
246 let broker_response: SqlResponse<DataRow> = Result::from(raw_broker_response).unwrap();
247 assert_eq!(broker_response, SqlResponse {
248 table: Some(Table {
249 schema: Schema {
250 column_data_types: vec![LngT, DubT],
251 column_name_to_index: BiMap::from_iter(vec![
252 ("cnt".to_string(), 0), ("score".to_string(), 1),
253 ]),
254 },
255 rows: vec![DataRow::new(vec![LngD(97889), DubD(232.1)])],
256 }),
257 stats: Some(ResponseStats {
258 trace_info: Default::default(),
259 num_servers_queried: 1,
260 num_servers_responded: 2,
261 num_segments_queried: 3,
262 num_segments_processed: 4,
263 num_segments_matched: 5,
264 num_consuming_segments_queried: 6,
265 num_docs_scanned: 7,
266 num_entries_scanned_in_filter: 8,
267 num_entries_scanned_post_filter: 9,
268 num_groups_limit_reached: false,
269 total_docs: 10,
270 time_used_ms: 11,
271 min_consuming_freshness_time_ms: 12,
272 }),
273 });
274 }
275
276 #[test]
277 fn sql_response_with_pinot_data_types_converts_from_raw_broker_response_without_stats() {
278 let raw_broker_response = RawBrokerResponseWithoutStats {
279 aggregation_results: vec![],
280 selection_results: None,
281 result_table: Some(RawTable {
282 schema: RawSchema {
283 column_data_types: vec![LngT, DubT],
284 column_names: to_string_vec(vec!["cnt", "score"]),
285 },
286 rows: vec![vec![json!(97889), json!(232.1)]],
287 }),
288 exceptions: vec![],
289 };
290 let broker_response: SqlResponse<DataRow> = Result::from(raw_broker_response).unwrap();
291
292 assert_eq!(broker_response, SqlResponse {
293 table: Some(Table {
294 schema: Schema {
295 column_data_types: vec![LngT, DubT],
296 column_name_to_index: BiMap::from_iter(vec![
297 ("cnt".to_string(), 0), ("score".to_string(), 1),
298 ]),
299 },
300 rows: vec![DataRow::new(vec![LngD(97889), DubD(232.1)])],
301 }),
302 stats: None,
303 });
304 }
305
306 #[test]
307 fn pql_response_deserializes_exceptions_correctly() {
308 let raw_broker_response = RawBrokerResponse {
309 aggregation_results: vec![],
310 selection_results: None,
311 result_table: None,
312 exceptions: vec![PinotException { error_code: 0, message: "msg".to_string() }],
313 trace_info: Default::default(),
314 num_servers_queried: 1,
315 num_servers_responded: 2,
316 num_segments_queried: 3,
317 num_segments_processed: 4,
318 num_segments_matched: 5,
319 num_consuming_segments_queried: 6,
320 num_docs_scanned: 7,
321 num_entries_scanned_in_filter: 8,
322 num_entries_scanned_post_filter: 9,
323 num_groups_limit_reached: false,
324 total_docs: 10,
325 time_used_ms: 11,
326 min_consuming_freshness_time_ms: 12,
327 };
328 let broker_response: Result<SqlResponse<DataRow>> = Result::from(raw_broker_response);
329 match broker_response.unwrap_err() {
330 Error::PinotExceptions(exceptions) => assert_eq!(
331 exceptions, vec![PinotException { error_code: 0, message: "msg".to_string() }]),
332 _ => panic!("Wrong variant")
333 };
334 }
335
336 #[test]
337 fn pql_response_deserializes_exceptions_without_stats_correctly() {
338 let raw_broker_response = RawBrokerResponseWithoutStats {
339 aggregation_results: vec![],
340 selection_results: None,
341 result_table: None,
342 exceptions: vec![PinotException { error_code: 0, message: "msg".to_string() }],
343 };
344 let broker_response: Result<SqlResponse<DataRow>> = Result::from(raw_broker_response);
345 match broker_response.unwrap_err() {
346 Error::PinotExceptions(exceptions) => assert_eq!(
347 exceptions, vec![PinotException { error_code: 0, message: "msg".to_string() }]),
348 _ => panic!("Wrong variant")
349 };
350 }
351
352 #[test]
353 fn sql_response_with_deserializable_struct_converts_from_raw_broker_response() {
354 #[derive(Deserialize, PartialEq, Debug)]
355 struct TestRow {
356 cnt: i64,
357 score: f64,
358 }
359
360 let raw_broker_response = RawBrokerResponse {
361 aggregation_results: vec![],
362 selection_results: None,
363 result_table: Some(RawTable {
364 schema: RawSchema {
365 column_data_types: vec![LngT, DubT],
366 column_names: to_string_vec(vec!["cnt", "score"]),
367 },
368 rows: vec![vec![json!(97889), json!(232.1)]],
369 }),
370 exceptions: vec![],
371 trace_info: Default::default(),
372 num_servers_queried: 1,
373 num_servers_responded: 2,
374 num_segments_queried: 3,
375 num_segments_processed: 4,
376 num_segments_matched: 5,
377 num_consuming_segments_queried: 6,
378 num_docs_scanned: 7,
379 num_entries_scanned_in_filter: 8,
380 num_entries_scanned_post_filter: 9,
381 num_groups_limit_reached: false,
382 total_docs: 10,
383 time_used_ms: 11,
384 min_consuming_freshness_time_ms: 12,
385 };
386 let broker_response: SqlResponse<TestRow> = Result::from(raw_broker_response).unwrap();
387
388 assert_eq!(broker_response, SqlResponse {
389 table: Some(Table {
390 schema: Schema {
391 column_data_types: vec![LngT, DubT],
392 column_name_to_index: BiMap::from_iter(vec![
393 ("cnt".to_string(), 0), ("score".to_string(), 1),
394 ]),
395 },
396 rows: vec![TestRow { cnt: 97889, score: 232.1 }],
397 }),
398 stats: Some(ResponseStats {
399 trace_info: Default::default(),
400 num_servers_queried: 1,
401 num_servers_responded: 2,
402 num_segments_queried: 3,
403 num_segments_processed: 4,
404 num_segments_matched: 5,
405 num_consuming_segments_queried: 6,
406 num_docs_scanned: 7,
407 num_entries_scanned_in_filter: 8,
408 num_entries_scanned_post_filter: 9,
409 num_groups_limit_reached: false,
410 total_docs: 10,
411 time_used_ms: 11,
412 min_consuming_freshness_time_ms: 12,
413 }),
414 });
415 }
416
417 #[test]
418 fn sql_response_with_deserializable_struct_converts_from_raw_broker_response_without_stats() {
419 #[derive(Deserialize, PartialEq, Debug)]
420 struct TestRow {
421 cnt: i64,
422 score: f64,
423 }
424
425 let raw_broker_response = RawBrokerResponseWithoutStats {
426 aggregation_results: vec![],
427 selection_results: None,
428 result_table: Some(RawTable {
429 schema: RawSchema {
430 column_data_types: vec![LngT, DubT],
431 column_names: to_string_vec(vec!["cnt", "score"]),
432 },
433 rows: vec![vec![json!(97889), json!(232.1)]],
434 }),
435 exceptions: vec![],
436 };
437 let broker_response: SqlResponse<TestRow> = Result::from(raw_broker_response).unwrap();
438
439 assert_eq!(broker_response, SqlResponse {
440 table: Some(Table {
441 schema: Schema {
442 column_data_types: vec![LngT, DubT],
443 column_name_to_index: BiMap::from_iter(vec![
444 ("cnt".to_string(), 0), ("score".to_string(), 1),
445 ]),
446 },
447 rows: vec![TestRow { cnt: 97889, score: 232.1 }],
448 }),
449 stats: None,
450 });
451 }
452
453 #[test]
454 fn table_get_row_count_provides_correct_number_of_rows() {
455 assert_eq!(test_table().get_row_count(), 1);
456 }
457
458 #[test]
459 fn table_get_row_provides_correct_row() {
460 assert_eq!(test_table().get_row(0).unwrap(), &test_data_row());
461 }
462
463 #[test]
464 fn table_get_row_returns_error_for_out_of_bounds() {
465 match test_table().get_row(1).unwrap_err() {
466 Error::InvalidResultRowIndex(index) => assert_eq!(index, 1),
467 _ => panic!("Incorrect error kind"),
468 }
469 }
470
471 #[test]
472 fn schema_get_column_count_provides_correct_number_of_columns() {
473 assert_eq!(test_schema().get_column_count(), 2);
474 }
475
476 #[test]
477 fn schema_get_column_name_provides_correct_name() {
478 assert_eq!(test_schema().get_column_name(1).unwrap(), "cnt2");
479 }
480
481 #[test]
482 fn schema_get_column_name_returns_error_for_out_of_bounds() {
483 match test_schema().get_column_name(3).unwrap_err() {
484 Error::InvalidResultColumnIndex(index) => assert_eq!(index, 3),
485 _ => panic!("Incorrect error kind"),
486 }
487 }
488
489 #[test]
490 fn schema_get_column_index_provides_correct_index() {
491 assert_eq!(test_schema().get_column_index("cnt2").unwrap(), 1);
492 }
493
494 #[test]
495 fn schema_get_column_index_returns_error_for_out_of_bounds() {
496 match test_schema().get_column_index("unknown").unwrap_err() {
497 Error::InvalidResultColumnName(name) => assert_eq!(name, "unknown".to_string()),
498 _ => panic!("Incorrect error kind"),
499 }
500 }
501
502 #[test]
503 fn schema_get_column_data_type_provides_correct_date_type() {
504 assert_eq!(test_schema().get_column_data_type(1).unwrap(), IntT);
505 }
506
507 #[test]
508 fn schema_get_column_date_type_returns_error_for_out_of_bounds() {
509 match test_schema().get_column_data_type(3).unwrap_err() {
510 Error::InvalidResultColumnIndex(index) => assert_eq!(index, 3),
511 _ => panic!("Incorrect error kind"),
512 }
513 }
514
515 #[test]
516 fn schema_get_column_data_type_by_name_provides_correct_date_type() {
517 assert_eq!(test_schema().get_column_data_type_by_name("cnt2").unwrap(), IntT);
518 }
519
520 #[test]
521 fn schema_get_column_date_type_by_name_returns_error_for_out_of_bounds() {
522 match test_schema().get_column_data_type_by_name("unknown").unwrap_err() {
523 Error::InvalidResultColumnName(name) => assert_eq!(name, "unknown".to_string()),
524 _ => panic!("Incorrect error kind"),
525 }
526 }
527
528 pub fn test_table() -> Table<DataRow> {
529 Table {
530 schema: test_schema(),
531 rows: vec![test_data_row()],
532 }
533 }
534
535 pub fn test_schema() -> Schema {
536 Schema {
537 column_data_types: vec![LngT, IntT],
538 column_name_to_index: BiMap::from_iter(vec![
539 ("cnt".to_string(), 0),
540 ("cnt2".to_string(), 1),
541 ]),
542 }
543 }
544}