1use crate::authenticator::Authenticator;
20use crate::componentconfigs::NetworkAndCanonicalEndpoint;
21use crate::diagnosticscomponent::PingQueryReportOptions;
22use crate::error::ErrorKind;
23use crate::httpcomponent::{HttpComponent, HttpComponentState};
24use crate::httpx::client::Client;
25use crate::httpx::request::Auth;
26use crate::mgmtx::node_target::NodeTarget;
27use crate::options::query::{
28 BuildDeferredIndexesOptions, CreateIndexOptions, CreatePrimaryIndexOptions, DropIndexOptions,
29 DropPrimaryIndexOptions, EnsureIndexOptions, GetAllIndexesOptions, QueryOptions,
30 WatchIndexesOptions,
31};
32use crate::queryx::ensure_index_helper::EnsureIndexHelper;
33use crate::queryx::index::Index;
34use crate::queryx::preparedquery::{PreparedQuery, PreparedStatementCache};
35use crate::queryx::query::Query;
36use crate::queryx::query_options::{EnsureIndexPollOptions, PingOptions};
37use crate::results::pingreport::{EndpointPingReport, PingState};
38use crate::results::query::QueryResultStream;
39use crate::retry::{orchestrate_retries, RetryManager, RetryRequest, RetryStrategy};
40use crate::retrybesteffort::ExponentialBackoffCalculator;
41use crate::service_type::ServiceType;
42use crate::tracingcomponent::TracingComponent;
43use crate::{error, httpx};
44use futures::future::join_all;
45use futures::StreamExt;
46use std::collections::HashMap;
47use std::future::Future;
48use std::ops::Sub;
49use std::sync::{Arc, Mutex};
50use std::time::Duration;
51use tokio::select;
52use tracing::debug;
53
/// Component that runs query-service operations over HTTP.
///
/// Bundles the HTTP endpoint state, the retry manager, the tracing handle and the
/// prepared-statement cache that the methods of this type rely on.
pub(crate) struct QueryComponent<C: Client> {
    /// Identifier of this component; it is printed in the `reconfigure` debug log.
    id: String,
    /// HTTP layer created for `ServiceType::QUERY`; owns the endpoint/authenticator state.
    http_component: HttpComponent<C>,
    /// Tracing handle, cloned into every `Query` executor this component builds.
    tracing: Arc<TracingComponent>,

