foyer_storage/io/engine/
psync.rs1use std::{
16 fmt::Debug,
17 fs::File,
18 mem::ManuallyDrop,
19 ops::{Deref, DerefMut},
20 sync::Arc,
21};
22
23#[cfg(feature = "tracing")]
24use fastrace::prelude::*;
25use foyer_common::{
26 error::{Error, Result},
27 spawn::Spawner,
28};
29use futures_core::future::BoxFuture;
30use futures_util::FutureExt;
31
32use crate::{
33 io::{
34 bytes::{IoB, IoBuf, IoBufMut, Raw},
35 device::Partition,
36 engine::{IoEngine, IoEngineBuildContext, IoEngineConfig, IoHandle},
37 },
38 RawFile,
39};
40
/// A [`File`] wrapper whose destructor is never run.
///
/// The inner file is held in [`ManuallyDrop`], so dropping a `FileHandle` does not
/// drop the `File` and therefore does not close the underlying descriptor/handle.
#[derive(Debug)]
struct FileHandle(ManuallyDrop<File>);
43
#[cfg(target_family = "windows")]
impl From<RawFile> for FileHandle {
    /// Wraps the raw Windows handle stored in `raw.0` as a [`FileHandle`].
    fn from(raw: RawFile) -> Self {
        use std::os::windows::io::FromRawHandle;

        // SAFETY: assumes `raw.0` is an open file handle that outlives this wrapper —
        // TODO confirm against the `RawFile` contract. The resulting `File` goes straight
        // into `ManuallyDrop`, so this wrapper never closes the handle.
        let inner = unsafe { File::from_raw_handle(raw.0) };
        Self(ManuallyDrop::new(inner))
    }
}
53
54#[cfg(target_family = "unix")]
55impl From<RawFile> for FileHandle {
56 fn from(raw: RawFile) -> Self {
57 use std::os::unix::io::FromRawFd;
58 let file = unsafe { File::from_raw_fd(raw.0) };
59 let file = ManuallyDrop::new(file);
60 Self(file)
61 }
62}
63
64impl Deref for FileHandle {
65 type Target = File;
66
67 fn deref(&self) -> &Self::Target {
68 &self.0
69 }
70}
71
72impl DerefMut for FileHandle {
73 fn deref_mut(&mut self) -> &mut Self::Target {
74 &mut self.0
75 }
76}
77
/// Configuration used to build a psync I/O engine.
///
/// Outside of test builds (and without the `test_utils` feature) this struct has no fields.
#[derive(Debug)]
pub struct PsyncIoEngineConfig {
    /// Optional range of artificial latency for writes; only compiled for tests / `test_utils`.
    #[cfg(any(test, feature = "test_utils"))]
    write_io_latency: Option<std::ops::Range<std::time::Duration>>,

