1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
use std::io::Read;
use pcap_parser::{PcapBlockOwned, PcapError};
use crate::{config::Config, context::*, error::Error};
pub trait BlockAnalyzer {
fn init(&mut self) -> Result<(), Error> {
Ok(())
}
fn handle_block(
&mut self,
_block: &PcapBlockOwned,
_block_ctx: &ParseBlockContext,
) -> Result<(), Error>;
fn teardown(&mut self) {}
fn before_refill(&mut self) {}
}
pub struct BlockEngine<A: BlockAnalyzer> {
analyzer: A,
capacity: usize,
}
impl<A: BlockAnalyzer> BlockEngine<A> {
pub fn new(analyzer: A, config: &Config) -> Self {
let capacity = config
.get_usize("buffer_initial_capacity")
.unwrap_or(128 * 1024);
BlockEngine { analyzer, capacity }
}
pub fn analyzer(&self) -> &A {
&self.analyzer
}
pub fn analyzer_mut(&mut self) -> &mut A {
&mut self.analyzer
}
pub fn run(&mut self, reader: &mut dyn Read) -> Result<(), Error> {
let mut reader = pcap_parser::create_reader(self.capacity, reader)?;
self.analyzer.init()?;
let mut ctx = ParseBlockContext::default();
let mut last_incomplete_index = 0;
loop {
match reader.next() {
Ok((offset, block)) => {
self.analyzer.handle_block(&block, &ctx)?;
ctx.block_index += 1;
reader.consume_noshift(offset);
continue;
}
Err(PcapError::Eof) => break,
Err(PcapError::Incomplete) => {
if last_incomplete_index == ctx.block_index && reader.reader_exhausted() {
warn!(
"Could not read complete data block (block_index={})",
ctx.block_index
);
warn!(
" Buffer: consumed={} position={}",
reader.consumed(),
reader.position()
);
warn!("Hint: the reader buffer size may be too small, or the input file may be truncated.");
break;
}
last_incomplete_index = ctx.block_index;
debug!("need refill");
self.analyzer.before_refill();
reader.refill().map_err(|e| e.to_owned_vec())?;
continue;
}
Err(e) => {
let e = e.to_owned_vec();
error!("error while reading: {:?}", e);
error!(
" Buffer: consumed={} position={}",
reader.consumed(),
reader.position()
);
return Err(Error::Pcap(e));
}
}
}
self.analyzer.teardown();
Ok(())
}
}