libblobd_direct/backing_store/uring.rs

use super::BackingStore;
use crate::pages::Pages;
use async_trait::async_trait;
use bufpool::buf::Buf;
use dashmap::DashMap;
use io_uring::cqueue::Entry as CEntry;
use io_uring::opcode;
use io_uring::squeue::Entry as SEntry;
use io_uring::types;
use io_uring::IoUring;
use off64::u32;
use off64::usz;
use signal_future::SignalFuture;
use signal_future::SignalFutureController;
use std::collections::VecDeque;
use std::fmt;
use std::fs::File;
use std::io;
use std::os::fd::AsRawFd;
use std::sync::Arc;
use std::thread;
use strum::Display;
use tokio::time::Instant;
use tracing::trace;

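// io_uring reports a failure as a negative errno value in the CQE result; on
// success, the result is the operation's non-negative return value (e.g. the
// number of bytes read or written).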
fn assert_result_is_ok(req: &Request, res: i32) -> u32 {
  if res < 0 {
    panic!(
      "{:?} failed with {:?}",
      req,
      io::Error::from_raw_os_error(-res)
    );
  };
  u32!(res)
}

struct ReadRequest {
  out_buf: Buf,
  offset: u64,
  len: u32,
}

struct WriteRequest {
  offset: u64,
  data: Buf,
}

#[derive(Display)]
enum Request {
  Read {
    req: ReadRequest,
    res: SignalFutureController<Buf>,
  },
  Write {
    req: WriteRequest,
    res: SignalFutureController<Buf>,
  },
  Sync {
    res: SignalFutureController<()>,
  },
}

impl fmt::Debug for Request {
  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
    match self {
      Self::Read { req, .. } => write!(f, "read {} len {}", req.offset, req.len),
      Self::Write { req, .. } => write!(f, "write {} len {}", req.offset, req.data.len()),
      Self::Sync { .. } => write!(f, "sync"),
    }
  }
}

// For now, we use a single ring, with one thread submitting and another
// receiving. There are possibly faster designs, such as one ring per thread
// with the same thread both submitting and receiving (to avoid any locking),
// but the actual I/O should be the bottleneck, so we stick with this for now.
/// This can be cheaply cloned.
#[derive(Clone)]
pub(crate) struct UringBackingStore {
  pages: Pages,
  // We don't use std::sync::mpsc::Sender as it is not Sync, which makes it
  // very awkward to use from async functions.
  sender: crossbeam_channel::Sender<Request>,
}

/// For advanced users only. Some of these may cause EINVAL, or worsen performance.
#[derive(Clone, Default, Debug)]
pub(crate) struct UringCfg {
  pub coop_taskrun: bool,
  pub defer_taskrun: bool,
  pub iopoll: bool,
  /// This requires CAP_SYS_NICE.
  pub sqpoll: Option<u32>,
}
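
// A minimal configuration sketch (the field values here are illustrative,
// not recommendations):
//
//   let cfg = UringCfg {
//     coop_taskrun: true,
//     ..Default::default()
//   };
//   let store = UringBackingStore::new(file, pages, cfg);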

impl UringBackingStore {
  /// The `file` should be opened for direct I/O. All read and write offsets
  /// and lengths must be multiples of the underlying device's sector size.
  pub fn new(file: File, pages: Pages, cfg: UringCfg) -> Self {
    let (sender, receiver) = crossbeam_channel::unbounded::<Request>();
    let pending: Arc<DashMap<u64, (Request, Instant)>> = Default::default();
    let ring = {
      let mut builder = IoUring::<SEntry, CEntry>::builder();
      builder.setup_clamp();
      if cfg.coop_taskrun {
        builder.setup_coop_taskrun();
      };
      if cfg.defer_taskrun {
        builder.setup_defer_taskrun();
      };
      if cfg.iopoll {
        builder.setup_iopoll();
      }
      if let Some(sqpoll) = cfg.sqpoll {
        builder.setup_sqpoll(sqpoll);
      };
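      // Request an intentionally enormous queue; setup_clamp above lets the
      // kernel clamp the entry count down to the maximum it supports.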
      builder.build(134217728).unwrap()
    };
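    // Register the file with the ring so that operations can refer to it via
    // types::Fixed(0) instead of passing a raw fd on every submission.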
    ring
      .submitter()
      .register_files(&[file.as_raw_fd()])
      .unwrap();
    let ring = Arc::new(ring);
    // Submission thread.
    thread::spawn({
      let pending = pending.clone();
      let ring = ring.clone();
      // This is outside the loop to avoid reallocation each time.
      let mut msgbuf = VecDeque::new();
      move || {
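        // The `_shared` accessors are unsafe because they don't prevent
        // multiple unsynchronised instances; this is sound here because the
        // submission queue is only ever touched from this one thread.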
        let mut submission = unsafe { ring.submission_shared() };
        let mut next_id = 0;
        // If this loop exits, it means we've dropped the `UringBackingStore` and can safely stop.
        while let Ok(init_msg) = receiver.recv() {
          // Process multiple messages at once to avoid too many io_uring submits.
          msgbuf.push_back(init_msg);
          while let Ok(msg) = receiver.try_recv() {
            msgbuf.push_back(msg);
          }
          // How the io_uring submission queue works:
          // - The buffer is shared between the kernel and userspace.
          // - Atomic head and tail indices allow it to be shared mutably and
          //   safely between the kernel and userspace.
          // - The Rust library we're using abstracts over this by caching the
          //   head and tail as local values. Once we've made our inserts, we
          //   update the atomic tail and then tell the kernel to consume some
          //   of the queue. When we update the atomic tail, we also check the
          //   atomic head and update our local cached value; some entries may
          //   have been consumed by the kernel in some other thread since we
          //   last checked, so we may actually have more free space than we
          //   thought.
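          // In this library's terms, the per-batch pattern below boils down to:
          //
          //   unsafe { submission.push(&entry).unwrap() };  // write an SQE into the shared buffer
          //   submission.sync();  // publish the new tail and refresh the cached head
          //   ring.submit().unwrap();  // tell the kernel to consume the new SQEs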
          while let Some(msg) = msgbuf.pop_front() {
            let id = next_id;
            next_id += 1;
            trace!(id, typ = msg.to_string(), "submitting request");
            let submission_entry = match &msg {
              Request::Read { req, .. } => {
                // Using `as_mut_ptr` would require a mutable borrow.
                let ptr = req.out_buf.as_ptr() as *mut u8;
                opcode::Read::new(types::Fixed(0), ptr, req.len)
                  .offset(req.offset)
                  .build()
                  .user_data(id)
              }
              Request::Write { req, .. } => {
                // Using `as_mut_ptr` would require a mutable borrow.
                let ptr = req.data.as_ptr() as *mut u8;
                // The request retains ownership of the buffer, which ensures
                // it isn't dropped while io_uring is still reading from it.
                let len = u32!(req.data.len());
                opcode::Write::new(types::Fixed(0), ptr, len)
                  .offset(req.offset)
                  .build()
                  .user_data(id)
              }
              Request::Sync { .. } => opcode::Fsync::new(types::Fixed(0)).build().user_data(id),
            };
            // Insert before submitting.
            pending.insert(id, (msg, Instant::now()));
            if submission.is_full() {
              submission.sync();
              ring.submit_and_wait(1).unwrap();
            }
            unsafe {
              // This call has only one possible error: the queue is full.
              // That should never happen, as we've just checked that it isn't.
              submission.push(&submission_entry).unwrap();
            };
          }
          submission.sync();
          // This is still necessary even with sqpoll, as our kernel thread may have gone to sleep.
          ring.submit().unwrap();
        }
      }
    });

    // Completion thread.
    thread::spawn({
      let pending = pending.clone();
      let ring = ring.clone();
      move || {
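        // As with the submission queue, using `completion_shared` is sound
        // because only this thread touches the completion queue.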
        let mut completion = unsafe { ring.completion_shared() };
        // TODO Stop this loop if `UringBackingStore` has been dropped.
        loop {
          let Some(e) = completion.next() else {
            // No completions are ready; block until at least one arrives,
            // then refresh our cached view of the queue.
            ring.submit_and_wait(1).unwrap();
            completion.sync();
            continue;
          };
          let id = e.user_data();
          let (req, started) = pending.remove(&id).unwrap().1;
          trace!(
            id,
            typ = req.to_string(),
            exec_us = started.elapsed().as_micros(),
            "completing request"
          );
          let rv = assert_result_is_ok(&req, e.result());
          match req {
            Request::Read { req, res } => {
              // A read may legally return fewer bytes than requested; we
              // treat that as a fatal error, so assert that the full buffer
              // was filled.
              assert_eq!(usz!(rv), req.out_buf.len());
              res.signal(req.out_buf);
            }
            Request::Write { req, res } => {
              // Assert that all requested bytes were written.
              assert_eq!(rv, u32!(req.data.len()));
              res.signal(req.data);
            }
            Request::Sync { res } => {
              res.signal(());
            }
          }
        }
      }
    });

    Self { pages, sender }
  }
}

// Sources:
// - Example: https://github1s.com/tokio-rs/io-uring/blob/HEAD/examples/tcp_echo.rs
// - liburing docs: https://unixism.net/loti/ref-liburing/completion.html
// - Quick high-level overview: https://man.archlinux.org/man/io_uring.7.en
// - io_uring walkthrough: https://unixism.net/2020/04/io-uring-by-example-part-1-introduction/
// - Multithreading:
//   - https://github.com/axboe/liburing/issues/109#issuecomment-1114213402
//   - https://github.com/axboe/liburing/issues/109#issuecomment-1166378978
//   - https://github.com/axboe/liburing/issues/109#issuecomment-614911522
//   - https://github.com/axboe/liburing/issues/125
//   - https://github.com/axboe/liburing/issues/127
//   - https://github.com/axboe/liburing/issues/129
//   - https://github.com/axboe/liburing/issues/571#issuecomment-1106480309
// - Kernel poller: https://unixism.net/loti/tutorial/sq_poll.html
#[async_trait]
impl BackingStore for UringBackingStore {
  /// `offset` and `len` must be multiples of the underlying device's sector size.
  async fn read_at(&self, offset: u64, len: u64) -> Buf {
    let out_buf = self.pages.allocate_uninitialised(len);
    let (fut, fut_ctl) = SignalFuture::new();
    self
      .sender
      .send(Request::Read {
        req: ReadRequest {
          out_buf,
          offset,
          len: u32!(len),
        },
        res: fut_ctl,
      })
      .unwrap();
    fut.await
  }

  /// `offset` and `data.len()` must be multiples of the underlying device's sector size.
  /// Returns the original `data` so that it can be reused, if desired.
  async fn write_at(&self, offset: u64, data: Buf) -> Buf {
    let (fut, fut_ctl) = SignalFuture::new();
    self
      .sender
      .send(Request::Write {
        req: WriteRequest { offset, data },
        res: fut_ctl,
      })
      .unwrap();
    fut.await
  }

  /// Even when using direct I/O, `fsync` is still necessary, as it ensures the device itself has flushed any internal caches.
  async fn sync(&self) {
    let (fut, fut_ctl) = SignalFuture::new();
    self.sender.send(Request::Sync { res: fut_ctl }).unwrap();
    fut.await
  }
}
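
// A minimal end-to-end usage sketch. The O_DIRECT setup is an assumption for
// illustration (this module doesn't dictate how the file is opened), and
// `pages` stands in for a `Pages` instance constructed elsewhere:
//
//   use std::os::unix::fs::OpenOptionsExt;
//
//   let file = std::fs::OpenOptions::new()
//     .read(true)
//     .write(true)
//     .custom_flags(libc::O_DIRECT)
//     .open("/dev/some_device")?;
//   let store = UringBackingStore::new(file, pages, UringCfg::default());
//   let buf = store.read_at(0, 4096).await;
//   let buf = store.write_at(0, buf).await;
//   store.sync().await;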