1use crate::{ProgressEntry, Pusher};
2use bytes::Bytes;
3use mmap_io::{MemoryMappedFile, MmapIoError, MmapMode, flush::FlushPolicy};
4use std::path::Path;
5use tokio::fs::{self, OpenOptions};
6
7#[derive(Debug)]
8pub struct MmapFilePusher {
9 mmap: MemoryMappedFile,
10}
11impl MmapFilePusher {
12 pub async fn new(path: impl AsRef<Path> + Send + Sync, size: u64) -> Result<Self, MmapIoError> {
18 let mmap_builder = MemoryMappedFile::builder(&path)
19 .mode(MmapMode::ReadWrite)
20 .huge_pages(true)
21 .flush_policy(FlushPolicy::Manual);
22 Ok(Self {
23 mmap: if fs::try_exists(&path).await? {
24 OpenOptions::new()
25 .write(true)
26 .open(path)
27 .await?
28 .set_len(size)
29 .await?;
30 mmap_builder.open()
31 } else {
32 mmap_builder.size(size).create()
33 }?,
34 })
35 }
36}
37impl Pusher for MmapFilePusher {
38 type Error = MmapIoError;
39 fn push(&mut self, range: &ProgressEntry, bytes: Bytes) -> Result<(), (Self::Error, Bytes)> {
40 self.mmap
41 .update_region(range.start, &bytes)
42 .map_err(|e| (e, bytes))
43 }
44 fn flush(&mut self) -> Result<(), Self::Error> {
45 self.mmap.flush()
46 }
47}
48
#[cfg(test)]
mod tests {
    #![allow(clippy::unwrap_used)]
    use super::*;
    use std::vec::Vec;
    use tempfile::NamedTempFile;
    use tokio::{fs::File, io::AsyncReadExt};

    /// Pushes three bytes at offset 2 of a 10-byte mapping over a temp file
    /// and checks, after a flush, that the file holds exactly those bytes
    /// surrounded by zeros.
    #[tokio::test]
    async fn test_rand_file_pusher() {
        let scratch = NamedTempFile::new().unwrap();
        let target = scratch.path();

        let mut pusher = MmapFilePusher::new(target, 10).await.unwrap();

        let payload = b"234";
        let span = 2..5;
        pusher.push(&span, payload[..].into()).unwrap();
        pusher.flush().unwrap();

        // Read the file back through a regular handle, not the mapping.
        let mut reader = File::open(&target).await.unwrap();
        let mut on_disk = Vec::new();
        reader.read_to_end(&mut on_disk).await.unwrap();
        assert_eq!(on_disk, b"\0\x00234\0\0\0\0\0");
    }
}