1use std::collections::VecDeque;
56use std::fs::File;
57use std::io::{self, Read, Seek, SeekFrom, Write};
58use std::sync::Arc;
59use std::sync::atomic::{AtomicU64, Ordering};
60
61#[cfg(target_os = "linux")]
62use io_uring::{IoUring, opcode, types};
63
/// Tuning knobs used when creating an io_uring instance.
///
/// NOTE(review): `LinuxIoUringBackend::new` currently reads only `sq_entries`; the other
/// fields are carried along but not applied to the ring.
#[derive(Debug, Clone)]
pub struct IoUringConfig {
    /// Number of submission-queue entries requested for the ring.
    pub sq_entries: u32,
    /// Requested completion-queue size.
    pub cq_entries: u32,
    /// Whether kernel-side submission-queue polling is requested.
    pub sq_poll: bool,
    /// Idle time, in milliseconds, for the submission-queue polling thread.
    pub sq_poll_idle_ms: u32,
    /// Whether I/O buffers should be registered with the kernel up front.
    pub use_registered_buffers: bool,
    /// Upper bound on the number of registered buffers.
    pub max_registered_buffers: usize,
}

impl Default for IoUringConfig {
    /// Balanced settings: 256 SQ / 512 CQ entries, no SQ polling, registered buffers enabled.
    fn default() -> Self {
        Self {
            sq_entries: 256,
            cq_entries: 512,
            sq_poll: false,
            sq_poll_idle_ms: 1000,
            use_registered_buffers: true,
            max_registered_buffers: 64,
        }
    }
}

impl IoUringConfig {
    /// Deep queues with SQ polling and a generous buffer pool, for throughput-bound work.
    pub fn high_throughput() -> Self {
        Self {
            sq_entries: 1024,
            cq_entries: 2048,
            sq_poll: true,
            sq_poll_idle_ms: 2000,
            max_registered_buffers: 256,
            // Keeps registered buffers enabled, as in the default profile.
            ..Self::default()
        }
    }

    /// Shallow queues with a short SQ-polling idle time, for latency-sensitive work.
    pub fn low_latency() -> Self {
        Self {
            sq_entries: 64,
            cq_entries: 128,
            sq_poll: true,
            sq_poll_idle_ms: 100,
            max_registered_buffers: 32,
            // Keeps registered buffers enabled, as in the default profile.
            ..Self::default()
        }
    }
}
119
/// Kind of I/O operation carried by an [`IoOp`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IoOpType {
    /// Read from `offset` into the op's buffer.
    Read,
    /// Write the op's buffer at `offset`.
    Write,
    /// Flush the file to stable storage.
    Fsync,
    /// Space preallocation; not performed by any backend in this module.
    Fallocate,
    /// Descriptor close; not performed by any backend in this module.
    Close,
}

/// One I/O request together with the buffer it reads into or writes from.
#[derive(Debug)]
pub struct IoOp {
    /// Which operation to perform.
    pub op_type: IoOpType,
    /// Target file descriptor; the backends here borrow it and do not close it.
    pub fd: i32,
    /// Destination for reads, source for writes, empty for fsync.
    pub buffer: Vec<u8>,
    /// Byte offset within the file (0 for fsync).
    pub offset: u64,
    /// Number of bytes to transfer; the constructors set it to `buffer.len()`.
    pub len: usize,
    /// Caller-chosen tag echoed back in the matching completion.
    pub user_data: u64,
}

impl IoOp {
    /// Common constructor: `len` always mirrors the length of `buffer`.
    fn from_parts(
        op_type: IoOpType,
        fd: i32,
        buffer: Vec<u8>,
        offset: u64,
        user_data: u64,
    ) -> Self {
        let len = buffer.len();
        Self {
            op_type,
            fd,
            buffer,
            offset,
            len,
            user_data,
        }
    }

    /// Builds a read of `len` bytes at `offset` into a freshly zeroed buffer.
    pub fn read(fd: i32, offset: u64, len: usize, user_data: u64) -> Self {
        Self::from_parts(IoOpType::Read, fd, vec![0u8; len], offset, user_data)
    }

    /// Builds a write of all of `data` at `offset`.
    pub fn write(fd: i32, data: Vec<u8>, offset: u64, user_data: u64) -> Self {
        Self::from_parts(IoOpType::Write, fd, data, offset, user_data)
    }

