1use crate::authenticator::Authenticator;
20use crate::cbconfig::{CollectionManifest, FullBucketConfig, FullClusterConfig};
21use crate::componentconfigs::NetworkAndCanonicalEndpoint;
22use crate::error::ErrorKind;
23use crate::httpcomponent::{HttpComponent, HttpComponentState};
24use crate::httpx::client::Client;
25use crate::httpx::request::Auth;
26use crate::mgmtx::bucket_helper::EnsureBucketHelper;
27use crate::mgmtx::bucket_settings::BucketDef;
28use crate::mgmtx::group_helper::EnsureGroupHelper;
29use crate::mgmtx::manifest_helper::EnsureManifestHelper;
30use crate::mgmtx::mgmt::AutoFailoverSettings;
31use crate::mgmtx::mgmt_query::IndexStatus;
32use crate::mgmtx::node_target::NodeTarget;
33use crate::mgmtx::options::{
34 EnsureBucketPollOptions, EnsureGroupPollOptions, EnsureManifestPollOptions,
35 EnsureUserPollOptions,
36};
37use crate::mgmtx::responses::{
38 CreateCollectionResponse, CreateScopeResponse, DeleteCollectionResponse, DeleteScopeResponse,
39 UpdateCollectionResponse,
40};
41use crate::mgmtx::user::{Group, RoleAndDescription, UserAndMetadata};
42use crate::mgmtx::user_helper::EnsureUserHelper;
43use crate::options::management::{
44 ChangePasswordOptions, CreateBucketOptions, CreateCollectionOptions, CreateScopeOptions,
45 DeleteBucketOptions, DeleteCollectionOptions, DeleteGroupOptions, DeleteScopeOptions,
46 DeleteUserOptions, EnsureBucketOptions, EnsureGroupOptions, EnsureManifestOptions,
47 EnsureUserOptions, FlushBucketOptions, GetAllBucketsOptions, GetAllGroupsOptions,
48 GetAllUsersOptions, GetAutoFailoverSettingsOptions, GetBucketOptions, GetBucketStatsOptions,
49 GetCollectionManifestOptions, GetFullBucketConfigOptions, GetFullClusterConfigOptions,
50 GetGroupOptions, GetRolesOptions, GetUserOptions, IndexStatusOptions, LoadSampleBucketOptions,
51 UpdateBucketOptions, UpdateCollectionOptions, UpsertGroupOptions, UpsertUserOptions,
52};
53use crate::retry::{orchestrate_retries, RetryManager, RetryRequest};
54use crate::retrybesteffort::ExponentialBackoffCalculator;
55use crate::service_type::ServiceType;
56use crate::tracingcomponent::TracingComponent;
57use crate::{error, mgmtx};
58use serde_json::value::RawValue;
59use std::collections::HashMap;
60use std::sync::Arc;
61use std::time::Duration;
62
/// Component that exposes management-service operations over HTTP.
///
/// Each operation is executed through the wrapped [`HttpComponent`], which
/// [`MgmtComponent::new`] constructs for `ServiceType::MGMT`.
pub(crate) struct MgmtComponent<C: Client> {
    /// HTTP plumbing that supplies the client, endpoint and auth for each call.
    http_component: HttpComponent<C>,
    /// Tracing component handed to the `mgmtx` management client by most operations.
    tracing: Arc<TracingComponent>,

