use std::collections::VecDeque;
use std::fs::File;
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use bytes::{Bytes, BytesMut};
use parking_lot::Mutex;
/// Tuning knobs for the io_uring-style I/O layer.
///
/// NOTE(review): in this file most fields are advisory — the visible
/// implementations perform synchronous `std::fs` I/O, and only
/// `registered_buffers` × `buffer_size` is consulted (by `WalWriter`
/// to size its auto-flush threshold). Confirm the rest against the
/// real io_uring backend before relying on them.
#[derive(Debug, Clone)]
pub struct IoUringConfig {
/// Requested submission-queue depth.
pub sq_entries: u32,
/// Enable kernel-side submission-queue polling (SQPOLL-style).
pub kernel_poll: bool,
/// Idle time, in milliseconds, before a polling kernel thread sleeps.
pub sq_poll_idle_ms: u32,
/// Maximum number of operations allowed in flight at once.
pub max_inflight: usize,
/// Use direct (unbuffered) I/O.
pub direct_io: bool,
/// Number of pre-registered I/O buffers.
pub registered_buffers: usize,
/// Size of each registered buffer, in bytes.
pub buffer_size: usize,
}
impl Default for IoUringConfig {
fn default() -> Self {
Self {
sq_entries: 1024,
kernel_poll: false, sq_poll_idle_ms: 10,
max_inflight: 256,
direct_io: false,
registered_buffers: 64,
buffer_size: 64 * 1024, }
}
}
impl IoUringConfig {
pub fn high_throughput() -> Self {
Self {
sq_entries: 4096,
kernel_poll: true,
sq_poll_idle_ms: 100,
max_inflight: 1024,
direct_io: true,
registered_buffers: 256,
buffer_size: 256 * 1024, }
}
pub fn low_latency() -> Self {
Self {
sq_entries: 256,
kernel_poll: true,
sq_poll_idle_ms: 1,
max_inflight: 64,
direct_io: true,
registered_buffers: 32,
buffer_size: 16 * 1024, }
}
pub fn minimal() -> Self {
Self {
sq_entries: 128,
kernel_poll: false,
sq_poll_idle_ms: 0,
max_inflight: 32,
direct_io: false,
registered_buffers: 8,
buffer_size: 4 * 1024, }
}
}
/// Returns `true` when the running Linux kernel is new enough (>= 5.6)
/// to provide a usable io_uring implementation.
///
/// Reads `/proc/sys/kernel/osrelease`; any read failure is treated as
/// "not available".
#[cfg(target_os = "linux")]
pub fn is_io_uring_available() -> bool {
    match std::fs::read_to_string("/proc/sys/kernel/osrelease") {
        Ok(release) => kernel_release_supports_io_uring(release.trim()),
        Err(_) => false,
    }
}

