libblobd_direct/backing_store/
uring.rs1use super::BackingStore;
2use crate::pages::Pages;
3use async_trait::async_trait;
4use bufpool::buf::Buf;
5use dashmap::DashMap;
6use io_uring::cqueue::Entry as CEntry;
7use io_uring::opcode;
8use io_uring::squeue::Entry as SEntry;
9use io_uring::types;
10use io_uring::IoUring;
11use off64::u32;
12use off64::usz;
13use signal_future::SignalFuture;
14use signal_future::SignalFutureController;
15use std::collections::VecDeque;
16use std::fmt;
17use std::fs::File;
18use std::io;
19use std::os::fd::AsRawFd;
20use std::sync::Arc;
21use std::thread;
22use strum::Display;
23use tokio::time::Instant;
24use tracing::trace;
25
26fn assert_result_is_ok(req: &Request, res: i32) -> u32 {
27 if res < 0 {
28 panic!(
29 "{:?} failed with {:?}",
30 req,
31 io::Error::from_raw_os_error(-res)
32 );
33 };
34 u32!(res)
35}
36
37struct ReadRequest {
38 out_buf: Buf,
39 offset: u64,
40 len: u32,
41}
42
43struct WriteRequest {
44 offset: u64,
45 data: Buf,
46}
47
48#[derive(Display)]
49enum Request {
50 Read {
51 req: ReadRequest,
52 res: SignalFutureController<Buf>,
53 },
54 Write {
55 req: WriteRequest,
56 res: SignalFutureController<Buf>,
57 },
58 Sync {
59 res: SignalFutureController<()>,
60 },
61}
62
63impl fmt::Debug for Request {
64 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
65 match self {
66 Self::Read { req, .. } => write!(f, "read {} len {}", req.offset, req.len),
67 Self::Write { req, .. } => write!(f, "write {} len {}", req.offset, req.data.len()),
68 Self::Sync { .. } => write!(f, "sync"),
69 }
70 }
71}
72
73#[derive(Clone)]
76pub(crate) struct UringBackingStore {
77 pages: Pages,
78 sender: crossbeam_channel::Sender<Request>,
80}
81
/// Optional io_uring setup flags applied when `UringBackingStore::new` builds its ring.
///
/// Every option defaults to "off"; each one maps onto the identically named `setup_*` method
/// of the io-uring crate's ring builder.
#[derive(Clone, Debug, Default)]
pub(crate) struct UringCfg {
  /// When `true`, `setup_coop_taskrun()` is called on the builder.
  pub coop_taskrun: bool,
  /// When `true`, `setup_defer_taskrun()` is called on the builder.
  pub defer_taskrun: bool,
  /// When `true`, `setup_iopoll()` is called on the builder.
  pub iopoll: bool,
  /// When `Some(idle)`, `setup_sqpoll(idle)` is called on the builder. The io-uring crate
  /// documents the argument as the polling thread's idle timeout in milliseconds.
  pub sqpoll: Option<u32>,
}
91
92impl UringBackingStore {
93 pub fn new(file: File, pages: Pages, cfg: UringCfg) -> Self {
95 let (sender, receiver) = crossbeam_channel::unbounded::<Request>();
96 let pending: Arc<DashMap<u64, (Request, Instant)>> = Default::default();
97 let ring = {
98 let mut builder = IoUring::<SEntry, CEntry>::builder();
99 builder.setup_clamp();
100 if cfg.coop_taskrun {
101 builder.setup_coop_taskrun();
102 };
103 if cfg.defer_taskrun {
104 builder.setup_defer_taskrun();
105 };
106 if cfg.iopoll {
107 builder.setup_iopoll();
108 }
109 if let Some(sqpoll) = cfg.sqpoll {
110 builder.setup_sqpoll(sqpoll);
111 };
112 builder.build(134217728).unwrap()
113 };
114 ring
115 .submitter()
116 .register_files(&[file.as_raw_fd()])
117 .unwrap();
118 let ring = Arc::new(ring);
119 thread::spawn({
121 let pending = pending.clone();
122 let ring = ring.clone();
123 let mut msgbuf = VecDeque::new();
125 move || {
126 let mut submission = unsafe { ring.submission_shared() };
127 let mut next_id = 0;
128 while let Ok(init_msg) = receiver.recv() {
130 msgbuf.push_back(init_msg);
132 while let Ok(msg) = receiver.try_recv() {
133 msgbuf.push_back(msg);
134 }
135 while let Some(msg) = msgbuf.pop_front() {
140 let id = next_id;
141 next_id += 1;
142 trace!(id, typ = msg.to_string(), "submitting request");
143 let submission_entry = match &msg {
144 Request::Read { req, .. } => {
145 let ptr = req.out_buf.as_ptr() as *mut u8;
147 opcode::Read::new(types::Fixed(0), ptr, req.len)
148 .offset(req.offset)
149 .build()
150 .user_data(id)
151 }
152 Request::Write { req, .. } => {
153 let ptr = req.data.as_ptr() as *mut u8;
155 let len = u32!(req.data.len());
157 opcode::Write::new(types::Fixed(0), ptr, len)
158 .offset(req.offset)
159 .build()
160 .user_data(id)
161 }
162 Request::Sync { .. } => opcode::Fsync::new(types::Fixed(0)).build().user_data(id),
163 };
164 pending.insert(id, (msg, Instant::now()));
166 if submission.is_full() {
167 submission.sync();
168 ring.submit_and_wait(1).unwrap();
169 }
170 unsafe {
171 submission.push(&submission_entry).unwrap();
173 };
174 }
175 submission.sync();
176 ring.submit().unwrap();
178 }
179 }
180 });
181
182 thread::spawn({
184 let pending = pending.clone();
185 let ring = ring.clone();
186 move || {
187 let mut completion = unsafe { ring.completion_shared() };
188 loop {
190 let Some(e) = completion.next() else {
191 ring.submit_and_wait(1).unwrap();
192 completion.sync();
193 continue;
194 };
195 let id = e.user_data();
196 let (req, started) = pending.remove(&id).unwrap().1;
197 trace!(
198 id,
199 typ = req.to_string(),
200 exec_us = started.elapsed().as_micros(),
201 "completing request"
202 );
203 let rv = assert_result_is_ok(&req, e.result());
204 match req {
205 Request::Read { req, res } => {
206 assert_eq!(usz!(rv), req.out_buf.len());
208 res.signal(req.out_buf);
209 }
210 Request::Write { req, res } => {
211 assert_eq!(rv, u32!(req.data.len()));
213 res.signal(req.data);
214 }
215 Request::Sync { res } => {
216 res.signal(());
217 }
218 }
219 }
220 }
221 });
222
223 Self { pages, sender }
224 }
225}
226
227#[async_trait]
242impl BackingStore for UringBackingStore {
243 async fn read_at(&self, offset: u64, len: u64) -> Buf {
245 let out_buf = self.pages.allocate_uninitialised(len);
246 let (fut, fut_ctl) = SignalFuture::new();
247 self
248 .sender
249 .send(Request::Read {
250 req: ReadRequest {
251 out_buf,
252 offset,
253 len: u32!(len),
254 },
255 res: fut_ctl,
256 })
257 .unwrap();
258 fut.await
259 }
260
261 async fn write_at(&self, offset: u64, data: Buf) -> Buf {
264 let (fut, fut_ctl) = SignalFuture::new();
265 self
266 .sender
267 .send(Request::Write {
268 req: WriteRequest { offset, data },
269 res: fut_ctl,
270 })
271 .unwrap();
272 fut.await
273 }
274
275 async fn sync(&self) {
277 let (fut, fut_ctl) = SignalFuture::new();
278 self.sender.send(Request::Sync { res: fut_ctl }).unwrap();
279 fut.await
280 }
281}