Skip to main content

hiver_runtime/driver/
iouring.rs

1//! io_uring driver implementation for Linux 5.1+
2//! Linux 5.1+的io_uring驱动实现
3//!
4//! This module provides an io_uring-based I/O driver for Linux systems.
5//! io_uring is the fastest I/O mechanism available on Linux, providing
6//! excellent performance through shared memory queues and zero-copy I/O.
7//!
8//! 本模块为Linux系统提供基于io_uring的I/O驱动。
9//! io_uring是Linux上最快的I/O机制,通过共享内存队列和零拷贝I/O提供卓越的性能。
10
11#![cfg(target_os = "linux")]
12#![allow(warnings)]
13
14use std::cell::UnsafeCell;
15use std::os::fd::{AsRawFd, RawFd};
16use std::sync::Arc;
17use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
18use std::time::Duration;
19
20use crate::driver::{CompletionEntry, Driver, ERROR_TRANSPORT, Interest, SubmitEntry};
21
/// Minimum number of submission-queue entries requested from the kernel.
/// Smaller `DriverConfig::entries` values are raised to this floor in `with_config`.
const MIN_IOURING_SIZE: u32 = 32;

/// Upper bound on completion-queue entries.
///
/// NOTE(review): this constant is not referenced anywhere in this file. Its previous comment
/// described it as a submission-queue limit "for CQE overflow handling", which does not match its
/// name. Confirm the intended use or remove it.
const MAX_CQES: u32 = 256;
28
/// Parameters exchanged with the kernel by `io_uring_setup(2)`.
///
/// Mirrors the kernel's 120-byte `struct io_uring_params`. The caller fills in the requested entry
/// count and setup flags. The kernel writes back the actual queue sizes and the byte offsets of the
/// ring fields inside the mmapped regions (`sq_off` at byte 40, `cq_off` at byte 80).
#[repr(C)]
#[derive(Clone, Copy, Debug, Default)]
struct IoUringParams {
    /// Requested / actual number of submission queue entries (the kernel may round up).
    sq_entries: u32,
    /// Number of completion queue entries, as reported by the kernel.
    cq_entries: u32,
    /// `IORING_SETUP_*` flags.
    flags: u32,
    /// CPU of the SQ polling thread (only used with `IORING_SETUP_SQ_AFF`).
    sq_thread_cpu: u32,
    /// Idle time of the SQ polling thread in milliseconds (only used with `IORING_SETUP_SQPOLL`).
    sq_thread_idle: u32,
    /// `IORING_FEAT_*` bits reported by the kernel.
    features: u32,
    /// Ring fd whose async backend is shared (only used with `IORING_SETUP_ATTACH_WQ`).
    wq_fd: u32,
    /// Reserved; must be zero.
    _resv: [u32; 3],
    /// Offsets inside the SQ ring region.
    sq_off: IoUringOffsets,
    /// Offsets inside the CQ ring region.
    cq_off: IoUringOffsets,
}

