use std::os::fd::{AsRawFd, OwnedFd, RawFd};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use crate::device::{DeviceManager, DeviceType};
const BACKSTOP_TIMEOUT: Duration = Duration::from_millis(10);
const NO_PROGRESS_BACKOFF: Duration = Duration::from_millis(1);
const INT_VRING: u32 = 1;
pub struct VsockRxWorkerContext {
pub device_manager: Arc<DeviceManager>,
pub doorbell_rd: OwnedFd,
pub running: Arc<AtomicBool>,
pub exit_vcpus: Arc<dyn Fn() + Send + Sync>,
}
pub fn vsock_rx_worker_loop(ctx: VsockRxWorkerContext) {
let kq = unsafe { libc::kqueue() };
if kq < 0 {
tracing::error!(
"vsock-io: kqueue creation failed: {}",
std::io::Error::last_os_error()
);
return;
}
tracing::info!(
"vsock-io worker started (doorbell fd={})",
ctx.doorbell_rd.as_raw_fd()
);
let doorbell_fd = ctx.doorbell_rd.as_raw_fd();
let conns = ctx.device_manager.vsock_connections();
let timeout = libc::timespec {
tv_sec: 0,
#[allow(clippy::cast_possible_truncation)]
tv_nsec: BACKSTOP_TIMEOUT.as_nanos() as i64,
};
loop {
if !ctx.running.load(Ordering::Relaxed) {
break;
}
let mut changes: Vec<libc::kevent> = Vec::with_capacity(8);
changes.push(read_event(doorbell_fd));
{
let snapshot = conns
.lock()
.map(|mgr| mgr.connected_fds())
.unwrap_or_default();
for (_, fd) in snapshot {
changes.push(read_event(fd));
}
}
let mut events: Vec<libc::kevent> = Vec::with_capacity(changes.len() + 8);
let nchanges =
libc::c_int::try_from(changes.len()).expect("change list bounded by open fd count");
let nevents =
libc::c_int::try_from(events.capacity()).expect("event list bounded by open fd count");
let nev = unsafe {
libc::kevent(
kq,
changes.as_ptr(),
nchanges,
events.as_mut_ptr(),
nevents,
&raw const timeout,
)
};
if nev < 0 {
let err = std::io::Error::last_os_error();
if err.kind() == std::io::ErrorKind::Interrupted {
continue;
}
tracing::error!("vsock-io: kevent wait failed: {err}");
break;
}
unsafe { events.set_len(nev as usize) };
if !ctx.running.load(Ordering::Relaxed) {
break;
}
let mut had_fd_data = false;
for ev in &events {
if ev.flags & libc::EV_ERROR != 0 {
continue; }
if ev.ident == doorbell_fd as usize {
drain_doorbell(doorbell_fd);
} else {
had_fd_data = true;
}
}
let injected = ctx.device_manager.poll_vsock_rx();
if injected {
ctx.device_manager
.raise_interrupt_for(DeviceType::VirtioVsock, INT_VRING);
(ctx.exit_vcpus)();
} else if had_fd_data {
std::thread::sleep(NO_PROGRESS_BACKOFF);
}
}
unsafe { libc::close(kq) };
tracing::info!("vsock-io worker stopped");
}
fn read_event(fd: RawFd) -> libc::kevent {
libc::kevent {
ident: fd as usize,
filter: libc::EVFILT_READ,
flags: libc::EV_ADD | libc::EV_ENABLE,
fflags: 0,
data: 0,
udata: std::ptr::null_mut(),
}
}
fn drain_doorbell(fd: RawFd) {
let mut buf = [0u8; 64];
loop {
let n = unsafe { libc::read(fd, buf.as_mut_ptr().cast::<libc::c_void>(), buf.len()) };
if n <= 0 {
break;
}
}
}