// rivven_core/io_uring.rs
//! High-performance async I/O with optional io_uring backend
//!
//! This module provides storage I/O operations optimized for throughput:
//!
//! - **Batched operations**: Multiple I/O operations coalesced per syscall
//! - **Write-ahead log**: Production WAL with batch + direct write modes
//! - **Segment reader**: Efficient segment file reading with statistics
//!
//! On Linux kernel 5.6+, a future io_uring backend can replace the standard
//! file I/O with true kernel-async I/O for zero-syscall overhead. The current
//! implementation uses `parking_lot::Mutex<std::fs::File>` as a portable
//! fallback that works on all platforms.
//!
//! # Performance
//!
//! | Backend        | IOPS (4KB) | Latency p99 | Notes                  |
//! |----------------|------------|-------------|------------------------|
//! | Fallback (std) | ~100K      | ~2ms        | Portable, all platforms|
//! | io_uring       | ~800K      | ~0.3ms      | Linux 5.6+ (planned)   |
//!
//! # Example
//!
//! ```rust,ignore
//! use rivven_core::io_uring::{IoUringConfig, BlockingWriter};
//!
//! let config = IoUringConfig::default();
//! let writer = BlockingWriter::new("/data/segment.log", config)?;
//!
//! writer.write(b"entry data")?;
//! writer.flush()?;
//! ```
//!
//! # Feature Detection
//!
//! Use `is_io_uring_available()` to check runtime support. When true,
//! future versions will transparently use io_uring for registered operations.

use std::collections::VecDeque;
use std::fs::File;
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use bytes::{Bytes, BytesMut};
use parking_lot::Mutex;

// ============================================================================
// Configuration
// ============================================================================

/// Configuration for io_uring operations
///
/// Also consumed by the portable fallback ([`BlockingWriter`] / [`AsyncReader`]):
/// `registered_buffers * buffer_size` sets the WAL auto-flush threshold in
/// `PortableWalWriter::new`.
#[derive(Debug, Clone)]
pub struct IoUringConfig {
    /// Submission queue size (power of 2, default: 1024)
    pub sq_entries: u32,
    /// Enable kernel-side polling (SQPOLL)
    pub kernel_poll: bool,
    /// Kernel poll idle timeout (milliseconds)
    pub sq_poll_idle_ms: u32,
    /// Maximum in-flight operations
    pub max_inflight: usize,
    /// Direct I/O (bypasses page cache)
    pub direct_io: bool,
    /// Pre-registered buffers for zero-copy
    pub registered_buffers: usize,
    /// Buffer size for registered buffers
    pub buffer_size: usize,
}

71impl Default for IoUringConfig {
72    fn default() -> Self {
73        Self {
74            sq_entries: 1024,
75            kernel_poll: false, // Requires CAP_SYS_NICE
76            sq_poll_idle_ms: 10,
77            max_inflight: 256,
78            direct_io: false,
79            registered_buffers: 64,
80            buffer_size: 64 * 1024, // 64KB
81        }
82    }
83}
84
85impl IoUringConfig {
86    /// Configuration optimized for high-throughput writes
87    pub fn high_throughput() -> Self {
88        Self {
89            sq_entries: 4096,
90            kernel_poll: true,
91            sq_poll_idle_ms: 100,
92            max_inflight: 1024,
93            direct_io: true,
94            registered_buffers: 256,
95            buffer_size: 256 * 1024, // 256KB
96        }
97    }
98
99    /// Configuration optimized for low-latency
100    pub fn low_latency() -> Self {
101        Self {
102            sq_entries: 256,
103            kernel_poll: true,
104            sq_poll_idle_ms: 1,
105            max_inflight: 64,
106            direct_io: true,
107            registered_buffers: 32,
108            buffer_size: 16 * 1024, // 16KB
109        }
110    }
111
112    /// Configuration for resource-constrained environments
113    pub fn minimal() -> Self {
114        Self {
115            sq_entries: 128,
116            kernel_poll: false,
117            sq_poll_idle_ms: 0,
118            max_inflight: 32,
119            direct_io: false,
120            registered_buffers: 8,
121            buffer_size: 4 * 1024, // 4KB
122        }
123    }
124}
125
// ============================================================================
// io_uring Availability Detection
// ============================================================================

/// Check if io_uring is available on this system
///
/// Parses the running kernel version from `/proc/sys/kernel/osrelease`
/// (no libc call or subprocess needed) and reports `true` on Linux 5.6+.
/// Returns `false` on any read or parse failure rather than erroring.
#[cfg(target_os = "linux")]
pub fn is_io_uring_available() -> bool {
    let version = match std::fs::read_to_string("/proc/sys/kernel/osrelease") {
        Ok(v) => v,
        Err(_) => return false,
    };

    // osrelease looks like "5.15.0-78-generic". Keep only the leading digits
    // of each component so any suffix ("-generic", "-rc1", "+", ...) is
    // ignored — the previous code only handled a '-' suffix on the minor.
    let leading_digits = |s: &str| -> u32 {
        let end = s.find(|c: char| !c.is_ascii_digit()).unwrap_or(s.len());
        s[..end].parse().unwrap_or(0)
    };

    let mut parts = version.trim().split('.');
    let (major, minor) = match (parts.next(), parts.next()) {
        (Some(maj), Some(min)) => (leading_digits(maj), leading_digits(min)),
        // No "major.minor" structure at all — assume unavailable.
        _ => return false,
    };

    // io_uring available on Linux 5.6+
    major > 5 || (major == 5 && minor >= 6)
}

