1use anyhow::{Context, Result};
7use std::sync::Arc;
8use tokio::sync::Semaphore;
9
10use crate::OssClient;
11use crate::types::*;
12
13impl OssClient {
14 pub async fn copy_object(
19 &self,
20 src_bucket: &str,
21 object_key: &str,
22 dest_bucket: &str,
23 dest_key: Option<&str>,
24 ) -> Result<ObjectDetails> {
25 let token = self.auth.get_token().await?;
26 let destination_key = dest_key.unwrap_or(object_key);
27 let url = format!(
28 "{}/buckets/{}/objects/{}",
29 self.config.oss_url(),
30 dest_bucket,
31 urlencoding::encode(destination_key)
32 );
33 let copy_from = format!("{}/objects/{}", src_bucket, urlencoding::encode(object_key));
34
35 let response = raps_kernel::http::send_with_retry(&self.config.http_config, || {
36 self.http_client
37 .put(&url)
38 .bearer_auth(&token)
39 .header("x-ads-copy-from", ©_from)
40 .header("Content-Length", "0")
41 })
42 .await?;
43
44 if !response.status().is_success() {
45 let status = response.status();
46 let error_text = response.text().await.unwrap_or_default();
47 anyhow::bail!(
48 "Failed to copy '{}/{}' to '{}/{}' ({status}): {error_text}",
49 src_bucket,
50 object_key,
51 dest_bucket,
52 destination_key
53 );
54 }
55
56 response
57 .json()
58 .await
59 .context("Failed to parse copy response")
60 }
61
62 pub async fn batch_copy_objects(
65 &self,
66 src_bucket: &str,
67 dest_bucket: &str,
68 object_keys: &[String],
69 ) -> Result<BatchResult<ObjectDetails>> {
70 let semaphore = Arc::new(Semaphore::new(10));
71 let mut join_set = tokio::task::JoinSet::new();
72
73 for key in object_keys {
74 let client = self.clone();
75 let src = src_bucket.to_string();
76 let dest = dest_bucket.to_string();
77 let key = key.clone();
78 let sem = semaphore.clone();
79
80 join_set.spawn(async move {
81 let _permit = sem.acquire().await.expect("semaphore closed unexpectedly");
82 let result = client.copy_object(&src, &key, &dest, None).await;
83 (key, result)
84 });
85 }
86
87 let mut results = Vec::new();
88 let mut succeeded = 0;
89 let mut failed = 0;
90
91 while let Some(res) = join_set.join_next().await {
92 match res {
93 Ok((key, Ok(details))) => {
94 succeeded += 1;
95 results.push(BatchItemResult {
96 key,
97 result: Ok(details),
98 });
99 }
100 Ok((key, Err(e))) => {
101 failed += 1;
102 results.push(BatchItemResult {
103 key,
104 result: Err(e.to_string()),
105 });
106 }
107 Err(e) => {
108 failed += 1;
109 results.push(BatchItemResult {
110 key: "<unknown>".to_string(),
111 result: Err(format!("Task join error: {}", e)),
112 });
113 }
114 }
115 }
116
117 Ok(BatchResult {
118 total: object_keys.len(),
119 succeeded,
120 failed,
121 results,
122 })
123 }
124
125 pub async fn batch_rename_objects(
128 &self,
129 bucket: &str,
130 renames: &[(String, String)],
131 ) -> Result<BatchResult<ObjectDetails>> {
132 let semaphore = Arc::new(Semaphore::new(10));
133 let mut join_set = tokio::task::JoinSet::new();
134
135 for (old_key, new_key) in renames {
136 let client = self.clone();
137 let bucket = bucket.to_string();
138 let old = old_key.clone();
139 let new = new_key.clone();
140 let sem = semaphore.clone();
141
142 join_set.spawn(async move {
143 let _permit = sem.acquire().await.expect("semaphore closed unexpectedly");
144 let copy_result = client.copy_object(&bucket, &old, &bucket, Some(&new)).await;
146 match copy_result {
147 Ok(details) => {
148 if let Err(e) = client.delete_object(&bucket, &old).await {
150 return (
152 old,
153 Err(anyhow::anyhow!(
154 "Copied to '{}' but failed to delete original: {}",
155 new,
156 e
157 )),
158 );
159 }
160 (old, Ok(details))
161 }
162 Err(e) => (old, Err(e)),
163 }
164 });
165 }
166
167 let mut results = Vec::new();
168 let mut succeeded = 0;
169 let mut failed = 0;
170
171 while let Some(res) = join_set.join_next().await {
172 match res {
173 Ok((key, Ok(details))) => {
174 succeeded += 1;
175 results.push(BatchItemResult {
176 key,
177 result: Ok(details),
178 });
179 }
180 Ok((key, Err(e))) => {
181 failed += 1;
182 results.push(BatchItemResult {
183 key,
184 result: Err(e.to_string()),
185 });
186 }
187 Err(e) => {
188 failed += 1;
189 results.push(BatchItemResult {
190 key: "<unknown>".to_string(),
191 result: Err(format!("Task join error: {}", e)),
192 });
193 }
194 }
195 }
196
197 Ok(BatchResult {
198 total: renames.len(),
199 succeeded,
200 failed,
201 results,
202 })
203 }
204}