Skip to main content

couchbase_core/searchx/
search.rs

1/*
2 *
3 *  * Copyright (c) 2025 Couchbase, Inc.
4 *  *
5 *  * Licensed under the Apache License, Version 2.0 (the "License");
6 *  * you may not use this file except in compliance with the License.
7 *  * You may obtain a copy of the License at
8 *  *
9 *  *    http://www.apache.org/licenses/LICENSE-2.0
10 *  *
11 *  * Unless required by applicable law or agreed to in writing, software
12 *  * distributed under the License is distributed on an "AS IS" BASIS,
13 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  * See the License for the specific language governing permissions and
15 *  * limitations under the License.
16 *
17 */
18
19use crate::httpx::client::Client;
20use crate::httpx::request::{Auth, OnBehalfOfInfo, Request};
21use crate::httpx::response::Response;
22use crate::mgmtx::mgmt::parse_response_json;
23use crate::searchx::document_analysis::DocumentAnalysis;
24use crate::searchx::error;
25use crate::searchx::error::{Error, ServerError};
26use crate::searchx::index::Index;
27use crate::searchx::index_json::{SearchIndexResponseJson, SearchIndexesResponseJson};
28use crate::searchx::mgmt_options::{
29    AllowQueryingOptions, AnalyzeDocumentOptions, DeleteIndexOptions, DisallowQueryingOptions,
30    FreezePlanOptions, GetAllIndexesOptions, GetIndexOptions, GetIndexedDocumentsCountOptions,
31    PauseIngestOptions, PingOptions, RefreshConfigOptions, ResumeIngestOptions,
32    UnfreezePlanOptions, UpsertIndexOptions,
33};
34use crate::searchx::query_options::QueryOptions;
35use crate::searchx::search_json::{DocumentAnalysisJson, IndexedDocumentsJson};
36use crate::searchx::search_respreader::SearchRespReader;
37use crate::tracingcomponent::{
38    BeginDispatchFields, EndDispatchFields, TracingComponent, SERVICE_VALUE_SEARCH,
39    SPAN_ATTRIB_DB_SYSTEM_VALUE, SPAN_ATTRIB_OTEL_KIND_CLIENT_VALUE,
40};
41use crate::util::get_host_port_tuple_from_uri;
42use bytes::Bytes;
43use http::{Method, StatusCode};
44use std::collections::HashMap;
45use std::sync::Arc;
46use tracing::{instrument, Level, Span};
47
/// Client for one search service endpoint, generic over the HTTP client `C`
/// that requests are sent through.
#[derive(Debug)]
pub struct Search<C: Client> {
    /// Shared HTTP client that `execute` sends every request through.
    pub http_client: Arc<C>,
    /// User agent value set on every request built by `new_request`.
    pub user_agent: String,
    /// Base URI that request paths are appended to; also passed to error
    /// constructors so failures name the endpoint involved.
    pub endpoint: String,
    /// Endpoint URI whose host/port pair is handed to dispatch spans together
    /// with the host/port pair of `endpoint`.
    pub canonical_endpoint: String,
    /// Credentials used when a request carries no on-behalf-of information.
    pub auth: Auth,