/// io_uring is Linux-only; always `false` on other platforms.
#[cfg(not(target_os = "linux"))]
pub fn is_io_uring_available() -> bool {
    false
}


// ============================================================================
// Statistics
// ============================================================================

/// io_uring operation statistics
///
/// All counters increase monotonically and are updated with
/// `Ordering::Relaxed`; use [`IoUringStats::snapshot`] to read them as
/// plain values (each field is loaded independently).
#[derive(Debug, Default)]
pub struct IoUringStats {
    /// Total operations submitted
    pub ops_submitted: AtomicU64,
    /// Total operations completed
    pub ops_completed: AtomicU64,
    /// Bytes written
    pub bytes_written: AtomicU64,
    /// Bytes read
    pub bytes_read: AtomicU64,
    /// CQE overflows (ring full)
    pub cqe_overflows: AtomicU64,
    /// SQ entries dropped
    pub sq_dropped: AtomicU64,
}

183impl IoUringStats {
184    /// Create a new statistics tracker
185    pub fn new() -> Self {
186        Self::default()
187    }
188
189    /// Get a snapshot of current statistics
190    pub fn snapshot(&self) -> IoUringStatsSnapshot {
191        IoUringStatsSnapshot {
192            ops_submitted: self.ops_submitted.load(Ordering::Relaxed),
193            ops_completed: self.ops_completed.load(Ordering::Relaxed),
194            bytes_written: self.bytes_written.load(Ordering::Relaxed),
195            bytes_read: self.bytes_read.load(Ordering::Relaxed),
196            cqe_overflows: self.cqe_overflows.load(Ordering::Relaxed),
197            sq_dropped: self.sq_dropped.load(Ordering::Relaxed),
198        }
199    }
200}
201
/// Snapshot of io_uring statistics
///
/// A plain-value copy of [`IoUringStats`] taken at one point in time.
#[derive(Debug, Clone)]
pub struct IoUringStatsSnapshot {
    /// Total operations submitted
    pub ops_submitted: u64,
    /// Total operations completed
    pub ops_completed: u64,
    /// Bytes written
    pub bytes_written: u64,
    /// Bytes read
    pub bytes_read: u64,
    /// CQE overflows (ring full)
    pub cqe_overflows: u64,
    /// SQ entries dropped
    pub sq_dropped: u64,
}

213impl IoUringStatsSnapshot {
214    /// Get number of in-flight operations
215    pub fn in_flight(&self) -> u64 {
216        self.ops_submitted.saturating_sub(self.ops_completed)
217    }
218
219    /// Get completion rate (0.0 to 1.0)
220    pub fn completion_rate(&self) -> f64 {
221        if self.ops_submitted == 0 {
222            1.0
223        } else {
224            self.ops_completed as f64 / self.ops_submitted as f64
225        }
226    }
227}

// ============================================================================
// Portable I/O Implementation
// ============================================================================

/// Blocking writer using mutex-guarded file I/O
///
/// This is the portable implementation using `parking_lot::Mutex<std::fs::File>`.
/// It provides the same API that a future io_uring backend would expose,
/// allowing transparent upgrades on Linux 5.6+.
///
/// # ⚠️ Blocking I/O
///
/// All methods (`write`, `flush`, `sync`) perform **synchronous** file I/O
/// under a blocking mutex. **Do not call from async contexts** (tokio worker
/// threads) — use `tokio::task::spawn_blocking` to wrap calls, or use the
/// async `AsyncFile` from `async_io.rs` instead.
pub struct BlockingWriter {
    /// File handle; the lock serializes all writes.
    file: Mutex<File>,
    /// Logical end-of-file offset, advanced on each successful write.
    offset: AtomicU64,
    /// Shared operation counters (also cloned into `BatchExecutor`).
    stats: Arc<IoUringStats>,
    #[allow(dead_code)] // Used in future io_uring implementation
    config: IoUringConfig,
}

