1extern crate std;
2use crate::{ProgressEntry, Pusher};
3use bytes::Bytes;
4use mmap_io::{MemoryMappedFile, MmapIoError, MmapMode, flush::FlushPolicy};
5use std::path::Path;
6use tokio::fs::{self, OpenOptions};
7
8#[derive(Debug)]
9pub struct MmapFilePusher {
10 mmap: MemoryMappedFile,
11}
12impl MmapFilePusher {
13 pub async fn new(path: impl AsRef<Path>, size: u64) -> Result<Self, MmapIoError> {
14 let mmap_builder = MemoryMappedFile::builder(&path)
15 .mode(MmapMode::ReadWrite)
16 .huge_pages(true)
17 .flush_policy(FlushPolicy::Manual);
18 Ok(Self {
19 mmap: if fs::try_exists(&path).await? {
20 OpenOptions::new()
21 .write(true)
22 .open(path)
23 .await?
24 .set_len(size)
25 .await?;
26 mmap_builder.open()
27 } else {
28 mmap_builder.size(size).create()
29 }?,
30 })
31 }
32}
33impl Pusher for MmapFilePusher {
34 type Error = MmapIoError;
35 fn push(&mut self, range: &ProgressEntry, bytes: Bytes) -> Result<(), (Self::Error, Bytes)> {
36 self.mmap
37 .update_region(range.start, &bytes)
38 .map_err(|e| (e, bytes))
39 }
40 fn flush(&mut self) -> Result<(), Self::Error> {
41 self.mmap.flush()
42 }
43}
44
45#[cfg(test)]
46mod tests {
47 use super::*;
48 use std::vec::Vec;
49 use tempfile::NamedTempFile;
50 use tokio::{fs::File, io::AsyncReadExt};
51
52 #[tokio::test]
53 async fn test_rand_file_pusher() {
54 let temp_file = NamedTempFile::new().unwrap();
56 let file_path = temp_file.path();
57
58 let mut pusher = MmapFilePusher::new(file_path, 10).await.unwrap();
60
61 let data = b"234";
63 let range = 2..5;
64 pusher.push(&range, data[..].into()).unwrap();
65 pusher.flush().unwrap();
66
67 let mut file_content = Vec::new();
69 File::open(&file_path)
70 .await
71 .unwrap()
72 .read_to_end(&mut file_content)
73 .await
74 .unwrap();
75 assert_eq!(file_content, b"\0\x00234\0\0\0\0\0");
76 }
77}