1use crate::authenticator::Authenticator;
20use crate::componentconfigs::NetworkAndCanonicalEndpoint;
21use crate::diagnosticscomponent::PingQueryReportOptions;
22use crate::error::ErrorKind;
23use crate::httpcomponent::{HttpComponent, HttpComponentState};
24use crate::httpx::client::Client;
25use crate::httpx::request::Auth;
26use crate::mgmtx::node_target::NodeTarget;
27use crate::options::query::{
28 BuildDeferredIndexesOptions, CreateIndexOptions, CreatePrimaryIndexOptions, DropIndexOptions,
29 DropPrimaryIndexOptions, EnsureIndexOptions, GetAllIndexesOptions, QueryOptions,
30 WatchIndexesOptions,
31};
32use crate::queryx::ensure_index_helper::EnsureIndexHelper;
33use crate::queryx::index::Index;
34use crate::queryx::preparedquery::{PreparedQuery, PreparedStatementCache};
35use crate::queryx::query::Query;
36use crate::queryx::query_options::{EnsureIndexPollOptions, PingOptions};
37use crate::results::pingreport::{EndpointPingReport, PingState};
38use crate::results::query::QueryResultStream;
39use crate::retry::{orchestrate_retries, RetryManager, RetryRequest, RetryStrategy};
40use crate::retrybesteffort::ExponentialBackoffCalculator;
41use crate::service_type::ServiceType;
42use crate::tracingcomponent::TracingComponent;
43use crate::{error, httpx};
44use futures::future::join_all;
45use futures::StreamExt;
46use std::collections::HashMap;
47use std::future::Future;
48use std::ops::Sub;
49use std::sync::{Arc, Mutex};
50use std::time::Duration;
51use tokio::select;
52
53pub(crate) struct QueryComponent<C: Client> {
54 http_component: HttpComponent<C>,
55 tracing: Arc<TracingComponent>,
56
57 retry_manager: Arc<RetryManager>,
58 prepared_cache: Arc<Mutex<PreparedStatementCache>>,
59}
60
61pub(crate) struct QueryComponentConfig {
62 pub endpoints: HashMap<String, NetworkAndCanonicalEndpoint>,
63 pub authenticator: Authenticator,
64}
65
66pub(crate) struct QueryComponentOptions {
67 pub user_agent: String,
68}
69
70impl<C: Client + 'static> QueryComponent<C> {
71 pub fn new(
72 retry_manager: Arc<RetryManager>,
73 http_client: Arc<C>,
74 tracing: Arc<TracingComponent>,
75 config: QueryComponentConfig,
76 opts: QueryComponentOptions,
77 ) -> Self {
78 Self {
79 http_component: HttpComponent::new(
80 ServiceType::QUERY,
81 opts.user_agent,
82 http_client,
83 HttpComponentState::new(config.endpoints, config.authenticator),
84 ),
85 tracing,
86 retry_manager,
87 prepared_cache: Arc::new(Mutex::new(PreparedStatementCache::default())),
88 }
89 }
90
91 pub fn reconfigure(&self, config: QueryComponentConfig) {
92 self.http_component.reconfigure(HttpComponentState::new(
93 config.endpoints,
94 config.authenticator,
95 ))
96 }
97
98 pub async fn query(&self, opts: QueryOptions) -> error::Result<QueryResultStream> {
99 let retry_info = RetryRequest::new("query", opts.read_only.unwrap_or_default());
100
101 let retry = opts.retry_strategy.clone();
102 let endpoint = opts.endpoint.clone();
103 let copts = opts.into();
104
105 orchestrate_retries(self.retry_manager.clone(), retry, retry_info, async || {
106 self.http_component
107 .orchestrate_endpoint(
108 endpoint.clone(),
109 async |client: Arc<C>,
110 endpoint_id: String,
111 endpoint: String,
112 canonical_endpoint: String,
113 auth: Auth| {
114 let res = match (Query::<C> {
115 http_client: client,
116 user_agent: self.http_component.user_agent().to_string(),
117 endpoint: endpoint.clone(),
118 canonical_endpoint,
119 auth,
120 tracing: self.tracing.clone(),
121 }
122 .query(&copts)
123 .await)
124 {
125 Ok(r) => r,
126 Err(e) => return Err(ErrorKind::Query(e).into()),
127 };
128
129 Ok(QueryResultStream {
130 inner: res,
131 endpoint,
132 })
133 },
134 )
135 .await
136 })
137 .await
138 }
139
140 pub async fn prepared_query(&self, opts: QueryOptions) -> error::Result<QueryResultStream> {
141 let retry_info = RetryRequest::new("prepared_query", opts.read_only.unwrap_or_default());
142
143 let retry = opts.retry_strategy.clone();
144 let endpoint = opts.endpoint.clone();
145 let copts = opts.into();
146
147 orchestrate_retries(self.retry_manager.clone(), retry, retry_info, async || {
148 self.http_component
149 .orchestrate_endpoint(
150 endpoint.clone(),
151 async |client: Arc<C>,
152 endpoint_id: String,
153 endpoint: String,
154 canonical_endpoint: String,
155 auth: Auth| {
156 let res = match (PreparedQuery {
157 executor: Query::<C> {
158 http_client: client,
159 user_agent: self.http_component.user_agent().to_string(),
160 endpoint: endpoint.clone(),
161 canonical_endpoint,
162 auth,
163 tracing: self.tracing.clone(),
164 },
165 cache: self.prepared_cache.clone(),
166 }
167 .prepared_query(&copts)
168 .await)
169 {
170 Ok(r) => r,
171 Err(e) => return Err(ErrorKind::Query(e).into()),
172 };
173
174 Ok(QueryResultStream {
175 inner: res,
176 endpoint,
177 })
178 },
179 )
180 .await
181 })
182 .await
183 }
184
185 pub async fn get_all_indexes(
186 &self,
187 opts: &GetAllIndexesOptions<'_>,
188 ) -> error::Result<Vec<Index>> {
189 let retry_info = RetryRequest::new("query_get_all_indexes", true);
190
191 let retry = opts.retry_strategy.clone();
192 let endpoint = opts.endpoint.clone();
193 let copts = opts.into();
194
195 orchestrate_retries(self.retry_manager.clone(), retry, retry_info, async || {
196 self.http_component
197 .orchestrate_endpoint(
198 endpoint.clone(),
199 async |client: Arc<C>,
200 endpoint_id: String,
201 endpoint: String,
202 canonical_endpoint: String,
203 auth: Auth| {
204 let res = match (Query::<C> {
205 http_client: client,
206 user_agent: self.http_component.user_agent().to_string(),
207 endpoint,
208 canonical_endpoint,
209 auth,
210 tracing: self.tracing.clone(),
211 }
212 .get_all_indexes(&copts)
213 .await)
214 {
215 Ok(r) => r,
216 Err(e) => {
217 return Err(ErrorKind::Query(e).into());
218 }
219 };
220
221 Ok(res)
222 },
223 )
224 .await
225 })
226 .await
227 }
228
229 pub async fn create_primary_index(
230 &self,
231 opts: &CreatePrimaryIndexOptions<'_>,
232 ) -> error::Result<()> {
233 let retry_info = RetryRequest::new("query_create_primary_index", false);
234
235 let retry = opts.retry_strategy.clone();
236 let endpoint = opts.endpoint.clone();
237 let copts = opts.into();
238
239 self.orchestrate_no_res_mgmt_call(
240 retry,
241 retry_info,
242 endpoint.map(|e| e.to_string()),
243 async |query| {
244 query
245 .create_primary_index(&copts)
246 .await
247 .map_err(|e| ErrorKind::Query(e).into())
248 },
249 )
250 .await
251 }
252
253 pub async fn create_index(&self, opts: &CreateIndexOptions<'_>) -> error::Result<()> {
254 let retry_info = RetryRequest::new("query_create_index", false);
255
256 let retry = opts.retry_strategy.clone();
257 let endpoint = opts.endpoint.clone();
258 let copts = opts.into();
259
260 self.orchestrate_no_res_mgmt_call(
261 retry,
262 retry_info,
263 endpoint.map(|e| e.to_string()),
264 async |query| {
265 query
266 .create_index(&copts)
267 .await
268 .map_err(|e| ErrorKind::Query(e).into())
269 },
270 )
271 .await
272 }
273
274 pub async fn drop_primary_index(
275 &self,
276 opts: &DropPrimaryIndexOptions<'_>,
277 ) -> error::Result<()> {
278 let retry_info = RetryRequest::new("query_drop_primary_index", false);
279
280 let retry = opts.retry_strategy.clone();
281 let endpoint = opts.endpoint.clone();
282 let copts = opts.into();
283
284 self.orchestrate_no_res_mgmt_call(
285 retry,
286 retry_info,
287 endpoint.map(|e| e.to_string()),
288 async |query| {
289 query
290 .drop_primary_index(&copts)
291 .await
292 .map_err(|e| ErrorKind::Query(e).into())
293 },
294 )
295 .await
296 }
297
298 pub async fn drop_index(&self, opts: &DropIndexOptions<'_>) -> error::Result<()> {
299 let retry_info = RetryRequest::new("query_drop_index", false);
300
301 let retry = opts.retry_strategy.clone();
302 let endpoint = opts.endpoint.clone();
303 let copts = opts.into();
304
305 self.orchestrate_no_res_mgmt_call(
306 retry,
307 retry_info,
308 endpoint.map(|e| e.to_string()),
309 async |query| {
310 query
311 .drop_index(&copts)
312 .await
313 .map_err(|e| ErrorKind::Query(e).into())
314 },
315 )
316 .await
317 }
318
319 pub async fn build_deferred_indexes(
320 &self,
321 opts: &BuildDeferredIndexesOptions<'_>,
322 ) -> error::Result<()> {
323 let retry_info = RetryRequest::new("query_build_deferred_indexes", false);
324
325 let retry = opts.retry_strategy.clone();
326 let endpoint = opts.endpoint.clone();
327 let copts = opts.into();
328
329 self.orchestrate_no_res_mgmt_call(
330 retry,
331 retry_info,
332 endpoint.map(|e| e.to_string()),
333 async |query| {
334 query
335 .build_deferred_indexes(&copts)
336 .await
337 .map_err(|e| ErrorKind::Query(e).into())
338 },
339 )
340 .await
341 }
342
343 pub async fn watch_indexes(&self, opts: &WatchIndexesOptions<'_>) -> error::Result<()> {
344 let retry_info = RetryRequest::new("query_watch_indexes", true);
345
346 let retry = opts.retry_strategy.clone();
347 let endpoint = opts.endpoint.clone();
348 let copts = opts.into();
349
350 self.orchestrate_no_res_mgmt_call(
351 retry,
352 retry_info,
353 endpoint.map(|e| e.to_string()),
354 async |query| {
355 query
356 .watch_indexes(&copts)
357 .await
358 .map_err(|e| ErrorKind::Query(e).into())
359 },
360 )
361 .await
362 }
363
364 pub async fn ensure_index(&self, opts: &EnsureIndexOptions<'_>) -> error::Result<()> {
365 let mut helper = EnsureIndexHelper::new(
366 self.http_component.user_agent(),
367 opts.index_name,
368 opts.bucket_name,
369 opts.scope_name,
370 opts.collection_name,
371 opts.on_behalf_of_info,
372 );
373
374 let backoff = ExponentialBackoffCalculator::new(
375 Duration::from_millis(100),
376 Duration::from_millis(1000),
377 1.5,
378 );
379
380 self.http_component
381 .ensure_resource(backoff, async |client: Arc<C>, targets: Vec<NodeTarget>| {
382 helper
383 .clone()
384 .poll(&EnsureIndexPollOptions {
385 client,
386 targets,
387 desired_state: opts.desired_state,
388 })
389 .await
390 .map_err(error::Error::from)
391 })
392 .await
393 }
394
395 pub async fn ping_all_endpoints(
396 &self,
397 on_behalf_of: Option<&httpx::request::OnBehalfOfInfo>,
398 ) -> error::Result<Vec<error::Result<()>>> {
399 let (client, targets) = self.http_component.get_all_targets::<NodeTarget>(&[])?;
400
401 let copts = PingOptions { on_behalf_of };
402
403 let mut handles = Vec::with_capacity(targets.len());
404 let user_agent = self.http_component.user_agent().to_string();
405 for target in targets {
406 let user_agent = user_agent.clone();
407 let client = Query::<C> {
408 http_client: client.clone(),
409 user_agent,
410 endpoint: target.endpoint,
411 canonical_endpoint: target.canonical_endpoint,
412 auth: target.auth,
413 tracing: self.tracing.clone(),
414 };
415
416 let handle = self.ping_one(client, copts.clone());
417
418 handles.push(handle);
419 }
420
421 let results = join_all(handles).await;
422
423 Ok(results)
424 }
425
426 pub async fn create_ping_report(
427 &self,
428 opts: PingQueryReportOptions<'_>,
429 ) -> error::Result<Vec<EndpointPingReport>> {
430 let (client, targets) = self.http_component.get_all_targets::<NodeTarget>(&[])?;
431
432 let copts = PingOptions {
433 on_behalf_of: opts.on_behalf_of,
434 };
435 let timeout = opts.timeout;
436
437 let mut handles = Vec::with_capacity(targets.len());
438 let user_agent = self.http_component.user_agent().to_string();
439 for target in targets {
440 let user_agent = user_agent.clone();
441 let client = Query::<C> {
442 http_client: client.clone(),
443 user_agent,
444 endpoint: target.endpoint,
445 canonical_endpoint: target.canonical_endpoint,
446 auth: target.auth,
447 tracing: self.tracing.clone(),
448 };
449
450 let handle = self.create_one_report(client, timeout, copts.clone());
451
452 handles.push(handle);
453 }
454
455 let reports = join_all(handles).await;
456
457 Ok(reports)
458 }
459
460 async fn ping_one(&self, client: Query<C>, opts: PingOptions<'_>) -> error::Result<()> {
461 client.ping(&opts).await.map_err(error::Error::from)
462 }
463
464 async fn create_one_report(
465 &self,
466 client: Query<C>,
467 timeout: Duration,
468 opts: PingOptions<'_>,
469 ) -> EndpointPingReport {
470 let start = std::time::Instant::now();
471
472 let res = select! {
473 e = tokio::time::sleep(timeout) => {
474 return EndpointPingReport {
475 remote: client.endpoint,
476 error: None,
477 latency: std::time::Instant::now().sub(start),
478 id: None,
479 namespace: None,
480 state: PingState::Timeout,
481 }
482 }
483 r = client.ping(&opts) => r.map_err(error::Error::from),
484 };
485 let end = std::time::Instant::now();
486
487 let (error, state) = match res {
488 Ok(_) => (None, PingState::Ok),
489 Err(e) => (Some(e), PingState::Error),
490 };
491
492 EndpointPingReport {
493 remote: client.endpoint,
494 error,
495 latency: end.sub(start),
496 id: None,
497 namespace: None,
498 state,
499 }
500 }
501
502 async fn orchestrate_no_res_mgmt_call<Fut>(
503 &self,
504 retry_strategy: Arc<dyn RetryStrategy>,
505 retry_info: RetryRequest,
506 endpoint: Option<String>,
507 operation: impl Fn(Query<C>) -> Fut + Send + Sync,
508 ) -> error::Result<()>
509 where
510 Fut: Future<Output = error::Result<()>> + Send,
511 C: Client,
512 {
513 orchestrate_retries(
514 self.retry_manager.clone(),
515 retry_strategy,
516 retry_info,
517 async || {
518 self.http_component
519 .orchestrate_endpoint(
520 endpoint.clone(),
521 async |client: Arc<C>,
522 endpoint_id: String,
523 endpoint: String,
524 canonical_endpoint: String,
525 auth: Auth| {
526 operation(Query::<C> {
527 http_client: client,
528 user_agent: self.http_component.user_agent().to_string(),
529 endpoint,
530 canonical_endpoint,
531 auth,
532 tracing: self.tracing.clone(),
533 })
534 .await
535 },
536 )
537 .await
538 },
539 )
540 .await
541 }
542}