polkavm_linux_raw/
io_uring.rs
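//! A minimal io_uring wrapper built directly on the raw `io_uring_setup`,
//! `io_uring_enter` and `io_uring_register` syscalls, with no dependency on liburing.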

use crate as linux_raw;
use core::sync::atomic::{AtomicU32, Ordering};
use core::time::Duration;

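/// A single io_uring instance.
///
/// The submission and completion rings are mapped into memory by [`IoUring::new`];
/// the head/tail indices cached in this struct are only synchronized with the
/// kernel's copies inside [`IoUring::submit_and_wait`].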
pub struct IoUring {
    submit_head_pointer: *const AtomicU32,
    submit_tail_pointer: *const AtomicU32,
    submit_head: u32,
    submit_tail: u32,
    submit_ring_mask: u32,
    submit_capacity: u32,

    cqes: *const linux_raw::io_uring_cqe,
    completion_head_pointer: *const AtomicU32,
    completion_tail_pointer: *const AtomicU32,
    completion_head: u32,
    completion_tail: u32,
    completion_ring_mask: u32,

    fd: linux_raw::Fd,
    _ring_map: linux_raw::Mmap,
    sqes_map: linux_raw::Mmap,
}

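// The raw pointers in `IoUring` all point into the mappings owned by the struct
// itself, and the cached indices are only modified through `&mut self`.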
unsafe impl Send for IoUring {}
unsafe impl Sync for IoUring {}

impl IoUring {
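    /// Creates a new ring with room for `queue_size` submission entries.
    ///
    /// Both rings are accessed through a single mapping (the
    /// `IORING_FEAT_SINGLE_MMAP` layout provided by modern kernels), and the
    /// submission index array is pre-filled with the identity mapping.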
    pub fn new(queue_size: u32) -> Result<Self, linux_raw::Error> {
        let mut params = linux_raw::io_uring_params::default();
        let fd = linux_raw::sys_io_uring_setup(queue_size, &mut params)?;
        let sring_size = params.sq_off.array + params.sq_entries * core::mem::size_of::<u32>() as u32;
        let cring_size = params.cq_off.cqes + params.cq_entries * core::mem::size_of::<linux_raw::io_uring_cqe>() as u32;
        let ring_size = core::cmp::max(sring_size, cring_size);
        let ring_map = unsafe {
            linux_raw::Mmap::map(
                core::ptr::null_mut(),
                ring_size as usize,
                linux_raw::PROT_READ | linux_raw::PROT_WRITE,
                linux_raw::MAP_SHARED | linux_raw::MAP_POPULATE,
                Some(fd.borrow()),
                u64::from(linux_raw::IORING_OFF_SQ_RING),
            )?
        };

        let sqes_map = unsafe {
            linux_raw::Mmap::map(
                core::ptr::null_mut(),
                params.sq_entries as usize * core::mem::size_of::<linux_raw::io_uring_sqe>(),
                linux_raw::PROT_READ | linux_raw::PROT_WRITE,
                linux_raw::MAP_SHARED | linux_raw::MAP_POPULATE,
                Some(fd.borrow()),
                u64::from(linux_raw::IORING_OFF_SQES),
            )?
        };

        let submit_head_pointer = unsafe { ring_map.as_ptr().byte_add(params.sq_off.head as usize).cast::<AtomicU32>() };
        let submit_tail_pointer = unsafe { ring_map.as_ptr().byte_add(params.sq_off.tail as usize).cast::<AtomicU32>() };
        let submit_capacity = unsafe { *ring_map.as_ptr().byte_add(params.sq_off.ring_entries as usize).cast::<u32>() };
        let array = unsafe { ring_map.as_mut_ptr().byte_add(params.sq_off.array as usize).cast::<u32>() };
        for index in 0..submit_capacity {
            unsafe { array.add(index as usize).write(index) };
        }

        let completion_head_pointer = unsafe { ring_map.as_ptr().byte_add(params.cq_off.head as usize).cast::<AtomicU32>() };
        let completion_tail_pointer = unsafe { ring_map.as_ptr().byte_add(params.cq_off.tail as usize).cast::<AtomicU32>() };

        Ok(IoUring {
            submit_head_pointer,
            submit_tail_pointer,
            submit_head: unsafe { (*submit_head_pointer).load(Ordering::Acquire) },
            submit_tail: unsafe { submit_tail_pointer.cast::<u32>().read() },
            submit_ring_mask: unsafe { *ring_map.as_ptr().byte_add(params.sq_off.ring_mask as usize).cast::<u32>() },
            submit_capacity,

            cqes: unsafe {
                ring_map
                    .as_ptr()
                    .byte_add(params.cq_off.cqes as usize)
                    .cast::<linux_raw::io_uring_cqe>()
            },
            completion_head_pointer,
            completion_tail_pointer,
            completion_head: unsafe { completion_head_pointer.cast::<u32>().read() },
            completion_tail: unsafe { (*completion_tail_pointer).load(Ordering::Acquire) },
            completion_ring_mask: unsafe { *ring_map.as_ptr().byte_add(params.cq_off.ring_mask as usize).cast::<u32>() },

            fd,
            _ring_map: ring_map,
            sqes_map,
        })
    }

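    /// Writes `sqe` into the next free submission slot and advances the local
    /// tail; the new tail is only made visible to the kernel by `submit_and_wait`.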
    fn append_sqe(&mut self, sqe: linux_raw::io_uring_sqe) {
        let index = self.submit_tail & self.submit_ring_mask;

        unsafe {
            self.sqes_map
                .as_mut_ptr()
                .cast::<linux_raw::io_uring_sqe>()
                .add(index as usize)
                .write(sqe)
        };
        self.submit_tail = self.submit_tail.wrapping_add(1);
    }

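    /// Queues a read from `fd` into `buffer` at the file's current position
    /// (the offset is passed as -1).
    ///
    /// `buffer` must remain valid until the operation has completed.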
    #[allow(clippy::field_reassign_with_default)]
    pub fn queue_read(&mut self, user_data: u64, fd: linux_raw::FdRef, buffer: *mut u8, length: u32) -> Result<(), linux_raw::Error> {
        if self.queue_length() >= self.queue_capacity() {
            return Err(linux_raw::Error::from("no remaining capacity in the io_uring submission queue"));
        }

        let mut sqe = linux_raw::io_uring_sqe::default();
        sqe.opcode = linux_raw::io_uring_op_IORING_OP_READ as u8;
        sqe.fd = fd.raw();
        sqe.__bindgen_anon_2.addr = buffer as u64;
        sqe.len = length;
        sqe.__bindgen_anon_1.off = u64::MAX;
        sqe.user_data = user_data;
        self.append_sqe(sqe);

        Ok(())
    }

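    /// Queues a timeout that completes after `duration`, or earlier once
    /// `event_count` other completions have been posted (a count of zero means a
    /// pure timeout).
    ///
    /// Note that the SQE stores a pointer to a `timespec` on this function's
    /// stack; the kernel only reads it when the entry is submitted, so
    /// `submit_and_wait` must be called while that memory is still intact.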
    #[allow(clippy::field_reassign_with_default)]
    pub fn queue_timeout(&mut self, user_data: u64, event_count: u32, duration: Duration) -> Result<(), linux_raw::Error> {
        if self.queue_length() >= self.queue_capacity() {
            return Err(linux_raw::Error::from("no remaining capacity in the io_uring submission queue"));
        }

        let mut sqe = linux_raw::io_uring_sqe::default();
        sqe.opcode = linux_raw::io_uring_op_IORING_OP_TIMEOUT as u8;
        sqe.fd = -1;
        let timespec = linux_raw::timespec {
            tv_sec: duration.as_secs() as i64,
            tv_nsec: i64::from(duration.subsec_nanos()),
        };
        sqe.__bindgen_anon_2.addr = core::ptr::addr_of!(timespec) as u64;
        // For IORING_OP_TIMEOUT the kernel expects `len` to be 1 (a single timespec)
        // and the number of completions to wait for to be passed in the offset field.
        sqe.len = 1;
        sqe.__bindgen_anon_1.off = u64::from(event_count);
        sqe.user_data = user_data;
        self.append_sqe(sqe);

        Ok(())
    }

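    /// Queues a wait on `futex` that completes when the futex is woken, provided
    /// it still contains `expected_value` when the wait is set up.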
    // Requires Linux 6.7+.
    #[allow(clippy::field_reassign_with_default)]
    pub fn queue_futex_wait(&mut self, user_data: u64, futex: *const AtomicU32, expected_value: u32) -> Result<(), linux_raw::Error> {
        if self.queue_length() >= self.queue_capacity() {
            return Err(linux_raw::Error::from("no remaining capacity in the io_uring submission queue"));
        }

        let mut sqe = linux_raw::io_uring_sqe::default();
        sqe.opcode = linux_raw::io_uring_op_IORING_OP_FUTEX_WAIT as u8;
        sqe.__bindgen_anon_2.addr = futex as usize as u64;
        sqe.__bindgen_anon_1.addr2 = u64::from(expected_value);
        unsafe { sqe.__bindgen_anon_6.__bindgen_anon_1.as_mut().addr3 = u64::from(linux_raw::FUTEX_BITSET_MATCH_ANY) };
        sqe.user_data = user_data;
        sqe.fd = linux_raw::FUTEX2_SIZE_U32 as i32;
        self.append_sqe(sqe);

        Ok(())
    }

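    /// Queues a wake-up of at most one waiter on `futex`.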
    // Requires Linux 6.7+.
    #[allow(clippy::field_reassign_with_default)]
    pub fn queue_futex_wake_one(&mut self, user_data: u64, futex: *const AtomicU32) -> Result<(), linux_raw::Error> {
        if self.queue_length() >= self.queue_capacity() {
            return Err(linux_raw::Error::from("no remaining capacity in the io_uring submission queue"));
        }

        let mut sqe = linux_raw::io_uring_sqe::default();
        sqe.opcode = linux_raw::io_uring_op_IORING_OP_FUTEX_WAKE as u8;
        sqe.__bindgen_anon_2.addr = futex as usize as u64;
        sqe.__bindgen_anon_1.addr2 = 1;
        unsafe { sqe.__bindgen_anon_6.__bindgen_anon_1.as_mut().addr3 = u64::from(linux_raw::FUTEX_BITSET_MATCH_ANY) };
        sqe.user_data = user_data;
        sqe.fd = linux_raw::FUTEX2_SIZE_U32 as i32;
        self.append_sqe(sqe);

        Ok(())
    }

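    /// Queues a `waitid`-style wait on the process or thread selected by
    /// `id_type` and `id`; the resulting `siginfo_t` is written to `info`, which
    /// must remain valid until the operation completes.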
    // Requires Linux 6.7+.
    #[allow(clippy::field_reassign_with_default)]
    pub fn queue_waitid(
        &mut self,
        user_data: u64,
        id_type: u32,
        id: u32,
        info: *mut crate::siginfo_t,
        options: u32,
    ) -> Result<(), linux_raw::Error> {
        if self.queue_length() >= self.queue_capacity() {
            return Err(linux_raw::Error::from("no remaining capacity in the io_uring submission queue"));
        }

        let mut sqe = linux_raw::io_uring_sqe::default();
        sqe.user_data = user_data;
        sqe.opcode = linux_raw::io_uring_op_IORING_OP_WAITID as u8;
        sqe.fd = id as i32;
        sqe.len = id_type;
        sqe.__bindgen_anon_5.file_index = options;
        sqe.__bindgen_anon_1.addr2 = info as u64;
        self.append_sqe(sqe);

        Ok(())
    }

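    /// Returns the number of submission entries queued but not yet consumed by
    /// the kernel (as observed at the last `submit_and_wait`).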
    pub fn queue_length(&self) -> usize {
        self.submit_tail.wrapping_sub(self.submit_head) as usize
    }

    pub fn queue_capacity(&self) -> usize {
        self.submit_capacity as usize
    }

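    /// Publishes the queued submission entries (and the locally consumed
    /// completion head) to the kernel, waits until at least `min_complete`
    /// completions are available, and then refreshes the cached indices.
    ///
    /// # Safety
    ///
    /// Any buffers or structures referenced by the queued entries must remain
    /// valid until the corresponding operations complete.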
    pub unsafe fn submit_and_wait(&mut self, min_complete: u32) -> Result<(), linux_raw::Error> {
        let count = self.queue_length() as u32;
        if count == 0 && min_complete == 0 {
            return Ok(());
        }

        (*self.submit_tail_pointer).store(self.submit_tail, Ordering::Release);
        (*self.completion_head_pointer).store(self.completion_head, Ordering::Release);
        let result = linux_raw::sys_io_uring_enter(
            self.fd.borrow(),
            count,
            min_complete,
            linux_raw::IORING_ENTER_GETEVENTS,
            core::ptr::null_mut(),
            0,
        );
        self.submit_head = (*self.submit_head_pointer).load(Ordering::Acquire);
        self.completion_tail = (*self.completion_tail_pointer).load(Ordering::Acquire);

        result?;
        Ok(())
    }

    pub fn finished_count(&self) -> usize {
        self.completion_tail.wrapping_sub(self.completion_head) as usize
    }

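    /// Pops the next completion entry, if any, advancing the local completion
    /// head; the consumed head is reported back to the kernel on the next
    /// `submit_and_wait`.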
    pub fn pop_finished(&mut self) -> Option<linux_raw::io_uring_cqe> {
        if self.finished_count() == 0 {
            return None;
        }

        let index = self.completion_head & self.completion_ring_mask;
        let event = unsafe { self.cqes.add(index as usize).read() };
        self.completion_head = self.completion_head.wrapping_add(1);
        Some(event)
    }

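    /// Synchronously cancels all in-flight operations on this ring via
    /// `IORING_REGISTER_SYNC_CANCEL`, matching any request and with no timeout.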
    pub fn cancel_all_sync(&mut self) -> Result<(), linux_raw::Error> {
        let reg = linux_raw::io_uring_sync_cancel_reg {
            addr: 0,
            fd: 0,
            flags: linux_raw::IORING_ASYNC_CANCEL_ANY,
            timeout: linux_raw::arch_bindings::__kernel_timespec { tv_sec: !0, tv_nsec: !0 },
            opcode: 0,
            pad: Default::default(),
            pad2: Default::default(),
        };

        linux_raw::sys_io_uring_register(
            self.fd.borrow(),
            linux_raw::IORING_REGISTER_SYNC_CANCEL,
            core::ptr::addr_of!(reg).cast(),
            1,
        )
    }
}

impl linux_raw::io_uring_cqe {
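    /// Converts the completion's `res` field into a `Result`, treating negative
    /// values as errno-style errors.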
    pub fn to_result(&self) -> Result<i32, linux_raw::Error> {
        linux_raw::Error::from_syscall("io_uring", i64::from(self.res))?;
        Ok(self.res)
    }
}

#[allow(clippy::unwrap_used)]
#[test]
fn test_io_uring_read() {
    let mut io_uring = IoUring::new(2).unwrap();
    let fd = crate::sys_open(crate::cstr!("/dev/zero"), crate::O_CLOEXEC).unwrap();

    let mut buffer = [0xff; 5];
    assert_eq!(io_uring.queue_length(), 0);
    io_uring.queue_read(0x1234, fd.borrow(), buffer.as_mut_ptr(), 1).unwrap();
    assert_eq!(io_uring.queue_length(), 1);
    io_uring.queue_read(0x1235, fd.borrow(), buffer[2..].as_mut_ptr(), 2).unwrap();
    assert_eq!(io_uring.queue_length(), 2);
    assert_eq!(io_uring.finished_count(), 0);
    unsafe {
        io_uring.submit_and_wait(2).unwrap();
    }
    assert_eq!(io_uring.queue_length(), 0);
    assert_eq!(io_uring.finished_count(), 2);
    assert_eq!(buffer, [0, 0xff, 0, 0, 0xff]);

    let mut completion_1 = io_uring.pop_finished().unwrap();
    let mut completion_2 = io_uring.pop_finished().unwrap();
    let completion_3 = io_uring.pop_finished();
    assert!(completion_3.is_none());

    if completion_1.user_data == 0x1235 {
        core::mem::swap(&mut completion_1, &mut completion_2);
    }

    assert_eq!(completion_1.user_data, 0x1234);
    assert_eq!(completion_1.res, 1);
    assert_eq!(completion_2.user_data, 0x1235);
    assert_eq!(completion_2.res, 2);
}

#[cfg(test)]
fn get_kernel_version() -> Result<(u32, u32), linux_raw::Error> {
    let uname = crate::sys_uname()?;
    let Ok(release) = unsafe { core::ffi::CStr::from_ptr(uname.release.as_ptr().cast()) }.to_str() else {
        return Err(linux_raw::Error::from_str(
            "failed to parse the kernel's release string: not valid UTF-8",
        ));
    };

    let mut iter = release.split('.');
    let Some(major) = iter.next().and_then(|major| major.parse::<u32>().ok()) else {
        return Err(linux_raw::Error::from_str("failed to extract the kernel's major version"));
    };

    let Some(minor) = iter.next().and_then(|minor| minor.parse::<u32>().ok()) else {
        return Err(linux_raw::Error::from_str("failed to extract the kernel's minor version"));
    };

    Ok((major, minor))
}

#[allow(clippy::unwrap_used)]
#[test]
fn test_io_uring_futex_wait() {
    extern crate std;
    use std::sync::Arc;

    // TODO: Check for the feature like liburing does ('io_uring_opcode_supported(ring->probe, IORING_OP_FUTEX_WAIT)')
    let (major, minor) = get_kernel_version().unwrap();
    if !(major > 6 || (major == 6 && minor >= 7)) {
        return;
    }

    let futex = Arc::new(AtomicU32::new(0));
    let mut io_uring = IoUring::new(2).unwrap();
    io_uring.queue_futex_wait(0x1234, &*futex, 0).unwrap();

    let futex_clone = Arc::clone(&futex);
    std::thread::spawn(move || {
        std::thread::sleep(core::time::Duration::from_millis(25));
        futex_clone.store(1, Ordering::Relaxed);
        crate::sys_futex_wake_one(&futex_clone).unwrap();
    });

    unsafe {
        io_uring.submit_and_wait(1).unwrap();
    }
    let completion = io_uring.pop_finished().unwrap();
    completion.to_result().unwrap();
    assert_eq!(futex.load(Ordering::Relaxed), 1);
}

#[allow(clippy::unwrap_used)]
#[test]
fn test_io_uring_futex_wake() {
    extern crate std;
    use std::sync::Arc;

    // TODO: Check for the feature like liburing does ('io_uring_opcode_supported(ring->probe, IORING_OP_FUTEX_WAIT)')
    let (major, minor) = get_kernel_version().unwrap();
    if !(major > 6 || (major == 6 && minor >= 7)) {
        return;
    }

    let futex = Arc::new(AtomicU32::new(0));
    let futex_clone = Arc::clone(&futex);
    std::thread::spawn(move || {
        std::thread::sleep(core::time::Duration::from_millis(25));
        futex_clone.store(1, Ordering::Relaxed);
        let mut io_uring = IoUring::new(2).unwrap();
        io_uring.queue_futex_wake_one(0x1234, &*futex_clone).unwrap();
        unsafe {
            io_uring.submit_and_wait(1).unwrap();
        }
        let completion = io_uring.pop_finished().unwrap();
        completion.to_result().unwrap();
    });

    linux_raw::sys_futex_wait(&futex, 0, Some(core::time::Duration::from_secs(1))).unwrap();
    assert_eq!(futex.load(Ordering::Relaxed), 1);
}