google_cloud_storage/storage/
upload_source.rs

// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Defines upload data sources.

/// The data uploaded as an object through the
/// [Storage][crate::client::Storage] client.
///
/// Upload functions in the storage client accept any value that converts into
/// this type. Conversions exist for in-memory buffers (`bytes::Bytes`,
/// `&'static str`, and `&'static [u8]`) and for any type implementing both
/// [StreamingSource] and [Seek].
///
/// # Example
/// ```
/// # tokio_test::block_on(async {
/// # use google_cloud_storage::upload_source::InsertPayload;
/// use google_cloud_storage::upload_source::StreamingSource;
/// let buffer : &[u8] = b"the quick brown fox jumps over the lazy dog";
/// let mut size = 0_usize;
/// let mut payload = InsertPayload::from(buffer);
/// while let Some(bytes) = payload.next().await.transpose()? {
///     size += bytes.len();
/// }
/// assert_eq!(size, buffer.len());
/// # anyhow::Result::<()>::Ok(()) });
/// ```
pub struct InsertPayload<T> {
    // The wrapped source; the trait implementations forward to it.
    payload: T,
}
41
42impl<T> StreamingSource for InsertPayload<T>
43where
44    T: StreamingSource,
45{
46    type Error = T::Error;
47
48    fn next(&mut self) -> impl Future<Output = Option<Result<bytes::Bytes, Self::Error>>> + Send {
49        self.payload.next()
50    }
51
52    fn size_hint(&self) -> (u64, Option<u64>) {
53        self.payload.size_hint()
54    }
55}
56
57impl<T> Seek for InsertPayload<T>
58where
59    T: Seek,
60{
61    type Error = T::Error;
62
63    fn seek(&mut self, offset: u64) -> impl Future<Output = Result<(), Self::Error>> + Send {
64        self.payload.seek(offset)
65    }
66}
67
68impl From<bytes::Bytes> for InsertPayload<BytesSource> {
69    fn from(value: bytes::Bytes) -> Self {
70        let payload = BytesSource::new(value);
71        Self { payload }
72    }
73}
74
75impl From<&'static str> for InsertPayload<BytesSource> {
76    fn from(value: &'static str) -> Self {
77        let b = bytes::Bytes::from_static(value.as_bytes());
78        InsertPayload::from(b)
79    }
80}
81
82impl From<&'static [u8]> for InsertPayload<BytesSource> {
83    fn from(value: &'static [u8]) -> Self {
84        let b = bytes::Bytes::from_static(value);
85        InsertPayload::from(b)
86    }
87}
88
89impl<S> From<S> for InsertPayload<S>
90where
91    S: StreamingSource + Seek,
92{
93    fn from(value: S) -> Self {
94        Self { payload: value }
95    }
96}
97
98/// Provides bytes for an upload from single-pass sources.
99pub trait StreamingSource {
100    /// The error type.
101    type Error: std::error::Error + Send + Sync + 'static;
102
103    /// Gets the next set of data to upload.
104    fn next(&mut self) -> impl Future<Output = Option<Result<bytes::Bytes, Self::Error>>> + Send;
105
106    /// An estimate of the upload size.
107    ///
108    /// Returns the expected size as a [min, max) range. Where `None` represents
109    /// an unknown limit for the upload.
110    ///
111    /// If the upper limit is known and sufficiently small, the client library
112    /// may be able to use a more efficient protocol for the upload.
113    fn size_hint(&self) -> (u64, Option<u64>) {
114        (0_u64, None)
115    }
116}
117
/// Lets an upload source be repositioned at an arbitrary byte offset.
///
/// Types implementing this trait supply the data for Google Cloud Storage
/// uploads. That data may arrive asynchronously: for example from a download
/// out of Google Cloud Storage, from another remote storage system, or as the
/// output of a computation that can be repeated.
pub trait Seek {
    /// The error reported when repositioning fails.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Resets the stream to start from `offset`.
    ///
    /// Uploads are restarted automatically by the client library after a
    /// connection reset or a partial failure, and resuming may require moving
    /// the stream to an arbitrary position.
    ///
    /// The client library relies on `seek(N)` followed by `next()` returning
    /// the same data every time.
    fn seek(&mut self, offset: u64) -> impl Future<Output = Result<(), Self::Error>> + Send;
}
138
139const READ_SIZE: usize = 256 * 1024;
140
141impl<S> StreamingSource for S
142where
143    S: tokio::io::AsyncRead + Unpin + Send,
144{
145    type Error = std::io::Error;
146
147    async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
148        let mut buffer = vec![0_u8; READ_SIZE];
149        match tokio::io::AsyncReadExt::read(self, &mut buffer).await {
150            Err(e) => Some(Err(e)),
151            Ok(0) => None,
152            Ok(n) => {
153                buffer.resize(n, 0_u8);
154                Some(Ok(bytes::Bytes::from_owner(buffer)))
155            }
156        }
157    }
158}
159
160impl<S> Seek for S
161where
162    S: tokio::io::AsyncSeek + Unpin + Send,
163{
164    type Error = std::io::Error;
165
166    async fn seek(&mut self, offset: u64) -> Result<(), Self::Error> {
167        let _ = tokio::io::AsyncSeekExt::seek(self, std::io::SeekFrom::Start(offset)).await?;
168        Ok(())
169    }
170}
171
172/// Wrap a `bytes::Bytes` to support `StreamingSource`.
173pub struct BytesSource {
174    contents: bytes::Bytes,
175    current: Option<bytes::Bytes>,
176}
177
178impl BytesSource {
179    pub(crate) fn new(contents: bytes::Bytes) -> Self {
180        let current = Some(contents.clone());
181        Self { contents, current }
182    }
183}
184
185impl StreamingSource for BytesSource {
186    type Error = crate::Error;
187
188    async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
189        self.current.take().map(Result::Ok)
190    }
191
192    fn size_hint(&self) -> (u64, Option<u64>) {
193        let s = self.contents.len() as u64;
194        (s, Some(s))
195    }
196}
197
198impl Seek for BytesSource {
199    type Error = crate::Error;
200
201    async fn seek(&mut self, offset: u64) -> Result<(), Self::Error> {
202        let pos = std::cmp::min(offset as usize, self.contents.len());
203        self.current = Some(self.contents.slice(pos..));
204        Ok(())
205    }
206}
207
208#[cfg(test)]
209pub(crate) mod tests {
210    use super::*;
211    use std::{collections::VecDeque, io::Write};
212    use tempfile::NamedTempFile;
213
214    type Result = anyhow::Result<()>;
215
216    const CONTENTS: &[u8] = b"how vexingly quick daft zebras jump";
217
218    /// A helper function to simplify the tests.
219    async fn collect<S>(source: S) -> anyhow::Result<Vec<u8>>
220    where
221        S: StreamingSource,
222    {
223        let mut vec = Vec::new();
224        let mut source = source;
225        while let Some(bytes) = source.next().await.transpose()? {
226            vec.extend_from_slice(&bytes);
227        }
228        Ok(vec)
229    }
230
231    #[tokio::test]
232    async fn empty_bytes() -> Result {
233        let buffer = InsertPayload::from(bytes::Bytes::default());
234        let range = buffer.size_hint();
235        assert_eq!(range, (0, Some(0)));
236        let got = collect(buffer).await?;
237        assert!(got.is_empty(), "{got:?}");
238
239        Ok(())
240    }
241
242    #[tokio::test]
243    async fn simple_bytes() -> Result {
244        let buffer = InsertPayload::from(bytes::Bytes::from_static(CONTENTS));
245        let range = buffer.size_hint();
246        assert_eq!(range, (CONTENTS.len() as u64, Some(CONTENTS.len() as u64)));
247        let got = collect(buffer).await?;
248        assert_eq!(got[..], CONTENTS[..], "{got:?}");
249        Ok(())
250    }
251
252    #[tokio::test]
253    async fn simple_u8() -> Result {
254        let buffer = InsertPayload::from(CONTENTS);
255        let range = buffer.size_hint();
256        assert_eq!(range, (CONTENTS.len() as u64, Some(CONTENTS.len() as u64)));
257        let got = collect(buffer).await?;
258        assert_eq!(got[..], CONTENTS[..], "{got:?}");
259        Ok(())
260    }
261
262    #[tokio::test]
263    async fn simple_str() -> Result {
264        const LAZY: &str = "the quick brown fox jumps over the lazy dog";
265        let buffer = InsertPayload::from(LAZY);
266        let range = buffer.size_hint();
267        assert_eq!(range, (LAZY.len() as u64, Some(LAZY.len() as u64)));
268        let got = collect(buffer).await?;
269        assert_eq!(&got, LAZY.as_bytes(), "{got:?}");
270        Ok(())
271    }
272
273    #[tokio::test]
274    async fn seek_bytes() -> Result {
275        let mut buffer = InsertPayload::from(bytes::Bytes::from_static(CONTENTS));
276        buffer.seek(8).await?;
277        let got = collect(buffer).await?;
278        assert_eq!(got[..], CONTENTS[8..], "{got:?}");
279        Ok(())
280    }
281
282    #[tokio::test]
283    async fn empty_stream() -> Result {
284        let source = VecStream::new(vec![]);
285        let payload = InsertPayload::from(source);
286        let range = payload.size_hint();
287        assert_eq!(range, (0, Some(0)));
288        let got = collect(payload).await?;
289        assert!(got.is_empty(), "{got:?}");
290
291        Ok(())
292    }
293
294    #[tokio::test]
295    async fn simple_stream() -> Result {
296        let source = VecStream::new(
297            ["how ", "vexingly ", "quick ", "daft ", "zebras ", "jump"]
298                .map(|v| bytes::Bytes::from_static(v.as_bytes()))
299                .to_vec(),
300        );
301        let payload = InsertPayload::from(source);
302        let got = collect(payload).await?;
303        assert_eq!(got[..], CONTENTS[..]);
304
305        Ok(())
306    }
307
308    #[tokio::test]
309    async fn empty_file() -> Result {
310        let file = NamedTempFile::new()?;
311        let read = file.reopen()?;
312        let got = collect(tokio::fs::File::from(read)).await?;
313        assert!(got.is_empty(), "{got:?}");
314        Ok(())
315    }
316
317    #[tokio::test]
318    async fn small_file() -> Result {
319        let mut file = NamedTempFile::new()?;
320        assert_eq!(file.write(CONTENTS)?, CONTENTS.len());
321        file.flush()?;
322        let read = file.reopen()?;
323        let got = collect(tokio::fs::File::from(read)).await?;
324        assert_eq!(got[..], CONTENTS[..], "{got:?}");
325        Ok(())
326    }
327
328    #[tokio::test]
329    async fn small_file_seek() -> Result {
330        let mut file = NamedTempFile::new()?;
331        assert_eq!(file.write(CONTENTS)?, CONTENTS.len());
332        file.flush()?;
333        let mut read = tokio::fs::File::from(file.reopen()?);
334        read.seek(8).await?;
335        let got = collect(read).await?;
336        assert_eq!(got[..], CONTENTS[8..], "{got:?}");
337        Ok(())
338    }
339
340    #[tokio::test]
341    async fn larger_file() -> Result {
342        let mut file = NamedTempFile::new()?;
343        assert_eq!(file.write(&[0_u8; READ_SIZE])?, READ_SIZE);
344        assert_eq!(file.write(&[1_u8; READ_SIZE])?, READ_SIZE);
345        assert_eq!(file.write(&[2_u8; READ_SIZE])?, READ_SIZE);
346        assert_eq!(file.write(&[3_u8; READ_SIZE])?, READ_SIZE);
347        file.flush()?;
348        assert_eq!(READ_SIZE % 2, 0);
349        let mut read = tokio::fs::File::from(file.reopen()?);
350        read.seek((READ_SIZE + READ_SIZE / 2) as u64).await?;
351        let got = collect(read).await?;
352        let mut want = Vec::new();
353        want.extend_from_slice(&[1_u8; READ_SIZE / 2]);
354        want.extend_from_slice(&[2_u8; READ_SIZE]);
355        want.extend_from_slice(&[3_u8; READ_SIZE]);
356        assert_eq!(got[..], want[..], "{got:?}");
357        Ok(())
358    }
359
360    pub struct VecStream {
361        contents: Vec<bytes::Bytes>,
362        current: VecDeque<std::io::Result<bytes::Bytes>>,
363    }
364
365    impl VecStream {
366        pub fn new(contents: Vec<bytes::Bytes>) -> Self {
367            let current: VecDeque<std::io::Result<_>> =
368                contents.iter().map(|x| Ok(x.clone())).collect();
369            Self { contents, current }
370        }
371    }
372
373    impl StreamingSource for VecStream {
374        type Error = std::io::Error;
375
376        async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, Self::Error>> {
377            self.current.pop_front()
378        }
379
380        fn size_hint(&self) -> (u64, Option<u64>) {
381            let s = self.contents.iter().fold(0_u64, |a, i| a + i.len() as u64);
382            (s, Some(s))
383        }
384    }
385
386    impl Seek for VecStream {
387        type Error = std::io::Error;
388
389        async fn seek(&mut self, offset: u64) -> std::result::Result<(), Self::Error> {
390            let mut current: VecDeque<std::io::Result<_>> =
391                self.contents.iter().map(|x| Ok(x.clone())).collect();
392            let mut offset = offset as usize;
393            while offset > 0 {
394                match current.pop_front() {
395                    None => break,
396                    Some(Err(e)) => {
397                        current.push_front(Err(e));
398                        break;
399                    }
400                    Some(Ok(mut b)) if b.len() > offset => {
401                        current.push_front(Ok(b.split_off(offset)));
402                        break;
403                    }
404                    Some(Ok(b)) if b.len() == offset => {
405                        break;
406                    }
407                    Some(Ok(b)) => {
408                        offset -= b.len();
409                    }
410                };
411            }
412            self.current = current;
413            Ok(())
414        }
415    }
416}