google_cloud_storage/storage/
streaming_source.rs

// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Defines data sources for object writes.
16
17use std::collections::VecDeque;
18
19/// The *total* number of bytes expected in a [StreamingSource].
20pub type SizeHint = http_body::SizeHint;
21
/// Wraps the data uploaded by object writers in the
/// [Storage][crate::client::Storage] client.
///
/// Functions in the storage client that create new objects accept any type
/// convertible to a `Payload`. Simple buffers qualify, and so does any type
/// implementing [StreamingSource].
///
/// # Example
/// ```
/// # tokio_test::block_on(async {
/// # use google_cloud_storage::streaming_source::Payload;
/// use google_cloud_storage::streaming_source::StreamingSource;
/// let buffer : &[u8] = b"the quick brown fox jumps over the lazy dog";
/// let mut size = 0_usize;
/// let mut payload = Payload::from(bytes::Bytes::from_static(buffer));
/// while let Some(bytes) = payload.next().await.transpose()? {
///     size += bytes.len();
/// }
/// assert_eq!(size, buffer.len());
/// # anyhow::Result::<()>::Ok(()) });
/// ```
pub struct Payload<T> {
    // The wrapped data source; all trait implementations forward to it.
    payload: T,
}
46
47impl<T> Payload<T>
48where
49    T: StreamingSource,
50{
51    pub fn from_stream(payload: T) -> Self {
52        Self { payload }
53    }
54}
55
56impl<T> StreamingSource for Payload<T>
57where
58    T: StreamingSource + Send + Sync,
59{
60    type Error = T::Error;
61
62    async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
63        self.payload.next().await
64    }
65
66    async fn size_hint(&self) -> Result<SizeHint, Self::Error> {
67        self.payload.size_hint().await
68    }
69}
70
71impl<T> Seek for Payload<T>
72where
73    T: Seek,
74{
75    type Error = T::Error;
76
77    fn seek(&mut self, offset: u64) -> impl Future<Output = Result<(), Self::Error>> + Send {
78        self.payload.seek(offset)
79    }
80}
81
82impl From<bytes::Bytes> for Payload<BytesSource> {
83    fn from(value: bytes::Bytes) -> Self {
84        let payload = BytesSource::new(value);
85        Self { payload }
86    }
87}
88
89impl From<&'static str> for Payload<BytesSource> {
90    fn from(value: &'static str) -> Self {
91        let b = bytes::Bytes::from_static(value.as_bytes());
92        Payload::from(b)
93    }
94}
95
96impl From<String> for Payload<BytesSource> {
97    fn from(value: String) -> Self {
98        let payload = BytesSource::new(bytes::Bytes::from(value));
99        Self { payload }
100    }
101}
102
103impl From<Vec<bytes::Bytes>> for Payload<IterSource> {
104    fn from(value: Vec<bytes::Bytes>) -> Self {
105        let payload = IterSource::new(value);
106        Self { payload }
107    }
108}
109
110impl<S> From<S> for Payload<S>
111where
112    S: StreamingSource,
113{
114    fn from(value: S) -> Self {
115        Self { payload: value }
116    }
117}
118
119/// Provides bytes from single-pass sources.
120pub trait StreamingSource {
121    /// The error type.
122    type Error: std::error::Error + Send + Sync + 'static;
123
124    /// Gets the next set of data.
125    fn next(&mut self) -> impl Future<Output = Option<Result<bytes::Bytes, Self::Error>>> + Send;
126
127    /// An estimate of the payload size.
128    ///
129    /// Returns the expected size as a `[min, max)` range. The maximum may be
130    /// unknown. If the minimum is unknown use `0`.
131    ///
132    /// If the maximum size is known and sufficiently small, the client library
133    /// may be able to use a more efficient protocol for the upload.
134    fn size_hint(&self) -> impl Future<Output = Result<SizeHint, Self::Error>> + Send {
135        std::future::ready(Ok(SizeHint::new()))
136    }
137}
138
/// A source of bytes that can be repositioned.
///
/// The data may arrive asynchronously: for example reads from Google Cloud
/// Storage, reads from other remote storage systems, or the output of a
/// repeatable computation.
pub trait Seek {
    /// The type of error reported when repositioning fails.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Repositions the stream so that it starts from `offset`.
    ///
    /// The client library restarts writes automatically when the connection is
    /// reset or a partial failure occurs. Resuming a write may require moving
    /// the stream to an arbitrary point.
    ///
    /// The client library relies on `seek(N)` followed by `next()` always
    /// yielding the same data.
    fn seek(&mut self, offset: u64) -> impl Future<Output = Result<(), Self::Error>> + Send;
}
158
/// The size, in bytes, of the buffer used for each file read (256 KiB).
const READ_SIZE: usize = 256 << 10;
160
161impl From<tokio::fs::File> for Payload<FileSource> {
162    fn from(value: tokio::fs::File) -> Self {
163        Self {
164            payload: FileSource::new(value),
165        }
166    }
167}
168
169/// Implements [StreamingSource] for a [tokio::fs::File].
170///
171/// # Example
172/// ```
173/// # use google_cloud_storage::client::Storage;
174/// # async fn sample(client: &Storage) -> anyhow::Result<()> {
175/// let payload = tokio::fs::File::open("my-data").await?;
176/// let response = client
177///     .write_object("projects/_/buckets/my-bucket", "my-object", payload)
178///     .send_unbuffered()
179///     .await?;
180/// println!("response details={response:?}");
181/// # Ok(()) }
182/// ```
183pub struct FileSource {
184    inner: tokio::fs::File,
185}
186
187impl FileSource {
188    fn new(inner: tokio::fs::File) -> Self {
189        Self { inner }
190    }
191}
192
193impl StreamingSource for FileSource {
194    type Error = std::io::Error;
195
196    async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
197        let mut buffer = vec![0_u8; READ_SIZE];
198        match tokio::io::AsyncReadExt::read(&mut self.inner, &mut buffer).await {
199            Err(e) => Some(Err(e)),
200            Ok(0) => None,
201            Ok(n) => {
202                buffer.resize(n, 0_u8);
203                Some(Ok(bytes::Bytes::from_owner(buffer)))
204            }
205        }
206    }
207    async fn size_hint(&self) -> Result<SizeHint, Self::Error> {
208        let m = self.inner.metadata().await?;
209        Ok(SizeHint::with_exact(m.len()))
210    }
211}
212
213impl Seek for FileSource {
214    type Error = std::io::Error;
215
216    async fn seek(&mut self, offset: u64) -> Result<(), Self::Error> {
217        use tokio::io::AsyncSeekExt;
218        let _ = self.inner.seek(std::io::SeekFrom::Start(offset)).await?;
219        Ok(())
220    }
221}
222
223/// Implements [StreamingSource] for [bytes::Bytes].
224///
225/// # Example
226/// ```
227/// # use google_cloud_storage::client::Storage;
228/// # async fn sample(client: &Storage) -> anyhow::Result<()> {
229/// let payload = bytes::Bytes::from_static(b"Hello World!");
230/// let response = client
231///     .write_object("projects/_/buckets/my-bucket", "my-object", payload)
232///     .send_unbuffered()
233///     .await?;
234/// println!("response details={response:?}");
235/// # Ok(()) }
236/// ```
237pub struct BytesSource {
238    contents: bytes::Bytes,
239    current: Option<bytes::Bytes>,
240}
241
242impl BytesSource {
243    pub(crate) fn new(contents: bytes::Bytes) -> Self {
244        let current = Some(contents.clone());
245        Self { contents, current }
246    }
247}
248
249impl StreamingSource for BytesSource {
250    type Error = crate::Error;
251
252    async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
253        self.current.take().map(Result::Ok)
254    }
255
256    async fn size_hint(&self) -> Result<SizeHint, Self::Error> {
257        let s = self.contents.len() as u64;
258        Ok(SizeHint::with_exact(s))
259    }
260}
261
262impl Seek for BytesSource {
263    type Error = crate::Error;
264
265    async fn seek(&mut self, offset: u64) -> Result<(), Self::Error> {
266        let pos = std::cmp::min(offset as usize, self.contents.len());
267        self.current = Some(self.contents.slice(pos..));
268        Ok(())
269    }
270}
271
272/// Implements [StreamingSource] for a sequence of [bytes::Bytes].
273pub(crate) struct IterSource {
274    contents: Vec<bytes::Bytes>,
275    current: VecDeque<bytes::Bytes>,
276}
277
278impl IterSource {
279    pub(crate) fn new<I>(iterator: I) -> Self
280    where
281        I: IntoIterator<Item = bytes::Bytes>,
282    {
283        let contents: Vec<bytes::Bytes> = iterator.into_iter().collect();
284        let current: VecDeque<bytes::Bytes> = contents.iter().cloned().collect();
285        Self { contents, current }
286    }
287}
288
289impl StreamingSource for IterSource {
290    type Error = std::io::Error;
291
292    async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, Self::Error>> {
293        self.current.pop_front().map(Ok)
294    }
295
296    async fn size_hint(&self) -> Result<SizeHint, Self::Error> {
297        let s = self.contents.iter().fold(0_u64, |a, i| a + i.len() as u64);
298        Ok(SizeHint::with_exact(s))
299    }
300}
301
302impl Seek for IterSource {
303    type Error = std::io::Error;
304    async fn seek(&mut self, offset: u64) -> std::result::Result<(), Self::Error> {
305        let mut current = VecDeque::new();
306        let mut offset = offset as usize;
307        for b in self.contents.iter() {
308            offset = match (offset, b.len()) {
309                (0, _) => {
310                    current.push_back(b.clone());
311                    0
312                }
313                (o, n) if o >= n => o - n,
314                (o, n) => {
315                    current.push_back(b.clone().split_off(n - o));
316                    0
317                }
318            }
319        }
320        self.current = current;
321        Ok(())
322    }
323}
324
325#[cfg(test)]
326pub mod tests {
327    use super::*;
328    use std::io::Write;
329    use tempfile::NamedTempFile;
330
331    type Result = anyhow::Result<()>;
332
333    const CONTENTS: &[u8] = b"how vexingly quick daft zebras jump";
334
335    pub(crate) struct UnknownSize {
336        inner: BytesSource,
337    }
338    impl UnknownSize {
339        pub fn new(inner: BytesSource) -> Self {
340            Self { inner }
341        }
342    }
343    impl Seek for UnknownSize {
344        type Error = <BytesSource as Seek>::Error;
345        async fn seek(&mut self, offset: u64) -> std::result::Result<(), Self::Error> {
346            self.inner.seek(offset).await
347        }
348    }
349    impl StreamingSource for UnknownSize {
350        type Error = <BytesSource as StreamingSource>::Error;
351        async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, Self::Error>> {
352            self.inner.next().await
353        }
354        async fn size_hint(&self) -> std::result::Result<SizeHint, Self::Error> {
355            let inner = self.inner.size_hint().await?;
356            let mut hint = SizeHint::default();
357            hint.set_lower(inner.lower());
358            Ok(hint)
359        }
360    }
361
362    mockall::mock! {
363        pub(crate) SimpleSource {}
364
365        impl StreamingSource for SimpleSource {
366            type Error = std::io::Error;
367            async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, std::io::Error>>;
368            async fn size_hint(&self) -> std::result::Result<SizeHint, std::io::Error>;
369        }
370    }
371
372    mockall::mock! {
373        pub(crate) SeekSource {}
374
375        impl StreamingSource for SeekSource {
376            type Error = std::io::Error;
377            async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, std::io::Error>>;
378            async fn size_hint(&self) -> std::result::Result<SizeHint, std::io::Error>;
379        }
380        impl Seek for SeekSource {
381            type Error = std::io::Error;
382            async fn seek(&mut self, offset: u64) ->std::result::Result<(), std::io::Error>;
383        }
384    }
385
386    /// A helper function to simplify the tests.
387    async fn collect<S>(mut source: S) -> anyhow::Result<Vec<u8>>
388    where
389        S: StreamingSource,
390    {
391        collect_mut(&mut source).await
392    }
393
394    /// A helper function to simplify the tests.
395    async fn collect_mut<S>(source: &mut S) -> anyhow::Result<Vec<u8>>
396    where
397        S: StreamingSource,
398    {
399        let mut vec = Vec::new();
400        while let Some(bytes) = source.next().await.transpose()? {
401            vec.extend_from_slice(&bytes);
402        }
403        Ok(vec)
404    }
405
406    #[tokio::test]
407    async fn empty_bytes() -> Result {
408        let buffer = Payload::from(bytes::Bytes::default());
409        let range = buffer.size_hint().await?;
410        assert_eq!(range.exact(), Some(0));
411        let got = collect(buffer).await?;
412        assert!(got.is_empty(), "{got:?}");
413
414        Ok(())
415    }
416
417    #[tokio::test]
418    async fn simple_bytes() -> Result {
419        let buffer = Payload::from(bytes::Bytes::from_static(CONTENTS));
420        let range = buffer.size_hint().await?;
421        assert_eq!(range.exact(), Some(CONTENTS.len() as u64));
422        let got = collect(buffer).await?;
423        assert_eq!(got[..], CONTENTS[..], "{got:?}");
424        Ok(())
425    }
426
427    #[tokio::test]
428    async fn simple_str() -> Result {
429        const LAZY: &str = "the quick brown fox jumps over the lazy dog";
430        let buffer = Payload::from(LAZY);
431        let range = buffer.size_hint().await?;
432        assert_eq!(range.exact(), Some(LAZY.len() as u64));
433        let got = collect(buffer).await?;
434        assert_eq!(&got, LAZY.as_bytes(), "{got:?}");
435        Ok(())
436    }
437
438    #[tokio::test]
439    async fn simple_string() -> Result {
440        const LAZY: &str = "the quick brown fox jumps over the lazy dog";
441        let buffer = Payload::from(String::from(LAZY));
442        let range = buffer.size_hint().await?;
443        assert_eq!(range.exact(), Some(LAZY.len() as u64));
444        let got = collect(buffer).await?;
445        assert_eq!(&got, LAZY.as_bytes(), "{got:?}");
446        Ok(())
447    }
448
449    #[tokio::test]
450    async fn seek_bytes() -> Result {
451        let mut buffer = Payload::from(bytes::Bytes::from_static(CONTENTS));
452        buffer.seek(8).await?;
453        let got = collect(buffer).await?;
454        assert_eq!(got[..], CONTENTS[8..], "{got:?}");
455        Ok(())
456    }
457
458    #[tokio::test]
459    async fn empty_stream() -> Result {
460        let source = IterSource::new(vec![]);
461        let payload = Payload::from(source);
462        let range = payload.size_hint().await?;
463        assert_eq!(range.exact(), Some(0));
464        let got = collect(payload).await?;
465        assert!(got.is_empty(), "{got:?}");
466
467        Ok(())
468    }
469
470    #[tokio::test]
471    async fn simple_stream() -> Result {
472        let source = IterSource::new(
473            ["how ", "vexingly ", "quick ", "daft ", "zebras ", "jump"]
474                .map(|v| bytes::Bytes::from_static(v.as_bytes())),
475        );
476        let payload = Payload::from_stream(source);
477        let got = collect(payload).await?;
478        assert_eq!(got[..], CONTENTS[..]);
479
480        Ok(())
481    }
482
483    #[tokio::test]
484    async fn empty_file() -> Result {
485        let file = NamedTempFile::new()?;
486        let read = tokio::fs::File::from(file.reopen()?);
487        let payload = Payload::from(read);
488        let hint = payload.size_hint().await?;
489        assert_eq!(hint.exact(), Some(0));
490        let got = collect(payload).await?;
491        assert!(got.is_empty(), "{got:?}");
492        Ok(())
493    }
494
495    #[tokio::test]
496    async fn small_file() -> Result {
497        let mut file = NamedTempFile::new()?;
498        assert_eq!(file.write(CONTENTS)?, CONTENTS.len());
499        file.flush()?;
500        let read = tokio::fs::File::from(file.reopen()?);
501        let payload = Payload::from(read);
502        let hint = payload.size_hint().await?;
503        let s = CONTENTS.len() as u64;
504        assert_eq!(hint.exact(), Some(s));
505        let got = collect(payload).await?;
506        assert_eq!(got[..], CONTENTS[..], "{got:?}");
507        Ok(())
508    }
509
510    #[tokio::test]
511    async fn small_file_seek() -> Result {
512        let mut file = NamedTempFile::new()?;
513        assert_eq!(file.write(CONTENTS)?, CONTENTS.len());
514        file.flush()?;
515        let read = tokio::fs::File::from(file.reopen()?);
516        let mut payload = Payload::from(read);
517        payload.seek(8).await?;
518        let got = collect(payload).await?;
519        assert_eq!(got[..], CONTENTS[8..], "{got:?}");
520        Ok(())
521    }
522
523    #[tokio::test]
524    async fn larger_file() -> Result {
525        let mut file = NamedTempFile::new()?;
526        assert_eq!(file.write(&[0_u8; READ_SIZE])?, READ_SIZE);
527        assert_eq!(file.write(&[1_u8; READ_SIZE])?, READ_SIZE);
528        assert_eq!(file.write(&[2_u8; READ_SIZE])?, READ_SIZE);
529        assert_eq!(file.write(&[3_u8; READ_SIZE])?, READ_SIZE);
530        file.flush()?;
531        assert_eq!(READ_SIZE % 2, 0);
532        let read = tokio::fs::File::from(file.reopen()?);
533        let mut payload = Payload::from(read);
534        payload.seek((READ_SIZE + READ_SIZE / 2) as u64).await?;
535        let got = collect(payload).await?;
536        let mut want = Vec::new();
537        want.extend_from_slice(&[1_u8; READ_SIZE / 2]);
538        want.extend_from_slice(&[2_u8; READ_SIZE]);
539        want.extend_from_slice(&[3_u8; READ_SIZE]);
540        assert_eq!(got[..], want[..], "{got:?}");
541        Ok(())
542    }
543
544    #[tokio::test]
545    async fn iter_source_full() -> Result {
546        const N: usize = 32;
547        let mut buf = Vec::new();
548        buf.extend_from_slice(&[1_u8; N]);
549        buf.extend_from_slice(&[2_u8; N]);
550        buf.extend_from_slice(&[3_u8; N]);
551        let b = bytes::Bytes::from_owner(buf);
552
553        let mut stream =
554            IterSource::new(vec![b.slice(0..N), b.slice(N..(2 * N)), b.slice((2 * N)..)]);
555        assert_eq!(stream.size_hint().await?.exact(), Some(3 * N as u64));
556
557        // test_case() is not appropriate here: we want to verify seek() works
558        // multiple times over the *same* stream.
559        for offset in [0, N / 2, 0, N, 0, 2 * N + N / 2] {
560            stream.seek(offset as u64).await?;
561            let got = collect_mut(&mut stream).await?;
562            assert_eq!(got[..], b[offset..(3 * N)]);
563        }
564
565        Ok(())
566    }
567}