// couchbase_core/analyticsx/query_respreader.rs
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use crate::analyticsx::error;
use crate::analyticsx::error::ErrorKind::Server;
use crate::analyticsx::error::{Error, ErrorDesc, ServerError, ServerErrorKind};
use crate::analyticsx::query_json::{
    QueryError, QueryErrorResponse, QueryMetaData, QueryMetrics, QueryWarning,
};
use crate::analyticsx::query_result::{MetaData, MetadataPlans, Metrics, Warning};
use crate::helpers::durations::parse_duration_from_golang_string;
use crate::httpx;
use crate::httpx::decoder::Decoder;
use crate::httpx::raw_json_row_streamer::{RawJsonRowItem, RawJsonRowStreamer};
use crate::httpx::response::Response;
use arc_swap::ArcSwap;
use async_trait::async_trait;
use bytes::Bytes;
use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
use http::StatusCode;
use tracing::debug;

/// Streaming reader over a Couchbase analytics query HTTP response.
///
/// Result rows are yielded as raw JSON `Bytes` through the `Stream` impl;
/// once the stream is exhausted the trailing response metadata becomes
/// available via `metadata()`.
pub struct QueryRespReader {
    // Endpoint the query was sent to; used to enrich errors with context.
    endpoint: String,
    // The analytics statement that produced this response; attached to errors.
    statement: String,
    // Client-supplied correlation id for this request; attached to errors.
    client_context_id: String,
    // HTTP status of the response (construction fails early for non-200).
    status_code: StatusCode,

