Skip to main content

couchbase_core/
mgmtcomponent.rs

1/*
2 *
3 *  * Copyright (c) 2025 Couchbase, Inc.
4 *  *
5 *  * Licensed under the Apache License, Version 2.0 (the "License");
6 *  * you may not use this file except in compliance with the License.
7 *  * You may obtain a copy of the License at
8 *  *
9 *  *    http://www.apache.org/licenses/LICENSE-2.0
10 *  *
11 *  * Unless required by applicable law or agreed to in writing, software
12 *  * distributed under the License is distributed on an "AS IS" BASIS,
13 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  * See the License for the specific language governing permissions and
15 *  * limitations under the License.
16 *
17 */
18
19use crate::authenticator::Authenticator;
20use crate::cbconfig::{CollectionManifest, FullBucketConfig, FullClusterConfig};
21use crate::componentconfigs::NetworkAndCanonicalEndpoint;
22use crate::error::ErrorKind;
23use crate::httpcomponent::{HttpComponent, HttpComponentState};
24use crate::httpx::client::Client;
25use crate::httpx::request::Auth;
26use crate::mgmtx::bucket_helper::EnsureBucketHelper;
27use crate::mgmtx::bucket_settings::BucketDef;
28use crate::mgmtx::group_helper::EnsureGroupHelper;
29use crate::mgmtx::manifest_helper::EnsureManifestHelper;
30use crate::mgmtx::mgmt::AutoFailoverSettings;
31use crate::mgmtx::mgmt_query::IndexStatus;
32use crate::mgmtx::node_target::NodeTarget;
33use crate::mgmtx::options::{
34    EnsureBucketPollOptions, EnsureGroupPollOptions, EnsureManifestPollOptions,
35    EnsureUserPollOptions,
36};
37use crate::mgmtx::responses::{
38    CreateCollectionResponse, CreateScopeResponse, DeleteCollectionResponse, DeleteScopeResponse,
39    UpdateCollectionResponse,
40};
41use crate::mgmtx::user::{Group, RoleAndDescription, UserAndMetadata};
42use crate::mgmtx::user_helper::EnsureUserHelper;
43use crate::options::management::{
44    ChangePasswordOptions, CreateBucketOptions, CreateCollectionOptions, CreateScopeOptions,
45    DeleteBucketOptions, DeleteCollectionOptions, DeleteGroupOptions, DeleteScopeOptions,
46    DeleteUserOptions, EnsureBucketOptions, EnsureGroupOptions, EnsureManifestOptions,
47    EnsureUserOptions, FlushBucketOptions, GetAllBucketsOptions, GetAllGroupsOptions,
48    GetAllUsersOptions, GetAutoFailoverSettingsOptions, GetBucketOptions, GetBucketStatsOptions,
49    GetCollectionManifestOptions, GetFullBucketConfigOptions, GetFullClusterConfigOptions,
50    GetGroupOptions, GetRolesOptions, GetUserOptions, IndexStatusOptions, LoadSampleBucketOptions,
51    UpdateBucketOptions, UpdateCollectionOptions, UpsertGroupOptions, UpsertUserOptions,
52};
53use crate::retry::{orchestrate_retries, RetryManager, RetryRequest};
54use crate::retrybesteffort::ExponentialBackoffCalculator;
55use crate::service_type::ServiceType;
56use crate::tracingcomponent::TracingComponent;
57use crate::{error, mgmtx};
58use serde_json::value::RawValue;
59use std::collections::HashMap;
60use std::sync::Arc;
61use std::time::Duration;
62use tracing::debug;
63
64pub(crate) struct MgmtComponent<C: Client> {
65    id: String,
66    http_component: HttpComponent<C>,
67    tracing: Arc<TracingComponent>,
68
69    retry_manager: Arc<RetryManager>,
70}
71
72pub(crate) struct MgmtComponentConfig {
73    pub endpoints: HashMap<String, NetworkAndCanonicalEndpoint>,
74    pub authenticator: Authenticator,
75}
76
/// Construction-time options for a `MgmtComponent`.
pub(crate) struct MgmtComponentOptions {
    /// Identifier stored on the component and used in its log output.
    pub id: String,
    /// User-agent string passed to the underlying `HttpComponent`.
    pub user_agent: String,
}
81
82impl<C: Client> MgmtComponent<C> {
83    pub fn new(
84        retry_manager: Arc<RetryManager>,
85        http_client: Arc<C>,
86        tracing: Arc<TracingComponent>,
87        config: MgmtComponentConfig,
88        opts: MgmtComponentOptions,
89    ) -> Self {
90        Self {
91            id: opts.id,
92            http_component: HttpComponent::new(
93                ServiceType::MGMT,
94                opts.user_agent,
95                http_client,
96                HttpComponentState::new(config.endpoints, config.authenticator),
97            ),
98            tracing,
99            retry_manager,
100        }
101    }
102
103    pub fn reconfigure(&self, config: MgmtComponentConfig) {
104        debug!(
105            "Management component {} updating endpoints to {:?}",
106            self.id,
107            &config.endpoints.keys().collect::<Vec<_>>()
108        );
109
110        self.http_component.reconfigure(HttpComponentState::new(
111            config.endpoints,
112            config.authenticator,
113        ))
114    }
115
116    pub async fn get_collection_manifest(
117        &self,
118        opts: &GetCollectionManifestOptions<'_>,
119    ) -> error::Result<CollectionManifest> {
120        let retry_info = RetryRequest::new("get_collection_manifest", false);
121
122        let copts = opts.into();
123
124        orchestrate_retries(
125            self.retry_manager.clone(),
126            opts.retry_strategy.clone(),
127            retry_info,
128            async || {
129                self.http_component
130                    .orchestrate_endpoint(
131                        None,
132                        async |client: Arc<C>,
133                               endpoint_id: String,
134                               endpoint: String,
135                               canonical_endpoint: String,
136                               auth: Auth| {
137                            let res = match (mgmtx::mgmt::Management::<C> {
138                                http_client: client,
139                                user_agent: self.http_component.user_agent().to_string(),
140                                endpoint,
141                                canonical_endpoint,
142                                auth,
143                                tracing: self.tracing.clone(),
144                            }
145                            .get_collection_manifest(&copts)
146                            .await)
147                            {
148                                Ok(r) => r,
149                                Err(e) => return Err(ErrorKind::Mgmt(e).into()),
150                            };
151
152                            Ok(res)
153                        },
154                    )
155                    .await
156            },
157        )
158        .await
159    }
160
161    pub async fn create_scope(
162        &self,
163        opts: &CreateScopeOptions<'_>,
164    ) -> error::Result<CreateScopeResponse> {
165        let retry_info = RetryRequest::new("create_scope", false);
166
167        let copts = opts.into();
168
169        orchestrate_retries(
170            self.retry_manager.clone(),
171            opts.retry_strategy.clone(),
172            retry_info,
173            async || {
174                self.http_component
175                    .orchestrate_endpoint(
176                        None,
177                        async |client: Arc<C>,
178                               endpoint_id: String,
179                               endpoint: String,
180                               canonical_endpoint: String,
181                               auth: Auth| {
182                            let res = match (mgmtx::mgmt::Management::<C> {
183                                http_client: client,
184                                user_agent: self.http_component.user_agent().to_string(),
185                                endpoint,
186                                canonical_endpoint,
187                                auth,
188                                tracing: self.tracing.clone(),
189                            }
190                            .create_scope(&copts)
191                            .await)
192                            {
193                                Ok(r) => r,
194                                Err(e) => return Err(ErrorKind::Mgmt(e).into()),
195                            };
196
197                            Ok(res)
198                        },
199                    )
200                    .await
201            },
202        )
203        .await
204    }
205
206    pub async fn delete_scope(
207        &self,
208        opts: &DeleteScopeOptions<'_>,
209    ) -> error::Result<DeleteScopeResponse> {
210        let retry_info = RetryRequest::new("delete_scope", false);
211
212        let copts = opts.into();
213
214        orchestrate_retries(
215            self.retry_manager.clone(),
216            opts.retry_strategy.clone(),
217            retry_info,
218            async || {
219                self.http_component
220                    .orchestrate_endpoint(
221                        None,
222                        async |client: Arc<C>,
223                               endpoint_id: String,
224                               endpoint: String,
225                               canonical_endpoint: String,
226                               auth: Auth| {
227                            let res = match (mgmtx::mgmt::Management::<C> {
228                                http_client: client,
229                                user_agent: self.http_component.user_agent().to_string(),
230                                endpoint,
231                                canonical_endpoint,
232                                auth,
233                                tracing: self.tracing.clone(),
234                            }
235                            .delete_scope(&copts)
236                            .await)
237                            {
238                                Ok(r) => r,
239                                Err(e) => return Err(ErrorKind::Mgmt(e).into()),
240                            };
241
242                            Ok(res)
243                        },
244                    )
245                    .await
246            },
247        )
248        .await
249    }
250
251    pub async fn create_collection(
252        &self,
253        opts: &CreateCollectionOptions<'_>,
254    ) -> error::Result<CreateCollectionResponse> {
255        let retry_info = RetryRequest::new("create_collection", false);
256
257        let copts = opts.into();
258
259        orchestrate_retries(
260            self.retry_manager.clone(),
261            opts.retry_strategy.clone(),
262            retry_info,
263            async || {
264                self.http_component
265                    .orchestrate_endpoint(
266                        None,
267                        async |client: Arc<C>,
268                               endpoint_id: String,
269                               endpoint: String,
270                               canonical_endpoint: String,
271                               auth: Auth| {
272                            let res = match (mgmtx::mgmt::Management::<C> {
273                                http_client: client,
274                                user_agent: self.http_component.user_agent().to_string(),
275                                endpoint,
276                                canonical_endpoint,
277                                auth,
278                                tracing: self.tracing.clone(),
279                            }
280                            .create_collection(&copts)
281                            .await)
282                            {
283                                Ok(r) => r,
284                                Err(e) => return Err(ErrorKind::Mgmt(e).into()),
285                            };
286
287                            Ok(res)
288                        },
289                    )
290                    .await
291            },
292        )
293        .await
294    }
295
296    pub async fn delete_collection(
297        &self,
298        opts: &DeleteCollectionOptions<'_>,
299    ) -> error::Result<DeleteCollectionResponse> {
300        let retry_info = RetryRequest::new("delete_collection", false);
301
302        let copts = opts.into();
303
304        orchestrate_retries(
305            self.retry_manager.clone(),
306            opts.retry_strategy.clone(),
307            retry_info,
308            async || {
309                self.http_component
310                    .orchestrate_endpoint(
311                        None,
312                        async |client: Arc<C>,
313                               endpoint_id: String,
314                               endpoint: String,
315                               canonical_endpoint: String,
316                               auth: Auth| {
317                            let res = match (mgmtx::mgmt::Management::<C> {
318                                http_client: client,
319                                user_agent: self.http_component.user_agent().to_string(),
320                                endpoint,
321                                canonical_endpoint,
322                                auth,
323                                tracing: self.tracing.clone(),
324                            }
325                            .delete_collection(&copts)
326                            .await)
327                            {
328                                Ok(r) => r,
329                                Err(e) => return Err(ErrorKind::Mgmt(e).into()),
330                            };
331
332                            Ok(res)
333                        },
334                    )
335                    .await
336            },
337        )
338        .await
339    }
340
341    pub async fn update_collection(
342        &self,
343        opts: &UpdateCollectionOptions<'_>,
344    ) -> error::Result<UpdateCollectionResponse> {
345        let retry_info = RetryRequest::new("update_collection", false);
346
347        let copts = opts.into();
348
349        orchestrate_retries(
350            self.retry_manager.clone(),
351            opts.retry_strategy.clone(),
352            retry_info,
353            async || {
354                self.http_component
355                    .orchestrate_endpoint(
356                        None,
357                        async |client: Arc<C>,
358                               endpoint_id: String,
359                               endpoint: String,
360                               canonical_endpoint: String,
361                               auth: Auth| {
362                            let res = match (mgmtx::mgmt::Management::<C> {
363                                http_client: client,
364                                user_agent: self.http_component.user_agent().to_string(),
365                                endpoint,
366                                canonical_endpoint,
367                                auth,
368                                tracing: self.tracing.clone(),
369                            }
370                            .update_collection(&copts)
371                            .await)
372                            {
373                                Ok(r) => r,
374                                Err(e) => return Err(ErrorKind::Mgmt(e).into()),
375                            };
376
377                            Ok(res)
378                        },
379                    )
380                    .await
381            },
382        )
383        .await
384    }
385
386    pub async fn get_all_buckets(
387        &self,
388        opts: &GetAllBucketsOptions<'_>,
389    ) -> error::Result<Vec<BucketDef>> {
390        let retry_info = RetryRequest::new("get_all_buckets", false);
391
392        let copts = opts.into();
393
394        orchestrate_retries(
395            self.retry_manager.clone(),
396            opts.retry_strategy.clone(),
397            retry_info,
398            async || {
399                self.http_component
400                    .orchestrate_endpoint(
401                        None,
402                        async |client: Arc<C>,
403                               endpoint_id: String,
404                               endpoint: String,
405                               canonical_endpoint: String,
406                               auth: Auth| {
407                            mgmtx::mgmt::Management::<C> {
408                                http_client: client,
409                                user_agent: self.http_component.user_agent().to_string(),
410                                endpoint,
411                                canonical_endpoint,
412                                auth,
413                                tracing: self.tracing.clone(),
414                            }
415                            .get_all_buckets(&copts)
416                            .await
417                            .map_err(|e| ErrorKind::Mgmt(e).into())
418                        },
419                    )
420                    .await
421            },
422        )
423        .await
424    }
425
426    pub async fn get_bucket(&self, opts: &GetBucketOptions<'_>) -> error::Result<BucketDef> {
427        let retry_info = RetryRequest::new("get_bucket", false);
428
429        let copts = opts.into();
430
431        orchestrate_retries(
432            self.retry_manager.clone(),
433            opts.retry_strategy.clone(),
434            retry_info,
435            async || {
436                self.http_component
437                    .orchestrate_endpoint(
438                        None,
439                        async |client: Arc<C>,
440                               endpoint_id: String,
441                               endpoint: String,
442                               canonical_endpoint: String,
443                               auth: Auth| {
444                            mgmtx::mgmt::Management::<C> {
445                                http_client: client,
446                                user_agent: self.http_component.user_agent().to_string(),
447                                endpoint,
448                                canonical_endpoint,
449                                auth,
450                                tracing: self.tracing.clone(),
451                            }
452                            .get_bucket(&copts)
453                            .await
454                            .map_err(|e| ErrorKind::Mgmt(e).into())
455                        },
456                    )
457                    .await
458            },
459        )
460        .await
461    }
462
463    pub async fn create_bucket(&self, opts: &CreateBucketOptions<'_>) -> error::Result<()> {
464        let retry_info = RetryRequest::new("create_bucket", false);
465
466        let copts = opts.into();
467
468        orchestrate_retries(
469            self.retry_manager.clone(),
470            opts.retry_strategy.clone(),
471            retry_info,
472            async || {
473                self.http_component
474                    .orchestrate_endpoint(
475                        None,
476                        async |client: Arc<C>,
477                               endpoint_id: String,
478                               endpoint: String,
479                               canonical_endpoint: String,
480                               auth: Auth| {
481                            mgmtx::mgmt::Management::<C> {
482                                http_client: client,
483                                user_agent: self.http_component.user_agent().to_string(),
484                                endpoint,
485                                canonical_endpoint,
486                                auth,
487                                tracing: self.tracing.clone(),
488                            }
489                            .create_bucket(&copts)
490                            .await
491                            .map_err(|e| ErrorKind::Mgmt(e).into())
492                        },
493                    )
494                    .await
495            },
496        )
497        .await
498    }
499
500    pub async fn update_bucket(&self, opts: &UpdateBucketOptions<'_>) -> error::Result<()> {
501        let retry_info = RetryRequest::new("update_bucket", false);
502
503        let copts = opts.into();
504
505        orchestrate_retries(
506            self.retry_manager.clone(),
507            opts.retry_strategy.clone(),
508            retry_info,
509            async || {
510                self.http_component
511                    .orchestrate_endpoint(
512                        None,
513                        async |client: Arc<C>,
514                               endpoint_id: String,
515                               endpoint: String,
516                               canonical_endpoint: String,
517                               auth: Auth| {
518                            mgmtx::mgmt::Management::<C> {
519                                http_client: client,
520                                user_agent: self.http_component.user_agent().to_string(),
521                                endpoint,
522                                canonical_endpoint,
523                                auth,
524                                tracing: self.tracing.clone(),
525                            }
526                            .update_bucket(&copts)
527                            .await
528                            .map_err(|e| ErrorKind::Mgmt(e).into())
529                        },
530                    )
531                    .await
532            },
533        )
534        .await
535    }
536
537    pub async fn delete_bucket(&self, opts: &DeleteBucketOptions<'_>) -> error::Result<()> {
538        let retry_info = RetryRequest::new("delete_bucket", false);
539
540        let copts = opts.into();
541
542        orchestrate_retries(
543            self.retry_manager.clone(),
544            opts.retry_strategy.clone(),
545            retry_info,
546            async || {
547                self.http_component
548                    .orchestrate_endpoint(
549                        None,
550                        async |client: Arc<C>,
551                               endpoint_id: String,
552                               endpoint: String,
553                               canonical_endpoint: String,
554                               auth: Auth| {
555                            mgmtx::mgmt::Management::<C> {
556                                http_client: client,
557                                user_agent: self.http_component.user_agent().to_string(),
558                                endpoint,
559                                canonical_endpoint,
560                                auth,
561                                tracing: self.tracing.clone(),
562                            }
563                            .delete_bucket(&copts)
564                            .await
565                            .map_err(|e| ErrorKind::Mgmt(e).into())
566                        },
567                    )
568                    .await
569            },
570        )
571        .await
572    }
573
574    pub async fn flush_bucket(&self, opts: &FlushBucketOptions<'_>) -> error::Result<()> {
575        let retry_info = RetryRequest::new("flush_bucket", false);
576
577        let copts = opts.into();
578
579        orchestrate_retries(
580            self.retry_manager.clone(),
581            opts.retry_strategy.clone(),
582            retry_info,
583            async || {
584                self.http_component
585                    .orchestrate_endpoint(
586                        None,
587                        async |client: Arc<C>,
588                               endpoint_id: String,
589                               endpoint: String,
590                               canonical_endpoint: String,
591                               auth: Auth| {
592                            mgmtx::mgmt::Management::<C> {
593                                http_client: client,
594                                user_agent: self.http_component.user_agent().to_string(),
595                                endpoint,
596                                canonical_endpoint,
597                                auth,
598                                tracing: self.tracing.clone(),
599                            }
600                            .flush_bucket(&copts)
601                            .await
602                            .map_err(|e| ErrorKind::Mgmt(e).into())
603                        },
604                    )
605                    .await
606            },
607        )
608        .await
609    }
610
611    pub async fn ensure_manifest(&self, opts: &EnsureManifestOptions<'_>) -> error::Result<()> {
612        let mut helper = EnsureManifestHelper::new(
613            self.http_component.user_agent(),
614            opts.bucket_name,
615            opts.manifest_uid,
616            opts.on_behalf_of_info,
617        );
618
619        let backoff = ExponentialBackoffCalculator::new(
620            Duration::from_millis(100),
621            Duration::from_millis(1000),
622            1.5,
623        );
624
625        self.http_component
626            .ensure_resource(backoff, async |client: Arc<C>, targets: Vec<NodeTarget>| {
627                helper
628                    .clone()
629                    .poll(&EnsureManifestPollOptions { client, targets })
630                    .await
631                    .map_err(error::Error::from)
632            })
633            .await
634    }
635
636    pub async fn ensure_bucket(&self, opts: &EnsureBucketOptions<'_>) -> error::Result<()> {
637        let mut helper = EnsureBucketHelper::new(
638            self.http_component.user_agent(),
639            opts.bucket_name,
640            opts.bucket_uuid,
641            opts.want_missing,
642            opts.on_behalf_of_info,
643        );
644
645        let backoff = ExponentialBackoffCalculator::new(
646            Duration::from_millis(100),
647            Duration::from_millis(1000),
648            1.5,
649        );
650
651        self.http_component
652            .ensure_resource(backoff, async |client: Arc<C>, targets: Vec<NodeTarget>| {
653                helper
654                    .clone()
655                    .poll(&EnsureBucketPollOptions { client, targets })
656                    .await
657                    .map_err(error::Error::from)
658            })
659            .await
660    }
661
662    pub async fn get_user(&self, opts: &GetUserOptions<'_>) -> error::Result<UserAndMetadata> {
663        let retry_info = RetryRequest::new("get_user", false);
664        let copts = opts.into();
665
666        orchestrate_retries(
667            self.retry_manager.clone(),
668            opts.retry_strategy.clone(),
669            retry_info,
670            async || {
671                self.http_component
672                    .orchestrate_endpoint(
673                        None,
674                        async |client: Arc<C>,
675                               endpoint_id: String,
676                               endpoint: String,
677                               canonical_endpoint: String,
678                               auth: Auth| {
679                            mgmtx::mgmt::Management::<C> {
680                                http_client: client,
681                                user_agent: self.http_component.user_agent().to_string(),
682                                endpoint,
683                                canonical_endpoint,
684                                auth,
685                                tracing: self.tracing.clone(),
686                            }
687                            .get_user(&copts)
688                            .await
689                            .map_err(|e| ErrorKind::Mgmt(e).into())
690                        },
691                    )
692                    .await
693            },
694        )
695        .await
696    }
697
698    pub async fn get_all_users(
699        &self,
700        opts: &GetAllUsersOptions<'_>,
701    ) -> error::Result<Vec<UserAndMetadata>> {
702        let retry_info = RetryRequest::new("get_all_users", false);
703        let copts = opts.into();
704
705        orchestrate_retries(
706            self.retry_manager.clone(),
707            opts.retry_strategy.clone(),
708            retry_info,
709            async || {
710                self.http_component
711                    .orchestrate_endpoint(
712                        None,
713                        async |client: Arc<C>,
714                               endpoint_id: String,
715                               endpoint: String,
716                               canonical_endpoint: String,
717                               auth: Auth| {
718                            mgmtx::mgmt::Management::<C> {
719                                http_client: client,
720                                user_agent: self.http_component.user_agent().to_string(),
721                                endpoint,
722                                canonical_endpoint,
723                                auth,
724                                tracing: self.tracing.clone(),
725                            }
726                            .get_all_users(&copts)
727                            .await
728                            .map_err(|e| ErrorKind::Mgmt(e).into())
729                        },
730                    )
731                    .await
732            },
733        )
734        .await
735    }
736
737    pub async fn upsert_user(&self, opts: &UpsertUserOptions<'_>) -> error::Result<()> {
738        let retry_info = RetryRequest::new("upsert_user", false);
739        let copts = opts.into();
740
741        orchestrate_retries(
742            self.retry_manager.clone(),
743            opts.retry_strategy.clone(),
744            retry_info,
745            async || {
746                self.http_component
747                    .orchestrate_endpoint(
748                        None,
749                        async |client: Arc<C>,
750                               endpoint_id: String,
751                               endpoint: String,
752                               canonical_endpoint: String,
753                               auth: Auth| {
754                            mgmtx::mgmt::Management::<C> {
755                                http_client: client,
756                                user_agent: self.http_component.user_agent().to_string(),
757                                endpoint,
758                                canonical_endpoint,
759                                auth,
760                                tracing: self.tracing.clone(),
761                            }
762                            .upsert_user(&copts)
763                            .await
764                            .map_err(|e| ErrorKind::Mgmt(e).into())
765                        },
766                    )
767                    .await
768            },
769        )
770        .await
771    }
772
773    pub async fn delete_user(&self, opts: &DeleteUserOptions<'_>) -> error::Result<()> {
774        let retry_info = RetryRequest::new("delete_user", false);
775        let copts = opts.into();
776
777        orchestrate_retries(
778            self.retry_manager.clone(),
779            opts.retry_strategy.clone(),
780            retry_info,
781            async || {
782                self.http_component
783                    .orchestrate_endpoint(
784                        None,
785                        async |client: Arc<C>,
786                               endpoint_id: String,
787                               endpoint: String,
788                               canonical_endpoint: String,
789                               auth: Auth| {
790                            mgmtx::mgmt::Management::<C> {
791                                http_client: client,
792                                user_agent: self.http_component.user_agent().to_string(),
793                                endpoint,
794                                canonical_endpoint,
795                                auth,
796                                tracing: self.tracing.clone(),
797                            }
798                            .delete_user(&copts)
799                            .await
800                            .map_err(|e| ErrorKind::Mgmt(e).into())
801                        },
802                    )
803                    .await
804            },
805        )
806        .await
807    }
808
809    pub async fn get_roles(
810        &self,
811        opts: &GetRolesOptions<'_>,
812    ) -> error::Result<Vec<RoleAndDescription>> {
813        let retry_info = RetryRequest::new("get_roles", false);
814        let copts = opts.into();
815
816        orchestrate_retries(
817            self.retry_manager.clone(),
818            opts.retry_strategy.clone(),
819            retry_info,
820            async || {
821                self.http_component
822                    .orchestrate_endpoint(
823                        None,
824                        async |client: Arc<C>,
825                               endpoint_id: String,
826                               endpoint: String,
827                               canonical_endpoint: String,
828                               auth: Auth| {
829                            mgmtx::mgmt::Management::<C> {
830                                http_client: client,
831                                user_agent: self.http_component.user_agent().to_string(),
832                                endpoint,
833                                canonical_endpoint,
834                                auth,
835                                tracing: self.tracing.clone(),
836                            }
837                            .get_roles(&copts)
838                            .await
839                            .map_err(|e| ErrorKind::Mgmt(e).into())
840                        },
841                    )
842                    .await
843            },
844        )
845        .await
846    }
847
848    pub async fn get_group(&self, opts: &GetGroupOptions<'_>) -> error::Result<Group> {
849        let retry_info = RetryRequest::new("get_group", false);
850        let copts = opts.into();
851
852        orchestrate_retries(
853            self.retry_manager.clone(),
854            opts.retry_strategy.clone(),
855            retry_info,
856            async || {
857                self.http_component
858                    .orchestrate_endpoint(
859                        None,
860                        async |client: Arc<C>,
861                               endpoint_id: String,
862                               endpoint: String,
863                               canonical_endpoint: String,
864                               auth: Auth| {
865                            mgmtx::mgmt::Management::<C> {
866                                http_client: client,
867                                user_agent: self.http_component.user_agent().to_string(),
868                                endpoint,
869                                canonical_endpoint,
870                                auth,
871                                tracing: self.tracing.clone(),
872                            }
873                            .get_group(&copts)
874                            .await
875                            .map_err(|e| ErrorKind::Mgmt(e).into())
876                        },
877                    )
878                    .await
879            },
880        )
881        .await
882    }
883
884    pub async fn get_all_groups(
885        &self,
886        opts: &GetAllGroupsOptions<'_>,
887    ) -> error::Result<Vec<Group>> {
888        let retry_info = RetryRequest::new("get_all_groups", false);
889        let copts = opts.into();
890
891        orchestrate_retries(
892            self.retry_manager.clone(),
893            opts.retry_strategy.clone(),
894            retry_info,
895            async || {
896                self.http_component
897                    .orchestrate_endpoint(
898                        None,
899                        async |client: Arc<C>,
900                               endpoint_id: String,
901                               endpoint: String,
902                               canonical_endpoint: String,
903                               auth: Auth| {
904                            mgmtx::mgmt::Management::<C> {
905                                http_client: client,
906                                user_agent: self.http_component.user_agent().to_string(),
907                                endpoint,
908                                canonical_endpoint,
909                                auth,
910                                tracing: self.tracing.clone(),
911                            }
912                            .get_all_groups(&copts)
913                            .await
914                            .map_err(|e| ErrorKind::Mgmt(e).into())
915                        },
916                    )
917                    .await
918            },
919        )
920        .await
921    }
922
923    pub async fn upsert_group(&self, opts: &UpsertGroupOptions<'_>) -> error::Result<()> {
924        let retry_info = RetryRequest::new("upsert_group", false);
925        let copts = opts.into();
926
927        orchestrate_retries(
928            self.retry_manager.clone(),
929            opts.retry_strategy.clone(),
930            retry_info,
931            async || {
932                self.http_component
933                    .orchestrate_endpoint(
934                        None,
935                        async |client: Arc<C>,
936                               endpoint_id: String,
937                               endpoint: String,
938                               canonical_endpoint: String,
939                               auth: Auth| {
940                            mgmtx::mgmt::Management::<C> {
941                                http_client: client,
942                                user_agent: self.http_component.user_agent().to_string(),
943                                endpoint,
944                                canonical_endpoint,
945                                auth,
946                                tracing: self.tracing.clone(),
947                            }
948                            .upsert_group(&copts)
949                            .await
950                            .map_err(|e| ErrorKind::Mgmt(e).into())
951                        },
952                    )
953                    .await
954            },
955        )
956        .await
957    }
958
959    pub async fn delete_group(&self, opts: &DeleteGroupOptions<'_>) -> error::Result<()> {
960        let retry_info = RetryRequest::new("delete_group", false);
961        let copts = opts.into();
962
963        orchestrate_retries(
964            self.retry_manager.clone(),
965            opts.retry_strategy.clone(),
966            retry_info,
967            async || {
968                self.http_component
969                    .orchestrate_endpoint(
970                        None,
971                        async |client: Arc<C>,
972                               endpoint_id: String,
973                               endpoint: String,
974                               canonical_endpoint: String,
975                               auth: Auth| {
976                            mgmtx::mgmt::Management::<C> {
977                                http_client: client,
978                                user_agent: self.http_component.user_agent().to_string(),
979                                endpoint,
980                                canonical_endpoint,
981                                auth,
982                                tracing: self.tracing.clone(),
983                            }
984                            .delete_group(&copts)
985                            .await
986                            .map_err(|e| ErrorKind::Mgmt(e).into())
987                        },
988                    )
989                    .await
990            },
991        )
992        .await
993    }
994
995    pub async fn change_password(&self, opts: &ChangePasswordOptions<'_>) -> error::Result<()> {
996        let retry_info = RetryRequest::new("change_password", false);
997        let copts = opts.into();
998
999        orchestrate_retries(
1000            self.retry_manager.clone(),
1001            opts.retry_strategy.clone(),
1002            retry_info,
1003            async || {
1004                self.http_component
1005                    .orchestrate_endpoint(
1006                        None,
1007                        async |client: Arc<C>,
1008                               endpoint_id: String,
1009                               endpoint: String,
1010                               canonical_endpoint: String,
1011                               auth: Auth| {
1012                            mgmtx::mgmt::Management::<C> {
1013                                http_client: client,
1014                                user_agent: self.http_component.user_agent().to_string(),
1015                                endpoint,
1016                                canonical_endpoint,
1017                                auth,
1018                                tracing: self.tracing.clone(),
1019                            }
1020                            .change_password(&copts)
1021                            .await
1022                            .map_err(|e| ErrorKind::Mgmt(e).into())
1023                        },
1024                    )
1025                    .await
1026            },
1027        )
1028        .await
1029    }
1030
1031    pub async fn ensure_user(&self, opts: &EnsureUserOptions<'_>) -> error::Result<()> {
1032        let mut helper = EnsureUserHelper::new(
1033            self.http_component.user_agent(),
1034            opts.username,
1035            opts.auth_domain,
1036            opts.want_missing,
1037            opts.on_behalf_of_info,
1038        );
1039
1040        let backoff = ExponentialBackoffCalculator::new(
1041            Duration::from_millis(100),
1042            Duration::from_millis(1000),
1043            1.5,
1044        );
1045
1046        self.http_component
1047            .ensure_resource(backoff, async |client: Arc<C>, targets: Vec<NodeTarget>| {
1048                helper
1049                    .clone()
1050                    .poll(&EnsureUserPollOptions { client, targets })
1051                    .await
1052                    .map_err(error::Error::from)
1053            })
1054            .await
1055    }
1056
1057    pub async fn ensure_group(&self, opts: &EnsureGroupOptions<'_>) -> error::Result<()> {
1058        let mut helper = EnsureGroupHelper::new(
1059            self.http_component.user_agent(),
1060            opts.group_name,
1061            opts.want_missing,
1062            opts.on_behalf_of_info,
1063        );
1064
1065        let backoff = ExponentialBackoffCalculator::new(
1066            Duration::from_millis(100),
1067            Duration::from_millis(1000),
1068            1.5,
1069        );
1070
1071        self.http_component
1072            .ensure_resource(backoff, async |client: Arc<C>, targets: Vec<NodeTarget>| {
1073                helper
1074                    .clone()
1075                    .poll(&EnsureGroupPollOptions { client, targets })
1076                    .await
1077                    .map_err(error::Error::from)
1078            })
1079            .await
1080    }
1081
1082    pub async fn get_full_cluster_config(
1083        &self,
1084        opts: &GetFullClusterConfigOptions<'_>,
1085    ) -> error::Result<FullClusterConfig> {
1086        let retry_info = RetryRequest::new("get_full_cluster_config", false);
1087        let copts = opts.into();
1088
1089        orchestrate_retries(
1090            self.retry_manager.clone(),
1091            opts.retry_strategy.clone(),
1092            retry_info,
1093            async || {
1094                self.http_component
1095                    .orchestrate_endpoint(
1096                        None,
1097                        async |client: Arc<C>,
1098                               endpoint_id: String,
1099                               endpoint: String,
1100                               canonical_endpoint: String,
1101                               auth: Auth| {
1102                            mgmtx::mgmt::Management::<C> {
1103                                http_client: client,
1104                                user_agent: self.http_component.user_agent().to_string(),
1105                                endpoint,
1106                                canonical_endpoint,
1107                                auth,
1108                                tracing: Default::default(),
1109                            }
1110                            .get_full_cluster_config(&copts)
1111                            .await
1112                            .map_err(|e| ErrorKind::Mgmt(e).into())
1113                        },
1114                    )
1115                    .await
1116            },
1117        )
1118        .await
1119    }
1120
1121    pub async fn get_full_bucket_config(
1122        &self,
1123        opts: &GetFullBucketConfigOptions<'_>,
1124    ) -> error::Result<FullBucketConfig> {
1125        let retry_info = RetryRequest::new("get_full_bucket_config", false);
1126        let copts = opts.into();
1127
1128        orchestrate_retries(
1129            self.retry_manager.clone(),
1130            opts.retry_strategy.clone(),
1131            retry_info,
1132            async || {
1133                self.http_component
1134                    .orchestrate_endpoint(
1135                        None,
1136                        async |client: Arc<C>,
1137                               endpoint_id: String,
1138                               endpoint: String,
1139                               canonical_endpoint: String,
1140                               auth: Auth| {
1141                            mgmtx::mgmt::Management::<C> {
1142                                http_client: client,
1143                                user_agent: self.http_component.user_agent().to_string(),
1144                                endpoint,
1145                                canonical_endpoint,
1146                                auth,
1147                                tracing: Default::default(),
1148                            }
1149                            .get_full_bucket_config(&copts)
1150                            .await
1151                            .map_err(|e| ErrorKind::Mgmt(e).into())
1152                        },
1153                    )
1154                    .await
1155            },
1156        )
1157        .await
1158    }
1159
1160    pub async fn load_sample_bucket(
1161        &self,
1162        opts: &LoadSampleBucketOptions<'_>,
1163    ) -> error::Result<()> {
1164        let retry_info = RetryRequest::new("load_sample_bucket", false);
1165        let copts = opts.into();
1166
1167        orchestrate_retries(
1168            self.retry_manager.clone(),
1169            opts.retry_strategy.clone(),
1170            retry_info,
1171            async || {
1172                self.http_component
1173                    .orchestrate_endpoint(
1174                        None,
1175                        async |client: Arc<C>,
1176                               endpoint_id: String,
1177                               endpoint: String,
1178                               canonical_endpoint: String,
1179                               auth: Auth| {
1180                            mgmtx::mgmt::Management::<C> {
1181                                http_client: client,
1182                                user_agent: self.http_component.user_agent().to_string(),
1183                                endpoint,
1184                                canonical_endpoint,
1185                                auth,
1186                                tracing: Default::default(),
1187                            }
1188                            .load_sample_bucket(&copts)
1189                            .await
1190                            .map_err(|e| ErrorKind::Mgmt(e).into())
1191                        },
1192                    )
1193                    .await
1194            },
1195        )
1196        .await
1197    }
1198
1199    pub async fn index_status(&self, opts: &IndexStatusOptions<'_>) -> error::Result<IndexStatus> {
1200        let retry_info = RetryRequest::new("index_status", false);
1201        let copts = opts.into();
1202
1203        orchestrate_retries(
1204            self.retry_manager.clone(),
1205            opts.retry_strategy.clone(),
1206            retry_info,
1207            async || {
1208                self.http_component
1209                    .orchestrate_endpoint(
1210                        None,
1211                        async |client: Arc<C>,
1212                               endpoint_id: String,
1213                               endpoint: String,
1214                               canonical_endpoint: String,
1215                               auth: Auth| {
1216                            mgmtx::mgmt::Management::<C> {
1217                                http_client: client,
1218                                user_agent: self.http_component.user_agent().to_string(),
1219                                endpoint,
1220                                canonical_endpoint,
1221                                auth,
1222                                tracing: Default::default(),
1223                            }
1224                            .index_status(&copts)
1225                            .await
1226                            .map_err(|e| ErrorKind::Mgmt(e).into())
1227                        },
1228                    )
1229                    .await
1230            },
1231        )
1232        .await
1233    }
1234
1235    pub async fn get_auto_failover_settings(
1236        &self,
1237        opts: &GetAutoFailoverSettingsOptions<'_>,
1238    ) -> error::Result<AutoFailoverSettings> {
1239        let retry_info = RetryRequest::new("get_auto_failover_settings", false);
1240        let copts = opts.into();
1241
1242        orchestrate_retries(
1243            self.retry_manager.clone(),
1244            opts.retry_strategy.clone(),
1245            retry_info,
1246            async || {
1247                self.http_component
1248                    .orchestrate_endpoint(
1249                        None,
1250                        async |client: Arc<C>,
1251                               endpoint_id: String,
1252                               endpoint: String,
1253                               canonical_endpoint: String,
1254                               auth: Auth| {
1255                            mgmtx::mgmt::Management::<C> {
1256                                http_client: client,
1257                                user_agent: self.http_component.user_agent().to_string(),
1258                                endpoint,
1259                                canonical_endpoint,
1260                                auth,
1261                                tracing: Default::default(),
1262                            }
1263                            .get_auto_failover_settings(&copts)
1264                            .await
1265                            .map_err(|e| ErrorKind::Mgmt(e).into())
1266                        },
1267                    )
1268                    .await
1269            },
1270        )
1271        .await
1272    }
1273
1274    pub async fn get_bucket_stats(
1275        &self,
1276        opts: &GetBucketStatsOptions<'_>,
1277    ) -> error::Result<Box<RawValue>> {
1278        let retry_info = RetryRequest::new("get_bucket_stats", false);
1279        let retry = opts.retry_strategy.clone();
1280        let copts = opts.into();
1281
1282        orchestrate_retries(self.retry_manager.clone(), retry, retry_info, async || {
1283            self.http_component
1284                .orchestrate_endpoint(
1285                    None,
1286                    async |client: Arc<C>,
1287                           endpoint_id: String,
1288                           endpoint: String,
1289                           canonical_endpoint: String,
1290                           auth: Auth| {
1291                        mgmtx::mgmt::Management::<C> {
1292                            http_client: client,
1293                            user_agent: self.http_component.user_agent().to_string(),
1294                            endpoint,
1295                            canonical_endpoint,
1296                            auth,
1297                            tracing: Default::default(),
1298                        }
1299                        .get_bucket_stats(&copts)
1300                        .await
1301                        .map_err(|e| ErrorKind::Mgmt(e).into())
1302                    },
1303                )
1304                .await
1305        })
1306        .await
1307    }
1308}