use std::sync::atomic::{AtomicBool, AtomicU16, AtomicU32, Ordering};
use std::sync::{Arc, RwLock};
use crate::device::VirtioMmioState;
use crate::irq::Irq;
/// One block request, parsed from a single descriptor chain by
/// `BlkWorkerHandle::dispatch` and sent to the I/O worker over a channel.
pub struct BlkWorkItem {
/// Head descriptor index read from the avail ring; passed back when the used
/// ring entry for this request is written.
pub head_idx: u16,
/// Operation decoded from the request header.
pub request_type: BlkRequestType,
/// Starting sector, read little-endian from bytes 8..16 of the request header.
pub sector: u64,
/// Every descriptor after the header, in chain order, as
/// `(guest-physical address, length, device-writable flag)`.
pub buffers: Vec<(u64, u32, bool)>,
/// Guest-physical address the status byte is written to: the last byte of the
/// last non-empty device-writable descriptor (0 if the chain had none).
pub status_gpa: u64,
/// Sum of the lengths of all non-header descriptors longer than one byte.
pub total_data_len: u32,
/// Used ring address, copied from the queue configuration at dispatch time.
pub used_addr: u64,
/// Avail ring address, copied from the queue configuration at dispatch time.
pub avail_addr: u64,
/// Queue size, copied from the queue configuration at dispatch time.
pub queue_size: u16,
}
/// Operation carried by a block request.
///
/// `BlkWorkerHandle::dispatch` decodes the 32-bit little-endian type field of
/// the request header as 0 → `Read`, 1 → `Write`, 4 → `Flush`, 8 → `GetId`.
/// Any other value is currently also mapped to `Read`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlkRequestType {
/// Read from the backing file into the request's device-writable buffers.
Read,
/// Write the request's device-readable buffers to the backing file.
Write,
/// Wait for in-flight reads/writes to finish, then `fsync` the backing file.
Flush,
/// Copy the device ID string into the first device-writable data buffer.
GetId,
}
/// Bounds-checked accessor for one contiguous span of guest memory.
///
/// The span is `len` bytes of host memory starting at `ptr`; its first byte
/// corresponds to guest-physical address `gpa_base`. Every accessor takes a
/// guest-physical address and refuses any access that does not fit entirely
/// inside the span.
pub struct GuestMemWriter {
    ptr: *mut u8,
    len: usize,
    gpa_base: usize,
}

// NOTE(review): these impls assert that the raw pointer may be used from any
// thread. Nothing in this type synchronises accesses; soundness relies on the
// mapping outliving every user and on callers not creating conflicting
// references — confirm against the code that constructs the writer.
unsafe impl Send for GuestMemWriter {}
unsafe impl Sync for GuestMemWriter {}

impl GuestMemWriter {
    /// Wraps the host mapping `[ptr, ptr + len)` that backs guest-physical
    /// addresses starting at `gpa_base`.
    ///
    /// # Safety
    ///
    /// The accessors dereference `ptr` at any offset below `len`, so `ptr`
    /// must be valid for reads and writes of `len` bytes for as long as the
    /// writer is in use.
    pub unsafe fn new(ptr: *mut u8, len: usize, gpa_base: usize) -> Self {
        Self { ptr, len, gpa_base }
    }

    /// Translates `gpa` into an offset within the span.
    ///
    /// Returns `None` when `gpa` lies below `gpa_base`, when the end of the
    /// access overflows, or when `access_len` bytes starting at `gpa` do not
    /// fit inside the span.
    fn gpa_to_offset(&self, gpa: usize, access_len: usize) -> Option<usize> {
        let start = gpa.checked_sub(self.gpa_base)?;
        match start.checked_add(access_len) {
            Some(end) if end <= self.len => Some(start),
            _ => None,
        }
    }

    /// Returns a mutable view of `len` bytes at `gpa`, or `None` if the range
    /// is out of bounds.
    ///
    /// # Safety
    ///
    /// A mutable slice is produced from a shared receiver, so the caller must
    /// make sure no other reference to the same bytes is alive while the
    /// returned slice is used.
    #[allow(clippy::mut_from_ref)]
    pub unsafe fn slice_mut(&self, gpa: usize, len: usize) -> Option<&mut [u8]> {
        let start = self.gpa_to_offset(gpa, len)?;
        // SAFETY: `gpa_to_offset` verified that `start + len` stays within the
        // span; exclusivity is the caller's obligation.
        let bytes = unsafe { std::slice::from_raw_parts_mut(self.ptr.add(start), len) };
        Some(bytes)
    }

    /// Returns a read-only view of `len` bytes at `gpa`, or `None` if the
    /// range is out of bounds.
    pub fn slice(&self, gpa: usize, len: usize) -> Option<&[u8]> {
        let start = self.gpa_to_offset(gpa, len)?;
        // SAFETY: `gpa_to_offset` verified that `start + len` stays within the
        // span handed to `new`.
        let bytes = unsafe { std::slice::from_raw_parts(self.ptr.add(start), len) };
        Some(bytes)
    }

    /// Reads a little-endian `u16` at `gpa`; yields 0 when out of bounds.
    pub(crate) fn read_u16(&self, gpa: usize) -> u16 {
        match self.gpa_to_offset(gpa, 2) {
            Some(start) => {
                let mut raw = [0u8; 2];
                // SAFETY: two bytes at `start` are inside the span, and `raw`
                // is a local array that cannot overlap it.
                unsafe {
                    std::ptr::copy_nonoverlapping(self.ptr.add(start), raw.as_mut_ptr(), 2);
                }
                u16::from_le_bytes(raw)
            }
            None => 0,
        }
    }

    /// Stores `val` little-endian at `gpa`; does nothing when out of bounds.
    pub(crate) fn write_u16(&self, gpa: usize, val: u16) {
        self.store_bytes(gpa, &val.to_le_bytes());
    }

    /// Stores `val` little-endian at `gpa`; does nothing when out of bounds.
    pub(crate) fn write_u32(&self, gpa: usize, val: u32) {
        self.store_bytes(gpa, &val.to_le_bytes());
    }

    /// Stores a single byte at `gpa`; does nothing when out of bounds.
    fn write_byte(&self, gpa: usize, val: u8) {
        self.store_bytes(gpa, &[val]);
    }

    /// Copies `bytes` to `gpa` if the whole range is in bounds, otherwise
    /// leaves guest memory untouched.
    fn store_bytes(&self, gpa: usize, bytes: &[u8]) {
        if let Some(start) = self.gpa_to_offset(gpa, bytes.len()) {
            // SAFETY: the destination range was bounds-checked above; the
            // source is a caller-owned temporary outside the guest span.
            unsafe {
                std::ptr::copy_nonoverlapping(bytes.as_ptr(), self.ptr.add(start), bytes.len());
            }
        }
    }
}
/// State owned by the block I/O worker thread: what it needs to execute
/// requests and to complete them back to the guest.
pub struct BlkWorkerContext {
/// Guest memory accessor used for data buffers, status bytes and ring updates.
pub guest_mem: GuestMemWriter,
/// File descriptor passed to `preadv`, `pwritev` and `fsync`.
pub raw_fd: i32,
/// Multiplier applied to a request's sector number to get the byte offset
/// within `raw_fd`.
pub blk_size: u32,
/// When set, write requests handled by `process_write` complete with status 1
/// without touching `raw_fd`.
pub read_only: bool,
/// Bytes copied into the guest buffer for `GetId` requests.
pub device_id: String,
/// Shared MMIO transport state; `trigger_interrupt(1)` is called on it just
/// before the IRQ callback is invoked.
pub mmio_state: Arc<RwLock<VirtioMmioState>>,
/// Invoked as `(irq, true)` after a batch when the guest should be notified.
/// The worker ignores the returned result.
pub irq_callback: Arc<dyn Fn(Irq, bool) -> crate::error::Result<()> + Send + Sync>,
/// Interrupt identifier handed to `irq_callback`.
pub irq: Irq,
/// Checked after each received work item; the worker loop exits once it is
/// `false`.
pub running: Arc<AtomicBool>,
/// Counter of read/write requests in progress; flush requests wait for it to
/// reach zero.
pub flush_barrier: Arc<FlushBarrier>,
}
// NOTE(review): asserts that the context may be moved to the worker thread.
// Whether every field (notably `VirtioMmioState` and `Irq`, defined elsewhere)
// is actually safe to send cannot be verified from this file — confirm.
unsafe impl Send for BlkWorkerContext {}
/// Body of the block I/O worker thread.
///
/// Blocks on `rx` for a work item, opportunistically drains further items that
/// are already queued (up to 32 per batch), processes the batch, and then
/// notifies the guest if `should_notify` says the used index movement warrants
/// it. The ring addresses of the first item in a batch are used for the whole
/// batch.
///
/// The loop ends when the channel is closed or when `ctx.running` is observed
/// as `false` after an item has been received (that item is dropped without
/// being processed).
pub fn blk_io_worker_loop(ctx: BlkWorkerContext, rx: std::sync::mpsc::Receiver<BlkWorkItem>) {
    /// Upper bound on the number of requests handled per batch.
    const MAX_BATCH: usize = 32;

    tracing::info!(
        "blk-io-worker started (fd={}, blk_size={})",
        ctx.raw_fd,
        ctx.blk_size
    );

    loop {
        let Ok(head) = rx.recv() else {
            break;
        };
        if !ctx.running.load(Ordering::Relaxed) {
            break;
        }

        let (used_addr, avail_addr, queue_size) =
            (head.used_addr, head.avail_addr, head.queue_size);
        // Snapshot the used index so the notification decision covers exactly
        // the entries published by this batch.
        let used_before = read_used_idx(&ctx.guest_mem, used_addr);

        let mut batch = Vec::with_capacity(MAX_BATCH);
        batch.push(head);
        // Non-blocking drain: stops at the first empty poll or at the cap.
        batch.extend(rx.try_iter().take(MAX_BATCH - 1));

        process_batch(&ctx, &mut batch);

        let used_after = read_used_idx(&ctx.guest_mem, used_addr);
        if should_notify(&ctx.guest_mem, avail_addr, queue_size, used_before, used_after) {
            trigger_irq(&ctx);
        }

        let handled = batch.len();
        if handled > 1 {
            tracing::trace!("blk-io-worker: batch of {} items", handled);
        }
    }

    tracing::info!("blk-io-worker exiting");
}
/// Executes one batch of requests.
///
/// The batch is walked as alternating segments:
///
/// * a run of consecutive `Read`/`Write` requests is sorted (reads first, then
///   writes, each by ascending sector) and neighbouring requests of the same
///   type that cover back-to-back sectors are issued as one vectored I/O via
///   `process_merged`;
/// * the `Flush`/`GetId` requests that follow are executed one by one in their
///   original order, so data requests are never reordered across them.
///
/// Two requests are only considered back-to-back when the first one's data
/// length is an exact multiple of `ctx.blk_size`; otherwise merging would
/// place the second request's data at the wrong file offset. A zero block
/// size or an overflowing sector simply disables merging for that request
/// instead of panicking.
fn process_batch(ctx: &BlkWorkerContext, batch: &mut [BlkWorkItem]) {
    /// True for the request types that move data and may be merged.
    fn is_data_io(item: &BlkWorkItem) -> bool {
        matches!(
            item.request_type,
            BlkRequestType::Read | BlkRequestType::Write
        )
    }

    /// Sort rank within a data segment: reads before writes.
    fn sort_rank(kind: BlkRequestType) -> u8 {
        match kind {
            BlkRequestType::Read => 0,
            BlkRequestType::Write => 1,
            BlkRequestType::Flush | BlkRequestType::GetId => 2,
        }
    }

    /// First sector after `item`'s data, or `None` when that cannot be
    /// expressed exactly (zero block size, partial block, overflow).
    fn sector_after(item: &BlkWorkItem, blk_size: u64) -> Option<u64> {
        let len = u64::from(item.total_data_len);
        if blk_size == 0 || len % blk_size != 0 {
            return None;
        }
        item.sector.checked_add(len / blk_size)
    }

    let blk_size = u64::from(ctx.blk_size);
    let mut start = 0;
    while start < batch.len() {
        // Extent of the run of data requests beginning at `start`.
        let seg_end = start
            + batch[start..]
                .iter()
                .take_while(|item| is_data_io(item))
                .count();

        if seg_end - start > 1 {
            batch[start..seg_end]
                .sort_unstable_by_key(|item| (sort_rank(item.request_type), item.sector));
        }

        let mut i = start;
        while i < seg_end {
            // Grow the run while each request starts exactly where the
            // previous one ends.
            let mut end = i + 1;
            while end < seg_end
                && batch[end].request_type == batch[i].request_type
                && sector_after(&batch[end - 1], blk_size) == Some(batch[end].sector)
            {
                end += 1;
            }
            if end == i + 1 {
                process_item(ctx, &batch[i]);
            } else {
                process_merged(ctx, &batch[i..end]);
            }
            i = end;
        }

        // Non-data requests act as ordering points and run in place.
        start = seg_end;
        while start < batch.len() && !is_data_io(&batch[start]) {
            process_item(ctx, &batch[start]);
            start += 1;
        }
    }
}
/// Executes a single request and completes it to the guest.
///
/// Reads and writes are counted in the flush barrier's in-flight counter for
/// the duration of the I/O; a flush spins until that counter drops to zero
/// before calling `fsync`. Afterwards the status byte is stored at
/// `item.status_gpa` and a used-ring entry is written whose length is
/// `total_data_len + 1` on success (status 0) and 1 otherwise.
fn process_item(ctx: &BlkWorkerContext, item: &BlkWorkItem) {
    let in_flight = &ctx.flush_barrier.in_flight;
    let counts_as_io = match item.request_type {
        BlkRequestType::Read | BlkRequestType::Write => true,
        BlkRequestType::Flush | BlkRequestType::GetId => false,
    };

    if counts_as_io {
        in_flight.fetch_add(1, Ordering::Relaxed);
    }

    let status = match item.request_type {
        BlkRequestType::Read => process_read(ctx, item),
        BlkRequestType::Write => process_write(ctx, item),
        BlkRequestType::Flush => {
            // Wait for every outstanding read/write before syncing.
            while in_flight.load(Ordering::Acquire) != 0 {
                std::hint::spin_loop();
            }
            process_flush(ctx)
        }
        BlkRequestType::GetId => process_get_id(ctx, item),
    };

    if counts_as_io {
        in_flight.fetch_sub(1, Ordering::Release);
    }

    ctx.guest_mem.write_byte(item.status_gpa as usize, status);

    let total_bytes = match status {
        0 => item.total_data_len + 1,
        _ => 1,
    };
    write_used_entry(
        &ctx.guest_mem,
        item.used_addr,
        item.queue_size,
        item.head_idx,
        total_bytes,
    );
}
/// Services one read request with a single `preadv` into the guest's
/// device-writable buffers.
///
/// Buffers that are not device-writable, or that are at most one byte long
/// (the status byte), are skipped. Returns the request status: 0 on success,
/// 1 when a buffer lies outside guest memory, when the byte offset cannot be
/// represented, or when `preadv` fails. A request without data buffers
/// succeeds without issuing a syscall.
///
/// NOTE(review): a short read (fewer bytes than requested) is still reported
/// as success.
fn process_read(ctx: &BlkWorkerContext, item: &BlkWorkItem) -> u8 {
    let mut iovecs: Vec<libc::iovec> = Vec::with_capacity(item.buffers.len());
    for &(gpa, len, is_write) in &item.buffers {
        if !is_write || len <= 1 {
            continue;
        }
        // SAFETY: the slice is only turned into an iovec for the kernel to
        // fill; no other host reference to these guest bytes is created here.
        let buf = unsafe { ctx.guest_mem.slice_mut(gpa as usize, len as usize) };
        let Some(buf) = buf else {
            tracing::warn!("blk read: GPA {:#x} len {} out of bounds", gpa, len);
            return 1;
        };
        iovecs.push(libc::iovec {
            iov_base: buf.as_mut_ptr().cast(),
            iov_len: buf.len(),
        });
    }
    if iovecs.is_empty() {
        return 0;
    }

    // The sector comes straight from the guest: reject offsets that overflow
    // or do not fit in `off_t` instead of wrapping to an unrelated location.
    // `off_t::MAX` is non-negative, so widening it to u64 is lossless.
    #[allow(clippy::cast_sign_loss)]
    let byte_offset = item
        .sector
        .checked_mul(u64::from(ctx.blk_size))
        .filter(|&bytes| bytes <= libc::off_t::MAX as u64);
    let Some(byte_offset) = byte_offset else {
        tracing::warn!(
            "blk read: sector {} is beyond the addressable range",
            item.sector
        );
        return 1;
    };
    // Range-checked against `off_t::MAX` just above.
    #[allow(clippy::cast_possible_wrap, clippy::cast_possible_truncation)]
    let offset = byte_offset as libc::off_t;

    // The iovec count is bounded by the descriptor chain length.
    #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
    let iov_count = iovecs.len() as i32;
    // SAFETY: every iovec points at a bounds-checked range of guest memory
    // that stays mapped for the duration of the call.
    let n = unsafe { libc::preadv(ctx.raw_fd, iovecs.as_ptr(), iov_count, offset) };
    if n < 0 {
        tracing::warn!(
            "blk preadv failed at sector {}: {}",
            item.sector,
            std::io::Error::last_os_error()
        );
        return 1;
    }
    0
}
/// Services one write request with a single `pwritev` from the guest's
/// device-readable buffers.
///
/// Device-writable buffers are skipped. Returns the request status: 0 on
/// success, 1 when the device is read-only, when a buffer lies outside guest
/// memory, when the byte offset cannot be represented, or when `pwritev`
/// fails. A request without data buffers succeeds without issuing a syscall.
///
/// NOTE(review): a short write (fewer bytes than requested) is still reported
/// as success.
fn process_write(ctx: &BlkWorkerContext, item: &BlkWorkItem) -> u8 {
    if ctx.read_only {
        return 1;
    }

    let mut iovecs: Vec<libc::iovec> = Vec::with_capacity(item.buffers.len());
    for &(gpa, len, is_write) in &item.buffers {
        if is_write {
            continue;
        }
        let Some(buf) = ctx.guest_mem.slice(gpa as usize, len as usize) else {
            tracing::warn!("blk write: GPA {:#x} len {} out of bounds", gpa, len);
            return 1;
        };
        iovecs.push(libc::iovec {
            // `iovec` has no const variant; `pwritev` only reads the buffer.
            iov_base: buf.as_ptr().cast_mut().cast(),
            iov_len: buf.len(),
        });
    }
    if iovecs.is_empty() {
        return 0;
    }

    // The sector comes straight from the guest: reject offsets that overflow
    // or do not fit in `off_t` rather than wrapping and overwriting an
    // unrelated part of the disk. `off_t::MAX` is non-negative, so widening
    // it to u64 is lossless.
    #[allow(clippy::cast_sign_loss)]
    let byte_offset = item
        .sector
        .checked_mul(u64::from(ctx.blk_size))
        .filter(|&bytes| bytes <= libc::off_t::MAX as u64);
    let Some(byte_offset) = byte_offset else {
        tracing::warn!(
            "blk write: sector {} is beyond the addressable range",
            item.sector
        );
        return 1;
    };
    // Range-checked against `off_t::MAX` just above.
    #[allow(clippy::cast_possible_wrap, clippy::cast_possible_truncation)]
    let offset = byte_offset as libc::off_t;

    // The iovec count is bounded by the descriptor chain length.
    #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
    let iov_count = iovecs.len() as i32;
    // SAFETY: every iovec points at a bounds-checked range of guest memory
    // that stays mapped for the duration of the call.
    let n = unsafe { libc::pwritev(ctx.raw_fd, iovecs.as_ptr(), iov_count, offset) };
    if n < 0 {
        tracing::warn!(
            "blk pwritev failed at sector {}: {}",
            item.sector,
            std::io::Error::last_os_error()
        );
        return 1;
    }
    0
}
fn process_merged(ctx: &BlkWorkerContext, items: &[BlkWorkItem]) {
let is_read = items[0].request_type == BlkRequestType::Read;
let start_sector = items[0].sector;
ctx.flush_barrier
.in_flight
.fetch_add(items.len() as u32, Ordering::Relaxed);
let mut iovecs: Vec<libc::iovec> = Vec::new();
for item in items {
for &(gpa, len, is_write_flag) in &item.buffers {
let want = if is_read {
is_write_flag
} else {
!is_write_flag
};
if !want || len <= 1 {
continue;
}
if is_read {
if let Some(buf) = unsafe { ctx.guest_mem.slice_mut(gpa as usize, len as usize) } {
iovecs.push(libc::iovec {
iov_base: buf.as_mut_ptr().cast(),
iov_len: buf.len(),
});
}
} else if let Some(buf) = ctx.guest_mem.slice(gpa as usize, len as usize) {
iovecs.push(libc::iovec {
iov_base: buf.as_ptr().cast_mut().cast(),
iov_len: buf.len(),
});
}
}
}
#[allow(clippy::cast_possible_wrap)]
let offset = (start_sector * u64::from(ctx.blk_size)) as libc::off_t;
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
let n = if is_read {
unsafe { libc::preadv(ctx.raw_fd, iovecs.as_ptr(), iovecs.len() as i32, offset) }
} else {
unsafe { libc::pwritev(ctx.raw_fd, iovecs.as_ptr(), iovecs.len() as i32, offset) }
};
let status: u8 = if n < 0 {
tracing::warn!(
"blk merged {} failed at sector {}: {}",
if is_read { "preadv" } else { "pwritev" },
start_sector,
std::io::Error::last_os_error()
);
1
} else {
0
};
for item in items {
ctx.guest_mem.write_byte(item.status_gpa as usize, status);
let total_bytes = if status == 0 {
item.total_data_len + 1
} else {
1
};
write_used_entry(
&ctx.guest_mem,
item.used_addr,
item.queue_size,
item.head_idx,
total_bytes,
);
}
ctx.flush_barrier
.in_flight
.fetch_sub(items.len() as u32, Ordering::Release);
tracing::trace!(
"blk merged {} items: sector {}..+{}, {} iovecs, {} bytes",
items.len(),
start_sector,
items.last().map_or(0, |i| i.sector
+ u64::from(i.total_data_len) / u64::from(ctx.blk_size))
- start_sector,
iovecs.len(),
n,
);
}
/// Services a flush request by calling `fsync` on the backing descriptor.
///
/// Returns the request status: 0 when `fsync` succeeds, 1 (after logging the
/// OS error) when it fails.
fn process_flush(ctx: &BlkWorkerContext) -> u8 {
    // SAFETY: `fsync` takes the descriptor by value and touches no memory
    // owned by this process.
    let rc = unsafe { libc::fsync(ctx.raw_fd) };
    if rc >= 0 {
        return 0;
    }
    tracing::warn!("blk fsync failed: {}", std::io::Error::last_os_error());
    1
}
fn process_get_id(ctx: &BlkWorkerContext, item: &BlkWorkItem) -> u8 {
let id_bytes = ctx.device_id.as_bytes();
for &(gpa, len, is_write) in &item.buffers {
if !is_write || len <= 1 {
continue;
}
let buf = unsafe { ctx.guest_mem.slice_mut(gpa as usize, len as usize) };
if let Some(buf) = buf {
let copy_len = id_bytes.len().min(buf.len());
buf[..copy_len].copy_from_slice(&id_bytes[..copy_len]);
}
break;
}
0
}
use crate::virtqueue_util::{read_used_idx, should_notify, write_used_entry};
/// Notifies the guest that used-ring entries are available.
///
/// Calls `trigger_interrupt(1)` on the MMIO state (skipped if the lock is
/// poisoned), then invokes the IRQ callback with `(ctx.irq, true)`. Errors
/// from the callback are ignored.
fn trigger_irq(ctx: &BlkWorkerContext) {
    match ctx.mmio_state.write() {
        Ok(mut state) => {
            state.trigger_interrupt(1);
        }
        Err(_) => {}
    }
    let _ = (ctx.irq_callback)(ctx.irq, true);
}
/// Dispatcher-side handle for one queue's worker.
pub struct BlkQueueWorker {
/// Channel on which parsed requests are sent to the worker.
pub tx: std::sync::mpsc::Sender<BlkWorkItem>,
/// Avail-ring index of the next entry `dispatch` will consume; stored back
/// after each dispatch pass.
pub last_avail_idx: AtomicU16,
}
/// Collection of per-queue worker handles, indexed by queue number.
pub struct BlkWorkerHandle {
/// Worker handles; element `i` serves queue index `i`.
pub queues: Vec<BlkQueueWorker>,
}
impl BlkWorkerHandle {
pub fn get_queue(&self, queue_idx: u16) -> Option<&BlkQueueWorker> {
self.queues.get(queue_idx as usize)
}
pub fn dispatch(
&self,
memory: &mut [u8],
qcfg: &arcbox_virtio::QueueConfig,
queue_idx: u16,
) -> bool {
let Some(worker) = self.get_queue(queue_idx) else {
return false;
};
if !qcfg.ready || qcfg.size == 0 {
return false;
}
let gpa_base = qcfg.gpa_base as usize;
let Some(desc_addr) = (qcfg.desc_addr as usize).checked_sub(gpa_base) else {
tracing::warn!(
"blk dispatch: desc GPA {:#x} below ram base {:#x}",
qcfg.desc_addr,
gpa_base
);
return false;
};
let Some(avail_addr) = (qcfg.avail_addr as usize).checked_sub(gpa_base) else {
tracing::warn!(
"blk dispatch: avail GPA {:#x} below ram base {:#x}",
qcfg.avail_addr,
gpa_base
);
return false;
};
let Some(used_addr) = (qcfg.used_addr as usize).checked_sub(gpa_base) else {
tracing::warn!(
"blk dispatch: used GPA {:#x} below ram base {:#x}",
qcfg.used_addr,
gpa_base
);
return false;
};
let q_size = qcfg.size as usize;
if avail_addr + 4 > memory.len() {
return false;
}
let avail_idx = u16::from_le_bytes([memory[avail_addr + 2], memory[avail_addr + 3]]);
let last_avail = worker.last_avail_idx.load(Ordering::Relaxed);
let mut current = last_avail;
let mut dispatched = false;
while current != avail_idx {
let ring_off = avail_addr + 4 + 2 * ((current as usize) % q_size);
if ring_off + 2 > memory.len() {
break;
}
let head_idx = u16::from_le_bytes([memory[ring_off], memory[ring_off + 1]]);
let mut buffers = Vec::new();
let mut status_gpa: u64 = 0;
let mut total_data_len: u32 = 0;
let mut request_type = BlkRequestType::Read;
let mut sector: u64 = 0;
let mut first_desc = true;
let mut idx = head_idx as usize;
loop {
let d_off = desc_addr + idx * 16;
if d_off + 16 > memory.len() {
break;
}
let addr = u64::from_le_bytes(memory[d_off..d_off + 8].try_into().unwrap());
let len = u32::from_le_bytes(memory[d_off + 8..d_off + 12].try_into().unwrap());
let flags = u16::from_le_bytes(memory[d_off + 12..d_off + 14].try_into().unwrap());
let next = u16::from_le_bytes(memory[d_off + 14..d_off + 16].try_into().unwrap());
let is_write = flags & 2 != 0;
if first_desc {
first_desc = false;
if len >= 16 {
let Some(hdr_off) = (addr as usize).checked_sub(gpa_base) else {
break;
};
if hdr_off + 16 <= memory.len() {
let req_type = u32::from_le_bytes(
memory[hdr_off..hdr_off + 4].try_into().unwrap(),
);
sector = u64::from_le_bytes(
memory[hdr_off + 8..hdr_off + 16].try_into().unwrap(),
);
request_type = match req_type {
0 => BlkRequestType::Read,
1 => BlkRequestType::Write,
4 => BlkRequestType::Flush,
8 => BlkRequestType::GetId,
_ => BlkRequestType::Read,
};
}
}
} else {
buffers.push((addr, len, is_write));
if is_write && len > 0 {
status_gpa = addr + u64::from(len) - 1;
}
if len > 1 {
total_data_len += len;
}
}
if flags & 1 == 0 {
break; }
idx = next as usize;
if idx >= q_size {
break;
}
}
let item = BlkWorkItem {
head_idx,
request_type,
sector,
buffers,
status_gpa,
total_data_len,
used_addr: qcfg.used_addr,
avail_addr: qcfg.avail_addr,
queue_size: qcfg.size,
};
if worker.tx.send(item).is_err() {
tracing::warn!("blk worker channel closed, falling back to sync");
break;
}
dispatched = true;
current = current.wrapping_add(1);
}
worker.last_avail_idx.store(current, Ordering::Relaxed);
if dispatched {
let avail_event_off = used_addr + 4 + q_size * 8;
if avail_event_off + 2 <= memory.len() {
memory[avail_event_off..avail_event_off + 2]
.copy_from_slice(¤t.to_le_bytes());
}
}
dispatched
}
}
/// Shared counter that lets flush requests wait for outstanding data I/O.
///
/// Workers increment `in_flight` before issuing a read or write and decrement
/// it once the request has completed; a flush spins until the counter reads
/// zero before syncing the backing file.
#[derive(Debug, Default)]
pub struct FlushBarrier {
    /// Number of read/write requests currently being executed.
    pub in_flight: AtomicU32,
}

impl FlushBarrier {
    /// Creates a barrier with no requests in flight.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }
}