    /// Builds an fsync of `fd`; it carries no buffer and a zero offset.
    pub fn fsync(fd: i32, user_data: u64) -> Self {
        Self::from_parts(IoOpType::Fsync, fd, Vec::new(), 0, user_data)
    }
}
185
/// Outcome of one submitted operation, tagged with that operation's `user_data`.
#[derive(Debug)]
pub struct IoCompletion {
    /// Tag copied from the originating operation, used to match completions to requests.
    pub user_data: u64,
    /// On success, the operation's return value (the byte count for reads and writes).
    /// On failure, an error code.
    ///
    /// NOTE(review): the synchronous paths store a positive errno (or `-1`). The io_uring
    /// path stores the kernel's negative errno. Confirm which convention callers expect.
    pub result: i32,
    /// Whether the operation succeeded.
    pub success: bool,
}

impl IoCompletion {
    /// Creates a successful completion carrying `result`.
    pub fn success(user_data: u64, result: i32) -> Self {
        Self {
            success: true,
            user_data,
            result,
        }
    }

    /// Creates a failed completion carrying `error_code` in `result`.
    pub fn failure(user_data: u64, error_code: i32) -> Self {
        Self {
            success: false,
            user_data,
            result: error_code,
        }
    }

    /// Returns the byte count for a successful, non-negative result, and `None` otherwise.
    pub fn bytes_transferred(&self) -> Option<usize> {
        // A negative `result` fails the conversion; a failed op is filtered out.
        usize::try_from(self.result).ok().filter(|_| self.success)
    }
}
225
226#[derive(Debug, Default)]
228pub struct IoUringStats {
229 pub ops_submitted: AtomicU64,
231 pub ops_completed: AtomicU64,
233 pub bytes_read: AtomicU64,
235 pub bytes_written: AtomicU64,
237 pub syscalls: AtomicU64,
239 pub ops_batched: AtomicU64,
241}
242
243impl IoUringStats {
244 pub fn new() -> Arc<Self> {
246 Arc::new(Self::default())
247 }
248
249 pub fn record_submit(&self, count: u64) {
251 self.ops_submitted.fetch_add(count, Ordering::Relaxed);
252 self.syscalls.fetch_add(1, Ordering::Relaxed);
253 if count > 1 {
254 self.ops_batched.fetch_add(count - 1, Ordering::Relaxed);
255 }
256 }
257
258 pub fn record_completion(&self, op_type: IoOpType, bytes: u64) {
260 self.ops_completed.fetch_add(1, Ordering::Relaxed);
261 match op_type {
262 IoOpType::Read => {
263 self.bytes_read.fetch_add(bytes, Ordering::Relaxed);
264 }
265 IoOpType::Write => {
266 self.bytes_written.fetch_add(bytes, Ordering::Relaxed);
267 }
268 _ => {}
269 }
270 }
271
272 pub fn snapshot(&self) -> IoUringStatsSnapshot {
274 IoUringStatsSnapshot {
275 ops_submitted: self.ops_submitted.load(Ordering::Relaxed),
276 ops_completed: self.ops_completed.load(Ordering::Relaxed),
277 bytes_read: self.bytes_read.load(Ordering::Relaxed),
278 bytes_written: self.bytes_written.load(Ordering::Relaxed),
279 syscalls: self.syscalls.load(Ordering::Relaxed),
280 ops_batched: self.ops_batched.load(Ordering::Relaxed),
281 }
282 }
283}
284
/// Plain-value copy of the I/O counters, taken at one point in time.
#[derive(Debug, Clone)]
pub struct IoUringStatsSnapshot {
    /// Operations submitted.
    pub ops_submitted: u64,
    /// Completions recorded.
    pub ops_completed: u64,
    /// Bytes read by completed reads.
    pub bytes_read: u64,
    /// Bytes written by completed writes.
    pub bytes_written: u64,
    /// Submission calls recorded.
    pub syscalls: u64,
    /// Operations that shared a submission call with another.
    pub ops_batched: u64,
}

impl IoUringStatsSnapshot {
    /// Average operations per submission call, or `0.0` when no calls were recorded.
    pub fn batching_efficiency(&self) -> f64 {
        match self.syscalls {
            0 => 0.0,
            calls => self.ops_submitted as f64 / calls as f64,
        }
    }