253impl BlockingWriter {
254    /// Create a new blocking writer
255    pub fn new(path: impl AsRef<Path>, config: IoUringConfig) -> io::Result<Self> {
256        let file = std::fs::OpenOptions::new()
257            .create(true)
258            .append(true)
259            .open(path)?;
260
261        let offset = file.metadata()?.len();
262
263        Ok(Self {
264            file: Mutex::new(file),
265            offset: AtomicU64::new(offset),
266            stats: Arc::new(IoUringStats::new()),
267            config,
268        })
269    }
270
271    /// Open an existing file for writing
272    pub fn open(path: impl AsRef<Path>, config: IoUringConfig) -> io::Result<Self> {
273        let file = std::fs::OpenOptions::new().write(true).open(path)?;
274
275        let offset = file.metadata()?.len();
276
277        Ok(Self {
278            file: Mutex::new(file),
279            offset: AtomicU64::new(offset),
280            stats: Arc::new(IoUringStats::new()),
281            config,
282        })
283    }
284
285    /// Write data (queued for batching)
286    pub fn write(&self, data: &[u8]) -> io::Result<u64> {
287        let mut file = self.file.lock();
288        file.write_all(data)?;
289
290        let offset = self.offset.fetch_add(data.len() as u64, Ordering::AcqRel);
291        self.stats.ops_submitted.fetch_add(1, Ordering::Relaxed);
292        self.stats.ops_completed.fetch_add(1, Ordering::Relaxed);
293        self.stats
294            .bytes_written
295            .fetch_add(data.len() as u64, Ordering::Relaxed);
296
297        Ok(offset)
298    }
299
300    /// Flush pending writes
301    pub fn flush(&self) -> io::Result<()> {
302        let mut file = self.file.lock();
303        file.flush()?;
304        file.sync_data()
305    }
306
307    /// Sync data to disk (fdatasync)
308    pub fn sync(&self) -> io::Result<()> {
309        let file = self.file.lock();
310        file.sync_data()
311    }
312
313    /// Get current write offset
314    pub fn offset(&self) -> u64 {
315        self.offset.load(Ordering::Acquire)
316    }
317
318    /// Get statistics
319    pub fn stats(&self) -> IoUringStatsSnapshot {
320        self.stats.snapshot()
321    }
322}
323
/// Async-compatible reader using mutex-guarded file I/O
///
/// Portable implementation; same API as a future io_uring backend.
///
/// NOTE(review): despite the name, `read_at`/`read_exact_at` perform
/// synchronous seek+read under a blocking mutex — wrap calls in
/// `spawn_blocking` from async contexts, mirroring `BlockingWriter`'s caveat.
pub struct AsyncReader {
    /// File handle; the lock also keeps the seek+read pair atomic.
    file: Mutex<File>,
    /// Shared read counters.
    stats: Arc<IoUringStats>,
    #[allow(dead_code)]
    config: IoUringConfig,
}

334impl AsyncReader {
335    /// Open a file for reading
336    pub fn open(path: impl AsRef<Path>, config: IoUringConfig) -> io::Result<Self> {
337        let file = std::fs::File::open(path)?;
338
339        Ok(Self {
340            file: Mutex::new(file),
341            stats: Arc::new(IoUringStats::new()),
342            config,
343        })
344    }
345
346    /// Read data at the specified offset
347    pub fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<usize> {
348        let mut file = self.file.lock();
349        file.seek(SeekFrom::Start(offset))?;
350        let n = file.read(buf)?;
351
352        self.stats.ops_submitted.fetch_add(1, Ordering::Relaxed);
353        self.stats.ops_completed.fetch_add(1, Ordering::Relaxed);
354        self.stats.bytes_read.fetch_add(n as u64, Ordering::Relaxed);
355
356        Ok(n)
357    }
358
359    /// Read exact amount at offset
360    pub fn read_exact_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<()> {
361        let mut file = self.file.lock();
362        file.seek(SeekFrom::Start(offset))?;
363        file.read_exact(buf)?;
364
365        self.stats.ops_submitted.fetch_add(1, Ordering::Relaxed);
366        self.stats.ops_completed.fetch_add(1, Ordering::Relaxed);
367        self.stats
368            .bytes_read
369            .fetch_add(buf.len() as u64, Ordering::Relaxed);
370
371        Ok(())
372    }
373
374    /// Get statistics
375    pub fn stats(&self) -> IoUringStatsSnapshot {
376        self.stats.snapshot()
377    }
378}
379
// ============================================================================
// Batch Operations
// ============================================================================

/// Statistics for a batch of I/O operations.
///
/// Produced by [`IoBatch::stats`]; all counts describe queued (not yet
/// executed) operations.
#[derive(Debug, Clone, Default)]
pub struct BatchStats {
    /// Total operations in the batch
    pub total_ops: u64,
    /// Number of write operations
    pub write_ops: u64,
    /// Number of read operations
    pub read_ops: u64,
    /// Number of sync operations
    pub sync_ops: u64,
    /// Total bytes to be written
    pub write_bytes: u64,
    /// Total bytes to be read
    pub read_bytes: u64,
}

/// A batch of I/O operations for efficient submission
///
/// Operations are queued FIFO and executed in insertion order (see
/// `BatchExecutor::execute` and `PortableWalWriter::flush_batch`).
#[derive(Debug, Default)]
pub struct IoBatch {
    /// FIFO queue of pending operations.
    operations: VecDeque<IoOperation>,
}

