// uring_fs/lib.rs
//! # Features
//! - Truly asynchronous and safe file operations using io-uring.
//! - Usable with any async runtime.
//! - Supports: open, stat, read, write.
//! - Depends on [io-uring](https://crates.io/crates/io-uring) and [libc](https://crates.io/crates/libc),
//!   so it only runs on operating systems that these crates support.
//!
//! See the [`IoUring`] documentation for further important details.
//!
//! # Example
//! ```rust
//! # use uring_fs::*;
//! # use std::fs;
//! # use std::io::{self, Seek, SeekFrom};
//! # fn main() -> io::Result<()> {
//! # extreme::run(async {
//! let io = IoUring::new()?; // create a new io-uring context
//! let mut file: fs::File = io.open("src/file.txt", Flags::RDONLY).await?;
//! //            ^^^^^^^^ you could also use File::open or OpenOptions
//! let info = io.stat("src/file.txt").await?;
//! // There is also `read_all`, which doesn't require querying the file size with `stat`;
//! // this version is a bit more efficient, because the data buffer is allocated only once.
//! let content = io.read(&file, info.size()).await?;
//! println!("we read {} bytes", content.len());
//! // The result is a regular `std::fs::File`, so you can also seek it using `io::Seek`.
//! file.seek(SeekFrom::Start(0))?;
//! # Ok(())
//! # })
//! # }
//! ```
//!
//! # Notes
//! This library spawns a reaper thread that waits for io-uring completions and notifies the appropriate future.
//! This is still lightweight compared to using a thread pool.

use std::{
    cell::UnsafeCell,
    ffi::CString,
    fs::File,
    future::Future,
    io,
    mem::zeroed,
    os::fd::{AsRawFd, FromRawFd, RawFd},
    path::Path,
    pin::Pin,
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc, Mutex,
    },
    task::{self, Waker},
    thread,
};

use io_uring::opcode;

51/// An `io-uring` subsystem. Can be shared between threads safely.
52///
53/// Will spawn a thread on creation. On drop it will stop the thread, but wait for any I/O operations
54/// to complete first.
55/// If this waiting is an issue to you, you should call [`cancel_all`](IoUring::cancel_all) to cancel all operations.
56/// This will, however leak some memory for every active operation.
57///
58/// Every function that interacts with `io-uring` will panic if that interaction fails. Only errors for *your*
59/// operation will be returned inside the output of the future.
60/// In theory you can check for `io-uring` support using a [`Probe`](https://docs.rs/io-uring/latest/io_uring/register/struct.Probe.html.).
61
62/// # Note
63/// Some function aren't marked as `async`, however they still return futures.
64pub struct IoUring {
65    shared: Arc<IoUringState>,
66    reaper: Option<thread::JoinHandle<()>>
67}
68
69impl Drop for IoUring {
70    /// On drop: Shut down the reaper thread, blocking for pending operations to complete and free their memory.
71    fn drop(&mut self) {
72        self.shutdown().unwrap();
73        self.reaper.take().unwrap().join().unwrap();
74    }
75}
76
77struct IoUringState {
78    pub io_uring: io_uring::IoUring,
79    pub sq_lock: Mutex<()>,
80    pub in_flight: AtomicUsize,
81}
82
83struct Completion {
84    shared: Arc<CompletionState>,
85}
86
87struct CompletionState {
88    inner: Mutex<CompletionInner>, // needs a lock, because it will be accessed by the reaper thread
89    data: CompletionData // extra stuff that needs to be destroyed at completion
90}
91
92enum CompletionData {
93    Path(Box<[u8]>),
94    Stat(StatCompletionData),
95    Buffer(UnsafeCell<Vec<u8>>),
96    ReadOnlyBuffer(Vec<u8>)
97}
98
99unsafe impl Sync for CompletionData {}
100
101impl CompletionData {
102    pub fn as_path(&self) -> &Box<[u8]> {
103        if let Self::Path(val) = self { val }
104        else { unreachable!() }
105    }
106    pub fn as_stat(&self) -> &StatCompletionData {
107        if let Self::Stat(val) = self { val }
108        else { unreachable!() }
109    }
110    pub fn into_stat(self) -> StatCompletionData {
111        if let Self::Stat(val) = self { val }
112        else { unreachable!() }
113    }
114    pub fn as_buffer(&self) -> &UnsafeCell<Vec<u8>> {
115        if let Self::Buffer(val) = self { val }
116        else { unreachable!() }
117    }
118    pub fn into_buffer(self) -> UnsafeCell<Vec<u8>> {
119        if let Self::Buffer(val) = self { val }
120        else { unreachable!() }
121    }
122    pub fn as_read_only_buffer(&self) -> &Vec<u8> {
123        if let Self::ReadOnlyBuffer(val) = self { val }
124        else { unreachable!() }
125    }
126}
127
128struct StatCompletionData {
129    pub path: Box<[u8]>,
130    pub statx: UnsafeCell<libc::statx>
131}
132
/// Mutable part of a `CompletionState`, guarded by its mutex.
struct CompletionInner {
    /// Waker of the task that last polled the future while it was still pending.
    pub waker: Option<Waker>,
    /// Raw CQE result (a negative errno on failure), set once the reaper has seen the completion.
    pub result: Option<i32>,
}

138/// Like OpenOptions. Use libc or the associated constants.
139///
140/// Std `OpenOpentions` sadly doesn't provide any way of getting the libc flags out of it.
141/// At least I don't know of any way.
142pub struct Flags {
143    pub inner: i32
144}
145
146impl Flags {
147    pub const RDONLY: Self = Self { inner: libc::O_RDONLY };
148    pub const WRONLY: Self = Self { inner: libc::O_WRONLY };
149    pub const RDWR:   Self = Self { inner: libc::O_RDWR };
150}
151
152/// Result of a `stat` operation. Information about a file.
153///
154/// The raw result is provided as the `raw` field. Use it for more info.
155pub struct Stat {
156    pub raw: libc::statx,
157}
158
159impl Stat {
160    /// The size of the file in bytes.
161    pub fn size(&self) -> u32 {
162        self.raw.stx_size as u32
163    }
164}
165
166fn io_uring_fd(fd: RawFd) -> io_uring::types::Fd {
167    io_uring::types::Fd(fd)
168}
169
170impl IoUring {
171
172    /// Create a new `io-uring` context.
173    ///
174    /// Will spawn a reaper thread.
175    pub fn new() -> io::Result<Self> {
176
177        let shared = Arc::new(IoUringState {
178            io_uring: io_uring::IoUring::new(4)?,
179            sq_lock: Mutex::new(()),
180            in_flight: AtomicUsize::new(0)
181        });
182
183        let shared_clone = Arc::clone(&shared);
184
185        Ok(Self {
186            shared,
187            reaper: Some(thread::spawn(move || {
188
189                let mut should_exit = false;
190
191                loop {
192
193                    shared_clone.io_uring.submit_and_wait(1).unwrap();
194
195                    // we don't need locking here since the cq is only accessed right here
196                    let cq = unsafe { shared_clone.io_uring.completion_shared() };
197
198                    for entry in cq {
199
200                        if shared_clone.in_flight.fetch_sub(1, Ordering::Relaxed) == 0 {
201                            shared_clone.in_flight.store(0, Ordering::Relaxed); // cancel out the wrap-around: set it back to 0
202                        };
203
204                        if entry.user_data() == 0 {
205                            should_exit = true;
206                            continue;
207                        }
208
209                        let user_data = unsafe { Arc::from_raw(entry.user_data() as *mut CompletionState) };
210                        let mut guard = user_data.inner.lock().unwrap();
211
212                        guard.result = Some(entry.result());
213                        let waker = guard.waker.take();
214                        // it is important to drop the user data before waking up the future, so that the other side has exclusive ownership of the Arc
215                        drop(guard);
216                        drop(user_data);
217
218                        if let Some(waker) = waker {
219                            waker.wake();
220                        }
221                    }
222
223                    // make sure all io requests are finished
224                    if should_exit && shared_clone.in_flight.load(Ordering::Relaxed) == 0 {
225                        return
226                    }
227
228                }
229            }))
230        })
231
232    }
233
234    /// Open a file.
235    ///
236    /// You can also open it using std. See [`Fd`] docs.
237    pub fn open<P: AsRef<Path>>(&self, path: P, flags: Flags) -> impl Future<Output=io::Result<File>> {
238
239        let data = Vec::from(
240            path.as_ref().as_os_str().as_encoded_bytes()
241        ).into_boxed_slice(); // will be kept alive until completion
242
243        let state = Arc::new(CompletionState {
244            inner: Mutex::new(CompletionInner {
245                waker: None,
246                result: None,
247            }),
248            data: CompletionData::Path(data)
249        });
250
251        let reaper_state = Arc::clone(&state);
252        let data_ref = state.data.as_path();
253        let entry = opcode::OpenAt::new(io_uring_fd(libc::AT_FDCWD), data_ref.as_ptr() as *const i8)
254            .flags(flags.inner)
255            .build()
256            .user_data(Arc::into_raw(reaper_state) as u64);
257
258        self.push_and_submit(entry);
259
260        async move {
261
262            let result = Completion {
263                shared: state,
264            }.await;
265
266            if result < 0 {
267                return Err(io::Error::from_raw_os_error(-result))
268            }
269
270            Ok(unsafe { File::from_raw_fd(result) })
271
272        }
273
274
275    }
276
277    /// Obtain information about a file.
278    pub fn stat<P: AsRef<Path>>(&self, path: P) -> impl Future<Output=io::Result<Stat>> {
279
280        let data = StatCompletionData {
281            path: Vec::from(path.as_ref().as_os_str().as_encoded_bytes()).into_boxed_slice(),
282            statx: UnsafeCell::new(unsafe { zeroed::<libc::statx>() })
283        }; // will be kept alive until completion
284
285        let state = Arc::new(CompletionState {
286            inner: Mutex::new(CompletionInner {
287                waker: None,
288                result: None,
289            }),
290            data: CompletionData::Stat(data)
291        });
292
293        let reaper_state = Arc::clone(&state);
294        let data_ref = state.data.as_stat();
295        let entry = opcode::Statx::new(io_uring_fd(libc::AT_FDCWD), data_ref.path.as_ptr() as *const i8, data_ref.statx.get() as *mut _)
296            .build()
297            .user_data(Arc::into_raw(reaper_state) as u64);
298
299        self.push_and_submit(entry);
300        
301        async move {
302
303            let completion_state = Arc::clone(&state);
304            let result = Completion {
305                shared: completion_state,
306            }.await;
307
308            if result < 0 {
309                return Err(io::Error::from_raw_os_error(-result))
310            }
311
312            let exclusive_state = Arc::into_inner(state).unwrap();
313            let data = exclusive_state.data.into_stat();
314            Ok(Stat { raw: data.statx.into_inner() })
315            
316        }
317
318    }
319
320    /// Read exactly `size` bytes from a file.
321    ///
322    /// **This will advance the internal file cursor.**
323    ///
324    /// This function returns a `Vec` so it can be used safely without
325    /// creating memory-leaks or use-after-free bugs.
326    pub fn read(&self, fd: &File, size: u32) -> impl Future<Output=io::Result<Vec<u8>>> {
327
328        let mut data = UnsafeCell::new(Vec::new());
329        data.get_mut().resize(size as usize, 0);
330
331        let state = Arc::new(CompletionState {
332            inner: Mutex::new(CompletionInner {
333                waker: None,
334                result: None,
335            }),
336            data: CompletionData::Buffer(data) 
337        });
338
339        let reaper_state = Arc::clone(&state);
340        let data_ref = state.data.as_buffer();
341        let entry = opcode::Read::new(io_uring_fd(fd.as_raw_fd()), unsafe { &mut *data_ref.get() }.as_mut_ptr() as *mut _, size as u32)
342            .offset(-1i64 as u64)
343            .build()
344            .user_data(Arc::into_raw(reaper_state) as u64);
345
346        self.push_and_submit(entry);
347
348        async move {
349
350            let completion_state = Arc::clone(&state);
351            let result = Completion {
352                shared: completion_state,
353            }.await;
354
355            if result < 0 {
356                return Err(io::Error::from_raw_os_error(-result))
357            }
358
359            let exclusive_state = Arc::into_inner(state).unwrap();
360            let mut data = exclusive_state.data.into_buffer().into_inner(); // there will be no other references to this at this point
361            data.truncate(result as usize); // important because we might have read less bytes than requested
362            Ok(data)
363
364        }
365
366    }
367
368    /// Read the full file.
369    ///
370    /// **This will advance the internal file cursor.**
371    pub async fn read_all(&self, fd: &File) -> io::Result<Vec<u8>> {
372
373        let mut buffer = Vec::with_capacity(2048);
374        let mut chunk_size = 2048;
375
376        loop {
377            let some_data = self.read(fd, chunk_size).await?;
378            if some_data.is_empty() { break };
379            buffer.extend_from_slice(&some_data);
380            chunk_size *= 2;
381        }
382
383        Ok(buffer)
384        
385    }
386
387    /// Write all the data to the file. 
388    ///
389    /// **This will advance the internal file cursor.**
390    pub fn write(&self, fd: &File, buffer: Vec<u8>) -> impl Future<Output=io::Result<()>> {
391
392        let state = Arc::new(CompletionState {
393            inner: Mutex::new(CompletionInner {
394                waker: None,
395                result: None,
396            }),
397            data: CompletionData::ReadOnlyBuffer(buffer)
398        });
399
400        let reaper_state = Arc::clone(&state);
401        let data_ref = state.data.as_read_only_buffer();
402        let entry = opcode::Write::new(io_uring_fd(fd.as_raw_fd()), data_ref.as_ptr(), data_ref.len() as u32)
403            .offset(-1i64 as u64)
404            .build()
405            .user_data(Arc::into_raw(reaper_state) as u64);
406
407        self.push_and_submit(entry);
408
409        async move {
410            
411            let result = Completion {
412                shared: state
413            }.await;
414
415            if result < 0 {
416                return Err(io::Error::from_raw_os_error(-result))
417            }
418
419            Ok(())
420
421        }
422
423    }
424
425    /// Soft-cancels all currently running I/O operations.
426    /// 
427    /// This means, that if the reaper thread is stopped (by dropping this struct),
428    /// this will potentially leak their memory and make their futures never complete.
429    pub fn cancel_all(&self) -> io::Result<()> {
430
431        self.shared.in_flight.store(0, Ordering::Relaxed);
432
433        Ok(())
434        
435    }
436
437    fn push_and_submit(&self, entry: io_uring::squeue::Entry) {
438        
439        let guard = self.shared.sq_lock.lock().unwrap();
440        let mut sq = unsafe { self.shared.io_uring.submission_shared() };
441
442        unsafe { sq.push(&entry).unwrap() };
443
444        sq.sync();
445
446        assert!(sq.len() == 1);
447
448        self.shared.in_flight.fetch_add(1, Ordering::Relaxed);
449        self.shared.io_uring.submit().unwrap();
450
451        drop(sq); // <- sq and guard MUST BE dropped together in this order
452        drop(guard);
453
454    }
455
456    fn shutdown(&self) -> io::Result<()> {
457
458        let entry = opcode::Nop::new()
459            .build()
460            .user_data(0);
461    
462        self.push_and_submit(entry);
463
464        Ok(())
465        
466    }
467
468}
469
470impl Future for Completion {
471    type Output = i32;
472    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
473        let mut guard = self.shared.inner.lock().unwrap();
474        if let Some(result) = guard.result {
475            task::Poll::Ready(result)
476        } else {
477            guard.waker = Some(cx.waker().clone());
478            task::Poll::Pending
479        }
480    }
481}
482
#[cfg(test)]
mod test {
    /// The fixture, located via the crate root. This way the test never changes the
    /// process-wide working directory, which other (parallel) tests share, and it still works
    /// when run more than once in the same process.
    const FIXTURE: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/src/file.txt");

    #[test]
    fn open_stat_and_read_all() {
        extreme::run(assert_send(async {
            let io = crate::IoUring::new().unwrap();
            let fd = io.open(FIXTURE, crate::Flags::RDONLY).await.unwrap();
            let stat = io.stat(FIXTURE).await.unwrap();
            let content = io.read_all(&fd).await.unwrap();
            // read_all must return the whole file.
            assert_eq!(content.len() as u64, stat.raw.stx_size);
            println!("{}", String::from_utf8_lossy(&content));
        }))
    }

    /// Compile-time check that the futures built by this crate are `Send`.
    fn assert_send<T: Send>(t: T) -> T {
        t
    }
}