1use crate::{ProgressEntry, Pusher};
2use bytes::Bytes;
3use mmap_io::{MemoryMappedFile, MmapIoError, MmapMode, flush::FlushPolicy};
4use std::path::Path;
5use tokio::fs::{self, OpenOptions};
6
7#[derive(Debug)]
8pub struct MmapFilePusher {
9 mmap: MemoryMappedFile,
10}
11impl MmapFilePusher {
12 pub async fn new(path: impl AsRef<Path>, size: u64) -> Result<Self, MmapIoError> {
13 let mmap_builder = MemoryMappedFile::builder(&path)
14 .mode(MmapMode::ReadWrite)
15 .huge_pages(true)
16 .flush_policy(FlushPolicy::Manual);
17 Ok(Self {
18 mmap: if fs::try_exists(&path).await? {
19 OpenOptions::new()
20 .write(true)
21 .open(path)
22 .await?
23 .set_len(size)
24 .await?;
25 mmap_builder.open()
26 } else {
27 mmap_builder.size(size).create()
28 }?,
29 })
30 }
31}
32impl Pusher for MmapFilePusher {
33 type Error = MmapIoError;
34 fn push(&mut self, range: &ProgressEntry, bytes: Bytes) -> Result<(), (Self::Error, Bytes)> {
35 self.mmap
36 .update_region(range.start, &bytes)
37 .map_err(|e| (e, bytes))
38 }
39 fn flush(&mut self) -> Result<(), Self::Error> {
40 self.mmap.flush()
41 }
42}
43
44#[cfg(test)]
45mod tests {
46 use super::*;
47 use std::vec::Vec;
48 use tempfile::NamedTempFile;
49 use tokio::{fs::File, io::AsyncReadExt};
50
51 #[tokio::test]
52 async fn test_rand_file_pusher() {
53 let temp_file = NamedTempFile::new().unwrap();
55 let file_path = temp_file.path();
56
57 let mut pusher = MmapFilePusher::new(file_path, 10).await.unwrap();
59
60 let data = b"234";
62 let range = 2..5;
63 pusher.push(&range, data[..].into()).unwrap();
64 pusher.flush().unwrap();
65
66 let mut file_content = Vec::new();
68 File::open(&file_path)
69 .await
70 .unwrap()
71 .read_to_end(&mut file_content)
72 .await
73 .unwrap();
74 assert_eq!(file_content, b"\0\x00234\0\0\0\0\0");
75 }
76}