1use std::fs::OpenOptions;
4use std::io::Error as IoError;
5use std::path::Path;
6use std::sync::Arc;
7use std::sync::RwLock;
8use std::sync::RwLockReadGuard;
9use std::sync::RwLockWriteGuard;
10
11use async_fs::File;
12use memmap2::Mmap;
13use memmap2::MmapMut;
14
15use crate::task::spawn_blocking;
16
17pub struct MemoryMappedMutFile(Arc<RwLock<MmapMut>>);
19
20impl MemoryMappedMutFile {
21 pub async fn create(m_path: &Path, len: u64) -> Result<(Self, File), IoError> {
22 let owned_path = m_path.to_owned();
23 let (m_map, mfile, _) = spawn_blocking(move || {
24 let inner_path = owned_path.clone();
25 let mfile = OpenOptions::new()
26 .read(true)
27 .write(true)
28 .create(true)
29 .truncate(false)
30 .open(inner_path)
31 .unwrap();
32
33 mfile.set_len(len)?;
34
35 unsafe { MmapMut::map_mut(&mfile) }.map(|mm_file| (mm_file, mfile, owned_path))
36 })
37 .await?;
38
39 Ok((MemoryMappedMutFile::from_mmap(m_map), mfile.into()))
40 }
41
42 fn from_mmap(mmap: MmapMut) -> MemoryMappedMutFile {
43 MemoryMappedMutFile(Arc::new(RwLock::new(mmap)))
44 }
45
46 pub fn inner(&self) -> RwLockReadGuard<MmapMut> {
47 self.0.read().unwrap()
48 }
49
50 pub fn inner_map(&self) -> Arc<RwLock<MmapMut>> {
51 self.0.clone()
52 }
53
54 pub fn mut_inner(&self) -> RwLockWriteGuard<MmapMut> {
55 self.0.write().unwrap()
56 }
57
58 pub fn write_bytes(&mut self, pos: usize, bytes: &[u8]) {
61 let mut m_file = self.mut_inner();
62 let m_array = &mut m_file[..];
63 m_array[pos..(pos + bytes.len())].clone_from_slice(bytes);
64 }
65
66 pub async fn flush_ft(&self) -> Result<(), IoError> {
67 let inner = self.0.clone();
68 spawn_blocking(move || {
69 let inner_map = inner.write().unwrap();
70 let res = inner_map.flush();
71 drop(inner_map);
72 res
73 })
74 .await
75 }
76
77 pub async fn flush_async_ft(&self) -> Result<(), IoError> {
78 let inner = self.0.clone();
79 spawn_blocking(move || {
80 let inner_map = inner.write().unwrap();
81 inner_map.flush_async()
82 })
83 .await
84 }
85
86 pub async fn flush_range_ft(&self, offset: usize, len: usize) -> Result<(), IoError> {
87 let inner = self.0.clone();
88 spawn_blocking(move || {
89 let inner_map = inner.write().unwrap();
90 inner_map.flush_range(offset, len)
91 })
92 .await
93 }
94}
95
96pub struct MemoryMappedFile(Arc<RwLock<Mmap>>);
98
99impl MemoryMappedFile {
100 pub async fn open<P>(path: P, min_len: u64) -> Result<(Self, File), IoError>
102 where
103 P: AsRef<Path>,
104 {
105 let m_path = path.as_ref().to_owned();
106 let (m_map, mfile, _) = spawn_blocking(move || {
107 let mfile = OpenOptions::new().read(true).open(&m_path).unwrap();
108 let meta = mfile.metadata().unwrap();
109 if meta.len() == 0 {
110 let fd = OpenOptions::new()
111 .read(true)
112 .write(true)
113 .open(&m_path)
114 .unwrap();
115
116 fd.set_len(min_len)?;
117 }
118
119 unsafe { Mmap::map(&mfile) }.map(|mm_file| (mm_file, mfile, m_path))
120 })
121 .await?;
122
123 Ok((MemoryMappedFile::from_mmap(m_map), mfile.into()))
124 }
125
126 fn from_mmap(mmap: Mmap) -> MemoryMappedFile {
127 MemoryMappedFile(Arc::new(RwLock::new(mmap)))
128 }
129
130 pub fn inner(&self) -> RwLockReadGuard<Mmap> {
131 self.0.read().unwrap()
132 }
133}
134
#[cfg(test)]
mod tests {

