foyer_storage/io/engine/
mod.rs

1// Copyright 2026 foyer Project Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod monitor;
16pub mod noop;
17pub mod psync;
18
19#[cfg(target_os = "linux")]
20pub mod uring;
21
22use std::{
23    fmt::Debug,
24    future::Future,
25    pin::Pin,
26    sync::Arc,
27    task::{ready, Context, Poll},
28};
29
30#[cfg(feature = "tracing")]
31use fastrace::{future::InSpan, prelude::*};
32use foyer_common::{error::Result, spawn::Spawner};
33use futures_core::future::BoxFuture;
34use pin_project::pin_project;
35
36use crate::io::{
37    bytes::{IoB, IoBuf, IoBufMut},
38    device::Partition,
39};
40
41#[cfg(not(feature = "tracing"))]
42type IoHandleInner = BoxFuture<'static, (Box<dyn IoB>, Result<()>)>;
43#[cfg(feature = "tracing")]
44type IoHandleInner = InSpan<BoxFuture<'static, (Box<dyn IoB>, Result<()>)>>;
45/// A detached I/O handle that can be polled for completion.
46#[pin_project]
47pub struct IoHandle {
48    #[pin]
49    inner: IoHandleInner,
50    callback: Option<Box<dyn FnOnce() + Send + 'static>>,
51}
52
53impl Debug for IoHandle {
54    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55        f.debug_struct("IoHandle").finish()
56    }
57}
58
59#[cfg(not(feature = "tracing"))]
60impl From<BoxFuture<'static, (Box<dyn IoB>, Result<()>)>> for IoHandle {
61    fn from(inner: BoxFuture<'static, (Box<dyn IoB>, Result<()>)>) -> Self {
62        Self { inner, callback: None }
63    }
64}
65
#[cfg(feature = "tracing")]
impl From<BoxFuture<'static, (Box<dyn IoB>, Result<()>)>> for IoHandle {
    /// Wraps a boxed I/O future into a handle that has no completion callback attached.
    ///
    /// The future is instrumented with a span named `foyer::storage::io::io_handle`, entered with
    /// the current local parent.
    fn from(future: BoxFuture<'static, (Box<dyn IoB>, Result<()>)>) -> Self {
        let span = Span::enter_with_local_parent("foyer::storage::io::io_handle");
        IoHandle {
            inner: future.in_span(span),
            callback: None,
        }
    }
}
73
74impl IoHandle {
75    pub(crate) fn with_callback<F>(mut self, callback: F) -> Self
76    where
77        F: FnOnce() + Send + 'static,
78    {
79        assert!(self.callback.is_none(), "io handle callback can only be set once");
80        self.callback = Some(Box::new(callback));
81        self
82    }
83}
84
85impl Future for IoHandle {
86    type Output = (Box<dyn IoB>, Result<()>);
87
88    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
89        let this = self.project();
90        let res = ready!(this.inner.poll(cx));
91        if let Some(callback) = this.callback.take() {
92            callback();
93        }
94        Poll::Ready(res)
95    }
96}
97
98/// Context for building the disk cache io engine.
99pub struct IoEngineBuildContext {
100    /// The runtime for the disk cache engine.
101    pub spawner: Spawner,
102}
103
104/// I/O engine config trait.
105pub trait IoEngineConfig: Send + Sync + 'static + Debug {
106    /// Build an I/O engine from the given configuration.
107    fn build(self: Box<Self>, ctx: IoEngineBuildContext) -> BoxFuture<'static, Result<Arc<dyn IoEngine>>>;
108
109    /// Box the config.
110    fn boxed(self) -> Box<Self>
111    where
112        Self: Sized,
113    {
114        Box::new(self)
115    }
116}
117
118/// I/O engine builder trait.
119pub trait IoEngine: Send + Sync + 'static + Debug {
120    /// Read data into the buffer from the specified partition and offset.
121    fn read(&self, buf: Box<dyn IoBufMut>, partition: &dyn Partition, offset: u64) -> IoHandle;
122    /// Write data from the buffer to the specified block and offset.
123    fn write(&self, buf: Box<dyn IoBuf>, partition: &dyn Partition, offset: u64) -> IoHandle;
124}
125
#[cfg(test)]
mod tests {
    use std::path::Path;

    use rand::{rng, Fill};
    use tempfile::tempdir;

    use super::*;
    #[cfg(not(madsim))]
    #[cfg(target_os = "linux")]
    use crate::io::engine::uring::UringIoEngineConfig;
    use crate::io::{
        bytes::IoSliceMut,
        device::{file::FileDeviceBuilder, Device, DeviceBuilder},
        engine::psync::PsyncIoEngineConfig,
    };

    const KIB: usize = 1024;
    const MIB: usize = 1024 * 1024;

    /// Builds a file device at `path` with a capacity of 16 MiB and creates sixteen partitions on
    /// it, passing `MIB` to each `create_partition` call.
    fn build_test_file_device(path: impl AsRef<Path>) -> Result<Arc<dyn Device>> {
        let device = FileDeviceBuilder::new(&path).with_capacity(16 * MIB).build()?;
        for _ in 0..16 {
            device.create_partition(MIB)?;
        }
        Ok(device)
    }

    /// Round-trip check for an engine: writes 16 KiB of random bytes to partition 0 at offset 0,
    /// reads 16 KiB back from the same location, and asserts both buffers are equal.
    async fn test_read_write(engine: Arc<dyn IoEngine>, device: &dyn Device) {
        let mut b1 = Box::new(IoSliceMut::new(16 * KIB));
        Fill::fill(&mut b1[..], &mut rng());

        let (b1, res) = engine.write(b1, device.partition(0).as_ref(), 0).await;
        res.unwrap();
        let b1 = b1.try_into_io_slice_mut().unwrap();

        let b2 = Box::new(IoSliceMut::new(16 * KIB));
        let (b2, res) = engine.read(b2, device.partition(0).as_ref(), 0).await;
        res.unwrap();
        let b2 = b2.try_into_io_slice_mut().unwrap();
        assert_eq!(b1, b2);
    }

    #[test_log::test(tokio::test)]
    async fn test_io_engine() {
        let dir = tempdir().unwrap();

        // The io_uring engine is only exercised on Linux and outside of madsim builds.
        #[cfg(not(madsim))]
        #[cfg(target_os = "linux")]
        {
            let path = dir.path().join("test_file_1");
            let device = build_test_file_device(&path).unwrap();
            let engine = UringIoEngineConfig::new()
                .with_threads(4)
                .with_io_depth(64)
                .boxed()
                .build(IoEngineBuildContext {
                    spawner: Spawner::current(),
                })
                .await
                .unwrap();
            test_read_write(engine, device.as_ref()).await;
        }

        // Use a separate backing file so the psync run does not depend on whatever the uring run
        // left behind in `test_file_1`.
        let path = dir.path().join("test_file_2");
        let device = build_test_file_device(&path).unwrap();
        let engine = PsyncIoEngineConfig::new()
            .boxed()
            .build(IoEngineBuildContext {
                spawner: Spawner::current(),
            })
            .await
            .unwrap();
        test_read_write(engine, device.as_ref()).await;
    }
}