portus 0.9.0

A Congestion Control Plane
Documentation
use clap::Arg;
use portus::ipc::{Backend, BackendSender, Blocking, Ipc, Nonblocking};
use std::convert::TryInto;
use std::sync::{Arc, atomic};
use std::thread;
use std::vec::Vec;
use time::Duration;

#[macro_use]
extern crate clap;

#[derive(Debug)]
struct TimeMsg(time::OffsetDateTime);

use std::io::prelude::*;
impl portus::serialize::AsRawMsg for TimeMsg {
    fn get_hdr(&self) -> (u8, u32, u32) {
        (0xff, portus::serialize::HDR_LENGTH + 8 + 4, 0)
    }

    fn get_u32s<W: Write>(&self, _: &mut W) -> portus::Result<()> {
        Ok(())
    }

    fn get_u64s<W: Write>(&self, _: &mut W) -> portus::Result<()> {
        Ok(())
    }

    fn get_bytes<W: Write>(&self, w: &mut W) -> portus::Result<()> {
        let msg = self.0.unix_timestamp_nanos().to_le_bytes();
        w.write_all(&msg[..])?;
        Ok(())
    }

    fn from_raw_msg(msg: portus::serialize::RawMsg) -> portus::Result<Self> {
        let b = msg.get_bytes()?;
        let ts = i128::from_le_bytes((&b[0..16]).try_into().unwrap());
        Ok(TimeMsg(time::OffsetDateTime::from_unix_timestamp_nanos(ts)))
    }
}

#[derive(Debug)]
struct NlTimeMsg {
    kern_rt: time::OffsetDateTime,
    kern_st: time::OffsetDateTime,
}
impl portus::serialize::AsRawMsg for NlTimeMsg {
    fn get_hdr(&self) -> (u8, u32, u32) {
        (0xff - 1, portus::serialize::HDR_LENGTH + 16 + 8, 0)
    }

    fn get_u32s<W: Write>(&self, _: &mut W) -> portus::Result<()> {
        Ok(())
    }

    fn get_u64s<W: Write>(&self, _: &mut W) -> portus::Result<()> {
        Ok(())
    }

    fn get_bytes<W: Write>(&self, w: &mut W) -> portus::Result<()> {
        let mut msg = [0u8; 32]; // 2x i128
        (&mut msg[0..16]).copy_from_slice(&self.kern_rt.unix_timestamp_nanos().to_le_bytes());
        (&mut msg[16..]).copy_from_slice(&self.kern_st.unix_timestamp_nanos().to_le_bytes());
        w.write_all(&msg[..])?;
        Ok(())
    }

    fn from_raw_msg(msg: portus::serialize::RawMsg) -> portus::Result<Self> {
        let b = msg.get_bytes()?;
        let rt_ts = i128::from_le_bytes((&b[0..16]).try_into().unwrap());
        let st_ts = i128::from_le_bytes((&b[16..]).try_into().unwrap());
        Ok(NlTimeMsg {
            kern_rt: time::OffsetDateTime::from_unix_timestamp_nanos(rt_ts),
            kern_st: time::OffsetDateTime::from_unix_timestamp_nanos(st_ts),
        })
    }
}

use portus::serialize::AsRawMsg;
use std::sync::mpsc;
fn bench<T: Ipc>(b: BackendSender<T>, mut l: Backend<T>, iter: u32) -> Vec<Duration> {
    (0..iter)
        .map(|_| {
            let then = time::OffsetDateTime::now_utc();
            let msg = portus::serialize::serialize(&TimeMsg(then)).expect("serialize");
            b.send_msg(&msg[..]).expect("send ts");
            if let (portus::serialize::Msg::Other(raw), _addr) = l.next().expect("receive echo") {
                let then = TimeMsg::from_raw_msg(raw).expect("get time from raw");
                time::OffsetDateTime::now_utc() - then.0
            } else {
                panic!("wrong type");
            }
        })
        .collect()
}

