foyer_storage/io/engine/
uring.rs

1// Copyright 2026 foyer Project Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    fmt::Debug,
17    sync::{mpsc, Arc},
18};
19
20use core_affinity::CoreId;
21#[cfg(feature = "tracing")]
22use fastrace::prelude::*;
23use foyer_common::error::{Error, ErrorKind, Result};
24use futures_core::future::BoxFuture;
25use futures_util::FutureExt;
26use io_uring::{opcode, types::Fd, IoUring};
27use mea::oneshot;
28
29use crate::{
30    io::{
31        bytes::{IoB, IoBuf, IoBufMut},
32        device::Partition,
33        engine::{IoEngine, IoEngineBuildContext, IoEngineConfig, IoHandle},
34    },
35    RawFile,
36};
37
/// Config for io_uring based I/O engine.
///
/// Create it with [`UringIoEngineConfig::new`] (or `Default`) and adjust it with the `with_*` builder methods.
#[derive(Debug)]
pub struct UringIoEngineConfig {
    /// Number of engine threads to spawn; each thread gets its own io_uring instance.
    threads: usize,
    /// CPU ids to pin the engine threads to, indexed by thread index. Empty means no pinning.
    cpus: Vec<u32>,
    /// Maximum number of in-flight I/Os per thread; also passed as the entry count when building the ring.
    io_depth: usize,
    /// Whether the ring is set up with SQ polling.
    sqpoll: bool,
    /// CPU ids for the SQ poll setup, indexed by engine thread index. Empty means unset.
    sqpoll_cpus: Vec<u32>,
    /// Idle value passed to the SQ polling setup.
    sqpoll_idle: u32,
    /// Whether the ring is set up with I/O polling.
    iopoll: bool,
    /// Read/write scheduling weight handed to every engine thread.
    weight: f64,

