#[cfg(not(test))]
use std::os::unix::io::AsRawFd;
#[cfg(not(test))]
use std::sync::Arc;
#[cfg(not(test))]
use std::sync::OnceLock;
#[cfg(not(test))]
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
#[cfg(not(test))]
use vm_memory::GuestMemoryMmap;
use vmm_sys_util::epoll::EventSet;
#[cfg(not(test))]
use vmm_sys_util::epoll::{ControlOperation, Epoll, EpollEvent};
#[cfg(not(test))]
use vmm_sys_util::eventfd::EventFd;
use super::DrainOutcome;
#[cfg(not(test))]
use super::{BlkQueue, BlkWorkerState, NUM_QUEUES, WorkerPlacement, drain_bracket_impl};
/// Upper bound (1 s) on a single throttle-retry timer arming.
pub(crate) const RETRY_TIMER_MAX_NANOS: u64 = 1_000_000_000;

/// Clamps a throttle wait into `[1, RETRY_TIMER_MAX_NANOS]` nanoseconds.
///
/// The lower bound of 1 ns matters: arming a timerfd with `it_value == 0`
/// disarms it instead of firing immediately.
pub(crate) fn clamp_retry_nanos(wait_nanos: u64) -> u64 {
    wait_nanos.max(1).min(RETRY_TIMER_MAX_NANOS)
}
/// Next step after classifying a single drain outcome.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum StallAction {
    /// Drain completed (`DrainOutcome::Done`); no retry needed.
    Continue,
    /// Throttle reported a zero wait; re-run the drain inline once.
    ReDrain,
    /// Throttle reported a positive wait; sleep for `nanos`
    /// (already clamped via `clamp_retry_nanos`).
    Sleep { nanos: u64 },
}
/// Translates one drain outcome into the worker's next step.
///
/// A zero wait cannot be slept on (a zero `it_value` would disarm the
/// timerfd), so it is surfaced as `ReDrain`; any positive wait is clamped
/// into `[1, RETRY_TIMER_MAX_NANOS]` before sleeping.
pub(crate) fn decide_stall_action(outcome: DrainOutcome) -> StallAction {
    let DrainOutcome::ThrottleStalled { wait_nanos } = outcome else {
        return StallAction::Continue;
    };
    if wait_nanos == 0 {
        StallAction::ReDrain
    } else {
        StallAction::Sleep {
            nanos: clamp_retry_nanos(wait_nanos),
        }
    }
}
/// Final per-iteration decision after stall resolution.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum WorkerAction {
    /// No stall outstanding; return to the event loop without arming
    /// the retry timer.
    Continue,
    /// Arm the one-shot retry timer for `nanos` before returning to
    /// the event loop.
    Sleep { nanos: u64 },
}
/// Resolves a `StallAction` into a `WorkerAction`, invoking the inline
/// re-drain closure at most once.
///
/// A `ReDrain` produced by the re-drain itself is downgraded to a 1 ns
/// sleep rather than draining a third time, bounding the inline work per
/// loop iteration.
pub(crate) fn resolve_action(
    first: StallAction,
    redrain: impl FnOnce() -> DrainOutcome,
) -> WorkerAction {
    // Run the inline retry only when the first decision asked for it.
    let resolved = if matches!(first, StallAction::ReDrain) {
        decide_stall_action(redrain())
    } else {
        first
    };
    match resolved {
        StallAction::Continue => WorkerAction::Continue,
        StallAction::Sleep { nanos } => WorkerAction::Sleep { nanos },
        // Second ReDrain in a row: downgrade instead of draining again.
        StallAction::ReDrain => WorkerAction::Sleep { nanos: 1 },
    }
}
/// Epoll token for the guest kick eventfd (queue notify).
pub(crate) const KICK_TOKEN: u64 = 1;
/// Epoll token for the stop eventfd (worker shutdown request).
pub(crate) const STOP_TOKEN: u64 = 2;
/// Epoll token for the throttle-retry timerfd.
pub(crate) const THROTTLE_TOKEN: u64 = 3;
/// Epoll token for the pause eventfd (park the worker thread).
pub(crate) const PAUSE_TOKEN: u64 = 4;
/// Action chosen by `worker_dispatch_event` for one epoll event.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub(crate) enum WorkerDispatchAction {
    /// Stop token fired: the worker exits immediately.
    Stop,
    /// Kick or throttle token fired: drain the queues.
    /// `throttle_token_fired` records whether the retry timerfd (rather
    /// than the guest kick) triggered the drain.
    Drain { throttle_token_fired: bool },
    /// Pause token fired: park the thread until unpaused.
    Pause,
    /// Unknown token: logged by the dispatcher and otherwise ignored.
    Skip,
}
pub(crate) fn worker_dispatch_event(event_set: EventSet, token: u64) -> WorkerDispatchAction {
if event_set.contains(EventSet::ERROR) {
tracing::warn!(
?event_set,
token,
"virtio-blk worker: epoll event_set contains EPOLLERR; \
expected only on eventfd counter saturation \
(count == ULLONG_MAX) — fall through to per-token \
handler so the eventfd read drains the saturated \
counter back to 0"
);
}
if event_set.contains(EventSet::HANG_UP) {
tracing::warn!(
?event_set,
token,
"virtio-blk worker: epoll event_set contains EPOLLHUP; \
structurally impossible for eventfd/timerfd \
(eventfd_poll and timerfd_poll never set POLLHUP). \
Indicates a kernel-contract change or Epoll \
registration bug — log and fall through"
);
}
match token {
STOP_TOKEN => WorkerDispatchAction::Stop,
KICK_TOKEN => WorkerDispatchAction::Drain {
throttle_token_fired: false,
},
THROTTLE_TOKEN => WorkerDispatchAction::Drain {
throttle_token_fired: true,
},
PAUSE_TOKEN => WorkerDispatchAction::Pause,
_ => {
tracing::warn!(?event_set, token, "virtio-blk worker: unknown epoll token");
WorkerDispatchAction::Skip
}
}
}
/// Event loop for the dedicated virtio-blk worker thread.
///
/// Applies CPU placement, registers the kick / stop / pause eventfds plus a
/// throttle-retry timerfd with a fresh epoll instance, then services the
/// virtqueues until the stop token fires. Any fatal setup or epoll error
/// also ends the loop. `state` is returned to the caller on every exit
/// path so it can be reused.
#[cfg(not(test))]
#[allow(clippy::too_many_arguments)]
pub(crate) fn worker_thread_main(
    mut state: BlkWorkerState,
    mut queues: [BlkQueue; NUM_QUEUES],
    mem: Arc<OnceLock<GuestMemoryMmap>>,
    irq_evt: Arc<EventFd>,
    interrupt_status: Arc<AtomicU32>,
    device_status: Arc<AtomicU32>,
    mem_unset_warned: Arc<AtomicBool>,
    paused: Arc<AtomicBool>,
    placement: WorkerPlacement,
    kick_fd: EventFd,
    stop_fd: EventFd,
    pause_fd: EventFd,
    parked_evt_slot: Arc<std::sync::Mutex<Option<Arc<EventFd>>>>,
) -> BlkWorkerState {
    // CPU placement: a dedicated service CPU wins; otherwise restrict to
    // the configured no-perf cpumask when one is given.
    if let Some(cpu) = placement.service_cpu {
        crate::vmm::vcpu::pin_current_thread(cpu, "virtio-blk worker");
    } else if let Some(ref cpus) = placement.no_perf_cpus {
        crate::vmm::vcpu::set_thread_cpumask(cpus, "virtio-blk worker");
    }
    // Publish "running" before entering the event loop.
    paused.store(false, Ordering::Release);
    let epoll = match Epoll::new() {
        Ok(e) => e,
        Err(e) => {
            tracing::error!(%e, "virtio-blk worker: failed to create epoll instance; \
                exiting (device IO will not be serviced)");
            return state;
        }
    };
    // Register the three eventfds; each failure is fatal for the worker.
    if let Err(e) = epoll.ctl(
        ControlOperation::Add,
        kick_fd.as_raw_fd(),
        EpollEvent::new(EventSet::IN, KICK_TOKEN),
    ) {
        tracing::error!(%e, "virtio-blk worker: failed to add kick_fd to epoll; exiting");
        return state;
    }
    if let Err(e) = epoll.ctl(
        ControlOperation::Add,
        stop_fd.as_raw_fd(),
        EpollEvent::new(EventSet::IN, STOP_TOKEN),
    ) {
        tracing::error!(%e, "virtio-blk worker: failed to add stop_fd to epoll; exiting");
        return state;
    }
    if let Err(e) = epoll.ctl(
        ControlOperation::Add,
        pause_fd.as_raw_fd(),
        EpollEvent::new(EventSet::IN, PAUSE_TOKEN),
    ) {
        tracing::error!(%e, "virtio-blk worker: failed to add pause_fd to epoll; exiting");
        return state;
    }
    // One-shot retry timer for throttle-stalled drains.
    // SAFETY: timerfd_create takes no pointer arguments; the return value
    // is checked for error immediately below.
    let timer_fd_raw = unsafe {
        libc::timerfd_create(
            libc::CLOCK_MONOTONIC,
            libc::TFD_NONBLOCK | libc::TFD_CLOEXEC,
        )
    };
    if timer_fd_raw < 0 {
        tracing::error!(
            err = std::io::Error::last_os_error().to_string(),
            "virtio-blk worker: timerfd_create failed; exiting"
        );
        return state;
    }
    // Wrap the raw fd in a File so it is closed on every exit path.
    // SAFETY: timer_fd_raw is a freshly created, valid fd owned solely by
    // this thread; ownership transfers to `timer_fd` here.
    let timer_fd: std::fs::File =
        unsafe { std::os::unix::io::FromRawFd::from_raw_fd(timer_fd_raw) };
    if let Err(e) = epoll.ctl(
        ControlOperation::Add,
        timer_fd_raw,
        EpollEvent::new(EventSet::IN, THROTTLE_TOKEN),
    ) {
        tracing::error!(%e, "virtio-blk worker: failed to add timer_fd to epoll; exiting");
        return state;
    }
    // One slot per registered fd: kick, stop, pause, timer.
    let mut events = [EpollEvent::default(); 4];
    // True while a throttle stall is pending and the retry timer is armed;
    // guest kicks are gated off (drain skipped) until the timer expires.
    let mut last_known_blocked: bool = false;
    loop {
        let n = match epoll.wait(-1, &mut events) {
            Ok(n) => n,
            Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
            Err(e) => {
                tracing::error!(%e, "virtio-blk worker: epoll_wait failed; exiting");
                return state;
            }
        };
        let mut should_drain = false;
        let mut throttle_token_fired = false;
        for ev in &events[..n] {
            match worker_dispatch_event(ev.event_set(), ev.data()) {
                WorkerDispatchAction::Stop => {
                    // Stop token: exit immediately, handing state back.
                    return state;
                }
                WorkerDispatchAction::Drain {
                    throttle_token_fired: tt_fired,
                } => {
                    should_drain = true;
                    if tt_fired {
                        // Consume the timerfd expiration count so the fd
                        // stops polling readable.
                        let mut buf = [0u8; 8];
                        use std::io::Read;
                        match (&timer_fd).read(&mut buf) {
                            Ok(_) => {}
                            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {}
                            Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                            Err(e) => {
                                tracing::warn!(
                                    %e,
                                    "virtio-blk worker: unexpected timerfd read error",
                                );
                            }
                        }
                        throttle_token_fired = true;
                        // Timer fired: lift the stall gate for this
                        // iteration's drain.
                        last_known_blocked = false;
                    }
                }
                WorkerDispatchAction::Pause => {
                    // Drain the pause eventfd counter before parking.
                    match pause_fd.read() {
                        Ok(_) => {}
                        Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {}
                        Err(e) => {
                            tracing::warn!(
                                %e,
                                "virtio-blk worker: pause_fd read failed at PAUSE entry",
                            );
                        }
                    }
                    paused.store(true, Ordering::Release);
                    let guard = match parked_evt_slot.lock() {
                        Ok(g) => g,
                        Err(poisoned) => {
                            tracing::warn!(
                                "virtio-blk worker: parked_evt_slot lock poisoned; \
                                 recovering inner data via PoisonError::into_inner"
                            );
                            poisoned.into_inner()
                        }
                    };
                    // Ack the park to whoever requested the pause.
                    if let Some(ref evt) = *guard
                        && let Err(e) = evt.write(1)
                    {
                        tracing::debug!(
                            err = %e,
                            "virtio-blk worker: parked_evt write failed (EAGAIN expected on counter saturation)"
                        );
                    }
                    // NOTE(review): `guard` stays held for the whole park
                    // loop below, so the slot cannot be swapped mid-pause —
                    // presumably intentional; confirm against the
                    // pause/resume caller.
                    // park_timeout bounds each wait so a missed unpark is
                    // re-checked within 10ms.
                    while paused.load(Ordering::Acquire) {
                        std::thread::park_timeout(std::time::Duration::from_millis(10));
                    }
                }
                WorkerDispatchAction::Skip => {
                    // Unknown token: already logged by the dispatcher.
                }
            }
        }
        if !should_drain {
            continue;
        }
        // Clear the kick counter even when the wakeup came from the timer.
        // NOTE(review): the WouldBlock arm implies kick_fd is nonblocking —
        // confirm it is created with EFD_NONBLOCK.
        match kick_fd.read() {
            Ok(_) => {}
            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
            }
            Err(e) => {
                tracing::warn!(%e, "virtio-blk worker: kick_fd read failed");
            }
        }
        // Stall gate: while the retry timer is armed, only its expiry (not
        // a guest kick) may trigger another drain attempt.
        if last_known_blocked && !throttle_token_fired {
            continue;
        }
        // Guest memory must be wired before descriptors can be read; warn
        // only once (the swap guards the flag).
        let Some(mem_ref) = mem.get() else {
            if !mem_unset_warned.swap(true, Ordering::Relaxed) {
                tracing::warn!(
                    "virtio-blk: queue notify before set_mem; \
                     dropping requests until guest memory is wired"
                );
            }
            continue;
        };
        let outcome = drain_bracket_impl(
            &mut state,
            &mut queues,
            mem_ref,
            &irq_evt,
            &interrupt_status,
            &device_status,
        );
        // wait_nanos==0 stalls are retried inline at most once; see
        // resolve_action for the downgrade-to-1ns rule.
        let action = resolve_action(decide_stall_action(outcome), || {
            tracing::trace!("virtio-blk worker: wait_nanos==0 inline re-drain");
            drain_bracket_impl(
                &mut state,
                &mut queues,
                mem_ref,
                &irq_evt,
                &interrupt_status,
                &device_status,
            )
        });
        match action {
            WorkerAction::Sleep { nanos } => {
                last_known_blocked = true;
                // Arm a single expiry (it_interval == 0) `nanos` from now.
                let new_value = libc::itimerspec {
                    it_interval: libc::timespec {
                        tv_sec: 0,
                        tv_nsec: 0,
                    },
                    it_value: libc::timespec {
                        tv_sec: (nanos / 1_000_000_000) as libc::time_t,
                        tv_nsec: (nanos % 1_000_000_000) as libc::c_long,
                    },
                };
                // SAFETY: timer_fd_raw is a valid timerfd (kept alive by
                // `timer_fd`); new_value is a fully initialized itimerspec
                // and the old-value pointer may be null.
                let rc = unsafe {
                    libc::timerfd_settime(
                        timer_fd_raw,
                        0,
                        &new_value as *const _,
                        std::ptr::null_mut(),
                    )
                };
                if rc < 0 {
                    tracing::warn!(
                        err = std::io::Error::last_os_error().to_string(),
                        "virtio-blk worker: timerfd_settime failed; \
                         stalled chain will not auto-retry — guest may \
                         hang on this request until kernel.hung_task_timeout_secs \
                         (default 120s) fires or higher-layer retries"
                    );
                }
            }
            WorkerAction::Continue => {
                last_known_blocked = false;
            }
        }
    }
}
/// Unit tests for the pure helpers: retry-nanos clamping, stall-action
/// decisions, inline re-drain resolution, and epoll token dispatch.
#[cfg(test)]
mod tests {
    use super::*;
    use vmm_sys_util::epoll::{EpollEvent, EventSet};

