1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
use std::io::Read;

use pcap_parser::{PcapBlockOwned, PcapError};

use crate::{config::Config, context::*, error::Error};

pub trait BlockAnalyzer {
    /// Initialization function, called before reading pcap data (optional)
    fn init(&mut self) -> Result<(), Error> {
        Ok(())
    }
    /// Callback function for every block of the pcap/pcapng file
    fn handle_block(
        &mut self,
        _block: &PcapBlockOwned,
        _block_ctx: &ParseBlockContext,
    ) -> Result<(), Error>;

    /// Teardown function, called after reading pcap data (optional)
    fn teardown(&mut self) {}

    fn before_refill(&mut self) {}
}

pub struct BlockEngine<A: BlockAnalyzer> {
    analyzer: A,

    capacity: usize,
}

impl<A: BlockAnalyzer> BlockEngine<A> {
    pub fn new(analyzer: A, config: &Config) -> Self {
        let capacity = config
            .get_usize("buffer_initial_capacity")
            .unwrap_or(128 * 1024);
        BlockEngine { analyzer, capacity }
    }

    pub fn analyzer(&self) -> &A {
        &self.analyzer
    }

    pub fn analyzer_mut(&mut self) -> &mut A {
        &mut self.analyzer
    }

    /// Main function: given a reader, read all pcap data and call analyzer for each Packet
    pub fn run(&mut self, reader: &mut dyn Read) -> Result<(), Error> {
        let mut reader = pcap_parser::create_reader(self.capacity, reader)?;

        self.analyzer.init()?;
        let mut ctx = ParseBlockContext::default();
        let mut last_incomplete_index = 0;

        loop {
            match reader.next() {
                Ok((offset, block)) => {
                    self.analyzer.handle_block(&block, &ctx)?;
                    ctx.block_index += 1;
                    reader.consume_noshift(offset);
                    continue;
                }
                Err(PcapError::Eof) => break,
                Err(PcapError::Incomplete) => {
                    if last_incomplete_index == ctx.block_index && reader.reader_exhausted() {
                        warn!(
                            "Could not read complete data block (block_index={})",
                            ctx.block_index
                        );
                        warn!(
                            "  Buffer: consumed={} position={}",
                            reader.consumed(),
                            reader.position()
                        );
                        warn!("Hint: the reader buffer size may be too small, or the input file may be truncated.");
                        break;
                    }
                    last_incomplete_index = ctx.block_index;
                    // refill the buffer
                    debug!("need refill");
                    self.analyzer.before_refill();
                    reader.refill().map_err(|e| e.to_owned_vec())?;
                    continue;
                }
                Err(e) => {
                    let e = e.to_owned_vec();
                    error!("error while reading: {:?}", e);
                    error!(
                        "  Buffer: consumed={} position={}",
                        reader.consumed(),
                        reader.position()
                    );
                    return Err(Error::Pcap(e));
                }
            }
        }

        self.analyzer.teardown();
        Ok(())
    }
}