Skip to main content

couchbase_core/queryx/
query_respreader.rs

/*
 *
 *  * Copyright (c) 2025 Couchbase, Inc.
 *  *
 *  * Licensed under the Apache License, Version 2.0 (the "License");
 *  * you may not use this file except in compliance with the License.
 *  * You may obtain a copy of the License at
 *  *
 *  *    http://www.apache.org/licenses/LICENSE-2.0
 *  *
 *  * Unless required by applicable law or agreed to in writing, software
 *  * distributed under the License is distributed on an "AS IS" BASIS,
 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  * See the License for the specific language governing permissions and
 *  * limitations under the License.
 *
 */
18
19use std::pin::{pin, Pin};
20use std::ptr::read;
21use std::sync::Arc;
22use std::task::{Context, Poll};
23use std::time::Duration;
24
25use arc_swap::ArcSwap;
26use async_trait::async_trait;
27use bytes::Bytes;
28use futures::future::err;
29use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
30use http::StatusCode;
31use regex::Regex;
32use tokio::sync::Mutex;
33use tracing::debug;
34
35use crate::helpers::durations::parse_duration_from_golang_string;
36use crate::httpx;
37use crate::httpx::decoder::Decoder;
38use crate::httpx::raw_json_row_streamer::{RawJsonRowItem, RawJsonRowStreamer};
39use crate::httpx::response::Response;
40use crate::memdx::magic::Magic::Res;
41use crate::queryx::error;
42use crate::queryx::error::{
43    Error, ErrorDesc, ErrorKind, ResourceError, ServerError, ServerErrorKind,
44};
45use crate::queryx::query_json::{
46    QueryEarlyMetaData, QueryError, QueryErrorResponse, QueryMetaData, QueryMetrics, QueryWarning,
47};
48use crate::queryx::query_result::{EarlyMetaData, MetaData, Metrics, Warning};
49
50pub struct QueryRespReader {
51    endpoint: String,
52    statement: String,
53    client_context_id: String,
54    status_code: StatusCode,
55
56    streamer: Pin<Box<dyn Stream<Item = httpx::error::Result<RawJsonRowItem>> + Send>>,
57    early_meta_data: EarlyMetaData,
58    meta_data: Option<MetaData>,
59    meta_data_error: Option<Error>,
60}
61
62impl Stream for QueryRespReader {
63    type Item = error::Result<Bytes>;
64
65    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
66        let this = self.get_mut();
67
68        match this.streamer.poll_next_unpin(cx) {
69            Poll::Ready(Some(Ok(RawJsonRowItem::Row(row_data)))) => {
70                Poll::Ready(Some(Ok(Bytes::from(row_data))))
71            }
72            Poll::Ready(Some(Ok(RawJsonRowItem::Metadata(metadata)))) => {
73                match this.read_final_metadata(metadata) {
74                    Ok(meta) => this.meta_data = Some(meta),
75                    Err(e) => {
76                        this.meta_data_error = Some(e.clone());
77                        return Poll::Ready(Some(Err(e)));
78                    }
79                };
80                Poll::Ready(None)
81            }
82            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(Error::new_http_error(
83                e,
84                this.endpoint.clone(),
85                Some(this.statement.clone()),
86                this.client_context_id.clone(),
87            )))),
88            Poll::Ready(None) => Poll::Ready(None),
89            Poll::Pending => Poll::Pending,
90        }
91    }
92}
93
94impl QueryRespReader {
95    pub async fn new(
96        resp: Response,
97        endpoint: impl Into<String>,
98        statement: impl Into<String>,
99        client_context_id: impl Into<String>,
100    ) -> error::Result<Self> {
101        let status_code = resp.status();
102        let endpoint = endpoint.into();
103        let statement = statement.into();
104        let client_context_id = client_context_id.into();
105        if status_code != 200 {
106            let body = match resp.bytes().await {
107                Ok(b) => b,
108                Err(e) => {
109                    debug!("Failed to read response body on error {}", &e);
110                    return Err(Error::new_http_error(
111                        e,
112                        endpoint,
113                        statement,
114                        client_context_id,
115                    ));
116                }
117            };
118
119            let errors: QueryErrorResponse = match serde_json::from_slice(&body) {
120                Ok(e) => e,
121                Err(e) => {
122                    return Err(Error::new_message_error(
123                        format!(
124                        "non-200 status code received {status_code} but parsing error response body failed {e}"
125                    ),
126                        None,
127                        None,
128                        None,
129                    ));
130                }
131            };
132
133            if errors.errors.is_empty() {
134                return Err(Error::new_message_error(
135                    format!(
136                        "Non-200 status code received {status_code} but response body contained no errors",
137                    ),
138                    None,
139                    None,
140                    None,
141                ));
142            }
143
144            return Err(Self::parse_errors(
145                &errors.errors,
146                endpoint,
147                statement,
148                client_context_id,
149                status_code,
150            ));
151        }
152
153        let stream = resp.bytes_stream();
154        let mut streamer = RawJsonRowStreamer::new(Decoder::new(stream), "results");
155
156        let early_meta_data =
157            Self::read_early_metadata(&mut streamer, &endpoint, &statement, &client_context_id)
158                .await?;
159
160        let has_more_rows = streamer.has_more_rows().await;
161        let mut epilog = None;
162        if !has_more_rows {
163            epilog = match streamer.epilog() {
164                Ok(epilog) => Some(epilog),
165                Err(e) => {
166                    return Err(Error::new_http_error(
167                        e,
168                        endpoint,
169                        statement,
170                        client_context_id,
171                    ));
172                }
173            };
174        }
175
176        let mut reader = Self {
177            endpoint,
178            statement,
179            client_context_id,
180            status_code,
181            streamer: Box::pin(streamer.into_stream()),
182            early_meta_data,
183            meta_data: None,
184            meta_data_error: None,
185        };
186
187        if let Some(epilog) = epilog {
188            let meta = reader.read_final_metadata(epilog)?;
189
190            reader.meta_data = Some(meta);
191        }
192
193        Ok(reader)
194    }
195
196    pub fn early_metadata(&self) -> &EarlyMetaData {
197        &self.early_meta_data
198    }
199
200    pub fn metadata(&self) -> error::Result<&MetaData> {
201        if let Some(e) = &self.meta_data_error {
202            return Err(e.clone());
203        }
204
205        if let Some(meta) = &self.meta_data {
206            return Ok(meta);
207        }
208
209        Err(Error::new_message_error(
210            "cannot read meta-data until after all rows are read",
211            None,
212            None,
213            None,
214        ))
215    }
216
217    async fn read_early_metadata(
218        streamer: &mut RawJsonRowStreamer,
219        endpoint: &str,
220        statement: &str,
221        client_context_id: &str,
222    ) -> error::Result<EarlyMetaData> {
223        let prelude = streamer.read_prelude().await.map_err(|e| {
224            Error::new_http_error(
225                e,
226                endpoint,
227                statement.to_string(),
228                client_context_id.to_string(),
229            )
230        })?;
231
232        let early_metadata: QueryEarlyMetaData = serde_json::from_slice(&prelude).map_err(|e| {
233            Error::new_message_error(
234                format!("failed to parse metadata from response: {e}"),
235                endpoint.to_string(),
236                statement.to_string(),
237                client_context_id.to_string(),
238            )
239        })?;
240
241        Ok(EarlyMetaData {
242            prepared: early_metadata.prepared,
243        })
244    }
245
246    fn read_final_metadata(&mut self, epilog: Vec<u8>) -> error::Result<MetaData> {
247        let metadata: QueryMetaData = match serde_json::from_slice(&epilog) {
248            Ok(m) => m,
249            Err(e) => {
250                return Err(Error::new_message_error(
251                    format!("failed to parse query metadata from epilog: {e}"),
252                    self.endpoint.clone(),
253                    self.statement.clone(),
254                    self.client_context_id.clone(),
255                ));
256            }
257        };
258
259        self.parse_metadata(metadata)
260    }
261
262    fn parse_metadata(&self, metadata: QueryMetaData) -> error::Result<MetaData> {
263        if !metadata.errors.is_empty() {
264            return Err(Self::parse_errors(
265                &metadata.errors,
266                &self.endpoint,
267                &self.statement,
268                &self.client_context_id,
269                self.status_code,
270            ));
271        }
272
273        let metrics = self.parse_metrics(metadata.metrics);
274        let warnings = self.parse_warnings(metadata.warnings);
275
276        Ok(MetaData {
277            prepared: metadata.early_meta_data.prepared,
278            request_id: metadata.request_id.unwrap_or_default(),
279            client_context_id: metadata.client_context_id.unwrap_or_default(),
280            status: metadata.status,
281            metrics,
282            signature: metadata.signature,
283            warnings,
284            profile: metadata.profile,
285        })
286    }
287
288    fn parse_metrics(&self, metrics: Option<QueryMetrics>) -> Option<Metrics> {
289        metrics.map(|m| {
290            let elapsed_time = if let Some(elapsed) = m.elapsed_time {
291                parse_duration_from_golang_string(&elapsed).unwrap_or_default()
292            } else {
293                Duration::default()
294            };
295
296            let execution_time = if let Some(execution) = m.execution_time {
297                parse_duration_from_golang_string(&execution).unwrap_or_default()
298            } else {
299                Duration::default()
300            };
301
302            Metrics {
303                elapsed_time,
304                execution_time,
305                result_count: m.result_count.unwrap_or_default(),
306                result_size: m.result_size.unwrap_or_default(),
307                mutation_count: m.mutation_count.unwrap_or_default(),
308                sort_count: m.sort_count.unwrap_or_default(),
309                error_count: m.error_count.unwrap_or_default(),
310                warning_count: m.warning_count.unwrap_or_default(),
311            }
312        })
313    }
314
315    fn parse_warnings(&self, warnings: Vec<QueryWarning>) -> Vec<Warning> {
316        let mut converted = vec![];
317        for w in warnings {
318            converted.push(Warning {
319                code: w.code.unwrap_or_default(),
320                message: w.msg.unwrap_or_default(),
321            });
322        }
323
324        converted
325    }
326
327    fn parse_errors(
328        errors: &[QueryError],
329        endpoint: impl Into<String>,
330        statement: impl Into<String>,
331        client_context_id: impl Into<String>,
332        status_code: StatusCode,
333    ) -> Error {
334        let error_descs: Vec<ErrorDesc> = errors
335            .iter()
336            .map(|error| {
337                ErrorDesc::new(
338                    Self::parse_error_kind(error),
339                    error.code,
340                    error.msg.clone(),
341                    error.retry.unwrap_or_default(),
342                    error.reason.clone(),
343                )
344            })
345            .collect();
346
347        let chosen_desc = error_descs
348            .iter()
349            .find(|desc| !desc.retry())
350            .unwrap_or(&error_descs[0]);
351
352        let mut server_error = ServerError::new(
353            chosen_desc.kind().clone(),
354            endpoint,
355            status_code,
356            chosen_desc.code(),
357            chosen_desc.message(),
358        )
359        .with_client_context_id(client_context_id)
360        .with_statement(statement);
361
362        if error_descs.len() > 1 {
363            server_error = server_error.with_error_descs(error_descs);
364        }
365
366        match server_error.kind() {
367            ServerErrorKind::ScopeNotFound => {
368                Error::new_resource_error(ResourceError::new(server_error))
369            }
370            ServerErrorKind::CollectionNotFound => {
371                Error::new_resource_error(ResourceError::new(server_error))
372            }
373            ServerErrorKind::IndexNotFound => {
374                Error::new_resource_error(ResourceError::new(server_error))
375            }
376            ServerErrorKind::IndexExists => {
377                Error::new_resource_error(ResourceError::new(server_error))
378            }
379            ServerErrorKind::AuthenticationFailure => {
380                if server_error.code() == 13014 {
381                    Error::new_resource_error(ResourceError::new(server_error))
382                } else {
383                    Error::new_server_error(server_error)
384                }
385            }
386            _ => Error::new_server_error(server_error),
387        }
388    }
389
390    fn parse_error_kind(error: &QueryError) -> ServerErrorKind {
391        let err_code = error.code;
392        let err_code_group = err_code / 1000;
393
394        if err_code_group == 4 {
395            if err_code == 4040
396                || err_code == 4050
397                || err_code == 4060
398                || err_code == 4070
399                || err_code == 4080
400                || err_code == 4090
401            {
402                ServerErrorKind::PreparedStatementFailure
403            } else if err_code == 4300 {
404                ServerErrorKind::IndexExists
405            } else {
406                ServerErrorKind::PlanningFailure
407            }
408        } else if err_code_group == 5 {
409            let msg = error.msg.to_lowercase();
410            if msg.contains("not enough") && msg.contains("replica") {
411                ServerErrorKind::InvalidArgument {
412                    argument: "num_replicas".to_string(),
413                    reason: "not enough indexer nodes to create index with replica count"
414                        .to_string(),
415                }
416            } else if msg.contains("build already in progress") {
417                ServerErrorKind::BuildAlreadyInProgress
418            } else if Regex::new(".*?ndex .*? already exist.*")
419                .unwrap()
420                .is_match(&error.msg)
421            {
422                ServerErrorKind::IndexExists
423            } else if Regex::new(".*?ndex .*? not exist.*")
424                .unwrap()
425                .is_match(&error.msg)
426            {
427                ServerErrorKind::IndexNotFound
428            } else {
429                ServerErrorKind::Internal
430            }
431        } else if err_code_group == 12 {
432            if err_code == 12003 {
433                ServerErrorKind::CollectionNotFound
434            } else if err_code == 12004 {
435                ServerErrorKind::IndexNotFound
436            } else if err_code == 12009 {
437                if !error.reason.is_empty() {
438                    if let Some(code) = error.reason.get("code") {
439                        if code == 12033 {
440                            ServerErrorKind::CasMismatch
441                        } else if code == 17014 {
442                            ServerErrorKind::DocNotFound
443                        } else if code == 17012 {
444                            ServerErrorKind::DocExists
445                        } else {
446                            ServerErrorKind::DMLFailure
447                        }
448                    } else {
449                        ServerErrorKind::DMLFailure
450                    }
451                } else if error.msg.to_lowercase().contains("cas mismatch") {
452                    ServerErrorKind::CasMismatch
453                } else {
454                    ServerErrorKind::DMLFailure
455                }
456            } else if err_code == 12016 {
457                ServerErrorKind::IndexNotFound
458            } else if err_code == 12021 {
459                ServerErrorKind::ScopeNotFound
460            } else {
461                ServerErrorKind::IndexFailure
462            }
463        } else if err_code_group == 14 {
464            ServerErrorKind::IndexFailure
465        } else if err_code_group == 10 {
466            ServerErrorKind::AuthenticationFailure
467        } else if err_code == 1000 {
468            ServerErrorKind::WriteInReadOnlyMode
469        } else if err_code == 1080 {
470            ServerErrorKind::Timeout
471        } else if err_code == 3000 {
472            ServerErrorKind::ParsingFailure
473        } else if err_code == 13014 {
474            ServerErrorKind::AuthenticationFailure
475        } else {
476            ServerErrorKind::Unknown
477        }
478    }
479}