    // --- clamp_retry_nanos: [1, RETRY_TIMER_MAX_NANOS] bounds ---

    #[test]
    fn clamp_retry_nanos_zero_floors_at_one() {
        assert_eq!(
            clamp_retry_nanos(0),
            1,
            "wait_nanos==0 must be floored to 1ns to avoid \
             timerfd_settime(it_value=0) disarming the timer",
        );
    }

    #[test]
    fn clamp_retry_nanos_saturates_at_cap() {
        assert_eq!(
            clamp_retry_nanos(u64::MAX),
            RETRY_TIMER_MAX_NANOS,
            "wait_nanos==u64::MAX must saturate at RETRY_TIMER_MAX_NANOS \
             (1s) — well below the guest's hung-task watchdog (120s)",
        );
        assert_eq!(
            clamp_retry_nanos(RETRY_TIMER_MAX_NANOS + 1),
            RETRY_TIMER_MAX_NANOS,
        );
        assert_eq!(
            clamp_retry_nanos(RETRY_TIMER_MAX_NANOS),
            RETRY_TIMER_MAX_NANOS,
        );
        assert_eq!(
            clamp_retry_nanos(RETRY_TIMER_MAX_NANOS - 1),
            RETRY_TIMER_MAX_NANOS - 1,
        );
        assert_eq!(clamp_retry_nanos(1), 1);
        assert_eq!(clamp_retry_nanos(500_000_000), 500_000_000);
    }

