pinot_client_rust/response/
mod.rs

1use std::collections::HashMap;
2
3use serde::{Deserialize, Serialize};
4
5pub use pql::PqlResponse;
6pub use sql::SqlResponse;
7
8pub mod data;
9pub mod pql;
10pub mod raw;
11pub mod sql;
12pub mod deserialise;
13
14/// Pinot exception.
15#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
16pub struct PinotException {
17    #[serde(rename(deserialize = "errorCode"))]
18    pub error_code: i32,
19    pub message: String,
20}
21
22/// Carries all stats returned by a query.
23#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
24pub struct ResponseStats {
25    pub trace_info: HashMap<String, String>,
26    pub num_servers_queried: i32,
27    pub num_servers_responded: i32,
28    pub num_segments_queried: i32,
29    pub num_segments_processed: i32,
30    pub num_segments_matched: i32,
31    pub num_consuming_segments_queried: i32,
32    pub num_docs_scanned: i64,
33    pub num_entries_scanned_in_filter: i64,
34    pub num_entries_scanned_post_filter: i64,
35    pub num_groups_limit_reached: bool,
36    pub total_docs: i64,
37    pub time_used_ms: i32,
38    pub min_consuming_freshness_time_ms: i64,
39}
40
41/// Pinot native types
42#[derive(Copy, Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
43#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
44pub enum DataType {
45    Int,
46    Long,
47    Float,
48    Double,
49    Boolean,
50    Timestamp,
51    String,
52    Json,
53    Bytes,
54    IntArray,
55    LongArray,
56    FloatArray,
57    DoubleArray,
58    BooleanArray,
59    TimestampArray,
60    StringArray,
61    BytesArray,
62}
63
/// Unit tests for [`DataType`] (de)serialization, plus shared fixtures that
/// other test modules in the crate can import.
#[cfg(test)]
pub(crate) mod tests {
    use serde_json::{json, Value};

    use crate::response::{PqlResponse, ResponseStats};
    use crate::response::data::DataRow;
    use crate::response::raw::SelectionResults;
    use crate::response::sql::SqlResponse;
    use crate::response::sql::tests::test_table;
    use crate::tests::to_string_vec;

    use super::*;

    /// Every [`DataType`] variant paired with its expected wire name.
    ///
    /// Both the serialization and the deserialization test iterate over this
    /// single table so that the two directions always cover the same variants.
    const DATA_TYPE_WIRE_NAMES: &[(&str, DataType)] = &[
        ("INT", DataType::Int),
        ("LONG", DataType::Long),
        ("FLOAT", DataType::Float),
        ("DOUBLE", DataType::Double),
        ("BOOLEAN", DataType::Boolean),
        ("TIMESTAMP", DataType::Timestamp),
        ("STRING", DataType::String),
        ("JSON", DataType::Json),
        ("BYTES", DataType::Bytes),
        ("INT_ARRAY", DataType::IntArray),
        ("LONG_ARRAY", DataType::LongArray),
        ("FLOAT_ARRAY", DataType::FloatArray),
        ("DOUBLE_ARRAY", DataType::DoubleArray),
        ("BOOLEAN_ARRAY", DataType::BooleanArray),
        ("TIMESTAMP_ARRAY", DataType::TimestampArray),
        ("STRING_ARRAY", DataType::StringArray),
        ("BYTES_ARRAY", DataType::BytesArray),
    ];

    /// Each wire name must deserialize into its matching variant.
    #[test]
    fn data_type_deserializes_correctly() {
        for &(wire_name, data_type) in DATA_TYPE_WIRE_NAMES.iter() {
            assert_eq!(
                DataType::deserialize(json!(wire_name)).unwrap(),
                data_type,
                "deserializing {:?}",
                wire_name
            );
        }
    }

    /// Each variant must serialize into its matching wire name.
    #[test]
    fn data_type_serializes_correctly() {
        for &(wire_name, data_type) in DATA_TYPE_WIRE_NAMES.iter() {
            assert_eq!(
                serde_json::to_value(data_type).unwrap(),
                json!(wire_name),
                "serializing {:?}",
                data_type
            );
        }
    }

