1
2use std::{
38 path::Path,
39 thread,
40 task,
41 sync::{Arc, Mutex, atomic::{AtomicUsize, Ordering}},
42 io,
43 os::fd::{RawFd, FromRawFd, AsRawFd},
44 task::Waker,
45 pin::Pin,
46 future::Future, mem::zeroed, cell::UnsafeCell, fs::File,
47};
48
49use io_uring::opcode;
50
51pub struct IoUring {
65 shared: Arc<IoUringState>,
66 reaper: Option<thread::JoinHandle<()>>
67}
68
69impl Drop for IoUring {
70 fn drop(&mut self) {
72 self.shutdown().unwrap();
73 self.reaper.take().unwrap().join().unwrap();
74 }
75}
76
77struct IoUringState {
78 pub io_uring: io_uring::IoUring,
79 pub sq_lock: Mutex<()>,
80 pub in_flight: AtomicUsize,
81}
82
83struct Completion {
84 shared: Arc<CompletionState>,
85}
86
87struct CompletionState {
88 inner: Mutex<CompletionInner>, data: CompletionData }
91
92enum CompletionData {
93 Path(Box<[u8]>),
94 Stat(StatCompletionData),
95 Buffer(UnsafeCell<Vec<u8>>),
96 ReadOnlyBuffer(Vec<u8>)
97}
98
99unsafe impl Sync for CompletionData {}
100
101impl CompletionData {
102 pub fn as_path(&self) -> &Box<[u8]> {
103 if let Self::Path(val) = self { val }
104 else { unreachable!() }
105 }
106 pub fn as_stat(&self) -> &StatCompletionData {
107 if let Self::Stat(val) = self { val }
108 else { unreachable!() }
109 }
110 pub fn into_stat(self) -> StatCompletionData {
111 if let Self::Stat(val) = self { val }
112 else { unreachable!() }
113 }
114 pub fn as_buffer(&self) -> &UnsafeCell<Vec<u8>> {
115 if let Self::Buffer(val) = self { val }
116 else { unreachable!() }
117 }
118 pub fn into_buffer(self) -> UnsafeCell<Vec<u8>> {
119 if let Self::Buffer(val) = self { val }
120 else { unreachable!() }
121 }
122 pub fn as_read_only_buffer(&self) -> &Vec<u8> {
123 if let Self::ReadOnlyBuffer(val) = self { val }
124 else { unreachable!() }
125 }
126}
127
128struct StatCompletionData {
129 pub path: Box<[u8]>,
130 pub statx: UnsafeCell<libc::statx>
131}
132
/// Hand-off slot between the reaper thread and the awaiting future.
struct CompletionInner {
    /// Waker stored by the most recent `Pending` poll; the reaper takes it
    /// once the result is in.
    pub waker: Option<Waker>,
    /// Raw completion result, filled in by the reaper.
    pub result: Option<i32>,
}
137
/// Flags for `IoUring::open`, wrapping raw `open(2)` flag bits.
///
/// A small plain value, so it derives the usual value-type traits.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Flags {
    /// Raw `O_*` bits passed to `openat(2)`.
    pub inner: i32,
}
145
146impl Flags {
147 pub const RDONLY: Self = Self { inner: libc::O_RDONLY };
148 pub const WRONLY: Self = Self { inner: libc::O_WRONLY };
149 pub const RDWR: Self = Self { inner: libc::O_RDWR };
150}
151
152pub struct Stat {
156 pub raw: libc::statx,
157}
158
159impl Stat {
160 pub fn size(&self) -> u32 {
162 self.raw.stx_size as u32
163 }
164}
165
166fn io_uring_fd(fd: RawFd) -> io_uring::types::Fd {
167 io_uring::types::Fd(fd)
168}
169
170impl IoUring {
171
172 pub fn new() -> io::Result<Self> {
176
177 let shared = Arc::new(IoUringState {
178 io_uring: io_uring::IoUring::new(4)?,
179 sq_lock: Mutex::new(()),
180 in_flight: AtomicUsize::new(0)
181 });
182
183 let shared_clone = Arc::clone(&shared);
184
185 Ok(Self {
186 shared,
187 reaper: Some(thread::spawn(move || {
188
189 let mut should_exit = false;
190
191 loop {
192
193 shared_clone.io_uring.submit_and_wait(1).unwrap();
194
195 let cq = unsafe { shared_clone.io_uring.completion_shared() };
197
198 for entry in cq {
199
200 if shared_clone.in_flight.fetch_sub(1, Ordering::Relaxed) == 0 {
201 shared_clone.in_flight.store(0, Ordering::Relaxed); };
203
204 if entry.user_data() == 0 {
205 should_exit = true;
206 continue;
207 }
208
209 let user_data = unsafe { Arc::from_raw(entry.user_data() as *mut CompletionState) };
210 let mut guard = user_data.inner.lock().unwrap();
211
212 guard.result = Some(entry.result());
213 let waker = guard.waker.take();
214 drop(guard);
216 drop(user_data);
217
218 if let Some(waker) = waker {
219 waker.wake();
220 }
221 }
222
223 if should_exit && shared_clone.in_flight.load(Ordering::Relaxed) == 0 {
225 return
226 }
227
228 }
229 }))
230 })
231
232 }
233
234 pub fn open<P: AsRef<Path>>(&self, path: P, flags: Flags) -> impl Future<Output=io::Result<File>> {
238
239 let data = Vec::from(
240 path.as_ref().as_os_str().as_encoded_bytes()
241 ).into_boxed_slice(); let state = Arc::new(CompletionState {
244 inner: Mutex::new(CompletionInner {
245 waker: None,
246 result: None,
247 }),
248 data: CompletionData::Path(data)
249 });
250
251 let reaper_state = Arc::clone(&state);
252 let data_ref = state.data.as_path();
253 let entry = opcode::OpenAt::new(io_uring_fd(libc::AT_FDCWD), data_ref.as_ptr() as *const i8)
254 .flags(flags.inner)
255 .build()
256 .user_data(Arc::into_raw(reaper_state) as u64);
257
258 self.push_and_submit(entry);
259
260 async move {
261
262 let result = Completion {
263 shared: state,
264 }.await;
265
266 if result < 0 {
267 return Err(io::Error::from_raw_os_error(-result))
268 }
269
270 Ok(unsafe { File::from_raw_fd(result) })
271
272 }
273
274
275 }
276
277 pub fn stat<P: AsRef<Path>>(&self, path: P) -> impl Future<Output=io::Result<Stat>> {
279
280 let data = StatCompletionData {
281 path: Vec::from(path.as_ref().as_os_str().as_encoded_bytes()).into_boxed_slice(),
282 statx: UnsafeCell::new(unsafe { zeroed::<libc::statx>() })
283 }; let state = Arc::new(CompletionState {
286 inner: Mutex::new(CompletionInner {
287 waker: None,
288 result: None,
289 }),
290 data: CompletionData::Stat(data)
291 });
292
293 let reaper_state = Arc::clone(&state);
294 let data_ref = state.data.as_stat();
295 let entry = opcode::Statx::new(io_uring_fd(libc::AT_FDCWD), data_ref.path.as_ptr() as *const i8, data_ref.statx.get() as *mut _)
296 .build()
297 .user_data(Arc::into_raw(reaper_state) as u64);
298
299 self.push_and_submit(entry);
300
301 async move {
302
303 let completion_state = Arc::clone(&state);
304 let result = Completion {
305 shared: completion_state,
306 }.await;
307
308 if result < 0 {
309 return Err(io::Error::from_raw_os_error(-result))
310 }
311
312 let exclusive_state = Arc::into_inner(state).unwrap();
313 let data = exclusive_state.data.into_stat();
314 Ok(Stat { raw: data.statx.into_inner() })
315
316 }
317
318 }
319
320 pub fn read(&self, fd: &File, size: u32) -> impl Future<Output=io::Result<Vec<u8>>> {
327
328 let mut data = UnsafeCell::new(Vec::new());
329 data.get_mut().resize(size as usize, 0);
330
331 let state = Arc::new(CompletionState {
332 inner: Mutex::new(CompletionInner {
333 waker: None,
334 result: None,
335 }),
336 data: CompletionData::Buffer(data)
337 });
338
339 let reaper_state = Arc::clone(&state);
340 let data_ref = state.data.as_buffer();
341 let entry = opcode::Read::new(io_uring_fd(fd.as_raw_fd()), unsafe { &mut *data_ref.get() }.as_mut_ptr() as *mut _, size as u32)
342 .offset(-1i64 as u64)
343 .build()
344 .user_data(Arc::into_raw(reaper_state) as u64);
345
346 self.push_and_submit(entry);
347
348 async move {
349
350 let completion_state = Arc::clone(&state);
351 let result = Completion {
352 shared: completion_state,
353 }.await;
354
355 if result < 0 {
356 return Err(io::Error::from_raw_os_error(-result))
357 }
358
359 let exclusive_state = Arc::into_inner(state).unwrap();
360 let mut data = exclusive_state.data.into_buffer().into_inner(); data.truncate(result as usize); Ok(data)
363
364 }
365
366 }
367
368 pub async fn read_all(&self, fd: &File) -> io::Result<Vec<u8>> {
372
373 let mut buffer = Vec::with_capacity(2048);
374 let mut chunk_size = 2048;
375
376 loop {
377 let some_data = self.read(fd, chunk_size).await?;
378 if some_data.is_empty() { break };
379 buffer.extend_from_slice(&some_data);
380 chunk_size *= 2;
381 }
382
383 Ok(buffer)
384
385 }
386
387 pub fn write(&self, fd: &File, buffer: Vec<u8>) -> impl Future<Output=io::Result<()>> {
391
392 let state = Arc::new(CompletionState {
393 inner: Mutex::new(CompletionInner {
394 waker: None,
395 result: None,
396 }),
397 data: CompletionData::ReadOnlyBuffer(buffer)
398 });
399
400 let reaper_state = Arc::clone(&state);
401 let data_ref = state.data.as_read_only_buffer();
402 let entry = opcode::Write::new(io_uring_fd(fd.as_raw_fd()), data_ref.as_ptr(), data_ref.len() as u32)
403 .offset(-1i64 as u64)
404 .build()
405 .user_data(Arc::into_raw(reaper_state) as u64);
406
407 self.push_and_submit(entry);
408
409 async move {
410
411 let result = Completion {
412 shared: state
413 }.await;
414
415 if result < 0 {
416 return Err(io::Error::from_raw_os_error(-result))
417 }
418
419 Ok(())
420
421 }
422
423 }
424
425 pub fn cancel_all(&self) -> io::Result<()> {
430
431 self.shared.in_flight.store(0, Ordering::Relaxed);
432
433 Ok(())
434
435 }
436
437 fn push_and_submit(&self, entry: io_uring::squeue::Entry) {
438
439 let guard = self.shared.sq_lock.lock().unwrap();
440 let mut sq = unsafe { self.shared.io_uring.submission_shared() };
441
442 unsafe { sq.push(&entry).unwrap() };
443
444 sq.sync();
445
446 assert!(sq.len() == 1);
447
448 self.shared.in_flight.fetch_add(1, Ordering::Relaxed);
449 self.shared.io_uring.submit().unwrap();
450
451 drop(sq); drop(guard);
453
454 }
455
456 fn shutdown(&self) -> io::Result<()> {
457
458 let entry = opcode::Nop::new()
459 .build()
460 .user_data(0);
461
462 self.push_and_submit(entry);
463
464 Ok(())
465
466 }
467
468}
469
470impl Future for Completion {
471 type Output = i32;
472 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
473 let mut guard = self.shared.inner.lock().unwrap();
474 if let Some(result) = guard.result {
475 task::Poll::Ready(result)
476 } else {
477 guard.waker = Some(cx.waker().clone());
478 task::Poll::Pending
479 }
480 }
481}
482
#[cfg(test)]
mod test {
    /// Opens, stats and reads back `src/file.txt` through the ring.
    ///
    /// The path is built from `CARGO_MANIFEST_DIR` rather than by changing the
    /// process-wide current directory. Changing the current directory would
    /// leak into other tests running concurrently in the same process.
    #[test]
    fn open_stat_and_read_file() {
        extreme::run(assert_send(async {
            let io = crate::IoUring::new().unwrap();
            let path = concat!(env!("CARGO_MANIFEST_DIR"), "/src/file.txt");
            let fd = io.open(path, crate::Flags::RDONLY).await.unwrap();
            let stat = io.stat(path).await.unwrap();
            let content = io.read_all(&fd).await.unwrap();
            assert_eq!(stat.size() as usize, content.len());
            println!("{}", String::from_utf8_lossy(&content));
        }))
    }

    /// Compile-time check that the wrapped future is `Send`.
    fn assert_send<T: Send>(t: T) -> T {
        t
    }
}