1use std::pin::{pin, Pin};
20use std::ptr::read;
21use std::sync::Arc;
22use std::task::{Context, Poll};
23use std::time::Duration;
24
25use arc_swap::ArcSwap;
26use async_trait::async_trait;
27use bytes::Bytes;
28use futures::future::err;
29use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
30use http::StatusCode;
31use log::debug;
32use regex::Regex;
33use tokio::sync::Mutex;
34
35use crate::helpers::durations::parse_duration_from_golang_string;
36use crate::httpx;
37use crate::httpx::decoder::Decoder;
38use crate::httpx::raw_json_row_streamer::{RawJsonRowItem, RawJsonRowStreamer};
39use crate::httpx::response::Response;
40use crate::memdx::magic::Magic::Res;
41use crate::queryx::error;
42use crate::queryx::error::{
43 Error, ErrorDesc, ErrorKind, ResourceError, ServerError, ServerErrorKind,
44};
45use crate::queryx::query_json::{
46 QueryEarlyMetaData, QueryError, QueryErrorResponse, QueryMetaData, QueryMetrics, QueryWarning,
47};
48use crate::queryx::query_result::{EarlyMetaData, MetaData, Metrics, Warning};
49
50pub struct QueryRespReader {
51 endpoint: String,
52 statement: String,
53 client_context_id: String,
54 status_code: StatusCode,
55
56 streamer: Pin<Box<dyn Stream<Item = httpx::error::Result<RawJsonRowItem>> + Send>>,
57 early_meta_data: EarlyMetaData,
58 meta_data: Option<MetaData>,
59 meta_data_error: Option<Error>,
60}
61
62impl Stream for QueryRespReader {
63 type Item = error::Result<Bytes>;
64
65 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
66 let this = self.get_mut();
67
68 match this.streamer.poll_next_unpin(cx) {
69 Poll::Ready(Some(Ok(RawJsonRowItem::Row(row_data)))) => {
70 Poll::Ready(Some(Ok(Bytes::from(row_data))))
71 }
72 Poll::Ready(Some(Ok(RawJsonRowItem::Metadata(metadata)))) => {
73 match this.read_final_metadata(metadata) {
74 Ok(meta) => this.meta_data = Some(meta),
75 Err(e) => {
76 this.meta_data_error = Some(e.clone());
77 return Poll::Ready(Some(Err(e)));
78 }
79 };
80 Poll::Ready(None)
81 }
82 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(Error::new_http_error(
83 format!("{}: {}", &this.endpoint, e),
84 Some(this.statement.clone()),
85 this.client_context_id.clone(),
86 )))),
87 Poll::Ready(None) => Poll::Ready(None),
88 Poll::Pending => Poll::Pending,
89 }
90 }
91}
92
93impl QueryRespReader {
94 pub async fn new(
95 resp: Response,
96 endpoint: impl Into<String>,
97 statement: impl Into<String>,
98 client_context_id: impl Into<String>,
99 ) -> error::Result<Self> {
100 let status_code = resp.status();
101 let endpoint = endpoint.into();
102 let statement = statement.into();
103 let client_context_id = client_context_id.into();
104 if status_code != 200 {
105 let body = match resp.bytes().await {
106 Ok(b) => b,
107 Err(e) => {
108 debug!("Failed to read response body on error {}", &e);
109 return Err(Error::new_http_error(
110 format!("{endpoint}: {e}"),
111 statement,
112 client_context_id,
113 ));
114 }
115 };
116
117 let errors: QueryErrorResponse = match serde_json::from_slice(&body) {
118 Ok(e) => e,
119 Err(e) => {
120 return Err(Error::new_message_error(
121 format!(
122 "non-200 status code received {status_code} but parsing error response body failed {e}"
123 ),
124 None,
125 None,
126 None,
127 ));
128 }
129 };
130
131 if errors.errors.is_empty() {
132 return Err(Error::new_message_error(
133 format!(
134 "Non-200 status code received {status_code} but response body contained no errors",
135 ),
136 None,
137 None,
138 None,
139 ));
140 }
141
142 return Err(Self::parse_errors(
143 &errors.errors,
144 endpoint,
145 statement,
146 client_context_id,
147 status_code,
148 ));
149 }
150
151 let stream = resp.bytes_stream();
152 let mut streamer = RawJsonRowStreamer::new(Decoder::new(stream), "results");
153
154 let early_meta_data =
155 Self::read_early_metadata(&mut streamer, &endpoint, &statement, &client_context_id)
156 .await?;
157
158 let has_more_rows = streamer.has_more_rows().await;
159 let mut epilog = None;
160 if !has_more_rows {
161 epilog = match streamer.epilog() {
162 Ok(epilog) => Some(epilog),
163 Err(e) => {
164 return Err(Error::new_http_error(
165 format!("{endpoint}: {e}"),
166 statement,
167 client_context_id,
168 ));
169 }
170 };
171 }
172
173 let mut reader = Self {
174 endpoint,
175 statement,
176 client_context_id,
177 status_code,
178 streamer: Box::pin(streamer.into_stream()),
179 early_meta_data,
180 meta_data: None,
181 meta_data_error: None,
182 };
183
184 if let Some(epilog) = epilog {
185 let meta = reader.read_final_metadata(epilog)?;
186
187 reader.meta_data = Some(meta);
188 }
189
190 Ok(reader)
191 }
192
193 pub fn early_metadata(&self) -> &EarlyMetaData {
194 &self.early_meta_data
195 }
196
197 pub fn metadata(&self) -> error::Result<&MetaData> {
198 if let Some(e) = &self.meta_data_error {
199 return Err(e.clone());
200 }
201
202 if let Some(meta) = &self.meta_data {
203 return Ok(meta);
204 }
205
206 Err(Error::new_message_error(
207 "cannot read meta-data until after all rows are read",
208 None,
209 None,
210 None,
211 ))
212 }
213
214 async fn read_early_metadata(
215 streamer: &mut RawJsonRowStreamer,
216 endpoint: &str,
217 statement: &str,
218 client_context_id: &str,
219 ) -> error::Result<EarlyMetaData> {
220 let prelude = streamer.read_prelude().await.map_err(|e| {
221 Error::new_http_error(
222 format!("{endpoint}: {e}"),
223 statement.to_string(),
224 client_context_id.to_string(),
225 )
226 })?;
227
228 let early_metadata: QueryEarlyMetaData = serde_json::from_slice(&prelude).map_err(|e| {
229 Error::new_message_error(
230 format!("failed to parse metadata from response: {e}"),
231 endpoint.to_string(),
232 statement.to_string(),
233 client_context_id.to_string(),
234 )
235 })?;
236
237 Ok(EarlyMetaData {
238 prepared: early_metadata.prepared,
239 })
240 }
241
242 fn read_final_metadata(&mut self, epilog: Vec<u8>) -> error::Result<MetaData> {
243 let metadata: QueryMetaData = match serde_json::from_slice(&epilog) {
244 Ok(m) => m,
245 Err(e) => {
246 return Err(Error::new_message_error(
247 format!("failed to parse query metadata from epilog: {e}"),
248 self.endpoint.clone(),
249 self.statement.clone(),
250 self.client_context_id.clone(),
251 ));
252 }
253 };
254
255 self.parse_metadata(metadata)
256 }
257
258 fn parse_metadata(&self, metadata: QueryMetaData) -> error::Result<MetaData> {
259 if !metadata.errors.is_empty() {
260 return Err(Self::parse_errors(
261 &metadata.errors,
262 &self.endpoint,
263 &self.statement,
264 &self.client_context_id,
265 self.status_code,
266 ));
267 }
268
269 let metrics = self.parse_metrics(metadata.metrics);
270 let warnings = self.parse_warnings(metadata.warnings);
271
272 Ok(MetaData {
273 prepared: metadata.early_meta_data.prepared,
274 request_id: metadata.request_id.unwrap_or_default(),
275 client_context_id: metadata.client_context_id.unwrap_or_default(),
276 status: metadata.status,
277 metrics,
278 signature: metadata.signature,
279 warnings,
280 profile: metadata.profile,
281 })
282 }
283
284 fn parse_metrics(&self, metrics: Option<QueryMetrics>) -> Option<Metrics> {
285 metrics.map(|m| {
286 let elapsed_time = if let Some(elapsed) = m.elapsed_time {
287 parse_duration_from_golang_string(&elapsed).unwrap_or_default()
288 } else {
289 Duration::default()
290 };
291
292 let execution_time = if let Some(execution) = m.execution_time {
293 parse_duration_from_golang_string(&execution).unwrap_or_default()
294 } else {
295 Duration::default()
296 };
297
298 Metrics {
299 elapsed_time,
300 execution_time,
301 result_count: m.result_count.unwrap_or_default(),
302 result_size: m.result_size.unwrap_or_default(),
303 mutation_count: m.mutation_count.unwrap_or_default(),
304 sort_count: m.sort_count.unwrap_or_default(),
305 error_count: m.error_count.unwrap_or_default(),
306 warning_count: m.warning_count.unwrap_or_default(),
307 }
308 })
309 }
310
311 fn parse_warnings(&self, warnings: Vec<QueryWarning>) -> Vec<Warning> {
312 let mut converted = vec![];
313 for w in warnings {
314 converted.push(Warning {
315 code: w.code.unwrap_or_default(),
316 message: w.msg.unwrap_or_default(),
317 });
318 }
319
320 converted
321 }
322
323 fn parse_errors(
324 errors: &[QueryError],
325 endpoint: impl Into<String>,
326 statement: impl Into<String>,
327 client_context_id: impl Into<String>,
328 status_code: StatusCode,
329 ) -> Error {
330 let error_descs: Vec<ErrorDesc> = errors
331 .iter()
332 .map(|error| {
333 ErrorDesc::new(
334 Self::parse_error_kind(error),
335 error.code,
336 error.msg.clone(),
337 error.retry.unwrap_or_default(),
338 error.reason.clone(),
339 )
340 })
341 .collect();
342
343 let chosen_desc = error_descs
344 .iter()
345 .find(|desc| !desc.retry())
346 .unwrap_or(&error_descs[0]);
347
348 let mut server_error = ServerError::new(
349 chosen_desc.kind().clone(),
350 endpoint,
351 status_code,
352 chosen_desc.code(),
353 chosen_desc.message(),
354 )
355 .with_client_context_id(client_context_id)
356 .with_statement(statement);
357
358 if error_descs.len() > 1 {
359 server_error = server_error.with_error_descs(error_descs);
360 }
361
362 match server_error.kind() {
363 ServerErrorKind::ScopeNotFound => {
364 Error::new_resource_error(ResourceError::new(server_error))
365 }
366 ServerErrorKind::CollectionNotFound => {
367 Error::new_resource_error(ResourceError::new(server_error))
368 }
369 ServerErrorKind::IndexNotFound => {
370 Error::new_resource_error(ResourceError::new(server_error))
371 }
372 ServerErrorKind::IndexExists => {
373 Error::new_resource_error(ResourceError::new(server_error))
374 }
375 ServerErrorKind::AuthenticationFailure => {
376 if server_error.code() == 13014 {
377 Error::new_resource_error(ResourceError::new(server_error))
378 } else {
379 Error::new_server_error(server_error)
380 }
381 }
382 _ => Error::new_server_error(server_error),
383 }
384 }
385
386 fn parse_error_kind(error: &QueryError) -> ServerErrorKind {
387 let err_code = error.code;
388 let err_code_group = err_code / 1000;
389
390 if err_code_group == 4 {
391 if err_code == 4040
392 || err_code == 4050
393 || err_code == 4060
394 || err_code == 4070
395 || err_code == 4080
396 || err_code == 4090
397 {
398 ServerErrorKind::PreparedStatementFailure
399 } else if err_code == 4300 {
400 ServerErrorKind::IndexExists
401 } else {
402 ServerErrorKind::PlanningFailure
403 }
404 } else if err_code_group == 5 {
405 let msg = error.msg.to_lowercase();
406 if msg.contains("not enough") && msg.contains("replica") {
407 ServerErrorKind::InvalidArgument {
408 argument: "num_replicas".to_string(),
409 reason: "not enough indexer nodes to create index with replica count"
410 .to_string(),
411 }
412 } else if msg.contains("build already in progress") {
413 ServerErrorKind::BuildAlreadyInProgress
414 } else if Regex::new(".*?ndex .*? already exist.*")
415 .unwrap()
416 .is_match(&error.msg)
417 {
418 ServerErrorKind::IndexExists
419 } else if Regex::new(".*?ndex .*? not exist.*")
420 .unwrap()
421 .is_match(&error.msg)
422 {
423 ServerErrorKind::IndexNotFound
424 } else {
425 ServerErrorKind::Internal
426 }
427 } else if err_code_group == 12 {
428 if err_code == 12003 {
429 ServerErrorKind::CollectionNotFound
430 } else if err_code == 12004 {
431 ServerErrorKind::IndexNotFound
432 } else if err_code == 12009 {
433 if !error.reason.is_empty() {
434 if let Some(code) = error.reason.get("code") {
435 if code == 12033 {
436 ServerErrorKind::CasMismatch
437 } else if code == 17014 {
438 ServerErrorKind::DocNotFound
439 } else if code == 17012 {
440 ServerErrorKind::DocExists
441 } else {
442 ServerErrorKind::DMLFailure
443 }
444 } else {
445 ServerErrorKind::DMLFailure
446 }
447 } else if error.msg.to_lowercase().contains("cas mismatch") {
448 ServerErrorKind::CasMismatch
449 } else {
450 ServerErrorKind::DMLFailure
451 }
452 } else if err_code == 12016 {
453 ServerErrorKind::IndexNotFound
454 } else if err_code == 12021 {
455 ServerErrorKind::ScopeNotFound
456 } else {
457 ServerErrorKind::IndexFailure
458 }
459 } else if err_code_group == 14 {
460 ServerErrorKind::IndexFailure
461 } else if err_code_group == 10 {
462 ServerErrorKind::AuthenticationFailure
463 } else if err_code == 1000 {
464 ServerErrorKind::WriteInReadOnlyMode
465 } else if err_code == 1080 {
466 ServerErrorKind::Timeout
467 } else if err_code == 3000 {
468 ServerErrorKind::ParsingFailure
469 } else if err_code == 13014 {
470 ServerErrorKind::AuthenticationFailure
471 } else {
472 ServerErrorKind::Unknown
473 }
474 }
475}