1use std::collections::VecDeque;
56use std::fs::File;
57use std::io::{self, Read, Seek, SeekFrom, Write};
58use std::sync::Arc;
59use std::sync::atomic::{AtomicU64, Ordering};
60
61#[cfg(target_os = "linux")]
62use io_uring::{IoUring, opcode, types};
63
/// Tuning parameters for an io_uring-based backend.
///
/// Ready-made presets: [`IoUringConfig::default`], [`IoUringConfig::high_throughput`]
/// and [`IoUringConfig::low_latency`].
#[derive(Debug, Clone)]
pub struct IoUringConfig {
    /// Requested number of submission-queue entries.
    pub sq_entries: u32,
    /// Requested number of completion-queue entries.
    pub cq_entries: u32,
    /// Whether kernel-side submission-queue polling (SQPOLL) is requested.
    pub sq_poll: bool,
    /// SQPOLL idle setting in milliseconds (presumably the poll thread's idle
    /// timeout before it sleeps — TODO confirm against the backend that reads it).
    pub sq_poll_idle_ms: u32,
    /// Whether registered (pre-pinned) buffers should be used.
    // NOTE(review): not read anywhere in this file — confirm whether any backend honors it.
    pub use_registered_buffers: bool,
    /// Upper bound on the number of registered buffers.
    // NOTE(review): not read anywhere in this file — confirm whether any backend honors it.
    pub max_registered_buffers: usize,
}
80
81impl Default for IoUringConfig {
82 fn default() -> Self {
83 Self {
84 sq_entries: 256,
85 cq_entries: 512,
86 sq_poll: false,
87 sq_poll_idle_ms: 1000,
88 use_registered_buffers: true,
89 max_registered_buffers: 64,
90 }
91 }
92}
93
94impl IoUringConfig {
95 pub fn high_throughput() -> Self {
97 Self {
98 sq_entries: 1024,
99 cq_entries: 2048,
100 sq_poll: true,
101 sq_poll_idle_ms: 2000,
102 use_registered_buffers: true,
103 max_registered_buffers: 256,
104 }
105 }
106
107 pub fn low_latency() -> Self {
109 Self {
110 sq_entries: 64,
111 cq_entries: 128,
112 sq_poll: true,
113 sq_poll_idle_ms: 100,
114 use_registered_buffers: true,
115 max_registered_buffers: 32,
116 }
117 }
118}
119
/// Kind of work carried by an [`IoOp`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IoOpType {
    /// Read into the op's buffer, starting at the op's offset.
    Read,
    /// Write the op's buffer, starting at the op's offset.
    Write,
    /// Flush the file to storage (the sync path uses `File::sync_all`).
    Fsync,
    /// Preallocate file space.
    // NOTE(review): no backend in this file actually performs this operation.
    Fallocate,
    /// Close the descriptor.
    // NOTE(review): no backend in this file actually performs this operation.
    Close,
}
129
/// A single I/O request handed to an [`AsyncIoBackend`].
///
/// The op owns its buffer. Prefer the constructors [`IoOp::read`],
/// [`IoOp::write`] and [`IoOp::fsync`] over filling the fields by hand.
#[derive(Debug)]
pub struct IoOp {
    /// What to do.
    pub op_type: IoOpType,
    /// Raw file descriptor to operate on; the backends in this file never close it.
    pub fd: i32,
    /// Source bytes for writes, destination for reads (zero-filled by `IoOp::read`).
    pub buffer: Vec<u8>,
    /// File offset at which the transfer starts.
    pub offset: u64,
    /// Number of bytes to transfer; the constructors set it to the buffer length.
    pub len: usize,
    /// Caller-chosen tag copied into the matching [`IoCompletion`].
    pub user_data: u64,
}
146
147impl IoOp {
148 pub fn read(fd: i32, offset: u64, len: usize, user_data: u64) -> Self {
150 Self {
151 op_type: IoOpType::Read,
152 fd,
153 buffer: vec![0u8; len],
154 offset,
155 len,
156 user_data,
157 }
158 }
159
160 pub fn write(fd: i32, data: Vec<u8>, offset: u64, user_data: u64) -> Self {
162 let len = data.len();
163 Self {
164 op_type: IoOpType::Write,
165 fd,
166 buffer: data,
167 offset,
168 len,
169 user_data,
170 }
171 }
172
173 pub fn fsync(fd: i32, user_data: u64) -> Self {
175 Self {
176 op_type: IoOpType::Fsync,
177 fd,
178 buffer: Vec::new(),
179 offset: 0,
180 len: 0,
181 user_data,
182 }
183 }
184}
185
/// Outcome of one [`IoOp`], matched to it through `user_data`.
#[derive(Debug)]
pub struct IoCompletion {
    /// Tag of the operation this completion belongs to.
    pub user_data: u64,
    /// On success, the operation's result (byte count, or 0 for non-transfer ops).
    /// On failure, an error code; the io_uring path stores the kernel's negative
    /// CQE result here. Check `success` rather than relying on the sign.
    pub result: i32,
    /// Whether the operation succeeded.
    pub success: bool,
}
196
197impl IoCompletion {
198 pub fn success(user_data: u64, result: i32) -> Self {
200 Self {
201 user_data,
202 result,
203 success: true,
204 }
205 }
206
207 pub fn failure(user_data: u64, error_code: i32) -> Self {
209 Self {
210 user_data,
211 result: error_code,
212 success: false,
213 }
214 }
215
216 pub fn bytes_transferred(&self) -> Option<usize> {
218 if self.success && self.result >= 0 {
219 Some(self.result as usize)
220 } else {
221 None
222 }
223 }
224}
225
/// Atomic counters describing backend activity, normally shared through an `Arc`.
///
/// All updates and reads in this file use `Ordering::Relaxed`, so the counters
/// are independent statistics, not a consistent snapshot.
#[derive(Debug, Default)]
pub struct IoUringStats {
    /// Sum of the `count` values passed to `record_submit`.
    pub ops_submitted: AtomicU64,
    /// Number of `record_completion` calls.
    pub ops_completed: AtomicU64,
    /// Bytes reported by completed `Read` operations.
    pub bytes_read: AtomicU64,
    /// Bytes reported by completed `Write` operations.
    pub bytes_written: AtomicU64,
    /// Number of `record_submit` calls; each one is counted as one submission syscall.
    pub syscalls: AtomicU64,
    /// Operations that rode along in a multi-op submission (`count - 1` per such call).
    pub ops_batched: AtomicU64,
}
242
243impl IoUringStats {
244 pub fn new() -> Arc<Self> {
246 Arc::new(Self::default())
247 }
248
249 pub fn record_submit(&self, count: u64) {
251 self.ops_submitted.fetch_add(count, Ordering::Relaxed);
252 self.syscalls.fetch_add(1, Ordering::Relaxed);
253 if count > 1 {
254 self.ops_batched.fetch_add(count - 1, Ordering::Relaxed);
255 }
256 }
257
258 pub fn record_completion(&self, op_type: IoOpType, bytes: u64) {
260 self.ops_completed.fetch_add(1, Ordering::Relaxed);
261 match op_type {
262 IoOpType::Read => {
263 self.bytes_read.fetch_add(bytes, Ordering::Relaxed);
264 }
265 IoOpType::Write => {
266 self.bytes_written.fetch_add(bytes, Ordering::Relaxed);
267 }
268 _ => {}
269 }
270 }
271
272 pub fn snapshot(&self) -> IoUringStatsSnapshot {
274 IoUringStatsSnapshot {
275 ops_submitted: self.ops_submitted.load(Ordering::Relaxed),
276 ops_completed: self.ops_completed.load(Ordering::Relaxed),
277 bytes_read: self.bytes_read.load(Ordering::Relaxed),
278 bytes_written: self.bytes_written.load(Ordering::Relaxed),
279 syscalls: self.syscalls.load(Ordering::Relaxed),
280 ops_batched: self.ops_batched.load(Ordering::Relaxed),
281 }
282 }
283}
284
/// Plain-value copy of [`IoUringStats`] taken by `IoUringStats::snapshot`.
#[derive(Debug, Clone)]
pub struct IoUringStatsSnapshot {
    /// Total operations submitted.
    pub ops_submitted: u64,
    /// Completions recorded.
    pub ops_completed: u64,
    /// Bytes read by completed reads.
    pub bytes_read: u64,
    /// Bytes written by completed writes.
    pub bytes_written: u64,
    /// Submission calls recorded (one per `record_submit`).
    pub syscalls: u64,
    /// Operations that shared a submission call with another operation.
    pub ops_batched: u64,
}
295
296impl IoUringStatsSnapshot {
297 pub fn batching_efficiency(&self) -> f64 {
299 if self.syscalls == 0 {
300 0.0
301 } else {
302 self.ops_submitted as f64 / self.syscalls as f64
303 }
304 }
305
306 pub fn bytes_per_syscall(&self) -> f64 {
308 if self.syscalls == 0 {
309 0.0
310 } else {
311 (self.bytes_read + self.bytes_written) as f64 / self.syscalls as f64
312 }
313 }
314}
315
316pub trait AsyncIoBackend: Send + Sync {
322 fn submit(&mut self, op: IoOp) -> io::Result<()>;
324
325 fn submit_batch(&mut self, ops: Vec<IoOp>) -> io::Result<()>;
327
328 fn wait_one(&mut self) -> io::Result<IoCompletion>;
330
331 fn wait_all(&mut self) -> io::Result<Vec<IoCompletion>>;
333
334 fn pending(&self) -> usize;
336
337 fn is_uring_available(&self) -> bool;
339}
340
341pub struct SyncIoBackend {
346 pending: parking_lot::Mutex<VecDeque<IoOp>>,
347 completions: parking_lot::Mutex<VecDeque<IoCompletion>>,
348 stats: Arc<IoUringStats>,
349}
350
351impl SyncIoBackend {
352 pub fn new(stats: Arc<IoUringStats>) -> Self {
354 Self {
355 pending: parking_lot::Mutex::new(VecDeque::new()),
356 completions: parking_lot::Mutex::new(VecDeque::new()),
357 stats,
358 }
359 }
360
361 fn execute(&self, mut op: IoOp) -> IoCompletion {
363 use std::os::unix::io::FromRawFd;
364
365 let result = unsafe {
366 let file = File::from_raw_fd(op.fd);
368 let res = match op.op_type {
369 IoOpType::Read => {
370 let mut file_ref = &file;
371 file_ref.seek(SeekFrom::Start(op.offset)).ok();
372 file_ref.read(&mut op.buffer)
373 }
374 IoOpType::Write => {
375 let mut file_ref = &file;
376 file_ref.seek(SeekFrom::Start(op.offset)).ok();
377 file_ref.write(&op.buffer)
378 }
379 IoOpType::Fsync => file.sync_all().map(|_| 0),
380 IoOpType::Fallocate | IoOpType::Close => Ok(0),
381 };
382 std::mem::forget(file);
384 res
385 };
386
387 match result {
388 Ok(n) => {
389 self.stats.record_completion(op.op_type, n as u64);
390 IoCompletion::success(op.user_data, n as i32)
391 }
392 Err(e) => IoCompletion::failure(op.user_data, e.raw_os_error().unwrap_or(-1)),
393 }
394 }
395}
396
397impl AsyncIoBackend for SyncIoBackend {
398 fn submit(&mut self, op: IoOp) -> io::Result<()> {
399 self.stats.record_submit(1);
400 let completion = self.execute(op);
401 self.completions.lock().push_back(completion);
402 Ok(())
403 }
404
405 fn submit_batch(&mut self, ops: Vec<IoOp>) -> io::Result<()> {
406 let count = ops.len() as u64;
407 self.stats.record_submit(count);
408
409 let completions: Vec<_> = ops.into_iter().map(|op| self.execute(op)).collect();
410 self.completions.lock().extend(completions);
411 Ok(())
412 }
413
414 fn wait_one(&mut self) -> io::Result<IoCompletion> {
415 self.completions
416 .lock()
417 .pop_front()
418 .ok_or_else(|| io::Error::new(io::ErrorKind::WouldBlock, "No completions"))
419 }
420
421 fn wait_all(&mut self) -> io::Result<Vec<IoCompletion>> {
422 Ok(self.completions.lock().drain(..).collect())
423 }
424
425 fn pending(&self) -> usize {
426 self.pending.lock().len()
427 }
428
429 fn is_uring_available(&self) -> bool {
430 false
431 }
432}
433
/// Linux backend built on io_uring.
///
/// When no ring could be created, it falls back to executing operations synchronously.
///
/// Keep `uring` declared before `pending`: struct fields are dropped in
/// declaration order, so the ring is torn down before any op buffers held in
/// `pending` are freed.
#[cfg(target_os = "linux")]
pub struct LinuxIoUringBackend {
    /// The ring, or `None` if setup failed (operations then run synchronously).
    uring: Option<IoUring>,
    /// Configuration the backend was created with.
    config: IoUringConfig,
    /// Operation queue whose length `pending()` reports.
    pending: parking_lot::Mutex<VecDeque<IoOp>>,
    /// Completions buffered until `wait_one` / `wait_all` hand them out.
    completions: parking_lot::Mutex<VecDeque<IoCompletion>>,
    /// Shared activity counters.
    stats: Arc<IoUringStats>,
    /// Whether `uring` was created successfully; reported by `is_uring_available()`.
    uring_available: bool,
}
448
449#[cfg(target_os = "linux")]
450impl LinuxIoUringBackend {
451 pub fn new(config: IoUringConfig, stats: Arc<IoUringStats>) -> io::Result<Self> {
453 let (uring, uring_available) = match IoUring::new(config.sq_entries) {
455 Ok(uring) => {
456 eprintln!(
457 "io_uring initialized successfully with {} entries",
458 config.sq_entries
459 );
460 (Some(uring), true)
461 }
462 Err(e) => {
463 eprintln!(
464 "io_uring initialization failed: {}. Falling back to sync I/O",
465 e
466 );
467 (None, false)
468 }
469 };
470
471 Ok(Self {
472 uring,
473 config,
474 pending: parking_lot::Mutex::new(VecDeque::new()),
475 completions: parking_lot::Mutex::new(VecDeque::new()),
476 stats,
477 uring_available,
478 })
479 }
480
481 fn check_uring_available() -> bool {
483 #[cfg(target_os = "linux")]
485 {
486 if let Ok(version) = std::fs::read_to_string("/proc/version") {
487 let parts: Vec<&str> = version.split_whitespace().collect();
489 if parts.len() >= 3 {
490 let version_parts: Vec<&str> = parts[2].split('.').collect();
491 if version_parts.len() >= 2
492 && let (Ok(major), Ok(minor)) = (
493 version_parts[0].parse::<u32>(),
494 version_parts[1].parse::<u32>(),
495 )
496 {
497 return major > 5 || (major == 5 && minor >= 1);
499 }
500 }
501 }
502 }
503 false
504 }
505
506 fn submit_to_uring(&mut self, op: IoOp) -> io::Result<()> {
508 if let Some(ref mut uring) = self.uring {
509 let mut sq = uring.submission();
510
511 let sqe = match op.op_type {
512 IoOpType::Read => opcode::Read::new(
513 types::Fd(op.fd),
514 op.buffer.as_ptr() as *mut u8,
515 op.len as u32,
516 )
517 .offset(op.offset)
518 .build()
519 .user_data(op.user_data),
520 IoOpType::Write => {
521 opcode::Write::new(types::Fd(op.fd), op.buffer.as_ptr(), op.len as u32)
522 .offset(op.offset)
523 .build()
524 .user_data(op.user_data)
525 }
526 IoOpType::Fsync => opcode::Fsync::new(types::Fd(op.fd))
527 .build()
528 .user_data(op.user_data),
529 _ => {
530 return Err(io::Error::new(
531 io::ErrorKind::Unsupported,
532 "Operation not supported",
533 ));
534 }
535 };
536
537 unsafe {
539 sq.push(&sqe).map_err(|_| {
540 io::Error::new(io::ErrorKind::Other, "Failed to push to submission queue")
541 })?;
542 }
543
544 sq.sync();
545 drop(sq);
546
547 uring.submit_and_wait(1)?;
549
550 let mut cq = uring.completion();
552 while let Some(cqe) = cq.next() {
553 let completion = if cqe.result() >= 0 {
554 self.stats
555 .record_completion(op.op_type, cqe.result() as u64);
556 IoCompletion::success(cqe.user_data(), cqe.result())
557 } else {
558 IoCompletion::failure(cqe.user_data(), cqe.result())
559 };
560 self.completions.lock().push_back(completion);
561 }
562
563 Ok(())
564 } else {
565 let completion = self.simulate_execute(op);
567 self.completions.lock().push_back(completion);
568 Ok(())
569 }
570 }
571
572 fn simulate_execute(&self, mut op: IoOp) -> IoCompletion {
574 use std::os::unix::io::FromRawFd;
575
576 let result = unsafe {
577 let file = File::from_raw_fd(op.fd);
578 let res = match op.op_type {
579 IoOpType::Read => {
580 let mut file_ref = &file;
581 file_ref.seek(SeekFrom::Start(op.offset)).ok();
582 file_ref.read(&mut op.buffer)
583 }
584 IoOpType::Write => {
585 let mut file_ref = &file;
586 file_ref.seek(SeekFrom::Start(op.offset)).ok();
587 file_ref.write(&op.buffer)
588 }
589 IoOpType::Fsync => file.sync_all().map(|_| 0),
590 IoOpType::Fallocate | IoOpType::Close => Ok(0),
591 };
592 std::mem::forget(file);
593 res
594 };
595
596 match result {
597 Ok(n) => {
598 self.stats.record_completion(op.op_type, n as u64);
599 IoCompletion::success(op.user_data, n as i32)
600 }
601 Err(e) => IoCompletion::failure(op.user_data, e.raw_os_error().unwrap_or(-1)),
602 }
603 }
604}
605
606#[cfg(target_os = "linux")]
607impl AsyncIoBackend for LinuxIoUringBackend {
608 fn submit(&mut self, op: IoOp) -> io::Result<()> {
609 self.stats.record_submit(1);
610 self.submit_to_uring(op)
611 }
612
613 fn submit_batch(&mut self, ops: Vec<IoOp>) -> io::Result<()> {
614 let count = ops.len() as u64;
615 self.stats.record_submit(count);
616
617 if let Some(ref mut uring) = self.uring {
618 let mut sq = uring.submission();
619
620 for op in ops {
622 let sqe = match op.op_type {
623 IoOpType::Read => opcode::Read::new(
624 types::Fd(op.fd),
625 op.buffer.as_ptr() as *mut u8,
626 op.len as u32,
627 )
628 .offset(op.offset)
629 .build()
630 .user_data(op.user_data),
631 IoOpType::Write => {
632 opcode::Write::new(types::Fd(op.fd), op.buffer.as_ptr(), op.len as u32)
633 .offset(op.offset)
634 .build()
635 .user_data(op.user_data)
636 }
637 IoOpType::Fsync => opcode::Fsync::new(types::Fd(op.fd))
638 .build()
639 .user_data(op.user_data),
640 _ => continue, };
642
643 unsafe {
645 if sq.push(&sqe).is_err() {
646 break; }
648 }
649 }
650
651 sq.sync();
652 drop(sq);
653
654 uring.submit()?;
656
657 Ok(())
658 } else {
659 let completions: Vec<_> = ops
661 .into_iter()
662 .map(|op| self.simulate_execute(op))
663 .collect();
664 self.completions.lock().extend(completions);
665 Ok(())
666 }
667 }
668
669 fn wait_one(&mut self) -> io::Result<IoCompletion> {
670 if let Some(completion) = self.completions.lock().pop_front() {
672 return Ok(completion);
673 }
674
675 if let Some(ref mut uring) = self.uring {
677 uring.submit_and_wait(1)?;
678 let mut cq = uring.completion();
679 if let Some(cqe) = cq.next() {
680 let completion = if cqe.result() >= 0 {
681 IoCompletion::success(cqe.user_data(), cqe.result())
682 } else {
683 IoCompletion::failure(cqe.user_data(), cqe.result())
684 };
685 return Ok(completion);
686 }
687 }
688
689 Err(io::Error::new(io::ErrorKind::WouldBlock, "No completions"))
690 }
691
692 fn wait_all(&mut self) -> io::Result<Vec<IoCompletion>> {
693 let mut all_completions = self.completions.lock().drain(..).collect::<Vec<_>>();
694
695 if let Some(ref mut uring) = self.uring {
697 let mut cq = uring.completion();
698 while let Some(cqe) = cq.next() {
699 let completion = if cqe.result() >= 0 {
700 IoCompletion::success(cqe.user_data(), cqe.result())
701 } else {
702 IoCompletion::failure(cqe.user_data(), cqe.result())
703 };
704 all_completions.push(completion);
705 }
706 }
707
708 Ok(all_completions)
709 }
710
711 fn pending(&self) -> usize {
712 self.pending.lock().len()
713 }
714
715 fn is_uring_available(&self) -> bool {
716 self.uring_available
717 }
718}
719
720pub fn create_backend(config: IoUringConfig, stats: Arc<IoUringStats>) -> Box<dyn AsyncIoBackend> {
722 #[cfg(target_os = "linux")]
723 {
724 match LinuxIoUringBackend::new(config, stats.clone()) {
725 Ok(backend) if backend.is_uring_available() => {
726 tracing::info!("Using Linux io_uring backend");
727 Box::new(backend)
728 }
729 _ => {
730 tracing::info!("Falling back to sync I/O backend");
731 Box::new(SyncIoBackend::new(stats))
732 }
733 }
734 }
735
736 #[cfg(not(target_os = "linux"))]
737 {
738 let _ = config; tracing::info!("Using sync I/O backend (io_uring not available on this platform)");
740 Box::new(SyncIoBackend::new(stats))
741 }
742}
743
744pub struct BatchedWriter {
746 backend: Box<dyn AsyncIoBackend>,
747 pending_ops: Vec<IoOp>,
748 batch_size: usize,
749 next_user_data: AtomicU64,
750}
751
752impl BatchedWriter {
753 pub fn new(backend: Box<dyn AsyncIoBackend>, batch_size: usize) -> Self {
755 Self {
756 backend,
757 pending_ops: Vec::with_capacity(batch_size),
758 batch_size,
759 next_user_data: AtomicU64::new(0),
760 }
761 }
762
763 pub fn write(&mut self, fd: i32, data: Vec<u8>, offset: u64) -> u64 {
765 let user_data = self.next_user_data.fetch_add(1, Ordering::Relaxed);
766 let op = IoOp::write(fd, data, offset, user_data);
767 self.pending_ops.push(op);
768
769 if self.pending_ops.len() >= self.batch_size {
770 self.flush().ok();
771 }
772
773 user_data
774 }
775
776 pub fn flush(&mut self) -> io::Result<Vec<IoCompletion>> {
778 if self.pending_ops.is_empty() {
779 return Ok(Vec::new());
780 }
781
782 let ops = std::mem::take(&mut self.pending_ops);
783 self.backend.submit_batch(ops)?;
784 self.backend.wait_all()
785 }
786
787 pub fn pending(&self) -> usize {
789 self.pending_ops.len()
790 }
791}
792
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_io_uring_config() {
        let default = IoUringConfig::default();
        assert_eq!(default.sq_entries, 256);
        assert!(!default.sq_poll);

        let high = IoUringConfig::high_throughput();
        assert_eq!(high.sq_entries, 1024);
        assert!(high.sq_poll);

        let low = IoUringConfig::low_latency();
        assert_eq!(low.sq_entries, 64);
        assert!(low.sq_poll);
    }

    #[test]
    fn test_io_op_creation() {
        let read_op = IoOp::read(5, 1024, 512, 42);
        assert_eq!(read_op.op_type, IoOpType::Read);
        assert_eq!(read_op.fd, 5);
        assert_eq!(read_op.offset, 1024);
        assert_eq!(read_op.len, 512);
        assert_eq!(read_op.user_data, 42);

        let write_op = IoOp::write(6, vec![1, 2, 3], 2048, 99);
        assert_eq!(write_op.op_type, IoOpType::Write);
        assert_eq!(write_op.buffer, vec![1, 2, 3]);

        let fsync_op = IoOp::fsync(7, 100);
        assert_eq!(fsync_op.op_type, IoOpType::Fsync);
    }

    #[test]
    fn test_io_completion() {
        let success = IoCompletion::success(42, 1024);
        assert!(success.success);
        assert_eq!(success.bytes_transferred(), Some(1024));

        let failure = IoCompletion::failure(42, -5);
        assert!(!failure.success);
        assert_eq!(failure.bytes_transferred(), None);
    }

    #[test]
    fn test_io_uring_stats() {
        let stats = IoUringStats::new();

        stats.record_submit(5);
        stats.record_completion(IoOpType::Read, 1024);
        stats.record_completion(IoOpType::Write, 512);

        let snapshot = stats.snapshot();
        assert_eq!(snapshot.ops_submitted, 5);
        assert_eq!(snapshot.ops_completed, 2);
        assert_eq!(snapshot.bytes_read, 1024);
        assert_eq!(snapshot.bytes_written, 512);
        assert_eq!(snapshot.syscalls, 1);
        assert_eq!(snapshot.ops_batched, 4);
    }

    #[test]
    fn test_stats_efficiency() {
        let stats = IoUringStats::new();

        stats.record_submit(5);
        stats.record_submit(5);

        for _ in 0..10 {
            stats.record_completion(IoOpType::Write, 100);
        }

        let snapshot = stats.snapshot();
        assert!((snapshot.batching_efficiency() - 5.0).abs() < 0.01);
        assert!((snapshot.bytes_per_syscall() - 500.0).abs() < 0.01);
    }

    /// Drives real write / fsync / read operations through the sync backend.
    #[test]
    fn test_sync_backend() {
        use std::os::unix::io::AsRawFd;
        use tempfile::NamedTempFile;

        let stats = IoUringStats::new();
        let mut backend = SyncIoBackend::new(stats.clone());

        assert!(!backend.is_uring_available());
        assert_eq!(backend.pending(), 0);

        let mut temp = NamedTempFile::new().unwrap();
        temp.write_all(b"hello world").unwrap();
        temp.flush().unwrap();

        // Nothing has gone through the backend yet.
        assert_eq!(stats.snapshot().ops_submitted, 0);

        let fd = temp.as_file().as_raw_fd();
        backend.submit(IoOp::write(fd, b"HELLO".to_vec(), 0, 1)).unwrap();
        backend.submit(IoOp::fsync(fd, 2)).unwrap();
        backend.submit(IoOp::read(fd, 0, 11, 3)).unwrap();

        let completions = backend.wait_all().unwrap();
        let tags: Vec<u64> = completions.iter().map(|c| c.user_data).collect();
        assert_eq!(tags, vec![1, 2, 3]);
        assert_eq!(completions[0].bytes_transferred(), Some(5));
        assert!(completions[1].success);
        assert_eq!(completions[2].bytes_transferred(), Some(11));

        // Everything was drained by `wait_all`.
        assert!(backend.wait_one().is_err());
        assert_eq!(std::fs::read(temp.path()).unwrap(), b"HELLO world");

        let snapshot = stats.snapshot();
        assert_eq!(snapshot.ops_submitted, 3);
        assert_eq!(snapshot.ops_completed, 3);
        assert_eq!(snapshot.bytes_written, 5);
        assert_eq!(snapshot.bytes_read, 11);
    }

    #[test]
    fn test_create_backend() {
        let stats = IoUringStats::new();
        let config = IoUringConfig::default();
        let backend = create_backend(config, stats);

        #[cfg(not(target_os = "linux"))]
        assert!(!backend.is_uring_available());

        assert_eq!(backend.pending(), 0);
    }

    /// Queued writes are only submitted on `flush`, which returns one completion per write.
    #[test]
    fn test_batched_writer() {
        use std::os::unix::io::AsRawFd;
        use tempfile::NamedTempFile;

        let stats = IoUringStats::new();
        let backend = Box::new(SyncIoBackend::new(stats));
        let mut writer = BatchedWriter::new(backend, 10);

        assert_eq!(writer.pending(), 0);

        let temp = NamedTempFile::new().unwrap();
        let fd = temp.as_file().as_raw_fd();
        let first = writer.write(fd, b"abc".to_vec(), 0);
        let second = writer.write(fd, b"def".to_vec(), 3);
        assert_ne!(first, second);
        assert_eq!(writer.pending(), 2);

        let completions = writer.flush().unwrap();
        assert_eq!(writer.pending(), 0);
        assert_eq!(completions.len(), 2);
        assert!(completions.iter().all(|c| c.bytes_transferred() == Some(3)));
        assert_eq!(std::fs::read(temp.path()).unwrap(), b"abcdef");

        // A flush with nothing queued returns no completions.
        assert!(writer.flush().unwrap().is_empty());
    }

    #[cfg(target_os = "linux")]
    #[test]
    fn test_linux_uring_check() {
        let stats = IoUringStats::new();
        let config = IoUringConfig::default();
        let backend = LinuxIoUringBackend::new(config, stats).unwrap();

        println!("io_uring available: {}", backend.is_uring_available());
    }
}