1use std::collections::VecDeque;
53use std::fs::File;
54use std::io::{self, Read, Seek, SeekFrom, Write};
55use std::sync::Arc;
56use std::sync::atomic::{AtomicU64, Ordering};
57
58#[cfg(target_os = "linux")]
59use io_uring::{IoUring, opcode, types};
60
/// Tuning parameters for an io_uring instance.
///
/// NOTE(review): within this file only `sq_entries` is consumed (by
/// `LinuxIoUringBackend::new`); the remaining fields are carried along but not yet applied.
#[derive(Debug, Clone)]
pub struct IoUringConfig {
    /// Number of submission-queue entries requested for the ring.
    pub sq_entries: u32,
    /// Requested number of completion-queue entries.
    pub cq_entries: u32,
    /// Whether a kernel-side submission polling thread (SQPOLL) is requested.
    pub sq_poll: bool,
    /// Idle time of the SQPOLL thread, in milliseconds.
    pub sq_poll_idle_ms: u32,
    /// Whether fixed (registered) buffers should be used.
    pub use_registered_buffers: bool,
    /// Maximum number of buffers to register.
    pub max_registered_buffers: usize,
}

impl Default for IoUringConfig {
    /// Balanced settings: 256 SQ / 512 CQ entries, no SQPOLL, up to 64 registered buffers.
    fn default() -> Self {
        Self {
            sq_entries: 256,
            cq_entries: 512,
            sq_poll: false,
            sq_poll_idle_ms: 1000,
            use_registered_buffers: true,
            max_registered_buffers: 64,
        }
    }
}

impl IoUringConfig {
    /// Large queues plus SQPOLL, aimed at sustained high-volume I/O.
    ///
    /// Fields not listed here are inherited from [`IoUringConfig::default`].
    pub fn high_throughput() -> Self {
        Self {
            sq_entries: 1024,
            cq_entries: 2048,
            sq_poll: true,
            sq_poll_idle_ms: 2000,
            max_registered_buffers: 256,
            ..Self::default()
        }
    }

    /// Small queues plus SQPOLL with a short idle time, aimed at latency-sensitive I/O.
    ///
    /// Fields not listed here are inherited from [`IoUringConfig::default`].
    pub fn low_latency() -> Self {
        Self {
            sq_entries: 64,
            cq_entries: 128,
            sq_poll: true,
            sq_poll_idle_ms: 100,
            max_registered_buffers: 32,
            ..Self::default()
        }
    }
}
116
/// Kind of I/O request carried by an `IoOp`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IoOpType {
    /// Read into the op's buffer at the op's offset.
    Read,
    /// Write the op's buffer at the op's offset.
    Write,
    /// Flush the file to storage (fsync).
    Fsync,
    /// Declared, but not performed by the backends in this file.
    Fallocate,
    /// Declared, but not performed by the backends in this file.
    Close,
}

/// A single I/O request that owns the buffer a backend reads into or writes from.
#[derive(Debug)]
pub struct IoOp {
    /// What to do.
    pub op_type: IoOpType,
    /// Raw file descriptor. It is not owned by the op and is assumed to stay open until the
    /// op completes.
    pub fd: i32,
    /// Destination for reads, source for writes; empty for `Fsync`.
    pub buffer: Vec<u8>,
    /// Byte offset in the file (0 for `Fsync`).
    pub offset: u64,
    /// Requested transfer length; equal to `buffer.len()` for ops built by the constructors.
    pub len: usize,
    /// Caller-chosen tag echoed back in the matching `IoCompletion`.
    pub user_data: u64,
}

impl IoOp {
    /// Builds a read of `len` bytes at `offset`, backed by a zero-filled buffer of that size.
    pub fn read(fd: i32, offset: u64, len: usize, user_data: u64) -> Self {
        let zeroed = vec![0u8; len];
        Self {
            op_type: IoOpType::Read,
            fd,
            buffer: zeroed,
            offset,
            len,
            user_data,
        }
    }

    /// Builds a write of all of `data` at `offset`; the op takes ownership of `data`.
    pub fn write(fd: i32, data: Vec<u8>, offset: u64, user_data: u64) -> Self {
        Self {
            op_type: IoOpType::Write,
            fd,
            // `len` is read before `data` is moved into `buffer` (fields evaluate in order).
            len: data.len(),
            buffer: data,
            offset,
            user_data,
        }
    }