    #[test]
    fn retry_timer_max_nanos_constant_pin() {
        assert_eq!(RETRY_TIMER_MAX_NANOS, 1_000_000_000);
    }

    // --- decide_stall_action: DrainOutcome -> StallAction mapping ---

    #[test]
    fn decide_stall_action_done_is_continue() {
        assert_eq!(
            decide_stall_action(DrainOutcome::Done),
            StallAction::Continue,
        );
    }

    #[test]
    fn decide_stall_action_zero_wait_is_redrain() {
        assert_eq!(
            decide_stall_action(DrainOutcome::ThrottleStalled { wait_nanos: 0 }),
            StallAction::ReDrain,
        );
    }

    #[test]
    fn decide_stall_action_nonzero_wait_is_sleep_with_clamped_nanos() {
        assert_eq!(
            decide_stall_action(DrainOutcome::ThrottleStalled { wait_nanos: 1 }),
            StallAction::Sleep { nanos: 1 },
        );
        assert_eq!(
            decide_stall_action(DrainOutcome::ThrottleStalled {
                wait_nanos: 500_000_000,
            }),
            StallAction::Sleep { nanos: 500_000_000 },
        );
        assert_eq!(
            decide_stall_action(DrainOutcome::ThrottleStalled {
                wait_nanos: RETRY_TIMER_MAX_NANOS,
            }),
            StallAction::Sleep {
                nanos: RETRY_TIMER_MAX_NANOS,
            },
        );
        assert_eq!(
            decide_stall_action(DrainOutcome::ThrottleStalled {
                wait_nanos: RETRY_TIMER_MAX_NANOS + 1,
            }),
            StallAction::Sleep {
                nanos: RETRY_TIMER_MAX_NANOS,
            },
        );
        assert_eq!(
            decide_stall_action(DrainOutcome::ThrottleStalled {
                wait_nanos: u64::MAX,
            }),
            StallAction::Sleep {
                nanos: RETRY_TIMER_MAX_NANOS,
            },
        );
    }

