libnsave/
chunkpool.rs

1use crate::common::*;
2use crate::packet::*;
3use chrono::{DateTime, Datelike, Local, Timelike};
4use libc::{fcntl, F_SETLK, F_SETLKW};
5use memmap2::{Mmap, MmapMut, MmapOptions};
6use pcap::Capture as PcapCapture;
7use pcap::Linktype;
8use pcap::Packet as CapPacket;
9use pcap::PacketHeader as CapPacketHeader;
10use std::fmt;
11use std::fs::{self, File, OpenOptions};
12use std::io::{Cursor, Read, Write};
13use std::os::fd::AsRawFd;
14use std::{cell::RefCell, path::PathBuf, sync::Arc};
15
16#[derive(Debug)]
17pub struct ChunkPool {
18    pool_path: PathBuf,
19    pool_size: u64,
20    file_size: u64,
21    chunk_size: u32,
22
23    actual_file_size: u64,
24    actual_file_num: u32,
25    file_chunk_num: u32,
26    chunk_num: u32,
27
28    pool_head_fd: RefCell<Option<File>>,
29    pool_head_map: RefCell<Option<MmapMut>>,
30    pool_head: RefCell<Option<PoolHead>>,
31
32    chunk_file_id: RefCell<Option<u32>>,
33    chunk_file_fd: RefCell<Option<File>>,
34    chunk_offset: RefCell<u32>,
35    chunk_map: RefCell<Option<MmapMut>>,
36    chunk_head: RefCell<Option<ChunkHead>>,
37
38    last_cover_minute: RefCell<DateTime<Local>>,
39}
40
41impl ChunkPool {
42    pub fn new(store_dir: PathBuf, pool_size: u64, file_size: u64, chunk_size: u32) -> Self {
43        let mut path = PathBuf::new();
44        path.push(store_dir);
45        path.push("chunk_pool");
46        let actual_size = ActualSize::new(pool_size, file_size, chunk_size);
47        ChunkPool {
48            pool_path: path,
49            pool_size,
50            file_size,
51            chunk_size,
52
53            actual_file_size: actual_size.actual_file_size,
54            actual_file_num: actual_size.actual_file_num,
55            file_chunk_num: actual_size.file_chunk_num,
56            chunk_num: actual_size.chunk_num,
57
58            pool_head_fd: RefCell::new(None),
59            pool_head_map: RefCell::new(None),
60            pool_head: RefCell::new(None),
61
62            chunk_file_id: RefCell::new(None),
63            chunk_file_fd: RefCell::new(None),
64            chunk_offset: RefCell::new(0),
65            chunk_map: RefCell::new(None),
66            chunk_head: RefCell::new(None),
67
68            last_cover_minute: RefCell::new(ts_date(0)),
69        }
70    }
71
72    pub fn init(&self) -> Result<(), StoreError> {
73        if !self.pool_path.exists() && fs::create_dir_all(&self.pool_path).is_err() {
74            return Err(StoreError::InitError(
75                "chunk pool create dir error".to_string(),
76            ));
77        }
78        let pool_file_path = self.pool_path.join("pool.pl");
79        if !pool_file_path.exists() {
80            self.create_pool_file(&pool_file_path)?;
81            self.create_chunk_file()?;
82        }
83
84        let result = OpenOptions::new()
85            .read(true)
86            .write(true)
87            .create(false)
88            .truncate(false)
89            .open(pool_file_path);
90        match result {
91            Ok(pool_file_fd) => {
92                *self.pool_head_fd.borrow_mut() = Some(pool_file_fd);
93            }
94            Err(e) => return Err(StoreError::IoError(e)),
95        }
96
97        let mmap = unsafe {
98            MmapOptions::new()
99                .offset(0)
100                .len(PoolHead::serialize_size())
101                .map_mut(self.pool_head_fd.borrow_mut().as_mut().unwrap().as_raw_fd())?
102        };
103        *self.pool_head_map.borrow_mut() = Some(mmap);
104
105        let pool_head_map = self.pool_head_map.borrow_mut();
106        let mut cursor = Cursor::new(pool_head_map.as_ref().unwrap());
107        let pool_file = PoolHead::deserialize_from(&mut cursor).unwrap();
108        *self.pool_head.borrow_mut() = Some(pool_file);
109        self.check_conf()?;
110
111        self.next_chunk(|_, _| {})?;
112        Ok(())
113    }
114
115    fn check_conf(&self) -> Result<(), StoreError> {
116        if self.pool_size == self.pool_head.borrow().as_ref().unwrap().pool_size
117            && self.file_size == self.pool_head.borrow().as_ref().unwrap().file_size
118            && self.chunk_size == self.pool_head.borrow().as_ref().unwrap().chunk_size
119        {
120            Ok(())
121        } else {
122            Err(StoreError::InitError("conf size error".to_string()))
123        }
124    }
125
126    pub fn write<F>(
127        &self,
128        pkt: Arc<Packet>,
129        now: u128,
130        cover_chunk_fn: F,
131    ) -> Result<ChunkOffset, StoreError>
132    where
133        F: Fn(PathBuf, u128),
134    {
135        if self.chunk_head.borrow().as_ref().unwrap().start_time == 0 {
136            self.chunk_head.borrow_mut().as_mut().unwrap().start_time = now;
137        }
138
139        if self.chunk_size - self.chunk_head.borrow().as_ref().unwrap().filled_size
140            < pkt.serialize_size()
141        {
142            self.flush()?;
143            self.next_chunk(cover_chunk_fn)?;
144        }
145
146        let pkt_start = self.chunk_head.borrow().as_ref().unwrap().filled_size;
147        let mut chunk_map = self.chunk_map.borrow_mut();
148        let chunk_u8: &mut [u8] = chunk_map.as_mut().unwrap();
149        let mut chunk_offset = &mut chunk_u8[pkt_start as usize..];
150
151        pkt.serialize_into(&mut chunk_offset)?;
152        self.chunk_head.borrow_mut().as_mut().unwrap().filled_size += pkt.serialize_size();
153        self.chunk_head.borrow_mut().as_mut().unwrap().end_time = now;
154
155        let mut chunk_id = self.pool_head.borrow().as_ref().unwrap().next_chunk_id;
156        if chunk_id != 0 {
157            chunk_id -= 1;
158        } else {
159            chunk_id = self.chunk_num - 1;
160        }
161
162        Ok(ChunkOffset {
163            chunk_id,
164            start_offset: pkt_start,
165        })
166    }
167
168    pub fn update(&self, offset: &ChunkOffset, value: &ChunkOffset) -> Result<(), StoreError> {
169        let offset = offset.start_offset;
170        let value = value.start_offset;
171
172        let mut chunk_map = self.chunk_map.borrow_mut();
173        let chunk_u8: &mut [u8] = chunk_map.as_mut().unwrap();
174        let mut chunk_offset = &mut chunk_u8[offset as usize..];
175        self.serialize_update(&mut chunk_offset, value)?;
176        Ok(())
177    }
178
179    fn serialize_update<W: Write>(&self, writer: &mut W, value: u32) -> Result<(), StoreError> {
180        writer.write_all(&value.to_le_bytes())?;
181        Ok(())
182    }
183
184    fn create_pool_file(&self, file_path: &PathBuf) -> Result<(), StoreError> {
185        let result = OpenOptions::new()
186            .read(false)
187            .write(true)
188            .create(true)
189            .truncate(true)
190            .open(file_path);
191        match result {
192            Ok(mut pool_file_fd) => {
193                let pool_file = PoolHead {
194                    pool_size: self.pool_size,
195                    file_size: self.file_size,
196                    chunk_size: self.chunk_size,
197                    next_chunk_id: 0,
198                };
199                pool_file.serialize_into(&mut pool_file_fd)?;
200                pool_file_fd.flush()?;
201            }
202            Err(e) => return Err(StoreError::IoError(e)),
203        }
204        Ok(())
205    }
206
207    fn create_chunk_file(&self) -> Result<(), StoreError> {
208        for i in 0..self.actual_file_num {
209            let path = self.pool_path.join(format!("{:03}.da", i));
210            let data_file = File::create(path)?;
211            data_file.set_len(self.actual_file_size)?;
212        }
213        Ok(())
214    }
215
216    fn next_chunk<F>(&self, cover_chunk_fn: F) -> Result<(), StoreError>
217    where
218        F: Fn(PathBuf, u128),
219    {
220        let chunk_id = self.pool_head.borrow().as_ref().unwrap().next_chunk_id;
221        let file_id = chunk_id / self.file_chunk_num;
222        if self.chunk_file_id.borrow().is_none() || self.chunk_file_id.borrow().unwrap() != file_id
223        {
224            let path = self.pool_path.join(format!("{:03}.da", file_id));
225            let result = OpenOptions::new()
226                .read(true)
227                .write(true)
228                .create(false)
229                .truncate(false)
230                .open(path);
231            match result {
232                Ok(file_fd) => {
233                    *self.chunk_file_id.borrow_mut() = Some(file_id);
234                    *self.chunk_file_fd.borrow_mut() = Some(file_fd);
235                }
236                Err(e) => {
237                    return Err(StoreError::IoError(e));
238                }
239            }
240        }
241
242        let inner_chunk_id = chunk_id - file_id * self.file_chunk_num;
243        let chunk_offset = inner_chunk_id * self.chunk_size;
244        let mmap = unsafe {
245            MmapOptions::new()
246                .offset(chunk_offset.into())
247                .len(self.chunk_size as usize)
248                .map_mut(
249                    self.chunk_file_fd
250                        .borrow_mut()
251                        .as_mut()
252                        .unwrap()
253                        .as_raw_fd(),
254                )?
255        };
256        *self.chunk_offset.borrow_mut() = chunk_offset;
257        *self.chunk_map.borrow_mut() = Some(mmap);
258
259        let chunk_map = self.chunk_map.borrow_mut();
260        let mut cursor = Cursor::new(chunk_map.as_ref().unwrap());
261        let old_chunk_head = ChunkHead::deserialize_from(&mut cursor)?;
262        let cover_minute = ts_date(old_chunk_head.end_time);
263        let last_minute = *self.last_cover_minute.borrow();
264        if !(cover_minute.year() == last_minute.year()
265            && cover_minute.month() == last_minute.month()
266            && cover_minute.day() == last_minute.day()
267            && cover_minute.hour() == last_minute.hour()
268            && cover_minute.minute() == last_minute.minute())
269        {
270            *self.last_cover_minute.borrow_mut() = cover_minute;
271            cover_chunk_fn(self.pool_path.clone(), old_chunk_head.end_time);
272        }
273
274        *self.chunk_head.borrow_mut() = Some(ChunkHead::new());
275        self.pool_head.borrow_mut().as_mut().unwrap().next_chunk_id += 1;
276        if self.pool_head.borrow().as_ref().unwrap().next_chunk_id >= self.chunk_num {
277            self.pool_head.borrow_mut().as_mut().unwrap().next_chunk_id = 0;
278        }
279        Ok(())
280    }
281
282    pub fn flush(&self) -> Result<(), StoreError> {
283        self.wlock_chunk()?;
284        {
285            let mut chunk_head_map = self.chunk_map.borrow_mut();
286            let mut chunk_head_map_u8: &mut [u8] = chunk_head_map.as_mut().unwrap();
287            self.chunk_head
288                .borrow()
289                .as_ref()
290                .unwrap()
291                .serialize_into(&mut chunk_head_map_u8)?;
292        }
293        self.chunk_map.borrow().as_ref().unwrap().flush()?;
294        self.unlock_chunk()?;
295
296        self.wlock_pool_head()?;
297        {
298            let mut pool_head_map = self.pool_head_map.borrow_mut();
299            let mut pool_head_map_u8: &mut [u8] = pool_head_map.as_mut().unwrap();
300            self.pool_head
301                .borrow()
302                .as_ref()
303                .unwrap()
304                .serialize_into(&mut pool_head_map_u8)?;
305        }
306        self.pool_head_map.borrow().as_ref().unwrap().flush()?;
307        self.unlock_pool_head()?;
308        Ok(())
309    }
310
311    fn wlock_pool_head(&self) -> Result<(), StoreError> {
312        let mut lock = libc::flock {
313            l_type: libc::F_WRLCK as _,
314            l_whence: libc::SEEK_SET as i16,
315            l_start: 0,
316            l_len: PoolHead::serialize_size() as i64,
317            l_pid: 0,
318        };
319        let result = unsafe {
320            fcntl(
321                self.pool_head_fd.borrow().as_ref().unwrap().as_raw_fd(),
322                F_SETLK,
323                &mut lock,
324            )
325        };
326        if result == -1 {
327            return Err(StoreError::LockError("lock_pool_head error".to_string()));
328        }
329        Ok(())
330    }
331
332    fn unlock_pool_head(&self) -> Result<(), StoreError> {
333        let mut lock = libc::flock {
334            l_type: libc::F_UNLCK as _,
335            l_whence: libc::SEEK_SET as i16,
336            l_start: 0,
337            l_len: PoolHead::serialize_size() as i64,
338            l_pid: 0,
339        };
340        let result = unsafe {
341            fcntl(
342                self.pool_head_fd.borrow().as_ref().unwrap().as_raw_fd(),
343                F_SETLKW,
344                &mut lock,
345            )
346        };
347        if result == -1 {
348            return Err(StoreError::LockError("unlock_pool_head error".to_string()));
349        }
350        Ok(())
351    }
352
353    fn wlock_chunk(&self) -> Result<(), StoreError> {
354        let mut lock = libc::flock {
355            l_type: libc::F_WRLCK as _,
356            l_whence: libc::SEEK_SET as i16,
357            l_start: *self.chunk_offset.borrow() as i64,
358            l_len: self.chunk_size as i64,
359            l_pid: 0,
360        };
361        let result = unsafe {
362            fcntl(
363                self.chunk_file_fd.borrow().as_ref().unwrap().as_raw_fd(),
364                F_SETLKW,
365                &mut lock,
366            )
367        };
368        if result == -1 {
369            return Err(StoreError::LockError("lock_chunk error".to_string()));
370        }
371        Ok(())
372    }
373
374    fn unlock_chunk(&self) -> Result<(), StoreError> {
375        let mut lock = libc::flock {
376            l_type: libc::F_UNLCK as _,
377            l_whence: libc::SEEK_SET as i16,
378            l_start: *self.chunk_offset.borrow() as i64,
379            l_len: self.chunk_size as i64,
380            l_pid: 0,
381        };
382        let result = unsafe {
383            fcntl(
384                self.chunk_file_fd.borrow().as_ref().unwrap().as_raw_fd(),
385                F_SETLK,
386                &mut lock,
387            )
388        };
389        if result == -1 {
390            return Err(StoreError::LockError("unlock_chunk error".to_string()));
391        }
392        Ok(())
393    }
394
395    pub fn finish(&self) {
396        let _ = self.flush();
397    }
398}
399
400impl Drop for ChunkPool {
401    fn drop(&mut self) {
402        let _ = self.flush();
403    }
404}
405
406#[derive(Debug, Clone, Copy)]
407pub struct ActualSize {
408    pub actual_file_size: u64,
409    pub actual_file_num: u32,
410    pub file_chunk_num: u32,
411    pub chunk_num: u32,
412}
413
414impl ActualSize {
415    pub fn new(pool_size: u64, file_size: u64, chunk_size: u32) -> Self {
416        let actual_file_size = ((file_size - 1) / (chunk_size as u64) + 1) * (chunk_size as u64);
417        let actual_file_num = ((pool_size + actual_file_size - 1) / actual_file_size) as u32;
418        let file_chunk_num = (actual_file_size / (chunk_size as u64)) as u32;
419        let chunk_num = actual_file_num * file_chunk_num;
420        ActualSize {
421            actual_file_size,
422            actual_file_num,
423            file_chunk_num,
424            chunk_num,
425        }
426    }
427}
428
429impl fmt::Display for ActualSize {
430    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
431        write!(
432            f,
433            "ActualSize {{ actual_file_size: {} M, file_num: {}, file_chunk_num: {}, chunk_num: {} }}",
434            self.actual_file_size / 1024 / 1024,
435            self.actual_file_num,
436            self.file_chunk_num,
437            self.chunk_num
438        )
439    }
440}
441
442#[derive(Debug, Clone, Copy)]
443pub struct PoolHead {
444    pub pool_size: u64,
445    pub file_size: u64,
446    pub chunk_size: u32,
447    pub next_chunk_id: u32,
448}
449
450impl PoolHead {
451    pub fn serialize_into<W: Write>(&self, writer: &mut W) -> Result<(), StoreError> {
452        writer.write_all(&self.pool_size.to_le_bytes())?;
453        writer.write_all(&self.file_size.to_le_bytes())?;
454        writer.write_all(&self.chunk_size.to_le_bytes())?;
455        writer.write_all(&self.next_chunk_id.to_le_bytes())?;
456        Ok(())
457    }
458
459    pub fn deserialize_from<R: Read>(reader: &mut R) -> Result<Self, StoreError> {
460        let mut pool_size_bytes = [0; 8];
461        let mut file_size_bytes = [0; 8];
462        let mut chunk_size_bytes = [0; 4];
463        let mut current_chunk_bytes = [0; 4];
464
465        reader.read_exact(&mut pool_size_bytes)?;
466        reader.read_exact(&mut file_size_bytes)?;
467        reader.read_exact(&mut chunk_size_bytes)?;
468        reader.read_exact(&mut current_chunk_bytes)?;
469
470        Ok(PoolHead {
471            pool_size: u64::from_le_bytes(pool_size_bytes),
472            file_size: u64::from_le_bytes(file_size_bytes),
473            chunk_size: u32::from_le_bytes(chunk_size_bytes),
474            next_chunk_id: u32::from_le_bytes(current_chunk_bytes),
475        })
476    }
477
478    pub fn serialize_size() -> usize {
479        24
480    }
481}
482
483impl fmt::Display for PoolHead {
484    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
485        write!(
486            f,
487            "PoolHead {{ pool_size: {} M, file_size: {} M, chunk_size: {} K, next_chunk_id: {} }}",
488            self.pool_size / 1024 / 1024,
489            self.file_size / 1024 / 1024,
490            self.chunk_size / 1024,
491            self.next_chunk_id
492        )
493    }
494}
495
496#[derive(Debug)]
497pub struct ChunkHead {
498    pub start_time: u128,
499    pub end_time: u128,
500    pub filled_size: u32, // 包括chunkhead在内
501}
502
503impl ChunkHead {
504    pub fn new() -> Self {
505        ChunkHead {
506            start_time: 0,
507            end_time: 0,
508            filled_size: Self::serialize_size() as u32,
509        }
510    }
511
512    pub fn serialize_into<W: Write>(&self, writer: &mut W) -> Result<(), StoreError> {
513        writer.write_all(&self.start_time.to_le_bytes())?;
514        writer.write_all(&self.end_time.to_le_bytes())?;
515        writer.write_all(&self.filled_size.to_le_bytes())?;
516        Ok(())
517    }
518
519    pub fn deserialize_from<R: Read>(reader: &mut R) -> Result<Self, StoreError> {
520        let mut start_time_bytes = [0; 16];
521        let mut end_time_bytes = [0; 16];
522        let mut data_size_bytes = [0; 4];
523
524        reader.read_exact(&mut start_time_bytes)?;
525        reader.read_exact(&mut end_time_bytes)?;
526        reader.read_exact(&mut data_size_bytes)?;
527
528        Ok(ChunkHead {
529            start_time: u128::from_le_bytes(start_time_bytes),
530            end_time: u128::from_le_bytes(end_time_bytes),
531            filled_size: u32::from_le_bytes(data_size_bytes),
532        })
533    }
534
535    pub fn serialize_size() -> usize {
536        36
537    }
538}
539
540impl Default for ChunkHead {
541    fn default() -> Self {
542        Self::new()
543    }
544}
545
546impl fmt::Display for ChunkHead {
547    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
548        write!(
549            f,
550            "ChunkHead {{ start_time: {}, end_time: {}, filled_size: {} K }}",
551            ts_date(self.start_time),
552            ts_date(self.end_time),
553            self.filled_size / 1024,
554        )
555    }
556}
557
558#[derive(Debug, Eq, PartialEq, PartialOrd, Ord, Clone, Copy)]
559pub struct ChunkOffset {
560    pub chunk_id: u32,
561    pub start_offset: u32,
562}
563
564impl ChunkOffset {
565    pub fn new() -> Self {
566        ChunkOffset {
567            chunk_id: 0,
568            start_offset: 0,
569        }
570    }
571}
572
573impl Default for ChunkOffset {
574    fn default() -> Self {
575        Self::new()
576    }
577}
578
579#[derive(Debug)]
580pub struct StorePacket {
581    pub next_offset: u32,
582    pub timestamp: u128,
583    pub data_len: u16,
584    pub data: Vec<u8>,
585}
586
587impl StorePacket {
588    pub fn deserialize_from<R: Read>(reader: &mut R) -> Result<Self, StoreError> {
589        let mut next_offset_bytes = [0; 4];
590        let mut timestamp_bytes = [0; 16];
591        let mut data_len_bytes = [0; 2];
592
593        reader.read_exact(&mut next_offset_bytes)?;
594        reader.read_exact(&mut timestamp_bytes)?;
595        reader.read_exact(&mut data_len_bytes)?;
596
597        let next_offset = u32::from_le_bytes(next_offset_bytes);
598        let timestamp = u128::from_le_bytes(timestamp_bytes);
599        let data_len = u16::from_le_bytes(data_len_bytes);
600        let mut data = vec![0; data_len.into()];
601        reader.read_exact(&mut data)?;
602
603        Ok(StorePacket {
604            next_offset,
605            timestamp,
606            data_len,
607            data,
608        })
609    }
610}
611
612impl fmt::Display for StorePacket {
613    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
614        write!(
615            f,
616            "StorePacket {{ next_offset: {}, timestamp: {}, data_len: {} }}",
617            self.next_offset,
618            ts_date(self.timestamp),
619            self.data_len,
620        )
621    }
622}
623
624pub fn read_pool_head(path: &PathBuf) -> Result<PoolHead, StoreError> {
625    match path.extension() {
626        Some(ext) => {
627            if !ext.to_str().unwrap().eq("pl") {
628                return Err(StoreError::CliError("not pool file".to_string()));
629            }
630        }
631        None => return Err(StoreError::CliError("not pool file".to_string())),
632    };
633
634    let mut fd = File::open(path)?;
635    let mut lock = libc::flock {
636        l_type: libc::F_RDLCK as _,
637        l_whence: libc::SEEK_SET as i16,
638        l_start: 0,
639        l_len: PoolHead::serialize_size() as i64,
640        l_pid: 0,
641    };
642    let result = unsafe { fcntl(fd.as_raw_fd(), F_SETLK, &mut lock) };
643    if result == -1 {
644        return Err(StoreError::LockError(
645            "read lock pool head error".to_string(),
646        ));
647    }
648
649    let pool_head = PoolHead::deserialize_from(&mut fd)?;
650
651    lock.l_type = libc::F_UNLCK as _;
652    let result = unsafe { fcntl(fd.as_raw_fd(), F_SETLK, &mut lock) };
653    if result == -1 {
654        return Err(StoreError::LockError("unlock pool head error".to_string()));
655    }
656
657    Ok(pool_head)
658}
659
660pub fn dump_pool_file(path: PathBuf) -> Result<(), StoreError> {
661    if let Ok(pool_head) = read_pool_head(&path) {
662        let actual_size = ActualSize::new(
663            pool_head.pool_size,
664            pool_head.file_size,
665            pool_head.chunk_size,
666        );
667        println!("pool file {:?}:\n{}", path, pool_head);
668        println!("actual size: {}", actual_size);
669        Ok(())
670    } else {
671        println!("open pool file: {:?} error", path);
672        Err(StoreError::CliError("open pool file error".to_string()))
673    }
674}
675
676pub fn dump_data_file(da_path: PathBuf) -> Result<(), StoreError> {
677    match da_path.extension() {
678        Some(ext) => {
679            if !ext.to_str().unwrap().eq("da") {
680                return Err(StoreError::CliError("not data file".to_string()));
681            }
682        }
683        None => return Err(StoreError::CliError("not data file".to_string())),
684    };
685
686    let file_stem = da_path
687        .file_stem()
688        .ok_or(StoreError::CliError("filename error".to_string()))?;
689    let file_stem_str = file_stem.to_string_lossy();
690    let file_id = file_stem_str
691        .parse::<u32>()
692        .map_err(|_| StoreError::CliError("filename error".to_string()))?;
693
694    let pool_path = da_path.parent();
695    if pool_path.is_none() {
696        println!("can not find parent path");
697        return Err(StoreError::ReadError(
698            "can not find parent path".to_string(),
699        ));
700    }
701    let pool_path = pool_path.unwrap();
702    let pool_file_path = pool_path.join("pool.pl");
703    let pool_head = read_pool_head(&pool_file_path)?;
704    let actual_size = ActualSize::new(
705        pool_head.pool_size,
706        pool_head.file_size,
707        pool_head.chunk_size,
708    );
709    println!("pool file {:?}:\n{}", pool_file_path, pool_head);
710    println!("actual size: {}", actual_size);
711
712    let data_file = match OpenOptions::new()
713        .read(true)
714        .write(false)
715        .create(false)
716        .truncate(false)
717        .open(da_path)
718    {
719        Ok(file_fd) => file_fd,
720        Err(e) => {
721            return Err(StoreError::CliError(format!("open file error: {}", e)));
722        }
723    };
724
725    for chunk in 0..actual_size.file_chunk_num {
726        let offset = chunk * pool_head.chunk_size;
727        if let Ok(mmap) = get_rlk_chunk(&data_file, offset, pool_head.chunk_size as usize) {
728            let chunk_id = chunk + file_id * actual_size.file_chunk_num;
729            dump_chunk_head(chunk_id, &mmap, &pool_head)?;
730            free_rlk_chunk(&data_file, offset, pool_head.chunk_size as usize)?;
731        } else {
732            break;
733        }
734    }
735    Ok(())
736}
737
738pub fn get_rlk_chunk(fd: &File, offset: u32, len: usize) -> Result<Mmap, StoreError> {
739    let mut lock = libc::flock {
740        l_type: libc::F_RDLCK as _,
741        l_whence: libc::SEEK_SET as i16,
742        l_start: offset as i64,
743        l_len: len as i64,
744        l_pid: 0,
745    };
746    let result = unsafe { fcntl(fd.as_raw_fd(), F_SETLK, &mut lock) };
747    if result == -1 {
748        return Err(StoreError::LockError("read lock chunk error".to_string()));
749    }
750
751    let mmap = unsafe { MmapOptions::new().offset(offset as u64).len(len).map(fd)? };
752    Ok(mmap)
753}
754
755pub fn free_rlk_chunk(fd: &File, offset: u32, len: usize) -> Result<(), StoreError> {
756    let mut lock = libc::flock {
757        l_type: libc::F_UNLCK as _,
758        l_whence: libc::SEEK_SET as i16,
759        l_start: offset as i64,
760        l_len: len as i64,
761        l_pid: 0,
762    };
763    let result = unsafe { fcntl(fd.as_raw_fd(), F_SETLK, &mut lock) };
764    if result == -1 {
765        return Err(StoreError::LockError("unlock chunk error".to_string()));
766    }
767    Ok(())
768}
769
770pub fn dump_chunk(
771    chunk_pool_path: PathBuf,
772    chunk_id: u32,
773    pcap_file: Option<PathBuf>,
774) -> Result<(), StoreError> {
775    let pool_file_path = chunk_pool_path.join("pool.pl");
776    let pool_head = read_pool_head(&pool_file_path)?;
777    let actual_size = ActualSize::new(
778        pool_head.pool_size,
779        pool_head.file_size,
780        pool_head.chunk_size,
781    );
782    println!("pool head: {}", pool_head);
783    println!("actual size: {}", actual_size);
784
785    let data_file_id = chunk_id / actual_size.file_chunk_num;
786    let data_file_path = chunk_pool_path.join(format!("{:03}.da", data_file_id));
787    let data_file = match OpenOptions::new()
788        .read(true)
789        .write(false)
790        .create(false)
791        .truncate(false)
792        .open(data_file_path)
793    {
794        Ok(file_fd) => file_fd,
795        Err(e) => {
796            return Err(StoreError::CliError(format!("open data file error: {}", e)));
797        }
798    };
799    let inner_chunk_id = chunk_id - data_file_id * actual_size.file_chunk_num;
800    let offset = inner_chunk_id * pool_head.chunk_size;
801    if let Ok(mmap) = get_rlk_chunk(&data_file, offset, pool_head.chunk_size as usize) {
802        if let Some(file) = pcap_file {
803            dump_chunk_pcap(chunk_id, &mmap, &pool_head, file)?;
804        } else {
805            dump_chunk_info(chunk_id, &mmap, &pool_head)?;
806        }
807        free_rlk_chunk(&data_file, offset, pool_head.chunk_size as usize)?;
808    }
809    Ok(())
810}
811
812fn dump_chunk_head(id: u32, chunk: &[u8], pool_head: &PoolHead) -> Result<(), StoreError> {
813    let mut cursor = Cursor::new(chunk);
814    let head = ChunkHead::deserialize_from(&mut cursor)?;
815    println!(
816        "id: {:04}, {}, remain size: {} B",
817        id,
818        head,
819        pool_head.chunk_size - head.filled_size
820    );
821    Ok(())
822}
823
824fn dump_chunk_info(id: u32, chunk: &[u8], pool_head: &PoolHead) -> Result<(), StoreError> {
825    let mut cursor = Cursor::new(chunk);
826    let head = ChunkHead::deserialize_from(&mut cursor)?;
827    println!(
828        "id: {:04}, {}, remain size: {} B",
829        id,
830        head,
831        pool_head.chunk_size - head.filled_size
832    );
833
834    println!("in chunk packet:");
835    if head.filled_size > ChunkHead::serialize_size() as u32 {
836        while cursor.position() < head.filled_size.into() {
837            let store_pkt = StorePacket::deserialize_from(&mut cursor)?;
838            println!("{}", store_pkt);
839        }
840    } else {
841        println!("no packet\n");
842    }
843    Ok(())
844}
845
846fn dump_chunk_pcap(
847    id: u32,
848    chunk: &[u8],
849    pool_head: &PoolHead,
850    pcap_file: PathBuf,
851) -> Result<(), StoreError> {
852    let mut cursor = Cursor::new(chunk);
853    let head = ChunkHead::deserialize_from(&mut cursor)?;
854    println!(
855        "chunk id: {:04}, {}, remain size: {} B",
856        id,
857        head,
858        pool_head.chunk_size - head.filled_size
859    );
860
861    println!("dump packet to pcap: {:?}", pcap_file);
862    if head.filled_size > ChunkHead::serialize_size() as u32 {
863        let capture = PcapCapture::dead(Linktype::ETHERNET);
864        if capture.is_err() {
865            return Err(StoreError::WriteError("pcap open error".to_string()));
866        }
867        let capture = capture.unwrap();
868        let mut savefile = capture.savefile(pcap_file).unwrap();
869        let mut pkt_num: u32 = 0;
870
871        while cursor.position() < head.filled_size.into() {
872            let store_pkt = StorePacket::deserialize_from(&mut cursor)?;
873            println!("save pkt: {}", store_pkt);
874
875            let header = CapPacketHeader {
876                ts: ts_timeval(store_pkt.timestamp),
877                caplen: store_pkt.data_len as u32,
878                len: store_pkt.data_len as u32,
879            };
880            let cap_pkt = CapPacket {
881                header: &header,
882                data: &store_pkt.data,
883            };
884            savefile.write(&cap_pkt);
885            pkt_num += 1;
886        }
887        let _ = savefile.flush();
888        println!("save packet num: {}", pkt_num);
889    } else {
890        println!("no packet\n");
891    }
892    Ok(())
893}