Skip to main content

google_cloud_storage/storage/
streaming_source.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Defines data sources for object writes.
16
17use std::collections::VecDeque;
18
19/// The *total* number of bytes expected in a [StreamingSource].
20pub type SizeHint = http_body::SizeHint;
21
/// The data consumed by the object write functions of the
/// [Storage][crate::client::Storage] client.
///
/// The functions in the storage client that create new objects accept any
/// type convertible to `Payload`. Conversions are provided for in-memory
/// buffers and for any type implementing [StreamingSource].
///
/// # Example
/// ```
/// # async fn sample() -> anyhow::Result<()> {
/// # use google_cloud_storage::streaming_source::Payload;
/// use google_cloud_storage::streaming_source::StreamingSource;
/// let buffer : &[u8] = b"the quick brown fox jumps over the lazy dog";
/// let mut size = 0_usize;
/// let mut payload = Payload::from(bytes::Bytes::from_static(buffer));
/// while let Some(bytes) = payload.next().await.transpose()? {
///     size += bytes.len();
/// }
/// assert_eq!(size, buffer.len());
/// # Ok(()) }
/// ```
pub struct Payload<T> {
    // The wrapped data source; every trait implementation forwards to it.
    payload: T,
}
46
47impl<T> Payload<T>
48where
49    T: StreamingSource,
50{
51    /// Creates a new `Payload` from a streaming source.
52    pub fn from_stream(payload: T) -> Self {
53        Self { payload }
54    }
55}
56
57impl<T> StreamingSource for Payload<T>
58where
59    T: StreamingSource + Send + Sync,
60{
61    type Error = T::Error;
62
63    async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
64        self.payload.next().await
65    }
66
67    async fn size_hint(&self) -> Result<SizeHint, Self::Error> {
68        self.payload.size_hint().await
69    }
70}
71
72impl<T> Seek for Payload<T>
73where
74    T: Seek,
75{
76    type Error = T::Error;
77
78    fn seek(&mut self, offset: u64) -> impl Future<Output = Result<(), Self::Error>> + Send {
79        self.payload.seek(offset)
80    }
81}
82
83impl From<bytes::Bytes> for Payload<BytesSource> {
84    fn from(value: bytes::Bytes) -> Self {
85        let payload = BytesSource::new(value);
86        Self { payload }
87    }
88}
89
90impl From<&'static str> for Payload<BytesSource> {
91    fn from(value: &'static str) -> Self {
92        let b = bytes::Bytes::from_static(value.as_bytes());
93        Payload::from(b)
94    }
95}
96
97impl From<String> for Payload<BytesSource> {
98    fn from(value: String) -> Self {
99        let payload = BytesSource::new(bytes::Bytes::from(value));
100        Self { payload }
101    }
102}
103
104impl From<Vec<bytes::Bytes>> for Payload<IterSource> {
105    fn from(value: Vec<bytes::Bytes>) -> Self {
106        let payload = IterSource::new(value);
107        Self { payload }
108    }
109}
110
111impl<S> From<S> for Payload<S>
112where
113    S: StreamingSource,
114{
115    fn from(value: S) -> Self {
116        Self { payload: value }
117    }
118}
119
120/// Provides bytes from single-pass sources.
121pub trait StreamingSource {
122    /// The error type.
123    type Error: std::error::Error + Send + Sync + 'static;
124
125    /// Gets the next set of data.
126    fn next(&mut self) -> impl Future<Output = Option<Result<bytes::Bytes, Self::Error>>> + Send;
127
128    /// An estimate of the payload size.
129    ///
130    /// Returns the expected size as a `[min, max)` range. The maximum may be
131    /// unknown. If the minimum is unknown use `0`.
132    ///
133    /// If the maximum size is known and sufficiently small, the client library
134    /// may be able to use a more efficient protocol for the upload.
135    fn size_hint(&self) -> impl Future<Output = Result<SizeHint, Self::Error>> + Send {
136        std::future::ready(Ok(SizeHint::new()))
137    }
138}
139
/// A source of bytes that can be repositioned.
///
/// The data may arrive asynchronously, for example when it is read from
/// Google Cloud Storage, read from another remote storage system, or
/// produced by a repeatable computation.
pub trait Seek {
    /// The type of the errors reported by this source.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Repositions the stream so it starts from `offset`.
    ///
    /// Writes are restarted automatically by the client library after a
    /// connection reset or a partial failure. To resume a write the library
    /// may need to reset the stream to an arbitrary point.
    ///
    /// The client library relies on `seek(N)` followed by `next()` returning
    /// the same data every time.
    fn seek(&mut self, offset: u64) -> impl Future<Output = Result<(), Self::Error>> + Send;
}
159
/// The size, in bytes, of the buffer allocated for each file read (256 KiB).
const READ_SIZE: usize = 256 << 10;
161
162impl From<tokio::fs::File> for Payload<FileSource> {
163    fn from(value: tokio::fs::File) -> Self {
164        Self {
165            payload: FileSource::new(value),
166        }
167    }
168}
169
170/// Implements [StreamingSource] for a [tokio::fs::File].
171///
172/// # Example
173/// ```
174/// # use google_cloud_storage::client::Storage;
175/// # async fn sample(client: &Storage) -> anyhow::Result<()> {
176/// let payload = tokio::fs::File::open("my-data").await?;
177/// let response = client
178///     .write_object("projects/_/buckets/my-bucket", "my-object", payload)
179///     .send_unbuffered()
180///     .await?;
181/// println!("response details={response:?}");
182/// # Ok(()) }
183/// ```
184pub struct FileSource {
185    inner: tokio::fs::File,
186}
187
188impl FileSource {
189    fn new(inner: tokio::fs::File) -> Self {
190        Self { inner }
191    }
192}
193
194impl StreamingSource for FileSource {
195    type Error = std::io::Error;
196
197    async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
198        let mut buffer = vec![0_u8; READ_SIZE];
199        match tokio::io::AsyncReadExt::read(&mut self.inner, &mut buffer).await {
200            Err(e) => Some(Err(e)),
201            Ok(0) => None,
202            Ok(n) => {
203                buffer.resize(n, 0_u8);
204                Some(Ok(bytes::Bytes::from_owner(buffer)))
205            }
206        }
207    }
208    async fn size_hint(&self) -> Result<SizeHint, Self::Error> {
209        let m = self.inner.metadata().await?;
210        Ok(SizeHint::with_exact(m.len()))
211    }
212}
213
214impl Seek for FileSource {
215    type Error = std::io::Error;
216
217    async fn seek(&mut self, offset: u64) -> Result<(), Self::Error> {
218        use tokio::io::AsyncSeekExt;
219        let _ = self.inner.seek(std::io::SeekFrom::Start(offset)).await?;
220        Ok(())
221    }
222}
223
224/// Implements [StreamingSource] for [bytes::Bytes].
225///
226/// # Example
227/// ```
228/// # use google_cloud_storage::client::Storage;
229/// # async fn sample(client: &Storage) -> anyhow::Result<()> {
230/// let payload = bytes::Bytes::from_static(b"Hello World!");
231/// let response = client
232///     .write_object("projects/_/buckets/my-bucket", "my-object", payload)
233///     .send_unbuffered()
234///     .await?;
235/// println!("response details={response:?}");
236/// # Ok(()) }
237/// ```
238pub struct BytesSource {
239    contents: bytes::Bytes,
240    current: Option<bytes::Bytes>,
241}
242
243impl BytesSource {
244    pub(crate) fn new(contents: bytes::Bytes) -> Self {
245        let current = Some(contents.clone());
246        Self { contents, current }
247    }
248}
249
250impl StreamingSource for BytesSource {
251    type Error = crate::Error;
252
253    async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
254        self.current.take().map(Result::Ok)
255    }
256
257    async fn size_hint(&self) -> Result<SizeHint, Self::Error> {
258        let s = self.contents.len() as u64;
259        Ok(SizeHint::with_exact(s))
260    }
261}
262
263impl Seek for BytesSource {
264    type Error = crate::Error;
265
266    async fn seek(&mut self, offset: u64) -> Result<(), Self::Error> {
267        let pos = std::cmp::min(offset as usize, self.contents.len());
268        self.current = Some(self.contents.slice(pos..));
269        Ok(())
270    }
271}
272
273/// Implements [StreamingSource] for a sequence of [bytes::Bytes].
274pub(crate) struct IterSource {
275    contents: Vec<bytes::Bytes>,
276    current: VecDeque<bytes::Bytes>,
277}
278
279impl IterSource {
280    pub(crate) fn new<I>(iterator: I) -> Self
281    where
282        I: IntoIterator<Item = bytes::Bytes>,
283    {
284        let contents: Vec<bytes::Bytes> = iterator.into_iter().collect();
285        let current: VecDeque<bytes::Bytes> = contents.iter().cloned().collect();
286        Self { contents, current }
287    }
288}
289
290impl StreamingSource for IterSource {
291    type Error = std::io::Error;
292
293    async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, Self::Error>> {
294        self.current.pop_front().map(Ok)
295    }
296
297    async fn size_hint(&self) -> Result<SizeHint, Self::Error> {
298        let s = self.contents.iter().fold(0_u64, |a, i| a + i.len() as u64);
299        Ok(SizeHint::with_exact(s))
300    }
301}
302
303impl Seek for IterSource {
304    type Error = std::io::Error;
305    async fn seek(&mut self, offset: u64) -> std::result::Result<(), Self::Error> {
306        let mut current = VecDeque::new();
307        let mut offset = offset as usize;
308        for b in self.contents.iter() {
309            offset = match (offset, b.len()) {
310                (0, _) => {
311                    current.push_back(b.clone());
312                    0
313                }
314                (o, n) if o >= n => o - n,
315                (o, n) => {
316                    current.push_back(b.clone().split_off(n - o));
317                    0
318                }
319            }
320        }
321        self.current = current;
322        Ok(())
323    }
324}
325
326#[cfg(test)]
327pub(crate) mod tests {
328    use super::*;
329    use std::io::Write;
330    use tempfile::NamedTempFile;
331
332    type Result = anyhow::Result<()>;
333
334    const CONTENTS: &[u8] = b"how vexingly quick daft zebras jump";
335
336    pub(crate) struct UnknownSize {
337        inner: BytesSource,
338    }
339    impl UnknownSize {
340        pub fn new(inner: BytesSource) -> Self {
341            Self { inner }
342        }
343    }
344    impl Seek for UnknownSize {
345        type Error = <BytesSource as Seek>::Error;
346        async fn seek(&mut self, offset: u64) -> std::result::Result<(), Self::Error> {
347            self.inner.seek(offset).await
348        }
349    }
350    impl StreamingSource for UnknownSize {
351        type Error = <BytesSource as StreamingSource>::Error;
352        async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, Self::Error>> {
353            self.inner.next().await
354        }
355        async fn size_hint(&self) -> std::result::Result<SizeHint, Self::Error> {
356            let inner = self.inner.size_hint().await?;
357            let mut hint = SizeHint::default();
358            hint.set_lower(inner.lower());
359            Ok(hint)
360        }
361    }
362
363    mockall::mock! {
364        pub(crate) SimpleSource {}
365
366        impl StreamingSource for SimpleSource {
367            type Error = std::io::Error;
368            async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, std::io::Error>>;
369            async fn size_hint(&self) -> std::result::Result<SizeHint, std::io::Error>;
370        }
371    }
372
373    mockall::mock! {
374        pub(crate) SeekSource {}
375
376        impl StreamingSource for SeekSource {
377            type Error = std::io::Error;
378            async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, std::io::Error>>;
379            async fn size_hint(&self) -> std::result::Result<SizeHint, std::io::Error>;
380        }
381        impl Seek for SeekSource {
382            type Error = std::io::Error;
383            async fn seek(&mut self, offset: u64) ->std::result::Result<(), std::io::Error>;
384        }
385    }
386
387    /// A helper function to simplify the tests.
388    async fn collect<S>(mut source: S) -> anyhow::Result<Vec<u8>>
389    where
390        S: StreamingSource,
391    {
392        collect_mut(&mut source).await
393    }
394
395    /// A helper function to simplify the tests.
396    async fn collect_mut<S>(source: &mut S) -> anyhow::Result<Vec<u8>>
397    where
398        S: StreamingSource,
399    {
400        let mut vec = Vec::new();
401        while let Some(bytes) = source.next().await.transpose()? {
402            vec.extend_from_slice(&bytes);
403        }
404        Ok(vec)
405    }
406
407    #[tokio::test]
408    async fn empty_bytes() -> Result {
409        let buffer = Payload::from(bytes::Bytes::default());
410        let range = buffer.size_hint().await?;
411        assert_eq!(range.exact(), Some(0));
412        let got = collect(buffer).await?;
413        assert!(got.is_empty(), "{got:?}");
414
415        Ok(())
416    }
417
418    #[tokio::test]
419    async fn simple_bytes() -> Result {
420        let buffer = Payload::from(bytes::Bytes::from_static(CONTENTS));
421        let range = buffer.size_hint().await?;
422        assert_eq!(range.exact(), Some(CONTENTS.len() as u64));
423        let got = collect(buffer).await?;
424        assert_eq!(got[..], CONTENTS[..], "{got:?}");
425        Ok(())
426    }
427
428    #[tokio::test]
429    async fn simple_str() -> Result {
430        const LAZY: &str = "the quick brown fox jumps over the lazy dog";
431        let buffer = Payload::from(LAZY);
432        let range = buffer.size_hint().await?;
433        assert_eq!(range.exact(), Some(LAZY.len() as u64));
434        let got = collect(buffer).await?;
435        assert_eq!(&got, LAZY.as_bytes(), "{got:?}");
436        Ok(())
437    }
438
439    #[tokio::test]
440    async fn simple_string() -> Result {
441        const LAZY: &str = "the quick brown fox jumps over the lazy dog";
442        let buffer = Payload::from(String::from(LAZY));
443        let range = buffer.size_hint().await?;
444        assert_eq!(range.exact(), Some(LAZY.len() as u64));
445        let got = collect(buffer).await?;
446        assert_eq!(&got, LAZY.as_bytes(), "{got:?}");
447        Ok(())
448    }
449
450    #[tokio::test]
451    async fn seek_bytes() -> Result {
452        let mut buffer = Payload::from(bytes::Bytes::from_static(CONTENTS));
453        buffer.seek(8).await?;
454        let got = collect(buffer).await?;
455        assert_eq!(got[..], CONTENTS[8..], "{got:?}");
456        Ok(())
457    }
458
459    #[tokio::test]
460    async fn empty_stream() -> Result {
461        let source = IterSource::new(vec![]);
462        let payload = Payload::from(source);
463        let range = payload.size_hint().await?;
464        assert_eq!(range.exact(), Some(0));
465        let got = collect(payload).await?;
466        assert!(got.is_empty(), "{got:?}");
467
468        Ok(())
469    }
470
471    #[tokio::test]
472    async fn simple_stream() -> Result {
473        let source = IterSource::new(
474            ["how ", "vexingly ", "quick ", "daft ", "zebras ", "jump"]
475                .map(|v| bytes::Bytes::from_static(v.as_bytes())),
476        );
477        let payload = Payload::from_stream(source);
478        let got = collect(payload).await?;
479        assert_eq!(got[..], CONTENTS[..]);
480
481        Ok(())
482    }
483
484    #[tokio::test]
485    async fn empty_file() -> Result {
486        let file = NamedTempFile::new()?;
487        let read = tokio::fs::File::from(file.reopen()?);
488        let payload = Payload::from(read);
489        let hint = payload.size_hint().await?;
490        assert_eq!(hint.exact(), Some(0));
491        let got = collect(payload).await?;
492        assert!(got.is_empty(), "{got:?}");
493        Ok(())
494    }
495
496    #[tokio::test]
497    async fn small_file() -> Result {
498        let mut file = NamedTempFile::new()?;
499        assert_eq!(file.write(CONTENTS)?, CONTENTS.len());
500        file.flush()?;
501        let read = tokio::fs::File::from(file.reopen()?);
502        let payload = Payload::from(read);
503        let hint = payload.size_hint().await?;
504        let s = CONTENTS.len() as u64;
505        assert_eq!(hint.exact(), Some(s));
506        let got = collect(payload).await?;
507        assert_eq!(got[..], CONTENTS[..], "{got:?}");
508        Ok(())
509    }
510
511    #[tokio::test]
512    async fn small_file_seek() -> Result {
513        let mut file = NamedTempFile::new()?;
514        assert_eq!(file.write(CONTENTS)?, CONTENTS.len());
515        file.flush()?;
516        let read = tokio::fs::File::from(file.reopen()?);
517        let mut payload = Payload::from(read);
518        payload.seek(8).await?;
519        let got = collect(payload).await?;
520        assert_eq!(got[..], CONTENTS[8..], "{got:?}");
521        Ok(())
522    }
523
524    #[tokio::test]
525    async fn larger_file() -> Result {
526        let mut file = NamedTempFile::new()?;
527        assert_eq!(file.write(&[0_u8; READ_SIZE])?, READ_SIZE);
528        assert_eq!(file.write(&[1_u8; READ_SIZE])?, READ_SIZE);
529        assert_eq!(file.write(&[2_u8; READ_SIZE])?, READ_SIZE);
530        assert_eq!(file.write(&[3_u8; READ_SIZE])?, READ_SIZE);
531        file.flush()?;
532        assert_eq!(READ_SIZE % 2, 0);
533        let read = tokio::fs::File::from(file.reopen()?);
534        let mut payload = Payload::from(read);
535        payload.seek((READ_SIZE + READ_SIZE / 2) as u64).await?;
536        let got = collect(payload).await?;
537        let mut want = Vec::new();
538        want.extend_from_slice(&[1_u8; READ_SIZE / 2]);
539        want.extend_from_slice(&[2_u8; READ_SIZE]);
540        want.extend_from_slice(&[3_u8; READ_SIZE]);
541        assert_eq!(got[..], want[..], "{got:?}");
542        Ok(())
543    }
544
545    #[tokio::test]
546    async fn iter_source_full() -> Result {
547        const N: usize = 32;
548        let mut buf = Vec::new();
549        buf.extend_from_slice(&[1_u8; N]);
550        buf.extend_from_slice(&[2_u8; N]);
551        buf.extend_from_slice(&[3_u8; N]);
552        let b = bytes::Bytes::from_owner(buf);
553
554        let mut stream =
555            IterSource::new(vec![b.slice(0..N), b.slice(N..(2 * N)), b.slice((2 * N)..)]);
556        assert_eq!(stream.size_hint().await?.exact(), Some(3 * N as u64));
557
558        // test_case() is not appropriate here: we want to verify seek() works
559        // multiple times over the *same* stream.
560        for offset in [0, N / 2, 0, N, 0, 2 * N + N / 2] {
561            stream.seek(offset as u64).await?;
562            let got = collect_mut(&mut stream).await?;
563            assert_eq!(got[..], b[offset..(3 * N)]);
564        }
565
566        Ok(())
567    }
568}