use pnet::datalink;
use pnet::datalink::Channel::Ethernet;
use pnet::packet::ethernet::EthernetPacket;
use pnet::packet::ipv4::Ipv4Packet;
use pnet::packet::Packet;
use std::sync::{
atomic::{AtomicBool, Ordering},
mpsc::{self, RecvTimeoutError},
Arc,
};
use std::thread;
use std::time::Duration;
/// Captures Ethernet frames on the named interface and forwards IPv4 payloads to `dns_send`.
///
/// A background thread reads frames from a pnet datalink channel and passes them over an
/// internal mpsc channel to the calling thread. The calling thread keeps only frames whose
/// EtherType is IPv4 and forwards a copy of each Ethernet payload as an `Arc<Vec<u8>>`.
///
/// The call blocks until one of the following happens:
/// * `running` is set to `false`,
/// * the receiving side of `dns_send` has been dropped,
/// * the capture thread reports a hard read error and exits.
///
/// Failures are reported on stderr; the function returns nothing.
///
/// # Arguments
/// * `device` - exact name of the interface to capture on; loopback interfaces are never
///   selected.
/// * `dns_send` - channel that receives one `Arc<Vec<u8>>` per captured IPv4 payload.
/// * `running` - shared flag; clearing it asks both the capture thread and the forwarding
///   loop to stop.
pub fn recv(device: String, dns_send: mpsc::Sender<Arc<Vec<u8>>>, running: Arc<AtomicBool>) {
    /// EtherType value identifying an IPv4 payload.
    const ETHERTYPE_IPV4: u16 = 0x0800;
    /// Upper bound on back-to-back read errors tolerated by the forwarding loop.
    const MAX_CONSECUTIVE_ERRORS: u32 = 10;

    let interfaces = datalink::interfaces();
    let interface = match interfaces
        .iter()
        .find(|iface| iface.name == device && !iface.is_loopback())
    {
        Some(iface) => iface,
        None => {
            eprintln!("错误:网络接口 '{}' 未找到", device);
            return;
        }
    };

    // The read timeout lets the capture thread wake up periodically and re-check its stop
    // conditions even when the interface is idle (assuming the platform backend honors it).
    let config = datalink::Config {
        read_timeout: Some(Duration::from_millis(1000)),
        ..Default::default()
    };

    // Only the receiving half is needed; the transmit half is dropped immediately.
    let mut rx = match datalink::channel(interface, config) {
        Ok(Ethernet(_tx, rx)) => rx,
        Ok(_) => {
            eprintln!("错误:通道类型不支持");
            return;
        }
        Err(e) => {
            eprintln!("错误:创建数据链路通道失败: {}", e);
            return;
        }
    };

    let (packet_sender, packet_receiver) = mpsc::channel();

    // Private stop flag: set when the forwarding loop exits for any reason, so the capture
    // thread terminates even if the caller's `running` flag is still true and no further
    // frame arrives to make its `send` fail.
    let stop = Arc::new(AtomicBool::new(false));

    let running_clone = Arc::clone(&running);
    let stop_clone = Arc::clone(&stop);
    let capture_handle = thread::spawn(move || {
        let should_run =
            || running_clone.load(Ordering::Relaxed) && !stop_clone.load(Ordering::Relaxed);

        while should_run() {
            match rx.next() {
                Ok(packet) => {
                    // A shutdown may have been requested while blocked in `next()`.
                    if !should_run() {
                        break;
                    }
                    // Copy the frame out of the receiver's buffer so it can cross threads.
                    if packet_sender.send(Ok(packet.to_vec())).is_err() {
                        break;
                    }
                }
                // A read timeout is not an error; loop around and re-check the flags.
                Err(e) if e.kind() == std::io::ErrorKind::TimedOut => continue,
                Err(e) => {
                    // Report the hard error to the forwarding loop (if it is still
                    // listening) and stop capturing.
                    let _ = packet_sender.send(Err(e));
                    break;
                }
            }
        }
    });

    // The capture thread currently exits after forwarding its first hard error, so this
    // counter acts only as a safeguard against a stream of errors.
    let mut consecutive_errors: u32 = 0;

    while running.load(Ordering::Relaxed) {
        match packet_receiver.recv_timeout(Duration::from_millis(500)) {
            Ok(Ok(packet_data)) => {
                consecutive_errors = 0;
                if let Some(ethernet) = EthernetPacket::new(&packet_data) {
                    // Skip ARP, IPv6, VLAN-tagged and other non-IPv4 frames instead of
                    // reinterpreting their bytes as an IPv4 packet.
                    if ethernet.get_ethertype().0 != ETHERTYPE_IPV4 {
                        continue;
                    }
                    if let Some(ipv4) = Ipv4Packet::new(ethernet.payload()) {
                        let ipv4_data: Arc<Vec<u8>> = Arc::new(ipv4.packet().to_vec());
                        if let Err(e) = dns_send.send(ipv4_data) {
                            eprintln!("Failed to send packet: {}", e);
                            break;
                        }
                    }
                }
            }
            Ok(Err(e)) => {
                consecutive_errors += 1;
                eprintln!(
                    "An error occurred when reading from the datalink channel: {}",
                    e
                );
                if consecutive_errors >= MAX_CONSECUTIVE_ERRORS {
                    eprintln!("Too many consecutive errors, stopping packet reception");
                    break;
                }
            }
            // Nothing captured within the poll interval; re-check `running`.
            Err(RecvTimeoutError::Timeout) => continue,
            // The capture thread has exited and dropped its sender.
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }

    // Ask the capture thread to stop, then release both channel ends before joining so that
    // neither side can be left waiting on the other.
    stop.store(true, Ordering::Relaxed);
    drop(packet_receiver);
    drop(dns_send);

    if let Err(e) = capture_handle.join() {
        eprintln!("Warning: Failed to join packet capture thread: {:?}", e);
    }
}