Skip to main content

limbo_core/io/
mod.rs

1use crate::Result;
2use bitflags::bitflags;
3use std::fmt;
4use std::sync::Arc;
5use std::{
6    cell::{Cell, Ref, RefCell, RefMut},
7    fmt::Debug,
8    mem::ManuallyDrop,
9    pin::Pin,
10    rc::Rc,
11};
12
13pub trait File: Send + Sync {
14    fn lock_file(&self, exclusive: bool) -> Result<()>;
15    fn unlock_file(&self) -> Result<()>;
16    fn pread(&self, pos: usize, c: Arc<Completion>) -> Result<()>;
17    fn pwrite(&self, pos: usize, buffer: Arc<RefCell<Buffer>>, c: Arc<Completion>) -> Result<()>;
18    fn sync(&self, c: Arc<Completion>) -> Result<()>;
19    fn size(&self) -> Result<u64>;
20    /// Truncate the file to `len` bytes. Used to reset the WAL to empty after a
21    /// checkpoint in Truncate/Restart mode.
22    fn truncate(&self, len: usize, c: Arc<Completion>) -> Result<()>;
23}
24
25#[derive(Debug, Copy, Clone, PartialEq)]
26pub struct OpenFlags(i32);
27
28bitflags! {
29    impl OpenFlags: i32 {
30        const None = 0b00000000;
31        const Create = 0b0000001;
32        const ReadOnly = 0b0000010;
33    }
34}
35
36impl Default for OpenFlags {
37    fn default() -> Self {
38        Self::Create
39    }
40}
41
42pub trait IO: Clock + Send + Sync {
43    fn open_file(&self, path: &str, flags: OpenFlags, direct: bool) -> Result<Arc<dyn File>>;
44
45    fn run_once(&self) -> Result<()>;
46
47    fn wait_for_completion(&self, c: Arc<Completion>) -> Result<()>;
48
49    fn generate_random_number(&self) -> i64;
50
51    fn get_memory_io(&self) -> Arc<MemoryIO>;
52}
53
54pub type Complete = dyn Fn(Arc<RefCell<Buffer>>);
55pub type WriteComplete = dyn Fn(i32);
56pub type SyncComplete = dyn Fn(i32);
57
58pub enum Completion {
59    Read(ReadCompletion),
60    Write(WriteCompletion),
61    Sync(SyncCompletion),
62}
63
64pub struct ReadCompletion {
65    pub buf: Arc<RefCell<Buffer>>,
66    pub complete: Box<Complete>,
67    pub is_completed: Cell<bool>,
68}
69
70impl Completion {
71    pub fn is_completed(&self) -> bool {
72        match self {
73            Self::Read(r) => r.is_completed.get(),
74            Self::Write(w) => w.is_completed.get(),
75            Self::Sync(s) => s.is_completed.get(),
76        }
77    }
78
79    pub fn complete(&self, result: i32) {
80        match self {
81            Self::Read(r) => r.complete(),
82            Self::Write(w) => w.complete(result),
83            Self::Sync(s) => s.complete(result), // fix
84        }
85    }
86
87    /// only call this method if you are sure that the completion is
88    /// a ReadCompletion, panics otherwise
89    pub fn as_read(&self) -> &ReadCompletion {
90        match self {
91            Self::Read(ref r) => r,
92            _ => unreachable!(),
93        }
94    }
95}
96
97pub struct WriteCompletion {
98    pub complete: Box<WriteComplete>,
99    pub is_completed: Cell<bool>,
100}
101
102pub struct SyncCompletion {
103    pub complete: Box<SyncComplete>,
104    pub is_completed: Cell<bool>,
105}
106
107impl ReadCompletion {
108    pub fn new(buf: Arc<RefCell<Buffer>>, complete: Box<Complete>) -> Self {
109        Self {
110            buf,
111            complete,
112            is_completed: Cell::new(false),
113        }
114    }
115
116    pub fn buf(&self) -> Ref<'_, Buffer> {
117        self.buf.borrow()
118    }
119
120    pub fn buf_mut(&self) -> RefMut<'_, Buffer> {
121        self.buf.borrow_mut()
122    }
123
124    pub fn complete(&self) {
125        (self.complete)(self.buf.clone());
126        self.is_completed.set(true);
127    }
128}
129
130impl WriteCompletion {
131    pub fn new(complete: Box<WriteComplete>) -> Self {
132        Self {
133            complete,
134            is_completed: Cell::new(false),
135        }
136    }
137
138    pub fn complete(&self, bytes_written: i32) {
139        (self.complete)(bytes_written);
140        self.is_completed.set(true);
141    }
142}
143
144impl SyncCompletion {
145    pub fn new(complete: Box<SyncComplete>) -> Self {
146        Self {
147            complete,
148            is_completed: Cell::new(false),
149        }
150    }
151
152    pub fn complete(&self, res: i32) {
153        (self.complete)(res);
154        self.is_completed.set(true);
155    }
156}
157
/// Storage behind a [`Buffer`]: a pinned byte vector.
pub type BufferData = Pin<Vec<u8>>;

/// Callback that takes ownership of a buffer's storage when the [`Buffer`] is
/// dropped.
pub type BufferDropFn = Rc<dyn Fn(BufferData)>;

/// Byte buffer that hands its storage to a caller-supplied callback on drop.
///
/// Cloning copies the bytes and shares the callback, so each clone passes its
/// own storage to the callback when it is dropped.
#[derive(Clone)]
pub struct Buffer {
    // Kept in `ManuallyDrop` so that `Drop` can move the storage out of `self`.
    data: ManuallyDrop<BufferData>,
    // Receives `data` when the buffer is dropped.
    drop: BufferDropFn,
}

