1use crate::authenticator::Authenticator;
20use crate::cbconfig::{CollectionManifest, FullBucketConfig, FullClusterConfig};
21use crate::componentconfigs::NetworkAndCanonicalEndpoint;
22use crate::error::ErrorKind;
23use crate::httpcomponent::{HttpComponent, HttpComponentState};
24use crate::httpx::client::Client;
25use crate::httpx::request::Auth;
26use crate::mgmtx::bucket_helper::EnsureBucketHelper;
27use crate::mgmtx::bucket_settings::BucketDef;
28use crate::mgmtx::group_helper::EnsureGroupHelper;
29use crate::mgmtx::manifest_helper::EnsureManifestHelper;
30use crate::mgmtx::mgmt::AutoFailoverSettings;
31use crate::mgmtx::mgmt_query::IndexStatus;
32use crate::mgmtx::node_target::NodeTarget;
33use crate::mgmtx::options::{
34 EnsureBucketPollOptions, EnsureGroupPollOptions, EnsureManifestPollOptions,
35 EnsureUserPollOptions,
36};
37use crate::mgmtx::responses::{
38 CreateCollectionResponse, CreateScopeResponse, DeleteCollectionResponse, DeleteScopeResponse,
39 UpdateCollectionResponse,
40};
41use crate::mgmtx::user::{Group, RoleAndDescription, UserAndMetadata};
42use crate::mgmtx::user_helper::EnsureUserHelper;
43use crate::options::management::{
44 ChangePasswordOptions, CreateBucketOptions, CreateCollectionOptions, CreateScopeOptions,
45 DeleteBucketOptions, DeleteCollectionOptions, DeleteGroupOptions, DeleteScopeOptions,
46 DeleteUserOptions, EnsureBucketOptions, EnsureGroupOptions, EnsureManifestOptions,
47 EnsureUserOptions, FlushBucketOptions, GetAllBucketsOptions, GetAllGroupsOptions,
48 GetAllUsersOptions, GetAutoFailoverSettingsOptions, GetBucketOptions, GetBucketStatsOptions,
49 GetCollectionManifestOptions, GetFullBucketConfigOptions, GetFullClusterConfigOptions,
50 GetGroupOptions, GetRolesOptions, GetUserOptions, IndexStatusOptions, LoadSampleBucketOptions,
51 UpdateBucketOptions, UpdateCollectionOptions, UpsertGroupOptions, UpsertUserOptions,
52};
53use crate::retry::{orchestrate_retries, RetryManager, RetryRequest};
54use crate::retrybesteffort::ExponentialBackoffCalculator;
55use crate::service_type::ServiceType;
56use crate::tracingcomponent::TracingComponent;
57use crate::{error, mgmtx};
58use serde_json::value::RawValue;
59use std::collections::HashMap;
60use std::sync::Arc;
61use std::time::Duration;
62use tracing::debug;
63
64pub(crate) struct MgmtComponent<C: Client> {
65 id: String,
66 http_component: HttpComponent<C>,
67 tracing: Arc<TracingComponent>,
68
69 retry_manager: Arc<RetryManager>,
70}
71
72pub(crate) struct MgmtComponentConfig {
73 pub endpoints: HashMap<String, NetworkAndCanonicalEndpoint>,
74 pub authenticator: Authenticator,
75}
76
/// Construction-time options for the management component.
pub(crate) struct MgmtComponentOptions {
    /// Identifier of the component; it appears in the component's log output.
    pub id: String,
    /// User agent string handed to the HTTP component.
    pub user_agent: String,
}
81
82impl<C: Client> MgmtComponent<C> {
83 pub fn new(
84 retry_manager: Arc<RetryManager>,
85 http_client: Arc<C>,
86 tracing: Arc<TracingComponent>,
87 config: MgmtComponentConfig,
88 opts: MgmtComponentOptions,
89 ) -> Self {
90 Self {
91 id: opts.id,
92 http_component: HttpComponent::new(
93 ServiceType::MGMT,
94 opts.user_agent,
95 http_client,
96 HttpComponentState::new(config.endpoints, config.authenticator),
97 ),
98 tracing,
99 retry_manager,
100 }
101 }
102
103 pub fn reconfigure(&self, config: MgmtComponentConfig) {
104 debug!(
105 "Management component {} updating endpoints to {:?}",
106 self.id,
107 &config.endpoints.keys().collect::<Vec<_>>()
108 );
109
110 self.http_component.reconfigure(HttpComponentState::new(
111 config.endpoints,
112 config.authenticator,
113 ))
114 }
115
116 pub async fn get_collection_manifest(
117 &self,
118 opts: &GetCollectionManifestOptions<'_>,
119 ) -> error::Result<CollectionManifest> {
120 let retry_info = RetryRequest::new("get_collection_manifest", false);
121
122 let copts = opts.into();
123
124 orchestrate_retries(
125 self.retry_manager.clone(),
126 opts.retry_strategy.clone(),
127 retry_info,
128 async || {
129 self.http_component
130 .orchestrate_endpoint(
131 None,
132 async |client: Arc<C>,
133 endpoint_id: String,
134 endpoint: String,
135 canonical_endpoint: String,
136 auth: Auth| {
137 let res = match (mgmtx::mgmt::Management::<C> {
138 http_client: client,
139 user_agent: self.http_component.user_agent().to_string(),
140 endpoint,
141 canonical_endpoint,
142 auth,
143 tracing: self.tracing.clone(),
144 }
145 .get_collection_manifest(&copts)
146 .await)
147 {
148 Ok(r) => r,
149 Err(e) => return Err(ErrorKind::Mgmt(e).into()),
150 };
151
152 Ok(res)
153 },
154 )
155 .await
156 },
157 )
158 .await
159 }
160
161 pub async fn create_scope(
162 &self,
163 opts: &CreateScopeOptions<'_>,
164 ) -> error::Result<CreateScopeResponse> {
165 let retry_info = RetryRequest::new("create_scope", false);
166
167 let copts = opts.into();
168
169 orchestrate_retries(
170 self.retry_manager.clone(),
171 opts.retry_strategy.clone(),
172 retry_info,
173 async || {
174 self.http_component
175 .orchestrate_endpoint(
176 None,
177 async |client: Arc<C>,
178 endpoint_id: String,
179 endpoint: String,
180 canonical_endpoint: String,
181 auth: Auth| {
182 let res = match (mgmtx::mgmt::Management::<C> {
183 http_client: client,
184 user_agent: self.http_component.user_agent().to_string(),
185 endpoint,
186 canonical_endpoint,
187 auth,
188 tracing: self.tracing.clone(),
189 }
190 .create_scope(&copts)
191 .await)
192 {
193 Ok(r) => r,
194 Err(e) => return Err(ErrorKind::Mgmt(e).into()),
195 };
196
197 Ok(res)
198 },
199 )
200 .await
201 },
202 )
203 .await
204 }
205
206 pub async fn delete_scope(
207 &self,
208 opts: &DeleteScopeOptions<'_>,
209 ) -> error::Result<DeleteScopeResponse> {
210 let retry_info = RetryRequest::new("delete_scope", false);
211
212 let copts = opts.into();
213
214 orchestrate_retries(
215 self.retry_manager.clone(),
216 opts.retry_strategy.clone(),
217 retry_info,
218 async || {
219 self.http_component
220 .orchestrate_endpoint(
221 None,
222 async |client: Arc<C>,
223 endpoint_id: String,
224 endpoint: String,
225 canonical_endpoint: String,
226 auth: Auth| {
227 let res = match (mgmtx::mgmt::Management::<C> {
228 http_client: client,
229 user_agent: self.http_component.user_agent().to_string(),
230 endpoint,
231 canonical_endpoint,
232 auth,
233 tracing: self.tracing.clone(),
234 }
235 .delete_scope(&copts)
236 .await)
237 {
238 Ok(r) => r,
239 Err(e) => return Err(ErrorKind::Mgmt(e).into()),
240 };
241
242 Ok(res)
243 },
244 )
245 .await
246 },
247 )
248 .await
249 }
250
251 pub async fn create_collection(
252 &self,
253 opts: &CreateCollectionOptions<'_>,
254 ) -> error::Result<CreateCollectionResponse> {
255 let retry_info = RetryRequest::new("create_collection", false);
256
257 let copts = opts.into();
258
259 orchestrate_retries(
260 self.retry_manager.clone(),
261 opts.retry_strategy.clone(),
262 retry_info,
263 async || {
264 self.http_component
265 .orchestrate_endpoint(
266 None,
267 async |client: Arc<C>,
268 endpoint_id: String,
269 endpoint: String,
270 canonical_endpoint: String,
271 auth: Auth| {
272 let res = match (mgmtx::mgmt::Management::<C> {
273 http_client: client,
274 user_agent: self.http_component.user_agent().to_string(),
275 endpoint,
276 canonical_endpoint,
277 auth,
278 tracing: self.tracing.clone(),
279 }
280 .create_collection(&copts)
281 .await)
282 {
283 Ok(r) => r,
284 Err(e) => return Err(ErrorKind::Mgmt(e).into()),
285 };
286
287 Ok(res)
288 },
289 )
290 .await
291 },
292 )
293 .await
294 }
295
296 pub async fn delete_collection(
297 &self,
298 opts: &DeleteCollectionOptions<'_>,
299 ) -> error::Result<DeleteCollectionResponse> {
300 let retry_info = RetryRequest::new("delete_collection", false);
301
302 let copts = opts.into();
303
304 orchestrate_retries(
305 self.retry_manager.clone(),
306 opts.retry_strategy.clone(),
307 retry_info,
308 async || {
309 self.http_component
310 .orchestrate_endpoint(
311 None,
312 async |client: Arc<C>,
313 endpoint_id: String,
314 endpoint: String,
315 canonical_endpoint: String,
316 auth: Auth| {
317 let res = match (mgmtx::mgmt::Management::<C> {
318 http_client: client,
319 user_agent: self.http_component.user_agent().to_string(),
320 endpoint,
321 canonical_endpoint,
322 auth,
323 tracing: self.tracing.clone(),
324 }
325 .delete_collection(&copts)
326 .await)
327 {
328 Ok(r) => r,
329 Err(e) => return Err(ErrorKind::Mgmt(e).into()),
330 };
331
332 Ok(res)
333 },
334 )
335 .await
336 },
337 )
338 .await
339 }
340
341 pub async fn update_collection(
342 &self,
343 opts: &UpdateCollectionOptions<'_>,
344 ) -> error::Result<UpdateCollectionResponse> {
345 let retry_info = RetryRequest::new("update_collection", false);
346
347 let copts = opts.into();
348
349 orchestrate_retries(
350 self.retry_manager.clone(),
351 opts.retry_strategy.clone(),
352 retry_info,
353 async || {
354 self.http_component
355 .orchestrate_endpoint(
356 None,
357 async |client: Arc<C>,
358 endpoint_id: String,
359 endpoint: String,
360 canonical_endpoint: String,
361 auth: Auth| {
362 let res = match (mgmtx::mgmt::Management::<C> {
363 http_client: client,
364 user_agent: self.http_component.user_agent().to_string(),
365 endpoint,
366 canonical_endpoint,
367 auth,
368 tracing: self.tracing.clone(),
369 }
370 .update_collection(&copts)
371 .await)
372 {
373 Ok(r) => r,
374 Err(e) => return Err(ErrorKind::Mgmt(e).into()),
375 };
376
377 Ok(res)
378 },
379 )
380 .await
381 },
382 )
383 .await
384 }
385
386 pub async fn get_all_buckets(
387 &self,
388 opts: &GetAllBucketsOptions<'_>,
389 ) -> error::Result<Vec<BucketDef>> {
390 let retry_info = RetryRequest::new("get_all_buckets", false);
391
392 let copts = opts.into();
393
394 orchestrate_retries(
395 self.retry_manager.clone(),
396 opts.retry_strategy.clone(),
397 retry_info,
398 async || {
399 self.http_component
400 .orchestrate_endpoint(
401 None,
402 async |client: Arc<C>,
403 endpoint_id: String,
404 endpoint: String,
405 canonical_endpoint: String,
406 auth: Auth| {
407 mgmtx::mgmt::Management::<C> {
408 http_client: client,
409 user_agent: self.http_component.user_agent().to_string(),
410 endpoint,
411 canonical_endpoint,
412 auth,
413 tracing: self.tracing.clone(),
414 }
415 .get_all_buckets(&copts)
416 .await
417 .map_err(|e| ErrorKind::Mgmt(e).into())
418 },
419 )
420 .await
421 },
422 )
423 .await
424 }
425
426 pub async fn get_bucket(&self, opts: &GetBucketOptions<'_>) -> error::Result<BucketDef> {
427 let retry_info = RetryRequest::new("get_bucket", false);
428
429 let copts = opts.into();
430
431 orchestrate_retries(
432 self.retry_manager.clone(),
433 opts.retry_strategy.clone(),
434 retry_info,
435 async || {
436 self.http_component
437 .orchestrate_endpoint(
438 None,
439 async |client: Arc<C>,
440 endpoint_id: String,
441 endpoint: String,
442 canonical_endpoint: String,
443 auth: Auth| {
444 mgmtx::mgmt::Management::<C> {
445 http_client: client,
446 user_agent: self.http_component.user_agent().to_string(),
447 endpoint,
448 canonical_endpoint,
449 auth,
450 tracing: self.tracing.clone(),
451 }
452 .get_bucket(&copts)
453 .await
454 .map_err(|e| ErrorKind::Mgmt(e).into())
455 },
456 )
457 .await
458 },
459 )
460 .await
461 }
462
463 pub async fn create_bucket(&self, opts: &CreateBucketOptions<'_>) -> error::Result<()> {
464 let retry_info = RetryRequest::new("create_bucket", false);
465
466 let copts = opts.into();
467
468 orchestrate_retries(
469 self.retry_manager.clone(),
470 opts.retry_strategy.clone(),
471 retry_info,
472 async || {
473 self.http_component
474 .orchestrate_endpoint(
475 None,
476 async |client: Arc<C>,
477 endpoint_id: String,
478 endpoint: String,
479 canonical_endpoint: String,
480 auth: Auth| {
481 mgmtx::mgmt::Management::<C> {
482 http_client: client,
483 user_agent: self.http_component.user_agent().to_string(),
484 endpoint,
485 canonical_endpoint,
486 auth,
487 tracing: self.tracing.clone(),
488 }
489 .create_bucket(&copts)
490 .await
491 .map_err(|e| ErrorKind::Mgmt(e).into())
492 },
493 )
494 .await
495 },
496 )
497 .await
498 }
499
500 pub async fn update_bucket(&self, opts: &UpdateBucketOptions<'_>) -> error::Result<()> {
501 let retry_info = RetryRequest::new("update_bucket", false);
502
503 let copts = opts.into();
504
505 orchestrate_retries(
506 self.retry_manager.clone(),
507 opts.retry_strategy.clone(),
508 retry_info,
509 async || {
510 self.http_component
511 .orchestrate_endpoint(
512 None,
513 async |client: Arc<C>,
514 endpoint_id: String,
515 endpoint: String,
516 canonical_endpoint: String,
517 auth: Auth| {
518 mgmtx::mgmt::Management::<C> {
519 http_client: client,
520 user_agent: self.http_component.user_agent().to_string(),
521 endpoint,
522 canonical_endpoint,
523 auth,
524 tracing: self.tracing.clone(),
525 }
526 .update_bucket(&copts)
527 .await
528 .map_err(|e| ErrorKind::Mgmt(e).into())
529 },
530 )
531 .await
532 },
533 )
534 .await
535 }
536
537 pub async fn delete_bucket(&self, opts: &DeleteBucketOptions<'_>) -> error::Result<()> {
538 let retry_info = RetryRequest::new("delete_bucket", false);
539
540 let copts = opts.into();
541
542 orchestrate_retries(
543 self.retry_manager.clone(),
544 opts.retry_strategy.clone(),
545 retry_info,
546 async || {
547 self.http_component
548 .orchestrate_endpoint(
549 None,
550 async |client: Arc<C>,
551 endpoint_id: String,
552 endpoint: String,
553 canonical_endpoint: String,
554 auth: Auth| {
555 mgmtx::mgmt::Management::<C> {
556 http_client: client,
557 user_agent: self.http_component.user_agent().to_string(),
558 endpoint,
559 canonical_endpoint,
560 auth,
561 tracing: self.tracing.clone(),
562 }
563 .delete_bucket(&copts)
564 .await
565 .map_err(|e| ErrorKind::Mgmt(e).into())
566 },
567 )
568 .await
569 },
570 )
571 .await
572 }
573
574 pub async fn flush_bucket(&self, opts: &FlushBucketOptions<'_>) -> error::Result<()> {
575 let retry_info = RetryRequest::new("flush_bucket", false);
576
577 let copts = opts.into();
578
579 orchestrate_retries(
580 self.retry_manager.clone(),
581 opts.retry_strategy.clone(),
582 retry_info,
583 async || {
584 self.http_component
585 .orchestrate_endpoint(
586 None,
587 async |client: Arc<C>,
588 endpoint_id: String,
589 endpoint: String,
590 canonical_endpoint: String,
591 auth: Auth| {
592 mgmtx::mgmt::Management::<C> {
593 http_client: client,
594 user_agent: self.http_component.user_agent().to_string(),
595 endpoint,
596 canonical_endpoint,
597 auth,
598 tracing: self.tracing.clone(),
599 }
600 .flush_bucket(&copts)
601 .await
602 .map_err(|e| ErrorKind::Mgmt(e).into())
603 },
604 )
605 .await
606 },
607 )
608 .await
609 }
610
611 pub async fn ensure_manifest(&self, opts: &EnsureManifestOptions<'_>) -> error::Result<()> {
612 let mut helper = EnsureManifestHelper::new(
613 self.http_component.user_agent(),
614 opts.bucket_name,
615 opts.manifest_uid,
616 opts.on_behalf_of_info,
617 );
618
619 let backoff = ExponentialBackoffCalculator::new(
620 Duration::from_millis(100),
621 Duration::from_millis(1000),
622 1.5,
623 );
624
625 self.http_component
626 .ensure_resource(backoff, async |client: Arc<C>, targets: Vec<NodeTarget>| {
627 helper
628 .clone()
629 .poll(&EnsureManifestPollOptions { client, targets })
630 .await
631 .map_err(error::Error::from)
632 })
633 .await
634 }
635
636 pub async fn ensure_bucket(&self, opts: &EnsureBucketOptions<'_>) -> error::Result<()> {
637 let mut helper = EnsureBucketHelper::new(
638 self.http_component.user_agent(),
639 opts.bucket_name,
640 opts.bucket_uuid,
641 opts.want_missing,
642 opts.on_behalf_of_info,
643 );
644
645 let backoff = ExponentialBackoffCalculator::new(
646 Duration::from_millis(100),
647 Duration::from_millis(1000),
648 1.5,
649 );
650
651 self.http_component
652 .ensure_resource(backoff, async |client: Arc<C>, targets: Vec<NodeTarget>| {
653 helper
654 .clone()
655 .poll(&EnsureBucketPollOptions { client, targets })
656 .await
657 .map_err(error::Error::from)
658 })
659 .await
660 }
661
662 pub async fn get_user(&self, opts: &GetUserOptions<'_>) -> error::Result<UserAndMetadata> {
663 let retry_info = RetryRequest::new("get_user", false);
664 let copts = opts.into();
665
666 orchestrate_retries(
667 self.retry_manager.clone(),
668 opts.retry_strategy.clone(),
669 retry_info,
670 async || {
671 self.http_component
672 .orchestrate_endpoint(
673 None,
674 async |client: Arc<C>,
675 endpoint_id: String,
676 endpoint: String,
677 canonical_endpoint: String,
678 auth: Auth| {
679 mgmtx::mgmt::Management::<C> {
680 http_client: client,
681 user_agent: self.http_component.user_agent().to_string(),
682 endpoint,
683 canonical_endpoint,
684 auth,
685 tracing: self.tracing.clone(),
686 }
687 .get_user(&copts)
688 .await
689 .map_err(|e| ErrorKind::Mgmt(e).into())
690 },
691 )
692 .await
693 },
694 )
695 .await
696 }
697
698 pub async fn get_all_users(
699 &self,
700 opts: &GetAllUsersOptions<'_>,
701 ) -> error::Result<Vec<UserAndMetadata>> {
702 let retry_info = RetryRequest::new("get_all_users", false);
703 let copts = opts.into();
704
705 orchestrate_retries(
706 self.retry_manager.clone(),
707 opts.retry_strategy.clone(),
708 retry_info,
709 async || {
710 self.http_component
711 .orchestrate_endpoint(
712 None,
713 async |client: Arc<C>,
714 endpoint_id: String,
715 endpoint: String,
716 canonical_endpoint: String,
717 auth: Auth| {
718 mgmtx::mgmt::Management::<C> {
719 http_client: client,
720 user_agent: self.http_component.user_agent().to_string(),
721 endpoint,
722 canonical_endpoint,
723 auth,
724 tracing: self.tracing.clone(),
725 }
726 .get_all_users(&copts)
727 .await
728 .map_err(|e| ErrorKind::Mgmt(e).into())
729 },
730 )
731 .await
732 },
733 )
734 .await
735 }
736
737 pub async fn upsert_user(&self, opts: &UpsertUserOptions<'_>) -> error::Result<()> {
738 let retry_info = RetryRequest::new("upsert_user", false);
739 let copts = opts.into();
740
741 orchestrate_retries(
742 self.retry_manager.clone(),
743 opts.retry_strategy.clone(),
744 retry_info,
745 async || {
746 self.http_component
747 .orchestrate_endpoint(
748 None,
749 async |client: Arc<C>,
750 endpoint_id: String,
751 endpoint: String,
752 canonical_endpoint: String,
753 auth: Auth| {
754 mgmtx::mgmt::Management::<C> {
755 http_client: client,
756 user_agent: self.http_component.user_agent().to_string(),
757 endpoint,
758 canonical_endpoint,
759 auth,
760 tracing: self.tracing.clone(),
761 }
762 .upsert_user(&copts)
763 .await
764 .map_err(|e| ErrorKind::Mgmt(e).into())
765 },
766 )
767 .await
768 },
769 )
770 .await
771 }
772
773 pub async fn delete_user(&self, opts: &DeleteUserOptions<'_>) -> error::Result<()> {
774 let retry_info = RetryRequest::new("delete_user", false);
775 let copts = opts.into();
776
777 orchestrate_retries(
778 self.retry_manager.clone(),
779 opts.retry_strategy.clone(),
780 retry_info,
781 async || {
782 self.http_component
783 .orchestrate_endpoint(
784 None,
785 async |client: Arc<C>,
786 endpoint_id: String,
787 endpoint: String,
788 canonical_endpoint: String,
789 auth: Auth| {
790 mgmtx::mgmt::Management::<C> {
791 http_client: client,
792 user_agent: self.http_component.user_agent().to_string(),
793 endpoint,
794 canonical_endpoint,
795 auth,
796 tracing: self.tracing.clone(),
797 }
798 .delete_user(&copts)
799 .await
800 .map_err(|e| ErrorKind::Mgmt(e).into())
801 },
802 )
803 .await
804 },
805 )
806 .await
807 }
808
809 pub async fn get_roles(
810 &self,
811 opts: &GetRolesOptions<'_>,
812 ) -> error::Result<Vec<RoleAndDescription>> {
813 let retry_info = RetryRequest::new("get_roles", false);
814 let copts = opts.into();
815
816 orchestrate_retries(
817 self.retry_manager.clone(),
818 opts.retry_strategy.clone(),
819 retry_info,
820 async || {
821 self.http_component
822 .orchestrate_endpoint(
823 None,
824 async |client: Arc<C>,
825 endpoint_id: String,
826 endpoint: String,
827 canonical_endpoint: String,
828 auth: Auth| {
829 mgmtx::mgmt::Management::<C> {
830 http_client: client,
831 user_agent: self.http_component.user_agent().to_string(),
832 endpoint,
833 canonical_endpoint,
834 auth,
835 tracing: self.tracing.clone(),
836 }
837 .get_roles(&copts)
838 .await
839 .map_err(|e| ErrorKind::Mgmt(e).into())
840 },
841 )
842 .await
843 },
844 )
845 .await
846 }
847
848 pub async fn get_group(&self, opts: &GetGroupOptions<'_>) -> error::Result<Group> {
849 let retry_info = RetryRequest::new("get_group", false);
850 let copts = opts.into();
851
852 orchestrate_retries(
853 self.retry_manager.clone(),
854 opts.retry_strategy.clone(),
855 retry_info,
856 async || {
857 self.http_component
858 .orchestrate_endpoint(
859 None,
860 async |client: Arc<C>,
861 endpoint_id: String,
862 endpoint: String,
863 canonical_endpoint: String,
864 auth: Auth| {
865 mgmtx::mgmt::Management::<C> {
866 http_client: client,
867 user_agent: self.http_component.user_agent().to_string(),
868 endpoint,
869 canonical_endpoint,
870 auth,
871 tracing: self.tracing.clone(),
872 }
873 .get_group(&copts)
874 .await
875 .map_err(|e| ErrorKind::Mgmt(e).into())
876 },
877 )
878 .await
879 },
880 )
881 .await
882 }
883
884 pub async fn get_all_groups(
885 &self,
886 opts: &GetAllGroupsOptions<'_>,
887 ) -> error::Result<Vec<Group>> {
888 let retry_info = RetryRequest::new("get_all_groups", false);
889 let copts = opts.into();
890
891 orchestrate_retries(
892 self.retry_manager.clone(),
893 opts.retry_strategy.clone(),
894 retry_info,
895 async || {
896 self.http_component
897 .orchestrate_endpoint(
898 None,
899 async |client: Arc<C>,
900 endpoint_id: String,
901 endpoint: String,
902 canonical_endpoint: String,
903 auth: Auth| {
904 mgmtx::mgmt::Management::<C> {
905 http_client: client,
906 user_agent: self.http_component.user_agent().to_string(),
907 endpoint,
908 canonical_endpoint,
909 auth,
910 tracing: self.tracing.clone(),
911 }
912 .get_all_groups(&copts)
913 .await
914 .map_err(|e| ErrorKind::Mgmt(e).into())
915 },
916 )
917 .await
918 },
919 )
920 .await
921 }
922
923 pub async fn upsert_group(&self, opts: &UpsertGroupOptions<'_>) -> error::Result<()> {
924 let retry_info = RetryRequest::new("upsert_group", false);
925 let copts = opts.into();
926
927 orchestrate_retries(
928 self.retry_manager.clone(),
929 opts.retry_strategy.clone(),
930 retry_info,
931 async || {
932 self.http_component
933 .orchestrate_endpoint(
934 None,
935 async |client: Arc<C>,
936 endpoint_id: String,
937 endpoint: String,
938 canonical_endpoint: String,
939 auth: Auth| {
940 mgmtx::mgmt::Management::<C> {
941 http_client: client,
942 user_agent: self.http_component.user_agent().to_string(),
943 endpoint,
944 canonical_endpoint,
945 auth,
946 tracing: self.tracing.clone(),
947 }
948 .upsert_group(&copts)
949 .await
950 .map_err(|e| ErrorKind::Mgmt(e).into())
951 },
952 )
953 .await
954 },
955 )
956 .await
957 }
958
959 pub async fn delete_group(&self, opts: &DeleteGroupOptions<'_>) -> error::Result<()> {
960 let retry_info = RetryRequest::new("delete_group", false);
961 let copts = opts.into();
962
963 orchestrate_retries(
964 self.retry_manager.clone(),
965 opts.retry_strategy.clone(),
966 retry_info,
967 async || {
968 self.http_component
969 .orchestrate_endpoint(
970 None,
971 async |client: Arc<C>,
972 endpoint_id: String,
973 endpoint: String,
974 canonical_endpoint: String,
975 auth: Auth| {
976 mgmtx::mgmt::Management::<C> {
977 http_client: client,
978 user_agent: self.http_component.user_agent().to_string(),
979 endpoint,
980 canonical_endpoint,
981 auth,
982 tracing: self.tracing.clone(),
983 }
984 .delete_group(&copts)
985 .await
986 .map_err(|e| ErrorKind::Mgmt(e).into())
987 },
988 )
989 .await
990 },
991 )
992 .await
993 }
994
995 pub async fn change_password(&self, opts: &ChangePasswordOptions<'_>) -> error::Result<()> {
996 let retry_info = RetryRequest::new("change_password", false);
997 let copts = opts.into();
998
999 orchestrate_retries(
1000 self.retry_manager.clone(),
1001 opts.retry_strategy.clone(),
1002 retry_info,
1003 async || {
1004 self.http_component
1005 .orchestrate_endpoint(
1006 None,
1007 async |client: Arc<C>,
1008 endpoint_id: String,
1009 endpoint: String,
1010 canonical_endpoint: String,
1011 auth: Auth| {
1012 mgmtx::mgmt::Management::<C> {
1013 http_client: client,
1014 user_agent: self.http_component.user_agent().to_string(),
1015 endpoint,
1016 canonical_endpoint,
1017 auth,
1018 tracing: self.tracing.clone(),
1019 }
1020 .change_password(&copts)
1021 .await
1022 .map_err(|e| ErrorKind::Mgmt(e).into())
1023 },
1024 )
1025 .await
1026 },
1027 )
1028 .await
1029 }
1030
1031 pub async fn ensure_user(&self, opts: &EnsureUserOptions<'_>) -> error::Result<()> {
1032 let mut helper = EnsureUserHelper::new(
1033 self.http_component.user_agent(),
1034 opts.username,
1035 opts.auth_domain,
1036 opts.want_missing,
1037 opts.on_behalf_of_info,
1038 );
1039
1040 let backoff = ExponentialBackoffCalculator::new(
1041 Duration::from_millis(100),
1042 Duration::from_millis(1000),
1043 1.5,
1044 );
1045
1046 self.http_component
1047 .ensure_resource(backoff, async |client: Arc<C>, targets: Vec<NodeTarget>| {
1048 helper
1049 .clone()
1050 .poll(&EnsureUserPollOptions { client, targets })
1051 .await
1052 .map_err(error::Error::from)
1053 })
1054 .await
1055 }
1056
1057 pub async fn ensure_group(&self, opts: &EnsureGroupOptions<'_>) -> error::Result<()> {
1058 let mut helper = EnsureGroupHelper::new(
1059 self.http_component.user_agent(),
1060 opts.group_name,
1061 opts.want_missing,
1062 opts.on_behalf_of_info,
1063 );
1064
1065 let backoff = ExponentialBackoffCalculator::new(
1066 Duration::from_millis(100),
1067 Duration::from_millis(1000),
1068 1.5,
1069 );
1070
1071 self.http_component
1072 .ensure_resource(backoff, async |client: Arc<C>, targets: Vec<NodeTarget>| {
1073 helper
1074 .clone()
1075 .poll(&EnsureGroupPollOptions { client, targets })
1076 .await
1077 .map_err(error::Error::from)
1078 })
1079 .await
1080 }
1081
1082 pub async fn get_full_cluster_config(
1083 &self,
1084 opts: &GetFullClusterConfigOptions<'_>,
1085 ) -> error::Result<FullClusterConfig> {
1086 let retry_info = RetryRequest::new("get_full_cluster_config", false);
1087 let copts = opts.into();
1088
1089 orchestrate_retries(
1090 self.retry_manager.clone(),
1091 opts.retry_strategy.clone(),
1092 retry_info,
1093 async || {
1094 self.http_component
1095 .orchestrate_endpoint(
1096 None,
1097 async |client: Arc<C>,
1098 endpoint_id: String,
1099 endpoint: String,
1100 canonical_endpoint: String,
1101 auth: Auth| {
1102 mgmtx::mgmt::Management::<C> {
1103 http_client: client,
1104 user_agent: self.http_component.user_agent().to_string(),
1105 endpoint,
1106 canonical_endpoint,
1107 auth,
1108 tracing: Default::default(),
1109 }
1110 .get_full_cluster_config(&copts)
1111 .await
1112 .map_err(|e| ErrorKind::Mgmt(e).into())
1113 },
1114 )
1115 .await
1116 },
1117 )
1118 .await
1119 }
1120
1121 pub async fn get_full_bucket_config(
1122 &self,
1123 opts: &GetFullBucketConfigOptions<'_>,
1124 ) -> error::Result<FullBucketConfig> {
1125 let retry_info = RetryRequest::new("get_full_bucket_config", false);
1126 let copts = opts.into();
1127
1128 orchestrate_retries(
1129 self.retry_manager.clone(),
1130 opts.retry_strategy.clone(),
1131 retry_info,
1132 async || {
1133 self.http_component
1134 .orchestrate_endpoint(
1135 None,
1136 async |client: Arc<C>,
1137 endpoint_id: String,
1138 endpoint: String,
1139 canonical_endpoint: String,
1140 auth: Auth| {
1141 mgmtx::mgmt::Management::<C> {
1142 http_client: client,
1143 user_agent: self.http_component.user_agent().to_string(),
1144 endpoint,
1145 canonical_endpoint,
1146 auth,
1147 tracing: Default::default(),
1148 }
1149 .get_full_bucket_config(&copts)
1150 .await
1151 .map_err(|e| ErrorKind::Mgmt(e).into())
1152 },
1153 )
1154 .await
1155 },
1156 )
1157 .await
1158 }
1159
1160 pub async fn load_sample_bucket(
1161 &self,
1162 opts: &LoadSampleBucketOptions<'_>,
1163 ) -> error::Result<()> {
1164 let retry_info = RetryRequest::new("load_sample_bucket", false);
1165 let copts = opts.into();
1166
1167 orchestrate_retries(
1168 self.retry_manager.clone(),
1169 opts.retry_strategy.clone(),
1170 retry_info,
1171 async || {
1172 self.http_component
1173 .orchestrate_endpoint(
1174 None,
1175 async |client: Arc<C>,
1176 endpoint_id: String,
1177 endpoint: String,
1178 canonical_endpoint: String,
1179 auth: Auth| {
1180 mgmtx::mgmt::Management::<C> {
1181 http_client: client,
1182 user_agent: self.http_component.user_agent().to_string(),
1183 endpoint,
1184 canonical_endpoint,
1185 auth,
1186 tracing: Default::default(),
1187 }
1188 .load_sample_bucket(&copts)
1189 .await
1190 .map_err(|e| ErrorKind::Mgmt(e).into())
1191 },
1192 )
1193 .await
1194 },
1195 )
1196 .await
1197 }
1198
1199 pub async fn index_status(&self, opts: &IndexStatusOptions<'_>) -> error::Result<IndexStatus> {
1200 let retry_info = RetryRequest::new("index_status", false);
1201 let copts = opts.into();
1202
1203 orchestrate_retries(
1204 self.retry_manager.clone(),
1205 opts.retry_strategy.clone(),
1206 retry_info,
1207 async || {
1208 self.http_component
1209 .orchestrate_endpoint(
1210 None,
1211 async |client: Arc<C>,
1212 endpoint_id: String,
1213 endpoint: String,
1214 canonical_endpoint: String,
1215 auth: Auth| {
1216 mgmtx::mgmt::Management::<C> {
1217 http_client: client,
1218 user_agent: self.http_component.user_agent().to_string(),
1219 endpoint,
1220 canonical_endpoint,
1221 auth,
1222 tracing: Default::default(),
1223 }
1224 .index_status(&copts)
1225 .await
1226 .map_err(|e| ErrorKind::Mgmt(e).into())
1227 },
1228 )
1229 .await
1230 },
1231 )
1232 .await
1233 }
1234
1235 pub async fn get_auto_failover_settings(
1236 &self,
1237 opts: &GetAutoFailoverSettingsOptions<'_>,
1238 ) -> error::Result<AutoFailoverSettings> {
1239 let retry_info = RetryRequest::new("get_auto_failover_settings", false);
1240 let copts = opts.into();
1241
1242 orchestrate_retries(
1243 self.retry_manager.clone(),
1244 opts.retry_strategy.clone(),
1245 retry_info,
1246 async || {
1247 self.http_component
1248 .orchestrate_endpoint(
1249 None,
1250 async |client: Arc<C>,
1251 endpoint_id: String,
1252 endpoint: String,
1253 canonical_endpoint: String,
1254 auth: Auth| {
1255 mgmtx::mgmt::Management::<C> {
1256 http_client: client,
1257 user_agent: self.http_component.user_agent().to_string(),
1258 endpoint,
1259 canonical_endpoint,
1260 auth,
1261 tracing: Default::default(),
1262 }
1263 .get_auto_failover_settings(&copts)
1264 .await
1265 .map_err(|e| ErrorKind::Mgmt(e).into())
1266 },
1267 )
1268 .await
1269 },
1270 )
1271 .await
1272 }
1273
1274 pub async fn get_bucket_stats(
1275 &self,
1276 opts: &GetBucketStatsOptions<'_>,
1277 ) -> error::Result<Box<RawValue>> {
1278 let retry_info = RetryRequest::new("get_bucket_stats", false);
1279 let retry = opts.retry_strategy.clone();
1280 let copts = opts.into();
1281
1282 orchestrate_retries(self.retry_manager.clone(), retry, retry_info, async || {
1283 self.http_component
1284 .orchestrate_endpoint(
1285 None,
1286 async |client: Arc<C>,
1287 endpoint_id: String,
1288 endpoint: String,
1289 canonical_endpoint: String,
1290 auth: Auth| {
1291 mgmtx::mgmt::Management::<C> {
1292 http_client: client,
1293 user_agent: self.http_component.user_agent().to_string(),
1294 endpoint,
1295 canonical_endpoint,
1296 auth,
1297 tracing: Default::default(),
1298 }
1299 .get_bucket_stats(&copts)
1300 .await
1301 .map_err(|e| ErrorKind::Mgmt(e).into())
1302 },
1303 )
1304 .await
1305 })
1306 .await
1307 }
1308}