1use crate::authenticator::Authenticator;
20use crate::diagnosticscomponent::PingQueryReportOptions;
21use crate::error::ErrorKind;
22use crate::httpcomponent::{HttpComponent, HttpComponentState};
23use crate::httpx::client::Client;
24use crate::mgmtx::node_target::NodeTarget;
25use crate::options::query::{
26 BuildDeferredIndexesOptions, CreateIndexOptions, CreatePrimaryIndexOptions, DropIndexOptions,
27 DropPrimaryIndexOptions, EnsureIndexOptions, GetAllIndexesOptions, QueryOptions,
28 WatchIndexesOptions,
29};
30use crate::queryx::ensure_index_helper::EnsureIndexHelper;
31use crate::queryx::index::Index;
32use crate::queryx::preparedquery::{PreparedQuery, PreparedStatementCache};
33use crate::queryx::query::Query;
34use crate::queryx::query_options::{EnsureIndexPollOptions, PingOptions};
35use crate::results::pingreport::{EndpointPingReport, PingState};
36use crate::results::query::QueryResultStream;
37use crate::retry::{orchestrate_retries, RetryInfo, RetryManager, DEFAULT_RETRY_STRATEGY};
38use crate::retrybesteffort::ExponentialBackoffCalculator;
39use crate::service_type::ServiceType;
40use crate::{error, httpx};
41use futures::future::join_all;
42use futures::StreamExt;
43use std::collections::HashMap;
44use std::future::Future;
45use std::ops::Sub;
46use std::sync::{Arc, Mutex};
47use std::time::Duration;
48use tokio::select;
49
50pub(crate) struct QueryComponent<C: Client> {
51 http_component: HttpComponent<C>,
52
53 retry_manager: Arc<RetryManager>,
54 prepared_cache: Arc<Mutex<PreparedStatementCache>>,
55}
56
57#[derive(Debug)]
58pub(crate) struct QueryComponentConfig {
59 pub endpoints: HashMap<String, String>,
60 pub authenticator: Arc<Authenticator>,
61}
62
63pub(crate) struct QueryComponentOptions {
64 pub user_agent: String,
65}
66
67impl<C: Client + 'static> QueryComponent<C> {
68 pub fn new(
69 retry_manager: Arc<RetryManager>,
70 http_client: Arc<C>,
71 config: QueryComponentConfig,
72 opts: QueryComponentOptions,
73 ) -> Self {
74 Self {
75 http_component: HttpComponent::new(
76 ServiceType::QUERY,
77 opts.user_agent,
78 http_client,
79 HttpComponentState::new(config.endpoints, config.authenticator),
80 ),
81 retry_manager,
82 prepared_cache: Arc::new(Mutex::new(PreparedStatementCache::default())),
83 }
84 }
85
86 pub fn reconfigure(&self, config: QueryComponentConfig) {
87 self.http_component.reconfigure(HttpComponentState::new(
88 config.endpoints,
89 config.authenticator,
90 ))
91 }
92
93 pub async fn query(&self, opts: QueryOptions) -> error::Result<QueryResultStream> {
94 let retry = if let Some(retry_strategy) = opts.retry_strategy.clone() {
95 retry_strategy
96 } else {
97 DEFAULT_RETRY_STRATEGY.clone()
98 };
99
100 let retry_info = RetryInfo::new("query", opts.read_only.unwrap_or_default(), retry);
101
102 let endpoint = opts.endpoint.clone();
103 let copts = opts.into();
104
105 orchestrate_retries(self.retry_manager.clone(), retry_info, async || {
106 self.http_component
107 .orchestrate_endpoint(
108 endpoint.clone(),
109 async |client: Arc<C>,
110 endpoint_id: String,
111 endpoint: String,
112 username: String,
113 password: String| {
114 let res = match (Query::<C> {
115 http_client: client,
116 user_agent: self.http_component.user_agent().to_string(),
117 endpoint: endpoint.clone(),
118 username,
119 password,
120 }
121 .query(&copts)
122 .await)
123 {
124 Ok(r) => r,
125 Err(e) => return Err(ErrorKind::Query(e).into()),
126 };
127
128 Ok(QueryResultStream {
129 inner: res,
130 endpoint,
131 })
132 },
133 )
134 .await
135 })
136 .await
137 }
138
139 pub async fn prepared_query(&self, opts: QueryOptions) -> error::Result<QueryResultStream> {
140 let retry = if let Some(retry_strategy) = opts.retry_strategy.clone() {
141 retry_strategy
142 } else {
143 DEFAULT_RETRY_STRATEGY.clone()
144 };
145 let retry_info =
146 RetryInfo::new("prepared_query", opts.read_only.unwrap_or_default(), retry);
147
148 let endpoint = opts.endpoint.clone();
149 let copts = opts.into();
150
151 orchestrate_retries(self.retry_manager.clone(), retry_info, async || {
152 self.http_component
153 .orchestrate_endpoint(
154 endpoint.clone(),
155 async |client: Arc<C>,
156 endpoint_id: String,
157 endpoint: String,
158 username: String,
159 password: String| {
160 let res = match (PreparedQuery {
161 executor: Query::<C> {
162 http_client: client,
163 user_agent: self.http_component.user_agent().to_string(),
164 endpoint: endpoint.clone(),
165 username,
166 password,
167 },
168 cache: self.prepared_cache.clone(),
169 }
170 .prepared_query(&copts)
171 .await)
172 {
173 Ok(r) => r,
174 Err(e) => return Err(ErrorKind::Query(e).into()),
175 };
176
177 Ok(QueryResultStream {
178 inner: res,
179 endpoint,
180 })
181 },
182 )
183 .await
184 })
185 .await
186 }
187
188 pub async fn get_all_indexes(
189 &self,
190 opts: &GetAllIndexesOptions<'_>,
191 ) -> error::Result<Vec<Index>> {
192 let retry = if let Some(retry_strategy) = opts.retry_strategy.clone() {
193 retry_strategy
194 } else {
195 DEFAULT_RETRY_STRATEGY.clone()
196 };
197
198 let retry_info = RetryInfo::new("query_get_all_indexes", true, retry);
199
200 let endpoint = opts.endpoint.clone();
201 let copts = opts.into();
202
203 orchestrate_retries(self.retry_manager.clone(), retry_info, async || {
204 self.http_component
205 .orchestrate_endpoint(
206 endpoint.clone(),
207 async |client: Arc<C>,
208 endpoint_id: String,
209 endpoint: String,
210 username: String,
211 password: String| {
212 let res = match (Query::<C> {
213 http_client: client,
214 user_agent: self.http_component.user_agent().to_string(),
215 endpoint: endpoint.clone(),
216 username,
217 password,
218 }
219 .get_all_indexes(&copts)
220 .await)
221 {
222 Ok(r) => r,
223 Err(e) => {
224 return Err(ErrorKind::Query(e).into());
225 }
226 };
227
228 Ok(res)
229 },
230 )
231 .await
232 })
233 .await
234 }
235
236 pub async fn create_primary_index(
237 &self,
238 opts: &CreatePrimaryIndexOptions<'_>,
239 ) -> error::Result<()> {
240 let retry = if let Some(retry_strategy) = opts.retry_strategy.clone() {
241 retry_strategy
242 } else {
243 DEFAULT_RETRY_STRATEGY.clone()
244 };
245
246 let retry_info = RetryInfo::new("query_create_primary_index", false, retry);
247
248 let endpoint = opts.endpoint.clone();
249 let copts = opts.into();
250
251 self.orchestrate_no_res_mgmt_call(
252 retry_info,
253 endpoint.map(|e| e.to_string()),
254 async |query| {
255 query
256 .create_primary_index(&copts)
257 .await
258 .map_err(|e| ErrorKind::Query(e).into())
259 },
260 )
261 .await
262 }
263
264 pub async fn create_index(&self, opts: &CreateIndexOptions<'_>) -> error::Result<()> {
265 let retry = if let Some(retry_strategy) = opts.retry_strategy.clone() {
266 retry_strategy
267 } else {
268 DEFAULT_RETRY_STRATEGY.clone()
269 };
270
271 let retry_info = RetryInfo::new("query_create_index", false, retry);
272
273 let endpoint = opts.endpoint.clone();
274 let copts = opts.into();
275
276 self.orchestrate_no_res_mgmt_call(
277 retry_info,
278 endpoint.map(|e| e.to_string()),
279 async |query| {
280 query
281 .create_index(&copts)
282 .await
283 .map_err(|e| ErrorKind::Query(e).into())
284 },
285 )
286 .await
287 }
288
289 pub async fn drop_primary_index(
290 &self,
291 opts: &DropPrimaryIndexOptions<'_>,
292 ) -> error::Result<()> {
293 let retry = if let Some(retry_strategy) = opts.retry_strategy.clone() {
294 retry_strategy
295 } else {
296 DEFAULT_RETRY_STRATEGY.clone()
297 };
298
299 let retry_info = RetryInfo::new("query_drop_primary_index", false, retry);
300
301 let endpoint = opts.endpoint.clone();
302 let copts = opts.into();
303
304 self.orchestrate_no_res_mgmt_call(
305 retry_info,
306 endpoint.map(|e| e.to_string()),
307 async |query| {
308 query
309 .drop_primary_index(&copts)
310 .await
311 .map_err(|e| ErrorKind::Query(e).into())
312 },
313 )
314 .await
315 }
316
317 pub async fn drop_index(&self, opts: &DropIndexOptions<'_>) -> error::Result<()> {
318 let retry = if let Some(retry_strategy) = opts.retry_strategy.clone() {
319 retry_strategy
320 } else {
321 DEFAULT_RETRY_STRATEGY.clone()
322 };
323
324 let retry_info = RetryInfo::new("query_drop_index", false, retry);
325
326 let endpoint = opts.endpoint.clone();
327 let copts = opts.into();
328
329 self.orchestrate_no_res_mgmt_call(
330 retry_info,
331 endpoint.map(|e| e.to_string()),
332 async |query| {
333 query
334 .drop_index(&copts)
335 .await
336 .map_err(|e| ErrorKind::Query(e).into())
337 },
338 )
339 .await
340 }
341
342 pub async fn build_deferred_indexes(
343 &self,
344 opts: &BuildDeferredIndexesOptions<'_>,
345 ) -> error::Result<()> {
346 let retry = if let Some(retry_strategy) = opts.retry_strategy.clone() {
347 retry_strategy
348 } else {
349 DEFAULT_RETRY_STRATEGY.clone()
350 };
351
352 let retry_info = RetryInfo::new("query_build_deferred_indexes", false, retry);
353
354 let endpoint = opts.endpoint.clone();
355 let copts = opts.into();
356
357 self.orchestrate_no_res_mgmt_call(
358 retry_info,
359 endpoint.map(|e| e.to_string()),
360 async |query| {
361 query
362 .build_deferred_indexes(&copts)
363 .await
364 .map_err(|e| ErrorKind::Query(e).into())
365 },
366 )
367 .await
368 }
369
370 pub async fn watch_indexes(&self, opts: &WatchIndexesOptions<'_>) -> error::Result<()> {
371 let retry = if let Some(retry_strategy) = opts.retry_strategy.clone() {
372 retry_strategy
373 } else {
374 DEFAULT_RETRY_STRATEGY.clone()
375 };
376
377 let retry_info = RetryInfo::new("query_watch_indexes", true, retry);
378
379 let endpoint = opts.endpoint.clone();
380 let copts = opts.into();
381
382 self.orchestrate_no_res_mgmt_call(
383 retry_info,
384 endpoint.map(|e| e.to_string()),
385 async |query| {
386 query
387 .watch_indexes(&copts)
388 .await
389 .map_err(|e| ErrorKind::Query(e).into())
390 },
391 )
392 .await
393 }
394
395 pub async fn ensure_index(&self, opts: &EnsureIndexOptions<'_>) -> error::Result<()> {
396 let mut helper = EnsureIndexHelper::new(
397 self.http_component.user_agent(),
398 opts.index_name,
399 opts.bucket_name,
400 opts.scope_name,
401 opts.collection_name,
402 opts.on_behalf_of_info,
403 );
404
405 let backoff = ExponentialBackoffCalculator::new(
406 Duration::from_millis(100),
407 Duration::from_millis(1000),
408 1.5,
409 );
410
411 self.http_component
412 .ensure_resource(backoff, async |client: Arc<C>, targets: Vec<NodeTarget>| {
413 helper
414 .clone()
415 .poll(&EnsureIndexPollOptions {
416 client,
417 targets,
418 desired_state: opts.desired_state,
419 })
420 .await
421 .map_err(error::Error::from)
422 })
423 .await
424 }
425
426 pub async fn ping_all_endpoints(
427 &self,
428 on_behalf_of: Option<&httpx::request::OnBehalfOfInfo>,
429 ) -> error::Result<Vec<error::Result<()>>> {
430 let (client, targets) = self.http_component.get_all_targets::<NodeTarget>(&[])?;
431
432 let copts = PingOptions { on_behalf_of };
433
434 let mut handles = Vec::with_capacity(targets.len());
435 let user_agent = self.http_component.user_agent().to_string();
436 for target in targets {
437 let user_agent = user_agent.clone();
438 let client = Query::<C> {
439 http_client: client.clone(),
440 user_agent,
441 endpoint: target.endpoint.clone(),
442 username: target.username,
443 password: target.password,
444 };
445
446 let handle = self.ping_one(client, copts.clone());
447
448 handles.push(handle);
449 }
450
451 let results = join_all(handles).await;
452
453 Ok(results)
454 }
455
456 pub async fn create_ping_report(
457 &self,
458 opts: PingQueryReportOptions<'_>,
459 ) -> error::Result<Vec<EndpointPingReport>> {
460 let (client, targets) = self.http_component.get_all_targets::<NodeTarget>(&[])?;
461
462 let copts = PingOptions {
463 on_behalf_of: opts.on_behalf_of,
464 };
465 let timeout = opts.timeout;
466
467 let mut handles = Vec::with_capacity(targets.len());
468 let user_agent = self.http_component.user_agent().to_string();
469 for target in targets {
470 let user_agent = user_agent.clone();
471 let client = Query::<C> {
472 http_client: client.clone(),
473 user_agent,
474 endpoint: target.endpoint.clone(),
475 username: target.username,
476 password: target.password,
477 };
478
479 let handle = self.create_one_report(client, timeout, copts.clone());
480
481 handles.push(handle);
482 }
483
484 let reports = join_all(handles).await;
485
486 Ok(reports)
487 }
488
489 async fn ping_one(&self, client: Query<C>, opts: PingOptions<'_>) -> error::Result<()> {
490 client.ping(&opts).await.map_err(error::Error::from)
491 }
492
493 async fn create_one_report(
494 &self,
495 client: Query<C>,
496 timeout: Duration,
497 opts: PingOptions<'_>,
498 ) -> EndpointPingReport {
499 let start = std::time::Instant::now();
500
501 let res = select! {
502 e = tokio::time::sleep(timeout) => {
503 return EndpointPingReport {
504 remote: client.endpoint,
505 error: None,
506 latency: std::time::Instant::now().sub(start),
507 id: None,
508 namespace: None,
509 state: PingState::Timeout,
510 }
511 }
512 r = client.ping(&opts) => r.map_err(error::Error::from),
513 };
514 let end = std::time::Instant::now();
515
516 let (error, state) = match res {
517 Ok(_) => (None, PingState::Ok),
518 Err(e) => (Some(e), PingState::Error),
519 };
520
521 EndpointPingReport {
522 remote: client.endpoint,
523 error,
524 latency: end.sub(start),
525 id: None,
526 namespace: None,
527 state,
528 }
529 }
530
531 async fn orchestrate_no_res_mgmt_call<Fut>(
532 &self,
533 retry_info: RetryInfo,
534 endpoint: Option<String>,
535 operation: impl Fn(Query<C>) -> Fut + Send + Sync,
536 ) -> error::Result<()>
537 where
538 Fut: Future<Output = error::Result<()>> + Send,
539 C: Client,
540 {
541 orchestrate_retries(self.retry_manager.clone(), retry_info, async || {
542 self.http_component
543 .orchestrate_endpoint(
544 endpoint.clone(),
545 async |client: Arc<C>,
546 endpoint_id: String,
547 endpoint: String,
548 username: String,
549 password: String| {
550 operation(Query::<C> {
551 http_client: client,
552 user_agent: self.http_component.user_agent().to_string(),
553 endpoint: endpoint.clone(),
554 username,
555 password,
556 })
557 .await
558 },
559 )
560 .await
561 })
562 .await
563 }
564}