use std::convert::TryFrom;
use std::error::Error;
use std::fmt::{Display, Formatter};
use std::marker::PhantomData;
use std::net::IpAddr;
use std::num::TryFromIntError;
use std::path::PathBuf;
use std::str::FromStr;
use etherparse::{
Ethernet2Header, InternetSlice, Ipv4Header, Ipv6Header, LinkSlice, SlicedPacket, TcpHeader, TransportSlice,
UdpHeader,
};
use ip_network::IpNetwork;
use pcap::{Activated, Capture, Device, Error as PCAPError, Packet};
use rtlola_interpreter::monitor::{Record, ValueProjection};
use rtlola_interpreter::time::TimeRepresentation;
use rtlola_interpreter::Value;
use crate::EventSource;
fn get_ethernet_header(packet: &SlicedPacket) -> Option<Ethernet2Header> {
match &packet.link {
Some(s) => {
match s {
LinkSlice::Ethernet2(hdr) => Some(hdr.to_header()),
}
},
None => None,
}
}
fn ethernet_source(packet: &SlicedPacket) -> Value {
if let Some(ethernet_header) = get_ethernet_header(packet) {
let values: Vec<Value> = ethernet_header
.source
.iter()
.map(|v: &u8| Value::Unsigned((*v).into()))
.collect();
Value::Tuple(values.into_boxed_slice())
} else {
Value::None
}
}
fn ethernet_destination(packet: &SlicedPacket) -> Value {
if let Some(ethernet_header) = get_ethernet_header(packet) {
let values: Vec<Value> = ethernet_header
.destination
.iter()
.map(|v: &u8| Value::Unsigned((*v).into()))
.collect();
Value::Tuple(values.into_boxed_slice())
} else {
Value::None
}
}
fn ethernet_type(packet: &SlicedPacket) -> Value {
if let Some(ethernet_header) = get_ethernet_header(packet) {
Value::Unsigned(ethernet_header.ether_type.into())
} else {
Value::None
}
}
fn get_ipv4_header(packet: &SlicedPacket) -> Option<Ipv4Header> {
match &packet.ip {
Some(int_slice) => {
match int_slice {
InternetSlice::Ipv4(h, _) => Some(h.to_header()),
InternetSlice::Ipv6(_, _) => None,
}
},
None => None,
}
}
fn ipv4_source(packet: &SlicedPacket) -> Value {
if let Some(header) = get_ipv4_header(packet) {
let values: Vec<Value> = header
.source
.iter()
.map(|v: &u8| Value::Unsigned((*v).into()))
.collect();
Value::Tuple(values.into_boxed_slice())
} else {
Value::None
}
}
fn ipv4_destination(packet: &SlicedPacket) -> Value {
if let Some(header) = get_ipv4_header(packet) {
let values: Vec<Value> = header
.destination
.iter()
.map(|v: &u8| Value::Unsigned((*v).into()))
.collect();
Value::Tuple(values.into_boxed_slice())
} else {
Value::None
}
}
fn ipv4_ihl(packet: &SlicedPacket) -> Value {
if let Some(header) = get_ipv4_header(packet) {
Value::Unsigned(header.ihl().into())
} else {
Value::None
}
}
fn ipv4_dscp(packet: &SlicedPacket) -> Value {
if let Some(header) = get_ipv4_header(packet) {
Value::Unsigned(header.differentiated_services_code_point.into())
} else {
Value::None
}
}
fn ipv4_ecn(packet: &SlicedPacket) -> Value {
if let Some(header) = get_ipv4_header(packet) {
Value::Unsigned(header.explicit_congestion_notification.into())
} else {
Value::None
}
}
fn ipv4_length(packet: &SlicedPacket) -> Value {
if let Some(header) = get_ipv4_header(packet) {
Value::Unsigned(header.total_len().into())
} else {
Value::None
}
}
fn ipv4_id(packet: &SlicedPacket) -> Value {
if let Some(header) = get_ipv4_header(packet) {
Value::Unsigned(header.identification.into())
} else {
Value::None
}
}
fn ipv4_fragment_offset(packet: &SlicedPacket) -> Value {
if let Some(header) = get_ipv4_header(packet) {
Value::Unsigned(header.fragments_offset.into())
} else {
Value::None
}
}
fn ipv4_ttl(packet: &SlicedPacket) -> Value {
if let Some(header) = get_ipv4_header(packet) {
Value::Unsigned(header.time_to_live.into())
} else {
Value::None
}
}
fn ipv4_protocol(packet: &SlicedPacket) -> Value {
if let Some(header) = get_ipv4_header(packet) {
Value::Unsigned(header.protocol.into())
} else {
Value::None
}
}
fn ipv4_checksum(packet: &SlicedPacket) -> Value {
if let Some(header) = get_ipv4_header(packet) {
Value::Unsigned(header.header_checksum.into())
} else {
Value::None
}
}
fn ipv4_flags_df(packet: &SlicedPacket) -> Value {
if let Some(header) = get_ipv4_header(packet) {
Value::Bool(header.dont_fragment)
} else {
Value::None
}
}
fn ipv4_flags_mf(packet: &SlicedPacket) -> Value {
if let Some(header) = get_ipv4_header(packet) {
Value::Bool(header.more_fragments)
} else {
Value::None
}
}
fn get_ipv6_header(packet: &SlicedPacket) -> Option<Ipv6Header> {
match &packet.ip {
Some(int_slice) => {
match int_slice {
InternetSlice::Ipv4(_, _) => None,
InternetSlice::Ipv6(h, _) => Some(h.to_header()),
}
},
None => None,
}
}
fn ipv6_source(packet: &SlicedPacket) -> Value {
if let Some(header) = get_ipv6_header(packet) {
let values: Vec<Value> = header
.source
.iter()
.map(|v: &u8| Value::Unsigned((*v).into()))
.collect();
Value::Tuple(values.into_boxed_slice())
} else {
Value::None
}
}
fn ipv6_destination(packet: &SlicedPacket) -> Value {
if let Some(header) = get_ipv6_header(packet) {
let values: Vec<Value> = header
.destination
.iter()
.map(|v: &u8| Value::Unsigned((*v).into()))
.collect();
Value::Tuple(values.into_boxed_slice())
} else {
Value::None
}
}
fn ipv6_traffic_class(packet: &SlicedPacket) -> Value {
if let Some(header) = get_ipv6_header(packet) {
Value::Unsigned(header.traffic_class.into())
} else {
Value::None
}
}
fn ipv6_flow_label(packet: &SlicedPacket) -> Value {
if let Some(header) = get_ipv6_header(packet) {
Value::Unsigned(header.flow_label.into())
} else {
Value::None
}
}
fn ipv6_length(packet: &SlicedPacket) -> Value {
if let Some(header) = get_ipv6_header(packet) {
Value::Unsigned(header.payload_length.into())
} else {
Value::None
}
}
fn ipv6_hop_limit(packet: &SlicedPacket) -> Value {
if let Some(header) = get_ipv6_header(packet) {
Value::Unsigned(header.hop_limit.into())
} else {
Value::None
}
}
fn get_tcp_header(packet: &SlicedPacket) -> Option<TcpHeader> {
use TransportSlice::*;
match &packet.transport {
Some(t) => {
match t {
Tcp(h) => Some(h.to_header()),
Udp(_) | Icmpv4(_) | Icmpv6(_) | Unknown(_) => None,
}
},
None => None,
}
}
fn tcp_source_port(packet: &SlicedPacket) -> Value {
if let Some(header) = get_tcp_header(packet) {
Value::Unsigned(header.source_port.into())
} else {
Value::None
}
}
fn tcp_destination_port(packet: &SlicedPacket) -> Value {
if let Some(header) = get_tcp_header(packet) {
Value::Unsigned(header.destination_port.into())
} else {
Value::None
}
}
fn tcp_seq_number(packet: &SlicedPacket) -> Value {
if let Some(header) = get_tcp_header(packet) {
Value::Unsigned(header.sequence_number.into())
} else {
Value::None
}
}
fn tcp_ack_number(packet: &SlicedPacket) -> Value {
if let Some(header) = get_tcp_header(packet) {
Value::Unsigned(header.acknowledgment_number.into())
} else {
Value::None
}
}
fn tcp_data_offset(packet: &SlicedPacket) -> Value {
if let Some(header) = get_tcp_header(packet) {
Value::Unsigned(header.data_offset().into())
} else {
Value::None
}
}
fn tcp_flags_ns(packet: &SlicedPacket) -> Value {
if let Some(header) = get_tcp_header(packet) {
Value::Bool(header.ns)
} else {
Value::None
}
}
fn tcp_flags_fin(packet: &SlicedPacket) -> Value {
if let Some(header) = get_tcp_header(packet) {
Value::Bool(header.fin)
} else {
Value::None
}
}
fn tcp_flags_syn(packet: &SlicedPacket) -> Value {
if let Some(header) = get_tcp_header(packet) {
Value::Bool(header.syn)
} else {
Value::None
}
}
fn tcp_flags_rst(packet: &SlicedPacket) -> Value {
if let Some(header) = get_tcp_header(packet) {
Value::Bool(header.rst)
} else {
Value::None
}
}
fn tcp_flags_psh(packet: &SlicedPacket) -> Value {
if let Some(header) = get_tcp_header(packet) {
Value::Bool(header.psh)
} else {
Value::None
}
}
fn tcp_flags_ack(packet: &SlicedPacket) -> Value {
if let Some(header) = get_tcp_header(packet) {
Value::Bool(header.ack)
} else {
Value::None
}
}
fn tcp_flags_urg(packet: &SlicedPacket) -> Value {
if let Some(header) = get_tcp_header(packet) {
Value::Bool(header.urg)
} else {
Value::None
}
}
fn tcp_flags_ece(packet: &SlicedPacket) -> Value {
if let Some(header) = get_tcp_header(packet) {
Value::Bool(header.ece)
} else {
Value::None
}
}
fn tcp_flags_cwr(packet: &SlicedPacket) -> Value {
if let Some(header) = get_tcp_header(packet) {
Value::Bool(header.cwr)
} else {
Value::None
}
}
fn tcp_window_size(packet: &SlicedPacket) -> Value {
if let Some(header) = get_tcp_header(packet) {
Value::Unsigned(header.window_size.into())
} else {
Value::None
}
}
fn tcp_checksum(packet: &SlicedPacket) -> Value {
if let Some(header) = get_tcp_header(packet) {
Value::Unsigned(header.checksum.into())
} else {
Value::None
}
}
fn tcp_urgent_pointer(packet: &SlicedPacket) -> Value {
if let Some(header) = get_tcp_header(packet) {
Value::Unsigned(header.urgent_pointer.into())
} else {
Value::None
}
}
fn get_udp_header(packet: &SlicedPacket) -> Option<UdpHeader> {
use TransportSlice::*;
match &packet.transport {
Some(t) => {
match t {
Tcp(_) | Icmpv4(_) | Icmpv6(_) | Unknown(_) => None,
Udp(h) => Some(h.to_header()),
}
},
None => None,
}
}
fn udp_source_port(packet: &SlicedPacket) -> Value {
if let Some(header) = get_udp_header(packet) {
Value::Unsigned(header.source_port.into())
} else {
Value::None
}
}
fn udp_destination_port(packet: &SlicedPacket) -> Value {
if let Some(header) = get_udp_header(packet) {
Value::Unsigned(header.destination_port.into())
} else {
Value::None
}
}
fn udp_length(packet: &SlicedPacket) -> Value {
if let Some(header) = get_udp_header(packet) {
Value::Unsigned(header.length.into())
} else {
Value::None
}
}
fn udp_checksum(packet: &SlicedPacket) -> Value {
if let Some(header) = get_udp_header(packet) {
Value::Unsigned(header.checksum.into())
} else {
Value::None
}
}
fn get_packet_payload(packet: &SlicedPacket) -> Value {
Value::Str(String::from_utf8_lossy(packet.payload).as_ref().into())
}
fn get_packet_protocol(packet: &SlicedPacket) -> Value {
match &packet.transport {
Some(transport) => {
match transport {
TransportSlice::Tcp(_) => Value::Str("TCP".into()),
TransportSlice::Udp(_) => Value::Str("UDP".into()),
TransportSlice::Icmpv4(_) => Value::Str("ICMPv4".into()),
TransportSlice::Icmpv6(_) => Value::Str("ICMPv6".into()),
TransportSlice::Unknown(_) => Value::Str("Unknown".into()),
}
},
None => {
match &packet.ip {
Some(ip) => {
match ip {
InternetSlice::Ipv4(_, _) => Value::Str("IPv4".into()),
InternetSlice::Ipv6(_, _) => Value::Str("IPv6".into()),
}
},
None => {
match &packet.link {
Some(link) => {
match link {
LinkSlice::Ethernet2(_) => Value::Str("Ethernet2".into()),
}
},
None => Value::Str("Unknown".into()),
}
},
}
},
}
}
#[derive(Debug)]
pub enum PcapError {
UnknownInput(String),
UnknownDevice(String, Vec<String>),
InvalidLocalNetwork(String, ip_network::IpNetworkParseError),
TimeParseError(TryFromIntError),
TimeFormatError(String),
Pcap(pcap::Error),
}
impl Display for PcapError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
PcapError::UnknownInput(i) => write!(f, "Malformed input name: {}", i),
PcapError::UnknownDevice(d, available) => {
write!(
f,
"Interface {} was not found. Available interfaces are: {}",
d,
available.join(", ")
)
},
PcapError::InvalidLocalNetwork(name, e) => {
write!(f, "Could not parse local network range: {}. Error: {}", *name, e)
},
PcapError::TimeParseError(e) => write!(f, "Could not parse timestamp from packet: {}", e),
PcapError::TimeFormatError(e) => write!(f, "Could not parse timestamp to time format: {}", e),
PcapError::Pcap(e) => write!(f, "Could not process packet: {}", e),
}
}
}
impl Error for PcapError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
PcapError::UnknownInput(_) => None,
PcapError::UnknownDevice(_, _) => None,
PcapError::InvalidLocalNetwork(_, e) => Some(e),
PcapError::TimeParseError(e) => Some(e),
PcapError::TimeFormatError(_) => None,
PcapError::Pcap(e) => Some(e),
}
}
}
pub struct PcapRecord(Vec<u8>);
impl Record for PcapRecord {
type CreationData = IpNetwork;
type Error = PcapError;
fn func_for_input(name: &str, data: Self::CreationData) -> Result<ValueProjection<Self, PcapError>, PcapError> {
let local_net = data;
let layers: Vec<&str> = name.split("::").collect();
if layers.len() > 3 || layers.is_empty() {
return Err(PcapError::UnknownInput(name.to_string()));
}
let get_packet_direction = move |packet: &SlicedPacket| -> Value {
let addr: IpAddr = match &packet.ip {
Some(ip) => {
match ip {
InternetSlice::Ipv4(header, _) => IpAddr::V4(header.destination_addr()),
InternetSlice::Ipv6(header, _) => IpAddr::V6(header.destination_addr()),
}
},
None => return Value::None,
};
if local_net.contains(addr) {
Value::Str("Incoming".into())
} else {
Value::Str("Outgoing".into())
}
};
let inner_fn: Box<dyn Fn(&SlicedPacket) -> Value> = match layers[0] {
"Ethernet" => {
if layers.len() != 2 {
return Err(PcapError::UnknownInput(name.to_string()));
};
match layers[1] {
"source" => Box::new(ethernet_source),
"destination" => Box::new(ethernet_destination),
"type" => Box::new(ethernet_type),
_ => {
return Err(PcapError::UnknownInput(name.to_string()));
},
}
},
"IPv4" => {
if layers.len() < 2 {
return Err(PcapError::UnknownInput(name.to_string()));
};
match layers[1] {
"source" => Box::new(ipv4_source),
"destination" => Box::new(ipv4_destination),
"ihl" => Box::new(ipv4_ihl),
"dscp" => Box::new(ipv4_dscp),
"ecn" => Box::new(ipv4_ecn),
"length" => Box::new(ipv4_length),
"identification" => Box::new(ipv4_id),
"fragment_offset" => Box::new(ipv4_fragment_offset),
"ttl" => Box::new(ipv4_ttl),
"protocol" => Box::new(ipv4_protocol),
"checksum" => Box::new(ipv4_checksum),
"flags" => {
if layers.len() < 3 {
return Err(PcapError::UnknownInput(name.to_string()));
}
match layers[2] {
"df" => Box::new(ipv4_flags_df),
"mf" => Box::new(ipv4_flags_mf),
_ => {
return Err(PcapError::UnknownInput(name.to_string()));
},
}
},
_ => {
return Err(PcapError::UnknownInput(name.to_string()));
},
}
},
"IPv6" => {
if layers.len() < 2 {
return Err(PcapError::UnknownInput(name.to_string()));
};
match layers[1] {
"source" => Box::new(ipv6_source),
"destination" => Box::new(ipv6_destination),
"traffic_class" => Box::new(ipv6_traffic_class),
"flow_label" => Box::new(ipv6_flow_label),
"length" => Box::new(ipv6_length),
"hop_limit" => Box::new(ipv6_hop_limit),
_ => {
return Err(PcapError::UnknownInput(name.to_string()));
},
}
},
"TCP" => {
if layers.len() < 2 {
return Err(PcapError::UnknownInput(name.to_string()));
};
match layers[1] {
"source" => Box::new(tcp_source_port),
"destination" => Box::new(tcp_destination_port),
"seq_number" => Box::new(tcp_seq_number),
"ack_number" => Box::new(tcp_ack_number),
"data_offset" => Box::new(tcp_data_offset),
"window_size" => Box::new(tcp_window_size),
"checksum" => Box::new(tcp_checksum),
"urgent_pointer" => Box::new(tcp_urgent_pointer),
"flags" => {
if layers.len() < 3 {
return Err(PcapError::UnknownInput(name.to_string()));
};
match layers[2] {
"ns" => Box::new(tcp_flags_ns),
"fin" => Box::new(tcp_flags_fin),
"syn" => Box::new(tcp_flags_syn),
"rst" => Box::new(tcp_flags_rst),
"psh" => Box::new(tcp_flags_psh),
"ack" => Box::new(tcp_flags_ack),
"urg" => Box::new(tcp_flags_urg),
"ece" => Box::new(tcp_flags_ece),
"cwr" => Box::new(tcp_flags_cwr),
_ => {
return Err(PcapError::UnknownInput(name.to_string()));
},
}
},
_ => {
return Err(PcapError::UnknownInput(name.to_string()));
},
}
},
"UDP" => {
if layers.len() < 2 {
return Err(PcapError::UnknownInput(name.to_string()));
};
match layers[1] {
"source" => Box::new(udp_source_port),
"destination" => Box::new(udp_destination_port),
"length" => Box::new(udp_length),
"checksum" => Box::new(udp_checksum),
_ => {
return Err(PcapError::UnknownInput(name.to_string()));
},
}
},
"payload" => Box::new(get_packet_payload),
"direction" => Box::new(get_packet_direction),
"protocol" => Box::new(get_packet_protocol),
_ => {
return Err(PcapError::UnknownInput(name.to_string()));
},
};
Ok(Box::new(move |packet: &PcapRecord| {
let packet = SlicedPacket::from_ethernet(packet.0.as_slice()).expect("Could not parse packet!");
Ok(inner_fn(&packet))
}))
}
}
#[derive(Debug, Clone)]
pub enum PcapInputSource {
Device {
name: String,
local_network: String,
},
File {
path: PathBuf,
local_network: String,
},
}
#[allow(missing_debug_implementations)] pub struct PcapEventSource<InputTime: TimeRepresentation> {
capture_handle: Capture<dyn Activated>,
timer: PhantomData<InputTime>,
local_net: IpNetwork,
}
impl<InputTime: TimeRepresentation> PcapEventSource<InputTime> {
pub fn setup(src: &PcapInputSource) -> Result<PcapEventSource<InputTime>, PcapError> {
let capture_handle = match src {
PcapInputSource::Device { name, .. } => {
let all_devices = Device::list().map_err(|e| PcapError::Pcap(e))?;
let device_names: Vec<_> = all_devices.iter().map(|d| d.name.clone()).collect();
let dev: Device = all_devices
.into_iter()
.find(|d| &d.name == name)
.ok_or_else(|| PcapError::UnknownDevice(name.clone(), device_names))?;
let capture_handle = Capture::from_device(dev)
.map_err(|e| PcapError::Pcap(e))?
.promisc(true)
.snaplen(65535)
.open()
.map_err(|e| PcapError::Pcap(e))?;
capture_handle.into()
},
PcapInputSource::File { path, .. } => {
let capture_handle = Capture::from_file(path).map_err(|e| PcapError::Pcap(e))?;
capture_handle.into()
},
};
let local_network_range = match src {
PcapInputSource::Device { local_network, .. } => local_network,
PcapInputSource::File { local_network, .. } => local_network,
};
let local_net = IpNetwork::from_str(local_network_range.as_ref())
.map_err(|e| PcapError::InvalidLocalNetwork(local_network_range.clone(), e))?;
Ok(Self {
capture_handle,
timer: Default::default(),
local_net,
})
}
}
impl<InputTime: TimeRepresentation> EventSource<InputTime> for PcapEventSource<InputTime> {
type Error = PcapError;
type Rec = PcapRecord;
fn init_data(&self) -> Result<IpNetwork, PcapError> {
Ok(self.local_net)
}
fn next_event(&mut self) -> Result<Option<(PcapRecord, InputTime::InnerTime)>, PcapError> {
let raw_packet: Packet = match self.capture_handle.next_packet() {
Ok(pkt) => pkt,
Err(PCAPError::NoMorePackets) => return Ok(None),
Err(e) => return Err(PcapError::Pcap(e)),
};
let p = (*raw_packet).to_vec();
let (secs, nanos) = u64::try_from(raw_packet.header.ts.tv_sec)
.and_then(|secs| u32::try_from(raw_packet.header.ts.tv_usec * 1000).map(|sub_secs| (secs, sub_secs)))
.map_err(|e| PcapError::TimeParseError(e))?;
let time_str = format!("{}.{:09}", secs, nanos);
let time = InputTime::parse(&time_str).map_err(|e| PcapError::TimeFormatError(e))?;
Ok(Some((PcapRecord(p), time)))
}
}