1use std::sync::Arc;
4
5use crate::client::{BucketOperations, OSSClientInner};
6use crate::error::{ErrorContext, OssError, OssErrorKind, Result};
7use crate::http::client::HttpRequest;
8use crate::types::bucket::BucketName;
9use crate::types::object::ObjectKey;
10use crate::util::uri::oss_endpoint_url;
11
12pub struct AppendObjectBuilder {
13 client: Arc<OSSClientInner>,
14 bucket: BucketName,
15 key: ObjectKey,
16 position: u64,
17 body: bytes::Bytes,
18 content_type: Option<String>,
19 content_md5: Option<String>,
20 content_encoding: Option<String>,
21 metadata: Vec<(String, String)>,
22}
23
24impl AppendObjectBuilder {
25 pub(crate) fn new(
26 client: Arc<OSSClientInner>,
27 bucket: BucketName,
28 key: ObjectKey,
29 position: u64,
30 body: impl Into<bytes::Bytes>,
31 ) -> Self {
32 Self {
33 client,
34 bucket,
35 key,
36 position,
37 body: body.into(),
38 content_type: None,
39 content_md5: None,
40 content_encoding: None,
41 metadata: Vec::new(),
42 }
43 }
44
45 pub fn content_type(mut self, ct: impl Into<String>) -> Self {
46 self.content_type = Some(ct.into());
47 self
48 }
49
50 pub fn content_md5(mut self, md5: impl Into<String>) -> Self {
51 self.content_md5 = Some(md5.into());
52 self
53 }
54
55 pub fn content_encoding(mut self, ce: impl Into<String>) -> Self {
56 self.content_encoding = Some(ce.into());
57 self
58 }
59
60 pub fn metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
61 self.metadata.push((key.into(), value.into()));
62 self
63 }
64
65 pub async fn send(self) -> Result<AppendObjectOutput> {
66 let endpoint = self.client.endpoint.clone();
67 let uri = oss_endpoint_url(
68 &endpoint,
69 Some(self.bucket.as_str()),
70 Some(self.key.as_str()),
71 );
72
73 let query_string = format!("?append&position={}", self.position);
74 let full_uri = format!("{}{}", uri, query_string);
75
76 let mut req = HttpRequest::builder()
77 .method(http::Method::POST)
78 .uri(&full_uri);
79
80 if let Some(ref ct) = self.content_type {
81 req = req.header(
82 http::HeaderName::from_static("content-type"),
83 http::HeaderValue::from_str(ct).map_err(|e| OssError {
84 kind: OssErrorKind::ValidationError,
85 context: Box::new(ErrorContext {
86 operation: Some("set content-type header".into()),
87 bucket: Some(self.bucket.to_string()),
88 object_key: Some(self.key.to_string()),
89 ..Default::default()
90 }),
91 source: Some(Box::new(e)),
92 })?,
93 );
94 }
95
96 if let Some(ref md5) = self.content_md5 {
97 req = req.header(
98 http::HeaderName::from_static("content-md5"),
99 http::HeaderValue::from_str(md5).map_err(|e| OssError {
100 kind: OssErrorKind::ValidationError,
101 context: Box::new(ErrorContext {
102 operation: Some("set content-md5 header".into()),
103 bucket: Some(self.bucket.to_string()),
104 object_key: Some(self.key.to_string()),
105 ..Default::default()
106 }),
107 source: Some(Box::new(e)),
108 })?,
109 );
110 }
111
112 if let Some(ref ce) = self.content_encoding {
113 req = req.header(
114 http::HeaderName::from_static("content-encoding"),
115 http::HeaderValue::from_str(ce).map_err(|e| OssError {
116 kind: OssErrorKind::ValidationError,
117 context: Box::new(ErrorContext {
118 operation: Some("set content-encoding header".into()),
119 bucket: Some(self.bucket.to_string()),
120 object_key: Some(self.key.to_string()),
121 ..Default::default()
122 }),
123 source: Some(Box::new(e)),
124 })?,
125 );
126 }
127
128 for (k, v) in &self.metadata {
129 let header_name = http::HeaderName::from_bytes(k.as_bytes()).map_err(|e| OssError {
130 kind: OssErrorKind::ValidationError,
131 context: Box::new(ErrorContext {
132 operation: Some(format!("set metadata header '{}'", k)),
133 bucket: Some(self.bucket.to_string()),
134 object_key: Some(self.key.to_string()),
135 ..Default::default()
136 }),
137 source: Some(Box::new(e)),
138 })?;
139 req = req.header(
140 header_name,
141 http::HeaderValue::from_str(v).map_err(|e| OssError {
142 kind: OssErrorKind::ValidationError,
143 context: Box::new(ErrorContext {
144 operation: Some(format!("set metadata header value '{}'", k)),
145 bucket: Some(self.bucket.to_string()),
146 object_key: Some(self.key.to_string()),
147 ..Default::default()
148 }),
149 source: Some(Box::new(e)),
150 })?,
151 );
152 }
153
154 let query_params = vec![
155 ("append".into(), "".into()),
156 ("position".into(), self.position.to_string()),
157 ];
158
159 let body_len = self.body.len();
160 let request = req.body(self.body).build();
161
162 let response = self
163 .client
164 .send_signed(request, Some(&self.bucket), query_params)
165 .await
166 .map_err(|e| OssError {
167 kind: OssErrorKind::TransportError,
168 context: Box::new(ErrorContext {
169 operation: Some("AppendObject".into()),
170 bucket: Some(self.bucket.to_string()),
171 object_key: Some(self.key.to_string()),
172 endpoint: Some(endpoint),
173 ..Default::default()
174 }),
175 source: Some(Box::new(e)),
176 })?;
177
178 if response.is_success() {
179 let request_id = response
180 .headers
181 .get("x-oss-request-id")
182 .and_then(|v| v.to_str().ok())
183 .unwrap_or("")
184 .to_string();
185
186 let next_position = response
187 .headers
188 .get("x-oss-next-append-position")
189 .and_then(|v| v.to_str().ok())
190 .and_then(|s| s.parse::<u64>().ok())
191 .unwrap_or(self.position + body_len as u64);
192
193 let hash_crc64 = response
194 .headers
195 .get("x-oss-hash-crc64ecma")
196 .and_then(|v| v.to_str().ok())
197 .map(|s| s.to_string());
198
199 Ok(AppendObjectOutput {
200 request_id,
201 next_position,
202 hash_crc64,
203 })
204 } else {
205 Err(OssError {
206 kind: OssErrorKind::ServiceError(Box::new(crate::error::OssServiceError {
207 status_code: response.status().as_u16(),
208 code: String::new(),
209 message: String::new(),
210 request_id: String::new(),
211 host_id: String::new(),
212 resource: Some(self.key.to_string()),
213 string_to_sign: None,
214 })),
215 context: Box::new(ErrorContext {
216 operation: Some("AppendObject".into()),
217 bucket: Some(self.bucket.to_string()),
218 object_key: Some(self.key.to_string()),
219 ..Default::default()
220 }),
221 source: None,
222 })
223 }
224 }
225}
226
/// Result of a successful append request.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AppendObjectOutput {
    /// Value of the `x-oss-request-id` response header, or an empty string
    /// when the header is missing or not valid text.
    pub request_id: String,
    /// Value of the `x-oss-next-append-position` response header; when the
    /// header is missing or unparsable, the requested position plus the
    /// number of bytes sent.
    pub next_position: u64,
    /// Value of the `x-oss-hash-crc64ecma` response header, if present.
    pub hash_crc64: Option<String>,
}
233
234impl BucketOperations {
235 pub fn append_object(
236 &self,
237 key: impl Into<String>,
238 position: u64,
239 body: impl Into<bytes::Bytes>,
240 ) -> Result<AppendObjectBuilder> {
241 let object_key = ObjectKey::new(key.into())?;
242 Ok(AppendObjectBuilder::new(
243 self.client_inner().clone(),
244 self.bucket_name().clone(),
245 object_key,
246 position,
247 body,
248 ))
249 }
250}
251
#[cfg(test)]
mod tests {
    use std::str::FromStr;
    use std::sync::Mutex;