impl Debug for Buffer {
    /// Writes the storage field using `{:?}`. Flags of the outer formatter
    /// (for example `#`) are not forwarded.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("{:?}", self.data))
    }
}

impl Drop for Buffer {
    /// Moves the storage out and passes it to the drop callback.
    fn drop(&mut self) {
        // SAFETY: this is the only place `data` is taken, and `self.data` is
        // not accessed again after this point because `self` is being dropped.
        let storage = unsafe { ManuallyDrop::take(&mut self.data) };
        (*self.drop)(storage);
    }
}

impl Buffer {
    /// Creates a buffer of `size` zeroed bytes whose storage is passed to
    /// `drop` when the buffer is dropped.
    pub fn allocate(size: usize, drop: BufferDropFn) -> Self {
        Self::new(Pin::new(vec![0u8; size]), drop)
    }

    /// Wraps existing storage; `drop` receives it back when the buffer is
    /// dropped.
    pub fn new(data: BufferData, drop: BufferDropFn) -> Self {
        Self {
            data: ManuallyDrop::new(data),
            drop,
        }
    }

    /// Number of bytes in the buffer.
    pub fn len(&self) -> usize {
        self.as_slice().len()
    }

    /// `true` when the buffer holds no bytes.
    pub fn is_empty(&self) -> bool {
        self.as_slice().is_empty()
    }

    /// Borrows the contents as a byte slice.
    pub fn as_slice(&self) -> &[u8] {
        &self.data[..]
    }

    /// Borrows the contents as a mutable byte slice.
    pub fn as_mut_slice(&mut self) -> &mut [u8] {
        &mut self.data[..]
    }

    /// Raw pointer to the first byte of the contents.
    pub fn as_ptr(&self) -> *const u8 {
        self.as_slice().as_ptr()
    }

    /// Raw mutable pointer to the first byte of the contents.
    pub fn as_mut_ptr(&mut self) -> *mut u8 {
        self.as_mut_slice().as_mut_ptr()
    }
}
216
// ---- I/O backend selection -------------------------------------------------
//
// By default (on every non-Windows target) the engine uses the pure, blocking
// `generic` backend built on the standard library (`std::fs`
// pread/pwrite/sync + `getrandom`; no polling/rustix/libloading), so the
// default dependency closure stays free of `-sys` / FFI crates. Windows always
// uses its own native backend.
//
// The off-by-default `native-io` feature re-enables the native epoll/kqueue
// event-loop backend (`unix`, via polling + rustix) on Linux/macOS. It is a
// throughput-oriented path that pulls in platform `-sys` crates (e.g.
// linux-raw-sys on Linux). `io_uring` implies `native-io`. Prefer the default
// pure backend unless you specifically need the async event loop; this is a
// performance trade-off.
229
// io_uring backend: opt-in through the `io_uring` feature (which implies
// `native-io`) and compiled on Linux only. `UringIO` is re-exported only when
// the `fs` feature is enabled as well.
#[cfg(all(target_os = "linux", feature = "io_uring"))]
mod io_uring;
#[cfg(all(target_os = "linux", feature = "io_uring", feature = "fs"))]
pub use io_uring::UringIO;
235
// Native epoll/kqueue event-loop backend. Compiled only on Linux/macOS with
// the `native-io` feature, in which case it is the platform backend
// (`PlatformIO` / `SyscallIO`).
#[cfg(all(any(target_os = "linux", target_os = "macos"), feature = "native-io"))]
mod unix;
#[cfg(all(any(target_os = "linux", target_os = "macos"), feature = "native-io"))]
pub use unix::UnixIO as PlatformIO;
#[cfg(all(any(target_os = "linux", target_os = "macos"), feature = "native-io"))]
pub use PlatformIO as SyscallIO;
// The concrete `UnixIO` name is exported only when `fs` is enabled too.
#[cfg(all(
    any(target_os = "linux", target_os = "macos"),
    feature = "native-io",
    feature = "fs"
))]
pub use unix::UnixIO;
250
// Windows native backend: selected on every Windows build, independent of the
// `native-io` feature.
#[cfg(target_os = "windows")]
mod windows;
#[cfg(target_os = "windows")]
pub use windows::WindowsIO as PlatformIO;
#[cfg(target_os = "windows")]
pub use PlatformIO as SyscallIO;
258
259// Pure std-blocking DEFAULT backend: used on every non-Windows target without
260// `native-io`, and on any target outside linux/macos/windows.
261#[cfg(any(
262    not(any(target_os = "linux", target_os = "macos", target_os = "windows")),
263    all(
264        any(target_os = "linux", target_os = "macos"),
265        not(feature = "native-io")
266    )
267))]
268mod generic;
269#[cfg(any(
270    not(any(target_os = "linux", target_os = "macos", target_os = "windows")),
271    all(
272        any(target_os = "linux", target_os = "macos"),
273        not(feature = "native-io")
274    )
275))]
276pub use generic::GenericIO as PlatformIO;
277#[cfg(any(
278    not(any(target_os = "linux", target_os = "macos", target_os = "windows")),
279    all(
280        any(target_os = "linux", target_os = "macos"),
281        not(feature = "native-io")
282    )
283))]
284pub use PlatformIO as SyscallIO;
285
286mod memory;
287#[cfg(feature = "fs")]
288mod vfs;
289pub use memory::MemoryIO;
290pub mod clock;
291// `common` (file-lock env knob + cross-process lock test helpers) is only used
292// by the native `unix` / `io_uring` backends, so it is gated with them to keep
293// the pure default build free of dead code.
294#[cfg(all(any(target_os = "linux", target_os = "macos"), feature = "native-io"))]
295mod common;
296pub use clock::Clock;