libnsave/
mmapbuf.rs

1use crate::common::StoreError;
2use libc::{fcntl, F_SETLK, F_SETLKW};
3use memmap2::{MmapMut, MmapOptions};
4use std::io::{Read, Write};
5use std::{cell::RefCell, fs::File, os::fd::AsRawFd};
6
/// Window size, in bytes, that `MmapBufWriter::new` passes to `MmapBufWriter::with_arg`.
const DEFAULT_WRITER_BUFF_SIZE: u64 = 1024;
8
/// Writer that appends to a file through fixed-size, memory-mapped windows.
///
/// Each time a new window is needed the file is grown by `conf_buf_len`
/// bytes and that new tail is mapped; bytes are then copied into the mapping.
#[derive(Debug)]
pub struct MmapBufWriter {
    /// File being written.
    file: File,
    /// Length the file has been extended to so far (end of the latest window).
    file_len: u64,
    /// Currently mapped window, or `None` when no window is mapped.
    mmap: Option<MmapMut>,
    /// Number of bytes already written into the latest window.
    buf_write_len: u64,
    /// Size in bytes of every window.
    conf_buf_len: u64,
}
17
18impl MmapBufWriter {
19    pub fn new(file: File) -> Self {
20        MmapBufWriter::with_arg(file, 0, DEFAULT_WRITER_BUFF_SIZE)
21    }
22
23    pub fn with_arg(file: File, file_len: u64, buf_size: u64) -> Self {
24        MmapBufWriter {
25            file,
26            file_len,
27            mmap: None,
28            buf_write_len: 0,
29            conf_buf_len: buf_size,
30        }
31    }
32
33    fn next_mmap(&mut self) -> Result<(), StoreError> {
34        let offset = self.file_len;
35        self.file_len += self.conf_buf_len;
36        self.file.set_len(self.file_len)?;
37
38        let mmap = unsafe {
39            MmapOptions::new()
40                .offset(offset)
41                .len(self.conf_buf_len as usize)
42                .map_mut(self.file.as_raw_fd())?
43        };
44
45        self.mmap = Some(mmap);
46        self.buf_write_len = 0;
47
48        self.lock_mmap()?;
49        Ok(())
50    }
51
52    fn lock_mmap(&self) -> Result<(), StoreError> {
53        let mut lock = libc::flock {
54            l_type: libc::F_WRLCK as _,
55            l_whence: libc::SEEK_SET as i16,
56            l_start: (self.file_len - self.conf_buf_len) as i64,
57            l_len: self.conf_buf_len as i64,
58            l_pid: 0,
59        };
60        let result = unsafe { fcntl(self.file.as_raw_fd(), F_SETLK, &mut lock) };
61        if result == -1 {
62            return Err(StoreError::LockError("lock mmap buff error".to_string()));
63        }
64        Ok(())
65    }
66
67    fn unlock_mmap(&self) -> Result<(), StoreError> {
68        let mut lock = libc::flock {
69            l_type: libc::F_UNLCK as _,
70            l_whence: libc::SEEK_SET as i16,
71            l_start: (self.file_len - self.conf_buf_len) as i64,
72            l_len: self.conf_buf_len as i64,
73            l_pid: 0,
74        };
75        let result = unsafe { fcntl(self.file.as_raw_fd(), F_SETLKW, &mut lock) };
76        if result == -1 {
77            return Err(StoreError::LockError("unlock mmap buff error".to_string()));
78        }
79        Ok(())
80    }
81
82    pub fn next_offset(&self) -> u64 {
83        if self.mmap.is_none() {
84            self.file_len
85        } else {
86            self.file_len - (self.conf_buf_len - self.buf_write_len)
87        }
88    }
89}
90
91impl Drop for MmapBufWriter {
92    fn drop(&mut self) {
93        let _ = self.flush();
94        let _ = self
95            .file
96            .set_len(self.file_len - (self.conf_buf_len - self.buf_write_len));
97    }
98}
99
100impl Write for MmapBufWriter {
101    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
102        if self.mmap.is_none() {
103            self.next_mmap()?;
104        }
105
106        let write_len;
107        {
108            let buf_map = &mut self.mmap;
109            let buf_u8: &mut [u8] = buf_map.as_mut().unwrap();
110            let mut buf_write = &mut buf_u8[self.buf_write_len as usize..];
111            write_len = std::io::Write::write(&mut buf_write, buf)?;
112            self.buf_write_len += write_len as u64;
113        }
114        if self.buf_write_len >= self.conf_buf_len {
115            self.flush()?;
116        }
117        Ok(write_len)
118    }
119
120    fn flush(&mut self) -> std::io::Result<()> {
121        self.mmap.as_ref().unwrap().flush()?;
122        self.mmap = None;
123        self.unlock_mmap()?;
124        Ok(())
125    }
126}
127
/// Window size, in bytes, that `MmapBufReader::new` passes to `MmapBufReader::new_with_arg`.
///
/// NOTE(review): the identifier is misspelled ("READEER", "SIEZ"); it is public,
/// so renaming it would break external users.
pub const DEFAULT_READEER_BUFF_SIEZ: u64 = 1024;
129
/// Reader that walks a file through memory-mapped windows of at most
/// `conf_buf_len` bytes.
#[derive(Debug)]
pub struct MmapBufReader {
    /// File being read.
    file: File,
    /// File length taken from the file's metadata at construction (0 if the
    /// metadata could not be read).
    file_len: u64,
    /// Currently mapped window, or `None` when no window is mapped.
    mmap: RefCell<Option<MmapMut>>,
    /// File offset at which the next window will start.
    next_offset: RefCell<usize>,
    /// Length of the latest window; 0 when there was nothing left to map.
    map_len: RefCell<usize>,
    /// Number of bytes already consumed from the latest window.
    buf_read_len: RefCell<usize>,
    /// Upper bound, in bytes, on the size of a window.
    conf_buf_len: u64,
}
140
141impl MmapBufReader {
142    pub fn new(file: File) -> Self {
143        MmapBufReader::new_with_arg(file, 0, DEFAULT_READEER_BUFF_SIEZ)
144    }
145
146    pub fn new_with_arg(file: File, offset: usize, conf_buff_len: u64) -> Self {
147        let file_len = match file.metadata() {
148            Ok(meta) => meta.len(),
149            Err(_) => 0,
150        };
151        MmapBufReader {
152            file,
153            file_len,
154            mmap: RefCell::new(None),
155            next_offset: RefCell::new(offset),
156            map_len: RefCell::new(0),
157            buf_read_len: RefCell::new(0),
158            conf_buf_len: conf_buff_len,
159        }
160    }
161
162    fn next_mmap(&self) -> Result<(), StoreError> {
163        let remain = self.file_len - *self.next_offset.borrow() as u64;
164        *self.map_len.borrow_mut() = if remain >= self.conf_buf_len {
165            self.conf_buf_len as usize
166        } else {
167            remain as usize
168        };
169        if *self.map_len.borrow() == 0 {
170            return Ok(());
171        }
172
173        let mmap = unsafe {
174            MmapOptions::new()
175                .offset(*self.next_offset.borrow() as u64)
176                .len(*self.map_len.borrow())
177                .map_mut(self.file.as_raw_fd())?
178        };
179        *self.mmap.borrow_mut() = Some(mmap);
180        *self.next_offset.borrow_mut() += *self.map_len.borrow();
181        *self.buf_read_len.borrow_mut() = 0;
182
183        self.lock_mmap()?;
184        Ok(())
185    }
186
187    fn lock_mmap(&self) -> Result<(), StoreError> {
188        let mut lock = libc::flock {
189            l_type: libc::F_RDLCK as _,
190            l_whence: libc::SEEK_SET as i16,
191            l_start: (*self.next_offset.borrow() - *self.map_len.borrow()) as i64,
192            l_len: *self.map_len.borrow() as i64,
193            l_pid: 0,
194        };
195        let result = unsafe { fcntl(self.file.as_raw_fd(), F_SETLK, &mut lock) };
196        if result == -1 {
197            return Err(StoreError::LockError("lock mmap buff error".to_string()));
198        }
199        Ok(())
200    }
201
202    fn unlock_mmap(&self) -> Result<(), StoreError> {
203        let mut lock = libc::flock {
204            l_type: libc::F_UNLCK as _,
205            l_whence: libc::SEEK_SET as i16,
206            l_start: (*self.next_offset.borrow() - *self.map_len.borrow()) as i64,
207            l_len: *self.map_len.borrow() as i64,
208            l_pid: 0,
209        };
210        let result = unsafe { fcntl(self.file.as_raw_fd(), F_SETLKW, &mut lock) };
211        if result == -1 {
212            return Err(StoreError::LockError("unlock mmap buff error".to_string()));
213        }
214        Ok(())
215    }
216}
217
218impl Drop for MmapBufReader {
219    fn drop(&mut self) {
220        let _ = self.unlock_mmap();
221    }
222}
223
224impl Read for MmapBufReader {
225    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
226        if self.mmap.borrow().is_none() {
227            self.next_mmap()?;
228        }
229        if *self.map_len.borrow() == 0 {
230            return Ok(0);
231        }
232
233        let read_len;
234        {
235            let buf_map = self.mmap.borrow();
236            let buf_u8: &[u8] = buf_map.as_ref().unwrap();
237            let buf_read = &buf_u8[*self.buf_read_len.borrow()..];
238            read_len = std::cmp::min(buf.len(), buf_read.len());
239
240            buf[..read_len].copy_from_slice(&buf_read[..read_len]);
241            *self.buf_read_len.borrow_mut() += read_len;
242        }
243
244        if *self.buf_read_len.borrow() >= *self.map_len.borrow() {
245            self.unlock_mmap()?;
246            *self.mmap.borrow_mut() = None;
247        }
248        Ok(read_len)
249    }
250}
251
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        chunkindex::ChunkIndexRd,
        packet::{PacketKey, TransProto},
    };
    use bincode::deserialize_from;
    use std::fs::OpenOptions;
    use std::net::{IpAddr, Ipv4Addr};
    use std::os::unix::fs::OpenOptionsExt;
    use tempfile::Builder;
    use tempfile::NamedTempFile;

    /// Builds the fixture record shared by all tests; only the two addresses
    /// and the start time vary between them.
    fn make_index(addr1: Ipv4Addr, addr2: Ipv4Addr, start_time: u128) -> ChunkIndexRd {
        ChunkIndexRd {
            start_time,
            end_time: 222,
            chunk_id: 12,
            chunk_offset: 100,
            tuple5: PacketKey {
                addr1: IpAddr::V4(addr1),
                port1: 111,
                addr2: IpAddr::V4(addr2),
                port2: 222,
                trans_proto: TransProto::Tcp,
            },
        }
    }

    /// Creates (or truncates) `path` for reading and writing with mode 0600.
    fn create_rw(path: &std::path::Path) -> File {
        OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(true)
            .mode(0o600)
            .open(path)
            .unwrap()
    }

    /// Opens the existing file at `path` for reading and writing.
    fn reopen_rw(path: &std::path::Path) -> File {
        OpenOptions::new()
            .read(true)
            .write(true)
            .create(false)
            .truncate(false)
            .open(path)
            .unwrap()
    }

    /// A single record can be serialized through the writer.
    #[test]
    fn test_mmapbuf_writer() {
        let file = NamedTempFile::new().expect("can not create tmp file");
        let temp_file_path = file.path();
        println!("tmp file path: {}", temp_file_path.display());
        let file: File = file.into_file();
        let mmap_writer = MmapBufWriter::new(file);

        let index = make_index(Ipv4Addr::new(1, 1, 1, 1), Ipv4Addr::new(2, 2, 2, 2), 111);
        let result = bincode::serialize_into(mmap_writer, &index);
        assert!(result.is_ok());
    }

    /// A record written through the writer is read back unchanged.
    #[test]
    fn test_mmapbuf_reader() {
        let dir = Builder::new().tempdir().unwrap();
        let path = dir.path().join("nsavechunkindex.test");
        println!("file path: {:?}", &path);
        let index = make_index(Ipv4Addr::new(1, 1, 1, 1), Ipv4Addr::new(2, 2, 2, 2), 111);

        // write: the writer is moved into `serialize_into`, so it is dropped
        // before the file is reopened for reading.
        let mmap_writer = MmapBufWriter::with_arg(create_rw(&path), 0, 100);
        let result = bincode::serialize_into(mmap_writer, &index);
        assert!(result.is_ok());

        // read
        let mut mmap_reader = MmapBufReader::new(reopen_rw(&path));
        let read_index = deserialize_from::<_, ChunkIndexRd>(&mut mmap_reader).unwrap();
        assert_eq!(read_index, index);
    }

    /// Several records spanning multiple windows are read back in order.
    #[test]
    fn test_mmapbuf_many() {
        let index_num = 10;
        let dir = Builder::new().tempdir().unwrap();
        let path = dir.path().join("nsavechunkindex.test");
        println!("file path: {:?}", &path);

        // write: the writer stays alive until the end of the test.
        let mut mmap_writer = MmapBufWriter::with_arg(create_rw(&path), 0, 100);
        for n in 0..index_num {
            let index = make_index(
                Ipv4Addr::new(n, 1, 1, 1),
                Ipv4Addr::new(n, 2, 2, 2),
                111 + n as u128,
            );

            let result = bincode::serialize_into(&mut mmap_writer, &index);
            assert!(result.is_ok());
        }

        // read: one more attempt than was written; failures are only printed.
        let mut mmap_reader = MmapBufReader::new(reopen_rw(&path));
        for n in 0..index_num + 1 {
            let index = make_index(
                Ipv4Addr::new(n, 1, 1, 1),
                Ipv4Addr::new(n, 2, 2, 2),
                111 + n as u128,
            );

            let read_index = deserialize_from::<_, ChunkIndexRd>(&mut mmap_reader);
            if let Ok(actual) = &read_index {
                assert_eq!(*actual, index);
            } else {
                let _ = dbg!(read_index);
            }
        }
    }
}