    /// Simulated additional write latency range.
    ///
    /// NOTE(review): stored by the builder but not read anywhere in the visible code — confirm it is consumed.
    #[cfg(any(test, feature = "test_utils"))]
    write_io_latency: Option<std::ops::Range<std::time::Duration>>,
    /// Simulated additional read latency range.
    ///
    /// NOTE(review): stored by the builder but not read anywhere in the visible code — confirm it is consumed.
    #[cfg(any(test, feature = "test_utils"))]
    read_io_latency: Option<std::ops::Range<std::time::Duration>>,
}
55
56impl Default for UringIoEngineConfig {
57    fn default() -> Self {
58        Self::new()
59    }
60}
61
62impl UringIoEngineConfig {
63    /// Create a new io_uring based I/O engine config with default configurations.
64    pub fn new() -> Self {
65        Self {
66            threads: 1,
67            cpus: vec![],
68            io_depth: 64,
69            sqpoll: false,
70            sqpoll_cpus: vec![],
71            sqpoll_idle: 10,
72            iopoll: false,
73            weight: 1.0,
74            #[cfg(any(test, feature = "test_utils"))]
75            write_io_latency: None,
76            #[cfg(any(test, feature = "test_utils"))]
77            read_io_latency: None,
78        }
79    }
80
81    /// Set the number of threads to use for the I/O engine.
82    pub fn with_threads(mut self, threads: usize) -> Self {
83        self.threads = threads;
84        self
85    }
86
87    /// Bind the engine threads to specific CPUs.
88    ///
89    /// The length of `cpus` must be equal to the threads.
90    pub fn with_cpus(mut self, cpus: Vec<u32>) -> Self {
91        self.cpus = cpus;
92        self
93    }
94
95    /// Set the I/O depth for each thread.
96    pub fn with_io_depth(mut self, io_depth: usize) -> Self {
97        self.io_depth = io_depth;
98        self
99    }
100
101    /// Enable or disable I/O polling.
102    ///
103    /// FYI:
104    ///
105    /// - [io_uring_setup(2)](https://man7.org/linux/man-pages/man2/io_uring_setup.2.html)
106    /// - [crate - io-uring](https://docs.rs/io-uring/latest/io_uring/struct.Builder.html#method.setup_iopoll)
107    ///
108    /// Related syscall flag: `IORING_SETUP_IOPOLL`.
109    ///
110    /// NOTE:
111    ///
112    /// - If this feature is enabled, the underlying device MUST be opened with the `O_DIRECT` flag.
113    /// - If this feature is enabled, the underlying device MUST support io polling.
114    ///
115    /// Default: `false`.
116    pub fn with_iopoll(mut self, iopoll: bool) -> Self {
117        self.iopoll = iopoll;
118        self
119    }
120
121    /// Set the weight of read/write priorities.
122    ///
123    /// The engine will try to keep the read/write iodepth ratio as close to the specified weight as possible.
124    pub fn with_weight(mut self, weight: f64) -> Self {
125        self.weight = weight;
126        self
127    }
128
129    /// Enable or disable SQ polling.
130    ///
131    /// FYI:
132    ///
133    /// - [io_uring_setup(2)](https://man7.org/linux/man-pages/man2/io_uring_setup.2.html)
134    /// - [crate - io-uring](https://docs.rs/io-uring/latest/io_uring/struct.Builder.html#method.setup_sqpoll)
135    ///
136    /// Related syscall flag: `IORING_SETUP_IOPOLL`.
137    ///
138    /// NOTE: If this feature is enabled, the underlying device must be opened with the `O_DIRECT` flag.
139    ///
140    /// Default: `false`.
141    pub fn with_sqpoll(mut self, sqpoll: bool) -> Self {
142        self.sqpoll = sqpoll;
143        self
144    }
145
146    /// Bind the kernel’s SQ poll thread to the specified cpu.
147    ///
148    /// This flag is only meaningful when [`Self::with_sqpoll`] is enabled.
149    ///
150    /// The length of `cpus` must be equal to the number of threads.
151    pub fn with_sqpoll_cpus(mut self, cpus: Vec<u32>) -> Self {
152        self.sqpoll_cpus = cpus;
153        self
154    }
155
156    /// After idle milliseconds, the kernel thread will go to sleep and you will have to wake it up again with a system
157    /// call.
158    ///
159    /// This flag is only meaningful when [`Self::with_sqpoll`] is enabled.
160    pub fn with_sqpoll_idle(mut self, idle: u32) -> Self {
161        self.sqpoll_idle = idle;
162        self
163    }
164
165    /// Set the simulated additional write I/O latency for testing purposes.
166    #[cfg(any(test, feature = "test_utils"))]
167    pub fn with_write_io_latency(mut self, latency: std::ops::Range<std::time::Duration>) -> Self {
168        self.write_io_latency = Some(latency);
169        self
170    }
171
172    /// Set the simulated additional read I/O latency for testing purposes.
173    #[cfg(any(test, feature = "test_utils"))]
174    pub fn with_read_io_latency(mut self, latency: std::ops::Range<std::time::Duration>) -> Self {
175        self.read_io_latency = Some(latency);
176        self
177    }
178}
179
180impl IoEngineConfig for UringIoEngineConfig {
181    fn build(self: Box<Self>, _: IoEngineBuildContext) -> BoxFuture<'static, Result<Arc<dyn IoEngine>>> {
182        async move {
183            if self.threads == 0 {
184                return Err(Error::new(ErrorKind::Config, "shards must be greater than 0")
185                    .with_context("threads", self.threads));
186            }
187
188            let (read_txs, read_rxs): (Vec<mpsc::SyncSender<_>>, Vec<mpsc::Receiver<_>>) = (0..self.threads)
189                .map(|_| {
190                    let (tx, rx) = mpsc::sync_channel(4096);
191                    (tx, rx)
192                })
193                .unzip();
194
195            let (write_txs, write_rxs): (Vec<mpsc::SyncSender<_>>, Vec<mpsc::Receiver<_>>) = (0..self.threads)
196                .map(|_| {
197                    let (tx, rx) = mpsc::sync_channel(4096);
198                    (tx, rx)
199                })
200                .unzip();
201
202            for (i, (read_rx, write_rx)) in read_rxs.into_iter().zip(write_rxs.into_iter()).enumerate() {
203                let mut builder = IoUring::builder();
204                if self.iopoll {
205                    builder.setup_iopoll();
206                }
207                if self.sqpoll {
208                    builder.setup_sqpoll(self.sqpoll_idle);
209                    if !self.sqpoll_cpus.is_empty() {
210                        let cpu = self.sqpoll_cpus[i];
211                        builder.setup_sqpoll_cpu(cpu);
212                    }
213                }
214                let cpu = if self.cpus.is_empty() { None } else { Some(self.cpus[i]) };
215                let uring = builder.build(self.io_depth as _).map_err(Error::io_error)?;
216                let shard = UringIoEngineShard {
217                    read_rx,
218                    write_rx,
219                    uring,
220                    io_depth: self.io_depth,
221                    weight: self.weight,
222                    read_inflight: 0,
223                    write_inflight: 0,
224                };
225
226                std::thread::Builder::new()
227                    .name(format!("foyer-uring-{i}"))
228                    .spawn(move || {
229                        if let Some(cpu) = cpu {
230                            core_affinity::set_for_current(CoreId { id: cpu as _ });
231                        }
232                        shard.run();
233                    })
234                    .map_err(Error::io_error)?;
235            }
236
237            let engine = UringIoEngine { read_txs, write_txs };
238            let engine = Arc::new(engine);
239            Ok(engine as Arc<dyn IoEngine>)
240        }
241        .boxed()
242    }
243}
244
/// Direction of a queued I/O request; selects the io_uring opcode and the in-flight counter to update.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum UringIoType {
    /// Read from the file into the buffer.
    Read,
    /// Write the buffer to the file.
    Write,
}
250
/// Raw pointer/length pair describing the memory an I/O request reads into or writes from.
///
/// It is filled from `as_raw_parts()` of the caller's buffer and does not own that memory.
struct RawBuf {
    /// Start address of the buffer.
    ptr: *mut u8,
    /// Length handed to the io_uring read/write opcode.
    len: usize,
}