    /// Builds an fsync of `fd`; it carries no buffer, offset or length.
    pub fn fsync(fd: i32, user_data: u64) -> Self {
        Self {
            op_type: IoOpType::Fsync,
            fd,
            buffer: Vec::new(),
            offset: 0,
            len: 0,
            user_data,
        }
    }
}
182
/// Outcome of one submitted `IoOp`, matched to it through `user_data`.
#[derive(Debug)]
pub struct IoCompletion {
    /// Tag copied from the originating op.
    pub user_data: u64,
    /// On success, the operation's result (bytes transferred for reads and writes).
    /// On failure, an error code supplied by the backend.
    pub result: i32,
    /// Whether the backend reported the operation as successful.
    pub success: bool,
}

impl IoCompletion {
    /// Builds a successful completion carrying `result`.
    pub fn success(user_data: u64, result: i32) -> Self {
        Self { user_data, result, success: true }
    }

    /// Builds a failed completion carrying `error_code` in `result`.
    pub fn failure(user_data: u64, error_code: i32) -> Self {
        Self { user_data, result: error_code, success: false }
    }

    /// Byte count of a successful completion with a non-negative result, else `None`.
    pub fn bytes_transferred(&self) -> Option<usize> {
        if !self.success {
            return None;
        }
        // Fails exactly when `result` is negative.
        usize::try_from(self.result).ok()
    }
}
222
223#[derive(Debug, Default)]
225pub struct IoUringStats {
226 pub ops_submitted: AtomicU64,
228 pub ops_completed: AtomicU64,
230 pub bytes_read: AtomicU64,
232 pub bytes_written: AtomicU64,
234 pub syscalls: AtomicU64,
236 pub ops_batched: AtomicU64,
238}
239
240impl IoUringStats {
241 pub fn new() -> Arc<Self> {
243 Arc::new(Self::default())
244 }
245
246 pub fn record_submit(&self, count: u64) {
248 self.ops_submitted.fetch_add(count, Ordering::Relaxed);
249 self.syscalls.fetch_add(1, Ordering::Relaxed);
250 if count > 1 {
251 self.ops_batched.fetch_add(count - 1, Ordering::Relaxed);
252 }
253 }
254
255 pub fn record_completion(&self, op_type: IoOpType, bytes: u64) {
257 self.ops_completed.fetch_add(1, Ordering::Relaxed);
258 match op_type {
259 IoOpType::Read => {
260 self.bytes_read.fetch_add(bytes, Ordering::Relaxed);
261 }
262 IoOpType::Write => {
263 self.bytes_written.fetch_add(bytes, Ordering::Relaxed);
264 }
265 _ => {}
266 }
267 }
268
269 pub fn snapshot(&self) -> IoUringStatsSnapshot {
271 IoUringStatsSnapshot {
272 ops_submitted: self.ops_submitted.load(Ordering::Relaxed),
273 ops_completed: self.ops_completed.load(Ordering::Relaxed),
274 bytes_read: self.bytes_read.load(Ordering::Relaxed),
275 bytes_written: self.bytes_written.load(Ordering::Relaxed),
276 syscalls: self.syscalls.load(Ordering::Relaxed),
277 ops_batched: self.ops_batched.load(Ordering::Relaxed),
278 }
279 }
280}
281
282#[derive(Debug, Clone)]
284pub struct IoUringStatsSnapshot {
285 pub ops_submitted: u64,
286 pub ops_completed: u64,
287 pub bytes_read: u64,
288 pub bytes_written: u64,
289 pub syscalls: u64,
290 pub ops_batched: u64,
291}
292
293impl IoUringStatsSnapshot {
294 pub fn batching_efficiency(&self) -> f64 {
296 if self.syscalls == 0 {
297 0.0
298 } else {
299 self.ops_submitted as f64 / self.syscalls as f64
300 }
301 }
302
303 pub fn bytes_per_syscall(&self) -> f64 {
305 if self.syscalls == 0 {
306 0.0
307 } else {
308 (self.bytes_read + self.bytes_written) as f64 / self.syscalls as f64
309 }
310 }
311}
312
313pub trait AsyncIoBackend: Send + Sync {
319 fn submit(&mut self, op: IoOp) -> io::Result<()>;
321
322 fn submit_batch(&mut self, ops: Vec<IoOp>) -> io::Result<()>;
324
325 fn wait_one(&mut self) -> io::Result<IoCompletion>;
327
328 fn wait_all(&mut self) -> io::Result<Vec<IoCompletion>>;
330
331 fn pending(&self) -> usize;
333
334 fn is_uring_available(&self) -> bool;
336}
337
338pub struct SyncIoBackend {
343 pending: parking_lot::Mutex<VecDeque<IoOp>>,
344 completions: parking_lot::Mutex<VecDeque<IoCompletion>>,
345 stats: Arc<IoUringStats>,
346}
347
348impl SyncIoBackend {
349 pub fn new(stats: Arc<IoUringStats>) -> Self {
351 Self {
352 pending: parking_lot::Mutex::new(VecDeque::new()),
353 completions: parking_lot::Mutex::new(VecDeque::new()),
354 stats,
355 }
356 }
357
358 fn execute(&self, mut op: IoOp) -> IoCompletion {
360 use std::os::unix::io::FromRawFd;
361
362 let result = unsafe {
363 let file = File::from_raw_fd(op.fd);
365 let res = match op.op_type {
366 IoOpType::Read => {
367 let mut file_ref = &file;
368 file_ref.seek(SeekFrom::Start(op.offset)).ok();
369 file_ref.read(&mut op.buffer)
370 }
371 IoOpType::Write => {
372 let mut file_ref = &file;
373 file_ref.seek(SeekFrom::Start(op.offset)).ok();
374 file_ref.write(&op.buffer)
375 }
376 IoOpType::Fsync => file.sync_all().map(|_| 0),
377 IoOpType::Fallocate | IoOpType::Close => Ok(0),
378 };
379 std::mem::forget(file);
381 res
382 };
383
384 match result {
385 Ok(n) => {
386 self.stats.record_completion(op.op_type, n as u64);
387 IoCompletion::success(op.user_data, n as i32)
388 }
389 Err(e) => IoCompletion::failure(op.user_data, e.raw_os_error().unwrap_or(-1)),
390 }
391 }
392}
393
394impl AsyncIoBackend for SyncIoBackend {
395 fn submit(&mut self, op: IoOp) -> io::Result<()> {
396 self.stats.record_submit(1);
397 let completion = self.execute(op);
398 self.completions.lock().push_back(completion);
399 Ok(())
400 }
401
402 fn submit_batch(&mut self, ops: Vec<IoOp>) -> io::Result<()> {
403 let count = ops.len() as u64;
404 self.stats.record_submit(count);
405
406 let completions: Vec<_> = ops.into_iter().map(|op| self.execute(op)).collect();
407 self.completions.lock().extend(completions);
408 Ok(())
409 }
410
411 fn wait_one(&mut self) -> io::Result<IoCompletion> {
412 self.completions
413 .lock()
414 .pop_front()
415 .ok_or_else(|| io::Error::new(io::ErrorKind::WouldBlock, "No completions"))
416 }
417
418 fn wait_all(&mut self) -> io::Result<Vec<IoCompletion>> {
419 Ok(self.completions.lock().drain(..).collect())
420 }
421
422 fn pending(&self) -> usize {
423 self.pending.lock().len()
424 }
425
426 fn is_uring_available(&self) -> bool {
427 false
428 }
429}
430
431#[cfg(target_os = "linux")]
436pub struct LinuxIoUringBackend {
437 uring: Option<IoUring>,
438 config: IoUringConfig,
439 pending: parking_lot::Mutex<VecDeque<IoOp>>,
440 completions: parking_lot::Mutex<VecDeque<IoCompletion>>,
441 stats: Arc<IoUringStats>,
442 uring_available: bool,
444}
445
446#[cfg(target_os = "linux")]
447impl LinuxIoUringBackend {
448 pub fn new(config: IoUringConfig, stats: Arc<IoUringStats>) -> io::Result<Self> {
450 let (uring, uring_available) = match IoUring::new(config.sq_entries) {
452 Ok(uring) => {
453 eprintln!("io_uring initialized successfully with {} entries", config.sq_entries);
454 (Some(uring), true)
455 },
456 Err(e) => {
457 eprintln!("io_uring initialization failed: {}. Falling back to sync I/O", e);
458 (None, false)
459 }
460 };
461
462 Ok(Self {
463 uring,
464 config,
465 pending: parking_lot::Mutex::new(VecDeque::new()),
466 completions: parking_lot::Mutex::new(VecDeque::new()),
467 stats,
468 uring_available,
469 })
470 }
471
472 fn check_uring_available() -> bool {
474 #[cfg(target_os = "linux")]
476 {
477 if let Ok(version) = std::fs::read_to_string("/proc/version") {
478 let parts: Vec<&str> = version.split_whitespace().collect();
480 if parts.len() >= 3 {
481 let version_parts: Vec<&str> = parts[2].split('.').collect();
482 if version_parts.len() >= 2
483 && let (Ok(major), Ok(minor)) = (
484 version_parts[0].parse::<u32>(),
485 version_parts[1].parse::<u32>(),
486 )
487 {
488 return major > 5 || (major == 5 && minor >= 1);
490 }
491 }
492 }
493 }
494 false
495 }
496
497 fn submit_to_uring(&mut self, op: IoOp) -> io::Result<()> {
499 if let Some(ref mut uring) = self.uring {
500 let mut sq = uring.submission();
501
502 let sqe = match op.op_type {
503 IoOpType::Read => {
504 opcode::Read::new(types::Fd(op.fd), op.buffer.as_ptr() as *mut u8, op.len as u32)
505 .offset(op.offset)
506 .build()
507 .user_data(op.user_data)
508 }
509 IoOpType::Write => {
510 opcode::Write::new(types::Fd(op.fd), op.buffer.as_ptr(), op.len as u32)
511 .offset(op.offset)
512 .build()
513 .user_data(op.user_data)
514 }
515 IoOpType::Fsync => {
516 opcode::Fsync::new(types::Fd(op.fd))
517 .build()
518 .user_data(op.user_data)
519 }
520 _ => return Err(io::Error::new(io::ErrorKind::Unsupported, "Operation not supported")),
521 };
522
523 unsafe {
525 sq.push(&sqe).map_err(|_| io::Error::new(io::ErrorKind::Other, "Failed to push to submission queue"))?;
526 }
527
528 sq.sync();
529 drop(sq);
530
531 uring.submit_and_wait(1)?;
533
534 let mut cq = uring.completion();
536 while let Some(cqe) = cq.next() {
537 let completion = if cqe.result() >= 0 {
538 self.stats.record_completion(op.op_type, cqe.result() as u64);
539 IoCompletion::success(cqe.user_data(), cqe.result())
540 } else {
541 IoCompletion::failure(cqe.user_data(), cqe.result())
542 };
543 self.completions.lock().push_back(completion);
544 }
545
546 Ok(())
547 } else {
548 let completion = self.simulate_execute(op);
550 self.completions.lock().push_back(completion);
551 Ok(())
552 }
553 }
554
555 fn simulate_execute(&self, mut op: IoOp) -> IoCompletion {
557 use std::os::unix::io::FromRawFd;
558
559 let result = unsafe {
560 let file = File::from_raw_fd(op.fd);
561 let res = match op.op_type {
562 IoOpType::Read => {
563 let mut file_ref = &file;
564 file_ref.seek(SeekFrom::Start(op.offset)).ok();
565 file_ref.read(&mut op.buffer)
566 }
567 IoOpType::Write => {
568 let mut file_ref = &file;
569 file_ref.seek(SeekFrom::Start(op.offset)).ok();
570 file_ref.write(&op.buffer)
571 }
572 IoOpType::Fsync => file.sync_all().map(|_| 0),
573 IoOpType::Fallocate | IoOpType::Close => Ok(0),
574 };
575 std::mem::forget(file);
576 res
577 };
578
579 match result {
580 Ok(n) => {
581 self.stats.record_completion(op.op_type, n as u64);
582 IoCompletion::success(op.user_data, n as i32)
583 }
584 Err(e) => IoCompletion::failure(op.user_data, e.raw_os_error().unwrap_or(-1)),
585 }
586 }
587}
588
589#[cfg(target_os = "linux")]
590impl AsyncIoBackend for LinuxIoUringBackend {
591 fn submit(&mut self, op: IoOp) -> io::Result<()> {
592 self.stats.record_submit(1);
593 self.submit_to_uring(op)
594 }
595
596 fn submit_batch(&mut self, ops: Vec<IoOp>) -> io::Result<()> {
597 let count = ops.len() as u64;
598 self.stats.record_submit(count);
599
600 if let Some(ref mut uring) = self.uring {
601 let mut sq = uring.submission();
602
603 for op in ops {
605 let sqe = match op.op_type {
606 IoOpType::Read => {
607 opcode::Read::new(types::Fd(op.fd), op.buffer.as_ptr() as *mut u8, op.len as u32)
608 .offset(op.offset)
609 .build()
610 .user_data(op.user_data)
611 }
612 IoOpType::Write => {
613 opcode::Write::new(types::Fd(op.fd), op.buffer.as_ptr(), op.len as u32)
614 .offset(op.offset)
615 .build()
616 .user_data(op.user_data)
617 }
618 IoOpType::Fsync => {
619 opcode::Fsync::new(types::Fd(op.fd))
620 .build()
621 .user_data(op.user_data)
622 }
623 _ => continue, };
625
626 unsafe {
628 if sq.push(&sqe).is_err() {
629 break; }
631 }
632 }
633
634 sq.sync();
635 drop(sq);
636
637 uring.submit()?;
639
640 Ok(())
641 } else {
642 let completions: Vec<_> = ops
644 .into_iter()
645 .map(|op| self.simulate_execute(op))
646 .collect();
647 self.completions.lock().extend(completions);
648 Ok(())
649 }
650 }
651
652 fn wait_one(&mut self) -> io::Result<IoCompletion> {
653 if let Some(completion) = self.completions.lock().pop_front() {
655 return Ok(completion);
656 }
657
658 if let Some(ref mut uring) = self.uring {
660 uring.submit_and_wait(1)?;
661 let mut cq = uring.completion();
662 if let Some(cqe) = cq.next() {
663 let completion = if cqe.result() >= 0 {
664 IoCompletion::success(cqe.user_data(), cqe.result())
665 } else {
666 IoCompletion::failure(cqe.user_data(), cqe.result())
667 };
668 return Ok(completion);
669 }
670 }
671
672 Err(io::Error::new(io::ErrorKind::WouldBlock, "No completions"))
673 }
674
675 fn wait_all(&mut self) -> io::Result<Vec<IoCompletion>> {
676 let mut all_completions = self.completions.lock().drain(..).collect::<Vec<_>>();
677
678 if let Some(ref mut uring) = self.uring {
680 let mut cq = uring.completion();
681 while let Some(cqe) = cq.next() {
682 let completion = if cqe.result() >= 0 {
683 IoCompletion::success(cqe.user_data(), cqe.result())
684 } else {
685 IoCompletion::failure(cqe.user_data(), cqe.result())
686 };
687 all_completions.push(completion);
688 }
689 }
690
691 Ok(all_completions)
692 }
693
694 fn pending(&self) -> usize {
695 self.pending.lock().len()
696 }
697
698 fn is_uring_available(&self) -> bool {
699 self.uring_available
700 }
701}
702
703pub fn create_backend(config: IoUringConfig, stats: Arc<IoUringStats>) -> Box<dyn AsyncIoBackend> {
705 #[cfg(target_os = "linux")]
706 {
707 match LinuxIoUringBackend::new(config, stats.clone()) {
708 Ok(backend) if backend.is_uring_available() => {
709 tracing::info!("Using Linux io_uring backend");
710 Box::new(backend)
711 }
712 _ => {
713 tracing::info!("Falling back to sync I/O backend");
714 Box::new(SyncIoBackend::new(stats))
715 }
716 }
717 }
718
719 #[cfg(not(target_os = "linux"))]
720 {
721 let _ = config; tracing::info!("Using sync I/O backend (io_uring not available on this platform)");
723 Box::new(SyncIoBackend::new(stats))
724 }
725}
726
727pub struct BatchedWriter {
729 backend: Box<dyn AsyncIoBackend>,
730 pending_ops: Vec<IoOp>,
731 batch_size: usize,
732 next_user_data: AtomicU64,
733}
734
735impl BatchedWriter {
736 pub fn new(backend: Box<dyn AsyncIoBackend>, batch_size: usize) -> Self {
738 Self {
739 backend,
740 pending_ops: Vec::with_capacity(batch_size),
741 batch_size,
742 next_user_data: AtomicU64::new(0),
743 }
744 }
745
746 pub fn write(&mut self, fd: i32, data: Vec<u8>, offset: u64) -> u64 {
748 let user_data = self.next_user_data.fetch_add(1, Ordering::Relaxed);
749 let op = IoOp::write(fd, data, offset, user_data);
750 self.pending_ops.push(op);
751
752 if self.pending_ops.len() >= self.batch_size {
753 self.flush().ok();
754 }
755
756 user_data
757 }
758
759 pub fn flush(&mut self) -> io::Result<Vec<IoCompletion>> {
761 if self.pending_ops.is_empty() {
762 return Ok(Vec::new());
763 }
764
765 let ops = std::mem::take(&mut self.pending_ops);
766 self.backend.submit_batch(ops)?;
767 self.backend.wait_all()
768 }
769
770 pub fn pending(&self) -> usize {
772 self.pending_ops.len()
773 }
774}
775
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_io_uring_config() {
        let default = IoUringConfig::default();
        assert_eq!(default.sq_entries, 256);
        assert!(!default.sq_poll);

        let high = IoUringConfig::high_throughput();
        assert_eq!(high.sq_entries, 1024);
        assert!(high.sq_poll);

        let low = IoUringConfig::low_latency();
        assert_eq!(low.sq_entries, 64);
        assert!(low.sq_poll);
    }

    #[test]
    fn test_io_op_creation() {
        let read_op = IoOp::read(5, 1024, 512, 42);
        assert_eq!(read_op.op_type, IoOpType::Read);
        assert_eq!(read_op.fd, 5);
        assert_eq!(read_op.offset, 1024);
        assert_eq!(read_op.len, 512);
        assert_eq!(read_op.buffer.len(), 512);
        assert_eq!(read_op.user_data, 42);

        let write_op = IoOp::write(6, vec![1, 2, 3], 2048, 99);
        assert_eq!(write_op.op_type, IoOpType::Write);
        assert_eq!(write_op.buffer, vec![1, 2, 3]);
        assert_eq!(write_op.len, 3);

        let fsync_op = IoOp::fsync(7, 100);
        assert_eq!(fsync_op.op_type, IoOpType::Fsync);
    }

    #[test]
    fn test_io_completion() {
        let success = IoCompletion::success(42, 1024);
        assert!(success.success);
        assert_eq!(success.bytes_transferred(), Some(1024));

        let failure = IoCompletion::failure(42, -5);
        assert!(!failure.success);
        assert_eq!(failure.bytes_transferred(), None);
    }

    #[test]
    fn test_io_uring_stats() {
        let stats = IoUringStats::new();

        stats.record_submit(5);
        stats.record_completion(IoOpType::Read, 1024);
        stats.record_completion(IoOpType::Write, 512);

        let snapshot = stats.snapshot();
        assert_eq!(snapshot.ops_submitted, 5);
        assert_eq!(snapshot.ops_completed, 2);
        assert_eq!(snapshot.bytes_read, 1024);
        assert_eq!(snapshot.bytes_written, 512);
        assert_eq!(snapshot.syscalls, 1);
        assert_eq!(snapshot.ops_batched, 4);
    }

    #[test]
    fn test_stats_efficiency() {
        let stats = IoUringStats::new();

        // Two submissions of five ops each.
        stats.record_submit(5);
        stats.record_submit(5);

        for _ in 0..10 {
            stats.record_completion(IoOpType::Write, 100);
        }

        let snapshot = stats.snapshot();
        assert!((snapshot.batching_efficiency() - 5.0).abs() < 0.01);
        assert!((snapshot.bytes_per_syscall() - 500.0).abs() < 0.01);
    }

    /// Runs a positional read, then a batched write + fsync, through the sync backend and
    /// checks completions, resulting file contents and stats.
    #[test]
    fn test_sync_backend() {
        use std::os::unix::io::AsRawFd;
        use tempfile::NamedTempFile;

        let stats = IoUringStats::new();
        let mut backend = SyncIoBackend::new(stats.clone());

        assert!(!backend.is_uring_available());
        assert_eq!(backend.pending(), 0);

        let mut temp = NamedTempFile::new().unwrap();
        temp.write_all(b"hello world").unwrap();
        temp.flush().unwrap();
        let fd = temp.as_file().as_raw_fd();

        // Nothing has gone through the backend yet.
        assert_eq!(stats.snapshot().ops_submitted, 0);

        backend.submit(IoOp::read(fd, 6, 5, 1)).unwrap();
        let read = backend.wait_one().unwrap();
        assert_eq!(read.user_data, 1);
        assert_eq!(read.bytes_transferred(), Some(5));

        backend
            .submit_batch(vec![
                IoOp::write(fd, b"HELLO".to_vec(), 0, 2),
                IoOp::fsync(fd, 3),
            ])
            .unwrap();
        let done = backend.wait_all().unwrap();
        assert_eq!(done.len(), 2);
        assert!(done.iter().all(|c| c.success));
        assert_eq!(done[0].user_data, 2);
        assert_eq!(done[0].bytes_transferred(), Some(5));
        assert_eq!(done[1].user_data, 3);

        // The completion queue is now empty.
        assert!(backend.wait_one().is_err());
        assert_eq!(backend.pending(), 0);
        assert_eq!(std::fs::read(temp.path()).unwrap(), b"HELLO world");

        let snapshot = stats.snapshot();
        assert_eq!(snapshot.ops_submitted, 3);
        assert_eq!(snapshot.bytes_read, 5);
        assert_eq!(snapshot.bytes_written, 5);
    }

    #[test]
    fn test_create_backend() {
        let stats = IoUringStats::new();
        let config = IoUringConfig::default();
        let backend = create_backend(config, stats);

        #[cfg(not(target_os = "linux"))]
        assert!(!backend.is_uring_available());

        assert_eq!(backend.pending(), 0);
    }

    /// Queues writes below the batch size, flushes them explicitly and checks the file.
    #[test]
    fn test_batched_writer() {
        use std::os::unix::io::AsRawFd;
        use tempfile::NamedTempFile;

        let stats = IoUringStats::new();
        let backend = Box::new(SyncIoBackend::new(stats.clone()));
        let mut writer = BatchedWriter::new(backend, 10);

        assert_eq!(writer.pending(), 0);
        // Flushing with nothing queued is a no-op.
        assert!(writer.flush().unwrap().is_empty());

        let temp = NamedTempFile::new().unwrap();
        let fd = temp.as_file().as_raw_fd();
        let first = writer.write(fd, b"abc".to_vec(), 0);
        let second = writer.write(fd, b"def".to_vec(), 3);
        assert_ne!(first, second);
        assert_eq!(writer.pending(), 2);

        let completions = writer.flush().unwrap();
        assert_eq!(writer.pending(), 0);
        assert_eq!(completions.len(), 2);
        assert!(completions.iter().all(|c| c.bytes_transferred() == Some(3)));
        assert_eq!(std::fs::read(temp.path()).unwrap(), b"abcdef");
        assert_eq!(stats.snapshot().bytes_written, 6);
    }

    /// Exercises a single write through the Linux backend, whether or not a ring could be
    /// created: io_uring may be disabled (old kernel, seccomp), so the fallback is covered too.
    #[cfg(target_os = "linux")]
    #[test]
    fn test_linux_uring_check() {
        use std::os::unix::io::AsRawFd;
        use tempfile::NamedTempFile;

        let stats = IoUringStats::new();
        let config = IoUringConfig::default();
        let mut backend = LinuxIoUringBackend::new(config, stats).unwrap();

        println!("io_uring available: {}", backend.is_uring_available());
        assert_eq!(backend.pending(), 0);

        let temp = NamedTempFile::new().unwrap();
        let fd = temp.as_file().as_raw_fd();
        backend.submit(IoOp::write(fd, b"uring".to_vec(), 0, 7)).unwrap();
        let completion = backend.wait_one().unwrap();
        assert_eq!(completion.user_data, 7);
        assert_eq!(completion.bytes_transferred(), Some(5));
        assert_eq!(std::fs::read(temp.path()).unwrap(), b"uring");
    }
}