1use async_trait::async_trait;
7use aws_credential_types::Credentials;
8use aws_sigv4::http_request::{
9 SignableBody, SignableRequest, SignatureLocation, SigningSettings, sign,
10};
11use aws_sigv4::sign::v4;
12use rc_core::admin::{
13 AdminApi, BucketQuota, ClusterInfo, CreateServiceAccountRequest, Group, GroupStatus,
14 HealScanMode, HealStartRequest, HealStatus, Policy, PolicyEntity, PolicyInfo, ServiceAccount,
15 ServiceAccountCreateResponse, UpdateGroupMembersRequest, User, UserStatus,
16};
17use rc_core::{Alias, Error, Result};
18use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue};
19use reqwest::{Client, Method, StatusCode};
20use serde::{Deserialize, Serialize};
21use sha2::{Digest, Sha256};
22use std::collections::HashMap;
23use std::time::SystemTime;
24
/// Client for the RustFS admin API (`/rustfs/admin/v3`).
///
/// Holds the HTTP client together with the endpoint, credentials, and region
/// copied from an [`Alias`] when the client is created.
pub struct AdminClient {
    /// HTTP client used to send every admin request.
    http_client: Client,
    /// Endpoint URL; `AdminClient::new` stores it with trailing `/` characters removed.
    endpoint: String,
    /// Access key used when signing requests.
    access_key: String,
    /// Secret key used when signing requests.
    secret_key: String,
    /// Region passed to the request signer.
    region: String,
}
33
34impl AdminClient {
35 pub fn new(alias: &Alias) -> Result<Self> {
37 let mut builder = Client::builder()
38 .danger_accept_invalid_certs(alias.insecure)
39 .tls_built_in_native_certs(true)
40 .tls_built_in_webpki_certs(true);
41
42 if let Some(bundle_path) = alias.ca_bundle.as_deref() {
43 let pem = std::fs::read(bundle_path).map_err(|e| {
44 Error::Network(format!("Failed to read CA bundle '{bundle_path}': {e}"))
45 })?;
46 let certs = reqwest::Certificate::from_pem_bundle(&pem)
47 .map_err(|e| Error::Network(format!("Invalid CA bundle '{bundle_path}': {e}")))?;
48 if certs.is_empty() {
49 return Err(Error::Network(format!(
50 "Invalid CA bundle '{bundle_path}': no certificates found"
51 )));
52 }
53 for cert in certs {
54 builder = builder.add_root_certificate(cert);
55 }
56 }
57
58 let http_client = builder
59 .build()
60 .map_err(|e| Error::Network(format!("Failed to create HTTP client: {e}")))?;
61
62 Ok(Self {
63 http_client,
64 endpoint: alias.endpoint.trim_end_matches('/').to_string(),
65 access_key: alias.access_key.clone(),
66 secret_key: alias.secret_key.clone(),
67 region: alias.region.clone(),
68 })
69 }
70
71 fn admin_url(&self, path: &str) -> String {
73 format!("{}/rustfs/admin/v3{}", self.endpoint, path)
74 }
75
76 fn sha256_hash(body: &[u8]) -> String {
78 let mut hasher = Sha256::new();
79 hasher.update(body);
80 hex::encode(hasher.finalize())
81 }
82
83 async fn sign_request(
85 &self,
86 method: &Method,
87 url: &str,
88 headers: &HeaderMap,
89 body: &[u8],
90 ) -> Result<HeaderMap> {
91 let credentials = Credentials::new(
92 &self.access_key,
93 &self.secret_key,
94 None,
95 None,
96 "admin-client",
97 );
98
99 let identity = credentials.into();
100 let mut signing_settings = SigningSettings::default();
101 signing_settings.signature_location = SignatureLocation::Headers;
102
103 let signing_params = v4::SigningParams::builder()
104 .identity(&identity)
105 .region(&self.region)
106 .name("s3")
107 .time(SystemTime::now())
108 .settings(signing_settings)
109 .build()
110 .map_err(|e| Error::Auth(format!("Failed to build signing params: {e}")))?;
111
112 let header_pairs: Vec<(&str, &str)> = headers
114 .iter()
115 .filter_map(|(k, v)| v.to_str().ok().map(|v| (k.as_str(), v)))
116 .collect();
117
118 let signable_body = SignableBody::Bytes(body);
119
120 let signable_request = SignableRequest::new(
121 method.as_str(),
122 url,
123 header_pairs.into_iter(),
124 signable_body,
125 )
126 .map_err(|e| Error::Auth(format!("Failed to create signable request: {e}")))?;
127
128 let (signing_instructions, _signature) = sign(signable_request, &signing_params.into())
129 .map_err(|e| Error::Auth(format!("Failed to sign request: {e}")))?
130 .into_parts();
131
132 let mut signed_headers = headers.clone();
134 for (name, value) in signing_instructions.headers() {
135 let header_name = HeaderName::try_from(&name.to_string())
136 .map_err(|e| Error::Auth(format!("Invalid header name: {e}")))?;
137 let header_value = HeaderValue::try_from(&value.to_string())
138 .map_err(|e| Error::Auth(format!("Invalid header value: {e}")))?;
139 signed_headers.insert(header_name, header_value);
140 }
141
142 Ok(signed_headers)
143 }
144
145 async fn request<T: for<'de> Deserialize<'de>>(
147 &self,
148 method: Method,
149 path: &str,
150 query: Option<&[(&str, &str)]>,
151 body: Option<&[u8]>,
152 ) -> Result<T> {
153 let mut url = self.admin_url(path);
154
155 if let Some(q) = query {
156 let query_string: String = q
157 .iter()
158 .map(|(k, v)| format!("{}={}", urlencoding::encode(k), urlencoding::encode(v)))
159 .collect::<Vec<_>>()
160 .join("&");
161 if !query_string.is_empty() {
162 url.push('?');
163 url.push_str(&query_string);
164 }
165 }
166
167 let body_bytes = body.unwrap_or(&[]);
168 let content_hash = Self::sha256_hash(body_bytes);
169
170 let mut headers = HeaderMap::new();
171 headers.insert("x-amz-content-sha256", content_hash.parse().unwrap());
172 headers.insert("host", self.get_host().parse().unwrap());
173
174 if !body_bytes.is_empty() {
175 headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
176 }
177
178 let signed_headers = self
179 .sign_request(&method, &url, &headers, body_bytes)
180 .await?;
181
182 let mut request_builder = self.http_client.request(method.clone(), &url);
183
184 for (name, value) in signed_headers.iter() {
185 request_builder = request_builder.header(name, value);
186 }
187
188 if !body_bytes.is_empty() {
189 request_builder = request_builder.body(body_bytes.to_vec());
190 }
191
192 let response = request_builder
193 .send()
194 .await
195 .map_err(|e| Error::Network(format!("Request failed: {e}")))?;
196
197 let status = response.status();
198
199 if !status.is_success() {
200 let error_body = response
201 .text()
202 .await
203 .unwrap_or_else(|_| "Unknown error".to_string());
204 return Err(self.map_error(status, &error_body));
205 }
206
207 let text = response
208 .text()
209 .await
210 .map_err(|e| Error::Network(format!("Failed to read response: {e}")))?;
211
212 if text.is_empty() {
213 serde_json::from_str("null").map_err(Error::Json)
215 } else {
216 serde_json::from_str(&text).map_err(Error::Json)
217 }
218 }
219
220 async fn request_no_response(
222 &self,
223 method: Method,
224 path: &str,
225 query: Option<&[(&str, &str)]>,
226 body: Option<&[u8]>,
227 ) -> Result<()> {
228 let mut url = self.admin_url(path);
229
230 if let Some(q) = query {
231 let query_string: String = q
232 .iter()
233 .map(|(k, v)| format!("{}={}", urlencoding::encode(k), urlencoding::encode(v)))
234 .collect::<Vec<_>>()
235 .join("&");
236 if !query_string.is_empty() {
237 url.push('?');
238 url.push_str(&query_string);
239 }
240 }
241
242 let body_bytes = body.unwrap_or(&[]);
243 let content_hash = Self::sha256_hash(body_bytes);
244
245 let mut headers = HeaderMap::new();
246 headers.insert("x-amz-content-sha256", content_hash.parse().unwrap());
247 headers.insert("host", self.get_host().parse().unwrap());
248
249 if !body_bytes.is_empty() {
250 headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
251 }
252
253 let signed_headers = self
254 .sign_request(&method, &url, &headers, body_bytes)
255 .await?;
256
257 let mut request_builder = self.http_client.request(method.clone(), &url);
258
259 for (name, value) in signed_headers.iter() {
260 request_builder = request_builder.header(name, value);
261 }
262
263 if !body_bytes.is_empty() {
264 request_builder = request_builder.body(body_bytes.to_vec());
265 }
266
267 let response = request_builder
268 .send()
269 .await
270 .map_err(|e| Error::Network(format!("Request failed: {e}")))?;
271
272 let status = response.status();
273
274 if !status.is_success() {
275 let error_body = response
276 .text()
277 .await
278 .unwrap_or_else(|_| "Unknown error".to_string());
279 return Err(self.map_error(status, &error_body));
280 }
281
282 Ok(())
283 }
284
285 fn get_host(&self) -> String {
287 self.endpoint
288 .trim_start_matches("http://")
289 .trim_start_matches("https://")
290 .to_string()
291 }
292
293 fn map_error(&self, status: StatusCode, body: &str) -> Error {
295 match status {
296 StatusCode::NOT_FOUND => Error::NotFound(body.to_string()),
297 StatusCode::FORBIDDEN | StatusCode::UNAUTHORIZED => Error::Auth(body.to_string()),
298 StatusCode::CONFLICT => Error::Conflict(body.to_string()),
299 StatusCode::BAD_REQUEST => Error::General(format!("Bad request: {body}")),
300 _ => Error::Network(format!("HTTP {}: {}", status.as_u16(), body)),
301 }
302 }
303}
304
/// Response body of the `/list-users` admin route.
///
/// `list_users` uses each map key as the user's access key and the value as that
/// user's details.
#[derive(Debug, Deserialize)]
struct UserListResponse(HashMap<String, UserInfo>);
308
309#[derive(Debug, Deserialize)]
310#[serde(rename_all = "camelCase")]
311struct UserInfo {
312 #[serde(default)]
313 status: String,
314 #[serde(default)]
315 policy_name: Option<String>,
316 #[serde(default)]
317 member_of: Option<Vec<String>>,
318}
319
/// Response body of the `/list-canned-policies` admin route.
///
/// `list_policies` only uses the map keys (as policy names); the values are kept
/// as raw JSON and discarded.
#[derive(Debug, Deserialize)]
struct PolicyListResponse(HashMap<String, serde_json::Value>);
323
324#[derive(Debug, Serialize)]
326#[serde(rename_all = "camelCase")]
327struct CreateUserRequest {
328 secret_key: String,
329 status: String,
330}
331
/// JSON body sent to `POST /groups` when creating a group.
#[derive(Debug, Serialize)]
struct CreateGroupRequest {
    /// Name of the group to create.
    group: String,
    /// Initial members; the key is omitted from the JSON when this is `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    members: Option<Vec<String>>,
}
339
/// Response body of the `/list-service-accounts` admin route.
#[derive(Debug, Deserialize)]
struct ServiceAccountListResponse {
    /// Listed accounts; `list_service_accounts` treats a missing value as an empty list.
    accounts: Option<Vec<ServiceAccountInfo>>,
}
345
346#[derive(Debug, Deserialize)]
347#[serde(rename_all = "camelCase")]
348struct ServiceAccountInfo {
349 access_key: String,
350 #[serde(default)]
351 parent_user: Option<String>,
352 #[serde(default)]
353 account_status: Option<String>,
354 #[serde(default)]
355 expiration: Option<String>,
356 #[serde(default)]
357 name: Option<String>,
358 #[serde(default)]
359 description: Option<String>,
360 #[serde(default)]
361 implied_policy: Option<bool>,
362}
363
364#[derive(Debug, Deserialize)]
365#[serde(rename_all = "camelCase")]
366struct BackgroundHealStatusResponse {
367 #[serde(default)]
368 bitrot_start_time: Option<String>,
369}
370
371impl From<BackgroundHealStatusResponse> for HealStatus {
372 fn from(response: BackgroundHealStatusResponse) -> Self {
373 Self {
374 healing: response.bitrot_start_time.is_some(),
375 started: response.bitrot_start_time,
376 ..Default::default()
377 }
378 }
379}
380
381#[derive(Debug, Serialize)]
382struct RustfsHealOptions {
383 recursive: bool,
384 #[serde(rename = "dryRun")]
385 dry_run: bool,
386 remove: bool,
387 recreate: bool,
388 #[serde(rename = "scanMode")]
389 scan_mode: u8,
390 #[serde(rename = "updateParity")]
391 update_parity: bool,
392 #[serde(rename = "nolock")]
393 no_lock: bool,
394}
395
396impl From<&HealStartRequest> for RustfsHealOptions {
397 fn from(request: &HealStartRequest) -> Self {
398 Self {
399 recursive: false,
400 dry_run: request.dry_run,
401 remove: request.remove,
402 recreate: request.recreate,
403 scan_mode: rustfs_heal_scan_mode(request.scan_mode),
404 update_parity: false,
405 no_lock: false,
406 }
407 }
408}
409
410fn rustfs_heal_scan_mode(scan_mode: HealScanMode) -> u8 {
411 match scan_mode {
412 HealScanMode::Normal => 1,
413 HealScanMode::Deep => 2,
414 }
415}
416
417fn rustfs_heal_path(request: &HealStartRequest) -> Result<String> {
418 let bucket = request
419 .bucket
420 .as_deref()
421 .filter(|bucket| !bucket.is_empty());
422 let prefix = request
423 .prefix
424 .as_deref()
425 .filter(|prefix| !prefix.is_empty());
426
427 match (bucket, prefix) {
428 (None, None) => Ok("/heal/".to_string()),
429 (Some(bucket), None) => Ok(format!("/heal/{}", urlencoding::encode(bucket))),
430 (Some(bucket), Some(prefix)) => Ok(format!(
431 "/heal/{}/{}",
432 urlencoding::encode(bucket),
433 urlencoding::encode(prefix)
434 )),
435 (None, Some(_)) => Err(Error::InvalidPath(
436 "heal prefix requires a bucket target".to_string(),
437 )),
438 }
439}
440
441fn rustfs_heal_body(request: &HealStartRequest) -> Result<Vec<u8>> {
442 serde_json::to_vec(&RustfsHealOptions::from(request)).map_err(Error::Json)
443}
444
445#[derive(Debug, Serialize)]
447#[serde(rename_all = "camelCase")]
448struct SetBucketQuotaApiRequest {
449 quota: u64,
450 quota_type: String,
451}
452
453#[async_trait]
454impl AdminApi for AdminClient {
455 async fn cluster_info(&self) -> Result<ClusterInfo> {
458 self.request(Method::GET, "/info", None, None).await
459 }
460
461 async fn heal_status(&self) -> Result<HealStatus> {
462 let response: BackgroundHealStatusResponse = self
463 .request(Method::POST, "/background-heal/status", None, None)
464 .await?;
465 Ok(response.into())
466 }
467
468 async fn heal_start(&self, request: HealStartRequest) -> Result<HealStatus> {
469 let path = rustfs_heal_path(&request)?;
470 let body = rustfs_heal_body(&request)?;
471 self.request_no_response(Method::POST, &path, None, Some(&body))
472 .await?;
473 Ok(HealStatus::default())
474 }
475
476 async fn heal_stop(&self) -> Result<()> {
477 let body = rustfs_heal_body(&HealStartRequest::default())?;
478 self.request_no_response(
479 Method::POST,
480 "/heal/",
481 Some(&[("forceStop", "true")]),
482 Some(&body),
483 )
484 .await
485 }
486
487 async fn list_users(&self) -> Result<Vec<User>> {
490 let response: UserListResponse =
491 self.request(Method::GET, "/list-users", None, None).await?;
492
493 Ok(response
494 .0
495 .into_iter()
496 .map(|(access_key, info)| User {
497 access_key,
498 secret_key: None,
499 status: if info.status == "disabled" {
500 UserStatus::Disabled
501 } else {
502 UserStatus::Enabled
503 },
504 policy_name: info.policy_name,
505 member_of: info.member_of.unwrap_or_default(),
506 })
507 .collect())
508 }
509
510 async fn get_user(&self, access_key: &str) -> Result<User> {
511 let query = [("accessKey", access_key)];
512 let response: UserInfo = self
513 .request(Method::GET, "/user-info", Some(&query), None)
514 .await?;
515
516 Ok(User {
517 access_key: access_key.to_string(),
518 secret_key: None,
519 status: if response.status == "disabled" {
520 UserStatus::Disabled
521 } else {
522 UserStatus::Enabled
523 },
524 policy_name: response.policy_name,
525 member_of: response.member_of.unwrap_or_default(),
526 })
527 }
528
529 async fn create_user(&self, access_key: &str, secret_key: &str) -> Result<User> {
530 let query = [("accessKey", access_key)];
531 let body = serde_json::to_vec(&CreateUserRequest {
532 secret_key: secret_key.to_string(),
533 status: "enabled".to_string(),
534 })
535 .map_err(Error::Json)?;
536
537 self.request_no_response(Method::PUT, "/add-user", Some(&query), Some(&body))
538 .await?;
539
540 Ok(User {
541 access_key: access_key.to_string(),
542 secret_key: Some(secret_key.to_string()),
543 status: UserStatus::Enabled,
544 policy_name: None,
545 member_of: vec![],
546 })
547 }
548
549 async fn delete_user(&self, access_key: &str) -> Result<()> {
550 let query = [("accessKey", access_key)];
551 self.request_no_response(Method::DELETE, "/remove-user", Some(&query), None)
552 .await
553 }
554
555 async fn set_user_status(&self, access_key: &str, status: UserStatus) -> Result<()> {
556 let status_str = match status {
557 UserStatus::Enabled => "enabled",
558 UserStatus::Disabled => "disabled",
559 };
560 let query = [("accessKey", access_key), ("status", status_str)];
561 self.request_no_response(Method::PUT, "/set-user-status", Some(&query), None)
562 .await
563 }
564
565 async fn list_policies(&self) -> Result<Vec<PolicyInfo>> {
568 let response: PolicyListResponse = self
569 .request(Method::GET, "/list-canned-policies", None, None)
570 .await?;
571
572 Ok(response
573 .0
574 .into_keys()
575 .map(|name| PolicyInfo { name })
576 .collect())
577 }
578
579 async fn get_policy(&self, name: &str) -> Result<Policy> {
580 let query = [("name", name)];
581 let policy_doc: serde_json::Value = self
582 .request(Method::GET, "/info-canned-policy", Some(&query), None)
583 .await?;
584
585 Ok(Policy {
586 name: name.to_string(),
587 policy: serde_json::to_string_pretty(&policy_doc).unwrap_or_default(),
588 })
589 }
590
591 async fn create_policy(&self, name: &str, policy_document: &str) -> Result<()> {
592 let query = [("name", name)];
593 let body = policy_document.as_bytes();
594 self.request_no_response(Method::PUT, "/add-canned-policy", Some(&query), Some(body))
595 .await
596 }
597
598 async fn delete_policy(&self, name: &str) -> Result<()> {
599 let query = [("name", name)];
600 self.request_no_response(Method::DELETE, "/remove-canned-policy", Some(&query), None)
601 .await
602 }
603
604 async fn attach_policy(
605 &self,
606 policy_names: &[String],
607 entity_type: PolicyEntity,
608 entity_name: &str,
609 ) -> Result<()> {
610 let policy_name = policy_names.join(",");
611 let is_group = entity_type == PolicyEntity::Group;
612
613 let query = [
614 ("policyName", policy_name.as_str()),
615 ("userOrGroup", entity_name),
616 ("isGroup", if is_group { "true" } else { "false" }),
617 ];
618
619 self.request_no_response(Method::PUT, "/set-user-or-group-policy", Some(&query), None)
620 .await
621 }
622
623 async fn detach_policy(
624 &self,
625 policy_names: &[String],
626 entity_type: PolicyEntity,
627 entity_name: &str,
628 ) -> Result<()> {
629 let _ = (policy_names, entity_type, entity_name);
633 Err(Error::UnsupportedFeature(
634 "Policy detach not directly supported. Use attach with remaining policies instead."
635 .to_string(),
636 ))
637 }
638
639 async fn list_groups(&self) -> Result<Vec<String>> {
642 let response: Vec<String> = self.request(Method::GET, "/groups", None, None).await?;
643 Ok(response)
644 }
645
646 async fn get_group(&self, name: &str) -> Result<Group> {
647 let query = [("group", name)];
648 let response: Group = self
649 .request(Method::GET, "/group", Some(&query), None)
650 .await?;
651 Ok(response)
652 }
653
654 async fn create_group(&self, name: &str, members: Option<&[String]>) -> Result<Group> {
655 let body = serde_json::to_vec(&CreateGroupRequest {
656 group: name.to_string(),
657 members: members.map(|m| m.to_vec()),
658 })
659 .map_err(Error::Json)?;
660
661 self.request_no_response(Method::POST, "/groups", None, Some(&body))
662 .await?;
663
664 Ok(Group {
665 name: name.to_string(),
666 policy: None,
667 members: members.map(|m| m.to_vec()).unwrap_or_default(),
668 status: GroupStatus::Enabled,
669 })
670 }
671
672 async fn delete_group(&self, name: &str) -> Result<()> {
673 let path = format!("/group/{}", urlencoding::encode(name));
674 self.request_no_response(Method::DELETE, &path, None, None)
675 .await
676 }
677
678 async fn set_group_status(&self, name: &str, status: GroupStatus) -> Result<()> {
679 let status_str = match status {
680 GroupStatus::Enabled => "enabled",
681 GroupStatus::Disabled => "disabled",
682 };
683 let query = [("group", name), ("status", status_str)];
684 self.request_no_response(Method::PUT, "/set-group-status", Some(&query), None)
685 .await
686 }
687
688 async fn add_group_members(&self, group: &str, members: &[String]) -> Result<()> {
689 let body = serde_json::to_vec(&UpdateGroupMembersRequest {
690 group: group.to_string(),
691 members: members.to_vec(),
692 is_remove: false,
693 status: "enabled".to_string(),
694 })
695 .map_err(Error::Json)?;
696
697 self.request_no_response(Method::PUT, "/update-group-members", None, Some(&body))
698 .await
699 }
700
701 async fn remove_group_members(&self, group: &str, members: &[String]) -> Result<()> {
702 let body = serde_json::to_vec(&UpdateGroupMembersRequest {
703 group: group.to_string(),
704 members: members.to_vec(),
705 is_remove: true,
706 status: "enabled".to_string(),
707 })
708 .map_err(Error::Json)?;
709
710 self.request_no_response(Method::PUT, "/update-group-members", None, Some(&body))
711 .await
712 }
713
714 async fn list_service_accounts(&self, user: Option<&str>) -> Result<Vec<ServiceAccount>> {
717 let query: Vec<(&str, &str)> = user.map(|u| vec![("user", u)]).unwrap_or_default();
718 let query_ref: Option<&[(&str, &str)]> = if query.is_empty() { None } else { Some(&query) };
719
720 let response: ServiceAccountListResponse = self
721 .request(Method::GET, "/list-service-accounts", query_ref, None)
722 .await?;
723
724 Ok(response
725 .accounts
726 .unwrap_or_default()
727 .into_iter()
728 .map(|sa| ServiceAccount {
729 access_key: sa.access_key,
730 secret_key: None,
731 parent_user: sa.parent_user,
732 policy: None,
733 account_status: sa.account_status,
734 expiration: sa.expiration,
735 name: sa.name,
736 description: sa.description,
737 implied_policy: sa.implied_policy,
738 })
739 .collect())
740 }
741
742 async fn get_service_account(&self, access_key: &str) -> Result<ServiceAccount> {
743 let query = [("accessKey", access_key)];
744 let response: ServiceAccount = self
745 .request(Method::GET, "/info-service-account", Some(&query), None)
746 .await?;
747
748 let mut response = response;
749 if response.access_key.is_empty() {
750 response.access_key = access_key.to_string();
751 }
752 Ok(response)
753 }
754
755 async fn create_service_account(
756 &self,
757 request: CreateServiceAccountRequest,
758 ) -> Result<ServiceAccount> {
759 let body = serde_json::to_vec(&request).map_err(Error::Json)?;
760 let response: ServiceAccountCreateResponse = self
761 .request(Method::PUT, "/add-service-accounts", None, Some(&body))
762 .await?;
763
764 Ok(ServiceAccount {
765 access_key: response.credentials.access_key,
766 secret_key: Some(response.credentials.secret_key),
767 expiration: response.credentials.expiration,
768 parent_user: None,
769 policy: None,
770 account_status: None,
771 name: None,
772 description: None,
773 implied_policy: None,
774 })
775 }
776
777 async fn delete_service_account(&self, access_key: &str) -> Result<()> {
778 let query = [("accessKey", access_key)];
779 self.request_no_response(
780 Method::DELETE,
781 "/delete-service-accounts",
782 Some(&query),
783 None,
784 )
785 .await
786 }
787
788 async fn set_bucket_quota(&self, bucket: &str, quota: u64) -> Result<BucketQuota> {
791 let path = format!("/quota/{}", urlencoding::encode(bucket));
792 let body = serde_json::to_vec(&SetBucketQuotaApiRequest {
793 quota,
794 quota_type: "HARD".to_string(),
795 })
796 .map_err(Error::Json)?;
797
798 self.request(Method::PUT, &path, None, Some(&body)).await
799 }
800
801 async fn get_bucket_quota(&self, bucket: &str) -> Result<BucketQuota> {
802 let path = format!("/quota/{}", urlencoding::encode(bucket));
803 self.request(Method::GET, &path, None, None).await
804 }
805
806 async fn clear_bucket_quota(&self, bucket: &str) -> Result<BucketQuota> {
807 let path = format!("/quota/{}", urlencoding::encode(bucket));
808 self.request(Method::DELETE, &path, None, None).await
809 }
810
811 async fn list_tiers(&self) -> Result<Vec<rc_core::admin::TierConfig>> {
814 self.request(Method::GET, "/tier", None, None).await
815 }
816
817 async fn tier_stats(&self) -> Result<serde_json::Value> {
818 self.request(Method::GET, "/tier-stats", None, None).await
819 }
820
821 async fn add_tier(&self, config: rc_core::admin::TierConfig) -> Result<()> {
822 let body = serde_json::to_vec(&config).map_err(Error::Json)?;
823 self.request_no_response(Method::PUT, "/tier", None, Some(&body))
824 .await
825 }
826
827 async fn edit_tier(&self, name: &str, creds: rc_core::admin::TierCreds) -> Result<()> {
828 let path = format!("/tier/{}", urlencoding::encode(name));
829 let body = serde_json::to_vec(&creds).map_err(Error::Json)?;
830 self.request_no_response(Method::POST, &path, None, Some(&body))
831 .await
832 }
833
834 async fn remove_tier(&self, name: &str, force: bool) -> Result<()> {
835 let path = format!("/tier/{}", urlencoding::encode(name));
836 if force {
837 let query: &[(&str, &str)] = &[("force", "true")];
838 self.request_no_response(Method::DELETE, &path, Some(query), None)
839 .await
840 } else {
841 self.request_no_response(Method::DELETE, &path, None, None)
842 .await
843 }
844 }
845
846 async fn set_remote_target(
849 &self,
850 bucket: &str,
851 target: rc_core::replication::BucketTarget,
852 update: bool,
853 ) -> Result<String> {
854 let body = serde_json::to_vec(&target).map_err(Error::Json)?;
855 if update {
856 let query: &[(&str, &str)] = &[("bucket", bucket), ("update", "true")];
857 self.request(Method::PUT, "/set-remote-target", Some(query), Some(&body))
858 .await
859 } else {
860 let query: &[(&str, &str)] = &[("bucket", bucket)];
861 self.request(Method::PUT, "/set-remote-target", Some(query), Some(&body))
862 .await
863 }
864 }
865
866 async fn list_remote_targets(
867 &self,
868 bucket: &str,
869 ) -> Result<Vec<rc_core::replication::BucketTarget>> {
870 let query: &[(&str, &str)] = &[("bucket", bucket)];
871 self.request(Method::GET, "/list-remote-targets", Some(query), None)
872 .await
873 }
874
875 async fn remove_remote_target(&self, bucket: &str, arn: &str) -> Result<()> {
876 let query: &[(&str, &str)] = &[("bucket", bucket), ("arn", arn)];
877 self.request_no_response(Method::DELETE, "/remove-remote-target", Some(query), None)
878 .await
879 }
880
881 async fn replication_metrics(&self, bucket: &str) -> Result<serde_json::Value> {
882 let query: &[(&str, &str)] = &[("bucket", bucket)];
883 self.request(Method::GET, "/replicationmetrics", Some(query), None)
884 .await
885 }
886}
887
888#[cfg(test)]
889mod tests {
890 use super::*;
891 use std::io::{Read, Write};
892 use std::net::{TcpListener, TcpStream};
893 use std::sync::mpsc;
894 use std::thread;
895 use tempfile::tempdir;
896
897 #[derive(Debug)]
898 struct CapturedAdminRequest {
899 method: String,
900 target: String,
901 body: Vec<u8>,
902 }
903
904 fn read_admin_request(stream: &mut TcpStream) -> CapturedAdminRequest {
905 let mut buffer = Vec::new();
906 let mut chunk = [0_u8; 1024];
907 let header_end = loop {
908 let read = stream.read(&mut chunk).expect("read HTTP request");
909 assert!(read > 0, "client closed connection before headers");
910 buffer.extend_from_slice(&chunk[..read]);
911
912 if let Some(position) = buffer.windows(4).position(|window| window == b"\r\n\r\n") {
913 break position + 4;
914 }
915 };
916
917 let headers = String::from_utf8_lossy(&buffer[..header_end]).into_owned();
918 let content_length = headers
919 .lines()
920 .find_map(|line| {
921 let (name, value) = line.split_once(':')?;
922 name.eq_ignore_ascii_case("content-length")
923 .then(|| value.trim().parse::<usize>().expect("valid content length"))
924 })
925 .unwrap_or(0);
926
927 while buffer.len() - header_end < content_length {
928 let read = stream.read(&mut chunk).expect("read HTTP request body");
929 assert!(read > 0, "client closed connection before body");
930 buffer.extend_from_slice(&chunk[..read]);
931 }
932
933 let request_line = headers.lines().next().expect("request line");
934 let mut parts = request_line.split_whitespace();
935 let method = parts.next().expect("request method").to_string();
936 let target = parts.next().expect("request target").to_string();
937 let body = buffer[header_end..header_end + content_length].to_vec();
938
939 CapturedAdminRequest {
940 method,
941 target,
942 body,
943 }
944 }
945
946 fn start_admin_test_server(
947 response_status: &str,
948 response_body: &'static str,
949 ) -> (
950 String,
951 mpsc::Receiver<CapturedAdminRequest>,
952 thread::JoinHandle<()>,
953 ) {
954 let listener = TcpListener::bind("127.0.0.1:0").expect("bind test server");
955 let endpoint = format!("http://{}", listener.local_addr().expect("local addr"));
956 let (sender, receiver) = mpsc::channel();
957 let response_status = response_status.to_string();
958
959 let handle = thread::spawn(move || {
960 let (mut stream, _) = listener.accept().expect("accept request");
961 let request = read_admin_request(&mut stream);
962 sender.send(request).expect("send captured request");
963
964 let response = format!(
965 "HTTP/1.1 {response_status}\r\ncontent-length: {}\r\ncontent-type: application/json\r\nconnection: close\r\n\r\n{response_body}",
966 response_body.len()
967 );
968 stream
969 .write_all(response.as_bytes())
970 .expect("write HTTP response");
971 });
972
973 (endpoint, receiver, handle)
974 }
975
976 fn admin_client_for_endpoint(endpoint: &str) -> AdminClient {
977 let alias = Alias::new("test", endpoint, "access", "secret");
978 AdminClient::new(&alias).expect("admin client should build")
979 }
980
981 fn assert_heal_options_body(
982 body: &[u8],
983 scan_mode: u8,
984 remove: bool,
985 recreate: bool,
986 dry_run: bool,
987 ) {
988 let value: serde_json::Value =
989 serde_json::from_slice(body).expect("heal request body should be JSON");
990
991 assert_eq!(value["recursive"], false);
992 assert_eq!(value["dryRun"], dry_run);
993 assert_eq!(value["remove"], remove);
994 assert_eq!(value["recreate"], recreate);
995 assert_eq!(value["scanMode"], scan_mode);
996 assert_eq!(value["updateParity"], false);
997 assert_eq!(value["nolock"], false);
998 assert!(value.get("bucket").is_none());
999 assert!(value.get("prefix").is_none());
1000 }
1001
1002 #[test]
1003 fn test_admin_url_construction() {
1004 let alias = Alias::new("test", "http://localhost:9000", "access", "secret");
1005 let client = AdminClient::new(&alias).unwrap();
1006
1007 assert_eq!(
1008 client.admin_url("/list-users"),
1009 "http://localhost:9000/rustfs/admin/v3/list-users"
1010 );
1011 }
1012
1013 #[test]
1014 fn test_admin_url_with_trailing_slash() {
1015 let alias = Alias::new("test", "http://localhost:9000/", "access", "secret");
1016 let client = AdminClient::new(&alias).unwrap();
1017
1018 assert_eq!(
1019 client.admin_url("/list-users"),
1020 "http://localhost:9000/rustfs/admin/v3/list-users"
1021 );
1022 }
1023
1024 #[test]
1025 fn test_get_host() {
1026 let alias = Alias::new("test", "https://s3.example.com", "access", "secret");
1027 let client = AdminClient::new(&alias).unwrap();
1028
1029 assert_eq!(client.get_host(), "s3.example.com");
1030 }
1031
1032 #[test]
1033 fn test_sha256_hash() {
1034 let hash = AdminClient::sha256_hash(b"test");
1035 assert_eq!(
1036 hash,
1037 "9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08"
1038 );
1039 }
1040
1041 #[test]
1042 fn test_sha256_hash_empty() {
1043 let hash = AdminClient::sha256_hash(b"");
1044 assert_eq!(
1045 hash,
1046 "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
1047 );
1048 }
1049
1050 #[test]
1051 fn test_rustfs_heal_path_matches_admin_routes() {
1052 assert_eq!(
1053 rustfs_heal_path(&HealStartRequest::default()).expect("root path"),
1054 "/heal/"
1055 );
1056
1057 let bucket_request = HealStartRequest {
1058 bucket: Some("photos".to_string()),
1059 ..Default::default()
1060 };
1061 assert_eq!(
1062 rustfs_heal_path(&bucket_request).expect("bucket path"),
1063 "/heal/photos"
1064 );
1065
1066 let prefix_request = HealStartRequest {
1067 bucket: Some("photos".to_string()),
1068 prefix: Some("2026/raw".to_string()),
1069 ..Default::default()
1070 };
1071 assert_eq!(
1072 rustfs_heal_path(&prefix_request).expect("prefix path"),
1073 "/heal/photos/2026%2Fraw"
1074 );
1075
1076 let invalid_request = HealStartRequest {
1077 prefix: Some("2026/raw".to_string()),
1078 ..Default::default()
1079 };
1080 assert!(matches!(
1081 rustfs_heal_path(&invalid_request),
1082 Err(Error::InvalidPath(_))
1083 ));
1084 }
1085
1086 #[test]
1087 fn test_rustfs_heal_body_matches_server_heal_options() {
1088 let request = HealStartRequest {
1089 scan_mode: HealScanMode::Deep,
1090 remove: true,
1091 recreate: true,
1092 dry_run: true,
1093 ..Default::default()
1094 };
1095
1096 let body = rustfs_heal_body(&request).expect("heal options should serialize");
1097 let value: serde_json::Value =
1098 serde_json::from_slice(&body).expect("heal options body should be JSON");
1099
1100 assert_eq!(value["recursive"], false);
1101 assert_eq!(value["dryRun"], true);
1102 assert_eq!(value["remove"], true);
1103 assert_eq!(value["recreate"], true);
1104 assert_eq!(value["scanMode"], 2);
1105 assert_eq!(value["updateParity"], false);
1106 assert_eq!(value["nolock"], false);
1107 assert!(value.get("bucket").is_none());
1108 assert!(value.get("prefix").is_none());
1109 }
1110
/// Checks the `BackgroundHealStatusResponse` -> `HealStatus` conversion: a
/// bitrot start time yields a healing status carrying that timestamp, and a
/// missing start time yields an idle status with no start.
#[test]
fn test_background_heal_status_response_maps_to_heal_status() {
    // With a start time: healing, and the timestamp is passed through.
    let running_response = BackgroundHealStatusResponse {
        bitrot_start_time: Some(String::from("2026-04-19T10:00:00Z")),
    };
    let running: HealStatus = running_response.into();
    assert!(running.healing);
    assert_eq!(running.started.as_deref(), Some("2026-04-19T10:00:00Z"));

    // Without a start time: not healing, and no start is recorded.
    let idle_response = BackgroundHealStatusResponse {
        bitrot_start_time: None,
    };
    let idle: HealStatus = idle_response.into();
    assert!(!idle.healing);
    assert!(idle.started.is_none());
}
1126
/// Checks that `map_error` turns a 400 response into `Error::General` whose
/// display text is the response body prefixed with "Bad request: ".
#[test]
fn test_bad_request_maps_to_general_admin_error() {
    let alias = Alias::new("test", "http://localhost:9000", "access", "secret");
    let admin = AdminClient::new(&alias).expect("admin client should build");

    let mapped = admin.map_error(StatusCode::BAD_REQUEST, "err request body parse");

    assert!(matches!(mapped, Error::General(_)));
    let rendered = mapped.to_string();
    assert_eq!(rendered, "Bad request: err request body parse");
}
1136
1137 #[tokio::test]
1138 async fn test_heal_status_uses_background_heal_status_endpoint() {
1139 let (endpoint, receiver, handle) =
1140 start_admin_test_server("200 OK", r#"{"bitrotStartTime":"2026-04-19T10:00:00Z"}"#);
1141 let client = admin_client_for_endpoint(&endpoint);
1142
1143 let status = client.heal_status().await.expect("heal status request");
1144
1145 assert!(status.healing);
1146 assert_eq!(status.started.as_deref(), Some("2026-04-19T10:00:00Z"));
1147
1148 let request = receiver.recv().expect("captured request");
1149 assert_eq!(request.method, "POST");
1150 assert_eq!(request.target, "/rustfs/admin/v3/background-heal/status");
1151 assert!(request.body.is_empty());
1152 handle.join().expect("server thread should finish");
1153 }
1154
1155 #[tokio::test]
1156 async fn test_heal_start_posts_to_bucket_prefix_route_with_options_body() {
1157 let (endpoint, receiver, handle) = start_admin_test_server("200 OK", "");
1158 let client = admin_client_for_endpoint(&endpoint);
1159 let request = HealStartRequest {
1160 bucket: Some("raw photos".to_string()),
1161 prefix: Some("2026/april".to_string()),
1162 scan_mode: HealScanMode::Deep,
1163 remove: true,
1164 recreate: true,
1165 dry_run: true,
1166 };
1167
1168 let status = client
1169 .heal_start(request)
1170 .await
1171 .expect("heal start request");
1172
1173 assert!(!status.healing);
1174 assert!(status.heal_id.is_empty());
1175 assert!(status.started.is_none());
1176
1177 let request = receiver.recv().expect("captured request");
1178 assert_eq!(request.method, "POST");
1179 assert_eq!(
1180 request.target,
1181 "/rustfs/admin/v3/heal/raw%20photos/2026%2Fapril"
1182 );
1183 assert_heal_options_body(&request.body, 2, true, true, true);
1184 handle.join().expect("server thread should finish");
1185 }
1186
1187 #[tokio::test]
1188 async fn test_heal_stop_posts_force_stop_to_root_heal_route() {
1189 let (endpoint, receiver, handle) = start_admin_test_server("200 OK", "");
1190 let client = admin_client_for_endpoint(&endpoint);
1191
1192 client.heal_stop().await.expect("heal stop request");
1193
1194 let request = receiver.recv().expect("captured request");
1195 assert_eq!(request.method, "POST");
1196 assert_eq!(request.target, "/rustfs/admin/v3/heal/?forceStop=true");
1197 assert_heal_options_body(&request.body, 1, false, false, false);
1198 handle.join().expect("server thread should finish");
1199 }
1200
/// Checks that `AdminClient::new` reports a CA bundle path that cannot be read
/// as `Error::Network` with a "Failed to read CA bundle" message.
#[test]
fn test_admin_client_invalid_ca_bundle_path_surfaces_error() {
    let mut alias = Alias::new("test", "https://localhost:9000", "access", "secret");
    alias.ca_bundle = Some(String::from("/definitely-not-here/ca.pem"));

    // Only the network-error variant is acceptable; any other outcome fails.
    let msg = match AdminClient::new(&alias) {
        Err(Error::Network(msg)) => msg,
        Ok(_) => panic!("Expected Error::Network for invalid path, got Ok(_)"),
        Err(e) => panic!("Expected Error::Network for invalid path, got Err({e})"),
    };

    assert!(
        msg.contains("Failed to read CA bundle"),
        "Unexpected error message: {msg}"
    );
}
1218
/// Checks that `AdminClient::new` reports a readable but malformed CA bundle
/// as `Error::Network` with an "Invalid CA bundle" message naming the file.
#[test]
fn test_admin_client_invalid_ca_bundle_pem_surfaces_error() {
    // PEM framing is present, but the payload is not valid base64.
    const INVALID_PEM: &[u8] =
        b"-----BEGIN CERTIFICATE-----\ninvalid-base64\n-----END CERTIFICATE-----\n";

    let scratch_dir = tempdir().expect("create temp dir");
    let bundle_path = scratch_dir.path().join("bad-ca.pem");
    std::fs::write(&bundle_path, INVALID_PEM).expect("write invalid PEM");

    let mut alias = Alias::new("test", "https://localhost:9000", "access", "secret");
    alias.ca_bundle = Some(bundle_path.display().to_string());

    // Only the network-error variant is acceptable; any other outcome fails.
    let msg = match AdminClient::new(&alias) {
        Err(Error::Network(msg)) => msg,
        Ok(_) => panic!("Expected Error::Network for invalid PEM, got Ok(_)"),
        Err(e) => panic!("Expected Error::Network for invalid PEM, got Err({e})"),
    };

    let names_problem = msg.contains("Invalid CA bundle");
    let names_file = msg.contains("bad-ca.pem");
    assert!(
        names_problem && names_file,
        "Unexpected error message for invalid PEM CA bundle: {msg}"
    );
}
1244}