// Raw pointers are neither `Send` nor `Sync` by default; these impls let a `RawBuf` travel inside `UringIoCtx`
// through the request channel to the shard thread.
//
// SAFETY: relies on the buffer outliving the I/O. In this file the buffer stays owned by the future returned from
// `UringIoEngine::read` / `UringIoEngine::write`.
// NOTE(review): if that future is dropped before the completion arrives, the buffer may be freed while the kernel
// still uses it — confirm callers never drop it early.
unsafe impl Send for RawBuf {}
unsafe impl Sync for RawBuf {}
258
/// Target location of an I/O request, as returned by `Partition::translate`.
struct RawFileAddress {
    /// Raw file handle; its inner value is used as the file descriptor of the submission entry.
    file: RawFile,
    /// Offset passed to the io_uring read/write opcode.
    offset: u64,
}
263
/// A single I/O request travelling from the engine to a shard thread.
///
/// The shard boxes it, stores the box pointer in the submission entry's `user_data`, and reclaims it when the
/// matching completion is reaped.
struct UringIoCtx {
    /// Completion notifier: receives `Ok(())` on a non-negative result, or the OS error otherwise.
    tx: oneshot::Sender<Result<()>>,
    /// Whether this request is a read or a write.
    io_type: UringIoType,
    /// Memory the request operates on.
    rbuf: RawBuf,
    /// File and offset the request targets.
    addr: RawFileAddress,
    /// Tracing span covering the I/O; dropped right after the completion has been reported.
    #[cfg(feature = "tracing")]
    span: fastrace::Span,
}
272
/// State owned by one engine thread: its request queues, its io_uring instance and in-flight accounting.
struct UringIoEngineShard {
    /// Queue of read requests for this shard.
    read_rx: mpsc::Receiver<UringIoCtx>,
    /// Queue of write requests for this shard.
    write_rx: mpsc::Receiver<UringIoCtx>,
    /// Scheduling weight: reads are preferred while `read_inflight < write_inflight * weight`.
    weight: f64,
    /// The io_uring instance driven by this shard.
    uring: IoUring,
    /// Upper bound on `read_inflight + write_inflight`.
    io_depth: usize,
    /// Number of submitted reads whose completion has not been reaped yet.
    read_inflight: usize,
    /// Number of submitted writes whose completion has not been reaped yet.
    write_inflight: usize,
}
282
283impl UringIoEngineShard {
284    fn run(mut self) {
285        loop {
286            'prepare: loop {
287                if self.read_inflight + self.write_inflight >= self.io_depth {
288                    break 'prepare;
289                }
290
291                let ctx = if (self.read_inflight as f64) < self.write_inflight as f64 * self.weight {
292                    match self.read_rx.try_recv() {
293                        Err(mpsc::TryRecvError::Disconnected) => return,
294                        Ok(ctx) => Some(ctx),
295                        Err(mpsc::TryRecvError::Empty) => match self.write_rx.try_recv() {
296                            Err(mpsc::TryRecvError::Disconnected) => return,
297                            Ok(ctx) => Some(ctx),
298                            Err(mpsc::TryRecvError::Empty) => None,
299                        },
300                    }
301                } else {
302                    match self.write_rx.try_recv() {
303                        Err(mpsc::TryRecvError::Disconnected) => return,
304                        Ok(ctx) => Some(ctx),
305                        Err(mpsc::TryRecvError::Empty) => match self.read_rx.try_recv() {
306                            Err(mpsc::TryRecvError::Disconnected) => return,
307                            Ok(ctx) => Some(ctx),
308                            Err(mpsc::TryRecvError::Empty) => None,
309                        },
310                    }
311                };
312
313                let ctx = match ctx {
314                    Some(ctx) => ctx,
315                    None => break 'prepare,
316                };
317
318                let ctx = Box::new(ctx);
319
320                let fd = Fd(ctx.addr.file.0);
321                let sqe = match ctx.io_type {
322                    UringIoType::Read => {
323                        self.read_inflight += 1;
324                        opcode::Read::new(fd, ctx.rbuf.ptr, ctx.rbuf.len as _)
325                            .offset(ctx.addr.offset)
326                            .build()
327                    }
328                    UringIoType::Write => {
329                        self.write_inflight += 1;
330                        opcode::Write::new(fd, ctx.rbuf.ptr, ctx.rbuf.len as _)
331                            .offset(ctx.addr.offset)
332                            .build()
333                    }
334                };
335                let data = Box::into_raw(ctx) as u64;
336                let sqe = sqe.user_data(data);
337                unsafe { self.uring.submission().push(&sqe).unwrap() }
338            }
339
340            if self.read_inflight + self.write_inflight > 0 {
341                self.uring.submit().unwrap();
342            }
343
344            for cqe in self.uring.completion() {
345                let data = cqe.user_data();
346                let ctx = unsafe { Box::from_raw(data as *mut UringIoCtx) };
347
348                match ctx.io_type {
349                    UringIoType::Read => self.read_inflight -= 1,
350                    UringIoType::Write => self.write_inflight -= 1,
351                }
352
353                let res = cqe.result();
354                if res < 0 {
355                    let err = Error::raw_os_io_error(res);
356                    let _ = ctx.tx.send(Err(err));
357                } else {
358                    let _ = ctx.tx.send(Ok(()));
359                }
360
361                #[cfg(feature = "tracing")]
362                drop(ctx.span);
363            }
364        }
365    }
366}
367
/// The io_uring based I/O engine.
///
/// Holds one read queue and one write queue sender per engine thread; requests are routed to a thread by
/// partition id modulo the number of threads.
pub struct UringIoEngine {
    /// Senders of the per-thread read queues.
    read_txs: Vec<mpsc::SyncSender<UringIoCtx>>,
    /// Senders of the per-thread write queues.
    write_txs: Vec<mpsc::SyncSender<UringIoCtx>>,
}
373
374impl Debug for UringIoEngine {
375    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
376        f.debug_struct("UringIoEngine").finish()
377    }
378}
379
380impl UringIoEngine {
381    #[cfg_attr(
382        feature = "tracing",
383        fastrace::trace(name = "foyer::storage::io::engine::uring::read")
384    )]
385    fn read(&self, buf: Box<dyn IoBufMut>, partition: &dyn Partition, offset: u64) -> IoHandle {
386        let (tx, rx) = oneshot::channel();
387        let shard = &self.read_txs[partition.id() as usize % self.read_txs.len()];
388        let (ptr, len) = buf.as_raw_parts();
389        let rbuf = RawBuf { ptr, len };
390        let (file, offset) = partition.translate(offset);
391        let addr = RawFileAddress { file, offset };
392        #[cfg(feature = "tracing")]
393        let span = Span::enter_with_local_parent("foyer::storage::io::engine::uring::read::io");
394        let _ = shard.send(UringIoCtx {
395            tx,
396            io_type: UringIoType::Read,
397            rbuf,
398            addr,
399            #[cfg(feature = "tracing")]
400            span,
401        });
402        async move {
403            let res = match rx.await {
404                Ok(res) => res,
405                Err(e) => Err(Error::new(ErrorKind::ChannelClosed, "io completion channel closed").with_source(e)),
406            };
407            let buf: Box<dyn IoB> = buf.into_iob();
408            (buf, res)
409        }
410        .boxed()
411        .into()
412    }
413
414    #[cfg_attr(
415        feature = "tracing",
416        fastrace::trace(name = "foyer::storage::io::engine::uring::write")
417    )]
418    fn write(&self, buf: Box<dyn IoBuf>, partition: &dyn Partition, offset: u64) -> IoHandle {
419        let (tx, rx) = oneshot::channel();
420        let shard = &self.write_txs[partition.id() as usize % self.write_txs.len()];
421        let (ptr, len) = buf.as_raw_parts();
422        let rbuf = RawBuf { ptr, len };
423        let (file, offset) = partition.translate(offset);
424        let addr = RawFileAddress { file, offset };
425        #[cfg(feature = "tracing")]
426        let span = Span::enter_with_local_parent("foyer::storage::io::engine::uring::write::io");
427        let _ = shard.send(UringIoCtx {
428            tx,
429            io_type: UringIoType::Write,
430            rbuf,
431            addr,
432            #[cfg(feature = "tracing")]
433            span,
434        });
435        async move {
436            let res = match rx.await {
437                Ok(res) => res,
438                Err(e) => Err(Error::new(ErrorKind::ChannelClosed, "io completion channel closed").with_source(e)),
439            };
440            let buf: Box<dyn IoB> = buf.into_iob();
441            (buf, res)
442        }
443        .boxed()
444        .into()
445    }
446}
447
448impl IoEngine for UringIoEngine {
449    fn read(&self, buf: Box<dyn IoBufMut>, partition: &dyn Partition, offset: u64) -> IoHandle {
450        self.read(buf, partition, offset)
451    }
452
453    fn write(&self, buf: Box<dyn IoBuf>, partition: &dyn Partition, offset: u64) -> IoHandle {
454        self.write(buf, partition, offset)
455    }
456}