pako_tools/
block_engine.rs

1use std::io::Read;
2
3use pcap_parser::{PcapBlockOwned, PcapError};
4
5use crate::{config::Config, context::*, error::Error};
6
7pub trait BlockAnalyzer {
8    /// Initialization function, called before reading pcap data (optional)
9    fn init(&mut self) -> Result<(), Error> {
10        Ok(())
11    }
12    /// Callback function for every block of the pcap/pcapng file
13    fn handle_block(
14        &mut self,
15        _block: &PcapBlockOwned,
16        _block_ctx: &ParseBlockContext,
17    ) -> Result<(), Error>;
18
19    /// Teardown function, called after reading pcap data (optional)
20    fn teardown(&mut self) {}
21
22    fn before_refill(&mut self) {}
23}
24
25pub struct BlockEngine<A: BlockAnalyzer> {
26    analyzer: A,
27
28    capacity: usize,
29}
30
31impl<A: BlockAnalyzer> BlockEngine<A> {
32    pub fn new(analyzer: A, config: &Config) -> Self {
33        let capacity = config
34            .get_usize("buffer_initial_capacity")
35            .unwrap_or(128 * 1024);
36        BlockEngine { analyzer, capacity }
37    }
38
39    pub fn analyzer(&self) -> &A {
40        &self.analyzer
41    }
42
43    pub fn analyzer_mut(&mut self) -> &mut A {
44        &mut self.analyzer
45    }
46
47    /// Main function: given a reader, read all pcap data and call analyzer for each Packet
48    pub fn run(&mut self, reader: &mut dyn Read) -> Result<(), Error> {
49        let mut reader = pcap_parser::create_reader(self.capacity, reader)?;
50
51        self.analyzer.init()?;
52        let mut ctx = ParseBlockContext::default();
53        let mut last_incomplete_index = 0;
54
55        loop {
56            match reader.next() {
57                Ok((offset, block)) => {
58                    self.analyzer.handle_block(&block, &ctx)?;
59                    ctx.block_index += 1;
60                    reader.consume_noshift(offset);
61                    continue;
62                }
63                Err(PcapError::Eof) => break,
64                Err(PcapError::Incomplete) => {
65                    if last_incomplete_index == ctx.block_index && reader.reader_exhausted() {
66                        warn!(
67                            "Could not read complete data block (block_index={})",
68                            ctx.block_index
69                        );
70                        warn!(
71                            "  Buffer: consumed={} position={}",
72                            reader.consumed(),
73                            reader.position()
74                        );
75                        warn!("Hint: the reader buffer size may be too small, or the input file may be truncated.");
76                        break;
77                    }
78                    last_incomplete_index = ctx.block_index;
79                    // refill the buffer
80                    debug!("need refill");
81                    self.analyzer.before_refill();
82                    reader.refill().map_err(|e| e.to_owned_vec())?;
83                    continue;
84                }
85                Err(e) => {
86                    let e = e.to_owned_vec();
87                    error!("error while reading: {:?}", e);
88                    error!(
89                        "  Buffer: consumed={} position={}",
90                        reader.consumed(),
91                        reader.position()
92                    );
93                    return Err(Error::Pcap(e));
94                }
95            }
96        }
97
98        self.analyzer.teardown();
99        Ok(())
100    }
101}