use crate::properties;
use crate::properties::{properties, PropertiesList, Property};
use crate::wait::{wait_for_completion_fd, TimeoutUpdater};
use crate::{
Blkioq, Completion, CompletionBacklog, Driver, Error, MemoryRegion, Queue, ReqFlags, Request,
RequestBacklog, RequestTypeArgs, Result, State,
};
use const_cstr::const_cstr;
use io_uring::opcode::{Fallocate64, Fsync, Read, Readv, Write, Writev};
use io_uring::types::{Fd, Fixed, FsyncFlags, SubmitArgs, Timespec};
use libc::{
c_int, c_void, dev_t, iovec, sigset_t, ENOTSUP, FALLOC_FL_KEEP_SIZE, FALLOC_FL_PUNCH_HOLE,
FALLOC_FL_ZERO_RANGE, O_DIRECT, O_RDWR, O_WRONLY, RWF_DSYNC, RWF_HIPRI,
};
use nix::errno::Errno;
use nix::fcntl::{fcntl, FcntlArg};
use nix::sys::eventfd::{eventfd, EfdFlags};
use nix::sys::stat::{major, minor};
use nix::sys::statfs::fstatfs;
use nix::unistd::{close, lseek64, sysconf, SysconfVar, Whence};
use std::cmp;
use std::convert::{TryFrom, TryInto};
use std::fs::{self, File, OpenOptions};
use std::io::{self, ErrorKind};
use std::iter;
use std::num::ParseIntError;
use std::os::linux::fs::MetadataExt;
use std::os::unix::fs::{FileTypeExt, OpenOptionsExt};
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use std::path::{Path, PathBuf};
use std::ptr;
use std::ptr::null_mut;
use std::str::FromStr;
use std::time::Duration;
/// Initial value of the `num_entries` property set by `IoUring::new`; `Driver::start` passes
/// that property to `IoUringQueue::new` as the ring size.
const NUM_ENTRIES_DEFAULT: i32 = 128;
/// Flag passed to the raw ring `enter()` call in `do_io` when completions are being waited for
/// or the queue is a poll queue.
const IORING_ENTER_GETEVENTS: u32 = 1;
/// Reads the file at `path` and parses its whitespace-trimmed contents as an integer of type `T`.
///
/// Bytes that are not valid UTF-8 are replaced lossily before parsing.
///
/// # Errors
///
/// Returns the underlying I/O error if the file cannot be read, or an error of kind
/// `InvalidData` wrapping the parse error if the contents are not a valid `T`.
fn sysfs_attr_read<P, T>(path: P) -> io::Result<T>
where
    P: AsRef<Path>,
    T: FromStr<Err = ParseIntError>,
{
    let raw = fs::read(path)?;
    let text = String::from_utf8_lossy(&raw);
    match text.trim().parse::<T>() {
        Ok(value) => Ok(value),
        Err(parse_err) => Err(io::Error::new(io::ErrorKind::InvalidData, parse_err)),
    }
}
/// Limits of a Linux block device as read from its sysfs attributes by
/// `LinuxBlockLimits::from_device_number`.
#[derive(Copy, Clone, Debug)]
struct LinuxBlockLimits {
/// Value of the `logical_block_size` queue attribute.
logical_block_size: u32,
/// Value of the `physical_block_size` queue attribute.
physical_block_size: u32,
/// Value of the `optimal_io_size` queue attribute.
optimal_io_size: u32,
/// Value of the `write_zeroes_max_bytes` queue attribute.
write_zeroes_max_bytes: u64,
/// Value of the `discard_granularity` queue attribute.
discard_alignment: u32,
/// Value of the device's `discard_alignment` attribute, normalized to 0 when it equals
/// `discard_alignment`.
discard_alignment_offset: u32,
/// True when the `fua` queue attribute is non-zero.
supports_fua_natively: bool,
}
impl LinuxBlockLimits {
    /// Reads the limits of the block device identified by `device_number` from
    /// `/sys/dev/block/<major>:<minor>`.
    ///
    /// When the device directory contains a `partition` entry, the queue attributes are read
    /// from the `../queue` directory instead of `queue`.
    ///
    /// # Errors
    ///
    /// Returns the first I/O or parse error hit while probing for `partition` or while reading
    /// any of the attributes.
    fn from_device_number(device_number: dev_t) -> io::Result<LinuxBlockLimits> {
        let sysfs_dir = PathBuf::from(format!(
            "/sys/dev/block/{}:{}",
            major(device_number),
            minor(device_number)
        ));

        // A missing `partition` entry simply means "not a partition"; any other failure is a
        // genuine error.
        let queue_dir = match sysfs_dir.join("partition").metadata() {
            Ok(_) => sysfs_dir.join("../queue"),
            Err(err) if err.kind() == ErrorKind::NotFound => sysfs_dir.join("queue"),
            Err(err) => return Err(err),
        };

        let read_queue_u32 =
            |attr: &str| -> io::Result<u32> { sysfs_attr_read(queue_dir.join(attr)) };
        let read_queue_u64 =
            |attr: &str| -> io::Result<u64> { sysfs_attr_read(queue_dir.join(attr)) };

        let discard_alignment = read_queue_u32("discard_granularity")?;

        // An offset equal to the granularity is reported as 0.
        let raw_offset: u32 = sysfs_attr_read(sysfs_dir.join("discard_alignment"))?;
        let discard_alignment_offset = if raw_offset == discard_alignment {
            0
        } else {
            raw_offset
        };

        let supports_fua_natively = read_queue_u32("fua")? != 0;
        let logical_block_size = read_queue_u32("logical_block_size")?;
        let physical_block_size = read_queue_u32("physical_block_size")?;
        let optimal_io_size = read_queue_u32("optimal_io_size")?;
        let write_zeroes_max_bytes = read_queue_u64("write_zeroes_max_bytes")?;

        Ok(LinuxBlockLimits {
            logical_block_size,
            physical_block_size,
            optimal_io_size,
            write_zeroes_max_bytes,
            discard_alignment,
            discard_alignment_offset,
            supports_fua_natively,
        })
    }
}
/// Properties of the opened target (block device or regular file), computed once by
/// `TargetInfo::from_file`.
#[derive(Copy, Clone, Debug)]
struct TargetInfo {
/// True when the file is a block device, false for any other file type.
is_block_device: bool,
/// True when `O_DIRECT` is set in the file status flags.
direct: bool,
/// True when neither `O_WRONLY` nor `O_RDWR` is set in the file status flags.
read_only: bool,
/// 1 unless `direct` is set; with `direct`, the logical block size of the (backing) block
/// device, or the page size when a regular file's backing device limits are unavailable.
request_alignment: i32,
/// Physical block size for block devices; for other files, the larger of the file system
/// block size and `request_alignment`.
optimal_io_alignment: i32,
/// The block device's optimal I/O size; 0 for other files.
optimal_io_size: i32,
/// For block devices, whether `write_zeroes_max_bytes` is non-zero; always true otherwise.
supports_write_zeroes_without_fallback: bool,
/// The block device's discard granularity; `request_alignment` for other files.
discard_alignment: i32,
/// The block device's normalized discard alignment offset; 0 for other files.
discard_alignment_offset: i32,
/// Whether the (backing) block device reports FUA support; false when that is unknown.
supports_fua_natively: bool,
}
impl TargetInfo {
    /// Inspects `file` (metadata, status flags and, where available, the block-layer limits of
    /// the device behind it) and derives the target properties.
    ///
    /// For a block device the limits of the device itself (`st_rdev`) are mandatory. For any
    /// other file the limits of the device holding it (`st_dev`) are looked up on a
    /// best-effort basis and defaults are used when that lookup fails.
    ///
    /// # Errors
    ///
    /// Propagates failures from `metadata()`, `fcntl(F_GETFL)`, the mandatory block-limit
    /// lookup, `sysconf(PAGE_SIZE)` and `fstatfs()`.
    fn from_file(file: &File) -> io::Result<TargetInfo> {
        let metadata = file.metadata()?;
        let status_flags = fcntl(file.as_raw_fd(), FcntlArg::F_GETFL)?;
        let direct = (status_flags & O_DIRECT) != 0;
        let read_only = (status_flags & (O_WRONLY | O_RDWR)) == 0;

        if metadata.file_type().is_block_device() {
            let limits = LinuxBlockLimits::from_device_number(metadata.st_rdev())?;
            return Ok(TargetInfo {
                is_block_device: true,
                direct,
                read_only,
                request_alignment: if direct {
                    limits.logical_block_size as i32
                } else {
                    1
                },
                optimal_io_alignment: limits.physical_block_size as i32,
                optimal_io_size: limits.optimal_io_size as i32,
                supports_write_zeroes_without_fallback: limits.write_zeroes_max_bytes > 0,
                discard_alignment: limits.discard_alignment as i32,
                discard_alignment_offset: limits.discard_alignment_offset as i32,
                supports_fua_natively: limits.supports_fua_natively,
            });
        }

        // Not a block device: the limits of the backing device are optional.
        let backing_limits = LinuxBlockLimits::from_device_number(metadata.st_dev()).ok();

        let request_alignment = match (direct, backing_limits) {
            (false, _) => 1,
            (true, Some(limits)) => limits.logical_block_size as i32,
            // Without device limits, fall back to the page size for O_DIRECT alignment.
            (true, None) => sysconf(SysconfVar::PAGE_SIZE)?.unwrap() as i32,
        };
        let supports_fua_natively =
            backing_limits.map_or(false, |limits| limits.supports_fua_natively);
        let fs_block_size = fstatfs(file)?.block_size() as i32;

        Ok(TargetInfo {
            is_block_device: false,
            direct,
            read_only,
            request_alignment,
            optimal_io_alignment: cmp::max(fs_block_size, request_alignment),
            optimal_io_size: 0,
            supports_write_zeroes_without_fallback: true,
            discard_alignment: request_alignment,
            discard_alignment_offset: 0,
            supports_fua_natively,
        })
    }
}
/// Bookkeeping kept for every request handed to the ring.
#[derive(Clone, Copy)]
struct ReqContext {
    /// Result value that counts as success for the request.
    expected_ret: usize,
    /// Caller-supplied value reported back with the completion.
    user_data: usize,
}

/// Slot table for in-flight requests; a slot index doubles as the request ID.
struct Requests {
    /// Every slot ever allocated, including slots that are currently free.
    all_req_slots: Vec<ReqContext>,
    /// Indices into `all_req_slots` that can be reused by `insert`.
    free_req_slots: Vec<usize>,
}

impl Requests {
    /// Creates an empty table with room for `capacity` slots preallocated.
    fn new(capacity: usize) -> Self {
        let all_req_slots = Vec::with_capacity(capacity);
        let free_req_slots = Vec::with_capacity(capacity);
        Self {
            all_req_slots,
            free_req_slots,
        }
    }

    /// Number of slots currently in use.
    fn len(&self) -> usize {
        let allocated = self.all_req_slots.len();
        let recycled = self.free_req_slots.len();
        allocated - recycled
    }

    /// Stores `request` and returns its ID, reusing the most recently freed slot if any.
    fn insert(&mut self, request: ReqContext) -> u64 {
        let slot = match self.free_req_slots.pop() {
            Some(recycled) => {
                self.all_req_slots[recycled] = request;
                recycled
            }
            None => {
                self.all_req_slots.push(request);
                self.all_req_slots.len() - 1
            }
        };
        slot as u64
    }

    /// Marks the slot `req_id` as free and returns the context stored in it.
    ///
    /// # Panics
    ///
    /// Panics if `req_id` does not fit into a `usize` or is not an allocated slot.
    fn remove(&mut self, req_id: u64) -> ReqContext {
        let slot = usize::try_from(req_id).expect("Request ID must fit into a usize");
        self.free_req_slots.push(slot);
        self.all_req_slots
            .get(slot)
            .copied()
            .expect("All in-flight requests are tracked")
    }
}
/// A single request queue backed by its own io_uring instance.
struct IoUringQueue {
/// Copy of the target properties passed to `IoUringQueue::new`.
target_info: TargetInfo,
/// The io_uring instance used for submission and completion.
ring: io_uring::IoUring,
/// Scratch iovec slots (one per requested ring entry) used by `try_readv_for_read` and
/// `try_writev_for_write`.
iovecs: Box<[iovec]>,
/// Number of `iovecs` slots handed out so far; reset to 0 by `do_io` after entering the ring.
iovecs_used: usize,
/// Whether the probe reported support for the `Read` opcode (false if probing failed).
supports_read: bool,
/// Whether the probe reported support for the `Write` opcode (false if probing failed).
supports_write: bool,
/// Whether the probe reported support for the `Fallocate64` opcode (false if probing failed).
supports_fallocate: bool,
/// Eventfd registered with the ring; `None` for queues created with `poll == true`.
/// Closed when the queue is dropped.
eventfd: Option<RawFd>,
/// Table of requests handed to the ring whose completions have not been reaped yet.
requests: Requests,
}
impl IoUringQueue {
pub fn new(poll: bool, num_entries: u32, fd: RawFd, target_info: &TargetInfo) -> Result<Self> {
let mut builder = io_uring::IoUring::builder();
if poll {
builder.setup_iopoll();
}
let ring = builder
.build(num_entries)
.map_err(|e| Error::from_io_error(e, Errno::ENOMEM))?;
let mut supports_read = false;
let mut supports_write = false;
let mut supports_fallocate = false;
let mut probe = io_uring::Probe::new();
if ring.submitter().register_probe(&mut probe).is_ok() {
supports_read = probe.is_supported(Read::CODE);
supports_write = probe.is_supported(Write::CODE);
supports_fallocate = probe.is_supported(Fallocate64::CODE);
}
ring.submitter()
.register_files(&[fd])
.map_err(|e| Error::from_io_error(e, Errno::ENOTSUP))?;
let eventfd = if poll {
None
} else {
Some(eventfd(0, EfdFlags::EFD_CLOEXEC | EfdFlags::EFD_NONBLOCK)?)
};
let zero_iovec = iovec {
iov_base: null_mut(),
iov_len: 0,
};
let requests_capacity = ring.params().sq_entries() + ring.params().cq_entries();
let queue = IoUringQueue {
target_info: *target_info,
ring,
iovecs: vec![zero_iovec; num_entries.try_into().unwrap()].into_boxed_slice(),
iovecs_used: 0,
supports_read,
supports_write,
supports_fallocate,
eventfd,
requests: Requests::new(requests_capacity as usize),
};
if let Some(eventfd) = eventfd {
queue
.ring
.submitter()
.register_eventfd(eventfd)
.map_err(|e| Error::from_io_error(e, Errno::ENOTSUP))?;
}
Ok(queue)
}
fn try_readv_for_read(
&mut self,
completion_backlog: &mut CompletionBacklog,
start: u64,
buf: *mut u8,
len: usize,
user_data: usize,
flags: ReqFlags,
) -> bool {
if self.iovecs_used == self.iovecs.len() {
return false;
}
self.iovecs[self.iovecs_used] = iovec {
iov_base: buf as *mut c_void,
iov_len: len,
};
let readv_req = Request {
args: RequestTypeArgs::Readv {
start,
iovec: &self.iovecs[self.iovecs_used] as *const iovec,
iovcnt: 1,
},
user_data,
flags,
};
if !self.try_enqueue(completion_backlog, &readv_req) {
return false;
}
self.iovecs_used += 1;
true
}
fn try_writev_for_write(
&mut self,
completion_backlog: &mut CompletionBacklog,
start: u64,
buf: *const u8,
len: usize,
user_data: usize,
flags: ReqFlags,
) -> bool {
if self.iovecs_used == self.iovecs.len() {
return false;
}
self.iovecs[self.iovecs_used] = iovec {
iov_base: buf as *mut c_void,
iov_len: len,
};
let writev_req = Request {
args: RequestTypeArgs::Writev {
start,
iovec: &self.iovecs[self.iovecs_used] as *const iovec,
iovcnt: 1,
},
user_data,
flags,
};
if !self.try_enqueue(completion_backlog, &writev_req) {
return false;
}
self.iovecs_used += 1;
true
}
fn enter_with_ext_arg_timeout(
&mut self,
min_complete_hint: usize,
timeout: Duration,
sig: Option<&sigset_t>,
) -> Result<usize> {
let ts = Timespec::new()
.sec(timeout.as_secs())
.nsec(timeout.subsec_nanos());
let mut submit_args = SubmitArgs::new().timespec(&ts);
if let Some(s) = sig {
submit_args = submit_args.sigmask(s);
}
self.ring
.submitter()
.submit_with_args(min_complete_hint, &submit_args)
.map_err(|e| Error::from_io_error(e, Errno::EINVAL))
}
fn enter_with_ppoll_timeout(
&mut self,
min_complete_hint: usize,
timeout: Duration,
sig: Option<&sigset_t>,
eventfd: RawFd,
) -> Result<usize> {
let n = self
.ring
.submit()
.map_err(|e| Error::from_io_error(e, Errno::EINVAL))?;
if min_complete_hint > 0 {
wait_for_completion_fd(eventfd, Some(timeout), sig)?;
}
Ok(n)
}
fn enter_with_timeout(
&mut self,
min_complete_hint: usize,
timeout: Duration,
sig: Option<&sigset_t>,
) -> Result<usize> {
if self.ring.params().is_feature_ext_arg() {
self.enter_with_ext_arg_timeout(min_complete_hint, timeout, sig)
} else if let Some(eventfd) = self.eventfd {
self.enter_with_ppoll_timeout(min_complete_hint, timeout, sig, eventfd)
} else {
Err(Error::new(
Errno::ENOTSUP,
"driver \"io_uring\" only supports calling blkioq_do_io() on a poll queue with a timeout since mainline Linux kernel 5.11",
))
}
}
}
impl Drop for IoUringQueue {
    /// Closes the completion eventfd, if this queue owns one.
    fn drop(&mut self) {
        if let Some(completion_fd) = self.eventfd {
            // Nothing useful can be done about a failed close while dropping.
            let _ = close(completion_fd);
        }
    }
}
fn supports_iopoll(file: &File, target_info: &TargetInfo) -> Result<bool> {
if !target_info.direct {
return Ok(false);
};
let mut ring = io_uring::IoUring::builder()
.setup_iopoll()
.build(1)
.map_err(|e| Error::from_io_error(e, Errno::ENOMEM))?;
let iovec = iovec {
iov_base: ptr::null_mut(),
iov_len: 0,
};
let entry = Readv::new(Fd(file.as_raw_fd()), &iovec, 1).build();
unsafe { ring.submission().push(&entry).unwrap() };
ring.submit_and_wait(1)
.map_err(|e| Error::from_io_error(e, Errno::EINVAL))?;
let cqe = ring
.completion()
.next()
.ok_or_else(|| Error::new(Errno::EINVAL, "Failed to check poll queue support"))?;
if cqe.result() == 0 {
Ok(true)
} else if cqe.result() == -ENOTSUP {
Ok(false)
} else {
Err(Error::new(
Errno::EINVAL,
"Failed to check poll queue support",
))
}
}
/// Returns the sum of `iov_len` over the `iovcnt` entries starting at `iovec`.
///
/// Returns 0 when `iovcnt` is 0 or `iovec` is null. `std::slice::from_raw_parts` requires a
/// non-null pointer even for an empty slice, so those cases are handled without building a
/// slice. A null vector with a non-zero count is left for the kernel to reject.
///
/// For any other input the caller must pass a pointer to `iovcnt` readable, initialized
/// `iovec` entries.
fn iovec_total_bytes(iovec: *const libc::iovec, iovcnt: u32) -> usize {
    if iovec.is_null() || iovcnt == 0 {
        return 0;
    }
    // SAFETY: `iovec` is non-null (checked above); the caller guarantees it points to `iovcnt`
    // valid entries.
    let io_vectors = unsafe { std::slice::from_raw_parts(iovec, iovcnt as usize) };
    io_vectors.iter().map(|iov| iov.iov_len).sum()
}
// `Queue` implementation: `try_enqueue` turns requests into submission queue entries and `do_io`
// enters the ring and collects completions.
impl Queue for IoUringQueue {
/// Returns true when the queue has no completion eventfd. `IoUringQueue::new` only omits the
/// eventfd for queues created with `poll == true`.
fn is_poll_queue(&self) -> bool {
self.eventfd.is_none()
}
/// Returns the completion eventfd, or `None` for a poll queue.
fn get_completion_fd(&self) -> Option<RawFd> {
self.eventfd
}
/// No-op: the argument is ignored and nothing is changed.
fn set_completion_fd_enabled(&mut self, _enabled: bool) {
}
/// Translates `req` into a submission queue entry and pushes it onto the ring's submission
/// queue. Nothing is submitted to the kernel here; that happens in `do_io`.
///
/// Returns `false` when the request could not be queued right now: as many requests are
/// already tracked as the completion queue has entries, the submission queue push failed, or
/// (for reads/writes emulated through `Readv`/`Writev`) no scratch iovec slot was left.
/// Returns `true` when the entry was pushed, or when the request was rejected up front and a
/// failed completion for it was pushed to `completion_backlog` instead.
fn try_enqueue(&mut self, completion_backlog: &mut CompletionBacklog, req: &Request) -> bool {
// Never track more requests than the completion queue has entries.
// NOTE(review): presumably this prevents completion queue overflow; confirm.
if self.requests.len() == self.ring.params().cq_entries() as usize {
return false; }
// Read/write requests on a poll queue carry RWF_HIPRI.
let poll_rw_flags = if self.is_poll_queue() { RWF_HIPRI } else { 0 };
// Assigned by every arm that reaches the push below; used to untrack the request if the push
// fails.
let request_id;
let entry = match *req {
Request {
args: RequestTypeArgs::Read { start, buf, len },
user_data,
flags,
} => {
// The `Read` entry takes its length as a u32.
if len > u32::MAX as usize {
completion_backlog.push(Completion::for_failed_req(
req,
Errno::EINVAL,
const_cstr!("len must fit in an unsigned 32-bit integer"),
));
return true;
}
// Without the `Read` opcode, emulate the read with a single-element `Readv`.
if !self.supports_read {
return self.try_readv_for_read(
completion_backlog,
start,
buf,
len,
user_data,
flags,
);
}
// A read counts as successful only if it transfers exactly `len` bytes (see `do_io`).
request_id = self.requests.insert(ReqContext {
expected_ret: len,
user_data,
});
Read::new(Fixed(0), buf, len as u32)
.offset64(start as i64)
.rw_flags(poll_rw_flags)
.build()
.user_data(request_id)
}
Request {
args: RequestTypeArgs::Write { start, buf, len },
user_data,
flags,
} => {
// The `Write` entry takes its length as a u32.
if len > u32::MAX as usize {
completion_backlog.push(Completion::for_failed_req(
req,
Errno::EINVAL,
const_cstr!("len must fit in an unsigned 32-bit integer"),
));
return true;
}
// Without the `Write` opcode, emulate the write with a single-element `Writev`.
if !self.supports_write {
return self.try_writev_for_write(
completion_backlog,
start,
buf,
len,
user_data,
flags,
);
}
// A FUA request is issued as a write with RWF_DSYNC.
let rw_flags = if flags.contains(ReqFlags::FUA) {
RWF_DSYNC
} else {
0
};
request_id = self.requests.insert(ReqContext {
expected_ret: len,
user_data,
});
Write::new(Fixed(0), buf, len as u32)
.offset64(start as i64)
.rw_flags(rw_flags | poll_rw_flags)
.build()
.user_data(request_id)
}
Request {
args:
RequestTypeArgs::Readv {
start,
iovec,
iovcnt,
},
user_data,
..
} => {
// The expected result is the combined length of all iovec entries.
let len = iovec_total_bytes(iovec, iovcnt);
request_id = self.requests.insert(ReqContext {
expected_ret: len,
user_data,
});
Readv::new(Fixed(0), iovec, iovcnt)
.offset64(start as i64)
.rw_flags(poll_rw_flags)
.build()
.user_data(request_id)
}
Request {
args:
RequestTypeArgs::Writev {
start,
iovec,
iovcnt,
},
user_data,
flags,
} => {
// A FUA request is issued as a write with RWF_DSYNC.
let rw_flags = if flags.contains(ReqFlags::FUA) {
RWF_DSYNC
} else {
0
};
// The expected result is the combined length of all iovec entries.
let len = iovec_total_bytes(iovec, iovcnt);
request_id = self.requests.insert(ReqContext {
expected_ret: len,
user_data,
});
Writev::new(Fixed(0), iovec, iovcnt)
.offset64(start as i64)
.rw_flags(rw_flags | poll_rw_flags)
.build()
.user_data(request_id)
}
Request {
args: RequestTypeArgs::Flush,
user_data,
..
} => {
// A flush is issued as an fsync with the DATASYNC flag and must complete with 0.
request_id = self.requests.insert(ReqContext {
expected_ret: 0,
user_data,
});
Fsync::new(Fixed(0))
.flags(FsyncFlags::DATASYNC)
.build()
.user_data(request_id)
}
Request {
args: RequestTypeArgs::WriteZeroes { start, len },
user_data,
flags,
} => {
// Write zeroes is implemented with fallocate; fail the request if that opcode is missing.
if !self.supports_fallocate {
completion_backlog.push(Completion::for_failed_req(
req,
Errno::ENOTSUP,
const_cstr!("the kernel does not support IORING_OP_FALLOCATE"),
));
return true;
}
// Pick the fallocate mode from the target type, its write-zeroes support and the request
// flags.
#[allow(clippy::collapsible_else_if)]
let mode = if self.target_info.is_block_device {
if self.target_info.supports_write_zeroes_without_fallback {
// NO_UNMAP selects ZERO_RANGE; otherwise PUNCH_HOLE is used.
if flags.contains(ReqFlags::NO_UNMAP) {
FALLOC_FL_ZERO_RANGE | FALLOC_FL_KEEP_SIZE
} else {
FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE
}
} else {
// The device reports no write-zeroes support, so a request that forbids a fallback is
// failed here.
if flags.contains(ReqFlags::NO_FALLBACK) {
completion_backlog.push(Completion::for_failed_req(
req,
Errno::ENOTSUP,
const_cstr!("the block device does not support write zeroes with BLKIO_REQ_NO_FALLBACK"),
));
return true;
}
FALLOC_FL_ZERO_RANGE | FALLOC_FL_KEEP_SIZE
}
} else {
// Not a block device. Note that ZERO_RANGE is used without KEEP_SIZE here.
if flags.contains(ReqFlags::NO_UNMAP) {
FALLOC_FL_ZERO_RANGE
} else {
FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE
}
};
request_id = self.requests.insert(ReqContext {
expected_ret: 0,
user_data,
});
Fallocate64::new(Fixed(0), len as i64)
.offset64(start as i64)
.mode(mode)
.build()
.user_data(request_id)
}
Request {
args: RequestTypeArgs::Discard { start, len },
user_data,
..
} => {
// Discard is implemented with fallocate as well.
if !self.supports_fallocate {
completion_backlog.push(Completion::for_failed_req(
req,
Errno::ENOTSUP,
const_cstr!("the kernel does not support IORING_OP_FALLOCATE"),
));
return true;
}
// Defined locally with its numeric value rather than taken from the `libc` import list.
const FALLOC_FL_NO_HIDE_STALE: c_int = 0x04;
request_id = self.requests.insert(ReqContext {
expected_ret: 0,
user_data,
});
Fallocate64::new(Fixed(0), len as i64)
.offset64(start as i64)
.mode(FALLOC_FL_PUNCH_HOLE | FALLOC_FL_NO_HIDE_STALE | FALLOC_FL_KEEP_SIZE)
.build()
.user_data(request_id)
}
};
// NOTE(review): the entry carries raw pointers (buffers / iovecs) taken from the request;
// presumably the caller keeps them valid until the request completes. Confirm.
let result = unsafe { self.ring.submission().push(&entry) };
// The submission queue is full: untrack the request so its slot can be reused.
if result.is_err() {
let _ = self.requests.remove(request_id);
}
result.is_ok()
}
/// Enters the ring to submit queued entries and collects completions into `completions`.
///
/// `completions` is first filled from `completion_backlog`, then from the ring's completion
/// queue. The ring is then entered repeatedly until at least `min_completions` completions
/// have been collected and the submission queue is empty. A poll queue with nothing to submit
/// and nothing collected is still entered once. `timeout_updater`, when present, supplies the
/// timeout for each entry into the ring; `sig` is passed along as the signal mask.
///
/// Returns the number of entries of `completions` that were filled.
///
/// # Errors
///
/// Returns `EINVAL` if `min_completions` exceeds the number of completions that can still
/// arrive (already collected + in flight + backlogged requests), or the error from entering
/// the ring. In both cases `completion_backlog.unfill_completions` is called first with the
/// completions collected so far.
fn do_io(
&mut self,
request_backlog: &mut RequestBacklog,
completion_backlog: &mut CompletionBacklog,
completions: &mut [std::mem::MaybeUninit<Completion>],
min_completions: usize,
mut timeout_updater: Option<&mut TimeoutUpdater>,
sig: Option<&sigset_t>,
) -> Result<usize> {
// Start with completions that were produced without going through the ring.
let mut filled_completions = completion_backlog.fill_completions(completions);
// Moves completion queue entries into `completions` until either runs out and returns the
// number of completions written. Each reaped entry releases its slot in `requests`.
fn drain_cqueue(
ring: &mut io_uring::IoUring,
completions: &mut [std::mem::MaybeUninit<Completion>],
requests: &mut Requests,
) -> usize {
let mut cqueue = ring.completion();
let mut i = 0;
while i < completions.len() {
if let Some(cqe) = cqueue.next() {
let ReqContext {
expected_ret,
user_data,
} = requests.remove(cqe.user_data());
// Negative results are passed through; a non-negative result is a success only if it
// matches the expected value (e.g. the full transfer length), otherwise -EIO is reported.
let ret = if cqe.result() < 0 {
cqe.result()
} else if expected_ret == cqe.result() as usize {
0
} else {
-libc::EIO
};
let c = Completion {
user_data,
ret,
error_msg: ptr::null(),
reserved_: [0; 12],
};
// `i` is within bounds (loop condition), and writing initializes the slot.
unsafe { completions[i].as_mut_ptr().write(c) };
i += 1;
} else {
break;
}
}
i
}
let n = drain_cqueue(
&mut self.ring,
&mut completions[filled_completions..],
&mut self.requests,
);
filled_completions += n;
// Reaped completions freed request slots, so backlogged requests get another chance.
if n > 0 {
request_backlog.process(self, completion_backlog);
}
// Waiting for more completions than can ever arrive would block forever.
if min_completions > filled_completions + self.requests.len() + request_backlog.len() {
completion_backlog.unfill_completions(completions, filled_completions);
return Err(Error::new(
Errno::EINVAL,
"min_completions is larger than total outstanding requests",
));
}
let mut to_submit = self.ring.submission().len();
// A poll queue is entered at least once even when there is nothing to submit or wait for.
// NOTE(review): presumably so that polled completions are reaped; confirm.
let mut must_enter_once = self.is_poll_queue()
&& filled_completions == 0
&& min_completions == 0
&& to_submit == 0;
while filled_completions < min_completions || to_submit > 0 || must_enter_once {
// Never ask the kernel to wait for more completions than there are requests in flight.
let min_complete_hint = if filled_completions < min_completions {
std::cmp::min(min_completions - filled_completions, self.requests.len())
} else {
0
};
let result = if let Some(timeout) = timeout_updater.as_mut().map(|t| t.next()) {
self.enter_with_timeout(min_complete_hint, timeout, sig)
} else {
// GETEVENTS is requested when waiting for completions and always on poll queues.
let flags = if min_complete_hint > 0 || self.is_poll_queue() {
IORING_ENTER_GETEVENTS
} else {
0
};
unsafe {
self.ring
.submitter()
.enter(to_submit as u32, min_complete_hint as u32, flags, sig)
.map_err(|e| Error::from_io_error(e, Errno::EINVAL))
}
};
// On success this is the count returned by the enter call. The value computed for the error
// case is never used because the error is returned right below.
let num_submitted = match result {
Ok(n) => n,
Err(_) => to_submit - self.ring.submission().len(),
};
if let Err(err) = result {
completion_backlog.unfill_completions(completions, filled_completions);
return Err(err);
}
let n = drain_cqueue(
&mut self.ring,
&mut completions[filled_completions..],
&mut self.requests,
);
filled_completions += n;
// Every queued entry is expected to have been submitted by a successful enter.
assert!(num_submitted == to_submit);
// All scratch iovec slots become available again.
// NOTE(review): presumably the kernel no longer needs them once the entries are submitted;
// confirm.
self.iovecs_used = 0;
// Submission or completion may have made room for backlogged requests.
if num_submitted > 0 || n > 0 {
request_backlog.process(self, completion_backlog);
}
to_submit = self.ring.submission().len();
must_enter_once = false;
}
Ok(filled_completions)
}
}
// Property table of the "io_uring" driver, declared through the crate's `properties!` macro.
// It names `IOURING_PROPS` and the `PropertyState` type stored in `IoUring.props`.
//
// NOTE(review): judging by the methods on `IoUring`, entries marked `fn` appear to be backed
// by a `get_<name>` method, entries marked `mut` by a `set_<name>` method, and the remaining
// entries by a plain `PropertyState` field initialized in `IoUring::new`. Confirm against the
// macro definition.
properties! {
IOURING_PROPS: PropertyState for IoUring.props {
fn buf_alignment: i32,
fn capacity: u64,
mut direct: bool,
fn discard_alignment: i32,
fn discard_alignment_offset: i32,
driver: str,
mut fd: i32,
fn max_discard_len: u64,
max_queues: i32,
max_mem_regions: u64,
fn max_segment_len: i32,
fn max_segments: i32,
fn max_transfer: i32,
fn max_write_zeroes_len: u64,
fn mem_region_alignment: u64,
needs_mem_regions: bool,
needs_mem_region_fd: bool,
mut num_entries: i32,
mut num_queues: i32,
mut num_poll_queues: i32,
fn optimal_io_alignment: i32,
fn optimal_io_size: i32,
fn optimal_buf_alignment: i32,
mut path: str,
mut read_only: bool,
fn request_alignment: i32,
supports_fua_natively: bool,
supports_poll_queues: bool
}
}
/// The "io_uring" driver: holds the property values, the target file and the queues created
/// when the driver is started.
pub struct IoUring {
/// Current property values.
props: PropertyState,
/// The target file; set by `assign_file` during `connect`.
file: Option<File>,
/// Properties derived from the target file; set by `assign_file` during `connect`.
target_info: Option<TargetInfo>,
/// Regular (non-poll) queues, created by `start`.
queues: Vec<Blkioq>,
/// Poll queues, created by `start`.
poll_queues: Vec<Blkioq>,
/// Lifecycle state; advanced by `connect` and `start`.
state: State,
}
impl IoUring {
    /// Creates a driver in the `Created` state with default property values: no path, no fd
    /// (`-1`), one regular queue, no poll queues and `NUM_ENTRIES_DEFAULT` ring entries.
    pub fn new() -> Self {
        IoUring {
            props: PropertyState {
                direct: false,
                driver: "io_uring".to_string(),
                fd: -1,
                max_queues: i32::MAX,
                max_mem_regions: u64::MAX,
                needs_mem_regions: false,
                needs_mem_region_fd: false,
                num_entries: NUM_ENTRIES_DEFAULT,
                num_queues: 1,
                num_poll_queues: 0,
                path: String::new(),
                read_only: false,
                supports_fua_natively: false,
                supports_poll_queues: false,
            },
            file: None,
            target_info: None,
            queues: Vec::new(),
            poll_queues: Vec::new(),
            state: State::Created,
        }
    }

    /// Fails once the driver is connected (or started).
    fn cant_set_while_connected(&self) -> Result<()> {
        if self.state >= State::Connected {
            Err(properties::error_cant_set_while_connected())
        } else {
            Ok(())
        }
    }

    /// Fails once the driver is started.
    fn cant_set_while_started(&self) -> Result<()> {
        if self.state >= State::Started {
            Err(properties::error_cant_set_while_started())
        } else {
            Ok(())
        }
    }

    /// Fails unless the driver is connected (or started).
    fn must_be_connected(&self) -> Result<()> {
        if self.state >= State::Connected {
            Ok(())
        } else {
            Err(properties::error_must_be_connected())
        }
    }

    /// Fails with `EBUSY` unless the driver is started.
    fn must_be_started(&self) -> Result<()> {
        if self.state >= State::Started {
            Ok(())
        } else {
            Err(Error::new(Errno::EBUSY, "Device must be started"))
        }
    }

    /// Returns the size of the target in bytes, determined by seeking to its end.
    fn get_capacity(&self) -> Result<u64> {
        self.must_be_connected()?;
        Ok(lseek64(self.props.fd, 0, Whence::SeekEnd)? as u64)
    }

    /// Sets the `direct` property; only allowed before connecting.
    fn set_direct(&mut self, value: bool) -> Result<()> {
        self.cant_set_while_connected()?;
        self.props.direct = value;
        Ok(())
    }

    /// Sets the `fd` property; only allowed before connecting.
    fn set_fd(&mut self, value: i32) -> Result<()> {
        self.cant_set_while_connected()?;
        self.props.fd = value;
        Ok(())
    }

    /// Opens the target named by the `path` property, or adopts the descriptor in the `fd`
    /// property, and hands the resulting file to `assign_file`.
    ///
    /// # Errors
    ///
    /// Returns `EINVAL` if both or neither of `path` and `fd` are set, and propagates errors
    /// from opening the path and from `assign_file`.
    fn open_file(&mut self) -> Result<()> {
        if !self.props.path.is_empty() {
            if self.props.fd != -1 {
                return Err(Error::new(
                    Errno::EINVAL,
                    "path and fd cannot be set at the same time",
                ));
            }

            let open_flags = if self.props.direct { O_DIRECT } else { 0 };
            let file = OpenOptions::new()
                .custom_flags(open_flags)
                .read(true)
                .write(!self.props.read_only)
                .open(self.props.path.as_str())
                .map_err(|e| Error::from_io_error(e, Errno::EINVAL))?;

            // Publish the descriptor only once the file has been accepted. If `assign_file`
            // fails, the file is dropped (and closed); leaving `fd` at -1 keeps a later
            // `connect()` retry from tripping over a stale descriptor number and reporting
            // that path and fd are both set.
            let fd = file.as_raw_fd();
            self.assign_file(file)?;
            self.props.fd = fd;
            Ok(())
        } else if self.props.fd != -1 {
            // NOTE(review): the `File` takes ownership of the caller's descriptor, so it is
            // closed if `assign_file` fails while the `fd` property keeps its value. Confirm
            // that this matches the intended ownership contract for `fd`.
            let file = unsafe { File::from_raw_fd(self.props.fd) };
            self.assign_file(file)
        } else {
            Err(Error::new(Errno::EINVAL, "One of path and fd must be set"))
        }
    }

    /// Validates `file`, derives its target properties and stores both on the driver.
    ///
    /// The `direct`, `read_only`, `supports_fua_natively` and `supports_poll_queues`
    /// properties are overwritten with what was detected. Nothing is modified on failure.
    ///
    /// # Errors
    ///
    /// Returns `EINVAL` if `file` is neither a block device nor a regular file, and
    /// propagates errors from inspecting the file and from probing poll queue support.
    fn assign_file(&mut self, file: File) -> Result<()> {
        let file_type = file
            .metadata()
            .map_err(|e| Error::from_io_error(e, Errno::EINVAL))?
            .file_type();
        if !file_type.is_block_device() && !file_type.is_file() {
            return Err(Error::new(
                Errno::EINVAL,
                "The file must be a block device or a regular file",
            ));
        }

        let target_info =
            TargetInfo::from_file(&file).map_err(|e| Error::from_io_error(e, Errno::EINVAL))?;
        let supports_poll_queues = supports_iopoll(&file, &target_info)?;

        self.props.direct = target_info.direct;
        self.props.read_only = target_info.read_only;
        self.props.supports_fua_natively = target_info.supports_fua_natively;
        self.props.supports_poll_queues = supports_poll_queues;
        self.file = Some(file);
        self.target_info = Some(target_info);
        Ok(())
    }

    /// Always reports 0 once connected.
    fn get_max_segment_len(&self) -> Result<i32> {
        self.must_be_connected()?;
        Ok(0)
    }

    /// Returns the system's `IOV_MAX`.
    fn get_max_segments(&self) -> Result<i32> {
        self.must_be_connected()?;
        Ok(sysconf(SysconfVar::IOV_MAX)?.unwrap() as i32)
    }

    /// Always reports 0 once connected.
    fn get_max_transfer(&self) -> Result<i32> {
        self.must_be_connected()?;
        Ok(0)
    }

    /// Always reports 0 once connected.
    fn get_max_write_zeroes_len(&self) -> Result<u64> {
        self.must_be_connected()?;
        Ok(0)
    }

    /// Always reports 0 once connected.
    fn get_max_discard_len(&self) -> Result<u64> {
        self.must_be_connected()?;
        Ok(0)
    }

    /// Same value as `get_buf_alignment`, widened to `u64`.
    fn get_mem_region_alignment(&self) -> Result<u64> {
        Ok(self.get_buf_alignment()? as u64)
    }

    /// Same value as `get_request_alignment`.
    fn get_buf_alignment(&self) -> Result<i32> {
        self.must_be_connected()?;
        self.get_request_alignment()
    }

    /// Sets the ring size used for queues; allowed only while connected and not yet started.
    fn set_num_entries(&mut self, value: i32) -> Result<()> {
        self.must_be_connected()?;
        self.cant_set_while_started()?;
        if value <= 0 {
            return Err(Error::new(
                Errno::EINVAL,
                "num-entries must be greater than 0",
            ));
        }
        self.props.num_entries = value;
        Ok(())
    }

    /// Sets the number of regular queues; allowed only while connected and not yet started.
    fn set_num_queues(&mut self, value: i32) -> Result<()> {
        self.must_be_connected()?;
        self.cant_set_while_started()?;
        if value < 0 {
            return Err(Error::new(
                Errno::EINVAL,
                "num_queues must be equal to or greater than 0",
            ));
        }
        self.props.num_queues = value;
        Ok(())
    }

    /// Sets the number of poll queues; allowed only while connected and not yet started.
    fn set_num_poll_queues(&mut self, value: i32) -> Result<()> {
        self.must_be_connected()?;
        self.cant_set_while_started()?;
        if value < 0 {
            return Err(Error::new(
                Errno::EINVAL,
                "num_poll_queues must be equal to or greater than 0",
            ));
        }
        self.props.num_poll_queues = value;
        Ok(())
    }

    /// Returns the target's optimal I/O alignment.
    fn get_optimal_io_alignment(&self) -> Result<i32> {
        self.must_be_connected()?;
        // `target_info` is stored by `assign_file` before the state becomes `Connected`.
        Ok(self.target_info.as_ref().unwrap().optimal_io_alignment)
    }

    /// Returns the target's optimal I/O size.
    fn get_optimal_io_size(&self) -> Result<i32> {
        self.must_be_connected()?;
        // `target_info` is stored by `assign_file` before the state becomes `Connected`.
        Ok(self.target_info.as_ref().unwrap().optimal_io_size)
    }

    /// Returns the system page size.
    fn get_optimal_buf_alignment(&self) -> Result<i32> {
        self.must_be_connected()?;
        Ok(sysconf(SysconfVar::PAGE_SIZE)?.unwrap() as i32)
    }

    /// Sets the `path` property; only allowed before connecting.
    fn set_path(&mut self, value: &str) -> Result<()> {
        self.cant_set_while_connected()?;
        self.props.path = value.to_string();
        Ok(())
    }

    /// Sets the `read_only` property; only allowed before connecting.
    fn set_read_only(&mut self, value: bool) -> Result<()> {
        self.cant_set_while_connected()?;
        self.props.read_only = value;
        Ok(())
    }

    /// Returns the target's request alignment.
    fn get_request_alignment(&self) -> Result<i32> {
        self.must_be_connected()?;
        // `target_info` is stored by `assign_file` before the state becomes `Connected`.
        Ok(self.target_info.as_ref().unwrap().request_alignment)
    }

    /// Returns the target's discard alignment.
    fn get_discard_alignment(&self) -> Result<i32> {
        self.must_be_connected()?;
        // `target_info` is stored by `assign_file` before the state becomes `Connected`.
        Ok(self.target_info.as_ref().unwrap().discard_alignment)
    }

    /// Returns the target's discard alignment offset.
    fn get_discard_alignment_offset(&self) -> Result<i32> {
        self.must_be_connected()?;
        // `target_info` is stored by `assign_file` before the state becomes `Connected`.
        Ok(self.target_info.as_ref().unwrap().discard_alignment_offset)
    }
}
impl Driver for IoUring {
fn state(&self) -> State {
self.state
}
fn connect(&mut self) -> Result<()> {
self.cant_set_while_started()?;
if self.state == State::Connected {
return Ok(());
}
self.open_file()?;
self.state = State::Connected;
Ok(())
}
fn start(&mut self) -> Result<()> {
self.must_be_connected()?;
if self.state == State::Started {
return Ok(());
}
if !self.props.supports_poll_queues && self.props.num_poll_queues > 0 {
return Err(Error::new(Errno::EINVAL, "Poll queues not supported"));
}
if self.props.num_queues == 0 && self.props.num_poll_queues == 0 {
return Err(Error::new(
Errno::EINVAL,
"At least one of num_queues and num_poll_queues must be greater than 0",
));
}
let target_info = self.target_info.as_ref().unwrap();
let create_queue = |poll| {
let q = IoUringQueue::new(
poll,
self.props.num_entries as u32,
self.props.fd,
target_info,
)?;
Ok(Blkioq::new(Box::new(q)))
};
let queues = iter::repeat_with(|| create_queue(false))
.take(self.props.num_queues as usize)
.collect::<Result<_>>()?;
let poll_queues = iter::repeat_with(|| create_queue(true))
.take(self.props.num_poll_queues as usize)
.collect::<Result<_>>()?;
self.queues = queues;
self.poll_queues = poll_queues;
self.state = State::Started;
Ok(())
}
fn map_mem_region(&mut self, _region: &MemoryRegion) -> Result<()> {
self.must_be_started()
}
fn unmap_mem_region(&mut self, _region: &MemoryRegion) {}
fn get_queue(&mut self, index: usize) -> Result<&mut Blkioq> {
self.queues
.get_mut(index)
.ok_or_else(|| Error::new(Errno::EINVAL, "invalid queue index"))
}
fn get_poll_queue(&mut self, index: usize) -> Result<&mut Blkioq> {
self.poll_queues
.get_mut(index)
.ok_or_else(|| Error::new(Errno::EINVAL, "invalid queue index"))
}
}