/// Parses a kernel release string such as `5.15.0-91-generic` and
/// reports whether it is at least 5.6 (the version gate used by the
/// original check). Both a major and a minor component must be present;
/// a malformed component parses as 0, which fails the check — matching
/// the previous `unwrap_or(0)` behavior.
#[allow(dead_code)] // only called from the linux-specific wrapper above
fn kernel_release_supports_io_uring(release: &str) -> bool {
    let mut parts = release.split('.');
    let major: u32 = match parts.next() {
        Some(p) => p.parse().unwrap_or(0),
        None => return false,
    };
    // Require a minor component, as the original `parts.len() < 2` check did.
    let minor_part = match parts.next() {
        Some(p) => p,
        None => return false,
    };
    // The minor component may carry a distro suffix, e.g. "0-91-generic".
    let minor: u32 = minor_part
        .split('-')
        .next()
        .unwrap_or("0")
        .parse()
        .unwrap_or(0);
    major > 5 || (major == 5 && minor >= 6)
}
/// io_uring is a Linux-only facility; every other platform reports it
/// as unavailable.
#[cfg(not(target_os = "linux"))]
pub fn is_io_uring_available() -> bool {
false
}
/// Cumulative I/O counters, shared via `Arc` between owners.
/// All counters are monotonically increasing and updated with relaxed
/// atomics; read a consistent-ish view via `snapshot()`.
#[derive(Debug, Default)]
pub struct IoUringStats {
/// Operations handed to the backend.
pub ops_submitted: AtomicU64,
/// Operations that finished (in this file, always bumped together
/// with `ops_submitted`, since the fallback I/O is synchronous).
pub ops_completed: AtomicU64,
/// Total payload bytes written.
pub bytes_written: AtomicU64,
/// Total payload bytes read.
pub bytes_read: AtomicU64,
/// Completion-queue overflows; never incremented anywhere in this file.
pub cqe_overflows: AtomicU64,
/// Dropped submissions; never incremented anywhere in this file.
pub sq_dropped: AtomicU64,
}
impl IoUringStats {
pub fn new() -> Self {
Self::default()
}
pub fn snapshot(&self) -> IoUringStatsSnapshot {
IoUringStatsSnapshot {
ops_submitted: self.ops_submitted.load(Ordering::Relaxed),
ops_completed: self.ops_completed.load(Ordering::Relaxed),
bytes_written: self.bytes_written.load(Ordering::Relaxed),
bytes_read: self.bytes_read.load(Ordering::Relaxed),
cqe_overflows: self.cqe_overflows.load(Ordering::Relaxed),
sq_dropped: self.sq_dropped.load(Ordering::Relaxed),
}
}
}
/// Plain-value copy of `IoUringStats` taken at a single point in time.
#[derive(Debug, Clone)]
pub struct IoUringStatsSnapshot {
pub ops_submitted: u64,
pub ops_completed: u64,
pub bytes_written: u64,
pub bytes_read: u64,
pub cqe_overflows: u64,
pub sq_dropped: u64,
}
impl IoUringStatsSnapshot {
    /// Operations submitted but not yet completed at snapshot time.
    /// Saturates at zero if completions raced ahead of submissions.
    pub fn in_flight(&self) -> u64 {
        self.ops_submitted.saturating_sub(self.ops_completed)
    }

    /// Fraction of submitted operations that have completed.
    /// An idle ring (nothing submitted) counts as fully complete (1.0).
    pub fn completion_rate(&self) -> f64 {
        match self.ops_submitted {
            0 => 1.0,
            submitted => self.ops_completed as f64 / submitted as f64,
        }
    }
}
/// Append-only file writer with an atomically tracked logical offset.
/// Despite the name, the visible implementation performs synchronous
/// `std::fs` writes under a mutex (io_uring fallback path).
pub struct AsyncWriter {
/// Underlying file, serialized by a `parking_lot` mutex.
file: Mutex<File>,
/// Logical end-of-file offset (bytes appended so far).
offset: AtomicU64,
/// Shared operation/byte counters.
stats: Arc<IoUringStats>,
// Retained for a future io_uring backend; unused by this fallback.
#[allow(dead_code)] config: IoUringConfig,
}
impl AsyncWriter {
pub fn new(path: impl AsRef<Path>, config: IoUringConfig) -> io::Result<Self> {
let file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(path)?;
let offset = file.metadata()?.len();
Ok(Self {
file: Mutex::new(file),
offset: AtomicU64::new(offset),
stats: Arc::new(IoUringStats::new()),
config,
})
}
pub fn open(path: impl AsRef<Path>, config: IoUringConfig) -> io::Result<Self> {
let file = std::fs::OpenOptions::new().write(true).open(path)?;
let offset = file.metadata()?.len();
Ok(Self {
file: Mutex::new(file),
offset: AtomicU64::new(offset),
stats: Arc::new(IoUringStats::new()),
config,
})
}
pub fn write(&self, data: &[u8]) -> io::Result<u64> {
let mut file = self.file.lock();
file.write_all(data)?;
let offset = self.offset.fetch_add(data.len() as u64, Ordering::AcqRel);
self.stats.ops_submitted.fetch_add(1, Ordering::Relaxed);
self.stats.ops_completed.fetch_add(1, Ordering::Relaxed);
self.stats
.bytes_written
.fetch_add(data.len() as u64, Ordering::Relaxed);
Ok(offset)
}
pub fn flush(&self) -> io::Result<()> {
let mut file = self.file.lock();
file.flush()?;
file.sync_data()
}
pub fn sync(&self) -> io::Result<()> {
let file = self.file.lock();
file.sync_data()
}
pub fn offset(&self) -> u64 {
self.offset.load(Ordering::Acquire)
}
pub fn stats(&self) -> IoUringStatsSnapshot {
self.stats.snapshot()
}
}
/// Positional file reader; synchronous `std::fs` fallback despite the
/// name. Reads seek under a mutex, so concurrent readers serialize.
pub struct AsyncReader {
/// Underlying file, serialized by a `parking_lot` mutex.
file: Mutex<File>,
/// Shared operation/byte counters.
stats: Arc<IoUringStats>,
// Retained for a future io_uring backend; unused by this fallback.
#[allow(dead_code)]
config: IoUringConfig,
}
impl AsyncReader {
    /// Opens `path` read-only.
    pub fn open(path: impl AsRef<Path>, config: IoUringConfig) -> io::Result<Self> {
        let file = std::fs::File::open(path)?;
        Ok(Self {
            file: Mutex::new(file),
            stats: Arc::new(IoUringStats::new()),
            config,
        })
    }

    /// Reads up to `buf.len()` bytes starting at `offset`; returns the
    /// number of bytes actually read (may be short near end of file).
    pub fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<usize> {
        let mut guard = self.file.lock();
        guard.seek(SeekFrom::Start(offset))?;
        let read = guard.read(buf)?;
        self.record_read(read as u64);
        Ok(read)
    }

    /// Reads exactly `buf.len()` bytes starting at `offset`, failing
    /// with `UnexpectedEof` if the file is too short.
    pub fn read_exact_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<()> {
        let mut guard = self.file.lock();
        guard.seek(SeekFrom::Start(offset))?;
        guard.read_exact(buf)?;
        self.record_read(buf.len() as u64);
        Ok(())
    }

    /// Bumps the op and byte counters for one completed read.
    fn record_read(&self, bytes: u64) {
        self.stats.ops_submitted.fetch_add(1, Ordering::Relaxed);
        self.stats.ops_completed.fetch_add(1, Ordering::Relaxed);
        self.stats.bytes_read.fetch_add(bytes, Ordering::Relaxed);
    }

    /// Snapshot of this reader's I/O counters.
    pub fn stats(&self) -> IoUringStatsSnapshot {
        self.stats.snapshot()
    }
}
/// Aggregate counts for the operations currently queued in an `IoBatch`.
#[derive(Debug, Clone, Default)]
pub struct BatchStats {
pub total_ops: u64,
pub write_ops: u64,
pub read_ops: u64,
pub sync_ops: u64,
/// Sum of queued write payload sizes, in bytes.
pub write_bytes: u64,
/// Sum of queued read request lengths, in bytes.
pub read_bytes: u64,
}
/// FIFO queue of I/O operations to be executed together (e.g. by
/// `BatchExecutor` or `WalWriter::flush_batch`).
#[derive(Debug, Default)]
pub struct IoBatch {
operations: VecDeque<IoOperation>,
}
/// A single queued I/O operation.
#[derive(Debug, Clone)]
pub enum IoOperation {
/// Write `data` at `offset`.
/// NOTE(review): the executors in this file append rather than seek,
/// so `offset` is recorded but not honored — confirm before reuse.
Write {
offset: u64,
data: Bytes,
},
/// Read `len` bytes starting at `offset`.
Read {
offset: u64,
len: usize,
},
/// Durability barrier: sync file data to disk.
Sync,
}
impl IoBatch {
pub fn new() -> Self {
Self {
operations: VecDeque::new(),
}
}
pub fn write(&mut self, offset: u64, data: impl Into<Bytes>) {
self.operations.push_back(IoOperation::Write {
offset,
data: data.into(),
});
}
pub fn read(&mut self, offset: u64, len: usize) {
self.operations.push_back(IoOperation::Read { offset, len });
}
pub fn sync(&mut self) {
self.operations.push_back(IoOperation::Sync);
}
pub fn len(&self) -> usize {
self.operations.len()
}
pub fn is_empty(&self) -> bool {
self.operations.is_empty()
}
pub fn clear(&mut self) {
self.operations.clear();
}
pub fn drain(&mut self) -> impl Iterator<Item = IoOperation> + '_ {
self.operations.drain(..)
}
pub fn pending_write_bytes(&self) -> u64 {
self.operations
.iter()
.map(|op| match op {
IoOperation::Write { data, .. } => data.len() as u64,
_ => 0,
})
.sum()
}
pub fn stats(&self) -> BatchStats {
let mut stats = BatchStats::default();
for op in &self.operations {
stats.total_ops += 1;
match op {
IoOperation::Write { data, .. } => {
stats.write_ops += 1;
stats.write_bytes += data.len() as u64;
}
IoOperation::Read { len, .. } => {
stats.read_ops += 1;
stats.read_bytes += *len as u64;
}
IoOperation::Sync => {
stats.sync_ops += 1;
}
}
}
stats
}
pub fn pending_write_ops(&self) -> usize {
self.operations
.iter()
.filter(|op| matches!(op, IoOperation::Write { .. }))
.count()
}
pub fn pending_read_ops(&self) -> usize {
self.operations
.iter()
.filter(|op| matches!(op, IoOperation::Read { .. }))
.count()
}
}
/// Outcome of one queued read: the requested offset plus the bytes
/// actually obtained (possibly fewer than requested near end of file).
#[derive(Debug, Clone)]
pub struct BatchReadResult {
pub offset: u64,
pub data: BytesMut,
}
/// Runs every operation in an `IoBatch` against an optional writer
/// and/or reader. An executor built via `for_writer`/`for_reader`
/// carries exactly one of the two; operations needing the missing
/// side fail with `InvalidInput`.
pub struct BatchExecutor {
writer: Option<AsyncWriter>,
reader: Option<AsyncReader>,
/// Shared with the wrapped writer/reader so `stats()` reflects its counters.
stats: Arc<IoUringStats>,
}
impl BatchExecutor {
    /// Builds an executor that can service writes and syncs.
    pub fn for_writer(writer: AsyncWriter) -> Self {
        Self {
            stats: writer.stats.clone(),
            writer: Some(writer),
            reader: None,
        }
    }

    /// Builds an executor that can service reads.
    pub fn for_reader(reader: AsyncReader) -> Self {
        Self {
            stats: reader.stats.clone(),
            writer: None,
            reader: Some(reader),
        }
    }

    /// Drains `batch`, executing each operation in order, and returns
    /// one `BatchReadResult` per queued read. Stops at the first error;
    /// already-drained operations are not restored.
    ///
    /// NOTE(review): write offsets are ignored here — the writer
    /// appends — so queued writes land in FIFO order at end of file.
    pub fn execute(&self, batch: &mut IoBatch) -> io::Result<Vec<BatchReadResult>> {
        let mut read_results = Vec::new();
        for op in batch.drain() {
            match op {
                IoOperation::Write { data, .. } => self.do_write(&data)?,
                IoOperation::Read { offset, len } => {
                    read_results.push(self.do_read(offset, len)?);
                }
                IoOperation::Sync => self.do_sync()?,
            }
        }
        Ok(read_results)
    }

    /// Appends `data` via the configured writer, or fails if none is set.
    fn do_write(&self, data: &[u8]) -> io::Result<()> {
        let writer = self.writer.as_ref().ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "No writer configured for batch executor",
            )
        })?;
        writer.write(data)?;
        Ok(())
    }

    /// Performs one bounded read, truncating the buffer to what was read.
    fn do_read(&self, offset: u64, len: usize) -> io::Result<BatchReadResult> {
        let reader = self.reader.as_ref().ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "No reader configured for batch executor",
            )
        })?;
        let mut buf = BytesMut::zeroed(len);
        let n = reader.read_at(offset, &mut buf)?;
        buf.truncate(n);
        Ok(BatchReadResult { offset, data: buf })
    }

    /// Syncs the writer if one is configured; a sync with no writer is a no-op.
    fn do_sync(&self) -> io::Result<()> {
        if let Some(ref writer) = self.writer {
            writer.sync()?;
        }
        Ok(())
    }

    /// Snapshot of the wrapped endpoint's I/O counters.
    pub fn stats(&self) -> IoUringStatsSnapshot {
        self.stats.snapshot()
    }
}
/// Write-ahead-log writer: direct appends plus an optional batching
/// path that auto-flushes once `max_batch_bytes` accumulate.
pub struct WalWriter {
/// Underlying append-only writer.
writer: AsyncWriter,
/// Queued-but-unflushed batched appends.
batch: Mutex<IoBatch>,
/// Byte total of the queued batch (kept in step with `batch`).
pending_bytes: AtomicU64,
/// Auto-flush threshold: `registered_buffers * buffer_size` from config.
max_batch_bytes: u64,
}
impl WalWriter {
/// Creates (or opens for append) a WAL file at `path`. The auto-flush
/// threshold is `registered_buffers * buffer_size` from `config`.
pub fn new(path: impl AsRef<Path>, config: IoUringConfig) -> io::Result<Self> {
// Compute the threshold before `config` is moved into the writer.
let max_batch_bytes = (config.registered_buffers * config.buffer_size) as u64;
Ok(Self {
writer: AsyncWriter::new(path, config)?,
batch: Mutex::new(IoBatch::new()),
pending_bytes: AtomicU64::new(0),
max_batch_bytes,
})
}
/// Appends `data` immediately (no batching); returns the file offset
/// at which it was written.
pub fn append(&self, data: &[u8]) -> io::Result<u64> {
self.writer.write(data)
}
/// Queues `data` for a later flush, flushing automatically once the
/// accumulated batch reaches `max_batch_bytes`.
///
/// NOTE(review): unlike `append`, the returned u64 is the running
/// pending-byte count after this call (possibly already flushed), not
/// a file offset — confirm callers expect that. The offset recorded
/// with the queued write is the writer's offset at enqueue time, which
/// goes stale while earlier entries sit unflushed; harmless today
/// because `flush_batch` ignores offsets and simply appends.
pub fn append_batched(&self, data: &[u8]) -> io::Result<u64> {
let data_len = data.len() as u64;
let offset = self.writer.offset();
{
// Scoped so the batch lock is released before a possible flush below.
let mut batch = self.batch.lock();
batch.write(offset, Bytes::copy_from_slice(data));
}
// fetch_add returns the pre-add value; add data_len back for the new total.
let pending = self.pending_bytes.fetch_add(data_len, Ordering::AcqRel) + data_len;
if pending >= self.max_batch_bytes {
self.flush_batch()?;
}
Ok(pending)
}
/// Writes every queued batch entry to the file, then flushes and syncs.
/// No-op when the batch is empty — note the early return also skips
/// the sync in that case.
pub fn flush_batch(&self) -> io::Result<()> {
let mut batch = self.batch.lock();
if batch.is_empty() {
return Ok(());
}
// The batch lock is held for the entire drain, so concurrent
// append_batched calls block until this flush finishes.
for op in batch.drain() {
match op {
IoOperation::Write { data, .. } => {
self.writer.write(&data)?;
}
IoOperation::Sync => {
self.writer.sync()?;
}
IoOperation::Read { .. } => {
// Reads make no sense in a write-ahead batch; dropped silently.
}
}
}
self.pending_bytes.store(0, Ordering::Release);
self.writer.flush()?;
self.writer.sync()
}
/// Bytes currently queued but not yet flushed.
pub fn pending_batch_bytes(&self) -> u64 {
self.pending_bytes.load(Ordering::Acquire)
}
/// Number of operations currently queued.
pub fn pending_batch_ops(&self) -> usize {
self.batch.lock().len()
}
/// True when at least one operation is queued.
pub fn has_pending_batch(&self) -> bool {
!self.batch.lock().is_empty()
}
/// Appends a framed record: 4-byte big-endian payload length, the
/// payload, then a 4-byte big-endian CRC32 of the payload.
pub fn append_with_checksum(&self, data: &[u8]) -> io::Result<u64> {
let checksum = crc32fast::hash(data);
let mut buf = Vec::with_capacity(4 + data.len() + 4);
buf.extend_from_slice(&(data.len() as u32).to_be_bytes());
buf.extend_from_slice(data);
buf.extend_from_slice(&checksum.to_be_bytes());
self.writer.write(&buf)
}
/// Batched variant of `append_with_checksum`: same framing, queued via
/// `append_batched` (and thus returns a pending-byte count, not an offset).
pub fn append_with_checksum_batched(&self, data: &[u8]) -> io::Result<u64> {
let checksum = crc32fast::hash(data);
let mut buf = Vec::with_capacity(4 + data.len() + 4);
buf.extend_from_slice(&(data.len() as u32).to_be_bytes());
buf.extend_from_slice(data);
buf.extend_from_slice(&checksum.to_be_bytes());
self.append_batched(&buf)
}
/// Drains any pending batch and forces data to disk.
///
/// NOTE(review): when the batch is non-empty, flush_batch already
/// flushes and syncs, so the trailing flush/sync repeats that work;
/// it is still required for the batch-empty path, where flush_batch
/// returns early without syncing.
pub fn sync(&self) -> io::Result<()> {
self.flush_batch()?;
self.writer.flush()?;
self.writer.sync()
}
/// Logical size of the WAL: bytes appended so far, excluding
/// queued-but-unflushed batch entries.
pub fn size(&self) -> u64 {
self.writer.offset()
}
/// Auto-flush threshold in bytes.
pub fn max_batch_bytes(&self) -> u64 {
self.max_batch_bytes
}
/// Snapshot of the underlying writer's I/O counters.
pub fn stats(&self) -> IoUringStatsSnapshot {
self.writer.stats()
}
}
/// Read-only view of a log segment file; caches the file length
/// observed at open time (not refreshed if the file grows).
pub struct SegmentReader {
reader: AsyncReader,
/// Segment length in bytes, captured when the reader was opened.
length: u64,
}
impl SegmentReader {
    /// Opens the segment at `path`, recording its current length.
    pub fn open(path: impl AsRef<Path>, config: IoUringConfig) -> io::Result<Self> {
        let length = std::fs::metadata(&path)?.len();
        let reader = AsyncReader::open(path, config)?;
        Ok(Self { reader, length })
    }

    /// Reads at most `max_bytes` starting at `offset`; the returned
    /// buffer is truncated to what was actually read (short near EOF).
    pub fn read_messages(&self, offset: u64, max_bytes: usize) -> io::Result<BytesMut> {
        let mut data = BytesMut::zeroed(max_bytes);
        let read = self.reader.read_at(offset, &mut data)?;
        data.truncate(read);
        Ok(data)
    }

    /// Reads exactly `len` bytes starting at `offset`, erroring on a
    /// short read.
    pub fn read_range(&self, offset: u64, len: usize) -> io::Result<BytesMut> {
        let mut data = BytesMut::zeroed(len);
        self.reader.read_exact_at(offset, &mut data)?;
        Ok(data)
    }

    /// Segment length in bytes, as observed when opened.
    pub fn len(&self) -> u64 {
        self.length
    }

    /// True when the segment was empty at open time.
    pub fn is_empty(&self) -> bool {
        self.length == 0
    }

    /// Snapshot of the underlying reader's I/O counters.
    pub fn stats(&self) -> IoUringStatsSnapshot {
        self.reader.stats()
    }
}
// Unit tests. These exercise the synchronous fallback implementations
// end-to-end against real temp files (via the `tempfile` crate).
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
// Default profile: moderate queue depth, no kernel polling.
#[test]
fn test_config_defaults() {
let config = IoUringConfig::default();
assert_eq!(config.sq_entries, 1024);
assert!(!config.kernel_poll);
assert_eq!(config.max_inflight, 256);
}
// Throughput profile enables polling and direct I/O.
#[test]
fn test_config_high_throughput() {
let config = IoUringConfig::high_throughput();
assert_eq!(config.sq_entries, 4096);
assert!(config.kernel_poll);
assert!(config.direct_io);
}
// Latency profile keeps queues shallow but still polls.
#[test]
fn test_config_low_latency() {
let config = IoUringConfig::low_latency();
assert_eq!(config.sq_entries, 256);
assert!(config.kernel_poll);
}
// in_flight/completion_rate derive correctly from raw counters.
#[test]
fn test_stats_snapshot() {
let stats = IoUringStats::new();
stats.ops_submitted.store(100, Ordering::Relaxed);
stats.ops_completed.store(95, Ordering::Relaxed);
stats.bytes_written.store(10000, Ordering::Relaxed);
let snapshot = stats.snapshot();
assert_eq!(snapshot.in_flight(), 5);
assert!((snapshot.completion_rate() - 0.95).abs() < 0.001);
}
// Sequential writes return consecutive offsets and update stats.
#[test]
fn test_async_writer() {
let dir = tempdir().unwrap();
let path = dir.path().join("test.log");
let config = IoUringConfig::minimal();
let writer = AsyncWriter::new(&path, config).unwrap();
let offset = writer.write(b"hello").unwrap();
assert_eq!(offset, 0);
let offset = writer.write(b"world").unwrap();
assert_eq!(offset, 5);
writer.flush().unwrap();
let stats = writer.stats();
assert_eq!(stats.ops_completed, 2);
assert_eq!(stats.bytes_written, 10);
}
// Positional reads (bounded and exact) return the expected slices.
#[test]
fn test_async_reader() {
let dir = tempdir().unwrap();
let path = dir.path().join("test.log");
std::fs::write(&path, b"hello world test data").unwrap();
let config = IoUringConfig::minimal();
let reader = AsyncReader::open(&path, config).unwrap();
let mut buf = [0u8; 5];
let n = reader.read_at(0, &mut buf).unwrap();
assert_eq!(n, 5);
assert_eq!(&buf, b"hello");
let mut buf = [0u8; 5];
reader.read_exact_at(6, &mut buf).unwrap();
assert_eq!(&buf, b"world");
let stats = reader.stats();
assert_eq!(stats.ops_completed, 2);
}
// Basic queueing and clearing of batch operations.
#[test]
fn test_io_batch() {
let mut batch = IoBatch::new();
assert!(batch.is_empty());
batch.write(0, Bytes::from_static(b"hello"));
batch.read(100, 50);
batch.sync();
assert_eq!(batch.len(), 3);
assert!(!batch.is_empty());
batch.clear();
assert!(batch.is_empty());
}
// Direct (unbatched) WAL appends, with and without checksum framing.
#[test]
fn test_wal_writer() {
let dir = tempdir().unwrap();
let path = dir.path().join("wal.log");
let config = IoUringConfig::minimal();
let wal = WalWriter::new(&path, config).unwrap();
let offset = wal.append(b"entry1").unwrap();
assert_eq!(offset, 0);
let offset = wal.append_with_checksum(b"entry2").unwrap();
assert!(offset > 0);
wal.sync().unwrap();
assert!(wal.size() > 0);
}
// Bounded and exact reads from an on-disk segment.
#[test]
fn test_segment_reader() {
let dir = tempdir().unwrap();
let path = dir.path().join("segment.log");
std::fs::write(&path, b"message1message2message3").unwrap();
let config = IoUringConfig::minimal();
let reader = SegmentReader::open(&path, config).unwrap();
assert_eq!(reader.len(), 24);
assert!(!reader.is_empty());
let data = reader.read_messages(0, 100).unwrap();
assert_eq!(&data[..], b"message1message2message3");
let data = reader.read_range(8, 8).unwrap();
assert_eq!(&data[..], b"message2");
}
// Smoke test only: the result depends on the host kernel, so no assert.
#[test]
fn test_io_uring_availability() {
let available = is_io_uring_available();
println!("io_uring available: {}", available);
}
// Only write payloads count toward pending bytes; reads do not.
#[test]
fn test_io_batch_pending_write_bytes() {
let mut batch = IoBatch::new();
assert_eq!(batch.pending_write_bytes(), 0);
batch.write(0, Bytes::from_static(b"hello"));
batch.write(5, Bytes::from_static(b"world"));
batch.read(100, 50);
assert_eq!(batch.pending_write_bytes(), 10);
}
// drain() yields every queued op and leaves the batch empty.
#[test]
fn test_io_batch_drain() {
let mut batch = IoBatch::new();
batch.write(0, Bytes::from_static(b"hello"));
batch.sync();
let ops: Vec<_> = batch.drain().collect();
assert_eq!(ops.len(), 2);
assert!(batch.is_empty());
}
// Executor appends queued writes in FIFO order (offsets are ignored).
#[test]
fn test_batch_executor_write() {
let dir = tempdir().unwrap();
let path = dir.path().join("batch_write.log");
let config = IoUringConfig::minimal();
let writer = AsyncWriter::new(&path, config).unwrap();
let executor = BatchExecutor::for_writer(writer);
let mut batch = IoBatch::new();
batch.write(0, Bytes::from_static(b"hello"));
batch.write(5, Bytes::from_static(b"world"));
batch.sync();
let results = executor.execute(&mut batch).unwrap();
assert!(results.is_empty());
let contents = std::fs::read(&path).unwrap();
assert_eq!(&contents, b"helloworld");
}
// Executor returns one BatchReadResult per queued read, in order.
#[test]
fn test_batch_executor_read() {
let dir = tempdir().unwrap();
let path = dir.path().join("batch_read.log");
std::fs::write(&path, b"hello world test data").unwrap();
let config = IoUringConfig::minimal();
let reader = AsyncReader::open(&path, config).unwrap();
let executor = BatchExecutor::for_reader(reader);
let mut batch = IoBatch::new();
batch.read(0, 5);
batch.read(6, 5);
let results = executor.execute(&mut batch).unwrap();
assert_eq!(results.len(), 2);
assert_eq!(&results[0].data[..], b"hello");
assert_eq!(results[0].offset, 0);
assert_eq!(&results[1].data[..], b"world");
assert_eq!(results[1].offset, 6);
}
// Batched appends accumulate until an explicit flush.
#[test]
fn test_wal_writer_batched() {
let dir = tempdir().unwrap();
let path = dir.path().join("wal_batch.log");
let config = IoUringConfig::minimal();
let wal = WalWriter::new(&path, config).unwrap();
wal.append(b"direct").unwrap();
wal.append_batched(b"batch1").unwrap();
wal.append_batched(b"batch2").unwrap();
assert!(wal.has_pending_batch());
assert_eq!(wal.pending_batch_ops(), 2);
assert_eq!(wal.pending_batch_bytes(), 12);
wal.flush_batch().unwrap();
assert!(!wal.has_pending_batch());
assert_eq!(wal.pending_batch_bytes(), 0);
assert!(wal.size() >= 18); }
// sync() drains the pending batch before forcing data to disk.
#[test]
fn test_wal_writer_batched_checksum() {
let dir = tempdir().unwrap();
let path = dir.path().join("wal_batch_crc.log");
let config = IoUringConfig::minimal();
let wal = WalWriter::new(&path, config).unwrap();
wal.append_with_checksum_batched(b"data1").unwrap();
wal.append_with_checksum_batched(b"data2").unwrap();
assert!(wal.has_pending_batch());
wal.sync().unwrap();
assert!(!wal.has_pending_batch());
assert!(wal.size() > 0);
}
// Crossing max_batch_bytes triggers an automatic flush.
#[test]
fn test_wal_writer_auto_flush() {
let dir = tempdir().unwrap();
let path = dir.path().join("wal_auto_flush.log");
let mut config = IoUringConfig::minimal();
config.registered_buffers = 1;
config.buffer_size = 10;
let wal = WalWriter::new(&path, config).unwrap();
assert_eq!(wal.max_batch_bytes(), 10);
wal.append_batched(b"hello").unwrap(); assert!(wal.has_pending_batch());
wal.append_batched(b"world!").unwrap();
assert!(!wal.has_pending_batch()); }
// BatchStats aggregates per-kind op and byte counts.
#[test]
fn test_io_batch_stats() {
let mut batch = IoBatch::new();
let stats = batch.stats();
assert_eq!(stats.total_ops, 0);
assert_eq!(stats.write_ops, 0);
assert_eq!(stats.read_ops, 0);
assert_eq!(stats.sync_ops, 0);
batch.write(0, Bytes::from_static(b"hello"));
batch.write(5, Bytes::from_static(b"world"));
batch.read(100, 50);
batch.read(200, 100);
batch.sync();
let stats = batch.stats();
assert_eq!(stats.total_ops, 5);
assert_eq!(stats.write_ops, 2);
assert_eq!(stats.read_ops, 2);
assert_eq!(stats.sync_ops, 1);
assert_eq!(stats.write_bytes, 10); assert_eq!(stats.read_bytes, 150); }
// Pending-op counters are split by operation kind.
#[test]
fn test_io_batch_pending_ops() {
let mut batch = IoBatch::new();
batch.write(0, Bytes::from_static(b"data1"));
batch.write(5, Bytes::from_static(b"data2"));
batch.read(100, 50);
batch.sync();
assert_eq!(batch.pending_write_ops(), 2);
assert_eq!(batch.pending_read_ops(), 1);
}
}