//! Cross-platform async file I/O built on tokio: positional reads and writes,
//! operation statistics, batched operations, and append-only log segments.

use std::io;
use std::path::Path;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;

use bytes::{Bytes, BytesMut};
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
use tokio::sync::Mutex as TokioMutex;

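/// Tunables for the async I/O layer, with presets for common workloads.
///
/// A minimal sketch of choosing a preset (`ignore`d doctest, since the crate
/// path is not shown here and is hypothetical):
///
/// ```ignore
/// let cfg = AsyncIoConfig::high_performance();
/// assert_eq!(cfg.read_buffer_size, 128 * 1024);
/// assert!(!cfg.sync_on_write);
/// ```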
#[derive(Debug, Clone)]
pub struct AsyncIoConfig {
    /// Maximum number of I/O operations allowed in flight at once.
    pub max_concurrent_ops: usize,
    /// Buffer size for read operations, in bytes.
    pub read_buffer_size: usize,
    /// Buffer size for write operations, in bytes.
    pub write_buffer_size: usize,
    /// Hint that the platform should bypass the page cache where possible.
    pub direct_io_hint: bool,
    /// Flush to stable storage (`sync_all`) after every write.
    pub sync_on_write: bool,
}

impl Default for AsyncIoConfig {
    fn default() -> Self {
        Self {
            max_concurrent_ops: 1024,
            read_buffer_size: 64 * 1024,
            write_buffer_size: 64 * 1024,
            direct_io_hint: false,
            sync_on_write: false,
        }
    }
}

impl AsyncIoConfig {
    /// Preset tuned for throughput: deep queue and large buffers.
    pub fn high_performance() -> Self {
        Self {
            max_concurrent_ops: 4096,
            read_buffer_size: 128 * 1024,
            write_buffer_size: 128 * 1024,
            direct_io_hint: true,
            sync_on_write: false,
        }
    }

    /// Preset tuned for latency: small buffers keep individual ops cheap.
    pub fn low_latency() -> Self {
        Self {
            max_concurrent_ops: 2048,
            read_buffer_size: 4 * 1024,
            write_buffer_size: 4 * 1024,
            direct_io_hint: true,
            sync_on_write: false,
        }
    }

    /// Preset tuned for durability: every write is synced to stable storage.
    pub fn durable() -> Self {
        Self {
            max_concurrent_ops: 512,
            read_buffer_size: 64 * 1024,
            write_buffer_size: 64 * 1024,
            direct_io_hint: false,
            sync_on_write: true,
        }
    }
}

/// Global operation counters, updated with relaxed atomics.
#[derive(Debug, Default)]
pub struct AsyncIoStats {
    pub read_ops: AtomicU64,
    pub write_ops: AtomicU64,
    pub sync_ops: AtomicU64,
    pub bytes_read: AtomicU64,
    pub bytes_written: AtomicU64,
    pub failed_ops: AtomicU64,
    pub inflight: AtomicUsize,
}

impl AsyncIoStats {
    /// Takes a point-in-time copy of all counters.
    pub fn snapshot(&self) -> AsyncIoStatsSnapshot {
        AsyncIoStatsSnapshot {
            read_ops: self.read_ops.load(Ordering::Relaxed),
            write_ops: self.write_ops.load(Ordering::Relaxed),
            sync_ops: self.sync_ops.load(Ordering::Relaxed),
            bytes_read: self.bytes_read.load(Ordering::Relaxed),
            bytes_written: self.bytes_written.load(Ordering::Relaxed),
            failed_ops: self.failed_ops.load(Ordering::Relaxed),
            inflight: self.inflight.load(Ordering::Relaxed),
        }
    }
}

/// Plain-value copy of `AsyncIoStats`, suitable for reporting.
#[derive(Debug, Clone)]
pub struct AsyncIoStatsSnapshot {
    pub read_ops: u64,
    pub write_ops: u64,
    pub sync_ops: u64,
    pub bytes_read: u64,
    pub bytes_written: u64,
    pub failed_ops: u64,
    pub inflight: usize,
}

impl AsyncIoStatsSnapshot {
    /// Combined read + write throughput in MiB/s over the given window.
    /// Returns 0.0 for a non-positive duration.
    pub fn throughput_mbps(&self, duration_secs: f64) -> f64 {
        if duration_secs > 0.0 {
            let total_bytes = self.bytes_read + self.bytes_written;
            (total_bytes as f64 / 1024.0 / 1024.0) / duration_secs
        } else {
            0.0
        }
    }

    /// Total operations (reads, writes, syncs) per second over the window.
    /// Returns 0.0 for a non-positive duration.
    pub fn ops_per_second(&self, duration_secs: f64) -> f64 {
        if duration_secs > 0.0 {
            (self.read_ops + self.write_ops + self.sync_ops) as f64 / duration_secs
        } else {
            0.0
        }
    }
}

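/// Shared async I/O context: the configuration plus the global statistics
/// that every file handle opened through it updates.
///
/// A minimal usage sketch (`ignore`d doctest, since the crate path is not
/// shown here; a tokio runtime is assumed):
///
/// ```ignore
/// let io = AsyncIo::new(AsyncIoConfig::default())?;
/// assert!(io.has_capacity());
/// let snapshot = io.stats().snapshot();
/// println!("reads so far: {}", snapshot.read_ops);
/// ```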
pub struct AsyncIo {
    config: AsyncIoConfig,
    stats: Arc<AsyncIoStats>,
}

impl AsyncIo {
    /// Creates a new I/O context with the given configuration.
    pub fn new(config: AsyncIoConfig) -> io::Result<Arc<Self>> {
        Ok(Arc::new(Self {
            config,
            stats: Arc::new(AsyncIoStats::default()),
        }))
    }

    /// Returns the active configuration.
    pub fn config(&self) -> &AsyncIoConfig {
        &self.config
    }

    /// Returns the live statistics counters.
    pub fn stats(&self) -> &AsyncIoStats {
        &self.stats
    }

    /// Whether the number of in-flight operations is below the configured cap.
    pub fn has_capacity(&self) -> bool {
        self.stats.inflight.load(Ordering::Relaxed) < self.config.max_concurrent_ops
    }

    /// Current number of in-flight operations.
    pub fn inflight(&self) -> usize {
        self.stats.inflight.load(Ordering::Relaxed)
    }
}

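/// A file handle that wraps `tokio::fs::File`, adding positional reads and
/// writes, a logical cursor for sequential access, and stats accounting.
///
/// A minimal usage sketch (`ignore`d doctest; paths and runtime are assumed):
///
/// ```ignore
/// let io = AsyncIo::new(AsyncIoConfig::default())?;
/// let file = AsyncFile::open("data.bin", io).await?;
/// file.write_at(0, b"header").await?;
/// let header = file.read_at(0, 6).await?;
/// ```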
pub struct AsyncFile {
    file: TokioMutex<File>,
    io: Arc<AsyncIo>,
    position: AtomicU64,
}

impl AsyncFile {
    /// Opens a file for reading and writing, creating it if necessary
    /// without truncating existing contents.
    pub async fn open<P: AsRef<Path>>(path: P, io: Arc<AsyncIo>) -> io::Result<Self> {
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(path)
            .await?;

        Ok(Self {
            file: TokioMutex::new(file),
            io,
            position: AtomicU64::new(0),
        })
    }

    /// Opens an existing file read-only.
    pub async fn open_read<P: AsRef<Path>>(path: P, io: Arc<AsyncIo>) -> io::Result<Self> {
        let file = OpenOptions::new().read(true).open(path).await?;

        Ok(Self {
            file: TokioMutex::new(file),
            io,
            position: AtomicU64::new(0),
        })
    }

    /// Reads up to `len` bytes at `offset`. May return fewer bytes than
    /// requested (for example near end of file); the buffer is truncated to
    /// the number of bytes actually read.
    pub async fn read_at(&self, offset: u64, len: usize) -> io::Result<Bytes> {
        self.io.stats.inflight.fetch_add(1, Ordering::Relaxed);

        let result = async {
            let mut file = self.file.lock().await;
            file.seek(io::SeekFrom::Start(offset)).await?;

            let mut buf = BytesMut::with_capacity(len);
            buf.resize(len, 0);

            // A single read may return fewer than `len` bytes; truncate so
            // the caller only sees what was actually read.
            let bytes_read = file.read(&mut buf).await?;
            buf.truncate(bytes_read);

            Ok::<_, io::Error>(buf.freeze())
        }
        .await;

        self.io.stats.inflight.fetch_sub(1, Ordering::Relaxed);

        match &result {
            Ok(data) => {
                self.io.stats.read_ops.fetch_add(1, Ordering::Relaxed);
                self.io
                    .stats
                    .bytes_read
                    .fetch_add(data.len() as u64, Ordering::Relaxed);
            }
            Err(_) => {
                self.io.stats.failed_ops.fetch_add(1, Ordering::Relaxed);
            }
        }

        result
    }

    /// Writes all of `data` at `offset`, returning the number of bytes
    /// written. Syncs to stable storage afterwards if `sync_on_write` is set.
    pub async fn write_at(&self, offset: u64, data: &[u8]) -> io::Result<usize> {
        self.io.stats.inflight.fetch_add(1, Ordering::Relaxed);

        let result = async {
            let mut file = self.file.lock().await;
            file.seek(io::SeekFrom::Start(offset)).await?;

            // write_all guards against short writes, so on success the whole
            // buffer has been handed to the kernel.
            file.write_all(data).await?;
            let written = data.len();

            if self.io.config.sync_on_write {
                file.sync_all().await?;
            }

            Ok::<_, io::Error>(written)
        }
        .await;

        self.io.stats.inflight.fetch_sub(1, Ordering::Relaxed);

        match &result {
            Ok(written) => {
                self.io.stats.write_ops.fetch_add(1, Ordering::Relaxed);
                self.io
                    .stats
                    .bytes_written
                    .fetch_add(*written as u64, Ordering::Relaxed);
            }
            Err(_) => {
                self.io.stats.failed_ops.fetch_add(1, Ordering::Relaxed);
            }
        }

        result
    }

    /// Reads `len` bytes from the logical cursor, advancing it by the number
    /// of bytes actually read.
    pub async fn read(&self, len: usize) -> io::Result<Bytes> {
        let pos = self.position.load(Ordering::Relaxed);
        let data = self.read_at(pos, len).await?;
        self.position
            .fetch_add(data.len() as u64, Ordering::Relaxed);
        Ok(data)
    }

    /// Writes at the logical cursor, advancing it by the bytes written.
    pub async fn write(&self, data: &[u8]) -> io::Result<usize> {
        let pos = self.position.load(Ordering::Relaxed);
        let written = self.write_at(pos, data).await?;
        self.position.fetch_add(written as u64, Ordering::Relaxed);
        Ok(written)
    }

    /// Flushes file data and metadata to stable storage.
    pub async fn sync(&self) -> io::Result<()> {
        self.io.stats.inflight.fetch_add(1, Ordering::Relaxed);

        let result = {
            let file = self.file.lock().await;
            file.sync_all().await
        };

        self.io.stats.inflight.fetch_sub(1, Ordering::Relaxed);

        match &result {
            Ok(_) => {
                self.io.stats.sync_ops.fetch_add(1, Ordering::Relaxed);
            }
            Err(_) => {
                self.io.stats.failed_ops.fetch_add(1, Ordering::Relaxed);
            }
        }

        result
    }

    /// Moves the logical cursor used by `read` and `write`.
    pub fn seek(&self, pos: u64) {
        self.position.store(pos, Ordering::Relaxed);
    }

    /// Current logical cursor position.
    pub fn position(&self) -> u64 {
        self.position.load(Ordering::Relaxed)
    }

    /// Current file size in bytes, from filesystem metadata.
    pub async fn size(&self) -> io::Result<u64> {
        let file = self.file.lock().await;
        Ok(file.metadata().await?.len())
    }
}

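/// Builder for a batch of independent reads and writes that `execute` runs
/// concurrently. Results come back in the order the operations were added.
///
/// A minimal usage sketch (`ignore`d doctest; paths and runtime are assumed):
///
/// ```ignore
/// let payload = vec![0u8; 64];
/// let results = BatchBuilder::new(io)
///     .read("a.dat", 0, 128)
///     .write("b.dat", 0, payload)
///     .execute()
///     .await;
/// ```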
pub struct BatchBuilder {
    io: Arc<AsyncIo>,
    ops: Vec<BatchOp>,
}

enum BatchOp {
    Read {
        path: std::path::PathBuf,
        offset: u64,
        len: usize,
    },
    Write {
        path: std::path::PathBuf,
        offset: u64,
        data: Vec<u8>,
    },
}

/// Outcome of one operation in a batch.
#[derive(Debug)]
pub enum BatchResult {
    /// Bytes returned by a read.
    Read(Bytes),
    /// Number of bytes written.
    Write(usize),
    /// The operation failed.
    Error(io::Error),
}

impl BatchBuilder {
    /// Creates an empty batch bound to the given I/O context.
    pub fn new(io: Arc<AsyncIo>) -> Self {
        Self {
            io,
            ops: Vec::new(),
        }
    }

    /// Queues a read of `len` bytes at `offset` from `path`.
    pub fn read<P: AsRef<Path>>(mut self, path: P, offset: u64, len: usize) -> Self {
        self.ops.push(BatchOp::Read {
            path: path.as_ref().to_path_buf(),
            offset,
            len,
        });
        self
    }

    /// Queues a write of `data` at `offset` to `path`.
    pub fn write<P: AsRef<Path>>(mut self, path: P, offset: u64, data: Vec<u8>) -> Self {
        self.ops.push(BatchOp::Write {
            path: path.as_ref().to_path_buf(),
            offset,
            data,
        });
        self
    }

    /// Runs all queued operations concurrently. Each operation opens its own
    /// file handle; per-operation failures are reported as
    /// `BatchResult::Error` rather than failing the whole batch.
    pub async fn execute(self) -> Vec<BatchResult> {
        use futures::future::join_all;

        let io = self.io;
        let futures: Vec<_> = self
            .ops
            .into_iter()
            .map(|op| {
                let io = io.clone();
                async move {
                    match op {
                        BatchOp::Read { path, offset, len } => {
                            match AsyncFile::open(&path, io).await {
                                Ok(file) => match file.read_at(offset, len).await {
                                    Ok(data) => BatchResult::Read(data),
                                    Err(e) => BatchResult::Error(e),
                                },
                                Err(e) => BatchResult::Error(e),
                            }
                        }
                        BatchOp::Write { path, offset, data } => {
                            match AsyncFile::open(&path, io).await {
                                Ok(file) => match file.write_at(offset, &data).await {
                                    Ok(written) => BatchResult::Write(written),
                                    Err(e) => BatchResult::Error(e),
                                },
                                Err(e) => BatchResult::Error(e),
                            }
                        }
                    }
                }
            })
            .collect();

        join_all(futures).await
    }
}

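/// An append-only segment built on `AsyncFile`. Offsets returned by `append`
/// are relative to the start of the segment file; `base_offset` appears to
/// record where the segment sits in a larger log.
///
/// A minimal usage sketch (`ignore`d doctest; paths and runtime are assumed):
///
/// ```ignore
/// let segment = AsyncSegment::open("000000.log", 0, io).await?;
/// let at = segment.append(b"record").await?;
/// let record = segment.read(at, 6).await?;
/// ```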
pub struct AsyncSegment {
    file: AsyncFile,
    base_offset: u64,
    size: AtomicU64,
}

impl AsyncSegment {
    /// Opens (or creates) a segment file, resuming from its current size.
    pub async fn open<P: AsRef<Path>>(
        path: P,
        base_offset: u64,
        io: Arc<AsyncIo>,
    ) -> io::Result<Self> {
        let file = AsyncFile::open(&path, io).await?;
        // Propagate metadata errors instead of silently treating them as an
        // empty segment, which would corrupt append offsets.
        let size = file.size().await?;

        Ok(Self {
            file,
            base_offset,
            size: AtomicU64::new(size),
        })
    }

    /// Appends `data`, returning the segment-relative offset it was written at.
    pub async fn append(&self, data: &[u8]) -> io::Result<u64> {
        // Reserve the range first so concurrent appends get disjoint offsets.
        let offset = self.size.fetch_add(data.len() as u64, Ordering::SeqCst);
        self.file.write_at(offset, data).await?;
        Ok(offset)
    }

    /// Reads `len` bytes at a segment-relative `offset`.
    pub async fn read(&self, offset: u64, len: usize) -> io::Result<Bytes> {
        self.file.read_at(offset, len).await
    }

    /// Syncs the underlying file to stable storage.
    pub async fn sync(&self) -> io::Result<()> {
        self.file.sync().await
    }

    /// Logical offset of this segment within the larger log.
    pub fn base_offset(&self) -> u64 {
        self.base_offset
    }

    /// Current segment size in bytes.
    pub fn size(&self) -> u64 {
        self.size.load(Ordering::Relaxed)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[tokio::test]
    async fn test_async_io_basic() {
        let config = AsyncIoConfig::default();
        let io = AsyncIo::new(config).unwrap();

        let dir = tempdir().unwrap();
        let path = dir.path().join("test.dat");

        let file = AsyncFile::open(&path, io.clone()).await.unwrap();

        let data = b"Hello, cross-platform I/O!";
        let written = file.write_at(0, data).await.unwrap();
        assert_eq!(written, data.len());

        let read = file.read_at(0, data.len()).await.unwrap();
        assert_eq!(&read[..], data);

        let stats = io.stats().snapshot();
        assert!(stats.write_ops > 0);
        assert!(stats.read_ops > 0);
        assert_eq!(stats.bytes_written, data.len() as u64);
        assert_eq!(stats.bytes_read, data.len() as u64);
    }

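    // A small sketch of the short-read contract relied on above: asking for
    // more bytes than the file holds returns a truncated buffer, not an error.
    #[tokio::test]
    async fn test_read_past_eof_is_short() {
        let io = AsyncIo::new(AsyncIoConfig::default()).unwrap();
        let dir = tempdir().unwrap();
        let path = dir.path().join("short.dat");

        let file = AsyncFile::open(&path, io).await.unwrap();
        file.write_at(0, b"abc").await.unwrap();

        let data = file.read_at(0, 16).await.unwrap();
        assert_eq!(&data[..], b"abc");
    }
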
    #[tokio::test]
    async fn test_async_file_sequential() {
        let io = AsyncIo::new(AsyncIoConfig::default()).unwrap();
        let dir = tempdir().unwrap();
        let path = dir.path().join("sequential.dat");

        let file = AsyncFile::open(&path, io).await.unwrap();

        file.write(b"Hello").await.unwrap();
        file.write(b" World").await.unwrap();

        file.seek(0);
        let data = file.read(11).await.unwrap();
        assert_eq!(&data[..], b"Hello World");
    }

    #[tokio::test]
    async fn test_async_segment() {
        let io = AsyncIo::new(AsyncIoConfig::default()).unwrap();
        let dir = tempdir().unwrap();
        let path = dir.path().join("segment.log");

        let segment = AsyncSegment::open(&path, 0, io).await.unwrap();

        let pos1 = segment.append(b"message1").await.unwrap();
        let pos2 = segment.append(b"message2").await.unwrap();

        assert_eq!(pos1, 0);
        assert_eq!(pos2, 8);

        let data1 = segment.read(0, 8).await.unwrap();
        let data2 = segment.read(8, 8).await.unwrap();

        assert_eq!(&data1[..], b"message1");
        assert_eq!(&data2[..], b"message2");

        assert_eq!(segment.size(), 16);
    }

    #[tokio::test]
    async fn test_batch_operations() {
        let io = AsyncIo::new(AsyncIoConfig::default()).unwrap();
        let dir = tempdir().unwrap();

        let path1 = dir.path().join("batch1.dat");
        let path2 = dir.path().join("batch2.dat");

        let file1 = AsyncFile::open(&path1, io.clone()).await.unwrap();
        let file2 = AsyncFile::open(&path2, io.clone()).await.unwrap();

        file1.write_at(0, b"file1 data").await.unwrap();
        file2.write_at(0, b"file2 data").await.unwrap();

        let results = BatchBuilder::new(io)
            .read(&path1, 0, 10)
            .read(&path2, 0, 10)
            .execute()
            .await;

        assert_eq!(results.len(), 2);

        match &results[0] {
            BatchResult::Read(data) => assert_eq!(&data[..], b"file1 data"),
            _ => panic!("Expected read result"),
        }

        match &results[1] {
            BatchResult::Read(data) => assert_eq!(&data[..], b"file2 data"),
            _ => panic!("Expected read result"),
        }
    }

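    // A sketch exercising the write side of BatchBuilder as well; the batch
    // creates the file itself via AsyncFile::open.
    #[tokio::test]
    async fn test_batch_writes() {
        let io = AsyncIo::new(AsyncIoConfig::default()).unwrap();
        let dir = tempdir().unwrap();
        let path = dir.path().join("batch_write.dat");

        let results = BatchBuilder::new(io.clone())
            .write(&path, 0, b"hello".to_vec())
            .execute()
            .await;

        match &results[0] {
            BatchResult::Write(written) => assert_eq!(*written, 5),
            _ => panic!("Expected write result"),
        }

        // Read back through a fresh handle to confirm the bytes landed.
        let file = AsyncFile::open_read(&path, io).await.unwrap();
        let data = file.read_at(0, 5).await.unwrap();
        assert_eq!(&data[..], b"hello");
    }
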
    #[tokio::test]
    async fn test_sync_operations() {
        let config = AsyncIoConfig::durable();
        let io = AsyncIo::new(config).unwrap();
        let dir = tempdir().unwrap();
        let path = dir.path().join("durable.dat");

        let file = AsyncFile::open(&path, io.clone()).await.unwrap();

        file.write_at(0, b"durable data").await.unwrap();

        file.sync().await.unwrap();

        let stats = io.stats().snapshot();
        assert!(stats.sync_ops >= 1);
    }

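    // A sketch of the snapshot math helpers with hand-picked numbers:
    // 1024 bytes over one second is 1/1024 MiB/s, and a zero-length window
    // is defined to be zero rather than a division error.
    #[tokio::test]
    async fn test_stats_snapshot_math() {
        let io = AsyncIo::new(AsyncIoConfig::default()).unwrap();
        let dir = tempdir().unwrap();
        let path = dir.path().join("math.dat");

        let file = AsyncFile::open(&path, io.clone()).await.unwrap();
        file.write_at(0, &[0u8; 1024]).await.unwrap();

        let snap = io.stats().snapshot();
        assert!((snap.throughput_mbps(1.0) - 1.0 / 1024.0).abs() < 1e-9);
        assert_eq!(snap.throughput_mbps(0.0), 0.0);
        assert!(snap.ops_per_second(1.0) >= 1.0);
    }
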
    #[tokio::test]
    async fn test_config_variants() {
        let configs = vec![
            AsyncIoConfig::default(),
            AsyncIoConfig::high_performance(),
            AsyncIoConfig::low_latency(),
            AsyncIoConfig::durable(),
        ];

        for config in configs {
            let io = AsyncIo::new(config).unwrap();
            assert!(io.has_capacity());
        }
    }
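
    // A sketch checking that inflight accounting returns to zero once
    // operations finish, which is what has_capacity relies on.
    #[tokio::test]
    async fn test_inflight_returns_to_zero() {
        let io = AsyncIo::new(AsyncIoConfig::default()).unwrap();
        let dir = tempdir().unwrap();
        let path = dir.path().join("inflight.dat");

        let file = AsyncFile::open(&path, io.clone()).await.unwrap();
        file.write_at(0, b"x").await.unwrap();
        file.read_at(0, 1).await.unwrap();

        assert_eq!(io.inflight(), 0);
        assert!(io.has_capacity());
    }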
}