Skip to main content

raps_oss/
buckets.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2025 Dmytro Yemelianov
3
4//! Bucket operations for the OSS API.
5
6use anyhow::{Context, Result};
7
8use crate::OssClient;
9use crate::types::*;
10
11impl OssClient {
12    /// Create a new bucket
13    pub async fn create_bucket(
14        &self,
15        bucket_key: &str,
16        policy: RetentionPolicy,
17        region: Region,
18    ) -> Result<Bucket> {
19        let token = self.auth.get_token().await?;
20        let url = format!("{}/buckets", self.config.oss_url());
21
22        let request = CreateBucketRequest {
23            bucket_key: bucket_key.to_string(),
24            policy_key: policy.to_string(),
25        };
26
27        let response = raps_kernel::http::send_with_retry(&self.config.http_config, || {
28            self.http_client
29                .post(&url)
30                .bearer_auth(&token)
31                .header("x-ads-region", region.to_string())
32                .header("Content-Type", "application/json")
33                .json(&request)
34        })
35        .await?;
36
37        if !response.status().is_success() {
38            let status = response.status();
39            let error_text = response.text().await.unwrap_or_default();
40            anyhow::bail!("Failed to create bucket ({status}): {error_text}");
41        }
42
43        let bucket: Bucket = response
44            .json()
45            .await
46            .context("Failed to parse bucket response")?;
47
48        Ok(bucket)
49    }
50
51    /// List all buckets from all regions
52    ///
53    /// Queries US and EMEA regions concurrently with a per-region timeout
54    /// to avoid hanging when one region is slow or unreachable.
55    pub async fn list_buckets(&self) -> Result<Vec<BucketItem>> {
56        use std::time::Duration;
57
58        let per_region_timeout = Duration::from_secs(
59            std::env::var("RAPS_REGION_TIMEOUT")
60                .ok()
61                .and_then(|v| v.parse().ok())
62                .unwrap_or(10),
63        );
64
65        // Query both regions concurrently
66        let (us_result, emea_result) = tokio::join!(
67            tokio::time::timeout(per_region_timeout, self.list_buckets_in_region(Region::US)),
68            tokio::time::timeout(
69                per_region_timeout,
70                self.list_buckets_in_region(Region::EMEA)
71            ),
72        );
73
74        let mut all_buckets = Vec::new();
75
76        // Process US region
77        match us_result {
78            Ok(Ok(mut buckets)) => {
79                for bucket in &mut buckets {
80                    bucket.region = Some("US".to_string());
81                }
82                all_buckets.extend(buckets);
83            }
84            Ok(Err(e)) => {
85                tracing::warn!(region = "US", error = %e, "Failed to list buckets");
86            }
87            Err(_) => {
88                tracing::warn!(region = "US", timeout = ?per_region_timeout, "Region listing timed out");
89            }
90        }
91
92        // Process EMEA region
93        match emea_result {
94            Ok(Ok(mut buckets)) => {
95                for bucket in &mut buckets {
96                    bucket.region = Some("EMEA".to_string());
97                }
98                all_buckets.extend(buckets);
99            }
100            Ok(Err(e)) => {
101                tracing::warn!(region = "EMEA", error = %e, "Failed to list buckets");
102            }
103            Err(_) => {
104                tracing::warn!(region = "EMEA", timeout = ?per_region_timeout, "Region listing timed out");
105            }
106        }
107
108        Ok(all_buckets)
109    }
110
111    /// List buckets from all regions, returning results per-region as they complete.
112    ///
113    /// Uses `tokio::select!` so whichever region finishes first yields its
114    /// `RegionResult` immediately. The caller can display partial results
115    /// while waiting for slower regions.
116    pub async fn list_buckets_streaming(&self) -> Vec<RegionResult> {
117        use std::time::{Duration, Instant};
118
119        let per_region_timeout = Duration::from_secs(
120            std::env::var("RAPS_REGION_TIMEOUT")
121                .ok()
122                .and_then(|v| v.parse().ok())
123                .unwrap_or(10),
124        );
125
126        let us_start = Instant::now();
127        let emea_start = Instant::now();
128
129        let mut us_fut = std::pin::pin!(tokio::time::timeout(
130            per_region_timeout,
131            self.list_buckets_in_region(Region::US),
132        ));
133        let mut emea_fut = std::pin::pin!(tokio::time::timeout(
134            per_region_timeout,
135            self.list_buckets_in_region(Region::EMEA),
136        ));
137
138        let mut results = Vec::with_capacity(2);
139        let mut us_done = false;
140        let mut emea_done = false;
141
142        while !us_done || !emea_done {
143            tokio::select! {
144                us_result = &mut us_fut, if !us_done => {
145                    us_done = true;
146                    let elapsed = us_start.elapsed();
147                    let buckets = match us_result {
148                        Ok(Ok(mut buckets)) => {
149                            for b in &mut buckets {
150                                b.region = Some("US".to_string());
151                            }
152                            Ok(buckets)
153                        }
154                        Ok(Err(e)) => Err(e),
155                        Err(_) => Err(anyhow::anyhow!(
156                            "US region timed out after {:?}",
157                            per_region_timeout
158                        )),
159                    };
160                    results.push(RegionResult { region: Region::US, buckets, elapsed });
161                }
162                emea_result = &mut emea_fut, if !emea_done => {
163                    emea_done = true;
164                    let elapsed = emea_start.elapsed();
165                    let buckets = match emea_result {
166                        Ok(Ok(mut buckets)) => {
167                            for b in &mut buckets {
168                                b.region = Some("EMEA".to_string());
169                            }
170                            Ok(buckets)
171                        }
172                        Ok(Err(e)) => Err(e),
173                        Err(_) => Err(anyhow::anyhow!(
174                            "EMEA region timed out after {:?}",
175                            per_region_timeout
176                        )),
177                    };
178                    results.push(RegionResult { region: Region::EMEA, buckets, elapsed });
179                }
180            }
181        }
182
183        results
184    }
185
186    /// List buckets in a specific region
187    async fn list_buckets_in_region(&self, region: Region) -> Result<Vec<BucketItem>> {
188        let token = self.auth.get_token().await?;
189        let mut buckets = Vec::new();
190        let mut start_at: Option<String> = None;
191        const MAX_PAGES: usize = 100;
192
193        for _page in 0..MAX_PAGES {
194            let mut url = format!("{}/buckets", self.config.oss_url());
195            if let Some(ref start) = start_at {
196                url = format!("{}?startAt={}", url, start);
197            }
198
199            let response = raps_kernel::http::send_with_retry(&self.config.http_config, || {
200                self.http_client
201                    .get(&url)
202                    .bearer_auth(&token)
203                    .header("x-ads-region", region.to_string())
204            })
205            .await?;
206
207            if !response.status().is_success() {
208                let status = response.status();
209                let error_text = response.text().await.unwrap_or_default();
210                anyhow::bail!("Failed to list buckets ({status}): {error_text}");
211            }
212
213            let buckets_response: BucketsResponse = response
214                .json()
215                .await
216                .context("Failed to parse buckets response")?;
217
218            buckets.extend(buckets_response.items);
219
220            if buckets_response.next.is_none() {
221                break;
222            }
223            start_at = buckets_response.next;
224        }
225
226        Ok(buckets)
227    }
228
229    /// Get bucket details
230    pub async fn get_bucket_details(&self, bucket_key: &str) -> Result<Bucket> {
231        let token = self.auth.get_token().await?;
232        let url = format!("{}/buckets/{}/details", self.config.oss_url(), bucket_key);
233
234        let response = raps_kernel::http::send_with_retry(&self.config.http_config, || {
235            self.http_client.get(&url).bearer_auth(&token)
236        })
237        .await?;
238
239        if !response.status().is_success() {
240            let status = response.status();
241            let error_text = response.text().await.unwrap_or_default();
242            anyhow::bail!("Failed to get bucket details ({status}): {error_text}");
243        }
244
245        let bucket: Bucket = response
246            .json()
247            .await
248            .context("Failed to parse bucket details")?;
249
250        Ok(bucket)
251    }
252
253    /// Delete a bucket
254    pub async fn delete_bucket(&self, bucket_key: &str) -> Result<()> {
255        let token = self.auth.get_token().await?;
256        let url = format!("{}/buckets/{}", self.config.oss_url(), bucket_key);
257
258        let response = raps_kernel::http::send_with_retry(&self.config.http_config, || {
259            self.http_client.delete(&url).bearer_auth(&token)
260        })
261        .await?;
262
263        if !response.status().is_success() {
264            let status = response.status();
265            let error_text = response.text().await.unwrap_or_default();
266            anyhow::bail!("Failed to delete bucket ({status}): {error_text}");
267        }
268
269        Ok(())
270    }
271}