// rivven_core/io_uring.rs
1//! High-performance async I/O with optional io_uring backend
2//!
3//! This module provides storage I/O operations optimized for throughput:
4//!
5//! - **Batched operations**: Multiple I/O operations coalesced per syscall
6//! - **Write-ahead log**: Production WAL with batch + direct write modes
7//! - **Segment reader**: Efficient segment file reading with statistics
8//!
9//! On Linux kernel 5.6+, a future io_uring backend can replace the standard
10//! file I/O with true kernel-async I/O for zero-syscall overhead. The current
11//! implementation uses `parking_lot::Mutex<std::fs::File>` as a portable
12//! fallback that works on all platforms.
13//!
14//! # Performance
15//!
16//! | Backend        | IOPS (4KB) | Latency p99 | Notes                  |
17//! |----------------|------------|-------------|------------------------|
18//! | Fallback (std) | ~100K      | ~2ms        | Portable, all platforms|
19//! | io_uring       | ~800K      | ~0.3ms      | Linux 5.6+ (planned)   |
20//!
21//! # Example
22//!
23//! ```rust,ignore
24//! use rivven_core::io_uring::{IoUringConfig, AsyncWriter};
25//!
26//! let config = IoUringConfig::default();
27//! let writer = AsyncWriter::new("/data/segment.log", config)?;
28//!
29//! writer.write(b"entry data")?;
30//! writer.flush()?;
31//! ```
32//!
33//! # Feature Detection
34//!
35//! Use `is_io_uring_available()` to check runtime support. When true,
36//! future versions will transparently use io_uring for registered operations.
37
38use std::collections::VecDeque;
39use std::fs::File;
40use std::io::{self, Read, Seek, SeekFrom, Write};
41use std::path::Path;
42use std::sync::atomic::{AtomicU64, Ordering};
43use std::sync::Arc;
44
45use bytes::{Bytes, BytesMut};
46use parking_lot::Mutex;
47
48// ============================================================================
49// Configuration
50// ============================================================================
51
/// Tuning parameters for the I/O backend.
///
/// On Linux 5.6+ these map onto io_uring setup parameters; the portable
/// fallback consumes a subset (e.g. `registered_buffers * buffer_size`
/// sizes the WAL auto-flush threshold).
#[derive(Debug, Clone)]
pub struct IoUringConfig {
    /// Submission queue size (power of 2, default: 1024)
    pub sq_entries: u32,
    /// Enable kernel-side polling (SQPOLL)
    pub kernel_poll: bool,
    /// Kernel poll idle timeout (milliseconds)
    pub sq_poll_idle_ms: u32,
    /// Maximum in-flight operations
    pub max_inflight: usize,
    /// Direct I/O (bypasses page cache)
    pub direct_io: bool,
    /// Pre-registered buffers for zero-copy
    pub registered_buffers: usize,
    /// Buffer size for registered buffers
    pub buffer_size: usize,
}

impl Default for IoUringConfig {
    /// Balanced defaults suitable for general-purpose workloads.
    fn default() -> Self {
        Self {
            sq_entries: 1024,
            kernel_poll: false, // SQPOLL requires CAP_SYS_NICE
            sq_poll_idle_ms: 10,
            max_inflight: 256,
            direct_io: false,
            registered_buffers: 64,
            buffer_size: 64 * 1024, // 64KB
        }
    }
}

impl IoUringConfig {
    /// Preset tuned for maximum write throughput: large queues, big
    /// buffers, kernel polling, and direct I/O.
    pub fn high_throughput() -> Self {
        Self {
            sq_entries: 4096,
            kernel_poll: true,
            sq_poll_idle_ms: 100,
            max_inflight: 1024,
            direct_io: true,
            registered_buffers: 256,
            buffer_size: 256 * 1024, // 256KB
        }
    }

    /// Preset tuned for low latency: small queues and buffers with an
    /// aggressive (1ms) kernel-poll idle timeout.
    pub fn low_latency() -> Self {
        Self {
            sq_entries: 256,
            kernel_poll: true,
            sq_poll_idle_ms: 1,
            max_inflight: 64,
            direct_io: true,
            registered_buffers: 32,
            buffer_size: 16 * 1024, // 16KB
        }
    }