    /// Retry manager handed to `orchestrate_retries` for each retried operation.
    retry_manager: Arc<RetryManager>,
    /// Prepared-statement cache; a clone of this handle is given to each `PreparedQuery`.
    prepared_cache: Arc<Mutex<PreparedStatementCache>>,
}
62
/// Endpoint and credential configuration for a `QueryComponent`.
///
/// Consumed by `QueryComponent::new` and `QueryComponent::reconfigure`, both of which
/// turn it into an `HttpComponentState`.
pub(crate) struct QueryComponentConfig {
    /// Query endpoints; the keys are what `reconfigure` prints in its debug log.
    /// NOTE(review): keys are presumably endpoint identifiers — confirm with the producer.
    pub endpoints: HashMap<String, NetworkAndCanonicalEndpoint>,
    /// Authenticator stored in the HTTP component state together with the endpoints.
    pub authenticator: Authenticator,
}
67
/// Construction-time options for a `QueryComponent` (not part of `reconfigure`).
pub(crate) struct QueryComponentOptions {
    /// Identifier stored on the component and printed in its debug log.
    pub id: String,
    /// User agent string handed to the underlying `HttpComponent`.
    pub user_agent: String,
}
72
73impl<C: Client + 'static> QueryComponent<C> {
74 pub fn new(
75 retry_manager: Arc<RetryManager>,
76 http_client: Arc<C>,
77 tracing: Arc<TracingComponent>,
78 config: QueryComponentConfig,
79 opts: QueryComponentOptions,
80 ) -> Self {
81 Self {
82 id: opts.id,
83 http_component: HttpComponent::new(
84 ServiceType::QUERY,
85 opts.user_agent,
86 http_client,
87 HttpComponentState::new(config.endpoints, config.authenticator),
88 ),
89 tracing,
90 retry_manager,
91 prepared_cache: Arc::new(Mutex::new(PreparedStatementCache::default())),
92 }
93 }
94
95 pub fn reconfigure(&self, config: QueryComponentConfig) {
96 debug!(
97 "Query component {} updating endpoints to {:?}",
98 self.id,
99 &config.endpoints.keys().collect::<Vec<_>>()
100 );
101
102 self.http_component.reconfigure(HttpComponentState::new(
103 config.endpoints,
104 config.authenticator,
105 ))
106 }
107
108 pub async fn query(&self, opts: QueryOptions) -> error::Result<QueryResultStream> {
109 let retry_info = RetryRequest::new("query", opts.read_only.unwrap_or_default());
110
111 let retry = opts.retry_strategy.clone();
112 let endpoint = opts.endpoint.clone();
113 let copts = opts.into();
114
115 orchestrate_retries(self.retry_manager.clone(), retry, retry_info, async || {
116 self.http_component
117 .orchestrate_endpoint(
118 endpoint.clone(),
119 async |client: Arc<C>,
120 endpoint_id: String,
121 endpoint: String,
122 canonical_endpoint: String,
123 auth: Auth| {
124 let res = match (Query::<C> {
125 http_client: client,
126 user_agent: self.http_component.user_agent().to_string(),
127 endpoint: endpoint.clone(),
128 canonical_endpoint,
129 auth,
130 tracing: self.tracing.clone(),
131 }
132 .query(&copts)
133 .await)
134 {
135 Ok(r) => r,
136 Err(e) => return Err(ErrorKind::Query(e).into()),
137 };
138
139 Ok(QueryResultStream {
140 inner: res,
141 endpoint,
142 })
143 },
144 )
145 .await
146 })
147 .await
148 }
149
150 pub async fn prepared_query(&self, opts: QueryOptions) -> error::Result<QueryResultStream> {
151 let retry_info = RetryRequest::new("prepared_query", opts.read_only.unwrap_or_default());
152
153 let retry = opts.retry_strategy.clone();
154 let endpoint = opts.endpoint.clone();
155 let copts = opts.into();
156
157 orchestrate_retries(self.retry_manager.clone(), retry, retry_info, async || {
158 self.http_component
159 .orchestrate_endpoint(
160 endpoint.clone(),
161 async |client: Arc<C>,
162 endpoint_id: String,
163 endpoint: String,
164 canonical_endpoint: String,
165 auth: Auth| {
166 let res = match (PreparedQuery {
167 executor: Query::<C> {
168 http_client: client,
169 user_agent: self.http_component.user_agent().to_string(),
170 endpoint: endpoint.clone(),
171 canonical_endpoint,
172 auth,
173 tracing: self.tracing.clone(),
174 },
175 cache: self.prepared_cache.clone(),
176 }
177 .prepared_query(&copts)
178 .await)
179 {
180 Ok(r) => r,
181 Err(e) => return Err(ErrorKind::Query(e).into()),
182 };
183
184 Ok(QueryResultStream {
185 inner: res,
186 endpoint,
187 })
188 },
189 )
190 .await
191 })
192 .await
193 }
194
195 pub async fn get_all_indexes(
196 &self,
197 opts: &GetAllIndexesOptions<'_>,
198 ) -> error::Result<Vec<Index>> {
199 let retry_info = RetryRequest::new("query_get_all_indexes", true);
200
201 let retry = opts.retry_strategy.clone();
202 let endpoint = opts.endpoint.clone();
203 let copts = opts.into();
204
205 orchestrate_retries(self.retry_manager.clone(), retry, retry_info, async || {
206 self.http_component
207 .orchestrate_endpoint(
208 endpoint.clone(),
209 async |client: Arc<C>,
210 endpoint_id: String,
211 endpoint: String,
212 canonical_endpoint: String,
213 auth: Auth| {
214 let res = match (Query::<C> {
215 http_client: client,
216 user_agent: self.http_component.user_agent().to_string(),
217 endpoint,
218 canonical_endpoint,
219 auth,
220 tracing: self.tracing.clone(),
221 }
222 .get_all_indexes(&copts)
223 .await)
224 {
225 Ok(r) => r,
226 Err(e) => {
227 return Err(ErrorKind::Query(e).into());
228 }
229 };
230
231 Ok(res)
232 },
233 )
234 .await
235 })
236 .await
237 }
238
239 pub async fn create_primary_index(
240 &self,
241 opts: &CreatePrimaryIndexOptions<'_>,
242 ) -> error::Result<()> {
243 let retry_info = RetryRequest::new("query_create_primary_index", false);
244
245 let retry = opts.retry_strategy.clone();
246 let endpoint = opts.endpoint.clone();
247 let copts = opts.into();
248
249 self.orchestrate_no_res_mgmt_call(
250 retry,
251 retry_info,
252 endpoint.map(|e| e.to_string()),
253 async |query| {
254 query
255 .create_primary_index(&copts)
256 .await
257 .map_err(|e| ErrorKind::Query(e).into())
258 },
259 )
260 .await
261 }
262
263 pub async fn create_index(&self, opts: &CreateIndexOptions<'_>) -> error::Result<()> {
264 let retry_info = RetryRequest::new("query_create_index", false);
265
266 let retry = opts.retry_strategy.clone();
267 let endpoint = opts.endpoint.clone();
268 let copts = opts.into();
269
270 self.orchestrate_no_res_mgmt_call(
271 retry,
272 retry_info,
273 endpoint.map(|e| e.to_string()),
274 async |query| {
275 query
276 .create_index(&copts)
277 .await
278 .map_err(|e| ErrorKind::Query(e).into())
279 },
280 )
281 .await
282 }
283
284 pub async fn drop_primary_index(
285 &self,
286 opts: &DropPrimaryIndexOptions<'_>,
287 ) -> error::Result<()> {
288 let retry_info = RetryRequest::new("query_drop_primary_index", false);
289
290 let retry = opts.retry_strategy.clone();
291 let endpoint = opts.endpoint.clone();
292 let copts = opts.into();
293
294 self.orchestrate_no_res_mgmt_call(
295 retry,
296 retry_info,
297 endpoint.map(|e| e.to_string()),
298 async |query| {
299 query
300 .drop_primary_index(&copts)
301 .await
302 .map_err(|e| ErrorKind::Query(e).into())
303 },
304 )
305 .await
306 }
307
308 pub async fn drop_index(&self, opts: &DropIndexOptions<'_>) -> error::Result<()> {
309 let retry_info = RetryRequest::new("query_drop_index", false);
310
311 let retry = opts.retry_strategy.clone();
312 let endpoint = opts.endpoint.clone();
313 let copts = opts.into();
314
315 self.orchestrate_no_res_mgmt_call(
316 retry,
317 retry_info,
318 endpoint.map(|e| e.to_string()),
319 async |query| {
320 query
321 .drop_index(&copts)
322 .await
323 .map_err(|e| ErrorKind::Query(e).into())
324 },
325 )
326 .await
327 }
328
329 pub async fn build_deferred_indexes(
330 &self,
331 opts: &BuildDeferredIndexesOptions<'_>,
332 ) -> error::Result<()> {
333 let retry_info = RetryRequest::new("query_build_deferred_indexes", false);
334
335 let retry = opts.retry_strategy.clone();
336 let endpoint = opts.endpoint.clone();
337 let copts = opts.into();
338
339 self.orchestrate_no_res_mgmt_call(
340 retry,
341 retry_info,
342 endpoint.map(|e| e.to_string()),
343 async |query| {
344 query
345 .build_deferred_indexes(&copts)
346 .await
347 .map_err(|e| ErrorKind::Query(e).into())
348 },
349 )
350 .await
351 }
352
353 pub async fn watch_indexes(&self, opts: &WatchIndexesOptions<'_>) -> error::Result<()> {
354 let retry_info = RetryRequest::new("query_watch_indexes", true);
355
356 let retry = opts.retry_strategy.clone();
357 let endpoint = opts.endpoint.clone();
358 let copts = opts.into();
359
360 self.orchestrate_no_res_mgmt_call(
361 retry,
362 retry_info,
363 endpoint.map(|e| e.to_string()),
364 async |query| {
365 query
366 .watch_indexes(&copts)
367 .await
368 .map_err(|e| ErrorKind::Query(e).into())
369 },
370 )
371 .await
372 }
373
374 pub async fn ensure_index(&self, opts: &EnsureIndexOptions<'_>) -> error::Result<()> {
375 let mut helper = EnsureIndexHelper::new(
376 self.http_component.user_agent(),
377 opts.index_name,
378 opts.bucket_name,
379 opts.scope_name,
380 opts.collection_name,
381 opts.on_behalf_of_info,
382 );
383
384 let backoff = ExponentialBackoffCalculator::new(
385 Duration::from_millis(100),
386 Duration::from_millis(1000),
387 1.5,
388 );
389
390 self.http_component
391 .ensure_resource(backoff, async |client: Arc<C>, targets: Vec<NodeTarget>| {
392 helper
393 .clone()
394 .poll(&EnsureIndexPollOptions {
395 client,
396 targets,
397 desired_state: opts.desired_state,
398 })
399 .await
400 .map_err(error::Error::from)
401 })
402 .await
403 }
404
405 pub async fn ping_all_endpoints(
406 &self,
407 on_behalf_of: Option<&httpx::request::OnBehalfOfInfo>,
408 ) -> error::Result<Vec<error::Result<()>>> {
409 let (client, targets) = self.http_component.get_all_targets::<NodeTarget>(&[])?;
410
411 let copts = PingOptions { on_behalf_of };
412
413 let mut handles = Vec::with_capacity(targets.len());
414 let user_agent = self.http_component.user_agent().to_string();
415 for target in targets {
416 let user_agent = user_agent.clone();
417 let client = Query::<C> {
418 http_client: client.clone(),
419 user_agent,
420 endpoint: target.endpoint,
421 canonical_endpoint: target.canonical_endpoint,
422 auth: target.auth,
423 tracing: self.tracing.clone(),
424 };
425
426 let handle = self.ping_one(client, copts.clone());
427
428 handles.push(handle);
429 }
430
431 let results = join_all(handles).await;
432
433 Ok(results)
434 }
435
436 pub async fn create_ping_report(
437 &self,
438 opts: PingQueryReportOptions<'_>,
439 ) -> error::Result<Vec<EndpointPingReport>> {
440 let (client, targets) = self.http_component.get_all_targets::<NodeTarget>(&[])?;
441
442 let copts = PingOptions {
443 on_behalf_of: opts.on_behalf_of,
444 };
445 let timeout = opts.timeout;
446
447 let mut handles = Vec::with_capacity(targets.len());
448 let user_agent = self.http_component.user_agent().to_string();
449 for target in targets {
450 let user_agent = user_agent.clone();
451 let client = Query::<C> {
452 http_client: client.clone(),
453 user_agent,
454 endpoint: target.endpoint,
455 canonical_endpoint: target.canonical_endpoint,
456 auth: target.auth,
457 tracing: self.tracing.clone(),
458 };
459
460 let handle = self.create_one_report(client, timeout, copts.clone());
461
462 handles.push(handle);
463 }
464
465 let reports = join_all(handles).await;
466
467 Ok(reports)
468 }
469
470 async fn ping_one(&self, client: Query<C>, opts: PingOptions<'_>) -> error::Result<()> {
471 client.ping(&opts).await.map_err(error::Error::from)
472 }
473
474 async fn create_one_report(
475 &self,
476 client: Query<C>,
477 timeout: Duration,
478 opts: PingOptions<'_>,
479 ) -> EndpointPingReport {
480 let start = std::time::Instant::now();
481
482 let res = select! {
483 e = tokio::time::sleep(timeout) => {
484 return EndpointPingReport {
485 remote: client.endpoint,
486 error: None,
487 latency: std::time::Instant::now().sub(start),
488 id: None,
489 namespace: None,
490 state: PingState::Timeout,
491 }
492 }
493 r = client.ping(&opts) => r.map_err(error::Error::from),
494 };
495 let end = std::time::Instant::now();
496
497 let (error, state) = match res {
498 Ok(_) => (None, PingState::Ok),
499 Err(e) => (Some(e), PingState::Error),
500 };
501
502 EndpointPingReport {
503 remote: client.endpoint,
504 error,
505 latency: end.sub(start),
506 id: None,
507 namespace: None,
508 state,
509 }
510 }
511
512 async fn orchestrate_no_res_mgmt_call<Fut>(
513 &self,
514 retry_strategy: Arc<dyn RetryStrategy>,
515 retry_info: RetryRequest,
516 endpoint: Option<String>,
517 operation: impl Fn(Query<C>) -> Fut + Send + Sync,
518 ) -> error::Result<()>
519 where
520 Fut: Future<Output = error::Result<()>> + Send,
521 C: Client,
522 {
523 orchestrate_retries(
524 self.retry_manager.clone(),
525 retry_strategy,
526 retry_info,
527 async || {
528 self.http_component
529 .orchestrate_endpoint(
530 endpoint.clone(),
531 async |client: Arc<C>,
532 endpoint_id: String,
533 endpoint: String,
534 canonical_endpoint: String,
535 auth: Auth| {
536 operation(Query::<C> {
537 http_client: client,
538 user_agent: self.http_component.user_agent().to_string(),
539 endpoint,
540 canonical_endpoint,
541 auth,
542 tracing: self.tracing.clone(),
543 })
544 .await
545 },
546 )
547 .await
548 },
549 )
550 .await
551 }
552}