pub(crate) use std::fs::File;
pub(crate) use std::os::unix::io::AsRawFd;
pub(crate) use std::sync::Arc;
pub(crate) use std::sync::OnceLock;
pub(crate) use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use std::sync::mpsc;
use std::thread;
pub(crate) use std::time::Duration;
pub(crate) use virtio_bindings::virtio_blk::{
VIRTIO_BLK_F_BLK_SIZE, VIRTIO_BLK_F_FLUSH, VIRTIO_BLK_F_RO, VIRTIO_BLK_F_SEG_MAX,
VIRTIO_BLK_F_SIZE_MAX, VIRTIO_BLK_ID_BYTES, VIRTIO_BLK_S_IOERR, VIRTIO_BLK_S_OK,
VIRTIO_BLK_S_UNSUPP, VIRTIO_BLK_T_FLUSH, VIRTIO_BLK_T_GET_ID, VIRTIO_BLK_T_IN,
VIRTIO_BLK_T_OUT,
};
pub(crate) use virtio_bindings::virtio_config::{
VIRTIO_CONFIG_S_ACKNOWLEDGE, VIRTIO_CONFIG_S_DRIVER, VIRTIO_CONFIG_S_DRIVER_OK,
VIRTIO_CONFIG_S_FAILED, VIRTIO_CONFIG_S_FEATURES_OK, VIRTIO_CONFIG_S_NEEDS_RESET,
VIRTIO_F_VERSION_1,
};
pub(crate) use virtio_bindings::virtio_ids::VIRTIO_ID_BLOCK;
#[allow(unused_imports)]
pub(crate) use virtio_bindings::virtio_mmio::{
VIRTIO_MMIO_CONFIG_GENERATION, VIRTIO_MMIO_DEVICE_FEATURES, VIRTIO_MMIO_DEVICE_FEATURES_SEL,
VIRTIO_MMIO_DEVICE_ID, VIRTIO_MMIO_DRIVER_FEATURES, VIRTIO_MMIO_DRIVER_FEATURES_SEL,
VIRTIO_MMIO_INT_CONFIG, VIRTIO_MMIO_INT_VRING, VIRTIO_MMIO_INTERRUPT_ACK,
VIRTIO_MMIO_INTERRUPT_STATUS, VIRTIO_MMIO_MAGIC_VALUE, VIRTIO_MMIO_QUEUE_AVAIL_HIGH,
VIRTIO_MMIO_QUEUE_AVAIL_LOW, VIRTIO_MMIO_QUEUE_DESC_HIGH, VIRTIO_MMIO_QUEUE_DESC_LOW,
VIRTIO_MMIO_QUEUE_NOTIFY, VIRTIO_MMIO_QUEUE_NUM, VIRTIO_MMIO_QUEUE_NUM_MAX,
VIRTIO_MMIO_QUEUE_READY, VIRTIO_MMIO_QUEUE_SEL, VIRTIO_MMIO_QUEUE_USED_HIGH,
VIRTIO_MMIO_QUEUE_USED_LOW, VIRTIO_MMIO_STATUS, VIRTIO_MMIO_VENDOR_ID, VIRTIO_MMIO_VERSION,
};
pub(crate) use virtio_bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX;
#[allow(unused_imports)]
pub(crate) use virtio_queue::Error as VirtioQueueError;
#[cfg(test)]
use virtio_queue::Queue;
#[allow(unused_imports)]
pub(crate) use virtio_queue::QueueOwnedT;
#[cfg(not(test))]
use virtio_queue::QueueSync;
pub(crate) use virtio_queue::QueueT;
pub(crate) use vm_memory::{ByteValued, Bytes, GuestAddress, GuestMemory, GuestMemoryMmap};
#[allow(unused_imports)]
pub(crate) use vmm_sys_util::epoll::{EpollEvent, EventSet};
pub(crate) use vmm_sys_util::eventfd::EventFd;
use super::VirtioBlkCounters;
pub(crate) use super::super::disk_config::DiskThrottle;
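// virtio-mmio transport identity registers: MAGIC stores the ASCII string
// "virt" in little-endian byte order, VERSION 2 selects the modern
// (non-legacy) MMIO register layout, and a vendor id of 0 means "no vendor".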
pub(crate) const MMIO_MAGIC: u32 = 0x7472_6976;
pub(crate) const MMIO_VERSION: u32 = 2;
pub(crate) const VENDOR_ID: u32 = 0;
pub const VIRTIO_MMIO_SIZE: u64 = 0x1000;
pub(crate) const NUM_QUEUES: usize = 1;
pub(crate) const QUEUE_MAX_SIZE: u16 = 256;
pub(crate) const REQ_QUEUE: usize = 0;
#[cfg(not(test))]
pub(crate) type BlkQueue = QueueSync;
#[cfg(test)]
pub(crate) type BlkQueue = Queue;
pub const VIRTIO_BLK_SECTOR_SIZE: u32 = 512;
#[allow(dead_code)]
pub const VIRTIO_BLK_DEFAULT_CAPACITY_BYTES: u64 = 256 * 1024 * 1024;
pub(crate) const VIRTIO_BLK_SEG_MAX: u32 = 128;
pub(crate) const VIRTIO_BLK_SIZE_MAX: u32 = 1 << 20;
pub(crate) const VIRTIO_BLK_SERIAL: [u8; VIRTIO_BLK_ID_BYTES as usize] =
*b"ktstr-virtio-blk\0\0\0\0";
#[repr(C)]
#[derive(Copy, Clone, Default, Debug)]
pub(crate) struct VirtioBlkOutHdr {
pub(crate) type_: u32,
pub(crate) _ioprio: u32,
pub(crate) sector: u64,
}
unsafe impl vm_memory::ByteValued for VirtioBlkOutHdr {}
pub(crate) const VIRTIO_BLK_OUTHDR_SIZE: usize = std::mem::size_of::<VirtioBlkOutHdr>();
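/// Legacy cylinder/head/sector geometry field of the config space; it stays
/// zeroed because the device never advertises VIRTIO_BLK_F_GEOMETRY.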
#[repr(C, packed)]
#[derive(Copy, Clone, Default, Debug)]
pub(crate) struct VirtioBlkGeometry {
pub(crate) cylinders: u16,
pub(crate) heads: u8,
pub(crate) sectors: u8,
}
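/// Device-specific configuration space served at MMIO offset 0x100 and
/// above: capacity in 512-byte sectors, the size_max/seg_max limits, the
/// unused geometry, and the logical block size.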
#[repr(C, packed)]
#[derive(Copy, Clone, Default, Debug)]
pub(crate) struct VirtioBlkConfig {
pub(crate) capacity: u64,
pub(crate) size_max: u32,
pub(crate) seg_max: u32,
pub(crate) geometry: VirtioBlkGeometry,
pub(crate) blk_size: u32,
}
unsafe impl vm_memory::ByteValued for VirtioBlkConfig {}
unsafe impl vm_memory::ByteValued for VirtioBlkGeometry {}
pub(crate) const VIRTIO_BLK_CONFIG_SIZE: usize = std::mem::size_of::<VirtioBlkConfig>();
const _: () = assert!(VIRTIO_BLK_CONFIG_SIZE == 24);
const _: () = assert!(std::mem::offset_of!(VirtioBlkConfig, capacity) == 0x00);
const _: () = assert!(std::mem::offset_of!(VirtioBlkConfig, size_max) == 0x08);
const _: () = assert!(std::mem::offset_of!(VirtioBlkConfig, seg_max) == 0x0C);
const _: () = assert!(std::mem::offset_of!(VirtioBlkConfig, geometry) == 0x10);
const _: () = assert!(std::mem::offset_of!(VirtioBlkConfig, blk_size) == 0x14);
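/// Flattened view of one descriptor in a request chain: guest-physical
/// address, length, and whether the guest marked it device-writable.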
#[derive(Clone, Copy, Debug)]
pub(crate) struct ChainDescriptor {
pub(crate) addr: GuestAddress,
pub(crate) len: u32,
pub(crate) is_write_only: bool,
}
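// Cumulative device-status milestones checked by the status FSM below.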
pub(crate) const S_ACK: u32 = VIRTIO_CONFIG_S_ACKNOWLEDGE;
pub(crate) const S_DRV: u32 = S_ACK | VIRTIO_CONFIG_S_DRIVER;
pub(crate) const S_FEAT: u32 = S_DRV | VIRTIO_CONFIG_S_FEATURES_OK;
#[cfg(test)]
pub(crate) const S_OK: u32 = S_FEAT | VIRTIO_CONFIG_S_DRIVER_OK;
use super::throttle::*;
#[cfg(not(test))]
use super::worker::worker_thread_main;
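/// Write the status byte into the guest's status descriptor and push the
/// chain head onto the used ring; returns false (and records an IO error)
/// if either step fails.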
#[allow(clippy::too_many_arguments)]
pub(crate) fn publish_completion<Q: QueueT>(
mem: &GuestMemoryMmap,
q: &mut Q,
counters: &VirtioBlkCounters,
head: u16,
status_addr: GuestAddress,
status_byte: u8,
used_len: u32,
label: &'static str,
) -> bool {
if mem.write_slice(&[status_byte], status_addr).is_err() {
counters.record_io_error();
return false;
}
match q.add_used(mem, head, used_len) {
Ok(()) => true,
Err(e) => {
tracing::warn!(head, %e, label, "virtio-blk add_used failed");
counters.record_io_error();
false
}
}
}
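/// Everything the IO worker owns: the backing file, throttle token buckets,
/// reusable scratch buffers, capacity and read-only parameters, shared
/// counters, plus flags tracking throttle stalls and queue poisoning.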
pub(crate) struct BlkWorkerState {
pub(crate) backing: File,
pub(crate) ops_bucket: TokenBucket,
pub(crate) bytes_bucket: TokenBucket,
pub(crate) all_descs_scratch: Vec<ChainDescriptor>,
pub(crate) io_buf_scratch: Vec<u8>,
pub(crate) capacity_bytes: u64,
pub(crate) read_only: bool,
pub(crate) counters: Arc<VirtioBlkCounters>,
pub(crate) currently_stalled: bool,
pub(crate) queue_poisoned: bool,
}
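/// Per-device queue state plus the engine that services requests: inline in
/// unit tests, a spawned worker thread otherwise.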
pub(crate) struct BlkWorker {
pub(crate) queues: [BlkQueue; NUM_QUEUES],
pub(crate) read_only: bool,
pub(crate) counters: Arc<VirtioBlkCounters>,
pub(crate) engine: WorkerEngine,
}
pub(crate) enum WorkerEngine {
#[cfg(test)]
Inline(InlineEngine),
#[cfg(not(test))]
Spawned(SpawnedEngine),
}
#[cfg(test)]
pub(crate) struct InlineEngine {
pub(crate) state: BlkWorkerState,
}
#[cfg(test)]
impl BlkWorker {
pub(crate) fn state(&self) -> &BlkWorkerState {
let WorkerEngine::Inline(engine) = &self.engine;
&engine.state
}
pub(crate) fn state_mut(&mut self) -> &mut BlkWorkerState {
let WorkerEngine::Inline(engine) = &mut self.engine;
&mut engine.state
}
}
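/// Spawned-thread engine: the worker's join handle plus the eventfds used to
/// kick it on queue notify and to signal shutdown. `respawn_pending` parks
/// the worker state between a reset and the next DRIVER_OK.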
#[cfg(not(test))]
pub(crate) struct SpawnedEngine {
pub(crate) kick_fd: EventFd,
pub(crate) stop_fd: EventFd,
pub(crate) handle: Option<thread::JoinHandle<BlkWorkerState>>,
pub(crate) respawn_pending: Option<BlkWorkerState>,
}
pub(crate) static VIRTIO_BLK_INSTANCE_COUNTER: AtomicU64 = AtomicU64::new(0);
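/// A virtio-blk device on the virtio-mmio transport: a single request queue,
/// vectored reads and writes against a backing file, optional throttling, and
/// pause/reset plumbing shared with the rest of the VMM.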
pub struct VirtioBlk {
pub(crate) queue_select: u32,
pub(crate) device_features_sel: u32,
pub(crate) driver_features_sel: u32,
pub(crate) driver_features: u64,
pub(crate) device_status: Arc<AtomicU32>,
pub(crate) interrupt_status: Arc<AtomicU32>,
pub(crate) config_generation: AtomicU32,
pub(crate) irq_evt: Arc<EventFd>,
pub(crate) mem: Arc<OnceLock<GuestMemoryMmap>>,
pub(crate) capacity_sectors: u64,
pub(crate) worker: BlkWorker,
pub(crate) mem_unset_warned: Arc<AtomicBool>,
pub(crate) throttle: DiskThrottle,
pub(crate) instance_id: u64,
pub(crate) pause_evt: Arc<EventFd>,
pub(crate) paused: Arc<AtomicBool>,
pub(crate) parked_evt: Arc<std::sync::Mutex<Option<Arc<EventFd>>>>,
pub(crate) worker_placement: WorkerPlacement,
}
#[allow(dead_code)]
#[derive(Debug, Clone, Default)]
pub struct WorkerPlacement {
pub service_cpu: Option<usize>,
pub no_perf_cpus: Option<Vec<usize>>,
}
impl VirtioBlk {
#[allow(dead_code)]
pub fn new(backing: File, capacity_bytes: u64, throttle: DiskThrottle) -> Self {
Self::with_options(backing, capacity_bytes, throttle, false)
}
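/// Build a device over `backing`, rounding the capacity down to a whole
/// number of 512-byte sectors. A minimal construction sketch follows; it is
/// only illustrative and assumes `DiskThrottle` provides a `Default` impl,
/// which this module does not show:
///
/// ```ignore
/// let file = std::fs::File::open("/path/to/disk.img")?;
/// let blk = VirtioBlk::with_options(file, 256 * 1024 * 1024, DiskThrottle::default(), true);
/// assert_eq!(blk.capacity_sectors(), 512 * 1024);
/// ```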
pub fn with_options(
backing: File,
capacity_bytes: u64,
throttle: DiskThrottle,
read_only: bool,
) -> Self {
let irq_evt = Arc::new(
EventFd::new(libc::EFD_NONBLOCK).expect("failed to create virtio-blk irq eventfd"),
);
if capacity_bytes < VIRTIO_BLK_SECTOR_SIZE as u64 && capacity_bytes != 0 {
tracing::warn!(
capacity_bytes,
sector_size = VIRTIO_BLK_SECTOR_SIZE,
"virtio-blk capacity_bytes smaller than one sector; clamping \
capacity_sectors to 0 (every IO will be rejected)"
);
}
let capacity_sectors = capacity_bytes / VIRTIO_BLK_SECTOR_SIZE as u64;
let capacity_bytes = capacity_sectors * VIRTIO_BLK_SECTOR_SIZE as u64;
let (ops_bucket, bytes_bucket) = buckets_from_throttle(throttle);
let counters = Arc::new(VirtioBlkCounters::default());
let state = BlkWorkerState {
backing,
ops_bucket,
bytes_bucket,
all_descs_scratch: Vec::with_capacity(VIRTIO_BLK_SEG_MAX as usize + 2),
io_buf_scratch: Vec::new(),
capacity_bytes,
read_only,
counters: Arc::clone(&counters),
currently_stalled: false,
queue_poisoned: false,
};
let interrupt_status = Arc::new(AtomicU32::new(0));
let device_status = Arc::new(AtomicU32::new(0));
let mem = Arc::new(OnceLock::new());
let mem_unset_warned = Arc::new(AtomicBool::new(false));
let pause_evt = Arc::new(
EventFd::new(libc::EFD_NONBLOCK).expect("failed to create virtio-blk pause eventfd"),
);
let paused = Arc::new(AtomicBool::new(true));
let queues = [BlkQueue::new(QUEUE_MAX_SIZE).expect("valid queue size")];
#[cfg(test)]
let engine = WorkerEngine::Inline(InlineEngine { state });
#[cfg(not(test))]
let engine = {
let kick_fd =
EventFd::new(libc::EFD_NONBLOCK).expect("failed to create virtio-blk kick eventfd");
let stop_fd =
EventFd::new(libc::EFD_NONBLOCK).expect("failed to create virtio-blk stop eventfd");
WorkerEngine::Spawned(SpawnedEngine {
kick_fd,
stop_fd,
handle: None,
respawn_pending: Some(state),
})
};
let worker = BlkWorker {
queues,
read_only,
counters,
engine,
};
VirtioBlk {
queue_select: 0,
device_features_sel: 0,
driver_features_sel: 0,
driver_features: 0,
device_status,
interrupt_status,
config_generation: AtomicU32::new(0),
irq_evt,
mem,
capacity_sectors,
worker,
mem_unset_warned,
throttle,
instance_id: VIRTIO_BLK_INSTANCE_COUNTER.fetch_add(1, Ordering::Relaxed),
pause_evt,
paused,
parked_evt: Arc::new(std::sync::Mutex::new(None)),
worker_placement: WorkerPlacement::default(),
}
}
pub fn set_parked_evt(&self, evt: Arc<EventFd>) {
if let Ok(mut guard) = self.parked_evt.lock() {
*guard = Some(evt);
}
}
pub fn set_worker_placement(&mut self, placement: WorkerPlacement) {
self.worker_placement = placement;
}
pub fn irq_evt(&self) -> &EventFd {
&self.irq_evt
}
pub fn set_mem(&mut self, mem: GuestMemoryMmap) {
if self.mem.set(mem).is_err() {
tracing::warn!(
"virtio-blk: set_mem called on already-initialised \
device; guest memory binding unchanged (mem is set \
once at boot and preserved across reset())"
);
}
}
#[allow(dead_code)]
pub fn capacity_sectors(&self) -> u64 {
self.capacity_sectors
}
pub fn counters(&self) -> Arc<VirtioBlkCounters> {
Arc::clone(&self.worker.counters)
}
pub fn paused_handle(&self) -> Arc<AtomicBool> {
Arc::clone(&self.paused)
}
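/// Feature bits offered to the guest; VIRTIO_BLK_F_RO is added only when the
/// device was constructed read-only.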
pub(crate) fn device_features(&self) -> u64 {
let mut feats = (1u64 << VIRTIO_F_VERSION_1)
| (1u64 << VIRTIO_BLK_F_BLK_SIZE)
| (1u64 << VIRTIO_BLK_F_SEG_MAX)
| (1u64 << VIRTIO_BLK_F_SIZE_MAX)
| (1u64 << VIRTIO_BLK_F_FLUSH)
| (1u64 << VIRTIO_RING_F_EVENT_IDX);
if self.worker.read_only {
feats |= 1u64 << VIRTIO_BLK_F_RO;
}
feats
}
pub(crate) fn selected_queue(&self) -> Option<usize> {
let idx = self.queue_select as usize;
if idx < NUM_QUEUES { Some(idx) } else { None }
}
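/// Queue registers may be programmed only after FEATURES_OK and before
/// DRIVER_OK, matching the virtio initialisation sequence.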
pub(crate) fn queue_config_allowed(&self) -> bool {
let status = self.device_status.load(Ordering::Acquire);
status & S_FEAT == S_FEAT && status & VIRTIO_CONFIG_S_DRIVER_OK == 0
}
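/// Driver feature words may be written only after DRIVER and before
/// FEATURES_OK.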
pub(crate) fn features_write_allowed(&self) -> bool {
let status = self.device_status.load(Ordering::Acquire);
status & S_DRV == S_DRV && status & VIRTIO_CONFIG_S_FEATURES_OK == 0
}
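/// Read `data_len` bytes starting at `sector` straight into the guest's
/// device-writable segments with a single preadv(2). A short read (for
/// example past the end of the backing file) is zero-filled so the guest
/// always sees the full requested length. Returns (status byte, used length
/// including the status byte).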
#[allow(clippy::too_many_arguments)]
pub(crate) fn handle_read_vectored_impl(
backing: &File,
capacity_bytes: u64,
counters: &VirtioBlkCounters,
mem: &GuestMemoryMmap,
sector: u64,
data_segments: &[ChainDescriptor],
data_len: u64,
) -> (u8, u32) {
let Some(base_offset) = sector.checked_mul(VIRTIO_BLK_SECTOR_SIZE as u64) else {
counters.record_io_error();
return (VIRTIO_BLK_S_IOERR as u8, 1);
};
if base_offset
.checked_add(data_len)
.is_none_or(|end| end > capacity_bytes)
{
counters.record_io_error();
return (VIRTIO_BLK_S_IOERR as u8, 1);
}
let mut iovecs: Vec<libc::iovec> = Vec::with_capacity(VIRTIO_BLK_SEG_MAX as usize + 2);
let mut _guards: Vec<vm_memory::volatile_memory::PtrGuardMut> =
Vec::with_capacity(VIRTIO_BLK_SEG_MAX as usize + 2);
for seg in data_segments {
if !seg.is_write_only {
counters.record_io_error();
return (VIRTIO_BLK_S_IOERR as u8, 1);
}
let len = seg.len as usize;
if len == 0 {
continue;
}
for slice_result in mem.get_slices(seg.addr, len) {
let slice = match slice_result {
Ok(s) => s,
Err(_) => {
counters.record_io_error();
return (VIRTIO_BLK_S_IOERR as u8, 1);
}
};
let guard = slice.ptr_guard_mut();
iovecs.push(libc::iovec {
iov_base: guard.as_ptr() as *mut libc::c_void,
iov_len: slice.len(),
});
_guards.push(guard);
}
}
let bytes_from_backing: u64 = if iovecs.is_empty() {
0
} else {
let r = unsafe {
libc::preadv(
backing.as_raw_fd(),
iovecs.as_ptr(),
iovecs.len() as libc::c_int,
base_offset as libc::off_t,
)
};
if r < 0 {
let e = std::io::Error::last_os_error();
tracing::warn!(sector, %e, "virtio-blk preadv error");
counters.record_io_error();
return (VIRTIO_BLK_S_IOERR as u8, 1);
}
r as u64
};
if bytes_from_backing < data_len {
let mut filled = bytes_from_backing;
let mut to_zero = data_len - bytes_from_backing;
const ZERO_BUF_LEN: usize = 65536;
let zeros = [0u8; ZERO_BUF_LEN];
for seg in data_segments {
if to_zero == 0 {
break;
}
let seg_len = seg.len as u64;
if filled >= seg_len {
filled -= seg_len;
continue;
}
let seg_offset = filled as u32;
let seg_remaining = (seg_len - filled).min(to_zero) as u32;
let Some(zero_addr_u64) = seg.addr.0.checked_add(seg_offset as u64) else {
counters.record_io_error();
return (VIRTIO_BLK_S_IOERR as u8, 1);
};
let mut zero_addr = GuestAddress(zero_addr_u64);
let mut remaining = seg_remaining;
while remaining > 0 {
let chunk = (remaining as usize).min(ZERO_BUF_LEN);
if mem.write_slice(&zeros[..chunk], zero_addr).is_err() {
counters.record_io_error();
return (VIRTIO_BLK_S_IOERR as u8, 1);
}
let Some(next) = zero_addr.0.checked_add(chunk as u64) else {
counters.record_io_error();
return (VIRTIO_BLK_S_IOERR as u8, 1);
};
zero_addr = GuestAddress(next);
remaining -= chunk as u32;
}
to_zero -= seg_remaining as u64;
filled = 0;
}
}
counters.record_read(bytes_from_backing);
let bytes_to_guest = data_len as u32;
(VIRTIO_BLK_S_OK as u8, bytes_to_guest + 1)
}
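/// Write `data_len` bytes from the guest's device-readable segments to the
/// backing file with a single pwritev(2); a short write is reported as an IO
/// error. Returns (status byte, used length).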
#[allow(clippy::too_many_arguments)]
pub(crate) fn handle_write_vectored_impl(
backing: &File,
capacity_bytes: u64,
counters: &VirtioBlkCounters,
mem: &GuestMemoryMmap,
sector: u64,
data_segments: &[ChainDescriptor],
data_len: u64,
) -> (u8, u32) {
let Some(base_offset) = sector.checked_mul(VIRTIO_BLK_SECTOR_SIZE as u64) else {
counters.record_io_error();
return (VIRTIO_BLK_S_IOERR as u8, 1);
};
if base_offset
.checked_add(data_len)
.is_none_or(|end| end > capacity_bytes)
{
counters.record_io_error();
return (VIRTIO_BLK_S_IOERR as u8, 1);
}
let mut iovecs: Vec<libc::iovec> = Vec::with_capacity(VIRTIO_BLK_SEG_MAX as usize + 2);
let mut _guards: Vec<vm_memory::volatile_memory::PtrGuard> =
Vec::with_capacity(VIRTIO_BLK_SEG_MAX as usize + 2);
for seg in data_segments {
if seg.is_write_only {
counters.record_io_error();
return (VIRTIO_BLK_S_IOERR as u8, 1);
}
let len = seg.len as usize;
if len == 0 {
continue;
}
for slice_result in mem.get_slices(seg.addr, len) {
let slice = match slice_result {
Ok(s) => s,
Err(_) => {
counters.record_io_error();
return (VIRTIO_BLK_S_IOERR as u8, 1);
}
};
let guard = slice.ptr_guard();
iovecs.push(libc::iovec {
iov_base: guard.as_ptr() as *mut libc::c_void,
iov_len: slice.len(),
});
_guards.push(guard);
}
}
if iovecs.is_empty() {
counters.record_write(0);
return (VIRTIO_BLK_S_OK as u8, 1);
}
let r = unsafe {
libc::pwritev(
backing.as_raw_fd(),
iovecs.as_ptr(),
iovecs.len() as libc::c_int,
base_offset as libc::off_t,
)
};
if r < 0 {
let e = std::io::Error::last_os_error();
tracing::warn!(sector, %e, "virtio-blk pwritev error");
counters.record_io_error();
return (VIRTIO_BLK_S_IOERR as u8, 1);
}
let total_written = r as u64;
if total_written != data_len {
tracing::warn!(
sector,
total_written,
data_len,
"virtio-blk pwritev short write"
);
counters.record_io_error();
return (VIRTIO_BLK_S_IOERR as u8, 1);
}
counters.record_write(total_written);
(VIRTIO_BLK_S_OK as u8, 1)
}
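/// Requests that can be answered before touching the throttle or the backing
/// file: writes to a read-only device fail with IOERR, flushes on a
/// read-only device succeed as no-ops, unknown request types are UNSUPP, and
/// everything else proceeds (returns None).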
pub(crate) fn classify_pre_throttle(
req_type: u32,
read_only: bool,
counters: &VirtioBlkCounters,
) -> Option<(u8, u32)> {
match req_type {
VIRTIO_BLK_T_OUT if read_only => {
counters.record_io_error();
Some((VIRTIO_BLK_S_IOERR as u8, 1))
}
VIRTIO_BLK_T_FLUSH if read_only => {
counters.record_flush();
Some((VIRTIO_BLK_S_OK as u8, 1))
}
VIRTIO_BLK_T_IN | VIRTIO_BLK_T_OUT | VIRTIO_BLK_T_FLUSH | VIRTIO_BLK_T_GET_ID => None,
_ => Some((VIRTIO_BLK_S_UNSUPP as u8, 1)),
}
}
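/// Queue-notify entry point: drain the queue inline under test, otherwise
/// kick the spawned worker's eventfd.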
pub(crate) fn process_requests(&mut self) {
#[cfg(test)]
{
self.drain_inline();
}
#[cfg(not(test))]
{
let WorkerEngine::Spawned(eng) = &self.worker.engine;
let _ = eng.kick_fd.write(1);
}
}
#[cfg(test)]
pub(crate) fn drain_inline(&mut self) {
let Some(mem) = self.mem.get() else {
if !self.mem_unset_warned.swap(true, Ordering::Relaxed) {
tracing::warn!(
"virtio-blk: queue notify before set_mem; \
dropping requests until guest memory is wired"
);
}
return;
};
let WorkerEngine::Inline(engine) = &mut self.worker.engine;
let _ = super::drain_bracket_impl(
&mut engine.state,
&mut self.worker.queues,
mem,
&self.irq_evt,
&self.interrupt_status,
&self.device_status,
);
}
}
impl VirtioBlk {
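/// MMIO read dispatch: offsets at 0x100 and above fall through to the
/// device-specific config space, everything below is a 32-bit little-endian
/// register read. A sketch of reading the magic register:
///
/// ```ignore
/// let mut magic = [0u8; 4];
/// blk.mmio_read(VIRTIO_MMIO_MAGIC_VALUE as u64, &mut magic);
/// assert_eq!(u32::from_le_bytes(magic), MMIO_MAGIC); // "virt"
/// ```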
pub fn mmio_read(&self, offset: u64, data: &mut [u8]) {
if offset >= 0x100 {
self.read_blk_config(offset - 0x100, data);
return;
}
if data.len() != 4 {
data.fill(0xff);
return;
}
let val: u32 = match offset as u32 {
VIRTIO_MMIO_MAGIC_VALUE => MMIO_MAGIC,
VIRTIO_MMIO_VERSION => MMIO_VERSION,
VIRTIO_MMIO_DEVICE_ID => VIRTIO_ID_BLOCK,
VIRTIO_MMIO_VENDOR_ID => VENDOR_ID,
VIRTIO_MMIO_DEVICE_FEATURES => {
let page = self.device_features_sel;
if page == 0 {
self.device_features() as u32
} else if page == 1 {
(self.device_features() >> 32) as u32
} else {
0
}
}
VIRTIO_MMIO_QUEUE_NUM_MAX => self
.selected_queue()
.map(|i| self.worker.queues[i].max_size() as u32)
.unwrap_or(0),
VIRTIO_MMIO_QUEUE_READY => self
.selected_queue()
.map(|i| self.worker.queues[i].ready() as u32)
.unwrap_or(0),
VIRTIO_MMIO_INTERRUPT_STATUS => self.interrupt_status.load(Ordering::Acquire),
VIRTIO_MMIO_STATUS => self.device_status.load(Ordering::Acquire),
VIRTIO_MMIO_CONFIG_GENERATION => self.config_generation.load(Ordering::Acquire),
_ => 0,
};
data.copy_from_slice(&val.to_le_bytes());
}
pub(crate) fn read_blk_config(&self, offset: u64, data: &mut [u8]) {
let cfg = VirtioBlkConfig {
capacity: self.capacity_sectors,
size_max: VIRTIO_BLK_SIZE_MAX,
seg_max: VIRTIO_BLK_SEG_MAX,
geometry: VirtioBlkGeometry::default(),
blk_size: VIRTIO_BLK_SECTOR_SIZE,
};
let cfg_bytes = cfg.as_slice();
let len = data.len();
let start = offset as usize;
if start >= cfg_bytes.len() {
data.fill(0);
return;
}
let end = (start + len).min(cfg_bytes.len());
let n = end - start;
data[..n].copy_from_slice(&cfg_bytes[start..end]);
data[n..].fill(0);
}
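/// MMIO write dispatch: writes into the config window (0x100 and above) and
/// accesses that are not exactly 4 bytes are ignored; writing 0 to STATUS
/// resets the device, any other STATUS value goes through the set_status FSM.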
pub fn mmio_write(&mut self, offset: u64, data: &[u8]) {
if offset >= 0x100 {
return;
}
if data.len() != 4 {
return;
}
let val = u32::from_le_bytes([data[0], data[1], data[2], data[3]]);
match offset as u32 {
VIRTIO_MMIO_DEVICE_FEATURES_SEL => self.device_features_sel = val,
VIRTIO_MMIO_DRIVER_FEATURES_SEL => self.driver_features_sel = val,
VIRTIO_MMIO_DRIVER_FEATURES => {
if !self.features_write_allowed() {
return;
}
let page = self.driver_features_sel;
if page == 0 {
self.driver_features =
(self.driver_features & 0xFFFF_FFFF_0000_0000) | val as u64;
} else if page == 1 {
self.driver_features =
(self.driver_features & 0x0000_0000_FFFF_FFFF) | ((val as u64) << 32);
}
}
VIRTIO_MMIO_QUEUE_SEL => self.queue_select = val,
VIRTIO_MMIO_QUEUE_NUM if self.queue_config_allowed() => {
if let Some(i) = self.selected_queue() {
self.worker.queues[i].set_size(val as u16);
}
}
VIRTIO_MMIO_QUEUE_READY if self.queue_config_allowed() => {
if let Some(i) = self.selected_queue() {
self.worker.queues[i].set_ready(val == 1);
}
}
VIRTIO_MMIO_QUEUE_NOTIFY => {
let idx = val as usize;
if idx == REQ_QUEUE {
self.process_requests();
}
}
VIRTIO_MMIO_INTERRUPT_ACK => {
self.interrupt_status.fetch_and(!val, Ordering::AcqRel);
}
VIRTIO_MMIO_STATUS => {
if val == 0 {
self.reset();
} else {
self.set_status(val);
}
}
VIRTIO_MMIO_QUEUE_DESC_LOW if self.queue_config_allowed() => {
if let Some(i) = self.selected_queue() {
self.worker.queues[i].set_desc_table_address(Some(val), None);
}
}
VIRTIO_MMIO_QUEUE_DESC_HIGH if self.queue_config_allowed() => {
if let Some(i) = self.selected_queue() {
self.worker.queues[i].set_desc_table_address(None, Some(val));
}
}
VIRTIO_MMIO_QUEUE_AVAIL_LOW if self.queue_config_allowed() => {
if let Some(i) = self.selected_queue() {
self.worker.queues[i].set_avail_ring_address(Some(val), None);
}
}
VIRTIO_MMIO_QUEUE_AVAIL_HIGH if self.queue_config_allowed() => {
if let Some(i) = self.selected_queue() {
self.worker.queues[i].set_avail_ring_address(None, Some(val));
}
}
VIRTIO_MMIO_QUEUE_USED_LOW if self.queue_config_allowed() => {
if let Some(i) = self.selected_queue() {
self.worker.queues[i].set_used_ring_address(Some(val), None);
}
}
VIRTIO_MMIO_QUEUE_USED_HIGH if self.queue_config_allowed() => {
if let Some(i) = self.selected_queue() {
self.worker.queues[i].set_used_ring_address(None, Some(val));
}
}
_ => {}
}
}
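/// Advance the device-status FSM. New bits are accepted only in spec order
/// (ACKNOWLEDGE, then DRIVER, then FEATURES_OK, then DRIVER_OK, one at a
/// time), FAILED is stored as-is, and previously set bits can never be
/// cleared without a full reset. The CAS retry loop tolerates the worker
/// concurrently setting NEEDS_RESET. The happy path as a guest would drive
/// it (FEATURES_OK only sticks if VIRTIO_F_VERSION_1 was acked and no
/// unadvertised bits were requested):
///
/// ```ignore
/// blk.set_status(S_ACK);
/// blk.set_status(S_DRV);
/// blk.set_status(S_FEAT);
/// blk.set_status(S_FEAT | VIRTIO_CONFIG_S_DRIVER_OK);
/// ```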
pub(crate) fn set_status(&mut self, val: u32) {
let mut current_status = self.device_status.load(Ordering::Acquire);
const MAX_CAS_RETRIES: u32 = 4;
let mut cas_retries: u32 = 0;
loop {
if val & current_status != current_status {
if current_status & VIRTIO_CONFIG_S_NEEDS_RESET != 0 {
tracing::warn!(
device_status = current_status,
requested = val,
"virtio-blk set_status rejected — device in \
NEEDS_RESET state from prior queue poison; \
guest must write STATUS=0 to reset before any \
further FSM advance can succeed"
);
} else {
tracing::warn!(
device_status = current_status,
requested = val,
"virtio-blk set_status rejected — attempted to clear \
a previously-set status bit without a full reset \
(virtio-v1.2 §3.1.1: status bits are monotone within \
a driver session)"
);
}
return;
}
let new_bits = val & !current_status;
if new_bits == 0 {
return;
}
if new_bits == VIRTIO_CONFIG_S_FAILED {
match self.device_status.compare_exchange(
current_status,
val,
Ordering::Release,
Ordering::Acquire,
) {
Ok(_) => {
tracing::warn!(
old = current_status,
new = val,
"virtio-blk set_status: guest set FAILED status \
(virtio-v1.2 §2.1.1 bit 0x80 — driver gave up on \
device probe). Stored without further FSM advance.",
);
return;
}
Err(observed) => {
debug_assert_eq!(
observed & !current_status & !VIRTIO_CONFIG_S_NEEDS_RESET,
0,
"device_status race: observed bits beyond NEEDS_RESET — \
worker invariant violated (snapshot={current_status:#x}, \
observed={observed:#x})",
);
cas_retries += 1;
if cas_retries >= MAX_CAS_RETRIES {
tracing::error!(
device_status = observed,
requested = val,
retries = cas_retries,
"virtio-blk set_status abandoned — \
CAS retry budget exhausted on FAILED \
store; either the worker invariant is \
violated or a hardware live-lock is \
starving the vCPU thread; bailing \
without advancing the FSM",
);
return;
}
current_status = observed;
continue;
}
}
}
let valid = match new_bits {
VIRTIO_CONFIG_S_ACKNOWLEDGE => current_status == 0,
VIRTIO_CONFIG_S_DRIVER => current_status == S_ACK,
VIRTIO_CONFIG_S_FEATURES_OK => {
current_status == S_DRV
&& self.driver_features & (1u64 << VIRTIO_F_VERSION_1) != 0
&& self.driver_features & !self.device_features() == 0
}
VIRTIO_CONFIG_S_DRIVER_OK => current_status == S_FEAT,
_ => false,
};
if valid {
match self.device_status.compare_exchange(
current_status,
val,
Ordering::Release,
Ordering::Acquire,
) {
Ok(_) => {}
Err(observed) => {
debug_assert_eq!(
observed & !current_status & !VIRTIO_CONFIG_S_NEEDS_RESET,
0,
"device_status race: observed bits beyond NEEDS_RESET — \
worker invariant violated (snapshot={current_status:#x}, \
observed={observed:#x})",
);
cas_retries += 1;
if cas_retries >= MAX_CAS_RETRIES {
tracing::error!(
device_status = observed,
requested = val,
retries = cas_retries,
"virtio-blk set_status abandoned — \
CAS retry budget exhausted; either the \
worker invariant is violated or a \
hardware live-lock is starving the \
vCPU thread; bailing without \
advancing the FSM",
);
return;
}
current_status = observed;
continue;
}
}
if new_bits == VIRTIO_CONFIG_S_FEATURES_OK
&& self.driver_features & (1u64 << VIRTIO_RING_F_EVENT_IDX) != 0
{
self.worker.queues[REQ_QUEUE].set_event_idx(true);
}
#[cfg(not(test))]
if new_bits == VIRTIO_CONFIG_S_DRIVER_OK {
self.consume_pending_respawn();
}
return;
}
if new_bits == VIRTIO_CONFIG_S_FEATURES_OK && current_status == S_DRV {
if self.driver_features & (1u64 << VIRTIO_F_VERSION_1) == 0 {
tracing::warn!(
driver_features = ?self.driver_features,
"FEATURES_OK rejected — VIRTIO_F_VERSION_1 not negotiated; \
legacy/transitional driver against modern-only device",
);
} else {
let unadvertised = self.driver_features & !self.device_features();
if unadvertised != 0 {
tracing::warn!(
driver_features = ?self.driver_features,
device_features = ?self.device_features(),
unadvertised = ?unadvertised,
"FEATURES_OK rejected — driver acked unadvertised \
feature bits; subset rule (virtio-v1.2 §3.1.1) \
violated",
);
}
}
} else if current_status & VIRTIO_CONFIG_S_NEEDS_RESET != 0 {
tracing::warn!(
device_status = current_status,
requested = val,
new_bits = new_bits,
"virtio-blk set_status rejected — device in \
NEEDS_RESET state from prior queue poison; \
guest must write STATUS=0 to reset before any \
further FSM advance can succeed",
);
} else {
tracing::warn!(
device_status = current_status,
requested = val,
new_bits = new_bits,
"virtio-blk set_status rejected — illegal FSM transition \
(virtio-v1.2 §3.1.1 ordering: ACK → DRIVER → FEATURES_OK \
→ DRIVER_OK, one bit at a time)",
);
}
return;
}
}
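/// Full device reset (STATUS write of 0): clear the selectors and negotiated
/// features, bump the config generation, tear down or re-arm the worker
/// engine, drain the irq and pause eventfds, and zero the interrupt and
/// device status. The guest memory binding is preserved.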
pub(crate) fn reset(&mut self) {
self.queue_select = 0;
self.device_features_sel = 0;
self.driver_features_sel = 0;
self.driver_features = 0;
self.config_generation.fetch_add(1, Ordering::Release);
#[cfg(test)]
self.reset_engine_inline();
#[cfg(not(test))]
self.reset_engine_spawned();
let _ = self.irq_evt.read();
let _ = self.pause_evt.read();
self.interrupt_status.store(0, Ordering::Release);
self.device_status.store(0, Ordering::Release);
self.mem_unset_warned.store(false, Ordering::Relaxed);
}
#[cfg(test)]
pub(crate) fn reset_engine_inline(&mut self) {
for q in &mut self.worker.queues {
q.reset();
}
let WorkerEngine::Inline(engine) = &mut self.worker.engine;
let (ops_bucket, bytes_bucket) = buckets_from_throttle(self.throttle);
engine.state.ops_bucket = ops_bucket;
engine.state.bytes_bucket = bytes_bucket;
engine.state.all_descs_scratch.clear();
engine.state.io_buf_scratch.clear();
if engine.state.currently_stalled {
engine.state.currently_stalled = false;
engine.state.counters.record_throttle_pending_dec();
}
engine.state.queue_poisoned = false;
}
#[cfg(not(test))]
pub(crate) fn reset_engine_spawned(&mut self) {
let already_pending = {
let WorkerEngine::Spawned(eng) = &self.worker.engine;
eng.respawn_pending.is_some()
};
if !already_pending {
self.resume();
let reclaimed = self.stop_worker_and_reclaim_state();
self.paused.store(true, Ordering::Release);
let WorkerEngine::Spawned(eng) = &mut self.worker.engine;
eng.respawn_pending = reclaimed;
}
for q in &mut self.worker.queues {
q.reset();
}
}
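/// Signal the worker to stop and join it with a timeout so its state can be
/// reclaimed for a later respawn. On panic, timeout, or helper failure the
/// state is lost and the device stays workerless until reconstructed.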
#[cfg(not(test))]
pub(crate) fn stop_worker_and_reclaim_state(&mut self) -> Option<BlkWorkerState> {
let WorkerEngine::Spawned(eng) = &mut self.worker.engine;
let stop_fd = eng.stop_fd.as_raw_fd();
let capacity_sectors = self.capacity_sectors;
let instance_id = self.instance_id;
signal_worker_stop(&eng.stop_fd, stop_fd, instance_id, capacity_sectors);
let WorkerEngine::Spawned(eng) = &mut self.worker.engine;
let handle = eng.handle.take()?;
match join_worker_with_timeout(handle, RESET_JOIN_TIMEOUT) {
JoinWithTimeoutOutcome::Joined(state) => Some(state),
JoinWithTimeoutOutcome::Panicked(payload) => {
tracing::error!(
panic = panic_payload_str(&*payload),
stop_fd,
capacity_sectors,
instance_id,
"virtio-blk worker thread panicked during reset; \
no state to reclaim — device will not service IO \
until a fresh VirtioBlk is constructed"
);
None
}
JoinWithTimeoutOutcome::TimedOut => {
tracing::warn!(
timeout_s = RESET_JOIN_TIMEOUT.as_secs_f32(),
stop_fd,
capacity_sectors,
instance_id,
"virtio-blk worker did not exit within \
RESET_JOIN_TIMEOUT of stop_fd during reset; \
leaking the worker thread to avoid blocking the \
vCPU thread (which the freeze coordinator may \
target with SIGRTMIN). Device enters the \
permanent-workerless state — guests will hang \
on every request until \
kernel.hung_task_timeout_secs (default 120 s) \
fires, and only constructing a fresh VirtioBlk \
recovers IO service. \
hint: identify the wedged device by stop_fd / \
instance_id / capacity_sectors above. \
hint: check `dmesg` for the backing fd's \
storage path stalling on I/O, or kill -USR1 \
the host process to dump worker thread \
backtraces."
);
None
}
JoinWithTimeoutOutcome::HelperSpawnFailed => {
tracing::error!(
stop_fd,
capacity_sectors,
instance_id,
"virtio-blk reset helper thread spawn failed; \
detaching worker without join — device enters \
the permanent-workerless state"
);
None
}
JoinWithTimeoutOutcome::HelperDisconnected => {
tracing::error!(
stop_fd,
capacity_sectors,
instance_id,
"virtio-blk reset helper thread terminated \
without forwarding the worker join result; \
device enters the permanent-workerless state"
);
None
}
}
}
#[cfg(not(test))]
pub(crate) fn consume_pending_respawn(&mut self) {
let pending = {
let WorkerEngine::Spawned(eng) = &mut self.worker.engine;
eng.respawn_pending.take()
};
if let Some(state) = pending {
self.respawn_worker(state);
}
}
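/// Spawn a fresh worker over reclaimed state: rebuild the throttle buckets,
/// clear scratch buffers and stall/poison flags, clone the eventfds the
/// worker needs, and hand it shared queue, memory, irq, and status handles.
/// Any eventfd or thread-spawn failure leaves the device without a worker.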
#[cfg(not(test))]
pub(crate) fn respawn_worker(&mut self, mut state: BlkWorkerState) {
let (ops_bucket, bytes_bucket) = buckets_from_throttle(self.throttle);
state.ops_bucket = ops_bucket;
state.bytes_bucket = bytes_bucket;
state.all_descs_scratch.clear();
state.io_buf_scratch.clear();
if state.currently_stalled {
state.currently_stalled = false;
state.counters.record_throttle_pending_dec();
}
state.queue_poisoned = false;
let kick_fd = match EventFd::new(libc::EFD_NONBLOCK) {
Ok(fd) => fd,
Err(e) => {
tracing::error!(
%e,
"virtio-blk reset: kick eventfd creation failed; \
leaving device without a worker — IO will not \
be serviced until reconstruction"
);
return;
}
};
let stop_fd = match EventFd::new(libc::EFD_NONBLOCK) {
Ok(fd) => fd,
Err(e) => {
tracing::error!(
%e,
"virtio-blk reset: stop eventfd creation failed; \
leaving device without a worker — IO will not \
be serviced until reconstruction"
);
return;
}
};
let worker_kick = match kick_fd.try_clone() {
Ok(fd) => fd,
Err(e) => {
tracing::error!(
%e,
"virtio-blk reset: kick eventfd clone failed; \
leaving device without a worker"
);
return;
}
};
let worker_stop = match stop_fd.try_clone() {
Ok(fd) => fd,
Err(e) => {
tracing::error!(
%e,
"virtio-blk reset: stop eventfd clone failed; \
leaving device without a worker"
);
return;
}
};
let pause_fd = match self.pause_evt.try_clone() {
Ok(fd) => fd,
Err(e) => {
tracing::error!(
%e,
"virtio-blk reset: pause eventfd clone failed; \
leaving device without a worker"
);
return;
}
};
let worker_queues = [self.worker.queues[REQ_QUEUE].clone()];
let worker_mem = Arc::clone(&self.mem);
let worker_irq = Arc::clone(&self.irq_evt);
let worker_status = Arc::clone(&self.interrupt_status);
let worker_device_status = Arc::clone(&self.device_status);
let worker_warned = Arc::clone(&self.mem_unset_warned);
let worker_paused = Arc::clone(&self.paused);
let worker_parked_evt_slot = Arc::clone(&self.parked_evt);
let worker_placement = self.worker_placement.clone();
let handle = match thread::Builder::new()
.name("ktstr-vblk".to_string())
.spawn(move || {
worker_thread_main(
state,
worker_queues,
worker_mem,
worker_irq,
worker_status,
worker_device_status,
worker_warned,
worker_paused,
worker_placement,
worker_kick,
worker_stop,
pause_fd,
worker_parked_evt_slot,
)
}) {
Ok(h) => h,
Err(e) => {
tracing::error!(
%e,
"virtio-blk reset: worker thread spawn failed; \
leaving device without a worker"
);
return;
}
};
let WorkerEngine::Spawned(eng) = &mut self.worker.engine;
*eng = SpawnedEngine {
kick_fd,
stop_fd,
handle: Some(handle),
respawn_pending: None,
};
}
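/// Ask the worker to park at its pause rendezvous by signalling `pause_evt`;
/// outside tests this is skipped (with a debug log) when no worker thread is
/// live.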
pub fn pause(&self) {
#[cfg(not(test))]
{
let WorkerEngine::Spawned(eng) = &self.worker.engine;
if eng.handle.is_none() {
tracing::debug!(
"virtio-blk pause() with no live worker; \
`paused` is already `true` from construction \
(or post-stop), rendezvous will pass vacuously"
);
return;
}
}
if let Err(e) = self.pause_evt.write(1) {
tracing::warn!(%e, "virtio-blk pause_evt.write failed");
}
}
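/// Clear `paused` and unpark the worker; returns true only when a live
/// worker thread was actually resumed.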
pub fn resume(&self) -> bool {
#[cfg(not(test))]
{
let WorkerEngine::Spawned(eng) = &self.worker.engine;
if let Some(ref handle) = eng.handle {
self.paused.store(false, Ordering::Release);
handle.thread().unpark();
return true;
}
self.paused.store(true, Ordering::Release);
false
}
#[cfg(test)]
{
self.paused.store(false, Ordering::Release);
false
}
}
#[allow(dead_code)]
pub fn is_paused(&self) -> bool {
self.paused.load(Ordering::Acquire)
}
}
#[cfg(not(test))]
const STOP_FD_WRITE_MAX_RETRIES: u32 = 4;
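/// Bump the stop eventfd so the worker's event loop exits, retrying a bounded
/// number of times if the counter is saturated (WouldBlock); other errors are
/// logged and left for the subsequent join to surface.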
#[cfg(not(test))]
pub(crate) fn signal_worker_stop(
stop_fd: &EventFd,
raw_fd: std::os::unix::io::RawFd,
instance_id: u64,
capacity_sectors: u64,
) {
for attempt in 0..STOP_FD_WRITE_MAX_RETRIES {
match stop_fd.write(1) {
Ok(()) => return,
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
tracing::warn!(
attempt,
stop_fd = raw_fd,
instance_id,
capacity_sectors,
"virtio-blk stop_fd write returned WouldBlock; \
eventfd counter likely saturated. Yielding and retrying"
);
std::thread::yield_now();
}
Err(e) => {
tracing::error!(
attempt,
stop_fd = raw_fd,
instance_id,
capacity_sectors,
%e,
"virtio-blk stop_fd write failed with non-EAGAIN error; \
worker may not observe the stop signal — \
downstream join will surface the timeout"
);
return;
}
}
}
tracing::error!(
max_retries = STOP_FD_WRITE_MAX_RETRIES,
stop_fd = raw_fd,
instance_id,
capacity_sectors,
"virtio-blk stop_fd write exhausted retries on WouldBlock; \
worker did not consume the eventfd counter in time — \
downstream join will surface the timeout and the device \
enters the permanent-workerless state"
);
}
pub(crate) const DROP_JOIN_TIMEOUT: Duration = Duration::from_secs(1);
pub(crate) const RESET_JOIN_TIMEOUT: Duration = Duration::from_secs(1);
pub(crate) enum JoinWithTimeoutOutcome {
#[allow(dead_code)]
Joined(BlkWorkerState),
Panicked(Box<dyn std::any::Any + Send>),
TimedOut,
HelperSpawnFailed,
HelperDisconnected,
}
pub(crate) fn panic_payload_str(payload: &(dyn std::any::Any + Send)) -> &str {
if let Some(s) = payload.downcast_ref::<&'static str>() {
s
} else if let Some(s) = payload.downcast_ref::<String>() {
s.as_str()
} else {
"<non-string panic>"
}
}
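/// Join the worker from a throwaway helper thread and wait on a channel with
/// a deadline, so a wedged worker can be leaked instead of blocking the
/// caller (typically a vCPU thread).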
pub(crate) fn join_worker_with_timeout(
handle: thread::JoinHandle<BlkWorkerState>,
timeout: Duration,
) -> JoinWithTimeoutOutcome {
let (tx, rx) = mpsc::channel();
let spawn_result = thread::Builder::new()
.name("ktstr-vblk-drop".to_string())
.spawn(move || {
let _ = tx.send(handle.join());
});
let _helper = match spawn_result {
Ok(h) => h,
Err(_) => return JoinWithTimeoutOutcome::HelperSpawnFailed,
};
match rx.recv_timeout(timeout) {
Ok(Ok(state)) => JoinWithTimeoutOutcome::Joined(state),
Ok(Err(payload)) => JoinWithTimeoutOutcome::Panicked(payload),
Err(mpsc::RecvTimeoutError::Timeout) => JoinWithTimeoutOutcome::TimedOut,
Err(mpsc::RecvTimeoutError::Disconnected) => JoinWithTimeoutOutcome::HelperDisconnected,
}
}
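/// Dropping the device stops the worker (best effort, bounded by
/// DROP_JOIN_TIMEOUT) and releases any outstanding throttle-stall count so
/// the shared counters stay balanced.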
impl Drop for VirtioBlk {
fn drop(&mut self) {
let capacity_sectors = self.capacity_sectors;
let instance_id = self.instance_id;
match &mut self.worker.engine {
#[cfg(test)]
WorkerEngine::Inline(engine) => {
let _ = (capacity_sectors, instance_id);
if engine.state.currently_stalled {
engine.state.currently_stalled = false;
engine.state.counters.record_throttle_pending_dec();
}
}
#[cfg(not(test))]
WorkerEngine::Spawned(eng) => {
let stop_fd = eng.stop_fd.as_raw_fd();
self.paused.store(false, Ordering::Release);
if let Some(ref handle) = eng.handle {
handle.thread().unpark();
}
signal_worker_stop(&eng.stop_fd, stop_fd, instance_id, capacity_sectors);
if let Some(handle) = eng.handle.take() {
match join_worker_with_timeout(handle, DROP_JOIN_TIMEOUT) {
JoinWithTimeoutOutcome::Joined(state) => {
if state.currently_stalled {
state.counters.record_throttle_pending_dec();
}
}
JoinWithTimeoutOutcome::Panicked(payload) => {
tracing::error!(
panic = panic_payload_str(&*payload),
stop_fd,
capacity_sectors,
instance_id,
"virtio-blk worker thread panicked"
);
}
JoinWithTimeoutOutcome::TimedOut => {
tracing::warn!(
timeout_s = DROP_JOIN_TIMEOUT.as_secs_f32(),
stop_fd,
capacity_sectors,
instance_id,
"virtio-blk worker did not exit within \
DROP_JOIN_TIMEOUT of stop_fd; leaking \
the worker thread to avoid blocking the \
calling thread (likely a vCPU). Worker \
is wedged in a blocking syscall that \
does not check stop_fd. \
hint: identify the wedged device by \
stop_fd / instance_id / capacity_sectors \
above; per-device GuestMemoryMmap and \
EventFd Arcs stay live until the worker \
unblocks (see Drop's resource-retention \
doc). hint: kill -USR1 the host process \
to dump worker thread backtraces, OR \
check `dmesg` for the backing fd's \
storage path stalling on I/O."
);
}
JoinWithTimeoutOutcome::HelperSpawnFailed => {
tracing::error!(
stop_fd,
capacity_sectors,
instance_id,
"virtio-blk drop helper thread spawn \
failed; detaching worker without join"
);
}
JoinWithTimeoutOutcome::HelperDisconnected => {
tracing::error!(
stop_fd,
capacity_sectors,
instance_id,
"virtio-blk drop helper thread \
terminated without forwarding the \
worker join result"
);
}
}
}
}
}
}
}