backblaze_b2_client/util/
file_stream.rs1use std::pin::Pin;
2
3use bytes::Bytes;
4use futures::StreamExt;
5use futures_core::Stream;
6
7use crate::error::B2Error;
8
9use super::B2Callback;
10
11pub struct B2FileStream {
22 stream: Pin<Box<dyn Stream<Item = Result<Bytes, reqwest::Error>> + Send>>,
23 size: usize,
24 middlewares: Vec<B2Callback<Bytes>>,
25}
26
27impl B2FileStream {
28 pub fn new<S>(stream: S, size: usize) -> Self
29 where
30 S: Stream<Item = Result<Bytes, reqwest::Error>> + 'static + Send,
31 {
32 Self {
33 stream: Box::pin(stream),
34 size,
35 middlewares: vec![],
36 }
37 }
38
39 pub async fn read_all(mut self) -> Result<Bytes, B2Error> {
41 let mut buffer: Vec<u8> = Vec::with_capacity(self.size);
42
43 loop {
44 match self.stream.next().await {
45 Some(value) => {
46 let value = value.map_err(|err| B2Error::RequestSendError(err))?;
47
48 for middleware in &mut self.middlewares {
49 match middleware {
50 B2Callback::Fn(fun) => fun(value.clone()),
51 B2Callback::AsyncFn(fun) => fun(value.clone()).await,
52 }
53 }
54
55 buffer.extend_from_slice(value.as_ref());
56 }
57 None => break,
58 }
59 }
60
61 Ok(Bytes::from(buffer))
62 }
63
64 pub fn into_stream(
66 self,
67 ) -> (
68 usize,
69 Pin<Box<dyn Stream<Item = Result<Bytes, reqwest::Error>> + Send>>,
70 ) {
71 (self.size, self.stream)
72 }
73
74 pub fn add_middleware(&mut self, middleware: B2Callback<Bytes>) -> &mut Self {
76 self.middlewares.push(middleware);
77
78 self
79 }
80}