    /// Average bytes moved (read plus written) per submission call, or `0.0` when no
    /// calls were recorded.
    pub fn bytes_per_syscall(&self) -> f64 {
        match self.syscalls {
            0 => 0.0,
            calls => {
                let total_bytes = self.bytes_read + self.bytes_written;
                total_bytes as f64 / calls as f64
            }
        }
    }
}
315
316pub trait AsyncIoBackend: Send + Sync {
322 fn submit(&mut self, op: IoOp) -> io::Result<()>;
324
325 fn submit_batch(&mut self, ops: Vec<IoOp>) -> io::Result<()>;
327
328 fn wait_one(&mut self) -> io::Result<IoCompletion>;
330
331 fn wait_all(&mut self) -> io::Result<Vec<IoCompletion>>;
333
334 fn pending(&self) -> usize;
336
337 fn is_uring_available(&self) -> bool;
339}
340
341pub struct SyncIoBackend {
346 pending: parking_lot::Mutex<VecDeque<IoOp>>,
347 completions: parking_lot::Mutex<VecDeque<IoCompletion>>,
348 stats: Arc<IoUringStats>,
349}
350
351impl SyncIoBackend {
352 pub fn new(stats: Arc<IoUringStats>) -> Self {
354 Self {
355 pending: parking_lot::Mutex::new(VecDeque::new()),
356 completions: parking_lot::Mutex::new(VecDeque::new()),
357 stats,
358 }
359 }
360
361 fn execute(&self, mut op: IoOp) -> IoCompletion {
363 use std::os::unix::io::FromRawFd;
364
365 let result = unsafe {
366 let file = File::from_raw_fd(op.fd);
368 let res = match op.op_type {
369 IoOpType::Read => {
370 let mut file_ref = &file;
371 file_ref.seek(SeekFrom::Start(op.offset)).ok();
372 file_ref.read(&mut op.buffer)
373 }
374 IoOpType::Write => {
375 let mut file_ref = &file;
376 file_ref.seek(SeekFrom::Start(op.offset)).ok();
377 file_ref.write(&op.buffer)
378 }
379 IoOpType::Fsync => file.sync_all().map(|_| 0),
380 IoOpType::Fallocate | IoOpType::Close => Ok(0),
381 };
382 std::mem::forget(file);
384 res
385 };
386
387 match result {
388 Ok(n) => {
389 self.stats.record_completion(op.op_type, n as u64);
390 IoCompletion::success(op.user_data, n as i32)
391 }
392 Err(e) => IoCompletion::failure(op.user_data, e.raw_os_error().unwrap_or(-1)),
393 }
394 }
395}
396
397impl AsyncIoBackend for SyncIoBackend {
398 fn submit(&mut self, op: IoOp) -> io::Result<()> {
399 self.stats.record_submit(1);
400 let completion = self.execute(op);
401 self.completions.lock().push_back(completion);
402 Ok(())
403 }
404
405 fn submit_batch(&mut self, ops: Vec<IoOp>) -> io::Result<()> {
406 let count = ops.len() as u64;
407 self.stats.record_submit(count);
408
409 let completions: Vec<_> = ops.into_iter().map(|op| self.execute(op)).collect();
410 self.completions.lock().extend(completions);
411 Ok(())
412 }
413
414 fn wait_one(&mut self) -> io::Result<IoCompletion> {
415 self.completions
416 .lock()
417 .pop_front()
418 .ok_or_else(|| io::Error::new(io::ErrorKind::WouldBlock, "No completions"))
419 }
420
421 fn wait_all(&mut self) -> io::Result<Vec<IoCompletion>> {
422 Ok(self.completions.lock().drain(..).collect())
423 }
424
425 fn pending(&self) -> usize {
426 self.pending.lock().len()
427 }
428
429 fn is_uring_available(&self) -> bool {
430 false
431 }
432}
433
434#[cfg(target_os = "linux")]
439pub struct LinuxIoUringBackend {
440 uring: Option<IoUring>,
441 config: IoUringConfig,
442 pending: parking_lot::Mutex<VecDeque<IoOp>>,
443 completions: parking_lot::Mutex<VecDeque<IoCompletion>>,
444 stats: Arc<IoUringStats>,
445 uring_available: bool,
447}
448
449#[cfg(target_os = "linux")]
450impl LinuxIoUringBackend {
451 pub fn new(config: IoUringConfig, stats: Arc<IoUringStats>) -> io::Result<Self> {
453 let (uring, uring_available) = match IoUring::new(config.sq_entries) {
455 Ok(uring) => {
456 eprintln!("io_uring initialized successfully with {} entries", config.sq_entries);
457 (Some(uring), true)
458 },
459 Err(e) => {
460 eprintln!("io_uring initialization failed: {}. Falling back to sync I/O", e);
461 (None, false)
462 }
463 };
464
465 Ok(Self {
466 uring,
467 config,
468 pending: parking_lot::Mutex::new(VecDeque::new()),
469 completions: parking_lot::Mutex::new(VecDeque::new()),
470 stats,
471 uring_available,
472 })
473 }
474
475 fn check_uring_available() -> bool {
477 #[cfg(target_os = "linux")]
479 {
480 if let Ok(version) = std::fs::read_to_string("/proc/version") {
481 let parts: Vec<&str> = version.split_whitespace().collect();
483 if parts.len() >= 3 {
484 let version_parts: Vec<&str> = parts[2].split('.').collect();
485 if version_parts.len() >= 2
486 && let (Ok(major), Ok(minor)) = (
487 version_parts[0].parse::<u32>(),
488 version_parts[1].parse::<u32>(),
489 )
490 {
491 return major > 5 || (major == 5 && minor >= 1);
493 }
494 }
495 }
496 }
497 false
498 }
499
500 fn submit_to_uring(&mut self, op: IoOp) -> io::Result<()> {
502 if let Some(ref mut uring) = self.uring {
503 let mut sq = uring.submission();
504
505 let sqe = match op.op_type {
506 IoOpType::Read => {
507 opcode::Read::new(types::Fd(op.fd), op.buffer.as_ptr() as *mut u8, op.len as u32)
508 .offset(op.offset)
509 .build()
510 .user_data(op.user_data)
511 }
512 IoOpType::Write => {
513 opcode::Write::new(types::Fd(op.fd), op.buffer.as_ptr(), op.len as u32)
514 .offset(op.offset)
515 .build()
516 .user_data(op.user_data)
517 }
518 IoOpType::Fsync => {
519 opcode::Fsync::new(types::Fd(op.fd))
520 .build()
521 .user_data(op.user_data)
522 }
523 _ => return Err(io::Error::new(io::ErrorKind::Unsupported, "Operation not supported")),
524 };
525
526 unsafe {
528 sq.push(&sqe).map_err(|_| io::Error::new(io::ErrorKind::Other, "Failed to push to submission queue"))?;
529 }
530
531 sq.sync();
532 drop(sq);
533
534 uring.submit_and_wait(1)?;
536
537 let mut cq = uring.completion();
539 while let Some(cqe) = cq.next() {
540 let completion = if cqe.result() >= 0 {
541 self.stats.record_completion(op.op_type, cqe.result() as u64);
542 IoCompletion::success(cqe.user_data(), cqe.result())
543 } else {
544 IoCompletion::failure(cqe.user_data(), cqe.result())
545 };
546 self.completions.lock().push_back(completion);
547 }
548
549 Ok(())
550 } else {
551 let completion = self.simulate_execute(op);
553 self.completions.lock().push_back(completion);
554 Ok(())
555 }
556 }
557
558 fn simulate_execute(&self, mut op: IoOp) -> IoCompletion {
560 use std::os::unix::io::FromRawFd;
561
562 let result = unsafe {
563 let file = File::from_raw_fd(op.fd);
564 let res = match op.op_type {
565 IoOpType::Read => {
566 let mut file_ref = &file;
567 file_ref.seek(SeekFrom::Start(op.offset)).ok();
568 file_ref.read(&mut op.buffer)
569 }
570 IoOpType::Write => {
571 let mut file_ref = &file;
572 file_ref.seek(SeekFrom::Start(op.offset)).ok();
573 file_ref.write(&op.buffer)
574 }
575 IoOpType::Fsync => file.sync_all().map(|_| 0),
576 IoOpType::Fallocate | IoOpType::Close => Ok(0),
577 };
578 std::mem::forget(file);
579 res
580 };
581
582 match result {
583 Ok(n) => {
584 self.stats.record_completion(op.op_type, n as u64);
585 IoCompletion::success(op.user_data, n as i32)
586 }
587 Err(e) => IoCompletion::failure(op.user_data, e.raw_os_error().unwrap_or(-1)),
588 }
589 }
590}
591
592#[cfg(target_os = "linux")]
593impl AsyncIoBackend for LinuxIoUringBackend {
594 fn submit(&mut self, op: IoOp) -> io::Result<()> {
595 self.stats.record_submit(1);
596 self.submit_to_uring(op)
597 }
598
599 fn submit_batch(&mut self, ops: Vec<IoOp>) -> io::Result<()> {
600 let count = ops.len() as u64;
601 self.stats.record_submit(count);
602
603 if let Some(ref mut uring) = self.uring {
604 let mut sq = uring.submission();
605
606 for op in ops {
608 let sqe = match op.op_type {
609 IoOpType::Read => {
610 opcode::Read::new(types::Fd(op.fd), op.buffer.as_ptr() as *mut u8, op.len as u32)
611 .offset(op.offset)
612 .build()
613 .user_data(op.user_data)
614 }
615 IoOpType::Write => {
616 opcode::Write::new(types::Fd(op.fd), op.buffer.as_ptr(), op.len as u32)
617 .offset(op.offset)
618 .build()
619 .user_data(op.user_data)
620 }
621 IoOpType::Fsync => {
622 opcode::Fsync::new(types::Fd(op.fd))
623 .build()
624 .user_data(op.user_data)
625 }
626 _ => continue, };
628
629 unsafe {
631 if sq.push(&sqe).is_err() {
632 break; }
634 }
635 }
636
637 sq.sync();
638 drop(sq);
639
640 uring.submit()?;
642
643 Ok(())
644 } else {
645 let completions: Vec<_> = ops
647 .into_iter()
648 .map(|op| self.simulate_execute(op))
649 .collect();
650 self.completions.lock().extend(completions);
651 Ok(())
652 }
653 }
654
655 fn wait_one(&mut self) -> io::Result<IoCompletion> {
656 if let Some(completion) = self.completions.lock().pop_front() {
658 return Ok(completion);
659 }
660
661 if let Some(ref mut uring) = self.uring {
663 uring.submit_and_wait(1)?;
664 let mut cq = uring.completion();
665 if let Some(cqe) = cq.next() {
666 let completion = if cqe.result() >= 0 {
667 IoCompletion::success(cqe.user_data(), cqe.result())
668 } else {
669 IoCompletion::failure(cqe.user_data(), cqe.result())
670 };
671 return Ok(completion);
672 }
673 }
674
675 Err(io::Error::new(io::ErrorKind::WouldBlock, "No completions"))
676 }
677
678 fn wait_all(&mut self) -> io::Result<Vec<IoCompletion>> {
679 let mut all_completions = self.completions.lock().drain(..).collect::<Vec<_>>();
680
681 if let Some(ref mut uring) = self.uring {
683 let mut cq = uring.completion();
684 while let Some(cqe) = cq.next() {
685 let completion = if cqe.result() >= 0 {
686 IoCompletion::success(cqe.user_data(), cqe.result())
687 } else {
688 IoCompletion::failure(cqe.user_data(), cqe.result())
689 };
690 all_completions.push(completion);
691 }
692 }
693
694 Ok(all_completions)
695 }
696
697 fn pending(&self) -> usize {
698 self.pending.lock().len()
699 }
700
701 fn is_uring_available(&self) -> bool {
702 self.uring_available
703 }
704}
705
706pub fn create_backend(config: IoUringConfig, stats: Arc<IoUringStats>) -> Box<dyn AsyncIoBackend> {
708 #[cfg(target_os = "linux")]
709 {
710 match LinuxIoUringBackend::new(config, stats.clone()) {
711 Ok(backend) if backend.is_uring_available() => {
712 tracing::info!("Using Linux io_uring backend");
713 Box::new(backend)
714 }
715 _ => {
716 tracing::info!("Falling back to sync I/O backend");
717 Box::new(SyncIoBackend::new(stats))
718 }
719 }
720 }
721
722 #[cfg(not(target_os = "linux"))]
723 {
724 let _ = config; tracing::info!("Using sync I/O backend (io_uring not available on this platform)");
726 Box::new(SyncIoBackend::new(stats))
727 }
728}
729
730pub struct BatchedWriter {
732 backend: Box<dyn AsyncIoBackend>,
733 pending_ops: Vec<IoOp>,
734 batch_size: usize,
735 next_user_data: AtomicU64,
736}
737
738impl BatchedWriter {
739 pub fn new(backend: Box<dyn AsyncIoBackend>, batch_size: usize) -> Self {
741 Self {
742 backend,
743 pending_ops: Vec::with_capacity(batch_size),
744 batch_size,
745 next_user_data: AtomicU64::new(0),
746 }
747 }
748
749 pub fn write(&mut self, fd: i32, data: Vec<u8>, offset: u64) -> u64 {
751 let user_data = self.next_user_data.fetch_add(1, Ordering::Relaxed);
752 let op = IoOp::write(fd, data, offset, user_data);
753 self.pending_ops.push(op);
754
755 if self.pending_ops.len() >= self.batch_size {
756 self.flush().ok();
757 }
758
759 user_data
760 }
761
762 pub fn flush(&mut self) -> io::Result<Vec<IoCompletion>> {
764 if self.pending_ops.is_empty() {
765 return Ok(Vec::new());
766 }
767
768 let ops = std::mem::take(&mut self.pending_ops);
769 self.backend.submit_batch(ops)?;
770 self.backend.wait_all()
771 }
772
773 pub fn pending(&self) -> usize {
775 self.pending_ops.len()
776 }
777}
778
#[cfg(test)]
mod tests {
    use super::*;
    use std::os::unix::io::AsRawFd;
    use tempfile::NamedTempFile;

