ethrecv 0.0.1

Receive ethernet packets at a high rate.
Documentation
//! _ethrecv_ can be used to receive ethernet packets, with the goal of being
//! able to do so at a high rate.  It runs a receiver loop in a dedicated
//! thread and passes packets to an application callback implemented using the
//! trait [`PacketHandler`].
//!
//! It optionally supports an `inspect` callback, which passes some receiver
//! statistics to a callback.  It also supports an optional `idle` callback
//! which will be called whenever the receiver has been idle for a configurable
//! amount of time (basically a debouncer for detecting an idle period).
//!
//! # Features
//! | Feature    | Function
//! |------------|----------
//! | `idle`     | Enable support for [`PacketHandler::idle()`].
//! | `inspect`  | Enable support for [`PacketHandler::inspect()`].

#![cfg_attr(docsrs, feature(doc_cfg))]

#[cfg(unix)]
mod unix;

#[cfg(windows)]
mod win;

#[cfg(feature = "idle")]
mod idlemon;

mod cmdsig;
mod err;

use std::{
  sync::{mpsc::Sender, Arc},
  thread
};

#[cfg(any(feature = "idle", feature = "inspect"))]
use std::time::Duration;

#[cfg(unix)]
use parking_lot::Mutex;

#[cfg(windows)]
use std::sync::atomic::AtomicU32;

pub use err::Error;

use cmdsig::CmdSignal;


/// Requests passed over the `std::sync::mpsc` channel whose sending half is
/// held by the [`Controller`] and whose receiving half is handed to the
/// receiver thread.
enum Msg {
  /// Ask for a [`RecvInfo`] snapshot; the reply is delivered through the
  /// enclosed sender.
  #[cfg(feature = "inspect")]
  Inspect(std::sync::mpsc::Sender<RecvInfo>)
}

#[cfg(feature = "inspect")]
#[derive(Debug)]
/// Receiver statistics passed to [`PacketHandler::inspect()`] and returned by
/// [`Controller::inspect()`].
///
/// NOTE(review): the per-field descriptions below are inferred from the field
/// names only; the code that fills them in lives in the platform-specific
/// receiver loops, which are not part of this file -- confirm there.
pub struct RecvInfo {
  /// Presumably the number of packets dropped because the capture buffer
  /// overflowed.
  pub overflow_dropped: u32,
  /// Presumably the number of packets dropped by the network interface.
  pub if_dropped: u32,
  /// Presumably the number of raw packets received so far.
  pub num_raw_pkts: u64,
  /// Presumably the accumulated size, in bytes, of the received raw packets.
  pub raw_pkt_bytes: u64,
  /// Presumably how long the receiver has been running.
  pub runtime: Duration
}


/// Application-defined packet handler callback.
///
/// An implementation is passed to [`RecvThread::run()`], which moves it onto
/// the receiver thread.
pub trait PacketHandler {
  /// Application-specific error type returned by the fallible callbacks.
  type Error;

  /// Called on the receiver thread before the packet reading and processing
  /// loop has been entered.
  ///
  /// If this returns an error the receiver thread terminates with that error
  /// without entering the loop.  The default implementation does nothing and
  /// returns `Ok(())`.
  fn init(&mut self) -> Result<(), Self::Error> {
    Ok(())
  }

  /// Called to process a packet.
  fn proc(&mut self, pkt: pcap::Packet) -> Result<(), Self::Error>;

  /// Called whenever a timeout has been reached without any new packets
  /// arriving.
  ///
  /// The default implementation does nothing and returns `Ok(())`.
  #[cfg(feature = "idle")]
  #[cfg_attr(docsrs, doc(cfg(feature = "idle")))]
  fn idle(&mut self) -> Result<(), Self::Error> {
    Ok(())
  }

  /// Called when the controller has requested an inspection.
  ///
  /// The default implementation ignores `info`.
  #[cfg(feature = "inspect")]
  #[cfg_attr(docsrs, doc(cfg(feature = "inspect")))]
  #[allow(unused_variables)]
  fn inspect(&self, info: &RecvInfo) {}

  /// Called on the receiver thread once the main loop has been terminated.
  ///
  /// The default implementation does nothing.
  fn shutdown(&mut self) {}
}

/// A builder-like object for initializing the packet receiver thread.
///
/// Create one with [`RecvThread::new()`], optionally adjust it using the
/// builder methods, then call [`RecvThread::run()`] to launch the receiver.
pub struct RecvThread {
  /// Name of the capture device to open.
  devname: String,
  /// Capture buffer size, handed to the capture's `buffer_size()` setting.
  bufsize: i32,

