Skip to main content

raps_oss/
batch.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2025 Dmytro Yemelianov
3
4//! Copy and batch operations for the OSS API.
5
6use anyhow::{Context, Result};
7use std::sync::Arc;
8use tokio::sync::Semaphore;
9
10use crate::OssClient;
11use crate::types::*;
12
13impl OssClient {
14    // ============== COPY & BATCH OPERATIONS ==============
15
16    /// Server-side copy of an object using `x-ads-copy-from` header.
17    /// Much more efficient than download+re-upload for same-region copies.
18    pub async fn copy_object(
19        &self,
20        src_bucket: &str,
21        object_key: &str,
22        dest_bucket: &str,
23        dest_key: Option<&str>,
24    ) -> Result<ObjectDetails> {
25        let token = self.auth.get_token().await?;
26        let destination_key = dest_key.unwrap_or(object_key);
27        let url = format!(
28            "{}/buckets/{}/objects/{}",
29            self.config.oss_url(),
30            dest_bucket,
31            urlencoding::encode(destination_key)
32        );
33        let copy_from = format!("{}/objects/{}", src_bucket, urlencoding::encode(object_key));
34
35        let response = raps_kernel::http::send_with_retry(&self.config.http_config, || {
36            self.http_client
37                .put(&url)
38                .bearer_auth(&token)
39                .header("x-ads-copy-from", &copy_from)
40                .header("Content-Length", "0")
41        })
42        .await?;
43
44        if !response.status().is_success() {
45            let status = response.status();
46            let error_text = response.text().await.unwrap_or_default();
47            anyhow::bail!(
48                "Failed to copy '{}/{}' to '{}/{}' ({status}): {error_text}",
49                src_bucket,
50                object_key,
51                dest_bucket,
52                destination_key
53            );
54        }
55
56        response
57            .json()
58            .await
59            .context("Failed to parse copy response")
60    }
61
62    /// Batch copy objects from one bucket to another with concurrent execution.
63    /// Uses a semaphore to limit concurrency to 10 concurrent requests.
64    pub async fn batch_copy_objects(
65        &self,
66        src_bucket: &str,
67        dest_bucket: &str,
68        object_keys: &[String],
69    ) -> Result<BatchResult<ObjectDetails>> {
70        let semaphore = Arc::new(Semaphore::new(10));
71        let mut join_set = tokio::task::JoinSet::new();
72
73        for key in object_keys {
74            let client = self.clone();
75            let src = src_bucket.to_string();
76            let dest = dest_bucket.to_string();
77            let key = key.clone();
78            let sem = semaphore.clone();
79
80            join_set.spawn(async move {
81                let _permit = sem.acquire().await.expect("semaphore closed unexpectedly");
82                let result = client.copy_object(&src, &key, &dest, None).await;
83                (key, result)
84            });
85        }
86
87        let mut results = Vec::new();
88        let mut succeeded = 0;
89        let mut failed = 0;
90
91        while let Some(res) = join_set.join_next().await {
92            match res {
93                Ok((key, Ok(details))) => {
94                    succeeded += 1;
95                    results.push(BatchItemResult {
96                        key,
97                        result: Ok(details),
98                    });
99                }
100                Ok((key, Err(e))) => {
101                    failed += 1;
102                    results.push(BatchItemResult {
103                        key,
104                        result: Err(e.to_string()),
105                    });
106                }
107                Err(e) => {
108                    failed += 1;
109                    results.push(BatchItemResult {
110                        key: "<unknown>".to_string(),
111                        result: Err(format!("Task join error: {}", e)),
112                    });
113                }
114            }
115        }
116
117        Ok(BatchResult {
118            total: object_keys.len(),
119            succeeded,
120            failed,
121            results,
122        })
123    }
124
125    /// Batch rename objects within a bucket (copy to new key, then delete old key).
126    /// Uses a semaphore to limit concurrency to 10 concurrent requests.
127    pub async fn batch_rename_objects(
128        &self,
129        bucket: &str,
130        renames: &[(String, String)],
131    ) -> Result<BatchResult<ObjectDetails>> {
132        let semaphore = Arc::new(Semaphore::new(10));
133        let mut join_set = tokio::task::JoinSet::new();
134
135        for (old_key, new_key) in renames {
136            let client = self.clone();
137            let bucket = bucket.to_string();
138            let old = old_key.clone();
139            let new = new_key.clone();
140            let sem = semaphore.clone();
141
142            join_set.spawn(async move {
143                let _permit = sem.acquire().await.expect("semaphore closed unexpectedly");
144                // Copy to new key
145                let copy_result = client.copy_object(&bucket, &old, &bucket, Some(&new)).await;
146                match copy_result {
147                    Ok(details) => {
148                        // Delete old key on successful copy
149                        if let Err(e) = client.delete_object(&bucket, &old).await {
150                            // Copy succeeded but delete failed — partial success
151                            return (
152                                old,
153                                Err(anyhow::anyhow!(
154                                    "Copied to '{}' but failed to delete original: {}",
155                                    new,
156                                    e
157                                )),
158                            );
159                        }
160                        (old, Ok(details))
161                    }
162                    Err(e) => (old, Err(e)),
163                }
164            });
165        }
166
167        let mut results = Vec::new();
168        let mut succeeded = 0;
169        let mut failed = 0;
170
171        while let Some(res) = join_set.join_next().await {
172            match res {
173                Ok((key, Ok(details))) => {
174                    succeeded += 1;
175                    results.push(BatchItemResult {
176                        key,
177                        result: Ok(details),
178                    });
179                }
180                Ok((key, Err(e))) => {
181                    failed += 1;
182                    results.push(BatchItemResult {
183                        key,
184                        result: Err(e.to_string()),
185                    });
186                }
187                Err(e) => {
188                    failed += 1;
189                    results.push(BatchItemResult {
190                        key: "<unknown>".to_string(),
191                        result: Err(format!("Task join error: {}", e)),
192                    });
193                }
194            }
195        }
196
197        Ok(BatchResult {
198            total: renames.len(),
199            succeeded,
200            failed,
201            results,
202        })
203    }
204}