google_cloud_storage/storage/
streaming_source.rs1use std::collections::VecDeque;
18
/// Size estimate reported by [`StreamingSource::size_hint`]; an alias for
/// `http_body::SizeHint` (a lower bound plus an optional upper bound).
pub type SizeHint = http_body::SizeHint;
21
/// A wrapper around a data source that forwards every [`StreamingSource`] and
/// [`Seek`] call to the wrapped value.
///
/// A `Payload` is normally created with one of the `From` conversions in this
/// module (`bytes::Bytes`, `&'static str`, `Vec<bytes::Bytes>`,
/// `tokio::fs::File`, or any `StreamingSource`), or with
/// [`Payload::from_stream`].
pub struct Payload<T> {
    /// The wrapped source; all trait methods delegate to it.
    payload: T,
}
46
47impl<T> Payload<T>
48where
49 T: StreamingSource,
50{
51 pub fn from_stream(payload: T) -> Self {
52 Self { payload }
53 }
54}
55
56impl<T> StreamingSource for Payload<T>
57where
58 T: StreamingSource + Send + Sync,
59{
60 type Error = T::Error;
61
62 async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
63 self.payload.next().await
64 }
65
66 async fn size_hint(&self) -> Result<SizeHint, Self::Error> {
67 self.payload.size_hint().await
68 }
69}
70
71impl<T> Seek for Payload<T>
72where
73 T: Seek,
74{
75 type Error = T::Error;
76
77 fn seek(&mut self, offset: u64) -> impl Future<Output = Result<(), Self::Error>> + Send {
78 self.payload.seek(offset)
79 }
80}
81
82impl From<bytes::Bytes> for Payload<BytesSource> {
83 fn from(value: bytes::Bytes) -> Self {
84 let payload = BytesSource::new(value);
85 Self { payload }
86 }
87}
88
89impl From<&'static str> for Payload<BytesSource> {
90 fn from(value: &'static str) -> Self {
91 let b = bytes::Bytes::from_static(value.as_bytes());
92 Payload::from(b)
93 }
94}
95
96impl From<Vec<bytes::Bytes>> for Payload<IterSource> {
97 fn from(value: Vec<bytes::Bytes>) -> Self {
98 let payload = IterSource::new(value);
99 Self { payload }
100 }
101}
102
103impl<S> From<S> for Payload<S>
104where
105 S: StreamingSource,
106{
107 fn from(value: S) -> Self {
108 Self { payload: value }
109 }
110}
111
/// An asynchronous source of data, produced as a sequence of `bytes::Bytes`
/// chunks.
pub trait StreamingSource {
    /// The error type returned by [`next`](Self::next) and
    /// [`size_hint`](Self::size_hint).
    type Error: std::error::Error + Send + Sync + 'static;

    /// Returns the next chunk of data, or `None` once the source is exhausted.
    ///
    /// Chunks may be empty; for example `BytesSource` yields one empty chunk
    /// when wrapping an empty buffer.
    fn next(&mut self) -> impl Future<Output = Option<Result<bytes::Bytes, Self::Error>>> + Send;

    /// Returns an estimate of the number of bytes in the source.
    ///
    /// The default implementation reports an unknown size (`SizeHint::new()`).
    /// The implementations in this file report the size of their full
    /// contents, independent of any prior seek.
    fn size_hint(&self) -> impl Future<Output = Result<SizeHint, Self::Error>> + Send {
        std::future::ready(Ok(SizeHint::new()))
    }
}
131
/// A data source that can be repositioned.
pub trait Seek {
    /// The error type returned by [`seek`](Self::seek).
    type Error: std::error::Error + Send + Sync + 'static;

