use std::marker::PhantomData;
use std::ptr::NonNull;
use crate::afpacket::ffi;
use crate::afpacket::ring::{self, MmapRing};
pub use flowscope::Timestamp;
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct PacketStatus {
pub truncated: bool,
pub losing: bool,
pub vlan_valid: bool,
pub vlan_tpid_valid: bool,
pub csum_valid: bool,
pub csum_not_ready: bool,
pub gso_tcp: bool,
}
impl PacketStatus {
#[inline]
pub fn from_raw(status: u32) -> Self {
Self {
truncated: status & ffi::TP_STATUS_COPY != 0,
losing: status & ffi::TP_STATUS_LOSING != 0,
vlan_valid: status & ffi::TP_STATUS_VLAN_VALID != 0,
vlan_tpid_valid: status & ffi::TP_STATUS_VLAN_TPID_VALID != 0,
csum_valid: status & ffi::TP_STATUS_CSUM_VALID != 0,
csum_not_ready: status & ffi::TP_STATUS_CSUMNOTREADY != 0,
gso_tcp: status & ffi::TP_STATUS_GSO_TCP != 0,
}
}
}
#[derive(Debug, Clone)]
pub struct OwnedPacket {
pub data: Vec<u8>,
pub timestamp: Timestamp,
pub original_len: usize,
pub status: PacketStatus,
pub direction: PacketDirection,
pub rxhash: u32,
pub vlan_tci: u16,
pub vlan_tpid: u16,
pub ll_protocol: u16,
pub source_ll_addr: [u8; 8],
pub source_ll_addr_len: u8,
}
impl OwnedPacket {
#[inline]
pub fn source_ll_addr(&self) -> &[u8] {
let n = (self.source_ll_addr_len as usize).min(self.source_ll_addr.len());
&self.source_ll_addr[..n]
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PacketDirection {
Host,
Broadcast,
Multicast,
OtherHost,
Outgoing,
Unknown(u8),
}
impl PacketDirection {
#[inline]
pub(crate) fn from_raw(pkttype: u8) -> Self {
match pkttype {
ffi::PACKET_HOST => Self::Host,
ffi::PACKET_BROADCAST => Self::Broadcast,
ffi::PACKET_MULTICAST => Self::Multicast,
ffi::PACKET_OTHERHOST => Self::OtherHost,
ffi::PACKET_OUTGOING => Self::Outgoing,
v => Self::Unknown(v),
}
}
}
pub struct Packet<'a> {
data: &'a [u8],
hdr: &'a ffi::tpacket3_hdr,
}
impl<'a> Packet<'a> {
#[inline]
pub fn data(&self) -> &'a [u8] {
self.data
}
#[inline]
pub fn timestamp(&self) -> Timestamp {
Timestamp::new(self.hdr.tp_sec, self.hdr.tp_nsec)
}
#[inline]
pub fn len(&self) -> usize {
self.hdr.tp_snaplen as usize
}
#[inline]
pub fn is_empty(&self) -> bool {
self.hdr.tp_snaplen == 0
}
#[inline]
pub fn original_len(&self) -> usize {
self.hdr.tp_len as usize
}
#[inline]
pub fn status(&self) -> PacketStatus {
PacketStatus::from_raw(self.hdr.tp_status)
}
#[inline]
pub fn rxhash(&self) -> u32 {
self.hdr.hv1.tp_rxhash
}
#[inline]
pub fn vlan_tci(&self) -> u16 {
self.hdr.hv1.tp_vlan_tci as u16
}
#[inline]
pub fn vlan_tpid(&self) -> u16 {
self.hdr.hv1.tp_vlan_tpid
}
#[inline]
pub fn direction(&self) -> PacketDirection {
let sll_offset = ffi::tpacket_align(std::mem::size_of::<ffi::tpacket3_hdr>());
let hdr_ptr = self.hdr as *const ffi::tpacket3_hdr as *const u8;
let sll_ptr = hdr_ptr.map_addr(|a| a + sll_offset);
let sll = unsafe { &*(sll_ptr as *const ffi::sockaddr_ll) };
PacketDirection::from_raw(sll.sll_pkttype)
}
#[inline]
pub fn source_ll_addr(&self) -> &[u8] {
let sll_offset = ffi::tpacket_align(std::mem::size_of::<ffi::tpacket3_hdr>());
let hdr_ptr = self.hdr as *const ffi::tpacket3_hdr as *const u8;
let sll_ptr = hdr_ptr.map_addr(|a| a + sll_offset);
let sll = unsafe { &*(sll_ptr as *const ffi::sockaddr_ll) };
let len = sll.sll_halen as usize;
&sll.sll_addr[..len.min(8)]
}
#[inline]
pub fn ll_protocol(&self) -> u16 {
let sll_offset = ffi::tpacket_align(std::mem::size_of::<ffi::tpacket3_hdr>());
let hdr_ptr = self.hdr as *const ffi::tpacket3_hdr as *const u8;
let sll_ptr = hdr_ptr.map_addr(|a| a + sll_offset);
let sll = unsafe { &*(sll_ptr as *const ffi::sockaddr_ll) };
u16::from_be(sll.sll_protocol)
}
pub fn to_owned(&self) -> OwnedPacket {
let sll_offset = ffi::tpacket_align(std::mem::size_of::<ffi::tpacket3_hdr>());
let hdr_ptr = self.hdr as *const ffi::tpacket3_hdr as *const u8;
let sll_ptr = hdr_ptr.map_addr(|a| a + sll_offset);
let sll = unsafe { &*(sll_ptr as *const ffi::sockaddr_ll) };
OwnedPacket {
data: self.data.to_vec(),
timestamp: self.timestamp(),
original_len: self.original_len(),
status: self.status(),
direction: PacketDirection::from_raw(sll.sll_pkttype),
rxhash: self.hdr.hv1.tp_rxhash,
vlan_tci: self.hdr.hv1.tp_vlan_tci as u16,
vlan_tpid: self.hdr.hv1.tp_vlan_tpid,
ll_protocol: u16::from_be(sll.sll_protocol),
source_ll_addr: sll.sll_addr,
source_ll_addr_len: sll.sll_halen.min(8),
}
}
#[cfg(feature = "parse")]
#[inline]
pub fn parse(
&self,
) -> Result<etherparse::SlicedPacket<'a>, etherparse::err::packet::SliceError> {
etherparse::SlicedPacket::from_ethernet(self.data)
}
#[inline]
pub fn view(&self) -> flowscope::PacketView<'a> {
flowscope::PacketView::new(self.data, self.timestamp())
}
}
impl OwnedPacket {
#[cfg(feature = "parse")]
#[inline]
pub fn parse(
&self,
) -> Result<etherparse::SlicedPacket<'_>, etherparse::err::packet::SliceError> {
etherparse::SlicedPacket::from_ethernet(&self.data)
}
}
pub struct PacketBatch<'a> {
block: NonNull<ffi::tpacket_block_desc>,
num_pkts: u32,
block_status: u32,
seq_num: u64,
offset_to_first_pkt: u32,
blk_len: u32,
ts_first: Timestamp,
ts_last: Timestamp,
_marker: PhantomData<&'a MmapRing>,
}
impl<'a> PacketBatch<'a> {
pub(crate) unsafe fn new(block: NonNull<ffi::tpacket_block_desc>) -> Self {
let bd = unsafe { &*block.as_ptr() };
let bh1 = unsafe { &bd.hdr.bh1 };
let ts_first = Timestamp::new(
bh1.ts_first_pkt.ts_sec,
bh1.ts_first_pkt.ts_usec,
);
let ts_last = Timestamp::new(bh1.ts_last_pkt.ts_sec, bh1.ts_last_pkt.ts_usec);
Self {
block,
num_pkts: bh1.num_pkts,
block_status: bh1.block_status,
seq_num: bh1.seq_num,
offset_to_first_pkt: bh1.offset_to_first_pkt,
blk_len: bh1.blk_len,
ts_first,
ts_last,
_marker: PhantomData,
}
}
#[inline]
pub fn len(&self) -> usize {
self.num_pkts as usize
}
#[inline]
pub fn is_empty(&self) -> bool {
self.num_pkts == 0
}
pub fn timed_out(&self) -> bool {
self.block_status & ffi::TP_STATUS_BLK_TMO != 0
}
pub fn seq_num(&self) -> u64 {
self.seq_num
}
pub fn ts_first(&self) -> Timestamp {
self.ts_first
}
pub fn ts_last(&self) -> Timestamp {
self.ts_last
}
pub fn iter(&self) -> BatchIter<'a> {
if self.num_pkts == 0 {
return BatchIter {
current: std::ptr::null(),
remaining: 0,
block_end: std::ptr::null(),
_marker: PhantomData,
};
}
let base = self.block.as_ptr().cast::<u8>();
let first = base.map_addr(|a| a + self.offset_to_first_pkt as usize);
let end = base.map_addr(|a| a + self.blk_len as usize);
BatchIter {
current: first,
remaining: self.num_pkts,
block_end: end,
_marker: PhantomData,
}
}
}
impl<'a> IntoIterator for &'a PacketBatch<'a> {
type Item = Packet<'a>;
type IntoIter = BatchIter<'a>;
fn into_iter(self) -> BatchIter<'a> {
self.iter()
}
}
impl std::fmt::Debug for PacketBatch<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PacketBatch")
.field("num_pkts", &self.num_pkts)
.field("seq_num", &self.seq_num)
.field("timed_out", &self.timed_out())
.finish()
}
}
impl Drop for PacketBatch<'_> {
fn drop(&mut self) {
unsafe { ring::release_block(self.block) };
}
}
pub struct BatchIter<'a> {
current: *const u8,
remaining: u32,
block_end: *const u8,
_marker: PhantomData<&'a ()>,
}
impl std::fmt::Debug for BatchIter<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BatchIter")
.field("remaining", &self.remaining)
.finish()
}
}
impl<'a> Iterator for BatchIter<'a> {
type Item = Packet<'a>;
#[inline]
fn next(&mut self) -> Option<Packet<'a>> {
if self.remaining == 0 {
return None;
}
let hdr_plus_sll = ffi::tpacket_align(std::mem::size_of::<ffi::tpacket3_hdr>())
+ std::mem::size_of::<ffi::sockaddr_ll>();
if (self.current as usize) + hdr_plus_sll > self.block_end as usize {
tracing::warn!("BatchIter: tpacket3_hdr extends past block boundary, stopping");
self.remaining = 0;
return None;
}
let hdr = unsafe { &*(self.current as *const ffi::tpacket3_hdr) };
let data_offset = hdr.tp_mac as usize;
let snaplen = hdr.tp_snaplen as usize;
let data_ptr = self.current.map_addr(|a| a + data_offset);
if (data_ptr as usize) + snaplen > self.block_end as usize {
tracing::warn!("BatchIter: packet data extends past block boundary, stopping");
self.remaining = 0;
return None;
}
let data = unsafe { std::slice::from_raw_parts(data_ptr, snaplen) };
if hdr.tp_next_offset != 0 {
self.current = self.current.map_addr(|a| a + hdr.tp_next_offset as usize);
self.remaining -= 1;
} else {
self.remaining = 0;
}
Some(Packet { data, hdr })
}
fn size_hint(&self) -> (usize, Option<usize>) {
(0, Some(self.remaining as usize))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn packet_status_from_raw_empty() {
let s = PacketStatus::from_raw(0);
assert!(!s.truncated);
assert!(!s.losing);
assert!(!s.vlan_valid);
assert!(!s.csum_valid);
assert!(!s.gso_tcp);
}
#[test]
fn packet_status_from_raw_truncated() {
let s = PacketStatus::from_raw(ffi::TP_STATUS_COPY);
assert!(s.truncated);
assert!(!s.losing);
}
#[test]
fn packet_status_from_raw_combined() {
let bits = ffi::TP_STATUS_COPY
| ffi::TP_STATUS_LOSING
| ffi::TP_STATUS_VLAN_VALID
| ffi::TP_STATUS_CSUM_VALID
| ffi::TP_STATUS_GSO_TCP;
let s = PacketStatus::from_raw(bits);
assert!(s.truncated);
assert!(s.losing);
assert!(s.vlan_valid);
assert!(s.csum_valid);
assert!(s.gso_tcp);
assert!(!s.vlan_tpid_valid);
assert!(!s.csum_not_ready);
}
#[test]
fn owned_packet_clone() {
let pkt = OwnedPacket {
data: vec![0xDE, 0xAD],
timestamp: Timestamp::new(1, 2),
original_len: 100,
status: PacketStatus::default(),
direction: PacketDirection::Host,
rxhash: 0xCAFE,
vlan_tci: 100,
vlan_tpid: 0x8100,
ll_protocol: 0x0800,
source_ll_addr: [1, 2, 3, 4, 5, 6, 0, 0],
source_ll_addr_len: 6,
};
let cloned = pkt.clone();
assert_eq!(cloned.data, pkt.data);
assert_eq!(cloned.timestamp, pkt.timestamp);
assert_eq!(cloned.original_len, pkt.original_len);
assert_eq!(cloned.rxhash, 0xCAFE);
assert_eq!(cloned.source_ll_addr(), &[1, 2, 3, 4, 5, 6]);
}
fn build_synthetic_block(packets: &[&[u8]], block_status: u32) -> Vec<u8> {
let block_desc_size = std::mem::size_of::<ffi::tpacket_block_desc>();
let hdr_size = std::mem::size_of::<ffi::tpacket3_hdr>();
let mut total = block_desc_size;
for payload in packets {
let frame_size = ffi::tpacket_align(hdr_size + payload.len());
total += frame_size;
}
let block_size = total.max(4096);
let mut block = vec![0u8; block_size];
let bd = block.as_mut_ptr().cast::<ffi::tpacket_block_desc>();
unsafe {
(*bd).version = 1;
(*bd).hdr.bh1.block_status = block_status;
(*bd).hdr.bh1.num_pkts = packets.len() as u32;
(*bd).hdr.bh1.offset_to_first_pkt = block_desc_size as u32;
(*bd).hdr.bh1.blk_len = block_size as u32;
(*bd).hdr.bh1.seq_num = 1;
}
let mut offset = block_desc_size;
for (i, payload) in packets.iter().enumerate() {
let frame_size = ffi::tpacket_align(hdr_size + payload.len());
let is_last = i == packets.len() - 1;
let hdr = block[offset..].as_mut_ptr().cast::<ffi::tpacket3_hdr>();
unsafe {
(*hdr).tp_next_offset = if is_last { 0 } else { frame_size as u32 };
(*hdr).tp_sec = 1000 + i as u32;
(*hdr).tp_nsec = i as u32 * 1000;
(*hdr).tp_snaplen = payload.len() as u32;
(*hdr).tp_len = payload.len() as u32;
(*hdr).tp_status = 0;
(*hdr).tp_mac = hdr_size as u16;
(*hdr).tp_net = hdr_size as u16;
}
let data_start = offset + hdr_size;
block[data_start..data_start + payload.len()].copy_from_slice(payload);
offset += frame_size;
}
block
}
fn iter_from_block(block: &[u8], num_pkts: u32) -> BatchIter<'_> {
let _bd = block.as_ptr().cast::<ffi::tpacket_block_desc>();
let bd_size = std::mem::size_of::<ffi::tpacket_block_desc>();
let first = block[bd_size..].as_ptr();
let end = block[block.len()..].as_ptr();
BatchIter {
current: first,
remaining: num_pkts,
block_end: end,
_marker: PhantomData,
}
}
#[test]
fn batch_iter_single_packet() {
let data = b"hello world";
let block = build_synthetic_block(&[data], ffi::TP_STATUS_USER);
let mut iter = iter_from_block(&block, 1);
let pkt = iter.next().unwrap();
assert_eq!(pkt.data(), data);
assert_eq!(pkt.len(), data.len());
assert_eq!(pkt.original_len(), data.len());
assert_eq!(pkt.timestamp().sec, 1000);
assert!(iter.next().is_none());
}
#[test]
fn batch_iter_multiple_packets() {
let p1 = b"packet one";
let p2 = b"packet two!!";
let p3 = b"pkt3";
let block = build_synthetic_block(&[p1, p2, p3], ffi::TP_STATUS_USER);
let mut iter = iter_from_block(&block, 3);
let pkt1 = iter.next().unwrap();
assert_eq!(pkt1.data(), p1.as_slice());
assert_eq!(pkt1.timestamp().sec, 1000);
let pkt2 = iter.next().unwrap();
assert_eq!(pkt2.data(), p2.as_slice());
assert_eq!(pkt2.timestamp().sec, 1001);
let pkt3 = iter.next().unwrap();
assert_eq!(pkt3.data(), p3.as_slice());
assert_eq!(pkt3.timestamp().sec, 1002);
assert!(iter.next().is_none());
}
#[test]
fn batch_iter_empty_block() {
let block = build_synthetic_block(&[], ffi::TP_STATUS_USER);
let mut iter = iter_from_block(&block, 0);
assert!(iter.next().is_none());
}
#[test]
fn batch_iter_size_hint_is_upper_bound() {
let block = build_synthetic_block(&[b"a", b"bb", b"ccc"], ffi::TP_STATUS_USER);
let iter = iter_from_block(&block, 3);
assert_eq!(iter.size_hint(), (0, Some(3)));
assert_eq!(iter.count(), 3);
}
#[test]
fn batch_iter_terminates_on_last_packet_marker() {
let block = build_synthetic_block(&[b"only one"], ffi::TP_STATUS_USER);
let mut iter = iter_from_block(&block, 10);
let pkt = iter.next().unwrap();
assert_eq!(pkt.data(), b"only one");
assert!(
iter.next().is_none(),
"iterator must terminate on tp_next_offset == 0 marker"
);
assert!(iter.next().is_none(), "subsequent calls remain None");
}
#[test]
fn batch_iter_walks_three_then_stops() {
let block = build_synthetic_block(&[b"a", b"bb", b"ccc"], ffi::TP_STATUS_USER);
let mut iter = iter_from_block(&block, 3);
assert_eq!(iter.next().unwrap().data(), b"a");
assert_eq!(iter.next().unwrap().data(), b"bb");
assert_eq!(iter.next().unwrap().data(), b"ccc");
assert!(iter.next().is_none());
}
#[test]
fn packet_to_owned_roundtrip() {
let data = b"test packet data";
let block = build_synthetic_block(&[data], ffi::TP_STATUS_USER);
let mut iter = iter_from_block(&block, 1);
let pkt = iter.next().unwrap();
let owned = pkt.to_owned();
assert_eq!(owned.data, data);
assert_eq!(owned.timestamp.sec, 1000);
assert_eq!(owned.original_len, data.len());
}
#[test]
fn batch_timed_out_flag() {
let block = build_synthetic_block(&[b"data"], ffi::TP_STATUS_USER | ffi::TP_STATUS_BLK_TMO);
let bd = NonNull::new(block.as_ptr() as *mut ffi::tpacket_block_desc).unwrap();
let batch = unsafe { PacketBatch::new(bd) };
assert!(batch.timed_out());
std::mem::forget(batch);
}
#[test]
fn batch_not_timed_out() {
let block = build_synthetic_block(&[b"data"], ffi::TP_STATUS_USER);
let bd = NonNull::new(block.as_ptr() as *mut ffi::tpacket_block_desc).unwrap();
let batch = unsafe { PacketBatch::new(bd) };
assert!(!batch.timed_out());
std::mem::forget(batch);
}
#[test]
fn batch_seq_num() {
let block = build_synthetic_block(&[b"data"], ffi::TP_STATUS_USER);
let bd = NonNull::new(block.as_ptr() as *mut ffi::tpacket_block_desc).unwrap();
let batch = unsafe { PacketBatch::new(bd) };
assert_eq!(batch.seq_num(), 1);
std::mem::forget(batch);
}
#[test]
fn batch_len_and_is_empty() {
let block_with = build_synthetic_block(&[b"a", b"b"], ffi::TP_STATUS_USER);
let bd = NonNull::new(block_with.as_ptr() as *mut ffi::tpacket_block_desc).unwrap();
let batch = unsafe { PacketBatch::new(bd) };
assert_eq!(batch.len(), 2);
assert!(!batch.is_empty());
std::mem::forget(batch);
let block_empty = build_synthetic_block(&[], ffi::TP_STATUS_USER);
let bd = NonNull::new(block_empty.as_ptr() as *mut ffi::tpacket_block_desc).unwrap();
let batch = unsafe { PacketBatch::new(bd) };
assert_eq!(batch.len(), 0);
assert!(batch.is_empty());
std::mem::forget(batch);
}
}