1use crate::httpx::client::Client;
20use crate::httpx::request::{Auth, OnBehalfOfInfo, Request};
21use crate::httpx::response::Response;
22use crate::mgmtx::mgmt::parse_response_json;
23use crate::searchx::document_analysis::DocumentAnalysis;
24use crate::searchx::error;
25use crate::searchx::error::{Error, ServerError};
26use crate::searchx::index::Index;
27use crate::searchx::index_json::{SearchIndexResponseJson, SearchIndexesResponseJson};
28use crate::searchx::mgmt_options::{
29 AllowQueryingOptions, AnalyzeDocumentOptions, DeleteIndexOptions, DisallowQueryingOptions,
30 FreezePlanOptions, GetAllIndexesOptions, GetIndexOptions, GetIndexedDocumentsCountOptions,
31 PauseIngestOptions, PingOptions, RefreshConfigOptions, ResumeIngestOptions,
32 UnfreezePlanOptions, UpsertIndexOptions,
33};
34use crate::searchx::query_options::QueryOptions;
35use crate::searchx::search_json::{DocumentAnalysisJson, IndexedDocumentsJson};
36use crate::searchx::search_respreader::SearchRespReader;
37use crate::tracingcomponent::{
38 BeginDispatchFields, EndDispatchFields, TracingComponent, SERVICE_VALUE_SEARCH,
39 SPAN_ATTRIB_DB_SYSTEM_VALUE, SPAN_ATTRIB_OTEL_KIND_CLIENT_VALUE,
40};
41use crate::util::get_host_port_tuple_from_uri;
42use bytes::Bytes;
43use http::{Method, StatusCode};
44use std::collections::HashMap;
45use std::sync::Arc;
46use tracing::{instrument, Level, Span};
47
48#[derive(Debug)]
49pub struct Search<C: Client> {
50 pub http_client: Arc<C>,
51 pub user_agent: String,
52 pub endpoint: String,
53 pub canonical_endpoint: String,
54 pub auth: Auth,
55
56 pub vector_search_enabled: bool,
57 pub(crate) tracing: Arc<TracingComponent>,
58}
59
60impl<C: Client> Search<C> {
61 pub fn new_request(
62 &self,
63 method: Method,
64 path: impl Into<String>,
65 content_type: impl Into<String>,
66 on_behalf_of: Option<OnBehalfOfInfo>,
67 headers: Option<HashMap<&str, &str>>,
68 body: Option<Bytes>,
69 ) -> Request {
70 let auth = if let Some(obo) = on_behalf_of {
71 Auth::OnBehalfOf(OnBehalfOfInfo {
72 username: obo.username,
73 password_or_domain: obo.password_or_domain,
74 })
75 } else {
76 self.auth.clone()
77 };
78
79 let mut req = Request::new(method, format!("{}/{}", self.endpoint, path.into()))
80 .auth(auth)
81 .user_agent(self.user_agent.clone())
82 .content_type(content_type.into())
83 .body(body);
84
85 if let Some(headers) = headers {
86 for (key, value) in headers.into_iter() {
87 req = req.add_header(key, value);
88 }
89 }
90
91 req
92 }
93
94 pub async fn execute(
95 &self,
96 method: Method,
97 path: impl Into<String>,
98 content_type: impl Into<String>,
99 on_behalf_of: Option<OnBehalfOfInfo>,
100 headers: Option<HashMap<&str, &str>>,
101 body: Option<Bytes>,
102 ) -> crate::httpx::error::Result<Response> {
103 let req = self.new_request(method, path, content_type, on_behalf_of, headers, body);
104
105 self.http_client.execute(req).await
106 }
107
108 pub async fn query(&self, opts: &QueryOptions) -> error::Result<SearchRespReader> {
109 if !self.vector_search_enabled && (opts.knn.is_some() || opts.knn_operator.is_some()) {
110 return Err(error::Error::new_unsupported_feature_error(
111 "vector search".to_string(),
112 ));
113 }
114
115 let req_uri = if let Some(bucket) = &opts.bucket_name {
116 if let Some(scope) = &opts.scope_name {
117 format!(
118 "api/bucket/{}/scope/{}/index/{}/query",
119 bucket, scope, opts.index_name
120 )
121 } else {
122 return Err(error::Error::new_invalid_argument_error(
123 "must specify both or neither scope and bucket names",
124 None,
125 ));
126 }
127 } else {
128 format!("api/index/{}/query", &opts.index_name)
129 };
130
131 let on_behalf_of = opts.on_behalf_of.clone();
132
133 let body = serde_json::to_vec(&opts).map_err(|e| {
134 error::Error::new_encoding_error(format!("could not serialize query options: {e}"))
135 })?;
136
137 let peer_addr = get_host_port_tuple_from_uri(&self.endpoint).unwrap_or_default();
138 let canonical_addr =
139 get_host_port_tuple_from_uri(&self.canonical_endpoint).unwrap_or_default();
140 let res = self
141 .tracing
142 .orchestrate_dispatch_span(
143 BeginDispatchFields::new(
144 (&peer_addr.0, &peer_addr.1),
145 (&canonical_addr.0, &canonical_addr.1),
146 None,
147 ),
148 self.execute(
149 Method::POST,
150 req_uri,
151 "application/json",
152 on_behalf_of,
153 None,
154 Some(Bytes::from(body)),
155 ),
156 |_| EndDispatchFields::new(None, None),
157 )
158 .await
159 .map_err(|e| error::Error::new_http_error(e, self.endpoint.to_string()))?;
160
161 SearchRespReader::new(res, &opts.index_name, &self.endpoint).await
162 }
163
164 pub async fn upsert_index(&self, opts: &UpsertIndexOptions<'_>) -> error::Result<()> {
165 let req_uri = Self::get_uri(&opts.index.name, opts.bucket_name, opts.scope_name)?;
166
167 let body = serde_json::to_vec(&opts.index).map_err(|e| {
168 error::Error::new_encoding_error(format!("could not serialize index: {e}"))
169 })?;
170
171 let mut headers = HashMap::new();
172 headers.insert("cache-control", "no-cache");
173
174 let res = self
175 .execute(
176 Method::PUT,
177 req_uri,
178 "application/json",
179 opts.on_behalf_of.cloned(),
181 Some(headers),
182 Some(Bytes::from(body)),
183 )
184 .await
185 .map_err(|e| error::Error::new_http_error(e, self.endpoint.to_string()))?;
186
187 if res.status() != 200 {
188 return Err(
189 decode_response_error(res, opts.index.name.clone(), self.endpoint.clone()).await,
190 );
191 }
192
193 Ok(())
194 }
195
196 pub async fn delete_index(&self, opts: &DeleteIndexOptions<'_>) -> error::Result<()> {
197 let req_uri = Self::get_uri(opts.index_name, opts.bucket_name, opts.scope_name)?;
198
199 let mut headers = HashMap::new();
200 headers.insert("cache-control", "no-cache");
201
202 let res = self
203 .execute(
204 Method::DELETE,
205 req_uri,
206 "application/json",
207 opts.on_behalf_of.cloned(),
209 Some(headers),
210 None,
211 )
212 .await
213 .map_err(|e| error::Error::new_http_error(e, self.endpoint.to_string()))?;
214
215 if res.status() != 200 {
216 return Err(decode_response_error(
217 res,
218 opts.index_name.to_string(),
219 self.endpoint.clone(),
220 )
221 .await);
222 }
223
224 Ok(())
225 }
226
227 pub async fn get_index(&self, opts: &GetIndexOptions<'_>) -> error::Result<Index> {
228 let req_uri = Self::get_uri(opts.index_name, opts.bucket_name, opts.scope_name)?;
229
230 let res = self
231 .execute(
232 Method::GET,
233 req_uri,
234 "",
235 opts.on_behalf_of.cloned(),
237 None,
238 None,
239 )
240 .await
241 .map_err(|e| error::Error::new_http_error(e, self.endpoint.to_string()))?;
242
243 if res.status() != 200 {
244 return Err(decode_response_error(
245 res,
246 opts.index_name.to_string(),
247 self.endpoint.clone(),
248 )
249 .await);
250 }
251
252 let index: SearchIndexResponseJson = parse_response_json(res).await.map_err(|e| {
253 error::Error::new_message_error(
254 format!("failed to parse index json: {e}"),
255 Some(self.endpoint.clone()),
256 )
257 })?;
258
259 Ok(index.index_def.into())
260 }
261
262 pub async fn get_all_indexes(
263 &self,
264 opts: &GetAllIndexesOptions<'_>,
265 ) -> error::Result<Vec<Index>> {
266 let req_uri = Self::get_uri("", opts.bucket_name, opts.scope_name)?;
267
268 let res = self
269 .execute(
270 Method::GET,
271 req_uri,
272 "",
273 opts.on_behalf_of.cloned(),
275 None,
276 None,
277 )
278 .await
279 .map_err(|e| error::Error::new_http_error(e, self.endpoint.to_string()))?;
280
281 if res.status() != 200 {
282 return Err(decode_response_error(res, "".to_string(), self.endpoint.clone()).await);
283 }
284
285 let index: SearchIndexesResponseJson = parse_response_json(res).await.map_err(|e| {
286 error::Error::new_message_error(
287 format!("failed to parse index json: {e}"),
288 Some(self.endpoint.clone()),
289 )
290 })?;
291
292 Ok(index
293 .indexes
294 .index_defs
295 .into_values()
296 .map(Index::from)
297 .collect())
298 }
299
300 pub async fn analyze_document(
301 &self,
302 opts: &AnalyzeDocumentOptions<'_>,
303 ) -> error::Result<DocumentAnalysis> {
304 let req_uri = Self::get_uri(opts.index_name, opts.bucket_name, opts.scope_name)?;
305 let body = Bytes::from(opts.doc_content.to_vec());
306
307 let res = self
308 .execute(
309 Method::POST,
310 req_uri,
311 "application/json",
312 opts.on_behalf_of.cloned(),
314 None,
315 Some(body),
316 )
317 .await
318 .map_err(|e| error::Error::new_http_error(e, self.endpoint.to_string()))?;
319
320 if res.status() != 200 {
321 return Err(decode_response_error(
322 res,
323 opts.index_name.to_string(),
324 self.endpoint.clone(),
325 )
326 .await);
327 }
328
329 let analysis: DocumentAnalysisJson = parse_response_json(res).await.map_err(|e| {
330 error::Error::new_message_error(
331 format!("failed to parse document analysis: {e}"),
332 Some(self.endpoint.clone()),
333 )
334 })?;
335
336 Ok(analysis.into())
337 }
338
339 pub async fn get_indexed_documents_count(
340 &self,
341 opts: &GetIndexedDocumentsCountOptions<'_>,
342 ) -> error::Result<u64> {
343 let req_uri = if opts.scope_name.is_none() && opts.bucket_name.is_none() {
344 format!("/api/index/{}/count", opts.index_name)
345 } else if opts.scope_name.is_some() && opts.bucket_name.is_some() {
346 format!(
347 "/api/bucket/{}/scope/{}/index/{}/count",
348 opts.bucket_name.unwrap(),
349 opts.scope_name.unwrap(),
350 opts.index_name
351 )
352 } else {
353 return Err(error::Error::new_invalid_argument_error(
354 "must specify both or neither of scope and bucket names",
355 None,
356 ));
357 };
358
359 let res = self
360 .execute(
361 Method::GET,
362 req_uri,
363 "",
364 opts.on_behalf_of.cloned(),
365 None,
366 None,
367 )
368 .await
369 .map_err(|e| error::Error::new_http_error(e, self.endpoint.to_string()))?;
370
371 if res.status() != 200 {
372 return Err(decode_response_error(
373 res,
374 opts.index_name.to_string(),
375 self.endpoint.clone(),
376 )
377 .await);
378 }
379
380 let count: IndexedDocumentsJson = parse_response_json(res).await.map_err(|e| {
381 error::Error::new_message_error(
382 format!("failed to parse indexed count: {e}"),
383 Some(self.endpoint.clone()),
384 )
385 })?;
386
387 Ok(count.count)
388 }
389
390 pub async fn pause_ingest(&self, opts: &PauseIngestOptions<'_>) -> error::Result<()> {
391 self.control_request(
392 opts.index_name,
393 opts.bucket_name,
394 opts.scope_name,
395 "ingestControl/pause",
396 opts.on_behalf_of,
397 )
398 .await
399 }
400
401 pub async fn resume_ingest(&self, opts: &ResumeIngestOptions<'_>) -> error::Result<()> {
402 self.control_request(
403 opts.index_name,
404 opts.bucket_name,
405 opts.scope_name,
406 "ingestControl/resume",
407 opts.on_behalf_of,
408 )
409 .await
410 }
411
412 pub async fn allow_querying(&self, opts: &AllowQueryingOptions<'_>) -> error::Result<()> {
413 self.control_request(
414 opts.index_name,
415 opts.bucket_name,
416 opts.scope_name,
417 "queryControl/allow",
418 opts.on_behalf_of,
419 )
420 .await
421 }
422
423 pub async fn disallow_querying(&self, opts: &DisallowQueryingOptions<'_>) -> error::Result<()> {
424 self.control_request(
425 opts.index_name,
426 opts.bucket_name,
427 opts.scope_name,
428 "queryControl/disallow",
429 opts.on_behalf_of,
430 )
431 .await
432 }
433
434 pub async fn freeze_plan(&self, opts: &FreezePlanOptions<'_>) -> error::Result<()> {
435 self.control_request(
436 opts.index_name,
437 opts.bucket_name,
438 opts.scope_name,
439 "planFreezeControl/freeze",
440 opts.on_behalf_of,
441 )
442 .await
443 }
444
445 pub async fn unfreeze_plan(&self, opts: &UnfreezePlanOptions<'_>) -> error::Result<()> {
446 self.control_request(
447 opts.index_name,
448 opts.bucket_name,
449 opts.scope_name,
450 "planFreezeControl/unfreeze",
451 opts.on_behalf_of,
452 )
453 .await
454 }
455
456 pub async fn ping(&self, opts: &PingOptions<'_>) -> error::Result<()> {
457 let peer_addr = get_host_port_tuple_from_uri(&self.endpoint).unwrap_or_default();
458 let canonical_addr =
459 get_host_port_tuple_from_uri(&self.canonical_endpoint).unwrap_or_default();
460 let res = match self
461 .tracing
462 .orchestrate_dispatch_span(
463 BeginDispatchFields::new(
464 (&peer_addr.0, &peer_addr.1),
465 (&canonical_addr.0, &canonical_addr.1),
466 None,
467 ),
468 self.execute(
469 Method::GET,
470 "/api/ping",
471 "",
472 opts.on_behalf_of.cloned(),
473 None,
474 None,
475 ),
476 |_| EndDispatchFields::new(None, None),
477 )
478 .await
479 {
480 Ok(r) => r,
481 Err(e) => {
482 return Err(Error::new_http_error(e, self.endpoint.to_string()));
483 }
484 };
485
486 if res.status().is_success() {
487 return Ok(());
488 }
489
490 Err(Error::new_message_error(
491 format!("ping failed with status code: {}", res.status()),
492 Some(self.endpoint.clone()),
493 ))
494 }
495
496 async fn control_request(
497 &self,
498 index_name: &str,
499 bucket_name: Option<&str>,
500 scope_name: Option<&str>,
501 control: &str,
502 on_behalf_of: Option<&OnBehalfOfInfo>,
503 ) -> error::Result<()> {
504 if index_name.is_empty() {
505 return Err(error::Error::new_invalid_argument_error(
506 "must specify index name",
507 None,
508 ));
509 }
510
511 let req_uri = if scope_name.is_none() && bucket_name.is_none() {
512 format!("/api/index/{index_name}/{control}")
513 } else if scope_name.is_some() && bucket_name.is_some() {
514 format!(
515 "/api/bucket/{}/scope/{}/index/{}/{}",
516 bucket_name.unwrap(),
517 scope_name.unwrap(),
518 index_name,
519 control
520 )
521 } else {
522 return Err(error::Error::new_invalid_argument_error(
523 "must specify both or neither of scope and bucket names",
524 None,
525 ));
526 };
527
528 let res = self
529 .execute(
530 Method::POST,
531 req_uri,
532 "application/json",
533 on_behalf_of.cloned(),
534 None,
535 None,
536 )
537 .await
538 .map_err(|e| error::Error::new_http_error(e, self.endpoint.to_string()))?;
539
540 if res.status() != 200 {
541 return Err(
542 decode_response_error(res, index_name.to_string(), self.endpoint.clone()).await,
543 );
544 }
545
546 Ok(())
547 }
548
549 pub(crate) async fn refresh_config(
550 &self,
551 opts: &RefreshConfigOptions<'_>,
552 ) -> error::Result<()> {
553 let res = self
554 .execute(
555 Method::POST,
556 "/api/cfgRefresh",
557 "application/json",
558 opts.on_behalf_of.cloned(),
559 None,
560 None,
561 )
562 .await
563 .map_err(|e| error::Error::new_http_error(e, self.endpoint.to_string()))?;
564
565 if res.status() != 200 {
566 return Err(decode_response_error(res, "".to_string(), self.endpoint.clone()).await);
567 }
568
569 Ok(())
570 }
571
572 fn get_uri(
573 index_name: &str,
574 bucket_name: Option<&str>,
575 scope_name: Option<&str>,
576 ) -> error::Result<String> {
577 if let Some(bucket) = &bucket_name {
578 if let Some(scope) = &scope_name {
579 Ok(format!(
580 "api/bucket/{}/scope/{}/index/{}",
581 bucket, scope, &index_name
582 ))
583 } else {
584 Err(error::Error::new_invalid_argument_error(
585 "must specify both or neither scope and bucket names",
586 None,
587 ))
588 }
589 } else {
590 Ok(format!("api/index/{}", &index_name))
591 }
592 }
593}
594
595pub(crate) async fn decode_response_error(
596 response: Response,
597 index_name: String,
598 endpoint: String,
599) -> error::Error {
600 let status = response.status();
601 let body = match response.bytes().await {
602 Ok(b) => b,
603 Err(e) => {
604 return Error::new_http_error(e, endpoint);
605 }
606 };
607
608 let body_str = match String::from_utf8(body.to_vec()) {
609 Ok(s) => s.to_lowercase(),
610 Err(e) => {
611 return Error::new_message_error(
612 format!("could not parse error response: {e}"),
613 endpoint,
614 );
615 }
616 };
617
618 decode_common_error(index_name, status, &body_str, endpoint)
619}
620
621pub(crate) fn decode_common_error(
622 index_name: String,
623 status: StatusCode,
624 body_str: &str,
625 endpoint: String,
626) -> error::Error {
627 let error_kind = if status == 401 || status == 403 {
628 error::ServerErrorKind::AuthenticationFailure
629 } else if body_str.contains("index not found") {
630 error::ServerErrorKind::IndexNotFound
631 } else if body_str.contains("index with the same name already exists")
632 || (body_str.contains("current index uuuid")
633 && body_str.contains("did not match input uuid"))
634 {
635 error::ServerErrorKind::IndexExists
636 } else if body_str.contains("unknown indextype") {
637 error::ServerErrorKind::UnknownIndexType
638 } else if body_str.contains("error obtaining vbucket count for bucket")
639 || body_str.contains("requested resource not found")
640 || body_str.contains("non existent bucket")
641 {
642 error::ServerErrorKind::SourceNotFound
646 } else if body_str
647 .contains("failed to connect to or retrieve information from source, sourcetype")
648 {
649 error::ServerErrorKind::SourceTypeIncorrect
650 } else if body_str.contains("no planpindexes for indexname") {
651 error::ServerErrorKind::NoIndexPartitionsPlanned
652 } else if body_str.contains("no local pindexes found") {
653 error::ServerErrorKind::NoIndexPartitionsFound
654 } else if status == 500 {
655 error::ServerErrorKind::Internal
656 } else if status == 429 {
657 if body_str.contains("num_concurrent_requests")
658 || body_str.contains("num_queries_per_min")
659 || body_str.contains("ingress_mib_per_min")
660 || body_str.contains("egress_mib_per_min")
661 {
662 error::ServerErrorKind::RateLimitedFailure
663 } else {
664 error::ServerErrorKind::Unknown
665 }
666 } else {
667 error::ServerErrorKind::Unknown
668 };
669
670 error::Error::new_server_error(ServerError::new(
671 error_kind, index_name, body_str, endpoint, status,
672 ))
673}