Skip to main content

couchbase_core/searchx/
search_respreader.rs

1/*
2 *
3 *  * Copyright (c) 2025 Couchbase, Inc.
4 *  *
5 *  * Licensed under the Apache License, Version 2.0 (the "License");
6 *  * you may not use this file except in compliance with the License.
7 *  * You may obtain a copy of the License at
8 *  *
9 *  *    http://www.apache.org/licenses/LICENSE-2.0
10 *  *
11 *  * Unless required by applicable law or agreed to in writing, software
12 *  * distributed under the License is distributed on an "AS IS" BASIS,
13 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  * See the License for the specific language governing permissions and
15 *  * limitations under the License.
16 *
17 */
18
19use crate::httpx;
20use crate::httpx::decoder::Decoder;
21use crate::httpx::raw_json_row_streamer::{RawJsonRowItem, RawJsonRowStreamer};
22use crate::httpx::response::Response;
23use crate::searchx::error::{ErrorKind, ServerError, ServerErrorKind};
24use crate::searchx::search::{decode_common_error, Search};
25use crate::searchx::search_result::{FacetResult, MetaData, Metrics, ResultHit};
26use crate::searchx::{error, search_json};
27use bytes::Bytes;
28use futures::{FutureExt, Stream, StreamExt};
29use http::StatusCode;
30use std::collections::HashMap;
31use std::pin::Pin;
32use std::sync::Arc;
33use std::task::{Context, Poll};
34use std::time::Duration;
35use tracing::debug;
36
37pub struct SearchRespReader {
38    endpoint: String,
39    status_code: StatusCode,
40
41    index_name: String,
42    streamer: Pin<Box<dyn Stream<Item = httpx::error::Result<RawJsonRowItem>> + Send>>,
43    meta_data: Option<MetaData>,
44    epilogue_error: Option<error::Error>,
45    facets: Option<HashMap<String, FacetResult>>,
46}
47
48impl Stream for SearchRespReader {
49    type Item = error::Result<ResultHit>;
50
51    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
52        let this = self.get_mut();
53
54        match this.streamer.poll_next_unpin(cx) {
55            Poll::Ready(Some(Ok(RawJsonRowItem::Row(row_data)))) => {
56                let row: search_json::Row = match serde_json::from_slice(&row_data).map_err(|e| {
57                    error::Error::new_message_error(
58                        format!("failed to parse row: {}", &e),
59                        this.endpoint.clone(),
60                    )
61                }) {
62                    Ok(row) => row,
63                    Err(e) => return Poll::Ready(Some(Err(e))),
64                };
65
66                Poll::Ready(Some(Ok(ResultHit::from(row))))
67            }
68            Poll::Ready(Some(Ok(RawJsonRowItem::Metadata(metadata)))) => {
69                match this.read_final_metadata(metadata) {
70                    Ok((meta, facets)) => {
71                        this.meta_data = Some(meta);
72                        this.facets = Some(facets);
73                    }
74                    Err(e) => {
75                        this.epilogue_error = Some(e.clone());
76                        return Poll::Ready(Some(Err(e)));
77                    }
78                }
79                Poll::Ready(None)
80            }
81            Poll::Ready(Some(Err(e))) => {
82                Poll::Ready(Some(Err(error::Error::new_http_error(e, &this.endpoint))))
83            }
84            Poll::Ready(None) => Poll::Ready(None),
85            Poll::Pending => Poll::Pending,
86        }
87    }
88}
89
90impl SearchRespReader {
91    pub async fn new(
92        resp: Response,
93        index_name: impl Into<String>,
94        endpoint: impl Into<String>,
95    ) -> error::Result<Self> {
96        let endpoint = endpoint.into();
97        let index_name = index_name.into();
98
99        let status_code = resp.status();
100        if status_code != 200 {
101            let body = match resp.bytes().await {
102                Ok(b) => b,
103                Err(e) => {
104                    debug!("Failed to read response body on error {e}");
105                    return Err(error::Error::new_http_error(e, endpoint));
106                }
107            };
108
109            let err: search_json::ErrorResponse = match serde_json::from_slice(&body) {
110                Ok(e) => e,
111                Err(e) => {
112                    return Err(error::Error::new_message_error(
113                        format!(
114                        "non-200 status code received {status_code} but parsing error response body failed {e}"
115                    ),
116                        endpoint,
117                    ));
118                }
119            };
120
121            return Err(decode_common_error(
122                index_name,
123                status_code,
124                &err.error,
125                endpoint,
126            ));
127        }
128
129        let stream = resp.bytes_stream();
130        let mut streamer = RawJsonRowStreamer::new(Decoder::new(stream), "hits");
131
132        match streamer.read_prelude().await {
133            Ok(_) => {}
134            Err(e) => {
135                return Err(error::Error::new_http_error(e, endpoint));
136            }
137        };
138
139        let has_more_rows = streamer.has_more_rows().await;
140        let mut epilog = None;
141        if !has_more_rows {
142            epilog = match streamer.epilog() {
143                Ok(epilog) => Some(epilog),
144                Err(e) => {
145                    return Err(error::Error::new_http_error(e, endpoint));
146                }
147            };
148        }
149
150        let mut reader = Self {
151            endpoint,
152            status_code,
153            index_name,
154            streamer: Box::pin(streamer.into_stream()),
155            meta_data: None,
156            facets: None,
157            epilogue_error: None,
158        };
159
160        if let Some(epilog) = epilog {
161            let (meta, facets) = reader.read_final_metadata(epilog)?;
162
163            reader.meta_data = Some(meta);
164            reader.facets = Some(facets);
165        }
166
167        Ok(reader)
168    }
169
170    pub fn metadata(&self) -> error::Result<&MetaData> {
171        if let Some(e) = &self.epilogue_error {
172            return Err(e.clone());
173        }
174
175        if let Some(meta) = &self.meta_data {
176            return Ok(meta);
177        }
178
179        Err(error::Error::new_message_error(
180            "cannot read meta-data until after all rows are read",
181            None,
182        ))
183    }
184
185    pub fn facets(&self) -> error::Result<&HashMap<String, FacetResult>> {
186        if let Some(e) = &self.epilogue_error {
187            return Err(e.clone());
188        }
189
190        if let Some(facets) = &self.facets {
191            return Ok(facets);
192        }
193
194        Err(error::Error::new_message_error(
195            "cannot read facets until after all rows are read",
196            None,
197        ))
198    }
199
200    fn read_final_metadata(
201        &mut self,
202        epilog: Vec<u8>,
203    ) -> error::Result<(MetaData, HashMap<String, FacetResult>)> {
204        let metadata_json: search_json::SearchMetaData = match serde_json::from_slice(&epilog) {
205            Ok(m) => m,
206            Err(e) => {
207                return Err(error::Error::new_message_error(
208                    format!("failed to parse metadata: {}", &e),
209                    self.endpoint.clone(),
210                ));
211            }
212        };
213
214        let metadata = MetaData {
215            errors: metadata_json.status.errors,
216            metrics: Metrics {
217                failed_partition_count: metadata_json.status.failed,
218                max_score: metadata_json.max_score,
219                successful_partition_count: metadata_json.status.successful,
220                took: Duration::from_nanos(metadata_json.took),
221                total_hits: metadata_json.total_hits,
222                total_partition_count: metadata_json.status.total,
223            },
224        };
225
226        let mut facets: HashMap<String, FacetResult> = HashMap::new();
227        if let Some(resp_facets) = metadata_json.facets {
228            for (facet_name, facet_data) in resp_facets {
229                facets.insert(facet_name, facet_data.try_into()?);
230            }
231        }
232
233        Ok((metadata, facets))
234    }
235}