google_cloud_storage/storage/upload_object/
buffered.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::*;
16
17impl<T> UploadObject<T>
18where
19    T: StreamingSource + Send + Sync + 'static,
20    T::Error: std::error::Error + Send + Sync + 'static,
21{
22    /// Upload an object from a streaming source without rewinds.
23    ///
24    /// # Example
25    /// ```
26    /// # use google_cloud_storage::client::Storage;
27    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
28    /// let response = client
29    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
30    ///     .send()
31    ///     .await?;
32    /// println!("response details={response:?}");
33    /// # Ok(()) }
34    /// ```
35    pub async fn send(self) -> crate::Result<Object> {
36        let upload_url = self.start_resumable_upload().await?;
37        // TODO(#2043) - make the threshold to use resumable uploads and the
38        //    target size for each chunk configurable.
39        if self.payload.lock().await.size_hint().0 > RESUMABLE_UPLOAD_QUANTUM as u64 {
40            return self
41                .upload_by_chunks(&upload_url, RESUMABLE_UPLOAD_QUANTUM)
42                .await;
43        }
44        let builder = self.upload_request(upload_url).await?;
45        let response = builder.send().await.map_err(Error::io)?;
46        if !response.status().is_success() {
47            return gaxi::http::to_http_error(response).await;
48        }
49        let response = response.json::<v1::Object>().await.map_err(Error::io)?;
50
51        Ok(Object::from(response))
52    }
53
54    async fn upload_by_chunks(&self, upload_url: &str, target_size: usize) -> Result<Object> {
55        let mut remainder = None;
56        let mut offset = 0_usize;
57        loop {
58            let NextChunk {
59                chunk,
60                size: chunk_size,
61                remainder: r,
62            } = self::next_chunk(&mut *self.payload.lock().await, remainder, target_size).await?;
63            let full_size = if chunk_size < target_size {
64                Some(offset + chunk_size)
65            } else {
66                None
67            };
68            let (builder, chunk_size) = self
69                .partial_upload_request(upload_url, offset, chunk, chunk_size, full_size)
70                .await?;
71            let response = builder.send().await.map_err(Error::io)?;
72            match self::partial_upload_handle_response(response, offset + chunk_size).await? {
73                PartialUpload::Finalized(o) => {
74                    return Ok(*o);
75                }
76                PartialUpload::Partial {
77                    persisted_size,
78                    chunk_remainder,
79                } => {
80                    offset = persisted_size;
81                    // TODO(#2043) - handle partial uploads
82                    assert_eq!(chunk_remainder, 0);
83                    remainder = r;
84                }
85            }
86        }
87    }
88
89    async fn partial_upload_request(
90        &self,
91        upload_url: &str,
92        offset: usize,
93        chunk: VecDeque<bytes::Bytes>,
94        chunk_size: usize,
95        full_size: Option<usize>,
96    ) -> Result<(reqwest::RequestBuilder, usize)> {
97        let range = match (chunk_size, full_size) {
98            (0, Some(s)) => format!("bytes */{s}"),
99            (0, None) => format!("bytes */{offset}"),
100            (n, Some(s)) => format!("bytes {offset}-{}/{s}", offset + n - 1),
101            (n, None) => format!("bytes {offset}-{}/*", offset + n - 1),
102        };
103        let builder = self
104            .inner
105            .client
106            .request(reqwest::Method::PUT, upload_url)
107            .header("content-type", "application/octet-stream")
108            .header("Content-Range", range)
109            .header(
110                "x-goog-api-client",
111                reqwest::header::HeaderValue::from_static(&self::info::X_GOOG_API_CLIENT_HEADER),
112            );
113
114        let builder = apply_customer_supplied_encryption_headers(builder, &self.params);
115        let builder = self.inner.apply_auth_headers(builder).await?;
116        let stream = unfold(Some(chunk), move |state| async move {
117            if let Some(mut payload) = state {
118                if let Some(next) = payload.pop_front() {
119                    return Some((Ok::<bytes::Bytes, Error>(next), Some(payload)));
120                }
121            }
122            None
123        });
124        Ok((builder.body(reqwest::Body::wrap_stream(stream)), chunk_size))
125    }
126
127    async fn upload_request(self, upload_url: String) -> Result<reqwest::RequestBuilder> {
128        let mut payload = self.payload.lock().await;
129        let (chunk, chunk_size, full_size) = {
130            let mut chunk = VecDeque::new();
131            let mut size = 0_usize;
132            while let Some(b) = payload.next().await.transpose().map_err(Error::io)? {
133                size += b.len();
134                chunk.push_back(b);
135            }
136            (chunk, size, Some(size))
137        };
138        let (builder, _size) = self
139            .partial_upload_request(upload_url.as_str(), 0, chunk, chunk_size, full_size)
140            .await?;
141        Ok(builder)
142    }
143}
144
145async fn next_chunk<T>(
146    payload: &mut InsertPayload<T>,
147    remainder: Option<bytes::Bytes>,
148    target_size: usize,
149) -> Result<NextChunk>
150where
151    T: StreamingSource,
152{
153    let mut partial = VecDeque::new();
154    let mut size = 0;
155    let mut process_buffer = |mut b: bytes::Bytes| match b.len() {
156        n if size + n > target_size => {
157            let remainder = b.split_off(target_size - size);
158            size = target_size;
159            partial.push_back(b);
160            Some(Some(remainder))
161        }
162        n if size + n == target_size => {
163            size = target_size;
164            partial.push_back(b);
165            Some(None)
166        }
167        n => {
168            size += n;
169            partial.push_back(b);
170            None
171        }
172    };
173
174    if let Some(b) = remainder {
175        if let Some(p) = process_buffer(b) {
176            return Ok(NextChunk {
177                chunk: partial,
178                size,
179                remainder: p,
180            });
181        }
182    }
183
184    while let Some(b) = payload.next().await.transpose().map_err(Error::io)? {
185        if let Some(p) = process_buffer(b) {
186            return Ok(NextChunk {
187                chunk: partial,
188                size,
189                remainder: p,
190            });
191        }
192    }
193    Ok(NextChunk {
194        chunk: partial,
195        size,
196        remainder: None,
197    })
198}
199
200async fn partial_upload_handle_response(
201    response: reqwest::Response,
202    expected_offset: usize,
203) -> Result<PartialUpload> {
204    if response.status() == self::RESUME_INCOMPLETE {
205        return self::parse_range(response, expected_offset).await;
206    }
207    if !response.status().is_success() {
208        return gaxi::http::to_http_error(response).await;
209    }
210    let response = response.json::<v1::Object>().await.map_err(Error::io)?;
211    Ok(PartialUpload::Finalized(Box::new(Object::from(response))))
212}
213
214async fn parse_range(response: reqwest::Response, expected_offset: usize) -> Result<PartialUpload> {
215    let Some(end) = self::parse_range_end(response.headers()) else {
216        return gaxi::http::to_http_error(response).await;
217    };
218    // The `Range` header returns an inclusive range, i.e. bytes=0-999 means "1000 bytes".
219    let (persisted_size, chunk_remainder) = match (expected_offset, end) {
220        (o, 0) => (0, o),
221        (o, e) if o < e + 1 => panic!("more data persistent than sent {response:?}"),
222        (o, e) => (e + 1, o - e - 1),
223    };
224    Ok(PartialUpload::Partial {
225        persisted_size,
226        chunk_remainder,
227    })
228}
229
230fn parse_range_end(headers: &reqwest::header::HeaderMap) -> Option<usize> {
231    let Some(range) = headers.get("range") else {
232        // A missing `Range:` header indicates that no bytes are persisted.
233        return Some(0_usize);
234    };
235    // Uploads must be sequential, so the persisted range (if present) always
236    // starts at zero. This is poorly documented, but can be inferred from
237    //   https://cloud.google.com/storage/docs/performing-resumable-uploads#resume-upload
238    // which requires uploads to continue from the last byte persisted. It is
239    // better documented in the gRPC version, where holes are explicitly
240    // forbidden:
241    //   https://github.com/googleapis/googleapis/blob/302273adb3293bb504ecd83be8e1467511d5c779/google/storage/v2/storage.proto#L1253-L1255
242    let end = std::str::from_utf8(range.as_bytes().strip_prefix(b"bytes=0-")?).ok()?;
243    end.parse::<usize>().ok()
244}
245
246#[derive(Debug, PartialEq)]
247enum PartialUpload {
248    Finalized(Box<Object>),
249    Partial {
250        persisted_size: usize,
251        chunk_remainder: usize,
252    },
253}
254
255/// The result of breaking the source data into a fixed sized chunk.
256#[derive(Debug, PartialEq)]
257struct NextChunk {
258    /// The data for this chunk.
259    chunk: VecDeque<bytes::Bytes>,
260    /// The total number of bytes in `chunk`.
261    size: usize,
262    // Any data received from the source that did not fit in the chunk.
263    remainder: Option<bytes::Bytes>,
264}
265
266const RESUME_INCOMPLETE: reqwest::StatusCode = reqwest::StatusCode::PERMANENT_REDIRECT;
267// Resumable uploads chunks (except for the last chunk) *must* be sized to a
268// multiple of 256 KiB.
269const RESUMABLE_UPLOAD_QUANTUM: usize = 256 * 1024;
270
271#[cfg(test)]
272mod tests {
273    use super::super::client::tests::{create_key_helper, test_builder, test_inner_client};
274    use super::*;
275    use crate::upload_source::tests::VecStream;
276    use httptest::{Expectation, Server, matchers::*, responders::status_code};
277    use serde_json::json;
278    use test_case::test_case;
279
280    type Result = anyhow::Result<()>;
281
282    const SESSION: &str = "https://private.googleapis.com/test-only-session-123";
283
284    #[tokio::test]
285    async fn upload_object_buffered_normal() -> Result {
286        let payload = serde_json::json!({
287            "name": "test-object",
288            "bucket": "test-bucket",
289            "metadata": {
290                "is-test-object": "true",
291            }
292        })
293        .to_string();
294        let server = Server::run();
295        let session = server.url("/upload/session/test-only-001");
296        let path = session.path().to_string();
297        server.expect(
298            Expectation::matching(all_of![
299                request::method_path("POST", "//upload/storage/v1/b/test-bucket/o"),
300                request::query(url_decoded(contains(("name", "test-object")))),
301                request::query(url_decoded(contains(("uploadType", "resumable")))),
302            ])
303            .respond_with(
304                status_code(200)
305                    .append_header("location", session.to_string())
306                    .body(""),
307            ),
308        );
309
310        server.expect(
311            Expectation::matching(all_of![
312                request::method_path("PUT", path),
313                request::headers(contains(("content-range", "bytes */0")))
314            ])
315            .respond_with(
316                status_code(200)
317                    .append_header("content-type", "application/json")
318                    .body(payload),
319            ),
320        );
321
322        let endpoint = server.url("");
323        let client = Storage::builder()
324            .with_endpoint(endpoint.to_string())
325            .with_credentials(auth::credentials::testing::test_credentials())
326            .build()
327            .await?;
328        let response = client
329            .upload_object("projects/_/buckets/test-bucket", "test-object", "")
330            .send()
331            .await?;
332        assert_eq!(response.name, "test-object");
333        assert_eq!(response.bucket, "projects/_/buckets/test-bucket");
334        assert_eq!(
335            response.metadata.get("is-test-object").map(String::as_str),
336            Some("true")
337        );
338
339        Ok(())
340    }
341
342    #[tokio::test]
343    async fn upload_object_buffered_not_found() -> Result {
344        let server = Server::run();
345        server.expect(
346            Expectation::matching(all_of![
347                request::method_path("POST", "//upload/storage/v1/b/test-bucket/o"),
348                request::query(url_decoded(contains(("name", "test-object")))),
349                request::query(url_decoded(contains(("uploadType", "resumable")))),
350            ])
351            .respond_with(status_code(404).body("NOT FOUND")),
352        );
353
354        let endpoint = server.url("");
355        let client = Storage::builder()
356            .with_endpoint(endpoint.to_string())
357            .with_credentials(auth::credentials::testing::test_credentials())
358            .build()
359            .await?;
360        let err = client
361            .upload_object("projects/_/buckets/test-bucket", "test-object", "")
362            .send()
363            .await
364            .expect_err("expected a not found error");
365        assert_eq!(err.http_status_code(), Some(404), "{err:?}");
366
367        Ok(())
368    }
369
370    #[test_case("projects/p", "projects%2Fp")]
371    #[test_case("kebab-case", "kebab-case")]
372    #[test_case("dot.name", "dot.name")]
373    #[test_case("under_score", "under_score")]
374    #[test_case("tilde~123", "tilde~123")]
375    #[test_case("exclamation!point!", "exclamation%21point%21")]
376    #[test_case("spaces   spaces", "spaces%20%20%20spaces")]
377    #[test_case("preserve%percent%21", "preserve%percent%21")]
378    #[test_case(
379        "testall !#$&'()*+,/:;=?@[]",
380        "testall%20%21%23%24%26%27%28%29%2A%2B%2C%2F%3A%3B%3D%3F%40%5B%5D"
381    )]
382    #[tokio::test]
383    async fn test_percent_encoding_object_name(name: &str, want: &str) -> Result {
384        let inner = test_inner_client(test_builder());
385        let request = UploadObject::new(inner, "projects/_/buckets/bucket", name, "hello")
386            .start_resumable_upload_request()
387            .await?
388            .build()?;
389
390        let got = request
391            .url()
392            .query_pairs()
393            .find_map(|(key, val)| match key.to_string().as_str() {
394                "name" => Some(val.to_string()),
395                _ => None,
396            })
397            .unwrap();
398        assert_eq!(got, want);
399        Ok(())
400    }
401
402    #[tokio::test]
403    async fn handle_start_resumable_upload_response() -> Result {
404        let response = http::Response::builder()
405            .header(
406                "Location",
407                "http://private.googleapis.com/test-only/session-123",
408            )
409            .body(Vec::new())?;
410        let response = reqwest::Response::from(response);
411        let url = super::handle_start_resumable_upload_response(response).await?;
412        assert_eq!(url, "http://private.googleapis.com/test-only/session-123");
413        Ok(())
414    }
415
416    #[tokio::test]
417    async fn upload_request() -> Result {
418        use reqwest::header::HeaderValue;
419
420        let inner = test_inner_client(test_builder());
421        let mut request = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "hello")
422            .upload_request(SESSION.to_string())
423            .await?
424            .build()?;
425
426        assert_eq!(request.method(), reqwest::Method::PUT);
427        assert_eq!(request.url().as_str(), SESSION);
428        assert_eq!(
429            request.headers().get("content-range"),
430            Some(&HeaderValue::from_static("bytes 0-4/5"))
431        );
432        let body = request.body_mut().take().unwrap();
433        let contents = http_body_util::BodyExt::collect(body).await?.to_bytes();
434        assert_eq!(contents, "hello");
435        Ok(())
436    }
437
438    #[tokio::test]
439    async fn upload_object_buffered_stream() -> Result {
440        let stream = VecStream::new(
441            [
442                "the ", "quick ", "brown ", "fox ", "jumps ", "over ", "the ", "lazy ", "dog",
443            ]
444            .map(|x| bytes::Bytes::from_static(x.as_bytes()))
445            .to_vec(),
446        );
447        let inner = test_inner_client(test_builder());
448        let mut request = UploadObject::new(inner, "projects/_/buckets/bucket", "object", stream)
449            .upload_request(SESSION.to_string())
450            .await?
451            .build()?;
452
453        assert_eq!(request.method(), reqwest::Method::PUT);
454        assert_eq!(request.url().as_str(), SESSION);
455        let body = request.body_mut().take().unwrap();
456        let contents = http_body_util::BodyExt::collect(body).await?.to_bytes();
457        assert_eq!(contents, "the quick brown fox jumps over the lazy dog");
458        Ok(())
459    }
460
461    #[tokio::test]
462    async fn upload_request_headers() -> Result {
463        // Make a 32-byte key.
464        let (key, key_base64, _, key_sha256_base64) = create_key_helper();
465
466        let inner = test_inner_client(test_builder());
467        let request = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "hello")
468            .with_key(KeyAes256::new(&key)?)
469            .upload_request(SESSION.to_string())
470            .await?
471            .build()?;
472
473        assert_eq!(request.method(), reqwest::Method::PUT);
474        assert_eq!(request.url().as_str(), SESSION);
475
476        let want = vec![
477            ("x-goog-encryption-algorithm", "AES256".to_string()),
478            ("x-goog-encryption-key", key_base64),
479            ("x-goog-encryption-key-sha256", key_sha256_base64),
480        ];
481
482        for (name, value) in want {
483            assert_eq!(
484                request.headers().get(name).unwrap().as_bytes(),
485                bytes::Bytes::from(value)
486            );
487        }
488        Ok(())
489    }
490
491    fn new_line_string(i: i32, len: usize) -> String {
492        format!("{i:022} {:width$}\n", "", width = len - 22 - 2)
493    }
494
495    fn new_line(i: i32, len: usize) -> bytes::Bytes {
496        bytes::Bytes::from_owner(new_line_string(i, len))
497    }
498
499    #[tokio::test]
500    async fn upload_by_chunks() -> Result {
501        const LEN: usize = 32;
502
503        let payload = serde_json::json!({
504            "name": "test-object",
505            "bucket": "test-bucket",
506            "metadata": {
507                "is-test-object": "true",
508            }
509        })
510        .to_string();
511
512        let chunk0 = new_line_string(0, LEN) + &new_line_string(1, LEN);
513        let chunk1 = new_line_string(2, LEN) + &new_line_string(3, LEN);
514        let chunk2 = new_line_string(4, LEN);
515
516        let server = Server::run();
517        let session = server.url("/upload/session/test-only-001");
518        let path = session.path().to_string();
519        server.expect(
520            Expectation::matching(all_of![
521                request::method_path("PUT", path.clone()),
522                request::headers(contains(("content-range", "bytes 0-63/*"))),
523                request::body(chunk0.clone()),
524            ])
525            .respond_with(status_code(308).append_header("range", "bytes=0-63")),
526        );
527
528        server.expect(
529            Expectation::matching(all_of![
530                request::method_path("PUT", path.clone()),
531                request::headers(contains(("content-range", "bytes 64-127/*"))),
532                request::body(chunk1.clone()),
533            ])
534            .respond_with(status_code(308).append_header("range", "bytes=0-127")),
535        );
536
537        server.expect(
538            Expectation::matching(all_of![
539                request::method_path("PUT", path.clone()),
540                request::headers(contains(("content-range", "bytes 128-159/160"))),
541                request::body(chunk2.clone()),
542            ])
543            .respond_with(status_code(200).body(payload.clone())),
544        );
545
546        let stream = VecStream::new((0..5).map(|i| new_line(i, LEN)).collect::<Vec<_>>());
547
548        let inner = test_inner_client(test_builder());
549        let upload = UploadObject::new(inner, "projects/_/buckets/bucket", "object", stream);
550        let response = upload
551            .upload_by_chunks(session.to_string().as_str(), 2 * LEN)
552            .await?;
553        assert_eq!(response.name, "test-object");
554        assert_eq!(response.bucket, "projects/_/buckets/test-bucket");
555        assert_eq!(
556            response.metadata.get("is-test-object").map(String::as_str),
557            Some("true")
558        );
559
560        Ok(())
561    }
562
563    #[tokio::test]
564    async fn partial_upload_request_empty() -> Result {
565        use reqwest::header::HeaderValue;
566        let inner = test_inner_client(test_builder());
567        let upload = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "");
568
569        let chunk = VecDeque::new();
570        let (builder, size) = upload
571            .partial_upload_request("http://localhost/chunk-0", 0_usize, chunk, 0_usize, None)
572            .await?;
573        assert_eq!(size, 0);
574        let mut request = builder.build()?;
575
576        assert_eq!(
577            request.headers().get("content-type"),
578            Some(&HeaderValue::from_static("application/octet-stream"))
579        );
580        assert_eq!(
581            request.headers().get("content-range"),
582            Some(&HeaderValue::from_static("bytes */0"))
583        );
584        assert!(
585            request.headers().get("x-goog-api-client").is_some(),
586            "{request:?}"
587        );
588        let body = request.body_mut().take().unwrap();
589        let contents = http_body_util::BodyExt::collect(body).await?.to_bytes();
590        assert!(&contents.is_empty(), "{contents:?}");
591        Ok(())
592    }
593
594    #[tokio::test]
595    async fn partial_upload_request_chunk0() -> Result {
596        use reqwest::header::HeaderValue;
597        const LEN: usize = 32;
598        let inner = test_inner_client(test_builder());
599        let upload = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "");
600
601        let chunk = VecDeque::from_iter([new_line(0, LEN), new_line(1, LEN)]);
602        let expected = chunk.iter().fold(Vec::new(), |mut a, b| {
603            a.extend_from_slice(b);
604            a
605        });
606        let (builder, size) = upload
607            .partial_upload_request("http://localhost/chunk-0", 0_usize, chunk, 2 * LEN, None)
608            .await?;
609        assert_eq!(size, 2 * LEN);
610        let mut request = builder.build()?;
611
612        assert_eq!(
613            request.headers().get("content-type"),
614            Some(&HeaderValue::from_static("application/octet-stream"))
615        );
616        assert_eq!(
617            request.headers().get("content-range"),
618            Some(&HeaderValue::from_static("bytes 0-63/*"))
619        );
620        assert!(
621            request.headers().get("x-goog-api-client").is_some(),
622            "{request:?}"
623        );
624        let body = request.body_mut().take().unwrap();
625        let contents = http_body_util::BodyExt::collect(body).await?.to_bytes();
626        assert_eq!(&contents, &expected);
627        Ok(())
628    }
629
630    #[tokio::test]
631    async fn partial_upload_request_chunk1() -> Result {
632        use reqwest::header::HeaderValue;
633        const LEN: usize = 32;
634        let inner = test_inner_client(test_builder());
635        let upload = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "");
636
637        let chunk = VecDeque::from_iter([new_line(2, LEN), new_line(3, LEN)]);
638        let expected = chunk.iter().fold(Vec::new(), |mut a, b| {
639            a.extend_from_slice(b);
640            a
641        });
642        let (builder, size) = upload
643            .partial_upload_request("http://localhost/chunk-1", 2 * LEN, chunk, 2 * LEN, None)
644            .await?;
645        assert_eq!(size, 2 * LEN);
646        let mut request = builder.build()?;
647
648        assert_eq!(
649            request.headers().get("content-type"),
650            Some(&HeaderValue::from_static("application/octet-stream"))
651        );
652        assert_eq!(
653            request.headers().get("content-range"),
654            Some(&HeaderValue::from_static("bytes 64-127/*"))
655        );
656        assert!(
657            request.headers().get("x-goog-api-client").is_some(),
658            "{request:?}"
659        );
660        let body = request.body_mut().take().unwrap();
661        let contents = http_body_util::BodyExt::collect(body).await?.to_bytes();
662        assert_eq!(&contents, &expected);
663        Ok(())
664    }
665
666    #[tokio::test]
667    async fn partial_upload_request_chunk_finalize() -> Result {
668        use reqwest::header::HeaderValue;
669        const LEN: usize = 32;
670        let inner = test_inner_client(test_builder());
671        let upload = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "");
672
673        let chunk = VecDeque::from_iter([new_line(2, LEN)]);
674        let expected = chunk.iter().fold(Vec::new(), |mut a, b| {
675            a.extend_from_slice(b);
676            a
677        });
678        let (builder, size) = upload
679            .partial_upload_request(
680                "http://localhost/chunk-finalize",
681                4 * LEN,
682                chunk,
683                LEN,
684                Some(5 * LEN),
685            )
686            .await?;
687        assert_eq!(size, LEN);
688        let mut request = builder.build()?;
689
690        assert_eq!(
691            request.headers().get("content-type"),
692            Some(&HeaderValue::from_static("application/octet-stream"))
693        );
694        assert_eq!(
695            request.headers().get("content-range"),
696            Some(&HeaderValue::from_static("bytes 128-159/160"))
697        );
698        assert!(
699            request.headers().get("x-goog-api-client").is_some(),
700            "{request:?}"
701        );
702        let body = request.body_mut().take().unwrap();
703        let contents = http_body_util::BodyExt::collect(body).await?.to_bytes();
704        assert_eq!(&contents, &expected);
705        Ok(())
706    }
707
708    #[tokio::test]
709    async fn next_chunk_success() -> Result {
710        const LEN: usize = 32;
711        let stream = VecStream::new((0..5).map(|i| new_line(i, LEN)).collect::<Vec<_>>());
712        let mut payload = InsertPayload::from(stream);
713
714        let NextChunk {
715            chunk,
716            size,
717            remainder,
718        } = super::next_chunk(&mut payload, None, LEN * 2).await?;
719        assert!(remainder.is_none(), "{remainder:?}");
720        assert_eq!(chunk, vec![new_line(0, LEN), new_line(1, LEN)]);
721        assert_eq!(size, 2 * LEN);
722
723        let NextChunk {
724            chunk,
725            size,
726            remainder,
727        } = super::next_chunk(&mut payload, remainder, LEN * 2).await?;
728        assert!(remainder.is_none(), "{remainder:?}");
729        assert_eq!(chunk, vec![new_line(2, LEN), new_line(3, LEN)]);
730        assert_eq!(size, 2 * LEN);
731
732        let NextChunk {
733            chunk,
734            size,
735            remainder,
736        } = super::next_chunk(&mut payload, remainder, LEN * 2).await?;
737        assert!(remainder.is_none(), "{remainder:?}");
738        assert_eq!(chunk, vec![new_line(4, LEN)]);
739        assert_eq!(size, LEN);
740
741        Ok(())
742    }
743
744    #[tokio::test]
745    async fn next_chunk_split() -> Result {
746        const LEN: usize = 32;
747        let stream = VecStream::new((0..5).map(|i| new_line(i, LEN)).collect::<Vec<_>>());
748        let mut payload = InsertPayload::from(stream);
749
750        let NextChunk {
751            chunk,
752            size,
753            remainder,
754        } = super::next_chunk(&mut payload, None, LEN * 2 + LEN / 2).await?;
755        assert_eq!(remainder, Some(new_line(2, LEN).split_off(LEN / 2)));
756        assert_eq!(
757            chunk,
758            vec![
759                new_line(0, LEN),
760                new_line(1, LEN),
761                new_line(2, LEN).split_to(LEN / 2)
762            ]
763        );
764        assert_eq!(size, 2 * LEN + LEN / 2);
765
766        let NextChunk {
767            chunk,
768            size,
769            remainder,
770        } = super::next_chunk(&mut payload, remainder, LEN * 2 + LEN / 2).await?;
771        assert!(remainder.is_none());
772        assert_eq!(
773            chunk,
774            vec![
775                new_line(2, LEN).split_off(LEN / 2),
776                new_line(3, LEN),
777                new_line(4, LEN)
778            ]
779        );
780        assert_eq!(size, 2 * LEN + LEN / 2);
781
782        Ok(())
783    }
784
785    #[tokio::test]
786    async fn next_chunk_split_large_remainder() -> Result {
787        const LEN: usize = 32;
788        let buffer = (0..3)
789            .map(|i| new_line_string(i, LEN))
790            .collect::<Vec<_>>()
791            .join("");
792        let stream = VecStream::new(vec![bytes::Bytes::from_owner(buffer), new_line(3, LEN)]);
793        let mut payload = InsertPayload::from(stream);
794
795        let remainder = None;
796        let NextChunk {
797            chunk,
798            size,
799            remainder,
800        } = super::next_chunk(&mut payload, remainder, LEN).await?;
801        assert_eq!(chunk, vec![new_line(0, LEN)]);
802        assert_eq!(size, LEN);
803
804        let NextChunk {
805            chunk,
806            size,
807            remainder,
808        } = super::next_chunk(&mut payload, remainder, LEN).await?;
809        assert!(remainder.is_some());
810        assert_eq!(chunk, vec![new_line(1, LEN)]);
811        assert_eq!(size, LEN);
812
813        let NextChunk {
814            chunk,
815            size,
816            remainder,
817        } = super::next_chunk(&mut payload, remainder, LEN).await?;
818        assert!(remainder.is_none());
819        assert_eq!(chunk, vec![new_line(2, LEN)]);
820        assert_eq!(size, LEN);
821
822        let NextChunk {
823            chunk,
824            size,
825            remainder,
826        } = super::next_chunk(&mut payload, remainder, LEN).await?;
827        assert!(remainder.is_none());
828        assert_eq!(chunk, vec![new_line(3, LEN)]);
829        assert_eq!(size, LEN);
830
831        Ok(())
832    }
833
834    #[tokio::test]
835    async fn next_chunk_join_remainder() -> Result {
836        const LEN: usize = 32;
837        let buffer = (0..3)
838            .map(|i| new_line_string(i, LEN))
839            .collect::<Vec<_>>()
840            .join("");
841        let stream = VecStream::new(vec![
842            bytes::Bytes::from_owner(buffer.clone()),
843            new_line(3, LEN),
844        ]);
845        let mut payload = InsertPayload::from(stream);
846
847        let remainder = None;
848        let NextChunk {
849            chunk,
850            size,
851            remainder,
852        } = super::next_chunk(&mut payload, remainder, 2 * LEN).await?;
853        assert!(remainder.is_some());
854        assert_eq!(
855            chunk,
856            vec![bytes::Bytes::from_owner(buffer.clone()).slice(0..(2 * LEN))]
857        );
858        assert_eq!(size, 2 * LEN);
859
860        let NextChunk {
861            chunk,
862            size,
863            remainder,
864        } = super::next_chunk(&mut payload, remainder, 2 * LEN).await?;
865        assert!(remainder.is_none());
866        assert_eq!(
867            chunk,
868            vec![
869                bytes::Bytes::from_owner(buffer.clone()).slice((2 * LEN)..),
870                new_line(3, LEN)
871            ]
872        );
873        assert_eq!(size, 2 * LEN);
874
875        Ok(())
876    }
877
878    #[tokio::test]
879    async fn next_chunk_done() -> Result {
880        const LEN: usize = 32;
881        let stream = VecStream::new((0..2).map(|i| new_line(i, LEN)).collect::<Vec<_>>());
882        let mut payload = InsertPayload::from(stream);
883
884        let NextChunk {
885            chunk,
886            size,
887            remainder,
888        } = super::next_chunk(&mut payload, None, LEN * 4).await?;
889        assert!(remainder.is_none(), "{remainder:?}");
890        assert_eq!(chunk, vec![new_line(0, LEN), new_line(1, LEN)]);
891        assert_eq!(size, 2 * LEN);
892
893        let NextChunk {
894            chunk,
895            size,
896            remainder,
897        } = super::next_chunk(&mut payload, remainder, LEN * 4).await?;
898        assert!(remainder.is_none(), "{remainder:?}");
899        assert!(chunk.is_empty(), "{chunk:?}");
900        assert_eq!(size, 0);
901
902        Ok(())
903    }
904
905    #[tokio::test]
906    async fn partial_handle_response_incomplete() -> Result {
907        let response = http::Response::builder()
908            .header("range", "bytes=0-999")
909            .status(RESUME_INCOMPLETE)
910            .body(Vec::new())?;
911        let response = reqwest::Response::from(response);
912        let partial = super::partial_upload_handle_response(response, 1000).await?;
913        assert_eq!(
914            partial,
915            PartialUpload::Partial {
916                persisted_size: 1000,
917                chunk_remainder: 0
918            }
919        );
920        Ok(())
921    }
922
923    #[tokio::test]
924    async fn partial_handle_response_err() -> Result {
925        let response = http::Response::builder()
926            .status(reqwest::StatusCode::NOT_FOUND)
927            .body(Vec::new())?;
928        let response = reqwest::Response::from(response);
929        let err = super::partial_upload_handle_response(response, 1000)
930            .await
931            .expect_err("NOT_FOUND should fail");
932        assert_eq!(err.http_status_code(), Some(404), "{err:?}");
933        Ok(())
934    }
935
936    #[tokio::test]
937    async fn partial_handle_response_finalized() -> Result {
938        let response = http::Response::builder()
939            .status(reqwest::StatusCode::OK)
940            .body(
941                json!({"bucket": "test-bucket", "name": "test-object", "size": "1000"}).to_string(),
942            )?;
943        let response = reqwest::Response::from(response);
944        let partial = super::partial_upload_handle_response(response, 1000).await?;
945        assert_eq!(
946            partial,
947            PartialUpload::Finalized(Box::new(
948                Object::new()
949                    .set_name("test-object")
950                    .set_bucket("projects/_/buckets/test-bucket")
951                    .set_finalize_time(wkt::Timestamp::default())
952                    .set_create_time(wkt::Timestamp::default())
953                    .set_update_time(wkt::Timestamp::default())
954                    .set_update_storage_class_time(wkt::Timestamp::default())
955                    .set_size(1000_i64)
956            ))
957        );
958        Ok(())
959    }
960
961    #[tokio::test]
962    async fn parse_range_success() -> Result {
963        let response = http::Response::builder()
964            .header("range", "bytes=0-999")
965            .status(RESUME_INCOMPLETE)
966            .body(Vec::new())?;
967        let response = reqwest::Response::from(response);
968        let partial = super::parse_range(response, 1000).await?;
969        assert_eq!(
970            partial,
971            PartialUpload::Partial {
972                persisted_size: 1000,
973                chunk_remainder: 0
974            }
975        );
976        Ok(())
977    }
978
979    #[tokio::test]
980    async fn parse_range_partial() -> Result {
981        let response = http::Response::builder()
982            .header("range", "bytes=0-999")
983            .status(RESUME_INCOMPLETE)
984            .body(Vec::new())?;
985        let response = reqwest::Response::from(response);
986        let partial = super::parse_range(response, 1234).await?;
987        assert_eq!(
988            partial,
989            PartialUpload::Partial {
990                persisted_size: 1000,
991                chunk_remainder: 234
992            }
993        );
994        Ok(())
995    }
996
997    #[tokio::test]
998    #[should_panic]
999    async fn parse_range_bad_end() {
1000        let response = http::Response::builder()
1001            .header("range", "bytes=0-999")
1002            .status(RESUME_INCOMPLETE)
1003            .body(Vec::new())
1004            .unwrap();
1005        let response = reqwest::Response::from(response);
1006        let _ = super::parse_range(response, 500).await;
1007    }
1008
1009    #[tokio::test]
1010    async fn parse_range_missing_range() -> Result {
1011        let response = http::Response::builder()
1012            .status(RESUME_INCOMPLETE)
1013            .body(Vec::new())?;
1014        let response = reqwest::Response::from(response);
1015        let partial = super::parse_range(response, 1234).await?;
1016        assert_eq!(
1017            partial,
1018            PartialUpload::Partial {
1019                persisted_size: 0,
1020                chunk_remainder: 1234
1021            }
1022        );
1023        Ok(())
1024    }
1025
1026    #[tokio::test]
1027    async fn parse_range_invalid_range() -> Result {
1028        let response = http::Response::builder()
1029            .header("range", "bytes=100-999")
1030            .status(RESUME_INCOMPLETE)
1031            .body(Vec::new())?;
1032        let response = reqwest::Response::from(response);
1033        let err = super::parse_range(response, 1234)
1034            .await
1035            .expect_err("invalid range should create an error");
1036        assert!(err.http_status_code().is_some(), "{err:?}");
1037        Ok(())
1038    }
1039
1040    #[test_case(None, Some(0))]
1041    #[test_case(Some("bytes=0-12345"), Some(12345))]
1042    #[test_case(Some("bytes=0-1"), Some(1))]
1043    #[test_case(Some("bytes=0-0"), Some(0))]
1044    #[test_case(Some("bytes=1-12345"), None)]
1045    #[test_case(Some(""), None)]
1046    fn range_end(input: Option<&str>, want: Option<usize>) {
1047        use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
1048        let headers = HeaderMap::from_iter(input.into_iter().map(|s| {
1049            (
1050                HeaderName::from_static("range"),
1051                HeaderValue::from_str(s).unwrap(),
1052            )
1053        }));
1054        assert_eq!(super::parse_range_end(&headers), want, "{headers:?}");
1055    }
1056
1057    #[test]
1058    fn validate_status_code() {
1059        assert_eq!(RESUME_INCOMPLETE, 308);
1060    }
1061}