Skip to main content

aliyun_oss/operations/
object_append.rs

1//! Append object operations.
2
3use std::sync::Arc;
4
5use crate::client::{BucketOperations, OSSClientInner};
6use crate::error::{ErrorContext, OssError, OssErrorKind, Result};
7use crate::http::client::HttpRequest;
8use crate::types::bucket::BucketName;
9use crate::types::object::ObjectKey;
10use crate::util::uri::oss_endpoint_url;
11
12pub struct AppendObjectBuilder {
13    client: Arc<OSSClientInner>,
14    bucket: BucketName,
15    key: ObjectKey,
16    position: u64,
17    body: bytes::Bytes,
18    content_type: Option<String>,
19    content_md5: Option<String>,
20    content_encoding: Option<String>,
21    metadata: Vec<(String, String)>,
22}
23
24impl AppendObjectBuilder {
25    pub(crate) fn new(
26        client: Arc<OSSClientInner>,
27        bucket: BucketName,
28        key: ObjectKey,
29        position: u64,
30        body: impl Into<bytes::Bytes>,
31    ) -> Self {
32        Self {
33            client,
34            bucket,
35            key,
36            position,
37            body: body.into(),
38            content_type: None,
39            content_md5: None,
40            content_encoding: None,
41            metadata: Vec::new(),
42        }
43    }
44
45    pub fn content_type(mut self, ct: impl Into<String>) -> Self {
46        self.content_type = Some(ct.into());
47        self
48    }
49
50    pub fn content_md5(mut self, md5: impl Into<String>) -> Self {
51        self.content_md5 = Some(md5.into());
52        self
53    }
54
55    pub fn content_encoding(mut self, ce: impl Into<String>) -> Self {
56        self.content_encoding = Some(ce.into());
57        self
58    }
59
60    pub fn metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
61        self.metadata.push((key.into(), value.into()));
62        self
63    }
64
65    pub async fn send(self) -> Result<AppendObjectOutput> {
66        let endpoint = self.client.endpoint.clone();
67        let uri = oss_endpoint_url(
68            &endpoint,
69            Some(self.bucket.as_str()),
70            Some(self.key.as_str()),
71        );
72
73        let query_string = format!("?append&position={}", self.position);
74        let full_uri = format!("{}{}", uri, query_string);
75
76        let mut req = HttpRequest::builder()
77            .method(http::Method::POST)
78            .uri(&full_uri);
79
80        if let Some(ref ct) = self.content_type {
81            req = req.header(
82                http::HeaderName::from_static("content-type"),
83                http::HeaderValue::from_str(ct).map_err(|e| OssError {
84                    kind: OssErrorKind::ValidationError,
85                    context: Box::new(ErrorContext {
86                        operation: Some("set content-type header".into()),
87                        bucket: Some(self.bucket.to_string()),
88                        object_key: Some(self.key.to_string()),
89                        ..Default::default()
90                    }),
91                    source: Some(Box::new(e)),
92                })?,
93            );
94        }
95
96        if let Some(ref md5) = self.content_md5 {
97            req = req.header(
98                http::HeaderName::from_static("content-md5"),
99                http::HeaderValue::from_str(md5).map_err(|e| OssError {
100                    kind: OssErrorKind::ValidationError,
101                    context: Box::new(ErrorContext {
102                        operation: Some("set content-md5 header".into()),
103                        bucket: Some(self.bucket.to_string()),
104                        object_key: Some(self.key.to_string()),
105                        ..Default::default()
106                    }),
107                    source: Some(Box::new(e)),
108                })?,
109            );
110        }
111
112        if let Some(ref ce) = self.content_encoding {
113            req = req.header(
114                http::HeaderName::from_static("content-encoding"),
115                http::HeaderValue::from_str(ce).map_err(|e| OssError {
116                    kind: OssErrorKind::ValidationError,
117                    context: Box::new(ErrorContext {
118                        operation: Some("set content-encoding header".into()),
119                        bucket: Some(self.bucket.to_string()),
120                        object_key: Some(self.key.to_string()),
121                        ..Default::default()
122                    }),
123                    source: Some(Box::new(e)),
124                })?,
125            );
126        }
127
128        for (k, v) in &self.metadata {
129            let header_name = http::HeaderName::from_bytes(k.as_bytes()).map_err(|e| OssError {
130                kind: OssErrorKind::ValidationError,
131                context: Box::new(ErrorContext {
132                    operation: Some(format!("set metadata header '{}'", k)),
133                    bucket: Some(self.bucket.to_string()),
134                    object_key: Some(self.key.to_string()),
135                    ..Default::default()
136                }),
137                source: Some(Box::new(e)),
138            })?;
139            req = req.header(
140                header_name,
141                http::HeaderValue::from_str(v).map_err(|e| OssError {
142                    kind: OssErrorKind::ValidationError,
143                    context: Box::new(ErrorContext {
144                        operation: Some(format!("set metadata header value '{}'", k)),
145                        bucket: Some(self.bucket.to_string()),
146                        object_key: Some(self.key.to_string()),
147                        ..Default::default()
148                    }),
149                    source: Some(Box::new(e)),
150                })?,
151            );
152        }
153
154        let query_params = vec![
155            ("append".into(), "".into()),
156            ("position".into(), self.position.to_string()),
157        ];
158
159        let body_len = self.body.len();
160        let request = req.body(self.body).build();
161
162        let response = self
163            .client
164            .send_signed(request, Some(&self.bucket), query_params)
165            .await
166            .map_err(|e| OssError {
167                kind: OssErrorKind::TransportError,
168                context: Box::new(ErrorContext {
169                    operation: Some("AppendObject".into()),
170                    bucket: Some(self.bucket.to_string()),
171                    object_key: Some(self.key.to_string()),
172                    endpoint: Some(endpoint),
173                    ..Default::default()
174                }),
175                source: Some(Box::new(e)),
176            })?;
177
178        if response.is_success() {
179            let request_id = response
180                .headers
181                .get("x-oss-request-id")
182                .and_then(|v| v.to_str().ok())
183                .unwrap_or("")
184                .to_string();
185
186            let next_position = response
187                .headers
188                .get("x-oss-next-append-position")
189                .and_then(|v| v.to_str().ok())
190                .and_then(|s| s.parse::<u64>().ok())
191                .unwrap_or(self.position + body_len as u64);
192
193            let hash_crc64 = response
194                .headers
195                .get("x-oss-hash-crc64ecma")
196                .and_then(|v| v.to_str().ok())
197                .map(|s| s.to_string());
198
199            Ok(AppendObjectOutput {
200                request_id,
201                next_position,
202                hash_crc64,
203            })
204        } else {
205            Err(OssError {
206                kind: OssErrorKind::ServiceError(Box::new(crate::error::OssServiceError {
207                    status_code: response.status().as_u16(),
208                    code: String::new(),
209                    message: String::new(),
210                    request_id: String::new(),
211                    host_id: String::new(),
212                    resource: Some(self.key.to_string()),
213                    string_to_sign: None,
214                })),
215                context: Box::new(ErrorContext {
216                    operation: Some("AppendObject".into()),
217                    bucket: Some(self.bucket.to_string()),
218                    object_key: Some(self.key.to_string()),
219                    ..Default::default()
220                }),
221                source: None,
222            })
223        }
224    }
225}
226
227#[derive(Debug, Clone)]
228pub struct AppendObjectOutput {
229    pub request_id: String,
230    pub next_position: u64,
231    pub hash_crc64: Option<String>,
232}
233
234impl BucketOperations {
235    pub fn append_object(
236        &self,
237        key: impl Into<String>,
238        position: u64,
239        body: impl Into<bytes::Bytes>,
240    ) -> Result<AppendObjectBuilder> {
241        let object_key = ObjectKey::new(key.into())?;
242        Ok(AppendObjectBuilder::new(
243            self.client_inner().clone(),
244            self.bucket_name().clone(),
245            object_key,
246            position,
247            body,
248        ))
249    }
250}
251
252#[cfg(test)]
253mod tests {
254    use std::str::FromStr;
255    use std::sync::Mutex;
256
257    use http::HeaderMap;
258
259    use crate::client::OSSClientInner;
260    use crate::config::credentials::Credentials;
261    use crate::http::client::{HttpClient, HttpRequest, HttpResponse};
262    use crate::types::region::Region;
263
264    use super::*;
265
266    struct RecordingHttpClient {
267        requests: Arc<Mutex<Vec<HttpRequest>>>,
268    }
269
270    #[async_trait::async_trait]
271    impl HttpClient for RecordingHttpClient {
272        async fn send(&self, request: HttpRequest) -> crate::error::Result<HttpResponse> {
273            self.requests.lock().unwrap().push(request);
274            let mut headers = HeaderMap::new();
275            headers.insert(
276                "x-oss-request-id",
277                http::HeaderValue::from_static("rid-append"),
278            );
279            headers.insert(
280                "x-oss-next-append-position",
281                http::HeaderValue::from_static("11"),
282            );
283            Ok(HttpResponse {
284                status: http::StatusCode::OK,
285                headers,
286                body: bytes::Bytes::new(),
287            })
288        }
289    }
290
291    fn create_test_inner() -> (Arc<OSSClientInner>, Arc<Mutex<Vec<HttpRequest>>>) {
292        let requests = Arc::new(Mutex::new(Vec::new()));
293        let http = Arc::new(RecordingHttpClient {
294            requests: requests.clone(),
295        });
296        let credentials = Arc::new(crate::config::credentials::StaticCredentialsProvider::new(
297            Credentials::builder()
298                .access_key_id("test-ak")
299                .access_key_secret("test-sk")
300                .build()
301                .unwrap(),
302        ));
303        let inner = Arc::new(OSSClientInner {
304            http,
305            credentials,
306            signer: Arc::from(crate::signer::create_signer(crate::signer::SignVersion::V4)),
307            region: Region::CnHangzhou,
308            endpoint: "oss-cn-hangzhou.aliyuncs.com".into(),
309        });
310        (inner, requests)
311    }
312
313    #[tokio::test]
314    async fn append_object_first_position_zero() {
315        let (inner, requests) = create_test_inner();
316        let bucket = BucketName::new("test-bucket").unwrap();
317        let builder = AppendObjectBuilder::new(
318            inner,
319            bucket,
320            ObjectKey::new("append.txt").unwrap(),
321            0,
322            bytes::Bytes::from_static(b"hello world"),
323        );
324
325        let output = builder.send().await.unwrap();
326        assert_eq!(output.next_position, 11);
327
328        let captured = requests.lock().unwrap();
329        assert!(captured[0].uri.contains("?append&position=0"));
330        assert_eq!(captured[0].method, http::Method::POST);
331    }
332
333    #[tokio::test]
334    async fn append_object_returns_next_position() {
335        let (inner, _) = create_test_inner();
336        let bucket = BucketName::new("test-bucket").unwrap();
337        let builder = AppendObjectBuilder::new(
338            inner,
339            bucket,
340            ObjectKey::new("append.txt").unwrap(),
341            11,
342            bytes::Bytes::from_static(b" more data"),
343        );
344
345        let output = builder.send().await.unwrap();
346        assert_eq!(output.next_position, 11);
347        assert!(!output.request_id.is_empty());
348    }
349
350    #[tokio::test]
351    async fn append_object_with_content_type() {
352        let (inner, requests) = create_test_inner();
353        let bucket = BucketName::new("test-bucket").unwrap();
354        let builder = AppendObjectBuilder::new(
355            inner,
356            bucket,
357            ObjectKey::new("append.txt").unwrap(),
358            0,
359            bytes::Bytes::from_static(b"data"),
360        );
361
362        builder.content_type("text/plain").send().await.unwrap();
363
364        let captured = requests.lock().unwrap();
365        assert_eq!(
366            captured[0]
367                .headers
368                .get("content-type")
369                .unwrap()
370                .to_str()
371                .unwrap(),
372            "text/plain"
373        );
374    }
375
376    #[tokio::test]
377    #[ignore = "requires valid OSS credentials"]
378    async fn e2e_append_object() {
379        let ak = std::env::var("OSS_ACCESS_KEY_ID").expect("OSS_ACCESS_KEY_ID not set");
380        let sk = std::env::var("OSS_ACCESS_KEY_SECRET").expect("OSS_ACCESS_KEY_SECRET not set");
381        let region_str = std::env::var("OSS_REGION").unwrap_or_else(|_| "cn-wulanchabu".into());
382        let bucket_str = std::env::var("OSS_BUCKET").expect("OSS_BUCKET not set");
383
384        let region = Region::from_str(&region_str).unwrap_or_else(|_| Region::Custom {
385            endpoint: format!("oss-{}.aliyuncs.com", region_str),
386            region_id: region_str.clone(),
387        });
388
389        let client = crate::client::OSSClient::builder()
390            .region(region)
391            .credentials(ak, sk)
392            .build()
393            .unwrap();
394
395        let key = format!("test-append-{}.txt", chrono::Utc::now().timestamp());
396
397        let output1 = client
398            .bucket(&bucket_str)
399            .unwrap()
400            .append_object(&key, 0, bytes::Bytes::from_static(b"hello "))
401            .unwrap()
402            .send()
403            .await
404            .unwrap();
405
406        assert_eq!(output1.next_position, 6);
407
408        let output2 = client
409            .bucket(&bucket_str)
410            .unwrap()
411            .append_object(
412                &key,
413                output1.next_position,
414                bytes::Bytes::from_static(b"world"),
415            )
416            .unwrap()
417            .send()
418            .await
419            .unwrap();
420
421        assert_eq!(output2.next_position, 11);
422
423        let get_output = client
424            .bucket(&bucket_str)
425            .unwrap()
426            .get_object(&key)
427            .unwrap()
428            .send()
429            .await
430            .unwrap();
431        assert_eq!(
432            get_output.body.as_ref(),
433            bytes::Bytes::from_static(b"hello world")
434        );
435
436        client
437            .bucket(&bucket_str)
438            .unwrap()
439            .delete_object(&key)
440            .unwrap()
441            .send()
442            .await
443            .unwrap();
444
445        eprintln!("APPEND E2E '{}' succeeded: 2 appends, total 11 bytes", key);
446    }
447}