1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
//! File module
mod named_file;
pub use named_file::*;

use std::cmp;
use std::future::Future;
use std::io::{self, Read, Seek};
use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::Bytes;
use futures_util::ready;
use futures_util::stream::Stream;

pub(crate) enum ChunkedState<T> {
    File(Option<T>),
    Future(tokio::task::JoinHandle<Result<(T, Bytes), io::Error>>),
}

/// FileChunk
pub struct FileChunk<T> {
    chunk_size: u64,
    read_size: u64,
    buffer_size: u64,
    offset: u64,
    state: ChunkedState<T>,
}

impl<T> Stream for FileChunk<T>
where
    T: Read + Seek + Unpin + Send + 'static,
{
    type Item = Result<Bytes, io::Error>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        if self.chunk_size == self.read_size {
            return Poll::Ready(None);
        }

        match self.state {
            ChunkedState::File(ref mut file) => {
                let mut file = file.take().expect("ChunkedReadFile polled after completion");
                let max_bytes = cmp::min(self.chunk_size.saturating_sub(self.read_size), self.buffer_size) as usize;
                let offset = self.offset;
                let fut = tokio::task::spawn_blocking(move || {
                    let mut buf = Vec::with_capacity(max_bytes);
                    file.seek(io::SeekFrom::Start(offset))?;
                    let bytes = file.by_ref().take(max_bytes as u64).read_to_end(&mut buf)?;
                    if bytes == 0 {
                        return Err(io::ErrorKind::UnexpectedEof.into());
                    }
                    Ok((file, Bytes::from(buf)))
                });

                self.state = ChunkedState::Future(fut);
                self.poll_next(cx)
            }
            ChunkedState::Future(ref mut fut) => {
                let (file, bytes) = ready!(Pin::new(fut).poll(cx))
                    .map_err(|_| io::Error::new(io::ErrorKind::Other, "BlockingErr"))??;
                self.state = ChunkedState::File(Some(file));

                self.offset += bytes.len() as u64;
                self.read_size += bytes.len() as u64;

                Poll::Ready(Some(Ok(bytes)))
            }
        }
    }
}

#[cfg(test)]
mod test {
    use std::io::Cursor;
    use std::path::Path;
    use std::str::FromStr;

    use futures_util::stream::StreamExt;
    use mime::Mime;

    use super::*;
    use crate::http::header::HeaderValue;

    #[tokio::test]
    async fn test_chunk_read() {
        const SIZE: u64 = 1024 * 1024 * 5;
        let mock = Cursor::new((0..SIZE).map(|_| rand::random::<u8>()).collect::<Vec<_>>());

        let mut chunk = FileChunk {
            chunk_size: SIZE,
            read_size: 0,
            buffer_size: 65535,
            offset: 0,
            state: ChunkedState::File(Some(mock.clone())),
        };

        let mut result = bytes::BytesMut::with_capacity(SIZE as usize);

        while let Some(Ok(read_chunck)) = chunk.next().await {
            result.extend_from_slice(&read_chunck)
        }

        assert_eq!(mock.into_inner(), result)
    }
    #[tokio::test]
    async fn test_named_file_builder() {
        let src = "../examples/static/test/test1.txt";
        // println!("current path: {:?}", std::env::current_dir());
        // println!("current current_exe: {:?}", std::env::current_exe());
        let file = NamedFile::builder(src.into())
            .with_attached_filename("attach.file")
            .with_buffer_size(8888)
            .with_content_type(Mime::from_str("text/html").unwrap())
            .build()
            .await
            .unwrap();
        assert_eq!(file.path(), Path::new(src));
        assert_eq!(file.content_type(), &Mime::from_str("text/html").unwrap());
        assert_eq!(
            file.content_disposition(),
            &HeaderValue::from_static("attachment; filename=attach.file")
        );
    }
}