Skip to main content

rc_s3/
admin.rs

1//! Admin API client implementation
2//!
3//! This module provides the AdminClient that implements the AdminApi trait
4//! using HTTP requests with AWS SigV4 signing.
5
6use async_trait::async_trait;
7use aws_credential_types::Credentials;
8use aws_sigv4::http_request::{
9    SignableBody, SignableRequest, SignatureLocation, SigningSettings, sign,
10};
11use aws_sigv4::sign::v4;
12use rc_core::admin::{
13    AdminApi, BucketQuota, ClusterInfo, CreateServiceAccountRequest, Group, GroupStatus,
14    HealScanMode, HealStartRequest, HealStatus, Policy, PolicyEntity, PolicyInfo, PoolStatus,
15    PoolTarget, RebalanceStartResult, RebalanceStatus, ServiceAccount,
16    ServiceAccountCreateResponse, UpdateGroupMembersRequest, User, UserStatus,
17};
18use rc_core::{Alias, Error, Result};
19use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue};
20use reqwest::{Client, Method, StatusCode};
21use serde::{Deserialize, Serialize};
22use sha2::{Digest, Sha256};
23use std::collections::HashMap;
24use std::time::SystemTime;
25
/// Admin API client for RustFS/MinIO-compatible servers
///
/// Holds everything needed to issue SigV4-signed HTTP requests against the
/// admin endpoint of a single alias.
pub struct AdminClient {
    // Reusable reqwest client configured with the alias TLS settings.
    http_client: Client,
    // Base endpoint URL, stored without trailing slashes.
    endpoint: String,
    // Access key used as the SigV4 credential identifier.
    access_key: String,
    // Secret key used to derive the SigV4 signature.
    secret_key: String,
    // Region name placed into the SigV4 signing parameters.
    region: String,
}
34
35impl AdminClient {
36    /// Create a new AdminClient from an Alias
37    pub fn new(alias: &Alias) -> Result<Self> {
38        let mut builder = Client::builder()
39            .danger_accept_invalid_certs(alias.insecure)
40            .tls_built_in_native_certs(true)
41            .tls_built_in_webpki_certs(true);
42
43        if let Some(bundle_path) = alias.ca_bundle.as_deref() {
44            let pem = std::fs::read(bundle_path).map_err(|e| {
45                Error::Network(format!("Failed to read CA bundle '{bundle_path}': {e}"))
46            })?;
47            let certs = reqwest::Certificate::from_pem_bundle(&pem)
48                .map_err(|e| Error::Network(format!("Invalid CA bundle '{bundle_path}': {e}")))?;
49            if certs.is_empty() {
50                return Err(Error::Network(format!(
51                    "Invalid CA bundle '{bundle_path}': no certificates found"
52                )));
53            }
54            for cert in certs {
55                builder = builder.add_root_certificate(cert);
56            }
57        }
58
59        let http_client = builder
60            .build()
61            .map_err(|e| Error::Network(format!("Failed to create HTTP client: {e}")))?;
62
63        Ok(Self {
64            http_client,
65            endpoint: alias.endpoint.trim_end_matches('/').to_string(),
66            access_key: alias.access_key.clone(),
67            secret_key: alias.secret_key.clone(),
68            region: alias.region.clone(),
69        })
70    }
71
72    /// Build the base URL for admin API
73    fn admin_url(&self, path: &str) -> String {
74        format!("{}/rustfs/admin/v3{}", self.endpoint, path)
75    }
76
77    /// Calculate SHA256 hash of the body
78    fn sha256_hash(body: &[u8]) -> String {
79        let mut hasher = Sha256::new();
80        hasher.update(body);
81        hex::encode(hasher.finalize())
82    }
83
84    /// Sign a request using AWS SigV4
85    async fn sign_request(
86        &self,
87        method: &Method,
88        url: &str,
89        headers: &HeaderMap,
90        body: &[u8],
91    ) -> Result<HeaderMap> {
92        let credentials = Credentials::new(
93            &self.access_key,
94            &self.secret_key,
95            None,
96            None,
97            "admin-client",
98        );
99
100        let identity = credentials.into();
101        let mut signing_settings = SigningSettings::default();
102        signing_settings.signature_location = SignatureLocation::Headers;
103
104        let signing_params = v4::SigningParams::builder()
105            .identity(&identity)
106            .region(&self.region)
107            .name("s3")
108            .time(SystemTime::now())
109            .settings(signing_settings)
110            .build()
111            .map_err(|e| Error::Auth(format!("Failed to build signing params: {e}")))?;
112
113        // Convert headers to a vec of tuples
114        let header_pairs: Vec<(&str, &str)> = headers
115            .iter()
116            .filter_map(|(k, v)| v.to_str().ok().map(|v| (k.as_str(), v)))
117            .collect();
118
119        let signable_body = SignableBody::Bytes(body);
120
121        let signable_request = SignableRequest::new(
122            method.as_str(),
123            url,
124            header_pairs.into_iter(),
125            signable_body,
126        )
127        .map_err(|e| Error::Auth(format!("Failed to create signable request: {e}")))?;
128
129        let (signing_instructions, _signature) = sign(signable_request, &signing_params.into())
130            .map_err(|e| Error::Auth(format!("Failed to sign request: {e}")))?
131            .into_parts();
132
133        // Apply signing instructions to create new headers
134        let mut signed_headers = headers.clone();
135        for (name, value) in signing_instructions.headers() {
136            let header_name = HeaderName::try_from(&name.to_string())
137                .map_err(|e| Error::Auth(format!("Invalid header name: {e}")))?;
138            let header_value = HeaderValue::try_from(&value.to_string())
139                .map_err(|e| Error::Auth(format!("Invalid header value: {e}")))?;
140            signed_headers.insert(header_name, header_value);
141        }
142
143        Ok(signed_headers)
144    }
145
146    /// Make a signed request to the admin API
147    async fn request<T: for<'de> Deserialize<'de>>(
148        &self,
149        method: Method,
150        path: &str,
151        query: Option<&[(&str, &str)]>,
152        body: Option<&[u8]>,
153    ) -> Result<T> {
154        let mut url = self.admin_url(path);
155
156        if let Some(q) = query {
157            let query_string: String = q
158                .iter()
159                .map(|(k, v)| format!("{}={}", urlencoding::encode(k), urlencoding::encode(v)))
160                .collect::<Vec<_>>()
161                .join("&");
162            if !query_string.is_empty() {
163                url.push('?');
164                url.push_str(&query_string);
165            }
166        }
167
168        let body_bytes = body.unwrap_or(&[]);
169        let content_hash = Self::sha256_hash(body_bytes);
170
171        let mut headers = HeaderMap::new();
172        headers.insert("x-amz-content-sha256", content_hash.parse().unwrap());
173        headers.insert("host", self.get_host().parse().unwrap());
174
175        if !body_bytes.is_empty() {
176            headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
177        }
178
179        let signed_headers = self
180            .sign_request(&method, &url, &headers, body_bytes)
181            .await?;
182
183        let mut request_builder = self.http_client.request(method.clone(), &url);
184
185        for (name, value) in signed_headers.iter() {
186            request_builder = request_builder.header(name, value);
187        }
188
189        if !body_bytes.is_empty() {
190            request_builder = request_builder.body(body_bytes.to_vec());
191        }
192
193        let response = request_builder
194            .send()
195            .await
196            .map_err(|e| Error::Network(format!("Request failed: {e}")))?;
197
198        let status = response.status();
199
200        if !status.is_success() {
201            let error_body = response
202                .text()
203                .await
204                .unwrap_or_else(|_| "Unknown error".to_string());
205            return Err(self.map_error(status, &error_body));
206        }
207
208        let text = response
209            .text()
210            .await
211            .map_err(|e| Error::Network(format!("Failed to read response: {e}")))?;
212
213        if text.is_empty() {
214            // Return empty/default for empty responses
215            serde_json::from_str("null").map_err(Error::Json)
216        } else {
217            serde_json::from_str(&text).map_err(Error::Json)
218        }
219    }
220
221    /// Make a signed request that returns no body
222    async fn request_no_response(
223        &self,
224        method: Method,
225        path: &str,
226        query: Option<&[(&str, &str)]>,
227        body: Option<&[u8]>,
228    ) -> Result<()> {
229        let mut url = self.admin_url(path);
230
231        if let Some(q) = query {
232            let query_string: String = q
233                .iter()
234                .map(|(k, v)| format!("{}={}", urlencoding::encode(k), urlencoding::encode(v)))
235                .collect::<Vec<_>>()
236                .join("&");
237            if !query_string.is_empty() {
238                url.push('?');
239                url.push_str(&query_string);
240            }
241        }
242
243        let body_bytes = body.unwrap_or(&[]);
244        let content_hash = Self::sha256_hash(body_bytes);
245
246        let mut headers = HeaderMap::new();
247        headers.insert("x-amz-content-sha256", content_hash.parse().unwrap());
248        headers.insert("host", self.get_host().parse().unwrap());
249
250        if !body_bytes.is_empty() {
251            headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
252        }
253
254        let signed_headers = self
255            .sign_request(&method, &url, &headers, body_bytes)
256            .await?;
257
258        let mut request_builder = self.http_client.request(method.clone(), &url);
259
260        for (name, value) in signed_headers.iter() {
261            request_builder = request_builder.header(name, value);
262        }
263
264        if !body_bytes.is_empty() {
265            request_builder = request_builder.body(body_bytes.to_vec());
266        }
267
268        let response = request_builder
269            .send()
270            .await
271            .map_err(|e| Error::Network(format!("Request failed: {e}")))?;
272
273        let status = response.status();
274
275        if !status.is_success() {
276            let error_body = response
277                .text()
278                .await
279                .unwrap_or_else(|_| "Unknown error".to_string());
280            return Err(self.map_error(status, &error_body));
281        }
282
283        Ok(())
284    }
285
286    /// Extract host from endpoint
287    fn get_host(&self) -> String {
288        self.endpoint
289            .trim_start_matches("http://")
290            .trim_start_matches("https://")
291            .to_string()
292    }
293
294    /// Map HTTP status codes to appropriate errors
295    fn map_error(&self, status: StatusCode, body: &str) -> Error {
296        match status {
297            StatusCode::NOT_FOUND => Error::NotFound(body.to_string()),
298            StatusCode::FORBIDDEN | StatusCode::UNAUTHORIZED => Error::Auth(body.to_string()),
299            StatusCode::CONFLICT => Error::Conflict(body.to_string()),
300            StatusCode::BAD_REQUEST => Error::General(format!("Bad request: {body}")),
301            _ => Error::Network(format!("HTTP {}: {}", status.as_u16(), body)),
302        }
303    }
304}
305
/// Response wrapper for user list
///
/// The server answers with a JSON object; each key is used as the user's
/// access key and each value carries that user's details.
#[derive(Debug, Deserialize)]
struct UserListResponse(HashMap<String, UserInfo>);
309
/// Per-user details as returned by the admin API.
///
/// Field names are camelCase on the wire and every field falls back to its
/// default when absent from the response.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UserInfo {
    // Status string; callers in this file treat only "disabled" as disabled.
    #[serde(default)]
    status: String,
    // Policy name attached to the user, if any (`policyName` on the wire).
    #[serde(default)]
    policy_name: Option<String>,
    // Groups the user belongs to (`memberOf` on the wire).
    #[serde(default)]
    member_of: Option<Vec<String>>,
}
320
/// Response wrapper for policy list
///
/// A JSON object keyed by policy name; the values are kept as raw JSON.
#[derive(Debug, Deserialize)]
struct PolicyListResponse(HashMap<String, serde_json::Value>);
324
/// Request body for creating a user
///
/// Serialized with camelCase field names (`secretKey`, `status`).
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct CreateUserRequest {
    // Secret key assigned to the new user.
    secret_key: String,
    // Initial status string for the new user.
    status: String,
}
332
/// Request body for creating a group
#[derive(Debug, Serialize)]
struct CreateGroupRequest {
    // Name of the group to create.
    group: String,
    // Initial members; the field is omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    members: Option<Vec<String>>,
}
340
/// Response for service account list
#[derive(Debug, Deserialize)]
struct ServiceAccountListResponse {
    // Listed accounts; `None` when the field is missing or null.
    accounts: Option<Vec<ServiceAccountInfo>>,
}
346
/// A single service account entry inside a list response.
///
/// Field names are camelCase on the wire. Only `accessKey` is required;
/// every other field defaults to `None` when absent.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ServiceAccountInfo {
    // Access key identifying the service account (required).
    access_key: String,
    // `parentUser` on the wire.
    #[serde(default)]
    parent_user: Option<String>,
    // `accountStatus` on the wire.
    #[serde(default)]
    account_status: Option<String>,
    // Expiration as sent by the server; kept as an unparsed string.
    #[serde(default)]
    expiration: Option<String>,
    #[serde(default)]
    name: Option<String>,
    #[serde(default)]
    description: Option<String>,
    // `impliedPolicy` on the wire.
    #[serde(default)]
    implied_policy: Option<bool>,
}
364
/// Subset of the background heal status response that this client reads.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct BackgroundHealStatusResponse {
    // `bitrotStartTime` on the wire; `None` when absent.
    #[serde(default)]
    bitrot_start_time: Option<String>,
}
371
372impl From<BackgroundHealStatusResponse> for HealStatus {
373    fn from(response: BackgroundHealStatusResponse) -> Self {
374        Self {
375            healing: response.bitrot_start_time.is_some(),
376            started: response.bitrot_start_time,
377            ..Default::default()
378        }
379    }
380}
381
382#[derive(Debug, Serialize)]
383struct RustfsHealOptions {
384    recursive: bool,
385    #[serde(rename = "dryRun")]
386    dry_run: bool,
387    remove: bool,
388    recreate: bool,
389    #[serde(rename = "scanMode")]
390    scan_mode: u8,
391    #[serde(rename = "updateParity")]
392    update_parity: bool,
393    #[serde(rename = "nolock")]
394    no_lock: bool,
395}
396
397impl From<&HealStartRequest> for RustfsHealOptions {
398    fn from(request: &HealStartRequest) -> Self {
399        Self {
400            recursive: false,
401            dry_run: request.dry_run,
402            remove: request.remove,
403            recreate: request.recreate,
404            scan_mode: rustfs_heal_scan_mode(request.scan_mode),
405            update_parity: false,
406            no_lock: false,
407        }
408    }
409}
410
411fn rustfs_heal_scan_mode(scan_mode: HealScanMode) -> u8 {
412    match scan_mode {
413        HealScanMode::Normal => 1,
414        HealScanMode::Deep => 2,
415    }
416}
417
418fn rustfs_heal_path(request: &HealStartRequest) -> Result<String> {
419    let bucket = request
420        .bucket
421        .as_deref()
422        .filter(|bucket| !bucket.is_empty());
423    let prefix = request
424        .prefix
425        .as_deref()
426        .filter(|prefix| !prefix.is_empty());
427
428    match (bucket, prefix) {
429        (None, None) => Ok("/heal/".to_string()),
430        (Some(bucket), None) => Ok(format!("/heal/{}", urlencoding::encode(bucket))),
431        (Some(bucket), Some(prefix)) => Ok(format!(
432            "/heal/{}/{}",
433            urlencoding::encode(bucket),
434            urlencoding::encode(prefix)
435        )),
436        (None, Some(_)) => Err(Error::InvalidPath(
437            "heal prefix requires a bucket target".to_string(),
438        )),
439    }
440}
441
442fn rustfs_heal_body(request: &HealStartRequest) -> Result<Vec<u8>> {
443    serde_json::to_vec(&RustfsHealOptions::from(request)).map_err(Error::Json)
444}
445
446fn pool_target_query(target: &PoolTarget) -> Vec<(&str, &str)> {
447    let mut query = vec![("pool", target.pool.as_str())];
448    if target.by_id {
449        query.push(("by-id", "true"));
450    }
451    query
452}
453
/// Request body for setting bucket quota
///
/// Serialized with camelCase field names (`quota`, `quotaType`).
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct SetBucketQuotaApiRequest {
    // Quota value forwarded unchanged from the caller.
    quota: u64,
    // Quota type string, e.g. "HARD" as sent by `set_bucket_quota`.
    quota_type: String,
}
461
462#[async_trait]
463impl AdminApi for AdminClient {
464    // ==================== Cluster Operations ====================
465
466    async fn cluster_info(&self) -> Result<ClusterInfo> {
467        self.request(Method::GET, "/info", None, None).await
468    }
469
470    async fn heal_status(&self) -> Result<HealStatus> {
471        let response: BackgroundHealStatusResponse = self
472            .request(Method::POST, "/background-heal/status", None, None)
473            .await?;
474        Ok(response.into())
475    }
476
477    async fn heal_start(&self, request: HealStartRequest) -> Result<HealStatus> {
478        let path = rustfs_heal_path(&request)?;
479        let body = rustfs_heal_body(&request)?;
480        self.request_no_response(Method::POST, &path, None, Some(&body))
481            .await?;
482        Ok(HealStatus::default())
483    }
484
485    async fn heal_stop(&self) -> Result<()> {
486        let body = rustfs_heal_body(&HealStartRequest::default())?;
487        self.request_no_response(
488            Method::POST,
489            "/heal/",
490            Some(&[("forceStop", "true")]),
491            Some(&body),
492        )
493        .await
494    }
495
496    async fn list_pools(&self) -> Result<Vec<PoolStatus>> {
497        self.request(Method::GET, "/pools/list", None, None).await
498    }
499
500    async fn pool_status(&self, target: PoolTarget) -> Result<PoolStatus> {
501        let query = pool_target_query(&target);
502        self.request(Method::GET, "/pools/status", Some(&query), None)
503            .await
504    }
505
506    async fn decommission_start(&self, target: PoolTarget) -> Result<()> {
507        let query = pool_target_query(&target);
508        self.request_no_response(Method::POST, "/pools/decommission", Some(&query), None)
509            .await
510    }
511
512    async fn decommission_cancel(&self, target: PoolTarget) -> Result<()> {
513        let query = pool_target_query(&target);
514        self.request_no_response(Method::POST, "/pools/cancel", Some(&query), None)
515            .await
516    }
517
518    async fn rebalance_start(&self) -> Result<RebalanceStartResult> {
519        self.request(Method::POST, "/rebalance/start", None, None)
520            .await
521    }
522
523    async fn rebalance_status(&self) -> Result<RebalanceStatus> {
524        self.request(Method::GET, "/rebalance/status", None, None)
525            .await
526    }
527
528    async fn rebalance_stop(&self) -> Result<()> {
529        self.request_no_response(Method::POST, "/rebalance/stop", None, None)
530            .await
531    }
532
533    // ==================== User Operations ====================
534
535    async fn list_users(&self) -> Result<Vec<User>> {
536        let response: UserListResponse =
537            self.request(Method::GET, "/list-users", None, None).await?;
538
539        Ok(response
540            .0
541            .into_iter()
542            .map(|(access_key, info)| User {
543                access_key,
544                secret_key: None,
545                status: if info.status == "disabled" {
546                    UserStatus::Disabled
547                } else {
548                    UserStatus::Enabled
549                },
550                policy_name: info.policy_name,
551                member_of: info.member_of.unwrap_or_default(),
552            })
553            .collect())
554    }
555
556    async fn get_user(&self, access_key: &str) -> Result<User> {
557        let query = [("accessKey", access_key)];
558        let response: UserInfo = self
559            .request(Method::GET, "/user-info", Some(&query), None)
560            .await?;
561
562        Ok(User {
563            access_key: access_key.to_string(),
564            secret_key: None,
565            status: if response.status == "disabled" {
566                UserStatus::Disabled
567            } else {
568                UserStatus::Enabled
569            },
570            policy_name: response.policy_name,
571            member_of: response.member_of.unwrap_or_default(),
572        })
573    }
574
575    async fn create_user(&self, access_key: &str, secret_key: &str) -> Result<User> {
576        let query = [("accessKey", access_key)];
577        let body = serde_json::to_vec(&CreateUserRequest {
578            secret_key: secret_key.to_string(),
579            status: "enabled".to_string(),
580        })
581        .map_err(Error::Json)?;
582
583        self.request_no_response(Method::PUT, "/add-user", Some(&query), Some(&body))
584            .await?;
585
586        Ok(User {
587            access_key: access_key.to_string(),
588            secret_key: Some(secret_key.to_string()),
589            status: UserStatus::Enabled,
590            policy_name: None,
591            member_of: vec![],
592        })
593    }
594
595    async fn delete_user(&self, access_key: &str) -> Result<()> {
596        let query = [("accessKey", access_key)];
597        self.request_no_response(Method::DELETE, "/remove-user", Some(&query), None)
598            .await
599    }
600
601    async fn set_user_status(&self, access_key: &str, status: UserStatus) -> Result<()> {
602        let status_str = match status {
603            UserStatus::Enabled => "enabled",
604            UserStatus::Disabled => "disabled",
605        };
606        let query = [("accessKey", access_key), ("status", status_str)];
607        self.request_no_response(Method::PUT, "/set-user-status", Some(&query), None)
608            .await
609    }
610
611    // ==================== Policy Operations ====================
612
613    async fn list_policies(&self) -> Result<Vec<PolicyInfo>> {
614        let response: PolicyListResponse = self
615            .request(Method::GET, "/list-canned-policies", None, None)
616            .await?;
617
618        Ok(response
619            .0
620            .into_keys()
621            .map(|name| PolicyInfo { name })
622            .collect())
623    }
624
625    async fn get_policy(&self, name: &str) -> Result<Policy> {
626        let query = [("name", name)];
627        let policy_doc: serde_json::Value = self
628            .request(Method::GET, "/info-canned-policy", Some(&query), None)
629            .await?;
630
631        Ok(Policy {
632            name: name.to_string(),
633            policy: serde_json::to_string_pretty(&policy_doc).unwrap_or_default(),
634        })
635    }
636
637    async fn create_policy(&self, name: &str, policy_document: &str) -> Result<()> {
638        let query = [("name", name)];
639        let body = policy_document.as_bytes();
640        self.request_no_response(Method::PUT, "/add-canned-policy", Some(&query), Some(body))
641            .await
642    }
643
644    async fn delete_policy(&self, name: &str) -> Result<()> {
645        let query = [("name", name)];
646        self.request_no_response(Method::DELETE, "/remove-canned-policy", Some(&query), None)
647            .await
648    }
649
650    async fn attach_policy(
651        &self,
652        policy_names: &[String],
653        entity_type: PolicyEntity,
654        entity_name: &str,
655    ) -> Result<()> {
656        let policy_name = policy_names.join(",");
657        let is_group = entity_type == PolicyEntity::Group;
658
659        let query = [
660            ("policyName", policy_name.as_str()),
661            ("userOrGroup", entity_name),
662            ("isGroup", if is_group { "true" } else { "false" }),
663        ];
664
665        self.request_no_response(Method::PUT, "/set-user-or-group-policy", Some(&query), None)
666            .await
667    }
668
669    async fn detach_policy(
670        &self,
671        policy_names: &[String],
672        entity_type: PolicyEntity,
673        entity_name: &str,
674    ) -> Result<()> {
675        // Detach by setting empty policy
676        // In RustFS/MinIO, you typically set a new policy which replaces the old one
677        // For detach, we need to get current policies and remove the specified ones
678        let _ = (policy_names, entity_type, entity_name);
679        Err(Error::UnsupportedFeature(
680            "Policy detach not directly supported. Use attach with remaining policies instead."
681                .to_string(),
682        ))
683    }
684
685    // ==================== Group Operations ====================
686
687    async fn list_groups(&self) -> Result<Vec<String>> {
688        let response: Vec<String> = self.request(Method::GET, "/groups", None, None).await?;
689        Ok(response)
690    }
691
692    async fn get_group(&self, name: &str) -> Result<Group> {
693        let query = [("group", name)];
694        let response: Group = self
695            .request(Method::GET, "/group", Some(&query), None)
696            .await?;
697        Ok(response)
698    }
699
700    async fn create_group(&self, name: &str, members: Option<&[String]>) -> Result<Group> {
701        let body = serde_json::to_vec(&CreateGroupRequest {
702            group: name.to_string(),
703            members: members.map(|m| m.to_vec()),
704        })
705        .map_err(Error::Json)?;
706
707        self.request_no_response(Method::POST, "/groups", None, Some(&body))
708            .await?;
709
710        Ok(Group {
711            name: name.to_string(),
712            policy: None,
713            members: members.map(|m| m.to_vec()).unwrap_or_default(),
714            status: GroupStatus::Enabled,
715        })
716    }
717
718    async fn delete_group(&self, name: &str) -> Result<()> {
719        let path = format!("/group/{}", urlencoding::encode(name));
720        self.request_no_response(Method::DELETE, &path, None, None)
721            .await
722    }
723
724    async fn set_group_status(&self, name: &str, status: GroupStatus) -> Result<()> {
725        let status_str = match status {
726            GroupStatus::Enabled => "enabled",
727            GroupStatus::Disabled => "disabled",
728        };
729        let query = [("group", name), ("status", status_str)];
730        self.request_no_response(Method::PUT, "/set-group-status", Some(&query), None)
731            .await
732    }
733
734    async fn add_group_members(&self, group: &str, members: &[String]) -> Result<()> {
735        let body = serde_json::to_vec(&UpdateGroupMembersRequest {
736            group: group.to_string(),
737            members: members.to_vec(),
738            is_remove: false,
739            status: "enabled".to_string(),
740        })
741        .map_err(Error::Json)?;
742
743        self.request_no_response(Method::PUT, "/update-group-members", None, Some(&body))
744            .await
745    }
746
747    async fn remove_group_members(&self, group: &str, members: &[String]) -> Result<()> {
748        let body = serde_json::to_vec(&UpdateGroupMembersRequest {
749            group: group.to_string(),
750            members: members.to_vec(),
751            is_remove: true,
752            status: "enabled".to_string(),
753        })
754        .map_err(Error::Json)?;
755
756        self.request_no_response(Method::PUT, "/update-group-members", None, Some(&body))
757            .await
758    }
759
760    // ==================== Service Account Operations ====================
761
762    async fn list_service_accounts(&self, user: Option<&str>) -> Result<Vec<ServiceAccount>> {
763        let query: Vec<(&str, &str)> = user.map(|u| vec![("user", u)]).unwrap_or_default();
764        let query_ref: Option<&[(&str, &str)]> = if query.is_empty() { None } else { Some(&query) };
765
766        let response: ServiceAccountListResponse = self
767            .request(Method::GET, "/list-service-accounts", query_ref, None)
768            .await?;
769
770        Ok(response
771            .accounts
772            .unwrap_or_default()
773            .into_iter()
774            .map(|sa| ServiceAccount {
775                access_key: sa.access_key,
776                secret_key: None,
777                parent_user: sa.parent_user,
778                policy: None,
779                account_status: sa.account_status,
780                expiration: sa.expiration,
781                name: sa.name,
782                description: sa.description,
783                implied_policy: sa.implied_policy,
784            })
785            .collect())
786    }
787
788    async fn get_service_account(&self, access_key: &str) -> Result<ServiceAccount> {
789        let query = [("accessKey", access_key)];
790        let response: ServiceAccount = self
791            .request(Method::GET, "/info-service-account", Some(&query), None)
792            .await?;
793
794        let mut response = response;
795        if response.access_key.is_empty() {
796            response.access_key = access_key.to_string();
797        }
798        Ok(response)
799    }
800
801    async fn create_service_account(
802        &self,
803        request: CreateServiceAccountRequest,
804    ) -> Result<ServiceAccount> {
805        let body = serde_json::to_vec(&request).map_err(Error::Json)?;
806        let response: ServiceAccountCreateResponse = self
807            .request(Method::PUT, "/add-service-accounts", None, Some(&body))
808            .await?;
809
810        Ok(ServiceAccount {
811            access_key: response.credentials.access_key,
812            secret_key: Some(response.credentials.secret_key),
813            expiration: response.credentials.expiration,
814            parent_user: None,
815            policy: None,
816            account_status: None,
817            name: None,
818            description: None,
819            implied_policy: None,
820        })
821    }
822
823    async fn delete_service_account(&self, access_key: &str) -> Result<()> {
824        let query = [("accessKey", access_key)];
825        self.request_no_response(
826            Method::DELETE,
827            "/delete-service-accounts",
828            Some(&query),
829            None,
830        )
831        .await
832    }
833
834    // ==================== Bucket Quota Operations ====================
835
836    async fn set_bucket_quota(&self, bucket: &str, quota: u64) -> Result<BucketQuota> {
837        let path = format!("/quota/{}", urlencoding::encode(bucket));
838        let body = serde_json::to_vec(&SetBucketQuotaApiRequest {
839            quota,
840            quota_type: "HARD".to_string(),
841        })
842        .map_err(Error::Json)?;
843
844        self.request(Method::PUT, &path, None, Some(&body)).await
845    }
846
847    async fn get_bucket_quota(&self, bucket: &str) -> Result<BucketQuota> {
848        let path = format!("/quota/{}", urlencoding::encode(bucket));
849        self.request(Method::GET, &path, None, None).await
850    }
851
852    async fn clear_bucket_quota(&self, bucket: &str) -> Result<BucketQuota> {
853        let path = format!("/quota/{}", urlencoding::encode(bucket));
854        self.request(Method::DELETE, &path, None, None).await
855    }
856
857    // ==================== Tier Operations ====================
858
859    async fn list_tiers(&self) -> Result<Vec<rc_core::admin::TierConfig>> {
860        self.request(Method::GET, "/tier", None, None).await
861    }
862
863    async fn tier_stats(&self) -> Result<serde_json::Value> {
864        self.request(Method::GET, "/tier-stats", None, None).await
865    }
866
867    async fn add_tier(&self, config: rc_core::admin::TierConfig) -> Result<()> {
868        let body = serde_json::to_vec(&config).map_err(Error::Json)?;
869        self.request_no_response(Method::PUT, "/tier", None, Some(&body))
870            .await
871    }
872
873    async fn edit_tier(&self, name: &str, creds: rc_core::admin::TierCreds) -> Result<()> {
874        let path = format!("/tier/{}", urlencoding::encode(name));
875        let body = serde_json::to_vec(&creds).map_err(Error::Json)?;
876        self.request_no_response(Method::POST, &path, None, Some(&body))
877            .await
878    }
879
880    async fn remove_tier(&self, name: &str, force: bool) -> Result<()> {
881        let path = format!("/tier/{}", urlencoding::encode(name));
882        if force {
883            let query: &[(&str, &str)] = &[("force", "true")];
884            self.request_no_response(Method::DELETE, &path, Some(query), None)
885                .await
886        } else {
887            self.request_no_response(Method::DELETE, &path, None, None)
888                .await
889        }
890    }
891
892    // ==================== Replication Target Operations ====================
893
894    async fn set_remote_target(
895        &self,
896        bucket: &str,
897        target: rc_core::replication::BucketTarget,
898        update: bool,
899    ) -> Result<String> {
900        let body = serde_json::to_vec(&target).map_err(Error::Json)?;
901        if update {
902            let query: &[(&str, &str)] = &[("bucket", bucket), ("update", "true")];
903            self.request(Method::PUT, "/set-remote-target", Some(query), Some(&body))
904                .await
905        } else {
906            let query: &[(&str, &str)] = &[("bucket", bucket)];
907            self.request(Method::PUT, "/set-remote-target", Some(query), Some(&body))
908                .await
909        }
910    }
911
912    async fn list_remote_targets(
913        &self,
914        bucket: &str,
915    ) -> Result<Vec<rc_core::replication::BucketTarget>> {
916        let query: &[(&str, &str)] = &[("bucket", bucket)];
917        self.request(Method::GET, "/list-remote-targets", Some(query), None)
918            .await
919    }
920
921    async fn remove_remote_target(&self, bucket: &str, arn: &str) -> Result<()> {
922        let query: &[(&str, &str)] = &[("bucket", bucket), ("arn", arn)];
923        self.request_no_response(Method::DELETE, "/remove-remote-target", Some(query), None)
924            .await
925    }
926
927    async fn replication_metrics(&self, bucket: &str) -> Result<serde_json::Value> {
928        let query: &[(&str, &str)] = &[("bucket", bucket)];
929        self.request(Method::GET, "/replicationmetrics", Some(query), None)
930            .await
931    }
932}
933
934#[cfg(test)]
935mod tests {
936    use super::*;
937    use std::io::{Read, Write};
938    use std::net::{TcpListener, TcpStream};
939    use std::sync::mpsc;
940    use std::thread;
941    use tempfile::tempdir;
942
/// The parts of an incoming HTTP request that the test server records for
/// later assertions.
#[derive(Debug)]
struct CapturedAdminRequest {
    /// First whitespace-separated token of the request line.
    method: String,
    /// Second whitespace-separated token of the request line (path plus any query).
    target: String,
    /// Body bytes following the headers, `Content-Length` bytes long.
    body: Vec<u8>,
}
949
950    fn read_admin_request(stream: &mut TcpStream) -> CapturedAdminRequest {
951        let mut buffer = Vec::new();
952        let mut chunk = [0_u8; 1024];
953        let header_end = loop {
954            let read = stream.read(&mut chunk).expect("read HTTP request");
955            assert!(read > 0, "client closed connection before headers");
956            buffer.extend_from_slice(&chunk[..read]);
957
958            if let Some(position) = buffer.windows(4).position(|window| window == b"\r\n\r\n") {
959                break position + 4;
960            }
961        };
962
963        let headers = String::from_utf8_lossy(&buffer[..header_end]).into_owned();
964        let content_length = headers
965            .lines()
966            .find_map(|line| {
967                let (name, value) = line.split_once(':')?;
968                name.eq_ignore_ascii_case("content-length")
969                    .then(|| value.trim().parse::<usize>().expect("valid content length"))
970            })
971            .unwrap_or(0);
972
973        while buffer.len() - header_end < content_length {
974            let read = stream.read(&mut chunk).expect("read HTTP request body");
975            assert!(read > 0, "client closed connection before body");
976            buffer.extend_from_slice(&chunk[..read]);
977        }
978
979        let request_line = headers.lines().next().expect("request line");
980        let mut parts = request_line.split_whitespace();
981        let method = parts.next().expect("request method").to_string();
982        let target = parts.next().expect("request target").to_string();
983        let body = buffer[header_end..header_end + content_length].to_vec();
984
985        CapturedAdminRequest {
986            method,
987            target,
988            body,
989        }
990    }
991
992    fn start_admin_test_server(
993        response_status: &str,
994        response_body: &'static str,
995    ) -> (
996        String,
997        mpsc::Receiver<CapturedAdminRequest>,
998        thread::JoinHandle<()>,
999    ) {
1000        let listener = TcpListener::bind("127.0.0.1:0").expect("bind test server");
1001        let endpoint = format!("http://{}", listener.local_addr().expect("local addr"));
1002        let (sender, receiver) = mpsc::channel();
1003        let response_status = response_status.to_string();
1004
1005        let handle = thread::spawn(move || {
1006            let (mut stream, _) = listener.accept().expect("accept request");
1007            let request = read_admin_request(&mut stream);
1008            sender.send(request).expect("send captured request");
1009
1010            let response = format!(
1011                "HTTP/1.1 {response_status}\r\ncontent-length: {}\r\ncontent-type: application/json\r\nconnection: close\r\n\r\n{response_body}",
1012                response_body.len()
1013            );
1014            stream
1015                .write_all(response.as_bytes())
1016                .expect("write HTTP response");
1017        });
1018
1019        (endpoint, receiver, handle)
1020    }
1021
1022    fn admin_client_for_endpoint(endpoint: &str) -> AdminClient {
1023        let alias = Alias::new("test", endpoint, "access", "secret");
1024        AdminClient::new(&alias).expect("admin client should build")
1025    }
1026
1027    fn assert_heal_options_body(
1028        body: &[u8],
1029        scan_mode: u8,
1030        remove: bool,
1031        recreate: bool,
1032        dry_run: bool,
1033    ) {
1034        let value: serde_json::Value =
1035            serde_json::from_slice(body).expect("heal request body should be JSON");
1036
1037        assert_eq!(value["recursive"], false);
1038        assert_eq!(value["dryRun"], dry_run);
1039        assert_eq!(value["remove"], remove);
1040        assert_eq!(value["recreate"], recreate);
1041        assert_eq!(value["scanMode"], scan_mode);
1042        assert_eq!(value["updateParity"], false);
1043        assert_eq!(value["nolock"], false);
1044        assert!(value.get("bucket").is_none());
1045        assert!(value.get("prefix").is_none());
1046    }
1047
/// `admin_url` prefixes the path with the endpoint and `/rustfs/admin/v3`.
#[test]
fn test_admin_url_construction() {
    let test_alias = Alias::new("test", "http://localhost:9000", "access", "secret");
    let url = AdminClient::new(&test_alias)
        .unwrap()
        .admin_url("/list-users");

    assert_eq!(url, "http://localhost:9000/rustfs/admin/v3/list-users");
}
1058
/// A trailing slash on the endpoint must not produce a doubled slash in the URL.
#[test]
fn test_admin_url_with_trailing_slash() {
    let test_alias = Alias::new("test", "http://localhost:9000/", "access", "secret");
    let url = AdminClient::new(&test_alias)
        .unwrap()
        .admin_url("/list-users");

    assert_eq!(url, "http://localhost:9000/rustfs/admin/v3/list-users");
}
1069
/// `get_host` returns the endpoint without its `https://` scheme.
#[test]
fn test_get_host() {
    let test_alias = Alias::new("test", "https://s3.example.com", "access", "secret");
    let host = AdminClient::new(&test_alias).unwrap().get_host();

    assert_eq!(host, "s3.example.com");
}
1077
/// `sha256_hash` of `b"test"` yields the expected lowercase hex digest.
#[test]
fn test_sha256_hash() {
    const EXPECTED: &str = "9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08";

    let digest = AdminClient::sha256_hash(b"test");

    assert_eq!(digest, EXPECTED);
}
1086
/// `sha256_hash` of the empty input yields the expected lowercase hex digest.
#[test]
fn test_sha256_hash_empty() {
    const EXPECTED: &str = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";

    let digest = AdminClient::sha256_hash(b"");

    assert_eq!(digest, EXPECTED);
}
1095
/// `rustfs_heal_path` maps root, bucket and bucket+prefix requests to their routes
/// and rejects a prefix that comes without a bucket.
#[test]
fn test_rustfs_heal_path_matches_admin_routes() {
    let root = HealStartRequest::default();
    assert_eq!(rustfs_heal_path(&root).expect("root path"), "/heal/");

    let bucket_only = HealStartRequest {
        bucket: Some("photos".to_string()),
        ..Default::default()
    };
    let bucket_path = rustfs_heal_path(&bucket_only).expect("bucket path");
    assert_eq!(bucket_path, "/heal/photos");

    // The slash inside the prefix must be percent-encoded.
    let bucket_and_prefix = HealStartRequest {
        bucket: Some("photos".to_string()),
        prefix: Some("2026/raw".to_string()),
        ..Default::default()
    };
    let prefix_path = rustfs_heal_path(&bucket_and_prefix).expect("prefix path");
    assert_eq!(prefix_path, "/heal/photos/2026%2Fraw");

    let prefix_without_bucket = HealStartRequest {
        prefix: Some("2026/raw".to_string()),
        ..Default::default()
    };
    let rejected = rustfs_heal_path(&prefix_without_bucket);
    assert!(matches!(rejected, Err(Error::InvalidPath(_))));
}
1131
/// `rustfs_heal_body` serializes a deep-scan request into the expected JSON keys
/// and leaves `bucket` / `prefix` out of the body.
#[test]
fn test_rustfs_heal_body_matches_server_heal_options() {
    let deep_scan = HealStartRequest {
        dry_run: true,
        remove: true,
        recreate: true,
        scan_mode: HealScanMode::Deep,
        ..Default::default()
    };

    let encoded = rustfs_heal_body(&deep_scan).expect("heal options should serialize");
    let options: serde_json::Value =
        serde_json::from_slice(&encoded).expect("heal options body should be JSON");

    assert_eq!(options["recursive"], false);
    assert_eq!(options["dryRun"], true);
    assert_eq!(options["remove"], true);
    assert_eq!(options["recreate"], true);
    assert_eq!(options["scanMode"], 2);
    assert_eq!(options["updateParity"], false);
    assert_eq!(options["nolock"], false);

    for absent_key in ["bucket", "prefix"] {
        assert!(options.get(absent_key).is_none());
    }
}
1156
/// A response with a `bitrot_start_time` converts to a healing status carrying
/// that time; a response without one converts to an idle status.
#[test]
fn test_background_heal_status_response_maps_to_heal_status() {
    let running_response = BackgroundHealStatusResponse {
        bitrot_start_time: Some("2026-04-19T10:00:00Z".to_string()),
    };
    let running = HealStatus::from(running_response);
    assert!(running.healing);
    assert_eq!(running.started.as_deref(), Some("2026-04-19T10:00:00Z"));

    let idle_response = BackgroundHealStatusResponse {
        bitrot_start_time: None,
    };
    let idle = HealStatus::from(idle_response);
    assert!(!idle.healing);
    assert!(idle.started.is_none());
}
1172
/// `map_error` turns HTTP 400 into `Error::General` with a "Bad request" message.
#[test]
fn test_bad_request_maps_to_general_admin_error() {
    let test_alias = Alias::new("test", "http://localhost:9000", "access", "secret");
    let client = AdminClient::new(&test_alias).expect("admin client should build");

    let mapped = client.map_error(StatusCode::BAD_REQUEST, "err request body parse");

    assert!(matches!(mapped, Error::General(_)));
    assert_eq!(mapped.to_string(), "Bad request: err request body parse");
}
1182
1183    #[tokio::test]
1184    async fn test_heal_status_uses_background_heal_status_endpoint() {
1185        let (endpoint, receiver, handle) =
1186            start_admin_test_server("200 OK", r#"{"bitrotStartTime":"2026-04-19T10:00:00Z"}"#);
1187        let client = admin_client_for_endpoint(&endpoint);
1188
1189        let status = client.heal_status().await.expect("heal status request");
1190
1191        assert!(status.healing);
1192        assert_eq!(status.started.as_deref(), Some("2026-04-19T10:00:00Z"));
1193
1194        let request = receiver.recv().expect("captured request");
1195        assert_eq!(request.method, "POST");
1196        assert_eq!(request.target, "/rustfs/admin/v3/background-heal/status");
1197        assert!(request.body.is_empty());
1198        handle.join().expect("server thread should finish");
1199    }
1200
1201    #[tokio::test]
1202    async fn test_heal_start_posts_to_bucket_prefix_route_with_options_body() {
1203        let (endpoint, receiver, handle) = start_admin_test_server("200 OK", "");
1204        let client = admin_client_for_endpoint(&endpoint);
1205        let request = HealStartRequest {
1206            bucket: Some("raw photos".to_string()),
1207            prefix: Some("2026/april".to_string()),
1208            scan_mode: HealScanMode::Deep,
1209            remove: true,
1210            recreate: true,
1211            dry_run: true,
1212        };
1213
1214        let status = client
1215            .heal_start(request)
1216            .await
1217            .expect("heal start request");
1218
1219        assert!(!status.healing);
1220        assert!(status.heal_id.is_empty());
1221        assert!(status.started.is_none());
1222
1223        let request = receiver.recv().expect("captured request");
1224        assert_eq!(request.method, "POST");
1225        assert_eq!(
1226            request.target,
1227            "/rustfs/admin/v3/heal/raw%20photos/2026%2Fapril"
1228        );
1229        assert_heal_options_body(&request.body, 2, true, true, true);
1230        handle.join().expect("server thread should finish");
1231    }
1232
1233    #[tokio::test]
1234    async fn test_heal_stop_posts_force_stop_to_root_heal_route() {
1235        let (endpoint, receiver, handle) = start_admin_test_server("200 OK", "");
1236        let client = admin_client_for_endpoint(&endpoint);
1237
1238        client.heal_stop().await.expect("heal stop request");
1239
1240        let request = receiver.recv().expect("captured request");
1241        assert_eq!(request.method, "POST");
1242        assert_eq!(request.target, "/rustfs/admin/v3/heal/?forceStop=true");
1243        assert_heal_options_body(&request.body, 1, false, false, false);
1244        handle.join().expect("server thread should finish");
1245    }
1246
1247    #[tokio::test]
1248    async fn test_pool_status_uses_pool_status_route_with_by_id_query() {
1249        let (endpoint, receiver, handle) = start_admin_test_server(
1250            "200 OK",
1251            r#"{"id":1,"cmdline":"/data/pool1/disk{1...4}","lastUpdate":"2026-05-06T00:00:00Z","decommissionInfo":null}"#,
1252        );
1253        let client = admin_client_for_endpoint(&endpoint);
1254
1255        let status = client
1256            .pool_status(PoolTarget {
1257                pool: "1".to_string(),
1258                by_id: true,
1259            })
1260            .await
1261            .expect("pool status request");
1262
1263        assert_eq!(status.id, 1);
1264        assert_eq!(status.cmd_line, "/data/pool1/disk{1...4}");
1265
1266        let request = receiver.recv().expect("captured request");
1267        assert_eq!(request.method, "GET");
1268        assert_eq!(
1269            request.target,
1270            "/rustfs/admin/v3/pools/status?pool=1&by-id=true"
1271        );
1272        assert!(request.body.is_empty());
1273        handle.join().expect("server thread should finish");
1274    }
1275
1276    #[tokio::test]
1277    async fn test_pool_status_uses_command_line_query_without_by_id() {
1278        let (endpoint, receiver, handle) = start_admin_test_server(
1279            "200 OK",
1280            r#"{"id":2,"cmdline":"/data/pool2/disk{1...4}","lastUpdate":"2026-05-06T00:00:00Z","decommissionInfo":null}"#,
1281        );
1282        let client = admin_client_for_endpoint(&endpoint);
1283
1284        let status = client
1285            .pool_status(PoolTarget {
1286                pool: "/data/pool2/disk{1...4}".to_string(),
1287                by_id: false,
1288            })
1289            .await
1290            .expect("pool status request");
1291
1292        assert_eq!(status.id, 2);
1293        assert_eq!(status.cmd_line, "/data/pool2/disk{1...4}");
1294
1295        let request = receiver.recv().expect("captured request");
1296        assert_eq!(request.method, "GET");
1297        assert_eq!(
1298            request.target,
1299            "/rustfs/admin/v3/pools/status?pool=%2Fdata%2Fpool2%2Fdisk%7B1...4%7D"
1300        );
1301        assert!(request.body.is_empty());
1302        handle.join().expect("server thread should finish");
1303    }
1304
1305    #[tokio::test]
1306    async fn test_list_pools_uses_pool_list_route() {
1307        let (endpoint, receiver, handle) = start_admin_test_server(
1308            "200 OK",
1309            r#"[{"id":0,"cmdline":"/data/pool0/disk{1...4}","lastUpdate":"2026-05-06T00:00:00Z","decommissionInfo":null}]"#,
1310        );
1311        let client = admin_client_for_endpoint(&endpoint);
1312
1313        let pools = client.list_pools().await.expect("list pools request");
1314
1315        assert_eq!(pools.len(), 1);
1316        assert_eq!(pools[0].id, 0);
1317        assert_eq!(pools[0].cmd_line, "/data/pool0/disk{1...4}");
1318
1319        let request = receiver.recv().expect("captured request");
1320        assert_eq!(request.method, "GET");
1321        assert_eq!(request.target, "/rustfs/admin/v3/pools/list");
1322        assert!(request.body.is_empty());
1323        handle.join().expect("server thread should finish");
1324    }
1325
1326    #[tokio::test]
1327    async fn test_decommission_start_posts_pool_query() {
1328        let (endpoint, receiver, handle) = start_admin_test_server("200 OK", "");
1329        let client = admin_client_for_endpoint(&endpoint);
1330
1331        client
1332            .decommission_start(PoolTarget {
1333                pool: "/data/pool1/disk{1...4}".to_string(),
1334                by_id: false,
1335            })
1336            .await
1337            .expect("decommission start request");
1338
1339        let request = receiver.recv().expect("captured request");
1340        assert_eq!(request.method, "POST");
1341        assert_eq!(
1342            request.target,
1343            "/rustfs/admin/v3/pools/decommission?pool=%2Fdata%2Fpool1%2Fdisk%7B1...4%7D"
1344        );
1345        assert!(request.body.is_empty());
1346        handle.join().expect("server thread should finish");
1347    }
1348
1349    #[tokio::test]
1350    async fn test_decommission_start_posts_by_id_multi_pool_query() {
1351        let (endpoint, receiver, handle) = start_admin_test_server("200 OK", "");
1352        let client = admin_client_for_endpoint(&endpoint);
1353
1354        client
1355            .decommission_start(PoolTarget {
1356                pool: "1,2".to_string(),
1357                by_id: true,
1358            })
1359            .await
1360            .expect("decommission start request");
1361
1362        let request = receiver.recv().expect("captured request");
1363        assert_eq!(request.method, "POST");
1364        assert_eq!(
1365            request.target,
1366            "/rustfs/admin/v3/pools/decommission?pool=1%2C2&by-id=true"
1367        );
1368        assert!(request.body.is_empty());
1369        handle.join().expect("server thread should finish");
1370    }
1371
1372    #[tokio::test]
1373    async fn test_decommission_cancel_posts_pool_cancel_route_with_by_id_query() {
1374        let (endpoint, receiver, handle) = start_admin_test_server("200 OK", "");
1375        let client = admin_client_for_endpoint(&endpoint);
1376
1377        client
1378            .decommission_cancel(PoolTarget {
1379                pool: "1".to_string(),
1380                by_id: true,
1381            })
1382            .await
1383            .expect("decommission cancel request");
1384
1385        let request = receiver.recv().expect("captured request");
1386        assert_eq!(request.method, "POST");
1387        assert_eq!(
1388            request.target,
1389            "/rustfs/admin/v3/pools/cancel?pool=1&by-id=true"
1390        );
1391        assert!(request.body.is_empty());
1392        handle.join().expect("server thread should finish");
1393    }
1394
1395    #[tokio::test]
1396    async fn test_rebalance_start_posts_rebalance_start_route() {
1397        let (endpoint, receiver, handle) =
1398            start_admin_test_server("200 OK", r#"{"id":"rebalance-123"}"#);
1399        let client = admin_client_for_endpoint(&endpoint);
1400
1401        let result = client
1402            .rebalance_start()
1403            .await
1404            .expect("rebalance start request");
1405
1406        assert_eq!(result.id, "rebalance-123");
1407
1408        let request = receiver.recv().expect("captured request");
1409        assert_eq!(request.method, "POST");
1410        assert_eq!(request.target, "/rustfs/admin/v3/rebalance/start");
1411        assert!(request.body.is_empty());
1412        handle.join().expect("server thread should finish");
1413    }
1414
1415    #[tokio::test]
1416    async fn test_rebalance_status_gets_rebalance_status_route() {
1417        let (endpoint, receiver, handle) = start_admin_test_server(
1418            "200 OK",
1419            r#"{"id":"rebalance-123","pools":[],"stoppedAt":"2026-05-06T00:00:00Z"}"#,
1420        );
1421        let client = admin_client_for_endpoint(&endpoint);
1422
1423        let status = client
1424            .rebalance_status()
1425            .await
1426            .expect("rebalance status request");
1427
1428        assert_eq!(status.id, "rebalance-123");
1429        assert_eq!(status.stopped_at.as_deref(), Some("2026-05-06T00:00:00Z"));
1430
1431        let request = receiver.recv().expect("captured request");
1432        assert_eq!(request.method, "GET");
1433        assert_eq!(request.target, "/rustfs/admin/v3/rebalance/status");
1434        assert!(request.body.is_empty());
1435        handle.join().expect("server thread should finish");
1436    }
1437
1438    #[tokio::test]
1439    async fn test_rebalance_stop_posts_rebalance_stop_route() {
1440        let (endpoint, receiver, handle) = start_admin_test_server("200 OK", "");
1441        let client = admin_client_for_endpoint(&endpoint);
1442
1443        client
1444            .rebalance_stop()
1445            .await
1446            .expect("rebalance stop request");
1447
1448        let request = receiver.recv().expect("captured request");
1449        assert_eq!(request.method, "POST");
1450        assert_eq!(request.target, "/rustfs/admin/v3/rebalance/stop");
1451        assert!(request.body.is_empty());
1452        handle.join().expect("server thread should finish");
1453    }
1454
/// A CA bundle path that does not exist makes `AdminClient::new` fail with
/// `Error::Network` and a "Failed to read CA bundle" message.
#[test]
fn test_admin_client_invalid_ca_bundle_path_surfaces_error() {
    let mut test_alias = Alias::new("test", "https://localhost:9000", "access", "secret");
    test_alias.ca_bundle = Some("/definitely-not-here/ca.pem".to_string());

    match AdminClient::new(&test_alias) {
        Ok(_) => panic!("Expected Error::Network for invalid path, got Ok(_)"),
        Err(Error::Network(msg)) => {
            assert!(
                msg.contains("Failed to read CA bundle"),
                "Unexpected error message: {msg}"
            );
        }
        Err(e) => panic!("Expected Error::Network for invalid path, got Err({e})"),
    }
}
1472
/// A readable CA bundle whose PEM payload is invalid makes `AdminClient::new` fail
/// with `Error::Network`, and the message names the offending file.
#[test]
fn test_admin_client_invalid_ca_bundle_pem_surfaces_error() {
    let scratch_dir = tempdir().expect("create temp dir");
    let bundle_path = scratch_dir.path().join("bad-ca.pem");
    std::fs::write(
        &bundle_path,
        b"-----BEGIN CERTIFICATE-----\ninvalid-base64\n-----END CERTIFICATE-----\n",
    )
    .expect("write invalid PEM");

    let mut test_alias = Alias::new("test", "https://localhost:9000", "access", "secret");
    test_alias.ca_bundle = Some(bundle_path.display().to_string());

    match AdminClient::new(&test_alias) {
        Ok(_) => panic!("Expected Error::Network for invalid PEM, got Ok(_)"),
        Err(Error::Network(msg)) => {
            assert!(
                msg.contains("Invalid CA bundle") && msg.contains("bad-ca.pem"),
                "Unexpected error message for invalid PEM CA bundle: {msg}"
            );
        }
        Err(e) => panic!("Expected Error::Network for invalid PEM, got Err({e})"),
    }
}
1498}