/// A single I/O operation for batching
#[derive(Debug, Clone)]
pub enum IoOperation {
    /// Write data at an offset
    Write {
        /// Offset in the file (used for io_uring, ignored in fallback)
        offset: u64,
        /// Data to write
        data: Bytes,
    },
    /// Read data from an offset
    Read {
        /// Offset to read from
        offset: u64,
        /// Number of bytes to read
        len: usize,
    },
    /// Sync/fsync operation
    Sync,
}

428impl IoBatch {
429    /// Create a new empty batch
430    pub fn new() -> Self {
431        Self {
432            operations: VecDeque::new(),
433        }
434    }
435
436    /// Add a write operation
437    pub fn write(&mut self, offset: u64, data: impl Into<Bytes>) {
438        self.operations.push_back(IoOperation::Write {
439            offset,
440            data: data.into(),
441        });
442    }
443
444    /// Add a read operation
445    pub fn read(&mut self, offset: u64, len: usize) {
446        self.operations.push_back(IoOperation::Read { offset, len });
447    }
448
449    /// Add a sync operation
450    pub fn sync(&mut self) {
451        self.operations.push_back(IoOperation::Sync);
452    }
453
454    /// Get number of pending operations
455    pub fn len(&self) -> usize {
456        self.operations.len()
457    }
458
459    /// Check if batch is empty
460    pub fn is_empty(&self) -> bool {
461        self.operations.is_empty()
462    }
463
464    /// Clear the batch
465    pub fn clear(&mut self) {
466        self.operations.clear();
467    }
468
469    /// Drain operations from the batch for execution
470    pub fn drain(&mut self) -> impl Iterator<Item = IoOperation> + '_ {
471        self.operations.drain(..)
472    }
473
474    /// Get total bytes to be written in this batch
475    pub fn pending_write_bytes(&self) -> u64 {
476        self.operations
477            .iter()
478            .map(|op| match op {
479                IoOperation::Write { data, .. } => data.len() as u64,
480                _ => 0,
481            })
482            .sum()
483    }
484
485    /// Get batch statistics
486    pub fn stats(&self) -> BatchStats {
487        let mut stats = BatchStats::default();
488        for op in &self.operations {
489            stats.total_ops += 1;
490            match op {
491                IoOperation::Write { data, .. } => {
492                    stats.write_ops += 1;
493                    stats.write_bytes += data.len() as u64;
494                }
495                IoOperation::Read { len, .. } => {
496                    stats.read_ops += 1;
497                    stats.read_bytes += *len as u64;
498                }
499                IoOperation::Sync => {
500                    stats.sync_ops += 1;
501                }
502            }
503        }
504        stats
505    }
506
507    /// Get number of pending write operations
508    pub fn pending_write_ops(&self) -> usize {
509        self.operations
510            .iter()
511            .filter(|op| matches!(op, IoOperation::Write { .. }))
512            .count()
513    }
514
515    /// Get number of pending read operations
516    pub fn pending_read_ops(&self) -> usize {
517        self.operations
518            .iter()
519            .filter(|op| matches!(op, IoOperation::Read { .. }))
520            .count()
521    }
522}
523
/// Result of a batch read operation
///
/// `data` is truncated to the bytes actually read, so it may be shorter
/// than the requested length when the read hit end-of-file.
#[derive(Debug, Clone)]
pub struct BatchReadResult {
    /// Offset where data was read from
    pub offset: u64,
    /// The data that was read
    pub data: BytesMut,
}

// ============================================================================
// Batch Executor (Portable Implementation)
// ============================================================================

/// Executes batched I/O operations
///
/// Uses sequential file I/O on all platforms. On Linux 5.6+, a future
/// io_uring backend would submit all operations via the kernel submission
/// queue for true async batched I/O.
pub struct BatchExecutor {
    /// Write target; `None` makes Write ops error and Sync ops no-ops.
    writer: Option<BlockingWriter>,
    /// Read source; `None` makes Read ops error.
    reader: Option<AsyncReader>,
    /// Counters shared with the underlying writer/reader.
    stats: Arc<IoUringStats>,
}

548impl BatchExecutor {
549    /// Create a new batch executor for writing
550    pub fn for_writer(writer: BlockingWriter) -> Self {
551        Self {
552            stats: writer.stats.clone(),
553            writer: Some(writer),
554            reader: None,
555        }
556    }
557
558    /// Create a new batch executor for reading
559    pub fn for_reader(reader: AsyncReader) -> Self {
560        Self {
561            stats: reader.stats.clone(),
562            writer: None,
563            reader: Some(reader),
564        }
565    }
566
567    /// Execute all operations in a batch
568    ///
569    /// Returns a vector of read results (for read operations) or errors
570    pub fn execute(&self, batch: &mut IoBatch) -> io::Result<Vec<BatchReadResult>> {
571        let mut read_results = Vec::new();
572
573        for op in batch.drain() {
574            match op {
575                IoOperation::Write { offset: _, data } => {
576                    if let Some(ref writer) = self.writer {
577                        writer.write(&data)?;
578                    } else {
579                        return Err(io::Error::new(
580                            io::ErrorKind::InvalidInput,
581                            "No writer configured for batch executor",
582                        ));
583                    }
584                }
585                IoOperation::Read { offset, len } => {
586                    if let Some(ref reader) = self.reader {
587                        let mut buf = BytesMut::zeroed(len);
588                        let n = reader.read_at(offset, &mut buf)?;
589                        buf.truncate(n);
590                        read_results.push(BatchReadResult { offset, data: buf });
591                    } else {
592                        return Err(io::Error::new(
593                            io::ErrorKind::InvalidInput,
594                            "No reader configured for batch executor",
595                        ));
596                    }
597                }
598                IoOperation::Sync => {
599                    if let Some(ref writer) = self.writer {
600                        writer.sync()?;
601                    }
602                }
603            }
604        }
605
606        Ok(read_results)
607    }
608
609    /// Get statistics
610    pub fn stats(&self) -> IoUringStatsSnapshot {
611        self.stats.snapshot()
612    }
613}
614
// ============================================================================
// Write-Ahead Log Optimization
// ============================================================================

