google_cloud_storage/storage/
streaming_source.rs1use std::collections::VecDeque;
18
/// Size hint type returned by [`StreamingSource::size_hint`].
///
/// This is a plain alias for `http_body::SizeHint`.
pub type SizeHint = http_body::SizeHint;
21
/// A thin wrapper around a source of bytes of type `T`.
///
/// The `From` implementations in this file build a `Payload` from in-memory
/// bytes, strings, vectors of byte chunks, files, or any [`StreamingSource`].
/// `Payload` forwards [`StreamingSource`] and [`Seek`] to the wrapped value.
pub struct Payload<T> {
    // The wrapped source; every trait method on `Payload` delegates to it.
    payload: T,
}
46
47impl<T> Payload<T>
48where
49 T: StreamingSource,
50{
51 pub fn from_stream(payload: T) -> Self {
53 Self { payload }
54 }
55}
56
57impl<T> StreamingSource for Payload<T>
58where
59 T: StreamingSource + Send + Sync,
60{
61 type Error = T::Error;
62
63 async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
64 self.payload.next().await
65 }
66
67 async fn size_hint(&self) -> Result<SizeHint, Self::Error> {
68 self.payload.size_hint().await
69 }
70}
71
72impl<T> Seek for Payload<T>
73where
74 T: Seek,
75{
76 type Error = T::Error;
77
78 fn seek(&mut self, offset: u64) -> impl Future<Output = Result<(), Self::Error>> + Send {
79 self.payload.seek(offset)
80 }
81}
82
83impl From<bytes::Bytes> for Payload<BytesSource> {
84 fn from(value: bytes::Bytes) -> Self {
85 let payload = BytesSource::new(value);
86 Self { payload }
87 }
88}
89
90impl From<&'static str> for Payload<BytesSource> {
91 fn from(value: &'static str) -> Self {
92 let b = bytes::Bytes::from_static(value.as_bytes());
93 Payload::from(b)
94 }
95}
96
97impl From<String> for Payload<BytesSource> {
98 fn from(value: String) -> Self {
99 let payload = BytesSource::new(bytes::Bytes::from(value));
100 Self { payload }
101 }
102}
103
104impl From<Vec<bytes::Bytes>> for Payload<IterSource> {
105 fn from(value: Vec<bytes::Bytes>) -> Self {
106 let payload = IterSource::new(value);
107 Self { payload }
108 }
109}
110
111impl<S> From<S> for Payload<S>
112where
113 S: StreamingSource,
114{
115 fn from(value: S) -> Self {
116 Self { payload: value }
117 }
118}
119
/// A source of bytes that is consumed asynchronously, one chunk at a time.
pub trait StreamingSource {
    /// The error type reported by [`next`](Self::next) and
    /// [`size_hint`](Self::size_hint).
    type Error: std::error::Error + Send + Sync + 'static;

    /// Returns the next chunk of bytes, an error, or `None`.
    ///
    /// The implementations in this file return `None` once they have no more
    /// chunks to hand out.
    fn next(&mut self) -> impl Future<Output = Option<Result<bytes::Bytes, Self::Error>>> + Send;

    /// Returns a hint about the size of the data held by this source.
    ///
    /// The provided implementation resolves immediately to `SizeHint::new()`.
    /// Implementations that know the size override it; for example,
    /// `BytesSource` reports an exact length.
    fn size_hint(&self) -> impl Future<Output = Result<SizeHint, Self::Error>> + Send {
        std::future::ready(Ok(SizeHint::new()))
    }
}
139
/// A source that can be repositioned.
pub trait Seek {
    /// The error type reported by [`seek`](Self::seek).
    type Error: std::error::Error + Send + Sync + 'static;