struct NlDuration(Duration, Duration, Duration);
macro_rules! netlink_bench {
    ($name: ident, $mode: ident) => {
        #[cfg(target_os = "linux")] // netlink is linux-only
        fn $name(iter: u32) -> Vec<NlDuration> {
            use std::process::Command;
            Command::new("sudo")
                .arg("rmmod")
                .arg("nltest")
                .output()
                .expect("rmmod failed");

            // make clean
            Command::new("make")
                .arg("clean")
                .current_dir("./src/ipc/test-nl-kernel")
                .output()
                .expect("make failed to start");

            // compile kernel module
            Command::new("make")
                .current_dir("./src/ipc/test-nl-kernel")
                .output()
                .expect("make failed to start");

            let (tx, rx) = mpsc::channel::<Vec<NlDuration>>();

            // listen
            let c1 = thread::spawn(move || {
                let mut buf = [0u8; 1024];
                let mut nl = portus::ipc::netlink::Socket::<$mode>::new()
                    .map(|sk| {
                        Backend::new(sk, Arc::new(atomic::AtomicBool::new(true)), &mut buf[..])
                    })
                    .expect("nl ipc initialization");
                tx.send(vec![]).expect("ok to insmod");
                nl.next().expect("receive echo");
                let sender = nl.sender(());
                let res = (0..iter)
                    .map(|_| {
                        let portus_send_time = time::OffsetDateTime::now_utc();
                        let msg = portus::serialize::serialize(&TimeMsg(portus_send_time))
                            .expect("serialize");

                        sender.send_msg(&msg[..]).expect("send ts");
                        if let (portus::serialize::Msg::Other(raw), _addr) =
                            nl.next().expect("recv echo")
                        {
                            let portus_rt = time::OffsetDateTime::now_utc();
                            let kern_recv_msg =
                                NlTimeMsg::from_raw_msg(raw).expect("get time from raw");
                            return NlDuration(
                                portus_rt - portus_send_time,
                                kern_recv_msg.kern_rt - portus_send_time,
                                portus_rt - kern_recv_msg.kern_st,
                            );
                        } else {
                            panic!("wrong type");
                        };
                    })
                    .collect();
                tx.send(res).expect("report rtts");
            });

            rx.recv().expect("wait to insmod");
            // load kernel module
            Command::new("sudo")
                .arg("insmod")
                .arg("./src/ipc/test-nl-kernel/nltest.ko")
                .output()
                .expect("insmod failed");

            c1.join().expect("join netlink thread");
            Command::new("sudo")
                .arg("rmmod")
                .arg("nltest")
                .output()
                .expect("rmmod failed");
            rx.recv().expect("get rtts")
        }

        #[cfg(not(target_os = "linux"))] // netlink is linux-only
        fn $name(_: u32) -> Vec<NlDuration> {
            vec![]
        }
    };
}

netlink_bench!(netlink_blocking, Blocking);
netlink_bench!(netlink_nonblocking, Nonblocking);

macro_rules! kp_bench {
    ($name: ident, $mode: ident) => {
        #[cfg(target_os = "linux")] // kp is linux-only
        fn $name(iter: u32) -> Vec<Duration> {
            use std::process::Command;
            let (tx, rx) = mpsc::channel::<Vec<Duration>>();

            Command::new("sudo")
                .arg("./ccp_kernel_unload")
                .current_dir("./src/ipc/test-char-dev/ccp-kernel")
                .output()
                .expect("unload failed");

            // make clean
            Command::new("make")
                .arg("clean")
                .current_dir("./src/ipc/test-char-dev/ccp-kernel")
                .output()
                .expect("make failed to start");

            // compile kernel module
            Command::new("make")
                .arg("ONE_PIPE=y")
                .current_dir("./src/ipc/test-char-dev/ccp-kernel")
                .output()
                .expect("make failed to start");

            Command::new("sudo")
                .arg("./ccp_kernel_load")
                .arg("ipc=1")
                .current_dir("./src/ipc/test-char-dev/ccp-kernel")
                .output()
                .expect("load failed");

            let c1 = thread::spawn(move || {
                let mut receive_buf = [0u8; 1024];
                let kp = portus::ipc::kp::Socket::<$mode>::new()
                    .map(|sk| {
                        Backend::new(
                            sk,
                            Arc::new(atomic::AtomicBool::new(true)),
                            &mut receive_buf[..],
                        )
                    })
                    .expect("kp ipc initialization");
                tx.send(bench(kp.sender(()), kp, iter))
                    .expect("report rtts");
            });

            c1.join().expect("join kp thread");
            Command::new("sudo")
                .arg("./ccp_kernel_unload")
                .current_dir("./src/ipc/test-char-dev/ccp-kernel")
                .output()
                .expect("unload failed");
            rx.recv().expect("get rtts")
        }

        #[cfg(not(target_os = "linux"))] // kp is linux-only
        fn $name(_: u32) -> Vec<Duration> {
            vec![]
        }
    };
}

kp_bench!(kp_blocking, Blocking);
kp_bench!(kp_nonblocking, Nonblocking);

