1use std::{
16 fmt::Debug,
17 sync::{mpsc, Arc},
18};
19
20use core_affinity::CoreId;
21#[cfg(feature = "tracing")]
22use fastrace::prelude::*;
23use foyer_common::error::{Error, ErrorKind, Result};
24use futures_core::future::BoxFuture;
25use futures_util::FutureExt;
26use io_uring::{opcode, types::Fd, IoUring};
27use mea::oneshot;
28
29use crate::{
30 io::{
31 bytes::{IoB, IoBuf, IoBufMut},
32 device::Partition,
33 engine::{IoEngine, IoEngineBuildContext, IoEngineConfig, IoHandle},
34 },
35 RawFile,
36};
37
/// Configuration for the io_uring-based I/O engine.
///
/// Create it with [`UringIoEngineConfig::new`] (or `Default`), adjust it with the
/// `with_*` setters, and turn it into an engine with [`IoEngineConfig::build`].
#[derive(Debug)]
pub struct UringIoEngineConfig {
    /// Number of shard threads to spawn; each owns one io_uring instance.
    /// `build` rejects 0.
    threads: usize,
    /// CPU ids used to pin the shard threads. When non-empty, thread `i` is
    /// pinned to `cpus[i]`; when empty, no pinning is done.
    cpus: Vec<u32>,
    /// Entry count requested for each io_uring instance. It is also the
    /// per-shard cap on in-flight operations.
    io_depth: usize,
    /// Whether each ring is set up with kernel submission-queue polling
    /// (`setup_sqpoll`).
    sqpoll: bool,
    /// When `sqpoll` is set and this list is non-empty, shard `i`'s SQ polling
    /// thread is pinned to `sqpoll_cpus[i]`.
    sqpoll_cpus: Vec<u32>,
    /// Idle value passed to `setup_sqpoll`. It is presumably in milliseconds,
    /// per the io_uring documentation (TODO confirm).
    sqpoll_idle: u32,
    /// Whether each ring is set up with `setup_iopoll`.
    iopoll: bool,
    /// Read/write balancing factor. A shard prefers pulling a read request
    /// while `read_inflight < write_inflight * weight`, and a write request
    /// otherwise.
    weight: f64,

