mod emit;
mod filter;
mod filter_map;
mod for_each;
mod group_by;
mod inspect;
mod map;
mod poll;
mod replace;
mod rxtx;
mod send;
pub use self::emit::*;
pub use self::filter::*;
pub use self::filter_map::*;
pub use self::for_each::*;
pub use self::group_by::*;
pub use self::inspect::*;
pub use self::map::*;
pub use self::poll::*;
pub use self::replace::*;
pub use self::rxtx::*;
pub use self::send::*;
use crate::packets::Packet;
use crate::Mbuf;
use anyhow::{Error, Result};
use std::collections::HashMap;
use std::hash::Hash;
#[allow(missing_debug_implementations)]
pub enum Disposition<T: Packet> {
Act(T),
Emit,
Drop(Mbuf),
Abort(Error),
}
impl<T: Packet> Disposition<T> {
fn map<U: Packet, F>(self, f: F) -> Disposition<U>
where
F: FnOnce(T) -> Disposition<U>,
{
match self {
Disposition::Act(packet) => f(packet),
Disposition::Emit => Disposition::Emit,
Disposition::Drop(mbuf) => Disposition::Drop(mbuf),
Disposition::Abort(err) => Disposition::Abort(err),
}
}
pub fn is_act(&self) -> bool {
matches!(self, Disposition::Act(_))
}
pub fn is_emit(&self) -> bool {
matches!(self, Disposition::Emit)
}
pub fn is_drop(&self) -> bool {
matches!(self, Disposition::Drop(_))
}
pub fn is_abort(&self) -> bool {
matches!(self, Disposition::Abort(_))
}
}
pub trait PacketRx {
fn receive(&mut self) -> Vec<Mbuf>;
}
pub trait PacketTx {
fn transmit(&mut self, packets: Vec<Mbuf>);
}
pub trait Batch {
type Item: Packet;
fn replenish(&mut self);
fn next(&mut self) -> Option<Disposition<Self::Item>>;
fn emit<Tx: PacketTx>(self, tx: Tx) -> Emit<Self, Tx>
where
Self: Sized,
{
Emit::new(self, tx)
}
#[inline]
fn filter<P>(self, predicate: P) -> Filter<Self, P>
where
P: FnMut(&Self::Item) -> bool,
Self: Sized,
{
Filter::new(self, predicate)
}
#[inline]
fn filter_map<T: Packet, F>(self, f: F) -> FilterMap<Self, T, F>
where
F: FnMut(Self::Item) -> Result<Either<T>>,
Self: Sized,
{
FilterMap::new(self, f)
}
#[inline]
fn map<T: Packet, F>(self, f: F) -> Map<Self, T, F>
where
F: FnMut(Self::Item) -> Result<T>,
Self: Sized,
{
Map::new(self, f)
}
#[inline]
fn for_each<F>(self, f: F) -> ForEach<Self, F>
where
F: FnMut(&Self::Item) -> Result<()>,
Self: Sized,
{
ForEach::new(self, f)
}
#[inline]
fn inspect<F>(self, f: F) -> Inspect<Self, F>
where
F: FnMut(&Disposition<Self::Item>),
Self: Sized,
{
Inspect::new(self, f)
}
#[inline]
fn group_by<D, S, C>(self, selector: S, composer: C) -> GroupBy<Self, D, S>
where
D: Eq + Clone + Hash,
S: Fn(&Self::Item) -> D,
C: FnOnce(&mut HashMap<Option<D>, Box<GroupByBatchBuilder<Self::Item>>>),
Self: Sized,
{
GroupBy::new(self, selector, composer)
}
fn replace<T: Packet, F>(self, f: F) -> Replace<Self, T, F>
where
F: FnMut(&Self::Item) -> Result<T>,
Self: Sized,
{
Replace::new(self, f)
}
#[inline]
fn send<Tx: PacketTx>(self, tx: Tx) -> Send<Self, Tx>
where
Self: Sized,
{
Batch::send_named(self, "default", tx)
}
#[inline]
fn send_named<Tx: PacketTx>(self, name: &str, tx: Tx) -> Send<Self, Tx>
where
Self: Sized,
{
Send::new(name.to_owned(), self, tx)
}
}
pub trait Pipeline: futures::Future<Output = ()> {
fn name(&self) -> &str;
fn run_once(&mut self);
}
pub fn splice<Rx: PacketRx + Unpin, Tx: PacketTx + Unpin>(rx: Rx, tx: Tx) -> impl Pipeline {
Poll::new(rx).send(tx)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::compose;
use crate::packets::ip::v4::Ipv4;
use crate::packets::ip::ProtocolNumbers;
use crate::packets::Ethernet;
use crate::testils::byte_arrays::{ICMPV4_PACKET, IPV4_TCP_PACKET, IPV4_UDP_PACKET};
use std::sync::mpsc::{self, TryRecvError};
fn new_batch(data: &[&[u8]]) -> impl Batch<Item = Mbuf> {
let packets = data
.iter()
.map(|bytes| Mbuf::from_bytes(bytes).unwrap())
.collect::<Vec<_>>();
let (mut tx, rx) = mpsc::channel();
tx.transmit(packets);
let mut batch = Poll::new(rx);
batch.replenish();
batch
}
#[capsule::test]
fn emit_batch() {
let (tx, mut rx) = mpsc::channel();
let mut batch = new_batch(&[&IPV4_UDP_PACKET])
.map(|p| p.parse::<Ethernet>())
.emit(tx)
.for_each(|_| panic!("emit broken!"));
assert!(batch.next().unwrap().is_emit());
assert_eq!(1, rx.receive().len());
}
#[capsule::test]
fn filter_batch() {
let mut batch = new_batch(&[&IPV4_UDP_PACKET]).filter(|_| true);
assert!(batch.next().unwrap().is_act());
let mut batch = new_batch(&[&IPV4_UDP_PACKET]).filter(|_| false);
assert!(batch.next().unwrap().is_drop());
}
#[capsule::test]
fn filter_map_batch() {
let mut batch = new_batch(&[&IPV4_UDP_PACKET, &ICMPV4_PACKET]).filter_map(|p| {
let v4 = p.parse::<Ethernet>()?.parse::<Ipv4>()?;
if v4.protocol() == ProtocolNumbers::Udp {
Ok(Either::Keep(v4))
} else {
Ok(Either::Drop(v4.reset()))
}
});
assert!(batch.next().unwrap().is_act());
assert!(batch.next().unwrap().is_drop());
assert!(batch.next().is_none());
}
#[capsule::test]
fn map_batch() {
let mut batch = new_batch(&[&IPV4_UDP_PACKET]).map(|p| p.parse::<Ethernet>());
assert!(batch.next().unwrap().is_act());
let mut batch = new_batch(&[&IPV4_UDP_PACKET]).map(|mut p| {
p.shrink(0, 999_999)?;
Ok(p)
});
assert!(batch.next().unwrap().is_abort());
}
#[capsule::test]
fn for_each_batch() {
let mut side_effect = false;
let mut batch = new_batch(&[&IPV4_UDP_PACKET]).for_each(|_| {
side_effect = true;
Ok(())
});
assert!(batch.next().unwrap().is_act());
assert!(side_effect);
}
#[capsule::test]
fn inspect_batch() {
let mut side_effect = false;
let mut batch = new_batch(&[&IPV4_UDP_PACKET]).inspect(|_| {
side_effect = true;
});
assert!(batch.next().unwrap().is_act());
assert!(side_effect);
}
#[capsule::test]
fn group_by_batch() {
let mut batch = new_batch(&[&IPV4_TCP_PACKET, &IPV4_UDP_PACKET, &ICMPV4_PACKET])
.map(|p| p.parse::<Ethernet>()?.parse::<Ipv4>())
.group_by(
|p| p.protocol(),
|groups| {
compose!( groups {
ProtocolNumbers::Tcp => |group| {
group.map(|mut p| {
p.set_ttl(1);
Ok(p)
})
}
ProtocolNumbers::Udp => |group| {
group.map(|mut p| {
p.set_ttl(2);
Ok(p)
})
}
_ => |group| {
group.filter(|_| {
false
})
}
})
},
);
let disp = batch.next().unwrap();
assert!(disp.is_act());
if let Disposition::Act(pkt) = disp {
assert_eq!(1, pkt.ttl());
}
let disp = batch.next().unwrap();
assert!(disp.is_act());
if let Disposition::Act(pkt) = disp {
assert_eq!(2, pkt.ttl());
}
assert!(batch.next().unwrap().is_drop());
}
#[capsule::test]
fn group_by_no_catchall() {
let mut batch = new_batch(&[&ICMPV4_PACKET])
.map(|p| p.parse::<Ethernet>()?.parse::<Ipv4>())
.group_by(
|p| p.protocol(),
|groups| {
compose!( groups {
ProtocolNumbers::Tcp => |group| {
group.filter(|_| false)
}
})
},
);
assert!(batch.next().unwrap().is_act());
}
#[capsule::test]
fn group_by_or() {
let mut batch = new_batch(&[&IPV4_TCP_PACKET, &IPV4_UDP_PACKET, &ICMPV4_PACKET])
.map(|p| p.parse::<Ethernet>()?.parse::<Ipv4>())
.group_by(
|p| p.protocol(),
|groups| {
compose!( groups {
ProtocolNumbers::Tcp, ProtocolNumbers::Udp => |group| {
group.map(|mut p| {
p.set_ttl(1);
Ok(p)
})
}
_ => |group| {
group.filter(|_| {
false
})
}
})
},
);
let disp = batch.next().unwrap();
assert!(disp.is_act());
if let Disposition::Act(pkt) = disp {
assert_eq!(1, pkt.ttl());
}
let disp = batch.next().unwrap();
assert!(disp.is_act());
if let Disposition::Act(pkt) = disp {
assert_eq!(1, pkt.ttl());
}
assert!(batch.next().unwrap().is_drop());
}
#[capsule::test]
fn group_by_or_no_catchall() {
let mut batch = new_batch(&[&IPV4_TCP_PACKET, &IPV4_UDP_PACKET])
.map(|p| p.parse::<Ethernet>()?.parse::<Ipv4>())
.group_by(
|p| p.protocol(),
|groups| {
compose!( groups {
ProtocolNumbers::Tcp, ProtocolNumbers::Udp => |group| {
group.map(|mut p| {
p.set_ttl(1);
Ok(p)
})
}
})
},
);
let disp = batch.next().unwrap();
assert!(disp.is_act());
if let Disposition::Act(pkt) = disp {
assert_eq!(1, pkt.ttl());
}
let disp = batch.next().unwrap();
assert!(disp.is_act());
if let Disposition::Act(pkt) = disp {
assert_eq!(1, pkt.ttl());
}
}
#[capsule::test]
fn group_by_fanout() {
let mut batch = new_batch(&[&IPV4_TCP_PACKET])
.map(|p| p.parse::<Ethernet>()?.parse::<Ipv4>())
.group_by(
|p| p.protocol(),
|groups| {
compose!( groups {
ProtocolNumbers::Tcp => |group| {
group.replace(|_| {
Mbuf::from_bytes(&IPV4_UDP_PACKET)?
.parse::<Ethernet>()?
.parse::<Ipv4>()
})
}
})
},
);
assert!(batch.next().unwrap().is_act());
assert!(batch.next().unwrap().is_drop());
assert!(batch.next().is_none());
}
#[capsule::test]
fn replace_batch() {
let mut batch =
new_batch(&[&IPV4_UDP_PACKET]).replace(|_| Mbuf::from_bytes(&IPV4_TCP_PACKET));
assert!(batch.next().unwrap().is_act());
assert!(batch.next().unwrap().is_drop());
assert!(batch.next().is_none());
}
#[capsule::test]
fn poll_fn_batch() {
let mut batch = poll_fn(|| vec![Mbuf::new().unwrap()]);
batch.replenish();
assert!(batch.next().unwrap().is_act());
assert!(batch.next().is_none());
}
#[capsule::test]
fn splice_pipeline() {
let (mut tx1, rx1) = mpsc::channel();
let (tx2, rx2) = mpsc::channel();
let mut pipeline = splice(rx1, tx2);
pipeline.run_once();
assert_eq!(TryRecvError::Empty, rx2.try_recv().unwrap_err());
let packet = Mbuf::from_bytes(&IPV4_UDP_PACKET).unwrap();
tx1.transmit(vec![packet]);
pipeline.run_once();
assert!(rx2.try_recv().is_ok());
}
}