    #[test]
    fn test_io_uring_config() {
        let default = IoUringConfig::default();
        assert_eq!(default.sq_entries, 256);
        assert!(!default.sq_poll);

        let high = IoUringConfig::high_throughput();
        assert_eq!(high.sq_entries, 1024);
        assert!(high.sq_poll);

        let low = IoUringConfig::low_latency();
        assert_eq!(low.sq_entries, 64);
        assert!(low.sq_poll);
    }

    #[test]
    fn test_io_op_creation() {
        let read_op = IoOp::read(5, 1024, 512, 42);
        assert_eq!(read_op.op_type, IoOpType::Read);
        assert_eq!(read_op.fd, 5);
        assert_eq!(read_op.offset, 1024);
        assert_eq!(read_op.len, 512);
        assert_eq!(read_op.user_data, 42);

        let write_op = IoOp::write(6, vec![1, 2, 3], 2048, 99);
        assert_eq!(write_op.op_type, IoOpType::Write);
        assert_eq!(write_op.buffer, vec![1, 2, 3]);

        let fsync_op = IoOp::fsync(7, 100);
        assert_eq!(fsync_op.op_type, IoOpType::Fsync);
    }

    #[test]
    fn test_io_completion() {
        let success = IoCompletion::success(42, 1024);
        assert!(success.success);
        assert_eq!(success.bytes_transferred(), Some(1024));

        let failure = IoCompletion::failure(42, -5);
        assert!(!failure.success);
        assert_eq!(failure.bytes_transferred(), None);
    }

