pcap-async 0.4.1

Async/Stream Extensions for libpcap
Documentation
use crate::config::Config;
use crate::errors::Error;
use crate::handle::Handle;
use crate::packet::{Packet, PacketFuture};
use crate::pcap_util;

use futures::stream::{Stream, StreamExt};
use log::*;
use pin_project::pin_project;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

pub type StreamItem<E> = Result<Vec<Packet>, E>;

#[pin_project]
pub struct PacketStream {
    config: Config,
    handle: Arc<Handle>,
    pending: Option<PacketFuture>,
    complete: bool,
}

impl PacketStream {
    pub fn new(config: Config, handle: Arc<Handle>) -> Result<PacketStream, Error> {
        let live_capture = handle.is_live_capture();

        if live_capture {
            let h = handle
                .set_snaplen(config.snaplen())?
                .set_promiscuous()?
                .set_buffer_size(config.buffer_size())?
                .activate()?;
            if !config.blocking() {
                h.set_non_block()?;
            }

            if let Some(bpf) = config.bpf() {
                let bpf = handle.compile_bpf(bpf)?;
                handle.set_bpf(bpf)?;
            }
        }

        Ok(PacketStream {
            config: config,
            handle: handle,
            pending: None,
            complete: false,
        })
    }
}

impl Stream for PacketStream {
    type Item = StreamItem<Error>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();

        if *this.complete {
            return Poll::Ready(None);
        }

        let mut f = if let Some(f) = this.pending.take() {
            f
        } else {
            match PacketFuture::new(this.config, this.handle) {
                Err(e) => {
                    *this.complete = true;
                    return Poll::Ready(Some(Err(e)));
                }
                Ok(f) => f,
            }
        };

        match Pin::new(&mut f).poll(cx) {
            Poll::Pending => {
                *this.pending = Some(f);
                Poll::Pending
            }
            Poll::Ready(None) => {
                debug!("Stream was complete");
                *this.complete = true;
                Poll::Ready(None)
            }
            Poll::Ready(Some(Err(e))) => {
                *this.complete = true;
                Poll::Ready(Some(Err(e)))
            }
            Poll::Ready(Some(Ok(packets))) => {
                trace!("Returning {} packets", packets.len());
                Poll::Ready(Some(Ok(packets)))
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use byteorder::{ByteOrder, ReadBytesExt};
    use futures::{Future, Stream};
    use std::io::Cursor;
    use std::path::PathBuf;

    #[test]
    fn packets_from_file() {
        let _ = env_logger::try_init();

        let pcap_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("resources")
            .join("canary.pcap");

        info!("Testing against {:?}", pcap_path);

        let handle = Handle::file_capture(pcap_path.to_str().expect("No path found"))
            .expect("No handle created");

        let packets = smol::block_on(async move {
            let packet_provider =
                PacketStream::new(Config::default(), Arc::clone(&handle)).expect("Failed to build");
            let fut_packets = packet_provider.collect::<Vec<_>>();
            let packets: Vec<_> = fut_packets
                .await
                .into_iter()
                .flatten()
                .flatten()
                .filter(|p| p.data().len() == p.actual_length() as usize)
                .collect();

            handle.interrupt();

            packets
        });

        assert_eq!(packets.len(), 10);

        let packet = packets.first().cloned().expect("No packets");
        let data = packet
            .into_pcap_record::<byteorder::BigEndian>()
            .expect("Failed to convert to pcap record");
        let mut cursor = Cursor::new(data);
        let ts_sec = cursor
            .read_u32::<byteorder::BigEndian>()
            .expect("Failed to read");
        let ts_usec = cursor
            .read_u32::<byteorder::BigEndian>()
            .expect("Failed to read");
        let actual_length = cursor
            .read_u32::<byteorder::BigEndian>()
            .expect("Failed to read");
        assert_eq!(
            ts_sec as u64 * 1_000_000 as u64 + ts_usec as u64,
            1513735120021685
        );
        assert_eq!(actual_length, 54);
    }

    #[test]
    fn packets_from_large_file() {
        let _ = env_logger::try_init();

        let pcap_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("resources")
            .join("4SICS-GeekLounge-151020.pcap");

        info!("Testing against {:?}", pcap_path);

        let handle = Handle::file_capture(pcap_path.to_str().expect("No path found"))
            .expect("No handle created");

        let packets = smol::block_on(async move {
            let packet_provider =
                PacketStream::new(Config::default(), Arc::clone(&handle)).expect("Failed to build");
            let fut_packets = packet_provider.collect::<Vec<_>>();
            let packets: Vec<_> = fut_packets
                .await
                .into_iter()
                .flatten()
                .flatten()
                .filter(|p| p.data().len() == p.actual_length() as usize)
                .collect();

            handle.interrupt();

            packets
        });

        assert_eq!(packets.len(), 246137);
    }

    #[test]
    fn packets_from_file_next() {
        let _ = env_logger::try_init();

        let pcap_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("resources")
            .join("canary.pcap");

        info!("Testing against {:?}", pcap_path);

        let packets = smol::block_on(async move {
            let handle = Handle::file_capture(pcap_path.to_str().expect("No path found"))
                .expect("No handle created");

            let packet_provider =
                PacketStream::new(Config::default(), Arc::clone(&handle)).expect("Failed to build");
            let fut_packets = async move {
                let mut packet_provider = packet_provider.boxed();
                let mut packets = vec![];
                while let Some(p) = packet_provider.next().await {
                    packets.extend(p);
                }
                packets
            };
            let packets = fut_packets
                .await
                .into_iter()
                .flatten()
                .filter(|p| p.data().len() == p.actual_length() as _)
                .count();

            handle.interrupt();

            packets
        });

        assert_eq!(packets, 10);
    }

    #[test]
    fn packets_from_lookup() {
        let _ = env_logger::try_init();

        let handle = Handle::lookup().expect("No handle created");

        let stream = PacketStream::new(Config::default(), handle);

        assert!(
            stream.is_ok(),
            format!("Could not build stream {}", stream.err().unwrap())
        );

        let mut stream = stream.unwrap();

        smol::block_on(async move { stream.next().await })
            .unwrap()
            .unwrap();
    }

    #[test]
    fn packets_from_lookup_with_bpf() {
        let _ = env_logger::try_init();

        let mut cfg = Config::default();
        cfg.with_bpf(
            "(not (net 172.16.0.0/16 and port 443)) and (not (host 172.17.76.33 and port 443))"
                .to_owned(),
        );
        let handle = Handle::lookup().expect("No handle created");

        let stream = PacketStream::new(cfg, handle);

        assert!(
            stream.is_ok(),
            format!("Could not build stream {}", stream.err().unwrap())
        );
    }
}