    #[test]
    fn decide_stall_action_is_pure() {
        let inputs = [
            DrainOutcome::Done,
            DrainOutcome::ThrottleStalled { wait_nanos: 0 },
            DrainOutcome::ThrottleStalled { wait_nanos: 1 },
            DrainOutcome::ThrottleStalled { wait_nanos: 12345 },
            DrainOutcome::ThrottleStalled {
                wait_nanos: u64::MAX,
            },
        ];
        for input in inputs {
            assert_eq!(
                decide_stall_action(input),
                decide_stall_action(input),
                "decide_stall_action must be deterministic for {input:?}",
            );
        }
    }

    #[test]
    fn decide_stall_action_redrain_downgrades_to_sleep_one_ns() {
        let outcome1 = DrainOutcome::ThrottleStalled { wait_nanos: 0 };
        assert_eq!(decide_stall_action(outcome1), StallAction::ReDrain);
        let outcome2 = DrainOutcome::ThrottleStalled { wait_nanos: 0 };
        let downgraded = match decide_stall_action(outcome2) {
            StallAction::ReDrain => StallAction::Sleep { nanos: 1 },
            other => other,
        };
        assert_eq!(downgraded, StallAction::Sleep { nanos: 1 });
        assert_eq!(clamp_retry_nanos(0), 1);
    }

