foyer_storage/io/engine/
psync.rs

1// Copyright 2026 foyer Project Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    fmt::Debug,
17    fs::File,
18    mem::ManuallyDrop,
19    ops::{Deref, DerefMut},
20    sync::Arc,
21};
22
23#[cfg(feature = "tracing")]
24use fastrace::prelude::*;
25use foyer_common::{
26    error::{Error, Result},
27    spawn::Spawner,
28};
29use futures_core::future::BoxFuture;
30use futures_util::FutureExt;
31
32use crate::{
33    io::{
34        bytes::{IoB, IoBuf, IoBufMut, Raw},
35        device::Partition,
36        engine::{IoEngine, IoEngineBuildContext, IoEngineConfig, IoHandle},
37    },
38    RawFile,
39};
40
41#[derive(Debug)]
42struct FileHandle(ManuallyDrop<File>);
43
44#[cfg(target_family = "windows")]
45impl From<RawFile> for FileHandle {
46    fn from(raw: RawFile) -> Self {
47        use std::os::windows::io::FromRawHandle;
48        let file = unsafe { File::from_raw_handle(raw.0) };
49        let file = ManuallyDrop::new(file);
50        Self(file)
51    }
52}
53
54#[cfg(target_family = "unix")]
55impl From<RawFile> for FileHandle {
56    fn from(raw: RawFile) -> Self {
57        use std::os::unix::io::FromRawFd;
58        let file = unsafe { File::from_raw_fd(raw.0) };
59        let file = ManuallyDrop::new(file);
60        Self(file)
61    }
62}
63
64impl Deref for FileHandle {
65    type Target = File;
66
67    fn deref(&self) -> &Self::Target {
68        &self.0
69    }
70}
71
72impl DerefMut for FileHandle {
73    fn deref_mut(&mut self) -> &mut Self::Target {
74        &mut self.0
75    }
76}
77
78/// Config for synchronous I/O engine with pread(2)/pwrite(2).
79#[derive(Debug)]
80pub struct PsyncIoEngineConfig {
81    #[cfg(any(test, feature = "test_utils"))]
82    write_io_latency: Option<std::ops::Range<std::time::Duration>>,
83
84    #[cfg(any(test, feature = "test_utils"))]
85    read_io_latency: Option<std::ops::Range<std::time::Duration>>,
86}
87
88impl Default for PsyncIoEngineConfig {
89    fn default() -> Self {
90        Self::new()
91    }
92}
93
94impl From<PsyncIoEngineConfig> for Box<dyn IoEngineConfig> {
95    fn from(builder: PsyncIoEngineConfig) -> Self {
96        builder.boxed()
97    }
98}
99
100impl PsyncIoEngineConfig {
101    /// Create a new synchronous I/O engine config with default configurations.
102    pub fn new() -> Self {
103        Self {
104            #[cfg(any(test, feature = "test_utils"))]
105            write_io_latency: None,
106            #[cfg(any(test, feature = "test_utils"))]
107            read_io_latency: None,
108        }
109    }
110
111    /// Set the simulated additional write I/O latency for testing purposes.
112    #[cfg(any(test, feature = "test_utils"))]
113    pub fn with_write_io_latency(mut self, latency: std::ops::Range<std::time::Duration>) -> Self {
114        self.write_io_latency = Some(latency);
115        self
116    }
117
118    /// Set the simulated additional read I/O latency for testing purposes.
119    #[cfg(any(test, feature = "test_utils"))]
120    pub fn with_read_io_latency(mut self, latency: std::ops::Range<std::time::Duration>) -> Self {
121        self.read_io_latency = Some(latency);
122        self
123    }
124}
125
126impl IoEngineConfig for PsyncIoEngineConfig {
127    fn build(self: Box<Self>, ctx: IoEngineBuildContext) -> BoxFuture<'static, Result<Arc<dyn IoEngine>>> {
128        async move {
129            let engine = PsyncIoEngine {
130                spawner: ctx.spawner,
131                #[cfg(any(test, feature = "test_utils"))]
132                write_io_latency: None,
133                #[cfg(any(test, feature = "test_utils"))]
134                read_io_latency: None,
135            };
136            let engine: Arc<dyn IoEngine> = Arc::new(engine);
137            Ok(engine)
138        }
139        .boxed()
140    }
141}
142
143/// The synchronous I/O engine that uses pread(2)/pwrite(2) and tokio thread pool for reading and writing.
144pub struct PsyncIoEngine {
145    spawner: Spawner,
146
147    #[cfg(any(test, feature = "test_utils"))]
148    write_io_latency: Option<std::ops::Range<std::time::Duration>>,
149    #[cfg(any(test, feature = "test_utils"))]
150    read_io_latency: Option<std::ops::Range<std::time::Duration>>,
151}
152
153impl Debug for PsyncIoEngine {
154    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
155        f.debug_struct("PsyncIoEngine").finish()
156    }
157}
158
159impl IoEngine for PsyncIoEngine {
160    #[cfg_attr(
161        feature = "tracing",
162        fastrace::trace(name = "foyer::storage::io::engine::psync::read")
163    )]
164    fn read(&self, buf: Box<dyn IoBufMut>, partition: &dyn Partition, offset: u64) -> IoHandle {
165        let (raw, offset) = partition.translate(offset);
166        let file = FileHandle::from(raw);
167        let runtime = self.spawner.clone();
168
169        #[cfg(feature = "tracing")]
170        let span = Span::enter_with_local_parent("foyer::storage::io::engine::psync::read::io");
171
172        #[cfg(any(test, feature = "test_utils"))]
173        let read_io_latency = self.read_io_latency.clone();
174        async move {
175            let (buf, res) = match runtime
176                .spawn_blocking(move || {
177                    let (ptr, len) = buf.as_raw_parts();
178                    let slice = unsafe { std::slice::from_raw_parts_mut(ptr, len) };
179                    let res = {
180                        #[cfg(target_family = "windows")]
181                        {
182                            use std::os::windows::fs::FileExt;
183                            file.seek_read(slice, offset).map(|_| ()).map_err(Error::io_error)
184                        }
185                        #[cfg(target_family = "unix")]
186                        {
187                            use std::os::unix::fs::FileExt;
188                            file.read_exact_at(slice, offset).map_err(Error::io_error)
189                        }
190                    };
191                    #[cfg(any(test, feature = "test_utils"))]
192                    if let Some(lat) = read_io_latency {
193                        std::thread::sleep(rand::random_range(lat));
194                    }
195                    (buf, res)
196                })
197                .await
198            {
199                Ok((buf, res)) => {
200                    #[cfg(feature = "tracing")]
201                    drop(span);
202                    (buf, res)
203                }
204                Err(e) => return (Box::new(Raw::new(0)) as Box<dyn IoB>, Err(e)),
205            };
206            let buf: Box<dyn IoB> = buf.into_iob();
207            (buf, res)
208        }
209        .boxed()
210        .into()
211    }
212
213    #[cfg_attr(
214        feature = "tracing",
215        fastrace::trace(name = "foyer::storage::io::engine::psync::write")
216    )]
217    fn write(&self, buf: Box<dyn IoBuf>, partition: &dyn Partition, offset: u64) -> IoHandle {
218        let (raw, offset) = partition.translate(offset);
219        let file = FileHandle::from(raw);
220        let runtime = self.spawner.clone();
221
222        #[cfg(feature = "tracing")]
223        let span = Span::enter_with_local_parent("foyer::storage::io::engine::psync::write::io");
224
225        #[cfg(any(test, feature = "test_utils"))]
226        let write_io_latency = self.write_io_latency.clone();
227        async move {
228            let (buf, res) = match runtime
229                .spawn_blocking(move || {
230                    let (ptr, len) = buf.as_raw_parts();
231                    let slice = unsafe { std::slice::from_raw_parts(ptr, len) };
232                    let res = {
233                        #[cfg(target_family = "windows")]
234                        {
235                            use std::os::windows::fs::FileExt;
236                            file.seek_write(slice, offset).map(|_| ()).map_err(Error::io_error)
237                        }
238                        #[cfg(target_family = "unix")]
239                        {
240                            use std::os::unix::fs::FileExt;
241                            file.write_all_at(slice, offset).map_err(Error::io_error)
242                        }
243                    };
244                    #[cfg(any(test, feature = "test_utils"))]
245                    if let Some(lat) = write_io_latency {
246                        std::thread::sleep(rand::random_range(lat));
247                    }
248                    (buf, res)
249                })
250                .await
251            {
252                Ok((buf, res)) => {
253                    #[cfg(feature = "tracing")]
254                    drop(span);
255                    (buf, res)
256                }
257                Err(e) => return (Box::new(Raw::new(0)) as Box<dyn IoB>, Err(e)),
258            };
259            let buf: Box<dyn IoB> = buf.into_iob();
260            (buf, res)
261        }
262        .boxed()
263        .into()
264    }
265}