1use async_trait::async_trait;
7use aws_credential_types::Credentials;
8use aws_sigv4::http_request::{
9 SignableBody, SignableRequest, SignatureLocation, SigningSettings, sign,
10};
11use aws_sigv4::sign::v4;
12use rc_core::admin::{
13 AdminApi, BucketQuota, ClusterInfo, CreateServiceAccountRequest, Group, GroupStatus,
14 HealScanMode, HealStartRequest, HealStatus, Policy, PolicyEntity, PolicyInfo, PoolStatus,
15 PoolTarget, RebalanceStartResult, RebalanceStatus, ServiceAccount,
16 ServiceAccountCreateResponse, UpdateGroupMembersRequest, User, UserStatus,
17};
18use rc_core::{Alias, Error, Result};
19use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue};
20use reqwest::{Client, Method, StatusCode};
21use serde::{Deserialize, Serialize};
22use sha2::{Digest, Sha256};
23use std::collections::HashMap;
24use std::time::SystemTime;
25
26pub struct AdminClient {
28 http_client: Client,
29 endpoint: String,
30 access_key: String,
31 secret_key: String,
32 region: String,
33}
34
35impl AdminClient {
36 pub fn new(alias: &Alias) -> Result<Self> {
38 let mut builder = Client::builder()
39 .danger_accept_invalid_certs(alias.insecure)
40 .tls_built_in_native_certs(true)
41 .tls_built_in_webpki_certs(true);
42
43 if let Some(bundle_path) = alias.ca_bundle.as_deref() {
44 let pem = std::fs::read(bundle_path).map_err(|e| {
45 Error::Network(format!("Failed to read CA bundle '{bundle_path}': {e}"))
46 })?;
47 let certs = reqwest::Certificate::from_pem_bundle(&pem)
48 .map_err(|e| Error::Network(format!("Invalid CA bundle '{bundle_path}': {e}")))?;
49 if certs.is_empty() {
50 return Err(Error::Network(format!(
51 "Invalid CA bundle '{bundle_path}': no certificates found"
52 )));
53 }
54 for cert in certs {
55 builder = builder.add_root_certificate(cert);
56 }
57 }
58
59 let http_client = builder
60 .build()
61 .map_err(|e| Error::Network(format!("Failed to create HTTP client: {e}")))?;
62
63 Ok(Self {
64 http_client,
65 endpoint: alias.endpoint.trim_end_matches('/').to_string(),
66 access_key: alias.access_key.clone(),
67 secret_key: alias.secret_key.clone(),
68 region: alias.region.clone(),
69 })
70 }
71
72 fn admin_url(&self, path: &str) -> String {
74 format!("{}/rustfs/admin/v3{}", self.endpoint, path)
75 }
76
77 fn sha256_hash(body: &[u8]) -> String {
79 let mut hasher = Sha256::new();
80 hasher.update(body);
81 hex::encode(hasher.finalize())
82 }
83
84 async fn sign_request(
86 &self,
87 method: &Method,
88 url: &str,
89 headers: &HeaderMap,
90 body: &[u8],
91 ) -> Result<HeaderMap> {
92 let credentials = Credentials::new(
93 &self.access_key,
94 &self.secret_key,
95 None,
96 None,
97 "admin-client",
98 );
99
100 let identity = credentials.into();
101 let mut signing_settings = SigningSettings::default();
102 signing_settings.signature_location = SignatureLocation::Headers;
103
104 let signing_params = v4::SigningParams::builder()
105 .identity(&identity)
106 .region(&self.region)
107 .name("s3")
108 .time(SystemTime::now())
109 .settings(signing_settings)
110 .build()
111 .map_err(|e| Error::Auth(format!("Failed to build signing params: {e}")))?;
112
113 let header_pairs: Vec<(&str, &str)> = headers
115 .iter()
116 .filter_map(|(k, v)| v.to_str().ok().map(|v| (k.as_str(), v)))
117 .collect();
118
119 let signable_body = SignableBody::Bytes(body);
120
121 let signable_request = SignableRequest::new(
122 method.as_str(),
123 url,
124 header_pairs.into_iter(),
125 signable_body,
126 )
127 .map_err(|e| Error::Auth(format!("Failed to create signable request: {e}")))?;
128
129 let (signing_instructions, _signature) = sign(signable_request, &signing_params.into())
130 .map_err(|e| Error::Auth(format!("Failed to sign request: {e}")))?
131 .into_parts();
132
133 let mut signed_headers = headers.clone();
135 for (name, value) in signing_instructions.headers() {
136 let header_name = HeaderName::try_from(&name.to_string())
137 .map_err(|e| Error::Auth(format!("Invalid header name: {e}")))?;
138 let header_value = HeaderValue::try_from(&value.to_string())
139 .map_err(|e| Error::Auth(format!("Invalid header value: {e}")))?;
140 signed_headers.insert(header_name, header_value);
141 }
142
143 Ok(signed_headers)
144 }
145
146 async fn request<T: for<'de> Deserialize<'de>>(
148 &self,
149 method: Method,
150 path: &str,
151 query: Option<&[(&str, &str)]>,
152 body: Option<&[u8]>,
153 ) -> Result<T> {
154 let mut url = self.admin_url(path);
155
156 if let Some(q) = query {
157 let query_string: String = q
158 .iter()
159 .map(|(k, v)| format!("{}={}", urlencoding::encode(k), urlencoding::encode(v)))
160 .collect::<Vec<_>>()
161 .join("&");
162 if !query_string.is_empty() {
163 url.push('?');
164 url.push_str(&query_string);
165 }
166 }
167
168 let body_bytes = body.unwrap_or(&[]);
169 let content_hash = Self::sha256_hash(body_bytes);
170
171 let mut headers = HeaderMap::new();
172 headers.insert("x-amz-content-sha256", content_hash.parse().unwrap());
173 headers.insert("host", self.get_host().parse().unwrap());
174
175 if !body_bytes.is_empty() {
176 headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
177 }
178
179 let signed_headers = self
180 .sign_request(&method, &url, &headers, body_bytes)
181 .await?;
182
183 let mut request_builder = self.http_client.request(method.clone(), &url);
184
185 for (name, value) in signed_headers.iter() {
186 request_builder = request_builder.header(name, value);
187 }
188
189 if !body_bytes.is_empty() {
190 request_builder = request_builder.body(body_bytes.to_vec());
191 }
192
193 let response = request_builder
194 .send()
195 .await
196 .map_err(|e| Error::Network(format!("Request failed: {e}")))?;
197
198 let status = response.status();
199
200 if !status.is_success() {
201 let error_body = response
202 .text()
203 .await
204 .unwrap_or_else(|_| "Unknown error".to_string());
205 return Err(self.map_error(status, &error_body));
206 }
207
208 let text = response
209 .text()
210 .await
211 .map_err(|e| Error::Network(format!("Failed to read response: {e}")))?;
212
213 if text.is_empty() {
214 serde_json::from_str("null").map_err(Error::Json)
216 } else {
217 serde_json::from_str(&text).map_err(Error::Json)
218 }
219 }
220
221 async fn request_no_response(
223 &self,
224 method: Method,
225 path: &str,
226 query: Option<&[(&str, &str)]>,
227 body: Option<&[u8]>,
228 ) -> Result<()> {
229 let mut url = self.admin_url(path);
230
231 if let Some(q) = query {
232 let query_string: String = q
233 .iter()
234 .map(|(k, v)| format!("{}={}", urlencoding::encode(k), urlencoding::encode(v)))
235 .collect::<Vec<_>>()
236 .join("&");
237 if !query_string.is_empty() {
238 url.push('?');
239 url.push_str(&query_string);
240 }
241 }
242
243 let body_bytes = body.unwrap_or(&[]);
244 let content_hash = Self::sha256_hash(body_bytes);
245
246 let mut headers = HeaderMap::new();
247 headers.insert("x-amz-content-sha256", content_hash.parse().unwrap());
248 headers.insert("host", self.get_host().parse().unwrap());
249
250 if !body_bytes.is_empty() {
251 headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
252 }
253
254 let signed_headers = self
255 .sign_request(&method, &url, &headers, body_bytes)
256 .await?;
257
258 let mut request_builder = self.http_client.request(method.clone(), &url);
259
260 for (name, value) in signed_headers.iter() {
261 request_builder = request_builder.header(name, value);
262 }
263
264 if !body_bytes.is_empty() {
265 request_builder = request_builder.body(body_bytes.to_vec());
266 }
267
268 let response = request_builder
269 .send()
270 .await
271 .map_err(|e| Error::Network(format!("Request failed: {e}")))?;
272
273 let status = response.status();
274
275 if !status.is_success() {
276 let error_body = response
277 .text()
278 .await
279 .unwrap_or_else(|_| "Unknown error".to_string());
280 return Err(self.map_error(status, &error_body));
281 }
282
283 Ok(())
284 }
285
286 fn get_host(&self) -> String {
288 self.endpoint
289 .trim_start_matches("http://")
290 .trim_start_matches("https://")
291 .to_string()
292 }
293
294 fn map_error(&self, status: StatusCode, body: &str) -> Error {
296 match status {
297 StatusCode::NOT_FOUND => Error::NotFound(body.to_string()),
298 StatusCode::FORBIDDEN | StatusCode::UNAUTHORIZED => Error::Auth(body.to_string()),
299 StatusCode::CONFLICT => Error::Conflict(body.to_string()),
300 StatusCode::BAD_REQUEST => Error::General(format!("Bad request: {body}")),
301 _ => Error::Network(format!("HTTP {}: {}", status.as_u16(), body)),
302 }
303 }
304}
305
306#[derive(Debug, Deserialize)]
308struct UserListResponse(HashMap<String, UserInfo>);
309
310#[derive(Debug, Deserialize)]
311#[serde(rename_all = "camelCase")]
312struct UserInfo {
313 #[serde(default)]
314 status: String,
315 #[serde(default)]
316 policy_name: Option<String>,
317 #[serde(default)]
318 member_of: Option<Vec<String>>,
319}
320
321#[derive(Debug, Deserialize)]
323struct PolicyListResponse(HashMap<String, serde_json::Value>);
324
325#[derive(Debug, Serialize)]
327#[serde(rename_all = "camelCase")]
328struct CreateUserRequest {
329 secret_key: String,
330 status: String,
331}
332
333#[derive(Debug, Serialize)]
335struct CreateGroupRequest {
336 group: String,
337 #[serde(skip_serializing_if = "Option::is_none")]
338 members: Option<Vec<String>>,
339}
340
341#[derive(Debug, Deserialize)]
343struct ServiceAccountListResponse {
344 accounts: Option<Vec<ServiceAccountInfo>>,
345}
346
347#[derive(Debug, Deserialize)]
348#[serde(rename_all = "camelCase")]
349struct ServiceAccountInfo {
350 access_key: String,
351 #[serde(default)]
352 parent_user: Option<String>,
353 #[serde(default)]
354 account_status: Option<String>,
355 #[serde(default)]
356 expiration: Option<String>,
357 #[serde(default)]
358 name: Option<String>,
359 #[serde(default)]
360 description: Option<String>,
361 #[serde(default)]
362 implied_policy: Option<bool>,
363}
364
365#[derive(Debug, Deserialize)]
366#[serde(rename_all = "camelCase")]
367struct BackgroundHealStatusResponse {
368 #[serde(default)]
369 bitrot_start_time: Option<String>,
370}
371
372impl From<BackgroundHealStatusResponse> for HealStatus {
373 fn from(response: BackgroundHealStatusResponse) -> Self {
374 Self {
375 healing: response.bitrot_start_time.is_some(),
376 started: response.bitrot_start_time,
377 ..Default::default()
378 }
379 }
380}
381
382#[derive(Debug, Serialize)]
383struct RustfsHealOptions {
384 recursive: bool,
385 #[serde(rename = "dryRun")]
386 dry_run: bool,
387 remove: bool,
388 recreate: bool,
389 #[serde(rename = "scanMode")]
390 scan_mode: u8,
391 #[serde(rename = "updateParity")]
392 update_parity: bool,
393 #[serde(rename = "nolock")]
394 no_lock: bool,
395}
396
397impl From<&HealStartRequest> for RustfsHealOptions {
398 fn from(request: &HealStartRequest) -> Self {
399 Self {
400 recursive: false,
401 dry_run: request.dry_run,
402 remove: request.remove,
403 recreate: request.recreate,
404 scan_mode: rustfs_heal_scan_mode(request.scan_mode),
405 update_parity: false,
406 no_lock: false,
407 }
408 }
409}
410
411fn rustfs_heal_scan_mode(scan_mode: HealScanMode) -> u8 {
412 match scan_mode {
413 HealScanMode::Normal => 1,
414 HealScanMode::Deep => 2,
415 }
416}
417
418fn rustfs_heal_path(request: &HealStartRequest) -> Result<String> {
419 let bucket = request
420 .bucket
421 .as_deref()
422 .filter(|bucket| !bucket.is_empty());
423 let prefix = request
424 .prefix
425 .as_deref()
426 .filter(|prefix| !prefix.is_empty());
427
428 match (bucket, prefix) {
429 (None, None) => Ok("/heal/".to_string()),
430 (Some(bucket), None) => Ok(format!("/heal/{}", urlencoding::encode(bucket))),
431 (Some(bucket), Some(prefix)) => Ok(format!(
432 "/heal/{}/{}",
433 urlencoding::encode(bucket),
434 urlencoding::encode(prefix)
435 )),
436 (None, Some(_)) => Err(Error::InvalidPath(
437 "heal prefix requires a bucket target".to_string(),
438 )),
439 }
440}
441
442fn rustfs_heal_body(request: &HealStartRequest) -> Result<Vec<u8>> {
443 serde_json::to_vec(&RustfsHealOptions::from(request)).map_err(Error::Json)
444}
445
446fn pool_target_query(target: &PoolTarget) -> Vec<(&str, &str)> {
447 let mut query = vec![("pool", target.pool.as_str())];
448 if target.by_id {
449 query.push(("by-id", "true"));
450 }
451 query
452}
453
454#[derive(Debug, Serialize)]
456#[serde(rename_all = "camelCase")]
457struct SetBucketQuotaApiRequest {
458 quota: u64,
459 quota_type: String,
460}
461
462#[async_trait]
463impl AdminApi for AdminClient {
464 async fn cluster_info(&self) -> Result<ClusterInfo> {
467 self.request(Method::GET, "/info", None, None).await
468 }
469
470 async fn heal_status(&self) -> Result<HealStatus> {
471 let response: BackgroundHealStatusResponse = self
472 .request(Method::POST, "/background-heal/status", None, None)
473 .await?;
474 Ok(response.into())
475 }
476
477 async fn heal_start(&self, request: HealStartRequest) -> Result<HealStatus> {
478 let path = rustfs_heal_path(&request)?;
479 let body = rustfs_heal_body(&request)?;
480 self.request_no_response(Method::POST, &path, None, Some(&body))
481 .await?;
482 Ok(HealStatus::default())
483 }
484
485 async fn heal_stop(&self) -> Result<()> {
486 let body = rustfs_heal_body(&HealStartRequest::default())?;
487 self.request_no_response(
488 Method::POST,
489 "/heal/",
490 Some(&[("forceStop", "true")]),
491 Some(&body),
492 )
493 .await
494 }
495
496 async fn list_pools(&self) -> Result<Vec<PoolStatus>> {
497 self.request(Method::GET, "/pools/list", None, None).await
498 }
499
500 async fn pool_status(&self, target: PoolTarget) -> Result<PoolStatus> {
501 let query = pool_target_query(&target);
502 self.request(Method::GET, "/pools/status", Some(&query), None)
503 .await
504 }
505
506 async fn decommission_start(&self, target: PoolTarget) -> Result<()> {
507 let query = pool_target_query(&target);
508 self.request_no_response(Method::POST, "/pools/decommission", Some(&query), None)
509 .await
510 }
511
512 async fn decommission_cancel(&self, target: PoolTarget) -> Result<()> {
513 let query = pool_target_query(&target);
514 self.request_no_response(Method::POST, "/pools/cancel", Some(&query), None)
515 .await
516 }
517
518 async fn rebalance_start(&self) -> Result<RebalanceStartResult> {
519 self.request(Method::POST, "/rebalance/start", None, None)
520 .await
521 }
522
523 async fn rebalance_status(&self) -> Result<RebalanceStatus> {
524 self.request(Method::GET, "/rebalance/status", None, None)
525 .await
526 }
527
528 async fn rebalance_stop(&self) -> Result<()> {
529 self.request_no_response(Method::POST, "/rebalance/stop", None, None)
530 .await
531 }
532
533 async fn list_users(&self) -> Result<Vec<User>> {
536 let response: UserListResponse =
537 self.request(Method::GET, "/list-users", None, None).await?;
538
539 Ok(response
540 .0
541 .into_iter()
542 .map(|(access_key, info)| User {
543 access_key,
544 secret_key: None,
545 status: if info.status == "disabled" {
546 UserStatus::Disabled
547 } else {
548 UserStatus::Enabled
549 },
550 policy_name: info.policy_name,
551 member_of: info.member_of.unwrap_or_default(),
552 })
553 .collect())
554 }
555
556 async fn get_user(&self, access_key: &str) -> Result<User> {
557 let query = [("accessKey", access_key)];
558 let response: UserInfo = self
559 .request(Method::GET, "/user-info", Some(&query), None)
560 .await?;
561
562 Ok(User {
563 access_key: access_key.to_string(),
564 secret_key: None,
565 status: if response.status == "disabled" {
566 UserStatus::Disabled
567 } else {
568 UserStatus::Enabled
569 },
570 policy_name: response.policy_name,
571 member_of: response.member_of.unwrap_or_default(),
572 })
573 }
574
575 async fn create_user(&self, access_key: &str, secret_key: &str) -> Result<User> {
576 let query = [("accessKey", access_key)];
577 let body = serde_json::to_vec(&CreateUserRequest {
578 secret_key: secret_key.to_string(),
579 status: "enabled".to_string(),
580 })
581 .map_err(Error::Json)?;
582
583 self.request_no_response(Method::PUT, "/add-user", Some(&query), Some(&body))
584 .await?;
585
586 Ok(User {
587 access_key: access_key.to_string(),
588 secret_key: Some(secret_key.to_string()),
589 status: UserStatus::Enabled,
590 policy_name: None,
591 member_of: vec![],
592 })
593 }
594
595 async fn delete_user(&self, access_key: &str) -> Result<()> {
596 let query = [("accessKey", access_key)];
597 self.request_no_response(Method::DELETE, "/remove-user", Some(&query), None)
598 .await
599 }
600
601 async fn set_user_status(&self, access_key: &str, status: UserStatus) -> Result<()> {
602 let status_str = match status {
603 UserStatus::Enabled => "enabled",
604 UserStatus::Disabled => "disabled",
605 };
606 let query = [("accessKey", access_key), ("status", status_str)];
607 self.request_no_response(Method::PUT, "/set-user-status", Some(&query), None)
608 .await
609 }
610
611 async fn list_policies(&self) -> Result<Vec<PolicyInfo>> {
614 let response: PolicyListResponse = self
615 .request(Method::GET, "/list-canned-policies", None, None)
616 .await?;
617
618 Ok(response
619 .0
620 .into_keys()
621 .map(|name| PolicyInfo { name })
622 .collect())
623 }
624
625 async fn get_policy(&self, name: &str) -> Result<Policy> {
626 let query = [("name", name)];
627 let policy_doc: serde_json::Value = self
628 .request(Method::GET, "/info-canned-policy", Some(&query), None)
629 .await?;
630
631 Ok(Policy {
632 name: name.to_string(),
633 policy: serde_json::to_string_pretty(&policy_doc).unwrap_or_default(),
634 })
635 }
636
637 async fn create_policy(&self, name: &str, policy_document: &str) -> Result<()> {
638 let query = [("name", name)];
639 let body = policy_document.as_bytes();
640 self.request_no_response(Method::PUT, "/add-canned-policy", Some(&query), Some(body))
641 .await
642 }
643
644 async fn delete_policy(&self, name: &str) -> Result<()> {
645 let query = [("name", name)];
646 self.request_no_response(Method::DELETE, "/remove-canned-policy", Some(&query), None)
647 .await
648 }
649
650 async fn attach_policy(
651 &self,
652 policy_names: &[String],
653 entity_type: PolicyEntity,
654 entity_name: &str,
655 ) -> Result<()> {
656 let policy_name = policy_names.join(",");
657 let is_group = entity_type == PolicyEntity::Group;
658
659 let query = [
660 ("policyName", policy_name.as_str()),
661 ("userOrGroup", entity_name),
662 ("isGroup", if is_group { "true" } else { "false" }),
663 ];
664
665 self.request_no_response(Method::PUT, "/set-user-or-group-policy", Some(&query), None)
666 .await
667 }
668
669 async fn detach_policy(
670 &self,
671 policy_names: &[String],
672 entity_type: PolicyEntity,
673 entity_name: &str,
674 ) -> Result<()> {
675 let _ = (policy_names, entity_type, entity_name);
679 Err(Error::UnsupportedFeature(
680 "Policy detach not directly supported. Use attach with remaining policies instead."
681 .to_string(),
682 ))
683 }
684
685 async fn list_groups(&self) -> Result<Vec<String>> {
688 let response: Vec<String> = self.request(Method::GET, "/groups", None, None).await?;
689 Ok(response)
690 }
691
692 async fn get_group(&self, name: &str) -> Result<Group> {
693 let query = [("group", name)];
694 let response: Group = self
695 .request(Method::GET, "/group", Some(&query), None)
696 .await?;
697 Ok(response)
698 }
699
700 async fn create_group(&self, name: &str, members: Option<&[String]>) -> Result<Group> {
701 let body = serde_json::to_vec(&CreateGroupRequest {
702 group: name.to_string(),
703 members: members.map(|m| m.to_vec()),
704 })
705 .map_err(Error::Json)?;
706
707 self.request_no_response(Method::POST, "/groups", None, Some(&body))
708 .await?;
709
710 Ok(Group {
711 name: name.to_string(),
712 policy: None,
713 members: members.map(|m| m.to_vec()).unwrap_or_default(),
714 status: GroupStatus::Enabled,
715 })
716 }
717
718 async fn delete_group(&self, name: &str) -> Result<()> {
719 let path = format!("/group/{}", urlencoding::encode(name));
720 self.request_no_response(Method::DELETE, &path, None, None)
721 .await
722 }
723
724 async fn set_group_status(&self, name: &str, status: GroupStatus) -> Result<()> {
725 let status_str = match status {
726 GroupStatus::Enabled => "enabled",
727 GroupStatus::Disabled => "disabled",
728 };
729 let query = [("group", name), ("status", status_str)];
730 self.request_no_response(Method::PUT, "/set-group-status", Some(&query), None)
731 .await
732 }
733
734 async fn add_group_members(&self, group: &str, members: &[String]) -> Result<()> {
735 let body = serde_json::to_vec(&UpdateGroupMembersRequest {
736 group: group.to_string(),
737 members: members.to_vec(),
738 is_remove: false,
739 status: "enabled".to_string(),
740 })
741 .map_err(Error::Json)?;
742
743 self.request_no_response(Method::PUT, "/update-group-members", None, Some(&body))
744 .await
745 }
746
747 async fn remove_group_members(&self, group: &str, members: &[String]) -> Result<()> {
748 let body = serde_json::to_vec(&UpdateGroupMembersRequest {
749 group: group.to_string(),
750 members: members.to_vec(),
751 is_remove: true,
752 status: "enabled".to_string(),
753 })
754 .map_err(Error::Json)?;
755
756 self.request_no_response(Method::PUT, "/update-group-members", None, Some(&body))
757 .await
758 }
759
760 async fn list_service_accounts(&self, user: Option<&str>) -> Result<Vec<ServiceAccount>> {
763 let query: Vec<(&str, &str)> = user.map(|u| vec![("user", u)]).unwrap_or_default();
764 let query_ref: Option<&[(&str, &str)]> = if query.is_empty() { None } else { Some(&query) };
765
766 let response: ServiceAccountListResponse = self
767 .request(Method::GET, "/list-service-accounts", query_ref, None)
768 .await?;
769
770 Ok(response
771 .accounts
772 .unwrap_or_default()
773 .into_iter()
774 .map(|sa| ServiceAccount {
775 access_key: sa.access_key,
776 secret_key: None,
777 parent_user: sa.parent_user,
778 policy: None,
779 account_status: sa.account_status,
780 expiration: sa.expiration,
781 name: sa.name,
782 description: sa.description,
783 implied_policy: sa.implied_policy,
784 })
785 .collect())
786 }
787
788 async fn get_service_account(&self, access_key: &str) -> Result<ServiceAccount> {
789 let query = [("accessKey", access_key)];
790 let response: ServiceAccount = self
791 .request(Method::GET, "/info-service-account", Some(&query), None)
792 .await?;
793
794 let mut response = response;
795 if response.access_key.is_empty() {
796 response.access_key = access_key.to_string();
797 }
798 Ok(response)
799 }
800
801 async fn create_service_account(
802 &self,
803 request: CreateServiceAccountRequest,
804 ) -> Result<ServiceAccount> {
805 let body = serde_json::to_vec(&request).map_err(Error::Json)?;
806 let response: ServiceAccountCreateResponse = self
807 .request(Method::PUT, "/add-service-accounts", None, Some(&body))
808 .await?;
809
810 Ok(ServiceAccount {
811 access_key: response.credentials.access_key,
812 secret_key: Some(response.credentials.secret_key),
813 expiration: response.credentials.expiration,
814 parent_user: None,
815 policy: None,
816 account_status: None,
817 name: None,
818 description: None,
819 implied_policy: None,
820 })
821 }
822
823 async fn delete_service_account(&self, access_key: &str) -> Result<()> {
824 let query = [("accessKey", access_key)];
825 self.request_no_response(
826 Method::DELETE,
827 "/delete-service-accounts",
828 Some(&query),
829 None,
830 )
831 .await
832 }
833
834 async fn set_bucket_quota(&self, bucket: &str, quota: u64) -> Result<BucketQuota> {
837 let path = format!("/quota/{}", urlencoding::encode(bucket));
838 let body = serde_json::to_vec(&SetBucketQuotaApiRequest {
839 quota,
840 quota_type: "HARD".to_string(),
841 })
842 .map_err(Error::Json)?;
843
844 self.request(Method::PUT, &path, None, Some(&body)).await
845 }
846
847 async fn get_bucket_quota(&self, bucket: &str) -> Result<BucketQuota> {
848 let path = format!("/quota/{}", urlencoding::encode(bucket));
849 self.request(Method::GET, &path, None, None).await
850 }
851
852 async fn clear_bucket_quota(&self, bucket: &str) -> Result<BucketQuota> {
853 let path = format!("/quota/{}", urlencoding::encode(bucket));
854 self.request(Method::DELETE, &path, None, None).await
855 }
856
857 async fn list_tiers(&self) -> Result<Vec<rc_core::admin::TierConfig>> {
860 self.request(Method::GET, "/tier", None, None).await
861 }
862
863 async fn tier_stats(&self) -> Result<serde_json::Value> {
864 self.request(Method::GET, "/tier-stats", None, None).await
865 }
866
867 async fn add_tier(&self, config: rc_core::admin::TierConfig) -> Result<()> {
868 let body = serde_json::to_vec(&config).map_err(Error::Json)?;
869 self.request_no_response(Method::PUT, "/tier", None, Some(&body))
870 .await
871 }
872
873 async fn edit_tier(&self, name: &str, creds: rc_core::admin::TierCreds) -> Result<()> {
874 let path = format!("/tier/{}", urlencoding::encode(name));
875 let body = serde_json::to_vec(&creds).map_err(Error::Json)?;
876 self.request_no_response(Method::POST, &path, None, Some(&body))
877 .await
878 }
879
880 async fn remove_tier(&self, name: &str, force: bool) -> Result<()> {
881 let path = format!("/tier/{}", urlencoding::encode(name));
882 if force {
883 let query: &[(&str, &str)] = &[("force", "true")];
884 self.request_no_response(Method::DELETE, &path, Some(query), None)
885 .await
886 } else {
887 self.request_no_response(Method::DELETE, &path, None, None)
888 .await
889 }
890 }
891
892 async fn set_remote_target(
895 &self,
896 bucket: &str,
897 target: rc_core::replication::BucketTarget,
898 update: bool,
899 ) -> Result<String> {
900 let body = serde_json::to_vec(&target).map_err(Error::Json)?;
901 if update {
902 let query: &[(&str, &str)] = &[("bucket", bucket), ("update", "true")];
903 self.request(Method::PUT, "/set-remote-target", Some(query), Some(&body))
904 .await
905 } else {
906 let query: &[(&str, &str)] = &[("bucket", bucket)];
907 self.request(Method::PUT, "/set-remote-target", Some(query), Some(&body))
908 .await
909 }
910 }
911
912 async fn list_remote_targets(
913 &self,
914 bucket: &str,
915 ) -> Result<Vec<rc_core::replication::BucketTarget>> {
916 let query: &[(&str, &str)] = &[("bucket", bucket)];
917 self.request(Method::GET, "/list-remote-targets", Some(query), None)
918 .await
919 }
920
921 async fn remove_remote_target(&self, bucket: &str, arn: &str) -> Result<()> {
922 let query: &[(&str, &str)] = &[("bucket", bucket), ("arn", arn)];
923 self.request_no_response(Method::DELETE, "/remove-remote-target", Some(query), None)
924 .await
925 }
926
927 async fn replication_metrics(&self, bucket: &str) -> Result<serde_json::Value> {
928 let query: &[(&str, &str)] = &[("bucket", bucket)];
929 self.request(Method::GET, "/replicationmetrics", Some(query), None)
930 .await
931 }
932}
933
934#[cfg(test)]
935mod tests {
936 use super::*;
937 use std::io::{Read, Write};
938 use std::net::{TcpListener, TcpStream};
939 use std::sync::mpsc;
940 use std::thread;
941 use tempfile::tempdir;
942
943 #[derive(Debug)]
944 struct CapturedAdminRequest {
945 method: String,
946 target: String,
947 body: Vec<u8>,
948 }
949
950 fn read_admin_request(stream: &mut TcpStream) -> CapturedAdminRequest {
951 let mut buffer = Vec::new();
952 let mut chunk = [0_u8; 1024];
953 let header_end = loop {
954 let read = stream.read(&mut chunk).expect("read HTTP request");
955 assert!(read > 0, "client closed connection before headers");
956 buffer.extend_from_slice(&chunk[..read]);
957
958 if let Some(position) = buffer.windows(4).position(|window| window == b"\r\n\r\n") {
959 break position + 4;
960 }
961 };
962
963 let headers = String::from_utf8_lossy(&buffer[..header_end]).into_owned();
964 let content_length = headers
965 .lines()
966 .find_map(|line| {
967 let (name, value) = line.split_once(':')?;
968 name.eq_ignore_ascii_case("content-length")
969 .then(|| value.trim().parse::<usize>().expect("valid content length"))
970 })
971 .unwrap_or(0);
972
973 while buffer.len() - header_end < content_length {
974 let read = stream.read(&mut chunk).expect("read HTTP request body");
975 assert!(read > 0, "client closed connection before body");
976 buffer.extend_from_slice(&chunk[..read]);
977 }
978
979 let request_line = headers.lines().next().expect("request line");
980 let mut parts = request_line.split_whitespace();
981 let method = parts.next().expect("request method").to_string();
982 let target = parts.next().expect("request target").to_string();
983 let body = buffer[header_end..header_end + content_length].to_vec();
984
985 CapturedAdminRequest {
986 method,
987 target,
988 body,
989 }
990 }
991
992 fn start_admin_test_server(
993 response_status: &str,
994 response_body: &'static str,
995 ) -> (
996 String,
997 mpsc::Receiver<CapturedAdminRequest>,
998 thread::JoinHandle<()>,
999 ) {
1000 let listener = TcpListener::bind("127.0.0.1:0").expect("bind test server");
1001 let endpoint = format!("http://{}", listener.local_addr().expect("local addr"));
1002 let (sender, receiver) = mpsc::channel();
1003 let response_status = response_status.to_string();
1004
1005 let handle = thread::spawn(move || {
1006 let (mut stream, _) = listener.accept().expect("accept request");
1007 let request = read_admin_request(&mut stream);
1008 sender.send(request).expect("send captured request");
1009
1010 let response = format!(
1011 "HTTP/1.1 {response_status}\r\ncontent-length: {}\r\ncontent-type: application/json\r\nconnection: close\r\n\r\n{response_body}",
1012 response_body.len()
1013 );
1014 stream
1015 .write_all(response.as_bytes())
1016 .expect("write HTTP response");
1017 });
1018
1019 (endpoint, receiver, handle)
1020 }
1021
1022 fn admin_client_for_endpoint(endpoint: &str) -> AdminClient {
1023 let alias = Alias::new("test", endpoint, "access", "secret");
1024 AdminClient::new(&alias).expect("admin client should build")
1025 }
1026
1027 fn assert_heal_options_body(
1028 body: &[u8],
1029 scan_mode: u8,
1030 remove: bool,
1031 recreate: bool,
1032 dry_run: bool,
1033 ) {
1034 let value: serde_json::Value =
1035 serde_json::from_slice(body).expect("heal request body should be JSON");
1036
1037 assert_eq!(value["recursive"], false);
1038 assert_eq!(value["dryRun"], dry_run);
1039 assert_eq!(value["remove"], remove);
1040 assert_eq!(value["recreate"], recreate);
1041 assert_eq!(value["scanMode"], scan_mode);
1042 assert_eq!(value["updateParity"], false);
1043 assert_eq!(value["nolock"], false);
1044 assert!(value.get("bucket").is_none());
1045 assert!(value.get("prefix").is_none());
1046 }
1047
1048 #[test]
1049 fn test_admin_url_construction() {
1050 let alias = Alias::new("test", "http://localhost:9000", "access", "secret");
1051 let client = AdminClient::new(&alias).unwrap();
1052
1053 assert_eq!(
1054 client.admin_url("/list-users"),
1055 "http://localhost:9000/rustfs/admin/v3/list-users"
1056 );
1057 }
1058
1059 #[test]
1060 fn test_admin_url_with_trailing_slash() {
1061 let alias = Alias::new("test", "http://localhost:9000/", "access", "secret");
1062 let client = AdminClient::new(&alias).unwrap();
1063
1064 assert_eq!(
1065 client.admin_url("/list-users"),
1066 "http://localhost:9000/rustfs/admin/v3/list-users"
1067 );
1068 }
1069
1070 #[test]
1071 fn test_get_host() {
1072 let alias = Alias::new("test", "https://s3.example.com", "access", "secret");
1073 let client = AdminClient::new(&alias).unwrap();
1074
1075 assert_eq!(client.get_host(), "s3.example.com");
1076 }
1077
1078 #[test]
1079 fn test_sha256_hash() {
1080 let hash = AdminClient::sha256_hash(b"test");
1081 assert_eq!(
1082 hash,
1083 "9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08"
1084 );
1085 }
1086
1087 #[test]
1088 fn test_sha256_hash_empty() {
1089 let hash = AdminClient::sha256_hash(b"");
1090 assert_eq!(
1091 hash,
1092 "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
1093 );
1094 }
1095
1096 #[test]
1097 fn test_rustfs_heal_path_matches_admin_routes() {
1098 assert_eq!(
1099 rustfs_heal_path(&HealStartRequest::default()).expect("root path"),
1100 "/heal/"
1101 );
1102
1103 let bucket_request = HealStartRequest {
1104 bucket: Some("photos".to_string()),
1105 ..Default::default()
1106 };
1107 assert_eq!(
1108 rustfs_heal_path(&bucket_request).expect("bucket path"),
1109 "/heal/photos"
1110 );
1111
1112 let prefix_request = HealStartRequest {
1113 bucket: Some("photos".to_string()),
1114 prefix: Some("2026/raw".to_string()),
1115 ..Default::default()
1116 };
1117 assert_eq!(
1118 rustfs_heal_path(&prefix_request).expect("prefix path"),
1119 "/heal/photos/2026%2Fraw"
1120 );
1121
1122 let invalid_request = HealStartRequest {
1123 prefix: Some("2026/raw".to_string()),
1124 ..Default::default()
1125 };
1126 assert!(matches!(
1127 rustfs_heal_path(&invalid_request),
1128 Err(Error::InvalidPath(_))
1129 ));
1130 }
1131
1132 #[test]
1133 fn test_rustfs_heal_body_matches_server_heal_options() {
1134 let request = HealStartRequest {
1135 scan_mode: HealScanMode::Deep,
1136 remove: true,
1137 recreate: true,
1138 dry_run: true,
1139 ..Default::default()
1140 };
1141
1142 let body = rustfs_heal_body(&request).expect("heal options should serialize");
1143 let value: serde_json::Value =
1144 serde_json::from_slice(&body).expect("heal options body should be JSON");
1145
1146 assert_eq!(value["recursive"], false);
1147 assert_eq!(value["dryRun"], true);
1148 assert_eq!(value["remove"], true);
1149 assert_eq!(value["recreate"], true);
1150 assert_eq!(value["scanMode"], 2);
1151 assert_eq!(value["updateParity"], false);
1152 assert_eq!(value["nolock"], false);
1153 assert!(value.get("bucket").is_none());
1154 assert!(value.get("prefix").is_none());
1155 }
1156
1157 #[test]
1158 fn test_background_heal_status_response_maps_to_heal_status() {
1159 let status = HealStatus::from(BackgroundHealStatusResponse {
1160 bitrot_start_time: Some("2026-04-19T10:00:00Z".to_string()),
1161 });
1162
1163 assert!(status.healing);
1164 assert_eq!(status.started.as_deref(), Some("2026-04-19T10:00:00Z"));
1165
1166 let idle = HealStatus::from(BackgroundHealStatusResponse {
1167 bitrot_start_time: None,
1168 });
1169 assert!(!idle.healing);
1170 assert!(idle.started.is_none());
1171 }
1172
1173 #[test]
1174 fn test_bad_request_maps_to_general_admin_error() {
1175 let alias = Alias::new("test", "http://localhost:9000", "access", "secret");
1176 let client = AdminClient::new(&alias).expect("admin client should build");
1177
1178 let error = client.map_error(StatusCode::BAD_REQUEST, "err request body parse");
1179 assert!(matches!(error, Error::General(_)));
1180 assert_eq!(error.to_string(), "Bad request: err request body parse");
1181 }
1182
1183 #[tokio::test]
1184 async fn test_heal_status_uses_background_heal_status_endpoint() {
1185 let (endpoint, receiver, handle) =
1186 start_admin_test_server("200 OK", r#"{"bitrotStartTime":"2026-04-19T10:00:00Z"}"#);
1187 let client = admin_client_for_endpoint(&endpoint);
1188
1189 let status = client.heal_status().await.expect("heal status request");
1190
1191 assert!(status.healing);
1192 assert_eq!(status.started.as_deref(), Some("2026-04-19T10:00:00Z"));
1193
1194 let request = receiver.recv().expect("captured request");
1195 assert_eq!(request.method, "POST");
1196 assert_eq!(request.target, "/rustfs/admin/v3/background-heal/status");
1197 assert!(request.body.is_empty());
1198 handle.join().expect("server thread should finish");
1199 }
1200
1201 #[tokio::test]
1202 async fn test_heal_start_posts_to_bucket_prefix_route_with_options_body() {
1203 let (endpoint, receiver, handle) = start_admin_test_server("200 OK", "");
1204 let client = admin_client_for_endpoint(&endpoint);
1205 let request = HealStartRequest {
1206 bucket: Some("raw photos".to_string()),
1207 prefix: Some("2026/april".to_string()),
1208 scan_mode: HealScanMode::Deep,
1209 remove: true,
1210 recreate: true,
1211 dry_run: true,
1212 };
1213
1214 let status = client
1215 .heal_start(request)
1216 .await
1217 .expect("heal start request");
1218
1219 assert!(!status.healing);
1220 assert!(status.heal_id.is_empty());
1221 assert!(status.started.is_none());
1222
1223 let request = receiver.recv().expect("captured request");
1224 assert_eq!(request.method, "POST");
1225 assert_eq!(
1226 request.target,
1227 "/rustfs/admin/v3/heal/raw%20photos/2026%2Fapril"
1228 );
1229 assert_heal_options_body(&request.body, 2, true, true, true);
1230 handle.join().expect("server thread should finish");
1231 }
1232
1233 #[tokio::test]
1234 async fn test_heal_stop_posts_force_stop_to_root_heal_route() {
1235 let (endpoint, receiver, handle) = start_admin_test_server("200 OK", "");
1236 let client = admin_client_for_endpoint(&endpoint);
1237
1238 client.heal_stop().await.expect("heal stop request");
1239
1240 let request = receiver.recv().expect("captured request");
1241 assert_eq!(request.method, "POST");
1242 assert_eq!(request.target, "/rustfs/admin/v3/heal/?forceStop=true");
1243 assert_heal_options_body(&request.body, 1, false, false, false);
1244 handle.join().expect("server thread should finish");
1245 }
1246
1247 #[tokio::test]
1248 async fn test_pool_status_uses_pool_status_route_with_by_id_query() {
1249 let (endpoint, receiver, handle) = start_admin_test_server(
1250 "200 OK",
1251 r#"{"id":1,"cmdline":"/data/pool1/disk{1...4}","lastUpdate":"2026-05-06T00:00:00Z","decommissionInfo":null}"#,
1252 );
1253 let client = admin_client_for_endpoint(&endpoint);
1254
1255 let status = client
1256 .pool_status(PoolTarget {
1257 pool: "1".to_string(),
1258 by_id: true,
1259 })
1260 .await
1261 .expect("pool status request");
1262
1263 assert_eq!(status.id, 1);
1264 assert_eq!(status.cmd_line, "/data/pool1/disk{1...4}");
1265
1266 let request = receiver.recv().expect("captured request");
1267 assert_eq!(request.method, "GET");
1268 assert_eq!(
1269 request.target,
1270 "/rustfs/admin/v3/pools/status?pool=1&by-id=true"
1271 );
1272 assert!(request.body.is_empty());
1273 handle.join().expect("server thread should finish");
1274 }
1275
1276 #[tokio::test]
1277 async fn test_pool_status_uses_command_line_query_without_by_id() {
1278 let (endpoint, receiver, handle) = start_admin_test_server(
1279 "200 OK",
1280 r#"{"id":2,"cmdline":"/data/pool2/disk{1...4}","lastUpdate":"2026-05-06T00:00:00Z","decommissionInfo":null}"#,
1281 );
1282 let client = admin_client_for_endpoint(&endpoint);
1283
1284 let status = client
1285 .pool_status(PoolTarget {
1286 pool: "/data/pool2/disk{1...4}".to_string(),
1287 by_id: false,
1288 })
1289 .await
1290 .expect("pool status request");
1291
1292 assert_eq!(status.id, 2);
1293 assert_eq!(status.cmd_line, "/data/pool2/disk{1...4}");
1294
1295 let request = receiver.recv().expect("captured request");
1296 assert_eq!(request.method, "GET");
1297 assert_eq!(
1298 request.target,
1299 "/rustfs/admin/v3/pools/status?pool=%2Fdata%2Fpool2%2Fdisk%7B1...4%7D"
1300 );
1301 assert!(request.body.is_empty());
1302 handle.join().expect("server thread should finish");
1303 }
1304
1305 #[tokio::test]
1306 async fn test_list_pools_uses_pool_list_route() {
1307 let (endpoint, receiver, handle) = start_admin_test_server(
1308 "200 OK",
1309 r#"[{"id":0,"cmdline":"/data/pool0/disk{1...4}","lastUpdate":"2026-05-06T00:00:00Z","decommissionInfo":null}]"#,
1310 );
1311 let client = admin_client_for_endpoint(&endpoint);
1312
1313 let pools = client.list_pools().await.expect("list pools request");
1314
1315 assert_eq!(pools.len(), 1);
1316 assert_eq!(pools[0].id, 0);
1317 assert_eq!(pools[0].cmd_line, "/data/pool0/disk{1...4}");
1318
1319 let request = receiver.recv().expect("captured request");
1320 assert_eq!(request.method, "GET");
1321 assert_eq!(request.target, "/rustfs/admin/v3/pools/list");
1322 assert!(request.body.is_empty());
1323 handle.join().expect("server thread should finish");
1324 }
1325
1326 #[tokio::test]
1327 async fn test_decommission_start_posts_pool_query() {
1328 let (endpoint, receiver, handle) = start_admin_test_server("200 OK", "");
1329 let client = admin_client_for_endpoint(&endpoint);
1330
1331 client
1332 .decommission_start(PoolTarget {
1333 pool: "/data/pool1/disk{1...4}".to_string(),
1334 by_id: false,
1335 })
1336 .await
1337 .expect("decommission start request");
1338
1339 let request = receiver.recv().expect("captured request");
1340 assert_eq!(request.method, "POST");
1341 assert_eq!(
1342 request.target,
1343 "/rustfs/admin/v3/pools/decommission?pool=%2Fdata%2Fpool1%2Fdisk%7B1...4%7D"
1344 );
1345 assert!(request.body.is_empty());
1346 handle.join().expect("server thread should finish");
1347 }
1348
1349 #[tokio::test]
1350 async fn test_decommission_start_posts_by_id_multi_pool_query() {
1351 let (endpoint, receiver, handle) = start_admin_test_server("200 OK", "");
1352 let client = admin_client_for_endpoint(&endpoint);
1353
1354 client
1355 .decommission_start(PoolTarget {
1356 pool: "1,2".to_string(),
1357 by_id: true,
1358 })
1359 .await
1360 .expect("decommission start request");
1361
1362 let request = receiver.recv().expect("captured request");
1363 assert_eq!(request.method, "POST");
1364 assert_eq!(
1365 request.target,
1366 "/rustfs/admin/v3/pools/decommission?pool=1%2C2&by-id=true"
1367 );
1368 assert!(request.body.is_empty());
1369 handle.join().expect("server thread should finish");
1370 }
1371
1372 #[tokio::test]
1373 async fn test_decommission_cancel_posts_pool_cancel_route_with_by_id_query() {
1374 let (endpoint, receiver, handle) = start_admin_test_server("200 OK", "");
1375 let client = admin_client_for_endpoint(&endpoint);
1376
1377 client
1378 .decommission_cancel(PoolTarget {
1379 pool: "1".to_string(),
1380 by_id: true,
1381 })
1382 .await
1383 .expect("decommission cancel request");
1384
1385 let request = receiver.recv().expect("captured request");
1386 assert_eq!(request.method, "POST");
1387 assert_eq!(
1388 request.target,
1389 "/rustfs/admin/v3/pools/cancel?pool=1&by-id=true"
1390 );
1391 assert!(request.body.is_empty());
1392 handle.join().expect("server thread should finish");
1393 }
1394
1395 #[tokio::test]
1396 async fn test_rebalance_start_posts_rebalance_start_route() {
1397 let (endpoint, receiver, handle) =
1398 start_admin_test_server("200 OK", r#"{"id":"rebalance-123"}"#);
1399 let client = admin_client_for_endpoint(&endpoint);
1400
1401 let result = client
1402 .rebalance_start()
1403 .await
1404 .expect("rebalance start request");
1405
1406 assert_eq!(result.id, "rebalance-123");
1407
1408 let request = receiver.recv().expect("captured request");
1409 assert_eq!(request.method, "POST");
1410 assert_eq!(request.target, "/rustfs/admin/v3/rebalance/start");
1411 assert!(request.body.is_empty());
1412 handle.join().expect("server thread should finish");
1413 }
1414
1415 #[tokio::test]
1416 async fn test_rebalance_status_gets_rebalance_status_route() {
1417 let (endpoint, receiver, handle) = start_admin_test_server(
1418 "200 OK",
1419 r#"{"id":"rebalance-123","pools":[],"stoppedAt":"2026-05-06T00:00:00Z"}"#,
1420 );
1421 let client = admin_client_for_endpoint(&endpoint);
1422
1423 let status = client
1424 .rebalance_status()
1425 .await
1426 .expect("rebalance status request");
1427
1428 assert_eq!(status.id, "rebalance-123");
1429 assert_eq!(status.stopped_at.as_deref(), Some("2026-05-06T00:00:00Z"));
1430
1431 let request = receiver.recv().expect("captured request");
1432 assert_eq!(request.method, "GET");
1433 assert_eq!(request.target, "/rustfs/admin/v3/rebalance/status");
1434 assert!(request.body.is_empty());
1435 handle.join().expect("server thread should finish");
1436 }
1437
1438 #[tokio::test]
1439 async fn test_rebalance_stop_posts_rebalance_stop_route() {
1440 let (endpoint, receiver, handle) = start_admin_test_server("200 OK", "");
1441 let client = admin_client_for_endpoint(&endpoint);
1442
1443 client
1444 .rebalance_stop()
1445 .await
1446 .expect("rebalance stop request");
1447
1448 let request = receiver.recv().expect("captured request");
1449 assert_eq!(request.method, "POST");
1450 assert_eq!(request.target, "/rustfs/admin/v3/rebalance/stop");
1451 assert!(request.body.is_empty());
1452 handle.join().expect("server thread should finish");
1453 }
1454
1455 #[test]
1456 fn test_admin_client_invalid_ca_bundle_path_surfaces_error() {
1457 let mut alias = Alias::new("test", "https://localhost:9000", "access", "secret");
1458 alias.ca_bundle = Some("/definitely-not-here/ca.pem".to_string());
1459
1460 let result = AdminClient::new(&alias);
1461 match result {
1462 Err(Error::Network(msg)) => {
1463 assert!(
1464 msg.contains("Failed to read CA bundle"),
1465 "Unexpected error message: {msg}"
1466 );
1467 }
1468 Ok(_) => panic!("Expected Error::Network for invalid path, got Ok(_)"),
1469 Err(e) => panic!("Expected Error::Network for invalid path, got Err({e})"),
1470 }
1471 }
1472
1473 #[test]
1474 fn test_admin_client_invalid_ca_bundle_pem_surfaces_error() {
1475 let temp_dir = tempdir().expect("create temp dir");
1476 let bad_pem_path = temp_dir.path().join("bad-ca.pem");
1477 std::fs::write(
1478 &bad_pem_path,
1479 b"-----BEGIN CERTIFICATE-----\ninvalid-base64\n-----END CERTIFICATE-----\n",
1480 )
1481 .expect("write invalid PEM");
1482
1483 let mut alias = Alias::new("test", "https://localhost:9000", "access", "secret");
1484 alias.ca_bundle = Some(bad_pem_path.display().to_string());
1485
1486 let result = AdminClient::new(&alias);
1487 match result {
1488 Err(Error::Network(msg)) => {
1489 assert!(
1490 msg.contains("Invalid CA bundle") && msg.contains("bad-ca.pem"),
1491 "Unexpected error message for invalid PEM CA bundle: {msg}"
1492 );
1493 }
1494 Ok(_) => panic!("Expected Error::Network for invalid PEM, got Ok(_)"),
1495 Err(e) => panic!("Expected Error::Network for invalid PEM, got Err({e})"),
1496 }
1497 }
1498}