// couchbase_core/querycomponent.rs

/*
 *
 *  * Copyright (c) 2025 Couchbase, Inc.
 *  *
 *  * Licensed under the Apache License, Version 2.0 (the "License");
 *  * you may not use this file except in compliance with the License.
 *  * You may obtain a copy of the License at
 *  *
 *  *    http://www.apache.org/licenses/LICENSE-2.0
 *  *
 *  * Unless required by applicable law or agreed to in writing, software
 *  * distributed under the License is distributed on an "AS IS" BASIS,
 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  * See the License for the specific language governing permissions and
 *  * limitations under the License.
 *
 */
18
19use crate::authenticator::Authenticator;
20use crate::componentconfigs::NetworkAndCanonicalEndpoint;
21use crate::diagnosticscomponent::PingQueryReportOptions;
22use crate::error::ErrorKind;
23use crate::httpcomponent::{HttpComponent, HttpComponentState};
24use crate::httpx::client::Client;
25use crate::httpx::request::Auth;
26use crate::mgmtx::node_target::NodeTarget;
27use crate::options::query::{
28    BuildDeferredIndexesOptions, CreateIndexOptions, CreatePrimaryIndexOptions, DropIndexOptions,
29    DropPrimaryIndexOptions, EnsureIndexOptions, GetAllIndexesOptions, QueryOptions,
30    WatchIndexesOptions,
31};
32use crate::queryx::ensure_index_helper::EnsureIndexHelper;
33use crate::queryx::index::Index;
34use crate::queryx::preparedquery::{PreparedQuery, PreparedStatementCache};
35use crate::queryx::query::Query;
36use crate::queryx::query_options::{EnsureIndexPollOptions, PingOptions};
37use crate::results::pingreport::{EndpointPingReport, PingState};
38use crate::results::query::QueryResultStream;
39use crate::retry::{orchestrate_retries, RetryManager, RetryRequest, RetryStrategy};
40use crate::retrybesteffort::ExponentialBackoffCalculator;
41use crate::service_type::ServiceType;
42use crate::tracingcomponent::TracingComponent;
43use crate::{error, httpx};
44use futures::future::join_all;
45use futures::StreamExt;
46use std::collections::HashMap;
47use std::future::Future;
48use std::ops::Sub;
49use std::sync::{Arc, Mutex};
50use std::time::Duration;
51use tokio::select;
52
53pub(crate) struct QueryComponent<C: Client> {
54    http_component: HttpComponent<C>,
55    tracing: Arc<TracingComponent>,
56
57    retry_manager: Arc<RetryManager>,
58    prepared_cache: Arc<Mutex<PreparedStatementCache>>,
59}
60
61pub(crate) struct QueryComponentConfig {
62    pub endpoints: HashMap<String, NetworkAndCanonicalEndpoint>,
63    pub authenticator: Authenticator,
64}
65
/// Construction-time options for a `QueryComponent` that are not part of the
/// reconfigurable `QueryComponentConfig`.
pub(crate) struct QueryComponentOptions {
    /// User agent string passed to the underlying `HttpComponent`.
    pub user_agent: String,
}
69
70impl<C: Client + 'static> QueryComponent<C> {
71    pub fn new(
72        retry_manager: Arc<RetryManager>,
73        http_client: Arc<C>,
74        tracing: Arc<TracingComponent>,
75        config: QueryComponentConfig,
76        opts: QueryComponentOptions,
77    ) -> Self {
78        Self {
79            http_component: HttpComponent::new(
80                ServiceType::QUERY,
81                opts.user_agent,
82                http_client,
83                HttpComponentState::new(config.endpoints, config.authenticator),
84            ),
85            tracing,
86            retry_manager,
87            prepared_cache: Arc::new(Mutex::new(PreparedStatementCache::default())),
88        }
89    }
90
91    pub fn reconfigure(&self, config: QueryComponentConfig) {
92        self.http_component.reconfigure(HttpComponentState::new(
93            config.endpoints,
94            config.authenticator,
95        ))
96    }
97
98    pub async fn query(&self, opts: QueryOptions) -> error::Result<QueryResultStream> {
99        let retry_info = RetryRequest::new("query", opts.read_only.unwrap_or_default());
100
101        let retry = opts.retry_strategy.clone();
102        let endpoint = opts.endpoint.clone();
103        let copts = opts.into();
104
105        orchestrate_retries(self.retry_manager.clone(), retry, retry_info, async || {
106            self.http_component
107                .orchestrate_endpoint(
108                    endpoint.clone(),
109                    async |client: Arc<C>,
110                           endpoint_id: String,
111                           endpoint: String,
112                           canonical_endpoint: String,
113                           auth: Auth| {
114                        let res = match (Query::<C> {
115                            http_client: client,
116                            user_agent: self.http_component.user_agent().to_string(),
117                            endpoint: endpoint.clone(),
118                            canonical_endpoint,
119                            auth,
120                            tracing: self.tracing.clone(),
121                        }
122                        .query(&copts)
123                        .await)
124                        {
125                            Ok(r) => r,
126                            Err(e) => return Err(ErrorKind::Query(e).into()),
127                        };
128
129                        Ok(QueryResultStream {
130                            inner: res,
131                            endpoint,
132                        })
133                    },
134                )
135                .await
136        })
137        .await
138    }
139
140    pub async fn prepared_query(&self, opts: QueryOptions) -> error::Result<QueryResultStream> {
141        let retry_info = RetryRequest::new("prepared_query", opts.read_only.unwrap_or_default());
142
143        let retry = opts.retry_strategy.clone();
144        let endpoint = opts.endpoint.clone();
145        let copts = opts.into();
146
147        orchestrate_retries(self.retry_manager.clone(), retry, retry_info, async || {
148            self.http_component
149                .orchestrate_endpoint(
150                    endpoint.clone(),
151                    async |client: Arc<C>,
152                           endpoint_id: String,
153                           endpoint: String,
154                           canonical_endpoint: String,
155                           auth: Auth| {
156                        let res = match (PreparedQuery {
157                            executor: Query::<C> {
158                                http_client: client,
159                                user_agent: self.http_component.user_agent().to_string(),
160                                endpoint: endpoint.clone(),
161                                canonical_endpoint,
162                                auth,
163                                tracing: self.tracing.clone(),
164                            },
165                            cache: self.prepared_cache.clone(),
166                        }
167                        .prepared_query(&copts)
168                        .await)
169                        {
170                            Ok(r) => r,
171                            Err(e) => return Err(ErrorKind::Query(e).into()),
172                        };
173
174                        Ok(QueryResultStream {
175                            inner: res,
176                            endpoint,
177                        })
178                    },
179                )
180                .await
181        })
182        .await
183    }
184
185    pub async fn get_all_indexes(
186        &self,
187        opts: &GetAllIndexesOptions<'_>,
188    ) -> error::Result<Vec<Index>> {
189        let retry_info = RetryRequest::new("query_get_all_indexes", true);
190
191        let retry = opts.retry_strategy.clone();
192        let endpoint = opts.endpoint.clone();
193        let copts = opts.into();
194
195        orchestrate_retries(self.retry_manager.clone(), retry, retry_info, async || {
196            self.http_component
197                .orchestrate_endpoint(
198                    endpoint.clone(),
199                    async |client: Arc<C>,
200                           endpoint_id: String,
201                           endpoint: String,
202                           canonical_endpoint: String,
203                           auth: Auth| {
204                        let res = match (Query::<C> {
205                            http_client: client,
206                            user_agent: self.http_component.user_agent().to_string(),
207                            endpoint,
208                            canonical_endpoint,
209                            auth,
210                            tracing: self.tracing.clone(),
211                        }
212                        .get_all_indexes(&copts)
213                        .await)
214                        {
215                            Ok(r) => r,
216                            Err(e) => {
217                                return Err(ErrorKind::Query(e).into());
218                            }
219                        };
220
221                        Ok(res)
222                    },
223                )
224                .await
225        })
226        .await
227    }
228
229    pub async fn create_primary_index(
230        &self,
231        opts: &CreatePrimaryIndexOptions<'_>,
232    ) -> error::Result<()> {
233        let retry_info = RetryRequest::new("query_create_primary_index", false);
234
235        let retry = opts.retry_strategy.clone();
236        let endpoint = opts.endpoint.clone();
237        let copts = opts.into();
238
239        self.orchestrate_no_res_mgmt_call(
240            retry,
241            retry_info,
242            endpoint.map(|e| e.to_string()),
243            async |query| {
244                query
245                    .create_primary_index(&copts)
246                    .await
247                    .map_err(|e| ErrorKind::Query(e).into())
248            },
249        )
250        .await
251    }
252
253    pub async fn create_index(&self, opts: &CreateIndexOptions<'_>) -> error::Result<()> {
254        let retry_info = RetryRequest::new("query_create_index", false);
255
256        let retry = opts.retry_strategy.clone();
257        let endpoint = opts.endpoint.clone();
258        let copts = opts.into();
259
260        self.orchestrate_no_res_mgmt_call(
261            retry,
262            retry_info,
263            endpoint.map(|e| e.to_string()),
264            async |query| {
265                query
266                    .create_index(&copts)
267                    .await
268                    .map_err(|e| ErrorKind::Query(e).into())
269            },
270        )
271        .await
272    }
273
274    pub async fn drop_primary_index(
275        &self,
276        opts: &DropPrimaryIndexOptions<'_>,
277    ) -> error::Result<()> {
278        let retry_info = RetryRequest::new("query_drop_primary_index", false);
279
280        let retry = opts.retry_strategy.clone();
281        let endpoint = opts.endpoint.clone();
282        let copts = opts.into();
283
284        self.orchestrate_no_res_mgmt_call(
285            retry,
286            retry_info,
287            endpoint.map(|e| e.to_string()),
288            async |query| {
289                query
290                    .drop_primary_index(&copts)
291                    .await
292                    .map_err(|e| ErrorKind::Query(e).into())
293            },
294        )
295        .await
296    }
297
298    pub async fn drop_index(&self, opts: &DropIndexOptions<'_>) -> error::Result<()> {
299        let retry_info = RetryRequest::new("query_drop_index", false);
300
301        let retry = opts.retry_strategy.clone();
302        let endpoint = opts.endpoint.clone();
303        let copts = opts.into();
304
305        self.orchestrate_no_res_mgmt_call(
306            retry,
307            retry_info,
308            endpoint.map(|e| e.to_string()),
309            async |query| {
310                query
311                    .drop_index(&copts)
312                    .await
313                    .map_err(|e| ErrorKind::Query(e).into())
314            },
315        )
316        .await
317    }
318
319    pub async fn build_deferred_indexes(
320        &self,
321        opts: &BuildDeferredIndexesOptions<'_>,
322    ) -> error::Result<()> {
323        let retry_info = RetryRequest::new("query_build_deferred_indexes", false);
324
325        let retry = opts.retry_strategy.clone();
326        let endpoint = opts.endpoint.clone();
327        let copts = opts.into();
328
329        self.orchestrate_no_res_mgmt_call(
330            retry,
331            retry_info,
332            endpoint.map(|e| e.to_string()),
333            async |query| {
334                query
335                    .build_deferred_indexes(&copts)
336                    .await
337                    .map_err(|e| ErrorKind::Query(e).into())
338            },
339        )
340        .await
341    }
342
343    pub async fn watch_indexes(&self, opts: &WatchIndexesOptions<'_>) -> error::Result<()> {
344        let retry_info = RetryRequest::new("query_watch_indexes", true);
345
346        let retry = opts.retry_strategy.clone();
347        let endpoint = opts.endpoint.clone();
348        let copts = opts.into();
349
350        self.orchestrate_no_res_mgmt_call(
351            retry,
352            retry_info,
353            endpoint.map(|e| e.to_string()),
354            async |query| {
355                query
356                    .watch_indexes(&copts)
357                    .await
358                    .map_err(|e| ErrorKind::Query(e).into())
359            },
360        )
361        .await
362    }
363
364    pub async fn ensure_index(&self, opts: &EnsureIndexOptions<'_>) -> error::Result<()> {
365        let mut helper = EnsureIndexHelper::new(
366            self.http_component.user_agent(),
367            opts.index_name,
368            opts.bucket_name,
369            opts.scope_name,
370            opts.collection_name,
371            opts.on_behalf_of_info,
372        );
373
374        let backoff = ExponentialBackoffCalculator::new(
375            Duration::from_millis(100),
376            Duration::from_millis(1000),
377            1.5,
378        );
379
380        self.http_component
381            .ensure_resource(backoff, async |client: Arc<C>, targets: Vec<NodeTarget>| {
382                helper
383                    .clone()
384                    .poll(&EnsureIndexPollOptions {
385                        client,
386                        targets,
387                        desired_state: opts.desired_state,
388                    })
389                    .await
390                    .map_err(error::Error::from)
391            })
392            .await
393    }
394
395    pub async fn ping_all_endpoints(
396        &self,
397        on_behalf_of: Option<&httpx::request::OnBehalfOfInfo>,
398    ) -> error::Result<Vec<error::Result<()>>> {
399        let (client, targets) = self.http_component.get_all_targets::<NodeTarget>(&[])?;
400
401        let copts = PingOptions { on_behalf_of };
402
403        let mut handles = Vec::with_capacity(targets.len());
404        let user_agent = self.http_component.user_agent().to_string();
405        for target in targets {
406            let user_agent = user_agent.clone();
407            let client = Query::<C> {
408                http_client: client.clone(),
409                user_agent,
410                endpoint: target.endpoint,
411                canonical_endpoint: target.canonical_endpoint,
412                auth: target.auth,
413                tracing: self.tracing.clone(),
414            };
415
416            let handle = self.ping_one(client, copts.clone());
417
418            handles.push(handle);
419        }
420
421        let results = join_all(handles).await;
422
423        Ok(results)
424    }
425
426    pub async fn create_ping_report(
427        &self,
428        opts: PingQueryReportOptions<'_>,
429    ) -> error::Result<Vec<EndpointPingReport>> {
430        let (client, targets) = self.http_component.get_all_targets::<NodeTarget>(&[])?;
431
432        let copts = PingOptions {
433            on_behalf_of: opts.on_behalf_of,
434        };
435        let timeout = opts.timeout;
436
437        let mut handles = Vec::with_capacity(targets.len());
438        let user_agent = self.http_component.user_agent().to_string();
439        for target in targets {
440            let user_agent = user_agent.clone();
441            let client = Query::<C> {
442                http_client: client.clone(),
443                user_agent,
444                endpoint: target.endpoint,
445                canonical_endpoint: target.canonical_endpoint,
446                auth: target.auth,
447                tracing: self.tracing.clone(),
448            };
449
450            let handle = self.create_one_report(client, timeout, copts.clone());
451
452            handles.push(handle);
453        }
454
455        let reports = join_all(handles).await;
456
457        Ok(reports)
458    }
459
460    async fn ping_one(&self, client: Query<C>, opts: PingOptions<'_>) -> error::Result<()> {
461        client.ping(&opts).await.map_err(error::Error::from)
462    }
463
464    async fn create_one_report(
465        &self,
466        client: Query<C>,
467        timeout: Duration,
468        opts: PingOptions<'_>,
469    ) -> EndpointPingReport {
470        let start = std::time::Instant::now();
471
472        let res = select! {
473            e = tokio::time::sleep(timeout) => {
474                return EndpointPingReport {
475                    remote: client.endpoint,
476                    error: None,
477                    latency: std::time::Instant::now().sub(start),
478                    id: None,
479                    namespace: None,
480                    state: PingState::Timeout,
481                }
482            }
483            r = client.ping(&opts) => r.map_err(error::Error::from),
484        };
485        let end = std::time::Instant::now();
486
487        let (error, state) = match res {
488            Ok(_) => (None, PingState::Ok),
489            Err(e) => (Some(e), PingState::Error),
490        };
491
492        EndpointPingReport {
493            remote: client.endpoint,
494            error,
495            latency: end.sub(start),
496            id: None,
497            namespace: None,
498            state,
499        }
500    }
501
502    async fn orchestrate_no_res_mgmt_call<Fut>(
503        &self,
504        retry_strategy: Arc<dyn RetryStrategy>,
505        retry_info: RetryRequest,
506        endpoint: Option<String>,
507        operation: impl Fn(Query<C>) -> Fut + Send + Sync,
508    ) -> error::Result<()>
509    where
510        Fut: Future<Output = error::Result<()>> + Send,
511        C: Client,
512    {
513        orchestrate_retries(
514            self.retry_manager.clone(),
515            retry_strategy,
516            retry_info,
517            async || {
518                self.http_component
519                    .orchestrate_endpoint(
520                        endpoint.clone(),
521                        async |client: Arc<C>,
522                               endpoint_id: String,
523                               endpoint: String,
524                               canonical_endpoint: String,
525                               auth: Auth| {
526                            operation(Query::<C> {
527                                http_client: client,
528                                user_agent: self.http_component.user_agent().to_string(),
529                                endpoint,
530                                canonical_endpoint,
531                                auth,
532                                tracing: self.tracing.clone(),
533                            })
534                            .await
535                        },
536                    )
537                    .await
538            },
539        )
540        .await
541    }
542}