    /// Returns a sample multi-line broker error message (a Java stack trace).
    pub fn test_broker_response_error_msg() -> String {
        concat!(
            "QueryExecutionError:\n",
            "java.lang.NumberFormatException: For input string: \"UA\"\n",
            "\tat sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:2043)\n",
            "\tat sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110)\n",
            "\tat java.lang.Double.parseDouble(Double.java:538)\n",
            "\tat org.apache.pinot.core.segment.index.readers.StringDictionary.getDoubleValue(StringDictionary.java:58)\n",
            "\tat org.apache.pinot.core.operator.query.DictionaryBasedAggregationOperator.getNextBlock(DictionaryBasedAggregationOperator.java:81)\n",
            "\tat org.apache.pinot.core.operator.query.DictionaryBasedAggregationOperator.getNextBlock(DictionaryBasedAggregationOperator.java:47)\n",
            "\tat org.apache.pinot.core.operator.BaseOperator.nextBlock(BaseOperator.java:48)\n",
            "\tat org.apache.pinot.core.operator.CombineOperator$1.runJob(CombineOperator.java:102)\n",
            "\tat org.apache.pinot.core.util.trace.TraceRunnable.run(TraceRunnable.java:40)\n",
            "\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n",
            "\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n",
            "\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n",
            "\tat shaded.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:111)\n",
            "\tat shaded.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:58)",
        )
        .to_string()
    }

    /// Builds a broker response JSON document whose `exceptions` array holds
    /// one entry with error code 200 and the given `error_message`.
    pub fn test_error_containing_broker_response(error_message: &str) -> Value {
        json!({
            "exceptions": [{
                "errorCode": 200,
                // `&str` is serialized directly; no clone of the reference is needed.
                "message": error_message,
            }],
            "numServersQueried": 1,
            "numServersResponded": 1,
            "numSegmentsQueried": 12,
            "numSegmentsProcessed": 0,
            "numSegmentsMatched": 0,
            "numConsumingSegmentsQueried": 0,
            "numDocsScanned": 0,
            "numEntriesScannedInFilter": 0,
            "numEntriesScannedPostFilter": 0,
            "numGroupsLimitReached": false,
            "totalDocs": 97889,
            "timeUsedMs": 5,
            "segmentStatistics": [],
            "traceInfo": {},
            "minConsumingFreshnessTimeMs": 0
        })
    }

    /// Builds a broker response JSON document with a one-row result table
    /// (columns `cnt: LONG`, `cnt2: INT`) and no exceptions.
    pub fn test_broker_response_json() -> Value {
        json!({
            "resultTable": {
                "dataSchema": {
                    "columnDataTypes": ["LONG", "INT"],
                    "columnNames": ["cnt", "cnt2"]
                },
                "rows": [[97889, 0]]
            },
            "exceptions": [],
            "numServersQueried": 1,
            "numServersResponded": 1,
            "numSegmentsQueried": 1,
            "numSegmentsProcessed": 1,
            "numSegmentsMatched": 1,
            "numConsumingSegmentsQueried": 0,
            "numDocsScanned": 97889,
            "numEntriesScannedInFilter": 0,
            "numEntriesScannedPostFilter": 0,
            "numGroupsLimitReached": false,
            "totalDocs": 97889,
            "timeUsedMs": 5,
            "segmentStatistics": [],
            "traceInfo": {},
            "minConsumingFreshnessTimeMs": 0
        })
    }

    /// Returns the [`ResponseStats`] fixture shared by the SQL and PQL
    /// response fixtures; its values mirror the stat values used in
    /// [`test_broker_response_json`].
    pub fn test_response_stats() -> ResponseStats {
        ResponseStats {
            trace_info: Default::default(),
            num_servers_queried: 1,
            num_servers_responded: 1,
            num_segments_queried: 1,
            num_segments_processed: 1,
            num_segments_matched: 1,
            num_consuming_segments_queried: 0,
            num_docs_scanned: 97889,
            num_entries_scanned_in_filter: 0,
            num_entries_scanned_post_filter: 0,
            num_groups_limit_reached: false,
            total_docs: 97889,
            time_used_ms: 5,
            min_consuming_freshness_time_ms: 0,
        }
    }

    /// Returns a [`SqlResponse`] fixture built from `test_table()` and the
    /// shared stats fixture.
    pub fn test_sql_response() -> SqlResponse<DataRow> {
        SqlResponse {
            table: Some(test_table()),
            stats: Some(test_response_stats()),
        }
    }

    /// Returns a [`PqlResponse`] fixture with no aggregation results, a single
    /// selection row over the columns `cnt` and `extra`, and the shared stats
    /// fixture.
    pub fn test_pql_response() -> PqlResponse {
        PqlResponse {
            aggregation_results: vec![],
            selection_results: Some(SelectionResults::new(
                to_string_vec(vec!["cnt", "extra"]),
                vec![vec![
                    Value::String("1".to_string()),
                    Value::String("{\"a\": \"b\"}".to_string()),
                ]],
            )),
            stats: Some(test_response_stats()),
        }
    }
}