/// Byte offsets of ring fields inside an mmapped ring region, as reported by the kernel.
///
/// The type is 40 bytes, the size of the kernel's `io_sqring_offsets` / `io_cqring_offsets`. This
/// keeps `IoUringParams::sq_off` and `cq_off` at the byte positions the kernel writes to.
///
/// The field order follows `io_sqring_offsets`, so every field is correct for `sq_off`.
/// NOTE(review): `io_cqring_offsets` shares only the first four slots (`head`, `tail`,
/// `ring_mask`, `ring_entries`). It stores `overflow` and `cqes` in slots 4 and 5, which this type
/// names `flags` and `dropped`. Reading `cq_off.overflow` / `cq_off.cqes` therefore yields the
/// wrong words. A dedicated CQ offsets type is needed, together with an update of the code that
/// reads those fields.
#[repr(C)]
#[derive(Clone, Copy, Debug, Default)]
struct IoUringOffsets {
    /// Offset of the head index.
    head: u32,
    /// Offset of the tail index.
    tail: u32,
    /// Offset of the ring mask.
    ring_mask: u32,
    /// Offset of the ring entry count.
    ring_entries: u32,
    /// Offset of the ring flags word (SQ layout).
    flags: u32,
    /// Offset of the dropped-entries counter (SQ layout).
    dropped: u32,
    /// Offset of the SQ index array (SQ layout).
    array: u32,
    /// Overflow counter offset — see the layout note above.
    overflow: u32,
    /// CQE array offset — see the layout note above.
    cqes: u32,
    /// Reserved padding up to the kernel's 40-byte size.
    _resv: u32,
}
73
/// A 64-byte io_uring submission queue entry (SQE), laid out like the kernel's
/// `struct io_uring_sqe`.
///
/// The kernel indexes the mapped SQE array with a 64-byte stride, so this type's size must be
/// exactly 64 bytes.
#[repr(C)]
#[derive(Clone, Copy)]
struct SubmissionQueueEntry {
    /// `IORING_OP_*` opcode.
    opcode: u8,
    /// `IOSQE_*` per-entry flags.
    flags: u8,
    /// I/O priority.
    ioprio: u16,
    /// Target file descriptor.
    fd: i32,
    /// File offset (`off`, aliased with `addr2` in the kernel header).
    offset: u64,
    /// Buffer address or an opcode-specific value.
    addr: u64,
    /// Buffer length or an opcode-specific count.
    len: u32,
    /// Opcode-specific flags union (`rw_flags`, `poll32_events`, `fsync_flags`, ...).
    rw_flags: i32,
    /// Opaque value echoed back in the matching completion entry.
    user_data: u64,
    /// Registered-buffer index or buffer group.
    buf_index: u16,
    /// Credentials id registered with `IORING_REGISTER_PERSONALITY`.
    personality: u16,
    /// `splice_fd_in` / `file_index` union.
    splice_fd_in: i32,
    /// Third address for opcodes that need one.
    addr3: u64,
    /// Padding up to 64 bytes.
    _pad2: u64,
}
104
/// A 16-byte io_uring completion queue entry (CQE), laid out like the kernel's
/// `struct io_uring_cqe` for rings created without `IORING_SETUP_CQE32`.
///
/// CQEs are read by indexing the mapped CQE array, so the size must match the kernel's 16-byte
/// stride exactly.
#[repr(C)]
#[derive(Clone, Copy)]
struct CompletionQueueEntry {
    /// `user_data` copied from the originating SQE.
    user_data: u64,
    /// Operation result (negative errno on failure).
    res: i32,
    /// `IORING_CQE_F_*` flags.
    flags: u32,
}
119
120/// io_uring submission queue
121/// io_uring提交队列
122struct SubmissionQueue {
123    /// Head index / 头索引
124    head: *const u32,
125    /// Tail index / 尾索引
126    tail: *const u32,
127    /// Ring mask / 环形掩码
128    ring_mask: *const u32,
129    /// Ring entries / 环形条目数
130    ring_entries: *const u32,
131    /// Flags / 标志
132    flags: *const u32,
133    /// Array / 数组
134    array: *mut u32,
135    /// Submission queue entries / 提交队列条目
136    sqes: *mut SubmissionQueueEntry,
137    /// Ring mask value / 环形掩码值
138    ring_mask_value: u32,
139    /// Number of entries / 条目数
140    entries: u32,
141}
142
143// SAFETY: SubmissionQueue uses raw pointers for direct memory access
144// SubmissionQueue使用原始指针进行直接内存访问
145unsafe impl Send for SubmissionQueue {}
146unsafe impl Sync for SubmissionQueue {}
147
148/// io_uring completion queue
149/// io_uring完成队列
150struct CompletionQueue {
151    /// Head index / 头索引
152    head: *const u32,
153    /// Tail index / 尾索引
154    tail: *const u32,
155    /// Ring mask / 环形掩码
156    ring_mask: *const u32,
157    /// Ring entries / 环形条目数
158    ring_entries: *const u32,
159    /// Overflow / 溢出
160    overflow: *const u32,
161    /// Completion queue entries / 完成队列条目
162    cqes: *const CompletionQueueEntry,
163    /// Ring mask value / 环形掩码值
164    ring_mask_value: u32,
165}
166
167// SAFETY: CompletionQueue uses raw pointers for read-only memory access
168// CompletionQueue使用原始指针进行只读内存访问
169unsafe impl Send for CompletionQueue {}
170unsafe impl Sync for CompletionQueue {}
171
/// Bookkeeping counters used by the io_uring driver's methods.
struct IoUringState {
    /// Number of entries currently staged in the application submit queue.
    sq_len: AtomicUsize,
    /// Driver-local SQ producer index; copied into the kernel's SQ tail on submission.
    sq_tail: AtomicU32,
    /// NOTE(review): initialised to 0 but not read or updated elsewhere in this file.
    sq_head: AtomicU32,
    /// Read index of the application completion queue, as used by `get_completion` /
    /// `advance_completion`.
    cq_head: AtomicU32,
    /// Write index of the application completion queue.
    cq_tail: AtomicU32,
}
186
187/// io_uring-based I/O driver for Linux
188/// Linux的基于io_uring的I/O driver
189///
190/// Uses io_uring for high-performance asynchronous I/O.
191/// 使用io_uring实现高性能异步I/O。
192///
193/// io_uring provides:
194/// io_uring提供:
195/// - Shared memory queues for reduced syscall overhead / 共享内存队列减少系统调用开销
196/// - Zero-copy I/O support / 零拷贝I/O支持
197/// - Batched operation submission / 批量操作提交
198/// - Efficient poll-based I/O / 高效的基于轮询的I/O
199pub struct IoUringDriver {
200    /// io_uring instance file descriptor / io_uring实例文件描述符
201    ring_fd: RawFd,
202    /// Submission queue ring buffer memory (mapped) / 提交队列环形缓冲区内存(映射)
203    sq_ring: *mut u8,
204    /// Completion queue ring buffer memory (mapped) / 完成队列环形缓冲区内存(映射)
205    cq_ring: *mut u8,
206    /// Submission queue entries memory (mapped) / 提交队列条目内存(映射)
207    sqes: *mut SubmissionQueueEntry,
208    /// Submission queue / 提交队列
209    sq: SubmissionQueue,
210    /// Completion queue / 完成队列
211    cq: CompletionQueue,
212    /// Queue capacity / 队列容量
213    capacity: usize,
214    /// Internal state / 内部状态
215    state: Arc<IoUringState>,
216    /// Submission queue / 提交队列(用于应用层)
217    submit_queue: UnsafeCell<Vec<SubmitEntry>>,
218    /// Completion queue / 完成队列(用于应用层)
219    completion_queue: UnsafeCell<Vec<Option<CompletionEntry>>>,
220    /// Actual mmap size for submission queue ring / 提交队列环形缓冲区的实际 mmap 尺寸
221    sq_ring_mmap_size: usize,
222    /// Actual mmap size for completion queue ring / 完成队列环形缓冲区的实际 mmap 尺寸
223    cq_ring_mmap_size: usize,
224    /// Thread ID that created this driver (for debug safety checks)
225    /// 创建此驱动程序的线程 ID(用于调试安全检查)
226    #[cfg(debug_assertions)]
227    owner_thread: std::thread::ThreadId,
228}
229
230// SAFETY: IoUringDriver can be sent between threads.
231// The internal UnsafeCell fields are only accessed from the owning thread
232// in the thread-per-core model (each core has its own driver instance).
233// IoUringDriver 可以在线程间发送。
234// 内部 UnsafeCell 字段仅在 thread-per-core 模型中由所属线程访问
235// (每个核心有自己的驱动实例)。
236unsafe impl Send for IoUringDriver {}
237
238// SAFETY: IoUringDriver uses atomic operations for shared state (sq_head, sq_tail, etc.).
239// The UnsafeCell<Vec<...>> fields (submit_queue, completion_queue) are only accessed
240// from the driver's owning thread in the thread-per-core architecture.
241// Debug builds include thread-id checks to catch misuse.
242// IoUringDriver 使用原子操作管理共享状态(sq_head, sq_tail 等)。
243// UnsafeCell<Vec<...>> 字段(submit_queue, completion_queue)仅在
244// thread-per-core 架构中由驱动所属线程访问。
245// Debug 构建包含线程 ID 检查以捕获误用。
246unsafe impl Sync for IoUringDriver {}
247
248impl IoUringDriver {
249    /// Assert that the caller is the thread that created this driver.
250    /// In debug builds, panics if called from a different thread.
251    /// 断言调用者是创建此驱动程序的线程。
252    /// Debug 构建中,如果从不同线程调用则 panic。
253    #[inline]
254    #[cfg(debug_assertions)]
255    fn assert_owner(&self) {
256        assert_eq!(
257            std::thread::current().id(),
258            self.owner_thread,
259            "IoUringDriver accessed from wrong thread: thread-per-core violation"
260        );
261    }
262
263    /// No-op in release builds / Release 构建中无操作
264    #[inline]
265    #[cfg(not(debug_assertions))]
266    fn assert_owner(&self) {}
267
268    /// Create a new io_uring driver with default configuration
269    /// 使用默认配置创建新的io_uring driver
270    ///
271    /// # Errors / 错误
272    ///
273    /// Returns an error if io_uring instance creation fails.
274    /// 如果io_uring实例创建失败则返回错误。
275    pub fn new() -> std::io::Result<Self> {
276        Self::with_config(crate::driver::DriverConfig::default())
277    }
278
279    /// Create a new io_uring driver with the specified configuration
280    /// 使用指定配置创建新的io_uring driver
281    ///
282    /// # Errors / 错误
283    ///
284    /// Returns an error if:
285    /// 返回错误如果:
286    /// - The configuration is invalid / 配置无效
287    /// - io_uring setup fails / io_uring设置失败
288    /// - Memory mapping fails / 内存映射失败
289    pub fn with_config(config: crate::driver::DriverConfig) -> std::io::Result<Self> {
290        let entries = config.entries.max(MIN_IOURING_SIZE);
291
292        // Setup io_uring parameters
293        // 设置io_uring参数
294        let mut params = IoUringParams {
295            sq_entries: entries,
296            cq_entries: entries,
297            flags: 0, // No flags for now / 目前无标志
298            ..Default::default()
299        };
300
301        // Create io_uring instance
302        // 创建io_uring实例
303        let ring_fd = unsafe {
304            libc::syscall(
305                425, // __NR_io_uring_setup
306                entries as libc::c_long,
307                &mut params as *mut _ as libc::c_long,
308            ) as RawFd
309        };
310
311        if ring_fd < 0 {
312            return Err(std::io::Error::last_os_error());
313        }
314
315        // Calculate ring buffer sizes
316        // 计算环形缓冲区大小
317        let sq_ring_size = unsafe {
318            // Size = sq_off.array + sq_entries * sizeof(u32)
319            (params.sq_off.array as usize) + (params.sq_entries as usize) * 4
320        };
321
322        let cq_ring_size = unsafe {
323            // Size = cq_off.cqes + cq_entries * sizeof(cqe)
324            (params.cq_off.array as usize) + (params.cq_entries as usize) * 16
325        };
326
327        let sqes_size = (params.sq_entries as usize) * size_of::<SubmissionQueueEntry>();
328
329        // Map memory regions
330        // 映射内存区域
331        let sq_ring = unsafe {
332            libc::mmap(
333                std::ptr::null_mut(),
334                sq_ring_size,
335                libc::PROT_READ | libc::PROT_WRITE,
336                libc::MAP_SHARED | libc::MAP_POPULATE,
337                ring_fd,
338                0, // Submission queue ring is at offset 0
339            ) as *mut u8
340        };
341
342        if sq_ring as *mut libc::c_void == libc::MAP_FAILED {
343            unsafe { libc::close(ring_fd) };
344            return Err(std::io::Error::last_os_error());
345        }
346
347        let cq_ring = unsafe {
348            libc::mmap(
349                std::ptr::null_mut(),
350                cq_ring_size,
351                libc::PROT_READ | libc::PROT_WRITE,
352                libc::MAP_SHARED | libc::MAP_POPULATE,
353                ring_fd,
354                sq_ring_size as libc::off_t, // Completion queue ring is after submission queue
355            ) as *mut u8
356        };
357
358        if cq_ring as *mut libc::c_void == libc::MAP_FAILED {
359            unsafe {
360                libc::munmap(sq_ring as *mut libc::c_void, sq_ring_size);
361                libc::close(ring_fd);
362            }
363            return Err(std::io::Error::last_os_error());
364        }
365
366        let sqes = unsafe {
367            libc::mmap(
368                std::ptr::null_mut(),
369                sqes_size,
370                libc::PROT_READ | libc::PROT_WRITE,
371                libc::MAP_SHARED | libc::MAP_POPULATE,
372                ring_fd,
373                0x8000_0000_usize as libc::off_t, // SQEs are at this offset (IORING_OFF_SQES)
374            ) as *mut SubmissionQueueEntry
375        };
376
377        if sqes as *mut libc::c_void == libc::MAP_FAILED {
378            unsafe {
379                libc::munmap(sq_ring as *mut libc::c_void, sq_ring_size);
380                libc::munmap(cq_ring as *mut libc::c_void, cq_ring_size);
381                libc::close(ring_fd);
382            }
383            return Err(std::io::Error::last_os_error());
384        }
385
386        // Setup submission queue
387        // 设置提交队列
388        let sq = unsafe {
389            let sq_ptr = sq_ring as *const u8;
390
391            SubmissionQueue {
392                head: sq_ptr.add(params.sq_off.head as usize) as *const u32,
393                tail: sq_ptr.add(params.sq_off.tail as usize) as *const u32,
394                ring_mask: sq_ptr.add(params.sq_off.ring_mask as usize) as *const u32,
395                ring_entries: sq_ptr.add(params.sq_off.ring_entries as usize) as *const u32,
396                flags: sq_ptr.add(params.sq_off.flags as usize) as *const u32,
397                array: sq_ptr.add(params.sq_off.array as usize) as *mut u32,
398                sqes: sqes as *mut SubmissionQueueEntry,
399                ring_mask_value: *(sq_ptr.add(params.sq_off.ring_mask as usize) as *const u32),
400                entries: params.sq_entries,
401            }
402        };
403
404        // Setup completion queue
405        // 设置完成队列
406        let cq = unsafe {
407            let cq_ptr = cq_ring as *const u8;
408
409            CompletionQueue {
410                head: cq_ptr.add(params.cq_off.head as usize) as *const u32,
411                tail: cq_ptr.add(params.cq_off.tail as usize) as *const u32,
412                ring_mask: cq_ptr.add(params.cq_off.ring_mask as usize) as *const u32,
413                ring_entries: cq_ptr.add(params.cq_off.ring_entries as usize) as *const u32,
414                overflow: cq_ptr.add(params.cq_off.overflow as usize) as *const u32,
415                cqes: cq_ptr.add(params.cq_off.cqes as usize) as *const CompletionQueueEntry,
416                ring_mask_value: *(cq_ptr.add(params.cq_off.ring_mask as usize) as *const u32),
417            }
418        };
419
420        let capacity = entries as usize;
421
422        Ok(Self {
423            ring_fd,
424            sq_ring,
425            cq_ring,
426            sqes,
427            sq,
428            cq,
429            capacity,
430            state: Arc::new(IoUringState {
431                sq_head: AtomicU32::new(0),
432                sq_tail: AtomicU32::new(0),
433                cq_head: AtomicU32::new(0),
434                cq_tail: AtomicU32::new(0),
435                sq_len: AtomicUsize::new(0),
436            }),
437            submit_queue: UnsafeCell::new(vec![SubmitEntry::new(-1, 0, 0); capacity]),
438            completion_queue: UnsafeCell::new(vec![None; capacity]),
439            sq_ring_mmap_size: sq_ring_size,
440            cq_ring_mmap_size: cq_ring_size,
441            #[cfg(debug_assertions)]
442            owner_thread: std::thread::current().id(),
443        })
444    }
445
446    /// Get the current submission queue position
447    /// 获取当前提交队列位置
448    #[inline]
449    fn sq_pos(&self, index: u32) -> u32 {
450        index & self.sq.ring_mask_value
451    }
452
453    /// Get the current completion queue position
454    /// 获取当前完成队列位置
455    #[inline]
456    fn cq_pos(&self, index: u32) -> u32 {
457        index & self.cq.ring_mask_value
458    }
459
460    /// Submit operations to the kernel
461    /// 向内核提交操作
462    fn submit_to_kernel(&self) -> std::io::Result<usize> {
463        let head = unsafe { *self.sq.head };
464        let tail = self.state.sq_tail.load(Ordering::Acquire);
465        let to_submit = tail - head;
466
467        if to_submit == 0 {
468            return Ok(0);
469        }
470
471        // Set submission queue tail
472        // 设置提交队列尾
473        unsafe {
474            *(self.sq.tail as *mut u32) = tail;
475        }
476
477        // Use IORING_ENTER_GETEVENTS flag to also wait for completions
478        // 使用IORING_ENTER_GETEVENTS标志同时等待完成
479        let result = unsafe {
480            libc::syscall(
481                426, // __NR_io_uring_enter
482                self.ring_fd as libc::c_long,
483                to_submit as libc::c_long,
484                0, // min_complete
485                1, // flags: IORING_ENTER_GETEVENTS
486                std::ptr::null_mut::<libc::sigset_t>(),
487            ) as libc::c_long
488        };
489
490        if result < 0 {
491            Err(std::io::Error::last_os_error())
492        } else {
493            Ok(result as usize)
494        }
495    }
496
497    /// Get a free submission queue entry
498    /// 获取一个空闲的提交队列条目
499    fn get_free_sqe(&self) -> Option<*mut SubmissionQueueEntry> {
500        let head = unsafe { *self.sq.head };
501        let tail = self.state.sq_tail.load(Ordering::Acquire);
502        let next_tail = tail + 1;
503
504        // Check if queue is full
505        // 检查队列是否已满
506        if next_tail - head >= self.sq.entries {
507            return None;
508        }
509
510        let index = self.sq_pos(tail);
511        unsafe { Some(self.sq.sqes.add(index as usize)) }
512    }
513}
514
515impl Drop for IoUringDriver {
516    fn drop(&mut self) {
517        // Use the actual mmap sizes stored during setup, not hardcoded values
518        // 使用 setup 时存储的实际 mmap 尺寸,而非硬编码值
519        let sqes_size = self.capacity * size_of::<SubmissionQueueEntry>();
520
521        unsafe {
522            libc::munmap(self.sq_ring as *mut libc::c_void, self.sq_ring_mmap_size);
523            libc::munmap(self.cq_ring as *mut libc::c_void, self.cq_ring_mmap_size);
524            libc::munmap(self.sqes as *mut libc::c_void, sqes_size);
525            libc::close(self.ring_fd);
526        }
527    }
528}
529
530impl AsRawFd for IoUringDriver {
531    fn as_raw_fd(&self) -> RawFd {
532        self.ring_fd
533    }
534}
535
536impl Driver for IoUringDriver {
537    fn submit(&self) -> std::io::Result<usize> {
538        let mut submitted = 0;
539
540        // Process all pending submissions from our internal queue
541        // 处理内部队列中所有挂起的提交
542        self.assert_owner();
543        let len = self.state.sq_len.load(Ordering::Acquire);
544        for i in 0..len {
545            let submit_queue = unsafe { &*self.submit_queue.get() };
546            let entry = &submit_queue[i];
547
548            if entry.fd >= 0 {
549                if let Some(sqe) = self.get_free_sqe() {
550                    unsafe {
551                        (*sqe).opcode = entry.opcode;
552                        (*sqe).flags = 0;
553                        (*sqe).ioprio = 0;
554                        (*sqe).fd = entry.fd;
555                        (*sqe).offset = entry.offset as u64;
556                        (*sqe).addr = entry.buf_ptr.map_or(0, |p| p.as_ptr() as u64);
557                        (*sqe).len = entry.buf_len;
558                        (*sqe).rw_flags = 0;
559                        (*sqe).user_data = entry.user_data;
560                        (*sqe).buf_index = 0;
561                        (*sqe).personality = 0;
562
563                        // Set array index
564                        // 设置数组索引
565                        let tail = self.state.sq_tail.load(Ordering::Acquire);
566                        let index = self.sq_pos(tail);
567                        *self.sq.array.add(index as usize) = index;
568
569                        // Advance tail
570                        // 前进尾指针
571                        self.state.sq_tail.store(tail + 1, Ordering::Release);
572                    }
573
574                    submitted += 1;
575                }
576            }
577        }
578
579        // Clear the submission queue
580        // 清空提交队列
581        self.state.sq_len.store(0, Ordering::Release);
582
583        // Submit to kernel
584        // 提交到内核
585        let _kernel_submitted = self.submit_to_kernel()?;
586
587        Ok(submitted)
588    }
589
590    fn wait(&self) -> std::io::Result<usize> {
591        self.wait_timeout(Duration::from_secs(1)).map(|(n, _)| n)
592    }
593
594    fn wait_timeout(&self, duration: Duration) -> std::io::Result<(usize, bool)> {
595        // Convert duration to timespec
596        // 转换持续时间为timespec
597        let ts = libc::timespec {
598            tv_sec: duration.as_secs() as libc::time_t,
599            tv_nsec: duration.subsec_nanos() as libc::c_long,
600        };
601
602        let result = unsafe {
603            libc::syscall(
604                426, // __NR_io_uring_enter
605                self.ring_fd as libc::c_long,
606                0, // to_submit
607                1, // min_complete
608                2, // flags: IORING_ENTER_GETEVENTS | IORING_ENTER_TIMEOUT
609                &ts as *const _ as *const libc::sigset_t,
610            ) as libc::c_long
611        };
612
613        if result < 0 {
614            return Err(std::io::Error::last_os_error());
615        }
616
617        // Process completion queue
618        // 处理完成队列
619        self.assert_owner();
620        let mut completed = 0;
621        let head = self.state.cq_head.load(Ordering::Acquire);
622        let tail = unsafe { *self.cq.tail };
623
624        while head != tail {
625            let index = self.cq_pos(head);
626            let cqe = unsafe { &*self.cq.cqes.add(index as usize) };
627
628            // Store in completion queue
629            // 存储到完成队列
630            unsafe {
631                let completion_queue = &mut *self.completion_queue.get();
632                let pos = self.state.cq_tail.load(Ordering::Acquire) as usize % self.capacity;
633                completion_queue[pos] = Some(CompletionEntry {
634                    user_data: (*cqe).user_data,
635                    result: if (*cqe).res < 0 {
636                        ERROR_TRANSPORT
637                    } else {
638                        (*cqe).res
639                    },
640                    flags: (*cqe).flags,
641                });
642                self.state.cq_tail.fetch_add(1, Ordering::Release);
643            }
644
645            completed += 1;
646            unsafe {
647                *(self.cq.head as *mut u32) = head + 1;
648            }
649        }
650
651        self.state.cq_head.store(tail, Ordering::Release);
652
653        // Check if we timed out
654        // 检查是否超时
655        let timed_out = completed == 0;
656
657        Ok((completed, timed_out))
658    }
659
660    fn get_submission(&self) -> Option<&mut SubmitEntry> {
661        self.assert_owner();
662        let len = self.state.sq_len.load(Ordering::Acquire);
663
664        if len >= self.capacity {
665            return None;
666        }
667
668        self.state.sq_len.fetch_add(1, Ordering::Release);
669
670        unsafe {
671            let submit_queue = &mut *self.submit_queue.get();
672            Some(&mut submit_queue[len])
673        }
674    }
675
676    fn get_completion(&self) -> Option<&CompletionEntry> {
677        self.assert_owner();
678        let head = self.state.cq_head.load(Ordering::Acquire);
679        let tail = self.state.cq_tail.load(Ordering::Acquire);
680
681        if head == tail {
682            return None;
683        }
684
685        unsafe {
686            let completion_queue = &*self.completion_queue.get();
687            let pos = head as usize % self.capacity;
688            completion_queue[pos].as_ref()
689        }
690    }
691
692    fn advance_completion(&self) {
693        self.assert_owner();
694        let head = self.state.cq_head.load(Ordering::Acquire);
695        let tail = self.state.cq_tail.load(Ordering::Acquire);
696
697        if head != tail {
698            unsafe {
699                let completion_queue = &mut *self.completion_queue.get();
700                let pos = head as usize % self.capacity;
701                completion_queue[pos] = None;
702            }
703
704            self.state.cq_head.fetch_add(1, Ordering::Release);
705        }
706    }
707
708    fn register(&self, fd: RawFd, interest: Interest) -> std::io::Result<()> {
709        // io_uring uses POLL_ADD or POLL_REMOVE for registration
710        // For now, we'll use a simple approach with poll operation
711        // io_uring使用POLL_ADD或POLL_REMOVE进行注册
712        // 目前,我们使用简单的poll操作
713
714        let mut events = 0i16;
715        if interest.readable {
716            events |= libc::POLLIN as i16;
717        }
718        if interest.writable {
719            events |= libc::POLLOUT as i16;
720        }
721
722        // Get a free SQE
723        // 获取一个空闲的SQE
724        let sqe = self.get_free_sqe().ok_or_else(|| {
725            std::io::Error::new(std::io::ErrorKind::WouldBlock, "Submission queue full")
726        })?;
727
728        unsafe {
729            (*sqe).opcode = 6; // IORING_OP_POLL_ADD
730            (*sqe).fd = fd;
731            (*sqe).addr = events as u64;
732            (*sqe).len = 0;
733            (*sqe).user_data = fd as u64;
734
735            // Set array index and advance tail
736            // 设置数组索引并前进尾指针
737            let tail = self.state.sq_tail.load(Ordering::Acquire);
738            let index = self.sq_pos(tail);
739            *self.sq.array.add(index as usize) = index;
740            self.state.sq_tail.store(tail + 1, Ordering::Release);
741        }
742
743        Ok(())
744    }
745
746    fn deregister(&self, fd: RawFd) -> std::io::Result<()> {
747        // Use POLL_REMOVE to deregister
748        // 使用POLL_REMOVE注销
749        let sqe = self.get_free_sqe().ok_or_else(|| {
750            std::io::Error::new(std::io::ErrorKind::WouldBlock, "Submission queue full")
751        })?;
752
753        unsafe {
754            (*sqe).opcode = 7; // IORING_OP_POLL_REMOVE
755            (*sqe).fd = -1;
756            (*sqe).addr = fd as u64;
757            (*sqe).user_data = fd as u64;
758
759            let tail = self.state.sq_tail.load(Ordering::Acquire);
760            let index = self.sq_pos(tail);
761            *self.sq.array.add(index as usize) = index;
762            self.state.sq_tail.store(tail + 1, Ordering::Release);
763        }
764
765        Ok(())
766    }
767
768    fn modify(&self, fd: RawFd, interest: Interest) -> std::io::Result<()> {
769        // Remove and re-register
770        // 移除并重新注册
771        self.deregister(fd)?;
772        self.register(fd, interest)
773    }
774
775    fn submission_capacity(&self) -> usize {
776        self.capacity
777    }
778
779    fn completion_capacity(&self) -> usize {
780        self.capacity
781    }
782
783    fn supports_operation(&self, opcode: u8) -> bool {
784        // io_uring supports all operations
785        // io_uring支持所有操作
786        matches!(
787            opcode,
788            crate::driver::opcode::READ
789                | crate::driver::opcode::WRITE
790                | crate::driver::opcode::FSYNC
791                | crate::driver::opcode::CLOSE
792        )
793    }
794}
795
#[cfg(test)]
mod tests {
    use super::*;

    /// Creating (and dropping) a driver must not panic. A creation error is tolerated because
    /// io_uring may be unavailable on the host running the tests.
    #[test]
    fn test_iouring_driver_creation() {
        let driver = IoUringDriver::new();
        let _ = driver;
    }

    /// The kernel's `struct io_uring_params` is 120 bytes.
    #[test]
    fn test_iouring_params_size() {
        assert_eq!(size_of::<IoUringParams>(), 120);
    }

    /// The kernel indexes SQEs with a 64-byte stride.
    #[test]
    fn test_submission_queue_entry_size() {
        assert_eq!(size_of::<SubmissionQueueEntry>(), 64);
    }

    /// CQEs are 16 bytes for rings created without `IORING_SETUP_CQE32`.
    #[test]
    fn test_completion_queue_entry_size() {
        assert_eq!(size_of::<CompletionQueueEntry>(), 16);
    }
}