    /// Test-only write latency range set by `with_write_io_latency`.
    /// NOTE(review): not read anywhere in this file.
    #[cfg(any(test, feature = "test_utils"))]
    write_io_latency: Option<std::ops::Range<std::time::Duration>>,
    /// Test-only read latency range set by `with_read_io_latency`.
    /// NOTE(review): not read anywhere in this file.
    #[cfg(any(test, feature = "test_utils"))]
    read_io_latency: Option<std::ops::Range<std::time::Duration>>,
}
55
56impl Default for UringIoEngineConfig {
57 fn default() -> Self {
58 Self::new()
59 }
60}
61
62impl UringIoEngineConfig {
63 pub fn new() -> Self {
65 Self {
66 threads: 1,
67 cpus: vec![],
68 io_depth: 64,
69 sqpoll: false,
70 sqpoll_cpus: vec![],
71 sqpoll_idle: 10,
72 iopoll: false,
73 weight: 1.0,
74 #[cfg(any(test, feature = "test_utils"))]
75 write_io_latency: None,
76 #[cfg(any(test, feature = "test_utils"))]
77 read_io_latency: None,
78 }
79 }
80
81 pub fn with_threads(mut self, threads: usize) -> Self {
83 self.threads = threads;
84 self
85 }
86
87 pub fn with_cpus(mut self, cpus: Vec<u32>) -> Self {
91 self.cpus = cpus;
92 self
93 }
94
95 pub fn with_io_depth(mut self, io_depth: usize) -> Self {
97 self.io_depth = io_depth;
98 self
99 }
100
101 pub fn with_iopoll(mut self, iopoll: bool) -> Self {
117 self.iopoll = iopoll;
118 self
119 }
120
121 pub fn with_weight(mut self, weight: f64) -> Self {
125 self.weight = weight;
126 self
127 }
128
129 pub fn with_sqpoll(mut self, sqpoll: bool) -> Self {
142 self.sqpoll = sqpoll;
143 self
144 }
145
146 pub fn with_sqpoll_cpus(mut self, cpus: Vec<u32>) -> Self {
152 self.sqpoll_cpus = cpus;
153 self
154 }
155
156 pub fn with_sqpoll_idle(mut self, idle: u32) -> Self {
161 self.sqpoll_idle = idle;
162 self
163 }
164
165 #[cfg(any(test, feature = "test_utils"))]
167 pub fn with_write_io_latency(mut self, latency: std::ops::Range<std::time::Duration>) -> Self {
168 self.write_io_latency = Some(latency);
169 self
170 }
171
172 #[cfg(any(test, feature = "test_utils"))]
174 pub fn with_read_io_latency(mut self, latency: std::ops::Range<std::time::Duration>) -> Self {
175 self.read_io_latency = Some(latency);
176 self
177 }
178}
179
180impl IoEngineConfig for UringIoEngineConfig {
181 fn build(self: Box<Self>, _: IoEngineBuildContext) -> BoxFuture<'static, Result<Arc<dyn IoEngine>>> {
182 async move {
183 if self.threads == 0 {
184 return Err(Error::new(ErrorKind::Config, "shards must be greater than 0")
185 .with_context("threads", self.threads));
186 }
187
188 let (read_txs, read_rxs): (Vec<mpsc::SyncSender<_>>, Vec<mpsc::Receiver<_>>) = (0..self.threads)
189 .map(|_| {
190 let (tx, rx) = mpsc::sync_channel(4096);
191 (tx, rx)
192 })
193 .unzip();
194
195 let (write_txs, write_rxs): (Vec<mpsc::SyncSender<_>>, Vec<mpsc::Receiver<_>>) = (0..self.threads)
196 .map(|_| {
197 let (tx, rx) = mpsc::sync_channel(4096);
198 (tx, rx)
199 })
200 .unzip();
201
202 for (i, (read_rx, write_rx)) in read_rxs.into_iter().zip(write_rxs.into_iter()).enumerate() {
203 let mut builder = IoUring::builder();
204 if self.iopoll {
205 builder.setup_iopoll();
206 }
207 if self.sqpoll {
208 builder.setup_sqpoll(self.sqpoll_idle);
209 if !self.sqpoll_cpus.is_empty() {
210 let cpu = self.sqpoll_cpus[i];
211 builder.setup_sqpoll_cpu(cpu);
212 }
213 }
214 let cpu = if self.cpus.is_empty() { None } else { Some(self.cpus[i]) };
215 let uring = builder.build(self.io_depth as _).map_err(Error::io_error)?;
216 let shard = UringIoEngineShard {
217 read_rx,
218 write_rx,
219 uring,
220 io_depth: self.io_depth,
221 weight: self.weight,
222 read_inflight: 0,
223 write_inflight: 0,
224 };
225
226 std::thread::Builder::new()
227 .name(format!("foyer-uring-{i}"))
228 .spawn(move || {
229 if let Some(cpu) = cpu {
230 core_affinity::set_for_current(CoreId { id: cpu as _ });
231 }
232 shard.run();
233 })
234 .map_err(Error::io_error)?;
235 }
236
237 let engine = UringIoEngine { read_txs, write_txs };
238 let engine = Arc::new(engine);
239 Ok(engine as Arc<dyn IoEngine>)
240 }
241 .boxed()
242 }
243}
244
/// Kind of operation carried by a [`UringIoCtx`].
///
/// It selects the SQE opcode (`opcode::Read` / `opcode::Write`) and which
/// in-flight counter of the shard is incremented and decremented.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum UringIoType {
    Read,
    Write,
}
250
/// Raw pointer/length pair describing the I/O buffer handed to the kernel.
///
/// The memory is owned elsewhere: `UringIoEngine::read`/`write` obtain the pair
/// from `as_raw_parts` and keep the owning buffer inside the returned future
/// until the completion is received.
struct RawBuf {
    ptr: *mut u8,
    len: usize,
}

