Skip to main content

fast_pull/file/
mmap.rs

1use crate::{ProgressEntry, Pusher};
2use bytes::Bytes;
3use mmap_io::{MemoryMappedFile, MmapIoError, MmapMode, flush::FlushPolicy};
4use std::path::Path;
5use tokio::fs::{self, OpenOptions};
6
7#[derive(Debug)]
8pub struct MmapFilePusher {
9    mmap: MemoryMappedFile,
10}
11impl MmapFilePusher {
12    /// # Errors
13    /// 1. 当 `fs::try_exists` 失败时返回错误。
14    /// 2. 当 `fs::open` 失败时返回错误。
15    /// 3. 当 `fs::set_len` 失败时返回错误。
16    /// 4. 当 `mmap_io::open` 失败时返回错误。
17    pub async fn new(path: impl AsRef<Path> + Send + Sync, size: u64) -> Result<Self, MmapIoError> {
18        let mmap_builder = MemoryMappedFile::builder(&path)
19            .mode(MmapMode::ReadWrite)
20            .huge_pages(true)
21            .flush_policy(FlushPolicy::Manual);
22        Ok(Self {
23            mmap: if fs::try_exists(&path).await? {
24                OpenOptions::new()
25                    .write(true)
26                    .open(path)
27                    .await?
28                    .set_len(size)
29                    .await?;
30                mmap_builder.open()
31            } else {
32                mmap_builder.size(size).create()
33            }?,
34        })
35    }
36}
37impl Pusher for MmapFilePusher {
38    type Error = MmapIoError;
39    fn push(&mut self, range: &ProgressEntry, bytes: Bytes) -> Result<(), (Self::Error, Bytes)> {
40        self.mmap
41            .update_region(range.start, &bytes)
42            .map_err(|e| (e, bytes))
43    }
44    fn flush(&mut self) -> Result<(), Self::Error> {
45        self.mmap.flush()
46    }
47}
48
#[cfg(test)]
mod tests {
    // Unwrapping is acceptable in tests: a failure should abort the test immediately.
    #![allow(clippy::unwrap_used)]
    use super::*;
    use std::vec::Vec;
    use tempfile::NamedTempFile;
    use tokio::{fs::File, io::AsyncReadExt};

    #[tokio::test]
    async fn test_rand_file_pusher() {
        // Create a temporary file to back the test.
        let scratch = NamedTempFile::new().unwrap();
        let file_path = scratch.path();

        // Initialize a MmapFilePusher over that file with a size of 10 bytes.
        let mut pusher = MmapFilePusher::new(file_path, 10).await.unwrap();

        // Write three bytes at offsets 2..5, then flush.
        let span = 2..5;
        let payload = Bytes::from_static(b"234");
        pusher.push(&span, payload).unwrap();
        pusher.flush().unwrap();

        // Read the file back and verify its contents: "234" at offsets 2..5,
        // with every other byte of the 10-byte file expected to be zero.
        let mut reader = File::open(file_path).await.unwrap();
        let mut on_disk = Vec::new();
        reader.read_to_end(&mut on_disk).await.unwrap();
        assert_eq!(on_disk, b"\0\x00234\0\0\0\0\0");
    }
}