    /// Preset for resource-constrained environments; only the sizing
    /// knobs differ from `default()` (polling and direct I/O stay off).
    pub fn minimal() -> Self {
        Self {
            sq_entries: 128,
            sq_poll_idle_ms: 0,
            max_inflight: 32,
            registered_buffers: 8,
            buffer_size: 4 * 1024, // 4KB
            ..Self::default()
        }
    }
}
125
126// ============================================================================
127// io_uring Availability Detection
128// ============================================================================
129
/// Check if io_uring is available on this system
///
/// Parses the running kernel version out of `/proc/sys/kernel/osrelease`
/// (no libc call or subprocess needed) and reports `true` on Linux 5.6+.
/// Any unreadable or unparseable release string yields `false`.
#[cfg(target_os = "linux")]
pub fn is_io_uring_available() -> bool {
    let release = match std::fs::read_to_string("/proc/sys/kernel/osrelease") {
        Ok(s) => s,
        Err(_) => return false,
    };

    let mut fields = release.trim().split('.');

    // Require both a major and a minor component ("5.15.0-generic" style).
    let major: u32 = match fields.next() {
        Some(m) => m.parse().unwrap_or(0),
        None => return false,
    };
    let minor: u32 = match fields.next() {
        // Strip distro suffixes such as "15-generic" before parsing.
        Some(m) => m.split('-').next().unwrap_or("0").parse().unwrap_or(0),
        None => return false,
    };

    // io_uring support threshold: Linux 5.6+
    major > 5 || (major == 5 && minor >= 6)
}
156
/// Check if io_uring is available on this system
///
/// io_uring is a Linux-only kernel interface, so this always returns
/// `false` on non-Linux targets.
#[cfg(not(target_os = "linux"))]
pub fn is_io_uring_available() -> bool {
    false
}
161
162// ============================================================================
163// Statistics
164// ============================================================================
165
/// io_uring operation statistics
///
/// All counters use relaxed atomics: they are monotonic telemetry, not
/// synchronization points.
#[derive(Debug, Default)]
pub struct IoUringStats {
    /// Total operations submitted
    pub ops_submitted: AtomicU64,
    /// Total operations completed
    pub ops_completed: AtomicU64,
    /// Bytes written
    pub bytes_written: AtomicU64,
    /// Bytes read
    pub bytes_read: AtomicU64,
    /// CQE overflows (ring full)
    pub cqe_overflows: AtomicU64,
    /// SQ entries dropped
    pub sq_dropped: AtomicU64,
}

impl IoUringStats {
    /// Create a statistics tracker with every counter at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Capture a point-in-time copy of every counter.
    ///
    /// Each load is relaxed, so counters read while updates are in flight
    /// may be mutually slightly out of sync — fine for telemetry.
    pub fn snapshot(&self) -> IoUringStatsSnapshot {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
        IoUringStatsSnapshot {
            ops_submitted: read(&self.ops_submitted),
            ops_completed: read(&self.ops_completed),
            bytes_written: read(&self.bytes_written),
            bytes_read: read(&self.bytes_read),
            cqe_overflows: read(&self.cqe_overflows),
            sq_dropped: read(&self.sq_dropped),
        }
    }
}

/// Snapshot of io_uring statistics
#[derive(Debug, Clone)]
pub struct IoUringStatsSnapshot {
    pub ops_submitted: u64,
    pub ops_completed: u64,
    pub bytes_written: u64,
    pub bytes_read: u64,
    pub cqe_overflows: u64,
    pub sq_dropped: u64,
}

impl IoUringStatsSnapshot {
    /// Operations submitted but not yet completed (never underflows).
    pub fn in_flight(&self) -> u64 {
        self.ops_submitted.saturating_sub(self.ops_completed)
    }