    /// Optional range of artificial latency for reads; only compiled for tests / `test_utils`.
    #[cfg(any(test, feature = "test_utils"))]
    read_io_latency: Option<std::ops::Range<std::time::Duration>>,
}
87
88impl Default for PsyncIoEngineConfig {
89 fn default() -> Self {
90 Self::new()
91 }
92}
93
94impl From<PsyncIoEngineConfig> for Box<dyn IoEngineConfig> {
95 fn from(builder: PsyncIoEngineConfig) -> Self {
96 builder.boxed()
97 }
98}
99
100impl PsyncIoEngineConfig {
101 pub fn new() -> Self {
103 Self {
104 #[cfg(any(test, feature = "test_utils"))]
105 write_io_latency: None,
106 #[cfg(any(test, feature = "test_utils"))]
107 read_io_latency: None,
108 }
109 }
110
111 #[cfg(any(test, feature = "test_utils"))]
113 pub fn with_write_io_latency(mut self, latency: std::ops::Range<std::time::Duration>) -> Self {
114 self.write_io_latency = Some(latency);
115 self
116 }
117
118 #[cfg(any(test, feature = "test_utils"))]
120 pub fn with_read_io_latency(mut self, latency: std::ops::Range<std::time::Duration>) -> Self {
121 self.read_io_latency = Some(latency);
122 self
123 }
124}
125
126impl IoEngineConfig for PsyncIoEngineConfig {
127 fn build(self: Box<Self>, ctx: IoEngineBuildContext) -> BoxFuture<'static, Result<Arc<dyn IoEngine>>> {
128 async move {
129 let engine = PsyncIoEngine {
130 spawner: ctx.spawner,
131 #[cfg(any(test, feature = "test_utils"))]
132 write_io_latency: None,
133 #[cfg(any(test, feature = "test_utils"))]
134 read_io_latency: None,
135 };
136 let engine: Arc<dyn IoEngine> = Arc::new(engine);
137 Ok(engine)
138 }
139 .boxed()
140 }
141}
142
143pub struct PsyncIoEngine {
145 spawner: Spawner,
146
147 #[cfg(any(test, feature = "test_utils"))]
148 write_io_latency: Option<std::ops::Range<std::time::Duration>>,
149 #[cfg(any(test, feature = "test_utils"))]
150 read_io_latency: Option<std::ops::Range<std::time::Duration>>,
151}
152
153impl Debug for PsyncIoEngine {
154 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
155 f.debug_struct("PsyncIoEngine").finish()
156 }
157}
158
159impl IoEngine for PsyncIoEngine {
160 #[cfg_attr(
161 feature = "tracing",
162 fastrace::trace(name = "foyer::storage::io::engine::psync::read")
163 )]
164 fn read(&self, buf: Box<dyn IoBufMut>, partition: &dyn Partition, offset: u64) -> IoHandle {
165 let (raw, offset) = partition.translate(offset);
166 let file = FileHandle::from(raw);
167 let runtime = self.spawner.clone();
168
169 #[cfg(feature = "tracing")]
170 let span = Span::enter_with_local_parent("foyer::storage::io::engine::psync::read::io");
171
172 #[cfg(any(test, feature = "test_utils"))]
173 let read_io_latency = self.read_io_latency.clone();
174 async move {
175 let (buf, res) = match runtime
176 .spawn_blocking(move || {
177 let (ptr, len) = buf.as_raw_parts();
178 let slice = unsafe { std::slice::from_raw_parts_mut(ptr, len) };
179 let res = {
180 #[cfg(target_family = "windows")]
181 {
182 use std::os::windows::fs::FileExt;
183 file.seek_read(slice, offset).map(|_| ()).map_err(Error::io_error)
184 }
185 #[cfg(target_family = "unix")]
186 {
187 use std::os::unix::fs::FileExt;
188 file.read_exact_at(slice, offset).map_err(Error::io_error)
189 }
190 };
191 #[cfg(any(test, feature = "test_utils"))]
192 if let Some(lat) = read_io_latency {
193 std::thread::sleep(rand::random_range(lat));
194 }
195 (buf, res)
196 })
197 .await
198 {
199 Ok((buf, res)) => {
200 #[cfg(feature = "tracing")]
201 drop(span);
202 (buf, res)
203 }
204 Err(e) => return (Box::new(Raw::new(0)) as Box<dyn IoB>, Err(e)),
205 };
206 let buf: Box<dyn IoB> = buf.into_iob();
207 (buf, res)
208 }
209 .boxed()
210 .into()
211 }
212
213 #[cfg_attr(
214 feature = "tracing",
215 fastrace::trace(name = "foyer::storage::io::engine::psync::write")
216 )]
217 fn write(&self, buf: Box<dyn IoBuf>, partition: &dyn Partition, offset: u64) -> IoHandle {
218 let (raw, offset) = partition.translate(offset);
219 let file = FileHandle::from(raw);
220 let runtime = self.spawner.clone();
221
222 #[cfg(feature = "tracing")]
223 let span = Span::enter_with_local_parent("foyer::storage::io::engine::psync::write::io");
224
225 #[cfg(any(test, feature = "test_utils"))]
226 let write_io_latency = self.write_io_latency.clone();
227 async move {
228 let (buf, res) = match runtime
229 .spawn_blocking(move || {
230 let (ptr, len) = buf.as_raw_parts();
231 let slice = unsafe { std::slice::from_raw_parts(ptr, len) };
232 let res = {
233 #[cfg(target_family = "windows")]
234 {
235 use std::os::windows::fs::FileExt;
236 file.seek_write(slice, offset).map(|_| ()).map_err(Error::io_error)
237 }
238 #[cfg(target_family = "unix")]
239 {
240 use std::os::unix::fs::FileExt;
241 file.write_all_at(slice, offset).map_err(Error::io_error)
242 }
243 };
244 #[cfg(any(test, feature = "test_utils"))]
245 if let Some(lat) = write_io_latency {
246 std::thread::sleep(rand::random_range(lat));
247 }
248 (buf, res)
249 })
250 .await
251 {
252 Ok((buf, res)) => {
253 #[cfg(feature = "tracing")]
254 drop(span);
255 (buf, res)
256 }
257 Err(e) => return (Box::new(Raw::new(0)) as Box<dyn IoB>, Err(e)),
258 };
259 let buf: Box<dyn IoB> = buf.into_iob();
260 (buf, res)
261 }
262 .boxed()
263 .into()
264 }
265}