/// Optimized WAL writer using io_uring or fallback
///
/// This writer supports both direct writes and batched operations.
/// Batched writes can improve throughput by reducing syscall overhead.
///
/// # Batching Mode
///
/// When using batched mode:
/// 1. Writes are queued in a batch
/// 2. When batch reaches max_batch_bytes or flush_batch() is called, all writes execute
/// 3. Sync is deferred until batch execution
///
/// # Example
///
/// ```ignore
/// let wal = PortableWalWriter::new("wal.log", IoUringConfig::default())?;
///
/// // Direct write (immediate)
/// wal.append(b"entry1")?;
///
/// // Batched write (queued)
/// wal.append_batched(b"entry2")?;
/// wal.append_batched(b"entry3")?;
/// wal.flush_batch()?; // Execute all batched writes
/// ```
pub struct PortableWalWriter {
    /// Underlying append-only writer.
    writer: BlockingWriter,
    /// Queue of batched (not-yet-written) entries.
    batch: Mutex<IoBatch>,
    /// Bytes currently queued in `batch`, tracked separately for lock-free reads.
    pending_bytes: AtomicU64,
    /// Auto-flush threshold: `registered_buffers * buffer_size` from the config.
    max_batch_bytes: u64,
}

651impl PortableWalWriter {
652    /// Create a new WAL writer
653    pub fn new(path: impl AsRef<Path>, config: IoUringConfig) -> io::Result<Self> {
654        let max_batch_bytes = (config.registered_buffers * config.buffer_size) as u64;
655
656        Ok(Self {
657            writer: BlockingWriter::new(path, config)?,
658            batch: Mutex::new(IoBatch::new()),
659            pending_bytes: AtomicU64::new(0),
660            max_batch_bytes,
661        })
662    }
663
664    /// Append data to the WAL (direct write, immediate)
665    pub fn append(&self, data: &[u8]) -> io::Result<u64> {
666        self.writer.write(data)
667    }
668
669    /// Append data to the WAL in batched mode
670    ///
671    /// The write is queued and executed when:
672    /// - `flush_batch()` is called
673    /// - The batch exceeds `max_batch_bytes`
674    ///
675    /// Returns the number of pending bytes in the batch
676    pub fn append_batched(&self, data: &[u8]) -> io::Result<u64> {
677        let data_len = data.len() as u64;
678        let offset = self.writer.offset();
679
680        {
681            let mut batch = self.batch.lock();
682            batch.write(offset, Bytes::copy_from_slice(data));
683        }
684
685        let pending = self.pending_bytes.fetch_add(data_len, Ordering::AcqRel) + data_len;
686
687        // Auto-flush if we've exceeded the batch threshold
688        if pending >= self.max_batch_bytes {
689            self.flush_batch()?;
690        }
691
692        Ok(pending)
693    }
694
695    /// Flush all batched writes to disk
696    ///
697    /// Executes all pending write operations in the batch and syncs to disk.
698    pub fn flush_batch(&self) -> io::Result<()> {
699        let mut batch = self.batch.lock();
700
701        if batch.is_empty() {
702            return Ok(());
703        }
704
705        // Execute all operations
706        for op in batch.drain() {
707            match op {
708                IoOperation::Write { data, .. } => {
709                    self.writer.write(&data)?;
710                }
711                IoOperation::Sync => {
712                    self.writer.sync()?;
713                }
714                IoOperation::Read { .. } => {
715                    // WAL writer doesn't support reads in batch
716                }
717            }
718        }
719
720        self.pending_bytes.store(0, Ordering::Release);
721        self.writer.flush()?;
722        self.writer.sync()
723    }
724
725    /// Get the number of pending bytes in the batch
726    pub fn pending_batch_bytes(&self) -> u64 {
727        self.pending_bytes.load(Ordering::Acquire)
728    }
729
730    /// Get the number of pending operations in the batch
731    pub fn pending_batch_ops(&self) -> usize {
732        self.batch.lock().len()
733    }
734
735    /// Check if there are pending batched writes
736    pub fn has_pending_batch(&self) -> bool {
737        !self.batch.lock().is_empty()
738    }
739
740    /// Append with CRC32 checksum
741    pub fn append_with_checksum(&self, data: &[u8]) -> io::Result<u64> {
742        let checksum = crc32fast::hash(data);
743
744        let mut buf = Vec::with_capacity(4 + data.len() + 4);
745        buf.extend_from_slice(&(data.len() as u32).to_be_bytes());
746        buf.extend_from_slice(data);
747        buf.extend_from_slice(&checksum.to_be_bytes());
748
749        self.writer.write(&buf)
750    }
751
752    /// Append with CRC32 checksum in batched mode
753    pub fn append_with_checksum_batched(&self, data: &[u8]) -> io::Result<u64> {
754        let checksum = crc32fast::hash(data);
755
756        let mut buf = Vec::with_capacity(4 + data.len() + 4);
757        buf.extend_from_slice(&(data.len() as u32).to_be_bytes());
758        buf.extend_from_slice(data);
759        buf.extend_from_slice(&checksum.to_be_bytes());
760
761        self.append_batched(&buf)
762    }
763
764    /// Flush and sync the WAL
765    pub fn sync(&self) -> io::Result<()> {
766        // First flush any pending batch
767        self.flush_batch()?;
768        self.writer.flush()?;
769        self.writer.sync()
770    }
771
772    /// Get current WAL size
773    pub fn size(&self) -> u64 {
774        self.writer.offset()
775    }
776
777    /// Get max batch bytes threshold
778    pub fn max_batch_bytes(&self) -> u64 {
779        self.max_batch_bytes
780    }
781
782    /// Get statistics
783    pub fn stats(&self) -> IoUringStatsSnapshot {
784        self.writer.stats()
785    }
786}
787
// ============================================================================
// Segment File I/O
// ============================================================================

