pcap-async 0.1.2

Async/Stream Extensions for libpcap
Documentation
use crate::{Config, Error, Handle, Packet};

use log::*;
use pin_project::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::task;

extern "C" fn dispatch_callback(
    user: *mut u8,
    header: *const pcap_sys::pcap_pkthdr,
    data: *const u8,
) {
    if user == std::ptr::null_mut() || header == std::ptr::null() && data == std::ptr::null() {
        warn!("Invalid data passed to callback");
    } else {
        unsafe {
            let pending = std::mem::transmute::<*mut u8, &mut Vec<Packet>>(user);
            let ts = std::time::SystemTime::UNIX_EPOCH
                + std::time::Duration::from_secs((*header).ts.tv_sec as u64)
                + std::time::Duration::from_micros((*header).ts.tv_usec as u64);
            let length = (*header).caplen as usize;
            let mut data_vec = vec![0u8; length];
            std::ptr::copy(data, data_vec.as_mut_ptr(), length);
            let record = Packet::new(ts, (*header).caplen, (*header).len, data_vec);
            pending.push(record)
        }
    }
}

#[pin_project]
pub struct PacketFuture {
    pcap_handle: Arc<Handle>,
    max_packets_read: usize,
    live_capture: bool,
    outstanding: Option<task::JoinHandle<Result<Option<Vec<Packet>>, Error>>>,
}

impl PacketFuture {
    pub fn new(config: &Config, handle: &Arc<Handle>) -> PacketFuture {
        trace!("Creating new packet future");
        PacketFuture {
            pcap_handle: Arc::clone(handle),
            max_packets_read: config.max_packets_read(),
            live_capture: handle.is_live_capture(),
            outstanding: None,
        }
    }
}

fn dispatch(
    pcap_handle: Arc<Handle>,
    live_capture: bool,
    max_packets_read: usize,
) -> task::JoinHandle<Result<Option<Vec<Packet>>, Error>> {
    task::spawn_blocking(move || {
        let mut packets = vec![];

        while !pcap_handle.interrupted() {
            let ret_code = unsafe {
                pcap_sys::pcap_dispatch(
                    pcap_handle.as_mut_ptr(),
                    -1,
                    Some(dispatch_callback),
                    &mut packets as *mut Vec<Packet> as *mut u8,
                )
            };

            debug!("Dispatch returned with {}", ret_code);

            match ret_code {
                -2 => {
                    debug!("Pcap breakloop invoked");
                    return Ok(None);
                }
                -1 => {
                    let err = crate::pcap_util::convert_libpcap_error(pcap_handle.as_mut_ptr());
                    error!("Error encountered when calling pcap_dispatch: {}", err);
                    return Err(err);
                }
                0 => {
                    if packets.is_empty() {
                        trace!("No packets in buffer");
                        return Ok(Some(vec![]));
                    } else {
                        if !live_capture {
                            debug!("Not live capture, calling breakloop");
                            unsafe { pcap_sys::pcap_breakloop(pcap_handle.as_mut_ptr()) }
                        }
                        trace!("Capture loop captured {} available packets", packets.len());
                        return Ok(Some(packets));
                    }
                }
                x if x > 0 => {
                    trace!("Capture loop captured {} packets", x);
                    if packets.len() >= max_packets_read {
                        debug!(
                            "Capture loop captured up to maximum packets of {}",
                            max_packets_read
                        );
                        return Ok(Some(packets));
                    }
                }
                _ => {
                    let err = crate::pcap_util::convert_libpcap_error(pcap_handle.as_mut_ptr());
                    error!("Pcap dispatch returned {}: {:?}", ret_code, err);
                    return Err(err);
                }
            }
        }

        debug!("Interrupt invoked");

        let r = if packets.is_empty() {
            None
        } else {
            Some(packets)
        };

        return Ok(r);
    })
}

impl Future for PacketFuture {
    type Output = Result<Option<Vec<Packet>>, Error>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();

        let mut f = this.outstanding.take().unwrap_or_else(|| {
            dispatch(
                this.pcap_handle.clone(),
                *this.live_capture,
                *this.max_packets_read,
            )
        });

        match Pin::new(&mut f).poll(cx) {
            Poll::Pending => {
                *this.outstanding = Some(f);
                Poll::Pending
            }
            Poll::Ready(Err(e)) => Poll::Ready(Err(Error::Task(e))),
            Poll::Ready(Ok(r)) => Poll::Ready(r),
        }
    }
}