couchbase_core/
querycomponent.rs

1/*
2 *
3 *  * Copyright (c) 2025 Couchbase, Inc.
4 *  *
5 *  * Licensed under the Apache License, Version 2.0 (the "License");
6 *  * you may not use this file except in compliance with the License.
7 *  * You may obtain a copy of the License at
8 *  *
9 *  *    http://www.apache.org/licenses/LICENSE-2.0
10 *  *
11 *  * Unless required by applicable law or agreed to in writing, software
12 *  * distributed under the License is distributed on an "AS IS" BASIS,
13 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  * See the License for the specific language governing permissions and
15 *  * limitations under the License.
16 *
17 */
18
19use crate::authenticator::Authenticator;
20use crate::diagnosticscomponent::PingQueryReportOptions;
21use crate::error::ErrorKind;
22use crate::httpcomponent::{HttpComponent, HttpComponentState};
23use crate::httpx::client::Client;
24use crate::mgmtx::node_target::NodeTarget;
25use crate::options::query::{
26    BuildDeferredIndexesOptions, CreateIndexOptions, CreatePrimaryIndexOptions, DropIndexOptions,
27    DropPrimaryIndexOptions, EnsureIndexOptions, GetAllIndexesOptions, QueryOptions,
28    WatchIndexesOptions,
29};
30use crate::queryx::ensure_index_helper::EnsureIndexHelper;
31use crate::queryx::index::Index;
32use crate::queryx::preparedquery::{PreparedQuery, PreparedStatementCache};
33use crate::queryx::query::Query;
34use crate::queryx::query_options::{EnsureIndexPollOptions, PingOptions};
35use crate::results::pingreport::{EndpointPingReport, PingState};
36use crate::results::query::QueryResultStream;
37use crate::retry::{orchestrate_retries, RetryInfo, RetryManager, DEFAULT_RETRY_STRATEGY};
38use crate::retrybesteffort::ExponentialBackoffCalculator;
39use crate::service_type::ServiceType;
40use crate::{error, httpx};
41use futures::future::join_all;
42use futures::StreamExt;
43use std::collections::HashMap;
44use std::future::Future;
45use std::ops::Sub;
46use std::sync::{Arc, Mutex};
47use std::time::Duration;
48use tokio::select;
49
50pub(crate) struct QueryComponent<C: Client> {
51    http_component: HttpComponent<C>,
52
53    retry_manager: Arc<RetryManager>,
54    prepared_cache: Arc<Mutex<PreparedStatementCache>>,
55}
56
57#[derive(Debug)]
58pub(crate) struct QueryComponentConfig {
59    pub endpoints: HashMap<String, String>,
60    pub authenticator: Arc<Authenticator>,
61}
62
63pub(crate) struct QueryComponentOptions {
64    pub user_agent: String,
65}
66
67impl<C: Client + 'static> QueryComponent<C> {
68    pub fn new(
69        retry_manager: Arc<RetryManager>,
70        http_client: Arc<C>,
71        config: QueryComponentConfig,
72        opts: QueryComponentOptions,
73    ) -> Self {
74        Self {
75            http_component: HttpComponent::new(
76                ServiceType::QUERY,
77                opts.user_agent,
78                http_client,
79                HttpComponentState::new(config.endpoints, config.authenticator),
80            ),
81            retry_manager,
82            prepared_cache: Arc::new(Mutex::new(PreparedStatementCache::default())),
83        }
84    }
85
86    pub fn reconfigure(&self, config: QueryComponentConfig) {
87        self.http_component.reconfigure(HttpComponentState::new(
88            config.endpoints,
89            config.authenticator,
90        ))
91    }
92
93    pub async fn query(&self, opts: QueryOptions) -> error::Result<QueryResultStream> {
94        let retry = if let Some(retry_strategy) = opts.retry_strategy.clone() {
95            retry_strategy
96        } else {
97            DEFAULT_RETRY_STRATEGY.clone()
98        };
99
100        let retry_info = RetryInfo::new("query", opts.read_only.unwrap_or_default(), retry);
101
102        let endpoint = opts.endpoint.clone();
103        let copts = opts.into();
104
105        orchestrate_retries(self.retry_manager.clone(), retry_info, async || {
106            self.http_component
107                .orchestrate_endpoint(
108                    endpoint.clone(),
109                    async |client: Arc<C>,
110                           endpoint_id: String,
111                           endpoint: String,
112                           username: String,
113                           password: String| {
114                        let res = match (Query::<C> {
115                            http_client: client,
116                            user_agent: self.http_component.user_agent().to_string(),
117                            endpoint: endpoint.clone(),
118                            username,
119                            password,
120                        }
121                        .query(&copts)
122                        .await)
123                        {
124                            Ok(r) => r,
125                            Err(e) => return Err(ErrorKind::Query(e).into()),
126                        };
127
128                        Ok(QueryResultStream {
129                            inner: res,
130                            endpoint,
131                        })
132                    },
133                )
134                .await
135        })
136        .await
137    }
138
139    pub async fn prepared_query(&self, opts: QueryOptions) -> error::Result<QueryResultStream> {
140        let retry = if let Some(retry_strategy) = opts.retry_strategy.clone() {
141            retry_strategy
142        } else {
143            DEFAULT_RETRY_STRATEGY.clone()
144        };
145        let retry_info =
146            RetryInfo::new("prepared_query", opts.read_only.unwrap_or_default(), retry);
147
148        let endpoint = opts.endpoint.clone();
149        let copts = opts.into();
150
151        orchestrate_retries(self.retry_manager.clone(), retry_info, async || {
152            self.http_component
153                .orchestrate_endpoint(
154                    endpoint.clone(),
155                    async |client: Arc<C>,
156                           endpoint_id: String,
157                           endpoint: String,
158                           username: String,
159                           password: String| {
160                        let res = match (PreparedQuery {
161                            executor: Query::<C> {
162                                http_client: client,
163                                user_agent: self.http_component.user_agent().to_string(),
164                                endpoint: endpoint.clone(),
165                                username,
166                                password,
167                            },
168                            cache: self.prepared_cache.clone(),
169                        }
170                        .prepared_query(&copts)
171                        .await)
172                        {
173                            Ok(r) => r,
174                            Err(e) => return Err(ErrorKind::Query(e).into()),
175                        };
176
177                        Ok(QueryResultStream {
178                            inner: res,
179                            endpoint,
180                        })
181                    },
182                )
183                .await
184        })
185        .await
186    }
187
188    pub async fn get_all_indexes(
189        &self,
190        opts: &GetAllIndexesOptions<'_>,
191    ) -> error::Result<Vec<Index>> {
192        let retry = if let Some(retry_strategy) = opts.retry_strategy.clone() {
193            retry_strategy
194        } else {
195            DEFAULT_RETRY_STRATEGY.clone()
196        };
197
198        let retry_info = RetryInfo::new("query_get_all_indexes", true, retry);
199
200        let endpoint = opts.endpoint.clone();
201        let copts = opts.into();
202
203        orchestrate_retries(self.retry_manager.clone(), retry_info, async || {
204            self.http_component
205                .orchestrate_endpoint(
206                    endpoint.clone(),
207                    async |client: Arc<C>,
208                           endpoint_id: String,
209                           endpoint: String,
210                           username: String,
211                           password: String| {
212                        let res = match (Query::<C> {
213                            http_client: client,
214                            user_agent: self.http_component.user_agent().to_string(),
215                            endpoint: endpoint.clone(),
216                            username,
217                            password,
218                        }
219                        .get_all_indexes(&copts)
220                        .await)
221                        {
222                            Ok(r) => r,
223                            Err(e) => {
224                                return Err(ErrorKind::Query(e).into());
225                            }
226                        };
227
228                        Ok(res)
229                    },
230                )
231                .await
232        })
233        .await
234    }
235
236    pub async fn create_primary_index(
237        &self,
238        opts: &CreatePrimaryIndexOptions<'_>,
239    ) -> error::Result<()> {
240        let retry = if let Some(retry_strategy) = opts.retry_strategy.clone() {
241            retry_strategy
242        } else {
243            DEFAULT_RETRY_STRATEGY.clone()
244        };
245
246        let retry_info = RetryInfo::new("query_create_primary_index", false, retry);
247
248        let endpoint = opts.endpoint.clone();
249        let copts = opts.into();
250
251        self.orchestrate_no_res_mgmt_call(
252            retry_info,
253            endpoint.map(|e| e.to_string()),
254            async |query| {
255                query
256                    .create_primary_index(&copts)
257                    .await
258                    .map_err(|e| ErrorKind::Query(e).into())
259            },
260        )
261        .await
262    }
263
264    pub async fn create_index(&self, opts: &CreateIndexOptions<'_>) -> error::Result<()> {
265        let retry = if let Some(retry_strategy) = opts.retry_strategy.clone() {
266            retry_strategy
267        } else {
268            DEFAULT_RETRY_STRATEGY.clone()
269        };
270
271        let retry_info = RetryInfo::new("query_create_index", false, retry);
272
273        let endpoint = opts.endpoint.clone();
274        let copts = opts.into();
275
276        self.orchestrate_no_res_mgmt_call(
277            retry_info,
278            endpoint.map(|e| e.to_string()),
279            async |query| {
280                query
281                    .create_index(&copts)
282                    .await
283                    .map_err(|e| ErrorKind::Query(e).into())
284            },
285        )
286        .await
287    }
288
289    pub async fn drop_primary_index(
290        &self,
291        opts: &DropPrimaryIndexOptions<'_>,
292    ) -> error::Result<()> {
293        let retry = if let Some(retry_strategy) = opts.retry_strategy.clone() {
294            retry_strategy
295        } else {
296            DEFAULT_RETRY_STRATEGY.clone()
297        };
298
299        let retry_info = RetryInfo::new("query_drop_primary_index", false, retry);
300
301        let endpoint = opts.endpoint.clone();
302        let copts = opts.into();
303
304        self.orchestrate_no_res_mgmt_call(
305            retry_info,
306            endpoint.map(|e| e.to_string()),
307            async |query| {
308                query
309                    .drop_primary_index(&copts)
310                    .await
311                    .map_err(|e| ErrorKind::Query(e).into())
312            },
313        )
314        .await
315    }
316
317    pub async fn drop_index(&self, opts: &DropIndexOptions<'_>) -> error::Result<()> {
318        let retry = if let Some(retry_strategy) = opts.retry_strategy.clone() {
319            retry_strategy
320        } else {
321            DEFAULT_RETRY_STRATEGY.clone()
322        };
323
324        let retry_info = RetryInfo::new("query_drop_index", false, retry);
325
326        let endpoint = opts.endpoint.clone();
327        let copts = opts.into();
328
329        self.orchestrate_no_res_mgmt_call(
330            retry_info,
331            endpoint.map(|e| e.to_string()),
332            async |query| {
333                query
334                    .drop_index(&copts)
335                    .await
336                    .map_err(|e| ErrorKind::Query(e).into())
337            },
338        )
339        .await
340    }
341
342    pub async fn build_deferred_indexes(
343        &self,
344        opts: &BuildDeferredIndexesOptions<'_>,
345    ) -> error::Result<()> {
346        let retry = if let Some(retry_strategy) = opts.retry_strategy.clone() {
347            retry_strategy
348        } else {
349            DEFAULT_RETRY_STRATEGY.clone()
350        };
351
352        let retry_info = RetryInfo::new("query_build_deferred_indexes", false, retry);
353
354        let endpoint = opts.endpoint.clone();
355        let copts = opts.into();
356
357        self.orchestrate_no_res_mgmt_call(
358            retry_info,
359            endpoint.map(|e| e.to_string()),
360            async |query| {
361                query
362                    .build_deferred_indexes(&copts)
363                    .await
364                    .map_err(|e| ErrorKind::Query(e).into())
365            },
366        )
367        .await
368    }
369
370    pub async fn watch_indexes(&self, opts: &WatchIndexesOptions<'_>) -> error::Result<()> {
371        let retry = if let Some(retry_strategy) = opts.retry_strategy.clone() {
372            retry_strategy
373        } else {
374            DEFAULT_RETRY_STRATEGY.clone()
375        };
376
377        let retry_info = RetryInfo::new("query_watch_indexes", true, retry);
378
379        let endpoint = opts.endpoint.clone();
380        let copts = opts.into();
381
382        self.orchestrate_no_res_mgmt_call(
383            retry_info,
384            endpoint.map(|e| e.to_string()),
385            async |query| {
386                query
387                    .watch_indexes(&copts)
388                    .await
389                    .map_err(|e| ErrorKind::Query(e).into())
390            },
391        )
392        .await
393    }
394
395    pub async fn ensure_index(&self, opts: &EnsureIndexOptions<'_>) -> error::Result<()> {
396        let mut helper = EnsureIndexHelper::new(
397            self.http_component.user_agent(),
398            opts.index_name,
399            opts.bucket_name,
400            opts.scope_name,
401            opts.collection_name,
402            opts.on_behalf_of_info,
403        );
404
405        let backoff = ExponentialBackoffCalculator::new(
406            Duration::from_millis(100),
407            Duration::from_millis(1000),
408            1.5,
409        );
410
411        self.http_component
412            .ensure_resource(backoff, async |client: Arc<C>, targets: Vec<NodeTarget>| {
413                helper
414                    .clone()
415                    .poll(&EnsureIndexPollOptions {
416                        client,
417                        targets,
418                        desired_state: opts.desired_state,
419                    })
420                    .await
421                    .map_err(error::Error::from)
422            })
423            .await
424    }
425
426    pub async fn ping_all_endpoints(
427        &self,
428        on_behalf_of: Option<&httpx::request::OnBehalfOfInfo>,
429    ) -> error::Result<Vec<error::Result<()>>> {
430        let (client, targets) = self.http_component.get_all_targets::<NodeTarget>(&[])?;
431
432        let copts = PingOptions { on_behalf_of };
433
434        let mut handles = Vec::with_capacity(targets.len());
435        let user_agent = self.http_component.user_agent().to_string();
436        for target in targets {
437            let user_agent = user_agent.clone();
438            let client = Query::<C> {
439                http_client: client.clone(),
440                user_agent,
441                endpoint: target.endpoint.clone(),
442                username: target.username,
443                password: target.password,
444            };
445
446            let handle = self.ping_one(client, copts.clone());
447
448            handles.push(handle);
449        }
450
451        let results = join_all(handles).await;
452
453        Ok(results)
454    }
455
456    pub async fn create_ping_report(
457        &self,
458        opts: PingQueryReportOptions<'_>,
459    ) -> error::Result<Vec<EndpointPingReport>> {
460        let (client, targets) = self.http_component.get_all_targets::<NodeTarget>(&[])?;
461
462        let copts = PingOptions {
463            on_behalf_of: opts.on_behalf_of,
464        };
465        let timeout = opts.timeout;
466
467        let mut handles = Vec::with_capacity(targets.len());
468        let user_agent = self.http_component.user_agent().to_string();
469        for target in targets {
470            let user_agent = user_agent.clone();
471            let client = Query::<C> {
472                http_client: client.clone(),
473                user_agent,
474                endpoint: target.endpoint.clone(),
475                username: target.username,
476                password: target.password,
477            };
478
479            let handle = self.create_one_report(client, timeout, copts.clone());
480
481            handles.push(handle);
482        }
483
484        let reports = join_all(handles).await;
485
486        Ok(reports)
487    }
488
489    async fn ping_one(&self, client: Query<C>, opts: PingOptions<'_>) -> error::Result<()> {
490        client.ping(&opts).await.map_err(error::Error::from)
491    }
492
493    async fn create_one_report(
494        &self,
495        client: Query<C>,
496        timeout: Duration,
497        opts: PingOptions<'_>,
498    ) -> EndpointPingReport {
499        let start = std::time::Instant::now();
500
501        let res = select! {
502            e = tokio::time::sleep(timeout) => {
503                return EndpointPingReport {
504                    remote: client.endpoint,
505                    error: None,
506                    latency: std::time::Instant::now().sub(start),
507                    id: None,
508                    namespace: None,
509                    state: PingState::Timeout,
510                }
511            }
512            r = client.ping(&opts) => r.map_err(error::Error::from),
513        };
514        let end = std::time::Instant::now();
515
516        let (error, state) = match res {
517            Ok(_) => (None, PingState::Ok),
518            Err(e) => (Some(e), PingState::Error),
519        };
520
521        EndpointPingReport {
522            remote: client.endpoint,
523            error,
524            latency: end.sub(start),
525            id: None,
526            namespace: None,
527            state,
528        }
529    }
530
531    async fn orchestrate_no_res_mgmt_call<Fut>(
532        &self,
533        retry_info: RetryInfo,
534        endpoint: Option<String>,
535        operation: impl Fn(Query<C>) -> Fut + Send + Sync,
536    ) -> error::Result<()>
537    where
538        Fut: Future<Output = error::Result<()>> + Send,
539        C: Client,
540    {
541        orchestrate_retries(self.retry_manager.clone(), retry_info, async || {
542            self.http_component
543                .orchestrate_endpoint(
544                    endpoint.clone(),
545                    async |client: Arc<C>,
546                           endpoint_id: String,
547                           endpoint: String,
548                           username: String,
549                           password: String| {
550                        operation(Query::<C> {
551                            http_client: client,
552                            user_agent: self.http_component.user_agent().to_string(),
553                            endpoint: endpoint.clone(),
554                            username,
555                            password,
556                        })
557                        .await
558                    },
559                )
560                .await
561        })
562        .await
563    }
564}