Skip to main content

rc_s3/
admin.rs

1//! Admin API client implementation
2//!
3//! This module provides the AdminClient that implements the AdminApi trait
4//! using HTTP requests with AWS SigV4 signing.
5
6use async_trait::async_trait;
7use aws_credential_types::Credentials;
8use aws_sigv4::http_request::{
9    SignableBody, SignableRequest, SignatureLocation, SigningSettings, sign,
10};
11use aws_sigv4::sign::v4;
12use rc_core::admin::{
13    AdminApi, BucketQuota, ClusterInfo, CreateServiceAccountRequest, Group, GroupStatus,
14    HealScanMode, HealStartRequest, HealStatus, Policy, PolicyEntity, PolicyInfo, PoolStatus,
15    PoolTarget, RebalanceStartResult, RebalanceStatus, ServiceAccount,
16    ServiceAccountCreateResponse, UpdateGroupMembersRequest, User, UserStatus,
17};
18use rc_core::{Alias, Error, Result};
19use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue};
20use reqwest::{Client, Method, StatusCode};
21use serde::{Deserialize, Serialize};
22use sha2::{Digest, Sha256};
23use std::collections::HashMap;
24use std::time::SystemTime;
25
/// Admin API client for RustFS/MinIO-compatible servers
///
/// Holds the HTTP client plus the connection settings copied from an `Alias`
/// when the client is constructed.
pub struct AdminClient {
    /// HTTP client used to send every admin request.
    http_client: Client,
    /// Server endpoint; trailing `/` characters are trimmed at construction.
    endpoint: String,
    /// Access key used to build the signing credentials.
    access_key: String,
    /// Secret key used to build the signing credentials.
    secret_key: String,
    /// Region passed to the SigV4 signing parameters.
    region: String,
    /// When `true`, requests are sent without a signature.
    anonymous: bool,
}
35
36impl AdminClient {
37    /// Create a new AdminClient from an Alias
38    pub fn new(alias: &Alias) -> Result<Self> {
39        let mut builder = Client::builder()
40            .danger_accept_invalid_certs(alias.insecure)
41            .tls_built_in_native_certs(true)
42            .tls_built_in_webpki_certs(true);
43
44        if let Some(bundle_path) = alias.ca_bundle.as_deref() {
45            let pem = std::fs::read(bundle_path).map_err(|e| {
46                Error::Network(format!("Failed to read CA bundle '{bundle_path}': {e}"))
47            })?;
48            let certs = reqwest::Certificate::from_pem_bundle(&pem)
49                .map_err(|e| Error::Network(format!("Invalid CA bundle '{bundle_path}': {e}")))?;
50            if certs.is_empty() {
51                return Err(Error::Network(format!(
52                    "Invalid CA bundle '{bundle_path}': no certificates found"
53                )));
54            }
55            for cert in certs {
56                builder = builder.add_root_certificate(cert);
57            }
58        }
59
60        if let (Some(cert_path), Some(key_path)) =
61            (alias.client_cert.as_deref(), alias.client_key.as_deref())
62        {
63            let mut identity_pem = std::fs::read(cert_path).map_err(|e| {
64                Error::Network(format!(
65                    "Failed to read client certificate '{cert_path}': {e}"
66                ))
67            })?;
68            let key_pem = std::fs::read(key_path).map_err(|e| {
69                Error::Network(format!("Failed to read client key '{key_path}': {e}"))
70            })?;
71            identity_pem.extend_from_slice(b"\n");
72            identity_pem.extend_from_slice(&key_pem);
73            let identity = reqwest::Identity::from_pem(&identity_pem).map_err(|e| {
74                Error::Network(format!("Invalid client certificate/key identity: {e}"))
75            })?;
76            builder = builder.use_rustls_tls().identity(identity);
77        }
78
79        let http_client = builder
80            .build()
81            .map_err(|e| Error::Network(format!("Failed to create HTTP client: {e}")))?;
82
83        Ok(Self {
84            http_client,
85            endpoint: alias.endpoint.trim_end_matches('/').to_string(),
86            access_key: alias.access_key.clone(),
87            secret_key: alias.secret_key.clone(),
88            region: alias.region.clone(),
89            anonymous: alias.anonymous,
90        })
91    }
92
93    /// Build the base URL for admin API
94    fn admin_url(&self, path: &str) -> String {
95        format!("{}/rustfs/admin/v3{}", self.endpoint, path)
96    }
97
98    /// Calculate SHA256 hash of the body
99    fn sha256_hash(body: &[u8]) -> String {
100        let mut hasher = Sha256::new();
101        hasher.update(body);
102        hex::encode(hasher.finalize())
103    }
104
105    /// Sign a request using AWS SigV4
106    async fn sign_request(
107        &self,
108        method: &Method,
109        url: &str,
110        headers: &HeaderMap,
111        body: &[u8],
112    ) -> Result<HeaderMap> {
113        if self.anonymous {
114            return Ok(headers.clone());
115        }
116
117        let credentials = Credentials::new(
118            &self.access_key,
119            &self.secret_key,
120            None,
121            None,
122            "admin-client",
123        );
124
125        let identity = credentials.into();
126        let mut signing_settings = SigningSettings::default();
127        signing_settings.signature_location = SignatureLocation::Headers;
128
129        let signing_params = v4::SigningParams::builder()
130            .identity(&identity)
131            .region(&self.region)
132            .name("s3")
133            .time(SystemTime::now())
134            .settings(signing_settings)
135            .build()
136            .map_err(|e| Error::Auth(format!("Failed to build signing params: {e}")))?;
137
138        // Convert headers to a vec of tuples
139        let header_pairs: Vec<(&str, &str)> = headers
140            .iter()
141            .filter_map(|(k, v)| v.to_str().ok().map(|v| (k.as_str(), v)))
142            .collect();
143
144        let signable_body = SignableBody::Bytes(body);
145
146        let signable_request = SignableRequest::new(
147            method.as_str(),
148            url,
149            header_pairs.into_iter(),
150            signable_body,
151        )
152        .map_err(|e| Error::Auth(format!("Failed to create signable request: {e}")))?;
153
154        let (signing_instructions, _signature) = sign(signable_request, &signing_params.into())
155            .map_err(|e| Error::Auth(format!("Failed to sign request: {e}")))?
156            .into_parts();
157
158        // Apply signing instructions to create new headers
159        let mut signed_headers = headers.clone();
160        for (name, value) in signing_instructions.headers() {
161            let header_name = HeaderName::try_from(&name.to_string())
162                .map_err(|e| Error::Auth(format!("Invalid header name: {e}")))?;
163            let header_value = HeaderValue::try_from(&value.to_string())
164                .map_err(|e| Error::Auth(format!("Invalid header value: {e}")))?;
165            signed_headers.insert(header_name, header_value);
166        }
167
168        Ok(signed_headers)
169    }
170
171    /// Make a signed request to the admin API
172    async fn request<T: for<'de> Deserialize<'de>>(
173        &self,
174        method: Method,
175        path: &str,
176        query: Option<&[(&str, &str)]>,
177        body: Option<&[u8]>,
178    ) -> Result<T> {
179        let mut url = self.admin_url(path);
180
181        if let Some(q) = query {
182            let query_string: String = q
183                .iter()
184                .map(|(k, v)| format!("{}={}", urlencoding::encode(k), urlencoding::encode(v)))
185                .collect::<Vec<_>>()
186                .join("&");
187            if !query_string.is_empty() {
188                url.push('?');
189                url.push_str(&query_string);
190            }
191        }
192
193        let body_bytes = body.unwrap_or(&[]);
194        let content_hash = Self::sha256_hash(body_bytes);
195
196        let mut headers = HeaderMap::new();
197        headers.insert("x-amz-content-sha256", content_hash.parse().unwrap());
198        headers.insert("host", self.get_host().parse().unwrap());
199
200        if !body_bytes.is_empty() {
201            headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
202        }
203
204        let signed_headers = self
205            .sign_request(&method, &url, &headers, body_bytes)
206            .await?;
207
208        let mut request_builder = self.http_client.request(method.clone(), &url);
209
210        for (name, value) in signed_headers.iter() {
211            request_builder = request_builder.header(name, value);
212        }
213
214        if !body_bytes.is_empty() {
215            request_builder = request_builder.body(body_bytes.to_vec());
216        }
217
218        let response = request_builder
219            .send()
220            .await
221            .map_err(|e| Error::Network(format!("Request failed: {e}")))?;
222
223        let status = response.status();
224
225        if !status.is_success() {
226            let error_body = response
227                .text()
228                .await
229                .unwrap_or_else(|_| "Unknown error".to_string());
230            return Err(self.map_error(status, &error_body));
231        }
232
233        let text = response
234            .text()
235            .await
236            .map_err(|e| Error::Network(format!("Failed to read response: {e}")))?;
237
238        if text.is_empty() {
239            // Return empty/default for empty responses
240            serde_json::from_str("null").map_err(Error::Json)
241        } else {
242            serde_json::from_str(&text).map_err(Error::Json)
243        }
244    }
245
246    /// Make a signed request that returns no body
247    async fn request_no_response(
248        &self,
249        method: Method,
250        path: &str,
251        query: Option<&[(&str, &str)]>,
252        body: Option<&[u8]>,
253    ) -> Result<()> {
254        let mut url = self.admin_url(path);
255
256        if let Some(q) = query {
257            let query_string: String = q
258                .iter()
259                .map(|(k, v)| format!("{}={}", urlencoding::encode(k), urlencoding::encode(v)))
260                .collect::<Vec<_>>()
261                .join("&");
262            if !query_string.is_empty() {
263                url.push('?');
264                url.push_str(&query_string);
265            }
266        }
267
268        let body_bytes = body.unwrap_or(&[]);
269        let content_hash = Self::sha256_hash(body_bytes);
270
271        let mut headers = HeaderMap::new();
272        headers.insert("x-amz-content-sha256", content_hash.parse().unwrap());
273        headers.insert("host", self.get_host().parse().unwrap());
274
275        if !body_bytes.is_empty() {
276            headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
277        }
278
279        let signed_headers = self
280            .sign_request(&method, &url, &headers, body_bytes)
281            .await?;
282
283        let mut request_builder = self.http_client.request(method.clone(), &url);
284
285        for (name, value) in signed_headers.iter() {
286            request_builder = request_builder.header(name, value);
287        }
288
289        if !body_bytes.is_empty() {
290            request_builder = request_builder.body(body_bytes.to_vec());
291        }
292
293        let response = request_builder
294            .send()
295            .await
296            .map_err(|e| Error::Network(format!("Request failed: {e}")))?;
297
298        let status = response.status();
299
300        if !status.is_success() {
301            let error_body = response
302                .text()
303                .await
304                .unwrap_or_else(|_| "Unknown error".to_string());
305            return Err(self.map_error(status, &error_body));
306        }
307
308        Ok(())
309    }
310
311    /// Extract host from endpoint
312    fn get_host(&self) -> String {
313        self.endpoint
314            .trim_start_matches("http://")
315            .trim_start_matches("https://")
316            .to_string()
317    }
318
319    /// Map HTTP status codes to appropriate errors
320    fn map_error(&self, status: StatusCode, body: &str) -> Error {
321        match status {
322            StatusCode::NOT_FOUND => Error::NotFound(body.to_string()),
323            StatusCode::FORBIDDEN | StatusCode::UNAUTHORIZED => Error::Auth(body.to_string()),
324            StatusCode::CONFLICT => Error::Conflict(body.to_string()),
325            StatusCode::BAD_REQUEST => Error::General(format!("Bad request: {body}")),
326            _ => Error::Network(format!("HTTP {}: {}", status.as_u16(), body)),
327        }
328    }
329}
330
/// Response wrapper for user list
///
/// A map from user access key to that user's `UserInfo`.
#[derive(Debug, Deserialize)]
struct UserListResponse(HashMap<String, UserInfo>);
334
335#[derive(Debug, Deserialize)]
336#[serde(rename_all = "camelCase")]
337struct UserInfo {
338    #[serde(default)]
339    status: String,
340    #[serde(default)]
341    policy_name: Option<String>,
342    #[serde(default)]
343    member_of: Option<Vec<String>>,
344}
345
/// Response wrapper for policy list
///
/// A map from policy name to the raw policy JSON value.
#[derive(Debug, Deserialize)]
struct PolicyListResponse(HashMap<String, serde_json::Value>);
349
350/// Request body for creating a user
351#[derive(Debug, Serialize)]
352#[serde(rename_all = "camelCase")]
353struct CreateUserRequest {
354    secret_key: String,
355    status: String,
356}
357
/// Request body for creating a group
#[derive(Debug, Serialize)]
struct CreateGroupRequest {
    /// Name of the group to create.
    group: String,
    /// Initial members; omitted from the JSON entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    members: Option<Vec<String>>,
}
365
/// Response for service account list
#[derive(Debug, Deserialize)]
struct ServiceAccountListResponse {
    /// Listed service accounts; may be absent or null in the response.
    accounts: Option<Vec<ServiceAccountInfo>>,
}
371
372#[derive(Debug, Deserialize)]
373#[serde(rename_all = "camelCase")]
374struct ServiceAccountInfo {
375    access_key: String,
376    #[serde(default)]
377    parent_user: Option<String>,
378    #[serde(default)]
379    account_status: Option<String>,
380    #[serde(default)]
381    expiration: Option<String>,
382    #[serde(default)]
383    name: Option<String>,
384    #[serde(default)]
385    description: Option<String>,
386    #[serde(default)]
387    implied_policy: Option<bool>,
388}
389
390#[derive(Debug, Deserialize)]
391#[serde(rename_all = "camelCase")]
392struct BackgroundHealStatusResponse {
393    #[serde(default)]
394    bitrot_start_time: Option<String>,
395}
396
397impl From<BackgroundHealStatusResponse> for HealStatus {
398    fn from(response: BackgroundHealStatusResponse) -> Self {
399        Self {
400            healing: response.bitrot_start_time.is_some(),
401            started: response.bitrot_start_time,
402            ..Default::default()
403        }
404    }
405}
406
407#[derive(Debug, Serialize)]
408struct RustfsHealOptions {
409    recursive: bool,
410    #[serde(rename = "dryRun")]
411    dry_run: bool,
412    remove: bool,
413    recreate: bool,
414    #[serde(rename = "scanMode")]
415    scan_mode: u8,
416    #[serde(rename = "updateParity")]
417    update_parity: bool,
418    #[serde(rename = "nolock")]
419    no_lock: bool,
420}
421
422impl From<&HealStartRequest> for RustfsHealOptions {
423    fn from(request: &HealStartRequest) -> Self {
424        Self {
425            recursive: false,
426            dry_run: request.dry_run,
427            remove: request.remove,
428            recreate: request.recreate,
429            scan_mode: rustfs_heal_scan_mode(request.scan_mode),
430            update_parity: false,
431            no_lock: false,
432        }
433    }
434}
435
436fn rustfs_heal_scan_mode(scan_mode: HealScanMode) -> u8 {
437    match scan_mode {
438        HealScanMode::Normal => 1,
439        HealScanMode::Deep => 2,
440    }
441}
442
443fn rustfs_heal_path(request: &HealStartRequest) -> Result<String> {
444    let bucket = request
445        .bucket
446        .as_deref()
447        .filter(|bucket| !bucket.is_empty());
448    let prefix = request
449        .prefix
450        .as_deref()
451        .filter(|prefix| !prefix.is_empty());
452
453    match (bucket, prefix) {
454        (None, None) => Ok("/heal/".to_string()),
455        (Some(bucket), None) => Ok(format!("/heal/{}", urlencoding::encode(bucket))),
456        (Some(bucket), Some(prefix)) => Ok(format!(
457            "/heal/{}/{}",
458            urlencoding::encode(bucket),
459            urlencoding::encode(prefix)
460        )),
461        (None, Some(_)) => Err(Error::InvalidPath(
462            "heal prefix requires a bucket target".to_string(),
463        )),
464    }
465}
466
467fn rustfs_heal_body(request: &HealStartRequest) -> Result<Vec<u8>> {
468    serde_json::to_vec(&RustfsHealOptions::from(request)).map_err(Error::Json)
469}
470
471fn pool_target_query(target: &PoolTarget) -> Vec<(&str, &str)> {
472    let mut query = vec![("pool", target.pool.as_str())];
473    if target.by_id {
474        query.push(("by-id", "true"));
475    }
476    query
477}
478
479/// Request body for setting bucket quota
480#[derive(Debug, Serialize)]
481#[serde(rename_all = "camelCase")]
482struct SetBucketQuotaApiRequest {
483    quota: u64,
484    quota_type: String,
485}
486
487#[async_trait]
488impl AdminApi for AdminClient {
489    // ==================== Cluster Operations ====================
490
491    async fn cluster_info(&self) -> Result<ClusterInfo> {
492        self.request(Method::GET, "/info", None, None).await
493    }
494
495    async fn heal_status(&self) -> Result<HealStatus> {
496        let response: BackgroundHealStatusResponse = self
497            .request(Method::POST, "/background-heal/status", None, None)
498            .await?;
499        Ok(response.into())
500    }
501
502    async fn heal_start(&self, request: HealStartRequest) -> Result<HealStatus> {
503        let path = rustfs_heal_path(&request)?;
504        let body = rustfs_heal_body(&request)?;
505        self.request_no_response(Method::POST, &path, None, Some(&body))
506            .await?;
507        Ok(HealStatus::default())
508    }
509
510    async fn heal_stop(&self) -> Result<()> {
511        let body = rustfs_heal_body(&HealStartRequest::default())?;
512        self.request_no_response(
513            Method::POST,
514            "/heal/",
515            Some(&[("forceStop", "true")]),
516            Some(&body),
517        )
518        .await
519    }
520
521    async fn list_pools(&self) -> Result<Vec<PoolStatus>> {
522        self.request(Method::GET, "/pools/list", None, None).await
523    }
524
525    async fn pool_status(&self, target: PoolTarget) -> Result<PoolStatus> {
526        let query = pool_target_query(&target);
527        self.request(Method::GET, "/pools/status", Some(&query), None)
528            .await
529    }
530
531    async fn decommission_start(&self, target: PoolTarget) -> Result<()> {
532        let query = pool_target_query(&target);
533        self.request_no_response(Method::POST, "/pools/decommission", Some(&query), None)
534            .await
535    }
536
537    async fn decommission_cancel(&self, target: PoolTarget) -> Result<()> {
538        let query = pool_target_query(&target);
539        self.request_no_response(Method::POST, "/pools/cancel", Some(&query), None)
540            .await
541    }
542
543    async fn rebalance_start(&self) -> Result<RebalanceStartResult> {
544        self.request(Method::POST, "/rebalance/start", None, None)
545            .await
546    }
547
548    async fn rebalance_status(&self) -> Result<RebalanceStatus> {
549        self.request(Method::GET, "/rebalance/status", None, None)
550            .await
551    }
552
553    async fn rebalance_stop(&self) -> Result<()> {
554        self.request_no_response(Method::POST, "/rebalance/stop", None, None)
555            .await
556    }
557
558    // ==================== User Operations ====================
559
560    async fn list_users(&self) -> Result<Vec<User>> {
561        let response: UserListResponse =
562            self.request(Method::GET, "/list-users", None, None).await?;
563
564        Ok(response
565            .0
566            .into_iter()
567            .map(|(access_key, info)| User {
568                access_key,
569                secret_key: None,
570                status: if info.status == "disabled" {
571                    UserStatus::Disabled
572                } else {
573                    UserStatus::Enabled
574                },
575                policy_name: info.policy_name,
576                member_of: info.member_of.unwrap_or_default(),
577            })
578            .collect())
579    }
580
581    async fn get_user(&self, access_key: &str) -> Result<User> {
582        let query = [("accessKey", access_key)];
583        let response: UserInfo = self
584            .request(Method::GET, "/user-info", Some(&query), None)
585            .await?;
586
587        Ok(User {
588            access_key: access_key.to_string(),
589            secret_key: None,
590            status: if response.status == "disabled" {
591                UserStatus::Disabled
592            } else {
593                UserStatus::Enabled
594            },
595            policy_name: response.policy_name,
596            member_of: response.member_of.unwrap_or_default(),
597        })
598    }
599
600    async fn create_user(&self, access_key: &str, secret_key: &str) -> Result<User> {
601        let query = [("accessKey", access_key)];
602        let body = serde_json::to_vec(&CreateUserRequest {
603            secret_key: secret_key.to_string(),
604            status: "enabled".to_string(),
605        })
606        .map_err(Error::Json)?;
607
608        self.request_no_response(Method::PUT, "/add-user", Some(&query), Some(&body))
609            .await?;
610
611        Ok(User {
612            access_key: access_key.to_string(),
613            secret_key: Some(secret_key.to_string()),
614            status: UserStatus::Enabled,
615            policy_name: None,
616            member_of: vec![],
617        })
618    }
619
620    async fn delete_user(&self, access_key: &str) -> Result<()> {
621        let query = [("accessKey", access_key)];
622        self.request_no_response(Method::DELETE, "/remove-user", Some(&query), None)
623            .await
624    }
625
626    async fn set_user_status(&self, access_key: &str, status: UserStatus) -> Result<()> {
627        let status_str = match status {
628            UserStatus::Enabled => "enabled",
629            UserStatus::Disabled => "disabled",
630        };
631        let query = [("accessKey", access_key), ("status", status_str)];
632        self.request_no_response(Method::PUT, "/set-user-status", Some(&query), None)
633            .await
634    }
635
636    // ==================== Policy Operations ====================
637
638    async fn list_policies(&self) -> Result<Vec<PolicyInfo>> {
639        let response: PolicyListResponse = self
640            .request(Method::GET, "/list-canned-policies", None, None)
641            .await?;
642
643        Ok(response
644            .0
645            .into_keys()
646            .map(|name| PolicyInfo { name })
647            .collect())
648    }
649
650    async fn get_policy(&self, name: &str) -> Result<Policy> {
651        let query = [("name", name)];
652        let policy_doc: serde_json::Value = self
653            .request(Method::GET, "/info-canned-policy", Some(&query), None)
654            .await?;
655
656        Ok(Policy {
657            name: name.to_string(),
658            policy: serde_json::to_string_pretty(&policy_doc).unwrap_or_default(),
659        })
660    }
661
662    async fn create_policy(&self, name: &str, policy_document: &str) -> Result<()> {
663        let query = [("name", name)];
664        let body = policy_document.as_bytes();
665        self.request_no_response(Method::PUT, "/add-canned-policy", Some(&query), Some(body))
666            .await
667    }
668
669    async fn delete_policy(&self, name: &str) -> Result<()> {
670        let query = [("name", name)];
671        self.request_no_response(Method::DELETE, "/remove-canned-policy", Some(&query), None)
672            .await
673    }
674
675    async fn attach_policy(
676        &self,
677        policy_names: &[String],
678        entity_type: PolicyEntity,
679        entity_name: &str,
680    ) -> Result<()> {
681        let policy_name = policy_names.join(",");
682        let is_group = entity_type == PolicyEntity::Group;
683
684        let query = [
685            ("policyName", policy_name.as_str()),
686            ("userOrGroup", entity_name),
687            ("isGroup", if is_group { "true" } else { "false" }),
688        ];
689
690        self.request_no_response(Method::PUT, "/set-user-or-group-policy", Some(&query), None)
691            .await
692    }
693
694    async fn detach_policy(
695        &self,
696        policy_names: &[String],
697        entity_type: PolicyEntity,
698        entity_name: &str,
699    ) -> Result<()> {
700        // Detach by setting empty policy
701        // In RustFS/MinIO, you typically set a new policy which replaces the old one
702        // For detach, we need to get current policies and remove the specified ones
703        let _ = (policy_names, entity_type, entity_name);
704        Err(Error::UnsupportedFeature(
705            "Policy detach not directly supported. Use attach with remaining policies instead."
706                .to_string(),
707        ))
708    }
709
710    // ==================== Group Operations ====================
711
712    async fn list_groups(&self) -> Result<Vec<String>> {
713        let response: Vec<String> = self.request(Method::GET, "/groups", None, None).await?;
714        Ok(response)
715    }
716
717    async fn get_group(&self, name: &str) -> Result<Group> {
718        let query = [("group", name)];
719        let response: Group = self
720            .request(Method::GET, "/group", Some(&query), None)
721            .await?;
722        Ok(response)
723    }
724
725    async fn create_group(&self, name: &str, members: Option<&[String]>) -> Result<Group> {
726        let body = serde_json::to_vec(&CreateGroupRequest {
727            group: name.to_string(),
728            members: members.map(|m| m.to_vec()),
729        })
730        .map_err(Error::Json)?;
731
732        self.request_no_response(Method::POST, "/groups", None, Some(&body))
733            .await?;
734
735        Ok(Group {
736            name: name.to_string(),
737            policy: None,
738            members: members.map(|m| m.to_vec()).unwrap_or_default(),
739            status: GroupStatus::Enabled,
740        })
741    }
742
743    async fn delete_group(&self, name: &str) -> Result<()> {
744        let path = format!("/group/{}", urlencoding::encode(name));
745        self.request_no_response(Method::DELETE, &path, None, None)
746            .await
747    }
748
749    async fn set_group_status(&self, name: &str, status: GroupStatus) -> Result<()> {
750        let status_str = match status {
751            GroupStatus::Enabled => "enabled",
752            GroupStatus::Disabled => "disabled",
753        };
754        let query = [("group", name), ("status", status_str)];
755        self.request_no_response(Method::PUT, "/set-group-status", Some(&query), None)
756            .await
757    }
758
759    async fn add_group_members(&self, group: &str, members: &[String]) -> Result<()> {
760        let body = serde_json::to_vec(&UpdateGroupMembersRequest {
761            group: group.to_string(),
762            members: members.to_vec(),
763            is_remove: false,
764            status: "enabled".to_string(),
765        })
766        .map_err(Error::Json)?;
767
768        self.request_no_response(Method::PUT, "/update-group-members", None, Some(&body))
769            .await
770    }
771
772    async fn remove_group_members(&self, group: &str, members: &[String]) -> Result<()> {
773        let body = serde_json::to_vec(&UpdateGroupMembersRequest {
774            group: group.to_string(),
775            members: members.to_vec(),
776            is_remove: true,
777            status: "enabled".to_string(),
778        })
779        .map_err(Error::Json)?;
780
781        self.request_no_response(Method::PUT, "/update-group-members", None, Some(&body))
782            .await
783    }
784
785    // ==================== Service Account Operations ====================
786
787    async fn list_service_accounts(&self, user: Option<&str>) -> Result<Vec<ServiceAccount>> {
788        let query: Vec<(&str, &str)> = user.map(|u| vec![("user", u)]).unwrap_or_default();
789        let query_ref: Option<&[(&str, &str)]> = if query.is_empty() { None } else { Some(&query) };
790
791        let response: ServiceAccountListResponse = self
792            .request(Method::GET, "/list-service-accounts", query_ref, None)
793            .await?;
794
795        Ok(response
796            .accounts
797            .unwrap_or_default()
798            .into_iter()
799            .map(|sa| ServiceAccount {
800                access_key: sa.access_key,
801                secret_key: None,
802                parent_user: sa.parent_user,
803                policy: None,
804                account_status: sa.account_status,
805                expiration: sa.expiration,
806                name: sa.name,
807                description: sa.description,
808                implied_policy: sa.implied_policy,
809            })
810            .collect())
811    }
812
813    async fn get_service_account(&self, access_key: &str) -> Result<ServiceAccount> {
814        let query = [("accessKey", access_key)];
815        let response: ServiceAccount = self
816            .request(Method::GET, "/info-service-account", Some(&query), None)
817            .await?;
818
819        let mut response = response;
820        if response.access_key.is_empty() {
821            response.access_key = access_key.to_string();
822        }
823        Ok(response)
824    }
825
826    async fn create_service_account(
827        &self,
828        request: CreateServiceAccountRequest,
829    ) -> Result<ServiceAccount> {
830        let body = serde_json::to_vec(&request).map_err(Error::Json)?;
831        let response: ServiceAccountCreateResponse = self
832            .request(Method::PUT, "/add-service-accounts", None, Some(&body))
833            .await?;
834
835        Ok(ServiceAccount {
836            access_key: response.credentials.access_key,
837            secret_key: Some(response.credentials.secret_key),
838            expiration: response.credentials.expiration,
839            parent_user: None,
840            policy: None,
841            account_status: None,
842            name: None,
843            description: None,
844            implied_policy: None,
845        })
846    }
847
848    async fn delete_service_account(&self, access_key: &str) -> Result<()> {
849        let query = [("accessKey", access_key)];
850        self.request_no_response(
851            Method::DELETE,
852            "/delete-service-accounts",
853            Some(&query),
854            None,
855        )
856        .await
857    }
858
859    // ==================== Bucket Quota Operations ====================
860
861    async fn set_bucket_quota(&self, bucket: &str, quota: u64) -> Result<BucketQuota> {
862        let path = format!("/quota/{}", urlencoding::encode(bucket));
863        let body = serde_json::to_vec(&SetBucketQuotaApiRequest {
864            quota,
865            quota_type: "HARD".to_string(),
866        })
867        .map_err(Error::Json)?;
868
869        self.request(Method::PUT, &path, None, Some(&body)).await
870    }
871
872    async fn get_bucket_quota(&self, bucket: &str) -> Result<BucketQuota> {
873        let path = format!("/quota/{}", urlencoding::encode(bucket));
874        self.request(Method::GET, &path, None, None).await
875    }
876
877    async fn clear_bucket_quota(&self, bucket: &str) -> Result<BucketQuota> {
878        let path = format!("/quota/{}", urlencoding::encode(bucket));
879        self.request(Method::DELETE, &path, None, None).await
880    }
881
882    // ==================== Tier Operations ====================
883
884    async fn list_tiers(&self) -> Result<Vec<rc_core::admin::TierConfig>> {
885        self.request(Method::GET, "/tier", None, None).await
886    }
887
888    async fn tier_stats(&self) -> Result<serde_json::Value> {
889        self.request(Method::GET, "/tier-stats", None, None).await
890    }
891
892    async fn add_tier(&self, config: rc_core::admin::TierConfig) -> Result<()> {
893        let body = serde_json::to_vec(&config).map_err(Error::Json)?;
894        self.request_no_response(Method::PUT, "/tier", None, Some(&body))
895            .await
896    }
897
898    async fn edit_tier(&self, name: &str, creds: rc_core::admin::TierCreds) -> Result<()> {
899        let path = format!("/tier/{}", urlencoding::encode(name));
900        let body = serde_json::to_vec(&creds).map_err(Error::Json)?;
901        self.request_no_response(Method::POST, &path, None, Some(&body))
902            .await
903    }
904
905    async fn remove_tier(&self, name: &str, force: bool) -> Result<()> {
906        let path = format!("/tier/{}", urlencoding::encode(name));
907        if force {
908            let query: &[(&str, &str)] = &[("force", "true")];
909            self.request_no_response(Method::DELETE, &path, Some(query), None)
910                .await
911        } else {
912            self.request_no_response(Method::DELETE, &path, None, None)
913                .await
914        }
915    }
916
917    // ==================== Replication Target Operations ====================
918
919    async fn set_remote_target(
920        &self,
921        bucket: &str,
922        target: rc_core::replication::BucketTarget,
923        update: bool,
924    ) -> Result<String> {
925        let body = serde_json::to_vec(&target).map_err(Error::Json)?;
926        if update {
927            let query: &[(&str, &str)] = &[("bucket", bucket), ("update", "true")];
928            self.request(Method::PUT, "/set-remote-target", Some(query), Some(&body))
929                .await
930        } else {
931            let query: &[(&str, &str)] = &[("bucket", bucket)];
932            self.request(Method::PUT, "/set-remote-target", Some(query), Some(&body))
933                .await
934        }
935    }
936
937    async fn list_remote_targets(
938        &self,
939        bucket: &str,
940    ) -> Result<Vec<rc_core::replication::BucketTarget>> {
941        let query: &[(&str, &str)] = &[("bucket", bucket)];
942        self.request(Method::GET, "/list-remote-targets", Some(query), None)
943            .await
944    }
945
946    async fn remove_remote_target(&self, bucket: &str, arn: &str) -> Result<()> {
947        let query: &[(&str, &str)] = &[("bucket", bucket), ("arn", arn)];
948        self.request_no_response(Method::DELETE, "/remove-remote-target", Some(query), None)
949            .await
950    }
951
952    async fn replication_metrics(&self, bucket: &str) -> Result<serde_json::Value> {
953        let query: &[(&str, &str)] = &[("bucket", bucket)];
954        self.request(Method::GET, "/replicationmetrics", Some(query), None)
955            .await
956    }
957}
958
959#[cfg(test)]
960mod tests {
961    use super::*;
962    use std::io::{Read, Write};
963    use std::net::{TcpListener, TcpStream};
964    use std::sync::mpsc;
965    use std::thread;
966    use std::time::Duration;
967    use tempfile::tempdir;
968
    /// One HTTP request as captured by the in-process test server.
    ///
    /// Values are produced by `read_admin_request` from the raw bytes the
    /// client wrote to the socket.
    #[derive(Debug)]
    struct CapturedAdminRequest {
        /// First whitespace-separated token of the request line (e.g. `POST`).
        method: String,
        /// Second whitespace-separated token of the request line: the path plus
        /// any query string, exactly as sent.
        target: String,
        /// Everything up to and including the blank line that ends the header
        /// section, decoded lossily as UTF-8. Includes the request line.
        headers: String,
        /// The `content-length` bytes that follow the header section; empty
        /// when the request has no `content-length` header.
        body: Vec<u8>,
    }
976
977    fn read_admin_request(stream: &mut TcpStream) -> CapturedAdminRequest {
978        let mut buffer = Vec::new();
979        let mut chunk = [0_u8; 1024];
980        let header_end = loop {
981            let read = stream.read(&mut chunk).expect("read HTTP request");
982            assert!(read > 0, "client closed connection before headers");
983            buffer.extend_from_slice(&chunk[..read]);
984
985            if let Some(position) = buffer.windows(4).position(|window| window == b"\r\n\r\n") {
986                break position + 4;
987            }
988        };
989
990        let headers = String::from_utf8_lossy(&buffer[..header_end]).into_owned();
991        let content_length = headers
992            .lines()
993            .find_map(|line| {
994                let (name, value) = line.split_once(':')?;
995                name.eq_ignore_ascii_case("content-length")
996                    .then(|| value.trim().parse::<usize>().expect("valid content length"))
997            })
998            .unwrap_or(0);
999
1000        while buffer.len() - header_end < content_length {
1001            let read = stream.read(&mut chunk).expect("read HTTP request body");
1002            assert!(read > 0, "client closed connection before body");
1003            buffer.extend_from_slice(&chunk[..read]);
1004        }
1005
1006        let request_line = headers.lines().next().expect("request line");
1007        let mut parts = request_line.split_whitespace();
1008        let method = parts.next().expect("request method").to_string();
1009        let target = parts.next().expect("request target").to_string();
1010        let body = buffer[header_end..header_end + content_length].to_vec();
1011
1012        CapturedAdminRequest {
1013            method,
1014            target,
1015            headers,
1016            body,
1017        }
1018    }
1019
1020    fn start_admin_test_server(
1021        response_status: &str,
1022        response_body: &'static str,
1023    ) -> (
1024        String,
1025        mpsc::Receiver<CapturedAdminRequest>,
1026        thread::JoinHandle<()>,
1027    ) {
1028        let listener = TcpListener::bind("127.0.0.1:0").expect("bind test server");
1029        let endpoint = format!("http://{}", listener.local_addr().expect("local addr"));
1030        let (sender, receiver) = mpsc::channel();
1031        let response_status = response_status.to_string();
1032
1033        let handle = thread::spawn(move || {
1034            let (mut stream, _) = listener.accept().expect("accept request");
1035            let request = read_admin_request(&mut stream);
1036            sender.send(request).expect("send captured request");
1037
1038            let response = format!(
1039                "HTTP/1.1 {response_status}\r\ncontent-length: {}\r\ncontent-type: application/json\r\nconnection: close\r\n\r\n{response_body}",
1040                response_body.len()
1041            );
1042            stream
1043                .write_all(response.as_bytes())
1044                .expect("write HTTP response");
1045        });
1046
1047        (endpoint, receiver, handle)
1048    }
1049
1050    fn admin_client_for_endpoint(endpoint: &str) -> AdminClient {
1051        let alias = Alias::new("test", endpoint, "access", "secret");
1052        AdminClient::new(&alias).expect("admin client should build")
1053    }
1054
1055    fn anonymous_admin_client_for_endpoint(endpoint: &str) -> AdminClient {
1056        let mut alias = Alias::new("test", endpoint, "", "");
1057        alias.anonymous = true;
1058        AdminClient::new(&alias).expect("anonymous admin client should build")
1059    }
1060
1061    fn assert_heal_options_body(
1062        body: &[u8],
1063        scan_mode: u8,
1064        remove: bool,
1065        recreate: bool,
1066        dry_run: bool,
1067    ) {
1068        let value: serde_json::Value =
1069            serde_json::from_slice(body).expect("heal request body should be JSON");
1070
1071        assert_eq!(value["recursive"], false);
1072        assert_eq!(value["dryRun"], dry_run);
1073        assert_eq!(value["remove"], remove);
1074        assert_eq!(value["recreate"], recreate);
1075        assert_eq!(value["scanMode"], scan_mode);
1076        assert_eq!(value["updateParity"], false);
1077        assert_eq!(value["nolock"], false);
1078        assert!(value.get("bucket").is_none());
1079        assert!(value.get("prefix").is_none());
1080    }
1081
1082    #[test]
1083    fn test_admin_url_construction() {
1084        let alias = Alias::new("test", "http://localhost:9000", "access", "secret");
1085        let client = AdminClient::new(&alias).unwrap();
1086
1087        assert_eq!(
1088            client.admin_url("/list-users"),
1089            "http://localhost:9000/rustfs/admin/v3/list-users"
1090        );
1091    }
1092
1093    #[test]
1094    fn test_admin_url_with_trailing_slash() {
1095        let alias = Alias::new("test", "http://localhost:9000/", "access", "secret");
1096        let client = AdminClient::new(&alias).unwrap();
1097
1098        assert_eq!(
1099            client.admin_url("/list-users"),
1100            "http://localhost:9000/rustfs/admin/v3/list-users"
1101        );
1102    }
1103
1104    #[test]
1105    fn test_get_host() {
1106        let alias = Alias::new("test", "https://s3.example.com", "access", "secret");
1107        let client = AdminClient::new(&alias).unwrap();
1108
1109        assert_eq!(client.get_host(), "s3.example.com");
1110    }
1111
1112    #[test]
1113    fn test_sha256_hash() {
1114        let hash = AdminClient::sha256_hash(b"test");
1115        assert_eq!(
1116            hash,
1117            "9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08"
1118        );
1119    }
1120
1121    #[test]
1122    fn test_sha256_hash_empty() {
1123        let hash = AdminClient::sha256_hash(b"");
1124        assert_eq!(
1125            hash,
1126            "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
1127        );
1128    }
1129
1130    #[test]
1131    fn test_rustfs_heal_path_matches_admin_routes() {
1132        assert_eq!(
1133            rustfs_heal_path(&HealStartRequest::default()).expect("root path"),
1134            "/heal/"
1135        );
1136
1137        let bucket_request = HealStartRequest {
1138            bucket: Some("photos".to_string()),
1139            ..Default::default()
1140        };
1141        assert_eq!(
1142            rustfs_heal_path(&bucket_request).expect("bucket path"),
1143            "/heal/photos"
1144        );
1145
1146        let prefix_request = HealStartRequest {
1147            bucket: Some("photos".to_string()),
1148            prefix: Some("2026/raw".to_string()),
1149            ..Default::default()
1150        };
1151        assert_eq!(
1152            rustfs_heal_path(&prefix_request).expect("prefix path"),
1153            "/heal/photos/2026%2Fraw"
1154        );
1155
1156        let invalid_request = HealStartRequest {
1157            prefix: Some("2026/raw".to_string()),
1158            ..Default::default()
1159        };
1160        assert!(matches!(
1161            rustfs_heal_path(&invalid_request),
1162            Err(Error::InvalidPath(_))
1163        ));
1164    }
1165
1166    #[test]
1167    fn test_rustfs_heal_body_matches_server_heal_options() {
1168        let request = HealStartRequest {
1169            scan_mode: HealScanMode::Deep,
1170            remove: true,
1171            recreate: true,
1172            dry_run: true,
1173            ..Default::default()
1174        };
1175
1176        let body = rustfs_heal_body(&request).expect("heal options should serialize");
1177        let value: serde_json::Value =
1178            serde_json::from_slice(&body).expect("heal options body should be JSON");
1179
1180        assert_eq!(value["recursive"], false);
1181        assert_eq!(value["dryRun"], true);
1182        assert_eq!(value["remove"], true);
1183        assert_eq!(value["recreate"], true);
1184        assert_eq!(value["scanMode"], 2);
1185        assert_eq!(value["updateParity"], false);
1186        assert_eq!(value["nolock"], false);
1187        assert!(value.get("bucket").is_none());
1188        assert!(value.get("prefix").is_none());
1189    }
1190
1191    #[test]
1192    fn test_background_heal_status_response_maps_to_heal_status() {
1193        let status = HealStatus::from(BackgroundHealStatusResponse {
1194            bitrot_start_time: Some("2026-04-19T10:00:00Z".to_string()),
1195        });
1196
1197        assert!(status.healing);
1198        assert_eq!(status.started.as_deref(), Some("2026-04-19T10:00:00Z"));
1199
1200        let idle = HealStatus::from(BackgroundHealStatusResponse {
1201            bitrot_start_time: None,
1202        });
1203        assert!(!idle.healing);
1204        assert!(idle.started.is_none());
1205    }
1206
1207    #[test]
1208    fn test_bad_request_maps_to_general_admin_error() {
1209        let alias = Alias::new("test", "http://localhost:9000", "access", "secret");
1210        let client = AdminClient::new(&alias).expect("admin client should build");
1211
1212        let error = client.map_error(StatusCode::BAD_REQUEST, "err request body parse");
1213        assert!(matches!(error, Error::General(_)));
1214        assert_eq!(error.to_string(), "Bad request: err request body parse");
1215    }
1216
1217    #[tokio::test]
1218    async fn test_heal_status_uses_background_heal_status_endpoint() {
1219        let (endpoint, receiver, handle) =
1220            start_admin_test_server("200 OK", r#"{"bitrotStartTime":"2026-04-19T10:00:00Z"}"#);
1221        let client = admin_client_for_endpoint(&endpoint);
1222
1223        let status = client.heal_status().await.expect("heal status request");
1224
1225        assert!(status.healing);
1226        assert_eq!(status.started.as_deref(), Some("2026-04-19T10:00:00Z"));
1227
1228        let request = receiver.recv().expect("captured request");
1229        assert_eq!(request.method, "POST");
1230        assert_eq!(request.target, "/rustfs/admin/v3/background-heal/status");
1231        assert!(request.body.is_empty());
1232        handle.join().expect("server thread should finish");
1233    }
1234
1235    #[tokio::test]
1236    async fn test_anonymous_admin_requests_skip_authorization_header() {
1237        let (endpoint, receiver, handle) =
1238            start_admin_test_server("200 OK", r#"{"bitrotStartTime":"2026-04-19T10:00:00Z"}"#);
1239        let client = anonymous_admin_client_for_endpoint(&endpoint);
1240
1241        client
1242            .heal_status()
1243            .await
1244            .expect("anonymous heal status request");
1245
1246        let request = receiver
1247            .recv_timeout(Duration::from_secs(5))
1248            .expect("captured request");
1249        assert_eq!(request.method, "POST");
1250        assert_eq!(request.target, "/rustfs/admin/v3/background-heal/status");
1251        assert!(
1252            !request
1253                .headers
1254                .lines()
1255                .any(|line| line.to_ascii_lowercase().starts_with("authorization:"))
1256        );
1257        handle.join().expect("server thread should finish");
1258    }
1259
1260    #[tokio::test]
1261    async fn test_anonymous_admin_no_response_requests_skip_authorization_header() {
1262        let (endpoint, receiver, handle) = start_admin_test_server("200 OK", "");
1263        let client = anonymous_admin_client_for_endpoint(&endpoint);
1264        let request = HealStartRequest {
1265            bucket: Some("raw photos".to_string()),
1266            prefix: Some("2026/april".to_string()),
1267            scan_mode: HealScanMode::Deep,
1268            remove: true,
1269            recreate: true,
1270            dry_run: true,
1271        };
1272
1273        client
1274            .heal_start(request)
1275            .await
1276            .expect("anonymous heal start request");
1277
1278        let request = receiver
1279            .recv_timeout(Duration::from_secs(5))
1280            .expect("captured request");
1281        assert_eq!(request.method, "POST");
1282        assert_eq!(
1283            request.target,
1284            "/rustfs/admin/v3/heal/raw%20photos/2026%2Fapril"
1285        );
1286        assert_heal_options_body(&request.body, 2, true, true, true);
1287        assert!(
1288            !request
1289                .headers
1290                .lines()
1291                .any(|line| line.to_ascii_lowercase().starts_with("authorization:"))
1292        );
1293        handle.join().expect("server thread should finish");
1294    }
1295
1296    #[tokio::test]
1297    async fn test_heal_start_posts_to_bucket_prefix_route_with_options_body() {
1298        let (endpoint, receiver, handle) = start_admin_test_server("200 OK", "");
1299        let client = admin_client_for_endpoint(&endpoint);
1300        let request = HealStartRequest {
1301            bucket: Some("raw photos".to_string()),
1302            prefix: Some("2026/april".to_string()),
1303            scan_mode: HealScanMode::Deep,
1304            remove: true,
1305            recreate: true,
1306            dry_run: true,
1307        };
1308
1309        let status = client
1310            .heal_start(request)
1311            .await
1312            .expect("heal start request");
1313
1314        assert!(!status.healing);
1315        assert!(status.heal_id.is_empty());
1316        assert!(status.started.is_none());
1317
1318        let request = receiver.recv().expect("captured request");
1319        assert_eq!(request.method, "POST");
1320        assert_eq!(
1321            request.target,
1322            "/rustfs/admin/v3/heal/raw%20photos/2026%2Fapril"
1323        );
1324        assert_heal_options_body(&request.body, 2, true, true, true);
1325        handle.join().expect("server thread should finish");
1326    }
1327
1328    #[tokio::test]
1329    async fn test_heal_stop_posts_force_stop_to_root_heal_route() {
1330        let (endpoint, receiver, handle) = start_admin_test_server("200 OK", "");
1331        let client = admin_client_for_endpoint(&endpoint);
1332
1333        client.heal_stop().await.expect("heal stop request");
1334
1335        let request = receiver.recv().expect("captured request");
1336        assert_eq!(request.method, "POST");
1337        assert_eq!(request.target, "/rustfs/admin/v3/heal/?forceStop=true");
1338        assert_heal_options_body(&request.body, 1, false, false, false);
1339        handle.join().expect("server thread should finish");
1340    }
1341
1342    #[tokio::test]
1343    async fn test_pool_status_uses_pool_status_route_with_by_id_query() {
1344        let (endpoint, receiver, handle) = start_admin_test_server(
1345            "200 OK",
1346            r#"{"id":1,"cmdline":"/data/pool1/disk{1...4}","lastUpdate":"2026-05-06T00:00:00Z","decommissionInfo":null}"#,
1347        );
1348        let client = admin_client_for_endpoint(&endpoint);
1349
1350        let status = client
1351            .pool_status(PoolTarget {
1352                pool: "1".to_string(),
1353                by_id: true,
1354            })
1355            .await
1356            .expect("pool status request");
1357
1358        assert_eq!(status.id, 1);
1359        assert_eq!(status.cmd_line, "/data/pool1/disk{1...4}");
1360
1361        let request = receiver.recv().expect("captured request");
1362        assert_eq!(request.method, "GET");
1363        assert_eq!(
1364            request.target,
1365            "/rustfs/admin/v3/pools/status?pool=1&by-id=true"
1366        );
1367        assert!(request.body.is_empty());
1368        handle.join().expect("server thread should finish");
1369    }
1370
1371    #[tokio::test]
1372    async fn test_pool_status_uses_command_line_query_without_by_id() {
1373        let (endpoint, receiver, handle) = start_admin_test_server(
1374            "200 OK",
1375            r#"{"id":2,"cmdline":"/data/pool2/disk{1...4}","lastUpdate":"2026-05-06T00:00:00Z","decommissionInfo":null}"#,
1376        );
1377        let client = admin_client_for_endpoint(&endpoint);
1378
1379        let status = client
1380            .pool_status(PoolTarget {
1381                pool: "/data/pool2/disk{1...4}".to_string(),
1382                by_id: false,
1383            })
1384            .await
1385            .expect("pool status request");
1386
1387        assert_eq!(status.id, 2);
1388        assert_eq!(status.cmd_line, "/data/pool2/disk{1...4}");
1389
1390        let request = receiver.recv().expect("captured request");
1391        assert_eq!(request.method, "GET");
1392        assert_eq!(
1393            request.target,
1394            "/rustfs/admin/v3/pools/status?pool=%2Fdata%2Fpool2%2Fdisk%7B1...4%7D"
1395        );
1396        assert!(request.body.is_empty());
1397        handle.join().expect("server thread should finish");
1398    }
1399
1400    #[tokio::test]
1401    async fn test_list_pools_uses_pool_list_route() {
1402        let (endpoint, receiver, handle) = start_admin_test_server(
1403            "200 OK",
1404            r#"[{"id":0,"cmdline":"/data/pool0/disk{1...4}","lastUpdate":"2026-05-06T00:00:00Z","decommissionInfo":null}]"#,
1405        );
1406        let client = admin_client_for_endpoint(&endpoint);
1407
1408        let pools = client.list_pools().await.expect("list pools request");
1409
1410        assert_eq!(pools.len(), 1);
1411        assert_eq!(pools[0].id, 0);
1412        assert_eq!(pools[0].cmd_line, "/data/pool0/disk{1...4}");
1413
1414        let request = receiver.recv().expect("captured request");
1415        assert_eq!(request.method, "GET");
1416        assert_eq!(request.target, "/rustfs/admin/v3/pools/list");
1417        assert!(request.body.is_empty());
1418        handle.join().expect("server thread should finish");
1419    }
1420
1421    #[tokio::test]
1422    async fn test_decommission_start_posts_pool_query() {
1423        let (endpoint, receiver, handle) = start_admin_test_server("200 OK", "");
1424        let client = admin_client_for_endpoint(&endpoint);
1425
1426        client
1427            .decommission_start(PoolTarget {
1428                pool: "/data/pool1/disk{1...4}".to_string(),
1429                by_id: false,
1430            })
1431            .await
1432            .expect("decommission start request");
1433
1434        let request = receiver.recv().expect("captured request");
1435        assert_eq!(request.method, "POST");
1436        assert_eq!(
1437            request.target,
1438            "/rustfs/admin/v3/pools/decommission?pool=%2Fdata%2Fpool1%2Fdisk%7B1...4%7D"
1439        );
1440        assert!(request.body.is_empty());
1441        handle.join().expect("server thread should finish");
1442    }
1443
1444    #[tokio::test]
1445    async fn test_decommission_start_posts_by_id_multi_pool_query() {
1446        let (endpoint, receiver, handle) = start_admin_test_server("200 OK", "");
1447        let client = admin_client_for_endpoint(&endpoint);
1448
1449        client
1450            .decommission_start(PoolTarget {
1451                pool: "1,2".to_string(),
1452                by_id: true,
1453            })
1454            .await
1455            .expect("decommission start request");
1456
1457        let request = receiver.recv().expect("captured request");
1458        assert_eq!(request.method, "POST");
1459        assert_eq!(
1460            request.target,
1461            "/rustfs/admin/v3/pools/decommission?pool=1%2C2&by-id=true"
1462        );
1463        assert!(request.body.is_empty());
1464        handle.join().expect("server thread should finish");
1465    }
1466
1467    #[tokio::test]
1468    async fn test_decommission_cancel_posts_pool_cancel_route_with_by_id_query() {
1469        let (endpoint, receiver, handle) = start_admin_test_server("200 OK", "");
1470        let client = admin_client_for_endpoint(&endpoint);
1471
1472        client
1473            .decommission_cancel(PoolTarget {
1474                pool: "1".to_string(),
1475                by_id: true,
1476            })
1477            .await
1478            .expect("decommission cancel request");
1479
1480        let request = receiver.recv().expect("captured request");
1481        assert_eq!(request.method, "POST");
1482        assert_eq!(
1483            request.target,
1484            "/rustfs/admin/v3/pools/cancel?pool=1&by-id=true"
1485        );
1486        assert!(request.body.is_empty());
1487        handle.join().expect("server thread should finish");
1488    }
1489
1490    #[tokio::test]
1491    async fn test_rebalance_start_posts_rebalance_start_route() {
1492        let (endpoint, receiver, handle) =
1493            start_admin_test_server("200 OK", r#"{"id":"rebalance-123"}"#);
1494        let client = admin_client_for_endpoint(&endpoint);
1495
1496        let result = client
1497            .rebalance_start()
1498            .await
1499            .expect("rebalance start request");
1500
1501        assert_eq!(result.id, "rebalance-123");
1502
1503        let request = receiver.recv().expect("captured request");
1504        assert_eq!(request.method, "POST");
1505        assert_eq!(request.target, "/rustfs/admin/v3/rebalance/start");
1506        assert!(request.body.is_empty());
1507        handle.join().expect("server thread should finish");
1508    }
1509
1510    #[tokio::test]
1511    async fn test_rebalance_status_gets_rebalance_status_route() {
1512        let (endpoint, receiver, handle) = start_admin_test_server(
1513            "200 OK",
1514            r#"{"id":"rebalance-123","pools":[],"stoppedAt":"2026-05-06T00:00:00Z"}"#,
1515        );
1516        let client = admin_client_for_endpoint(&endpoint);
1517
1518        let status = client
1519            .rebalance_status()
1520            .await
1521            .expect("rebalance status request");
1522
1523        assert_eq!(status.id, "rebalance-123");
1524        assert_eq!(status.stopped_at.as_deref(), Some("2026-05-06T00:00:00Z"));
1525
1526        let request = receiver.recv().expect("captured request");
1527        assert_eq!(request.method, "GET");
1528        assert_eq!(request.target, "/rustfs/admin/v3/rebalance/status");
1529        assert!(request.body.is_empty());
1530        handle.join().expect("server thread should finish");
1531    }
1532
1533    #[tokio::test]
1534    async fn test_rebalance_stop_posts_rebalance_stop_route() {
1535        let (endpoint, receiver, handle) = start_admin_test_server("200 OK", "");
1536        let client = admin_client_for_endpoint(&endpoint);
1537
1538        client
1539            .rebalance_stop()
1540            .await
1541            .expect("rebalance stop request");
1542
1543        let request = receiver.recv().expect("captured request");
1544        assert_eq!(request.method, "POST");
1545        assert_eq!(request.target, "/rustfs/admin/v3/rebalance/stop");
1546        assert!(request.body.is_empty());
1547        handle.join().expect("server thread should finish");
1548    }
1549
1550    #[test]
1551    fn test_admin_client_invalid_ca_bundle_path_surfaces_error() {
1552        let mut alias = Alias::new("test", "https://localhost:9000", "access", "secret");
1553        alias.ca_bundle = Some("/definitely-not-here/ca.pem".to_string());
1554
1555        let result = AdminClient::new(&alias);
1556        match result {
1557            Err(Error::Network(msg)) => {
1558                assert!(
1559                    msg.contains("Failed to read CA bundle"),
1560                    "Unexpected error message: {msg}"
1561                );
1562            }
1563            Ok(_) => panic!("Expected Error::Network for invalid path, got Ok(_)"),
1564            Err(e) => panic!("Expected Error::Network for invalid path, got Err({e})"),
1565        }
1566    }
1567
1568    #[test]
1569    fn test_admin_client_invalid_ca_bundle_pem_surfaces_error() {
1570        let temp_dir = tempdir().expect("create temp dir");
1571        let bad_pem_path = temp_dir.path().join("bad-ca.pem");
1572        std::fs::write(
1573            &bad_pem_path,
1574            b"-----BEGIN CERTIFICATE-----\ninvalid-base64\n-----END CERTIFICATE-----\n",
1575        )
1576        .expect("write invalid PEM");
1577
1578        let mut alias = Alias::new("test", "https://localhost:9000", "access", "secret");
1579        alias.ca_bundle = Some(bad_pem_path.display().to_string());
1580
1581        let result = AdminClient::new(&alias);
1582        match result {
1583            Err(Error::Network(msg)) => {
1584                assert!(
1585                    msg.contains("Invalid CA bundle") && msg.contains("bad-ca.pem"),
1586                    "Unexpected error message for invalid PEM CA bundle: {msg}"
1587                );
1588            }
1589            Ok(_) => panic!("Expected Error::Network for invalid PEM, got Ok(_)"),
1590            Err(e) => panic!("Expected Error::Network for invalid PEM, got Err({e})"),
1591        }
1592    }
1593}