use std::fs::File;
use std::io::{BufReader, Read};
use std::path::Path;
use crate::tracker::FlowEvents;
use crate::{FlowEvent, FlowExtractor, FlowTracker, PacketView, Timestamp};
use pcap_file::pcap::PcapReader;
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("io: {0}")]
Io(#[from] std::io::Error),
#[error("pcap: {0}")]
Pcap(#[from] pcap_file::PcapError),
}
pub struct PcapFlowSource<R: Read> {
reader: PcapReader<R>,
}
impl PcapFlowSource<BufReader<File>> {
pub fn open(path: impl AsRef<Path>) -> Result<Self, Error> {
let file = File::open(path)?;
let reader = PcapReader::new(BufReader::new(file))?;
Ok(Self { reader })
}
}
impl<R: Read> PcapFlowSource<R> {
pub fn from_reader(reader: R) -> Result<Self, Error> {
Ok(Self {
reader: PcapReader::new(reader)?,
})
}
pub fn views(self) -> ViewIter<R> {
ViewIter {
reader: self.reader,
}
}
pub fn with_extractor<E: FlowExtractor>(self, extractor: E) -> EventIter<R, E>
where
E::Key: Clone,
{
EventIter {
views: self.views(),
tracker: FlowTracker::new(extractor),
pending: std::collections::VecDeque::new(),
sweep_done: false,
}
}
}
#[derive(Debug, Clone)]
pub struct OwnedPacketView {
pub frame: Vec<u8>,
pub timestamp: Timestamp,
}
impl OwnedPacketView {
pub fn as_view(&self) -> PacketView<'_> {
PacketView::new(&self.frame, self.timestamp)
}
}
pub struct ViewIter<R: Read> {
reader: PcapReader<R>,
}
impl<R: Read> Iterator for ViewIter<R> {
type Item = Result<OwnedPacketView, Error>;
fn next(&mut self) -> Option<Self::Item> {
let pkt = self.reader.next_packet()?;
match pkt {
Ok(p) => {
let ts = Timestamp::new(p.timestamp.as_secs() as u32, p.timestamp.subsec_nanos());
Some(Ok(OwnedPacketView {
frame: p.data.into_owned(),
timestamp: ts,
}))
}
Err(e) => Some(Err(e.into())),
}
}
}
pub struct EventIter<R: Read, E: FlowExtractor>
where
E::Key: Clone,
{
views: ViewIter<R>,
tracker: FlowTracker<E, ()>,
pending: std::collections::VecDeque<FlowEvent<E::Key>>,
sweep_done: bool,
}
impl<R: Read, E: FlowExtractor> Iterator for EventIter<R, E>
where
E::Key: Clone,
{
type Item = Result<FlowEvent<E::Key>, Error>;
fn next(&mut self) -> Option<Self::Item> {
loop {
if let Some(ev) = self.pending.pop_front() {
return Some(Ok(ev));
}
match self.views.next() {
Some(Ok(view)) => {
let evts: FlowEvents<E::Key> = self.tracker.track(view.as_view());
for ev in evts {
self.pending.push_back(ev);
}
}
Some(Err(e)) => return Some(Err(e)),
None => {
if !self.sweep_done {
self.sweep_done = true;
let last_seen_sec = self
.tracker
.flows()
.map(|(_, e)| e.stats.last_seen.sec)
.max()
.unwrap_or(0);
let far = Timestamp::new(last_seen_sec.saturating_add(86_400), 0);
for ev in self.tracker.sweep(far) {
self.pending.push_back(ev);
}
} else {
return None;
}
}
}
}
}
}