use std::io::{self, Read};
use pcap_parser::{
create_reader, data::get_packetdata_ethernet, data::PacketData, traits::PcapReaderIterator,
Block, PcapBlockOwned, PcapError,
};
use crate::{BareEvent, Error};
pub type Event = BareEvent;
const PCAP_BUFFER_SIZE: usize = 65536;
pub struct Input {
data_channel: Option<crossbeam_channel::Sender<Event>>,
ack_channel: crossbeam_channel::Receiver<super::SeqNo>,
iter: Box<dyn PcapReaderIterator>,
}
unsafe impl Send for Input {}
impl Input {
pub fn with_read<R: Read + Send + 'static>(
data_channel: crossbeam_channel::Sender<Event>,
ack_channel: crossbeam_channel::Receiver<super::SeqNo>,
read: R,
) -> Self {
Self {
data_channel: Some(data_channel),
ack_channel,
iter: create_reader(PCAP_BUFFER_SIZE, read).expect("pcap error"),
}
}
}
impl super::Input for Input {
type Data = Event;
type Ack = u64;
fn run(mut self) -> Result<(), Error> {
let Some(data_channel) = &self.data_channel else {
return Err(Error::ChannelClosed);
};
let mut sel = crossbeam_channel::Select::new();
let send_data = sel.send(data_channel);
let recv_ack = sel.recv(&self.ack_channel);
let mut id = 0;
'poll: loop {
match self.iter.next() {
Ok((offset, block)) => {
let res = match block {
PcapBlockOwned::NG(Block::EnhancedPacket(ref epb)) => {
get_packetdata_ethernet(epb.data, epb.caplen as usize)
}
PcapBlockOwned::NG(Block::SimplePacket(ref spb)) => {
get_packetdata_ethernet(spb.data, (spb.block_len1 - 16) as usize)
}
PcapBlockOwned::NG(_) | PcapBlockOwned::LegacyHeader(_) => None,
PcapBlockOwned::Legacy(lpb) => {
get_packetdata_ethernet(lpb.data, lpb.caplen as usize)
}
};
if let Some(PacketData::L2(eslice)) = res {
id += 1;
loop {
let oper = sel.select();
match oper.index() {
i if i == send_data => {
let event = Event {
raw: eslice.to_vec(),
seq_no: id,
};
if oper.send(data_channel, event).is_err() {
break 'poll;
}
break;
}
i if i == recv_ack => {
if oper.recv(&self.ack_channel).is_err() {
break 'poll;
}
}
_ => unreachable!(),
}
}
}
self.iter.consume_noshift(offset);
}
Err(PcapError::Eof) => break 'poll,
Err(PcapError::Incomplete(_)) => {
self.iter.refill().map_err(|e| {
Error::CannotFetch(Box::new(io::Error::new(
io::ErrorKind::Other,
format!("cannot read packet from pcap: {e:?}"),
)))
})?;
}
Err(e) => {
return Err(Error::CannotFetch(Box::new(io::Error::new(
io::ErrorKind::Other,
format!("cannot read packet from pcap: {e:?}"),
))));
}
}
}
self.data_channel = None;
for _ in &self.ack_channel {}
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::io::Cursor;
use std::thread;
use pcap_parser::{LegacyPcapBlock, PcapHeader, ToVec};
use crate::{pcap, Input};
fn create_pcap() -> Cursor<Vec<u8>> {
let fake_content = b"fake packet";
let pkt = LegacyPcapBlock {
ts_sec: 0,
ts_usec: 0,
caplen: fake_content.len() as u32,
origlen: fake_content.len() as u32,
data: fake_content,
}
.to_vec_raw()
.unwrap();
let mut buf = PcapHeader::new().to_vec_raw().unwrap();
for _ in 0..10 {
buf.extend(pkt.iter());
}
Cursor::new(buf)
}
#[test]
fn pcap_input() {
let tester = create_pcap();
let (data_tx, data_rx) = crossbeam_channel::bounded(1);
let (ack_tx, ack_rx) = crossbeam_channel::bounded(1);
let in_thread = thread::spawn(move || {
let input = pcap::Input::with_read(data_tx, ack_rx, tester);
input.run().unwrap()
});
let mut events = Vec::new();
{
let ack_tx = ack_tx;
for ev in data_rx {
events.push(ev.raw);
ack_tx.send(ev.seq_no).unwrap();
}
}
in_thread.join().unwrap();
assert_eq!(events.len(), 10);
}
}