1use std::sync::Arc;
4
5use crate::client::{BucketOperations, OSSClientInner};
6use crate::error::{ErrorContext, OssError, OssErrorKind, Result};
7use crate::http::client::HttpRequest;
8use crate::types::bucket::BucketName;
9
10pub struct PutBucketReplicationBuilder {
11 client: Arc<OSSClientInner>,
12 bucket: BucketName,
13 body_xml: String,
14}
15
16impl PutBucketReplicationBuilder {
17 pub(crate) fn new(client: Arc<OSSClientInner>, bucket: BucketName, body_xml: String) -> Self {
18 Self {
19 client,
20 bucket,
21 body_xml,
22 }
23 }
24
25 pub async fn send(self) -> Result<PutBucketReplicationOutput> {
26 let endpoint = self.client.endpoint.clone();
27 let uri = format!("https://{}.{}?replication", self.bucket.as_str(), endpoint);
28 let query_params: Vec<(String, String)> = vec![("replication".into(), String::new())];
29
30 let request = HttpRequest::builder()
31 .method(http::Method::PUT)
32 .uri(&uri)
33 .body(bytes::Bytes::from(self.body_xml))
34 .build();
35
36 let response = self
37 .client
38 .send_signed(request, Some(&self.bucket), query_params)
39 .await
40 .map_err(|e| OssError {
41 kind: OssErrorKind::TransportError,
42 context: Box::new(ErrorContext {
43 operation: Some("PutBucketReplication".into()),
44 bucket: Some(self.bucket.to_string()),
45 endpoint: Some(endpoint),
46 ..Default::default()
47 }),
48 source: Some(Box::new(e)),
49 })?;
50
51 if response.status().is_success() {
52 Ok(PutBucketReplicationOutput {
53 request_id: response
54 .headers
55 .get("x-oss-request-id")
56 .and_then(|v| v.to_str().ok())
57 .unwrap_or("")
58 .to_string(),
59 })
60 } else {
61 Err(OssError {
62 kind: OssErrorKind::ServiceError(Box::new(crate::error::OssServiceError {
63 status_code: response.status().as_u16(),
64 code: String::new(),
65 message: String::new(),
66 request_id: String::new(),
67 host_id: String::new(),
68 resource: Some(self.bucket.to_string()),
69 string_to_sign: None,
70 })),
71 context: Box::new(ErrorContext {
72 operation: Some("PutBucketReplication".into()),
73 bucket: Some(self.bucket.to_string()),
74 ..Default::default()
75 }),
76 source: None,
77 })
78 }
79 }
80}
81
82#[derive(Debug, Clone)]
83pub struct PutBucketReplicationOutput {
84 pub request_id: String,
85}
86
87pub struct GetBucketReplicationBuilder {
88 client: Arc<OSSClientInner>,
89 bucket: BucketName,
90}
91
92impl GetBucketReplicationBuilder {
93 pub(crate) fn new(client: Arc<OSSClientInner>, bucket: BucketName) -> Self {
94 Self { client, bucket }
95 }
96
97 pub async fn send(self) -> Result<GetBucketReplicationOutput> {
98 let endpoint = self.client.endpoint.clone();
99 let uri = format!("https://{}.{}?replication", self.bucket.as_str(), endpoint);
100 let query_params: Vec<(String, String)> = vec![("replication".into(), String::new())];
101
102 let request = HttpRequest::builder()
103 .method(http::Method::GET)
104 .uri(&uri)
105 .build();
106
107 let response = self
108 .client
109 .send_signed(request, Some(&self.bucket), query_params)
110 .await
111 .map_err(|e| OssError {
112 kind: OssErrorKind::TransportError,
113 context: Box::new(ErrorContext {
114 operation: Some("GetBucketReplication".into()),
115 bucket: Some(self.bucket.to_string()),
116 endpoint: Some(endpoint),
117 ..Default::default()
118 }),
119 source: Some(Box::new(e)),
120 })?;
121
122 if response.is_success() {
123 Ok(GetBucketReplicationOutput {
124 body: response.body_as_str().unwrap_or("").to_string(),
125 })
126 } else {
127 Err(OssError {
128 kind: OssErrorKind::ServiceError(Box::new(crate::error::OssServiceError {
129 status_code: response.status().as_u16(),
130 code: String::new(),
131 message: String::new(),
132 request_id: String::new(),
133 host_id: String::new(),
134 resource: Some(self.bucket.to_string()),
135 string_to_sign: None,
136 })),
137 context: Box::new(ErrorContext {
138 operation: Some("GetBucketReplication".into()),
139 bucket: Some(self.bucket.to_string()),
140 ..Default::default()
141 }),
142 source: None,
143 })
144 }
145 }
146}
147
148#[derive(Debug, Clone)]
149pub struct GetBucketReplicationOutput {
150 pub body: String,
151}
152
153pub struct DeleteBucketReplicationBuilder {
154 client: Arc<OSSClientInner>,
155 bucket: BucketName,
156 replication_rule_id: String,
157}
158
159impl DeleteBucketReplicationBuilder {
160 pub(crate) fn new(
161 client: Arc<OSSClientInner>,
162 bucket: BucketName,
163 replication_rule_id: String,
164 ) -> Self {
165 Self {
166 client,
167 bucket,
168 replication_rule_id,
169 }
170 }
171
172 pub async fn send(self) -> Result<DeleteBucketReplicationOutput> {
173 let endpoint = self.client.endpoint.clone();
174 let uri = format!(
175 "https://{}.{}?replication&replicationRuleId={}",
176 self.bucket.as_str(),
177 endpoint,
178 self.replication_rule_id
179 );
180 let query_params: Vec<(String, String)> = vec![
181 ("replication".into(), String::new()),
182 ("replicationRuleId".into(), self.replication_rule_id),
183 ];
184
185 let request = HttpRequest::builder()
186 .method(http::Method::DELETE)
187 .uri(&uri)
188 .build();
189
190 let response = self
191 .client
192 .send_signed(request, Some(&self.bucket), query_params)
193 .await
194 .map_err(|e| OssError {
195 kind: OssErrorKind::TransportError,
196 context: Box::new(ErrorContext {
197 operation: Some("DeleteBucketReplication".into()),
198 bucket: Some(self.bucket.to_string()),
199 endpoint: Some(endpoint),
200 ..Default::default()
201 }),
202 source: Some(Box::new(e)),
203 })?;
204
205 if response.status().is_success() {
206 Ok(DeleteBucketReplicationOutput {
207 request_id: response
208 .headers
209 .get("x-oss-request-id")
210 .and_then(|v| v.to_str().ok())
211 .unwrap_or("")
212 .to_string(),
213 })
214 } else {
215 Err(OssError {
216 kind: OssErrorKind::ServiceError(Box::new(crate::error::OssServiceError {
217 status_code: response.status().as_u16(),
218 code: String::new(),
219 message: String::new(),
220 request_id: String::new(),
221 host_id: String::new(),
222 resource: Some(self.bucket.to_string()),
223 string_to_sign: None,
224 })),
225 context: Box::new(ErrorContext {
226 operation: Some("DeleteBucketReplication".into()),
227 bucket: Some(self.bucket.to_string()),
228 ..Default::default()
229 }),
230 source: None,
231 })
232 }
233 }
234}
235
236#[derive(Debug, Clone)]
237pub struct DeleteBucketReplicationOutput {
238 pub request_id: String,
239}
240
241impl BucketOperations {
242 pub fn put_replication(&self, body_xml: String) -> PutBucketReplicationBuilder {
243 PutBucketReplicationBuilder::new(
244 self.client_inner().clone(),
245 self.bucket_name().clone(),
246 body_xml,
247 )
248 }
249
250 pub fn get_replication(&self) -> GetBucketReplicationBuilder {
251 GetBucketReplicationBuilder::new(self.client_inner().clone(), self.bucket_name().clone())
252 }
253
254 pub fn delete_replication(
255 &self,
256 replication_rule_id: String,
257 ) -> DeleteBucketReplicationBuilder {
258 DeleteBucketReplicationBuilder::new(
259 self.client_inner().clone(),
260 self.bucket_name().clone(),
261 replication_rule_id,
262 )
263 }
264}
265
266#[cfg(test)]
267mod tests {
268 use std::sync::Mutex;
269
270 use crate::client::OSSClientInner;
271 use crate::config::credentials::Credentials;
272 use crate::http::client::{HttpClient, HttpRequest, HttpResponse};
273 use crate::types::region::Region;
274
275 use super::*;
276
277 struct RecordingHttpClient {
278 requests: Arc<Mutex<Vec<HttpRequest>>>,
279 status_code: http::StatusCode,
280 response_body: bytes::Bytes,
281 }
282
283 #[async_trait::async_trait]
284 impl HttpClient for RecordingHttpClient {
285 async fn send(&self, request: HttpRequest) -> crate::error::Result<HttpResponse> {
286 self.requests.lock().unwrap().push(request);
287 let mut headers = http::HeaderMap::new();
288 headers.insert(
289 "x-oss-request-id",
290 http::HeaderValue::from_static("rid-repl"),
291 );
292 Ok(HttpResponse {
293 status: self.status_code,
294 headers,
295 body: self.response_body.clone(),
296 })
297 }
298 }
299
300 fn create_test_inner(
301 body: bytes::Bytes,
302 ) -> (Arc<OSSClientInner>, Arc<Mutex<Vec<HttpRequest>>>) {
303 let requests = Arc::new(Mutex::new(Vec::new()));
304 let http = Arc::new(RecordingHttpClient {
305 requests: requests.clone(),
306 status_code: http::StatusCode::OK,
307 response_body: body,
308 });
309 let credentials = Arc::new(crate::config::credentials::StaticCredentialsProvider::new(
310 Credentials::builder()
311 .access_key_id("test-ak")
312 .access_key_secret("test-sk")
313 .build()
314 .unwrap(),
315 ));
316 let inner = Arc::new(OSSClientInner {
317 http,
318 credentials,
319 signer: Arc::from(crate::signer::create_signer(crate::signer::SignVersion::V4)),
320 region: Region::CnHangzhou,
321 endpoint: "oss-cn-hangzhou.aliyuncs.com".into(),
322 });
323 (inner, requests)
324 }
325
326 #[tokio::test]
327 async fn put_replication_sends_request() {
328 let (inner, requests) = create_test_inner(bytes::Bytes::new());
329 let builder = PutBucketReplicationBuilder::new(
330 inner,
331 BucketName::new("test-bucket").unwrap(),
332 "<ReplicationConfiguration/>".into(),
333 );
334 builder.send().await.unwrap();
335 let captured = requests.lock().unwrap();
336 assert_eq!(captured[0].method, http::Method::PUT);
337 assert!(captured[0].uri.contains("?replication"));
338 }
339
340 #[tokio::test]
341 async fn delete_replication_sends_with_rule_id() {
342 let (inner, requests) = create_test_inner(bytes::Bytes::new());
343 let builder = DeleteBucketReplicationBuilder::new(
344 inner,
345 BucketName::new("test-bucket").unwrap(),
346 "rule-1".into(),
347 );
348 builder.send().await.unwrap();
349 let captured = requests.lock().unwrap();
350 assert_eq!(captured[0].method, http::Method::DELETE);
351 assert!(captured[0].uri.contains("replicationRuleId=rule-1"));
352 }
353}