ethrecv 0.0.1

Receive ethernet packets at a high rate.
Documentation
//! Unix packet receiver backend implementation.
//!
//! This backend uses `mio` to wait for either packets or intra-process
//! commands.

use std::{
  io::{ErrorKind, Read},
  os::fd::AsRawFd,
  sync::mpsc::Receiver
};

#[cfg(feature = "inspect")]
use std::time::Instant;

#[cfg(feature = "idle")]
use {crate::idlemon, std::sync::Arc};

use mio::{
  unix::{pipe, SourceFd},
  Events, Interest, Poll, Token
};

use pcap::Active;

use super::PacketHandler;

use crate::{err::Error, Msg};


pub(super) struct RunParams {
  pub(super) ctl_rx: pipe::Receiver,

  #[cfg(feature = "idle")]
  pub(super) idle_sh: Option<Arc<idlemon::Shared>>,

  pub(super) ch_rx: Receiver<Msg>
}


const PKTREADER: Token = Token(0);
const CONTROLLER: Token = Token(1);

pub(super) fn run<E>(
  cap: pcap::Capture<Active>,
  mut handler: impl PacketHandler<Error = E> + Send + 'static,
  mut rp: RunParams
) -> Result<(), Error<E>> {
  let mut poll = Poll::new().unwrap();
  let mut events = Events::with_capacity(128);

  // Make sure capture device is set to non-blocking
  let mut cap = cap.setnonblock().unwrap();

  // Make sure control pipe is non-blocking
  rp.ctl_rx.set_nonblocking(true)?;

  poll
    .registry()
    .register(
      &mut SourceFd(&cap.as_raw_fd()),
      PKTREADER,
      Interest::READABLE
    )
    .unwrap();

  poll
    .registry()
    .register(&mut rp.ctl_rx, CONTROLLER, Interest::READABLE)
    .unwrap();

  #[cfg(feature = "inspect")]
  let (start_time, mut num_raw_pkts, mut raw_pkt_bytes) =
    (Instant::now(), 0u64, 0u64);

  let res = 'outer: loop {
    //println!("poll()");
    poll.poll(&mut events, None).unwrap();
    for evt in events.iter() {
      match evt.token() {
        PKTREADER => loop {
          let res = cap.next_packet();
          match res {
            Ok(pkt) => {
              #[cfg(feature = "inspect")]
              {
                num_raw_pkts = num_raw_pkts.wrapping_add(1);
                raw_pkt_bytes = raw_pkt_bytes.wrapping_add(pkt.len() as u64);
              }

              #[cfg(feature = "idle")]
              if let Some(ref idle_sh) = rp.idle_sh {
                idle_sh.touch();
              }

              //println!("Got a packet!");
              if let Err(e) = handler.proc(pkt) {
                break 'outer Err(Error::App(e));
              }
            }
            Err(pcap::Error::TimeoutExpired) => {
              // Timeout here means "no more packets to read".
              //eprintln!("No more packets to read");
              break;
            }
            Err(_e) => {
              //eprintln!("Got error: {}", e);
              break;
            }
          }
        },
        CONTROLLER => {
          //println!("Got a controller event");

          let mut cmd = [0u8];

          'readctrls: loop {
            // Instead of reading a single byte each loop iteration this could
            // read multiple command bytes and then process each byte in a loop
            // instead.
            //
            // Rather pointless at this stage, because commands will be
            // unusual.
            match rp.ctl_rx.read(&mut cmd) {
              Ok(_n) => {
                // fall through
              }
              Err(e) => {
                let ek = e.kind();
                if ek == ErrorKind::WouldBlock {
                  // No more bytes to read.
                  break 'readctrls;
                } else {
                  eprintln!("Unexpected error {}", e);
                  break 'readctrls;
                }
              }
            }

            match cmd[0] {
              b't' => {
                // terminate
                //println!("Got termination event");
                break 'outer Ok(());
              }
              #[cfg(feature = "inspect")]
              b'i' => {
                // call inspect

                let stats = cap.stats().unwrap();

                //println!("Got inspection event");
                let info = super::RecvInfo {
                  overflow_dropped: stats.dropped,
                  if_dropped: stats.if_dropped,
                  num_raw_pkts,
                  raw_pkt_bytes,
                  runtime: Instant::now() - start_time
                };
                handler.inspect(&info);
              }
              #[cfg(feature = "idle")]
              b'd' => {
                // ToDo: Handle error
                let _ = handler.idle();
              }

              b'c' => {
                while let Ok(msg) = rp.ch_rx.try_recv() {
                  match msg {
                    #[cfg(feature = "inspect")]
                    Msg::Inspect(tx) => {
                      let stats = cap.stats().unwrap();

                      //println!("Got inspection event via channel");
                      let info = super::RecvInfo {
                        overflow_dropped: stats.dropped,
                        if_dropped: stats.if_dropped,
                        num_raw_pkts,
                        raw_pkt_bytes,
                        runtime: Instant::now() - start_time
                      };

                      if let Err(e) = tx.send(info) {
                        break 'outer Err(Error::Internal(format!(
                          "Unable to respond to channel request; {}",
                          e
                        )));
                      }
                    }
                  }
                }
              }

              _ => {
                unreachable!()
              }
            }
          }
        }
        _ => unreachable!()
      }
    }
  };

  handler.shutdown();

  res
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :