1use async_trait::async_trait;
7use aws_credential_types::Credentials;
8use aws_sigv4::http_request::{
9 SignableBody, SignableRequest, SignatureLocation, SigningSettings, sign,
10};
11use aws_sigv4::sign::v4;
12use rc_core::admin::{
13 AdminApi, BucketQuota, ClusterInfo, CreateServiceAccountRequest, Group, GroupStatus,
14 HealScanMode, HealStartRequest, HealStatus, Policy, PolicyEntity, PolicyInfo, PoolStatus,
15 PoolTarget, RebalanceStartResult, RebalanceStatus, ServiceAccount,
16 ServiceAccountCreateResponse, UpdateGroupMembersRequest, User, UserStatus,
17};
18use rc_core::{Alias, Error, Result};
19use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue};
20use reqwest::{Client, Method, StatusCode};
21use serde::{Deserialize, Serialize};
22use sha2::{Digest, Sha256};
23use std::collections::HashMap;
24use std::time::SystemTime;
25
26pub struct AdminClient {
28 http_client: Client,
29 endpoint: String,
30 access_key: String,
31 secret_key: String,
32 region: String,
33 anonymous: bool,
34}
35
36impl AdminClient {
37 pub fn new(alias: &Alias) -> Result<Self> {
39 let mut builder = Client::builder()
40 .danger_accept_invalid_certs(alias.insecure)
41 .tls_built_in_native_certs(true)
42 .tls_built_in_webpki_certs(true);
43
44 if let Some(bundle_path) = alias.ca_bundle.as_deref() {
45 let pem = std::fs::read(bundle_path).map_err(|e| {
46 Error::Network(format!("Failed to read CA bundle '{bundle_path}': {e}"))
47 })?;
48 let certs = reqwest::Certificate::from_pem_bundle(&pem)
49 .map_err(|e| Error::Network(format!("Invalid CA bundle '{bundle_path}': {e}")))?;
50 if certs.is_empty() {
51 return Err(Error::Network(format!(
52 "Invalid CA bundle '{bundle_path}': no certificates found"
53 )));
54 }
55 for cert in certs {
56 builder = builder.add_root_certificate(cert);
57 }
58 }
59
60 if let (Some(cert_path), Some(key_path)) =
61 (alias.client_cert.as_deref(), alias.client_key.as_deref())
62 {
63 let mut identity_pem = std::fs::read(cert_path).map_err(|e| {
64 Error::Network(format!(
65 "Failed to read client certificate '{cert_path}': {e}"
66 ))
67 })?;
68 let key_pem = std::fs::read(key_path).map_err(|e| {
69 Error::Network(format!("Failed to read client key '{key_path}': {e}"))
70 })?;
71 identity_pem.extend_from_slice(b"\n");
72 identity_pem.extend_from_slice(&key_pem);
73 let identity = reqwest::Identity::from_pem(&identity_pem).map_err(|e| {
74 Error::Network(format!("Invalid client certificate/key identity: {e}"))
75 })?;
76 builder = builder.use_rustls_tls().identity(identity);
77 }
78
79 let http_client = builder
80 .build()
81 .map_err(|e| Error::Network(format!("Failed to create HTTP client: {e}")))?;
82
83 Ok(Self {
84 http_client,
85 endpoint: alias.endpoint.trim_end_matches('/').to_string(),
86 access_key: alias.access_key.clone(),
87 secret_key: alias.secret_key.clone(),
88 region: alias.region.clone(),
89 anonymous: alias.anonymous,
90 })
91 }
92
93 fn admin_url(&self, path: &str) -> String {
95 format!("{}/rustfs/admin/v3{}", self.endpoint, path)
96 }
97
98 fn sha256_hash(body: &[u8]) -> String {
100 let mut hasher = Sha256::new();
101 hasher.update(body);
102 hex::encode(hasher.finalize())
103 }
104
105 async fn sign_request(
107 &self,
108 method: &Method,
109 url: &str,
110 headers: &HeaderMap,
111 body: &[u8],
112 ) -> Result<HeaderMap> {
113 if self.anonymous {
114 return Ok(headers.clone());
115 }
116
117 let credentials = Credentials::new(
118 &self.access_key,
119 &self.secret_key,
120 None,
121 None,
122 "admin-client",
123 );
124
125 let identity = credentials.into();
126 let mut signing_settings = SigningSettings::default();
127 signing_settings.signature_location = SignatureLocation::Headers;
128
129 let signing_params = v4::SigningParams::builder()
130 .identity(&identity)
131 .region(&self.region)
132 .name("s3")
133 .time(SystemTime::now())
134 .settings(signing_settings)
135 .build()
136 .map_err(|e| Error::Auth(format!("Failed to build signing params: {e}")))?;
137
138 let header_pairs: Vec<(&str, &str)> = headers
140 .iter()
141 .filter_map(|(k, v)| v.to_str().ok().map(|v| (k.as_str(), v)))
142 .collect();
143
144 let signable_body = SignableBody::Bytes(body);
145
146 let signable_request = SignableRequest::new(
147 method.as_str(),
148 url,
149 header_pairs.into_iter(),
150 signable_body,
151 )
152 .map_err(|e| Error::Auth(format!("Failed to create signable request: {e}")))?;
153
154 let (signing_instructions, _signature) = sign(signable_request, &signing_params.into())
155 .map_err(|e| Error::Auth(format!("Failed to sign request: {e}")))?
156 .into_parts();
157
158 let mut signed_headers = headers.clone();
160 for (name, value) in signing_instructions.headers() {
161 let header_name = HeaderName::try_from(&name.to_string())
162 .map_err(|e| Error::Auth(format!("Invalid header name: {e}")))?;
163 let header_value = HeaderValue::try_from(&value.to_string())
164 .map_err(|e| Error::Auth(format!("Invalid header value: {e}")))?;
165 signed_headers.insert(header_name, header_value);
166 }
167
168 Ok(signed_headers)
169 }
170
171 async fn request<T: for<'de> Deserialize<'de>>(
173 &self,
174 method: Method,
175 path: &str,
176 query: Option<&[(&str, &str)]>,
177 body: Option<&[u8]>,
178 ) -> Result<T> {
179 let mut url = self.admin_url(path);
180
181 if let Some(q) = query {
182 let query_string: String = q
183 .iter()
184 .map(|(k, v)| format!("{}={}", urlencoding::encode(k), urlencoding::encode(v)))
185 .collect::<Vec<_>>()
186 .join("&");
187 if !query_string.is_empty() {
188 url.push('?');
189 url.push_str(&query_string);
190 }
191 }
192
193 let body_bytes = body.unwrap_or(&[]);
194 let content_hash = Self::sha256_hash(body_bytes);
195
196 let mut headers = HeaderMap::new();
197 headers.insert("x-amz-content-sha256", content_hash.parse().unwrap());
198 headers.insert("host", self.get_host().parse().unwrap());
199
200 if !body_bytes.is_empty() {
201 headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
202 }
203
204 let signed_headers = self
205 .sign_request(&method, &url, &headers, body_bytes)
206 .await?;
207
208 let mut request_builder = self.http_client.request(method.clone(), &url);
209
210 for (name, value) in signed_headers.iter() {
211 request_builder = request_builder.header(name, value);
212 }
213
214 if !body_bytes.is_empty() {
215 request_builder = request_builder.body(body_bytes.to_vec());
216 }
217
218 let response = request_builder
219 .send()
220 .await
221 .map_err(|e| Error::Network(format!("Request failed: {e}")))?;
222
223 let status = response.status();
224
225 if !status.is_success() {
226 let error_body = response
227 .text()
228 .await
229 .unwrap_or_else(|_| "Unknown error".to_string());
230 return Err(self.map_error(status, &error_body));
231 }
232
233 let text = response
234 .text()
235 .await
236 .map_err(|e| Error::Network(format!("Failed to read response: {e}")))?;
237
238 if text.is_empty() {
239 serde_json::from_str("null").map_err(Error::Json)
241 } else {
242 serde_json::from_str(&text).map_err(Error::Json)
243 }
244 }
245
246 async fn request_no_response(
248 &self,
249 method: Method,
250 path: &str,
251 query: Option<&[(&str, &str)]>,
252 body: Option<&[u8]>,
253 ) -> Result<()> {
254 let mut url = self.admin_url(path);
255
256 if let Some(q) = query {
257 let query_string: String = q
258 .iter()
259 .map(|(k, v)| format!("{}={}", urlencoding::encode(k), urlencoding::encode(v)))
260 .collect::<Vec<_>>()
261 .join("&");
262 if !query_string.is_empty() {
263 url.push('?');
264 url.push_str(&query_string);
265 }
266 }
267
268 let body_bytes = body.unwrap_or(&[]);
269 let content_hash = Self::sha256_hash(body_bytes);
270
271 let mut headers = HeaderMap::new();
272 headers.insert("x-amz-content-sha256", content_hash.parse().unwrap());
273 headers.insert("host", self.get_host().parse().unwrap());
274
275 if !body_bytes.is_empty() {
276 headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
277 }
278
279 let signed_headers = self
280 .sign_request(&method, &url, &headers, body_bytes)
281 .await?;
282
283 let mut request_builder = self.http_client.request(method.clone(), &url);
284
285 for (name, value) in signed_headers.iter() {
286 request_builder = request_builder.header(name, value);
287 }
288
289 if !body_bytes.is_empty() {
290 request_builder = request_builder.body(body_bytes.to_vec());
291 }
292
293 let response = request_builder
294 .send()
295 .await
296 .map_err(|e| Error::Network(format!("Request failed: {e}")))?;
297
298 let status = response.status();
299
300 if !status.is_success() {
301 let error_body = response
302 .text()
303 .await
304 .unwrap_or_else(|_| "Unknown error".to_string());
305 return Err(self.map_error(status, &error_body));
306 }
307
308 Ok(())
309 }
310
311 fn get_host(&self) -> String {
313 self.endpoint
314 .trim_start_matches("http://")
315 .trim_start_matches("https://")
316 .to_string()
317 }
318
319 fn map_error(&self, status: StatusCode, body: &str) -> Error {
321 match status {
322 StatusCode::NOT_FOUND => Error::NotFound(body.to_string()),
323 StatusCode::FORBIDDEN | StatusCode::UNAUTHORIZED => Error::Auth(body.to_string()),
324 StatusCode::CONFLICT => Error::Conflict(body.to_string()),
325 StatusCode::BAD_REQUEST => Error::General(format!("Bad request: {body}")),
326 _ => Error::Network(format!("HTTP {}: {}", status.as_u16(), body)),
327 }
328 }
329}
330
331#[derive(Debug, Deserialize)]
333struct UserListResponse(HashMap<String, UserInfo>);
334
335#[derive(Debug, Deserialize)]
336#[serde(rename_all = "camelCase")]
337struct UserInfo {
338 #[serde(default)]
339 status: String,
340 #[serde(default)]
341 policy_name: Option<String>,
342 #[serde(default)]
343 member_of: Option<Vec<String>>,
344}
345
346#[derive(Debug, Deserialize)]
348struct PolicyListResponse(HashMap<String, serde_json::Value>);
349
350#[derive(Debug, Serialize)]
352#[serde(rename_all = "camelCase")]
353struct CreateUserRequest {
354 secret_key: String,
355 status: String,
356}
357
358#[derive(Debug, Serialize)]
360struct CreateGroupRequest {
361 group: String,
362 #[serde(skip_serializing_if = "Option::is_none")]
363 members: Option<Vec<String>>,
364}
365
366#[derive(Debug, Deserialize)]
368struct ServiceAccountListResponse {
369 accounts: Option<Vec<ServiceAccountInfo>>,
370}
371
372#[derive(Debug, Deserialize)]
373#[serde(rename_all = "camelCase")]
374struct ServiceAccountInfo {
375 access_key: String,
376 #[serde(default)]
377 parent_user: Option<String>,
378 #[serde(default)]
379 account_status: Option<String>,
380 #[serde(default)]
381 expiration: Option<String>,
382 #[serde(default)]
383 name: Option<String>,
384 #[serde(default)]
385 description: Option<String>,
386 #[serde(default)]
387 implied_policy: Option<bool>,
388}
389
390#[derive(Debug, Deserialize)]
391#[serde(rename_all = "camelCase")]
392struct BackgroundHealStatusResponse {
393 #[serde(default)]
394 bitrot_start_time: Option<String>,
395}
396
397impl From<BackgroundHealStatusResponse> for HealStatus {
398 fn from(response: BackgroundHealStatusResponse) -> Self {
399 Self {
400 healing: response.bitrot_start_time.is_some(),
401 started: response.bitrot_start_time,
402 ..Default::default()
403 }
404 }
405}
406
407#[derive(Debug, Serialize)]
408struct RustfsHealOptions {
409 recursive: bool,
410 #[serde(rename = "dryRun")]
411 dry_run: bool,
412 remove: bool,
413 recreate: bool,
414 #[serde(rename = "scanMode")]
415 scan_mode: u8,
416 #[serde(rename = "updateParity")]
417 update_parity: bool,
418 #[serde(rename = "nolock")]
419 no_lock: bool,
420}
421
422impl From<&HealStartRequest> for RustfsHealOptions {
423 fn from(request: &HealStartRequest) -> Self {
424 Self {
425 recursive: false,
426 dry_run: request.dry_run,
427 remove: request.remove,
428 recreate: request.recreate,
429 scan_mode: rustfs_heal_scan_mode(request.scan_mode),
430 update_parity: false,
431 no_lock: false,
432 }
433 }
434}
435
436fn rustfs_heal_scan_mode(scan_mode: HealScanMode) -> u8 {
437 match scan_mode {
438 HealScanMode::Normal => 1,
439 HealScanMode::Deep => 2,
440 }
441}
442
443fn rustfs_heal_path(request: &HealStartRequest) -> Result<String> {
444 let bucket = request
445 .bucket
446 .as_deref()
447 .filter(|bucket| !bucket.is_empty());
448 let prefix = request
449 .prefix
450 .as_deref()
451 .filter(|prefix| !prefix.is_empty());
452
453 match (bucket, prefix) {
454 (None, None) => Ok("/heal/".to_string()),
455 (Some(bucket), None) => Ok(format!("/heal/{}", urlencoding::encode(bucket))),
456 (Some(bucket), Some(prefix)) => Ok(format!(
457 "/heal/{}/{}",
458 urlencoding::encode(bucket),
459 urlencoding::encode(prefix)
460 )),
461 (None, Some(_)) => Err(Error::InvalidPath(
462 "heal prefix requires a bucket target".to_string(),
463 )),
464 }
465}
466
467fn rustfs_heal_body(request: &HealStartRequest) -> Result<Vec<u8>> {
468 serde_json::to_vec(&RustfsHealOptions::from(request)).map_err(Error::Json)
469}
470
471fn pool_target_query(target: &PoolTarget) -> Vec<(&str, &str)> {
472 let mut query = vec![("pool", target.pool.as_str())];
473 if target.by_id {
474 query.push(("by-id", "true"));
475 }
476 query
477}
478
479#[derive(Debug, Serialize)]
481#[serde(rename_all = "camelCase")]
482struct SetBucketQuotaApiRequest {
483 quota: u64,
484 quota_type: String,
485}
486
487#[async_trait]
488impl AdminApi for AdminClient {
489 async fn cluster_info(&self) -> Result<ClusterInfo> {
492 self.request(Method::GET, "/info", None, None).await
493 }
494
495 async fn heal_status(&self) -> Result<HealStatus> {
496 let response: BackgroundHealStatusResponse = self
497 .request(Method::POST, "/background-heal/status", None, None)
498 .await?;
499 Ok(response.into())
500 }
501
502 async fn heal_start(&self, request: HealStartRequest) -> Result<HealStatus> {
503 let path = rustfs_heal_path(&request)?;
504 let body = rustfs_heal_body(&request)?;
505 self.request_no_response(Method::POST, &path, None, Some(&body))
506 .await?;
507 Ok(HealStatus::default())
508 }
509
510 async fn heal_stop(&self) -> Result<()> {
511 let body = rustfs_heal_body(&HealStartRequest::default())?;
512 self.request_no_response(
513 Method::POST,
514 "/heal/",
515 Some(&[("forceStop", "true")]),
516 Some(&body),
517 )
518 .await
519 }
520
521 async fn list_pools(&self) -> Result<Vec<PoolStatus>> {
522 self.request(Method::GET, "/pools/list", None, None).await
523 }
524
525 async fn pool_status(&self, target: PoolTarget) -> Result<PoolStatus> {
526 let query = pool_target_query(&target);
527 self.request(Method::GET, "/pools/status", Some(&query), None)
528 .await
529 }
530
531 async fn decommission_start(&self, target: PoolTarget) -> Result<()> {
532 let query = pool_target_query(&target);
533 self.request_no_response(Method::POST, "/pools/decommission", Some(&query), None)
534 .await
535 }
536
537 async fn decommission_cancel(&self, target: PoolTarget) -> Result<()> {
538 let query = pool_target_query(&target);
539 self.request_no_response(Method::POST, "/pools/cancel", Some(&query), None)
540 .await
541 }
542
543 async fn rebalance_start(&self) -> Result<RebalanceStartResult> {
544 self.request(Method::POST, "/rebalance/start", None, None)
545 .await
546 }
547
548 async fn rebalance_status(&self) -> Result<RebalanceStatus> {
549 self.request(Method::GET, "/rebalance/status", None, None)
550 .await
551 }
552
553 async fn rebalance_stop(&self) -> Result<()> {
554 self.request_no_response(Method::POST, "/rebalance/stop", None, None)
555 .await
556 }
557
558 async fn list_users(&self) -> Result<Vec<User>> {
561 let response: UserListResponse =
562 self.request(Method::GET, "/list-users", None, None).await?;
563
564 Ok(response
565 .0
566 .into_iter()
567 .map(|(access_key, info)| User {
568 access_key,
569 secret_key: None,
570 status: if info.status == "disabled" {
571 UserStatus::Disabled
572 } else {
573 UserStatus::Enabled
574 },
575 policy_name: info.policy_name,
576 member_of: info.member_of.unwrap_or_default(),
577 })
578 .collect())
579 }
580
581 async fn get_user(&self, access_key: &str) -> Result<User> {
582 let query = [("accessKey", access_key)];
583 let response: UserInfo = self
584 .request(Method::GET, "/user-info", Some(&query), None)
585 .await?;
586
587 Ok(User {
588 access_key: access_key.to_string(),
589 secret_key: None,
590 status: if response.status == "disabled" {
591 UserStatus::Disabled
592 } else {
593 UserStatus::Enabled
594 },
595 policy_name: response.policy_name,
596 member_of: response.member_of.unwrap_or_default(),
597 })
598 }
599
600 async fn create_user(&self, access_key: &str, secret_key: &str) -> Result<User> {
601 let query = [("accessKey", access_key)];
602 let body = serde_json::to_vec(&CreateUserRequest {
603 secret_key: secret_key.to_string(),
604 status: "enabled".to_string(),
605 })
606 .map_err(Error::Json)?;
607
608 self.request_no_response(Method::PUT, "/add-user", Some(&query), Some(&body))
609 .await?;
610
611 Ok(User {
612 access_key: access_key.to_string(),
613 secret_key: Some(secret_key.to_string()),
614 status: UserStatus::Enabled,
615 policy_name: None,
616 member_of: vec![],
617 })
618 }
619
620 async fn delete_user(&self, access_key: &str) -> Result<()> {
621 let query = [("accessKey", access_key)];
622 self.request_no_response(Method::DELETE, "/remove-user", Some(&query), None)
623 .await
624 }
625
626 async fn set_user_status(&self, access_key: &str, status: UserStatus) -> Result<()> {
627 let status_str = match status {
628 UserStatus::Enabled => "enabled",
629 UserStatus::Disabled => "disabled",
630 };
631 let query = [("accessKey", access_key), ("status", status_str)];
632 self.request_no_response(Method::PUT, "/set-user-status", Some(&query), None)
633 .await
634 }
635
636 async fn list_policies(&self) -> Result<Vec<PolicyInfo>> {
639 let response: PolicyListResponse = self
640 .request(Method::GET, "/list-canned-policies", None, None)
641 .await?;
642
643 Ok(response
644 .0
645 .into_keys()
646 .map(|name| PolicyInfo { name })
647 .collect())
648 }
649
650 async fn get_policy(&self, name: &str) -> Result<Policy> {
651 let query = [("name", name)];
652 let policy_doc: serde_json::Value = self
653 .request(Method::GET, "/info-canned-policy", Some(&query), None)
654 .await?;
655
656 Ok(Policy {
657 name: name.to_string(),
658 policy: serde_json::to_string_pretty(&policy_doc).unwrap_or_default(),
659 })
660 }
661
662 async fn create_policy(&self, name: &str, policy_document: &str) -> Result<()> {
663 let query = [("name", name)];
664 let body = policy_document.as_bytes();
665 self.request_no_response(Method::PUT, "/add-canned-policy", Some(&query), Some(body))
666 .await
667 }
668
669 async fn delete_policy(&self, name: &str) -> Result<()> {
670 let query = [("name", name)];
671 self.request_no_response(Method::DELETE, "/remove-canned-policy", Some(&query), None)
672 .await
673 }
674
675 async fn attach_policy(
676 &self,
677 policy_names: &[String],
678 entity_type: PolicyEntity,
679 entity_name: &str,
680 ) -> Result<()> {
681 let policy_name = policy_names.join(",");
682 let is_group = entity_type == PolicyEntity::Group;
683
684 let query = [
685 ("policyName", policy_name.as_str()),
686 ("userOrGroup", entity_name),
687 ("isGroup", if is_group { "true" } else { "false" }),
688 ];
689
690 self.request_no_response(Method::PUT, "/set-user-or-group-policy", Some(&query), None)
691 .await
692 }
693
694 async fn detach_policy(
695 &self,
696 policy_names: &[String],
697 entity_type: PolicyEntity,
698 entity_name: &str,
699 ) -> Result<()> {
700 let _ = (policy_names, entity_type, entity_name);
704 Err(Error::UnsupportedFeature(
705 "Policy detach not directly supported. Use attach with remaining policies instead."
706 .to_string(),
707 ))
708 }
709
710 async fn list_groups(&self) -> Result<Vec<String>> {
713 let response: Vec<String> = self.request(Method::GET, "/groups", None, None).await?;
714 Ok(response)
715 }
716
717 async fn get_group(&self, name: &str) -> Result<Group> {
718 let query = [("group", name)];
719 let response: Group = self
720 .request(Method::GET, "/group", Some(&query), None)
721 .await?;
722 Ok(response)
723 }
724
725 async fn create_group(&self, name: &str, members: Option<&[String]>) -> Result<Group> {
726 let body = serde_json::to_vec(&CreateGroupRequest {
727 group: name.to_string(),
728 members: members.map(|m| m.to_vec()),
729 })
730 .map_err(Error::Json)?;
731
732 self.request_no_response(Method::POST, "/groups", None, Some(&body))
733 .await?;
734
735 Ok(Group {
736 name: name.to_string(),
737 policy: None,
738 members: members.map(|m| m.to_vec()).unwrap_or_default(),
739 status: GroupStatus::Enabled,
740 })
741 }
742
743 async fn delete_group(&self, name: &str) -> Result<()> {
744 let path = format!("/group/{}", urlencoding::encode(name));
745 self.request_no_response(Method::DELETE, &path, None, None)
746 .await
747 }
748
749 async fn set_group_status(&self, name: &str, status: GroupStatus) -> Result<()> {
750 let status_str = match status {
751 GroupStatus::Enabled => "enabled",
752 GroupStatus::Disabled => "disabled",
753 };
754 let query = [("group", name), ("status", status_str)];
755 self.request_no_response(Method::PUT, "/set-group-status", Some(&query), None)
756 .await
757 }
758
759 async fn add_group_members(&self, group: &str, members: &[String]) -> Result<()> {
760 let body = serde_json::to_vec(&UpdateGroupMembersRequest {
761 group: group.to_string(),
762 members: members.to_vec(),
763 is_remove: false,
764 status: "enabled".to_string(),
765 })
766 .map_err(Error::Json)?;
767
768 self.request_no_response(Method::PUT, "/update-group-members", None, Some(&body))
769 .await
770 }
771
772 async fn remove_group_members(&self, group: &str, members: &[String]) -> Result<()> {
773 let body = serde_json::to_vec(&UpdateGroupMembersRequest {
774 group: group.to_string(),
775 members: members.to_vec(),
776 is_remove: true,
777 status: "enabled".to_string(),
778 })
779 .map_err(Error::Json)?;
780
781 self.request_no_response(Method::PUT, "/update-group-members", None, Some(&body))
782 .await
783 }
784
785 async fn list_service_accounts(&self, user: Option<&str>) -> Result<Vec<ServiceAccount>> {
788 let query: Vec<(&str, &str)> = user.map(|u| vec![("user", u)]).unwrap_or_default();
789 let query_ref: Option<&[(&str, &str)]> = if query.is_empty() { None } else { Some(&query) };
790
791 let response: ServiceAccountListResponse = self
792 .request(Method::GET, "/list-service-accounts", query_ref, None)
793 .await?;
794
795 Ok(response
796 .accounts
797 .unwrap_or_default()
798 .into_iter()
799 .map(|sa| ServiceAccount {
800 access_key: sa.access_key,
801 secret_key: None,
802 parent_user: sa.parent_user,
803 policy: None,
804 account_status: sa.account_status,
805 expiration: sa.expiration,
806 name: sa.name,
807 description: sa.description,
808 implied_policy: sa.implied_policy,
809 })
810 .collect())
811 }
812
813 async fn get_service_account(&self, access_key: &str) -> Result<ServiceAccount> {
814 let query = [("accessKey", access_key)];
815 let response: ServiceAccount = self
816 .request(Method::GET, "/info-service-account", Some(&query), None)
817 .await?;
818
819 let mut response = response;
820 if response.access_key.is_empty() {
821 response.access_key = access_key.to_string();
822 }
823 Ok(response)
824 }
825
826 async fn create_service_account(
827 &self,
828 request: CreateServiceAccountRequest,
829 ) -> Result<ServiceAccount> {
830 let body = serde_json::to_vec(&request).map_err(Error::Json)?;
831 let response: ServiceAccountCreateResponse = self
832 .request(Method::PUT, "/add-service-accounts", None, Some(&body))
833 .await?;
834
835 Ok(ServiceAccount {
836 access_key: response.credentials.access_key,
837 secret_key: Some(response.credentials.secret_key),
838 expiration: response.credentials.expiration,
839 parent_user: None,
840 policy: None,
841 account_status: None,
842 name: None,
843 description: None,
844 implied_policy: None,
845 })
846 }
847
848 async fn delete_service_account(&self, access_key: &str) -> Result<()> {
849 let query = [("accessKey", access_key)];
850 self.request_no_response(
851 Method::DELETE,
852 "/delete-service-accounts",
853 Some(&query),
854 None,
855 )
856 .await
857 }
858
859 async fn set_bucket_quota(&self, bucket: &str, quota: u64) -> Result<BucketQuota> {
862 let path = format!("/quota/{}", urlencoding::encode(bucket));
863 let body = serde_json::to_vec(&SetBucketQuotaApiRequest {
864 quota,
865 quota_type: "HARD".to_string(),
866 })
867 .map_err(Error::Json)?;
868
869 self.request(Method::PUT, &path, None, Some(&body)).await
870 }
871
872 async fn get_bucket_quota(&self, bucket: &str) -> Result<BucketQuota> {
873 let path = format!("/quota/{}", urlencoding::encode(bucket));
874 self.request(Method::GET, &path, None, None).await
875 }
876
877 async fn clear_bucket_quota(&self, bucket: &str) -> Result<BucketQuota> {
878 let path = format!("/quota/{}", urlencoding::encode(bucket));
879 self.request(Method::DELETE, &path, None, None).await
880 }
881
882 async fn list_tiers(&self) -> Result<Vec<rc_core::admin::TierConfig>> {
885 self.request(Method::GET, "/tier", None, None).await
886 }
887
888 async fn tier_stats(&self) -> Result<serde_json::Value> {
889 self.request(Method::GET, "/tier-stats", None, None).await
890 }
891
892 async fn add_tier(&self, config: rc_core::admin::TierConfig) -> Result<()> {
893 let body = serde_json::to_vec(&config).map_err(Error::Json)?;
894 self.request_no_response(Method::PUT, "/tier", None, Some(&body))
895 .await
896 }
897
898 async fn edit_tier(&self, name: &str, creds: rc_core::admin::TierCreds) -> Result<()> {
899 let path = format!("/tier/{}", urlencoding::encode(name));
900 let body = serde_json::to_vec(&creds).map_err(Error::Json)?;
901 self.request_no_response(Method::POST, &path, None, Some(&body))
902 .await
903 }
904
905 async fn remove_tier(&self, name: &str, force: bool) -> Result<()> {
906 let path = format!("/tier/{}", urlencoding::encode(name));
907 if force {
908 let query: &[(&str, &str)] = &[("force", "true")];
909 self.request_no_response(Method::DELETE, &path, Some(query), None)
910 .await
911 } else {
912 self.request_no_response(Method::DELETE, &path, None, None)
913 .await
914 }
915 }
916
917 async fn set_remote_target(
920 &self,
921 bucket: &str,
922 target: rc_core::replication::BucketTarget,
923 update: bool,
924 ) -> Result<String> {
925 let body = serde_json::to_vec(&target).map_err(Error::Json)?;
926 if update {
927 let query: &[(&str, &str)] = &[("bucket", bucket), ("update", "true")];
928 self.request(Method::PUT, "/set-remote-target", Some(query), Some(&body))
929 .await
930 } else {
931 let query: &[(&str, &str)] = &[("bucket", bucket)];
932 self.request(Method::PUT, "/set-remote-target", Some(query), Some(&body))
933 .await
934 }
935 }
936
937 async fn list_remote_targets(
938 &self,
939 bucket: &str,
940 ) -> Result<Vec<rc_core::replication::BucketTarget>> {
941 let query: &[(&str, &str)] = &[("bucket", bucket)];
942 self.request(Method::GET, "/list-remote-targets", Some(query), None)
943 .await
944 }
945
946 async fn remove_remote_target(&self, bucket: &str, arn: &str) -> Result<()> {
947 let query: &[(&str, &str)] = &[("bucket", bucket), ("arn", arn)];
948 self.request_no_response(Method::DELETE, "/remove-remote-target", Some(query), None)
949 .await
950 }
951
952 async fn replication_metrics(&self, bucket: &str) -> Result<serde_json::Value> {
953 let query: &[(&str, &str)] = &[("bucket", bucket)];
954 self.request(Method::GET, "/replicationmetrics", Some(query), None)
955 .await
956 }
957}
958
959#[cfg(test)]
960mod tests {
961 use super::*;
962 use std::io::{Read, Write};
963 use std::net::{TcpListener, TcpStream};
964 use std::sync::mpsc;
965 use std::thread;
966 use std::time::Duration;
967 use tempfile::tempdir;
968
969 #[derive(Debug)]
970 struct CapturedAdminRequest {
971 method: String,
972 target: String,
973 headers: String,
974 body: Vec<u8>,
975 }
976
977 fn read_admin_request(stream: &mut TcpStream) -> CapturedAdminRequest {
978 let mut buffer = Vec::new();
979 let mut chunk = [0_u8; 1024];
980 let header_end = loop {
981 let read = stream.read(&mut chunk).expect("read HTTP request");
982 assert!(read > 0, "client closed connection before headers");
983 buffer.extend_from_slice(&chunk[..read]);
984
985 if let Some(position) = buffer.windows(4).position(|window| window == b"\r\n\r\n") {
986 break position + 4;
987 }
988 };
989
990 let headers = String::from_utf8_lossy(&buffer[..header_end]).into_owned();
991 let content_length = headers
992 .lines()
993 .find_map(|line| {
994 let (name, value) = line.split_once(':')?;
995 name.eq_ignore_ascii_case("content-length")
996 .then(|| value.trim().parse::<usize>().expect("valid content length"))
997 })
998 .unwrap_or(0);
999
1000 while buffer.len() - header_end < content_length {
1001 let read = stream.read(&mut chunk).expect("read HTTP request body");
1002 assert!(read > 0, "client closed connection before body");
1003 buffer.extend_from_slice(&chunk[..read]);
1004 }
1005
1006 let request_line = headers.lines().next().expect("request line");
1007 let mut parts = request_line.split_whitespace();
1008 let method = parts.next().expect("request method").to_string();
1009 let target = parts.next().expect("request target").to_string();
1010 let body = buffer[header_end..header_end + content_length].to_vec();
1011
1012 CapturedAdminRequest {
1013 method,
1014 target,
1015 headers,
1016 body,
1017 }
1018 }
1019
1020 fn start_admin_test_server(
1021 response_status: &str,
1022 response_body: &'static str,
1023 ) -> (
1024 String,
1025 mpsc::Receiver<CapturedAdminRequest>,
1026 thread::JoinHandle<()>,
1027 ) {
1028 let listener = TcpListener::bind("127.0.0.1:0").expect("bind test server");
1029 let endpoint = format!("http://{}", listener.local_addr().expect("local addr"));
1030 let (sender, receiver) = mpsc::channel();
1031 let response_status = response_status.to_string();
1032
1033 let handle = thread::spawn(move || {
1034 let (mut stream, _) = listener.accept().expect("accept request");
1035 let request = read_admin_request(&mut stream);
1036 sender.send(request).expect("send captured request");
1037
1038 let response = format!(
1039 "HTTP/1.1 {response_status}\r\ncontent-length: {}\r\ncontent-type: application/json\r\nconnection: close\r\n\r\n{response_body}",
1040 response_body.len()
1041 );
1042 stream
1043 .write_all(response.as_bytes())
1044 .expect("write HTTP response");
1045 });
1046
1047 (endpoint, receiver, handle)
1048 }
1049
1050 fn admin_client_for_endpoint(endpoint: &str) -> AdminClient {
1051 let alias = Alias::new("test", endpoint, "access", "secret");
1052 AdminClient::new(&alias).expect("admin client should build")
1053 }
1054
1055 fn anonymous_admin_client_for_endpoint(endpoint: &str) -> AdminClient {
1056 let mut alias = Alias::new("test", endpoint, "", "");
1057 alias.anonymous = true;
1058 AdminClient::new(&alias).expect("anonymous admin client should build")
1059 }
1060
1061 fn assert_heal_options_body(
1062 body: &[u8],
1063 scan_mode: u8,
1064 remove: bool,
1065 recreate: bool,
1066 dry_run: bool,
1067 ) {
1068 let value: serde_json::Value =
1069 serde_json::from_slice(body).expect("heal request body should be JSON");
1070
1071 assert_eq!(value["recursive"], false);
1072 assert_eq!(value["dryRun"], dry_run);
1073 assert_eq!(value["remove"], remove);
1074 assert_eq!(value["recreate"], recreate);
1075 assert_eq!(value["scanMode"], scan_mode);
1076 assert_eq!(value["updateParity"], false);
1077 assert_eq!(value["nolock"], false);
1078 assert!(value.get("bucket").is_none());
1079 assert!(value.get("prefix").is_none());
1080 }
1081
1082 #[test]
1083 fn test_admin_url_construction() {
1084 let alias = Alias::new("test", "http://localhost:9000", "access", "secret");
1085 let client = AdminClient::new(&alias).unwrap();
1086
1087 assert_eq!(
1088 client.admin_url("/list-users"),
1089 "http://localhost:9000/rustfs/admin/v3/list-users"
1090 );
1091 }
1092
1093 #[test]
1094 fn test_admin_url_with_trailing_slash() {
1095 let alias = Alias::new("test", "http://localhost:9000/", "access", "secret");
1096 let client = AdminClient::new(&alias).unwrap();
1097
1098 assert_eq!(
1099 client.admin_url("/list-users"),
1100 "http://localhost:9000/rustfs/admin/v3/list-users"
1101 );
1102 }
1103
1104 #[test]
1105 fn test_get_host() {
1106 let alias = Alias::new("test", "https://s3.example.com", "access", "secret");
1107 let client = AdminClient::new(&alias).unwrap();
1108
1109 assert_eq!(client.get_host(), "s3.example.com");
1110 }
1111
1112 #[test]
1113 fn test_sha256_hash() {
1114 let hash = AdminClient::sha256_hash(b"test");
1115 assert_eq!(
1116 hash,
1117 "9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08"
1118 );
1119 }
1120
1121 #[test]
1122 fn test_sha256_hash_empty() {
1123 let hash = AdminClient::sha256_hash(b"");
1124 assert_eq!(
1125 hash,
1126 "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
1127 );
1128 }
1129
1130 #[test]
1131 fn test_rustfs_heal_path_matches_admin_routes() {
1132 assert_eq!(
1133 rustfs_heal_path(&HealStartRequest::default()).expect("root path"),
1134 "/heal/"
1135 );
1136
1137 let bucket_request = HealStartRequest {
1138 bucket: Some("photos".to_string()),
1139 ..Default::default()
1140 };
1141 assert_eq!(
1142 rustfs_heal_path(&bucket_request).expect("bucket path"),
1143 "/heal/photos"
1144 );
1145
1146 let prefix_request = HealStartRequest {
1147 bucket: Some("photos".to_string()),
1148 prefix: Some("2026/raw".to_string()),
1149 ..Default::default()
1150 };
1151 assert_eq!(
1152 rustfs_heal_path(&prefix_request).expect("prefix path"),
1153 "/heal/photos/2026%2Fraw"
1154 );
1155
1156 let invalid_request = HealStartRequest {
1157 prefix: Some("2026/raw".to_string()),
1158 ..Default::default()
1159 };
1160 assert!(matches!(
1161 rustfs_heal_path(&invalid_request),
1162 Err(Error::InvalidPath(_))
1163 ));
1164 }
1165
1166 #[test]
1167 fn test_rustfs_heal_body_matches_server_heal_options() {
1168 let request = HealStartRequest {
1169 scan_mode: HealScanMode::Deep,
1170 remove: true,
1171 recreate: true,
1172 dry_run: true,
1173 ..Default::default()
1174 };
1175
1176 let body = rustfs_heal_body(&request).expect("heal options should serialize");
1177 let value: serde_json::Value =
1178 serde_json::from_slice(&body).expect("heal options body should be JSON");
1179
1180 assert_eq!(value["recursive"], false);
1181 assert_eq!(value["dryRun"], true);
1182 assert_eq!(value["remove"], true);
1183 assert_eq!(value["recreate"], true);
1184 assert_eq!(value["scanMode"], 2);
1185 assert_eq!(value["updateParity"], false);
1186 assert_eq!(value["nolock"], false);
1187 assert!(value.get("bucket").is_none());
1188 assert!(value.get("prefix").is_none());
1189 }
1190
1191 #[test]
1192 fn test_background_heal_status_response_maps_to_heal_status() {
1193 let status = HealStatus::from(BackgroundHealStatusResponse {
1194 bitrot_start_time: Some("2026-04-19T10:00:00Z".to_string()),
1195 });
1196
1197 assert!(status.healing);
1198 assert_eq!(status.started.as_deref(), Some("2026-04-19T10:00:00Z"));
1199
1200 let idle = HealStatus::from(BackgroundHealStatusResponse {
1201 bitrot_start_time: None,
1202 });
1203 assert!(!idle.healing);
1204 assert!(idle.started.is_none());
1205 }
1206
1207 #[test]
1208 fn test_bad_request_maps_to_general_admin_error() {
1209 let alias = Alias::new("test", "http://localhost:9000", "access", "secret");
1210 let client = AdminClient::new(&alias).expect("admin client should build");
1211
1212 let error = client.map_error(StatusCode::BAD_REQUEST, "err request body parse");
1213 assert!(matches!(error, Error::General(_)));
1214 assert_eq!(error.to_string(), "Bad request: err request body parse");
1215 }
1216
1217 #[tokio::test]
1218 async fn test_heal_status_uses_background_heal_status_endpoint() {
1219 let (endpoint, receiver, handle) =
1220 start_admin_test_server("200 OK", r#"{"bitrotStartTime":"2026-04-19T10:00:00Z"}"#);
1221 let client = admin_client_for_endpoint(&endpoint);
1222
1223 let status = client.heal_status().await.expect("heal status request");
1224
1225 assert!(status.healing);
1226 assert_eq!(status.started.as_deref(), Some("2026-04-19T10:00:00Z"));
1227
1228 let request = receiver.recv().expect("captured request");
1229 assert_eq!(request.method, "POST");
1230 assert_eq!(request.target, "/rustfs/admin/v3/background-heal/status");
1231 assert!(request.body.is_empty());
1232 handle.join().expect("server thread should finish");
1233 }
1234
1235 #[tokio::test]
1236 async fn test_anonymous_admin_requests_skip_authorization_header() {
1237 let (endpoint, receiver, handle) =
1238 start_admin_test_server("200 OK", r#"{"bitrotStartTime":"2026-04-19T10:00:00Z"}"#);
1239 let client = anonymous_admin_client_for_endpoint(&endpoint);
1240
1241 client
1242 .heal_status()
1243 .await
1244 .expect("anonymous heal status request");
1245
1246 let request = receiver
1247 .recv_timeout(Duration::from_secs(5))
1248 .expect("captured request");
1249 assert_eq!(request.method, "POST");
1250 assert_eq!(request.target, "/rustfs/admin/v3/background-heal/status");
1251 assert!(
1252 !request
1253 .headers
1254 .lines()
1255 .any(|line| line.to_ascii_lowercase().starts_with("authorization:"))
1256 );
1257 handle.join().expect("server thread should finish");
1258 }
1259
1260 #[tokio::test]
1261 async fn test_anonymous_admin_no_response_requests_skip_authorization_header() {
1262 let (endpoint, receiver, handle) = start_admin_test_server("200 OK", "");
1263 let client = anonymous_admin_client_for_endpoint(&endpoint);
1264 let request = HealStartRequest {
1265 bucket: Some("raw photos".to_string()),
1266 prefix: Some("2026/april".to_string()),
1267 scan_mode: HealScanMode::Deep,
1268 remove: true,
1269 recreate: true,
1270 dry_run: true,
1271 };
1272
1273 client
1274 .heal_start(request)
1275 .await
1276 .expect("anonymous heal start request");
1277
1278 let request = receiver
1279 .recv_timeout(Duration::from_secs(5))
1280 .expect("captured request");
1281 assert_eq!(request.method, "POST");
1282 assert_eq!(
1283 request.target,
1284 "/rustfs/admin/v3/heal/raw%20photos/2026%2Fapril"
1285 );
1286 assert_heal_options_body(&request.body, 2, true, true, true);
1287 assert!(
1288 !request
1289 .headers
1290 .lines()
1291 .any(|line| line.to_ascii_lowercase().starts_with("authorization:"))
1292 );
1293 handle.join().expect("server thread should finish");
1294 }
1295
1296 #[tokio::test]
1297 async fn test_heal_start_posts_to_bucket_prefix_route_with_options_body() {
1298 let (endpoint, receiver, handle) = start_admin_test_server("200 OK", "");
1299 let client = admin_client_for_endpoint(&endpoint);
1300 let request = HealStartRequest {
1301 bucket: Some("raw photos".to_string()),
1302 prefix: Some("2026/april".to_string()),
1303 scan_mode: HealScanMode::Deep,
1304 remove: true,
1305 recreate: true,
1306 dry_run: true,
1307 };
1308
1309 let status = client
1310 .heal_start(request)
1311 .await
1312 .expect("heal start request");
1313
1314 assert!(!status.healing);
1315 assert!(status.heal_id.is_empty());
1316 assert!(status.started.is_none());
1317
1318 let request = receiver.recv().expect("captured request");
1319 assert_eq!(request.method, "POST");
1320 assert_eq!(
1321 request.target,
1322 "/rustfs/admin/v3/heal/raw%20photos/2026%2Fapril"
1323 );
1324 assert_heal_options_body(&request.body, 2, true, true, true);
1325 handle.join().expect("server thread should finish");
1326 }
1327
1328 #[tokio::test]
1329 async fn test_heal_stop_posts_force_stop_to_root_heal_route() {
1330 let (endpoint, receiver, handle) = start_admin_test_server("200 OK", "");
1331 let client = admin_client_for_endpoint(&endpoint);
1332
1333 client.heal_stop().await.expect("heal stop request");
1334
1335 let request = receiver.recv().expect("captured request");
1336 assert_eq!(request.method, "POST");
1337 assert_eq!(request.target, "/rustfs/admin/v3/heal/?forceStop=true");
1338 assert_heal_options_body(&request.body, 1, false, false, false);
1339 handle.join().expect("server thread should finish");
1340 }
1341
1342 #[tokio::test]
1343 async fn test_pool_status_uses_pool_status_route_with_by_id_query() {
1344 let (endpoint, receiver, handle) = start_admin_test_server(
1345 "200 OK",
1346 r#"{"id":1,"cmdline":"/data/pool1/disk{1...4}","lastUpdate":"2026-05-06T00:00:00Z","decommissionInfo":null}"#,
1347 );
1348 let client = admin_client_for_endpoint(&endpoint);
1349
1350 let status = client
1351 .pool_status(PoolTarget {
1352 pool: "1".to_string(),
1353 by_id: true,
1354 })
1355 .await
1356 .expect("pool status request");
1357
1358 assert_eq!(status.id, 1);
1359 assert_eq!(status.cmd_line, "/data/pool1/disk{1...4}");
1360
1361 let request = receiver.recv().expect("captured request");
1362 assert_eq!(request.method, "GET");
1363 assert_eq!(
1364 request.target,
1365 "/rustfs/admin/v3/pools/status?pool=1&by-id=true"
1366 );
1367 assert!(request.body.is_empty());
1368 handle.join().expect("server thread should finish");
1369 }
1370
1371 #[tokio::test]
1372 async fn test_pool_status_uses_command_line_query_without_by_id() {
1373 let (endpoint, receiver, handle) = start_admin_test_server(
1374 "200 OK",
1375 r#"{"id":2,"cmdline":"/data/pool2/disk{1...4}","lastUpdate":"2026-05-06T00:00:00Z","decommissionInfo":null}"#,
1376 );
1377 let client = admin_client_for_endpoint(&endpoint);
1378
1379 let status = client
1380 .pool_status(PoolTarget {
1381 pool: "/data/pool2/disk{1...4}".to_string(),
1382 by_id: false,
1383 })
1384 .await
1385 .expect("pool status request");
1386
1387 assert_eq!(status.id, 2);
1388 assert_eq!(status.cmd_line, "/data/pool2/disk{1...4}");
1389
1390 let request = receiver.recv().expect("captured request");
1391 assert_eq!(request.method, "GET");
1392 assert_eq!(
1393 request.target,
1394 "/rustfs/admin/v3/pools/status?pool=%2Fdata%2Fpool2%2Fdisk%7B1...4%7D"
1395 );
1396 assert!(request.body.is_empty());
1397 handle.join().expect("server thread should finish");
1398 }
1399
1400 #[tokio::test]
1401 async fn test_list_pools_uses_pool_list_route() {
1402 let (endpoint, receiver, handle) = start_admin_test_server(
1403 "200 OK",
1404 r#"[{"id":0,"cmdline":"/data/pool0/disk{1...4}","lastUpdate":"2026-05-06T00:00:00Z","decommissionInfo":null}]"#,
1405 );
1406 let client = admin_client_for_endpoint(&endpoint);
1407
1408 let pools = client.list_pools().await.expect("list pools request");
1409
1410 assert_eq!(pools.len(), 1);
1411 assert_eq!(pools[0].id, 0);
1412 assert_eq!(pools[0].cmd_line, "/data/pool0/disk{1...4}");
1413
1414 let request = receiver.recv().expect("captured request");
1415 assert_eq!(request.method, "GET");
1416 assert_eq!(request.target, "/rustfs/admin/v3/pools/list");
1417 assert!(request.body.is_empty());
1418 handle.join().expect("server thread should finish");
1419 }
1420
1421 #[tokio::test]
1422 async fn test_decommission_start_posts_pool_query() {
1423 let (endpoint, receiver, handle) = start_admin_test_server("200 OK", "");
1424 let client = admin_client_for_endpoint(&endpoint);
1425
1426 client
1427 .decommission_start(PoolTarget {
1428 pool: "/data/pool1/disk{1...4}".to_string(),
1429 by_id: false,
1430 })
1431 .await
1432 .expect("decommission start request");
1433
1434 let request = receiver.recv().expect("captured request");
1435 assert_eq!(request.method, "POST");
1436 assert_eq!(
1437 request.target,
1438 "/rustfs/admin/v3/pools/decommission?pool=%2Fdata%2Fpool1%2Fdisk%7B1...4%7D"
1439 );
1440 assert!(request.body.is_empty());
1441 handle.join().expect("server thread should finish");
1442 }
1443
1444 #[tokio::test]
1445 async fn test_decommission_start_posts_by_id_multi_pool_query() {
1446 let (endpoint, receiver, handle) = start_admin_test_server("200 OK", "");
1447 let client = admin_client_for_endpoint(&endpoint);
1448
1449 client
1450 .decommission_start(PoolTarget {
1451 pool: "1,2".to_string(),
1452 by_id: true,
1453 })
1454 .await
1455 .expect("decommission start request");
1456
1457 let request = receiver.recv().expect("captured request");
1458 assert_eq!(request.method, "POST");
1459 assert_eq!(
1460 request.target,
1461 "/rustfs/admin/v3/pools/decommission?pool=1%2C2&by-id=true"
1462 );
1463 assert!(request.body.is_empty());
1464 handle.join().expect("server thread should finish");
1465 }
1466
1467 #[tokio::test]
1468 async fn test_decommission_cancel_posts_pool_cancel_route_with_by_id_query() {
1469 let (endpoint, receiver, handle) = start_admin_test_server("200 OK", "");
1470 let client = admin_client_for_endpoint(&endpoint);
1471
1472 client
1473 .decommission_cancel(PoolTarget {
1474 pool: "1".to_string(),
1475 by_id: true,
1476 })
1477 .await
1478 .expect("decommission cancel request");
1479
1480 let request = receiver.recv().expect("captured request");
1481 assert_eq!(request.method, "POST");
1482 assert_eq!(
1483 request.target,
1484 "/rustfs/admin/v3/pools/cancel?pool=1&by-id=true"
1485 );
1486 assert!(request.body.is_empty());
1487 handle.join().expect("server thread should finish");
1488 }
1489
1490 #[tokio::test]
1491 async fn test_rebalance_start_posts_rebalance_start_route() {
1492 let (endpoint, receiver, handle) =
1493 start_admin_test_server("200 OK", r#"{"id":"rebalance-123"}"#);
1494 let client = admin_client_for_endpoint(&endpoint);
1495
1496 let result = client
1497 .rebalance_start()
1498 .await
1499 .expect("rebalance start request");
1500
1501 assert_eq!(result.id, "rebalance-123");
1502
1503 let request = receiver.recv().expect("captured request");
1504 assert_eq!(request.method, "POST");
1505 assert_eq!(request.target, "/rustfs/admin/v3/rebalance/start");
1506 assert!(request.body.is_empty());
1507 handle.join().expect("server thread should finish");
1508 }
1509
1510 #[tokio::test]
1511 async fn test_rebalance_status_gets_rebalance_status_route() {
1512 let (endpoint, receiver, handle) = start_admin_test_server(
1513 "200 OK",
1514 r#"{"id":"rebalance-123","pools":[],"stoppedAt":"2026-05-06T00:00:00Z"}"#,
1515 );
1516 let client = admin_client_for_endpoint(&endpoint);
1517
1518 let status = client
1519 .rebalance_status()
1520 .await
1521 .expect("rebalance status request");
1522
1523 assert_eq!(status.id, "rebalance-123");
1524 assert_eq!(status.stopped_at.as_deref(), Some("2026-05-06T00:00:00Z"));
1525
1526 let request = receiver.recv().expect("captured request");
1527 assert_eq!(request.method, "GET");
1528 assert_eq!(request.target, "/rustfs/admin/v3/rebalance/status");
1529 assert!(request.body.is_empty());
1530 handle.join().expect("server thread should finish");
1531 }
1532
1533 #[tokio::test]
1534 async fn test_rebalance_stop_posts_rebalance_stop_route() {
1535 let (endpoint, receiver, handle) = start_admin_test_server("200 OK", "");
1536 let client = admin_client_for_endpoint(&endpoint);
1537
1538 client
1539 .rebalance_stop()
1540 .await
1541 .expect("rebalance stop request");
1542
1543 let request = receiver.recv().expect("captured request");
1544 assert_eq!(request.method, "POST");
1545 assert_eq!(request.target, "/rustfs/admin/v3/rebalance/stop");
1546 assert!(request.body.is_empty());
1547 handle.join().expect("server thread should finish");
1548 }
1549
1550 #[test]
1551 fn test_admin_client_invalid_ca_bundle_path_surfaces_error() {
1552 let mut alias = Alias::new("test", "https://localhost:9000", "access", "secret");
1553 alias.ca_bundle = Some("/definitely-not-here/ca.pem".to_string());
1554
1555 let result = AdminClient::new(&alias);
1556 match result {
1557 Err(Error::Network(msg)) => {
1558 assert!(
1559 msg.contains("Failed to read CA bundle"),
1560 "Unexpected error message: {msg}"
1561 );
1562 }
1563 Ok(_) => panic!("Expected Error::Network for invalid path, got Ok(_)"),
1564 Err(e) => panic!("Expected Error::Network for invalid path, got Err({e})"),
1565 }
1566 }
1567
1568 #[test]
1569 fn test_admin_client_invalid_ca_bundle_pem_surfaces_error() {
1570 let temp_dir = tempdir().expect("create temp dir");
1571 let bad_pem_path = temp_dir.path().join("bad-ca.pem");
1572 std::fs::write(
1573 &bad_pem_path,
1574 b"-----BEGIN CERTIFICATE-----\ninvalid-base64\n-----END CERTIFICATE-----\n",
1575 )
1576 .expect("write invalid PEM");
1577
1578 let mut alias = Alias::new("test", "https://localhost:9000", "access", "secret");
1579 alias.ca_bundle = Some(bad_pem_path.display().to_string());
1580
1581 let result = AdminClient::new(&alias);
1582 match result {
1583 Err(Error::Network(msg)) => {
1584 assert!(
1585 msg.contains("Invalid CA bundle") && msg.contains("bad-ca.pem"),
1586 "Unexpected error message for invalid PEM CA bundle: {msg}"
1587 );
1588 }
1589 Ok(_) => panic!("Expected Error::Network for invalid PEM, got Ok(_)"),
1590 Err(e) => panic!("Expected Error::Network for invalid PEM, got Err({e})"),
1591 }
1592 }
1593}