use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use crate::blk_worker::GuestMemWriter;
use crate::device::VirtioMmioState;
use crate::irq::Irq;
use crate::virtqueue_util;
/// Upper bound on the number of frames read from the host fd per readiness event.
const BATCH_SIZE: usize = 64;
/// Number of injected-but-unannounced frames that forces a notification attempt.
const COALESCE_COUNT: u16 = 64;
/// Age of the oldest unannounced frame after which the worker attempts a
/// notification the next time its loop comes around.
const COALESCE_TIMEOUT: Duration = Duration::from_micros(50);
/// Timeout handed to the blocking `kevent` wait, which bounds how long the loop
/// can go without re-checking the `running` flag and the coalescing timer.
const POLL_TIMEOUT: Duration = Duration::from_millis(1);
/// Size in bytes of the header this worker writes in front of every frame.
const VIRTIO_NET_HDR_SIZE: usize = 12;
/// Guest-physical layout of the receive virtqueue as used by this worker.
///
/// The type is plain data, so it derives the usual value-type traits; this
/// lets callers log it, copy it between threads and compare it in tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RxQueueConfig {
    /// Guest-physical base address of the descriptor table (16-byte entries).
    pub desc_gpa: u64,
    /// Guest-physical base address of the available ring; the worker reads the
    /// index at offset 2 and the ring entries from offset 4.
    pub avail_gpa: u64,
    /// Guest-physical base address of the used ring; the worker writes the
    /// index at offset 2, 8-byte entries from offset 4, and a 16-bit event
    /// field right after the last entry.
    pub used_gpa: u64,
    /// Number of entries in the queue; ring positions are taken modulo this.
    pub size: u16,
}
/// Everything the RX worker thread needs: the host-side fd, guest memory
/// access, the queue layout and the hooks used to interrupt the guest.
pub struct NetRxWorkerContext {
/// Host file descriptor that is registered with kqueue for readability and
/// from which frames are `read()`.
pub net_host_fd: i32,
/// Accessor used for all guest memory reads and writes (rings, descriptors,
/// receive buffers).
pub guest_mem: GuestMemWriter,
/// Guest-physical layout of the receive queue.
pub rx_queue: RxQueueConfig,
/// Shared MMIO device state; `trigger_interrupt(1)` is called on it under the
/// write lock before the IRQ callback is invoked.
pub mmio_state: Arc<RwLock<VirtioMmioState>>,
/// Callback invoked as `(irq, true)` on every notification; the worker ignores
/// the returned `Result`.
pub irq_callback: Arc<dyn Fn(Irq, bool) -> crate::error::Result<()> + Send + Sync>,
/// IRQ value passed to `irq_callback`.
pub irq: Irq,
/// Hook called after `irq_callback` on every notification.
/// NOTE(review): presumably forces vCPUs out of guest mode so they observe the
/// interrupt — confirm against the implementation.
pub exit_vcpus: Arc<dyn Fn() + Send + Sync>,
/// The worker loop keeps running while this flag reads `true`.
pub running: Arc<AtomicBool>,
}
// SAFETY: NOTE(review): the justification is not visible in this file. This
// manual impl is presumably needed because `GuestMemWriter` (and possibly
// `Irq`) is not automatically `Send`, e.g. due to a raw pointer into guest
// memory — confirm that moving it to the worker thread is actually sound.
unsafe impl Send for NetRxWorkerContext {}
/// Signals the guest that the receive queue has new used buffers.
///
/// In order: records the interrupt in the shared MMIO state with
/// `trigger_interrupt(1)`, invokes the IRQ callback with `(irq, true)`, then
/// calls the `exit_vcpus` hook. The first two steps are best effort — a
/// poisoned lock or a callback error is silently ignored.
fn trigger_net_irq(ctx: &NetRxWorkerContext) {
    match ctx.mmio_state.write() {
        Ok(mut state) => {
            state.trigger_interrupt(1);
        }
        // Poisoned lock: skip the status update but still raise the IRQ below.
        Err(_) => {}
    }

    // The callback's outcome is deliberately discarded.
    let _irq_result = (ctx.irq_callback)(ctx.irq, true);

    (ctx.exit_vcpus)();
}
/// Raises the RX interrupt only if `virtqueue_util::should_notify` says so.
///
/// `old_used` and `new_used` are the used index before and after the buffers
/// being announced; they are forwarded unchanged together with the queue's
/// available-ring address and size.
fn maybe_notify(ctx: &NetRxWorkerContext, old_used: u16, new_used: u16) {
    let queue = &ctx.rx_queue;
    let guest_wants_irq = virtqueue_util::should_notify(
        &ctx.guest_mem,
        queue.avail_gpa,
        queue.size,
        old_used,
        new_used,
    );
    if !guest_wants_irq {
        return;
    }
    trigger_net_irq(ctx);
}
/// Copies one received frame, prefixed by a 12-byte header, into the next
/// available receive buffer chain and publishes it on the used ring.
///
/// # Parameters
/// * `ctx` – worker context providing guest memory and the queue layout.
/// * `frame` – raw frame bytes as read from the host fd.
/// * `used_idx` – the worker's running used index; advanced by one on success.
///
/// # Returns
/// `true` if at least one byte was written and a used-ring entry was
/// published. `false` if the queue size is zero, the guest has made no buffer
/// available, or nothing could be written (for example a malformed chain); in
/// those cases `used_idx` and the used ring are left untouched.
///
/// If the chain is shorter than header + frame, the data is truncated and the
/// number of bytes actually written is reported in the used entry.
fn inject_one_frame(ctx: &NetRxWorkerContext, frame: &[u8], used_idx: &mut u16) -> bool {
    // Descriptor table entry: addr (8) | len (4) | flags (2) | next (2).
    const DESC_SIZE: usize = 16;
    const DESC_F_NEXT: u16 = 1;
    const DESC_F_WRITE: u16 = 2;
    // Byte offset of the 16-bit buffer-count field inside the 12-byte header.
    const NUM_BUFFERS_OFF: usize = 10;

    let q_size = ctx.rx_queue.size as usize;
    if q_size == 0 {
        return false;
    }

    let avail_idx = ctx.guest_mem.read_u16(ctx.rx_queue.avail_gpa as usize + 2);
    if *used_idx == avail_idx {
        return false;
    }
    // The barrier has to sit between the index load and the ring/descriptor
    // loads: it keeps those later loads from being satisfied before the index
    // that makes them valid has been observed.
    std::sync::atomic::fence(Ordering::Acquire);

    let slot = (*used_idx as usize) % q_size;
    let ring_off = ctx.rx_queue.avail_gpa as usize + 4 + 2 * slot;
    let head_idx = ctx.guest_mem.read_u16(ring_off) as usize;

    let total_len = VIRTIO_NET_HDR_SIZE + frame.len();
    let desc_base = ctx.rx_queue.desc_gpa as usize;
    let mut written = 0;
    let mut idx = head_idx;

    // At most `q_size` hops, so a looping chain cannot hang the worker.
    for _ in 0..q_size {
        // Indices come from the guest; never read outside the descriptor table.
        if idx >= q_size {
            break;
        }
        let Some(desc_slice) = ctx.guest_mem.slice(desc_base + idx * DESC_SIZE, DESC_SIZE) else {
            break;
        };
        let addr_gpa = u64::from_le_bytes(desc_slice[0..8].try_into().unwrap()) as usize;
        let len = u32::from_le_bytes(desc_slice[8..12].try_into().unwrap()) as usize;
        let flags = u16::from_le_bytes(desc_slice[12..14].try_into().unwrap());
        let next = u16::from_le_bytes(desc_slice[14..16].try_into().unwrap());

        if (flags & DESC_F_WRITE) != 0 && len > 0 {
            // SAFETY: NOTE(review): `slice_mut`'s contract is not visible here;
            // presumably the range is bounds-checked (it returns `Option`) and
            // the caller must not hold another host reference to it — confirm.
            let Some(buf) = (unsafe { ctx.guest_mem.slice_mut(addr_gpa, len) }) else {
                // Stop at an unmapped buffer. Retrying would only re-read this
                // same descriptor, and skipping it would leave a hole in the
                // byte stream the guest reassembles from the chain.
                break;
            };
            let remaining = total_len.saturating_sub(written);
            let to_write = remaining.min(len);

            if written < VIRTIO_NET_HDR_SIZE {
                // This buffer holds (part of) the header, possibly followed by
                // the start of the frame.
                let hdr_remaining = VIRTIO_NET_HDR_SIZE - written;
                let hdr_bytes = hdr_remaining.min(to_write);
                buf[..hdr_bytes].fill(0);

                // Set the buffer-count field to 1 when both of its bytes land
                // in this buffer.
                if written <= NUM_BUFFERS_OFF && written + hdr_bytes > NUM_BUFFERS_OFF {
                    let nb_off = NUM_BUFFERS_OFF - written;
                    if nb_off + 2 <= hdr_bytes {
                        buf[nb_off..nb_off + 2].copy_from_slice(&1u16.to_le_bytes());
                    }
                }

                // Frames larger than 1500 bytes that carry EtherType 0x0800,
                // an IHL of 5 and IP protocol 6 get the first ten header bytes
                // filled in. The length test also guarantees the indexed bytes
                // exist.
                // NOTE(review): the values look like flags/gso_type/hdr_len/
                // gso_size/csum_start/csum_offset for TCPv4 segmentation
                // offload — confirm against the negotiated features.
                let is_large_ipv4_tcp = frame.len() > 1500
                    && frame[12] == 0x08
                    && frame[13] == 0x00
                    && frame[23] == 6
                    && (frame[14] & 0x0F) == 5;
                if written == 0 && hdr_bytes >= 10 && is_large_ipv4_tcp {
                    buf[0] = 1;
                    buf[1] = 1;
                    buf[2..4].copy_from_slice(&54u16.to_le_bytes());
                    buf[4..6].copy_from_slice(&1460u16.to_le_bytes());
                    buf[6..8].copy_from_slice(&34u16.to_le_bytes());
                    buf[8..10].copy_from_slice(&16u16.to_le_bytes());
                }

                // Any room left after the header is only possible once the
                // header is complete, so the frame is copied from its start.
                let frame_bytes = to_write - hdr_bytes;
                if frame_bytes > 0 {
                    buf[hdr_bytes..hdr_bytes + frame_bytes].copy_from_slice(&frame[..frame_bytes]);
                }
            } else {
                let frame_off = written - VIRTIO_NET_HDR_SIZE;
                buf[..to_write].copy_from_slice(&frame[frame_off..frame_off + to_write]);
            }
            written += to_write;
        }

        if (flags & DESC_F_NEXT) == 0 || written >= total_len {
            break;
        }
        idx = next as usize;
    }

    if written == 0 {
        return false;
    }

    // Fill in the used element (id, len) first, then publish the new index
    // behind a release barrier so the element is visible before the index.
    let used_entry_off = ctx.rx_queue.used_gpa as usize + 4 + slot * 8;
    ctx.guest_mem.write_u32(used_entry_off, head_idx as u32);
    ctx.guest_mem.write_u32(used_entry_off + 4, written as u32);
    std::sync::atomic::fence(Ordering::Release);
    *used_idx = used_idx.wrapping_add(1);
    ctx.guest_mem
        .write_u16(ctx.rx_queue.used_gpa as usize + 2, *used_idx);
    true
}
/// Body of the RX worker thread: waits for the host fd to become readable,
/// copies frames into guest receive buffers and notifies the guest, with
/// notifications coalesced by count and by age.
///
/// The loop runs until `ctx.running` reads `false`. Each iteration:
/// 1. waits on kqueue for at most `POLL_TIMEOUT`;
/// 2. on readiness, reads and injects up to `BATCH_SIZE` frames, flushing a
///    notification whenever `COALESCE_COUNT` frames are pending or injection
///    fails (a frame that cannot be injected has already been read and is
///    dropped; the worker then sleeps 100 µs before polling again);
/// 3. flushes pending frames whose batch is older than `COALESCE_TIMEOUT`.
///
/// On exit, any still-pending frames are announced with an unconditional IRQ.
/// The function returns early (after logging) if the kqueue cannot be created
/// or the fd cannot be registered.
pub fn net_rx_worker_loop(ctx: NetRxWorkerContext) {
    /// Announces everything injected since `*old_used` and resets the
    /// coalescing state, moving `*old_used` up to `used_idx`.
    fn flush_batch(
        ctx: &NetRxWorkerContext,
        used_idx: u16,
        old_used: &mut u16,
        pending_frames: &mut u16,
        batch_start: &mut Option<Instant>,
    ) {
        write_avail_event(ctx, used_idx);
        maybe_notify(ctx, *old_used, used_idx);
        *pending_frames = 0;
        *batch_start = None;
        *old_used = used_idx;
    }

    tracing::info!(
        "net-io worker started (fd={}, queue_size={})",
        ctx.net_host_fd,
        ctx.rx_queue.size
    );

    // SAFETY: `kqueue()` takes no arguments and returns a new fd or -1.
    let kq = unsafe { libc::kqueue() };
    if kq < 0 {
        tracing::error!(
            "net-io: kqueue creation failed: {}",
            std::io::Error::last_os_error()
        );
        return;
    }

    let changelist = libc::kevent {
        ident: ctx.net_host_fd as usize,
        filter: libc::EVFILT_READ,
        flags: libc::EV_ADD | libc::EV_ENABLE,
        fflags: 0,
        data: 0,
        udata: std::ptr::null_mut(),
    };
    // SAFETY: `changelist` is one initialised entry that outlives the call; no
    // event list is supplied (null pointer with a count of 0).
    let ret = unsafe {
        libc::kevent(
            kq,
            &raw const changelist,
            1,
            std::ptr::null_mut(),
            0,
            std::ptr::null(),
        )
    };
    if ret < 0 {
        tracing::error!(
            "net-io: kevent registration failed: {}",
            std::io::Error::last_os_error()
        );
        // SAFETY: `kq` is a valid descriptor owned by this function.
        unsafe { libc::close(kq) };
        return;
    }

    let timeout = libc::timespec {
        tv_sec: 0,
        tv_nsec: POLL_TIMEOUT.as_nanos() as i64,
    };

    // Resume from whatever used index is currently stored in guest memory.
    let mut used_idx = ctx.guest_mem.read_u16(ctx.rx_queue.used_gpa as usize + 2);
    // Used index at the last notification decision. It must only move when a
    // notification decision is made: resetting it while frames are pending
    // would shrink the [old, new) window passed to `should_notify` and could
    // suppress an interrupt the guest is waiting for.
    let mut old_used = used_idx;
    let mut pending_frames: u16 = 0;
    let mut batch_start: Option<Instant> = None;
    // One reusable receive buffer; only the first `n` bytes of each read are used.
    let mut frame_buf = [0u8; 2048];

    while ctx.running.load(Ordering::Relaxed) {
        let mut event = libc::kevent {
            ident: 0,
            filter: 0,
            flags: 0,
            fflags: 0,
            data: 0,
            udata: std::ptr::null_mut(),
        };
        // SAFETY: `event` is writable storage for exactly one entry and
        // `timeout` is a valid timespec; both outlive the call.
        let nev = unsafe {
            libc::kevent(
                kq,
                std::ptr::null(),
                0,
                &raw mut event,
                1,
                &raw const timeout,
            )
        };

        if nev > 0 {
            for _ in 0..BATCH_SIZE {
                // SAFETY: the pointer/length pair describes `frame_buf`, which
                // is valid for writes for its whole length.
                let n = unsafe {
                    libc::read(
                        ctx.net_host_fd,
                        frame_buf.as_mut_ptr().cast::<libc::c_void>(),
                        frame_buf.len(),
                    )
                };
                // Nothing more to read right now (or a read error): end the batch.
                if n <= 0 {
                    break;
                }
                let frame = &frame_buf[..n as usize];

                if !inject_one_frame(&ctx, frame, &mut used_idx) {
                    // The frame could not be placed and is dropped. Announce
                    // what is already queued, then back off briefly.
                    if pending_frames > 0 {
                        flush_batch(
                            &ctx,
                            used_idx,
                            &mut old_used,
                            &mut pending_frames,
                            &mut batch_start,
                        );
                    }
                    std::thread::sleep(Duration::from_micros(100));
                    break;
                }

                pending_frames += 1;
                batch_start.get_or_insert_with(Instant::now);

                if pending_frames >= COALESCE_COUNT {
                    flush_batch(
                        &ctx,
                        used_idx,
                        &mut old_used,
                        &mut pending_frames,
                        &mut batch_start,
                    );
                }
            }
        }

        // Age-based flush; also reached when the wait above simply timed out.
        let batch_expired = batch_start.is_some_and(|start| start.elapsed() >= COALESCE_TIMEOUT);
        if pending_frames > 0 && batch_expired {
            flush_batch(
                &ctx,
                used_idx,
                &mut old_used,
                &mut pending_frames,
                &mut batch_start,
            );
        }
    }

    // Shutting down: announce leftovers unconditionally.
    if pending_frames > 0 {
        write_avail_event(&ctx, used_idx);
        trigger_net_irq(&ctx);
    }
    // SAFETY: `kq` is a valid descriptor owned by this function.
    unsafe { libc::close(kq) };
    tracing::info!("net-io worker stopped");
}
/// Copies the guest's current available index into the 16-bit field that
/// follows the last used-ring entry.
///
/// `used_idx` is accepted but not used.
fn write_avail_event(ctx: &NetRxWorkerContext, used_idx: u16) {
    let _ = used_idx;

    let queue = &ctx.rx_queue;
    let ring_entries = queue.size as usize;
    // Layout of the used ring: 4-byte header, `size` entries of 8 bytes each,
    // then the event field.
    let event_field_off = queue.used_gpa as usize + 4 + ring_entries * 8;
    let current_avail = ctx.guest_mem.read_u16(queue.avail_gpa as usize + 2);

    // Earlier stores are ordered before the event field update.
    std::sync::atomic::fence(Ordering::Release);
    ctx.guest_mem.write_u16(event_field_off, current_avail);
}