Skip to main content

rc_s3/
admin.rs

1//! Admin API client implementation
2//!
3//! This module provides the AdminClient that implements the AdminApi trait
4//! using HTTP requests with AWS SigV4 signing.
5
6use async_trait::async_trait;
7use aws_credential_types::Credentials;
8use aws_sigv4::http_request::{
9    SignableBody, SignableRequest, SignatureLocation, SigningSettings, sign,
10};
11use aws_sigv4::sign::v4;
12use rc_core::admin::{
13    AdminApi, BucketQuota, ClusterInfo, CreateServiceAccountRequest, Group, GroupStatus,
14    HealScanMode, HealStartRequest, HealStatus, Policy, PolicyEntity, PolicyInfo, ServiceAccount,
15    ServiceAccountCreateResponse, UpdateGroupMembersRequest, User, UserStatus,
16};
17use rc_core::{Alias, Error, Result};
18use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue};
19use reqwest::{Client, Method, StatusCode};
20use serde::{Deserialize, Serialize};
21use sha2::{Digest, Sha256};
22use std::collections::HashMap;
23use std::time::SystemTime;
24
25/// Admin API client for RustFS/MinIO-compatible servers
26pub struct AdminClient {
27    http_client: Client,
28    endpoint: String,
29    access_key: String,
30    secret_key: String,
31    region: String,
32}
33
34impl AdminClient {
35    /// Create a new AdminClient from an Alias
36    pub fn new(alias: &Alias) -> Result<Self> {
37        let mut builder = Client::builder()
38            .danger_accept_invalid_certs(alias.insecure)
39            .tls_built_in_native_certs(true)
40            .tls_built_in_webpki_certs(true);
41
42        if let Some(bundle_path) = alias.ca_bundle.as_deref() {
43            let pem = std::fs::read(bundle_path).map_err(|e| {
44                Error::Network(format!("Failed to read CA bundle '{bundle_path}': {e}"))
45            })?;
46            let certs = reqwest::Certificate::from_pem_bundle(&pem)
47                .map_err(|e| Error::Network(format!("Invalid CA bundle '{bundle_path}': {e}")))?;
48            if certs.is_empty() {
49                return Err(Error::Network(format!(
50                    "Invalid CA bundle '{bundle_path}': no certificates found"
51                )));
52            }
53            for cert in certs {
54                builder = builder.add_root_certificate(cert);
55            }
56        }
57
58        let http_client = builder
59            .build()
60            .map_err(|e| Error::Network(format!("Failed to create HTTP client: {e}")))?;
61
62        Ok(Self {
63            http_client,
64            endpoint: alias.endpoint.trim_end_matches('/').to_string(),
65            access_key: alias.access_key.clone(),
66            secret_key: alias.secret_key.clone(),
67            region: alias.region.clone(),
68        })
69    }
70
71    /// Build the base URL for admin API
72    fn admin_url(&self, path: &str) -> String {
73        format!("{}/rustfs/admin/v3{}", self.endpoint, path)
74    }
75
76    /// Calculate SHA256 hash of the body
77    fn sha256_hash(body: &[u8]) -> String {
78        let mut hasher = Sha256::new();
79        hasher.update(body);
80        hex::encode(hasher.finalize())
81    }
82
83    /// Sign a request using AWS SigV4
84    async fn sign_request(
85        &self,
86        method: &Method,
87        url: &str,
88        headers: &HeaderMap,
89        body: &[u8],
90    ) -> Result<HeaderMap> {
91        let credentials = Credentials::new(
92            &self.access_key,
93            &self.secret_key,
94            None,
95            None,
96            "admin-client",
97        );
98
99        let identity = credentials.into();
100        let mut signing_settings = SigningSettings::default();
101        signing_settings.signature_location = SignatureLocation::Headers;
102
103        let signing_params = v4::SigningParams::builder()
104            .identity(&identity)
105            .region(&self.region)
106            .name("s3")
107            .time(SystemTime::now())
108            .settings(signing_settings)
109            .build()
110            .map_err(|e| Error::Auth(format!("Failed to build signing params: {e}")))?;
111
112        // Convert headers to a vec of tuples
113        let header_pairs: Vec<(&str, &str)> = headers
114            .iter()
115            .filter_map(|(k, v)| v.to_str().ok().map(|v| (k.as_str(), v)))
116            .collect();
117
118        let signable_body = SignableBody::Bytes(body);
119
120        let signable_request = SignableRequest::new(
121            method.as_str(),
122            url,
123            header_pairs.into_iter(),
124            signable_body,
125        )
126        .map_err(|e| Error::Auth(format!("Failed to create signable request: {e}")))?;
127
128        let (signing_instructions, _signature) = sign(signable_request, &signing_params.into())
129            .map_err(|e| Error::Auth(format!("Failed to sign request: {e}")))?
130            .into_parts();
131
132        // Apply signing instructions to create new headers
133        let mut signed_headers = headers.clone();
134        for (name, value) in signing_instructions.headers() {
135            let header_name = HeaderName::try_from(&name.to_string())
136                .map_err(|e| Error::Auth(format!("Invalid header name: {e}")))?;
137            let header_value = HeaderValue::try_from(&value.to_string())
138                .map_err(|e| Error::Auth(format!("Invalid header value: {e}")))?;
139            signed_headers.insert(header_name, header_value);
140        }
141
142        Ok(signed_headers)
143    }
144
145    /// Make a signed request to the admin API
146    async fn request<T: for<'de> Deserialize<'de>>(
147        &self,
148        method: Method,
149        path: &str,
150        query: Option<&[(&str, &str)]>,
151        body: Option<&[u8]>,
152    ) -> Result<T> {
153        let mut url = self.admin_url(path);
154
155        if let Some(q) = query {
156            let query_string: String = q
157                .iter()
158                .map(|(k, v)| format!("{}={}", urlencoding::encode(k), urlencoding::encode(v)))
159                .collect::<Vec<_>>()
160                .join("&");
161            if !query_string.is_empty() {
162                url.push('?');
163                url.push_str(&query_string);
164            }
165        }
166
167        let body_bytes = body.unwrap_or(&[]);
168        let content_hash = Self::sha256_hash(body_bytes);
169
170        let mut headers = HeaderMap::new();
171        headers.insert("x-amz-content-sha256", content_hash.parse().unwrap());
172        headers.insert("host", self.get_host().parse().unwrap());
173
174        if !body_bytes.is_empty() {
175            headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
176        }
177
178        let signed_headers = self
179            .sign_request(&method, &url, &headers, body_bytes)
180            .await?;
181
182        let mut request_builder = self.http_client.request(method.clone(), &url);
183
184        for (name, value) in signed_headers.iter() {
185            request_builder = request_builder.header(name, value);
186        }
187
188        if !body_bytes.is_empty() {
189            request_builder = request_builder.body(body_bytes.to_vec());
190        }
191
192        let response = request_builder
193            .send()
194            .await
195            .map_err(|e| Error::Network(format!("Request failed: {e}")))?;
196
197        let status = response.status();
198
199        if !status.is_success() {
200            let error_body = response
201                .text()
202                .await
203                .unwrap_or_else(|_| "Unknown error".to_string());
204            return Err(self.map_error(status, &error_body));
205        }
206
207        let text = response
208            .text()
209            .await
210            .map_err(|e| Error::Network(format!("Failed to read response: {e}")))?;
211
212        if text.is_empty() {
213            // Return empty/default for empty responses
214            serde_json::from_str("null").map_err(Error::Json)
215        } else {
216            serde_json::from_str(&text).map_err(Error::Json)
217        }
218    }
219
220    /// Make a signed request that returns no body
221    async fn request_no_response(
222        &self,
223        method: Method,
224        path: &str,
225        query: Option<&[(&str, &str)]>,
226        body: Option<&[u8]>,
227    ) -> Result<()> {
228        let mut url = self.admin_url(path);
229
230        if let Some(q) = query {
231            let query_string: String = q
232                .iter()
233                .map(|(k, v)| format!("{}={}", urlencoding::encode(k), urlencoding::encode(v)))
234                .collect::<Vec<_>>()
235                .join("&");
236            if !query_string.is_empty() {
237                url.push('?');
238                url.push_str(&query_string);
239            }
240        }
241
242        let body_bytes = body.unwrap_or(&[]);
243        let content_hash = Self::sha256_hash(body_bytes);
244
245        let mut headers = HeaderMap::new();
246        headers.insert("x-amz-content-sha256", content_hash.parse().unwrap());
247        headers.insert("host", self.get_host().parse().unwrap());
248
249        if !body_bytes.is_empty() {
250            headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
251        }
252
253        let signed_headers = self
254            .sign_request(&method, &url, &headers, body_bytes)
255            .await?;
256
257        let mut request_builder = self.http_client.request(method.clone(), &url);
258
259        for (name, value) in signed_headers.iter() {
260            request_builder = request_builder.header(name, value);
261        }
262
263        if !body_bytes.is_empty() {
264            request_builder = request_builder.body(body_bytes.to_vec());
265        }
266
267        let response = request_builder
268            .send()
269            .await
270            .map_err(|e| Error::Network(format!("Request failed: {e}")))?;
271
272        let status = response.status();
273
274        if !status.is_success() {
275            let error_body = response
276                .text()
277                .await
278                .unwrap_or_else(|_| "Unknown error".to_string());
279            return Err(self.map_error(status, &error_body));
280        }
281
282        Ok(())
283    }
284
285    /// Extract host from endpoint
286    fn get_host(&self) -> String {
287        self.endpoint
288            .trim_start_matches("http://")
289            .trim_start_matches("https://")
290            .to_string()
291    }
292
293    /// Map HTTP status codes to appropriate errors
294    fn map_error(&self, status: StatusCode, body: &str) -> Error {
295        match status {
296            StatusCode::NOT_FOUND => Error::NotFound(body.to_string()),
297            StatusCode::FORBIDDEN | StatusCode::UNAUTHORIZED => Error::Auth(body.to_string()),
298            StatusCode::CONFLICT => Error::Conflict(body.to_string()),
299            StatusCode::BAD_REQUEST => Error::General(format!("Bad request: {body}")),
300            _ => Error::Network(format!("HTTP {}: {}", status.as_u16(), body)),
301        }
302    }
303}
304
305/// Response wrapper for user list
306#[derive(Debug, Deserialize)]
307struct UserListResponse(HashMap<String, UserInfo>);
308
309#[derive(Debug, Deserialize)]
310#[serde(rename_all = "camelCase")]
311struct UserInfo {
312    #[serde(default)]
313    status: String,
314    #[serde(default)]
315    policy_name: Option<String>,
316    #[serde(default)]
317    member_of: Option<Vec<String>>,
318}
319
320/// Response wrapper for policy list
321#[derive(Debug, Deserialize)]
322struct PolicyListResponse(HashMap<String, serde_json::Value>);
323
324/// Request body for creating a user
325#[derive(Debug, Serialize)]
326#[serde(rename_all = "camelCase")]
327struct CreateUserRequest {
328    secret_key: String,
329    status: String,
330}
331
/// Request body for creating a group
#[derive(Debug, Serialize)]
struct CreateGroupRequest {
    /// Name of the group to create.
    group: String,
    /// Initial members; the field is left out of the JSON entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    members: Option<Vec<String>>,
}
339
340/// Response for service account list
341#[derive(Debug, Deserialize)]
342struct ServiceAccountListResponse {
343    accounts: Option<Vec<ServiceAccountInfo>>,
344}
345
346#[derive(Debug, Deserialize)]
347#[serde(rename_all = "camelCase")]
348struct ServiceAccountInfo {
349    access_key: String,
350    #[serde(default)]
351    parent_user: Option<String>,
352    #[serde(default)]
353    account_status: Option<String>,
354    #[serde(default)]
355    expiration: Option<String>,
356    #[serde(default)]
357    name: Option<String>,
358    #[serde(default)]
359    description: Option<String>,
360    #[serde(default)]
361    implied_policy: Option<bool>,
362}
363
364#[derive(Debug, Deserialize)]
365#[serde(rename_all = "camelCase")]
366struct BackgroundHealStatusResponse {
367    #[serde(default)]
368    bitrot_start_time: Option<String>,
369}
370
371impl From<BackgroundHealStatusResponse> for HealStatus {
372    fn from(response: BackgroundHealStatusResponse) -> Self {
373        Self {
374            healing: response.bitrot_start_time.is_some(),
375            started: response.bitrot_start_time,
376            ..Default::default()
377        }
378    }
379}
380
381#[derive(Debug, Serialize)]
382struct RustfsHealOptions {
383    recursive: bool,
384    #[serde(rename = "dryRun")]
385    dry_run: bool,
386    remove: bool,
387    recreate: bool,
388    #[serde(rename = "scanMode")]
389    scan_mode: u8,
390    #[serde(rename = "updateParity")]
391    update_parity: bool,
392    #[serde(rename = "nolock")]
393    no_lock: bool,
394}
395
396impl From<&HealStartRequest> for RustfsHealOptions {
397    fn from(request: &HealStartRequest) -> Self {
398        Self {
399            recursive: false,
400            dry_run: request.dry_run,
401            remove: request.remove,
402            recreate: request.recreate,
403            scan_mode: rustfs_heal_scan_mode(request.scan_mode),
404            update_parity: false,
405            no_lock: false,
406        }
407    }
408}
409
410fn rustfs_heal_scan_mode(scan_mode: HealScanMode) -> u8 {
411    match scan_mode {
412        HealScanMode::Normal => 1,
413        HealScanMode::Deep => 2,
414    }
415}
416
417fn rustfs_heal_path(request: &HealStartRequest) -> Result<String> {
418    let bucket = request
419        .bucket
420        .as_deref()
421        .filter(|bucket| !bucket.is_empty());
422    let prefix = request
423        .prefix
424        .as_deref()
425        .filter(|prefix| !prefix.is_empty());
426
427    match (bucket, prefix) {
428        (None, None) => Ok("/heal/".to_string()),
429        (Some(bucket), None) => Ok(format!("/heal/{}", urlencoding::encode(bucket))),
430        (Some(bucket), Some(prefix)) => Ok(format!(
431            "/heal/{}/{}",
432            urlencoding::encode(bucket),
433            urlencoding::encode(prefix)
434        )),
435        (None, Some(_)) => Err(Error::InvalidPath(
436            "heal prefix requires a bucket target".to_string(),
437        )),
438    }
439}
440
441fn rustfs_heal_body(request: &HealStartRequest) -> Result<Vec<u8>> {
442    serde_json::to_vec(&RustfsHealOptions::from(request)).map_err(Error::Json)
443}
444
445/// Request body for setting bucket quota
446#[derive(Debug, Serialize)]
447#[serde(rename_all = "camelCase")]
448struct SetBucketQuotaApiRequest {
449    quota: u64,
450    quota_type: String,
451}
452
453#[async_trait]
454impl AdminApi for AdminClient {
455    // ==================== Cluster Operations ====================
456
457    async fn cluster_info(&self) -> Result<ClusterInfo> {
458        self.request(Method::GET, "/info", None, None).await
459    }
460
461    async fn heal_status(&self) -> Result<HealStatus> {
462        let response: BackgroundHealStatusResponse = self
463            .request(Method::POST, "/background-heal/status", None, None)
464            .await?;
465        Ok(response.into())
466    }
467
468    async fn heal_start(&self, request: HealStartRequest) -> Result<HealStatus> {
469        let path = rustfs_heal_path(&request)?;
470        let body = rustfs_heal_body(&request)?;
471        self.request_no_response(Method::POST, &path, None, Some(&body))
472            .await?;
473        Ok(HealStatus::default())
474    }
475
476    async fn heal_stop(&self) -> Result<()> {
477        let body = rustfs_heal_body(&HealStartRequest::default())?;
478        self.request_no_response(
479            Method::POST,
480            "/heal/",
481            Some(&[("forceStop", "true")]),
482            Some(&body),
483        )
484        .await
485    }
486
487    // ==================== User Operations ====================
488
489    async fn list_users(&self) -> Result<Vec<User>> {
490        let response: UserListResponse =
491            self.request(Method::GET, "/list-users", None, None).await?;
492
493        Ok(response
494            .0
495            .into_iter()
496            .map(|(access_key, info)| User {
497                access_key,
498                secret_key: None,
499                status: if info.status == "disabled" {
500                    UserStatus::Disabled
501                } else {
502                    UserStatus::Enabled
503                },
504                policy_name: info.policy_name,
505                member_of: info.member_of.unwrap_or_default(),
506            })
507            .collect())
508    }
509
510    async fn get_user(&self, access_key: &str) -> Result<User> {
511        let query = [("accessKey", access_key)];
512        let response: UserInfo = self
513            .request(Method::GET, "/user-info", Some(&query), None)
514            .await?;
515
516        Ok(User {
517            access_key: access_key.to_string(),
518            secret_key: None,
519            status: if response.status == "disabled" {
520                UserStatus::Disabled
521            } else {
522                UserStatus::Enabled
523            },
524            policy_name: response.policy_name,
525            member_of: response.member_of.unwrap_or_default(),
526        })
527    }
528
529    async fn create_user(&self, access_key: &str, secret_key: &str) -> Result<User> {
530        let query = [("accessKey", access_key)];
531        let body = serde_json::to_vec(&CreateUserRequest {
532            secret_key: secret_key.to_string(),
533            status: "enabled".to_string(),
534        })
535        .map_err(Error::Json)?;
536
537        self.request_no_response(Method::PUT, "/add-user", Some(&query), Some(&body))
538            .await?;
539
540        Ok(User {
541            access_key: access_key.to_string(),
542            secret_key: Some(secret_key.to_string()),
543            status: UserStatus::Enabled,
544            policy_name: None,
545            member_of: vec![],
546        })
547    }
548
549    async fn delete_user(&self, access_key: &str) -> Result<()> {
550        let query = [("accessKey", access_key)];
551        self.request_no_response(Method::DELETE, "/remove-user", Some(&query), None)
552            .await
553    }
554
555    async fn set_user_status(&self, access_key: &str, status: UserStatus) -> Result<()> {
556        let status_str = match status {
557            UserStatus::Enabled => "enabled",
558            UserStatus::Disabled => "disabled",
559        };
560        let query = [("accessKey", access_key), ("status", status_str)];
561        self.request_no_response(Method::PUT, "/set-user-status", Some(&query), None)
562            .await
563    }
564
565    // ==================== Policy Operations ====================
566
567    async fn list_policies(&self) -> Result<Vec<PolicyInfo>> {
568        let response: PolicyListResponse = self
569            .request(Method::GET, "/list-canned-policies", None, None)
570            .await?;
571
572        Ok(response
573            .0
574            .into_keys()
575            .map(|name| PolicyInfo { name })
576            .collect())
577    }
578
579    async fn get_policy(&self, name: &str) -> Result<Policy> {
580        let query = [("name", name)];
581        let policy_doc: serde_json::Value = self
582            .request(Method::GET, "/info-canned-policy", Some(&query), None)
583            .await?;
584
585        Ok(Policy {
586            name: name.to_string(),
587            policy: serde_json::to_string_pretty(&policy_doc).unwrap_or_default(),
588        })
589    }
590
591    async fn create_policy(&self, name: &str, policy_document: &str) -> Result<()> {
592        let query = [("name", name)];
593        let body = policy_document.as_bytes();
594        self.request_no_response(Method::PUT, "/add-canned-policy", Some(&query), Some(body))
595            .await
596    }
597
598    async fn delete_policy(&self, name: &str) -> Result<()> {
599        let query = [("name", name)];
600        self.request_no_response(Method::DELETE, "/remove-canned-policy", Some(&query), None)
601            .await
602    }
603
604    async fn attach_policy(
605        &self,
606        policy_names: &[String],
607        entity_type: PolicyEntity,
608        entity_name: &str,
609    ) -> Result<()> {
610        let policy_name = policy_names.join(",");
611        let is_group = entity_type == PolicyEntity::Group;
612
613        let query = [
614            ("policyName", policy_name.as_str()),
615            ("userOrGroup", entity_name),
616            ("isGroup", if is_group { "true" } else { "false" }),
617        ];
618
619        self.request_no_response(Method::PUT, "/set-user-or-group-policy", Some(&query), None)
620            .await
621    }
622
623    async fn detach_policy(
624        &self,
625        policy_names: &[String],
626        entity_type: PolicyEntity,
627        entity_name: &str,
628    ) -> Result<()> {
629        // Detach by setting empty policy
630        // In RustFS/MinIO, you typically set a new policy which replaces the old one
631        // For detach, we need to get current policies and remove the specified ones
632        let _ = (policy_names, entity_type, entity_name);
633        Err(Error::UnsupportedFeature(
634            "Policy detach not directly supported. Use attach with remaining policies instead."
635                .to_string(),
636        ))
637    }
638
639    // ==================== Group Operations ====================
640
641    async fn list_groups(&self) -> Result<Vec<String>> {
642        let response: Vec<String> = self.request(Method::GET, "/groups", None, None).await?;
643        Ok(response)
644    }
645
646    async fn get_group(&self, name: &str) -> Result<Group> {
647        let query = [("group", name)];
648        let response: Group = self
649            .request(Method::GET, "/group", Some(&query), None)
650            .await?;
651        Ok(response)
652    }
653
654    async fn create_group(&self, name: &str, members: Option<&[String]>) -> Result<Group> {
655        let body = serde_json::to_vec(&CreateGroupRequest {
656            group: name.to_string(),
657            members: members.map(|m| m.to_vec()),
658        })
659        .map_err(Error::Json)?;
660
661        self.request_no_response(Method::POST, "/groups", None, Some(&body))
662            .await?;
663
664        Ok(Group {
665            name: name.to_string(),
666            policy: None,
667            members: members.map(|m| m.to_vec()).unwrap_or_default(),
668            status: GroupStatus::Enabled,
669        })
670    }
671
672    async fn delete_group(&self, name: &str) -> Result<()> {
673        let path = format!("/group/{}", urlencoding::encode(name));
674        self.request_no_response(Method::DELETE, &path, None, None)
675            .await
676    }
677
678    async fn set_group_status(&self, name: &str, status: GroupStatus) -> Result<()> {
679        let status_str = match status {
680            GroupStatus::Enabled => "enabled",
681            GroupStatus::Disabled => "disabled",
682        };
683        let query = [("group", name), ("status", status_str)];
684        self.request_no_response(Method::PUT, "/set-group-status", Some(&query), None)
685            .await
686    }
687
688    async fn add_group_members(&self, group: &str, members: &[String]) -> Result<()> {
689        let body = serde_json::to_vec(&UpdateGroupMembersRequest {
690            group: group.to_string(),
691            members: members.to_vec(),
692            is_remove: false,
693            status: "enabled".to_string(),
694        })
695        .map_err(Error::Json)?;
696
697        self.request_no_response(Method::PUT, "/update-group-members", None, Some(&body))
698            .await
699    }
700
701    async fn remove_group_members(&self, group: &str, members: &[String]) -> Result<()> {
702        let body = serde_json::to_vec(&UpdateGroupMembersRequest {
703            group: group.to_string(),
704            members: members.to_vec(),
705            is_remove: true,
706            status: "enabled".to_string(),
707        })
708        .map_err(Error::Json)?;
709
710        self.request_no_response(Method::PUT, "/update-group-members", None, Some(&body))
711            .await
712    }
713
714    // ==================== Service Account Operations ====================
715
716    async fn list_service_accounts(&self, user: Option<&str>) -> Result<Vec<ServiceAccount>> {
717        let query: Vec<(&str, &str)> = user.map(|u| vec![("user", u)]).unwrap_or_default();
718        let query_ref: Option<&[(&str, &str)]> = if query.is_empty() { None } else { Some(&query) };
719
720        let response: ServiceAccountListResponse = self
721            .request(Method::GET, "/list-service-accounts", query_ref, None)
722            .await?;
723
724        Ok(response
725            .accounts
726            .unwrap_or_default()
727            .into_iter()
728            .map(|sa| ServiceAccount {
729                access_key: sa.access_key,
730                secret_key: None,
731                parent_user: sa.parent_user,
732                policy: None,
733                account_status: sa.account_status,
734                expiration: sa.expiration,
735                name: sa.name,
736                description: sa.description,
737                implied_policy: sa.implied_policy,
738            })
739            .collect())
740    }
741
742    async fn get_service_account(&self, access_key: &str) -> Result<ServiceAccount> {
743        let query = [("accessKey", access_key)];
744        let response: ServiceAccount = self
745            .request(Method::GET, "/info-service-account", Some(&query), None)
746            .await?;
747
748        let mut response = response;
749        if response.access_key.is_empty() {
750            response.access_key = access_key.to_string();
751        }
752        Ok(response)
753    }
754
755    async fn create_service_account(
756        &self,
757        request: CreateServiceAccountRequest,
758    ) -> Result<ServiceAccount> {
759        let body = serde_json::to_vec(&request).map_err(Error::Json)?;
760        let response: ServiceAccountCreateResponse = self
761            .request(Method::PUT, "/add-service-accounts", None, Some(&body))
762            .await?;
763
764        Ok(ServiceAccount {
765            access_key: response.credentials.access_key,
766            secret_key: Some(response.credentials.secret_key),
767            expiration: response.credentials.expiration,
768            parent_user: None,
769            policy: None,
770            account_status: None,
771            name: None,
772            description: None,
773            implied_policy: None,
774        })
775    }
776
777    async fn delete_service_account(&self, access_key: &str) -> Result<()> {
778        let query = [("accessKey", access_key)];
779        self.request_no_response(
780            Method::DELETE,
781            "/delete-service-accounts",
782            Some(&query),
783            None,
784        )
785        .await
786    }
787
788    // ==================== Bucket Quota Operations ====================
789
790    async fn set_bucket_quota(&self, bucket: &str, quota: u64) -> Result<BucketQuota> {
791        let path = format!("/quota/{}", urlencoding::encode(bucket));
792        let body = serde_json::to_vec(&SetBucketQuotaApiRequest {
793            quota,
794            quota_type: "HARD".to_string(),
795        })
796        .map_err(Error::Json)?;
797
798        self.request(Method::PUT, &path, None, Some(&body)).await
799    }
800
801    async fn get_bucket_quota(&self, bucket: &str) -> Result<BucketQuota> {
802        let path = format!("/quota/{}", urlencoding::encode(bucket));
803        self.request(Method::GET, &path, None, None).await
804    }
805
806    async fn clear_bucket_quota(&self, bucket: &str) -> Result<BucketQuota> {
807        let path = format!("/quota/{}", urlencoding::encode(bucket));
808        self.request(Method::DELETE, &path, None, None).await
809    }
810
811    // ==================== Tier Operations ====================
812
813    async fn list_tiers(&self) -> Result<Vec<rc_core::admin::TierConfig>> {
814        self.request(Method::GET, "/tier", None, None).await
815    }
816
817    async fn tier_stats(&self) -> Result<serde_json::Value> {
818        self.request(Method::GET, "/tier-stats", None, None).await
819    }
820
821    async fn add_tier(&self, config: rc_core::admin::TierConfig) -> Result<()> {
822        let body = serde_json::to_vec(&config).map_err(Error::Json)?;
823        self.request_no_response(Method::PUT, "/tier", None, Some(&body))
824            .await
825    }
826
827    async fn edit_tier(&self, name: &str, creds: rc_core::admin::TierCreds) -> Result<()> {
828        let path = format!("/tier/{}", urlencoding::encode(name));
829        let body = serde_json::to_vec(&creds).map_err(Error::Json)?;
830        self.request_no_response(Method::POST, &path, None, Some(&body))
831            .await
832    }
833
834    async fn remove_tier(&self, name: &str, force: bool) -> Result<()> {
835        let path = format!("/tier/{}", urlencoding::encode(name));
836        if force {
837            let query: &[(&str, &str)] = &[("force", "true")];
838            self.request_no_response(Method::DELETE, &path, Some(query), None)
839                .await
840        } else {
841            self.request_no_response(Method::DELETE, &path, None, None)
842                .await
843        }
844    }
845
846    // ==================== Replication Target Operations ====================
847
848    async fn set_remote_target(
849        &self,
850        bucket: &str,
851        target: rc_core::replication::BucketTarget,
852        update: bool,
853    ) -> Result<String> {
854        let body = serde_json::to_vec(&target).map_err(Error::Json)?;
855        if update {
856            let query: &[(&str, &str)] = &[("bucket", bucket), ("update", "true")];
857            self.request(Method::PUT, "/set-remote-target", Some(query), Some(&body))
858                .await
859        } else {
860            let query: &[(&str, &str)] = &[("bucket", bucket)];
861            self.request(Method::PUT, "/set-remote-target", Some(query), Some(&body))
862                .await
863        }
864    }
865
866    async fn list_remote_targets(
867        &self,
868        bucket: &str,
869    ) -> Result<Vec<rc_core::replication::BucketTarget>> {
870        let query: &[(&str, &str)] = &[("bucket", bucket)];
871        self.request(Method::GET, "/list-remote-targets", Some(query), None)
872            .await
873    }
874
875    async fn remove_remote_target(&self, bucket: &str, arn: &str) -> Result<()> {
876        let query: &[(&str, &str)] = &[("bucket", bucket), ("arn", arn)];
877        self.request_no_response(Method::DELETE, "/remove-remote-target", Some(query), None)
878            .await
879    }
880
881    async fn replication_metrics(&self, bucket: &str) -> Result<serde_json::Value> {
882        let query: &[(&str, &str)] = &[("bucket", bucket)];
883        self.request(Method::GET, "/replicationmetrics", Some(query), None)
884            .await
885    }
886}
887
888#[cfg(test)]
889mod tests {
890    use super::*;
891    use std::io::{Read, Write};
892    use std::net::{TcpListener, TcpStream};
893    use std::sync::mpsc;
894    use std::thread;
895    use tempfile::tempdir;
896
    /// The parts of one HTTP request recorded by the test server, as
    /// produced by `read_admin_request`.
    #[derive(Debug)]
    struct CapturedAdminRequest {
        /// First whitespace-separated token of the request line (e.g. `POST`).
        method: String,
        /// Second token of the request line: the request target, including
        /// any query string, exactly as received.
        target: String,
        /// Raw body bytes; empty when no `Content-Length` header was sent.
        body: Vec<u8>,
    }
903
904    fn read_admin_request(stream: &mut TcpStream) -> CapturedAdminRequest {
905        let mut buffer = Vec::new();
906        let mut chunk = [0_u8; 1024];
907        let header_end = loop {
908            let read = stream.read(&mut chunk).expect("read HTTP request");
909            assert!(read > 0, "client closed connection before headers");
910            buffer.extend_from_slice(&chunk[..read]);
911
912            if let Some(position) = buffer.windows(4).position(|window| window == b"\r\n\r\n") {
913                break position + 4;
914            }
915        };
916
917        let headers = String::from_utf8_lossy(&buffer[..header_end]).into_owned();
918        let content_length = headers
919            .lines()
920            .find_map(|line| {
921                let (name, value) = line.split_once(':')?;
922                name.eq_ignore_ascii_case("content-length")
923                    .then(|| value.trim().parse::<usize>().expect("valid content length"))
924            })
925            .unwrap_or(0);
926
927        while buffer.len() - header_end < content_length {
928            let read = stream.read(&mut chunk).expect("read HTTP request body");
929            assert!(read > 0, "client closed connection before body");
930            buffer.extend_from_slice(&chunk[..read]);
931        }
932
933        let request_line = headers.lines().next().expect("request line");
934        let mut parts = request_line.split_whitespace();
935        let method = parts.next().expect("request method").to_string();
936        let target = parts.next().expect("request target").to_string();
937        let body = buffer[header_end..header_end + content_length].to_vec();
938
939        CapturedAdminRequest {
940            method,
941            target,
942            body,
943        }
944    }
945
946    fn start_admin_test_server(
947        response_status: &str,
948        response_body: &'static str,
949    ) -> (
950        String,
951        mpsc::Receiver<CapturedAdminRequest>,
952        thread::JoinHandle<()>,
953    ) {
954        let listener = TcpListener::bind("127.0.0.1:0").expect("bind test server");
955        let endpoint = format!("http://{}", listener.local_addr().expect("local addr"));
956        let (sender, receiver) = mpsc::channel();
957        let response_status = response_status.to_string();
958
959        let handle = thread::spawn(move || {
960            let (mut stream, _) = listener.accept().expect("accept request");
961            let request = read_admin_request(&mut stream);
962            sender.send(request).expect("send captured request");
963
964            let response = format!(
965                "HTTP/1.1 {response_status}\r\ncontent-length: {}\r\ncontent-type: application/json\r\nconnection: close\r\n\r\n{response_body}",
966                response_body.len()
967            );
968            stream
969                .write_all(response.as_bytes())
970                .expect("write HTTP response");
971        });
972
973        (endpoint, receiver, handle)
974    }
975
976    fn admin_client_for_endpoint(endpoint: &str) -> AdminClient {
977        let alias = Alias::new("test", endpoint, "access", "secret");
978        AdminClient::new(&alias).expect("admin client should build")
979    }
980
981    fn assert_heal_options_body(
982        body: &[u8],
983        scan_mode: u8,
984        remove: bool,
985        recreate: bool,
986        dry_run: bool,
987    ) {
988        let value: serde_json::Value =
989            serde_json::from_slice(body).expect("heal request body should be JSON");
990
991        assert_eq!(value["recursive"], false);
992        assert_eq!(value["dryRun"], dry_run);
993        assert_eq!(value["remove"], remove);
994        assert_eq!(value["recreate"], recreate);
995        assert_eq!(value["scanMode"], scan_mode);
996        assert_eq!(value["updateParity"], false);
997        assert_eq!(value["nolock"], false);
998        assert!(value.get("bucket").is_none());
999        assert!(value.get("prefix").is_none());
1000    }
1001
1002    #[test]
1003    fn test_admin_url_construction() {
1004        let alias = Alias::new("test", "http://localhost:9000", "access", "secret");
1005        let client = AdminClient::new(&alias).unwrap();
1006
1007        assert_eq!(
1008            client.admin_url("/list-users"),
1009            "http://localhost:9000/rustfs/admin/v3/list-users"
1010        );
1011    }
1012
1013    #[test]
1014    fn test_admin_url_with_trailing_slash() {
1015        let alias = Alias::new("test", "http://localhost:9000/", "access", "secret");
1016        let client = AdminClient::new(&alias).unwrap();
1017
1018        assert_eq!(
1019            client.admin_url("/list-users"),
1020            "http://localhost:9000/rustfs/admin/v3/list-users"
1021        );
1022    }
1023
1024    #[test]
1025    fn test_get_host() {
1026        let alias = Alias::new("test", "https://s3.example.com", "access", "secret");
1027        let client = AdminClient::new(&alias).unwrap();
1028
1029        assert_eq!(client.get_host(), "s3.example.com");
1030    }
1031
1032    #[test]
1033    fn test_sha256_hash() {
1034        let hash = AdminClient::sha256_hash(b"test");
1035        assert_eq!(
1036            hash,
1037            "9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08"
1038        );
1039    }
1040
1041    #[test]
1042    fn test_sha256_hash_empty() {
1043        let hash = AdminClient::sha256_hash(b"");
1044        assert_eq!(
1045            hash,
1046            "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
1047        );
1048    }
1049
1050    #[test]
1051    fn test_rustfs_heal_path_matches_admin_routes() {
1052        assert_eq!(
1053            rustfs_heal_path(&HealStartRequest::default()).expect("root path"),
1054            "/heal/"
1055        );
1056
1057        let bucket_request = HealStartRequest {
1058            bucket: Some("photos".to_string()),
1059            ..Default::default()
1060        };
1061        assert_eq!(
1062            rustfs_heal_path(&bucket_request).expect("bucket path"),
1063            "/heal/photos"
1064        );
1065
1066        let prefix_request = HealStartRequest {
1067            bucket: Some("photos".to_string()),
1068            prefix: Some("2026/raw".to_string()),
1069            ..Default::default()
1070        };
1071        assert_eq!(
1072            rustfs_heal_path(&prefix_request).expect("prefix path"),
1073            "/heal/photos/2026%2Fraw"
1074        );
1075
1076        let invalid_request = HealStartRequest {
1077            prefix: Some("2026/raw".to_string()),
1078            ..Default::default()
1079        };
1080        assert!(matches!(
1081            rustfs_heal_path(&invalid_request),
1082            Err(Error::InvalidPath(_))
1083        ));
1084    }
1085
1086    #[test]
1087    fn test_rustfs_heal_body_matches_server_heal_options() {
1088        let request = HealStartRequest {
1089            scan_mode: HealScanMode::Deep,
1090            remove: true,
1091            recreate: true,
1092            dry_run: true,
1093            ..Default::default()
1094        };
1095
1096        let body = rustfs_heal_body(&request).expect("heal options should serialize");
1097        let value: serde_json::Value =
1098            serde_json::from_slice(&body).expect("heal options body should be JSON");
1099
1100        assert_eq!(value["recursive"], false);
1101        assert_eq!(value["dryRun"], true);
1102        assert_eq!(value["remove"], true);
1103        assert_eq!(value["recreate"], true);
1104        assert_eq!(value["scanMode"], 2);
1105        assert_eq!(value["updateParity"], false);
1106        assert_eq!(value["nolock"], false);
1107        assert!(value.get("bucket").is_none());
1108        assert!(value.get("prefix").is_none());
1109    }
1110
1111    #[test]
1112    fn test_background_heal_status_response_maps_to_heal_status() {
1113        let status = HealStatus::from(BackgroundHealStatusResponse {
1114            bitrot_start_time: Some("2026-04-19T10:00:00Z".to_string()),
1115        });
1116
1117        assert!(status.healing);
1118        assert_eq!(status.started.as_deref(), Some("2026-04-19T10:00:00Z"));
1119
1120        let idle = HealStatus::from(BackgroundHealStatusResponse {
1121            bitrot_start_time: None,
1122        });
1123        assert!(!idle.healing);
1124        assert!(idle.started.is_none());
1125    }
1126
1127    #[test]
1128    fn test_bad_request_maps_to_general_admin_error() {
1129        let alias = Alias::new("test", "http://localhost:9000", "access", "secret");
1130        let client = AdminClient::new(&alias).expect("admin client should build");
1131
1132        let error = client.map_error(StatusCode::BAD_REQUEST, "err request body parse");
1133        assert!(matches!(error, Error::General(_)));
1134        assert_eq!(error.to_string(), "Bad request: err request body parse");
1135    }
1136
1137    #[tokio::test]
1138    async fn test_heal_status_uses_background_heal_status_endpoint() {
1139        let (endpoint, receiver, handle) =
1140            start_admin_test_server("200 OK", r#"{"bitrotStartTime":"2026-04-19T10:00:00Z"}"#);
1141        let client = admin_client_for_endpoint(&endpoint);
1142
1143        let status = client.heal_status().await.expect("heal status request");
1144
1145        assert!(status.healing);
1146        assert_eq!(status.started.as_deref(), Some("2026-04-19T10:00:00Z"));
1147
1148        let request = receiver.recv().expect("captured request");
1149        assert_eq!(request.method, "POST");
1150        assert_eq!(request.target, "/rustfs/admin/v3/background-heal/status");
1151        assert!(request.body.is_empty());
1152        handle.join().expect("server thread should finish");
1153    }
1154
1155    #[tokio::test]
1156    async fn test_heal_start_posts_to_bucket_prefix_route_with_options_body() {
1157        let (endpoint, receiver, handle) = start_admin_test_server("200 OK", "");
1158        let client = admin_client_for_endpoint(&endpoint);
1159        let request = HealStartRequest {
1160            bucket: Some("raw photos".to_string()),
1161            prefix: Some("2026/april".to_string()),
1162            scan_mode: HealScanMode::Deep,
1163            remove: true,
1164            recreate: true,
1165            dry_run: true,
1166        };
1167
1168        let status = client
1169            .heal_start(request)
1170            .await
1171            .expect("heal start request");
1172
1173        assert!(!status.healing);
1174        assert!(status.heal_id.is_empty());
1175        assert!(status.started.is_none());
1176
1177        let request = receiver.recv().expect("captured request");
1178        assert_eq!(request.method, "POST");
1179        assert_eq!(
1180            request.target,
1181            "/rustfs/admin/v3/heal/raw%20photos/2026%2Fapril"
1182        );
1183        assert_heal_options_body(&request.body, 2, true, true, true);
1184        handle.join().expect("server thread should finish");
1185    }
1186
1187    #[tokio::test]
1188    async fn test_heal_stop_posts_force_stop_to_root_heal_route() {
1189        let (endpoint, receiver, handle) = start_admin_test_server("200 OK", "");
1190        let client = admin_client_for_endpoint(&endpoint);
1191
1192        client.heal_stop().await.expect("heal stop request");
1193
1194        let request = receiver.recv().expect("captured request");
1195        assert_eq!(request.method, "POST");
1196        assert_eq!(request.target, "/rustfs/admin/v3/heal/?forceStop=true");
1197        assert_heal_options_body(&request.body, 1, false, false, false);
1198        handle.join().expect("server thread should finish");
1199    }
1200
1201    #[test]
1202    fn test_admin_client_invalid_ca_bundle_path_surfaces_error() {
1203        let mut alias = Alias::new("test", "https://localhost:9000", "access", "secret");
1204        alias.ca_bundle = Some("/definitely-not-here/ca.pem".to_string());
1205
1206        let result = AdminClient::new(&alias);
1207        match result {
1208            Err(Error::Network(msg)) => {
1209                assert!(
1210                    msg.contains("Failed to read CA bundle"),
1211                    "Unexpected error message: {msg}"
1212                );
1213            }
1214            Ok(_) => panic!("Expected Error::Network for invalid path, got Ok(_)"),
1215            Err(e) => panic!("Expected Error::Network for invalid path, got Err({e})"),
1216        }
1217    }
1218
1219    #[test]
1220    fn test_admin_client_invalid_ca_bundle_pem_surfaces_error() {
1221        let temp_dir = tempdir().expect("create temp dir");
1222        let bad_pem_path = temp_dir.path().join("bad-ca.pem");
1223        std::fs::write(
1224            &bad_pem_path,
1225            b"-----BEGIN CERTIFICATE-----\ninvalid-base64\n-----END CERTIFICATE-----\n",
1226        )
1227        .expect("write invalid PEM");
1228
1229        let mut alias = Alias::new("test", "https://localhost:9000", "access", "secret");
1230        alias.ca_bundle = Some(bad_pem_path.display().to_string());
1231
1232        let result = AdminClient::new(&alias);
1233        match result {
1234            Err(Error::Network(msg)) => {
1235                assert!(
1236                    msg.contains("Invalid CA bundle") && msg.contains("bad-ca.pem"),
1237                    "Unexpected error message for invalid PEM CA bundle: {msg}"
1238                );
1239            }
1240            Ok(_) => panic!("Expected Error::Network for invalid PEM, got Ok(_)"),
1241            Err(e) => panic!("Expected Error::Network for invalid PEM, got Err({e})"),
1242        }
1243    }
1244}