1use crate::authenticator::Authenticator;
20use crate::componentconfigs::NetworkAndCanonicalEndpoint;
21use crate::diagnosticscomponent::PingSearchReportOptions;
22use crate::error::ErrorKind;
23use crate::httpcomponent::{HttpComponent, HttpComponentState};
24use crate::httpx::client::Client;
25use crate::httpx::request::Auth;
26use crate::mgmtx::node_target::NodeTarget;
27use crate::options::search::SearchOptions;
28use crate::options::search_management::{
29 AllowQueryingOptions, AnalyzeDocumentOptions, DeleteIndexOptions, DisallowQueryingOptions,
30 EnsureIndexOptions, FreezePlanOptions, GetAllIndexesOptions, GetIndexOptions,
31 GetIndexedDocumentsCountOptions, PauseIngestOptions, ResumeIngestOptions, UnfreezePlanOptions,
32 UpsertIndexOptions,
33};
34use crate::results::pingreport::{EndpointPingReport, PingState};
35use crate::results::search::SearchResultStream;
36use crate::retry::{orchestrate_retries, RetryManager, RetryRequest, RetryStrategy};
37use crate::retrybesteffort::ExponentialBackoffCalculator;
38use crate::searchx::document_analysis::DocumentAnalysis;
39use crate::searchx::ensure_index_helper::EnsureIndexHelper;
40use crate::searchx::index::Index;
41use crate::searchx::mgmt_options::{EnsureIndexPollOptions, PingOptions};
42use crate::searchx::search::Search;
43use crate::service_type::ServiceType;
44use crate::tracingcomponent::TracingComponent;
45use crate::{error, httpx};
46use arc_swap::ArcSwap;
47use futures::future::join_all;
48use futures::StreamExt;
49use std::collections::HashMap;
50use std::future::Future;
51use std::ops::Sub;
52use std::sync::Arc;
53use std::time::Duration;
54use tokio::select;
55
56pub(crate) struct SearchComponent<C: Client> {
57 http_component: HttpComponent<C>,
58 tracing: Arc<TracingComponent>,
59
60 retry_manager: Arc<RetryManager>,
61
62 state: ArcSwap<SearchComponentState>,
63}
64
65#[derive(Debug)]
66pub(crate) struct SearchComponentState {
67 pub vector_search_enabled: bool,
68}
69
70pub(crate) struct SearchComponentConfig {
71 pub endpoints: HashMap<String, NetworkAndCanonicalEndpoint>,
72 pub authenticator: Authenticator,
73
74 pub vector_search_enabled: bool,
75}
76
77#[derive(Debug)]
78pub(crate) struct SearchComponentOptions {
79 pub user_agent: String,
80}
81
82impl<C: Client + 'static> SearchComponent<C> {
83 pub fn new(
84 retry_manager: Arc<RetryManager>,
85 http_client: Arc<C>,
86 tracing: Arc<TracingComponent>,
87 config: SearchComponentConfig,
88 opts: SearchComponentOptions,
89 ) -> Self {
90 Self {
91 http_component: HttpComponent::new(
92 ServiceType::SEARCH,
93 opts.user_agent,
94 http_client,
95 HttpComponentState::new(config.endpoints, config.authenticator),
96 ),
97 tracing,
98 retry_manager,
99 state: ArcSwap::new(Arc::new(SearchComponentState {
100 vector_search_enabled: config.vector_search_enabled,
101 })),
102 }
103 }
104
105 pub fn reconfigure(&self, config: SearchComponentConfig) {
106 self.http_component.reconfigure(HttpComponentState::new(
107 config.endpoints,
108 config.authenticator,
109 ));
110
111 self.state.swap(Arc::new(SearchComponentState {
112 vector_search_enabled: config.vector_search_enabled,
113 }));
114 }
115
116 pub async fn query(&self, opts: SearchOptions) -> error::Result<SearchResultStream> {
117 if (opts.knn.is_some() || opts.knn_operator.is_some())
118 && !self.state.load().vector_search_enabled
119 {
120 return Err(ErrorKind::FeatureNotAvailable {
121 feature: "Vector Search".to_string(),
122 msg: "vector queries are available from Couchbase Server 7.6.0 and above"
123 .to_string(),
124 }
125 .into());
126 }
127 let retry_info = RetryRequest::new("search_query", true);
128
129 let retry = opts.retry_strategy.clone();
130 let endpoint = opts.endpoint.clone();
131 let copts = opts.into();
132
133 orchestrate_retries(self.retry_manager.clone(), retry, retry_info, async || {
134 self.http_component
135 .orchestrate_endpoint(
136 endpoint.clone(),
137 async |client: Arc<C>,
138 endpoint_id: String,
139 endpoint: String,
140 canonical_endpoint: String,
141 auth: Auth| {
142 let res = match (Search::<C> {
143 http_client: client,
144 user_agent: self.http_component.user_agent().to_string(),
145 endpoint: endpoint.clone(),
146 canonical_endpoint,
147 auth,
148
149 vector_search_enabled: self.state.load().vector_search_enabled,
150 tracing: self.tracing.clone(),
151 }
152 .query(&copts)
153 .await)
154 {
155 Ok(r) => r,
156 Err(e) => return Err(ErrorKind::Search(e).into()),
157 };
158
159 Ok(SearchResultStream {
160 inner: res,
161 endpoint,
162 })
163 },
164 )
165 .await
166 })
167 .await
168 }
169
170 pub async fn get_index(&self, opts: &GetIndexOptions<'_>) -> error::Result<Index> {
171 let retry_info = RetryRequest::new("search_get_index", true);
172 let retry = opts.retry_strategy.clone();
173 let endpoint = opts.endpoint;
174 let copts = opts.into();
175
176 self.orchestrate_mgmt_call(
177 retry,
178 retry_info,
179 endpoint.map(|e| e.to_string()),
180 async |search| {
181 search
182 .get_index(&copts)
183 .await
184 .map_err(|e| ErrorKind::Search(e).into())
185 },
186 )
187 .await
188 }
189
190 pub async fn get_all_indexes(
191 &self,
192 opts: &GetAllIndexesOptions<'_>,
193 ) -> error::Result<Vec<Index>> {
194 let retry_info = RetryRequest::new("search_get_all_indexes", true);
195 let retry = opts.retry_strategy.clone();
196 let endpoint = opts.endpoint;
197 let copts = opts.into();
198
199 self.orchestrate_mgmt_call(
200 retry,
201 retry_info,
202 endpoint.map(|e| e.to_string()),
203 async |search| {
204 search
205 .get_all_indexes(&copts)
206 .await
207 .map_err(|e| ErrorKind::Search(e).into())
208 },
209 )
210 .await
211 }
212
213 pub async fn upsert_index(&self, opts: &UpsertIndexOptions<'_>) -> error::Result<()> {
214 let retry_info = RetryRequest::new("search_upsert_index", true);
215 let retry = opts.retry_strategy.clone();
216 let endpoint = opts.endpoint;
217 let copts = opts.into();
218
219 self.orchestrate_no_res_mgmt_call(
220 retry,
221 retry_info,
222 endpoint.map(|e| e.to_string()),
223 async |search| {
224 search
225 .upsert_index(&copts)
226 .await
227 .map_err(|e| ErrorKind::Search(e).into())
228 },
229 )
230 .await
231 }
232
233 pub async fn delete_index(&self, opts: &DeleteIndexOptions<'_>) -> error::Result<()> {
234 let retry_info = RetryRequest::new("search_delete_index", true);
235 let retry = opts.retry_strategy.clone();
236 let endpoint = opts.endpoint;
237 let copts = opts.into();
238
239 self.orchestrate_no_res_mgmt_call(
240 retry,
241 retry_info,
242 endpoint.map(|e| e.to_string()),
243 async |search| {
244 search
245 .delete_index(&copts)
246 .await
247 .map_err(|e| ErrorKind::Search(e).into())
248 },
249 )
250 .await
251 }
252
253 pub async fn analyze_document(
254 &self,
255 opts: &AnalyzeDocumentOptions<'_>,
256 ) -> error::Result<DocumentAnalysis> {
257 let retry_info = RetryRequest::new("search_analyze_document", true);
258 let retry = opts.retry_strategy.clone();
259 let endpoint = opts.endpoint;
260 let copts = opts.into();
261
262 self.orchestrate_mgmt_call(
263 retry,
264 retry_info,
265 endpoint.map(|e| e.to_string()),
266 async |search| {
267 search
268 .analyze_document(&copts)
269 .await
270 .map_err(|e| ErrorKind::Search(e).into())
271 },
272 )
273 .await
274 }
275
276 pub async fn get_indexed_documents_count(
277 &self,
278 opts: &GetIndexedDocumentsCountOptions<'_>,
279 ) -> error::Result<u64> {
280 let retry_info = RetryRequest::new("search_get_indexed_documents_count", true);
281 let retry = opts.retry_strategy.clone();
282 let endpoint = opts.endpoint;
283 let copts = opts.into();
284
285 self.orchestrate_mgmt_call(
286 retry,
287 retry_info,
288 endpoint.map(|e| e.to_string()),
289 async |search| {
290 search
291 .get_indexed_documents_count(&copts)
292 .await
293 .map_err(|e| ErrorKind::Search(e).into())
294 },
295 )
296 .await
297 }
298
299 pub async fn pause_ingest(&self, opts: &PauseIngestOptions<'_>) -> error::Result<()> {
300 let retry_info = RetryRequest::new("search_pause_ingest", true);
301 let retry = opts.retry_strategy.clone();
302 let endpoint = opts.endpoint;
303 let copts = opts.into();
304
305 self.orchestrate_no_res_mgmt_call(
306 retry,
307 retry_info,
308 endpoint.map(|e| e.to_string()),
309 async |search| {
310 search
311 .pause_ingest(&copts)
312 .await
313 .map_err(|e| ErrorKind::Search(e).into())
314 },
315 )
316 .await
317 }
318
319 pub async fn resume_ingest(&self, opts: &ResumeIngestOptions<'_>) -> error::Result<()> {
320 let retry_info = RetryRequest::new("search_resume_ingest", true);
321 let retry = opts.retry_strategy.clone();
322 let endpoint = opts.endpoint;
323 let copts = opts.into();
324
325 self.orchestrate_no_res_mgmt_call(
326 retry,
327 retry_info,
328 endpoint.map(|e| e.to_string()),
329 async |search| {
330 search
331 .resume_ingest(&copts)
332 .await
333 .map_err(|e| ErrorKind::Search(e).into())
334 },
335 )
336 .await
337 }
338
339 pub async fn allow_querying(&self, opts: &AllowQueryingOptions<'_>) -> error::Result<()> {
340 let retry_info = RetryRequest::new("search_allow_querying", true);
341 let retry = opts.retry_strategy.clone();
342 let endpoint = opts.endpoint;
343 let copts = opts.into();
344
345 self.orchestrate_no_res_mgmt_call(
346 retry,
347 retry_info,
348 endpoint.map(|e| e.to_string()),
349 async |search| {
350 search
351 .allow_querying(&copts)
352 .await
353 .map_err(|e| ErrorKind::Search(e).into())
354 },
355 )
356 .await
357 }
358
359 pub async fn disallow_querying(&self, opts: &DisallowQueryingOptions<'_>) -> error::Result<()> {
360 let retry_info = RetryRequest::new("search_disallow_querying", true);
361 let retry = opts.retry_strategy.clone();
362 let endpoint = opts.endpoint;
363 let copts = opts.into();
364
365 self.orchestrate_no_res_mgmt_call(
366 retry,
367 retry_info,
368 endpoint.map(|e| e.to_string()),
369 async |search| {
370 search
371 .disallow_querying(&copts)
372 .await
373 .map_err(|e| ErrorKind::Search(e).into())
374 },
375 )
376 .await
377 }
378
379 pub async fn freeze_plan(&self, opts: &FreezePlanOptions<'_>) -> error::Result<()> {
380 let retry_info = RetryRequest::new("search_freeze_plan", true);
381 let retry = opts.retry_strategy.clone();
382 let endpoint = opts.endpoint;
383 let copts = opts.into();
384
385 self.orchestrate_no_res_mgmt_call(
386 retry,
387 retry_info,
388 endpoint.map(|e| e.to_string()),
389 async |search| {
390 search
391 .freeze_plan(&copts)
392 .await
393 .map_err(|e| ErrorKind::Search(e).into())
394 },
395 )
396 .await
397 }
398
399 pub async fn unfreeze_plan(&self, opts: &UnfreezePlanOptions<'_>) -> error::Result<()> {
400 let retry_info = RetryRequest::new("search_unfreeze_plan", true);
401 let retry = opts.retry_strategy.clone();
402 let endpoint = opts.endpoint;
403 let copts = opts.into();
404
405 self.orchestrate_no_res_mgmt_call(
406 retry,
407 retry_info,
408 endpoint.map(|e| e.to_string()),
409 async |search| {
410 search
411 .unfreeze_plan(&copts)
412 .await
413 .map_err(|e| ErrorKind::Search(e).into())
414 },
415 )
416 .await
417 }
418
419 pub async fn ensure_index(&self, opts: &EnsureIndexOptions<'_>) -> error::Result<()> {
420 let mut helper = EnsureIndexHelper::new(
421 self.http_component.user_agent(),
422 opts.index_name,
423 opts.bucket_name,
424 opts.scope_name,
425 opts.on_behalf_of_info,
426 );
427
428 let backoff = ExponentialBackoffCalculator::new(
429 Duration::from_millis(100),
430 Duration::from_millis(1000),
431 1.5,
432 );
433
434 self.http_component
435 .ensure_resource(backoff, async |client: Arc<C>, targets: Vec<NodeTarget>| {
436 helper
437 .clone()
438 .poll(&EnsureIndexPollOptions {
439 client,
440 targets,
441 desired_state: opts.desired_state,
442 })
443 .await
444 .map_err(error::Error::from)
445 })
446 .await
447 }
448
449 pub async fn ping_all_endpoints(
450 &self,
451 on_behalf_of: Option<&httpx::request::OnBehalfOfInfo>,
452 ) -> error::Result<Vec<error::Result<()>>> {
453 let (client, targets) = self.http_component.get_all_targets::<NodeTarget>(&[])?;
454
455 let copts = PingOptions { on_behalf_of };
456
457 let mut handles = Vec::with_capacity(targets.len());
458 let user_agent = self.http_component.user_agent().to_string();
459 for target in targets {
460 let user_agent = user_agent.clone();
461 let client = Search::<C> {
462 http_client: client.clone(),
463 user_agent,
464 endpoint: target.endpoint,
465 canonical_endpoint: target.canonical_endpoint,
466 auth: target.auth,
467 vector_search_enabled: false,
468 tracing: self.tracing.clone(),
469 };
470
471 let handle = self.ping_one(client, copts.clone());
472
473 handles.push(handle);
474 }
475
476 let results = join_all(handles).await;
477
478 Ok(results)
479 }
480
481 pub async fn create_ping_report(
482 &self,
483 opts: PingSearchReportOptions<'_>,
484 ) -> error::Result<Vec<EndpointPingReport>> {
485 let (client, targets) = self.http_component.get_all_targets::<NodeTarget>(&[])?;
486
487 let copts = PingOptions {
488 on_behalf_of: opts.on_behalf_of,
489 };
490 let timeout = opts.timeout;
491
492 let mut handles = Vec::with_capacity(targets.len());
493 let user_agent = self.http_component.user_agent().to_string();
494 for target in targets {
495 let user_agent = user_agent.clone();
496 let client = Search::<C> {
497 http_client: client.clone(),
498 user_agent,
499 endpoint: target.endpoint,
500 canonical_endpoint: target.canonical_endpoint,
501 auth: target.auth,
502
503 vector_search_enabled: self.state.load().vector_search_enabled,
504 tracing: self.tracing.clone(),
505 };
506
507 let handle = self.create_one_report(client, timeout, copts.clone());
508
509 handles.push(handle);
510 }
511
512 let reports = join_all(handles).await;
513
514 Ok(reports)
515 }
516
517 async fn ping_one(&self, client: Search<C>, opts: PingOptions<'_>) -> error::Result<()> {
518 client.ping(&opts).await.map_err(error::Error::from)
519 }
520
521 async fn create_one_report(
522 &self,
523 client: Search<C>,
524 timeout: Duration,
525 opts: PingOptions<'_>,
526 ) -> EndpointPingReport {
527 let start = std::time::Instant::now();
528 let res = select! {
529 e = tokio::time::sleep(timeout) => {
530 return EndpointPingReport {
531 remote: client.endpoint,
532 error: None,
533 latency: std::time::Instant::now().sub(start),
534 id: None,
535 namespace: None,
536 state: PingState::Timeout,
537 }
538 }
539 r = client.ping(&opts) => r.map_err(error::Error::from),
540 };
541 let end = std::time::Instant::now();
542
543 let (error, state) = match res {
544 Ok(_) => (None, PingState::Ok),
545 Err(e) => (Some(e), PingState::Error),
546 };
547
548 EndpointPingReport {
549 remote: client.endpoint,
550 error,
551 latency: end.sub(start),
552 id: None,
553 namespace: None,
554 state,
555 }
556 }
557
558 async fn orchestrate_mgmt_call<Fut, Resp>(
559 &self,
560 retry_strategy: Arc<dyn RetryStrategy>,
561 retry_info: RetryRequest,
562 endpoint: Option<String>,
563 operation: impl Fn(Search<C>) -> Fut + Send + Sync,
564 ) -> error::Result<Resp>
565 where
566 Resp: Send + Sync,
567 Fut: Future<Output = error::Result<Resp>> + Send,
568 C: Client,
569 {
570 orchestrate_retries(
571 self.retry_manager.clone(),
572 retry_strategy,
573 retry_info,
574 async || {
575 self.http_component
576 .orchestrate_endpoint(
577 endpoint.clone(),
578 async |client: Arc<C>,
579 endpoint_id: String,
580 endpoint: String,
581 canonical_endpoint: String,
582 auth: Auth| {
583 operation(Search::<C> {
584 http_client: client,
585 user_agent: self.http_component.user_agent().to_string(),
586 endpoint,
587 canonical_endpoint,
588 auth,
589
590 vector_search_enabled: self.state.load().vector_search_enabled,
591 tracing: self.tracing.clone(),
592 })
593 .await
594 },
595 )
596 .await
597 },
598 )
599 .await
600 }
601
602 async fn orchestrate_no_res_mgmt_call<Fut>(
603 &self,
604 retry_strategy: Arc<dyn RetryStrategy>,
605 retry_info: RetryRequest,
606 endpoint: Option<String>,
607 operation: impl Fn(Search<C>) -> Fut + Send + Sync,
608 ) -> error::Result<()>
609 where
610 Fut: Future<Output = error::Result<()>> + Send,
611 C: Client,
612 {
613 orchestrate_retries(
614 self.retry_manager.clone(),
615 retry_strategy,
616 retry_info,
617 async || {
618 self.http_component
619 .orchestrate_endpoint(
620 endpoint.clone(),
621 async |client: Arc<C>,
622 endpoint_id: String,
623 endpoint: String,
624 canonical_endpoint: String,
625 auth: Auth| {
626 operation(Search::<C> {
627 http_client: client,
628 user_agent: self.http_component.user_agent().to_string(),
629 endpoint,
630 canonical_endpoint,
631 auth,
632
633 vector_search_enabled: self.state.load().vector_search_enabled,
634 tracing: self.tracing.clone(),
635 })
636 .await
637 },
638 )
639 .await
640 },
641 )
642 .await
643 }
644}