google_cloud_storage/storage/
streaming_source.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Defines data sources for object writes.
16
17use std::collections::VecDeque;
18
/// The *total* number of bytes expected in a [StreamingSource].
///
/// This is an alias for [http_body::SizeHint]; sources report it from
/// [StreamingSource::size_hint].
pub type SizeHint = http_body::SizeHint;
21
/// The payload for object writers via the [Storage][crate::client::Storage]
/// client.
///
/// The storage client functions that create new objects consume types
/// convertible to this type. That includes simple buffers, and any type
/// implementing [StreamingSource].
///
/// # Example
/// ```
/// # tokio_test::block_on(async {
/// # use google_cloud_storage::streaming_source::Payload;
/// use google_cloud_storage::streaming_source::StreamingSource;
/// let buffer : &[u8] = b"the quick brown fox jumps over the lazy dog";
/// let mut size = 0_usize;
/// let mut payload = Payload::from(bytes::Bytes::from_static(buffer));
/// while let Some(bytes) = payload.next().await.transpose()? {
///     size += bytes.len();
/// }
/// assert_eq!(size, buffer.len());
/// # anyhow::Result::<()>::Ok(()) });
/// ```
pub struct Payload<T> {
    // The wrapped data source. The `StreamingSource` and `Seek`
    // implementations on `Payload` forward to this value.
    payload: T,
}
46
47impl<T> Payload<T>
48where
49    T: StreamingSource,
50{
51    pub fn from_stream(payload: T) -> Self {
52        Self { payload }
53    }
54}
55
56impl<T> StreamingSource for Payload<T>
57where
58    T: StreamingSource + Send + Sync,
59{
60    type Error = T::Error;
61
62    async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
63        self.payload.next().await
64    }
65
66    async fn size_hint(&self) -> Result<SizeHint, Self::Error> {
67        self.payload.size_hint().await
68    }
69}
70
71impl<T> Seek for Payload<T>
72where
73    T: Seek,
74{
75    type Error = T::Error;
76
77    fn seek(&mut self, offset: u64) -> impl Future<Output = Result<(), Self::Error>> + Send {
78        self.payload.seek(offset)
79    }
80}
81
82impl From<bytes::Bytes> for Payload<BytesSource> {
83    fn from(value: bytes::Bytes) -> Self {
84        let payload = BytesSource::new(value);
85        Self { payload }
86    }
87}
88
89impl From<&'static str> for Payload<BytesSource> {
90    fn from(value: &'static str) -> Self {
91        let b = bytes::Bytes::from_static(value.as_bytes());
92        Payload::from(b)
93    }
94}
95
96impl From<Vec<bytes::Bytes>> for Payload<IterSource> {
97    fn from(value: Vec<bytes::Bytes>) -> Self {
98        let payload = IterSource::new(value);
99        Self { payload }
100    }
101}
102
103impl<S> From<S> for Payload<S>
104where
105    S: StreamingSource,
106{
107    fn from(value: S) -> Self {
108        Self { payload: value }
109    }
110}
111
/// Provides bytes from single-pass sources.
///
/// The payload is handed out as a sequence of [bytes::Bytes] chunks. A `None`
/// from [next][StreamingSource::next] signals the end of the data.
pub trait StreamingSource {
    /// The error type.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Gets the next set of data.
    ///
    /// Returns `None` once the source has no more data, and `Some(Err(_))` if
    /// the source fails to produce the next chunk.
    fn next(&mut self) -> impl Future<Output = Option<Result<bytes::Bytes, Self::Error>>> + Send;

