// ethrecv 0.0.1
//
// Receive ethernet packets at a high rate.
// Documentation
//! Windows packet receiver backend implementation.
//!
//! This backend uses an event semaphore, provided by pcap, to unblock the
//! blocking `pcap_next()` whenever the controller or idle thread requests some
//! action to be taken.

use std::sync::{
  atomic::{AtomicU32, Ordering},
  mpsc::Receiver,
  Arc
};

#[cfg(feature = "inspect")]
use std::time::Instant;

use bitflags::bitflags;

use pcap::Active;

use super::PacketHandler;

use crate::err::Error;

#[cfg(feature = "idle")]
use crate::idlemon;

use super::Msg;

bitflags! {
  /// Command request bits shared between the receiver loop and the threads
  /// that want it to do something.
  ///
  /// The receiver loop atomically fetches and clears these bits after each
  /// wakeup and acts on every bit that was set.
  pub(crate) struct CmdFlags: u32 {
    /// Ask the receiver loop to terminate successfully.
    const KILL    = 1 << 0;

    /// Ask the receiver loop to hand a statistics snapshot to the handler's
    /// `inspect()` callback.
    #[cfg(feature = "inspect")]
    const INSPECT = 1 << 1;

    /// Ask the receiver loop to call the handler's `idle()` callback.
    #[cfg(feature = "idle")]
    const IDLE    = 1 << 2;

    /// Tell the receiver loop that messages are waiting on its channel.
    const CHANNEL = 1 << 3;

    /// Union of every flag compiled into this build.
    ///
    /// Built with shadowed `let`s so that feature-gated flags can be left
    /// out with `#[cfg]` on individual statements.
    const ALL = {
      let mask = Self::KILL.bits() | Self::CHANNEL.bits();
      #[cfg(feature = "inspect")]
      let mask = mask | Self::INSPECT.bits();
      #[cfg(feature = "idle")]
      let mask = mask | Self::IDLE.bits();
      mask
    };
  }
}


/// Shared state and channels handed to [`run`] by whoever starts the
/// receiver.
pub(super) struct RunParams {
  /// Pending command bits (see `CmdFlags`).  `run` fetches and clears the
  /// known bits after every wakeup of the capture.
  pub(super) cmdreq: Arc<AtomicU32>,

  /// Optional idle-monitor shared state; when present, `run` calls
  /// `touch()` on it for every received packet.
  #[cfg(feature = "idle")]
  pub(super) idle_sh: Option<Arc<idlemon::Shared>>,

  /// Receiving end of the message channel; drained by `run` whenever the
  /// `CmdFlags::CHANNEL` bit was set.
  pub(super) ch_rx: Receiver<Msg>
}