    // --- resolve_action: closure invocation discipline ---

    #[test]
    fn resolve_action_continue_skips_redrain() {
        let mut redrain_called = false;
        let action = resolve_action(StallAction::Continue, || {
            redrain_called = true;
            DrainOutcome::Done
        });
        assert_eq!(action, WorkerAction::Continue);
        assert!(
            !redrain_called,
            "Continue must NOT invoke the inline-retry closure",
        );
    }

    #[test]
    fn resolve_action_sleep_skips_redrain() {
        let mut redrain_called = false;
        let action = resolve_action(StallAction::Sleep { nanos: 12345 }, || {
            redrain_called = true;
            DrainOutcome::Done
        });
        assert_eq!(action, WorkerAction::Sleep { nanos: 12345 });
        assert!(
            !redrain_called,
            "Sleep must NOT invoke the inline-retry closure",
        );
    }

    #[test]
    fn resolve_action_redrain_done_is_continue() {
        let mut call_count = 0;
        let action = resolve_action(StallAction::ReDrain, || {
            call_count += 1;
            DrainOutcome::Done
        });
        assert_eq!(action, WorkerAction::Continue);
        assert_eq!(
            call_count, 1,
            "ReDrain must invoke the inline-retry closure exactly once",
        );
    }

    #[test]
    fn resolve_action_redrain_sleep_passes_through() {
        let mut call_count = 0;
        let action = resolve_action(StallAction::ReDrain, || {
            call_count += 1;
            DrainOutcome::ThrottleStalled {
                wait_nanos: 500_000_000,
            }
        });
        assert_eq!(action, WorkerAction::Sleep { nanos: 500_000_000 });
        assert_eq!(
            call_count, 1,
            "ReDrain must invoke the inline-retry closure exactly once",
        );
    }

