couchbase_core/queryx/
query_respreader.rs

1/*
2 *
3 *  * Copyright (c) 2025 Couchbase, Inc.
4 *  *
5 *  * Licensed under the Apache License, Version 2.0 (the "License");
6 *  * you may not use this file except in compliance with the License.
7 *  * You may obtain a copy of the License at
8 *  *
9 *  *    http://www.apache.org/licenses/LICENSE-2.0
10 *  *
11 *  * Unless required by applicable law or agreed to in writing, software
12 *  * distributed under the License is distributed on an "AS IS" BASIS,
13 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  * See the License for the specific language governing permissions and
15 *  * limitations under the License.
16 *
17 */
18
19use std::pin::{pin, Pin};
20use std::ptr::read;
21use std::sync::Arc;
22use std::task::{Context, Poll};
23use std::time::Duration;
24
25use arc_swap::ArcSwap;
26use async_trait::async_trait;
27use bytes::Bytes;
28use futures::future::err;
29use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
30use http::StatusCode;
31use log::debug;
32use regex::Regex;
33use tokio::sync::Mutex;
34
35use crate::helpers::durations::parse_duration_from_golang_string;
36use crate::httpx;
37use crate::httpx::decoder::Decoder;
38use crate::httpx::raw_json_row_streamer::{RawJsonRowItem, RawJsonRowStreamer};
39use crate::httpx::response::Response;
40use crate::memdx::magic::Magic::Res;
41use crate::queryx::error;
42use crate::queryx::error::{
43    Error, ErrorDesc, ErrorKind, ResourceError, ServerError, ServerErrorKind,
44};
45use crate::queryx::query_json::{
46    QueryEarlyMetaData, QueryError, QueryErrorResponse, QueryMetaData, QueryMetrics, QueryWarning,
47};
48use crate::queryx::query_result::{EarlyMetaData, MetaData, Metrics, Warning};
49
50pub struct QueryRespReader {
51    endpoint: String,
52    statement: String,
53    client_context_id: String,
54    status_code: StatusCode,
55
56    streamer: Pin<Box<dyn Stream<Item = httpx::error::Result<RawJsonRowItem>> + Send>>,
57    early_meta_data: EarlyMetaData,
58    meta_data: Option<MetaData>,
59    meta_data_error: Option<Error>,
60}
61
62impl Stream for QueryRespReader {
63    type Item = error::Result<Bytes>;
64
65    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
66        let this = self.get_mut();
67
68        match this.streamer.poll_next_unpin(cx) {
69            Poll::Ready(Some(Ok(RawJsonRowItem::Row(row_data)))) => {
70                Poll::Ready(Some(Ok(Bytes::from(row_data))))
71            }
72            Poll::Ready(Some(Ok(RawJsonRowItem::Metadata(metadata)))) => {
73                match this.read_final_metadata(metadata) {
74                    Ok(meta) => this.meta_data = Some(meta),
75                    Err(e) => {
76                        this.meta_data_error = Some(e.clone());
77                        return Poll::Ready(Some(Err(e)));
78                    }
79                };
80                Poll::Ready(None)
81            }
82            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(Error::new_http_error(
83                format!("{}: {}", &this.endpoint, e),
84                Some(this.statement.clone()),
85                this.client_context_id.clone(),
86            )))),
87            Poll::Ready(None) => Poll::Ready(None),
88            Poll::Pending => Poll::Pending,
89        }
90    }
91}
92
93impl QueryRespReader {
94    pub async fn new(
95        resp: Response,
96        endpoint: impl Into<String>,
97        statement: impl Into<String>,
98        client_context_id: impl Into<String>,
99    ) -> error::Result<Self> {
100        let status_code = resp.status();
101        let endpoint = endpoint.into();
102        let statement = statement.into();
103        let client_context_id = client_context_id.into();
104        if status_code != 200 {
105            let body = match resp.bytes().await {
106                Ok(b) => b,
107                Err(e) => {
108                    debug!("Failed to read response body on error {}", &e);
109                    return Err(Error::new_http_error(
110                        format!("{endpoint}: {e}"),
111                        statement,
112                        client_context_id,
113                    ));
114                }
115            };
116
117            let errors: QueryErrorResponse = match serde_json::from_slice(&body) {
118                Ok(e) => e,
119                Err(e) => {
120                    return Err(Error::new_message_error(
121                        format!(
122                        "non-200 status code received {status_code} but parsing error response body failed {e}"
123                    ),
124                        None,
125                        None,
126                        None,
127                    ));
128                }
129            };
130
131            if errors.errors.is_empty() {
132                return Err(Error::new_message_error(
133                    format!(
134                        "Non-200 status code received {status_code} but response body contained no errors",
135                    ),
136                    None,
137                    None,
138                    None,
139                ));
140            }
141
142            return Err(Self::parse_errors(
143                &errors.errors,
144                endpoint,
145                statement,
146                client_context_id,
147                status_code,
148            ));
149        }
150
151        let stream = resp.bytes_stream();
152        let mut streamer = RawJsonRowStreamer::new(Decoder::new(stream), "results");
153
154        let early_meta_data =
155            Self::read_early_metadata(&mut streamer, &endpoint, &statement, &client_context_id)
156                .await?;
157
158        let has_more_rows = streamer.has_more_rows().await;
159        let mut epilog = None;
160        if !has_more_rows {
161            epilog = match streamer.epilog() {
162                Ok(epilog) => Some(epilog),
163                Err(e) => {
164                    return Err(Error::new_http_error(
165                        format!("{endpoint}: {e}"),
166                        statement,
167                        client_context_id,
168                    ));
169                }
170            };
171        }
172
173        let mut reader = Self {
174            endpoint,
175            statement,
176            client_context_id,
177            status_code,
178            streamer: Box::pin(streamer.into_stream()),
179            early_meta_data,
180            meta_data: None,
181            meta_data_error: None,
182        };
183
184        if let Some(epilog) = epilog {
185            let meta = reader.read_final_metadata(epilog)?;
186
187            reader.meta_data = Some(meta);
188        }
189
190        Ok(reader)
191    }
192
193    pub fn early_metadata(&self) -> &EarlyMetaData {
194        &self.early_meta_data
195    }
196
197    pub fn metadata(&self) -> error::Result<&MetaData> {
198        if let Some(e) = &self.meta_data_error {
199            return Err(e.clone());
200        }
201
202        if let Some(meta) = &self.meta_data {
203            return Ok(meta);
204        }
205
206        Err(Error::new_message_error(
207            "cannot read meta-data until after all rows are read",
208            None,
209            None,
210            None,
211        ))
212    }
213
214    async fn read_early_metadata(
215        streamer: &mut RawJsonRowStreamer,
216        endpoint: &str,
217        statement: &str,
218        client_context_id: &str,
219    ) -> error::Result<EarlyMetaData> {
220        let prelude = streamer.read_prelude().await.map_err(|e| {
221            Error::new_http_error(
222                format!("{endpoint}: {e}"),
223                statement.to_string(),
224                client_context_id.to_string(),
225            )
226        })?;
227
228        let early_metadata: QueryEarlyMetaData = serde_json::from_slice(&prelude).map_err(|e| {
229            Error::new_message_error(
230                format!("failed to parse metadata from response: {e}"),
231                endpoint.to_string(),
232                statement.to_string(),
233                client_context_id.to_string(),
234            )
235        })?;
236
237        Ok(EarlyMetaData {
238            prepared: early_metadata.prepared,
239        })
240    }
241
242    fn read_final_metadata(&mut self, epilog: Vec<u8>) -> error::Result<MetaData> {
243        let metadata: QueryMetaData = match serde_json::from_slice(&epilog) {
244            Ok(m) => m,
245            Err(e) => {
246                return Err(Error::new_message_error(
247                    format!("failed to parse query metadata from epilog: {e}"),
248                    self.endpoint.clone(),
249                    self.statement.clone(),
250                    self.client_context_id.clone(),
251                ));
252            }
253        };
254
255        self.parse_metadata(metadata)
256    }
257
258    fn parse_metadata(&self, metadata: QueryMetaData) -> error::Result<MetaData> {
259        if !metadata.errors.is_empty() {
260            return Err(Self::parse_errors(
261                &metadata.errors,
262                &self.endpoint,
263                &self.statement,
264                &self.client_context_id,
265                self.status_code,
266            ));
267        }
268
269        let metrics = self.parse_metrics(metadata.metrics);
270        let warnings = self.parse_warnings(metadata.warnings);
271
272        Ok(MetaData {
273            prepared: metadata.early_meta_data.prepared,
274            request_id: metadata.request_id.unwrap_or_default(),
275            client_context_id: metadata.client_context_id.unwrap_or_default(),
276            status: metadata.status,
277            metrics,
278            signature: metadata.signature,
279            warnings,
280            profile: metadata.profile,
281        })
282    }
283
284    fn parse_metrics(&self, metrics: Option<QueryMetrics>) -> Option<Metrics> {
285        metrics.map(|m| {
286            let elapsed_time = if let Some(elapsed) = m.elapsed_time {
287                parse_duration_from_golang_string(&elapsed).unwrap_or_default()
288            } else {
289                Duration::default()
290            };
291
292            let execution_time = if let Some(execution) = m.execution_time {
293                parse_duration_from_golang_string(&execution).unwrap_or_default()
294            } else {
295                Duration::default()
296            };
297
298            Metrics {
299                elapsed_time,
300                execution_time,
301                result_count: m.result_count.unwrap_or_default(),
302                result_size: m.result_size.unwrap_or_default(),
303                mutation_count: m.mutation_count.unwrap_or_default(),
304                sort_count: m.sort_count.unwrap_or_default(),
305                error_count: m.error_count.unwrap_or_default(),
306                warning_count: m.warning_count.unwrap_or_default(),
307            }
308        })
309    }
310
311    fn parse_warnings(&self, warnings: Vec<QueryWarning>) -> Vec<Warning> {
312        let mut converted = vec![];
313        for w in warnings {
314            converted.push(Warning {
315                code: w.code.unwrap_or_default(),
316                message: w.msg.unwrap_or_default(),
317            });
318        }
319
320        converted
321    }
322
323    fn parse_errors(
324        errors: &[QueryError],
325        endpoint: impl Into<String>,
326        statement: impl Into<String>,
327        client_context_id: impl Into<String>,
328        status_code: StatusCode,
329    ) -> Error {
330        let error_descs: Vec<ErrorDesc> = errors
331            .iter()
332            .map(|error| {
333                ErrorDesc::new(
334                    Self::parse_error_kind(error),
335                    error.code,
336                    error.msg.clone(),
337                    error.retry.unwrap_or_default(),
338                    error.reason.clone(),
339                )
340            })
341            .collect();
342
343        let chosen_desc = error_descs
344            .iter()
345            .find(|desc| !desc.retry())
346            .unwrap_or(&error_descs[0]);
347
348        let mut server_error = ServerError::new(
349            chosen_desc.kind().clone(),
350            endpoint,
351            status_code,
352            chosen_desc.code(),
353            chosen_desc.message(),
354        )
355        .with_client_context_id(client_context_id)
356        .with_statement(statement);
357
358        if error_descs.len() > 1 {
359            server_error = server_error.with_error_descs(error_descs);
360        }
361
362        match server_error.kind() {
363            ServerErrorKind::ScopeNotFound => {
364                Error::new_resource_error(ResourceError::new(server_error))
365            }
366            ServerErrorKind::CollectionNotFound => {
367                Error::new_resource_error(ResourceError::new(server_error))
368            }
369            ServerErrorKind::IndexNotFound => {
370                Error::new_resource_error(ResourceError::new(server_error))
371            }
372            ServerErrorKind::IndexExists => {
373                Error::new_resource_error(ResourceError::new(server_error))
374            }
375            ServerErrorKind::AuthenticationFailure => {
376                if server_error.code() == 13014 {
377                    Error::new_resource_error(ResourceError::new(server_error))
378                } else {
379                    Error::new_server_error(server_error)
380                }
381            }
382            _ => Error::new_server_error(server_error),
383        }
384    }
385
386    fn parse_error_kind(error: &QueryError) -> ServerErrorKind {
387        let err_code = error.code;
388        let err_code_group = err_code / 1000;
389
390        if err_code_group == 4 {
391            if err_code == 4040
392                || err_code == 4050
393                || err_code == 4060
394                || err_code == 4070
395                || err_code == 4080
396                || err_code == 4090
397            {
398                ServerErrorKind::PreparedStatementFailure
399            } else if err_code == 4300 {
400                ServerErrorKind::IndexExists
401            } else {
402                ServerErrorKind::PlanningFailure
403            }
404        } else if err_code_group == 5 {
405            let msg = error.msg.to_lowercase();
406            if msg.contains("not enough") && msg.contains("replica") {
407                ServerErrorKind::InvalidArgument {
408                    argument: "num_replicas".to_string(),
409                    reason: "not enough indexer nodes to create index with replica count"
410                        .to_string(),
411                }
412            } else if msg.contains("build already in progress") {
413                ServerErrorKind::BuildAlreadyInProgress
414            } else if Regex::new(".*?ndex .*? already exist.*")
415                .unwrap()
416                .is_match(&error.msg)
417            {
418                ServerErrorKind::IndexExists
419            } else if Regex::new(".*?ndex .*? not exist.*")
420                .unwrap()
421                .is_match(&error.msg)
422            {
423                ServerErrorKind::IndexNotFound
424            } else {
425                ServerErrorKind::Internal
426            }
427        } else if err_code_group == 12 {
428            if err_code == 12003 {
429                ServerErrorKind::CollectionNotFound
430            } else if err_code == 12004 {
431                ServerErrorKind::IndexNotFound
432            } else if err_code == 12009 {
433                if !error.reason.is_empty() {
434                    if let Some(code) = error.reason.get("code") {
435                        if code == 12033 {
436                            ServerErrorKind::CasMismatch
437                        } else if code == 17014 {
438                            ServerErrorKind::DocNotFound
439                        } else if code == 17012 {
440                            ServerErrorKind::DocExists
441                        } else {
442                            ServerErrorKind::DMLFailure
443                        }
444                    } else {
445                        ServerErrorKind::DMLFailure
446                    }
447                } else if error.msg.to_lowercase().contains("cas mismatch") {
448                    ServerErrorKind::CasMismatch
449                } else {
450                    ServerErrorKind::DMLFailure
451                }
452            } else if err_code == 12016 {
453                ServerErrorKind::IndexNotFound
454            } else if err_code == 12021 {
455                ServerErrorKind::ScopeNotFound
456            } else {
457                ServerErrorKind::IndexFailure
458            }
459        } else if err_code_group == 14 {
460            ServerErrorKind::IndexFailure
461        } else if err_code_group == 10 {
462            ServerErrorKind::AuthenticationFailure
463        } else if err_code == 1000 {
464            ServerErrorKind::WriteInReadOnlyMode
465        } else if err_code == 1080 {
466            ServerErrorKind::Timeout
467        } else if err_code == 3000 {
468            ServerErrorKind::ParsingFailure
469        } else if err_code == 13014 {
470            ServerErrorKind::AuthenticationFailure
471        } else {
472            ServerErrorKind::Unknown
473        }
474    }
475}