rivven_core/async_io.rs

//! Cross-Platform Async I/O Layer
//!
//! High-performance asynchronous I/O that works on all major platforms:
//!
//! - **Linux**: Uses tokio's epoll-based I/O (io_uring available via feature flag)
//! - **macOS**: Uses tokio's kqueue-based I/O (optimized for Apple Silicon)
//! - **Windows**: Uses tokio's IOCP-based I/O
//!
//! # Design Philosophy
//!
//! Rather than requiring platform-specific features or elevated permissions,
//! this module provides a unified async I/O API that delivers excellent
//! performance on all platforms using tokio's battle-tested I/O primitives.
//!
//! # Architecture
//!
//! ```text
//! ┌─────────────────────────────────────────────────────────────────────┐
//! │                     AsyncIo Unified API                             │
//! ├─────────────────────────────────────────────────────────────────────┤
//! │                                                                     │
//! │  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐           │
//! │  │   AsyncFile  │    │ BatchBuilder │    │ AsyncSegment │           │
//! │  │  read/write  │    │  batched ops │    │  log storage │           │
//! │  └──────────────┘    └──────────────┘    └──────────────┘           │
//! │         │                   │                   │                   │
//! │         └───────────────────┴───────────────────┘                   │
//! │                             │                                       │
//! │                    ┌────────┴────────┐                              │
//! │                    │   Tokio Async   │                              │
//! │                    │   File I/O      │                              │
//! │                    └────────┬────────┘                              │
//! │                             │                                       │
//! │  ┌──────────┬───────────────┼───────────────┬──────────┐            │
//! │  │  Linux   │    macOS      │    Windows    │  Other   │            │
//! │  │  epoll   │    kqueue     │    IOCP       │  poll    │            │
//! │  └──────────┴───────────────┴───────────────┴──────────┘            │
//! └─────────────────────────────────────────────────────────────────────┘
//! ```
//!
//! # Performance Characteristics
//!
//! | Platform | Backend | Typical Latency | Notes |
//! |----------|---------|-----------------|-------|
//! | Linux    | epoll   | ~5-10µs         | Scales to millions of fds |
//! | macOS    | kqueue  | ~5-10µs         | Native Apple Silicon support |
//! | Windows  | IOCP    | ~10-20µs        | True async completion ports |
//!
//! # Example
//!
//! ```rust,ignore
//! use rivven_core::async_io::{AsyncFile, AsyncIo, AsyncIoConfig};
//!
//! let io = AsyncIo::new(AsyncIoConfig::default())?;
//! let file = AsyncFile::open("data.log", io.clone()).await?;
//!
//! // Write data
//! file.write_at(0, b"Hello, World!").await?;
//!
//! // Read it back
//! let data = file.read_at(0, 13).await?;
//! ```

use std::io;
use std::path::Path;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;

use bytes::{Bytes, BytesMut};
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
use tokio::sync::Mutex as TokioMutex;

// ============================================================================
// Configuration
// ============================================================================

/// Async I/O configuration
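///
/// A minimal sketch of picking a preset and overriding one field; the values
/// shown mirror the defaults and presets defined below.
///
/// ```rust,ignore
/// use rivven_core::async_io::AsyncIoConfig;
///
/// // Start from the SSD-oriented preset, but keep writes durable.
/// let config = AsyncIoConfig {
///     sync_on_write: true,
///     ..AsyncIoConfig::high_performance()
/// };
/// assert_eq!(config.max_concurrent_ops, 4096);
/// ```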
#[derive(Debug, Clone)]
pub struct AsyncIoConfig {
    /// Maximum concurrent operations
    pub max_concurrent_ops: usize,
    /// Read buffer size
    pub read_buffer_size: usize,
    /// Write buffer size
    pub write_buffer_size: usize,
    /// Enable direct I/O hints (advisory, platform-dependent)
    pub direct_io_hint: bool,
    /// Sync on write (for durability)
    pub sync_on_write: bool,
}

impl Default for AsyncIoConfig {
    fn default() -> Self {
        Self {
            max_concurrent_ops: 1024,
            read_buffer_size: 64 * 1024,  // 64KB
            write_buffer_size: 64 * 1024, // 64KB
            direct_io_hint: false,
            sync_on_write: false,
        }
    }
}

impl AsyncIoConfig {
    /// High-performance configuration for SSDs
    pub fn high_performance() -> Self {
        Self {
            max_concurrent_ops: 4096,
            read_buffer_size: 128 * 1024,  // 128KB
            write_buffer_size: 128 * 1024, // 128KB
            direct_io_hint: true,
            sync_on_write: false,
        }
    }

    /// Low-latency configuration
    pub fn low_latency() -> Self {
        Self {
            max_concurrent_ops: 2048,
            read_buffer_size: 4 * 1024,  // 4KB - smaller for lower latency
            write_buffer_size: 4 * 1024, // 4KB
            direct_io_hint: true,
            sync_on_write: false,
        }
    }

    /// Durable configuration (sync writes)
    pub fn durable() -> Self {
        Self {
            max_concurrent_ops: 512,
            read_buffer_size: 64 * 1024,
            write_buffer_size: 64 * 1024,
            direct_io_hint: false,
            sync_on_write: true,
        }
    }
}

// ============================================================================
// Statistics
// ============================================================================

/// I/O operation statistics
#[derive(Debug, Default)]
pub struct AsyncIoStats {
    /// Total read operations
    pub read_ops: AtomicU64,
    /// Total write operations
    pub write_ops: AtomicU64,
    /// Total sync operations
    pub sync_ops: AtomicU64,
    /// Total bytes read
    pub bytes_read: AtomicU64,
    /// Total bytes written
    pub bytes_written: AtomicU64,
    /// Failed operations
    pub failed_ops: AtomicU64,
    /// Current in-flight operations
    pub inflight: AtomicUsize,
}

impl AsyncIoStats {
    /// Get a snapshot of current statistics
    pub fn snapshot(&self) -> AsyncIoStatsSnapshot {
        AsyncIoStatsSnapshot {
            read_ops: self.read_ops.load(Ordering::Relaxed),
            write_ops: self.write_ops.load(Ordering::Relaxed),
            sync_ops: self.sync_ops.load(Ordering::Relaxed),
            bytes_read: self.bytes_read.load(Ordering::Relaxed),
            bytes_written: self.bytes_written.load(Ordering::Relaxed),
            failed_ops: self.failed_ops.load(Ordering::Relaxed),
            inflight: self.inflight.load(Ordering::Relaxed),
        }
    }
}

/// Snapshot of I/O statistics
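///
/// A small sketch of deriving throughput and operation rate from a snapshot;
/// the 1-second window and the counts below are made up for illustration.
///
/// ```rust,ignore
/// use rivven_core::async_io::AsyncIoStatsSnapshot;
///
/// let snapshot = AsyncIoStatsSnapshot {
///     read_ops: 100,
///     write_ops: 50,
///     sync_ops: 0,
///     bytes_read: 4 * 1024 * 1024,
///     bytes_written: 4 * 1024 * 1024,
///     failed_ops: 0,
///     inflight: 0,
/// };
///
/// // 8 MiB moved over a 1-second window, 150 operations total.
/// assert_eq!(snapshot.throughput_mbps(1.0), 8.0);
/// assert_eq!(snapshot.ops_per_second(1.0), 150.0);
/// ```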
#[derive(Debug, Clone)]
pub struct AsyncIoStatsSnapshot {
    pub read_ops: u64,
    pub write_ops: u64,
    pub sync_ops: u64,
    pub bytes_read: u64,
    pub bytes_written: u64,
    pub failed_ops: u64,
    pub inflight: usize,
}

impl AsyncIoStatsSnapshot {
    /// Calculate throughput in MB/s given a time window
    pub fn throughput_mbps(&self, duration_secs: f64) -> f64 {
        if duration_secs > 0.0 {
            let total_bytes = self.bytes_read + self.bytes_written;
            (total_bytes as f64 / 1024.0 / 1024.0) / duration_secs
        } else {
            0.0
        }
    }

    /// Calculate operations per second
    pub fn ops_per_second(&self, duration_secs: f64) -> f64 {
        if duration_secs > 0.0 {
            (self.read_ops + self.write_ops + self.sync_ops) as f64 / duration_secs
        } else {
            0.0
        }
    }
}

// ============================================================================
// Async I/O Engine
// ============================================================================

/// Cross-platform async I/O engine
///
/// Provides high-performance async I/O using the best available
/// backend on each platform (epoll/kqueue/IOCP via tokio).
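///
/// A brief sketch of constructing the engine and consulting its capacity and
/// statistics; nothing here touches the filesystem.
///
/// ```rust,ignore
/// use rivven_core::async_io::{AsyncIo, AsyncIoConfig};
///
/// let io = AsyncIo::new(AsyncIoConfig::low_latency())?;
/// assert!(io.has_capacity());
/// assert_eq!(io.inflight(), 0);
///
/// let snapshot = io.stats().snapshot();
/// assert_eq!(snapshot.read_ops, 0);
/// ```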
pub struct AsyncIo {
    config: AsyncIoConfig,
    stats: Arc<AsyncIoStats>,
}

impl AsyncIo {
    /// Create a new async I/O engine
    pub fn new(config: AsyncIoConfig) -> io::Result<Arc<Self>> {
        Ok(Arc::new(Self {
            config,
            stats: Arc::new(AsyncIoStats::default()),
        }))
    }

    /// Get the configuration
    pub fn config(&self) -> &AsyncIoConfig {
        &self.config
    }

    /// Get statistics
    pub fn stats(&self) -> &AsyncIoStats {
        &self.stats
    }

    /// Check if there's capacity for more operations
    pub fn has_capacity(&self) -> bool {
        self.stats.inflight.load(Ordering::Relaxed) < self.config.max_concurrent_ops
    }

    /// Get number of in-flight operations
    pub fn inflight(&self) -> usize {
        self.stats.inflight.load(Ordering::Relaxed)
    }
}

// ============================================================================
// Async File
// ============================================================================

/// High-level async file handle
///
/// Provides async read/write operations with automatic statistics tracking.
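///
/// A short sketch of the positional API (`write`/`read` advance an internal
/// cursor, `seek` resets it); the file name is illustrative only.
///
/// ```rust,ignore
/// use rivven_core::async_io::{AsyncFile, AsyncIo, AsyncIoConfig};
///
/// let io = AsyncIo::new(AsyncIoConfig::default())?;
/// let file = AsyncFile::open("queue.dat", io).await?;
///
/// file.write(b"abc").await?; // position: 0 -> 3
/// file.write(b"def").await?; // position: 3 -> 6
///
/// file.seek(0);
/// let bytes = file.read(6).await?;
/// assert_eq!(&bytes[..], b"abcdef");
///
/// file.sync().await?; // flush to disk
/// ```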
pub struct AsyncFile {
    file: TokioMutex<File>,
    io: Arc<AsyncIo>,
    position: AtomicU64,
}

impl AsyncFile {
    /// Open a file for reading and writing (creates if not exists)
    pub async fn open<P: AsRef<Path>>(path: P, io: Arc<AsyncIo>) -> io::Result<Self> {
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false) // Preserve existing data
            .open(path)
            .await?;

        Ok(Self {
            file: TokioMutex::new(file),
            io,
            position: AtomicU64::new(0),
        })
    }

    /// Open a file read-only
    pub async fn open_read<P: AsRef<Path>>(path: P, io: Arc<AsyncIo>) -> io::Result<Self> {
        let file = OpenOptions::new().read(true).open(path).await?;

        Ok(Self {
            file: TokioMutex::new(file),
            io,
            position: AtomicU64::new(0),
        })
    }

    /// Async read at a specific offset
    pub async fn read_at(&self, offset: u64, len: usize) -> io::Result<Bytes> {
        self.io.stats.inflight.fetch_add(1, Ordering::Relaxed);

        let result = async {
            let mut file = self.file.lock().await;
            file.seek(io::SeekFrom::Start(offset)).await?;

            let mut buf = BytesMut::with_capacity(len);
            buf.resize(len, 0);

            // A single read may return fewer than `len` bytes (e.g. at end of
            // file); truncate the buffer to what was actually read.
            let bytes_read = file.read(&mut buf).await?;
            buf.truncate(bytes_read);

            Ok::<_, io::Error>(buf.freeze())
        }
        .await;

        self.io.stats.inflight.fetch_sub(1, Ordering::Relaxed);

        match &result {
            Ok(data) => {
                self.io.stats.read_ops.fetch_add(1, Ordering::Relaxed);
                self.io
                    .stats
                    .bytes_read
                    .fetch_add(data.len() as u64, Ordering::Relaxed);
            }
            Err(_) => {
                self.io.stats.failed_ops.fetch_add(1, Ordering::Relaxed);
            }
        }

        result
    }

    /// Async write at a specific offset
    pub async fn write_at(&self, offset: u64, data: &[u8]) -> io::Result<usize> {
        self.io.stats.inflight.fetch_add(1, Ordering::Relaxed);

        let result = async {
            let mut file = self.file.lock().await;
            file.seek(io::SeekFrom::Start(offset)).await?;

            // Use write_all so a short write cannot silently drop data;
            // callers such as AsyncSegment rely on the full buffer landing.
            file.write_all(data).await?;
            let written = data.len();

            if self.io.config.sync_on_write {
                file.sync_all().await?;
            }

            Ok::<_, io::Error>(written)
        }
        .await;

        self.io.stats.inflight.fetch_sub(1, Ordering::Relaxed);

        match &result {
            Ok(written) => {
                self.io.stats.write_ops.fetch_add(1, Ordering::Relaxed);
                self.io
                    .stats
                    .bytes_written
                    .fetch_add(*written as u64, Ordering::Relaxed);
            }
            Err(_) => {
                self.io.stats.failed_ops.fetch_add(1, Ordering::Relaxed);
            }
        }

        result
    }

    /// Async read at current position (updates position)
    pub async fn read(&self, len: usize) -> io::Result<Bytes> {
        let pos = self.position.load(Ordering::Relaxed);
        let data = self.read_at(pos, len).await?;
        self.position
            .fetch_add(data.len() as u64, Ordering::Relaxed);
        Ok(data)
    }

    /// Async write at current position (updates position)
    pub async fn write(&self, data: &[u8]) -> io::Result<usize> {
        let pos = self.position.load(Ordering::Relaxed);
        let written = self.write_at(pos, data).await?;
        self.position.fetch_add(written as u64, Ordering::Relaxed);
        Ok(written)
    }

    /// Sync file to disk
    pub async fn sync(&self) -> io::Result<()> {
        self.io.stats.inflight.fetch_add(1, Ordering::Relaxed);

        let result = {
            let file = self.file.lock().await;
            file.sync_all().await
        };

        self.io.stats.inflight.fetch_sub(1, Ordering::Relaxed);

        match &result {
            Ok(_) => {
                self.io.stats.sync_ops.fetch_add(1, Ordering::Relaxed);
            }
            Err(_) => {
                self.io.stats.failed_ops.fetch_add(1, Ordering::Relaxed);
            }
        }

        result
    }

    /// Seek to position
    pub fn seek(&self, pos: u64) {
        self.position.store(pos, Ordering::Relaxed);
    }

    /// Get current position
    pub fn position(&self) -> u64 {
        self.position.load(Ordering::Relaxed)
    }

    /// Get file size
    pub async fn size(&self) -> io::Result<u64> {
        let file = self.file.lock().await;
        Ok(file.metadata().await?.len())
    }
}

// ============================================================================
// Batch Builder
// ============================================================================

/// Builder for batched I/O operations
///
/// Allows building up multiple I/O operations and executing them together.
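///
/// A sketch of batching two operations against illustrative paths and
/// inspecting the mixed results.
///
/// ```rust,ignore
/// use rivven_core::async_io::{AsyncIo, AsyncIoConfig, BatchBuilder, BatchResult};
///
/// let io = AsyncIo::new(AsyncIoConfig::default())?;
///
/// let results = BatchBuilder::new(io)
///     .write("a.dat", 0, b"hello".to_vec())
///     .read("b.dat", 0, 128)
///     .execute()
///     .await;
///
/// for result in &results {
///     match result {
///         BatchResult::Write(n) => println!("wrote {n} bytes"),
///         BatchResult::Read(data) => println!("read {} bytes", data.len()),
///         BatchResult::Error(e) => eprintln!("operation failed: {e}"),
///     }
/// }
/// ```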
pub struct BatchBuilder {
    io: Arc<AsyncIo>,
    ops: Vec<BatchOp>,
}

enum BatchOp {
    Read {
        path: std::path::PathBuf,
        offset: u64,
        len: usize,
    },
    Write {
        path: std::path::PathBuf,
        offset: u64,
        data: Vec<u8>,
    },
}

/// Result of a batch operation
#[derive(Debug)]
pub enum BatchResult {
    /// Read completed successfully
    Read(Bytes),
    /// Write completed successfully (bytes written)
    Write(usize),
    /// Operation failed
    Error(io::Error),
}

impl BatchBuilder {
    /// Create a new batch builder
    pub fn new(io: Arc<AsyncIo>) -> Self {
        Self {
            io,
            ops: Vec::new(),
        }
    }

    /// Add a read operation
    pub fn read<P: AsRef<Path>>(mut self, path: P, offset: u64, len: usize) -> Self {
        self.ops.push(BatchOp::Read {
            path: path.as_ref().to_path_buf(),
            offset,
            len,
        });
        self
    }

    /// Add a write operation
    pub fn write<P: AsRef<Path>>(mut self, path: P, offset: u64, data: Vec<u8>) -> Self {
        self.ops.push(BatchOp::Write {
            path: path.as_ref().to_path_buf(),
            offset,
            data,
        });
        self
    }

    /// Execute all operations concurrently
    pub async fn execute(self) -> Vec<BatchResult> {
        use futures::future::join_all;

        let io = self.io;
        let futures: Vec<_> = self
            .ops
            .into_iter()
            .map(|op| {
                let io = io.clone();
                async move {
                    match op {
                        BatchOp::Read { path, offset, len } => {
                            match AsyncFile::open(&path, io).await {
                                Ok(file) => match file.read_at(offset, len).await {
                                    Ok(data) => BatchResult::Read(data),
                                    Err(e) => BatchResult::Error(e),
                                },
                                Err(e) => BatchResult::Error(e),
                            }
                        }
                        BatchOp::Write { path, offset, data } => {
                            match AsyncFile::open(&path, io).await {
                                Ok(file) => match file.write_at(offset, &data).await {
                                    Ok(written) => BatchResult::Write(written),
                                    Err(e) => BatchResult::Error(e),
                                },
                                Err(e) => BatchResult::Error(e),
                            }
                        }
                    }
                }
            })
            .collect();

        join_all(futures).await
    }
}

// ============================================================================
// Async Segment (for log storage)
// ============================================================================

/// Async segment for log-structured storage
///
/// A segment is a single file that stores log entries sequentially.
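///
/// A sketch of append-then-read on an illustrative segment file; offsets are
/// relative to the start of the file.
///
/// ```rust,ignore
/// use rivven_core::async_io::{AsyncIo, AsyncIoConfig, AsyncSegment};
///
/// let io = AsyncIo::new(AsyncIoConfig::durable())?;
/// let segment = AsyncSegment::open("00000000.log", 0, io).await?;
///
/// let offset = segment.append(b"record-1").await?;
/// segment.sync().await?;
///
/// let record = segment.read(offset, 8).await?;
/// assert_eq!(&record[..], b"record-1");
/// ```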
pub struct AsyncSegment {
    file: AsyncFile,
    base_offset: u64,
    size: AtomicU64,
}

impl AsyncSegment {
    /// Create or open a segment
    pub async fn open<P: AsRef<Path>>(
        path: P,
        base_offset: u64,
        io: Arc<AsyncIo>,
    ) -> io::Result<Self> {
        let file = AsyncFile::open(&path, io).await?;
        // Treat a file whose metadata cannot be read as an empty segment.
        let size = file.size().await.unwrap_or(0);

        Ok(Self {
            file,
            base_offset,
            size: AtomicU64::new(size),
        })
    }

    /// Append data to the segment
    pub async fn append(&self, data: &[u8]) -> io::Result<u64> {
        // Reserve the write range up front; if the write fails, the reserved
        // range is left as a gap rather than rolled back.
        let offset = self.size.fetch_add(data.len() as u64, Ordering::SeqCst);
        self.file.write_at(offset, data).await?;
        Ok(offset)
    }

    /// Read data from the segment
    pub async fn read(&self, offset: u64, len: usize) -> io::Result<Bytes> {
        self.file.read_at(offset, len).await
    }

    /// Sync the segment to disk
    pub async fn sync(&self) -> io::Result<()> {
        self.file.sync().await
    }

    /// Get the base offset
    pub fn base_offset(&self) -> u64 {
        self.base_offset
    }

    /// Get the current size
    pub fn size(&self) -> u64 {
        self.size.load(Ordering::Relaxed)
    }
}

// ============================================================================
// Tests
// ============================================================================

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[tokio::test]
    async fn test_async_io_basic() {
        let config = AsyncIoConfig::default();
        let io = AsyncIo::new(config).unwrap();

        // Create temp file
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.dat");

        let file = AsyncFile::open(&path, io.clone()).await.unwrap();

        // Write
        let data = b"Hello, cross-platform I/O!";
        let written = file.write_at(0, data).await.unwrap();
        assert_eq!(written, data.len());

        // Read back
        let read = file.read_at(0, data.len()).await.unwrap();
        assert_eq!(&read[..], data);

        // Check stats
        let stats = io.stats().snapshot();
        assert!(stats.write_ops > 0);
        assert!(stats.read_ops > 0);
        assert_eq!(stats.bytes_written, data.len() as u64);
        assert_eq!(stats.bytes_read, data.len() as u64);
    }

    #[tokio::test]
    async fn test_async_file_sequential() {
        let io = AsyncIo::new(AsyncIoConfig::default()).unwrap();
        let dir = tempdir().unwrap();
        let path = dir.path().join("sequential.dat");

        let file = AsyncFile::open(&path, io).await.unwrap();

        // Sequential writes
        file.write(b"Hello").await.unwrap();
        file.write(b" World").await.unwrap();

        // Seek back and read
        file.seek(0);
        let data = file.read(11).await.unwrap();
        assert_eq!(&data[..], b"Hello World");
    }

    #[tokio::test]
    async fn test_async_segment() {
        let io = AsyncIo::new(AsyncIoConfig::default()).unwrap();
        let dir = tempdir().unwrap();
        let path = dir.path().join("segment.log");

        let segment = AsyncSegment::open(&path, 0, io).await.unwrap();

        // Append messages
        let pos1 = segment.append(b"message1").await.unwrap();
        let pos2 = segment.append(b"message2").await.unwrap();

        assert_eq!(pos1, 0);
        assert_eq!(pos2, 8);

        // Read back
        let data1 = segment.read(0, 8).await.unwrap();
        let data2 = segment.read(8, 8).await.unwrap();

        assert_eq!(&data1[..], b"message1");
        assert_eq!(&data2[..], b"message2");

        // Check size
        assert_eq!(segment.size(), 16);
    }

    #[tokio::test]
    async fn test_batch_operations() {
        let io = AsyncIo::new(AsyncIoConfig::default()).unwrap();
        let dir = tempdir().unwrap();

        let path1 = dir.path().join("batch1.dat");
        let path2 = dir.path().join("batch2.dat");

        // First create files with some data
        let file1 = AsyncFile::open(&path1, io.clone()).await.unwrap();
        let file2 = AsyncFile::open(&path2, io.clone()).await.unwrap();

        file1.write_at(0, b"file1 data").await.unwrap();
        file2.write_at(0, b"file2 data").await.unwrap();

        // Now batch read
        let results = BatchBuilder::new(io)
            .read(&path1, 0, 10)
            .read(&path2, 0, 10)
            .execute()
            .await;

        assert_eq!(results.len(), 2);

        match &results[0] {
            BatchResult::Read(data) => assert_eq!(&data[..], b"file1 data"),
            _ => panic!("Expected read result"),
        }

        match &results[1] {
            BatchResult::Read(data) => assert_eq!(&data[..], b"file2 data"),
            _ => panic!("Expected read result"),
        }
    }

    #[tokio::test]
    async fn test_sync_operations() {
        let config = AsyncIoConfig::durable();
        let io = AsyncIo::new(config).unwrap();
        let dir = tempdir().unwrap();
        let path = dir.path().join("durable.dat");

        let file = AsyncFile::open(&path, io.clone()).await.unwrap();

        // Write with sync
        file.write_at(0, b"durable data").await.unwrap();

        // Explicit sync
        file.sync().await.unwrap();

        let stats = io.stats().snapshot();
        assert!(stats.sync_ops >= 1);
    }

    #[tokio::test]
    async fn test_config_variants() {
        // Test all config variants compile and create valid I/O engines
        let configs = vec![
            AsyncIoConfig::default(),
            AsyncIoConfig::high_performance(),
            AsyncIoConfig::low_latency(),
            AsyncIoConfig::durable(),
        ];

        for config in configs {
            let io = AsyncIo::new(config).unwrap();
            assert!(io.has_capacity());
        }
    }
}
737}