mpart_async/
filestream.rs

1use futures_core::Stream;
2use std::path::PathBuf;
3use tokio::fs::File;
4use tokio_util::codec::{BytesCodec, FramedRead};
5
6use std::future::Future;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9
10use std::io::Error;
11
12use bytes::Bytes;
13
14/// Convenience wrapper around streaming out files.  Requires tokio
15///
16/// You can also add this to a `MultipartRequest` using the [`add_file`](../client/struct.MultipartRequest.html#method.add_file) method:
17/// ```no_run
18/// # use mpart_async::client::MultipartRequest;
19/// # fn main() {
20/// let mut req = MultipartRequest::default();
21/// req.add_file("file", "/path/to/file");
22/// # }
23/// ```
24pub struct FileStream {
25    inner: Option<FramedRead<File, BytesCodec>>,
26    file: Pin<Box<dyn Future<Output = Result<File, Error>> + Send + Sync>>,
27}
28
29impl FileStream {
30    /// Create a new FileStream from a file path
31    pub fn new<P: Into<PathBuf>>(file: P) -> Self {
32        FileStream {
33            file: Box::pin(File::open(file.into())),
34            inner: None,
35        }
36    }
37}
38
39impl Stream for FileStream {
40    type Item = Result<Bytes, Error>;
41
42    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
43        if let Some(ref mut stream) = self.inner {
44            return Pin::new(stream)
45                .poll_next(cx)
46                .map(|val| val.map(|val| val.map(|val| val.freeze())));
47        } else if let Poll::Ready(file_result) = self.file.as_mut().poll(cx) {
48            match file_result {
49                Ok(file) => {
50                    self.inner = Some(FramedRead::new(file, BytesCodec::new()));
51                    cx.waker().wake_by_ref();
52                }
53                Err(err) => {
54                    return Poll::Ready(Some(Err(err)));
55                }
56            }
57        }
58
59        Poll::Pending
60    }
61}
62
63#[cfg(test)]
64mod tests {
65    use super::FileStream;
66    use std::io::Error;
67    use tokio_stream::StreamExt;
68
69    #[tokio::test]
70    async fn streams_file() -> Result<(), Error> {
71        let bytes = FileStream::new("Cargo.toml").next().await.unwrap()?;
72
73        assert_eq!(bytes, &include_bytes!("../Cargo.toml")[..]);
74
75        Ok(())
76    }
77}