    /// An estimate of the payload size.
    ///
    /// Returns the expected size as a lower bound and an optional upper
    /// bound. The upper bound may be unknown. If the lower bound is unknown
    /// use `0`. Sources that know their exact size should report it with
    /// `SizeHint::with_exact`.
    ///
    /// If the maximum size is known and sufficiently small, the client library
    /// may be able to use a more efficient protocol for the upload.
    ///
    /// The default implementation returns `SizeHint::new()`.
    fn size_hint(&self) -> impl Future<Output = Result<SizeHint, Self::Error>> + Send {
        std::future::ready(Ok(SizeHint::new()))
    }
}
131
/// Provides bytes from sources that support seek.
///
/// The data may be received asynchronously, such as reads from Google Cloud
/// Storage, reads from other remote storage systems, or the result of
/// repeatable computations.
pub trait Seek {
    /// The error type.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Resets the stream to start from `offset`.
    ///
    /// `offset` is a byte position counted from the start of the payload.
    ///
    /// The client library automatically restarts writes when the connection
    /// is reset or there is some kind of partial failure. Resuming a write may
    /// require resetting the stream to an arbitrary point.
    ///
    /// The client library assumes that `seek(N)` followed by `next()` always
    /// returns the same data, no matter how many times the pair is repeated.
    fn seek(&mut self, offset: u64) -> impl Future<Output = Result<(), Self::Error>> + Send;
}
151
/// The size, in bytes, of the buffer used for each read in [FileSource].
const READ_SIZE: usize = 256 * 1024;
153
154impl From<tokio::fs::File> for Payload<FileSource> {
155    fn from(value: tokio::fs::File) -> Self {
156        Self {
157            payload: FileSource::new(value),
158        }
159    }
160}
161
/// Implements [StreamingSource] for a [tokio::fs::File].
///
/// Also implements [Seek], by seeking the underlying file from its start.
///
/// # Example
/// ```
/// # use google_cloud_storage::client::Storage;
/// # async fn sample(client: &Storage) -> anyhow::Result<()> {
/// let payload = tokio::fs::File::open("my-data").await?;
/// let response = client
///     .write_object("projects/_/buckets/my-bucket", "my-object", payload)
///     .send_unbuffered()
///     .await?;
/// println!("response details={response:?}");
/// # Ok(()) }
/// ```
pub struct FileSource {
    // The file the data is read from.
    inner: tokio::fs::File,
}
179
180impl FileSource {
181    fn new(inner: tokio::fs::File) -> Self {
182        Self { inner }
183    }
184}
185
186impl StreamingSource for FileSource {
187    type Error = std::io::Error;
188
189    async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
190        let mut buffer = vec![0_u8; READ_SIZE];
191        match tokio::io::AsyncReadExt::read(&mut self.inner, &mut buffer).await {
192            Err(e) => Some(Err(e)),
193            Ok(0) => None,
194            Ok(n) => {
195                buffer.resize(n, 0_u8);
196                Some(Ok(bytes::Bytes::from_owner(buffer)))
197            }
198        }
199    }
200    async fn size_hint(&self) -> Result<SizeHint, Self::Error> {
201        let m = self.inner.metadata().await?;
202        Ok(SizeHint::with_exact(m.len()))
203    }
204}
205
206impl Seek for FileSource {
207    type Error = std::io::Error;
208
209    async fn seek(&mut self, offset: u64) -> Result<(), Self::Error> {
210        use tokio::io::AsyncSeekExt;
211        let _ = self.inner.seek(std::io::SeekFrom::Start(offset)).await?;
212        Ok(())
213    }
214}
215
/// Implements [StreamingSource] for [bytes::Bytes].
///
/// The whole buffer is returned by a single call to `next()`.
///
/// # Example
/// ```
/// # use google_cloud_storage::client::Storage;
/// # async fn sample(client: &Storage) -> anyhow::Result<()> {
/// let payload = bytes::Bytes::from_static(b"Hello World!");
/// let response = client
///     .write_object("projects/_/buckets/my-bucket", "my-object", payload)
///     .send_unbuffered()
///     .await?;
/// println!("response details={response:?}");
/// # Ok(()) }
/// ```
pub struct BytesSource {
    // The complete buffer. Kept so `seek()` can rewind, and used for the
    // size hint.
    contents: bytes::Bytes,
    // The data not yet returned by `next()`; `None` once it has been taken.
    current: Option<bytes::Bytes>,
}
234
235impl BytesSource {
236    pub(crate) fn new(contents: bytes::Bytes) -> Self {
237        let current = Some(contents.clone());
238        Self { contents, current }
239    }
240}
241
242impl StreamingSource for BytesSource {
243    type Error = crate::Error;
244
245    async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
246        self.current.take().map(Result::Ok)
247    }
248
249    async fn size_hint(&self) -> Result<SizeHint, Self::Error> {
250        let s = self.contents.len() as u64;
251        Ok(SizeHint::with_exact(s))
252    }
253}
254
255impl Seek for BytesSource {
256    type Error = crate::Error;
257
258    async fn seek(&mut self, offset: u64) -> Result<(), Self::Error> {
259        let pos = std::cmp::min(offset as usize, self.contents.len());
260        self.current = Some(self.contents.slice(pos..));
261        Ok(())
262    }
263}
264
/// Implements [StreamingSource] for a sequence of [bytes::Bytes].
///
/// Each call to `next()` returns one buffer from the sequence, in order.
pub(crate) struct IterSource {
    // The complete sequence. Kept so `seek()` can rebuild the pending queue,
    // and used for the size hint.
    contents: Vec<bytes::Bytes>,
    // The buffers not yet returned by `next()`.
    current: VecDeque<bytes::Bytes>,
}
270
271impl IterSource {
272    pub(crate) fn new<I>(iterator: I) -> Self
273    where
274        I: IntoIterator<Item = bytes::Bytes>,
275    {
276        let contents: Vec<bytes::Bytes> = iterator.into_iter().collect();
277        let current: VecDeque<bytes::Bytes> = contents.iter().cloned().collect();
278        Self { contents, current }
279    }
280}
281
282impl StreamingSource for IterSource {
283    type Error = std::io::Error;
284
285    async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, Self::Error>> {
286        self.current.pop_front().map(Ok)
287    }
288
289    async fn size_hint(&self) -> Result<SizeHint, Self::Error> {
290        let s = self.contents.iter().fold(0_u64, |a, i| a + i.len() as u64);
291        Ok(SizeHint::with_exact(s))
292    }
293}
294
295impl Seek for IterSource {
296    type Error = std::io::Error;
297    async fn seek(&mut self, offset: u64) -> std::result::Result<(), Self::Error> {
298        let mut current = VecDeque::new();
299        let mut offset = offset as usize;
300        for b in self.contents.iter() {
301            offset = match (offset, b.len()) {
302                (0, _) => {
303                    current.push_back(b.clone());
304                    0
305                }
306                (o, n) if o >= n => o - n,
307                (o, n) => {
308                    current.push_back(b.clone().split_off(n - o));
309                    0
310                }
311            }
312        }
313        self.current = current;
314        Ok(())
315    }
316}
317
318#[cfg(test)]
319pub mod tests {
320    use super::*;
321    use std::io::Write;
322    use tempfile::NamedTempFile;
323
    /// Return type shared by the tests in this module, so they can use `?`.
    type Result = anyhow::Result<()>;
325
    /// Sample payload shared by the tests in this module.
    const CONTENTS: &[u8] = b"how vexingly quick daft zebras jump";
327
    /// A test source that wraps a [BytesSource] but reports only a lower
    /// bound in its size hint.
    pub(crate) struct UnknownSize {
        // The source that provides the data.
        inner: BytesSource,
    }
331    impl UnknownSize {
332        pub fn new(inner: BytesSource) -> Self {
333            Self { inner }
334        }
335    }
336    impl Seek for UnknownSize {
337        type Error = <BytesSource as Seek>::Error;
338        async fn seek(&mut self, offset: u64) -> std::result::Result<(), Self::Error> {
339            self.inner.seek(offset).await
340        }
341    }
342    impl StreamingSource for UnknownSize {
343        type Error = <BytesSource as StreamingSource>::Error;
344        async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, Self::Error>> {
345            self.inner.next().await
346        }
347        async fn size_hint(&self) -> std::result::Result<SizeHint, Self::Error> {
348            let inner = self.inner.size_hint().await?;
349            let mut hint = SizeHint::default();
350            hint.set_lower(inner.lower());
351            Ok(hint)
352        }
353    }
354
    // A mock that implements only `StreamingSource` (no `Seek`), with
    // `std::io::Error` as its error type.
    mockall::mock! {
        pub(crate) SimpleSource {}

        impl StreamingSource for SimpleSource {
            type Error = std::io::Error;
            async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, std::io::Error>>;
            async fn size_hint(&self) -> std::result::Result<SizeHint, std::io::Error>;
        }
    }
364
    // A mock that implements both `StreamingSource` and `Seek`, with
    // `std::io::Error` as the error type of each.
    mockall::mock! {
        pub(crate) SeekSource {}

        impl StreamingSource for SeekSource {
            type Error = std::io::Error;
            async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, std::io::Error>>;
            async fn size_hint(&self) -> std::result::Result<SizeHint, std::io::Error>;
        }
        impl Seek for SeekSource {
            type Error = std::io::Error;
            async fn seek(&mut self, offset: u64) ->std::result::Result<(), std::io::Error>;
        }
    }
378
379    /// A helper function to simplify the tests.
380    async fn collect<S>(mut source: S) -> anyhow::Result<Vec<u8>>
381    where
382        S: StreamingSource,
383    {
384        collect_mut(&mut source).await
385    }
386
387    /// A helper function to simplify the tests.
388    async fn collect_mut<S>(source: &mut S) -> anyhow::Result<Vec<u8>>
389    where
390        S: StreamingSource,
391    {
392        let mut vec = Vec::new();
393        while let Some(bytes) = source.next().await.transpose()? {
394            vec.extend_from_slice(&bytes);
395        }
396        Ok(vec)
397    }
398
399    #[tokio::test]
400    async fn empty_bytes() -> Result {
401        let buffer = Payload::from(bytes::Bytes::default());
402        let range = buffer.size_hint().await?;
403        assert_eq!(range.exact(), Some(0));
404        let got = collect(buffer).await?;
405        assert!(got.is_empty(), "{got:?}");
406
407        Ok(())
408    }
409
410    #[tokio::test]
411    async fn simple_bytes() -> Result {
412        let buffer = Payload::from(bytes::Bytes::from_static(CONTENTS));
413        let range = buffer.size_hint().await?;
414        assert_eq!(range.exact(), Some(CONTENTS.len() as u64));
415        let got = collect(buffer).await?;
416        assert_eq!(got[..], CONTENTS[..], "{got:?}");
417        Ok(())
418    }
419
420    #[tokio::test]
421    async fn simple_str() -> Result {
422        const LAZY: &str = "the quick brown fox jumps over the lazy dog";
423        let buffer = Payload::from(LAZY);
424        let range = buffer.size_hint().await?;
425        assert_eq!(range.exact(), Some(LAZY.len() as u64));
426        let got = collect(buffer).await?;
427        assert_eq!(&got, LAZY.as_bytes(), "{got:?}");
428        Ok(())
429    }
430
431    #[tokio::test]
432    async fn seek_bytes() -> Result {
433        let mut buffer = Payload::from(bytes::Bytes::from_static(CONTENTS));
434        buffer.seek(8).await?;
435        let got = collect(buffer).await?;
436        assert_eq!(got[..], CONTENTS[8..], "{got:?}");
437        Ok(())
438    }
439
440    #[tokio::test]
441    async fn empty_stream() -> Result {
442        let source = IterSource::new(vec![]);
443        let payload = Payload::from(source);
444        let range = payload.size_hint().await?;
445        assert_eq!(range.exact(), Some(0));
446        let got = collect(payload).await?;
447        assert!(got.is_empty(), "{got:?}");
448
449        Ok(())
450    }
451
452    #[tokio::test]
453    async fn simple_stream() -> Result {
454        let source = IterSource::new(
455            ["how ", "vexingly ", "quick ", "daft ", "zebras ", "jump"]
456                .map(|v| bytes::Bytes::from_static(v.as_bytes())),
457        );
458        let payload = Payload::from_stream(source);
459        let got = collect(payload).await?;
460        assert_eq!(got[..], CONTENTS[..]);
461
462        Ok(())
463    }
464
465    #[tokio::test]
466    async fn empty_file() -> Result {
467        let file = NamedTempFile::new()?;
468        let read = tokio::fs::File::from(file.reopen()?);
469        let payload = Payload::from(read);
470        let hint = payload.size_hint().await?;
471        assert_eq!(hint.exact(), Some(0));
472        let got = collect(payload).await?;
473        assert!(got.is_empty(), "{got:?}");
474        Ok(())
475    }
476
477    #[tokio::test]
478    async fn small_file() -> Result {
479        let mut file = NamedTempFile::new()?;
480        assert_eq!(file.write(CONTENTS)?, CONTENTS.len());
481        file.flush()?;
482        let read = tokio::fs::File::from(file.reopen()?);
483        let payload = Payload::from(read);
484        let hint = payload.size_hint().await?;
485        let s = CONTENTS.len() as u64;
486        assert_eq!(hint.exact(), Some(s));
487        let got = collect(payload).await?;
488        assert_eq!(got[..], CONTENTS[..], "{got:?}");
489        Ok(())
490    }
491
492    #[tokio::test]
493    async fn small_file_seek() -> Result {
494        let mut file = NamedTempFile::new()?;
495        assert_eq!(file.write(CONTENTS)?, CONTENTS.len());
496        file.flush()?;
497        let read = tokio::fs::File::from(file.reopen()?);
498        let mut payload = Payload::from(read);
499        payload.seek(8).await?;
500        let got = collect(payload).await?;
501        assert_eq!(got[..], CONTENTS[8..], "{got:?}");
502        Ok(())
503    }
504
505    #[tokio::test]
506    async fn larger_file() -> Result {
507        let mut file = NamedTempFile::new()?;
508        assert_eq!(file.write(&[0_u8; READ_SIZE])?, READ_SIZE);
509        assert_eq!(file.write(&[1_u8; READ_SIZE])?, READ_SIZE);
510        assert_eq!(file.write(&[2_u8; READ_SIZE])?, READ_SIZE);
511        assert_eq!(file.write(&[3_u8; READ_SIZE])?, READ_SIZE);
512        file.flush()?;
513        assert_eq!(READ_SIZE % 2, 0);
514        let read = tokio::fs::File::from(file.reopen()?);
515        let mut payload = Payload::from(read);
516        payload.seek((READ_SIZE + READ_SIZE / 2) as u64).await?;
517        let got = collect(payload).await?;
518        let mut want = Vec::new();
519        want.extend_from_slice(&[1_u8; READ_SIZE / 2]);
520        want.extend_from_slice(&[2_u8; READ_SIZE]);
521        want.extend_from_slice(&[3_u8; READ_SIZE]);
522        assert_eq!(got[..], want[..], "{got:?}");
523        Ok(())
524    }
525
526    #[tokio::test]
527    async fn iter_source_full() -> Result {
528        const N: usize = 32;
529        let mut buf = Vec::new();
530        buf.extend_from_slice(&[1_u8; N]);
531        buf.extend_from_slice(&[2_u8; N]);
532        buf.extend_from_slice(&[3_u8; N]);
533        let b = bytes::Bytes::from_owner(buf);
534
535        let mut stream =
536            IterSource::new(vec![b.slice(0..N), b.slice(N..(2 * N)), b.slice((2 * N)..)]);
537        assert_eq!(stream.size_hint().await?.exact(), Some(3 * N as u64));
538
539        // test_case() is not appropriate here: we want to verify seek() works
540        // multiple times over the *same* stream.
541        for offset in [0, N / 2, 0, N, 0, 2 * N + N / 2] {
542            stream.seek(offset as u64).await?;
543            let got = collect_mut(&mut stream).await?;
544            assert_eq!(got[..], b[offset..(3 * N)]);
545        }
546
547        Ok(())
548    }
549}