    use http::HeaderMap;

    use crate::client::OSSClientInner;
    use crate::config::credentials::Credentials;
    use crate::http::client::{HttpClient, HttpRequest, HttpResponse};
    use crate::types::region::Region;

    use super::*;

    /// Test double that records every request and answers with a fixed
    /// `200 OK` carrying a request id and a next-append position of `11`.
    struct RecordingHttpClient {
        requests: Arc<Mutex<Vec<HttpRequest>>>,
    }

    #[async_trait::async_trait]
    impl HttpClient for RecordingHttpClient {
        async fn send(&self, request: HttpRequest) -> crate::error::Result<HttpResponse> {
            self.requests.lock().unwrap().push(request);
            let mut headers = HeaderMap::new();
            headers.insert(
                "x-oss-request-id",
                http::HeaderValue::from_static("rid-append"),
            );
            headers.insert(
                "x-oss-next-append-position",
                http::HeaderValue::from_static("11"),
            );
            Ok(HttpResponse {
                status: http::StatusCode::OK,
                headers,
                body: bytes::Bytes::new(),
            })
        }
    }

    /// Builds a client backed by [`RecordingHttpClient`] and returns it with
    /// the shared list of captured requests.
    fn create_test_inner() -> (Arc<OSSClientInner>, Arc<Mutex<Vec<HttpRequest>>>) {
        let requests = Arc::new(Mutex::new(Vec::new()));
        let http = Arc::new(RecordingHttpClient {
            requests: requests.clone(),
        });
        let credentials = Arc::new(crate::config::credentials::StaticCredentialsProvider::new(
            Credentials::builder()
                .access_key_id("test-ak")
                .access_key_secret("test-sk")
                .build()
                .unwrap(),
        ));
        let inner = Arc::new(OSSClientInner {
            http,
            credentials,
            signer: Arc::from(crate::signer::create_signer(crate::signer::SignVersion::V4)),
            region: Region::CnHangzhou,
            endpoint: "oss-cn-hangzhou.aliyuncs.com".into(),
        });
        (inner, requests)
    }