    /// When `false`, `query` rejects options that set `knn` or `knn_operator`.
    pub vector_search_enabled: bool,
    /// Tracing component used to wrap the `query` and `ping` dispatches in a span.
    pub(crate) tracing: Arc<TracingComponent>,
}
59
60impl<C: Client> Search<C> {
61    pub fn new_request(
62        &self,
63        method: Method,
64        path: impl Into<String>,
65        content_type: impl Into<String>,
66        on_behalf_of: Option<OnBehalfOfInfo>,
67        headers: Option<HashMap<&str, &str>>,
68        body: Option<Bytes>,
69    ) -> Request {
70        let auth = if let Some(obo) = on_behalf_of {
71            Auth::OnBehalfOf(OnBehalfOfInfo {
72                username: obo.username,
73                password_or_domain: obo.password_or_domain,
74            })
75        } else {
76            self.auth.clone()
77        };
78
79        let mut req = Request::new(method, format!("{}/{}", self.endpoint, path.into()))
80            .auth(auth)
81            .user_agent(self.user_agent.clone())
82            .content_type(content_type.into())
83            .body(body);
84
85        if let Some(headers) = headers {
86            for (key, value) in headers.into_iter() {
87                req = req.add_header(key, value);
88            }
89        }
90
91        req
92    }
93
94    pub async fn execute(
95        &self,
96        method: Method,
97        path: impl Into<String>,
98        content_type: impl Into<String>,
99        on_behalf_of: Option<OnBehalfOfInfo>,
100        headers: Option<HashMap<&str, &str>>,
101        body: Option<Bytes>,
102    ) -> crate::httpx::error::Result<Response> {
103        let req = self.new_request(method, path, content_type, on_behalf_of, headers, body);
104
105        self.http_client.execute(req).await
106    }
107
108    pub async fn query(&self, opts: &QueryOptions) -> error::Result<SearchRespReader> {
109        if !self.vector_search_enabled && (opts.knn.is_some() || opts.knn_operator.is_some()) {
110            return Err(error::Error::new_unsupported_feature_error(
111                "vector search".to_string(),
112            ));
113        }
114
115        let req_uri = if let Some(bucket) = &opts.bucket_name {
116            if let Some(scope) = &opts.scope_name {
117                format!(
118                    "api/bucket/{}/scope/{}/index/{}/query",
119                    bucket, scope, opts.index_name
120                )
121            } else {
122                return Err(error::Error::new_invalid_argument_error(
123                    "must specify both or neither scope and bucket names",
124                    None,
125                ));
126            }
127        } else {
128            format!("api/index/{}/query", &opts.index_name)
129        };
130
131        let on_behalf_of = opts.on_behalf_of.clone();
132
133        let body = serde_json::to_vec(&opts).map_err(|e| {
134            error::Error::new_encoding_error(format!("could not serialize query options: {e}"))
135        })?;
136
137        let peer_addr = get_host_port_tuple_from_uri(&self.endpoint).unwrap_or_default();
138        let canonical_addr =
139            get_host_port_tuple_from_uri(&self.canonical_endpoint).unwrap_or_default();
140        let res = self
141            .tracing
142            .orchestrate_dispatch_span(
143                BeginDispatchFields::new(
144                    (&peer_addr.0, &peer_addr.1),
145                    (&canonical_addr.0, &canonical_addr.1),
146                    None,
147                ),
148                self.execute(
149                    Method::POST,
150                    req_uri,
151                    "application/json",
152                    on_behalf_of,
153                    None,
154                    Some(Bytes::from(body)),
155                ),
156                |_| EndDispatchFields::new(None, None),
157            )
158            .await
159            .map_err(|e| error::Error::new_http_error(e, self.endpoint.to_string()))?;
160
161        SearchRespReader::new(res, &opts.index_name, &self.endpoint).await
162    }
163
164    pub async fn upsert_index(&self, opts: &UpsertIndexOptions<'_>) -> error::Result<()> {
165        let req_uri = Self::get_uri(&opts.index.name, opts.bucket_name, opts.scope_name)?;
166
167        let body = serde_json::to_vec(&opts.index).map_err(|e| {
168            error::Error::new_encoding_error(format!("could not serialize index: {e}"))
169        })?;
170
171        let mut headers = HashMap::new();
172        headers.insert("cache-control", "no-cache");
173
174        let res = self
175            .execute(
176                Method::PUT,
177                req_uri,
178                "application/json",
179                // TODO: change when we change ownership on execute
180                opts.on_behalf_of.cloned(),
181                Some(headers),
182                Some(Bytes::from(body)),
183            )
184            .await
185            .map_err(|e| error::Error::new_http_error(e, self.endpoint.to_string()))?;
186
187        if res.status() != 200 {
188            return Err(
189                decode_response_error(res, opts.index.name.clone(), self.endpoint.clone()).await,
190            );
191        }
192
193        Ok(())
194    }
195
196    pub async fn delete_index(&self, opts: &DeleteIndexOptions<'_>) -> error::Result<()> {
197        let req_uri = Self::get_uri(opts.index_name, opts.bucket_name, opts.scope_name)?;
198
199        let mut headers = HashMap::new();
200        headers.insert("cache-control", "no-cache");
201
202        let res = self
203            .execute(
204                Method::DELETE,
205                req_uri,
206                "application/json",
207                // TODO: change when we change ownership on execute
208                opts.on_behalf_of.cloned(),
209                Some(headers),
210                None,
211            )
212            .await
213            .map_err(|e| error::Error::new_http_error(e, self.endpoint.to_string()))?;
214
215        if res.status() != 200 {
216            return Err(decode_response_error(
217                res,
218                opts.index_name.to_string(),
219                self.endpoint.clone(),
220            )
221            .await);
222        }
223
224        Ok(())
225    }
226
227    pub async fn get_index(&self, opts: &GetIndexOptions<'_>) -> error::Result<Index> {
228        let req_uri = Self::get_uri(opts.index_name, opts.bucket_name, opts.scope_name)?;
229
230        let res = self
231            .execute(
232                Method::GET,
233                req_uri,
234                "",
235                // TODO: change when we change ownership on execute
236                opts.on_behalf_of.cloned(),
237                None,
238                None,
239            )
240            .await
241            .map_err(|e| error::Error::new_http_error(e, self.endpoint.to_string()))?;
242
243        if res.status() != 200 {
244            return Err(decode_response_error(
245                res,
246                opts.index_name.to_string(),
247                self.endpoint.clone(),
248            )
249            .await);
250        }
251
252        let index: SearchIndexResponseJson = parse_response_json(res).await.map_err(|e| {
253            error::Error::new_message_error(
254                format!("failed to parse index json: {e}"),
255                Some(self.endpoint.clone()),
256            )
257        })?;
258
259        Ok(index.index_def.into())
260    }
261
262    pub async fn get_all_indexes(
263        &self,
264        opts: &GetAllIndexesOptions<'_>,
265    ) -> error::Result<Vec<Index>> {
266        let req_uri = Self::get_uri("", opts.bucket_name, opts.scope_name)?;
267
268        let res = self
269            .execute(
270                Method::GET,
271                req_uri,
272                "",
273                // TODO: change when we change ownership on execute
274                opts.on_behalf_of.cloned(),
275                None,
276                None,
277            )
278            .await
279            .map_err(|e| error::Error::new_http_error(e, self.endpoint.to_string()))?;
280
281        if res.status() != 200 {
282            return Err(decode_response_error(res, "".to_string(), self.endpoint.clone()).await);
283        }
284
285        let index: SearchIndexesResponseJson = parse_response_json(res).await.map_err(|e| {
286            error::Error::new_message_error(
287                format!("failed to parse index json: {e}"),
288                Some(self.endpoint.clone()),
289            )
290        })?;
291
292        Ok(index
293            .indexes
294            .index_defs
295            .into_values()
296            .map(Index::from)
297            .collect())
298    }
299
300    pub async fn analyze_document(
301        &self,
302        opts: &AnalyzeDocumentOptions<'_>,
303    ) -> error::Result<DocumentAnalysis> {
304        let req_uri = Self::get_uri(opts.index_name, opts.bucket_name, opts.scope_name)?;
305        let body = Bytes::from(opts.doc_content.to_vec());
306
307        let res = self
308            .execute(
309                Method::POST,
310                req_uri,
311                "application/json",
312                // TODO: change when we change ownership on execute
313                opts.on_behalf_of.cloned(),
314                None,
315                Some(body),
316            )
317            .await
318            .map_err(|e| error::Error::new_http_error(e, self.endpoint.to_string()))?;
319
320        if res.status() != 200 {
321            return Err(decode_response_error(
322                res,
323                opts.index_name.to_string(),
324                self.endpoint.clone(),
325            )
326            .await);
327        }
328
329        let analysis: DocumentAnalysisJson = parse_response_json(res).await.map_err(|e| {
330            error::Error::new_message_error(
331                format!("failed to parse document analysis: {e}"),
332                Some(self.endpoint.clone()),
333            )
334        })?;
335
336        Ok(analysis.into())
337    }
338
339    pub async fn get_indexed_documents_count(
340        &self,
341        opts: &GetIndexedDocumentsCountOptions<'_>,
342    ) -> error::Result<u64> {
343        let req_uri = if opts.scope_name.is_none() && opts.bucket_name.is_none() {
344            format!("/api/index/{}/count", opts.index_name)
345        } else if opts.scope_name.is_some() && opts.bucket_name.is_some() {
346            format!(
347                "/api/bucket/{}/scope/{}/index/{}/count",
348                opts.bucket_name.unwrap(),
349                opts.scope_name.unwrap(),
350                opts.index_name
351            )
352        } else {
353            return Err(error::Error::new_invalid_argument_error(
354                "must specify both or neither of scope and bucket names",
355                None,
356            ));
357        };
358
359        let res = self
360            .execute(
361                Method::GET,
362                req_uri,
363                "",
364                opts.on_behalf_of.cloned(),
365                None,
366                None,
367            )
368            .await
369            .map_err(|e| error::Error::new_http_error(e, self.endpoint.to_string()))?;
370
371        if res.status() != 200 {
372            return Err(decode_response_error(
373                res,
374                opts.index_name.to_string(),
375                self.endpoint.clone(),
376            )
377            .await);
378        }
379
380        let count: IndexedDocumentsJson = parse_response_json(res).await.map_err(|e| {
381            error::Error::new_message_error(
382                format!("failed to parse indexed count: {e}"),
383                Some(self.endpoint.clone()),
384            )
385        })?;
386
387        Ok(count.count)
388    }
389
390    pub async fn pause_ingest(&self, opts: &PauseIngestOptions<'_>) -> error::Result<()> {
391        self.control_request(
392            opts.index_name,
393            opts.bucket_name,
394            opts.scope_name,
395            "ingestControl/pause",
396            opts.on_behalf_of,
397        )
398        .await
399    }
400
401    pub async fn resume_ingest(&self, opts: &ResumeIngestOptions<'_>) -> error::Result<()> {
402        self.control_request(
403            opts.index_name,
404            opts.bucket_name,
405            opts.scope_name,
406            "ingestControl/resume",
407            opts.on_behalf_of,
408        )
409        .await
410    }
411
412    pub async fn allow_querying(&self, opts: &AllowQueryingOptions<'_>) -> error::Result<()> {
413        self.control_request(
414            opts.index_name,
415            opts.bucket_name,
416            opts.scope_name,
417            "queryControl/allow",
418            opts.on_behalf_of,
419        )
420        .await
421    }
422
423    pub async fn disallow_querying(&self, opts: &DisallowQueryingOptions<'_>) -> error::Result<()> {
424        self.control_request(
425            opts.index_name,
426            opts.bucket_name,
427            opts.scope_name,
428            "queryControl/disallow",
429            opts.on_behalf_of,
430        )
431        .await
432    }
433
434    pub async fn freeze_plan(&self, opts: &FreezePlanOptions<'_>) -> error::Result<()> {
435        self.control_request(
436            opts.index_name,
437            opts.bucket_name,
438            opts.scope_name,
439            "planFreezeControl/freeze",
440            opts.on_behalf_of,
441        )
442        .await
443    }
444
445    pub async fn unfreeze_plan(&self, opts: &UnfreezePlanOptions<'_>) -> error::Result<()> {
446        self.control_request(
447            opts.index_name,
448            opts.bucket_name,
449            opts.scope_name,
450            "planFreezeControl/unfreeze",
451            opts.on_behalf_of,
452        )
453        .await
454    }
455
456    pub async fn ping(&self, opts: &PingOptions<'_>) -> error::Result<()> {
457        let peer_addr = get_host_port_tuple_from_uri(&self.endpoint).unwrap_or_default();
458        let canonical_addr =
459            get_host_port_tuple_from_uri(&self.canonical_endpoint).unwrap_or_default();
460        let res = match self
461            .tracing
462            .orchestrate_dispatch_span(
463                BeginDispatchFields::new(
464                    (&peer_addr.0, &peer_addr.1),
465                    (&canonical_addr.0, &canonical_addr.1),
466                    None,
467                ),
468                self.execute(
469                    Method::GET,
470                    "/api/ping",
471                    "",
472                    opts.on_behalf_of.cloned(),
473                    None,
474                    None,
475                ),
476                |_| EndDispatchFields::new(None, None),
477            )
478            .await
479        {
480            Ok(r) => r,
481            Err(e) => {
482                return Err(Error::new_http_error(e, self.endpoint.to_string()));
483            }
484        };
485
486        if res.status().is_success() {
487            return Ok(());
488        }
489
490        Err(Error::new_message_error(
491            format!("ping failed with status code: {}", res.status()),
492            Some(self.endpoint.clone()),
493        ))
494    }
495
496    async fn control_request(
497        &self,
498        index_name: &str,
499        bucket_name: Option<&str>,
500        scope_name: Option<&str>,
501        control: &str,
502        on_behalf_of: Option<&OnBehalfOfInfo>,
503    ) -> error::Result<()> {
504        if index_name.is_empty() {
505            return Err(error::Error::new_invalid_argument_error(
506                "must specify index name",
507                None,
508            ));
509        }
510
511        let req_uri = if scope_name.is_none() && bucket_name.is_none() {
512            format!("/api/index/{index_name}/{control}")
513        } else if scope_name.is_some() && bucket_name.is_some() {
514            format!(
515                "/api/bucket/{}/scope/{}/index/{}/{}",
516                bucket_name.unwrap(),
517                scope_name.unwrap(),
518                index_name,
519                control
520            )
521        } else {
522            return Err(error::Error::new_invalid_argument_error(
523                "must specify both or neither of scope and bucket names",
524                None,
525            ));
526        };
527
528        let res = self
529            .execute(
530                Method::POST,
531                req_uri,
532                "application/json",
533                on_behalf_of.cloned(),
534                None,
535                None,
536            )
537            .await
538            .map_err(|e| error::Error::new_http_error(e, self.endpoint.to_string()))?;
539
540        if res.status() != 200 {
541            return Err(
542                decode_response_error(res, index_name.to_string(), self.endpoint.clone()).await,
543            );
544        }
545
546        Ok(())
547    }
548
549    pub(crate) async fn refresh_config(
550        &self,
551        opts: &RefreshConfigOptions<'_>,
552    ) -> error::Result<()> {
553        let res = self
554            .execute(
555                Method::POST,
556                "/api/cfgRefresh",
557                "application/json",
558                opts.on_behalf_of.cloned(),
559                None,
560                None,
561            )
562            .await
563            .map_err(|e| error::Error::new_http_error(e, self.endpoint.to_string()))?;
564
565        if res.status() != 200 {
566            return Err(decode_response_error(res, "".to_string(), self.endpoint.clone()).await);
567        }
568
569        Ok(())
570    }
571
572    fn get_uri(
573        index_name: &str,
574        bucket_name: Option<&str>,
575        scope_name: Option<&str>,
576    ) -> error::Result<String> {
577        if let Some(bucket) = &bucket_name {
578            if let Some(scope) = &scope_name {
579                Ok(format!(
580                    "api/bucket/{}/scope/{}/index/{}",
581                    bucket, scope, &index_name
582                ))
583            } else {
584                Err(error::Error::new_invalid_argument_error(
585                    "must specify both or neither scope and bucket names",
586                    None,
587                ))
588            }
589        } else {
590            Ok(format!("api/index/{}", &index_name))
591        }
592    }
593}
594
595pub(crate) async fn decode_response_error(
596    response: Response,
597    index_name: String,
598    endpoint: String,
599) -> error::Error {
600    let status = response.status();
601    let body = match response.bytes().await {
602        Ok(b) => b,
603        Err(e) => {
604            return Error::new_http_error(e, endpoint);
605        }
606    };
607
608    let body_str = match String::from_utf8(body.to_vec()) {
609        Ok(s) => s.to_lowercase(),
610        Err(e) => {
611            return Error::new_message_error(
612                format!("could not parse error response: {e}"),
613                endpoint,
614            );
615        }
616    };
617
618    decode_common_error(index_name, status, &body_str, endpoint)
619}
620
621pub(crate) fn decode_common_error(
622    index_name: String,
623    status: StatusCode,
624    body_str: &str,
625    endpoint: String,
626) -> error::Error {
627    let error_kind = if status == 401 || status == 403 {
628        error::ServerErrorKind::AuthenticationFailure
629    } else if body_str.contains("index not found") {
630        error::ServerErrorKind::IndexNotFound
631    } else if body_str.contains("index with the same name already exists")
632        || (body_str.contains("current index uuuid")
633            && body_str.contains("did not match input uuid"))
634    {
635        error::ServerErrorKind::IndexExists
636    } else if body_str.contains("unknown indextype") {
637        error::ServerErrorKind::UnknownIndexType
638    } else if body_str.contains("error obtaining vbucket count for bucket")
639        || body_str.contains("requested resource not found")
640        || body_str.contains("non existent bucket")
641    {
642        // In server 7.2.4 and later, ns_server produces "non-existent bucket" instead of "requested resource not found".
643        // However in 7.6.0, FTS reordered their handling here and produces the "vbucket count for bucket" instead.
644        // So we need to check for all the variants of this.
645        error::ServerErrorKind::SourceNotFound
646    } else if body_str
647        .contains("failed to connect to or retrieve information from source, sourcetype")
648    {
649        error::ServerErrorKind::SourceTypeIncorrect
650    } else if body_str.contains("no planpindexes for indexname") {
651        error::ServerErrorKind::NoIndexPartitionsPlanned
652    } else if body_str.contains("no local pindexes found") {
653        error::ServerErrorKind::NoIndexPartitionsFound
654    } else if status == 500 {
655        error::ServerErrorKind::Internal
656    } else if status == 429 {
657        if body_str.contains("num_concurrent_requests")
658            || body_str.contains("num_queries_per_min")
659            || body_str.contains("ingress_mib_per_min")
660            || body_str.contains("egress_mib_per_min")
661        {
662            error::ServerErrorKind::RateLimitedFailure
663        } else {
664            error::ServerErrorKind::Unknown
665        }
666    } else {
667        error::ServerErrorKind::Unknown
668    };
669
670    error::Error::new_server_error(ServerError::new(
671        error_kind, index_name, body_str, endpoint, status,
672    ))
673}