Skip to main content

couchbase_core/
mgmtcomponent.rs

1/*
2 *
3 *  * Copyright (c) 2025 Couchbase, Inc.
4 *  *
5 *  * Licensed under the Apache License, Version 2.0 (the "License");
6 *  * you may not use this file except in compliance with the License.
7 *  * You may obtain a copy of the License at
8 *  *
9 *  *    http://www.apache.org/licenses/LICENSE-2.0
10 *  *
11 *  * Unless required by applicable law or agreed to in writing, software
12 *  * distributed under the License is distributed on an "AS IS" BASIS,
13 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  * See the License for the specific language governing permissions and
15 *  * limitations under the License.
16 *
17 */
18
19use crate::authenticator::Authenticator;
20use crate::cbconfig::{CollectionManifest, FullBucketConfig, FullClusterConfig};
21use crate::componentconfigs::NetworkAndCanonicalEndpoint;
22use crate::error::ErrorKind;
23use crate::httpcomponent::{HttpComponent, HttpComponentState};
24use crate::httpx::client::Client;
25use crate::httpx::request::Auth;
26use crate::mgmtx::bucket_helper::EnsureBucketHelper;
27use crate::mgmtx::bucket_settings::BucketDef;
28use crate::mgmtx::group_helper::EnsureGroupHelper;
29use crate::mgmtx::manifest_helper::EnsureManifestHelper;
30use crate::mgmtx::mgmt::AutoFailoverSettings;
31use crate::mgmtx::mgmt_query::IndexStatus;
32use crate::mgmtx::node_target::NodeTarget;
33use crate::mgmtx::options::{
34    EnsureBucketPollOptions, EnsureGroupPollOptions, EnsureManifestPollOptions,
35    EnsureUserPollOptions,
36};
37use crate::mgmtx::responses::{
38    CreateCollectionResponse, CreateScopeResponse, DeleteCollectionResponse, DeleteScopeResponse,
39    UpdateCollectionResponse,
40};
41use crate::mgmtx::user::{Group, RoleAndDescription, UserAndMetadata};
42use crate::mgmtx::user_helper::EnsureUserHelper;
43use crate::options::management::{
44    ChangePasswordOptions, CreateBucketOptions, CreateCollectionOptions, CreateScopeOptions,
45    DeleteBucketOptions, DeleteCollectionOptions, DeleteGroupOptions, DeleteScopeOptions,
46    DeleteUserOptions, EnsureBucketOptions, EnsureGroupOptions, EnsureManifestOptions,
47    EnsureUserOptions, FlushBucketOptions, GetAllBucketsOptions, GetAllGroupsOptions,
48    GetAllUsersOptions, GetAutoFailoverSettingsOptions, GetBucketOptions, GetBucketStatsOptions,
49    GetCollectionManifestOptions, GetFullBucketConfigOptions, GetFullClusterConfigOptions,
50    GetGroupOptions, GetRolesOptions, GetUserOptions, IndexStatusOptions, LoadSampleBucketOptions,
51    UpdateBucketOptions, UpdateCollectionOptions, UpsertGroupOptions, UpsertUserOptions,
52};
53use crate::retry::{orchestrate_retries, RetryManager, RetryRequest};
54use crate::retrybesteffort::ExponentialBackoffCalculator;
55use crate::service_type::ServiceType;
56use crate::tracingcomponent::TracingComponent;
57use crate::{error, mgmtx};
58use serde_json::value::RawValue;
59use std::collections::HashMap;
60use std::sync::Arc;
61use std::time::Duration;
62
/// Component that sends management-service requests over HTTP.
///
/// Every operation is wrapped in `orchestrate_retries` and dispatched through the inner
/// `HttpComponent`, which `new` creates for `ServiceType::MGMT`.
pub(crate) struct MgmtComponent<C: Client> {
    /// HTTP plumbing that supplies the client, endpoint and auth for each request.
    http_component: HttpComponent<C>,
    /// Tracing handle cloned into every `Management` value built by this component.
    tracing: Arc<TracingComponent>,

