use crate::{
ReceiveAction,
error::Error,
fmt,
pdu_loop::{PduRx, PduTx},
std::ParkSignal,
};
use pnet_datalink::{self, Channel, DataLinkReceiver, DataLinkSender, channel};
use std::io;
use std::{sync::Arc, task::Waker, time::SystemTime};
fn get_tx_rx(
device: &str,
) -> Result<(Box<dyn DataLinkSender>, Box<dyn DataLinkReceiver>), std::io::Error> {
let interfaces = pnet_datalink::interfaces();
let interface = match interfaces.iter().find(|interface| interface.name == device) {
Some(interface) => interface,
None => {
fmt::error!("Could not find interface {device}");
fmt::error!("Available interfaces:");
for interface in interfaces.iter() {
fmt::error!("-> {} {}", interface.name, interface.description);
}
panic!();
}
};
let config = pnet_datalink::Config {
write_buffer_size: 16384,
read_buffer_size: 16384,
..Default::default()
};
let (tx, rx) = match channel(interface, config) {
Ok(Channel::Ethernet(tx, rx)) => (tx, rx),
Ok(_) => panic!("Unhandled channel type"),
Err(e) => return Err(e),
};
Ok((tx, rx))
}
#[derive(Copy, Clone, Debug, Default)]
pub struct TxRxTaskConfig {
pub spinloop: bool,
}
pub fn tx_rx_task_blocking<'sto>(
device: &str,
mut pdu_tx: PduTx<'sto>,
mut pdu_rx: PduRx<'sto>,
config: TxRxTaskConfig,
) -> Result<(PduTx<'sto>, PduRx<'sto>), io::Error> {
let signal = Arc::new(ParkSignal::new());
let waker = Waker::from(Arc::clone(&signal));
let mut cap = pcap::Capture::from_device(device)
.expect("Device")
.immediate_mode(true)
.open()
.expect("Open device")
.setnonblock()
.expect("Can't set non-blocking");
let mut sq = pcap::sendqueue::SendQueue::new(1024 * 1024).expect("Failed to create send queue");
let mut in_flight = 0usize;
loop {
fmt::trace!("Begin TX/RX iteration");
pdu_tx.replace_waker(&waker);
let mut sent_this_iter = 0usize;
while let Some(frame) = pdu_tx.next_sendable_frame() {
let idx = frame.storage_slot_index();
frame
.send_blocking(|frame_bytes| {
fmt::trace!("Send frame {:#04x}, {} bytes", idx, frame_bytes.len());
sq.queue(None, frame_bytes).expect("Enqueue");
Ok(frame_bytes.len())
})
.map_err(std::io::Error::other)?;
sent_this_iter += 1;
}
if sent_this_iter > 0 {
fmt::trace!("Send {} enqueued frames", sent_this_iter);
sq.transmit(&mut cap, pcap::sendqueue::SendSync::Off)
.expect("Transmit");
in_flight += sent_this_iter;
}
if in_flight > 0 {
debug_assert!(cap.is_nonblock(), "Must be in non-blocking mode");
fmt::trace!("{} frames are in flight", in_flight);
loop {
match cap.next_packet() {
Ok(packet) => {
let frame_buf = packet.data;
let frame_index = frame_buf
.get(0x11)
.ok_or_else(|| io::Error::other(Error::Internal))?;
let res = pdu_rx
.receive_frame(&frame_buf)
.map_err(|e| io::Error::other(e))?;
fmt::trace!(
"Received and {:?} frame {:#04x} ({} bytes)",
res,
frame_index,
packet.header.len
);
if res == ReceiveAction::Processed {
in_flight = in_flight
.checked_sub(1)
.expect("More frames processed than in flight");
}
}
Err(pcap::Error::NoMorePackets) => {
break;
}
Err(pcap::Error::TimeoutExpired) => {
break;
}
Err(e) => {
fmt::error!("Packet receive failed: {}", e);
return Err(io::Error::other(e));
}
}
}
}
else if !config.spinloop {
fmt::trace!("No frames in flight, waiting to be woken with new frames to send");
signal.wait();
if pdu_tx.should_exit() {
fmt::debug!("io_uring TX/RX was asked to exit");
break;
}
} else {
std::hint::spin_loop()
}
}
Ok((pdu_tx.release(), pdu_rx.release()))
}
pub fn ethercat_now() -> u64 {
let t = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_nanos() as u64;
t.saturating_sub(946684800)
}