Skip to main content

couchbase_core/
searchcomponent.rs

1/*
2 *
3 *  * Copyright (c) 2025 Couchbase, Inc.
4 *  *
5 *  * Licensed under the Apache License, Version 2.0 (the "License");
6 *  * you may not use this file except in compliance with the License.
7 *  * You may obtain a copy of the License at
8 *  *
9 *  *    http://www.apache.org/licenses/LICENSE-2.0
10 *  *
11 *  * Unless required by applicable law or agreed to in writing, software
12 *  * distributed under the License is distributed on an "AS IS" BASIS,
13 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  * See the License for the specific language governing permissions and
15 *  * limitations under the License.
16 *
17 */
18
19use crate::authenticator::Authenticator;
20use crate::componentconfigs::NetworkAndCanonicalEndpoint;
21use crate::diagnosticscomponent::PingSearchReportOptions;
22use crate::error::ErrorKind;
23use crate::httpcomponent::{HttpComponent, HttpComponentState};
24use crate::httpx::client::Client;
25use crate::httpx::request::Auth;
26use crate::mgmtx::node_target::NodeTarget;
27use crate::options::search::SearchOptions;
28use crate::options::search_management::{
29    AllowQueryingOptions, AnalyzeDocumentOptions, DeleteIndexOptions, DisallowQueryingOptions,
30    EnsureIndexOptions, FreezePlanOptions, GetAllIndexesOptions, GetIndexOptions,
31    GetIndexedDocumentsCountOptions, PauseIngestOptions, ResumeIngestOptions, UnfreezePlanOptions,
32    UpsertIndexOptions,
33};
34use crate::results::pingreport::{EndpointPingReport, PingState};
35use crate::results::search::SearchResultStream;
36use crate::retry::{orchestrate_retries, RetryManager, RetryRequest, RetryStrategy};
37use crate::retrybesteffort::ExponentialBackoffCalculator;
38use crate::searchx::document_analysis::DocumentAnalysis;
39use crate::searchx::ensure_index_helper::EnsureIndexHelper;
40use crate::searchx::index::Index;
41use crate::searchx::mgmt_options::{EnsureIndexPollOptions, PingOptions};
42use crate::searchx::search::Search;
43use crate::service_type::ServiceType;
44use crate::tracingcomponent::TracingComponent;
45use crate::{error, httpx};
46use arc_swap::ArcSwap;
47use futures::future::join_all;
48use futures::StreamExt;
49use std::collections::HashMap;
50use std::future::Future;
51use std::ops::Sub;
52use std::sync::Arc;
53use std::time::Duration;
54use tokio::select;
55
/// Entry point for search queries and search index management.
///
/// Every operation builds a short-lived `Search` client for a selected
/// endpoint and runs it under the shared retry machinery.
pub(crate) struct SearchComponent<C: Client> {
    /// HTTP layer that supplies the client, endpoint and credentials for a call.
    http_component: HttpComponent<C>,
    /// Tracing handle cloned into every `Search` client this component builds.
    tracing: Arc<TracingComponent>,

    /// Retry manager handed to `orchestrate_retries` for each operation.
    retry_manager: Arc<RetryManager>,

    /// Capability state; replaced wholesale whenever `reconfigure` is called.
    state: ArcSwap<SearchComponentState>,
}
64
/// Swappable snapshot of the capabilities the component currently assumes.
#[derive(Debug)]
pub(crate) struct SearchComponentState {
    /// When `false`, `SearchComponent::query` rejects requests that carry
    /// `knn` or `knn_operator` with a `FeatureNotAvailable` error.
    pub vector_search_enabled: bool,
}
69
/// Configuration consumed by `SearchComponent::new` and
/// `SearchComponent::reconfigure`.
pub(crate) struct SearchComponentConfig {
    /// Endpoints forwarded to the HTTP component state.
    /// NOTE(review): presumably keyed by endpoint id — confirm against the producer.
    pub endpoints: HashMap<String, NetworkAndCanonicalEndpoint>,
    /// Authenticator forwarded to the HTTP component state.
    pub authenticator: Authenticator,