    /// Fraction of submitted operations that completed (0.0 to 1.0);
    /// an idle tracker (nothing submitted) counts as fully complete.
    pub fn completion_rate(&self) -> f64 {
        match self.ops_submitted {
            0 => 1.0,
            submitted => self.ops_completed as f64 / submitted as f64,
        }
    }
}
228
229// ============================================================================
230// Portable I/O Implementation
231// ============================================================================
232
233/// Async-compatible writer using mutex-guarded file I/O
234///
235/// This is the portable implementation using `parking_lot::Mutex<std::fs::File>`.
236/// It provides the same API that a future io_uring backend would expose,
237/// allowing transparent upgrades on Linux 5.6+.
238pub struct AsyncWriter {
239    file: Mutex<File>,
240    offset: AtomicU64,
241    stats: Arc<IoUringStats>,
242    #[allow(dead_code)] // Used in future io_uring implementation
243    config: IoUringConfig,
244}
245
246impl AsyncWriter {
247    /// Create a new async writer
248    pub fn new(path: impl AsRef<Path>, config: IoUringConfig) -> io::Result<Self> {
249        let file = std::fs::OpenOptions::new()
250            .create(true)
251            .append(true)
252            .open(path)?;
253
254        let offset = file.metadata()?.len();
255
256        Ok(Self {
257            file: Mutex::new(file),
258            offset: AtomicU64::new(offset),
259            stats: Arc::new(IoUringStats::new()),
260            config,
261        })
262    }
263
264    /// Open an existing file for writing
265    pub fn open(path: impl AsRef<Path>, config: IoUringConfig) -> io::Result<Self> {
266        let file = std::fs::OpenOptions::new().write(true).open(path)?;
267
268        let offset = file.metadata()?.len();
269
270        Ok(Self {
271            file: Mutex::new(file),
272            offset: AtomicU64::new(offset),
273            stats: Arc::new(IoUringStats::new()),
274            config,
275        })
276    }
277
278    /// Write data (queued for batching)
279    pub fn write(&self, data: &[u8]) -> io::Result<u64> {
280        let mut file = self.file.lock();
281        file.write_all(data)?;
282
283        let offset = self.offset.fetch_add(data.len() as u64, Ordering::AcqRel);
284        self.stats.ops_submitted.fetch_add(1, Ordering::Relaxed);
285        self.stats.ops_completed.fetch_add(1, Ordering::Relaxed);
286        self.stats
287            .bytes_written
288            .fetch_add(data.len() as u64, Ordering::Relaxed);
289
290        Ok(offset)
291    }
292
293    /// Flush pending writes
294    pub fn flush(&self) -> io::Result<()> {
295        let mut file = self.file.lock();
296        file.flush()?;
297        file.sync_data()
298    }
299
300    /// Sync data to disk (fdatasync)
301    pub fn sync(&self) -> io::Result<()> {
302        let file = self.file.lock();
303        file.sync_data()
304    }
305
306    /// Get current write offset
307    pub fn offset(&self) -> u64 {
308        self.offset.load(Ordering::Acquire)
309    }
310
311    /// Get statistics
312    pub fn stats(&self) -> IoUringStatsSnapshot {
313        self.stats.snapshot()
314    }
315}
316
317/// Async-compatible reader using mutex-guarded file I/O
318///
319/// Portable implementation; same API as a future io_uring backend.
320pub struct AsyncReader {
321    file: Mutex<File>,
322    stats: Arc<IoUringStats>,
323    #[allow(dead_code)]
324    config: IoUringConfig,
325}
326
327impl AsyncReader {
328    /// Open a file for reading
329    pub fn open(path: impl AsRef<Path>, config: IoUringConfig) -> io::Result<Self> {
330        let file = std::fs::File::open(path)?;
331
332        Ok(Self {
333            file: Mutex::new(file),
334            stats: Arc::new(IoUringStats::new()),
335            config,
336        })
337    }
338
339    /// Read data at the specified offset
340    pub fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<usize> {
341        let mut file = self.file.lock();
342        file.seek(SeekFrom::Start(offset))?;
343        let n = file.read(buf)?;
344
345        self.stats.ops_submitted.fetch_add(1, Ordering::Relaxed);
346        self.stats.ops_completed.fetch_add(1, Ordering::Relaxed);
347        self.stats.bytes_read.fetch_add(n as u64, Ordering::Relaxed);
348
349        Ok(n)
350    }
351
352    /// Read exact amount at offset
353    pub fn read_exact_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<()> {
354        let mut file = self.file.lock();
355        file.seek(SeekFrom::Start(offset))?;
356        file.read_exact(buf)?;
357
358        self.stats.ops_submitted.fetch_add(1, Ordering::Relaxed);
359        self.stats.ops_completed.fetch_add(1, Ordering::Relaxed);
360        self.stats
361            .bytes_read
362            .fetch_add(buf.len() as u64, Ordering::Relaxed);
363
364        Ok(())
365    }
366
367    /// Get statistics
368    pub fn stats(&self) -> IoUringStatsSnapshot {
369        self.stats.snapshot()
370    }
371}
372
373// ============================================================================
374// Batch Operations
375// ============================================================================
376
/// Statistics for a batch of I/O operations.
///
/// Produced by [`IoBatch::stats`]; every count describes operations still
/// queued in the batch — nothing here reflects I/O that has executed.
#[derive(Debug, Clone, Default)]
pub struct BatchStats {
    /// Total operations in the batch
    pub total_ops: u64,
    /// Number of write operations
    pub write_ops: u64,
    /// Number of read operations
    pub read_ops: u64,
    /// Number of sync operations
    pub sync_ops: u64,
    /// Total bytes to be written
    pub write_bytes: u64,
    /// Total bytes to be read
    pub read_bytes: u64,
}
393
/// A batch of I/O operations for efficient submission
///
/// Operations are queued with `write`/`read`/`sync` and executed in FIFO
/// order by [`BatchExecutor::execute`] or [`WalWriter::flush_batch`].
#[derive(Debug, Default)]
pub struct IoBatch {
    // FIFO queue: push_back on enqueue, drain front-to-back on execution.
    operations: VecDeque<IoOperation>,
}
399
/// A single I/O operation for batching
///
/// Queued via [`IoBatch`]; each variant carries everything needed to
/// perform the operation later.
#[derive(Debug, Clone)]
pub enum IoOperation {
    /// Write data at an offset
    Write {
        /// Offset in the file (used for io_uring, ignored in fallback)
        offset: u64,
        /// Data to write
        data: Bytes,
    },
    /// Read data from an offset
    Read {
        /// Offset to read from
        offset: u64,
        /// Number of bytes to read
        len: usize,
    },
    /// Sync/fsync operation
    Sync,
}
420
421impl IoBatch {
422    /// Create a new empty batch
423    pub fn new() -> Self {
424        Self {
425            operations: VecDeque::new(),
426        }
427    }
428
429    /// Add a write operation
430    pub fn write(&mut self, offset: u64, data: impl Into<Bytes>) {
431        self.operations.push_back(IoOperation::Write {
432            offset,
433            data: data.into(),
434        });
435    }
436
437    /// Add a read operation
438    pub fn read(&mut self, offset: u64, len: usize) {
439        self.operations.push_back(IoOperation::Read { offset, len });
440    }
441
442    /// Add a sync operation
443    pub fn sync(&mut self) {
444        self.operations.push_back(IoOperation::Sync);
445    }
446
447    /// Get number of pending operations
448    pub fn len(&self) -> usize {
449        self.operations.len()
450    }
451
452    /// Check if batch is empty
453    pub fn is_empty(&self) -> bool {
454        self.operations.is_empty()
455    }
456
457    /// Clear the batch
458    pub fn clear(&mut self) {
459        self.operations.clear();
460    }
461
462    /// Drain operations from the batch for execution
463    pub fn drain(&mut self) -> impl Iterator<Item = IoOperation> + '_ {
464        self.operations.drain(..)
465    }
466
467    /// Get total bytes to be written in this batch
468    pub fn pending_write_bytes(&self) -> u64 {
469        self.operations
470            .iter()
471            .map(|op| match op {
472                IoOperation::Write { data, .. } => data.len() as u64,
473                _ => 0,
474            })
475            .sum()
476    }
477
478    /// Get batch statistics
479    pub fn stats(&self) -> BatchStats {
480        let mut stats = BatchStats::default();
481        for op in &self.operations {
482            stats.total_ops += 1;
483            match op {
484                IoOperation::Write { data, .. } => {
485                    stats.write_ops += 1;
486                    stats.write_bytes += data.len() as u64;
487                }
488                IoOperation::Read { len, .. } => {
489                    stats.read_ops += 1;
490                    stats.read_bytes += *len as u64;
491                }
492                IoOperation::Sync => {
493                    stats.sync_ops += 1;
494                }
495            }
496        }
497        stats
498    }
499
500    /// Get number of pending write operations
501    pub fn pending_write_ops(&self) -> usize {
502        self.operations
503            .iter()
504            .filter(|op| matches!(op, IoOperation::Write { .. }))
505            .count()
506    }
507
508    /// Get number of pending read operations
509    pub fn pending_read_ops(&self) -> usize {
510        self.operations
511            .iter()
512            .filter(|op| matches!(op, IoOperation::Read { .. }))
513            .count()
514    }
515}
516
/// Result of a batch read operation
///
/// One of these is produced per `IoOperation::Read` executed by
/// [`BatchExecutor::execute`]; `data` is truncated to the bytes actually
/// read, so it may be shorter than the requested length.
#[derive(Debug, Clone)]
pub struct BatchReadResult {
    /// Offset where data was read from
    pub offset: u64,
    /// The data that was read
    pub data: BytesMut,
}
525
526// ============================================================================
527// Batch Executor (Portable Implementation)
528// ============================================================================
529
530/// Executes batched I/O operations
531///
532/// Uses sequential file I/O on all platforms. On Linux 5.6+, a future
533/// io_uring backend would submit all operations via the kernel submission
534/// queue for true async batched I/O.
535pub struct BatchExecutor {
536    writer: Option<AsyncWriter>,
537    reader: Option<AsyncReader>,
538    stats: Arc<IoUringStats>,
539}
540
541impl BatchExecutor {
542    /// Create a new batch executor for writing
543    pub fn for_writer(writer: AsyncWriter) -> Self {
544        Self {
545            stats: writer.stats.clone(),
546            writer: Some(writer),
547            reader: None,
548        }
549    }
550
551    /// Create a new batch executor for reading
552    pub fn for_reader(reader: AsyncReader) -> Self {
553        Self {
554            stats: reader.stats.clone(),
555            writer: None,
556            reader: Some(reader),
557        }
558    }
559
560    /// Execute all operations in a batch
561    ///
562    /// Returns a vector of read results (for read operations) or errors
563    pub fn execute(&self, batch: &mut IoBatch) -> io::Result<Vec<BatchReadResult>> {
564        let mut read_results = Vec::new();
565
566        for op in batch.drain() {
567            match op {
568                IoOperation::Write { offset: _, data } => {
569                    if let Some(ref writer) = self.writer {
570                        writer.write(&data)?;
571                    } else {
572                        return Err(io::Error::new(
573                            io::ErrorKind::InvalidInput,
574                            "No writer configured for batch executor",
575                        ));
576                    }
577                }
578                IoOperation::Read { offset, len } => {
579                    if let Some(ref reader) = self.reader {
580                        let mut buf = BytesMut::zeroed(len);
581                        let n = reader.read_at(offset, &mut buf)?;
582                        buf.truncate(n);
583                        read_results.push(BatchReadResult { offset, data: buf });
584                    } else {
585                        return Err(io::Error::new(
586                            io::ErrorKind::InvalidInput,
587                            "No reader configured for batch executor",
588                        ));
589                    }
590                }
591                IoOperation::Sync => {
592                    if let Some(ref writer) = self.writer {
593                        writer.sync()?;
594                    }
595                }
596            }
597        }
598
599        Ok(read_results)
600    }
601
602    /// Get statistics
603    pub fn stats(&self) -> IoUringStatsSnapshot {
604        self.stats.snapshot()
605    }
606}
607
608// ============================================================================
609// Write-Ahead Log Optimization
610// ============================================================================
611
612/// Optimized WAL writer using io_uring or fallback
613///
614/// This writer supports both direct writes and batched operations.
615/// Batched writes can improve throughput by reducing syscall overhead.
616///
617/// # Batching Mode
618///
619/// When using batched mode:
620/// 1. Writes are queued in a batch
621/// 2. When batch reaches max_batch_bytes or flush_batch() is called, all writes execute
622/// 3. Sync is deferred until batch execution
623///
624/// # Example
625///
626/// ```ignore
627/// let wal = WalWriter::new("wal.log", IoUringConfig::default())?;
628///
629/// // Direct write (immediate)
630/// wal.append(b"entry1")?;
631///
632/// // Batched write (queued)
633/// wal.append_batched(b"entry2")?;
634/// wal.append_batched(b"entry3")?;
635/// wal.flush_batch()?; // Execute all batched writes
636/// ```
637pub struct WalWriter {
638    writer: AsyncWriter,
639    batch: Mutex<IoBatch>,
640    pending_bytes: AtomicU64,
641    max_batch_bytes: u64,
642}
643
644impl WalWriter {
645    /// Create a new WAL writer
646    pub fn new(path: impl AsRef<Path>, config: IoUringConfig) -> io::Result<Self> {
647        let max_batch_bytes = (config.registered_buffers * config.buffer_size) as u64;
648
649        Ok(Self {
650            writer: AsyncWriter::new(path, config)?,
651            batch: Mutex::new(IoBatch::new()),
652            pending_bytes: AtomicU64::new(0),
653            max_batch_bytes,
654        })
655    }
656
657    /// Append data to the WAL (direct write, immediate)
658    pub fn append(&self, data: &[u8]) -> io::Result<u64> {
659        self.writer.write(data)
660    }
661
662    /// Append data to the WAL in batched mode
663    ///
664    /// The write is queued and executed when:
665    /// - `flush_batch()` is called
666    /// - The batch exceeds `max_batch_bytes`
667    ///
668    /// Returns the number of pending bytes in the batch
669    pub fn append_batched(&self, data: &[u8]) -> io::Result<u64> {
670        let data_len = data.len() as u64;
671        let offset = self.writer.offset();
672
673        {
674            let mut batch = self.batch.lock();
675            batch.write(offset, Bytes::copy_from_slice(data));
676        }
677
678        let pending = self.pending_bytes.fetch_add(data_len, Ordering::AcqRel) + data_len;
679
680        // Auto-flush if we've exceeded the batch threshold
681        if pending >= self.max_batch_bytes {
682            self.flush_batch()?;
683        }
684
685        Ok(pending)
686    }
687
688    /// Flush all batched writes to disk
689    ///
690    /// Executes all pending write operations in the batch and syncs to disk.
691    pub fn flush_batch(&self) -> io::Result<()> {
692        let mut batch = self.batch.lock();
693
694        if batch.is_empty() {
695            return Ok(());
696        }
697
698        // Execute all operations
699        for op in batch.drain() {
700            match op {
701                IoOperation::Write { data, .. } => {
702                    self.writer.write(&data)?;
703                }
704                IoOperation::Sync => {
705                    self.writer.sync()?;
706                }
707                IoOperation::Read { .. } => {
708                    // WAL writer doesn't support reads in batch
709                }
710            }
711        }
712
713        self.pending_bytes.store(0, Ordering::Release);
714        self.writer.flush()?;
715        self.writer.sync()
716    }
717
718    /// Get the number of pending bytes in the batch
719    pub fn pending_batch_bytes(&self) -> u64 {
720        self.pending_bytes.load(Ordering::Acquire)
721    }
722
723    /// Get the number of pending operations in the batch
724    pub fn pending_batch_ops(&self) -> usize {
725        self.batch.lock().len()
726    }
727
728    /// Check if there are pending batched writes
729    pub fn has_pending_batch(&self) -> bool {
730        !self.batch.lock().is_empty()
731    }
732
733    /// Append with CRC32 checksum
734    pub fn append_with_checksum(&self, data: &[u8]) -> io::Result<u64> {
735        let checksum = crc32fast::hash(data);
736
737        let mut buf = Vec::with_capacity(4 + data.len() + 4);
738        buf.extend_from_slice(&(data.len() as u32).to_be_bytes());
739        buf.extend_from_slice(data);
740        buf.extend_from_slice(&checksum.to_be_bytes());
741
742        self.writer.write(&buf)
743    }
744
745    /// Append with CRC32 checksum in batched mode
746    pub fn append_with_checksum_batched(&self, data: &[u8]) -> io::Result<u64> {
747        let checksum = crc32fast::hash(data);
748
749        let mut buf = Vec::with_capacity(4 + data.len() + 4);
750        buf.extend_from_slice(&(data.len() as u32).to_be_bytes());
751        buf.extend_from_slice(data);
752        buf.extend_from_slice(&checksum.to_be_bytes());
753
754        self.append_batched(&buf)
755    }
756
757    /// Flush and sync the WAL
758    pub fn sync(&self) -> io::Result<()> {
759        // First flush any pending batch
760        self.flush_batch()?;
761        self.writer.flush()?;
762        self.writer.sync()
763    }
764
765    /// Get current WAL size
766    pub fn size(&self) -> u64 {
767        self.writer.offset()
768    }
769
770    /// Get max batch bytes threshold
771    pub fn max_batch_bytes(&self) -> u64 {
772        self.max_batch_bytes
773    }
774
775    /// Get statistics
776    pub fn stats(&self) -> IoUringStatsSnapshot {
777        self.writer.stats()
778    }
779}
780
781// ============================================================================
782// Segment File I/O
783// ============================================================================
784
785/// Optimized segment reader
786pub struct SegmentReader {
787    reader: AsyncReader,
788    length: u64,
789}
790
791impl SegmentReader {
792    /// Open a segment file for reading
793    pub fn open(path: impl AsRef<Path>, config: IoUringConfig) -> io::Result<Self> {
794        let metadata = std::fs::metadata(&path)?;
795        let length = metadata.len();
796
797        Ok(Self {
798            reader: AsyncReader::open(path, config)?,
799            length,
800        })
801    }
802
803    /// Read messages starting at offset
804    pub fn read_messages(&self, offset: u64, max_bytes: usize) -> io::Result<BytesMut> {
805        let mut buf = BytesMut::zeroed(max_bytes);
806        let n = self.reader.read_at(offset, &mut buf)?;
807        buf.truncate(n);
808        Ok(buf)
809    }
810
811    /// Read a specific range
812    pub fn read_range(&self, offset: u64, len: usize) -> io::Result<BytesMut> {
813        let mut buf = BytesMut::zeroed(len);
814        self.reader.read_exact_at(offset, &mut buf)?;
815        Ok(buf)
816    }
817
818    /// Get segment length
819    pub fn len(&self) -> u64 {
820        self.length
821    }
822
823    /// Check if segment is empty
824    pub fn is_empty(&self) -> bool {
825        self.length == 0
826    }
827
828    /// Get statistics
829    pub fn stats(&self) -> IoUringStatsSnapshot {
830        self.reader.stats()
831    }
832}
833
834// ============================================================================
835// Tests
836// ============================================================================
837
838#[cfg(test)]
839mod tests {
840    use super::*;
841    use tempfile::tempdir;
842
843    #[test]
844    fn test_config_defaults() {
845        let config = IoUringConfig::default();
846        assert_eq!(config.sq_entries, 1024);
847        assert!(!config.kernel_poll);
848        assert_eq!(config.max_inflight, 256);
849    }
850
851    #[test]
852    fn test_config_high_throughput() {
853        let config = IoUringConfig::high_throughput();
854        assert_eq!(config.sq_entries, 4096);
855        assert!(config.kernel_poll);
856        assert!(config.direct_io);
857    }
858
859    #[test]
860    fn test_config_low_latency() {
861        let config = IoUringConfig::low_latency();
862        assert_eq!(config.sq_entries, 256);
863        assert!(config.kernel_poll);
864    }
865
866    #[test]
867    fn test_stats_snapshot() {
868        let stats = IoUringStats::new();
869        stats.ops_submitted.store(100, Ordering::Relaxed);
870        stats.ops_completed.store(95, Ordering::Relaxed);
871        stats.bytes_written.store(10000, Ordering::Relaxed);
872
873        let snapshot = stats.snapshot();
874        assert_eq!(snapshot.in_flight(), 5);
875        assert!((snapshot.completion_rate() - 0.95).abs() < 0.001);
876    }
877
878    #[test]
879    fn test_async_writer() {
880        let dir = tempdir().unwrap();
881        let path = dir.path().join("test.log");
882
883        let config = IoUringConfig::minimal();
884        let writer = AsyncWriter::new(&path, config).unwrap();
885
886        let offset = writer.write(b"hello").unwrap();
887        assert_eq!(offset, 0);
888
889        let offset = writer.write(b"world").unwrap();
890        assert_eq!(offset, 5);
891
892        writer.flush().unwrap();
893
894        let stats = writer.stats();
895        assert_eq!(stats.ops_completed, 2);
896        assert_eq!(stats.bytes_written, 10);
897    }
898
899    #[test]
900    fn test_async_reader() {
901        let dir = tempdir().unwrap();
902        let path = dir.path().join("test.log");
903
904        // Write some data first
905        std::fs::write(&path, b"hello world test data").unwrap();
906
907        let config = IoUringConfig::minimal();
908        let reader = AsyncReader::open(&path, config).unwrap();
909
910        let mut buf = [0u8; 5];
911        let n = reader.read_at(0, &mut buf).unwrap();
912        assert_eq!(n, 5);
913        assert_eq!(&buf, b"hello");
914
915        let mut buf = [0u8; 5];
916        reader.read_exact_at(6, &mut buf).unwrap();
917        assert_eq!(&buf, b"world");
918
919        let stats = reader.stats();
920        assert_eq!(stats.ops_completed, 2);
921    }
922
923    #[test]
924    fn test_io_batch() {
925        let mut batch = IoBatch::new();
926        assert!(batch.is_empty());
927
928        batch.write(0, Bytes::from_static(b"hello"));
929        batch.read(100, 50);
930        batch.sync();
931
932        assert_eq!(batch.len(), 3);
933        assert!(!batch.is_empty());
934
935        batch.clear();
936        assert!(batch.is_empty());
937    }
938
939    #[test]
940    fn test_wal_writer() {
941        let dir = tempdir().unwrap();
942        let path = dir.path().join("wal.log");
943
944        let config = IoUringConfig::minimal();
945        let wal = WalWriter::new(&path, config).unwrap();
946
947        let offset = wal.append(b"entry1").unwrap();
948        assert_eq!(offset, 0);
949
950        let offset = wal.append_with_checksum(b"entry2").unwrap();
951        assert!(offset > 0);
952
953        wal.sync().unwrap();
954        assert!(wal.size() > 0);
955    }
956
957    #[test]
958    fn test_segment_reader() {
959        let dir = tempdir().unwrap();
960        let path = dir.path().join("segment.log");
961
962        std::fs::write(&path, b"message1message2message3").unwrap();
963
964        let config = IoUringConfig::minimal();
965        let reader = SegmentReader::open(&path, config).unwrap();
966
967        assert_eq!(reader.len(), 24);
968        assert!(!reader.is_empty());
969
970        let data = reader.read_messages(0, 100).unwrap();
971        assert_eq!(&data[..], b"message1message2message3");
972
973        let data = reader.read_range(8, 8).unwrap();
974        assert_eq!(&data[..], b"message2");
975    }
976
977    #[test]
978    fn test_io_uring_availability() {
979        // This just checks the function runs without panicking
980        let available = is_io_uring_available();
981        println!("io_uring available: {}", available);
982    }
983
984    // ========================================================================
985    // Batch Operation Tests
986    // ========================================================================
987
988    #[test]
989    fn test_io_batch_pending_write_bytes() {
990        let mut batch = IoBatch::new();
991        assert_eq!(batch.pending_write_bytes(), 0);
992
993        batch.write(0, Bytes::from_static(b"hello"));
994        batch.write(5, Bytes::from_static(b"world"));
995        batch.read(100, 50); // Read doesn't contribute to write bytes
996
997        assert_eq!(batch.pending_write_bytes(), 10);
998    }
999
1000    #[test]
1001    fn test_io_batch_drain() {
1002        let mut batch = IoBatch::new();
1003        batch.write(0, Bytes::from_static(b"hello"));
1004        batch.sync();
1005
1006        let ops: Vec<_> = batch.drain().collect();
1007        assert_eq!(ops.len(), 2);
1008        assert!(batch.is_empty());
1009    }
1010
1011    #[test]
1012    fn test_batch_executor_write() {
1013        let dir = tempdir().unwrap();
1014        let path = dir.path().join("batch_write.log");
1015
1016        let config = IoUringConfig::minimal();
1017        let writer = AsyncWriter::new(&path, config).unwrap();
1018        let executor = BatchExecutor::for_writer(writer);
1019
1020        let mut batch = IoBatch::new();
1021        batch.write(0, Bytes::from_static(b"hello"));
1022        batch.write(5, Bytes::from_static(b"world"));
1023        batch.sync();
1024
1025        let results = executor.execute(&mut batch).unwrap();
1026        assert!(results.is_empty()); // No read results for write operations
1027
1028        // Verify file contents
1029        let contents = std::fs::read(&path).unwrap();
1030        assert_eq!(&contents, b"helloworld");
1031    }
1032
1033    #[test]
1034    fn test_batch_executor_read() {
1035        let dir = tempdir().unwrap();
1036        let path = dir.path().join("batch_read.log");
1037        std::fs::write(&path, b"hello world test data").unwrap();
1038
1039        let config = IoUringConfig::minimal();
1040        let reader = AsyncReader::open(&path, config).unwrap();
1041        let executor = BatchExecutor::for_reader(reader);
1042
1043        let mut batch = IoBatch::new();
1044        batch.read(0, 5);
1045        batch.read(6, 5);
1046
1047        let results = executor.execute(&mut batch).unwrap();
1048        assert_eq!(results.len(), 2);
1049        assert_eq!(&results[0].data[..], b"hello");
1050        assert_eq!(results[0].offset, 0);
1051        assert_eq!(&results[1].data[..], b"world");
1052        assert_eq!(results[1].offset, 6);
1053    }
1054
1055    #[test]
1056    fn test_wal_writer_batched() {
1057        let dir = tempdir().unwrap();
1058        let path = dir.path().join("wal_batch.log");
1059
1060        let config = IoUringConfig::minimal();
1061        let wal = WalWriter::new(&path, config).unwrap();
1062
1063        // Direct write
1064        wal.append(b"direct").unwrap();
1065
1066        // Batched writes
1067        wal.append_batched(b"batch1").unwrap();
1068        wal.append_batched(b"batch2").unwrap();
1069
1070        assert!(wal.has_pending_batch());
1071        assert_eq!(wal.pending_batch_ops(), 2);
1072        assert_eq!(wal.pending_batch_bytes(), 12);
1073
1074        // Flush the batch
1075        wal.flush_batch().unwrap();
1076
1077        assert!(!wal.has_pending_batch());
1078        assert_eq!(wal.pending_batch_bytes(), 0);
1079
1080        // Verify file size includes all writes
1081        assert!(wal.size() >= 18); // "direct" + "batch1" + "batch2"
1082    }
1083
1084    #[test]
1085    fn test_wal_writer_batched_checksum() {
1086        let dir = tempdir().unwrap();
1087        let path = dir.path().join("wal_batch_crc.log");
1088
1089        let config = IoUringConfig::minimal();
1090        let wal = WalWriter::new(&path, config).unwrap();
1091
1092        // Batched write with checksum
1093        wal.append_with_checksum_batched(b"data1").unwrap();
1094        wal.append_with_checksum_batched(b"data2").unwrap();
1095
1096        assert!(wal.has_pending_batch());
1097        wal.sync().unwrap(); // sync() calls flush_batch()
1098
1099        assert!(!wal.has_pending_batch());
1100        assert!(wal.size() > 0);
1101    }
1102
1103    #[test]
1104    fn test_wal_writer_auto_flush() {
1105        let dir = tempdir().unwrap();
1106        let path = dir.path().join("wal_auto_flush.log");
1107
1108        // Create a config with a very small batch threshold
1109        let mut config = IoUringConfig::minimal();
1110        config.registered_buffers = 1;
1111        config.buffer_size = 10; // 10 bytes max batch
1112
1113        let wal = WalWriter::new(&path, config).unwrap();
1114        assert_eq!(wal.max_batch_bytes(), 10);
1115
1116        // First batch should not auto-flush
1117        wal.append_batched(b"hello").unwrap(); // 5 bytes
1118        assert!(wal.has_pending_batch());
1119
1120        // Second batch should trigger auto-flush (5 + 6 = 11 > 10)
1121        wal.append_batched(b"world!").unwrap();
1122        assert!(!wal.has_pending_batch()); // Auto-flushed
1123    }
1124
1125    #[test]
1126    fn test_io_batch_stats() {
1127        let mut batch = IoBatch::new();
1128
1129        // Empty batch
1130        let stats = batch.stats();
1131        assert_eq!(stats.total_ops, 0);
1132        assert_eq!(stats.write_ops, 0);
1133        assert_eq!(stats.read_ops, 0);
1134        assert_eq!(stats.sync_ops, 0);
1135
1136        // Add mixed operations
1137        batch.write(0, Bytes::from_static(b"hello"));
1138        batch.write(5, Bytes::from_static(b"world"));
1139        batch.read(100, 50);
1140        batch.read(200, 100);
1141        batch.sync();
1142
1143        let stats = batch.stats();
1144        assert_eq!(stats.total_ops, 5);
1145        assert_eq!(stats.write_ops, 2);
1146        assert_eq!(stats.read_ops, 2);
1147        assert_eq!(stats.sync_ops, 1);
1148        assert_eq!(stats.write_bytes, 10); // "hello" + "world"
1149        assert_eq!(stats.read_bytes, 150); // 50 + 100
1150    }
1151
1152    #[test]
1153    fn test_io_batch_pending_ops() {
1154        let mut batch = IoBatch::new();
1155
1156        batch.write(0, Bytes::from_static(b"data1"));
1157        batch.write(5, Bytes::from_static(b"data2"));
1158        batch.read(100, 50);
1159        batch.sync();
1160
1161        assert_eq!(batch.pending_write_ops(), 2);
1162        assert_eq!(batch.pending_read_ops(), 1);
1163    }
1164}