    /// Shared retry manager handed to `orchestrate_retries` for each operation.
    retry_manager: Arc<RetryManager>,
}
69
/// Reconfigurable state for a `MgmtComponent`; both fields are forwarded to
/// `HttpComponentState::new` by `MgmtComponent::new` and `MgmtComponent::reconfigure`.
pub(crate) struct MgmtComponentConfig {
    /// Known management endpoints.
    /// NOTE(review): the `String` key is presumably an endpoint identifier — confirm.
    pub endpoints: HashMap<String, NetworkAndCanonicalEndpoint>,
    /// Authenticator handed to the HTTP component state.
    pub authenticator: Authenticator,
}
74
/// Construction-time options for a `MgmtComponent`.
pub(crate) struct MgmtComponentOptions {
    /// User agent string passed to `HttpComponent::new`.
    pub user_agent: String,
}
78
79impl<C: Client> MgmtComponent<C> {
80    pub fn new(
81        retry_manager: Arc<RetryManager>,
82        http_client: Arc<C>,
83        tracing: Arc<TracingComponent>,
84        config: MgmtComponentConfig,
85        opts: MgmtComponentOptions,
86    ) -> Self {
87        Self {
88            http_component: HttpComponent::new(
89                ServiceType::MGMT,
90                opts.user_agent,
91                http_client,
92                HttpComponentState::new(config.endpoints, config.authenticator),
93            ),
94            tracing,
95            retry_manager,
96        }
97    }
98
99    pub fn reconfigure(&self, config: MgmtComponentConfig) {
100        self.http_component.reconfigure(HttpComponentState::new(
101            config.endpoints,
102            config.authenticator,
103        ))
104    }
105
106    pub async fn get_collection_manifest(
107        &self,
108        opts: &GetCollectionManifestOptions<'_>,
109    ) -> error::Result<CollectionManifest> {
110        let retry_info = RetryRequest::new("get_collection_manifest", false);
111
112        let copts = opts.into();
113
114        orchestrate_retries(
115            self.retry_manager.clone(),
116            opts.retry_strategy.clone(),
117            retry_info,
118            async || {
119                self.http_component
120                    .orchestrate_endpoint(
121                        None,
122                        async |client: Arc<C>,
123                               endpoint_id: String,
124                               endpoint: String,
125                               canonical_endpoint: String,
126                               auth: Auth| {
127                            let res = match (mgmtx::mgmt::Management::<C> {
128                                http_client: client,
129                                user_agent: self.http_component.user_agent().to_string(),
130                                endpoint,
131                                canonical_endpoint,
132                                auth,
133                                tracing: self.tracing.clone(),
134                            }
135                            .get_collection_manifest(&copts)
136                            .await)
137                            {
138                                Ok(r) => r,
139                                Err(e) => return Err(ErrorKind::Mgmt(e).into()),
140                            };
141
142                            Ok(res)
143                        },
144                    )
145                    .await
146            },
147        )
148        .await
149    }
150
151    pub async fn create_scope(
152        &self,
153        opts: &CreateScopeOptions<'_>,
154    ) -> error::Result<CreateScopeResponse> {
155        let retry_info = RetryRequest::new("create_scope", false);
156
157        let copts = opts.into();
158
159        orchestrate_retries(
160            self.retry_manager.clone(),
161            opts.retry_strategy.clone(),
162            retry_info,
163            async || {
164                self.http_component
165                    .orchestrate_endpoint(
166                        None,
167                        async |client: Arc<C>,
168                               endpoint_id: String,
169                               endpoint: String,
170                               canonical_endpoint: String,
171                               auth: Auth| {
172                            let res = match (mgmtx::mgmt::Management::<C> {
173                                http_client: client,
174                                user_agent: self.http_component.user_agent().to_string(),
175                                endpoint,
176                                canonical_endpoint,
177                                auth,
178                                tracing: self.tracing.clone(),
179                            }
180                            .create_scope(&copts)
181                            .await)
182                            {
183                                Ok(r) => r,
184                                Err(e) => return Err(ErrorKind::Mgmt(e).into()),
185                            };
186
187                            Ok(res)
188                        },
189                    )
190                    .await
191            },
192        )
193        .await
194    }
195
196    pub async fn delete_scope(
197        &self,
198        opts: &DeleteScopeOptions<'_>,
199    ) -> error::Result<DeleteScopeResponse> {
200        let retry_info = RetryRequest::new("delete_scope", false);
201
202        let copts = opts.into();
203
204        orchestrate_retries(
205            self.retry_manager.clone(),
206            opts.retry_strategy.clone(),
207            retry_info,
208            async || {
209                self.http_component
210                    .orchestrate_endpoint(
211                        None,
212                        async |client: Arc<C>,
213                               endpoint_id: String,
214                               endpoint: String,
215                               canonical_endpoint: String,
216                               auth: Auth| {
217                            let res = match (mgmtx::mgmt::Management::<C> {
218                                http_client: client,
219                                user_agent: self.http_component.user_agent().to_string(),
220                                endpoint,
221                                canonical_endpoint,
222                                auth,
223                                tracing: self.tracing.clone(),
224                            }
225                            .delete_scope(&copts)
226                            .await)
227                            {
228                                Ok(r) => r,
229                                Err(e) => return Err(ErrorKind::Mgmt(e).into()),
230                            };
231
232                            Ok(res)
233                        },
234                    )
235                    .await
236            },
237        )
238        .await
239    }
240
241    pub async fn create_collection(
242        &self,
243        opts: &CreateCollectionOptions<'_>,
244    ) -> error::Result<CreateCollectionResponse> {
245        let retry_info = RetryRequest::new("create_collection", false);
246
247        let copts = opts.into();
248
249        orchestrate_retries(
250            self.retry_manager.clone(),
251            opts.retry_strategy.clone(),
252            retry_info,
253            async || {
254                self.http_component
255                    .orchestrate_endpoint(
256                        None,
257                        async |client: Arc<C>,
258                               endpoint_id: String,
259                               endpoint: String,
260                               canonical_endpoint: String,
261                               auth: Auth| {
262                            let res = match (mgmtx::mgmt::Management::<C> {
263                                http_client: client,
264                                user_agent: self.http_component.user_agent().to_string(),
265                                endpoint,
266                                canonical_endpoint,
267                                auth,
268                                tracing: self.tracing.clone(),
269                            }
270                            .create_collection(&copts)
271                            .await)
272                            {
273                                Ok(r) => r,
274                                Err(e) => return Err(ErrorKind::Mgmt(e).into()),
275                            };
276
277                            Ok(res)
278                        },
279                    )
280                    .await
281            },
282        )
283        .await
284    }
285
286    pub async fn delete_collection(
287        &self,
288        opts: &DeleteCollectionOptions<'_>,
289    ) -> error::Result<DeleteCollectionResponse> {
290        let retry_info = RetryRequest::new("delete_collection", false);
291
292        let copts = opts.into();
293
294        orchestrate_retries(
295            self.retry_manager.clone(),
296            opts.retry_strategy.clone(),
297            retry_info,
298            async || {
299                self.http_component
300                    .orchestrate_endpoint(
301                        None,
302                        async |client: Arc<C>,
303                               endpoint_id: String,
304                               endpoint: String,
305                               canonical_endpoint: String,
306                               auth: Auth| {
307                            let res = match (mgmtx::mgmt::Management::<C> {
308                                http_client: client,
309                                user_agent: self.http_component.user_agent().to_string(),
310                                endpoint,
311                                canonical_endpoint,
312                                auth,
313                                tracing: self.tracing.clone(),
314                            }
315                            .delete_collection(&copts)
316                            .await)
317                            {
318                                Ok(r) => r,
319                                Err(e) => return Err(ErrorKind::Mgmt(e).into()),
320                            };
321
322                            Ok(res)
323                        },
324                    )
325                    .await
326            },
327        )
328        .await
329    }
330
331    pub async fn update_collection(
332        &self,
333        opts: &UpdateCollectionOptions<'_>,
334    ) -> error::Result<UpdateCollectionResponse> {
335        let retry_info = RetryRequest::new("update_collection", false);
336
337        let copts = opts.into();
338
339        orchestrate_retries(
340            self.retry_manager.clone(),
341            opts.retry_strategy.clone(),
342            retry_info,
343            async || {
344                self.http_component
345                    .orchestrate_endpoint(
346                        None,
347                        async |client: Arc<C>,
348                               endpoint_id: String,
349                               endpoint: String,
350                               canonical_endpoint: String,
351                               auth: Auth| {
352                            let res = match (mgmtx::mgmt::Management::<C> {
353                                http_client: client,
354                                user_agent: self.http_component.user_agent().to_string(),
355                                endpoint,
356                                canonical_endpoint,
357                                auth,
358                                tracing: self.tracing.clone(),
359                            }
360                            .update_collection(&copts)
361                            .await)
362                            {
363                                Ok(r) => r,
364                                Err(e) => return Err(ErrorKind::Mgmt(e).into()),
365                            };
366
367                            Ok(res)
368                        },
369                    )
370                    .await
371            },
372        )
373        .await
374    }
375
376    pub async fn get_all_buckets(
377        &self,
378        opts: &GetAllBucketsOptions<'_>,
379    ) -> error::Result<Vec<BucketDef>> {
380        let retry_info = RetryRequest::new("get_all_buckets", false);
381
382        let copts = opts.into();
383
384        orchestrate_retries(
385            self.retry_manager.clone(),
386            opts.retry_strategy.clone(),
387            retry_info,
388            async || {
389                self.http_component
390                    .orchestrate_endpoint(
391                        None,
392                        async |client: Arc<C>,
393                               endpoint_id: String,
394                               endpoint: String,
395                               canonical_endpoint: String,
396                               auth: Auth| {
397                            mgmtx::mgmt::Management::<C> {
398                                http_client: client,
399                                user_agent: self.http_component.user_agent().to_string(),
400                                endpoint,
401                                canonical_endpoint,
402                                auth,
403                                tracing: self.tracing.clone(),
404                            }
405                            .get_all_buckets(&copts)
406                            .await
407                            .map_err(|e| ErrorKind::Mgmt(e).into())
408                        },
409                    )
410                    .await
411            },
412        )
413        .await
414    }
415
416    pub async fn get_bucket(&self, opts: &GetBucketOptions<'_>) -> error::Result<BucketDef> {
417        let retry_info = RetryRequest::new("get_bucket", false);
418
419        let copts = opts.into();
420
421        orchestrate_retries(
422            self.retry_manager.clone(),
423            opts.retry_strategy.clone(),
424            retry_info,
425            async || {
426                self.http_component
427                    .orchestrate_endpoint(
428                        None,
429                        async |client: Arc<C>,
430                               endpoint_id: String,
431                               endpoint: String,
432                               canonical_endpoint: String,
433                               auth: Auth| {
434                            mgmtx::mgmt::Management::<C> {
435                                http_client: client,
436                                user_agent: self.http_component.user_agent().to_string(),
437                                endpoint,
438                                canonical_endpoint,
439                                auth,
440                                tracing: self.tracing.clone(),
441                            }
442                            .get_bucket(&copts)
443                            .await
444                            .map_err(|e| ErrorKind::Mgmt(e).into())
445                        },
446                    )
447                    .await
448            },
449        )
450        .await
451    }
452
453    pub async fn create_bucket(&self, opts: &CreateBucketOptions<'_>) -> error::Result<()> {
454        let retry_info = RetryRequest::new("create_bucket", false);
455
456        let copts = opts.into();
457
458        orchestrate_retries(
459            self.retry_manager.clone(),
460            opts.retry_strategy.clone(),
461            retry_info,
462            async || {
463                self.http_component
464                    .orchestrate_endpoint(
465                        None,
466                        async |client: Arc<C>,
467                               endpoint_id: String,
468                               endpoint: String,
469                               canonical_endpoint: String,
470                               auth: Auth| {
471                            mgmtx::mgmt::Management::<C> {
472                                http_client: client,
473                                user_agent: self.http_component.user_agent().to_string(),
474                                endpoint,
475                                canonical_endpoint,
476                                auth,
477                                tracing: self.tracing.clone(),
478                            }
479                            .create_bucket(&copts)
480                            .await
481                            .map_err(|e| ErrorKind::Mgmt(e).into())
482                        },
483                    )
484                    .await
485            },
486        )
487        .await
488    }
489
490    pub async fn update_bucket(&self, opts: &UpdateBucketOptions<'_>) -> error::Result<()> {
491        let retry_info = RetryRequest::new("update_bucket", false);
492
493        let copts = opts.into();
494
495        orchestrate_retries(
496            self.retry_manager.clone(),
497            opts.retry_strategy.clone(),
498            retry_info,
499            async || {
500                self.http_component
501                    .orchestrate_endpoint(
502                        None,
503                        async |client: Arc<C>,
504                               endpoint_id: String,
505                               endpoint: String,
506                               canonical_endpoint: String,
507                               auth: Auth| {
508                            mgmtx::mgmt::Management::<C> {
509                                http_client: client,
510                                user_agent: self.http_component.user_agent().to_string(),
511                                endpoint,
512                                canonical_endpoint,
513                                auth,
514                                tracing: self.tracing.clone(),
515                            }
516                            .update_bucket(&copts)
517                            .await
518                            .map_err(|e| ErrorKind::Mgmt(e).into())
519                        },
520                    )
521                    .await
522            },
523        )
524        .await
525    }
526
527    pub async fn delete_bucket(&self, opts: &DeleteBucketOptions<'_>) -> error::Result<()> {
528        let retry_info = RetryRequest::new("delete_bucket", false);
529
530        let copts = opts.into();
531
532        orchestrate_retries(
533            self.retry_manager.clone(),
534            opts.retry_strategy.clone(),
535            retry_info,
536            async || {
537                self.http_component
538                    .orchestrate_endpoint(
539                        None,
540                        async |client: Arc<C>,
541                               endpoint_id: String,
542                               endpoint: String,
543                               canonical_endpoint: String,
544                               auth: Auth| {
545                            mgmtx::mgmt::Management::<C> {
546                                http_client: client,
547                                user_agent: self.http_component.user_agent().to_string(),
548                                endpoint,
549                                canonical_endpoint,
550                                auth,
551                                tracing: self.tracing.clone(),
552                            }
553                            .delete_bucket(&copts)
554                            .await
555                            .map_err(|e| ErrorKind::Mgmt(e).into())
556                        },
557                    )
558                    .await
559            },
560        )
561        .await
562    }
563
564    pub async fn flush_bucket(&self, opts: &FlushBucketOptions<'_>) -> error::Result<()> {
565        let retry_info = RetryRequest::new("flush_bucket", false);
566
567        let copts = opts.into();
568
569        orchestrate_retries(
570            self.retry_manager.clone(),
571            opts.retry_strategy.clone(),
572            retry_info,
573            async || {
574                self.http_component
575                    .orchestrate_endpoint(
576                        None,
577                        async |client: Arc<C>,
578                               endpoint_id: String,
579                               endpoint: String,
580                               canonical_endpoint: String,
581                               auth: Auth| {
582                            mgmtx::mgmt::Management::<C> {
583                                http_client: client,
584                                user_agent: self.http_component.user_agent().to_string(),
585                                endpoint,
586                                canonical_endpoint,
587                                auth,
588                                tracing: self.tracing.clone(),
589                            }
590                            .flush_bucket(&copts)
591                            .await
592                            .map_err(|e| ErrorKind::Mgmt(e).into())
593                        },
594                    )
595                    .await
596            },
597        )
598        .await
599    }
600
601    pub async fn ensure_manifest(&self, opts: &EnsureManifestOptions<'_>) -> error::Result<()> {
602        let mut helper = EnsureManifestHelper::new(
603            self.http_component.user_agent(),
604            opts.bucket_name,
605            opts.manifest_uid,
606            opts.on_behalf_of_info,
607        );
608
609        let backoff = ExponentialBackoffCalculator::new(
610            Duration::from_millis(100),
611            Duration::from_millis(1000),
612            1.5,
613        );
614
615        self.http_component
616            .ensure_resource(backoff, async |client: Arc<C>, targets: Vec<NodeTarget>| {
617                helper
618                    .clone()
619                    .poll(&EnsureManifestPollOptions { client, targets })
620                    .await
621                    .map_err(error::Error::from)
622            })
623            .await
624    }
625
626    pub async fn ensure_bucket(&self, opts: &EnsureBucketOptions<'_>) -> error::Result<()> {
627        let mut helper = EnsureBucketHelper::new(
628            self.http_component.user_agent(),
629            opts.bucket_name,
630            opts.bucket_uuid,
631            opts.want_missing,
632            opts.on_behalf_of_info,
633        );
634
635        let backoff = ExponentialBackoffCalculator::new(
636            Duration::from_millis(100),
637            Duration::from_millis(1000),
638            1.5,
639        );
640
641        self.http_component
642            .ensure_resource(backoff, async |client: Arc<C>, targets: Vec<NodeTarget>| {
643                helper
644                    .clone()
645                    .poll(&EnsureBucketPollOptions { client, targets })
646                    .await
647                    .map_err(error::Error::from)
648            })
649            .await
650    }
651
652    pub async fn get_user(&self, opts: &GetUserOptions<'_>) -> error::Result<UserAndMetadata> {
653        let retry_info = RetryRequest::new("get_user", false);
654        let copts = opts.into();
655
656        orchestrate_retries(
657            self.retry_manager.clone(),
658            opts.retry_strategy.clone(),
659            retry_info,
660            async || {
661                self.http_component
662                    .orchestrate_endpoint(
663                        None,
664                        async |client: Arc<C>,
665                               endpoint_id: String,
666                               endpoint: String,
667                               canonical_endpoint: String,
668                               auth: Auth| {
669                            mgmtx::mgmt::Management::<C> {
670                                http_client: client,
671                                user_agent: self.http_component.user_agent().to_string(),
672                                endpoint,
673                                canonical_endpoint,
674                                auth,
675                                tracing: self.tracing.clone(),
676                            }
677                            .get_user(&copts)
678                            .await
679                            .map_err(|e| ErrorKind::Mgmt(e).into())
680                        },
681                    )
682                    .await
683            },
684        )
685        .await
686    }
687
688    pub async fn get_all_users(
689        &self,
690        opts: &GetAllUsersOptions<'_>,
691    ) -> error::Result<Vec<UserAndMetadata>> {
692        let retry_info = RetryRequest::new("get_all_users", false);
693        let copts = opts.into();
694
695        orchestrate_retries(
696            self.retry_manager.clone(),
697            opts.retry_strategy.clone(),
698            retry_info,
699            async || {
700                self.http_component
701                    .orchestrate_endpoint(
702                        None,
703                        async |client: Arc<C>,
704                               endpoint_id: String,
705                               endpoint: String,
706                               canonical_endpoint: String,
707                               auth: Auth| {
708                            mgmtx::mgmt::Management::<C> {
709                                http_client: client,
710                                user_agent: self.http_component.user_agent().to_string(),
711                                endpoint,
712                                canonical_endpoint,
713                                auth,
714                                tracing: self.tracing.clone(),
715                            }
716                            .get_all_users(&copts)
717                            .await
718                            .map_err(|e| ErrorKind::Mgmt(e).into())
719                        },
720                    )
721                    .await
722            },
723        )
724        .await
725    }
726
727    pub async fn upsert_user(&self, opts: &UpsertUserOptions<'_>) -> error::Result<()> {
728        let retry_info = RetryRequest::new("upsert_user", false);
729        let copts = opts.into();
730
731        orchestrate_retries(
732            self.retry_manager.clone(),
733            opts.retry_strategy.clone(),
734            retry_info,
735            async || {
736                self.http_component
737                    .orchestrate_endpoint(
738                        None,
739                        async |client: Arc<C>,
740                               endpoint_id: String,
741                               endpoint: String,
742                               canonical_endpoint: String,
743                               auth: Auth| {
744                            mgmtx::mgmt::Management::<C> {
745                                http_client: client,
746                                user_agent: self.http_component.user_agent().to_string(),
747                                endpoint,
748                                canonical_endpoint,
749                                auth,
750                                tracing: self.tracing.clone(),
751                            }
752                            .upsert_user(&copts)
753                            .await
754                            .map_err(|e| ErrorKind::Mgmt(e).into())
755                        },
756                    )
757                    .await
758            },
759        )
760        .await
761    }
762
763    pub async fn delete_user(&self, opts: &DeleteUserOptions<'_>) -> error::Result<()> {
764        let retry_info = RetryRequest::new("delete_user", false);
765        let copts = opts.into();
766
767        orchestrate_retries(
768            self.retry_manager.clone(),
769            opts.retry_strategy.clone(),
770            retry_info,
771            async || {
772                self.http_component
773                    .orchestrate_endpoint(
774                        None,
775                        async |client: Arc<C>,
776                               endpoint_id: String,
777                               endpoint: String,
778                               canonical_endpoint: String,
779                               auth: Auth| {
780                            mgmtx::mgmt::Management::<C> {
781                                http_client: client,
782                                user_agent: self.http_component.user_agent().to_string(),
783                                endpoint,
784                                canonical_endpoint,
785                                auth,
786                                tracing: self.tracing.clone(),
787                            }
788                            .delete_user(&copts)
789                            .await
790                            .map_err(|e| ErrorKind::Mgmt(e).into())
791                        },
792                    )
793                    .await
794            },
795        )
796        .await
797    }
798
799    pub async fn get_roles(
800        &self,
801        opts: &GetRolesOptions<'_>,
802    ) -> error::Result<Vec<RoleAndDescription>> {
803        let retry_info = RetryRequest::new("get_roles", false);
804        let copts = opts.into();
805
806        orchestrate_retries(
807            self.retry_manager.clone(),
808            opts.retry_strategy.clone(),
809            retry_info,
810            async || {
811                self.http_component
812                    .orchestrate_endpoint(
813                        None,
814                        async |client: Arc<C>,
815                               endpoint_id: String,
816                               endpoint: String,
817                               canonical_endpoint: String,
818                               auth: Auth| {
819                            mgmtx::mgmt::Management::<C> {
820                                http_client: client,
821                                user_agent: self.http_component.user_agent().to_string(),
822                                endpoint,
823                                canonical_endpoint,
824                                auth,
825                                tracing: self.tracing.clone(),
826                            }
827                            .get_roles(&copts)
828                            .await
829                            .map_err(|e| ErrorKind::Mgmt(e).into())
830                        },
831                    )
832                    .await
833            },
834        )
835        .await
836    }
837
838    pub async fn get_group(&self, opts: &GetGroupOptions<'_>) -> error::Result<Group> {
839        let retry_info = RetryRequest::new("get_group", false);
840        let copts = opts.into();
841
842        orchestrate_retries(
843            self.retry_manager.clone(),
844            opts.retry_strategy.clone(),
845            retry_info,
846            async || {
847                self.http_component
848                    .orchestrate_endpoint(
849                        None,
850                        async |client: Arc<C>,
851                               endpoint_id: String,
852                               endpoint: String,
853                               canonical_endpoint: String,
854                               auth: Auth| {
855                            mgmtx::mgmt::Management::<C> {
856                                http_client: client,
857                                user_agent: self.http_component.user_agent().to_string(),
858                                endpoint,
859                                canonical_endpoint,
860                                auth,
861                                tracing: self.tracing.clone(),
862                            }
863                            .get_group(&copts)
864                            .await
865                            .map_err(|e| ErrorKind::Mgmt(e).into())
866                        },
867                    )
868                    .await
869            },
870        )
871        .await
872    }
873
874    pub async fn get_all_groups(
875        &self,
876        opts: &GetAllGroupsOptions<'_>,
877    ) -> error::Result<Vec<Group>> {
878        let retry_info = RetryRequest::new("get_all_groups", false);
879        let copts = opts.into();
880
881        orchestrate_retries(
882            self.retry_manager.clone(),
883            opts.retry_strategy.clone(),
884            retry_info,
885            async || {
886                self.http_component
887                    .orchestrate_endpoint(
888                        None,
889                        async |client: Arc<C>,
890                               endpoint_id: String,
891                               endpoint: String,
892                               canonical_endpoint: String,
893                               auth: Auth| {
894                            mgmtx::mgmt::Management::<C> {
895                                http_client: client,
896                                user_agent: self.http_component.user_agent().to_string(),
897                                endpoint,
898                                canonical_endpoint,
899                                auth,
900                                tracing: self.tracing.clone(),
901                            }
902                            .get_all_groups(&copts)
903                            .await
904                            .map_err(|e| ErrorKind::Mgmt(e).into())
905                        },
906                    )
907                    .await
908            },
909        )
910        .await
911    }
912
913    pub async fn upsert_group(&self, opts: &UpsertGroupOptions<'_>) -> error::Result<()> {
914        let retry_info = RetryRequest::new("upsert_group", false);
915        let copts = opts.into();
916
917        orchestrate_retries(
918            self.retry_manager.clone(),
919            opts.retry_strategy.clone(),
920            retry_info,
921            async || {
922                self.http_component
923                    .orchestrate_endpoint(
924                        None,
925                        async |client: Arc<C>,
926                               endpoint_id: String,
927                               endpoint: String,
928                               canonical_endpoint: String,
929                               auth: Auth| {
930                            mgmtx::mgmt::Management::<C> {
931                                http_client: client,
932                                user_agent: self.http_component.user_agent().to_string(),
933                                endpoint,
934                                canonical_endpoint,
935                                auth,
936                                tracing: self.tracing.clone(),
937                            }
938                            .upsert_group(&copts)
939                            .await
940                            .map_err(|e| ErrorKind::Mgmt(e).into())
941                        },
942                    )
943                    .await
944            },
945        )
946        .await
947    }
948
949    pub async fn delete_group(&self, opts: &DeleteGroupOptions<'_>) -> error::Result<()> {
950        let retry_info = RetryRequest::new("delete_group", false);
951        let copts = opts.into();
952
953        orchestrate_retries(
954            self.retry_manager.clone(),
955            opts.retry_strategy.clone(),
956            retry_info,
957            async || {
958                self.http_component
959                    .orchestrate_endpoint(
960                        None,
961                        async |client: Arc<C>,
962                               endpoint_id: String,
963                               endpoint: String,
964                               canonical_endpoint: String,
965                               auth: Auth| {
966                            mgmtx::mgmt::Management::<C> {
967                                http_client: client,
968                                user_agent: self.http_component.user_agent().to_string(),
969                                endpoint,
970                                canonical_endpoint,
971                                auth,
972                                tracing: self.tracing.clone(),
973                            }
974                            .delete_group(&copts)
975                            .await
976                            .map_err(|e| ErrorKind::Mgmt(e).into())
977                        },
978                    )
979                    .await
980            },
981        )
982        .await
983    }
984
985    pub async fn change_password(&self, opts: &ChangePasswordOptions<'_>) -> error::Result<()> {
986        let retry_info = RetryRequest::new("change_password", false);
987        let copts = opts.into();
988
989        orchestrate_retries(
990            self.retry_manager.clone(),
991            opts.retry_strategy.clone(),
992            retry_info,
993            async || {
994                self.http_component
995                    .orchestrate_endpoint(
996                        None,
997                        async |client: Arc<C>,
998                               endpoint_id: String,
999                               endpoint: String,
1000                               canonical_endpoint: String,
1001                               auth: Auth| {
1002                            mgmtx::mgmt::Management::<C> {
1003                                http_client: client,
1004                                user_agent: self.http_component.user_agent().to_string(),
1005                                endpoint,
1006                                canonical_endpoint,
1007                                auth,
1008                                tracing: self.tracing.clone(),
1009                            }
1010                            .change_password(&copts)
1011                            .await
1012                            .map_err(|e| ErrorKind::Mgmt(e).into())
1013                        },
1014                    )
1015                    .await
1016            },
1017        )
1018        .await
1019    }
1020
1021    pub async fn ensure_user(&self, opts: &EnsureUserOptions<'_>) -> error::Result<()> {
1022        let mut helper = EnsureUserHelper::new(
1023            self.http_component.user_agent(),
1024            opts.username,
1025            opts.auth_domain,
1026            opts.want_missing,
1027            opts.on_behalf_of_info,
1028        );
1029
1030        let backoff = ExponentialBackoffCalculator::new(
1031            Duration::from_millis(100),
1032            Duration::from_millis(1000),
1033            1.5,
1034        );
1035
1036        self.http_component
1037            .ensure_resource(backoff, async |client: Arc<C>, targets: Vec<NodeTarget>| {
1038                helper
1039                    .clone()
1040                    .poll(&EnsureUserPollOptions { client, targets })
1041                    .await
1042                    .map_err(error::Error::from)
1043            })
1044            .await
1045    }
1046
1047    pub async fn ensure_group(&self, opts: &EnsureGroupOptions<'_>) -> error::Result<()> {
1048        let mut helper = EnsureGroupHelper::new(
1049            self.http_component.user_agent(),
1050            opts.group_name,
1051            opts.want_missing,
1052            opts.on_behalf_of_info,
1053        );
1054
1055        let backoff = ExponentialBackoffCalculator::new(
1056            Duration::from_millis(100),
1057            Duration::from_millis(1000),
1058            1.5,
1059        );
1060
1061        self.http_component
1062            .ensure_resource(backoff, async |client: Arc<C>, targets: Vec<NodeTarget>| {
1063                helper
1064                    .clone()
1065                    .poll(&EnsureGroupPollOptions { client, targets })
1066                    .await
1067                    .map_err(error::Error::from)
1068            })
1069            .await
1070    }
1071
1072    pub async fn get_full_cluster_config(
1073        &self,
1074        opts: &GetFullClusterConfigOptions<'_>,
1075    ) -> error::Result<FullClusterConfig> {
1076        let retry_info = RetryRequest::new("get_full_cluster_config", false);
1077        let copts = opts.into();
1078
1079        orchestrate_retries(
1080            self.retry_manager.clone(),
1081            opts.retry_strategy.clone(),
1082            retry_info,
1083            async || {
1084                self.http_component
1085                    .orchestrate_endpoint(
1086                        None,
1087                        async |client: Arc<C>,
1088                               endpoint_id: String,
1089                               endpoint: String,
1090                               canonical_endpoint: String,
1091                               auth: Auth| {
1092                            mgmtx::mgmt::Management::<C> {
1093                                http_client: client,
1094                                user_agent: self.http_component.user_agent().to_string(),
1095                                endpoint,
1096                                canonical_endpoint,
1097                                auth,
1098                                tracing: Default::default(),
1099                            }
1100                            .get_full_cluster_config(&copts)
1101                            .await
1102                            .map_err(|e| ErrorKind::Mgmt(e).into())
1103                        },
1104                    )
1105                    .await
1106            },
1107        )
1108        .await
1109    }
1110
1111    pub async fn get_full_bucket_config(
1112        &self,
1113        opts: &GetFullBucketConfigOptions<'_>,
1114    ) -> error::Result<FullBucketConfig> {
1115        let retry_info = RetryRequest::new("get_full_bucket_config", false);
1116        let copts = opts.into();
1117
1118        orchestrate_retries(
1119            self.retry_manager.clone(),
1120            opts.retry_strategy.clone(),
1121            retry_info,
1122            async || {
1123                self.http_component
1124                    .orchestrate_endpoint(
1125                        None,
1126                        async |client: Arc<C>,
1127                               endpoint_id: String,
1128                               endpoint: String,
1129                               canonical_endpoint: String,
1130                               auth: Auth| {
1131                            mgmtx::mgmt::Management::<C> {
1132                                http_client: client,
1133                                user_agent: self.http_component.user_agent().to_string(),
1134                                endpoint,
1135                                canonical_endpoint,
1136                                auth,
1137                                tracing: Default::default(),
1138                            }
1139                            .get_full_bucket_config(&copts)
1140                            .await
1141                            .map_err(|e| ErrorKind::Mgmt(e).into())
1142                        },
1143                    )
1144                    .await
1145            },
1146        )
1147        .await
1148    }
1149
1150    pub async fn load_sample_bucket(
1151        &self,
1152        opts: &LoadSampleBucketOptions<'_>,
1153    ) -> error::Result<()> {
1154        let retry_info = RetryRequest::new("load_sample_bucket", false);
1155        let copts = opts.into();
1156
1157        orchestrate_retries(
1158            self.retry_manager.clone(),
1159            opts.retry_strategy.clone(),
1160            retry_info,
1161            async || {
1162                self.http_component
1163                    .orchestrate_endpoint(
1164                        None,
1165                        async |client: Arc<C>,
1166                               endpoint_id: String,
1167                               endpoint: String,
1168                               canonical_endpoint: String,
1169                               auth: Auth| {
1170                            mgmtx::mgmt::Management::<C> {
1171                                http_client: client,
1172                                user_agent: self.http_component.user_agent().to_string(),
1173                                endpoint,
1174                                canonical_endpoint,
1175                                auth,
1176                                tracing: Default::default(),
1177                            }
1178                            .load_sample_bucket(&copts)
1179                            .await
1180                            .map_err(|e| ErrorKind::Mgmt(e).into())
1181                        },
1182                    )
1183                    .await
1184            },
1185        )
1186        .await
1187    }
1188
1189    pub async fn index_status(&self, opts: &IndexStatusOptions<'_>) -> error::Result<IndexStatus> {
1190        let retry_info = RetryRequest::new("index_status", false);
1191        let copts = opts.into();
1192
1193        orchestrate_retries(
1194            self.retry_manager.clone(),
1195            opts.retry_strategy.clone(),
1196            retry_info,
1197            async || {
1198                self.http_component
1199                    .orchestrate_endpoint(
1200                        None,
1201                        async |client: Arc<C>,
1202                               endpoint_id: String,
1203                               endpoint: String,
1204                               canonical_endpoint: String,
1205                               auth: Auth| {
1206                            mgmtx::mgmt::Management::<C> {
1207                                http_client: client,
1208                                user_agent: self.http_component.user_agent().to_string(),
1209                                endpoint,
1210                                canonical_endpoint,
1211                                auth,
1212                                tracing: Default::default(),
1213                            }
1214                            .index_status(&copts)
1215                            .await
1216                            .map_err(|e| ErrorKind::Mgmt(e).into())
1217                        },
1218                    )
1219                    .await
1220            },
1221        )
1222        .await
1223    }
1224
1225    pub async fn get_auto_failover_settings(
1226        &self,
1227        opts: &GetAutoFailoverSettingsOptions<'_>,
1228    ) -> error::Result<AutoFailoverSettings> {
1229        let retry_info = RetryRequest::new("get_auto_failover_settings", false);
1230        let copts = opts.into();
1231
1232        orchestrate_retries(
1233            self.retry_manager.clone(),
1234            opts.retry_strategy.clone(),
1235            retry_info,
1236            async || {
1237                self.http_component
1238                    .orchestrate_endpoint(
1239                        None,
1240                        async |client: Arc<C>,
1241                               endpoint_id: String,
1242                               endpoint: String,
1243                               canonical_endpoint: String,
1244                               auth: Auth| {
1245                            mgmtx::mgmt::Management::<C> {
1246                                http_client: client,
1247                                user_agent: self.http_component.user_agent().to_string(),
1248                                endpoint,
1249                                canonical_endpoint,
1250                                auth,
1251                                tracing: Default::default(),
1252                            }
1253                            .get_auto_failover_settings(&copts)
1254                            .await
1255                            .map_err(|e| ErrorKind::Mgmt(e).into())
1256                        },
1257                    )
1258                    .await
1259            },
1260        )
1261        .await
1262    }
1263
1264    pub async fn get_bucket_stats(
1265        &self,
1266        opts: &GetBucketStatsOptions<'_>,
1267    ) -> error::Result<Box<RawValue>> {
1268        let retry_info = RetryRequest::new("get_bucket_stats", false);
1269        let retry = opts.retry_strategy.clone();
1270        let copts = opts.into();
1271
1272        orchestrate_retries(self.retry_manager.clone(), retry, retry_info, async || {
1273            self.http_component
1274                .orchestrate_endpoint(
1275                    None,
1276                    async |client: Arc<C>,
1277                           endpoint_id: String,
1278                           endpoint: String,
1279                           canonical_endpoint: String,
1280                           auth: Auth| {
1281                        mgmtx::mgmt::Management::<C> {
1282                            http_client: client,
1283                            user_agent: self.http_component.user_agent().to_string(),
1284                            endpoint,
1285                            canonical_endpoint,
1286                            auth,
1287                            tracing: Default::default(),
1288                        }
1289                        .get_bucket_stats(&copts)
1290                        .await
1291                        .map_err(|e| ErrorKind::Mgmt(e).into())
1292                    },
1293                )
1294                .await
1295        })
1296        .await
1297    }
1298}