macro_rules! unix_bench {
    ($name: ident, $mode: ident) => {
        fn $name(iter: u32) -> Vec<Duration> {
            let (tx, rx) = mpsc::channel::<Vec<Duration>>();
            let (ready_tx, ready_rx) = mpsc::channel::<bool>();

            // listen
            let c1 = thread::spawn(move || {
                let mut receive_buf = [0u8; 1024];
                let unix = portus::ipc::unix::Socket::<$mode>::new("bench_rx")
                    .map(|sk| {
                        Backend::new(
                            sk,
                            Arc::new(atomic::AtomicBool::new(true)),
                            &mut receive_buf[..],
                        )
                    })
                    .expect("unix ipc initialization");
                ready_rx.recv().expect("sync");
                tx.send(bench(
                    unix.sender(std::path::PathBuf::from("/tmp/ccp/bench_tx")),
                    unix,
                    iter,
                ))
                .expect("report rtts");
            });

            // echo-er
            let c2 = thread::spawn(move || {
                let sk = portus::ipc::unix::Socket::<Blocking>::new("bench_tx").expect("sk init");
                let mut buf = [0u8; 1024];
                ready_tx.send(true).expect("sync");
                for _ in 0..iter {
                    let (rcv, addr) = sk.recv(&mut buf[..]).expect("recv");
                    sk.send(&buf[..rcv], &addr).expect("echo");
                }
            });

            c1.join().expect("join thread");
            c2.join().expect("join echo thread");
            rx.recv().expect("get rtts")
        }
    };
}

unix_bench!(unix_blocking, Blocking);
unix_bench!(unix_nonblocking, Nonblocking);

arg_enum! {
    #[derive(PartialEq, Debug)]
    pub enum IpcType {
        Nl,
        Unix,
        Kp,
    }
}

#[cfg(target_os = "linux")]
fn nl_exp(trials: u32) {
    for t in netlink_nonblocking(trials).iter().map(|d| {
        (
            d.0.whole_nanoseconds(),
            d.1.whole_nanoseconds(),
            d.2.whole_nanoseconds(),
        )
    }) {
        println!("nl nonblk {:?} {:?} {:?}", t.0, t.1, t.2);
    }

    for t in netlink_blocking(trials).iter().map(|d| {
        (
            d.0.whole_nanoseconds(),
            d.1.whole_nanoseconds(),
            d.2.whole_nanoseconds(),
        )
    }) {
        println!("nl blk {:?} {:?} {:?}", t.0, t.1, t.2);
    }
}

#[cfg(not(target_os = "linux"))]
fn nl_exp(trials: u32) {
    netlink_blocking(trials);
    netlink_nonblocking(trials);
}

fn main() {
    let matches = clap::App::new("IPC Latency Benchmark")
        .version("0.2.0")
        .author("Akshay Narayan <akshayn@mit.edu>")
        .about("Benchmark of IPC Latency")
        .arg(
            Arg::with_name("iterations")
                .long("iterations")
                .short("i")
                .help("Specifies how many trials to run (default 100)")
                .default_value("100"),
        )
        .arg(
            Arg::with_name("implementation")
                .long("impl")
                .help("Specifies the type of ipc being benchmarked")
                .possible_values(&IpcType::variants())
                .case_insensitive(true)
                .multiple(true)
                .default_value("nl"),
        )
        .get_matches();

    let trials = u32::from_str_radix(matches.value_of("iterations").unwrap(), 10)
        .expect("iterations must be integral");

    let imps = values_t!(matches.values_of("implementation"), IpcType).unwrap();

    println!("Impl Mode Rtt To From");
    if imps.contains(&IpcType::Unix) {
        for t in unix_nonblocking(trials)
            .iter()
            .map(|d| d.whole_nanoseconds())
        {
            println!("unix nonblk {:?} 0 0", t);
        }

        for t in unix_blocking(trials).iter().map(|d| d.whole_nanoseconds()) {
            println!("unix blk {:?} 0 0", t);
        }
    }

    if imps.contains(&IpcType::Nl) {
        nl_exp(trials);
    }

    if imps.contains(&IpcType::Kp) && cfg!(target_os = "linux") {
        for t in kp_nonblocking(trials).iter().map(|d| d.whole_nanoseconds()) {
            println!("kp nonblk {:?} 0 0", t);
        }

        for t in kp_blocking(trials).iter().map(|d| d.whole_nanoseconds()) {
            println!("kp blk {:?} 0 0", t);
        }
    }
}