1#![cfg(target_os = "linux")]
12#![allow(warnings)]
13
14use std::cell::UnsafeCell;
15use std::os::fd::{AsRawFd, RawFd};
16use std::sync::Arc;
17use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
18use std::time::Duration;
19
20use crate::driver::{CompletionEntry, Driver, ERROR_TRANSPORT, Interest, SubmitEntry};
21
/// Smallest ring size ever requested from the kernel: `with_config` raises any smaller
/// `config.entries` value to this floor.
const MIN_IOURING_SIZE: u32 = 32;

/// NOTE(review): not referenced anywhere in this file. Presumably meant as a cap on the
/// number of completions handled per wait — confirm the intended use or remove.
const MAX_CQES: u32 = 256;
28
29#[repr(C)]
31#[derive(Clone, Copy, Debug, Default)]
32struct IoUringParams {
33 sq_entries: u32,
35 cq_entries: u32,
37 flags: u32,
39 _resv: [u32; 5],
41 sq_off: IoUringOffsets,
43 cq_off: IoUringOffsets,
45}
46
47#[repr(C)]
50#[derive(Clone, Copy, Debug, Default)]
51struct IoUringOffsets {
52 head: u32,
54 tail: u32,
56 ring_mask: u32,
58 ring_entries: u32,
60 flags: u32,
62 dropped: u32,
64 array: u32,
66 overflow: u32,
68 cqes: u32,
70 _resv: [u32; 2],
72}
73
74#[repr(C)]
77#[derive(Clone, Copy)]
78struct SubmissionQueueEntry {
79 opcode: u8,
81 flags: u8,
83 ioprio: u16,
85 fd: i32,
87 offset: u64,
89 addr: u64,
91 len: u32,
93 rw_flags: i32,
95 user_data: u64,
97 buf_index: u16,
99 personality: u16,
101 _spare: [u64; 3],
103}
104
105#[repr(C)]
108#[derive(Clone, Copy)]
109struct CompletionQueueEntry {
110 user_data: u64,
112 res: i32,
114 flags: u32,
116 _resv: [u64; 2],
118}
119
120struct SubmissionQueue {
123 head: *const u32,
125 tail: *const u32,
127 ring_mask: *const u32,
129 ring_entries: *const u32,
131 flags: *const u32,
133 array: *mut u32,
135 sqes: *mut SubmissionQueueEntry,
137 ring_mask_value: u32,
139 entries: u32,
141}
142
143unsafe impl Send for SubmissionQueue {}
146unsafe impl Sync for SubmissionQueue {}
147
148struct CompletionQueue {
151 head: *const u32,
153 tail: *const u32,
155 ring_mask: *const u32,
157 ring_entries: *const u32,
159 overflow: *const u32,
161 cqes: *const CompletionQueueEntry,
163 ring_mask_value: u32,
165}
166
167unsafe impl Send for CompletionQueue {}
170unsafe impl Sync for CompletionQueue {}
171
172struct IoUringState {
175 sq_head: AtomicU32,
177 sq_tail: AtomicU32,
179 cq_head: AtomicU32,
181 cq_tail: AtomicU32,
183 sq_len: AtomicUsize,
185}
186
187pub struct IoUringDriver {
200 ring_fd: RawFd,
202 sq_ring: *mut u8,
204 cq_ring: *mut u8,
206 sqes: *mut SubmissionQueueEntry,
208 sq: SubmissionQueue,
210 cq: CompletionQueue,
212 capacity: usize,
214 state: Arc<IoUringState>,
216 submit_queue: UnsafeCell<Vec<SubmitEntry>>,
218 completion_queue: UnsafeCell<Vec<Option<CompletionEntry>>>,
220 sq_ring_mmap_size: usize,
222 cq_ring_mmap_size: usize,
224 #[cfg(debug_assertions)]
227 owner_thread: std::thread::ThreadId,
228}
229
230unsafe impl Send for IoUringDriver {}
237
238unsafe impl Sync for IoUringDriver {}
247
248impl IoUringDriver {
249 #[inline]
254 #[cfg(debug_assertions)]
255 fn assert_owner(&self) {
256 assert_eq!(
257 std::thread::current().id(),
258 self.owner_thread,
259 "IoUringDriver accessed from wrong thread: thread-per-core violation"
260 );
261 }
262
263 #[inline]
265 #[cfg(not(debug_assertions))]
266 fn assert_owner(&self) {}
267
268 pub fn new() -> std::io::Result<Self> {
276 Self::with_config(crate::driver::DriverConfig::default())
277 }
278
279 pub fn with_config(config: crate::driver::DriverConfig) -> std::io::Result<Self> {
290 let entries = config.entries.max(MIN_IOURING_SIZE);
291
292 let mut params = IoUringParams {
295 sq_entries: entries,
296 cq_entries: entries,
297 flags: 0, ..Default::default()
299 };
300
301 let ring_fd = unsafe {
304 libc::syscall(
305 425, entries as libc::c_long,
307 &mut params as *mut _ as libc::c_long,
308 ) as RawFd
309 };
310
311 if ring_fd < 0 {
312 return Err(std::io::Error::last_os_error());
313 }
314
315 let sq_ring_size = unsafe {
318 (params.sq_off.array as usize) + (params.sq_entries as usize) * 4
320 };
321
322 let cq_ring_size = unsafe {
323 (params.cq_off.array as usize) + (params.cq_entries as usize) * 16
325 };
326
327 let sqes_size = (params.sq_entries as usize) * size_of::<SubmissionQueueEntry>();
328
329 let sq_ring = unsafe {
332 libc::mmap(
333 std::ptr::null_mut(),
334 sq_ring_size,
335 libc::PROT_READ | libc::PROT_WRITE,
336 libc::MAP_SHARED | libc::MAP_POPULATE,
337 ring_fd,
338 0, ) as *mut u8
340 };
341
342 if sq_ring as *mut libc::c_void == libc::MAP_FAILED {
343 unsafe { libc::close(ring_fd) };
344 return Err(std::io::Error::last_os_error());
345 }
346
347 let cq_ring = unsafe {
348 libc::mmap(
349 std::ptr::null_mut(),
350 cq_ring_size,
351 libc::PROT_READ | libc::PROT_WRITE,
352 libc::MAP_SHARED | libc::MAP_POPULATE,
353 ring_fd,
354 sq_ring_size as libc::off_t, ) as *mut u8
356 };
357
358 if cq_ring as *mut libc::c_void == libc::MAP_FAILED {
359 unsafe {
360 libc::munmap(sq_ring as *mut libc::c_void, sq_ring_size);
361 libc::close(ring_fd);
362 }
363 return Err(std::io::Error::last_os_error());
364 }
365
366 let sqes = unsafe {
367 libc::mmap(
368 std::ptr::null_mut(),
369 sqes_size,
370 libc::PROT_READ | libc::PROT_WRITE,
371 libc::MAP_SHARED | libc::MAP_POPULATE,
372 ring_fd,
373 0x8000_0000_usize as libc::off_t, ) as *mut SubmissionQueueEntry
375 };
376
377 if sqes as *mut libc::c_void == libc::MAP_FAILED {
378 unsafe {
379 libc::munmap(sq_ring as *mut libc::c_void, sq_ring_size);
380 libc::munmap(cq_ring as *mut libc::c_void, cq_ring_size);
381 libc::close(ring_fd);
382 }
383 return Err(std::io::Error::last_os_error());
384 }
385
386 let sq = unsafe {
389 let sq_ptr = sq_ring as *const u8;
390
391 SubmissionQueue {
392 head: sq_ptr.add(params.sq_off.head as usize) as *const u32,
393 tail: sq_ptr.add(params.sq_off.tail as usize) as *const u32,
394 ring_mask: sq_ptr.add(params.sq_off.ring_mask as usize) as *const u32,
395 ring_entries: sq_ptr.add(params.sq_off.ring_entries as usize) as *const u32,
396 flags: sq_ptr.add(params.sq_off.flags as usize) as *const u32,
397 array: sq_ptr.add(params.sq_off.array as usize) as *mut u32,
398 sqes: sqes as *mut SubmissionQueueEntry,
399 ring_mask_value: *(sq_ptr.add(params.sq_off.ring_mask as usize) as *const u32),
400 entries: params.sq_entries,
401 }
402 };
403
404 let cq = unsafe {
407 let cq_ptr = cq_ring as *const u8;
408
409 CompletionQueue {
410 head: cq_ptr.add(params.cq_off.head as usize) as *const u32,
411 tail: cq_ptr.add(params.cq_off.tail as usize) as *const u32,
412 ring_mask: cq_ptr.add(params.cq_off.ring_mask as usize) as *const u32,
413 ring_entries: cq_ptr.add(params.cq_off.ring_entries as usize) as *const u32,
414 overflow: cq_ptr.add(params.cq_off.overflow as usize) as *const u32,
415 cqes: cq_ptr.add(params.cq_off.cqes as usize) as *const CompletionQueueEntry,
416 ring_mask_value: *(cq_ptr.add(params.cq_off.ring_mask as usize) as *const u32),
417 }
418 };
419
420 let capacity = entries as usize;
421
422 Ok(Self {
423 ring_fd,
424 sq_ring,
425 cq_ring,
426 sqes,
427 sq,
428 cq,
429 capacity,
430 state: Arc::new(IoUringState {
431 sq_head: AtomicU32::new(0),
432 sq_tail: AtomicU32::new(0),
433 cq_head: AtomicU32::new(0),
434 cq_tail: AtomicU32::new(0),
435 sq_len: AtomicUsize::new(0),
436 }),
437 submit_queue: UnsafeCell::new(vec![SubmitEntry::new(-1, 0, 0); capacity]),
438 completion_queue: UnsafeCell::new(vec![None; capacity]),
439 sq_ring_mmap_size: sq_ring_size,
440 cq_ring_mmap_size: cq_ring_size,
441 #[cfg(debug_assertions)]
442 owner_thread: std::thread::current().id(),
443 })
444 }
445
446 #[inline]
449 fn sq_pos(&self, index: u32) -> u32 {
450 index & self.sq.ring_mask_value
451 }
452
453 #[inline]
456 fn cq_pos(&self, index: u32) -> u32 {
457 index & self.cq.ring_mask_value
458 }
459
460 fn submit_to_kernel(&self) -> std::io::Result<usize> {
463 let head = unsafe { *self.sq.head };
464 let tail = self.state.sq_tail.load(Ordering::Acquire);
465 let to_submit = tail - head;
466
467 if to_submit == 0 {
468 return Ok(0);
469 }
470
471 unsafe {
474 *(self.sq.tail as *mut u32) = tail;
475 }
476
477 let result = unsafe {
480 libc::syscall(
481 426, self.ring_fd as libc::c_long,
483 to_submit as libc::c_long,
484 0, 1, std::ptr::null_mut::<libc::sigset_t>(),
487 ) as libc::c_long
488 };
489
490 if result < 0 {
491 Err(std::io::Error::last_os_error())
492 } else {
493 Ok(result as usize)
494 }
495 }
496
497 fn get_free_sqe(&self) -> Option<*mut SubmissionQueueEntry> {
500 let head = unsafe { *self.sq.head };
501 let tail = self.state.sq_tail.load(Ordering::Acquire);
502 let next_tail = tail + 1;
503
504 if next_tail - head >= self.sq.entries {
507 return None;
508 }
509
510 let index = self.sq_pos(tail);
511 unsafe { Some(self.sq.sqes.add(index as usize)) }
512 }
513}
514
515impl Drop for IoUringDriver {
516 fn drop(&mut self) {
517 let sqes_size = self.capacity * size_of::<SubmissionQueueEntry>();
520
521 unsafe {
522 libc::munmap(self.sq_ring as *mut libc::c_void, self.sq_ring_mmap_size);
523 libc::munmap(self.cq_ring as *mut libc::c_void, self.cq_ring_mmap_size);
524 libc::munmap(self.sqes as *mut libc::c_void, sqes_size);
525 libc::close(self.ring_fd);
526 }
527 }
528}
529
530impl AsRawFd for IoUringDriver {
531 fn as_raw_fd(&self) -> RawFd {
532 self.ring_fd
533 }
534}
535
536impl Driver for IoUringDriver {
537 fn submit(&self) -> std::io::Result<usize> {
538 let mut submitted = 0;
539
540 self.assert_owner();
543 let len = self.state.sq_len.load(Ordering::Acquire);
544 for i in 0..len {
545 let submit_queue = unsafe { &*self.submit_queue.get() };
546 let entry = &submit_queue[i];
547
548 if entry.fd >= 0 {
549 if let Some(sqe) = self.get_free_sqe() {
550 unsafe {
551 (*sqe).opcode = entry.opcode;
552 (*sqe).flags = 0;
553 (*sqe).ioprio = 0;
554 (*sqe).fd = entry.fd;
555 (*sqe).offset = entry.offset as u64;
556 (*sqe).addr = entry.buf_ptr.map_or(0, |p| p.as_ptr() as u64);
557 (*sqe).len = entry.buf_len;
558 (*sqe).rw_flags = 0;
559 (*sqe).user_data = entry.user_data;
560 (*sqe).buf_index = 0;
561 (*sqe).personality = 0;
562
563 let tail = self.state.sq_tail.load(Ordering::Acquire);
566 let index = self.sq_pos(tail);
567 *self.sq.array.add(index as usize) = index;
568
569 self.state.sq_tail.store(tail + 1, Ordering::Release);
572 }
573
574 submitted += 1;
575 }
576 }
577 }
578
579 self.state.sq_len.store(0, Ordering::Release);
582
583 let _kernel_submitted = self.submit_to_kernel()?;
586
587 Ok(submitted)
588 }
589
590 fn wait(&self) -> std::io::Result<usize> {
591 self.wait_timeout(Duration::from_secs(1)).map(|(n, _)| n)
592 }
593
594 fn wait_timeout(&self, duration: Duration) -> std::io::Result<(usize, bool)> {
595 let ts = libc::timespec {
598 tv_sec: duration.as_secs() as libc::time_t,
599 tv_nsec: duration.subsec_nanos() as libc::c_long,
600 };
601
602 let result = unsafe {
603 libc::syscall(
604 426, self.ring_fd as libc::c_long,
606 0, 1, 2, &ts as *const _ as *const libc::sigset_t,
610 ) as libc::c_long
611 };
612
613 if result < 0 {
614 return Err(std::io::Error::last_os_error());
615 }
616
617 self.assert_owner();
620 let mut completed = 0;
621 let head = self.state.cq_head.load(Ordering::Acquire);
622 let tail = unsafe { *self.cq.tail };
623
624 while head != tail {
625 let index = self.cq_pos(head);
626 let cqe = unsafe { &*self.cq.cqes.add(index as usize) };
627
628 unsafe {
631 let completion_queue = &mut *self.completion_queue.get();
632 let pos = self.state.cq_tail.load(Ordering::Acquire) as usize % self.capacity;
633 completion_queue[pos] = Some(CompletionEntry {
634 user_data: (*cqe).user_data,
635 result: if (*cqe).res < 0 {
636 ERROR_TRANSPORT
637 } else {
638 (*cqe).res
639 },
640 flags: (*cqe).flags,
641 });
642 self.state.cq_tail.fetch_add(1, Ordering::Release);
643 }
644
645 completed += 1;
646 unsafe {
647 *(self.cq.head as *mut u32) = head + 1;
648 }
649 }
650
651 self.state.cq_head.store(tail, Ordering::Release);
652
653 let timed_out = completed == 0;
656
657 Ok((completed, timed_out))
658 }
659
660 fn get_submission(&self) -> Option<&mut SubmitEntry> {
661 self.assert_owner();
662 let len = self.state.sq_len.load(Ordering::Acquire);
663
664 if len >= self.capacity {
665 return None;
666 }
667
668 self.state.sq_len.fetch_add(1, Ordering::Release);
669
670 unsafe {
671 let submit_queue = &mut *self.submit_queue.get();
672 Some(&mut submit_queue[len])
673 }
674 }
675
676 fn get_completion(&self) -> Option<&CompletionEntry> {
677 self.assert_owner();
678 let head = self.state.cq_head.load(Ordering::Acquire);
679 let tail = self.state.cq_tail.load(Ordering::Acquire);
680
681 if head == tail {
682 return None;
683 }
684
685 unsafe {
686 let completion_queue = &*self.completion_queue.get();
687 let pos = head as usize % self.capacity;
688 completion_queue[pos].as_ref()
689 }
690 }
691
692 fn advance_completion(&self) {
693 self.assert_owner();
694 let head = self.state.cq_head.load(Ordering::Acquire);
695 let tail = self.state.cq_tail.load(Ordering::Acquire);
696
697 if head != tail {
698 unsafe {
699 let completion_queue = &mut *self.completion_queue.get();
700 let pos = head as usize % self.capacity;
701 completion_queue[pos] = None;
702 }
703
704 self.state.cq_head.fetch_add(1, Ordering::Release);
705 }
706 }
707
708 fn register(&self, fd: RawFd, interest: Interest) -> std::io::Result<()> {
709 let mut events = 0i16;
715 if interest.readable {
716 events |= libc::POLLIN as i16;
717 }
718 if interest.writable {
719 events |= libc::POLLOUT as i16;
720 }
721
722 let sqe = self.get_free_sqe().ok_or_else(|| {
725 std::io::Error::new(std::io::ErrorKind::WouldBlock, "Submission queue full")
726 })?;
727
728 unsafe {
729 (*sqe).opcode = 6; (*sqe).fd = fd;
731 (*sqe).addr = events as u64;
732 (*sqe).len = 0;
733 (*sqe).user_data = fd as u64;
734
735 let tail = self.state.sq_tail.load(Ordering::Acquire);
738 let index = self.sq_pos(tail);
739 *self.sq.array.add(index as usize) = index;
740 self.state.sq_tail.store(tail + 1, Ordering::Release);
741 }
742
743 Ok(())
744 }
745
746 fn deregister(&self, fd: RawFd) -> std::io::Result<()> {
747 let sqe = self.get_free_sqe().ok_or_else(|| {
750 std::io::Error::new(std::io::ErrorKind::WouldBlock, "Submission queue full")
751 })?;
752
753 unsafe {
754 (*sqe).opcode = 7; (*sqe).fd = -1;
756 (*sqe).addr = fd as u64;
757 (*sqe).user_data = fd as u64;
758
759 let tail = self.state.sq_tail.load(Ordering::Acquire);
760 let index = self.sq_pos(tail);
761 *self.sq.array.add(index as usize) = index;
762 self.state.sq_tail.store(tail + 1, Ordering::Release);
763 }
764
765 Ok(())
766 }
767
768 fn modify(&self, fd: RawFd, interest: Interest) -> std::io::Result<()> {
769 self.deregister(fd)?;
772 self.register(fd, interest)
773 }
774
775 fn submission_capacity(&self) -> usize {
776 self.capacity
777 }
778
779 fn completion_capacity(&self) -> usize {
780 self.capacity
781 }
782
783 fn supports_operation(&self, opcode: u8) -> bool {
784 matches!(
787 opcode,
788 crate::driver::opcode::READ
789 | crate::driver::opcode::WRITE
790 | crate::driver::opcode::FSYNC
791 | crate::driver::opcode::CLOSE
792 )
793 }
794}
795
#[cfg(test)]
mod tests {
    use super::*;

    /// Ring creation can fail for reasons outside the test's control (old kernel, seccomp,
    /// resource limits), so only a successfully built driver is checked.
    #[test]
    fn test_iouring_driver_creation() {
        if let Ok(driver) = IoUringDriver::new() {
            assert!(driver.submission_capacity() >= MIN_IOURING_SIZE as usize);
            assert_eq!(driver.completion_capacity(), driver.submission_capacity());
        }
    }

    /// The kernel's `struct io_uring_params` is 120 bytes: 40 bytes of scalar fields
    /// followed by two 40-byte offset tables.
    #[test]
    fn test_iouring_params_size() {
        assert_eq!(std::mem::size_of::<IoUringParams>(), 120);
    }

    /// The kernel's default `struct io_uring_sqe` is 64 bytes.
    #[test]
    fn test_submission_queue_entry_size() {
        assert_eq!(std::mem::size_of::<SubmissionQueueEntry>(), 64);
    }

    /// The kernel's default `struct io_uring_cqe` is 16 bytes.
    #[test]
    fn test_completion_queue_entry_size() {
        assert_eq!(std::mem::size_of::<CompletionQueueEntry>(), 16);
    }
}