  /// Idle timeout handed to the idle monitor.  `None` means no idle monitor
  /// thread is started.
  #[cfg(feature = "idle")]
  idle_dur: Option<Duration>
}

impl RecvThread {
  pub fn new(devname: &str) -> Self {
    Self {
      devname: devname.to_string(),
      bufsize: 16 * 1024 * 1024,
      #[cfg(feature = "idle")]
      idle_dur: None
    }
  }

  pub fn bufsize<E>(mut self, bufsize: usize) -> Result<Self, Error<E>> {
    self.bufsize_r(bufsize)?;
    Ok(self)
  }

  pub fn bufsize_r<E>(
    &mut self,
    bufsize: usize
  ) -> Result<&mut Self, Error<E>> {
    const MIN: usize = 256 * 1024;
    if bufsize < MIN {
      let m = format!("Buffer size must be at least {}", MIN);
      Err(Error::bad_arg(m))
    } else {
      self.bufsize = i32::try_from(bufsize).map_err(|e| Error::bad_arg(e))?;
      Ok(self)
    }
  }

  #[cfg(feature = "idle")]
  #[cfg_attr(docsrs, doc(cfg(feature = "idle")))]
  pub fn idle_duration(mut self, dur: Duration) -> Self {
    self.idle_dur = Some(dur);
    self
  }

  #[cfg(feature = "idle")]
  #[cfg_attr(docsrs, doc(cfg(feature = "idle")))]
  pub fn idle_duration_r(&mut self, dur: Duration) -> &mut Self {
    self.idle_dur = Some(dur);
    self
  }
}


impl RecvThread {
  pub fn run<E>(
    self,
    mut handler: impl PacketHandler<Error = E> + Send + 'static
  ) -> Result<Controller<E>, Error<E>>
  where
    E: Send + 'static
  {
    let cap = pcap::Capture::from_device(self.devname.as_ref())
      .map_err(|e| Error::NetIface(e.to_string()))?
      .buffer_size(self.bufsize);
    //.timeout(100);
    #[cfg(windows)]
    let cap = cap.immediate_mode(true);
    let cap = cap.open().map_err(|e| Error::NetIface(e.to_string()))?;

    #[cfg(unix)]
    let (ctl_tx, ctl_rx) = mio::unix::pipe::new().unwrap();

    #[cfg(unix)]
    let ctl_tx = Arc::new(Mutex::new(ctl_tx));

    #[cfg(windows)]
    let (cmdreq, hev_wakeup) =
      (Arc::new(AtomicU32::new(0)), unsafe { cap.get_event() });

    let cmdsig = CmdSignal {
      #[cfg(unix)]
      ctl_tx,
      #[cfg(windows)]
      cmdreq: Arc::clone(&cmdreq),
      #[cfg(windows)]
      hev_wakeup
    };

    // Channel used to send requests to receiver thread.
    let (ch_tx, ch_rx) = std::sync::mpsc::channel::<Msg>();

    #[cfg(feature = "idle")]
    let cmdsig2 = cmdsig.clone();

    let jh = thread::spawn(move || {
      handler.init().map_err(|e| Error::App(e))?;

      // If the "idle" feature is used, then kick off the idle monitoring
      // thread.
      #[cfg(feature = "idle")]
      let idle_res = if let Some(dur) = self.idle_dur {
        let r = idlemon::run(dur, cmdsig2);
        Some(r)
      } else {
        None
      };

      #[cfg(unix)]
      let ret = {
        #[cfg(feature = "idle")]
        let idle_sh = if let Some((idle_sh, _)) = &idle_res {
          Some(Arc::clone(idle_sh))
        } else {
          None
        };

        let rp = unix::RunParams {
          ctl_rx,
          #[cfg(feature = "idle")]
          idle_sh,
          ch_rx
        };
        unix::run(cap, handler, rp)
      };

      #[cfg(windows)]
      let ret = {
        #[cfg(feature = "idle")]
        let idle_sh = if let Some((idle_sh, _)) = &idle_res {
          Some(Arc::clone(idle_sh))
        } else {
          None
        };

        let rp = win::RunParams {
          cmdreq,
          #[cfg(feature = "idle")]
          idle_sh,
          ch_rx
        };

        win::run(cap, handler, rp)
      };

      // {unix,win}::run() calls handler.shutdown() because the handler's
      // ownership was passed to it.

      // Kill idle monitoring thread before terminating the receiver thread
      #[cfg(feature = "idle")]
      if let Some((idle_sh, idle_jh)) = idle_res {
        idle_sh.kill();
        let _ = idle_jh.join();
      }

      ret
    });

    Ok(Controller {
      jh: Some(jh),
      cmdsig,
      ch_tx
    })
  }
}