    #[test]
    fn test_io_uring_stats() {
        let stats = IoUringStats::new();

        stats.record_submit(5);
        stats.record_completion(IoOpType::Read, 1024);
        stats.record_completion(IoOpType::Write, 512);

        let snapshot = stats.snapshot();
        assert_eq!(snapshot.ops_submitted, 5);
        assert_eq!(snapshot.ops_completed, 2);
        assert_eq!(snapshot.bytes_read, 1024);
        assert_eq!(snapshot.bytes_written, 512);
        assert_eq!(snapshot.syscalls, 1);
        assert_eq!(snapshot.ops_batched, 4);
    }

    #[test]
    fn test_stats_efficiency() {
        let stats = IoUringStats::new();

        // Two submissions of five ops each.
        stats.record_submit(5);
        stats.record_submit(5);

        for _ in 0..10 {
            stats.record_completion(IoOpType::Write, 100);
        }

        let snapshot = stats.snapshot();
        assert!((snapshot.batching_efficiency() - 5.0).abs() < 0.01);
        assert!((snapshot.bytes_per_syscall() - 500.0).abs() < 0.01);
    }

    /// Exercises a real write and read through the synchronous backend.
    #[test]
    fn test_sync_backend() {
        let stats = IoUringStats::new();
        let mut backend = SyncIoBackend::new(stats.clone());

        assert!(!backend.is_uring_available());
        assert_eq!(backend.pending(), 0);
        assert_eq!(stats.snapshot().ops_submitted, 0);

        let temp = NamedTempFile::new().unwrap();
        let fd = temp.as_file().as_raw_fd();

        backend
            .submit(IoOp::write(fd, b"hello world".to_vec(), 0, 7))
            .unwrap();
        let written = backend.wait_one().unwrap();
        assert_eq!(written.user_data, 7);
        assert_eq!(written.bytes_transferred(), Some(11));
        assert_eq!(std::fs::read(temp.path()).unwrap(), b"hello world");

        backend.submit(IoOp::read(fd, 6, 5, 8)).unwrap();
        let read = backend.wait_one().unwrap();
        assert_eq!(read.user_data, 8);
        assert_eq!(read.bytes_transferred(), Some(5));

        // Every completion has been handed out.
        assert!(backend.wait_one().is_err());

        let snapshot = stats.snapshot();
        assert_eq!(snapshot.ops_submitted, 2);
        assert_eq!(snapshot.bytes_written, 11);
        assert_eq!(snapshot.bytes_read, 5);
    }

