1use std::collections::HashMap;
2
3use serde::{Deserialize, Serialize};
4
5pub use pql::PqlResponse;
6pub use sql::SqlResponse;
7
8pub mod data;
9pub mod pql;
10pub mod raw;
11pub mod sql;
12pub mod deserialise;
13
14#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
16pub struct PinotException {
17 #[serde(rename(deserialize = "errorCode"))]
18 pub error_code: i32,
19 pub message: String,
20}
21
22#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
24pub struct ResponseStats {
25 pub trace_info: HashMap<String, String>,
26 pub num_servers_queried: i32,
27 pub num_servers_responded: i32,
28 pub num_segments_queried: i32,
29 pub num_segments_processed: i32,
30 pub num_segments_matched: i32,
31 pub num_consuming_segments_queried: i32,
32 pub num_docs_scanned: i64,
33 pub num_entries_scanned_in_filter: i64,
34 pub num_entries_scanned_post_filter: i64,
35 pub num_groups_limit_reached: bool,
36 pub total_docs: i64,
37 pub time_used_ms: i32,
38 pub min_consuming_freshness_time_ms: i64,
39}
40
41#[derive(Copy, Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
43#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
44pub enum DataType {
45 Int,
46 Long,
47 Float,
48 Double,
49 Boolean,
50 Timestamp,
51 String,
52 Json,
53 Bytes,
54 IntArray,
55 LongArray,
56 FloatArray,
57 DoubleArray,
58 BooleanArray,
59 TimestampArray,
60 StringArray,
61 BytesArray,
62}
63
64#[cfg(test)]
65pub(crate) mod tests {
66 use serde_json::{json, Value};
67
68 use crate::response::{PqlResponse, ResponseStats};
69 use crate::response::data::DataRow;
70 use crate::response::raw::SelectionResults;
71 use crate::response::sql::SqlResponse;
72 use crate::response::sql::tests::test_table;
73 use crate::tests::to_string_vec;
74
75 use super::*;
76
77 #[test]
78 fn data_type_deserializes_correctly() {
79 assert_eq!(DataType::deserialize(json!("INT")).unwrap(), DataType::Int);
80 assert_eq!(DataType::deserialize(json!("LONG")).unwrap(), DataType::Long);
81 assert_eq!(DataType::deserialize(json!("DOUBLE")).unwrap(), DataType::Double);
82 assert_eq!(DataType::deserialize(json!("BOOLEAN")).unwrap(), DataType::Boolean);
83 assert_eq!(DataType::deserialize(json!("TIMESTAMP")).unwrap(), DataType::Timestamp);
84 assert_eq!(DataType::deserialize(json!("STRING")).unwrap(), DataType::String);
85 assert_eq!(DataType::deserialize(json!("JSON")).unwrap(), DataType::Json);
86 assert_eq!(DataType::deserialize(json!("BYTES")).unwrap(), DataType::Bytes);
87 assert_eq!(DataType::deserialize(json!("INT_ARRAY")).unwrap(), DataType::IntArray);
88 assert_eq!(DataType::deserialize(json!("LONG_ARRAY")).unwrap(), DataType::LongArray);
89 assert_eq!(DataType::deserialize(json!("FLOAT_ARRAY")).unwrap(), DataType::FloatArray);
90 assert_eq!(DataType::deserialize(json!("DOUBLE_ARRAY")).unwrap(), DataType::DoubleArray);
91 assert_eq!(DataType::deserialize(json!("BOOLEAN_ARRAY")).unwrap(), DataType::BooleanArray);
92 assert_eq!(DataType::deserialize(json!("TIMESTAMP_ARRAY")).unwrap(), DataType::TimestampArray);
93 assert_eq!(DataType::deserialize(json!("STRING_ARRAY")).unwrap(), DataType::StringArray);
94 assert_eq!(DataType::deserialize(json!("BYTES_ARRAY")).unwrap(), DataType::BytesArray);
95 }
96
97 #[test]
98 fn data_type_serializes_correctly() {
99 assert_eq!(serde_json::to_value(&DataType::Int).unwrap(), json!("INT"));
100 assert_eq!(serde_json::to_value(&DataType::Long).unwrap(), json!("LONG"));
101 assert_eq!(serde_json::to_value(&DataType::Double).unwrap(), json!("DOUBLE"));
102 assert_eq!(serde_json::to_value(&DataType::Boolean).unwrap(), json!("BOOLEAN"));
103 assert_eq!(serde_json::to_value(&DataType::Timestamp).unwrap(), json!("TIMESTAMP"));
104 assert_eq!(serde_json::to_value(&DataType::String).unwrap(), json!("STRING"));
105 assert_eq!(serde_json::to_value(&DataType::Json).unwrap(), json!("JSON"));
106 assert_eq!(serde_json::to_value(&DataType::Bytes).unwrap(), json!("BYTES"));
107 assert_eq!(serde_json::to_value(&DataType::IntArray).unwrap(), json!("INT_ARRAY"));
108 assert_eq!(serde_json::to_value(&DataType::LongArray).unwrap(), json!("LONG_ARRAY"));
109 assert_eq!(serde_json::to_value(&DataType::FloatArray).unwrap(), json!("FLOAT_ARRAY"));
110 assert_eq!(serde_json::to_value(&DataType::DoubleArray).unwrap(), json!("DOUBLE_ARRAY"));
111 assert_eq!(serde_json::to_value(&DataType::BooleanArray).unwrap(), json!("BOOLEAN_ARRAY"));
112 assert_eq!(serde_json::to_value(&DataType::TimestampArray).unwrap(), json!("TIMESTAMP_ARRAY"));
113 assert_eq!(serde_json::to_value(&DataType::StringArray).unwrap(), json!("STRING_ARRAY"));
114 assert_eq!(serde_json::to_value(&DataType::BytesArray).unwrap(), json!("BYTES_ARRAY"));
115 }
116
117 pub fn test_broker_response_error_msg() -> String {
118 let error_message: &str = concat!(
119 "QueryExecutionError:\n",
120 "java.lang.NumberFormatException: For input string: \"UA\"\n",
121 "\tat sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:2043)\n",
122 "\tat sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110)\n",
123 "\tat java.lang.Double.parseDouble(Double.java:538)\n",
124 "\tat org.apache.pinot.core.segment.index.readers.StringDictionary.getDoubleValue(StringDictionary.java:58)\n",
125 "\tat org.apache.pinot.core.operator.query.DictionaryBasedAggregationOperator.getNextBlock(DictionaryBasedAggregationOperator.java:81)\n",
126 "\tat org.apache.pinot.core.operator.query.DictionaryBasedAggregationOperator.getNextBlock(DictionaryBasedAggregationOperator.java:47)\n",
127 "\tat org.apache.pinot.core.operator.BaseOperator.nextBlock(BaseOperator.java:48)\n",
128 "\tat org.apache.pinot.core.operator.CombineOperator$1.runJob(CombineOperator.java:102)\n",
129 "\tat org.apache.pinot.core.util.trace.TraceRunnable.run(TraceRunnable.java:40)\n",
130 "\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n",
131 "\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n",
132 "\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n",
133 "\tat shaded.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:111)\n",
134 "\tat shaded.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:58)",
135 );
136 error_message.to_string()
137 }
138
139 pub fn test_error_containing_broker_response(error_message: &str) -> Value {
140 json!({
141 "exceptions": [{
142 "errorCode": 200,
143 "message": error_message.clone(),
144 }],
145 "numServersQueried": 1,
146 "numServersResponded": 1,
147 "numSegmentsQueried": 12
148 ,"numSegmentsProcessed": 0,
149 "numSegmentsMatched": 0,
150 "numConsumingSegmentsQueried": 0,
151 "numDocsScanned": 0,
152 "numEntriesScannedInFilter": 0,
153 "numEntriesScannedPostFilter": 0,
154 "numGroupsLimitReached": false,
155 "totalDocs": 97889,
156 "timeUsedMs": 5,
157 "segmentStatistics": [],
158 "traceInfo": {},
159 "minConsumingFreshnessTimeMs": 0
160 })
161 }
162
163 pub fn test_broker_response_json() -> Value {
164 json!({
165 "resultTable": {
166 "dataSchema": {
167 "columnDataTypes": ["LONG", "INT"],
168 "columnNames": ["cnt", "cnt2"]
169 },
170 "rows": [[97889, 0]]
171 },
172 "exceptions": [],
173 "numServersQueried": 1,
174 "numServersResponded": 1,
175 "numSegmentsQueried": 1,
176 "numSegmentsProcessed": 1,
177 "numSegmentsMatched": 1,
178 "numConsumingSegmentsQueried": 0,
179 "numDocsScanned": 97889,
180 "numEntriesScannedInFilter": 0,
181 "numEntriesScannedPostFilter": 0,
182 "numGroupsLimitReached": false,
183 "totalDocs": 97889,
184 "timeUsedMs": 5,
185 "segmentStatistics": [],
186 "traceInfo": {},
187 "minConsumingFreshnessTimeMs": 0
188 })
189 }
190
191 pub fn test_sql_response() -> SqlResponse<DataRow> {
192 SqlResponse {
193 table: Some(test_table()),
194 stats: Some(ResponseStats {
195 trace_info: Default::default(),
196 num_servers_queried: 1,
197 num_servers_responded: 1,
198 num_segments_queried: 1,
199 num_segments_processed: 1,
200 num_segments_matched: 1,
201 num_consuming_segments_queried: 0,
202 num_docs_scanned: 97889,
203 num_entries_scanned_in_filter: 0,
204 num_entries_scanned_post_filter: 0,
205 num_groups_limit_reached: false,
206 total_docs: 97889,
207 time_used_ms: 5,
208 min_consuming_freshness_time_ms: 0,
209 }),
210 }
211 }
212
213 pub fn test_pql_response() -> PqlResponse {
214 PqlResponse {
215 aggregation_results: vec![],
216 selection_results: Some(SelectionResults::new(
217 to_string_vec(vec!["cnt", "extra"]),
218 vec![vec![
219 Value::String("1".to_string()),
220 Value::String("{\"a\": \"b\"}".to_string()),
221 ]],
222 )),
223 stats: Some(ResponseStats {
224 trace_info: Default::default(),
225 num_servers_queried: 1,
226 num_servers_responded: 1,
227 num_segments_queried: 1,
228 num_segments_processed: 1,
229 num_segments_matched: 1,
230 num_consuming_segments_queried: 0,
231 num_docs_scanned: 97889,
232 num_entries_scanned_in_filter: 0,
233 num_entries_scanned_post_filter: 0,
234 num_groups_limit_reached: false,
235 total_docs: 97889,
236 time_used_ms: 5,
237 min_consuming_freshness_time_ms: 0,
238 }),
239 }
240 }
241}