fast_pull/file/
mmap.rs

1extern crate std;
2use crate::{ProgressEntry, RandPusher, Total, file::FilePusherError};
3use mmap_io::{MemoryMappedFile, MmapMode, flush::FlushPolicy};
4use std::path::Path;
5use tokio::fs::OpenOptions;
6
7#[derive(Debug)]
8pub struct MmapFilePusher {
9    mmap: MemoryMappedFile,
10    downloaded: usize,
11    buffer_size: usize,
12}
13impl MmapFilePusher {
14    pub async fn new(
15        path: impl AsRef<Path>,
16        size: u64,
17        buffer_size: usize,
18    ) -> Result<Self, FilePusherError> {
19        let mmap_builder = MemoryMappedFile::builder(&path)
20            .mode(MmapMode::ReadWrite)
21            .huge_pages(true)
22            .flush_policy(FlushPolicy::Manual);
23        Ok(Self {
24            mmap: if path.as_ref().try_exists()? {
25                OpenOptions::new()
26                    .write(true)
27                    .open(path)
28                    .await?
29                    .set_len(size)
30                    .await?;
31                mmap_builder.open()
32            } else {
33                mmap_builder.size(size).create()
34            }?,
35            downloaded: 0,
36            buffer_size,
37        })
38    }
39}
40impl RandPusher for MmapFilePusher {
41    type Error = FilePusherError;
42    async fn push(&mut self, range: ProgressEntry, bytes: &[u8]) -> Result<(), Self::Error> {
43        self.mmap
44            .as_slice_mut(range.start, range.total())?
45            .as_mut()
46            .copy_from_slice(bytes);
47        self.downloaded += bytes.len();
48        if self.downloaded >= self.buffer_size {
49            self.mmap.flush_async().await?;
50            self.downloaded = 0;
51        }
52        Ok(())
53    }
54    async fn flush(&mut self) -> Result<(), Self::Error> {
55        self.mmap.flush_async().await?;
56        Ok(())
57    }
58}
59
60#[cfg(test)]
61mod tests {
62    use super::*;
63    use std::vec::Vec;
64    use tempfile::NamedTempFile;
65    use tokio::{fs::File, io::AsyncReadExt};
66
67    #[tokio::test]
68    async fn test_rand_file_pusher() {
69        // 创建一个临时文件用于测试
70        let temp_file = NamedTempFile::new().unwrap();
71        let file_path = temp_file.path();
72
73        // 初始化 RandFilePusher,假设文件大小为 10 字节
74        let mut pusher = MmapFilePusher::new(file_path, 10, 8 * 1024 * 1024)
75            .await
76            .unwrap();
77
78        // 写入数据
79        let data = b"234";
80        let range = 2..5;
81        pusher.push(range, &data[..]).await.unwrap();
82        pusher.flush().await.unwrap();
83
84        // 验证文件内容
85        let mut file_content = Vec::new();
86        File::open(&file_path)
87            .await
88            .unwrap()
89            .read_to_end(&mut file_content)
90            .await
91            .unwrap();
92        assert_eq!(file_content, b"\0\x00234\0\0\0\0\0");
93    }
94}