/// Run the packet receiver loop until it is killed or fails.
///
/// Each iteration waits for the next packet from `cap`.  A received packet
/// is passed to `handler.proc()`; a timeout or any other capture error
/// simply falls through.  After every wakeup the pending command bits in
/// `rp.cmdreq` are atomically fetched and cleared, and each set bit is acted
/// upon:
///
/// - `KILL`: leave the loop with `Ok(())`.
/// - `INSPECT` (feature `inspect`): pass a statistics snapshot to
///   `handler.inspect()`.
/// - `IDLE` (feature `idle`): call `handler.idle()`.
/// - `CHANNEL`: drain `rp.ch_rx` and answer every queued message.
///
/// `handler.shutdown()` is always called before this function returns,
/// regardless of whether the loop ended successfully or with an error.
///
/// # Errors
///
/// - [`Error::App`] if `handler.proc()` or `handler.idle()` returns an
///   error.
/// - [`Error::Internal`] if a channel inspection request cannot be answered
///   or if the capture statistics cannot be read.
pub(super) fn run<E>(
  mut cap: pcap::Capture<Active>,
  mut handler: impl PacketHandler<Error = E> + Send + 'static,
  rp: RunParams
) -> Result<(), Error<E>> {
  // Clearing with the complement of ALL resets every command bit this build
  // knows about while leaving any other bits untouched.
  const MASK: u32 = !CmdFlags::ALL.bits();

  #[cfg(feature = "inspect")]
  let start_time = Instant::now();

  #[cfg(feature = "inspect")]
  let mut num_raw_pkts: u64 = 0;

  #[cfg(feature = "inspect")]
  let mut raw_pkt_bytes: u64 = 0;

  let res = 'outer: loop {
    match cap.next_packet() {
      Ok(pkt) => {
        #[cfg(feature = "inspect")]
        {
          num_raw_pkts = num_raw_pkts.wrapping_add(1);
          raw_pkt_bytes = raw_pkt_bytes.wrapping_add(pkt.len() as u64);
        }

        // Tell the idle monitor (if any) that traffic was seen.
        #[cfg(feature = "idle")]
        if let Some(ref idle_sh) = rp.idle_sh {
          idle_sh.touch();
        }

        if let Err(e) = handler.proc(pkt) {
          break 'outer Err(Error::App(e));
        }
      }
      Err(pcap::Error::TimeoutExpired) => {
        // Woken up without a packet; continue to command processing.
      }
      Err(_e) => {
        // NOTE(review): every other capture error is ignored here.  If the
        // capture fails persistently this loop keeps retrying until KILL is
        // requested -- confirm that this best-effort behavior is intended.
      }
    }

    //
    // Process command requests
    //
    // ToDo: Is there a more efficient ordering for this?
    let cmd = rp.cmdreq.fetch_and(MASK, Ordering::SeqCst);

    if cmd & CmdFlags::KILL.bits() != 0 {
      break 'outer Ok(());
    }

    #[cfg(feature = "inspect")]
    if cmd & CmdFlags::INSPECT.bits() != 0 {
      match snapshot_info(&mut cap, num_raw_pkts, raw_pkt_bytes, start_time) {
        Ok(info) => {
          handler.inspect(&info);
        }
        Err(e) => {
          // Report the failure instead of panicking the receiver thread so
          // that the handler still gets its shutdown() call.
          break 'outer Err(Error::Internal(format!(
            "Unable to read capture statistics; {}",
            e
          )));
        }
      }
    }

    #[cfg(feature = "idle")]
    if cmd & CmdFlags::IDLE.bits() != 0 {
      if let Err(e) = handler.idle() {
        break 'outer Err(Error::App(e));
      }
    }

    if cmd & CmdFlags::CHANNEL.bits() != 0 {
      // Drain everything that is queued; the flag is only a hint that at
      // least one message may be waiting.
      while let Ok(msg) = rp.ch_rx.try_recv() {
        match msg {
          #[cfg(feature = "inspect")]
          Msg::Inspect(tx) => {
            let info = match snapshot_info(
              &mut cap,
              num_raw_pkts,
              raw_pkt_bytes,
              start_time
            ) {
              Ok(info) => info,
              Err(e) => {
                break 'outer Err(Error::Internal(format!(
                  "Unable to read capture statistics; {}",
                  e
                )));
              }
            };

            if let Err(e) = tx.send(info) {
              break 'outer Err(Error::Internal(format!(
                "Unable to respond to channel request; {}",
                e
              )));
            }
          }
        }
      }
    }
  };

  handler.shutdown();

  res
}

/// Collect the current receiver statistics into a `RecvInfo` snapshot.
///
/// Combines the drop counters reported by the capture with the packet and
/// byte counters maintained by the receiver loop and the time elapsed since
/// `start_time`.  Returns the pcap error if the capture statistics cannot be
/// read.
#[cfg(feature = "inspect")]
fn snapshot_info(
  cap: &mut pcap::Capture<Active>,
  num_raw_pkts: u64,
  raw_pkt_bytes: u64,
  start_time: Instant
) -> Result<super::RecvInfo, pcap::Error> {
  let stats = cap.stats()?;

  Ok(super::RecvInfo {
    overflow_dropped: stats.dropped,
    if_dropped: stats.if_dropped,
    num_raw_pkts,
    raw_pkt_bytes,
    runtime: start_time.elapsed()
  })
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :