1use crate::common::StoreError;
2use libc::{fcntl, F_SETLK, F_SETLKW};
3use memmap2::{MmapMut, MmapOptions};
4use std::io::{Read, Write};
5use std::{cell::RefCell, fs::File, os::fd::AsRawFd};
6
7const DEFAULT_WRITER_BUFF_SIZE: u64 = 1024;
8
9#[derive(Debug)]
10pub struct MmapBufWriter {
11 file: File,
12 file_len: u64,
13 mmap: Option<MmapMut>,
14 buf_write_len: u64,
15 conf_buf_len: u64,
16}
17
18impl MmapBufWriter {
19 pub fn new(file: File) -> Self {
20 MmapBufWriter::with_arg(file, 0, DEFAULT_WRITER_BUFF_SIZE)
21 }
22
23 pub fn with_arg(file: File, file_len: u64, buf_size: u64) -> Self {
24 MmapBufWriter {
25 file,
26 file_len,
27 mmap: None,
28 buf_write_len: 0,
29 conf_buf_len: buf_size,
30 }
31 }
32
33 fn next_mmap(&mut self) -> Result<(), StoreError> {
34 let offset = self.file_len;
35 self.file_len += self.conf_buf_len;
36 self.file.set_len(self.file_len)?;
37
38 let mmap = unsafe {
39 MmapOptions::new()
40 .offset(offset)
41 .len(self.conf_buf_len as usize)
42 .map_mut(self.file.as_raw_fd())?
43 };
44
45 self.mmap = Some(mmap);
46 self.buf_write_len = 0;
47
48 self.lock_mmap()?;
49 Ok(())
50 }
51
52 fn lock_mmap(&self) -> Result<(), StoreError> {
53 let mut lock = libc::flock {
54 l_type: libc::F_WRLCK as _,
55 l_whence: libc::SEEK_SET as i16,
56 l_start: (self.file_len - self.conf_buf_len) as i64,
57 l_len: self.conf_buf_len as i64,
58 l_pid: 0,
59 };
60 let result = unsafe { fcntl(self.file.as_raw_fd(), F_SETLK, &mut lock) };
61 if result == -1 {
62 return Err(StoreError::LockError("lock mmap buff error".to_string()));
63 }
64 Ok(())
65 }
66
67 fn unlock_mmap(&self) -> Result<(), StoreError> {
68 let mut lock = libc::flock {
69 l_type: libc::F_UNLCK as _,
70 l_whence: libc::SEEK_SET as i16,
71 l_start: (self.file_len - self.conf_buf_len) as i64,
72 l_len: self.conf_buf_len as i64,
73 l_pid: 0,
74 };
75 let result = unsafe { fcntl(self.file.as_raw_fd(), F_SETLKW, &mut lock) };
76 if result == -1 {
77 return Err(StoreError::LockError("unlock mmap buff error".to_string()));
78 }
79 Ok(())
80 }
81
82 pub fn next_offset(&self) -> u64 {
83 if self.mmap.is_none() {
84 self.file_len
85 } else {
86 self.file_len - (self.conf_buf_len - self.buf_write_len)
87 }
88 }
89}
90
91impl Drop for MmapBufWriter {
92 fn drop(&mut self) {
93 let _ = self.flush();
94 let _ = self
95 .file
96 .set_len(self.file_len - (self.conf_buf_len - self.buf_write_len));
97 }
98}
99
100impl Write for MmapBufWriter {
101 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
102 if self.mmap.is_none() {
103 self.next_mmap()?;
104 }
105
106 let write_len;
107 {
108 let buf_map = &mut self.mmap;
109 let buf_u8: &mut [u8] = buf_map.as_mut().unwrap();
110 let mut buf_write = &mut buf_u8[self.buf_write_len as usize..];
111 write_len = std::io::Write::write(&mut buf_write, buf)?;
112 self.buf_write_len += write_len as u64;
113 }
114 if self.buf_write_len >= self.conf_buf_len {
115 self.flush()?;
116 }
117 Ok(write_len)
118 }
119
120 fn flush(&mut self) -> std::io::Result<()> {
121 self.mmap.as_ref().unwrap().flush()?;
122 self.mmap = None;
123 self.unlock_mmap()?;
124 Ok(())
125 }
126}
127
128pub const DEFAULT_READEER_BUFF_SIEZ: u64 = 1024;
129
130#[derive(Debug)]
131pub struct MmapBufReader {
132 file: File,
133 file_len: u64,
134 mmap: RefCell<Option<MmapMut>>,
135 next_offset: RefCell<usize>,
136 map_len: RefCell<usize>,
137 buf_read_len: RefCell<usize>,
138 conf_buf_len: u64,
139}
140
141impl MmapBufReader {
142 pub fn new(file: File) -> Self {
143 MmapBufReader::new_with_arg(file, 0, DEFAULT_READEER_BUFF_SIEZ)
144 }
145
146 pub fn new_with_arg(file: File, offset: usize, conf_buff_len: u64) -> Self {
147 let file_len = match file.metadata() {
148 Ok(meta) => meta.len(),
149 Err(_) => 0,
150 };
151 MmapBufReader {
152 file,
153 file_len,
154 mmap: RefCell::new(None),
155 next_offset: RefCell::new(offset),
156 map_len: RefCell::new(0),
157 buf_read_len: RefCell::new(0),
158 conf_buf_len: conf_buff_len,
159 }
160 }
161
162 fn next_mmap(&self) -> Result<(), StoreError> {
163 let remain = self.file_len - *self.next_offset.borrow() as u64;
164 *self.map_len.borrow_mut() = if remain >= self.conf_buf_len {
165 self.conf_buf_len as usize
166 } else {
167 remain as usize
168 };
169 if *self.map_len.borrow() == 0 {
170 return Ok(());
171 }
172
173 let mmap = unsafe {
174 MmapOptions::new()
175 .offset(*self.next_offset.borrow() as u64)
176 .len(*self.map_len.borrow())
177 .map_mut(self.file.as_raw_fd())?
178 };
179 *self.mmap.borrow_mut() = Some(mmap);
180 *self.next_offset.borrow_mut() += *self.map_len.borrow();
181 *self.buf_read_len.borrow_mut() = 0;
182
183 self.lock_mmap()?;
184 Ok(())
185 }
186
187 fn lock_mmap(&self) -> Result<(), StoreError> {
188 let mut lock = libc::flock {
189 l_type: libc::F_RDLCK as _,
190 l_whence: libc::SEEK_SET as i16,
191 l_start: (*self.next_offset.borrow() - *self.map_len.borrow()) as i64,
192 l_len: *self.map_len.borrow() as i64,
193 l_pid: 0,
194 };
195 let result = unsafe { fcntl(self.file.as_raw_fd(), F_SETLK, &mut lock) };
196 if result == -1 {
197 return Err(StoreError::LockError("lock mmap buff error".to_string()));
198 }
199 Ok(())
200 }
201
202 fn unlock_mmap(&self) -> Result<(), StoreError> {
203 let mut lock = libc::flock {
204 l_type: libc::F_UNLCK as _,
205 l_whence: libc::SEEK_SET as i16,
206 l_start: (*self.next_offset.borrow() - *self.map_len.borrow()) as i64,
207 l_len: *self.map_len.borrow() as i64,
208 l_pid: 0,
209 };
210 let result = unsafe { fcntl(self.file.as_raw_fd(), F_SETLKW, &mut lock) };
211 if result == -1 {
212 return Err(StoreError::LockError("unlock mmap buff error".to_string()));
213 }
214 Ok(())
215 }
216}
217
218impl Drop for MmapBufReader {
219 fn drop(&mut self) {
220 let _ = self.unlock_mmap();
221 }
222}
223
224impl Read for MmapBufReader {
225 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
226 if self.mmap.borrow().is_none() {
227 self.next_mmap()?;
228 }
229 if *self.map_len.borrow() == 0 {
230 return Ok(0);
231 }
232
233 let read_len;
234 {
235 let buf_map = self.mmap.borrow();
236 let buf_u8: &[u8] = buf_map.as_ref().unwrap();
237 let buf_read = &buf_u8[*self.buf_read_len.borrow()..];
238 read_len = std::cmp::min(buf.len(), buf_read.len());
239
240 buf[..read_len].copy_from_slice(&buf_read[..read_len]);
241 *self.buf_read_len.borrow_mut() += read_len;
242 }
243
244 if *self.buf_read_len.borrow() >= *self.map_len.borrow() {
245 self.unlock_mmap()?;
246 *self.mmap.borrow_mut() = None;
247 }
248 Ok(read_len)
249 }
250}
251
252#[cfg(test)]
253mod tests {
254 use super::*;
255 use crate::{
256 chunkindex::ChunkIndexRd,
257 packet::{PacketKey, TransProto},
258 };
259 use bincode::deserialize_from;
260 use std::fs::OpenOptions;
261 use std::net::{IpAddr, Ipv4Addr};
262 use std::os::unix::fs::OpenOptionsExt;
263 use tempfile::Builder;
264 use tempfile::NamedTempFile;
265
266 #[test]
267 fn test_mmapbuf_writer() {
268 let file = NamedTempFile::new().expect("can not create tmp file");
269 let temp_file_path = file.path();
270 println!("tmp file path: {}", temp_file_path.display());
271 let file: File = file.into_file();
272 let mmap_writer = MmapBufWriter::new(file);
273
274 let tuple5 = PacketKey {
275 addr1: IpAddr::V4(Ipv4Addr::new(1, 1, 1, 1)),
276 port1: 111,
277 addr2: IpAddr::V4(Ipv4Addr::new(2, 2, 2, 2)),
278 port2: 222,
279 trans_proto: TransProto::Tcp,
280 };
281 let index = ChunkIndexRd {
282 start_time: 111,
283 end_time: 222,
284 chunk_id: 12,
285 chunk_offset: 100,
286 tuple5,
287 };
288 let result = bincode::serialize_into(mmap_writer, &index);
289 assert!(result.is_ok());
290 }
291
292 #[test]
293 fn test_mmapbuf_reader() {
294 let dir = Builder::new().tempdir().unwrap();
295 let path = dir.path().join("nsavechunkindex.test");
296 println!("file path: {:?}", &path);
297 let tuple5 = PacketKey {
298 addr1: IpAddr::V4(Ipv4Addr::new(1, 1, 1, 1)),
299 port1: 111,
300 addr2: IpAddr::V4(Ipv4Addr::new(2, 2, 2, 2)),
301 port2: 222,
302 trans_proto: TransProto::Tcp,
303 };
304 let index = ChunkIndexRd {
305 start_time: 111,
306 end_time: 222,
307 chunk_id: 12,
308 chunk_offset: 100,
309 tuple5,
310 };
311
312 let write_file = OpenOptions::new()
314 .read(true)
315 .write(true)
316 .create(true)
317 .truncate(true)
318 .mode(0o600)
319 .open(&path)
320 .unwrap();
321 let mmap_writer = MmapBufWriter::with_arg(write_file, 0, 100);
322 let result = bincode::serialize_into(mmap_writer, &index);
323 assert!(result.is_ok());
324
325 let read_file = OpenOptions::new()
327 .read(true)
328 .write(true)
329 .create(false)
330 .truncate(false)
331 .open(&path)
332 .unwrap();
333 let mut mmap_reader = MmapBufReader::new(read_file);
334 let read_index = deserialize_from::<_, ChunkIndexRd>(&mut mmap_reader).unwrap();
335 assert_eq!(read_index, index);
336 }
337
338 #[test]
339 fn test_mmapbuf_many() {
340 let index_num = 10;
341 let dir = Builder::new().tempdir().unwrap();
342 let path = dir.path().join("nsavechunkindex.test");
343 println!("file path: {:?}", &path);
344
345 let write_file = OpenOptions::new()
347 .read(true)
348 .write(true)
349 .create(true)
350 .truncate(true)
351 .mode(0o600)
352 .open(&path)
353 .unwrap();
354 let mut mmap_writer = MmapBufWriter::with_arg(write_file, 0, 100);
355 for n in 0..index_num {
356 let tuple5 = PacketKey {
357 addr1: IpAddr::V4(Ipv4Addr::new(n, 1, 1, 1)),
358 port1: 111,
359 addr2: IpAddr::V4(Ipv4Addr::new(n, 2, 2, 2)),
360 port2: 222,
361 trans_proto: TransProto::Tcp,
362 };
363 let index = ChunkIndexRd {
364 start_time: 111 + n as u128,
365 end_time: 222,
366 chunk_id: 12,
367 chunk_offset: 100,
368 tuple5,
369 };
370
371 let result = bincode::serialize_into(&mut mmap_writer, &index);
372 assert!(result.is_ok());
373 }
374
375 let read_file = OpenOptions::new()
377 .read(true)
378 .write(true)
379 .create(false)
380 .truncate(false)
381 .open(&path)
382 .unwrap();
383 let mut mmap_reader = MmapBufReader::new(read_file);
384 for n in 0..index_num + 1 {
385 let tuple5 = PacketKey {
386 addr1: IpAddr::V4(Ipv4Addr::new(n, 1, 1, 1)),
387 port1: 111,
388 addr2: IpAddr::V4(Ipv4Addr::new(n, 2, 2, 2)),
389 port2: 222,
390 trans_proto: TransProto::Tcp,
391 };
392 let index = ChunkIndexRd {
393 start_time: 111 + n as u128,
394 end_time: 222,
395 chunk_id: 12,
396 chunk_offset: 100,
397 tuple5,
398 };
399
400 let read_index = deserialize_from::<_, ChunkIndexRd>(&mut mmap_reader);
401 if read_index.is_err() {
402 let _ = dbg!(read_index);
403 } else {
404 assert_eq!(read_index.unwrap(), index);
405 }
406 }
407 }
408}