    /// A batch completes in order and counts as a single submission.
    #[test]
    fn test_sync_backend_batch() {
        let stats = IoUringStats::new();
        let mut backend = SyncIoBackend::new(stats.clone());
        let temp = NamedTempFile::new().unwrap();
        let fd = temp.as_file().as_raw_fd();

        backend
            .submit_batch(vec![
                IoOp::write(fd, b"ab".to_vec(), 0, 1),
                IoOp::write(fd, b"cd".to_vec(), 2, 2),
                IoOp::fsync(fd, 3),
            ])
            .unwrap();

        let completions = backend.wait_all().unwrap();
        let ids: Vec<u64> = completions.iter().map(|c| c.user_data).collect();
        assert_eq!(ids, vec![1, 2, 3]);
        assert!(completions.iter().all(|c| c.success));
        assert_eq!(std::fs::read(temp.path()).unwrap(), b"abcd");

        let snapshot = stats.snapshot();
        assert_eq!(snapshot.syscalls, 1);
        assert_eq!(snapshot.ops_batched, 2);
    }

    #[test]
    fn test_create_backend() {
        let stats = IoUringStats::new();
        let config = IoUringConfig::default();
        let backend = create_backend(config, stats);

        #[cfg(not(target_os = "linux"))]
        assert!(!backend.is_uring_available());

        assert_eq!(backend.pending(), 0);
    }

    /// Writes are queued until `flush`, which submits them and returns their completions.
    #[test]
    fn test_batched_writer() {
        let stats = IoUringStats::new();
        let backend = Box::new(SyncIoBackend::new(stats));
        let mut writer = BatchedWriter::new(backend, 10);
        assert_eq!(writer.pending(), 0);

        let temp = NamedTempFile::new().unwrap();
        let fd = temp.as_file().as_raw_fd();

        let first = writer.write(fd, b"abc".to_vec(), 0);
        let second = writer.write(fd, b"def".to_vec(), 3);
        assert_eq!((first, second), (0, 1));
        assert_eq!(writer.pending(), 2);

        let completions = writer.flush().unwrap();
        assert_eq!(completions.len(), 2);
        assert!(completions.iter().all(|c| c.success));
        assert_eq!(writer.pending(), 0);
        assert_eq!(std::fs::read(temp.path()).unwrap(), b"abcdef");
    }

    /// Smoke test for the Linux backend; works whether or not io_uring is usable here.
    #[cfg(target_os = "linux")]
    #[test]
    fn test_linux_uring_check() {
        let stats = IoUringStats::new();
        let config = IoUringConfig::default();
        let mut backend = LinuxIoUringBackend::new(config, stats).unwrap();

        println!("io_uring available: {}", backend.is_uring_available());

        let temp = NamedTempFile::new().unwrap();
        let fd = temp.as_file().as_raw_fd();
        backend
            .submit(IoOp::write(fd, b"uring".to_vec(), 0, 1))
            .unwrap();

        let completions = backend.wait_all().unwrap();
        assert_eq!(completions.len(), 1);
        assert_eq!(completions[0].user_data, 1);
        assert_eq!(backend.pending(), 0);
        // Kernels older than 5.6 create the ring but reject the write opcode.
        if completions[0].success {
            assert_eq!(completions[0].bytes_transferred(), Some(5));
            assert_eq!(std::fs::read(temp.path()).unwrap(), b"uring");
        }
    }
}