    #[test]
    fn resolve_action_redrain_redrain_downgrades_to_sleep_one_ns() {
        let mut call_count = 0;
        let action = resolve_action(StallAction::ReDrain, || {
            call_count += 1;
            DrainOutcome::ThrottleStalled { wait_nanos: 0 }
        });
        assert_eq!(action, WorkerAction::Sleep { nanos: 1 });
        assert_eq!(
            call_count, 1,
            "ReDrain followed by ReDrain must invoke the closure \
             exactly once — the downgrade prevents a third call",
        );
        assert_eq!(clamp_retry_nanos(0), 1);
    }

    // --- worker_dispatch_event: token routing and flag tolerance ---

    #[test]
    fn worker_dispatch_stop_token_clean() {
        assert_eq!(
            worker_dispatch_event(EventSet::IN, STOP_TOKEN),
            WorkerDispatchAction::Stop,
        );
    }

    #[test]
    fn worker_dispatch_kick_token_clean() {
        assert_eq!(
            worker_dispatch_event(EventSet::IN, KICK_TOKEN),
            WorkerDispatchAction::Drain {
                throttle_token_fired: false,
            },
        );
    }

    #[test]
    fn worker_dispatch_throttle_token_sets_flag() {
        assert_eq!(
            worker_dispatch_event(EventSet::IN, THROTTLE_TOKEN),
            WorkerDispatchAction::Drain {
                throttle_token_fired: true,
            },
        );
    }

