// couchbase_core/searchx/search_respreader.rs
use crate::httpx;
use crate::httpx::decoder::Decoder;
use crate::httpx::raw_json_row_streamer::{RawJsonRowItem, RawJsonRowStreamer};
use crate::httpx::response::Response;
use crate::searchx::error::{ErrorKind, ServerError, ServerErrorKind};
use crate::searchx::search::{decode_common_error, Search};
use crate::searchx::search_result::{FacetResult, MetaData, Metrics, ResultHit};
use crate::searchx::{error, search_json};
use bytes::Bytes;
use futures::{FutureExt, Stream, StreamExt};
use http::StatusCode;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use tracing::debug;

/// Streaming reader for a search (FTS) query HTTP response.
///
/// Yields one [`ResultHit`] per row through its [`Stream`] implementation;
/// meta-data and facets become available once all rows have been consumed
/// (or immediately, when the response carried no rows).
pub struct SearchRespReader {
    // Endpoint the request was sent to; used to annotate errors.
    endpoint: String,
    // HTTP status of the response (always 200 once construction succeeds).
    status_code: StatusCode,

    // Name of the search index that was queried.
    index_name: String,
    // Underlying streamer producing raw JSON rows followed by the epilogue metadata.
    streamer: Pin<Box<dyn Stream<Item = httpx::error::Result<RawJsonRowItem>> + Send>>,
    // Parsed response meta-data; populated once the epilogue has been read.
    meta_data: Option<MetaData>,
    // Error hit while parsing the epilogue; replayed by metadata()/facets().
    epilogue_error: Option<error::Error>,
    // Parsed facet results; populated once the epilogue has been read.
    facets: Option<HashMap<String, FacetResult>>,
}
47
48impl Stream for SearchRespReader {
49 type Item = error::Result<ResultHit>;
50
51 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
52 let this = self.get_mut();
53
54 match this.streamer.poll_next_unpin(cx) {
55 Poll::Ready(Some(Ok(RawJsonRowItem::Row(row_data)))) => {
56 let row: search_json::Row = match serde_json::from_slice(&row_data).map_err(|e| {
57 error::Error::new_message_error(
58 format!("failed to parse row: {}", &e),
59 this.endpoint.clone(),
60 )
61 }) {
62 Ok(row) => row,
63 Err(e) => return Poll::Ready(Some(Err(e))),
64 };
65
66 Poll::Ready(Some(Ok(ResultHit::from(row))))
67 }
68 Poll::Ready(Some(Ok(RawJsonRowItem::Metadata(metadata)))) => {
69 match this.read_final_metadata(metadata) {
70 Ok((meta, facets)) => {
71 this.meta_data = Some(meta);
72 this.facets = Some(facets);
73 }
74 Err(e) => {
75 this.epilogue_error = Some(e.clone());
76 return Poll::Ready(Some(Err(e)));
77 }
78 }
79 Poll::Ready(None)
80 }
81 Poll::Ready(Some(Err(e))) => {
82 Poll::Ready(Some(Err(error::Error::new_http_error(e, &this.endpoint))))
83 }
84 Poll::Ready(None) => Poll::Ready(None),
85 Poll::Pending => Poll::Pending,
86 }
87 }
88}
89
90impl SearchRespReader {
91 pub async fn new(
92 resp: Response,
93 index_name: impl Into<String>,
94 endpoint: impl Into<String>,
95 ) -> error::Result<Self> {
96 let endpoint = endpoint.into();
97 let index_name = index_name.into();
98
99 let status_code = resp.status();
100 if status_code != 200 {
101 let body = match resp.bytes().await {
102 Ok(b) => b,
103 Err(e) => {
104 debug!("Failed to read response body on error {e}");
105 return Err(error::Error::new_http_error(e, endpoint));
106 }
107 };
108
109 let err: search_json::ErrorResponse = match serde_json::from_slice(&body) {
110 Ok(e) => e,
111 Err(e) => {
112 return Err(error::Error::new_message_error(
113 format!(
114 "non-200 status code received {status_code} but parsing error response body failed {e}"
115 ),
116 endpoint,
117 ));
118 }
119 };
120
121 return Err(decode_common_error(
122 index_name,
123 status_code,
124 &err.error,
125 endpoint,
126 ));
127 }
128
129 let stream = resp.bytes_stream();
130 let mut streamer = RawJsonRowStreamer::new(Decoder::new(stream), "hits");
131
132 match streamer.read_prelude().await {
133 Ok(_) => {}
134 Err(e) => {
135 return Err(error::Error::new_http_error(e, endpoint));
136 }
137 };
138
139 let has_more_rows = streamer.has_more_rows().await;
140 let mut epilog = None;
141 if !has_more_rows {
142 epilog = match streamer.epilog() {
143 Ok(epilog) => Some(epilog),
144 Err(e) => {
145 return Err(error::Error::new_http_error(e, endpoint));
146 }
147 };
148 }
149
150 let mut reader = Self {
151 endpoint,
152 status_code,
153 index_name,
154 streamer: Box::pin(streamer.into_stream()),
155 meta_data: None,
156 facets: None,
157 epilogue_error: None,
158 };
159
160 if let Some(epilog) = epilog {
161 let (meta, facets) = reader.read_final_metadata(epilog)?;
162
163 reader.meta_data = Some(meta);
164 reader.facets = Some(facets);
165 }
166
167 Ok(reader)
168 }
169
170 pub fn metadata(&self) -> error::Result<&MetaData> {
171 if let Some(e) = &self.epilogue_error {
172 return Err(e.clone());
173 }
174
175 if let Some(meta) = &self.meta_data {
176 return Ok(meta);
177 }
178
179 Err(error::Error::new_message_error(
180 "cannot read meta-data until after all rows are read",
181 None,
182 ))
183 }
184
185 pub fn facets(&self) -> error::Result<&HashMap<String, FacetResult>> {
186 if let Some(e) = &self.epilogue_error {
187 return Err(e.clone());
188 }
189
190 if let Some(facets) = &self.facets {
191 return Ok(facets);
192 }
193
194 Err(error::Error::new_message_error(
195 "cannot read facets until after all rows are read",
196 None,
197 ))
198 }
199
200 fn read_final_metadata(
201 &mut self,
202 epilog: Vec<u8>,
203 ) -> error::Result<(MetaData, HashMap<String, FacetResult>)> {
204 let metadata_json: search_json::SearchMetaData = match serde_json::from_slice(&epilog) {
205 Ok(m) => m,
206 Err(e) => {
207 return Err(error::Error::new_message_error(
208 format!("failed to parse metadata: {}", &e),
209 self.endpoint.clone(),
210 ));
211 }
212 };
213
214 let metadata = MetaData {
215 errors: metadata_json.status.errors,
216 metrics: Metrics {
217 failed_partition_count: metadata_json.status.failed,
218 max_score: metadata_json.max_score,
219 successful_partition_count: metadata_json.status.successful,
220 took: Duration::from_nanos(metadata_json.took),
221 total_hits: metadata_json.total_hits,
222 total_partition_count: metadata_json.status.total,
223 },
224 };
225
226 let mut facets: HashMap<String, FacetResult> = HashMap::new();
227 if let Some(resp_facets) = metadata_json.facets {
228 for (facet_name, facet_data) in resp_facets {
229 facets.insert(facet_name, facet_data.try_into()?);
230 }
231 }
232
233 Ok((metadata, facets))
234 }
235}