/// A controller that can be used to interact with the receiver thread.
///
/// If a controller is dropped while it still holds the receiver thread's join
/// handle, a termination request is sent to the receiver and, if sending it
/// succeeded, the drop waits for the thread to finish.
pub struct Controller<E> {
  /// Join handle of the receiver thread.  Becomes `None` once it has been
  /// taken by `wait()`, `shutdown()` or the `Drop` implementation.
  jh: Option<thread::JoinHandle<Result<(), Error<E>>>>,

  /// Used to signal commands to the receiver thread.
  cmdsig: CmdSignal,

  /// Sending half of the request channel to the receiver thread.  Within
  /// this file it is only used by the `inspect` feature, hence the
  /// `dead_code` allowance.
  #[allow(dead_code)]
  ch_tx: Sender<Msg>
}

impl<E> Controller<E>
where
  E: 'static
{
  /// Signal the receiver thread that it should call the
  /// [`PacketHandler::inspect()`] callback.
  ///
  /// This call is asynchronous with regards to the callback; the call may
  /// return to the caller before the receiver has received the request.
  #[cfg(feature = "inspect")]
  #[cfg_attr(docsrs, doc(cfg(feature = "inspect")))]
  pub fn signal_inspect(&mut self) -> Result<(), Error<E>> {
    self.cmdsig.inspect();
    Ok(())
  }

  #[cfg(feature = "inspect")]
  #[cfg_attr(docsrs, doc(cfg(feature = "inspect")))]
  pub fn inspect(&self) -> Result<RecvInfo, Error<E>> {
    let (tx, rx) = std::sync::mpsc::channel();
    self
      .ch_tx
      .send(Msg::Inspect(tx))
      .map_err(|e| Error::Internal(e.to_string()))?;
    self.cmdsig.channel();

    rx.recv().map_err(|e| Error::Internal(e.to_string()))
  }

  /// Signal receiver thread to terminate.
  ///
  /// This call is asynchronous; at the time this function returns to the
  /// caller the termination request may not have reached the receiver thread.
  ///
  /// A call to this method should be followed by a call to
  /// [`Controller::wait()`] to wait for the receiver to report termination.
  pub fn signal_shutdown(&self) -> Result<(), Error<E>> {
    if let Some(ref _jh) = self.jh {
      // ToDo: Handle return value
      self.cmdsig.kill()?;
      Ok(())
    } else {
      Ok(())
    }
  }

  /// Wait for receiver to shut down.
  ///
  /// Blocks the calling thread, and assumes that the receiver will be killed
  /// by another source.
  ///
  /// Use [`Controller::signal_shutdown()`] to signal the receiver thread to
  /// shut down before calling `Controller::wait()`.
  ///
  /// To avoid having the application need perform the two-stage shutdown, use
  /// [`Controller::shutdown()`] instead.
  pub fn wait(mut self) -> Result<(), Error<E>> {
    if let Some(jh) = self.jh.take() {
      // unwrap() should be okay, because the application should be unable to
      // pass anything other than Error<E>.
      jh.join().map_err(|e| *e.downcast::<Error<E>>().unwrap())?
    } else {
      Ok(())
    }
  }

  /// Tell receiver thread to terminate and wait for for it to end.
  pub fn shutdown(mut self) -> Result<(), Error<E>> {
    if let Some(jh) = self.jh.take() {
      self.cmdsig.kill()?;

      // unwrap() should be okay, because the application should be unable to
      // pass anything other than Error<E>.
      jh.join().map_err(|e| *e.downcast::<Error<E>>().unwrap())?
    } else {
      Ok(())
    }
  }
}

impl<E> Drop for Controller<E> {
  /// Ask the receiver thread to terminate and, if the request could be
  /// posted, wait for it to finish.
  ///
  /// Nothing is done if the join handle has already been taken.  Failures
  /// are written to stderr because `drop()` has no way of returning them.
  fn drop(&mut self) {
    let jh = match self.jh.take() {
      Some(jh) => jh,
      None => return
    };

    // Don't wait for the thread if the termination request couldn't be
    // posted.
    if let Err(e) = self.cmdsig.kill::<E>() {
      eprintln!(
        "Unable to send termination request to receiver thread; {}",
        e
      );
      return;
    }

    if let Err(e) = jh.join() {
      eprintln!("Receiver thread returned error; {:?}", e);
    }
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :