#![allow(clippy::arc_with_non_send_sync)]
use super::{
common, Completion, CompletionInner, File, OpenFlags, SharedWalLockKind, SharedWalMappedRegion,
IO,
};
use crate::error::io_error;
use crate::io::clock::{Clock, DefaultClock, MonotonicInstant, WallClockInstant};
use crate::io::unix::{
unix_shared_wal_lock_byte, unix_shared_wal_map, unix_shared_wal_unlock_byte,
};
use crate::storage::wal::CKPT_BATCH_PAGES;
use crate::sync::Mutex;
use crate::turso_assert;
use crate::{CompletionError, LimboError, Result};
use rustix::fs::{self, FlockOperation, OFlags};
use std::ptr::NonNull;
use std::{
collections::{HashMap, VecDeque},
io::ErrorKind,
ops::Deref,
os::{fd::AsFd, unix::io::AsRawFd},
sync::Arc,
};
use tracing::{debug, trace, warn};
const ENTRIES: u32 = 512;
const SQPOLL_IDLE: u32 = 1000;
const IOVEC_POOL_SIZE: usize = 64;
const MAX_IOVEC_ENTRIES: usize = CKPT_BATCH_PAGES;
const MAX_WAIT: usize = 4;
const ARENA_COUNT: usize = 2;
const CANCEL_TAG: u64 = 1;
struct UringCapabilities {
ftruncate: bool,
}
pub struct UringIO {
ring: Arc<io_uring::IoUring>,
state: Arc<Mutex<RingState>>,
wait_lock: Arc<Mutex<()>>,
caps: Arc<UringCapabilities>,
}
unsafe impl Send for UringIO {}
unsafe impl Sync for UringIO {}
crate::assert::assert_send_sync!(UringIO);
struct RingState {
pending_ops: usize,
writev_states: HashMap<u64, WritevState>,
overflow: VecDeque<io_uring::squeue::Entry>,
iov_pool: IovecPool,
free_arenas: [Option<(NonNull<u8>, usize)>; ARENA_COUNT],
}
impl RingState {
fn empty(&self) -> bool {
self.pending_ops == 0 && self.overflow.is_empty()
}
unsafe fn flush_overflow(&mut self, ring: &io_uring::IoUring) {
if self.overflow.is_empty() {
return;
}
let mut sq = ring.submission_shared();
while !self.overflow.is_empty() {
if sq.is_full() {
break;
}
let entry = self.overflow.pop_front().expect("checked not empty");
if sq.push(&entry).is_err() {
self.overflow.push_front(entry);
break;
}
self.pending_ops += 1;
}
}
unsafe fn submit_entry(&mut self, ring: &io_uring::IoUring, entry: &io_uring::squeue::Entry) {
trace!("submit_entry({:?})", entry);
self.flush_overflow(ring);
let pushed = {
let mut sq = ring.submission_shared();
sq.push(entry).is_ok()
};
if pushed {
self.pending_ops += 1;
return;
}
self.overflow.push_back(entry.clone());
let _ = ring.submit();
}
unsafe fn submit_cancel_urgent(
&mut self,
ring: &io_uring::IoUring,
entry: &io_uring::squeue::Entry,
) -> Result<()> {
let pushed = {
let mut sq = ring.submission_shared();
sq.push(entry).is_ok()
};
if pushed {
self.pending_ops += 1;
return Ok(());
}
self.overflow.push_front(entry.clone());
ring.submit().map_err(|e| io_error(e, "io_uring_submit"))?;
Ok(())
}
}
struct IovecPool {
pool: Vec<Box<[libc::iovec; MAX_IOVEC_ENTRIES]>>,
}
impl IovecPool {
fn new() -> Self {
let pool = (0..IOVEC_POOL_SIZE)
.map(|_| {
Box::new(
[libc::iovec {
iov_base: std::ptr::null_mut(),
iov_len: 0,
}; MAX_IOVEC_ENTRIES],
)
})
.collect();
Self { pool }
}
#[inline(always)]
fn acquire(&mut self) -> Option<Box<[libc::iovec; MAX_IOVEC_ENTRIES]>> {
self.pool.pop()
}
#[inline(always)]
fn release(&mut self, iovec: Box<[libc::iovec; MAX_IOVEC_ENTRIES]>) {
if self.pool.len() < IOVEC_POOL_SIZE {
self.pool.push(iovec);
}
}
}
impl UringIO {
pub fn new() -> Result<Self> {
let ring = match io_uring::IoUring::builder()
.setup_sqpoll(SQPOLL_IDLE)
.build(ENTRIES)
{
Ok(ring) => ring,
Err(_) => io_uring::IoUring::new(ENTRIES).map_err(|e| io_error(e, "io_uring_setup"))?,
};
ring.submitter()
.register_buffers_sparse(ARENA_COUNT as u32)
.map_err(|e| io_error(e, "register_buffers"))?;
let mut probe = io_uring::register::Probe::new();
let caps = if ring.submitter().register_probe(&mut probe).is_ok() {
UringCapabilities {
ftruncate: probe.is_supported(io_uring::opcode::Ftruncate::CODE),
}
} else {
UringCapabilities { ftruncate: false }
};
if !caps.ftruncate {
warn!("io_uring: IORING_OP_FTRUNCATE not supported by kernel, using POSIX fallback");
}
let state = RingState {
overflow: VecDeque::new(),
pending_ops: 0,
writev_states: HashMap::default(),
iov_pool: IovecPool::new(),
free_arenas: [const { None }; ARENA_COUNT],
};
debug!("Using IO backend 'io-uring'");
Ok(Self {
ring: Arc::new(ring),
state: Arc::new(Mutex::new(state)),
wait_lock: Arc::new(Mutex::new(())),
caps: Arc::new(caps),
})
}
}
struct WritevState {
file_id: io_uring::types::Fd,
file_pos: u64,
current_buffer_idx: usize,
current_buffer_offset: usize,
total_written: usize,
total_len: usize,
bufs: Vec<Arc<crate::Buffer>>,
last_iov_allocation: Option<Box<[libc::iovec; MAX_IOVEC_ENTRIES]>>,
}
impl WritevState {
fn new(file: &UringFile, pos: u64, bufs: Vec<Arc<crate::Buffer>>) -> Self {
let file_id = file.file.as_raw_fd();
let total_len = bufs.iter().map(|b| b.len()).sum();
Self {
file_id: io_uring::types::Fd(file_id),
file_pos: pos,
current_buffer_idx: 0,
current_buffer_offset: 0,
total_written: 0,
bufs,
last_iov_allocation: None,
total_len,
}
}
#[inline(always)]
fn remaining(&self) -> usize {
self.total_len - self.total_written
}
#[inline(always)]
fn advance(&mut self, written: u64) {
let mut remaining = written;
while remaining > 0 {
let current_buf_len = self.bufs[self.current_buffer_idx].len();
let left = current_buf_len - self.current_buffer_offset;
if remaining < left as u64 {
self.current_buffer_offset += remaining as usize;
self.file_pos += remaining;
remaining = 0;
} else {
remaining -= left as u64;
self.file_pos += left as u64;
self.current_buffer_idx += 1;
self.current_buffer_offset = 0;
}
}
self.total_written += written as usize;
}
#[inline(always)]
fn free_last_iov(&mut self, pool: &mut IovecPool) {
if let Some(allocation) = self.last_iov_allocation.take() {
pool.release(allocation);
}
}
}
impl RingState {
#[cfg(debug_assertions)]
fn debug_check_fixed(&self, idx: u32, ptr: *const u8, len: usize) {
let (base, blen) = self.free_arenas[idx as usize].expect("slot not registered");
let start = base.as_ptr() as usize;
let end = start + blen;
let p = ptr as usize;
turso_assert!(
p >= start && p + len <= end,
"Fixed operation, pointer out of registered range"
);
}
unsafe fn submit_writev(&mut self, ring: &io_uring::IoUring, key: u64, mut st: WritevState) {
st.free_last_iov(&mut self.iov_pool);
let mut iov_allocation = self.iov_pool.acquire().unwrap_or_else(|| {
Box::new(
[libc::iovec {
iov_base: std::ptr::null_mut(),
iov_len: 0,
}; MAX_IOVEC_ENTRIES],
)
});
let mut iov_count = 0;
let mut last_end: Option<(*const u8, usize)> = None;
for (idx, buffer) in st.bufs.iter().enumerate().skip(st.current_buffer_idx) {
let mut ptr = buffer.as_ptr();
let mut len = buffer.len();
if idx == st.current_buffer_idx && st.current_buffer_offset != 0 {
turso_assert!(
st.current_buffer_offset <= len,
"writev state offset out of bounds"
);
ptr = ptr.add(st.current_buffer_offset);
len -= st.current_buffer_offset;
}
if let Some((last_ptr, last_len)) = last_end {
if last_ptr.add(last_len) == ptr {
iov_allocation[iov_count - 1].iov_len += len;
last_end = Some((last_ptr, last_len + len));
continue;
}
}
iov_allocation[iov_count] = libc::iovec {
iov_base: ptr as *mut _,
iov_len: len,
};
last_end = Some((ptr, len));
iov_count += 1;
if iov_count >= MAX_IOVEC_ENTRIES {
break;
}
}
let ptr = iov_allocation.as_ptr() as *mut libc::iovec;
st.last_iov_allocation = Some(iov_allocation);
let entry = io_uring::opcode::Writev::new(st.file_id, ptr, iov_count as u32)
.offset(st.file_pos)
.build()
.user_data(key);
self.writev_states.insert(key, st);
self.submit_entry(ring, &entry);
}
unsafe fn handle_writev_completion(
&mut self,
ring: &io_uring::IoUring,
mut state: WritevState,
user_data: u64,
result: i32,
) {
if result < 0 {
let err = std::io::Error::from_raw_os_error(-result);
tracing::error!("writev failed (user_data: {}): {}", user_data, err);
state.free_last_iov(&mut self.iov_pool);
completion_from_key(user_data).error(CompletionError::IOError(err.kind(), "pwritev"));
return;
}
let written = result;
if written == 0 && state.remaining() > 0 {
state.free_last_iov(&mut self.iov_pool);
completion_from_key(user_data).error(CompletionError::ShortWrite);
return;
}
state.advance(written as u64);
match state.remaining() {
0 => {
tracing::debug!(
"writev operation completed: wrote {} bytes",
state.total_written
);
state.free_last_iov(&mut self.iov_pool);
completion_from_key(user_data).complete(state.total_written as i32);
}
remaining => {
tracing::trace!(
"resubmitting writev operation for user_data {}: wrote {} bytes, remaining {}",
user_data,
written,
remaining
);
self.submit_writev(ring, user_data, state);
wake_user_data(user_data);
}
}
}
}
impl IO for UringIO {
fn supports_shared_wal_coordination(&self) -> bool {
true
}
fn open_file(&self, path: &str, flags: OpenFlags, direct: bool) -> Result<Arc<dyn File>> {
trace!("open_file(path = {})", path);
let mut file = std::fs::File::options();
file.read(true);
if !flags.contains(OpenFlags::ReadOnly) {
file.write(true);
file.create(flags.contains(OpenFlags::Create));
}
let file = file.open(path).map_err(|e| io_error(e, "open"))?;
let fd = file.as_fd();
if direct {
match fs::fcntl_setfl(fd, OFlags::DIRECT) {
Ok(_) => {}
Err(error) => debug!("Error {error:?} returned when setting O_DIRECT flag to read file. The performance of the system may be affected"),
}
}
let uring_file = Arc::new(UringFile {
ring: self.ring.clone(),
state: self.state.clone(),
caps: self.caps.clone(),
file,
});
if std::env::var(common::ENV_DISABLE_FILE_LOCK).is_err()
&& !flags.intersects(OpenFlags::ReadOnly | OpenFlags::NoLock)
{
uring_file.lock_file(true)?;
}
Ok(uring_file)
}
fn remove_file(&self, path: &str) -> Result<()> {
std::fs::remove_file(path).map_err(|e| io_error(e, "remove_file"))?;
Ok(())
}
fn cancel(&self, completions: &[Completion]) -> Result<()> {
let mut state = self.state.lock();
for c in completions {
c.abort();
let e = io_uring::opcode::AsyncCancel::new(Arc::as_ptr(c.get_inner()) as u64)
.build()
.user_data(CANCEL_TAG);
unsafe { state.submit_cancel_urgent(&self.ring, &e)? };
}
Ok(())
}
fn step(&self) -> Result<()> {
let Some(_wait_guard) = self.wait_lock.try_lock() else {
return Ok(());
};
loop {
let pending = {
let mut state = self.state.lock();
unsafe { state.flush_overflow(&self.ring) };
if state.empty() {
return Ok(());
}
state.pending_ops
};
let wants = std::cmp::min(pending, MAX_WAIT);
tracing::trace!("submit_and_wait for {wants} pending operations to complete");
self.ring
.submit_and_wait(wants)
.map_err(|e| io_error(e, "io_uring_submit_and_wait"))?;
self.drain_cq()?;
}
}
fn register_fixed_buffer(&self, ptr: std::ptr::NonNull<u8>, len: usize) -> Result<u32> {
turso_assert!(
len % 512 == 0,
"fixed buffer length must be logical block aligned"
);
let mut state = self.state.lock();
let slot =
state.free_arenas.iter().position(|e| e.is_none()).ok_or({
crate::error::CompletionError::UringIOError("no free fixed buffer slots")
})?;
unsafe {
self.ring
.submitter()
.register_buffers_update(
slot as u32,
&[libc::iovec {
iov_base: ptr.as_ptr() as *mut libc::c_void,
iov_len: len,
}],
None,
)
.map_err(|e| io_error(e, "register_buffers_update"))?
};
state.free_arenas[slot] = Some((ptr, len));
Ok(slot as u32)
}
}
impl UringIO {
fn drain_cq(&self) -> Result<bool> {
let mut state = self.state.lock();
let mut drained_any = false;
loop {
let mut cq = unsafe { self.ring.completion_shared() };
let Some(cqe) = cq.next() else {
return Ok(drained_any);
};
drained_any = true;
state.pending_ops -= 1;
let user_data = cqe.user_data();
if user_data == CANCEL_TAG {
continue;
}
let result = cqe.result();
turso_assert!(
user_data != 0,
"user_data must not be zero, we dont submit linked timeouts that would cause this"
);
if let Some(wstate) = state.writev_states.remove(&user_data) {
drop(cq);
unsafe { state.handle_writev_completion(&self.ring, wstate, user_data, result) };
continue;
}
if result < 0 {
let errno = -result;
let err = std::io::Error::from_raw_os_error(errno);
completion_from_key(user_data)
.error(CompletionError::IOError(err.kind(), "io_uring_cqe"));
} else {
completion_from_key(user_data).complete(result);
}
}
}
}
impl Clock for UringIO {
fn current_time_monotonic(&self) -> MonotonicInstant {
DefaultClock.current_time_monotonic()
}
fn current_time_wall_clock(&self) -> WallClockInstant {
DefaultClock.current_time_wall_clock()
}
}
#[inline(always)]
fn get_key(c: Completion) -> u64 {
Arc::into_raw(c.get_inner().clone()) as u64
}
#[inline(always)]
fn completion_from_key(key: u64) -> Completion {
let c_inner = unsafe { Arc::from_raw(key as *const CompletionInner) };
Completion {
inner: Some(c_inner),
}
}
fn wake_user_data(key: u64) {
let ptr = key as *const CompletionInner;
let kernel_ref = unsafe { Arc::from_raw(ptr) };
let cloned = kernel_ref.clone();
let _ = Arc::into_raw(kernel_ref);
Completion {
inner: Some(cloned),
}
.wake_progress();
}
pub struct UringFile {
ring: Arc<io_uring::IoUring>,
state: Arc<Mutex<RingState>>,
caps: Arc<UringCapabilities>,
file: std::fs::File,
}
impl Deref for UringFile {
type Target = std::fs::File;
fn deref(&self) -> &Self::Target {
&self.file
}
}
unsafe impl Send for UringFile {}
unsafe impl Sync for UringFile {}
crate::assert::assert_send_sync!(UringFile);
impl File for UringFile {
fn lock_file(&self, exclusive: bool) -> Result<()> {
let fd = self.file.as_fd();
fs::fcntl_lock(
fd,
if exclusive {
FlockOperation::NonBlockingLockExclusive
} else {
FlockOperation::NonBlockingLockShared
},
)
.map_err(|e| {
let io_error = std::io::Error::from(e);
let message = match io_error.kind() {
ErrorKind::WouldBlock => {
"Failed locking file. File is locked by another process".to_string()
}
_ => format!("Failed locking file, {io_error}"),
};
LimboError::LockingError(message)
})?;
Ok(())
}
fn unlock_file(&self) -> Result<()> {
let fd = self.file.as_fd();
fs::fcntl_lock(fd, FlockOperation::NonBlockingUnlock).map_err(|e| {
LimboError::LockingError(format!(
"Failed to release file lock: {}",
std::io::Error::from(e)
))
})?;
Ok(())
}
fn pread(&self, pos: u64, c: Completion) -> Result<Completion> {
let r = c.as_read();
let read_e = {
let buf = r.buf();
let ptr = buf.as_mut_ptr();
let fd = io_uring::types::Fd(self.file.as_raw_fd());
let len = buf.len();
if let Some(idx) = buf.fixed_id() {
trace!(
"pread_fixed(pos = {}, length = {}, idx = {})",
pos,
len,
idx
);
#[cfg(debug_assertions)]
{
self.state.lock().debug_check_fixed(idx, ptr, len);
}
io_uring::opcode::ReadFixed::new(fd, ptr, len as u32, idx as u16)
.offset(pos)
.build()
.user_data(get_key(c.clone()))
} else {
trace!("pread(pos = {}, length = {})", pos, len);
io_uring::opcode::Read::new(fd, buf.as_mut_ptr(), len as u32)
.offset(pos)
.build()
.user_data(get_key(c.clone()))
}
};
let mut state = self.state.lock();
unsafe { state.submit_entry(&self.ring, &read_e) };
Ok(c)
}
fn pwrite(&self, pos: u64, buffer: Arc<crate::Buffer>, c: Completion) -> Result<Completion> {
let write = {
let ptr = buffer.as_ptr();
let len = buffer.len();
let fd = io_uring::types::Fd(self.file.as_raw_fd());
if let Some(idx) = buffer.fixed_id() {
trace!(
"pwrite_fixed(pos = {}, length = {}, idx= {})",
pos,
len,
idx
);
#[cfg(debug_assertions)]
{
self.state.lock().debug_check_fixed(idx, ptr, len);
}
io_uring::opcode::WriteFixed::new(fd, ptr, len as u32, idx as u16)
.offset(pos)
.build()
.user_data(get_key(c.clone()))
} else {
trace!("pwrite(pos = {}, length = {})", pos, buffer.len());
io_uring::opcode::Write::new(fd, ptr, len as u32)
.offset(pos)
.build()
.user_data(get_key(c.clone()))
}
};
c.keep_write_buffer_alive(buffer);
let mut state = self.state.lock();
unsafe { state.submit_entry(&self.ring, &write) };
Ok(c)
}
fn sync(&self, c: Completion, _sync_type: crate::io::FileSyncType) -> Result<Completion> {
trace!("sync()");
let fd = io_uring::types::Fd(self.file.as_raw_fd());
let sync = io_uring::opcode::Fsync::new(fd)
.build()
.user_data(get_key(c.clone()));
let mut state = self.state.lock();
unsafe { state.submit_entry(&self.ring, &sync) };
Ok(c)
}
fn pwritev(
&self,
pos: u64,
bufs: Vec<Arc<crate::Buffer>>,
c: Completion,
) -> Result<Completion> {
tracing::trace!("pwritev(pos = {}, bufs.len() = {})", pos, bufs.len());
let wstate = WritevState::new(self, pos, bufs);
let mut state = self.state.lock();
unsafe { state.submit_writev(&self.ring, get_key(c.clone()), wstate) };
Ok(c)
}
fn size(&self) -> Result<u64> {
Ok(self
.file
.metadata()
.map_err(|e| io_error(e, "metadata"))?
.len())
}
fn truncate(&self, len: u64, c: Completion) -> Result<Completion> {
let fd = io_uring::types::Fd(self.file.as_raw_fd());
if self.caps.ftruncate {
let truncate = io_uring::opcode::Ftruncate::new(fd, len)
.build()
.user_data(get_key(c.clone()));
let mut state = self.state.lock();
unsafe { state.submit_entry(&self.ring, &truncate) };
Ok(c)
} else {
let result = self.file.set_len(len);
match result {
Ok(()) => {
trace!("file truncated to len=({})", len);
c.complete(0);
Ok(c)
}
Err(e) => Err(io_error(e, "truncate")),
}
}
}
fn shared_wal_lock_byte(
&self,
offset: u64,
exclusive: bool,
kind: SharedWalLockKind,
) -> Result<()> {
unix_shared_wal_lock_byte(self.file.as_raw_fd(), offset, exclusive, true, kind).map(|_| ())
}
fn shared_wal_try_lock_byte(
&self,
offset: u64,
exclusive: bool,
kind: SharedWalLockKind,
) -> Result<bool> {
unix_shared_wal_lock_byte(self.file.as_raw_fd(), offset, exclusive, false, kind)
}
fn shared_wal_unlock_byte(&self, offset: u64, kind: SharedWalLockKind) -> Result<()> {
unix_shared_wal_unlock_byte(self.file.as_raw_fd(), offset, kind)
}
fn shared_wal_set_len(&self, len: u64) -> Result<()> {
self.file
.set_len(len)
.map_err(|err| io_error(err, "resize shared WAL coordination file"))
}
fn shared_wal_map(&self, offset: u64, len: usize) -> Result<Box<dyn SharedWalMappedRegion>> {
unix_shared_wal_map(offset, len, self.file.as_raw_fd())
}
}
impl Drop for UringFile {
fn drop(&mut self) {
self.unlock_file().expect("Failed to unlock file");
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::io::common;
#[test]
fn test_multiple_processes_cannot_open_file() {
common::tests::test_multiple_processes_cannot_open_file(UringIO::new);
}
}