foyer_storage/io/engine/
mod.rs1pub mod monitor;
16pub mod noop;
17pub mod psync;
18
19#[cfg(target_os = "linux")]
20pub mod uring;
21
22use std::{
23 fmt::Debug,
24 future::Future,
25 pin::Pin,
26 sync::Arc,
27 task::{ready, Context, Poll},
28};
29
30#[cfg(feature = "tracing")]
31use fastrace::{future::InSpan, prelude::*};
32use foyer_common::{error::Result, spawn::Spawner};
33use futures_core::future::BoxFuture;
34use pin_project::pin_project;
35
36use crate::io::{
37 bytes::{IoB, IoBuf, IoBufMut},
38 device::Partition,
39};
40
41#[cfg(not(feature = "tracing"))]
42type IoHandleInner = BoxFuture<'static, (Box<dyn IoB>, Result<()>)>;
43#[cfg(feature = "tracing")]
44type IoHandleInner = InSpan<BoxFuture<'static, (Box<dyn IoB>, Result<()>)>>;
45#[pin_project]
47pub struct IoHandle {
48 #[pin]
49 inner: IoHandleInner,
50 callback: Option<Box<dyn FnOnce() + Send + 'static>>,
51}
52
53impl Debug for IoHandle {
54 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55 f.debug_struct("IoHandle").finish()
56 }
57}
58
59#[cfg(not(feature = "tracing"))]
60impl From<BoxFuture<'static, (Box<dyn IoB>, Result<()>)>> for IoHandle {
61 fn from(inner: BoxFuture<'static, (Box<dyn IoB>, Result<()>)>) -> Self {
62 Self { inner, callback: None }
63 }
64}
65
66#[cfg(feature = "tracing")]
67impl From<BoxFuture<'static, (Box<dyn IoB>, Result<()>)>> for IoHandle {
68 fn from(inner: BoxFuture<'static, (Box<dyn IoB>, Result<()>)>) -> Self {
69 let inner = inner.in_span(Span::enter_with_local_parent("foyer::storage::io::io_handle"));
70 Self { inner, callback: None }
71 }
72}
73
74impl IoHandle {
75 pub(crate) fn with_callback<F>(mut self, callback: F) -> Self
76 where
77 F: FnOnce() + Send + 'static,
78 {
79 assert!(self.callback.is_none(), "io handle callback can only be set once");
80 self.callback = Some(Box::new(callback));
81 self
82 }
83}
84
85impl Future for IoHandle {
86 type Output = (Box<dyn IoB>, Result<()>);
87
88 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
89 let this = self.project();
90 let res = ready!(this.inner.poll(cx));
91 if let Some(callback) = this.callback.take() {
92 callback();
93 }
94 Poll::Ready(res)
95 }
96}
97
98pub struct IoEngineBuildContext {
100 pub spawner: Spawner,
102}
103
104pub trait IoEngineConfig: Send + Sync + 'static + Debug {
106 fn build(self: Box<Self>, ctx: IoEngineBuildContext) -> BoxFuture<'static, Result<Arc<dyn IoEngine>>>;
108
109 fn boxed(self) -> Box<Self>
111 where
112 Self: Sized,
113 {
114 Box::new(self)
115 }
116}
117
118pub trait IoEngine: Send + Sync + 'static + Debug {
120 fn read(&self, buf: Box<dyn IoBufMut>, partition: &dyn Partition, offset: u64) -> IoHandle;
122 fn write(&self, buf: Box<dyn IoBuf>, partition: &dyn Partition, offset: u64) -> IoHandle;
124}
125
#[cfg(test)]
mod tests {
    use std::path::Path;

    use rand::{rng, Fill};
    use tempfile::tempdir;

    use super::*;
    #[cfg(all(not(madsim), target_os = "linux"))]
    use crate::io::engine::uring::UringIoEngineConfig;
    use crate::io::{
        bytes::IoSliceMut,
        device::{file::FileDeviceBuilder, Device, DeviceBuilder},
        engine::psync::PsyncIoEngineConfig,
    };

    const KIB: usize = 1024;
    const MIB: usize = 1024 * 1024;

    /// Builds a file device of 16 MiB capacity at `path` and creates sixteen 1 MiB partitions
    /// on it.
    fn build_test_file_device(path: impl AsRef<Path>) -> Result<Arc<dyn Device>> {
        let device = FileDeviceBuilder::new(&path).with_capacity(16 * MIB).build()?;
        for _ in 0..16 {
            device.create_partition(MIB)?;
        }
        Ok(device)
    }

    /// Writes 16 KiB of random bytes to partition 0 at offset 0 through `engine`, reads the same
    /// range back, and asserts that both buffers are equal.
    async fn test_read_write(engine: Arc<dyn IoEngine>, device: &dyn Device) {
        let mut written = Box::new(IoSliceMut::new(16 * KIB));
        Fill::fill(&mut written[..], &mut rng());

        let (written, res) = engine.write(written, device.partition(0).as_ref(), 0).await;
        res.unwrap();
        let written = written.try_into_io_slice_mut().unwrap();

        let read_back = Box::new(IoSliceMut::new(16 * KIB));
        let (read_back, res) = engine.read(read_back, device.partition(0).as_ref(), 0).await;
        res.unwrap();
        let read_back = read_back.try_into_io_slice_mut().unwrap();

        assert_eq!(written, read_back);
    }

    #[test_log::test(tokio::test)]
    async fn test_io_engine() {
        let dir = tempdir().unwrap();

        #[cfg(all(not(madsim), target_os = "linux"))]
        {
            let path = dir.path().join("test_file_1");
            let device = build_test_file_device(&path).unwrap();
            let engine = UringIoEngineConfig::new()
                .with_threads(4)
                .with_io_depth(64)
                .boxed()
                .build(IoEngineBuildContext {
                    spawner: Spawner::current(),
                })
                .await
                .unwrap();
            test_read_write(engine, device.as_ref()).await;
        }

        // Use a backing file of its own so this run never touches the file or partitions left
        // behind by the uring run above.
        let path = dir.path().join("test_file_2");
        let device = build_test_file_device(&path).unwrap();
        let engine = PsyncIoEngineConfig::new()
            .boxed()
            .build(IoEngineBuildContext {
                spawner: Spawner::current(),
            })
            .await
            .unwrap();
        test_read_write(engine, device.as_ref()).await;
    }
}