    #[test]
    fn worker_dispatch_pause_token_clean() {
        assert_eq!(
            worker_dispatch_event(EventSet::IN, PAUSE_TOKEN),
            WorkerDispatchAction::Pause,
        );
    }

    #[test]
    fn worker_dispatch_pause_token_with_epollerr_still_pauses() {
        let event_set = EventSet::IN | EventSet::ERROR;
        assert_eq!(
            worker_dispatch_event(event_set, PAUSE_TOKEN),
            WorkerDispatchAction::Pause,
            "EPOLLERR on pause_fd must fall through to the pause arm \
             so the entry-side drain clears the saturated counter",
        );
    }

    #[test]
    fn worker_dispatch_unknown_token_skips() {
        for token in [0u64, 5, 99, u64::MAX] {
            assert_eq!(
                worker_dispatch_event(EventSet::IN, token),
                WorkerDispatchAction::Skip,
                "token {token} must dispatch to Skip",
            );
        }
    }

    #[test]
    fn worker_dispatch_kick_token_with_epollerr_still_drains() {
        let event_set = EventSet::IN | EventSet::ERROR;
        assert_eq!(
            worker_dispatch_event(event_set, KICK_TOKEN),
            WorkerDispatchAction::Drain {
                throttle_token_fired: false,
            },
            "EPOLLERR on eventfd indicates counter saturation; \
             fall through to per-token drain so the read clears it",
        );
    }

    #[test]
    fn worker_dispatch_throttle_token_with_epollerr_still_drains() {
        let event_set = EventSet::IN | EventSet::ERROR;
        assert_eq!(
            worker_dispatch_event(event_set, THROTTLE_TOKEN),
            WorkerDispatchAction::Drain {
                throttle_token_fired: true,
            },
        );
    }

    #[test]
    fn worker_dispatch_stop_token_with_epollerr_still_stops() {
        let event_set = EventSet::IN | EventSet::ERROR;
        assert_eq!(
            worker_dispatch_event(event_set, STOP_TOKEN),
            WorkerDispatchAction::Stop,
        );
    }

    #[test]
    fn worker_dispatch_kick_token_with_epollhup_still_drains() {
        let event_set = EventSet::IN | EventSet::HANG_UP;
        assert_eq!(
            worker_dispatch_event(event_set, KICK_TOKEN),
            WorkerDispatchAction::Drain {
                throttle_token_fired: false,
            },
        );
    }

    #[test]
    fn worker_dispatch_kick_token_epollerr_alone_still_drains() {
        let event_set = EventSet::ERROR;
        assert_eq!(
            worker_dispatch_event(event_set, KICK_TOKEN),
            WorkerDispatchAction::Drain {
                throttle_token_fired: false,
            },
        );
    }

    #[test]
    fn worker_dispatch_all_flags_throttle_still_drains() {
        let event_set = EventSet::IN | EventSet::ERROR | EventSet::HANG_UP;
        assert_eq!(
            worker_dispatch_event(event_set, THROTTLE_TOKEN),
            WorkerDispatchAction::Drain {
                throttle_token_fired: true,
            },
        );
    }

    // --- EpollEvent sanity pin for the third-party wrapper ---

    #[test]
    fn epoll_event_set_roundtrip_pin() {
        let combo = EventSet::IN | EventSet::ERROR | EventSet::HANG_UP;
        let ev = EpollEvent::new(combo, KICK_TOKEN);
        assert_eq!(ev.data(), KICK_TOKEN);
        assert!(ev.event_set().contains(EventSet::IN));
        assert!(ev.event_set().contains(EventSet::ERROR));
        assert!(ev.event_set().contains(EventSet::HANG_UP));
    }
}