use pnet::{
datalink::DataLinkReceiver,
packet::{
ethernet::{EtherTypes, EthernetPacket},
ip::IpNextHeaderProtocols,
ipv4::Ipv4Packet,
tcp::TcpPacket,
},
};
use python_comm::raise_error_use::*;
use std::{
net::Ipv4Addr,
sync::mpsc::Receiver,
time::{Duration, Instant},
};
pub use crate::send::create_l2_channel;
pub enum RecvPacket {
Count,
Discard,
Exit,
}
struct RecvTcp<'a, F>
where
F: FnMut(&[u8], usize) -> RecvPacket,
{
msg_in: Option<&'a Receiver<String>>, count: u64, timeout: Option<Duration>, src_ip: Option<Ipv4Addr>, dst_ip: Option<Ipv4Addr>, src_port: Option<u16>, dst_port: Option<u16>, handle_func: F, }
impl<'a, F> RecvTcp<'a, F>
where
F: FnMut(&[u8], usize) -> RecvPacket,
{
#[auto_func_name]
fn __call__(&mut self, net_rx: &mut Box<dyn DataLinkReceiver>) -> Result<u64, anyhow::Error> {
let mut count: u64 = 0;
let start = Instant::now();
loop {
if let Ok(packet) = net_rx.next() {
if let Some(frame) = EthernetPacket::new(packet) {
if frame.get_ethertype() == EtherTypes::Ipv4 {
match self.recv_ipv4(packet) {
RecvPacket::Count => {
count += 1;
}
RecvPacket::Discard => {}
RecvPacket::Exit => {
return Ok(count);
}
}
}
}
}
if self.msg_in.is_some() && self.handle_msg_and_exit() {
return Ok(count);
}
if self.count > 0 && count >= self.count {
return Ok(count);
}
if self.timeout.is_some() && self.timeout.unwrap() < Instant::now().duration_since(start) {
return raise_error!(__func__, "超时");
}
}
}
fn handle_msg_and_exit(&self) -> bool {
match &self.msg_in {
Some(msg_in) => match msg_in.try_recv() {
Ok(text) => text == "stop",
Err(_) => false,
},
None => false,
}
}
fn new(
msg_in: Option<&'a Receiver<String>>,
count: u64,
timeout: Option<Duration>,
src_ip: Option<Ipv4Addr>,
dst_ip: Option<Ipv4Addr>,
src_port: Option<u16>,
dst_port: Option<u16>,
handle_func: F,
) -> Self {
Self {
msg_in,
count,
timeout,
src_ip,
dst_ip,
src_port,
dst_port,
handle_func,
}
}
fn recv_ipv4(&mut self, packet: &[u8]) -> RecvPacket {
if packet.len() <= 14 {
return RecvPacket::Discard;
}
if let Some(ip_header) = Ipv4Packet::new(&packet[14..]) {
let length: usize = ((ip_header.get_header_length() & 0x0F) << 2) as usize;
if ip_header.get_next_level_protocol() != IpNextHeaderProtocols::Tcp {}
if self.src_ip.is_some() && self.src_ip.unwrap() != ip_header.get_source() {}
if self.dst_ip.is_some() && self.dst_ip.unwrap() != ip_header.get_destination() {}
return self.recv_tcp(packet, 14 + length);
}
RecvPacket::Discard
}
fn recv_tcp(&mut self, packet: &[u8], tcp_offset: usize) -> RecvPacket {
if packet.len() <= tcp_offset {
return RecvPacket::Discard;
}
if let Some(tcp_header) = TcpPacket::new(&packet[tcp_offset..]) {
if self.src_port.is_some() && self.src_port != Some(tcp_header.get_source()) {
return RecvPacket::Discard;
}
if self.dst_port.is_some() && self.dst_port != Some(tcp_header.get_destination()) {
return RecvPacket::Discard;
}
return (self.handle_func)(packet, tcp_offset);
}
RecvPacket::Discard
}
}
#[auto_func_name]
pub fn recv_tcp<F>(
net_rx: &mut Box<dyn DataLinkReceiver>,
msg_in: Option<&Receiver<String>>,
count: u64,
timeout: Option<Duration>,
src_ip: Option<Ipv4Addr>,
dst_ip: Option<Ipv4Addr>,
src_port: Option<u16>,
dst_port: Option<u16>,
handle_func: F,
) -> Result<u64, anyhow::Error>
where
F: FnMut(&[u8], usize) -> RecvPacket,
{
RecvTcp::new(msg_in, count, timeout, src_ip, dst_ip, src_port, dst_port, handle_func).__call__(net_rx)
}