    /// The first append uses position 0, POST, and the append query string.
    #[tokio::test]
    async fn append_object_first_position_zero() {
        let (inner, requests) = create_test_inner();
        let bucket = BucketName::new("test-bucket").unwrap();
        let builder = AppendObjectBuilder::new(
            inner,
            bucket,
            ObjectKey::new("append.txt").unwrap(),
            0,
            bytes::Bytes::from_static(b"hello world"),
        );

        let output = builder.send().await.unwrap();
        assert_eq!(output.next_position, 11);

        let captured = requests.lock().unwrap();
        assert!(captured[0].uri.contains("?append&position=0"));
        assert_eq!(captured[0].method, http::Method::POST);
    }

    /// The position reported by the response header is surfaced as-is.
    #[tokio::test]
    async fn append_object_returns_next_position() {
        let (inner, _) = create_test_inner();
        let bucket = BucketName::new("test-bucket").unwrap();
        let builder = AppendObjectBuilder::new(
            inner,
            bucket,
            ObjectKey::new("append.txt").unwrap(),
            11,
            bytes::Bytes::from_static(b" more data"),
        );

        let output = builder.send().await.unwrap();
        // The mock always answers 11, so this pins "header wins over the
        // locally computed fallback" rather than an advancing position.
        assert_eq!(output.next_position, 11);
        assert!(!output.request_id.is_empty());
    }

    /// A configured content type reaches the outgoing request headers.
    #[tokio::test]
    async fn append_object_with_content_type() {
        let (inner, requests) = create_test_inner();
        let bucket = BucketName::new("test-bucket").unwrap();
        let builder = AppendObjectBuilder::new(
            inner,
            bucket,
            ObjectKey::new("append.txt").unwrap(),
            0,
            bytes::Bytes::from_static(b"data"),
        );

        builder.content_type("text/plain").send().await.unwrap();

        let captured = requests.lock().unwrap();
        assert_eq!(
            captured[0]
                .headers
                .get("content-type")
                .unwrap()
                .to_str()
                .unwrap(),
            "text/plain"
        );
    }

    /// End-to-end round trip: two appends, a read-back, and a cleanup delete.
    /// Reads its configuration from the `OSS_*` environment variables.
    #[tokio::test]
    #[ignore = "requires valid OSS credentials"]
    async fn e2e_append_object() {
        let ak = std::env::var("OSS_ACCESS_KEY_ID").expect("OSS_ACCESS_KEY_ID not set");
        let sk = std::env::var("OSS_ACCESS_KEY_SECRET").expect("OSS_ACCESS_KEY_SECRET not set");
        let region_str = std::env::var("OSS_REGION").unwrap_or_else(|_| "cn-wulanchabu".into());
        let bucket_str = std::env::var("OSS_BUCKET").expect("OSS_BUCKET not set");

        // Unknown region names fall back to a custom region built from the id.
        let region = Region::from_str(&region_str).unwrap_or_else(|_| Region::Custom {
            endpoint: format!("oss-{}.aliyuncs.com", region_str),
            region_id: region_str.clone(),
        });

        let client = crate::client::OSSClient::builder()
            .region(region)
            .credentials(ak, sk)
            .build()
            .unwrap();

        // Timestamped key so repeated runs do not collide.
        let key = format!("test-append-{}.txt", chrono::Utc::now().timestamp());

        let output1 = client
            .bucket(&bucket_str)
            .unwrap()
            .append_object(&key, 0, bytes::Bytes::from_static(b"hello "))
            .unwrap()
            .send()
            .await
            .unwrap();

        assert_eq!(output1.next_position, 6);

        let output2 = client
            .bucket(&bucket_str)
            .unwrap()
            .append_object(
                &key,
                output1.next_position,
                bytes::Bytes::from_static(b"world"),
            )
            .unwrap()
            .send()
            .await
            .unwrap();

        assert_eq!(output2.next_position, 11);

        let get_output = client
            .bucket(&bucket_str)
            .unwrap()
            .get_object(&key)
            .unwrap()
            .send()
            .await
            .unwrap();
        assert_eq!(
            get_output.body.as_ref(),
            bytes::Bytes::from_static(b"hello world")
        );

        client
            .bucket(&bucket_str)
            .unwrap()
            .delete_object(&key)
            .unwrap()
            .send()
            .await
            .unwrap();

        eprintln!("APPEND E2E '{}' succeeded: 2 appends, total 11 bytes", key);
    }
}