google_cloud_storage/storage/
upload_source.rs1use std::collections::VecDeque;
18
19pub struct Payload<T> {
41 payload: T,
42}
43
44impl<T> Payload<T>
45where
46 T: StreamingSource,
47{
48 pub fn from_stream(payload: T) -> Self {
49 Self { payload }
50 }
51}
52
53impl<T> StreamingSource for Payload<T>
54where
55 T: StreamingSource + Send + Sync,
56{
57 type Error = T::Error;
58
59 async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
60 self.payload.next().await
61 }
62
63 async fn size_hint(&self) -> Result<(u64, Option<u64>), Self::Error> {
64 self.payload.size_hint().await
65 }
66}
67
68impl<T> Seek for Payload<T>
69where
70 T: Seek,
71{
72 type Error = T::Error;
73
74 fn seek(&mut self, offset: u64) -> impl Future<Output = Result<(), Self::Error>> + Send {
75 self.payload.seek(offset)
76 }
77}
78
79impl From<bytes::Bytes> for Payload<BytesSource> {
80 fn from(value: bytes::Bytes) -> Self {
81 let payload = BytesSource::new(value);
82 Self { payload }
83 }
84}
85
86impl From<&'static str> for Payload<BytesSource> {
87 fn from(value: &'static str) -> Self {
88 let b = bytes::Bytes::from_static(value.as_bytes());
89 Payload::from(b)
90 }
91}
92
93impl From<Vec<bytes::Bytes>> for Payload<IterSource> {
94 fn from(value: Vec<bytes::Bytes>) -> Self {
95 let payload = IterSource::new(value);
96 Self { payload }
97 }
98}
99
100impl<S> From<S> for Payload<S>
101where
102 S: StreamingSource,
103{
104 fn from(value: S) -> Self {
105 Self { payload: value }
106 }
107}
108
109pub trait StreamingSource {
111 type Error: std::error::Error + Send + Sync + 'static;
113
114 fn next(&mut self) -> impl Future<Output = Option<Result<bytes::Bytes, Self::Error>>> + Send;
116
117 fn size_hint(&self) -> impl Future<Output = Result<(u64, Option<u64>), Self::Error>> + Send {
125 std::future::ready(Ok((0_u64, None)))
126 }
127}
128
/// A data source that can be repositioned.
pub trait Seek {
    /// The error reported when repositioning fails.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Repositions the source at `offset`.
    ///
    /// The implementations in this module measure `offset` in bytes from the
    /// start of the data, so data returned after a successful seek begins at
    /// that position.
    fn seek(&mut self, offset: u64) -> impl Future<Output = Result<(), Self::Error>> + Send;
}
149
/// Size in bytes (256 KiB) of the buffer allocated for each read from a file.
const READ_SIZE: usize = 256 * 1024;
151
152impl From<tokio::fs::File> for Payload<FileSource> {
153 fn from(value: tokio::fs::File) -> Self {
154 Self {
155 payload: FileSource::new(value),
156 }
157 }
158}
159
160pub struct FileSource {
175 inner: tokio::fs::File,
176}
177
178impl FileSource {
179 fn new(inner: tokio::fs::File) -> Self {
180 Self { inner }
181 }
182}
183
184impl StreamingSource for FileSource {
185 type Error = std::io::Error;
186
187 async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
188 let mut buffer = vec![0_u8; READ_SIZE];
189 match tokio::io::AsyncReadExt::read(&mut self.inner, &mut buffer).await {
190 Err(e) => Some(Err(e)),
191 Ok(0) => None,
192 Ok(n) => {
193 buffer.resize(n, 0_u8);
194 Some(Ok(bytes::Bytes::from_owner(buffer)))
195 }
196 }
197 }
198 async fn size_hint(&self) -> Result<(u64, Option<u64>), Self::Error> {
199 let m = self.inner.metadata().await?;
200 Ok((m.len(), Some(m.len())))
201 }
202}
203
204impl Seek for FileSource {
205 type Error = std::io::Error;
206
207 async fn seek(&mut self, offset: u64) -> Result<(), Self::Error> {
208 use tokio::io::AsyncSeekExt;
209 let _ = self.inner.seek(std::io::SeekFrom::Start(offset)).await?;
210 Ok(())
211 }
212}
213
214pub struct BytesSource {
229 contents: bytes::Bytes,
230 current: Option<bytes::Bytes>,
231}
232
233impl BytesSource {
234 pub(crate) fn new(contents: bytes::Bytes) -> Self {
235 let current = Some(contents.clone());
236 Self { contents, current }
237 }
238}
239
240impl StreamingSource for BytesSource {
241 type Error = crate::Error;
242
243 async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
244 self.current.take().map(Result::Ok)
245 }
246
247 async fn size_hint(&self) -> Result<(u64, Option<u64>), Self::Error> {
248 let s = self.contents.len() as u64;
249 Ok((s, Some(s)))
250 }
251}
252
253impl Seek for BytesSource {
254 type Error = crate::Error;
255
256 async fn seek(&mut self, offset: u64) -> Result<(), Self::Error> {
257 let pos = std::cmp::min(offset as usize, self.contents.len());
258 self.current = Some(self.contents.slice(pos..));
259 Ok(())
260 }
261}
262
263pub(crate) struct IterSource {
265 contents: Vec<bytes::Bytes>,
266 current: VecDeque<bytes::Bytes>,
267}
268
269impl IterSource {
270 pub(crate) fn new<I>(iterator: I) -> Self
271 where
272 I: IntoIterator<Item = bytes::Bytes>,
273 {
274 let contents: Vec<bytes::Bytes> = iterator.into_iter().collect();
275 let current: VecDeque<bytes::Bytes> = contents.iter().cloned().collect();
276 Self { contents, current }
277 }
278}
279
280impl StreamingSource for IterSource {
281 type Error = std::io::Error;
282
283 async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, Self::Error>> {
284 self.current.pop_front().map(Ok)
285 }
286
287 async fn size_hint(&self) -> Result<(u64, Option<u64>), Self::Error> {
288 let s = self.contents.iter().fold(0_u64, |a, i| a + i.len() as u64);
289 Ok((s, Some(s)))
290 }
291}
292
293impl Seek for IterSource {
294 type Error = std::io::Error;
295 async fn seek(&mut self, offset: u64) -> std::result::Result<(), Self::Error> {
296 let mut current = VecDeque::new();
297 let mut offset = offset as usize;
298 for b in self.contents.iter() {
299 offset = match (offset, b.len()) {
300 (0, _) => {
301 current.push_back(b.clone());
302 0
303 }
304 (o, n) if o >= n => o - n,
305 (o, n) => {
306 current.push_back(b.clone().split_off(n - o));
307 0
308 }
309 }
310 }
311 self.current = current;
312 Ok(())
313 }
314}
315
#[cfg(test)]
pub mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    // Shadows the prelude `Result`; the two-parameter form is spelled
    // `std::result::Result` throughout this module.
    type Result = anyhow::Result<()>;

    const CONTENTS: &[u8] = b"how vexingly quick daft zebras jump";

    /// A source that wraps `BytesSource` but reports no upper bound in its
    /// size hint.
    pub(crate) struct UnknownSize {
        inner: BytesSource,
    }
    impl UnknownSize {
        pub fn new(inner: BytesSource) -> Self {
            UnknownSize { inner }
        }
    }
    impl Seek for UnknownSize {
        type Error = <BytesSource as Seek>::Error;
        async fn seek(&mut self, offset: u64) -> std::result::Result<(), Self::Error> {
            Seek::seek(&mut self.inner, offset).await
        }
    }
    impl StreamingSource for UnknownSize {
        type Error = <BytesSource as StreamingSource>::Error;
        async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, Self::Error>> {
            StreamingSource::next(&mut self.inner).await
        }
        async fn size_hint(&self) -> std::result::Result<(u64, Option<u64>), Self::Error> {
            // Keep the first element of the inner hint, drop the second.
            let (lower, _) = self.inner.size_hint().await?;
            Ok((lower, None))
        }
    }

    // Mock of a source that only implements `StreamingSource`.
    mockall::mock! {
        pub(crate) SimpleSource {}

        impl StreamingSource for SimpleSource {
            type Error = std::io::Error;
            async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, std::io::Error>>;
            async fn size_hint(&self) -> std::result::Result<(u64, Option<u64>), std::io::Error>;
        }
    }

    // Mock of a source that implements both `StreamingSource` and `Seek`.
    mockall::mock! {
        pub(crate) SeekSource {}

        impl StreamingSource for SeekSource {
            type Error = std::io::Error;
            async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, std::io::Error>>;
            async fn size_hint(&self) -> std::result::Result<(u64, Option<u64>), std::io::Error>;
        }
        impl Seek for SeekSource {
            type Error = std::io::Error;
            async fn seek(&mut self, offset: u64) ->std::result::Result<(), std::io::Error>;
        }
    }

    /// Consumes `source` and returns everything it yields as one vector.
    async fn collect<S>(mut source: S) -> anyhow::Result<Vec<u8>>
    where
        S: StreamingSource,
    {
        collect_mut(&mut source).await
    }

    /// Drains `source` through a mutable reference, stopping at the first
    /// error or at the end of the data.
    async fn collect_mut<S>(source: &mut S) -> anyhow::Result<Vec<u8>>
    where
        S: StreamingSource,
    {
        let mut gathered = Vec::new();
        loop {
            match source.next().await {
                None => return Ok(gathered),
                Some(chunk) => gathered.extend_from_slice(&chunk?),
            }
        }
    }

    #[tokio::test]
    async fn empty_bytes() -> Result {
        let payload = Payload::from(bytes::Bytes::default());
        let hint = payload.size_hint().await?;
        assert_eq!(hint, (0, Some(0)));
        let got = collect(payload).await?;
        assert!(got.is_empty(), "{got:?}");

        Ok(())
    }

    #[tokio::test]
    async fn simple_bytes() -> Result {
        let payload = Payload::from(bytes::Bytes::from_static(CONTENTS));
        let want_len = CONTENTS.len() as u64;
        let hint = payload.size_hint().await?;
        assert_eq!(hint, (want_len, Some(want_len)));
        let got = collect(payload).await?;
        assert_eq!(got[..], CONTENTS[..], "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn simple_str() -> Result {
        const LAZY: &str = "the quick brown fox jumps over the lazy dog";
        let payload = Payload::from(LAZY);
        let want_len = LAZY.len() as u64;
        let hint = payload.size_hint().await?;
        assert_eq!(hint, (want_len, Some(want_len)));
        let got = collect(payload).await?;
        assert_eq!(got.as_slice(), LAZY.as_bytes(), "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn seek_bytes() -> Result {
        let mut payload = Payload::from(bytes::Bytes::from_static(CONTENTS));
        payload.seek(8).await?;
        let got = collect(payload).await?;
        assert_eq!(got[..], CONTENTS[8..], "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn empty_stream() -> Result {
        let payload = Payload::from(IterSource::new(vec![]));
        let hint = payload.size_hint().await?;
        assert_eq!(hint, (0, Some(0)));
        let got = collect(payload).await?;
        assert!(got.is_empty(), "{got:?}");

        Ok(())
    }

    #[tokio::test]
    async fn simple_stream() -> Result {
        let words = ["how ", "vexingly ", "quick ", "daft ", "zebras ", "jump"];
        let source = IterSource::new(words.map(|w| bytes::Bytes::from_static(w.as_bytes())));
        let payload = Payload::from_stream(source);
        let got = collect(payload).await?;
        assert_eq!(got[..], CONTENTS[..]);

        Ok(())
    }

    #[tokio::test]
    async fn empty_file() -> Result {
        let file = NamedTempFile::new()?;
        let payload = Payload::from(tokio::fs::File::from(file.reopen()?));
        let hint = payload.size_hint().await?;
        assert_eq!(hint, (0_u64, Some(0_u64)));
        let got = collect(payload).await?;
        assert!(got.is_empty(), "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn small_file() -> Result {
        let mut file = NamedTempFile::new()?;
        let written = file.write(CONTENTS)?;
        assert_eq!(written, CONTENTS.len());
        file.flush()?;
        let payload = Payload::from(tokio::fs::File::from(file.reopen()?));
        let hint = payload.size_hint().await?;
        let want_len = CONTENTS.len() as u64;
        assert_eq!(hint, (want_len, Some(want_len)));
        let got = collect(payload).await?;
        assert_eq!(got[..], CONTENTS[..], "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn small_file_seek() -> Result {
        let mut file = NamedTempFile::new()?;
        let written = file.write(CONTENTS)?;
        assert_eq!(written, CONTENTS.len());
        file.flush()?;
        let mut payload = Payload::from(tokio::fs::File::from(file.reopen()?));
        payload.seek(8).await?;
        let got = collect(payload).await?;
        assert_eq!(got[..], CONTENTS[8..], "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn larger_file() -> Result {
        let mut file = NamedTempFile::new()?;
        // Four blocks of READ_SIZE bytes, filled with 0, 1, 2 and 3.
        for fill in 0_u8..4 {
            assert_eq!(file.write(&[fill; READ_SIZE])?, READ_SIZE);
        }
        file.flush()?;
        assert_eq!(READ_SIZE % 2, 0);
        let mut payload = Payload::from(tokio::fs::File::from(file.reopen()?));
        // Start halfway through the second block.
        payload.seek((READ_SIZE + READ_SIZE / 2) as u64).await?;
        let got = collect(payload).await?;
        let want = [
            vec![1_u8; READ_SIZE / 2],
            vec![2_u8; READ_SIZE],
            vec![3_u8; READ_SIZE],
        ]
        .concat();
        assert_eq!(got[..], want[..], "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn iter_source_full() -> Result {
        const N: usize = 32;
        let data = [[1_u8; N], [2_u8; N], [3_u8; N]].concat();
        let b = bytes::Bytes::from_owner(data);

        let parts = vec![b.slice(0..N), b.slice(N..(2 * N)), b.slice((2 * N)..)];
        let mut stream = IterSource::new(parts);
        let total = 3 * N as u64;
        assert_eq!(stream.size_hint().await?, (total, Some(total)));

        // Alternate between the start and later offsets to check that each
        // seek rebuilds the stream from the complete data.
        for offset in [0, N / 2, 0, N, 0, 2 * N + N / 2] {
            stream.seek(offset as u64).await?;
            let got = collect_mut(&mut stream).await?;
            assert_eq!(got[..], b[offset..(3 * N)]);
        }

        Ok(())
    }
}