use std::borrow::Cow;
use std::fs::File;
use std::io::{BufWriter, ErrorKind, Write};
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use anyhow::{anyhow, bail, Context, Result};
use pcap_file::pcapng::blocks::enhanced_packet::EnhancedPacketBlock;
use pcap_file::pcapng::blocks::interface_description::{
InterfaceDescriptionBlock, InterfaceDescriptionOption,
};
use pcap_file::pcapng::PcapNgWriter;
use pcap_file::DataLink;
use pnet_datalink::{self, Channel::Ethernet};
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum CaptureMode {
AfPacket { interface_name: String },
}
pub struct FrameCaptureWriter {
pcap: Option<PcapNgWriter<BufWriter<File>>>,
}
impl FrameCaptureWriter {
pub fn open_rootless(output_path: &Path) -> Result<Self> {
let pcap = open_pcap_writer(output_path)?;
Ok(Self { pcap: Some(pcap) })
}
pub fn write_frame(&mut self, frame: &[u8]) -> Result<()> {
let timestamp = capture_timestamp();
let block = EnhancedPacketBlock {
interface_id: 0,
timestamp,
original_len: frame.len() as u32,
data: Cow::Owned(frame.to_vec()),
options: vec![],
};
self.pcap
.as_mut()
.ok_or_else(|| anyhow!("rootless frame capture writer is already closed"))?
.write_pcapng_block(block)
.context("failed to append enhanced packet block")?;
self.flush()?;
Ok(())
}
pub fn close(&mut self) -> Result<()> {
let Some(pcap) = self.pcap.take() else {
return Ok(());
};
let mut writer = pcap.into_inner();
writer
.flush()
.context("failed to flush rootless pcapng output")?;
Ok(())
}
pub fn flush(&mut self) -> Result<()> {
let Some(pcap) = self.pcap.as_mut() else {
return Ok(());
};
pcap.get_mut()
.flush()
.context("failed to flush rootless pcapng output")?;
Ok(())
}
}
impl Drop for FrameCaptureWriter {
fn drop(&mut self) {
if let Err(err) = self.close() {
crate::util::warn(format!(
"failed to finalize rootless packet capture output: {err:#}"
));
}
}
}
pub struct CaptureHandle {
stop: Arc<AtomicBool>,
join: Option<JoinHandle<Result<()>>>,
}
impl CaptureHandle {
pub fn start(mode: CaptureMode, output_path: &Path) -> Result<Self> {
let output_path = output_path.to_path_buf();
let stop = Arc::new(AtomicBool::new(false));
let stop_for_thread = Arc::clone(&stop);
let join = thread::spawn(move || capture_loop(mode, &output_path, stop_for_thread));
Ok(Self {
stop,
join: Some(join),
})
}
fn stop_and_join(&mut self) -> Result<()> {
self.stop.store(true, Ordering::Relaxed);
if let Some(join) = self.join.take() {
match join.join() {
Ok(Ok(())) => {}
Ok(Err(err)) => {
return Err(err).context("packet capture stopped with an error");
}
Err(_) => {
bail!("packet capture thread panicked");
}
}
}
Ok(())
}
pub fn shutdown(mut self) -> Result<()> {
self.stop_and_join()
}
}
impl Drop for CaptureHandle {
fn drop(&mut self) {
if let Err(err) = self.stop_and_join() {
crate::util::warn(format!("{err:#}"));
}
}
}
fn capture_loop(mode: CaptureMode, output_path: &Path, stop: Arc<AtomicBool>) -> Result<()> {
let CaptureMode::AfPacket { interface_name } = mode;
let interface = pnet_datalink::interfaces()
.into_iter()
.find(|iface| iface.name == interface_name)
.ok_or_else(|| anyhow!("capture interface not found: {interface_name}"))?;
let config = pnet_datalink::Config {
read_timeout: Some(Duration::from_millis(250)),
read_buffer_size: 65_535,
promiscuous: true,
..Default::default()
};
let (_, mut rx) = match pnet_datalink::channel(&interface, config)
.with_context(|| format!("failed to open AF_PACKET channel on {interface_name}"))?
{
Ethernet(tx, rx) => (tx, rx),
_ => bail!("unsupported datalink channel type"),
};
let mut pcap = open_pcap_writer(output_path)?;
while !stop.load(Ordering::Relaxed) {
match rx.next() {
Ok(packet) => {
let timestamp = capture_timestamp();
let block = EnhancedPacketBlock {
interface_id: 0,
timestamp,
original_len: packet.len() as u32,
data: Cow::Owned(packet.to_vec()),
options: vec![],
};
pcap.write_pcapng_block(block)
.context("failed to append enhanced packet block")?;
pcap.get_mut()
.flush()
.context("failed to flush AF_PACKET pcapng output")?;
}
Err(err)
if err.kind() == ErrorKind::WouldBlock || err.kind() == ErrorKind::TimedOut => {}
Err(err) => {
if stop.load(Ordering::Relaxed) {
crate::util::debug(format!(
"stopping AF_PACKET capture on {interface_name} after shutdown signal: {err}"
));
break;
}
return Err(err)
.with_context(|| format!("AF_PACKET receive failed on {interface_name}"));
}
}
}
let mut writer = pcap.into_inner();
writer.flush().context("failed to flush pcapng output")?;
Ok(())
}
fn open_pcap_writer(output_path: &Path) -> Result<PcapNgWriter<BufWriter<File>>> {
let file = File::create(output_path)
.with_context(|| format!("failed to create {}", output_path.display()))?;
let writer = BufWriter::new(file);
let mut pcap = PcapNgWriter::new(writer).context("failed to create pcapng writer")?;
let idb = InterfaceDescriptionBlock {
linktype: DataLink::ETHERNET,
snaplen: 65_535,
options: vec![InterfaceDescriptionOption::IfTsResol(9)],
};
pcap.write_pcapng_block(idb)
.context("failed to write pcapng interface description block")?;
pcap.get_mut()
.flush()
.context("failed to flush the initial pcapng header")?;
Ok(pcap)
}
fn capture_timestamp() -> Duration {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_else(|_| Duration::from_secs(0))
}