fluvio_future/fs/
mmap.rs

1// memory mapped file
2
3use std::fs::OpenOptions;
4use std::io::Error as IoError;
5use std::path::Path;
6use std::sync::Arc;
7use std::sync::RwLock;
8use std::sync::RwLockReadGuard;
9use std::sync::RwLockWriteGuard;
10
11use async_fs::File;
12use memmap2::Mmap;
13use memmap2::MmapMut;
14
15use crate::task::spawn_blocking;
16
17/// Mutable async wrapper for MmapMut
18pub struct MemoryMappedMutFile(Arc<RwLock<MmapMut>>);
19
20impl MemoryMappedMutFile {
21    pub async fn create(m_path: &Path, len: u64) -> Result<(Self, File), IoError> {
22        let owned_path = m_path.to_owned();
23        let (m_map, mfile, _) = spawn_blocking(move || {
24            let inner_path = owned_path.clone();
25            let mfile = OpenOptions::new()
26                .read(true)
27                .write(true)
28                .create(true)
29                .truncate(false)
30                .open(inner_path)
31                .unwrap();
32
33            mfile.set_len(len)?;
34
35            unsafe { MmapMut::map_mut(&mfile) }.map(|mm_file| (mm_file, mfile, owned_path))
36        })
37        .await?;
38
39        Ok((MemoryMappedMutFile::from_mmap(m_map), mfile.into()))
40    }
41
42    fn from_mmap(mmap: MmapMut) -> MemoryMappedMutFile {
43        MemoryMappedMutFile(Arc::new(RwLock::new(mmap)))
44    }
45
46    pub fn inner(&self) -> RwLockReadGuard<MmapMut> {
47        self.0.read().unwrap()
48    }
49
50    pub fn inner_map(&self) -> Arc<RwLock<MmapMut>> {
51        self.0.clone()
52    }
53
54    pub fn mut_inner(&self) -> RwLockWriteGuard<MmapMut> {
55        self.0.write().unwrap()
56    }
57
58    /// write bytes at location,
59    /// return number bytes written
60    pub fn write_bytes(&mut self, pos: usize, bytes: &[u8]) {
61        let mut m_file = self.mut_inner();
62        let m_array = &mut m_file[..];
63        m_array[pos..(pos + bytes.len())].clone_from_slice(bytes);
64    }
65
66    pub async fn flush_ft(&self) -> Result<(), IoError> {
67        let inner = self.0.clone();
68        spawn_blocking(move || {
69            let inner_map = inner.write().unwrap();
70            let res = inner_map.flush();
71            drop(inner_map);
72            res
73        })
74        .await
75    }
76
77    pub async fn flush_async_ft(&self) -> Result<(), IoError> {
78        let inner = self.0.clone();
79        spawn_blocking(move || {
80            let inner_map = inner.write().unwrap();
81            inner_map.flush_async()
82        })
83        .await
84    }
85
86    pub async fn flush_range_ft(&self, offset: usize, len: usize) -> Result<(), IoError> {
87        let inner = self.0.clone();
88        spawn_blocking(move || {
89            let inner_map = inner.write().unwrap();
90            inner_map.flush_range(offset, len)
91        })
92        .await
93    }
94}
95
96/// Async wrapper for read only mmap
97pub struct MemoryMappedFile(Arc<RwLock<Mmap>>);
98
99impl MemoryMappedFile {
100    /// open memory file, specify minimum size
101    pub async fn open<P>(path: P, min_len: u64) -> Result<(Self, File), IoError>
102    where
103        P: AsRef<Path>,
104    {
105        let m_path = path.as_ref().to_owned();
106        let (m_map, mfile, _) = spawn_blocking(move || {
107            let mfile = OpenOptions::new().read(true).open(&m_path).unwrap();
108            let meta = mfile.metadata().unwrap();
109            if meta.len() == 0 {
110                let fd = OpenOptions::new()
111                    .read(true)
112                    .write(true)
113                    .open(&m_path)
114                    .unwrap();
115
116                fd.set_len(min_len)?;
117            }
118
119            unsafe { Mmap::map(&mfile) }.map(|mm_file| (mm_file, mfile, m_path))
120        })
121        .await?;
122
123        Ok((MemoryMappedFile::from_mmap(m_map), mfile.into()))
124    }
125
126    fn from_mmap(mmap: Mmap) -> MemoryMappedFile {
127        MemoryMappedFile(Arc::new(RwLock::new(mmap)))
128    }
129
130    pub fn inner(&self) -> RwLockReadGuard<Mmap> {
131        self.0.read().unwrap()
132    }
133}
134
135#[cfg(test)]
136mod tests {
137
138    use std::env::temp_dir;
139    use std::fs::File;
140    use std::io::Error as IoError;
141    use std::io::Read;
142
143    use async_fs::OpenOptions;
144    use flv_util::fixture::ensure_clean_file;
145
146    use crate::test_async;
147
148    use super::{MemoryMappedFile, MemoryMappedMutFile};
149
150    #[test_async]
151    async fn test_mmap_write_slice() -> Result<(), IoError> {
152        let index_path = temp_dir().join("test.index");
153        ensure_clean_file(index_path.clone());
154
155        let result = MemoryMappedMutFile::create(&index_path, 3).await;
156        assert!(result.is_ok());
157
158        let (mm_file, _) = result.unwrap();
159        {
160            let mut mm = mm_file.mut_inner();
161            let src = [0x01, 0x02, 0x03];
162            mm.copy_from_slice(&src);
163        }
164
165        mm_file.flush_ft().await?;
166
167        let mut f = File::open(&index_path)?;
168        let mut buffer = vec![0; 3];
169        f.read_exact(&mut buffer)?;
170        assert_eq!(buffer[0], 0x01);
171        assert_eq!(buffer[1], 0x02);
172        assert_eq!(buffer[2], 0x03);
173
174        Ok(())
175    }
176
177    #[test_async]
178    async fn test_mmap_write_pair_slice() -> Result<(), IoError> {
179        let index_path = temp_dir().join("pairslice.index");
180        ensure_clean_file(index_path.clone());
181
182        let result = MemoryMappedMutFile::create(&index_path, 24).await;
183        assert!(result.is_ok());
184
185        let (mm_file, _) = result.unwrap();
186        {
187            let mut mm = mm_file.mut_inner();
188            let src: [(u32, u32); 3] = [(5, 10), (11, 22), (50, 100)];
189            let (_, bytes, _) = unsafe { src.align_to::<u8>() };
190            assert_eq!(bytes.len(), 24);
191            mm.copy_from_slice(bytes);
192        }
193
194        mm_file.flush_ft().await?;
195
196        let (mm_file2, _) = MemoryMappedMutFile::create(&index_path, 24).await?;
197        let mm2 = mm_file2.mut_inner();
198        let (_, pairs, _) = unsafe { mm2.align_to::<(u32, u32)>() };
199        assert_eq!(pairs.len(), 3);
200        assert_eq!(pairs[0].0, 5);
201        assert_eq!(pairs[2].1, 100);
202
203        Ok(())
204    }
205
206    #[test_async]
207    async fn test_mmap_write_with_pos() -> Result<(), IoError> {
208        let index_path = temp_dir().join("testpos.index");
209        ensure_clean_file(index_path.clone());
210
211        let (mut mm_file, _) = MemoryMappedMutFile::create(&index_path, 10).await?;
212
213        let src = vec![0x05, 0x10, 0x44];
214        mm_file.write_bytes(5, &src);
215
216        mm_file.flush_ft().await?;
217
218        let mut f = File::open(&index_path)?;
219        let mut buffer = vec![0; 10];
220        f.read_exact(&mut buffer)?;
221        assert_eq!(buffer[5], 0x05);
222        assert_eq!(buffer[6], 0x10);
223        assert_eq!(buffer[7], 0x44);
224
225        Ok(())
226    }
227
228    #[test_async]
229    async fn test_empty_index_read_only() -> Result<(), IoError> {
230        let index_path = temp_dir().join("zerosized.index");
231        ensure_clean_file(index_path.clone());
232        {
233            let file = OpenOptions::new()
234                .create(true)
235                .write(true)
236                .read(true)
237                .open(&index_path)
238                .await?;
239            let meta = file.metadata().await?;
240            assert_eq!(meta.len(), 0);
241        }
242
243        let min_size = 10;
244
245        let (mm_file, _) = MemoryMappedFile::open(&index_path, min_size).await?;
246        let memory_size = mm_file.inner().len();
247        assert_eq!(memory_size, min_size as usize);
248
249        Ok(())
250    }
251
252    /*
253    use std::fs::OpenOptions;
254    use std::path::PathBuf;
255    use memmap::MmapMut;
256
257
258    #[test]
259    fn debug_kafka_inspect() -> io::Result<()>  {
260
261        let path =  "/tmp/kafka-logs/test-0/00000000000000000000.index";
262        let file = OpenOptions::new()
263                       .read(true)
264                       .write(true)
265                       .open(path)?;
266
267        let mut mmap = unsafe { MmapMut::map_mut(&file)? };
268        println!("file size: {}",mmap.len());
269        Ok(())
270    }
271
272    #[test]
273    fn debug_file_inspect() -> io::Result<()>  {
274
275        let path =  "/tmp/kafka-logs/test-0/00000000000000000000.index";
276        let file = File::open(path)?;
277        let metadata = file.metadata()?;
278
279        println!("file len: {:#?}",metadata.len());
280        Ok(())
281    }
282    */
283}