/// Optimized segment reader
pub struct SegmentReader {
    /// Underlying positional reader.
    reader: AsyncReader,
    /// File length captured at open time (not refreshed afterwards).
    length: u64,
}

798impl SegmentReader {
799    /// Open a segment file for reading
800    pub fn open(path: impl AsRef<Path>, config: IoUringConfig) -> io::Result<Self> {
801        let metadata = std::fs::metadata(&path)?;
802        let length = metadata.len();
803
804        Ok(Self {
805            reader: AsyncReader::open(path, config)?,
806            length,
807        })
808    }
809
810    /// Read messages starting at offset
811    pub fn read_messages(&self, offset: u64, max_bytes: usize) -> io::Result<BytesMut> {
812        let mut buf = BytesMut::zeroed(max_bytes);
813        let n = self.reader.read_at(offset, &mut buf)?;
814        buf.truncate(n);
815        Ok(buf)
816    }
817
818    /// Read a specific range
819    pub fn read_range(&self, offset: u64, len: usize) -> io::Result<BytesMut> {
820        let mut buf = BytesMut::zeroed(len);
821        self.reader.read_exact_at(offset, &mut buf)?;
822        Ok(buf)
823    }
824
825    /// Get segment length
826    pub fn len(&self) -> u64 {
827        self.length
828    }
829
830    /// Check if segment is empty
831    pub fn is_empty(&self) -> bool {
832        self.length == 0
833    }
834
835    /// Get statistics
836    pub fn stats(&self) -> IoUringStatsSnapshot {
837        self.reader.stats()
838    }
839}
840
// ============================================================================
// Tests
// ============================================================================

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

