google_cloud_storage/storage/
streaming_source.rs1use std::collections::VecDeque;
18
/// The size-hint type used by [`StreamingSource::size_hint`].
///
/// This is an alias for `http_body::SizeHint`.
pub type SizeHint = http_body::SizeHint;
21
/// A thin wrapper around a single data source of type `T`.
///
/// The impls in this file forward [`StreamingSource`] and [`Seek`] calls to
/// the wrapped value, and provide `From` conversions from common types.
pub struct Payload<T> {
    /// The wrapped source; every operation is delegated to it.
    payload: T,
}
46
47impl<T> Payload<T>
48where
49 T: StreamingSource,
50{
51 pub fn from_stream(payload: T) -> Self {
52 Self { payload }
53 }
54}
55
56impl<T> StreamingSource for Payload<T>
57where
58 T: StreamingSource + Send + Sync,
59{
60 type Error = T::Error;
61
62 async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
63 self.payload.next().await
64 }
65
66 async fn size_hint(&self) -> Result<SizeHint, Self::Error> {
67 self.payload.size_hint().await
68 }
69}
70
71impl<T> Seek for Payload<T>
72where
73 T: Seek,
74{
75 type Error = T::Error;
76
77 fn seek(&mut self, offset: u64) -> impl Future<Output = Result<(), Self::Error>> + Send {
78 self.payload.seek(offset)
79 }
80}
81
82impl From<bytes::Bytes> for Payload<BytesSource> {
83 fn from(value: bytes::Bytes) -> Self {
84 let payload = BytesSource::new(value);
85 Self { payload }
86 }
87}
88
89impl From<&'static str> for Payload<BytesSource> {
90 fn from(value: &'static str) -> Self {
91 let b = bytes::Bytes::from_static(value.as_bytes());
92 Payload::from(b)
93 }
94}
95
96impl From<String> for Payload<BytesSource> {
97 fn from(value: String) -> Self {
98 let payload = BytesSource::new(bytes::Bytes::from(value));
99 Self { payload }
100 }
101}
102
103impl From<Vec<bytes::Bytes>> for Payload<IterSource> {
104 fn from(value: Vec<bytes::Bytes>) -> Self {
105 let payload = IterSource::new(value);
106 Self { payload }
107 }
108}
109
110impl<S> From<S> for Payload<S>
111where
112 S: StreamingSource,
113{
114 fn from(value: S) -> Self {
115 Self { payload: value }
116 }
117}
118
119pub trait StreamingSource {
121 type Error: std::error::Error + Send + Sync + 'static;
123
124 fn next(&mut self) -> impl Future<Output = Option<Result<bytes::Bytes, Self::Error>>> + Send;
126
127 fn size_hint(&self) -> impl Future<Output = Result<SizeHint, Self::Error>> + Send {
135 std::future::ready(Ok(SizeHint::new()))
136 }
137}
138
/// A source that can be repositioned to a given offset.
pub trait Seek {
    /// The error type reported when a seek fails.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Repositions the source at `offset`.
    ///
    /// The implementations in this file interpret `offset` as a byte count
    /// measured from the start of the data.
    fn seek(&mut self, offset: u64) -> impl Future<Output = Result<(), Self::Error>> + Send;
}
158
/// Size, in bytes, of the buffer used for each file read (256 KiB).
const READ_SIZE: usize = 256 * 1024;
160
161impl From<tokio::fs::File> for Payload<FileSource> {
162 fn from(value: tokio::fs::File) -> Self {
163 Self {
164 payload: FileSource::new(value),
165 }
166 }
167}
168
/// A [`StreamingSource`] (and [`Seek`]) implementation backed by a
/// `tokio::fs::File`.
pub struct FileSource {
    /// The open file that chunks are read from.
    inner: tokio::fs::File,
}
186
187impl FileSource {
188 fn new(inner: tokio::fs::File) -> Self {
189 Self { inner }
190 }
191}
192
193impl StreamingSource for FileSource {
194 type Error = std::io::Error;
195
196 async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
197 let mut buffer = vec![0_u8; READ_SIZE];
198 match tokio::io::AsyncReadExt::read(&mut self.inner, &mut buffer).await {
199 Err(e) => Some(Err(e)),
200 Ok(0) => None,
201 Ok(n) => {
202 buffer.resize(n, 0_u8);
203 Some(Ok(bytes::Bytes::from_owner(buffer)))
204 }
205 }
206 }
207 async fn size_hint(&self) -> Result<SizeHint, Self::Error> {
208 let m = self.inner.metadata().await?;
209 Ok(SizeHint::with_exact(m.len()))
210 }
211}
212
213impl Seek for FileSource {
214 type Error = std::io::Error;
215
216 async fn seek(&mut self, offset: u64) -> Result<(), Self::Error> {
217 use tokio::io::AsyncSeekExt;
218 let _ = self.inner.seek(std::io::SeekFrom::Start(offset)).await?;
219 Ok(())
220 }
221}
222
/// A [`StreamingSource`] (and [`Seek`]) implementation over a single
/// in-memory `bytes::Bytes` buffer.
pub struct BytesSource {
    /// The complete buffer; kept so the source can be re-sliced on seek and
    /// its full length reported by `size_hint`.
    contents: bytes::Bytes,
    /// The chunk that the next call to `next()` will hand out, or `None`
    /// once it has been taken.
    current: Option<bytes::Bytes>,
}
241
242impl BytesSource {
243 pub(crate) fn new(contents: bytes::Bytes) -> Self {
244 let current = Some(contents.clone());
245 Self { contents, current }
246 }
247}
248
249impl StreamingSource for BytesSource {
250 type Error = crate::Error;
251
252 async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
253 self.current.take().map(Result::Ok)
254 }
255
256 async fn size_hint(&self) -> Result<SizeHint, Self::Error> {
257 let s = self.contents.len() as u64;
258 Ok(SizeHint::with_exact(s))
259 }
260}
261
262impl Seek for BytesSource {
263 type Error = crate::Error;
264
265 async fn seek(&mut self, offset: u64) -> Result<(), Self::Error> {
266 let pos = std::cmp::min(offset as usize, self.contents.len());
267 self.current = Some(self.contents.slice(pos..));
268 Ok(())
269 }
270}
271
/// A [`StreamingSource`] (and [`Seek`]) implementation over a sequence of
/// in-memory `bytes::Bytes` buffers.
pub(crate) struct IterSource {
    /// All buffers, in order; kept so the queue can be rebuilt on seek and
    /// the total length reported by `size_hint`.
    contents: Vec<bytes::Bytes>,
    /// The buffers not yet handed out by `next()`, front first.
    current: VecDeque<bytes::Bytes>,
}
277
278impl IterSource {
279 pub(crate) fn new<I>(iterator: I) -> Self
280 where
281 I: IntoIterator<Item = bytes::Bytes>,
282 {
283 let contents: Vec<bytes::Bytes> = iterator.into_iter().collect();
284 let current: VecDeque<bytes::Bytes> = contents.iter().cloned().collect();
285 Self { contents, current }
286 }
287}
288
289impl StreamingSource for IterSource {
290 type Error = std::io::Error;
291
292 async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, Self::Error>> {
293 self.current.pop_front().map(Ok)
294 }
295
296 async fn size_hint(&self) -> Result<SizeHint, Self::Error> {
297 let s = self.contents.iter().fold(0_u64, |a, i| a + i.len() as u64);
298 Ok(SizeHint::with_exact(s))
299 }
300}
301
302impl Seek for IterSource {
303 type Error = std::io::Error;
304 async fn seek(&mut self, offset: u64) -> std::result::Result<(), Self::Error> {
305 let mut current = VecDeque::new();
306 let mut offset = offset as usize;
307 for b in self.contents.iter() {
308 offset = match (offset, b.len()) {
309 (0, _) => {
310 current.push_back(b.clone());
311 0
312 }
313 (o, n) if o >= n => o - n,
314 (o, n) => {
315 current.push_back(b.clone().split_off(n - o));
316 0
317 }
318 }
319 }
320 self.current = current;
321 Ok(())
322 }
323}
324
#[cfg(test)]
pub mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    type Result = anyhow::Result<()>;

    const CONTENTS: &[u8] = b"how vexingly quick daft zebras jump";

    /// A test source that wraps a [`BytesSource`] but reports only a lower
    /// bound for its size, never an exact value.
    pub(crate) struct UnknownSize {
        inner: BytesSource,
    }
    impl UnknownSize {
        pub fn new(inner: BytesSource) -> Self {
            UnknownSize { inner }
        }
    }
    impl Seek for UnknownSize {
        type Error = <BytesSource as Seek>::Error;
        async fn seek(&mut self, offset: u64) -> std::result::Result<(), Self::Error> {
            Seek::seek(&mut self.inner, offset).await
        }
    }
    impl StreamingSource for UnknownSize {
        type Error = <BytesSource as StreamingSource>::Error;
        async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, Self::Error>> {
            StreamingSource::next(&mut self.inner).await
        }
        async fn size_hint(&self) -> std::result::Result<SizeHint, Self::Error> {
            let known = self.inner.size_hint().await?;
            // Copy only the lower bound; the upper bound is left unset.
            let mut open_ended = SizeHint::default();
            open_ended.set_lower(known.lower());
            Ok(open_ended)
        }
    }

    mockall::mock! {
        pub(crate) SimpleSource {}

        impl StreamingSource for SimpleSource {
            type Error = std::io::Error;
            async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, std::io::Error>>;
            async fn size_hint(&self) -> std::result::Result<SizeHint, std::io::Error>;
        }
    }

    mockall::mock! {
        pub(crate) SeekSource {}

        impl StreamingSource for SeekSource {
            type Error = std::io::Error;
            async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, std::io::Error>>;
            async fn size_hint(&self) -> std::result::Result<SizeHint, std::io::Error>;
        }
        impl Seek for SeekSource {
            type Error = std::io::Error;
            async fn seek(&mut self, offset: u64) ->std::result::Result<(), std::io::Error>;
        }
    }

    /// Drains an owned source into a single byte vector.
    async fn collect<S>(mut source: S) -> anyhow::Result<Vec<u8>>
    where
        S: StreamingSource,
    {
        collect_mut(&mut source).await
    }

    /// Drains a borrowed source into a single byte vector, stopping at the
    /// first error.
    async fn collect_mut<S>(source: &mut S) -> anyhow::Result<Vec<u8>>
    where
        S: StreamingSource,
    {
        let mut all = Vec::new();
        loop {
            match source.next().await {
                None => return Ok(all),
                Some(chunk) => all.extend_from_slice(&chunk?),
            }
        }
    }

    /// Reopens the temporary file for reading and wraps it in a payload.
    fn open_payload(file: &NamedTempFile) -> anyhow::Result<Payload<FileSource>> {
        let handle = tokio::fs::File::from(file.reopen()?);
        Ok(Payload::from(handle))
    }

    #[tokio::test]
    async fn empty_bytes() -> Result {
        let payload = Payload::from(bytes::Bytes::default());
        let hint = payload.size_hint().await?;
        assert_eq!(hint.exact(), Some(0));
        let got = collect(payload).await?;
        assert!(got.is_empty(), "{got:?}");

        Ok(())
    }

    #[tokio::test]
    async fn simple_bytes() -> Result {
        let payload = Payload::from(bytes::Bytes::from_static(CONTENTS));
        let hint = payload.size_hint().await?;
        assert_eq!(hint.exact(), Some(CONTENTS.len() as u64));
        let got = collect(payload).await?;
        assert_eq!(got[..], CONTENTS[..], "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn simple_str() -> Result {
        const LAZY: &str = "the quick brown fox jumps over the lazy dog";
        let payload = Payload::from(LAZY);
        let hint = payload.size_hint().await?;
        assert_eq!(hint.exact(), Some(LAZY.len() as u64));
        let got = collect(payload).await?;
        assert_eq!(&got, LAZY.as_bytes(), "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn simple_string() -> Result {
        const LAZY: &str = "the quick brown fox jumps over the lazy dog";
        let payload = Payload::from(String::from(LAZY));
        let hint = payload.size_hint().await?;
        assert_eq!(hint.exact(), Some(LAZY.len() as u64));
        let got = collect(payload).await?;
        assert_eq!(&got, LAZY.as_bytes(), "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn seek_bytes() -> Result {
        let mut payload = Payload::from(bytes::Bytes::from_static(CONTENTS));
        payload.seek(8).await?;
        let got = collect(payload).await?;
        assert_eq!(got[..], CONTENTS[8..], "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn empty_stream() -> Result {
        let payload = Payload::from(IterSource::new(Vec::new()));
        let hint = payload.size_hint().await?;
        assert_eq!(hint.exact(), Some(0));
        let got = collect(payload).await?;
        assert!(got.is_empty(), "{got:?}");

        Ok(())
    }

    #[tokio::test]
    async fn simple_stream() -> Result {
        let words = ["how ", "vexingly ", "quick ", "daft ", "zebras ", "jump"];
        let chunks = words.map(|w| bytes::Bytes::from_static(w.as_bytes()));
        let payload = Payload::from_stream(IterSource::new(chunks));
        let got = collect(payload).await?;
        assert_eq!(got[..], CONTENTS[..]);

        Ok(())
    }

    #[tokio::test]
    async fn empty_file() -> Result {
        let file = NamedTempFile::new()?;
        let payload = open_payload(&file)?;
        let hint = payload.size_hint().await?;
        assert_eq!(hint.exact(), Some(0));
        let got = collect(payload).await?;
        assert!(got.is_empty(), "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn small_file() -> Result {
        let mut file = NamedTempFile::new()?;
        assert_eq!(file.write(CONTENTS)?, CONTENTS.len());
        file.flush()?;
        let payload = open_payload(&file)?;
        let hint = payload.size_hint().await?;
        let expected_len = CONTENTS.len() as u64;
        assert_eq!(hint.exact(), Some(expected_len));
        let got = collect(payload).await?;
        assert_eq!(got[..], CONTENTS[..], "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn small_file_seek() -> Result {
        let mut file = NamedTempFile::new()?;
        assert_eq!(file.write(CONTENTS)?, CONTENTS.len());
        file.flush()?;
        let mut payload = open_payload(&file)?;
        payload.seek(8).await?;
        let got = collect(payload).await?;
        assert_eq!(got[..], CONTENTS[8..], "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn larger_file() -> Result {
        let mut file = NamedTempFile::new()?;
        // Four READ_SIZE blocks, filled with 0, 1, 2 and 3 respectively.
        for fill in 0..4_u8 {
            assert_eq!(file.write(&[fill; READ_SIZE])?, READ_SIZE);
        }
        file.flush()?;
        assert_eq!(READ_SIZE % 2, 0);
        let mut payload = open_payload(&file)?;
        // Start reading from the middle of the second block.
        payload.seek((READ_SIZE + READ_SIZE / 2) as u64).await?;
        let got = collect(payload).await?;
        let want = [
            vec![1_u8; READ_SIZE / 2],
            vec![2_u8; READ_SIZE],
            vec![3_u8; READ_SIZE],
        ]
        .concat();
        assert_eq!(got[..], want[..], "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn iter_source_full() -> Result {
        const N: usize = 32;
        let buf = [[1_u8; N], [2_u8; N], [3_u8; N]].concat();
        let b = bytes::Bytes::from_owner(buf);

        let chunks = vec![b.slice(0..N), b.slice(N..(2 * N)), b.slice((2 * N)..)];
        let mut stream = IterSource::new(chunks);
        assert_eq!(stream.size_hint().await?.exact(), Some(3 * N as u64));

        // Alternate between rewinding to the start and seeking to a chunk
        // midpoint or a chunk boundary.
        for offset in [0, N / 2, 0, N, 0, 2 * N + N / 2] {
            stream.seek(offset as u64).await?;
            let got = collect_mut(&mut stream).await?;
            assert_eq!(got[..], b[offset..(3 * N)]);
        }

        Ok(())
    }
}