mpart_async/
filestream.rs1use futures_core::Stream;
2use std::path::PathBuf;
3use tokio::fs::File;
4use tokio_util::codec::{BytesCodec, FramedRead};
5
6use std::future::Future;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9
10use std::io::Error;
11
12use bytes::Bytes;
13
14pub struct FileStream {
25 inner: Option<FramedRead<File, BytesCodec>>,
26 file: Pin<Box<dyn Future<Output = Result<File, Error>> + Send + Sync>>,
27}
28
29impl FileStream {
30 pub fn new<P: Into<PathBuf>>(file: P) -> Self {
32 FileStream {
33 file: Box::pin(File::open(file.into())),
34 inner: None,
35 }
36 }
37}
38
39impl Stream for FileStream {
40 type Item = Result<Bytes, Error>;
41
42 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
43 if let Some(ref mut stream) = self.inner {
44 return Pin::new(stream)
45 .poll_next(cx)
46 .map(|val| val.map(|val| val.map(|val| val.freeze())));
47 } else if let Poll::Ready(file_result) = self.file.as_mut().poll(cx) {
48 match file_result {
49 Ok(file) => {
50 self.inner = Some(FramedRead::new(file, BytesCodec::new()));
51 cx.waker().wake_by_ref();
52 }
53 Err(err) => {
54 return Poll::Ready(Some(Err(err)));
55 }
56 }
57 }
58
59 Poll::Pending
60 }
61}
62
63#[cfg(test)]
64mod tests {
65 use super::FileStream;
66 use std::io::Error;
67 use tokio_stream::StreamExt;
68
69 #[tokio::test]
70 async fn streams_file() -> Result<(), Error> {
71 let bytes = FileStream::new("Cargo.toml").next().await.unwrap()?;
72
73 assert_eq!(bytes, &include_bytes!("../Cargo.toml")[..]);
74
75 Ok(())
76 }
77}