    // Underlying decoded stream of row / trailing-metadata items.
    streamer: Pin<Box<dyn Stream<Item = httpx::error::Result<RawJsonRowItem>> + Send>>,
    // Parsed trailing metadata, populated once the epilog has been read.
    meta_data: Option<MetaData>,
    // Error encountered while parsing the epilog, replayed by `metadata()`.
    meta_data_error: Option<Error>,
}
53
54impl Stream for QueryRespReader {
55 type Item = error::Result<Bytes>;
56
57 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
58 let this = self.get_mut();
59
60 match this.streamer.poll_next_unpin(cx) {
61 Poll::Ready(Some(Ok(RawJsonRowItem::Row(row_data)))) => {
62 Poll::Ready(Some(Ok(Bytes::from(row_data))))
63 }
64 Poll::Ready(Some(Ok(RawJsonRowItem::Metadata(metadata)))) => {
65 match this.read_final_metadata(metadata) {
66 Ok(meta) => this.meta_data = Some(meta),
67 Err(e) => {
68 this.meta_data_error = Some(e.clone());
69 return Poll::Ready(Some(Err(e)));
70 }
71 };
72 Poll::Ready(None)
73 }
74 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(Error::new_http_error(
75 e,
76 this.endpoint.to_string(),
77 Some(this.statement.clone()),
78 this.client_context_id.clone(),
79 )))),
80 Poll::Ready(None) => Poll::Ready(None),
81 Poll::Pending => Poll::Pending,
82 }
83 }
84}
85
86impl QueryRespReader {
    /// Consumes an HTTP `Response` and prepares a streaming reader over its
    /// analytics result rows.
    ///
    /// On a non-200 status the whole body is read eagerly and converted into
    /// an error; on success the "results" array is streamed lazily. If the
    /// response contains no rows at all, the epilog (trailing metadata) is
    /// parsed immediately so `metadata()` works without polling the stream.
    ///
    /// # Errors
    /// Returns an HTTP error when the body cannot be read or the prelude
    /// cannot be decoded, a message error when an error body is unparsable or
    /// empty, and a server error translated from the server's error list.
    pub async fn new(
        resp: Response,
        endpoint: impl Into<String>,
        statement: impl Into<String>,
        client_context_id: impl Into<String>,
    ) -> error::Result<Self> {
        let status_code = resp.status();
        let endpoint = endpoint.into();
        let statement = statement.into();
        let client_context_id = client_context_id.into();
        if status_code != 200 {
            // Pull the full body so the server's error payload can be surfaced.
            let body = match resp.bytes().await {
                Ok(b) => b,
                Err(e) => {
                    debug!("Failed to read response body on error {}", &e);
                    return Err(Error::new_http_error(
                        e,
                        endpoint,
                        statement,
                        client_context_id,
                    ));
                }
            };

            let errors: QueryErrorResponse = match serde_json::from_slice(&body) {
                Ok(e) => e,
                Err(e) => {
                    return Err(Error::new_message_error(
                        format!(
                            "non-200 status code received {status_code} but parsing error response body failed {e}"
                        ),
                        None,
                        None,
                        None,
                    ));
                }
            };

            // Defensive: a failing response is expected to carry errors;
            // `parse_errors` indexes the first entry so guard here.
            if errors.errors.is_empty() {
                return Err(Error::new_message_error(
                    format!(
                        "Non-200 status code received {status_code} but response body contained no errors",
                    ),
                    None,
                    None,
                    None,
                ));
            }

            return Err(Self::parse_errors(
                &errors.errors,
                endpoint,
                statement,
                client_context_id,
                status_code,
            ));
        }

        // Stream the body and position the decoder at the "results" array.
        let stream = resp.bytes_stream();
        let mut streamer = RawJsonRowStreamer::new(Decoder::new(stream), "results");
        streamer.read_prelude().await.map_err(|e| {
            Error::new_http_error(
                e,
                endpoint.clone(),
                statement.to_string(),
                client_context_id.to_string(),
            )
        })?;

        // With no rows to stream the epilog is already available — capture it
        // now so metadata can be parsed before the caller ever polls.
        let has_more_rows = streamer.has_more_rows().await;
        let mut epilog = None;
        if !has_more_rows {
            epilog = match streamer.epilog() {
                Ok(epilog) => Some(epilog),
                Err(e) => {
                    return Err(Error::new_http_error(
                        e,
                        endpoint.clone(),
                        statement,
                        client_context_id,
                    ));
                }
            };
        }

        let mut reader = Self {
            endpoint,
            statement,
            client_context_id,
            status_code,
            streamer: Box::pin(streamer.into_stream()),
            meta_data: None,
            meta_data_error: None,
        };

        // Eagerly parse metadata for row-less responses (see above).
        if let Some(epilog) = epilog {
            let meta = reader.read_final_metadata(epilog)?;

            reader.meta_data = Some(meta);
        }

        Ok(reader)
    }
192
193 pub fn metadata(&self) -> error::Result<&MetaData> {
194 if let Some(e) = &self.meta_data_error {
195 return Err(e.clone());
196 }
197
198 if let Some(meta) = &self.meta_data {
199 return Ok(meta);
200 }
201
202 Err(Error::new_message_error(
203 "cannot read meta-data until after all rows are read",
204 None,
205 None,
206 None,
207 ))
208 }
209
210 fn read_final_metadata(&mut self, epilog: Vec<u8>) -> error::Result<MetaData> {
211 let metadata: QueryMetaData = match serde_json::from_slice(&epilog) {
212 Ok(m) => m,
213 Err(e) => {
214 return Err(Error::new_message_error(
215 format!("failed to parse analytics metadata from epilog: {e}"),
216 self.endpoint.clone(),
217 self.statement.clone(),
218 self.client_context_id.clone(),
219 ));
220 }
221 };
222
223 self.parse_metadata(metadata)
224 }
225
226 fn parse_metadata(&self, metadata: QueryMetaData) -> error::Result<MetaData> {
227 if !metadata.errors.is_empty() {
228 return Err(Self::parse_errors(
229 &metadata.errors,
230 &self.endpoint,
231 &self.statement,
232 &self.client_context_id,
233 self.status_code,
234 ));
235 }
236
237 let metrics = self.parse_metrics(metadata.metrics);
238 let warnings = self.parse_warnings(metadata.warnings);
239
240 Ok(MetaData {
241 request_id: metadata.request_id,
242 client_context_id: metadata.client_context_id,
243 status: metadata.status,
244 metrics,
245 signature: metadata.signature,
246 warnings,
247 plans: metadata.plans.map(|p| MetadataPlans {
248 logical_plan: p.logical_plan,
249 optimized_logical_plan: p.optimized_logical_plan,
250 rewritten_expression_tree: p.rewritten_expression_tree,
251 expression_tree: p.expression_tree,
252 job: p.job,
253 }),
254 })
255 }
256
257 fn parse_metrics(&self, metrics: Option<QueryMetrics>) -> Option<Metrics> {
258 metrics.map(|m| {
259 let elapsed_time = if let Some(elapsed) = m.elapsed_time {
260 parse_duration_from_golang_string(&elapsed).unwrap_or_default()
261 } else {
262 Duration::default()
263 };
264
265 let execution_time = if let Some(execution) = m.execution_time {
266 parse_duration_from_golang_string(&execution).unwrap_or_default()
267 } else {
268 Duration::default()
269 };
270
271 Metrics {
272 elapsed_time,
273 execution_time,
274 result_count: m.result_count.unwrap_or_default(),
275 result_size: m.result_size.unwrap_or_default(),
276 error_count: m.error_count.unwrap_or_default(),
277 warning_count: m.warning_count.unwrap_or_default(),
278 processed_objects: m.processed_objects.unwrap_or_default(),
279 }
280 })
281 }
282
283 fn parse_warnings(&self, warnings: Vec<QueryWarning>) -> Vec<Warning> {
284 let mut converted = vec![];
285 for w in warnings {
286 converted.push(Warning {
287 code: w.code.unwrap_or_default(),
288 message: w.msg.unwrap_or_default(),
289 });
290 }
291
292 converted
293 }
294
295 fn parse_errors(
296 errors: &[QueryError],
297 endpoint: impl Into<String>,
298 statement: impl Into<String>,
299 client_context_id: impl Into<String>,
300 status_code: StatusCode,
301 ) -> Error {
302 let error_descs: Vec<ErrorDesc> = errors
303 .iter()
304 .map(|error| {
305 ErrorDesc::new(Self::parse_error_kind(error), error.code, error.msg.clone())
306 })
307 .collect();
308
309 let chosen_desc = &error_descs[0];
310
311 let mut server_error = ServerError::new(
312 chosen_desc.kind().clone(),
313 endpoint,
314 status_code,
315 chosen_desc.code(),
316 chosen_desc.message(),
317 )
318 .with_client_context_id(client_context_id)
319 .with_statement(statement);
320
321 if error_descs.len() > 1 {
322 server_error = server_error.with_error_descs(error_descs);
323 }
324
325 Error::new_server_error(server_error)
326 }
327
328 fn parse_error_kind(error: &QueryError) -> ServerErrorKind {
329 let err_code = error.code;
330 let err_code_group = err_code / 1000;
331
332 if err_code_group == 20 {
333 ServerErrorKind::AuthenticationFailure
334 } else if err_code_group == 24 {
335 if err_code == 24000 {
336 ServerErrorKind::ParsingFailure
337 } else if err_code == 24006 {
338 ServerErrorKind::LinkNotFound
339 } else if err_code == 24025 || err_code == 24044 || err_code == 24045 {
340 ServerErrorKind::DatasetNotFound
341 } else if err_code == 24034 {
342 ServerErrorKind::DataverseNotFound
343 } else if err_code == 24039 {
344 ServerErrorKind::DataverseExists
345 } else if err_code == 24040 {
346 ServerErrorKind::DatasetExists
347 } else if err_code == 24047 {
348 ServerErrorKind::IndexNotFound
349 } else if err_code == 24048 {
350 ServerErrorKind::IndexExists
351 } else if err_code == 24055 {
352 ServerErrorKind::LinkExists
353 } else {
354 ServerErrorKind::CompilationFailure
355 }
356 } else if err_code_group == 25 {
357 ServerErrorKind::Internal
358 } else if err_code == 23000 || err_code == 23003 {
359 ServerErrorKind::TemporaryFailure
360 } else if err_code == 23007 {
361 ServerErrorKind::JobQueueFull
362 } else {
363 ServerErrorKind::Unknown
364 }
365 }
366}