Skip to main content

couchbase_core/analyticsx/
query_respreader.rs

/*
 *
 *  * Copyright (c) 2025 Couchbase, Inc.
 *  *
 *  * Licensed under the Apache License, Version 2.0 (the "License");
 *  * you may not use this file except in compliance with the License.
 *  * You may obtain a copy of the License at
 *  *
 *  *    http://www.apache.org/licenses/LICENSE-2.0
 *  *
 *  * Unless required by applicable law or agreed to in writing, software
 *  * distributed under the License is distributed on an "AS IS" BASIS,
 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  * See the License for the specific language governing permissions and
 *  * limitations under the License.
 *
 */
18
19use std::pin::{pin, Pin};
20use std::sync::Arc;
21use std::task::{Context, Poll};
22use std::time::Duration;
23
24use crate::analyticsx::error;
25use crate::analyticsx::error::ErrorKind::Server;
26use crate::analyticsx::error::{Error, ErrorDesc, ServerError, ServerErrorKind};
27use crate::analyticsx::query_json::{
28    QueryError, QueryErrorResponse, QueryMetaData, QueryMetrics, QueryWarning,
29};
30use crate::analyticsx::query_result::{MetaData, MetadataPlans, Metrics, Warning};
31use crate::helpers::durations::parse_duration_from_golang_string;
32use crate::httpx;
33use crate::httpx::decoder::Decoder;
34use crate::httpx::raw_json_row_streamer::{RawJsonRowItem, RawJsonRowStreamer};
35use crate::httpx::response::Response;
36use arc_swap::ArcSwap;
37use async_trait::async_trait;
38use bytes::Bytes;
39use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
40use http::StatusCode;
41use tracing::debug;
42
43pub struct QueryRespReader {
44    endpoint: String,
45    statement: String,
46    client_context_id: String,
47    status_code: StatusCode,
48
49    streamer: Pin<Box<dyn Stream<Item = httpx::error::Result<RawJsonRowItem>> + Send>>,
50    meta_data: Option<MetaData>,
51    meta_data_error: Option<Error>,
52}
53
54impl Stream for QueryRespReader {
55    type Item = error::Result<Bytes>;
56
57    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
58        let this = self.get_mut();
59
60        match this.streamer.poll_next_unpin(cx) {
61            Poll::Ready(Some(Ok(RawJsonRowItem::Row(row_data)))) => {
62                Poll::Ready(Some(Ok(Bytes::from(row_data))))
63            }
64            Poll::Ready(Some(Ok(RawJsonRowItem::Metadata(metadata)))) => {
65                match this.read_final_metadata(metadata) {
66                    Ok(meta) => this.meta_data = Some(meta),
67                    Err(e) => {
68                        this.meta_data_error = Some(e.clone());
69                        return Poll::Ready(Some(Err(e)));
70                    }
71                };
72                Poll::Ready(None)
73            }
74            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(Error::new_http_error(
75                e,
76                this.endpoint.to_string(),
77                Some(this.statement.clone()),
78                this.client_context_id.clone(),
79            )))),
80            Poll::Ready(None) => Poll::Ready(None),
81            Poll::Pending => Poll::Pending,
82        }
83    }
84}
85
86impl QueryRespReader {
87    pub async fn new(
88        resp: Response,
89        endpoint: impl Into<String>,
90        statement: impl Into<String>,
91        client_context_id: impl Into<String>,
92    ) -> error::Result<Self> {
93        let status_code = resp.status();
94        let endpoint = endpoint.into();
95        let statement = statement.into();
96        let client_context_id = client_context_id.into();
97        if status_code != 200 {
98            let body = match resp.bytes().await {
99                Ok(b) => b,
100                Err(e) => {
101                    debug!("Failed to read response body on error {}", &e);
102                    return Err(Error::new_http_error(
103                        e,
104                        endpoint,
105                        statement,
106                        client_context_id,
107                    ));
108                }
109            };
110
111            let errors: QueryErrorResponse = match serde_json::from_slice(&body) {
112                Ok(e) => e,
113                Err(e) => {
114                    return Err(Error::new_message_error(
115                        format!(
116                        "non-200 status code received {status_code} but parsing error response body failed {e}"
117                    ),
118                        None,
119                        None,
120                        None,
121                    ));
122                }
123            };
124
125            if errors.errors.is_empty() {
126                return Err(Error::new_message_error(
127                    format!(
128                        "Non-200 status code received {status_code} but response body contained no errors",
129                    ),
130                    None,
131                    None,
132                    None,
133                ));
134            }
135
136            return Err(Self::parse_errors(
137                &errors.errors,
138                endpoint,
139                statement,
140                client_context_id,
141                status_code,
142            ));
143        }
144
145        let stream = resp.bytes_stream();
146        let mut streamer = RawJsonRowStreamer::new(Decoder::new(stream), "results");
147        // There is no actual prelude we need to hold onto, but we need to trigger the reading of
148        // the stream to advance the streamer to the rows.
149        streamer.read_prelude().await.map_err(|e| {
150            Error::new_http_error(
151                e,
152                endpoint.clone(),
153                statement.to_string(),
154                client_context_id.to_string(),
155            )
156        })?;
157
158        let has_more_rows = streamer.has_more_rows().await;
159        let mut epilog = None;
160        if !has_more_rows {
161            epilog = match streamer.epilog() {
162                Ok(epilog) => Some(epilog),
163                Err(e) => {
164                    return Err(Error::new_http_error(
165                        e,
166                        endpoint.clone(),
167                        statement,
168                        client_context_id,
169                    ));
170                }
171            };
172        }
173
174        let mut reader = Self {
175            endpoint,
176            statement,
177            client_context_id,
178            status_code,
179            streamer: Box::pin(streamer.into_stream()),
180            meta_data: None,
181            meta_data_error: None,
182        };
183
184        if let Some(epilog) = epilog {
185            let meta = reader.read_final_metadata(epilog)?;
186
187            reader.meta_data = Some(meta);
188        }
189
190        Ok(reader)
191    }
192
193    pub fn metadata(&self) -> error::Result<&MetaData> {
194        if let Some(e) = &self.meta_data_error {
195            return Err(e.clone());
196        }
197
198        if let Some(meta) = &self.meta_data {
199            return Ok(meta);
200        }
201
202        Err(Error::new_message_error(
203            "cannot read meta-data until after all rows are read",
204            None,
205            None,
206            None,
207        ))
208    }
209
210    fn read_final_metadata(&mut self, epilog: Vec<u8>) -> error::Result<MetaData> {
211        let metadata: QueryMetaData = match serde_json::from_slice(&epilog) {
212            Ok(m) => m,
213            Err(e) => {
214                return Err(Error::new_message_error(
215                    format!("failed to parse analytics metadata from epilog: {e}"),
216                    self.endpoint.clone(),
217                    self.statement.clone(),
218                    self.client_context_id.clone(),
219                ));
220            }
221        };
222
223        self.parse_metadata(metadata)
224    }
225
226    fn parse_metadata(&self, metadata: QueryMetaData) -> error::Result<MetaData> {
227        if !metadata.errors.is_empty() {
228            return Err(Self::parse_errors(
229                &metadata.errors,
230                &self.endpoint,
231                &self.statement,
232                &self.client_context_id,
233                self.status_code,
234            ));
235        }
236
237        let metrics = self.parse_metrics(metadata.metrics);
238        let warnings = self.parse_warnings(metadata.warnings);
239
240        Ok(MetaData {
241            request_id: metadata.request_id,
242            client_context_id: metadata.client_context_id,
243            status: metadata.status,
244            metrics,
245            signature: metadata.signature,
246            warnings,
247            plans: metadata.plans.map(|p| MetadataPlans {
248                logical_plan: p.logical_plan,
249                optimized_logical_plan: p.optimized_logical_plan,
250                rewritten_expression_tree: p.rewritten_expression_tree,
251                expression_tree: p.expression_tree,
252                job: p.job,
253            }),
254        })
255    }
256
257    fn parse_metrics(&self, metrics: Option<QueryMetrics>) -> Option<Metrics> {
258        metrics.map(|m| {
259            let elapsed_time = if let Some(elapsed) = m.elapsed_time {
260                parse_duration_from_golang_string(&elapsed).unwrap_or_default()
261            } else {
262                Duration::default()
263            };
264
265            let execution_time = if let Some(execution) = m.execution_time {
266                parse_duration_from_golang_string(&execution).unwrap_or_default()
267            } else {
268                Duration::default()
269            };
270
271            Metrics {
272                elapsed_time,
273                execution_time,
274                result_count: m.result_count.unwrap_or_default(),
275                result_size: m.result_size.unwrap_or_default(),
276                error_count: m.error_count.unwrap_or_default(),
277                warning_count: m.warning_count.unwrap_or_default(),
278                processed_objects: m.processed_objects.unwrap_or_default(),
279            }
280        })
281    }
282
283    fn parse_warnings(&self, warnings: Vec<QueryWarning>) -> Vec<Warning> {
284        let mut converted = vec![];
285        for w in warnings {
286            converted.push(Warning {
287                code: w.code.unwrap_or_default(),
288                message: w.msg.unwrap_or_default(),
289            });
290        }
291
292        converted
293    }
294
295    fn parse_errors(
296        errors: &[QueryError],
297        endpoint: impl Into<String>,
298        statement: impl Into<String>,
299        client_context_id: impl Into<String>,
300        status_code: StatusCode,
301    ) -> Error {
302        let error_descs: Vec<ErrorDesc> = errors
303            .iter()
304            .map(|error| {
305                ErrorDesc::new(Self::parse_error_kind(error), error.code, error.msg.clone())
306            })
307            .collect();
308
309        let chosen_desc = &error_descs[0];
310
311        let mut server_error = ServerError::new(
312            chosen_desc.kind().clone(),
313            endpoint,
314            status_code,
315            chosen_desc.code(),
316            chosen_desc.message(),
317        )
318        .with_client_context_id(client_context_id)
319        .with_statement(statement);
320
321        if error_descs.len() > 1 {
322            server_error = server_error.with_error_descs(error_descs);
323        }
324
325        Error::new_server_error(server_error)
326    }
327
328    fn parse_error_kind(error: &QueryError) -> ServerErrorKind {
329        let err_code = error.code;
330        let err_code_group = err_code / 1000;
331
332        if err_code_group == 20 {
333            ServerErrorKind::AuthenticationFailure
334        } else if err_code_group == 24 {
335            if err_code == 24000 {
336                ServerErrorKind::ParsingFailure
337            } else if err_code == 24006 {
338                ServerErrorKind::LinkNotFound
339            } else if err_code == 24025 || err_code == 24044 || err_code == 24045 {
340                ServerErrorKind::DatasetNotFound
341            } else if err_code == 24034 {
342                ServerErrorKind::DataverseNotFound
343            } else if err_code == 24039 {
344                ServerErrorKind::DataverseExists
345            } else if err_code == 24040 {
346                ServerErrorKind::DatasetExists
347            } else if err_code == 24047 {
348                ServerErrorKind::IndexNotFound
349            } else if err_code == 24048 {
350                ServerErrorKind::IndexExists
351            } else if err_code == 24055 {
352                ServerErrorKind::LinkExists
353            } else {
354                ServerErrorKind::CompilationFailure
355            }
356        } else if err_code_group == 25 {
357            ServerErrorKind::Internal
358        } else if err_code == 23000 || err_code == 23003 {
359            ServerErrorKind::TemporaryFailure
360        } else if err_code == 23007 {
361            ServerErrorKind::JobQueueFull
362        } else {
363            ServerErrorKind::Unknown
364        }
365    }
366}