google_cloud_storage/storage/
upload_source.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Defines upload data sources.
16
17use std::collections::VecDeque;
18
/// The payload for object uploads via the [Storage][crate::client::Storage]
/// client.
///
/// A `Payload` wraps a data source of type `T`. The storage client functions
/// that upload new objects accept anything convertible into this type:
/// [bytes::Bytes], `&'static str`, `Vec<bytes::Bytes>`, [tokio::fs::File], and
/// any type implementing [StreamingSource].
///
/// # Example
/// ```
/// # tokio_test::block_on(async {
/// # use google_cloud_storage::upload_source::Payload;
/// use google_cloud_storage::upload_source::StreamingSource;
/// let buffer : &[u8] = b"the quick brown fox jumps over the lazy dog";
/// let mut size = 0_usize;
/// let mut payload = Payload::from(bytes::Bytes::from_static(buffer));
/// while let Some(bytes) = payload.next().await.transpose()? {
///     size += bytes.len();
/// }
/// assert_eq!(size, buffer.len());
/// # anyhow::Result::<()>::Ok(()) });
/// ```
pub struct Payload<T> {
    /// The wrapped data source; the trait implementations on `Payload`
    /// delegate to it.
    payload: T,
}
43
44impl<T> Payload<T>
45where
46    T: StreamingSource,
47{
48    pub fn from_stream(payload: T) -> Self {
49        Self { payload }
50    }
51}
52
53impl<T> StreamingSource for Payload<T>
54where
55    T: StreamingSource + Send + Sync,
56{
57    type Error = T::Error;
58
59    async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
60        self.payload.next().await
61    }
62
63    async fn size_hint(&self) -> Result<(u64, Option<u64>), Self::Error> {
64        self.payload.size_hint().await
65    }
66}
67
68impl<T> Seek for Payload<T>
69where
70    T: Seek,
71{
72    type Error = T::Error;
73
74    fn seek(&mut self, offset: u64) -> impl Future<Output = Result<(), Self::Error>> + Send {
75        self.payload.seek(offset)
76    }
77}
78
79impl From<bytes::Bytes> for Payload<BytesSource> {
80    fn from(value: bytes::Bytes) -> Self {
81        let payload = BytesSource::new(value);
82        Self { payload }
83    }
84}
85
86impl From<&'static str> for Payload<BytesSource> {
87    fn from(value: &'static str) -> Self {
88        let b = bytes::Bytes::from_static(value.as_bytes());
89        Payload::from(b)
90    }
91}
92
93impl From<Vec<bytes::Bytes>> for Payload<IterSource> {
94    fn from(value: Vec<bytes::Bytes>) -> Self {
95        let payload = IterSource::new(value);
96        Self { payload }
97    }
98}
99
100impl<S> From<S> for Payload<S>
101where
102    S: StreamingSource,
103{
104    fn from(value: S) -> Self {
105        Self { payload: value }
106    }
107}
108
109/// Provides bytes for an upload from single-pass sources.
110pub trait StreamingSource {
111    /// The error type.
112    type Error: std::error::Error + Send + Sync + 'static;
113
114    /// Gets the next set of data to upload.
115    fn next(&mut self) -> impl Future<Output = Option<Result<bytes::Bytes, Self::Error>>> + Send;
116
117    /// An estimate of the upload size.
118    ///
119    /// Returns the expected size as a [min, max) range. Where `None` represents
120    /// an unknown limit for the upload.
121    ///
122    /// If the maximum size is known and sufficiently small, the client library
123    /// may be able to use a more efficient protocol for the upload.
124    fn size_hint(&self) -> impl Future<Output = Result<(u64, Option<u64>), Self::Error>> + Send {
125        std::future::ready(Ok((0_u64, None)))
126    }
127}
128
/// Provides bytes for an upload from sources that support seek.
///
/// Implementations of this trait provide data for Google Cloud Storage uploads.
/// The data may be received asynchronously, such as downloads from Google Cloud
/// Storage, other remote storage systems, or the result of repeatable
/// computations.
///
/// `Seek` only repositions a source; the data itself is read through
/// [StreamingSource::next]. Every `Seek` implementation in this module also
/// implements [StreamingSource].
pub trait Seek {
    /// The error type.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Resets the stream to start from `offset`.
    ///
    /// `offset` is measured in bytes from the beginning of the source, not
    /// from the current position. The implementations in this module produce
    /// no further data when `offset` is at or past the end of the source.
    ///
    /// The client library automatically restarts uploads when the connection
    /// is reset or there is some kind of partial failure. Resuming an upload
    /// may require resetting the stream to an arbitrary point.
    ///
    /// The client library assumes that `seek(N)` followed by `next()` always
    /// returns the same data.
    fn seek(&mut self, offset: u64) -> impl Future<Output = Result<(), Self::Error>> + Send;
}
149
/// Buffer size used by `FileSource` for each read (256 KiB); every chunk it
/// yields is at most this many bytes.
const READ_SIZE: usize = 256 << 10;
151
152impl From<tokio::fs::File> for Payload<FileSource> {
153    fn from(value: tokio::fs::File) -> Self {
154        Self {
155            payload: FileSource::new(value),
156        }
157    }
158}
159
160/// Implements [StreamingSource] for a [tokio::fs::File].
161///
162/// # Example
163/// ```
164/// # use google_cloud_storage::client::Storage;
165/// # async fn sample(client: &Storage) -> anyhow::Result<()> {
166/// let payload = tokio::fs::File::open("my-data").await?;
167/// let response = client
168///     .upload_object("projects/_/buckets/my-bucket", "my-object", payload)
169///     .send_unbuffered()
170///     .await?;
171/// println!("response details={response:?}");
172/// # Ok(()) }
173/// ```
174pub struct FileSource {
175    inner: tokio::fs::File,
176}
177
178impl FileSource {
179    fn new(inner: tokio::fs::File) -> Self {
180        Self { inner }
181    }
182}
183
184impl StreamingSource for FileSource {
185    type Error = std::io::Error;
186
187    async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
188        let mut buffer = vec![0_u8; READ_SIZE];
189        match tokio::io::AsyncReadExt::read(&mut self.inner, &mut buffer).await {
190            Err(e) => Some(Err(e)),
191            Ok(0) => None,
192            Ok(n) => {
193                buffer.resize(n, 0_u8);
194                Some(Ok(bytes::Bytes::from_owner(buffer)))
195            }
196        }
197    }
198    async fn size_hint(&self) -> Result<(u64, Option<u64>), Self::Error> {
199        let m = self.inner.metadata().await?;
200        Ok((m.len(), Some(m.len())))
201    }
202}
203
204impl Seek for FileSource {
205    type Error = std::io::Error;
206
207    async fn seek(&mut self, offset: u64) -> Result<(), Self::Error> {
208        use tokio::io::AsyncSeekExt;
209        let _ = self.inner.seek(std::io::SeekFrom::Start(offset)).await?;
210        Ok(())
211    }
212}
213
214/// Implements [StreamingSource] for [bytes::Bytes].
215///
216/// # Example
217/// ```
218/// # use google_cloud_storage::client::Storage;
219/// # async fn sample(client: &Storage) -> anyhow::Result<()> {
220/// let payload = bytes::Bytes::from_static(b"Hello World!");
221/// let response = client
222///     .upload_object("projects/_/buckets/my-bucket", "my-object", payload)
223///     .send_unbuffered()
224///     .await?;
225/// println!("response details={response:?}");
226/// # Ok(()) }
227/// ```
228pub struct BytesSource {
229    contents: bytes::Bytes,
230    current: Option<bytes::Bytes>,
231}
232
233impl BytesSource {
234    pub(crate) fn new(contents: bytes::Bytes) -> Self {
235        let current = Some(contents.clone());
236        Self { contents, current }
237    }
238}
239
240impl StreamingSource for BytesSource {
241    type Error = crate::Error;
242
243    async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
244        self.current.take().map(Result::Ok)
245    }
246
247    async fn size_hint(&self) -> Result<(u64, Option<u64>), Self::Error> {
248        let s = self.contents.len() as u64;
249        Ok((s, Some(s)))
250    }
251}
252
253impl Seek for BytesSource {
254    type Error = crate::Error;
255
256    async fn seek(&mut self, offset: u64) -> Result<(), Self::Error> {
257        let pos = std::cmp::min(offset as usize, self.contents.len());
258        self.current = Some(self.contents.slice(pos..));
259        Ok(())
260    }
261}
262
263/// Implements [StreamingSource] for a sequence of [bytes::Bytes].
264pub(crate) struct IterSource {
265    contents: Vec<bytes::Bytes>,
266    current: VecDeque<bytes::Bytes>,
267}
268
269impl IterSource {
270    pub(crate) fn new<I>(iterator: I) -> Self
271    where
272        I: IntoIterator<Item = bytes::Bytes>,
273    {
274        let contents: Vec<bytes::Bytes> = iterator.into_iter().collect();
275        let current: VecDeque<bytes::Bytes> = contents.iter().cloned().collect();
276        Self { contents, current }
277    }
278}
279
280impl StreamingSource for IterSource {
281    type Error = std::io::Error;
282
283    async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, Self::Error>> {
284        self.current.pop_front().map(Ok)
285    }
286
287    async fn size_hint(&self) -> Result<(u64, Option<u64>), Self::Error> {
288        let s = self.contents.iter().fold(0_u64, |a, i| a + i.len() as u64);
289        Ok((s, Some(s)))
290    }
291}
292
293impl Seek for IterSource {
294    type Error = std::io::Error;
295    async fn seek(&mut self, offset: u64) -> std::result::Result<(), Self::Error> {
296        let mut current = VecDeque::new();
297        let mut offset = offset as usize;
298        for b in self.contents.iter() {
299            offset = match (offset, b.len()) {
300                (0, _) => {
301                    current.push_back(b.clone());
302                    0
303                }
304                (o, n) if o >= n => o - n,
305                (o, n) => {
306                    current.push_back(b.clone().split_off(n - o));
307                    0
308                }
309            }
310        }
311        self.current = current;
312        Ok(())
313    }
314}
315
316#[cfg(test)]
317pub mod tests {
318    use super::*;
319    use std::io::Write;
320    use tempfile::NamedTempFile;
321
322    type Result = anyhow::Result<()>;
323
324    const CONTENTS: &[u8] = b"how vexingly quick daft zebras jump";
325
326    pub(crate) struct UnknownSize {
327        inner: BytesSource,
328    }
329    impl UnknownSize {
330        pub fn new(inner: BytesSource) -> Self {
331            Self { inner }
332        }
333    }
334    impl Seek for UnknownSize {
335        type Error = <BytesSource as Seek>::Error;
336        async fn seek(&mut self, offset: u64) -> std::result::Result<(), Self::Error> {
337            self.inner.seek(offset).await
338        }
339    }
340    impl StreamingSource for UnknownSize {
341        type Error = <BytesSource as StreamingSource>::Error;
342        async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, Self::Error>> {
343            self.inner.next().await
344        }
345        async fn size_hint(&self) -> std::result::Result<(u64, Option<u64>), Self::Error> {
346            let hint = self.inner.size_hint().await?;
347            Ok((hint.0, None))
348        }
349    }
350
351    mockall::mock! {
352        pub(crate) SimpleSource {}
353
354        impl StreamingSource for SimpleSource {
355            type Error = std::io::Error;
356            async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, std::io::Error>>;
357            async fn size_hint(&self) -> std::result::Result<(u64, Option<u64>), std::io::Error>;
358        }
359    }
360
361    mockall::mock! {
362        pub(crate) SeekSource {}
363
364        impl StreamingSource for SeekSource {
365            type Error = std::io::Error;
366            async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, std::io::Error>>;
367            async fn size_hint(&self) -> std::result::Result<(u64, Option<u64>), std::io::Error>;
368        }
369        impl Seek for SeekSource {
370            type Error = std::io::Error;
371            async fn seek(&mut self, offset: u64) ->std::result::Result<(), std::io::Error>;
372        }
373    }
374
375    /// A helper function to simplify the tests.
376    async fn collect<S>(mut source: S) -> anyhow::Result<Vec<u8>>
377    where
378        S: StreamingSource,
379    {
380        collect_mut(&mut source).await
381    }
382
383    /// A helper function to simplify the tests.
384    async fn collect_mut<S>(source: &mut S) -> anyhow::Result<Vec<u8>>
385    where
386        S: StreamingSource,
387    {
388        let mut vec = Vec::new();
389        while let Some(bytes) = source.next().await.transpose()? {
390            vec.extend_from_slice(&bytes);
391        }
392        Ok(vec)
393    }
394
395    #[tokio::test]
396    async fn empty_bytes() -> Result {
397        let buffer = Payload::from(bytes::Bytes::default());
398        let range = buffer.size_hint().await?;
399        assert_eq!(range, (0, Some(0)));
400        let got = collect(buffer).await?;
401        assert!(got.is_empty(), "{got:?}");
402
403        Ok(())
404    }
405
406    #[tokio::test]
407    async fn simple_bytes() -> Result {
408        let buffer = Payload::from(bytes::Bytes::from_static(CONTENTS));
409        let range = buffer.size_hint().await?;
410        assert_eq!(range, (CONTENTS.len() as u64, Some(CONTENTS.len() as u64)));
411        let got = collect(buffer).await?;
412        assert_eq!(got[..], CONTENTS[..], "{got:?}");
413        Ok(())
414    }
415
416    #[tokio::test]
417    async fn simple_str() -> Result {
418        const LAZY: &str = "the quick brown fox jumps over the lazy dog";
419        let buffer = Payload::from(LAZY);
420        let range = buffer.size_hint().await?;
421        assert_eq!(range, (LAZY.len() as u64, Some(LAZY.len() as u64)));
422        let got = collect(buffer).await?;
423        assert_eq!(&got, LAZY.as_bytes(), "{got:?}");
424        Ok(())
425    }
426
427    #[tokio::test]
428    async fn seek_bytes() -> Result {
429        let mut buffer = Payload::from(bytes::Bytes::from_static(CONTENTS));
430        buffer.seek(8).await?;
431        let got = collect(buffer).await?;
432        assert_eq!(got[..], CONTENTS[8..], "{got:?}");
433        Ok(())
434    }
435
436    #[tokio::test]
437    async fn empty_stream() -> Result {
438        let source = IterSource::new(vec![]);
439        let payload = Payload::from(source);
440        let range = payload.size_hint().await?;
441        assert_eq!(range, (0, Some(0)));
442        let got = collect(payload).await?;
443        assert!(got.is_empty(), "{got:?}");
444
445        Ok(())
446    }
447
448    #[tokio::test]
449    async fn simple_stream() -> Result {
450        let source = IterSource::new(
451            ["how ", "vexingly ", "quick ", "daft ", "zebras ", "jump"]
452                .map(|v| bytes::Bytes::from_static(v.as_bytes())),
453        );
454        let payload = Payload::from_stream(source);
455        let got = collect(payload).await?;
456        assert_eq!(got[..], CONTENTS[..]);
457
458        Ok(())
459    }
460
461    #[tokio::test]
462    async fn empty_file() -> Result {
463        let file = NamedTempFile::new()?;
464        let read = tokio::fs::File::from(file.reopen()?);
465        let payload = Payload::from(read);
466        let hint = payload.size_hint().await?;
467        assert_eq!(hint, (0_u64, Some(0_u64)));
468        let got = collect(payload).await?;
469        assert!(got.is_empty(), "{got:?}");
470        Ok(())
471    }
472
473    #[tokio::test]
474    async fn small_file() -> Result {
475        let mut file = NamedTempFile::new()?;
476        assert_eq!(file.write(CONTENTS)?, CONTENTS.len());
477        file.flush()?;
478        let read = tokio::fs::File::from(file.reopen()?);
479        let payload = Payload::from(read);
480        let hint = payload.size_hint().await?;
481        let s = CONTENTS.len() as u64;
482        assert_eq!(hint, (s, Some(s)));
483        let got = collect(payload).await?;
484        assert_eq!(got[..], CONTENTS[..], "{got:?}");
485        Ok(())
486    }
487
488    #[tokio::test]
489    async fn small_file_seek() -> Result {
490        let mut file = NamedTempFile::new()?;
491        assert_eq!(file.write(CONTENTS)?, CONTENTS.len());
492        file.flush()?;
493        let read = tokio::fs::File::from(file.reopen()?);
494        let mut payload = Payload::from(read);
495        payload.seek(8).await?;
496        let got = collect(payload).await?;
497        assert_eq!(got[..], CONTENTS[8..], "{got:?}");
498        Ok(())
499    }
500
501    #[tokio::test]
502    async fn larger_file() -> Result {
503        let mut file = NamedTempFile::new()?;
504        assert_eq!(file.write(&[0_u8; READ_SIZE])?, READ_SIZE);
505        assert_eq!(file.write(&[1_u8; READ_SIZE])?, READ_SIZE);
506        assert_eq!(file.write(&[2_u8; READ_SIZE])?, READ_SIZE);
507        assert_eq!(file.write(&[3_u8; READ_SIZE])?, READ_SIZE);
508        file.flush()?;
509        assert_eq!(READ_SIZE % 2, 0);
510        let read = tokio::fs::File::from(file.reopen()?);
511        let mut payload = Payload::from(read);
512        payload.seek((READ_SIZE + READ_SIZE / 2) as u64).await?;
513        let got = collect(payload).await?;
514        let mut want = Vec::new();
515        want.extend_from_slice(&[1_u8; READ_SIZE / 2]);
516        want.extend_from_slice(&[2_u8; READ_SIZE]);
517        want.extend_from_slice(&[3_u8; READ_SIZE]);
518        assert_eq!(got[..], want[..], "{got:?}");
519        Ok(())
520    }
521
522    #[tokio::test]
523    async fn iter_source_full() -> Result {
524        const N: usize = 32;
525        let mut buf = Vec::new();
526        buf.extend_from_slice(&[1_u8; N]);
527        buf.extend_from_slice(&[2_u8; N]);
528        buf.extend_from_slice(&[3_u8; N]);
529        let b = bytes::Bytes::from_owner(buf);
530
531        let mut stream =
532            IterSource::new(vec![b.slice(0..N), b.slice(N..(2 * N)), b.slice((2 * N)..)]);
533        assert_eq!(
534            stream.size_hint().await?,
535            (3 * N as u64, Some(3 * N as u64))
536        );
537
538        // test_case() is not appropriate here: we want to verify seek() works
539        // multiple times over the *same* stream.
540        for offset in [0, N / 2, 0, N, 0, 2 * N + N / 2] {
541            stream.seek(offset as u64).await?;
542            let got = collect_mut(&mut stream).await?;
543            assert_eq!(got[..], b[offset..(3 * N)]);
544        }
545
546        Ok(())
547    }
548}