    /// Retry manager passed to `orchestrate_retries` by every retried operation.
    retry_manager: Arc<RetryManager>,
}
69
/// Reconfigurable state of a [`MgmtComponent`]; both fields are forwarded to
/// `HttpComponentState::new`.
pub(crate) struct MgmtComponentConfig {
    /// Known management endpoints.
    /// NOTE(review): keys are presumably endpoint identifiers — confirm against the caller.
    pub endpoints: HashMap<String, NetworkAndCanonicalEndpoint>,
    /// Authenticator handed to the HTTP component state.
    pub authenticator: Authenticator,
}
74
/// Construction-time options of a [`MgmtComponent`].
pub(crate) struct MgmtComponentOptions {
    /// User agent string passed to `HttpComponent::new`.
    pub user_agent: String,
}
78
79impl<C: Client> MgmtComponent<C> {
80 pub fn new(
81 retry_manager: Arc<RetryManager>,
82 http_client: Arc<C>,
83 tracing: Arc<TracingComponent>,
84 config: MgmtComponentConfig,
85 opts: MgmtComponentOptions,
86 ) -> Self {
87 Self {
88 http_component: HttpComponent::new(
89 ServiceType::MGMT,
90 opts.user_agent,
91 http_client,
92 HttpComponentState::new(config.endpoints, config.authenticator),
93 ),
94 tracing,
95 retry_manager,
96 }
97 }
98
99 pub fn reconfigure(&self, config: MgmtComponentConfig) {
100 self.http_component.reconfigure(HttpComponentState::new(
101 config.endpoints,
102 config.authenticator,
103 ))
104 }
105
106 pub async fn get_collection_manifest(
107 &self,
108 opts: &GetCollectionManifestOptions<'_>,
109 ) -> error::Result<CollectionManifest> {
110 let retry_info = RetryRequest::new("get_collection_manifest", false);
111
112 let copts = opts.into();
113
114 orchestrate_retries(
115 self.retry_manager.clone(),
116 opts.retry_strategy.clone(),
117 retry_info,
118 async || {
119 self.http_component
120 .orchestrate_endpoint(
121 None,
122 async |client: Arc<C>,
123 endpoint_id: String,
124 endpoint: String,
125 canonical_endpoint: String,
126 auth: Auth| {
127 let res = match (mgmtx::mgmt::Management::<C> {
128 http_client: client,
129 user_agent: self.http_component.user_agent().to_string(),
130 endpoint,
131 canonical_endpoint,
132 auth,
133 tracing: self.tracing.clone(),
134 }
135 .get_collection_manifest(&copts)
136 .await)
137 {
138 Ok(r) => r,
139 Err(e) => return Err(ErrorKind::Mgmt(e).into()),
140 };
141
142 Ok(res)
143 },
144 )
145 .await
146 },
147 )
148 .await
149 }
150
151 pub async fn create_scope(
152 &self,
153 opts: &CreateScopeOptions<'_>,
154 ) -> error::Result<CreateScopeResponse> {
155 let retry_info = RetryRequest::new("create_scope", false);
156
157 let copts = opts.into();
158
159 orchestrate_retries(
160 self.retry_manager.clone(),
161 opts.retry_strategy.clone(),
162 retry_info,
163 async || {
164 self.http_component
165 .orchestrate_endpoint(
166 None,
167 async |client: Arc<C>,
168 endpoint_id: String,
169 endpoint: String,
170 canonical_endpoint: String,
171 auth: Auth| {
172 let res = match (mgmtx::mgmt::Management::<C> {
173 http_client: client,
174 user_agent: self.http_component.user_agent().to_string(),
175 endpoint,
176 canonical_endpoint,
177 auth,
178 tracing: self.tracing.clone(),
179 }
180 .create_scope(&copts)
181 .await)
182 {
183 Ok(r) => r,
184 Err(e) => return Err(ErrorKind::Mgmt(e).into()),
185 };
186
187 Ok(res)
188 },
189 )
190 .await
191 },
192 )
193 .await
194 }
195
196 pub async fn delete_scope(
197 &self,
198 opts: &DeleteScopeOptions<'_>,
199 ) -> error::Result<DeleteScopeResponse> {
200 let retry_info = RetryRequest::new("delete_scope", false);
201
202 let copts = opts.into();
203
204 orchestrate_retries(
205 self.retry_manager.clone(),
206 opts.retry_strategy.clone(),
207 retry_info,
208 async || {
209 self.http_component
210 .orchestrate_endpoint(
211 None,
212 async |client: Arc<C>,
213 endpoint_id: String,
214 endpoint: String,
215 canonical_endpoint: String,
216 auth: Auth| {
217 let res = match (mgmtx::mgmt::Management::<C> {
218 http_client: client,
219 user_agent: self.http_component.user_agent().to_string(),
220 endpoint,
221 canonical_endpoint,
222 auth,
223 tracing: self.tracing.clone(),
224 }
225 .delete_scope(&copts)
226 .await)
227 {
228 Ok(r) => r,
229 Err(e) => return Err(ErrorKind::Mgmt(e).into()),
230 };
231
232 Ok(res)
233 },
234 )
235 .await
236 },
237 )
238 .await
239 }
240
241 pub async fn create_collection(
242 &self,
243 opts: &CreateCollectionOptions<'_>,
244 ) -> error::Result<CreateCollectionResponse> {
245 let retry_info = RetryRequest::new("create_collection", false);
246
247 let copts = opts.into();
248
249 orchestrate_retries(
250 self.retry_manager.clone(),
251 opts.retry_strategy.clone(),
252 retry_info,
253 async || {
254 self.http_component
255 .orchestrate_endpoint(
256 None,
257 async |client: Arc<C>,
258 endpoint_id: String,
259 endpoint: String,
260 canonical_endpoint: String,
261 auth: Auth| {
262 let res = match (mgmtx::mgmt::Management::<C> {
263 http_client: client,
264 user_agent: self.http_component.user_agent().to_string(),
265 endpoint,
266 canonical_endpoint,
267 auth,
268 tracing: self.tracing.clone(),
269 }
270 .create_collection(&copts)
271 .await)
272 {
273 Ok(r) => r,
274 Err(e) => return Err(ErrorKind::Mgmt(e).into()),
275 };
276
277 Ok(res)
278 },
279 )
280 .await
281 },
282 )
283 .await
284 }
285
286 pub async fn delete_collection(
287 &self,
288 opts: &DeleteCollectionOptions<'_>,
289 ) -> error::Result<DeleteCollectionResponse> {
290 let retry_info = RetryRequest::new("delete_collection", false);
291
292 let copts = opts.into();
293
294 orchestrate_retries(
295 self.retry_manager.clone(),
296 opts.retry_strategy.clone(),
297 retry_info,
298 async || {
299 self.http_component
300 .orchestrate_endpoint(
301 None,
302 async |client: Arc<C>,
303 endpoint_id: String,
304 endpoint: String,
305 canonical_endpoint: String,
306 auth: Auth| {
307 let res = match (mgmtx::mgmt::Management::<C> {
308 http_client: client,
309 user_agent: self.http_component.user_agent().to_string(),
310 endpoint,
311 canonical_endpoint,
312 auth,
313 tracing: self.tracing.clone(),
314 }
315 .delete_collection(&copts)
316 .await)
317 {
318 Ok(r) => r,
319 Err(e) => return Err(ErrorKind::Mgmt(e).into()),
320 };
321
322 Ok(res)
323 },
324 )
325 .await
326 },
327 )
328 .await
329 }
330
331 pub async fn update_collection(
332 &self,
333 opts: &UpdateCollectionOptions<'_>,
334 ) -> error::Result<UpdateCollectionResponse> {
335 let retry_info = RetryRequest::new("update_collection", false);
336
337 let copts = opts.into();
338
339 orchestrate_retries(
340 self.retry_manager.clone(),
341 opts.retry_strategy.clone(),
342 retry_info,
343 async || {
344 self.http_component
345 .orchestrate_endpoint(
346 None,
347 async |client: Arc<C>,
348 endpoint_id: String,
349 endpoint: String,
350 canonical_endpoint: String,
351 auth: Auth| {
352 let res = match (mgmtx::mgmt::Management::<C> {
353 http_client: client,
354 user_agent: self.http_component.user_agent().to_string(),
355 endpoint,
356 canonical_endpoint,
357 auth,
358 tracing: self.tracing.clone(),
359 }
360 .update_collection(&copts)
361 .await)
362 {
363 Ok(r) => r,
364 Err(e) => return Err(ErrorKind::Mgmt(e).into()),
365 };
366
367 Ok(res)
368 },
369 )
370 .await
371 },
372 )
373 .await
374 }
375
376 pub async fn get_all_buckets(
377 &self,
378 opts: &GetAllBucketsOptions<'_>,
379 ) -> error::Result<Vec<BucketDef>> {
380 let retry_info = RetryRequest::new("get_all_buckets", false);
381
382 let copts = opts.into();
383
384 orchestrate_retries(
385 self.retry_manager.clone(),
386 opts.retry_strategy.clone(),
387 retry_info,
388 async || {
389 self.http_component
390 .orchestrate_endpoint(
391 None,
392 async |client: Arc<C>,
393 endpoint_id: String,
394 endpoint: String,
395 canonical_endpoint: String,
396 auth: Auth| {
397 mgmtx::mgmt::Management::<C> {
398 http_client: client,
399 user_agent: self.http_component.user_agent().to_string(),
400 endpoint,
401 canonical_endpoint,
402 auth,
403 tracing: self.tracing.clone(),
404 }
405 .get_all_buckets(&copts)
406 .await
407 .map_err(|e| ErrorKind::Mgmt(e).into())
408 },
409 )
410 .await
411 },
412 )
413 .await
414 }
415
416 pub async fn get_bucket(&self, opts: &GetBucketOptions<'_>) -> error::Result<BucketDef> {
417 let retry_info = RetryRequest::new("get_bucket", false);
418
419 let copts = opts.into();
420
421 orchestrate_retries(
422 self.retry_manager.clone(),
423 opts.retry_strategy.clone(),
424 retry_info,
425 async || {
426 self.http_component
427 .orchestrate_endpoint(
428 None,
429 async |client: Arc<C>,
430 endpoint_id: String,
431 endpoint: String,
432 canonical_endpoint: String,
433 auth: Auth| {
434 mgmtx::mgmt::Management::<C> {
435 http_client: client,
436 user_agent: self.http_component.user_agent().to_string(),
437 endpoint,
438 canonical_endpoint,
439 auth,
440 tracing: self.tracing.clone(),
441 }
442 .get_bucket(&copts)
443 .await
444 .map_err(|e| ErrorKind::Mgmt(e).into())
445 },
446 )
447 .await
448 },
449 )
450 .await
451 }
452
453 pub async fn create_bucket(&self, opts: &CreateBucketOptions<'_>) -> error::Result<()> {
454 let retry_info = RetryRequest::new("create_bucket", false);
455
456 let copts = opts.into();
457
458 orchestrate_retries(
459 self.retry_manager.clone(),
460 opts.retry_strategy.clone(),
461 retry_info,
462 async || {
463 self.http_component
464 .orchestrate_endpoint(
465 None,
466 async |client: Arc<C>,
467 endpoint_id: String,
468 endpoint: String,
469 canonical_endpoint: String,
470 auth: Auth| {
471 mgmtx::mgmt::Management::<C> {
472 http_client: client,
473 user_agent: self.http_component.user_agent().to_string(),
474 endpoint,
475 canonical_endpoint,
476 auth,
477 tracing: self.tracing.clone(),
478 }
479 .create_bucket(&copts)
480 .await
481 .map_err(|e| ErrorKind::Mgmt(e).into())
482 },
483 )
484 .await
485 },
486 )
487 .await
488 }
489
490 pub async fn update_bucket(&self, opts: &UpdateBucketOptions<'_>) -> error::Result<()> {
491 let retry_info = RetryRequest::new("update_bucket", false);
492
493 let copts = opts.into();
494
495 orchestrate_retries(
496 self.retry_manager.clone(),
497 opts.retry_strategy.clone(),
498 retry_info,
499 async || {
500 self.http_component
501 .orchestrate_endpoint(
502 None,
503 async |client: Arc<C>,
504 endpoint_id: String,
505 endpoint: String,
506 canonical_endpoint: String,
507 auth: Auth| {
508 mgmtx::mgmt::Management::<C> {
509 http_client: client,
510 user_agent: self.http_component.user_agent().to_string(),
511 endpoint,
512 canonical_endpoint,
513 auth,
514 tracing: self.tracing.clone(),
515 }
516 .update_bucket(&copts)
517 .await
518 .map_err(|e| ErrorKind::Mgmt(e).into())
519 },
520 )
521 .await
522 },
523 )
524 .await
525 }
526
527 pub async fn delete_bucket(&self, opts: &DeleteBucketOptions<'_>) -> error::Result<()> {
528 let retry_info = RetryRequest::new("delete_bucket", false);
529
530 let copts = opts.into();
531
532 orchestrate_retries(
533 self.retry_manager.clone(),
534 opts.retry_strategy.clone(),
535 retry_info,
536 async || {
537 self.http_component
538 .orchestrate_endpoint(
539 None,
540 async |client: Arc<C>,
541 endpoint_id: String,
542 endpoint: String,
543 canonical_endpoint: String,
544 auth: Auth| {
545 mgmtx::mgmt::Management::<C> {
546 http_client: client,
547 user_agent: self.http_component.user_agent().to_string(),
548 endpoint,
549 canonical_endpoint,
550 auth,
551 tracing: self.tracing.clone(),
552 }
553 .delete_bucket(&copts)
554 .await
555 .map_err(|e| ErrorKind::Mgmt(e).into())
556 },
557 )
558 .await
559 },
560 )
561 .await
562 }
563
564 pub async fn flush_bucket(&self, opts: &FlushBucketOptions<'_>) -> error::Result<()> {
565 let retry_info = RetryRequest::new("flush_bucket", false);
566
567 let copts = opts.into();
568
569 orchestrate_retries(
570 self.retry_manager.clone(),
571 opts.retry_strategy.clone(),
572 retry_info,
573 async || {
574 self.http_component
575 .orchestrate_endpoint(
576 None,
577 async |client: Arc<C>,
578 endpoint_id: String,
579 endpoint: String,
580 canonical_endpoint: String,
581 auth: Auth| {
582 mgmtx::mgmt::Management::<C> {
583 http_client: client,
584 user_agent: self.http_component.user_agent().to_string(),
585 endpoint,
586 canonical_endpoint,
587 auth,
588 tracing: self.tracing.clone(),
589 }
590 .flush_bucket(&copts)
591 .await
592 .map_err(|e| ErrorKind::Mgmt(e).into())
593 },
594 )
595 .await
596 },
597 )
598 .await
599 }
600
601 pub async fn ensure_manifest(&self, opts: &EnsureManifestOptions<'_>) -> error::Result<()> {
602 let mut helper = EnsureManifestHelper::new(
603 self.http_component.user_agent(),
604 opts.bucket_name,
605 opts.manifest_uid,
606 opts.on_behalf_of_info,
607 );
608
609 let backoff = ExponentialBackoffCalculator::new(
610 Duration::from_millis(100),
611 Duration::from_millis(1000),
612 1.5,
613 );
614
615 self.http_component
616 .ensure_resource(backoff, async |client: Arc<C>, targets: Vec<NodeTarget>| {
617 helper
618 .clone()
619 .poll(&EnsureManifestPollOptions { client, targets })
620 .await
621 .map_err(error::Error::from)
622 })
623 .await
624 }
625
626 pub async fn ensure_bucket(&self, opts: &EnsureBucketOptions<'_>) -> error::Result<()> {
627 let mut helper = EnsureBucketHelper::new(
628 self.http_component.user_agent(),
629 opts.bucket_name,
630 opts.bucket_uuid,
631 opts.want_missing,
632 opts.on_behalf_of_info,
633 );
634
635 let backoff = ExponentialBackoffCalculator::new(
636 Duration::from_millis(100),
637 Duration::from_millis(1000),
638 1.5,
639 );
640
641 self.http_component
642 .ensure_resource(backoff, async |client: Arc<C>, targets: Vec<NodeTarget>| {
643 helper
644 .clone()
645 .poll(&EnsureBucketPollOptions { client, targets })
646 .await
647 .map_err(error::Error::from)
648 })
649 .await
650 }
651
652 pub async fn get_user(&self, opts: &GetUserOptions<'_>) -> error::Result<UserAndMetadata> {
653 let retry_info = RetryRequest::new("get_user", false);
654 let copts = opts.into();
655
656 orchestrate_retries(
657 self.retry_manager.clone(),
658 opts.retry_strategy.clone(),
659 retry_info,
660 async || {
661 self.http_component
662 .orchestrate_endpoint(
663 None,
664 async |client: Arc<C>,
665 endpoint_id: String,
666 endpoint: String,
667 canonical_endpoint: String,
668 auth: Auth| {
669 mgmtx::mgmt::Management::<C> {
670 http_client: client,
671 user_agent: self.http_component.user_agent().to_string(),
672 endpoint,
673 canonical_endpoint,
674 auth,
675 tracing: self.tracing.clone(),
676 }
677 .get_user(&copts)
678 .await
679 .map_err(|e| ErrorKind::Mgmt(e).into())
680 },
681 )
682 .await
683 },
684 )
685 .await
686 }
687
688 pub async fn get_all_users(
689 &self,
690 opts: &GetAllUsersOptions<'_>,
691 ) -> error::Result<Vec<UserAndMetadata>> {
692 let retry_info = RetryRequest::new("get_all_users", false);
693 let copts = opts.into();
694
695 orchestrate_retries(
696 self.retry_manager.clone(),
697 opts.retry_strategy.clone(),
698 retry_info,
699 async || {
700 self.http_component
701 .orchestrate_endpoint(
702 None,
703 async |client: Arc<C>,
704 endpoint_id: String,
705 endpoint: String,
706 canonical_endpoint: String,
707 auth: Auth| {
708 mgmtx::mgmt::Management::<C> {
709 http_client: client,
710 user_agent: self.http_component.user_agent().to_string(),
711 endpoint,
712 canonical_endpoint,
713 auth,
714 tracing: self.tracing.clone(),
715 }
716 .get_all_users(&copts)
717 .await
718 .map_err(|e| ErrorKind::Mgmt(e).into())
719 },
720 )
721 .await
722 },
723 )
724 .await
725 }
726
727 pub async fn upsert_user(&self, opts: &UpsertUserOptions<'_>) -> error::Result<()> {
728 let retry_info = RetryRequest::new("upsert_user", false);
729 let copts = opts.into();
730
731 orchestrate_retries(
732 self.retry_manager.clone(),
733 opts.retry_strategy.clone(),
734 retry_info,
735 async || {
736 self.http_component
737 .orchestrate_endpoint(
738 None,
739 async |client: Arc<C>,
740 endpoint_id: String,
741 endpoint: String,
742 canonical_endpoint: String,
743 auth: Auth| {
744 mgmtx::mgmt::Management::<C> {
745 http_client: client,
746 user_agent: self.http_component.user_agent().to_string(),
747 endpoint,
748 canonical_endpoint,
749 auth,
750 tracing: self.tracing.clone(),
751 }
752 .upsert_user(&copts)
753 .await
754 .map_err(|e| ErrorKind::Mgmt(e).into())
755 },
756 )
757 .await
758 },
759 )
760 .await
761 }
762
763 pub async fn delete_user(&self, opts: &DeleteUserOptions<'_>) -> error::Result<()> {
764 let retry_info = RetryRequest::new("delete_user", false);
765 let copts = opts.into();
766
767 orchestrate_retries(
768 self.retry_manager.clone(),
769 opts.retry_strategy.clone(),
770 retry_info,
771 async || {
772 self.http_component
773 .orchestrate_endpoint(
774 None,
775 async |client: Arc<C>,
776 endpoint_id: String,
777 endpoint: String,
778 canonical_endpoint: String,
779 auth: Auth| {
780 mgmtx::mgmt::Management::<C> {
781 http_client: client,
782 user_agent: self.http_component.user_agent().to_string(),
783 endpoint,
784 canonical_endpoint,
785 auth,
786 tracing: self.tracing.clone(),
787 }
788 .delete_user(&copts)
789 .await
790 .map_err(|e| ErrorKind::Mgmt(e).into())
791 },
792 )
793 .await
794 },
795 )
796 .await
797 }
798
799 pub async fn get_roles(
800 &self,
801 opts: &GetRolesOptions<'_>,
802 ) -> error::Result<Vec<RoleAndDescription>> {
803 let retry_info = RetryRequest::new("get_roles", false);
804 let copts = opts.into();
805
806 orchestrate_retries(
807 self.retry_manager.clone(),
808 opts.retry_strategy.clone(),
809 retry_info,
810 async || {
811 self.http_component
812 .orchestrate_endpoint(
813 None,
814 async |client: Arc<C>,
815 endpoint_id: String,
816 endpoint: String,
817 canonical_endpoint: String,
818 auth: Auth| {
819 mgmtx::mgmt::Management::<C> {
820 http_client: client,
821 user_agent: self.http_component.user_agent().to_string(),
822 endpoint,
823 canonical_endpoint,
824 auth,
825 tracing: self.tracing.clone(),
826 }
827 .get_roles(&copts)
828 .await
829 .map_err(|e| ErrorKind::Mgmt(e).into())
830 },
831 )
832 .await
833 },
834 )
835 .await
836 }
837
838 pub async fn get_group(&self, opts: &GetGroupOptions<'_>) -> error::Result<Group> {
839 let retry_info = RetryRequest::new("get_group", false);
840 let copts = opts.into();
841
842 orchestrate_retries(
843 self.retry_manager.clone(),
844 opts.retry_strategy.clone(),
845 retry_info,
846 async || {
847 self.http_component
848 .orchestrate_endpoint(
849 None,
850 async |client: Arc<C>,
851 endpoint_id: String,
852 endpoint: String,
853 canonical_endpoint: String,
854 auth: Auth| {
855 mgmtx::mgmt::Management::<C> {
856 http_client: client,
857 user_agent: self.http_component.user_agent().to_string(),
858 endpoint,
859 canonical_endpoint,
860 auth,
861 tracing: self.tracing.clone(),
862 }
863 .get_group(&copts)
864 .await
865 .map_err(|e| ErrorKind::Mgmt(e).into())
866 },
867 )
868 .await
869 },
870 )
871 .await
872 }
873
874 pub async fn get_all_groups(
875 &self,
876 opts: &GetAllGroupsOptions<'_>,
877 ) -> error::Result<Vec<Group>> {
878 let retry_info = RetryRequest::new("get_all_groups", false);
879 let copts = opts.into();
880
881 orchestrate_retries(
882 self.retry_manager.clone(),
883 opts.retry_strategy.clone(),
884 retry_info,
885 async || {
886 self.http_component
887 .orchestrate_endpoint(
888 None,
889 async |client: Arc<C>,
890 endpoint_id: String,
891 endpoint: String,
892 canonical_endpoint: String,
893 auth: Auth| {
894 mgmtx::mgmt::Management::<C> {
895 http_client: client,
896 user_agent: self.http_component.user_agent().to_string(),
897 endpoint,
898 canonical_endpoint,
899 auth,
900 tracing: self.tracing.clone(),
901 }
902 .get_all_groups(&copts)
903 .await
904 .map_err(|e| ErrorKind::Mgmt(e).into())
905 },
906 )
907 .await
908 },
909 )
910 .await
911 }
912
913 pub async fn upsert_group(&self, opts: &UpsertGroupOptions<'_>) -> error::Result<()> {
914 let retry_info = RetryRequest::new("upsert_group", false);
915 let copts = opts.into();
916
917 orchestrate_retries(
918 self.retry_manager.clone(),
919 opts.retry_strategy.clone(),
920 retry_info,
921 async || {
922 self.http_component
923 .orchestrate_endpoint(
924 None,
925 async |client: Arc<C>,
926 endpoint_id: String,
927 endpoint: String,
928 canonical_endpoint: String,
929 auth: Auth| {
930 mgmtx::mgmt::Management::<C> {
931 http_client: client,
932 user_agent: self.http_component.user_agent().to_string(),
933 endpoint,
934 canonical_endpoint,
935 auth,
936 tracing: self.tracing.clone(),
937 }
938 .upsert_group(&copts)
939 .await
940 .map_err(|e| ErrorKind::Mgmt(e).into())
941 },
942 )
943 .await
944 },
945 )
946 .await
947 }
948
949 pub async fn delete_group(&self, opts: &DeleteGroupOptions<'_>) -> error::Result<()> {
950 let retry_info = RetryRequest::new("delete_group", false);
951 let copts = opts.into();
952
953 orchestrate_retries(
954 self.retry_manager.clone(),
955 opts.retry_strategy.clone(),
956 retry_info,
957 async || {
958 self.http_component
959 .orchestrate_endpoint(
960 None,
961 async |client: Arc<C>,
962 endpoint_id: String,
963 endpoint: String,
964 canonical_endpoint: String,
965 auth: Auth| {
966 mgmtx::mgmt::Management::<C> {
967 http_client: client,
968 user_agent: self.http_component.user_agent().to_string(),
969 endpoint,
970 canonical_endpoint,
971 auth,
972 tracing: self.tracing.clone(),
973 }
974 .delete_group(&copts)
975 .await
976 .map_err(|e| ErrorKind::Mgmt(e).into())
977 },
978 )
979 .await
980 },
981 )
982 .await
983 }
984
985 pub async fn change_password(&self, opts: &ChangePasswordOptions<'_>) -> error::Result<()> {
986 let retry_info = RetryRequest::new("change_password", false);
987 let copts = opts.into();
988
989 orchestrate_retries(
990 self.retry_manager.clone(),
991 opts.retry_strategy.clone(),
992 retry_info,
993 async || {
994 self.http_component
995 .orchestrate_endpoint(
996 None,
997 async |client: Arc<C>,
998 endpoint_id: String,
999 endpoint: String,
1000 canonical_endpoint: String,
1001 auth: Auth| {
1002 mgmtx::mgmt::Management::<C> {
1003 http_client: client,
1004 user_agent: self.http_component.user_agent().to_string(),
1005 endpoint,
1006 canonical_endpoint,
1007 auth,
1008 tracing: self.tracing.clone(),
1009 }
1010 .change_password(&copts)
1011 .await
1012 .map_err(|e| ErrorKind::Mgmt(e).into())
1013 },
1014 )
1015 .await
1016 },
1017 )
1018 .await
1019 }
1020
1021 pub async fn ensure_user(&self, opts: &EnsureUserOptions<'_>) -> error::Result<()> {
1022 let mut helper = EnsureUserHelper::new(
1023 self.http_component.user_agent(),
1024 opts.username,
1025 opts.auth_domain,
1026 opts.want_missing,
1027 opts.on_behalf_of_info,
1028 );
1029
1030 let backoff = ExponentialBackoffCalculator::new(
1031 Duration::from_millis(100),
1032 Duration::from_millis(1000),
1033 1.5,
1034 );
1035
1036 self.http_component
1037 .ensure_resource(backoff, async |client: Arc<C>, targets: Vec<NodeTarget>| {
1038 helper
1039 .clone()
1040 .poll(&EnsureUserPollOptions { client, targets })
1041 .await
1042 .map_err(error::Error::from)
1043 })
1044 .await
1045 }
1046
1047 pub async fn ensure_group(&self, opts: &EnsureGroupOptions<'_>) -> error::Result<()> {
1048 let mut helper = EnsureGroupHelper::new(
1049 self.http_component.user_agent(),
1050 opts.group_name,
1051 opts.want_missing,
1052 opts.on_behalf_of_info,
1053 );
1054
1055 let backoff = ExponentialBackoffCalculator::new(
1056 Duration::from_millis(100),
1057 Duration::from_millis(1000),
1058 1.5,
1059 );
1060
1061 self.http_component
1062 .ensure_resource(backoff, async |client: Arc<C>, targets: Vec<NodeTarget>| {
1063 helper
1064 .clone()
1065 .poll(&EnsureGroupPollOptions { client, targets })
1066 .await
1067 .map_err(error::Error::from)
1068 })
1069 .await
1070 }
1071
1072 pub async fn get_full_cluster_config(
1073 &self,
1074 opts: &GetFullClusterConfigOptions<'_>,
1075 ) -> error::Result<FullClusterConfig> {
1076 let retry_info = RetryRequest::new("get_full_cluster_config", false);
1077 let copts = opts.into();
1078
1079 orchestrate_retries(
1080 self.retry_manager.clone(),
1081 opts.retry_strategy.clone(),
1082 retry_info,
1083 async || {
1084 self.http_component
1085 .orchestrate_endpoint(
1086 None,
1087 async |client: Arc<C>,
1088 endpoint_id: String,
1089 endpoint: String,
1090 canonical_endpoint: String,
1091 auth: Auth| {
1092 mgmtx::mgmt::Management::<C> {
1093 http_client: client,
1094 user_agent: self.http_component.user_agent().to_string(),
1095 endpoint,
1096 canonical_endpoint,
1097 auth,
1098 tracing: Default::default(),
1099 }
1100 .get_full_cluster_config(&copts)
1101 .await
1102 .map_err(|e| ErrorKind::Mgmt(e).into())
1103 },
1104 )
1105 .await
1106 },
1107 )
1108 .await
1109 }
1110
1111 pub async fn get_full_bucket_config(
1112 &self,
1113 opts: &GetFullBucketConfigOptions<'_>,
1114 ) -> error::Result<FullBucketConfig> {
1115 let retry_info = RetryRequest::new("get_full_bucket_config", false);
1116 let copts = opts.into();
1117
1118 orchestrate_retries(
1119 self.retry_manager.clone(),
1120 opts.retry_strategy.clone(),
1121 retry_info,
1122 async || {
1123 self.http_component
1124 .orchestrate_endpoint(
1125 None,
1126 async |client: Arc<C>,
1127 endpoint_id: String,
1128 endpoint: String,
1129 canonical_endpoint: String,
1130 auth: Auth| {
1131 mgmtx::mgmt::Management::<C> {
1132 http_client: client,
1133 user_agent: self.http_component.user_agent().to_string(),
1134 endpoint,
1135 canonical_endpoint,
1136 auth,
1137 tracing: Default::default(),
1138 }
1139 .get_full_bucket_config(&copts)
1140 .await
1141 .map_err(|e| ErrorKind::Mgmt(e).into())
1142 },
1143 )
1144 .await
1145 },
1146 )
1147 .await
1148 }
1149
1150 pub async fn load_sample_bucket(
1151 &self,
1152 opts: &LoadSampleBucketOptions<'_>,
1153 ) -> error::Result<()> {
1154 let retry_info = RetryRequest::new("load_sample_bucket", false);
1155 let copts = opts.into();
1156
1157 orchestrate_retries(
1158 self.retry_manager.clone(),
1159 opts.retry_strategy.clone(),
1160 retry_info,
1161 async || {
1162 self.http_component
1163 .orchestrate_endpoint(
1164 None,
1165 async |client: Arc<C>,
1166 endpoint_id: String,
1167 endpoint: String,
1168 canonical_endpoint: String,
1169 auth: Auth| {
1170 mgmtx::mgmt::Management::<C> {
1171 http_client: client,
1172 user_agent: self.http_component.user_agent().to_string(),
1173 endpoint,
1174 canonical_endpoint,
1175 auth,
1176 tracing: Default::default(),
1177 }
1178 .load_sample_bucket(&copts)
1179 .await
1180 .map_err(|e| ErrorKind::Mgmt(e).into())
1181 },
1182 )
1183 .await
1184 },
1185 )
1186 .await
1187 }
1188
1189 pub async fn index_status(&self, opts: &IndexStatusOptions<'_>) -> error::Result<IndexStatus> {
1190 let retry_info = RetryRequest::new("index_status", false);
1191 let copts = opts.into();
1192
1193 orchestrate_retries(
1194 self.retry_manager.clone(),
1195 opts.retry_strategy.clone(),
1196 retry_info,
1197 async || {
1198 self.http_component
1199 .orchestrate_endpoint(
1200 None,
1201 async |client: Arc<C>,
1202 endpoint_id: String,
1203 endpoint: String,
1204 canonical_endpoint: String,
1205 auth: Auth| {
1206 mgmtx::mgmt::Management::<C> {
1207 http_client: client,
1208 user_agent: self.http_component.user_agent().to_string(),
1209 endpoint,
1210 canonical_endpoint,
1211 auth,
1212 tracing: Default::default(),
1213 }
1214 .index_status(&copts)
1215 .await
1216 .map_err(|e| ErrorKind::Mgmt(e).into())
1217 },
1218 )
1219 .await
1220 },
1221 )
1222 .await
1223 }
1224
1225 pub async fn get_auto_failover_settings(
1226 &self,
1227 opts: &GetAutoFailoverSettingsOptions<'_>,
1228 ) -> error::Result<AutoFailoverSettings> {
1229 let retry_info = RetryRequest::new("get_auto_failover_settings", false);
1230 let copts = opts.into();
1231
1232 orchestrate_retries(
1233 self.retry_manager.clone(),
1234 opts.retry_strategy.clone(),
1235 retry_info,
1236 async || {
1237 self.http_component
1238 .orchestrate_endpoint(
1239 None,
1240 async |client: Arc<C>,
1241 endpoint_id: String,
1242 endpoint: String,
1243 canonical_endpoint: String,
1244 auth: Auth| {
1245 mgmtx::mgmt::Management::<C> {
1246 http_client: client,
1247 user_agent: self.http_component.user_agent().to_string(),
1248 endpoint,
1249 canonical_endpoint,
1250 auth,
1251 tracing: Default::default(),
1252 }
1253 .get_auto_failover_settings(&copts)
1254 .await
1255 .map_err(|e| ErrorKind::Mgmt(e).into())
1256 },
1257 )
1258 .await
1259 },
1260 )
1261 .await
1262 }
1263
1264 pub async fn get_bucket_stats(
1265 &self,
1266 opts: &GetBucketStatsOptions<'_>,
1267 ) -> error::Result<Box<RawValue>> {
1268 let retry_info = RetryRequest::new("get_bucket_stats", false);
1269 let retry = opts.retry_strategy.clone();
1270 let copts = opts.into();
1271
1272 orchestrate_retries(self.retry_manager.clone(), retry, retry_info, async || {
1273 self.http_component
1274 .orchestrate_endpoint(
1275 None,
1276 async |client: Arc<C>,
1277 endpoint_id: String,
1278 endpoint: String,
1279 canonical_endpoint: String,
1280 auth: Auth| {
1281 mgmtx::mgmt::Management::<C> {
1282 http_client: client,
1283 user_agent: self.http_component.user_agent().to_string(),
1284 endpoint,
1285 canonical_endpoint,
1286 auth,
1287 tracing: Default::default(),
1288 }
1289 .get_bucket_stats(&copts)
1290 .await
1291 .map_err(|e| ErrorKind::Mgmt(e).into())
1292 },
1293 )
1294 .await
1295 })
1296 .await
1297 }
1298}