850    #[test]
851    fn test_config_defaults() {
852        let config = IoUringConfig::default();
853        assert_eq!(config.sq_entries, 1024);
854        assert!(!config.kernel_poll);
855        assert_eq!(config.max_inflight, 256);
856    }
857
858    #[test]
859    fn test_config_high_throughput() {
860        let config = IoUringConfig::high_throughput();
861        assert_eq!(config.sq_entries, 4096);
862        assert!(config.kernel_poll);
863        assert!(config.direct_io);
864    }
865
866    #[test]
867    fn test_config_low_latency() {
868        let config = IoUringConfig::low_latency();
869        assert_eq!(config.sq_entries, 256);
870        assert!(config.kernel_poll);
871    }
872
873    #[test]
874    fn test_stats_snapshot() {
875        let stats = IoUringStats::new();
876        stats.ops_submitted.store(100, Ordering::Relaxed);
877        stats.ops_completed.store(95, Ordering::Relaxed);
878        stats.bytes_written.store(10000, Ordering::Relaxed);
879
880        let snapshot = stats.snapshot();
881        assert_eq!(snapshot.in_flight(), 5);
882        assert!((snapshot.completion_rate() - 0.95).abs() < 0.001);
883    }
884
885    #[test]
886    fn test_async_writer() {
887        let dir = tempdir().unwrap();
888        let path = dir.path().join("test.log");
889
890        let config = IoUringConfig::minimal();
891        let writer = BlockingWriter::new(&path, config).unwrap();
892
893        let offset = writer.write(b"hello").unwrap();
894        assert_eq!(offset, 0);
895
896        let offset = writer.write(b"world").unwrap();
897        assert_eq!(offset, 5);
898
899        writer.flush().unwrap();
900
901        let stats = writer.stats();
902        assert_eq!(stats.ops_completed, 2);
903        assert_eq!(stats.bytes_written, 10);
904    }
905
906    #[test]
907    fn test_async_reader() {
908        let dir = tempdir().unwrap();
909        let path = dir.path().join("test.log");
910
911        // Write some data first
912        std::fs::write(&path, b"hello world test data").unwrap();
913
914        let config = IoUringConfig::minimal();
915        let reader = AsyncReader::open(&path, config).unwrap();
916
917        let mut buf = [0u8; 5];
918        let n = reader.read_at(0, &mut buf).unwrap();
919        assert_eq!(n, 5);
920        assert_eq!(&buf, b"hello");
921
922        let mut buf = [0u8; 5];
923        reader.read_exact_at(6, &mut buf).unwrap();
924        assert_eq!(&buf, b"world");
925
926        let stats = reader.stats();
927        assert_eq!(stats.ops_completed, 2);
928    }
929
930    #[test]
931    fn test_io_batch() {
932        let mut batch = IoBatch::new();
933        assert!(batch.is_empty());
934
935        batch.write(0, Bytes::from_static(b"hello"));
936        batch.read(100, 50);
937        batch.sync();
938
939        assert_eq!(batch.len(), 3);
940        assert!(!batch.is_empty());
941
942        batch.clear();
943        assert!(batch.is_empty());
944    }
945
946    #[test]
947    fn test_wal_writer() {
948        let dir = tempdir().unwrap();
949        let path = dir.path().join("wal.log");
950
951        let config = IoUringConfig::minimal();
952        let wal = PortableWalWriter::new(&path, config).unwrap();
953
954        let offset = wal.append(b"entry1").unwrap();
955        assert_eq!(offset, 0);
956
957        let offset = wal.append_with_checksum(b"entry2").unwrap();
958        assert!(offset > 0);
959
960        wal.sync().unwrap();
961        assert!(wal.size() > 0);
962    }
963
964    #[test]
965    fn test_segment_reader() {
966        let dir = tempdir().unwrap();
967        let path = dir.path().join("segment.log");
968
969        std::fs::write(&path, b"message1message2message3").unwrap();
970
971        let config = IoUringConfig::minimal();
972        let reader = SegmentReader::open(&path, config).unwrap();
973
974        assert_eq!(reader.len(), 24);
975        assert!(!reader.is_empty());
976
977        let data = reader.read_messages(0, 100).unwrap();
978        assert_eq!(&data[..], b"message1message2message3");
979
980        let data = reader.read_range(8, 8).unwrap();
981        assert_eq!(&data[..], b"message2");
982    }
983
984    #[test]
985    fn test_io_uring_availability() {
986        // This just checks the function runs without panicking
987        let available = is_io_uring_available();
988        println!("io_uring available: {}", available);
989    }
990
991    // ========================================================================
992    // Batch Operation Tests
993    // ========================================================================
994
995    #[test]
996    fn test_io_batch_pending_write_bytes() {
997        let mut batch = IoBatch::new();
998        assert_eq!(batch.pending_write_bytes(), 0);
999
1000        batch.write(0, Bytes::from_static(b"hello"));
1001        batch.write(5, Bytes::from_static(b"world"));
1002        batch.read(100, 50); // Read doesn't contribute to write bytes
1003
1004        assert_eq!(batch.pending_write_bytes(), 10);
1005    }
1006
1007    #[test]
1008    fn test_io_batch_drain() {
1009        let mut batch = IoBatch::new();
1010        batch.write(0, Bytes::from_static(b"hello"));
1011        batch.sync();
1012
1013        let ops: Vec<_> = batch.drain().collect();
1014        assert_eq!(ops.len(), 2);
1015        assert!(batch.is_empty());
1016    }
1017
1018    #[test]
1019    fn test_batch_executor_write() {
1020        let dir = tempdir().unwrap();
1021        let path = dir.path().join("batch_write.log");
1022
1023        let config = IoUringConfig::minimal();
1024        let writer = BlockingWriter::new(&path, config).unwrap();
1025        let executor = BatchExecutor::for_writer(writer);
1026
1027        let mut batch = IoBatch::new();
1028        batch.write(0, Bytes::from_static(b"hello"));
1029        batch.write(5, Bytes::from_static(b"world"));
1030        batch.sync();
1031
1032        let results = executor.execute(&mut batch).unwrap();
1033        assert!(results.is_empty()); // No read results for write operations
1034
1035        // Verify file contents
1036        let contents = std::fs::read(&path).unwrap();
1037        assert_eq!(&contents, b"helloworld");
1038    }
1039
1040    #[test]
1041    fn test_batch_executor_read() {
1042        let dir = tempdir().unwrap();
1043        let path = dir.path().join("batch_read.log");
1044        std::fs::write(&path, b"hello world test data").unwrap();
1045
1046        let config = IoUringConfig::minimal();
1047        let reader = AsyncReader::open(&path, config).unwrap();
1048        let executor = BatchExecutor::for_reader(reader);
1049
1050        let mut batch = IoBatch::new();
1051        batch.read(0, 5);
1052        batch.read(6, 5);
1053
1054        let results = executor.execute(&mut batch).unwrap();
1055        assert_eq!(results.len(), 2);
1056        assert_eq!(&results[0].data[..], b"hello");
1057        assert_eq!(results[0].offset, 0);
1058        assert_eq!(&results[1].data[..], b"world");
1059        assert_eq!(results[1].offset, 6);
1060    }
1061
1062    #[test]
1063    fn test_wal_writer_batched() {
1064        let dir = tempdir().unwrap();
1065        let path = dir.path().join("wal_batch.log");
1066
1067        let config = IoUringConfig::minimal();
1068        let wal = PortableWalWriter::new(&path, config).unwrap();
1069
1070        // Direct write
1071        wal.append(b"direct").unwrap();
1072
1073        // Batched writes
1074        wal.append_batched(b"batch1").unwrap();
1075        wal.append_batched(b"batch2").unwrap();
1076
1077        assert!(wal.has_pending_batch());
1078        assert_eq!(wal.pending_batch_ops(), 2);
1079        assert_eq!(wal.pending_batch_bytes(), 12);
1080
1081        // Flush the batch
1082        wal.flush_batch().unwrap();
1083
1084        assert!(!wal.has_pending_batch());
1085        assert_eq!(wal.pending_batch_bytes(), 0);
1086
1087        // Verify file size includes all writes
1088        assert!(wal.size() >= 18); // "direct" + "batch1" + "batch2"
1089    }
1090
1091    #[test]
1092    fn test_wal_writer_batched_checksum() {
1093        let dir = tempdir().unwrap();
1094        let path = dir.path().join("wal_batch_crc.log");
1095
1096        let config = IoUringConfig::minimal();
1097        let wal = PortableWalWriter::new(&path, config).unwrap();
1098
1099        // Batched write with checksum
1100        wal.append_with_checksum_batched(b"data1").unwrap();
1101        wal.append_with_checksum_batched(b"data2").unwrap();
1102
1103        assert!(wal.has_pending_batch());
1104        wal.sync().unwrap(); // sync() calls flush_batch()
1105
1106        assert!(!wal.has_pending_batch());
1107        assert!(wal.size() > 0);
1108    }
1109
1110    #[test]
1111    fn test_wal_writer_auto_flush() {
1112        let dir = tempdir().unwrap();
1113        let path = dir.path().join("wal_auto_flush.log");
1114
1115        // Create a config with a very small batch threshold
1116        let mut config = IoUringConfig::minimal();
1117        config.registered_buffers = 1;
1118        config.buffer_size = 10; // 10 bytes max batch
1119
1120        let wal = PortableWalWriter::new(&path, config).unwrap();
1121        assert_eq!(wal.max_batch_bytes(), 10);
1122
1123        // First batch should not auto-flush
1124        wal.append_batched(b"hello").unwrap(); // 5 bytes
1125        assert!(wal.has_pending_batch());
1126
1127        // Second batch should trigger auto-flush (5 + 6 = 11 > 10)
1128        wal.append_batched(b"world!").unwrap();
1129        assert!(!wal.has_pending_batch()); // Auto-flushed
1130    }
1131
1132    #[test]
1133    fn test_io_batch_stats() {
1134        let mut batch = IoBatch::new();
1135
1136        // Empty batch
1137        let stats = batch.stats();
1138        assert_eq!(stats.total_ops, 0);
1139        assert_eq!(stats.write_ops, 0);
1140        assert_eq!(stats.read_ops, 0);
1141        assert_eq!(stats.sync_ops, 0);
1142
1143        // Add mixed operations
1144        batch.write(0, Bytes::from_static(b"hello"));
1145        batch.write(5, Bytes::from_static(b"world"));
1146        batch.read(100, 50);
1147        batch.read(200, 100);
1148        batch.sync();
1149
1150        let stats = batch.stats();
1151        assert_eq!(stats.total_ops, 5);
1152        assert_eq!(stats.write_ops, 2);
1153        assert_eq!(stats.read_ops, 2);
1154        assert_eq!(stats.sync_ops, 1);
1155        assert_eq!(stats.write_bytes, 10); // "hello" + "world"
1156        assert_eq!(stats.read_bytes, 150); // 50 + 100
1157    }
1158
1159    #[test]
1160    fn test_io_batch_pending_ops() {
1161        let mut batch = IoBatch::new();
1162
1163        batch.write(0, Bytes::from_static(b"data1"));
1164        batch.write(5, Bytes::from_static(b"data2"));
1165        batch.read(100, 50);
1166        batch.sync();
1167
1168        assert_eq!(batch.pending_write_ops(), 2);
1169        assert_eq!(batch.pending_read_ops(), 1);
1170    }
1171}