    /// Repositions the source at `offset`.
    ///
    /// The implementations in this file treat `offset` as a byte count from
    /// the start of the data: after a successful seek, the next chunks they
    /// yield begin at that position.
    fn seek(&mut self, offset: u64) -> impl Future<Output = Result<(), Self::Error>> + Send;
}
159
/// Size in bytes (256 KiB) of the buffer `FileSource::next` allocates for each
/// read, and therefore the largest chunk it can yield.
const READ_SIZE: usize = 256 * 1024;
161
162impl From<tokio::fs::File> for Payload<FileSource> {
163 fn from(value: tokio::fs::File) -> Self {
164 Self {
165 payload: FileSource::new(value),
166 }
167 }
168}
169
/// A [`StreamingSource`] and [`Seek`] implementation backed by a
/// `tokio::fs::File`.
pub struct FileSource {
    // The file that chunks are read from and that `seek` repositions.
    inner: tokio::fs::File,
}
187
188impl FileSource {
189 fn new(inner: tokio::fs::File) -> Self {
190 Self { inner }
191 }
192}
193
194impl StreamingSource for FileSource {
195 type Error = std::io::Error;
196
197 async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
198 let mut buffer = vec![0_u8; READ_SIZE];
199 match tokio::io::AsyncReadExt::read(&mut self.inner, &mut buffer).await {
200 Err(e) => Some(Err(e)),
201 Ok(0) => None,
202 Ok(n) => {
203 buffer.resize(n, 0_u8);
204 Some(Ok(bytes::Bytes::from_owner(buffer)))
205 }
206 }
207 }
208 async fn size_hint(&self) -> Result<SizeHint, Self::Error> {
209 let m = self.inner.metadata().await?;
210 Ok(SizeHint::with_exact(m.len()))
211 }
212}
213
214impl Seek for FileSource {
215 type Error = std::io::Error;
216
217 async fn seek(&mut self, offset: u64) -> Result<(), Self::Error> {
218 use tokio::io::AsyncSeekExt;
219 let _ = self.inner.seek(std::io::SeekFrom::Start(offset)).await?;
220 Ok(())
221 }
222}
223
/// A [`StreamingSource`] and [`Seek`] implementation backed by a single
/// in-memory `bytes::Bytes` buffer.
pub struct BytesSource {
    // The full buffer; kept so `seek` can re-slice it and `size_hint` can
    // report its length.
    contents: bytes::Bytes,
    // The chunk the next call to `next` will yield; `None` once it was taken.
    current: Option<bytes::Bytes>,
}
242
243impl BytesSource {
244 pub(crate) fn new(contents: bytes::Bytes) -> Self {
245 let current = Some(contents.clone());
246 Self { contents, current }
247 }
248}
249
250impl StreamingSource for BytesSource {
251 type Error = crate::Error;
252
253 async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
254 self.current.take().map(Result::Ok)
255 }
256
257 async fn size_hint(&self) -> Result<SizeHint, Self::Error> {
258 let s = self.contents.len() as u64;
259 Ok(SizeHint::with_exact(s))
260 }
261}
262
263impl Seek for BytesSource {
264 type Error = crate::Error;
265
266 async fn seek(&mut self, offset: u64) -> Result<(), Self::Error> {
267 let pos = std::cmp::min(offset as usize, self.contents.len());
268 self.current = Some(self.contents.slice(pos..));
269 Ok(())
270 }
271}
272
/// A [`StreamingSource`] and [`Seek`] implementation backed by a list of
/// in-memory chunks.
pub(crate) struct IterSource {
    // Every chunk, in order; kept so `seek` can rebuild the queue and
    // `size_hint` can add up the lengths.
    contents: Vec<bytes::Bytes>,
    // The chunks that have not been yielded yet; `next` pops from the front.
    current: VecDeque<bytes::Bytes>,
}
278
279impl IterSource {
280 pub(crate) fn new<I>(iterator: I) -> Self
281 where
282 I: IntoIterator<Item = bytes::Bytes>,
283 {
284 let contents: Vec<bytes::Bytes> = iterator.into_iter().collect();
285 let current: VecDeque<bytes::Bytes> = contents.iter().cloned().collect();
286 Self { contents, current }
287 }
288}
289
290impl StreamingSource for IterSource {
291 type Error = std::io::Error;
292
293 async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, Self::Error>> {
294 self.current.pop_front().map(Ok)
295 }
296
297 async fn size_hint(&self) -> Result<SizeHint, Self::Error> {
298 let s = self.contents.iter().fold(0_u64, |a, i| a + i.len() as u64);
299 Ok(SizeHint::with_exact(s))
300 }
301}
302
303impl Seek for IterSource {
304 type Error = std::io::Error;
305 async fn seek(&mut self, offset: u64) -> std::result::Result<(), Self::Error> {
306 let mut current = VecDeque::new();
307 let mut offset = offset as usize;
308 for b in self.contents.iter() {
309 offset = match (offset, b.len()) {
310 (0, _) => {
311 current.push_back(b.clone());
312 0
313 }
314 (o, n) if o >= n => o - n,
315 (o, n) => {
316 current.push_back(b.clone().split_off(n - o));
317 0
318 }
319 }
320 }
321 self.current = current;
322 Ok(())
323 }
324}
325
#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    type Result = anyhow::Result<()>;

    const CONTENTS: &[u8] = b"how vexingly quick daft zebras jump";

    /// A source that wraps a `BytesSource` but reports only a lower bound for
    /// its size, never an exact value.
    pub(crate) struct UnknownSize {
        inner: BytesSource,
    }

    impl UnknownSize {
        pub fn new(inner: BytesSource) -> Self {
            Self { inner }
        }
    }

    impl Seek for UnknownSize {
        type Error = <BytesSource as Seek>::Error;

        async fn seek(&mut self, offset: u64) -> std::result::Result<(), Self::Error> {
            Seek::seek(&mut self.inner, offset).await
        }
    }

    impl StreamingSource for UnknownSize {
        type Error = <BytesSource as StreamingSource>::Error;

        async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, Self::Error>> {
            StreamingSource::next(&mut self.inner).await
        }

        async fn size_hint(&self) -> std::result::Result<SizeHint, Self::Error> {
            // Copy only the lower bound so the upper bound stays unset.
            let lower = self.inner.size_hint().await?.lower();
            let mut hint = SizeHint::default();
            hint.set_lower(lower);
            Ok(hint)
        }
    }

    // Mock of a source that can only be streamed.
    mockall::mock! {
        pub(crate) SimpleSource {}

        impl StreamingSource for SimpleSource {
            type Error = std::io::Error;
            async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, std::io::Error>>;
            async fn size_hint(&self) -> std::result::Result<SizeHint, std::io::Error>;
        }
    }

    // Mock of a source that can be streamed and repositioned.
    mockall::mock! {
        pub(crate) SeekSource {}

        impl StreamingSource for SeekSource {
            type Error = std::io::Error;
            async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, std::io::Error>>;
            async fn size_hint(&self) -> std::result::Result<SizeHint, std::io::Error>;
        }
        impl Seek for SeekSource {
            type Error = std::io::Error;
            async fn seek(&mut self, offset: u64) -> std::result::Result<(), std::io::Error>;
        }
    }

    /// Drains an owned source and returns all of its bytes.
    async fn collect<S>(mut source: S) -> anyhow::Result<Vec<u8>>
    where
        S: StreamingSource,
    {
        collect_mut(&mut source).await
    }

    /// Drains a borrowed source and returns all of its bytes, stopping at the
    /// first error.
    async fn collect_mut<S>(source: &mut S) -> anyhow::Result<Vec<u8>>
    where
        S: StreamingSource,
    {
        let mut collected = Vec::new();
        loop {
            match source.next().await {
                None => return Ok(collected),
                Some(chunk) => collected.extend_from_slice(&chunk?),
            }
        }
    }

    #[tokio::test]
    async fn empty_bytes() -> Result {
        let payload = Payload::from(bytes::Bytes::new());
        let hint = payload.size_hint().await?;
        assert_eq!(hint.exact(), Some(0));
        let got = collect(payload).await?;
        assert!(got.is_empty(), "{got:?}");

        Ok(())
    }

    #[tokio::test]
    async fn simple_bytes() -> Result {
        let payload = Payload::from(bytes::Bytes::from_static(CONTENTS));
        let hint = payload.size_hint().await?;
        assert_eq!(hint.exact(), Some(CONTENTS.len() as u64));
        let got = collect(payload).await?;
        assert_eq!(got[..], CONTENTS[..], "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn simple_str() -> Result {
        const LAZY: &str = "the quick brown fox jumps over the lazy dog";
        let payload = Payload::from(LAZY);
        let hint = payload.size_hint().await?;
        assert_eq!(hint.exact(), Some(LAZY.len() as u64));
        let got = collect(payload).await?;
        assert_eq!(&got, LAZY.as_bytes(), "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn simple_string() -> Result {
        const LAZY: &str = "the quick brown fox jumps over the lazy dog";
        let payload = Payload::from(String::from(LAZY));
        let hint = payload.size_hint().await?;
        assert_eq!(hint.exact(), Some(LAZY.len() as u64));
        let got = collect(payload).await?;
        assert_eq!(&got, LAZY.as_bytes(), "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn seek_bytes() -> Result {
        let mut payload = Payload::from(bytes::Bytes::from_static(CONTENTS));
        payload.seek(8).await?;
        let got = collect(payload).await?;
        assert_eq!(got[..], CONTENTS[8..], "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn empty_stream() -> Result {
        let payload = Payload::from(IterSource::new(vec![]));
        let hint = payload.size_hint().await?;
        assert_eq!(hint.exact(), Some(0));
        let got = collect(payload).await?;
        assert!(got.is_empty(), "{got:?}");

        Ok(())
    }

    #[tokio::test]
    async fn simple_stream() -> Result {
        let words = ["how ", "vexingly ", "quick ", "daft ", "zebras ", "jump"];
        let chunks = words.map(|word| bytes::Bytes::from_static(word.as_bytes()));
        let payload = Payload::from_stream(IterSource::new(chunks));
        let got = collect(payload).await?;
        assert_eq!(got[..], CONTENTS[..]);

        Ok(())
    }

    #[tokio::test]
    async fn empty_file() -> Result {
        let file = NamedTempFile::new()?;
        let payload = Payload::from(tokio::fs::File::from(file.reopen()?));
        let hint = payload.size_hint().await?;
        assert_eq!(hint.exact(), Some(0));
        let got = collect(payload).await?;
        assert!(got.is_empty(), "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn small_file() -> Result {
        let mut file = NamedTempFile::new()?;
        let written = file.write(CONTENTS)?;
        assert_eq!(written, CONTENTS.len());
        file.flush()?;
        let payload = Payload::from(tokio::fs::File::from(file.reopen()?));
        let hint = payload.size_hint().await?;
        let want = CONTENTS.len() as u64;
        assert_eq!(hint.exact(), Some(want));
        let got = collect(payload).await?;
        assert_eq!(got[..], CONTENTS[..], "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn small_file_seek() -> Result {
        let mut file = NamedTempFile::new()?;
        let written = file.write(CONTENTS)?;
        assert_eq!(written, CONTENTS.len());
        file.flush()?;
        let mut payload = Payload::from(tokio::fs::File::from(file.reopen()?));
        payload.seek(8).await?;
        let got = collect(payload).await?;
        assert_eq!(got[..], CONTENTS[8..], "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn larger_file() -> Result {
        let mut file = NamedTempFile::new()?;
        // Four blocks of READ_SIZE bytes each, filled with 0, 1, 2 and 3.
        for fill in 0_u8..4 {
            let block = vec![fill; READ_SIZE];
            assert_eq!(file.write(&block)?, READ_SIZE);
        }
        file.flush()?;
        assert_eq!(READ_SIZE % 2, 0);
        let mut payload = Payload::from(tokio::fs::File::from(file.reopen()?));
        // Start in the middle of the second block.
        payload.seek((READ_SIZE + READ_SIZE / 2) as u64).await?;
        let got = collect(payload).await?;
        let want: Vec<u8> = [(1_u8, READ_SIZE / 2), (2_u8, READ_SIZE), (3_u8, READ_SIZE)]
            .into_iter()
            .flat_map(|(fill, count)| std::iter::repeat(fill).take(count))
            .collect();
        assert_eq!(got[..], want[..], "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn iter_source_full() -> Result {
        const N: usize = 32;
        // Three runs of N bytes each, filled with 1, 2 and 3.
        let buf: Vec<u8> = [1_u8, 2, 3].into_iter().flat_map(|fill| [fill; N]).collect();
        let b = bytes::Bytes::from_owner(buf);

        let chunks = vec![b.slice(..N), b.slice(N..2 * N), b.slice(2 * N..)];
        let mut stream = IterSource::new(chunks);
        let hint = stream.size_hint().await?;
        assert_eq!(hint.exact(), Some(3 * N as u64));

        // NOTE(review): every offset here is either on a chunk boundary or
        // exactly halfway through a chunk; an offset that splits a chunk
        // unevenly would make this test stricter.
        for offset in [0, N / 2, 0, N, 0, 2 * N + N / 2] {
            stream.seek(offset as u64).await?;
            let got = collect_mut(&mut stream).await?;
            assert_eq!(got[..], b[offset..(3 * N)]);
        }

        Ok(())
    }
}