1use crate::common::*;
2use crate::packet::*;
3use chrono::{DateTime, Datelike, Local, Timelike};
4use libc::{fcntl, F_SETLK, F_SETLKW};
5use memmap2::{Mmap, MmapMut, MmapOptions};
6use pcap::Capture as PcapCapture;
7use pcap::Linktype;
8use pcap::Packet as CapPacket;
9use pcap::PacketHeader as CapPacketHeader;
10use std::fmt;
11use std::fs::{self, File, OpenOptions};
12use std::io::{Cursor, Read, Write};
13use std::os::fd::AsRawFd;
14use std::{cell::RefCell, path::PathBuf, sync::Arc};
15
16#[derive(Debug)]
17pub struct ChunkPool {
18 pool_path: PathBuf,
19 pool_size: u64,
20 file_size: u64,
21 chunk_size: u32,
22
23 actual_file_size: u64,
24 actual_file_num: u32,
25 file_chunk_num: u32,
26 chunk_num: u32,
27
28 pool_head_fd: RefCell<Option<File>>,
29 pool_head_map: RefCell<Option<MmapMut>>,
30 pool_head: RefCell<Option<PoolHead>>,
31
32 chunk_file_id: RefCell<Option<u32>>,
33 chunk_file_fd: RefCell<Option<File>>,
34 chunk_offset: RefCell<u32>,
35 chunk_map: RefCell<Option<MmapMut>>,
36 chunk_head: RefCell<Option<ChunkHead>>,
37
38 last_cover_minute: RefCell<DateTime<Local>>,
39}
40
41impl ChunkPool {
42 pub fn new(store_dir: PathBuf, pool_size: u64, file_size: u64, chunk_size: u32) -> Self {
43 let mut path = PathBuf::new();
44 path.push(store_dir);
45 path.push("chunk_pool");
46 let actual_size = ActualSize::new(pool_size, file_size, chunk_size);
47 ChunkPool {
48 pool_path: path,
49 pool_size,
50 file_size,
51 chunk_size,
52
53 actual_file_size: actual_size.actual_file_size,
54 actual_file_num: actual_size.actual_file_num,
55 file_chunk_num: actual_size.file_chunk_num,
56 chunk_num: actual_size.chunk_num,
57
58 pool_head_fd: RefCell::new(None),
59 pool_head_map: RefCell::new(None),
60 pool_head: RefCell::new(None),
61
62 chunk_file_id: RefCell::new(None),
63 chunk_file_fd: RefCell::new(None),
64 chunk_offset: RefCell::new(0),
65 chunk_map: RefCell::new(None),
66 chunk_head: RefCell::new(None),
67
68 last_cover_minute: RefCell::new(ts_date(0)),
69 }
70 }
71
72 pub fn init(&self) -> Result<(), StoreError> {
73 if !self.pool_path.exists() && fs::create_dir_all(&self.pool_path).is_err() {
74 return Err(StoreError::InitError(
75 "chunk pool create dir error".to_string(),
76 ));
77 }
78 let pool_file_path = self.pool_path.join("pool.pl");
79 if !pool_file_path.exists() {
80 self.create_pool_file(&pool_file_path)?;
81 self.create_chunk_file()?;
82 }
83
84 let result = OpenOptions::new()
85 .read(true)
86 .write(true)
87 .create(false)
88 .truncate(false)
89 .open(pool_file_path);
90 match result {
91 Ok(pool_file_fd) => {
92 *self.pool_head_fd.borrow_mut() = Some(pool_file_fd);
93 }
94 Err(e) => return Err(StoreError::IoError(e)),
95 }
96
97 let mmap = unsafe {
98 MmapOptions::new()
99 .offset(0)
100 .len(PoolHead::serialize_size())
101 .map_mut(self.pool_head_fd.borrow_mut().as_mut().unwrap().as_raw_fd())?
102 };
103 *self.pool_head_map.borrow_mut() = Some(mmap);
104
105 let pool_head_map = self.pool_head_map.borrow_mut();
106 let mut cursor = Cursor::new(pool_head_map.as_ref().unwrap());
107 let pool_file = PoolHead::deserialize_from(&mut cursor).unwrap();
108 *self.pool_head.borrow_mut() = Some(pool_file);
109 self.check_conf()?;
110
111 self.next_chunk(|_, _| {})?;
112 Ok(())
113 }
114
115 fn check_conf(&self) -> Result<(), StoreError> {
116 if self.pool_size == self.pool_head.borrow().as_ref().unwrap().pool_size
117 && self.file_size == self.pool_head.borrow().as_ref().unwrap().file_size
118 && self.chunk_size == self.pool_head.borrow().as_ref().unwrap().chunk_size
119 {
120 Ok(())
121 } else {
122 Err(StoreError::InitError("conf size error".to_string()))
123 }
124 }
125
126 pub fn write<F>(
127 &self,
128 pkt: Arc<Packet>,
129 now: u128,
130 cover_chunk_fn: F,
131 ) -> Result<ChunkOffset, StoreError>
132 where
133 F: Fn(PathBuf, u128),
134 {
135 if self.chunk_head.borrow().as_ref().unwrap().start_time == 0 {
136 self.chunk_head.borrow_mut().as_mut().unwrap().start_time = now;
137 }
138
139 if self.chunk_size - self.chunk_head.borrow().as_ref().unwrap().filled_size
140 < pkt.serialize_size()
141 {
142 self.flush()?;
143 self.next_chunk(cover_chunk_fn)?;
144 }
145
146 let pkt_start = self.chunk_head.borrow().as_ref().unwrap().filled_size;
147 let mut chunk_map = self.chunk_map.borrow_mut();
148 let chunk_u8: &mut [u8] = chunk_map.as_mut().unwrap();
149 let mut chunk_offset = &mut chunk_u8[pkt_start as usize..];
150
151 pkt.serialize_into(&mut chunk_offset)?;
152 self.chunk_head.borrow_mut().as_mut().unwrap().filled_size += pkt.serialize_size();
153 self.chunk_head.borrow_mut().as_mut().unwrap().end_time = now;
154
155 let mut chunk_id = self.pool_head.borrow().as_ref().unwrap().next_chunk_id;
156 if chunk_id != 0 {
157 chunk_id -= 1;
158 } else {
159 chunk_id = self.chunk_num - 1;
160 }
161
162 Ok(ChunkOffset {
163 chunk_id,
164 start_offset: pkt_start,
165 })
166 }
167
168 pub fn update(&self, offset: &ChunkOffset, value: &ChunkOffset) -> Result<(), StoreError> {
169 let offset = offset.start_offset;
170 let value = value.start_offset;
171
172 let mut chunk_map = self.chunk_map.borrow_mut();
173 let chunk_u8: &mut [u8] = chunk_map.as_mut().unwrap();
174 let mut chunk_offset = &mut chunk_u8[offset as usize..];
175 self.serialize_update(&mut chunk_offset, value)?;
176 Ok(())
177 }
178
179 fn serialize_update<W: Write>(&self, writer: &mut W, value: u32) -> Result<(), StoreError> {
180 writer.write_all(&value.to_le_bytes())?;
181 Ok(())
182 }
183
184 fn create_pool_file(&self, file_path: &PathBuf) -> Result<(), StoreError> {
185 let result = OpenOptions::new()
186 .read(false)
187 .write(true)
188 .create(true)
189 .truncate(true)
190 .open(file_path);
191 match result {
192 Ok(mut pool_file_fd) => {
193 let pool_file = PoolHead {
194 pool_size: self.pool_size,
195 file_size: self.file_size,
196 chunk_size: self.chunk_size,
197 next_chunk_id: 0,
198 };
199 pool_file.serialize_into(&mut pool_file_fd)?;
200 pool_file_fd.flush()?;
201 }
202 Err(e) => return Err(StoreError::IoError(e)),
203 }
204 Ok(())
205 }
206
207 fn create_chunk_file(&self) -> Result<(), StoreError> {
208 for i in 0..self.actual_file_num {
209 let path = self.pool_path.join(format!("{:03}.da", i));
210 let data_file = File::create(path)?;
211 data_file.set_len(self.actual_file_size)?;
212 }
213 Ok(())
214 }
215
216 fn next_chunk<F>(&self, cover_chunk_fn: F) -> Result<(), StoreError>
217 where
218 F: Fn(PathBuf, u128),
219 {
220 let chunk_id = self.pool_head.borrow().as_ref().unwrap().next_chunk_id;
221 let file_id = chunk_id / self.file_chunk_num;
222 if self.chunk_file_id.borrow().is_none() || self.chunk_file_id.borrow().unwrap() != file_id
223 {
224 let path = self.pool_path.join(format!("{:03}.da", file_id));
225 let result = OpenOptions::new()
226 .read(true)
227 .write(true)
228 .create(false)
229 .truncate(false)
230 .open(path);
231 match result {
232 Ok(file_fd) => {
233 *self.chunk_file_id.borrow_mut() = Some(file_id);
234 *self.chunk_file_fd.borrow_mut() = Some(file_fd);
235 }
236 Err(e) => {
237 return Err(StoreError::IoError(e));
238 }
239 }
240 }
241
242 let inner_chunk_id = chunk_id - file_id * self.file_chunk_num;
243 let chunk_offset = inner_chunk_id * self.chunk_size;
244 let mmap = unsafe {
245 MmapOptions::new()
246 .offset(chunk_offset.into())
247 .len(self.chunk_size as usize)
248 .map_mut(
249 self.chunk_file_fd
250 .borrow_mut()
251 .as_mut()
252 .unwrap()
253 .as_raw_fd(),
254 )?
255 };
256 *self.chunk_offset.borrow_mut() = chunk_offset;
257 *self.chunk_map.borrow_mut() = Some(mmap);
258
259 let chunk_map = self.chunk_map.borrow_mut();
260 let mut cursor = Cursor::new(chunk_map.as_ref().unwrap());
261 let old_chunk_head = ChunkHead::deserialize_from(&mut cursor)?;
262 let cover_minute = ts_date(old_chunk_head.end_time);
263 let last_minute = *self.last_cover_minute.borrow();
264 if !(cover_minute.year() == last_minute.year()
265 && cover_minute.month() == last_minute.month()
266 && cover_minute.day() == last_minute.day()
267 && cover_minute.hour() == last_minute.hour()
268 && cover_minute.minute() == last_minute.minute())
269 {
270 *self.last_cover_minute.borrow_mut() = cover_minute;
271 cover_chunk_fn(self.pool_path.clone(), old_chunk_head.end_time);
272 }
273
274 *self.chunk_head.borrow_mut() = Some(ChunkHead::new());
275 self.pool_head.borrow_mut().as_mut().unwrap().next_chunk_id += 1;
276 if self.pool_head.borrow().as_ref().unwrap().next_chunk_id >= self.chunk_num {
277 self.pool_head.borrow_mut().as_mut().unwrap().next_chunk_id = 0;
278 }
279 Ok(())
280 }
281
282 pub fn flush(&self) -> Result<(), StoreError> {
283 self.wlock_chunk()?;
284 {
285 let mut chunk_head_map = self.chunk_map.borrow_mut();
286 let mut chunk_head_map_u8: &mut [u8] = chunk_head_map.as_mut().unwrap();
287 self.chunk_head
288 .borrow()
289 .as_ref()
290 .unwrap()
291 .serialize_into(&mut chunk_head_map_u8)?;
292 }
293 self.chunk_map.borrow().as_ref().unwrap().flush()?;
294 self.unlock_chunk()?;
295
296 self.wlock_pool_head()?;
297 {
298 let mut pool_head_map = self.pool_head_map.borrow_mut();
299 let mut pool_head_map_u8: &mut [u8] = pool_head_map.as_mut().unwrap();
300 self.pool_head
301 .borrow()
302 .as_ref()
303 .unwrap()
304 .serialize_into(&mut pool_head_map_u8)?;
305 }
306 self.pool_head_map.borrow().as_ref().unwrap().flush()?;
307 self.unlock_pool_head()?;
308 Ok(())
309 }
310
311 fn wlock_pool_head(&self) -> Result<(), StoreError> {
312 let mut lock = libc::flock {
313 l_type: libc::F_WRLCK as _,
314 l_whence: libc::SEEK_SET as i16,
315 l_start: 0,
316 l_len: PoolHead::serialize_size() as i64,
317 l_pid: 0,
318 };
319 let result = unsafe {
320 fcntl(
321 self.pool_head_fd.borrow().as_ref().unwrap().as_raw_fd(),
322 F_SETLK,
323 &mut lock,
324 )
325 };
326 if result == -1 {
327 return Err(StoreError::LockError("lock_pool_head error".to_string()));
328 }
329 Ok(())
330 }
331
332 fn unlock_pool_head(&self) -> Result<(), StoreError> {
333 let mut lock = libc::flock {
334 l_type: libc::F_UNLCK as _,
335 l_whence: libc::SEEK_SET as i16,
336 l_start: 0,
337 l_len: PoolHead::serialize_size() as i64,
338 l_pid: 0,
339 };
340 let result = unsafe {
341 fcntl(
342 self.pool_head_fd.borrow().as_ref().unwrap().as_raw_fd(),
343 F_SETLKW,
344 &mut lock,
345 )
346 };
347 if result == -1 {
348 return Err(StoreError::LockError("unlock_pool_head error".to_string()));
349 }
350 Ok(())
351 }
352
353 fn wlock_chunk(&self) -> Result<(), StoreError> {
354 let mut lock = libc::flock {
355 l_type: libc::F_WRLCK as _,
356 l_whence: libc::SEEK_SET as i16,
357 l_start: *self.chunk_offset.borrow() as i64,
358 l_len: self.chunk_size as i64,
359 l_pid: 0,
360 };
361 let result = unsafe {
362 fcntl(
363 self.chunk_file_fd.borrow().as_ref().unwrap().as_raw_fd(),
364 F_SETLKW,
365 &mut lock,
366 )
367 };
368 if result == -1 {
369 return Err(StoreError::LockError("lock_chunk error".to_string()));
370 }
371 Ok(())
372 }
373
374 fn unlock_chunk(&self) -> Result<(), StoreError> {
375 let mut lock = libc::flock {
376 l_type: libc::F_UNLCK as _,
377 l_whence: libc::SEEK_SET as i16,
378 l_start: *self.chunk_offset.borrow() as i64,
379 l_len: self.chunk_size as i64,
380 l_pid: 0,
381 };
382 let result = unsafe {
383 fcntl(
384 self.chunk_file_fd.borrow().as_ref().unwrap().as_raw_fd(),
385 F_SETLK,
386 &mut lock,
387 )
388 };
389 if result == -1 {
390 return Err(StoreError::LockError("unlock_chunk error".to_string()));
391 }
392 Ok(())
393 }
394
395 pub fn finish(&self) {
396 let _ = self.flush();
397 }
398}
399
400impl Drop for ChunkPool {
401 fn drop(&mut self) {
402 let _ = self.flush();
403 }
404}
405
/// Pool geometry derived from the configured pool, file and chunk sizes.
#[derive(Debug, Clone, Copy)]
pub struct ActualSize {
    /// Data file size in bytes, rounded up to a whole number of chunks.
    pub actual_file_size: u64,
    /// Number of data files in the pool.
    pub actual_file_num: u32,
    /// Number of chunks per data file.
    pub file_chunk_num: u32,
    /// Total number of chunks (`actual_file_num * file_chunk_num`).
    pub chunk_num: u32,
}
413
414impl ActualSize {
415 pub fn new(pool_size: u64, file_size: u64, chunk_size: u32) -> Self {
416 let actual_file_size = ((file_size - 1) / (chunk_size as u64) + 1) * (chunk_size as u64);
417 let actual_file_num = ((pool_size + actual_file_size - 1) / actual_file_size) as u32;
418 let file_chunk_num = (actual_file_size / (chunk_size as u64)) as u32;
419 let chunk_num = actual_file_num * file_chunk_num;
420 ActualSize {
421 actual_file_size,
422 actual_file_num,
423 file_chunk_num,
424 chunk_num,
425 }
426 }
427}
428
429impl fmt::Display for ActualSize {
430 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
431 write!(
432 f,
433 "ActualSize {{ actual_file_size: {} M, file_num: {}, file_chunk_num: {}, chunk_num: {} }}",
434 self.actual_file_size / 1024 / 1024,
435 self.actual_file_num,
436 self.file_chunk_num,
437 self.chunk_num
438 )
439 }
440}
441
/// Header persisted at the start of the pool file (`pool.pl`).
#[derive(Debug, Clone, Copy)]
pub struct PoolHead {
    /// Configured total pool size in bytes.
    pub pool_size: u64,
    /// Configured data file size in bytes.
    pub file_size: u64,
    /// Configured chunk size in bytes.
    pub chunk_size: u32,
    /// Id of the chunk that will be handed out next.
    pub next_chunk_id: u32,
}
449
450impl PoolHead {
451 pub fn serialize_into<W: Write>(&self, writer: &mut W) -> Result<(), StoreError> {
452 writer.write_all(&self.pool_size.to_le_bytes())?;
453 writer.write_all(&self.file_size.to_le_bytes())?;
454 writer.write_all(&self.chunk_size.to_le_bytes())?;
455 writer.write_all(&self.next_chunk_id.to_le_bytes())?;
456 Ok(())
457 }
458
459 pub fn deserialize_from<R: Read>(reader: &mut R) -> Result<Self, StoreError> {
460 let mut pool_size_bytes = [0; 8];
461 let mut file_size_bytes = [0; 8];
462 let mut chunk_size_bytes = [0; 4];
463 let mut current_chunk_bytes = [0; 4];
464
465 reader.read_exact(&mut pool_size_bytes)?;
466 reader.read_exact(&mut file_size_bytes)?;
467 reader.read_exact(&mut chunk_size_bytes)?;
468 reader.read_exact(&mut current_chunk_bytes)?;
469
470 Ok(PoolHead {
471 pool_size: u64::from_le_bytes(pool_size_bytes),
472 file_size: u64::from_le_bytes(file_size_bytes),
473 chunk_size: u32::from_le_bytes(chunk_size_bytes),
474 next_chunk_id: u32::from_le_bytes(current_chunk_bytes),
475 })
476 }
477
478 pub fn serialize_size() -> usize {
479 24
480 }
481}
482
483impl fmt::Display for PoolHead {
484 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
485 write!(
486 f,
487 "PoolHead {{ pool_size: {} M, file_size: {} M, chunk_size: {} K, next_chunk_id: {} }}",
488 self.pool_size / 1024 / 1024,
489 self.file_size / 1024 / 1024,
490 self.chunk_size / 1024,
491 self.next_chunk_id
492 )
493 }
494}
495
/// Header stored at the start of every chunk.
#[derive(Debug)]
pub struct ChunkHead {
    /// Time of the first packet written to the chunk (0 while unset).
    pub start_time: u128,
    /// Time of the most recent packet written to the chunk.
    pub end_time: u128,
    /// Bytes of the chunk in use, counting this header.
    pub filled_size: u32,
}
502
503impl ChunkHead {
504 pub fn new() -> Self {
505 ChunkHead {
506 start_time: 0,
507 end_time: 0,
508 filled_size: Self::serialize_size() as u32,
509 }
510 }
511
512 pub fn serialize_into<W: Write>(&self, writer: &mut W) -> Result<(), StoreError> {
513 writer.write_all(&self.start_time.to_le_bytes())?;
514 writer.write_all(&self.end_time.to_le_bytes())?;
515 writer.write_all(&self.filled_size.to_le_bytes())?;
516 Ok(())
517 }
518
519 pub fn deserialize_from<R: Read>(reader: &mut R) -> Result<Self, StoreError> {
520 let mut start_time_bytes = [0; 16];
521 let mut end_time_bytes = [0; 16];
522 let mut data_size_bytes = [0; 4];
523
524 reader.read_exact(&mut start_time_bytes)?;
525 reader.read_exact(&mut end_time_bytes)?;
526 reader.read_exact(&mut data_size_bytes)?;
527
528 Ok(ChunkHead {
529 start_time: u128::from_le_bytes(start_time_bytes),
530 end_time: u128::from_le_bytes(end_time_bytes),
531 filled_size: u32::from_le_bytes(data_size_bytes),
532 })
533 }
534
535 pub fn serialize_size() -> usize {
536 36
537 }
538}
539
540impl Default for ChunkHead {
541 fn default() -> Self {
542 Self::new()
543 }
544}
545
546impl fmt::Display for ChunkHead {
547 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
548 write!(
549 f,
550 "ChunkHead {{ start_time: {}, end_time: {}, filled_size: {} K }}",
551 ts_date(self.start_time),
552 ts_date(self.end_time),
553 self.filled_size / 1024,
554 )
555 }
556}
557
/// Location of a stored packet: a chunk id plus a byte offset inside that chunk.
///
/// The derived ordering compares `chunk_id` first, then `start_offset`.
#[derive(Debug, Eq, PartialEq, PartialOrd, Ord, Clone, Copy)]
pub struct ChunkOffset {
    /// Id of the chunk holding the packet.
    pub chunk_id: u32,
    /// Byte offset of the packet from the start of the chunk.
    pub start_offset: u32,
}
563
564impl ChunkOffset {
565 pub fn new() -> Self {
566 ChunkOffset {
567 chunk_id: 0,
568 start_offset: 0,
569 }
570 }
571}
572
573impl Default for ChunkOffset {
574 fn default() -> Self {
575 Self::new()
576 }
577}
578
/// A packet record as it is laid out inside a chunk.
#[derive(Debug)]
pub struct StorePacket {
    /// First four bytes of the record.
    /// NOTE(review): presumably a link to a following record's offset; confirm against the writer.
    pub next_offset: u32,
    /// Timestamp stored with the packet.
    pub timestamp: u128,
    /// Length of `data` in bytes.
    pub data_len: u16,
    /// Raw packet bytes.
    pub data: Vec<u8>,
}
586
587impl StorePacket {
588 pub fn deserialize_from<R: Read>(reader: &mut R) -> Result<Self, StoreError> {
589 let mut next_offset_bytes = [0; 4];
590 let mut timestamp_bytes = [0; 16];
591 let mut data_len_bytes = [0; 2];
592
593 reader.read_exact(&mut next_offset_bytes)?;
594 reader.read_exact(&mut timestamp_bytes)?;
595 reader.read_exact(&mut data_len_bytes)?;
596
597 let next_offset = u32::from_le_bytes(next_offset_bytes);
598 let timestamp = u128::from_le_bytes(timestamp_bytes);
599 let data_len = u16::from_le_bytes(data_len_bytes);
600 let mut data = vec![0; data_len.into()];
601 reader.read_exact(&mut data)?;
602
603 Ok(StorePacket {
604 next_offset,
605 timestamp,
606 data_len,
607 data,
608 })
609 }
610}
611
612impl fmt::Display for StorePacket {
613 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
614 write!(
615 f,
616 "StorePacket {{ next_offset: {}, timestamp: {}, data_len: {} }}",
617 self.next_offset,
618 ts_date(self.timestamp),
619 self.data_len,
620 )
621 }
622}
623
624pub fn read_pool_head(path: &PathBuf) -> Result<PoolHead, StoreError> {
625 match path.extension() {
626 Some(ext) => {
627 if !ext.to_str().unwrap().eq("pl") {
628 return Err(StoreError::CliError("not pool file".to_string()));
629 }
630 }
631 None => return Err(StoreError::CliError("not pool file".to_string())),
632 };
633
634 let mut fd = File::open(path)?;
635 let mut lock = libc::flock {
636 l_type: libc::F_RDLCK as _,
637 l_whence: libc::SEEK_SET as i16,
638 l_start: 0,
639 l_len: PoolHead::serialize_size() as i64,
640 l_pid: 0,
641 };
642 let result = unsafe { fcntl(fd.as_raw_fd(), F_SETLK, &mut lock) };
643 if result == -1 {
644 return Err(StoreError::LockError(
645 "read lock pool head error".to_string(),
646 ));
647 }
648
649 let pool_head = PoolHead::deserialize_from(&mut fd)?;
650
651 lock.l_type = libc::F_UNLCK as _;
652 let result = unsafe { fcntl(fd.as_raw_fd(), F_SETLK, &mut lock) };
653 if result == -1 {
654 return Err(StoreError::LockError("unlock pool head error".to_string()));
655 }
656
657 Ok(pool_head)
658}
659
660pub fn dump_pool_file(path: PathBuf) -> Result<(), StoreError> {
661 if let Ok(pool_head) = read_pool_head(&path) {
662 let actual_size = ActualSize::new(
663 pool_head.pool_size,
664 pool_head.file_size,
665 pool_head.chunk_size,
666 );
667 println!("pool file {:?}:\n{}", path, pool_head);
668 println!("actual size: {}", actual_size);
669 Ok(())
670 } else {
671 println!("open pool file: {:?} error", path);
672 Err(StoreError::CliError("open pool file error".to_string()))
673 }
674}
675
676pub fn dump_data_file(da_path: PathBuf) -> Result<(), StoreError> {
677 match da_path.extension() {
678 Some(ext) => {
679 if !ext.to_str().unwrap().eq("da") {
680 return Err(StoreError::CliError("not data file".to_string()));
681 }
682 }
683 None => return Err(StoreError::CliError("not data file".to_string())),
684 };
685
686 let file_stem = da_path
687 .file_stem()
688 .ok_or(StoreError::CliError("filename error".to_string()))?;
689 let file_stem_str = file_stem.to_string_lossy();
690 let file_id = file_stem_str
691 .parse::<u32>()
692 .map_err(|_| StoreError::CliError("filename error".to_string()))?;
693
694 let pool_path = da_path.parent();
695 if pool_path.is_none() {
696 println!("can not find parent path");
697 return Err(StoreError::ReadError(
698 "can not find parent path".to_string(),
699 ));
700 }
701 let pool_path = pool_path.unwrap();
702 let pool_file_path = pool_path.join("pool.pl");
703 let pool_head = read_pool_head(&pool_file_path)?;
704 let actual_size = ActualSize::new(
705 pool_head.pool_size,
706 pool_head.file_size,
707 pool_head.chunk_size,
708 );
709 println!("pool file {:?}:\n{}", pool_file_path, pool_head);
710 println!("actual size: {}", actual_size);
711
712 let data_file = match OpenOptions::new()
713 .read(true)
714 .write(false)
715 .create(false)
716 .truncate(false)
717 .open(da_path)
718 {
719 Ok(file_fd) => file_fd,
720 Err(e) => {
721 return Err(StoreError::CliError(format!("open file error: {}", e)));
722 }
723 };
724
725 for chunk in 0..actual_size.file_chunk_num {
726 let offset = chunk * pool_head.chunk_size;
727 if let Ok(mmap) = get_rlk_chunk(&data_file, offset, pool_head.chunk_size as usize) {
728 let chunk_id = chunk + file_id * actual_size.file_chunk_num;
729 dump_chunk_head(chunk_id, &mmap, &pool_head)?;
730 free_rlk_chunk(&data_file, offset, pool_head.chunk_size as usize)?;
731 } else {
732 break;
733 }
734 }
735 Ok(())
736}
737
738pub fn get_rlk_chunk(fd: &File, offset: u32, len: usize) -> Result<Mmap, StoreError> {
739 let mut lock = libc::flock {
740 l_type: libc::F_RDLCK as _,
741 l_whence: libc::SEEK_SET as i16,
742 l_start: offset as i64,
743 l_len: len as i64,
744 l_pid: 0,
745 };
746 let result = unsafe { fcntl(fd.as_raw_fd(), F_SETLK, &mut lock) };
747 if result == -1 {
748 return Err(StoreError::LockError("read lock chunk error".to_string()));
749 }
750
751 let mmap = unsafe { MmapOptions::new().offset(offset as u64).len(len).map(fd)? };
752 Ok(mmap)
753}
754
755pub fn free_rlk_chunk(fd: &File, offset: u32, len: usize) -> Result<(), StoreError> {
756 let mut lock = libc::flock {
757 l_type: libc::F_UNLCK as _,
758 l_whence: libc::SEEK_SET as i16,
759 l_start: offset as i64,
760 l_len: len as i64,
761 l_pid: 0,
762 };
763 let result = unsafe { fcntl(fd.as_raw_fd(), F_SETLK, &mut lock) };
764 if result == -1 {
765 return Err(StoreError::LockError("unlock chunk error".to_string()));
766 }
767 Ok(())
768}
769
770pub fn dump_chunk(
771 chunk_pool_path: PathBuf,
772 chunk_id: u32,
773 pcap_file: Option<PathBuf>,
774) -> Result<(), StoreError> {
775 let pool_file_path = chunk_pool_path.join("pool.pl");
776 let pool_head = read_pool_head(&pool_file_path)?;
777 let actual_size = ActualSize::new(
778 pool_head.pool_size,
779 pool_head.file_size,
780 pool_head.chunk_size,
781 );
782 println!("pool head: {}", pool_head);
783 println!("actual size: {}", actual_size);
784
785 let data_file_id = chunk_id / actual_size.file_chunk_num;
786 let data_file_path = chunk_pool_path.join(format!("{:03}.da", data_file_id));
787 let data_file = match OpenOptions::new()
788 .read(true)
789 .write(false)
790 .create(false)
791 .truncate(false)
792 .open(data_file_path)
793 {
794 Ok(file_fd) => file_fd,
795 Err(e) => {
796 return Err(StoreError::CliError(format!("open data file error: {}", e)));
797 }
798 };
799 let inner_chunk_id = chunk_id - data_file_id * actual_size.file_chunk_num;
800 let offset = inner_chunk_id * pool_head.chunk_size;
801 if let Ok(mmap) = get_rlk_chunk(&data_file, offset, pool_head.chunk_size as usize) {
802 if let Some(file) = pcap_file {
803 dump_chunk_pcap(chunk_id, &mmap, &pool_head, file)?;
804 } else {
805 dump_chunk_info(chunk_id, &mmap, &pool_head)?;
806 }
807 free_rlk_chunk(&data_file, offset, pool_head.chunk_size as usize)?;
808 }
809 Ok(())
810}
811
812fn dump_chunk_head(id: u32, chunk: &[u8], pool_head: &PoolHead) -> Result<(), StoreError> {
813 let mut cursor = Cursor::new(chunk);
814 let head = ChunkHead::deserialize_from(&mut cursor)?;
815 println!(
816 "id: {:04}, {}, remain size: {} B",
817 id,
818 head,
819 pool_head.chunk_size - head.filled_size
820 );
821 Ok(())
822}
823
824fn dump_chunk_info(id: u32, chunk: &[u8], pool_head: &PoolHead) -> Result<(), StoreError> {
825 let mut cursor = Cursor::new(chunk);
826 let head = ChunkHead::deserialize_from(&mut cursor)?;
827 println!(
828 "id: {:04}, {}, remain size: {} B",
829 id,
830 head,
831 pool_head.chunk_size - head.filled_size
832 );
833
834 println!("in chunk packet:");
835 if head.filled_size > ChunkHead::serialize_size() as u32 {
836 while cursor.position() < head.filled_size.into() {
837 let store_pkt = StorePacket::deserialize_from(&mut cursor)?;
838 println!("{}", store_pkt);
839 }
840 } else {
841 println!("no packet\n");
842 }
843 Ok(())
844}
845
846fn dump_chunk_pcap(
847 id: u32,
848 chunk: &[u8],
849 pool_head: &PoolHead,
850 pcap_file: PathBuf,
851) -> Result<(), StoreError> {
852 let mut cursor = Cursor::new(chunk);
853 let head = ChunkHead::deserialize_from(&mut cursor)?;
854 println!(
855 "chunk id: {:04}, {}, remain size: {} B",
856 id,
857 head,
858 pool_head.chunk_size - head.filled_size
859 );
860
861 println!("dump packet to pcap: {:?}", pcap_file);
862 if head.filled_size > ChunkHead::serialize_size() as u32 {
863 let capture = PcapCapture::dead(Linktype::ETHERNET);
864 if capture.is_err() {
865 return Err(StoreError::WriteError("pcap open error".to_string()));
866 }
867 let capture = capture.unwrap();
868 let mut savefile = capture.savefile(pcap_file).unwrap();
869 let mut pkt_num: u32 = 0;
870
871 while cursor.position() < head.filled_size.into() {
872 let store_pkt = StorePacket::deserialize_from(&mut cursor)?;
873 println!("save pkt: {}", store_pkt);
874
875 let header = CapPacketHeader {
876 ts: ts_timeval(store_pkt.timestamp),
877 caplen: store_pkt.data_len as u32,
878 len: store_pkt.data_len as u32,
879 };
880 let cap_pkt = CapPacket {
881 header: &header,
882 data: &store_pkt.data,
883 };
884 savefile.write(&cap_pkt);
885 pkt_num += 1;
886 }
887 let _ = savefile.flush();
888 println!("save packet num: {}", pkt_num);
889 } else {
890 println!("no packet\n");
891 }
892 Ok(())
893}