Skip to main content

fast_pull/file/
mmap.rs

1extern crate std;
2use crate::{ProgressEntry, Pusher};
3use bytes::Bytes;
4use mmap_io::{MemoryMappedFile, MmapIoError, MmapMode, flush::FlushPolicy};
5use std::path::Path;
6use tokio::fs::{self, OpenOptions};
7
8#[derive(Debug)]
9pub struct MmapFilePusher {
10    mmap: MemoryMappedFile,
11}
12impl MmapFilePusher {
13    pub async fn new(path: impl AsRef<Path>, size: u64) -> Result<Self, MmapIoError> {
14        let mmap_builder = MemoryMappedFile::builder(&path)
15            .mode(MmapMode::ReadWrite)
16            .huge_pages(true)
17            .flush_policy(FlushPolicy::Manual);
18        Ok(Self {
19            mmap: if fs::try_exists(&path).await? {
20                OpenOptions::new()
21                    .write(true)
22                    .open(path)
23                    .await?
24                    .set_len(size)
25                    .await?;
26                mmap_builder.open()
27            } else {
28                mmap_builder.size(size).create()
29            }?,
30        })
31    }
32}
33impl Pusher for MmapFilePusher {
34    type Error = MmapIoError;
35    fn push(&mut self, range: &ProgressEntry, bytes: Bytes) -> Result<(), (Self::Error, Bytes)> {
36        self.mmap
37            .update_region(range.start, &bytes)
38            .map_err(|e| (e, bytes))
39    }
40    fn flush(&mut self) -> Result<(), Self::Error> {
41        self.mmap.flush()
42    }
43}
44
#[cfg(test)]
mod tests {
    use super::*;
    use std::vec::Vec;
    use tempfile::NamedTempFile;
    use tokio::{fs::File, io::AsyncReadExt};

    /// Pushing three bytes at offset 2 into a 10-byte mapping must leave
    /// exactly those bytes in the file, surrounded by zeroes.
    #[tokio::test]
    async fn test_mmap_file_pusher() {
        // Create a temporary file to back the mapping.
        let temp_file = NamedTempFile::new().unwrap();
        let file_path = temp_file.path();

        // Initialise the MmapFilePusher with a file size of 10 bytes.
        let mut pusher = MmapFilePusher::new(file_path, 10).await.unwrap();

        // Write the payload at offsets 2..5 and flush it to the file.
        let data = b"234";
        let range = 2..5;
        pusher.push(&range, Bytes::from_static(data)).unwrap();
        pusher.flush().unwrap();

        // Read the file back through the regular file API.
        let mut file_content = Vec::new();
        let mut file = File::open(&file_path).await.unwrap();
        file.read_to_end(&mut file_content).await.unwrap();

        // Expected content: 10 zero bytes with the payload placed at 2..5.
        let mut expected = [0u8; 10];
        expected[2..5].copy_from_slice(data);
        assert_eq!(file_content, expected);
    }
}