    /// Value stored into `SearchComponentState::vector_search_enabled`.
    pub vector_search_enabled: bool,
}
76
/// Construction-time options that are not part of the reconfigurable state.
#[derive(Debug)]
pub(crate) struct SearchComponentOptions {
    /// User agent string passed to the HTTP component and, through it, to
    /// every `Search` client.
    pub user_agent: String,
}
81
82impl<C: Client + 'static> SearchComponent<C> {
83    pub fn new(
84        retry_manager: Arc<RetryManager>,
85        http_client: Arc<C>,
86        tracing: Arc<TracingComponent>,
87        config: SearchComponentConfig,
88        opts: SearchComponentOptions,
89    ) -> Self {
90        Self {
91            http_component: HttpComponent::new(
92                ServiceType::SEARCH,
93                opts.user_agent,
94                http_client,
95                HttpComponentState::new(config.endpoints, config.authenticator),
96            ),
97            tracing,
98            retry_manager,
99            state: ArcSwap::new(Arc::new(SearchComponentState {
100                vector_search_enabled: config.vector_search_enabled,
101            })),
102        }
103    }
104
105    pub fn reconfigure(&self, config: SearchComponentConfig) {
106        self.http_component.reconfigure(HttpComponentState::new(
107            config.endpoints,
108            config.authenticator,
109        ));
110
111        self.state.swap(Arc::new(SearchComponentState {
112            vector_search_enabled: config.vector_search_enabled,
113        }));
114    }
115
116    pub async fn query(&self, opts: SearchOptions) -> error::Result<SearchResultStream> {
117        if (opts.knn.is_some() || opts.knn_operator.is_some())
118            && !self.state.load().vector_search_enabled
119        {
120            return Err(ErrorKind::FeatureNotAvailable {
121                feature: "Vector Search".to_string(),
122                msg: "vector queries are available from Couchbase Server 7.6.0 and above"
123                    .to_string(),
124            }
125            .into());
126        }
127        let retry_info = RetryRequest::new("search_query", true);
128
129        let retry = opts.retry_strategy.clone();
130        let endpoint = opts.endpoint.clone();
131        let copts = opts.into();
132
133        orchestrate_retries(self.retry_manager.clone(), retry, retry_info, async || {
134            self.http_component
135                .orchestrate_endpoint(
136                    endpoint.clone(),
137                    async |client: Arc<C>,
138                           endpoint_id: String,
139                           endpoint: String,
140                           canonical_endpoint: String,
141                           auth: Auth| {
142                        let res = match (Search::<C> {
143                            http_client: client,
144                            user_agent: self.http_component.user_agent().to_string(),
145                            endpoint: endpoint.clone(),
146                            canonical_endpoint,
147                            auth,
148
149                            vector_search_enabled: self.state.load().vector_search_enabled,
150                            tracing: self.tracing.clone(),
151                        }
152                        .query(&copts)
153                        .await)
154                        {
155                            Ok(r) => r,
156                            Err(e) => return Err(ErrorKind::Search(e).into()),
157                        };
158
159                        Ok(SearchResultStream {
160                            inner: res,
161                            endpoint,
162                        })
163                    },
164                )
165                .await
166        })
167        .await
168    }
169
170    pub async fn get_index(&self, opts: &GetIndexOptions<'_>) -> error::Result<Index> {
171        let retry_info = RetryRequest::new("search_get_index", true);
172        let retry = opts.retry_strategy.clone();
173        let endpoint = opts.endpoint;
174        let copts = opts.into();
175
176        self.orchestrate_mgmt_call(
177            retry,
178            retry_info,
179            endpoint.map(|e| e.to_string()),
180            async |search| {
181                search
182                    .get_index(&copts)
183                    .await
184                    .map_err(|e| ErrorKind::Search(e).into())
185            },
186        )
187        .await
188    }
189
190    pub async fn get_all_indexes(
191        &self,
192        opts: &GetAllIndexesOptions<'_>,
193    ) -> error::Result<Vec<Index>> {
194        let retry_info = RetryRequest::new("search_get_all_indexes", true);
195        let retry = opts.retry_strategy.clone();
196        let endpoint = opts.endpoint;
197        let copts = opts.into();
198
199        self.orchestrate_mgmt_call(
200            retry,
201            retry_info,
202            endpoint.map(|e| e.to_string()),
203            async |search| {
204                search
205                    .get_all_indexes(&copts)
206                    .await
207                    .map_err(|e| ErrorKind::Search(e).into())
208            },
209        )
210        .await
211    }
212
213    pub async fn upsert_index(&self, opts: &UpsertIndexOptions<'_>) -> error::Result<()> {
214        let retry_info = RetryRequest::new("search_upsert_index", true);
215        let retry = opts.retry_strategy.clone();
216        let endpoint = opts.endpoint;
217        let copts = opts.into();
218
219        self.orchestrate_no_res_mgmt_call(
220            retry,
221            retry_info,
222            endpoint.map(|e| e.to_string()),
223            async |search| {
224                search
225                    .upsert_index(&copts)
226                    .await
227                    .map_err(|e| ErrorKind::Search(e).into())
228            },
229        )
230        .await
231    }
232
233    pub async fn delete_index(&self, opts: &DeleteIndexOptions<'_>) -> error::Result<()> {
234        let retry_info = RetryRequest::new("search_delete_index", true);
235        let retry = opts.retry_strategy.clone();
236        let endpoint = opts.endpoint;
237        let copts = opts.into();
238
239        self.orchestrate_no_res_mgmt_call(
240            retry,
241            retry_info,
242            endpoint.map(|e| e.to_string()),
243            async |search| {
244                search
245                    .delete_index(&copts)
246                    .await
247                    .map_err(|e| ErrorKind::Search(e).into())
248            },
249        )
250        .await
251    }
252
253    pub async fn analyze_document(
254        &self,
255        opts: &AnalyzeDocumentOptions<'_>,
256    ) -> error::Result<DocumentAnalysis> {
257        let retry_info = RetryRequest::new("search_analyze_document", true);
258        let retry = opts.retry_strategy.clone();
259        let endpoint = opts.endpoint;
260        let copts = opts.into();
261
262        self.orchestrate_mgmt_call(
263            retry,
264            retry_info,
265            endpoint.map(|e| e.to_string()),
266            async |search| {
267                search
268                    .analyze_document(&copts)
269                    .await
270                    .map_err(|e| ErrorKind::Search(e).into())
271            },
272        )
273        .await
274    }
275
276    pub async fn get_indexed_documents_count(
277        &self,
278        opts: &GetIndexedDocumentsCountOptions<'_>,
279    ) -> error::Result<u64> {
280        let retry_info = RetryRequest::new("search_get_indexed_documents_count", true);
281        let retry = opts.retry_strategy.clone();
282        let endpoint = opts.endpoint;
283        let copts = opts.into();
284
285        self.orchestrate_mgmt_call(
286            retry,
287            retry_info,
288            endpoint.map(|e| e.to_string()),
289            async |search| {
290                search
291                    .get_indexed_documents_count(&copts)
292                    .await
293                    .map_err(|e| ErrorKind::Search(e).into())
294            },
295        )
296        .await
297    }
298
299    pub async fn pause_ingest(&self, opts: &PauseIngestOptions<'_>) -> error::Result<()> {
300        let retry_info = RetryRequest::new("search_pause_ingest", true);
301        let retry = opts.retry_strategy.clone();
302        let endpoint = opts.endpoint;
303        let copts = opts.into();
304
305        self.orchestrate_no_res_mgmt_call(
306            retry,
307            retry_info,
308            endpoint.map(|e| e.to_string()),
309            async |search| {
310                search
311                    .pause_ingest(&copts)
312                    .await
313                    .map_err(|e| ErrorKind::Search(e).into())
314            },
315        )
316        .await
317    }
318
319    pub async fn resume_ingest(&self, opts: &ResumeIngestOptions<'_>) -> error::Result<()> {
320        let retry_info = RetryRequest::new("search_resume_ingest", true);
321        let retry = opts.retry_strategy.clone();
322        let endpoint = opts.endpoint;
323        let copts = opts.into();
324
325        self.orchestrate_no_res_mgmt_call(
326            retry,
327            retry_info,
328            endpoint.map(|e| e.to_string()),
329            async |search| {
330                search
331                    .resume_ingest(&copts)
332                    .await
333                    .map_err(|e| ErrorKind::Search(e).into())
334            },
335        )
336        .await
337    }
338
339    pub async fn allow_querying(&self, opts: &AllowQueryingOptions<'_>) -> error::Result<()> {
340        let retry_info = RetryRequest::new("search_allow_querying", true);
341        let retry = opts.retry_strategy.clone();
342        let endpoint = opts.endpoint;
343        let copts = opts.into();
344
345        self.orchestrate_no_res_mgmt_call(
346            retry,
347            retry_info,
348            endpoint.map(|e| e.to_string()),
349            async |search| {
350                search
351                    .allow_querying(&copts)
352                    .await
353                    .map_err(|e| ErrorKind::Search(e).into())
354            },
355        )
356        .await
357    }
358
359    pub async fn disallow_querying(&self, opts: &DisallowQueryingOptions<'_>) -> error::Result<()> {
360        let retry_info = RetryRequest::new("search_disallow_querying", true);
361        let retry = opts.retry_strategy.clone();
362        let endpoint = opts.endpoint;
363        let copts = opts.into();
364
365        self.orchestrate_no_res_mgmt_call(
366            retry,
367            retry_info,
368            endpoint.map(|e| e.to_string()),
369            async |search| {
370                search
371                    .disallow_querying(&copts)
372                    .await
373                    .map_err(|e| ErrorKind::Search(e).into())
374            },
375        )
376        .await
377    }
378
379    pub async fn freeze_plan(&self, opts: &FreezePlanOptions<'_>) -> error::Result<()> {
380        let retry_info = RetryRequest::new("search_freeze_plan", true);
381        let retry = opts.retry_strategy.clone();
382        let endpoint = opts.endpoint;
383        let copts = opts.into();
384
385        self.orchestrate_no_res_mgmt_call(
386            retry,
387            retry_info,
388            endpoint.map(|e| e.to_string()),
389            async |search| {
390                search
391                    .freeze_plan(&copts)
392                    .await
393                    .map_err(|e| ErrorKind::Search(e).into())
394            },
395        )
396        .await
397    }
398
399    pub async fn unfreeze_plan(&self, opts: &UnfreezePlanOptions<'_>) -> error::Result<()> {
400        let retry_info = RetryRequest::new("search_unfreeze_plan", true);
401        let retry = opts.retry_strategy.clone();
402        let endpoint = opts.endpoint;
403        let copts = opts.into();
404
405        self.orchestrate_no_res_mgmt_call(
406            retry,
407            retry_info,
408            endpoint.map(|e| e.to_string()),
409            async |search| {
410                search
411                    .unfreeze_plan(&copts)
412                    .await
413                    .map_err(|e| ErrorKind::Search(e).into())
414            },
415        )
416        .await
417    }
418
419    pub async fn ensure_index(&self, opts: &EnsureIndexOptions<'_>) -> error::Result<()> {
420        let mut helper = EnsureIndexHelper::new(
421            self.http_component.user_agent(),
422            opts.index_name,
423            opts.bucket_name,
424            opts.scope_name,
425            opts.on_behalf_of_info,
426        );
427
428        let backoff = ExponentialBackoffCalculator::new(
429            Duration::from_millis(100),
430            Duration::from_millis(1000),
431            1.5,
432        );
433
434        self.http_component
435            .ensure_resource(backoff, async |client: Arc<C>, targets: Vec<NodeTarget>| {
436                helper
437                    .clone()
438                    .poll(&EnsureIndexPollOptions {
439                        client,
440                        targets,
441                        desired_state: opts.desired_state,
442                    })
443                    .await
444                    .map_err(error::Error::from)
445            })
446            .await
447    }
448
449    pub async fn ping_all_endpoints(
450        &self,
451        on_behalf_of: Option<&httpx::request::OnBehalfOfInfo>,
452    ) -> error::Result<Vec<error::Result<()>>> {
453        let (client, targets) = self.http_component.get_all_targets::<NodeTarget>(&[])?;
454
455        let copts = PingOptions { on_behalf_of };
456
457        let mut handles = Vec::with_capacity(targets.len());
458        let user_agent = self.http_component.user_agent().to_string();
459        for target in targets {
460            let user_agent = user_agent.clone();
461            let client = Search::<C> {
462                http_client: client.clone(),
463                user_agent,
464                endpoint: target.endpoint,
465                canonical_endpoint: target.canonical_endpoint,
466                auth: target.auth,
467                vector_search_enabled: false,
468                tracing: self.tracing.clone(),
469            };
470
471            let handle = self.ping_one(client, copts.clone());
472
473            handles.push(handle);
474        }
475
476        let results = join_all(handles).await;
477
478        Ok(results)
479    }
480
481    pub async fn create_ping_report(
482        &self,
483        opts: PingSearchReportOptions<'_>,
484    ) -> error::Result<Vec<EndpointPingReport>> {
485        let (client, targets) = self.http_component.get_all_targets::<NodeTarget>(&[])?;
486
487        let copts = PingOptions {
488            on_behalf_of: opts.on_behalf_of,
489        };
490        let timeout = opts.timeout;
491
492        let mut handles = Vec::with_capacity(targets.len());
493        let user_agent = self.http_component.user_agent().to_string();
494        for target in targets {
495            let user_agent = user_agent.clone();
496            let client = Search::<C> {
497                http_client: client.clone(),
498                user_agent,
499                endpoint: target.endpoint,
500                canonical_endpoint: target.canonical_endpoint,
501                auth: target.auth,
502
503                vector_search_enabled: self.state.load().vector_search_enabled,
504                tracing: self.tracing.clone(),
505            };
506
507            let handle = self.create_one_report(client, timeout, copts.clone());
508
509            handles.push(handle);
510        }
511
512        let reports = join_all(handles).await;
513
514        Ok(reports)
515    }
516
517    async fn ping_one(&self, client: Search<C>, opts: PingOptions<'_>) -> error::Result<()> {
518        client.ping(&opts).await.map_err(error::Error::from)
519    }
520
521    async fn create_one_report(
522        &self,
523        client: Search<C>,
524        timeout: Duration,
525        opts: PingOptions<'_>,
526    ) -> EndpointPingReport {
527        let start = std::time::Instant::now();
528        let res = select! {
529            e = tokio::time::sleep(timeout) => {
530                return EndpointPingReport {
531                    remote: client.endpoint,
532                    error: None,
533                    latency: std::time::Instant::now().sub(start),
534                    id: None,
535                    namespace: None,
536                    state: PingState::Timeout,
537                }
538            }
539            r = client.ping(&opts) => r.map_err(error::Error::from),
540        };
541        let end = std::time::Instant::now();
542
543        let (error, state) = match res {
544            Ok(_) => (None, PingState::Ok),
545            Err(e) => (Some(e), PingState::Error),
546        };
547
548        EndpointPingReport {
549            remote: client.endpoint,
550            error,
551            latency: end.sub(start),
552            id: None,
553            namespace: None,
554            state,
555        }
556    }
557
558    async fn orchestrate_mgmt_call<Fut, Resp>(
559        &self,
560        retry_strategy: Arc<dyn RetryStrategy>,
561        retry_info: RetryRequest,
562        endpoint: Option<String>,
563        operation: impl Fn(Search<C>) -> Fut + Send + Sync,
564    ) -> error::Result<Resp>
565    where
566        Resp: Send + Sync,
567        Fut: Future<Output = error::Result<Resp>> + Send,
568        C: Client,
569    {
570        orchestrate_retries(
571            self.retry_manager.clone(),
572            retry_strategy,
573            retry_info,
574            async || {
575                self.http_component
576                    .orchestrate_endpoint(
577                        endpoint.clone(),
578                        async |client: Arc<C>,
579                               endpoint_id: String,
580                               endpoint: String,
581                               canonical_endpoint: String,
582                               auth: Auth| {
583                            operation(Search::<C> {
584                                http_client: client,
585                                user_agent: self.http_component.user_agent().to_string(),
586                                endpoint,
587                                canonical_endpoint,
588                                auth,
589
590                                vector_search_enabled: self.state.load().vector_search_enabled,
591                                tracing: self.tracing.clone(),
592                            })
593                            .await
594                        },
595                    )
596                    .await
597            },
598        )
599        .await
600    }
601
602    async fn orchestrate_no_res_mgmt_call<Fut>(
603        &self,
604        retry_strategy: Arc<dyn RetryStrategy>,
605        retry_info: RetryRequest,
606        endpoint: Option<String>,
607        operation: impl Fn(Search<C>) -> Fut + Send + Sync,
608    ) -> error::Result<()>
609    where
610        Fut: Future<Output = error::Result<()>> + Send,
611        C: Client,
612    {
613        orchestrate_retries(
614            self.retry_manager.clone(),
615            retry_strategy,
616            retry_info,
617            async || {
618                self.http_component
619                    .orchestrate_endpoint(
620                        endpoint.clone(),
621                        async |client: Arc<C>,
622                               endpoint_id: String,
623                               endpoint: String,
624                               canonical_endpoint: String,
625                               auth: Auth| {
626                            operation(Search::<C> {
627                                http_client: client,
628                                user_agent: self.http_component.user_agent().to_string(),
629                                endpoint,
630                                canonical_endpoint,
631                                auth,
632
633                                vector_search_enabled: self.state.load().vector_search_enabled,
634                                tracing: self.tracing.clone(),
635                            })
636                            .await
637                        },
638                    )
639                    .await
640            },
641        )
642        .await
643    }
644}