Skip to main content

aliyun_oss/operations/
bucket_replication.rs

1//! Cross-region replication operations.
2
3use std::sync::Arc;
4
5use crate::client::{BucketOperations, OSSClientInner};
6use crate::error::{ErrorContext, OssError, OssErrorKind, Result};
7use crate::http::client::HttpRequest;
8use crate::types::bucket::BucketName;
9
10pub struct PutBucketReplicationBuilder {
11    client: Arc<OSSClientInner>,
12    bucket: BucketName,
13    body_xml: String,
14}
15
16impl PutBucketReplicationBuilder {
17    pub(crate) fn new(client: Arc<OSSClientInner>, bucket: BucketName, body_xml: String) -> Self {
18        Self {
19            client,
20            bucket,
21            body_xml,
22        }
23    }
24
25    pub async fn send(self) -> Result<PutBucketReplicationOutput> {
26        let endpoint = self.client.endpoint.clone();
27        let uri = format!("https://{}.{}?replication", self.bucket.as_str(), endpoint);
28        let query_params: Vec<(String, String)> = vec![("replication".into(), String::new())];
29
30        let request = HttpRequest::builder()
31            .method(http::Method::PUT)
32            .uri(&uri)
33            .body(bytes::Bytes::from(self.body_xml))
34            .build();
35
36        let response = self
37            .client
38            .send_signed(request, Some(&self.bucket), query_params)
39            .await
40            .map_err(|e| OssError {
41                kind: OssErrorKind::TransportError,
42                context: Box::new(ErrorContext {
43                    operation: Some("PutBucketReplication".into()),
44                    bucket: Some(self.bucket.to_string()),
45                    endpoint: Some(endpoint),
46                    ..Default::default()
47                }),
48                source: Some(Box::new(e)),
49            })?;
50
51        if response.status().is_success() {
52            Ok(PutBucketReplicationOutput {
53                request_id: response
54                    .headers
55                    .get("x-oss-request-id")
56                    .and_then(|v| v.to_str().ok())
57                    .unwrap_or("")
58                    .to_string(),
59            })
60        } else {
61            Err(OssError {
62                kind: OssErrorKind::ServiceError(Box::new(crate::error::OssServiceError {
63                    status_code: response.status().as_u16(),
64                    code: String::new(),
65                    message: String::new(),
66                    request_id: String::new(),
67                    host_id: String::new(),
68                    resource: Some(self.bucket.to_string()),
69                    string_to_sign: None,
70                })),
71                context: Box::new(ErrorContext {
72                    operation: Some("PutBucketReplication".into()),
73                    bucket: Some(self.bucket.to_string()),
74                    ..Default::default()
75                }),
76                source: None,
77            })
78        }
79    }
80}
81
82#[derive(Debug, Clone)]
83pub struct PutBucketReplicationOutput {
84    pub request_id: String,
85}
86
87pub struct GetBucketReplicationBuilder {
88    client: Arc<OSSClientInner>,
89    bucket: BucketName,
90}
91
92impl GetBucketReplicationBuilder {
93    pub(crate) fn new(client: Arc<OSSClientInner>, bucket: BucketName) -> Self {
94        Self { client, bucket }
95    }
96
97    pub async fn send(self) -> Result<GetBucketReplicationOutput> {
98        let endpoint = self.client.endpoint.clone();
99        let uri = format!("https://{}.{}?replication", self.bucket.as_str(), endpoint);
100        let query_params: Vec<(String, String)> = vec![("replication".into(), String::new())];
101
102        let request = HttpRequest::builder()
103            .method(http::Method::GET)
104            .uri(&uri)
105            .build();
106
107        let response = self
108            .client
109            .send_signed(request, Some(&self.bucket), query_params)
110            .await
111            .map_err(|e| OssError {
112                kind: OssErrorKind::TransportError,
113                context: Box::new(ErrorContext {
114                    operation: Some("GetBucketReplication".into()),
115                    bucket: Some(self.bucket.to_string()),
116                    endpoint: Some(endpoint),
117                    ..Default::default()
118                }),
119                source: Some(Box::new(e)),
120            })?;
121
122        if response.is_success() {
123            Ok(GetBucketReplicationOutput {
124                body: response.body_as_str().unwrap_or("").to_string(),
125            })
126        } else {
127            Err(OssError {
128                kind: OssErrorKind::ServiceError(Box::new(crate::error::OssServiceError {
129                    status_code: response.status().as_u16(),
130                    code: String::new(),
131                    message: String::new(),
132                    request_id: String::new(),
133                    host_id: String::new(),
134                    resource: Some(self.bucket.to_string()),
135                    string_to_sign: None,
136                })),
137                context: Box::new(ErrorContext {
138                    operation: Some("GetBucketReplication".into()),
139                    bucket: Some(self.bucket.to_string()),
140                    ..Default::default()
141                }),
142                source: None,
143            })
144        }
145    }
146}
147
148#[derive(Debug, Clone)]
149pub struct GetBucketReplicationOutput {
150    pub body: String,
151}
152
153pub struct DeleteBucketReplicationBuilder {
154    client: Arc<OSSClientInner>,
155    bucket: BucketName,
156    replication_rule_id: String,
157}
158
159impl DeleteBucketReplicationBuilder {
160    pub(crate) fn new(
161        client: Arc<OSSClientInner>,
162        bucket: BucketName,
163        replication_rule_id: String,
164    ) -> Self {
165        Self {
166            client,
167            bucket,
168            replication_rule_id,
169        }
170    }
171
172    pub async fn send(self) -> Result<DeleteBucketReplicationOutput> {
173        let endpoint = self.client.endpoint.clone();
174        let uri = format!(
175            "https://{}.{}?replication&replicationRuleId={}",
176            self.bucket.as_str(),
177            endpoint,
178            self.replication_rule_id
179        );
180        let query_params: Vec<(String, String)> = vec![
181            ("replication".into(), String::new()),
182            ("replicationRuleId".into(), self.replication_rule_id),
183        ];
184
185        let request = HttpRequest::builder()
186            .method(http::Method::DELETE)
187            .uri(&uri)
188            .build();
189
190        let response = self
191            .client
192            .send_signed(request, Some(&self.bucket), query_params)
193            .await
194            .map_err(|e| OssError {
195                kind: OssErrorKind::TransportError,
196                context: Box::new(ErrorContext {
197                    operation: Some("DeleteBucketReplication".into()),
198                    bucket: Some(self.bucket.to_string()),
199                    endpoint: Some(endpoint),
200                    ..Default::default()
201                }),
202                source: Some(Box::new(e)),
203            })?;
204
205        if response.status().is_success() {
206            Ok(DeleteBucketReplicationOutput {
207                request_id: response
208                    .headers
209                    .get("x-oss-request-id")
210                    .and_then(|v| v.to_str().ok())
211                    .unwrap_or("")
212                    .to_string(),
213            })
214        } else {
215            Err(OssError {
216                kind: OssErrorKind::ServiceError(Box::new(crate::error::OssServiceError {
217                    status_code: response.status().as_u16(),
218                    code: String::new(),
219                    message: String::new(),
220                    request_id: String::new(),
221                    host_id: String::new(),
222                    resource: Some(self.bucket.to_string()),
223                    string_to_sign: None,
224                })),
225                context: Box::new(ErrorContext {
226                    operation: Some("DeleteBucketReplication".into()),
227                    bucket: Some(self.bucket.to_string()),
228                    ..Default::default()
229                }),
230                source: None,
231            })
232        }
233    }
234}
235
236#[derive(Debug, Clone)]
237pub struct DeleteBucketReplicationOutput {
238    pub request_id: String,
239}
240
241impl BucketOperations {
242    pub fn put_replication(&self, body_xml: String) -> PutBucketReplicationBuilder {
243        PutBucketReplicationBuilder::new(
244            self.client_inner().clone(),
245            self.bucket_name().clone(),
246            body_xml,
247        )
248    }
249
250    pub fn get_replication(&self) -> GetBucketReplicationBuilder {
251        GetBucketReplicationBuilder::new(self.client_inner().clone(), self.bucket_name().clone())
252    }
253
254    pub fn delete_replication(
255        &self,
256        replication_rule_id: String,
257    ) -> DeleteBucketReplicationBuilder {
258        DeleteBucketReplicationBuilder::new(
259            self.client_inner().clone(),
260            self.bucket_name().clone(),
261            replication_rule_id,
262        )
263    }
264}
265
266#[cfg(test)]
267mod tests {
268    use std::sync::Mutex;
269
270    use crate::client::OSSClientInner;
271    use crate::config::credentials::Credentials;
272    use crate::http::client::{HttpClient, HttpRequest, HttpResponse};
273    use crate::types::region::Region;
274
275    use super::*;
276
277    struct RecordingHttpClient {
278        requests: Arc<Mutex<Vec<HttpRequest>>>,
279        status_code: http::StatusCode,
280        response_body: bytes::Bytes,
281    }
282
283    #[async_trait::async_trait]
284    impl HttpClient for RecordingHttpClient {
285        async fn send(&self, request: HttpRequest) -> crate::error::Result<HttpResponse> {
286            self.requests.lock().unwrap().push(request);
287            let mut headers = http::HeaderMap::new();
288            headers.insert(
289                "x-oss-request-id",
290                http::HeaderValue::from_static("rid-repl"),
291            );
292            Ok(HttpResponse {
293                status: self.status_code,
294                headers,
295                body: self.response_body.clone(),
296            })
297        }
298    }
299
300    fn create_test_inner(
301        body: bytes::Bytes,
302    ) -> (Arc<OSSClientInner>, Arc<Mutex<Vec<HttpRequest>>>) {
303        let requests = Arc::new(Mutex::new(Vec::new()));
304        let http = Arc::new(RecordingHttpClient {
305            requests: requests.clone(),
306            status_code: http::StatusCode::OK,
307            response_body: body,
308        });
309        let credentials = Arc::new(crate::config::credentials::StaticCredentialsProvider::new(
310            Credentials::builder()
311                .access_key_id("test-ak")
312                .access_key_secret("test-sk")
313                .build()
314                .unwrap(),
315        ));
316        let inner = Arc::new(OSSClientInner {
317            http,
318            credentials,
319            signer: Arc::from(crate::signer::create_signer(crate::signer::SignVersion::V4)),
320            region: Region::CnHangzhou,
321            endpoint: "oss-cn-hangzhou.aliyuncs.com".into(),
322        });
323        (inner, requests)
324    }
325
326    #[tokio::test]
327    async fn put_replication_sends_request() {
328        let (inner, requests) = create_test_inner(bytes::Bytes::new());
329        let builder = PutBucketReplicationBuilder::new(
330            inner,
331            BucketName::new("test-bucket").unwrap(),
332            "<ReplicationConfiguration/>".into(),
333        );
334        builder.send().await.unwrap();
335        let captured = requests.lock().unwrap();
336        assert_eq!(captured[0].method, http::Method::PUT);
337        assert!(captured[0].uri.contains("?replication"));
338    }
339
340    #[tokio::test]
341    async fn delete_replication_sends_with_rule_id() {
342        let (inner, requests) = create_test_inner(bytes::Bytes::new());
343        let builder = DeleteBucketReplicationBuilder::new(
344            inner,
345            BucketName::new("test-bucket").unwrap(),
346            "rule-1".into(),
347        );
348        builder.send().await.unwrap();
349        let captured = requests.lock().unwrap();
350        assert_eq!(captured[0].method, http::Method::DELETE);
351        assert!(captured[0].uri.contains("replicationRuleId=rule-1"));
352    }
353}