    /// Repositions the source so that subsequent reads start `offset` bytes
    /// from the beginning of the data.
    ///
    /// NOTE(review): handling of offsets past the end is implementation
    /// specific. `BytesSource` clamps to the end, `IterSource` leaves nothing
    /// to stream, and `FileSource` defers to the file's own seek.
    fn seek(&mut self, offset: u64) -> impl Future<Output = Result<(), Self::Error>> + Send;
}
151
/// Maximum number of bytes `FileSource` reads per chunk: 2^18 bytes = 256 KiB.
const READ_SIZE: usize = 1 << 18;
153
154impl From<tokio::fs::File> for Payload<FileSource> {
155 fn from(value: tokio::fs::File) -> Self {
156 Self {
157 payload: FileSource::new(value),
158 }
159 }
160}
161
162pub struct FileSource {
177 inner: tokio::fs::File,
178}
179
180impl FileSource {
181 fn new(inner: tokio::fs::File) -> Self {
182 Self { inner }
183 }
184}
185
186impl StreamingSource for FileSource {
187 type Error = std::io::Error;
188
189 async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
190 let mut buffer = vec![0_u8; READ_SIZE];
191 match tokio::io::AsyncReadExt::read(&mut self.inner, &mut buffer).await {
192 Err(e) => Some(Err(e)),
193 Ok(0) => None,
194 Ok(n) => {
195 buffer.resize(n, 0_u8);
196 Some(Ok(bytes::Bytes::from_owner(buffer)))
197 }
198 }
199 }
200 async fn size_hint(&self) -> Result<SizeHint, Self::Error> {
201 let m = self.inner.metadata().await?;
202 Ok(SizeHint::with_exact(m.len()))
203 }
204}
205
206impl Seek for FileSource {
207 type Error = std::io::Error;
208
209 async fn seek(&mut self, offset: u64) -> Result<(), Self::Error> {
210 use tokio::io::AsyncSeekExt;
211 let _ = self.inner.seek(std::io::SeekFrom::Start(offset)).await?;
212 Ok(())
213 }
214}
215
216pub struct BytesSource {
231 contents: bytes::Bytes,
232 current: Option<bytes::Bytes>,
233}
234
235impl BytesSource {
236 pub(crate) fn new(contents: bytes::Bytes) -> Self {
237 let current = Some(contents.clone());
238 Self { contents, current }
239 }
240}
241
242impl StreamingSource for BytesSource {
243 type Error = crate::Error;
244
245 async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
246 self.current.take().map(Result::Ok)
247 }
248
249 async fn size_hint(&self) -> Result<SizeHint, Self::Error> {
250 let s = self.contents.len() as u64;
251 Ok(SizeHint::with_exact(s))
252 }
253}
254
255impl Seek for BytesSource {
256 type Error = crate::Error;
257
258 async fn seek(&mut self, offset: u64) -> Result<(), Self::Error> {
259 let pos = std::cmp::min(offset as usize, self.contents.len());
260 self.current = Some(self.contents.slice(pos..));
261 Ok(())
262 }
263}
264
265pub(crate) struct IterSource {
267 contents: Vec<bytes::Bytes>,
268 current: VecDeque<bytes::Bytes>,
269}
270
271impl IterSource {
272 pub(crate) fn new<I>(iterator: I) -> Self
273 where
274 I: IntoIterator<Item = bytes::Bytes>,
275 {
276 let contents: Vec<bytes::Bytes> = iterator.into_iter().collect();
277 let current: VecDeque<bytes::Bytes> = contents.iter().cloned().collect();
278 Self { contents, current }
279 }
280}
281
282impl StreamingSource for IterSource {
283 type Error = std::io::Error;
284
285 async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, Self::Error>> {
286 self.current.pop_front().map(Ok)
287 }
288
289 async fn size_hint(&self) -> Result<SizeHint, Self::Error> {
290 let s = self.contents.iter().fold(0_u64, |a, i| a + i.len() as u64);
291 Ok(SizeHint::with_exact(s))
292 }
293}
294
295impl Seek for IterSource {
296 type Error = std::io::Error;
297 async fn seek(&mut self, offset: u64) -> std::result::Result<(), Self::Error> {
298 let mut current = VecDeque::new();
299 let mut offset = offset as usize;
300 for b in self.contents.iter() {
301 offset = match (offset, b.len()) {
302 (0, _) => {
303 current.push_back(b.clone());
304 0
305 }
306 (o, n) if o >= n => o - n,
307 (o, n) => {
308 current.push_back(b.clone().split_off(n - o));
309 0
310 }
311 }
312 }
313 self.current = current;
314 Ok(())
315 }
316}
317
#[cfg(test)]
pub mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    type Result = anyhow::Result<()>;

    /// Reference payload shared by several tests.
    const CONTENTS: &[u8] = b"how vexingly quick daft zebras jump";

    /// Wraps a [`BytesSource`] but hides its exact size: `size_hint()` keeps
    /// only the lower bound and leaves the upper bound unset.
    pub(crate) struct UnknownSize {
        inner: BytesSource,
    }

    impl UnknownSize {
        pub fn new(inner: BytesSource) -> Self {
            UnknownSize { inner }
        }
    }

    impl Seek for UnknownSize {
        type Error = <BytesSource as Seek>::Error;
        async fn seek(&mut self, offset: u64) -> std::result::Result<(), Self::Error> {
            Seek::seek(&mut self.inner, offset).await
        }
    }

    impl StreamingSource for UnknownSize {
        type Error = <BytesSource as StreamingSource>::Error;
        async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, Self::Error>> {
            StreamingSource::next(&mut self.inner).await
        }
        async fn size_hint(&self) -> std::result::Result<SizeHint, Self::Error> {
            let lower = self.inner.size_hint().await?.lower();
            let mut hint = SizeHint::default();
            hint.set_lower(lower);
            Ok(hint)
        }
    }

    // Mock source without seek support.
    mockall::mock! {
        pub(crate) SimpleSource {}

        impl StreamingSource for SimpleSource {
            type Error = std::io::Error;
            async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, std::io::Error>>;
            async fn size_hint(&self) -> std::result::Result<SizeHint, std::io::Error>;
        }
    }

    // Mock source that also supports seeking.
    mockall::mock! {
        pub(crate) SeekSource {}

        impl StreamingSource for SeekSource {
            type Error = std::io::Error;
            async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, std::io::Error>>;
            async fn size_hint(&self) -> std::result::Result<SizeHint, std::io::Error>;
        }
        impl Seek for SeekSource {
            type Error = std::io::Error;
            async fn seek(&mut self, offset: u64) -> std::result::Result<(), std::io::Error>;
        }
    }

    /// Consumes `source` and returns every byte it yields.
    async fn collect<S>(mut source: S) -> anyhow::Result<Vec<u8>>
    where
        S: StreamingSource,
    {
        collect_mut(&mut source).await
    }

    /// Reads `source` to exhaustion, concatenating the chunks.
    async fn collect_mut<S>(source: &mut S) -> anyhow::Result<Vec<u8>>
    where
        S: StreamingSource,
    {
        let mut out = Vec::new();
        loop {
            match source.next().await {
                None => break,
                Some(chunk) => out.extend_from_slice(&chunk?),
            }
        }
        Ok(out)
    }

    /// Creates a temporary file holding `data`, flushed to disk.
    fn temp_file_with(data: &[u8]) -> anyhow::Result<NamedTempFile> {
        let mut file = NamedTempFile::new()?;
        assert_eq!(file.write(data)?, data.len());
        file.flush()?;
        Ok(file)
    }

    #[tokio::test]
    async fn empty_bytes() -> Result {
        let payload = Payload::from(bytes::Bytes::default());
        let hint = payload.size_hint().await?;
        assert_eq!(hint.exact(), Some(0));
        let got = collect(payload).await?;
        assert!(got.is_empty(), "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn simple_bytes() -> Result {
        let payload = Payload::from(bytes::Bytes::from_static(CONTENTS));
        let want_len = CONTENTS.len() as u64;
        assert_eq!(payload.size_hint().await?.exact(), Some(want_len));
        let got = collect(payload).await?;
        assert_eq!(got[..], CONTENTS[..], "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn simple_str() -> Result {
        const LAZY: &str = "the quick brown fox jumps over the lazy dog";
        let payload = Payload::from(LAZY);
        assert_eq!(payload.size_hint().await?.exact(), Some(LAZY.len() as u64));
        let got = collect(payload).await?;
        assert_eq!(&got, LAZY.as_bytes(), "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn seek_bytes() -> Result {
        let mut payload = Payload::from(bytes::Bytes::from_static(CONTENTS));
        payload.seek(8).await?;
        let got = collect(payload).await?;
        assert_eq!(got[..], CONTENTS[8..], "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn empty_stream() -> Result {
        let payload = Payload::from(IterSource::new(vec![]));
        assert_eq!(payload.size_hint().await?.exact(), Some(0));
        let got = collect(payload).await?;
        assert!(got.is_empty(), "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn simple_stream() -> Result {
        let words = ["how ", "vexingly ", "quick ", "daft ", "zebras ", "jump"];
        let chunks = words.map(|w| bytes::Bytes::from_static(w.as_bytes()));
        let payload = Payload::from_stream(IterSource::new(chunks));
        let got = collect(payload).await?;
        assert_eq!(got[..], CONTENTS[..]);
        Ok(())
    }

    #[tokio::test]
    async fn empty_file() -> Result {
        let file = NamedTempFile::new()?;
        let payload = Payload::from(tokio::fs::File::from(file.reopen()?));
        assert_eq!(payload.size_hint().await?.exact(), Some(0));
        let got = collect(payload).await?;
        assert!(got.is_empty(), "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn small_file() -> Result {
        let file = temp_file_with(CONTENTS)?;
        let payload = Payload::from(tokio::fs::File::from(file.reopen()?));
        let hint = payload.size_hint().await?;
        assert_eq!(hint.exact(), Some(CONTENTS.len() as u64));
        let got = collect(payload).await?;
        assert_eq!(got[..], CONTENTS[..], "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn small_file_seek() -> Result {
        let file = temp_file_with(CONTENTS)?;
        let mut payload = Payload::from(tokio::fs::File::from(file.reopen()?));
        payload.seek(8).await?;
        let got = collect(payload).await?;
        assert_eq!(got[..], CONTENTS[8..], "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn larger_file() -> Result {
        // Four READ_SIZE blocks filled with 0, 1, 2 and 3 respectively.
        let mut file = NamedTempFile::new()?;
        for fill in 0_u8..4 {
            assert_eq!(file.write(&vec![fill; READ_SIZE])?, READ_SIZE);
        }
        file.flush()?;
        assert_eq!(READ_SIZE % 2, 0);
        let mut payload = Payload::from(tokio::fs::File::from(file.reopen()?));
        // Start halfway through the block of ones.
        payload.seek((READ_SIZE + READ_SIZE / 2) as u64).await?;
        let got = collect(payload).await?;
        let want = [
            vec![1_u8; READ_SIZE / 2],
            vec![2_u8; READ_SIZE],
            vec![3_u8; READ_SIZE],
        ]
        .concat();
        assert_eq!(got[..], want[..], "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn iter_source_full() -> Result {
        const N: usize = 32;
        let all = bytes::Bytes::from_owner([[1_u8; N], [2_u8; N], [3_u8; N]].concat());
        let chunks = vec![all.slice(0..N), all.slice(N..(2 * N)), all.slice((2 * N)..)];
        let mut stream = IterSource::new(chunks);
        assert_eq!(stream.size_hint().await?.exact(), Some(3 * N as u64));

        // Seeking back to 0 between other offsets checks that seek replays data.
        for offset in [0, N / 2, 0, N, 0, 2 * N + N / 2] {
            stream.seek(offset as u64).await?;
            let got = collect_mut(&mut stream).await?;
            assert_eq!(got[..], all[offset..(3 * N)]);
        }
        Ok(())
    }
}