backblaze_b2_client/util/
file_stream.rs

1use std::pin::Pin;
2
3use bytes::Bytes;
4use futures::StreamExt;
5use futures_core::Stream;
6
7use crate::error::B2Error;
8
9use super::B2Callback;
10
11/// A file stream for the B2File, you're most likely gonna only use it as the following:
12///
13/// ```rs
14/// let mut response = client
15///        .download_file_by_id(B2DownloadFileByIdQueryParameters::builder().file_id("...".into()).build())
16///        .await
17///        .unwrap();
18///
19/// let data = response.file.read_all().await;
20/// ```
21pub struct B2FileStream {
22    stream: Pin<Box<dyn Stream<Item = Result<Bytes, reqwest::Error>> + Send>>,
23    size: usize,
24    middlewares: Vec<B2Callback<Bytes>>,
25}
26
27impl B2FileStream {
28    pub fn new<S>(stream: S, size: usize) -> Self
29    where
30        S: Stream<Item = Result<Bytes, reqwest::Error>> + 'static + Send,
31    {
32        Self {
33            stream: Box::pin(stream),
34            size,
35            middlewares: vec![],
36        }
37    }
38
39    /// Reads the entire file at once, consuming self in the process.
40    pub async fn read_all(mut self) -> Result<Bytes, B2Error> {
41        let mut buffer: Vec<u8> = Vec::with_capacity(self.size);
42
43        loop {
44            match self.stream.next().await {
45                Some(value) => {
46                    let value = value.map_err(|err| B2Error::RequestSendError(err))?;
47
48                    for middleware in &mut self.middlewares {
49                        match middleware {
50                            B2Callback::Fn(fun) => fun(value.clone()),
51                            B2Callback::AsyncFn(fun) => fun(value.clone()).await,
52                        }
53                    }
54
55                    buffer.extend_from_slice(value.as_ref());
56                }
57                None => break,
58            }
59        }
60
61        Ok(Bytes::from(buffer))
62    }
63
64    /// Consumes self, then returns the underlying stream and file size
65    pub fn into_stream(
66        self,
67    ) -> (
68        usize,
69        Pin<Box<dyn Stream<Item = Result<Bytes, reqwest::Error>> + Send>>,
70    ) {
71        (self.size, self.stream)
72    }
73
74    /// Adds a middleware to the list to run, returns mutable reference to self.
75    pub fn add_middleware(&mut self, middleware: B2Callback<Bytes>) -> &mut Self {
76        self.middlewares.push(middleware);
77
78        self
79    }
80}