Skip to main content

fast_pull/file/
mmap.rs

1use crate::{ProgressEntry, Pusher};
2use bytes::Bytes;
3use mmap_io::{MemoryMappedFile, MmapIoError, MmapMode, flush::FlushPolicy};
4use std::path::Path;
5use tokio::fs::{self, OpenOptions};
6
7#[derive(Debug)]
8pub struct MmapFilePusher {
9    mmap: MemoryMappedFile,
10}
11impl MmapFilePusher {
12    pub async fn new(path: impl AsRef<Path>, size: u64) -> Result<Self, MmapIoError> {
13        let mmap_builder = MemoryMappedFile::builder(&path)
14            .mode(MmapMode::ReadWrite)
15            .huge_pages(true)
16            .flush_policy(FlushPolicy::Manual);
17        Ok(Self {
18            mmap: if fs::try_exists(&path).await? {
19                OpenOptions::new()
20                    .write(true)
21                    .open(path)
22                    .await?
23                    .set_len(size)
24                    .await?;
25                mmap_builder.open()
26            } else {
27                mmap_builder.size(size).create()
28            }?,
29        })
30    }
31}
32impl Pusher for MmapFilePusher {
33    type Error = MmapIoError;
34    fn push(&mut self, range: &ProgressEntry, bytes: Bytes) -> Result<(), (Self::Error, Bytes)> {
35        self.mmap
36            .update_region(range.start, &bytes)
37            .map_err(|e| (e, bytes))
38    }
39    fn flush(&mut self) -> Result<(), Self::Error> {
40        self.mmap.flush()
41    }
42}
43
44#[cfg(test)]
45mod tests {
46    use super::*;
47    use std::vec::Vec;
48    use tempfile::NamedTempFile;
49    use tokio::{fs::File, io::AsyncReadExt};
50
51    #[tokio::test]
52    async fn test_rand_file_pusher() {
53        // 创建一个临时文件用于测试
54        let temp_file = NamedTempFile::new().unwrap();
55        let file_path = temp_file.path();
56
57        // 初始化 RandFilePusher,假设文件大小为 10 字节
58        let mut pusher = MmapFilePusher::new(file_path, 10).await.unwrap();
59
60        // 写入数据
61        let data = b"234";
62        let range = 2..5;
63        pusher.push(&range, data[..].into()).unwrap();
64        pusher.flush().unwrap();
65
66        // 验证文件内容
67        let mut file_content = Vec::new();
68        File::open(&file_path)
69            .await
70            .unwrap()
71            .read_to_end(&mut file_content)
72            .await
73            .unwrap();
74        assert_eq!(file_content, b"\0\x00234\0\0\0\0\0");
75    }
76}