    use std::env::temp_dir;
    use std::fs::File;
    use std::io::Error as IoError;
    use std::io::Read;

    use async_fs::OpenOptions;
    use flv_util::fixture::ensure_clean_file;

    use crate::test_async;

    use super::{MemoryMappedFile, MemoryMappedMutFile};

    /// Bytes written through the write guard must be readable from the file after a flush.
    #[test_async]
    async fn test_mmap_write_slice() -> Result<(), IoError> {
        let index_path = temp_dir().join("test.index");
        ensure_clean_file(index_path.clone());

        let (mm_file, _) = MemoryMappedMutFile::create(&index_path, 3).await?;
        {
            // Scope the write guard so it is released before flushing.
            let mut mm = mm_file.mut_inner();
            let src = [0x01, 0x02, 0x03];
            mm.copy_from_slice(&src);
        }

        mm_file.flush_ft().await?;

        let mut f = File::open(&index_path)?;
        let mut buffer = vec![0; 3];
        f.read_exact(&mut buffer)?;
        assert_eq!(buffer, vec![0x01u8, 0x02, 0x03]);

        Ok(())
    }

    /// `(u32, u32)` pairs written as raw bytes must round-trip through a fresh mapping.
    #[test_async]
    async fn test_mmap_write_pair_slice() -> Result<(), IoError> {
        let index_path = temp_dir().join("pairslice.index");
        ensure_clean_file(index_path.clone());

        let (mm_file, _) = MemoryMappedMutFile::create(&index_path, 24).await?;
        {
            let mut mm = mm_file.mut_inner();
            let src: [(u32, u32); 3] = [(5, 10), (11, 22), (50, 100)];
            // SAFETY: every initialized byte is a valid `u8`, and `u8` has alignment 1, so
            // the whole array is returned as the middle slice (checked by the length assert).
            let (_, bytes, _) = unsafe { src.align_to::<u8>() };
            assert_eq!(bytes.len(), 24);
            mm.copy_from_slice(bytes);
        }

        mm_file.flush_ft().await?;

        let (mm_file2, _) = MemoryMappedMutFile::create(&index_path, 24).await?;
        let mm2 = mm_file2.mut_inner();
        // SAFETY: any bit pattern is a valid pair of `u32`s, and the bytes were produced
        // above from the same tuple type, so the in-memory layout matches.
        let (_, pairs, _) = unsafe { mm2.align_to::<(u32, u32)>() };
        assert_eq!(pairs.len(), 3);
        assert_eq!(pairs[0].0, 5);
        assert_eq!(pairs[2].1, 100);

        Ok(())
    }

    /// `write_bytes` must place the payload at the requested offset.
    #[test_async]
    async fn test_mmap_write_with_pos() -> Result<(), IoError> {
        let index_path = temp_dir().join("testpos.index");
        ensure_clean_file(index_path.clone());

        let (mut mm_file, _) = MemoryMappedMutFile::create(&index_path, 10).await?;

        let src = vec![0x05, 0x10, 0x44];
        mm_file.write_bytes(5, &src);

        mm_file.flush_ft().await?;

        let mut f = File::open(&index_path)?;
        let mut buffer = vec![0; 10];
        f.read_exact(&mut buffer)?;
        assert_eq!(buffer[5..8], [0x05u8, 0x10, 0x44]);

        Ok(())
    }

    /// Opening a zero-length file read-only must grow it to the requested minimum size.
    #[test_async]
    async fn test_empty_index_read_only() -> Result<(), IoError> {
        let index_path = temp_dir().join("zerosized.index");
        ensure_clean_file(index_path.clone());
        {
            // Create the file empty and confirm its starting size.
            let file = OpenOptions::new()
                .create(true)
                .write(true)
                .read(true)
                .open(&index_path)
                .await?;
            let meta = file.metadata().await?;
            assert_eq!(meta.len(), 0);
        }

        let min_size = 10;

        let (mm_file, _) = MemoryMappedFile::open(&index_path, min_size).await?;
        let memory_size = mm_file.inner().len();
        assert_eq!(memory_size, min_size as usize);

        Ok(())
    }
}