1use super::*;
16
17impl<T> UploadObject<T>
18where
19 T: StreamingSource + Send + Sync + 'static,
20 T::Error: std::error::Error + Send + Sync + 'static,
21{
22 pub async fn send(self) -> crate::Result<Object> {
36 let upload_url = self.start_resumable_upload().await?;
37 if self.payload.lock().await.size_hint().0 > RESUMABLE_UPLOAD_QUANTUM as u64 {
40 return self
41 .upload_by_chunks(&upload_url, RESUMABLE_UPLOAD_QUANTUM)
42 .await;
43 }
44 let builder = self.upload_request(upload_url).await?;
45 let response = builder.send().await.map_err(Error::io)?;
46 if !response.status().is_success() {
47 return gaxi::http::to_http_error(response).await;
48 }
49 let response = response.json::<v1::Object>().await.map_err(Error::io)?;
50
51 Ok(Object::from(response))
52 }
53
54 async fn upload_by_chunks(&self, upload_url: &str, target_size: usize) -> Result<Object> {
55 let mut remainder = None;
56 let mut offset = 0_usize;
57 loop {
58 let NextChunk {
59 chunk,
60 size: chunk_size,
61 remainder: r,
62 } = self::next_chunk(&mut *self.payload.lock().await, remainder, target_size).await?;
63 let full_size = if chunk_size < target_size {
64 Some(offset + chunk_size)
65 } else {
66 None
67 };
68 let (builder, chunk_size) = self
69 .partial_upload_request(upload_url, offset, chunk, chunk_size, full_size)
70 .await?;
71 let response = builder.send().await.map_err(Error::io)?;
72 match self::partial_upload_handle_response(response, offset + chunk_size).await? {
73 PartialUpload::Finalized(o) => {
74 return Ok(*o);
75 }
76 PartialUpload::Partial {
77 persisted_size,
78 chunk_remainder,
79 } => {
80 offset = persisted_size;
81 assert_eq!(chunk_remainder, 0);
83 remainder = r;
84 }
85 }
86 }
87 }
88
89 async fn partial_upload_request(
90 &self,
91 upload_url: &str,
92 offset: usize,
93 chunk: VecDeque<bytes::Bytes>,
94 chunk_size: usize,
95 full_size: Option<usize>,
96 ) -> Result<(reqwest::RequestBuilder, usize)> {
97 let range = match (chunk_size, full_size) {
98 (0, Some(s)) => format!("bytes */{s}"),
99 (0, None) => format!("bytes */{offset}"),
100 (n, Some(s)) => format!("bytes {offset}-{}/{s}", offset + n - 1),
101 (n, None) => format!("bytes {offset}-{}/*", offset + n - 1),
102 };
103 let builder = self
104 .inner
105 .client
106 .request(reqwest::Method::PUT, upload_url)
107 .header("content-type", "application/octet-stream")
108 .header("Content-Range", range)
109 .header(
110 "x-goog-api-client",
111 reqwest::header::HeaderValue::from_static(&self::info::X_GOOG_API_CLIENT_HEADER),
112 );
113
114 let builder = apply_customer_supplied_encryption_headers(builder, &self.params);
115 let builder = self.inner.apply_auth_headers(builder).await?;
116 let stream = unfold(Some(chunk), move |state| async move {
117 if let Some(mut payload) = state {
118 if let Some(next) = payload.pop_front() {
119 return Some((Ok::<bytes::Bytes, Error>(next), Some(payload)));
120 }
121 }
122 None
123 });
124 Ok((builder.body(reqwest::Body::wrap_stream(stream)), chunk_size))
125 }
126
127 async fn upload_request(self, upload_url: String) -> Result<reqwest::RequestBuilder> {
128 let mut payload = self.payload.lock().await;
129 let (chunk, chunk_size, full_size) = {
130 let mut chunk = VecDeque::new();
131 let mut size = 0_usize;
132 while let Some(b) = payload.next().await.transpose().map_err(Error::io)? {
133 size += b.len();
134 chunk.push_back(b);
135 }
136 (chunk, size, Some(size))
137 };
138 let (builder, _size) = self
139 .partial_upload_request(upload_url.as_str(), 0, chunk, chunk_size, full_size)
140 .await?;
141 Ok(builder)
142 }
143}
144
145async fn next_chunk<T>(
146 payload: &mut InsertPayload<T>,
147 remainder: Option<bytes::Bytes>,
148 target_size: usize,
149) -> Result<NextChunk>
150where
151 T: StreamingSource,
152{
153 let mut partial = VecDeque::new();
154 let mut size = 0;
155 let mut process_buffer = |mut b: bytes::Bytes| match b.len() {
156 n if size + n > target_size => {
157 let remainder = b.split_off(target_size - size);
158 size = target_size;
159 partial.push_back(b);
160 Some(Some(remainder))
161 }
162 n if size + n == target_size => {
163 size = target_size;
164 partial.push_back(b);
165 Some(None)
166 }
167 n => {
168 size += n;
169 partial.push_back(b);
170 None
171 }
172 };
173
174 if let Some(b) = remainder {
175 if let Some(p) = process_buffer(b) {
176 return Ok(NextChunk {
177 chunk: partial,
178 size,
179 remainder: p,
180 });
181 }
182 }
183
184 while let Some(b) = payload.next().await.transpose().map_err(Error::io)? {
185 if let Some(p) = process_buffer(b) {
186 return Ok(NextChunk {
187 chunk: partial,
188 size,
189 remainder: p,
190 });
191 }
192 }
193 Ok(NextChunk {
194 chunk: partial,
195 size,
196 remainder: None,
197 })
198}
199
200async fn partial_upload_handle_response(
201 response: reqwest::Response,
202 expected_offset: usize,
203) -> Result<PartialUpload> {
204 if response.status() == self::RESUME_INCOMPLETE {
205 return self::parse_range(response, expected_offset).await;
206 }
207 if !response.status().is_success() {
208 return gaxi::http::to_http_error(response).await;
209 }
210 let response = response.json::<v1::Object>().await.map_err(Error::io)?;
211 Ok(PartialUpload::Finalized(Box::new(Object::from(response))))
212}
213
214async fn parse_range(response: reqwest::Response, expected_offset: usize) -> Result<PartialUpload> {
215 let Some(end) = self::parse_range_end(response.headers()) else {
216 return gaxi::http::to_http_error(response).await;
217 };
218 let (persisted_size, chunk_remainder) = match (expected_offset, end) {
220 (o, 0) => (0, o),
221 (o, e) if o < e + 1 => panic!("more data persistent than sent {response:?}"),
222 (o, e) => (e + 1, o - e - 1),
223 };
224 Ok(PartialUpload::Partial {
225 persisted_size,
226 chunk_remainder,
227 })
228}
229
230fn parse_range_end(headers: &reqwest::header::HeaderMap) -> Option<usize> {
231 let Some(range) = headers.get("range") else {
232 return Some(0_usize);
234 };
235 let end = std::str::from_utf8(range.as_bytes().strip_prefix(b"bytes=0-")?).ok()?;
243 end.parse::<usize>().ok()
244}
245
246#[derive(Debug, PartialEq)]
247enum PartialUpload {
248 Finalized(Box<Object>),
249 Partial {
250 persisted_size: usize,
251 chunk_remainder: usize,
252 },
253}
254
255#[derive(Debug, PartialEq)]
257struct NextChunk {
258 chunk: VecDeque<bytes::Bytes>,
260 size: usize,
262 remainder: Option<bytes::Bytes>,
264}
265
266const RESUME_INCOMPLETE: reqwest::StatusCode = reqwest::StatusCode::PERMANENT_REDIRECT;
267const RESUMABLE_UPLOAD_QUANTUM: usize = 256 * 1024;
270
271#[cfg(test)]
272mod tests {
273 use super::super::client::tests::{create_key_helper, test_builder, test_inner_client};
274 use super::*;
275 use crate::upload_source::tests::VecStream;
276 use httptest::{Expectation, Server, matchers::*, responders::status_code};
277 use serde_json::json;
278 use test_case::test_case;
279
280 type Result = anyhow::Result<()>;
281
282 const SESSION: &str = "https://private.googleapis.com/test-only-session-123";
283
284 #[tokio::test]
285 async fn upload_object_buffered_normal() -> Result {
286 let payload = serde_json::json!({
287 "name": "test-object",
288 "bucket": "test-bucket",
289 "metadata": {
290 "is-test-object": "true",
291 }
292 })
293 .to_string();
294 let server = Server::run();
295 let session = server.url("/upload/session/test-only-001");
296 let path = session.path().to_string();
297 server.expect(
298 Expectation::matching(all_of![
299 request::method_path("POST", "//upload/storage/v1/b/test-bucket/o"),
300 request::query(url_decoded(contains(("name", "test-object")))),
301 request::query(url_decoded(contains(("uploadType", "resumable")))),
302 ])
303 .respond_with(
304 status_code(200)
305 .append_header("location", session.to_string())
306 .body(""),
307 ),
308 );
309
310 server.expect(
311 Expectation::matching(all_of![
312 request::method_path("PUT", path),
313 request::headers(contains(("content-range", "bytes */0")))
314 ])
315 .respond_with(
316 status_code(200)
317 .append_header("content-type", "application/json")
318 .body(payload),
319 ),
320 );
321
322 let endpoint = server.url("");
323 let client = Storage::builder()
324 .with_endpoint(endpoint.to_string())
325 .with_credentials(auth::credentials::testing::test_credentials())
326 .build()
327 .await?;
328 let response = client
329 .upload_object("projects/_/buckets/test-bucket", "test-object", "")
330 .send()
331 .await?;
332 assert_eq!(response.name, "test-object");
333 assert_eq!(response.bucket, "projects/_/buckets/test-bucket");
334 assert_eq!(
335 response.metadata.get("is-test-object").map(String::as_str),
336 Some("true")
337 );
338
339 Ok(())
340 }
341
342 #[tokio::test]
343 async fn upload_object_buffered_not_found() -> Result {
344 let server = Server::run();
345 server.expect(
346 Expectation::matching(all_of![
347 request::method_path("POST", "//upload/storage/v1/b/test-bucket/o"),
348 request::query(url_decoded(contains(("name", "test-object")))),
349 request::query(url_decoded(contains(("uploadType", "resumable")))),
350 ])
351 .respond_with(status_code(404).body("NOT FOUND")),
352 );
353
354 let endpoint = server.url("");
355 let client = Storage::builder()
356 .with_endpoint(endpoint.to_string())
357 .with_credentials(auth::credentials::testing::test_credentials())
358 .build()
359 .await?;
360 let err = client
361 .upload_object("projects/_/buckets/test-bucket", "test-object", "")
362 .send()
363 .await
364 .expect_err("expected a not found error");
365 assert_eq!(err.http_status_code(), Some(404), "{err:?}");
366
367 Ok(())
368 }
369
370 #[test_case("projects/p", "projects%2Fp")]
371 #[test_case("kebab-case", "kebab-case")]
372 #[test_case("dot.name", "dot.name")]
373 #[test_case("under_score", "under_score")]
374 #[test_case("tilde~123", "tilde~123")]
375 #[test_case("exclamation!point!", "exclamation%21point%21")]
376 #[test_case("spaces spaces", "spaces%20%20%20spaces")]
377 #[test_case("preserve%percent%21", "preserve%percent%21")]
378 #[test_case(
379 "testall !#$&'()*+,/:;=?@[]",
380 "testall%20%21%23%24%26%27%28%29%2A%2B%2C%2F%3A%3B%3D%3F%40%5B%5D"
381 )]
382 #[tokio::test]
383 async fn test_percent_encoding_object_name(name: &str, want: &str) -> Result {
384 let inner = test_inner_client(test_builder());
385 let request = UploadObject::new(inner, "projects/_/buckets/bucket", name, "hello")
386 .start_resumable_upload_request()
387 .await?
388 .build()?;
389
390 let got = request
391 .url()
392 .query_pairs()
393 .find_map(|(key, val)| match key.to_string().as_str() {
394 "name" => Some(val.to_string()),
395 _ => None,
396 })
397 .unwrap();
398 assert_eq!(got, want);
399 Ok(())
400 }
401
402 #[tokio::test]
403 async fn handle_start_resumable_upload_response() -> Result {
404 let response = http::Response::builder()
405 .header(
406 "Location",
407 "http://private.googleapis.com/test-only/session-123",
408 )
409 .body(Vec::new())?;
410 let response = reqwest::Response::from(response);
411 let url = super::handle_start_resumable_upload_response(response).await?;
412 assert_eq!(url, "http://private.googleapis.com/test-only/session-123");
413 Ok(())
414 }
415
416 #[tokio::test]
417 async fn upload_request() -> Result {
418 use reqwest::header::HeaderValue;
419
420 let inner = test_inner_client(test_builder());
421 let mut request = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "hello")
422 .upload_request(SESSION.to_string())
423 .await?
424 .build()?;
425
426 assert_eq!(request.method(), reqwest::Method::PUT);
427 assert_eq!(request.url().as_str(), SESSION);
428 assert_eq!(
429 request.headers().get("content-range"),
430 Some(&HeaderValue::from_static("bytes 0-4/5"))
431 );
432 let body = request.body_mut().take().unwrap();
433 let contents = http_body_util::BodyExt::collect(body).await?.to_bytes();
434 assert_eq!(contents, "hello");
435 Ok(())
436 }
437
438 #[tokio::test]
439 async fn upload_object_buffered_stream() -> Result {
440 let stream = VecStream::new(
441 [
442 "the ", "quick ", "brown ", "fox ", "jumps ", "over ", "the ", "lazy ", "dog",
443 ]
444 .map(|x| bytes::Bytes::from_static(x.as_bytes()))
445 .to_vec(),
446 );
447 let inner = test_inner_client(test_builder());
448 let mut request = UploadObject::new(inner, "projects/_/buckets/bucket", "object", stream)
449 .upload_request(SESSION.to_string())
450 .await?
451 .build()?;
452
453 assert_eq!(request.method(), reqwest::Method::PUT);
454 assert_eq!(request.url().as_str(), SESSION);
455 let body = request.body_mut().take().unwrap();
456 let contents = http_body_util::BodyExt::collect(body).await?.to_bytes();
457 assert_eq!(contents, "the quick brown fox jumps over the lazy dog");
458 Ok(())
459 }
460
461 #[tokio::test]
462 async fn upload_request_headers() -> Result {
463 let (key, key_base64, _, key_sha256_base64) = create_key_helper();
465
466 let inner = test_inner_client(test_builder());
467 let request = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "hello")
468 .with_key(KeyAes256::new(&key)?)
469 .upload_request(SESSION.to_string())
470 .await?
471 .build()?;
472
473 assert_eq!(request.method(), reqwest::Method::PUT);
474 assert_eq!(request.url().as_str(), SESSION);
475
476 let want = vec![
477 ("x-goog-encryption-algorithm", "AES256".to_string()),
478 ("x-goog-encryption-key", key_base64),
479 ("x-goog-encryption-key-sha256", key_sha256_base64),
480 ];
481
482 for (name, value) in want {
483 assert_eq!(
484 request.headers().get(name).unwrap().as_bytes(),
485 bytes::Bytes::from(value)
486 );
487 }
488 Ok(())
489 }
490
491 fn new_line_string(i: i32, len: usize) -> String {
492 format!("{i:022} {:width$}\n", "", width = len - 22 - 2)
493 }
494
495 fn new_line(i: i32, len: usize) -> bytes::Bytes {
496 bytes::Bytes::from_owner(new_line_string(i, len))
497 }
498
499 #[tokio::test]
500 async fn upload_by_chunks() -> Result {
501 const LEN: usize = 32;
502
503 let payload = serde_json::json!({
504 "name": "test-object",
505 "bucket": "test-bucket",
506 "metadata": {
507 "is-test-object": "true",
508 }
509 })
510 .to_string();
511
512 let chunk0 = new_line_string(0, LEN) + &new_line_string(1, LEN);
513 let chunk1 = new_line_string(2, LEN) + &new_line_string(3, LEN);
514 let chunk2 = new_line_string(4, LEN);
515
516 let server = Server::run();
517 let session = server.url("/upload/session/test-only-001");
518 let path = session.path().to_string();
519 server.expect(
520 Expectation::matching(all_of![
521 request::method_path("PUT", path.clone()),
522 request::headers(contains(("content-range", "bytes 0-63/*"))),
523 request::body(chunk0.clone()),
524 ])
525 .respond_with(status_code(308).append_header("range", "bytes=0-63")),
526 );
527
528 server.expect(
529 Expectation::matching(all_of![
530 request::method_path("PUT", path.clone()),
531 request::headers(contains(("content-range", "bytes 64-127/*"))),
532 request::body(chunk1.clone()),
533 ])
534 .respond_with(status_code(308).append_header("range", "bytes=0-127")),
535 );
536
537 server.expect(
538 Expectation::matching(all_of![
539 request::method_path("PUT", path.clone()),
540 request::headers(contains(("content-range", "bytes 128-159/160"))),
541 request::body(chunk2.clone()),
542 ])
543 .respond_with(status_code(200).body(payload.clone())),
544 );
545
546 let stream = VecStream::new((0..5).map(|i| new_line(i, LEN)).collect::<Vec<_>>());
547
548 let inner = test_inner_client(test_builder());
549 let upload = UploadObject::new(inner, "projects/_/buckets/bucket", "object", stream);
550 let response = upload
551 .upload_by_chunks(session.to_string().as_str(), 2 * LEN)
552 .await?;
553 assert_eq!(response.name, "test-object");
554 assert_eq!(response.bucket, "projects/_/buckets/test-bucket");
555 assert_eq!(
556 response.metadata.get("is-test-object").map(String::as_str),
557 Some("true")
558 );
559
560 Ok(())
561 }
562
563 #[tokio::test]
564 async fn partial_upload_request_empty() -> Result {
565 use reqwest::header::HeaderValue;
566 let inner = test_inner_client(test_builder());
567 let upload = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "");
568
569 let chunk = VecDeque::new();
570 let (builder, size) = upload
571 .partial_upload_request("http://localhost/chunk-0", 0_usize, chunk, 0_usize, None)
572 .await?;
573 assert_eq!(size, 0);
574 let mut request = builder.build()?;
575
576 assert_eq!(
577 request.headers().get("content-type"),
578 Some(&HeaderValue::from_static("application/octet-stream"))
579 );
580 assert_eq!(
581 request.headers().get("content-range"),
582 Some(&HeaderValue::from_static("bytes */0"))
583 );
584 assert!(
585 request.headers().get("x-goog-api-client").is_some(),
586 "{request:?}"
587 );
588 let body = request.body_mut().take().unwrap();
589 let contents = http_body_util::BodyExt::collect(body).await?.to_bytes();
590 assert!(&contents.is_empty(), "{contents:?}");
591 Ok(())
592 }
593
594 #[tokio::test]
595 async fn partial_upload_request_chunk0() -> Result {
596 use reqwest::header::HeaderValue;
597 const LEN: usize = 32;
598 let inner = test_inner_client(test_builder());
599 let upload = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "");
600
601 let chunk = VecDeque::from_iter([new_line(0, LEN), new_line(1, LEN)]);
602 let expected = chunk.iter().fold(Vec::new(), |mut a, b| {
603 a.extend_from_slice(b);
604 a
605 });
606 let (builder, size) = upload
607 .partial_upload_request("http://localhost/chunk-0", 0_usize, chunk, 2 * LEN, None)
608 .await?;
609 assert_eq!(size, 2 * LEN);
610 let mut request = builder.build()?;
611
612 assert_eq!(
613 request.headers().get("content-type"),
614 Some(&HeaderValue::from_static("application/octet-stream"))
615 );
616 assert_eq!(
617 request.headers().get("content-range"),
618 Some(&HeaderValue::from_static("bytes 0-63/*"))
619 );
620 assert!(
621 request.headers().get("x-goog-api-client").is_some(),
622 "{request:?}"
623 );
624 let body = request.body_mut().take().unwrap();
625 let contents = http_body_util::BodyExt::collect(body).await?.to_bytes();
626 assert_eq!(&contents, &expected);
627 Ok(())
628 }
629
630 #[tokio::test]
631 async fn partial_upload_request_chunk1() -> Result {
632 use reqwest::header::HeaderValue;
633 const LEN: usize = 32;
634 let inner = test_inner_client(test_builder());
635 let upload = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "");
636
637 let chunk = VecDeque::from_iter([new_line(2, LEN), new_line(3, LEN)]);
638 let expected = chunk.iter().fold(Vec::new(), |mut a, b| {
639 a.extend_from_slice(b);
640 a
641 });
642 let (builder, size) = upload
643 .partial_upload_request("http://localhost/chunk-1", 2 * LEN, chunk, 2 * LEN, None)
644 .await?;
645 assert_eq!(size, 2 * LEN);
646 let mut request = builder.build()?;
647
648 assert_eq!(
649 request.headers().get("content-type"),
650 Some(&HeaderValue::from_static("application/octet-stream"))
651 );
652 assert_eq!(
653 request.headers().get("content-range"),
654 Some(&HeaderValue::from_static("bytes 64-127/*"))
655 );
656 assert!(
657 request.headers().get("x-goog-api-client").is_some(),
658 "{request:?}"
659 );
660 let body = request.body_mut().take().unwrap();
661 let contents = http_body_util::BodyExt::collect(body).await?.to_bytes();
662 assert_eq!(&contents, &expected);
663 Ok(())
664 }
665
666 #[tokio::test]
667 async fn partial_upload_request_chunk_finalize() -> Result {
668 use reqwest::header::HeaderValue;
669 const LEN: usize = 32;
670 let inner = test_inner_client(test_builder());
671 let upload = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "");
672
673 let chunk = VecDeque::from_iter([new_line(2, LEN)]);
674 let expected = chunk.iter().fold(Vec::new(), |mut a, b| {
675 a.extend_from_slice(b);
676 a
677 });
678 let (builder, size) = upload
679 .partial_upload_request(
680 "http://localhost/chunk-finalize",
681 4 * LEN,
682 chunk,
683 LEN,
684 Some(5 * LEN),
685 )
686 .await?;
687 assert_eq!(size, LEN);
688 let mut request = builder.build()?;
689
690 assert_eq!(
691 request.headers().get("content-type"),
692 Some(&HeaderValue::from_static("application/octet-stream"))
693 );
694 assert_eq!(
695 request.headers().get("content-range"),
696 Some(&HeaderValue::from_static("bytes 128-159/160"))
697 );
698 assert!(
699 request.headers().get("x-goog-api-client").is_some(),
700 "{request:?}"
701 );
702 let body = request.body_mut().take().unwrap();
703 let contents = http_body_util::BodyExt::collect(body).await?.to_bytes();
704 assert_eq!(&contents, &expected);
705 Ok(())
706 }
707
708 #[tokio::test]
709 async fn next_chunk_success() -> Result {
710 const LEN: usize = 32;
711 let stream = VecStream::new((0..5).map(|i| new_line(i, LEN)).collect::<Vec<_>>());
712 let mut payload = InsertPayload::from(stream);
713
714 let NextChunk {
715 chunk,
716 size,
717 remainder,
718 } = super::next_chunk(&mut payload, None, LEN * 2).await?;
719 assert!(remainder.is_none(), "{remainder:?}");
720 assert_eq!(chunk, vec![new_line(0, LEN), new_line(1, LEN)]);
721 assert_eq!(size, 2 * LEN);
722
723 let NextChunk {
724 chunk,
725 size,
726 remainder,
727 } = super::next_chunk(&mut payload, remainder, LEN * 2).await?;
728 assert!(remainder.is_none(), "{remainder:?}");
729 assert_eq!(chunk, vec![new_line(2, LEN), new_line(3, LEN)]);
730 assert_eq!(size, 2 * LEN);
731
732 let NextChunk {
733 chunk,
734 size,
735 remainder,
736 } = super::next_chunk(&mut payload, remainder, LEN * 2).await?;
737 assert!(remainder.is_none(), "{remainder:?}");
738 assert_eq!(chunk, vec![new_line(4, LEN)]);
739 assert_eq!(size, LEN);
740
741 Ok(())
742 }
743
744 #[tokio::test]
745 async fn next_chunk_split() -> Result {
746 const LEN: usize = 32;
747 let stream = VecStream::new((0..5).map(|i| new_line(i, LEN)).collect::<Vec<_>>());
748 let mut payload = InsertPayload::from(stream);
749
750 let NextChunk {
751 chunk,
752 size,
753 remainder,
754 } = super::next_chunk(&mut payload, None, LEN * 2 + LEN / 2).await?;
755 assert_eq!(remainder, Some(new_line(2, LEN).split_off(LEN / 2)));
756 assert_eq!(
757 chunk,
758 vec![
759 new_line(0, LEN),
760 new_line(1, LEN),
761 new_line(2, LEN).split_to(LEN / 2)
762 ]
763 );
764 assert_eq!(size, 2 * LEN + LEN / 2);
765
766 let NextChunk {
767 chunk,
768 size,
769 remainder,
770 } = super::next_chunk(&mut payload, remainder, LEN * 2 + LEN / 2).await?;
771 assert!(remainder.is_none());
772 assert_eq!(
773 chunk,
774 vec![
775 new_line(2, LEN).split_off(LEN / 2),
776 new_line(3, LEN),
777 new_line(4, LEN)
778 ]
779 );
780 assert_eq!(size, 2 * LEN + LEN / 2);
781
782 Ok(())
783 }
784
785 #[tokio::test]
786 async fn next_chunk_split_large_remainder() -> Result {
787 const LEN: usize = 32;
788 let buffer = (0..3)
789 .map(|i| new_line_string(i, LEN))
790 .collect::<Vec<_>>()
791 .join("");
792 let stream = VecStream::new(vec![bytes::Bytes::from_owner(buffer), new_line(3, LEN)]);
793 let mut payload = InsertPayload::from(stream);
794
795 let remainder = None;
796 let NextChunk {
797 chunk,
798 size,
799 remainder,
800 } = super::next_chunk(&mut payload, remainder, LEN).await?;
801 assert_eq!(chunk, vec![new_line(0, LEN)]);
802 assert_eq!(size, LEN);
803
804 let NextChunk {
805 chunk,
806 size,
807 remainder,
808 } = super::next_chunk(&mut payload, remainder, LEN).await?;
809 assert!(remainder.is_some());
810 assert_eq!(chunk, vec![new_line(1, LEN)]);
811 assert_eq!(size, LEN);
812
813 let NextChunk {
814 chunk,
815 size,
816 remainder,
817 } = super::next_chunk(&mut payload, remainder, LEN).await?;
818 assert!(remainder.is_none());
819 assert_eq!(chunk, vec![new_line(2, LEN)]);
820 assert_eq!(size, LEN);
821
822 let NextChunk {
823 chunk,
824 size,
825 remainder,
826 } = super::next_chunk(&mut payload, remainder, LEN).await?;
827 assert!(remainder.is_none());
828 assert_eq!(chunk, vec![new_line(3, LEN)]);
829 assert_eq!(size, LEN);
830
831 Ok(())
832 }
833
834 #[tokio::test]
835 async fn next_chunk_join_remainder() -> Result {
836 const LEN: usize = 32;
837 let buffer = (0..3)
838 .map(|i| new_line_string(i, LEN))
839 .collect::<Vec<_>>()
840 .join("");
841 let stream = VecStream::new(vec![
842 bytes::Bytes::from_owner(buffer.clone()),
843 new_line(3, LEN),
844 ]);
845 let mut payload = InsertPayload::from(stream);
846
847 let remainder = None;
848 let NextChunk {
849 chunk,
850 size,
851 remainder,
852 } = super::next_chunk(&mut payload, remainder, 2 * LEN).await?;
853 assert!(remainder.is_some());
854 assert_eq!(
855 chunk,
856 vec![bytes::Bytes::from_owner(buffer.clone()).slice(0..(2 * LEN))]
857 );
858 assert_eq!(size, 2 * LEN);
859
860 let NextChunk {
861 chunk,
862 size,
863 remainder,
864 } = super::next_chunk(&mut payload, remainder, 2 * LEN).await?;
865 assert!(remainder.is_none());
866 assert_eq!(
867 chunk,
868 vec![
869 bytes::Bytes::from_owner(buffer.clone()).slice((2 * LEN)..),
870 new_line(3, LEN)
871 ]
872 );
873 assert_eq!(size, 2 * LEN);
874
875 Ok(())
876 }
877
878 #[tokio::test]
879 async fn next_chunk_done() -> Result {
880 const LEN: usize = 32;
881 let stream = VecStream::new((0..2).map(|i| new_line(i, LEN)).collect::<Vec<_>>());
882 let mut payload = InsertPayload::from(stream);
883
884 let NextChunk {
885 chunk,
886 size,
887 remainder,
888 } = super::next_chunk(&mut payload, None, LEN * 4).await?;
889 assert!(remainder.is_none(), "{remainder:?}");
890 assert_eq!(chunk, vec![new_line(0, LEN), new_line(1, LEN)]);
891 assert_eq!(size, 2 * LEN);
892
893 let NextChunk {
894 chunk,
895 size,
896 remainder,
897 } = super::next_chunk(&mut payload, remainder, LEN * 4).await?;
898 assert!(remainder.is_none(), "{remainder:?}");
899 assert!(chunk.is_empty(), "{chunk:?}");
900 assert_eq!(size, 0);
901
902 Ok(())
903 }
904
905 #[tokio::test]
906 async fn partial_handle_response_incomplete() -> Result {
907 let response = http::Response::builder()
908 .header("range", "bytes=0-999")
909 .status(RESUME_INCOMPLETE)
910 .body(Vec::new())?;
911 let response = reqwest::Response::from(response);
912 let partial = super::partial_upload_handle_response(response, 1000).await?;
913 assert_eq!(
914 partial,
915 PartialUpload::Partial {
916 persisted_size: 1000,
917 chunk_remainder: 0
918 }
919 );
920 Ok(())
921 }
922
923 #[tokio::test]
924 async fn partial_handle_response_err() -> Result {
925 let response = http::Response::builder()
926 .status(reqwest::StatusCode::NOT_FOUND)
927 .body(Vec::new())?;
928 let response = reqwest::Response::from(response);
929 let err = super::partial_upload_handle_response(response, 1000)
930 .await
931 .expect_err("NOT_FOUND should fail");
932 assert_eq!(err.http_status_code(), Some(404), "{err:?}");
933 Ok(())
934 }
935
936 #[tokio::test]
937 async fn partial_handle_response_finalized() -> Result {
938 let response = http::Response::builder()
939 .status(reqwest::StatusCode::OK)
940 .body(
941 json!({"bucket": "test-bucket", "name": "test-object", "size": "1000"}).to_string(),
942 )?;
943 let response = reqwest::Response::from(response);
944 let partial = super::partial_upload_handle_response(response, 1000).await?;
945 assert_eq!(
946 partial,
947 PartialUpload::Finalized(Box::new(
948 Object::new()
949 .set_name("test-object")
950 .set_bucket("projects/_/buckets/test-bucket")
951 .set_finalize_time(wkt::Timestamp::default())
952 .set_create_time(wkt::Timestamp::default())
953 .set_update_time(wkt::Timestamp::default())
954 .set_update_storage_class_time(wkt::Timestamp::default())
955 .set_size(1000_i64)
956 ))
957 );
958 Ok(())
959 }
960
961 #[tokio::test]
962 async fn parse_range_success() -> Result {
963 let response = http::Response::builder()
964 .header("range", "bytes=0-999")
965 .status(RESUME_INCOMPLETE)
966 .body(Vec::new())?;
967 let response = reqwest::Response::from(response);
968 let partial = super::parse_range(response, 1000).await?;
969 assert_eq!(
970 partial,
971 PartialUpload::Partial {
972 persisted_size: 1000,
973 chunk_remainder: 0
974 }
975 );
976 Ok(())
977 }
978
979 #[tokio::test]
980 async fn parse_range_partial() -> Result {
981 let response = http::Response::builder()
982 .header("range", "bytes=0-999")
983 .status(RESUME_INCOMPLETE)
984 .body(Vec::new())?;
985 let response = reqwest::Response::from(response);
986 let partial = super::parse_range(response, 1234).await?;
987 assert_eq!(
988 partial,
989 PartialUpload::Partial {
990 persisted_size: 1000,
991 chunk_remainder: 234
992 }
993 );
994 Ok(())
995 }
996
997 #[tokio::test]
998 #[should_panic]
999 async fn parse_range_bad_end() {
1000 let response = http::Response::builder()
1001 .header("range", "bytes=0-999")
1002 .status(RESUME_INCOMPLETE)
1003 .body(Vec::new())
1004 .unwrap();
1005 let response = reqwest::Response::from(response);
1006 let _ = super::parse_range(response, 500).await;
1007 }
1008
1009 #[tokio::test]
1010 async fn parse_range_missing_range() -> Result {
1011 let response = http::Response::builder()
1012 .status(RESUME_INCOMPLETE)
1013 .body(Vec::new())?;
1014 let response = reqwest::Response::from(response);
1015 let partial = super::parse_range(response, 1234).await?;
1016 assert_eq!(
1017 partial,
1018 PartialUpload::Partial {
1019 persisted_size: 0,
1020 chunk_remainder: 1234
1021 }
1022 );
1023 Ok(())
1024 }
1025
1026 #[tokio::test]
1027 async fn parse_range_invalid_range() -> Result {
1028 let response = http::Response::builder()
1029 .header("range", "bytes=100-999")
1030 .status(RESUME_INCOMPLETE)
1031 .body(Vec::new())?;
1032 let response = reqwest::Response::from(response);
1033 let err = super::parse_range(response, 1234)
1034 .await
1035 .expect_err("invalid range should create an error");
1036 assert!(err.http_status_code().is_some(), "{err:?}");
1037 Ok(())
1038 }
1039
1040 #[test_case(None, Some(0))]
1041 #[test_case(Some("bytes=0-12345"), Some(12345))]
1042 #[test_case(Some("bytes=0-1"), Some(1))]
1043 #[test_case(Some("bytes=0-0"), Some(0))]
1044 #[test_case(Some("bytes=1-12345"), None)]
1045 #[test_case(Some(""), None)]
1046 fn range_end(input: Option<&str>, want: Option<usize>) {
1047 use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
1048 let headers = HeaderMap::from_iter(input.into_iter().map(|s| {
1049 (
1050 HeaderName::from_static("range"),
1051 HeaderValue::from_str(s).unwrap(),
1052 )
1053 }));
1054 assert_eq!(super::parse_range_end(&headers), want, "{headers:?}");
1055 }
1056
1057 #[test]
1058 fn validate_status_code() {
1059 assert_eq!(RESUME_INCOMPLETE, 308);
1060 }
1061}