1use std::pin::{pin, Pin};
20use std::ptr::read;
21use std::sync::Arc;
22use std::task::{Context, Poll};
23use std::time::Duration;
24
25use arc_swap::ArcSwap;
26use async_trait::async_trait;
27use bytes::Bytes;
28use futures::future::err;
29use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
30use http::StatusCode;
31use regex::Regex;
32use tokio::sync::Mutex;
33use tracing::debug;
34
35use crate::helpers::durations::parse_duration_from_golang_string;
36use crate::httpx;
37use crate::httpx::decoder::Decoder;
38use crate::httpx::raw_json_row_streamer::{RawJsonRowItem, RawJsonRowStreamer};
39use crate::httpx::response::Response;
40use crate::memdx::magic::Magic::Res;
41use crate::queryx::error;
42use crate::queryx::error::{
43 Error, ErrorDesc, ErrorKind, ResourceError, ServerError, ServerErrorKind,
44};
45use crate::queryx::query_json::{
46 QueryEarlyMetaData, QueryError, QueryErrorResponse, QueryMetaData, QueryMetrics, QueryWarning,
47};
48use crate::queryx::query_result::{EarlyMetaData, MetaData, Metrics, Warning};
49
50pub struct QueryRespReader {
51 endpoint: String,
52 statement: String,
53 client_context_id: String,
54 status_code: StatusCode,
55
56 streamer: Pin<Box<dyn Stream<Item = httpx::error::Result<RawJsonRowItem>> + Send>>,
57 early_meta_data: EarlyMetaData,
58 meta_data: Option<MetaData>,
59 meta_data_error: Option<Error>,
60}
61
62impl Stream for QueryRespReader {
63 type Item = error::Result<Bytes>;
64
65 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
66 let this = self.get_mut();
67
68 match this.streamer.poll_next_unpin(cx) {
69 Poll::Ready(Some(Ok(RawJsonRowItem::Row(row_data)))) => {
70 Poll::Ready(Some(Ok(Bytes::from(row_data))))
71 }
72 Poll::Ready(Some(Ok(RawJsonRowItem::Metadata(metadata)))) => {
73 match this.read_final_metadata(metadata) {
74 Ok(meta) => this.meta_data = Some(meta),
75 Err(e) => {
76 this.meta_data_error = Some(e.clone());
77 return Poll::Ready(Some(Err(e)));
78 }
79 };
80 Poll::Ready(None)
81 }
82 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(Error::new_http_error(
83 e,
84 this.endpoint.clone(),
85 Some(this.statement.clone()),
86 this.client_context_id.clone(),
87 )))),
88 Poll::Ready(None) => Poll::Ready(None),
89 Poll::Pending => Poll::Pending,
90 }
91 }
92}
93
94impl QueryRespReader {
95 pub async fn new(
96 resp: Response,
97 endpoint: impl Into<String>,
98 statement: impl Into<String>,
99 client_context_id: impl Into<String>,
100 ) -> error::Result<Self> {
101 let status_code = resp.status();
102 let endpoint = endpoint.into();
103 let statement = statement.into();
104 let client_context_id = client_context_id.into();
105 if status_code != 200 {
106 let body = match resp.bytes().await {
107 Ok(b) => b,
108 Err(e) => {
109 debug!("Failed to read response body on error {}", &e);
110 return Err(Error::new_http_error(
111 e,
112 endpoint,
113 statement,
114 client_context_id,
115 ));
116 }
117 };
118
119 let errors: QueryErrorResponse = match serde_json::from_slice(&body) {
120 Ok(e) => e,
121 Err(e) => {
122 return Err(Error::new_message_error(
123 format!(
124 "non-200 status code received {status_code} but parsing error response body failed {e}"
125 ),
126 None,
127 None,
128 None,
129 ));
130 }
131 };
132
133 if errors.errors.is_empty() {
134 return Err(Error::new_message_error(
135 format!(
136 "Non-200 status code received {status_code} but response body contained no errors",
137 ),
138 None,
139 None,
140 None,
141 ));
142 }
143
144 return Err(Self::parse_errors(
145 &errors.errors,
146 endpoint,
147 statement,
148 client_context_id,
149 status_code,
150 ));
151 }
152
153 let stream = resp.bytes_stream();
154 let mut streamer = RawJsonRowStreamer::new(Decoder::new(stream), "results");
155
156 let early_meta_data =
157 Self::read_early_metadata(&mut streamer, &endpoint, &statement, &client_context_id)
158 .await?;
159
160 let has_more_rows = streamer.has_more_rows().await;
161 let mut epilog = None;
162 if !has_more_rows {
163 epilog = match streamer.epilog() {
164 Ok(epilog) => Some(epilog),
165 Err(e) => {
166 return Err(Error::new_http_error(
167 e,
168 endpoint,
169 statement,
170 client_context_id,
171 ));
172 }
173 };
174 }
175
176 let mut reader = Self {
177 endpoint,
178 statement,
179 client_context_id,
180 status_code,
181 streamer: Box::pin(streamer.into_stream()),
182 early_meta_data,
183 meta_data: None,
184 meta_data_error: None,
185 };
186
187 if let Some(epilog) = epilog {
188 let meta = reader.read_final_metadata(epilog)?;
189
190 reader.meta_data = Some(meta);
191 }
192
193 Ok(reader)
194 }
195
196 pub fn early_metadata(&self) -> &EarlyMetaData {
197 &self.early_meta_data
198 }
199
200 pub fn metadata(&self) -> error::Result<&MetaData> {
201 if let Some(e) = &self.meta_data_error {
202 return Err(e.clone());
203 }
204
205 if let Some(meta) = &self.meta_data {
206 return Ok(meta);
207 }
208
209 Err(Error::new_message_error(
210 "cannot read meta-data until after all rows are read",
211 None,
212 None,
213 None,
214 ))
215 }
216
217 async fn read_early_metadata(
218 streamer: &mut RawJsonRowStreamer,
219 endpoint: &str,
220 statement: &str,
221 client_context_id: &str,
222 ) -> error::Result<EarlyMetaData> {
223 let prelude = streamer.read_prelude().await.map_err(|e| {
224 Error::new_http_error(
225 e,
226 endpoint,
227 statement.to_string(),
228 client_context_id.to_string(),
229 )
230 })?;
231
232 let early_metadata: QueryEarlyMetaData = serde_json::from_slice(&prelude).map_err(|e| {
233 Error::new_message_error(
234 format!("failed to parse metadata from response: {e}"),
235 endpoint.to_string(),
236 statement.to_string(),
237 client_context_id.to_string(),
238 )
239 })?;
240
241 Ok(EarlyMetaData {
242 prepared: early_metadata.prepared,
243 })
244 }
245
246 fn read_final_metadata(&mut self, epilog: Vec<u8>) -> error::Result<MetaData> {
247 let metadata: QueryMetaData = match serde_json::from_slice(&epilog) {
248 Ok(m) => m,
249 Err(e) => {
250 return Err(Error::new_message_error(
251 format!("failed to parse query metadata from epilog: {e}"),
252 self.endpoint.clone(),
253 self.statement.clone(),
254 self.client_context_id.clone(),
255 ));
256 }
257 };
258
259 self.parse_metadata(metadata)
260 }
261
262 fn parse_metadata(&self, metadata: QueryMetaData) -> error::Result<MetaData> {
263 if !metadata.errors.is_empty() {
264 return Err(Self::parse_errors(
265 &metadata.errors,
266 &self.endpoint,
267 &self.statement,
268 &self.client_context_id,
269 self.status_code,
270 ));
271 }
272
273 let metrics = self.parse_metrics(metadata.metrics);
274 let warnings = self.parse_warnings(metadata.warnings);
275
276 Ok(MetaData {
277 prepared: metadata.early_meta_data.prepared,
278 request_id: metadata.request_id.unwrap_or_default(),
279 client_context_id: metadata.client_context_id.unwrap_or_default(),
280 status: metadata.status,
281 metrics,
282 signature: metadata.signature,
283 warnings,
284 profile: metadata.profile,
285 })
286 }
287
288 fn parse_metrics(&self, metrics: Option<QueryMetrics>) -> Option<Metrics> {
289 metrics.map(|m| {
290 let elapsed_time = if let Some(elapsed) = m.elapsed_time {
291 parse_duration_from_golang_string(&elapsed).unwrap_or_default()
292 } else {
293 Duration::default()
294 };
295
296 let execution_time = if let Some(execution) = m.execution_time {
297 parse_duration_from_golang_string(&execution).unwrap_or_default()
298 } else {
299 Duration::default()
300 };
301
302 Metrics {
303 elapsed_time,
304 execution_time,
305 result_count: m.result_count.unwrap_or_default(),
306 result_size: m.result_size.unwrap_or_default(),
307 mutation_count: m.mutation_count.unwrap_or_default(),
308 sort_count: m.sort_count.unwrap_or_default(),
309 error_count: m.error_count.unwrap_or_default(),
310 warning_count: m.warning_count.unwrap_or_default(),
311 }
312 })
313 }
314
315 fn parse_warnings(&self, warnings: Vec<QueryWarning>) -> Vec<Warning> {
316 let mut converted = vec![];
317 for w in warnings {
318 converted.push(Warning {
319 code: w.code.unwrap_or_default(),
320 message: w.msg.unwrap_or_default(),
321 });
322 }
323
324 converted
325 }
326
327 fn parse_errors(
328 errors: &[QueryError],
329 endpoint: impl Into<String>,
330 statement: impl Into<String>,
331 client_context_id: impl Into<String>,
332 status_code: StatusCode,
333 ) -> Error {
334 let error_descs: Vec<ErrorDesc> = errors
335 .iter()
336 .map(|error| {
337 ErrorDesc::new(
338 Self::parse_error_kind(error),
339 error.code,
340 error.msg.clone(),
341 error.retry.unwrap_or_default(),
342 error.reason.clone(),
343 )
344 })
345 .collect();
346
347 let chosen_desc = error_descs
348 .iter()
349 .find(|desc| !desc.retry())
350 .unwrap_or(&error_descs[0]);
351
352 let mut server_error = ServerError::new(
353 chosen_desc.kind().clone(),
354 endpoint,
355 status_code,
356 chosen_desc.code(),
357 chosen_desc.message(),
358 )
359 .with_client_context_id(client_context_id)
360 .with_statement(statement);
361
362 if error_descs.len() > 1 {
363 server_error = server_error.with_error_descs(error_descs);
364 }
365
366 match server_error.kind() {
367 ServerErrorKind::ScopeNotFound => {
368 Error::new_resource_error(ResourceError::new(server_error))
369 }
370 ServerErrorKind::CollectionNotFound => {
371 Error::new_resource_error(ResourceError::new(server_error))
372 }
373 ServerErrorKind::IndexNotFound => {
374 Error::new_resource_error(ResourceError::new(server_error))
375 }
376 ServerErrorKind::IndexExists => {
377 Error::new_resource_error(ResourceError::new(server_error))
378 }
379 ServerErrorKind::AuthenticationFailure => {
380 if server_error.code() == 13014 {
381 Error::new_resource_error(ResourceError::new(server_error))
382 } else {
383 Error::new_server_error(server_error)
384 }
385 }
386 _ => Error::new_server_error(server_error),
387 }
388 }
389
390 fn parse_error_kind(error: &QueryError) -> ServerErrorKind {
391 let err_code = error.code;
392 let err_code_group = err_code / 1000;
393
394 if err_code_group == 4 {
395 if err_code == 4040
396 || err_code == 4050
397 || err_code == 4060
398 || err_code == 4070
399 || err_code == 4080
400 || err_code == 4090
401 {
402 ServerErrorKind::PreparedStatementFailure
403 } else if err_code == 4300 {
404 ServerErrorKind::IndexExists
405 } else {
406 ServerErrorKind::PlanningFailure
407 }
408 } else if err_code_group == 5 {
409 let msg = error.msg.to_lowercase();
410 if msg.contains("not enough") && msg.contains("replica") {
411 ServerErrorKind::InvalidArgument {
412 argument: "num_replicas".to_string(),
413 reason: "not enough indexer nodes to create index with replica count"
414 .to_string(),
415 }
416 } else if msg.contains("build already in progress") {
417 ServerErrorKind::BuildAlreadyInProgress
418 } else if Regex::new(".*?ndex .*? already exist.*")
419 .unwrap()
420 .is_match(&error.msg)
421 {
422 ServerErrorKind::IndexExists
423 } else if Regex::new(".*?ndex .*? not exist.*")
424 .unwrap()
425 .is_match(&error.msg)
426 {
427 ServerErrorKind::IndexNotFound
428 } else {
429 ServerErrorKind::Internal
430 }
431 } else if err_code_group == 12 {
432 if err_code == 12003 {
433 ServerErrorKind::CollectionNotFound
434 } else if err_code == 12004 {
435 ServerErrorKind::IndexNotFound
436 } else if err_code == 12009 {
437 if !error.reason.is_empty() {
438 if let Some(code) = error.reason.get("code") {
439 if code == 12033 {
440 ServerErrorKind::CasMismatch
441 } else if code == 17014 {
442 ServerErrorKind::DocNotFound
443 } else if code == 17012 {
444 ServerErrorKind::DocExists
445 } else {
446 ServerErrorKind::DMLFailure
447 }
448 } else {
449 ServerErrorKind::DMLFailure
450 }
451 } else if error.msg.to_lowercase().contains("cas mismatch") {
452 ServerErrorKind::CasMismatch
453 } else {
454 ServerErrorKind::DMLFailure
455 }
456 } else if err_code == 12016 {
457 ServerErrorKind::IndexNotFound
458 } else if err_code == 12021 {
459 ServerErrorKind::ScopeNotFound
460 } else {
461 ServerErrorKind::IndexFailure
462 }
463 } else if err_code_group == 14 {
464 ServerErrorKind::IndexFailure
465 } else if err_code_group == 10 {
466 ServerErrorKind::AuthenticationFailure
467 } else if err_code == 1000 {
468 ServerErrorKind::WriteInReadOnlyMode
469 } else if err_code == 1080 {
470 ServerErrorKind::Timeout
471 } else if err_code == 3000 {
472 ServerErrorKind::ParsingFailure
473 } else if err_code == 13014 {
474 ServerErrorKind::AuthenticationFailure
475 } else {
476 ServerErrorKind::Unknown
477 }
478 }
479}