Skip to main content

couchbase_core/
querycomponent.rs

1/*
2 *
3 *  * Copyright (c) 2025 Couchbase, Inc.
4 *  *
5 *  * Licensed under the Apache License, Version 2.0 (the "License");
6 *  * you may not use this file except in compliance with the License.
7 *  * You may obtain a copy of the License at
8 *  *
9 *  *    http://www.apache.org/licenses/LICENSE-2.0
10 *  *
11 *  * Unless required by applicable law or agreed to in writing, software
12 *  * distributed under the License is distributed on an "AS IS" BASIS,
13 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  * See the License for the specific language governing permissions and
15 *  * limitations under the License.
16 *
17 */
18
19use crate::authenticator::Authenticator;
20use crate::componentconfigs::NetworkAndCanonicalEndpoint;
21use crate::diagnosticscomponent::PingQueryReportOptions;
22use crate::error::ErrorKind;
23use crate::httpcomponent::{HttpComponent, HttpComponentState};
24use crate::httpx::client::Client;
25use crate::httpx::request::Auth;
26use crate::mgmtx::node_target::NodeTarget;
27use crate::options::query::{
28    BuildDeferredIndexesOptions, CreateIndexOptions, CreatePrimaryIndexOptions, DropIndexOptions,
29    DropPrimaryIndexOptions, EnsureIndexOptions, GetAllIndexesOptions, QueryOptions,
30    WatchIndexesOptions,
31};
32use crate::queryx::ensure_index_helper::EnsureIndexHelper;
33use crate::queryx::index::Index;
34use crate::queryx::preparedquery::{PreparedQuery, PreparedStatementCache};
35use crate::queryx::query::Query;
36use crate::queryx::query_options::{EnsureIndexPollOptions, PingOptions};
37use crate::results::pingreport::{EndpointPingReport, PingState};
38use crate::results::query::QueryResultStream;
39use crate::retry::{orchestrate_retries, RetryManager, RetryRequest, RetryStrategy};
40use crate::retrybesteffort::ExponentialBackoffCalculator;
41use crate::service_type::ServiceType;
42use crate::tracingcomponent::TracingComponent;
43use crate::{error, httpx};
44use futures::future::join_all;
45use futures::StreamExt;
46use std::collections::HashMap;
47use std::future::Future;
48use std::ops::Sub;
49use std::sync::{Arc, Mutex};
50use std::time::Duration;
51use tokio::select;
52use tracing::debug;
53
54pub(crate) struct QueryComponent<C: Client> {
55    id: String,
56    http_component: HttpComponent<C>,
57    tracing: Arc<TracingComponent>,
58
59    retry_manager: Arc<RetryManager>,
60    prepared_cache: Arc<Mutex<PreparedStatementCache>>,
61}
62
63pub(crate) struct QueryComponentConfig {
64    pub endpoints: HashMap<String, NetworkAndCanonicalEndpoint>,
65    pub authenticator: Authenticator,
66}
67
/// Construction-time options of a `QueryComponent`.
pub(crate) struct QueryComponentOptions {
    /// Identifier stored on the component.
    pub id: String,
    /// User agent handed to the underlying `HttpComponent`.
    pub user_agent: String,
}
72
73impl<C: Client + 'static> QueryComponent<C> {
74    pub fn new(
75        retry_manager: Arc<RetryManager>,
76        http_client: Arc<C>,
77        tracing: Arc<TracingComponent>,
78        config: QueryComponentConfig,
79        opts: QueryComponentOptions,
80    ) -> Self {
81        Self {
82            id: opts.id,
83            http_component: HttpComponent::new(
84                ServiceType::QUERY,
85                opts.user_agent,
86                http_client,
87                HttpComponentState::new(config.endpoints, config.authenticator),
88            ),
89            tracing,
90            retry_manager,
91            prepared_cache: Arc::new(Mutex::new(PreparedStatementCache::default())),
92        }
93    }
94
95    pub fn reconfigure(&self, config: QueryComponentConfig) {
96        debug!(
97            "Query component {} updating endpoints to {:?}",
98            self.id,
99            &config.endpoints.keys().collect::<Vec<_>>()
100        );
101
102        self.http_component.reconfigure(HttpComponentState::new(
103            config.endpoints,
104            config.authenticator,
105        ))
106    }
107
108    pub async fn query(&self, opts: QueryOptions) -> error::Result<QueryResultStream> {
109        let retry_info = RetryRequest::new("query", opts.read_only.unwrap_or_default());
110
111        let retry = opts.retry_strategy.clone();
112        let endpoint = opts.endpoint.clone();
113        let copts = opts.into();
114
115        orchestrate_retries(self.retry_manager.clone(), retry, retry_info, async || {
116            self.http_component
117                .orchestrate_endpoint(
118                    endpoint.clone(),
119                    async |client: Arc<C>,
120                           endpoint_id: String,
121                           endpoint: String,
122                           canonical_endpoint: String,
123                           auth: Auth| {
124                        let res = match (Query::<C> {
125                            http_client: client,
126                            user_agent: self.http_component.user_agent().to_string(),
127                            endpoint: endpoint.clone(),
128                            canonical_endpoint,
129                            auth,
130                            tracing: self.tracing.clone(),
131                        }
132                        .query(&copts)
133                        .await)
134                        {
135                            Ok(r) => r,
136                            Err(e) => return Err(ErrorKind::Query(e).into()),
137                        };
138
139                        Ok(QueryResultStream {
140                            inner: res,
141                            endpoint,
142                        })
143                    },
144                )
145                .await
146        })
147        .await
148    }
149
150    pub async fn prepared_query(&self, opts: QueryOptions) -> error::Result<QueryResultStream> {
151        let retry_info = RetryRequest::new("prepared_query", opts.read_only.unwrap_or_default());
152
153        let retry = opts.retry_strategy.clone();
154        let endpoint = opts.endpoint.clone();
155        let copts = opts.into();
156
157        orchestrate_retries(self.retry_manager.clone(), retry, retry_info, async || {
158            self.http_component
159                .orchestrate_endpoint(
160                    endpoint.clone(),
161                    async |client: Arc<C>,
162                           endpoint_id: String,
163                           endpoint: String,
164                           canonical_endpoint: String,
165                           auth: Auth| {
166                        let res = match (PreparedQuery {
167                            executor: Query::<C> {
168                                http_client: client,
169                                user_agent: self.http_component.user_agent().to_string(),
170                                endpoint: endpoint.clone(),
171                                canonical_endpoint,
172                                auth,
173                                tracing: self.tracing.clone(),
174                            },
175                            cache: self.prepared_cache.clone(),
176                        }
177                        .prepared_query(&copts)
178                        .await)
179                        {
180                            Ok(r) => r,
181                            Err(e) => return Err(ErrorKind::Query(e).into()),
182                        };
183
184                        Ok(QueryResultStream {
185                            inner: res,
186                            endpoint,
187                        })
188                    },
189                )
190                .await
191        })
192        .await
193    }
194
195    pub async fn get_all_indexes(
196        &self,
197        opts: &GetAllIndexesOptions<'_>,
198    ) -> error::Result<Vec<Index>> {
199        let retry_info = RetryRequest::new("query_get_all_indexes", true);
200
201        let retry = opts.retry_strategy.clone();
202        let endpoint = opts.endpoint.clone();
203        let copts = opts.into();
204
205        orchestrate_retries(self.retry_manager.clone(), retry, retry_info, async || {
206            self.http_component
207                .orchestrate_endpoint(
208                    endpoint.clone(),
209                    async |client: Arc<C>,
210                           endpoint_id: String,
211                           endpoint: String,
212                           canonical_endpoint: String,
213                           auth: Auth| {
214                        let res = match (Query::<C> {
215                            http_client: client,
216                            user_agent: self.http_component.user_agent().to_string(),
217                            endpoint,
218                            canonical_endpoint,
219                            auth,
220                            tracing: self.tracing.clone(),
221                        }
222                        .get_all_indexes(&copts)
223                        .await)
224                        {
225                            Ok(r) => r,
226                            Err(e) => {
227                                return Err(ErrorKind::Query(e).into());
228                            }
229                        };
230
231                        Ok(res)
232                    },
233                )
234                .await
235        })
236        .await
237    }
238
239    pub async fn create_primary_index(
240        &self,
241        opts: &CreatePrimaryIndexOptions<'_>,
242    ) -> error::Result<()> {
243        let retry_info = RetryRequest::new("query_create_primary_index", false);
244
245        let retry = opts.retry_strategy.clone();
246        let endpoint = opts.endpoint.clone();
247        let copts = opts.into();
248
249        self.orchestrate_no_res_mgmt_call(
250            retry,
251            retry_info,
252            endpoint.map(|e| e.to_string()),
253            async |query| {
254                query
255                    .create_primary_index(&copts)
256                    .await
257                    .map_err(|e| ErrorKind::Query(e).into())
258            },
259        )
260        .await
261    }
262
263    pub async fn create_index(&self, opts: &CreateIndexOptions<'_>) -> error::Result<()> {
264        let retry_info = RetryRequest::new("query_create_index", false);
265
266        let retry = opts.retry_strategy.clone();
267        let endpoint = opts.endpoint.clone();
268        let copts = opts.into();
269
270        self.orchestrate_no_res_mgmt_call(
271            retry,
272            retry_info,
273            endpoint.map(|e| e.to_string()),
274            async |query| {
275                query
276                    .create_index(&copts)
277                    .await
278                    .map_err(|e| ErrorKind::Query(e).into())
279            },
280        )
281        .await
282    }
283
284    pub async fn drop_primary_index(
285        &self,
286        opts: &DropPrimaryIndexOptions<'_>,
287    ) -> error::Result<()> {
288        let retry_info = RetryRequest::new("query_drop_primary_index", false);
289
290        let retry = opts.retry_strategy.clone();
291        let endpoint = opts.endpoint.clone();
292        let copts = opts.into();
293
294        self.orchestrate_no_res_mgmt_call(
295            retry,
296            retry_info,
297            endpoint.map(|e| e.to_string()),
298            async |query| {
299                query
300                    .drop_primary_index(&copts)
301                    .await
302                    .map_err(|e| ErrorKind::Query(e).into())
303            },
304        )
305        .await
306    }
307
308    pub async fn drop_index(&self, opts: &DropIndexOptions<'_>) -> error::Result<()> {
309        let retry_info = RetryRequest::new("query_drop_index", false);
310
311        let retry = opts.retry_strategy.clone();
312        let endpoint = opts.endpoint.clone();
313        let copts = opts.into();
314
315        self.orchestrate_no_res_mgmt_call(
316            retry,
317            retry_info,
318            endpoint.map(|e| e.to_string()),
319            async |query| {
320                query
321                    .drop_index(&copts)
322                    .await
323                    .map_err(|e| ErrorKind::Query(e).into())
324            },
325        )
326        .await
327    }
328
329    pub async fn build_deferred_indexes(
330        &self,
331        opts: &BuildDeferredIndexesOptions<'_>,
332    ) -> error::Result<()> {
333        let retry_info = RetryRequest::new("query_build_deferred_indexes", false);
334
335        let retry = opts.retry_strategy.clone();
336        let endpoint = opts.endpoint.clone();
337        let copts = opts.into();
338
339        self.orchestrate_no_res_mgmt_call(
340            retry,
341            retry_info,
342            endpoint.map(|e| e.to_string()),
343            async |query| {
344                query
345                    .build_deferred_indexes(&copts)
346                    .await
347                    .map_err(|e| ErrorKind::Query(e).into())
348            },
349        )
350        .await
351    }
352
353    pub async fn watch_indexes(&self, opts: &WatchIndexesOptions<'_>) -> error::Result<()> {
354        let retry_info = RetryRequest::new("query_watch_indexes", true);
355
356        let retry = opts.retry_strategy.clone();
357        let endpoint = opts.endpoint.clone();
358        let copts = opts.into();
359
360        self.orchestrate_no_res_mgmt_call(
361            retry,
362            retry_info,
363            endpoint.map(|e| e.to_string()),
364            async |query| {
365                query
366                    .watch_indexes(&copts)
367                    .await
368                    .map_err(|e| ErrorKind::Query(e).into())
369            },
370        )
371        .await
372    }
373
374    pub async fn ensure_index(&self, opts: &EnsureIndexOptions<'_>) -> error::Result<()> {
375        let mut helper = EnsureIndexHelper::new(
376            self.http_component.user_agent(),
377            opts.index_name,
378            opts.bucket_name,
379            opts.scope_name,
380            opts.collection_name,
381            opts.on_behalf_of_info,
382        );
383
384        let backoff = ExponentialBackoffCalculator::new(
385            Duration::from_millis(100),
386            Duration::from_millis(1000),
387            1.5,
388        );
389
390        self.http_component
391            .ensure_resource(backoff, async |client: Arc<C>, targets: Vec<NodeTarget>| {
392                helper
393                    .clone()
394                    .poll(&EnsureIndexPollOptions {
395                        client,
396                        targets,
397                        desired_state: opts.desired_state,
398                    })
399                    .await
400                    .map_err(error::Error::from)
401            })
402            .await
403    }
404
405    pub async fn ping_all_endpoints(
406        &self,
407        on_behalf_of: Option<&httpx::request::OnBehalfOfInfo>,
408    ) -> error::Result<Vec<error::Result<()>>> {
409        let (client, targets) = self.http_component.get_all_targets::<NodeTarget>(&[])?;
410
411        let copts = PingOptions { on_behalf_of };
412
413        let mut handles = Vec::with_capacity(targets.len());
414        let user_agent = self.http_component.user_agent().to_string();
415        for target in targets {
416            let user_agent = user_agent.clone();
417            let client = Query::<C> {
418                http_client: client.clone(),
419                user_agent,
420                endpoint: target.endpoint,
421                canonical_endpoint: target.canonical_endpoint,
422                auth: target.auth,
423                tracing: self.tracing.clone(),
424            };
425
426            let handle = self.ping_one(client, copts.clone());
427
428            handles.push(handle);
429        }
430
431        let results = join_all(handles).await;
432
433        Ok(results)
434    }
435
436    pub async fn create_ping_report(
437        &self,
438        opts: PingQueryReportOptions<'_>,
439    ) -> error::Result<Vec<EndpointPingReport>> {
440        let (client, targets) = self.http_component.get_all_targets::<NodeTarget>(&[])?;
441
442        let copts = PingOptions {
443            on_behalf_of: opts.on_behalf_of,
444        };
445        let timeout = opts.timeout;
446
447        let mut handles = Vec::with_capacity(targets.len());
448        let user_agent = self.http_component.user_agent().to_string();
449        for target in targets {
450            let user_agent = user_agent.clone();
451            let client = Query::<C> {
452                http_client: client.clone(),
453                user_agent,
454                endpoint: target.endpoint,
455                canonical_endpoint: target.canonical_endpoint,
456                auth: target.auth,
457                tracing: self.tracing.clone(),
458            };
459
460            let handle = self.create_one_report(client, timeout, copts.clone());
461
462            handles.push(handle);
463        }
464
465        let reports = join_all(handles).await;
466
467        Ok(reports)
468    }
469
470    async fn ping_one(&self, client: Query<C>, opts: PingOptions<'_>) -> error::Result<()> {
471        client.ping(&opts).await.map_err(error::Error::from)
472    }
473
474    async fn create_one_report(
475        &self,
476        client: Query<C>,
477        timeout: Duration,
478        opts: PingOptions<'_>,
479    ) -> EndpointPingReport {
480        let start = std::time::Instant::now();
481
482        let res = select! {
483            e = tokio::time::sleep(timeout) => {
484                return EndpointPingReport {
485                    remote: client.endpoint,
486                    error: None,
487                    latency: std::time::Instant::now().sub(start),
488                    id: None,
489                    namespace: None,
490                    state: PingState::Timeout,
491                }
492            }
493            r = client.ping(&opts) => r.map_err(error::Error::from),
494        };
495        let end = std::time::Instant::now();
496
497        let (error, state) = match res {
498            Ok(_) => (None, PingState::Ok),
499            Err(e) => (Some(e), PingState::Error),
500        };
501
502        EndpointPingReport {
503            remote: client.endpoint,
504            error,
505            latency: end.sub(start),
506            id: None,
507            namespace: None,
508            state,
509        }
510    }
511
512    async fn orchestrate_no_res_mgmt_call<Fut>(
513        &self,
514        retry_strategy: Arc<dyn RetryStrategy>,
515        retry_info: RetryRequest,
516        endpoint: Option<String>,
517        operation: impl Fn(Query<C>) -> Fut + Send + Sync,
518    ) -> error::Result<()>
519    where
520        Fut: Future<Output = error::Result<()>> + Send,
521        C: Client,
522    {
523        orchestrate_retries(
524            self.retry_manager.clone(),
525            retry_strategy,
526            retry_info,
527            async || {
528                self.http_component
529                    .orchestrate_endpoint(
530                        endpoint.clone(),
531                        async |client: Arc<C>,
532                               endpoint_id: String,
533                               endpoint: String,
534                               canonical_endpoint: String,
535                               auth: Auth| {
536                            operation(Query::<C> {
537                                http_client: client,
538                                user_agent: self.http_component.user_agent().to_string(),
539                                endpoint,
540                                canonical_endpoint,
541                                auth,
542                                tracing: self.tracing.clone(),
543                            })
544                            .await
545                        },
546                    )
547                    .await
548            },
549        )
550        .await
551    }
552}