// SAFETY: `RawBuf` only carries an address and length to the shard thread,
// which passes them to the kernel. It never dereferences `ptr` itself. Soundness
// relies on the owning buffer outliving the I/O.
// NOTE(review): if the future returned by `read`/`write` is dropped before the
// completion arrives, the buffer is freed while the kernel may still access it.
unsafe impl Send for RawBuf {}
// SAFETY: see `Send` above; no method reads through `ptr` via a shared reference.
// NOTE(review): the visible code only moves `RawBuf` between threads, so `Sync`
// may be unnecessary. Confirm before removing.
unsafe impl Sync for RawBuf {}
258
/// Physical target of an I/O operation: the file returned by
/// `Partition::translate` and the byte offset within it.
struct RawFileAddress {
    /// File handle; its `.0` is used as the io_uring `Fd`.
    file: RawFile,
    /// Byte offset within `file`, set as the SQE offset.
    offset: u64,
}
263
/// One I/O request travelling from `UringIoEngine` to a shard thread.
///
/// The shard boxes it and stores the raw pointer in the SQE's `user_data`. When
/// the matching CQE is reaped, it rebuilds the box and sends the result through
/// `tx`.
struct UringIoCtx {
    /// Completion channel awaited by the `IoHandle` future.
    tx: oneshot::Sender<Result<()>>,
    /// Read or write; selects the opcode and the in-flight counter.
    io_type: UringIoType,
    /// Buffer the kernel reads from (write) or writes into (read).
    rbuf: RawBuf,
    /// Target file and offset.
    addr: RawFileAddress,
    /// Span covering the time from submission until the completion is reaped.
    #[cfg(feature = "tracing")]
    span: fastrace::Span,
}
272
/// State owned by one shard thread: its request channels, its io_uring
/// instance, and the bookkeeping needed to cap and balance in-flight I/O.
struct UringIoEngineShard {
    /// Incoming read requests.
    read_rx: mpsc::Receiver<UringIoCtx>,
    /// Incoming write requests.
    write_rx: mpsc::Receiver<UringIoCtx>,
    /// Reads are preferred while `read_inflight < write_inflight * weight`.
    weight: f64,
    /// The ring this shard submits to and reaps from.
    uring: IoUring,
    /// Maximum number of operations (reads + writes) in flight at once.
    io_depth: usize,
    /// Reads pushed to the ring whose CQE has not been reaped yet.
    read_inflight: usize,
    /// Writes pushed to the ring whose CQE has not been reaped yet.
    write_inflight: usize,
}
282
283impl UringIoEngineShard {
284 fn run(mut self) {
285 loop {
286 'prepare: loop {
287 if self.read_inflight + self.write_inflight >= self.io_depth {
288 break 'prepare;
289 }
290
291 let ctx = if (self.read_inflight as f64) < self.write_inflight as f64 * self.weight {
292 match self.read_rx.try_recv() {
293 Err(mpsc::TryRecvError::Disconnected) => return,
294 Ok(ctx) => Some(ctx),
295 Err(mpsc::TryRecvError::Empty) => match self.write_rx.try_recv() {
296 Err(mpsc::TryRecvError::Disconnected) => return,
297 Ok(ctx) => Some(ctx),
298 Err(mpsc::TryRecvError::Empty) => None,
299 },
300 }
301 } else {
302 match self.write_rx.try_recv() {
303 Err(mpsc::TryRecvError::Disconnected) => return,
304 Ok(ctx) => Some(ctx),
305 Err(mpsc::TryRecvError::Empty) => match self.read_rx.try_recv() {
306 Err(mpsc::TryRecvError::Disconnected) => return,
307 Ok(ctx) => Some(ctx),
308 Err(mpsc::TryRecvError::Empty) => None,
309 },
310 }
311 };
312
313 let ctx = match ctx {
314 Some(ctx) => ctx,
315 None => break 'prepare,
316 };
317
318 let ctx = Box::new(ctx);
319
320 let fd = Fd(ctx.addr.file.0);
321 let sqe = match ctx.io_type {
322 UringIoType::Read => {
323 self.read_inflight += 1;
324 opcode::Read::new(fd, ctx.rbuf.ptr, ctx.rbuf.len as _)
325 .offset(ctx.addr.offset)
326 .build()
327 }
328 UringIoType::Write => {
329 self.write_inflight += 1;
330 opcode::Write::new(fd, ctx.rbuf.ptr, ctx.rbuf.len as _)
331 .offset(ctx.addr.offset)
332 .build()
333 }
334 };
335 let data = Box::into_raw(ctx) as u64;
336 let sqe = sqe.user_data(data);
337 unsafe { self.uring.submission().push(&sqe).unwrap() }
338 }
339
340 if self.read_inflight + self.write_inflight > 0 {
341 self.uring.submit().unwrap();
342 }
343
344 for cqe in self.uring.completion() {
345 let data = cqe.user_data();
346 let ctx = unsafe { Box::from_raw(data as *mut UringIoCtx) };
347
348 match ctx.io_type {
349 UringIoType::Read => self.read_inflight -= 1,
350 UringIoType::Write => self.write_inflight -= 1,
351 }
352
353 let res = cqe.result();
354 if res < 0 {
355 let err = Error::raw_os_io_error(res);
356 let _ = ctx.tx.send(Err(err));
357 } else {
358 let _ = ctx.tx.send(Ok(()));
359 }
360
361 #[cfg(feature = "tracing")]
362 drop(ctx.span);
363 }
364 }
365 }
366}
367
/// io_uring-backed [`IoEngine`] that fans requests out to shard threads.
///
/// Each shard thread owns one ring. A request is routed to the shard at
/// `partition.id() % shard_count`, through the read or write channel of that
/// shard.
pub struct UringIoEngine {
    /// Per-shard senders for read requests (bounded `sync_channel`s).
    read_txs: Vec<mpsc::SyncSender<UringIoCtx>>,
    /// Per-shard senders for write requests (bounded `sync_channel`s).
    write_txs: Vec<mpsc::SyncSender<UringIoCtx>>,
}
373
374impl Debug for UringIoEngine {
375 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
376 f.debug_struct("UringIoEngine").finish()
377 }
378}
379
380impl UringIoEngine {
381 #[cfg_attr(
382 feature = "tracing",
383 fastrace::trace(name = "foyer::storage::io::engine::uring::read")
384 )]
385 fn read(&self, buf: Box<dyn IoBufMut>, partition: &dyn Partition, offset: u64) -> IoHandle {
386 let (tx, rx) = oneshot::channel();
387 let shard = &self.read_txs[partition.id() as usize % self.read_txs.len()];
388 let (ptr, len) = buf.as_raw_parts();
389 let rbuf = RawBuf { ptr, len };
390 let (file, offset) = partition.translate(offset);
391 let addr = RawFileAddress { file, offset };
392 #[cfg(feature = "tracing")]
393 let span = Span::enter_with_local_parent("foyer::storage::io::engine::uring::read::io");
394 let _ = shard.send(UringIoCtx {
395 tx,
396 io_type: UringIoType::Read,
397 rbuf,
398 addr,
399 #[cfg(feature = "tracing")]
400 span,
401 });
402 async move {
403 let res = match rx.await {
404 Ok(res) => res,
405 Err(e) => Err(Error::new(ErrorKind::ChannelClosed, "io completion channel closed").with_source(e)),
406 };
407 let buf: Box<dyn IoB> = buf.into_iob();
408 (buf, res)
409 }
410 .boxed()
411 .into()
412 }
413
414 #[cfg_attr(
415 feature = "tracing",
416 fastrace::trace(name = "foyer::storage::io::engine::uring::write")
417 )]
418 fn write(&self, buf: Box<dyn IoBuf>, partition: &dyn Partition, offset: u64) -> IoHandle {
419 let (tx, rx) = oneshot::channel();
420 let shard = &self.write_txs[partition.id() as usize % self.write_txs.len()];
421 let (ptr, len) = buf.as_raw_parts();
422 let rbuf = RawBuf { ptr, len };
423 let (file, offset) = partition.translate(offset);
424 let addr = RawFileAddress { file, offset };
425 #[cfg(feature = "tracing")]
426 let span = Span::enter_with_local_parent("foyer::storage::io::engine::uring::write::io");
427 let _ = shard.send(UringIoCtx {
428 tx,
429 io_type: UringIoType::Write,
430 rbuf,
431 addr,
432 #[cfg(feature = "tracing")]
433 span,
434 });
435 async move {
436 let res = match rx.await {
437 Ok(res) => res,
438 Err(e) => Err(Error::new(ErrorKind::ChannelClosed, "io completion channel closed").with_source(e)),
439 };
440 let buf: Box<dyn IoB> = buf.into_iob();
441 (buf, res)
442 }
443 .boxed()
444 .into()
445 }
446}
447
448impl IoEngine for UringIoEngine {
449 fn read(&self, buf: Box<dyn IoBufMut>, partition: &dyn Partition, offset: u64) -> IoHandle {
450 self.read(buf, partition, offset)
451 }
452
453 fn write(&self, buf: Box<dyn IoBuf>, partition: &dyn Partition, offset: u64) -> IoHandle {
454 self.write(buf, partition, offset)
455 }
456}