1use anyhow::{Context, Result};
7
8use crate::OssClient;
9use crate::types::*;
10
11impl OssClient {
12 pub async fn create_bucket(
14 &self,
15 bucket_key: &str,
16 policy: RetentionPolicy,
17 region: Region,
18 ) -> Result<Bucket> {
19 let token = self.auth.get_token().await?;
20 let url = format!("{}/buckets", self.config.oss_url());
21
22 let request = CreateBucketRequest {
23 bucket_key: bucket_key.to_string(),
24 policy_key: policy.to_string(),
25 };
26
27 let response = raps_kernel::http::send_with_retry(&self.config.http_config, || {
28 self.http_client
29 .post(&url)
30 .bearer_auth(&token)
31 .header("x-ads-region", region.to_string())
32 .header("Content-Type", "application/json")
33 .json(&request)
34 })
35 .await?;
36
37 if !response.status().is_success() {
38 let status = response.status();
39 let error_text = response.text().await.unwrap_or_default();
40 anyhow::bail!("Failed to create bucket ({status}): {error_text}");
41 }
42
43 let bucket: Bucket = response
44 .json()
45 .await
46 .context("Failed to parse bucket response")?;
47
48 Ok(bucket)
49 }
50
51 pub async fn list_buckets(&self) -> Result<Vec<BucketItem>> {
56 use std::time::Duration;
57
58 let per_region_timeout = Duration::from_secs(
59 std::env::var("RAPS_REGION_TIMEOUT")
60 .ok()
61 .and_then(|v| v.parse().ok())
62 .unwrap_or(10),
63 );
64
65 let (us_result, emea_result) = tokio::join!(
67 tokio::time::timeout(per_region_timeout, self.list_buckets_in_region(Region::US)),
68 tokio::time::timeout(
69 per_region_timeout,
70 self.list_buckets_in_region(Region::EMEA)
71 ),
72 );
73
74 let mut all_buckets = Vec::new();
75
76 match us_result {
78 Ok(Ok(mut buckets)) => {
79 for bucket in &mut buckets {
80 bucket.region = Some("US".to_string());
81 }
82 all_buckets.extend(buckets);
83 }
84 Ok(Err(e)) => {
85 tracing::warn!(region = "US", error = %e, "Failed to list buckets");
86 }
87 Err(_) => {
88 tracing::warn!(region = "US", timeout = ?per_region_timeout, "Region listing timed out");
89 }
90 }
91
92 match emea_result {
94 Ok(Ok(mut buckets)) => {
95 for bucket in &mut buckets {
96 bucket.region = Some("EMEA".to_string());
97 }
98 all_buckets.extend(buckets);
99 }
100 Ok(Err(e)) => {
101 tracing::warn!(region = "EMEA", error = %e, "Failed to list buckets");
102 }
103 Err(_) => {
104 tracing::warn!(region = "EMEA", timeout = ?per_region_timeout, "Region listing timed out");
105 }
106 }
107
108 Ok(all_buckets)
109 }
110
111 pub async fn list_buckets_streaming(&self) -> Vec<RegionResult> {
117 use std::time::{Duration, Instant};
118
119 let per_region_timeout = Duration::from_secs(
120 std::env::var("RAPS_REGION_TIMEOUT")
121 .ok()
122 .and_then(|v| v.parse().ok())
123 .unwrap_or(10),
124 );
125
126 let us_start = Instant::now();
127 let emea_start = Instant::now();
128
129 let mut us_fut = std::pin::pin!(tokio::time::timeout(
130 per_region_timeout,
131 self.list_buckets_in_region(Region::US),
132 ));
133 let mut emea_fut = std::pin::pin!(tokio::time::timeout(
134 per_region_timeout,
135 self.list_buckets_in_region(Region::EMEA),
136 ));
137
138 let mut results = Vec::with_capacity(2);
139 let mut us_done = false;
140 let mut emea_done = false;
141
142 while !us_done || !emea_done {
143 tokio::select! {
144 us_result = &mut us_fut, if !us_done => {
145 us_done = true;
146 let elapsed = us_start.elapsed();
147 let buckets = match us_result {
148 Ok(Ok(mut buckets)) => {
149 for b in &mut buckets {
150 b.region = Some("US".to_string());
151 }
152 Ok(buckets)
153 }
154 Ok(Err(e)) => Err(e),
155 Err(_) => Err(anyhow::anyhow!(
156 "US region timed out after {:?}",
157 per_region_timeout
158 )),
159 };
160 results.push(RegionResult { region: Region::US, buckets, elapsed });
161 }
162 emea_result = &mut emea_fut, if !emea_done => {
163 emea_done = true;
164 let elapsed = emea_start.elapsed();
165 let buckets = match emea_result {
166 Ok(Ok(mut buckets)) => {
167 for b in &mut buckets {
168 b.region = Some("EMEA".to_string());
169 }
170 Ok(buckets)
171 }
172 Ok(Err(e)) => Err(e),
173 Err(_) => Err(anyhow::anyhow!(
174 "EMEA region timed out after {:?}",
175 per_region_timeout
176 )),
177 };
178 results.push(RegionResult { region: Region::EMEA, buckets, elapsed });
179 }
180 }
181 }
182
183 results
184 }
185
186 async fn list_buckets_in_region(&self, region: Region) -> Result<Vec<BucketItem>> {
188 let token = self.auth.get_token().await?;
189 let mut buckets = Vec::new();
190 let mut start_at: Option<String> = None;
191 const MAX_PAGES: usize = 100;
192
193 for _page in 0..MAX_PAGES {
194 let mut url = format!("{}/buckets", self.config.oss_url());
195 if let Some(ref start) = start_at {
196 url = format!("{}?startAt={}", url, start);
197 }
198
199 let response = raps_kernel::http::send_with_retry(&self.config.http_config, || {
200 self.http_client
201 .get(&url)
202 .bearer_auth(&token)
203 .header("x-ads-region", region.to_string())
204 })
205 .await?;
206
207 if !response.status().is_success() {
208 let status = response.status();
209 let error_text = response.text().await.unwrap_or_default();
210 anyhow::bail!("Failed to list buckets ({status}): {error_text}");
211 }
212
213 let buckets_response: BucketsResponse = response
214 .json()
215 .await
216 .context("Failed to parse buckets response")?;
217
218 buckets.extend(buckets_response.items);
219
220 if buckets_response.next.is_none() {
221 break;
222 }
223 start_at = buckets_response.next;
224 }
225
226 Ok(buckets)
227 }
228
229 pub async fn get_bucket_details(&self, bucket_key: &str) -> Result<Bucket> {
231 let token = self.auth.get_token().await?;
232 let url = format!("{}/buckets/{}/details", self.config.oss_url(), bucket_key);
233
234 let response = raps_kernel::http::send_with_retry(&self.config.http_config, || {
235 self.http_client.get(&url).bearer_auth(&token)
236 })
237 .await?;
238
239 if !response.status().is_success() {
240 let status = response.status();
241 let error_text = response.text().await.unwrap_or_default();
242 anyhow::bail!("Failed to get bucket details ({status}): {error_text}");
243 }
244
245 let bucket: Bucket = response
246 .json()
247 .await
248 .context("Failed to parse bucket details")?;
249
250 Ok(bucket)
251 }
252
253 pub async fn delete_bucket(&self, bucket_key: &str) -> Result<()> {
255 let token = self.auth.get_token().await?;
256 let url = format!("{}/buckets/{}", self.config.oss_url(), bucket_key);
257
258 let response = raps_kernel::http::send_with_retry(&self.config.http_config, || {
259 self.http_client.delete(&url).bearer_auth(&token)
260 })
261 .await?;
262
263 if !response.status().is_success() {
264 let status = response.status();
265 let error_text = response.text().await.unwrap_or_default();
266 anyhow::bail!("Failed to delete bucket ({status}): {error_text}");
267 }
268
269 Ok(())
270 }
271}